Java学习--------消息队列的重复消费、消失与顺序性的深度解析​

        在 Java 分布式系统开发中,消息队列的应用已十分普遍。但随着业务规模扩大,消息的重复消费、意外消失、顺序错乱等问题逐渐成为系统稳定性的隐患。本文将从 Java 开发者的视角,深入分析这三大问题的产生原因、业务后果,并结合具体代码示例给出可落地的解决方案。​
消息重复消费是 Java 开发中最易遇到的问题。例如,使用 Spring Kafka 消费支付消息时,若处理逻辑未做防护,可能导致用户账户被重复扣款,这在金融场景中是致命的。​
产生原因通常有以下几点
生产者重试配置不当:例如:生产者使用RetryTemplate时,若未设置合理的重试条件,网络波动时会重复发送消息。​
消费者 ACK 机制误用:例如:Spring Kafka 默认ack-mode=BATCH,若消费者处理消息后未及时提交 offset,重启后会重复消费批次内消息。​
分布式事务补偿:例如:在 Seata 等分布式事务框架中,回滚后触发的消息重发可能导致重复。​
在不同场景下,带来的后果也十分严重,如在金融领域会导致出现重复扣款、重复转账的问题,从而引发资损和用户投诉。​在库存管理系统中会导致重复扣减库存,从而出现超卖或负库存。为了解决这个     

        通常采用多种方法解决,如:

        (1)幂等性设计

        (2全局 ID+Redis 去重

@Service
public class OrderConsumer {@Autowiredprivate StringRedisTemplate redisTemplate;@Autowiredprivate OrderService orderService;@KafkaListener(topics = "order_pay")public void handlePayMessage(ConsumerRecord<String, String> record) {String messageId = record.headers().lastHeader("messageId").value().toString();// 利用Redis的SETNX判断是否已处理Boolean isFirstHandle = redisTemplate.opsForValue().setIfAbsent("msg:processed:" + messageId, "1", 24, TimeUnit.HOURS);if (Boolean.TRUE.equals(isFirstHandle)) {// 首次处理:执行业务逻辑orderService.processPayment(record.value());} else {// 重复消息:直接返回log.info("重复消息,messageId:{}", messageId);}}
}

        (3)数据库唯一约束

@Transactional
public void processPayment(String orderJson) {OrderDTO order = JSON.parseObject(orderJson, OrderDTO.class);// 插入时若messageId重复,会抛出DuplicateKeyExceptionorderMapper.insert(new OrderPO().setOrderId(order.getOrderId()).setMessageId(order.getMessageId()).setStatus("PAID"));
}


消息消失指消息未被消费却永久丢失,例如用户下单消息消失会导致订单 “幽灵下单”,用户已付款但系统无记录。​
产生原因通常有以下几点
生产者未开启确认机制:如Kafka 生产者未设置acks=all,消息未写入分区副本即返回成功。​
Spring 容器关闭导致丢失:消费者在@PreDestroy阶段被强制关闭,未处理的消息被丢弃。​
中间件配置疏漏:如RabbitMQ 队列未设置durable=true,重启后队列消失;Kafka 未设置log.retention.hours,消息提前过期。​
其也会导致许多严重的后果,如:
交易链路断裂:支付成功但订单状态未更新,用户投诉。​
数据同步失败:跨系统数据未同步,导致库存、会员信息不一致。​
任务调度失效:定时任务触发消息丢失,导致任务漏执行。​
通常采用如下的解决方案:全链路可靠性保障​
1. 生产者确认机制(

@Configuration​
public class KafkaProducerConfig {​@Bean​public ProducerFactory<String, String> producerFactory() {​Map<String, Object> config = new HashMap<>();​config.put(ProducerConfig.ACKS_CONFIG, "all"); // 等待所有副本确认​config.put(ProducerConfig.RETRIES_CONFIG, 3); // 重试3次​config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // 开启幂等性生产​return new DefaultKafkaProducerFactory<>(config);​}​
​@Bean​public KafkaTemplate<String, String> kafkaTemplate() {​KafkaTemplate<String, String> template = new KafkaTemplate<>(producerFactory());​// 发送结果回调​template.setProducerListener(new ProducerListener<>() {​@Override​public void onError(ProducerRecord<String, String> record, Exception exception) {​log.error("消息发送失败,topic:{}, msg:{}", record.topic(), record.value(), exception);​// 失败后可存入本地消息表,定时重试​}​});​sdareturn template;​}​
}​



2. 消费者手动确认

​
@Configuration​
public class KafkaConsumerConfig {​@Bean​public ConsumerFactory<String, String> consumerFactory() {​Map<String, Object> config = new HashMap<>();​config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 关闭自动提交​return new DefaultKafkaConsumerFactory<>(config);​}​
​@Bean​public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {​ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();​factory.setConsumerFactory(consumerFactory());​factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); // 手动确认​return factory;​}​
}​
​
// 消费者代码​
@KafkaListener(topics = "order_create")​
public void handleCreateMessage(ConsumerRecord<String, String> record, Acknowledgment ack) {​try {​orderService.createOrder(record.value());​ack.acknowledge(); // 处理成功:手动提交offset​} catch (Exception e) {​log.error("处理失败,暂不确认", e);​// 可将消息转发到死信队列​}​
}​


消息顺序性指消息处理顺序与发送顺序一致。例如,订单的 “创建→支付→发货” 消息若顺序错乱,会导致 “未支付就发货” 的逻辑错误。​产生原因通常有以下几点:
多线程消费:Spring Kafka 的concurrency>1时,多个线程并行处理同一分区消息。​
分区路由错误:Kafka 生产者未指定partitioner.class,导致同一订单的消息被分配到不同分区。​
重试机制打乱顺序:失败消息进入重试队列后,后续消息先被处理。​
其可能会导致许多后果,如:
状态机异常:订单从 “待支付” 直接跳至 “已发货”,状态流转断裂。​
数据计算错误:账户先扣款后充值,导致余额计算错误。​
日志时序混乱:分布式追踪日志顺序错乱,难以排查问题。​
通常开发人员采用以下方案解决
1. 单分区 + 单线程消费
2. 按业务 ID 路由分区​
通过订单 ID 哈希到固定分区,确保同一订单的消息在同一分区:​

public class OrderPartitioner implements Partitioner {​@Override​public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {​// 订单ID作为key,哈希后取模分区数​String orderId = (String) key;​List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);​return Math.abs(orderId.hashCode()) % partitions.size();​}​
}​



本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/pingmian/89615.shtml
繁体地址,请注明出处:http://hk.pswp.cn/pingmian/89615.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Oracle】centos7离线静默安装oracle11g(p13390677_112040)

博文地址&#xff1a;https://blog.csdn.net/gitblog_06670/article/details/142569814 仓库地址&#xff1a;https://gitcode.com/Open-source-documentation-tutorial/31eb1/?utm_sourcedocument_gitcode&indexbottom&typecard 参考安装地址&#xff1a; 收费版&…

智能设备畅想

### 智能设备畅想 突然想到了一个好主意 因为最近在查无人机的相关资料&#xff08;很早之前就想搞个无人机玩玩但始终没有买&#xff09; 在了解自组装方面的内容时&#xff0c;和AI沟通了下 正好之前组装的 小智AI 基本上已经完善了&#xff0c;也正在考虑其在其他方向拓展的…

SpringAI——ChatModel

我的前面一篇文章&#xff08;SpringAI——ChatClient配置与使用&#xff09;中讲了ChatClient&#xff0c;它是一个构建于 ChatModel 之上的高层封装&#xff0c;它提供了更丰富的对话交互能力。可以这么说ChatModel相当于发动机&#xff0c;ChatClient相当于一台含有发动机、…

Zabbix监控K8S的PV信息详细教程!

文将介绍如何使用Zabbix自定义键值脚本方式监控K8S的PV卷状态等信息。 在Kubernetes (K8S) 中&#xff0c;PersistentVolume (PV) 是集群中的一个抽象层&#xff0c;它代表了底层存储资源&#xff0c;例如网络存储系统&#xff08;如NFS、Ceph、GlusterFS等&#xff09;或本地存…

wx小程序原生开发使用高德地图api

第一步&#xff1a;注册高德地图api的key第二步&#xff1a;下载amap-wx.js 放到项目的某个目录第三步&#xff1a;配置app.json&#xff08;必须&#xff0c;因为需要定位功能&#xff0c;&#xff09;"requiredPrivateInfos": ["getLocation"],"per…

如何通过mac的前24bit,模糊确认是那一台什么样的设备

MAC Address Lookup - MAC/OUI/IAB/IEEE Vendor Manufacturer Search Wireshark • Go Deep 上面这两个网址提供了&#xff0c;正对mac 的前24位&#xff0c;查找对应的网络设备厂商信息&#xff0c;可以让我们在运维过程中模糊的判断大约是什么型号的设备 使用macvendorloo…

【爬虫】04 - 高级数据存储

爬虫04 - 高级数据存储 文章目录爬虫04 - 高级数据存储一&#xff1a;加密数据的存储二&#xff1a;JSON Schema校验三&#xff1a;云原生NoSQL(了解)四&#xff1a;Redis Edge近端计算(了解)五&#xff1a;二进制存储1&#xff1a;Pickle2&#xff1a;Parquet一&#xff1a;加…

UDP和TCP的主要区别是什么?

在网络通信中&#xff0c;TCP&#xff08;传输控制协议&#xff09;和UDP&#xff08;用户数据报协议&#xff09;是两种核心的传输层协议。它们各自的特点和应用场景截然不同&#xff0c;理解两者的区别对于选择合适的通信方式至关重要。本文将通过几个关键点&#xff0c;用简…

Softhub软件下载站实战开发(十八):软件分类展示

Softhub软件下载站实战开发&#xff08;十八&#xff09;&#xff1a;软件分类展示 &#x1f5a5;️ 在之前文章中&#xff0c;我们实现了后台管理相关部分&#xff0c;本篇文章开始我们来实现用户端页面&#xff0c;由于内网使用&#xff0c;不需要sso优化等特性&#xff0c;我…

linux--------------------BlockQueue的生产者消费模型

1.基础BlockingQueue的生产者消费模型 1.1 BlockQueue 在多线程编程中阻塞队列是一种常用于实现生产者和消费者模型的数据结构&#xff0c;它与普通的队列区别在于&#xff0c;当队列为空时&#xff0c;从队列获取元素的操作将被阻塞&#xff0c;直到队列中放入了新的数据。当…

堆排序算法详解:原理、实现与C语言代码

堆排序&#xff08;Heap Sort&#xff09;是一种高效的排序算法&#xff0c;利用二叉堆数据结构实现。其核心思想是将待排序序列构造成一个大顶堆&#xff08;或小顶堆&#xff09;&#xff0c;通过反复调整堆结构完成排序。下面从原理到实现进行详细解析。一、核心概念&#x…

SSM框架——注入类型

引用类型的注入&#xff1a;Setter方法简单类型的注入&#xff1a;定义简单实例和方法在配置文件中对bean进行配置&#xff0c;使用porperty属性 值用value&#xff08;ref是用来获取bean的&#xff09;构造器方法&#xff1a;构造器方法中需要写name&#xff0c;这样程序就会耦…

信息学奥赛一本通 1552:【例 1】点的距离

【题目链接】 ybt 1552&#xff1a;【例 1】点的距离 【题目考点】 1. 最近公共祖先&#xff08;LCA&#xff09;&#xff1a;倍增求LCA 知识点讲解见&#xff1a;洛谷 P3379 【模板】最近公共祖先&#xff08;LCA&#xff09; 【解题思路】 首先用邻接表保存输入的无权图…

1Panel中的OpenResty使用alias

问题 在服务器上使用了1Panel的OpenResty来管理网站服务&#xff0c;当作是一个Nginx用&#xff0c;想做一个alias来直接管理某个文件夹的文件&#xff0c;于是直接在其中一个网站中使用了alias配置。 location /upload {alias /root/upload;autoindex on;charset utf-8;charse…

小明记账簿焕新记:从单色到多彩的主题进化之路

【从冷静蓝到多彩世界&#xff0c;这一次我们重新定义记账美学】 曾经&#xff0c;打开“小明记账簿”是一片沉稳的蓝色海洋&#xff0c;它像一位理性的财务管家&#xff0c;默默守护着你的每一笔收支。但总有人悄悄问&#xff1a;“能不能多一些颜色&#xff1f;”今天&#x…

Apache IoTDB(1):时序数据库介绍与单机版安装部署指南

目录一、Apache IoTDB 是什么&#xff1f;1.1 产品介绍1.2 产品体系1.3 产品架构二、IoTDB 环境配置2.1 Linux系统需准备环境2.2 Windows系统需准备环境2.3 网络配置2.3.1 关闭防火墙2.3.2 查看端口是否占用2.3.3 避雷经验三、IoTDB 单机版系统部署安装指南3.1 产品下载3.2 注意…

Python 图片爬取入门:从手动下载到自动批量获取

前言 想批量下载网页图片却嫌手动保存太麻烦&#xff1f;本文用 Python 带你实现自动爬取&#xff0c;从分析网站到代码运行&#xff0c;步骤清晰&#xff0c;新手也能快速上手&#xff0c;轻松搞定图片批量获取。 1.安装模块 在开始爬取图片前&#xff0c;我们需要准备好工具…

aspect-ratio: 1 / 1样式在部分手机浏览器中失效的问题怎么解决?

最近在uniapp开发时又遇到了安卓手机不兼容问题&#xff0c;ios系统无影响。开发背景&#xff1a;小编想通过网格布局来实现答题卡的布局&#xff0c;实现五列多行的形式。代码片段&#xff1a;<view class"question-grid"><viewv-for"(question, inde…

RecyclerView与ListView深度对比分析

1. 使用流程对比ListView: 布局XML&#xff1a; 在布局文件中放置 <ListView> 控件&#xff0c;指定 id (如 android:id"id/listView")。数据适配器 (Adapter)&#xff1a; 继承 BaseAdapter 或 ArrayAdapter / CursorAdapter / SimpleAdapter。 重写 getCount…

deepseekAI对接大模型的网页PHP源码带管理后台(可实现上传分析文件)

前端后端都已进行优化&#xff0c;新增可上传文件功能&#xff08;拖拽进去也可以&#xff09;&#xff0c;后端进行风格主题设置&#xff0c;优化数据结构&#xff01;依旧测试网站&#xff1a;iEPMS我的工具箱&#xff0c;你的智慧助手&#xff01;还是那句话兄弟们轻点搞我的…