MQTT:Java集成MQTT

目录

  • Git项目路径
  • 一、原生java架构
    • 1.1 导入POM文件
    • 1.2 编写测试用例
  • 二、SpringBoot集成MQTT
    • 2.1 导入POM文件
    • 2.2 在YML文件中增加配置
    • 2.3 新建Properties配置文件映射配置
    • 2.4 创建连接工厂
    • 2.5 增加入站规则配置
    • 2.6 增加出站规则配置
    • 2.7 创建消息发送网关
    • 2.8 测试消息发送
    • 2.9 项目结构


Git项目路径

一、原生java架构

1.1 导入POM文件

<dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.5</version>
</dependency>

1.2 编写测试用例

package com.ming;import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.junit.jupiter.api.Test;/*** 使用java原生方法连接MQTT*/
public class MqttPahoTest {private final String serverURI = "tcp://localhost:1883";private final String clientId = "emqx_spring_client_132";/*** 建立连接* @throws MqttException*/@Testpublic MqttClient createConnection() throws MqttException {// 创建MQTT对象MqttClient client = new MqttClient(serverURI, clientId, new MemoryPersistence());// 发送建立连接的请求MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();mqttConnectOptions.setUserName("admin");mqttConnectOptions.setPassword("admin".toCharArray());mqttConnectOptions.setCleanSession(true);client.connect(mqttConnectOptions);return client;}/*** 发送消息* @throws MqttException*/@Testpublic void sendMsg() throws MqttException {// 创建对象MqttClient client = createConnection();// 发送消息MqttMessage mqttMessage = new MqttMessage();mqttMessage.setQos(2);mqttMessage.setPayload("Hello World".getBytes());client.publish("java/a", mqttMessage);// 关闭连接client.disconnect();client.close();}/*** 接收消息* @throws MqttException*/@Testpublic void receiveMsg() throws MqttException {// 创建MQTT对象MqttClient client = new MqttClient(serverURI, clientId, new MemoryPersistence());// 发送建立连接的请求MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();mqttConnectOptions.setUserName("admin");mqttConnectOptions.setPassword("admin".toCharArray());mqttConnectOptions.setCleanSession(true);client.setCallback(new MqttCallback() {@Overridepublic void connectionLost(Throwable throwable) {  // 当连接丢失时的回调System.out.println("Connection lost...");}@Overridepublic void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {  // 消息接收回调System.out.println(String.format("%s ---> %s", topic, new String(mqttMessage.getPayload())));}@Overridepublic void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {  // 消息传输完毕System.out.println("Delivery complete");}});client.connect(mqttConnectOptions);// 订阅主题client.subscribe("java/b", 2);while (true);}
}

二、SpringBoot集成MQTT

2.1 导入POM文件

<!-- spring boot 项目集成消息中间件基础依赖 -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<!-- spring boot 项目和MQTT客户端集成依赖 -->
<dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId><version>5.4.3</version>
</dependency>
<dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId>
</dependency>
<dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.83</version>
</dependency>

2.2 在YML文件中增加配置

spring:mqtt:username: adminpassword: adminurl: tcp://localhost:1883subClientId: sub_client_id_123subTopic: atguigu/iot/lamp/line1,atguigu/iot/lamp/line2pubClientId: pub_client_id_123

2.3 新建Properties配置文件映射配置

package com.ming.properties;import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;@Data
@ConfigurationProperties(prefix = "spring.mqtt")
public class MqttConfigurationProperties {private String username;private String password;private String url;private String subClientId;private String subTopic;private String pubClientId;
}

2.4 创建连接工厂

package com.ming.config;import com.ming.properties.MqttConfigurationProperties;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;/*** 创建MQTT的配置类 配置连接工厂*/
@Configuration
public class MqttConfiguration {@Autowiredprivate MqttConfigurationProperties mqttConfigurationProperties;@Beanpublic MqttPahoClientFactory mqttPahoClientFactory() {DefaultMqttPahoClientFactory mqttPahoClientFactory = new DefaultMqttPahoClientFactory();MqttConnectOptions options = new MqttConnectOptions();options.setUserName(mqttConfigurationProperties.getUsername());options.setPassword(mqttConfigurationProperties.getPassword().toCharArray());options.setServerURIs(new String[]{mqttConfigurationProperties.getUrl()});mqttPahoClientFactory.setConnectionOptions(options);return mqttPahoClientFactory;}
}

2.5 增加入站规则配置

package com.ming.config;import com.ming.handler.ReceiverMessageHandler;
import com.ming.properties.MqttConfigurationProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;/*** MQTT入站规则配置类(接收消息)*/
@Configuration
public class MqttInboundConfiguration {@Autowiredprivate MqttConfigurationProperties mqttConfigurationProperties;@Autowiredprivate MqttPahoClientFactory mqttPahoClientFactory;@Autowiredprivate ReceiverMessageHandler receiverMessageHandler;/*** 消息通道* @return*/@Beanpublic MessageChannel messageInboundChannel() {return new DirectChannel();}/*** 配置入站适配器,作用:设置订阅主题,以及指定消息的相关属性* @return*/@Beanpublic MessageProducer messageProducer() {MqttPahoMessageDrivenChannelAdapter mqttPahoMessageDrivenChannelAdapter = new MqttPahoMessageDrivenChannelAdapter(mqttConfigurationProperties.getUrl(),mqttConfigurationProperties.getSubClientId(),mqttPahoClientFactory,mqttConfigurationProperties.getSubTopic().split(","));mqttPahoMessageDrivenChannelAdapter.setQos(1);mqttPahoMessageDrivenChannelAdapter.setConverter(new DefaultPahoMessageConverter());mqttPahoMessageDrivenChannelAdapter.setOutputChannel(messageInboundChannel());return mqttPahoMessageDrivenChannelAdapter;}/*** 消息处理器* @return*/@Bean@ServiceActivator(inputChannel = "messageInboundChannel")public MessageHandler messageHandler() {return receiverMessageHandler;}
}
package com.ming.handler;import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.stereotype.Component;/*** 当订阅的主题有消息时就会触发此处回调*/
@Component
public class ReceiverMessageHandler implements MessageHandler {@Overridepublic void handleMessage(Message<?> message) throws MessagingException {Object payload = message.getPayload();  // 获取消息的内容System.out.println(message.getHeaders().get("mqtt_receivedTopic"));  // 主题名称System.out.println(payload);  // 消息主体}
}

2.6 增加出站规则配置

package com.ming.config;import com.ming.properties.MqttConfigurationProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;/*** MQTT出站规则配置类(发送消息)*/
@Configuration
public class MqttOutboundConfiguration {@Autowiredprivate MqttConfigurationProperties mqttConfigurationProperties;@Autowiredprivate MqttPahoClientFactory mqttPahoClientFactory;/*** 消息通道* @return*/@Beanpublic MessageChannel mqttOutboundChannel() {return new DirectChannel();}/*** 配置出站适配器* @return*/@Bean@ServiceActivator(inputChannel = "mqttOutboundChannel")public MessageHandler mqttOutboundMessageHandler() {MqttPahoMessageHandler mqttPahoMessageHandler = new MqttPahoMessageHandler(mqttConfigurationProperties.getUrl(),mqttConfigurationProperties.getPubClientId(),mqttPahoClientFactory);mqttPahoMessageHandler.setDefaultQos(0);mqttPahoMessageHandler.setDefaultTopic("default");mqttPahoMessageHandler.setAsync(true);return mqttPahoMessageHandler;}
}

2.7 创建消息发送网关

package com.ming.getway;import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;/*** MQTT发送消息的网关*/
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGetWay {public abstract void sendMsgToMqtt(@Header(value = MqttHeaders.TOPIC) String topic, String payload);public abstract void sendMsgToMqtt(@Header(value = MqttHeaders.TOPIC) String topic, @Header(value = MqttHeaders.QOS) int qos,String payload);
}
package com.ming.service;import com.ming.getway.MqttGetWay;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;@Component
public class MqttMessageSender {@Autowiredprivate MqttGetWay mqttGetWay;public void sendMsg(String topic, String message) {mqttGetWay.sendMsgToMqtt(topic, message);}public void sendMsg(String topic, int qos, String message) {mqttGetWay.sendMsgToMqtt(topic, qos, message);}
}

2.8 测试消息发送

package com.ming;import com.ming.service.MqttMessageSender;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;@SpringBootTest(classes = SpringMqttDemoApplication.class)
public class MqttMessageSenderTest {@Autowiredprivate MqttMessageSender mqttMessageSender;@Testpublic void sendToMsg(){mqttMessageSender.sendMsg("java/c","hello mqtt spring boot ...");}
}

2.9 项目结构

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/bicheng/93168.shtml
繁体地址,请注明出处:http://hk.pswp.cn/bicheng/93168.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

day 16 stm32 IIC

1.IIC概述1基于对话的形式完成&#xff0c;不需要同时进行发送和接收所以删掉了一根数据线&#xff0c;变成半双工2为了安全起见添加了应答机制3可以接多个模块&#xff0c;且互不干扰4异步时序&#xff0c;要求严格&#xff0c;发送过程中不能暂停&#xff0c;所以需要同步时序…

AMD KFD的BO设计分析系列 0:开篇

开启我始终不敢碰的GPU存储系列&#xff0c;先上个图把核心关系表达下&#xff0c;以此纪念。注&#xff1a;图中kfdm_mm误写&#xff0c;应该为kfd_mm&#xff0c;不修改了&#xff0c;请大家不要介意。

EUDR的核心内容,EUDR认证的好处,EUDR意义

近年来&#xff0c;全球森林退化问题日益严峻&#xff0c;毁林行为不仅加剧气候变化&#xff0c;还威胁生物多样性和原住民权益。为应对这一挑战&#xff0c;欧盟于2023年6月正式实施《欧盟零毁林法案》&#xff08;EU Deforestation-free Regulation, EUDR&#xff09;&#x…

数据分析专栏记录之 -基础数学与统计知识

数据分析专栏记录之 -基础数学与统计知识&#xff1a; 1、描述性统计 均值 data_set [10, 20, 30, 40, 50] mean sum(data_set)/len(data_set)np 里面的函数&#xff0c;对二维进行操作时&#xff0c; 默认每一列 mean1 np.mean(data_set) print(mean, mean1)s 0 for i…

《星辰建造师:C++多重继承的奇幻史诗》

&#x1f30c;&#x1f525; 《星辰建造师&#xff1a;多重继承与this指针的终极史诗》 &#x1f525;&#x1f30c;—— 一场融合魔法、科技与哲学的C奇幻冒险&#x1f320;&#x1f30c; 序章&#xff1a;代码宇宙的诞生 &#x1f30c;&#x1f320;在无尽的代码维度中&#…

云计算-OpenStack 运维开发实战:从 Restful API 到 Python SDK 全场景实现镜像上传、用户创建、云主机部署全流程

一、python-Restful Api 简介 Restful API 是一种软件架构风格,基于 HTTP 协议设计,通过统一的接口(如 URL 路径)和标准的 HTTP 方法(GET/POST/PUT/DELETE 等)实现资源(如数据、文件等)的操作,具有无状态、可缓存、客户端 - 服务器分离等特点。方法如下 用 GET 请求获…

RxJava 在 Android 中的深入解析:使用、原理与最佳实践

前言RxJava 是一个基于观察者模式的响应式编程库&#xff0c;它通过可观察序列和函数式操作符的组合&#xff0c;简化了异步和事件驱动程序的开发。在 Android 开发中&#xff0c;RxJava 因其强大的异步处理能力和简洁的代码风格而广受欢迎。本文将深入探讨 RxJava 的使用、核心…

面试实战 问题三十 HTTP协议中TCP三次握手与四次挥手详解

HTTP协议中TCP三次握手与四次挥手详解 在HTTP协议中&#xff0c;连接建立和断开依赖于底层的TCP协议。虽然HTTP本身不定义握手过程&#xff0c;但所有HTTP通信都通过TCP三次握手建立连接&#xff0c;通过四次挥手断开连接。以下是详细解析&#xff1a;一、TCP三次握手&#xff…

读《精益数据分析》:双边市场的核心指标分析

双边市场数据分析指南&#xff1a;从指标体系到实战落地&#xff08;基于《精益数据分析》框架&#xff09;在互联网平台经济中&#xff0c;双边市场&#xff08;如电商、出行、外卖、自由职业平台等&#xff09;的核心矛盾始终是"供需平衡与效率优化"。这类平台连接…

Queue参考代码

queue.c #include "queue.h" #include "stdlib.h" // 初始化循环队列 void initializeCircularQueue(CircularQueue *cq, uint8_t *buffer, uint32_t size) {cq->front 0;cq->rear 0;cq->count 0;cq->size size;cq->data buffer; }…

通过时间计算地固系到惯性系旋转矩阵

通过时间计算地固系到惯性系旋转矩阵 1. 引言 在航天工程和卫星导航领域&#xff0c;经常需要在地固坐标系(ECEF)和惯性坐标系(ECI)之间进行转换。本文将详细介绍如何根据UTC时间计算这两个坐标系之间的旋转矩阵&#xff0c;并提供完整的C语言实现。 2. 基本概念 2.1 坐标系定义…

【Datawhale AI 夏令营】金融文档分析检索增强生成系统的架构演变与方法论进展

# **金融文档分析检索增强生成系统的架构演变与方法论进展****第一部分&#xff1a;基础原则和基线系统分析****第一部分&#xff1a;金融领域检索增强生成范式的解构****第二部分&#xff1a;基线剖析&#xff1a;流水线的二分法****同步轨迹 (SimpleRAG)****异步改进 (AsyncS…

C语言相关简单数据结构:顺序表

目录 1.顺序表的概念及结构 1.1 线性表 如何理解逻辑结构和物理结构&#xff1f; 1.2 顺序表分类 顺序表和数组的区别&#xff1a; 顺序表分类&#xff1a; 静态顺序表 动态顺序表 1.3 动态顺序表的实现 初始化 尾插 头插 尾删 头删 在指定位置之前插入数据 删…

nginx配置代理服务器

Nginx 作为代理服务器时&#xff0c;主要用于反向代理&#xff08;最常用&#xff0c;转发客户端请求到后端服务&#xff09;或正向代理&#xff08;较少用&#xff0c;为客户端提供访问外部网络的代理&#xff09;。以下是两种场景的具体配置示例&#xff1a; 一、反向代理配置…

MySQL数据库知识体系总结 20250813

一、数据库的原理 1.数据库的分类 我们可以根据数据的结构类型&#xff0c;将数据分成三类&#xff0c;分别是&#xff1a;结构化数据&#xff0c;半结构化数据&#xff0c;非结构化数据。 要点&#xff1a;对于结构化数据来讲通常是先有结构再有数据。要点&#xff1a;对于半…

C++ 中构造函数参数对父对象的影响:父子控件管理机制解析

文章目录C 中构造函数参数对父对象的影响&#xff1a;父子控件管理机制解析1. Qt 中的父对象管理机制2. 构造函数传递父对象的不同方式2.1. 父控件是 QWidget parent&#xff08;通用方式&#xff09;分析&#xff1a;2.2. 父控件是 Books_Client parent&#xff08;限制父控件…

直播美颜SDK开发实战:高性能人脸美型的架构与实现

在直播行业里&#xff0c;美颜已经不再是锦上添花&#xff0c;而是标配中的标配。无论是游戏主播、带货达人&#xff0c;还是唱歌、跳舞的才艺主播&#xff0c;直播美颜SDK往往决定了用户的第一印象和停留时长。尤其是高性能人脸美型技术&#xff0c;不仅能让主播的五官更加自然…

JavaWeb(苍穹外卖)--学习笔记18(Apache POI)

前言 本篇文章是学习B站黑马程序员苍穹外卖的学习笔记&#x1f4d1;。我的学习路线是Java基础语法-JavaWeb-做项目&#xff0c;管理端的功能学习完之后&#xff0c;就进入到了用户端微信小程序的开发&#xff0c;用户端开发的流程大致为用户登录—商品浏览&#xff08;其中涉及…

OpenJDK 17 源码 安全点轮询的信号处理流程

OpenJDK 17 源码&#xff0c;安全点轮询的信号处理流程如下&#xff08;重点分析安全点轮询相关部分&#xff09;&#xff1a;核心信号处理流程信号触发&#xff1a;当线程访问安全点轮询内存页时&#xff08;SafepointMechanism::is_poll_address&#xff09;&#xff0c;会触…

InfluxDB 在工业控制系统中的数据监控案例(一)

工业控制系统数据监控的重要性**在工业领域&#xff0c;生产过程的复杂性和连续性使得数据监控成为保障生产稳定运行的关键环节。通过实时收集、处理和分析生产数据&#xff0c;企业能够及时掌握设备运行状态、产品质量信息以及生产流程的各项参数&#xff0c;从而为生产决策提…