Unity MQTT通讯

首先明确概念,什么是MQTT?

MQTT是一种轻量级、基于发布 / 订阅(Publish/Subscribe)模式的物联网(IoT)通信协议,在带宽有限、网络不稳定的环境下,实现低功耗、低延迟的设备间通信,是物联网领域的核心通信协议之一(被 OASIS 标准化,广泛支持于 AWS IoT、Azure IoT、阿里云 IoT 等平台)。

核心设计目标

MQTT 的所有特性均围绕 “适配物联网场景” 展开,核心目标可概括为 4 点:

轻量级
协议头部极小(固定头部仅 2 字节,可变头部和负载按需添加),设备侧客户端代码体积可压缩至几十 KB,适配单片机、传感器等资源受限设备(如 Arduino、ESP8266)。
低带宽 / 低功耗
减少数据传输量(无冗余字段),支持 “休眠 - 唤醒” 模式,适合 2G/4G/NB-IoT 等窄带或按流量计费的网络。
高可靠性
通过 “服务质量(QoS)” 机制保障消息不丢失、不重复,应对物联网常见的 “网络波动、设备离线” 场景。
解耦通信
基于 “发布 / 订阅” 模式,设备(发布者)和平台(订阅者)无需直接交互,甚至无需知道对方的 IP / 端口,降低系统耦合度。

核心架构:发布 / 订阅模式

MQTT 不采用 “点对点(P2P)” 直接通信,而是通过中间代理(Broker) 实现消息转发,架构包含 3 个核心角色

角色功能典型示例
发布者(Publisher)发送消息的设备 / 应用,无需知道谁会接收消息,仅需指定消息的 “主题(Topic)”。温湿度传感器、智能门锁
订阅者(Subscriber)接收消息的设备 / 应用,需提前 “订阅” 感兴趣的 “主题(Topic)”,仅接收对应消息。物联网平台、手机 APP、PLC
代理(Broker)核心中间件,负责接收发布者的消息,根据 “主题” 匹配订阅者,并将消息转发给订阅者;同时管理设备连接、会话、QoS 等逻辑。EMQX、Mosquitto、AWS IoT Core

关键概念:主题(Topic)

“主题” 是 MQTT 消息的 “分类标签”,类似文件系统的路径(用/分隔层级),用于实现消息的精准路由。例如:

传感器主题:sensor/room1/temperature(1 号房间温度传感器)、sensor/room2/humidity(2 号房间湿度传感器);
设备控制主题:device/light/room1/control(控制 1 号房间灯光)。

这里借助的MQTT的插件是MQTTnet+拓展MQTTnet.Extensions.ManagedClient,这个拓展提供了一个托管的 MQTT 客户端实现,简化了 MQTT 客户端的使用并增强了可靠性。它自动处理连接管理、重连逻辑、消息队列等功能,非常适合需要稳定 MQTT 通信的应用程序。
https://download.csdn.net/download/weixin_44347839/91777855

using UnityEngine;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Extensions.ManagedClient;
using System;
using System.Text;
using System.Threading.Tasks;
using System.Collections.Generic;
using MQTTnet.Protocol;
using MQTTnet.Packets;
using System.Linq;
using MyFrameworkPro;
public class MQTTClient : MonoSingleton<MQTTClient>
{[Header("MQTT Broker Settings")][Tooltip("MQTT服务器地址")]public string brokerAddress = "test.mosquitto.org";[Tooltip("MQTT服务器端口")]public int brokerPort = 1883;[Tooltip("客户端ID,为空则自动生成")]public string clientId = "";[Tooltip("用户名,无则留空")]public string username = "";[Tooltip("密码,无则留空")]public string password = "";[Header("Connection Settings")][Tooltip("是否使用TLS加密连接")]public bool useTls = false;[Tooltip("自动重连间隔(秒)")]public int reconnectInterval = 5;[Tooltip("心跳包间隔(秒)")]private int KeepAlive = 60;// 管理型MQTT客户端private IManagedMqttClient _mqttClient;// 连接状态public bool IsConnected => _mqttClient?.IsConnected ?? false;// 消息队列,用于在主线程处理接收到的消息private readonly Queue<MqttApplicationMessageReceivedEventArgs> _messageQueue =new Queue<MqttApplicationMessageReceivedEventArgs>();// 事件定义public event Action OnConnected;public event Action OnDisconnected;public event Action<string> OnLog;public event Action<string, string> OnMessageReceived; // 主题, 消息内容private void Awake(){// 确保场景中只存在一个MQTT客户端实例if (FindObjectsOfType<MQTTClient>().Length > 1){Destroy(gameObject);}else{DontDestroyOnLoad(gameObject);}}private void Start(){// 初始化客户端InitializeClient();}private void Update(){// 在主线程处理消息队列ProcessMessageQueue();}protected override void OnDestroy(){base.OnDestroy();// 断开连接并清理资源DisconnectAsync().ConfigureAwait(false);_mqttClient?.Dispose();}/// <summary>/// 初始化MQTT客户端/// </summary>public void InitializeClient(){if (_mqttClient != null){_mqttClient.Dispose();}clientId = string.IsNullOrEmpty(clientId) ? Guid.NewGuid().ToString() : clientId;//配置连接参数Options = new MqttClientOptionsBuilder().WithTcpServer(brokerAddress, brokerPort).WithClientId(clientId).WithCredentials(username, password).WithCleanSession().WithKeepAlivePeriod(TimeSpan.FromSeconds(KeepAlive)).Build();//配置重连机制ManagedOptions = new ManagedMqttClientOptionsBuilder().WithAutoReconnectDelay(TimeSpan.FromSeconds(5)).WithClientOptions(Options).Build();// 创建管理型客户端_mqttClient = new MqttFactory().CreateManagedMqttClient();// 注册事件_mqttClient.ConnectedAsync += OnConnectedAsync;_mqttClient.DisconnectedAsync += OnDisconnectedAsync;_mqttClient.ApplicationMessageReceivedAsync += OnMessageReceivedAsync;_mqttClient.ConnectingFailedAsync += OnConnectingFailedAsync;}private MqttClientOptions Options;private ManagedMqttClientOptions ManagedOptions;/// <summary>/// 连接到MQTT服务器/// </summary>public async Task ConnectAsync(){if (_mqttClient == null){InitializeClient();}if (!_mqttClient.IsConnected){Log("连接到MQTT服务器: " + brokerAddress + ":" + brokerPort);await _mqttClient.StartAsync(ManagedOptions);}else{Log("已经连接到MQTT服务器");}}/// <summary>/// 从MQTT服务器断开连接/// </summary>public async Task DisconnectAsync(){if (_mqttClient != null && _mqttClient.IsConnected){Log("正在断开与MQTT服务器的连接");await _mqttClient.StopAsync();}}/// <summary>/// 发布消息到指定主题/// </summary>/// <param name="topic">主题</param>/// <param name="payload">消息内容</param>/// <param name="qosLevel">服务质量等级</param>/// <param name="retain">是否保留消息</param>public async Task PublishAsync(string topic, string payload, MqttQualityOfServiceLevel qosLevel = MqttQualityOfServiceLevel.AtMostOnce, bool retain = false){if (!IsConnected){Log("无法发布消息 - 未连接到服务器");return;}if (string.IsNullOrEmpty(topic)){Log("主题不能为空");return;}try{var message = new MqttApplicationMessageBuilder().WithTopic(topic).WithPayload(payload).WithQualityOfServiceLevel(qosLevel).WithRetainFlag(retain).Build();await _mqttClient.EnqueueAsync(message);Log($"已发布消息到主题: {topic} - 内容: {payload}");}catch (Exception ex){Log($"发布消息失败: {ex.Message}");}}/// <summary>/// 订阅主题/// </summary>/// <param name="topic">要订阅的主题</param>/// <param name="qosLevel">服务质量等级</param>public async Task SubscribeAsync(string topic, MqttQualityOfServiceLevel qosLevel = MqttQualityOfServiceLevel.AtMostOnce){if (!IsConnected){Log("无法订阅主题 - 未连接到服务器");return;}if (string.IsNullOrEmpty(topic)){Log("订阅的主题不能为空");return;}try{var topicFilter = new MqttTopicFilterBuilder().WithTopic(topic).WithQualityOfServiceLevel(qosLevel).Build();// 将单个主题过滤器包装在列表中await _mqttClient.SubscribeAsync(new List<MqttTopicFilter> { topicFilter });Log($"已订阅主题: {topic}");}catch (Exception ex){Log($"订阅主题失败: {ex.Message}");}}/// <summary>/// 取消订阅主题/// </summary>/// <param name="topic">要取消订阅的主题</param>public async Task UnsubscribeAsync(string topic){if (!IsConnected){Log("无法取消订阅 - 未连接到服务器");return;}if (string.IsNullOrEmpty(topic)){Log("取消订阅的主题不能为空");return;}try{await _mqttClient.UnsubscribeAsync(topic);Log($"已取消订阅主题: {topic}");}catch (Exception ex){Log($"取消订阅失败: {ex.Message}");}}/// <summary>/// 订阅多个主题/// </summary>public async Task SubscribeMultipleAsync(Dictionary<string, MqttQualityOfServiceLevel> topicsWithQos){if (!IsConnected){Log("无法订阅主题 - 未连接到服务器");return;}if (topicsWithQos == null || topicsWithQos.Count == 0){Log("订阅的主题列表不能为空");return;}try{var topicFilters = new List<MqttTopicFilter>();foreach (var topic in topicsWithQos){var topicFilter = new MqttTopicFilterBuilder().WithTopic(topic.Key).WithQualityOfServiceLevel(topic.Value).Build();topicFilters.Add(topicFilter);}await _mqttClient.SubscribeAsync(topicFilters);Log($"已订阅 {topicFilters.Count} 个主题");}catch (Exception ex){Log($"批量订阅主题失败: {ex.Message}");}}/// <summary>/// 取消多个订阅/// </summary>/// <param name="topics"></param>/// <returns></returns>public async Task UnsubscribeMultipleAsync(IEnumerable<string> topics){if (!IsConnected){Log("无法取消订阅 - 未连接到服务器");return;}if (topics == null){Log("取消订阅的主题列表不能为null");return;}try{var topicList = topics.ToList();if (topicList.Count == 0){Log("取消订阅的主题列表不能为空");return;}await _mqttClient.UnsubscribeAsync(topicList);Log($"已取消订阅 {topicList.Count} 个主题");}catch (Exception ex){Log($"批量取消订阅失败: {ex.Message}");}}/// <summary>/// 处理接收到的消息队列(在主线程中执行)/// </summary>private void ProcessMessageQueue(){while (_messageQueue.Count > 0){var messageArgs = _messageQueue.Dequeue();string topic = messageArgs.ApplicationMessage.Topic;string payload = Encoding.UTF8.GetString(messageArgs.ApplicationMessage.PayloadSegment.ToArray());Log($"收到消息 - 主题: {topic}, 内容: {payload}");// 触发消息接收事件OnMessageReceived?.Invoke(topic, payload);}}/// <summary>/// 连接成功事件/// </summary>private Task OnConnectedAsync(MqttClientConnectedEventArgs args){Log("已连接到MQTT服务器");OnConnected?.Invoke();return Task.CompletedTask;}/// <summary>/// 断开连接事件/// </summary>private Task OnDisconnectedAsync(MqttClientDisconnectedEventArgs args){Log("已断开连接");OnDisconnected?.Invoke();return Task.CompletedTask;}/// <summary>/// 接收消息事件(在后台线程)/// </summary>private Task OnMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs args){// 将消息加入队列,在主线程处理lock (_messageQueue){_messageQueue.Enqueue(args);}return Task.CompletedTask;}/// <summary>/// 连接失败事件/// </summary>private Task OnConnectingFailedAsync(ConnectingFailedEventArgs args){Log($"连接失败: {args.Exception.Message},将在 {reconnectInterval} 秒后重试");return Task.CompletedTask;}/// <summary>/// 日志输出/// </summary>private void Log(string message){Debug.Log($"[MQTTClient] {message}");OnLog?.Invoke(message);}
}

使用案例

using UnityEngine;
using MQTTnet.Protocol;
using System;public class MQTTTest : MonoBehaviour
{public MQTTClient mqttClient;public string testTopic = "sensor/room1/temperature";private void Start(){if (mqttClient != null){mqttClient.OnConnected += OnMqttConnected;mqttClient.OnDisconnected += OnMqttDisconnected;mqttClient.OnLog += OnMqttLog;mqttClient.OnMessageReceived += OnMqttMessageReceived;}else{Debug.LogError("未找到MQTTClient组件");}}[ContextMenu("connect")]// 连接public async void ConnectToBroker(){if (mqttClient != null && !mqttClient.IsConnected){await mqttClient.ConnectAsync();}}[ContextMenu("disconnect")]// 断开连接public async void DisconnectFromBroker(){if (mqttClient != null && mqttClient.IsConnected){await mqttClient.DisconnectAsync();}}[ContextMenu("subscribe")]// 订阅主题public async void SubscribeToTestTopic(){if (mqttClient != null && mqttClient.IsConnected){await mqttClient.SubscribeAsync(testTopic, MqttQualityOfServiceLevel.AtLeastOnce);}else{Debug.LogWarning("请先连接到MQTT服务器");}}[ContextMenu("publish")]// 发布消息public async void PublishTestMessage(){if (mqttClient != null && mqttClient.IsConnected){string message = $"Unity测试消息: {DateTime.Now:yyyy-MM-dd HH:mm:ss}";await mqttClient.PublishAsync(testTopic,message,MqttQualityOfServiceLevel.AtLeastOnce,false);}else{Debug.LogWarning("请先连接到MQTT服务器");}}[ContextMenu("unsubscribe")]// 取消订阅public async void UnsubscribeFromTestTopic(){if (mqttClient != null && mqttClient.IsConnected){await mqttClient.UnsubscribeAsync(testTopic);}else{Debug.LogWarning("请先连接到MQTT服务器");}}// MQTT连接成功回调private void OnMqttConnected(){Debug.Log("MQTT连接成功,准备订阅主题...");// 连接成功后可以自动订阅必要的主题//SubscribeToTestTopic();}// MQTT断开连接回调private void OnMqttDisconnected(){Debug.Log("MQTT已断开连接");}// MQTT日志回调private void OnMqttLog(string message){// 在这里处理日志}// 收到MQTT消息回调private void OnMqttMessageReceived(string topic, string payload){Debug.Log($"收到来自主题 {topic} 的消息: {payload}");// TODO:在这里处理业务逻辑}private void OnDestroy(){// 移除事件监听,避免内存泄漏if (mqttClient != null){mqttClient.OnConnected -= OnMqttConnected;mqttClient.OnDisconnected -= OnMqttDisconnected;mqttClient.OnLog -= OnMqttLog;mqttClient.OnMessageReceived -= OnMqttMessageReceived;}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/diannao/97069.shtml
繁体地址,请注明出处:http://hk.pswp.cn/diannao/97069.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

JavaSE:类和对象2

一、封装封装的概念面向对象程序三大特性&#xff1a;封装、继承、多态。而类和对象阶段&#xff0c;主要研究的就是封装特性。何为封装呢&#xff1f;简单来说 就是套壳屏蔽细节。例如手机&#xff0c;你看不到任何的内部实现细节&#xff0c;只留下一些公开的接口给你使用&am…

RandAR训练自己的数据集

论文题目:RandAR: Decoder-only Autoregressive Visual Generation in Random Orders(随机顺序下仅解码器的自回归视觉生成) 会议:CVPR2025 摘要:我们介绍了RandAR,一种仅解码器的视觉自回归(AR)模型,能够以任意令牌顺序生成图像。与之前依赖于预定义生成顺序的纯解码器…

基于PHP服装租赁管理系统/基于php的服装管理系统的设计与实现

基于PHP服装租赁管理系统/基于php的服装管理系统的设计与实现

高并发内存池(12)-ThreadCache回收内存

高并发内存池&#xff08;12&#xff09;-ThreadCache回收内存 代码如下&#xff1a; // 释放对象时&#xff0c;链表过长时&#xff0c;回收内存回到中心缓存 void ThreadCache::ListTooLong(FreeList& list, size_t size) {void* start nullptr;void* end nullptr;list…

读大语言模型09超级智能

1. 超级智能1.1. 如果人工智能超越人类智能&#xff0c;可能会成为人类存在的一个重大威胁1.1.1. 对超级人工智能潜在危险最为担忧的群体中&#xff0c;恰恰包括那些否认大语言模型具备真正智能的人1.2. 计算机科学已经成为所有科学领域中不可或缺的重要组成部1.3. GPT具备编写…

阿里云拉取dockers镜像

假如你已经在云服务器上安装了docker需要配置下docker镜像加速代理就行了找到自己的加速网址&#xff1a;然后在云服务器上&#xff0c;修改docker 配置文件&#xff0c;vi /etc/docker/daemon.json没有这个文件的话&#xff0c;需要创建一个。{"default-address-pools&qu…

python自学笔记14 NumPy 线性代数

在Numpy库中有专门的linalg 模块用来做线性代数相关的运算。 本文中线性代数的一般概念不会解释 拆解矩阵 鸢尾花数据矩阵结构如下&#xff08;150 4&#xff09;&#xff1a;取其中的行向量和列向量&#xff1a; # 导入包 import numpy as np from sklearn.datasets import l…

ubuntu20搭建MQTT

sudo apt update sudo apt install mosquitto mosquitto-clients sudo mosquitto_passwd -c /etc/mosquitto/passwd myuser sudo nano /etc/mosquitto/mosquitto.conf# 允许匿名用户连接&#xff08;默认为 true&#xff0c;我们先关闭它&#xff09; allow_anonymous false# 指…

云服务器的主要用途都有哪些?

企业可以利用云服务器构建官方网站&#xff0c;企业官网需要稳定的运行环境来展示产品、服务、公司动态等信息&#xff0c;云服务器提供的高可用性和可扩展性&#xff0c;能保障大量用户同时访问时网站的稳定运行。移动应用的后端服务可以部署在云服务器上&#xff0c;如社交类…

IntelliJ IDEA Debug 模式功能指南

文章目录前言&#x1f4a1; 1. 断点类型与设置&#x1f680; 2. 启动 Debug 模式⚙️ 3. 调试控制按钮详解&#x1f440; 4. 查看与监控变量&#x1f9f0; 5. 高级调试技巧&#x1f48e; 总结前言 作为一名 Java 开发者&#xff0c;熟练掌握调试技巧是提高开发效率的关键。Int…

在pycharmIDE中如何快速掌握一个新模块的使用方法

一、文档使用悬停文档&#xff1a;鼠标悬停在模块/函数上显示文档摘要 (⭐最常用)快速文档&#xff1a;选中标识符按 CtrlQ (Windows/Linux) 或 F1 (Mac)跳转定义&#xff1a;Ctrl左键单击 直接跳转到源码定义处 (⭐最权威)参数提示&#xff1a;输入函数名时自动显示参数列表&a…

win11自定义停止更新方法

一、打开运行窗口&#xff08;winr&#xff09;输入regedit打开注册表编辑器。按照如下路径寻找。计算机\HKEY_LOCAL_MACHINE\SOFTWARE\Microsoft\WindowsUpdate\UX\Settings二、在Settings页面下右击——>新建——>DWORD(32位)值(D)&#xff0c;并重命名为粉色框中的名字…

Unity委托、匿名方法与事件深度解析:从理论到实战

Unity委托、匿名方法与事件深度解析&#xff1a;从理论到实战 摘要&#xff1a;本文深入剖析Unity中委托、匿名方法与事件的核心机制&#xff0c;结合理论框架与实战案例&#xff0c;帮助开发者掌握高效的事件驱动编程技巧。全文包含12个代码片段及6个核心原理图示框架&#x…

大脑的藏宝图——神经科学如何为自然语言处理(NLP)的深度语义理解绘制新航线

摘要&#xff1a; 截至2025年&#xff0c;大型语言模型&#xff08;LLM&#xff09;已展现出惊人的能力&#xff0c;但其内在的“黑箱”特性和对深层语义理解的局限性也日益凸显。本报告旨在深入探讨一个充满潜力的前沿交叉领域&#xff1a;借鉴地球上最古老、最精密的语言处理…

记录使用ruoyi-flowable开发部署中出现的问题以及解决方法(二)

1.vform的使用与传值 使用动态表单&#xff0c;把当前的用户名传值进动态表单&#xff0c;另外动态表单的上传组件成功后传值会父组件。 在父组件的加载函数中增加&#xff1a; mounted(){this.$refs.vFormRef.addEC("getuploadfile",this);},该方法为给表单加载外…

Apifox 8 月更新|新增测试用例、支持自定义请求示例代码、提升导入/导出 OpenAPI/Swagger 数据的兼容性

Apifox 作为全能 API 工具&#xff0c;正以迅猛之势革新开发者的工作方式&#xff01;想象一下&#xff0c;您正为测试用例编写头疼&#xff0c;或因 OpenAPI 文件导入失败而延误项目&#xff0c;而 Apifox 8 月更新却带来“救命稻草”&#xff1a;新增测试用例功能、自定义请求…

多机多卡微调流程

多机多卡&#xff08;Distributed Training&#xff09;微调大模型是一项复杂但非常高效的任务。它允许你利用多台机器的计算资源来训练一个模型&#xff0c;从而显著缩短训练时间。 多机多卡微调核心流程 整个流程可以概括为以下几个核心步骤&#xff1a; 环境准备与硬件配置 …

Redis(23) RDB和AOF有什么区别?

Redis 的 RDB&#xff08;Redis Database&#xff09;和 AOF&#xff08;Append-Only File&#xff09;是两种主要的持久化机制。每种机制都有其独特的工作方式、优缺点和适用场景。以下是两者的详细比较&#xff0c;并结合代码示例进行解释。 RDB&#xff08;Redis Database&a…

在WSL2 Ubuntu中部署FastDFS服务的完整指南

在WSL2 Ubuntu中部署FastDFS服务的完整指南&#x1f4d6; 前言&#x1f6e0;️ 环境准备1. 系统要求2. Ubuntu应用&#x1f680; 安装服务1. 更新系统2. 安装编译依赖3. 下载源码4. 编译安装&#x1f527; 配置服务1. 设置配置文件2. 创建数据目录3. 配置Tracker服务4. 配置Sto…

新手向:网络编程完全指南

1. 引言&#xff1a;什么是网络编程&#xff1f;网络编程&#xff08;Network Programming&#xff09;是指利用计算机网络实现程序间通信的技术。它构建在计算机网络协议基础上&#xff0c;通过编程实现不同设备间的数据交换与资源共享。从底层协议实现到高层应用开发&#xf…