RabbitMQ 高级特性之延迟队列

1. 简介

在某些场景下,当生产者发送消息后,可能不需要让消费者立即接收到,而是让消息延迟一段时间后再发送给消费者。

2. 实现方式

2.1 TTL + 死信队列

给消息设置过期时间后,若消息在这段时间内没有被消费,就会将消息发送到死信队列中,我们可以利用这一特性,将需要延迟发送的消息设置过期时间,然后再让消费者从死信队列中获取消息,这样就实现了消息的延迟发送。

队列与交换机配置如下:

@Configuration
public class DLConfig {/*** 正常队列、交换机* @return*/@Bean("norQueue")public Queue norQueue() {return QueueBuilder.durable(Constants.NOR_QUEUE).deadLetterExchange(Constants.DL_EXCHANGE) //绑定死信交换机.deadLetterRoutingKey(Constants.DL_ROUTINGKEY).build();}@Bean("norExchange")public DirectExchange norExchange() {return ExchangeBuilder.directExchange(Constants.NOR_EXCHANGE).build();}@Bean("norBind")public Binding norBind(@Qualifier("norExchange") DirectExchange directExchange,@Qualifier("norQueue") Queue queue) {return BindingBuilder.bind(queue).to(directExchange).with(Constants.NOR_ROUTINGKEY);}/*** 死信队列、交换机*/@Bean("dlQueue")public Queue dlQueue() {return QueueBuilder.durable(Constants.DL_QUEUE).build();}@Bean("dlExchange")public DirectExchange dlExchange() {return ExchangeBuilder.directExchange(Constants.DL_EXCHANGE).build();}@Bean("dlBind")public Binding dlBind(@Qualifier("dlExchange") DirectExchange directExchange,@Qualifier("dlQueue") Queue queue) {return BindingBuilder.bind(queue).to(directExchange).with(Constants.DL_ROUTINGKEY);}
}

生产者代码如下:

    @RequestMapping("/dl1")public String dl1() {String messageInfo = "dl... " + new Date();MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setExpiration("10000"); //10s 后过期return message;}};rabbitTemplate.convertAndSend(Constants.NOR_EXCHANGE, Constants.NOR_ROUTINGKEY, messageInfo, messagePostProcessor);return "消息发送成功";}

消费者代码如下:

@Component
@Slf4j
public class DLListener {/*** ttl + 死信队列 -> 延时队列* @param message*/@RabbitListener(queues = Constants.DL_QUEUE)public void listener(Message message) {String messageInfo = new String(message.getBody());log.info("接收到消息: {}, time: {}", messageInfo, new Date());}
}

由于消息发送到了死信队列,于是我们只需要从死信队列中获取消息即可。

代码运行结果如下:

从运行结果中可以看出,消息延迟了 10s 才被消费。

这种实现方式的问题:

但是,当我们连续发送两条消息,第一条消息的过期时间为 15s,第二条消息的过期时间为 10s,代码运行结果如下:

这里我们看到,虽然第二条消息先过期,但却和第一条消息一起被消费,按照正常情况下第二条消息应该率先被消费,于是这种实现方式存在一定的问题。

2.2 使用插件 

2.2.1 安装插件

虽然 RabbitMQ 没有提供延迟队列的使用方式,但是提供了延迟队列的插件,我们可以安装插件并使用。

插件安装地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

需要下载 .ez 的插件。

需要根据本机的 RabbitMQ 版本选择匹配的插件版本,不然无法使用。

插件下载完成后,需要将插件放到 /usr/lib/rabbitmq/plugins 目录下,若没有需要进行创建。

安装完成后,使用下面这行命令启动插件:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

启动完成后,需要重启 RabbitMQ 服务,这样插件就能正常运行。

2.2.2 使用插件

插件安装完成后,交换机的类型就会多出下面一种:

即延迟队列,于是我们在声明交换机是,就能够声明这个类型的交换机。

队列与交换机配置如下:

@Configuration
public class DelayConfig {@Bean("delayQueue")public Queue delayQueue() {return QueueBuilder.durable(Constants.DELAY_QUEUE).build();}/*** 延迟交换机* @return*/@Bean("delayExchange")public DirectExchange delayExchange() {return ExchangeBuilder.directExchange(Constants.DELAY_EXCHANGE).delayed().build();}@Bean("delayBind")public Binding delayBind(@Qualifier("delayExchange") DirectExchange directExchange,@Qualifier("delayQueue") Queue queue) {return BindingBuilder.bind(queue).to(directExchange).with(Constants.DELAY_ROUTINGKEY);}
}

 在声明交换机时,使用了 delayed 来声明该队列是延迟队列。

生产者代码如下:

    @RequestMapping("/delay")public String delay() {String messageInfo = "delay ...";rabbitTemplate.convertAndSend(Constants.DELAY_EXCHANGE, Constants.DELAY_ROUTINGKEY, messageInfo + "25000ms", message -> {message.getMessageProperties().setDelayLong(20000L); //过期时间,单位为 msreturn message;});rabbitTemplate.convertAndSend(Constants.DELAY_EXCHANGE, Constants.DELAY_ROUTINGKEY, messageInfo + "10000ms", message -> {message.getMessageProperties().setDelayLong(10000L); //过期时间,单位为 msreturn message;});return "消息发送成功";}

消费者代码如下:

@Component
@Slf4j
public class DelayListener {@RabbitListener(queues = Constants.DELAY_QUEUE)public void listener(Message message) {log.info("接收到消息: {}, time: {}", new String(message.getBody()), new Date());}
}

运行结果如下:

从结果中可以看出,虽然第二条消息的过期时间是后入队列的,但是却会先被消费,这就解决了 TTL + 死信队列实现方式的不足。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/web/88796.shtml
繁体地址,请注明出处:http://hk.pswp.cn/web/88796.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

uniapp app安卓下载文件 图片 doc xls 数据流文件 app安卓本地路径下载保存

//下载图片 downloadToLocal() {plus.android.requestPermissions([android.permission.WRITE_EXTERNAL_STORAGE],(success) > {uni.saveImageToPhotosAlbum({filePath: /static/x.png,//本地地址success: () > {this.$refs.uToast.show({message: "模版下载成功&am…

Context Engineering:从Prompt Engineering到上下文工程的演进

最近在做Deepresearch以及刷到一个不错的文章:context-engineering-guide ,这篇文章揭示了提示工程以及上下文过程在智能体应用开源流程中,包括Deepresearch,MCP在内的一些概念,起到了非常重要的作用! Cont…

jenkins部署vue前端项目

文章目录前言一、安装nginx二、jenkins构建项目总结前言 前面已经使用jenkins部署了后端springboot项目,现在开始学习jenkins部署前端Vue项目。 一、安装nginx 访问nginx官网,https://nginx.org/en/download.html下载tar包 上传到服务器目录中 然后到…

设计总监年中复盘:用Adobe XD内容识别布局,告别“手动调距”

时至年中,这不仅是检视上半年项目成果的节点,更是优化团队工作流、为下半年挑战储备动能的关键时期。在海外设计界工作的十余年间,我发现,一个高效的设计团队与一个疲于奔命的团队之间,最大的差别往往就在于是否建立了…

Unity 在Rider中通过Lingma插件使用MCP

环境: Unity 2022.3.12f1 JetBrains Rider 2025.1.4 Lingma 2.5.14 Python 3.13.4 下载包 首先在unity package manager 加入unity-mcp包 https://github.com/justinpbarnett/unity-mcp.git 然后下载uv包(要先先下载python),网上很多…

pycharm+SSH 深度学习项目 远程后台运行命令

pycharmSSH 深度学习项目 远程后台运行命令碎碎念,都是实验室里那说关机就关机,说重启就重启的台式机逼得。。学吧记录 运行:nohup /root/miniconda3/bin/python -u "run.py" > /root/log/nohup.log 2>&1 &实时查看日…

【Linux | 网络】应用层(HTTP)

目录一、认识URL二、urlencode和urldecode三、HTTP协议格式(使用Fiddler抓包)3.1 安装并使用Fiddler抓包3.2 HTTP协议格式3.2.1 HTTP请求3.2.1.1 资源URL路径3.2.1.2 请求方法(Method)3.2.1.3 Location头字段(重定向相…

编程实践:单例模式(懒汉模式+饿汉模式)

说明:本专栏文章有两种解锁方案 1:付费订阅,畅享所有文章 2:免费获取,点击下方链接,关注,自动获取免费链接 https://free-img.400040.xyz/4/2025/04/29/6810a50b7ac8b.jpg 主题:C++ 单例模式 什么是单例模式

破局电机制造四大痛点:MES与AI视觉的协同智造实践

万界星空科技电机行业MES系统解决方案是针对电机制造过程中多工序协同难、质量追溯复杂、设备管理要求高等痛点设计的数字化管理系统。一、电机行业的核心痛点1. 多工序协同困难 电机制造涉及绕线、装配、测试等多道工序,工艺衔接复杂,传统人工调度效率…

HTML 初体验

HTML(超文本标记语言)全称:HyperText Markup Language。超文本是什么?答:超文本就是网页中的链接。标记是什么?答:标记也叫标签,是带尖括号的文本。需求1:将“我爱中国”…

网络层TCP机制

1.确认应答机制由于发送信息的距离可能较远,可能出现后发的信息先到的情况,怎么办?TCP将每个字节的数据都进行了编号,即为序列号如何分辨一个数据包是普通数据还是应答数据呢2.超时重传由于丢包是一个随机的事件,因此在上述tcp传输的过程中,丢包就存在两种情况但是在发送方的角…

【一起来学AI大模型】微调技术:LoRA(Low-Rank Adaptation) 的实战应用

LoRA(Low-Rank Adaptation) 的实战应用,使用 Hugging Face 的 peft (Parameter-Efficient Fine-Tuning) 库对大型语言模型进行高效微调。LoRA 因其显著降低资源消耗(显存和计算)同时保持接近全量微调性能的特点&#x…

RedisJSON 内存占用剖析与调优

一、基础内存模型指针包装 所有 JSON 值(标量、对象、数组、字符串等)至少占用 8 字节,用于存储一个带类型标记的指针。标量与空容器 null、true、false、小整数(静态缓存)、空字符串、空数组、空对象 均不分配额外内存…

【LeetCode 热题 100】23. 合并 K 个升序链表——(解法一)逐一合并

Problem: 23. 合并 K 个升序链表 题目:给你一个链表数组,每个链表都已经按升序排列。 请你将所有链表合并到一个升序链表中,返回合并后的链表。 文章目录整体思路完整代码时空复杂度时间复杂度:O(K * N)空间复杂度:O(1…

垃圾收集器-Serial Old

第一章 引言1.1 JVM 中垃圾收集的简要概述JVM(Java Virtual Machine)作为 Java 程序的运行时环境,负责将字节码加载至内存并执行,同时也承担着内存管理的重任。垃圾收集(Garbage Collection,简称 GC&#x…

Docker(02) Docker-Compose、Dockerfile镜像构建、Portainer

Docker-Compose 1、Docker Desktop 在Windows上安装Docker服务,可以使用Docker Desktop这个应用程序。 下载并安装这样的一个安装包 安装好后:执行命令 docker --version 从Docker Hub提取hello-world映像并运行一个容器: docker run h…

大数据时代UI前端的用户体验设计新思维:以数据为驱动的情感化设计

hello宝子们...我们是艾斯视觉擅长ui设计和前端数字孪生、大数据、三维建模、三维动画10年经验!希望我的分享能帮助到您!如需帮助可以评论关注私信我们一起探讨!致敬感谢感恩!一、引言:从 “经验设计” 到 “数据共情” 的体验革命传统 UI 设计常陷入 “设计师主观经…

TypeScript 学习手册

1.TypeScript 概念 TypeScript(简称 TS,静态类型)是微软公司开发的一种基于 JavaScript (简称 JS,动态类型)语言的编程语言。TypeScript 可以看成是 JavaScript 的超集(superset)&a…

掌握现代CSS:变量、变形函数与动态计算

CSS近年来发展迅速,引入了许多强大的功能,如变量、高级变形函数和动态计算能力。本文将深入探讨如何在CSS中设置并使用变量,以及如何有效利用translate3d、translateY和translateX等变形方法。我们还将解析var()和calc()函数的关键作用。一、…

贝尔量子实验设想漏洞

1 0 1 0 1 1 0 1 0 1 1 1 0 0 1 0 带墨镜如果先上下交换再左右交换,很可能不一样的概率是2%,但是因为交换诞生了一个与之前序列相同的所以不一样概率变成1%,我们在测的时候不能这么测啊,你得看序列完…