Spring Boot 整合 Redis 实现发布/订阅(含ACK机制 - 事件驱动方案)

Spring Boot整合Redis实现发布/订阅(含ACK机制)全流程

一、整体架构

二、实现步骤

步骤1:添加Maven依赖

<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency>
</dependencies>

步骤2:配置Redis连接

# application.yml
spring:redis:host: localhostport: 6379lettuce:pool:max-active: 16max-idle: 8
# redisStream配置信息
app:redis:stream: app-eventsgroup: app-groupconsumer: consumer-${random.int(1000)}

步骤3:创建消费者组

@Configuration
public class RedisConfig {@Value("${app.redis.stream}")private String streamKey;@Value("${app.redis.group}")private String groupName;@Beanpublic void createConsumerGroup(StringRedisTemplate redisTemplate) {try {redisTemplate.opsForStream().createGroup(streamKey, groupName);} catch (Exception e) {System.out.println("消费者组已存在: " + groupName);}}
}

步骤4:配置消息监听容器

@Configuration
public class RedisConfig {// 配置消息监听线程池@Bean(name = "redisStreamTaskExecutor")public ThreadPoolTaskExecutor redisStreamTaskExecutor() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();executor.setCorePoolSize(4);executor.setThreadNamePrefix("redis-stream-");return executor;}// 创建消息监听容器@Bean(initMethod = "start", destroyMethod = "stop")public StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamContainer(RedisConnectionFactory factory,@Qualifier("redisStreamTaskExecutor") ThreadPoolTaskExecutor executor) {StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options =StreamMessageListenerContainerOptions.builder().pollTimeout(Duration.ofSeconds(1)).executor(executor).batchSize(10).build();return StreamMessageListenerContainer.create(factory, options);}
}

步骤5:注册消息监听器

@Component
public class StreamListenerRegistrar {@Value("${app.redis.stream}")private String streamKey;@Value("${app.redis.group}")private String groupName;@Value("${app.redis.consumer}")private String consumerName;@PostConstructpublic void registerListener(StreamMessageListenerContainer container, RedisMessageProcessor processor) {StreamReadRequest<String> readRequest = StreamReadRequest.builder(StreamOffset.create(streamKey, ReadOffset.lastConsumed())).consumer(Consumer.from(groupName, consumerName)).autoAcknowledge(false) // 手动ACK.build();container.register(readRequest, processor);}
}

步骤6:实现消息处理器

@Component
public class RedisMessageProcessor implements StreamListener<String, MapRecord<String, String, String>> {@Overridepublic void onMessage(MapRecord<String, String, String> record) {CompletableFuture.runAsync(() -> {try {// 业务处理逻辑processBusiness(record);// 处理成功发送ACKredisTemplate.opsForStream().acknowledge(streamKey, groupName, record.getId());} catch (Exception e) {// 失败消息进入Pending List}});}private void processBusiness(MapRecord<String, String, String> record) throws Exception {String eventType = record.getValue().get("eventType");String payload = record.getValue().get("payload");// 根据事件类型处理switch (eventType) {case "ORDER_CREATED": handleOrder(payload); break;case "PAYMENT_PROCESSED": handlePayment(payload); break;}}
}

步骤7:实现Pending消息处理器

@Component
@Slf4j
public class PendingMessageProcessor {@Value("${app.redis.stream}")private String streamKey;@Value("${app.redis.group}")private String groupName;@Value("${app.redis.consumer}")private String consumerName;// 每分钟处理一次Pending消息@Scheduled(fixedRate = 60000)public void processPendingMessages() {// 1. 查询Pending消息PendingMessages pending = redisTemplate.opsForStream().pending(streamKey, groupName, Range.unbounded(), 100);pending.forEach(this::handlePendingMessage);}private void handlePendingMessage(PendingMessage pending) {try {// 2. 重新认领消息List<MapRecord<String, String, String>> records = redisTemplate.opsForStream().claim(streamKey, Consumer.from(groupName, consumerName), Duration.ofSeconds(30), pending.getId());if (!records.isEmpty()) {MapRecord<String, String, String> record = records.get(0);// 3. 重试处理messageProcessor.processBusiness(record);// 4. 处理成功发送ACKredisTemplate.opsForStream().acknowledge(streamKey, groupName, record.getId());}} catch (Exception e) {// 5. 超过重试次数移入死信队列if (pending.getTotalDeliveryCount() > 3) {moveToDeadLetterQueue(pending);}}}private void moveToDeadLetterQueue(PendingMessage pending) {// 获取消息内容List<MapRecord<String, String, String>> records = redisTemplate.opsForStream().range(streamKey, Range.from(pending.getId()));if (!records.isEmpty()) {// 添加到死信队列redisTemplate.opsForStream().add("dead-letter:" + streamKey, records.get(0).getValue());// 确认原始消息redisTemplate.opsForStream().acknowledge(streamKey, groupName, pending.getId());}}
}

步骤8:实现消息生产者

@Service
public class RedisMessageProducer {@Value("${app.redis.stream}")private String streamKey;public String sendMessage(String eventType, String payload) {Map<String, String> message = Map.of("eventType", eventType,"payload", payload,"timestamp", String.valueOf(System.currentTimeMillis()));return redisTemplate.opsForStream().add(streamKey, message).getValue();}
}

步骤9:创建REST接口

@RestController
@RequestMapping("/messages")
public class MessageController {private final RedisMessageProducer producer;@PostMappingpublic String sendMessage(@RequestBody MessageRequest request) {return producer.sendMessage(request.getEventType(), request.getPayload());}@Datapublic static class MessageRequest {private String eventType;private String payload;}
}

三、消息生命周期流程图

1. 正常消息处理流程

2. Pending消息处理流程

 

3. ACK机制工作原理

四、生产环境建议

  1. 消费者命名策略

    @Value("${app.redis.consumer}")
    private String consumerName;// 在应用启动时设置
    @PostConstruct
    public void initConsumerName() {String hostName = InetAddress.getLocalHost().getHostName();String port = environment.getProperty("server.port");consumerName = "consumer-" + hostName + "-" + port;
    }
  2. 动态配置重试策略

    app:pending:max_retry: 5retry_interval: 30000 # 30秒
  3. 死信队列监控

    @Scheduled(fixedRate = 3600000) // 每小时检查一次
    public void checkDeadLetterQueue() {Long size = redisTemplate.opsForStream().size("dead-letter:" + streamKey);if (size > 0) {alertService.sendAlert("死信队列有 " + size + " 条未处理消息");}
    }
  4. 消息TTL设置

    // 发送消息时设置最大长度
    public String sendMessage(String eventType, String payload) {MapRecord<String, String, String> record = ...;return redisTemplate.opsForStream().add(Record.of(record).withMaxLen(10000).approximate(true));
    }

六、总结

本文详细介绍了Spring Boot整合Redis实现发布/订阅功能并添加ACK机制的完整方案:

  1. 事件驱动架构:使用Redis Stream监听器实现真正的发布/订阅模式

  2. 可靠ACK机制:通过手动ACK确认确保消息可靠处理

  3. 自动恢复系统:Pending消息处理器自动处理失败消息

  4. 死信队列:隔离无法处理的消息,防止系统阻塞

  5. 生产就绪:包含多实例部署、动态配置、监控告警等生产级特性

该方案适用于需要高可靠性消息传递的场景,如订单处理、支付系统、事件溯源等,在保证系统吞吐量的同时提供了消息可靠性保障。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/diannao/92555.shtml
繁体地址,请注明出处:http://hk.pswp.cn/diannao/92555.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Sklearn 机器学习 线性回归

💖亲爱的技术爱好者们,热烈欢迎来到 Kant2048 的博客!我是 Thomas Kant,很开心能在CSDN上与你们相遇~💖 本博客的精华专栏: 【自动化测试】 【测试经验】 【人工智能】 【Python】 Sklearn 机器学习线性回归实战详解 线性回归是机器学习中最基础也最经典的算法之一,…

AJAX案例合集

案例一&#xff1a;更换网站背景JS核心代码<script>document.querySelector(.bg-ipt).addEventListener(change, e > {//选择图片上传&#xff0c;设置body背景const fd new FormData()fd.append(img, e.target.files[0])axios({url: http://hmajax.itheima.net/api/…

vscode环境下c++的常用快捷键和插件

本文提供一些能够在vscode的环境下&#xff0c;提高c代码书写效率的快捷键&#xff0c;插件以及设置等等。 快捷键ctrlshiftx&#xff1a; 弹出插件菜单ctrlshiftp&#xff1a;弹出命令面板可以快捷执行一些常见命令插件安装这个后&#xff0c;可以按住ctrl跳转到方法的实现&am…

React + ts 中应用 Web Work 中集成 WebSocket

一、Web Work定义useEffect(() > {let webSocketIndex -1const websocketWorker new Worker(new URL(./websocketWorker.worker.ts?worker, import.meta.url),{type: module // 必须声明模块类型});//初始化WEBSOCKET&#xff08;多个服务器选择最快建立连接…

RabbitMQ面试精讲 Day 3:Exchange类型与路由策略详解

【RabbitMQ面试精讲 Day 3】Exchange类型与路由策略详解 文章标签 RabbitMQ,消息队列,Exchange,路由策略,AMQP,面试题,分布式系统 文章简述 本文是"RabbitMQ面试精讲"系列第3天内容&#xff0c;深入解析RabbitMQ的核心组件——Exchange及其路由策略。文章详细剖析…

深入解析Hadoop MapReduce Shuffle过程:从环形缓冲区溢写到Sort与Merge源码

MapReduce与Shuffle过程概述在大数据处理的经典范式MapReduce中&#xff0c;Shuffle过程如同人体血液循环系统般连接着计算框架的各个组件。作为Hadoop最核心的分布式计算模型&#xff0c;MapReduce通过"分而治之"的思想将海量数据处理分解为Map和Reduce两个阶段&…

Kafka MQ 消费者

Kafka MQ 消费者 1 创建消费者 在读取消息之前,需要先创建一个KafkaConsumer对象。创建KafkaConsumer对象与创建KafkaProducer对象非常相似—把想要传给消费者的属性放在Properties对象里。本章后续部分将深入介绍所有的配置属性。为简单起见,这里只提供3个必要的属性:boo…

人工智能——Opencv图像色彩空间转换、灰度实验、图像二值化处理、仿射变化

一、图像色彩空间转换&#xff08;一&#xff09;颜色加法1、直接相加1、直接相加2、调用cv.add()函数进行饱和操作 在OpenCV中进行颜色的加法&#xff0c;我们说图像即数组&#xff0c;所以从数据类型来说我们可以直接用numpy的知识来进行直接相加&#xff0c;但是存在…

【JToken】JToken == null 判断无效的问题

if (innerNode null) {continue; }Debug.Log($"toNode type: {node["toNode"]?.GetType()}");发现这个JToken 无法正确的判断 是否为 null&#xff0c;再排除逻辑问题后&#xff0c;我基本能确定的是 这个对象 不返回的不是真正的C# NULL 输出类型后是 N…

C++基于libmodbus库实现modbus TCP/RTU通信

今天看到了一个参考项目中用到了modbus库&#xff0c;看着使用很是方便&#xff0c;于是记录一下。后面有时间了或者用到了再详细整理。 参考&#xff1a;基于libmodbus库实现modbus TCP/RTU通信-CSDN博客 一、介绍 1.1库文件包含 1.2最简单的使用 本人在QT6.5下&#xff0…

【原创】微信小程序添加TDesign组件

前言 TDesign 是腾讯公司推出的一款UI界面库,至于腾讯的实力嘛,也不用多说了。 官网:https://tdesign.tencent.com/ 源码:https://github.com/Tencent/tdesign 目前处于活跃状态,发文前5日,该库仍在更新中… 遇到的问题 虽然腾讯为微信小程序开发提供了一个讨论的论坛,…

Vue的路由模式的区别和原理

路由模式 Vue 的路由模式指的是 Vue Router 提供的 URL 处理方式&#xff0c;主要有两种&#xff1a;Hash 模式和History 模式。 Hash模式 在 Vue Router 中&#xff0c;默认使用的是 hash 模式&#xff0c;即 mode: hash。如果想要使用 history 模式&#xff0c;可以设置 mode…

通过TPLink路由器进行用户行为审计实战

用户行为审计是指对用户在网络平台上的行为进行监控和记录&#xff0c;以便对其行为进行分析和评估的过程。随着互联网的普及和发展&#xff0c;用户行为审计在网络安全和数据隐私保护方面起到了重要的作用。 用户行为审计可以帮助发现和预防网络安全威助。通过对用户的行为进行…

MYSQL 第一次作业

新建产品库mysql> CREATE DATABASE mydb6_product;使用产品库mysql> USE mydb6_product;创建employess表mysql> CREATE TABLE employees (-> id INT PRIMARY KEY,-> name VARCHAR(50) NOT NULL,-> age INT,-> gender VARCHAR(10) NOT NULL DEFAULT unknow…

暑期前端训练day7——有关vue-diff算法的思考

前言 今天分享一下我对vue的diff的探究&#xff0c;跟我一起深入底层&#xff0c;看一看vue是怎么进行diff的&#xff0c;它的复杂度如何&#xff0c;为什么性能这么高&#xff0c;diff的目标是尽可能的复用原来的真实dom&#xff0c;减少删除真实dom和创建真实的dom的开销&…

【Docker】Docker的初步认识以及Ubuntu下的Docker环境安装、配置

前言 在当今快速迭代的软件开发与部署领域&#xff0c;容器化技术已成为不可或缺的核心力量&#xff0c;而 Docker 作为容器化技术的杰出代表&#xff0c;正以其轻量、高效、可移植的特性深刻改变着开发与运维的模式。它有效解决了 “在我机器上能运行&#xff0c;在你那里却不…

【密码学】2. 古典密码

目录2. 古典密码2.1 经典加密技术基础2.2 代换技术2.2.1 算术密码2.2.2 代换密码&#xff08;Substitution Cipher&#xff09;2.3 置换技术2.4 乘积密码2.5 历史上的教训2. 古典密码 2.1 经典加密技术基础 分类 代换&#xff08;Substitution&#xff09;&#xff1a;明文内…

CSS3文本阴影特效全攻略

CSS3文本阴影效果实现 下面我将创建一个展示各种CSS3文本阴影效果的页面&#xff0c;包含多种样式示例和代码实现。 设计思路 创建具有视觉吸引力的标题区域提供多种文本阴影效果实例显示对应的CSS代码以供参考添加交互元素让用户自定义效果 实现代码 <!DOCTYPE html&g…

JavaScript 03 严格检查模式Strict字符串类型详解

2.4 严格检查模式Strict在 JavaScript 里&#xff0c;也是 有 “作用域” 这个说法的。 所以说&#xff0c;变量 也分 全局变量 和 局部变量。 当我们 直接 把 代码 写在 script 双标签里面的时候&#xff0c;我们 JS 会认为 这只是 一个 没有名字的 函数&#xff01;&#xff…

车载诊断ECU架构

我是穿拖鞋的汉子,魔都中坚持长期主义的汽车电子工程师。 老规矩,分享一段喜欢的文字,避免自己成为高知识低文化的工程师: 做到欲望极简,了解自己的真实欲望,不受外在潮流的影响,不盲从,不跟风。把自己的精力全部用在自己。一是去掉多余,凡事找规律,基础是诚信;二是…