Spark自定义累加器实现高效WordCount

目录

1. 代码功能概述

2. 代码逐段解析

主程序逻辑

自定义累加器 MyAccumulator

3. Spark累加器原理

累加器的作用

AccumulatorV2 vs AccumulatorV1

累加器执行流程

4. 代码扩展与优化建议

支持多词统计

线程安全优化

使用内置累加器

5. Spark累加器的适用场景

6. 总结


package core.bcimport org.apache.spark.util.AccumulatorV2
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutableobject AccWordCount {def main(args: Array[String]): Unit = {val sparkConf=new SparkConf().setMaster("local").setAppName("AccWordCount")val sc = new SparkContext(sparkConf)val value = sc.makeRDD(List("hello","spark","hello"))//累加器:WordCount//创建累加器对象val wcAcc=new MyAccumulator()//向Spark进行注册sc.register(wcAcc, "wordCountAcc")value.foreach(word=>{//数据的累加(使用累加器)wcAcc.add(word)})//获取累加器结果println(wcAcc.value)sc.stop()}/*** 自定义数据累加器* 1、继承AccumulatorV2。定义泛型*  IN:累加器输入的数据类型*  OUT:返回的数据类型* 2、重写方法*/class MyAccumulator extends AccumulatorV2[String,mutable.Map[String,Long]]{val wcMap = mutable.Map[String, Long]()override def isZero: Boolean = wcMap.isEmpty//判断知否为初始状态override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = new MyAccumulator()//复制一个新的累加器override def reset(): Unit = wcMap.clear()//重置累加器override def add(word: String): Unit ={   //获取累加器需要计算的值val newcount=wcMap.getOrElse(word,0L)+1LwcMap.update(word,newcount)}override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit = {//Driver合并多个累加器val map1=this.wcMapval map2=other.valuemap2.foreach {case (word, count) => {val newCount = map1.getOrElse(word, 0L) + countwcMap.update(word, newCount)}}}override def value: mutable.Map[String, Long] = wcMap //获取累加器结果}
}
1. 代码功能概述

该代码使用Apache Spark实现了一个基于自定义累加器的单词计数(WordCount)程序。通过自定义MyAccumulator类(继承AccumulatorV2),统计RDD中每个单词的出现次数,并利用累加器的分布式聚合特性将结果汇总到驱动程序。


2. 代码逐段解析
主程序逻辑
object AccWordCount {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local").setAppName("AccWordCount")val sc = new SparkContext(sparkConf)val value = sc.makeRDD(List("hello", "spark", "hello"))// 创建并注册累加器val wcAcc = new MyAccumulator()sc.register(wcAcc, "wordCountAcc")// 遍历RDD,累加单词value.foreach(word => wcAcc.add(word))// 输出结果println(wcAcc.value) // 预期输出:Map(hello -> 2, spark -> 1)sc.stop()}
}
  • RDD创建sc.makeRDD生成包含3个单词的RDD。
  • 累加器注册MyAccumulator实例通过sc.register注册到SparkContext,名称为wordCountAcc
  • 累加操作foreach遍历RDD中的每个单词,调用wcAcc.add(word)累加计数。
  • 结果获取wcAcc.value返回最终的单词计数Map。

自定义累加器 MyAccumulator
class MyAccumulator extends AccumulatorV2[String, mutable.Map[String, Long]] {val wcMap = mutable.Map[String, Long]()override def isZero: Boolean = wcMap.isEmptyoverride def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = new MyAccumulator()override def reset(): Unit = wcMap.clear()override def add(word: String): Unit = {val newCount = wcMap.getOrElse(word, 0L) + 1LwcMap.update(word, newCount)}override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit = {val map1 = this.wcMapval map2 = other.valuemap2.foreach { case (word, count) =>val newCount = map1.getOrElse(word, 0L) + countwcMap.update(word, newCount)}}override def value: mutable.Map[String, Long] = wcMap
}
  • 核心字段wcMap用于存储单词及其计数。
  • 关键方法
    • isZero:判断累加器是否为空(初始状态)。
    • copy:创建累加器的副本(用于任务节点本地计算)。
    • reset:清空累加器状态。
    • add:累加单个单词的计数。
    • merge:合并其他累加器的统计结果(分布式汇总)。
    • value:返回最终结果。

3. Spark累加器原理
累加器的作用
  • 分布式聚合:在多个任务节点上独立计算局部结果,最后汇总到驱动程序。
  • 高效通信:避免频繁的Shuffle操作,减少网络开销。
  • 线程安全:Spark保证每个任务节点内的累加器操作是串行的。
AccumulatorV2 vs AccumulatorV1
  • AccumulatorV1:仅支持简单数据类型(如LongDouble),适用于计数、求和等场景。
  • AccumulatorV2:支持复杂数据类型(如Map、List),需自定义addmerge方法,适用于更灵活的聚合需求(如WordCount)。
累加器执行流程
  1. 任务节点本地计算:每个任务节点维护累加器的本地副本,通过add方法累加数据。
  2. 结果汇总:任务完成后,Spark将各节点的累加器副本发送到驱动程序,调用merge方法合并结果。
  3. 驱动程序获取结果:通过value方法获取全局聚合结果。

4. 代码扩展与优化建议
支持多词统计

当前代码统计单次出现的单词,若需统计多个单词(如键值对),可修改add方法:

override def add(input: String): Unit = {val words = input.split("\\s+") // 按空格分割多词words.foreach(word => {val newCount = wcMap.getOrElse(word, 0L) + 1LwcMap.update(word, newCount)})
}

线程安全优化

add方法可能被多线程并发调用(如在复杂算子中),需添加同步锁:

override def add(word: String): Unit = this.synchronized {val newCount = wcMap.getOrElse(word, 0L) + 1LwcMap.update(word, newCount)
}
使用内置累加器

对于简单场景(如全局计数),可直接使用Spark内置的LongAccumulator

val countAcc = sc.longAccumulator("countAcc")
value.foreach(_ => countAcc.add(1))
println(countAcc.value) // 输出总记录数

5. Spark累加器的适用场景
  • 全局计数:统计任务处理的总记录数、错误数等。
  • 分组统计:如WordCount、用户行为分类统计。
  • 指标监控:实时计算平均值、最大值等(需结合自定义逻辑)。
  • 调试与日志:在不中断作业的情况下收集分布式运行状态。

6. 总结

该代码通过自定义AccumulatorV2实现了分布式单词计数,展示了累加器的核心原理:任务节点本地计算 + 驱动程序全局汇总。通过合理设计addmerge方法,累加器可支持复杂聚合逻辑,是Spark中高效的分布式统计工具。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/news/920892.shtml
繁体地址,请注明出处:http://hk.pswp.cn/news/920892.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

开源 | 推荐一套企业级开源AI人工智能训练推理平台(数算岛):完整代码包含多租户、分布式训练、模型市场、多框架支持、边缘端适配、云边协同协议:

🔥 Github 主仓库(优先更新)https://github.com/roinli/SSD-GPU-POOL | Gitee 镜像仓库 > 原仓库因故暂停使用,本仓库为镜像项目。开源版本将持续迭代优化,欢迎提交 Issue 或加入社群交流。 GPU 池化平台 | AI 全…

pprint:美观打印数据结构

文章目录一、pprint.pprint():美观化打印二、pprint.pformat():格式化成字符串表示三、pprint() 处理包含__repr__() 方法的类四、递归引用:Recursion on {typename} with id{number}五、depth 参数控制 pprint() 方法的输出深度六、width 参…

解决Docker运行hello-world镜像报错问题

解决Docker运行hello-world镜像报错问题当您运行sudo docker run hello-world命令时出现"Unable to find image hello-world locally"和"context deadline exceeded"错误,这通常是由于Docker无法从默认镜像仓库下载镜像导致的。以下是几种解决方…

一体化步进伺服电机在汽车线束焊接设备中的应用案例

在汽车制造领域,线束焊接是确保电气系统可靠性的关键工艺。为解决传统焊接设备限位精度不足、运行稳定性差等问题,采用‌STM42系列一体化步进伺服电机‌,通过位置模式与原点回归功能的优化配置,显著提升了焊接设备的定位精度与抗干…

【Django】首次创建Django项目初始化

1. 创建虚拟环境例如创建虚拟环境为rebortpython3.6 -m venv test/rebort2. 安装Djangosudo -i cd test/rebort/bin/ source ./activate pip install Django如果是在wingows上安装,同时适用默认安装会,会在python的安装目前下生成了两个文件在lib目录下会…

Spark引擎中RDD的性质

RDD(Resilient Distributed Dataset,弹性分布式数据集)是SparkCore提供的核心抽象。一个RDD在逻辑上抽象地代表了一个HDFS文件或数据库中的表,但RDD是被分区的,每个分区分布在不同的节点上,从而并行执行。 …

人工智能学习:什么是NLP自然语言处理

一、什么是自然语言处理 自然语言处理(Natural Language Processing, 简称NLP)是计算机科学与语言学中关注于计算机与人类语言间转换的领域,主要目标是让机器能够理解和生成自然语言,这样人们可以通过语言与计算机进行更自然的互动。 对于自然语言来说,处理的数据…

【Selenium】UI自动化测试框架设计:从项目结构到Base-Page层的最佳实践

UI自动化测试框架设计:从项目结构到Base-Page层的最佳实践全面解析UI自动化测试项目的架构设计与实现细节,构建可维护的测试框架在现代软件开发中,UI自动化测试已成为确保产品质量的重要环节。一个良好的项目结构和合理的设计模式能够显著提高…

QT项目文件(.pro)指南

概述Qt项目文件(.pro文件)是Qt开发的核心配置文件,它使用qmake工具来管理项目的构建过程。一个良好组织的.pro文件不仅能确保项目正确编译,还能大大提高代码的可维护性和团队协作效率。本文将深入探讨.pro文件的结构、语法和最佳实…

Scikit-learn Python机器学习 - 机器学习开发流程

锋哥原创的Scikit-learn Python机器学习视频教程: 2026版 Scikit-learn Python机器学习 视频教程(无废话版) 玩命更新中~_哔哩哔哩_bilibili 课程介绍 本课程主要讲解基于Scikit-learn的Python机器学习知识,包括机器学习概述,特征工程(数据…

基于Spring Cloud Sleuth与Zipkin的分布式链路追踪实战指南

基于Spring Cloud Sleuth与Zipkin的分布式链路追踪实战指南 随着微服务架构的普及,服务间调用链条变得越来越复杂。在生产环境中,定位跨服务调用的性能瓶颈、故障根因,往往需要分布式链路追踪能力。本文结合Spring Cloud Sleuth与Zipkin&…

Coze源码分析-工作空间-项目开发-后端源码

前言 本文将深入分析Coze Studio项目中用户登录后点击"项目开发"功能的后端实现,通过源码解读来理解整个智能体项目管理系统的架构设计和技术实现。 项目架构概览 整体架构设计 Coze Studio后端采用了经典的分层架构模式,将项目开发功能划分为…

单片机元件学习

DS18B20温度传感器51(stc8h8k64u)简介ds18B20是使用单总线的元器件代码/*--------------------------------------------------------------------- */ /* ------------------------ For STC8H MCU ----------------------------- */ /* --- Web: www.…

Spring事务管理策略对比与性能优化实践指南

Spring事务管理策略对比与性能优化实践指南 问题背景介绍 在现代企业级应用中,事务管理是保障数据一致性与安全性的核心机制。Spring作为主流的Java企业级开发框架,提供了多种事务管理方案,包括编程式事务、声明式事务以及与第三方分布式事务…

C++“类吸血鬼幸存者”游戏制作的要点学习

古之学者必有师,对于技术的提升,只靠自己的摸索虽然能得到深刻的经验,但往往没有较高的效率。笔者这些天学习了BV1eM4m1S74K“提瓦特幸存者”的C开发,也是实现了该类型游戏的开发。今天,就通过经验总结,亲手…

Python OpenCV图像处理与深度学习:Python OpenCV图像分割入门

图像分割:从基础到实践 学习目标 通过本课程,学员们将了解图像分割的基本概念,掌握使用OpenCV实现图像分割的方法,包括基于阈值的分割和基于区域的分割技术。同时,学员将能够独立完成简单的图像分割任务,并…

MQ使用场景分析

异步解耦‌系统间通过消息队列通信,降低耦合度(如订单系统与库存系统)典型场景:电商下单后异步通知物流系统‌流量削峰‌应对突发流量,将请求暂存到消息队列逐步处理典型场景:秒杀活动时缓冲高并发请求‌数…

人工智能学习:NLP文本处理的基本方法

一、分词 1、分词介绍 概念 分词就是将连续的字序列按照一定的规范重新组合成词序列的过程。在英文的行文中,单词之间是以空格作为自然分界符的,而中文只是字、句和段能通过明显的分界符来简单划界,唯独词没有一个形式上的分界符。分词过程就是找到这样分界符的过程…

Vue3 中 Proxy 在组件封装中的妙用

目录 Vue3 中 Proxy 在组件封装中的妙用:让组件交互更优雅 组件封装中的常见痛点 Proxy 是什么? Proxy 在组件封装中的应用 基础组件结构 使用 Proxy 实现方法透传 代码解析 父组件中的使用方式 Proxy 的其他应用场景 1. 权限控制 2. 方法调用…

DevExpress WinForms中文教程:Data Grid - 过滤编辑器

DevExpress WinForms拥有180组件和UI库,能为Windows Forms平台创建具有影响力的业务解决方案。DevExpress WinForms能完美构建流畅、美观且易于使用的应用程序,无论是Office风格的界面,还是分析处理大批量的业务数据,它都能轻松胜…