在 PySpark 中解锁窗口函数的力量,实现高级数据转换

本篇文章Mastering PySpark Window Functions: A Practical Guide to Time-Based Analytics适合数据分析和工程师入门了解PySpark的窗口函数。文章的亮点在于详细介绍了窗口函数的基本概念及其在销售数据分析中的实际应用,帮助读者理解如何进行复杂的数据计算而无需多次连接或聚合。


文章目录

  • 1 理解窗口函数:基础
  • 2 搭建分析管道
  • 3 客户级别聚合:理解历史模式
  • 4 滚动窗口:捕捉时间趋势
  • 5 关键概念解释
  • 6 月度滞后特征:季节性模式分析
  • 7 性能优化技巧
  • 8 行业级别基准
  • 9 结论


PySpark logo image

窗口函数是 Apache Spark 中最强大但却未被充分利用的功能之一。它们允许您对与当前行相关的行执行复杂的计算,而无需昂贵的连接或多次聚合。在这篇文章中,我们将通过一个销售分析场景来探讨窗口函数的实际应用。

1 理解窗口函数:基础

可以将窗口函数视为一种在处理每个单独行时“窥视”相邻行的方式。与将多行合并为一行的常规聚合不同,窗口函数会为每个输入行返回一个结果,同时考虑一个相关行的“窗口”。

窗口函数的基本组成包括:

  • Partition By(按分区):将行分组到逻辑分区中
  • Order By(按排序):定义每个分区内的排序
  • Frame(框架):指定分区内要包含在计算中的行

2 搭建分析管道

让我们从一个包含交易记录的销售数据集开始。我们将使用各种窗口函数技术来构建预测支付延迟的特征。

from pyspark.sql import functions as F
from pyspark.sql.window import WindowsalesDF = salesDF.withColumn('transaction_day', F.dayofmonth(F.col('transaction_date')))
salesDF = salesDF.withColumn('transaction_month', F.month(F.col('transaction_date')))
salesDF = salesDF.withColumn('transaction_year', F.year(F.col('transaction_date')))
salesDF = salesDF.withColumn('day_of_week', F.dayofweek(F.col('transaction_date')) - 1)
salesDF = salesDF.withColumn('payment_day', F.dayofmonth(F.col('payment_due_date')))
salesDF = salesDF.withColumn('payment_month', F.month(F.col('payment_due_date')))
salesDF = salesDF.withColumn('payment_year', F.year(F.col('payment_due_date')))
salesDF = salesDF.withColumn('payment_day_of_week', F.dayofweek(F.col('payment_due_date')) - 1)

3 客户级别聚合:理解历史模式

在深入了解窗口函数之前,我们通常需要客户级别的统计数据。这些数据为理解当前行为是典型还是异常提供了背景。

salesDF = salesDF.join(salesDF.groupBy('client_id', 'transaction_type').agg(F.mean('delay_days').alias('client_delay_average'),F.expr('sum(delay_days * invoice_amount) / sum(invoice_amount)').alias('client_delay_weighted_avg'),F.stddev('delay_days').alias('client_delay_stddev'),F.sqrt(F.try_divide(F.sum(F.col('invoice_amount') * F.pow(F.col('delay_days'), 2)),F.sum('invoice_amount')) -F.pow(F.try_divide(F.sum(F.col('invoice_amount') * F.col('delay_days')),F.sum('invoice_amount')), 2)).alias('client_delay_weighted_stddev'),F.expr('percentile_approx(delay_days, 0.5)').alias('client_delay_median'),F.count('delay_days').alias('client_transaction_count')),on=['client_id', 'transaction_type'],how='left'
)

加权标准差的计算可能看起来很复杂,但它使用的是数学公式:E[X2]−(E[X])2\sqrt{E[X^2] - (E[X])^2}E[X2](E[X])2,其中较大的交易对标准差计算的影响更大。

4 滚动窗口:捕捉时间趋势

这就是窗口函数真正发挥作用的地方。滚动窗口允许我们计算滑动时间段内的指标,捕捉客户行为中的趋势和季节性。

time_windows = [30, 90, 365]for days in time_windows:rolling_window = (Window.partitionBy('client_id', 'transaction_type').orderBy(F.col('transaction_date').cast("timestamp").cast("long")).rangeBetween(-days * 86400, -1))salesDF = salesDF.withColumn(f'delay_rolling_avg_{days}d',F.avg('delay_days').over(rolling_window))salesDF = salesDF.withColumn(f'delay_rolling_std_{days}d',F.stddev('delay_days').over(rolling_window))salesDF = salesDF.withColumn(f'delay_rolling_weighted_avg_{days}d',F.try_divide(F.sum(F.col('delay_days') * F.col('invoice_amount')).over(rolling_window),F.sum(F.col('invoice_amount')).over(rolling_window)))salesDF = salesDF.withColumn(f'delay_rolling_weighted_std_{days}d',F.sqrt(F.try_divide(F.sum(F.col('invoice_amount') * F.pow(F.col('delay_days'), 2)).over(rolling_window),F.sum(F.col('invoice_amount')).over(rolling_window)) -F.pow(F.try_divide(F.sum(F.col('invoice_amount') * F.col('delay_days')).over(rolling_window),F.sum(F.col('invoice_amount')).over(rolling_window)), 2)))

5 关键概念解释

Range(范围)与 Rows(行)窗口:我们使用 rangeBetween(-days * 86400, -1) 而不是 rowsBetween(),因为我们想要一个基于时间的窗口。这确保我们能够精确地捕获指定天数的数据,而与交易频率无关。

加权计算:通过按发票金额对指标进行加权,我们赋予了较大交易更高的重要性,这通常能更好地代表客户的支付行为。

排除当前行:将 -1 作为上限可以排除当前交易,从而防止预测模型中的数据泄露。

6 月度滞后特征:季节性模式分析

为了进行长期趋势分析,我们可以创建月度聚合并生成滞后特征以捕捉季节性模式。

monthlyDF = salesDF.groupBy("client_id", "transaction_type", "transaction_year", "transaction_month"
).agg(F.expr('sum(delay_days * invoice_amount) / sum(invoice_amount)').alias('monthly_weighted_delay_avg'),F.sqrt(F.try_divide(F.sum(F.col('invoice_amount') * F.pow(F.col('delay_days'), 2)),F.sum('invoice_amount')) -F.pow(F.try_divide(F.sum(F.col('invoice_amount') * F.col('delay_days')),F.sum('invoice_amount')), 2)).alias('monthly_weighted_delay_std')
)monthly_window = Window.partitionBy("client_id", "transaction_type") \.orderBy("transaction_year", "transaction_month")for lag_months in range(1, 13):monthlyDF = monthlyDF.withColumn(f"delay_avg_lag_{lag_months}m",F.lag("monthly_weighted_delay_avg", lag_months).over(monthly_window))monthlyDF = monthlyDF.withColumn(f"delay_std_lag_{lag_months}m",F.lag("monthly_weighted_delay_std", lag_months).over(monthly_window))salesDF = salesDF.join(monthlyDF,on=["client_id", "transaction_type", "transaction_year", "transaction_month"],how="left"
)

7 性能优化技巧

分区策略:始终根据逻辑上对数据进行分组的高基数列进行分区。这可以最大限度地减少数据混洗。

窗口框架优化:使用尽可能限制性的框架。无界窗口开销大且通常不必要。

缓存:当对同一数据集执行多个窗口操作时,考虑缓存中间结果。

salesDF.cache()

8 行业级别基准

不要忘记创建行业或细分市场级别的基准进行比较:

salesDF = salesDF.join(salesDF.groupBy('industry_sector', "transaction_type").agg(F.expr('sum(delay_days * invoice_amount) / sum(invoice_amount)').alias('industry_delay_avg'),F.sqrt(F.try_divide(F.sum(F.col('invoice_amount') * F.pow(F.col('delay_days'), 2)),F.sum('invoice_amount')) -F.pow(F.try_divide(F.sum(F.col('invoice_amount') * F.col('delay_days')),F.sum('invoice_amount')), 2)).alias('industry_delay_std')),on=['industry_sector', 'transaction_type'],how='left'
)

9 结论

窗口函数解锁了 PySpark 中复杂的分析能力,使您能够为机器学习和高级分析创建丰富的特征集。关键在于理解何时使用不同类型的窗口:

  • 无界窗口:用于累积指标
  • 基于范围的窗口:用于时间序列分析
  • 基于行的窗口:用于排名和百分位数
  • 滞后函数:用于趋势和季节性检测

通过将这些技术与适当的分区和优化策略相结合,您可以构建健壮、可扩展的分析管道,捕捉数据中复杂的时间模式。

开始在您自己的数据集中尝试这些模式,您很快就会发现窗口函数在将原始数据转化为可操作洞察方面的真正力量。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/diannao/97746.shtml
繁体地址,请注明出处:http://hk.pswp.cn/diannao/97746.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

从理念到实践:三层解耦架构与“无系统”论

在上一篇中,我们揭示了“五层双闭环”治理模型如何像骨骼一样,为数字化转型提供支撑和定型。但再宏伟的蓝图也需要坚实的施工来实现。今天,我们将深入最具体的实施层面,将“业务重塑”和“以人为本”的理念,转化为可落…

详细介绍Linux 内存管理struct page数据结构中的_count和_mapcount有什么区别?

在Linux内核的struct page中,_count(或_refcount)和_mapcount是两个关键的引用计数成员,它们各自承担不同的职责。以下是深度解析和代码案例:1. _count vs _mapcount 区别详解_count(或_refcount&#xff0…

面阵 vs 线阵相机:怎么选不踩坑?选型公式直接套用

面阵vs线阵相机:怎么选不踩坑?选型公式直接套用🎯面阵vs线阵相机怎么选不踩坑?🎯一、面阵相机:工业检测的“万能选手”,拍全图靠它🎯二、线阵相机:大视野/高精度的“专属…

Spring Security 如何使用@PreAuthorize注解

🧱 第一步:环境准备✅ 1. 创建数据库(MySQL)-- 创建数据库,使用 utf8mb4 字符集支持 emoji 和多语言 CREATE DATABASE security_demo CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;-- 使用该数据库 USE security…

JVM中产生OOM(内存溢出)的8种典型情况及解决方案

Java中的OutOfMemoryError(OOM)是当JVM内存不足时抛出的错误。本文将全面剖析JVM中产生OOM的各种情况,包括堆内存溢出、方法区溢出、栈溢出等,并提供详细的诊断方法和解决方案。 一、OOM基础概念 1.1 OOM错误类型 Java中的OOM是…

【IEEE出版、EI检索、往届会后3个月检索】第四届信号处理、计算机网络与通信国际学术会议(SPCNC 2025)

第四届信号处理、计算机网络与通信国际学术会议(SPCNC 2025)将于2025年12月5-7日于中国武汉召开(线上同步)。为本次会议旨在齐聚海内外信号处理、计算机网络与通信等计算机领域的专家学者,为相关领域研究和从业人员提供…

Spring boot注解介绍

1. Spring 核心注解Spring Boot 是基于 Spring 框架的,所以核心注解依然适用。✅ 常见核心注解Component表示一个通用组件,Spring 会自动扫描并注入到容器中。Component public class MyComponent {public void sayHello() {System.out.println("He…

撤销回退 情况⼆:已经 add ,但没有 commit

撤销回退 情况⼆:已经 add ,但没有 commit add 后还是保存到了暂存区呢?怎么撤销呢? 1 # 向ReadMe中新增⼀⾏代码 2 hyb139-159-150-152:~/gitcode$ vim ReadMe 3 hyb139-159-150-152:~/gitcode$ cat ReadMe 4 hello bit 5 hell…

【Linux笔记】命令行与vim基础

一、Linux命令行基础 1. 基本语法命令空格参数(可写可不写)空格文件,文件夹(可写可不写)ls列出文件夹中的内容/opt 根目录下的opt文件夹ls-a all显示出所有文件以及隐藏文件/optls-a如果不写则输出一个点,当…

Redis 的整数集合:像分类收纳盒一样的整数专属存储

目录 一、先懂定位:为什么需要整数集合?(衔接哈希表) 二、整数集合的结构:像 “贴了规格标签的收纳盒” 1. encoding:收纳盒的 “规格标签”(核心:决定格子大小) 2. …

Linux 进程状态 — 僵尸进程

🎁个人主页:工藤新一 🔍系列专栏:C面向对象(类和对象篇) 🌟心中的天空之城,终会照亮我前方的路 🎉欢迎大家点赞👍评论📝收藏⭐文章 文章目录进…

React 中 key 的作用

React 中 key 的作用是什么? Date: August 31, 2025 Area: 原理key 概念 在 React 中,key 用于识别哪些元素是变化、添加或删除的。 在列表渲染中,key 尤其重要,因为它能提高渲染性能和确保组件状态的一致性。key 的作用 1&#x…

wpf之附加属性

前言 附加属性是 WPF 中一个非常强大和独特的概念。简单来说,它允许一个对象为另一个在其本身类定义中未定义的属性赋值。 1、定义附加属性 定义一个Watermark的附加属性,该属性的作用是将TextBox的附加属性改变时,TextBox的字体颜色改成灰…

深入浅出 RabbitMQ-消息可靠性投递

大家好,我是工藤学编程 🦉一个正在努力学习的小博主,期待你的关注实战代码系列最新文章😉C实现图书管理系统(Qt C GUI界面版)SpringBoot实战系列🐷【SpringBoot实战系列】SpringBoot3.X 整合 Mi…

数字化时代,中小企业如何落地数字化转型

大数据时代,各行各业的行业龙头和大型集团都已经开始了数据管理,让数据成为数据资产。但是在我国,中小企业的数量巨大,很多管理者忽视了这一点,今天我们就来聊一聊中小企业的数字化转型。中小企业需要数字化转型首先要…

Unity笔记(九)——画线功能Linerenderer、范围检测、射线检测

写在前面:写本系列(自用)的目的是回顾已经学过的知识、记录新学习的知识或是记录心得理解,方便自己以后快速复习,减少遗忘。这里只记录代码知识。十一、画线功能Linerenderer画线功能Linerenderer是Unity提供的画线脚本,创建一个空…

刷题记录(8)string类操作使用

一、仅反转字母 917. 仅仅反转字母 - 力扣(LeetCode) 简单来说输入字符串,要求你返回所有仅字母位置反转后的字符串。 简单看一个样例加深理解: 前后互换,我想思路基本很明显了,双指针,或者说…

用好AI,从提示词工程到上下文工程

前言 随着 AI 大模型的爆发,提示词工程(prompt engineering ) 一度是用户应用 AI ,发挥 AI 能力最重要、也最应该掌握的技术。 但现在,在 “提示词工程”的基础上,一个更宽泛也更强力的演化概念被提出,也就是本文我们要介绍的 “上下文工程(Context Engineering)” …

计算机Python毕业设计推荐:基于Django+Vue用户评论挖掘旅游系统

精彩专栏推荐订阅:在下方主页👇🏻👇🏻👇🏻👇🏻 💖🔥作者主页:计算机毕设木哥🔥 💖 文章目录 一、项目介绍二、…

⸢ 肆 ⸥ ⤳ 默认安全:安全建设方案 ➭ a.信息安全基线

👍点「赞」📌收「藏」👀关「注」💬评「论」 在金融科技深度融合的背景下,信息安全已从单纯的技术攻防扩展至架构、合规、流程与创新的系统工程。作为一名从业十多年的老兵,将系统阐述数字银行安全体系的建设…