定义
Mssage Queue:消息队列。它是一种“先进先出”(FIFO)的数据结构,用于在分布式系统或应用程序之间进行异步通信。
组成
1. 生产者(Producer)
- 定义:消息的发送方,负责将业务系统产生的消息(如订单创建、日志数据)按照指定格式封装后,发送到 MQ 的 “交换机 / 主题” 等入口组件。
- 核心能力:
- 支持同步 / 异步发送(同步需等待确认,异步提高吞吐量);
- 消息序列化(如 JSON、Protobuf),确保跨系统可解析;
- 重试机制(发送失败时自动重试,避免消息丢失)。
2. 消费者(Consumer)
- 定义:消息的接收方,负责从 MQ 的 “队列” 中拉取或接收消息,并执行后续业务逻辑(如订单支付回调处理、日志存储)。
- 核心能力:
- 消费模式:分为 拉取模式(Pull)(消费者主动向 MQ 请求消息)和 推送模式(Push)(MQ 主动将消息推送给消费者);
- 消费确认(Ack):处理完消息后向 MQ 发送确认信号,MQ 收到后才删除消息,避免重复消费;
- 消费组(Consumer Group):多个消费者组成一个组,共同消费一个 “主题 / 队列” 的消息(实现负载均衡,提高消费效率)。
3. 消息(Message)
- 定义:MQ 传递的数据载体,是生产者与消费者之间的 “通信内容”。
- 核心结构:
- 消息体(Body):实际业务数据(如 “订单 ID=123,金额 = 100”);
- 消息头(Header):元数据,用于 MQ 路由和控制,常见字段包括:
- 消息 ID(唯一标识,用于去重和追踪);
- 主题 / 路由键(用于确定消息投递到哪个队列);
- 过期时间(TTL,超过时间未消费则丢弃 / 进入死信队列);
- 优先级(高优先级消息优先投递,部分 MQ 支持)。
4. 交换机 / 主题(Exchange/Topic)
- 定义:消息的 “路由中枢”,接收生产者发送的消息,并根据预设的 “路由规则”,将消息分发到对应的队列中。不同 MQ 对这一组件的命名和实现略有差异:
- RabbitMQ:明确使用 “交换机(Exchange)”,支持 4 种路由类型(Direct 精确匹配、Topic 模糊匹配、Fanout 广播、Headers 头匹配);
- Kafka/RocketMQ:使用 “主题(Topic)”,主题下细分 “分区(Partition)”,消息按分区规则(如哈希、轮询)分发到不同分区,本质是简化的路由逻辑。
5. 队列(Queue/Partition)
- 定义:消息的 “存储容器”,接收交换机 / 主题分发的消息,并暂存消息,等待消费者拉取或接收。不同 MQ 的队列设计差异较大:
- RabbitMQ:队列是独立的存储单元,支持 “持久化”(重启后不丢失消息)、“独占”(仅一个消费者访问)、“自动删除”(无消费者时自动删除);
- Kafka/RocketMQ:队列以 “分区(Partition)” 形式存在,一个主题下有多个分区,分区内消息按 “偏移量(Offset)” 有序存储,支持分布式存储和并行消费(每个分区可被一个消费者消费)。
6. Broker 节点
- 定义:MQ 的 “服务实例”,是上述所有组件(交换机、队列、消息存储)的物理载体,负责接收生产者消息、存储消息、向消费者投递消息。
- 部署模式:
- 单机模式:仅一个 Broker 节点,用于测试;
- 集群模式:多个 Broker 节点组成集群,实现负载均衡和高可用(如 Kafka 集群通过 “副本(Replica)” 机制保证分区数据的冗余,RocketMQ 通过 “主从架构” 实现故障转移)。
7. 消息存储(Message Storage)
- 定义:负责持久化存储消息,避免消息因 Broker 重启或故障丢失。不同 MQ 的存储方案差异显著:
- Kafka:基于 “磁盘日志文件” 存储,消息按分区写入日志文件,通过 “页缓存(Page Cache)” 和 “顺序写入” 优化性能,支持长时间存储(可配置保留期);
- RocketMQ:使用 “混合存储”(内存 + 磁盘),消息先写入内存 “CommitLog”,再异步刷盘持久化,兼顾性能和可靠性;
- RabbitMQ:默认将消息存储在内存(非持久化消息),持久化消息存储在磁盘的 “Mnesia 数据库” 中,适合短时间暂存消息。
8. 消息确认机制(Ack Mechanism)
- 定义:确保消息 “不丢失、不重复、不遗漏” 的核心保障,分为三个阶段:
- 生产者确认(Producer Ack):MQ 接收消息并持久化后,向生产者返回确认信号(如 Kafka 的 ACK 机制,支持 0 不确认、1 主分区确认、-1 所有副本确认);
- 消费者确认(Consumer Ack):消费者处理完消息后,向 MQ 返回确认信号,MQ 收到后删除消息(若消费者崩溃未确认,MQ 会将消息重新投递给其他消费者,避免丢失);
- Broker 内部确认:集群模式下,主节点接收消息后,同步给从节点 / 副本,确认同步完成后才认为消息持久化成功(如 RocketMQ 的主从同步、Kafka 的副本 ISR 机制)。
9. 死信队列(Dead-Letter Queue, DLQ)
- 定义:“异常消息的垃圾桶”,用于存储无法正常消费的消息(如消费者多次处理失败、消息过期、队列满),避免异常消息阻塞正常消费流程。
- 作用:开发人员可后续分析死信队列中的消息,定位业务问题(如数据格式错误、业务逻辑异常),并支持 “死信重投”(将死信消息重新发送到正常队列,重试处理)。
分类
常用的:Kafka、ActiveMQ、RabbitMQ、RocketMQ等
特性 | Apache Kafka | RabbitMQ | Apache RocketMQ | ActiveMQ |
---|---|---|---|---|
核心定位 | 高吞吐量的分布式发布-订阅消息系统,流处理平台 | 通用性强的消息代理,实现了高级消息队列协议(AMQP) | 低延迟/高吞吐的分布式消息和流处理平台 | 成熟的面向消息的中间件(MOM),支持多种协议 |
语言/社区 | Scala/Java,Apache 顶级项目,非常活跃 | Erlang,非常活跃 | Java,Apache 顶级项目,非常活跃(尤其在中国) | Java,Apache 项目,ActiveMQ 5.x 经典版较慢,ActiveMQ Artemis 较新 |
吞吐量 | 极高(百万级/秒),吞吐量为首要设计目标 | 中等(万级/秒) | 极高(十万级/秒),与 Kafka 媲美 | 一般(万级/秒),Artemis 版本有显著提升 |
延迟 | 毫秒级(非严格实时) | 微秒级(极低) | 毫秒级(低) | 毫秒级 |
消息可靠性 | 非常高(通过副本机制保证) | 非常高(通过确认、持久化、镜像队列保证) | 非常高(刷盘、同步复制) | 高(支持持久化) |
功能特性 | - 顺序读写、高吞吐 - 持久化、多副本 - 丰富的流处理生态(Kafka Streams, Connect) | - 消息确认、持久化、优先级 - 灵活的路由(Exchange) - 死信队列、延迟队列(插件) | - 顺序消息、事务消息 - 消息回溯、定时/延迟消息 - 消息轨迹、审计 | - 支持多种协议(OpenWire, STOMP, AMQP, MQTT) - 主从架构、消息组 |
优点 | 1. 吞吐量巨大,性能卓越 2. 持久化能力强,适合海量数据 3. 分布式、扩展性极佳 4. 生态丰富,流处理支持好 | 1. 成熟稳定,社区活跃 2. 管理界面友好,易于监控 3. 协议支持丰富,灵活路由 4. 延迟极低,适合实时消息 | 1. 高吞吐、低延迟 2. 功能全面(顺序/事务/延迟消息) 3. 金融级数据可靠性 4. 阿里背书,久经考验 | 1. 支持协议最多,集成方便 2. 与 JMS 规范完美集成 3. 部署简单,易于上手 |
缺点 | 1. 功能较单一(主要是Pub/Sub) 2. 监控和管理工具相对复杂 3. 可能会消息重复(At least once) | 1. Erlang 语言,二次开发难度大 2. 吞吐量有瓶颈,不如 Kafka/RocketMQ 3. 集群动态扩展稍麻烦 | 1. 社区国际化程度不如 Kafka 2. 客户端语言支持主要为 Java 3. 文档和最佳实践相对较少(英文) | 1. 经典版性能一般,吞吐量较低 2. 社区活跃度下降(转向Artemis) 3. 不适合超大规模场景 |
主要协议 | 自定义协议(基于TCP) | AMQP 0-9-1(核心),STOMP, MQTT | 自定义协议(基于TCP),JMS | OpenWire(默认),AMQP, STOMP, MQTT, JMS |
持久化 | 磁盘顺序追加日志 | 内存、磁盘(支持消息和队列索引) | 磁盘高速读写 | 支持(KahaDB, JDBC, LevelDB等) |
适用场景 | - 日志收集、流式数据处理 - 用户活动跟踪(点击流) - 监控Metrics聚合 - 事件溯源 | - 企业级应用集成(EAI) - 异步任务处理(后台作业) - 订单处理、通知 - 需要复杂路由规则的场景 | - 金融交易(如订单、支付) - 电商峰值削谷(如秒杀) - 大数据分析管道 - 需要严格顺序和事务的场景 | - 传统的企业级JMS应用 - 需要支持多种协议(如MQTT用于IoT) - 中小规模的消息代理 |
大数据用 Kafka,通用集成用 RabbitMQ,金融电商等高要求场景用 RocketMQ,传统 JMS 或多协议需求考虑 ActiveMQ/Artemis。
消息可靠性
如何保证消息的可靠性?
生产者端:
使用 异步Confirm机制(或事务消息)。
处理Broker返回的NACK和超时情况,并实现重发逻辑。
(可选)在本地缓存已发送未确认的消息,用于故障恢复。
Broker端:
设置消息和队列为持久化。
配置至少2个以上副本的同步复制(如Kafka的
min.insync.replicas > 1
和acks=all
,RabbitMQ的镜像队列同步模式)。
消费者端:
关闭自动确认,开启手动确认。
确保业务逻辑处理成功后再发送ACK。
实现消费幂等性,以应对重复消息。