1.内存级Spring_Event
1.1 控制器层:StringTextController
/*** 字符串文本管理控制器* 提供通过消息队列异步获取文本信息的接口*/
@RestController
@RequestMapping("/api/string-text")
public class StringTextController {@Resourceprivate StringTextProducer stringTextProducer;/*** 通过消息队列异步查询字符串文本信息* * 流程:* 1. 接收前端查询参数* 2. 封装为消息对象发送到消息队列* 3. 立即返回"消息已发送"响应,不等待实际处理结果* * 优点:* - 避免长耗时查询阻塞HTTP连接* - 支持水平扩展处理能力* * @param pageReqVO 分页查询参数* @return 统一响应结果(仅包含消息发送状态)*/@GetMapping("/getStringTextByMQ")@Operation(summary = "通过消息队列获取test信息")public CommonResult<String> getStringTextByMQ(@Validated @ModelAttribute StringTextPageReqVO pageReqVO) {log.info("控制器接收到MQ请求,线程名: {}", Thread.currentThread().getName());try {// 发送消息到队列(异步处理)stringTextProducer.sendStringTextQueryMessage(pageReqVO.getText(), pageReqVO.getPageNo(), pageReqVO.getPageSize());return CommonResult.success("消息已发送,异步处理中");} catch (Exception e) {log.error("消息发送异常", e);return CommonResult.error(500, "消息发送失败");}}
}
1.2 消息生产者:StringTextProducer
/*** 字符串文本查询消息生产者* 负责将查询请求封装为消息并发送到消息队列* * 设计模式:* - 使用Spring事件机制作为轻量级消息队列* - 可无缝切换为Kafka/RocketMQ等真正的MQ系统*/
@Slf4j
@Component
public class StringTextProducer {@Resourceprivate ApplicationContext applicationContext;/*** 发送字符串查询消息* * @param text 查询文本(用于模糊匹配)* @param pageNo 页码* @param pageSize 每页大小* * 消息传递机制:* 1. 创建StringTextQueryMessage对象* 2. 通过Spring事件发布器将消息广播到事件总线* 3. 由StringTextConsumer监听并处理该消息*/public void sendStringTextQueryMessage(String text, Integer pageNo, Integer pageSize) {StringTextQueryMessage message = new StringTextQueryMessage();message.setText(text);message.setPageNo(pageNo);message.setPageSize(pageSize);log.info("[sendStringTextQueryMessage][发送消息: {}]", message);// 使用Spring事件机制发布消息// 注意:此处使用同步事件,通过@Async注解在Consumer端实现异步applicationContext.publishEvent(message);}
}
1.3 消息消费者:StringTextConsumer
/*** 字符串文本查询消息消费者* 负责从消息队列接收查询请求并执行实际查询操作* * 线程模型:* - 使用@Async注解指定专用线程池"curdAsyncExecutor"* - 实现请求处理与HTTP请求线程分离*/
@Slf4j
@Component
public class StringTextConsumer {@Resourceprivate StringTextService stringTextService;/*** 处理字符串查询消息* * @param message 查询消息对象* * 执行流程:* 1. 从消息中提取查询参数* 2. 调用Service层执行实际查询* 3. 记录查询结果(可扩展为保存到结果表或通知前端)*/@EventListener@Async("curdAsyncExecutor") // 使用专用异步线程池public void onMessage(StringTextQueryMessage message) {log.info("[onMessage][接收消息: {},线程名: {}]", message, Thread.currentThread().getName());// 1. 将消息转换为请求对象StringTextPageReqVO reqVO = new StringTextPageReqVO();reqVO.setText(message.getText());reqVO.setPageNo(message.getPageNo());reqVO.setPageSize(message.getPageSize());// 2. 执行实际查询(可能是数据库或外部系统)PageResult<StringTextDO> pageResult = stringTextService.getUserPage(reqVO);log.info("[onMessage][处理结果: {}]", pageResult);// 3. 可扩展逻辑:// - 将结果保存到临时表// - 推送WebSocket通知前端结果就绪// - 触发后续处理流程}
}
1.4 消息模型:StringTextQueryMessage
/*** 字符串文本查询消息* 用于在生产者和消费者之间传递查询参数* * 设计特点:* - 实现Serializable接口以便于消息传输* - 使用JSR-303注解进行参数校验* - 字段与StringTextPageReqVO保持语义一致*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class StringTextQueryMessage implements Serializable {private static final long serialVersionUID = 20230610L;/*** 查询的文本内容(用于模糊查询)*/private String text;/*** 页码(从1开始)*/@NotNull(message = "页码不能为空")private Integer pageNo;/*** 每页大小*/@NotNull(message = "每页大小不能为空")private Integer pageSize;
}
客户端 → HTTP请求 → StringTextController → StringTextProducer → 消息队列 → StringTextConsumer → StringTextService → 数据库查询
1.5 问题提出
1.5.1 生产者如何找到消费者?
- StringTextProducer 通过 applicationContext.publishEvent 发布 StringTextQueryMessage 事件。
- Spring 容器根据事件类型(StringTextQueryMessage)将事件分发给 StringTextConsumer 的 @EventListener 方法。
- 匹配基于 Java 类型系统,参数类型必须兼容事件类型。
1.5.2 假如添加多个新的接收参数为StringTextQueryMessage的消费者,还有几个是接收的参数是StringTextQueryMessage及其子类,会全部接收到还是会匹配精度最高的?
- 事件类型匹配:
- Spring 根据事件对象的 运行时类型(StringTextQueryMessage 或其子类)查找所有 @EventListener 方法,检查其参数类型是否与事件类型兼容。
- 兼容的规则是:监听方法的参数类型可以是事件类型的 类本身、超类 或 接口。
- 例如,如果发布的事件是 StringTextQueryMessage,所有参数类型为 StringTextQueryMessage、其超类(如 Object)或接口的 @EventListener 方法都会被调用。
- 多消费者行为:
- Spring Event 不基于“精度最高”选择单个消费者,而是 广播 事件给所有匹配的监听器。
- 如果有多个消费者监听 StringTextQueryMessage 或其子类,Spring 会按顺序调用所有匹配的 @EventListener 方法。
- 执行顺序:
- 默认情况下,Spring 不保证 @EventListener 方法的调用顺序。
- 可以通过 @Order 注解或实现 Ordered 接口指定执行顺序(较低的 order 值优先执行)。
- 异步性:
- 如果 @EventListener 方法标注了 @Async(如您的 StringTextConsumer),每个消费者的处理会在异步线程中执行,互不阻塞。
1.5.3以上情况能不能指定一个consumer匹配?
方法 1:使用条件注解(@EventListener(condition = ...))
- 在 @EventListener 中添加 condition 属性,通过 SpEL(Spring Expression Language)过滤事件。
- 示例:修改 StringTextConsumer1 和 StringTextConsumer2,添加条件:
@EventListener(condition = "#message.text == 'specific'")
如果 message.text == "specific",只有 Consumer1 处理。
方法 2:使用自定义事件路由
- 引入一个事件路由机制,通过事件对象的额外属性指定目标消费者。
- 修改 StringTextQueryMessage,添加 consumerId 字段:
@Data
public class StringTextQueryMessage {
private String text;
@NotNull(message = "页码不能为空")
private Integer pageNo;
@NotNull(message = "每页大小不能为空")
private Integer pageSize;
private String consumerId; // 新增:指定目标消费者
}
@Slf4j
@Component
public class StringTextConsumer1 {
@Resource
private StringTextService stringTextService;
@EventListener
@Async("curdAsyncExecutor")
public void onMessage(StringTextQueryMessage message) {
if (!"consumer1".equals(message.getConsumerId())) {
return; // 忽略不匹配的 consumerId
}
log.info("[Consumer1][接收消息: {},线程名: {}]", message, Thread.currentThread().getName());
StringTextPageReqVO reqVO = new StringTextPageReqVO();
reqVO.setText(message.getText());
reqVO.setPageNo(message.getPageNo());
reqVO.setPageSize(message.getPageSize());
PageResult<StringTextDO> pageResult = stringTextService.getUserPage(reqVO);
log.info("[Consumer1][处理结果: {}]", pageResult);
}
}
@Slf4j
@Component
public class StringTextConsumer2 {
@EventListener
@Async("curdAsyncExecutor")
public void onMessage(StringTextQueryMessage message) {
if (!"consumer2".equals(message.getConsumerId())) {
return;
}
log.info("[Consumer2][接收消息: {},线程名: {}]", message, Thread.currentThread().getName());
}
}
方法 3:自定义事件分发器
- 重写 Spring 的事件分发逻辑,使用自定义 ApplicationEventMulticaster:
- 创建自定义 ApplicationEventMulticaster:
@Component("applicationEventMulticaster")
public class CustomEventMulticaster extends SimpleApplicationEventMulticaster {
@Override
public void multicastEvent(ApplicationEvent event) {
if (event instanceof StringTextQueryMessage) {
StringTextQueryMessage message = (StringTextQueryMessage) event;
// 根据条件选择特定消费者,例如 consumerId
getApplicationListeners(event).stream()
.filter(listener -> {
// 假设 listener 的方法名或类名包含 consumerId
return listener.getListenerId().contains(message.getConsumerId());
})
.forEach(listener -> listener.onApplicationEvent(event));
} else {
super.multicastEvent(event);
}
}
}
执行结果如下:
2025-06-10 10:39:03.349 | INFO 16540 | http-nio-48080-exec-7 [TID: N/A] c.i.m.f.a.c.i.ApiAccessLogInterceptor | [preHandle][开始请求 URL(/admin-api/curd/stringtext/getStringTextByMQ) 参数({text=username})]Controller 方法路径:cn.iocoder.moyun.module.curd.controller.admin.CurdTestController(CurdTestController.java:57)
2025-06-10 10:39:03.355 | INFO 16540 | http-nio-48080-exec-7 [TID: N/A] c.i.m.m.c.c.admin.CurdTestController | 控制器接收到MQ请求,线程名: http-nio-48080-exec-7
2025-06-10 10:39:03.355 | INFO 16540 | http-nio-48080-exec-7 [TID: N/A] c.i.m.m.c.m.p.sms.StringTextProducer | [sendStringTextQueryMessage][发送消息: StringTextQueryMessage(text=username, pageNo=1, pageSize=10)]
2025-06-10 10:39:03.357 | INFO 16540 | http-nio-48080-exec-7 [TID: N/A] c.i.m.f.a.c.i.ApiAccessLogInterceptor | [afterCompletion][完成请求 URL(/admin-api/curd/stringtext/getStringTextByMQ) 耗时(6 ms)]
2025-06-10 10:39:03.358 | INFO 16540 | curd-async-2 [TID: N/A] c.i.m.m.c.m.c.sms.StringTextConsumer | [onMessage][接收消息: StringTextQueryMessage(text=username, pageNo=1, pageSize=10),线程名: curd-async-2]
2025-06-10 10:39:03.374 | INFO 16540 | curd-async-2 [TID: N/A] c.i.m.m.c.s.s.StringTextServiceImpl | 同步查询结果: PageResult(list=[StringTextDO(super=TenantBaseDO(super=BaseDO(createTime=2023-05-03T10:00, updateTime=null, creator=system, updater=null, deleted=false), tenantId=1), id=3, text={"username":"user002","nickname":"临时用户","password":"xyz789","status":1}, loginIp=192.168.1.102, loginDate=2023-05-03T10:20), StringTextDO(super=TenantBaseDO(super=BaseDO(createTime=2023-05-02T09:00, updateTime=2023-05-03T11:30, creator=system, updater=admin, deleted=false), tenantId=1), id=2, text={"username":"user001","nickname":"测试用户","password":"abc123","email":"test@example.com","mobile":"13800138001"}, loginIp=192.168.1.101, loginDate=2023-05-02T09:15), StringTextDO(super=TenantBaseDO(super=BaseDO(createTime=2023-05-01T08:00, updateTime=2023-05-05T15:00, creator=system, updater=admin, deleted=false), tenantId=1), id=1, text={"username":"admin","nickname":"管理员","password":"123456","dept_id":1001,"post_ids":[101,102]}, loginIp=192.168.1.100, loginDate=2023-05-01T08:30)], total=3)
2025-06-10 10:39:03.375 | INFO 16540 | curd-async-2 [TID: N/A] c.i.m.m.c.m.c.sms.StringTextConsumer | [onMessage][处理结果: PageResult(list=[StringTextDO(super=TenantBaseDO(super=BaseDO(createTime=2023-05-03T10:00, updateTime=null, creator=system, updater=null, deleted=false), tenantId=1), id=3, text={"username":"user002","nickname":"临时用户","password":"xyz789","status":1}, loginIp=192.168.1.102, loginDate=2023-05-03T10:20), StringTextDO(super=TenantBaseDO(super=BaseDO(createTime=2023-05-02T09:00, updateTime=2023-05-03T11:30, creator=system, updater=admin, deleted=false), tenantId=1), id=2, text={"username":"user001","nickname":"测试用户","password":"abc123","email":"test@example.com","mobile":"13800138001"}, loginIp=192.168.1.101, loginDate=2023-05-02T09:15), StringTextDO(super=TenantBaseDO(super=BaseDO(createTime=2023-05-01T08:00, updateTime=2023-05-05T15:00, creator=system, updater=admin, deleted=false), tenantId=1), id=1, text={"username":"admin","nickname":"管理员","password":"123456","dept_id":1001,"post_ids":[101,102]}, loginIp=192.168.1.100, loginDate=2023-05-01T08:30)], total=3)]
2. Redis消息队列
第一部分:Redis完成MQ的详细过程梳理
Yudao项目利用Redis实现了两种消息队列机制:
- Redis Stream:用于集群消费,消息持久化存储,支持消费者组,适合需要可靠性和消息追踪的场景。
- Redis Pub/Sub(Channel):用于广播消费,消息不持久化,适合实时性要求高的场景,所有订阅者都会收到消息。
我将模拟一个完整的消息发送和消费过程,分别讲解Stream和Channel的实现,详细说明每个类的作用、注册方式以及它们如何协作。
2.1 Redis Stream消息队列的完整过程
场景:通过/getStringTextByStream接口发送Stream消息
假设用户通过HTTP请求调用/getStringTextByStream接口,传入StringTextPageReqVO对象(包含text、pageNo、pageSize字段)。以下是消息从发送到消费的详细流程:
步骤1:接收HTTP请求(Controller层)
- 相关类:@GetMapping("/getStringTextByStream")(未显式定义类名,假设为StringTextController)
- 作用:这是一个Spring MVC控制器方法,负责接收前端的HTTP GET请求,处理通过Redis Stream发送消息的逻辑。
- 功能:
- 接收StringTextPageReqVO参数,包含分页查询的信息(如text、pageNo、pageSize)。
- 调用StringTextRedisProducer的sendStreamMessage方法发送消息。
- 返回CommonResult表示消息已发送,异步处理中。
- 框架注册:
- 通过Spring的@GetMapping注解,自动注册为一个HTTP端点。
- 依赖Spring Boot的Web模块,控制器类通常标注@RestController,由Spring容器管理。
- 与其他类的交互:
- 依赖StringTextRedisProducer来发送Stream消息。
- 使用StringTextPageReqVO作为请求参数的VO(Value Object)。
- 代码分析:
@GetMapping("/getStringTextByStream")
public CommonResult<String> getStringTextByStream(@Validated @ModelAttribute StringTextPageReqVO pageReqVO) {log.info("控制器接收到 Stream 请求,线程名: {}", Thread.currentThread().getName());try {stringTextRedisProducer.sendStreamMessage(pageReqVO.getText(), pageReqVO.getPageNo(), pageReqVO.getPageSize());return CommonResult.success("Stream 消息已发送,异步处理中");} catch (Exception e) {log.error("Stream 消息发送异常", e);return CommonResult.error(500, "Stream 消息发送失败");}
}
- 控制器接收到请求后,提取pageReqVO中的字段,传递给StringTextRedisProducer。
- 异常处理确保即使发送失败,也能返回错误响应。
步骤2:生产者发送Stream消息
- 相关类:StringTextRedisProducer
- 作用:这是一个生产者类,负责创建并发送Redis Stream消息。
- 功能:
- 提供sendStreamMessage方法,构造StringTextQueryStreamMessage对象,设置text、pageNo、pageSize字段。
- 使用RedisMQTemplate的send方法将消息发送到Redis Stream。
- 记录日志,跟踪消息发送情况。
- 框架注册:
- 标注@Component,由Spring容器管理。
- 通过@Resource注入RedisMQTemplate依赖。
- 与其他类的交互:
- 依赖RedisMQTemplate执行实际的Redis Stream消息发送。
- 使用StringTextQueryStreamMessage作为消息载体。
- 被StringTextController调用。
- 代码分析
@Component
public class StringTextRedisProducer {@Resourceprivate RedisMQTemplate redisMQTemplate;public void sendStreamMessage(String text, Integer pageNo, Integer pageSize) {StringTextQueryStreamMessage message = new StringTextQueryStreamMessage();message.setText(text);message.setPageNo(pageNo);message.setPageSize(pageSize);redisMQTemplate.send(message);log.info("[sendStreamMessage][发送 Stream 消息: {}]", message);}
}
- 创建StringTextQueryStreamMessage对象,填充请求参数。
- 调用redisMQTemplate.send(message),将消息发送到Redis Stream。
- 相关类:StringTextQueryStreamMessage
- 作用:Stream消息的载体,定义消息的结构和Stream Key。
- 功能:
- 继承AbstractRedisStreamMessage,包含text、pageNo、pageSize字段。
- 重写getStreamKey(),指定Stream的键为"string-text-query-stream"。
- 框架注册:
- 作为普通Java类,无需Spring注册,直接由StringTextRedisProducer实例化。
- 与其他类的交互:
- 被StringTextRedisProducer用于封装消息内容。
- 被StringTextStreamConsumer用于解析消息内容。
- 代码分析:
@Data
public class StringTextQueryStreamMessage extends AbstractRedisStreamMessage {private String text;@NotNull(message = "页码不能为空")private Integer pageNo;@NotNull(message = "每页大小不能为空")private Integer pageSize;@Overridepublic String getStreamKey() {return "string-text-query-stream";}
}
- 定义消息结构,确保pageNo和pageSize不为空。
- 指定Stream Key,用于Redis Stream的消息存储。
步骤3:RedisMQTemplate发送消息
- 相关类:RedisMQTemplate
- 作用:Redis MQ操作的核心模板类,封装了Redis Stream和Pub/Sub的消息发送逻辑。
- 功能:
- 提供send方法,针对Stream消息,将消息序列化为JSON并添加到指定的Stream Key。
- 支持拦截器机制,在消息发送前后调用RedisMessageInterceptor的钩子方法。
- 使用RedisTemplate执行底层的Redis操作。
- 框架注册:
- 通过YudaoRedisMQProducerAutoConfiguration类注册为Spring Bean。
- 依赖StringRedisTemplate和RedisMessageInterceptor列表。
- 与其他类的交互:
- 被StringTextRedisProducer调用,用于发送Stream消息。
- 依赖RedisTemplate执行Redis操作。
- 调用RedisMessageInterceptor处理消息发送前后的扩展逻辑。
- 被StringTextStreamConsumer用于ACK消息。
- 代码分析:
public class RedisMQTemplate {private final RedisTemplate<String, ?> redisTemplate;private final List<RedisMessageInterceptor> interceptors = new ArrayList<>();public <T extends AbstractRedisStreamMessage> RecordId send(T message) {try {sendMessageBefore(message);return redisTemplate.opsForStream().add(StreamRecords.newRecord().ofObject(JsonUtils.toJsonString(message)).withStreamKey(message.getStreamKey()));} finally {sendMessageAfter(message);}}
}
- 调用sendMessageBefore执行拦截器前置逻辑。
- 使用redisTemplate.opsForStream().add将消息添加到Stream,返回RecordId。
- 调用sendMessageAfter执行拦截器后置逻辑。
- 相关类:YudaoRedisMQProducerAutoConfiguration
- 作用:生产者配置类,负责注册RedisMQTemplate。
- 功能:
- 创建RedisMQTemplate实例,注入StringRedisTemplate和拦截器列表。
- 确保在YudaoRedisAutoConfiguration之后加载(依赖Redis配置)。
- 框架注册:
- 标注@AutoConfiguration,由Spring Boot自动加载。
- 使用@Bean注册RedisMQTemplate。
- 与其他类的交互:
- 为StringTextRedisProducer和StringTextStreamConsumer提供RedisMQTemplate。
- 依赖StringRedisTemplate和RedisMessageInterceptor。
- 代码分析:
@AutoConfiguration(after = YudaoRedisAutoConfiguration.class)
public class YudaoRedisMQProducerAutoConfiguration {@Beanpublic RedisMQTemplate redisMQTemplate(StringRedisTemplate redisTemplate,List<RedisMessageInterceptor> interceptors) {RedisMQTemplate redisMQTemplate = new RedisMQTemplate(redisTemplate);interceptors.forEach(redisMQTemplate::addInterceptor);return redisMQTemplate;}
}
相关类:RedisMessageInterceptor
- 作用:消息拦截器接口,提供发送和消费消息前后的扩展点。
- 功能:
- 定义sendMessageBefore、sendMessageAfter、consumeMessageBefore、consumeMessageAfter方法。
- 适用于多租户场景或其他扩展需求(如日志记录、权限检查)。
- 框架注册:
- 实现类需标注@Component,由Spring容器管理。
- 由YudaoRedisMQProducerAutoConfiguration注入到RedisMQTemplate。
- 与其他类的交互:
- 被RedisMQTemplate调用,用于消息处理扩展。
步骤4:消费者监听和处理Stream消息
- 相关类:StringTextStreamConsumer
- 作用:Stream消息的消费者,负责处理收到的StringTextQueryStreamMessage消息。
- 功能:
- 继承AbstractRedisStreamMessageListener,实现onMessage方法。
- 将消息转换为StringTextPageReqVO,调用StringTextService执行分页查询。
- 记录处理结果日志。
- 框架注册:
- 标注@Component,由Spring容器管理。
- 通过YudaoRedisMQConsumerAutoConfiguration注册到StreamMessageListenerContainer。
- 与其他类的交互:
- 依赖StringTextService执行业务逻辑。
- 依赖RedisMQTemplate进行消息ACK。
- 接收StringTextQueryStreamMessage消息。
- 代码分析:
@Component
public class StringTextStreamConsumer extends AbstractRedisStreamMessageListener<StringTextQueryStreamMessage> {@Resourceprivate StringTextService stringTextService;@Overridepublic void onMessage(StringTextQueryStreamMessage message) {log.info("[onMessage][Stream 消息: {},线程名: {}]", message, Thread.currentThread().getName());StringTextPageReqVO reqVO = new StringTextPageReqVO();reqVO.setText(message.getText());reqVO.setPageNo(message.getPageNo());reqVO.setPageSize(message.getPageSize());PageResult<StringTextDO> pageResult = stringTextService.getUserPage(reqVO);log.info("[onMessage][Stream 处理结果: {}]", pageResult);}
}
- 相关类:AbstractRedisStreamMessageListener
- 作用:Stream消息监听器的抽象基类,封装通用消费逻辑。
- 功能:
- 实现StreamListener接口,处理Redis Stream消息。
- 解析消息为指定类型(StringTextQueryStreamMessage)。
- 调用拦截器前置/后置逻辑,执行消息ACK。
- 框架注册:
- 作为抽象类,不直接注册,其子类(如StringTextStreamConsumer)注册。
- 与其他类的交互:
- 被StringTextStreamConsumer继承。
- 依赖RedisMQTemplate进行ACK。
- 被YudaoRedisMQConsumerAutoConfiguration用于注册监听器。
- 相关类:YudaoRedisMQConsumerAutoConfiguration
- 作用:消费者配置类,负责注册Stream和Pub/Sub的监听容器。
- 功能:
- 创建StreamMessageListenerContainer,配置Stream监听。
- 注册StringTextStreamConsumer到容器,绑定Stream Key和消费者组。
- 创建RedisPendingMessageResendJob处理未消费消息。
- 框架注册:
- 标注@AutoConfiguration,在YudaoRedisAutoConfiguration之后加载。
- 使用@Bean注册StreamMessageListenerContainer和RedisPendingMessageResendJob。
- 与其他类的交互:
- 依赖RedisMQTemplate和StringTextStreamConsumer。
- 为RedisPendingMessageResendJob提供监听器列表。
- 代码分析:
@AutoConfiguration(after = YudaoRedisAutoConfiguration.class)
public class YudaoRedisMQConsumerAutoConfiguration {@Beanpublic StreamMessageListenerContainer<String, ObjectRecord<String, String>> redisStreamMessageListenerContainer(RedisMQTemplate redisMQTemplate, List<AbstractRedisStreamMessageListener<?>> listeners) {// 配置容器,注册监听器}
}
相关类:RedisPendingMessageResendJob
- 作用:定时任务,重新投递Stream中未消费的超时消息。
- 功能:
- 每分钟检查Pending消息,超时(默认5分钟)后重新投递。
- 使用分布式锁(Redisson)确保单实例执行。
- 框架注册:
- 由YudaoRedisMQConsumerAutoConfiguration注册为Bean。
- 标注@Scheduled启用定时任务。
- 与其他类的交互:
- 依赖RedisMQTemplate和StringTextStreamConsumer的监听器列表。
- 使用RedissonClient实现分布式锁。
步骤5:业务逻辑处理
- 相关类:StringTextService
- 作用:业务服务层,执行分页查询逻辑。
- 功能:
- 提供getUserPage方法,根据StringTextPageReqVO查询分页数据。
- 返回PageResult<StringTextDO>。
- 框架注册:
- 通常标注@Service,由Spring容器管理。
- 与其他类的交互:
- 被StringTextStreamConsumer调用。
- 相关类:StringTextPageReqVO
- 作用:分页请求的VO,定义查询参数。
- 功能:
- 包含text、pageNo、pageSize等字段。
- 继承PageParam,支持分页参数。
- 框架注册:
- 作为普通Java类,无需注册。
- 与其他类的交互:
- 被StringTextController接收前端参数。
- 被StringTextStreamConsumer用于构造查询参数。
第二部分:具体类解析
2.2 RedisMQTemplate类
/*** Redis MQ 操作模板类* 封装了基于Redis的两种消息模型的发送操作:* 1. Pub/Sub模式:发布订阅模式,适合广播消息* 2. Stream模式:消息流模式,适合需要消息顺序和可靠性的场景* 同时提供了消息拦截器机制,允许在消息发送前后执行自定义逻辑*/
@AllArgsConstructor
public class RedisMQTemplate {@Getterprivate final RedisTemplate<String, ?> redisTemplate;/*** 拦截器数组* 用于在消息发送前后执行自定义逻辑,如日志记录、指标收集、分布式追踪等*/@Getterprivate final List<RedisMessageInterceptor> interceptors = new ArrayList<>();/*** 发送 Redis 消息,基于 Redis pub/sub 实现* * @param message 继承自AbstractRedisChannelMessage的消息对象* @return void 无返回值* @功能 基于Redis的Pub/Sub模式发布消息,在发送前后分别执行拦截器的前置和后置处理* 使用JSON序列化消息内容,通过message.getChannel()获取消息发布的频道名称*/public <T extends AbstractRedisChannelMessage> void send(T message) {try {sendMessageBefore(message);// 发送消息redisTemplate.convertAndSend(message.getChannel(), JsonUtils.toJsonString(message));} finally {sendMessageAfter(message);}}/*** 发送 Redis 消息,基于 Redis Stream 实现* * @param message 继承自AbstractRedisStreamMessage的消息对象* @return RecordId Redis Stream生成的消息ID,格式如"1686475200000-0"* @功能 基于Redis的Stream模式发送消息,在发送前后分别执行拦截器的前置和后置处理* 使用JSON序列化消息内容,通过message.getStreamKey()获取Stream的键名*/public <T extends AbstractRedisStreamMessage> RecordId send(T message) {try {sendMessageBefore(message);// 发送消息return redisTemplate.opsForStream().add(StreamRecords.newRecord().ofObject(JsonUtils.toJsonString(message)) // 设置内容.withStreamKey(message.getStreamKey())); // 设置 stream key} finally {sendMessageAfter(message);}}/*** 添加拦截器* * @param interceptor 消息拦截器对象* @return void 无返回值* @功能 向拦截器列表中添加一个消息拦截器,拦截器将在消息发送前后执行自定义逻辑*/public void addInterceptor(RedisMessageInterceptor interceptor) {interceptors.add(interceptor);}/*** 消息发送前执行拦截器逻辑* * @param message 消息对象* @return void 无返回值* @功能 在消息发送前调用所有拦截器的前置处理方法,按拦截器添加的顺序执行(正序)*/private void sendMessageBefore(AbstractRedisMessage message) {// 正序执行拦截器的前置处理方法interceptors.forEach(interceptor -> interceptor.sendMessageBefore(message));}/*** 消息发送后执行拦截器逻辑* * @param message 消息对象* @return void 无返回值* @功能 在消息发送后调用所有拦截器的后置处理方法,按拦截器添加顺序的逆序执行(倒序)*/private void sendMessageAfter(AbstractRedisMessage message) {// 倒序执行拦截器的后置处理方法for (int i = interceptors.size() - 1; i >= 0; i--) {interceptors.get(i).sendMessageAfter(message);}}}
2.3 消息父类及其子类
AbstractRedisChannelMessageListener自定义监听类父类
/*** Redis Pub/Sub 监听器抽象类,用于实现广播消费** @param <T> 消息类型。一定要填写噢,不然会报错*/
public abstract class AbstractRedisChannelMessageListener<T extends AbstractRedisChannelMessage> implements MessageListener {/*** 消息类型*/private final Class<T> messageType;/*** Redis Channel*/private final String channel;/*** RedisMQTemplate*/@Setterprivate RedisMQTemplate redisMQTemplate;/*** messageType 在 onMessage 方法中用于将 Redis 收到的消息(JSON 字符串)反序列化为 T 类型的对象* 通过 JsonUtils.parseObject(message.getBody(), messageType),明确的消息类型确保了正确的反序列化* channel 字段用于指定监听器订阅的 Redis Pub/Sub 通道* getChannel() 方法允许每个消息类型定义自己的通道,使监听器能够灵活适应不同的消息类型和通道* messageType 确定了后续消息反序列化的目标类型,确保消息被解析为 StringTextQueryChannelMessage。* channel 确定了监听器订阅的 Redis 通道,Spring Data Redis 会使用 getChannel() 的返回值("string.text.query.channel")订阅该通道。* @SneakyThrows 隐藏了反射相关的异常(如 NoSuchMethodException),假设 StringTextQueryChannelMessage 有无参构造函数且 getChannel() 可访问。*/@SneakyThrowsprotected AbstractRedisChannelMessageListener() {//通过反射获取子类的泛型参数this.messageType = getMessageClass();/*** 具体案例:* StringTextQueryChannelMessage 的无参构造函数* 调用 newInstance() 创建一个 StringTextQueryChannelMessage 实例。* 调用实例的 getChannel() 方法,获取通道名称(如 "string.text.query.channel")*/this.channel = messageType.getDeclaredConstructor().newInstance().getChannel();}/*** 获得 Sub 订阅的 Redis Channel 通道** @return channel*/public final String getChannel() {return channel;}/*** Message message:Redis 传递的原始消息对象,包含消息体(message.getBody())和通道信息* T messageObj = JsonUtils.parseObject(message.getBody(), messageType)* 使用 JsonUtils.parseObject 将消息体(message.getBody(),字节数组)反序列化为 T 类型的对象* messageType 是构造函数中初始化的消息类(Class<T>),确保消息被正确解析为指定类型(如 OrderMessage)* 调用 this.onMessage(messageObj),这是抽象方法,子类必须实现以定义具体的消息处理* messageType 是 StringTextQueryChannelMessage.class(由构造函数设置)。* 具体案例:* JsonUtils.parseObject(message.getBody(), messageType)* 将 JSON 字节数组解析为 StringTextQueryChannelMessage 对象:* @param message* @param bytes*/@Overridepublic final void onMessage(Message message, byte[] bytes) {T messageObj = JsonUtils.parseObject(message.getBody(), messageType);try {consumeMessageBefore(messageObj);// 消费消息this.onMessage(messageObj);} finally {consumeMessageAfter(messageObj);}}/*** 处理消息** @param message 消息*/public abstract void onMessage(T message);/*** 通过解析类上的泛型,获得消息类型** @return 消息类型*/@SuppressWarnings("unchecked")private Class<T> getMessageClass() {Type type = TypeUtil.getTypeArgument(getClass(), 0);if (type == null) {throw new IllegalStateException(String.format("类型(%s) 需要设置消息类型", getClass().getName()));}return (Class<T>) type;}private void consumeMessageBefore(AbstractRedisMessage message) {assert redisMQTemplate != null;List<RedisMessageInterceptor> interceptors = redisMQTemplate.getInterceptors();// 正序interceptors.forEach(interceptor -> interceptor.consumeMessageBefore(message));}private void consumeMessageAfter(AbstractRedisMessage message) {assert redisMQTemplate != null;List<RedisMessageInterceptor> interceptors = redisMQTemplate.getInterceptors();// 倒序for (int i = interceptors.size() - 1; i >= 0; i--) {interceptors.get(i).consumeMessageAfter(message);}}
StringTextChannelConsumerListener自定义监听器实现类
@Slf4j
@Component
public class StringTextChannelConsumerListener extends AbstractRedisChannelMessageListener<StringTextQueryChannelMessage> {@Resourceprivate StringTextService stringTextService;/*** 在 StringTextChannelConsumerListener 实例化时(由 Spring 容器管理)* 会调用父类 AbstractRedisChannelMessageListener 的构造函数* 当 Redis 通道 "string.text.query.channel" 收到消息时* Spring Data Redis 调用 StringTextChannelConsumerListener 的 onMessage(Message, byte[]) 方法(继承自父类)* @param message 消息*/@Overridepublic void onMessage(StringTextQueryChannelMessage message) {log.info("[onMessage][Channel 消息: {},线程名: {}]", message, Thread.currentThread().getName());StringTextPageReqVO reqVO = new StringTextPageReqVO();reqVO.setText(message.getText());reqVO.setPageNo(message.getPageNo());reqVO.setPageSize(message.getPageSize());PageResult<StringTextDO> pageResult = stringTextService.getUserPage(reqVO);log.info("[onMessage][Channel 处理结果: {}]", pageResult);}
}
监听器执行逻辑:
- Spring Data Redis 的调用:
- 当通道有消息时,Spring Data Redis 调用 MessageListener 的 onMessage(Message, byte[]),即 AbstractRedisChannelMessageListener 的实现。
- 父类到子类的调用:
- 父类的 onMessage(Message, byte[]) 处理消息反序列化和拦截器逻辑,然后通过 this.onMessage(T) 调用子类的 onMessage(StringTextQueryChannelMessage)。
- 子类没有直接调用父类:
- 子类的 onMessage 是父类调用的结果,不是子类主动调用父类。
- 子类的 onMessage 只处理业务逻辑,依赖父类的预处理。
- 构造函数的作用:
- 初始化 messageType 和 channel,确保消息反序列化和通道订阅正确,直接影响子类 onMessage 的执行。
这种设计通过模板方法模式实现了通用逻辑和业务逻辑的分离,父类控制流程,子类提供具体实现。
2.4 配置类
YudaoRabbitMQAutoConfiguration
/*** RabbitMQ 消息队列配置类* 配置RabbitMQ的消息转换器,使用Jackson进行JSON序列化* 当生产者发送消息时,将Java对象转换为JSON格式的字节数组* 当消费者接收消息时,将JSON格式的消息转换回Java对象*/
@AutoConfiguration
@Slf4j
@ConditionalOnClass(name = "org.springframework.amqp.rabbit.core.RabbitTemplate")
public class YudaoRabbitMQAutoConfiguration {/*** Jackson2JsonMessageConverter Bean:使用 jackson 序列化消息*/@Beanpublic MessageConverter createMessageConverter() {return new Jackson2JsonMessageConverter();}
}
YudaoRabbitMQAutoConfiguration:配置 RabbitMQ 消息转换器,使用 Jackson 进行 JSON 序列化。
YudaoRedisMQConsumerAutoConfiguration
/*** Redis 消息队列 Consumer 配置类* redisMessageListenerContainer(): 创建Pub/Sub消息监听容器* redisStreamMessageListenerContainer(): 创建Stream消息监听容器* redisPendingMessageResendJob(): 创建待处理消息重发任务*/
@Slf4j
@EnableScheduling // 启用定时任务,用于 RedisPendingMessageResendJob 重发消息
@AutoConfiguration(after = YudaoRedisAutoConfiguration.class)
public class YudaoRedisMQConsumerAutoConfiguration {/*** 创建 Redis Pub/Sub 广播消费的容器*/@Bean@ConditionalOnBean(AbstractRedisChannelMessageListener.class) // 只有 AbstractChannelMessageListener 存在的时候,才需要注册 Redis pubsub 监听public RedisMessageListenerContainer redisMessageListenerContainer(RedisMQTemplate redisMQTemplate, List<AbstractRedisChannelMessageListener<?>> listeners) {// 创建 RedisMessageListenerContainer 对象RedisMessageListenerContainer container = new RedisMessageListenerContainer();// 设置 RedisConnection 工厂。container.setConnectionFactory(redisMQTemplate.getRedisTemplate().getRequiredConnectionFactory());// 添加监听器listeners.forEach(listener -> {listener.setRedisMQTemplate(redisMQTemplate);container.addMessageListener(listener, new ChannelTopic(listener.getChannel()));log.info("[redisMessageListenerContainer][注册 Channel({}) 对应的监听器({})]",listener.getChannel(), listener.getClass().getName());});return container;}/*** 创建 Redis Stream 重新消费的任务*/@Bean@ConditionalOnBean(AbstractRedisStreamMessageListener.class) // 只有 AbstractStreamMessageListener 存在的时候,才需要注册 Redis pubsub 监听public RedisPendingMessageResendJob redisPendingMessageResendJob(List<AbstractRedisStreamMessageListener<?>> listeners,RedisMQTemplate redisTemplate,@Value("${spring.application.name}") String groupName,RedissonClient redissonClient) {return new RedisPendingMessageResendJob(listeners, redisTemplate, groupName, redissonClient);}/*** 创建 Redis Stream 集群消费的容器** 基础知识:<a href="https://www.geek-book.com/src/docs/redis/redis/redis.io/commands/xreadgroup.html">Redis Stream 的 xreadgroup 命令</a>*/@Bean(initMethod = "start", destroyMethod = "stop")@ConditionalOnBean(AbstractRedisStreamMessageListener.class) // 只有 AbstractStreamMessageListener 存在的时候,才需要注册 Redis pubsub 监听public StreamMessageListenerContainer<String, ObjectRecord<String, String>> redisStreamMessageListenerContainer(RedisMQTemplate redisMQTemplate, List<AbstractRedisStreamMessageListener<?>> listeners) {RedisTemplate<String, ?> redisTemplate = redisMQTemplate.getRedisTemplate();checkRedisVersion(redisTemplate);// 第一步,创建 StreamMessageListenerContainer 容器// 创建 options 配置StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, String>> containerOptions =StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder().batchSize(10) // 一次性最多拉取多少条消息.targetType(String.class) // 目标类型。统一使用 String,通过自己封装的 AbstractStreamMessageListener 去反序列化.build();// 创建 container 对象StreamMessageListenerContainer<String, ObjectRecord<String, String>> container =StreamMessageListenerContainer.create(redisMQTemplate.getRedisTemplate().getRequiredConnectionFactory(), containerOptions);// 第二步,注册监听器,消费对应的 Stream 主题String consumerName = buildConsumerName();listeners.parallelStream().forEach(listener -> {log.info("[redisStreamMessageListenerContainer][开始注册 StreamKey({}) 对应的监听器({})]",listener.getStreamKey(), listener.getClass().getName());// 创建 listener 对应的消费者分组try {redisTemplate.opsForStream().createGroup(listener.getStreamKey(), listener.getGroup());} catch (Exception ignore) {}// 设置 listener 对应的 redisTemplatelistener.setRedisMQTemplate(redisMQTemplate);// 创建 Consumer 对象Consumer consumer = Consumer.from(listener.getGroup(), consumerName);// 设置 Consumer 消费进度,以最小消费进度为准StreamOffset<String> streamOffset = StreamOffset.create(listener.getStreamKey(), ReadOffset.lastConsumed());// 设置 Consumer 监听StreamMessageListenerContainer.StreamReadRequestBuilder<String> builder = StreamMessageListenerContainer.StreamReadRequest.builder(streamOffset).consumer(consumer).autoAcknowledge(false) // 不自动 ack.cancelOnError(throwable -> false); // 默认配置,发生异常就取消消费,显然不符合预期;因此,我们设置为 falsecontainer.register(builder.build(), listener);log.info("[redisStreamMessageListenerContainer][完成注册 StreamKey({}) 对应的监听器({})]",listener.getStreamKey(), listener.getClass().getName());});return container;}/*** 构建消费者名字,使用本地 IP + 进程编号的方式。* 参考自 RocketMQ clientId 的实现** @return 消费者名字*/private static String buildConsumerName() {return String.format("%s@%d", SystemUtil.getHostInfo().getAddress(), SystemUtil.getCurrentPID());}/*** 校验 Redis 版本号,是否满足最低的版本号要求!*/private static void checkRedisVersion(RedisTemplate<String, ?> redisTemplate) {// 获得 Redis 版本Properties info = redisTemplate.execute((RedisCallback<Properties>) RedisServerCommands::info);String version = MapUtil.getStr(info, "redis_version");// 校验最低版本必须大于等于 5.0.0int majorVersion = Integer.parseInt(StrUtil.subBefore(version, '.', false));if (majorVersion < 5) {throw new IllegalStateException(StrUtil.format("您当前的 Redis 版本为 {},小于最低要求的 5.0.0 版本!" +"请参考 {} 文档进行安装。", version, DocumentEnum.REDIS_INSTALL.getUrl()));}}}
方法 1: redisMessageListenerContainer
- 作用:
- 创建并配置 RedisMessageListenerContainer,用于监听 Redis Pub/Sub 通道消息。
- 注册所有 AbstractRedisChannelMessageListener 子类的监听器(如 StringTextChannelConsumerListener)。
方法 2: redisPendingMessageResendJob
作用:
- 创建 RedisPendingMessageResendJob Bean,定时重发 Redis Stream 的待处理(pending)消息。
方法 3: redisStreamMessageListenerContainer
- 作用:
- 创建并配置 StreamMessageListenerContainer,用于监听 Redis Stream 消息。
- 注册所有 AbstractRedisStreamMessageListener 子类的监听器。
- 输入参数:
- RedisMQTemplate redisMQTemplate:提供 Redis 连接和拦截器。
- List<AbstractRedisStreamMessageListener<?>> listeners:所有 Stream 监听器。
- 输出:StreamMessageListenerContainer<String, ObjectRecord<String, String>>,用于 Stream 消息消费。
- 实现逻辑:
- 检查 Redis 版本:
- checkRedisVersion(redisTemplate); 确保 Redis 版本 ≥ 5.0.0(支持 Stream)。
- 创建容器:
- 配置 containerOptions:
- batchSize(10):每次拉取最多 10 条消息。
- targetType(String.class):消息体为 String 类型(JSON 字符串),由监听器反序列化。
- 创建 StreamMessageListenerContainer。
- 配置 containerOptions:
- 注册监听器:
- 使用 parallelStream 并行处理监听器,提高效率。
- 对每个监听器:
- 创建消费者组(createGroup)。
- 注入 redisMQTemplate。
- 创建 Consumer(组名 + 消费者名)。
- 设置 StreamOffset(从最后消费位置读取)。
- 注册监听器,配置手动 ACK 和错误处理。
- 日志记录注册过程。
- 检查 Redis 版本:
2.5 RedisPendingMessageResendJob(定时任务类)
/*** 这个任务用于处理,crash 之后的消费者未消费完的消息* 重新分发信息(清理员)*/
@Slf4j
@AllArgsConstructor
public class RedisPendingMessageResendJob {private static final String LOCK_KEY = "redis:pending:msg:lock";/*** 消息超时时间,默认 5 分钟** 1. 超时的消息才会被重新投递* 2. 由于定时任务 1 分钟一次,消息超时后不会被立即重投,极端情况下消息5分钟过期后,再等 1 分钟才会被扫瞄到*/private static final int EXPIRE_TIME = 5 * 60;private final List<AbstractRedisStreamMessageListener<?>> listeners;private final RedisMQTemplate redisTemplate;private final String groupName;private final RedissonClient redissonClient;/*** 一分钟执行一次,这里选择每分钟的35秒执行,是为了避免整点任务过多的问题*/@Scheduled(cron = "35 * * * * ?")public void messageResend() {RLock lock = redissonClient.getLock(LOCK_KEY);// 尝试加锁if (lock.tryLock()) {try {execute();} catch (Exception ex) {log.error("[messageResend][执行异常]", ex);} finally {lock.unlock();}}}/*** 执行清理逻辑** @see <a href="https://gitee.com/zhijiantianya/ruoyi-vue-pro/pulls/480/files">讨论</a>*/private void execute() {//redisTemplate.getRedisTemplate().opsForStream() 获取 Redis Stream 的操作接口,用于执行 pending、range、add、acknowledge 等操作。StreamOperations<String, Object, Object> ops = redisTemplate.getRedisTemplate().opsForStream();//检查每条传送带listeners.forEach(listener -> {//ops.pending(listener.getStreamKey(), groupName):查询 Stream 和消费者组的 pending 消息概况。PendingMessagesSummary pendingMessagesSummary = Objects.requireNonNull(ops.pending(listener.getStreamKey(), groupName));// 每个消费者的 pending 队列消息数量(检查每个传送带(Stream)上有哪些卡住的包裹(pending 消息))Map<String, Long> pendingMessagesPerConsumer = pendingMessagesSummary.getPendingMessagesPerConsumer();pendingMessagesPerConsumer.forEach((consumerName, pendingMessageCount) -> {log.info("[processPendingMessage][消费者({}) 消息数量({})]", consumerName, pendingMessageCount);// 每个消费者的 pending消息的详情信息,getStreamKey():返回 Stream 名称(如 "string.text.query.stream"),pendingMessagesPerConsumer:Map,键是消费者名称,值是 pending 消息数量PendingMessages pendingMessages = ops.pending(listener.getStreamKey(), Consumer.from(groupName, consumerName), Range.unbounded(), pendingMessageCount);if (pendingMessages.isEmpty()) {return;}pendingMessages.forEach(pendingMessage -> {// 获取消息上一次传递到 consumer 的时间,long lastDelivery = pendingMessage.getElapsedTimeSinceLastDelivery().getSeconds();if (lastDelivery < EXPIRE_TIME){return;}// 获取指定 id 的消息体(对于超时的包裹,清理员去传送带上找到它的具体内容(records),比如包裹里装了啥(JSON 数据)。如果没找到(比如包裹被删了),就跳过)List<MapRecord<String, Object, Object>> records = ops.range(listener.getStreamKey(),Range.of(Range.Bound.inclusive(pendingMessage.getIdAsString()), Range.Bound.inclusive(pendingMessage.getIdAsString())));if (CollUtil.isEmpty(records)) {return;}// 重新投递消息redisTemplate.getRedisTemplate().opsForStream().add(StreamRecords.newRecord().ofObject(records.get(0).getValue()) // 设置内容.withStreamKey(listener.getStreamKey()));// ack 消息消费完成redisTemplate.getRedisTemplate().opsForStream().acknowledge(groupName, records.get(0));log.info("[processPendingMessage][消息({})重新投递成功]", records.get(0).getId());});});});}
}
- RedisPendingMessageResendJob 是一个定时任务类,专门用于处理 Redis Stream 消息队列中因消费者异常(如崩溃)而未确认(ACK)的待处理(pending)消息。
- 它通过定期扫描消费者组的 pending 消息,检查消息是否超时(默认 5 分钟),并将超时的消息重新投递到 Stream,同时确认(ACK)原消息,以避免消息丢失或重复处理。
3. 总结
3.1 整体背景
想象你在一家快递公司(你的系统)工作,Redis 消息队列就像公司的两条传送带:
- Pub/Sub 传送带(广播模式):像广播电台,包裹(消息)发出去,所有听众(消费者)都能收到,但包裹不会保存。
- Stream 传送带(集群模式):像物流流水线,包裹有编号(消息 ID),可以保存、追踪,多个快递员(消费者)分工处理。
实现代码CurdTestController 负责把包裹放上传送带,框架类代码负责管理传送带、派送包裹、处理异常(比如包裹卡住)。下面,我会先介绍每个角色的作用(类分析),然后讲一个包裹从发出到送达的故事(全过程)。
3.2 类结构与作用分析
3.2.1 框架的 Redis MQ 相关类
- YudaoRedisMQProducerAutoConfiguration
- 作用:像快递公司的“生产设备管理员”,负责初始化消息发送工具(RedisMQTemplate)。
- 职责:
- 创建 RedisMQTemplate Bean,注入 StringRedisTemplate 和拦截器(RedisMessageInterceptor)。
- 配置拦截器,增强消息发送的扩展性(如日志、租户处理)。
- 关键方法:
- redisMQTemplate:构造 RedisMQTemplate,添加拦截器。
- 使用场景:Spring 启动时,自动配置 RedisMQTemplate,供生产者(如 StringTextRedisProducer)使用。
- YudaoRedisMQConsumerAutoConfiguration
- 作用:像“派送中心管理员”,负责配置 Pub/Sub 和 Stream 的消息监听容器,以及异常包裹清理任务。
- 职责:
- 配置 RedisMessageListenerContainer:监听 Pub/Sub 通道(如 string-text-query-channel)。
- 配置 StreamMessageListenerContainer:监听 Stream 队列(如 string-text-query-stream)。
- 创建 RedisPendingMessageResendJob:定时清理 Stream 的待处理(pending)包裹。
- 检查 Redis 版本,确保支持 Stream(≥5.0.0)。
- 生成消费者名称(如 192.168.1.1@1234)。
- 关键方法:
- redisMessageListenerContainer:注册 Pub/Sub 监听器。
- redisStreamMessageListenerContainer:注册 Stream 监听器,设置批处理(10 条/次)、手动 ACK。
- redisPendingMessageResendJob:创建定时任务。
- buildConsumerName:生成唯一消费者名称。
- checkRedisVersion:验证 Redis 版本。
- 使用场景:Spring 启动时,自动配置消费者容器,监听消息并处理异常。
- RedisMQTemplate
- 作用:像“传送带操作员”,负责把包裹放上 Pub/Sub 或 Stream 传送带,并支持拦截器扩展。
- 职责:
- 发送 Pub/Sub 消息:convertAndSend 到指定通道。
- 发送 Stream 消息:opsForStream().add 到指定 Stream,返回消息 ID。
- 执行拦截器:发送前后调用 sendMessageBefore/sendMessageAfter。
- 提供 opsForStream() 等接口,供定时任务(如 RedisPendingMessageResendJob)操作 Stream。
- 关键方法:
- send(T extends AbstractRedisChannelMessage):发送 Pub/Sub 消息。
- send(T extends AbstractRedisStreamMessage):发送 Stream 消息。
- addInterceptor:添加拦截器。
- 使用场景:被生产者(如 StringTextRedisProducer)调用发送消息,被定时任务调用操作 Stream。
- RedisMessageInterceptor
- 作用:像“包裹检查员”,在消息发送或消费前后进行额外处理(如日志、租户隔离)。
- 职责:
- 定义接口:sendMessageBefore/After、consumeMessageBefore/After。
- 默认空实现,允许自定义扩展。
- 使用场景:由 RedisMQTemplate 在消息发送/消费时调用,典型用于多租户场景或日志记录。
- RedisPendingMessageResendJob
- 作用:像“包裹清理员”,定时(每分钟 35 秒)检查 Stream 传送带上卡住的包裹(pending 消息),重新投递超时的包裹。
- 职责:
- 使用分布式锁(RedissonClient)确保单一实例执行。
- 扫描每个 Stream 的 pending 消息,检查超时(5 分钟)。
- 重新投递超时消息,确认(ACK)原消息。
- 关键方法:
- messageResend:定时任务入口,加锁调用 execute。
- execute:扫描 pending 消息,重新投递并 ACK。
- 使用场景:消费者崩溃未确认消息时,定时重发确保消息不丢失。
- AbstractRedisMessage
- 作用:像“包裹模板”,定义消息的基本结构。
- 职责:
- 提供 headers(键值对),存储元数据(如租户 ID)。
- 抽象基类,供具体消息类继承。
- 使用场景:被 AbstractRedisChannelMessage 和 AbstractRedisStreamMessage 继承。
- AbstractRedisChannelMessage
- 作用:像“Pub/Sub 包裹模板”,定义 Pub/Sub 消息的结构。
- 职责:
- 提供 getChannel(),默认返回类名,子类可自定义通道(如 string-text-query-channel)。
- 忽略序列化通道名(@JsonIgnore)。
- 使用场景:被你的 StringTextQueryChannelMessage 继承。
- AbstractRedisStreamMessage
- 作用:像“Stream 包裹模板”,定义 Stream 消息的结构。
- 职责:
- 提供 getStreamKey(),默认返回类名,子类可自定义 Stream(如 string-text-query-stream)。
- 忽略序列化 Stream 键。
- 使用场景:被你的 StringTextQueryStreamMessage 继承。
- AbstractRedisChannelMessageListener<T>
- 作用:像“Pub/Sub 快递员”,监听 Pub/Sub 通道,处理消息。
- 职责:
- 自动获取消息类型(messageType)和通道(channel)。
- 反序列化消息(JsonUtils.parseObject)。
- 执行拦截器(consumeMessageBefore/After)。
- 调用子类的 onMessage 处理消息。
- 关键方法:
- onMessage(Message, byte[]):处理原始 Redis 消息。
- onMessage(T):抽象方法,子类实现具体逻辑。
- 使用场景:被你的 StringTextChannelConsumerListener 继承。
- AbstractRedisStreamMessageListener<T>
- 作用:像“Stream 快递员”,监听 Stream 队列,处理消息。
- 职责:
- 自动获取消息类型(messageType)、Stream 键(streamKey)、消费者组(group)。
- 反序列化消息,执行拦截器,调用子类 onMessage。
- 手动 ACK 消息(opsForStream().acknowledge)。
- 关键方法:
- onMessage(ObjectRecord):处理 Stream 消息。
- onMessage(T):抽象方法,子类实现。
- 使用场景:被你的 StringTextStreamConsumerListener 继承。
3.2.2 实现定义类
- CurdTestController
- 作用:像“客户服务中心”,接收用户请求,触发消息发送或异步查询。
- 职责:
- 提供 REST 接口:
- /getStringText:异步查询(StringTextService.getUserPageAsync)。
- /getStringTextByMQ:触发 MQ 消息(未实现具体生产者)。
- /getStringTextByStream:发送 Stream 消息(StringTextRedisProducer.sendStreamMessage)。
- /getStringTextByChannel:发送 Pub/Sub 消息(StringTextRedisProducer.sendChannelMessage)。
- 记录日志,处理异常。
- 提供 REST 接口:
- 使用场景:用户通过 HTTP 请求触发消息队列或异步处理。
- StringTextRedisProducer
- 作用:像“包裹打包员”,创建并发送 Pub/Sub 和 Stream 消息。
- 职责:
- sendStreamMessage:发送 StringTextQueryStreamMessage 到 string-text-query-stream。
- sendChannelMessage:发送 StringTextQueryChannelMessage 到 string-text-query-channel。
- 使用 RedisMQTemplate 发送消息,记录日志。
- 使用场景:被 CurdTestController 调用,发送查询请求到消息队列。
- StringTextChannelConsumerListener
- 作用:像“Pub/Sub 派送员”,监听 string-text-query-channel,处理消息。
- 职责:
- 接收 StringTextQueryChannelMessage,转换为 StringTextPageReqVO。
- 调用 StringTextService.getUserPage 查询数据。
- 记录处理日志。
- 使用场景:处理 Pub/Sub 广播消息,适合实时通知场景。
- StringTextStreamConsumerListener
- 作用:像“Stream 派送员”,监听 string-text-query-stream,处理消息。
- 职责:
- 接收 StringTextQueryStreamMessage,转换为 StringTextPageReqVO。
- 调用 StringTextService.getUserPage 查询数据。
- 手动 ACK 消息,记录日志。
- 使用场景:处理 Stream 集群消息,适合持久化、可靠投递场景。
- StringTextQueryChannelMessage
- 作用:像“Pub/Sub 包裹”,定义 Pub/Sub 消息结构。
- 职责:
- 包含 text、pageNo、pageSize,校验非空。
- 指定通道 string-text-query-channel。
- 使用场景:由 StringTextRedisProducer 发送,StringTextChannelConsumerListener 消费。
- StringTextQueryStreamMessage
- 作用:像“Stream 包裹”,定义 Stream 消息结构。
- 职责:
- 包含 text、pageNo、pageSize,校验非空。
- 指定 Stream string-text-query-stream。
- 使用场景:由 StringTextRedisProducer 发送,StringTextStreamConsumerListener 消费。
- StringTextServiceImpl
- 作用:像“数据仓库”,提供数据查询服务。
- 职责:
- getUserPage:同步查询分页数据。
- getUserPageAsync:异步查询,使用线程池(curdAsyncExecutor)。
- 调用 StringTextMapper 访问数据库。
- 使用场景:被控制器和消费者调用,处理查询逻辑。
3.3 文字图总结
3.3.1 图示总结
YudaoRedisMQProducerAutoConfiguration
└─ redisMQTemplate(StringRedisTemplate redisTemplate, List<RedisMessageInterceptor> interceptors)│ │ // 初始化消息发送工具,注入 Redis 模板和拦截器(默认空)│ ├─ Parameters:│ ├─ redisTemplate: StringRedisTemplate // Redis 操作模板│ └─ interceptors: List<RedisMessageInterceptor> // 拦截器列表(默认空)│ └─ Returns: RedisMQTemplate│ └─ RedisMQTemplate(redisTemplate)│ └─ addInterceptor(interceptor) // 添加拦截器(默认空,支持链式调用)
YudaoRedisMQConsumerAutoConfiguration
├─ checkRedisVersion(StringRedisTemplate redisTemplate)
│ │
│ │ // 校验 Redis 版本 ≥ 5.0.0
│ │
│ ├─ Parameters: redisTemplate: StringRedisTemplate
│ └─ Returns: void
│
├─ redisMessageListenerContainer(RedisMQTemplate redisMQTemplate, List<AbstractRedisChannelMessageListener<?>> listeners)
│ │
│ │ // 配置 Pub/Sub 监听容器(基于 Redis 发布订阅模式)
│ │
│ ├─ Parameters:
│ │ ├─ redisMQTemplate: RedisMQTemplate
│ │ └─ listeners: List<AbstractRedisChannelMessageListener<?>>
│ │ // 包含 StringTextChannelConsumerListener 实现类
│ │
│ └─ Returns: RedisMessageListenerContainer
│ │
│ └─ addMessageListener(StringTextChannelConsumerListener listener, ChannelTopic("string-text-query-channel"))
│ │
│ │ // 绑定监听器到指定通道(channel)
│ │
│ ├─ Parameters:
│ │ ├─ listener: StringTextChannelConsumerListener
│ │ └─ topic: ChannelTopic("string-text-query-channel")
│ │ // 通道名称固定为 string-text-query-channel
│ │
│ └─ StringTextChannelConsumerListener.setRedisMQTemplate(redisMQTemplate)
│ // 注入消息发送模板,用于消费时的响应逻辑
│
├─ redisStreamMessageListenerContainer(RedisMQTemplate redisMQTemplate, List<AbstractRedisStreamMessageListener<?>> listeners)
│ │
│ │ // 配置 Stream 监听容器(基于 Redis Stream 数据结构)
│ │
│ ├─ Parameters:
│ │ ├─ redisMQTemplate: RedisMQTemplate
│ │ └─ listeners: List<AbstractRedisStreamMessageListener<?>>
│ │ // 包含 StringTextStreamConsumerListener 实现类
│ │
│ └─ Returns: StreamMessageListenerContainer
│ │
│ └─ register(StreamReadRequest("string-text-query-stream", Consumer("my-app", "192.168.1.1@1234"), StringTextStreamConsumerListener listener))
│ │
│ │ // 绑定监听器到指定 Stream(支持消费者组模式)
│ │
│ ├─ Parameters:
│ │ ├─ request: StreamReadRequest
│ │ │ // StreamKey: string-text-query-stream
│ │ │ // Consumer: my-app@192.168.1.1@1234(消费者组名+实例标识)
│ │ └─ listener: StringTextStreamConsumerListener
│ │
│ ├─ redisTemplate.opsForStream().createGroup("string-text-query-stream", "my-app")
│ │ // 自动创建消费者组(若不存在),组名与应用名一致
│ │
│ └─ StringTextStreamConsumerListener.setRedisMQTemplate(redisMQTemplate)
│ // 注入消息发送模板,用于消费时的响应逻辑
│
└─ redisPendingMessageResendJob(List<AbstractRedisStreamMessageListener<?>> listeners, RedisMQTemplate redisTemplate, String groupName, RedissonClient redissonClient)│ │ // 创建定时任务,处理 Stream 中未确认的 pending 消息│ // 防止消息堆积,保证至少一次投递语义│ ├─ Parameters:│ ├─ listeners: List<AbstractRedisStreamMessageListener<?>> │ │ // 包含 StringTextStreamConsumerListener 实现类│ ├─ redisTemplate: RedisMQTemplate│ ├─ groupName: String // 默认使用 spring.application.name("my-app")│ └─ redissonClient: RedissonClient // 分布式锁,保证任务单点执行│ └─ Returns: RedisPendingMessageResendJob// 定时任务会定期扫描 pending 消息并重新投递
通过 Stream 发送消息的流程
CurdTestController
└─ getStringTextByStream(StringTextPageReqVO pageReqVO)│ │ // 处理 Stream 消息请求│ Parameters: pageReqVO: {text="example", pageNo=1, pageSize=10}│ Returns: "Stream 消息已发送,异步处理中"│ └─ StringTextRedisProducer.sendStreamMessage(text, pageNo, pageSize)│ │ // 发送 Stream 消息(参数:example, 1, 10)│ Returns: void│ └─ RedisMQTemplate.send(StringTextQueryStreamMessage message)│ │ // 发送到 Stream(消息内容包含 text/pageNo/pageSize)│ Returns: RecordId(如 "1234567890-0")│ ├─ sendMessageBefore(message)│ // 调用消息发送前的拦截器(默认无操作)│ ├─ JsonUtils.toJsonString(message)│ // 序列化消息为 JSON│ Returns: "{\"text\":\"example\",\"pageNo\":1,\"pageSize\":10}"│ ├─ redisTemplate.opsForStream().add(StreamRecords)│ // 添加到 Stream(StreamKey: string-text-query-stream)│ Parameters: JSON 字符串 + StreamKey│ Returns: RecordId(消息唯一标识)│ └─ sendMessageAfter(message)// 调用消息发送后的拦截器(默认无操作)
StreamMessageListenerContainer
└─ StringTextStreamConsumerListener.onMessage(ObjectRecord<String, String> message)│ │ // 处理 Stream 消息(参数:包含 StreamKey 和 JSON 内容的记录)│ Returns: void│ ├─ JsonUtils.parseObject(message.getValue(), StringTextQueryStreamMessage.class)│ // 反序列化 JSON 为消息对象│ Returns: {text="example", pageNo=1, pageSize=10}│ ├─ consumeMessageBefore(messageObj)│ // 调用消息消费前的拦截器(默认无操作)│ ├─ onMessage(StringTextQueryStreamMessage message)│ // 子类自定义处理逻辑(由业务实现)│ Parameters: 反序列化后的消息对象│ Returns: void│ │ └─ StringTextService.getUserPage(StringTextPageReqVO reqVO)│ │ │ │ // 业务逻辑:查询数据│ │ Parameters: {text="example", pageNo=1, pageSize=10}│ │ Returns: 分页结果(包含查询到的数据)│ │ │ └─ StringTextMapper.selectPage(reqVO)│ // 数据库层查询│ Returns: 从数据库获取的分页结果│ ├─ redisMQTemplate.getRedisTemplate().opsForStream().acknowledge("my-app", message)│ // 关键步骤:确认消息已消费│ Parameters: 消费者组名 + 消息记录│ Returns: void│ (若不确认,消息会被视为 pending 并由定时任务重新投递)│ └─ consumeMessageAfter(messageObj)// 调用消息消费后的拦截器(默认无操作)
通过 Pub/Sub 发送消息的流程
CurdTestController
└─ getStringTextByChannel(StringTextPageReqVO pageReqVO)│ │ // 处理 Pub/Sub 消息请求│ Parameters: pageReqVO: {text="example", pageNo=1, pageSize=10}│ Returns: "Channel 消息已发送,异步处理中"│ └─ StringTextRedisProducer.sendChannelMessage(text, pageNo, pageSize)│ │ // 发送 Pub/Sub 消息(参数:example, 1, 10)│ Returns: void│ └─ RedisMQTemplate.send(StringTextQueryChannelMessage message)│ │ // 发送到通道(channel: string-text-query-channel)│ Returns: void│ ├─ sendMessageBefore(message)│ // 调用消息发送前的拦截器(默认无操作)│ ├─ JsonUtils.toJsonString(message)│ // 序列化消息为 JSON│ Returns: "{\"text\":\"example\",\"pageNo\":1,\"pageSize\":10}"│ ├─ redisTemplate.convertAndSend("string-text-query-channel", json)│ // 发布到 Redis 通道│ Parameters: 通道名 + JSON 字符串│ Returns: void(无返回值,Fire-and-Forget 模式)│ └─ sendMessageAfter(message)// 调用消息发送后的拦截器(默认无操作)
RedisMessageListenerContainer
└─ StringTextChannelConsumerListener.onMessage(Message message, byte[] bytes)│ │ // 处理 Pub/Sub 消息(参数:消息对象+通道字节数组)│ Returns: void(无返回值,Fire-and-Forget 模式)│ ├─ JsonUtils.parseObject(message.getBody(), StringTextQueryChannelMessage.class)│ // 反序列化 JSON 为消息对象│ Returns: {text="example", pageNo=1, pageSize=10}│ ├─ consumeMessageBefore(messageObj)│ // 调用消息消费前的拦截器(默认无操作)│ ├─ onMessage(StringTextQueryChannelMessage message)│ // 子类自定义处理逻辑(由业务实现)│ Parameters: 反序列化后的消息对象│ Returns: void│ │ └─ StringTextService.getUserPage(StringTextPageReqVO reqVO)│ │ │ │ // 业务逻辑:查询数据│ │ Parameters: {text="example", pageNo=1, pageSize=10}│ │ Returns: 分页结果(包含查询到的数据)│ │ │ └─ StringTextMapper.selectPage(reqVO)│ // 数据库层查询│ Returns: 从数据库获取的分页结果│ └─ consumeMessageAfter(messageObj)// 调用消息消费后的拦截器(默认无操作)
RedisPendingMessageResendJob 定时任务流程
3.3.2 类作用总结
- 框架类:
- YudaoRedisMQProducerAutoConfiguration:初始化生产工具。
- YudaoRedisMQConsumerAutoConfiguration:配置消费容器和定时任务。
- RedisMQTemplate:发送消息,操作 Stream.
- RedisMessageInterceptor:扩展点(未使用)。
- RedisPendingMessageResendJob:重发 pending 消息。
- AbstractRedisMessage:消息基类。
- AbstractRedisChannelMessage:Pub/Sub 消息模板。
- AbstractRedisStreamMessage:Stream 消息模板。
- AbstractRedisChannelMessageListener:Pub/Sub 消费者基类。
- AbstractRedisStreamMessageListener:Stream 消费者基类。
- 具体业务实现类:
- CurdTestController:触发消息发送。
- StringTextRedisProducer:发送消息。
- StringTextChannelConsumerListener:处理 Pub/Sub 消息。
- StringTextStreamConsumerListener:处理 Stream 消息。
- StringTextQueryChannelMessage:Pub/Sub 消息。
- StringTextQueryStreamMessage:Stream 消息。
- StringTextServiceImpl:查询数据。