深入解析 Apache Flink FLIP-511:优化 Kafka Sink 事务处理,减轻 Broker 负载

一、 背景与核心问题:Kafka Sink 事务的痛点

Flink Kafka Sink 在 Exactly-Once 模式下依赖 Kafka 事务来确保数据写入的原子性,并与 Flink 检查点对齐。然而,非优雅关闭(如任务失败、非 stop-with-savepoint 的停止)会导致 “滞留事务”。这些滞留事务在 Kafka 中会:

  1. 阻塞消费者 (READ_COMMITTED):阻碍消费进度 (LSO)。
  2. 阻碍数据卸载和主题压缩
  3. 最关键的是:Kafka Broker 会在内存中保留每个事务的元数据 长达 7 天

1、旧方案

探测式事务恢复 (INCREMENTING + PROBING) 的致命缺陷

Flink 原有的恢复机制基于“探测”:

  • 事务 ID 命名规则: transactionalIdPrefix-subtaskId-checkpointId (每个检查点生成唯一 ID)。
  • 恢复逻辑: 根据检查点状态,尝试初始化并提交/中止可能滞留的事务 ID(按检查点 ID 和子任务 ID 维度递增探测)。探测到 epoch > 0 表示事务滞留需中止。

该方案存在两大严重问题:

  1. Kafka Broker 内存爆炸性增长:
    • 高检查点频率(如 1 分钟)结合唯一 ID 策略,导致海量短期事务 ID
    • 计算:7天 * 24小时 * 60分钟 * 并行度 ≈ 10080 * 并行度 个 ID 需在 Broker 内存保留 7 天。
    • 这是 Kafka 设计(预期 ID 重用)与 Flink 实现(唯一 ID)的根本冲突,给 Broker 带来巨大且不必要的内存压力 (FLINK-34554)。
  2. 恢复时间不可预测与“探测爆炸”:
    • 在连续重启失败(无法完成新检查点)的最坏情况下,每次重启探测的 ID 范围会指数级扩大(每次约 3 倍)。
    • 恢复时间可能变得非常长且难以预估。
    • 虽然成功检查点能重置此问题,但重启循环本身已表明系统存在其他问题,此机制会雪上加霜。

二、 FLIP-511 解决方案:池化 ID 与精准清理

提案的核心是摒弃唯一 ID 策略,改为重用有限数量的事务 ID,并利用 Kafka 3.0+ 的 ListTransactions API 实现精准的事务状态查询和清理。

1、新方案核心机制 (POOLING + LISTING)

1、事务 ID 命名与池化管理 (POOLING):

  • 格式仍为 <prefix>-<subtask id>-<counter>,但 counter 是动态递增的整数
  • Writer (写入器) 职责:
    • 启动: 创建一个新事务(分配新 ID 或复用池中可用 ID),开始写入。存储当前使用的 ID 到状态
    • 检查点 (snapshotState): 将当前活跃事务 finalize 并传递给 Committer。立即开启一个新事务(分配新 ID 或复用)。存储所有已开始但未最终释放(提交/中止/复用)的 ID 到状态
    • 检查点完成通知 (notifyCheckpointComplete): 收到 Committer 成功提交某事务 ID 的通知后,将该 ID 标记为可用并放入池中复用
    • 状态合并/清理 (snapshotState/initializeState): 在后续检查点或恢复时,清理已确认完成的事务 ID 状态,回收其计数器或标记 ID 可用。
    • 关闭: 中止当前活跃事务。
  • Committer (提交器) 职责:
    • 接收 Writer 传递的需要提交的事务 ID 信息。
    • 执行 commitTransaction。成功后将 ID 释放通知回 Writer(通过回调或状态更新)。

2、精准恢复利用 ListTransactions API (LISTING):

  • 恢复启动时:
    1. 查询: 调用 Kafka AdminClient 的 ListTransactions API,获取 Kafka Broker 上所有属于该 Sink 的 未完成 (Open) 事务
    2. 对比: 从 Flink 状态中恢复出需要重新提交的事务 ID 列表(即上次运行中已 finalize 但可能未提交的事务)。
    3. 清理: 精准中止所有在 ListTransactions 结果中但 不在 需重新提交列表中的 Open 事务。这些是真正的“滞留垃圾事务”。
  • 重新提交: Committer 重新提交状态中记录的待提交事务 ID。幂等操作,已提交的事务会静默成功。

2、新方案的优势

  • 大幅减少 Broker 内存占用:
    • 预期 ID 数量 ≈ 3 * 并行度 (1 Writer 活跃事务 + 1-2 个等待/提交中事务)。
    • 相比旧方案(可能数万/数十万 ID),减少 2-3 个数量级。即使临时峰值到 100 个 ID,影响也远小于旧方案。
  • 稳定且快速的恢复:
    • 无需复杂探测逻辑,恢复时间确定且快速
    • 彻底消除“探测爆炸”问题。
  • 更健壮: 直接依赖 Kafka API 查询事务状态,逻辑更清晰可靠。
  • 资源效率提升: 减少了网络交互(探测)和状态管理开销。

三、 公共接口与配置变更

提案引入了灵活的配置选项,允许用户选择策略:

public class KafkaSinkBuilder<IN> {...public KafkaSinkBuilder<IN> setTransactionNamingStrategy(TransactionNamingStrategy transactionNamingStrategy);// 设置命名策略}public class KafkaConnectorOptions {...public static final ConfigOption<TransactionNamingStrategy> TRANSACTION_NAMING_STRATEGY =ConfigOptions.key("sink.transaction-naming-strategy").enumType(TransactionNamingStrategy.class).defaultValue(TransactionNamingStrategy.DEFAULT);// 表/SQL 选项}@PublicEvolving
public enum TransactionNamingStrategy {
// 旧行为:递增唯一ID + 探测恢复 (INCREMENTING + PROBING)INCREMENTING(...),
// 新行为:池化ID + ListTransactions恢复 (POOLING + LISTING)POOLING(...);public static final TransactionNamingStrategy DEFAULT = INCREMENTING;// 初始默认值}
  • sink.transaction-naming-strategy:核心配置项,可选 INCREMENTING (旧) 或 POOLING (新)。
  • 默认值:初始版本保持 INCREMENTING 以确保行为一致性和向后兼容性。用户需显式启用 POOLING 以使用新特性。
  • 设计考量:使用 enum 为未来可能的其他策略(如静态池 STATIC_POOL)预留了扩展空间。

四、 实现关键点与兼容性

  1. 状态扩展:
    • Writer State:需要扩展以存储 当前活跃事务 ID 和 所有已开始但尚未释放(等待提交确认或复用)的事务 ID 列表。这是实现 ID 池化和精准恢复的基础。
  2. 策略抽象:
    • 将事务 ID 生成 (TransactionNamingStrategyImpl) 和滞留事务中止 (TransactionAbortStrategyImpl) 逻辑解耦并抽象为策略模式。
    • 现有代码重构为 INCREMENTING (命名) + PROBING (中止)。
    • 新增 POOLING (命名) + LISTING (中止)。
  3. Kafka 版本依赖:
    • LISTING 策略强依赖 Kafka Broker 3.0+ 提供的 ListTransactions API。使用前需确保集群版本满足要求。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/pingmian/91414.shtml
繁体地址,请注明出处:http://hk.pswp.cn/pingmian/91414.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

设计模式:组合模式 Composite

目录前言问题解决方案结构代码前言 组合是一种结构型设计模式&#xff0c;你可以使用它将对象组合成树状结构&#xff0c;并且能像使用独立对象一样使用它们。 问题 如果应用的核心模型能用树状结构表示&#xff0c; 在应用中使用组合模式才有价值。 例如&#xff0c; 你有两…

嵌入式 C 语言入门:函数封装与参数传递学习笔记 —— 从定义到内存机制

前言 大家好&#xff0c;这里是 Hello_Embed。在前一篇笔记中&#xff0c;我们用循环实现了 LED 闪烁&#xff0c;其中重复使用了两段几乎一样的延时代码&#xff1a; for(i 0; i < 100000000; i); // 延时这种重复不仅让代码冗余&#xff0c;还不利于后续修改&#xff08…

第一个大语言模型的微调

模型推理 现在,我们的模型应该能够针对输入的任何短句生成类似尤达大师风格的句子作为回应。 该模型要求其输入格式规范。我们需要构建一个 “消息” 列表 —— 在这个案例中,就是来自用户的消息 —— 并通过提示表明轮到模型进行输出,以促使其做出回答。 add_generation…

Linux内核驱动开发核心问题全解

&#x1f4d6; 推荐阅读&#xff1a;《Yocto项目实战教程:高效定制嵌入式Linux系统》 &#x1f3a5; 更多学习视频请关注 B 站&#xff1a;嵌入式Jerry Linux内核驱动开发核心问题全解 本文系统梳理了 Linux 驱动开发、内核同步、中断处理、内存管理、进程通信、系统启动等典型…

【C++篇】C++11入门:踏入C++新世界的大门

文章目录C11简介列表初始化1. {}初始化2. initializer_list容器initializer_list的使用场景声明1. auto2. decltype3. nullptrSTL中的变化1. 新容器array容器forward_list容器unordered_map和unordered_set容器2. 新接口C11简介 C98/03&#xff1a;在2003年C标准委员会曾经提交…

Java 日期时间处理:分类、用途与性能分析

Java提供了多种日期时间处理API&#xff0c;随着版本演进不断改进。以下是主要日期时间类的分类、用途和性能分析&#xff1a;一、Java日期时间API分类1. 传统日期时间API (Java 1.0/1.1)java.util.Date - 表示特定的瞬间&#xff0c;精确到毫秒java.util.Calendar - 抽象类&am…

[Linux]学习笔记系列 --GCC

文章目录属性__cleanup__attribute_malloc__ 用于标记函数返回一个新分配的内存块__attribute_alloc_size__ 用于指定分配的内存大小__attribute__((const)) 标记为纯函数(pure function)__attribute__((__externally_visible__)) 使其在编译器优化过程中保持对外部模块的可见性…

【龙泽科技】汽车维护与底盘拆装检修仿真教学软件【风光580】

产品简介汽车维护与底盘拆装检修仿真教学软件是依托《全国职业院校技能大赛》“汽车维修”赛项中“汽车维护与底盘拆装检修模块”竞赛模块&#xff0c;自主开发的一款仿真教学软件。软件采用仿真仿真技术模拟实际汽车维修工的岗位技能操作流程&#xff0c;操作内容主要包括&…

Spring之【循环引用】

目录前置知识SingletonBeanRegistryDefaultSingletonBeanRegistrySpring中处理循环引用的流程分析定义两个具有循环引用特点的Bean执行A的实例化执行A的属性填充(执行过程中发现A依赖B&#xff0c;就去执行B的实例化逻辑)执行B的实例化执行B的属性填充执行B的初始化执行A的属性…

LRU缓存淘汰算法的详细介绍与具体实现

LRU&#xff08;Least Recently Used&#xff0c;最近最少使用&#xff09;是一种基于时间局部性原理的缓存淘汰策略。其核心思想是&#xff1a;最近被访问的数据在未来更可能被再次使用&#xff0c;而最久未被访问的数据应优先被淘汰&#xff0c;从而在有限的缓存空间内保留高…

JS-第十九天-事件(一)

一、事件基础概念1.1 事件三要素事件源&#xff1a;触发事件的元素事件类型&#xff1a;事件的种类&#xff08;如click、mouseover等&#xff09;事件处理程序&#xff1a;响应事件的函数1.2 事件流机制事件传播分为三个阶段&#xff1a;捕获阶段&#xff1a;事件从顶层开始&a…

Matplotlib(三)- 图表辅助元素

文章目录一、图表辅助元素简介二、坐标轴的标签、刻度范围和刻度标签1. 坐标轴标签1.1 x轴标签1.2 y轴标签1.3 示例&#xff1a;绘制天气气温折线图2. 刻度范围和刻度标签2.1 刻度范围2.1.1 x轴刻度范围2.1.2 y轴刻度范围2.2 刻度标签2.2.1 x轴刻度标签2.2.2 y轴刻度标签2.3 示…

【Linux基础知识系列】第七十八篇 - 初识Nmap:网络扫描工具

在网络管理和安全领域&#xff0c;网络扫描是一个不可或缺的工具。它可以帮助网络管理员了解网络中的设备、服务以及潜在的安全漏洞。Nmap&#xff08;Network Mapper&#xff09;是一个功能强大的开源网络扫描工具&#xff0c;它能够快速发现网络中的主机、端口和服务&#xf…

EasyGBS的两种录像回看

EasyGBS 支持两种录像回看&#xff0c;即“平台端”的录像回看和“设备端”的录像回看。本期我们来介绍两者的区别和使用方法。一、平台端录像1、什么是平台端录像平台端录像是指由 EasyGBS 平台直接录制并存储。2、配置平台端录像进入平台&#xff0c;依次点击【录像回放】→【…

大模型学习思路推荐!

为进一步贯彻落实中共中央印发《关于深化人才发展体制机制改革的意见》和国务院印发《关于“十四五”数字经济发展规划》等有关工作的部署要求&#xff0c;深入实施人才强国战略和创新驱动发展战略&#xff0c;加强全国数字化人才队伍建设&#xff0c;持续推进人工智能从业人员…

数据库连接池性能优化实战

背景我们公司正在处于某个项目的维护阶段&#xff0c;领导对资源告警比较重视&#xff0c;服务器资源告警的就不说了&#xff0c;运维同学每隔一小时都会检测线上环境的应用服务信息&#xff0c;例如&#xff1a;网关日志响应时间告警/nginx日志接口响应时间告警/日志关键字异常…

Excel常用函数大全,非常实用

一、数学与统计函数1. SUM作用&#xff1a;求和SUM(number1, [number2], ...)SUM(A1:A10) ➔ 计算A1到A10单元格的总和注意&#xff1a;自动忽略文本和空单元格2. AVERAGE作用&#xff1a;计算平均值AVERAGE(number1, [number2], ...)AVERAGE(B2:B20) ➔ 计算B列20个数据的平均…

性能优化(一):时间分片(Time Slicing):让你的应用在高负载下“永不卡顿”的秘密

性能优化(一)&#xff1a;时间分片&#xff08;Time Slicing&#xff09;&#xff1a;让你的应用在高负载下“永不卡顿”的秘密 引子&#xff1a;那张让你浏览器崩溃的“无限列表” 想象一个场景&#xff1a;你需要渲染一个包含一万个项目的列表。在我们的“看不见”的应用中&a…

《C++》STL--list容器详解

在 C 标准模板库(STL)中&#xff0c;list 是一个非常重要的序列容器&#xff0c;它实现了双向链表的数据结构。与 vector 和 deque 不同&#xff0c;list 提供了高效的插入和删除操作&#xff0c;特别是在任意位置。本文将深入探讨 list 容器的特性、使用方法以及常见操作。 文…

Day 28:类的定义和方法

DAY 28 类的定义和方法 知识点学习 1. 类的定义 在Python中&#xff0c;类是创建对象的模板。使用class关键字来定义一个类。类名通常采用首字母大写的命名方式&#xff08;PascalCase&#xff09;。 # 最简单的类定义 class MyClass:pass # 使用pass占位符类的定义就像是…