RocketMQ、RabbitMQ与Kafka对比及常见问题解决方案
一、概述
消息队列(Message Queue, MQ)是企业IT系统内部通信的核心手段,用于提升性能、实现系统解耦和流量削峰。它具有低耦合、可靠投递、广播、流量控制、最终一致性等功能,是异步RPC的主要手段之一。当前主流消息中间件包括ActiveMQ、RabbitMQ、Kafka和RocketMQ等。本文对比RocketMQ、RabbitMQ和Kafka,并总结常见问题的解决方案。
Kafka是一种分布式流处理平台,最初由LinkedIn开发,现由Apache维护,专注于高吞吐量、持久化和实时数据流处理,广泛用于大数据和日志聚合场景。
二、特性对比
1. RocketMQ
RocketMQ是阿里巴巴自主研发的分布式消息中间件,具有高性能、高可靠性和高可扩展性,广泛应用于高并发场景。
- NameServer:轻量级服务协调与治理组件,负责记录和维护Topic、Broker信息,监控Broker运行状态。NameServer几乎无状态,可集群部署,节点间无信息同步,类似注册中心。
- Broker:消息服务器,提供核心消息服务。每个Broker与NameServer集群中的所有节点保持长连接,定时注册Topic信息。
- Producer:消息生产者,负责生成消息并发送至Broker。
- Consumer:消息消费者,从Broker获取消息并处理业务逻辑。
2. RabbitMQ
RabbitMQ是基于AMQP协议的开源消息中间件,注重灵活的路由机制和易用性,适合中小型企业或复杂路由场景。
- Exchange:交换机,根据路由规则将消息转发到对应队列。
- Broker:消息服务器,提供核心消息服务。
- Channel:基于TCP连接的虚拟连接,用于消息传输。
- Routing Key:生产者发送消息时携带的键,指定消息路由规则。
- Binding Key:绑定Exchange与Queue时指定的键,Routing Key与Binding Key匹配时,消息被路由到对应Queue。
3. Kafka
Kafka是分布式、持久化的消息系统,设计为高吞吐量的日志流处理平台,支持分区和副本机制,适合大数据管道和实时分析。
- Topic:消息主题,可分为多个分区(Partition),每个分区是一个有序的日志序列,支持并行消费。
- Broker:Kafka服务器节点,负责存储和转发消息。多个Broker组成集群。
- Producer:消息生产者,将消息发送到指定Topic的分区,支持分区策略(如轮询、键哈希)。
- Consumer:消息消费者,通过Consumer Group实现负载均衡,每个消费者订阅Topic并消费分区消息。
- Zookeeper/KRaft:早期依赖Zookeeper进行元数据管理和选举,新版使用KRaft(Kafka Raft)模式实现内置共识,无需Zookeeper。
三、常见问题及解决方案
1. 重复消费
问题描述:消费者消费消息后需发送确认消息(ACK)给消息队列,通知消息已被消费。若确认消息因网络故障等原因未送达,消息队列可能重复分发消息给其他消费者。
解决方案:
-
保证消息幂等性
:确保消息多次消费不影响结果。常见方法:
- 使用唯一消息ID,消费者检查是否已处理。
- 数据库操作使用唯一约束或版本号控制。
-
RocketMQ
- 消费者在业务逻辑处理完成后发送ACK,确保消息被正确消费。
- 使用事务性消息或本地事务状态表,防止重复消费影响业务。
-
RabbitMQ
- 采用手动确认模式(Manual ACK),处理消息成功后再回复确认。
- 消费者通过检查消息的唯一标识(如Message ID)避免重复处理。
-
Kafka
- 消费者管理Offset(消费偏移量),手动提交Offset(disable auto-commit)。
- 如果消费者崩溃未提交Offset,重启后从上次Offset消费,可能重复;通过幂等操作或Exactly-Once语义(启用idempotence)处理。
- Consumer Group中,Rebalance可能导致重复消费,使用唯一ID或状态存储(如数据库)确保幂等。
2. 数据丢失
问题描述:消息可能在生产者、消息队列或消费者端丢失,导致业务异常。
RocketMQ
- 生产者丢数据
- 使用同步发送(send()),同步感知发送结果,失败可重试(默认重试3次)。
- 失败消息存储在CommitLog中,支持后续重试。
- 消息队列丢数据
- 消息持久化到CommitLog,即使Broker宕机后重启,未消费消息可恢复。
- 支持同步刷盘(确保消息写入磁盘)和异步刷盘(高性能但可能丢失少量数据)。
- Broker集群支持1主N从,同步复制确保主节点磁盘故障不丢失消息,异步复制性能更高但有毫秒级延迟。
- 消费者丢数据
- 完全消费成功后发送ACK。
- 维护持久化的Offset记录消费进度,防止因故障丢失消费状态。
RabbitMQ
- 生产者丢数据
- 使用事务模式(支持回滚)或Confirm模式(ACK确认),确保生产者可靠发送。
- 消息队列丢数据
- 开启消息持久化,消息写入磁盘后通知生产者ACK。
- 配合Confirm机制,确保消息持久化到磁盘。
- 消费者丢数据
- 禁用自动确认模式,改为手动确认(Manual ACK),确保消息处理成功后再确认。
- 消费者维护消费状态,避免因故障重复消费或丢失。
Kafka
- 生产者丢数据
- 配置acks参数:acks=0(不确认,高性能可能丢失)、acks=1(Leader确认)、acks=all(所有副本确认,确保不丢失)。
- 启用重试和幂等生产者(enable.idempotence=true),防止重复发送。
- 消息队列丢数据
- 消息持久化到日志文件(Log Segments),支持配置保留策略(时间或大小)。
- 通过Replication Factor(副本因子)设置分区副本数,Leader-Follower机制确保高可用;min.insync.replicas配置最小同步副本数。
- Broker宕机时,副本可选举新Leader,消息不丢失(视副本配置)。
- 消费者丢数据
- 禁用自动提交Offset(enable.auto.commit=false),手动提交确保处理成功。
- 如果自动提交,处理中崩溃可能丢失消息;使用Exactly-Once语义结合事务处理。
3. 消费顺序
问题描述:某些业务场景要求消息按顺序消费,但分布式系统或多线程消费可能导致乱序。
解决方案:
- 单线程消费:保证队列内消息按顺序处理,但可能影响性能。
- 消息编号:为消息分配序列号,消费者根据编号判断顺序。
- Queue有序性
- 消息队列内部数据天然有序。
- 消费者端通过单线程消费或内存队列排序,确保顺序处理。
- RocketMQ
- 使用顺序消息(Sequential Message),将相关消息发送到同一分区,保证分区内顺序。
- 消费者单线程拉取并处理分区消息。
- RabbitMQ
- 利用Queue的FIFO特性,单线程消费确保顺序。
- 多线程消费时,消费者内部维护内存队列进行排序。
- Kafka
- 分区内消息严格有序(append-only日志),但多分区无全局顺序。
- 对于顺序需求,将相关消息发送到同一分区(基于键哈希)。
- 消费者组中,每个分区分配给单一消费者,确保分区内顺序;多线程消费需消费者内部协调。
4. 高可用
问题描述:消息队列需保证高可用,防止单点故障导致服务不可用。
RocketMQ
- 多Master模式
- 配置简单,性能最高。
- 单机宕机或重启期间,该机器未消费消息不可用,影响实时性。
- 可能有少量消息丢失(视配置)。
- 多Master多Slave异步模式
- 每Master配一个Slave,消息写入Master,异步复制到Slave。
- 性能接近多Master,实时性高,主备切换对应用透明。
- Master宕机或磁盘损坏可能丢失少量消息(毫秒级延迟)。
- 多Master多Slave同步模式
- 每Master配一个Slave,消息同步写入主备,成功后返回。
- 服务和数据可用性高,但性能略低于异步模式。
- 主节点宕机后,备节点无法自动切换为主,需人工干预。
RabbitMQ
- 普通集群模式
- 多台机器运行RabbitMQ实例,Queue仅存储在一个实例上,其他实例同步元数据。
- 消费时,若连接到非Queue所在实例,会从Queue所在实例拉取数据。
- 若Queue所在实例宕机,需等待其恢复(持久化消息不丢失),影响实时性。
- 镜像集群模式
- Queue的元数据和消息同步到多个实例,写入消息时自动同步到所有实例的Queue。
- 优点:高可用,单节点宕机不影响服务。
- 缺点:
- 数据同步导致性能开销大。
- 无法线性扩容,因每个节点存储全量数据,单节点容量受限。
Kafka
- 分布式Broker集群
- 通过分区和副本机制实现高可用,每个分区有多个副本分布在不同Broker。
- Leader选举由Controller(基于Zookeeper或KRaft)管理,Broker宕机时自动切换到Follower副本。
- 支持水平扩展,添加Broker可重新分配分区,提高吞吐量。
- 优点:高吞吐(百万级TPS),数据持久化强,适合大规模数据流。
- 缺点:配置复杂,依赖外部协调(如Zookeeper,KRaft缓解);实时性不如RabbitMQ,但延迟低(毫秒级)。
四、总结
- RocketMQ:适合高并发交易场景,强调性能和分布式架构,NameServer和Broker设计支持大规模集群。数据丢失防护完善,适合对实时性要求高的场景,但在同步模式下主备切换需人工干预。
- RabbitMQ:基于AMQP协议,灵活的路由机制适合复杂路由场景,易用性强。但镜像集群性能开销大,扩展性受限,适合中小规模应用。
- Kafka:专注于高吞吐量和数据流处理,分区机制支持并行消费和扩展,适合日志、大数据管道。但不原生支持复杂路由,顺序消费限于分区内,配置较复杂。
- 常见问题解决方案
- 重复消费:三者均通过幂等性和手动确认/提交Offset解决。
- 数据丢失:RocketMQ和Kafka通过主从/副本复制,RabbitMQ通过持久化和Confirm。
- 消费顺序:利用分区/队列有序性,结合单线程或键分区。
- 高可用:Kafka的分布式副本最强扩展性,RabbitMQ镜像集群数据一致性高,RocketMQ平衡性能与可靠性。
市场上几大消息队列对比如下:
对比项 | RabbitMQ | ActiveMQ | RocketMQ | Kafka |
---|---|---|---|---|
公司 | Rabbit | Apache | 阿里 | Apache |
语言 | Erlang | Java | Java | Scala & Java |
协议支持 | AMQP | OpenWire、STOMP、REST、XMPP、AMQP | 自定义 | 自定义协议,社区封装了 HTTP 协议支持 |
客户端支持语言 | 官方支持 Erlang、Java、Ruby 等,社区产出多种 API,几乎支持所有语言 | Java、C、C++、Python、PHP、Perl、.NET 等 | Java、C++(不成熟) | 官方支持 Java,社区产出多种 API,如 PHP、Python 等 |
单机吞吐量 | 万级(约 3 万) | 万级(约 4 万) | 十万级(约 10 万) | 十万级(约 10 万) |
消息延迟 | 微秒级 | 毫秒级 | 毫秒级 | 毫秒以内 |
可用性 | 高,基于主从架构实现可用性 | 高,基于主从架构实现可用性 | 非常高,分布式架构 | 非常高,分布式架构,一个数据多副本 |
消息可靠性 | - | 有较低概率丢失数据 | 经过参数优化配置,可做到零丢失 | 经过参数配置,可做到零丢失 |
功能支持 | 基于 Erlang 开发,并发性能极强,性能极好,延时低 | MQ 领域功能极其完备 | MQ 功能较为完备,分布式扩展性好 | 功能较为简单,主要支持基本 MQ 功能 |
优势 | Erlang 开发,性能极好、延时低,吞吐量万级,功能完备,管理界面优秀,社区活跃,互联网公司使用多 | 成熟稳定,功能强大,业内大量应用 | 接口简单易用,阿里出品,吞吐量大,分布式扩展方便,社区活跃,支持大规模 Topic 和复杂业务场景,可定制开发 | 超高吞吐量,毫秒级延时,极高可用性和可靠性,分布式扩展方便 |
劣势 | 吞吐量较低,Erlang 开发不易定制,集群动态扩展麻烦 | 偶尔有低概率消息丢失,社区活跃度不高 | 不遵循 JMS 规范,系统迁移需改大量代码,存在被替代风险 | 可能发生消息重复消费 |
应用 | 各类场景均有使用 | 主要用于解耦和异步,较少用于大规模吞吐 | 适用于大规模吞吐、复杂业务场景 | 大数据实时计算、日志采集等场景的业界标准 |
选择中间件的可以从这些维度来考虑:可靠性,性能,功能,可运维行,可拓展性,社区活跃度。目前常用的几个中间件,ActiveMQ作为“老古董”,市面上用的已经不多,其它几种:
RabbitMQ:
优点:轻量,迅捷,容易部署和使用,拥有灵活的路由配置
缺点:性能和吞吐量不太理想,不易进行二次开发
RocketMQ:
优点:性能好,高吞吐量,稳定可靠,有活跃的中文社区
缺点:兼容性上不是太好
Kafka:
优点:拥有强大的性能及吞吐量,兼容性很好
缺点:由于“攒一波再处理”导致延迟比较高
RocketMQ专栏
1. 推模式(Push)与拉模式(Pull)的区别与实现
推模式:RocketMQ 的 PushConsumer 实际基于长轮询(Long Polling)实现,Broker 收到请求后若队列无消息,会挂起请求并在新消息到达时立即响应。
拉模式:消费者主动拉取,需自行控制频率(如 DefaultLitePullConsumer),适用于需精准控制消费速率的场景。
对比:
推模式实时性高,但需 Broker 维护连接状态,可能因消费能力不足导致积压。
拉模式灵活性高,但需处理消息延迟与空轮询问题。
2. 如何保证消息顺序性?
生产者:通过 MessageQueueSelector 将同一业务 ID 的消息发送至固定队列(如哈希取模)。
消费者:使用 MessageListenerOrderly 监听器,锁定队列并单线程消费。
源码关键点:RebalanceLockManager 管理队列锁,确保同一队列仅被一个线程消费。
3. 事务消息的实现机制
两阶段提交:
发送 Half 消息(预提交),Broker 存储但暂不投递。
执行本地事务,返回 Commit/Rollback 状态。
Broker 根据状态投递或删除消息,若未收到确认则发起事务回查。
应用场景:跨系统分布式事务(如订单创建与库存扣减)。
4. 消息积压的解决方案
临时扩容:增加 Consumer 实例或线程数,提升消费能力。
批量消费:调整 consumeMessageBatchMaxSize 参数,一次处理多条消息。
跳过非关键消息:若允许部分消息丢失,可重置消费位点(resetOffsetByTime)。
异步处理:将耗时操作(如 DB 写入)异步化,减少消费阻塞。
5. 消息的存储结构是怎样的?CommitLog 和 ConsumeQueue 的关系?
CommitLog 存储原始消息,ConsumeQueue 存储逻辑队列的偏移量,通过偏移量快速定位消息。
6. Consumer 的负载均衡策略是什么?
平均分配、一致性 Hash 等,通过 RebalanceService 定时调整队列分配。
7. 如何实现消息的精准一次投递?
RocketMQ 不保证,需业务端结合事务消息 + 幂等性实现。
8. Broker 的刷盘机制如何选择?
高可靠性场景用 SYNC_FLUSH,高性能场景用 ASYNC_FLUSH。
9. NameServer 宕机后,Producer 和 Consumer 还能工作吗?
可以,客户端会缓存路由信息,但无法感知新 Broker 或 Topic 变化。
10. 性能调优
Broker 参数:
sendMessageThreadPoolNums:发送线程数。
pullMessageThreadPoolNums:拉取线程数。
零拷贝技术:通过 MappedFile 内存映射文件减少数据拷贝。
11. Broker 如何处理拉取请求?
长轮询机制:Consumer 拉取请求无消息时,Broker 挂起请求(默认 30s),新消息到达后立即响应。
源码关键点:PullRequestHoldService 管理挂起请求,通过 checkHoldRequest 周期性检查消息到达。
12. RocketMQ 消息存储结构:CommitLog 与 ConsumeQueue 的关系
CommitLog:所有 Topic 的消息按顺序追加写入,文件名格式为 {文件起始偏移量}.log,固定大小 1GB(可配置)。
ConsumeQueue:逻辑队列索引,存储消息在 CommitLog 中的偏移量、大小、Tag HashCode,文件名格式为 {Topic}/{QueueId}/{ConsumeQueueOffset}。
关系:消费者通过 ConsumeQueue 快速定位 CommitLog 中的消息,实现高效检索。
13. 主从同步机制(SYNC/ASYNC)的区别与选型
SYNC_MASTER:
生产者收到 Slave 写入成功 ACK 后才返回,保证数据强一致。
适用场景:金融交易、资金扣减。
ASYNC_MASTER:
主节点写入成功即返回,Slave 异步复制,性能更高。
适用场景:日志传输、允许短暂不一致。
14. 消息重试与死信队列(DLQ)机制
重试队列:消费失败的消息进入重试队列(命名格式:%RETRY%{ConsumerGroup}),按延迟等级(1s, 5s, 10s…)重试。
死信队列:重试 16 次后仍失败,消息进入死信队列(%DLQ%{ConsumerGroup}),需人工处理。
配置参数:maxReconsumeTimes(默认 16 次)。
15. 如何实现消息轨迹(Trace)追踪?
开启方式:Broker 配置 traceTopicEnable=true,Producer/Consumer 设置 enableMsgTrace=true。
原理:消息发送/消费时,额外生成轨迹数据写入内部 Topic RMQ_SYS_TRACE_TOPIC。
查询工具:RocketMQ Console 或自定义消费者订阅轨迹 Topic。
16. Rebalance 机制如何工作?
触发条件:Consumer 数量变化、Broker 上下线、Topic 路由变更。
流程:
客户端定时向 Broker 发送心跳,上报 Consumer Group 信息。
Broker 通过 RebalanceService 计算队列分配策略(平均分配、一致性 Hash)。
Consumer 根据新分配结果调整拉取队列。
源码入口:RebalanceImpl#rebalanceByTopic。
17. RocketMQ 5.0 新特性(如 Proxy 模式)
Proxy 模式:解耦 Broker 与客户端协议,支持多语言客户端(如 HTTP/gRPC),增强云原生兼容性。
事务增强:支持 TCC 模式,提供更灵活的事务解决方案。
轻量级 SDK:简化客户端依赖,提升启动速度。
三、高级特性与源码原理
18. 零拷贝技术
RocketMQ:使用 mmap 内存映射文件,减少用户态与内核态数据拷贝。
Kafka:采用 sendfile 系统调用,实现更高吞吐但灵活性较低。
19. DLedger 高可用机制
基于 Raft 协议实现主从选举,主节点故障时自动切换,保障数据一致性。
20. 消息过滤
Tag 过滤:Broker 端过滤,减少网络传输。
SQL 过滤:需开启 enablePropertyFilter=true,支持复杂条件匹配。
21. 事务消息实现细节
两阶段提交:
发送 Half 消息(预提交),Broker 存储但暂不投递。
执行本地事务,返回 Commit/Rollback 状态。
Broker 根据状态投递或删除消息,若未收到确认则发起事务回查。
源码分析:TransactionMQProducer 处理本地事务回调,TransactionalMessageService 管理事务状态。
22. 消息索引文件(IndexFile)的作用
存储结构:哈希索引(Key: Message Key, Value: CommitLog Offset)。
用途:通过 Message Key 或 Unique Key 快速查询消息,支持按时间范围检索。
源码类:IndexService, IndexFile。
23. PageCache 与 Mmap 如何提升性能?
PageCache:利用操作系统缓存,将磁盘文件映射到内存,加速读写。
Mmap:通过内存映射文件,避免 read()/write() 系统调用的数据拷贝,提升 CommitLog 写入效率。
刷盘策略:SYNC_FLUSH(同步刷盘)依赖 FileChannel.force(),ASYNC_FLUSH 使用后台线程批量刷盘。
24. 消息消费位点(Offset)管理机制
本地存储:Consumer 默认将 Offset 存储在本地文件(~/.rocketmq_offsets)。
远程存储:集群模式下,Offset 上报至 Broker(ConsumerOffsetManager)。
重置方式:
CONSUME_FROM_LAST_OFFSET:从最大位点开始消费。
CONSUME_FROM_FIRST_OFFSET:从最小位点开始消费。
25. 消息索引文件(IndexFile)的作用
存储结构:哈希索引(Key: Message Key, Value: CommitLog Offset)。
用途:通过 Message Key 或 Unique Key 快速查询消息,支持按时间范围检索。
源码类:IndexService, IndexFile。
26. PageCache 与 Mmap 如何提升性能?
PageCache:利用操作系统缓存,将磁盘文件映射到内存,加速读写。
Mmap:通过内存映射文件,避免 read()/write() 系统调用的数据拷贝,提升 CommitLog 写入效率。
刷盘策略:SYNC_FLUSH(同步刷盘)依赖 FileChannel.force(),ASYNC_FLUSH 使用后台线程批量刷盘。
27. 消息消费位点(Offset)管理机制
本地存储:Consumer 默认将 Offset 存储在本地文件(~/.rocketmq_offsets)。
远程存储:集群模式下,Offset 上报至 Broker(ConsumerOffsetManager)。
重置方式:
CONSUME_FROM_LAST_OFFSET:从最大位点开始消费。
CONSUME_FROM_FIRST_OFFSET:从最小位点开始消费。
场景设计题
1 .设计一个高并发秒杀系统,如何利用 RocketMQ 优化?
流量削峰:将秒杀请求写入 RocketMQ 队列,异步处理订单创建与库存扣减。顺序消息:使用哈希选择器将同一用户请求路由到固定队列,避免超卖。事务消息:扣减库存与生成订单通过事务消息保证最终一致性。动态扩容:根据监控指标(如堆积消息数)自动扩容 Consumer,快速消化积压
2 . 设计一个秒杀系统,如何用 RocketMQ 解决超卖问题?
消息队列削峰填谷 + 数据库乐观锁 + 事务消息保证最终库存一致。
3 . 如何实现分布式事务(订单扣库存+生成订单)?
事务消息:半消息预扣库存,本地事务生成订单,失败则回滚库存。
4.如何设计一个异地多活消息队列系统?
跨城同步:Broker 集群分机房部署,通过 Async replication 同步消息。单元化路由:Producer 根据用户 ID 哈希选择本地机房 Broker,减少跨城延迟。容灾切换:监控机房状态,自动切换消息路由至可用机房。
5.消息丢失的可能原因与解决方案
生产者丢失:原因:异步发送未处理 SendCallback 异常。解决:使用同步发送 + 重试机制。Broker 丢失:原因:异步刷盘时宕机,PageCache 数据未落盘。解决:SYNC_FLUSH 刷盘 + 主从同步。消费者丢失:原因:消费成功但 Offset 未提交。解决:先处理业务逻辑,再手动提交 Offset。、
Demo实操
- 引入依赖
<dependencies><!-- RocketMQ 客户端 --><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.4.0</version></dependency><!-- Spring Boot Web --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency>
</dependencies>
2. 配置文件 application.yml
yamlrocketmq:namesrvAddr: 127.0.0.1:9876producer:group: test-producer-groupconsumer:group: test-consumer-grouptopic: TestTopic
3. 生产者配置类
javapackage com.example.demo.config;import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RocketMQProducerConfig {@Value("${rocketmq.namesrvAddr}")private String namesrvAddr;@Value("${rocketmq.producer.group}")private String producerGroup;@Beanpublic DefaultMQProducer mqProducer() throws MQClientException {DefaultMQProducer producer = new DefaultMQProducer(producerGroup);producer.setNamesrvAddr(namesrvAddr);producer.start();System.out.println("🚀 RocketMQ Producer 启动成功");return producer;}
}
4. 生产者发送接口
javapackage com.example.demo.controller;import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.*;import java.nio.charset.StandardCharsets;@RestController
@RequestMapping("/mq")
public class ProducerController {@Autowiredprivate DefaultMQProducer producer;@Value("${rocketmq.topic}")private String topic;@PostMapping("/send")public String sendMessage(@RequestParam String msg) throws Exception {Message message = new Message(topic, msg.getBytes(StandardCharsets.UTF_8));SendResult result = producer.send(message);return "发送成功: " + result;}
}
5. 消费者配置类
java复制编辑
package com.example.demo.config;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.nio.charset.StandardCharsets;
import java.util.List;@Configuration
public class RocketMQConsumerConfig {@Value("${rocketmq.namesrvAddr}")private String namesrvAddr;@Value("${rocketmq.consumer.group}")private String consumerGroup;@Value("${rocketmq.topic}")private String topic;@Beanpublic DefaultMQPushConsumer mqConsumer() throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);consumer.setNamesrvAddr(namesrvAddr);consumer.subscribe(topic, "*");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {String body = new String(msg.getBody(), StandardCharsets.UTF_8);System.out.println("📩 收到消息: " + body);}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.println("✅ RocketMQ Consumer 启动成功");return consumer;}
}
最后有兴趣可以尝试自动重试,TraceId 追踪, 异步发送, 批量发送