生产环境Spark Structured Streaming实时数据处理应用实践分享

封面图片

生产环境Spark Structured Streaming实时数据处理应用实践分享

一、业务场景描述

我们所在的电商平台需要实时监控用户行为数据(如点击、下单、支付等),基于事件级别的流式数据进行实时统计、会话聚合、漏斗分析,并将结果推送到Dashboard和报表存储。原有系统使用的Storm+Kafka方案在高并发时存在容错难、状态管理复杂、维护成本高的问题。

核心需求:

  • 低延迟:端到端处理延迟控制在2秒以内。
  • 可伸缩:能水平扩展,应对峰值10万条/秒消息吞吐。
  • 容错性:任务失败自动重启且保证端到端数据不丢失。
  • 状态管理:支持有状态聚合(窗口、会话)和超大状态存储。

二、技术选型过程

我们对主流实时计算框架进行了对比:

| 框架 | 延迟 | 状态管理 | 易用性 | 扩展性 | 社区成熟度 | | ---- | ---- | ---- | ---- | ---- | ---- | | Apache Storm | 500ms~1s | 需自行实现State Store | 开发复杂 | 高 | 高 | | Apache Flink | 200ms~500ms | 内置强大状态管理 | 编程模型复杂 | 高 | 高 | | Spark Structured Streaming | 1s~2s | 使用Checkpoint and WAL,可容错 | API友好,基于Spark SQL | 高 | 高 | | Apache Kafka Streams | <1s | 基于RocksDB,状态管理受限 | 与Kafka耦合高 | 中 | 中 |

综合考虑团队技术栈和运维成本,我们最终选定Spark Structured Streaming:

  • 与现有Spark Batch集群共用资源。
  • 编程模型统一,SQL/DS/Lambda API支持灵活。
  • Checkpoint与WAL机制简化状态管理,集成HDFS持久化状态。

三、实现方案详解

3.1 项目结构

├── pom.xml
├── src
│   ├── main
│   │   ├── java
│   │   │   └── com.company.streaming
│   │   │       ├── App.java
│   │   │       └── utils
│   │   │           └── KafkaOffsetManager.java
│   │   └── resources
│   │       └── application.conf
└── README.md

3.2 核心配置(application.conf)

spark.app.name=RealTimeUserBehavior
spark.master=yarn
spark.sql.shuffle.partitions=200
spark.streaming.backpressure.enabled=true
spark.checkpoint.dir=hdfs://namenode:8020/app/checkpoints/structured-streaming
kafka.bootstrap.servers=broker1:9092,broker2:9092
kafka.topic.user=topic_user_behavior
kafka.group.id=user_behavior_group

3.3 主入口代码(App.java)

package com.company.streaming;import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.Trigger;public class App {public static void main(String[] args) throws Exception {SparkSession spark = SparkSession.builder().appName("RealTimeUserBehavior").getOrCreate();// 从Kafka读取原始数据Dataset<Row> raw = spark.readStream().format("kafka").option("kafka.bootstrap.servers", spark.sparkContext().getConf().get("kafka.bootstrap.servers")).option("subscribe", spark.sparkContext().getConf().get("kafka.topic.user")).option("startingOffsets", "latest").load();// 解析JSON并选取字段Dataset<Row> userEvents = raw.selectExpr("CAST(value AS STRING) as json").select(org.apache.spark.sql.functions.from_json(org.apache.spark.sql.functions.col("json"),DataSchema.eventSchema()).as("data")).select("data.*");// 实时会话聚合:10分钟无操作认为会话结束Dataset<Row> sessions = userEvents.withWatermark("eventTime", "2 minutes").groupBy(org.apache.spark.sql.functions.window(org.apache.spark.sql.functions.col("eventTime"),"10 minutes", "5 minutes"),org.apache.spark.sql.functions.col("userId")).agg(org.apache.spark.sql.functions.count("eventType").alias("eventCount"),org.apache.spark.sql.functions.min("eventTime").alias("startTime"),org.apache.spark.sql.functions.max("eventTime").alias("endTime"));// 输出到HDFS OR 更新到外部系统sessions.writeStream().outputMode(OutputMode.Update()).trigger(Trigger.ProcessingTime("30 seconds")).option("path", "hdfs://namenode:8020/app/output/user_sessions").option("checkpointLocation", spark.sparkContext().getConf().get("spark.checkpoint.dir") + "/sessions").start().awaitTermination();}
}

3.4 关键工具类(KafkaOffsetManager.java)

package com.company.streaming.utils;// 省略:管理Kafka手动提交offset、读写Zookeeper存储偏移量

四、踩过的坑与解决方案

  1. 状态膨胀导致Checkpoint文件过大:

    • 方案:定期做State TTL清理,结合Spark 3.1.1+的state cleanup策略。
  2. Kafka消费位点重复或丢失:

    • 方案:使用KafkaOffsetManager手动管理,结合幂等写入目标系统保证At-Least-Once语义。
  3. 延迟抖动:

    • 方案:开启backpressure,限制最大并行度,并合理调整Trigger频率。
  4. Driver内存溢出:

    • 方案:提升driver内存,拆分业务流程;或将部分轻量计算迁移至Executors。

五、总结与最佳实践

  • 合理规划Checkpoint和WAL存储目录,避免与业务数据混淆。
  • 利用Spark监控UI实时观察批次时长、shuffle写入、延迟指标。
  • 结合PeriodicStateCleanup+Watermark确保有状态算子状态可控。
  • 抽象共通工具类(KafkaOffsetManager、JSON解析、公用Schema),提高代码可维护性。
  • 复杂业务可拆分成多个流式子作业,下游合并结果,增强可扩展性。

通过以上实践,我们成功将平台数据实时处理延迟稳定在1.2秒左右,作业稳定运行10+节点集群一个季度零故障。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/diannao/97593.shtml
繁体地址,请注明出处:http://hk.pswp.cn/diannao/97593.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

海康相机开发---HCNetSDK

HCNetSDK&#xff08;Hikvision Network Software Development Kit&#xff09;是海康威视专为旗下安防监控设备打造的二次开发工具包&#xff0c;是连接上层应用与海康设备的核心桥梁。其封装了设备底层通信协议&#xff08;包括私有协议与部分标准协议&#xff09;&#xff0…

构建无广告私人图书馆Reader与cpolar让电子书库随身携带

文章目录前言&#xff1a;告别书荒&#xff0c;拯救灵魂的“摸鱼神器”1、关于Reader&#xff1a;小而美的开源在线阅读器2、Docker部署3、简单使用reader和添加书源4.群晖安装Cpolar工具5.创建reader阅读器的公网地址6.配置固定公网地址前言&#xff1a;告别书荒&#xff0c;拯…

amd cpu是x86架构吗

是的&#xff0c;AMD CPU属于x86架构‌&#xff0c;其64位扩展&#xff08;x86-64&#xff09;最初由AMD设计并成为行业标准。‌ ‌AMD与x86架构的关系‌ ‌技术渊源‌&#xff1a;AMD自1976年起通过技术授权成为x86架构的合法制造商&#xff0c;与英特尔共同主导x86市场。2003…

vercel上线资源无法加载

背景&#xff1a;在本地跑开发服务器没问题&#xff0c;但是部署到 vercel 上就有问题上一次出现类似问题是在更新游戏引擎方法后本地可以跑但是上线没有成功&#xff0c;当时是因为 runner.html 是在部署时通过脚本从远端仓库拉取的&#xff0c;所以解决方案&#xff1a;1.更新…

Node.js 的模块化规范是什么?CommonJS 和 ES6 模块有什么区别?

目录 一、为什么需要模块化&#xff1f; 二、Node.js 的模块化规范 三、CommonJS 模块化 1. 基本语法 2. 特点 3. 缺点 四、ES6 模块&#xff08;ESM&#xff09; 1. 基本语法 2. 特点 3. 在 Node.js 中的使用 五、CommonJS 和 ES6 模块的区别 六、实际开发中的选择…

设计模式:代理模式(Proxy Pattern)

文章目录一、代理模式的定义二、实例分析三、示例代码一、代理模式的定义 代理模式是一种结构型设计模式&#xff0c;它为某个对象提供一个代理或占位符&#xff0c;以控制对这个对象的访问。简单来说代理对象在客户端和目标对象之间起到中介作用&#xff0c;客户端并不会直接操…

数据类型序列化-封装

/// <summary> /// 定义泛型接口 /// </summary> /// <typeparam name"T">T</typeparam> public interface ISettingValue<T> {/// <summary>/// value/// </summary>T DoubleValue { get; }/// <summary>/// key//…

PitVis-2023挑战赛:内镜下垂体瘤手术视频中的手术流程识别|文献速递-深度学习人工智能医疗图像

Title题目PitVis-2023 challenge: Workflow recognition in videos of endoscopic pituitary surgeryPitVis-2023挑战赛&#xff1a;内镜下垂体瘤手术视频中的手术流程识别01文献速递介绍内镜视觉挑战赛与PitVis-2023挑战赛背景及核心内容 “内镜视觉&#xff08;EndoVis&#…

2025年8月个人工作生活总结

本文为 2025年8月工作生活总结。研发编码 无处不在的AI 现在很多地方都在推AI&#xff0c;广西的人工智能走在前列&#xff0c;要赋能各行各业。至于我&#xff0c;主要就是在写点代码&#xff0c;写点交差的文档。其实现在我已经有点分析哪些代码哪些文字是AI写的了。我工作用…

Dubbo常见面试题

1、默认使用的是什么通信框架&#xff0c;还有别的选择吗? 默认也推荐使用netty框架&#xff0c;还有mina。 2、服务调用是阻塞的吗&#xff1f; 默认是阻塞的&#xff0c;可以异步调用&#xff0c;没有返回值的可以这么做。 3、一般使用什么注册中心&#xff1f;还有别的…

简单的加密算法

// 加密函数&#xff08;32位版本&#xff09; //这里的 data 是ID&#xff0c; dword encrypt(dword data, dword key, int shift) {data ^ key; // 第一步&#xff1a;异或混淆// 循环左移&#xff08;shift范围1-31&#xff09;return (data << sh…

升级的MS9125S USB投屏控制芯片(VGAHD输出)

MS9125S是一款USB单芯片投屏器&#xff0c;内部集成了USB 2.0控制器和数据收发模块、视频DAC、HD接口和音视频处理模块&#xff0c;支持压缩视频传输。MS9125S可以通过USB接口显示或者扩展PC、智能手机、平板电脑的显示信息到更大尺寸的显示设备上&#xff0c;支持VGA和HD视频接…

求欧拉回路:Hierholzer算法图解模拟

代码模板&#xff1a;List<Integer> resultList new ArrayList<>();List<Integer> hierholzer() {dfs(0);resultList.add(0);// 数组反转Collections.reverse(resultList);return resultList; }void dfs(int start) {for(int end : G[start]) {if(!vis[star…

Kafka面试精讲 Day 2:Topic、Partition与Replica机制

【Kafka面试精讲 Day 2】Topic、Partition与Replica机制 在“Kafka面试精讲”系列的第二天&#xff0c;我们将深入剖析Kafka最核心的三大数据组织机制&#xff1a;Topic&#xff08;主题&#xff09;、Partition&#xff08;分区&#xff09;与Replica&#xff08;副本&#x…

【备战2025数模国赛】(三)数模常见赛题类型及解决办法

在进行数学建模竞赛时&#xff0c;很多同学面临的第一个挑战是如何对赛题进行归类&#xff0c;并选择合适的模型。本篇梳理了数学建模中最常见的几类赛题&#xff0c;并针对每类题型提供了基本的解决思路&#xff0c;帮助大家快速选择合适的解题方法&#xff0c;高效完成模型构…

LabVIEW测斜设备承压试验台

为保障煤矿井下地质勘探钻孔中测斜装备的可靠运行&#xff0c;设计基于 LabVIEW的钻孔测斜设备承压性能试验台。该试验台以气动增压泵为压力执行元件&#xff0c;结合虚拟仪器与 PLC 控制技术&#xff0c;可精准模拟井下压力环境&#xff0c;完成水压、疲劳等试验&#xff0c;实…

四、练习1:Git基础操作

练习1&#xff1a;Git基础操作 练习目标 通过实际操作掌握Git的基本命令&#xff0c;包括初始化仓库、添加文件、提交更改等。 练习步骤 步骤1&#xff1a;环境准备 确保已安装Git配置用户信息&#xff08;如果未配置&#xff09; # 检查Git版本 git --version# 配置用户信息 g…

RK3399内核驱动实战:获取设备号控制LED的四种方法(由浅入深、代码注释详尽)

RK3399 内核驱动实战&#xff1a;获取设备号控制 LED 的四种方法&#xff08;由浅入深、代码注释详尽&#xff09; 在 Linux 字符设备驱动开发中&#xff0c;设备号&#xff08;major minor&#xff09;是内核与用户空间沟通的桥梁。文章围绕设备号这一条线展开&#xff0c;从…

2025年AI智能体开源技术栈全面解析:从基础框架到垂直应用

2025年&#xff0c;开源AI智能体技术正以前所未有的速度重塑人工智能领域&#xff0c;从单一任务处理到复杂多智能体协作&#xff0c;开源生态已成为技术创新的核心驱动力。一、开源AI智能体生态概述 1.1 技术演进与发展历程 AI智能体技术经历了从规则式智能体&#xff08;2015…

Empire: LupinOne靶场渗透

Empire: LupinOne 来自 <https://www.vulnhub.com/entry/empire-lupinone,750/#top> 1&#xff0c;将两台虚拟机网络连接都改为NAT模式 2&#xff0c;攻击机上做namp局域网扫描发现靶机 nmap -sn 192.168.23.0/24 那么攻击机IP为192.168.23.128&#xff0c;靶场IP192.16…