【flink】 flink 读取debezium-json数据获取数据操作类型op/rowkind方法

flink 读取debezium-json数据获取数据操作类型op/rowkind方法。
op类型有c(create),u(update),d(delete)
参考官网案例:此处的"op": "u",就是操作类型。

{"before": {"id": 111,"name": "scooter","description": "Big 2-wheel scooter","weight": 5.18},"after": {"id": 111,"name": "scooter","description": "Big 2-wheel scooter","weight": 5.15},"source": {...},"op": "u","ts_ms": 1589362330904,"transaction": null
}

思路:添加自定义metareader
具体修改方法:去github flink 下载flink源码,选择tags,后按照自己需要的版本下载。
至idea中。修改org.apache.flink.formats.json.debezium.DebeziumJsonDecodingFormat

enum ReadableMetadata枚举类中添加枚举对象:
注意:operation.type可以按需更改,opudf也能按需更改,但不能是op,否则原有的冲突。
添加如下代码:

        OPERATION_TYPE("operation.type",DataTypes.STRING().nullable(),true,DataTypes.FIELD("opudf", DataTypes.STRING()),new MetadataConverter() {private static final long serialVersionUID = 1L;@Overridepublic Object convert(GenericRowData row, int pos) {return row.getString(2);}})

编译项目:

# install
mvn clean package -DskipTests "-Dmaven.test.skip=true" "-Pfast" -pl flink-formats/flink-json
# maven install 到本地
mvn install:install-file "-DgroupId=org.apache.flink" "-DartifactId=flink-json-udf" "-Dversion=1.13.6" -Dpackaging=jar "-Dfile=D:\projects\flink-release-1.13.6\flink-formats\flink-json\target\flink-json-1.13.6.jar"# gradle 不能使用 implement(files("libs/flink-json-1.13.6.jar")) 的方式。需要直接加载到仓库。或者user/.m2目录下的临时仓库。
# 如下路径不能直接复制,可供参考,自己找下。
copy D:\projects\flink-release-1.13.6\flink-formats\flink-json\target\flink-json-1.13.6.jar D:\env\Gradle_Repository\caches\modules-2\files-2.1\org.apache.flink\flink-json\1.13.6\cb4daaf018e2
10faa54e76488cbb11e179502728

使用方法:
添加列定义:
opt_type STRING METADATA FROM 'value.operation.type' VIRTUAL,

create table test.some_debezium
(event_time      TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,origin_database STRING METADATA FROM 'value.source.database' VIRTUAL,origin_table    STRING METADATA FROM 'value.source.table' VIRTUAL,opt_type        STRING METADATA FROM 'value.operation.type' VIRTUAL,id            BIGINT,name          STRING,) WITH('connector'='kafka','topic'='your_topic','properties.bootstrap.servers'='host1:6667,host2:6667,host3:6667','properties.group.id'='your_group_id','properties.security.protocol'='SASL_PLAINTEXT','properties.sasl.kerberos.service.name'='kafka','format'='debezium-json','debezium-json.timestamp-format.standard'='ISO-8601','debezium-json.schema-include'='true','debezium-json.ignore-parse-errors'='true'
);

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/bicheng/85739.shtml
繁体地址,请注明出处:http://hk.pswp.cn/bicheng/85739.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

某手游cocos2dlua反编译

一、获取加载的luac文件 通过frida hook libccos2dlua.so 的luaL_loadbuffer函数对luac进行dump js代码如下,得到dump后的lua文件 // 要加载的目标库名 var targetLibrary "libcocos2dlua.so"; var dlopen Module.findExportByName(null, "dlope…

`toRaw` 与 `markRaw`:Vue3 响应式系统的细粒度控制

🤍 前端开发工程师、技术日更博主、已过CET6 🍨 阿珊和她的猫_CSDN博客专家、23年度博客之星前端领域TOP1 🕠 牛客高级专题作者、打造专栏《前端面试必备》 、《2024面试高频手撕题》、《前端求职突破计划》 🍚 蓝桥云课签约作者、…

Python文件迁移之Shutil库详解

Shutil是一个Python内置的用来高效处理文件和目录迁移任务的库。Shutil不仅支持基本的文件复制、移动和删除操作,还具备处理大文件、批量迁移目录、以及跨平台兼容性等特性。通过使用Shutil,我们可以更加轻松地实现文件系统的管理和维护,本文…

【服务器R环境架构】基于 micromamba下载 R 库包

目录 准备工作:下载并安装R环境下载并安装R环境方式1:下载 .tar.bz2 压缩包进行解压执行(官方推荐)方式2: 创建并激活R环境 下载R库包安装CRAN包在 micromamba 中安装 GitHub 包(如 BPST) 参考 …

基于 Apache POI 实现的 Word 操作工具类

基于 Apache POI 实现的 Word 操作工具类 这个工具类是让 AI 写的,已覆盖常用功能。 如不满足场景的可以让 AI 继续加功能。 已包含的功能: 文本相关: 添加文本、 设置字体颜色、 设置字体大小、 设置对齐方式、 设置字符间距、 设置字体加粗…

时间序列预测、分类 | 图神经网络开源代码分享(上)

本期结合《时间序列图神经网络(GNN4TS)综述》,整理了关于图神经网络在时间序列预测、分类等任务上的开源代码和学习资料以供大家学习、研究。 参考论文:《A Survey on Graph Neural Networks for Time Series: Forecasting, Classification, Imputation,…

Vue 添加水印(防篡改: 删除水印元素节点、修改水印元素的样式)

MutationObserver_API: 观察某一个元素的变化// index.vue<template><div class="container"><Watermark text="版权所有" style="background: #28c848"><!-- 可给图片、视频、div...添加水印 --><div class=&quo…

如何处理开发不认可测试发现的问题

解决方案 第一步&#xff1a;收集确凿证据 确保有完整的复现结果准备详细的记录材料&#xff1a; 截屏录屏操作步骤记录 带着这些证据与开发人员进行沟通 第二步&#xff1a;多角度验证 如果与开发人员沟通无果&#xff1a; 竞品分析&#xff1a;查看市场上同类产品如何…

linux生产环境下根据关键字搜索指定日志文件命令

grep -C 100 "error" server.log 用于在 server.log 文件中查找包含 “error” 的行&#xff0c;并同时显示该行前后100行的上下文。这是排查日志问题的常用技巧&#xff0c;解释一下&#xff1a; 命令参数详解 grep&#xff1a;文本搜索工具&#xff0c;用于在文件…

用vue和echarts怎么写一个甘特图,并且是分段式瀑布流

vue echarts 甘特图功能 index.vue <template><div ref"echart" id"echart" class"echart"></div> </template><script setup>import { nextTick, onMounted, ref } from "vue";import * as echarts f…

Pandas使用教程:从入门到实战的数据分析利器

一、Pandas基础入门 1.1 什么是Pandas Pandas是Python生态中核心的数据分析库&#xff0c;提供高效的数据结构&#xff08;Series/DataFrame&#xff09;和数据分析工具。其名称源于"Panel Data"&#xff08;面板数据&#xff09;和"Python Data Analysis"…

NuttX Socket 源码学习

概述 NuttX 的 socket 实现是一个精心设计的网络编程接口&#xff0c;提供了标准的 BSD socket API。该实现采用分层架构设计&#xff0c;支持多种网络协议族&#xff08;如 TCP/IP、UDP、Unix域套接字等&#xff09;&#xff0c;具有良好的可扩展性和模块化特性。 整体架构设…

基于YOLO的语义分割实战(以猪的分割为例)

数据集准备 数据集配置文件 其实语义分割和目标检测类似&#xff0c;包括数据集制备、存放格式基本一致像这样放好即可。 然后需要编写一个data.yaml文件&#xff0c;对应的是数据的配置文件。 train: C:\图标\dan\语义分割pig\dataset\train\images #绝对路径即可 val: C:\…

钉钉智能会议室集成指纹密码锁,临时开门密码自动下发

在当今快节奏的工作环境中&#xff0c;会议室的高效管理和使用成为了企业提升工作效率的关键一环。湖南某知名企业近期成功升级了原有使用的钉钉智能会议室系统&#xff0c;并配套使用了启辰智慧联网指纹密码锁&#xff0c;实现了会议室管理的智能化升级&#xff0c;提升了会议…

C++讲解—类(1)

类 在 C 中&#xff0c;类是一个关键概念&#xff0c;凭借其封装和继承的特性&#xff0c;能够助力程序员之间实现高效的分工协作&#xff0c;共同完成复杂的大型项目。我们先从最简单的概念入手&#xff0c;再进行更深层次的了解和应用。 1. 类的定义 类是用户自定义的一种…

什么是Hadoop Yarn

Hadoop YARN&#xff1a;分布式集群资源管理系统详解 1. 什么是YARN&#xff1f; YARN&#xff08;Yet Another Resource Negotiator&#xff09;是 Apache Hadoop 生态系统中的资源管理和作业调度系统&#xff0c;最初在 Hadoop 2.0 中引入&#xff0c;取代了 Hadoop 1.0 的…

项目开发中途遇到困难的解决方案

1. 正视困难&#xff0c;避免逃避 开发遇阻时&#xff0c;退缩会带来双重损失&#xff1a;既成为"失败者逃兵"&#xff0c;又损害职业信心1。 行动建议&#xff1a; 立即向团队透明化问题&#xff08;如进度延迟、技术瓶颈&#xff09;&#xff0c;避免问题滚雪球…

Blender硬表面建模篇收集学习建模过程中的Demo

c 齿轮 创建一个圆柱体&#xff0c;选择侧面的所有&#xff0c;然后进行隔断选择&#xff0c;两次挤出面&#xff0c;一次缩放面&#xff0c;通过圆柱面三次插入面缩放挤出得到齿轮中心&#xff0c;选中齿轮的锯齿中间&#xff0c;然后进行相同周长选择行选择齿与齿中间的面&…

Chromium 136 编译指南 macOS篇:获取源代码(四)

1. 引言 在现代软件开发的宏大版图中&#xff0c;源代码的获取往往标志着从理论探索向实践应用的关键转折。对于Chromium 136这样一个拥有超过2500万行代码、涉及数百个第三方库的超大规模开源项目而言&#xff0c;源代码的获取不仅仅是简单的文件下载&#xff0c;更是一个涉及…

OpenCV C++ 边缘检测与图像分割

一、边缘检测 在数字图像处理领域&#xff0c;边缘检测是一项至关重要的基础技术。它如同为图像赋予 “骨架”&#xff0c;帮助计算机快速识别图像中的物体轮廓、形状与结构&#xff0c;广泛应用于目标识别、图像分割、图像配准等多个领域。 1.1 概念 边缘检测的核心目标是找…