Joblib库多进程/线程使用(一):使用generator参数实现边响应边使用

 进程与线程的基本概念

特性进程 (Process)线程 (Thread)
定义

操作系统分配资源的基本单位(独立的内存空间)

多进程可真正并行(利用多核 CPU)

进程内的执行单元(共享进程资源)
独立性完全独立,崩溃后不影响其他进程共享进程资源,一个线程崩溃可能导致整个进程崩溃

资源开销

高(需要分配独立的内存、文件句柄等)

不可直接共享,需通过 IPC(如管道、共享内存)

低(共享进程资源,仅需少量栈和寄存器)

可直接访问全局变量(需同步机制避免竞争)

数据共享必须通过 IPC(进程间通信):<br> - 管道(Pipe)<br> - 共享内存(Shared Memory)<br> - 消息队列(Queue)直接共享进程内存(需锁机制)
同步机制通常不需要需要锁(Lock)、信号量(Semaphore)等。
场景进程线程
CPU 密集型任务
(如数值计算、机器学习)
✅ 多进程(绕过 GIL,利用多核)。❌ 多线程(受 GIL 限制,无法并行计算)。
I/O 密集型任务
(如网络请求、文件读写)
⚠️ 可用,但资源开销高。✅ 多线程(轻量级,等待 I/O 时可切换任务)。
需要高隔离性✅ 进程崩溃不影响其他进程。❌ 线程崩溃可能拖垮整个进程。

Joblib库

基础Parallel并行处理

n_jobs:设置并行运行的作业数。

verbose:设置后端的详细程度。

prefer:设置要使用的首选后端。可以是“processes”(进程)或“threads”(线程)。

require:设置对后端的请求。可以是“sharedmem”(共享内存)。

inner_max_num_threads:限制第三方库使用的最大线程数,这些库管理它们自己的 C 级线程池。此参数仅受 backend 支持,这些 backend 将 supports_inner_max_num_threads 类属性设置为 True,例如“loky”后端。

temp_folder、max_nbytes、mmap_mode:控制后端的自动内存映射行为。有关更多详细信息,请参阅在共享内存(内存映射)中使用数值数据

def process_row(row,idx,function):print('已经进入processrow')result = function(row)#print(f"处理完成 #{idx}: {str(result)[:50]}...")print(f"处理完成 #{idx}")result_clean=json_repair.loads(result)return result_clean# 使用Parallel进行并行处理,并添加verbose参数(日志详细程度)
results0= Parallel(n_jobs=10, verbose=10)(delayed(process_row)(row, i,function) for i, row in data0.iterrows()
)

通过Parallel的return_as参数,实现结果的即使反馈

return_as="generator"

输出可以是一个生成器,它会尽快返回可用的结果,即使后续任务尚未完成。输出的顺序始终与输入提交的顺序一致

return_as="generator_unordered" 

在这种情况下,输出的顺序取决于workers的并发情况,并且是不确定的,这意味着每次执行代码时结果的顺序可能不同。

传统的 Parallel(...)(...) 会先把所有任务结果收集完,再一次性返回一个 list。当任务数量大、单个结果对象很大时,可能导致内存占用暴涨,甚至 OOM(内存溢出)。

使用 return_as="generator" 则:1.每次只保留一个任务的结果在内存中 2.避免累积数百/数千个中间结果。

注意事项

generator使用过比如next/list()后无法回到上一个状态,想要关闭的话使用.close()

环境准备

首先可以更新一下joblib包,1.0.1版本的是不支持return_as的参数的

import joblib
print(joblib.__version__)
!pip install --upgrade joblib

Embarrassingly parallel for loops — joblib 1.6.dev0 documentation

可以参考文档测试一下代码

from math import sqrt
from joblib import Parallel, delayed
parallel = Parallel(n_jobs=2, return_as="generator")
output_generator = parallel(delayed(sqrt)(i ** 2) for i in range(10))
print(type(output_generator))
print(next(output_generator))
print(next(output_generator))
print(list(output_generator))

 实现中断后可以继续处理未处理的数据

import os
import json
import pandas as pd
from joblib import Parallel, delayed
import json_repair# 假设这个是你已有的 DataFrame
# data0 = pd.read_csv(...) 或别的方式加载SAVE_DIR = "results_json"
os.makedirs(SAVE_DIR, exist_ok=True)# 检查是否已处理过
def has_processed(idx):return os.path.exists(os.path.join(SAVE_DIR, f"{idx}.json"))# 每一行任务的处理函数
def process_row(row, idx, openai_thinking_is_match):if has_processed(idx):print(f"跳过 #{idx}:已存在结果")return Noneprint(f"开始处理 #{idx}")result = openai_thinking_is_match(row)print(f"处理完成 #{idx}")result_clean = json_repair.loads(result)# 保存结果到独立文件with open(os.path.join(SAVE_DIR, f"{idx}.json"), "w", encoding="utf-8") as f:json.dump(result_clean, f, ensure_ascii=False, indent=2)return result_clean# 执行并行任务parallel = Parallel(n_jobs=50, return_as="generator", verbose=10)output_generator = parallel(delayed(process_row)(row, idx, function)for idx, row in data0.iterrows())# 可选:边执行边消费for _ in output_generator:pass

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/web/86308.shtml
繁体地址,请注明出处:http://hk.pswp.cn/web/86308.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

css上下滚动文字

效果图 取得是数组里的数据 上下滚动切换 css .notice-new {background: #222222;border-radius: 19rpx;margin-top: 28rpx;font-size: 24rpx;color: white;font-weight: 500;padding: 0 20rpx;height: 55rpx;line-height: 55rpx;overflow: hidden;.notice-scroll-wrapper {pos…

概念篇: 01-带你认识Dockerfile

在本篇文章中&#xff0c;我们将带你认识 Dockerfile —— 构建 Docker 镜像的"蓝图"。我们会介绍它的基本概念和常用指令&#xff0c;帮助你理解如何使用它来打包你的应用。 简单了解 Docker&#xff08;背景知识&#xff09; 在我们深入 Dockerfile 之前&#xf…

技术伦理之争:OpenAI陷抄袭风波,法院强制下架宣传视频

在AI巨头OpenAI宣布以65亿美元天价收购苹果前设计总监Jony Ive的硬件公司IO仅一个月后&#xff0c;一场抄袭指控将这家科技明星企业推上风口浪尖。 源自谷歌X实验室的初创企业IYO将OpenAI告上法庭&#xff0c;指控其窃取智能耳塞核心技术&#xff0c;并通过巨额收购试图掩盖抄袭…

前沿解读:缺陷如何操控二维半导体中的电子摩擦耗散超快动力学

摩擦能耗约占全球一次能源损耗的1/3&#xff0c;在微纳器件中尤为突出。二维半导体&#xff08;如WS₂&#xff09;因其独特的电子特性成为研究热点&#xff0c;但电子摩擦的动态机制因电子行为的超快特性长期难以捕捉。近期清华团队在Nature Communications发表的研究[1]&…

什么是物联网 (IoT)?

你家是否安装了智能恒温器&#xff1f;或者你属于三分之一的美国健身追踪器用户&#xff0c;通过设备记录运动习惯&#xff1f;如果是&#xff0c;你已在使用物联网技术。这项技术不仅融入日常生活&#xff0c;更深刻改变着组织的运营方式。物联网通过多种技术连接数字与物理世…

[特殊字符] Windows 查看端口占用及服务来源教程(以 9018 端口为例)

下面是一份详细的 Windows 系统中排查 某端口&#xff08;如 9018&#xff09;被哪个程序占用 并确定其具体服务来源的完整教程&#xff0c;适合用于日常运维、开发部署排障等场景。 &#x1f3af; Windows 查看端口占用及服务来源教程&#xff08;以 9018 端口为例&#xff09…

异步爬虫 原理与解析

先遍历100遍一个程序 import requests import logging import timelogging.basicConfig(levellogging.INFO, format%(asctime)s - %(levelname)s: %(message)s) TOTAL_NUMBER 100 BASE_URL https://ssr4.scrape.center/start_time time.time() for id in range(1,TOTAL_NUM…

vscode管理go多个版本

#1.下载go安装包 https://developer.aliyun.com/mirror/golang/?spma2c6h.25603864.0.0.55ea7c45IsI4GM # 2.创建 sdk 目录&#xff08;如果不存在&#xff09; mkdir -p ~/sdk # 3.解压下载的 go1.16.15 到 ~/sdk/ tar -C ~/sdk -xzf go1.16.15.linux-amd64.tar.gz # 4.重…

香港维尔利健康科技集团推出AI辅助医学影像训练平台,助力医护人才数字化转型

香港维尔利健康科技集团近日正式发布其自主研发的“AI辅助医学影像训练平台&#xff08;V-MedTrain&#xff09;”&#xff0c;这一创新平台的上线&#xff0c;标志着医学影像教育迈入智能化辅助教学新时代。依托人工智能与大数据分析技术&#xff0c;香港维尔利健康科技集团在…

互联网+医疗,医疗服务的全方位革新

近年来&#xff0c;互联网医疗行业迅速崛起&#xff0c;为医疗健康服务带来了翻天覆地的变革。新模式、新业态层出不穷&#xff0c;不仅大幅提升了医疗健康服务的可及性&#xff0c;也使得群众就医体验更为舒适、便捷。互联网技术的广泛应用&#xff0c;不仅改变了医疗核心业务…

酒店智能门锁系统常见问题解决方法——东方仙盟

重做系统后 usb发卡器与注册时发卡器不一致 解决发方法: 用总卡重新注册软件,要可以开房间的总卡 房号不存在 2声---正确提示&#xff0c;表示是设置卡 3声---门锁已反锁&#xff0c;解决方法&#xff1a;用能开反锁的卡或解除反锁 6声---房号不对&#xff0c;解决方法&#…

从零开始理解百度语音识别API的Python实现

大家好&#xff01;今天我要给大家详细讲解一个使用百度语音识别API的Python代码。这个代码可以将音频文件转换成文字&#xff0c;非常适合做语音转文字的应用。我会从最基础的概念开始讲起&#xff0c;确保没有任何编程基础的朋友也能理解。 翻译 一、代码概览 这段代码主要…

中小企业适用的几种会议签到工具

对企业行政来说&#xff0c;会议签到是件小事&#xff0c;但处理不好&#xff0c;会直接拖慢会议流程、影响管理效率、降低参会体验。尤其是面对人数多、时间紧、场地临时变动等情况&#xff0c;靠传统纸笔或简单Excel管理&#xff0c;往往应对乏力。 实际上&#xff0c;签到看…

android 11.0 打开ALOGV ALOGI ALOGD日志输出的方法

1.前言 在11.0的系统rom定制化开发中,在某些时候,需要打印ALOGV,ALOGI等TAG日志,在系统中,默认是关闭这些日志的, 防止日志打印过多,系统过于卡顿,但是有时候会为了调试,需要打开日志开关,所以就需要在系统源码中查看哪里 需要打开日志的开关,来实现日志的打印解决…

语言大模型or时序大模型?原理、应用与未来发展

引言 随着人工智能技术的飞速发展&#xff0c;大规模预训练模型已成为当前研究的热点。其中&#xff0c;语言模型和时序大模型作为两类重要的模型架构&#xff0c;分别在自然语言处理和时间序列分析领域展现出卓越的性能。然而&#xff0c;这两类模型在基本原理和应用场景上存…

【Excel数据分析】花垣县事业单位出成绩了,用Excel自带的M语言做一个数据分析

这里写自定义目录标题 花垣县事业单位出成绩了&#xff0c;用Excel自带的M语言做一个数据分析需求 花垣县事业单位出成绩了&#xff0c;用Excel自带的M语言做一个数据分析 Power Query M 语言&#xff0c;简称 M 语言&#xff0c;全名叫 Power Query Formula Language。 需求…

微处理器原理与应用篇---音频采集与串口传输功能的系统设计

这段内容是基于 STM32F407VGT6 单片机&#xff0c;实现音频采集与串口传输功能的嵌入式系统设计方案&#xff0c;包含硬件架构、软件逻辑和代码实现&#xff0c;核心是通过 ADC 采集音频、串口收发指令与数据 &#xff0c;以下分模块拆解&#xff1a; 一、系统设计概述 硬件&…

【大模型学习 | 量化】pytorch量化基础知识(1)

pytorch量化 [!note] 官方定义&#xff1a;performing computations and storing tensors at lower bitwidths than floating point precision.支持INT8量化&#xff0c;可以降低4倍的模型大小以及显存需求&#xff0c;加速2-4倍的推理速度通俗理解&#xff1a;降低权重和激活值…

ES和 Kafka 集群搭建过程中的典型问题、配置规范及最佳实践

Kafka 集群搭建与配置经验库文档&#xff08;完整会话汇总&#xff09; 一、会话问题分类与解决方案 1. Elasticsearch 映射解析错误 问题现象&#xff1a; {"error":{"root_cause":[{"type":"mapper_parsing_exception","re…

Linux-信号量

目录 POSIX信号量 信号量的原理 信号量的概念 申请信号量失败被挂起等待 信号量函数 二元信号量模拟实现互斥功能 基于环形队列的生产消费模型 下面环形队列采用数组模拟&#xff0c;用模运算来模拟环状特性&#xff0c;类似如此 空间资源和数据资源 生产者和消费者申请…