✅ 11.1 为什么要用消息队列?在哪些场景下最适合?
✅ 作用:
-
削峰填谷:缓解高并发压力,异步处理任务(如秒杀下单 → MQ → 异步扣库存)
-
解耦服务:上下游解耦(如下单服务和短信服务)
-
异步处理:降低请求延迟(如下单后异步发短信、记录日志)
-
广播通知:多服务订阅同一事件(如订单支付成功 → 通知物流、用户、财务)
✅ 典型使用场景:
-
秒杀限流、异步发送邮件/短信
-
日志收集、用户行为打点
-
微服务事件驱动架构
-
延迟任务、定时推送
✅ 11.2 RabbitMQ / Kafka / Redis Stream 的区别与适用场景?
特性 | RabbitMQ | Kafka | Redis Stream |
---|---|---|---|
类型 | AMQP 协议消息队列 | 分布式日志系统(高吞吐) | 内存消息队列(轻量级) |
吞吐量 | 中 | 高(百万级 TPS) | 中 |
消息顺序 | 支持 | 分区内保证顺序 | 支持 |
持久化 | 支持磁盘持久化 | 高效持久化 | 内存为主+RDB持久化 |
消费模式 | 推模式(push) | 拉模式(pull) | 拉(XREAD)+消费者组 |
优势 | 易用、功能全、支持ACK/TTL等 | 高性能、大数据分析、日志采集 | Node.js 原生整合好、灵活 |
场景 | 通用业务异步、RPC、订单消息等 | 日志分析、监控链路、大规模消费 | 中小型异步任务队列 |
✅ 11.3 MQ 如何保证消息不丢?如何实现消息的重试机制?
✅ 防止消息丢失的策略:
-
生产端确认机制:
-
RabbitMQ 开启
publisher confirm
-
Kafka 开启
acks=all
-
Redis 使用
XADD
并手动确认消费
-
-
持久化机制:
-
RabbitMQ:设置
durable queue
+persistent message
-
Kafka:磁盘持久化 + 日志压缩
-
Redis Stream:持久化配置 + 手动 ack
-
-
消费端手动 ack:
-
消费成功才 ack,失败不确认可重试
-
✅ 消息重试机制:
-
RabbitMQ:死信队列(DLX)+ 延迟交换器 + retry 次数记录
-
Kafka:设置 retry/backoff,或使用重试 topic
-
Redis:失败记录进失败队列或重入主队列,配合重试次数标记
✅ 11.4 如何处理消息重复消费问题?幂等性的实现方法?
✅ 产生原因:
-
消息重试或网络问题,导致同一消息被多次消费
✅ 幂等处理策略:
-
唯一标识:每条消息带唯一
msgId
,消费记录保存到 Redis / DB -
操作前去重判断:如
INSERT ... ON DUPLICATE KEY UPDATE
-
数据库事务或锁机制:防止并发写入重复
-
消费端去重队列:维护消费记录(如 Redis Set)
✅ 11.5 如何做异步任务处理?Node.js 中有哪些队列库?
✅ 实现方式:
-
使用消息队列发送异步任务,由专门的 worker 进程消费
✅ 常用队列库:
库名 | 特点 |
---|---|
BullMQ | 基于 Redis,支持任务调度/重试/延迟/优先级 |
Bee-Queue | 简洁快速,适合中小项目 |
Agenda | MongoDB 支持,适用于定时任务 |
Kue | 老牌 Redis 队列,但维护较少 |
Bree | 基于原生 worker_threads 和 cron |
推荐 BullMQ(配合 Redis Stream 实现可靠任务队列)
✅ 11.6 MQ 如何处理消息堆积问题?如何限速?
✅ 消息堆积原因:
-
消费端消费慢 / 宕机
-
消息生产过快
-
任务执行时间长
✅ 解决方式:
-
增加消费者并发(水平扩展 Worker)
-
限速控制生产端(令牌桶算法、漏桶算法)
-
异步分批处理任务(批量消费)
-
设置 TTL 和死信队列:避免无效消息堆积
-
流量预估 + 指标报警(Prometheus + Grafana)
✅ 11.7 什么是发布/订阅模型?和点对点模型的区别?
模型 | 描述 | 例子 |
---|---|---|
发布/订阅(Pub/Sub) | 一个生产者 → 多个订阅者都能收到消息 | Redis Pub/Sub,Kafka Topic |
点对点(P2P) | 每条消息只有一个消费者能收到 | RabbitMQ work queue 模式 |
Pub/Sub 更适合广播场景(如发系统通知),点对点适合任务队列场景(如视频转码)
✅ 11.8 如何实现延迟消息、定时消息发送?
✅ 常见实现方式:
工具 | 实现方式 |
---|---|
RabbitMQ | 使用延迟插件 rabbitmq_delayed_message_exchange 或 TTL + 死信队列 |
Kafka | 无原生支持,可通过延迟 topic + 轮询实现 |
Redis | ZSET + 时间戳排序(如 BullMQ 的延迟任务) |
BullMQ | job.delay() 支持毫秒级延迟 |
✅ 11.9 如何监控消息队列的健康状态和消费情况?
✅ RabbitMQ:
-
管理面板(Web UI)
-
队列堆积长度、连接数、消费速率
-
配合 Prometheus + Grafana 面板
✅ Kafka:
-
Kafka Manager、Kafka UI
-
消费组 lag(滞后)监控
-
消息发送/消费 TPS
✅ Redis:
-
RedisInsight、命令行查看 stream length
-
BullMQ 提供 UI 面板(bull-board、arena)
✅ 11.10 NestJS 如何使用 RabbitMQ / Kafka?使用过 @EventPattern() 吗?
NestJS 提供微服务模块,支持多种 MQ 协议。
✅ 使用方式(以 RabbitMQ 为例):
// main.ts
const app = await NestFactory.createMicroservice(AppModule, {transport: Transport.RMQ,options: {urls: ['amqp://localhost:5672'],queue: 'tasks_queue',queueOptions: { durable: true },},
});
await app.listen();
✅ 消费消息(使用 @EventPattern):
import { Controller } from '@nestjs/common';
import { EventPattern, Payload, Ctx, RmqContext } from '@nestjs/microservices';@Controller()
export class TaskConsumer {@EventPattern('user.created')handleUserCreated(@Payload() data: any, @Ctx() context: RmqContext) {const channel = context.getChannelRef();const originalMsg = context.getMessage();console.log('Received user event:', data);// ack 消息channel.ack(originalMsg);}
}
@EventPattern()
用于监听 MQ 的某个 routing key 或 topic 事件,适用于异步事件处理。
✅ 总结表格
编号 | 关键知识点 | 核心点 |
---|---|---|
11.1 | 为什么使用 MQ | 解耦、异步、削峰填谷 |
11.2 | MQ 类型对比 | RabbitMQ / Kafka / Redis Stream 区别 |
11.3 | 消息不丢 + 重试 | ACK + 持久化 + 死信队列 |
11.4 | 消息幂等性 | msgId 去重、Redis 标记 |
11.5 | 异步任务库 | BullMQ / Bee-Queue / Agenda |
11.6 | 消息堆积限速 | 扩容 + 限流算法 + TTL |
11.7 | Pub/Sub vs P2P | 广播 vs 分发 |
11.8 | 延迟消息 | TTL + 死信 + ZSET + delay job |
11.9 | 队列监控工具 | Prometheus / RabbitMQ UI / Kafka Manager |
11.10 | NestJS 消费 MQ | @EventPattern + Transport 设置 |