【RabbitMQ】高级特性—持久性、重试机制详解

持久性

我们在前面说了消息端处理消息时,消息如何不丢失,但是如何保证当 RabbitMQ 服务器停掉之后,生产者发送的消息不丢失呢?

默认情况下,RabbitMQ 退出或者由于某种原因崩溃时,会忽视队列和消息,除非告知他不要这么做

RabbitMQ 的持久化分为三个部分:

  • 交换器的持久化
  • 队列的持久化
  • 消息的持久化

交换机持久化

交换机的持久化,是通过在声明交换机时将 durable 参数置为 true 实现的

  • 相当于将交换机的属性在服务器内部保存,当 MQ 的服务器发生意外或关闭之后,重启 RabbitMQ 时不需要重新去建立交换机,交换机会自动建立,相当于一直存在
  • 如果交换器不设置持久化,那么在 RabbitMQ 服务器重启之后,相关的交换机元数据会丢失,对一个长期使用的交换机来说,建议将其置为持久化的
ExchangeBuilder.topicExchange(Constant.ACK_EXCHANGE_NAME).durable(true).build();

队列持久化

队列的持久化是通过在声明队列时将 durable 参数置为 true 实现的

  • 如果队列不设置持久化,那么在 RabbitMQ 服务器重启之后,该队列就会被删除掉,此时数据也会丢失(队列没有了,消息也无处可存了)
  • 队列的持久化能保证该队列本身的元数据不会因异常情况而丢失,但是并不能保证内部存储的消息不会丢失
  • 要确保消息不会丢失,需要将消息设置为持久化

咱们前面用的创建队列的方式都是持久化的

QueueBuilder.durable(COnstant.ACK_QUEUE).build();

点进去看源码会发现,该方法默认 durabletrue

public static QueueBuilder durable() {  return durable(namingStrategy.generateName());  
}  private QueueBuilder setDurable() {  this.durable = true;  return this;  
}

通过下面代码,可以创建非持久化的队列

QueueBuilder.noDurable(Constant.ACK_QUEUE).build();

消息持久化

消息实现持久化,需要把消息的投递模式 (MessageProperties 中的 deliveryMode)设置为 2,也就是 MessageDeliveryMode.PERSISTENT

public enum MessageDeliveryMode {  NON_PERSISTENT,  PERSISTENT;

设置了队列和消息的持久化,当 RabbitMQ 服务器重启之后,消息依旧存在。

  • 如果只设置队列持久化,重启之后消息会丢失
  • 如果只设置消息持久化,重启之后队列消息,继而消息也丢失
    所以单单设置消息持久化而不设置队列的持久化显得毫无意义
// 非持久化信息
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());// 持久化信息
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAN, msg.getBytes());

MessageProperties.PERSISTENT_TEXT_PLAIN 实际就是封装了这个属性

    public static final BasicProperties PERSISTENT_TEXT_PLAIN =  new BasicProperties("text/plain",  null,  null,  2,  0, null, null, null,  null, null, null, null,  null, null);  
}

如果使用 RabbitTemplate 发送持久化消息,代码如下:

// 要发送的消息内容
String message = "This is a persistent message";// 创建一个 Message 对象,设置为持久化
Message messagePbject = new Message(message.getBytes(), new MessageProperties());
messageObject.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);// 使用 RabbitTemplate 发送消息
rabbitTemplate.convertAndSend(Constant.ACK_EXCHANGE_NAME, "ack", messageObject);

  • RabbitMQ 默认情况下会将消息视为持久化,除非队列被声明为非持久化,或者消息在发送时被标记为非持久化
  • 我们也可以通过打印 Message 这个对象,来观察消息是否持久化
(Body:'consumer ack test...' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=true, receivedExchange=ack_exchange, receivedRoutingKey=ack, deliveryTag=2, consumerTag=amq.ctag-mtd-2Mec9zH2fXizRqVAqg, consumerQueue=ack_queue])
  • 将所有的消息都设置为持久化,会严重影响 RabbitMQ 的性能(随机)
  • 写入磁盘的速度比写入内存的速度慢得不只一点点,对于可靠性不是那么高的消息可以不采用持久化处理以提高整体的吞吐量
  • 在选择是否要将消息持久化时,需要在可靠性和吞吐量之间做一个权衡

数据丢失

将交换器、队列、消息都设置了持久化之后就能百分之百保证数据不丢失了吗?答案是否定的

  1. 从消费者来说,如果在订阅消息队列时将 autoAck 参数设置为 true,那么当消费者接收到相关消息之后,还没来得及处理就宕机了,这样也算数据丢失。这种情况很好解决,将 autoAck 参数设置为 false,并进行手动确认,详细可以参考(消息确认章节)

  2. 在持久化的消息正确存入 RabbitMQ 之后,还需要有一段时间(虽然很短,但是不可忽视)才能存入磁盘中。RabbitMQ 并不会为每条信息都进行同步存盘(调用内核的 fsync 方法)的处理,可能仅仅保存到操作系统缓存之中而不是物理磁盘之中。如果在这段时间内 RabbitMQ 服务器节点发生了宕机、重启等异常情况,消息保存还没来得及落盘,那么这些消息将会丢失


这个问题如何解决呢?

  1. 引入 RabbitMQ仲裁队列(后面会说),如果主节点(master)在此特殊时间内挂掉,可以自动切换到从节点(slave),这样有效地保证了高可用性,除非整个集群都挂掉

    • 此方法也不能保证 100% 可靠,但是配置了仲裁队列要比没有配置的可靠性要高很多,实际生产环境中的关键业务队列一般都会设置仲裁队列
  2. 还可以在发送端引入事务机制或者发送方确认机制来保证消息已经正确地发送病存储至 RabbitMQ 中(详情参考后面发送方确认)

重试机制

在消息传递过程中,可能会遇到各种问题,如网络故障,服务不可用,资源不足等,这些问题可能导致消息处理失败

为了解决这些问题,RabbitMQ 提供了重试机制,允许消息在处理失败后重新发送

但如果是程序逻辑引起的错误,那么多次重试也是没有用的,可以设置重试次数

1. 重试配置

spring:   rabbitmq:  addresses: amqp://guest:guest@127.0.0.1:5672/coding  listener:  simple:  acknowledge-mode: manual  # 消息接收确认  retry:  enabled: true  # 开启消费者失败重试  initial-interval: 5000ms  # 初始失败等待时长为 5s          max-attempts: 5  # 最大重试次数(包括自身消费的一次)

2. 配置交换机&队列

public static final String RETRY_EXCHANGE_NAME = "retry_exchange";  
public static final String RETRY_QUEUE = "retry_queue";
// 1. 交换机  
@Bean("retryExchange")  
public FanoutExchange retryExchange() {  return ExchangeBuilder.fanoutExchange(Constant.RETRY_EXCHANGE_NAME).durable(true).build();  
}  // 2. 队列  
@Bean("retryQueue")  
public Queue retryQueue() {  return QueueBuilder.durable(Constant.RETRY_QUEUE).build();  
}  // 3. 队列和交换机绑定 Binding@Bean("retryBinding")  
public Binding retryBinding(@Qualifier("retryExchange") FanoutExchange exchange, @Qualifier("retryQueue") Queue queue) {  return BindingBuilder.bind(queue).to(exchange);  
}

3. 发送消息

@RequestMapping("/retry")  
public String retry() {  rabbitTemplate.convertAndSend(Constant.RETRY_EXCHANGE_NAME, "", "retry test...");  return "发送成功";  
}

4. 消费消息

@Component  
public class RetryQueueListener {  // 指定监听队列的名称  @RabbitListener(queues = Constant.RETRY_QUEUE)  public void Listener(Message message) throws Exception {  System.out.printf("接收到消息:%s, deliveryTag: %d%n", new String(message.getBody(), "UTF-8"),  message.getMessageProperties().getDeliveryTag());  // 模拟处理失败  int num = 3 / 0;  System.out.println("处理完成");  }

5. 运行程序

运行程序,调用接口,发送消息: http://127.0.0.1:8080/producer/retry

[外链图片转存中…(img-7OSM9aK6-1752140853006)]

异常捕获

如果对异常进行捕获,那么就不会进行重试

  • 代码修改如下
System.out.printf("接收到消息:%s, deliveryTag: %d%n", new String(message.getBody(), "UTF-8"),  message.getMessageProperties().getDeliveryTag());  
// 模拟处理失败  
try {  int num = 3 / 0;  System.out.println("处理完成");  
} catch (Exception e) {  System.out.println("处理失败");  
}

重新运行程序,结果如下:[外链图片转存中…(img-7hevkq3m-1752140853007)]

6. 手动确认

改为手动确认


@RabbitListener(queues = Constant.RETRY_QUEUE)  
public void ListenerQueue(Message message, Channel channel) throws Exception{  long deliveryTag = message.getMessageProperties().getDeliveryTag();  try {  System.out.printf("接收到消息:%s, deliveryTag: %d%n", new String(message.getBody(), "UTF-8"),  message.getMessageProperties().getDeliveryTag());  // 模拟处理失败  int num = 3 / 0;  System.out.println("处理完成");  // 3. 手动签收  channel.basicAck(deliveryTag, true);  } catch (Exception e) {  // 4. 异常了就拒绝签收  Thread.sleep(1000);  // 第三个参数 requeue,是否重新发送。若为 true,则重发;若为 false,则直接丢弃  channel.basicNack(deliveryTag, true, true);  }  
}

运行结果:[外链图片转存中…(img-ne4pJWMZ-1752140853008)]

  • 可以看到,手动确认模式时,重试次数的限制不会像在自动确认模式下那样直接生效,因为是否重试以及何时重试更多的取决于应用程序的逻辑和消费者的实现

自动确认模式下RabbitMQ 会在消息被投递给消费者后自动确认消息。

  • 如果消费者处理消息时抛出异常,RabbitMQ 根据配置的重试参数自动将消息重新入队,从而实现重试
  • 重试次数和重试间隔等参数可以直接在 RabbitMQ 的配置中设定,并且 RabbitMQ 会负责执行这些重试策略

手动确认模式下,消费者需要显式地对消息进行确认。如果消费者在处理消息时遇到异常,可以选择不确认消息,使消息重新入队

  • 重试的控制权在于应用程序本身,而不是 RabbitMQ 的内部机制
  • 应用程序可以通过自己的逻辑和利用 RabbitMQ 的高级特性来实现有效的重试策略

使用重试机制时需要注意:

  1. 自动确认模式下:程序逻辑异常,多次重试还是失败,消息就会被自动确认,那么消息就丢失了
  2. 手动确认模式下:程序逻辑异常,多次重试消息依然处理失败,无法被确认,就一直是 unacked 的状态,导致消息积压

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/bicheng/92173.shtml
繁体地址,请注明出处:http://hk.pswp.cn/bicheng/92173.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

零基础人工智能学习规划之路

一、引言:为什么选择人工智能?人工智能(AI)是当前科技领域最炙手可热的方向之一,涵盖机器学习、深度学习、计算机视觉、自然语言处理等多个分支。无论是就业市场的高需求,还是技术改变生活的潜力&#xff0…

【科研绘图系列】R语言绘制误差棒图

文章目录 介绍 加载R包 数据下载 导入数据 数据预处理 画图 系统信息 参考 介绍 【科研绘图系列】R语言绘制误差棒图 加载R包 library(tidyverse) library(ggplot2) library(ggsignif) library(RColorBrewer) library(waterfalls) library(reshape2

期权定价全解析:从Black-Scholes到量子革命的金融基石

在金融市场中,期权定价如同航海中的罗盘,为风险定价提供方向。本文将深入剖析期权定价的核心逻辑、应用场景及量子计算带来的颠覆性变革,并附实战代码示例。 一、期权定价的本质:风险的时间价值 1. 核心公式解析 C = e^{-rT}\mathbb{E}^\mathbb{Q}[\max(S_T-K,0)] C:期权…

实现div内容的垂直居中

Flexbox 弹性盒子(推荐) div {display: flex;align-items: center; /* 垂直居中 */justify-content: center;/* 水平居中 */height: 300px; /* 需要指定高度 */ }✅ 现代浏览器首选方案,支持响应式布局 Grid 网格布局 div {displ…

Juc高级篇:可见性,有序性,cas,不可变,设计模式

目录 一.Java内存模型 1.可见性 1.1设计模式 (1.1.1)两阶段终止 (1.1.2)Balking模式 2.有序性 3.volatile原理 3.1保证可见性与有序性 3.2单例模式DCL 3.3 happens-before规则 4.线程安全单例 4.1饿汉式 二.无锁并发 1.原子整数 2.原子引用 2.1 AtomicReference…

JDK源码

java.util.concurrent 以下是atomic包下的 AtomicInteger Unsafe类:提供的方法可以直接访问内存、线程。 属性:Unsafe、int value 通过Unsafe方法中的CAS循环,保证int类型值的原子操作 int var5; do {var5 this.getIntVolatile(var1, var2);…

Linux网络编程【基于UDP网络通信的字典翻译服务】

1. 基本框架:前面我们已近完成了,基于UDP协议的网络通信,但是我们服务器接收到来自客户端的信息即字符串时只是进行了简单的发送会客户端和在日志中回显打印,并没有实际的业务服务。那么接下来,我们就设计一个字典翻译…

Quality Control II: Trimming (二):BBDuk

参考:BBDuk Guide - Archive 在我们了解了如何使用trimmomatic之后,我们开始进一步了解另外一种trim工具BBDuk 首先小编要声明:如果想要完全掌握一个工具是需要较长时间的钻研和学习的,这里呢只是提供BBDuk处理数据的基本逻辑和…

AlmaLinux8 平替 manylinux_2_28-python 的 GPG密钥管理、安装 cuda sdk

0. 下载 AlmaLinux 8 docker 镜像 https://hub.docker.com/r/almalinux/8-base/tags 下载镜像: sudo docker pull almalinux/8-base:8.4 创建一个容器: sudo docker run --gpus all -it --name cudaq_src_py_LHL_06 -v /home/hanmeimei/big…

BM1684X平台:Qwen-2-5-VL图像/视频识别应用

一、 简介 Qwen-2-5-VL 是阿里巴巴通义千问团队推出的多模态大语言模型(MLLM),属于 Qwen-2 系列模型的一部分,支持视觉(Vision)与语言(Language)的多模态交互。 1、特性 动态分辨…

前端项目工程化配置webpack与vite

webpack与vite一、了解 webpack入口(entry)输出(output)loader插件(plugin)模式(mode)二、项目中使用webpackvue项目react项目三、了解vite构建选项(build)模块解析(Resolve)模块处理(Module)服务器选项&am…

机器学习(3):KNN算法-分类

一、KNN算法 K-近邻算法(K-Nearest Neighbors,简称KNN),根据K个邻居样本的类别来判断当前样本的类别;如果一个样本在特征空间中的k个最相似(最邻近)样本中的大多数属于某个类别,则该类本也属于这个类别。一些距离&…

Redis Windows迁移方案与测试

我想将开源软件Redis的主程序和附属程序迁移到Windows平台,目前它只能在Linux上运行,让它可以在Windows 11和Windows Server 2025上运行,这需要考虑Linux操作系统和Windows操作系统的差异,请列举出将Redis在Linux系统上运行的GCC的…

信息安全概述--实验总结

数据链路层--ARP欺骗ARP欺骗原理XP2要与XP3通信,要发送ARP请求,询问XP3的MAC地址kali冒充XP3持续给XP2发送ARP应答,XP2会以为收到的MAC地址是XP3的,实际是kali的之后XP2发送的数据都是发给kali的如果说XP2需要想要访问互联网&…

【Electron】打包后图标不变问题,图标问题

windows上图标未更换。图标已经换了,但新打出的包或是安装后的 exe 图标没有更换。这个时候可以右击你的exe或是安装包点属性,看看图标是否正常,如果这里的图标正常,那其实就是成功的了。主要原因是因为 windows 图标缓存机制导致…

单词拆分 II

题目&#xff1a;思考&#xff1a; 本质上和单词拆分1没什么区别单词拆分1是问能不能拆单词拆分2是问把所有拆的方案列出来要列出所有方案&#xff0c;采用字典树回溯 实现&#xff1a; class Node { public:vector<Node*> check;bool isEnd;Node(int num){for (int i0;i…

国产三防平板电脑是什么?三防平板推荐

国产三防平板电脑&#xff0c;专为应对极端工作环境而生。这类设备集防水、防尘、防摔三大防护性能于一体&#xff0c;通过IP67/IP68防护认证及MIL-STD-810军规标准测试&#xff0c;能在建筑工地、油田勘探、应急救援等恶劣场景中稳定运行。其核心价值在于将消费级平板的智能体…

优思学院|什么是精益生产管理?原则与方法详述

在企业经营中&#xff0c;「利润&#xff1d;价格&#xff0d;成本」这条公式可谓家喻户晓。传统的成本思维通常认为价格由公司设定&#xff0c;而成本则是难以撼动的既定事实。然而&#xff0c;随着市场经济与自由定价机制的成熟&#xff0c;企业逐渐意识到——价格其实是由市…

【银行测试】银行票据项目业务+票据测试点分析(四)

目录&#xff1a;导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09; 前言 1、提示付款 功能…

基于华为开发者空间的Open WebUI数据分析与可视化实战

1 概述 1.1 案例介绍 本案例演示如何在华为开发者空间云主机上搭建Open WebUI环境&#xff0c;结合DeepSeek-R1模型进行数据分析、统计建模、数据可视化和业务洞察挖掘等实际数据科学任务。 1.2 适用对象 数据分析师业务分析师数据科学工程师市场研究人员统计学专业学生 1…