Spring-rabbit重试消费源码分析

在集成RabbitMQ与Spring Boot 3.1.x时,RetryOperationsInterceptor 是实现消息重试机制的关键组件。这里将深入分析 RetryOperationsInterceptor 的工作原理,尤其是在消费者消费失败时的行为,并结合底层源码进行详解。

一、配置解析

首先,需要提供 RetryOperationsInterceptor 配置如下:

// 配置重试拦截器
RetryOperationsInterceptor retryInterceptor = RetryInterceptorBuilder.stateless().maxAttempts(3) // 初次消费 + 2次重试.backOffOptions(1000, 2.0, 10000) // 初始间隔1秒,倍增因子2.0,最大间隔10秒.recoverer(new RepublishMessageRecoverer(rabbitTemplate,DEAD_LETTER_EXCHANGE,DEAD_LETTER_ROUTING_KEY)).build();

这段配置的含义如下:

  • maxAttempts(3): 设置最大尝试次数为3次,包括初次消费和2次重试。
  • backOffOptions(1000, 2.0, 10000): 设置重试的间隔策略,初始间隔为1秒,倍增因子为2.0,最大间隔为10秒。
  • recoverer: 当所有重试尝试失败后,使用 RepublishMessageRecoverer 将消息转发到指定的死信交换机和路由键。

二、RetryOperationsInterceptor 的工作原理

RetryOperationsInterceptor 是Spring Retry提供的拦截器,用于在方法执行失败时自动进行重试。结合Spring AMQP(RabbitMQ)的消息监听器容器,它能够在消息处理失败时执行重试逻辑。

1. 消息消费流程

当RabbitMQ消费者接收到消息时,以下步骤会依次执行:

  1. 消息接收: 消息被送到监听方法 onMessage
  2. 消息处理: 执行 processMessage(message) 方法进行业务处理。
  3. 成功确认: 如果处理成功,消息被确认(ACK)。
  4. 处理失败: 如果抛出异常,触发重试机制。

2. 重试拦截器的作用

processMessage 方法抛出异常时,RetryOperationsInterceptor 会拦截这个异常,并按照配置的重试策略进行重试。具体流程如下:

  1. 拦截异常: 异常被 RetryOperationsInterceptor 捕获。
  2. 执行重试: 根据 backOffOptions 设置的间隔和倍增因子,等待指定时间后重新执行 onMessage 方法。
  3. 重试次数限制: 如果重试次数未超过 maxAttempts,则继续重试;否则,执行 recoverer 逻辑。

3. 重试逻辑的底层实现

从源码角度看,RetryOperationsInterceptor 主要依赖于 RetryTemplate 来执行重试逻辑。以下是关键步骤:

  1. 构建 RetryTemplate:

    RetryTemplate retryTemplate = new RetryTemplate();
    SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
    retryPolicy.setMaxAttempts(3);
    retryTemplate.setRetryPolicy(retryPolicy);ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
    backOffPolicy.setInitialInterval(1000);
    backOffPolicy.setMultiplier(2.0);
    backOffPolicy.setMaxInterval(10000);
    retryTemplate.setBackOffPolicy(backOffPolicy);
    
  2. 执行重试:

    retryTemplate.execute(context -> {// 调用实际的消息处理方法onMessage(message);return null;
    }, context -> {// 重试失败后的回调recoverer.recover(message, context.getLastThrowable());return null;
    });
    
  3. 异常处理与恢复:

    • 每次重试失败时,RetryTemplate 会根据 BackOffPolicy 计算下次重试的等待时间。
    • 如果所有重试次数都失败,则调用 RepublishMessageRecoverer 将消息发送到死信队列。

三、与 @RabbitListener 的集成

在您的消费者代码中:

@RabbitListener(queues = RabbitConfig.MAIN_QUEUE, containerFactory = "rabbitListenerContainerFactory")
public void onMessage(String message) throws Exception {LOGGER.info("接收到消息: {}", message);// 处理消息processMessage(message);LOGGER.info("消息处理成功并确认: {}", message);
}

这里的关键在于 containerFactory 的配置,确保 RetryOperationsInterceptor 被正确应用到消息监听器容器中。

1. rabbitListenerContainerFactory 配置示例

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory,RabbitTemplate rabbitTemplate) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setAcknowledgeMode(AcknowledgeMode.AUTO);factory.setAdviceChain(retryInterceptor(rabbitTemplate));return factory;
}@Bean
public RetryOperationsInterceptor retryInterceptor(RabbitTemplate rabbitTemplate) {return RetryInterceptorBuilder.stateless().maxAttempts(3).backOffOptions(1000, 2.0, 10000).recoverer(new RepublishMessageRecoverer(rabbitTemplate,DEAD_LETTER_EXCHANGE,DEAD_LETTER_ROUTING_KEY)).build();
}

在这里,RetryOperationsInterceptor 被添加到监听器容器的 adviceChain 中,使其能够拦截 onMessage 方法的执行。

四、底层源码分析

让我们更深入地看看Spring AMQP和Spring Retry的集成是如何实现的。

1. Spring AMQP 消息监听器容器

SimpleRabbitListenerContainerFactory 创建的 SimpleMessageListenerContainer 是实际的消息监听器容器。该容器负责从RabbitMQ获取消息并调用相应的监听方法。

2. RetryOperationsInterceptor 的集成

SimpleMessageListenerContainer 中,adviceChain 被应用到消息处理逻辑中。具体来说,RabbitListenerEndpointContainer#invokeListener 方法会被 RetryOperationsInterceptor 包裹,确保在调用监听方法时执行重试逻辑。

3. RetryTemplate 的执行流程

RetryOperationsInterceptor 内部使用 RetryTemplate 来管理重试流程。其核心逻辑如下:

public Object invoke(MethodInvocation invocation) throws Throwable {return retryTemplate.execute(context -> {try {return invocation.proceed();} catch (Exception e) {throw e;}}, context -> {// 重试失败后的恢复逻辑recoverer.recover(message, context.getLastThrowable());return null;});
}

在每次重试中,RetryTemplate 会调用 invocation.proceed() 执行实际的消息处理。如果抛出异常,则根据 RetryPolicy 决定是否继续重试。

4. RepublishMessageRecoverer 的作用

当所有重试尝试失败后,RepublishMessageRecoverer 会将消息重新发布到指定的死信交换机和路由键。这是通过以下方式实现的:

public void recover(Message message, Throwable cause) {MessageProperties properties = message.getMessageProperties();properties.setHeader("x-exception-stacktrace", getStackTrace(cause));rabbitTemplate.send(deadLetterExchange, deadLetterRoutingKey, message);
}

这样,未成功处理的消息不会丢失,而是被转发到死信队列,便于后续分析和处理。

五、总结

RetryOperationsInterceptor 在Spring Boot与RabbitMQ集成中,通过拦截消息处理方法的异常,按照配置的重试策略自动执行重试逻辑,极大地提高了系统的可靠性和健壮性。其底层依赖于Spring Retry的 RetryTemplateBackOffPolicy,并通过 RepublishMessageRecoverer 实现失败后的消息转发。

通过理解上述工作原理和源码实现,可以更灵活地配置和优化消息重试机制,确保消息处理的稳定性和可控性。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.pswp.cn/bicheng/84984.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

如何使用JacksonTypeHandler处理mysql json字符串转List对象的问题

在使用mysql5.7或更高版本时&#xff0c;json类型字段应用场景越来越多&#xff0c;对于普通的对象或者List<Integer>、List<String>这些基础类型&#xff0c;jacksonTypeHandler都能很好的处理&#xff0c;如下&#xff1a; 1、定义一个person对象 import com.f…

华为云Flexus+DeepSeek征文 | 基于Dify构建股票分析助手

华为云FlexusDeepSeek征文 | 基于Dify构建AI 图片生成应用 一、构建股票分析助手前言二、构建股票分析助手环境2.1 基于FlexusX实例的Dify平台2.2 基于MaaS的模型API商用服务 三、构建股票分析助手实战3.1 配置Dify环境3.2 配置Dify工具3.3 创建股票分析助手3.4 使用股票分析助…

【0.1 漫画计算机组成原理】

🖥️ 漫画计算机组成原理 🎯 学习目标:深入理解计算机硬件基础,为后续Java编程和性能优化打下坚实基础 📋 目录 CPU架构与指令集内存层次结构冯诺依曼架构与哈佛架构总线系统与IO设备计算机性能分析实际应用场景🎭 漫画引言 小明: “为什么我的Java程序有时候跑得飞…

pytorch 实战二 CNN手写数字识别

系列文章目录 文章目录 系列文章目录前言一、torchvision.datasets1. 数据下载2. 数据分批次传入 二、网络1. 网络搭建2. 训练3.测试 完整代码三、保存模型与推理&#xff08;inference&#xff09;模型保存推理鸣谢 前言 手写数字识别&#xff0c;就是要根据手写的数字0~9&…

[Godot] C#读取CSV表格创建双层字典实现本地化

最近研究了一下本地化&#xff0c;给大家用简单易懂的方式说明我是怎么实现的&#xff0c;使用CSV表格填写翻译&#xff0c;然后在Godot中读取为字典 表格填写 首先&#xff0c;我们表格可以按照下面这种格式填写 idzhenjaruesdefrapple苹果appleリンゴяблокоmanzanaA…

Spark 之 Subquery

各类 Subquery src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala /*** Evaluates to `true` if `values` are returned in `query`s result set.*/ case class InSubquery(values: Seq[Expression], query: ListQuery)extends Predicate with Une…

3.1.3_栈的链式存储实现

知识总览&#xff1a; 链栈定义&#xff1a; 头插法建立单链表&#xff1a; 每次要插入一个元素的时候&#xff0c;总是把该元素插在头节点之后的位置&#xff0c;如果规定只能在单链表的链头一端进行操作即为进栈操作 每次删除一个元素的时候&#xff0c;规定只能在单链表…

华为OD机试_2025 B卷_字符串重新排列(Python,100分)(附详细解题思路)

题目描述 给定一个字符串s&#xff0c;s包括以空格分隔的若干个单词&#xff0c;请对s进行如下处理后输出&#xff1a; 1、单词内部调整&#xff1a;对每个单词字母重新按字典序排序 2、单词间顺序调整&#xff1a; 1&#xff09;统计每个单词出现的次数&#xff0c;并按次数降…

http的缓存问题

一句话概括&#xff1a;浏览器请求资源的时候&#xff0c;会首先检查本地是否有缓存&#xff0c;减少向服务器请求的次数 一、缓存类型&#xff1a; 1. 强缓存&#xff08;本地缓存&#xff09;&#xff1a;直接读本地&#xff0c;不发请求 控制方式&#xff1a; ① Cache-C…

【网络安全】SRC漏洞挖掘思路/手法分享

文章目录 Tip1Tip2Tip3Tip4Tip5Tip6Tip7Tip8Tip9Tip10Tip11Tip12Tip13Tip14Tip15Tip16Tip17Tip18Tip19Tip20Tip21Tip22Tip23Tip24Tip25Tip26Tip27Tip28Tip29Tip30Tip1 “复制该主机所有 URL”:包含该主机上的所有接口等资源。 “复制此主机里的链接”:包括该主机加载的第三…

「Linux中Shell命令」Shell常见命令

知识点及案例解析 1. who 命令 功能:显示当前登录系统的用户信息,包括用户名、终端、登录时间、IP等。 案例: who输出示例: root tty1 2025-06-13 19:42 root pts/0 2025-06-13 19:45 (192.168.226.1)解析: 显示两个用户登录信息: 第一列(用…

StampedLock入门教程

文章目录 一、理解“戳” (Stamp)二、为什么 StampedLock 能提高读性能&#xff1f;秘密在于“乐观读”StampedLock性能对比性能对比结果图 总结 StampedLock完整演示代码对代码的疑问之处问题一&#xff1a;为什么 demonstrateOptimisticReadFailure 中写线程能修改成功&#…

基于云计算的振动弦分析:谐波可视化与波动方程参数理解-AI云计算数值分析和代码验证

振动弦方程是一个基础的偏微分方程&#xff0c;它描述了弹性弦的横向振动。其应用范围广泛&#xff0c;不仅可用于模拟乐器和一般的波动现象&#xff0c;更是数学物理以及深奥的弦理论中的重要基石。 ☁️AI云计算数值分析和代码验证 振动弦方程是描述固定两端弹性弦横向振动的…

Qt .pro配置gcc相关命令(三):-W1、-L、-rpath和-rpath-link

目录 1.Linux 动态库相关知识 1.1.动态库查找路径 1.2.查看程序依赖的动态库 1.3.修改动态库查找路径的方法 1.4.动态链接器缓存管理 2.-Wl参数 3.-L选项&#xff08;编译时路径&#xff09; 4.-rpath参数(运行时路径) 5.-rpath-link 参数 6.常见问题与解决方案 7.总…

Hoppscotch

官方地址 xixiaxiazxiaxix下载 • Hoppscotch Hoppscotch 是一款轻量级、基于 Web 的 API 开发套件&#xff0c;其核心功能和特点如下&#xff1a; 核心功能3 交互式 API 测试&#xff1a;允许用户实时发送请求并查看响应&#xff0c;方便记录 API 行为&#xff0c;在记录响…

RabbitMQ 知识详解(Java版)

RabbitMQ 知识详解&#xff08;Java版&#xff09; RabbitMQ 是一个开源的消息代理&#xff0c;实现了高级消息队列协议&#xff08;AMQP&#xff09;。它用于在分布式系统中实现应用解耦、异步通信和流量削峰。 核心概念 生产者(Producer)&#xff1a;发送消息的应用消费者(…

Flink task、Operator 和 UDF 之间的关系

要真正驾驭 Flink 并构建出高效、稳定、可扩展的流处理应用&#xff0c;仅仅停留在 API 的表面使用是远远不够的。深入理解其内部的运行机制&#xff0c;洞悉数据从代码到分布式执行的完整生命周期&#xff0c;以及明晰各个核心组件之间错综复杂而又协同工作的关系&#xff0c;…

Veeam Backup Replication系统的安装与使用

Veeam Backup & Replication系统安装与使用 系统简介 核心功能 备份与恢复&#xff1a;专注于虚拟化环境&#xff08;如VMware和Hyper-V&#xff09;的备份与恢复&#xff0c;支持物理服务器和云环境。快速恢复&#xff1a;提供即时恢复功能&#xff0c;可在几分钟内恢复…

十四、【ESP32全栈开发指南:搭建轻量级HTTP服务器】

一、HTTP协议基础 HTTP&#xff08;Hyper Text Transfer Protocol&#xff09;作为互联网基础协议&#xff0c;采用请求-响应模型工作&#xff1a; 1.1 HTTP请求组成 GET /uri?query1value1 HTTP/1.1 // 请求行&#xff08;方法URI协议版本&#xff09; Host: example…

java中LinkedList和ArrayList的区别和联系?

我们被要求解释Java中LinkedList和ArrayList的区别和联系。下面将分别从实现原理、性能特点、使用场景等方面进行详细说明&#xff0c;并总结它们的相同点和不同点。 # 一、联系&#xff08;共同点&#xff09; 1. 都实现了List接口&#xff0c;因此具有List接口的所有方法&…