Spark Shuffle机制原理

文章目录

  • 1.什么是Shuffle?
  • 2.Shuffle解决什么问题?
  • 3.Shuffle Write与Shuffle Read
  • 4.Shuffle的计算需求
    • 4.1 计算需求表
    • 4.2 partitionby
    • 4.3 groupByKey
    • 4.4 reduceByKey
    • 4.5 sortByKey
  • 5.Shuffle Write框架设计与实现
    • 5.1 Shuffle Write框架实现的功能
    • 5.2 Shuffle Write的多种情况
      • 5.2.1 不需要combine和sort
        • 5.2.1.1 操作流程
        • 5.2.1.2 优缺点
        • 5.2.1.3 适用性
      • 5.2.2 不需要combine,需要sort
        • 5.2.2.1 操作流程
        • 5.2.2.2 优缺点
        • 5.2.2.3 适用性
      • 5.2.3 需要combile,需要/不需要sort
        • 5.2.3.1 操作流程
        • 5.2.3.2 优缺点
        • 5.2.3.3 适用性
  • 6.Shuffle Read框架设计与实现
    • 6.1 Shuffle Read框架实现的功能
    • 6.2 Shuffle Read的不同情况
      • 6.2.1 不需要combine和sort
        • 6.2.1.1 操作流程
        • 6.2.1.2 优缺点
        • 6.2.1.3 适用性
      • 6.2.2 不需要combine,需要sort
        • 6.2.2.1 操作流程
        • 6.2.2.2 优缺点
        • 6.2.2.3 适用性
      • 6.2.3 需要combine,需要/不需要sort
        • 6.2.3.1 操作流程
        • 6.2.3.2 优缺点
        • 6.2.3.3 适用性

阅读本篇文章前,需要阅读 Spark执行计划与UI分析

1.什么是Shuffle?

运行在不同stage、不同节点上的task间如何进行数据传递。这个数据传递过程通常被称为Shuffle机制。

2.Shuffle解决什么问题?

如果是单纯的数据传递,则只需要将数据进行分区、通过网络传输即可,没有太大难度,但Shuffle机制还需要进行各种类型的计算(如聚合、排序),而且数据量一般会很大。如何支持这些不同类型的计算,如何提高Shuffle的性能都是Shuffle机制设计的难点问题。

3.Shuffle Write与Shuffle Read

  • Shuffle Write:上游stage预先将输出数据进行划分,按照分区存放,分区个数与下游task个数一致,这个过程被称为"Shuffle Write"。
  • Shuffle Read:上游数据按照分区存放完成后,下游的task将属于自己分区的数据通过网络传输获取,然后将来自上游不同分区的数据聚合再一起处理,这个过程称为"Shuffle Read"。

4.Shuffle的计算需求

4.1 计算需求表

所谓计算需求,也就是Shuffle要解决具体算子的哪些计算需求:
在这里插入图片描述
这里我来分析几个例子:

4.2 partitionby

在这里插入图片描述
可以看到partitionby操作只进行了数据分区操作,并没有涉及到数据的聚合和排序操作。

4.3 groupByKey

在这里插入图片描述
可以看到groupByKey的操作既需要分区,又需要做聚合,并且在Shuffle Read阶段做的聚合。

4.4 reduceByKey

在这里插入图片描述
可以看到reduceByKey做了两步聚合,在Shuffle Write中先执行func聚合一次(由spark内部执行,不生成新的rdd),然后进行分区数据传输,最后再在每个分区聚合一次,执行相同的func函数。同时func需要满足交换律和结合律。两次聚合(多了Shuffle Write端聚合)的优点是优化Shuffle的性能,一是传输的数据量大大减少,二是降低Shuffle Read端的内存消耗。

4.5 sortByKey

在这里插入图片描述
分区后,在ShuffleRead端进行排序。sortByKey() 为了保证生成的RDD中的数据是全局有序(按照Key排序) 的, 采用Range划分来分发数据。 Range划分可以保证在生成的RDD中, partition 1中的所有record的Key小于(或大于) partition 2中所有的record的Key。
可以看到当前并没有算子需要在Shuffle Write端进行排序的,但不能保证用户实现的算子不会在Shuffle Write端进行排序,因此在spark实现Shuffle框架的时候保留了在Shuffle Write端进行排序的功能

5.Shuffle Write框架设计与实现

5.1 Shuffle Write框架实现的功能

如第四节中的图所示,每个数据操作只需要其中的一个或两个功能。Spark为了支持所有的情况,设计了一个通用的Shuffle Write框架,框架的计算顺序为“map()输出→数据聚合→排序→分区”输出。
在这里插入图片描述
map task每计算出一个record及其partitionId,就将record放入类似HashMap的数据结构中进行聚合;聚合完成后,再将HashMap中的数据放入类似Array的数据结构中进行排序,既可按照partitionId,也可以按照partitionId+Key进行排序;最后根据partitionId将数据写入不同的数据分区中,存放到本地磁盘上。partitionId=Hash(Key)% 下游分区数

5.2 Shuffle Write的多种情况

5.2.1 不需要combine和sort

在这里插入图片描述
这种Shuffle Write方式称为:BypassMergeSortShuffleWriter
这种情况最简单,只需要实现分区功能:

5.2.1.1 操作流程

map()依次输出KV record,并计算其partitionId(PID),Spark根据 partitionId,将record依次输出到不同的buffer中,每当buffer填满就将record溢写到磁盘上的分区文件中。分配buffer的原因是map()输出record的速度很快,需要进行缓冲来减少磁盘I/O。

5.2.1.2 优缺点

该模式的优点是速度快,直接将record输出到不同文件中。缺点是资源消耗过高,每个分区都需要有一个buffer(默认大小为32KB,由spark.Shuffle.file.buffer进行控制),当分区数过大时,内存消耗会很高。

5.2.1.3 适用性

适用于Shuffle Write端不需要聚合和排序且分区个数较少(小于spark.Shuffle.sort.bypassMergeThreshold,默认值为200),例如groupBy(100),partitionBy(100),sortByKey(100)。

5.2.2 不需要combine,需要sort

在这里插入图片描述

这种Shuffle模式被命名为:SortShuffleWriter(KeyOrdering=true),使用的Array被命名为PartitionedPairBuffer

5.2.2.1 操作流程
  • 这种情况需要使用partitionId+key进行排序,Spark采用的实现方法是建立一个Array:PartitionedPairBuffer,来存放map()输出的record,并将每个<K,V>record转化为<(PartitionId,K),V>record,然后按照PartitionId+Key对record进行排序,最后将所有record写入写入一个文件中,通过建立索引来标示每个分区
  • 如果Array存放不下,则会先扩容,如果还存放不下,就将Array中的record排序后spill到磁盘上,等待map()输出完以后,再将Array中的record与磁盘上已排序的record进行全局排序,得到最终有序的record,并写入文件中。
5.2.2.2 优缺点
  • 优点是只需要一个Array结构就可以支持按照partitionId+Key进行排序,Array大小可控,而且具有扩容和spill到磁盘上的功能,支持从小规模到大规模数据的排序。同时,输出的数据已经按照partitionId进行排序,因此只需要一个分区文件存储,即可标示不同的分区数据,克服了BypassMergeSortShuffleWriter中建立文件数过多的问题,适用于分区个数很大的情况。缺点是排序增加计算时延。
5.2.2.3 适用性
  • map()端不需要聚合(combine)、Key需要排序、分区个数无限制。目前,Spark本身没有提供这种排序类型的数据操作,但不排除用户会自定义,或者系统未来会提供这种类型的操作。sortByKey()操作虽然需要按Key进行排序,但这个排序过程在Shuffle Read端完成即可,不需要在Shuffle Write端进行排序。

最后,使用这种Shuffle如何解决BypassMergeSortShuffleWriter存在的buffer分配过多的问题?我们只需要将“按PartitionId+Key排序”改为“只按PartitionId排序”,就可以支持“不需要map()端combine、不需要按照Key进行排序,分区个数过大”的操作。例如,groupByKey(300)、partitionBy(300)、sortByKey(300)。

5.2.3 需要combile,需要/不需要sort

在这里插入图片描述
这种Shuffle模式被称为:sort-based Shuffle Write,哈希表为:PartitionedAppendOnlyMap

5.2.3.1 操作流程
  • 需要实现按Key进行聚合(combine)的功能,Spark采用的实现方法是建立一个类似HashMap的数据结构对map()输出的record进行聚合。HashMap中的Key是“partitionId+Key”,HashMap中的Value是经过相同combine的聚合结果。在图中,combine()是sum()函数,那么Value中存放的是多个record对应的Value相加的结果。
  • 聚合完成后,Spark对HashMap中的record进行排序。如果不需要按Key进行排序,如上图所示,那么只按partitionId进行排序;如果需要按Key进行排序,如图6.7的下图所示,那么按partitionId+Key进行排序。最后,将排序后的record写入一个分区文件中。其中使用的hash表既可以实现聚合功能,也可以实现排序功能。
  • 如果HashMap存放不下,则会先扩容为两倍大小,如果还存放不下,就将HashMap中的record排序后spill到磁盘上。此时,HashMap被清空,可以继续对map()输出的record进行聚合,如果内存再次不够用,那么继续spill到磁盘上,此过程可以重复多次。当map()输出完成以后,将此时HashMap中的reocrd与磁盘上已排序的record进行再次聚合(merge),得到最终的record,输出到分区文件中。
5.2.3.2 优缺点
  • 优缺点同5.4.2
5.2.3.3 适用性
  • 适合map()端聚合(combine)、需要或者不需要按Key进行排序、分区个数无限制的应用,如reduceByKey()、aggregateByKey()等。

6.Shuffle Read框架设计与实现

6.1 Shuffle Read框架实现的功能

在这里插入图片描述
reduce task不断从各个map task的分区文件中获取数据(Fetch records),然后使用类似HashMap的结构来对数据进行聚(aggregate),该过程是边获取数据边聚合。聚合完成后,将HashMap中的数据放入类似Array的数据结构中按照Key进行排序(sort byKey),最后将排序结果输出或者传递给下一个操作。

6.2 Shuffle Read的不同情况

6.2.1 不需要combine和sort

在这里插入图片描述

6.2.1.1 操作流程
  • 这种情况最简单,只需要实现数据获取功能即可。等待所有的map task结束后,reduce task开始不断从各个map task获取<K,V>record,并将record输出到一个buffer中(大小为spark.reducer.maxSizeInFlight=48MB),下一个操作直接从buffer中获取数据即可。
6.2.1.2 优缺点
  • 优点是逻辑和实现简单,内存消耗很小。缺点是不支持聚合、排序等复杂功能。
6.2.1.3 适用性
  • 适合既不需要聚合也不需要排序的应用,如partitionBy()等。

6.2.2 不需要combine,需要sort

在这里插入图片描述
使用的Array结构:PartitionedPairBuffer

6.2.2.1 操作流程
  • 获取数据后,将buffer中的record依次输出到一个Array结构(PartitionedPairBuffer)中。由于这里采用了本来用于Shuffle Write端的PartitionedPairBuffer结构,所以还保留了每个record的partitionId。然后,对Array中的record按照Key进行排序,并将排序结果输出或者传递给下一步操作。
  • 当内存无法存下所有的record时,PartitionedPairBuffer将record排序后spill到磁盘上,最后将内存中和磁盘上的record进行全局排序,得到最终排序后的record。
6.2.2.2 优缺点
  • 优点是只需要一个Array结构就可以支持按照Key进行排序,Array大小可控,而且具有扩容和spill到磁盘上的功能,不受数据规模限制。缺点是排序增加计算时延。
6.2.2.3 适用性
  • 适合reduce端不需要聚合,但需要按Key进行排序的操作,如sortByKey()、sortBy()等。

6.2.3 需要combine,需要/不需要sort

在这里插入图片描述
哈希表:ExternalAppendOnlyMap

6.2.3.1 操作流程
  • 获取record后,Spark建立一个类似HashMap的数据结构(ExternalAppendOnlyMap)对buffer中的record进行聚合,HashMap中的Key是record中的Key,HashMap中的Value是经过相同聚合函数(func())计算后的结果。
  • 聚合函数是sum()函数,那么Value中存放的是多个record对应Value相加后的结果。之后,如果需要按照Key进行排序,如下图所示,则建立一个Array结构,读取HashMap中的record,并对record按Key进行排序,排序完成后,将结果输出或者传递给下一步操作。
  • 如果HashMap存放不下,则会先扩容为两倍大小,如果还存放不下,就将HashMap中的record排序后spill到磁盘上。此时,HashMap被清空,可以继续对buffer中的record进行聚合。如果内存再次不够用,那么继续spill到磁盘上,此过程可以重复多次。当聚合完成以后,将此时HashMap中的reocrd与磁盘上已排序的record进行再次聚合,得到最终的record,输出到分区文件中。
    注意,这里的排序和聚合依然使用的同一个数据结构。
6.2.3.2 优缺点
  • 同上一节。
6.2.3.3 适用性
  • 适合reduce端需要聚合、不需要或需要按Key进行排序的操作,如reduceByKey()、aggregateByKey()等。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/news/918825.shtml
繁体地址,请注明出处:http://hk.pswp.cn/news/918825.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Cursor vs Trae vs VSCode:2025终极IDE横评,谁才是开发者的效率之选?

前言 2025年的编程世界&#xff0c;AI不再只是辅助&#xff0c;而是编程工作流的核心驱动者。从微软的VSCode 到新锐 Cursor 与国产黑马 Trae &#xff0c;三大 IDE 正在重新定义“人机协作”的边界。本文从架构设计、AI能力、场景适配等维度&#xff0c;带你看透工具本质&…

Vue 安装指定版本依赖包、删除某个依赖包、依赖管理

如何安装指定版本的依赖包安装指定版本&#xff1a;一旦你知道了想要的版本号&#xff0c;比如3.4.0&#xff0c;你可以使用以下命令来安装这个版本的vue-router&#xff1a;npm install vue-router3.4.0 --save这里的^表示安装3.4.0的最新小版本更新&#xff0c;但不会超过主版…

psycopg2 如何验证链接是否有效

在 psycopg2 中&#xff0c;验证数据库连接是否有效&#xff08;即连接是否仍然活跃&#xff09;可以通过以下几种方法实现&#xff1a;1. 使用 conn.closed 属性 psycopg2 的连接对象有一个 closed 属性&#xff0c;可以检查连接是否已关闭&#xff1a; import psycopg2conn …

数据科学与计算-电商双11美妆数据分析

一、项目背景&#xff1a;双 11 美妆数据的价值所在 每年的 “双 11” 购物节都是电商行业的盛宴&#xff0c;而美妆品类作为消费热门领域&#xff0c;蕴含着丰富的用户行为与市场趋势信息。该项目聚焦双 11 期间的美妆电商数据&#xff0c;旨在通过数据分析揭示以下核心问题&…

简单了解MongoDB数据存储

官方文档&#xff1a;MongoDB中文手册|官方文档中文版 | MongoDB-CN-Manual 什么是MongoDB? MongnDB是一个分布式文件存储数据库(或叫文档数据库)&#xff0c;是一个介于 关系数据库和非关系数据库之间的产品&#xff0c;是非关系数据库当中功能最丰富&#xff0c;最像关系数…

web网站开发,在线%射击比赛成绩管理%系统开发demo,基于html,css,jquery,python,django,model,orm,mysql数据库

经验心得 这个也是crud业务单子&#xff0c;第二个聊点其他的&#xff0c;从最早的无分层开发&#xff0c;到三层开发&#xff0c;工厂&#xff0c;各种接口&#xff0c;再到后面多层&#xff0c;代码无痕aop&#xff0c;各种框架等&#xff0c;都是在方便我们快速打架一个程序…

[QtADS]解析ads.pro

本文来源 &#xff1a; 腾讯元宝subdirs : 子目录TEMPLATE subdirs的作用​​​​核心功能​​&#xff1a;声明当前项目为“多项目管理”模式。Qt 的构建系统&#xff08;qmake&#xff09;会遍历 SUBDIRS中列出的子目录&#xff0c;在每个子目录中寻找 .pro文件并递归构建。…

三方相机问题分析六:【没用相机,诡异的手电筒不可使用】下拉状态栏,手电筒置灰,无法打开,提提示相机正在使用

【关注我,后续持续新增专题博文,谢谢!!!】 上一篇我们讲了: 这一篇我们开始讲: 三方相机问题分析六:【没用相机,诡异的手电筒不可使用】下拉状态栏,点击手电筒,手电筒置灰,无法打开,提提示相机正在使用9348353 目录 一、问题背景 二、:问题分析过程 2.1:基于…

Java Selenium 自动打开浏览器保存截图

// 代码 public class ScreenshotExample {public static void main(String[] args) {// 1. 设置浏览器驱动路径&#xff08;根据实际路径修改&#xff09;System.setProperty("webdriver.chrome.driver", "D:\\chromedriver-win64\\chromedriver.exe");//…

新商机:为了减少辐射,可以用座机打机房中心再转手机

某些人痛恨自家附近有基站&#xff0c;说是辐射太大。你不能说人家迷信。一般解决办法就是拆基站。而我觉得&#xff0c;商机来了。现在座机基本没人装了。新商机就是座机。附近没有基站&#xff0c;又要打电话&#xff0c;怎么办&#xff1f;装座机。用座机打电话时&#xff0…

【Java|第十九篇】面向对象九——String类和枚举类

&#xff08;四&#xff09;面向对象11、String类&#xff08;1&#xff09;概述<1>String是一个类&#xff0c;引用数据类型&#xff0c;用来表示字符串&#xff1b;<2>String是Lang包下的类&#xff0c;使用不需要导包&#xff1b;<3>字符串的值不能变&…

超越相似名称:Elasticsearch semantic text 如何在简洁、高效、集成方面超越 OpenSearch semantic 字段

作者&#xff1a;来自 Elastic Mike Pellegrini, Nick Chow 及 Libby Lin 比较 Elasticsearch 语义文本和 OpenSearch 语义字段在简洁性、可配置性和效率方面的表现。 自己动手体验向量搜索&#xff0c;使用这个自定进度的 Search AI 实操学习。你现在可以开始免费的云试用&am…

OpenAI发布最新大模型GPT5、本地部署GPT开源模型

OpenAI发布最新大模型GPT5、本地部署GPT开源模型 GPT-5概述 北京时间 2025年8月8日 凌晨1点 OPENAI举行了1个小时的线上发布会&#xff0c;正式推出了其史上最聪明、最强大的大模型GPT-5。 GPT-5是OpenAI发布的最新一代大型语言模型&#xff0c;它基于Transformer架构&#xff…

容器网络模式选择在云服务器多节点部署中的连通性验证方案

容器网络模式选择在云服务器多节点部署中的连通性验证方案在云计算环境中&#xff0c;容器网络模式的选择直接影响着多节点部署的通信效率和安全性。本文将从Docker原生网络驱动对比入手&#xff0c;深入分析Overlay、Host、Bridge等主流网络模式在跨主机通信场景下的性能表现&…

电商双11美妆数据分析

1、数据初步了解2.数据清洗2.1 重复值处理 直接删除重复值。2.2 缺失值处理通过上面观察数据发现sale_count,comment_count 存在缺失值,先观察存在缺失值的行的基本情况存在的缺失值很可能意味着售出的数量为0或者评论的数量为0&#xff0c;所以我们用0来填补缺失值。2.3 数据挖…

Pytest项目_day14(参数化、数据驱动)

parametrize 参数化可以组装测试数据。在测试前定义好测试数据&#xff0c;并在测试用例中使用 单参数单次循环 我们可以在装饰器中使用mark.parametrize&#xff0c;来定义参数名和参数值列表 参数名还需要传给函数参数名需要用字符串来定义&#xff0c;参数值列表需要用可迭代…

Nest.js、Knex.js、Nuxt.js、Next.js 和 Spring Boot的异同相关概念

总述Nest.js、Knex.js、Nuxt.js、Next.js 和 Spring Boot 是分属不同技术领域的工具&#xff0c;涵盖前端框架、后端框架、数据库工具等角色&#xff0c;它们在开发中既有功能交集&#xff0c;也有明确的定位差异。一、相同点服务端参与能力五者均能在服务端发挥作用&#xff1…

第2节 大模型分布式推理架构设计原则

大模型推理系统的设计过程,本质上是在多重约束下寻找最优解的过程。硬件资源的物理限制、场景对性能的刚性要求、系统的可扩展性需求,共同构成了设计的边界条件。明确这些约束的具体表现形式,理解性能指标之间的权衡逻辑,确立架构设计的核心原则,是构建高效分布式推理系统…

快速部署一个鉴黄服务

1.安装依赖pip install opennsfw22.代码实现import opennsfw2 as n2# 将自动下载预训练模型 open_nsfw_weights.h5 到 C:\Users\Administrator\.opennsfw2\weights # pip install opennsfw2# 单张预测 image_path 1.jpg nsfw_probability n2.predict_image(image_path) print…

Camera open failed

前言 由前面的几篇博客可以知道&#xff0c;openCamera&#xff0c;createCaptureSession&#xff0c;setRepeatingRequest&#xff0c;capture是非常重要的过程&#xff0c;如果其中一个环节出了问题时该如何分析呢&#xff0c;这里我们首先从打开相机流程时&#xff0c;打开…