7.7晚自习作业

实操作业02:Spark核心开发

作业说明

  • 请严格按照步骤操作,并将最终结果文件(命名为:sparkcore_result.txt)于20点前上传
  • 结果文件需包含每一步的关键命令执行结果文本输出。

一、数据读取与转换操作

  1. 上传账户数据$DATA_EXERCISE/accounts到HDFS的/dw/accounts目录,从HDFS路径/dw/accounts读取accounts数据文件
hadoop fs -mkdir -p /dw/accounts
hadoop fs -put $DATA_EXERCISE/accounts /dw/accounts/

  1. 将每行数据按逗号分割成字段数组
  2. 以邮政编码字段(第9个字段)作为key,创建键值对RDD
  3. 查看转换后的数据结构,显示前2条记录

step1. 创建 RDD(读取所有 part 文件)

// 使用通配符 * 读取目录下所有 part 文件
val accountsRDD = sc.textFile("hdfs://master:8020/dw/accounts/accounts/part-*")// 验证数据加载
accountsRDD.take(2).foreach(println)

step2:数据转换

// 一、数据转换操作
val zipKeyValueRDD = accountsRDD.map(line => {val fields = line.split(",")  // 按逗号分割每行数据val zipCode = fields(8)      // 提取第9个字段(索引8)作为邮政编码(zipCode, fields)            // 返回(邮编, 字段数组)的键值对
})// 显示前2条
zipKeyValueRDD.take(2).foreach { case (zip, fields) => println(s"Key: $zip | Value: ${fields.mkString(",")}")
}

为什么可以确定line是一个数组属性的变量呢

解析:
line: 是一个 String 类型的变量,表示CSV文件的某一行(例如:"1,John,Doe,10001")
.split(","):
是String类的方法,按指定的分隔符(这里是逗号 ,)切割字符串
返回一个 Array[String](字符串数组)
val fields:
定义一个 不可变变量(Scala中用val声明)
类型由编译器自动推断为 Array[String]
存储切割后的字段数组可以通过Spark-shell直接验证类型:
// 示例数据
val line = "1,John,Doe,10001"
// 执行切割
val fields = line.split(",")  // 得到:Array("1", "John", "Doe", "10001")
// 查看类型
println(fields.getClass)  // 输出:class [Ljava.lang.String; (表示String数组)

这三行代码也只能输出一行数据的转换结果,但是这个是一整个文件的转换结果,应该是有循环在里面的,循环体现在哪呢

这里的关键在于 Spark的分布式计算模型 和 RDD的转换操作 的运作机制。让我们深入解析这个"隐式循环"的实现原理:

1. 表面现象 vs 底层实现

你看到的代码(看似单行处理)

scala

val zipKeyValueRDD = accountsRDD.map(line => {val fields = line.split(",")  // 似乎只处理一行val zipCode = fields(8)(zipCode, fields)
})

实际发生的处理(分布式并行处理)

python

# 伪代码:Spark集群的实际执行逻辑
for partition in accountsRDD.partitions:    # 遍历所有数据分区for line in partition:                   # 分区内逐行处理(真正的循环)fields = line.split(",")emit((fields[8], fields))             # 输出键值对

2. 循环的隐藏位置

Spark通过以下机制实现"隐式循环":

组件作用类比传统编程
RDD.map()定义转换逻辑相当于循环体
Partitions(分区)数据分片相当于外层循环
Tasks(任务)并行处理单元相当于多线程循环

3. 完整数据处理流程

  1. 输入阶段

    • 假设原始文件有100万行,被Spark自动划分为N个分区(如HDFS的block数)

  2. 并行执行阶段

    • 每个分区的数据会被不同的Executor进程处理

    • 每个Executor对分配到的分区数据 逐行执行map函数

  3. 输出阶段

    • 所有节点的处理结果合并成新的RDD

4. 关键问题解答

Q: 为什么代码没有显式循环却能处理整个文件?

  • Spark的RDD操作是声明式的:你只需要定义"做什么"(业务逻辑),Spark框架自动处理"怎么做"(分布式循环)

  • 数据分区和并行计算

    scala

    // 假设文件被分为3个分区
    accountsRDD.partitions.size  // 返回分区数

    每个分区会触发一个Task,每个Task内部会循环处理其分配到的数据

Q: 如何验证确实处理了所有数据?

scala

// 查看处理后的记录数
zipKeyValueRDD.count() 

// 查看分区处理情况(需在Spark UI观察)
zipKeyValueRDD.toDebugString


5. 与传统编程的对比

特性Spark RDD传统Java/Python
循环方式隐式(由框架管理)显式(for/while)
并行度自动分区并行需手动线程/进程管理
数据规模支持TB/PB级通常单机内存限制
代码示例rdd.map(x => f(x))for x in list: f(x)
  • 循环确实存在:但被Spark框架隐藏在分布式运行时中

  • 你的代码是"循环体"map中的逻辑会被应用到每一行数据

  • 优势:开发者无需关心并行化和数据分发,专注业务逻辑

二、数据聚合操作

  1. 对上述RDD提取每个记录的姓名字段:
  2. 提取第5个字段(first_name)和第4个字段(last_name)
  3. 将姓和名用逗号连接
val nameByZipRDD = zipKeyValueRDD.mapValues(fields => s"${fields(4)},${fields(3)}"  // 格式化为"姓,名"
)

    在Scala中,s"${fields(4)},${fields(3)}" 是一种称为 字符串插值(String Interpolation) 的语法

    1. 字符串插值的组成

    部分含义示例
    开头的s表示启用字符串插值s"..."
    ${}插入变量/表达式的语法${fields(4)}
    引号内内容固定字符串+动态变量组合"姓,名"

    2. 具体到代码

    scala

    s"${fields(4)},${fields(3)}"
    • 等效的普通写法

      scala

      fields(4) + "," + fields(3)  // 直接字符串拼接

    • 执行过程

      1. 取出数组fields的第5个元素(索引4)

      2. 取出第4个元素(索引3)

      3. 用逗号连接两者

    3. 对比其他语言

    语言类似语法示例
    Scalas"${var}"s"Hello, ${name}"
    Pythonf-stringf"Hello, {name}"
    JavaScript模板字符串`Hello, ${name}`

    1. map vs mapValues 的本质区别

    操作函数签名输入 → 输出在你的代码中的应用
    map(T) => U整个元素 → 新元素line => (zipCode, fields)
    mapValues(V) => U仅值部分 → 新值(键不变)fields => "姓,名"

    2.代码中两个阶段的解析

    (1)第一阶段:数据转换 (map)

    scala

    val zipKeyValueRDD = accountsRDD.map(line => {val fields = line.split(",")       // String → Array[String]val zipCode = fields(8)           // 提取key(zipCode, fields)                 // 返回: (String, Array[String])
    })
    • line => 的含义:

      • 输入:原始字符串(如 "1,John,Doe,10001"

      • 输出:完全新建的键值对 (String, Array[String])

    • 数据流

      text

      "1,John,Doe,10001" → split → ["1","John","Doe","10001"] → 取fields(8)作为key → 输出 ("10001", ["1","John","Doe","10001",...])
    (2)第二阶段:聚合 (mapValues)

    scala

    val nameByZipRDD = zipKeyValueRDD.mapValues(fields => s"${fields(4)},${fields(3)}"  // 仅修改value部分
    )
    • fields => 的含义:

      • 输入:已有键值对的值部分(即之前的 Array[String]

      • 输出:仅更新值(键 zipCode 保持不变)

    • 数据流

      text

      输入: ("10001", ["1","John","Doe","10001",...])→ 提取fields(4)和fields(3) → 输出 ("10001", "Doe,John")  // 键未改变!

    3. => 的本质

    • => 是Scala中的函数定义符号,表示:

      scala

      val func: InputType => OutputType = (input) => { // 处理input output 
      }
    • 在代码中:

      • line => ...:定义了一个从 String 到 (String, Array[String]) 的函数

      • fields => ...:定义了一个从 Array[String] 到 String 的函数

    1. 按邮政编码分组
    2. 查看聚合结果,显示前2条记录
    val groupedByNameRDD = nameByZipRDD.groupByKey()// 显示前2组
    groupedByNameRDD.take(2).foreach {case (zip, names) => println(s"$zip -> ${names.mkString("; ")}")
    }

    三、数据排序与展示

    1. 对分组后的RDD按邮政编码进行升序排列
    2. 取前5条记录进行展示
    3. 对每条记录,先打印邮政编码,然后打印该邮政编码下的所有姓名列表
    groupedByNameRDD.sortByKey().take(5).foreach {case (zip, names) =>println(s"\n=== 邮政编码: $zip ===")names.foreach(println)
    }


    四、提交要求

    1. 代码和结果文件:将代码及其执行后的输出结果保存到sparkcore_result.txt文件中

    2. 结果文件应包含

    3. 数据读取与转换操作的代码和输出结果
    4. 数据聚合操作的代码和输出结果
    5. 数据排序与展示的代码和输出结果

    本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
    如若转载,请注明出处:http://www.pswp.cn/bicheng/88023.shtml
    繁体地址,请注明出处:http://hk.pswp.cn/bicheng/88023.shtml

    如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

    相关文章

    手机FunASR识别SIM卡通话占用内存和运行性能分析

    手机FunASR识别SIM卡通话占用内存和运行性能分析 --本地AI电话机器人 上一篇:手机无网离线使用FunASR识别SIM卡语音通话内容 下一篇:手机通话语音离线ASR识别商用和优化方向 一、前言 书接上一文《阿里FunASR本地断网离线识别模型简析》,…

    虚幻引擎Unreal Engine5恐怖游戏设计制作教程,从入门到精通从零开始完整项目开发实战详细讲解中英字幕

    和大家分享一个以前收集的UE5虚幻引擎恐怖游戏开发教程,这是国外一个大神制作的视频教程,教程从零开始到制作出一款完整的游戏。内容讲解全面,如蓝图基础知识讲解、角色控制、高级交互系统、高级库存系统、物品检查、恐怖环境氛围设计、过场动…

    多人协同开发时Git使用命令

    拉取仓库代码 # 拉取远程仓库至本地tar_dir路径 git clone gitgithub.com:your-repo.git target_dir # 默认是拉取远程master分支,下面拉取并切换到自己需要开发的分支上 # 假设自己需要开发的分支是/feature/my_branch分支 git checkout -b feature/my_branch orig…

    线性表——双向链表

    线性表——双向链表1. 双向链表的实现1.1 简单图例1.2 结点的定义1.3 新结点的创建1.4 链表的初始化1.5 结点的插入1.5.1 头部插入(头插)1.5.2 尾部插入(尾插)1.5.3 任意位置(前)插入1.6 结点的删除1.6.1 头…

    Java后端技术博客汇总文档

    文章目录 前言Java后端汇总链接Java基础知识点数据结构算法(Java实现)算法知识点合集算法刷题算法竞赛AcWing课程蓝桥杯AB组辅导课合集(更新中…) 源码分析redission 数据库SQL ServerMySQLRedis -Canal JUC并发编程JVMNetty日志框…

    QT 菜单栏设计使用方法

    目录 常用设置函数 多个QAction的单选设置 ​​​​​​​菜单相关类 ​​​​​​​ 系统菜单的生成和响应 使用代码添加系统菜单 使用UI设计器设计系统菜单 使用Qt设计及界面时,常用的两种方式添加菜单,第一使用UI界面添加,第二种 在…

    AIGC领域AI艺术,打造个性化艺术作品

    AIGC领域AI艺术,打造个性化艺术作品 关键词:AIGC、AI艺术、生成对抗网络、个性化创作、深度学习、艺术风格迁移、创意计算 摘要:本文深入探讨了AIGC(人工智能生成内容)在艺术创作领域的应用,重点分析了如何利用AI技术打造个性化艺术作品。文章从技术原理出发,详细解析了生…

    基于Flask+Jinja2的快捷教务系统(后端链接到新版正方教务系统)

    快捷教务系统(Easy Educational Administration Management System, EasyEAMS) 项目简介 EasyEAMS 是一个基于 Flask Jinja2 的现代化教务系统 Web 应用。学生可通过网页端登录,在线查询个人信息、成绩、课表、学业生涯、通知、选课等。系…

    EDM自动化与出海独立开发实用教程

    随着互联网全球化发展,越来越多的独立开发者(Indie Developer)选择将自己的产品推向海外市场。如何高效地获客、激活用户、提升转化率,成为出海过程中必须解决的问题。EDM(电子邮件营销)自动化,…

    「日拱一码」017 深度学习常用库——TensorFlow

    目录 基础操作 张量操作: tf.constant 用于创建常量张量 tf.Variable 用于创建可训练的变量张量 tf.reshape 可改变张量的形状 tf.concat 可将多个张量沿指定维度拼接 tf.split 则可将张量沿指定维度分割 数学运算: tf.add 张量的加运算 tf.su…

    ARM DStream仿真器脚本常用命令

    以下是ARM DStream仿真器脚本中常用的命令及其功能分类,结合调试流程和典型应用场景整理: ⚙️ 一、连接与初始化命令 connect 建立与目标设备的连接,需指定接口类型(如JTAG/SWD)和处理器核心。 示例:conne…

    vscode 调试unity

    lanch.json { “version”: “0.2.0”, “configurations”: [ { “name”: “Attach to Unity”, “type”: “vstuc”, “request”: “attach” } ] }

    金融IT入门知识点

    银行金融IT核心知识点全解析:架构、技术与实践 一、金融IT的战略地位与行业特性 金融IT作为银行业务的核心支撑体系,其发展水平直接决定了银行服务的效率、安全性与创新能力。截至 2025年,中国银行业线上化业务占比已达97%,手机银…

    C++——手撕智能指针、单例模式、线程池、String

    智能指针今天我们来学习一下C中的智能指针,如果有人不知道C中的智能指针的概念的话:C智能指针是一种基于RAII(Resource Acquisition Is Initialization,资源获取即初始化)机制的高级内存管理工具,用于自动化…

    Mybatis----留言板

    基础项目:留言板 截止到目前为止,我们已经学习了 Spring(只学习了DI)、Spring MVC、SpringBoot、Mybatis 这些知识了,已经满足了做简单项目的基本要求了,所以接下来我们就从0到1实现表白墙项目。 需求分析…

    Web-API-day3 DOM事件进阶

    一、 事件流 1.事件冒泡 const fa document.querySelector(.father)const son document.querySelector(.son)document.addEventListener(click, function () {alert(我是爷爷)})fa.addEventListener(click, function () {alert(我是爸爸)})son.addEventListener(click, fun…

    小波增强型KAN网络 + SHAP可解释性分析(Pytorch实现)

    效果一览一、传统KAN网络的痛点与突破 1. 传统KAN的局限性 传统Kolmogorov-Arnold网络(KAN)虽在理论上有可靠的多变量函数逼近能力,但存在显著瓶颈: 计算效率低:训练速度慢于MLP,资源消耗大,尤其…

    tomcat部署多个端口以及制定路径部署-vue3

    vue3项目tomcat部署记录 使用hash路由 字符串拼接的图片地址可以使用import.meta.env.BASE_URL 默认8080 如果部署地址为8080/xc 则设置 vite.config.js中设置base为’/xc/’ outDir设置为xc 打包产物直接拖到webapps目录下 如果另开一个端口 如8081 设置根目录访问 conf/ser…

    LeetCode三数之和-js题解

    给你一个整数数组 nums ,判断是否存在三元组 [nums[i], nums[j], nums[k]] 满足 i ! j、i ! k且 j ! k ,同时还满足 nums[i] nums[j] nums[k] 0 。请你返回所有和为 0 且不重复的三元组。 注意:答案中不可以包含重复的三元组。 示例 1&…

    Flink SQLServer CDC 环境配置与验证

    一、SQL Server 数据库核心配置 1. 启用 CDC 功能(Change Data Capture) SQL Server CDC 依赖数据库级别的 CDC 功能及表级别的捕获配置,需按以下步骤启用: 启用数据库 CDC -- 以管理员身份连接数据库 USE master; GO-- 检查数…