SpringAMQP 的发布方确认

前言

这里的发布方确认是以 SpringAMQP 写的,之前我们在前面的篇章中就学过了 使用 Java 原生的SDK编写,当时是发布确认模式,在这里我们将用 Spring 集成的 rabbitmq 方法来编写

开启发布者确认机制需要进行下面的配置,以 yml 为例:

spring:rabbitmq:publisher-confirm-type: correlated   #消息发送确认

在使用RabbitMQ的时候,可以通过消息持久化来解决因为服务器的异常崩溃而导致的消息丢失,但是还有一个问题,当消息的生产者将消息发送出去之后,消息到底有没有正确地到达服务器呢?

如果在消息到达服务器之前已经丢失(比如RabbitMQ重启,那么RabbitMQ重启期间生产者消息投递失败),持久化操作也解决不了这个问题,因为消息根本没有到达服务器,何谈持久化?
RabbitMQ为我们提供了两种解决方案:
a.通过事务机制实现
b.通过发送方确认(publisherconfirm)机制实现

事务机制比较消耗性能,在实际工作中使用也不多,咱们主要介绍confirm机制来实现发送方的确认.
RabbitMQ为我们提供了两个方式来控制消息的可靠性投递
1.confirm确认模式
2. return退回模式

我们提前准备一下交换机和队列:

    //发送方确认public static final String CONFIRM_QUEUE = "CONFIRM_QUEUE";public static final String CONFIRM_EXCHANGE = "CONFIRM_EXCHANGE";public static final String CONFIRM_ROUTING_KEY = "CONFIRM_ROUTING_KEY";
    //发送方确认@Bean("confirmQueue")public Queue confirmQueue() {return QueueBuilder.durable(MQConstants.CONFIRM_QUEUE).build();}@Bean("confirmExchange")public Exchange confirmExchange() {return ExchangeBuilder.directExchange(MQConstants.CONFIRM_EXCHANGE).durable(true).build();}@Bean("confirmBinding")public Binding confirmBinding(@Qualifier("confirmExchange") Exchange exchange, @Qualifier("confirmQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with(MQConstants.CONFIRM_ROUTING_KEY).noargs();}

confirm

Producer在发送消息的时候,对发送端设置一个ConfirmCallback的监听,无论消息是否到达
Exchange,这个监听都会被执行,如果Exchange成功收到,ACK(Acknowledgecharacter,确认
字符)为true,如果没收到消息,ACK就为false.

confirm 模式是用于确认发送方的消息是否到达交换机

要启动 confirm ,我们还需要创建一个新的 RabbitTemplate,通过rabbitTemplate.setConfirmCallback 来进行设置,ack 有两种返回值,true和false,根据不同的返回值做不同的处理

@Configuration
public class RabbitTemplateConfig {/*** 构建原生的 RabbitTemplate* @param connectionFactory* @return*/@Bean("rabbitTemplate")public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {return new RabbitTemplate(connectionFactory);}/*** 构建发布方确认的 RabbitTemplate* @param connectionFactory* @return*/@Bean("confirmRabbitTemplate")public RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);//设置回调方法rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback(){@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println("调用了 confirm 方法");if(ack){System.out.printf("消息发送成功,消息ID:%s \n", correlationData == null ? null : correlationData.getId());}else {System.out.printf("消息发送失败, 消息ID:%s, cause: %s \n", correlationData == null ? null :correlationData.getId(), cause);//响应的业务处理,如重发消息等等...}}});return rabbitTemplate;}
}

这里如果项目需要使用到原始的 RabbitTemplate 的话,需要创建一个,因为在你自定义创建过 RabbitTmeplate 的时候,Spring 就不会自动帮你创建原始的 RabbitTemplate 了。

参数介绍:

public void confirm(CorrelationData correlationData, boolean ack, String cause)

correlationData是消息的唯一标识,也就说序列号,我们可以通过生产者那边进行设置,注意这里和消费者那边的 deliveryTag 是不一样的,correlationData是生产者使用的,deliveryTag 是队列创建的

ack 是否确认消息达到
cause 存储消息发送失败的原因

我们来演示消息发送失败的场景:

    @RequestMapping("/confirm")public String confirm() {for (int i = 0; i < 10; i++) {CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());confirmRabbitTemplate.convertAndSend(MQConstants.CONFIRM_EXCHANGE, "hello", "confirm: " + i, correlationData);}return "消息发送成功";}

我们将 routingkey 设置错误,观察效果:
在这里插入图片描述
我们会发现消息发送成功,这是因为消息确实达到的指定的交换机,但是我们查看队列的时候是没有消息的,因此进一步验证了 confirm 模式只是确认消息是否达到交换机,并不能确认是否到达指定的队列


接下来我们来指定一个不存在的交换机:

    @RequestMapping("/confirm")public String confirm() {for (int i = 0; i < 10; i++) {CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());confirmRabbitTemplate.convertAndSend("hello 85246", MQConstants.CONFIRM_ROUTING_KEY, "confirm: " + i, correlationData);}return "消息发送成功";}

在这里插入图片描述
在 cause 的打印信息中:得知队列不存在,因此 confirm 确实可以验证消息有没有达到指定的交换机里

returns

消息到达Exchange之后,会根据路由规则匹配,把消息放入Queue中.Exchange到Queue的过程,如果一条消息无法被任何队列消费(即没有队列与消息的路由键匹配或队列不存在等),可以选择把消息退回给发送者,消息退回给发送者时,我们可以设置一个返回回调方法,对消息进行处理

简单来说,returns 是确认交换机上的消息是否成功到达队列上的

    @Bean("returnRabbitTemplate")public RabbitTemplate returnRabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setMandatory(true);rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returned) {System.out.println("调用了 returnedMessage 方法");System.out.printf("消息被退回:%s \n", returned);}});return rabbitTemplate;}

使用RabbitTemplate的setMandatory方法设置消息的mandatory属性为true(默认为false).这个属性的作用是告诉RabbitMQ,如果一条消息无法被任何队列消费,RabbitMQ应该将消息返回给发送者,此时ReturnCallback就会被触发.

如果我们指定一个不存在的 routingkey,那就没有队列能接收到消息,就会触发ReturnsCallback方法
在这里插入图片描述

合并使用

当然这两个模式是可以一起使用的,代码演示:

    /*** 构建发布方确认的 RabbitTemplate* @param connectionFactory* @return*/@Bean("confirmRabbitTemplate")public RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);//设置回调方法rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback(){@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println("调用了 confirm 方法");if(ack){System.out.printf("消息发送成功,消息ID:%s \n", correlationData == null ? null : correlationData.getId());}else {System.out.printf("消息发送失败, 消息ID:%s, cause: %s \n", correlationData == null ? null :correlationData.getId(), cause);//响应的业务处理,如重发消息等等...}}});//设置消息退回回调rabbitTemplate.setMandatory(true);rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returned) {System.out.println("调用了 returnedMessage 方法");System.out.printf("消息被退回:%s \n", returned);}});return rabbitTemplate;}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/bicheng/96250.shtml
繁体地址,请注明出处:http://hk.pswp.cn/bicheng/96250.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

一套自用的git提交规范,可清晰的识别到关联的任务/bug

分享一套自用的git提交规范&#xff0c;可清晰的识别到关联的任务/bug 一、提交信息的基本结构 推荐使用约定式提交的一种变体&#xff0c;结构如下&#xff1a; <类型>(<范围>): <主题> [#<禅道-ID>]<正文>&#xff08;可选&#xff09;<脚注…

从音频到文本实现高精度离线语音识别

会议频繁&#xff0c;记录繁琐&#xff1f;语音转换成文字工具价格高昂&#xff0c;自己手动整理又耗时费力&#xff1f; 它支持本地离线运行&#xff0c;无需联网&#xff0c;所有数据留在本地&#xff0c;隐私安全毫无顾虑&#xff0c;同时它的功能是实时语音转文字&#xf…

SpringMVC 工作原理

SpringMVC 工作原理 SpringMVC 是 Spring 框架中用于构建 Web 应用的核心模块&#xff0c;其工作流程围绕 “前端控制器&#xff08;DispatcherServlet&#xff09;” 展开&#xff0c;通过组件间的协作完成请求处理与响应。理解其工作原理是掌握 SpringMVC 开发的关键&#xf…

HoRain云--Python机器学习神器:Sklearn全解析

&#x1f3ac; HoRain云小助手&#xff1a;个人主页 &#x1f525; 个人专栏: 《Linux 系列教程》《c语言教程》 ⛺️生活的理想&#xff0c;就是为了理想的生活! ⛳️ 推荐 前些天发现了一个超棒的服务器购买网站&#xff0c;性价比超高&#xff0c;大内存超划算&#xff01;…

疯狂星期四文案网第64天运营日记

网站运营第64天&#xff0c;点击观站&#xff1a; 疯狂星期四 crazy-thursday.com 全网最全的疯狂星期四文案网站 运营报告 今日访问量 今日搜索引擎收录情况

设计一个 AB 测试平台

1. 需求明确化 功能需求实验管理 创建、编辑、删除、复制实验设置实验参数&#xff08;变体、权重、目标指标、时长等&#xff09;实验状态管理&#xff08;草稿、运行中、已结束&#xff09;用户分流与分配 支持多种分流策略&#xff08;随机分配、分层分配、定向分配&#xf…

HiCMAE 论文复现:基于 RAVDESS 数据集的音视频情感识别

HiCMAE 论文复现:基于 RAVDESS 数据集的音视频情感识别 1. 项目背景与论文概述 1.1 多模态情感识别背景 多模态情感识别是人工智能领域的重要研究方向,旨在通过结合多种感知模态(如音频、视频、文本等)来更准确地识别人类情感状态。与传统单模态方法相比,多模态方法能够…

HarmonyOS 数据处理性能优化:算法 + 异步 + 分布式实战

摘要 不管是写 App&#xff0c;还是做 IoT 设备开发&#xff0c;数据处理都是绕不开的主题。你可能要处理几百条传感器数据&#xff0c;也可能要应对几十万条用户行为日志。如果算法不够高效&#xff0c;应用就会卡顿甚至直接崩溃。尤其是在 HarmonyOS&#xff08;鸿蒙系统&…

华为麒麟操作系统运维常见知识点

1.开放root账号密码登录。(1)修改/etc/ssh/sshd_config文件中&#xff0c;PermitRootLogin 属性值为yes。PermitRootLogin yes(2)使用passwd命令设置root密码。sudo su 切换到root账户下&#xff0c;使用passwd 设置密码。(3)重启sshd服务。systemctl restart sshd2.避免使用ch…

嵌入式面试|MCU+RTOS技术栈——面试八股文整理3:STM32

目录 1.单片机启动流程 2.看门狗 3.最小系统 4.ROM、RAM、Flash 5.EPROM、EEPROM 6.Bootloader与OTA 7.NAND FLASH 和NOR FLASH 相同点 区别 适用场景 8.CPU、MPU、MCU、SOC、SOPC 9.交叉编译 10.寄存器 寄存器的作用 寄存器与内存的区别 11.Cortex-M3寄存器组…

用 Wisdom SSH 轻松实现服务器自动化任务调度

用Wisdom SSH轻松实现服务器自动化任务调度 在服务器管理工作中&#xff0c;自动化任务调度至关重要&#xff0c;它能让系统在特定时间自动执行预设任务&#xff0c;极大提升运维效率。Wisdom SSH作为一款具备AI助手的强大工具&#xff0c;为自动化任务调度带来便捷解决方案。 …

远场学习_FDTD_dipole(1)

项目4.4 Reflection calculation using a dipole source在此页面中&#xff0c;我们采用了一种不同于标准平面波源方法的替代模拟设置&#xff0c;使用偶极子源来计算多层堆叠结构的反射。在此情况下&#xff0c;我们使用空气 - 玻璃界面。这种技术很有吸引力&#xff0c;因为它…

机器学习入门,用Lima在macOS免费搭建Docker环境,彻底解决镜像与收费难题!

国内用户必看】用Lima在macOS免费搭建Docker环境&#xff0c;彻底解决镜像与收费难题&#xff01; 为了在不同操作系统有一致操作体验&#xff0c;我选择使用docker技术安装ollama&#xff0c;这样子还有一个好处&#xff0c;即使玩坏了&#xff0c;删除重建即可&#xff0c;所…

Java 生成签名证书

在Java中生成签名证书通常涉及到使用Java密钥和证书管理工具&#xff08;Java Key and Certificate Management API&#xff0c;即Java Keytool&#xff09;。这个过程通常包括创建密钥库&#xff08;KeyStore&#xff09;&#xff0c;生成密钥对&#xff08;Key Pair&#xff…

语法分析:编译器中的“语法警察”

在编程的世界里&#xff0c;每一种编程语言都有自己的语法规则。就像中文有标点符号和语序规则一样&#xff0c;编程语言也有自己严格的语法规则。语法分析器就像一个严格的"语法警察"&#xff0c;它的职责是检查源代码是否符合语言规范&#xff0c;同时为后续的处理…

容器的定义及工作原理

定义 你可以把 容器 想象成一个 “打包好的软件快递箱”。 里面有什么&#xff1f; 这个箱子里不仅装着你的软件&#xff08;比如一个网站程序&#xff09;&#xff0c;还把软件运行所需要的所有东西都打包进去了&#xff0c;比如它需要哪个版本的Python、需要哪些系统文件、配…

云服务扫盲笔记(2) —— SLS 接入与设置自动化

前篇我们学习了SLS的核心用途及概念&#xff0c;本篇以将一个linux服务器的json格式日志接入阿里云SLS为例&#xff0c;继续学习SLS接入中的关键设置及注意事项&#xff0c;以及如何将其实现简单自动化快速操作。 一、 SLS 日志接入流程 [1] 准备工作&#xff08;确定日志路径…

LwIP入门实战 — 6 LwIP 网络数据包

目录 6.1 pbuf结构体 6.2 pbuf 的类型 6.2.1 PBUF_RAM 类型的pbuf 6.2.2 PBUF_POOL 类型的pbuf 6.2.3 PBUF_ROM 和 PBUF_REF 类型pbuf 6.3 pbuf 6.3.1 pbuf_alloc() 6.3.2 pbuf_free() 6.4 其它pbuf 操作函数 6.5 网卡中使用的 pbuf 6.5.1 low_level_output() 6.5.…

【已解决】Linux中程序脚本可以手动执行成功,但加在rc.local中不能开机自启

之前开发遇到的一个问题&#xff1a;在Linux中&#xff0c;明明程序脚本可以手动执行成功&#xff0c;但加到开机自启动里&#xff0c;却会失败&#xff0c;属实让人摸不着头脑。 问题排查&#xff1a; 有以下几种可能&#xff1a; 自启动脚本&#xff0c;执行权限不足或者脚本…

切块、清洗、烹饪:RAG知识库构建的三步曲

嘿&#xff0c;各位AI技术爱好者们&#xff0c;你是不是经常遇到这样的情况&#xff1a;辛辛苦苦训练的AI助手&#xff0c;面对专业问题时却"一问三不知"或者"胡言乱语"&#xff1f;明明你已经喂了它一堆PDF和Word文档&#xff0c;为啥它就是不会用&#x…