首先明确概念,什么是MQTT?
MQTT是一种轻量级、基于发布 / 订阅(Publish/Subscribe)模式的物联网(IoT)通信协议,在带宽有限、网络不稳定的环境下,实现低功耗、低延迟的设备间通信,是物联网领域的核心通信协议之一(被 OASIS 标准化,广泛支持于 AWS IoT、Azure IoT、阿里云 IoT 等平台)。
核心设计目标
MQTT 的所有特性均围绕 “适配物联网场景” 展开,核心目标可概括为 4 点:
轻量级
协议头部极小(固定头部仅 2 字节,可变头部和负载按需添加),设备侧客户端代码体积可压缩至几十 KB,适配单片机、传感器等资源受限设备(如 Arduino、ESP8266)。
低带宽 / 低功耗
减少数据传输量(无冗余字段),支持 “休眠 - 唤醒” 模式,适合 2G/4G/NB-IoT 等窄带或按流量计费的网络。
高可靠性
通过 “服务质量(QoS)” 机制保障消息不丢失、不重复,应对物联网常见的 “网络波动、设备离线” 场景。
解耦通信
基于 “发布 / 订阅” 模式,设备(发布者)和平台(订阅者)无需直接交互,甚至无需知道对方的 IP / 端口,降低系统耦合度。
核心架构:发布 / 订阅模式
MQTT 不采用 “点对点(P2P)” 直接通信,而是通过中间代理(Broker) 实现消息转发,架构包含 3 个核心角色
角色 | 功能 | 典型示例 |
---|---|---|
发布者(Publisher) | 发送消息的设备 / 应用,无需知道谁会接收消息,仅需指定消息的 “主题(Topic)”。 | 温湿度传感器、智能门锁 |
订阅者(Subscriber) | 接收消息的设备 / 应用,需提前 “订阅” 感兴趣的 “主题(Topic)”,仅接收对应消息。 | 物联网平台、手机 APP、PLC |
代理(Broker) | 核心中间件,负责接收发布者的消息,根据 “主题” 匹配订阅者,并将消息转发给订阅者;同时管理设备连接、会话、QoS 等逻辑。 | EMQX、Mosquitto、AWS IoT Core |
关键概念:主题(Topic)
“主题” 是 MQTT 消息的 “分类标签”,类似文件系统的路径(用/分隔层级),用于实现消息的精准路由。例如:
传感器主题:sensor/room1/temperature(1 号房间温度传感器)、sensor/room2/humidity(2 号房间湿度传感器);
设备控制主题:device/light/room1/control(控制 1 号房间灯光)。
这里借助的MQTT的插件是MQTTnet+拓展MQTTnet.Extensions.ManagedClient,这个拓展提供了一个托管的 MQTT 客户端实现,简化了 MQTT 客户端的使用并增强了可靠性。它自动处理连接管理、重连逻辑、消息队列等功能,非常适合需要稳定 MQTT 通信的应用程序。
https://download.csdn.net/download/weixin_44347839/91777855
using UnityEngine;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Extensions.ManagedClient;
using System;
using System.Text;
using System.Threading.Tasks;
using System.Collections.Generic;
using MQTTnet.Protocol;
using MQTTnet.Packets;
using System.Linq;
using MyFrameworkPro;
public class MQTTClient : MonoSingleton<MQTTClient>
{[Header("MQTT Broker Settings")][Tooltip("MQTT服务器地址")]public string brokerAddress = "test.mosquitto.org";[Tooltip("MQTT服务器端口")]public int brokerPort = 1883;[Tooltip("客户端ID,为空则自动生成")]public string clientId = "";[Tooltip("用户名,无则留空")]public string username = "";[Tooltip("密码,无则留空")]public string password = "";[Header("Connection Settings")][Tooltip("是否使用TLS加密连接")]public bool useTls = false;[Tooltip("自动重连间隔(秒)")]public int reconnectInterval = 5;[Tooltip("心跳包间隔(秒)")]private int KeepAlive = 60;// 管理型MQTT客户端private IManagedMqttClient _mqttClient;// 连接状态public bool IsConnected => _mqttClient?.IsConnected ?? false;// 消息队列,用于在主线程处理接收到的消息private readonly Queue<MqttApplicationMessageReceivedEventArgs> _messageQueue =new Queue<MqttApplicationMessageReceivedEventArgs>();// 事件定义public event Action OnConnected;public event Action OnDisconnected;public event Action<string> OnLog;public event Action<string, string> OnMessageReceived; // 主题, 消息内容private void Awake(){// 确保场景中只存在一个MQTT客户端实例if (FindObjectsOfType<MQTTClient>().Length > 1){Destroy(gameObject);}else{DontDestroyOnLoad(gameObject);}}private void Start(){// 初始化客户端InitializeClient();}private void Update(){// 在主线程处理消息队列ProcessMessageQueue();}protected override void OnDestroy(){base.OnDestroy();// 断开连接并清理资源DisconnectAsync().ConfigureAwait(false);_mqttClient?.Dispose();}/// <summary>/// 初始化MQTT客户端/// </summary>public void InitializeClient(){if (_mqttClient != null){_mqttClient.Dispose();}clientId = string.IsNullOrEmpty(clientId) ? Guid.NewGuid().ToString() : clientId;//配置连接参数Options = new MqttClientOptionsBuilder().WithTcpServer(brokerAddress, brokerPort).WithClientId(clientId).WithCredentials(username, password).WithCleanSession().WithKeepAlivePeriod(TimeSpan.FromSeconds(KeepAlive)).Build();//配置重连机制ManagedOptions = new ManagedMqttClientOptionsBuilder().WithAutoReconnectDelay(TimeSpan.FromSeconds(5)).WithClientOptions(Options).Build();// 创建管理型客户端_mqttClient = new MqttFactory().CreateManagedMqttClient();// 注册事件_mqttClient.ConnectedAsync += OnConnectedAsync;_mqttClient.DisconnectedAsync += OnDisconnectedAsync;_mqttClient.ApplicationMessageReceivedAsync += OnMessageReceivedAsync;_mqttClient.ConnectingFailedAsync += OnConnectingFailedAsync;}private MqttClientOptions Options;private ManagedMqttClientOptions ManagedOptions;/// <summary>/// 连接到MQTT服务器/// </summary>public async Task ConnectAsync(){if (_mqttClient == null){InitializeClient();}if (!_mqttClient.IsConnected){Log("连接到MQTT服务器: " + brokerAddress + ":" + brokerPort);await _mqttClient.StartAsync(ManagedOptions);}else{Log("已经连接到MQTT服务器");}}/// <summary>/// 从MQTT服务器断开连接/// </summary>public async Task DisconnectAsync(){if (_mqttClient != null && _mqttClient.IsConnected){Log("正在断开与MQTT服务器的连接");await _mqttClient.StopAsync();}}/// <summary>/// 发布消息到指定主题/// </summary>/// <param name="topic">主题</param>/// <param name="payload">消息内容</param>/// <param name="qosLevel">服务质量等级</param>/// <param name="retain">是否保留消息</param>public async Task PublishAsync(string topic, string payload, MqttQualityOfServiceLevel qosLevel = MqttQualityOfServiceLevel.AtMostOnce, bool retain = false){if (!IsConnected){Log("无法发布消息 - 未连接到服务器");return;}if (string.IsNullOrEmpty(topic)){Log("主题不能为空");return;}try{var message = new MqttApplicationMessageBuilder().WithTopic(topic).WithPayload(payload).WithQualityOfServiceLevel(qosLevel).WithRetainFlag(retain).Build();await _mqttClient.EnqueueAsync(message);Log($"已发布消息到主题: {topic} - 内容: {payload}");}catch (Exception ex){Log($"发布消息失败: {ex.Message}");}}/// <summary>/// 订阅主题/// </summary>/// <param name="topic">要订阅的主题</param>/// <param name="qosLevel">服务质量等级</param>public async Task SubscribeAsync(string topic, MqttQualityOfServiceLevel qosLevel = MqttQualityOfServiceLevel.AtMostOnce){if (!IsConnected){Log("无法订阅主题 - 未连接到服务器");return;}if (string.IsNullOrEmpty(topic)){Log("订阅的主题不能为空");return;}try{var topicFilter = new MqttTopicFilterBuilder().WithTopic(topic).WithQualityOfServiceLevel(qosLevel).Build();// 将单个主题过滤器包装在列表中await _mqttClient.SubscribeAsync(new List<MqttTopicFilter> { topicFilter });Log($"已订阅主题: {topic}");}catch (Exception ex){Log($"订阅主题失败: {ex.Message}");}}/// <summary>/// 取消订阅主题/// </summary>/// <param name="topic">要取消订阅的主题</param>public async Task UnsubscribeAsync(string topic){if (!IsConnected){Log("无法取消订阅 - 未连接到服务器");return;}if (string.IsNullOrEmpty(topic)){Log("取消订阅的主题不能为空");return;}try{await _mqttClient.UnsubscribeAsync(topic);Log($"已取消订阅主题: {topic}");}catch (Exception ex){Log($"取消订阅失败: {ex.Message}");}}/// <summary>/// 订阅多个主题/// </summary>public async Task SubscribeMultipleAsync(Dictionary<string, MqttQualityOfServiceLevel> topicsWithQos){if (!IsConnected){Log("无法订阅主题 - 未连接到服务器");return;}if (topicsWithQos == null || topicsWithQos.Count == 0){Log("订阅的主题列表不能为空");return;}try{var topicFilters = new List<MqttTopicFilter>();foreach (var topic in topicsWithQos){var topicFilter = new MqttTopicFilterBuilder().WithTopic(topic.Key).WithQualityOfServiceLevel(topic.Value).Build();topicFilters.Add(topicFilter);}await _mqttClient.SubscribeAsync(topicFilters);Log($"已订阅 {topicFilters.Count} 个主题");}catch (Exception ex){Log($"批量订阅主题失败: {ex.Message}");}}/// <summary>/// 取消多个订阅/// </summary>/// <param name="topics"></param>/// <returns></returns>public async Task UnsubscribeMultipleAsync(IEnumerable<string> topics){if (!IsConnected){Log("无法取消订阅 - 未连接到服务器");return;}if (topics == null){Log("取消订阅的主题列表不能为null");return;}try{var topicList = topics.ToList();if (topicList.Count == 0){Log("取消订阅的主题列表不能为空");return;}await _mqttClient.UnsubscribeAsync(topicList);Log($"已取消订阅 {topicList.Count} 个主题");}catch (Exception ex){Log($"批量取消订阅失败: {ex.Message}");}}/// <summary>/// 处理接收到的消息队列(在主线程中执行)/// </summary>private void ProcessMessageQueue(){while (_messageQueue.Count > 0){var messageArgs = _messageQueue.Dequeue();string topic = messageArgs.ApplicationMessage.Topic;string payload = Encoding.UTF8.GetString(messageArgs.ApplicationMessage.PayloadSegment.ToArray());Log($"收到消息 - 主题: {topic}, 内容: {payload}");// 触发消息接收事件OnMessageReceived?.Invoke(topic, payload);}}/// <summary>/// 连接成功事件/// </summary>private Task OnConnectedAsync(MqttClientConnectedEventArgs args){Log("已连接到MQTT服务器");OnConnected?.Invoke();return Task.CompletedTask;}/// <summary>/// 断开连接事件/// </summary>private Task OnDisconnectedAsync(MqttClientDisconnectedEventArgs args){Log("已断开连接");OnDisconnected?.Invoke();return Task.CompletedTask;}/// <summary>/// 接收消息事件(在后台线程)/// </summary>private Task OnMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs args){// 将消息加入队列,在主线程处理lock (_messageQueue){_messageQueue.Enqueue(args);}return Task.CompletedTask;}/// <summary>/// 连接失败事件/// </summary>private Task OnConnectingFailedAsync(ConnectingFailedEventArgs args){Log($"连接失败: {args.Exception.Message},将在 {reconnectInterval} 秒后重试");return Task.CompletedTask;}/// <summary>/// 日志输出/// </summary>private void Log(string message){Debug.Log($"[MQTTClient] {message}");OnLog?.Invoke(message);}
}
使用案例
using UnityEngine;
using MQTTnet.Protocol;
using System;public class MQTTTest : MonoBehaviour
{public MQTTClient mqttClient;public string testTopic = "sensor/room1/temperature";private void Start(){if (mqttClient != null){mqttClient.OnConnected += OnMqttConnected;mqttClient.OnDisconnected += OnMqttDisconnected;mqttClient.OnLog += OnMqttLog;mqttClient.OnMessageReceived += OnMqttMessageReceived;}else{Debug.LogError("未找到MQTTClient组件");}}[ContextMenu("connect")]// 连接public async void ConnectToBroker(){if (mqttClient != null && !mqttClient.IsConnected){await mqttClient.ConnectAsync();}}[ContextMenu("disconnect")]// 断开连接public async void DisconnectFromBroker(){if (mqttClient != null && mqttClient.IsConnected){await mqttClient.DisconnectAsync();}}[ContextMenu("subscribe")]// 订阅主题public async void SubscribeToTestTopic(){if (mqttClient != null && mqttClient.IsConnected){await mqttClient.SubscribeAsync(testTopic, MqttQualityOfServiceLevel.AtLeastOnce);}else{Debug.LogWarning("请先连接到MQTT服务器");}}[ContextMenu("publish")]// 发布消息public async void PublishTestMessage(){if (mqttClient != null && mqttClient.IsConnected){string message = $"Unity测试消息: {DateTime.Now:yyyy-MM-dd HH:mm:ss}";await mqttClient.PublishAsync(testTopic,message,MqttQualityOfServiceLevel.AtLeastOnce,false);}else{Debug.LogWarning("请先连接到MQTT服务器");}}[ContextMenu("unsubscribe")]// 取消订阅public async void UnsubscribeFromTestTopic(){if (mqttClient != null && mqttClient.IsConnected){await mqttClient.UnsubscribeAsync(testTopic);}else{Debug.LogWarning("请先连接到MQTT服务器");}}// MQTT连接成功回调private void OnMqttConnected(){Debug.Log("MQTT连接成功,准备订阅主题...");// 连接成功后可以自动订阅必要的主题//SubscribeToTestTopic();}// MQTT断开连接回调private void OnMqttDisconnected(){Debug.Log("MQTT已断开连接");}// MQTT日志回调private void OnMqttLog(string message){// 在这里处理日志}// 收到MQTT消息回调private void OnMqttMessageReceived(string topic, string payload){Debug.Log($"收到来自主题 {topic} 的消息: {payload}");// TODO:在这里处理业务逻辑}private void OnDestroy(){// 移除事件监听,避免内存泄漏if (mqttClient != null){mqttClient.OnConnected -= OnMqttConnected;mqttClient.OnDisconnected -= OnMqttDisconnected;mqttClient.OnLog -= OnMqttLog;mqttClient.OnMessageReceived -= OnMqttMessageReceived;}}
}