stevensu1/EC0720
表 API 和 SQL
表 API 和 SQL——用于统一流处理和批处理。表 API 是适用于 Java、Scala 和 Python 的语言集成查询 API,它允许以一种非常直观的方式组合由关系运算符(例如 selection、filter 和 join)构成的查询。Flink 的 SQL 支持基于实现了 SQL 标准的 Apache Calcite。无论输入是连续的(流处理:无界)还是有限的(批处理:有界),在任一接口中指定的查询都具有相同的语义,并产生相同的结果。
我们的目标是同步mysql表和数据
先完成maven依赖:这里我们只引入flink-table-api-java:
概览 |Apache Flink
如果在 IDE 中运行,还要引入 flink-clients、flink-table-runtime、flink-table-planner-loader 三个模块:概览 |Apache Flink
接着是mysql连接相关JDBC |Apache Flink
JDBC SQL 连接器
JDBC 连接器允许使用 JDBC 驱动程序从任何关系数据库读取数据和将数据写入任何关系数据库。本文档介绍如何设置 JDBC 连接器以针对关系数据库运行 SQL 查询。
如果在 DDL 上定义了主键,则 JDBC 接收器以更新插入模式运行,以便与外部系统交换 UPDATE/DELETE 消息,否则,它以追加模式运行,不支持使用 UPDATE/DELETE 消息。
依次引入对应的 Maven 依赖:flink-connector-jdbc-core、mysql-connector-java、flink-connector-jdbc-mysql。
到此所需的依赖引入完成。不过程序通常需要打包并通过 Web UI 上传到 Flink 服务器上运行。Flink 通过 Java SPI 服务发现机制加载连接器等扩展(例如 JDBC 连接器声明在 jar 包 META-INF/services 目录下的 Factory 实现),因此打包时必须正确合并这些 SPI 声明文件。关于 Java SPI 接口,前面的文章《关于Red Hat Single Sign-On的User Storage SPI》里有提到过。
这是官网的插件配置地址:
第一步 |Apache Flink,所以需要添加官方提供的 Maven 打包插件 maven-shade-plugin(其中的 ServicesResourceTransformer 负责合并 SPI 声明文件):使用 Maven |Apache Flink
最后完整的依赖如下:
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>FLINKTAS-TEST-Catalog</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>FLINKTAS-TEST-Catalog</name>
    <url>http://maven.apache.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <!-- Single place to bump the Flink core / table version. -->
        <flink.version>2.0.0</flink.version>
        <!-- The JDBC connector is released separately; its version embeds the Flink version it targets. -->
        <flink.connector.jdbc.version>4.0.0-2.0</flink.connector.jdbc.version>
    </properties>

    <dependencies>
        <!--
            Required to run the job inside the IDE: flink-clients, flink-table-runtime,
            flink-table-planner-loader.
            NOTE(review): when submitting to a Flink cluster these are normally already present
            in the distribution's lib/ folder; consider <scope>provided</scope> for cluster builds.
        -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-loader</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Table API for Java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- MySQL access: flink-connector-jdbc-core, mysql-connector-java, flink-connector-jdbc-mysql -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.28</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc-mysql</artifactId>
            <version>${flink.connector.jdbc.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc-core</artifactId>
            <version>${flink.connector.jdbc.version}</version>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                         Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <!-- Replace this with the main class of your job -->
                                    <mainClass>org.example.App</mainClass>
                                </transformer>
                                <!-- Merges META-INF/services files so Flink's SPI can discover the JDBC connector factories. -->
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>15</source>
                    <target>15</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
现在来实现java处理流程:
先理解一下 Catalog:它可以把整个数据库一次性注册到表环境 TableEnvironment 中。
Catalogs | Apache Flink
flink-connector-jdbc-mysql 模块已经为 MySQL 提供了 Catalog 实现 MySqlCatalog,但它不能创建物理表,因此需要对其进行扩展,实现对应的建表逻辑。
这是我的实现:
package org.example;import org.apache.flink.connector.jdbc.mysql.database.catalog.MySqlCatalog;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.catalog.*;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import org.apache.flink.table.types.DataType;import java.sql.*;
import java.util.*;public class MyMySqlCatalog extends MySqlCatalog {public MyMySqlCatalog(ClassLoader userClassLoader, String catalogName, String defaultDatabase, String username, String pwd, String baseUrl) {super(userClassLoader, catalogName, defaultDatabase, username, pwd, baseUrl);}public MyMySqlCatalog(ClassLoader userClassLoader, String catalogName, String defaultDatabase, String baseUrl, Properties connectionProperties) {super(userClassLoader, catalogName, defaultDatabase, baseUrl, connectionProperties);}public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {// 检查数据库是否存在if (!databaseExists(tablePath.getDatabaseName())) {throw new DatabaseNotExistException(getName(), tablePath.getDatabaseName());}// 检查表是否已存在if (tableExists(tablePath)) {if (!ignoreIfExists) {return;}}Connection conn = null;try {conn = DriverManager.getConnection(baseUrl + tablePath.getDatabaseName(), this.getUsername(), this.getPassword());String createTableSql = generateCreateTableSql(tablePath.getObjectName(), table);try (PreparedStatement stmt = conn.prepareStatement(createTableSql)) {stmt.execute();}} catch (SQLException e) {throw new CatalogException(String.format("Failed to create table %s", tablePath.getFullName()), e);} finally {try {if (conn != null) {conn.close();}} catch (SQLException e) {e.printStackTrace();}}}private String generateCreateTableSql(String tableName, CatalogBaseTable table) {StringBuilder sql = new StringBuilder();sql.append("CREATE TABLE IF NOT EXISTS `").append(tableName).append("` (");// 构建列定义Schema schema = table.getUnresolvedSchema();List<String> columnDefs = new ArrayList<>();for (Schema.UnresolvedColumn column : schema.getColumns()) {if (column instanceof Schema.UnresolvedPhysicalColumn) {Schema.UnresolvedPhysicalColumn physicalColumn =(Schema.UnresolvedPhysicalColumn) column;String columnDef = String.format("`%s` 
%s",physicalColumn.getName(),convertFlinkTypeToMySql((DataType) physicalColumn.getDataType()));columnDefs.add(columnDef);}}sql.append(String.join(", ", columnDefs));sql.append(")");return sql.toString();}private String convertFlinkTypeToMySql(DataType dataType) {// 简化的类型转换,您可以根据需要扩展String typeName = dataType.getLogicalType().getTypeRoot().name();switch (typeName) {case "INTEGER":return "INT";case "VARCHAR":return "VARCHAR(255)";case "BIGINT":return "BIGINT";case "DOUBLE":return "DOUBLE";case "BOOLEAN":return "BOOLEAN";case "TIMESTAMP_WITHOUT_TIME_ZONE":return "TIMESTAMP";default:return "TEXT";}}
}
最后贴一下做数据同步过程的代码:
package org.example;import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import java.util.List;import static org.apache.flink.table.api.DataTypes.INT;
import static org.apache.flink.table.api.DataTypes.STRING;/*** Hello world!*/
public class App {public static void main(String[] args) throws DatabaseNotExistException, TableAlreadyExistException {EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();TableEnvironment tableEnv = TableEnvironment.create(settings);String name = "my_catalog";String defaultDatabase = "test";String username = "root";String password = "root";String baseUrl = "jdbc:mysql://localhost:3306";MyMySqlCatalog catalog = new MyMySqlCatalog(ClassLoader.getSystemClassLoader(),name,defaultDatabase,username,password,baseUrl);tableEnv.registerCatalog("my_catalog", catalog);// set the JdbcCatalog as the current catalog of the sessiontableEnv.useCatalog("my_catalog");List<String> tables = catalog.listTables(defaultDatabase);boolean exists = catalog.tableExists(ObjectPath.fromString("test.my_table_03"));//如果表不存在,则创建if (!exists) {// 定义表的字段和类型List<Column> columns = List.of(Column.physical("id", INT().notNull()),Column.physical("name", STRING()));Schema.Builder chemaB = Schema.newBuilder();chemaB.column("id", INT().notNull());chemaB.column("name", STRING());chemaB.primaryKey("id");Schema chema = chemaB.build();CatalogTable catalogTable = CatalogTable.newBuilder().schema(chema).build();catalog.createTable(ObjectPath.fromString("test.my_table_03"), catalogTable, true);}tableEnv.executeSql("SELECT * FROM my_table_01").print();tableEnv.executeSql("SELECT * FROM my_table_03").print();// 执行同步tableEnv.executeSql("INSERT INTO my_table_03 SELECT id, name FROM my_table_01");System.out.println("Hello World!");}
}
执行结果:
但是如果系统表太多,注册Catalogs可能会很消耗Flink内存,所以也可以只把需要的表注册到表环境中,
package org.example;import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;/*** Hello world!*/
public class App {public static void main(String[] args) {EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();TableEnvironment tableEnv = TableEnvironment.create(settings);registerMySqlTable(tableEnv);Table table1 = tableEnv.from("my_table_01");table1.printSchema();tableEnv.executeSql("SELECT * FROM my_table_01").print();registerMySqlTable02(tableEnv); // my_table_02Table table2 = tableEnv.from("my_table_02");table2.printSchema();tableEnv.executeSql("SELECT * FROM my_table_02").print();// 执行同步tableEnv.executeSql("INSERT INTO my_table_02 SELECT id, name FROM my_table_01");System.out.println("Hello World!");}/*** 注册 MySQL 表 my_table_02 到 Flink 表环境中*/public static void registerMySqlTable02(TableEnvironment tableEnv) {tableEnv.executeSql("CREATE TABLE my_table_02 (" +"id INT PRIMARY KEY NOT ENFORCED, " +"name STRING" +") WITH (" +"'connector' = 'jdbc', " +"'url' = 'jdbc:mysql://localhost:3306/test', " +"'table-name' = 'my_table_02', " +"'username' = 'root', " +"'password' = 'root'" +")");}/*** 注册 MySQL 表到 Flink 表环境中*/public static void registerMySqlTable(TableEnvironment tableEnv) {tableEnv.executeSql("CREATE TABLE my_table_01 (" +"id INT PRIMARY KEY NOT ENFORCED," +"name STRING" +") WITH (" +"'connector' = 'jdbc'," +"'url' = 'jdbc:mysql://localhost:3306/test'," +"'table-name' = 'my_table_01'," +"'username' = 'root'," +"'password' = 'root'" +")");}
}
这样也可以实现数据同步。最后优化建议可以使用jdbc连接池技术。