RabbitMQ--延时队列总结

一、延迟队列概念

        延迟队列(Delay Queue)是一种特殊类型的队列,队列中的元素需要在指定的时间点被取出和处理。简单来说,延时队列就是存放需要在某个特定时间被处理的消息。它的核心特性在于“延迟”——消息在队列中停留一段时间,直到满足设定的延迟时间才会被处理。

关键特性:
  • 延时队列中的消息会在指定时间点才被消费。

  • 适用于时间敏感的任务调度,如订单过期、任务超时等。

二、延迟队列的使用场景

延迟队列适用于以下场景:

  1. 订单支付超时自动取消

    • 例如,订单生成后 10 分钟未支付,自动取消订单。

  2. 店铺商品上传提醒

    • 新店铺如果 10 天内没有上传商品,系统自动发送提醒消息。

  3. 用户未登录短信提醒

    • 用户注册后,若 3 天内没有登录,发送短信提醒用户登录。

  4. 退款超时提醒

    • 用户发起退款请求后,如果 3 天内未处理,自动通知运营人员。

  5. 会议提醒

    • 预定会议后,提前 10 分钟通知与会人员。

这些场景的特点是:在某个事件发生后,或者在某个时间点之前,需要完成某项任务。比如,在订单生成事件发生 10 分钟后,检查订单支付状态,未支付则关闭订单。

为什么不使用定时任务?
  • 对于小规模的数据量,可以使用定时任务每秒轮询一次进行处理。

  • 但是当数据量非常庞大(如百万级别的订单检查)时,轮询的方式会给数据库和系统带来巨大压力,无法满足高效处理的需求。

  • 延时队列通过精准的延迟时间控制和异步处理,能够高效地解决这个问题。

三、 如何在 RabbitMQ 中实现延时队列?

我们有两种常用的方式来实现延时队列:

  1. 通过 TTL(消息过期时间或队列过期时间)和死信队列实现:我们可以给队列里的消息设置一个有效期(TTL),一旦消息过期,它就会被路由到一个死信队列,再由死信队列进行消费。

  2. 使用 x-delayed-message 插件:这个插件是官方提供的,它允许我们给消息指定一个延迟时间,在这个时间到期之前,消息不会被消费者消费。

四、通过 TTL(消息过期时间或队列过期时间)和死信队列实现样例

在讨论这个问题前先来了解几个知识点

知识点①:RabbitMQ 中的 TTL(Time-to-Live)

        TTL 是 RabbitMQ 中用来控制消息或队列存活时间的属性。TTL 的单位是毫秒,表示一条消息或队列中的消息在指定时间内没有被消费时,消息会过期,成为死信。

(1) TTL 的两种设置方式
  • 消息 TTL:可以在发送每条消息时指定 TTL。

    例如,发送消息时设置 TTL 为 10 秒:

    AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().expiration("10000")  // 设置消息延迟时间为 10 秒.build();channel.basicPublish("exchange", "routing_key", properties, "message".getBytes());
    
  • 队列 TTL:在创建队列时设置该队列内所有消息的 TTL。队列的 TTL 会影响队列中所有消息的过期时间。

    例如,在队列声明时设置 x-message-ttl

    Map<String, Object> args = new HashMap<>();
    args.put("x-message-ttl", 10000);  // 设置消息的 TTL 为 10 秒
    channel.queueDeclare("queue", true, false, false, args);
    
(2) TTL 的行为
  • 队列 TTL:如果设置了队列 TTL,则队列内所有消息的 TTL 会被统一管理。如果消息超时,它会被丢弃或者路由到死信队列。

  • 消息 TTL:如果设置了消息 TTL,那么每条消息的 TTL 都会单独管理。如果消息未能在 TTL 时间内消费,则会成为“死信”。

知识点②: 死信队列(Dead Letter Queue)

 这里死信队列以及TTL的讲解笔者可以去查看这篇博客:Rabbitmq中的死信队列-CSDN博客

当消息过期或被拒绝时,消息会被发送到死信队列。死信队列用于接收那些已经过期的消息或被拒绝的消息,这样消费者可以集中处理这些需要处理的消息。

(1) 如何利用死信队列实现延时队列?
  1. 设置队列的 TTL,使消息在到期后成为死信。

  2. 配置死信队列,使过期的消息进入死信队列。

  3. 消费者从死信队列消费,定期消费这些过期的消息。

方式一:RabbitMQ 延时队列的实现方式(给消息设置TTL和死信队列)

(1)配置文件类代码

@Component
public class MsgTtlQueueConfig {public static final String Y_DEAD_LETTER_EXCHANGE = "Y";public static final String QUEUE_C = "QC";//声明队列 C 死信交换机@Bean("queueC")public Queue queueB(){Map<String, Object> args = new HashMap<>(3);//声明当前队列绑定的死信交换机args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);//声明当前队列的死信路由 keyargs.put("x-dead-letter-routing-key", "YD");//没有声明 TTL 属性return QueueBuilder.durable(QUEUE_C).withArguments(args).build();}//声明队列 B 绑定 X 交换机@Beanpublic Binding queuecBindingX(@Qualifier("queueC") Queue queueC,@Qualifier("xExchange") DirectExchange xExchange){return BindingBuilder.bind(queueC).to(xExchange).with("XC");}
}

(2)消息生产者代码

@GetMapping("sendExpirationMsg/{message}/{ttlTime}")
public void sendMsg(@PathVariable String message,@PathVariable String ttlTime) {rabbitTemplate.convertAndSend("X", "XC", message, correlationData ->{correlationData.getMessageProperties().setExpiration(ttlTime);return correlationData;});log.info("当前时间:{},发送一条时长{}毫秒 TTL 信息给队列 C:{}", new Date(),ttlTime, message);
}

(3) 发送请求

  • http://localhost:8080/ttl/sendExpirationMsg/你好 1/20000
  • http://localhost:8080/ttl/sendExpirationMsg/你好 2/2000       

发送一个 HTTP 请求,参数中包括消息内容和 TTL(过期时间)。

        http://localhost:8080/ttl/sendExpirationMsg/你好 2/2000 表示发送消息"你好 2"并设置该消息的 TTL 为 2000 毫秒(即 2 秒)。2 秒内没有被消费者消费,该消息就会被 RabbitMQ 丢弃。

(4) 给消息设置TTL和死信队列的问题

        你当前的设计是为每条消息单独设置 TTL(通过         correlationData.getMessageProperties().setExpiration(ttlTime)),而不是为队列本身设置 TTL。这样做的目的是希望每条消息有不同的过期时间,从而实现不同的延时处理。

设计中可能存在的问题

        消息 TTL 是通过设置每条消息的 expiration 属性来控制每条消息的过期时间。每条消息可以有不同的 TTL,这样可以灵活地指定不同的消息延迟时间。    

    问题出在 RabbitMQ 的消息消费机制 上:RabbitMQ 是按照队列中的消息顺序来消费消息的,且它只会检查队列里的消息是否过期,而不是单独检查每条消息的 TTL。

  • 消费顺序问题

    假设队列中有两条消息:

    1. 第一条消息的 TTL 设置为 20 秒。

    2. 第二条消息的 TTL 设置为 2 秒。

                在这种情况下,RabbitMQ 会 按顺序检查队列中的消息,也就是说,它首先会检查第一条消息(TTL 20 秒),即使第二条消息的 TTL 很短(只有 2 秒)。如果第一条消息还没有过期,RabbitMQ 会先检查它,然后再检查第二条消息。结果就是,第二条消息可能会被延迟,即使它的 TTL 已经过期。

              也就是说即使第二条消息的 TTL 设置为 2 秒,然后此时第二条消息已经过期,它也会等待第一条消息被消费(进入死信队列后)后才会检查。这意味着 第二条消息在第一条消息未过期的情况下不会立刻进入死信队列,而是会等到第一条消息被消费,才会去检查是否过期。所以会被延迟

        

方式二:RabbitMQ 延时队列的实现方式(给队列设置TTL和死信队列)

RabbitMQ 的延时队列可以通过 TTL 配合死信队列实现,具体步骤如下:

(1) 设置队列的 TTL

在创建队列时,我们设置队列的 x-message-ttl 属性,控制消息的生存时间。例如:

Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl", 10000);  // 设置队列消息的 TTL 为 10 秒
args.put("x-dead-letter-exchange", "dlx_exchange");  // 设置死信交换机
args.put("x-dead-letter-routing-key", "dlx_routing_key");  // 设置死信路由键
channel.queueDeclare("ttl_queue", true, false, false, args);
(2) 配置死信队列

设置死信交换机和死信路由键,当消息 TTL 到期后,它会进入死信队列。

channel.queueDeclare("dlx_queue", true, false, false, null);
channel.queueBind("dlx_queue", "dlx_exchange", "dlx_routing_key");
(3) 发送消息时设置 TTL

发送消息时,可以给消息设置 expiration 属性来控制消息的延迟时间。例如,10 秒后该消息将变为死信:

AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().expiration("10000")  // 设置消息的 TTL 为 10 秒.build();channel.basicPublish("exchange", "routing_key", properties, "message".getBytes());
(4) 消费死信队列

消费者从死信队列中获取消息进行处理:

channel.basicConsume("dlx_queue", true, (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println("Received expired message: " + message);
}, consumerTag -> {});
(5)总结
  • 延迟队列 通过让消息在指定时间后再被消费,解决了定时任务和轮询检查的性能问题。

  • TTL死信队列 是实现 RabbitMQ 延时队列的关键技术,通过控制消息的存活时间和让过期消息进入死信队列,消费者可以按需处理这些消息。

  • 适用场景包括:订单超时、任务调度、消息提醒等。

  • 延时队列的核心需求是让消息在指定时间后被处理,而 RabbitMQ 中的 TTL(过期时间)正好能实现这一点。当消息的 TTL 到期后,它会变成死信并被投递到死信队列。这样,消费者只需要持续从死信队列消费消息即可,因为队列中的消息都是等待被及时处理的。这种方式实现了高效的延时处理,同时避免了轮询和重复检查。

(6) 给队列设置TTL和死信队列的问题
        如果这样使用的话,岂不是每增加一个新的时间需求,就要新增一个队列,这里只有 10秒1个时间选项,如果需要一个小时后处理,那么就需要增加 TTL 为一个小时的队列,如果是预定会议室然后提前通知这样的场景,需要要增加无数个队列才能满足需求

五、使用 x-delayed-message 插件实现延时队列实现样例

5.1 安装插件

要启用延时队列,首先要安装 rabbitmq-delayed-message-exchange 插件:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange
5.2 创建延时交换机

创建一个交换机时,指定它是一个 x-delayed-message 类型的交换机。通过这个交换机来处理延时消息:

Channel channel = connection.createChannel();
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");  // 设定延时交换机的类型,通常是 direct 类型channel.exchangeDeclare("delayed_exchange", "x-delayed-message", true, false, args);
5.3 发送延时消息

发送消息时,我们需要指定延迟的时间。这个时间通过设置消息的 expiration 属性来实现,单位是毫秒:

AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().expiration("10000")  // 设置消息延迟时间为 10 秒.build();channel.basicPublish("delayed_exchange", "routing_key", properties, "Hello, delayed message".getBytes());
5.4 消费消息

消费者与普通消息的消费方式一样,消息会在延迟时间到期后被消费:

channel.basicConsume("delayed_queue", true, (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println("Received message: " + message);
}, consumerTag -> {});

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/pingmian/95779.shtml
繁体地址,请注明出处:http://hk.pswp.cn/pingmian/95779.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Java 提取 PDF 文件内容:告别手动复制粘贴,拥抱自动化解析!

在日常工作中&#xff0c;我们经常需要处理大量的 PDF 文档&#xff0c;无论是提取报告中的关键数据&#xff0c;还是解析合同中的重要条款&#xff0c;手动复制粘贴不仅效率低下&#xff0c;还极易出错。当面对海量的 PDF 文件时&#xff0c;这种传统方式更是让人望而却步。那…

关键字 const

Flutter 是一个使用 Dart 语言构建的 UI 工具包&#xff0c;因此它完全遵循 Dart 的语法和规则。Dart 中的 const 是语言层面的特性&#xff0c;而 Flutter 因其声明式 UI 和频繁重建的特性&#xff0c;将 const 的效能发挥到了极致。Dart 中的 const&#xff08;语言层面&…

Ubuntu22.04中使用cmake安装abseil-cpp库

Ubuntu22.04中使用cmake安装abseil-cpp库 关于Abseil库 Abseil 由 Google 的基础 C 和 Python 代码库组成&#xff0c;包括一些正支撑着如 gRPC、Protobuf 和 TensorFlow 等开源项目并一起 “成长” 的库。目前已开源 C 部分&#xff0c;Python 部分将在后续开放。 Abseil …

FreeRTOS项目(序)目录

这章是整个专栏的目录&#xff0c;负责记录这个小项目的开发日志和目录。附带总流程图。 目录 项目简介 专栏目录 开发日志 总流程图 项目简介 本项目基于STM32C8T6核心板和FreeRTOS&#xff0c;实现一些简单的功能。以下为目前已实现的功能。 &#xff08;1&#xff09…

Python 多任务编程:进程、线程与协程全面解析

目录 一、多任务基础&#xff1a;并发与并行 1. 什么是多任务 2. 两种表现形式 二、进程&#xff1a;操作系统资源分配的最小单位 1. 进程的概念 2. 多进程实现多任务 2.1 基础示例&#xff1a;边听音乐边敲代码 2.2 带参数的进程任务 2.3 进程编号与应用注意点 2.3.…

ADSL技术

<摘要> ADSL&#xff08;非对称数字用户线路&#xff09;是一种利用传统电话线实现宽带上网的技术。其核心原理是频率分割&#xff1a;将一根电话线的频带划分为语音、上行数据&#xff08;慢&#xff09;和下行数据&#xff08;快&#xff09;三个独立频道&#xff0c;从…

信号衰减中的分贝到底是怎么回事

问题&#xff1a;在一个低通滤波中&#xff0c;经常会看到一个值-3dB&#xff08;-3分贝&#xff09;&#xff0c;到底是个什么含义&#xff1f; 今天我就来粗浅的讲解这个问题。 在低通滤波器中&#xff0c;我们说的 “截止频率”&#xff08;或叫 - 3dB 点&#xff09;&…

工具分享--IP与域名提取工具2.0

基于原版的基础上新增了一个功能点:IP-A段过滤&#xff0c;可以快速把内网192、170、10或者其它你想要过滤掉的IP-A段轻松去掉&#xff0c;提高你的干活效率&#xff01;&#xff01;&#xff01; 界面样式预览&#xff1a;<!DOCTYPE html> <html lang"zh-CN&quo…

如何通过日志先行原则保障数据持久化:Redis AOF 和 MySQL redo log 的对比

在分布式系统或数据库管理系统中&#xff0c;日志先行原则&#xff08;Write-Ahead Logging&#xff0c;WAL&#xff09; 是确保数据一致性、持久性和恢复能力的重要机制。通过 WAL&#xff0c;系统能够在发生故障时恢复数据&#xff0c;保证数据的可靠性。在这篇博客中&#x…

临床研究三千问——临床研究体系的3个维度(8)

在上周的文章中&#xff0c;我们共同探讨了1345-10战策的“临床研究的起点——如何提出一个犀利的临床与科学问题”。问题固然是灵魂&#xff0c;但若没有坚实的骨架与血肉&#xff0c;灵魂便无所依归。今天&#xff0c;我们将深入“1345-10战策”中的“3”&#xff0c;即支撑起…

AI+预测3D新模型百十个定位预测+胆码预测+去和尾2025年9月7日第172弹

从今天开始&#xff0c;咱们还是暂时基于旧的模型进行预测&#xff0c;好了&#xff0c;废话不多说&#xff0c;按照老办法&#xff0c;重点8-9码定位&#xff0c;配合三胆下1或下2&#xff0c;杀1-2个和尾&#xff0c;再杀4-5个和值&#xff0c;可以做到100-300注左右。(1)定位…

万字详解网络编程之socket

一&#xff0c;socket简介1.什么是socketsocket通常也称作"套接字"&#xff0c;⽤于描述IP地址和端⼝&#xff0c;是⼀个通信链的句柄&#xff0c;应用程序通常通过"套接字"向⽹络发出请求或者应答⽹络请求。⽹络通信就是两个进程间的通信&#xff0c;这两…

维度跃迁:当万物皆成电路,智能将从“拥有”变为“存在”

我们习以为常的电子世界&#xff0c;其本质是一个由电路构成的精密宇宙。而一场从二维到三维的终极变革&#xff0c;正在悄然酝酿&#xff0c;它将彻底颠覆我们创造和交互的方式。一、电子世界的本质&#xff1a;一切都是电路 在深入未来之前&#xff0c;我们首先要理解当下。电…

大语言模型预训练数据采集与清洗技术实践:从语料到知识库的全流程优化

大语言模型(LLM)的性能上限由 “数据质量 数据规模 数据多样性” 共同决定 —— 预训练阶段的海量语料决定模型的泛化能力与语言理解基础,而知识库数据则决定模型的知识准确性与领域专业性。当前 LLM 落地面临的核心痛点之一,便是 “数据脏、处理难、知识杂”:预训练语料…

模拟音频采集设备的制作

模拟音频程序与设备的制作 需要设备 esp32s3 pcm1808 pcm5102(非必须) 程序界面 程序代码 代码链接

Java Modbus通信实战(四):Modbus通信测试与故障排查

在工业现场&#xff0c;设备通信系统就像工厂的神经网络&#xff0c;连接着各种传感器、控制器和执行器。当你搭建好这套系统后&#xff0c;最关键的一步就是全面测试&#xff0c;确保每个环节都能正常工作。 就像汽车出厂前要经过严格的路试一样&#xff0c;Modbus RTU通信系统…

少儿编程C++快速教程之——1. 基础语法和输入输出

1. 欢迎来到C编程世界&#xff01; 1.1 什么是编程&#xff1f; 编程就像是给计算机写一份详细的"说明书"&#xff0c;告诉它该做什么、怎么做。C是一种强大的编程语言&#xff0c;可以用来创建游戏、应用程序和各种有趣的软件&#xff01; 1.2 第一个C程序&#xff…

arma::imat22

arma::imat22 是 Armadillo C 线性代数库中定义的一个固定大小的 2x2 有符号整数矩阵类型。它主要用于处理小型、维度在编译时已知的整数矩阵&#xff0c;因其在栈上分配内存&#xff0c;故通常比动态矩阵有更高的效率。 下面是一个汇总了 arma::imat22 主要特性的表格&#xf…

狗都能看懂的HunYuan3D 1.0详解

HunYuan3D 1.0 HunYuan3D 1.0是2024年9月发布的一篇论文。虽然站在现在的时间节点&#xff0c;HunYuan3D系列已经出到2.5了&#xff0c;但是1.0版本的改进思路&#xff0c;和它trick集成的做法&#xff0c;还是很值得学习的。由于文章用到了很多技术&#xff0c;由于篇幅有限&a…

踏脚迈入奇幻乐园

每天早上上班的路上都会经过一个小花园。它被夹在丁字路口的拐角&#xff0c;面积不大&#xff0c;匆匆而过的行人都不会注意到它。但如果顺着几个不起眼的入口走进去&#xff0c;里面却是别有洞天。清早的街道还没有车水马龙的喧哗&#xff0c;花园里静悄悄的。各式各样的花草…