Spark 简介
Apache Spark 是一个开源的分布式计算框架,专为大规模数据处理而设计。它通过内存计算和优化的执行引擎显著提升了数据处理速度,适用于批处理、实时流处理、机器学习和图计算等场景。
核心特性
高性能:利用内存计算(In-Memory Processing)减少磁盘 I/O,比传统 MapReduce 快数十倍。
易用性:支持 Java、Scala、Python(PySpark)和 R 等多种语言,提供高级 API(如 DataFrame、SQL)。
Apache Spark包含4个核心大模块,SparkSQL,Spark流处理,Spark机器学习,Graphx图
集成多个库(Spark SQL、MLlib、GraphX、Spark Streaming),覆盖数据分析全流程。
容错性:基于弹性分布式数据集(RDD)实现数据自动恢复,保障任务稳定性。
主要组件
Spark Core:提供任务调度、内存管理和分布式任务执行基础功能。
Spark SQL:支持结构化数据处理,兼容 Hive、JSON、Parquet 等数据源。
Spark Streaming:实时流处理,支持 Kafka、Flume 等数据源接入。
MLlib:内置机器学习算法库(分类、回归、聚类等)。
GraphX:图计算库,支持 PageRank、连通性分析等算法。
应用场景
- 批量数据处理:ETL(数据提取、转换、加载)、日志分析。
- 实时分析:监控系统、欺诈检测。
- 机器学习:推荐系统、预测模型训练。
- 图计算:社交网络分析、路径规划。
示例代码(PySpark 计算词频):
from pyspark.sql import SparkSessionspark = SparkSession.builder.appName("WordCount").getOrCreate()
text = spark.read.text("input.txt")
words = text.selectExpr("explode(split(value, ' ')) as word")
result = words.groupBy("word").count()
result.show()
Spark SQL示例
以下是一些实用的Spark SQL示例,涵盖基础查询、聚合、窗口函数、UDF等常见操作,适用于Spark 3.0+版本。示例基于DataFrame API和SQL语法。
基础查询
创建DataFrame并注册临时表
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("examples").getOrCreate()# 示例数据
data = [("Alice", 34), ("Bob", 45), ("Cathy", 29)]
df = spark.createDataFrame(data, ["name", "age"])
df.createOrReplaceTempView("people")
查询所有列
df.select("*").show()
# 或SQL语法
spark.sql("SELECT * FROM people").show()
条件过滤
df.filter(df.age > 30).show()
# SQL语法
spark.sql("SELECT * FROM people WHERE age > 30").show()
聚合操作
分组统计
df.groupBy("name").agg({"age": "max"}).show()
# SQL语法
spark.sql("SELECT name, MAX(age) FROM people GROUP BY name").show()
多列聚合
from pyspark.sql import functions as F
df.agg(F.min("age"), F.max("age")).show()
窗口函数
计算排名
from pyspark.sql.window import Window
window = Window.orderBy(F.desc("age"))
df.withColumn("rank", F.rank().over(window)).show()
分组内排序
window = Window.partitionBy("department").orderBy("salary")
df.withColumn("row_num", F.row_number().over(window)).show()
复杂操作
JSON数据处理
json_data = spark.read.json("path/to/json/file")
json_data.select("field1", "field2.nested").show()
UDF自定义函数
from pyspark.sql.types import StringType
def reverse_str(s):return s[::-1]
reverse_udf = F.udf(reverse_str, StringType())
df.withColumn("reversed_name", reverse_udf("name")).show()
性能优化
缓存表
spark.sql("CACHE TABLE people")
分区裁剪
df.write.partitionBy("date").parquet("output_path")
spark.read.parquet("output_path").filter("date = '2023-01-01'").show()
完整示例脚本
以下是一个包含示例的脚本链接,可直接运行:
- GitHub Gist示例
- Databricks Notebook示例
注意:实际运行时需根据数据调整字段名称和路径。
基于C++和Apache Spark的实用示例
以下是一些基于C++和Apache Spark的实用示例,涵盖数据处理、机器学习、流计算等场景。Apache Spark原生支持Scala/Java/Python,但通过RDD API或Spark SQL的C++绑定(如SparkR或第三方库),可以在C++中调用Spark功能。
数据处理示例(RDD操作)
// 示例1: 读取文本文件并统计行数
SparkConf conf("local[2]");
SparkContext sc(conf);
RDD<std::string> lines = sc.textFile("hdfs://path/to/file.txt");
std::cout << "Line count: " << lines.count();
// 示例2: 过滤包含关键字的行
RDD<std::string> filtered = lines.filter([](const std::string& line) {return line.find("error") != std::string::npos;
});
键值对操作
// 示例3: 单词计数
RDD<std::string> words = lines.flatMap([](const std::string& line) {std::vector<std::string> tokens;// 分割字符串逻辑return tokens;
});RDD<std::pair<std::string, int>> counts = words.mapToPair([](const std::string& word) {return std::make_pair(word, 1);}).reduceByKey([](int a, int b) { return a + b; });
数值计算
// 示例4: 计算平均值
RDD<double> data = sc.parallelize({1.0, 2.0, 3.0});
double mean = data.reduce([](double a, double b) { return a + b; }) / data.count();
Spark SQL集成
通过C++连接Spark SQL(需使用JNI或Thrift接口):
// 示例5: 执行SQL查询
SqlContext sqlContext(sc);
DataFrame df = sqlContext.sql("SELECT * FROM table WHERE value > 100");
流处理示例
// 示例6: 套接字流词频统计
StreamingContext ssc(Seconds(1));
ReceiverInputDStream<std::string> lines = ssc.socketTextStream("localhost", 9999);
lines.flatMap(...).countByValue().print();
ssc.start();
ssc.awaitTermination();
机器学习(MLlib)
通过C++调用Spark的MLlib(需封装Java/Scala API):
// 示例7: 线性回归训练
LinearRegressionModel model = LinearRegressionWithSGD::train(trainingData, // RDD<LabeledPoint>iterations,stepSize
);
图计算(GraphX)
// 示例8: PageRank算法
Graph<std::string, double> graph = ... // 构建图
GraphOps::pagerank(graph, tolerance, maxIter);
其他实用场景
- 示例9: 分布式矩阵乘法
- 示例10: JSON/CSV文件解析
- 示例11: 分布式排序
- 示例12: 广播变量使用
- 示例13: 累加器统计
- 示例14: 分区操作优化
- 示例15: 自定义序列化
注意事项
- C++支持限制:Spark官方未提供原生C++ API,需通过以下方式实现:
- 使用JNI调用Java/Scala API
- 使用第三方库如SparkR(R的C++接口)
- 通过Thrift/HTTP协议与Spark集群通信
- 性能建议:避免频繁的C++/Java数据转换,优先使用列式存储格式(Parquet/ORC)。
完整项目示例可参考GitHub仓库(如spark-cpp或spark-jni-wrapper)。
Java Spark Core的任务调
以下是一些基于Java Spark Core的任务调度、内存管理和分布式任务执行的基础功能实例,涵盖常见场景和操作:
初始化SparkContext
SparkConf conf = new SparkConf().setAppName("Example").setMaster("local[*]");
JavaSparkContext sc = new JavaSparkContext(conf);
创建RDD
List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
JavaRDD<Integer> rdd = sc.parallelize(data);
转换操作:Map
JavaRDD<Integer> squared = rdd.map(x -> x * x);
转换操作:Filter
JavaRDD<Integer> evenNumbers = rdd.filter(x -> x % 2 == 0);
转换操作:FlatMap
JavaRDD<String> words = sc.parallelize(Arrays.asList("Hello World", "Hi There"));
JavaRDD<String> flattened = words.flatMap(s -> Arrays.asList(s.split(" ")).iterator());
转换操作:Distinct
JavaRDD<Integer> uniqueNumbers = rdd.distinct();
转换操作:Union
JavaRDD<Integer> anotherRDD = sc.parallelize(Arrays.asList(6, 7, 8));
JavaRDD<Integer> unionRDD = rdd.union(anotherRDD);