Redisson - 实现延迟队列

Redisson 延迟队列

Redisson 是基于 Redis 的一款功能强大的 Java 客户端。它提供了诸如分布式锁、限流器、阻塞队列、延迟队列等高可用、高并发组件。

其中,RDelayedQueue 是对 Redis 数据结构的高阶封装,能让你将消息延迟一定时间后再进入消费队列。

延迟队列的组成

Redisson 延迟队列由两个 Redis 队列组成:

RDelayedQueue:延迟元素容器,暂存在 Redis 的 zset 中(按时间排序)
RBlockingQueue(目标队列):当延迟时间到达后,Redisson 会自动将元素移动到这个队列,供消费者消费。

Redisson 内部使用 定时轮询线程 来扫描延迟数据并迁移至目标队列。

原理

Redisson 的 RDelayedQueue 设计巧妙,它并非一个单一的 Redis 数据结构,而是结合了 Redis 的 ZSET (有序集合) 和 List (列表,具体实现为 RBlockingQueue) 来实现的

> 生产者  -- (元素, 延迟时间) -->  RDelayedQueue (API)
>                                      |
>                                      | (Redisson 内部)
>                                      V
>             +--------------------------------------------------+
>             |   ZSET (有序集合)                                |
>             |   Key: delayed_queue_name_zset                   |
>             |   Member: task_id_or_payload                     |
>             |   Score: execution_timestamp                     |
>             +--------------------------------------------------+
>                        ^
>                        |  (Eviction Scheduler 定期扫描)
>                        |  (到期任务)
>                        V
>             +--------------------------------------------------+
>             |   RBlockingQueue (目标队列,基于 Redis List)      |
>             |   Key: destination_queue_name                    |
>             |   Value: task_payload                            |
>             +--------------------------------------------------+
>                                      ^
>                                      |
>                                      | (消费者 take()/poll())
>                                      V 消费者  <---------------------------

Redisson 延迟队列的优势

分布式特性:基于 Redis,天然支持分布式环境,多个生产者和消费者实例可以共享同一个延迟队列系统。
高性能与持久化:依赖 Redis 的高性能特性。如果 Redis 配置了持久化 (AOF/RDB),延迟任务的元数据也能得到持久化保障。
易用性:Redisson 提供了简洁易懂的 API,屏蔽了底层 ZSET 和 List 的复杂操作。
精确度较高:任务的到期判断依赖 Redis 服务器的时间,通常比较准确。
支持任务取消:可以通过 RDelayedQueue.remove(object) 方法从延迟队列中移除尚未到期的任务(前提是任务对象能被正确 equals 比较)。

Redisson队列实践

添加依赖

<dependency><groupId>org.redisson</groupId><artifactId>redisson-spring-boot-starter</artifactId><version>3.24.3</version>
</dependency>

Redisson 配置 (application.yml):

spring:

  application:name: redisson-delayed-order-app# Redisson 配置 (如果使用 redisson-spring-boot-starter,它会尝试自动配置)# 你也可以提供一个 redisson.yaml 文件或在 Java Config 中配置 RedissonClient Bean# 例如,指定单个 Redis 服务器:redis: # Spring Boot 2.x 使用 spring.redis, Spring Boot 3.x 使用 spring.data.redis# Redisson starter 会尝试使用这些配置,但更推荐使用 Redisson 自身的配置方式# address: redis://127.0.0.1:6379 # Redisson 推荐的格式host: 127.0.0.1port: 6379# database: 0# password:# Redisson 自己的配置方式 (更灵活,可以放在 redisson.yaml 中)
# redisson:
#   singleServerConfig:
#     address: "redis://127.0.0.1:6379"
#     database: 0
#   # codec: org.redisson.codec.JsonJacksonCodec # 推荐使用 Jackson 序列化

定义延迟任务处理器(消费者)

@Component
public class OrderCloseConsumer implements InitializingBean {private static final String QUEUE_NAME = "order-close-queue";@Autowiredprivate RedissonClient redissonClient;@Autowiredprivate OrderService orderService;@Overridepublic void afterPropertiesSet() {RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue(QUEUE_NAME);RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(blockingQueue);// 启动消费线程Executors.newSingleThreadExecutor().submit(() -> {while (true) {try {String orderSn = blockingQueue.take(); // 阻塞等待orderService.closeOrder(orderSn); // 业务处理log.info("订单超时关闭成功:{}", orderSn);} catch (Exception e) {log.error("延迟关单处理异常", e);}}});}
}

在下单时设置延迟任务

public void createOrder(String orderSn) {// 1. 业务入库逻辑...// 2. 加入延迟队列RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue("order-close-queue");RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(blockingQueue);delayedQueue.offer(orderSn, 30, TimeUnit.MINUTES); // 30分钟后执行log.info("订单加入延迟关闭队列:{}", orderSn);
}

关键注意事项

1、序列化 (Codec):
Redisson 默认使用 MarshallingCodec,它要求任务对象实现 java.io.Serializable。
推荐配置 RedissonClient 使用 org.redisson.codec.JsonJacksonCodec,这样任务对象无需实现 Serializable,更灵活且跨语言友好。在 RedissonClient Bean 配置中设置:config.setCodec(new JsonJacksonCodec());

2、幂等性: 消费者的处理逻辑(如 processOrderTimeout)必须是幂等的。即使同一个任务被重复消费(如消费者处理中断后任务重投,或 remove 失败后任务仍到期),最终结果也应一致。通过检查订单当前状态是保证幂等性的关键。

3、任务取消的 equals() 和 hashCode(): RDelayedQueue.remove(object) 依赖于任务对象的 equals() 和 hashCode() 方法来查找并删除。确保任务载体类 (如 OrderTimeoutTask) 正确实现了这两个方法,通常基于任务的唯一标识(如订单号)。

4、消费者线程管理: 使用 InitializingBean 和 DisposableBean 来管理消费者线程的生命周期。消费者逻辑应在单独的线程中执行,避免阻塞主线程。可以使用 Executors 创建线程池来并发处理任务。

5、错误处理与重试: 消费者循环中应有完善的 try-catch 块,处理单个任务失败的情况,避免整个消费者线程挂掉。可以考虑引入失败任务记录、告警或简单的延时重试(但要注意避免无限重试)。

6、Redis 持久化: 为保证系统重启后延迟任务不丢失(ZSET 中的元数据),Redis 服务器应配置 RDB 或 AOF 持久化。

7、消费者并发与伸缩:
可以启动多个消费者实例(不同JVM进程),它们会从同一个目标 RBlockingQueue 中竞争获取任务,实现负载均衡。
在单个消费者实例内部,也可以使用线程池来并发处理从队列中获取的任务。

8、监控: 关注 Redis 中 ZSET 和 List 的长度、Redisson 调度线程的健康状况、任务处理的成功率和耗时等指标,以便及时发现和处理问题。

9、Redisson 版本: 确保使用较新的稳定版 Redisson,因为早期版本可能在延迟队列的某些边缘场景下存在问题。

总结

Redisson 的 RDelayedQueue 通过巧妙地结合 Redis ZSET 和 List,提供了一个强大、易用且高效的分布式延迟队列解决方案。它非常适合如订单超时关闭、延时通知、定时任务等场景。在实践中,务必注意序列化、幂等性、任务取消的正确实现以及消费者端的健壮性设计,才能充分发挥其优势,构建稳定可靠的分布式应用。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.pswp.cn/bicheng/83646.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

上门服务小程序订单系统框架设计

一、逻辑分析 上门服务小程序订单系统主要涉及服务展示、用户下单、订单处理、服务人员接单与服务完成反馈等核心流程。 服务展示&#xff1a;不同类型的上门服务&#xff08;如家政、维修等&#xff09;需要在小程序中展示详细信息&#xff0c;包括服务名称、价格、服务内容介…

Android apk装机编译类型: verify、speed-profile, speed与启动耗时

Android apk装机编译类型: verify、speed-profile, speed与启动耗时 Dex2oat (dalvik excutable file to optimized art file) &#xff0c;对 dex 文件进行编译优化&#xff0c;Android 虚拟机可识别的是dex文件&#xff0c;应用运行过程如果每次都将dex文件加载内存&#xff…

winrm登录失败,指定的凭据被服务器拒绝

winrm登录失败&#xff0c;指定的凭据被服务器拒绝。 异常提示&#xff1a;the specified credentials were rejected by the server 在windows power shell执行 set-executionpolicy remotesigned winrm quickconfig winrm set winrm/config/service/auth {Basic"true…

Unity3D ET框架游戏脚本系统解析

前言 ET框架在Unity3D中实现的GamePlay脚本系统是一种革命性的、基于ECS&#xff08;实体-组件-系统&#xff09;架构的设计&#xff0c;它彻底改变了传统的基于MonoBehaviour的游戏逻辑编写方式。其核心思想是追求高性能、高解耦、易热更新&#xff0c;特别适合大型复杂的网络…

android与Qt类比

一、概念对应关系 Android RecyclerView 组件类比描述Qt 模型 - 视图组件Qt 类比描述RecyclerView画板&#xff08;容器&#xff09;QAbstractItemView视图&#xff08;展示数据的容器&#xff0c;如列表、表格&#xff09;RecyclerView.Adapter画布&#xff08;数据桥梁&…

Jenkins 2.479.1安装和邮箱配置教程

1.安装 在JDK安装并设置环境变量完成后&#xff0c;下载官网对应的war版本&#xff0c;在对应目录下打开命令行窗口并输入 java -jar jenkins.war其余参数感兴趣可以自行查阅&#xff0c;这里启动的 jenkins 服务默认占用8080端口&#xff0c;在浏览器输入 localhost:8080进入…

多分辨率 LCD 的 GUI 架构设计与实现

1.1多分辨率显示系统的挑战与解决方案 1.1.1 分辨率适配的核心问题 在嵌入式系统中,同时支持不同分辨率的 LCD(如 240160、320480 等)面临以下挑战: 布局适配:同一界面元素在不同分辨率下需要调整大小和位置 字体显示:小分辨率屏幕需要更小的字体,而大分辨率需要更清…

11. MySQL事务管理(上)

1. CURD不加控制&#xff0c;会有什么问题&#xff1f; 火车票售票系统tickets表 id name nums 10 西安<->兰州 1 客户端A 客户端B if (nums > 0) { if (nums > 0) { 卖票 卖票 // update numsnums - 1 update numsnums - 1 } } 当客户端A检查还有一张票时&#xf…

Beta分布Dirichlet分布

目录 Beta分布Dirichlet分布Beta分布&Dirichlet分布从Dirichlet分布生成Beta样本Beta分布&Dirichlet分布应用 Beta分布 Beta分布是定义在区间 [ 0 , 1 ] [0, 1] [0,1]上的连续概率分布&#xff0c;通常用于模拟概率或比例的随机变量。Beta分布的概率密度函数&#xff…

嵌入式系统中常用的开源协议

目录 1、GNU通用公共许可证&#xff08;GPL&#xff09; 2、GNU宽松通用公共许可证&#xff08;LGPL&#xff09; 3、MIT许可证 4、Apache许可证2.0 5、BSD许可证 6、如何选择合适的协议 在嵌入式系统开发中&#xff0c;开源软件的使用已成为主流趋势。从物联网设备到汽车…

告别延迟,拥抱速度:存储加速仿真应用的解决方案【1】

需求分析 现代仿真&#xff08;如CFD流体动力学、FEA结构分析、电磁仿真、气候模拟、自动驾驶场景仿真、芯片设计等&#xff09;会产生PB级甚至EB级的数据。海量数据的生成、处理和存储&#xff0c;主要体现在以下几个关键方面&#xff1a; 数据量爆炸式增长&#xff1a;高分…

vue封装gsap自定义动画指令

1、指令文件封装 import { gsap } from gsap;// 动画类型配置 const ANIMATION_TYPES {// 缩放scale: {from: { scale: 0.5, opacity: 0 },to: { scale: 1, opacity: 1 },hide: { scale: 0.5, opacity: 0 },},// 透明度fade: {from: { opacity: 0 },to: { opacity: 1, ease: …

HTTP 如何升级成 HTTPS

有一个自己的项目需要上线&#xff0c;域名解析完成后&#xff0c;发现只能使用 http 协议&#xff0c;这在浏览器上会限制&#xff0c;提示用户不安全&#xff0c;所以需要把 HTTP 升级成 HTTPS 协议&#xff0c;但又不想花钱。 前提条件&#xff1a; 已经配置好 Nginx 服务器…

测试面试题总结一

目录 列表、元组、字典的区别 nvicat连接出现问题如何排查 mysql性能调优 python连接mysql数据库方法 参数化 pytest.mark.parametrize 装饰器 list1 [1,7,4,5,5,6] for i in range(len(list1): assert list1[i] < list1[i1] 这段程序有问题嘛&#xff1f; pytest.i…

[蓝桥杯]密文搜索

密文搜索 题目描述 福尔摩斯从 X 星收到一份资料&#xff0c;全部是小写字母组成。 他的助手提供了另一份资料&#xff1a;许多长度为 8 的密码列表。 福尔摩斯发现&#xff0c;这些密码是被打乱后隐藏在先前那份资料中的。 请你编写一个程序&#xff0c;从第一份资料中搜…

打卡第36天:模型可视化以及推理

知识点回顾&#xff1a; 1.三种不同的模型可视化方法&#xff1a;推荐torchinfo打印summary权重分布可视化 2.进度条功能&#xff1a;手动和自动写法&#xff0c;让打印结果更加美观 3.推理的写法&#xff1a;评估模式 作业&#xff1a;调整模型定义时的超参数&#xff0c;对…

8天Python从入门到精通【itheima】-68(元组)

目录 65节——元组的定义和操作 1.学习目标 2.为什么要学习元组 3.元组的定义 4.定义元组的注意事项 5.元组的嵌套 6.元组的相关操作 【1】index方法 【2】count方法 【3】len方法 7.元组的遍历 【1】while循环进行元组的遍历 【2】for循环进行元组的变量 Python …

链表题解——环形链表【LeetCode】

141. 环形链表 方法一 核心思想&#xff1a; 使用一个集合 seen 来记录已经访问过的节点。遍历链表&#xff0c;如果当前节点已经存在于集合中&#xff0c;说明链表存在环&#xff1b;否则&#xff0c;将当前节点添加到集合中&#xff0c;继续遍历。如果遍历结束&#xff08;h…

【免费数据】1980-2022年中国2384个站点的水质数据

水&#xff0c;是生命之源&#xff0c;关乎着地球上每一个生物的生存与发展。健康的水生生态系统维持着整个水生态的平衡与活力&#xff1b;更是确保人类能持续获得清洁水源的重要保障。水质数据在水质研究、海洋生物量测算以及生物多样性评估等诸多关键领域都扮演着举足轻重的…

分享推荐高精度磁阻式磁编码器芯片

磁编码器其通过感应旋转磁场来实现角度、转速的测量&#xff0c;因此&#xff0c;相较于传统的光编码器&#xff0c;磁编码器对粉尘、污垢和油脂等污染物有很强的耐受性&#xff0c;即使在较为恶劣的环境中仍能够保持高分辨率与检测精度&#xff0c;安装和维护简捷方便&#xf…