MQTT 入门教程:三步从 Docker 部署到 Java 客户端实现

在物联网(IoT)与边缘计算快速发展的今天,设备间的高效通信成为核心需求。MQTT 作为一种轻量级的发布 / 订阅模式协议,凭借其低带宽占用、强稳定性和灵活的消息路由能力,已成为物联网通信的事实标准。无论是智能家居的设备联动、工业传感器的数据采集,还是车联网的实时信息交互,MQTT 都在其中扮演着关键角色。
本文将从零开始搭建:从使用 Docker 部署轻量级 MQTT 服务器(Broker),到基于 Java 语言实现完整的消息发布与订阅功能,通过清晰的步骤和可直接运行的代码,最短时间内搭建起自己的 MQTT 通信系统。

什么是 MQTT?

MQTT(Message Queuing Telemetry Transport)是一种轻量级的发布 / 订阅模式消息传输协议,专为低带宽、不稳定网络环境设计,广泛应用于物联网(IoT)、传感器网络和移动设备通信等场景。
核心概念

  • Broker:消息服务器,负责接收和转发所有消息
  • Publisher:消息发布者,发送消息到 Broker
  • Subscriber:消息订阅者,从 Broker 接收消息
  • Topic:消息主题,用于消息分类和路由
  • QoS (Quality of Service):服务质量等级,定义消息传递的可靠性
    -在这里插入图片描述

第一步:使用 Docker 部署 MQTT Broker

我们将使用 Eclipse Mosquitto,一个流行的开源 MQTT Broker。

1. 拉取 Mosquitto 镜像

docker pull eclipse-mosquitto

2. 创建配置文件

首先创建一个目录用于存放配置文件和数据:

mkdir -p ~/mosquitto/config ~/mosquitto/data ~/mosquitto/log

创建配置文件 mosquitto.conf:

nano ~/mosquitto/config/mosquitto.conf

添加以下内容:

persistence true
persistence_location /mosquitto/data/
log_dest file /mosquitto/log/mosquitto.log
listener 1883
allow_anonymous true
listener 1883:MQTT 默认端口
allow_anonymous true:允许匿名连接(生产环境建议关闭)

3. 启动 Mosquitto 容器

docker run -d \--name mosquitto \-p 1883:1883 \-v ~/mosquitto/config:/mosquitto/config \-v ~/mosquitto/data:/mosquitto/data \-v ~/mosquitto/log:/mosquitto/log \eclipse-mosquitto

4. 验证 Broker 是否运行

docker ps | grep mosquitto

如果看到运行中的容器,说明 Broker 部署成功。

第二步:Java 客户端实现

我们将使用 Eclipse Paho Java 客户端库来实现 MQTT 客户端。

1. 添加依赖

如果使用 Maven,在pom.xml中添加:

<dependencies><dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.5</version></dependency>
</dependencies>

2. MQTT 工具类

首先创建一个工具类封装 MQTT 连接的通用功能:

//运行
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;public class MQTTUtils {// MQTT Broker地址private static final String BROKER = "tcp://localhost:1883";/*** 创建MQTT客户端并连接到Broker* @param clientId 客户端ID,应唯一* @return 已连接的MQTT客户端* @throws MqttException 连接异常*/public static MqttClient connect(String clientId) throws MqttException {// 设置客户端持久化方式为内存MemoryPersistence persistence = new MemoryPersistence();// 创建客户端MqttClient client = new MqttClient(BROKER, clientId, persistence);// 配置连接选项MqttConnectOptions connOpts = new MqttConnectOptions();connOpts.setCleanSession(true); // 清除会话connOpts.setConnectionTimeout(10); // 连接超时时间connOpts.setKeepAliveInterval(20); // 心跳间隔// 连接到BrokerSystem.out.println("Connecting to broker: " + BROKER);client.connect(connOpts);System.out.println("Connected");return client;}/*** 发布消息* @param client MQTT客户端* @param topic 消息主题* @param content 消息内容* @param qos 服务质量等级 (0, 1, 2)* @throws MqttException 发布异常*/public static void publish(MqttClient client, String topic, String content, int qos) throws MqttException {MqttMessage message = new MqttMessage(content.getBytes());message.setQos(qos);client.publish(topic, message);System.out.println("Published message: " + content + " to topic: " + topic);}/*** 订阅主题* @param client MQTT客户端* @param topic 要订阅的主题* @param qos 服务质量等级* @throws MqttException 订阅异常*/public static void subscribe(MqttClient client, String topic, int qos) throws MqttException {System.out.println("Subscribing to topic: " + topic);client.subscribe(topic, qos);}
}

3. 订阅者客户端实现

//运行
import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;public class MQTTSubscriber {// 订阅的主题private static final String TOPIC = "test/topic";// 客户端IDprivate static final String CLIENT_ID = "subscriber-client";// QoS等级private static final int QOS = 1;public static void main(String[] args) {MqttClient client = null;try {// 连接到Brokerclient = MQTTUtils.connect(CLIENT_ID);// 设置消息监听器client.setCallback(new MqttCallback() {@Overridepublic void connectionLost(Throwable cause) {System.out.println("Connection lost: " + cause.getMessage());}@Overridepublic void messageArrived(String topic, MqttMessage message) throws Exception {System.out.println("Received message on topic: " + topic);System.out.println("Message content: " + new String(message.getPayload()));System.out.println("QoS: " + message.getQos());}@Overridepublic void deliveryComplete(IMqttDeliveryToken token) {// 对于订阅者来说,这个方法通常不需要实现}});// 订阅主题MQTTUtils.subscribe(client, TOPIC, QOS);// 保持客户端运行以接收消息System.out.println("Waiting for messages...");while (true) {Thread.sleep(1000);}} catch (MqttException | InterruptedException e) {System.err.println("Error: " + e.getMessage());} finally {if (client != null && client.isConnected()) {try {client.disconnect();System.out.println("Disconnected");} catch (MqttException e) {e.printStackTrace();}}}}
}

4. 发布者客户端实现

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;public class MQTTPublisher {// 发布的主题private static final String TOPIC = "test/topic";// 客户端IDprivate static final String CLIENT_ID = "publisher-client";// QoS等级private static final int QOS = 1;public static void main(String[] args) {MqttClient client = null;try {// 连接到Brokerclient = MQTTUtils.connect(CLIENT_ID);// 发布几条测试消息for (int i = 1; i <= 5; i++) {String message = "Hello, MQTT! This is message " + i;MQTTUtils.publish(client, TOPIC, message, QOS);Thread.sleep(2000); // 间隔2秒发送一条}} catch (MqttException | InterruptedException e) {System.err.println("Error: " + e.getMessage());} finally {if (client != null && client.isConnected()) {try {client.disconnect();System.out.println("Disconnected");} catch (MqttException e) {e.printStackTrace();}}}}
}

第三步:运行和测试

1. 启动订阅者

首先运行MQTTSubscriber类,它会连接到 Broker 并开始等待接收消息:

Connecting to broker: tcp://localhost:1883
Connected
Subscribing to topic: test/topic
Waiting for messages...

2. 启动发布者

然后运行MQTTPublisher类,它会发送 5 条消息到指定主题:

Connecting to broker: tcp://localhost:1883
Connected
Published message: Hello, MQTT! This is message 1 to topic: test/topic
Published message: Hello, MQTT! This is message 2 to topic: test/topic

3. 查看结果

在订阅者的控制台,你应该能看到接收到的消息:

Received message on topic: test/topic
Message content: Hello, MQTT! This is message 1
QoS: 1
Received message on topic: test/topic
Message content: Hello, MQTT! This is message 2
QoS: 1

总结
本文介绍了 MQTT 的基本概念,展示了如何使用 Docker 快速部署 Mosquitto Broker,并通过 Java 代码实现了 MQTT 客户端的发布和订阅功能。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/bicheng/91906.shtml
繁体地址,请注明出处:http://hk.pswp.cn/bicheng/91906.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

公网服务器上Nginx或者Openresty如何屏蔽IP直接扫描

0x01 背景云服务器很多时候为了通信需要设置公网访问&#xff0c;但是网络当中存在很多的扫描器&#xff0c;无时无刻在扫描&#xff0c;当80,443端口暴露时&#xff0c;成了这些扫描IP的攻击对象&#xff0c;无时无刻收到威胁。0x02 扫描攻击方式1.直接通过公网IP地址进行一些…

C语言(长期更新)第8讲 函数递归

C语言&#xff08;长期更新&#xff09; 第8讲:函数递归 跟着潼心走&#xff0c;轻松拿捏C语言&#xff0c;困惑通通走&#xff0c;一去不回头~欢迎开始今天的学习内容&#xff0c;你的支持就是博主最大的动力。 目录 C语言&#xff08;长期更新&#xff09; 第8讲 函数递归…

[硬件电路-129]:模拟电路 - 继电器的工作原理、关键指标、常用芯片与管脚定义

一、工作原理继电器是一种基于电磁感应原理的自动开关装置&#xff0c;通过控制小电流电路实现大电流电路的通断。其核心结构包括&#xff1a;电磁铁&#xff08;线圈铁芯&#xff09;&#xff1a;通电时产生磁场&#xff0c;吸引衔铁动作。触点系统&#xff1a;包含常开触点&a…

Haproxy调度算法 - 静态算法介绍与使用

文章目录一、概述二、socat工具三、static-rr四、firstHAProxy通过固定参数 balance 指明对后端服务器的调度算法&#xff0c;该参数可以配置在listen或backend选项中。HAProxy的调度算法分为静态和动态调度算法&#xff0c;但是有些算法可以根据参数在静态和动态算法中相互转换…

模拟激光相机工作站版本6.0 5.2.32 6.0.44 6.031 5.2.20

模拟激光相机工作站版本6.0 5.2.32 6.0.44 6.031 5.2.20

AWS Blockchain Templates:快速部署企业级区块链网络的终极解决方案

无需精通底层架构&#xff0c;一键搭建Hyperledger Fabric或以太坊网络&#xff01;AWS Blockchain Templates 可帮助您快速基于不同的区块链框架在 AWS 上创建和部署区块链网络。区块链是一种分布式数据库技术&#xff0c;用于维护不断增长的交易记录和智能合约集合&#xff0…

Vue 服务端渲染 Nuxt 使用详解

Nuxt 是基于 Vue 的高层框架&#xff0c;专注于服务器端渲染应用开发。它封装了繁琐的配置和通用模式&#xff0c;提供了开箱即用的 SSR 功能&#xff0c;使开发者能够专注于编写业务逻辑。 1. Nuxt 的核心特性 SSR 支持&#xff1a;默认支持服务端渲染&#xff0c;提高应用性…

使用ACK Serverless容器化部署大语言模型FastChat

核心概念 阿里云ACK Serverless&#xff1a;是一种基于 Kubernetes 的无服务器容器服务。用户无需管理底层节点和服务器&#xff0c;即可快速部署容器化应用&#xff0c;并根据实际使用的 CPU 和内存资源按需付费&#xff0c;只专注于应用本身而非基础设施管理。 FastChat&…

最新Android Studio汉化教程--兼容插件包

[ ] 软件版本&#xff1a;Android Studio Meerkat Feature Drop | 2024.3.2 Build #AI-243.25659.59.2432.13423653, built on April 30, 2025 Runtime version: 21.0.613368085-b895.109 amd64 VM: OpenJDK 64-Bit Server VM by JetBrains s.r.o. Toolkit: sun.awt.windows.WT…

Unity_数据持久化_IXmlSerializable接口

Unity数据持久化 三、XML数据持久化 3.5 IXmlSerializable接口 3.5.1 IXmlSerializable接口基础概念 什么是IXmlSerializable接口&#xff1a; IXmlSerializable 是.NET框架提供的一个接口&#xff0c;允许类自定义XML序列化和反序列化的过程。当默认的XML序列化行为无法满足需…

如何快速解决PDF解密新方法?

有时从网络下载的PDF文档会带有加密限制&#xff0c;导致无法编辑、复制或打印。它的体积仅约10MB&#xff0c;无需安装&#xff0c;解压即用。遇到受限制的文件时&#xff0c;只需将其拖入界面&#xff0c;选择是否覆盖原文件&#xff0c;点击执行&#xff0c;瞬间完成解密。「…

译|数据驱动智慧供应链的构成要素与关联思考

数据质量&#xff0c;通过识别关键决策和瓶颈构建信息供应链。该模型适用于优化库存管理、自动化物流、预测需求、实现产品全生命周期追溯及应对突发风险。例如&#xff0c;通过AI机器人自动管理仓库&#xff0c;或利用数字孪生模拟和优化全球采购网络。 汇总来自三篇文章&…

OS21.【Linux】环境变量

目录 1.与环境变量有关的实验 A.对比命令和自制程序的运行 为什么.像ls、pwd这样的命令运行是不需要加路径? 执行自制程序而不加路径的方法,看看PATH环境变量 方法1:将自制程序移动到系统的搜索路径下 方法2:临时修改PATH环境变量 B.查看系统中所有环境变量 解释几个常…

加密流量论文复现:《Detecting DNS over HTTPS based data exfiltration》(上)

本文将以我个人的理解去阅读该篇流量加密论文&#xff0c;并在下一篇尽力对其中的实验部分进行复现。话不多说&#xff0c;先从论文开始着手。 内容介绍 传统的DNS(Domain Name System)协议是以明文传输的。DNS作为互联网的基础设施&#xff0c;最初设计时主要考虑的是功能和效…

Apache RocketMQ 中Message (消息)的核心概念

好的&#xff0c;我们来深入理解一下 Apache RocketMQ 中 Message (消息) 这个核心概念。这份文档详细阐述了消息的定义、在模型中的位置、内部属性、约束和使用建议。 你可以将 Message 看作是 RocketMQ 系统中数据传输和处理的最小原子单位。它承载了业务数据&#xff0c;并附…

C 语言问题

1. C语言中 union 与 struct 的区别类型structunion内存分配机制编译器为每个成员‌独立分配内存空间&#xff0c;总内存大小 所有成员大小之和&#xff08;考虑内存对齐&#xff09;所有成员‌共享同一段内存空间&#xff0c;总内存大小 ‌最大成员的大小‌数据存储特性1. 所…

[ LeetCode优选算法专题一双指针-----盛最多的水]

1.题目链接 LeetCode盛最多的水 2.题目描述 3.题目解析 问题本质分析 "盛最多水的容器" 问题可以抽象为&#xff1a;在坐标轴上有 n 条垂直线段&#xff0c;第 i 条线段的两个端点分别是 (i, 0) 和 (i, height [i])。找到两条线段&#xff0c;使得它们与 x 轴共同…

旧笔记本电脑如何安装飞牛OS

01引言随着电子产品的更新换代&#xff0c;我们有很多的电子产品已经满足不了现在的工作需求和日常娱乐了&#xff0c;比如&#xff1a;用了很久厚重笔记本电脑放在现在办公也是有点吃力了&#xff0c;我们现在换新了旧的还不想放在那里吃灰&#xff0c;怎么办呢&#xff1f;我…

某金服Java面试终极指南:25题完整解析与场景化方案

涵盖分布式锁、缓存、事务、高并发等金融系统核心考点&#xff0c;附解决方案与抗风险设计一、分布式锁深度解决方案 1. Redis分布式锁完整实现 // 原子加锁 防死锁 String uuid UUID.randomUUID().toString(); Boolean locked redisTemplate.opsForValue().setIfAbsent(&qu…

MATLAB 2025a的下载以及安装,安装X310的测试附加功能(附加安装包)

首先将安装包下载到本地中之后解压该文件夹&#xff0c;打开文件发现有两个文件&#xff0c;其中crach文件夹中是破解matlab所用到的文件。而另一个压缩包就是需要安装的文件&#xff0c;要先解压在安装。在安装之前将网络断开&#xff0c;不然可能破解不成功&#xff0c;先进入…