详解RabbitMQ高级特性之发送方确认机制

目录

发送方确认

添加配置

常量类

声明队列和交换机并绑定二者关系

confirm确认模式 

编写生产消息代码

生产消息1

解决方法

多次生产消息2

解决方法

生产消息3

return 模式

编写生产消息代码(路由正确)

生产消息1

编写生产消息代码(路由错误)

生产消息2

面试题


发送方确认

在使⽤ RabbitMQ的时候, 可以通过消息持久化来解决因为服务器的异常崩溃⽽导致的消息丢失, 但是还有⼀个问题, 当消息的⽣产者将消息发送出去之后, 消息到底有没有正确地到达服务器呢? 如果在消息到达服务器之前已经丢失(⽐如RabbitMQ重启, 那么RabbitMQ重启期间⽣产者消息投递失败), 持久化操作也解决不了这个问题,因为消息根本没有到达服务器,何谈持久化?

RabbitMQ为我们提供了两种解决⽅案:

a. 通过事务机制实现
b. 通过发送⽅确认(publisher confirm) 机制实现

事务机制⽐较消耗性能, 在实际⼯作中使⽤也不多, 下面主要介绍confirm机制来实现发送⽅的确认.

RabbitMQ为我们提供了两个⽅式来控制消息的可靠性投递:

1. confirm确认模式
2. return退回模式

添加配置
spring:application:name: rabbit-extensions-demorabbitmq:addresses: amqp://study:study@47.98.109.138:5672/extensionpublisher-confirm-type: correlated   #消息发送确认
常量类
public class Constants {//发送方确认public static final String CONFIRM_QUEUE = "confirm.queue";public static final String CONFIRM_EXCHANGE = "confirm.exchange";
}
声明队列和交换机并绑定二者关系
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import rabbitextensionsdemo.constant.Constants;@Configuration
public class RabbitMQConfig {//发送方确认@Bean("confirmQueue")public Queue confirmQueue(){return QueueBuilder.durable(Constants.CONFIRM_QUEUE).build();}@Bean("confirmExchange")public DirectExchange confirmExchange(){return ExchangeBuilder.directExchange(Constants.CONFIRM_EXCHANGE).build();}@Bean("confirmBinding")public Binding confirmBinding(@Qualifier("confirmQueue") Queue queue, @Qualifier("confirmExchange") Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with("confirm").noargs();}
}
confirm确认模式 

Producer 在发送消息的时候, 对发送端设置⼀个ConfirmCallback的监听, ⽆论消息是否到达
Exchange, 这个监听都会被执⾏, 如果Exchange成功收到, ACK( Acknowledge character , 确认字符)为true, 如果没收到消息, ACK就为false。

RabbitTemplate.ConfirmCallback 和 ConfirmListener 区别

在RabbitMQ中, ConfirmListener和ConfirmCallback都是⽤来处理消息确认的机制, 但它们属于不同的客⼾端库, 并且使⽤的场景和⽅式有所不同.
1. ConfirmListener 是 RabbitMQ Java Client 库中的接⼝. 这个库是 RabbitMQ 官⽅提供的⼀个直接与RabbitMQ服务器交互的客⼾端库. ConfirmListener 接⼝提供了两个⽅法: handleAck 和handleNack, ⽤于处理消息确认和否定确认的事件.
2. ConfirmCallback 是 Spring AMQP 框架中的⼀个接⼝. 专⻔为Spring环境设计. ⽤于简化与
RabbitMQ交互的过程. 它只包含⼀个 confirm ⽅法,⽤于处理消息确认的回调.
在 Spring Boot 应⽤中, 通常会使⽤ ConfirmCallback, 因为它与 Spring 框架的其他部分更加整合, 可以利⽤ Spring 的配置和依赖注⼊功能. ⽽在使⽤ RabbitMQ Java Client 库时, 则可能会直接实现ConfirmListener 接⼝, 更直接的与RabbitMQ的Channel交互

编写生产消息代码
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import rabbitextensionsdemo.constant.Constants;import java.util.Date;@RequestMapping("/producer")
@RestController
public class ProducerController {@Autowiredprivate RabbitTemplate confirmRabbitTemplate;@RequestMapping("/confirm")public String confirm() {confirmRabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println("执行了confirm方法");if (ack){System.out.printf("接收到消息, 消息ID: %s \n", correlationData==null? null: correlationData.getId());}else {System.out.printf("未接收到消息, 消息ID: %s, cause: %s \n", correlationData==null? null: correlationData.getId(), cause);//相应的业务处理}}});CorrelationData correlationData = new CorrelationData("1");confirmRabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE, "confirm", "confirm test...", correlationData);return "消息发送成功";}
}
public interface ConfirmCallback {
        /**
        * 确认回调
        * @param correlationData: 发送消息时的附加信息 , 通常⽤于在确认回调中识别特定的消
        * @param ack: 交换机是否收到消息 , 收到为 true, 未收到为 false
        * @param cause: 当消息确认失败时 , 这个字符串参数将提供失败的原因 . 这个原因可以⽤于调 试和错误处理 .
        * 成功时 , cause null
        */
        void confirm ( @Nullable CorrelationData correlationData, boolean ack,
        @Nullable String cause);
}

生产消息1

第一次生产消息

第二次生产消息

此时我们看到,第一次生产消息时能够正常生产消息,但是当我们第二次生产消息时却抛异常了,异常信息为:java.lang.IllegalStateException: Only one ConfirmCallback is supported by each RabbitTemplate

解决方法

是为什么呢?从异常信息中我们可以看到,ConfirmCallback只能被设置一次,但是从我们的代码中可以看到,我们每次生产消息时都会设置一次ConfirmCallback,显然这就是问题所在。

下面我们把刚刚的ConfirmCallback提取出来,重新设置RabbitTemplate。

RabbitTemplateConfig

import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitTemplateConfig {@Beanpublic RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);//设置回调方法rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println("执行了confirm方法");if (ack){System.out.printf("接收到消息, 消息ID: %s \n", correlationData==null? null: correlationData.getId());}else {System.out.printf("未接收到消息, 消息ID: %s, cause: %s \n", correlationData==null? null: correlationData.getId(), cause);//相应的业务处理}}});return rabbitTemplate;}
}

ProducerController

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import rabbitextensionsdemo.constant.Constants;@RequestMapping("/producer")
@RestController
public class ProducerController {@Autowiredprivate RabbitTemplate rabbitTemplate;@Autowiredprivate RabbitTemplate confirmRabbitTemplate;@RequestMapping("/pres")public String pres() {Message message = new Message("Presistent test...".getBytes(), new MessageProperties());//消息持久化message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);System.out.println(message);rabbitTemplate.convertAndSend(Constants.PRES_EXCHANGE, "pres", message);return "消息发送成功";}@RequestMapping("/confirm")public String confirm() {CorrelationData correlationData = new CorrelationData("1");confirmRabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE, "confirm", "confirm test...", correlationData);return "消息发送成功";}
}

生产消息

多次生产消息2

此时我们可以看到,我们解决了前面多次生产消息导致的ConfirmCallback被设置多次的问题,但是我们此时的代码就真的没有问题了吗?

当我们生产其它消息时,发现我们并没有给这个生产消息的方法设置ConfirmCallback啊,但是为什么在控制台上看到执行了我们设置的ConfrimCallback,这是为什么呢?

是因为我们在前面设置了RabbitTemplate,而且使用了@Autowired注解注入了RabbitTemplate,虽然我们注入了两个,一个是rabbitTemplate,一个是confirmRabbitTemplate,但是这两个都是同一个RabbitTemplate。

解决方法

解决办法:我们在RabbitTemplateConfig中设置两个RabbitTemplate.

import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitTemplateConfig {@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);return rabbitTemplate;}@Beanpublic RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);//设置回调方法rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println("执行了confirm方法");if (ack){System.out.printf("接收到消息, 消息ID: %s \n", correlationData==null? null: correlationData.getId());}else {System.out.printf("未接收到消息, 消息ID: %s, cause: %s \n", correlationData==null? null: correlationData.getId(), cause);//相应的业务处理}}});return rabbitTemplate;}
}

与此同时,我们修改注入方式:

此时,当再次使用/producer/pres来生产消息时,就没问题了。

生产消息3

下面我们修改一下生产消息时给消息设置的路由规则:

    @RequestMapping("/confirm")public String confirm() {CorrelationData correlationData = new CorrelationData("1");confirmRabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE, "confirm111", "confirm test...", correlationData);return "消息发送成功";}

生产消息

我们知道,上面生产消息时给消息设置的路由规则并不存在,按道理说,应该会打印“未收到消息”而非“收到消息”,原因是因为,上面的confirm确认模式是用来确定生产消息是否到达了交换机,而上面的路由规则是针对消息从交换机到队列的,解决上面的路由问题使用到另一种确认模式。

return 模式

消息到达Exchange之后, 会根据路由规则匹配, 把消息放⼊Queue中. Exchange到Queue的过程, 如果⼀条消息⽆法被任何队列消费(即没有队列与消息的路由键匹配或队列不存在等), 可以选择把消息退回给发送者. 消息退回给发送者时, 我们可以设置⼀个返回回调⽅法, 对消息进⾏处理。

修改RabbitTemplateConfig,设置消息退回的回调方法

import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitTemplateConfig {@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);return rabbitTemplate;}@Beanpublic RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);//设置回调方法rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println("执行了confirm方法");if (ack){System.out.printf("接收到消息, 消息ID: %s \n", correlationData==null? null: correlationData.getId());}else {System.out.printf("未接收到消息, 消息ID: %s, cause: %s \n", correlationData==null? null: correlationData.getId(), cause);//相应的业务处理}}});//消息被退回时, 回调方法rabbitTemplate.setMandatory(true);rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returned) {System.out.println("消息退回:"+returned);}});return rabbitTemplate;}
}

使⽤RabbitTemplate的setMandatory⽅法设置消息的mandatory属性为true(默认为false). 这个属性
的作⽤是告诉RabbitMQ, 如果⼀条消息⽆法被任何队列消费, RabbitMQ应该将消息返回给发送者, 此时 ReturnCallback 就会被触发。

回调函数中有⼀个参数: ReturnedMessage, 包含以下属性:

public class ReturnedMessage {
        //返回的消息对象,包含了消息体和消息属性
        private final Message message;
        //由 Broker 提供的回复码 , 表⽰消息⽆法路由的原因 . 通常是⼀个数字代码,每个数字代表不同 的含义 .
        private final int replyCode;
        //⼀个⽂本字符串 , 提供了⽆法路由消息的额外信息或错误描述 .
        private final String replyText;
        //消息被发送到的交换机名称
        private final String exchange;
        //消息的路由键,即发送消息时指定的键
        private final String routingKey;
}
编写生产消息代码(路由正确)
    @RequestMapping("/returns")public String returns() {CorrelationData correlationData = new CorrelationData("5");confirmRabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE, "confirm", "returns test...", correlationData);return "消息发送成功";}
生产消息1

编写生产消息代码(路由错误)
    @RequestMapping("/returns")public String returns() {CorrelationData correlationData = new CorrelationData("5");confirmRabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE, "confirm111", "returns test...", correlationData);return "消息发送成功";}
生产消息2

此时我们可以看到,队列中依旧是只有1条消息,而且代码执行了消息退回,而且消息退回时打印了消息信息,显然我们可以看到,消息的路由规则是错误的,不会入队列。

面试题

如何保证RabbitMQ消息的可靠传输?

从这个图中, 可以看出, 消息可能丢失的场景以及解决⽅案:

1. ⽣产者将消息发送到 RabbitMQ失败
        a. 可能原因: ⽹络问题等
        b. 解决办法: [发送⽅确认-confirm确认模式]
2. 消息在交换机中⽆法路由到指定队列:
        a. 可能原因: 代码或者配置层⾯错误, 导致消息路由失败
        b. 解决办法: [发送⽅确认-return模式]
3. 消息队列⾃⾝数据丢失
        a. 可能原因: 消息到达RabbitMQ之后, RabbitMQ Server 宕机导致消息丢失.
        b. 解决办法: [持久性]. 开启 RabbitMQ持久化, 就是消息写⼊之后会持久化到磁盘, 如果RabbitMQ 挂了, 恢复之后会⾃动读取之前存储的数据. (极端情况下, RabbitMQ还未持久化就挂了, 可能导致少量数据丢失, 这个概率极低, 也可以通过集群的⽅式提⾼可靠性)
4. 消费者异常, 导致消息丢失
        a. 可能原因: 消息到达消费者, 还没来得及消费, 消费者宕机. 消费者逻辑有问题.
        b. 解决办法: [消息确认]. RabbitMQ 提供了 消费者应答机制 来使 RabbitMQ 能够感知到消费者是否消费成功消息. 默认情况下消费者应答机制是⾃动应答的, 可以开启⼿动确认, 当消费者确认消费成功后才会删除消息, 从⽽避免消息丢失. 除此之外, 也可以配置重试机制, 当消息消费异常时, 通过消息重试确保消息的可靠性。

欢迎大家来访问我的主页----》链接

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/news/909738.shtml
繁体地址,请注明出处:http://hk.pswp.cn/news/909738.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Google Play开发者账号8.3/10.3政策违规自救指南

最近,有一位开发者焦急地向我们诉说,其辛苦开发的多个应用,毫无征兆地全部下架,账户提示违反政策 8.3 和 10.3。经过连夜排查,原来是换皮应用与误导性描述导致的问题。 这并非个例,在 2024 年,G…

pythonday50

作业: 1.好好理解下resnet18的模型结构 2.尝试对vgg16cbam进行微调策略 import torch import torch.nn as nn import torch.optim as optim import torchvision import torchvision.transforms as transforms from torchvision import models from torch.utils.d…

天猫618高增长背后:电商迈入价值战新周期

作者 | 曾响铃 文 | 响铃说 这次618,来“真”的了。 天猫618玩法变得极致简单,只设了“官方立减”的85折的基础优惠,再叠加行业品类券、国补等优惠,最高立减可达50%,十分直观。 让消费者省心的结果也是显而易见的&…

tauri+vue自动更新客户端打包配置

拉取最新代码打开项目根目录下"~.tauri\myapp.key"文件并复制内容 打开项目的powershell窗口,输入如下内容并回车 $env:TAURI_SIGNING_PRIVATE_KEY"复制的myapp.key" $env:TAURI_SIGNING_PRIVATE_KEY_PASSWORD""然后修改tauri.conf.…

硬件------51单片机

一.基本概念 1.裸机程序 BSP BSP:bord suppord pack 板级支持包 就是程序编写的内容是没有操作系统的,直接通过代码去控制寄存器,让硬件按照要求去工作。 主要内容:51单片机 IMAX6ULL 2.linux驱动部分 在裸机BSP程序的基础…

java 基础方法 list分页

新增一个list 泛型分类方法 hutools没这个方法, mybatis 里面的方法不好用 故新增此方法 package com.common.base.util.page;import lombok.Data;import java.util.List;/*** className: VoPage* description: list分页* author: chenyuanlong* date: 2025年6月16日 0016 上午…

操作系统期末复习--操作系统初识以及进程与线程

操作系统概念与主要功能 操作系统的概念 在信息化时代,软件是计算机系统的灵魂,而作为软件核心的操作系统,已与现代计算机系统密不可分、融为一体。计算机系统自下而上大致分为4部分:硬件、操作系统、应用程序和用户 操作系统管…

使用jhat查看dump.hprof文件内具体对象的属性值信息

jhat是JDK自带的堆转储分析工具,可以用来查看.hprof文件中对象的具体内容。本文演示使用的是JKD8. 一、启动jhat 执行启动命令。 jhat -J-Xmx4g your_heap_dump.hprof -J-Xmx4g表示为jhat分配4GB内存,根据你自己情况调整大小。your_heap_dump.hprof是…

freeRTOS之队列(queue)

一.概述 1.介绍 队列(queue)可以用于"任务到任务"、“任务到中断”、"中断到任务"直接传输信息。 2.核心功能 线程安全:自动处理多任务访问时的互斥问题。 数据复制:入队时复制数据(而非引用),…

【python】typing用法

一、基础类型提示 1. 基本类型注解 # 变量类型注解 age: int 30 name: str "Alice" is_student: bool False height: float 1.752. 函数注解 def greet(name: str, age: int) -> str:return f"Hello {name}, you are {age} years old!"二、组合类…

web前端开发核心基础:Html结构分析,head,body,不同标签的作用

前端技术协同关系 协作流程:HTML构建页面框架—>css美化样式(选择器属性)—>JavaScript实现交互(类似于python的脚本语言)扩展基础:在上面三项基础上学习Vue\React、构建工具WePack和浏览器工作原理…

精益数据分析(105/126):移动应用核心指标解析与用户分层营收策略

精益数据分析(105/126):移动应用核心指标解析与用户分层营收策略 在移动应用市场竞争白热化的今天,单纯追求下载量已无法保证商业成功,精细化运营核心指标成为盈利关键。本文将深入解析每日活跃用户平均营收&#xff…

被CC攻击了,对服务器有什么影响?

博客正文: 最近,不少网站管理员和运维人员反映遭遇了CC攻击,导致服务器性能异常甚至瘫痪。那么,CC攻击究竟会对服务器造成哪些影响?本文将为你简要解析CC攻击的原理及其带来的危害,帮助你更好地理解并应对…

Tensorflow安装出现dependency conflict错误

Python版本: 3.11.4 pip版本已升到最新 电脑上有mac的原装Python2.x,我装的3.11.4,还有个什么依赖的3.9 运行 pip3 install tensorflow 出现类似以下错误 (我报错的是另一个不是tensorflow—estimator,但基本就是…

2025年HTTP半开与错误攻击防御指南:原理拆解与实战防护

你以为限流就能防住HTTP攻击?黑客用协议畸形包AI调度正在撕裂传统防线! 一、HTTP半开攻击:慢速绞杀服务器资源 ▶ 攻击原理剖析 HTTP半开攻击(如Slowloris)是一种应用层DoS攻击,通过建立大量半开连接耗尽…

Mybatis(XML映射文件、动态SQL)

目录 基础操作 准备: 删除: 新增: 更新: 查询: 条件查询: XML映射文件 动态SQL if foreach sql&include 基础操作 准备: 准备数据库表 创建一个新的springboot工程&#xff0…

python校园拼团系统

目录 技术栈介绍具体实现截图系统设计研究方法:设计步骤设计流程核心代码部分展示研究方法详细视频演示试验方案论文大纲源码获取/详细视频演示 技术栈介绍 Django-SpringBoot-php-Node.js-flask 本课题的研究方法和研究步骤基本合理,难度适中&#xf…

多模态大语言模型arxiv论文略读(127)

When SAM2 Meets Video Camouflaged Object Segmentation: A Comprehensive Evaluation and Adaptation ➡️ 论文标题:When SAM2 Meets Video Camouflaged Object Segmentation: A Comprehensive Evaluation and Adaptation ➡️ 论文作者:Yuli Zhou, …

剑指offer32_二叉搜索树的后序遍历序列

二叉搜索树的后序遍历序列 输入一个整数数组,判断该数组是不是某二叉搜索树的后序遍历的结果。 如果是则返回true,否则返回false。 假设输入的数组的任意两个数字都互不相同。 数据范围 数组长度 [ 0 , 1000 ] [0,1000] [0,1000]。 样例 输入&…

《仿盒马》app开发技术分享-- 订单结合优惠券结算(端云一体)

技术栈 Appgallery connect 开发准备 上一节我们已经实现了优惠券的选择,并且成功的把券后的价格也展示给用户,不能使用的优惠券我们也用友好的方式告知用户,这一节我们来实现优惠券内容的下一步,优惠券内容结合订单进行结算提…