内嵌式mqtt server

添加moquette依赖

 <dependency><groupId>io.moquette</groupId><artifactId>moquette-broker</artifactId><version>0.17</version><exclusions><exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-reload4j</artifactId></exclusion></exclusions>
</dependency>

配置文件类MoquetteProperties

package com.mqtt.config;import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;@Data
@Configuration
public class MoquetteProperties {@Value("${mqtt.port:1883}")private String mqttPort;@Value("${mqtt.host:0.0.0.0}")private String mqttHost;@Value("${mqtt.allow_anonymous:false}")private String allowAnonymous;@Value("${mqtt.username:admin}")private String username;@Value("${mqtt.password:moque123432}")private String password;}

权限认证类CustomAuthenticator

package com.mqtt.interceptor;import com.mqtt.config.MoquetteProperties;
import io.moquette.broker.security.IAuthenticator;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;@Slf4j
@Component
public class CustomAuthenticator implements IAuthenticator {@Autowiredprivate MoquetteProperties moquetteProperties;@Overridepublic boolean checkValid(String clientId, String username, byte[] password) {String passwordStr = new String(password);if (moquetteProperties.getUsername().equals(username)&& moquetteProperties.getPassword().equals(passwordStr)) {return true;}log.error("CustomAuthenticator checkValid: 用户名或密码错误");return false;}
}

消息拦截类MqttMessageInterceptor

package com.mqtt.interceptor;import io.moquette.interception.AbstractInterceptHandler;
import io.moquette.interception.messages.InterceptPublishMessage;
import io.netty.buffer.ByteBuf;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;import java.nio.charset.StandardCharsets;@Slf4j
@Component
public class MqttMessageInterceptor extends AbstractInterceptHandler {@Overridepublic String getID() {return MqttMessageInterceptor.class.getName();}@Overridepublic void onPublish(InterceptPublishMessage msg) {String clientId = msg.getClientID();String topic = msg.getTopicName();// 获取消息的有效载荷ByteBuf payload = msg.getPayload();String content = safeReadByteBuf(payload);log.info("MqttMessageInterceptor Received message - Client: {}, Topic: {}, Payload: {}",clientId, topic, content);}@Overridepublic void onSessionLoopError(Throwable error) {log.error("MqttMessageInterceptor onSessionLoopError", error);}/*** 安全读取 ByteBuf 数据*/private String safeReadByteBuf(ByteBuf byteBuf) {try {if (byteBuf == null || !byteBuf.isReadable()) {return "";}if (byteBuf.hasArray()) {// 堆内缓冲区byte[] array = byteBuf.array();int offset = byteBuf.arrayOffset() + byteBuf.readerIndex();int length = byteBuf.readableBytes();return new String(array, offset, length, StandardCharsets.UTF_8);} else {// 堆外缓冲区byte[] array = new byte[byteBuf.readableBytes()];byteBuf.getBytes(byteBuf.readerIndex(), array);return new String(array, StandardCharsets.UTF_8);}} finally {// 确保释放 ByteBuf 资源if (byteBuf != null && byteBuf.refCnt() > 0) {byteBuf.release();}}}
}

MQTT Server类MoquetteBrokerConfig

参考
https://github.com/mochi-mqtt/server/blob/main/examples/hooks/main.go#L22

package com.mqtt.config;import com.mqtt.interceptor.CustomAuthenticator;
import com.mqtt.interceptor.MqttMessageInterceptor;
import io.moquette.broker.Server;
import io.moquette.broker.config.IConfig;
import io.moquette.broker.config.MemoryConfig;
import io.moquette.interception.InterceptHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.io.IOException;
import java.util.List;
import java.util.Properties;@Slf4j
@Configuration
public class MoquetteBrokerConfig {@Autowiredprivate MoquetteProperties moquetteProperties;@Bean(destroyMethod = "stopServer")public Server mqttBroker(MqttMessageInterceptor interceptor, CustomAuthenticator customAuthenticator) {// 创建 Moquette 的配置Properties properties = new Properties();// 设置监听端口为 1883properties.setProperty(IConfig.PORT_PROPERTY_NAME, moquetteProperties.getMqttPort());// 监听所有网络接口properties.setProperty(IConfig.HOST_PROPERTY_NAME, moquetteProperties.getMqttHost());// 允许匿名连接properties.setProperty(IConfig.ALLOW_ANONYMOUS_PROPERTY_NAME, moquetteProperties.getAllowAnonymous());IConfig config = new MemoryConfig(properties);// 初始化 Moquette 服务器Server mqttServer = new Server();List<InterceptHandler> handlers = List.of(interceptor);try {mqttServer.startServer(config, handlers, null, customAuthenticator, null);} catch (IOException e) {log.error("启动内嵌式MQTT服务器失败", e);throw new RuntimeException(e);}log.info("内嵌式MQTT服务器已启动,监听端口: 1883");return mqttServer;}
}

go 内嵌mqtt

配置

package constantsconst (MqttUser     = "admin"MqttPassword = "bydmqtt123432"MqttPort     = "1883"
)

MQTT Server

package mqttimport ("bytes""fmt""iaas-server-manager/constants""github.com/google/uuid""github.com/labstack/gommon/log"mqtt "github.com/mochi-mqtt/server/v2""github.com/mochi-mqtt/server/v2/hooks/auth""github.com/mochi-mqtt/server/v2/listeners""github.com/mochi-mqtt/server/v2/packets"
)// CustomAuthHook 实现自定义认证
type CustomAuthHook struct {auth.Hook
}func (h *CustomAuthHook) OnConnectAuthenticate(cl *mqtt.Client, pk packets.Packet) bool {username := pk.Connect.Usernamepassword := pk.Connect.Passwordif len(username) == 0 || len(password) == 0 {return false}// 自定义认证逻辑if string(username) == constants.MqttUser && string(password) == constants.MqttPassword {return true}// 或者检查数据库等外部系统// user := db.GetUser(pk.Username)// return user != nil && user.CheckPassword(pk.Password)return false
}// 自定义 Hook 实现
// https://github.com/mochi-mqtt/server/blob/main/examples/hooks/main.go#L22
type MqttCustomHook struct {mqtt.HookBase
}func (h *MqttCustomHook) OnConnect(cl *mqtt.Client, pk packets.Packet) error {log.Info("client connected", "client", cl.ID)return nil
}func (h *MqttCustomHook) OnDisconnect(cl *mqtt.Client, err error, expire bool) {if err != nil {log.Info("client disconnected", "client", cl.ID, "expire", expire, "error", err)} else {log.Info("client disconnected", "client", cl.ID, "expire", expire)}
}// OnPacketRead 在读取数据包时触发
func (h *MqttCustomHook) OnPacketRead(cl *mqtt.Client, pk packets.Packet) (packets.Packet, error) {switch pk.FixedHeader.Type {case packets.Publish:log.Printf("收到PUBLISH消息 clientId = %s, TopicName = %s, Payload = %s", cl.ID, pk.TopicName, string(pk.Payload))case packets.Connect:log.Printf("客户端连接clientId = %s", cl.ID)case packets.Subscribe:log.Printf("客户端订阅clientId = %s", cl.ID)}return pk, nil // 继续处理数据包
}func (h *MqttCustomHook) OnSubscribed(cl *mqtt.Client, pk packets.Packet, reasonCodes []byte) {log.Printf("subscribed clientId=%s, qos=%v", cl.ID, "filters", reasonCodes)
}func (h *MqttCustomHook) OnUnsubscribed(cl *mqtt.Client, pk packets.Packet) {log.Printf("unsubscribed clientId=%s", cl.ID)
}func (h *MqttCustomHook) OnPublish(cl *mqtt.Client, pk packets.Packet) (packets.Packet, error) {log.Printf("received from clientId=%s, payload=%s", cl.ID, string(pk.Payload))return pk, nil
}func (h *MqttCustomHook) OnPublished(cl *mqtt.Client, pk packets.Packet) {log.Printf("published to clientId=%s, payload=%s", cl.ID, string(pk.Payload))
}func (h *MqttCustomHook) Provides(b byte) bool {return bytes.Contains([]byte{mqtt.OnConnect,mqtt.OnDisconnect,mqtt.OnSubscribed,mqtt.OnUnsubscribed,mqtt.OnPublished,mqtt.OnPublish,mqtt.OnPacketRead,}, []byte{b})
}type MqttServer struct {
}// https://github.com/mochi-mqtt/server/blob/main/README-CN.md
// 启动MqttServer
func (ms *MqttServer) StartServer() {// Create the new MQTT Server.server := mqtt.New(&mqtt.Options{InlineClient: true,})// Allow all connections.errAddHook := server.AddHook(new(CustomAuthHook), nil)if errAddHook != nil {log.Fatal("CustomAuthHook AddHook fail", errAddHook)panic(errAddHook)}errAddHook = server.AddHook(new(MqttCustomHook), nil)if errAddHook != nil {log.Fatal("MqttCustomHook AddHook fail", errAddHook)panic(errAddHook)}serverId := uuid.New()// Create a TCP listener on a standard port.tcp := listeners.NewTCP(listeners.Config{ID: fmt.Sprintf("mqtt_%s", serverId), Address: fmt.Sprintf(":%s", constants.MqttPort)})err := server.AddListener(tcp)if err != nil {log.Fatal("AddListener fail", err)panic(err)}go func() {err := server.Serve()if err != nil {log.Fatal("AddListener Serve", err)panic(err)}}()log.Info("StartServer start finish, Listener On Port: ", constants.MqttPort)
}

启动

package mainfunc main() {//启动mqtt服务mqttServer := mqtt.MqttServer{}mqttServer.StartServer()
}

mqtt客户端

添加依赖

<dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.5</version>
</dependency>

订阅消息

package com.olive;import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;public class MqttReceiver2 {public static void main(String[] args) {String broker = "tcp://localhost:1883";String clientId = "ReceiverClient2";String topic = "exclusive/test/topic";String username = "admin";String password = "moque123432";try {MqttClient client = new MqttClient(broker, clientId);MqttConnectOptions options = new MqttConnectOptions();options.setUserName(username);options.setPassword(password.toCharArray());options.setCleanSession(true);client.setCallback(new MqttCallback() {@Overridepublic void messageArrived(String topic, MqttMessage message) throws Exception {System.out.println("收到消息: " + new String(message.getPayload()));}@Overridepublic void deliveryComplete(IMqttDeliveryToken token) {// 不需要处理}@Overridepublic void connectionLost(Throwable cause) {System.out.println("连接丢失");}});client.connect(options);System.out.println("接收方客户端已连接");client.subscribe(topic);System.out.println("已订阅主题: " + topic);} catch (MqttException e) {e.printStackTrace();}}
}

发布消息

package com.olive;import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;public class MqttSender {public static void main(String[] args) {String broker = "tcp://localhost:1883";String clientId = "SenderClient";String topic = "patrol/publish/result";String content = "Hello from SenderClient";String username = "admin";String password = "bydmqtt123432";try {MqttClient client = new MqttClient(broker, clientId);MqttConnectOptions options = new MqttConnectOptions();options.setUserName(username);options.setPassword(password.toCharArray());options.setCleanSession(true);client.connect(options);System.out.println("发送方客户端已连接");MqttMessage message = new MqttMessage(content.getBytes());message.setQos(1); // 设置 QoS 级别为 1client.publish(topic, message);System.out.println("消息已发送到主题: " + topic);client.disconnect();System.out.println("发送方客户端已断开连接");} catch (MqttException e) {e.printStackTrace();}}
}

go语言发布消息

package mainimport ("fmt""log""time"mqtt "github.com/eclipse/paho.mqtt.golang"
)var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {fmt.Printf("收到消息: %s from topic: %s\n", msg.Payload(), msg.Topic())
}var connectHandler mqtt.OnConnectHandler = func(client mqtt.Client) {log.Println("已连接到 MQTT Broker")// 订阅主题topic := "exclusive/test/topic"qos := byte(0)token := client.Subscribe(topic, qos, messagePubHandler)token.Wait()if token.Error() != nil {log.Printf("订阅失败: %v\n", token.Error())} else {log.Printf("已订阅主题: %s\n", topic)}
}var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {fmt.Printf("连接丢失: %v", err)
}func main() {// 设置 MQTT Broker 的地址broker := "tcp://localhost:1883"clientID := "go_mqtt_client"// 创建客户端选项opts := mqtt.NewClientOptions()opts.AddBroker(broker)opts.SetUsername("admin")opts.SetPassword("bydmqtt123432")opts.SetClientID(clientID)opts.SetAutoReconnect(true)                    // 启用自动重连opts.SetResumeSubs(true)                       // 重连后恢复订阅opts.SetMaxReconnectInterval(30 * time.Second) // 最大重连间隔// 设置回调函数opts.OnConnect = connectHandleropts.OnConnectionLost = connectLostHandler// 创建客户端client := mqtt.NewClient(opts)// 连接到 Brokerif token := client.Connect(); token.Wait() && token.Error() != nil {fmt.Println("连接失败:", token.Error())return}fmt.Println("成功连接到 MQTT Broker")// // 订阅主题// topic := "exclusive/test/topic"// qos := byte(0)// token := client.Subscribe(topic, qos, messagePubHandler)// token.Wait()// if token.Error() != nil {// 	fmt.Println("订阅失败:", token.Error())// 	return// }// fmt.Println("成功订阅主题:", topic)// 发布消息topic := "exclusive/test/topic/1"payload := "back Hello, MQTT from Go!"qos := byte(0)retain := falsetoken := client.Publish(topic, qos, retain, payload)token.Wait()if token.Error() != nil {fmt.Println("发布失败:", token.Error())return}// 保持程序运行以接收消息for {time.Sleep(1 * time.Second)}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.pswp.cn/diannao/86059.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

php执行后报502,无错误提示的排查和解决

文章目录 一、阐述问题二、开始排查1.执行代码展示2.PHP层面排查问题3.系统层面排查问题1. 分析系统日志2. core dump 分析2.1 core dump 是什么2.2 core dump 配置 并 生成 core 文件2.3 gdb 解析 core 文件 4. 问题解决 三、赠送内容四、总结 一、阐述问题 这个问题花了我起…

MySQL 核心知识点解析

最近正在复习Java八股&#xff0c;所以会将一些热门的八股问题&#xff0c;结合ai与自身理解写成博客便于记忆 InnoDB 和 MyISAM 的区别 特性InnoDBMyISAM事务支持支持ACID事务不支持事务锁机制行级锁表级锁外键支持支持不支持崩溃恢复有crash-safe能力无存储结构聚簇索引非…

CppCon 2015 学习:Comparison is not simple, but it can be simpler.

What is comparison? 这段文字是从计算机科学、编译器设计或系统优化的角度来定义和评价“比较&#xff08;comparison&#xff09;”这个操作&#xff1a; 1. Pervasive&#xff08;无处不在&#xff09; 比较操作在编程中极为常见&#xff0c;存在于&#xff1a; 分支语句&…

RocketMQ入门5.3.2版本(基于java、SpringBoot操作)

一、RocketMQ概述 RocketMQ是一款由阿里巴巴于2012年开源的分布式消息中间件&#xff0c;旨在提供高吞吐量、高可靠性的消息传递服务。主要特点有&#xff1a; 灵活的可扩展性 海量消息堆积能力 支持顺序消息 支持多种消息过滤方式 支持事务消息 支持回溯消费 支持延时消…

VR线上展厅特点分析与优势

VR线上展厅&#xff1a;特点、优势与实际应用 VR线上展厅&#xff0c;作为虚拟现实&#xff08;VR&#xff09;技术在展示行业的创新应用&#xff0c;正逐步改变着传统的展览方式。通过模拟真实的物理环境&#xff0c;为参观者提供身临其境的展览体验&#xff0c;成为展示行业…

QT 5.9.2+VTK8.0实现等高线绘制

项目下载链接&#xff1a;QT5.9.2VTK8.0实现等高线绘制资源-CSDN文库 示例如下&#xff1a; 主要代码如下&#xff1a; #include "vtkRenderer.h" #include "vtkRenderWindow.h" #include "vtkRenderWindowInteractor.h" #include "vtkPo…

MySQL:忘记root密码

修改配置文件&#xff1a; vi /etc/my.cnf## 修改配置文件 ##[mysqld] skip - grant - tables## 重启 ##/etc/init.d/mysqld restart ## 或service mysqld restart## 登录mysqld -u root -p -h 127.0.0.1USE mysql; UPDATE user SET Password password(123456) WHERE User r…

JSP、HTML和Tomcat

9x9上三角乘法表 乘法表的实现 <% page contentType"text/html;charsetUTF-8" language"java" %> <!DOCTYPE html> <html> <head><title>99 上三角乘法表</title><style>body {font-family: monospace;padding…

常用枚举技巧:基础(一)

文章目录 常用枚举技巧&#xff1a;基础&#xff08;一&#xff09;LeetCode 1. 两数之和思路Golang 代码 LeetCode 2441. 与对应负数同时存在的最大正整数思路Golang 代码 LeetCode 1512. 好数对的数目思路Golang 代码 LeetCode 2001. 可互换矩形的对数思路Golang 代码 LeetCo…

从混乱到秩序:探索管理系统如何彻底改变工作流程

内容摘要 在许多企业与组织中&#xff0c;工作流程混乱是阻碍发展的“绊脚石”。员工们常常被繁琐的步骤、模糊的职责和沟通不畅等问题搞得焦头烂额&#xff0c;工作效率低下&#xff0c;错误频发。而与之形成鲜明对比的是&#xff0c;一些引入了先进管理系统的团队&#xff0…

使用SSE解决获取状态不一致问题

使用SSE解决获取状态不一致问题 1. 问题描述2. SSE介绍2.1 SSE 的工作原理2.2 SSE 的事件格式规范2.3 SSE与其他技术对比2.4 SSE 的优缺点 3. 实战代码 1. 问题描述 目前做的一个功能是上传多个文件&#xff0c;这个上传文件是整体功能的一部分&#xff0c;文件在上传的过程中…

华为×小鹏战略合作:破局智能驾驶深水区的商业逻辑深度解析

当中国智能电动车竞争进入下半场&#xff0c;头部玩家的合纵连横正在重构产业格局。华为与小鹏汽车近日官宣的“战略合作”&#xff0c;表面看是技术互补的常规操作&#xff0c;实则暗藏改写行业游戏规则的深层商业逻辑。 一、技术破壁&#xff1a;从“单点突破”到“全栈协同”…

Tailwind CSS 实战:基于 Kooboo 构建 AI 对话框页面(六):图片上传交互功能

在 《Tailwind CSS 实战&#xff1a;基于 Kooboo 构建 AI 对话框页面&#xff08;五&#xff09;》 中&#xff0c;完成了语音交互功能的优化。本文作为该系列教程的第六篇&#xff0c;将聚焦于图片上传功能的开发。通过集成图片上传与预览能力&#xff0c;我们将进一步完善 AI…

40. 自动化异步测试开发之编写异步业务函数、测试函数和测试类(类写法)

40. 自动化异步测试开发之编写异步业务函数、测试函数和测试类&#xff08;类写法&#xff09; 一、类结构设计解析 1.1 基类设计 class Base:async_driver None # &#x1f697; 存储浏览器驱动实例async def get(self, url: str http://secure.smartbearsoftware.com/.…

面向开发者的提示词工程④——文本推断(Inferring)

文章目录 前言一、情感&#xff08;正向/负向&#xff09;二、识别情感类型三、识别愤怒四、从客户评论中提取产品和公司名称五、一次完成多项任务 前言 面向开发者的提示词工程——导读 在这节课中&#xff0c;你将从产品评论和新闻文章中推断情感和主题。 举了个商品评论的例…

java day15 (数据库)

进入数据库的学习 DB 因为数据太多了&#xff0c;方便统一管理的软件 操作就不用改代码了&#xff0c;直接改数据库则可&#xff1b; 命令就是sql语句 这些都是关系型数据库&#xff0c;sql可以控制全部&#xff0c;至于具体的环境我以前就有安装过了&#xff1b; 理解&am…

国标GB28181设备管理软件EasyGBS远程视频监控方案助力高效安全运营

一、方案背景​ 在商业快速扩张的背景下&#xff0c;连锁店门店数量激增&#xff0c;分布范围广。但传统人工巡检、电话汇报等管理方式效率低下&#xff0c;存在信息滞后、管理盲区&#xff0c;难以掌握店铺运营情况&#xff0c;影响企业效率与安全。网络远程视频监控系统可有…

Python 字典(dict)的高级用法与技巧

今天我们继续深入讲解 Python 字典的 高级用法与技巧&#xff0c;包括&#xff1a; defaultdict&#xff1a;带默认值的字典Counter&#xff1a;快速统计工具字典排序&#xff1a;按键或值排序合并字典&#xff08;传统方式和 Python 3.9 新语法&#xff09;嵌套字典的安全访问…

动静态库的使用(Linux)

1.库 通俗来说&#xff0c;库就是现有的&#xff0c;可复用的代码&#xff0c;例如&#xff1a;在C/C语言编译时&#xff0c;就需要依赖相关的C/C标准库。本质上来说库是一种可执行代码的二进制形式&#xff0c;可以被操作系统载入内存执行。通常我们可以在windows下看到一些后…

R²ec: 构建具有推理能力的大型推荐模型,显著提示推荐系统性能!!

摘要&#xff1a;大型推荐模型通过编码或项目生成将大型语言模型&#xff08;LLMs&#xff09;扩展为强大的推荐工具&#xff0c;而近期在LLM推理方面的突破也同步激发了在推荐领域探索推理的动机。目前的研究通常将LLMs定位为外部推理模块&#xff0c;以提供辅助性思考来增强传…