Spark(29)基础自定义分区器

什么是分区

【复习提问:RDD的定义是什么?】

在 Spark 里,弹性分布式数据集(RDD)是核心的数据抽象,它是不可变的、可分区的、里面的元素并行计算的集合。

在 Spark 中,分区是指将数据集按照一定的规则划分成多个较小的子集,每个子集可以独立地在不同的计算节点上进行处理,这样可以实现数据的并行处理,提高计算效率。

可以将 Spark 中的分区类比为快递公司处理包裹的过程。假设你有一批包裹要从一个城市发送到另一个城市,快递公司会将这些包裹按照一定的规则进行分区,比如按照收件地址的区域划分。每个分区的包裹会被分配到不同的快递员或运输车辆上进行运输,这些快递员或车辆可以同时出发,并行地将包裹送到不同的区域。这就类似于 Spark 中的分区,每个分区的数据可以在不同的计算节点上同时进行处理,从而加快整个数据处理的速度。

默认分区的情况

  1. 从集合创建 RDD(使用 parallelize 方法)

当使用 parallelize 方法从一个集合创建 RDD 时,默认分区数通常取决于集群的配置。

在本地模式下,默认分区数等于本地机器的 CPU 核心数;在集群模式下,默认分区数由 spark.default.parallelism 配置项决定。

import org.apache.spark.SparkContext

import org.apache.spark.SparkConf

val conf = new SparkConf().setAppName("DefaultPartitionExample").setMaster("local")

val sc = new SparkContext(conf)

val data = Seq(1, 2, 3, 4, 5)

val rdd = sc.parallelize(data)

println(s"默认分区数: ${rdd.partitions.length}")

sc.stop()

2.从外部存储(如文件)创建 RDD(使用 textFile 方法)

当使用 textFile 方法从外部存储(如 HDFS、本地文件系统等)读取文件创建 RDD 时,默认分区数通常由文件的块大小决定。对于 HDFS 文件,默认分区数等于文件的块数。例如,一个 128MB 的文件在 HDFS 上被分成 2 个 64MB 的块,那么创建的 RDD 默认分区数就是 2。

import org.apache.spark.SparkContext

import org.apache.spark.SparkConf

val conf = new SparkConf().setAppName("DefaultPartitionFileExample").setMaster("local")

val sc = new SparkContext(conf)

// 假设文件存在于本地

val rdd = sc.textFile("path/to/your/file.txt")

println(s"默认分区数: ${rdd.partitions.length}")

sc.stop()

【现场演示,如果文件是一个.gz文件,是一个不可拆分的文件,那么默认分区的数量就会是1】

(三)分区的作用

在 Spark 中,RDD 是数据的集合,它会被划分成多个分区,这些分区可以分布在不同的计算节点上,就像图书馆的书架分布在不同的房间一样。

这样做的好处是什么呢?

并行计算:Spark 能够同时对多个分区的数据进行处理,充分利用集群的计算资源,进而加快作业的执行速度。例如,若一个 RDD 有 10 个分区,且集群有足够的计算资源,Spark 就可以同时处理这 10 个分区的数据。

数据局部性:分区有助于实现数据局部性,也就是让计算尽量在数据所在的节点上进行,减少数据在网络间的传输,从而降低网络开销。

容错性:当某个分区的数据处理失败时,Spark 能够重新计算该分区,而不需要重新计算整个 RDD。

当使用savaAsTextFile做保存操作时,最终生成的文件个数通常和RDD的分区数一致。

object PartitionExample {

  def main(args: Array[String]): Unit = {

    // 创建 SparkConf 对象,设置应用程序名称和运行模式

    val conf = new SparkConf().setAppName("PartitionExample").setMaster("local")

    // 使用 SparkConf 创建 SparkContext 对象

    val sc = new SparkContext(conf)

    // 创建一个包含 10 个元素的 Seq

    val data = Seq(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

    // 使用 parallelize 方法创建 RDD,并设置分区数为 3

    val rdd = sc.parallelize(data, 3)

    // 将 RDD 保存为文本文件,保存路径为 "output"

    rdd.saveAsTextFile("output")

    // 停止 SparkContext,释放资源

    sc.stop()

  }

}  

在运行代码后,output 目录下会生成与 RDD 分区数量相同的文本文件,这里 RDD 分区数设置为 3,所以会生成 3 个文件,文件名通常为 part-00000、part- 00001、part-00002 。

  • (四)分区器和默认分区器

分区器是 Spark 中用于决定 RDD 数据如何在不同分区之间进行分布的组件。通过定义分区规则,它能够将具有键值对类型的数据(PairRDD)按照一定策略划分到不同分区,以实现数据的合理分布,进而提高并行计算的效率。

在大多数涉及键值对的转换操作中,Spark 默认使用 HashPartitioner。例如,reduceByKey、groupByKey 等操作,如果没有显式指定分区器,就会使用 HashPartitioner

HashPartitioner 根据键的哈希值来决定数据应该被分配到哪个分区。具体来说,它会对键的哈希值取模,模的结果就是分区的编号。假设分区数为 n,键为 key,则分区编号的计算公式为 hash(key) % n。

对于键值对 RDD,HashPartitioner 是大多数转换操作的默认分区器,而 RangePartitioner 是 sortByKey 操作的默认分区器。你也可以根据具体需求显式指定分区器来控制数据的分区方式。

  • (五)为什么需要自定义分区

 

数据倾斜:当数据分布不均匀,某些分区数据量过大,导致计算负载不均衡时,可自定义分区器,按照特定规则重新分配数据,避免数据倾斜影响计算性能。比如电商订单数据中,按地区统计销售额,若某些热门地区订单数远多于其他地区,使用默认分区器会使部分任务计算量过大。通过自定义分区器,可将热门地区进一步细分,让各分区数据量更均衡。

特定业务逻辑:若业务对数据分区有特殊要求,如按时间段将日志数据分区,不同时间段的数据存到不同分区便于后续处理分析;或在社交网络数据中,按用户关系紧密程度分区等,都需自定义分区器实现。

自定义分区器的实现步骤

自定义分区器需要:继承Partitioner抽象类 + 实现其中的两个方法。

  1. numPartitions :返回分区的数量,即整个 RDD 将被划分成多少个分区 。
  2. getPartition(key: Any) :接收一个键值key(对于非键值对类型 RDD,可根据数据特征构造合适的键 ),根据自定义逻辑返回该键值对应的分区索引(从 0 开始,取值范围为 0 到numPartitions - 1 ) 。
案例

假设要对 NBA 球队比赛信息进行分区存储,要求将湖人、火箭两队信息单独存储,其余球队信息存放在一个分区。

("勇士", "info1"),
("掘金", "info2"),

("湖人", "info3"),

("火箭", "info4")

示例代码如下:

import org.apache.spark.{Partitioner, SparkConf, SparkContext}

object CustomPartitionerExample {

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setAppName("CustomPartitionerExample").setMaster("local[*]")

    val sc = new SparkContext(conf)

    // 准备数据集,数据为(球队名称, 相关信息)形式的键值对

    val rdd = sc.parallelize(List(

      ("勇士", "info1"),

      ("掘金", "info2"),

      ("湖人", "info3"),

      ("火箭", "info4")

    ))

    // 使用自定义分区器对RDD进行分区

    val partitionedRDD = rdd.partitionBy(new MyPartitioner)

    partitionedRDD.saveAsTextFile("output")

    sc.stop()

  }

}
// 自定义分区器类

class MyPartitioner extends Partitioner {

  // 定义分区数量为3

  override def numPartitions: Int = 3

  // 根据球队名称(键值)确定分区索引

  override def getPartition(key: Any): Int = {

    key match {

      case "湖人" => 0

      case "火箭" => 1
      case _ => 2

    }

  }

}

核心代码解释:

  1. MyPartitioner类继承自Partitioner,实现了numPartitions方法指定分区数量为 3 ,实现getPartition方法,根据球队名称判断分区索引,湖人对应分区 0,火箭对应分区 1,其他球队对应分区 2 。

2.在main方法中,创建包含球队信息的 RDD,然后调用partitionBy方法并传入自定义分区器MyPartitioner,对 RDD 进行分区,最后将分区后的数据保存到指定路径。

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.pswp.cn/news/906805.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

python打卡训练营打卡记录day35

知识点回顾: 三种不同的模型可视化方法:推荐torchinfo打印summary权重分布可视化进度条功能:手动和自动写法,让打印结果更加美观推理的写法:评估模式 作业:调整模型定义时的超参数,对比下效果 1…

【MySQL】07.表内容的操作

1. insert 我们先创建一个表结构,这部分操作我们使用这张表完成我们的操作: mysql> create table student(-> id int primary key auto_increment,-> name varchar(20) not null,-> qq varchar(20) unique-> ); Query OK, 0 rows affec…

使用SQLite Expert个人版VACUUM功能修复数据库

使用SQLite Expert个人版VACUUM功能修复数据库 一、SQLite Expert工具简介 SQLite Expert 是一款功能强大的SQLite数据库管理工具,分为免费的个人版(Personal Edition)和收费的专业版(Professional Edition)。其核心功…

LM-BFF——语言模型微调新范式

gpt3(GPT3——少样本示例推动下的通用语言模型雏形)结合提示词和少样本示例后,展示出了强大性能。但大语言模型的训练门槛太高,普通研究人员无力,LM-BFF(Making Pre-trained Language Models Better Few-shot Learners)的作者受gp…

遥感解译项目Land-Cover-Semantic-Segmentation-PyTorch之二训练模型

遥感解译项目Land-Cover-Semantic-Segmentation-PyTorch之一推理模型 背景 上一篇文章了解了这个项目的环境安装和模型推理,这篇文章介绍下如何训练这个模型,添加类别 下载数据集 在之前的一篇文章中,也有用到这个数据集 QGIS之三十六Deepness插件实现AI遥感训练模型 数…

【NLP 71、常见大模型的模型结构对比】

三到五年的深耕,足够让你成为一个你想成为的人 —— 25.5.8 模型名称位置编码Transformer结构多头机制Feed Forward层设计归一化层设计线性层偏置项激活函数训练数据规模及来源参数量应用场景侧重GPT-5 (OpenAI)RoPE动态相对编码混合专家架构(MoE&#…

[250521] DBeaver 25.0.5 发布:SQL 编辑器、导航器全面升级,新增 Kingbase 支持!

目录 DBeaver 25.0.5 发布:SQL 编辑器、导航器全面升级,新增 Kingbase 支持! DBeaver 25.0.5 发布:SQL 编辑器、导航器全面升级,新增 Kingbase 支持! 近日,DBeaver 发布了 25.0.5 版本&#xf…

服务器硬盘虚拟卷的处理

目前的情况是需要删除逻辑卷,然后再重新来弄一遍。 数据已经备份好了,所以不用担心数据会丢失。 查看服务器的具体情况 使用 vgdisplay 操作查看服务器的卷组情况: --- Volume group ---VG Name vg01System IDFormat …

Flutter 中 build 方法为何写在 StatefulWidget 的 State 类中

Flutter 中 build 方法为何写在 StatefulWidget 的 State 类中 在 Flutter 中,build 方法被设计在 StatefulWidget 的 State 类中而非 StatefulWidget 类本身,这种设计基于几个重要的架构原则和实际考量: 1. 核心设计原因 1.1 生命周期管理…

传统医疗系统文档集中标准化存储和AI智能化更新路径分析

引言 随着医疗数智化建设的深入推进,传统医疗系统如医院信息系统(HIS)、临床信息系统(CIS)、护理信息系统(NIS)、影像归档与通信系统(PACS)和实验室信息系统(LIS)已经成为了现代医疗机构不可或缺的技术基础设施。这些系统各自承担着不同的功能,共同支撑…

探索常识性概念图谱:构建智能生活的知识桥梁

目录 一、知识图谱背景介绍 (一)基本背景 (二)与NLP的关系 (三)常识性概念图谱的引入对比 二、常识性概念图谱介绍 (一)常识性概念图谱关系图示例 (二&#xff09…

Linux/aarch64架构下安装Python的Orekit开发环境

1.背景 国产化趋势越来越强,从软件到硬件,从操作系统到CPU,甚至显卡,就产生了在国产ARM CPU和Kylin系统下部署Orekit的需求,且之前的开发是基于Python的,需要做适配。 2.X86架构下安装Python/Orekit开发环…

Ctrl+鼠标滚动阻止页面放大/缩小

项目场景: 提示:这里简述项目相关背景: 一般在我们做大屏的时候,不希望Ctrl鼠标上下滚动的时候页面会放大/缩小,那么在有时候,又不希望影响到别的页面,比如说这个大屏是在另一个管理后台中&am…

MySQL——复合查询表的内外连

目录 复合查询 回顾基本查询 多表查询 自连接 子查询 where 字句中使用子查询 单行子查询 多行子查询 多列子查询 from 字句中使用子查询 合并查询 实战OJ 查找所有员工入职时候的薪水情况 获取所有非manager的员工emp_no 获取所有员工当前的manager 表的内外…

聊一下CSS中的标准流,浮动流,文本流,文档流

在网络上关于CSS的文章中,有时候能听到“标准流”,“浮动流”,“定位流”等等词语,还有像“文档流”,“文本流”等词,这些流是什么意思?它们是CSS中的一些布局方案和特性。今天我们就来聊一下CS…

python训练营第33天

MLP神经网络的训练 知识点回顾: PyTorch和cuda的安装查看显卡信息的命令行命令(cmd中使用)cuda的检查简单神经网络的流程 数据预处理(归一化、转换成张量)模型的定义 继承nn.Module类定义每一个层定义前向传播流程 定义…

JDK21深度解密 Day 1:JDK21全景图:关键特性与升级价值

【JDK21深度解密 Day 1】JDK21全景图:关键特性与升级价值 引言 欢迎来到《JDK21深度解密:从新特性到生产实践的全栈指南》系列的第一天。今天我们将探讨JDK21的关键特性和升级价值。作为近5年最重要的LTS版本,JDK21不仅带来了性能上的巨大突…

[docker]更新容器中镜像版本

从peccore-dev仓库拉取镜像 docker pull 10.12.135.238:8060/peccore-dev/configserver:v1.13.45如果报错,请参考docker拉取镜像失败,添加仓库地址 修改/etc/CET/Common/peccore-docker-compose.yml文件中容器的版本,为刚刚拉取的版本 # 配置中心confi…

LVS原理详解及LVS负载均衡工作模式

什么是虚拟服务器(LVS) 虚拟服务器是高度可扩展且高度可用的服务器 构建在真实服务器集群上。服务器集群的架构 对最终用户完全透明,并且用户与 cluster 系统,就好像它只是一个高性能的虚拟 服务器。请考虑下图。 真实服务器和负…

上位机知识篇---keil IDE操作

文章目录 前言文件操作按键新建打开保存保存所有编辑操作按键撤销恢复复制粘贴剪切全选查找书签操作按键添加书签跳转到上一个书签跳转到下一个书签清空所有书签编译操作按键编译当前文件构建目标文件重新构建调试操作按键进入调试模式复位全速运行停止运行单步调试逐行调试跳出…