进程与线程的基本概念
特性 | 进程 (Process) | 线程 (Thread) |
---|---|---|
定义 | 操作系统分配资源的基本单位(独立的内存空间) 多进程可真正并行(利用多核 CPU) | 进程内的执行单元(共享进程资源) |
独立性 | 完全独立,崩溃后不影响其他进程 | 共享进程资源,一个线程崩溃可能导致整个进程崩溃 |
资源开销 | 高(需要分配独立的内存、文件句柄等) 不可直接共享,需通过 IPC(如管道、共享内存) | 低(共享进程资源,仅需少量栈和寄存器) 可直接访问全局变量(需同步机制避免竞争) |
数据共享 | 必须通过 IPC(进程间通信):<br> - 管道(Pipe)<br> - 共享内存(Shared Memory)<br> - 消息队列(Queue) | 直接共享进程内存(需锁机制) |
同步机制 | 通常不需要 | 需要锁(Lock)、信号量(Semaphore)等。 |
场景 | 进程 | 线程 |
---|---|---|
CPU 密集型任务 (如数值计算、机器学习) | ✅ 多进程(绕过 GIL,利用多核)。 | ❌ 多线程(受 GIL 限制,无法并行计算)。 |
I/O 密集型任务 (如网络请求、文件读写) | ⚠️ 可用,但资源开销高。 | ✅ 多线程(轻量级,等待 I/O 时可切换任务)。 |
需要高隔离性 | ✅ 进程崩溃不影响其他进程。 | ❌ 线程崩溃可能拖垮整个进程。 |
Joblib库
基础Parallel并行处理
n_jobs:设置并行运行的作业数。
verbose:设置后端的详细程度。
prefer:设置要使用的首选后端。可以是“processes”(进程)或“threads”(线程)。
require:设置对后端的请求。可以是“sharedmem”(共享内存)。
inner_max_num_threads:限制第三方库使用的最大线程数,这些库管理它们自己的 C 级线程池。此参数仅受 backend 支持,这些 backend 将 supports_inner_max_num_threads 类属性设置为 True,例如“loky”后端。
temp_folder、max_nbytes、mmap_mode:控制后端的自动内存映射行为。有关更多详细信息,请参阅在共享内存(内存映射)中使用数值数据
def process_row(row,idx,function):print('已经进入processrow')result = function(row)#print(f"处理完成 #{idx}: {str(result)[:50]}...")print(f"处理完成 #{idx}")result_clean=json_repair.loads(result)return result_clean# 使用Parallel进行并行处理,并添加verbose参数(日志详细程度)
results0= Parallel(n_jobs=10, verbose=10)(delayed(process_row)(row, i,function) for i, row in data0.iterrows()
)
通过Parallel的return_as参数,实现结果的即使反馈
return_as="generator"
输出可以是一个生成器,它会尽快返回可用的结果,即使后续任务尚未完成。输出的顺序始终与输入提交的顺序一致
return_as="generator_unordered"
在这种情况下,输出的顺序取决于workers的并发情况,并且是不确定的,这意味着每次执行代码时结果的顺序可能不同。
传统的
Parallel(...)(...)
会先把所有任务结果收集完,再一次性返回一个list
。当任务数量大、单个结果对象很大时,可能导致内存占用暴涨,甚至 OOM(内存溢出)。使用
return_as="generator"
则:1.每次只保留一个任务的结果在内存中 2.避免累积数百/数千个中间结果。
注意事项
generator使用过比如next/list()后无法回到上一个状态,想要关闭的话使用.close()
环境准备
首先可以更新一下joblib包,1.0.1版本的是不支持return_as的参数的
import joblib
print(joblib.__version__)
!pip install --upgrade joblib
Embarrassingly parallel for loops — joblib 1.6.dev0 documentation
可以参考文档测试一下代码
from math import sqrt
from joblib import Parallel, delayed
parallel = Parallel(n_jobs=2, return_as="generator")
output_generator = parallel(delayed(sqrt)(i ** 2) for i in range(10))
print(type(output_generator))
print(next(output_generator))
print(next(output_generator))
print(list(output_generator))
实现中断后可以继续处理未处理的数据
import os
import json
import pandas as pd
from joblib import Parallel, delayed
import json_repair# 假设这个是你已有的 DataFrame
# data0 = pd.read_csv(...) 或别的方式加载SAVE_DIR = "results_json"
os.makedirs(SAVE_DIR, exist_ok=True)# 检查是否已处理过
def has_processed(idx):return os.path.exists(os.path.join(SAVE_DIR, f"{idx}.json"))# 每一行任务的处理函数
def process_row(row, idx, openai_thinking_is_match):if has_processed(idx):print(f"跳过 #{idx}:已存在结果")return Noneprint(f"开始处理 #{idx}")result = openai_thinking_is_match(row)print(f"处理完成 #{idx}")result_clean = json_repair.loads(result)# 保存结果到独立文件with open(os.path.join(SAVE_DIR, f"{idx}.json"), "w", encoding="utf-8") as f:json.dump(result_clean, f, ensure_ascii=False, indent=2)return result_clean# 执行并行任务parallel = Parallel(n_jobs=50, return_as="generator", verbose=10)output_generator = parallel(delayed(process_row)(row, idx, function)for idx, row in data0.iterrows())# 可选:边执行边消费for _ in output_generator:pass