Resources
es:7.17.18 kibana:7.17.18 flink:1.17.2
Directories
mkdir -p /usr/project/flink/{conf,job,logs}
chmod -R 777 /usr/project/flink
# Resources
# MySQL 8.0, Elasticsearch 7.17.18 (self-hosted)

# Directory layout
/usr/project/flink/
├── conf/
│   ├── flink-conf.yaml
│   └── log4j2.xml
├── job/
│   ├── flink-sql-connector-elasticsearch7-3.0.1-1.17.jar
│   ├── flink-connector-elasticsearch-base-3.0.1-1.17.jar
│   ├── flink-sql-connector-mysql-cdc-3.1.1.jar
│   └── win_user.sql
├── logs/
└── docker-compose.yml
Create Elasticsearch and Kibana locally (docker-compose.yml)
version: '3.8'

services:
  jobmanager:
    image: flink:1.17.2
    container_name: flink-jobmanager
    restart: always
    ports:
      - "8081:8081"   # Flink Web UI
    command: jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
    volumes:
      - ./conf:/opt/flink/conf
      - ./job:/opt/flink/job
      - /usr/project/flink/logs:/opt/flink/log
    networks:
      - flink-network

  taskmanager:
    image: flink:1.17.2
    container_name: flink-taskmanager
    restart: always
    depends_on:
      - jobmanager
    command: taskmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
    volumes:
      - ./conf:/opt/flink/conf
      - ./job:/opt/flink/job
      - /usr/project/flink/logs:/opt/flink/log
    networks:
      - flink-network

  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.17.18
    container_name: elasticsearch
    restart: always
    environment:
      - discovery.type=single-node
      - ELASTIC_PASSWORD=123456
      - xpack.security.enabled=true
      - network.host=0.0.0.0
    ports:
      - "9200:9200"
      - "9300:9300"
    volumes:
      - es_data:/usr/share/elasticsearch/data
    networks:
      - flink-network

  kibana:
    image: docker.elastic.co/kibana/kibana:7.17.18
    container_name: kibana
    restart: always
    environment:
      - ELASTICSEARCH_HOSTS=http://elasticsearch:9200
      - ELASTICSEARCH_USERNAME=elastic
      - ELASTICSEARCH_PASSWORD=123456
    ports:
      - "5601:5601"
    networks:
      - flink-network

volumes:
  es_data:

networks:
  flink-network:
    driver: bridge
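Before bringing the stack up, a quick syntax check of the compose file can save a failed start; a small sketch, assuming the docker-compose v1 CLI used below:

# Parse and validate docker-compose.yml (exits non-zero on errors)
docker-compose config -q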
Start
# Project directory
cd /usr/project/flink

# After saving the files, start (or restart) the containers
docker-compose up -d

# Stop and remove the containers (data volumes are kept)
docker-compose down

# Check that the restart policy took effect
docker inspect -f '{{.Name}} {{.HostConfig.RestartPolicy.Name}}' $(docker ps -q)
Verify
# Elasticsearch answers on http://127.0.0.1:9200
curl -u elastic:123456 http://127.0.0.1:9200

# Credentials
elastic
123456
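Beyond the root endpoint, the standard cluster health API gives a clearer signal; a quick check using the credentials above:

# Expect "status" to be green or yellow for a healthy single-node cluster
curl -u elastic:123456 'http://127.0.0.1:9200/_cluster/health?pretty'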
Flink SQL job example
File: /usr/project/flink/job/win_user.sql
Snapshot and incremental modes
Set scan.startup.mode to 'initial' to read the table's existing data first and then continue with incremental synchronization, or set it to 'latest-offset' to start from the latest binlog offset and sync incrementally only, as in the sketch below.
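A minimal sketch of switching modes without editing the file by hand (the win_user_latest.sql output name is hypothetical):

# Derive an incremental-only variant of the job file by swapping the startup mode
sed "s/'scan.startup.mode' = 'initial'/'scan.startup.mode' = 'latest-offset'/" \
  /usr/project/flink/job/win_user.sql > /usr/project/flink/job/win_user_latest.sql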
Verify the tables were created
# Enter the Flink SQL client
/opt/flink/bin/sql-client.sh embedded
SHOW TABLES;
SELECT * FROM v99_source_win_user LIMIT 10;
Job configuration
vim /usr/project/flink/job/win_user.sql
-- 1. Source table (MySQL CDC)
CREATE TABLE v99_source_win_user (
    id INT,
    username STRING,
    merchant_id INT,
    avatar STRING,
    fcoin DECIMAL(15,4),
    coin_commission DECIMAL(15,4),
    level_id TINYINT,
    role TINYINT,
    is_promoter TINYINT,
    flag INT,
    real_name STRING,
    signature STRING,
    birthday STRING,
    area_code STRING,
    mobile STRING,
    email STRING,
    sex TINYINT,
    bind_bank TINYINT,
    address STRING,
    score INT,
    promo_code STRING,
    id_path STRING,
    sup_uid_1 INT,
    sup_username_1 STRING,
    sup_uid_2 INT,
    sup_uid_3 INT,
    sup_uid_4 INT,
    sup_uid_5 INT,
    sup_uid_6 INT,
    sup_uid_top INT,
    sup_username_top STRING,
    sup_level_top INT,
    password_hash STRING,
    password_coin STRING,
    ip STRING,
    third_login_type STRING,
    ip_region STRING,
    status TINYINT,
    last_login_ip STRING,
    last_login_ip_region STRING,
    last_login_time INT,
    last_login_device_id STRING,
    created_at INT,
    updated_at INT,
    freeze_cause STRING,
    freeze_at INT,
    operator_name STRING,
    fb_pid STRING,
    fb_cid STRING,
    created_name STRING,
    memberType TINYINT,
    google_sub_id STRING,
    facebook_sub_id STRING,
    secret STRING,
    code_url STRING,
    code_status TINYINT,
    user_type TINYINT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '127.0.0.1',
    'port' = '3306',
    'username' = 'root',
    'password' = '123456',
    'database-name' = 'db_name',
    'table-name' = 'win_user',
    'scan.startup.mode' = 'initial',              -- read the existing data first, then the binlog
    'debezium.snapshot.mode' = 'never',           -- 'initial' = snapshot mode, 'never' = incremental only
    'scan.incremental.snapshot.enabled' = 'true'  -- enable incremental snapshotting
);

-- 2. Sink table (Elasticsearch)
CREATE TABLE es_sink_table_win_user (
    id INT,
    username STRING,
    merchant_id INT,
    avatar STRING,
    fcoin DECIMAL(15,4),
    coin_commission DECIMAL(15,4),
    level_id TINYINT,
    role TINYINT,
    is_promoter TINYINT,
    flag INT,
    real_name STRING,
    signature STRING,
    birthday STRING,
    area_code STRING,
    mobile STRING,
    email STRING,
    sex TINYINT,
    bind_bank TINYINT,
    address STRING,
    score INT,
    promo_code STRING,
    id_path STRING,
    sup_uid_1 INT,
    sup_username_1 STRING,
    sup_uid_2 INT,
    sup_uid_3 INT,
    sup_uid_4 INT,
    sup_uid_5 INT,
    sup_uid_6 INT,
    sup_uid_top INT,
    sup_username_top STRING,
    sup_level_top INT,
    password_hash STRING,
    password_coin STRING,
    ip STRING,
    third_login_type STRING,
    ip_region STRING,
    status TINYINT,
    last_login_ip STRING,
    last_login_ip_region STRING,
    last_login_time INT,
    last_login_device_id STRING,
    created_at INT,
    updated_at INT,
    freeze_cause STRING,
    freeze_at INT,
    operator_name STRING,
    fb_pid STRING,
    fb_cid STRING,
    created_name STRING,
    memberType TINYINT,
    google_sub_id STRING,
    facebook_sub_id STRING,
    secret STRING,
    code_url STRING,
    code_status TINYINT,
    user_type TINYINT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'elasticsearch-7',
    'hosts' = 'http://elasticsearch:9200',
    'username' = 'elastic',
    'password' = '123456',
    'index' = 'win_user',                           -- must match the index name in Elasticsearch
    'sink.bulk-flush.interval' = '1s',
    'sink.bulk-flush.backoff.max-retries' = '3',    -- maximum number of retries
    'sink.bulk-flush.max-actions' = '100',          -- flush after at most 100 buffered actions
    'sink.bulk-flush.max-size' = '1mb',             -- flush once the buffer reaches 1 MB
    'sink.bulk-flush.backoff.delay' = '100ms',      -- delay between retries
    'sink.bulk-flush.backoff.strategy' = 'constant' -- retry strategy
);

-- 3. Submit the insert job
INSERT INTO es_sink_table_win_user
SELECT * FROM v99_source_win_user;
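Once the INSERT is submitted it runs as a continuous job; a quick status check via the Flink REST API (port 8081 is mapped in the compose file above):

# Lists job IDs and states, e.g. {"jobs":[{"id":"...","status":"RUNNING"}]}
curl http://127.0.0.1:8081/jobs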
Verify
# Enter the Flink SQL client
/opt/flink/bin/sql-client.sh embedded

# Verify
SHOW TABLES;
DESC es_sink_table_win_user;

# Drop the tables if you need to recreate them
DROP TABLE IF EXISTS es_sink_table_win_user;
DROP TABLE IF EXISTS v99_source_win_user;

# In Flink 1.17, list the registered tables with:
SHOW TABLES;

# Job status
SHOW JOBS;

# Details
EXPLAIN SELECT * FROM v99_source_win_user;
SELECT * FROM v99_source_win_user LIMIT 10;
Tuning (required settings)
/opt/flink/bin/sql-client.sh embedded

# Session-wide settings (SET)
SET 'execution.checkpointing.interval' = '1s';
SET 'restart-strategy' = 'fixed-delay';
SET 'restart-strategy.fixed-delay.attempts' = '3';
SET 'restart-strategy.fixed-delay.delay' = '5s';
SET 'parallelism.default' = '4';
SET 'state.backend' = 'rocksdb';
SET 'state.backend.rocksdb.memory.managed' = 'true';

# Raise the sink parallelism when submitting the job
SET 'parallelism.default' = '2';

# Maximum parallelism for the job
SET 'execution.parallelism' = '8';

# Inspect the effective configuration
SET;
Connector downloads
Official download location for the flink-connector-elasticsearch artifacts: https://repo1.maven.org/maven2/org/apache/flink/ . Make sure to pick the versions that match Elasticsearch 7.17 and Flink 1.17.2.
cd /usr/project/flink/job
# Remove everything in this directory except win_user.sql
find . -maxdepth 1 ! -name 'win_user.sql' ! -name '.' -type f -exec rm -f {} +

# MySQL CDC: pick ONE of the two versions below; both register the 'mysql-cdc'
# connector and will conflict if they are on the classpath together
wget https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-mysql-cdc/3.1.1/flink-sql-connector-mysql-cdc-3.1.1.jar
wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.4.1/flink-sql-connector-mysql-cdc-2.4.1.jar
# Elasticsearch
wget https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7/3.0.1-1.17/flink-sql-connector-elasticsearch7-3.0.1-1.17.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-elasticsearch-base/3.0.1-1.17/flink-connector-elasticsearch-base-3.0.1-1.17.jar

# Additional dependencies
wget https://repo1.maven.org/maven2/org/apache/httpcomponents/httpclient/4.5.13/httpclient-4.5.13.jar
wget https://repo1.maven.org/maven2/org/apache/httpcomponents/httpcore/4.4.13/httpcore-4.4.13.jar
wget https://repo1.maven.org/maven2/commons-logging/commons-logging/1.2/commons-logging-1.2.jar
wget https://repo1.maven.org/maven2/commons-codec/commons-codec/1.11/commons-codec-1.11.jar
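Note that in the stock flink:1.17.2 image, jars mounted under /opt/flink/job are not on the classpath automatically; only /opt/flink/lib is. A minimal sketch of loading the downloaded jars explicitly when starting the SQL client (container name taken from the compose file above):

# Pass each downloaded connector jar to the SQL client via -j
docker exec -it flink-jobmanager bash -c \
  '/opt/flink/bin/sql-client.sh embedded $(for j in /opt/flink/job/*.jar; do printf -- "-j %s " "$j"; done)'

Alternatively, copy the jars into /opt/flink/lib and restart the containers.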