使用Spark Shell探索RDD
启动并使用Scala Spark Shell
在终端窗口,启动Scala Spark shell:
spark-shell --master local
查看对象:
scala> sc
scala> spark
输入spark.[TAB]
然后可以看到所有可用的方法。
读并显示文本文件
查看文本$DATA_EXERCISE/frostroad.txt
读取本地文件来创建RDD。Spark并没有读文件,直到你执行了action操作,比如统计数据集行数。
尝试执行collect操作来显示RDD的所有数据。
输入mydata.[TAB]
可以看到所有可用的转换操作。
输入exit
退出。
操作代码
[root@master ~]# spark-shell --master localscala> val fr_rdd = sc.textFile("file:/root/dataExercise/frostroad.txt")
fr_rdd: org.apache.spark.rdd.RDD[String] = file:/root/dataExercise/frostroad.txt MapPartitionsRDD[1] at textFile at <console>:24scala> fr_rdd.take(2)
res0: Array[String] = Array(Two roads diverged in a yellow wood,, And sorry I could not travel both)scala> fr_rdd.collect()scala> fr_rdd.collect().foreach(println)scala> fr_rdd.
++ first max take
aggregate flatMap min takeAsync
barrier fold name takeOrdered
cache foreach
使用RDD来转换数据集
探索Web日志文件
日志文件/dw/weblogs
示例:
22.252.151.164 - 23251 [15/Mar/2014:00:00:30 +0100] "GET /KBDOC-00150.html HTTP/1.0" 200 19203 "http://www.loudacre.com" "Loudacre Mobile Browser Sorrento F11L"
22.252.151.164 - 23251 [15/Mar/2014:00:00:30 +0100] "GET /theme.css HTTP/1.0" 200 10684 "http://www.loudacre.com" "Loudacre Mobile Browser Sorrento F11L"
从数据文件创建RDD。
创建只包含请求图片JPG文件的RDD。
使用take查看前10行数据。
在日志中返回每行的长度。
把每一行映射成一个数组,查看前5条。
定义新的RDD,日志文件的每一行只包含IP地址。
最后,保存IP地址列表到/dw/iplist
在终端窗口或Hue文件浏览器,列出/dw/iplist
目录内容。你可以看到多个part-xxxxx文件。查看文件内容确认结果是正确的。
如果需要节约内存,可以停止cloudera服务
service cloudera-scm-server stop
service cloudera-scm-agent stop
scala> val logs = sc.textFile("/dw/weblogs")
logs: org.apache.spark.rdd.RDD[String] = /dw/weblogs MapPartitionsRDD[1] at textFile at <console>:24scala> logs.take(2)
res0: Array[String] = Array(34.28.1.122 - 65255 [01/Mar/2014:23:57:51 +0100] "GET /ifruit_3a_sales.html HTTP/1.0" 200 11416 "http://www.loudacre.com" "Loudacre Mobile Browser Titanic 2300", 34.28.1.122 - 65255 [01/Mar/2014:23:57:51 +0100] "GET /theme.css HTTP/1.0" 200 14933 "http://www.loudacre.com" "Loudacre Mobile Browser Titanic 2300")scala> val jpglogs = logs.filter(line => line.contains(".jpg"))scala> jpglogs.take(2)
res1: Array[String] = Array(34.28.1.122 - 65255 [01/Mar/2014:23:57:51 +0100] "GET /ifruit_3a.jpg HTTP/1.0" 200 12554 "http://www.loudacre.com" "Loudacre Mobile Browser Titanic 2300", 242.13.139.123 - 66694 [01/Mar/2014:23:54:48 +0100] "GET /sorrento_f10l.jpg HTTP/1.0" 200 649 "http://www.loudacre.com" "Loudacre Mobile Browser Sorrento F10L")scala> val logsLen = logs.map(line => line.length)
logsLen: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[3] at map at <console>:25scala> logsLen.take(5)
res2: Array[Int] = Array(161, 150, 148, 154, 160)scala> val logs_split = logs.map(line => line.split(" "))
logs_split: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[4] at map at <console>:25scala> logs_split.take(2)scala> val ip_list = logs_split.map(ar => ar(0))
ip_list: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[5] at map at <console>:25scala> ip_list.take(2)
res5: Array[String] = Array(34.28.1.122, 34.28.1.122)scala> ip_list.saveAsTextFile("/dw/iplist")
使用Spark处理数据文件
检查数据
检查$DATA_EXERCISE/activations
里的数据,每个XML文件包含了客户在指定月份活跃的设备数据。
拷贝数据到HDFS的/dw
目录
样本数据示例:
<activations><activation timestamp="1225499258" type="phone"><account-number>316</account-number><device-id>d61b6971-33e1-42f0-bb15-aa2ae3cd8680</device-id><phone-number>5108307062</phone-number><model>iFruit 1</model></activation>…
</activations>
处理文件
读取XML文件并抽取账户号和设备型号,把结果保存到/dw/account-models
,格式为account_number:model
。
输出示例:
1234:iFruit 1
987:Sorrento F00L
4566:iFruit 1
…
提供了解析XML的函数如下:
// Stub code to copy into Spark Shellimport scala.xml._// Given a string containing XML, parse the string, and
// return an iterator of activation XML records (Nodes) contained in the stringdef getActivations(xmlstring: String): Iterator[Node] = {val nodes = XML.loadString(xmlstring) \\ "activation"nodes.toIterator
}// Given an activation record (XML Node), return the model name
def getModel(activation: Node): String = {(activation \ "model").text
}// Given an activation record (XML Node), return the account number
def getAccount(activation: Node): String = {(activation \ "account-number").text
}
操作命令
上传文件 hdfs dfs -put $DATA_EXERCISE/activations /dw
scala> val xmls = sc.wholeTextFiles("/dw/activations")
xmls: org.apache.spark.rdd.RDD[(String, String)] = /dw/activations MapPartitionsRDD[12] at wholeTextFiles at <console>:27scala> xmls.take(1)
res9: Array[(String, String)] =
Array((hdfs://master:8020/dw/activations/2013-01.xml,<activations><activation timestamp="1359697709" type="phone"><account-number>97349</account-number><device-id>e17ff6a8-0899-4a87-972b-30230ebfa6b9</device-id><phone-number>4247767545</phone-number><model>iFruit 4</model></activation><activation timestamp="1359697637" type="phone"><account-number>97068</account-number><device-id>49beb012-d410-40c8-84b6-a5753b68c607</device-id><phone-number>5592763034</phone-number><model>Ronin S1</model></activation><activation timestamp="1359696681" type="phone"><account-number>82601</account-number><device-id>ed58b95d-a7f3-4333-be06-d53890ef1a08</device-id><phone-number>503470...
scala> scala> val xmls_flat = xmls.flatMap(pair => getActivations(pair._2))
xmls_flat: org.apache.spark.rdd.RDD[scala.xml.Node] = MapPartitionsRDD[13] at flatMap at <console>:30scala> xmls_flat.take(1)
res10: Array[scala.xml.Node] =
Array(<activation type="phone" timestamp="1359697709"><account-number>97349</account-number><device-id>e17ff6a8-0899-4a87-972b-30230ebfa6b9</device-id><phone-number>4247767545</phone-number><model>iFruit 4</model></activation>)scala> val acc_model = xmls_flat.map(act => getAccount(act)+":"+getModel(act))
acc_model: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[14] at map at <console>:32scala> acc_model.take(1)
res11: Array[String] = Array(97349:iFruit 4)scala> acc_model.saveAsTextFile("/dw/account-models")
使用Pair RDD来连接2个数据集
探索Web日志文件
使用map-reduce,统计每个用户的请求。
-
使用map创建Pair RDD,User ID作为key,整数1作为value(user ID是每行的第三个字段)
-
汇总每个用户的value
使用countByKey来确定对不同的频率有多少用户访问了网站。即有多少用户访问了1次、两次或者三次等等
- 使用map来倒转key和value,类似于:
(userID, count) => (count, userID)
- 使用countByKey来返回(频率:用户数)键值对的Map
创建一个RDD,用户id为key,用户访问的ip地址作为value。 - 提示:Map为(userid,ipaddress)并使用groupByKey。
scala> val logs = sc.textFile("/dw/weblogs")
logs: org.apache.spark.rdd.RDD[String] = /dw/weblogs MapPartitionsRDD[17] at textFile at <console>:27scala> val user_reqs = logs.map(line => line.split(" ")).map(ar => (ar(2),1)).reduceByKey((v1,v2) => v1+v2)
user_reqs: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[23] at reduceByKey at <console>:28scala> user_reqs.take(2)
res15: Array[(String, Int)] = Array((92694,4), (49368,4)) scala> user_reqs.map(pair => (pair._2, pair._1)).countByKey()
res18: scala.collection.Map[Int,Long] = Map(138 -> 6, 170 -> 1, 5 -> 19, 120 -> 6, 10 -> 228, 142 -> 9, 14 -> 20, 110 -> 2, 152 -> 6, 164 -> 1, 106 -> 3, 132 -> 10, 116 -> 10,
…………)scala> val user_ips = logs.map(line => line.split(" ")).map(ar => (ar(2),ar(0))).groupByKey()
user_ips: org.apache.spark.rdd.RDD[(String, Iterable[String])] = ShuffledRDD[32] at groupByKey at <console>:28scala> user_ips.take(2)
res20: Array[(String, Iterable[String])] = Array((92694,CompactBuffer(84.206.178.154, 84.206.178.154, 84.206.178.154, 84.206.178.154)), (49368,CompactBuffer(34.179.95.142, 34.179.95.142, 34.179.95.142, 34.179.95.142))) scala> for (pair <- user_ips.take(2)){| println("userid:"+pair._1)| for (ip <- pair._2) println("-- ip:"+ip)| }
userid:92694
-- ip:84.206.178.154
-- ip:84.206.178.154
-- ip:84.206.178.154
-- ip:84.206.178.154
userid:49368
-- ip:34.179.95.142
-- ip:34.179.95.142
-- ip:34.179.95.142
-- ip:34.179.95.142
解析:
map(line => line.split(" ")) - 将每行日志按空格分割成数组
map(ar => (ar(2),1)) - 提取第三个字段(索引2)作为用户ID,创建键值对(userID, 1)格式假设为:[IP地址] [其他字段] [用户ID] [其他字段]...
reduceByKey((v1,v2) => v1+v2) - 对相同用户ID的值进行累加
结果得到(userID, 总请求次数)的Pair RDD示例输出:
Array((92694,4), (49368,4))3. 统计访问频率分布
scala
user_reqs.map(pair => (pair._2, pair._1)).countByKey()
详细步骤:
map(pair => (pair._2, pair._1)) - 反转键值对,从(userID, count)变为(count, userID)
countByKey() - 统计每个访问次数(count)有多少不同的用户
返回一个Map,键是访问次数,值是对应的用户数量示例输出:
Map(138 -> 6, 170 -> 1, 5 -> 19, 120 -> 6, ...)表示:
有6个用户访问了138次
有1个用户访问了170次
有19个用户访问了5次
有6个用户访问了120次5. 打印用户IP信息
scala
for (pair <- user_ips.take(2)){println("userid:"+pair._1)for (ip <- pair._2) println("-- ip:"+ip)
}
详细步骤:
user_ips.take(2) - 获取前两个用户的记录
对于每个用户:
打印用户ID
遍历该用户的所有IP地址并打印
连接账户数据和Web日志文件
上传账户数据$DATA_EXERCISE/accounts
到HDFS的/dw/accounts
目录。 第一个字段是用户ID。其他字段包含了账户明细,比如创建日期,姓名等。
// 加载账户数据
val accounts = sc.textFile("/dw/accounts/accounts")// 创建Pair RDD (userid -> 账户信息数组)
val accountsRDD = accounts.map(line => {val fields = line.split(",")(fields(0), fields) // (userid, Array[账户信息])
})// 查看前2条账户数据
accountsRDD.take(2).foreach { case (userid, fields) =>println(s"UserID: $userid, Name: ${fields(3)} ${fields(4)}")
}
用map,将accounts里的line作为input,经过计算得到(fields(0), fields)的输出,其中fields是将每行的line用逗号分割成数组
输入:
accounts 是一个 RDD[String],每行代表一个账户记录
示例输入行:"32438,2012-08-22 20:40:31.0,\N,Violet,Searcy,2601 Twin Oaks Drive..."处理逻辑:
line.split(","):将每行字符串按逗号,分割成一个字符串数组fields
示例结果:Array("32438", "2012-08-22 20:40:31.0", "\N", "Violet", "Searcy", ...)
fields(0):取数组的第一个元素(用户ID)
(fields(0), fields):创建一个键值对(Tuple2)输出:
返回一个新的 Pair RDD,类型为 RDD[(String, Array[String])]
示例输出元素:("32438", Array("32438", "2012-08-22...))
数据转换示例:
输入行:
text
"32438,2012-08-22 20:40:31.0,\N,Violet,Searcy,2601 Twin Oaks Drive..."
经过转换后变成:
scala
("32438", // 用户ID作为keyArray( // 所有字段作为value"32438","2012-08-22 20:40:31.0","\\N","Violet","Searcy","2601 Twin Oaks Drive",...)
)
连接weblog数据和账户数据,得到以user ID为key的数据集。包含了用户账户信息和网页数量。
基于账户数据创建RDD,由key/value-array对(userid, [values…])组成。
// 这是第一步已经创建的RDD
val user_reqs = logs.map(line => line.split(" ")).map(ar => (ar(2), 1)).reduceByKey(_ + _)
第一步:logs.map(line => line.split(" "))
作用:将原始日志的每一行按空格分割成数组输入:line(原始日志行,如 "34.28.1.122 - 65255 [01/Mar/2014:23:57:51 ...")处理:split(" ") 按空格分割输出:字符串数组 Array(34.28.1.122, -, 65255, [01/Mar/2014:23:57:51, ...)关键字段位置:
ar(0):IP地址(如 34.28.1.122)
ar(1):分隔符(-)
ar(2):用户ID(如 65255)← 这是我们需要的字段第二步:.map(ar => (ar(2), 1))
作用:生成 (用户ID, 1) 的键值对输入:上一步的数组 ar处理:取 ar(2)(用户ID)作为 key,固定值 1 作为 value输出:Pair RDD 格式 (String, Int),如 ("65255", 1)为什么是1:每个日志行代表一次访问,用 1 表示一次计数,方便后续聚合。第三步:.reduceByKey(_ + _)
作用:对相同用户ID的计数求和输入:上一步的 (用户ID, 1) 对处理:
reduceByKey 将相同 key(用户ID)的 value 合并
_ + _ 是简写,等价于 (v1, v2) => v1 + v2输出:最终统计结果 (用户ID, 总访问次数),如 ("65255", 42)数据流示例
假设原始日志有3行:text
"1.1.1.1 - 101 ..." // 用户101访问1次
"2.2.2.2 - 102 ..." // 用户102访问1次
"3.3.3.3 - 101 ..." // 用户101再访问1次
分割后:scala
Array("1.1.1.1", "-", "101", ...)
Array("2.2.2.2", "-", "102", ...)
Array("3.3.3.3", "-", "101", ...)
生成计数对:scala
("101", 1)
("102", 1)
("101", 1)
聚合结果:scala
("101", 2) // 用户101总计2次
("102", 1) // 用户102总计1次
连接PairRDD和上一步计算的userid/hitcount键值对数据集
// 连接账户数据和点击量数据
val joinedData = accountsRDD.join(user_reqs)// 查看连接后的数据结构
joinedData.take(2).foreach { case (userid, (accountFields, count)) =>println(s"UserID: $userid, Count: $count, Name: ${accountFields(3)} ${accountFields(4)}")
}
第一步:accountsRDD.join(user_reqs)
作用:基于用户ID关联账户信息和访问次数
输入:
accountsRDD: (String, Array[String]) ← (用户ID, 账户字段数组)
user_reqs: (String, Int) ← (用户ID, 访问次数)处理:
通过 用户ID(key) 内连接(inner join)两个RDD
自动匹配两个RDD中相同的用户ID输出:
新RDD格式:(String, (Array[String], Int))
结构示例:("32438", (Array("32438", "2012-08-22..."), 4))
第二步:joinedData.take(2).foreach
作用:查看前2条连接结果并格式化输出数据解构:
scala
case (userid, (accountFields, count)) =>// userid: 用户ID(String)// accountFields: 账户字段数组(Array[String])// count: 访问次数(Int)
字段索引(根据账户数据格式):
accountFields(3): 用户的名(如 "Violet")
accountFields(4): 用户的姓(如 "Searcy")输出示例:
text
UserID: 32438, Count: 4, Name: Violet Searcy
UserID: 32439, Count: 25, Name: Eunice Myers数据流详解
假设有以下数据:
账户数据(accountsRDD):
scala
("32438", Array("32438", "2012-08-22", "\\N", "Violet", "Searcy", ...))
("32439", Array("32439", "2012-12-15", "...", "Eunice", "Myers", ...))访问次数(user_reqs):
scala
("32438", 4)
("32439", 25)连接后结果(joinedData):
scala
("32438", (Array("32438", "2012-08-22", ..., "Violet", "Searcy"), 4)
)
("32439",(Array("32439", "2012-12-15", ..., "Eunice", "Myers"), 25)
)
完整数据映射图
accountsRDD (用户ID -> 账户详情) user_reqs (用户ID -> 访问次数)
+-----------------------------+ +---------------------+
| "32438" -> [...,Violet,...] | | "32438" -> 4 |
| "32439" -> [...,Eunice,...] | JOIN | "32439" -> 25 |
+-----------------------------+ +---------------------+|v
joinedData (用户ID -> (账户详情, 访问次数))
+--------------------------------------------------+
| "32438" -> ([...,Violet,...], 4) -> "Violet 4" |
| "32439" -> ([...,Eunice,...], 25) -> "Eunice 25" |
+--------------------------------------------------+
显示用户ID,点击量和姓名,比如:
userid1 4 Jack Cheng
userid2 25 John Doe
// 格式化输出:userid, 点击量, 姓名
val formattedResults = joinedData.map { case (userid, (accountFields, count)) => s"$userid $count ${accountFields(3)} ${accountFields(4)}"
}// 显示前10条结果
formattedResults.take(10).foreach(println)
1. 输入数据结构
joinedData 的格式为:
RDD[(String, (Array[String], Int))]
即每个元素是:
(用户ID, (账户字段数组, 访问次数))示例数据:
scala
("32438", (Array("32438", "2012-08-22", "\\N", "Violet", "Searcy", ...), 4)
)
2. map 操作
作用:对 joinedData 的每个元素进行转换模式匹配:
case (userid, (accountFields, count)) 解构嵌套元组:
userid:用户ID(如 "32438")
accountFields:账户字段数组
count:访问次数(如 4)3. 字符串模板
s"$userid $count ${accountFields(3)} ${accountFields(4)}"字段索引:
accountFields(3):名字(如 "Violet")
accountFields(4):姓氏(如 "Searcy")输出格式:
用户ID 访问次数 名 姓
示例:"32438 4 Violet Searcy"
数据转换流程
原始数据 → 提取字段 → 格式化字符串("32438", (Array["32438",...,"Violet","Searcy",...], 4))
→ 提取 "32438"、4、"Violet"、"Searcy"
→ 拼接成 "32438 4 Violet Searcy"
编写和运行Spark应用
编写一个简单的程序来统计web日志文件中JPG请求的数量。文件名将作为参数传递到程序中。
使用Python编写Spark应用
创建CountJPGs.py
文件,实现统计JPG请求的功能。
import sys
from pyspark import SparkContextif __name__ == "__main__":if len(sys.argv) < 2:print >> sys.stderr, "Usage: CountJPGs.py <logfile>"exit(-1)# TODO: 从参数读取日志文件,实现统计JPG请求的功能
import sys
from pyspark import SparkContextif __name__ == "__main__":if len(sys.argv) < 2:print >> sys.stderr, "Usage: CountJPGs.py <logfile>"exit(-1)sc = SparkContext()logfile = sys.argv[1]sc.setLogLevel("WARN")count = sc.textFile(logfile).filter(lambda line: '.jpg' in line).count()print "Number of JPG requests: ", countsc.stop()
运行程序
在终端中运行Python程序:
# 使用spark-submit运行
spark-submit --master local[*] CountJPGs.py /dw/weblogs
运行并查看结果:
测试成功后,程序将输出统计的JPG请求数量。
提交Spark应用到集群
默认,spark-submit
在本地运行应用。在这个部分,运行应用到YARN集群上。
重新运行程序,指定—master yarn
参数
从运行日志中找到应用ID,并运行使用当前ID来查看结果
spark-submit --master yarn --deploy-mode cluster CountJPGs1.py /dw/weblogs
配置Spark应用
使用之前的实验中使用的CountJPGs.py程序
在命令行设置配置选项
重新运行CountJPGs.py程序,指定应用名'Count JPGs'
访问RM UI并注意命令行指定的应用名
在属性文件中设置配置选项
使用文本编辑器,创建$CODE_EXERCISE/myspark.conf
文件,并添加如下配置:
spark.app.name "My Spark App1"
spark.master yarn
spark.executor.memory 600M
使用属性文件myspark.conf重新运行应用
当应用正在运行,查看YARN UI并确认Spark应用名正确的显示为"My Spark App1"
设置日志级别
修改/etc/spark/conf/log4j.properties
编辑log4j.properties
。第一行替换为DEBUG:
log4j.rootCategory=DEBUG, console
重新运行Spark应用。
spark-submit --master local[*] CountJPGs.py /dw/weblogs
注意到输出包含INFO和DEBUG消息,比如
编辑log4j.properties
文件,替换DEBUG为WARN并重新运行。注意只有WARN消息出来。
在Spark应用UI中查看Jobs和Stages
探索基于文件RDD的分区
启动Spark Shell,为了模拟实际中的多节点集群,使用2个线程运行在本地模式
使用Hue或命令行重新查看账户数据集(/dw/accounts/)。注意文件数量
在数据集中基于单个文件创建RDD,比如,/dw/accounts/part-m-00000
。然后调用toDebugString
。在结果RDD中有多少分区?
重复这个流程,但指定最小3个分区:sc.textFile(filename,3)
。RDD是否有3个分区?
最后,基于账户数据集的所有文件创建RDD。比较一下文件数和RDD的分区数?
设置作业
创建accounts RDD,key是账户id,value是姓名
创建userreqs RDD,统计每个用户页面点击总数
通过user ID进行连接,并基于名字、姓和点击总量重构新的RDD
打印出accounthits.toDebugString
的结果并查看输出,基于这个信息,看是否能确定:
- 作业中有多少stage?
- Stage之间的依赖关系?
- 每个stage包含多少tasks?
运行作业
通过浏览器查看Spark应用UI,http://master:4040
在Spark应用UI中检查Job
在Spark UI中,确保选择了Jobs标签。
重新运行shell并通过执行action(saveAsTextFile)来启动作业
重新加载Spark UI Jobs页面。
点击job description来查看stages。
点击stages来查看stage详情。
当作业完成后,返回Jobs标签查看执行的任务的最终统计和作业花费的时间。
持久化RDD
在上一个实验的基础上来完成后面的步骤
统计用户点击量大于5的账户数量
调用accounthits.persist()
来缓存RDD
在浏览器中,查看Spark应用UI并选择Storage标签。现在你已经标记RDD被持久化,但是还没有执行action操作使得它持久化。
在Spark Shell,执行count操作
查看RDD的toDebugString
。注意输出包含了选择的持久化级别。
重新加载Storage标签,注意持久化的RDD显示出来。点击RDD ID查看分区和持久化的明细。
点击executors标签并使用的内存量和可用的工作节点。
scala> val accounthits = joined.filter(pair => pair._2._1 > 5)scala> accounthits.persist()
res0: accounthits.type = MapPartitionsRDD[15] at filter at <console>:25scala> accounthits.count()
res1: Long = 3721scala> accounthits.toDebugString
res2: String =
(15) MapPartitionsRDD[15] at filter at <console>:25 [Memory Deserialized 1x Replicated]| CachedPartitions: 15; MemorySize: 643.5 KB; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B| MapPartitionsRDD[13] at join at <console>:27 [Memory Deserialized 1x Replicated]| MapPartitionsRDD[12] at join at <console>:27 [Memory Deserialized 1x Replicated]| CoGroupedRDD[11] at join at <console>:27 [Memory Deserialized 1x Replicated]| ShuffledRDD[4] at reduceByKey at <console>:25 [Memory Deserialized 1x Replicated]+-(15) MapPartitionsRDD[3] at map at <console>:25 [Memory Deserialized 1x Replicated]| MapPartitionsRDD[2] at map at <console>:25 [Memory Deserialized 1x Replicated]| /dw/weblogs MapPartitionsRDD[1] at textFile at <console>:24 [Memory De...
--清理缓存
scala> accounthits.unpersist()
res3: accounthits.type = MapPartitionsRDD[15] at filter at <console>:25scala> import org.apache.spark.storage.StorageLevel
import org.apache.spark.storage.StorageLevelscala> accounthits.persist(StorageLevel.DISK_ONLY)
res4: accounthits.type = MapPartitionsRDD[15] at filter at <console>:25scala> accounthits.count()
Spark SQL处理Hive表数据
数据探索任务
读取 Hive 表 db_userapp.user_base_info
,并查看表结构和部分数据。
检查新的DataFrame的schema
创建新的DataFrame,选择 user_id
和 user_name
列
将DataFrame转换为Pair RDD,字段为 user_id
和 user_name
列
scala> val df_user = spark.read.table("db_userapp.user_base_info")
scala> df_user.printSchema
root|-- user_id: long (nullable = true)|-- user_name: string (nullable = true)|-- age: integer (nullable = true)|-- gender_id: integer (nullable = true)|-- edu_id: integer (nullable = true)|-- marital_id: integer (nullable = true)|-- property_cert_count: integer (nullable = true)|-- car_count: integer (nullable = true)scala> val df_user2 = df_user.select($"user_id",$"user_name")scala> df_user.select($"age"+10).show(4)
+----------+
|(age + 10)|
+----------+
| 95|
| 36|
| 28|
| 80|
+----------+
only showing top 4 rowsscala> df_user.sort($"age".desc).show(4)
+-------+---------+---+---------+------+----------+-------------------+---------+
|user_id|user_name|age|gender_id|edu_id|marital_id|property_cert_count|car_count|
+-------+---------+---+---------+------+----------+-------------------+---------+
| 27| 宋凌寒| 85| 1| 5| 4| 0| 0|
| 1| 周雅芙| 85| 1| 4| 2| 0| 0|
| 41| 黄冰萍| 85| 1| 6| 3| 4| 0|
| 22| 韩映雪| 85| 2| 3| 3| 0| 0|
+-------+---------+---+---------+------+----------+-------------------+---------+
only showing top 4 rowsscala> df_user.registerTempTable("tbl_user")
warning: there was one deprecation warning; re-run with -deprecation for detailsscala> spark.sql("select user_name,age from tbl_user limit 3").show()
+---------+---+
|user_name|age|
+---------+---+
| 周雅芙| 85|
| 王从梦| 26|
| 孙忆翠| 18|
+---------+---+scala> val rdd_user = df_user.rdd.map(row => (row(0).toString,row(1).toString))
关于代码 val rdd_user = df_user.rdd.map(row => (row(0).toString,row(1).toString)的解析
1. 原始RDD数据结构
scala
Array([1,周雅芙,85,1,4,2,0,0], // 第一行[2,王从梦,26,1,3,1,0,0] // 第二行
)
每行是一个org.apache.spark.sql.Row对象字段顺序对应DataFrame的Schema:
scala
root|-- user_id: long (index 0)|-- user_name: string (index 1)|-- age: integer (index 2)|-- gender_id: integer (index 3)|-- edu_id: integer (index 4)|-- marital_id: integer (index 5)|-- property_cert_count: integer (index 6)|-- car_count: integer (index 7)2. 转换操作详解
原始代码:
scala
val rdd_user = df_user.rdd.map(row => (row(0).toString, row(1).toString)
)
对第一行 [1,周雅芙,85,...] 的处理:
row(0) → 取第0个字段:1 (Long型)
.toString → 转为字符串:"1"
row(1) → 取第1个字段:"周雅芙" (已是String)
最终生成键值对:("1", "周雅芙")对第二行 [2,王从梦,26,...] 的处理:
同理生成:("2", "王从梦")3. 转换后的RDD内容
执行rdd_user.take(2)会得到:
Array(("1", "周雅芙"), ("2", "王从梦")
)
数据变换任务
新增年龄分组字段:根据age
字段,将用户分为“未成年”(<18)、“青年”(18-29)、“中年”(30-49)、“老年”(≥50)四类,新增age_group
字段。
拼接姓名与年龄:将user_name
和age
字段拼接为新字段name_age
,格式如“张三-25”。
筛选有房且有车的用户:筛选property_cert_count
>0且car_count
>0的用户,输出其user_id
、user_name
、property_cert_count
、car_count
。
val df_user3 = df_user.withColumn(
"age_group",
when(col("age") <18, "未成年")
.when(col("age") <30, "青年")
.when(col("age") <50, "中年")
.otherwise("老年")
)
年龄分组代码的解析:
2.1 withColumn 方法
功能:向DataFrame添加新列或替换现有列参数:
第一个参数:新列名(此处为"age_group")
第二个参数:列表达式(此处为when-otherwise条件链)2.2 when 条件表达式
工作方式:类似SQL的CASE WHEN语句
链式调用:多个when可以串联,最后以otherwise结束
执行顺序:从上到下依次判断,第一个满足的条件即返回对应值2.3 col("age") 列引用
指向DataFrame中的age列
所有比较操作都基于该列值3. 条件逻辑分解
条件判断 分组标签 对应年龄段
age < 18 未成年 小于18岁
age between 18 and 29 青年 18-29岁(含)
age between 30 and 49 中年 30-49岁(含)
以上都不满足(otherwise) 老年 50岁及以上4. 执行过程示例
以原始数据中的两行为例:
[1,周雅芙,85,...] // age=85
[2,王从梦,26,...] // age=26
第一行处理:
85 < 18? → 否
85 between 18-29? → 否
85 between 30-49? → 否
执行otherwise → "老年"第二行处理:
26 < 18? → 否
26 between 18-29? → 是 → "青年"
统计分析任务
统计不同学历(edu_id)用户的数量。
统计不同性别(gender_id)用户的平均年龄。
统计拥有房产证数量大于0的用户比例。
将处理结果保存为 Parquet 和 JSON 格式到 HDFS 指定目录。