RocketMq的消息类型及代码案例

RocketMQ 提供了多种消息类型,以满足不同业务场景对 顺序性、事务性、时效性 的要求。其核心设计思想是通过解耦 “消息传递模式” 与 “业务逻辑”,实现高性能、高可靠的分布式通信。

一、主要类型包括

  1. 普通消息(基础类型)
  2. 顺序消息(保证消费顺序)
  3. 定时 / 延迟消息(控制投递时间)
  4. 事务消息(分布式事务最终一致性)
  5. 批量消息(提升吞吐量)

二、消息类型及代码示例

1. 普通消息(Normal Message)

描述:最基础的消息类型,支持异步发送、批量发送等模式,适用于无需严格顺序和事务保证的场景(如日志收集、通知推送)。
核心优势:高吞吐量、低延迟,生产端通过负载均衡自动选择 Broker。

import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;public class NormalMessageExample {public static void main(String[] args) throws ClientException, InterruptedException {// 1. 加载服务提供者(支持SPI扩展)ClientServiceProvider provider = ClientServiceProvider.loadService();// 2. 配置认证信息(密钥管理)String accessKey = "yourAccessKey";String secretKey = "yourSecretKey";StaticSessionCredentialsProvider credentialsProvider =new StaticSessionCredentialsProvider(accessKey, secretKey);// 3. 构建客户端配置(支持多协议、TLS加密)ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder().setEndpoints("localhost:8081") // 支持域名或IP:端口列表.setCredentialProvider(credentialsProvider).setRequestTimeout(Duration.ofSeconds(3)) // 请求超时时间.build();// 4. 创建生产者(支持自动重试、批量发送)String topic = "normal-message-topic";Producer producer = provider.newProducerBuilder().setClientConfiguration(clientConfiguration).setTopics(topic).setMaxAttempts(3) // 发送失败最大重试次数.build();// 5. 构建并发送消息Message message = provider.newMessageBuilder().setTopic(topic).setBody("Hello RocketMQ 5.0!".getBytes()).setTag("order") // 可选标签,用于消息过滤.setKeys("key123") // 消息业务键,用于查询.build();// 同步发送(阻塞当前线程直到返回结果)SendReceipt receipt = producer.send(message);System.out.println("消息发送成功: " + receipt.getMessageId());// 异步发送示例/*producer.sendAsync(message).thenAccept(sendReceipt -> {System.out.println("异步发送成功: " + sendReceipt.getMessageId());}).exceptionally(throwable -> {System.out.println("异步发送失败: " + throwable.getMessage());return null;});*/// 6. 关闭资源(重要!避免内存泄漏)producer.close();}
}
2. 顺序消息(Ordered Message)

描述:通过将同一业务主键的消息路由到相同队列,保证消息消费顺序与发送顺序一致。
应用场景:金融交易流水、订单状态变更、时序数据处理。

import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;import java.util.List;public class OrderedMessageExample {public static void main(String[] args) throws ClientException, InterruptedException {ClientServiceProvider provider = ClientServiceProvider.loadService();ClientConfiguration config = ClientConfiguration.newBuilder().setEndpoints("localhost:8081").build();String topic = "ordered-message-topic";Producer producer = provider.newProducerBuilder().setClientConfiguration(config).setTopics(topic).build();// 模拟订单状态变更(同一订单ID的消息必须顺序处理)String[] orderIds = {"order1001", "order1002", "order1001"};String[] orderStatus = {"CREATED", "PAYED", "SHIPPED"};for (int i = 0; i < orderIds.length; i++) {String orderId = orderIds[i];String status = orderStatus[i % orderStatus.length];// 关键:通过MessageGroup确保相同订单的消息发送到同一队列Message message = provider.newMessageBuilder().setTopic(topic).setBody(("订单[" + orderId + "]状态变更为: " + status).getBytes()).setMessageGroup(orderId) // 消息组决定消息路由的队列.setKeys(orderId) // 设置业务键便于查询.build();SendReceipt receipt = producer.send(message);System.out.println("发送顺序消息: " + receipt.getMessageId() + ", 订单ID: " + orderId + ", 状态: " + status);}producer.close();}
}
3. 定时 / 延迟消息(Scheduled/Delay Message)

描述:消息发送后,需等待指定时间(或到达指定时间点)才会被消费者可见。
应用场景:订单超时自动关闭、任务调度、延迟重试。

import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;import java.time.Duration;
import java.time.Instant;public class DelayMessageExample {public static void main(String[] args) throws ClientException, InterruptedException {ClientServiceProvider provider = ClientServiceProvider.loadService();ClientConfiguration config = ClientConfiguration.newBuilder().setEndpoints("localhost:8081").build();String topic = "delay-message-topic";Producer producer = provider.newProducerBuilder().setClientConfiguration(config).setTopics(topic).build();// 方式一:使用绝对时间戳(精确到毫秒)long timestamp = Instant.now().plus(Duration.ofMinutes(5)).toEpochMilli();Message messageByTimestamp = provider.newMessageBuilder().setTopic(topic).setBody("5分钟后执行的定时消息".getBytes()).setDeliveryTimestamp(timestamp) // 设置投递时间戳.build();// 方式二:使用预定义延迟级别(需Broker配置支持)Message messageByLevel = provider.newMessageBuilder().setTopic(topic).setBody("延迟30秒的消息".getBytes()).addProperty("DELAY", "3") // 假设3对应30秒(需Broker配置).build();// 发送延迟消息SendReceipt receipt = producer.send(messageByTimestamp);System.out.println("延迟消息发送成功: " + receipt.getMessageId() + ", 将于 " + Instant.ofEpochMilli(timestamp) + " 可见");producer.close();}
}
4. 事务消息(Transactional Message)

描述
通过两阶段提交机制,保证本地事务与消息发送的最终一致性。

核心流程

发送半消息(对消费者不可见)

  1. 执行本地事务
  2. 根据事务结果提交或回滚半消息
  3. 支持事务状态回查(处理超时情况)
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.TransactionResolution;
import org.apache.rocketmq.client.apis.producer.TransactionalProducer;public class TransactionalMessageExample {public static void main(String[] args) throws ClientException, InterruptedException {ClientServiceProvider provider = ClientServiceProvider.loadService();ClientConfiguration config = ClientConfiguration.newBuilder().setEndpoints("localhost:8081").build();String topic = "transactional-message-topic";TransactionalProducer producer = provider.newTransactionalProducerBuilder().setClientConfiguration(config).setTopics(topic)// 关键:设置事务状态回查处理器(当Broker长时间未收到事务状态时触发).setTransactionChecker(messageView -> {System.out.println("回查事务状态: " + messageView.getBodyAsString());// 根据业务ID查询本地事务状态String bizId = messageView.getKeys().iterator().next();boolean transactionStatus = checkLocalTransactionStatus(bizId);return transactionStatus ? TransactionResolution.COMMIT : TransactionResolution.ROLLBACK;}).build();try {// 1. 开启事务上下文producer.beginTransaction();// 2. 发送半消息(未提交状态)Message message = provider.newMessageBuilder().setTopic(topic).setBody("用户账户扣款成功,通知库存系统扣减".getBytes()).setKeys("order_12345") // 设置业务键,用于回查.build();producer.send(message);// 3. 执行本地事务(如数据库操作)boolean localTransactionResult = executeLocalTransaction();// 4. 根据本地事务结果提交或回滚if (localTransactionResult) {producer.commit(); // 提交事务,消息对消费者可见System.out.println("本地事务执行成功,消息提交");} else {producer.rollback(); // 回滚事务,消息被丢弃System.out.println("本地事务执行失败,消息回滚");}} catch (Exception e) {producer.rollback(); // 异常时回滚e.printStackTrace();} finally {producer.close();}}private static boolean executeLocalTransaction() {// 模拟本地事务:如用户账户扣款System.out.println("执行本地事务...");return true; // 返回事务执行结果}private static boolean checkLocalTransactionStatus(String bizId) {// 模拟查询本地事务状态(如查询数据库订单状态)System.out.println("查询本地事务状态: " + bizId);return true; // 实际应根据业务ID查询真实状态}
}
5. 批量消息(Batch Message)

描述:将多条消息打包为一个批次发送,减少网络开销,提升吞吐量。
注意事项

  • 所有消息必须属于同一 Topic
  • 总大小不能超过 4MB(默认限制,可配置)
  • 不支持事务和延迟属性
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;import java.util.ArrayList;
import java.util.List;public class BatchMessageExample {public static void main(String[] args) throws ClientException, InterruptedException {ClientServiceProvider provider = ClientServiceProvider.loadService();ClientConfiguration config = ClientConfiguration.newBuilder().setEndpoints("localhost:8081").build();String topic = "batch-message-topic";Producer producer = provider.newProducerBuilder().setClientConfiguration(config).setTopics(topic).build();// 创建批量消息集合List<Message> messages = new ArrayList<>();for (int i = 0; i < 100; i++) { // 示例:批量发送100条消息Message message = provider.newMessageBuilder().setTopic(topic).setBody(("批量消息-" + i).getBytes()).setKeys("key-" + i).build();messages.add(message);}// 智能拆分大批次(避免超过4MB限制)List<List<Message>> batches = splitMessages(messages);// 发送所有批次for (List<Message> batch : batches) {List<SendReceipt> receipts = producer.send(batch);System.out.println("批量发送成功,共" + receipts.size() + "条消息");}producer.close();}// 智能拆分大批次消息(实际生产中建议实现)private static List<List<Message>> splitMessages(List<Message> messages) {// 简单实现:实际应根据消息大小动态拆分List<List<Message>> result = new ArrayList<>();result.add(messages);return result;}
}
6. 消费者示例(通用)

描述:RocketMQ 5.0 支持 Push 和 Pull 两种消费模式,以下是基于长轮询的 PushConsumer 示例。

import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;import java.time.Duration;public class ConsumerExample {public static void main(String[] args) throws ClientException, InterruptedException {ClientServiceProvider provider = ClientServiceProvider.loadService();ClientConfiguration config = ClientConfiguration.newBuilder().setEndpoints("localhost:8081").build();// 订阅主题和过滤表达式(支持SQL92语法)String topic = "normal-message-topic";FilterExpression filterExpression = new FilterExpression("TAG = 'order' AND age > 18", // 示例SQL过滤条件FilterExpressionType.SQL92);// 创建PushConsumer(基于长轮询的"伪推"模式)PushConsumer consumer = provider.newPushConsumerBuilder().setClientConfiguration(config).setConsumerGroup("my-consumer-group") // 消费组决定消息负载方式.setSubscriptionExpressions(Map.of(topic, filterExpression)).setMaxPollInterval(Duration.ofSeconds(30)) // 长轮询超时时间.setConsumptionThreadCount(10) // 消费线程数.setMessageListener(messageView -> {try {// 处理消息逻辑(业务代码)System.out.println("接收到消息: " + messageView.getBodyAsString());System.out.println("消息属性: " + messageView.getProperties());// 模拟业务处理耗时Thread.sleep(100);// 返回消费结果(成功/失败)return ConsumeResult.SUCCESS;} catch (Exception e) {// 消费失败时返回RETRY,消息将重试消费System.out.println("消息消费失败: " + e.getMessage());return ConsumeResult.FAILURE;}}).build();// 保持主线程运行,避免消费者立即关闭System.out.println("消费者已启动,按Ctrl+C退出...");Thread.sleep(Long.MAX_VALUE);}
}

关键依赖配置(Maven)

<dependencies><!-- RocketMQ 5.0 Java客户端 --><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client-java</artifactId><version>5.1.0</version></dependency><!-- gRPC依赖 --><dependency><groupId>io.grpc</groupId><artifactId>grpc-netty-shaded</artifactId><version>1.54.0</version></dependency><!-- 序列化依赖 --><dependency><groupId>com.google.protobuf</groupId><artifactId>protobuf-java</artifactId><version>3.21.9</version></dependency>
</dependencies>

三、最佳实践建议

  1. 连接配置

    • 生产环境建议使用域名而非 IP,支持动态扩容
    • 开启 TLS 加密(通过ClientConfiguration.setSslTrustStorePath
  2. 消息大小

    • 单条消息建议不超过 1MB
    • 批量消息总大小不超过 4MB(可通过producer.setMaxMessageSize调整)
  3. 异常处理

    • 生产者需捕获ClientException并实现重试逻辑
    • 消费者应避免长时间阻塞,建议使用异步处理
  4. 性能调优

    • 生产者:调整sendMsgTimeoutmaxAttempts参数
    • 消费者:根据业务吞吐量调整consumptionThreadCount
  5. 监控告警

    • 监控 Topic 的 TPS、RT、堆积量等指标
    • 配置告警阈值(如单队列堆积超过 10 万条)

四、总结

RocketMQ支持多种消息类型以满足不同业务需求:普通消息适用于高吞吐场景;顺序消息保证消费顺序;定时/延迟消息控制投递时间;事务消息确保分布式事务一致性;批量消息提升吞吐量。每种类型都提供了对应的Java代码示例,包括生产者配置、消息构建和发送逻辑。最佳实践建议包括合理配置连接、控制消息大小、完善异常处理、性能调优和监控告警。通过解耦消息传递与业务逻辑,RocketMQ实现了高性能、高可靠的分布式通信能力。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.pswp.cn/diannao/84396.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

maxkey单点登录系统

github地址 https://github.com/MaxKeyTop/MaxKey/blob/master/README_zh.md 1、官方镜像 https://hub.docker.com/u/maxkeytop 2、MaxKey:Docker快速部署 参考地址&#xff1a; Docker部署 | MaxKey单点登录认证系统 拉取docker脚本MaxKey: Dromara &#x1f5dd;️MaxK…

基于AI生成测试用例的处理过程

基于AI生成测试用例的处理过程是一个结合机器学习、自然语言处理&#xff08;NLP&#xff09;和领域知识的系统性流程。以下是其核心步骤和关键技术细节&#xff0c;以帮助理解如何利用AI自动化生成高效、覆盖全面的测试用例。 1. 输入分析与需求建模 目标 将用户需求、系统文…

《Java vs Go vs C++ vs C:四门编程语言的深度对比》

引言​​ 从底层硬件操作到云端分布式系统&#xff0c;Java、Go、C 和 C 四门语言各自占据不同生态位。本文从​​设计哲学​​、​​语法范式​​、​​性能特性​​、​​应用场景​​等维度进行对比&#xff0c;为开发者提供技术选型参考。 一、​​设计哲学与历史定位​​…

无损提速黑科技:YOLOv8+OREPA卷积优化方案解析(原理推导/代码实现/调参技巧三合一)

文章目录 一、OREPA核心思想与创新突破1.1 传统重参数化的局限性1.2 OREPA的核心创新二、OREPA实现原理与数学推导2.1 卷积核分解策略2.2 动态融合公式三、YOLOv8集成实战(完整代码实现)3.1 OREPA卷积模块定义3.2 YOLOv8模型集成3.3 训练与推理配置四、性能对比与实验分析4.1…

RestTemplate 发送的字段第二个大写字母变成小写的问题探究

在使用RestTemplate 发送http 请求的时候&#xff0c;发现nDecisonVar 转换成了ndecisonVar ,但是打印日志用fastjson 打印的没有问题&#xff0c;换成jackson 打印就有问题。因为RestTemplate 默认使用的jackson 作为json 序列化方式&#xff0c;导致的问题&#xff0c;但是为…

C#核心概念解析:析构函数、readonly与this关键字

&#x1f50d; 析构函数&#xff1a;资源清理的最后防线 核心作用 析构函数&#xff08;~ClassName&#xff09;在对象销毁前执行&#xff0c;专用于释放非托管资源&#xff08;如文件句柄、非托管内存&#xff09;。托管资源&#xff08;如.NET对象&#xff09;由GC自动回收…

FFmpeg中使用Android Content协议打开文件设备

引言 随着Android 10引入的Scoped Storage&#xff08;分区存储&#xff09;机制&#xff0c;传统的文件访问方式发生了重大变化。FFmpeg作为强大的多媒体处理工具&#xff0c;也在不断适应Android平台的演进。本文将介绍如何在FFmpeg 7.0版本中使用Android content协议直接访…

vue——v-pre的使用

&#x1f530; 基础理解 ✅ 什么是 v-pre&#xff1f; v-pre 是一个跳过编译的 Vue 指令。 它告诉 Vue&#xff1a;“这个元素和其子元素中的内容不要被编译处理&#xff0c;按原样输出。” ✅ 使用场景&#xff1a; 展示原始的 Mustache 插值语法&#xff08;{{ xxx }}&a…

PyTorch中TensorBoardX模块与torch.utils.tensorboard模块的对比分析

文章目录 说明1. 模块起源与开发背景2. 功能特性对比3. 安装与依赖关系4. 性能与使用体验5. 迁移与兼容性策略6. 最佳实践与建议7. 未来展望8. 结论实际相关信息推荐资源 说明 TensorBoard&#xff1a;独立工具&#xff0c;只需安装tensorboard。TensorFlow&#xff1a;非必需…

单片机中断系统工作原理及定时器中断应用

文件目录 main.c #include <REGX52.H> #include "TIMER0.H" #include "KEY.H" #include "DELAY.H"//void Timer0_Init() { // TMOD 0x01; // TL0 64536 % 256; // TH0 64536 / 256; // ET0 1; // EA 1; // TR0 1; //}unsigned char…

Python爬虫实战:研究Portia框架相关技术

1. 引言 1.1 研究背景与意义 在大数据时代,网络数据已成为企业决策、学术研究和社会分析的重要资源。据 Statista 统计,2025 年全球数据总量将达到 175ZB,其中 80% 以上来自非结构化网络内容。如何高效获取并结构化这些数据,成为数据科学领域的关键挑战。 传统爬虫开发需…

【机器学习基础】机器学习与深度学习概述 算法入门指南

机器学习与深度学习概述 算法入门指南 一、引言&#xff1a;机器学习与深度学习&#xff08;一&#xff09;定义与区别&#xff08;二&#xff09;发展历程&#xff08;三&#xff09;应用场景 二、机器学习基础&#xff08;一&#xff09;监督学习&#xff08;二&#xff09;无…

[C语言初阶]扫雷小游戏

目录 一、原理及问题分析二、代码实现2.1 分文件结构设计2.2 棋盘初始化与打印2.3 布置雷与排查雷2.4 游戏主流程实现 三、后期优化方向 在上一篇文章中&#xff0c;我们实现了我们的第二个游戏——三子棋小游戏。这次我们继续结合我们之前所学的所有内容&#xff0c;制作出我们…

ROS云课三分钟-破壁篇GCompris-一小部分支持Edu应用列表-2025

开启蓝桥云课ROS ROS 机器人操作系统初级教程_ROS - 蓝桥云课 安装和使用GCompris 终端输入&#xff1a;sudo apt install gcompris sudo apt install gcompris ok&#xff0c;完成即可。 sudo apt install gcompris 如果是平板&#xff0c;秒变儿童学习机。 启动 流畅运…

Linux系统基础——是什么、适用在哪里、如何选

一、Linux是什么 Linux最初是由林纳斯托瓦兹&#xff08;Linus Torvalds&#xff09;基于个人兴趣爱好开发的个人项目&#xff0c;他编写了最核心的内核&#xff1b;后面为了发展壮大Linux系统他将整个项目开源到GitHub上&#xff0c;可以让全世界的人都参与到项目的开发维护中…

26、AI 预测性维护 (燃气轮机轴承) - /安全与维护组件/ai-predictive-maintenance-turbine

76个工业组件库示例汇总 AI 预测性维护模拟组件 (燃气轮机轴承) 概述 这是一个交互式的 Web 组件,旨在模拟基于 AI 的预测性维护 (Predictive Maintenance, PdM) 概念,应用于工业燃气轮机的关键部件(例如轴承)。它通过模拟传感器数据、动态预测剩余使用寿命 (RUL),并根…

el-form 使用el-row el-col对齐 注意事项

1.el-form 使用inline&#xff0c;el-form-item宽度会失效。 2.为了保证el-form-item 和 它内部的el-input 能在一行&#xff0c;要设置el-form-item的label-width <el-form :model"editInspectform"><el-row style"margin-bottom: 20px"><…

mac 安装 mysql 和 mysqlshell

1. 安装 mysql https://dev.mysql.com/downloads/mysql/?spma2c6h.12873639.article-detail.4.37474f4dTHdszC 默认mysql未配置环境变量&#xff0c;可以在设置中找到 2. 安装 mysqlshell https://dev.mysql.com/downloads/shell/ #启动mysql-shell mysqlsh 3. 使用 mysq…

漏洞检测与渗透检验在功能及范围上究竟有何显著差异?

漏洞检测与渗透检验是确保系统安全的重要途径&#xff0c;这两种方法各具特色和功效&#xff0c;它们在功能上有着显著的差异。 目的不同 漏洞扫描的主要任务是揭示系统内已知的安全漏洞和隐患&#xff0c;这就像是对系统进行一次全面的健康检查&#xff0c;看是否有已知的疾…

机器学习模型度量指标(混淆矩阵、准确率、精确率、召回率、F1分数、ROC曲线、AUC、平均精度均值)

我们研究的是多分类问题&#xff0c;下面所有例子以多分类问题举例 混淆矩阵&#xff08;Confusion Matrix&#xff09; 混淆矩阵&#xff08; Confusion Matrix &#xff09;是一个表格&#xff0c;用于可视化机器学习模型在分类问题上 的性能。混淆矩阵的行表示实际类别&…