Spark 运行流程核心组件(一)作业提交

1、Job启动流程

在这里插入图片描述

1、Client触发 SparkContext 初始化

2、SparkContextMaster 注册应用

3、Master 调度 Worker 启动 Executor

4、Worker 进程启动 Executor

5、DAGScheduler 将作业分解为 Stage

6、TaskScheduler 分配 TaskExecutor

2、核心组件

组件职责
SparkContext应用入口,协调各组件,管理应用生命周期。
DAGScheduler将 Job 拆分为 Stage,构建 DAG,提交 TaskSet 给 TaskScheduler。
TaskScheduler调度 Task 到 Executor,处理故障重试。
CoarseGrainedSchedulerBackend与集群管理器交互,申请资源,管理 Executor。
ExternalClusterManager抽象层,适配不同集群(Standalone/YARN/Mesos)。
Master & WorkerStandalone 模式下管理集群资源(Master 分配资源,Worker 启动 Executor)。
Executor在 Worker 上运行,执行 Task,管理内存/磁盘。
CoarseGrainedExecutorBackendExecutor 的通信代理,接收 Task,返回状态/结果。
Task计算单元(ShuffleMapTask / ResultTask)。
ShuffleManager管理 Shuffle 数据读写(如 SortShuffleManager)。

3、工作流程

1、SparkContext

负责资源申请、任务提交、与集群管理器通信。

调用runJob方法,将RDD操作传递给DAGScheduler

2、DAGScheduler

将Job拆分为Stage(DAG),处理Shuffle依赖,提交TaskSet给TaskScheduler。

1、DAGSchedulerEvent

/* 作业生命周期事件 */
JobSubmitted //新作业提交时触发
JobCancelled //单个作业被取消
JobGroupCancelled //作业组整体取消
JobTagCancelled //按标签批量取消作业
AllJobsCancelled //取消所有运行中的作业/* 阶段执行事件 */
MapStageSubmitted //Shuffle Map阶段提交
StageCancelled //单个阶段取消
StageFailed //阶段执行失败
ResubmitFailedStages //自动重试失败阶段 ,默认4次/* 任务调度事件 */
TaskSetFailed //整个任务集失败,默认4次
SpeculativeTaskSubmitted //启动推测执行任务
UnschedulableTaskSetAdded //任务集进入待调度队列
UnschedulableTaskSetRemoved //任务集离开待调度队列/* Shuffle 优化事件 */
RegisterMergeStatuses //注册Shuffle合并状态
ShuffleMergeFinalized //Shuffle合并完成
ShufflePushCompleted //Shuffle数据推送完成/* 资源管理事件 */
ExecutorAdded //新Executor注册成功
ExecutorLost //Executor异常丢失
WorkerRemoved //工作节点移除/* 执行过程事件 */
BeginEvent //任务集开始执行 
GettingResultEvent //驱动程序主动获取任务结果
CompletionEvent //作业/阶段完成

2、stage拆分流程

*ResultStage (执行作的最后一个阶段)、ShuffleMapStage (shuffle映射输出文件)*

  1. 用户行动操作触发submitJob,发送JobSubmitted事件。
  2. handleJobSubmitted处理事件,调用createResultStage创建ResultStage。
  3. createResultStage调用getOrCreateParentStages获取父Stage,父Stage的创建会递归进行。
  4. 在创建父Stage的过程中,遇到宽依赖则创建ShuffleMapStage,并递归创建其父Stage。
  5. 当所有父Stage都创建完成后,回到handleJobSubmitted,调用submitStage提交ResultStage。
  6. submitStage检查父Stage是否完成,如果有未完成的父Stage,则递归提交父Stage;否则,提交当前Stage(调用submitMissingTasks)。
  7. submitMissingTasks为Stage创建任务(ShuffleMapTask或ResultTask),并提交给TaskScheduler执行。

3、宽窄依赖切分

private def stageDependsOn(stage: Stage, target: Stage): Boolean = {if (stage == target) {return true}// DFS遍历RDD依赖树val visitedRdds = new HashSet[RDD[_]]// We are manually maintaining a stack here to prevent StackOverflowError// caused by recursively visitingval waitingForVisit = new ListBuffer[RDD[_]]waitingForVisit += stage.rdddef visit(rdd: RDD[_]): Unit = {if (!visitedRdds(rdd)) {visitedRdds += rddfor (dep <- rdd.dependencies) {dep match {// 宽依赖:创建新的ShuffleMapStagecase shufDep: ShuffleDependency[_, _, _] =>val mapStage = getOrCreateShuffleMapStage(shufDep, stage.firstJobId)if (!mapStage.isAvailable) {waitingForVisit.prepend(mapStage.rdd)}  // Otherwise there's no need to follow the dependency back// 窄依赖:继续回溯case narrowDep: NarrowDependency[_] =>waitingForVisit.prepend(narrowDep.rdd)}}}}while (waitingForVisit.nonEmpty) {visit(waitingForVisit.remove(0))}visitedRdds.contains(target.rdd)}

3、TaskScheduler

接收TaskSet,按调度策略(FIFO/FAIR)将Task分配给Executor。

1、执行流程

1、DAGScheduler 调用 taskScheduler.submitTasks() 后,任务进入 TaskScheduler 调度阶段

2、任务提交submitTasks

// TaskSetManager管理任务集
val manager = createTaskSetManager(taskSet, maxTaskFailures)
// 添加到调度池
schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
// 触发资源分配
backend.reviveOffers()

3、资源分配 (Driver)

// CoarseGrainedSchedulerBackend.scala
override def reviveOffers(): Unit = {driverEndpoint.send(ReviveOffers)  // 向DriverEndpoint发送消息
}// DriverEndpoint处理
case ReviveOffers =>makeOffers()  // 触发资源分配

4、资源分配核心

private def makeOffers(): Unit = {// Make sure no executor is killed while some task is launching on itval taskDescs = withLock {// 1. 获取所有可用Executor资源val activeExecutors = executorDataMap.filter { case (id, _) => isExecutorActive(id) }val workOffers = activeExecutors.map {case (id, executorData) => buildWorkerOffer(id, executorData)}.toIndexedSeq// 2. 调用任务调度器分配任务scheduler.resourceOffers(workOffers, true)}// 3. 启动分配的任务if (taskDescs.nonEmpty) {launchTasks(taskDescs)}
}

5、任务启动

// CoarseGrainedSchedulerBackend
private def launchTasks(tasks: Seq[Seq[TaskDescription]]): Unit = {for (task <- tasks.flatten) {// 1. 序列化任务val serializedTask = TaskDescription.encode(task)// 2. 检查任务大小if (serializedTask.limit() >= maxRpcMessageSize) {Option(scheduler.taskIdToTaskSetManager.get(task.taskId)).foreach { taskSetMgr =>try {var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +s"${RPC_MESSAGE_MAX_SIZE.key} (%d bytes). Consider increasing " +s"${RPC_MESSAGE_MAX_SIZE.key} or using broadcast variables for large values."msg = msg.format(task.taskId, task.index, serializedTask.limit(), maxRpcMessageSize)taskSetMgr.abort(msg)} catch {case e: Exception => logError("Exception in error callback", e)}}}else {val executorData = executorDataMap(task.executorId)// Do resources allocation here. The allocated resources will get released after the task// finishes.executorData.freeCores -= task.cpustask.resources.foreach { case (rName, addressAmounts) =>executorData.resourcesInfo(rName).acquire(addressAmounts)}logDebug(s"Launching task ${task.taskId} on executor id: ${task.executorId} hostname: " +s"${executorData.executorHost}.")// 发送任务到ExecutorexecutorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/bicheng/93196.shtml
繁体地址,请注明出处:http://hk.pswp.cn/bicheng/93196.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

MySQL 临时表与复制表

一、MySQL 临时表临时表是会话级别的临时数据载体&#xff0c;其设计初衷是为了满足短期数据处理需求&#xff0c;以下从技术细节展开说明。&#xff08;一&#xff09;核心特性拓展1.生命周期与会话绑定会话结束的判定&#xff1a;包括正常断开连接&#xff08;exit/quit&…

从配置到调试:WinCC与S7-1200/200SMT无线Modbus TCP通讯方案

测试设备与参数l 西门子PLC型号&#xff1a;S7-1200 1台l 西门子PLC型号&#xff1a;S7-200Smart 1台l 上位机&#xff1a;WinCC7.4 1台l 无线通讯终端——DTD418MB 3块l 主从关系&#xff1a;1主2从l 通讯接口&#xff1a;RJ45接口l 供电&#xff1a;12-24VDCl 通讯协议&a…

Android沉浸式全屏显示与隐藏导航栏的实现

1. 总体流程以下是实现沉浸式全屏显示和隐藏导航栏的流程&#xff1a;步骤描述步骤1创建一个新的Android项目步骤2在布局文件中定义需要展示的界面步骤3在Activity中设置沉浸式全屏显示步骤4处理系统UI的显示与隐藏步骤5运行应用并测试效果2. 详细步骤步骤1&#xff1a;创建一个…

EN 62368消费电子、信息技术设备和办公设备安全要求标准

EN 62368认证标准是一项全球性的电子产品安全标准&#xff0c;用于评估和认证消费电子、信息技术设备和办公设备的安全性。该标准由国际电工委员会(IEC)制定&#xff0c;取代了传统的EN60065和EN 60950两个标准&#xff0c;成为国际电子产品安全领域的新指导。IEC /EN 62368-1是…

【unity实战】使用Splines+DOTween制作弯曲手牌和抽牌动画效果

最终效果 文章目录最终效果前言实战1、Splines的使用2、绘制样条线3、DOTween安装和使用4、基于样条曲线&#xff08;Spline&#xff09;的手牌管理系统4.1 代码实现4.2 解释&#xff1a;&#xff08;1&#xff09;计算第一张卡牌的位置&#xff08;居中排列&#xff09;&#…

Flask模板注入梳理

从模板开始介绍&#xff1a;Flask中有许多不同功能的模板&#xff0c;他们之间是相互隔离的地带&#xff0c;可供引入和使用。Flask中的模块&#xff1a;flask 主模块&#xff1a;包含框架的核心类和函数&#xff0c;如 Flask&#xff08;应用实例&#xff09;、request&#x…

企业级的即时通讯平台怎么保护敏感行业通讯安全?

聊天记录存在第三方服务器、敏感文件被误发至外部群组、离职员工仍能查看历史消息.对于金融、医疗、政务等对数据安全高度敏感的行业而言&#xff0c;“沟通效率与”信息安全”的矛盾&#xff0c;从未像今天这样尖锐。企业即时通讯怎么保护敏感行业通讯安全&#xff1f;这个问题…

Java Spring框架最新版本及发展史详解(截至2025年8月)-优雅草卓伊凡

Java Spring框架最新版本及发展史详解&#xff08;截至2025年8月&#xff09;-优雅草卓伊凡引言今天有个新项目 客户问我为什么不用spring 4版本&#xff0c;卓伊凡我今天刚做完项目方案&#xff0c;我被客户这一句问了有点愣住&#xff0c;Java Spring框架最新版本及发展史详解…

Android实现Glide/Coil样式图/视频加载框架,Kotlin

Android实现Glide/Coil样式图/视频加载框架&#xff0c;Kotlin <uses-permission android:name"android.permission.WRITE_EXTERNAL_STORAGE" /><uses-permission android:name"android.permission.READ_EXTERNAL_STORAGE" /><uses-permiss…

【k8s】pvc 配置的两种方式volumeClaimTemplates 和 PersistentVolumeClaim

pvc配置实例 实例1在Deployment中配置 template:xxxxxxvolumeClaimTemplates:- metadata:name: dataspec:accessModes:- ReadWriteOnceresources:requests:storage: 1GistorageClassName: nfsdev-storageclass (创建好的storageClassName)实例2#先创建一个pvc 然后在 Deploym…

Logistic Loss Function|逻辑回归代价函数

----------------------------------------------------------------------------------------------- 这是我在我的网站中截取的文章&#xff0c;有更多的文章欢迎来访问我自己的博客网站rn.berlinlian.cn&#xff0c;这里还有很多有关计算机的知识&#xff0c;欢迎进行留言或…

计算机网络技术-知识篇(Day.1)

一、网络概述 1、网络的概念 两个不在同一地理位置的主机&#xff0c;通过传输介质和通信协议&#xff0c;实现通信和资源共享。 2、网络发展史 第一阶段&#xff08;20世纪60年代&#xff09; 标志性事件&#xff1a;ARPANET的诞生关键技术&#xff1a;分组交换技术 第二…

工业元宇宙:迈向星辰大海的“玄奘之路”

一、从认知革命到工业革命&#xff1a;文明跃迁的底层逻辑1.1 认知革命&#xff1a;人类协作的基石时间线&#xff1a;约7万年前&#xff0c;智人通过语言和想象力构建共同虚拟现实&#xff0c;形成部落协作模式。核心突破&#xff1a;虚构能力&#xff1a;创造神、国家、法律等…

9. React组件生命周期

2. React组件生命周期 2.1. 认识生命周期 2.1.1. 很多事物都有从创建到销毁的整个过程&#xff0c;这个过程称之为生命周期&#xff1b;2.1.2. React组件也有自己的生命周期&#xff0c;了解生命周期可以让我们在最合适的地方完成想要的功能2.1.3. 生命周期和生命周期函数的关系…

【单板硬件开发】关于复位电路的理解

阅读紫光同创供应商提供的FPGA单板硬件开发手册&#xff0c;发现复位电路他们家解释的很通俗易懂&#xff0c;所以分享一下。如下图&#xff0c;RST_N 是低有效的异步全芯片复位信号&#xff0c;一般外部连接电路有 3 种形式如图 3–2&#xff0c;可根据实际需要选择合适的电路…

《Unity Shader入门精要》学习笔记一

1、本书的源代码 https://github.com/candycat1992/Unity_Shaders_Book 2、第1章 Shader是面向GPU的工作方式 3、第2章 渲染流水线 Shader&#xff1a;着色器 渲染流水线&#xff1a;目标是渲染一张二维纹理&#xff0c;输入是一个虚拟摄像机、一些光源、一些Shader以及纹…

从零到一:TCP 回声服务器与客户端的完整实现与原理详解

目录 一、TCP 通信的核心逻辑 二、TCP 服务器编程步骤 步骤 1&#xff1a;创建监听 Socket 步骤 2&#xff1a;绑定地址与端口&#xff08;bind&#xff09; 步骤 3&#xff1a;设置监听状态&#xff08;listen&#xff09; 步骤 4&#xff1a;接收客户端连接&#xff08…

MyBatis-Plus核心内容

MyBatis-Plus MyBatis-Plus 是一个基于 MyBatis的增强工具&#xff0c;旨在简化开发过程&#xff0c;减少重复代码。它在MyBatis的基础上增加了CRUD操作封装&#xff0c;条件构造器、代码生成器等功能。 一、核心特性与优势 1. 核心特性 无侵入&#xff1a;只做增强不做改变&am…

计算机网络摘星题库800题笔记 第4章 网络层

第4章 网络层4.1 网络层概述题组闯关1.在 Windows 的网络配置中&#xff0c;“默认网关” 一般被设置为 ( ) 的地址。 A. DNS 服务器 B. Web 服务器 C. 路由器 D. 交换机1.【参考答案】C 【解析】只有在计算机上正确安装网卡驱动程序和网络协议&#xff0c;并正确设置 IP 地址信…

非root用户在linux中配置zsh(已解决ncurses-devel报错)

Zsh&#xff08;Z Shell&#xff09;是一款功能强大的交互式 Unix shell&#xff0c;以其高度可定制性和丰富的功能著称&#xff0c;被视为 Bash 的增强替代品。它支持智能补全、主题美化、插件扩展&#xff08;如 Oh My Zsh 框架&#xff09;、自动纠错、全局别名等特性&#…