延迟 队列

概念

延迟队列顾名思义就是消息不立即发送给消费者消费,而是延迟一段时间再交给消费者。

RabbitMQ本身没有直接支持延迟队列的的功能,但是可以通过前面所介绍的TTL+死信队列的方式组合
模拟出延迟队列的功能.

RabbitMQ 有些版本还支持延迟队列的插件安装,我们也可以通过安装这个插件实现延迟队列的功能。

TTL + 死信队列

实现思路:

假设一个应用中需要将每条消息都设置为10秒的延迟,生产者通过normal_exchange这个交换器将发送的消息存储在normal_queue这个队列中.消费者订阅的并非是normal_queue这个队列,而是dlx_queue这个队列.当消息从normal_queue这个队列中过期之后被存入dlx_queue这个队列中,消费者就恰巧消费到了延迟10秒的这条消息。

在这里插入图片描述

代码演示:

常量设置:

    //死信队列public static final String DL_QUEUE = "DL_QUEUE";public static final String DL_EXCHANGE = "DL_EXCHANGE";public static final String DL_KEY = "DL_KEY";//普通队列public static final String NORMAL_QUEUE = "NORMAL_QUEUE";public static final String NORMAL_EXCHANGE = "NORMAL_EXCHANGE";public static final String NORMAL_KEY = "NORMAL_KEY";

声明队列、交换机、绑定关系:

    //普通队列@Bean("normalQueue")public Queue normalQueue() {return QueueBuilder.durable(MQConstants.NORMAL_QUEUE).deadLetterExchange(MQConstants.DL_EXCHANGE).deadLetterRoutingKey(MQConstants.DL_KEY).build();}@Bean("normalExchange")public Exchange normalExchange() {return ExchangeBuilder.directExchange(MQConstants.NORMAL_EXCHANGE).durable(true).build();}@Bean("normalBinding")public Binding normalBinding(@Qualifier("normalExchange") Exchange exchange, @Qualifier("normalQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with(MQConstants.NORMAL_KEY).noargs();}//死信队列@Bean("dlQueue")public Queue dlQueue() {return QueueBuilder.durable(MQConstants.DL_QUEUE).build();}@Bean("dlExchange")public Exchange dlExchange() {return ExchangeBuilder.directExchange(MQConstants.DL_EXCHANGE).durable(true).build();}@Bean("dlBinding")public Binding dlBinding(@Qualifier("dlExchange") Exchange exchange, @Qualifier("dlQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with(MQConstants.DL_KEY).noargs();}

生产者:将消息过期时间设置为 10 s

    @RequestMapping("/dl")public String dl() {for (int i = 0; i < 20; i++) {rabbitTemplate.convertAndSend(MQConstants.NORMAL_EXCHANGE, MQConstants.NORMAL_KEY, "dl" + i, message -> {message.getMessageProperties().setExpiration("10000");return message;});}return "消息发送成功";}

消费者需要消费的队列是死信队列:

@Component
@RabbitListener(queues = MQConstants.DL_QUEUE)
public class DLListener {@RabbitHandlerpublic void handle(String messageContent, Channel channel, Message message) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {channel.basicAck(deliveryTag, false);System.out.println("消息成功消费:" + messageContent);} catch (Exception e) {channel.basicNack(deliveryTag, false, false);}}
}

存在的问题

当我们先发送一条延迟时间长的消息,然后再发送一条延迟时间短的消息,我们会发现,短的消息并没有被即使消费,而是等到长的消息时间一到,才被消费了

    @RequestMapping("/dl")public String dl() {rabbitTemplate.convertAndSend(MQConstants.NORMAL_EXCHANGE, MQConstants.NORMAL_KEY, "30s ",message -> {message.getMessageProperties().setExpiration("30000");return message;});rabbitTemplate.convertAndSend(MQConstants.NORMAL_EXCHANGE, MQConstants.NORMAL_KEY, "10s ",message -> {message.getMessageProperties().setExpiration("10000");return message;});return "消息发送成功";}

在这里插入图片描述


原因如下:
消息过期之后,不一定会被马上丢弃因为RabbitMQ只会检查队首消息是否过期,如果过期则丢到死信队列,此时就会造成一个问题,如果第一个消息的延时时间很长,第二个消息的延时时间很短,那第二个
消息并不会优先得到执行。

所以在考虑使用TTL+死信队列实现延迟任务队列的时候,需要确认业务上每个任务的延迟时间是一致的,如果遇到不同的任务类型需要不同的延迟的话,需要为每一种不同延迟时间的消息建立单独的消息队列。

延迟队列的插件

安装

官方文档:Scheduling Messages with RabbitMQ

下载链接:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

下载的插件需要存放到哪个目录:https://www.rabbitmq.com/docs/installing-plugins

根据你不同的环境去选择不同的目录:
在这里插入图片描述

Linux命令:

#查看插件列表
rabbitmq-plugins list#启动插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange#重启服务
service rabbitmq-server restart

我们去到 rabbitmq 管理界面查看 exchange 有没有延迟类型 “x-delayed-messge” ,如果存在这一类型说明我们的插件安装成功了

在这里插入图片描述

代码演示

常量类:

    //延迟队列public static final String DELAY_QUEUE = "DELAY_QUEUE";public static final String DELAY_EXCHANGE = "DELAY_EXCHANGE";public static final String DELAY_KEY = "DELAY_KEY";

声明:

    //延迟队列@Bean("delayQueue")public Queue delayQueue() {return QueueBuilder.durable(MQConstants.DELAY_QUEUE).build();}@Bean("delayExchange")public Exchange delayExchange() {return ExchangeBuilder.directExchange(MQConstants.DL_EXCHANGE).durable(true).delayed().build();}@Bean("delayBinding")public Binding delayBinding(@Qualifier("delayExchange") Exchange exchange, @Qualifier("delayQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with(MQConstants.DELAY_KEY).noargs();}

生产者:这里我们发送三条不同过期时间的消息来进行演示:
通过setDelayLong() 方法设置延迟时间

    @RequestMapping("/delay")public String delay() {rabbitTemplate.convertAndSend(MQConstants.DELAY_EXCHANGE, MQConstants.DELAY_KEY, "30s ",message -> {message.getMessageProperties().setDelayLong(30000L);return message;});rabbitTemplate.convertAndSend(MQConstants.DELAY_EXCHANGE, MQConstants.DELAY_KEY, "10s ",message -> {message.getMessageProperties().setDelayLong(10000L);return message;});rabbitTemplate.convertAndSend(MQConstants.DELAY_EXCHANGE, MQConstants.DELAY_KEY, "40s ", message -> {message.getMessageProperties().setDelayLong(40000L);return message;});return "消息发送成功";}

这里我们将确认模式设置为自动模式,不进行手动确认,便于我们书写代码:

@Component
@RabbitListener(queues = MQConstants.DELAY_QUEUE)
public class DelayListener {@RabbitHandlerpublic void handle(String message) {System.out.printf("%tc 接收到的消息为:%s\n", new Date(), message);}
}

最终效果:
在这里插入图片描述

总结

1.基于死信实现的延迟队列
a优点:1)灵活不需要额外的插件支持
b.缺点: 1) 存在消息顺序问题 2)需要额外的逻辑来处理死信队列的消息,增加了系统的复杂性
2.基于插件实现的延迟队列

a.优点:1)通过插件可以直接创建延迟队列,简化延迟消息的实现. 2)避免了DLX的时序问题
b.缺点:1)需要依赖特定的插件,有运维工作2)只适用特定版本

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/diannao/98365.shtml
繁体地址,请注明出处:http://hk.pswp.cn/diannao/98365.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Windows+Docker一键部署CozeStudio私有化,保姆级

在 ​Windows环境​ 下&#xff0c;通过docker&#xff0c;使用 ​火山引擎Doubao-Seed-1.6模型&#xff0c;面向 ​小白新手​ 的 ​Coze Studio私有化部署详细步骤。整个过程分为四大阶段&#xff0c;包含每一步的指令、成功标志。 Coze Studio 私有化部署指南&#xff08;W…

【HEMCO Reference Guide 参考指南第二期】配置文件的结构和语法

配置文件的结构和语法 HEMCO 配置文件的结构和语法(The HEMCO configuration file) 1. Settings(设置) 2. Extension Switches(扩展模块开关) 3. Base Emissions(基础排放配置) 4. Scale Factors(缩放因子) 5. Masks(掩膜区域) 6. Data Collections(数据集合) 参…

01.单例模式基类模块

一、单例模式的构成1、私有的静态成员变量2、公共的静态成员属性或方法3、私有构造函数using System.Collections; using System.Collections.Generic; using UnityEngine;public class BaseManager : MonoBehaviour {void Start(){}// Update is called once per framevoid Up…

[网络入侵AI检测] 深度前馈神经网络(DNN)模型

第4章&#xff1a;深度前馈神经网络&#xff08;DNN&#xff09;模型 欢迎回来&#x1f43b;‍❄️ 在第1章&#xff1a;分类任务配置&#xff08;二分类 vs. 多分类&#xff09;中&#xff0c;我们学习了如何配置模型以回答不同类型的问题&#xff1b;在第2章&#xff1a;数…

【目录-多选】鸿蒙HarmonyOS开发者基础

All look at the answer 针对包含文本元素的组件&#xff0c;例如Text、Button、TextInput等&#xff0c;可以使用下列哪些属性关于ForEach(arr, itemGenerator, index)组件的描述正确的是下面哪些容器组件是可以滚动的关于Tabs组件和TabContent组件&#xff0c;下列描述正确的…

第一讲 Vscode+Python+anaconda 安装

1、vscode下载和安装官网下载最新版&#xff1a;https://code.visualstudio.com/Download注&#xff1a;文件夹最好不要出现中文和空格 2、将vscode修改为中文环境注意&#xff1a;右下角弹出提示框&#xff0c;点击“yes”若不慎关闭了对话框&#xff0c;也不要紧&#xff0c;…

《sklearn机器学习——回归指标2》

均方对数误差&#xff08;mean_squared_log_error函数&#xff09; mean_squared_log_error函数计算与平方&#xff08;二次方&#xff09;对数误差或损失的期望值相一致的风险指标。 Mean Squared Logarithmic Error 参数与返回值 函数简介 mean_squared_log_error 是用于计算…

当电力设计遇上AI:良策金宝AI如何重构行业效率边界?

在工程设计行业&#xff0c;我们常说“经验为王”。一个资深工程师的价值&#xff0c;往往体现在他对规范的熟悉、对计算的把握、对图纸的掌控。但今天&#xff0c;这个“王座”正在被重新定义。不是经验不重要了&#xff0c;而是——效率的边界&#xff0c;正在被AI重构。以良…

【深度学习】重采样(Resampling)

在深度学习的背景下&#xff0c;重采样主要涉及两个方面&#xff1a; 数据层面的重采样&#xff1a;处理不平衡数据集。模型层面的重采样&#xff1a;在神经网络内部进行上采样&#xff08;UpSampling&#xff09;或下采样&#xff08;DownSampling&#xff09;&#xff0c;常见…

计算机实现乘法运算的方式---ChatGPT 5 thinking作答

计算机如何实现“乘法” 下面分层次把乘法在数据表示 → 整数硬件/软件 → 大整数 → 浮点数 → 特殊场景里的主流实现方式讲清楚&#xff0c;并给出取舍建议与简单伪代码。0&#xff09;前置&#xff1a;数的表示 无符号整数&#xff1a;按二进制位权求值。有符号整数&#xf…

Ubuntu 安装 / 配置 VNC

一、基础环境准备 1. 更新 sudo apt update 2. 安装 VNC 服务器 & 轻量桌面(XFCE) # 安装 TightVNC 服务器 + XFCE 桌面(推荐轻量方案) sudo apt install tightvncserver xfce4 xfce4-goodies xterm -y二、核心配置:让 VNC 加载桌面环境 1. 初始化 VNC 密码(首次…

计算机大数据毕业设计推荐:基于Spark的新能源汽车保有量可视化分析系统

精彩专栏推荐订阅&#xff1a;在下方主页&#x1f447;&#x1f3fb;&#x1f447;&#x1f3fb;&#x1f447;&#x1f3fb;&#x1f447;&#x1f3fb; &#x1f496;&#x1f525;作者主页&#xff1a;计算机毕设木哥&#x1f525; &#x1f496; 文章目录 一、项目介绍二、…

Android Looper源码阅读

看下Android Looper源代码&#xff0c;有助于理解Android系统消息循环流程、handler机制。Looper注释为class used to run a message loop for a thread&#xff0c; 即用于为一个线程运行消息循环&#xff0c; 或者说循环处理一个线程的消息。 Looper源码先看下这个类里的变量…

uni-app 和 uni-app x 的区别

差异解析 uni-app 是 DCloud 推出的成熟跨平台前端框架&#xff0c;基于 Vue.js JavaScript/TypeScript。支持广泛平台&#xff1a;iOS、Android、HarmonyOS、Web、小程序等&#xff0c;用一套代码同时生成多个端应用。渲染方式主要通过 WebView 或小程序原生框架 JS 逻辑&am…

数据结构:深度优先搜索 (Depth-First Search, DFS)

目录 DFS的诞生——“不撞南墙不回头” DFS的核心机制——如何实现“回溯”&#xff1f; DFS算法流程图解&#xff08;递归版&#xff09; C/C代码实现 DFS的应用 上一节我们学习了广度优先搜索 (BFS)&#xff0c;它像水面的波纹一样&#xff0c;一层一层地向外探索。今天…

Spring Boot中策略模式结合依赖注入的实现方式

在Spring Boot项目开发中&#xff0c;常常会遇到根据不同的业务场景执行不同逻辑的需求&#xff0c;策略模式就是一种很好的设计模式来应对这种情况。同时&#xff0c;Spring Boot强大的依赖注入机制可以方便地将不同的策略类进行管理和调用。 1. 定义策略接口 定义一个策略接口…

深入剖析Spring Boot中Spring MVC的请求处理流程

对于任何使用Spring Boot进行Web开发的开发者而言&#xff0c;深入理解Spring MVC的执行流程都是至关重要的。这不仅有助于我们编写更清晰、更高效的代码&#xff0c;更是我们排查诡异问题、进行高级定制开发的知识基石。今天&#xff0c;我们将一起深入Spring Boot应用的内核&…

X448 算法签名验签流程深度解析及代码示例

一、引言&#xff1a;X448 算法的定位与价值在椭圆曲线密码学&#xff08;ECC&#xff09;体系中&#xff0c;X448 是基于蒙哥马利曲线&#xff08;Curve448&#xff09;的密钥交换算法&#xff0c;但其底层数学原理也可支撑签名验签功能&#xff08;实际工程中常与 Ed448 签名…

2025-2026单片机物联网毕业设计题目推荐(定稿付款)

51.基于单片机的非接触式防疫自动门系&#xff08;1&#xff09;人员检测&#xff1a;利用超声波模块进行人员检测&#xff0c;检测到人员靠近门体时触发相应的操作&#xff1b;&#xff08;2&#xff09;门控制&#xff1a;通过舵机实现自动门的开闭控制&#xff0c;当检测到有…

一文详解大模型强化学习(RLHF)算法:PPO、DPO、GRPO、ORPO、KTO、GSPO

一、 引言 大模型强化学习的核心目标是让模型的输出与人类目标、真实场景需求对齐。在工作和学习中&#xff0c;大模型强化学习训练经常会遇到各种算法&#xff0c;各种O&#xff0c;在强化学习训练选型过程中经常容易混淆&#xff0c;也分不清各种训练算法的使用场景和优缺点。…