RabbitMQ核心特性——重试、TTL、死信队列

一、重试机制

      在消息传输过程中,可能遇到各种问题,如网络故障,服务器不可用等,这些问题可能导致消息处理失败,因此RabbitMQ提供了重试机制,允许消息处理失败后重新发送,但是,如果是因为程序逻辑发生的错误,那么重试多次也是无用的,因此重试机制可以设置重试次数。

1.1 重试配置

spring:rabbitmq:addresses: amqp://study:study@110.41.51.65:15673/bitelistener:simple:acknowledge-mode: auto #消息接收确认 retry:enabled: true # 开启消费者失败重试 initial-interval: 5000ms # 初始失败等待时⻓为5秒 max-attempts: 5 # 最⼤重试次数(包括⾃⾝消费的⼀次) 

2.2 配置交换机、队列

(1)配置交换机、队列、及绑定关系:

    /** 重试机制*/@Bean("retryQueue")public Queue retryQueue(){return QueueBuilder.durable(Constants.RETRY_QUEUE).build();}@Bean("retryExchange")public DirectExchange retryExchange(){return ExchangeBuilder.directExchange(Constants.RETRY_EXCHANGE).build();}@Bean("retryBinding")public Binding retryBinding(@Qualifier("retryQueue") Queue queue,@Qualifier("retryExchange") DirectExchange directExchange){return BindingBuilder.bind(queue).to(directExchange).with("retry");}

(2)生产者

    /** 重试机制*/@RequestMapping("/retry")public String retry(){rabbitTemplate.convertAndSend(Constants.RETRY_EXCHANGE,"retry","retry test...");return "消息发送成功";}

(3)消费者

@Component
public class RetryListener {@RabbitListener(queues = Constants.RETRY_QUEUE)public void handlerMessage(Message message) throws UnsupportedEncodingException {long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.printf("[" + Constants.RETRY_QUEUE + "]接收到消息: %s," +" deliveryTag: %S \n",new String(message.getBody(),"UTF-8"),deliveryTag);int num = 10/0;System.out.println("业务处理完成");} 
}

2.3 测试

     可以看到,重试后还是未能正常消费消息,抛出异常,需要注意的是,如果手动处理异常,是不会触发重试的,如:

@Component
public class RetryListener {@RabbitListener(queues = Constants.RETRY_QUEUE)public void handlerMessage(Message message) throws UnsupportedEncodingException {long deliveryTag = message.getMessageProperties().getDeliveryTag();try{System.out.printf("[" + Constants.RETRY_QUEUE + "]接收到消息: %s," +" deliveryTag: %S \n",new String(message.getBody(),"UTF-8"),deliveryTag);int num = 10/0;System.out.println("业务处理完成");}catch (Exception e){System.out.println("业务处理失败");}}

再次测试代码:

没有触发重试


2.4 重试注意事项

1. 自动确认模式 : 程序逻辑异常, 多次重试还是失败, 消息就会被自动确认, 那么消息就丢失

2. 手动确认模式:程序逻辑异常, 多次重试消息依然处理失败, 无法被确认, 就⼀直是 unacked的状态, 导致消息积压


 二、TTL 机制

       TTL 即 Time To Live(过期时间), RabbitMQ可以对消息或队列设置过期时间,当消息过期后,就会被自动清除,无论是对消息设置TTL还是对队列设置TTL,本质上都是设置消息的TTL

 

2.1 设置消息的TTL

一、准备工作(声明队列、交换机)

//未设置TTL的queue@Bean("ttlQueue")public Queue ttlQueue(){return QueueBuilder.durable(Constants.TTL_QUEUE).build();}@Bean("ttlExchange")public DirectExchange ttlExchange(){return ExchangeBuilder.directExchange(Constants.TTL_exchange).build();}@Bean("ttlBinding")public Binding ttlBinding(@Qualifier("ttlQueue") Queue queue,@Qualifier("ttlExchange") DirectExchange exchange){return BindingBuilder.bind(queue).to(exchange).with("ttl");}

 二、如何设置消息的TTL

设置消息的TTL是在发送消息是设置的,通过下面这个方法来发送:

	public void convertAndSend(String exchange, String routingKey, final Object message,final MessagePostProcessor messagePostProcessor)

     这个方法比前面使用的方法多了一个参数,就是通过这个messagePostProcessor来设置消息的TTL,只需要在发送消息前,构造一个MessagePostProcesser对象并传入即可:

    @RequestMapping("/ttl")public String ttl(){MessagePostProcessor messagePostProcessor = message -> {message.getMessageProperties().setExpiration("10000");return message;};rabbitTemplate.convertAndSend(Constants.TTL_exchange,"ttl" ,"ttl test 10s...",messagePostProcessor);return "消息发送成功";}

三、测试代码

10s后:

消息已近被清除


四、设置消息的TTL存在的问题

     大家都知道,队列满足先进先出的特性,那么如果先发送一条TTL为30s的消息,再发送一条TTL为10s的消息,那么当10s后,后进队列的那条消息是否会被移除?

    不妨测试一下:

    @RequestMapping("/ttl")public String ttl(){MessagePostProcessor messagePostProcessor = message -> {message.getMessageProperties().setExpiration("10000");return message;};MessagePostProcessor messagePostProcessor2 = message -> {message.getMessageProperties().setExpiration("30000");return message;};rabbitTemplate.convertAndSend(Constants.TTL_exchange,"ttl" ,"ttl test 30s...",messagePostProcessor2);rabbitTemplate.convertAndSend(Constants.TTL_exchange,"ttl" ,"ttl test 10s...",messagePostProcessor);return "消息发送成功";}

    运行程序:

为了解决这个问题,我们可以对队列设置TTL


2.2 设置队列的TTL

 一、声明队列,绑定交换机(绑定在前面声明的交换机上即可)

 //设置TTL的queue@Bean("ttlQueue2")public Queue ttlQueue2(){return QueueBuilder.durable(Constants.TTL_QUEUE2).ttl(20*1000).build();//设置队列TTL为20s}@Bean("ttlBinding2")public Binding ttlBinding2(@Qualifier("ttlQueue2") Queue queue,@Qualifier("ttlExchange") DirectExchange exchange){return BindingBuilder.bind(queue).to(exchange).with("ttl2");}

二、如何对队列设置TTL?

其实在上面声明队列时已经设置了,只需要在声明队列时通过 ttl方法 设置即可。


三、在TTL队列中存放为设置TTL的消息,消息是否移除?

   生产者代码:

    @RequestMapping("/ttl")public String ttl(){rabbitTemplate.convertAndSend(Constants.TTL_exchange,"ttl2" ,"ttl2 test...");return "消息发送成功";}

   测试:

   可以看到,消息同样会过期


四、队列TTL为20s,消息TTL为10s,消息什么时候过期?

   生产者代码:

    @RequestMapping("/ttl")public String ttl(){MessagePostProcessor messagePostProcessor = message -> {message.getMessageProperties().setExpiration("10000");return message;};rabbitTemplate.convertAndSend(Constants.TTL_exchange,"ttl" ,"ttl test 10s...",messagePostProcessor);rabbitTemplate.convertAndSend(Constants.TTL_exchange,"ttl2" ,"ttl2 test...");return "消息发送成功";}

   测试:


三、死信队列

 3.1 什么是死信

     由于各种原因,导致的无法被消费的消息,就是死信,死信队列就是用来存储死心的队列,当一个消息在队列中变成死信后,可以被重新发送到另一个交换机DLX(Dead Letter Exchange)中,这个交换机绑定的队列就是死信队列DLQ(Dead Letter Queue)。

    消息变成死信有以下几种原因:

1> 消息过期

2> 消息被拒绝 ,且requeue参数置为false

3> 队列达到最大长度


3.2 死信代码示例

一、声明队列、交换机及绑定关系

    @Bean("normalQueue")public Queue normalQueue(){return QueueBuilder.durable(Constants.NORMAL_QUEUE).ttl(10000).deadLetterExchange(Constants.DL_EXCHANGE).deadLetterRoutingKey("dl").build();}@Bean("normalExchange")public DirectExchange normalExchange(){return ExchangeBuilder.directExchange(Constants.NORMAL_EXCHANGE).build();}@Bean("normalBinding")public Binding normalBinding(@Qualifier("normalQueue") Queue queue,@Qualifier("normalExchange") DirectExchange exchange){return BindingBuilder.bind(queue).to(exchange).with("normal");}@Bean("dlQueue")public Queue dlQueue(){return QueueBuilder.durable(Constants.DL_QUEUE).build();}@Bean("dlExchange")public DirectExchange dlExchange(){return ExchangeBuilder.directExchange(Constants.DL_EXCHANGE).build();}@Bean("dlBinding")public Binding dlBinding(@Qualifier("dlQueue") Queue queue,@Qualifier("dlExchange") DirectExchange exchange){return BindingBuilder.bind(queue).to(exchange).with("dl");}

     上面在声明了普通交换机、队列以及死信交换机、队列,还要声明普通队列与死信交换机的关系(确保消息变成死信后会通过死信交换机路由到死信队列),只需要在声明普通队列时通过 deadLetterExchange 和 deadLetterRoutingKey 绑定即可。


二、测试由于消息过期而导致的死信

    前面在声明normalQueue时已经通过ttl方法设置了过期时间,所以只需编写生产者代码即可:

    @RequestMapping("/dl")public String dl(){rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE,"normal","dl test...");return "消息发送成功";}

    运行程序,测试:


三、测试由于消息被拒绝导致的死信

    编写消费者代码:

@Component
public class DlListener {@RabbitListener(queues = Constants.NORMAL_QUEUE)public void handlerMessage1(Message message, Channel channel) throws IOException {Long deliveryTag = message.getMessageProperties().getDeliveryTag();try {System.out.printf("接收到消息:%s,deliveryTag: %d \n", new String(message.getBody(), "UTF-8"), message.getMessageProperties().getDeliveryTag());//业务逻辑处理System.out.println("业务逻辑处理");int num = 10 / 0;System.out.println("业务处理完成");channel.basicAck(deliveryTag, false);} catch (Exception e) {//!!!注意requeue一定要置为false才能变成死信System.out.println("业务处理失败");channel.basicNack(deliveryTag, false, false);}}@RabbitListener(queues = Constants.DL_QUEUE)public void handlerMessage2(Message message, Channel channel) throws IOException {Long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.printf("死信队列接收到消息:%s,deliveryTag: %d \n", new String(message.getBody(), "UTF-8"), message.getMessageProperties().getDeliveryTag());channel.basicAck(deliveryTag,false);}
}

    测试:


四、测试由于队列达到最大长度导致的死信

    修改normal队列的声明(添加一个maxLength方法指定队列最大长度):

    @Bean("normalQueue")public Queue normalQueue(){return QueueBuilder.durable(Constants.NORMAL_QUEUE).ttl(10000).maxLength(1).deadLetterExchange(Constants.DL_EXCHANGE).deadLetterRoutingKey("dl").build();}

    重新编写生产者代码,连续发送10条消息:

 @RequestMapping("/dl")public String dl(){for (int i = 0; i < 10; i++) {rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE,"normal","dl test..." + i);}return "消息发送成功";}

    由于队列的最大长度为1,因此应该有9条消息进入死信队列,测试:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.pswp.cn/diannao/84276.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

MVCC实现原理

MVCC的基本概念 MVCC&#xff0c;一个数据的多个版本&#xff0c;使得读写操作没有冲突。 在多个事务并发的情况下&#xff0c;确定到底要访问哪个版本。 MVCC实现原理 MVCC实现依赖于隐式字段&#xff0c;undo log日志&#xff0c;readView 隐式字段 在mysql用户自定义的…

湖北理元理律师事务所债务优化方案解析:如何科学规划还款保障生活质量

在当前经济环境下&#xff0c;债务问题已成为困扰许多家庭的重要难题。据相关统计数据显示&#xff0c;我国个人负债率呈现逐年上升趋势&#xff0c;如何合理规划还款、保障基本生活质量成为亟待解决的社会问题。湖北理元理律师事务所基于多年实务经验&#xff0c;研发出一套科…

ffmpeg 转换视频格式

使用FFmpeg将视频转换为MP4格式的常用命令&#xff1a; ffmpeg -i input.mov -c:v libx264 -crf 23 -c:a aac output.mp4 -i input.avi&#xff1a;指定输入文件 -c:v libx264&#xff1a;使用H.264视频编码器 -crf 23&#xff1a;控制视频质量&#xff08;范围18-28&#…

LLM Tuning

Lora-Tuning 什么是Lora微调&#xff1f; LoRA&#xff08;Low-Rank Adaptation&#xff09; 是一种参数高效微调方法&#xff08;PEFT, Parameter-Efficient Fine-Tuning&#xff09;&#xff0c;它通过引入低秩矩阵到预训练模型的权重变换中&#xff0c;实现无需大规模修改…

实现tdx-hs300-mcp

文章目录 项目简介功能说明使用方法配置说明项目简介 tdx-hs300-mcp是一个Model Context Protocol (MCP)的服务 功能说明 下载数据自动保存为CSV格式文件使用方法 确保已安装Python 3.7+和依赖库: pip install pytdx fastapi uvicorn启动MCP服务: mcp run MCP.py使用MCP工具…

《100天精通Python——基础篇 2025 第20天:Thread类与线程同步机制详解》

目录 一、概念简单回顾二、Python的线程开发2.1 Thread类2.1.1 线程启动2.1.2 线程退出2.1.3 线程的传参2.1.4 threading的属性和方法2.1.5 Thread实例的属性和方法2.1.6 start和run方法 2.2 多线程2.3 线程安全2.4 daemon线程2.5 threading.local类2.6 __slots__拓展 三、线程…

【web应用】前后端分离开源项目联调运行的过程步骤ruoyi

文章目录 ⭐前言⭐一、项目运行环境准备⭐二、数据库创建&#x1f31f;1、新建数据库&#x1f31f;2、导入数据脚本 ⭐三、运行后端项目&#x1f31f;1、打开后端项目&#x1f31f;2、后端项目配置项修改 ⭐四、运行前端项目VUE3&#x1f31f;1、在IDEA另一个窗口中打开前端项目…

【深度剖析】三一重工的数字化转型(下篇1)

在数字经济持续发展的背景下,企业数字化转型方案成为实现转型的关键。不同行业内的企业因转型动机和路径的差异,其转型成效也各异。三一重工作为机械制造行业的领军企业,较早地实施了数字化转型,并积累了丰富的经验。本研究选取三一重工作为案例,通过梳理相关文献,对其数…

Nacos适配GaussDB超详细部署流程

1部署openGauss 官方文档下载 https://support.huaweicloud.com/download_gaussdb/index.html 社区地址 安装包下载 本文主要是以部署轻量级为主要教程 1.1系统环境准备 操作系统选择 系统AARCH64X86-64openEuler√√CentOS7√Docker√√1.2软硬件安装环境 版本轻量版(单…

国际前沿知识系列五:时间序列建模方法在头部撞击运动学测量数据降噪中的应用

目录 国际前沿知识系列五&#xff1a;时间序列建模方法在头部撞击运动学测量数据降噪中的应用 一、引言 二、时间序列建模方法 &#xff08;一&#xff09;ARIMA 模型 &#xff08;二&#xff09;指数平滑法 &#xff08;三&#xff09;小波变换 三、实际案例分析 &…

线性代数中的向量与矩阵:AI大模型的数学基石

&#x1f9d1; 博主简介&#xff1a;CSDN博客专家、CSDN平台优质创作者&#xff0c;高级开发工程师&#xff0c;数学专业&#xff0c;10年以上C/C, C#, Java等多种编程语言开发经验&#xff0c;拥有高级工程师证书&#xff1b;擅长C/C、C#等开发语言&#xff0c;熟悉Java常用开…

第十七次CCF-CSP算法(含C++源码)

第十七次CCF-CSP认证 小明种苹果AC代码 小明种苹果&#xff08;续&#xff09;AC代码 后面好难哈哈 小手冰凉 小明种苹果 输入输出&#xff1a; 题目链接 AC代码 #include<iostream> using namespace std; int n,m; int res,res3; int sum; int res21; int main(){cin …

curl常用指令

curl使用记录 curl常用指令安装请求get请求post请求错误排查 curl常用指令 安装 sudo apt update sudo apt install curl -y请求 get请求 curl [URL]如果能正常请求&#xff0c;则会返回正常的页面信息 post请求 发送 JSON 数据​ curl -X POST [URL] -H "Content-…

C++ 输入输出流示例代码剖析

一、开篇&#xff1a;代码核心概述 本文围绕一段融合输入输出流操作、自定义类型重载、文件读写的C代码展开&#xff0c;深入探究其底层原理与实践应用。代码通过类型转换、操作符重载等技术&#xff0c;实现自定义类型与标准输入输出流的交互&#xff0c;同时借助文件流完成数…

常见嵌入式软件架构

常见的嵌入式软件架构 一、ASW文件夹&#xff08;Application Software&#xff0c;应用软件&#xff09;定义与作用常见子目录结构特点 二、BSP文件夹&#xff08;Board Support Package&#xff0c;板级支持包&#xff09;定义与作用常见子目录结构特点 三、OS文件夹&#xf…

【PostgreSQL】数据探查工具1.0研发可行性方案

👉 点击关注不迷路 👉 点击关注不迷路 👉 点击关注不迷路 想抢先解锁数据自由的宝子,速速戳我!评论区蹲一波 “蹲蹲”,揪人唠唠你的超实用需求! 【PostgreSQL】数据探查工具1.0研发可行性方案,数据调研之秒解析数据结构,告别熬夜写 SQL【PostgreSQL】数据探查工具…

Lambda表达式与匿名内部类的对比详解

Lambda表达式与匿名内部类的对比详解 1. 语法简洁性 Lambda表达式&#xff1a; 仅适用于函数式接口&#xff08;只有一个抽象方法的接口&#xff09;&#xff0c;语法简洁。 示例&#xff1a; Runnable r () -> System.out.println("Hello Lambda");匿名内部类&…

Seata Server 1.6.1 高可用部署终极指南:Nacos配置中心+DB存储+多实例实战

文章目录 高可用 - 关键因素存储模式配置中心注册中心高可用 - 步骤第 1 步:使用 db 作为存储模式第 2 步:使用 Nacos 配置中心自定义 seata-server 配置添加 seata-server.properties 到 Nacos第 3 步:修改 application.yml使用 Nacos 作为配置中心使用 Nacos 作为注册中心…

JS 中判断 null、undefined 与 NaN 的权威方法及场景实践

在 JavaScript 中&#xff0c;null、undefined 和 NaN 是三个特殊的「非正常值」&#xff0c;正确判断它们是保证代码健壮性的关键。本文结合 ECMA 规范与 MDN 权威文档&#xff0c;系统梳理三者的判断方法、原理及典型场景&#xff0c;帮助开发者规避常见误区。 一、理解三个…

基于DenseNet的医学影像辅助诊断系统开发教程

本文源码地址: https://download.csdn.net/download/shangjg03/90873921 1. 简介 本教程将使用DenseNet开发一个完整的医学影像辅助诊断系统,专注于胸部X光片的肺炎检测。我们将从环境搭建开始,逐步介绍数据处理、模型构建、训练、评估以及最终的系统部署。 2. 环境准备<…