Flink cdc 使用总结

Flink 与 Flink CDC 版本兼容对照表

Flink 版本支持的 Flink CDC 版本关键说明
Flink 1.11.xFlink CDC 1.2.x早期版本,需注意 Flink 1.11.0 的 Bug(如 Upsert 写入问题),建议使用 1.11.1 及以上。
Flink 1.12.xFlink CDC 2.0.x(MySQL 使用 flink-connector-mysql-cdcFlink 1.12.x 支持 CDC 2.0.x,MySQL 使用新版 Connector。
Flink 1.13.xFlink CDC 2.2.x, 2.3.x, 2.4.x2.2.x 起支持 Flink 1.13.x,2.4.x 兼容性更广(支持到 Flink 1.15.x)。
Flink 1.14.xFlink CDC 2.2.x, 2.3.x, 2.4.x同 Flink 1.13.x,需注意 2.4.x 对 1.14.x 的支持。
Flink 1.15.xFlink CDC 2.3.x, 2.4.x2.4.x 是 Flink 1.15.x 的推荐版本,支持增量快照框架。
Flink 1.16.xFlink CDC 2.3.x, 2.4.x2.4.x 支持 Flink 1.16.x,但需注意部分功能可能受限。
Flink 1.17.xFlink CDC 2.5.x 及以上(如 2.5.0)官方未声明 2.4.x 支持 Flink 1.17.x,需升级 Flink CDC 至 2.5+ 或降级 Flink 至 1.15.x。
Flink 2.0.x未明确说明(需参考最新 Flink CDC 文档)Flink 2.0 为新版本,建议关注 Flink CDC 官方文档的最新支持情况。
Flink CDC 3.x仅支持 Flink 1.13.x 及以上(具体版本需看文档)Flink CDC 3.x 是新一代数据集成框架,需与 Flink 1.13+ 配合使用。

1.flink  cdc 的两种使用方式

source:type: mysql-cdchostname: localhostport: 3306username: rootpassword: "123456"database-list: app_dbtable-list: app_db.*scan.startup.mode: initialscan.incremental.snapshot.enabled: truescan.newly-added-table.enabled: truesink:type: dorisfenodes: 127.0.0.1:8030username: rootpassword: ""table.create.properties.light_schema_change: truetable.create.properties.replication_num: 1pipeline:name: Sync MySQL to Dorisparallelism: 2execution.runtime-mode: STREAMING

./bin/flink-cdc.sh run mysql-to-doris.yaml

2. flink cdc 另一种使用方式

        <dependency><groupId>com.ververica</groupId><artifactId>flink-connector-postgres-cdc</artifactId><version>${flink.cdc.version}</version></dependency><dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>${flink.cdc.version}</version></dependency>

package com.example.demo.cdc;import com.example.demo.ConnectionConstants;
import com.example.demo.deserial.SafeStringKafkaDeserializationSchema;
import com.example.demo.domain.TableData;
import com.example.demo.dynamic.ExtractKafaRowAndTableName;
import com.example.demo.sink.DynamicJdbcSink;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.util.*;public class FlinkKafkaSource {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(3);Properties kafkaProps = new Properties();kafkaProps.setProperty("bootstrap.servers", "192.168.64.141:9092");SafeStringKafkaDeserializationSchema schema = new SafeStringKafkaDeserializationSchema();//CustomKafkaDeserializationSchema schema = new CustomKafkaDeserializationSchema();FlinkKafkaConsumer<ConsumerRecord<String, String>> kafkaSource = new FlinkKafkaConsumer<>("part.t_part", // 匹配所有 testdb 下的表schema,kafkaProps);kafkaSource.setStartFromEarliest();DataStreamSource<ConsumerRecord<String, String>> ds = env.addSource(kafkaSource);ExtractKafaRowAndTableName  extractRowAndTableName = new ExtractKafaRowAndTableName();SingleOutputStreamOperator<TableData> mapStream = ds.map(extractRowAndTableName);JdbcExecutionOptions options = JdbcExecutionOptions.builder().withBatchSize(1000).withBatchIntervalMs(2000).withMaxRetries(2).build();DynamicJdbcSink dynamicJdbcSink = new DynamicJdbcSink(ConnectionConstants.PG_DRIVER_CLSSNAME,ConnectionConstants.PG_URL,ConnectionConstants.PG_USER_NAME,ConnectionConstants.PG_PASSWORD);mapStream.addSink(dynamicJdbcSink);env.enableCheckpointing(5000); // 每 5 秒做一次 checkpointkafkaSource.setCommitOffsetsOnCheckpoints(true);env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);env.getCheckpointConfig().setMinPauseBetweenCheckpoints(2000); // 最小间隔env.getCheckpointConfig().setCheckpointTimeout(6000); // 超时时间env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // 并行数env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);env.execute("Multi-table CDC to PostgreSQL via DataStream");}
}

使用flink cdc 写代码的时候jar 包方式,你需要考虑不同数据库的序列化和反序列化问题, yaml 方式就是没提供的功能你无法用不够灵活。

一、判断Flink CDC同步完成的常见方法

Flink CDC的同步分为全量同步和增量同步阶段,完成标志如下:

  1. 监控 currentEmitEventTimeLag 指标

    • 这是核心判断依据。该指标表示数据从数据库产生到离开Source节点的时间延迟。
    • 全量同步完成标志:当 currentEmitEventTimeLag 从 ≤0 变为 >0 时,表示已从全量阶段进入增量(Binlog)读取阶段。
    • 原理:全量阶段该指标为0(无延迟),进入增量阶段后延迟值变为正数。
    • 实现:通过Flink的Metrics系统(如Prometheus、Grafana)实时监控该指标。
  2. 检查日志输出

    • 在日志中搜索关键词 BinlogSplitReader is created 或 全量同步结束,这通常标志全量阶段完成。
    • 全量同步完成后,日志会显示Binlog读取开始。
  3. 观察作业状态和指标

    • 作业状态:通过Flink Web UI或API检查Job状态,若为 FINISHED(仅限批处理任务),表示同步完成。
    • 其他指标
      • sourceIdleTime:源空闲时间增加,可能表示无新数据。
      • currentFetchEventTimeLag:类似 currentEmitEventTimeLag,监控数据读取延迟。
  4. 验证目标数据

    • 对比源数据库和目标存储(如数据湖)的数据一致性:
      • 全量同步后,目标数据应包含源数据库的所有记录。
      • 使用数据校验工具(如比对哈希值)确保一致性。

二、为什么不用数据条数判断?

  • 动态性:增量同步中数据持续流入,条数无法作为静态终点。
  • 准确性问题
    • 数据删除、更新可能导致条数波动。
    • 分布式系统中,分片同步可能不同步完成。
  • 替代方案:上述指标和状态监控更实时可靠。

三、实践建议

  • 实时监控:优先使用 currentEmitEventTimeLag,结合Prometheus等工具告警。
  • 自动化验证:在ETL管道中加入数据校验步骤,确保同步质量。
  • 日志审计:定期审查日志,辅助异常排查。

如果您有具体同步场景(如MySQL到数据湖),可进一步优化方案。

Flink中的滑动窗口和滚动窗口

1. 滑动窗口(Sliding Window):

  • 定义:滑动窗口有一个固定的大小,并且可以有重叠。这意味着数据项可能会被包含在一个或多个窗口中。
  • 用途:适用于需要分析一段时间内的趋势或模式的情况,例如计算过去5分钟内每1分钟的数据平均值。
  • 特点
    • 窗口大小和滑动步长可以独立配置。
    • 可能导致较高的计算成本,因为它涉及到更多的窗口操作。

2. 滚动窗口(Tumbling Window):

  • 定义:滚动窗口是滑动窗口的一种特殊情况,其中窗口之间没有重叠(即滑动步长等于窗口大小)。每个数据项只会属于一个特定的窗口。
  • 用途:适合于定期汇总数据的场景,比如每天统计一次用户活动量。
  • 特点
    • 简单易懂,实现起来相对直接。
    • 数据不会跨窗口重复处理,减少了计算负担。

限流熔断机制中的滑动窗口

3. 限流熔断机制中的滑动窗口:

  • 定义:在分布式系统或微服务架构中,为了防止某个服务过载而采取的一种保护措施。这里的滑动窗口通常用于监控请求速率,以便决定是否应该限制请求或触发熔断。
  • 用途:主要用于控制流量、保护下游服务免受突发流量的影响。
  • 特点
    • 主要关注点在于时间间隔内的请求数量或错误率。
    • 实现方式可能包括固定大小的时间桶(buckets),随着时间推移,新的请求会进入最新的时间桶,而旧的时间桶会被丢弃。
    • 目的是快速响应流量变化,提供即时反馈以调整系统的负载能力。

区别总结

  • 应用场景不同:Flink的窗口函数主要用于流处理任务中的数据分析;而限流熔断机制中的滑动窗口则用于保障系统稳定性和可用性。
  • 技术细节差异:Flink中的窗口涉及复杂的数据聚合逻辑,可能跨越多个节点进行分布式计算;相比之下,限流熔断机制中的滑动窗口更注重实时性和效率,通常在单个服务实例内部执行。
  • 目标不同:前者旨在提取有价值的信息,如统计信息、模式识别等;后者的目标是通过限制请求频率来维持系统的健康状态。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/bicheng/89176.shtml
繁体地址,请注明出处:http://hk.pswp.cn/bicheng/89176.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

企业培训笔记:axios 发送 ajax 请求

文章目录axios 简介一&#xff0c;Vue工程中安装axios二&#xff0c;编写app.vue三&#xff0c;编写HomeView.vue四&#xff0c;Idea打开后台项目五&#xff0c;创建HelloController六&#xff0c;配置web访问端口七&#xff0c;运行项目&#xff0c;查看效果&#xff08;一&am…

Maven下载与配置对Java项目的理解

目录 一、背景 二、JAVA项目与Maven的关系 2.1标准java项目 2.2 maven 2.2.1 下载maven 1、下载 2、配置环境 2.2.2 setting.xml 1、配置settings.xml 2、IDEA配置maven 一、背景 在java项目中&#xff0c;新手小白很有可能看不懂整体的目录结构&#xff0c;以及每个…

Mars3d的走廊只能在一个平面的无法折叠的解决方案

问题场景&#xff1a;1. Mars3d的CorridorEntity只能在一个平面修改高度值&#xff0c;无法根据坐标点位制作有高度值的走廊效果&#xff0c;想要做大蜀山盘山走廊的效果实现不了。解决方案&#xff1a;1.使用原生cesium实现对应的走廊的截面形状、走廊的坐标点&#xff0c;包括…

LeetCode 每日一题 2025/7/7-2025/7/13

记录了初步解题思路 以及本地实现代码&#xff1b;并不一定为最优 也希望大家能一起探讨 一起进步 目录7/7 1353. 最多可以参加的会议数目7/8 1751. 最多可以参加的会议数目 II7/9 3439. 重新安排会议得到最多空余时间 I7/10 3440. 重新安排会议得到最多空余时间 II7/11 3169. …

Bash常见条件语句和循环语句

以下是 Bash 中常用的条件语句和循环语句分类及语法说明&#xff0c;附带典型用例&#xff1a;一、条件语句 1. if 语句 作用&#xff1a;根据条件执行不同代码块 语法&#xff1a; if [ 条件 ]; then# 条件为真时执行 elif [ 其他条件 ]; then# 其他条件为真时执行 else# 所有…

uni-app 选择国家区号

uni-app选择国家区号组件 hy-countryPicker 我们在做登录注册功能的时候&#xff0c;可能会遇到选择区号来使用不同国家手机号来登录或者注册的功能。这里我就介绍下我这个uni-app中使用的选择区号的组件&#xff0c;包含不同国家国旗图标。 效果图 别的不说&#xff0c;先来…

客户端主机宕机,服务端如何处理 TCP 连接?详解

文章目录一、客户端主机宕机后迅速重启1、服务端有数据发送2、服务端开启「保活」机制3、服务端既没有数据发送&#xff0c;也没有开启「保活」机制二、客户端主机宕机后一直没有重启1、服务端有数据发送2、服务端开启「保活」机制3、服务端既没有数据发送&#xff0c;也没有开…

《大数据技术原理与应用》实验报告五 熟悉 Hive 的基本操作

目 录 一、实验目的 二、实验环境 三、数据集 四、实验内容与完成情况 4.1 创建一个内部表 stocks&#xff0c;字段分隔符为英文逗号&#xff0c;表结构下所示。 4.2 创建一个外部分区表 dividends&#xff08;分区字段为 exchange 和symbol&#xff09;&#xff0c;字段…

【橘子分布式】Thrift RPC(编程篇)

一、简介 之前我们研究了一下thrift的一些知识&#xff0c;我们知道他是一个rpc框架&#xff0c;他作为rpc自然是提供了客户端到服务端的访问以及两端数据传输的消息序列化&#xff0c;消息的协议解析和传输&#xff0c;所以我们今天就来了解一下他是如何实现这些功能&#xff…

清理C盘--办法

c盘经常爆红1、命令行2、属性3、临时文件

Java-71 深入浅出 RPC Dubbo 上手 父工程配置编写 附详细POM与代码

点一下关注吧&#xff01;&#xff01;&#xff01;非常感谢&#xff01;&#xff01;持续更新&#xff01;&#xff01;&#xff01; &#x1f680; AI篇持续更新中&#xff01;&#xff08;长期更新&#xff09; AI炼丹日志-29 - 字节跳动 DeerFlow 深度研究框斜体样式架 私有…

创客匠人:创始人 IP 打造的内核,藏在有效的精神成长里

当创始人 IP 成为企业增长的重要引擎&#xff0c;许多人急于寻找 “爆款公式”&#xff0c;却忽略了一个更本质的问题&#xff1a;IP 的生命力&#xff0c;终究源于创始人的精神成长。创客匠人在深耕知识付费赛道的过程中&#xff0c;见证了无数案例&#xff1a;那些能持续实现…

GPT和MBR分区

GPT&#xff08;GUID分区表&#xff09;和MBR&#xff08;主引导记录&#xff09;是两种不同的磁盘分区表格式&#xff0c;用于定义硬盘上分区的布局、位置及启动信息&#xff0c;二者在设计、功能和适用场景上有显著差异。以下从多个维度详细对比&#xff1a; 一、核心定义与起…

c#进阶之数据结构(字符串篇)----String

1、String介绍首先我们得明白&#xff0c;string和String代表的实际上是同一个类型&#xff0c;string是C#中的关键字&#xff0c;代表String类型&#xff0c;因此我们直接来学习String类型。从官方的底层实现代码可以看出&#xff0c;当前String类型实际上就是一个Char类型的聚…

快速排序递归和非递归方法的简单介绍

基本思想为&#xff1a;任取待排序元素序列中 的某元素作为基准值&#xff0c;按照该排序码将待排序集合分割成两子序列&#xff0c;左子序列中所有元素均小于基准值&#xff0c;右 子序列中所有元素均大于基准值&#xff0c;然后最左右子序列重复该过程&#xff0c;直到所有元…

从零开始的云计算生活——第三十二天,四面楚歌,HAProxy负载均衡

目录 一.HAProxy简介 二.HAProxy特点和优点&#xff1a; 三.HAProxy保持会话的三种解决方法 四.HAProxy的balance 8种负载均衡算法 1&#xff09;RR&#xff08;Round Robin&#xff09; 2&#xff09;LC&#xff08;Least Connections&#xff09; 3&#xff09;SH&am…

策略模式及优化

策略模式&#xff08;Strategy Pattern&#xff09;是一种行为设计模式&#xff0c;其核心思想是将算法的定义与使用分离&#xff0c;使算法可以独立于客户端进行变化。它通过定义一系列算法&#xff0c;将每个算法封装到独立的类中&#xff0c;并使它们可以互相替换&#xff0…

微信小程序开发-桌面端和移动端UI表现不一致问题记录

桌面端和移动端UI表现不一致零、引擎说明一、样式不同1、text 单行&#xff1a;1.1 空格开发者工具不展示&#xff0c;手机/PC端正常1.2 正常展示省略号&#xff0c;需要2、点击按钮z-index: -1。webview - 桌面端不行&#xff0c; skyline - 移动端可以&#xff1b;3、其他说明…

极限状态下函数开根号的计算理解(含示意图)

遇到一个挺有意思的题做个记录&#xff1a; 求曲线y (x21)(x2−1)0.5\frac{\left(x^{2}1\right)}{\left(x^{2}-1\right)^{0.5}}(x2−1)0.5(x21)​渐近线的条数 比较明显的x 1是无定义点。但是在求极限的时候发现1和1-得到的极限值似乎不一样。似乎是1是趋向于∞&#xff0c;1…

C++——模版(函数模版和类模版)

C 模板&#xff08;Templates&#xff09;完整介绍模板是 C 中一种强大的泛型编程机制&#xff0c;允许开发者编写与类型无关的代码&#xff0c;从而提高代码的复用性和灵活性。通过模板&#xff0c;可以避免为不同数据类型重复编写相似的函数或类&#xff0c;实现真正的代码复…