java每日精进 6.11【消息队列】

1.内存级Spring_Event

1.1 控制器层:StringTextController

/*** 字符串文本管理控制器* 提供通过消息队列异步获取文本信息的接口*/
@RestController
@RequestMapping("/api/string-text")
public class StringTextController {@Resourceprivate StringTextProducer stringTextProducer;/*** 通过消息队列异步查询字符串文本信息* * 流程:* 1. 接收前端查询参数* 2. 封装为消息对象发送到消息队列* 3. 立即返回"消息已发送"响应,不等待实际处理结果* * 优点:* - 避免长耗时查询阻塞HTTP连接* - 支持水平扩展处理能力* * @param pageReqVO 分页查询参数* @return 统一响应结果(仅包含消息发送状态)*/@GetMapping("/getStringTextByMQ")@Operation(summary = "通过消息队列获取test信息")public CommonResult<String> getStringTextByMQ(@Validated @ModelAttribute StringTextPageReqVO pageReqVO) {log.info("控制器接收到MQ请求,线程名: {}", Thread.currentThread().getName());try {// 发送消息到队列(异步处理)stringTextProducer.sendStringTextQueryMessage(pageReqVO.getText(), pageReqVO.getPageNo(), pageReqVO.getPageSize());return CommonResult.success("消息已发送,异步处理中");} catch (Exception e) {log.error("消息发送异常", e);return CommonResult.error(500, "消息发送失败");}}
}

1.2 消息生产者:StringTextProducer

/*** 字符串文本查询消息生产者* 负责将查询请求封装为消息并发送到消息队列* * 设计模式:* - 使用Spring事件机制作为轻量级消息队列* - 可无缝切换为Kafka/RocketMQ等真正的MQ系统*/
@Slf4j
@Component
public class StringTextProducer {@Resourceprivate ApplicationContext applicationContext;/*** 发送字符串查询消息* * @param text 查询文本(用于模糊匹配)* @param pageNo 页码* @param pageSize 每页大小* * 消息传递机制:* 1. 创建StringTextQueryMessage对象* 2. 通过Spring事件发布器将消息广播到事件总线* 3. 由StringTextConsumer监听并处理该消息*/public void sendStringTextQueryMessage(String text, Integer pageNo, Integer pageSize) {StringTextQueryMessage message = new StringTextQueryMessage();message.setText(text);message.setPageNo(pageNo);message.setPageSize(pageSize);log.info("[sendStringTextQueryMessage][发送消息: {}]", message);// 使用Spring事件机制发布消息// 注意:此处使用同步事件,通过@Async注解在Consumer端实现异步applicationContext.publishEvent(message);}
}

1.3 消息消费者:StringTextConsumer

/*** 字符串文本查询消息消费者* 负责从消息队列接收查询请求并执行实际查询操作* * 线程模型:* - 使用@Async注解指定专用线程池"curdAsyncExecutor"* - 实现请求处理与HTTP请求线程分离*/
@Slf4j
@Component
public class StringTextConsumer {@Resourceprivate StringTextService stringTextService;/*** 处理字符串查询消息* * @param message 查询消息对象* * 执行流程:* 1. 从消息中提取查询参数* 2. 调用Service层执行实际查询* 3. 记录查询结果(可扩展为保存到结果表或通知前端)*/@EventListener@Async("curdAsyncExecutor") // 使用专用异步线程池public void onMessage(StringTextQueryMessage message) {log.info("[onMessage][接收消息: {},线程名: {}]", message, Thread.currentThread().getName());// 1. 将消息转换为请求对象StringTextPageReqVO reqVO = new StringTextPageReqVO();reqVO.setText(message.getText());reqVO.setPageNo(message.getPageNo());reqVO.setPageSize(message.getPageSize());// 2. 执行实际查询(可能是数据库或外部系统)PageResult<StringTextDO> pageResult = stringTextService.getUserPage(reqVO);log.info("[onMessage][处理结果: {}]", pageResult);// 3. 可扩展逻辑:// - 将结果保存到临时表// - 推送WebSocket通知前端结果就绪// - 触发后续处理流程}
}

1.4 消息模型:StringTextQueryMessage

/*** 字符串文本查询消息* 用于在生产者和消费者之间传递查询参数* * 设计特点:* - 实现Serializable接口以便于消息传输* - 使用JSR-303注解进行参数校验* - 字段与StringTextPageReqVO保持语义一致*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class StringTextQueryMessage implements Serializable {private static final long serialVersionUID = 20230610L;/*** 查询的文本内容(用于模糊查询)*/private String text;/*** 页码(从1开始)*/@NotNull(message = "页码不能为空")private Integer pageNo;/*** 每页大小*/@NotNull(message = "每页大小不能为空")private Integer pageSize;
}

客户端 → HTTP请求 → StringTextController → StringTextProducer → 消息队列 → StringTextConsumer → StringTextService → 数据库查询

1.5 问题提出

1.5.1 生产者如何找到消费者?

  • StringTextProducer 通过 applicationContext.publishEvent 发布 StringTextQueryMessage 事件。
  • Spring 容器根据事件类型(StringTextQueryMessage)将事件分发给 StringTextConsumer 的 @EventListener 方法。
  • 匹配基于 Java 类型系统,参数类型必须兼容事件类型。


1.5.2 假如添加多个新的接收参数为StringTextQueryMessage的消费者,还有几个是接收的参数是StringTextQueryMessage及其子类,会全部接收到还是会匹配精度最高的?

  • 事件类型匹配
    • Spring 根据事件对象的 运行时类型(StringTextQueryMessage 或其子类)查找所有 @EventListener 方法,检查其参数类型是否与事件类型兼容。
    • 兼容的规则是:监听方法的参数类型可以是事件类型的 类本身超类接口
    • 例如,如果发布的事件是 StringTextQueryMessage,所有参数类型为 StringTextQueryMessage、其超类(如 Object)或接口的 @EventListener 方法都会被调用。
  • 多消费者行为
    • Spring Event 不基于“精度最高”选择单个消费者,而是 广播 事件给所有匹配的监听器。
    • 如果有多个消费者监听 StringTextQueryMessage 或其子类,Spring 会按顺序调用所有匹配的 @EventListener 方法。
  • 执行顺序
    • 默认情况下,Spring 不保证 @EventListener 方法的调用顺序。
    • 可以通过 @Order 注解或实现 Ordered 接口指定执行顺序(较低的 order 值优先执行)。
  • 异步性
    • 如果 @EventListener 方法标注了 @Async(如您的 StringTextConsumer),每个消费者的处理会在异步线程中执行,互不阻塞。

1.5.3以上情况能不能指定一个consumer匹配?

方法 1:使用条件注解(@EventListener(condition = ...))
  • 在 @EventListener 中添加 condition 属性,通过 SpEL(Spring Expression Language)过滤事件。
  • 示例:修改 StringTextConsumer1 和 StringTextConsumer2,添加条件:
  • @EventListener(condition = "#message.text == 'specific'")

  • 如果 message.text == "specific",只有 Consumer1 处理。

方法 2:使用自定义事件路由
  • 引入一个事件路由机制,通过事件对象的额外属性指定目标消费者。
  • 修改 StringTextQueryMessage,添加 consumerId 字段:
  • @Data

    public class StringTextQueryMessage {

        private String text;

        @NotNull(message = "页码不能为空")

        private Integer pageNo;

        @NotNull(message = "每页大小不能为空")

        private Integer pageSize;

        private String consumerId; // 新增:指定目标消费者

    }

  • @Slf4j

    @Component

    public class StringTextConsumer1 {

        @Resource

        private StringTextService stringTextService;

        @EventListener

        @Async("curdAsyncExecutor")

        public void onMessage(StringTextQueryMessage message) {

            if (!"consumer1".equals(message.getConsumerId())) {

                return; // 忽略不匹配的 consumerId

            }

            log.info("[Consumer1][接收消息: {},线程名: {}]", message, Thread.currentThread().getName());

            StringTextPageReqVO reqVO = new StringTextPageReqVO();

            reqVO.setText(message.getText());

            reqVO.setPageNo(message.getPageNo());

            reqVO.setPageSize(message.getPageSize());

            PageResult<StringTextDO> pageResult = stringTextService.getUserPage(reqVO);

            log.info("[Consumer1][处理结果: {}]", pageResult);

        }

    }

    @Slf4j

    @Component

    public class StringTextConsumer2 {

        @EventListener

        @Async("curdAsyncExecutor")

        public void onMessage(StringTextQueryMessage message) {

            if (!"consumer2".equals(message.getConsumerId())) {

                return;

            }

            log.info("[Consumer2][接收消息: {},线程名: {}]", message, Thread.currentThread().getName());

        }

    }

方法 3:自定义事件分发器
  • 重写 Spring 的事件分发逻辑,使用自定义 ApplicationEventMulticaster:
    • 创建自定义 ApplicationEventMulticaster:
    •     @Component("applicationEventMulticaster")

          public class CustomEventMulticaster extends SimpleApplicationEventMulticaster {

              @Override

              public void multicastEvent(ApplicationEvent event) {

                  if (event instanceof StringTextQueryMessage) {

                      StringTextQueryMessage message = (StringTextQueryMessage) event;

                      // 根据条件选择特定消费者,例如 consumerId

                      getApplicationListeners(event).stream()

                              .filter(listener -> {

                                  // 假设 listener 的方法名或类名包含 consumerId

                                  return listener.getListenerId().contains(message.getConsumerId());

                              })

                              .forEach(listener -> listener.onApplicationEvent(event));

                  } else {

                      super.multicastEvent(event);

                  }

              }

          }

执行结果如下:

2025-06-10 10:39:03.349 |  INFO 16540 | http-nio-48080-exec-7 [TID: N/A] c.i.m.f.a.c.i.ApiAccessLogInterceptor    | [preHandle][开始请求 URL(/admin-api/curd/stringtext/getStringTextByMQ) 参数({text=username})]Controller 方法路径:cn.iocoder.moyun.module.curd.controller.admin.CurdTestController(CurdTestController.java:57)
2025-06-10 10:39:03.355 |  INFO 16540 | http-nio-48080-exec-7 [TID: N/A] c.i.m.m.c.c.admin.CurdTestController     | 控制器接收到MQ请求,线程名: http-nio-48080-exec-7
2025-06-10 10:39:03.355 |  INFO 16540 | http-nio-48080-exec-7 [TID: N/A] c.i.m.m.c.m.p.sms.StringTextProducer     | [sendStringTextQueryMessage][发送消息: StringTextQueryMessage(text=username, pageNo=1, pageSize=10)]
2025-06-10 10:39:03.357 |  INFO 16540 | http-nio-48080-exec-7 [TID: N/A] c.i.m.f.a.c.i.ApiAccessLogInterceptor    | [afterCompletion][完成请求 URL(/admin-api/curd/stringtext/getStringTextByMQ) 耗时(6 ms)]
2025-06-10 10:39:03.358 |  INFO 16540 | curd-async-2 [TID: N/A] c.i.m.m.c.m.c.sms.StringTextConsumer     | [onMessage][接收消息: StringTextQueryMessage(text=username, pageNo=1, pageSize=10),线程名: curd-async-2]
2025-06-10 10:39:03.374 |  INFO 16540 | curd-async-2 [TID: N/A] c.i.m.m.c.s.s.StringTextServiceImpl      | 同步查询结果: PageResult(list=[StringTextDO(super=TenantBaseDO(super=BaseDO(createTime=2023-05-03T10:00, updateTime=null, creator=system, updater=null, deleted=false), tenantId=1), id=3, text={"username":"user002","nickname":"临时用户","password":"xyz789","status":1}, loginIp=192.168.1.102, loginDate=2023-05-03T10:20), StringTextDO(super=TenantBaseDO(super=BaseDO(createTime=2023-05-02T09:00, updateTime=2023-05-03T11:30, creator=system, updater=admin, deleted=false), tenantId=1), id=2, text={"username":"user001","nickname":"测试用户","password":"abc123","email":"test@example.com","mobile":"13800138001"}, loginIp=192.168.1.101, loginDate=2023-05-02T09:15), StringTextDO(super=TenantBaseDO(super=BaseDO(createTime=2023-05-01T08:00, updateTime=2023-05-05T15:00, creator=system, updater=admin, deleted=false), tenantId=1), id=1, text={"username":"admin","nickname":"管理员","password":"123456","dept_id":1001,"post_ids":[101,102]}, loginIp=192.168.1.100, loginDate=2023-05-01T08:30)], total=3)
2025-06-10 10:39:03.375 |  INFO 16540 | curd-async-2 [TID: N/A] c.i.m.m.c.m.c.sms.StringTextConsumer     | [onMessage][处理结果: PageResult(list=[StringTextDO(super=TenantBaseDO(super=BaseDO(createTime=2023-05-03T10:00, updateTime=null, creator=system, updater=null, deleted=false), tenantId=1), id=3, text={"username":"user002","nickname":"临时用户","password":"xyz789","status":1}, loginIp=192.168.1.102, loginDate=2023-05-03T10:20), StringTextDO(super=TenantBaseDO(super=BaseDO(createTime=2023-05-02T09:00, updateTime=2023-05-03T11:30, creator=system, updater=admin, deleted=false), tenantId=1), id=2, text={"username":"user001","nickname":"测试用户","password":"abc123","email":"test@example.com","mobile":"13800138001"}, loginIp=192.168.1.101, loginDate=2023-05-02T09:15), StringTextDO(super=TenantBaseDO(super=BaseDO(createTime=2023-05-01T08:00, updateTime=2023-05-05T15:00, creator=system, updater=admin, deleted=false), tenantId=1), id=1, text={"username":"admin","nickname":"管理员","password":"123456","dept_id":1001,"post_ids":[101,102]}, loginIp=192.168.1.100, loginDate=2023-05-01T08:30)], total=3)]

2. Redis消息队列

第一部分:Redis完成MQ的详细过程梳理

Yudao项目利用Redis实现了两种消息队列机制:

  • Redis Stream:用于集群消费,消息持久化存储,支持消费者组,适合需要可靠性和消息追踪的场景。
  • Redis Pub/Sub(Channel):用于广播消费,消息不持久化,适合实时性要求高的场景,所有订阅者都会收到消息。

我将模拟一个完整的消息发送和消费过程,分别讲解Stream和Channel的实现,详细说明每个类的作用、注册方式以及它们如何协作。

2.1 Redis Stream消息队列的完整过程

场景:通过/getStringTextByStream接口发送Stream消息

假设用户通过HTTP请求调用/getStringTextByStream接口,传入StringTextPageReqVO对象(包含text、pageNo、pageSize字段)。以下是消息从发送到消费的详细流程:


步骤1:接收HTTP请求(Controller层)
  • 相关类:@GetMapping("/getStringTextByStream")(未显式定义类名,假设为StringTextController)
    • 作用:这是一个Spring MVC控制器方法,负责接收前端的HTTP GET请求,处理通过Redis Stream发送消息的逻辑。
    • 功能
      • 接收StringTextPageReqVO参数,包含分页查询的信息(如text、pageNo、pageSize)。
      • 调用StringTextRedisProducer的sendStreamMessage方法发送消息。
      • 返回CommonResult表示消息已发送,异步处理中。
    • 框架注册
      • 通过Spring的@GetMapping注解,自动注册为一个HTTP端点。
      • 依赖Spring Boot的Web模块,控制器类通常标注@RestController,由Spring容器管理。
    • 与其他类的交互
      • 依赖StringTextRedisProducer来发送Stream消息。
      • 使用StringTextPageReqVO作为请求参数的VO(Value Object)。
  • 代码分析
@GetMapping("/getStringTextByStream")
public CommonResult<String> getStringTextByStream(@Validated @ModelAttribute StringTextPageReqVO pageReqVO) {log.info("控制器接收到 Stream 请求,线程名: {}", Thread.currentThread().getName());try {stringTextRedisProducer.sendStreamMessage(pageReqVO.getText(), pageReqVO.getPageNo(), pageReqVO.getPageSize());return CommonResult.success("Stream 消息已发送,异步处理中");} catch (Exception e) {log.error("Stream 消息发送异常", e);return CommonResult.error(500, "Stream 消息发送失败");}
}
  • 控制器接收到请求后,提取pageReqVO中的字段,传递给StringTextRedisProducer。
  • 异常处理确保即使发送失败,也能返回错误响应。
步骤2:生产者发送Stream消息
  • 相关类:StringTextRedisProducer
    • 作用:这是一个生产者类,负责创建并发送Redis Stream消息。
    • 功能
      • 提供sendStreamMessage方法,构造StringTextQueryStreamMessage对象,设置text、pageNo、pageSize字段。
      • 使用RedisMQTemplate的send方法将消息发送到Redis Stream。
      • 记录日志,跟踪消息发送情况。
    • 框架注册
      • 标注@Component,由Spring容器管理。
      • 通过@Resource注入RedisMQTemplate依赖。
    • 与其他类的交互
      • 依赖RedisMQTemplate执行实际的Redis Stream消息发送。
      • 使用StringTextQueryStreamMessage作为消息载体。
      • 被StringTextController调用。
  • 代码分析
@Component
public class StringTextRedisProducer {@Resourceprivate RedisMQTemplate redisMQTemplate;public void sendStreamMessage(String text, Integer pageNo, Integer pageSize) {StringTextQueryStreamMessage message = new StringTextQueryStreamMessage();message.setText(text);message.setPageNo(pageNo);message.setPageSize(pageSize);redisMQTemplate.send(message);log.info("[sendStreamMessage][发送 Stream 消息: {}]", message);}
}
  • 创建StringTextQueryStreamMessage对象,填充请求参数。
  • 调用redisMQTemplate.send(message),将消息发送到Redis Stream。
  • 相关类:StringTextQueryStreamMessage
    • 作用:Stream消息的载体,定义消息的结构和Stream Key。
    • 功能
      • 继承AbstractRedisStreamMessage,包含text、pageNo、pageSize字段。
      • 重写getStreamKey(),指定Stream的键为"string-text-query-stream"。
    • 框架注册
      • 作为普通Java类,无需Spring注册,直接由StringTextRedisProducer实例化。
    • 与其他类的交互
      • 被StringTextRedisProducer用于封装消息内容。
      • 被StringTextStreamConsumer用于解析消息内容。
  • 代码分析
@Data
public class StringTextQueryStreamMessage extends AbstractRedisStreamMessage {private String text;@NotNull(message = "页码不能为空")private Integer pageNo;@NotNull(message = "每页大小不能为空")private Integer pageSize;@Overridepublic String getStreamKey() {return "string-text-query-stream";}
}
  • 定义消息结构,确保pageNo和pageSize不为空。
  • 指定Stream Key,用于Redis Stream的消息存储。
步骤3:RedisMQTemplate发送消息
  • 相关类:RedisMQTemplate
    • 作用:Redis MQ操作的核心模板类,封装了Redis Stream和Pub/Sub的消息发送逻辑。
    • 功能
      • 提供send方法,针对Stream消息,将消息序列化为JSON并添加到指定的Stream Key。
      • 支持拦截器机制,在消息发送前后调用RedisMessageInterceptor的钩子方法。
      • 使用RedisTemplate执行底层的Redis操作。
    • 框架注册
      • 通过YudaoRedisMQProducerAutoConfiguration类注册为Spring Bean。
      • 依赖StringRedisTemplate和RedisMessageInterceptor列表。
    • 与其他类的交互
      • 被StringTextRedisProducer调用,用于发送Stream消息。
      • 依赖RedisTemplate执行Redis操作。
      • 调用RedisMessageInterceptor处理消息发送前后的扩展逻辑。
      • 被StringTextStreamConsumer用于ACK消息。
  • 代码分析
public class RedisMQTemplate {private final RedisTemplate<String, ?> redisTemplate;private final List<RedisMessageInterceptor> interceptors = new ArrayList<>();public <T extends AbstractRedisStreamMessage> RecordId send(T message) {try {sendMessageBefore(message);return redisTemplate.opsForStream().add(StreamRecords.newRecord().ofObject(JsonUtils.toJsonString(message)).withStreamKey(message.getStreamKey()));} finally {sendMessageAfter(message);}}
}
  • 调用sendMessageBefore执行拦截器前置逻辑。
  • 使用redisTemplate.opsForStream().add将消息添加到Stream,返回RecordId。
  • 调用sendMessageAfter执行拦截器后置逻辑。
  • 相关类:YudaoRedisMQProducerAutoConfiguration
    • 作用:生产者配置类,负责注册RedisMQTemplate。
    • 功能
      • 创建RedisMQTemplate实例,注入StringRedisTemplate和拦截器列表。
      • 确保在YudaoRedisAutoConfiguration之后加载(依赖Redis配置)。
    • 框架注册
      • 标注@AutoConfiguration,由Spring Boot自动加载。
      • 使用@Bean注册RedisMQTemplate。
    • 与其他类的交互
      • 为StringTextRedisProducer和StringTextStreamConsumer提供RedisMQTemplate。
      • 依赖StringRedisTemplate和RedisMessageInterceptor。
  • 代码分析
@AutoConfiguration(after = YudaoRedisAutoConfiguration.class)
public class YudaoRedisMQProducerAutoConfiguration {@Beanpublic RedisMQTemplate redisMQTemplate(StringRedisTemplate redisTemplate,List<RedisMessageInterceptor> interceptors) {RedisMQTemplate redisMQTemplate = new RedisMQTemplate(redisTemplate);interceptors.forEach(redisMQTemplate::addInterceptor);return redisMQTemplate;}
}

相关类:RedisMessageInterceptor

  • 作用:消息拦截器接口,提供发送和消费消息前后的扩展点。
  • 功能
    • 定义sendMessageBefore、sendMessageAfter、consumeMessageBefore、consumeMessageAfter方法。
    • 适用于多租户场景或其他扩展需求(如日志记录、权限检查)。
  • 框架注册
    • 实现类需标注@Component,由Spring容器管理。
    • 由YudaoRedisMQProducerAutoConfiguration注入到RedisMQTemplate。
  • 与其他类的交互
    • 被RedisMQTemplate调用,用于消息处理扩展。
步骤4:消费者监听和处理Stream消息
  • 相关类:StringTextStreamConsumer
    • 作用:Stream消息的消费者,负责处理收到的StringTextQueryStreamMessage消息。
    • 功能
      • 继承AbstractRedisStreamMessageListener,实现onMessage方法。
      • 将消息转换为StringTextPageReqVO,调用StringTextService执行分页查询。
      • 记录处理结果日志。
    • 框架注册
      • 标注@Component,由Spring容器管理。
      • 通过YudaoRedisMQConsumerAutoConfiguration注册到StreamMessageListenerContainer。
    • 与其他类的交互
      • 依赖StringTextService执行业务逻辑。
      • 依赖RedisMQTemplate进行消息ACK。
      • 接收StringTextQueryStreamMessage消息。
  • 代码分析
@Component
public class StringTextStreamConsumer extends AbstractRedisStreamMessageListener<StringTextQueryStreamMessage> {@Resourceprivate StringTextService stringTextService;@Overridepublic void onMessage(StringTextQueryStreamMessage message) {log.info("[onMessage][Stream 消息: {},线程名: {}]", message, Thread.currentThread().getName());StringTextPageReqVO reqVO = new StringTextPageReqVO();reqVO.setText(message.getText());reqVO.setPageNo(message.getPageNo());reqVO.setPageSize(message.getPageSize());PageResult<StringTextDO> pageResult = stringTextService.getUserPage(reqVO);log.info("[onMessage][Stream 处理结果: {}]", pageResult);}
}
  • 相关类:AbstractRedisStreamMessageListener
    • 作用:Stream消息监听器的抽象基类,封装通用消费逻辑。
    • 功能
      • 实现StreamListener接口,处理Redis Stream消息。
      • 解析消息为指定类型(StringTextQueryStreamMessage)。
      • 调用拦截器前置/后置逻辑,执行消息ACK。
    • 框架注册
      • 作为抽象类,不直接注册,其子类(如StringTextStreamConsumer)注册。
    • 与其他类的交互
      • 被StringTextStreamConsumer继承。
      • 依赖RedisMQTemplate进行ACK。
      • 被YudaoRedisMQConsumerAutoConfiguration用于注册监听器。
  • 相关类:YudaoRedisMQConsumerAutoConfiguration
    • 作用:消费者配置类,负责注册Stream和Pub/Sub的监听容器。
    • 功能
      • 创建StreamMessageListenerContainer,配置Stream监听。
      • 注册StringTextStreamConsumer到容器,绑定Stream Key和消费者组。
      • 创建RedisPendingMessageResendJob处理未消费消息。
    • 框架注册
      • 标注@AutoConfiguration,在YudaoRedisAutoConfiguration之后加载。
      • 使用@Bean注册StreamMessageListenerContainer和RedisPendingMessageResendJob。
    • 与其他类的交互
      • 依赖RedisMQTemplate和StringTextStreamConsumer。
      • 为RedisPendingMessageResendJob提供监听器列表。
  • 代码分析
@AutoConfiguration(after = YudaoRedisAutoConfiguration.class)
public class YudaoRedisMQConsumerAutoConfiguration {@Beanpublic StreamMessageListenerContainer<String, ObjectRecord<String, String>> redisStreamMessageListenerContainer(RedisMQTemplate redisMQTemplate, List<AbstractRedisStreamMessageListener<?>> listeners) {// 配置容器,注册监听器}
}

相关类:RedisPendingMessageResendJob

  • 作用:定时任务,重新投递Stream中未消费的超时消息。
  • 功能
    • 每分钟检查Pending消息,超时(默认5分钟)后重新投递。
    • 使用分布式锁(Redisson)确保单实例执行。
  • 框架注册
    • 由YudaoRedisMQConsumerAutoConfiguration注册为Bean。
    • 标注@Scheduled启用定时任务。
  • 与其他类的交互
    • 依赖RedisMQTemplate和StringTextStreamConsumer的监听器列表。
    • 使用RedissonClient实现分布式锁。
步骤5:业务逻辑处理
  • 相关类:StringTextService
    • 作用:业务服务层,执行分页查询逻辑。
    • 功能
      • 提供getUserPage方法,根据StringTextPageReqVO查询分页数据。
      • 返回PageResult<StringTextDO>。
    • 框架注册
      • 通常标注@Service,由Spring容器管理。
    • 与其他类的交互
      • 被StringTextStreamConsumer调用。
  • 相关类:StringTextPageReqVO
    • 作用:分页请求的VO,定义查询参数。
    • 功能
      • 包含text、pageNo、pageSize等字段。
      • 继承PageParam,支持分页参数。
    • 框架注册
      • 作为普通Java类,无需注册。
    • 与其他类的交互
      • 被StringTextController接收前端参数。
      • 被StringTextStreamConsumer用于构造查询参数。

第二部分:具体类解析

2.2 RedisMQTemplate类
/*** Redis MQ 操作模板类* 封装了基于Redis的两种消息模型的发送操作:* 1. Pub/Sub模式:发布订阅模式,适合广播消息* 2. Stream模式:消息流模式,适合需要消息顺序和可靠性的场景* 同时提供了消息拦截器机制,允许在消息发送前后执行自定义逻辑*/
@AllArgsConstructor
public class RedisMQTemplate {@Getterprivate final RedisTemplate<String, ?> redisTemplate;/*** 拦截器数组* 用于在消息发送前后执行自定义逻辑,如日志记录、指标收集、分布式追踪等*/@Getterprivate final List<RedisMessageInterceptor> interceptors = new ArrayList<>();/*** 发送 Redis 消息,基于 Redis pub/sub 实现* * @param message 继承自AbstractRedisChannelMessage的消息对象* @return void 无返回值* @功能 基于Redis的Pub/Sub模式发布消息,在发送前后分别执行拦截器的前置和后置处理*       使用JSON序列化消息内容,通过message.getChannel()获取消息发布的频道名称*/public <T extends AbstractRedisChannelMessage> void send(T message) {try {sendMessageBefore(message);// 发送消息redisTemplate.convertAndSend(message.getChannel(), JsonUtils.toJsonString(message));} finally {sendMessageAfter(message);}}/*** 发送 Redis 消息,基于 Redis Stream 实现* * @param message 继承自AbstractRedisStreamMessage的消息对象* @return RecordId Redis Stream生成的消息ID,格式如"1686475200000-0"* @功能 基于Redis的Stream模式发送消息,在发送前后分别执行拦截器的前置和后置处理*       使用JSON序列化消息内容,通过message.getStreamKey()获取Stream的键名*/public <T extends AbstractRedisStreamMessage> RecordId send(T message) {try {sendMessageBefore(message);// 发送消息return redisTemplate.opsForStream().add(StreamRecords.newRecord().ofObject(JsonUtils.toJsonString(message)) // 设置内容.withStreamKey(message.getStreamKey())); // 设置 stream key} finally {sendMessageAfter(message);}}/*** 添加拦截器* * @param interceptor 消息拦截器对象* @return void 无返回值* @功能 向拦截器列表中添加一个消息拦截器,拦截器将在消息发送前后执行自定义逻辑*/public void addInterceptor(RedisMessageInterceptor interceptor) {interceptors.add(interceptor);}/*** 消息发送前执行拦截器逻辑* * @param message 消息对象* @return void 无返回值* @功能 在消息发送前调用所有拦截器的前置处理方法,按拦截器添加的顺序执行(正序)*/private void sendMessageBefore(AbstractRedisMessage message) {// 正序执行拦截器的前置处理方法interceptors.forEach(interceptor -> interceptor.sendMessageBefore(message));}/*** 消息发送后执行拦截器逻辑* * @param message 消息对象* @return void 无返回值* @功能 在消息发送后调用所有拦截器的后置处理方法,按拦截器添加顺序的逆序执行(倒序)*/private void sendMessageAfter(AbstractRedisMessage message) {// 倒序执行拦截器的后置处理方法for (int i = interceptors.size() - 1; i >= 0; i--) {interceptors.get(i).sendMessageAfter(message);}}}
2.3 消息父类及其子类
AbstractRedisChannelMessageListener自定义监听类父类
/*** Redis Pub/Sub 监听器抽象类,用于实现广播消费** @param <T> 消息类型。一定要填写噢,不然会报错*/
public abstract class AbstractRedisChannelMessageListener<T extends AbstractRedisChannelMessage> implements MessageListener {/*** 消息类型*/private final Class<T> messageType;/*** Redis Channel*/private final String channel;/*** RedisMQTemplate*/@Setterprivate RedisMQTemplate redisMQTemplate;/*** messageType 在 onMessage 方法中用于将 Redis 收到的消息(JSON 字符串)反序列化为 T 类型的对象* 通过 JsonUtils.parseObject(message.getBody(), messageType),明确的消息类型确保了正确的反序列化* channel 字段用于指定监听器订阅的 Redis Pub/Sub 通道* getChannel() 方法允许每个消息类型定义自己的通道,使监听器能够灵活适应不同的消息类型和通道* messageType 确定了后续消息反序列化的目标类型,确保消息被解析为 StringTextQueryChannelMessage。* channel 确定了监听器订阅的 Redis 通道,Spring Data Redis 会使用 getChannel() 的返回值("string.text.query.channel")订阅该通道。* @SneakyThrows 隐藏了反射相关的异常(如 NoSuchMethodException),假设 StringTextQueryChannelMessage 有无参构造函数且 getChannel() 可访问。*/@SneakyThrowsprotected AbstractRedisChannelMessageListener() {//通过反射获取子类的泛型参数this.messageType = getMessageClass();/*** 具体案例:* StringTextQueryChannelMessage 的无参构造函数* 调用 newInstance() 创建一个 StringTextQueryChannelMessage 实例。* 调用实例的 getChannel() 方法,获取通道名称(如 "string.text.query.channel")*/this.channel = messageType.getDeclaredConstructor().newInstance().getChannel();}/*** 获得 Sub 订阅的 Redis Channel 通道** @return channel*/public final String getChannel() {return channel;}/*** Message message:Redis 传递的原始消息对象,包含消息体(message.getBody())和通道信息* T messageObj = JsonUtils.parseObject(message.getBody(), messageType)* 使用 JsonUtils.parseObject 将消息体(message.getBody(),字节数组)反序列化为 T 类型的对象* messageType 是构造函数中初始化的消息类(Class<T>),确保消息被正确解析为指定类型(如 OrderMessage)* 调用 this.onMessage(messageObj),这是抽象方法,子类必须实现以定义具体的消息处理* messageType 是 StringTextQueryChannelMessage.class(由构造函数设置)。* 具体案例:* JsonUtils.parseObject(message.getBody(), messageType)* 将 JSON 字节数组解析为 StringTextQueryChannelMessage 对象:* @param message* @param bytes*/@Overridepublic final void onMessage(Message message, byte[] bytes) {T messageObj = JsonUtils.parseObject(message.getBody(), messageType);try {consumeMessageBefore(messageObj);// 消费消息this.onMessage(messageObj);} finally {consumeMessageAfter(messageObj);}}/*** 处理消息** @param message 消息*/public abstract void onMessage(T message);/*** 通过解析类上的泛型,获得消息类型** @return 消息类型*/@SuppressWarnings("unchecked")private Class<T> getMessageClass() {Type type = TypeUtil.getTypeArgument(getClass(), 0);if (type == null) {throw new IllegalStateException(String.format("类型(%s) 需要设置消息类型", getClass().getName()));}return (Class<T>) type;}private void consumeMessageBefore(AbstractRedisMessage message) {assert redisMQTemplate != null;List<RedisMessageInterceptor> interceptors = redisMQTemplate.getInterceptors();// 正序interceptors.forEach(interceptor -> interceptor.consumeMessageBefore(message));}private void consumeMessageAfter(AbstractRedisMessage message) {assert redisMQTemplate != null;List<RedisMessageInterceptor> interceptors = redisMQTemplate.getInterceptors();// 倒序for (int i = interceptors.size() - 1; i >= 0; i--) {interceptors.get(i).consumeMessageAfter(message);}}
StringTextChannelConsumerListener自定义监听器实现类
@Slf4j
@Component
public class StringTextChannelConsumerListener extends AbstractRedisChannelMessageListener<StringTextQueryChannelMessage> {@Resourceprivate StringTextService stringTextService;/*** 在 StringTextChannelConsumerListener 实例化时(由 Spring 容器管理)* 会调用父类 AbstractRedisChannelMessageListener 的构造函数* 当 Redis 通道 "string.text.query.channel" 收到消息时* Spring Data Redis 调用 StringTextChannelConsumerListener 的 onMessage(Message, byte[]) 方法(继承自父类)* @param message 消息*/@Overridepublic void onMessage(StringTextQueryChannelMessage message) {log.info("[onMessage][Channel 消息: {},线程名: {}]", message, Thread.currentThread().getName());StringTextPageReqVO reqVO = new StringTextPageReqVO();reqVO.setText(message.getText());reqVO.setPageNo(message.getPageNo());reqVO.setPageSize(message.getPageSize());PageResult<StringTextDO> pageResult = stringTextService.getUserPage(reqVO);log.info("[onMessage][Channel 处理结果: {}]", pageResult);}
}

监听器执行逻辑:

  • Spring Data Redis 的调用
    • 当通道有消息时,Spring Data Redis 调用 MessageListener 的 onMessage(Message, byte[]),即 AbstractRedisChannelMessageListener 的实现。
  • 父类到子类的调用
    • 父类的 onMessage(Message, byte[]) 处理消息反序列化和拦截器逻辑,然后通过 this.onMessage(T) 调用子类的 onMessage(StringTextQueryChannelMessage)。
  • 子类没有直接调用父类
    • 子类的 onMessage 是父类调用的结果,不是子类主动调用父类。
    • 子类的 onMessage 只处理业务逻辑,依赖父类的预处理。
  • 构造函数的作用
    • 初始化 messageType 和 channel,确保消息反序列化和通道订阅正确,直接影响子类 onMessage 的执行。

这种设计通过模板方法模式实现了通用逻辑和业务逻辑的分离,父类控制流程,子类提供具体实现。

2.4 配置类
YudaoRabbitMQAutoConfiguration
/*** RabbitMQ 消息队列配置类* 配置RabbitMQ的消息转换器,使用Jackson进行JSON序列化* 当生产者发送消息时,将Java对象转换为JSON格式的字节数组* 当消费者接收消息时,将JSON格式的消息转换回Java对象*/
@AutoConfiguration
@Slf4j
@ConditionalOnClass(name = "org.springframework.amqp.rabbit.core.RabbitTemplate")
public class YudaoRabbitMQAutoConfiguration {/*** Jackson2JsonMessageConverter Bean:使用 jackson 序列化消息*/@Beanpublic MessageConverter createMessageConverter() {return new Jackson2JsonMessageConverter();}
}

YudaoRabbitMQAutoConfiguration:配置 RabbitMQ 消息转换器,使用 Jackson 进行 JSON 序列化。

YudaoRedisMQConsumerAutoConfiguration

/*** Redis 消息队列 Consumer 配置类* redisMessageListenerContainer(): 创建Pub/Sub消息监听容器* redisStreamMessageListenerContainer(): 创建Stream消息监听容器* redisPendingMessageResendJob(): 创建待处理消息重发任务*/
@Slf4j
@EnableScheduling // 启用定时任务,用于 RedisPendingMessageResendJob 重发消息
@AutoConfiguration(after = YudaoRedisAutoConfiguration.class)
public class YudaoRedisMQConsumerAutoConfiguration {/*** 创建 Redis Pub/Sub 广播消费的容器*/@Bean@ConditionalOnBean(AbstractRedisChannelMessageListener.class) // 只有 AbstractChannelMessageListener 存在的时候,才需要注册 Redis pubsub 监听public RedisMessageListenerContainer redisMessageListenerContainer(RedisMQTemplate redisMQTemplate, List<AbstractRedisChannelMessageListener<?>> listeners) {// 创建 RedisMessageListenerContainer 对象RedisMessageListenerContainer container = new RedisMessageListenerContainer();// 设置 RedisConnection 工厂。container.setConnectionFactory(redisMQTemplate.getRedisTemplate().getRequiredConnectionFactory());// 添加监听器listeners.forEach(listener -> {listener.setRedisMQTemplate(redisMQTemplate);container.addMessageListener(listener, new ChannelTopic(listener.getChannel()));log.info("[redisMessageListenerContainer][注册 Channel({}) 对应的监听器({})]",listener.getChannel(), listener.getClass().getName());});return container;}/*** 创建 Redis Stream 重新消费的任务*/@Bean@ConditionalOnBean(AbstractRedisStreamMessageListener.class) // 只有 AbstractStreamMessageListener 存在的时候,才需要注册 Redis pubsub 监听public RedisPendingMessageResendJob redisPendingMessageResendJob(List<AbstractRedisStreamMessageListener<?>> listeners,RedisMQTemplate redisTemplate,@Value("${spring.application.name}") String groupName,RedissonClient redissonClient) {return new RedisPendingMessageResendJob(listeners, redisTemplate, groupName, redissonClient);}/*** 创建 Redis Stream 集群消费的容器** 基础知识:<a href="https://www.geek-book.com/src/docs/redis/redis/redis.io/commands/xreadgroup.html">Redis Stream 的 xreadgroup 命令</a>*/@Bean(initMethod = "start", destroyMethod = "stop")@ConditionalOnBean(AbstractRedisStreamMessageListener.class) // 只有 AbstractStreamMessageListener 存在的时候,才需要注册 Redis pubsub 监听public StreamMessageListenerContainer<String, ObjectRecord<String, String>> redisStreamMessageListenerContainer(RedisMQTemplate redisMQTemplate, List<AbstractRedisStreamMessageListener<?>> listeners) {RedisTemplate<String, ?> redisTemplate = redisMQTemplate.getRedisTemplate();checkRedisVersion(redisTemplate);// 第一步,创建 StreamMessageListenerContainer 容器// 创建 options 配置StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, String>> containerOptions =StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder().batchSize(10) // 一次性最多拉取多少条消息.targetType(String.class) // 目标类型。统一使用 String,通过自己封装的 AbstractStreamMessageListener 去反序列化.build();// 创建 container 对象StreamMessageListenerContainer<String, ObjectRecord<String, String>> container =StreamMessageListenerContainer.create(redisMQTemplate.getRedisTemplate().getRequiredConnectionFactory(), containerOptions);// 第二步,注册监听器,消费对应的 Stream 主题String consumerName = buildConsumerName();listeners.parallelStream().forEach(listener -> {log.info("[redisStreamMessageListenerContainer][开始注册 StreamKey({}) 对应的监听器({})]",listener.getStreamKey(), listener.getClass().getName());// 创建 listener 对应的消费者分组try {redisTemplate.opsForStream().createGroup(listener.getStreamKey(), listener.getGroup());} catch (Exception ignore) {}// 设置 listener 对应的 redisTemplatelistener.setRedisMQTemplate(redisMQTemplate);// 创建 Consumer 对象Consumer consumer = Consumer.from(listener.getGroup(), consumerName);// 设置 Consumer 消费进度,以最小消费进度为准StreamOffset<String> streamOffset = StreamOffset.create(listener.getStreamKey(), ReadOffset.lastConsumed());// 设置 Consumer 监听StreamMessageListenerContainer.StreamReadRequestBuilder<String> builder = StreamMessageListenerContainer.StreamReadRequest.builder(streamOffset).consumer(consumer).autoAcknowledge(false) // 不自动 ack.cancelOnError(throwable -> false); // 默认配置,发生异常就取消消费,显然不符合预期;因此,我们设置为 falsecontainer.register(builder.build(), listener);log.info("[redisStreamMessageListenerContainer][完成注册 StreamKey({}) 对应的监听器({})]",listener.getStreamKey(), listener.getClass().getName());});return container;}/*** 构建消费者名字,使用本地 IP + 进程编号的方式。* 参考自 RocketMQ clientId 的实现** @return 消费者名字*/private static String buildConsumerName() {return String.format("%s@%d", SystemUtil.getHostInfo().getAddress(), SystemUtil.getCurrentPID());}/*** 校验 Redis 版本号,是否满足最低的版本号要求!*/private static void checkRedisVersion(RedisTemplate<String, ?> redisTemplate) {// 获得 Redis 版本Properties info = redisTemplate.execute((RedisCallback<Properties>) RedisServerCommands::info);String version = MapUtil.getStr(info, "redis_version");// 校验最低版本必须大于等于 5.0.0int majorVersion = Integer.parseInt(StrUtil.subBefore(version, '.', false));if (majorVersion < 5) {throw new IllegalStateException(StrUtil.format("您当前的 Redis 版本为 {},小于最低要求的 5.0.0 版本!" +"请参考 {} 文档进行安装。", version, DocumentEnum.REDIS_INSTALL.getUrl()));}}}

方法 1: redisMessageListenerContainer

  • 作用
    • 创建并配置 RedisMessageListenerContainer,用于监听 Redis Pub/Sub 通道消息。
    • 注册所有 AbstractRedisChannelMessageListener 子类的监听器(如 StringTextChannelConsumerListener)。

方法 2: redisPendingMessageResendJob

作用

  • 创建 RedisPendingMessageResendJob Bean,定时重发 Redis Stream 的待处理(pending)消息。

方法 3: redisStreamMessageListenerContainer

  • 作用
    • 创建并配置 StreamMessageListenerContainer,用于监听 Redis Stream 消息。
    • 注册所有 AbstractRedisStreamMessageListener 子类的监听器。
  • 输入参数
    • RedisMQTemplate redisMQTemplate:提供 Redis 连接和拦截器。
    • List<AbstractRedisStreamMessageListener<?>> listeners:所有 Stream 监听器。
  • 输出:StreamMessageListenerContainer<String, ObjectRecord<String, String>>,用于 Stream 消息消费。
  • 实现逻辑
    1. 检查 Redis 版本
      • checkRedisVersion(redisTemplate); 确保 Redis 版本 ≥ 5.0.0(支持 Stream)。
    2. 创建容器
      • 配置 containerOptions:
        • batchSize(10):每次拉取最多 10 条消息。
        • targetType(String.class):消息体为 String 类型(JSON 字符串),由监听器反序列化。
      • 创建 StreamMessageListenerContainer。
    3. 注册监听器
      • 使用 parallelStream 并行处理监听器,提高效率。
      • 对每个监听器:
        • 创建消费者组(createGroup)。
        • 注入 redisMQTemplate。
        • 创建 Consumer(组名 + 消费者名)。
        • 设置 StreamOffset(从最后消费位置读取)。
        • 注册监听器,配置手动 ACK 和错误处理。
      • 日志记录注册过程。
2.5 RedisPendingMessageResendJob(定时任务类)
/*** 这个任务用于处理,crash 之后的消费者未消费完的消息* 重新分发信息(清理员)*/
@Slf4j
@AllArgsConstructor
public class RedisPendingMessageResendJob {private static final String LOCK_KEY = "redis:pending:msg:lock";/*** 消息超时时间,默认 5 分钟** 1. 超时的消息才会被重新投递* 2. 由于定时任务 1 分钟一次,消息超时后不会被立即重投,极端情况下消息5分钟过期后,再等 1 分钟才会被扫瞄到*/private static final int EXPIRE_TIME = 5 * 60;private final List<AbstractRedisStreamMessageListener<?>> listeners;private final RedisMQTemplate redisTemplate;private final String groupName;private final RedissonClient redissonClient;/*** 一分钟执行一次,这里选择每分钟的35秒执行,是为了避免整点任务过多的问题*/@Scheduled(cron = "35 * * * * ?")public void messageResend() {RLock lock = redissonClient.getLock(LOCK_KEY);// 尝试加锁if (lock.tryLock()) {try {execute();} catch (Exception ex) {log.error("[messageResend][执行异常]", ex);} finally {lock.unlock();}}}/*** 执行清理逻辑** @see <a href="https://gitee.com/zhijiantianya/ruoyi-vue-pro/pulls/480/files">讨论</a>*/private void execute() {//redisTemplate.getRedisTemplate().opsForStream() 获取 Redis Stream 的操作接口,用于执行 pending、range、add、acknowledge 等操作。StreamOperations<String, Object, Object> ops = redisTemplate.getRedisTemplate().opsForStream();//检查每条传送带listeners.forEach(listener -> {//ops.pending(listener.getStreamKey(), groupName):查询 Stream 和消费者组的 pending 消息概况。PendingMessagesSummary pendingMessagesSummary = Objects.requireNonNull(ops.pending(listener.getStreamKey(), groupName));// 每个消费者的 pending 队列消息数量(检查每个传送带(Stream)上有哪些卡住的包裹(pending 消息))Map<String, Long> pendingMessagesPerConsumer = pendingMessagesSummary.getPendingMessagesPerConsumer();pendingMessagesPerConsumer.forEach((consumerName, pendingMessageCount) -> {log.info("[processPendingMessage][消费者({}) 消息数量({})]", consumerName, pendingMessageCount);// 每个消费者的 pending消息的详情信息,getStreamKey():返回 Stream 名称(如 "string.text.query.stream"),pendingMessagesPerConsumer:Map,键是消费者名称,值是 pending 消息数量PendingMessages pendingMessages = ops.pending(listener.getStreamKey(), Consumer.from(groupName, consumerName), Range.unbounded(), pendingMessageCount);if (pendingMessages.isEmpty()) {return;}pendingMessages.forEach(pendingMessage -> {// 获取消息上一次传递到 consumer 的时间,long lastDelivery = pendingMessage.getElapsedTimeSinceLastDelivery().getSeconds();if (lastDelivery < EXPIRE_TIME){return;}// 获取指定 id 的消息体(对于超时的包裹,清理员去传送带上找到它的具体内容(records),比如包裹里装了啥(JSON 数据)。如果没找到(比如包裹被删了),就跳过)List<MapRecord<String, Object, Object>> records = ops.range(listener.getStreamKey(),Range.of(Range.Bound.inclusive(pendingMessage.getIdAsString()), Range.Bound.inclusive(pendingMessage.getIdAsString())));if (CollUtil.isEmpty(records)) {return;}// 重新投递消息redisTemplate.getRedisTemplate().opsForStream().add(StreamRecords.newRecord().ofObject(records.get(0).getValue()) // 设置内容.withStreamKey(listener.getStreamKey()));// ack 消息消费完成redisTemplate.getRedisTemplate().opsForStream().acknowledge(groupName, records.get(0));log.info("[processPendingMessage][消息({})重新投递成功]", records.get(0).getId());});});});}
}
  • RedisPendingMessageResendJob 是一个定时任务类,专门用于处理 Redis Stream 消息队列中因消费者异常(如崩溃)而未确认(ACK)的待处理(pending)消息。
  • 它通过定期扫描消费者组的 pending 消息,检查消息是否超时(默认 5 分钟),并将超时的消息重新投递到 Stream,同时确认(ACK)原消息,以避免消息丢失或重复处理。

3. 总结

3.1 整体背景

想象你在一家快递公司(你的系统)工作,Redis 消息队列就像公司的两条传送带:

  • Pub/Sub 传送带(广播模式):像广播电台,包裹(消息)发出去,所有听众(消费者)都能收到,但包裹不会保存。
  • Stream 传送带(集群模式):像物流流水线,包裹有编号(消息 ID),可以保存、追踪,多个快递员(消费者)分工处理。

实现代码CurdTestController 负责把包裹放上传送带,框架类代码负责管理传送带、派送包裹、处理异常(比如包裹卡住)。下面,我会先介绍每个角色的作用(类分析),然后讲一个包裹从发出到送达的故事(全过程)。


3.2 类结构与作用分析

3.2.1 框架的 Redis MQ 相关类
  1. YudaoRedisMQProducerAutoConfiguration
    • 作用:像快递公司的“生产设备管理员”,负责初始化消息发送工具(RedisMQTemplate)。
    • 职责
      • 创建 RedisMQTemplate Bean,注入 StringRedisTemplate 和拦截器(RedisMessageInterceptor)。
      • 配置拦截器,增强消息发送的扩展性(如日志、租户处理)。
    • 关键方法
      • redisMQTemplate:构造 RedisMQTemplate,添加拦截器。
    • 使用场景:Spring 启动时,自动配置 RedisMQTemplate,供生产者(如 StringTextRedisProducer)使用。
  2. YudaoRedisMQConsumerAutoConfiguration
    • 作用:像“派送中心管理员”,负责配置 Pub/Sub 和 Stream 的消息监听容器,以及异常包裹清理任务。
    • 职责
      • 配置 RedisMessageListenerContainer:监听 Pub/Sub 通道(如 string-text-query-channel)。
      • 配置 StreamMessageListenerContainer:监听 Stream 队列(如 string-text-query-stream)。
      • 创建 RedisPendingMessageResendJob:定时清理 Stream 的待处理(pending)包裹。
      • 检查 Redis 版本,确保支持 Stream(≥5.0.0)。
      • 生成消费者名称(如 192.168.1.1@1234)。
    • 关键方法
      • redisMessageListenerContainer:注册 Pub/Sub 监听器。
      • redisStreamMessageListenerContainer:注册 Stream 监听器,设置批处理(10 条/次)、手动 ACK。
      • redisPendingMessageResendJob:创建定时任务。
      • buildConsumerName:生成唯一消费者名称。
      • checkRedisVersion:验证 Redis 版本。
    • 使用场景:Spring 启动时,自动配置消费者容器,监听消息并处理异常。
  3. RedisMQTemplate
    • 作用:像“传送带操作员”,负责把包裹放上 Pub/Sub 或 Stream 传送带,并支持拦截器扩展。
    • 职责
      • 发送 Pub/Sub 消息:convertAndSend 到指定通道。
      • 发送 Stream 消息:opsForStream().add 到指定 Stream,返回消息 ID。
      • 执行拦截器:发送前后调用 sendMessageBefore/sendMessageAfter。
      • 提供 opsForStream() 等接口,供定时任务(如 RedisPendingMessageResendJob)操作 Stream。
    • 关键方法
      • send(T extends AbstractRedisChannelMessage):发送 Pub/Sub 消息。
      • send(T extends AbstractRedisStreamMessage):发送 Stream 消息。
      • addInterceptor:添加拦截器。
    • 使用场景:被生产者(如 StringTextRedisProducer)调用发送消息,被定时任务调用操作 Stream。
  4. RedisMessageInterceptor
    • 作用:像“包裹检查员”,在消息发送或消费前后进行额外处理(如日志、租户隔离)。
    • 职责
      • 定义接口:sendMessageBefore/After、consumeMessageBefore/After。
      • 默认空实现,允许自定义扩展。
    • 使用场景:由 RedisMQTemplate 在消息发送/消费时调用,典型用于多租户场景或日志记录。
  5. RedisPendingMessageResendJob
    • 作用:像“包裹清理员”,定时(每分钟 35 秒)检查 Stream 传送带上卡住的包裹(pending 消息),重新投递超时的包裹。
    • 职责
      • 使用分布式锁(RedissonClient)确保单一实例执行。
      • 扫描每个 Stream 的 pending 消息,检查超时(5 分钟)。
      • 重新投递超时消息,确认(ACK)原消息。
    • 关键方法
      • messageResend:定时任务入口,加锁调用 execute。
      • execute:扫描 pending 消息,重新投递并 ACK。
    • 使用场景:消费者崩溃未确认消息时,定时重发确保消息不丢失。
  6. AbstractRedisMessage
    • 作用:像“包裹模板”,定义消息的基本结构。
    • 职责
      • 提供 headers(键值对),存储元数据(如租户 ID)。
      • 抽象基类,供具体消息类继承。
    • 使用场景:被 AbstractRedisChannelMessage 和 AbstractRedisStreamMessage 继承。
  7. AbstractRedisChannelMessage
    • 作用:像“Pub/Sub 包裹模板”,定义 Pub/Sub 消息的结构。
    • 职责
      • 提供 getChannel(),默认返回类名,子类可自定义通道(如 string-text-query-channel)。
      • 忽略序列化通道名(@JsonIgnore)。
    • 使用场景:被你的 StringTextQueryChannelMessage 继承。
  8. AbstractRedisStreamMessage
    • 作用:像“Stream 包裹模板”,定义 Stream 消息的结构。
    • 职责
      • 提供 getStreamKey(),默认返回类名,子类可自定义 Stream(如 string-text-query-stream)。
      • 忽略序列化 Stream 键。
    • 使用场景:被你的 StringTextQueryStreamMessage 继承。
  9. AbstractRedisChannelMessageListener<T>
    • 作用:像“Pub/Sub 快递员”,监听 Pub/Sub 通道,处理消息。
    • 职责
      • 自动获取消息类型(messageType)和通道(channel)。
      • 反序列化消息(JsonUtils.parseObject)。
      • 执行拦截器(consumeMessageBefore/After)。
      • 调用子类的 onMessage 处理消息。
    • 关键方法
      • onMessage(Message, byte[]):处理原始 Redis 消息。
      • onMessage(T):抽象方法,子类实现具体逻辑。
    • 使用场景:被你的 StringTextChannelConsumerListener 继承。
  10. AbstractRedisStreamMessageListener<T>
    • 作用:像“Stream 快递员”,监听 Stream 队列,处理消息。
    • 职责
      • 自动获取消息类型(messageType)、Stream 键(streamKey)、消费者组(group)。
      • 反序列化消息,执行拦截器,调用子类 onMessage。
      • 手动 ACK 消息(opsForStream().acknowledge)。
    • 关键方法
      • onMessage(ObjectRecord):处理 Stream 消息。
      • onMessage(T):抽象方法,子类实现。
    • 使用场景:被你的 StringTextStreamConsumerListener 继承。

3.2.2 实现定义类
  1. CurdTestController
    • 作用:像“客户服务中心”,接收用户请求,触发消息发送或异步查询。
    • 职责
      • 提供 REST 接口:
        • /getStringText:异步查询(StringTextService.getUserPageAsync)。
        • /getStringTextByMQ:触发 MQ 消息(未实现具体生产者)。
        • /getStringTextByStream:发送 Stream 消息(StringTextRedisProducer.sendStreamMessage)。
        • /getStringTextByChannel:发送 Pub/Sub 消息(StringTextRedisProducer.sendChannelMessage)。
      • 记录日志,处理异常。
    • 使用场景:用户通过 HTTP 请求触发消息队列或异步处理。
  2. StringTextRedisProducer
    • 作用:像“包裹打包员”,创建并发送 Pub/Sub 和 Stream 消息。
    • 职责
      • sendStreamMessage:发送 StringTextQueryStreamMessage 到 string-text-query-stream。
      • sendChannelMessage:发送 StringTextQueryChannelMessage 到 string-text-query-channel。
      • 使用 RedisMQTemplate 发送消息,记录日志。
    • 使用场景:被 CurdTestController 调用,发送查询请求到消息队列。
  3. StringTextChannelConsumerListener
    • 作用:像“Pub/Sub 派送员”,监听 string-text-query-channel,处理消息。
    • 职责
      • 接收 StringTextQueryChannelMessage,转换为 StringTextPageReqVO。
      • 调用 StringTextService.getUserPage 查询数据。
      • 记录处理日志。
    • 使用场景:处理 Pub/Sub 广播消息,适合实时通知场景。
  4. StringTextStreamConsumerListener
    • 作用:像“Stream 派送员”,监听 string-text-query-stream,处理消息。
    • 职责
      • 接收 StringTextQueryStreamMessage,转换为 StringTextPageReqVO。
      • 调用 StringTextService.getUserPage 查询数据。
      • 手动 ACK 消息,记录日志。
    • 使用场景:处理 Stream 集群消息,适合持久化、可靠投递场景。
  5. StringTextQueryChannelMessage
    • 作用:像“Pub/Sub 包裹”,定义 Pub/Sub 消息结构。
    • 职责
      • 包含 text、pageNo、pageSize,校验非空。
      • 指定通道 string-text-query-channel。
    • 使用场景:由 StringTextRedisProducer 发送,StringTextChannelConsumerListener 消费。
  6. StringTextQueryStreamMessage
    • 作用:像“Stream 包裹”,定义 Stream 消息结构。
    • 职责
      • 包含 text、pageNo、pageSize,校验非空。
      • 指定 Stream string-text-query-stream。
    • 使用场景:由 StringTextRedisProducer 发送,StringTextStreamConsumerListener 消费。
  7. StringTextServiceImpl
    • 作用:像“数据仓库”,提供数据查询服务。
    • 职责
      • getUserPage:同步查询分页数据。
      • getUserPageAsync:异步查询,使用线程池(curdAsyncExecutor)。
      • 调用 StringTextMapper 访问数据库。
    • 使用场景:被控制器和消费者调用,处理查询逻辑。

3.3 文字图总结

3.3.1 图示总结
YudaoRedisMQProducerAutoConfiguration
└─ redisMQTemplate(StringRedisTemplate redisTemplate, List<RedisMessageInterceptor> interceptors)│  │  // 初始化消息发送工具,注入 Redis 模板和拦截器(默认空)│  ├─ Parameters:│  ├─ redisTemplate: StringRedisTemplate // Redis 操作模板│  └─ interceptors: List<RedisMessageInterceptor> // 拦截器列表(默认空)│  └─ Returns: RedisMQTemplate│  └─ RedisMQTemplate(redisTemplate)│  └─ addInterceptor(interceptor) // 添加拦截器(默认空,支持链式调用)
YudaoRedisMQConsumerAutoConfiguration
├─ checkRedisVersion(StringRedisTemplate redisTemplate)
│  │  
│  │  // 校验 Redis 版本 ≥ 5.0.0
│  │  
│  ├─ Parameters: redisTemplate: StringRedisTemplate
│  └─ Returns: void
│
├─ redisMessageListenerContainer(RedisMQTemplate redisMQTemplate, List<AbstractRedisChannelMessageListener<?>> listeners)
│  │  
│  │  // 配置 Pub/Sub 监听容器(基于 Redis 发布订阅模式)
│  │  
│  ├─ Parameters:
│  │  ├─ redisMQTemplate: RedisMQTemplate
│  │  └─ listeners: List<AbstractRedisChannelMessageListener<?>> 
│  │       // 包含 StringTextChannelConsumerListener 实现类
│  │  
│  └─ Returns: RedisMessageListenerContainer
│       │  
│       └─ addMessageListener(StringTextChannelConsumerListener listener, ChannelTopic("string-text-query-channel"))
│           │  
│           │  // 绑定监听器到指定通道(channel)
│           │  
│           ├─ Parameters:
│           │  ├─ listener: StringTextChannelConsumerListener
│           │  └─ topic: ChannelTopic("string-text-query-channel") 
│           │       // 通道名称固定为 string-text-query-channel
│           │  
│           └─ StringTextChannelConsumerListener.setRedisMQTemplate(redisMQTemplate)
│               // 注入消息发送模板,用于消费时的响应逻辑
│
├─ redisStreamMessageListenerContainer(RedisMQTemplate redisMQTemplate, List<AbstractRedisStreamMessageListener<?>> listeners)
│  │  
│  │  // 配置 Stream 监听容器(基于 Redis Stream 数据结构)
│  │  
│  ├─ Parameters:
│  │  ├─ redisMQTemplate: RedisMQTemplate
│  │  └─ listeners: List<AbstractRedisStreamMessageListener<?>> 
│  │       // 包含 StringTextStreamConsumerListener 实现类
│  │  
│  └─ Returns: StreamMessageListenerContainer
│       │  
│       └─ register(StreamReadRequest("string-text-query-stream", Consumer("my-app", "192.168.1.1@1234"), StringTextStreamConsumerListener listener))
│           │  
│           │  // 绑定监听器到指定 Stream(支持消费者组模式)
│           │  
│           ├─ Parameters:
│           │  ├─ request: StreamReadRequest 
│           │  │  // StreamKey: string-text-query-stream 
│           │  │  // Consumer: my-app@192.168.1.1@1234(消费者组名+实例标识)
│           │  └─ listener: StringTextStreamConsumerListener
│           │  
│           ├─ redisTemplate.opsForStream().createGroup("string-text-query-stream", "my-app")
│           │  // 自动创建消费者组(若不存在),组名与应用名一致
│           │  
│           └─ StringTextStreamConsumerListener.setRedisMQTemplate(redisMQTemplate)
│               // 注入消息发送模板,用于消费时的响应逻辑
│
└─ redisPendingMessageResendJob(List<AbstractRedisStreamMessageListener<?>> listeners, RedisMQTemplate redisTemplate, String groupName, RedissonClient redissonClient)│  │  // 创建定时任务,处理 Stream 中未确认的 pending 消息│  // 防止消息堆积,保证至少一次投递语义│  ├─ Parameters:│  ├─ listeners: List<AbstractRedisStreamMessageListener<?>> │  │       // 包含 StringTextStreamConsumerListener 实现类│  ├─ redisTemplate: RedisMQTemplate│  ├─ groupName: String // 默认使用 spring.application.name("my-app")│  └─ redissonClient: RedissonClient // 分布式锁,保证任务单点执行│  └─ Returns: RedisPendingMessageResendJob// 定时任务会定期扫描 pending 消息并重新投递
通过 Stream 发送消息的流程
CurdTestController
└─ getStringTextByStream(StringTextPageReqVO pageReqVO)│  │  // 处理 Stream 消息请求│  Parameters: pageReqVO: {text="example", pageNo=1, pageSize=10}│  Returns: "Stream 消息已发送,异步处理中"│  └─ StringTextRedisProducer.sendStreamMessage(text, pageNo, pageSize)│  │  // 发送 Stream 消息(参数:example, 1, 10)│  Returns: void│  └─ RedisMQTemplate.send(StringTextQueryStreamMessage message)│  │  // 发送到 Stream(消息内容包含 text/pageNo/pageSize)│  Returns: RecordId(如 "1234567890-0")│  ├─ sendMessageBefore(message)│  // 调用消息发送前的拦截器(默认无操作)│  ├─ JsonUtils.toJsonString(message)│  // 序列化消息为 JSON│  Returns: "{\"text\":\"example\",\"pageNo\":1,\"pageSize\":10}"│  ├─ redisTemplate.opsForStream().add(StreamRecords)│  // 添加到 Stream(StreamKey: string-text-query-stream)│  Parameters: JSON 字符串 + StreamKey│  Returns: RecordId(消息唯一标识)│  └─ sendMessageAfter(message)// 调用消息发送后的拦截器(默认无操作)
StreamMessageListenerContainer
└─ StringTextStreamConsumerListener.onMessage(ObjectRecord<String, String> message)│  │  // 处理 Stream 消息(参数:包含 StreamKey 和 JSON 内容的记录)│  Returns: void│  ├─ JsonUtils.parseObject(message.getValue(), StringTextQueryStreamMessage.class)│  // 反序列化 JSON 为消息对象│  Returns: {text="example", pageNo=1, pageSize=10}│  ├─ consumeMessageBefore(messageObj)│  // 调用消息消费前的拦截器(默认无操作)│  ├─ onMessage(StringTextQueryStreamMessage message)│  // 子类自定义处理逻辑(由业务实现)│  Parameters: 反序列化后的消息对象│  Returns: void│  │  └─ StringTextService.getUserPage(StringTextPageReqVO reqVO)│      │  │      │  // 业务逻辑:查询数据│      │  Parameters: {text="example", pageNo=1, pageSize=10}│      │  Returns: 分页结果(包含查询到的数据)│      │  │      └─ StringTextMapper.selectPage(reqVO)│          // 数据库层查询│          Returns: 从数据库获取的分页结果│  ├─ redisMQTemplate.getRedisTemplate().opsForStream().acknowledge("my-app", message)│  // 关键步骤:确认消息已消费│  Parameters: 消费者组名 + 消息记录│  Returns: void│  (若不确认,消息会被视为 pending 并由定时任务重新投递)│  └─ consumeMessageAfter(messageObj)// 调用消息消费后的拦截器(默认无操作)
通过 Pub/Sub 发送消息的流程
CurdTestController
└─ getStringTextByChannel(StringTextPageReqVO pageReqVO)│  │  // 处理 Pub/Sub 消息请求│  Parameters: pageReqVO: {text="example", pageNo=1, pageSize=10}│  Returns: "Channel 消息已发送,异步处理中"│  └─ StringTextRedisProducer.sendChannelMessage(text, pageNo, pageSize)│  │  // 发送 Pub/Sub 消息(参数:example, 1, 10)│  Returns: void│  └─ RedisMQTemplate.send(StringTextQueryChannelMessage message)│  │  // 发送到通道(channel: string-text-query-channel)│  Returns: void│  ├─ sendMessageBefore(message)│  // 调用消息发送前的拦截器(默认无操作)│  ├─ JsonUtils.toJsonString(message)│  // 序列化消息为 JSON│  Returns: "{\"text\":\"example\",\"pageNo\":1,\"pageSize\":10}"│  ├─ redisTemplate.convertAndSend("string-text-query-channel", json)│  // 发布到 Redis 通道│  Parameters: 通道名 + JSON 字符串│  Returns: void(无返回值,Fire-and-Forget 模式)│  └─ sendMessageAfter(message)// 调用消息发送后的拦截器(默认无操作)
RedisMessageListenerContainer
└─ StringTextChannelConsumerListener.onMessage(Message message, byte[] bytes)│  │  // 处理 Pub/Sub 消息(参数:消息对象+通道字节数组)│  Returns: void(无返回值,Fire-and-Forget 模式)│  ├─ JsonUtils.parseObject(message.getBody(), StringTextQueryChannelMessage.class)│  // 反序列化 JSON 为消息对象│  Returns: {text="example", pageNo=1, pageSize=10}│  ├─ consumeMessageBefore(messageObj)│  // 调用消息消费前的拦截器(默认无操作)│  ├─ onMessage(StringTextQueryChannelMessage message)│  // 子类自定义处理逻辑(由业务实现)│  Parameters: 反序列化后的消息对象│  Returns: void│  │  └─ StringTextService.getUserPage(StringTextPageReqVO reqVO)│      │  │      │  // 业务逻辑:查询数据│      │  Parameters: {text="example", pageNo=1, pageSize=10}│      │  Returns: 分页结果(包含查询到的数据)│      │  │      └─ StringTextMapper.selectPage(reqVO)│          // 数据库层查询│          Returns: 从数据库获取的分页结果│  └─ consumeMessageAfter(messageObj)// 调用消息消费后的拦截器(默认无操作)

RedisPendingMessageResendJob 定时任务流程
3.3.2 类作用总结
  • 框架类
    • YudaoRedisMQProducerAutoConfiguration:初始化生产工具。
    • YudaoRedisMQConsumerAutoConfiguration:配置消费容器和定时任务。
    • RedisMQTemplate:发送消息,操作 Stream.
    • RedisMessageInterceptor:扩展点(未使用)。
    • RedisPendingMessageResendJob:重发 pending 消息。
    • AbstractRedisMessage:消息基类。
    • AbstractRedisChannelMessage:Pub/Sub 消息模板。
    • AbstractRedisStreamMessage:Stream 消息模板。
    • AbstractRedisChannelMessageListener:Pub/Sub 消费者基类。
    • AbstractRedisStreamMessageListener:Stream 消费者基类。
  • 具体业务实现类
    • CurdTestController:触发消息发送。
    • StringTextRedisProducer:发送消息。
    • StringTextChannelConsumerListener:处理 Pub/Sub 消息。
    • StringTextStreamConsumerListener:处理 Stream 消息。
    • StringTextQueryChannelMessage:Pub/Sub 消息。
    • StringTextQueryStreamMessage:Stream 消息。
    • StringTextServiceImpl:查询数据。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.pswp.cn/diannao/86703.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【凌智视觉模块】rv1106 部署 ppocrv4 检测模型 rknn 推理

PP-OCRv4 文本框检测 1. 模型介绍 如有需要可以前往我们的仓库进行查看 凌智视觉模块 PP-OCRv4在PP-OCRv3的基础上进一步升级。整体的框架图保持了与PP-OCRv3相同的pipeline&#xff0c;针对检测模型和识别模型进行了数据、网络结构、训练策略等多个模块的优化。 从算法改…

uniapp Vue2 获取电量的独家方法:绕过官方插件限制

在使用 uniapp 进行跨平台应用开发时&#xff0c;获取设备电量信息是一个常见的需求。然而&#xff0c;uniapp 官方提供的uni.getBatteryInfo方法存在一定的局限性&#xff0c;它不仅需要下载插件&#xff0c;而且目前仅支持 Vue3&#xff0c;这让使用 Vue2 进行开发的开发者陷…

Go语言中的if else控制语句

if else是Go语言中最基础也最常用的条件控制语句&#xff0c;用于根据条件执行不同的代码块。下面我将详细介绍Go语言中if else的各种用法和特性。 1. 基本语法 1.1. 最简单的if语句 if 条件表达式 {// 条件为true时执行的代码 } 示例&#xff1a; if x > 10 {fmt.Prin…

[Spring]-AOP

AOP场景 AOP: Aspect Oriented Programming (面向切面编程) OOP: Object Oriented Programming (面向对象编程) 场景设计 设计: 编写一个计算器接口和实现类&#xff0c;提供加减乘除四则运算 需求: 在加减乘除运算的时候需要记录操作日志(运算前参数、运算后结果)实现方案:…

Web3 借贷与清算机制全解析:链上金融的运行逻辑

Web3 借贷与清算机制全解析&#xff1a;链上金融的运行逻辑 超额抵押借款 例如&#xff0c;借款人用ETH为抵押借入DAI&#xff1b;借款人的ETH的价值一定是要超过DAI的价值&#xff1b;借款人可以任意自由的使用自己借出的DAI 稳定币 第一步&#xff1a;借款人需要去提供一定…

RK3588开发笔记-GNSS-RTK模块调试

目录 前言 一、什么是GNSS/RTK 二、硬件连接 三、内核配置 四、模块调试 五、ntripclient使用 总结 前言 在RK3588平台上集成高精度定位功能是许多工业级应用的需求。本文记录了我调试GNSS-RTK模块的全过程,包含硬件连接、驱动移植、数据解析和精度优化等关键环节,希望对…

Vue.js $emit的介绍和简单使用

前言 在 Vue.js 开发中&#xff0c;组件化是核心思想之一。但组件间的通信是一个重要课题&#xff0c;特别是子组件向父组件传递数据的场景。Vue 提供了多种通信方式&#xff0c;而$emit正是实现子→父通信的关键方法。本文将深入解析$emit的原理、使用场景及最佳实践。 一、$e…

【Linux 学习计划】-- 简易版shell编写

目录 思路 创建自己的命令行 获取用户命令 分割命令 检查是否是内建命令 cd命令实现 进程程序替换执行程序 总代码 结语 思路 int main() {while (1){// 1. 自己的命令行PrintCommandLine();// 2. 获取用户命令char command[SIZE];int n GetUserCommand(command, si…

一个完整的日志收集方案:Elasticsearch + Logstash + Kibana+Filebeat (二)

&#x1f4c4; 本地 Windows 部署 Logstash 连接本地 Elasticsearch 指南 ✅ 目标 在本地 Windows 上安装并运行 Logstash配置 Logstash 将数据发送至本地 Elasticsearch测试数据采集与 ES 存储流程 &#x1f9f0; 前提条件 软件版本要求安装说明Java17Oracle JDK 下载 或 O…

Java使用Selenium反爬虫优化方案

当我们爬取大站的时候&#xff0c;就得需要对抗反爬虫机制的场景&#xff0c;因为项目要求使用Java和Selenium。Selenium通常用于模拟用户操作&#xff0c;但效率较低&#xff0c;所以需要我们结合其他技术来实现高效。 在 Java 中使用 Selenium 进行高效反爬虫对抗时&#xff…

状态管理方案对比与决策

1. 状态管理的基本概念 现代前端应用随着功能复杂度提升&#xff0c;状态管理已成为架构设计的核心挑战。状态管理本质上解决的是数据的存储、变更追踪和响应式更新问题&#xff0c;以确保UI与底层数据保持同步。 核心挑战: 状态共享与组件通信可预测的状态变更性能优化与重…

Fetch与Axios:区别、联系、优缺点及使用差异

Fetch与Axios&#xff1a;区别、联系、优缺点及使用差异 文章目录 Fetch与Axios&#xff1a;区别、联系、优缺点及使用差异一、联系二、区别1. 浏览器支持与兼容性2. 响应处理3. 请求拦截和响应拦截4. 错误处理 三、优缺点1. Fetch API优点缺点 2. Axios优点缺点 四、使用上的差…

【Docker】快速入门与项目部署实战

我们在部署一个项目时&#xff0c;会出现一系列问题比如&#xff1a; 命令太多了&#xff0c;记不住软件安装包名字复杂&#xff0c;不知道去哪里找安装和部署步骤复杂&#xff0c;容易出错 其实上述问题不仅仅是新手&#xff0c;即便是运维在安装、部署的时候一样会觉得麻烦…

Java面试题尚硅谷版第1季

1、写出如下代码运行结果 1.1、 使用局部变量表和操作数栈解题 1.2、使用前置和后置递增解题 2、写一个单例模式 2.1、考察知识点 2.2、单例模式实现 3、类加载和初始化顺序 package classload;public class Father {private int i test();private static int j method();st…

关于Qt阻断样式继承的解决办法

引言 在使用 Qt 开发桌面应用时&#xff0c;借助样式表&#xff08;StyleSheet&#xff09;来统一定义界面风格是非常常见的做法。通常&#xff0c;你会在主程序中通过 qApp->setStyleSheet(...) 或者直接给某个父控件设置样式表&#xff0c;让所有的子控件都采用相同的配色…

鼠标右键添加新建某种文件的方法

场景 我经常用到.emmx&#xff0c;.eddx文件&#xff0c;电脑上装的是wpsX亿图&#xff08;因为有wps会员&#xff09;&#xff0c;没有开亿图会员。 然后问题就是&#xff0c;思维导图和流程图我都能正常开&#xff0c;正常编辑&#xff0c;但鼠标右键没有新建这两个文件的按…

Inxpect安全雷达传感器与控制器:动态检测 + 抗干扰技术重构工业安全防护体系

Inxpect 推出工业安全领域新型智能传感器与控制器&#xff0c;其核心产品为雷达扫描仪&#xff0c;具备动态调整检测区域、抗干扰能力强等特点&#xff0c;可精准检测危险区域人员进入或存在情况&#xff0c;适用于移动机器人等场景。 Inxpect安全雷达传感器核心功能 动态检测…

【AI学习】李广密与阶跃星辰首席科学家张祥雨对谈:多模态发展的历史和未来

仔细阅读了文章《专访张祥雨&#xff1a;多模态推理和自主学习是未来的 2 个 「GPT-4」 时刻》 https://mp.weixin.qq.com/s/892QuRPH9uP6zN6dS-HZMw 非常赞叹的一篇文章&#xff0c;说清楚了NLP、CV发展中的许多重大问题&#xff0c;读来醍醐灌顶&#xff01;这样的文章&…

C++中std::deque详解和实战工程代码示例

C中std::deque详解和实战工程代码示例 std::deque&#xff08;双端队列&#xff09;是 C 标准库中的一个序列容器&#xff0c;与 std::vector 类似&#xff0c;但它支持从头部和尾部高效地插入和删除元素。它底层采用分段连续空间实现&#xff0c;兼具灵活性与性能。 一、基本…

【AI大模型入门指南】概念与专有名词详解 (二)

【AI大模型入门指南】概念与专有名词详解 &#xff08;二&#xff09; 一 、前言 当你和聊天机器人聊得天花乱坠时&#xff0c;当你用文字让AI生成精美图片时&#xff0c;当手机相册自动帮你分类照片时 —— 这些看似智能的操作背后&#xff0c;都藏着 AI 大模型的身影。 本…