详解Kafka通过幂等性实现分区消息不重复的机制

一、核心机制:PID与序列号

1. Producer ID (PID)

  • 唯一标识:每个生产者实例启动时,由Kafka Broker分配一个全局唯一的PID,用于标识消息来源。
  • 持久化存储:PID由Broker持久化保存,确保生产者重启后仍能追踪历史状态(但跨会话时PID会变更)。

2. 序列号 (Sequence Number)

  • 分区级递增:生产者为每个分区维护一个单调递增的序列号,从0开始。
  • 消息附加:每条消息发送时,附带当前分区的序列号。
  • Broker验证:Broker为每个<PID, Partition>对记录最后接收的序列号,新消息的序列号必须满足:
    • 等于预期值SN_new = SN_old + 1 → 接受并更新序列号。
    • 小于预期值SN_new < SN_old + 1 → 视为重复消息,丢弃。
    • 大于预期值SN_new > SN_old + 1 → 视为乱序或丢失,触发异常。

二、分区级别幂等性实现

1. 单分区内的唯一性保证

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-cluster:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"); // 启用幂等性
props.put(ProducerConfig.ACKS_CONFIG, "all"); // 默认值,确保消息可靠存储
props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); // 无限重试
  • 机制:通过PID和序列号,确保同一生产者实例向同一分区发送的消息不重复。
  • 限制
    • 跨分区无效:同一生产者向不同分区发送的消息可能重复。
    • 跨会话无效:生产者重启后PID变更,跨会话消息无法保证幂等性。

2. Broker端去重缓存

  • 缓存结构:Broker维护最近接收的<PID, SequenceNumber>映射,缓存最近5个批次的消息(固定大小,不可配置)。
  • 验证流程
    1. 接收消息后,检查PID和序列号是否存在于缓存。
    2. 若存在且序列号连续,接受消息并更新缓存。
    3. 若序列号不连续或重复,丢弃消息。

三、配置与启用

1. 生产者配置

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-cluster:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"); // 启用幂等性
props.put(ProducerConfig.ACKS_CONFIG, "all"); // 默认值,确保消息可靠存储
props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); // 无限重试

2. 默认行为

  • 启用幂等性后,acks自动设为all,确保所有副本确认后再返回成功。
  • 重试机制默认启用,避免因网络问题导致消息丢失。

四、限制与扩展

1. 单会话限制

  • PID变更:生产者重启后,PID变更,跨会话消息无法保证幂等性。
  • 解决方案:结合事务机制(transactional.id)实现跨会话的精确一次语义。

2. 事务扩展

  • 事务ID:通过配置transactional.id,将生产者ID与事务关联,确保跨分区和跨会话的原子性。
  • 配置示例
    props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "pay-service-1");
    producer.initTransactions();
    try {producer.beginTransaction();producer.send(record1);producer.send(record2);producer.commitTransaction();
    } catch (Exception e) {producer.abortTransaction();
    }

3. 消费者端处理

  • 去重需求:消费者需自行处理重复消息,例如:
    • 数据库唯一约束:在消息处理时添加业务唯一键(如订单ID)。
    • 业务逻辑去重:通过状态检查避免重复操作。

五、性能与调优

1. 性能影响

  • Broker端开销:维护PID和序列号缓存增加内存消耗,但通过固定缓存大小(5个批次)平衡性能与空间。
  • 客户端优化
    • 增大batch.sizelinger.ms,减少网络请求次数。
    • 调整max.in.flight.requests.per.connection(默认5)以控制并发请求。

2. 高并发优化

  • Broker配置
    • 增加num.io.threadsqueued.max.requests,提升处理能力。
  • 架构优化:动态均衡分区热点,避免单分区过载。

六、总结

  • 核心原理:通过PID和序列号在分区级别实现消息唯一性,确保同一生产者会话内消息不重复。
  • 适用场景:单分区消息去重,结合事务可扩展至跨分区和跨会话。
  • 消费者责任:需额外处理重复消息,依赖业务逻辑或外部机制(如数据库唯一约束)。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/bicheng/88509.shtml
繁体地址,请注明出处:http://hk.pswp.cn/bicheng/88509.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

压缩包方式在centos7版本上安装mysql8.0

使用tar命令解压 tar -zxvf mysql-8.0.32-el7-x86_64.tar.gz -C /usr/local/到/usr/local/修改解压后的文件名为mysql 创建mysql用户组和用户&#xff0c;自己在mysql下面创建data目录存储信息&#xff0c;把权限交给mysql这个用户 groupadd mysql useradd -r -g mysql mysql c…

使用ansible给被管理节点安装docker

在跳板机上安装ansible,再通过ansible的playbook,给被管理节点安装docker。 跳板机配置 实验环境 华为云上按需开两台2核2G的Ubuntu的ECS&#xff1b;2台公网IP为5Mbit/s&#xff0c;按需按流量&#xff1b;2台服务器在一个子网内;跳板机和被管理节点主机分别挂不通的安全组 在…

《Java EE与中间件》实验三 基于Spring Boot框架的购物车

目 录 一、实验目的和要求 二、实验实现思路及步骤 1、实验思路 2、实验步骤 3、实验方案 三、主要开发工具 四、实验效果及实现代码 1、购物车数据库构建实现 &#xff08;1&#xff09;建立javaee-project数据库 &#xff08;2&#xff09;建立t_cart数据表 &…

DAS3D: Dual-modality Anomaly Synthesis for 3D Anomaly Detection 论文精读

题目&#xff1a;DAS3D: Dual-modality Anomaly Synthesis for 3D Anomaly Detection 题目&#xff1a;DAS3D&#xff1a;用于三维异常检测的双模态异常合成 论文地址&#xff1a;ECCVW 2024 2410 Dual-modality 双模态 Anomaly Synthesis 异常合成 for 3D Anomaly Detection…

EasyCVR视频汇聚平台国标接入设备TCP主动播放失败排查指南

部分客户现场的下级平台通过国标级联接入安防监控系统EasyCVR后&#xff0c;只能通过TCP主动的播放方式进行播放&#xff08;并不是所有下级平台都支持tcp主动播放&#xff0c;模式需下级平台支持&#xff09;&#xff0c;但是有些平台刚接入的时候发现不能播放。核心原因分析&…

linux打包指令和移动指令

在Linux中&#xff0c;常用的文件夹打包命令是 tar&#xff0c;它可以将文件夹压缩打包成 .tar、.tar.gz、.tar.bz2 等格式的文件。以下是具体用法&#xff1a; 1. 基础打包&#xff08;不压缩&#xff0c;生成 .tar 文件&#xff09; 将文件夹 folder 打包为 folder.tar&#…

神经符号AI:结合深度学习和符号逻辑的下一代AI

神经符号AI&#xff1a;结合深度学习和符号逻辑的下一代AI当AI医生解释诊断时&#xff0c;它不仅能指出医学影像中的异常像素模式&#xff0c;还能引用临床指南中的第三条第二款&#xff0c;推演病理发展的逻辑链条——这正是神经符号AI赋予机器的“理性之光”。2025年初&#…

SpringBoot JWT

jsonwebtoken 引依赖 <dependency><groupId>io.jsonwebtoken</groupId><artifactId>jjwt</artifactId><version>0.12.3</version></dependency> 测试一下&#xff0c;jwt是2个带逗号的3段字符串 官网参考&#xff1a;JSON …

读取QPS 10万,写入QPS 1000,如何设计系统架构?

你是否也曾深陷在臃肿的领域模型&#xff08;Domain Model&#xff09;的泥潭&#xff0c;一个 User 或 Order 实体类&#xff0c;既要处理复杂的业务逻辑和数据校验&#xff0c;又要承载各种为前端展示而生的DTO转换&#xff0c;导致模型越来越胖&#xff0c;读写性能相互掣肘…

UE5 Rotate 3 Axis In One Material

首先没有用旋转矩阵&#xff0c;我用过旋转矩阵&#xff0c;传进去的角度旋转的角度和欧拉角传进去角度旋转出来的不一样&#xff0c;就没有用最后用的RotateAboutAxis&#xff0c;这个玩意儿研究老半天&#xff0c;只能转一个轴&#xff0c;角度和欧拉角的一样的最后研究出Rot…

计算机网络实验——访问H3C网络设备

一、实验目的1. 熟悉H3C路由器的开机界面&#xff1b;2. 通过Console端口实现对上电的H3C路由器的第一次本地访问&#xff1b;3. 掌握H3C设备命名等几个常用指令&#xff1b;4. 掌握如何将H3C设备配置为Telnet服务器&#xff1b;5. 掌握如何将H3C设备配置为Telnet客户端并实现访…

【C语言】学习过程教训与经验杂谈:思想准备、知识回顾(四)

&#x1f525;个人主页&#xff1a;艾莉丝努力练剑 ❄专栏传送门&#xff1a;《C语言》、《数据结构与算法》、C语言刷题12天IO强训、LeetCode代码强化刷题 &#x1f349;学习方向&#xff1a;C/C方向 ⭐️人生格言&#xff1a;为天地立心&#xff0c;为生民立命&#xff0c;为…

Vim 指令

Vim 是一款功能强大但学习曲线陡峭的文本编辑器&#xff0c;核心在于其模式化操作。掌握常用指令能极大提升效率。以下是指令分类整理&#xff1a;一、核心模式切换 (必须掌握&#xff01;)i&#xff1a;在光标前进入 插入模式 (Insert Mode)a&#xff1a;在光标后进入 插入模式…

vue2中使用xgplayer播放流视频

1、官网 2、安装后无法播放时&#xff0c;经测试&#xff0c;需要降低版本 "xgplayer-hls": "2.2.2","xgplayer": "2.31.6"改为以上版本可以正常播放 3、完整使用 &#xff08;1&#xff09;引入 import xgplayer import hlsjsPlayer…

Jmeter进阶篇(35)完美解决Jmeter转换HTML报告报错“Begin size 0 is not equal to fixed size 5”

今天博主在使用Jmeter运行完压测,使用生成的csv文件,运行以下命令: C:\apache-jmeter-5.2.1\bin>jmeter -g C:\res.csv -o C:\report生成HTML报告时,发现报错“Begin size 0 is not equal to fixed size 5”。 问题原因 原因是我:本地用的是JDK17,但Jmeter5.2.1仅支…

linux中tcpdump抓包中有组播数据,应用程序收不到数据问题

问题描述服务器运行正常&#xff0c;维保需要&#xff0c;重启服务器后应用程序无法收到组播的媒体数据。百思不得其解。原因分析最终的定位原因是 linux系统的自我保护机制导致的。rp_filter&#xff08;反向路径过滤&#xff09;是Linux内核的一个安全特性&#xff0c;用于防…

人工智能-基础篇-29-什么是低代码平台?

低代码平台&#xff08;Low-Code Development Platform, LCDP&#xff09;是一种通过可视化界面和少量代码&#xff08;或无需代码&#xff09;快速构建应用程序的开发工具。它的核心目标是通过简化开发流程&#xff0c;降低技术门槛&#xff0c;使企业能够更高效地响应业务需求…

PyTorch随机擦除:提升模型抗遮挡能力

PyTorch中内置的随机擦除&#xff08;Random Erasing&#xff09;数据增强通过torchvision.transforms.RandomErasing实现&#xff0c;以下是原理和用法的详细说明&#xff1a;核心原理正则化作用&#xff1a; 随机擦除在训练图像上随机遮盖一个矩形区域&#xff0c;模拟遮挡场…

微信小程序交互精髓:点击操作与状态管理实战

目录 一、点击事件绑定&#xff1a;bindtap 与 catchtap 的正确使用 基础语法对比 事件对象详解 二、点击切换选中状态&#xff1a;数据驱动视图的实现 1. 单元素状态切换 2. 多元素单选状态 3. 多元素多选状态 三、样式动态切换&#xff1a;数据绑定与 CSS 的完美结合 …

Language Models are Few-Shot Learners: 开箱即用的GPT-3(二)

接上一篇 Approach 前面的摘要和Introduction做了一些概要性的介绍,论文在第二章,也就是approach中,介绍了模型的设计,zero,one,few-shot的设计等等。 这一章一开头就说,GPT-3的结构和GPT-2的结构一样,只是在相应的把模型尺寸,数据规模,训练时间等增加了。Our bas…