RabbitMQ 高级特性之消息确认

1. 简介

RabbitMQ 的消息发送流程:

  • producer 将消息发送给 broker,consumer 从 broker 中获取消息并消费

那么在这里就涉及到了两种消息发送,即 producer 与 broker 之间和 consumer 与 broker 之间。

“消息确认” 讨论的是 consumer 与 broker 之间的消息发送。

2. 为什么会有这个特性

当 broker 给 consumer 发送消息时,可能会出现下面两种情况:

  • 消息未成功到达 consumer;
  • 消息成功到达 consumer,但是 consumer 没有成功消费这条消息,如:在处理消息时发生异常等情况。

这时,就需要有一种解决方案,保证 broker 与 consumer 之间消息传输的可靠性,于是就有了消息确认这一特性。

3. 使用 RabbitMQ Java 时如何进行消息确认(不是重点)

public class ConsumerDemo1 {public static void main(String[] args) throws IOException, TimeoutException {//建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT);connectionFactory.setUsername(Constants.USERNAME);connectionFactory.setPassword(Constants.PASSWORD);connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();//创建信道Channel channel = connection.createChannel();//声明交换机channel.exchangeDeclare(Constants.DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT, true);//声明队列//如果队列不存在,就创建channel.queueDeclare(Constants.DIRECT_QUEUE1, true, false, false, null);//消费消息DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println(new String(body));}};channel.basicConsume(Constants.DIRECT_QUEUE1, true, consumer);}
}

这是一段路由模式的代码,在这段代码中,有下面一条语句:

channel.basicConsume(Constants.DIRECT_QUEUE1, true, consumer);

在这个方法中,有三个参数:

String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;
  • queue:consumer 通过哪个队列获取 broker 发送的消息;
  • autoAck:是否自动确认;
  • callback:consumer 消费消息的逻辑。

其中,autoAck 就是消息确认的体现:

  • autoAck 为 true:RabbitMQ 会将发送给 consumer 的消息视为已被成功接收和消费(consumer 可能并没有成功接收到或成功消费,但是 RabbitMQ 不管了),就会被将这条消息删除;
  • autoAck 为 false:当 RabbitMQ 发送消息后,并不会马上就将消息删除,而是会等 consumer 调用 Basic.Ack,收到 ack 后,才会将消息删除。

将 autoAck 设置为 false 后,若 broker 长时间没有收到 consumer 发送的 ack 且 consumer 已经断开连接,就会将这条消息重新入队列,继续发送给 consumer 进行消费,此时,队列中的消息就分为了两种:

  • 还未被发送的消息;
  • 已经发送了的消息,但是没有收到 ack 而重新入队列等待被消费。

4. 在 spring 中使用 RabbitMQ 时如何进行消息确认

4.1 basicAck

在 spring 下的 Channel 类中提供了下面几种方法:

void basicAck(long deliveryTag, boolean multiple) throws IOException;

在这个方法中,有三个参数:

  • deliveryTag:是 broker 给 consumer 发送消息的唯一标识,在一个 channel 中 deliveryTag 是唯一的;
  • mulitple: 是否批量确认

使用这个方法后,就会告知 broker 这条消息已经成功被消费,可以将其删除。

4.2 basicNack

void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;

 在这个方法中,多了一个参数:

  • requeue:是否重新入队列。

使用这个方法,就相当于给 broker 发送 nack,即这条消息没有被正确消费。

若 requeue 为 true,就会将这条消息重新入队列,继续给 consumer 消费;

若 requeue 为 false,broker 就会这条消息删除。

4.3 basicReject

void basicReject(long deliveryTag, boolean requeue) throws IOException;

这个方法与 basicNack 大致相同,此处省略。

4.4 配置

在 spring 中,提供了三种配置用于消息确认:

  • none:当消息发送给 consumer,不管 consumer 是否成功消费了消息,broker 都会当作这条消息被成功消费了,然后删除这条消息;
  • auto:在 consumer 处理消息时没有抛出异常时,就会确认消息,反之就不会确认,并且将消息重新放入队列中,进行下一次的消费;
  • manual:手动确认,我们需要在代码中指定这条消息是消费成功还是消费失败,分别使用 basicAck 和 basicNack。
spring:rabbitmq:listener:simple:acknowledge-mode: none

5. 代码测试

@RequestMapping("/producer")
@RestController
public class ProducerController {@Autowiredprivate RabbitTemplate rabbitTemplate;@RequestMapping("/ack")public String ack() {String messageInfo = "consumer ack mode test...";rabbitTemplate.convertAndSend(Constants.ACK_EXCHANGE, Constants.ACK_ROUTINGKEY, messageInfo);return "消息发送成功";}
}

这段代码代表的是一个 producer,下面接收到的消息都是通过这段代码发送的。

5.1 none

① 无异常时的消费者代码:

    @RabbitListener(queues = Constants.ACK_QUEUE)public void listener(Message message, Channel channel) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();log.info("接收消息: {}; deliveryTag: {}", new String(message.getBody()), deliveryTag);}

代码运行结果如下:

我们可以通过访问 RabbitMQ 客户端来观察这条消息是否成功被消费:

 

可以看到,Messages 这一列中,Ready 和 Unacked 都为 0,表示消息被成功消费。 

 ② 有异常时的消费者代码:

    @RabbitListener(queues = Constants.ACK_QUEUE)public void listener(Message message, Channel channel) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();int num = 1 / 0;log.info("接收消息: {}; deliveryTag: {}", new String(message.getBody()), deliveryTag);}

代码运行结果如下:

由于我们使用了除零操作,于是抛出了异常,我们可以通过访问 RabbitMQ 来观察这条消息是否被删除:

和上面一样,在 broker 中这条消息已经被删除,这与 none 配置性质一致。

5.2 auto 

① 无异常时的消费者代码:

@Component
@Slf4j
public class AckListener {@RabbitListener(queues = Constants.ACK_QUEUE)public void listener(Message message, Channel channel) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();log.info("接收消息: {}; deliveryTag: {}", new String(message.getBody()), deliveryTag);}}

代码运行结果如下:

在 RabbitMQ 客户端中显示,这条消息已经被成功消费:

 

② 有异常时的消费者代码:

@Component
@Slf4j
public class AckListener {@RabbitListener(queues = Constants.ACK_QUEUE)public void listener(Message message, Channel channel) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();int num = 1 / 0;log.info("接收消息: {}; deliveryTag: {}", new String(message.getBody()), deliveryTag);}}

 代码运行结果如下:

在运行结果中,一直会有报错产生,并且都是两个两个为一组,并且在报错信息中可以看到,producer 发送的消息一直在被消费,这是因为存在异常,就会导致这条消息一直在队列中,通过观察 RabbitMQ 客户端可以看出,这条消息依然保存在队列中:

5.3  manual

① 无异常的消费者代码如下:

@Component
@Slf4j
public class AckListener {@RabbitListener(queues = Constants.ACK_QUEUE)public void listener(Message message, Channel channel) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {log.info("接收消息: {}; deliveryTag: {}", new String(message.getBody()), deliveryTag);channel.basicAck(deliveryTag, false);} catch (Exception e) {channel.basicNack(deliveryTag, false, true);}}
}

在这段代码中,我们使用了 basicAck 和 basicNack 来进行消息确认,当消息处理成功后,就会执行 basicAck,告诉 broker 这条消息已经被成功消费,可以将其删除;当消息执行发生异常后,就会执行 basicNack,并且根据 requeue 参数决定如何处理这条消息。

代码运行结果如下:

RabbitMQ 客户端显示这条消息被成功消费:

 

 ② 有异常的消费者代码如下:

当 requeue 为 true:

@Component
@Slf4j
public class AckListener {@RabbitListener(queues = Constants.ACK_QUEUE)public void listener(Message message, Channel channel) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {log.info("接收消息: {}; deliveryTag: {}", new String(message.getBody()), deliveryTag);int n = 1 / 0;channel.basicAck(deliveryTag, false);} catch (Exception e) {channel.basicNack(deliveryTag, false, true);}}
}

在此处的 basicNack,将 requeue 设置为了 true,当消息处理失败后,就会将消息重新入队列,重新被消费:

我们可以看到,这条消息一直在被消费,并且 delivertTag 在递增。

并且从 RabbitMQ 客户端中可以看到,这条消息依然存在,等待被成功消费:

 当 requeue 为 false:

当处理消息发生异常后,就会将消息从队列中删除。

代码运行结果如下:

虽然异常依然存在,但是消息却没有重复发送,并且 RabbitMQ 中也将这条消息删除:

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/diannao/89704.shtml
繁体地址,请注明出处:http://hk.pswp.cn/diannao/89704.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【51单片机用数码管显示流水灯的种类是按钮控制数码管加一和流水灯】2022-6-14

缘由 #include "REG52.h" unsigned char code smgduan[]{0x3f,0x06,0x5b,0x4f,0x66,0x6d,0x7d,0x07,0x7f,0x6f,0x77,0x7c,0x39,0x5e,0x79,0x71,0,64}; //共阴0~F消隐减号 unsigned char Js0, miao0;//中断计时 秒 分 时 毫秒 sbit k0P3^0; sbit k1P3^1; void smxs(u…

Android15 开机动画播放结束之后如何直接启动应用

问题背景 软件版本:Android15 在一些需求场景里面,需要开机动画播放结束立马去启动一个应用,下面介绍如何实现这种方案。 解决方案 首选我们需要知道开机动画播放结束之后的流程,这里会调用到wms里面,也就是一些enableScreen之类的函数,知道这个大概流程之后,再去对应…

AI实践:大模型痛点和解决方案讨论

大家好,我是星野,欢迎来到我的CSDN博客。在这个技术日新月异的时代,我们一起学习,共同进步。 今天想和大家分享的是大模型在实际应用中的痛点以及解决方案,特别是RAG(检索增强生成)技术。 大模…

Web前端工程化

Web前端工程化 前端工程化是指将软件工程的方法和原则应用到前端开发中,以提高开发效率、保证代码质量、便于团队协作和项目维护的一套体系化实践。以下是前端工程化的主要内容和实践: 核心组成部分 1. 模块化开发 JavaScript模块化:Comm…

Java 原生 HTTP Client

​介绍 Java 原生 HttpClient 是从 Java 11 开始引入的标准库,用于简化 HTTP 请求的发送与响应处理。它支持同步和异步请求,并内置对 HTTP/1.1 和 HTTP/2 协议的支持。HttpClient 提供了易用的 API 来设置请求头、请求体、处理响应以及配置 SSL/TLS 加密…

【C语言刷题】第十天:加量加餐继续,代码题训练,融会贯通IO模式

🔥个人主页:艾莉丝努力练剑 ❄专栏传送门:《C语言》、《数据结构与算法》、C语言刷题12天IO强训、LeetCode代码强化刷题 🍉学习方向:C/C方向 ⭐️人生格言:为天地立心,为生民立命,为…

【WEB】Polar靶场 6-10题 详细笔记

六.jwt 这题我又不会写 先来了解下jwt **JWT(JSON Web Token)**是一种基于JSON的开放标准(RFC 7519),主要用于在网络应用环境间传递声明信息。JWT通常用于身份验证和信息交换,确保在各方之间安全地传输信…

高阶亚马逊运营秘籍:关键词矩阵打法深度解析与应用

当竞争对手还在为单个大词竞价厮杀时,头部卖家已悄然构建了一张覆盖数千长尾关键词的隐形网络,精准触达每一个细分需求,以更低的成本撬动更高的转化率在亚马逊流量红利消退、广告成本高企的2025年,传统“爆款关键词”打法已显疲态…

【问题解决】org.springframework.web.util.NestedServletException Handler dispatch failed;

详细异常信息: org.springframework.web.util.NestedServletException: Handler dispatch failed; nested exception is java.lang.NoClassDefFoundError: javax/xml/bind/DatatypeConverter at org.springframework.web.servlet.DispatcherServlet.doDispatch(Disp…

【已解决】mac 聚焦搜索设置了edge 的地址栏搜索为google,还是跳转到百度

问题详情:在macbook的聚焦搜索中点击edge搜索的时候,跳转到了百度,即使已经将地址栏的搜索引擎设置为了goole,但是还是会跳转到百度。解决方案:1、打开safari浏览器。(看清了,是打开Safari&…

MimicMotion 让你的图片动起来

MimicMotion 是由腾讯公司推出的一款人工智能人像动态视频生成框架。可以模仿视频动作再让图片模仿动作姿态,最后生成视频。 MimicMotion 的核心在于其置信度感知的姿态引导技术,确保视频帧的高质量和时间上的平滑过渡。 以前咱们也手搭过Animate-X让图…

云计算考核 - 分析电子银行需求采用微服务架构对系统进行设计

二、使用的技术以及分析 微服务(Microservices)是一种架构风格,一个大型复杂软件应用由一个或多个微服务组成。系统中的各个微服务可被独立部署,各个微服务之间是松耦合的。每个微服务仅关注于完成一件任务并很好地完成该任务。在…

Ionic 安装使用教程

一、Ionic 简介 Ionic 是一个基于 Web 技术(HTML、CSS、JavaScript)的跨平台移动应用开发框架,结合 Angular、React 或 Vue 可快速构建 iOS 和 Android 应用。Ionic 提供丰富的 UI 组件、命令行工具及原生插件封装,广泛用于混合应…

渗透测试 - 简介

Web渗透测试简介 Web渗透测试(Penetration Testing)是一种模拟黑客攻击的安全评估方法,旨在发现Web应用程序中的漏洞,帮助开发者修复问题并提升系统安全性。它涉及主动测试目标系统(如网站或API)的弱点&am…

云原生AI研发体系建设路径

当AI遇上云原生,就像咖啡遇上牛奶,总能擦出不一样的火花 ☕️ 📋 文章目录 引言:为什么要建设云原生AI研发体系整体架构设计:搭建AI研发的"乐高积木"技术栈选择:选择合适的"武器装备"…

【网络安全】深入理解 IoC 与 IoA:从“事后识别”到“事前防御”

1. 简介 在网络安全领域,IoC(Indicators of Compromise,入侵指标) 和 IoA(Indicators of Attack,攻击指标) 是两个核心概念。它们是安全分析师识别攻击行为、调查事件、制定防御策略的重要依据…

贪心专题练习

牛牛学括号题目要求每次操作必须删除一个左括号和一个右括号,且删除后序列仍需合法。合法的括号序列要求每个右括号之前必须有对应的左括号。分析输入的都是合法的括号,即左括号右括号,可利用这一点去解题注意:中间取模是必要的&a…

屏幕分辨率修改工具 SwitchResX(Mac电脑)

苹果电脑屏幕分辨率修改工具,SwitchResX for Mac,可以为您提供控制显示器分辨率所需的工具和功能。 原文地址:屏幕分辨率修改工具 SwitchResX(Mac电脑)

【Java编程动手学】Java中的数组与集合

文章目录 一、Java数组基础1.1 数组结构概述1.2 一维数组1.2.1 声明与初始化1.2.2 访问与修改元素1.2.3 数组遍历 1.3 二维数组1.3.1 声明与初始化1.3.2 访问与遍历 1.4 三维数组及更高维数组1.5 数组类(Arrays)1.5.1 常用方法 1.6 复制数组1.6.1 系统复制方法1.6.2 手动复制 二…

Linux在线安装docker

1.切换阿里云镜像源 备份原有 repo 文件 sudo mv /etc/yum.repos.d/CentOS-Base.repo /etc/yum.repos.d/CentOS-Base.repo.backup 下载阿里云的 CentOS 7 repo 文件 sudo curl -o /etc/yum.repos.d/CentOS-Base.repo http://mirrors.aliyun.com/repo/Centos-7.repo 清华 sudo…