RabbitMQ--消息顺序性

看本章之前强烈建议先去看博主的这篇博客

              RabbitMQ--消费端单线程与多线程-CSDN博客

一、消息顺序性概念

消息顺序性是指消息在生产者发送的顺序消费者接收处理的顺序保持一致。


二、RabbitMQ 顺序性保证机制

情况顺序保证情况备注
单队列,单消费者消息严格按发送顺序消费最简单且唯一保证顺序的场景
单队列,多个消费者无法保证全局顺序,但可以设置 QoS 保证消费者串行处理自己收到的消息通过 basicQos(1) 保证每个消费者一次只处理一条消息,但整体队列消息按消费者分配,顺序不保证
消息确认和重发机制如果未正确使用 ack,消息重发可能导致顺序乱需开启手动确认,确保消息处理完毕后才 ack
消息重试与死信机制可能导致消息顺序错乱需要设计合理的重试策略和死信队列策略


三、顺序性的保证方式

  1. 单队列单消费者

    • 保证消息完全顺序消费。适合严格顺序场景。

  2. 消息确认机制

    • 使用手动确认 autoAck=false,处理完后再 basicAck,防止消息乱序重发。

  3. QoS(basicQos)

    • 设置 basicQos(1),保证消费者一次只处理一条消息,避免多条消息并发处理导致乱序。

  4. 业务分区设计

    • 按某个字段(比如订单ID)分区到不同队列,保证分区内顺序。


四、原生 Java 示例


1. 依赖

<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.16.0</version>
</dependency>

2. 生产者代码

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class Producer {private static final String QUEUE_NAME = "order_queue";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 声明队列,持久化channel.queueDeclare(QUEUE_NAME, true, false, false, null);for (int i = 1; i <= 10; i++) {String message = "Order Message " + i;channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println("Sent: " + message);Thread.sleep(100);  // 模拟发送间隔}}}
}

3. 消费者代码(单个消费者,保证顺序)

import com.rabbitmq.client.*;public class Consumer {private static final String QUEUE_NAME = "order_queue";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, true, false, false, null);// 设置每次只处理一条消息,避免乱序channel.basicQos(1);System.out.println("Waiting for messages...");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody());System.out.println("Received: " + message);try {// 模拟处理消息Thread.sleep(500);} catch (InterruptedException e) {e.printStackTrace();} finally {// 手动确认消息channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);System.out.println("Ack sent for: " + message);}};// 关闭自动确认,开启手动确认channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});}
}

4. 多消费者并发消费注意事项

  • 多个消费者消费同一队列,消息分发是轮询,整体消息顺序无法保证

  • basicQos(1) 只保证单个消费者串行处理自己拿到的消息,但多个消费者间消息顺序无保证。

  • 若需要严格顺序,需要保证单消费者消费或者分队列处理。


五、Spring Boot 示例


1. pom.xml 依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2. application.yml

spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestlistener:simple:# 每个消费者预取消息数量,类似 basicQos(1)prefetch: 1

3. RabbitMQ 配置类

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitConfig {public static final String QUEUE_NAME = "order_queue";@Beanpublic Queue orderQueue() {return new Queue(QUEUE_NAME, true); // 持久化队列}
}

4. 生产者代码

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;@Component
public class Producer {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendMessages() throws InterruptedException {for (int i = 1; i <= 10; i++) {String message = "Order Message " + i;rabbitTemplate.convertAndSend(RabbitConfig.QUEUE_NAME, message);System.out.println("Sent: " + message);Thread.sleep(100);}}
}

5. 消费者代码

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class Consumer {@RabbitListener(queues = RabbitConfig.QUEUE_NAME)public void receiveMessage(String message) throws InterruptedException {System.out.println("Received: " + message);// 模拟消息处理时间,确保消息顺序Thread.sleep(500);System.out.println("Processed: " + message);}
}

6. 主启动类

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class RabbitOrderApplication implements CommandLineRunner {@Autowiredprivate Producer producer;public static void main(String[] args) {SpringApplication.run(RabbitOrderApplication.class, args);}@Overridepublic void run(String... args) throws Exception {producer.sendMessages();}
}

六、总结

方面说明
单队列单消费者保证严格消息顺序,消息先进先出。
单队列多消费者消息轮询分发,整体顺序无法保证;设置 basicQos(1) 保证单个消费者顺序处理自己的消息。
消息确认机制手动 ack,避免消息未处理完成就确认导致顺序乱。
Spring Boot 配置spring.rabbitmq.listener.simple.prefetch=1 控制每个消费者预取消息数。
业务设计建议对于严格顺序场景,推荐单队列单消费者或消息分区+单消费者方案。

如果要严格保证消息顺序性:

        1. 单队列单消费者 

        2. 多消费者分区顺序

                当你只要求 “某一类业务 ID 下的顺序”一致,如订单、用户、设备号等,而不要求全局顺序时,这种方案很好。

                不能做到全局顺序消费!

                        不同队列之间顺序是无法控制的

                        比如 order_1order_5 属于不同分区,它们的处理时间会交叉,整体顺序就乱了。

多消费者分区顺序代码样例

  • 利用多个队列(分区),每个队列绑定一个消费者,保证队列内消息顺序;

  • 生产者根据某个分区键(如订单ID哈希)选择发送到对应队列,保证同一个分区的消息顺序。


多消费者分区顺序消费示例(Spring Boot)


1. 项目结构与依赖

pom.xml 添加:

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2. 配置类:定义多个队列与交换机绑定

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitConfig {public static final int PARTITION_COUNT = 3;@Beanpublic DirectExchange directExchange() {return new DirectExchange("order_exchange");}@Beanpublic Queue queue0() {return new Queue("order_queue_0", true);}@Beanpublic Queue queue1() {return new Queue("order_queue_1", true);}@Beanpublic Queue queue2() {return new Queue("order_queue_2", true);}@Beanpublic Binding binding0(Queue queue0, DirectExchange directExchange) {return BindingBuilder.bind(queue0).to(directExchange).with("partition_0");}@Beanpublic Binding binding1(Queue queue1, DirectExchange directExchange) {return BindingBuilder.bind(queue1).to(directExchange).with("partition_1");}@Beanpublic Binding binding2(Queue queue2, DirectExchange directExchange) {return BindingBuilder.bind(queue2).to(directExchange).with("partition_2");}
}

3. 生产者:根据订单ID哈希选择分区,发送到对应RoutingKey

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;@Component
public class Producer {@Autowiredprivate RabbitTemplate rabbitTemplate;private static final int PARTITION_COUNT = RabbitConfig.PARTITION_COUNT;public void sendOrder(String orderId, String message) {int partition = Math.abs(orderId.hashCode()) % PARTITION_COUNT;String routingKey = "partition_" + partition;rabbitTemplate.convertAndSend("order_exchange", routingKey, message);System.out.println("Sent to " + routingKey + ": " + message);}
}

4. 消费者:为每个队列配置单独消费者,保证分区顺序

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class Consumer {@RabbitListener(queues = "order_queue_0")public void receivePartition0(String message) {System.out.println("Partition 0 received: " + message);// 业务处理,保证队列内顺序}@RabbitListener(queues = "order_queue_1")public void receivePartition1(String message) {System.out.println("Partition 1 received: " + message);}@RabbitListener(queues = "order_queue_2")public void receivePartition2(String message) {System.out.println("Partition 2 received: " + message);}
}

5. 测试调用示例(主程序)

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class PartitionOrderApplication implements CommandLineRunner {@Autowiredprivate Producer producer;public static void main(String[] args) {SpringApplication.run(PartitionOrderApplication.class, args);}@Overridepublic void run(String... args) throws Exception {// 发送多条订单消息,orderId不同分区for (int i = 0; i < 20; i++) {String orderId = "order" + i;String message = "Order message for " + orderId;producer.sendOrder(orderId, message);Thread.sleep(100);}}
}

6. 说明

  • 消息根据订单ID哈希决定发送哪个队列

  • 每个队列由单个消费者消费,保证该分区消息顺序

  • 多个队列+多消费者,实现并发消费和分区顺序

🔁 顺序保证范围

粒度保证情况
同一个 orderId✅ 顺序消费(始终落在同一队列)
不同 orderId❌ 不保证顺序(本来就不是要求)

✅ 结论

你这套方案:

  • 👍 是 Spring Boot 下 RabbitMQ 顺序消费的推荐做法

  • 👍 保证了“每个订单 ID 的消息顺序

  • 👍 可扩展,增加分区数提升并发能力

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/news/916038.shtml
繁体地址,请注明出处:http://hk.pswp.cn/news/916038.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

.net core接收对方传递的body体里的json并反序列化

1、首先我在通用程序里有一个可以接收对象型和数组型json串的反序列化方法public static async Task<Dictionary<string, string>> AllParameters(this HttpRequest request){Dictionary<string, string> parameters QueryParameters(request);request.Enab…

(10)机器学习小白入门 YOLOv:YOLOv8-cls 模型评估实操

YOLOv8-cls 模型评估实操 (1)机器学习小白入门YOLOv &#xff1a;从概念到实践 (2)机器学习小白入门 YOLOv&#xff1a;从模块优化到工程部署 (3)机器学习小白入门 YOLOv&#xff1a; 解锁图片分类新技能 (4)机器学习小白入门YOLOv &#xff1a;图片标注实操手册 (5)机器学习小…

Vue 脚手架基础特性

一、ref属性1.被用来给元素或子组件注册引用信息&#xff08;id的替代者&#xff09;2.应用在html标签上获取的是真实DOM元素&#xff0c;用在组件标签上是组件实例对象3.使用方式&#xff1a;(1).打标识&#xff1a;<h1 ref"xxx">...</h1> 或 <Schoo…

Ubuntu安装k8s集群入门实践-v1.31

准备3台虚拟机 在自己电脑上使用virtualbox 开了3台1核2G的Ubuntu虚拟机&#xff0c;你可以先安装好一台&#xff0c;安装第一台的时候配置临时调高到2核4G&#xff0c;安装速度会快很多&#xff0c;安装完通过如下命令关闭桌面&#xff0c;能够省内存占用&#xff0c;后面我们…

Word Press富文本控件的保存

新建富文本编辑器&#xff0c;并编写save方法如下&#xff1a; edit方法&#xff1a; export default function Edit({ attributes, setAttributes }) {return (<><div { ...useBlockProps() }><RichTexttagNameponChange{ (value) > setAttributes({ noteCo…

【编程趣味游戏】:基于分支循环语句的猜数字、关机程序

&#x1f31f;菜鸟主页&#xff1a;晨非辰的主页 &#x1f440;学习专栏&#xff1a;《C语言学习》 &#x1f4aa;学习阶段&#xff1a;C语言方向初学者 ⏳名言欣赏&#xff1a;"编程的核心是实践&#xff0c;而非空谈" 目录 1. 游戏1--猜数字 1.1 rand函数 1.2 sr…

UE5 UI 控件切换器

文章目录分类作用属性分类 面板 作用 可以根据索引切换要显示哪个子UI&#xff0c;可以拥有多个子物体&#xff0c;但是任何时间只能显示一个 属性 在这里指定要显示的UI的索引

scikit-learn 包

文章目录scikit-learn 包核心功能模块案例其他用法**常用功能详解****(1) 分类任务示例&#xff08;SVM&#xff09;****(2) 回归任务示例&#xff08;线性回归&#xff09;****(3) 聚类任务示例&#xff08;K-Means&#xff09;****(4) 特征工程&#xff08;PCA降维&#xff0…

Excel 将数据导入到SQLServer数据库

一般系统上线前期都会导入期初数据&#xff0c;业务人员一般要求你提供一个Excel模板&#xff0c;业务人员根据要求整理数据。SQLServer管理工具是支持批量导入数据的&#xff0c;所以我们可以使用该工具导入期初。Excel格式 第一行为字段1、连接登入的数据库并且选中你需要导入…

剪枝和N皇后在后端项目中的应用

剪枝算法&#xff08;Pruning Algorithm&#xff09; 生活比喻&#xff1a;就像修剪树枝一样&#xff0c;把那些明显不会结果的枝条提前剪掉&#xff0c;节省养分。 在后端项目中的应用场景&#xff1a; 搜索优化&#xff1a;在商品搜索中&#xff0c;如果某个分类下没有符合条…

cocos 2d游戏中多边形碰撞器会触发多次,怎么解决

子弹打到敌机 一发子弹击中&#xff0c;碰撞回调多次执行 我碰撞组件原本是多边形碰撞组件 PolygonCollider2D&#xff0c;我改成盒碰撞组件BoxCollider2D 就好了 用前端的节流方式。或者loading处理逻辑。我测试过了&#xff0c;是可以 本来就是多次啊,设计上貌似就是这样的…

Kubernetes环境中GPU分配异常问题深度分析与解决方案

Kubernetes环境中GPU分配异常问题深度分析与解决方案 一、问题背景与核心矛盾 在基于Kubernetes的DeepStream应用部署中&#xff0c;GPU资源的独占性分配是保障应用性能的关键。本文将围绕一个典型的GPU分配异常问题展开分析&#xff1a;多个请求GPU的容器本应独占各自的GPU&…

Django与模板

我叫补三补四&#xff0c;很高兴见到大家&#xff0c;欢迎一起学习交流和进步今天来讲一讲视图Django与模板文件工作流程模板引擎&#xff1a;主要参与模板渲染的系统。内容源&#xff1a;输入的数据流。比较常见的有数据库、XML文件和用户请求这样的网络数据。模板&#xff1a…

日本上市IT企业|8月25日将在大连举办赴日it招聘会

株式会社GSD的核心战略伙伴贝斯株式会社&#xff0c;将于2025年8月25日在大连香格里拉大酒店商务会议室隆重举办赴日技术人才专场招聘会。本次招聘会面向全国范围内的优秀IT人才&#xff0c;旨在为贝斯株式会社东京本社长期发展招募优质的系统开发与管理人才。招聘计划&#xf…

低功耗设计双目协同画面实现光学变焦内带AI模型

低功耗设计延长续航&#xff0c;集成储能模块保障阴雨天气下的铁塔路线的安全一、智能感知与识别技术 多光谱融合监控结合可见光、红外热成像、激光补光等技术&#xff0c;实现全天候监测。例如&#xff0c;红外热成像可穿透雨雾监测山火隐患&#xff0c;激光补光技术则解决夜间…

datasophon下dolphinscheduler执行脚本出错

执行hive脚本出错&#xff1a; 错误消息&#xff1a; FAILED: RuntimeException Error loading hooks(hive.exec.post.hooks): java.lang.ClassNotFoundException: org.apache.atlas.hive.hook.HiveHookat java.net.URLClassLoader.findClass(URLClassLoader.java:387)at java.…

【Elasticsearch】安全地删除快照仓库、快照

《Elasticsearch 集群》系列&#xff0c;共包含以下文章&#xff1a; 1️⃣ 冷热集群架构2️⃣ 合适的锅炒合适的菜&#xff1a;性能与成本平衡原理公式解析3️⃣ ILM&#xff08;Index Lifecycle Management&#xff09;策略详解4️⃣ Elasticsearch 跨机房部署5️⃣ 快照与恢…

nodejs的npm

1. 什么是 npm&#xff1f; npm&#xff08;Node Package Manager&#xff09; 是 Node.js 的默认包管理工具&#xff0c;用于&#xff1a; 安装和管理依赖&#xff08;第三方库、框架等&#xff09;。运行项目脚本&#xff08;如启动服务、测试、构建等&#xff09;。发布和共…

外网访问内部私有局域网方案,解决运营商只分配内网IP不给公网IP问题

相信不少网友和我一样&#xff0c;为了实现远程控制、NAS访问、组建私有云、摄像头监控之类的需求&#xff0c;把光猫改成了桥接模式&#xff0c;并用自己的路由器拨号、进行端口了映射。本人之前一直用着没啥问题&#xff0c;不过&#xff0c;最近突然出现了无法访问的情况&am…

大模型——上下文工程 (Context Engineering) – 现代 AI 系统的架构基础

上下文工程 (Context Engineering) – 现代 AI 系统的架构基础 最近,AI大神 Andrej Karpathy 在YC的一个演讲《Software in the era of AI 》带火了一个新的概念 Context Engineering,上下文工程,LangChain也于7月2号在官网博客发表以《Context Engineering》为题目的文章(h…