深入解析Flink Local模式启动流程源码:揭开作业初始化的神秘面纱

在Flink的数据处理体系中,Local模式凭借无需依赖分布式集群资源的特性,成为开发测试阶段快速验证作业逻辑的利器。其启动流程的源码里,藏着从作业提交到任务执行的完整脉络。接下来,我们将深入关键代码段,逐行剖析Flink Local模式启动的底层逻辑。

一、Local模式启动流程概述

Flink Local模式允许作业在本地环境运行,整个启动流程涵盖作业提交、环境初始化、任务调度等环节。ExecutionEnvironmentJobClientLocalExecutor等核心组件紧密协作,使得作业能从代码转化为实际运行的任务,下面我们从源码角度展开详细分析。

二、核心启动流程源码深度解析

2.1 作业提交与环境初始化触发

用户通过ExecutionEnvironment.execute()提交作业时,启动流程正式开启。以StreamExecutionEnvironment为例,来看关键源码:

public JobExecutionResult execute(String jobName) throws Exception {setJobName(jobName);// 根据用户编写的DataStream API代码生成StreamGraph,定义作业数据处理逻辑final StreamGraph streamGraph = getStreamGraph(); return execute(streamGraph);
}public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {// 创建执行器,不同模式下执行器不同,Local模式对应特殊实现final Executor executor = createExecutor(); return executor.execute(streamGraph);
}

getStreamGraph()方法中,会遍历用户定义的转换操作(Transformation),构建StreamGraph。如在构建Source节点时:

// SourceTransformationTranslator中translateInternal方法片段
public Collection<Integer> translateInternal(final SourceTransformation<OUT, SplitT, EnumChkT> transformation,final Context context,boolean emitProgressiveWatermarks) {// 获取关键信息final StreamGraph streamGraph = context.getStreamGraph();final String slotSharingGroup = context.getSlotSharingGroup();final int transformationId = transformation.getId();final ExecutionConfig executionConfig = streamGraph.getExecutionConfig();// 创建SourceOperatorFactory,用于实例化Source算子SourceOperatorFactory<OUT> operatorFactory =new SourceOperatorFactory<>(transformation.getSource(),transformation.getWatermarkStrategy(),emitProgressiveWatermarks);// 将Source节点添加到StreamGraphstreamGraph.addSource( transformationId,slotSharingGroup,transformation.getCoLocationGroupKey(),operatorFactory,null,transformation.getOutputType(),"Source: " + transformation.getName());// 设置并行度等参数final int parallelism =transformation.getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT?transformation.getParallelism() : executionConfig.getParallelism();streamGraph.setParallelism(transformationId, parallelism);streamGraph.setMaxParallelism(transformationId, transformation.getMaxParallelism());return Collections.singleton(transformationId);
}

上述代码展示了Source节点如何被添加到StreamGraph,为后续任务执行奠定基础。

2.2 JobClient创建与任务提交实现

环境初始化完成后,进入JobClient创建阶段。JobClientJobClientFactory创建,在Local模式下为LocalJobClient

// JobClientFactory的createJobClient方法
public static JobClient createJobClient(JobGraph jobGraph,Configuration configuration
) throws IOException {final String jobManagerAddress;final int jobManagerPort;// Local模式下特殊处理if (configuration.getBoolean(ConfigConstants.LOCAL_START_WEBSERVER, false)) { jobManagerAddress = "localhost";jobManagerPort = configuration.getInteger(ConfigConstants.JOB_MANAGER_RPC_PORT, -1);} else {// 非Local模式逻辑jobManagerAddress = configuration.getString(ConfigConstants.JOB_MANAGER_RPC_ADDRESS_KEY);jobManagerPort = configuration.getInteger(ConfigConstants.JOB_MANAGER_RPC_PORT, -1);}final RpcService rpcService = createRpcService(configuration);final RpcGatewayTarget jobManagerGatewayTarget =new RpcGatewayTarget(jobManagerAddress, jobManagerPort, "jobmanager");// 根据不同模式创建对应的JobClientif (LocalExecutor.isLocalExecution(configuration)) { return new LocalJobClient(jobGraph, rpcService, jobManagerGatewayTarget);} else {return new StandaloneJobClient(jobGraph, rpcService, jobManagerGatewayTarget);}
}

LocalJobClient创建后,会将JobGraph提交给LocalExecutor,提交前会对JobGraph进行校验,确保任务定义无误:

// LocalJobClient的submitJob方法
public CompletableFuture<JobExecutionResult> submitJob() {final CompletableFuture<JobExecutionResult> jobResultFuture = new CompletableFuture<>();try {// 校验JobGraphPreconditions.checkState(jobGraph != null, "JobGraph must not be null."); // 将JobGraph提交给LocalExecutorfinal CompletableFuture<JobExecutionResult> executionFuture =localExecutor.execute(jobGraph, jobResultFuture::complete, jobResultFuture::completeExceptionally); return executionFuture;} catch (Exception e) {jobResultFuture.completeExceptionally(e);return jobResultFuture;}
}

2.3 LocalExecutor任务执行逻辑

LocalExecutor是Local模式任务执行核心。它先根据JobGraph创建TaskExecutorRunner

// LocalExecutor的execute方法片段
public CompletableFuture<JobExecutionResult> execute(JobGraph jobGraph,Consumer<JobExecutionResult> resultConsumer,Consumer<Throwable> failureConsumer
) {final CompletableFuture<JobExecutionResult> jobResultFuture = new CompletableFuture<>();try {// 解析JobGraph获取任务相关配置final TaskExecutorConfiguration taskExecutorConfiguration =TaskExecutorConfiguration.fromConfigurationAndJobGraph(configuration, jobGraph); // 创建TaskExecutorFactoryfinal TaskExecutorFactory taskExecutorFactory = new TaskExecutorFactory(taskExecutorConfiguration); // 创建TaskExecutorRunnerfinal TaskExecutorRunner taskExecutorRunner =new TaskExecutorRunner(taskExecutorConfiguration, taskExecutorFactory); // 启动TaskExecutorRunnertaskExecutorRunner.start(); // 提交任务到TaskExecutorRunner执行final CompletableFuture<JobExecutionResult> executionFuture =taskExecutorRunner.submitJob(jobGraph, resultConsumer, failureConsumer); return executionFuture;} catch (Exception e) {jobResultFuture.completeExceptionally(e);return jobResultFuture;}
}

TaskExecutorRunner启动时,会为任务分配资源、初始化执行环境。以内存分配为例:

// TaskExecutorRunner的start方法片段
public void start() throws Exception {// 分配内存资源,根据任务配置计算所需内存final MemorySize taskHeapMemory = taskExecutorConfiguration.getTaskHeapMemorySize(); final MemorySize taskOffHeapMemory = taskExecutorConfiguration.getTaskOffHeapMemorySize(); // 初始化内存相关环境memoryManager = MemoryManagerFactory.createMemoryManager(taskHeapMemory,taskOffHeapMemory,configuration); // 其他资源初始化及环境准备操作//...
}

2.4 任务初始化与运行机制

任务线程中,会加载StreamOperator并初始化。以MapOperator为例:

// MapOperator的初始化方法片段
public void open() throws Exception {super.open();// 初始化用户定义的MapFunctionuserFunction = userFunctionSerializer.deserialize(new DeserializationContext.GetInitialContext()); // 初始化输入输出相关资源inputSerializer.open();outputSerializer.open();
}// MapOperator的处理数据方法
public void processElement(StreamRecord<IN> element) throws Exception {// 从输入获取数据IN input = element.getValue();// 执行用户定义的映射函数OUT output = userFunction.map(input); // 将处理后的数据发送到输出outputCollector.collect(new StreamRecord<>(output, element.getTimestamp())); 
}

数据在各个任务间通过StreamEdge流转,完成整个作业处理。

三、关键组件与技术细节源码剖析

3.1 资源管理与分配策略

LocalExecutor依据JobGraph为任务分配资源。在分配线程资源时:

// LocalExecutor中任务线程创建相关逻辑
private void createTaskThreads(JobGraph jobGraph) {for (JobVertex jobVertex : jobGraph.getVertices()) {final int parallelism = jobVertex.getParallelism();for (int i = 0; i < parallelism; i++) {// 创建任务线程final TaskThread taskThread = new TaskThread(jobVertex, i); taskThreads.add(taskThread);// 启动任务线程taskThread.start(); }}
}

同时,通过线程池管理避免资源耗尽:

// 线程池相关定义与使用
private final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize,maximumPoolSize,keepAliveTime,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>()
);// 提交任务到线程池执行
public void submitTask(Runnable task) {threadPoolExecutor.submit(task);
}

3.2 任务调度与协调机制

LocalExecutorJobGraph任务依赖调度。调度逻辑如下:

// LocalExecutor的任务调度方法
private void scheduleTasks(JobGraph jobGraph) {final Set<JobVertex> readyVertices = new HashSet<>();for (JobVertex jobVertex : jobGraph.getVertices()) {// 检查任务是否没有上游依赖if (jobVertex.getInEdges().isEmpty()) { readyVertices.add(jobVertex);}}while (!readyVertices.isEmpty()) {final JobVertex jobVertex = readyVertices.iterator().next();readyVertices.remove(jobVertex);// 启动任务startTask(jobVertex); for (JobEdge jobEdge : jobVertex.getOutEdges()) {final JobVertex targetVertex = jobEdge.getTarget();boolean allPredecessorsCompleted = true;for (JobEdge inEdge : targetVertex.getInEdges()) {if (!inEdge.getSource().isFinished()) {allPredecessorsCompleted = false;break;}}if (allPredecessorsCompleted) {readyVertices.add(targetVertex);}}}
}

3.3 错误处理与恢复机制

任务提交阶段,JobGraph校验失败时:

// JobGraph校验方法片段
public void validate() throws InvalidJobException {for (JobVertex jobVertex : getVertices()) {if (jobVertex.getParallelism() <= 0) {throw new InvalidJobException("Job vertex " + jobVertex + " has invalid parallelism.");}// 其他校验逻辑//...}
}

任务执行中出现异常,TaskExecutorRunner处理如下:

// TaskExecutorRunner的任务执行异常处理
public void run() {try {// 任务执行逻辑//...} catch (Exception e) {// 上报异常exceptionHandler.handleException(e); // 根据异常类型处理,可恢复则尝试重启任务if (isRecoverableException(e)) { restartTask();} else {// 不可恢复则终止作业terminateJob(e); }}
}

四、Local模式启动流程的实践意义与优化方向

深入研究Flink Local模式启动流程源码,开发者在开发测试时,可通过断点调试StreamExecutionEnvironment初始化、LocalExecutor任务调度等关键代码,快速定位问题。优化资源分配策略时,参考LocalExecutor中资源分配源码,根据任务实际需求动态调整分配逻辑。同时,对比分布式模式启动流程源码,借鉴其动态资源调度思路,可进一步完善Local模式性能。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/web/86161.shtml
繁体地址,请注明出处:http://hk.pswp.cn/web/86161.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

二刷 苍穹外卖 day06

HttpClient 用来提供高效的、最新的、功能丰富的支持HTTP协议的客户端编程工具包 作用&#xff1a; 发送HTTP请求 接受响应数据 应用场景&#xff1a; 当我们在使用扫描支付、查看地图、获取验证码、查看天气等功能时 其实&#xff0c;应用程序本身并未实现这些功能&#xff…

React第六十三节Router中BrowserRouter的用途及注意事项

前言 BrowserRouter 是 React Router 库的核心组件&#xff0c;用于实现单页面应用&#xff08;SPA&#xff09;的客户端路由。它利用 HTML5 History API 管理 URL&#xff0c;实现页面无刷新跳转。下面详细解释其用途、使用方法和代码示例&#xff1a; 一、BrowserRouter 核…

《Self-Adapting Language Models》(SEAL)代码阅读笔记

代码&#xff1a;https://github.com/Continual-Intelligence 脚本命令用法&#xff1a;knowledge-incorporation/README.md 生成self-edit数据 脚本&#xff1a;sbatch knowledge-incorporation/scripts/make_squad_data.sh vllm serve启动Qwen2.5-7B模型的服务。 执行self-e…

GelSight Mini视触觉传感器开发资源升级:触觉3D点云+ROS2助力机器人科研与医疗等应用

近日&#xff0c;GelSight宣布对其GelSight Mini视触觉传感器的GitHub支持页面进行重大更新&#xff0c;围绕3D点云重建、ROS2 集成及开发者支持体系推出三大核心升级&#xff0c;助力机器人触觉感知、工业检测及科研场景落地。 GelSight Mini视触觉传感器重磅发布&#xff01;…

6、做中学 | 三年级下期 Golang值类型相互转换

本次为操作文章&#xff0c;大部分都在讨论类型之间如何转换&#xff0c;使用的是内置方法进行调用执行&#xff0c;详细使用请移步至&#xff1a; go的API使用文档地址 https://studygolang.com/pkgdoc 一、数值类型相互转换 go中数值转换需要显示转换&#xff0c;不能隐式自…

019 高校心理教育辅导系统技术解析:构建心理健康守护平台

高校心理教育辅导系统技术解析&#xff1a;构建心理健康守护平台 在关注大学生心理健康成为教育重点的当下&#xff0c;高校心理教育辅导系统借助数字化技术整合多种功能模块&#xff0c;面向管理员、学生、教师三类角色&#xff0c;实现心理教育辅导工作的高效化与精准化。本…

【ArcGIS】土地资源单项评价

【ArcGIS】土地资源单项评价 一、土地资源单项评价1、评价思路 二、操作步骤1、处理环境设置2、地形坡度评价3、高程评价4、坡度高程叠加评价5、地形起伏度6、土地资源综合评价 一、土地资源单项评价 1、评价思路 &#xff08;1&#xff09;利用全域DEM计算地形坡度&#xff…

Prioritized Generative Replay

ICLR 2025 Oral code 具有样本效率的 online reinforcement learning (RL) 通常使用 replay buffer 存储经验&#xff0c;以便在更新价值函数时重复使用。然而&#xff0c;uniform replay 效率低下&#xff0c;因为某些类型的 transition 可能与学习更相关。 虽然对更有用的样本…

Linux -- 线程、锁

1、 Linux线程概念 1.1、什么是线程 在一个程序里的一个执行路线就叫做线程&#xff08;thread&#xff09;更准确的定义是&#xff1a;线程是“一个进程内部的控制序列”一切进程至少都有一个执行线程线程在进程内部运行&#xff0c;本质是在进程地址空间内运行在Linux系统中…

海外服务器的定义和作用都有哪些?

海外服务器可以说是一个统称&#xff0c;其中包含了全球各地除了中国大陆以外其他国家的服务器&#xff0c;在如今的数字化时代中&#xff0c;海外服务器的应用已经成为跨国企业业务拓展、科研与学术交流等多个领域中不可或缺的一部分&#xff0c;能够为各个行业提供更加稳定且…

数据结构之优先级队列

系列文章目录 数据结构之ArrayList_arraylist o(1) o(n)-CSDN博客 数据结构之LinkedList-CSDN博客 数据结构之栈-CSDN博客 数据结构之队列-CSDN博客 数据结构之二叉树-CSDN博客 目录 系列文章目录 前言 一、优先级队列和堆 二、堆的模拟实现 1. 堆的创建 2. 计算建堆…

【版本控制教程】如何使用Unreal Engine 5 + UE源代码控制(Perforce P4)

本文来源perforce.com&#xff0c;由Perforce中国授权合作伙伴——龙智翻译整理&#xff0c;旨在为国内用户提供一份实用、易懂的Unreal Engine 5Perforce P4的中文使用指南。希望能为UE开发者、设计师和美术小伙伴们的版本控制实践提供有力支持~ Unreal Engine 5 是一款尖端的…

opensingleComDialog方法解析优化

下面是对 opensingleComDialog 方法的详细解析&#xff0c;并给出优化建议和优化后的代码。 方法解析 作用 opensingleComDialog(index) 方法用于在输入框失去焦点时&#xff08;blur 事件&#xff09;自动根据输入内容进行唯一性查询&#xff0c;如果查到唯一结果则自动填充…

css 实现1个像素在不同分辨率屏幕上画网格线

实现网格线绘制&#xff0c;要考虑画布style尺寸和画布像素大小的缩放关系 单像素绘制主要出现的问题是会模糊&#xff0c;从像素角度看就是出现绘制两个像素&#xff0c;实际就是要做偏移 核心就是&#xff1a;按物理像素绘制&#xff0c;首先要对齐物理像素&#xff0c;计算…

深度图聚类DGC—Paper Notes

目录 Unsupervised Deep Embedding for Clustering Analysis (DEC 2016)Attributed Graph Clustering: A Deep Attentional Embedding Approach (DAEGC 2019)Structural Deep Clustering Network (SDCN 2020)Contrastive Multi-View Representation Learning on Graphs (MVG…

获取YARN application 应用列表的几种方法

目录 1. 使用YARN命令行工具 2. 通过REST API获取 YARN 提供了获取YARN集群上运行的应用列表,以下是几种常见方法: 1. 使用YARN命令行工具 最直接的方式是使用YARN提供的命令行工具: yarn application -list 上述命令会显示所有正在运行的应用。 如果要查看所有应用(…

前端如何下载 ‘Content-Type‘: ‘application/octet-stream‘ 的文件

前言 在前端开发中&#xff0c;经常会遇到需要从后端接口下载文件的需求。当后端返回的响应头中 Content-Type 为 application/octet-stream 时&#xff0c;表示这是一个二进制流文件&#xff0c;浏览器无法直接展示&#xff0c;需要前端处理后下载到本地。本文将详细介绍前端…

咨询顾问进阶——顾问公司战略咨询分析模板【附全文阅读】

该战略咨询分析模板围绕企业战略分析展开&#xff0c;先从总体思考战略分析的目的与方法&#xff0c;接着探讨企业及战略定义、战略地位等。外部环境分析通过 PEST、五种竞争力等模型&#xff0c;分析环境、行业、市场等情况以发现机会与威胁&#xff1b;内部环境分析从资源、核…

宝塔服务器调优工具 1.1(Opcache优化)

第一步&#xff1a;宝塔服务器调优工具 1.1&#xff08;按照下面的参数填写&#xff09; 第二步&#xff1a;路径/www/server/php/80/etc/php.ini 搜索jit jit1235 其中1235根据服务器情况修改 第三步&#xff1a;路径/www/server/php/80/etc/php-cli.ini 搜索 jit1235 其中…

React Native【详解】动画

基础动画的实现流程 使用支持动画的组件 <Animated.Viewstyle{[{opacity: fadeAnim, // 绑定透明度动画值},]}><Text>动画元素</Text></Animated.View>Animated.View&#xff1a;用于创建动画容器&#xff0c;支持所有 View 的属性。Animated.Te…