目录
一、概要介绍
二、架构与原理
三、消费模式
1、Kafka—纯拉模式
2、RocketMQ—拉模式
3、RocketMQ—推模式
4、模式对比
四、特殊消息
1、顺序消息
2、消息过滤
3、延迟消息
4、事务消息
5、广播消息
五、高吞吐
六、高可用
七、高可靠
一、概要介绍
- Apache Kafka
- 分布式、多区、多副本、基于 ZooKeeper 协调的消息系统
- 核心组件:ZooKeeper、Broker、Producer、Consumer
- 特点:高吞吐(顺序写+零拷贝)、低延迟、易扩展
- Apache RocketMQ
- 分布式、轻量 NameServer路由、主从多副本、阿里巴巴开源的消息引擎
- 核心组件:NameServer、Broker、Producer、Consumer
- 特点:原生事务消息、定时/延迟投递、灵活顺序消费
二、架构与原理
1、Kafka
- 服务设计
- 目标:提供可水平扩展的分布式提交日志服务,实现高吞吐、低延迟、容错与持久化;
- 分层:
- 存储层:基于 CommitLog 的顺序写入,利用零拷贝提高磁盘 I/O 效率;
- 计算层:Producer/Consumer、Kafka Streams、Connect API,支持流处理与数据集成。
- 核心组件
- ZooKeeper:集群元数据管理、Leader 选举与配置存储;
- Broker:Kafka 服务器节点,负责接收、存储、分发消息;
- Producer:消息生产方,将数据发送至指定 Topic;
- Consumer:消息消费方,通过拉模型按 Offset 拉取数据。
- 架构要点
- Topic & Partition:Topic 逻辑分组,Partition 物理子分片,支持并行与负载均衡;
- Replication:每个 Partition 在多个 Broker 上拥有 Leader/Follower 副本,实现高可用;
- Controller:集群内唯一的 Controller Broker,负责分区分配与故障检测。
- 工作原理
- 生产流程:Producer 从 ZooKeeper 或 Controller 获取 Topic 路由信息 —> 选择 Partition —> 顺序写入 CommitLog —> 返回 Offset;
- 复制流程:Leader 接收写入请求 —> 同步写入本地 —> Follower 拉取并写入 —> 保持 ISR(In-Sync-Replicas);
- 消费流程:Consumer Group 拉取 Topic 路由 —> 各 Consumer 按分区拉取消息 —> 根据 Offset 进行位点提交与 Rebalance
2、RocketMQ
- 服务设计
- 目标:提供高可靠、低延迟、灵活多样的消息中间件,强调事务、顺序与延迟消息;
- 轻量化:去中心化 NameServer,无状态设计,简化元数据管理。
- 核心组件
- NameServer:服务发现与路由注册中心,无需持久化,节点间不通信,水平扩展简单
- Broker:消息存储与转发节点,分 Master/Slave 部署,顺序写 CommitLog,并维护 ConsumeQueue 索引;
- Producer:通过 Netty 连接 NameServer —> 获取路由 —> 发送消息至 Broker,可选择同步/异步/单向模式;
- Consumer:支持 Push/Pull 模型,可配置集群消费或广播消费,消费后向 Broker ACK,并可重试或进入私信队列。
- 架构要点
- Topic & Queue:Topic 逻辑分类,Queue 无力分片;多 Queue 支持并行消费与顺序消费;
- Replica & HA:Master/Slave 同步或异步复制,故障自动切换保证可用;
- 事务 & 定时:原生事务消息二阶段提交;内置定时/延迟投递机制,无需额外组件。
- 工作原理
- 注册与心跳:Broker 启动向所有 NameServer 注册并定期心跳;Producer/Consumer 启动向 NameServer 获取路由并心跳;
- 生产流程:Producer 获取 Topic-Queue 路由 —> 选择 Broker Master —> 顺序写入 CommitLog —> 更新 ConsumeQueue;
- 消费流程:Consumer 拉取路由 —> 对应 Queue 发起 Pull 请求或 Broker 主动 Push —> 处理消息后 ACK —> Broker 更新消费进度。
三、消费模式
- Kafka只支持拉模式(pull):应用线程主动调用 poll(),并在业务中自行控制拉取节奏、批量大小和offset提交,适合对消费流程有高度可控需求的场景;
- RocketMQ既支持拉模式(PullConsumer)也支持推模式(PushConsumer):
- pull模式下,开发者同样需要掌握全部拉取逻辑;
- push模式下,SDK内部自动拉取并通过回调线程池分发,自动流控和失败重试,适合简洁快速上手的场景。
1、Kafka—纯拉模式
- 实现原理
- 客户端通过 KafkaConsumer.poll(timeout) 向Broker拉取批量消息;
- 内部心跳线程负责与协调器(Coordinator)保持心跳,监测自身在消费组的存活;
- 收到消息后,业务线程处理并显示调用 consumer.commitSync() 或 commitAsync() 提交 offset
- 工作机制
- 双线程模型:心跳线程+应用线程;
- Rebalance:当组内成员变动或分区列表更新时,Coordinator触发再平衡,重新分配分区;
- 长轮询:fetch.min.bytes + fetch.max.wait.ms 控制空拉等待,减少空结果
- 特性
- 可控性最强:拉取时机、批量大小、并行度、重试、Offset提交都由应用决定;
- 灵活度高:可与自定义线程池、业务逻辑无缝结合;
- 开发成本:需额外管理心跳、Rebalance异常、消费节奏与错误重试;
- 使用场景:大数据实时ETL、流计算框架接入、需要精细消费控制的中间件或网关。
2、RocketMQ—拉模式
- 实现原理
- 通过 DefaultMQPullConsumer.pull() 或 pullBlockIfNotFound(...) 向 Broker 拉取;
- 无内置心跳线程,由应用线程自行循环调用;
- 拉到后需调用 updateConsumeOffset() 显示提交消费进度
- 工作机制
- 应用线程完全掌控:何时拉、拉多少、何时提交、何时休眠;
- 阻塞拉取:pullBlockIfNotFound 在无新消息时阻塞,避免频繁空拉;
- 全流程自定义:可插入本地缓冲、线程池或异步处理逻辑;
- 特性
- 高度自由:可灵活实现批次消费、事务边界、动态backoff策略;
- 开发负担:需开发者处理循环、节奏控制、重试与流控;
- 使用场景:对消费节奏、回溯逻辑、负责业务处理由深度定制需求时。
3、RocketMQ—推模式
- 实现原理
- SDK 启动后,内部拉取线程组不断调用 pullBlockIfNotFound;
- 将拉到的消息按批次交给回调线程池执行 MessageListener;
- Listener 返回 CONSUME_SUCCESS 时,SDK 自动提交 offset;返回 RECONSUME_LATER 则重试。
- 工作机制
- 双线程池模型
- 拉取线程组:负责持续拉取
- 回调线程池:大小在 consumeThreadMin - consumeThreadMax 之间动态伸缩
- 内置流控:当队列积压超过 pullThresholdForQueue 时,自动暂停拉取;
- 自动重试:失败后按 maxReconsumeTimes 和重试延迟自动重试。
- 双线程池模型
- 特性
- 开箱即用:业务层仅关注 MessageListener,无需关心循环、心跳、提交和流控;
- 稳定性好:SDK 层面提供失败隔离、重试、流控、并发管理;
- 使用场景:微服务快速集成、对消费可靠性和简洁化要求较高的业务。
4、模式对比
维度 | Kafka(拉) | RocketMQ(拉) | RocketMQ(推) |
---|---|---|---|
线程模型 | 心跳线程+应用线程 | 应用线程 | SDK拉取线程组+回调线程池 |
触发消费 | 主动 poll() | 主动 pull() / pullBlockIfNotFound() | 注册 Listener,SDK 自动拉取并回调 |
Offset 提交 | 自动/手动提交到 _consumer_offsets | 应用手动 updateConsumeOffset() | 回调返回 CONSUME_SUCCESS 即提交 |
流控 & 重试 | 无内置流控;需应用处理 | 无内置流控;需应用处理 | 内置流控阈值 & 重试机制 |
并发 & 批量 | max.poll.records + 长轮询 | batchSize + 自定义 sleep/阻塞 | pullBatchSize/consumeMessageBatchMaxSize + 线程池 |
开发复杂度 | 高:需管理心跳、冲平衡、提交 | 中:需管理拉取循环 & 提交 | 低:只需关注业务逻辑 |
适用场景 | 流计算、大数据ETL、细粒度控制 | 复杂消费逻辑、定制化回溯、事务场景 | 微服务快速集成、标准业务无特殊需求 |
四、特殊消息
1、顺序消息
- Kafka的顺序消息只能在单个分区内保证严格顺序,通过消息key或者自定义分区器将相关消息路由到同一个分区,再由消费者单线程按 Offset 顺序消费;
- RocketMQ除了类似的“单队列”顺序外,还通过 MessageQueueSelector 实现“分区顺序”(局部有序),并在 Broker 端对队列级别加锁,保证每个队列在同一时刻只有一个线程消费。
(1)Kafka
全局顺序(Global Order)
- 实现:将 topic 配置为只有一个 partition,并且只有1个消费者实例,即可在全局上保持顺序
- 性能折中:吞吐与并行度大幅下降,因为只有单分区、单线程在跑
分区内顺序(Partition Order)
- 实现原理:Kafka 只在单个 Partition 内保证顺序,所有写入该 Partition 的消息按 Offset 严格有序
- 消息路由:
- Key分区:ProducerRecord 构造时带 key,DefaultPartitioner 会把相同 key 的消息路由到同一分区
- 自定义分区器:实现 Partitioner 接口,覆盖 partition() 方法
// Producer
// 构造 ProducerRecord(topic, key, value) → 客户端调用 send(record)。
// DefaultPartitioner.partition(topic, keyBytes, ...) 内部做:
int numPartitions = partitionsFor(topic).size();
return (Utils.murmur2(keyBytes) & 0x7fffffff) % numPartitions;// 自定义分区器 实现 org.apache.kafka.clients.producer.Partitioner 接口:
public class MyPartitioner implements Partitioner {@Overridepublic int partition(String topic, Object key, byte[] keyBytes,Object value, byte[] valueBytes, Cluster cluster) {// 比如按用户ID取模return Math.abs(key.hashCode()) % cluster.partitionsForTopic(topic).size();}
}// 客户端配置:
partitioner.class=com.example.MyPartitioner// 这样所有 producer.send() 时都会走你的 partition() 逻辑,在发送前就决定好了分区。
// Producer 的路由逻辑在调用 send() 时就完成了,消费者不会再“路由”——它只会拉取自己所属的分区。// Comsuner
// 分区分配:Consumer Group 启动后,协调出每个实例各自负责哪些分区(由协调器 / rebalance 算法决定)。
consumer.subscribe(Arrays.asList("my-topic"));
while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {// 这里 record.topic(), record.partition(), record.offset() 是固定的}consumer.commitSync();
}// 总结:Producer 决定“放到哪个分区”;Consumer 决定“从哪些分区拉”。中间没有二次路由。
(2)RocketMQ
全局顺序(Global Order)
- 实现:Topic 只能配置1个队列,1个消费者实例;所有消息都发送到同一队列,单线程消费
// 发送端:只定义一个队列
producer.send(message, new MessageQueueSelector() {public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {return mqs.get(0); // 始终选择第一个队列}
}, null);
// 消费端:使用顺序监听器
consumer.registerMessageListener(new MessageListenerOrderly() { ... });
分区顺序(Partition Order)
- 实现:根据业务的 Sharding Key 选队列,同一 Key 的消息走同一队列;消费者可多实例,每个实例独占若干队列。
producer.send(msg, (mqs, msg, key) -> {int index = key.hashCode() % mqs.size();return mqs.get(index);
}, orderId);
consumer.registerMessageListener(new MessageListenerOrderly() { ... });
队列级锁:
Broker 在消费每个队列前会加锁,保证同一时刻只有一个线程消费该队列,顺序不会被打乱。
总结:
- 全局顺序:topic 只配1个队列,并且消费者实例数=1;所有消息都跑到同一个队列;
- 分区顺序:topic配多个队列;Producer 根据 Sharding Key 选定队列;多个消费者实例,各自独占若干队列;
- 队列级锁
- Broker端:收到消费者订阅请求后,为每个 MessageQueue(队列)维护一把分布式锁;
- 加锁时机:当消费者实例调用 MessageListenerOnderly 顺序消费时,客户端会向 Broker 请求“对某个队列(MessageQueue)加锁”。加锁成功后,该实例再整个批次(ConsumeOrderlyBatchMaxSize)乃至单条消息逐个处理完毕并显示提交ACK之前,不会释放锁;
- 锁的生命周期:
- 获取锁 —> 开始 pull 并依次交给 listener 处理
- 处理完成所有拉到的消息后 —> 主动释放锁
- 如果在 consumeOrderlyTimeout 时间内未能完成处理并释放锁,Broker 会强制过期这把锁,然后其他实例才能继续抢锁消费
- Broker 再将队列分配给下一个有空闲线程的消费者实例
- 消费模式:一条一条拉取 vs 批量拉取
- 批量拉取:默认 RocketMQ 顺序消费每次 pull 一批消息(可配置),但客户端内部会将这批消息逐条调用 MessageListenerOrderLy
- 单条消费:你的 listener 也可以每次只处理一条,处理完再 ack。只要队列锁未释放,就会继续拉取下一条。
2、消息过滤
- RocketMQ 原生支持服务端过滤(Tag过滤、SQL92属性过滤)和消费端多虑,可以在Broker上就把不感兴趣的消息“丢掉”,降低网络与客户端压力;
- Kafka本身不支持Broker层的消息内容过滤,只能靠消费端过滤或借助 Kafka Streams/KSQL 等流处理层来实现,真正的筛选总是在消费者一侧完成。
(1)Kafka
消费端过滤
消费者在 poll() 后,根据消息内容自行判断;无用的消息也会进入消费端
Kafka Streams / KSQL 过滤(附加层)
如果需要在 Broker 与下游业务之间做过滤“类似”服务端效果,可部署 Kafka Streams 或 ksqlDB:
// Kafka Streams 示例
KStream<String, Order> orders = builder.stream("TopicTest");
KStream<String, Order> filtered = orders.filter((k, v) -> v.getStatus().equals("CREATED"));
filtered.to("TopicCreatedOrders");
将过滤结果写入新的 Topic,消费者再只订阅 TopicCreatedOrders。
缺点:需要额外部署流处理集群,消息经过二次写入才可消费。
(2)RocketMQ
Tag过滤(最轻量)
原理:给消息打上一个或多个Tag(标签),Broker 为每个队列维护 Tag 到消息位置的简单索引;
服务端:
// Producer 发送时指定 Tag
Message msg = new Message("TopicTest", "TagA", "OrderID001", body);
producer.send(msg);
Broker 在存储时,会同时在消费队列的索引中记录该消息的 Tag;当消费者订阅 TagA 时,只扫描索引取出带有 TagA 的消息。
消费端:
// Consumer 订阅时指定 Tag 过滤
consumer.subscribe("TopicTest", "TagA || TagB");
消费者只拉取包含 A 或 B 的消息,不会接收到其它 Tag 的数据。
SQL92 属性过滤(精细化)
原理:消息可附带任意用户属性(key-value),Broker在存储时把这些属性信息也写入 ConsumeQueue 的二级索引;订阅时可传入 SQL 表达式,Broker 会对索引做简单的条件匹配,只返回符合条件的消息。
生产端:
Message msg = new Message("TopicTest", // topic"TagA", // tagbody);
msg.putUserProperty("orderStatus", "CREATED");
producer.send(msg);
消费端:
// 服务端过滤
consumer.subscribe("TopicTest",MessageSelector.bySql("orderStatus = 'CREATED' AND price > 100"));
这样只有属性条件满足的消息才会被拉取到客户端,极大减少网络流量与客户端CPU消耗。
3、延迟消息
- RocketMQ 原生支持延迟消息,通过将消息先存入内部特殊的调度主题 SCHEDULE_TOPIC_XXXX,并由18个定时任务定期扫描、判断、转发到目标 Topic,来实现任意一级延迟;
- Kafka 核心并不内置延迟消息能力,仅能借助外部定时重投、Kafka Streams 定时器或延迟队列 Topic 等手段来模拟延迟发送。
(1)Kafka
Kafka API 中没有类似 RocketMQ 的“延迟等级”或“调度主题”机制;Producer 的 send()
方法只能指定目标 Topic、Partition、Key、Timestamp 等字段,无法内置延迟投递。
(2)RocketMQ
延迟等级与配置:
- 默认 18 个等级:从 1s、5s、... 到 2h;
- 可在 broker.conf 中通过 messageDelayLevel=... 自定义对应的延迟时间,并在 Broker 启动时加载到 DelayLevelTable
消息流转:
- 生产者发送带延迟级别的消息:
Message msg = new Message(topic, tags, body); // 发送时指定延迟级别 3(10s) SendResult res = producer.send(msg, 3);
- 存储到调度主题:Broker 不直接存入目标 Topic,而是写入内部主题 SCHEDULE_TOPIC_XXXX 对应的第3号队列(延迟级别队列)
- 定时调度:RocketMQ启动后,会为每个延迟等级启动一个定时任务(默认每100ms执行一次),执行:
for each delayLevel in DelayLevelTable:queueId = delayLevel.index - 1nextOffset = topicScheduleMessageService.fetchOffset(queueId)msgExt = scheduleMessageStore.getMessage(queueId, nextOffset)if msgExt.storeTimestamp + delayTime(delayLevel) <= now:// 时间到了,投递到原 TopicbrokerController.getMessageStore().putMessage(clone msg with real Topic)scheduleMessageStore.updateOffset(queueId, nextOffset + 1)
- 转发到真实Topic:调度任务将消息“透传”到原本的 Topic 和队列,消费者就像正常消费普通消息一样接收。
关键点:
- 调度主题不可见:客户端无法直接订阅 SCHEDULE_TOPIC_XXXX;
- 定时队列指针:每个等级维护偏移量,下次从上次结束处继续扫描;
- 并发调度:18个任务并行处理不同等级,互不影响;
- 可调整精度:通过修改定时任务间隔(20ms -> 100ms)与延迟级别时间,可在毫秒级到小时级自由配置。
4、事务消息
- RocketMQ 原生支持事务消息,通过“半消息+本地事务+回查”三步走,实现了本地事务与消息投递的原子性,保证“订单写入成功 <=> 消息投递成功”同步一致。
- Kafka 从0.11版本起也提供了事务API,支持 Producer 端在多个分区和多个 Topic 上的 Exactly-Once 语义,但其实现机制与 RcoketMQ 略有差异。
(1)Kafka
- 核心机制:事务协调器(Transaction Coordinator)
- 幂等生产者:开启 enable.idempotence=true 后,Kafka 客户端自动为每条消息分配序列号,Broker 去重;
- 事务 API:
- initTransactions() 初始化与协调器的连接;
- beginTransaction() 开启事务;
- 多次 send() 到同一个或多个 Topic/Partition;
- 成功阶段:commitTransaction() —> 协调器写入事务完成标记;
- 失败阶段:abortTransaction() —> 丢弃事务内消息;
- Exactly-Once:消费者需配置 isolation.level=read_committed,只读已提交事务消息。
- 实现细节与限制
- 事务协调器:部署在 Kafka Broker 上,每个事务由协调器管理;
- 事务范围:同一 Producer 实例内,多个 Topic/Patition;
- 幂等与事务耦合:Producer 自动在幂等基础上开启事务,确保无重复并原子提交;
- 性能考量:
- 事务内消息在提交前对消费者不可见,需额外的协调开销;
- 事务跨分区会触发协调器同步,增加延迟。
(2)RocketMQ
- 基本模型:二阶段提交
- Prepare(半消息):Producer调用 sendMessageInTransaction(msg, arg),Broker 接收并保存为“半消息”(不可见给消费者);
- 执行本地事务:Producer 在本地执行业务(如下单、扣库存),代码在 executeLoacalTransaction() 中完成,返回 COMMIT 或 ROLLBACK;
- Commit / Rollback:Producer 调用 endTransaction() 通知 Broker:
- COMMIT_MESSAGE —> Broker 将半消息变为可投递消息;
- ROLLBACK_MESSAGE —> Broker 删除半消息,不投递。
- 事务回查:若 Producer 在超时时间内未调用 endTransaction() (网络异常、进程崩溃等),Broker 会定期发起回查请求,Producer 的 checkLocalTransaction(MessageExt msg) 负责读本地状态(持久化的事务记录)并返回状态,最终保证“悬挂”事务决断。
- 实现细节与关键点
- 半消息存储:写入到内部主题 RMQ_SYS_TRANS_HALF_TOPIC,并带上唯一 transactionId、msg.getKeys() 或 userProprety,用于回查定位;
- 事务状态持久化:强烈建议将本地事务状态(如 orderId —> SUCCESS/FAILED)持久化到数据库,避免 JVM 崩溃后丢失状态;
- 幂等与安全:
- checkLoaclTransaction() 可能被多次或并发调用,务必保证幂等、安全;
- 回查返回 UNKNOW 可让 Broker 继续重试,以防业务系统暂时无法判断。
- 回查频率:Broker 后台线程定期(默认每30s)检查未决事务,直到收到明确状态;
- 性能影响:
- 半消息写两次磁盘:一次 Prepare,一次 Commit;
- 持久化事务记录与幂等回查的额外 DB 访问。
5、广播消息
- RocketMQ原生支持“广播消息”模式,只要将消费者的 MessageModel 设置为 BROADCASTING,就能让同一消费组内的所有实例都各自收到并消费每一条消息;
- Kafka 不支持同组内广播。
(1)Kafka
Kafka的 Consumer Group 设计目标就是“同组内消息分发到单实例”,不支持组内广播。
(2)RocketMQ
原理剖析
-
消费模式切换
- 默认是集群模式(CLUSTERING):同组内消息只会被一个实例消费;
- 广播模式(BROADCASTING):同组内每个实例都能消费每条消息。
- 消费位点存储
- 集群模式:位点(offset)由 Broker 统一维护;
- 广播模式:位点由客户端本地保存(通常是本地文件或内存,默认是 ${user.home}/rocketmq_offsets/broadcast_group/),每次拉取后会持久化
- 消息分发:Broker 在推送消息时,不检查该组中其他实例的消费状态,只要实例在线并订阅,就逐条下发;
- 故障与重启:
- 如果本地已有保存的 offset,消费者重启后会从该 offset 继续消费(而非从0开始)
- 如果本地尚无任何 offset(首次启动或数据目录被清空),将根据 consumeFromWhere 配置决定起点(如果没有显示的配置 consumeFromWhere,RMQ的广播消费者在首次启动时会使用默认策略
CONSUME_FROM_LAST_OFFSET,只从最新消息之后开始消费,不会重跑历史消息)
:// 默认为 CONSUME_FROM_LAST_OFFSET consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); // 可选 CONSUME_FROM_FIRST_OFFSET 或 CONSUME_FROM_TIMESTAMP
- 所以只要不删除本地 offset 文件,每次重启都能接着上次进度继续。
使用方式
// 1. 创建 Consumer 并设置为广播模式
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("broadcast_group");
consumer.setNamesrvAddr("127.0.0.1:9876");
// 切换为广播模式
consumer.setMessageModel(MessageModel.BROADCASTING);
// 订阅 Topic
consumer.subscribe("TopicBroadcast", "*");
// 注册消息监听
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {for (MessageExt msg : msgs) {// 逐条处理System.out.printf("实例[%s] 收到消息: %s%n",InetAddress.getLocalHost().getHostName(),new String(msg.getBody(), StandardCharsets.UTF_8));}// 手动 Ackreturn ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
- 手动 ACK:广播模式下,Broker 不等待 ACK,但客户端仍可通过返回 CONSUME_SUCCESS 或抛出异常来触发本地重试/死信机制;
- 重试与DLQ:当消费失败时,客户端可自行重试,或配置 maxReconsumeTimes 后转入死信队列,保证至少一次投递;
- 位点回溯:可在客户端启动时调用 seek() 或设置 consumeFromWhere 为 CONSUME_FROM_TIMESTAMP,回溯到指定时间点。
- 广播模式默认不会自动回溯N条,只在首次无位点时根据 consumeFromWhere 决定起点
- 若需“回溯N条”,手动读取并修改本地 offset;
- 对于集群模式,推荐使用 mqadmin resetOffsetByTime 或 updateConsumerOffset 工具按时间或按条数统一重置
确保消息不丢的最佳实践
- 冗余发送:在 Broker 端开启主从同步刷盘(SYNC_FLUSH),避免挂掉导致消息丢失;
- 定期 pull:消费者使用 pull 模式定期拉取,降低网络抖动带来的漏拉消费;
- 本地记录:消费端可维护“已消费消息ID”列表,配合去重,防止重复或漏消费;
- 重平衡回溯:在实例掉线后重启,主动拉取未消费消息,或回溯N条,确保每条都被消费过
五、高吞吐
在高吞吐领域,Kafka 和 RocketMQ 都通过顺序I/O、索引结构、批量处理和并行化等手段提升性能,但两者的实现思路和侧重点有所差异。Kafka凭借操作系统零拷贝、Page Cache 深度利用、超大分区并行和批量压缩等优化,使其在同等硬件下吞吐能力往往高于 RocketMQ。
1、Kafka
- 顺序读写与 Page Cache
- 顺序写入 CommitLog:所有消息追加到分区对应的文件(Segment)末尾,避免随机 I/O;
- 操作系统 Page Cache:Kafka 不在 JVM 堆内维护大型缓存,而是让 OS 把磁盘数据缓存到内核空间,读写几乎在内存完成,降低 GC 影响;
- 零拷贝(Zero-Copy):sendfile() 系统调用:在消费端,数据从 Page Cache 直接推送到网络 Socket,跳过用户态缓冲区复制,减少上下文切换和内存拷贝次数;
- 分区与分段(Partition & Segment)
- 多分区并行:每个 Topic 可以拥有成百上千个分区,不同分区并行处理,线上集群可轻松横向扩展;
- Segment切片:每个分区又按固定大小(如1GB)切分为多个 Segment,提供文件滚动和索引机制,保证单个文件可控切快速查找。
- 批量与压缩
- 批量发送/拉取:Producer linger.ms + batch.size,Consumer fetch.min.bytes + fetch.max.wait.ms,都鼓励打包处理,提高 I/O 效率;
- 压缩算法:Snappy、LZ4 等实时压缩,在保证低延迟到同时显著降低网络贷款与磁盘吞吐压力。
- 索引与快速定位:Offset索引:每个 Segment 维护内存映射的 Offset 索引,消费者仅需根据 Offset 直接定位文件偏移,无需扫描。
2、RocketMQ
- CommitLog + 内存映射(mmap)
- 单一 CommitLog:所有消息顺序写入同一组文件,简化存储逻辑并提升写入吞吐;
- 内存映射文件:Broker 端通过 Java MappedByteBuffer 将文件映射到内存,写入与刷盘性能接近本地内存操作。
- 消费索引:ConsumeQueue 与 IndexFile
- ConsumeQueue:为每个 Topic-Queue 维护轻量级索引文件,记录物理偏移量、消息大小等,消费者拉取时只查索引定位,无需遍历主日志;
- IndexFile:可选的二级索引,允许按消息 Key 快速检索,适合少量热点查询。
- 批量与刷盘策略
- 批量写入:Producer 支持批量发送,Broker 端累积后一次持久化;
- 灵活刷盘:同步(SYNC_FLUSH)与异步(ASYNC_FLUSH)可选,允许在可靠性和吞吐之间做权衡。
- 分区并行(Queue)
- 多队列并行:每个 Topic 可有多个队列,消费者实例可以并行拉取不同队列,加大并发吞吐;
- 消息路由灵活:通过 MessageQueueSelector 能根据业务 Key 按需分区,既保证顺序又并行。
3、关键差异对比
优化点 | Kafka | RocketMQ |
---|---|---|
内存缓存 | 完全依赖 OS Page Chache,JVM对外,降低GC影响 | 依赖 MappedByteBuffer(对外),写入同时映射到内存 |
网络拷贝 | sendfile() 零拷贝 | 默认通过 Netty ByteBuffer,可选 FileRegion(部分版本支持零拷贝) |
批量处理 | 上层 Producer/Consumer 强制批量配置,如 linger.ms、fetch.min.bytes | Producer 批量 API+Broker 端批量刷盘 |
分区并行 | 分区数可达上千,横向扩容几乎无限 | 队列数受限于 Broker 配置与硬件资源,可并行但通常低于 Kafka 分区数 |
索引结构 | 基于内存映射 Offset 索引,直接文件定位 | ConsumeQueue + IndexFile 二级索引,定位稍多一步 |
刷盘模式 | 可选定时/同步刷盘(多副本确认替代同步刷盘) | 同步或异步刷盘,功能更直观 |
延迟与吞吐 | 网络零拷贝 + 大批量 + 高并行 = 毫秒级延迟 + 百万级 TPS | mmap + 顺序写 + 批量刷盘 = 低延迟 + 十万级 TPS |
4、为何 Kafka 吞吐更大?
- OS 层零拷贝 vs Netty复制:
- Kafka 原生调用 sendfile(),将 Page Cache 中数据直接推送到 Socket 缓冲,避免用户态 <=> 内核态多次拷贝与上下文切换。
- RocketMQ 默认使用 Netty ByteBuf,数据从 MappedByteBuffer —> JVM堆外 —> Netty 缓冲区,多了一次内核—>用户空间的复制。
- 虽然可以通过 Netty 的 FileRegion 插件启用零拷贝,但这需要外配置,不如 Kafka 原生开箱即用。
- 分区并行 vs 队列并行:
- Kafka 分区(Partition)可随时在线添加,且能将分区数调至数百、上千,极大提升并行度;
- RocketMQ 队列(Queue)数在 Topic 创建时确定,扩容需重建 Topic 或滚动重启 Broker,难以像 Kafka 那样无中断大幅增队列。
- 批量策略灵活度:
- Kafka Producer/Consumer 端提供 linger.ms、batch.size、fetch.min.bytes、fetch.max.wait.ms 等多维度参数,可在微秒级动态拼包;
- RocketMQ 虽支持批量发送 API(send(List<Message>))和批量拉取,但缺少类似 Kafka 的“时间阈值 + 最小字节”自适应批量拼包机制,需要应用自行控制调用时机。
- 索引结构轻量 vs 双级索引:
- Kafka 只保留 Offset 索引,在内存映射文件中直接定位偏移,即查即读,无额外索引结构开销。
- RocketMQ 采用 ConsumeQueue + IndexFile 双级索引
- ConsumeQueue 存放 CommitLog 偏移、消息大小
- IndexFile 支持 Key 查询
- 虽然功能更强,但读写时需额外维护和解析索引文件,相对多了 I/O 和 CPU 开销。
- 副本同步开销 vs 拉模型复制:
- Kafka 默认异步复制(Broker 拉取),但在 acks=all 下写入 Leader 时不阻塞后端拷贝,Follower 会后台拉取,避免写路径的阻塞;
- Kafka本身是异步拉取复制架构,但通过 acks=all + min.insync.replicas 配置,可以实现“效果上”的同步复制,Leader 在写入本地后,会等待所有在ISR(同步副本组)内的 Follower 都成功写入并 ACK,才向 Producer 返回成功;
- acks=all:Producer 要求等待所有 ISR 内的副本(包括 Leader 本身)确认写入后,才算写入成功;
- min.insync.replicas=2:意味着写入请求必须至少有2个副本在ISR(可同步的副本集)内确认,否则抛出 NotEnoughReplicas 异常
- 虽然 Follower 依旧是“拉”,但 Leader 会阻塞在本地等待这些 ISR Follower 的ACK(其拉到并写入后会回报给 Leader),再向 Producer 返回,这相当于“同步风格”,保证了至少 min.insync.replicas 个节点上的数据一致。
- RocketMQ 主从复制可选同步刷盘或异步刷盘
- 同步刷盘模式下,Master 必须等待 Slave 磁盘写入返回后才 ACK,增加写延迟
- 异步刷盘虽然速度快,但可靠性下滑,不宜用于高可靠场景。
- RMQ的复制流程
- Master 写入 CommitLog
- 根据
flushDiskType
:- ASYNC_FLUSH:Master 在本地内存提交后异步刷盘,立即返回 ACK。
- SYNC_FLUSH:Master 在本地内存提交后同步写磁盘再返回 ACK;
- 根据
- Slave 拉取同步
- Slave 持续以 Pull 方式向 Master 请求新消息(CommitLog),拉到后写本地并刷盘。
- 无论 Master 是同步刷盘还是异步刷盘,Slave 拉取机制都不变,Master 并不等待 Slave 完成拉取。
- Producer ACK 时机:Producer 在 Master 本地写(并根据
flushDiskType
刷盘)成功后,就会收到 ACK,与 Slave 是否已拉取无关。
- Master 写入 CommitLog
- Kafka 默认异步复制(Broker 拉取),但在 acks=all 下写入 Leader 时不阻塞后端拷贝,Follower 会后台拉取,避免写路径的阻塞;
六、高可用
RocketMQ 和 Kafka 在“高可用”层面,都通过多节点冗余、自动选举/切换、健康监测等手段消除单点,但他们在元数据服务、复制协议、故障切换和运维复杂度上实现细节各异。
1、Kafka
- 核心组件与角色
- ZooKeeper(或 KRaft)
- 功能:存储集群元数据(Broker列表、Topic分区状态、Controller ID)
- 实现:ZNodes 结构+会话心跳,选举 Controller,协调 Broker 注册与分区状态
- Broker
- 接入 Producer 写入,存储 Partition 副本,服务 Consumer 拉取;
- Controller:集群中一个 Broker 充当 Controller,负责分区 Leader 选举与 Consumer Rebalance;
- Producer:端配置 bootstrap.servers,idempotence + transactions
- Consumer:端配置 group.id,isolation.levle,并通过 Coordinator 管理 Rebalance
- ZooKeeper(或 KRaft)
- 复制与故障切换原理
- Leader-Follower + ISR
- Follower 定期(replica.fetch.wait.max.ms)向 Leader 发起 FetchRequest 批量拉取数据
- 只有成功拉取并写入本地的 Follower 才留在 ISR;
- 同步确认:acks=all + min.insync.repliccas=N —> Leader 写入本地后阻塞等待 ISR 内至少N个副本确认,再 ACK Producer
- 不安全选举
- 若 unclean.leader.election.enable=true,可能选用落后副本为 Leader,导致数据丢失
- 生产环境建议 false 仅在 ISR 内选举
- Leader 宕机切换
- Controller 在 ZK 上监测到 Leader 节点失联,自动选举新的 ISR 内副本为 Leader;
- Producer/Consumer 收到 NotLeaderForPartition 错误或 Fetch 错误后,刷新元数据并重试
- Leader-Follower + ISR
- 健康检查与路由更新
- Broker —> ZK 心跳:Broker 会话 TTL(Session.timeout.ms)未续约即失联;
- Controller 选举:首先连接 ZK 的 Broker 成为 Controller,失败重试;
- Consumer Rebalance:Coordinator 定期(heartbeat.interval.ms)心跳,触发分区重新分配。
- 部署方案
- ZooKeeper:>=3/5个节点,独立机房,调优 tickTime / initLimit / syncLimit;
- Broker:>=3个节点,replication.factor>=3、min.insync.replicas=2;
- Client:Producer 配 acks=all + 幂等,Consumer 配 auto.offset.reset + isolation.level
- 跨机房灾备:MirrorMaker 2跨集群复制,或使用 Confluent Replicator
- Rack Awareness:配置 broker.rack,replica.selector.class,保证副本跨机架分布
2、RocketMQ
- 核心组件与角色
- NameServer(无状态路由中心)
- 功能:内存保存所有 Broker 的路由信息(Topic —> Broker 列表、队列数)
- 实现:Netty Remoting,接收 Broker/Producer/Consumer 的注册与心跳,不做任何持久化;重启后需重建路由
- Broker(Master/Slave)
- Master:接收 Producer 写入,提供消费服务
- Slave:只读,从 Master 异步拉取数据
- 复制协议:Master 的 HAService 模块 + Slave 的 HAClient,Slave 通过 Pull 方式定期拉取 CommitLog
- Producer / Consumer:无状态,配置多 NameServer 地址;定期(30s)拉取路由与发送心跳
- NameServer(无状态路由中心)
- 复制与故障切换原理
- Master 刷盘模式
- flushDiskType = SYNC_FLUSH:Master 把消息写入内存后同步刷盘再 ACK Producer;
- ASYNC_FLUSH:内存写入后立即 ACK,再异步刷盘
- Slave 拉取同步:Slave 启动后向 NameServer 获取 Master 地址,通过 HAClient 维持长连接,以 Pull 协议拉取新数据写入本地;
- 主动主备切换
- BrokerRole=SAVE_POINT(默认 ASYNC_MASTER/SYNC_MASTER)配置;
- Master 宕机或网络断开,Slave 监测不到心跳后不会自动升级为 Master(开源版)
- 若启用 DLedger 存储(基于 Raft),Leader 宕机后,剩余多数派自动选举新 Leader,保证多副本无丢失
- Master 刷盘模式
- 健康检查与路由更新
- Broker —> NameServer 心跳:每30s上报自身状态(内存队列、存储量);
- NameServer —> Broker 超时清理:每10s扫描失联120s的 Broker 并从路由表移除;
- Producer/Consumer 订阅/发送前从 NameServer 拉取最新路由,冗余 NameServer 地址保证查询无单点。
- 部署方案
- NameServer:>=3节点,无状态、任意扩容,分布在不同机架或AZ;
- Broker:Master+至少1Slave,为关键业务可多配置几组跨机房;
- Client:配置所有 NameServer、Master/Slave 地址;30s感知路由变化;
- DLedger:如需写入多数副本确认,可启用 DLedger 模式,部署 3-5 个节点 Rafr 集群。
3、差异对比
维度 | RocketMQ | Kafka |
---|---|---|
元数据服务 | NameServer(无状态、内存路由) | ZooKeeper(有状态、ZNode存储) |
复制模型 | Slave 异步拉取,Master 本地刷盘后即 ACK | 异步拉取 + acks=all 可阻塞等待 ISR 确认 |
主备切换 | 开源版无自动 Master 切换(需 DLedger 模式) | Leader 宕机后 ISR 内自动选举 |
健康检查 | Broker-NS 心跳30s/NS-Broker 扫描10s | Broker-ZK 会话心跳(session.timeout.ms) |
扩容能力 | NameServer 可弹性扩容,Broker 队列数固定 | 分区可在线添加,无需重启 Broker |
运维复杂度 | 轻量 NameServer,DLedger 可选;Broker 配置简洁 | 需维护 ZK、Controller;需监控 ISR、Rebalance 事件 |
跨DC灾备 | Master/Slave 跨机房或 DLedger 跨 DC 共识 | MirrorMaker 2 / Confluent Replicator |
七、高可靠
要实现“绝对不丢消息”,必须在 Producer —> Broker —> Consumer 三个环节层层防护。
1、生产者(Producer)端
(1)Kafka
- 发送确认 acks
- acks=0:不等待任何确认,吞吐最高,极不可靠;
- acks=1:仅等 Leader 本地写入即 ACK,中等可靠;
- acks=all:等待所有 ISR 副本写入并 ACK(包含 Leader),高可靠;
- 副本保障
- min.insync.replicas=N:当 ISR 内可用副本数<N时,任何写入都报错,避免“ACK过早”导致丢数据;
- unclean.leader.election.enable=false:禁止非ISR副本当 Leader,避免选出滞后副本引发数据丢失;
- 幂等与事务
- enable.idempotnce=true:自动开启重试(retries=Max)、序列号去重,确保“最多一次”到“恰好一次”;
- 事务 API:initTransactions() —> beginTransaction() —> 多次 send() —> commitTransaction() 或 abortTransaction(),端到端 Exactly-Once
- 重试策略
- retries:发送失败自动重试;
- retry.backoff.ms:重试间隔;
- 本地宕机丢失场景:如果消息只驻留在 JVM 缓冲区或线程内存中,进程崩溃就丢失。解决方案:
- Outbox模式:本地数据库(或文件)先写“待发消息记录”,确保重启后可重发;
- 异步或定时器扫描 Outbox,调用 send(),待 ACK 后删除本地记录。
(2)RocketMQ
- 发送方式
- syncSend():同步发送,等待 Broker 本地刷盘(或主从同步)后返回;
- asyncSend():异步发送,回调通知成功/失败;
- oneWaySend():单向发送,不等确认(高吞吐、低可靠)。
- 重试与幂等
- producer.setRetryTimesWhenSendFailed(n):失败重试n次;
- 幂等需业务实现(例如消息 key 去重)。
- 刷盘与复制模式
- flushDiskType=ASYNC_FLUSH(默认):内存写入后立刻返回,后台异步刷盘;
- flushDiskType=SYNC_FLUSH:等待本地磁盘写入完成再返回;
- brokerRole=ASYNC_MASTER / SYNC_MASTER(DLedger模式):是否等待所有 Replica(Slave)同步确认后再 ACK;
- 分布式事务
- sendMessageInTransaction():发送“半消息”—>在 executeLocalTransaction() 执行业务—>调用 endTransaction(commit/rollback) —> Broker 根据结果投递或删除;
- 超时无确认时,Broker 发起回查,调用 checkLoaclTransaction() 直到明确提交或回滚
- 本地宕机丢失场景:同样可走Outbox模式:先写本地表,再调用 syncSend() / sendMessageInTransaction(),确认成功后清理。
2、消息(Broker)端
(1)Kafka
- 消息存储
- 顺序追加到分区对应的 Segment 文件,写入 OS Page Cache;
- 刷盘由 log.flush.interval.ms(定时刷)或 acks=all 阻塞等待 ISR 副本拉取后刷盘综合保证;
- 多副本复制
- Follower 拉取(replica.fetch.* 参数控制批量与频率);
- ISR(In-Sync Replicas)机制:只有成功拉取并写入本地的 Follower 才算在 ISR;
- acks=all 模式下 Leader 等待 ISR 内所有副本确认,方才 ACK Producer;
- 故障选举
- unclean.leader.election.enable=false 禁止非 ISR 副本当选,保证已 ACK 的数据不丢失;
- Leader 宕机后由 Controller(ZooKeeper / KRaft)在 ISR 内选举新 Leader。
(2)RocketMQ
- CommitLog 存储:顺序写入 MappedByteBuffer 或直接写盘,文件滚动,结合 flushDiskType 配置;
- 主从复制
- Slave 异步拉取 Master 提供的 CommitLog;
- 如需同步多副本,启用 DLedger(基于 Raft),Leader 在多节点确认后才 ACK Producer;
- NameServer 路由:无状态路由中心,Broker 启动后注册,负责路由、健康检查,不影响消息可靠性本身。
3、消费者(Consumer)端
(1)Kafka
- 消费语义
- 默认至少一次:业务处理后手动 commitSync() / commitAsync() 决定 offset;
- 精确一次:结合事务 API 和 isolation.level=read_committed,只读已提交事务消息,消费端也在事务中提交 offset;
- offset 管理
- enable.auto.commit=false:手动提交,确保处理成功才 advance;
- _consumer_offsets 主题持久化,重启后从上次点位恢复;
- 失败重试
- 处理抛异常不提交 offset,下次继续重试;
- 应用层幂等设计避免重复执行副作用。
(2)RocketMQ
- 拉模式
- 应用显示 pull() / pullBlockIfNotFount() 拉批量消息 —> 处理后调用 updateConsumeOffset() 提交;
- 失败时可重试或留在内存不提交 offset
- 推模式
- SDK 内部拉取 + 回调 MessageListenerConcurrentyly / MessageListenerOnderly —> 业务返回 CONSUME_SUCCESS 时 SDK 自动提交 offset;
- 返回 RECONSUME_LATER,SDK 延迟重试,最多 maxReconsumeTimes 次后进入死信队列
- 幂等与事务:业务需保证幂等,可结合事务消息实现“消费结果+offset提交”二阶段。