Flink Table SQL

Apache Flink 提供了强大的 Table API 和 SQL 接口,用于统一处理批数据和流数据。它们为开发者提供了类 SQL 的编程方式,简化了复杂的数据处理逻辑,并支持与外部系统集成。


🧩 一、Flink Table & SQL 核心概念

概念描述
Table API基于 Java/Scala 的 DSL,提供类型安全的操作接口
Flink SQL支持标准 ANSI SQL 语法的查询语言
DataStream / DataSet ↔ Table可以在 DataStream 或 Table 之间互相转换
Catalog元数据管理器,如 Hive Catalog、Memory Catalog
TableEnvironment管理表、SQL 执行环境的核心类
Connectors支持 Kafka、Hive、MySQL、文件等数据源接入
Time Attributes定义事件时间(Event Time)、处理时间(Processing Time)
Windowing支持滚动窗口、滑动窗口、会话窗口等

💻 二、Flink Table API 和 SQL 的优势

特性描述
统一接口同一套代码可运行在 Batch 和 Streaming 场景下
高性能底层使用 Apache Calcite 进行优化,自动进行查询优化
易用性强对熟悉 SQL 的用户非常友好
生态集成好支持 Kafka、Hive、JDBC、Elasticsearch 等多种数据源
状态管理在流式场景中自动管理状态和窗口逻辑

📦 三、核心组件说明

1. TableEnvironment

  • 是操作 Table 和 SQL 的入口
  • 负责注册表、执行查询、管理元数据等
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

2. DataStream ↔ Table 转换

示例:DataStream 转 Table
DataStream<Tuple2<String, Integer>> dataStream = env.fromElements(Tuple2.of("a", 1), Tuple2.of("b", 2));// 将 DataStream 转换为 Table
Table table = tEnv.fromDataStream(dataStream);// 注册为临时表
tEnv.createTemporaryView("myTable", dataStream);
示例:Table 转 DataStream
Table resultTable = tEnv.sqlQuery("SELECT * FROM myTable WHERE f1 > 1");
DataStream<Row> resultStream = tEnv.toDataStream(resultTable);

3. Flink SQL 查询

示例:使用 SQL 查询统计结果
// 创建临时表
tEnv.executeSql("CREATE TABLE MyKafkaSource (" +"  user STRING," +"  url STRING," +"  ts BIGINT" +") WITH (" +"  'connector' = 'kafka'," +"  'format' = 'json'" +")"
);// 执行 SQL 查询
Table result = tEnv.sqlQuery("SELECT user, COUNT(*) AS cnt FROM MyKafkaSource GROUP BY user");// 转换为 DataStream 并输出
tEnv.toDataStream(result).print();env.execute();

🧪 四、Java 示例:完整的 Table API + SQL 使用案例

✅ 功能:

从 Kafka 读取日志数据,按用户分组统计访问次数

📁 依赖建议(pom.xml)

<dependencies><!-- Flink Core --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.17.1</version></dependency><!-- Flink Streaming --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>1.17.1</version></dependency><!-- Flink Table API & SQL --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge</artifactId><version>1.17.1</version></dependency><!-- Kafka Connector --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>1.17.1</version></dependency><!-- JSON Format --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>1.17.1</version></dependency>
</dependencies>

🧱 五、完整 Java 示例代码

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;public class FlinkTableAndSQLEntry {public static void main(String[] args) throws Exception {// 1. 初始化流执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2. 创建 TableEnvironmentStreamTableEnvironment tEnv = StreamTableEnvironment.create(env);// 3. 创建 Kafka Source 表(模拟从 Kafka 读取日志)tEnv.executeSql("CREATE TABLE KafkaLog (" +"  user STRING," +"  url STRING," +"  ts BIGINT" +") WITH (" +"  'connector' = 'kafka'," +"  'topic' = 'user_log'," +"  'properties.bootstrap.servers' = 'localhost:9092'," +"  'properties.group.id' = 'flink-sql-group'," +"  'format' = 'json'" +")");// 4. 创建 Sink 表(控制台输出)tEnv.executeSql("CREATE TABLE ConsoleSink (" +"  user STRING," +"  cnt BIGINT" +") WITH (" +"  'connector' = 'print'" +")");// 5. 使用 SQL 编写业务逻辑tEnv.executeSql("INSERT INTO ConsoleSink " +"SELECT user, COUNT(*) AS cnt " +"FROM KafkaLog " +"GROUP BY user");}
}

📊 六、SQL 查询示例汇总

SQL 示例描述
SELECT * FROM table查询所有字段
SELECT user, COUNT(*) FROM table GROUP BY user分组聚合
SELECT * FROM table WHERE ts > 1000条件过滤
SELECT TUMBLE_END(ts, INTERVAL '5' SECOND), COUNT(*) ...时间窗口聚合
SELECT * FROM LATERAL TABLE(udtf(col))使用 UDTF
CREATE VIEW view_name AS SELECT ...创建视图
INSERT INTO sink_table SELECT ...写入到目标表

⏱️ 七、时间属性与窗口聚合

示例:定义事件时间并使用滚动窗口

-- 定义带有事件时间的表
CREATE TABLE EventTable (user STRING,url STRING,ts BIGINT,WATERMARK FOR ts AS ts - 1000 -- 定义水印
) WITH (...);-- 使用滚动窗口进行统计
SELECT TUMBLE_END(ts, INTERVAL '5' SECOND) AS window_end,user,COUNT(*) AS cnt
FROM EventTable
GROUP BY TUMBLE(ts, INTERVAL '5' SECOND), user;

📁 八、连接器(Connector)配置示例

1. Kafka Source

CREATE TABLE KafkaSource (user STRING,url STRING,ts BIGINT
) WITH ('connector' = 'kafka','topic' = 'input-topic','properties.bootstrap.servers' = 'localhost:9092','properties.group.id' = 'flink-sql-group','format' = 'json'
);

2. MySQL Sink

CREATE TABLE MysqlSink (user STRING,cnt BIGINT
) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://localhost:3306/mydb','table-name' = 'user_access_log'
);

📈 九、Flink SQL + Table API 的典型应用场景

场景示例
实时 ETL从 Kafka 读取数据 → 清洗 → 写入 HDFS
流式分析统计每分钟点击量、异常检测
数据质量监控判断字段是否为空、格式是否合法
风控规则引擎使用 CEP 检测异常行为
数仓建模构建 DWD、DWS 层表结构

🧠 十、Table API vs SQL

特性Table APISQL
语法风格函数式链式调用类 SQL 语法
易用性对 Java 开发者更友好对 SQL 用户更友好
动态解析不适合动态 SQL支持字符串拼接、模板引擎
性能一致(底层都是 Calcite)一致
支持功能大部分 SQL 功能都有对应 API支持完整 SQL 语法
调试难度相对较难调试更直观、便于调试

✅ 十一、总结

技术点描述
Table API基于 Java/Scala 的函数式 API
Flink SQL支持 ANSI SQL,易于上手
TableEnvironment管理表和 SQL 的核心类
Connectors支持 Kafka、Hive、JDBC、File、Print 等
Time Attributes支持事件时间、处理时间
Windowing支持滚动、滑动、会话窗口
State Backend支持 RocksDB、FS、Memory 状态后端

🧩 十二、扩展学习方向

如果你希望我为你演示以下内容,请继续提问:

  • 自定义函数(UDF、UDAF、UDTF)
  • Kafka + MySQL 实时同步方案
  • 基于 Hive 的批处理 SQL 作业
  • 使用 PyFlink 实现 SQL 作业
  • 使用 WITH 子句定义临时表
  • 使用 LATERAL TABLE 调用 UDTF
  • 使用 MATCH_RECOGNIZE 实现 CEP 模式匹配

📌 一句话总结:

Flink Table API 和 SQL 提供了一种统一的批流一体编程模型,适合数据仓库、实时分析、ETL、风控等多种大数据处理场景。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.pswp.cn/pingmian/81383.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【AWS入门】Amazon SageMaker简介

【AWS入门】Amazon SageMaker简介 [AWS Essentials] Brief Introduction to Amazon SageMaker By JacksonML 机器学习(Machine Learning&#xff0c;简称ML) 是当代流行的计算机科学分支技术。通常&#xff0c;人们在本地部署搭建环境&#xff0c;以满足机器学习的要求。 AWS…

解决 Go 构建依赖超时问题:使用 GOPROXY 提升 Docker 构建稳定性

目录 解决 Go 构建依赖超时问题&#xff1a;使用 GOPROXY 提升 Docker 构建稳定性 ✅ 问题背景 ✅ 正确做法&#xff1a;多阶段中在 Go 阶段设置 GOPROXY ✅ 实际收获 &#x1f9ea; 小技巧&#xff1a;验证 GOPROXY 设置是否生效 ✅ 总结 解决 Go 构建依赖超时问题&#x…

【周输入】510周阅读推荐-3

前文 【周输入】510周阅读推荐-1-CSDN博客 【周输入】510周阅读推荐-2-CSDN博客 本次推荐 目录 前文 本次推荐 算法技术 模型产品 算法技术 vLLM和DeepSpeed部署模型的优缺点_vllm deepspeed-CSDN博客 优点缺点总结vLLM 适用于推理 优化内存管理 高效并行化 功能单…

Kubernetes控制平面组件:Kubelet详解(七):容器网络接口 CNI

云原生学习路线导航页&#xff08;持续更新中&#xff09; kubernetes学习系列快捷链接 Kubernetes架构原则和对象设计&#xff08;一&#xff09;Kubernetes架构原则和对象设计&#xff08;二&#xff09;Kubernetes架构原则和对象设计&#xff08;三&#xff09;Kubernetes控…

【推荐】新准则下对照会计报表172个会计科目解释

序号 科目名称 对应的会计报表项目 序号 科目名称 对应的会计报表项目   一、资产类     二、负债类   1 1001 库存现金 货币资金 103 2001 短期借款 短期借款 2 1002 银行存款 货币资金 104 2101 交易性金融负债 易性金融负债 3 1012 其他货币资…

MongoDB的安装及简单使用

MongoDB 是一个开源的文档型 NoSQL 数据库​​&#xff0c;由 MongoDB Inc. 开发&#xff0c;专为灵活性和可扩展性设计。 特点&#xff1a; ​​1.文档模型​​&#xff1a;数据以 BSON&#xff08;二进制 JSON&#xff09;格式存储&#xff0c;支持嵌套结构。 ​​2.动态 S…

Gartner《如何将生成式人工智能(GenAI)集成到应用架构》学习心得

针对软件架构师、技术专业人士如何更好的把 GenAI 如何融入解决方案,提升用户体验、生产力并带来差异化成果的趋势,Gartner发布了《Integrating GenAI Into Your Application Architecture》研究报告。 报告首先介绍了 GenAI 的发展背景,指出其已成为主流趋势,大型语言模型…

IDEA - Windows IDEA 代码块展开与折叠(基础折叠操作、高级折叠操作)

一、基础折叠操作 折叠当前代码块&#xff1a;Ctrl - # 操作方式按下 【Ctrl】 键&#xff0c;再按下 【-】 键展开当前代码块&#xff1a;Ctrl # 操作方式按下 【Ctrl】 键&#xff0c;再按下 【】 键折叠所有代码块&#xff1a;Ctrl Shift - # 操作方式按下 【Ctrl】…

基于STM32F103与Marvell88W8686的WIFI无线监控视频传输系统研发(论文)

基于STM32F103与Marvell88W8686的WIFI无线监控视频传输系统研发 中文摘要 在当今社会信息化进程不断加速的时代背景下&#xff0c;众多领域对于监控系统的需求日益增长&#xff0c;像车内安全监控、电梯运行监控等场景都离不开监控系统的支持。过去&#xff0c;不少领域普遍采用…

Java基础知识总结(超详细整理)

一&#xff1a;概述 1.1Java类及类的成员 属性、方法、构造器、代码块、内部类 &#xff08;1&#xff09;数组 java虚拟机内存划分 各区域作用 内存解析 基本使用 两个变量指向一个一维数组 没有new就不会在堆里新开辟空间 &#xff08;2&#xff09;对象数组 &#xff08;3&a…

StarRocks Community Monthly Newsletter (Apr)

版本动态 3.4.3 版本更新 核心功能升级 Routine Load和Stream Load新增Lambda表达式支持&#xff0c;支持复杂的列数据提取 增强JSON数据处理能力&#xff0c;支持将JSON Array/Object转为ARRAY/MAP类型 优化information_schema.task_runs视图查询&#xff0c;新增LIMIT支持…

探索AI新领域:生成式人工智能认证(GAI认证)助力职场发展

在数字化时代的大潮中&#xff0c;人工智能&#xff08;AI&#xff09;技术以其强大的影响力和广泛的应用前景&#xff0c;正逐步重塑我们的生活与工作方式。随着生成式AI技术的崛起&#xff0c;掌握这一前沿技能已成为职场竞争中的关键优势。那么&#xff0c;如何通过系统的学…

数据库触发器Trigger

在数据库管理系统中&#xff0c;触发器&#xff08;Trigger&#xff09;是一种特殊的存储过程&#xff0c;它在特定的事件发生时自动执行。触发器通常用于维护数据的完整性和一致性。通过事件触发而被执行&#xff0c;不能直接调用。 触发器的三要素 触发事件 before/after&a…

如何利用 Java 爬虫获得某书笔记详情:实战指南

在知识分享和学习的领域&#xff0c;许多平台提供了丰富的书籍笔记和学习资源。通过 Java 爬虫技术&#xff0c;我们可以高效地获取这些笔记的详细信息&#xff0c;以便进行进一步的分析和整理。本文将详细介绍如何利用 Java 爬虫获取某书笔记详情&#xff0c;并提供完整的代码…

主成分分析的应用之sklearn.decomposition模块的PCA函数

主成分分析的应用之sklearn.decomposition模块的PCA函数 一、模型建立整体步骤 二、数据 2297.86 589.62 474.74 164.19 290.91 626.21 295.20 199.03 2262.19 571.69 461.25 185.90 337.83 604.78 354.66 198.96 2303.29 589.99 516.21 236.55 403.92 730.05 438.41 225.80 …

【Redis】List 列表

文章目录 初识列表常用命令lpushlpushxlrangerpushrpushxlpop & rpoplindexlinsertllen阻塞操作 —— blpop & brpop 内部编码应用场景 初识列表 列表类型&#xff0c;用于存储多个字符串。在操作和实现上&#xff0c;类似 C 的双端队列&#xff0c;支持随机访问(O(N)…

Android framework 中间件开发(三)

前两篇我们讲了中间件的开发和打包应用, Android framework 中间件开发(一) Android framework 中间件开发(二) 这边我们来讲一下在中间件中编写JNI 1.新建C文件 找到frameworks\base\services\core\jni\路径,新建一个cpp文件,文件名为com_android_server_DarkControlService.c…

深入了解linux系统—— 基础IO(上)

文件 在之前学习C语言文件操作时&#xff0c;我们了解过什么是文件&#xff0c;这里简单回顾一下&#xff1a; 文件存在磁盘中&#xff0c;文件有分为程序文件、数据文件&#xff1b;二进制文件和文本文件等。 详细描述见文章&#xff1a;文件操作——C语言 文件在磁盘里&a…

Flink CDC—实时数据集成框架

Flink CDC 是一个基于流的数据集成工具&#xff0c;旨在为用户提供一套功能更加全面的编程接口&#xff08;API&#xff09;&#xff0c;它基于数据库日志的 CDC&#xff08;变更数据捕获&#xff09;技术实现了统一的增量和全量数据读取。 该工具使得用户能够以 YAML 配置文件…

ES(ES2023/ES14)最新更新内容,及如何减少内耗

截至2023年10月,JavaScript(ECMAScript)的最新版本是 ES2023(ES14)。 ES2023 引入了许多新特性,如findLast、toSorted等,同时优化了性能。通过减少全局变量、避免内存泄漏、优化循环、减少DOM操作、使用Web Workers、懒加载、缓存、高效数据结构和代码压缩,可以显著降低…