Flink Stream API 源码走读 - window 和 sum

本文核心观点

核心观点:WindowedStream 是一个"假流",它比 KeyedStream 更虚,只是一个 API 的过渡器,不是真正意义上的 DataStream,需要调用函数回归。

  1. 虚拟化时刻:从真实流到虚拟流
    KeyedStream<T,K> keyedStream = …; // 半虚拟流
    WindowedStream<T,K,W> windowedStream = keyedStream.window(assigner); // 完全虚拟流

  2. 回归时刻:从虚拟流回到真实流
    windowedStream.sum()
    return input.transform(opName, resultType, operator); // 回到DataStream标准流程

一、window() 方法的特殊性发现

1.1 只有 KeyedStream 才有 window 方法

//  DataStream 上没有 window 方法
DataStream<String> stream = ...;
// stream.window(assigner); // 编译错误!//  只有 KeyedStream 才有 window 方法
KeyedStream<String, String> keyedStream = stream.keyBy(...);
WindowedStream<String, String, TimeWindow> windowedStream = keyedStream.window(assigner);

为什么这样设计?

  • 窗口操作需要基于 Key 进行分组
  • 每个 Key 都有独立的窗口状态
  • 保证相同 Key 的数据进入同一个窗口实例

1.2 KeyedStream 的特殊 API 设计

public class KeyedStream<T, KEY> extends DataStream<T> {// 继承 DataStream 的所有方法:map, filter, flatMap...// KeyedStream 特有的窗口 APIpublic <W extends Window> WindowedStream<T, KEY, W> window(WindowAssigner<? super T, W> assigner);public WindowedStream<T, KEY, GlobalWindow> countWindow(long size);// KeyedStream 特有的聚合 APIpublic SingleOutputStreamOperator<T> reduce(ReduceFunction<T> function);public SingleOutputStreamOperator<T> sum(int positionToSum);public SingleOutputStreamOperator<T> max(int positionToMax);// ... 其他聚合操作
}

设计理念

  • 继承性:保留 DataStream 的所有基础能力
  • 扩展性:增加基于 Key 的特殊操作
  • 状态性:支持有状态的聚合操作

二、WindowedStream 的"虚拟"本质

2.1 WindowedStream 的创建过程

// KeyedStream.java
public <W extends Window> WindowedStream<T, KEY, W> window(WindowAssigner<? super T, W> assigner) {return new WindowedStream<>(this, assigner);  // 仅仅是创建对象
}

关键发现:window() 方法没有创建任何 Transformation!

2.2 WindowedStream 的内部结构

public class WindowedStream<T, K, W extends Window> {// 仅有两个成员变量private final KeyedStream<T, K> input;           // 上游流的引用private final WindowOperatorBuilder<T, K, W> builder;  // 算子构建器// 注意:没有继承 DataStream!
}

2.3 WindowedStream 构造函数解析

public WindowedStream(KeyedStream<T, K> input, WindowAssigner<? super T, W> windowAssigner) {this.input = input;  // 保存上游流引用// 创建窗口算子构建器,用于构建窗口操作的核心组件// WindowOperatorBuilder是构建者模式的实现,负责组装窗口操作所需的各种组件this.builder = new WindowOperatorBuilder<>(// 窗口分配器:决定数据元素被分配到哪个窗口// 例如:TumblingEventTimeWindows、SlidingEventTimeWindows等windowAssigner,// 窗口触发器:决定何时触发窗口计算和输出结果// 每种窗口分配器都有其默认的触发器策略// 例如:EventTimeTrigger用于事件时间窗口,ProcessingTimeTrigger用于处理时间窗口windowAssigner.getDefaultTrigger(input.getExecutionEnvironment()),// 执行配置:包含序列化器、并行度等运行时配置信息input.getExecutionConfig(),// 输入数据类型信息:用于序列化和反序列化输入数据input.getType(),// Key选择器:从输入数据中提取分组键,确保相同key的数据进入同一个窗口实例input.getKeySelector(),// Key类型信息:用于序列化和反序列化分组键input.getKeyType());
}

重要理解

  • 构造函数只是组装配置信息,没有创建算子
  • 比 KeyedStream 更"虚",KeyedStream 好歹有个 PartitionTransformation
  • WindowedStream 什么 Transformation 都没有

2.4 WindowedStream 的"虚拟"特性

流类型虚拟化程度特性描述
DataStream🟢 真实流✅ 有 Transformation
✅ 支持链式调用
✅ 可直接执行
KeyedStream🟡 半虚拟流✅ 有 PartitionTransformation
✅ 支持链式调用
✅ 支持窗口API
⚠️ 无实际算子
WindowedStream🔴 完全虚拟流❌ 无 Transformation
❌ 断开链式调用
✅ 只支持窗口聚合API
⚠️ 纯过渡器

WindowedStream 的特殊性

  1. 不继承 DataStream - 彻底断开链式调用
  2. 纯 API 过渡器 - 只是工具类,不是真正的流
  3. 强制聚合 - 必须调用聚合操作才能回到正常流
  4. 临时状态 - 无法直接使用,必须转换

WindowedStream 的特殊性

  1. 不继承 DataStream - 彻底断开链式调用
  2. 纯 API 过渡器 - 只是工具类,不是真正的流
  3. 强制聚合 - 必须调用聚合操作才能回到正常流
  4. 临时状态 - 无法直接使用,必须转换

三、sum() 方法的完整解析

3.1 sum() 方法的调用链

// WindowedStream.java - 入口方法
public SingleOutputStreamOperator<T> sum(int positionToSum) {// 创建内置的求和聚合器return aggregate(new SumAggregator<>(positionToSum, input.getType(), input.getExecutionConfig()));
}// aggregate 方法 - 中转
private SingleOutputStreamOperator<T> aggregate(AggregationFunction<T> aggregator) {return reduce(aggregator);  // 转发给 reduce
}

关键理解

  • sum() 只是一个便利方法
  • 内部使用 Flink 预定义的 SumAggregator
  • 最终还是调用 reduce() 方法

3.2 SumAggregator 的本质

// SumAggregator 的继承关系
public class SumAggregator<T> extends AggregationFunction<T> implements ReduceFunction<T> {private final int positionToSum;  // 要求和的字段位置// 实现具体的求和逻辑
}

重要发现

  • SumAggregator 就是一个 ReduceFunction
  • 与用户自定义的 MapFunction 地位完全相同
  • Flink 内部预写好的函数,用户也可以自己实现

3.3 reduce() 方法的三层重载

// 第一层:只有 ReduceFunction(我们的入口)
public SingleOutputStreamOperator<T> reduce(ReduceFunction<T> function) {function = input.getExecutionEnvironment().clean(function);  // 清理函数return reduce(function, new PassThroughWindowFunction<>());  // 添加默认 WindowFunction
}// 第二层:ReduceFunction + WindowFunction
public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction,WindowFunction<T, R, K, W> function) {// 推断输出类型TypeInformation<R> resultType = getWindowFunctionReturnType(function, inputType);return reduce(reduceFunction, function, resultType);  // 继续传递
}// 第三层:完整参数(最终实现)
public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction,WindowFunction<T, R, K, W> function,TypeInformation<R> resultType) {// 1. 清理函数(序列化检查)function = input.getExecutionEnvironment().clean(function);reduceFunction = input.getExecutionEnvironment().clean(reduceFunction);// 2. 生成算子名称和描述final String opName = builder.generateOperatorName();final String opDescription = builder.generateOperatorDescription(reduceFunction, function);// 3. 通过 builder 根据function 创建WindowOperatorOneInputStreamOperator<T, R> operator = builder.reduce(reduceFunction, function);// 4. 根据Operator 创建 OperatorFactory -> transformation -> DataStreamreturn input.transform(opName, resultType, operator).setDescription(opDescription);
}

重载链的设计目的

  • 逐步补充参数:从简单到复杂
  • 提供默认值:PassThroughWindowFunction 作为默认窗口函数
  • 类型推断:自动推断输出类型
  • 函数清理:确保函数可序列化

3.4 PassThroughWindowFunction 的巧妙设计

// 第一层 reduce 方法中的关键一行
return reduce(function, new PassThroughWindowFunction<>());

PassThroughWindowFunction 的作用

// PassThroughWindowFunction 的简化实现
public class PassThroughWindowFunction<T, K, W extends Window>implements WindowFunction<T, T, K, W> {@Overridepublic void apply(K key, W window, Iterable<T> input, Collector<T> out) {// 直接透传,不做任何处理for (T element : input) {out.collect(element);}}
}

为什么需要 PassThroughWindowFunction?

  • 接口统一:WindowOperator 需要 ReduceFunction + WindowFunction 两个函数
  • 透明传递:用户只想要聚合结果,不需要额外处理
  • 适配器模式:将单一的 ReduceFunction 适配为完整的窗口处理流程
用户调用sum
只有ReduceFunction
SumAggregator
自动添加
PassThroughWindowFunction
WindowOperator需要的
完整函数对

五、回到 DataStream 的标准流程

5.1 关键的回归时刻

// WindowedStream 的最后一步 - 回到正轨!
return input.transform(opName, resultType, operator);

这一行代码的重要性

  • inputKeyedStream(继承自 DataStream
  • 调用的是 DataStream.transform() 方法
  • WindowedStream 完成使命,回到标准流程

5.2 transform() 方法的标准处理

// DataStream.java - 标准的 transform 方法
public <R> SingleOutputStreamOperator<R> transform(String operatorName,TypeInformation<R> outTypeInfo,OneInputStreamOperator<T, R> operator) {// 包装算子为工厂return doTransform(operatorName, outTypeInfo, SimpleOperatorFactory.of(operator));
}

5.3 doTransform() 的核心逻辑

protected <R> SingleOutputStreamOperator<R> doTransform(...) {// 1. 创建物理 TransformationOneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(this.transformation,    // 上游:PartitionTransformation (keyBy产生的)operatorName,          // "Window(TumblingEventTimeWindows(5000), EventTimeTrigger, SumAggregator, PassThroughWindowFunction)"operatorFactory,       // SimpleOperatorFactory(WindowOperator)outTypeInfo,          // 输出类型信息environment.getParallelism(),  // 并行度false);               // 不是并行度敏感的// 2. 创建新的 DataStreamSingleOutputStreamOperator<R> returnStream =new SingleOutputStreamOperator<>(environment, resultTransform);// 3. 添加到执行环境 - 重要!getExecutionEnvironment().addOperator(resultTransform);return returnStream;
}

关键步骤解析

  1. 创建物理 Transformation:包含真正的算子
  2. 构建新的 DataStream:恢复正常的流
  3. 注册到环境:只有物理 Transformation 才会被注册

六、调用时序图

在这里插入图片描述

导航链接

上节链接:Flink Stream API 源码走读 - keyBy

下节预告:Flink Stream API 源码走读 - print

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/web/93660.shtml
繁体地址,请注明出处:http://hk.pswp.cn/web/93660.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

蓝牙 GFSK RX Core 架构解析

GFSK RX Core分为以下几个模块&#xff1a; 1.Frequency offset compensation CORDIC 2.A low pass filter 3.A power estimator for packet detection,RSSI and digital gaion computation for DPSK path 4.A demodulator implemented as Phase Shift Discriminator 5.A drequ…

微电网管控系统中python多线程缓存与SQLite多数据库文件连接池实践总结(含源码)

1. 引言 在分散的微电网能源管理场景中,系统采用集中式云平台模式,为100个独立微电网用户提供高并发数据写入服务面临三大挑战:用户数据隔离、I/O性能瓶颈、多线程安全性。本文揭示一种新式的分片锁+三级缓存+sqlite多数据库文件连接池架构,在保持SQLite轻量级优势的同时,…

InfluxDB 开发工具链:IDE 插件与调试技巧(一)

引言 ** 在当今数字化时代&#xff0c;时间序列数据的处理与分析在众多领域中都扮演着至关重要的角色。无论是物联网设备产生的海量传感器数据&#xff0c;还是金融市场中实时波动的交易数据&#xff0c;又或是服务器运维过程中不断产生的性能指标数据&#xff0c;这些都属于…

计算机网络-IPv6

1、IPv6基础IPv4与IPv6的对比&#xff1a;问题IPv4的缺陷IPv6的优势地址空间IPv4地址采用32比特标识&#xff0c;能提供的地址数量是43亿&#xff0c;分配很不均衡。针对IPv4的地址短缺问题&#xff0c;有几种解决方案&#xff1a;无类别域间路由CIDR&#xff08;Classless Int…

整体设计 之“凝聚式中心点”原型 --整除:智能合约和DBMS的深层融合 之2

摘要&#xff08;CSDN的AI助手自动生成的&#xff09;本文提出了一种基于"整除"数学原型的智能合约与DBMS融合架构设计&#xff0c;将SQL查询语句的四个关键段&#xff08;SELECT、FROM、WHERE、BY&#xff09;分别映射到整除运算的四个要素&#xff08;商、被除数、…

【赵渝强老师】TiDB表数据与键值对的映射关系

TiDB实例将表中的每一行数据映射成RocksDB中的键值对&#xff0c;则需要考虑如何构造Key和Value。首先&#xff0c;OLTP场景下有大量针对单行或者多行的增、删、改、查等操作&#xff0c;要求数据库具备快速读取一行数据的能力。因此&#xff0c;对应的Key最好有一个唯一ID&…

带操作系统的延时函数

delay.c:#include "delay.h"/*** brief 微秒级延时* param nus 延时时长&#xff0c;范围&#xff1a;0~233015* retval 无*/ void delay_us(uint32_t nus) {uint32_t ticks;uint32_t tcnt 0, told, tnow;uint32_t reload SysTick->LOAD; //重…

ES Module 和 CommonJS的区别

ES Module&#xff08;ESM&#xff0c;ES6 模块系统&#xff09;和 CommonJS 是 JavaScript 中两种主流的模块规范&#xff0c;分别用于现代前端和 Node.js 环境&#xff08;早期&#xff09;&#xff0c;它们在语法、加载机制、特性等方面有显著区别。以下是详细对比&#xff…

猫头虎AI分享|一款智能量化交易系统:QuantCell,从数据收集到策略执行全流程自动化

猫头虎AI分享&#xff5c;一款智能量化交易系统&#xff1a;QuantCell&#xff0c;从数据收集到策略执行全流程自动化 在当今金融市场中&#xff0c;量化交易系统已经成为越来越多投资者和机构的重要选择。无论是股票、期货还是加密货币&#xff0c;自动化交易与人工智能的结合…

直播美颜SDK架构揭秘:动态贴纸功能的实现原理与性能优化

如今&#xff0c;美颜SDK 已经不再只是“磨皮、美白”的基础工具&#xff0c;而是逐渐进化为一个涵盖 人脸识别、实时特效、动态贴纸交互 的复杂技术体系。尤其是 动态贴纸功能 的加入&#xff0c;让主播与观众之间的互动更加生动有趣&#xff0c;也成为提升用户粘性与平台差异…

Docker安装CDC

Docker安装CDC拉取镜像离线形式安装上传文件并创建docker-compose.yml把镜像加载到docker中启动容器连接数据库创建账号&#xff0c;并给账号授权设置wal_level确认wal_level的值创建链接查询连接状态使用kafdrop消息中看不到修改之前的信息怎么办补充拉取镜像 docker pull co…

如何在win服务器中部署若依项目

一、安装jdk的环境&#xff1a; 这一步很简单&#xff0c;直接拿到安装包双击安装即可。 二、配置jdk的环境变量默认安装的路径为&#xff1a;C:\Program Files (x86)\Java\jdk1.7.0_51安装完成之后进行环境变量配置右击计算机&#xff08;此电脑&#xff09;点击属性点击高级系…

CSS从入门到精通完整指南

第一部分&#xff1a;CSS基础入门1.1 什么是CSSCSS&#xff08;层叠样式表&#xff0c;Cascading Style Sheets&#xff09;是用于描述HTML文档外观和格式的样式语言。CSS将内容与表现分离&#xff0c;让HTML专注于内容结构&#xff0c;CSS专注于视觉效果。1.2 CSS语法结构选择…

重温k8s基础概念知识系列二(Pod)

文章目录1、Pod概念2、K8s 中的 Pod 的两种用法3、定义Pod4、Pod的创建资源5、Pod 模板6、容器探针7、总结干货8、 K8s Pod 经典面试题速查表Pod是Kubernetes中最小的单元&#xff1a; 1、Pod概念 Pod 是可以在 Kubernetes中创建和管理的、最小的可部署的计算单元。它由一组、一…

设计模式之静态代理

一些个人理解 顾名思义&#xff0c;就是代理一个对象。 那么&#xff0c;既然要代理一个东西&#xff0c;就要传入它吧? 【1】所以将代理对象当作属性【【2】往往通过构造方法传入被代理的目标对象】。 既然要代理&#xff0c;那必然要和代理对象拥有相同的功能吧? 所以实现了…

牛津大学xDeepMind 自然语言处理(1)

牛津大学xDeepMind 自然语言处理 Natural Language Processing 词向量与词汇语义学 Word Vectors and Lexical Semantics 词语表示的基本问题与分布语义思想 传统词语表示&#xff08;如独热向量&#xff09;存在稀疏、正交、语义弱的问题&#xff0c;无法表达语义相似性。分布…

StarRocks数据库集群的完整部署流程

目录 依赖环境 下载安装包 部署FE 部署BE 搭建集群 停止集群 依赖环境 详见&#xff1a;StarRocks 部署&#xff1a;依赖环境-CSDN博客 下载安装包 在官方网站下载安装包&#xff1a;StarRocks 部署FE 创建元数据目录。 mkdir -p <meta_dir> 修改 FE 配置文件 f…

简单的 VSCode 设置

以下是我使用的vscode设置。虽然有些主观&#xff0c;但很实用。1 主题。我放弃了那些炫酷的主题。我选择了Tokyo Night (Storm)。理由是&#xff1a;它平静、赏心悦目&#xff0c;并且与代码形成了美丽的对比&#xff0c;却又不显得刺眼。2. 字体。我切换到了 JetBrains Mono …

Rust 条件语句

Rust 条件语句 在编程语言中&#xff0c;条件语句是程序流程控制的重要组成部分。Rust 作为一种系统编程语言&#xff0c;其条件语句的设计简洁而强大。本文将详细介绍 Rust 中的条件语句&#xff0c;包括其语法、用法以及一些高级特性。 1. 基本条件语句 Rust 中的基本条件语句…

【Java EE进阶 --- SpringBoot】初识Spring(创建SpringBoot项目)

乐观学习&#xff0c;乐观生活&#xff0c;才能不断前进啊&#xff01;&#xff01;&#xff01; 我的主页&#xff1a;optimistic_chen 我的专栏&#xff1a;c语言 &#xff0c;Java, Java EE初阶&#xff0c; Java数据结构 欢迎大家访问~ 创作不易&#xff0c;大佬们点赞鼓励…