深入理解Kafka事务

一 kafka事务介绍

1.1 Kafka事务的作用

  • Exactly-Once Semantics (EOS):在“消费 → 处理 → 生产”的流式链路里避免重复写重复读带来的副作用,确保“处理一次且仅一次”的可见效果。

  • 跨分区 / 跨 Topic 原子性:将一次处理内写入的多分区多主题消息,以及本次消费位点 offset 的提交,绑定在同一个事务里,要么都生效,要么都回滚。

1.2 相关术语

  • PID / Producer IDEpochSequence Number:幂等生产者元数据,避免重复写。

  • 事务协调器(Transaction Coordinator):位于 broker 侧的协调者,管理事务状态机与两阶段提交。

  • 控制批次(Control Batch / Control Records):日志里的特殊记录,用于标记事务,主要是 COMMIT / ABORT(注意:数据分区不写“BEGIN”标记)。

  • LSO(Last Stable Offset)HW(High Watermark):对 read_committed 消费者只暴露到 LSO,屏蔽未决事务。

  • __transaction_state:kafka内部主题,用于持久化事务状态机。

  • __consumer_offsets:kafka内部主题,存消费组位点;位点也可以被纳入事务。

  • 僵尸实例:一个旧的 Producer 实例(带着同样的 transactional.id)在崩溃或网络分区后挂掉了,但它可能在恢复后继续尝试往 Kafka 写数据,但是与此同时,已经有一个新的 Producer 实例已经起来并接管了同样的 transactional.id,我们把这个宕机后又恢复的producer叫做僵尸实例

1.3 消费者隔离级别

消费者的隔离级别有下面两种

  • read_uncommitted(默认):可读到未提交已提交数据。

  • read_committed:只读取已提交事务的数据(EOS 流水线应使用)。

假设想要配置消费者隔离级别为read_committed,可通过下面配置完成

props.put("isolation.level", "read_committed");

二、使用 Kafka 事务

2.1 生产者端配置

Properties props = new Properties(); 
// broker地址
props.put("bootstrap.servers", "broker1:9092,broker2:9092"); 
// transactional.id 必须唯一且稳定(可复用)
props.put("transactional.id", "order-service-txn-1");  
// 配了 transactional.id 会自动开启,但是最好还是显式配置
props.put("enable.idempotence", "true"); 
/** 
然后通常由客户端自动/隐式设置为适配幂等语义:
要求acks=all、retries>0 max.in.flight.requests.per.connection<=5 等,
不配置就会取默认的值,比如retries = Integer.MAX_VALUE
*/
KafkaProducer<String, String> producer = new KafkaProducer<>(props); 
// 找到协调器、申请 PID/epoch、登记事务状态
producer.initTransactions(); 

2.2 事务性生产

// 开启事务
producer.beginTransaction();// 发送消息
producer.send(new ProducerRecord<>("demo-topic", "key1", "message-1"));
producer.send(new ProducerRecord<>("demo-topic", "key2", "message-2"));
producer.send(new ProducerRecord<>("demo-topic2", "key3", "message-3"));// 提交事务
producer.commitTransaction();

这样,对于配置了read_committed的消费者而言,要么这三个消息同时可见,要么同时不可见。

2.3 实践建议

  • 使用稳定且可复用的 transactional.id,这样服务重启后就可恢复事务上下文,还能对“僵尸实例”做围栏。

  • 事务应尽可能短小且频繁提交,避免长时间占用导致 LSO 卡住,增加读延迟。

  • 失败重试要以事务回滚为界,确保回滚后可安全重放。

  • EOS 只覆盖 Kafka 内部的原子性;涉及外部系统,则需要额外使用 Outbox/Saga 等模式。

三 kafka事务的实现

3.1 关键组件

  • 事务生产者:发数据、报告参与分区、发起事务结束(提交/回滚)。

  • 消费组协调器(Group Coordinator):当offset被纳入事务时,消费组协调器需要把最新offset发送到专门存储offset的内部主题__consumer_offsets中, 所以消费者协调器和__consumer_offsets里的对应分区也是事务参与者。

  • 事务协调器(Transaction Coordinator):负责给生产者分配 Producer ID/epoch(每个 transactional.id对应一个PID),维护事务状态机,持久化事务日志,并且当事务结束时(commit 或 abort),事务协调器 会把这个事务的结果(commit/abort 标记)广播到所有该事务涉及的分区)。

  • 数据分区所在的 Broker Leader:接受数据与控制批次写入,维护 High Watermark/Last Stable Offset与中止事务索引。

  • 消费者:根据隔离级别获取数据,包括read_committed 和read_uncommitted ,依赖隔离级别和 abortedTransactions 过滤。

3.2 事务实现流程

下图是kafka事务消息的总体流程图

3.2.1 幂等生产者

  • 协调器为每个 transactional.id 分配 PIDepoch

  • 生产者对每个分区维护单调递增的序列号;Broker 端以 (PID, epoch, seq) 去重,避免“重复写”。

  • 若同一 transactional.id 的新实例启动并 initTransactions(),协调器会提升 epoch 并围栏旧实例;旧实例写入不会成功并且得到 INVALID_PRODUCER_EPOCH/ProducerFencedException

3.2.2 事务状态机与内部日志

  • 事务协调器将每个事务的状态持久化到 __transaction_state
    EMPTY/ONGOING → PREPARE_COMMIT | PREPARE_ABORT → COMPLETE_COMMIT | COMPLETE_ABORT

  • 事务涉及到的分区集合(数据分区与 __consumer_offsets 的目标分区)由生产者在首次写入/首次提交位点时通过
    AddPartitionsToTxn / AddOffsetsToTxn 报告给协调器并持久化。

3.2.3  两阶段提交(2PC)

与传统数据库不同的是,数据分区里只写“结束标记”——COMMIT 或 ABORT 的控制批次;不写 BEGIN。BEGIN 只体现在协调器的内部状态机与日志。信息会包含自己所属的事务producer。

阶段 A:事务进行中(ONGOING)

  1. beginTransaction() 后,生产者向多个分区写入消息(每条携带 PID/epoch/seq)。

  2. 如首次写入某分区,生产者会先向协调器请求 AddPartitionsToTxn,协调器会记录“本事务涉及到这个分区”。

阶段 B:准备提交(PREPARE_COMMIT)/ 准备回滚(PREPARE_ABORT)

  1. 生产者调用 commitTransaction()(或 abortTransaction()),就会发送 EndTxn请求给协调器。

  2. 协调器把事务状态改为 PREPARE_COMMIT(或 PREPARE_ABORT)并写入kafka内内部主题 __transaction_state

  3. 扇出:协调器向所有涉及分区的 leader 发起WriteTxnMarkers请求。

阶段 C:各分区落盘控制记录 + 反馈

  1. 在收到事务协调器的WriteTxnMarkers请求后,各分区在自己的日志里追加一个“控制批次(Control Batch)”,类型为 COMMIT 或 ABORT。注意kafka没有“BEGIN”控制批次,BEGIN 信息由协调器掌

  2. 分区 leader 追加成功后应答协调器。

  3. 当所有目标分区都落成控制批次,协调器将事务状态置为 COMPLETE_COMMIT(或 COMPLETE_ABORT),并更新 __transaction_state

3.3 可见性控制

  • HW(High Watermark):副本多数派确认的最高位移。read_uncommitted 可读到 HW。

  • LSO(Last Stable Offset):保证其之前没有“未决事务”的最末位移
    read_committed,Broker 只返回 ≤ LSO 的数据,从源头屏蔽未提交事务。

  • 为何消费者还能拿到“已中止事务”的数据片段?
    为性能考虑,Broker 可能仍返回包含已中止事务记录的批次,但会携带一个
    abortedTransactions 列表(含 producerIdfirstOffset)。客户端在解码时跳过这些记录

  • 事务索引(.txnindex):每个日志段都有一个中止事务索引,Broker 用它在 Fetch 时快速收集 abortedTransactions 列表。

小结:在 read_committed 下,消费者不用“暂存不确定状态数据”去等控制标记;Broker 通过 LSO 保证不给你发“未决事务”的记录。客户端只需在已决事务里过滤 ABORT 记录(根据 abortedTransactions)。

3.4 消费-处理-生产 模式中消费offset与输出的原子绑定

sendOffsetsToTransaction(offsets, groupMetadata) 背后做了两件事,

AddOffsetsToTxn告诉事务协调器:这次事务会提交哪个消费组的位点

TxnOffsetCommit 把位点写入 __consumer_offsets 对应分区

在最终 COMMIT(或 ABORT)时,__consumer_offsets 分区也会收到相应的 COMMIT/ABORT 控制批次,从而与输出数据一并原子生效(或放弃)。

3.5 常见故障的处理

3.5.1 失败与恢复

  • 如果某些分区暂不可用,协调器会持续重试 WriteTxnMarkers最终一致的 2PC)。

  • 事务超时(由客户端 transaction.timeout.ms 申请,受 broker 上限约束)协调器主动 ABORT 并下发 ABORT 标记。

  • 协调器宕机可通过 __transaction_state 重放恢复事务状态并继续扇出事务标记。

  • 在事物未提交之前,配置了read_committed的消费者不会看到未决事务。

3.5.2 应对僵尸实例

  • Kafka 引入了 Producer Epoch,通过围栏机制来隔离僵尸实例。每个 Producer 在第一次用某个 transactional.id 初始化事务时,Kafka 的 Transaction Coordinator 会给它分配一个 producerIdproducerEpoch。当相同 transactional.id 的新实例启动时,Coordinator 会给它分配 更高的 epoch,并更新元数据。就这样,新实例可以用高 epoch 写数据,而旧实例(僵尸)带着低 epoch 再写数据时,Broker 会直接拒绝。

四 运维和调优要点

事务大小与超时

  • 客户端的 transaction.timeout.ms 受 Broker 端上限约束(如 transaction.max.timeout.ms)。

  • 事务过大或时间过长,会拖慢 LSO 前进,导致 read_committed 消费延迟升高

围栏与异常

  • ProducerFencedException / INVALID_PRODUCER_EPOCH:同一 transactional.id 新实例已接管;旧实例必须停止。

  • TransactionAbortedException:本事务已被中止;需要清理/重启事务。

副本与可靠性

  • 幂等/EOS 通常要求 acks=all 与合适的 min.insync.replicas。避免不干净选主导致重写。

重要监控指标

  • 生产端:transactional.commit.latency.avgtransactional.abort.raterecord-errors/retries

  • Broker:transaction-coordinator-metrics(扇出延迟、超时/中止率)、replica-fetcher-metrics

  • 消费端:records-lag-max(在 read_committed 下对 LSO 滞后敏感)。

主题压缩与控制记录

  • 控制批次(COMMIT/ABORT)是特殊记录,日志清理/压缩会保留其必要语义,确保历史可正确回放。

边界与限制

  • 事务只在同一 Kafka 集群内跨 Topic/分区原子;不跨外部系统

  • 超大事务(大量分区/消息)会放大标记扇出成本与恢复时间。

五 Kafka Streams 中的事务

  • processing.guarantee=exactly_once_v2/exactly_once:Streams 在内部为每个任务(Task)维护事务性生产者,把处理结果与位点绑定到同一事务中;重平衡时靠 epoch 围栏防止旧实例写入。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/web/95328.shtml
繁体地址,请注明出处:http://hk.pswp.cn/web/95328.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

RabbitMinQ(模拟实现消息队列项目)

目录 一.消息队列背景 二.需求分析 核心概念: BrokerServer: BrokerServer的核心API: 交换机Exchange: 持久化&#xff1a; 网络通信&#xff1a; 消息应答&#xff1a; 三、模块划分 四、创建项目 五、创建核心类 Exchange: MSGQueue: Binding: Message: 六.…

如何构建StarRocks官方文档

不知道是网络问题还是官网问题&#xff0c;StarRocks文档经常出现卡顿的情况&#xff0c;曾经构建过Flink文档&#xff0c; 所以也想尝试自己构建一个StarRocks的本地官方文档 断断续续折腾了好几天&#xff0c;就不废话了&#xff0c;直接上实际步骤 1. 环境 1.1 Linux环境 …

堡垒机(跳板机)入门指南:构建更安全的多服务器运维架构

随着你的业务不断扩张&#xff0c;你云上服务器的数量&#xff0c;是不是也从一台&#xff0c;变成了三台、五台、甚至一个由几十台机器组成的庞大集群&#xff1f;你像一个尽职的“国王”&#xff0c;为你王国的每一座“城池”&#xff08;每一台服务器&#xff09;&#xff0…

(链表)Leetcode206链表反转+Leetcode6删除链表的倒数第N个结点+虚拟头节点使用

虚拟头结点的作用是&#xff1a;简化插入/删除逻辑方便返回头节点减少边界错误 Leetcode206链表反转 206. 反转链表 - 力扣&#xff08;LeetCode&#xff09; 头插法 # Definition for singly-linked list. # class ListNode(object): # def __init__(self, val0, nextN…

自然语言处理NLP:嵌入层Embedding中input_dim的计算——Tokenizer文本分词和编码

1. 词汇表大小&#xff08;input_dim&#xff09;计算方法 嵌入层Embedding中的input_dim是根据数据中所有唯一词&#xff08;或字&#xff09;的总数来决定的。可以通过Tokenizer文本分词和编码得到。 简单说&#xff0c;Tokenizer 是一个文本分词和编码器&#xff0c;它主要做…

python中的分代垃圾回收机制的原理【python进阶二、2】

1. 分代设计思想Python 将对象按存活时间分为三代&#xff08;Generation 0, 1, 2&#xff09;&#xff1a;0代&#xff08;年轻代&#xff09;&#xff1a;新创建的对象。1代&#xff08;中年代&#xff09;&#xff1a;经历一次GC扫描后存活的对象。2代&#xff08;老年代&am…

【后端】云服务器用nginx配置域名访问前后端分离项目

云服务器有多个服务&#xff08;前端 3000 端口、后端 8288 端口&#xff0c;甚至还有别的服务&#xff09;。希望用户只输入 域名&#xff08;比如 https://example.com&#xff09;&#xff0c;而不是 example.com:3000、example.com:8288。本质上是要做 端口隐藏 域名统一入…

软考中级数据库系统工程师学习专篇(67、数据库恢复)

67、数据库恢复数据库故障恢复中基于检查点的事务分类与处理策略在数据库系统发生故障后的恢复过程中&#xff0c;​检查点&#xff08;Checkpoint&#xff09;​​ 技术是关键机制&#xff0c;它能有效缩小恢复范围&#xff0c;减少需要扫描的日志量&#xff0c;从而加速恢复进…

SpringBoot 分库分表 - 实现、配置与优化

分库分表&#xff08;Database Sharding&#xff09;是一种数据库架构优化技术&#xff0c;通过将数据分散到多个数据库或表中&#xff0c;以应对高并发、大数据量场景&#xff0c;提升系统性能和扩展性。 在 Spring Boot 中&#xff0c;分库分表可以通过框架支持&#xff08;如…

爬虫代理实操:选择可靠的HTTP(S)代理的方法

在爬虫工作里&#xff0c;选对代理协议&#xff08;HTTP/HTTPS&#xff09;只是第一步&#xff0c;更关键的是找到 “可靠” 的代理 —— 哪怕是 HTTPS 代理&#xff0c;若节点不稳定、IP 纯净度低&#xff0c;照样会频繁被封&#xff0c;反而耽误采集进度。这几年踩过不少坑&a…

数据库常见故障类型

数据库常见故障类型数据库系统运行过程中可能发生的故障主要分为以下三类&#xff0c;其破坏性由小到大&#xff1a;故障类型别名根本原因影响范围典型例子​1. 事务故障​逻辑故障事务内部的程序逻辑错误或输入异常。​单个或少量事务。- 输入数据不合法&#xff08;如除零错误…

【Android】Span富文本简介

一&#xff0c;概述android.text包下span体系类&#xff0c;主要指Spanned、Spannable、ParagraphStyle、CharacterStyle实现类。Android通过Span体系&#xff0c;搭建了富文本API&#xff0c;其中Spanned、Spannable实现了CharSequence接口&#xff0c;旨在映射段落start~end之…

【HTML】draggable 属性:解锁网页交互新维度

一、简介 在Web开发中&#xff0c;用户与内容的交互方式直接影响用户体验的深度。在 HTML 中&#xff0c;draggable 是一个全局属性&#xff0c;通过简单配置即可让任意元素实现拖拽功能。也可通过结合 draggable 属性和 JavaScript 事件&#xff0c;可以实现丰富的拖放交互功能…

如何在Github中创建仓库?如何将本地项目上传到GitHub中?

1.1 点击New repository&#xff08;这个是创建代码仓库的意思&#xff09;初次完成后只有一个文件最后&#xff1a;在本地git clone 项目地址然后把项目文件复制到git的文件夹内再提交到远程仓库git add . git commit -m "修改https"git push origin mainmain为分支…

【前端教程】HTML 基础界面开发

一、网站导航栏设计与实现 导航栏是网站的重要组成部分&#xff0c;负责引导用户浏览网站的各个板块。以下是一个实用的导航栏实现方案&#xff1a; 实现代码 HTML 结构&#xff1a; <!DOCTYPE html> <html> <head><meta charset"utf-8" /&…

【学Python自动化】 6. Python 模块系统学习笔记

一、模块基础 什么是模块&#xff1f;包含 Python 定义和语句的 .py 文件解决代码复用和组织问题每个模块有自己的命名空间创建模块示例# fibo.py - 斐波那契模块 def fib(n):"""打印小于n的斐波那契数列"""a, b 0, 1while a < n:print(a, e…

机器学习-时序预测2

门控循环单元GRU 接着机器学习-时序预测1-CSDN博客这个说&#xff0c;GRU是LSTM的一个简化而高效的变体&#xff0c;都使用“门控机制”来控制信息流&#xff0c;但它通过合并一些组件&#xff0c;使结构更简单、参数更少、计算更快&#xff0c;同时在许多任务上性能与 LSTM 相…

数据湖与数据仓库

大数据前沿技术详解 目录 数据湖技术湖仓一体架构数据网格实时流处理技术云原生数据技术数据治理与血缘AI原生数据平台边缘计算与大数据 核心内容包括&#xff1a; 数据湖技术 - 架构模式、技术栈、面临的挑战 湖仓一体架构 - Delta Lake、Iceberg、Hudi等主流实现 数据网格…

Python OpenCV图像处理与深度学习:Python OpenCV入门-图像处理基础

Python OpenCV入门实践&#xff1a;图像处理基础 学习目标 通过本课程&#xff0c;学员们将了解OpenCV的基本概念、安装方法&#xff0c;掌握如何使用Python和OpenCV进行基本的图像处理操作&#xff0c;包括图像的读取、显示、保存以及简单的图像变换。 相关知识点 Python Open…

【lua】Lua 入门教程:从环境搭建到基础编程

Lua 入门教程&#xff1a;从环境搭建到基础编程 Lua 是一种轻量级、可扩展的脚本语言&#xff0c;广泛应用于游戏开发&#xff08;如《魔兽世界》《Roblox》&#xff09;、嵌入式系统、Web 后端等领域。它语法简洁、运行高效&#xff0c;非常适合作为编程入门语言或辅助开发工…