MQTTServer服务器根据MQTTClient客户端已订阅的主题推送 分发消息

网络读卡器介绍:https://item.taobao.com/item.htm?ft=t&id=22173428704&spm=a21dvs.23580594.0.0.52de2c1bgK3bgZ

本示例使用了MQTTNet插件

C# MQTTNETServer 源码

using MQTTnet.Client.Receiving;
using MQTTnet.Server;
using MQTTnet;
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Drawing;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Windows.Forms;
using MQTTnet.Protocol;namespace MQTTNETServerForms
{public partial class Form1 : Form{private MqttServerOptionsBuilder optionBuilder;private IMqttServer server;//mqtt服务器对象List<TopicItem> Topics = new List<TopicItem>();public Form1(){InitializeComponent();}private void Form1_Load(object sender, EventArgs e){//创建服务器对象server = new MqttFactory().CreateMqttServer();server.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(new Action<MqttApplicationMessageReceivedEventArgs>(Server_ApplicationMessageReceived));//绑定消息接收事件server.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate(new Action<MqttServerClientConnectedEventArgs>(Server_ClientConnected));//绑定客户端连接事件server.ClientDisconnectedHandler = new MqttServerClientDisconnectedHandlerDelegate(new Action<MqttServerClientDisconnectedEventArgs>(Server_ClientDisconnected));//绑定客户端断开事件server.ClientSubscribedTopicHandler = new MqttServerClientSubscribedHandlerDelegate(new Action<MqttServerClientSubscribedTopicEventArgs>(Server_ClientSubscribedTopic));//绑定客户端订阅主题事件server.ClientUnsubscribedTopicHandler = new MqttServerClientUnsubscribedTopicHandlerDelegate(new Action<MqttServerClientUnsubscribedTopicEventArgs>(Server_ClientUnsubscribedTopic));//绑定客户端退订主题事件server.StartedHandler = new MqttServerStartedHandlerDelegate(new Action<EventArgs>(Server_Started));//绑定服务端启动事件server.StoppedHandler = new MqttServerStoppedHandlerDelegate(new Action<EventArgs>(Server_Stopped));//绑定服务端停止事件}/// 绑定消息接收事件private void Server_ApplicationMessageReceived(MqttApplicationMessageReceivedEventArgs e){string msg = e.ApplicationMessage.ConvertPayloadToString();WriteLog(">>> 收到消息:" + msg + ",QoS =" + e.ApplicationMessage.QualityOfServiceLevel + ",客户端=" + e.ClientId + ",主题:" + e.ApplicationMessage.Topic);}/// 绑定客户端连接事件private void Server_ClientConnected(MqttServerClientConnectedEventArgs e){Task.Run(new Action(() =>{lbClients.BeginInvoke(new Action(() =>{lbClients.Items.Add(e.ClientId);}));}));WriteLog(">>> 客户端" + e.ClientId + "连接");}/// 绑定客户端断开事件private void Server_ClientDisconnected(MqttServerClientDisconnectedEventArgs e){Task.Run(new Action(() =>{lbClients.BeginInvoke(new Action(() =>{lbClients.Items.Remove(e.ClientId);}));}));WriteLog(">>> 客户端" + e.ClientId + "断开");}/// 绑定客户端订阅主题事件private void Server_ClientSubscribedTopic(MqttServerClientSubscribedTopicEventArgs e){Task.Run(new Action(() =>{var topic = Topics.FirstOrDefault(t => t.Topic == e.TopicFilter.Topic);if (topic == null){topic = new TopicItem { Topic = e.TopicFilter.Topic, Count = 0 };Topics.Add(topic);}if (!topic.Clients.Exists(c => c == e.ClientId)){topic.Clients.Add(e.ClientId);topic.Count++;}lvTopic.Invoke(new Action(() =>{this.lvTopic.Items.Clear();}));foreach (var item in this.Topics){lvTopic.Invoke(new Action(() =>{this.lvTopic.Items.Add($"{item.Topic}:{item.Count}");}));}}));WriteLog(">>> 客户端" + e.ClientId + "订阅主题" + e.TopicFilter.Topic);}/// 绑定客户端退订主题事件private void Server_ClientUnsubscribedTopic(MqttServerClientUnsubscribedTopicEventArgs e){Task.Run(new Action(() =>{var topic = Topics.FirstOrDefault(t => t.Topic == e.TopicFilter);if (topic != null){topic.Count--;topic.Clients.Remove(e.ClientId);}this.lvTopic.Items.Clear();foreach (var item in this.Topics){this.lvTopic.Items.Add($"{item.Topic}:{item.Count}");}}));WriteLog(">>> 客户端" + e.ClientId + "退订主题" + e.TopicFilter);}/// 绑定服务端启动事件private void Server_Started(EventArgs e){WriteLog(">>> 服务端已启动!");Invoke(new Action(() => {this.button1.Text = "停止服务";}));}/// 绑定服务端停止事件private void Server_Stopped(EventArgs e){WriteLog(">>> 服务端已停止!");Invoke(new Action(() => {this.button1.Text = "启动MQTT服务";}));}/// 显示日志public void WriteLog(string message){if (txtMsg.InvokeRequired){txtMsg.Invoke(new Action(() =>{txtMsg.Text = "";txtMsg.Text = (message + "\r");}));}else{txtMsg.Text = "";txtMsg.Text = (message + "\r");}}[Obsolete]private async void button1_Click(object sender, EventArgs e){if (button1.Text == "启动MQTT服务")   /// 启动服务{optionBuilder = new MqttServerOptionsBuilder().WithDefaultEndpointBoundIPAddress(System.Net.IPAddress.Parse(this.txtIP.Text)).WithDefaultEndpointPort(int.Parse(this.txtPort.Text)).WithDefaultCommunicationTimeout(TimeSpan.FromMilliseconds(5000)).WithConnectionValidator(t =>{string un = "", pwd = "";un = this.txtUname.Text;pwd = this.txtUpwd.Text;if (t.Username != un || t.Password != pwd){t.ReturnCode = MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword;}else{t.ReturnCode = MqttConnectReturnCode.ConnectionAccepted;}});var option = optionBuilder.Build();await server.StartAsync(option);}else{if (server != null)   //停止服务{                    server.StopAsync();}}}}
}

C# MQTTNETClient 源码

using MQTTnet.Client.Options;
using MQTTnet.Client;
using MQTTnet.Extensions.ManagedClient;
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Drawing;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Windows.Forms;
using MQTTnet;
using static System.Windows.Forms.Design.AxImporter;
using System.Net.Security;namespace MQTTNETClientForms
{public partial class Form1 : Form{private MqttFactory factory;private IManagedMqttClient mqttClient;//客户端mqtt对象private MqttClientOptionsBuilder mqttClientOptions;private ManagedMqttClientOptionsBuilder options;private bool connstate;public Form1(){InitializeComponent();}private void Form1_Load(object sender, EventArgs e){}/// 显示日志private void WriteLog(string message){if (txtMsg.InvokeRequired){txtMsg.Invoke(new Action(() =>{txtMsg.Text = (message);}));}else{txtMsg.Text = (message);}}/// 订阅[Obsolete]private async void btnSub_Click(object sender, EventArgs e){if (connstate == false){WriteLog(">>> 请先与MQTT服务器建立连接!");return;}if (string.IsNullOrWhiteSpace(this.txtTopic.Text)){WriteLog(">>> 请输入主题");return;}//在 MQTT 中有三种 QoS 级别: //At most once(0) 最多一次//At least once(1) 至少一次//Exactly once(2) 恰好一次//await mqttClient.SubscribeAsync(new TopicFilterBuilder().WithTopic(this.tbTopic.Text).WithAtMostOnceQoS().Build());//最多一次, QoS 级别0await mqttClient.SubscribeAsync(new TopicFilterBuilder().WithTopic(this.txtTopic.Text).WithAtLeastOnceQoS().Build());//恰好一次, QoS 级别1 WriteLog($">>> 成功订阅");}/// 发布private async void btnPub_Click(object sender, EventArgs e){if (connstate==false){WriteLog(">>> 请先与MQTT服务器建立连接!");return;}if (string.IsNullOrWhiteSpace(this.txtTopik.Text)){WriteLog(">>> 请输入主题");return;}var result = await mqttClient.PublishAsync(this.txtTopik.Text,this.txtContent.Text,MQTTnet.Protocol.MqttQualityOfServiceLevel.AtLeastOnce);//恰好一次, QoS 级别1   WriteLog($">>> 主题:{this.txtTopik.Text},消息:{this.txtContent.Text},结果: {result.ReasonCode}");}private async void button1_Click(object sender, EventArgs e){if (button1.Text == "连接到MQTT服务器"){connstate = false;factory = new MqttFactory();mqttClient = factory.CreateManagedMqttClient();//创建客户端对象//绑定断开事件mqttClient.UseDisconnectedHandler(async ee =>{                   WriteLog("与服务器之间的连接断开了,正在尝试重新连接");Invoke(new Action(() => {this.button1.Text = "连接到MQTT服务器";}));// 等待 5s 时间await Task.Delay(TimeSpan.FromSeconds(5));try{if (factory == null) { factory = new MqttFactory() ; }//创建客户端对象 if (mqttClient == null) { mqttClient = factory.CreateManagedMqttClient(); }//创建客户端对象 mqttClient.UseConnectedHandler(tt =>{connstate = true;WriteLog(">>> 连接到服务成功");Invoke(new Action(() => {this.button1.Text = "断开与MQTT服务器的连续";}));});}catch (Exception ex){connstate = false;WriteLog($"重新连接服务器失败:{ex}");Invoke(new Action(() => {this.button1.Text = "连接到MQTT服务器";}));}});//绑定接收事件mqttClient.UseApplicationMessageReceivedHandler(aa =>{try{string msg = aa.ApplicationMessage.ConvertPayloadToString();WriteLog(">>> 消息:" + msg + ",QoS =" + aa.ApplicationMessage.QualityOfServiceLevel + ",客户端=" + aa.ClientId + ",主题:" + aa.ApplicationMessage.Topic);}catch (Exception ex){WriteLog($"+ 消息 = " + ex.Message);}});//绑定连接事件mqttClient.UseConnectedHandler(ee =>{connstate =true;WriteLog(">>> 连接到服务成功");Invoke(new Action(() => {this.button1.Text = "断开与MQTT服务器的连续";}));});var mqttClientOptions = new MqttClientOptionsBuilder().WithClientId(this.txtId.Text).WithTcpServer(this.txtIP.Text, int.Parse(this.txtPort.Text)).WithCredentials(this.txtName.Text, this.txtUpwd.Text);var options = new ManagedMqttClientOptionsBuilder().WithAutoReconnectDelay(TimeSpan.FromSeconds(5)).WithClientOptions(mqttClientOptions.Build()).Build();//开启await mqttClient.StartAsync(options);                }else{if (mqttClient != null){if (mqttClient.IsStarted){await mqttClient.StopAsync();}mqttClient.Dispose();connstate = false;}button1.Text = "连接到MQTT服务器";}}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/bicheng/87290.shtml
繁体地址,请注明出处:http://hk.pswp.cn/bicheng/87290.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【seismic unix 合并两个su文件】

Seismic Unix简介 Seismic Unix&#xff08;SU&#xff09;是由科罗拉多矿业学院开发的开源地震数据处理软件包&#xff0c;基于Unix/Linux环境运行。它提供了一系列命令行工具&#xff0c;用于地震数据加载、处理、分析和可视化&#xff0c;支持SEG-Y格式和SU自定义格式。SU广…

【vmware虚拟机使用】安装vmware workstations17

安装vmware17 本章学习目标VMware虚拟机简介开始实操下载VMware workstation虚拟机安装虚拟机配置虚拟机网络 总结 本章学习目标 1.安装vmware workstation虚拟机 2.自定义配置虚拟机网络&#xff0c;避免网络冲突 VMware虚拟机简介 ​ VMware的核心是Hypervisor&#xff0…

QT6 源(147)模型视图架构里的表格窗体 QTableWidget 的范例代码举例,以及其条目 QTableWidgetItem 类型的源代码。

&#xff08;1&#xff09;先用一个简单的例子&#xff0c;学习一下本类里的成员函数的使用。生成如下图的界面&#xff0c;表格窗体与初始数据&#xff1a; 查看其 ui_widget . h 文件 &#xff0c;里面的将是最标准的表格窗体的使用代码 &#xff1a; #ifndef UI_WIDGET_H #…

URL时间戳参数深度解析:缓存破坏与前端优化的前世今生

&#x1f50d; URL时间戳参数深度解析&#xff1a;缓存破坏与前端优化的前世今生 在日常的Web开发中&#xff0c;你是否注意到很多接口URL后面都会带有一个时间戳参数&#xff1f;比如 xxx/getMsg?_1751413509056。这个看似简单的参数背后&#xff0c;却隐藏着前端缓存策略、性…

分布式锁实现方式:基于Redis的分布式锁实现(Spring Boot + Redis)

Redis实现分布式锁的原理 Redis分布式锁基于其单线程执行命令的特性&#xff0c;通过原子操作实现多节点间的互斥访问。下面从原理、实现、问题及优化四个方面详细解析&#xff1a; 1.原子性与互斥性 Redis分布式锁的核心是原子性操作&#xff1a; 获取锁&#xff1a;使用SE…

linux升级降级内核实验

✅实验环境 vmware workstation 17 centos7.9 下载链接&#xff1a; https://vault.centos.org/7.9.2009/isos/x86_64/ ubuntu24.04 下载链接&#xff1a; https://old-releases.ubuntu.com/releases/24.04/ ✅实验目的 为了解决日常环境部署中某些驱动软件依赖特定内…

华为云开始了“开发者空间 AI Agent 开发”活动

引言 今天在华为云App上偶然看到一个新活动&#xff1a;Developer Events_Developer Alliance-Huawei Cloud。这个活动要求开发者可结合自己的工作实践&#xff0c;须在华为开发者空间内完成应用构建&#xff0c;应用构建类型和主题为AI Agent应用开发。 AI Agent平台 华为开…

2025.6.26总结

今天和我做同一业务得同事进行了工作交接&#xff0c;主要给我讲了怎么去执行自动化。包括性能自动化&#xff0c;API自动化&#xff0c;UI自动化&#xff0c;除了UI自动化要写些代码&#xff0c;其他跑得话也就在工具上配个参数&#xff0c;就是个搬砖得活&#xff0c;没太大技…

ip网络基础

交换机工作原理&#xff1a; 自主学习mac地址并成mac地址表 根据mac地址表再进行单播、广播转发 主机通信原理&#xff08;局域网&#xff09;&#xff1a; 需要了解arp协议 拓扑图&#xff1a; 首先&#xff0c;我们观察icmp数据包&#xff0c;发现缺少目标mac地址&#…

AI大模型如何重塑软件开发流程?

文章目录 每日一句正能量前言一、AI大模型的定义与特点&#xff08;一&#xff09;定义&#xff08;二&#xff09;特点 二、AI大模型在软件开发中的应用场景&#xff08;一&#xff09;代码自动生成&#xff08;二&#xff09;智能测试&#xff08;三&#xff09;需求分析与设…

Kafka与RabbitMQ相比有什么优势?

大家好&#xff0c;我是锋哥。今天分享关于【Kafka与RabbitMQ相比有什么优势&#xff1f;】面试题。希望对大家有帮助&#xff1b; Kafka与RabbitMQ相比有什么优势&#xff1f; 超硬核AI学习资料&#xff0c;现在永久免费了&#xff01; Kafka与RabbitMQ在消息队列的设计和应…

LeetCode 2090. 半径为 k 的子数组平均值

题目链接 2090. 半径为 k 的子数组平均值 题目描述 给定一个下标从 0 开始的整数数组 nums 和整数 k&#xff0c;构建并返回一个长度为 n 的数组 avgs&#xff0c;其中 avgs[i] 表示以下标 i 为中心、半径为 k 的子数组的平均值。具体规则如下&#xff1a; 无效位置&#x…

深入理解C++11原子操作:从内存模型到无锁编程

文章目录 C并发编程的新纪元内存模型基础&#xff1a;可见性与有序性数据竞争的根源happens-before关系memory_order枚举详解1. memory_order_relaxed2. memory_order_acquire/memory_order_release3. memory_order_seq_cst 原子操作详解std::atomic模板核心原子操作1. 读取与存…

DQL-1-基础查询

基础查询 DQL-1-基础查询 基础查询DQL - 介绍DQL - 语法DQL - 基本查询案例 DQL - 介绍 SQL 英文全称是 Data Query Language, 数据查询语言, 用来查询数据库中表的记录 查询关键字: SELECT DQL - 语法 SELECT 字段列表FROM 表名列表WHERE条件列表GROUP BY分组字段列表HAVI…

Prompt 精通之路(七)- 你的终极 AI 宝典:Prompt 精通之路系列汇总

你的终极 AI 宝典&#xff1a;Prompt 精通之路系列汇总 标签&#xff1a; #Prompt指南 #AI学习资源 #速查手册 #ChatGPT #系列总结 &#x1f680; Prompt 精通之路&#xff1a;系列文章导航 第一篇&#xff1a;AI 时代的新语言&#xff1a;到底什么是 Prompt&#xff1f;为什么…

P27:RNN实现阿尔茨海默病诊断

&#x1f368; 本文为&#x1f517;365天深度学习训练营 中的学习记录博客&#x1f356; 原作者&#xff1a;K同学啊 一、过程解读 PyTorch 实战&#xff1a;阿尔茨海默病数据预测模型 今天&#xff0c;我将带大家一起探索一个基于 PyTorch 的深度学习小项目——利用 RNN 模…

HakcMyVM-Arroutada

信息搜集 主机发现 ┌──(kali㉿kali)-[~] └─$ nmap -sn 192.168.21.0/24 Starting Nmap 7.95 ( https://nmap.org ) at 2025-07-01 07:13 EDT Nmap scan report for 192.168.21.11 Host is up (0.00062s latency). MAC Address: 08:00:27:4E:CC:FB (PCS Systemtechnik/Or…

TEXT Submitting Solutions

前言 USACO 训练项目配备了一个自动评分系统&#xff0c;用于批改你的作业题目。你可以直接在题目页面提交你的程序&#xff1b;系统会对程序进行编译和评分&#xff0c;几秒钟内就能将结果反馈给你。 支持的语言有 C、C&#xff08;含 C11 和 C14&#xff09;、PASCAL、Pyth…

Reactor 瞬态错误

在响应式编程中&#xff0c;retryWhen 操作符通过 RetrySignal 接口提供了对重试行为的精细控制&#xff0c;特别是在处理 瞬态错误&#xff08;transient errors&#xff09; 时。瞬态错误是指那些在一段时间内发生&#xff0c;但随后会自行恢复的错误&#xff0c;例如网络请求…

基于 SpringBoot+Vue.js+ElementUI 的小型超市商品管理系统设计与实现7000字论文设计

摘要 本论文设计并实现了一个基于 SpringBoot、Vue.js 和 ElementUI 的小型超市商品管理系统。该系统旨在为小型超市提供一个高效、便捷的商品管理解决方案&#xff0c;实现商品信息的录入、查询、修改、删除等功能&#xff0c;同时支持库存管理、销售统计等业务需求。论文首先…