RabbitMQ 高级特性之消息分发

1. 为什么要消息分发

当 broker 拥有多个消费者时,就会将消息分发给不同的消费者,消费者之间的消息不会重复,RabbitMQ 默认的消息分发机制是轮询,但会无论消费者是否发送了 ack,broker 都会继续发送消息至消费者,这就会造成消费者压力增大。于是,可以限制消费者每一次接收到的消息的数量,当消息达到该数量时,broker 就不会继续给这个消费者发送消息,而是会给其他的消费者派送消息。

所消费者消费了一条消息,并且给 broker 发送了 ack,那么此时消费者所未消耗的消息就没有达到最大消息数量,于是 broker 就会继续给消费者分配消息,直到消费者未消费的消息数量达到上限。

对于这种特性,有下面两个应用场景:

1.1 限流

在某些秒杀场景中,每一秒消费者接收到的消息都会非常大,那么就会造成消费者压力过大。于是我们就可以限制消费者所能接收的最大消息数量。

配置代码如下:

spring:rabbitmq:listener:simple:acknowledge-mode: manualprefetch: 5 #每个队列最多接收五条消息

在配置中,prefetch 表示队列中的消息数量上限为 5 条,若队列中未确认的消息数量达到 5 条,此时 broker 就不会继续给该队列分配消息,而是给其它的未达到上限的队列分配消息。

并且此处要设置为手动确认,若使用 auto 或 none,可能业务逻辑还没有开始消息就已经被签收,这就无法发挥限流的作用。

队列、交换机声明代码如下:

    @Bean("qosQueue")public Queue qosQueue() {return QueueBuilder.durable(Constants.QOS_QUEUE).build();}@Bean("qoeExchange")public DirectExchange qoeExchange() {return ExchangeBuilder.directExchange(Constants.QOS_EXCHANGE).build();}@Bean("qosBind")public Binding qosBind(@Qualifier("qoeExchange") DirectExchange directExchange,@Qualifier("qosQueue") Queue queue) {return BindingBuilder.bind(queue).to(directExchange).with(Constants.QOS_ROUTINGKEY);}

生产者代码如下:

    @RequestMapping("/qos")public String qos() {for (int i = 0; i < 20; i++) {String messageInfo = "qos... " + i;rabbitTemplate.convertAndSend(Constants.QOS_EXCHANGE, Constants.QOS_ROUTINGKEY, messageInfo);}return "消息发送成功";}

消费者代码如下:

@Component
@Slf4j
public class QosListener {@RabbitListener(queues = Constants.QOS_QUEUE)public void listener1(Message message, Channel channel) throws IOException {String messageInfo = new String(message.getBody());long deliveryTag = message.getMessageProperties().getDeliveryTag();log.info("接收到消息: {}, deliveryTag: {}", messageInfo, deliveryTag);}}

此处,消费者没有给 broker 发送 ack,那么队列中的消息就会一直存在。

代码运行结果如下:

这里我们可以看到,一共有 5 条未确认的消息, 已经达到了上限,于是就不会继续向消费者发送消息。

1.2 负载均衡

使用消息分发,也可以实现负载均衡。

现有两个消费者 A、B,A 处理消息的速度慢,B处理消息的速度快。若不设置负载均衡,那么就会出现 A 积压的消息过多,而 B 几乎没有什么消息挤压,这就没有充分地利用资源。

于是我们可以将 prefetch 设置为 1,那么每次消费者只会接收到一条消息,当 A、B 接收到消息后,在 B 处理完成时 A 还没有处理完,于是 broker 就会给 B 继续推送消息,直到 A 处理完成后才会继续给 A 推送消息。

消费者代码如下:

@Component
@Slf4j
public class QosListener {/*** 消费者1* @param message* @param channel* @throws IOException*/@RabbitListener(queues = Constants.QOS_QUEUE)public void listener1(Message message, Channel channel) throws IOException {String messageInfo = new String(message.getBody());long deliveryTag = message.getMessageProperties().getDeliveryTag();log.info("消费者 1 接收到消息: {}, deliveryTag: {}", messageInfo, deliveryTag);try {Thread.sleep(2000);channel.basicAck(deliveryTag, false);} catch (IOException e) {channel.basicNack(deliveryTag, false, true);} catch (InterruptedException e) {throw new RuntimeException(e);}}/*** 消费者2* @param message* @param channel* @throws IOException*/@RabbitListener(queues = Constants.QOS_QUEUE)public void listener2(Message message, Channel channel) throws IOException {String messageInfo = new String(message.getBody());long deliveryTag = message.getMessageProperties().getDeliveryTag();log.info("消费者 2 接收到消息: {}, deliveryTag: {}", messageInfo, deliveryTag);try {Thread.sleep(1000);channel.basicAck(deliveryTag, false);} catch (IOException e) {channel.basicNack(deliveryTag, false, true);} catch (InterruptedException e) {throw new RuntimeException(e);}}
}

上述代码,将消费者 2 的处理速度是消费者 1 的两倍,代码运行结果如下:

可以看出,消费者  2 每消耗 2 条数据,消费者 1 才消耗 1 条数据。,也就达到了负载均衡的作用。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/diannao/90686.shtml
繁体地址,请注明出处:http://hk.pswp.cn/diannao/90686.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Linux操作系统从入门到实战:怎么查看,删除,更新本地的软件镜像源

Linux操作系统从入门到实战&#xff1a;怎么查看&#xff0c;删除&#xff0c;更新本地的软件镜像源前言一、 查看当前镜像源二、删除当前镜像源三、更新镜像源四、验证前言 我的Linux版本是CentOS 9 stream本篇博客我们来讲解怎么查看&#xff0c;删除&#xff0c;更新国内本…

两台电脑通过网线直连形成局域网,共享一台wifi网络实现上网

文章目录一、背景二、实现方式1、电脑A&#xff08;主&#xff09;2、电脑B3、防火墙4、验证三、踩坑1、有时候B上不了网一、背景 两台windows电脑A和B&#xff0c;想通过**微软无界鼠标&#xff08;Mouse without Borders&#xff09;**实现一套键盘鼠标控制两台电脑&#xf…

Java Reference类及其实现类深度解析:原理、源码与性能优化实践

1. 引言&#xff1a;Java引用机制的核心地位在JVM内存管理体系中&#xff0c;Java的四种引用类型&#xff08;强、软、弱、虚&#xff09;构成了一个精巧的内存控制工具箱。它们不仅决定了对象的生命周期&#xff0c;还为缓存设计、资源释放和内存泄漏排查提供了基础设施支持。…

华为云对碳管理系统的全生命周期数据处理流程

碳管理系统的全生命周期数据处理流程包含完整的数据采集、处理、治理、分析和应用的流程架构,可以理解为是一个核心是围绕数据的“采集-传输-处理-存储-治理-分析-应用”链路展开。以下是对每个阶段的解释,以及它们与数据模型、算法等的关系: 1. 设备接入(IoTDA) 功能: …

大模型安全风险与防护产品综述 —— 以 Otter LLM Guard 为例

大模型安全风险与防护产品综述 —— 以 Otter LLM Guard 为例 一、背景与安全风险 近年来&#xff0c;随着大规模预训练语言模型&#xff08;LLM&#xff09;的广泛应用&#xff0c;人工智能已成为推动文档处理、代码辅助、内容审核等多领域创新的重要技术。然而&#xff0c;…

1.2.2 计算机网络分层结构(下)

继续来看计算机网络的分层结构&#xff0c;在之前的学习中&#xff0c;我们介绍了计算机网络的分层结构&#xff0c;以及各层之间的关系。我们把工作在某一层的软件和硬件模块称为这一层的实体&#xff0c;为了完成这一层的某些功能&#xff0c;同一层的实体和实体之间需要遵循…

实训八——路由器与交换机与网线

补充——基本功能路由器&#xff1a;用于不同逻辑网段通信的交换机&#xff1a;用于相同逻辑网段通信的1.网段逻辑网段&#xff08;IP地址网段&#xff09;&#xff1a;IP地址的前三组数字代表不同的逻辑网段&#xff08;有限条件下&#xff09;&#xff1b;IP地址的后一组数字…

C++——构造函数的补充:初始化列表

C中&#xff0c;构造函数为成员变量赋值的方法有两种&#xff1a;构造函数体赋值和初始化列表。构造函数体赋值是在构造函数里面为成员变量赋值&#xff0c;如&#xff1a;class Data { public://构造函数体赋值Data(int year,int month,int day){_year year;_month month;_d…

代码随想录|图论|12岛屿周长

leetcode:106. 岛屿的周长 题目 题目描述 给定一个由 1&#xff08;陆地&#xff09;和 0&#xff08;水&#xff09;组成的矩阵&#xff0c;岛屿是被水包围&#xff0c;并且通过水平方向或垂直方向上相邻的陆地连接而成的。 你可以假设矩阵外均被水包围。在矩阵中恰好拥有…

开发制作模仿参考抄别人的小程序系统

很多老板看见别人公司的小程序系统界面好看&#xff0c;功能强大&#xff0c;使用人数多。就想要抄袭模仿参考别人家的小程序系统。想要了解一下有没有侵权风险&#xff0c;以及怎么开发制作开发制作模仿参考抄别人的小程序系统。首先回答第一个问题&#xff0c;只要你的小程序…

c语言中的数组IV

数组的集成初始化 集成初始化的定位 数组的大小 数组的赋值 不能直接将一个数组a赋值给数组b&#xff0c;只能通过遍历来实现 遍历数组 示例——检索元素在数组中的位置#include <stdio.h> int search(int key,int a[],int length); int main(void){int a[] {2,4,6,7,1,…

LDO选型

目录 一、最大输出电流 二、最大输入电压 三、最大功率&#xff1a;Pmax 四、负载动态调整率 五、输入电源纹波抑制比&#xff1a;PSRR 一、最大输出电流 参考TI LM1117IMPX-3.3/NOPB数据手册 由于LDO转换效率很低&#xff0c;LDO的标称最大电流 ≥ 实际最大负载电流 1…

飞算JavaAI:重构Java开发的“人机协同”新范式

目录一、从需求到架构&#xff1a;AI深度参与开发“顶层设计”1.1 需求结构化&#xff1a;自然语言到技术要素的准确转换1.2 架构方案生成&#xff1a;基于最佳实践的动态匹配二、编码全流程&#xff1a;从“手写代码”到“人机协同创作”2.1 复杂业务逻辑生成&#xff1a;以“…

解决SQL Server SQL语句性能问题(9)——SQL语句改写(7)

9.4.15. 消除join场景一 与Oracle等其他关系库类似,SQL Server中,join作为基本语法用于SQL语句中相关表之间的连接,有些场景中,join既可以增强SQL语句的可读性,同时,又可以提升SQL语句的性能,但有些场景中,join会导致CBO为SQL语句产生次优的查询计划,进而出现SQL语句…

深度学习-数据准备

一、数据准备 1.1定义 数据准备&#xff08;Data Preparation&#xff09; 是数据分析与机器学习流程中的核心环节&#xff0c;指将原始数据转换为适合分析或建模的结构化格式的过程。 1.2组成 数据准备主要由两个部分组成&#xff0c;一个是划分数据集&#xff0c;一个是构建…

IPA软件源预览系统源码(源码下载)

这是一款IPA软件源预览系统源码&#xff0c;搭建这个源码一定记住没有软件源的别搭建&#xff0c;因为你玩不明白&#xff0c;不是做IPA软件源的不要下载这套源码&#xff0c;简单的测试了&#xff0c;UI很舒服&#xff0c;喜欢的自行部署&#xff01; 源码下载&#xff1a;htt…

python 数据分析 单细胞测序数据分析 相关的图表,常见于肿瘤免疫微环境、细胞亚群功能研究 ,各图表类型及逻辑关系如下

这是一组 单细胞测序数据分析 相关的图表&#xff0c;常见于肿瘤免疫微环境、细胞亚群功能研究 &#xff0c;各图表类型及逻辑关系如下&#xff1a;使用kimi doubao 和deepseek &#xff0c;分析图标和pdf 豆包最好&#xff0c;用豆包分析| 图表类型 A、E&#xff08;堆叠柱状…

表达式索引海外云持久化实践:关键技术解析与性能优化

随着全球数字化转型加速&#xff0c;表达式索引技术正成为海外云服务商提升数据库性能的核心方案。本文将深度解析如何通过云原生架构实现索引持久化&#xff0c;对比主流云平台的技术实现差异&#xff0c;并给出跨国业务场景下的优化建议。 表达式索引海外云持久化实践&#x…

sprinboot团队任务管理系统 计算机毕业设计源码32322

摘 要 随着团队协作模式的日益多样化&#xff0c;传统的任务管理方法已无法满足现代团队对高效协作和任务分配的需求。因此&#xff0c;本研究通过引入信息化设计并实现了一套团队任务管理系统&#xff0c;旨在为管理员、成员用户和团长用户等用户提供高效、灵活的任务管理…

单链表,咕咕咕

1.引入单链表顺序表对于中间或者头部的删除&#xff0c;时间复杂度为O(N)&#xff0c;增容需要申请新的空间&#xff0c;拷贝数据&#xff0c;释放就空间&#xff0c;消耗。增容一般是2倍的增长&#xff0c;会有空间的浪费。为了解决这些问题&#xff0c;引入了单链表。2.单链表…