dask.dataframe.shuffle.set_index中获取 divisions 的步骤分析

dask.dataframe.shuffle.set_index 中获取 divisions 的步骤分析

主要流程概述

set_index 函数中,当 divisions=None 时,系统需要通过分析数据来动态计算分区边界。这个过程分为以下几个关键步骤:

1. 初始检查和准备

if divisions is None:sizes = df.map_partitions(sizeof) if repartition else []divisions = index2._repartition_quantiles(npartitions, upsample=upsample)mins = index2.map_partitions(M.min)maxes = index2.map_partitions(M.max)divisions, sizes, mins, maxes = base.compute(divisions, sizes, mins, maxes)

步骤说明:

  • 计算每个分区的大小(如果启用重新分区)
  • 调用 _repartition_quantiles 计算近似分位数
  • 并行计算每个分区的最小值和最大值
  • 使用 base.compute 触发实际计算

2. 分位数计算过程 (_repartition_quantiles)

_repartition_quantiles 方法调用 partition_quantiles 函数,该函数执行以下步骤:

2.1 生成采样策略
def sample_percentiles(num_old, num_new, chunk_length, upsample=1.0, random_state=None):# 计算随机百分位比例random_percentage = 1 / (1 + (4 * num_new / num_old) ** 0.5)# 生成等间距和随机百分位
2.2 创建计算图
# 1. 数据类型信息
dtype_dsk = {(name0, 0): (dtype_info, df_keys[0])}# 2. 每个分区的百分位摘要
val_dsk = {(name1, i): (percentiles_summary, key, df.npartitions, npartitions, upsample, state)for i, (state, key) in enumerate(zip(state_data, df_keys))
}# 3. 合并和压缩摘要
merge_dsk = create_merge_tree(merge_and_compress_summaries, sorted(val_dsk), name2)# 4. 最终处理
last_dsk = {(name3, 0): (pd.Series, (process_val_weights, merged_key, npartitions, (name0, 0)), qs, None, df.name)
}

3. 数据后处理

divisions = methods.tolist(divisions)
if type(sizes) is not list:sizes = methods.tolist(sizes)
mins = methods.tolist(mins)
maxes = methods.tolist(maxes)

4. 空数据检测和重新分区

empty_dataframe_detected = pd.isnull(divisions).all()
if repartition or empty_dataframe_detected:total = sum(sizes)npartitions = max(math.ceil(total / partition_size), 1)npartitions = min(npartitions, df.npartitions)# 插值生成新的分界点divisions = np.interp(x=np.linspace(0, n - 1, npartitions + 1),xp=np.linspace(0, n - 1, n),fp=divisions,).tolist()

5. 数据类型特殊处理

if pd.api.types.is_categorical_dtype(index2.dtype):dtype = index2.dtypemins = pd.Categorical(mins, dtype=dtype).codes.tolist()maxes = pd.Categorical(maxes, dtype=dtype).codes.tolist()

6. 排序优化检查

if (mins == sorted(mins) and maxes == sorted(maxes) and all(mx < mn for mx, mn in zip(maxes[:-1], mins[1:]))):divisions = mins + [maxes[-1]]result = set_sorted_index(df, index, drop=drop, divisions=divisions)return result.map_partitions(M.sort_index)

这个检查的作用:

  • 如果数据已经按索引排序,可以直接使用最小值和最大值作为分界点
  • 避免昂贵的shuffle操作

分位数计算详细过程

核心算法:percentiles_summary 函数
def percentiles_summary(df, num_old, num_new, upsample, state):"""Summarize data using percentiles and derived weights."""# 1. 生成采样百分位qs = sample_percentiles(num_old, num_new, len(df), upsample, state)# 2. 计算百分位值vals = df.quantile(qs)# 3. 转换为权重return percentiles_to_weights(qs, vals, len(df))
权重计算:percentiles_to_weights 函数
def percentiles_to_weights(qs, vals, length):"""Weigh percentile values by length and the difference between percentiles"""if length == 0:return ()diff = np.ediff1d(qs, 0.0, 0.0)weights = 0.5 * length * (diff[1:] + diff[:-1])return vals.tolist(), weights.tolist()

权重计算原理:

  • 每个百分位值的权重 = 0.5 × 分区长度 × (前一个百分位差 + 后一个百分位差)
  • 这样确保权重反映该值在数据分布中的重要性
合并和压缩:merge_and_compress_summaries 函数
def merge_and_compress_summaries(vals_and_weights):"""Merge and sort percentile summaries that are already sorted."""vals, weights = zip(*vals_and_weights)vals = list(merge_sorted(*vals))weights = list(merge_sorted(*weights))# 压缩重复值compressed_vals = []compressed_weights = []# ... 压缩逻辑return compressed_vals, compressed_weights
最终处理:process_val_weights 函数

这个函数(我们之前分析过的)将合并后的值-权重对转换为最终的分区边界。

完整流程图和示例

让我创建一个详细的流程图来展示整个过程:

开始 set_index(df, index, divisions=None)↓
检查 divisions 是否为 None↓
是 → 开始动态计算分界点↓
1. 并行计算:- 每个分区的大小 (sizes)- 每个分区的分位数 (divisions)- 每个分区的最小值 (mins)- 每个分区的最大值 (maxes)↓
2. 触发计算:base.compute(divisions, sizes, mins, maxes)↓
3. 分位数计算过程:┌─────────────────────────────────────────┐│ 对每个分区执行 percentiles_summary:     ││ 1. 生成采样百分位 (等间距 + 随机)       ││ 2. 计算百分位值                        ││ 3. 转换为权重                          │└─────────────────────────────────────────┘↓
4. 合并所有分区的摘要:┌─────────────────────────────────────────┐│ merge_and_compress_summaries:           ││ 1. 合并排序的值和权重                   ││ 2. 压缩重复值                          │└─────────────────────────────────────────┘↓
5. 最终处理:┌─────────────────────────────────────────┐│ process_val_weights:                    ││ 1. 处理巨型值                          ││ 2. 计算最终分界点                      ││ 3. 处理数据类型                        │└─────────────────────────────────────────┘↓
6. 后处理:- 转换为列表格式- 检查空数据- 处理重新分区- 特殊数据类型处理↓
7. 排序优化检查:- 如果数据已排序,使用 min/max 作为分界点- 否则继续到 shuffle 阶段↓
调用 set_partition 进行实际的数据重排↓
结束

关键优化策略

  1. 采样策略:结合等间距和随机百分位,平衡计算效率和准确性
  2. 排序检测:如果数据已排序,避免昂贵的shuffle操作
  3. 数据类型感知:特别处理分类、时间等特殊数据类型
  4. 内存优化:通过压缩和合并减少内存使用
  5. 分布式计算:利用Dask的并行计算能力

性能考虑

  • 时间复杂度:O(n log n),主要由排序和分位数计算决定
  • 空间复杂度:O(n),存储采样数据和权重
  • 网络开销:需要收集所有分区的统计信息
  • 计算开销:需要两次数据遍历(统计 + shuffle)

总结

dask.dataframe.shuffle.set_index 中获取 divisions 的过程是一个复杂的分布式算法,主要包含以下步骤:

核心步骤

  1. 并行统计:计算每个分区的分位数、大小、最小值、最大值
  2. 分位数计算:使用采样策略生成代表性百分位
  3. 权重分配:根据数据分布为每个值分配权重
  4. 合并压缩:合并所有分区的统计信息并压缩重复值
  5. 分界点计算:使用 process_val_weights 计算最终分界点
  6. 优化检查:检测数据是否已排序,避免不必要的shuffle

关键特点

  • 分布式设计:充分利用Dask的并行计算能力
  • 智能采样:结合等间距和随机采样策略
  • 类型感知:特别处理不同数据类型
  • 性能优化:检测已排序数据,避免重复计算
  • 内存高效:通过压缩和合并减少内存使用

这个算法是Dask DataFrame实现高效分布式排序和分区的核心,通过巧妙的采样和合并策略,在保证准确性的同时实现了良好的性能。

自己实现

import numpy as np
import pandas as pd# 1️⃣ 采样百分位
def sample_percentiles(num_old, num_new, chunk_length, upsample=1.0, random_state=None):"""简单版本:等间距百分位"""return np.linspace(0, 1, num_new + 1)# 2️⃣ 计算百分位摘要(值+权重)
def percentiles_summary(series, num_old, num_new):qs = sample_percentiles(num_old, num_new, len(series))vals = series.quantile(qs).to_numpy()diff = np.ediff1d(qs, 0.0, 0.0)weights = 0.5 * len(series) * (diff[1:] + diff[:-1])return vals.tolist(), weights.tolist()# 3️⃣ 合并多个分区的摘要
def merge_and_compress_summaries(summaries):all_vals = []all_weights = []for vals, weights in summaries:all_vals.extend(vals)all_weights.extend(weights)# 按值排序order = np.argsort(all_vals)vals = np.array(all_vals)[order]weights = np.array(all_weights)[order]# 压缩重复值compressed_vals = []compressed_weights = []last_val = Nonefor v, w in zip(vals, weights):if last_val is not None and v == last_val:compressed_weights[-1] += welse:compressed_vals.append(v)compressed_weights.append(w)last_val = vreturn np.array(compressed_vals), np.array(compressed_weights)# 4️⃣ 最终处理:计算分界点
def process_val_weights(vals, weights, npartitions):if len(vals) == 0:return np.array([])if len(vals) == npartitions + 1:return valselif len(vals) < npartitions + 1:q_weights = np.cumsum(weights)q_target = np.linspace(q_weights[0], q_weights[-1], npartitions + 1)return np.interp(q_target, q_weights, vals)else:target_weight = weights.sum() / npartitionsjumbo_mask = weights >= target_weightjumbo_vals = vals[jumbo_mask]trimmed_vals = vals[~jumbo_mask]trimmed_weights = weights[~jumbo_mask]trimmed_npartitions = npartitions - len(jumbo_vals)q_weights = np.cumsum(trimmed_weights)q_target = np.linspace(0, q_weights[-1], trimmed_npartitions + 1)left = np.searchsorted(q_weights, q_target, side="left")right = np.searchsorted(q_weights, q_target, side="right") - 1lower = np.minimum(left, right)trimmed = trimmed_vals[lower]rv = np.concatenate([trimmed, jumbo_vals])rv.sort()return rv# 5️⃣ 模拟 set_index 中 divisions 的获取
def simulate_set_index(df, column, npartitions):num_old = len(df)# 假设原始有分区(这里手动切分成2块模拟)partitions = np.array_split(df[column], 2)summaries = [percentiles_summary(p, num_old, npartitions) for p in partitions]vals, weights = merge_and_compress_summaries(summaries)divisions = process_val_weights(vals, weights, npartitions)return divisions# ========== DEMO 使用 ==========
df = pd.DataFrame({"x": np.random.randint(0, 100, size=50)})divs = simulate_set_index(df, "x", npartitions=4)print("原始数据示例:\n", df.head())
print("\n计算得到的 divisions:", divs)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/pingmian/95659.shtml
繁体地址,请注明出处:http://hk.pswp.cn/pingmian/95659.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ai生成ppt工具有哪些?10款主流AI生成PPT工具盘点

随着人工智能技术的飞速发展&#xff0c;AI生成PPT工具逐渐成为职场人士、学生和创作者提升效率的得力助手。这类工具通过智能算法&#xff0c;能够快速将文本、数据或创意转化为结构化、视觉化的演示文稿&#xff0c;大幅节省设计时间。1、AiPPT星级评分&#xff1a;★★★★★…

Qt多线程编程学习

Qt多线程编程学习 1. 项目概述 本项目展示了Qt中多线程编程的基本用法&#xff0c;通过继承QThread类创建自定义线程&#xff0c;并演示了线程的启动、执行和销毁过程。项目包含一个简单的用户界面&#xff0c;用户可以通过按钮控制线程的启动和结束。 1.1 项目结构 项目包含以…

加密货币武器化:恶意npm包利用以太坊智能合约实现隐蔽通信

ReversingLabs研究人员发现两个恶意npm包利用以太坊&#xff08;Ethereum&#xff09;智能合约隐藏并传播恶意软件。这两个名为colortoolsv2和mimelib2的软件包于2025年7月被识别&#xff0c;展现了开源安全攻防战中的新战术。恶意软件包伪装成实用工具攻击活动始于7月7日发布的…

Spring Boot 全局字段处理最佳实践

在日常开发中&#xff0c;我们总会遇到一些琐碎但又无处不在的字段处理需求&#xff1a;• 请求处理: 用户提交的表单&#xff0c;字符串前后带了多余的空格&#xff0c;需要手动 trim()。• 响应处理: 返回给前端的 BigDecimal 金额&#xff0c;因为精度问题导致JS处理出错&am…

三坐标测量机在汽车制造行业中的应用

在汽车制造业中&#xff0c;零部件精度决定着整车性能。从发动机活塞的微米级公差&#xff0c;到车身焊接的毫米级间隙&#xff0c;汽车制造“差之毫厘&#xff0c;谬以千里” &#xff0c;任何细微偏差都可能引发连锁反应&#xff1a;发动机抖动、异响、油耗飙升&#xff0c;车…

机床夹具设计 +选型

机床夹具设计—第2组&#xff08;钻床夹具&#xff09;仿真组装视频_哔哩哔哩_bilibili 夹具-商品搜索-怡合达一站式采购平台 米思米FA标准品电子目录new 可能要吧这些定位块单独用yolo训练一边才能搞识别分析 3长条一短销定位&#xff0c;黄色的用来夹紧 一个面加一短轴一棱…

表格识别技术:通过计算机视觉和OCR,实现非结构化表格向结构化数据的转换,推动数字化转型。

在日常工作和生活中&#xff0c;我们无处不在与表格打交道。从财务报表、发票收据&#xff0c;到科研论文中的数据表、医疗报告&#xff0c;表格以其清晰、结构化的方式&#xff0c;承载着大量关键信息。然而&#xff0c;当这些表格以纸质或图片等非结构化形式存在时&#xff0…

Go基础(②Viper)

Viper 读取配置创建一个配置文件 config.yamlserver:port: 8080timeout: 30 # 超时时间&#xff08;秒&#xff09; database:host: "localhost"user: "root"password: "123456"name: "mydb"然后用 Viper 读取这个配置&#xff0c;代…

kafka Partition(分区)详解

一、什么是 PartitionPartition&#xff08;分区&#xff09; 是 Kafka Topic&#xff08;主题&#xff09; 的最小并行单位。一个 Topic 可以包含多个 Partition&#xff0c;每个 Partition 底层对应一个有序、不可变的消息队列&#xff0c;消息只会顺序追加。Partition 内部消…

中创中间件适配HGDB

文章目录环境文档用途详细信息环境 系统平台&#xff1a;Microsoft Windows (64-bit) 10 版本&#xff1a;5.6.5 文档用途 本文章主要介绍中创中间件简单适配HGDB。 详细信息 一、数据源配置 1.数据库准备 &#xff08;1&#xff09;安装HGDB并创建一个名为myhgdb的数据…

服务器内存和普通计算机内存在技术方面有什么区别?

服务器内存和普通计算机内存在技术上的区别&#xff0c;主要体现在为满足不同工作场景和要求而采用的设计和特性上。下面这个表格汇总了它们的主要技术差异&#xff0c;方便你快速了解&#xff1a; ​技术特性​​服务器内存​​普通计算机内存​​错误校验 (ECC)​​支持ECC(…

哪款AI生成PPT工具对职场新人最友好?操作门槛最低的是哪个?

一句话生成专业PPT&#xff0c;职场新人也能轻松做出高质量演示文稿现代职场节奏快&#xff0c;PPT制作已成为必备技能。然而&#xff0c;职场新人常面临两大挑战&#xff1a;缺乏设计经验&#xff0c;以及需要在有限时间内完成高质量演示。传统PPT制作耗时费力&#xff0c;需梳…

1.注解的力量:Spring Boot如何用注解重构IoC容器

文章目录1.1 IoC容器&#xff1a;Spring的智能管家1.2 注解驱动&#xff1a;给管家下指令1.2.1 SpringBootApplication&#xff1a;总管家的聘书1.2.2 组件注解&#xff1a;员工的身份标识1.2.3 Autowired&#xff1a;依赖注入的三种方式1.2.4 Bean注解&#xff1a;手动招聘特殊…

【算法】92.翻转链表Ⅱ--通俗讲解

一、题目是啥?一句话说清 给你一个链表和两个整数 left 和 right,反转从第 left 个节点到第 right 个节点的子链表,并返回反转后的链表。其他部分保持不变。 示例: 输入:head = [1,2,3,4,5], left = 2, right = 4 输出:[1,4,3,2,5](反转了从第2到第4个节点) 二、解题…

Nature子刊:新发现!深层脑网络中发现强迫症症状的神经生物标志物

强迫症&#xff08;OCD&#xff09;是一种令人困扰的精神疾病&#xff0c;患者常常被强迫思维和强迫行为所困扰。例如&#xff0c;有些人会反复洗手&#xff0c;无法控制自己的清洁冲动&#xff1b;还有些人会不断检查门窗是否关好&#xff0c;即便他们已经确认过无数次。这些行…

Onlyoffice集成与AI交互操作指引(Iframe版)

Onlyoffice集成与AI交互操作指引&#xff08;Iframe版&#xff09; 本文档系统介绍了软件系统集成OnlyOffice实现在线编辑与AI辅助功能的方案。主要内容包括&#xff1a;后端需提供文档配置信息并实现Callback接口以处理文档保存&#xff1b;前端通过Vue集成编辑器&#xff0c…

TypeScript 中 keyof、typeof 和 instanceof

在 TypeScript 开发中&#xff0c;keyof、typeof 和 instanceof 是核心的类型操作符和操作符&#xff0c;专门用于提升类型安全、代码可读性和维护性。1. keyof 操作符定义和用途&#xff1a;keyof 是一个类型操作符&#xff0c;用于获取对象类型的所有键&#xff08;属性名&am…

分布式专题——1.1 Redis单机、主从、哨兵、集群部署

1 Redis 部署 下面演示在 Linux 环境下部署 Redis7。 1.1 单机部署 1.1.1 检查安装 gcc 环境Redis 是由 C 语言编写的&#xff0c;它的运行需要 C 环境&#xff0c;因此我们需要先安装 gcc&#xff1b; # 关闭防⽕墙 systemctl stop firewalld.service # 查看防火墙状态 firewa…

2025年渗透测试面试题总结-54(题目+回答)

安全领域各种资源&#xff0c;学习文档&#xff0c;以及工具分享、前沿信息分享、POC、EXP分享。不定期分享各种好玩的项目及好用的工具&#xff0c;欢迎关注。1、SQL注入的防护方法有哪些&#xff1f; 2、永恒之蓝的漏洞原理是什么&#xff1f;怎么做到的&#xff1f; 3、命令…

安卓学习 之 按钮点击事件

今天学习安卓应用中的按钮点击事件&#xff1a;总结下来在安卓应用中的Button注册点击事件的方法主要是以下4种方法&#xff0c;稍后会逐个介绍&#xff1a; 第一种方法&#xff1a;自定义内部类的方法 第二种方法&#xff1a;匿名内部类的方法 第三种方法&#xff1a;当前Acti…