Flink2.0学习笔记:Table API SQL

stevensu1/EC0720

表 API 和 SQL#

表 API 和 SQL——用于统一流和批处理 加工。表 API 是适用于 Java、Scala 和 Python 的语言集成查询 API,它 允许组合来自关系运算符的查询,例如 selection、filter 和 join in 一种非常直观的方式。Flink 的 SQL 支持基于实现 SQL 标准的 Apache Calcite。任一接口中指定的查询具有相同的语义 并指定相同的结果,无论输入是连续的(流式处理:无界)还是有界的(批处理:有界)。

我们的目标是同步mysql表和数据

先完成maven依赖:这里我们只引入flink-table-api-java:概览 |Apache Flink

如果在ide中运行:还要引入<!--flink-clients,flink-table-runtime,flink-table-planner-loader- -->三个模块:概览 |Apache Flink

接着是mysql连接相关JDBC |Apache Flink

JDBC SQL 连接器

JDBC 连接器允许使用 JDBC 驱动程序从任何关系数据库读取数据和将数据写入任何关系数据库。本文档介绍如何设置 JDBC 连接器以针对关系数据库运行 SQL 查询。

如果在 DDL 上定义了主键,则 JDBC 接收器以更新插入模式运行,以便与外部系统交换 UPDATE/DELETE 消息,否则,它以追加模式运行,不支持使用 UPDATE/DELETE 消息。

依次引入对应maven依赖:<!--flink-connector-jdbc-core,mysql-connector-java,flink-connector-jdbc-mysql -->
到此所需的依赖引入完成。不过程序通常需要打包并通过web ui上传到Fink服务器上运行,Fink服务器通过java SPI服务发现运行我们的jar,关于java SPI接口,前面的文章《关于Red Hat Single Sign-On的User Storage SPI》里有提到过。

这是官网的插件配置地址:

第一步 |Apache Flink,所以要需要添加官方提供的maven打包插件:使用 Maven |Apache Flink

最后完整的依赖如下:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>FLINKTAS-TEST-Catalog</artifactId><version>1.0-SNAPSHOT</version><packaging>jar</packaging><name>FLINKTAS-TEST-Catalog</name><url>http://maven.apache.org</url><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><flink.version>2.0.0</flink.version></properties><dependencies><!--flink-clients,flink-table-runtime,flink-table-planner-loader- --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-loader</artifactId><version>2.0.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-runtime</artifactId><version>2.0.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>2.0.0</version></dependency><!--flink-table-api-java    --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java</artifactId><version>2.0.0</version></dependency><!--flink-connector-jdbc-core,mysql-connector-java,flink-connector-jdbc-mysql		--><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.28</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc-mysql</artifactId><version>4.0.0-2.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc-core</artifactId><version>4.0.0-2.0</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>3.8.1</version><scope>test</scope></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.1.1</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><artifactSet><excludes><exclude>com.google.code.findbugs:jsr305</exclude></excludes></artifactSet><filters><filter><!-- Do not copy the signatures in the META-INF folder.Otherwise, this might cause SecurityExceptions when using the JAR. --><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers><transformerimplementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><!-- Replace this with the main class of your job --><mainClass>org.example.App</mainClass></transformer><transformerimplementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/></transformers></configuration></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><configuration><source>15</source><target>15</target></configuration></plugin></plugins></build>
</project>

现在来实现java处理流程:

先理解一下Catalogs:他可把整个数据库一次性注册到表环境TableEnvironment中

Catalogs | Apache Flink

flink-connector-jdbc-mysql模块已经对mysql的Catalogs 做了实现MySqlCatalog,但是它不能创建物理表,对此需要对其进行扩展实现对应的建表逻辑。

这是我的实现:

package org.example;import org.apache.flink.connector.jdbc.mysql.database.catalog.MySqlCatalog;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.catalog.*;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import org.apache.flink.table.types.DataType;import java.sql.*;
import java.util.*;public class MyMySqlCatalog extends MySqlCatalog {public MyMySqlCatalog(ClassLoader userClassLoader, String catalogName, String defaultDatabase, String username, String pwd, String baseUrl) {super(userClassLoader, catalogName, defaultDatabase, username, pwd, baseUrl);}public MyMySqlCatalog(ClassLoader userClassLoader, String catalogName, String defaultDatabase, String baseUrl, Properties connectionProperties) {super(userClassLoader, catalogName, defaultDatabase, baseUrl, connectionProperties);}public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {// 检查数据库是否存在if (!databaseExists(tablePath.getDatabaseName())) {throw new DatabaseNotExistException(getName(), tablePath.getDatabaseName());}// 检查表是否已存在if (tableExists(tablePath)) {if (!ignoreIfExists) {return;}}Connection conn = null;try {conn = DriverManager.getConnection(baseUrl + tablePath.getDatabaseName(), this.getUsername(), this.getPassword());String createTableSql = generateCreateTableSql(tablePath.getObjectName(), table);try (PreparedStatement stmt = conn.prepareStatement(createTableSql)) {stmt.execute();}} catch (SQLException e) {throw new CatalogException(String.format("Failed to create table %s", tablePath.getFullName()), e);} finally {try {if (conn != null) {conn.close();}} catch (SQLException e) {e.printStackTrace();}}}private String generateCreateTableSql(String tableName, CatalogBaseTable table) {StringBuilder sql = new StringBuilder();sql.append("CREATE TABLE IF NOT EXISTS `").append(tableName).append("` (");// 构建列定义Schema schema = table.getUnresolvedSchema();List<String> columnDefs = new ArrayList<>();for (Schema.UnresolvedColumn column : schema.getColumns()) {if (column instanceof Schema.UnresolvedPhysicalColumn) {Schema.UnresolvedPhysicalColumn physicalColumn =(Schema.UnresolvedPhysicalColumn) column;String columnDef = String.format("`%s` %s",physicalColumn.getName(),convertFlinkTypeToMySql((DataType) physicalColumn.getDataType()));columnDefs.add(columnDef);}}sql.append(String.join(", ", columnDefs));sql.append(")");return sql.toString();}private String convertFlinkTypeToMySql(DataType dataType) {// 简化的类型转换,您可以根据需要扩展String typeName = dataType.getLogicalType().getTypeRoot().name();switch (typeName) {case "INTEGER":return "INT";case "VARCHAR":return "VARCHAR(255)";case "BIGINT":return "BIGINT";case "DOUBLE":return "DOUBLE";case "BOOLEAN":return "BOOLEAN";case "TIMESTAMP_WITHOUT_TIME_ZONE":return "TIMESTAMP";default:return "TEXT";}}
}

最后贴一下做数据同步过程的代码:

package org.example;import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import java.util.List;import static org.apache.flink.table.api.DataTypes.INT;
import static org.apache.flink.table.api.DataTypes.STRING;/*** Hello world!*/
public class App {public static void main(String[] args) throws DatabaseNotExistException, TableAlreadyExistException {EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();TableEnvironment tableEnv = TableEnvironment.create(settings);String name = "my_catalog";String defaultDatabase = "test";String username = "root";String password = "root";String baseUrl = "jdbc:mysql://localhost:3306";MyMySqlCatalog catalog = new MyMySqlCatalog(ClassLoader.getSystemClassLoader(),name,defaultDatabase,username,password,baseUrl);tableEnv.registerCatalog("my_catalog", catalog);// set the JdbcCatalog as the current catalog of the sessiontableEnv.useCatalog("my_catalog");List<String> tables = catalog.listTables(defaultDatabase);boolean exists = catalog.tableExists(ObjectPath.fromString("test.my_table_03"));//如果表不存在,则创建if (!exists) {// 定义表的字段和类型List<Column> columns = List.of(Column.physical("id", INT().notNull()),Column.physical("name", STRING()));Schema.Builder chemaB = Schema.newBuilder();chemaB.column("id", INT().notNull());chemaB.column("name", STRING());chemaB.primaryKey("id");Schema chema = chemaB.build();CatalogTable catalogTable = CatalogTable.newBuilder().schema(chema).build();catalog.createTable(ObjectPath.fromString("test.my_table_03"), catalogTable, true);}tableEnv.executeSql("SELECT * FROM my_table_01").print();tableEnv.executeSql("SELECT * FROM my_table_03").print();// 执行同步tableEnv.executeSql("INSERT INTO my_table_03 SELECT id, name FROM my_table_01");System.out.println("Hello World!");}
}

执行结果:

但是如果系统表太多,注册Catalogs可能会很消耗Flink内存,所以也可以只把需要的表注册到表环境中,

package org.example;import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;/*** Hello world!*/
public class App {public static void main(String[] args) {EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();TableEnvironment tableEnv = TableEnvironment.create(settings);registerMySqlTable(tableEnv);Table table1 = tableEnv.from("my_table_01");table1.printSchema();tableEnv.executeSql("SELECT * FROM my_table_01").print();registerMySqlTable02(tableEnv); // my_table_02Table table2 = tableEnv.from("my_table_02");table2.printSchema();tableEnv.executeSql("SELECT * FROM my_table_02").print();// 执行同步tableEnv.executeSql("INSERT INTO my_table_02 SELECT id, name FROM my_table_01");System.out.println("Hello World!");}/*** 注册 MySQL 表 my_table_02 到 Flink 表环境中*/public static void registerMySqlTable02(TableEnvironment tableEnv) {tableEnv.executeSql("CREATE TABLE my_table_02 (" +"id INT PRIMARY KEY NOT ENFORCED, " +"name STRING" +") WITH (" +"'connector' = 'jdbc', " +"'url' = 'jdbc:mysql://localhost:3306/test', " +"'table-name' = 'my_table_02', " +"'username' = 'root', " +"'password' = 'root'" +")");}/*** 注册 MySQL 表到 Flink 表环境中*/public static void registerMySqlTable(TableEnvironment tableEnv) {tableEnv.executeSql("CREATE TABLE my_table_01 (" +"id INT PRIMARY KEY NOT ENFORCED," +"name STRING" +") WITH (" +"'connector' = 'jdbc'," +"'url' = 'jdbc:mysql://localhost:3306/test'," +"'table-name' = 'my_table_01'," +"'username' = 'root'," +"'password' = 'root'" +")");}
}

这样也可以实现数据同步。最后优化建议可以使用jdbc连接池技术。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/diannao/92034.shtml
繁体地址,请注明出处:http://hk.pswp.cn/diannao/92034.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【 SpringAI核心特性 | Prompt工程 】

1. Prompt 工程 基本概念&#xff1a;Prompt ؜工程又叫提示‏词工程&#xff0c;简单来说&#xff0c;就是输入‌给 AI 的指令。 比如下面‏这段内容&#xff0c;就是提示词&#xff1a; 请问桂林电子科技大学是一个怎么样的学校&#xff1f;1.1 Prompt分类 在 AI ؜对话中…

windows wsl2-06-docker hello world

hello-world 例子 就像其他任何一门语言一样&#xff0c;我们来体验 docker 的 hello world $ docker run hello-world但是报错 :~$ docker run hello-world Unable to find image hello-world:latest locally docker: Error response from daemon: Get "https://registry…

Python知识点4-嵌套循环break和continue使用死循环

一、循环【重点掌握】 1.嵌套循环类似于嵌套if语句 语法&#xff1a; while 表达式1&#xff1a;while 表达式2&#xff1a;语句# 1. # 循环5次&#xff0c;打印0~4 m 0 while m < 5:print(m)m 1 # 循环3次&#xff0c;打印0~2 n 0 while n < 3:print(n)n 1print(&qu…

将HTML+JS+CSS数独游戏包装为安卓App

HTMLJSCSS制作一个数独游戏-CSDN博客 中开发了一个数独游戏&#xff0c;这个数独游戏提供了一次性回退到指定步骤的辅助功能&#xff0c;在解决复杂数独问题时十分有帮助&#xff0c;可作为玩数独游戏的辅助工具&#xff0c;因此&#xff0c;考虑将它改装成安卓App安装在手机上…

编程语言Java入门——核心技术篇(一)封装、继承和多态

同专栏基础知识篇写在这里&#xff0c;有兴趣的可以去看看&#xff1a; 编程语言Java入门——基础知识篇&#xff08;一&#xff09;-CSDN博客 编程语言Java入门——基础知识篇&#xff08;二&#xff09;-CSDN博客 编程语言Java入门——基础知识篇&#xff08;三&#xff0…

【39】MFC入门到精通——C++ /MFC操作文件行(读取,删除,修改指定行)

文章目录1 通过关键词&#xff0c;读取某一行 &#xff08;3种方法&#xff09;2 删除 指定行3 修改 指定行1 通过关键词&#xff0c;读取某一行 &#xff08;3种方法&#xff09; 通过定位关键词&#xff0c;读取某一行信息,返回CString //通过定位关键词&#xff0c;读取某…

5 种可行的方法:如何将 Redmi 联系人备份到 Mac

将 Redmi 联系人备份到 Mac 是防止因手机损坏、丢失或更换设备而导致数据丢失的重要措施。虽然云服务提供了便利性&#xff0c;但拥有离线备份可以提供额外的安全性&#xff0c;而无需完全依赖互联网。如果您想知道如何将 Redmi 联系人备份到 Mac&#xff0c;本文将为您介绍 5 …

LeRobot 具身智能机械臂 SO-ARM100 从搭建到训练全流程

今天给大家分享一下 LeRobot 具身智能机械臂 SO-ARM100 的完整使用流程&#xff0c;包括设备组装、环境配置、远程控制、数据录制到模型训练的全过程。适合刚入门具身智能的小伙伴参考学习。 一、前期准备与资源获取 在开始之前&#xff0c;我们需要准备好相关的资源和工具&a…

LINUX720 SWAP扩容;新增逻辑卷;逻辑卷扩容;数据库迁移;gdisk

SWAP空间扩展 方法一 增加硬盘或分区扩展 swap -s mkswap /dev/sdd6 blkid /dev/sdd6 swapon /dev/sdd6 swapon -s vim /etc/fstab /dev/sdd6 swap swap defaults 0 0 开机自动扩容 swap -s [rootweb ~]# lsblk NAME MAJ:MIN RM SIZE RO TYPE MOUNTPOINT sd…

Python 进程间通信:TCP安全加密数据传输

最近在写安全方面的程序&#xff0c;有需求&#xff0c;就做了这些TCP加密数据传输类。 utils.safeUtils的内容详见&#xff1a; SafeObj&#xff1a;Python 高安全性加密数据容器类-CSDN博客SafeKey&#xff1a;Python 高安全性加密密码容器类-CSDN博客 如有任何问题或漏洞欢迎…

Windows批量修改文件属性方法

标题使用icacls命令&#xff08;推荐批量操作&#xff09;打开管理员权限的命令提示符&#xff08;CMD&#xff09;执行以下命令&#xff1a;cmd icacls "文件夹路径" /grant 用户名:(OI)(CI)F /T /C 参数说明&#xff1a;(OI)&#xff1a;对象继承 - 适用于文件夹(C…

Entity Component System架构

ECS架构 1 简介 在当今快速发展的软件开发领域&#xff0c;游戏开发、实时模拟等场景对系统的性能、灵活性和可扩展性提出了极高的要求。传统的面向对象架构在面对复杂且动态变化的实体时&#xff0c;往往会出现代码耦合度高、扩展性差等问题。​ ECS&#xff08;Entity - Com…

.vscode 扩展配置

一、vue快捷键配置 在项目.vscode下新建vue3.0.code-snippets 每当输入vue3.0后自动生成代码片段 {"Vue3.0快速生成模板": {"scope": "vue","prefix": "Vue3.0","body": ["<template>"," &…

一个基于阿里云的C端Java服务的整体项目架构

1.背景介绍 总结一下工作使用到的基于通常的公有云的项目整体架构&#xff0c;如何基于公有云建设安全可靠的服务&#xff0c;以阿里云为例的整体架构&#xff1b;1. 全局流量治理层&#xff08;用户请求入口&#xff09;1.1 域名与 DNS 解析域名注册与备案&#xff1a;通过阿里…

《剥开洋葱看中间件:Node.js请求处理效率与错误控制的深层逻辑》

在Node.js的运行时环境中&#xff0c;中间件如同一系列精密咬合的齿轮&#xff0c;驱动着请求从进入到响应的完整旅程&#xff0c;而洋葱模型则是这组齿轮的传动系统。它以一种看似矛盾的方式融合了顺序与逆序、分离与协作——让每个处理环节既能独立工作&#xff0c;又能感知全…

GaussDB union 的用法

1 union 的作用union 运算符用于组合两个或更多 select 语句的结果集。2 union 使用前提union 中的每个 select 语句必须具有相同的列数这些列也必须具有相似的数据类型每个 select 语句中的列也必须以相同的顺序排列3 union 语法select column_name(s) from table1 union sele…

构建足球实时比分APP:REST API与WebSocket接入方案详解

在开发足球实时比分应用时&#xff0c;数据接入方式的选择直接影响用户体验和系统性能。本文将客观分析REST API和WebSocket两种主流接入方案的技术特点、适用场景和实现策略&#xff0c;帮助开发者做出合理选择。一、REST API&#xff1a;灵活的数据获取方案核心优势标准化接口…

Linux文件系统三要素:块划分、分区管理与inode结构解析

理解文件系统 我们知道文件可以分为磁盘文件和内存文件&#xff0c;内存文件前面我们已经谈过了&#xff0c;下面我们来谈谈磁盘文件。 目录 一、引入"块"概念 解析 stat demo.c 命令输出 基本信息 设备信息 索引节点信息 权限信息 时间戳 二、引入"分区…

基于paddleDetect的半监督目标检测实战

基于paddleDetect的半监督目标检测实战前言相关介绍前提条件实验环境安装环境项目地址使用paddleDetect的半监督方法训练自己的数据集准备数据分割数据集配置参数文件PaddleDetection-2.7.0/configs/semi_det/denseteacher/denseteacher_ppyoloe_plus_crn_l_coco_semi010.ymlPa…

计算机网络:(十)虚拟专用网 VPN 和网络地址转换 NAT

计算机网络&#xff1a;&#xff08;十&#xff09;虚拟专用网 VPN 和网络地址转换 NAT前言一、虚拟专用网 VPN1. 基础概念与作用2. 工作原理3. 常见类型4. 协议对比二、NAT&#xff1a;网络地址转换1. 基础概念与作用2. 工作原理与类型3. 优缺点与问题4. 进阶类型三、VPN 与 N…