《大数据技术原理与应用》实验报告七 熟悉 Spark 初级编程实践

目  录

一、实验目的

二、实验环境

三、实验内容与完成情况

3.1 Spark读取文件系统的数据。

3.2 编写独立应用程序实现数据去重。

3.3 编写独立应用程序实现求平局值问题。

四、问题和解决方法

五、心得体会


一、实验目的

        1. 掌握使用 Spark 访问本地文件和 HDFS 文件的方法。

        2. 掌握 Spark 应用程序的编写、编译和运行方法。


二、实验环境

        1. 硬件要求:笔记本电脑一台

        2. 软件要求:VMWare虚拟机、Ubuntu 18.04 64、JDK1.8、Hadoop-3.1.3、Hive-3.1.2、Windows11操作系统、Eclipse、Flink-1.9.1、IntelliJ IDEA、Spark-2.4.0


三、实验内容与完成情况

3.1 Spark读取文件系统的数据。

        在 Linux 系统中安装 IntelliJ IDEA,然后使用 IntelliJ IDEA 工具开发 WordCount 程序,并打 包成JAR 文件,提交到 Flink 中运行。

        (1)在 spark-shell 中读取 Linux 系统本地文件“/home/hadoop/test.txt”,然后统计出文件的行数。

        ①下载spark压缩包并使用如下语句将对应的压缩包解压到/usr/local的文件目录下:

sudo tar -zxf ./spark-2.4.0-bin-without-hadoop.tgz -C /usr/local

        ②使用如下语句将解压后的文件夹“spark-2.4.0-bin-without-hadoop”重命名为“spark”:

sudo mv ./spark-2.4.0-bin-without-hadoop/ ./spark

        ③使用如下语句修改spark的配置文件信息:

vim /etc/profile

        ④使用如下语句让刚修改完的配置信息立即生效:

source /etc/profile

        ⑤使用如下语句复制"spark-env.sh.template"文件的内容并将其存入到"spark-env.sh"文件中:

cp spark-env.sh.template spark-env.sh

        ⑥使用如下语句启动spark进行实验操作:

./sbin/start-all.sh

        ⑦使用如下语句进行进程信息查看:

jps

        ⑧使用如下语句完成spark-shell的启动工作:

./bin/spark-shell

        ⑨使用如下语句读取 Linux 系统本地文件“/home/hadoop/test.txt”:

cd /home/hadoop/test.txt
val textFile=sc.textFile("file:///home/hadoop/test.txt")

        ⑩使用如下语句统计文件“/home/hadoop/test.txt”的行数:

textFile.count()

        (2)在 spark-shell 中读取 HDFS 系统文件“/user/hadoop/test.txt”(如果该文件不存在, 请先创建),然后统计出文件的行数。

        ①使用如下语句在HDFS上传文件"1.txt",上传完成后并进行查看是否成功完成上传:

cd /usr/local/hadoop
hadoop fs -ls
hadoop fs -put 1.txt
hadoop fs -ls

        ②使用如下语句读取 Linux 系统HDFS系统文件“/user/hadoop/test.txt”:

val textFile=sc.textFile("hdfs://localhost:9000/user/hadoop/test.txt")

        ③使用如下语句统计HDFS系统文件“/user/hadoop/test.txt”的行数:

textFile.count()

        (3)编写独立应用程序(推荐使用 Scala 语言),读取 HDFS 系统文件“/user/hadoop/test.txt”(如果该文件不存在,请先创建),然后统计出文件的行数;通过 sbt 工具将整个应用程 序编译打包成 JAR 包,并将生成的 JAR 包通过 spark-submit 提交到 Spark 中运行命令。

        ①使用hadoop用户名登录Linux系统,打开一个终端在Linux终端中执行如下命令创建一个文件夹sparkapp作为应用程序根目录:

cd ~
mkdir ./sparkapp
mkdir -p ./sparkapp/src/main/scala

        ②将对应的应用程序代码存放在应用程序根目录下的“src/main/scala”目录下,使用vim编辑器在“~/sparkapp/src/main/scala”下建立一个名为 SimpleApp.scala的Scala代码文件:

import org.apache.spark.{SparkConf, SparkContext}object SimpleApp {def main(args: Array[String]) {// 正确的路径:去掉路径字符串中的多余空格val logFile = "hdfs://localhost:9000/user/hadoop/test.txt"// 设置SparkConfval conf = new SparkConf().setAppName("Simple Application")// 初始化SparkContextval sc = new SparkContext(conf)// 读取文件并进行计算val logData = sc.textFile(logFile, 2)// 计算并输出文件的行数val num = logData.count()printf("The num of this file is %d\n", num)// 关闭SparkContextsc.stop()}
}

        ③SimpleApp.scala程序依赖于Spark API,因此需要通过sbt进行编译打包以后才能运行。 首先需要使用vim编辑器在“~/sparkapp”目录下新建文件simple.sbt:

cd ~
vim ./sparkapp/simple.sbt

        ④simple.sbt文件用于声明该独立应用程序的信息以及与 Spark的依赖关系,需要在simple.sbt文件中输入以下内容:

name := "Simple Project"
version := "1.0"
scalaVersion := "2.11.12"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.0"

        ⑤为了保证sbt能够正常运行,先执行如下命令检查整个应用程序的文件结构:

cd sparkapp/
find .

        ⑥通过如下代码将整个应用程序打包成 JAR:

/usr/local/sbt/sbt package

        ⑦生成的JAR包的位置为“~/sparkapp/target/scala-2.11/simple-project_2.11-1.0.jar”,使用sbt打包得到的应用程序JAR包可以通过 spark-submit 提交到 Spark 中运行:

/usr/local/spark/bin/spark-submit --class "SimpleApp"
~/sparkapp/target/scala-2.11/simple-project_2.11-1.0.jar

3.2 编写独立应用程序实现数据去重。

        对于两个输入文件 A 和 B,编写 Spark 独立应用程序(推荐使用 Scala 语言),对两个 文件进行合并,并剔除其中重复的内容,得到一个新文件 C。下面是输入文件和输出文件的 一个样例供参考。

        输入文件 A 的样例如下:

20170101 x
20170102 y
20170103 x
20170104 y
20170105 z
20170106 z

        输入文件 B 的样例如下:

20170101 y
20170102 y
20170103 x
20170104 z
20170105 y

        根据输入的文件 A 和 B 合并得到的输出文件 C 的样例如下:

20170101 x
20170101 y
20170102 y
20170103 x
20170104 y
20170104 z
20170105 y
20170105 z
20170106 z

        (1)在目录/usr/local/spark/mycode/remdup/src/main/scala下新建一个remdup.scala,然后输入如下代码:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.HashPartitionerobject RemDup {def main(args: Array[String]) {// 初始化SparkConf和SparkContextval conf = new SparkConf().setAppName("RemDup")val sc = new SparkContext(conf)// 输入文件路径val dataFile = "file:///home/charles/data"// 读取文件并创建RDD,设置为2个分区val data = sc.textFile(dataFile, 2)// 处理数据:过滤空行,去除多余空格并进行分区和排序val res = data.filter(_.trim.length > 0)  // 过滤空行.map(line => (line.trim, "")) // 每行生成键值对(键为去除空格的行内容,值为空字符串).partitionBy(new HashPartitioner(1))  // 使用HashPartitioner进行分区.groupByKey()  // 根据键进行分组.sortByKey()  // 按照键进行排序.keys  // 只保留键(去除重复行)// 将结果保存到文件res.saveAsTextFile("result")// 停止SparkContextsc.stop()}
}

        (2)在目录/usr/local/spark/mycode/remdup目录下新建simple.sbt,然后输入如下代码:

name := "Simple Project"
version := "1.0"
scalaVersion := "2.11.12"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.0"

        (3)在目录/usr/local/spark/mycode/remdup下执行如下命令打包程序:

sudo /usr/local/sbt/sbt package

        (4)最后在目录/usr/local/spark/mycode/remdup下执行如下命令提交程序,在目录/usr/local/spark/mycode/remdup/result下即可得到结果文件:

/usr/local/spark/bin/spark-submit --class "RemDup"
/usr/local/spark/mycode/remdup/target/scala-2.11/simple-project_2.11-1.0.jar

3.3 编写独立应用程序实现求平局值问题。

        每个输入文件表示班级学生某个学科的成绩,每行内容由两个字段组成,第一个是学生 名字,第二个是学生的成绩;编写 Spark 独立应用程序求出所有学生的平均成绩,并输出到 一个新文件中。下面是输入文件和输出文件的一个样例供参考。

        Algorithm 成绩:

小明 92
小红 87
小新 82
小丽 90

        Database 成绩:

小明 95
小红 81
小新 89
小丽 85

        Python 成绩:

小明 82
小红 83
小新 94
小丽 91

        平均成绩如下:

(小红,83.67)
(小新,88.33)
(小明,89.67)
(小丽,88.67)

        (1)在目录/usr/local/spark/mycode/avgscore/src/main/scala下新建一个avgscore.scala,然后输入如下代码:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.HashPartitionerobject AvgScore {def main(args: Array[String]) {// 初始化SparkConf和SparkContextval conf = new SparkConf().setAppName("AvgScore")val sc = new SparkContext(conf)// 输入文件路径val dataFile = "file:///home/hadoop/data"// 读取文件并创建RDD,设置为3个分区val data = sc.textFile(dataFile, 3)// 处理数据:过滤空行,拆分数据并转为键值对val res = data.filter(_.trim.length > 0)  // 过滤空行.map(line => {val parts = line.split(" ")(parts(0).trim, parts(1).trim.toInt) // 键为学生名字,值为成绩}).partitionBy(new HashPartitioner(1))  // 使用HashPartitioner进行分区.groupByKey()  // 根据键(学生名字)进行分组.map(x => {var n = 0var sum = 0.0// 计算每个学生的总成绩和数量for (i <- x._2) {sum += in += 1}// 计算平均成绩val avg = sum / n// 格式化平均成绩到两位小数val format = f"$avg%1.2f"(x._1, format)  // 返回学生名字和格式化后的平均成绩})// 将结果保存到文件res.saveAsTextFile("result")// 停止SparkContextsc.stop()}
}

        (2)在目录/usr/local/spark/mycode/avgscore目录下新建simple.sbt,然后输入如下代码:

name := "Simple Project"
version := "1.0"
scalaVersion := "2.11.12"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.0"

        (3)在目录/usr/local/spark/mycode/avgscore下执行如下命令打包程序:

sudo /usr/local/sbt/sbt package

        (4)最后在目录/usr/local/spark/mycode/avgscore下执行下面命令提交程序,在目录/usr/local/spark/mycode/avgscore/result下即可得到结果文件:

/usr/local/spark/bin/spark-submit --class "AvgScore"
/usr/local/spark/mycode/avgscore/target/scala-2.11/simple-project_2.11-1.0.jar


四、问题和解决方法

        1. 实验问题:Spark的配置可能变得很复杂,尤其是当涉及到不同的集群管理器、存储系统和应用需求时。

解决方法:仔细阅读Spark官方文档,并理解各种配置选项含义,根据应用需求和集群环境,选择正确配置选项并进行适当调整。

        2. 实验问题:在Spark实验中,数据清洗、转换和聚合较为繁琐。

解决方法:使用Spark提供的丰富数据处理功能,例如DataFrame API和Transformations、Actions操作,通过使用这些功能,可以轻松地处理数据并对其进行各种操作。

        3. 实验问题:用spark读取本地文件并统计文件内容行数时报出IllegalArgumentException(非法参数异常)。

解决方法:由于file:///少打了一个/,spark读取本地文件的url路径有问题,正确的应该是valtextFile=sc.textFile("file:///usr/local/hadoop/1.txt")。

        4. 实验问题:用spark读取hdfs文件系统中的文件并统计文件内容行数时报出InvalidInput Exception(无效输入异常),并提示输入路径不存在。

解决方法:读取hdfs文件的url路径有问题,hdfs文件系统的根目录不是~,而应该是/user/hadoop,其中hadoop是用户名。

        5. 实验问题:编写spark独立应用程序并实现数据去重时,程序运行报出异常:java.net.URI SyntaxException: Illegal character in scheme name at index 0。

解决方法:代码中获取hdfs文件的地址URL前面有空格,就会报错java.net.URISyntaxException:Illegal character in scheme name at index 0,删掉空格就好了。

        6. 实验问题:随着Spark版本更新,一些功能和API会发生变化,导致与旧版本不兼容。

解决方法:了解Spark版本之间差异并适应新API和功能通过仔细阅读官方文档并参考示例代码,可以更好地适应新版本Spark并利用其提供的新功能。

        7. 实验问题:在Spark应用程序运行过程中,对其进行监控和调试是一项重要任务会面临日志分析困难、任务跟踪和性能分析等。

解决方法:使用Spark提供的监控工具和API,例如Spark UI、Event Tracker和Profiler等,通过这些工具和API,可以实时监控Spark应用程序状态、跟踪任务执行过程并分析性能瓶颈等。


五、心得体会

        1、在这次实验中我深刻体会到了数据去重的重要性,在处理和分析大规模数据时,数据去重能够避免重复数据的干扰,提高数据的质量和准确性。例如在分析用户行为或销售数据时,如果存在大量的重复数据,将会对结果产生误导。通过使用Spark的distinct()函数可以轻松地实现数据去重,并且在处理大规模数据时,去重操作能够显著提高计算效率和结果质量。此外数据去重还可以应用于数据清洗和预处理阶段,在数据预处理时需要删除无效、重复和不完整的数据,以便进行后续的数据分析和挖掘。通过使用Spark的distinct()函数可以快速地去除重复数据,并且对数据进行清洗和预处理,以便更好地探索和分析数据的规律和信息。

        2、Spark是一个开源的大规模数据处理工具,具有高效、可扩展和易用的特点,通过使用Spark可以快速地读取和处理大量数据,并且使用丰富的转换和操作来分析和挖掘数据中的规律和信息。DataFrame提供了一种以表格形式组织数据的结构,可以方便地处理各种类型的数据。在处理和分析数据时,Spark提供了丰富的转换和操作,例如可以使用map()函数对数据进行转换,使用filter()函数对数据进行筛选,使用reduceByKey()函数对数据进行聚合操作。

        3、使用Spark的read()函数来读取不同格式的数据,使用DataFrame API提供的各种函数对数据进行处理和分析,使用select()函数选择指定的列进行输出,使用where()函数对数据进行筛选,使用groupBy()函数对数据进行分组和聚合操作,还可以使用DataFrame API提供的统计函数来计算数据的统计信息,如平均值、标准差、计数等。

        4、合理地设置Spark的配置参数,以提高计算效率和减少资源浪费,选择合适的转换和操作来处理和分析数据以确保结果的准确性和可靠性。在分布式计算中,某些键值对的出现频率可能远高于其他键值对,导致数据分布不均衡,这会引发集群中的负载倾斜问题,影响计算性能和结果质量。为了避免数据倾斜对计算性能的影响,可以对数据进行预处理或使用应对数据倾斜的算法来解决问题。

        5、DataFrame API提供了丰富的统计和分析函数,可以方便地对数据进行分组、聚合、过滤和排序等操作,通过使用DataFrame API可以快速地获得数据的统计信息和分布情况,并且对数据进行更深入的挖掘和分析。可以使用groupBy()函数对数据进行分组操作,然后使用agg()函数对分组后的数据进行聚合操作。使用window()函数对数据进行窗口分析,以便更好地探索数据的趋势和规律,窗口函数可以用于计算滑动平均值、累计和等动态变化的数据指标。

        6、使用Spark进行数据处理具有高效处理大规模数据、简单易用的API、灵活的数据处理方式、高效的资源利用率、稳定的计算性能等优点,这些优点使得Spark成为目前大规模数据处理领域中备受关注和常用的工具之一。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/news/915193.shtml
繁体地址,请注明出处:http://hk.pswp.cn/news/915193.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

机器学习漫画小抄 - 彩图版

斯坦福机器学习漫画小抄&#xff0c;中文版来啦&#xff01; 下载地址&#xff1a; 通过网盘分享的文件&#xff1a;机器学习知识点彩图版.pdf 链接: https://pan.baidu.com/s/1-fH9OpC_u_OrTqWy6gVUCA 提取码: 246r

1.初始化

业务模块核心技术栈业务&#xff08;亮点&#xff09;解决方案课程安排01 认识Vue3为什么需要学Vue3?Vue3组合式API体验Vue3更多的优势2 使用create-vue搭建Vue3项目认识 create-vue使用create-vue创建项目3 熟悉项目目录和关键文件项目目录和关键文件4 组合式API - setup选项…

Milvus分布式数据库工作职责

主导腾讯云Milvus服务化项目&#xff0c;设计多租户隔离方案&#xff0c;支撑日均10亿向量请求&#xff0c;延迟降低40%。优化IVF_PQ索引构建流程&#xff0c;通过量化编码压缩使内存占用减少60%&#xff0c;QPS提升35%。开发基于Kubernetes的Milvus Operator&#xff0c;实现自…

FMEA-CP-PFD三位一体数字化闭环:汽车部件质量管控的速效引擎

FMEA-CP-PFD三位一体数字化闭环&#xff1a;汽车部件质量管控的速效引擎 全星FMEA软件系统通过​​FMEA&#xff08;失效模式分析&#xff09;、CP&#xff08;控制计划&#xff09;、PFD&#xff08;过程流程图&#xff09;三大工具的一体化协同管理​​&#xff0c;为汽车部件…

VUE2 学习笔记1

目录 VUE特点 文档tips 开发者工具 从一个Hello world开始 hello world Demo 容器和实例的对应关系 差值语法{{}} VUE特点 构建用户界面&#xff1a;可以用来把数据构建成用户界面。 渐进式&#xff1a;自底向上&#xff0c;可以先从一个非常轻量级的框架开始&#xf…

嵌入式学习系统编程(四)进程

目录 一、进程 1.程序和进程 2.进程的八种状态 3. 几个状态 4.关于进程常用命令 二、关于进程的函数 1.fork 2.面问 3.孤儿进程 后台进程 2. exec函数族 (只保留父子关系&#xff0c;做新的事情) strtok函数 三、进程的结束 1.分类 exit和_exit的区别 wait函数…

Linux中添加重定向(Redirection)功能到minishell

前言&#xff1a;在谈论添加minishell之前&#xff0c;我再重谈一下重定向的具体实现等大概思想&#xff01;&#xff01;&#xff01;方便自己回顾&#xff01;&#xff01;&#xff01; 目录 一、重定向&#xff08;Redirection&#xff09;原理详解 1、文件描述符基础 2、…

Django由于数据库版本原因导致数据库迁移失败解决办法

在django开发中&#xff0c;一般我们初始化一个项目之后&#xff0c;创建应用一般就会生成如下的目录&#xff1a;django-admin startproject myproject python manage.py startapp blogmyproject/ ├── manage.py └── myproject/ | ├── __init__.py | ├── se…

C++STL系列之vector

前言 vector是变长数组&#xff0c;有点像数据结构中的顺序表&#xff0c;它和list也是经常被拿出作对比的&#xff0c; vector使用动态分配数组来存储它的元素。当新元素插入时候&#xff0c;这个数组需要被重新分配大小&#xff0c;如果扩容&#xff0c;因为要开一个新数组把…

Functional C++ for Fun Profit

Lambda Conf上有人讲C函数式编程。在Functional Conf 2019上&#xff0c;就有主题为“Lambdas: The Functional Programming Companion of Modern C”的演讲。演讲者介绍了现代C中函数式编程相关内容&#xff0c;讲解了如何使用Lambda表达式编写符合函数式编程原则的C代码&…

Python基础理论与实践:从零到爬虫实战

引言Python如轻舟&#xff0c;载你探寻数据宝藏&#xff01;本文从基础理论&#xff08;变量、循环、函数、模块&#xff09;启航&#xff0c;结合requests和BeautifulSoup实战爬取Quotes to Scrape&#xff0c;适合零基础到进阶者。文章聚焦Python基础&#xff08;变量、循环、…

ThingJS开发从入门到精通:构建三维物联网可视化应用的完整指南

文章目录第一部分&#xff1a;ThingJS基础入门第一章 ThingJS概述与技术架构1.1 ThingJS平台简介1.2 技术架构解析1.3 开发环境配置第二章 基础概念与核心API2.1 核心对象模型2.2 场景创建与管理2.3 对象操作基础第三章 基础开发实战3.1 第一个ThingJS应用3.2 事件系统详解3.3 …

关于list

1、什么是listlist是一个带头结点的双向循环链表模版容器&#xff0c;可以存放任意类型&#xff0c;需要显式定义2、list的使用有了前面学习string和vector的基础&#xff0c;学习和使用list会方便很多&#xff0c;因为大部分的内容依然是高度重合的。与顺序表不同&#xff0c;…

Mysql 查看当前事务锁

在 MySQL 中查看事务锁&#xff08;锁等待、锁持有等&#xff09;&#xff0c;可以使用以下方法&#xff1a; 一、查看当前锁等待情况&#xff08;推荐&#xff09; SELECTr.trx_id AS waiting_trx_id,r.trx_mysql_thread_id AS waiting_thread,r.trx_query AS waiting_query,b…

【Keil5-map文件】

Keil5-map文件■ map文件■ map文件

k8s 基本架构

基于Kubernetes(K8s)的核心设计&#xff0c;以下是其关键基本概念的详细解析。这些概念构成了K8s容器编排系统的基石&#xff0c;用于自动化部署、扩展和管理容器化应用。### 一、K8s核心概念概览 K8s的核心对象围绕容器生命周期管理、资源调度和服务发现展开&#xff0c;主要包…

Bell不等式赋能机器学习:微算法科技MLGO一种基于量子纠缠的监督量子分类器训练算法技术

近年来&#xff0c;量子计算&#xff08;Quantum Computing&#xff09; 和 机器学习&#xff08;Machine Learning&#xff09; 的融合成为人工智能和计算科学领域的重要研究方向。随着经典计算机在某些复杂任务上接近计算极限&#xff0c;研究人员开始探索量子计算的独特优势…

Edge浏览器设置网页自动翻译

一.浏览网页自动翻译设置->扩展->获取Microsoft Edge扩展->搜索“沉浸式翻译”->获取 。提示&#xff1a;如果采用其他的翻译扩展没找自动翻译功能&#xff0c;所以这里选择“沉浸式翻译”二.基于Java WebElement时自动翻译Java关键代码&#xff1a;提示&#xff1…

TCP/UDP协议深度解析(四):TCP的粘包问题以及异常情况处理

&#x1f50d; 开发者资源导航 &#x1f50d;&#x1f3f7;️ 博客主页&#xff1a; 个人主页&#x1f4da; 专栏订阅&#xff1a; JavaEE全栈专栏 本系列往期内容~ TCP/UDP协议深度解析&#xff08;一&#xff09;&#xff1a;UDP特性与TCP确认应答以及重传机制 TCP/UDP协议深…

R 基础语法

R 基础语法 R 语言是一种针对统计计算和图形表示而设计的编程语言&#xff0c;广泛应用于数据分析、统计学习、生物信息学等领域。本文将为您介绍 R 语言的基础语法&#xff0c;帮助您快速入门。 1. R 语言环境搭建 在开始学习 R 语言之前&#xff0c;您需要安装并配置 R 语言环…