为什么选择 RabbitMQ?*
RabbitMQ 是一款可靠且成熟的消息代理和流处理中间件,可轻松部署在云端、本地数据中心或您的开发机上,目前已被全球数百万用户使用。
优势在哪里
互操作性
RabbitMQ 支持多种开放标准协议,包括 AMQP 1.0 和 MQTT 5.0,并提供多种编程语言的客户端库,您只需选择适合自己语言的版本即可,无需担心厂商锁定!
灵活性
RabbitMQ 提供多种功能组合,可自定义消息从生产者到单个或多个消费者的传递方式,如路由、过滤、流处理、联邦传输等,应有尽有。
可靠性
通过消息确认机制和集群消息复制,RabbitMQ 能确保您的消息安全无误。
总结
RabbitMQ 是轻量级、易用、功能全面的消息中间件,适合需要灵活路由、多协议支持、高可靠性的场景。如果您的业务需要简单的任务队列或复杂的事件路由,RabbitMQ 是理想选择;若是日志处理或大数据流,可考虑 Kafka。
常见使用场景示例
以下是来自社区或客户的一些典型用例,帮助您更好地理解 RabbitMQ 的功能及其实际应用价值。
1. 解耦互联服务(通过RabbitMQ实现通知系统)
场景描述
你有一个后端服务,需要向终端用户发送通知。通知有两种渠道:
- 电子邮件(Email)
- 移动应用推送(Push Notification)
传统做法可能是后端直接调用邮件服务和推送服务,但这会导致:
- 系统紧耦合:如果邮件服务宕机,可能影响整个通知流程。
- 无法应对突发流量:如果突然有大量通知请求,后端可能被压垮。
- 维护困难:升级或维护某个通知服务时,需要停用整个系统。
RabbitMQ 的解决方案
使用 RabbitMQ 解耦后端服务和通知渠道,流程如下:
-
后端服务(Producer) 生成通知后,不直接调用邮件或推送服务,而是将消息发布到两个独立的队列:
- email_queue(邮件队列)
- push_queue(推送队列)
-
邮件服务和推送服务(Consumer) 各自订阅对应的队列,并异步处理消息。
-
[Backend Service]
│
├── Publishes to → [email_queue] → Consumed by [Email Service]
│
└── Publishes to → [push_queue] → Consumed by [Push Notification Service]
-
2. 基于 RabbitMQ 的远程过程调用(RPC)实现票务系统
场景描述
你经营一个音乐厅,门票通过多个渠道销售:
- 线上网站
- 线下自助终端(Kiosk)
所有渠道的订单都需要经过复杂的票务库存处理(如检查余票、分配座位等),并且要求极低延迟的响应(用户下单后需立即知道是否成功)。
传统同步调用的问题
如果直接使用同步调用(如 HTTP API):
- 高延迟:库存服务处理时间不稳定,可能阻塞前端响应。
- 系统耦合:库存服务宕机会导致所有销售渠道不可用。
- 并发冲突:多个渠道同时抢票时,可能超卖(需依赖数据库事务,性能低下)。
RabbitMQ 的 RPC 解决方案
通过 RabbitMQ 实现异步 RPC 调用,流程如下:
-
1、发布订单请求
-
网站或终端(RPC Client)生成订单,附带唯一
correlation_id
,发送到请求队列 -
[Client] → Publish to → [rpc_request_queue]
(Message: {order_details, correlation_id})
-
-
2、处理订单
- 库存服务(RPC Server)从
rpc_request_queue
消费消息,处理业务逻辑(如检查余票)。
- 库存服务(RPC Server)从
-
3、返回响应
- 库存服务将处理结果(成功/失败)发送到响应队列,并携带相同的
correlation_id
: -
[Server] → Publish to → [rpc_reply_queue]
(Message: {result, correlation_id})
- 库存服务将处理结果(成功/失败)发送到响应队列,并携带相同的
-
4、客户端接收响应
- 客户端监听
rpc_reply_queue
,通过correlation_id
匹配自己的请求,获取结果。 -
±---------------+ ±------------------+
| RPC Client | | RPC Server |
| (Website/Kiosk)| | (Inventory Service)|
±------±-------+ ±--------±--------+
| |
| 1. Order + correlation_id|
±------------------------------------->+
| |
| 3. Result + correlation_id|
<--------------------------------------+
| |
±------±-------+ ±--------±--------+
| rpc_request_queue | rpc_reply_queue |
±---------------+ ±------------------+
- 客户端监听
关键设计点
-
关联 ID(correlation_id)
确保响应与请求正确匹配,避免多客户端场景下的混乱。 -
队列选择
- 经典队列(Classic Queue):低延迟,但消息可能丢失(适合可重试场景)。
- 仲裁队列(Quorum Queue):高可靠性,通过多数节点确认保证消息安全(适合金融级场景)。
-
序列化处理
所有订单按先进先出(FIFO)处理,避免超卖,无需数据库事务。
实际应用扩展
- 机票预订系统:多平台同时抢票时,通过 RabbitMQ RPC 保证库存一致性。
- 秒杀活动:突发流量下,用队列缓冲请求,按序处理。
3、RabbitMQ流式处理(Streaming)详解
场景描述
你运营一个视频平台(如B站、YouTube)。当用户上传视频后,系统需要完成多个后续任务:
1. 实时任务:立即通知订阅该UP主的粉丝。
2、延迟任务:
-
视频内容分析(如AI鉴黄、标签提取)
-
转码生成多种清晰度的副本(如1080p、720p)
-
数据统计(如播放量预测)
传统架构的问题
若用普通消息队列(如RabbitMQ经典队列):
-
消息重复:每个任务需要独立队列,同一视频上传事件会被复制多份(通知队列、转码队列等),浪费资源。
-
无法回溯:消息被消费后即删除,若分析服务崩溃,可能丢失数据。
RabbitMQ Streams的解决方案
1. 核心机制
- 持久化流(Stream):
上传服务将“新视频”事件按顺序追加到唯一流中(类似数据库的WAL日志)。
[视频上传事件流]
──▶ [事件1: 视频A上传] → [事件2: 视频B上传] → [事件3: 视频C上传] → …
- 多消费者独立订阅:
每个后端服务(通知、转码、分析)可独立读取流中的事件,互不干扰。
2. 工作流程
-
2.1视频上传完成 → 事件写入流(如video_upload_stream)。
-
2.2通知服务:
- 实时读取最新事件,立即推送粉丝。
- 读取后不删除事件,其他服务仍可访问。 -
2.3转码服务:
按自身速度消费事件,生成多分辨率视频。 -
2.4分析服务:
每天凌晨批量读取全天事件,运行离线分析。
3. 关键优势
优势 | 说明 |
---|---|
高效无重复 | 所有任务共享同一流,无需为每个任务复制消息。 |
消费者独立性 | 每个服务可自由控制读取位置(如通知读最新,分析读历史)。 |
消息可回溯 | 即使服务崩溃,重启后可从断点继续消费,数据不丢失。 |
高吞吐 | 流式存储针对顺序读写优化,性能高于传统队列。 |
技术对比:Stream vs 经典队列
特性 | RabbitMQ经典队列 | RabbitMQ Streams |
---|---|---|
消息生命周期 | 消费后默认删除 | 永久存储(除非手动删除) |
多消费者 | 同一队列消息只能被一个消费者 | 多消费者独立读取同一流 |
回溯能力 | 不支持 | 支持(通过偏移量定位) |
适用场景 | 任务分发、RPC | 事件溯源、日志处理 |
实际应用示例
-
视频平台(如案例所述)
-
电商订单流水:
- 订单创建事件写入流,实时通知用户、异步更新库存、离线生成报表。
-
物联网传感器数据:
- 设备数据实时流,同时供监控告警和离线分析使用。
为何选择RabbitMQ Streams?
-
解耦:上传服务无需知道下游有哪些任务。
-
弹性:新增任务(如后续增加“视频指纹提取”)只需新增一个消费者,无需改原有架构。
-
可靠性:消息持久化,即使服务器重启也不丢失。
一句话总结: RabbitMQ Streams通过持久化、可共享的消息流,实现了高效、灵活的事件驱动架构,特别适合混合实时与离线处理的场景。
4、基于 RabbitMQ 的星际无人机(IoT)通信系统
场景描述
你经营一家提供银河系包裹配送的公司,拥有大量太空无人机(Space Drones)。这些无人机需要定期向位于系外行星 Kepler-438 b 的中央服务器上报状态(如位置、电量、包裹状态)。但星际网络连接极不稳定,经常因行星轨道错位或宇宙干扰中断。
传统方案的挑战
若直接让无人机通过 HTTP 或 TCP 连接中央服务器:
- 网络不可靠:信号延迟高(可能几小时到几天),连接频繁中断。
- 数据丢失风险:断网时无人机的状态报告无法送达。
- 海量连接管理:数百万无人机同时在线,服务器难以承受。
RabbitMQ 的解决方案
-
1. 分布式消息缓冲架构
-
每个无人机本地部署 RabbitMQ 节点:
- 无人机将状态报告先写入本地 RabbitMQ(边缘存储),避免依赖实时网络。
- 数据持久化,即使无人机重启也不会丢失。
-
上游中心化 RabbitMQ:
- 位于 Kepler-438 b 的服务器运行主 RabbitMQ,接收所有无人机的最终数据。
-
-
2. 网络恢复后同步数据
-
行星对齐时(网络恢复):
本地 RabbitMQ 通过 Shovel/Federation 插件,将积压的报告批量同步到上游服务器。-
Shovel:单向跨节点消息转移,适合临时连接。
-
Federation:动态拓扑的消息同步,适合复杂网络。
-
-
-
3. 协议选择
-
MQTT 协议:
-
无人机与本地 RabbitMQ 之间使用 MQTT(轻量级 IoT 协议),支持:
- 低功耗(适合太空设备)。
- 高并发连接(百万级无人机)。
-
RabbitMQ 通过 MQTT 插件 兼容此协议
-
-
系统架构图
[无人机 Drone 1] → [本地 RabbitMQ] -
[无人机 Drone 2] → [本地 RabbitMQ] → 行星对齐时同步 → [上游 RabbitMQ (Kepler-438 b)] → [中央服务器]
… /
[无人机 Drone N] → [本地 RabbitMQ]
核心优势(Benefits)
优势 | 说明 |
---|---|
离线优先 | 无人机在网络中断时仍能持续记录数据,不会丢失报告。 |
弹性网络适应 | 仅在网络可用时同步,容忍高延迟和间歇性连接。 |
高并发支持 | MQTT 协议优化海量设备连接,RabbitMQ 集群横向扩展。 |
解耦与可靠性 | 中央服务器无需知道无人机是否在线,消息最终一致。 |
关键技术组件
1、RabbitMQ Shovel
- 将本地队列的消息自动转发到上游交换器。
- 配置示例:
{rabbitmq_shovel, [{shovels, [{drone_to_central, [{sources, [{brokers, ["localhost"]}]},{destinations, [{brokers, ["kepler-438b.server"]}]},{queue, <<"drone_reports">>}]}]}
]}
2、RabbitMQ Federation
-
跨星球的 RabbitMQ 节点形成联邦,按需同步消息。
-
适合长期网络不稳定的场景。
3、MQTT 插件
- 启用 MQTT 协议支持:
rabbitmq-plugins enable rabbitmq_mqtt
对比其他消息中间件
特性 | RabbitMQ (+MQTT) | Kafka | AWS IoT Core |
---|---|---|---|
离线支持 | ✔️(边缘节点缓冲) | ❌(需持续连接) | ✔️(但需额外配置) |
协议兼容性 | MQTT/AMQP/STOMP | 自定义协议 | 仅MQTT/HTTP |
部署复杂度 | 中(需配置Shovel) | 高 | 低(全托管) |
适用场景 | 边缘计算+最终一致性 | 高吞吐日志流 | 纯云端IoT |
实际应用扩展
-
地球上的类似场景:
-
远洋船舶、油田设备在卫星网络间歇可用时的数据上报。
-
军事无人机在敌占区的隐蔽通信。
-
其他行业:
- 智能电网(电表数据批量同步)。
- 自动驾驶车队(离线时本地存储,网络恢复后上传)。
总结
通过 RabbitMQ 的分布式消息队列 + MQTT 协议,该系统实现了:
- 抗网络中断:边缘节点缓冲数据,行星对齐时同步。
- 海量设备管理:MQTT 支持百万级无人机连接。
- 灵活性:Shovel/Federation 适应不同网络拓扑。
这种架构是 深空通信、边缘计算、IoT 设备管理 的理想选择! 🚀
RabbitMQ工作模型
rabbitmq和mysql作类比,理解rabbitmq概念:
- broker 相当于mysql服务器
- virtual host相当于数据库(一个mysql服务器可以有多个数据库,一台broker可以有多个虚拟主机)
- queue相当于表(一个数据库有多张表,一个虚拟主机有多个队列)
- 消息相当于记录(一张表中有多条记录,一个队列中有多条消息)
消息队列有三个核心要素: 消息生产者
、消息队列
、消息消费者
;
生产者(Producer):发送消息的应用;(java程序,也可能是别的语言写的程序)
消费者(Consumer):接收消息的应用;(java程序,也可能是别的语言写的程序)
代理(Broker):就是消息服务器,RabbitMQ Server就是Message Broker;
连接(Connection):连接RabbitMQ服务器的TCP长连接;
信道(Channel):连接中的一个虚拟通道,消息队列发送或者接收消息时,都是通过信道进行的;
虚拟主机(Virtual host):一个虚拟分组,在代码中就是一个字符串,当多个不同的用户使用同一个RabbitMQ服务时,可以划分出多个Virtual host,每个用户在自己的Virtual host创建exchange/queue等;(分类比较清晰、相互隔离)
交换机(Exchange):交换机负责从生产者接收消息,并根据交换机类型分发到对应的消息队列中,起到一个路由的作用;
路由键(Routing Key):交换机根据路由键来决定消息分发到哪个队列,路由键是消息的目的地址;
绑定(Binding):绑定是队列和交换机的一个关联连接(关联关系);
队列(Queue):存储消息的缓存;
消息(Message):由生产者通过RabbitMQ发送给消费者的信息;(消息可以为任何数据,字符串,user对象,json串,图片,mp3,视频等等)
Exchange(X) 可翻译成交换机/交换器/路由器