幂等性、顺序性保障以及消息积压

幂等性

概念

在应用程序中,幂等性就是指对一个系统进行重复调用(相同参数),不论请求多少次,这些请求对系统的影响都是相同的效果.

比如数据库的select操作.不同时间两次查询的结果可能不同,但是这个操作是符合幂等性的.幂等性指的是对资源的影响,而不是返回结果,查询操作对数据资源本身不会产生影响,之所以结果不同,可能是因为两次查询之间有其他操作对资源进行了修改.

比如i++这个操作,就是非幂等性的,如果调用方没有控制好逻辑,一次流程重复调用好几次,结果
就会不同。

MQ的幂等性介绍
对于MQ而言,幂等性是指同一条消息,多次消费,对系统的影响是相同的
一般消息中间件的消息传输保障分为三个层级。
1.At most once:最多一次.消息可能会丢失,但绝不会重复传输.
2.Atleastonce:最少一次.消息绝不会丢失,但可能会重复传输。
3.Exactlyonce:恰好一次.每条消息肯定会被传输一次且仅传输一次.

RabbitMQ支持"最多一次"和"最少一次".
对于"恰好一次",目前RabbitMQ还做不到,不仅是RabbitMQ,目前市面上主流的消息中间件,都做不到这一点
在业务使用中,对于可靠性要求比较高的场景,建议使用”最少一次",以防止消息丢失,最多一次"会因为消息发送过程中,网络问题,消费出现异常等种种原因,导致消息丢失


以下场景可能会导致消息发送重复(包含但不限于)

发送时消息重复:当一条消息已被成功发送到服务端并完成持久化,此时出现了网络闪断或者客户
端宕机,导致服务端对客户端应答失败,如果此时Producer意识到消息发送失败并尝试再次发送消
息,Consumer后续会收到两条内容相同并且MessageID也相同的消息.

投递时消息重复:消息消费的场景下,消息已投递到consumer并完成业务处理,当客户端给服务端反馈应答的时候网络闪断,为了保证消息至少被消费一次,云消息队列RabbitMQ版的服务端将在网络恢复后再次尝试投递之前已被处理过的消息,Consumer后续会收到两条内容相同并且Message ID也相同的消息.

解决方案

为了避免同一消息重复消费的话,我们可以通过以下方案来解决:

全局唯一 ID 设置:
每一个消息都有唯一的 ID 进行标识,我们可以使用 redis 缓存过已经消费过的消息ID,当消费的时候先查阅 redis 是否存在这个 ID ,如果不存在则进行消费,消费成功后将消息ID 存储到 Reids 中,如果存在则不消费这条消息


业务逻辑判断:
例如:通过检查数据库中是否已存在相关数据记录,或者使用乐观锁机制来避免更新已被其他事务更改的数据,再或者在处理消息之前,先检查相关业务的状态,确保消息对应的操作尚未执行,然后才进行处理,具体根据业务场景来处理

顺序性保障

消息的顺序性是指消费者消费的消息和生产者发送消息的顺序是一致的。

比如生产者发送的消息分别是msgl,msg2,msg3,那么消费者也是按照msgl,msg2,msg3的顺序进行消费的.

很多业务场景下,消息的消费是不用保证顺序的,比如使用MQ实现订单超时的处理,但有些业务场景,可能存在多个消息顺序处理的情况,比如用户信息修改,对同一个用户的同一个资料进行修改,需要保证消息的顺序。

一些资料显示RabbitMQ的消息能够保障顺序性,这是不严谨的,在不考虑消息丢失,网络故障等异常的
情况下,如果只有一个消费者,最好也只有一个生产者的情况下,是可以保证消息的顺序性,如果有多个
生产者同时发送消息,无法确定消息到达RabbitMQBroker的前后顺序,也就无法验证消息的顺序性.
哪些情况可能会打破RabbitMQ的顺序性呢?下面介绍几种常见的场景:
1.多个消费者:当队列配置了多个消费者时,消息可能会被不同的消费者并行处理,从而导致消息处理
的顺序性无法保证
2.网络波动或异常:在消息传递过程中,如果出现网络波动或异常,可能会导致消息确认(ACK)丢失,从
而使得消息被重新入队和重新消费,造成顺序性问题。
3.消息重试:如果消费者在处理消息后未能及时发送确认,或者确认消息在传输过程中丢失,那么MQ
可能会认为消息未被成功消费而进行重试,这也可能导致消息处理的顺序性问题
4.消息路由问题:在复杂的路由场景中,消息可能会根据路由键被发送到不同的队列,从而无法保证全
局的顺序性。
5.死信队列:消息因为某些原因(如消费端拒绝消息)被放入死信队列,死信队列被消费时,无法保证消息
的顺序和生产者发送消息的顺序一致
包括但不仅限于以上几种情形会使RabbitMQ消息错序,如果要保证消息的顺序性,需要业务方使用
RabbitMQ之后做进一步的处理

实现方案

消息顺序性保障分为:局部顺序性保证和全局顺序性保证
局部顺序性通常指的是在单个队列内部保证消息的顺序,全局顺序性是指在多个队列或多个消费者之间保证消息的顺序。

在实际应用中,全局顺序性很难实现,可以考虑使用业务逻辑来保证顺序性,比如在消息中嵌入序列号,
并在消费端进行排序处理,相对而言,局部顺序性更常见,也更容易实现

RabbitMQ作为一个分布式消息队列,主要优化的是吞吐量和可用性,而不是严格的顺序性保证,如果业
务场景确实需要严格的消息顺序,可能需要在应用层面进行额外的设计和实现

实现策略:
1.单队列单消费者
最简单的方法是使用单个队列,并由单个消费者进行处理,同一个队列中的消息是先进先出的,这是
RabbitMQ来帮助我们保证的.

2.分区消费
单个消费者的吞吐太低了,当需要多个消费者以提高处理速度时,可以使用分区消费,把一个队列分割成
多个分区,每个分区由一个消费者处理,以此来保持每个分区内消息的顺序性
RabbitMQ本身并不支持分区消费,需要业务逻辑去实现,或者借助spring-cloud-stream来实现,官方文档:https://docs.spring.io/spring-cloud-stream/reference/rabbit/rabbit_partitions.html

3.业务逻辑实现

消息积压

消息积压是指在消息队列(如RabbitMQ)中,待处理的消息数量超过了消费者处理能力,导致消息在队列
中不断堆积的现象.

通常有以下几种原因:
1.消息生产过快:在高流量或者高负载的情况下,生产者以极高的速率发送消息,超过了消费者的处理能力。

2.消费者处理能力不足:消费者处理处理消息的速度跟不上消息生产的速度,也会导致消息在队列中积压.
可能原因有:
1)消费端业务逻辑复杂,耗时长
2)消费端代码性能低
3)系统资源限制,如CPU、内存、磁盘I/O等也会限制消费者处理消息的效率。
4)异常处理处理不当,消费者在处理消息时出现异常,导致消息无法被正确处理和确认

3.网络问题:因为网络延迟或不稳定,消费者无法及时接收或确认消息,最终导致消息积压

4.RabbitMQ服务器配置偏低
消息积压可能会导致系统性能下降,影响用户体验,甚至导致系统崩溃,因此,及时发现消息积压并解决
对于维护系统稳定性至关重要。

解决方案

遇到消息积压时,首先要分析消息积压造成的原因,根据原因来调整策略。

主要从以下几个方面来解决:

  1. 提高消费者效率
    a. 增加消费者实例数量,比如新增机器
    b. 优化业务逻辑,比如使用多线程来处理业务
    c. 设置prefetchCount,当一个消费者阻塞时,消息转发到其他未阻塞的消费者.
    d. 消息发生异常时,设置合适的重试策略,或者转入到死信队列

  2. 限制生产者速率,比如流量控制,限流算法等。
    a. 流量控制:在消息生产者中实现流量控制逻辑,根据消费者处理能力动态调整发送速率
    b. 限流:使用限流工具,为消息发送速率设置一个上限
    c. 设置过期时间,如果消息过期未消费,可以配置死信队列,以避免消息丢失,并减少对主队列的压力

  3. 资源与配置优化.比如升级RabbitMQ服务器的硬件,调整RabbitMQ的配置参数等

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/diannao/98408.shtml
繁体地址,请注明出处:http://hk.pswp.cn/diannao/98408.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

算法训练营DAY58 第十一章:图论part08

拓扑排序精讲 卡码网:117. 软件构建(opens new window) 题目描述: 某个大型软件项目的构建系统拥有 N 个文件,文件编号从 0 到 N - 1,在这些文件中,某些文件依赖于其他文件的内容,这意味着如果文件 A 依…

如何在Python中使用正则表达式?

在Python中使用正则表达式主要通过内置的re模块实现。正则表达式用于匹配、查找、替换字符串中的特定模式,是处理文本的强大工具。以下是使用正则表达式的核心方法和示例: 一、基本用法步骤 导入re模块:import re定义正则表达式模式&#xff…

用 Trae 玩转 Bright Data MCP 集成

引言 在自动化与智能体浪潮中,Trae 以“开箱即用、所见即所得”的工具编排体验,成为个人与团队落地 AI 工作流的高效选择。本篇将以 Trae 为主角,展示如何通过最少配置完成与 Bright Data MCP 的对接,并快速构建一个可用、可观测…

大数据Spark(六十三):RDD-Resilient Distributed Dataset

文章目录 RDD-Resilient Distributed Dataset 一、RDD五大特性 二、RDD创建方式 RDD-Resilient Distributed Dataset 在 Apache Spark 编程中,RDD(Resilient Distributed Dataset,弹性分布式数据集)是 Spark Core 中最基本的数…

java,通过SqlSessionFactory实现动态表明的插入和查询(适用于一个版本一个表的场景)

1,测试实体类package org.springblade.sample.test;import com.baomidou.mybatisplus.annotation.TableName; import lombok.Data;/*** Author: 肖扬* CreateTime: 2025-09-05* Description: SqlSessionFactoryTest测试* Version: 1.0*/ Data TableName("session_factory_…

鹧鸪云光储流程系统全新升级:视频指引与分阶段模块使用指南

鹧鸪云光储流程系统近日完成重要更新,全面优化了操作指引体系,为用户带来更高效、直观的使用体验。本次升级重点推出了全套功能操作视频,并明确了不同业务阶段的核心模块使用指南,助力用户快速上手、提升工作效率。全覆盖视频操作…

ChatGPT 协作调优:把 SQL 查询从 5s 优化到 300ms 的全过程

ChatGPT 协作调优:把 SQL 查询从 5s 优化到 300ms 的全过程 🌟 Hello,我是摘星! 🌈 在彩虹般绚烂的技术栈中,我是那个永不停歇的色彩收集者。 🦋 每一个优化都是我培育的花朵,每一个…

复杂计算任务的智能轮询优化实战

目录 复杂计算任务的智能轮询优化实战 一、轮询方法介绍 二、三种轮询优化策略 1、用 setTimeout 替代 setInterval 2、轮询时间指数退避 3、标签页可见性检测(Page Visibility API) 三、封装一个简单易用的智能轮询方法 四、结语 作者&#xff…

Java开发中常用CollectionUtils方式,以及Spring中CollectionUtils常用方法示例

场景 Java开发中常用的CollectionUtils 一、Spring Framework的CollectionUtils 包路径&#xff1a;org.springframework.util.CollectionUtils 核心方法&#xff1a; isEmpty(Collection<?> coll) List<String> list null; boolean empty CollectionUtil…

人工智能学习:Transformer结构(文本嵌入及其位置编码器)

一、输入部分介绍 输入部分包含: 编码器源文本嵌入层及其位置编码器 解码器目标文本嵌入层及其位置编码器 在transformer的encoder和decoder的输入层中,使用了Positional Encoding,使得最终的输入满足: 这里,input_embedding是通过常规embedding层,将每一个词的…

⸢ 肆 ⸥ ⤳ 默认安全建设方案:c-1.增量风险管控

&#x1f44d;点「赞」&#x1f4cc;收「藏」&#x1f440;关「注」&#x1f4ac;评「论」 在金融科技深度融合的背景下&#xff0c;信息安全已从单纯的技术攻防扩展至架构、合规、流程与创新的系统工程。作为一名从业十多年的老兵&#xff0c;将系统阐述数字银行安全体系的建设…

第二课、熟悉Cocos Creator 编辑器界面

本文主要介绍Cocos Creator 编辑器界面中几个常规的面板功能&#xff0c;让新手了解编辑器界面中常规的面板功能&#xff0c;更好的使用Cocos Creator 编辑器。一、编辑器界面常规面板划分Cocos Creater编辑器默认样式如上&#xff0c;主要包含&#xff1a;1、工具栏&#xff0…

Elixir通过Onvif协议控制IP摄像机,扩展ExOnvif的摄像头连续移动功能 ContinuousMove

Elixir 通过Onvif 对IP设备进行控制时&#xff0c;可以使用 ExOnvif 库。ExOnvif官方文档 此文章仅提供了ContinuousMove的控制方式及示例。 Elixir Onvif协议控制IP设备的其他命令&#xff0c;可以参考以下链接 绝对移动 【AbsoluteMove】 调用指定预置位 【GotoPreset】 …

android studio JNI 环境配置实现 java 调用 c/c++

1、在 app 级的 build.gradle 文件配置两个地方 android{ defaultConfig{ // 在 defaultConfig 里配置下面代码 externalNativeBuild { cmake { cppFlags "-frtti -fexceptions"//添加对 c 的异常处理支持 …

静态时序分析详解之时序路径类型

目录 一、概览 二、时序路径 2.1 数据路径 2.2 时钟路径 2.3 时钟门控路径 2.4 异步路径 2.5 关键路径 2.6 False路径 2.7 单周期路径 2.8 多周期路径 2.9 最长路径和最短路径 三、参考资料 一、概览 ​ ​静态时序分析通过模拟最差条件下分析所有的时序路径&am…

SpringBoot埋点功能技术实现方案深度解析:架构设计、性能优化与扩展性实践

SpringBoot埋点功能技术实现方案深度解析&#xff1a;架构设计、性能优化与扩展性实践 1. 原理剖析与技术实现细节 1.1 埋点技术基本原理 埋点&#xff08;Tracking&#xff09;是通过在代码中植入特定逻辑&#xff0c;收集用户行为数据、系统运行状态和业务指标的技术手段。在…

自建prometheus监控腾讯云k8s集群

自建prometheus监控腾讯云k8s集群 使用场景 k8s集群&#xff08;腾讯云容器服务&#xff09; promtheus (外部自建服务) 腾讯云提供了容器内部自建 Prometheus 监控 TKE 集群的文档&#xff0c;参考。 当前的环境promethues建在k8S外的云服务器上&#xff0c;与上面链接文…

2025高教社国赛数学建模C题参考论文(含模型和代码)

2025 年高教社杯大学生数学建模竞赛 C 题参考论文 目录 NIPT 的时点选择与胎儿的异常判定 摘要 1 问题重述 2 问题分析 2.1 问题 1 分析 2.2 问题 2 分析 2.3 问题 3 分析 2.4 问题 4 分析 3 模型假设与符号定义 3.1 模型假设 4. 孕周在 10-25 周内检测有…

iOS开发环境搭建及打包流程

一、下载xcode 直接去苹果商店的appstore下载就行 二、clone项目 1.登录xcode苹果账号或对应代码仓库账号 2.clone项目 3.安装设备真机环境&#xff08;未安装过的话&#xff09; 三.安装cocoapods 1. 检查并更新 Ruby 环境 CocoaPods 是基于 Ruby 编写的&#xff0c;因此…

数据结构之链表(单向链表与双向链表)

一&#xff0c;链表描述链表是一种常见的重要的数据结构,是动态地进行存储分配的一种结构。常用于需存储的数据的数目无法事先确定。1.链表的一般结构链表的组成&#xff1a; 头指针&#xff1a;存放一个地址&#xff0c;该地址指向一个元素 结点&#xff1a;用户需要的实际数据…