RocketMq如何保证消息的顺序性

文章目录

  • 1.顺序消息的全流程
    • 1.1 发送阶段:消息分区
    • 1.2.存储阶段:顺序写入
    • 1.3.消费阶段:串行消费
  • 2.第三把锁有什么用?
  • 3.顺序消费存在的问题

Kafka只支持同一个Partition内消息的顺序性一样,RocketMQ中也提供了基于队列(分区)的顺序消费。即同个队列内的消息可以做到有序,但是不同队列内的消息是无序的!

在RocketMq中,它的顺序消息通过客户端的三把锁以及同一队列顺序写入协同工作,确保消息从发送、存储到消费的全流程严格有序。其核心在于:

  • 分区一致性(同一业务逻辑的消息发送到同一个分区)
  • 存储顺序性(单线程顺序写入)
  • 消费串行化(单线程消费分区)

这种设计在需要严格顺序性的场景(如金融交易、订单处理)中非常关键。
RocketMQ 的顺序消息需要从 发送、存储、消费 三个环节严格控制顺序性,因此引入了三把锁:

锁类型作用阶段实现方式目标
发送端消息发送阶段, 固定 MessageQueue,确保同一业务逻辑的消息发送到同一个 MessageQueue,为后续的存储和消费顺序性奠定基础MessageQueueSelector消息分区一致性
Broker消息存储阶段, 确保同一 MessageQueue的消息按顺序写入磁盘CommmitLog顺序写机制+ConsumeQueue 分区索引,每个 MessageQueue 对应一个 ConsumeQueue 文件,记录消息在 CommitLog 中的位置(偏移量),确保同一 MessageQueue 的消息在 ConsumeQueue 中的顺序性消息存储顺序性
消费锁消息消费阶段,通过三把锁确保同一 MessageQueue的消息单线程串行消费分布式锁 + 本地锁 + ProcessQueue 锁
分布式锁:消费者向 Broker 申请分布式锁(默认每 20 秒续签),确保同一消费组内只有一个消费者能消费该 MessageQueue
本地锁:通过 MessageQueueLock 的 Synchronized 锁,确保同一消费者线程池中只有一个线程处理该 MessageQueue
ProcessQueue 锁:通过 ProcessQueue 的 ReentrantLock(consumeLock),防止消费过程中因负载均衡或重平衡导致 ProcessQueue 被删除
通过这三个组合确保同一 MessageQueue 的消息由单线程串行消费,避免多线程并发导致顺序错乱
消息消费顺序性

1.顺序消息的全流程

1.1 发送阶段:消息分区

生产者通过 MessageQueueSelector(如 ShardingKeySelector)将 同一业务逻辑的消息(如同一订单 ID)发送到 同一个 MessageQueue

当我们作为MQ的生产者需要发送顺序消息时,需要在Send方法中,传入一个MessageQueueSelector
MessageQueueSelector中需要实现一个select方法,这个方法就是用来定义要把消息发送到哪个MessageQueue的,通常可以使用取模法进行路由:

    public void send() {SendResult sendResult = producer.send(msg, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {Integer id = (Integer) arg;//根据参数,计算出一个要接收消息的MessageQueue的下标int index = id % mqs.size();//返回这个MessageQueuereturn mqs.get(index);}}, orderId);}

通过以上形式就可以将需要有序的消息发送到同一个队列中。需要注意的是,这里需要使用同步发送的方式!

1.2.存储阶段:顺序写入

Broker 接收到消息后,根据 MessageQueue 的物理分区(CommitLog + ConsumeQueue)进行 顺序写入。

同一 MessageQueue 的消息在物理文件中 RocketMQ 通过单线程写入 CommitLog 保证顺序性,按顺序追加写入,保证存储顺序性。

消息按照顺序发送的消息队列中之后,那么,消费者如何按照发送顺序进行消费呢?

1.3.消费阶段:串行消费

RocketMQ的MessageListener回调函数提供了两种消费模式:

  • 有序消费模式MessageListenerOrderly
  • 并发消费模式MessageListenerConcurrently。

所以,想要实现顺序消费,需要使用MessageListenerOrderly模式接收消息:

        consumer.registerMessagelistener(new MessagelistenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {System.out.printf("Receive order msg:" + new String(msgs.get(0) .getBody( )));return ConsumeOrderlyStatus.SUCCESS;}}, new ConsumeOrderlyContext());

当我们用以上方式注册一个消费之后,为了保证同一个队列中的有序消息可以被顺序消费,就要保证RocketMQ Broker只会把消息发送到同一个消费者上,这时候就需要加锁了。

在实现中,ConsumeMessageOrderlyService 初始化的时候,会启动一个定时任务,会尝试向 Broker 为当前消费者客户端申请分布式锁(第一把锁)。如果获取成功,那么后续消息将会只发给这个Consumer。

接下来在消息拉取的过程中,消费者会一次性拉取多条消息的,并且会将拉取到的消息放入 ProcessQueue,同时将消息提交到消费线程池进行执行。

那么拉取之后的消费过程,怎么保证顺序消费呢?这里就需要更多的锁了

RocketMQ在消费的过程中,需要申请 MessageQueue 锁,消费线程通过 MessageQueueLock 的 synchronized 锁(第二把锁)住当前 MessageQueue 的消费过程,确保在同一时间,一个队列中只有一个线程能处理列中的消息。

获取到 MessageQueue 的锁后,就可以从ProcessQueue中依次拉取一批消息处理了,但是这个过程中,为了保证消息不会出现重复消费,还需要对ProcessQueue进行加锁,通过 ProcessQueue 的 ReentrantLock。然后就可以开始处理业务逻辑了

总结下来就是三次加锁:

  • 首先锁定Broker上的MessageQueue,确保消息只会投递到唯一的消费者(分布式锁)
  • 然后对本地的MessageQueue加锁,确保只有一个线程能处理这个消息队列。(synchronized)
  • 最后对存储消息的ProcessQueue加锁,确保在重平衡的过程中不会出现消息的重复消费。(ReentrantLock)

在这里插入图片描述

里面有几个点需要大家注意下:

2.第三把锁有什么用?

前面介绍客户端加锁过程中,一共加了三把锁,那么,有没有想过这样一个问题,第三把锁如果不加的话,是不是也没问题?

因为我们已经对MessageQueue加锁了,为啥还需要对ProcessQueue再次加锁呢?

这里其实主要考虑的是重平衡(Rebalance)的问题

当我们的消费者集群,新增了一些消费者,发生重平衡的时候,某个队列可能会原来属于客户端A消费的,但是现在要重新分配给客户端B了。

这时候客户端A就需要把自己加在Broker上的锁解掉,而在这个解锁的过程中,就需要确保消息不能在消费过程中就被移除了,因为如果客户端A可能正在处理一部分消息,但是位点信息还没有提交,如果客户端B立马去消费队列中的消息,那存在一部分数据会被重复消费

那么如何判断消息是否正在消费中呢,就需要通过这个ProcessQueue上面的锁来判断了,也就是说在解锁的线程也需要尝试对ProcessQueue进行加锁,加锁成功才能进行解锁操作。以避免过程中有消息消费。

3.顺序消费存在的问题

通过上面的介绍,我们知道了RocketMQ的顺序消费是通过在消费者上多次加锁实现的,这种方式带来的问题就是会降低吞吐量,并且如果前面的消息阻寨,会导致更多消息阻塞。所以,顺序消息需要慎用。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/pingmian/91985.shtml
繁体地址,请注明出处:http://hk.pswp.cn/pingmian/91985.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

zabbix平台无法删除已停用主机的处理案例

在zabbix平台上删除已停用的主机&#xff0c;提示“SQL描述式执行已失败: "DELETE FROM items WHERE (itemid IN &#xff08;.....)”&#xff0c;无法删除&#xff0c;本文为处理情况。一、问题现象在zabbix平台上删除已停用的主机&#xff0c;提示“SQL描述式执行已失败…

【计算机网络】6应用层

1.网络应用模型 特性 客户/服务器模型(Client-Server, C/S) 对等模型(Peer-to-Peer, P2P) 中心化 是(依赖服务器) 否(去中心化) 角色特点 服务器 客户机 无中心服务器 提供计算服务 请求计算服务 每个节点(Peer)既是客户机也是服务器 永久在线 间歇接入网络 节点间…

基于 Spring Boot + Vue 实现人脸采集功能全流程

一、技术栈与依赖引入 后端依赖 (pom.xml) <!-- 百度AI SDK --> <dependency><groupId>com.baidu.aip</groupId><artifactId>java-sdk</artifactId><version>4.16.19</version><exclusions><exclusion><grou…

《Python基础》第3期:使用PyCharm编写Hello World

我们写文档大多用 Word、写表格大多用 Excel、写幻灯片大多用 PPT。 写代码也需要一个软件作为编辑器&#xff08;传说的大神用记事本写代码纯属玩笑了&#xff0c;越是大神越追求效率&#xff0c;用的软件功能越强&#xff09;。 Python 现在已经有了非常多的代码编辑器&#…

我的第一个开源项目:排序算法的多种实现方式

以 排序算法 为例&#xff0c;展示如何在 Python 中进行不同实现方式的对比项目概述本项目旨在通过 Python 实现几种经典的排序算法&#xff0c;并通过性能对比、代码注释和优化手段&#xff0c;为开源社区提供参考。选择排序、冒泡排序、快速排序和归并排序作为主要算法&#…

5G-LEO - 用于 5g satellite 链接的 OpenAirInterface™ 扩展

目标&#xff1a;5G-LEO 旨在加速 OAI 作为开源工具的发展&#xff0c;允许卫星通信社区交流和比较 5G NTN 结果&#xff0c;并促进研发活动的合作。扩展的OAI软件库被视为开发早期原型的重要工具&#xff0c;用于验证关键的5G NTN设计方面&#xff0c;并为3GPP标准化过程提供及…

基于 Mybatis 框架*的完整开发流程与顺序

基于 MyBatis 框架 的完整开发流程与顺序一、环境准备阶段1. 新建 Maven 项目&#xff08;或普通 Java 项目&#xff09;作用&#xff1a;用 Maven 统一管理依赖&#xff0c;自动下载 MyBatis、MySQL 驱动等 Jar 包操作&#xff1a;IDE&#xff08;如 IDEA&#xff09;选 Maven…

机械学习--决策树(实战案例)

决策树分两种分类和回归&#xff0c;这篇博客我将对两种方法进行实战讲解一、分类决策树代码的核心任务是预测 “电信客户流失状态”&#xff0c;这是一个典型的分类任务数据集附在该博客上&#xff0c;可以直接下载代码整体结构整理代码主要分为以下几个部分&#xff1a;导入必…

SQL154 插入记录(一)

描述牛客后台会记录每个用户的试卷作答记录到exam_record表&#xff0c;现在有两个用户的作答记录详情如下&#xff1a;用户1001在2021年9月1日晚上10点11分12秒开始作答试卷9001&#xff0c;并在50分钟后提交&#xff0c;得了90分&#xff1b;用户1002在2021年9月4日上午7点1分…

BeanFactory 和 ApplicationContext 的区别?

口语化答案好的&#xff0c;面试官。BeanFactory和ApplicationContext都是用于管理Bean的容器接口。BeanFactory功能相对简单。提供了Bean的创建、获取和管理功能。默认采用延迟初始化&#xff0c;只有在第一次访问Bean时才会创建该Bean。因为功能较为基础&#xff0c;BeanFact…

VNC连接VirtualBox中的Ubuntu24.04 desktop图形化(GUI)界面

测试环境&#xff1a;VirtualBox 7,Ubuntu24.04 desktop,Ubuntu24.04 server(no desktop) 一、下载和安装dRealVNC viewer。 二、配置 VirtualBox 网络&#xff1a;NAT 模式 端口转发 1、打开 VirtualBox&#xff0c;选择您的 Ubuntu 虚拟机&#xff0c;点击 设置。 选择 网…

浮动路由和BFD配置

拓扑图 前期的拓扑图没有交换机配置步骤 1、配置IP地址 终端IP地址的配置 路由器IP地址的配置 配置router的对应接口的IP地址 <Huawei>sys [Huawei]sysname router [router]interface Ethernet 0/0/0 [router-Ethernet0/0/0]ip address 192.168.10.254 24 [router-Ethern…

Docker 实战 -- Nextcloud

文章目录前言1. 创建 docker-compose.yml2. 启动 Nextcloud3. 访问 Nextcloud4. 配置优化&#xff08;可选&#xff09;使用 PostgreSQL使用 redis添加 Cron 后台任务5. 常用命令6. 反向代理&#xff08;Nginx/Apache&#xff09;前言 当你迷茫的时候&#xff0c;请点击 Docke…

【计算机网络 | 第2篇】计算机网络概述(下)

文章目录七.因特网服务提供商&#x1f95d;八.接入网&#x1f95d;主流的家庭宽带接入方式介入网工作原理&#x1f9d0;DSL技术&#xff1a;铜线上的“三通道”通信DSL的速率标准呈现出显著的"不对称"特征&#x1f914;电缆互联网接入技术&#x1f34b;‍&#x1f7e…

SpringMVC 6+源码分析(四)DispatcherServlet实例化流程 3--(HandlerAdapter初始化)

一、概述 HandlerAdapter 是 Spring MVC 框架中的一个核心组件&#xff0c;它在 DispatcherServlet 和处理程序&#xff08;handler&#xff09;之间扮演适配器的角色。DispatcherServlet 接收到 HTTP 请求后&#xff0c;需要调用对应的 handler 来处理请求&#xff08;如控制器…

【lucene】FastVectorHighlighter案例

下面给出一套可直接拷贝运行的 Lucene 8.5.0 FastVectorHighlighter 完整示例&#xff08;JDK 8&#xff09;&#xff0c;演示从建索引、查询到高亮的全过程。 > 关键点&#xff1a;字段必须 1. 存储原始内容&#xff08;setStored(true)&#xff09; 2. 开启 TermVecto…

C++返回值优化(RVO):高效返回对象的艺术

在C开发中&#xff0c;按值返回对象的场景十分常见&#xff08;如运算符重载、工厂函数等&#xff09;&#xff0c;但开发者常因担忧“构造/析构的性能开销”而陷入纠结&#xff1a;该不该返回对象&#xff1f;如何避免额外成本&#xff1f;本文将剖析痛点、拆解错误思路&#…

用 PyTorch 实现一个简单的神经网络:从数据到预测

PyTorch 是目前最流行的深度学习框架之一&#xff0c;以其灵活性和易用性受到开发者的喜爱。本文将带你从零开始&#xff0c;用 PyTorch 实现一个简单的神经网络&#xff0c;用于解决经典的 MNIST 手写数字分类问题。我们将涵盖数据准备、模型构建、训练和预测的完整流程&#…

四级页表通俗讲解与实践(以 64 位 ARM Cortex-A 为例)

&#x1f4d6; &#x1f3a5; B 站博文精讲视频&#xff1a;点击链接&#xff0c;配合视频深度学习 四级页表通俗讲解与实践&#xff08;以 64 位 ARM Cortex-A 为例&#xff09; 本文面向希望彻底理解现代 64 位架构下四级页表的开发者&#xff0c;结合 ARM Cortex-A 系列处理…

AI模型整合包上线!一键部署ComfyUI,2.19TB模型全解析

最近体验了AIStarter平台上线的AI模型整合包&#xff0c;包含2.19TB ComfyUI大模型&#xff0c;整合市面主流模型&#xff0c;一键部署ComfyUI&#xff0c;省去重复下载烦恼&#xff01;以下是使用心得和部署步骤&#xff0c;适合AI开发者参考。工具亮点这款AI模型整合包由熊哥…