一个高性能的 .NET MQTT 客户端与服务器库
扮演着至关重要的角色。今天我要为大家介绍一个完全使用 C# 实现的高性能 MQTT 库这个库不仅提供了完整的 MQTT 客户端实现还包含了一个功能齐全的 Broker 服务器支持桥接、集群等企业级特性。核心特性协议支持MQTT 3.1.1- 完整支持MQTT 5.0- 完整支持包括用户属性、消息过期、主题别名等新特性MQTT-SN- 基于 UDP 的轻量级 MQTT 变体适合受限设备CoAP- 约束应用协议网关支持性能特性高性能异步实现零不必要的内存分配缓冲区池技术支持 10000 并发连接企业级功能Broker 桥接多 Broker 消息同步集群支持去中心化 P2P 架构灵活的认证与授权机制TLS/SSL 加密传输持久会话与离线消息存储框架支持.NET 6.0.NET 8.0.NET 10.0技术实现本项目采用了大量现代 .NET 高性能技术下面详细介绍核心技术点。内存管理技术SpanT 和 MemoryT - 零拷贝处理项目使用ref struct实现的二进制读写器完全在栈上分配避免堆内存压力// 零拷贝的二进制读取器 public ref struct MqttBinaryReader { private readonly ReadOnlySpanbyte _buffer; private int _position; // 零拷贝切片操作 [MethodImpl(MethodImplOptions.AggressiveInlining)] public ReadOnlySpanbyte ReadBytes(int count) { var span _buffer.Slice(_position, count); _position count; return span; } }技术优势ref struct只能在栈上分配无 GC 压力ReadOnlySpanbyte支持零拷贝切片避免大量字节数组复制操作ArrayPoolT - 缓冲区复用使用共享内存池减少频繁的内存分配// 从共享池租借缓冲区 var buffer ArrayPoolbyte.Shared.Rent(1024); try { await stream.ReadAsync(buffer.AsMemory(0, length), cancellationToken); // 处理数据... } finally { ArrayPoolbyte.Shared.Return(buffer); // 归还缓冲区 }stackalloc - 小缓冲区栈分配对于小型临时缓冲区直接在栈上分配// 4 字节的可变长度编码缓冲区栈分配 Spanbyte remainingLengthBytes stackalloc byte[4]; var size EncodeRemainingLength(length, remainingLengthBytes);异步编程模型async/await ConfigureAwait所有 IO 操作均采用异步模式并使用ConfigureAwait(false)优化public async TaskMqttConnectResult ConnectAsync(CancellationToken cancellationToken default) { // 建立 TCP 连接 await _tcpClient.ConnectAsync(host, port, cancellationToken).ConfigureAwait(false); // TLS 握手 if (Options.UseTls) { await sslStream.AuthenticateAsClientAsync(sslOptions, cancellationToken).ConfigureAwait(false); } // 发送 CONNECT 报文 await SendPacketAsync(connectPacket, cancellationToken).ConfigureAwait(false); }ChannelT - 高性能事件队列Broker 使用有界通道实现非阻塞的事件分发public sealed class MqttBrokerEventDispatcher { private readonly ChannelBrokerEvent _eventChannel; public MqttBrokerEventDispatcher(int capacity 10000) { // 有界通道队列满时丢弃最旧事件 _eventChannel Channel.CreateBoundedBrokerEvent(new BoundedChannelOptions(capacity) { FullMode BoundedChannelFullMode.DropOldest, SingleReader true, SingleWriter false }); } // 非阻塞事件发送 public void DispatchTEventArgs(BrokerEventType type, TEventArgs args, EventHandlerTEventArgs? handler) { _eventChannel.Writer.TryWrite(new BrokerEvent(type, args, handler)); } }TaskCompletionSource - 请求/响应模式实现 QoS 1/2 的确认等待机制private readonly Dictionaryushort, TaskCompletionSourceobject? _pendingPackets new(); private async Taskobject? WaitForPacketAsync(ushort packetId, CancellationToken cancellationToken) { var tcs new TaskCompletionSourceobject?(TaskCreationOptions.RunContinuationsAsynchronously); _pendingPackets[packetId] tcs; using var cts CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); cts.CancelAfter(TimeSpan.FromSeconds(30)); // 30 秒超时 using var registration cts.Token.Register(() tcs.TrySetCanceled()); return await tcs.Task.ConfigureAwait(false); }SemaphoreSlim - 发送同步确保报文发送的串行化private readonly SemaphoreSlim _sendLock new(1, 1); private async Task SendPacketBytesAsync(byte[] packet, CancellationToken cancellationToken) { await _sendLock.WaitAsync(cancellationToken).ConfigureAwait(false); try { await _stream.WriteAsync(packet.AsMemory(), cancellationToken).ConfigureAwait(false); await _stream.FlushAsync(cancellationToken).ConfigureAwait(false); } finally { _sendLock.Release(); } }编译器优化MethodImpl 特性针对不同场景使用合适的编译器优化指令// 强制内联 - 用于频繁调用的短方法 [MethodImpl(MethodImplOptions.AggressiveInlining)] public ushort ReadUInt16() { var value (ushort)((_buffer[_position] 8) | _buffer[_position 1]); _position 2; return value; } // 最积极的优化 - 用于热路径 [MethodImpl(MethodImplOptions.AggressiveOptimization)] private static async Taskint DecodeRemainingLengthAsync(Stream stream, CancellationToken ct) { // 可变长度解码实现... } // 禁止内联 - 避免异常处理代码膨胀热路径 [MethodImpl(MethodImplOptions.NoInlining)] private void ThrowIfDisposed() { if (_disposed) throw new ObjectDisposedException(nameof(MqttClient)); }网络编程多层传输抽象支持 TCP、UDP 等多种传输方式public interface ITransportConnection : IAsyncDisposable { string ConnectionId { get; } TransportType TransportType { get; } EndPoint? RemoteEndPoint { get; } bool IsConnected { get; } ValueTaskint ReadAsync(Memorybyte buffer, CancellationToken cancellationToken default); ValueTask WriteAsync(ReadOnlyMemorybyte buffer, CancellationToken cancellationToken default); ValueTask FlushAsync(CancellationToken cancellationToken default); }TLS/SSL 支持使用 SslStream 实现加密传输支持 TLS 1.2 和 TLS 1.3var sslOptions new SslClientAuthenticationOptions { TargetHost Options.Host, EnabledSslProtocols SslProtocols.Tls12 | SslProtocols.Tls13, ClientCertificates Options.ClientCertificate ! null ? new X509CertificateCollection { Options.ClientCertificate } : null }; await sslStream.AuthenticateAsClientAsync(sslOptions, cancellationToken);协议序列化工厂模式 延迟初始化协议处理器采用单例 延迟初始化模式public static class MqttProtocolHandlerFactory { private static readonly LazyIMqttProtocolHandler _v311Handler new(() new V311ProtocolHandler()); private static readonly LazyIMqttProtocolHandler _v500Handler new(() new V500ProtocolHandler()); public static IMqttProtocolHandler GetHandler(MqttProtocolVersion version) { return version switch { MqttProtocolVersion.V311 _v311Handler.Value, MqttProtocolVersion.V500 _v500Handler.Value, _ throw new NotSupportedException() }; } }可变长度整数编码MQTT 协议特有的可变长度编码1-4 字节可表示 0 到 268,435,455public uint ReadVariableByteInteger() { uint value 0; int multiplier 1; byte encodedByte; do { encodedByte _buffer[_position]; value (uint)((encodedByte 0x7F) * multiplier); multiplier * 128; } while ((encodedByte 0x80) ! 0); return value; }并发数据结构ConcurrentDictionary - 线程安全集合用于管理客户端会话和订阅private readonly ConcurrentDictionarystring, MqttClientSession _sessions new(); private readonly ConcurrentDictionarystring, MqttApplicationMessage _retainedMessages new(); public int ConnectedClients _sessions.Count; public IEnumerableMqttClientSession Sessions _sessions.Values;设计模式应用模式应用场景示例工厂模式协议处理器创建MqttProtocolHandlerFactory策略模式不同协议版本实现V311ProtocolHandler/V500ProtocolHandler建造者模式报文构建IPublishPacketBuilder/IConnectPacketBuilder观察者模式事件系统MessageReceived/ClientConnected装饰器模式传输层 TLSSslStream装饰NetworkStream单例模式协议处理器缓存全局共享的处理器实例技术栈总结类别技术作用内存管理SpanT,MemoryT,ref struct,ArrayPoolT,stackalloc零拷贝、栈分配、缓冲区复用异步编程async/await,ChannelT,TaskCompletionSource,SemaphoreSlim高效并发、非阻塞事件处理编译优化AggressiveInlining,AggressiveOptimization,NoInliningJIT 编译器优化提示网络层TcpClient,TcpListener,SslStream, 传输抽象多协议支持、安全传输并发集合ConcurrentDictionary,ConcurrentQueue线程安全的数据结构序列化自定义二进制读写器、可变长度编码高效的协议解析性能优化建议客户端优化选择合适的 QoS大多数场景 QoS 1 就足够了QoS 2 开销较大批量发送如果有大量消息考虑合并后发送合理设置 KeepAlive根据网络环境调整一般 60 秒即可使用持久会话如果需要接收离线消息设置CleanSession falseBroker 优化调整最大连接数根据服务器性能设置MaxConnections限制消息大小设置MaxMessageSize防止恶意大消息离线消息限制设置MaxOfflineMessagesPerClient防止内存溢出使用集群高可用场景使用集群部署客户端使用指南基础连接using System.Net.MQTT; // 配置客户端选项 var options new MqttClientOptions { Host localhost, Port 1883, ClientId my-iot-device, CleanSession true }; // 创建客户端 using var client new MqttClient(options); // 连接到 Broker var result await client.ConnectAsync(); if (result.IsSuccess) { Console.WriteLine(连接成功); }订阅主题// 订阅单个主题 await client.SubscribeAsync(sensors/temperature, MqttQualityOfService.AtLeastOnce); // 使用通配符订阅多个主题 await client.SubscribeAsync(sensors/#, MqttQualityOfService.AtLeastOnce); // 多级通配符 await client.SubscribeAsync(sensors//status, MqttQualityOfService.AtMostOnce); // 单级通配符接收消息client.MessageReceived (sender, e) { Console.WriteLine($收到消息:); Console.WriteLine($ 主题: {e.Message.Topic}); Console.WriteLine($ 内容: {e.Message.PayloadAsString}); Console.WriteLine($ QoS: {e.Message.QualityOfService}); };发布消息// 简单发布 await client.PublishAsync(sensors/temperature, 25.5); // 指定 QoS 发布 await client.PublishAsync(sensors/humidity, 60%, MqttQualityOfService.AtLeastOnce); // 发布保留消息 await client.PublishAsync(device/status, online, MqttQualityOfService.AtLeastOnce, retain: true); // 使用完整的消息对象 var message MqttApplicationMessage.Create( topic: sensors/data, payload: {\temp\: 25.5, \humidity\: 60}, qos: MqttQualityOfService.ExactlyOnce, retain: false ); await client.PublishAsync(message);遗嘱消息Last Will遗嘱消息会在客户端异常断开时自动发布var options new MqttClientOptions { Host localhost, ClientId my-device, WillMessage MqttApplicationMessage.Create( topic: devices/my-device/status, payload: offline, qos: MqttQualityOfService.AtLeastOnce, retain: true ) };TLS 加密连接var options new MqttClientOptions { Host secure-broker.example.com, Port 8883, UseTls true, // 可选客户端证书 ClientCertificate new X509Certificate2(client.pfx, password) };自动重连var options new MqttClientOptions { Host localhost, AutoReconnect true, ReconnectDelayMs 5000 // 5秒后重连 }; client.Connected (s, e) Console.WriteLine(已连接); client.Disconnected (s, e) Console.WriteLine(连接断开正在重连...);完整客户端示例using System.Net.MQTT; var options new MqttClientOptions { Host localhost, Port 1883, ClientId $client-{Guid.NewGuid():N}, Username user, Password password, CleanSession true, KeepAliveSeconds 60, AutoReconnect true }; using var client new MqttClient(options); // 设置事件处理 client.Connected (s, e) Console.WriteLine([事件] 已连接到 Broker); client.Disconnected (s, e) Console.WriteLine([事件] 连接已断开); client.MessageReceived (s, e) { Console.WriteLine($[消息] {e.Message.Topic}: {e.Message.PayloadAsString}); }; // 连接 var result await client.ConnectAsync(); if (!result.IsSuccess) { Console.WriteLine($连接失败: {result.ReasonCode}); return; } // 订阅 await client.SubscribeAsync(test/#, MqttQualityOfService.AtLeastOnce); // 发布测试消息 for (int i 0; i 10; i) { await client.PublishAsync(test/counter, i.ToString()); await Task.Delay(1000); } // 断开连接 await client.DisconnectAsync();服务器Broker使用指南启动基础 Brokerusing System.Net.MQTT.Broker; var options new MqttBrokerOptions { Port 1883, AllowAnonymous true, EnableRetainedMessages true, MaxConnections 10000 }; using var broker new MqttBroker(options); // 启动服务器 await broker.StartAsync(); Console.WriteLine(MQTT Broker 已启动监听端口 1883); // 保持运行 await Task.Delay(Timeout.Infinite); // 停止服务器 await broker.StopAsync();配置认证// 使用简单认证器 broker.Authenticator new SimpleAuthenticator() .AddUser(admin, admin123) .AddUser(device1, device1pass) .AddUser(device2, device2pass); var options new MqttBrokerOptions { Port 1883, AllowAnonymous false // 禁用匿名访问 };自定义认证器public class MyAuthenticator : IMqttAuthenticator { public TaskMqttAuthenticationResult AuthenticateAsync( MqttAuthenticationContext context, CancellationToken cancellationToken) { // 从数据库验证用户 if (ValidateFromDatabase(context.Username, context.Password)) { return Task.FromResult(MqttAuthenticationResult.Success()); } return Task.FromResult(MqttAuthenticationResult.Failure( MqttConnectReasonCode.BadUserNameOrPassword)); } } broker.Authenticator new MyAuthenticator();Broker 事件处理// 客户端连接事件 broker.ClientConnected (s, e) { Console.WriteLine($[连接] 客户端 {e.Session.ClientId} 已连接); Console.WriteLine($ 地址: {e.Session.RemoteEndpoint}); Console.WriteLine($ 当前连接数: {broker.ConnectedClients}); }; // 客户端断开事件 broker.ClientDisconnected (s, e) { Console.WriteLine($[断开] 客户端 {e.Session.ClientId} 已断开); }; // 客户端订阅事件 broker.ClientSubscribed (s, e) { Console.WriteLine($[订阅] {e.Session.ClientId} 订阅了 {e.TopicFilter}); }; // 消息发布事件 broker.MessagePublished (s, e) { Console.WriteLine($[消息] {e.Message.Topic}: {e.Message.PayloadAsString}); Console.WriteLine($ 来自: {e.SourceClientId}); }; // 消息发布前拦截可以阻止消息发布 broker.MessagePublishing (s, e) { // 检查敏感主题 if (e.Message.Topic.StartsWith(admin/) e.SourceClientId ! admin) { e.Cancel true; // 阻止非管理员发布到 admin 主题 } };TLS 配置var options new MqttBrokerOptions { // 普通端口 Port 1883, // TLS 端口 UseTls true, TlsPort 8883, ServerCertificate new X509Certificate2(server.pfx, password), RequireClientCertificate false };高级功能Broker 桥接桥接功能允许将多个 Broker 连接起来实现消息的跨 Broker 同步。var broker new MqttBroker(new MqttBrokerOptions { Port 2883 }); // 添加桥接到父 Broker var bridge broker.AddBridge(new MqttBridgeOptions { Name parent-bridge, RemoteHost parent-broker.example.com, RemotePort 1883, ClientId bridge-client-1, // 上行规则本地消息 - 远程 Broker UpstreamRules { new MqttBridgeRule { LocalTopicFilter sensor/#, Enabled true }, new MqttBridgeRule { LocalTopicFilter device//data, Enabled true } }, // 下行规则远程消息 - 本地 DownstreamRules { new MqttBridgeRule { LocalTopicFilter commands/#, Enabled true }, new MqttBridgeRule { LocalTopicFilter config/#, Enabled true } } }); // 桥接事件 bridge.Connected (s, e) Console.WriteLine(桥接已连接); bridge.MessageForwarded (s, e) { var direction e.Direction BridgeDirection.Upstream ? 上行 : 下行; Console.WriteLine($[桥接-{direction}] {e.OriginalTopic}); }; // 获取统计信息 var stats bridge.GetStatistics(); Console.WriteLine($上行消息: {stats.UpstreamMessageCount}); Console.WriteLine($下行消息: {stats.DownstreamMessageCount});集群部署集群功能实现了去中心化的 P2P 架构任何节点都可以独立运行支持自动故障检测和恢复。var broker new MqttBroker(new MqttBrokerOptions { Port 1883 }); // 启用集群 broker.EnableCluster(new MqttClusterOptions { NodeId node-1, ClusterName my-cluster, ClusterPort 11883, SeedNodes new Liststring { node2.example.com:11883, node3.example.com:11883 }, HeartbeatIntervalMs 5000, NodeTimeoutMs 15000, EnableDeduplication true // 防止消息重复 }); // 集群事件 broker.Cluster!.PeerJoined (s, e) Console.WriteLine($节点加入: {e.Peer.NodeId}); broker.Cluster!.PeerLeft (s, e) Console.WriteLine($节点离开: {e.Peer.NodeId}); broker.Cluster!.MessageForwarded (s, e) Console.WriteLine($消息转发: {e.Topic}); await broker.StartAsync();MQTT-SN 网关MQTT-SN 是基于 UDP 的轻量级协议适合资源受限的嵌入式设备var options new MqttBrokerOptions { Port 1883, EnableMqttSn true, MqttSnPort 1885 };CoAP 网关CoAP 网关允许 CoAP 设备与 MQTT 生态系统互通var options new MqttBrokerOptions { Port 1883, EnableCoAP true, CoapPort 5683, CoapMqttPrefix mqtt }; // CoAP 客户端可以通过以下方式访问 MQTT 主题 // GET coap://broker:5683/mqtt/sensors/temperature // PUT coap://broker:5683/mqtt/sensors/temperature (发布消息)QoS 服务质量MQTT 定义了三种服务质量级别QoS名称说明适用场景0At Most Once最多一次不保证送达传感器数据丢失可接受1At Least Once至少一次可能重复重要数据可处理重复2Exactly Once恰好一次保证送达且不重复计费、订单等关键数据// QoS 0 - 最多一次 await client.PublishAsync(sensor/temp, 25, MqttQualityOfService.AtMostOnce); // QoS 1 - 至少一次 await client.PublishAsync(alert/fire, detected, MqttQualityOfService.AtLeastOnce); // QoS 2 - 恰好一次 await client.PublishAsync(order/create, orderJson, MqttQualityOfService.ExactlyOnce);主题通配符通配符说明示例匹配单个层级sensor//temp匹配sensor/room1/temp#匹配多个层级sensor/#匹配sensor/room1/temp和sensor/room1/humidity// 订阅所有房间的温度 await client.SubscribeAsync(sensor//temperature, MqttQualityOfService.AtLeastOnce); // 订阅所有传感器数据 await client.SubscribeAsync(sensor/#, MqttQualityOfService.AtLeastOnce);MQTT 5.0 新特性如果你使用 MQTT 5.0 协议可以利用以下新特性var options new MqttClientOptions { Host localhost, ProtocolVersion MqttProtocolVersion.V500 // 使用 MQTT 5.0 }; // 创建带有 MQTT 5.0 属性的消息 var message MqttApplicationMessage.CreateWithProperties( topic: request/data, payload: {\query\: \temperature\}, qos: MqttQualityOfService.AtLeastOnce, retain: false, // MQTT 5.0 特有属性 responseTopic: response/client1, // 响应主题 correlationData: Encoding.UTF8.GetBytes(req-123), // 关联数据 messageExpiryInterval: 60, // 消息60秒后过期 contentType: application/json, // 内容类型 userProperties: new ListMqttUserProperty // 用户自定义属性 { new MqttUserProperty(version, 1.0), new MqttUserProperty(source, sensor-hub) } ); await client.PublishAsync(message);总结是一个功能完整、性能优秀的 .NET MQTT 库具有以下优势完整的协议支持MQTT 3.1.1、MQTT 5.0、MQTT-SN、CoAP 全覆盖高性能设计异步 IO、零分配、缓冲区池企业级特性桥接、集群、认证授权易于使用简洁的 API 设计丰富的示例代码现代化支持最新的 .NET 版本