rabbitmq的高级特性

一.发送者的可靠性

1.生产者重试机制

修改publisher模块的application.yaml文件

spring:rabbitmq:connection-timeout: 1s # 设置MQ的连接超时时间template:retry:enabled: true # 开启超时重试机制initial-interval: 1000ms # 失败后的初始等待时间multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multipliermax-attempts: 3 # 最大重试次数

注意:

①当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率。不过SpringAMQP提供的重试机制是 阻塞式 的重试,也就是说多次重试等待的过程中,当前线程是被阻塞的。
②如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长和重试次数,当然也可以考虑使用异步线程来执行发送消息的代码。 

2.生产者确认机制

RabbitMQ提供了生产者消息确认机制,包括 Publisher Confirm 和 Publisher Return 两种。在开启确认机制的情况下,当生产者发送消息给MQ后,MQ会根据消息处理的情况返回不同的 回执

如何返回基本内容如下:

① 当消息投递到MQ,但是路由失败时,通过 Publisher Return 返回异常信息,同时通过 Publisher Confirm 返回ACK 的确认信息,代表投递成功。
② 临时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功。
③ 持久消息投递到了MQ,并且入队完成持久化,返回ACK ,告知投递成功。
④ 其它情况都会返回NACK,告知投递失败。

其中 ack 和 nack 属于 Publisher Confirm 机制,ack 是投递成功;nack 是投递失败。而return 则属于 Publisher Return 机制。默认两种机制都是关闭状态,需要通过配置文件来开启。

①开启生产者确认机制

在publisher模块的 application.yaml 中添加配置:

spring:rabbitmq:publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型  publisher-returns: true # 开启publisher return机制

这里 publisher-confirm-type 有三种模式可选:

① none:关闭confirm机制
② simple:同步阻塞等待MQ的回执
③ correlated:MQ异步回调返回回执

②定义ReturnCallback

每个 RabbitTemplate 只能配置一个 ReturnCallback,因此我们可以在配置类中统一设置。我们在publisher模块定义一个配置类:

@Slf4j
@AllArgsConstructor
@Configuration
public class MqConfig {private final RabbitTemplate rabbitTemplate;@PostConstructpublic void init(){rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returned) {log.error("触发return callback,");log.debug("exchange: {}", returned.getExchange());log.debug("routingKey: {}", returned.getRoutingKey());log.debug("message: {}", returned.getMessage());log.debug("replyCode: {}", returned.getReplyCode());log.debug("replyText: {}", returned.getReplyText());}});}
}
③定义ConfirmCallback

由于每个消息发送时的处理逻辑不一定相同,因此ConfirmCallback需要在每次发消息时定义。具体来说,是在调用RabbitTemplate中的convertAndSend方法时,多传递一个参数:

@Test
void testPublisherConfirm() {// 1.创建CorrelationDataCorrelationData cd = new CorrelationData(UUID.randomUUID().toString());// 2.给Future添加ConfirmCallbackcd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {@Overridepublic void onFailure(Throwable ex) {// 2.1.Future发生异常时的处理逻辑,基本不会触发log.error("send message fail", ex);}@Overridepublic void onSuccess(CorrelationData.Confirm result) {// 2.2.Future接收到回执的处理逻辑,参数中的result就是回执内容if(result.isAck()){ // result.isAck(),boolean类型,true代表ack回执,false 代表 nack回执log.debug("发送消息成功,收到 ack!");}else{ // result.getReason(),String类型,返回nack时的异常描述log.error("发送消息失败,收到 nack, reason : {}", result.getReason());}}});// 3.发送消息rabbitTemplate.convertAndSend("hmall.direct", "q", "hello", cd);
}

总结:Publisher Confirm 用来确认消息是否发送到MQ,而Publish Return 用来通知生产者哪些消息由于路由失败没有被接收

注意:

开启生产者确认比较消耗MQ性能,一般不建议开启。 

二.MQ的可靠性

1.数据持久化

交换机持久化,队列持久化,消息持久化(先保存到内存在写入磁盘),这三个持久化都是默认开启的。如果消息类型是非持久化的,只有在消息队列满了后会被迫写入磁盘。

总结:

持久化是持续将消息写入磁盘,非持久化是当mq内存被使用完毕后才将消息写入磁盘,因此性能较差。

2.LazyQueue

① 接收到(不论临时还是持久的消息)消息后直接存入磁盘而非内存

② 消费者要消费消息时才会从磁盘中读取并加载到内存(也就是懒加载)(如果消费者的速度很快也会把消息提前缓存到内存)
③ 支持数百万条的消息存储

这种模式属于数据持久化的升级版。

从RabbitMQ的3.6.0版本开始,就增加了Lazy Queues的模式,而在3.12版本之后,LazyQueue已经成为所有队列的默认格式。

三.消费者的可靠性

1.消费者确认机制

为了确认消费者是否成功处理消息,RabbitMQ提供了消费者确认机制(Consumer Acknowledgement)。即:当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态。回执有三种可选值:

  - ack:成功处理消息,RabbitMQ从队列中删除该消息
  - nack:消息处理失败,RabbitMQ需要再次投递消息
  - reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息

由于消息回执的处理代码比较统一,因此SpringAMQP帮我们实现了消息确认。并允许我们通过配置文件设置ACK处理方式,有三种模式:

     ①none:不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用
     ②manual:手动模式。需要自己在业务代码中调用api,发送 ack 或 reject ,存在业务入侵,但更灵活。
     ③auto:自动模式。SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回 ack.  当业务出现异常时,根据异常判断返回不同结果:

  • 如果是 业务异常,会自动返回 nack,消息处理失败后,会回到RabbitMQ,并重新投递到消费者。
  • 如果是 消息处理或校验异常,自动返回 reject。

 通过下面的配置可以修改SpringAMQP的ACK处理方式:

spring:rabbitmq:listener:simple:acknowledge-mode: none # 不做处理

2.失败重试机制(对消费者确认机制的增强)

如果上面的代码一直返回nack会导致无线循环。

所以我们配置消费者自己重试,如果超过了配置重试的次数,就会返回reject

修改consumer服务的application.yml文件,添加内容:

spring:rabbitmq:listener:simple:retry:enabled: true # 开启消费者失败重试initial-interval: 1000ms # 初识的失败等待时长为1秒multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-intervalmax-attempts: 3 # 最大重试次数stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

配置之后出现的现象:

- 消费者在失败后消息没有重新回到MQ无限重新投递,而是在本地重试了3次
- 本地重试3次以后,抛出了 AmqpRejectAndDontRequeueException 异常。查看RabbitMQ控制台,发现消息被删除了,说明最后SpringAMQP返回的是 reject 

3.自定义失败处理策略(对消费者重试机制的增强)

有上面失败之后是直接返回的reject,这可能并不是我们想要返回的结果,于是我们有了失败处理策略。

因此Spring允许我们自定义重试次数耗尽后的消息处理策略,这个策略是由MessageRecovery 接口来定义的,它有3个不同实现:

  ①RejectAndDontRequeueRecoverer:重试耗尽后,直接 reject ,丢弃消息。默认就是这种方式 (黑马说不可选,下面两个可选)
  ②ImmediateRequeueMessageRecoverer:重试耗尽后,返回 nack ,消息重新入队 
  ③RepublishMessageRecoverer :重试耗尽后,将失败消息投递到指定的交换机,然后转入到指定的队列,后续由人工集中处理。

代码实现:

@Configuration
@ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled", havingValue = "true")
public class ErrorMessageConfig {@Beanpublic DirectExchange errorMessageExchange(){return new DirectExchange("error.direct");}@Beanpublic Queue errorQueue(){return new Queue("error.queue", true);}@Beanpublic Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error"); //这里的with是routingkey}// 这个是配置消息处理失败之后投入到那个交换机@Beanpublic MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");}
}

4.确保业务幂等性(解决消费者重复消费的问题)

幂等性:指同一个业务,执行一次或多次对业务状态的影响是一致的。

数据的删除,查询一般是幂等的,但是修改和新增不是幂等的,案例如下:

所以,我们要尽可能避免业务被重复执行。

为了解决上面图中的问题我们有了如下的解决方案:

①使用唯一消息id

给每一个消息都设置一个唯一ID,用ID区分是否被消费过。当我们消费一个消息的时候,先在数据库中查询是否存在这个数据的ID,如果不存在就消费。如果存在就说明这个消息之前被消费过。

给消息设置唯一id:这就是在配置消息转换器的时候添加了一点代码

@Bean
public MessageConverter messageConverter(){// 1.定义消息转换器Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息jjmc.setCreateMessageIds(true);return jjmc;
}

 获取消息的id:

@RabbitListener(queues = "simple.queue")
public void listensimpleQueue(Message message) {log.info("监听到simple.queue的消息:ID:【{}】", message.getMessageProperties().getMessageId());log.info("监听到simple.queue的消息:【{}】", new String(message.getBody()));
}

缺点:

 但是这种方法也有自己的缺点,也就是对业务逻辑有侵入性,而且还有额外的数据库操作。 

②根据业务判断

根据上面出现非幂等性问题,我们可以把业务逻辑改成这样就可以保证业务的幂等性了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.pswp.cn/diannao/84712.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

北京大学肖臻老师《区块链技术与应用》公开课:02-BTC-密码学原理

文章目录 1.比特币中用到的密码学的功能2. hash3. 签名 1.比特币中用到的密码学的功能 比特币中用到密码学中两个功能&#xff1a; hash、 签名。 2. hash hash函数的三个特性&#xff1a;抗碰撞性&#xff08;Collision Resistance&#xff09;、隐蔽性&#xff08;Hiding&…

Spring Cloud Gateway高并发限流——基于Redis实现方案解析

本文是一个基于 Spring Cloud Gateway 的分布式限流方案&#xff0c;使用Redis Lua实现高并发场景下的精准流量控制。该方案支持动态配置、多维度限流&#xff08;API路径/IP/用户&#xff09;&#xff0c;并包含完整的代码实现和性能优化建议。 一、架构设计 #mermaid-svg-vg…

SpringAI--RAG知识库

SpringAI–RAG知识库 RAG概念 什么是RAG&#xff1f; RAG(Retrieval-Augmented Genreation&#xff0c;检索增强生成)是一种结合信息检索技术和AI内容生成的混合架构&#xff0c;可以解决大模型的知识时效性限制和幻觉问题。 RAG在大语言模型生成回答之前&#xff0c;会先从…

【PhysUnits】14 二进制数的标准化表示(standardization.rs)

一、源码 这段代码主要用于处理二进制数的标准化表示。它定义了两个特质(trait) IfB0 和 IfB1&#xff0c;以及它们的实现&#xff0c;用于处理二进制数的前导零及前导一的简化。 use super::basic::{B0, B1, Z0, N1, Integer, NonZero, NonNegOne};/// 处理 B0<H> 类型…

将 ubutun 的网络模式 从NAT 改到 桥接模式后,无法上网,linux 没有IP地址 的解决方案

首先要将 ubutun 的网络模式设置为桥接模式 这里再从 NAT 模式改动成 桥接模式的时候&#xff0c;还出现了一个问题。改成桥接模式后&#xff0c;linux没有ip地址了。原因是 不知道什么时候 将 虚拟网络编辑器 中的值改动了 要选择这个 自动 选项

多模态大语言模型arxiv论文略读(九十)

Hybrid RAG-empowered Multi-modal LLM for Secure Data Management in Internet of Medical Things: A Diffusion-based Contract Approach ➡️ 论文标题&#xff1a;Hybrid RAG-empowered Multi-modal LLM for Secure Data Management in Internet of Medical Things: A Di…

电脑主板VGA长亮白灯

电脑主板VGA长亮白灯 起因解决方法注意事项&#xff1a; 起因 搬家没有拆机整机在车上晃荡导致显卡松动接触不良&#xff08;一般VGA长亮白灯都和显卡有关&#xff0c;主要排查显卡&#xff09; 解决方法 将显卡拆下重新安装即可 注意事项&#xff1a; 不可直接拔下显卡&a…

【监控】pushgateway中间服务组件

Pushgateway 是 Prometheus 生态中的一个中间服务组件&#xff0c;以独立工具形式存在&#xff0c;主要用于解决 Prometheus 无法直接获取监控指标的场景&#xff0c;弥补其定时拉取&#xff08;pull&#xff09;模式的不足。 其用途如下&#xff1a; 突破网络限制&#xff1…

打造AI智能旅行规划器:基于LLM和Crew AI的Agent实践

引言 今天来学习大佬开发的一个AI驱动的旅行规划应用程序&#xff0c;它能够自动处理旅行规划的复杂性——寻jni找航班、预订酒店以及优化行程。传统上&#xff0c;这个过程需要手动搜索多个平台&#xff0c;常常导致决策效率低下。 通过利用**代理型人工智能&#xff08;Age…

21. 自动化测试框架开发之Excel配置文件的测试用例改造

21. 自动化测试框架开发之Excel配置文件的测试用例改造 一、测试框架核心架构 1.1 组件依赖关系 # 核心库依赖 import unittest # 单元测试框架 import paramunittest # 参数化测试扩展 from chap3.po import * # 页面对象模型 from file_reader import E…

如何在电力系统中配置和管理SNTP时间同步?

在电力系统中配置和管理 SNTP 时间同步需结合行业标准&#xff08;如《DL/T 1100.1-2019》&#xff09;和分层架构特点&#xff0c;确保安全性、可靠性和精度适配。以下是具体操作指南&#xff0c;涵盖架构设计、设备配置、安全管理、运维监控四大核心环节&#xff0c;并附典型…

MTK-关于HW WCN的知识讲解

前言: 最近做项目过程中和硬件打交道比较多,现在关于整理下硬件的HW wcn的知识点 一 MTK常见的MT6631 Wi-Fi 2.4GHz 匹配调谐指南 ‌拓扑结构选择‌ 推荐采用并联电容拓扑(‌shunt cap topology‌)代替并联电感拓扑(‌shunt inductor topology‌),以减少潜在电路设计…

(1)课堂 1--5,这五节主要讲解 mysql 的概念,定义,下载安装与卸载

&#xff08;1&#xff09;谢谢老师&#xff1a; &#xff08;2&#xff09;安装 mysql &#xff1a; &#xff08;3&#xff09;镜像下载 &#xff0c;这个网址很好 &#xff1a; &#xff08;4&#xff09; 另一个虚拟机的是 zhang 123456 &#xff1a; 接着配置…

U-Boot ARMv8 平台异常处理机制解析

入口点&#xff1a;arch/arm/cpu/armv8/start.S 1. 判断是否定义了钩子&#xff0c;如有则执行&#xff0c;否则往下走。执行save_boot_params&#xff0c;本质就是保存一些寄存器的值。 2. 对齐修复位置无关码的偏移 假设U-Boot链接时基址为0x10000&#xff0c;但实际加载到0…

mysql安装教程--笔记

一、Windows 系统安装 方法1&#xff1a;使用 MySQL Installer&#xff08;推荐&#xff09; 1. 下载安装包 访问 MySQL 官网下载页面&#xff0c;选择 MySQL Installer for Windows。 2. 运行安装程序 双击下载的 .msi 文件&#xff0c;选择安装类型&#xff1a; ◦ Developer…

投资策略规划最优决策分析

目录 一、投资策略规划问题详细 二、存在最优投资策略&#xff1a;每年都将所有钱投入到单一投资产品中 &#xff08;一&#xff09;状态转移方程 &#xff08;二&#xff09;初始条件与最优策略 &#xff08;三&#xff09;证明最优策略总是将所有钱投入到单一投资产品中…

NGINX HTTP/3 实验指南安装、配置与调优

一、HTTP/3 简介 基于 QUIC&#xff1a;在 UDP 之上实现的多路复用传输&#xff0c;内置拥塞控制与前向纠错&#xff0c;无需三次握手即可恢复连接。零 RTT 重连&#xff1a;借助 TLS 1.3&#xff0c;实现连接恢复时的 0-RTT 数据发送&#xff08;视底层库支持&#xff09;。多…

编程日志5.28

string赋值操作 算法: #include<iostream> using namespace std; int main() { //1.字符串常量的赋值 string s1; s1 = "英雄哪里出来"; cout << s1 << endl; //2.字符串变量的赋值 string s2; s2 = s1; cout <…

AE的ai图层导到Ai

AE的ai图层导到ai 解决方法: 1、打开ai软件&#xff0c;不用新建&#xff0c;留在那就行。 2、在AE里选中任意一个ai文件图层&#xff0c;只需同时按住ctrl和英文字母键&#xff0c;图层就会自动全部导入到ai中 英文字母键的详情可以参考&#xff1a;http://www.yayihouse.co…

【Springboot+LangChain4j】Springboot项目集成LangChain4j(下)

前置条件&#xff1a;根据上篇文章完成springboot初步集成LangChain4j 【SpringbootLangChain4j】根据LangChain4j官方文档&#xff0c;三分钟完成Springboot项目集成LangChain4j&#xff08;上&#xff09;-CSDN博客 但是接口方法中&#xff0c;关于大模型的配置都是写死的&a…