SparkStreaming之04:调优

SparkStreaming调优

一 、要点

4.1 SparkStreaming运行原理

在这里插入图片描述

深入理解

在这里插入图片描述

4.2 调优策略

4.2.1 调整BlockReceiver的数量

在这里插入图片描述

案例演示:

object MultiReceiverNetworkWordCount {def main(args: Array[String]) {val sparkConf = new SparkConf().setAppName("NetworkWordCount")val sc = new SparkContext(sparkConf)// Create the context with a 1 second batch sizeval ssc = new StreamingContext(sc, Seconds(5))//创建多个接收器(ReceiverInputDStream),这个接收器接收一台机器上的某个端口通过socket发送过来的数据并处理val lines1 = ssc.socketTextStream("master", 9998, StorageLevel.MEMORY_AND_DISK_SER)val lines2 = ssc.socketTextStream("master", 9997, StorageLevel.MEMORY_AND_DISK_SER)val lines = lines1.union(lines2)lines.repartition(100)//处理的逻辑,就是简单的进行word countval words = lines.repartition(100).flatMap(_.split(" "))val wordCounts = words.map(x => (x, 1)).reduceByKey((a: Int, b: Int) => a + b, new HashPartitioner(10))//将结果输出到控制台wordCounts.print()//启动Streaming处理流ssc.start()//等待Streaming程序终止ssc.awaitTermination()ssc.stop(false)}
}
⭐️4.2.2 调整Block的数量

batchInterval : 触发批处理的时间间隔
blockInterval :将接收到的数据生成Block的时间间隔,spark.streaming.blockInterval(默认是200ms),那么,BlockRDD的分区数 = batchInterval / blockInterval,即一个Block就是RDD的一个分区,就是一个task
比如,batchInterval是2秒,而blockInterval是200ms,那么task数为10,如果task的数量太少,比一个executor的core数还少的话,那么可以减少blockInterval,blockInterval最好不要小于50ms,太小的话导致task数太多,那么launch task的时间久多了

4.2.3 调整Receiver的接受速率

pps:permits per second 每秒允许接受的数据量(QPS -> queries per second)
Spark Streaming默认的PPS是没有限制的,可以通过参数spark.streaming.receiver.maxRate来控制,默认是Long.Maxvalue

⭐️4.2.3 调整数据处理的并行度

BlockRDD的分区数

a. 通过Receiver接受数据的特点决定

b. 也可以自己通过repartition设置

ShuffleRDD的分区数

a. 默认的分区数为spark.default.parallelism(core的大小)

b. 通过我们自己设置决定

val wordCounts = words.map(x => (x, 1)).reduceByKey((a: Int, b: Int) => a + b, new HashPartitioner(10))
4.2.4 数据的序列化

SparkStreaming两种需要序列化的数据:
a. 输入的数据:默认是以StorageLevel.MEMORY_AND_DISK_SER_2的形式存储在executor上的内存中
b. 缓存的数据:默认是以StorageLevel.MEMORY_ONLY_SER的形式存储的内存中
使用Kryo序列化机制,比Java序列化机制性能好

val conf = new SparkConf().setMaster(...).setAppName(...)
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
val sc = new SparkContext(conf)
4.2.5 内存调优
(1)需要内存大小

和transformation的类型有关,如果使用的是updateStateByKey,Window这样的算子,那么内存就要设置得偏大

(2)数据存储级别

如果把接收到的数据设置的存储级别是MEMORY_DISK这种级别,也就是说如果内存不够可以把数据存储到磁盘上,其实性能还是不好的,性能最好的就是所有的数据都在内存里面,所以如果在资源允许的情况下,把内存调大一点,让所有的数据都存在内存里面。

4.2.6 Outout性能
(1)MySQL,HBase

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

(2)Kafka(0.8版本)

虽然现在的Kafka的版本已经到2.x版本了,但是很多公司因为历史遗留的原因,公司里面还是会有0.8x的Kafka。比如本人公司里面有两个Kafka集群,一个是0.8x的kafka,一个是1.x的Kafka。开发的时候有时候需要我们使用SparkStreaming做实时的ETL,然后再把数据打回Kafka,0.8版本的kafka默认是没有批量提交的功能的。本人公司里面一个真实的案例,一位同学写的SparkStreaming程序将数据处理完了以后通过ForeachRDD把数据写回到0.8Kafka。但是数据处理得很慢,经常会收到延时告警。最终发现他把数据写到Kafka的时候是一条数据一条数据提交的性能很差。最终手动实现了批量提交的功能。从此再也没有收到过告警。

4.2.7 Backpressure(压力反馈)

在这里插入图片描述
在这里插入图片描述

Feedback Loop : 动态使得Streaming app从unstable状态回到stable状态

在这里插入图片描述

从Spark1.5版本开始:spark.streaming.backpressure.enabled = true

4.2.8 Elastic Scaling(资源动态分配)

动态分配资源:

批处理动态的决定这个application中需要多少个Executors:

  1. 当一个Executor空闲的时候,将这个Executor杀掉
  2. 当task太多的时候,动态的启动Executors

Streaming分配Executor的原则是比对 process time / batchInterval 的比率

在这里插入图片描述

如果延迟了,那么就自动增加资源

在这里插入图片描述

在这里插入图片描述

从Spark2.0有这个功能:: spark.streaming.dynamicAllocation.enabled = true

⭐️4.2.8 数据倾斜调优(重要)

因为SparkStreaming的底层就是RDD,之前SparkCore的所有的数据倾斜的调优策略(见Spark之数据倾斜调优)都适合于SparkStreaming,需要灵活掌握,在实际开发的工作当中用得频率较高。

二 、总结

面试问题:你在工作当中有SparkStreaming调优过项目吗?怎么调优的?效果怎么样?

  1. 比如举foreachRDD的例子
  2. 比如举个数据倾斜的例子
  3. 用Xmind整理调优的策略

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/web/71601.shtml
繁体地址,请注明出处:http://hk.pswp.cn/web/71601.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

软考初级程序员知识点汇总

以下是计算机技术与软件专业技术资格(水平)考试(简称“软考”)中 程序员(初级) 考试的核心知识点汇总,涵盖考试大纲的主要方向,帮助你系统复习: 一、计算机基础 计算机组…

Gauss数据库omm用户无法连接处理

确保gauss数据库服务已经打开 重启gauss服务 gs_om -t restart 连接gauss gsql -d postgres -p 26000 -r 结果发现 查看数据库运行情况 gs_om -t status --detail 我们可以看到 cluster_state 的值是 Unavailable 不可用 那么问题大概率是出现在了这里 然后我们再查看一…

36-Openwrt wifi命令工具iwconfig、iwinfo、iwpriv、iwlist

增对wifi的调试命令有很多,这边列出我们常用的命令提供参考,方便查看信息定位问题。 1、iwconfig 查看当前 WIFI 的工作信道以及工作带宽模式: root@openwrt:/# iwconfig ra0 ra0 mt7603e ESSID:"openwrt" Mode:Managed Channel:8 Access Point: DC:4B…

Android 低功率蓝牙之BluetoothGattDescriptor详解

BluetoothGattDescriptor 详解 BluetoothGattDescriptor 是 Android 中用于表示蓝牙低功耗(BLE)设备中 GATT(Generic Attribute Profile)描述符 的类。描述符是 GATT 架构中的一种属性,用于提供关于 特征值&#xff0…

计算机毕业设计Python+DeepSeek-R1大模型医疗问答系统 知识图谱健康膳食推荐系统 食谱推荐系统 医疗大数据(源码+LW文档+PPT+讲解)

温馨提示:文末有 CSDN 平台官方提供的学长联系方式的名片! 温馨提示:文末有 CSDN 平台官方提供的学长联系方式的名片! 温馨提示:文末有 CSDN 平台官方提供的学长联系方式的名片! 作者简介:Java领…

数字体验推荐TOP8提升用户参与

数字内容体验推荐核心优势 在数字化竞争日益激烈的市场环境中,数字内容体验的差异化优势已成为企业突围的关键。通过智能算法驱动的个性化推荐系统,能够精准捕捉用户行为轨迹与兴趣偏好,实现内容与受众的动态匹配。这种技术不仅显著提升页面…

【每日学点HarmonyOS Next知识】动图循环播放、监听tab切换、富文本上下滚动、tab默认居中、a标签唤起拨号

1、image加载网络动图播放一遍后不再播放,有什么方法可以 设置循环播放 目前ArkUI不支持gif图片设置轮播次数,可通过三方库ohos-gif-drawable设置轮播次数,在播放一次结束后的回调方法getLoopFinish()中更新播放次数,达到指定次数后设置播放…

redis数据迁移教程(使用RedisShake实现不停机迁移十分便捷)

1.我的场景 需要把本地的redis数据上传到阿里云服务器上面,服务器上redis并没有开aof持久化,但是将rdb文件上传至服务器后每次重启redis,rdb文件会被覆盖导致无法同同步数据,最终决定使用RedisShake 2.RedisShake介绍 什么是 RedisShake​ RedisShake 是一个用于处理和迁移…

C语言_数据结构总结4:不带头结点的单链表

纯C语言代码,不涉及C 0. 结点结构 typedef int ElemType; typedef struct LNode { ElemType data; //数据域 struct LNode* next; //指针域 }LNode, * LinkList; 1. 初始化 不带头结点的初始化,即只需将头指针初始化为NULL即可 void Init…

78.StringBuilder简单示例 C#例子 WPF例子

利用 StringBuilder 提升字符串操作性能 在 C# 中,字符串是不可变的,这意味着每次修改字符串时都会创建一个新的对象。这种特性虽然保证了安全性,但在频繁修改字符串的场景中会导致性能问题。StringBuilder 正是为解决这一问题而设计的。 什…

【数据集】社区天气资讯网络CoWIN-香港小时尺度气象数据(含MATLAB处理代码)

社区天气资讯网络CoWIN-香港小时尺度气象数据 数据概述气象变量说明数据提取(MATLAB全代码)输出WRF所需站点气温数据参考数据概述 官网-Community Weather Information Network (CoWIN) data policy CoWIN 提供 2010 - 2024 年 的数据下载,每年数据均可单独下载。下载数据…

【JAVA架构师成长之路】【Redis】第14集:Redis缓存穿透原理、规避、解决方案

30分钟自学教程:Redis缓存穿透原理与解决方案 目标 理解缓存穿透的成因及危害。掌握布隆过滤器、空值缓存等核心防御技术。能够通过代码实现请求拦截与缓存保护。学会限流降级、异步加载等应急方案。 教程内容 0~2分钟:缓存穿透的定义与核心原因 定义…

尚硅谷爬虫note15

一、当当网 1. 保存数据 数据交给pipelines保存 items中的类名: DemoNddwItem class DemoNddwItem(scrapy.Item): 变量名 类名() book DemoNddwItem(src src, name name, price price)导入: from 项目名.items import 类…

LVGL直接解码png图片的方法

通过把png文件解码为.C文件,再放到工程中的供使用,这种方式随时速度快(应为已经解码,代码中只要直接加载图片数据显示出来即可),但是不够灵活,适用于哪些简单又不经常需要更换UI的场景下使用。如…

【计算机网络】Socket

Socket 是网络通信的核心技术之一,充当应用程序与网络协议栈之间的接口。 1. Socket 定义 Socket(套接字)是操作系统提供的 网络通信抽象层,允许应用程序通过标准接口(如 TCP/IP 或 UDP)进行数据传输。它…

Apache XTable:在数据湖仓一体中推进数据互作性

Apache XTable 通过以多种开放表格式提供对数据的访问,在增强互作性方面迈出了一大步。移动数据很困难,在过去,这意味着在为数据湖仓一体选择开放表格式时,您被锁定在该选择中。一个令人兴奋的项目当在数据堆栈的这一层引入互作性…

anolis8.9-k8s1.32-node-二进制部署

一、系统 # cat /etc/anolis-release Anolis OS release 8.9 # uname -r 5.10.134-18.an8.x86_64 二、从master上拷贝dockers及cri-docker相关文件 # groupadd docker # mkdir /etc/docker# scp -P 4033 root192.168.7.201:/etc/systemd/system/containerd.service /etc/s…

《AJAX:前端异步交互的魔法指南》

什么是AJAX AJAX(Asynchronous JavaScript and XML,异步 JavaScript 和 XML) 是一种用于创建异步网页应用的技术,允许网页在不重新加载整个页面的情况下,与服务器交换数据并局部更新页面内容。尽管名称中包含 XML&…

Python 性能优化:从入门到精通的实用指南

Langchain系列文章目录 01-玩转LangChain:从模型调用到Prompt模板与输出解析的完整指南 02-玩转 LangChain Memory 模块:四种记忆类型详解及应用场景全覆盖 03-全面掌握 LangChain:从核心链条构建到动态任务分配的实战指南 04-玩转 LangChai…

利用 requestrepo 工具验证 XML外部实体注入漏洞

1. 前言 在数字化浪潮席卷的当下,网络安全的重要性愈发凸显。应用程序在便捷生活与工作的同时,也可能暗藏安全风险。XXE(XML外部实体)漏洞作为其中的典型代表,攻击者一旦利用它,便能窃取敏感信息、掌控服务…