Flink CDC—实时数据集成框架

Flink CDC 是一个基于流的数据集成工具,旨在为用户提供一套功能更加全面的编程接口(API),它基于数据库日志的 CDC(变更数据捕获)技术实现了统一的增量和全量数据读取。 该工具使得用户能够以 YAML 配置文件的形式,优雅地定义其 ETL(Extract, Transform, Load)流程,并协助用户自动化生成定制化的 Flink 算子并且提交 Flink 作业。 Flink CDC 在任务提交过程中进行了优化,并且增加了一些高级特性,如表结构变更自动同步(Schema Evolution)、数据转换(Data Transformation)、整库同步(Full Database Synchronization)以及 精确一次(Exactly-once)语义。

Flink CDC 深度集成并由 Apache Flink 驱动,提供以下核心功能:

  • ✅ 端到端的数据集成框架
  • ✅ 为数据集成的用户提供了易于构建作业的 API
  • ✅ 支持在 Source 和 Sink 中处理多个表
  • ✅ 整库同步
  • ✅具备表结构变更自动同步的能力(Schema Evolution)

一、如何使用 Flink CDC

Flink CDC 提供了基于 YAML 格式的用户 API,更适合于数据集成场景。以下是一个 YAML 文件的示例,它定义了一个数据管道(Pipeline),该Pipeline从 MySQL 捕获实时变更,并将它们同步到 Apache Doris:

source:type: mysqlhostname: localhostport: 3306username: rootpassword: 123456tables: app_db.\.*server-id: 5400-5404server-time-zone: UTCsink:type: dorisfenodes: 127.0.0.1:8030username: rootpassword: ""table.create.properties.light_schema_change: truetable.create.properties.replication_num: 1pipeline:name: Sync MySQL Database to Dorisparallelism: 2

通过使用 flink-cdc.sh 提交 YAML 文件,一个 Flink 作业将会被编译并部署到指定的 Flink 集群。

二、理解核心概念

1、Data Pipeline

由于Flink CDC中的事件以管道方式从上游流向下游,因此整个ETL任务被称为数据管道。

我们可以使用下面的yaml文件来定义一个简洁的数据管道,描述将MySQL app_db数据库下的所有表同步到Doris:

  source:type: mysqlhostname: localhostport: 3306username: rootpassword: 123456tables: app_db.\.*sink:type: dorisfenodes: 127.0.0.1:8030username: rootpassword: ""pipeline:name: Sync MySQL Database to Dorisparallelism: 2

 我们可以使用下面的yaml文件定义一个复杂的数据管道,描述将MySQL app_db数据库下的所有表同步到Doris,并给出特定的目标数据库名称ods_db和特定的目标表名称前缀ods_:

 source:type: mysqlhostname: localhostport: 3306username: rootpassword: 123456tables: app_db.\.*sink:type: dorisfenodes: 127.0.0.1:8030username: rootpassword: ""transform:- source-table: adb.web_order01projection: \*, format('%S', product_name) as product_namefilter: addone(id) > 10 AND order_id > 100description: project fields and filter- source-table: adb.web_order02projection: \*, format('%S', product_name) as product_namefilter: addone(id) > 20 AND order_id > 200description: project fields and filterroute:- source-table: app_db.orderssink-table: ods_db.ods_orders- source-table: app_db.shipmentssink-table: ods_db.ods_shipments- source-table: app_db.productssink-table: ods_db.ods_productspipeline:name: Sync MySQL Database to Dorisparallelism: 2user-defined-function:- name: addoneclasspath: com.example.functions.AddOneFunctionClass- name: formatclasspath: com.example.functions.FormatFunctionClass

Pipeline 配置: 

支持数据管道级别的以下配置选项:

parametermeaningoptional/required
name管道的名称,将作为作业名称提交到Flink集群。optional
parallelism管道的全局并行性。默认为1。optional
local-time-zone本地时区定义当前会话时区id。optional

2、Data Source

数据源用于访问元数据,并从外部系统读取更改的数据。数据源可以同时从多个表中读取数据。

要描述数据源,需要以下内容:

parametermeaningoptional/required
type数据源的类型,如mysql。required
name数据源的名称,由用户定义(提供默认值)。optional
configurations of Data Source用于构建数据源的配置,例如连接配置和源表属性。optional

source:type: mysqlname: mysql-source   #optional,description informationhost: localhostport: 3306username: adminpassword: passtables: adb.*, bdb.user_table_[0-9]+, [app|web]_order_\.*

3、Data Sink

数据接收器用于应用架构更改并将更改数据写入外部系统。数据接收器可以同时写入多个表。

Parameters 

要描述数据接收器,需要以下内容:

parametermeaningoptional/required
typeThe type of the sink, such as doris or starrocks.required
nameThe name of the sink, which is user-defined (a default value provided).optional
configurations of Data SinkConfigurations to build the Data Sink e.g. connection configurations and sink table properties.optional

Example 

我们可以使用此yaml文件定义doris接收器:

sink:type: dorisname: doris-sink           	# Optional parameter for description purposefenodes: 127.0.0.1:8030username: rootpassword: ""table.create.properties.replication_num: 1      	# Optional parameter for advanced functionalities

4、Table Id

5、Transform 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.pswp.cn/pingmian/81364.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ES(ES2023/ES14)最新更新内容,及如何减少内耗

截至2023年10月,JavaScript(ECMAScript)的最新版本是 ES2023(ES14)。 ES2023 引入了许多新特性,如findLast、toSorted等,同时优化了性能。通过减少全局变量、避免内存泄漏、优化循环、减少DOM操作、使用Web Workers、懒加载、缓存、高效数据结构和代码压缩,可以显著降低…

常见的 Python 环境配置问题及解决方案

1. Python 环境配置的常见问题 初学者在配置 Python 环境时,可能会遇到以下几类问题: 1.1 不同版本的兼容性 Python 目前有两个主要版本系列:Python 2.x 和 Python 3.x。Python 2.x 已于 2020 年 1 月 1 日停止维护,因此强烈建…

day20-线性表(链表II)

一、调试器 1.1 gdb(调试器) 在程序指定位置停顿 1.1.1 一般调试 gcc直接编译生成的是发布版(Release) gcc -g //-g调式版本,(体积大,内部有源码)(DeBug&#…

基于Spring Boot+Layui构建企业级电子招投标系统实战指南

一、引言:重塑招投标管理新范式 在数字经济浪潮下,传统招投标模式面临效率低、透明度不足、流程冗长等痛点。本文将以Spring Boot技术生态为核心,融合Mybatis持久层框架、Redis高性能缓存及Layui前端解决方案,构建一个覆盖招标代理…

uniapp -- uCharts 仪表盘刻度显示 0.9999999 这样的值问题处理。

文章目录 🍉问题🍉解决方案🍉问题 在仪表盘上,23.8变成了 23.799999999999997 🍉解决方案 formatter格式化问题 1:在 config-ucharts.js 或 config-echarts.js 配置对应的 formatter 方法 formatter: {yAxisDemo1: function (

git 对于已经追踪,但没有git add 的文件,撤回修改的方法

要撤销对已追踪文件的修改&#xff08;但尚未使用git add添加到暂存区&#xff09;&#xff0c;你可以使用以下几种方法&#xff1a; 1. 使用 git restore (Git 2.23.0及更高版本) 这是较新版本Git中推荐的方式&#xff1a; # 撤销单个文件的修改git restore <file># …

脚本语言Lua

本文来源 &#xff1a;腾讯元宝 Lua是一种轻量级、可嵌入的脚本语言&#xff0c;由巴西里约热内卢天主教大学的Roberto Ierusalimschy、Waldemar Celes和Luiz Henrique de Figueiredo于1993年开发。其设计目标是嵌入应用程序中&#xff0c;提供灵活的扩展和定制功能。 主要特性…

ThingsBoard使用Cassandra部署时性能优化

1、概述 当遇到ThingsBoard设备数量特别多的时候,并且传输数据遥测点量特别大的时候,我们需要调整一下参数来进行优化,使其性能达到最佳的进行快速写入。 注意:以下这些参数再系统部署的时候就需要规划好配置,不能安装好了再二次来进行配置。 2、Cassandra配置参数优化 …

Git Worktree 使用

新入职了一家公司&#xff0c;发现不同项目用的使用一个 git 仓库管理。不久之后我看到这篇文章。 Git 的设计部​​分是为了支持实验。一旦你确定你的工作被安全地跟踪&#xff0c;并且存在安全的状态&#xff0c;以便在出现严重错误时可以恢复&#xff0c;你就不会害怕尝试新…

维智定位 Android 定位 SDK

概述 维智 Android 定位 SDK是为 Android 移动端应用提供的一套简单易用的定位服务接口&#xff0c;为广大开发者提供融合定位服务。通过使用维智定位SDK&#xff0c;开发者可以轻松为应用程序实现极速、智能、精准、高效的定位功能。 重要&#xff1a;为了进一步加强对最终用…

【CSS】使用 CSS 绘制三角形

一、Border 边框法&#xff08;最常用&#xff09; 原理&#xff1a;通过设置元素的宽高为 0&#xff0c;利用透明边框相交形成三角形。 .triangle {width: 0;height: 0;border-left: 50px solid transparent; /* 左侧边框透明 */border-right: 50px solid transparent; /* …

RabbitMQ 快速上手:安装配置与 HelloWorld 实践(一)

一、引言 在当今分布式系统大行其道的技术浪潮下&#xff0c;各个服务之间的通信与协同变得愈发复杂。想象一下&#xff0c;一个电商系统在大促期间&#xff0c;订单服务、库存服务、支付服务、物流服务等众多模块需要紧密配合。如果没有一种高效的通信机制&#xff0c;系统很容…

【deekseek】TCP Offload Engine

是的&#xff0c;TOE&#xff08;TCP Offload Engine&#xff09;通过专用硬件电路&#xff08;如ASIC或FPGA&#xff09;完整实现了TCP/IP协议栈&#xff0c;将原本由CPU软件处理的协议计算任务完全转移到网卡硬件中。其延迟极低的核心原因在于 硬件并行性、零拷贝架构 和 绕过…

JavaScript 的编译与执行原理

文章目录 前言&#x1f9e0; 一、JavaScript 编译与执行过程1. 编译阶段&#xff08;发生在代码执行前&#xff09;✅ 1.1 词法分析&#xff08;Lexical Analysis&#xff09;✅ 1.2 语法分析&#xff08;Parsing&#xff09;✅ 1.3 语义分析与生成执行上下文 &#x1f9f0; 二…

WORD个人简历单页326款模版分享下载

WORD个人简历模版下载&#xff1a;WORD个人简历模版https://pan.quark.cn/s/7e79a822c490

Android 中 显示 PDF 文件内容(AndroidPdfViewer 库)

PDFView 是一个用于在 Android 应用中显示 PDF 文档的库。它提供了丰富的功能和灵活的配置选项&#xff0c;使得开发者能够轻松地在应用中嵌入 PDF 阅读器。 一、 添加依赖 在模块的 build.gradle 文件中添加以下依赖&#xff1a; // pdfimplementation("com.github.bar…

微信小程序学习之搜索框

1、第一步&#xff0c;我们在index.json中引入vant中的搜索框控件&#xff1a; {"usingComponents": {"van-search": "vant/weapp/search/index"} } 2、第二步&#xff0c;直接在index.wxml中添加布局&#xff1a; <view class"index…

OpenCL C++ 常见属性与函数

核心对象与属性 对象/属性描述示例cl::Platform表示OpenCL平台cl::Platform::get(&platforms)cl::Device表示计算设备cl::Device::getDefault()cl::Context管理设备、内存和命令队列的上下文cl::Context(contextDevices)cl::CommandQueue命令队列,用于提交命令cl::Command…

Milvus 视角看重排序模型(Rerankers)

在信息检索和生成式人工智能领域&#xff0c;重排序器是优化初始搜索结果顺序的重要工具。重排序器与传统的嵌入模型不同&#xff0c;它将查询和文档作为输入&#xff0c;并直接返回相似度得分&#xff0c;而不是嵌入。该得分表示输入查询和文档之间的相关性。 重排序器通常在…

C语言:gcc 如何调用 Win32 打开文件对话框 ?

在 Windows 平台上使用 gcc 调用原生 Win32 API 实现文件打开对话框是可行的&#xff0c;但需要直接使用 Win32 的 GetOpenFileName 函数&#xff08;位于 commdlg.h 头文件&#xff0c;依赖 comdlg32.lib 库&#xff09;。以下是完整实现步骤和代码示例&#xff1a; 编写 file…