C# .NET Framework 中的高效 MQTT 消息传递

介绍:

在当今互联互通的世界里,设备之间高效可靠的通信至关重要。MQTT(消息队列遥测传输)就是为此而设计的轻量级消息传递协议。本文将探讨 MQTT 是什么、它的优势以及如何在 .NET 框架中设置和实现它。最后,您将对 MQTT 有一个清晰的了解,并通过一个实际示例来帮助您入门。

1.什么是MQTT?

定义和概述:MQTT 代表消息队列遥测传输 (Message Queuing Telemetry Transport)。它是一种轻量级的发布-订阅网络协议,用于在设备之间传输消息。对于需要较少代码占用空间或网络带宽有限的远程位置的连接,MQTT 非常有用。

MQTT的优点:

    • 低带宽使用率:旨在最大限度地减少网络带宽使用率,使其成为资源受限环境的理想选择。

    • 高效的消息传递:确保以不同的服务质量 (QoS) 级别可靠地传递消息。

    • 可扩展性:适用于小型到大型实施,无缝处理数千台设备。

    • 易于使用:易于实施并与各种平台集成。

与传统方法的比较:

    • 没有 MQTT:HTTP 等传统通信方法更繁重且效率更低,尤其是对于物联网应用而言。

    • 使用 MQTT:提供轻量、高效、可靠的消息传递机制,提高性能并减少延迟。

2. 在 .NET Framework 中设置 MQTT

分步指南:

    安装 MQTT 包:

        1、在 Visual Studio 中打开您的 .NET 项目。

        2、在解决方案资源管理器中右键单击您的项目并选择“管理 NuGet 包”。

        3、搜索并安装 M2Mqtt 包。

编写 MQTT 客户端代码:创建一个新类来处理 MQTT 客户端操作。初始化 MQTT 客户端,配置连接选项,并连接到代理。

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Text;
using System.Threading.Tasks;
using uPLibrary.Networking.M2Mqtt;
using uPLibrary.Networking.M2Mqtt.Exceptions;
using uPLibrary.Networking.M2Mqtt.Messages;

namespace MQTT
{
    public class MqttManager
    {
        #region Observer Pattern
        private HashSet<string> topics = new HashSet<string>();
        private HashSet<IMqttListener> listeners = new HashSet<IMqttListener>();
        private readonly ushort keepAlivePeriod = 30;

        private readonly string BrokerIp;
        private readonly int? BrokerPort = null;
        private string clientId;

        private string Username = null;
        private string Password = null;

        private MqttClient mqttClient;

        public MqttManager(string brokerIp, int? brokerPort = null, string username = null, string password = null)
        {
            BrokerIp = brokerIp;
            BrokerPort = brokerPort;
            Username = username;
            Password = password;

            clientId = RandomClientId();
            Debug.WriteLine($"mqttClient clientId: {clientId}");

            InitMQTT();
        }

        public void Register(string topic, IMqttListener listener)
        {
            if (!topics.Contains(topic))
            {
                Subscribe(topic);
            }

            listeners.Add(listener);
        }

        public void UnRegister(IMqttListener listener)
        {
            listeners.Remove(listener);
        }

        public void NotifyListeners(string topic, string message)
        {
            foreach (IMqttListener listener in listeners)
            {
                listener.OnMqttMessage(topic, message);
            }
        }

        #endregion

        #region Init Mqtt        

        async private void InitMQTT()
        {
            Debug.WriteLine($"mqttClient.Try connecting to brokerIp: {BrokerIp}");
            mqttClient = new MqttClient(BrokerIp);
            mqttClient.MqttMsgPublishReceived += (sender, e) =>
            {
                string topic = e.Topic;
                string message = Encoding.UTF8.GetString(e.Message);

                Debug.WriteLine($"mqttClient.MessageReceived. {topic} -> {message}");

                NotifyListeners(topic, message);
            };

            var connectToBroker = new Func<Task>(() =>
            {
                return Task.Run(() =>
                {
                    byte resultCode = mqttClient.Connect(clientId, Username, Password, true, keepAlivePeriod);

                    if (resultCode == 0)
                    {
                        foreach (var topic in topics)
                        {
                            Subscribe(topic);
                        }
                    }
                });
            });

            mqttClient.ConnectionClosed += async (s, e) =>
            {
                await Task.Delay(TimeSpan.FromSeconds(5));

                while (!mqttClient.IsConnected)
                {
                    try
                    {
                        await connectToBroker();
                    }
                    catch (Exception ex)
                    {
                        Console.WriteLine($"An error occurred: {ex.Message}");
                        await Task.Delay(TimeSpan.FromSeconds(5));
                    }
                }
            };

            while (!mqttClient.IsConnected)
            {
                try
                {
                    await connectToBroker();
                }
                catch (Exception ex)
                {
                    Console.WriteLine($"An error occurred: {ex.Message}");
                    await Task.Delay(TimeSpan.FromSeconds(5));
                }
            }

            Console.WriteLine("Connected to Broker Successfully!");
        }

        async private void Subscribe(string topic)
        {
            Debug.WriteLine($"mqttClient.TryToSubscribe -> {topic}");
            mqttClient.Subscribe(new string[] { topic }, new byte[] { MqttMsgBase.QOS_LEVEL_AT_MOST_ONCE });
            topics.Add(topic);
        }

        private string RandomClientId()
        {
            return $"Server-{Guid.NewGuid()}";
        }
    }

    public interface IMqttListener
    {
        void OnMqttMessage(string topic, string message);
    }
}

3.示例实现:处理消息

场景:实现 MQTT 客户端来订阅主题并处理传入的消息。

详细步骤:

    • 订阅主题:修改客户端代码以订阅特定主题。

    • 处理传入消息:使用事件处理程序来处理传入消息。

public class BoostMqtt : Singleton<BoostMqtt>, IMqttListener
{
    public readonly MqttManager MqttManager;
    private const string TOPIC = "example_topic";
    public static ConcurrentQueue<MqttMessage> MqttMessageQueue = new ConcurrentQueue<MqttMessage>();

    private BoostMqtt()
    {
        string brokerIp = GetBrokerIp();
        int? brokerPort = GetBrokerPort();
        (string username, string password) = GetUsernamePassword();

        MqttManager = new MqttManager(brokerIp, brokerPort, username, password);
        MqttManager.Register(TOPIC, this);

        StartProcessingMqttMessagesFromQueue();
    }

    ~BoostMqtt()
    {
        MqttManager?.UnRegister(this);
    }

    private string GetBrokerIp()
    {
        try
        {
            return ConfigurationManager.AppSettings["MqttBrokerIpAddress"];
        }
        catch (Exception e)
        {
            Logging.Instance.WriteErrorToLog(e);
            return string.Empty;
        }
    }

    private int? GetBrokerPort()
    {
        try
        {
            if (int.TryParse(ConfigurationManager.AppSettings["MqttBrokerPort"], out int port))
            {
                return port;
            }
        }
        catch (Exception e)
        {
            Logging.Instance.WriteErrorToLog(e);
        }
        return null;
    }

    private (string, string) GetUsernamePassword()
    {
        try
        {
            string username = ConfigurationManager.AppSettings["MqttUsername"];
            string password = ConfigurationManager.AppSettings["MqttPassword"];
            return (username, password);
        }
        catch (Exception e)
        {
            Logging.Instance.WriteErrorToLog(e);
            return (null, null);
        }
    }

    public void StartProcessingMqttMessagesFromQueue()
    {
        int numThreads = Environment.ProcessorCount;

        for (int i = 0; i < numThreads; i++)
        {
            var thread = new Thread(() =>
            {
                while (true)
                {
                    try
                    {
                        if (MqttMessageQueue.TryDequeue(out MqttMessage message))
                        {
                            ProcessMqttMessage(message.Topic, message.Message);
                        }
                    }
                    catch (Exception e)
                    {
                        Logging.Instance.WriteErrorToLog(e);
                    }
                    Thread.Sleep(1);
                }
            });
            thread.Start();
        }
    }

    public void ProcessMqttMessage(string topic, string original_message)
    {
        if (topic.Contains("example_topic"))
        {
            // Process the message here
        }
    }
}

4. 比较 HiveMQ、RabbitMQ、Kafka 和 MQTT

    • HiveMQ:针对物联网进行了优化,具有广泛的监控和管理功能,非常适合大规模工业和商业物联网部署。

    • RabbitMQ:提供多种协议和复杂路由功能的灵活性,适用于企业消息传递和微服务架构。

    • Kafka:擅长高吞吐量、实时数据处理,适用于大数据和事件流应用。

    • MQTT:轻量级且高效,适用于低带宽、高延迟环境,是物联网和移动应用的理想选择。

结论:

    本文介绍了 MQTT 的基础知识、优势以及如何在 .NET 框架中设置它。我们还演示了一个使用 MQTT 处理消息的实际示例。利用 MQTT 轻量级且高效的协议,您可以显著改善应用程序中设备之间的通信。

如果您喜欢此文章,请收藏、点赞、评论,谢谢,祝您快乐每一天。 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/pingmian/86930.shtml
繁体地址,请注明出处:http://hk.pswp.cn/pingmian/86930.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

nn.Embedding 和 word2vec 的区别

理解它们的关键在于​​区分概念层级和职责​​。 可以将它们类比为&#xff1a; ​​word2vec&#xff1a;​​ 一个​​专门制作高质量词向量模型的“工厂”​​。​​nn.Embedding&#xff1a;​​ 一个​​可存储、查找并训练词向量的“智能储物柜”​​&#xff08;作为…

华为云Flexus+DeepSeek征文|​​华为云ModelArts Studio大模型 + WPS:AI智能PPT生成解决方案​

引言&#xff1a;告别繁琐PPT制作&#xff0c;AI赋能高效办公 ​​ 在商业汇报、学术研究、产品发布等场景中&#xff0c;制作专业PPT往往需要耗费大量时间进行内容整理、逻辑梳理和视觉美化。​​华为云ModelArts Studio大模型​​与​​WPS​​深度结合&#xff0c;推出AI-P…

【连接redis超时】

报错 客户端输出缓冲区超限 Client … scheduled to be closed ASAP for overcoming of output buffer limits 表示这些客户端&#xff08;通过 psubscribe 命令进行发布订阅操作&#xff09;的输出缓冲区超过了 Redis 配置的限制&#xff0c;Redis 会关闭这些客户端连接来避免…

PHP「Not enough Memory」实战排错笔记

目录 PHP「Not enough Memory」实战排错笔记 1. 背景 2. 快速定位 3. 为什么 5 MB 的图片能耗尽 128 MB&#xff1f; 3.1 粗略估算公式&#xff08;GD&#xff09; 4. 实际峰值监控 5. 解决过程 6. 最佳实践与防御措施 7. 总结 PHP「Not enough Memory」实战排错笔记 —…

Java垃圾回收机制和三色标记算法

一、对象内存回收 对于对象回收&#xff0c;需要先判断垃圾对象&#xff0c;然后收集垃圾。 收集垃圾采用垃圾收集算法和垃圾收集器。 判断垃圾对象&#xff0c;通常采用可达性分析算法。 引用计数法 每个对象设置一个引用计数器。每被引用一次&#xff0c;计数器就加1&am…

基于python网络数据挖掘的二手房推荐系统

基于网络数据挖掘的二手房推荐系统设计与实现 【摘要】 随着互联网技术在房地产行业的深入应用&#xff0c;线上房源信息呈爆炸式增长&#xff0c;给购房者带来了信息过载的挑战。为了提升二手房筛选的效率与精准度&#xff0c;本文设计并实现了一个基于网络数据挖掘的二手房推…

Java + 阿里云 Gmsse 实现 SSL 国密通信

前言 解决接口或页面仅密信浏览器&#xff08;或 360 国密浏览器&#xff09;能访问的问题 测试页面 测试网站-中国银行&#xff1a;https://ebssec.boc.cn/boc15/help.html 使用其他浏览器&#xff08;google&#xff0c;edge等&#xff09;打开 使用密信浏览器打开 解决…

国产数据库分类总结

文章目录 一、华为系数据库1. 华为 GaussDB 二、阿里系数据库1. 阿里云 OceanBase2. PolarDB&#xff08;阿里云自研&#xff09; 三、腾讯系数据库1. TDSQL&#xff08;腾讯云&#xff09;2. TBase&#xff08;PostgreSQL增强版&#xff09; 四、传统国产数据库1. 达梦数据库&…

解密闭包:函数如何记住外部变量

&#x1f9e0; 什么是闭包&#xff1f; 闭包是一个函数对象&#xff0c;它不仅记住它的代码逻辑&#xff0c;还记住了定义它时的自由变量&#xff08;即非全局也非局部&#xff0c;但被内部函数引用的变量&#xff09;。即使外部函数已经执行完毕&#xff0c;这些自由变量的值…

I2C协议详解及STM32 HAL库硬件I2C卡死问题分析

一、I2C协议详解 1. I2C协议概述 Inter-Integrated Circuit (I2C) 是由 Philips 半导体&#xff08;现 NXP 半导体&#xff09;于 1980 年代设计的一种同步串行通信总线协议。该协议采用半双工通信模式&#xff0c;支持多主从架构&#xff0c;专为短距离、低速率的芯片间通信…

HTTP协议-后端接收请求

起因就是不知道post这个请求体中这些格式有什么区别&#xff0c;后端又怎么去接收这些不同格式的内容 Get请求 get请求是比较简单的一类 正常的直接用参数接收&#xff08;不写的话名字要匹配&#xff09;或者RequestParam都可以接收&#xff0c;用对象绑定也可以 resultful…

HTML5 实现的圣诞主题网站源码,使用了 HTML5 和 CSS3 技术,界面美观、节日氛围浓厚。

以下是一个 HTML5 实现的圣诞主题网站源码&#xff0c;使用了 HTML5 和 CSS3 技术&#xff0c;界面美观、节日氛围浓厚。它包括&#xff1a; 圣诞树动画 &#x1f384;雪花飘落特效 ❄️圣诞祝福语 &#x1f381;响应式布局&#xff0c;适配移动端 你可以将代码保存为 index.…

Spring Cloud Bus 和 Spring Cloud Stream

Spring Cloud Bus 和 Spring Cloud Stream 都是 Spring Cloud 生态中的消息通信组件&#xff0c;但它们的定位和使用场景有显著区别&#xff1a; 1. Spring Cloud Bus 核心定位&#xff1a;分布式系统的消息广播&#xff08;配置刷新、事件传播&#xff09;。 典型场景&#x…

磁悬浮轴承位移信号的高精度估计:卡尔曼滤波算法深度解析

无需位移传感器,滤波算法如何实现微米级精度? 磁悬浮轴承作为革命性的非接触式支承技术,凭借无磨损、无需润滑、高转速等优势,在飞轮储能、高速电机、人工心脏泵和航空航天领域获得了广泛应用。其核心控制依赖于对转子位移信号的高精度实时检测,传统电涡流传感器虽能提供位…

DAY 43 预训练模型

目录 一、预训练的概念 二、 经典的预训练模型 2.1 CNN架构预训练模型 2.2 Transformer类预训练模型 2.3 自监督预训练模型 三、常见的分类预训练模型介绍 3.1 预训练模型的发展史 3.2 预训练模型的训练策略 知识点回顾&#xff1a; 预训练的概念常见的分类预训练模型图像…

Redis:事物

&#x1f308; 个人主页&#xff1a;Zfox_ &#x1f525; 系列专栏&#xff1a;Redis &#x1f525; 什么是事务 Redis的事务和MySQL的事务概念上是类似的.都是把⼀系列操作绑定成⼀组.让这⼀组能够批量执⾏. 但是注意体会Redis的事务和MySQL事务的区别: 弱化的原⼦性:redi…

CppCon 2018 学习:An allocator is a handle to a heap Lessons learned from std::pmr

“An allocator is a handle to a heap — Lessons learned from std::pmr” 翻译过来就是&#xff1a;“分配器&#xff08;allocator&#xff09;是对堆&#xff08;heap&#xff09;的一种句柄&#xff08;handle&#xff09;——从 std::pmr 中学到的经验”。 基础概念 分…

设备健康实时监测方法演进:从传感网络到AI决策树的工业智能实践

引言&#xff1a;当设备运维遇上AIoT革命 在工业4.0进程中&#xff0c;​毫秒级设备状态捕获能力正成为智能工厂的核心竞争力。传统监测方法因数据滞后、诊断粗放被诟病&#xff0c;本文将深入探讨三大前沿实时监测技术路径&#xff0c;并揭秘中讯烛龙系统如何通过深度强化学习…

剑指offer53_二叉树的深度

二叉树的深度 输入一棵二叉树的根结点&#xff0c;求该树的深度。 从根结点到叶结点依次经过的结点&#xff08;含根、叶结点&#xff09;形成树的一条路径&#xff0c;最长路径的长度为树的深度。 数据范围 树中节点数量 [ 0 , 500 ] [0,500] [0,500]。 样例 输入&#…

探秘AI的秘密:leaked-system-prompts

揭秘:揭秘系统提示合集背后的秘密 在当今这个人工智能技术迅速发展的时代,了解和使用大型语言模型(LLM)已成为技术爱好者、开发者和研究人员的共同目标。而作为核心组成部分,系统提示(system prompts)的设计和应用直接影响了LLM的表现和功能。今天, 我们将为大家揭示一…