事务设置和消息分发

事务

RabbitMQ是基于AMQP协议实现的,该协议实现了事务机制,因此RabbitMQ也支持事务机制.

SpringAMQP也提供了对事务相关的操作,RabbitMQ事务允许开发者确保消息的发送和接收是原子性的,要么
全部成功,要么全部失败.|

前期准备工作:

    //事务public static final String TRANS_QUEUE = "TRANS_QUEUE";public static final String TRANS_EXCHANGE = "TRANS_EXCHANGE";public static final String TRANS_KEY = "TRANS_KEY";
    //事务@Bean("transQueue")public Queue transQueue() {return QueueBuilder.durable(MQConstants.TRANS_QUEUE).build();}@Bean("transExchange")public Exchange transExchange() {return ExchangeBuilder.directExchange(MQConstants.TRANS_EXCHANGE).build();}@Bean("transBinding")public Binding transBinding(@Qualifier("transExchange") Exchange exchange, @Qualifier("transQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with(MQConstants.TRANS_KEY).noargs();}

配置事务管理器

@Configuration
public class TransactionConfig {@Beanpublic RabbitTransactionManager transactionManager(CachingConnectionFactory connectionFactory) {return new RabbitTransactionManager(connectionFactory);}
}

添加 @Transactional

如果不添加 @Transactional,我们的事务管理器是不会在这个代码上生效的

    @Transactional@RequestMapping("/trans")public String trans() {rabbitTemplate.convertAndSend(MQConstants.TRANS_EXCHANGE, MQConstants.TRANS_KEY, "trans1 ");int n = 10 / 0;rabbitTemplate.convertAndSend(MQConstants.TRANS_EXCHANGE, MQConstants.TRANS_KEY, "trans2 ");return "消息发送成功";}

经过观察,我们可以看到这两条消息要么同时发送成功,要么同时发送失败

消息分发

RabbitMQ队列拥有多个消费者时,队列会把收到的消息分派给不同的消费者,每条消息只会发送给订阅列表里的一个消费者,这种方式非常适合扩展,如果现在负载加重,那么只需要创建更多的消费者来消费处理消息即可。

默认情况下,RabbitMQ是以轮询的方法进行分发的,而不管消费者是否已经收到消费并已经确认了消息,这种方式是不太合理的,试想一下,如果某些消费者消费速度慢,而某些消费者消费速度快,就可能会导致某些消费者消息积压,某些消费者空闲,进而应用整体的吞吐量下降。

如何处理呢?我们可以使用前面章节讲到的**channel.basicQos(intprefetchCount)**方法,来限制当前信道上的消费者所能保持的最大未确认消息的数量比如:消费端调用了channelbasicQos(5),RabbitMQ会为该消费者数,发送一条消息计数+1,消费一条消息计数-1,当达到了设定的上限,RabbitMQ就不会再向它发送消息了,直到消费者确认了某条消息。

类似TCP/IP中的"滑动窗口".
prefetchCount设置为0时表示没有上限。
basicQos对拉模式的消费无效

限流和负载均衡

配置信息:

    listener:simple:acknowledge-mode: manual # 设置确认模式prefetch: 5 

前期准备:

常量类:

    //限流public static final String QOS_QUEUE = "QOS_QUEUE";public static final String QOS_EXCHANGE = "QOS_EXCHANGE";public static final String QOS_KEY = "QOS_KEY";

声明:

    //限流@Bean("qosQueue")public Queue qosQueue() {return QueueBuilder.durable(MQConstants.QOS_QUEUE).build();}@Bean("qosExchange")public Exchange qosExchange() {return ExchangeBuilder.directExchange(MQConstants.QOS_EXCHANGE).build();}@Bean("qosBinding")public Binding qosBinding(@Qualifier("qosExchange") Exchange exchange, @Qualifier("qosQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with(MQConstants.QOS_KEY).noargs();}

生产者:

    @RequestMapping("/qos")public String qos() {for (int i = 0; i < 20; i++) {rabbitTemplate.convertAndSend(MQConstants.QOS_EXCHANGE, MQConstants.QOS_KEY, "qos");}return "消息发送成功";}

如果我们不进行手动确认,观察最大未确认的消息接收量:

在这里插入图片描述
在这里插入图片描述

可以得知消费者最大接收到的未确认消息数量为 我们设置的 prefetch 值


我们可以通过限流这种方式实现负载均衡

@Component
public class QosListener {@RabbitListener(queues = MQConstants.QOS_QUEUE)public void handle1(String messageContent, Channel channel, Message message) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {System.out.println("消费消息:" + messageContent);Thread.sleep(10000);channel.basicAck(deliveryTag, false);} catch (Exception e) {channel.basicNack(deliveryTag, false, false);}}@RabbitListener(queues = MQConstants.QOS_QUEUE)public void handle2(String messageContent, Channel channel, Message message) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {System.out.println("消费消息:" + messageContent);Thread.sleep(5000);channel.basicAck(deliveryTag, false);} catch (Exception e) {channel.basicNack(deliveryTag, false, false);}}
}

可以观察到消费能力强的队列会持续消费消息,消费能力弱的队列消费的消息会相对较少

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/web/96111.shtml
繁体地址,请注明出处:http://hk.pswp.cn/web/96111.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Python 中 try / except / else / finally 异常处理详解

1. 基本结构 try:# 可能会抛出异常的代码 except SomeException as e:# 捕获并处理异常 else:# 如果 try 中代码没有异常&#xff0c;就执行这里 finally:# 无论是否发生异常&#xff0c;最后都会执行这里2. 各部分的作用 try 用途&#xff1a;包含可能发生异常的代码段。如果代…

冰火岛 Tech 传:Apple Foundation Models 心法解密(下集)

引子 上集说到冰火岛冰屋内,谢逊、张翠山、殷素素三人亲见 “指令(Instructions)” 如何让 AI 脱胎换骨,从木讷报地名的 “愣头青”,变身为文采斐然的 “旅行作家”。 正当素素惊叹这 AI 武学的奇妙时,谢逊却突然神色一凛,指着手腕上用冰屑刻的 “4096” 字样道:“这等…

Qt信号与槽机制全面解析

✨ 1. 核心概念信号与槽是Qt独创的一种对象间通信机制&#xff0c;它使得一个对象的状态变化或事件发生能够自动通知其他对象作出响应&#xff0c;从而实现高度解耦的代码设计。1.1 信号&#xff08;Signals&#xff09;定义&#xff1a;信号是由对象在特定事件发生时发出&…

2025年COR SCI2区,基于近似细胞分解的能源高效无人机路径规划问题用于地质灾害监测,深度解析+性能实测

目录1.摘要2.问题描述与数学模型3.能源网格混合元启发式算法4.结果展示5.参考文献6.代码获取7.算法辅导应用定制读者交流1.摘要 本文提出了一种能源高效的无人机路径规划方法&#xff08;EURP&#xff09;用于监测分散的地质灾害易发区域&#xff0c;通过建立无人机飞行模式的…

LangChain关于提示词的几种写法

上文中 <---->(可点击移步查阅),我们已经了解到LangChain如何编写调用大模型的一个基本框架, 在编写完这个基本框架后,就要与大模型进行交互了,这时候,就需要我们编写提示词了, 那么以下,就会介绍提示词的几种编写方式 1 可携带变量的提示词 1.1 PromptTempla…

CyberPoC 是一个现代化的网络安全练习和竞赛平台,支持容器化部署的安全挑战,为用户提供实践网络安全技能的环境。

工具介绍 CyberPoC 是一个现代化的网络安全练习和竞赛平台&#xff0c;支持容器化部署的安全挑战&#xff0c;为用户提供实践网络安全技能的环境。 &#x1f680; 快速开始 使用 Docker Compose (推荐) 克隆项目 git clone https://github.com/dushixiang/cyberpoc.gitcd cy…

【单片机day03】

UART串口通用异步收发器&#xff0c;2个串口(1个串口被用于ISP下载程序&#xff0c;1个串口被用于和主机之间的通信)全双工&#xff08;两根线TX、RX&#xff09;、串行、异步通信方式&#xff1a;单工&#xff1a;发送方和接收方是固定的&#xff0c;数据只能从发送方 发给 …

react16到react19更新及底层实现是什么以及区别

好问题 &#x1f44d;。React 16 → 19 的更新跨度其实涵盖了 React 的几个“时代级”的演进&#xff1a;从 Fiber 架构 的重写、到 并发特性 的引入&#xff0c;再到 React Server Components 和 现代语法支持。我帮你梳理下主要更新点和底层实现的区别&#xff0c;方便你系统…

【分享】基于百度脑图,并使用Vue二次开发的用例脑图编辑器组件

偶然间发现了这个项目&#xff0c;分享出来。 【分享】基于百度脑图&#xff0c;并使用Vue二次开发的用例脑图编辑器组件1 项目地址2 项目简介3 项目部署3.1 安装node和npm3.2 项目下载3.3 修改npm镜像源3.4 部署4 项目中使用1 项目地址 基于百度脑图&#xff0c;并使用Vue二次…

Kotlin中抽象类和开放类

抽象类 (Abstract Class) 定义和特点 抽象类使用 abstract 关键字声明&#xff0c;是一种不能被直接实例化的特殊类&#xff0c;主要用于被其他类继承。 abstract class Base {open fun f() {} }abstract class Derived : Base() {override abstract fun f() // 抽象成员在类中…

TensorFlow深度学习实战(37)——深度学习的数学原理

TensorFlow深度学习实战&#xff08;37&#xff09;——深度学习的数学原理0. 前言1. 反向传播历史2. 微积分相关概念2.1 向量2.2 导数和梯度2.3 梯度下降2.4 链式法则2.5 常用求导公式2.6 矩阵运算3. 激活函数4. 反向传播4.1 前向计算4.2 反向传播5. 交叉熵及其导数6. 批量梯度…

1.1 汽车运行滚动阻力

汽车运行阻力由4部分构成&#xff1a;滚动阻力、空气阻力、坡度阻力、加速阻力。 1).汽车在水平道路上等速行驶时&#xff0c;必须克服来自地面的滚动阻力和来自空气的空气阻力。 2). 当汽车在坡道上上坡行驶时&#xff0c;还必须克服重力沿坡道的分力&#xff0c;称为坡度阻…

e203000

1&#xff09;①BIU作为核心通信枢纽&#xff0c;主要承担两大功能&#xff1a;一是连接处理器核内的关键执行单元&#xff08;包括IFU、LSU和EAI协处理器&#xff09;&#xff0c;统一管理指令和数据的内部传输路径&#xff1b;二是作为"核内计算"与"核外资源&…

Infortrend普安科技IEC私有云平台VM解决方案

Infortrend企业云&#xff08;IEC&#xff09;内置Hypervisor运行VM。功能完整、无需额外付费。在本文中&#xff0c;我们将为您详细介绍IEC是如何支持 VM的。市场现状与挑战市场现状 虚拟化市场面临转型&#xff0c;主流厂商&#xff08;如 VMware&#xff09;改用订阅制…

【代码随想录算法训练营——Day6(Day5周日休息)】哈希表——242.有效的字母异位词、349.两个数组的交集、202.快乐数、1.两数之和

LeetCode题目链接 https://leetcode.cn/problems/valid-anagram/ https://leetcode.cn/problems/intersection-of-two-arrays/ https://leetcode.cn/problems/happy-number/ https://leetcode.cn/problems/two-sum/ 题解 242.有效的字母异位词 这道题要想到用哈希表来做。同时注…

安科瑞基站智慧运维云平台:安全管控与节能降耗双效赋能

功能&#xff1a;基站智慧用电云平台通过对5G宏站和室分站点加装交/直流智能监控设备、无线采集设备以及系统管理平台&#xff0c;完成夜间无业务时段的下电操作&#xff0c;减少电能消耗&#xff0c;降低运营成本支出&#xff0c;以及提升通信设备供电线路状态的实时监测保护功…

处理省市区excel数据加工成SQL

原始数据相关内容链接 处理excel数据加工成SQL的脚本 #!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Excel行政区域数据转SQL脚本 - 支持特殊行政单位处理&#xff08;如省直辖县级行政单位&#xff09; - 支持批量处理 """import pand…

双碳目标下的24小时分时综合能源系统低碳优化调度:基于 Matlab/YALMIP/CPLEX的方法与仿真

在“双碳”战略目标的推动下&#xff0c;综合能源系统&#xff08;Integrated Energy System, IES&#xff09;已成为实现能源结构优化与碳排放控制的重要途径。本文以光伏、风电、燃气—电热联产&#xff08;CHP&#xff09;、燃气锅炉、电锅炉、电储能以及碳捕集&#xff08;…

TDengine 选择函数 Last() 用户手册

LAST() 函数用户手册 函数定义 LAST(expr)功能说明 LAST() 函数统计表/超级表中某列的值最后写入的非 NULL 值&#xff0c;即返回时间戳最大的非 NULL 值。 版本要求 最低版本: v3.0.0.0 返回值 数据类型: 同应用的字段返回内容: 时间戳最大的非 NULL 值及其对应的时间戳…

< 自用文 学习 > 用 Claude Code 做一个日历

环境&#xff1a; OS: Windows 11 IDE&#xff1a;TREA Model: Sonnet / Qwen (免费 Token 用完) 参考&#xff1a; Claude Code Beginner Guide – Get Started in 20 Minutes (2025) by Alex Finn 油管博客 https://www.youtube.com/watch?viYiuzAsWnHU&listTLGG1L…