Flink Stream API 源码走读 - socketTextStream

概述

本文深入分析了 Flink 中 socketTextStream() 方法的源码实现,从用户API调用到最终返回 DataStream 的完整流程。

核心知识点

1. socketTextStream 方法重载链

// 用户调用入口
env.socketTextStream("hostname", 9999)↓ 补充分隔符参数
env.socketTextStream("hostname", 9999, "\n") ↓ 补充重试次数参数
env.socketTextStream("hostname", 9999, "\n", 0)↓ 创建 SocketTextStreamFunction
addSource(new SocketTextStreamFunction(hostname, port, delimiter, maxRetry), "Socket Stream")

重载过程分析:

  • 第一层:补充分隔符参数(默认 “\n”)
  • 第二层:补充重试次数参数(默认 0)
  • 最终:创建 SocketTextStreamFunction 并调用 addSource

2. SourceFunction 的重要说明

@Deprecated
public class SocketTextStreamFunction implements SourceFunction<String>

⚠️ 重要提醒:

  • SourceFunction 已被标记为 @Deprecated(过时)
  • 官方建议使用新的 Source API
  • 基于 SourceFunction 的架构是老架构
  • 新架构基于 org.apache.flink.api.connector.source.Source

3. addSource 方法的重载链

addSource(function, sourceName)
addSource(function, sourceName, null)
addSource(function, sourceName, typeInfo, CONTINUOUS_UNBOUNDED)
核心处理逻辑

参数补充过程:

  1. addSource(function, "Socket Stream")
  2. addSource(function, "Socket Stream", null) - 补充 TypeInformation 为 null
  3. addSource(function, "Socket Stream", null, CONTINUOUS_UNBOUNDED) - 补充有界性

4. 核心处理逻辑分析

private <OUT> DataStreamSource<OUT> addSource(final SourceFunction<OUT> function,final String sourceName,@Nullable final TypeInformation<OUT> typeInfo,final Boundedness boundedness) {// 1. 非空检查checkNotNull(function);checkNotNull(sourceName);checkNotNull(boundedness);// 2. 抽取类型信息TypeInformation<OUT> resolvedTypeInfo = getTypeInfo(function, sourceName, SourceFunction.class, typeInfo);// 3. 判断是否并行boolean isParallel = function instanceof ParallelSourceFunction;// 4. 序列化检查clean(function);// 5. Function → Operatorfinal StreamSource<OUT, ?> sourceOperator = new StreamSource<>(function);// 6. 返回 DataStreamSourcereturn new DataStreamSource<>(this, resolvedTypeInfo, sourceOperator, isParallel, sourceName, boundedness);
}

5. 四个核心概念的转换

Function
用户逻辑
Operator
算子封装
Transformation
转换操作
DataStream
用户API

概念解释:

  1. Function: 用户的业务逻辑封装

    • SocketTextStreamFunction - Socket连接和数据读取逻辑
    • 继承自 SourceFunction<String>
  2. Operator: 算子的抽象

    • StreamSource<OUT, ?> - 将Function包装成算子
    • 继承自 AbstractUdfStreamOperator
  3. Transformation: 转换操作的封装

    • LegacySourceTransformation - 包装Operator和相关元信息
    • 包含类型信息、并行度、有界性等
  4. DataStream: 面向用户的流式API

    • DataStreamSource - 继承自 DataStream
    • 支持链式调用(map、filter、keyBy等)

6. 重要参数说明

TypeInformation(类型信息)
// 为什么需要 TypeInformation?
// Java 泛型在编译后会被类型擦除,Flink需要显式的类型信息来:
// 1. 创建序列化器/反序列化器
// 2. 根据不同类型产生不同的序列化机制
TypeInformation<OUT> resolvedTypeInfo = getTypeInfo(function, sourceName, SourceFunction.class, typeInfo);
Boundedness(有界性)
// CONTINUOUS_UNBOUNDED 表示无界流
// 在翻译成物理执行计划时会用到这个信息
// 有界流和无界流会生成不同的执行计划
Boundedness.CONTINUOUS_UNBOUNDED
并行性检查
// 检查是否为并行源函数
boolean isParallel = function instanceof ParallelSourceFunction;
// SocketTextStreamFunction 不是 ParallelSourceFunction,所以 isParallel = false

7. DataStreamSource 的构造

public DataStreamSource(StreamExecutionEnvironment environment,TypeInformation<T> outTypeInfo, StreamSource<T, ?> operator,boolean isParallel,String sourceName,Boundedness boundedness) {// 调用父类构造,创建 LegacySourceTransformationsuper(environment, new LegacySourceTransformation<>(sourceName, operator, outTypeInfo, environment.getParallelism(), boundedness));// 如果不是并行的,设置并行度为1if (!isParallel) {setParallelism(1);}
}

8. 继承关系分析

DataStream<T>
SingleOutputStreamOperator<T>
DataStreamSource<T>
包含所有流式API
map, filter, keyBy, window等

重要理解:

  • DataStreamSource 本质上就是一个 DataStream
  • 所有的链式调用API都定义在 DataStream
  • SingleOutputStreamOperator 这个命名容易误导,它实际上是个 DataStream

9. DataStream 的内部结构

public class DataStream<T> {// 两个最重要的成员protected final StreamExecutionEnvironment environment;  // 执行环境protected final Transformation<T> transformation;        // 转换操作
}

关系链:

  • DataStream 包含 Transformation
  • Transformation 包含 Operator
  • Operator 包含 Function

10. 链式调用的实现

DataStream<String> stream = env.socketTextStream("localhost", 9999).map(...)           // 返回 SingleOutputStreamOperator (实际是DataStream).filter(...)        // 返回 SingleOutputStreamOperator  .keyBy(...)         // 返回 KeyedStream.window(...)        // 返回 WindowedStream.sum(...)           // 返回 SingleOutputStreamOperator.print();          // 返回 DataStreamSink

流程:
DataStreamSource → 各种变换 → DataStreamSink

总结

核心流程回顾

  1. 用户调用 env.socketTextStream(hostname, port)
  2. 参数补全 通过重载方法逐步补充参数
  3. Function创建 创建 SocketTextStreamFunction
  4. addSource调用 进入核心处理逻辑
  5. 类型推断 抽取输出数据的类型信息
  6. 并行性检查 判断是否为并行源函数
  7. Function→Operator 封装成 StreamSource
  8. Operator→Transformation 创建 LegacySourceTransformation
  9. 返回DataStream 创建 DataStreamSource

设计模式体现

  • 装饰器模式: Function → Operator → Transformation → DataStream
  • 建造者模式: 通过重载方法逐步构建完整对象
  • 模板方法模式: addSource的处理流程

关键技术点

  • 类型擦除处理: 通过 TypeInformation 解决Java泛型擦除问题
  • 序列化机制: 根据类型信息创建对应的序列化器
  • 并行度控制: 非并行源强制设置并行度为1
  • 有界性标识: 为后续执行计划生成提供信息

下节预告

Flink Stream API 源码走读 map和 flatmap


注意: 基于 Flink 1.18 版本,SourceFunction 已被标记为过时,实际项目中建议使用新的 Source API。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/bicheng/93240.shtml
繁体地址,请注明出处:http://hk.pswp.cn/bicheng/93240.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

待办事项小程序开发

1. 项目规划功能需求&#xff1a;添加待办事项标记完成/未完成删除待办事项分类或标签管理&#xff08;可选&#xff09;数据持久化&#xff08;本地存储&#xff09;2. 实现功能添加待办事项&#xff1a;监听输入框和按钮事件&#xff0c;将输入内容添加到列表。 标记完成/未完…

【C#】Region、Exclude的用法

在 C# 中&#xff0c;Region 和 Exclude 是与图形编程相关的概念&#xff0c;通常在使用 System.Drawing 命名空间进行 GDI 绘图时出现。它们主要用于定义和操作二维空间中的区域&#xff08;几何区域&#xff09;&#xff0c;常用于窗体裁剪、控件重绘、图形绘制优化等场景。 …

机器学习 - Kaggle项目实践(3)Digit Recognizer 手写数字识别

Digit Recognizer | Kaggle 题面 Digit Recognizer-CNN | Kaggle 下面代码的kaggle版本 使用CNN进行手写数字识别 学习到了网络搭建手法学习率退火数据增广 提高训练效果。 使用混淆矩阵 以及对分类出错概率最大的例子单独拎出来分析。 最终以99.546%正确率 排在 86/1035 …

新手如何高效运营亚马逊跨境电商:从传统SP广告到DeepBI智能策略

"为什么我的广告点击量很高但订单转化率却很低&#xff1f;""如何避免新品期广告预算被大词消耗殆尽&#xff1f;""为什么手动调整关键词和出价总是慢市场半拍&#xff1f;""竞品ASIN投放到底该怎么做才有效&#xff1f;""有没有…

【论文阅读 | CVPR 2024 | UniRGB-IR:通过适配器调优实现可见光-红外语义任务的统一框架】

论文阅读 | CVPR 2024 | UniRGB-IR&#xff1a;通过适配器调优实现可见光-红外语义任务的统一框架​1&&2. 摘要&&引言3.方法3.1 整体架构3.2 多模态特征池3.3 补充特征注入器3.4 适配器调优范式4 实验4.1 RGB-IR 目标检测4.2 RGB-IR 语义分割4.3 RGB-IR 显著目…

Hyperf 百度翻译接口实现方案

保留 HTML/XML 标签结构&#xff0c;仅翻译文本内容&#xff0c;避免破坏富文本格式。采用「HTML 解析 → 文本提取 → 批量翻译 → 回填」的流程。百度翻译集成方案&#xff1a;富文本内容翻译系统 HTML 解析 百度翻译 API 集成 文件结构 app/ ├── Controller/ │ └──…

字节跳动 VeOmni 框架开源:统一多模态训练效率飞跃!

资料来源&#xff1a;火山引擎-开发者社区 多模态时代的训练痛点&#xff0c;终于有了“特效药” 当大模型从单一语言向文本 图像 视频的多模态进化时&#xff0c;算法工程师们的训练流程却陷入了 “碎片化困境”&#xff1a; 当业务要同时迭代 DiT、LLM 与 VLM时&#xff0…

配置docker pull走http代理

之前写了一篇自建Docker镜像加速器服务的博客&#xff0c;需要用到境外服务器作为代理&#xff0c;但是一般可能没有境外服务器&#xff0c;只有http代理&#xff0c;所以如果本地使用想走代理可以用以下方式 临时生效&#xff08;只对当前终端有效&#xff09; 设置环境变量…

OpenAI 开源模型 gpt-oss 本地部署详细教程

OpenAI 最近发布了其首个开源的开放权重模型gpt-oss&#xff0c;这在AI圈引起了巨大的轰动。对于广大开发者和AI爱好者来说&#xff0c;这意味着我们终于可以在自己的机器上&#xff0c;完全本地化地运行和探索这款强大的模型了。 本教程将一步一步指导你如何在Windows和Linux…

力扣-5.最长回文子串

题目链接 5.最长回文子串 class Solution {public String longestPalindrome(String s) {boolean[][] dp new boolean[s.length()][s.length()];int maxLen 0;String str s.substring(0, 1);for (int i 0; i < s.length(); i) {dp[i][i] true;}for (int len 2; len …

Apache Ignite超时管理核心组件解析

这是一个非常关键且设计精巧的 定时任务与超时管理组件 —— GridTimeoutProcessor&#xff0c;它是 Apache Ignite 内核中负责 统一调度和处理所有异步超时事件的核心模块。&#x1f3af; 一、核心职责统一管理所有需要“在某个时间点触发”的任务或超时逻辑。它相当于 Ignite…

DAY 42 Grad-CAM与Hook函数

知识点回顾回调函数lambda函数hook函数的模块钩子和张量钩子Grad-CAM的示例# 定义一个存储梯度的列表 conv_gradients []# 定义反向钩子函数 def backward_hook(module, grad_input, grad_output):# 模块&#xff1a;当前应用钩子的模块# grad_input&#xff1a;模块输入的梯度…

基于 NVIDIA 生态的 Dynamo 风格分布式 LLM 推理架构

网罗开发&#xff08;小红书、快手、视频号同名&#xff09;大家好&#xff0c;我是 展菲&#xff0c;目前在上市企业从事人工智能项目研发管理工作&#xff0c;平时热衷于分享各种编程领域的软硬技能知识以及前沿技术&#xff0c;包括iOS、前端、Harmony OS、Java、Python等方…

《吃透 C++ 类和对象(中):拷贝构造函数与赋值运算符重载深度解析》

&#x1f525;个人主页&#xff1a;草莓熊Lotso &#x1f3ac;作者简介&#xff1a;C研发方向学习者 &#x1f4d6;个人专栏&#xff1a; 《C语言》 《数据结构与算法》《C语言刷题集》《Leetcode刷题指南》 ⭐️人生格言&#xff1a;生活是默默的坚持&#xff0c;毅力是永久的…

Python 环境隔离实战:venv、virtualenv 与 conda 的差异与最佳实践

那天把项目部署到测试环境&#xff0c;结果依赖冲突把服务拉崩了——本地能跑&#xff0c;线上不能跑。折腾半天才发现&#xff1a;我和同事用的不是同一套 site-packages&#xff0c;版本差异导致运行时异常。那一刻我彻底明白&#xff1a;虚拟环境不是可选项&#xff0c;它是…

[ 数据结构 ] 时间和空间复杂度

1.算法效率算法效率分析分为两种 : ①时间效率, ②空间效率 时间效率即为 时间复杂度 , 时间复杂度主要衡量一个算法的运行速度空间效率即为 空间复杂度 , 空间复杂度主要衡量一个算法所需要的额外空间2.时间复杂度2.1 时间复杂度的概念定义 : 再计算机科学中 , 算法的时间复杂…

一,设计模式-单例模式

目的设计单例模式的目的是为了解决两个问题&#xff1a;保证一个类只有一个实例这种需求是需要控制某些资源的共享权限&#xff0c;比如文件资源、数据库资源。为该实例提供一个全局访问节点相较于通过全局变量保存重要的共享对象&#xff0c;通过一个封装的类对象&#xff0c;…

AIStarter修复macOS 15兼容问题:跨平台AI项目管理新体验

AIStarter是全网唯一支持Windows、Mac和Linux的AI管理平台&#xff0c;为开发者提供便捷的AI项目管理体验。近期&#xff0c;熊哥在视频中分享了针对macOS 15系统无法打开AIStarter的修复方案&#xff0c;最新版已完美兼容。本文基于视频内容&#xff0c;详解修复细节与使用技巧…

LabVIEW 纺织检测数据传递

基于 LabVIEW 实现纺织检测系统中上位机&#xff08;PC 机&#xff09;与下位机&#xff08;单片机&#xff09;的串口数据传递&#xff0c;成功应用于煮茧机温度测量系统。通过采用特定硬件架构与软件设计&#xff0c;实现了温度数据的高效采集、传输与分析&#xff0c;操作简…

ECCV-2018《Variational Wasserstein Clustering》

核心思想 该论文提出了一个基于最优传输(optimal transportation) 理论的新型聚类方法&#xff0c;称为变分Wasserstein聚类(Variational Wasserstein Clustering, VWC)。其核心思想有三点&#xff1a;建立最优传输与k-means聚类的联系&#xff1a;作者指出k-means聚类问题本质…