基于Flink 1.20、StarRocks与TiCDC构建高效数据处理链路教程

在大数据处理领域,实现高效、实时的数据处理与分析至关重要。Flink作为强大的流批一体化计算框架,结合StarRocks这一高性能的实时分析型数据库,再搭配TiCDC(TiDB Change Data Capture)用于捕获数据变更,能够构建出极为高效的数据处理链路。本教程将详细介绍如何利用这些技术实现从MySQL数据源抽取数据,经Flink处理后写入StarRocks的完整流程,并对相关表结构和字段进行合理抽象与调整,以保障数据处理的通用性与安全性。

一、技术简介

Flink 1.20

Flink 1.20是Apache Flink的一个重要版本,它进一步强化了流批一体的计算能力。在流处理方面,其能够以低延迟处理大规模的实时数据流;而在批处理场景下,也具备高效的性能表现。Flink提供了丰富的连接器(Connector),方便与各类数据源和数据存储系统进行对接,同时支持使用SQL进行数据处理操作,大大降低了开发成本,提升了开发效率。

StarRocks

StarRocks是一款高性能的实时分析型数据库,采用MPP(Massively Parallel Processing)架构,能够对海量数据进行亚秒级的查询分析。它支持多种数据模型,包括聚合模型、主键模型等,适用于各类数据分析场景,如报表生成、实时看板、即席查询等。StarRocks通过其高效的存储和查询引擎,以及对多种数据格式的支持,为数据的快速分析提供了有力保障。

TiCDC

TiCDC是TiDB生态中的数据变更捕获工具,它基于TiDB的分布式事务和MVCC(Multi-Version Concurrency Control)机制,能够实时捕获TiDB数据库中的数据变更,包括增、删、改操作。TiCDC将这些变更数据以有序的方式输出,为数据同步、实时数据处理等场景提供了可靠的数据源。在本教程中,虽然我们主要从MySQL数据源抽取数据,但TiCDC的原理和应用思路可作为扩展参考,在涉及TiDB数据源时能够快速迁移应用。

二、环境准备

安装与配置Flink 1.20

  1. 下载Flink 1.20.0:通过curl命令下载安装包,执行 curl -k -O https://archive.apache.org/dist/flink/flink-1.20.0/flink-1.20.0-bin-scala_2.12.tgz
  2. 解压文件:使用命令 tar -xzvf flink-1.20.0-bin-scala_2.12.tgz 解压下载的压缩包。
  3. 移动到目标目录(可选):可将解压后的Flink目录移动到 /opt 或其他目标位置,例如执行 sudo mv flink-1.20.0 /opt/flink
  4. 配置环境变量:编辑 ~/.bashrc 文件,添加如下内容:
export FLINK_HOME=/opt/flink
export PATH=$FLINK_HOME/bin:$PATH

保存并退出文件后,运行 source ~/.bashrc 使修改生效。
5. 配置Flink:Flink默认已配置一些基本设置。若无需集群配置,可跳过 mastersworkers 文件的配置。如需调整参数,如内存配置或其他作业配置,可修改Flink配置文件 config.yaml,该文件位于 /opt/flink/conf 目录下。例如,将 bind-host 设置从 localhost 改为 0.0.0.0,使Flink能够绑定所有网络接口,修改如下:

jobmanager:bind-host: 0.0.0.0
rpc:address: 0.0.0.0port: 6123
memory:process:size: 1600m
execution:failover-strategy: region
taskmanager:bind-host: 0.0.0.0host: 0.0.0.0numberOfTaskSlots: 1
memory:process:size: 1728m
parallelism:address: 0.0.0.0bind-address: 0.0.0.0
  1. 启动Flink:进入Flink目录,执行 ./bin/start-cluster.sh 启动Flink。若要关闭Flink,执行 ./bin/stop-cluster.sh。启动后,可通过浏览器访问Flink Web UI,默认地址为 http://<your_server_ip>:8081(例如 http://192.168.1.1:8081),以查看Flink集群的状态、提交作业等。

安装与配置StarRocks

  1. 下载与部署:从StarRocks官方网站获取安装包,按照官方文档指引进行下载与解压操作。根据实际的生产环境需求,选择合适的部署方式,如单节点部署用于测试环境,集群部署用于生产环境。
  2. 配置参数:在StarRocks的配置文件中,对一些关键参数进行设置,如FE(Frontend)节点的内存分配、BE(Backend)节点的存储路径等。例如,在FE节点的 fe.conf 文件中设置 query_mem_limit = 2147483648 来限制查询内存,在BE节点的 be.conf 文件中设置 storage_root_path = /data/starrocks/be 来指定存储路径。
  3. 启动服务:分别启动FE和BE节点,确保各个节点正常运行且相互通信正常。启动后,可通过MySQL客户端连接到StarRocks,验证其是否正常工作,例如执行 mysql -h <starrocks_fe_host> -P 9030 -u root -p

配置MySQL数据源

  1. 开启Binlog:确保MySQL开启了Binlog功能,在MySQL配置文件(通常为 my.cnfmy.ini)中,添加或修改如下配置:
[mysqld]
log-bin=mysql-bin
binlog-format=ROW
server-id=1

修改完成后,重启MySQL服务使配置生效。
2. 创建测试表:在MySQL中创建用于测试的数据表,例如创建一个名为 example_table 的表,表结构如下:

CREATE TABLE example_table (id BIGINT NOT NULL,data_column_1 VARCHAR(255),data_column_2 INT,PRIMARY KEY (id)
);

向表中插入一些测试数据,以便后续进行数据同步与处理测试。

三、表结构设计与调整

StarRocks表结构设计

在StarRocks中创建用于存储数据的表,以用户标签相关数据存储为例,设计如下表结构:

CREATE TABLE table_demo (id BIGINT NOT NULL COMMENT '主键',sign CHAR(32) NOT NULL COMMENT '签名',shop_id BIGINT NOT NULL COMMENT 'shopID',shop_type BIGINT NOT NULL COMMENT '类型',user_id BIGINT NULL COMMENT 'userID',create_time DATETIME NULL COMMENT '记录创建时间',operation_type VARCHAR(20) COMMENT '操作类型',row_change_type VARCHAR(20) COMMENT '行变更类型'
) ENGINE=OLAP
PRIMARY KEY (id)
COMMENT '用户商品表'
DISTRIBUTED BY HASH(`id`) BUCKETS 16
PROPERTIES ("replication_num" = "3","bloom_filter_columns" = "shop_id, user_id","in_memory" = "false","storage_format" = "DEFAULT","enable_persistent_index" = "false","compression" = "LZ4"
);

该表结构设计充分考虑了数据的存储与查询需求,通过主键约束、哈希分布以及相关属性设置,保障数据的高效存储与查询性能。

Flink中MySQL CDC表结构定义

在Flink中通过MySQL CDC连接器读取MySQL数据时,定义如下表结构:

CREATE TABLE mysql_cdc_example (id BIGINT,sign STRING COMMENT '签名',shop_id BIGINT COMMENT 'shopID',shop_type BIGINT COMMENT '类型',user_id BIGINT COMMENT 'userID',create_time TIMESTAMP(0),operation_type STRING COMMENT '业务操作字段',operation_timestamp TIMESTAMP_LTZ(3) METADATA FROM 'operation_timestamp' VIRTUAL,row_change_type STRING METADATA FROM 'row_change_type' VIRTUAL,PRIMARY KEY (`id`) NOT ENFORCED
)
WITH 
('connector' ='mysql-cdc','hostname' = '192.168.0.1','port' = '3306','database-name' = 'your_database_name','table-name' = 'example_table','username' = 'your_username','password' = 'your_password','debezium.snapshot.mode' = 'initial'
);

该表结构定义与StarRocks中的目标表结构相对应,同时通过WITH参数配置了MySQL CDC连接器的相关信息,包括数据源地址、端口、数据库名、表名、用户名、密码以及快照模式等。

Flink中StarRocks Sink表结构定义

在Flink中定义用于将处理后数据写入StarRocks的Sink表结构如下:

CREATE TABLE starrocks_sink_example (id BIGINT PRIMARY KEY NOT ENFORCED,sign STRING,shop_id BIGINT,shop_type BIGINT,user_id bigint,create_time STRING,operation_type STRING,row_change_type STRING
) 
WITH 
('connector'='starrocks','sink.max-retries'='5','jdbc-url' = 'jdbc:mysql://192.168.0.1:9030/your_database_name?useUnicode=true&characterEncoding=utf-8&useSSL=false&connectTimeout=3000&useUnicode=true&characterEncoding=utf8&useSSL=false&rewriteBatchedStatements=true&&serverTimezone=Asia/Shanghai&sessionVariables=query_timeout=86400','load-url'='192.168.0.1:8030','table-name' = 'table_demo','username'='your_username','password'='your_password','sink.buffer-flush.interval-ms'='5000','sink.parallelism' = '2','database-name'='your_database_name'
);

此Sink表结构与StarRocks中的目标表结构一致,通过WITH参数配置了StarRocks连接器的相关信息,如JDBC URL、Load URL、表名、用户名、密码、缓冲刷新间隔以及并行度等,确保Flink能够将处理后的数据准确高效地写入StarRocks。

四、数据同步与处理流程

使用Flink SQL进行数据抽取与转换

  1. 配置Flink SQL环境:在Flink的SQL客户端或相关集成开发环境中,配置好Flink SQL的运行环境,确保能够执行SQL语句对数据进行操作。
  2. 编写数据抽取与转换SQL:编写SQL语句从MySQL CDC表中抽取数据,并进行必要的转换操作,例如将时间格式进行转换、根据业务规则对某些字段进行计算等。以下是一个简单的示例,将 create_time 字段从 TIMESTAMP 类型转换为字符串类型,并根据 operation_typerow_change_type 字段确定最终的操作类型:
INSERT INTOstarrocks_sink_example
SELECTid,sign,shop_id,shop_typeuser_id,cast(create_time as CHAR) as create_time,CASE WHEN operation_type = 'DELETE' THEN 'DELETE'WHEN row_change_type = '+I' THEN 'INSERT'WHEN row_change_type IN ('-U', '+U') THEN 'UPDATE'WHEN row_change_type = '-D' THEN 'DELETE'ELSE 'UNKNOWN'END AS operation_type,row_change_type 
FROMmysql_cdc_example;

该SQL语句从 mysql_cdc_example 表中读取数据,对 create_time 字段进行类型转换,并根据不同的变更类型确定最终的 operation_type,然后将处理后的数据插入到 starrocks_sink_example 表中。

使用Routine Load进行数据实时摄入(以Kafka数据源为例)

  1. 配置Kafka数据源:在Kafka中创建用于存储数据变更的主题,确保数据源能够正常向该主题发送数据。例如,创建一个名为 user_table_changes 的主题。
  2. 创建StarRocks的Routine Load任务:在StarRocks中创建Routine Load任务,用于实时消费Kafka主题中的数据并写入到StarRocks表中。以下是一个示例:
CREATE ROUTINE LOAD your_load_job_name ON table_demo
COLUMNS (id,sign,shop_id,shop_type,user_id,create_time,operation_type,row_change_type,temp_operation_type=IF(operation_type = 'DELETE', 'DELETE', IF(operation_type = 'UPDATE', 'UPSERT', 'APPEND'))
)
PROPERTIES ("desired_concurrent_number" = "1","max_batch_interval" = "10","max_batch_rows" = "300000","max_batch_size" = "209715200","strict_mode" = "false","format" = "json"
)
FROM
KAFKA ("kafka_broker_list" = "192.168.0.1:9092,192.168.0.2:9092,192.168.0.3:9092","kafka_topic" = "user_table_changes","property.kafka_default_offsets" = "OFFSET_BEGINNING"
);

该Routine Load任务配置了从Kafka主题 user_table_changes 中读取数据,按照指定的列映射关系写入到 user_table_mapping 表中,并设置了相关的属性,如期望的并发数、最大批次间隔、最大批次行数、最大批次大小、严格模式以及数据格式等。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/pingmian/87827.shtml
繁体地址,请注明出处:http://hk.pswp.cn/pingmian/87827.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

便捷的Office批量转PDF工具

软件介绍 本文介绍的软件是一款能实现Office批量转换的工具&#xff0c;名为五五Excel word批量转PDF。 软件小巧 这款五五Excel word批量转PDF软件大小不到2M。 操作步骤一 使用该软件时&#xff0c;只需把软件和需要转换的Word或Excel文件放在同一个文件夹里。 操作步骤…

tcp长连接与短连接

TCP连接本身是一个传输层协议&#xff0c;它既可以实现长连接&#xff0c;也可以实现短连接。这取决于应用层的使用方式。 短连接&#xff08;Short Connection&#xff09; 特点&#xff1a;每次请求都建立新的TCP连接&#xff0c;完成后立即关闭流程&#xff1a;建立连接 →…

llvm polly,亲自测试

1&#xff09;下载并安装 Polly - Getting Started git clone https://github.com/llvm/llvm-project.git 大概需要半个小时&#xff0c;有时候被墙掉就打不开 2&#xff09; mkdir build && cd build cmake -DLLVM_ENABLE_PROJECTSclang;polly ../llvm cmake --b…

Spring AI 项目实战(十四):Spring Boot + Vue3 +AI + DeepSeek 实现空气质量智能预测系统(附完整源码)

系列文章 序号文章名称1Spring AI 项目实战(一):Spring AI 核心模块入门2Spring AI 项目实战(二):Spring Boot + AI + DeepSeek 深度实战(附完整源码)3Spring AI 项目实战(三):Spring Boot + AI + DeepSeek 打造智能客服系统(附完整源码)4

腾讯云 CDN 不支持 WebSocket 的现状与华为云 CDN 的替代方案-优雅草卓伊凡

腾讯云 CDN 不支持 WebSocket 的现状与华为云 CDN 的替代方案-优雅草卓伊凡 问题背景 卓伊凡今天发现&#xff0c;腾讯云 CDN 不支持 WebSocket 协议&#xff0c;而公司的部分业务&#xff08;如实时聊天、在线协作、游戏互动、股票行情推送等&#xff09;依赖长连接通信。昨…

MybatisPlus(一)扩展功能

扩展功能 一、静态工具二、逻辑删除三、通用枚举1、定义枚举2、配置枚举处理器3、测试 四、JSON类型处理器1、定义实体2、使用类型处理器 五、分页1、配置分页插件2、分页API3、示例 一、静态工具 有的时候Service之间也会相互调用&#xff0c;为了避免出现循环依赖问题&#…

Redis哨兵模式之Sentinel模式(二)

一、多节点哨兵如何配置&#xff1f; 哨兵配置原理图 注意&#xff1a;sentinel哨兵模式的搭建是建立在redis主从复制节点配置基础而搭建&#xff0c;在主从配置中从库需要配置好replicaof关联上主库并关闭安全模式&#xff0c;然后设置好bind端口才能关联上机器&#xff0c;而…

基于Excel的数据分析思维与分析方法

数据分析一定要会Excel、SQL和Python&#xff1f;非常肯定地回答您&#xff0c;Python、R语言、Excel函数和VBA&#xff0c;以及高级数据分析软件&#xff0c;都学不到&#xff0c;您将学到&#xff1a;5个有效的数据分析利器&#xff0c;以及分析思维 一、描述性统计分析 在…

计算机网络笔记(不全)

一、计算机网络体系结构1.计算机网络的概念计算机网络&#xff1a;由若干结点和连接这些结点的链路组成。结点可以是计算机、集线器、交换机、路由器等。互连网(internet)&#xff1a;多个计算机网络通过路由器互相连接而成&#xff0c;可用任意协议通信。互联网(因特网Interne…

XML Schema 复合元素

XML Schema 复合元素 引言 XML(可扩展标记语言)作为一种灵活的标记语言,广泛应用于数据交换和存储。XML Schema 是一种用于描述和定义 XML 文档结构的语言,它定义了 XML 文档的元素、属性、类型和约束。本文将详细介绍 XML Schema 中的复合元素,并探讨其在实际应用中的重…

华为云Flexus+DeepSeek征文 | 弹性算力实战:Flexus X实例自动扩缩容策略优化

华为云FlexusDeepSeek征文 | 弹性算力实战&#xff1a;Flexus X实例自动扩缩容策略优化 &#x1f31f; 嗨&#xff0c;我是IRpickstars&#xff01; &#x1f30c; 总有一行代码&#xff0c;能点亮万千星辰。 &#x1f50d; 在技术的宇宙中&#xff0c;我愿做永不停歇的探索者…

【仓颉】运行环境配置VSCode + Win11

作者&#xff1a;大李子 团队&#xff1a;坚果派 十年iOS&#xff0c;All in转鸿蒙 前言 “仓颉编程语言是一款面向全场景智能的新一代编程语言&#xff0c;主打原生智能化、天生全场景、高性能、强安全。融入鸿蒙生态&#xff0c;为开发者提供良好的编程体验。” ——摘自仓…

【K线训练软件研发历程】【日常记录向】1.K线滑动窗口

文章目录 当前效果未来发展思路技术选型值得分享的技术点数据加载、解析的代码echats的代码当前效果 👆相当于有个hello world了。 未来发展思路 开源 技术选型 界面直接采用electron,等开源后,可以直接挂release,用户下载安装包后,一键安装,一键运行,降低使用门槛…

抖音解析下载工具 v1.0.0:免安装单文件,一键无水印保存高清视音频

宝子们&#xff0c;今天给你们带来一款超轻量的抖音下载神器——抖音解析下载工具 v1.0.0。 它只有单文件&#xff0c;双击就能用&#xff0c;免安装、无广告、完全免费&#xff0c;复制粘贴链接即可一键解析下载高清无水印视频/音频&#xff0c;简直不要太方便&#xff01; 为…

Ingress——2

目录 ‌一. 域名重定向&#xff08;HTTP→HTTPS/旧域名跳转&#xff09;‌ ‌二. 前后端分离Rewrite&#xff08;路径改写&#xff09;‌ ‌三. 混合配置示例&#xff08;重定向Rewrite&#xff09;‌ ‌四. SSL/TLS配置&#xff08;HTTPS加密&#xff09;‌ ‌五. 基本认…

12. grafana-Dashboard的Variable(过滤)使用

说明制作这样一个选择过滤的下拉框&#xff0c;可以选择某个服务器的步骤1. 点击最上面的Dashboard settings2. 选择Variables 并点击ADD variable3. 写出过滤的标签名和查询条件&#xff08;label_values(查询条件)&#xff09;4. 点击 save as... 保存退出5. 出来后左上角就…

Cursor一键续杯pro教程,支持最新1.0系列版本

使用前检查&#xff1a; 使用前请先看左下角&#xff0c;是否获取到Cursor的版本号 如果没有请先在 功能页面 -→ 自定义Cursor路径 选择你Cursor的安装的路径&#xff0c;并开启后重启YCursor&#xff0c;获取到版本后才能正常使用功能 检查软件左下角的权限标识是否为绿色 如…

pyhton基础【25】面向对象进阶六

目录 十七.单例模式 实现单例模式的两种方式 __new__方法概述 单例模式的使用场景 十七.单例模式 引入 单例模式是一种常用的软件设计模式&#xff0c;它确保一个类只有一个实例&#xff0c;并提供一个全局访问点来获取这个实例。 实现单例模式的两种方式 使用类属性创…

后端树形结构

案例 在后端开发中&#xff0c;树形结构数据的查询和处理是一个常见的需求&#xff0c;比如部门管理、分类目录展示等场景。接下来&#xff0c;我们以一个部门管理系统为例&#xff0c;详细介绍如何实现后端的树查询功能。 案例背景 假设我们正在开发一个公司的内部管理系统&am…

高效沟通04-RIDE说服模型

高效沟通专栏–组织运转的命脉与个人成功的基石 目录 1. RIDE模型的核心理念2. RIDE模型的应用场景3. RIDE模型使用步骤4. RIDE模型示例与练习4.1 应用RIDE模型:4.2 练习:你来试试!5. 总结RIDE模型是一种结构化的说服框架,旨在帮助你在沟通(尤其是书面沟通或需要清晰逻辑…