文章目录
- 1. 多进程概述
- 1.1. 多进程的概念
- 1.2. 多进程注意事项
- 2. 进程调用方式
- 2.1. Process 类
- 2.1.1. 构造方法
- 2.1.2. 实例方法
- 2.1.3. 属性
- 2.2. 面向过程
- 2.3. 面向对象
- 3. 进程间通讯
- 3.1. Queues
- 3.2. Pipes
- 3.3. Managers(进行共享数据)
- 4. 进程同步
- 5. 进程池
https://blog.csdn.net/qq_39112646/article/details/86772525
https://blog.51cto.com/u_16175431/8116042
1. 多进程概述
1.1. 多进程的概念
Python 提供了非常好用的多进程包 multiprocessing,借助这一个包,可以轻松地从单进程到多进程的转换。只需要定义一个函数,Python 会完成其他所有的事情。
multiprocessing 支持子进程、通信和共享数据、执行不同形式的同步,提供了Process,Queue,Pipe,Lock 组件
multiprocessing 包是 Python 中多进程的管理包,与 threading.Thread() 类似,可以利用 multiprocess.Process() 创建一个进程对象,该对象和 Thread 对象用法形同,也有start()、run()、join()方法。
此外,multiprocess 包中也有Lock、Event、Semaphore、Condition类(这些对象可以向多线程那样,通过参数传递给各个进程),用以同步进程。其用法与 threading 包中的同名类一致。
1.2. 多进程注意事项
- 在 UNIX 平台上,当某个进程终结之后,该进程需要被其父进程调用 wait,否则进程成为僵尸进程(Zombie)。所以,有必要对每个 Process 对象调用 join() 方法 (实际上等同于 wait)。对于多线程来说,由于只有一个进程,所以不存在此必要性。
- multiprocessing 提供了 threading 包中没有的 IPC(比如 Pipe 和 Queue),效率上更高。应优先考虑 Pipe 和 Queue,避免使用 Lock、Event、Semaphore、Condition等同步方式 (因为它们占据的不是用户进程的资源)。
- 多进程应该避免共享资源。在多线程中,我们可以比较容易地共享资源,比如使用全局变量或者传递参数。在多进程情况下,由于每个进程有自己独立的内存空间,以上方法并不合适。此时我们可以通过共享内存和 Manager 的方法来共享资源。但这样做提高了程序的复杂度,并因为同步的需要而降低了程序的效率。
- window 系统下,需要注意的是要想启动一个子进程,必须加上这句 if name == “main”,进程相关的要写在这句下面。
2. 进程调用方式
2.1. Process 类
2.1.1. 构造方法
Process([group [, target [, name [, args [, kwargs]]]]])
group: 线程组,目前还没有实现,库引用中提示必须是None;
target: 要执行的方法;
name: 进程名;
args/kwargs: 要传入方法的参数。
2.1.2. 实例方法
is_alive(): 返回进程是否在运行。
join([timeout]): 阻塞当前上下文环境的进程程,直到调用此方法的进程终止或到达指定的 timeout(可选参数)。
start(): 进程准备就绪,等待 CPU 调度
run(): strat() 调用 run 方法,如果实例进程时未制定传入 target,这 star 执行t默认 run() 方法。
terminate(): 不管任务是否完成,立即停止工作进程
2.1.3. 属性
authkey
daemon: 和线程的 setDeamon 功能一样
exitcode: 进程在运行时为None、如果为–N,表示被信号N结束
name: 进程名字。
pid: 进程号
2.2. 面向过程
from multiprocessing import Process
import timedef func(name):time.sleep(1)print('SubProcess', name)if __name__ == '__main__':processes = [Process(target=func, args=(f"user_{idx}",)) for idx in range(3)]# 开启子进程for p in processes:p.start()# 子进程阻塞执行for p in processes:p.join()print('MasterProcess End')
运行结果:
SubProcess user_2
SubProcess user_0
SubProcess user_1
MasterProcess End
2.3. 面向对象
from multiprocessing import Process
import timeclass MyProcess(Process):def __init__(self, name):# super(MyProcess, self).__init__()super().__init__()self.name = namedef run(self):time.sleep(1)print('SubProcess', self.name)if __name__ == '__main__':processes = [MyProcess(f"user_{idx}") for idx in range(3)]# 开启子进程for p in processes:p.start()# 子进程阻塞执行for p in processes:p.join()print('MasterProcess End')
运行结果同上。
3. 进程间通讯
不同进程间内存是不共享的,要想实现两个进程间的数据交换,可以用以下方法:
3.1. Queues
from multiprocessing import Process, Queue
import timedef func(name, q):time.sleep(1)print('SubProcess', name)q.put([42, name, 'hello'])if __name__ == '__main__':q = Queue()processes = [Process(target=func, args=(f"user_{idx}", q)) for idx in range(3)]# 开启子进程for p in processes:p.start()# 三个子进程各put 一次print(q.get())print(q.get())print(q.get())# 子进程阻塞执行for p in processes:p.join()print('MasterProcess End')
运行结果:
SubProcess user_0
SubProcess user_1
[42, 'user_0', 'hello']
[42, 'user_1', 'hello']
SubProcess user_2
[42, 'user_2', 'hello']
MasterProcess End
3.2. Pipes
from multiprocessing import Process, Pipedef func(conn):conn.send([42, None, 'hello'])conn.close()if __name__ == '__main__':parent_conn, child_conn = Pipe()p = Process(target=func, args=(child_conn,))p.start()print(parent_conn.recv()) # prints "[42, None, 'hello']"p.join()print('MasterProcess End')
3.3. Managers(进行共享数据)
from multiprocessing import Process, Manager
import timedef func(name, d, l):time.sleep(1)d[name] = {name:name}l.append(name)if __name__ == '__main__':with Manager() as manager:d = manager.dict()l = manager.list()processes = [Process(target=func, args=(f"user_{idx}", d, l)) for idx in range(3)]# 开启子进程for p in processes:p.start()# 子进程阻塞执行for p in processes:p.join()print(d)print(l)print('MasterProcess End')
运行结果:
{'user_0': {'user_0': 'user_0'}, 'user_1': {'user_1': 'user_1'}, 'user_2': {'user_2': 'user_2'}}
['user_0', 'user_1', 'user_2']
MasterProcess End
4. 进程同步
不使用来自不同进程的锁,很容易混淆。
from multiprocessing import Process, Lock
import timedef func(name, lock):lock.acquire()try:time.sleep(1)print('hello world', name)finally:lock.release()if __name__ == '__main__':lock = Lock()processes = [Process(target=func, args=(f"user_{idx}", lock)) for idx in range(3)]# 开启子进程for p in processes:p.start()# 子进程阻塞执行for p in processes:p.join()
运行结果:
hello world user_1
hello world user_0
hello world user_2
5. 进程池
进程池内部维护一个进程序列,当使用时,就去进程池中获取一个进程,如果进程池中没有可供使用的序列,那么程序就会等待,知道进程池中有可用进程为止。
进程池中有两个方法:
apply
apply_async
from multiprocessing import Process, Pool
import timedef Foo(i):time.sleep(2)return i+100def Callback(arg):print('--->exec dong:', arg)if __name__ == '__main__':pool=Pool(5)for i in range(3):#pool.apply(func=Foo,args=(i,))pool.apply_async(func=Foo, args=(i,), callback=Callback)print('MasterProcess Start')pool.close()pool.join()print('MasterProcess End')
运行结果:
MasterProcess Start
--->exec dong: 102
--->exec dong: 100
--->exec dong: 101
MasterProcess End