Java应用Flink CDC监听MySQL数据变动内容输出到控制台

文章目录

  • maven 依赖
  • 自定义数据变化处理器
  • flink cdc监听
  • 验证

maven 依赖

<properties><flink.version>1.14.0</flink.version><flink-cdc.version>2.3.0</flink-cdc.version></properties><dependencies><!-- Flink dependencies --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-base</artifactId><version>${flink.version}</version></dependency><dependency><groupId>com.ververica</groupId><artifactId>flink-sql-connector-mysql-cdc</artifactId><version>${flink-cdc.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web_2.12</artifactId><version>${flink.version}</version></dependency></dependencies>

自定义数据变化处理器

package org.example;import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;public class CustomSink extends RichSinkFunction<String> {@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);}@Overridepublic void close() throws Exception {super.close();}@Overridepublic void invoke(String value, Context context) throws Exception {//0P字段,该字段也有4种取值。分别是C(Create ) , U(Updlate) . D(Delete ),Read 。// 对于U操作,其数据部分同时包含了Before和After.System.out.println(">>>" + value);}
}

flink cdc监听

package org.example;import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class MysqlSourceExample {public static void main(String[] args) throws Exception {DebeziumDeserializationSchema debeziumDeserializationSchema = new JsonDebeziumDeserializationSchema();MySqlSource<String> source = MySqlSource.builder().hostname("127.0.0.1").port(3306).databaseList("canal_manager")// set captured database.tableList("canal_manager.canal_user")// set captured table.startupOptions(StartupOptions.latest()) // 设置从最新的修改记录开始读取.username("root").password("123456").deserializer(debeziumDeserializationSchema) // converts SourceRecord to JSON string.includeSchemaChanges(true).build();//启动一个webuI。Configuration configuration = new Configuration();configuration.setInteger(RestOptions.PORT, 8081);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);//检者点间隔时间env .enableCheckpointing(5000);DataStreamSink<String> sink = env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL Source").addSink(new CustomSink());env.execute();}
}

验证

启动后web页面地址访问http://localhost:8081/,MySQL数据库canal_manager中的表canal_user数据发生修改,控制台有输出json:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.pswp.cn/news/909105.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

猎板厚铜PCB工艺能力如何?

在电子产业向高功率、高集成化狂奔的今天&#xff0c;电路板早已不是沉默的配角。当5G基站、新能源汽车、工业电源等领域对电流承载、散热效率提出严苛要求时&#xff0c;一块能够“扛得住大电流、耐得住高温”的厚铜PCB&#xff0c;正成为决定产品性能的关键拼图。而在这条赛道…

业务:资产管理功能

文章目录 一、项目背景1.1概述1.2编写目的 二、注意点说明三、页面效果四、代码AssetManagementControllerHwinfoAssetManagementHwinfoAssetManagementServiceHwinfoAssetManagementServiceImplHwinfoAssetManagementMapperHwinfoAssetManagementMapper.xmlSfpAssetManagement…

【MySQL进阶】MySQL程序

目录 一.有哪些MySQL程序 二. mysqld —— MySQL服务器 三.mysql——MySQL客户端 3.1.连接mysql客⼾端 3.2.mysql客户端选项 3.2.1.mysql常用选项 3.2.2.在命令⾏中使⽤选项 3.3.MySQL 选项&#xff08;配置&#xff09;文件 3.3.1.Linux环境下默认配置文件的路径 3.…

Docker 运行 Kafka 带 SASL 认证教程

Docker 运行 Kafka 带 SASL 认证教程 Docker 运行 Kafka 带 SASL 认证教程一、说明二、环境准备三、编写 Docker Compose 和 jaas文件docker-compose.yml代码说明&#xff1a;server_jaas.conf 四、启动服务五、验证服务六、连接kafka服务七、总结 Docker 运行 Kafka 带 SASL 认…

​​CentOS 7.9​​ 上配置 ​​Fail2ban 自动封禁 IP​​ 的完整步骤,整合了多篇权威资料的最佳实践

&#x1f527; ​​一、安装 Fail2ban​​ ​​启用 EPEL 仓库​​ yum install epel-release -y ​​安装 Fail2ban​​ yum install fail2ban -y ​​启动并设置开机自启​​ systemctl start fail2ban systemctl enable fail2ban ⚠️ 注意&#xff1a;CentOS 7.9 默认 Py…

损坏的RAID5 第十六次CCF-CSP计算机软件能力认证

纯大模拟 提前打好板子 我只通过4个用例点 然后就超时了。 #include<iostream> #include<cstring> #include<algorithm> #include<unordered_map> #include<bits/stdc.h> using namespace std; int n, s, l; unordered_map<int, string>…

Kafka Topic中的数据在消费后还存在吗

在 Kafka 的主题(Topic)和分区(Partition)中,数据在被消费者消费后是否仍然存在,取决于 Kafka 的设计机制和配置策略。

Linuxkernel学习-deepseek-2

以下是国际上广受好评的 Linux 内核权威公开课&#xff0c;均来自顶级高校和技术组织&#xff0c;附课程链接和特色说明&#xff1a; —### 一、殿堂级大学课程1. MIT 6.S081: Operating System Engineering - 核心&#xff1a;基于 RISC-V 架构 重写 Unix 内核&#xff08;xv6…

高频面试之6Hive

Hive 文章目录 Hive6.1 Hive的架构6.2 HQL转换为MR流程6.3 Hive和数据库比较6.4 内部表和外部表6.5 系统函数6.6 自定义UDF、UDTF函数6.7 窗口函数6.8 Hive优化6.8.1 分组聚合6.8.2 Map Join6.8.3 SMB Map Join6.8.4 Reduce并行度6.8.5 小文件合并6.8.6 谓词下推6.8.7 并行执行…

分类场景数据集大全「包含数据标注+训练脚本」 (持续原地更新)

一、作者介绍&#xff1a;六年算法开发经验、AI 算法经理、阿里云专家博主。擅长&#xff1a;检测、分割、理解、大模型 等算法训练与推理部署任务。 二、数据集介绍&#xff1a; 质量高&#xff1a;高质量图片、高质量标注数据&#xff0c;吐血标注、整理&#xff0c;可以作为…

从硬件视角审视Web3安全:CertiK CTO主持Proof of Talk圆桌论坛

6月10日&#xff0c;在备受瞩目的全球Web3与AI峰会Proof of Talk 2025上&#xff0c;CertiK首席技术官Li Kang博士主持了一场聚焦“Web3钱包与托管安全”&#xff08;Web3 Wallet and Custodial Security&#xff09;的圆桌论坛。本次论坛从硬件与系统软件的底层视角出发&#…

从DevOps到AIOps:智能体如何接管持续交付流程

引言&#xff1a;从DevOps到AIOps的时代跃迁 DevOps 作为软件开发与运维一体化的最佳实践&#xff0c;已经广泛应用于现代软件工程体系中。在 CI/CD&#xff08;持续集成/持续交付&#xff09;的支撑下&#xff0c;软件交付从季度变为月度、从周变为日&#xff0c;乃至分钟级更…

MAC-安装Homebrew、安装Git

1.首先尝试用中科大和清华的源发现不行 中国科学技术大学(USTC)提供了 Homebrew 的镜像仓库,同步官方更新,适合国内用户。 安装命令​​: /bin/bash -c "$(curl -fsSL https://mirrors.ustc.edu.cn/brew/install.sh)" 步骤说明​​: 复制上述命令到终端,按…

flutter基础面试知识汇总(二)

一、全局状态管理工具-----GetX、Provider、Bloc 1.Provider Provider 是 Flutter 中一个流行的状态管理库&#xff0c;它简化了数据共享和状态管理的过程。它通过依赖注入的方式&#xff0c;让不同的 Widget 共享数据&#xff0c;而无需过多地传递参数。Provider也是官方推荐…

基于YOLOv12的电力高空作业安全检测:为电力作业“保驾护航”,告别安全隐患!

在电力行业&#xff0c;尤其是高空作业场景&#xff0c;安全隐患无处不在。高空作业本身就存在着极高的风险&#xff0c;尤其是对于电力维护和检修工作来说&#xff0c;稍有不慎便可能造成严重的安全事故。传统的安全监管方式&#xff0c;如人工巡检和视频监控&#xff0c;存在…

大话软工笔记—需求分析汇总

需求调研和分析完成&#xff0c;可汇总形成两份文档&#xff1a;需求规格说明书和解决方案。 1. 需求规格说明书 1.1 主要内容 引言&#xff0c;包括项目目的、背景、用语等基础信息。项目概述&#xff0c;对项目自身的说明、包括范围、主要处理对象、与其他系统的关系等。功…

openstack实例创建过程分析

用户验证 1、某用户以登录web界面或执行rc文件的方式&#xff0c;通过RESTful API向keystone获取credentials&#xff1b; 2、keystone进行authentication&#xff0c;若正确则生成并返回auth-token&#xff1b; 3、以携带auth-token的形式&#xff0c;在web界面或命令行cli&a…

安卓首次启动Fallbackhome是否可以直接去除?--学员作业

背景&#xff1a; 有学员朋友在vip群提出一个需求相关的问题&#xff0c;他想要把settings裁剪掉&#xff0c;但是发现裁剪后Fallbackhome肯定就没了&#xff0c;发现Launcher居然无法启动了&#xff0c;一直处于Bootanimation的画面&#xff0c;无法进入系统。 针对这个去除…

C++ 实现环形缓冲区

环形缓冲区&#xff08;Ring Buffer&#xff09;是一种常见的用于数据流缓冲的结构&#xff0c;通常用于生产者-消费者模型、音视频处理等场景。 因为环形缓冲区使用的场景大多为性能敏感的场景&#xff0c;我们采用数组的数据结构和位运算来实现&#xff0c;以提高代码效率。…

MySQL虚拟列:一个被低估的MySQL特性

前言 最近在做订单系统重构时&#xff0c;遇到了一个有趣的问题。 系统里有很多地方都要计算订单的总价&#xff08;数量单价&#xff09;&#xff0c;这个计算逻辑分散在各个服务中&#xff0c;产生了不少相似甚至重复的代码。 代码评审时&#xff0c;同事提出了一个建议 —…