RabbitMQ 消费幂等性与消息重放实现

一、幂等性实现

1.1 什么是幂等性?

幂等性是指同一条消息无论被消费多少次,业务结果都只生效一次,防止重复扣款、重复发货等问题。

RabbitMQ 的投递模式是“至少一次交付”(at-least-once delivery),如果消费者处理失败或者没有及时确认,消息会被多次投递。如果业务本身不具备幂等性,就可能导致重复扣款、重复发货等严重后果。

1.2 实现思路

RabbitMQ 只负责消息的可靠投递,而不会记录每条消息是否已经被成功消费。因此,需要由消费者端维护消费状态,常见做法是借助 Redis 实现去重逻辑。

消息在生产阶段应携带全局唯一的 message_id(例如订单号:order:10010)。在消费逻辑中,先通过 Redis 的原子命令 SETNX 尝试写入该 message_id:①如果 SETNX返回1,表示第一次消费,可以处理;②如果返回0,表示已消费,直接忽略

 二、消息重放实现

在RabbitMQ中,ack和nack机制是保证可靠投递、实现重放的关键。

2.1 ack和nack

如果你的消费逻辑里既没有调用ack,也没有调用nack,消息状态会一直unacked。只要没确认,就永远不会删除消息。

(1) ack

确认消息已被消费成功。当消费者调用:

ch.basic_ack(delivery_tag=method.delivery_tag)

RabbitMQ就会把消息从队列里永久删除。只要你ack了,这条消息就不可能再来了。

(2) nack

告诉RabbitMQ“我没处理好”。有两种方式:

# 发送nack并重入队列
# RabbitMQ会立刻把消息放回队列,再投递给其他消费者。
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)# 发送nack不重入队列
# 消息就会被丢弃(或者,如果绑定了死信队列,就转入死信队列)。
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)

2.2 实现代码

下方代码实现了以下关键功能:

1. 消息通过 SETNX + EXPIRE 在 Redis 中写入幂等标记,确保同一消息只会被一个消费者处理。
2. 如果标记已存在,判断是“已完成”还是“正在处理”,分别选择直接确认或稍后重试。
3. 业务处理成功后将标记更新为 done 并延长过期,表示消费已完成。
4. 如果处理失败,删除标记以便下次重新消费,并根据重试次数决定是否放弃或重试。

生产者代码
import pika
import uuidconnection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='test_queue', durable=True)message_id = str(uuid.uuid4())
body = "test message"    # 可以通过推送body = "fail message" 模拟消费异常properties = pika.BasicProperties(delivery_mode=2,message_id=message_id
)channel.basic_publish(exchange='',routing_key='test_queue',body=body,properties=properties
)print(f"[x] Sent '{body}' with message_id {message_id}")connection.close()
 消费者代码
import pika
import redis
import time# Redis
r = redis.StrictRedis(host='localhost', port=6379, db=0)# RabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='test_queue', durable=True)MAX_RETRY = 5def callback(ch, method, properties, body):message_id = properties.message_idif not message_id:import hashlibmessage_id = hashlib.md5(body).hexdigest()redis_key = f"msg:{message_id}"retry_key = f"retry:{message_id}"# 尝试用SETNX写入幂等标记result = r.setnx(redis_key, "processing")if not result:status = r.get(redis_key)if status and status.decode() == "done":# 已经处理过ch.basic_ack(delivery_tag=method.delivery_tag)print(f"[!] Duplicate message detected: {message_id}")else:# 正在处理,稍后重试ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)print(f"[!] Message {message_id} is being processed by another consumer.")return# SETNX成功,要设置过期时间,防止永久占用r.expire(redis_key, 300)  # 300秒try:# 获取重试次数retry_count = r.get(retry_key)if retry_count is None:retry_count = 0else:retry_count = int(retry_count)print(f"[x] Processing message: {body.decode()} (retry: {retry_count})")# 模拟失败if "fail" in body.decode():raise Exception("Simulated failure")# 业务逻辑# ...# 处理成功,改为done并延长过期r.set(redis_key, "done")r.expire(redis_key, 24*60*60)r.delete(retry_key)ch.basic_ack(delivery_tag=method.delivery_tag)print("[+] Message processed successfully")except Exception as e:retry_count += 1r.set(retry_key, retry_count)r.expire(retry_key, 24*60*60)print(f"[!] Error processing message (retry {retry_count}): {e}")# 失败时删除幂等标记,下次可以继续处理r.delete(redis_key)if retry_count >= MAX_RETRY:ch.basic_ack(delivery_tag=method.delivery_tag)print("[!] Max retries reached, moving message to dead letter log.")else:ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='test_queue',on_message_callback=callback,auto_ack=False
)print("[*] Waiting for messages. CTRL+C to exit")
channel.start_consuming()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/diannao/89213.shtml
繁体地址,请注明出处:http://hk.pswp.cn/diannao/89213.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【HarmonyOS 5】鸿蒙TEE(可信执行环境)详解

【HarmonyOS 5】鸿蒙TEE(可信执行环境)详解 一、TEE是什么? 1、TEE的定义: 可信执行环境(Trusted Execution Environment),简称TEE,是存在于智能手机、平板或任意移动设备主处理器…

算法: 冒泡排序

冒泡排序是一种简单的排序算法,通过相邻元素的比较和交换,使较大的元素逐渐"浮"到数组末尾。 时间复杂度:最佳 O(n) | 平均 O(n) | 最差 O(n) 空间复杂度:O(1) 稳定性:稳定 应用场景/前提条件 适用于小规模数据对几乎已排序的数据效率较高…

基于SpringBoot的家电销售展示平台

源码编号:S567 源码名称:基于SpringBoot的家电销售展示平台 用户类型:双角色,用户、管理员 数据库表数量:14 张表 主要技术:Java、Vue、ElementUl 、SpringBoot、Maven 运行环境:Windows/M…

java+vue+SpringBoo智慧旅游系统(程序+数据库+报告+部署教程+答辩指导)

源代码数据库LW文档(1万字以上)开题报告答辩稿ppt部署教程代码讲解代码时间修改工具 技术实现 开发语言:后端:Java 前端:vue框架:springboot数据库:mysql 开发工具 JDK版本:JDK1.…

Docker 入门教程(三):镜像操作命令

文章目录 🐳 Docker 入门教程(三):镜像操作命令获取镜像:docker pull查看镜像:docker images删除镜像:docker rmi搜索镜像:docker search镜像打标签:docker tag镜像详情与…

如何修改discuz文章标题字数限制 修改成255

在 Discuz! X3.5 中,文章(主题)标题字数的限制可以通过修改数据库结构以及后台配置来实现,以下是完整的修改方法,将标题长度限制改为 255 个字符: ✅ 一、修改数据库字段长度 Discuz 默认标题字段是 subje…

基于BP神经网络的26个英文字母识别

本课题旨在设计并实现一个基于BP(反向传播)神经网络的英文字母识别系统,实现对手写或打印的26个英文字母(A-Z)的自动分类识别。项目首先对字母图像进行预处理(如灰度化、归一化、二值化和特征提取&#xff…

系统架构设计师论文分享-论云原生技术的应用

我的软考历程 摘要 2023年2月,我所在的公司做了开发纱线MES系统的决定,该系统为国内纱线工厂提供SAAS服务,旨在提高纱线工厂的智能化和数字化水平。我在该项目中被任命为系统架构设计师,全面掌管该项目的架构设计工作。该项目涉…

重置 MySQL root 密码

引言 在linux可能存在安装mysql安装失败,一直不出现默认密码 /usr/local/mysql/mysql-8.0.26/bin/mysqld --defaults-file/etc/my.cnf --usermysql --basedir/usr/local/mysql/mysql-8.0.26 --datadir/usr/local/mysql/mysql-8.0.26/data --lower-case-table-name…

面试八股---HTML

面试八股 1、HTML 1.1 src和href的区别 src 用于替换当前元素,href 用于在当前文档和引用资源之间确立联系。 核心区别在于 href 关联的资源(主要是 CSS)是用于描述页面外观的,浏览器可以先生成内容再应用样式,因此…

气候智能体:AI如何重构人类应对气候危机的决策体系?

前言 前些天发现了一个巨牛的人工智能免费学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到网站 《气候智能体:AI如何重构人类应对气候危机的决策体系?》 展开全景式论述。文章结合2025年最新技术突破与…

UITableView的位置向下偏移, contentInsetAdjustmentBehavior使用详情

一.contentInsetAdjustmentBehavior 作用: 在iOS 11及以后,苹果引入了安全区域(Safe Area)的概念,当UITableView的frame超出了安全区域,系统会自定调整SafeAreaInsets的值,它可以自动调整内容的内边距,使得内容不会被导航栏遮挡。…

腾讯云RayData全新推出“行业解决方案模板”,一键快捷制作3D数据可视化作品

点击蓝字⬆ 关注我们 本文共计958字 预计阅读时长3分钟 腾讯云RayData Plus是一款专注于高视效的3D数据可视化的实时渲染工具。 功能全面:提供了三维、二维、动画、数据、交互逻辑等各类能力; 零代码制作:灵活的节点式创作,即便没…

深度解析基于贝叶斯的垃圾邮件分类

贝叶斯垃圾邮件分类的核心逻辑是基于贝叶斯定理,利用邮件中的特征(通常是单词)来计算该邮件属于“垃圾邮件”或“非垃圾邮件”的概率,并根据概率大小进行分类。它是一种朴素贝叶斯分类器,因其假设特征(单词…

WPF 3D 开发全攻略:实现3D模型创建、旋转、平移、缩放

🎮 WPF 3D 入门实战:从零打造一个可交互的立方体模型 标题: 🚀《WPF 3D 开发全攻略:实现旋转、平移、缩放与法线显示》 💡 引言 在现代图形应用中,3D 可视化已经成为不可或缺的一部分。WPF 提供…

Ruby 安装使用教程

一、Ruby 简介 Ruby 是一种简单快捷的面向对象脚本语言,以优雅、简洁、易读著称。它常被用于 Web 开发(如 Ruby on Rails 框架)、自动化脚本、DevOps、命令行工具等领域。 二、Ruby 安装教程 2.1 支持平台 Ruby 支持跨平台运行&#xff0c…

python | numpy小记(五):理解 NumPy 中的 `np.arccos`:反余弦函数

python | numpy小记(五):理解 NumPy 中的 np.arccos:反余弦函数 一、函数签名与核心参数二、数学定义与取值范围三、基础使用示例四、与 Python 内建 math.acos 的对比五、常见问题与注意事项六、典型应用场景1. 三维向量夹角计算…

华为云Flexus+DeepSeek征文 | 华为云ModelArts与Reor的完美结合:创建高效本地AI笔记环境

华为云FlexusDeepSeek征文 | 华为云ModelArts与Reor的完美结合:创建高效本地AI笔记环境 引言一、ModelArts Studio平台介绍华为云ModelArts Studio简介ModelArts Studio主要特点 二、Reor介绍Reor简介Reor主要特点 三、安装Reor工具下载Reor软件安装Reor工具 四、开…

【启发式算法】Dynamic A*(D*)算法详细介绍(Python)

📢本篇文章是博主人工智能(AI)领域学习时,用于个人学习、研究或者欣赏使用,并基于博主对相关等领域的一些理解而记录的学习摘录和笔记,若有不当和侵权之处,指出后将会立即改正,还望谅…

报告怎么写

替代方案(按场景选择) 岗前准备阶段 ✅ "熟悉业务流程/系统操作" ✅ "掌握XX工具/平台的核心功能" ✅ "完成上岗前技术对接" 知识转化场景 ✅ "梳理产品知识体系" ✅ "转化技术文档为实操方案" ✅ &…