1、引言
在使用多个爬虫脚本进行数据爬取和调用大语言模型返回结果的场景中,涉及到大量的网络IO操作。协程能够让网络IO操作并发执行,极大地提升程序的运行效率。在智能体相关的开源项目中,我们也可以经常看到协程的身影。
2、协程
协程,英文Coroutines,是一种比线程更加轻量级的存在。正如一个进程可以拥有多个线程一样,一个线程也可以拥有多个协程。它本质上还是一段运行在单进程单线程上的函数,并不能提升你的运算速度。它比较适合处理需要等待的任务,例如网络的通讯。
特点:
- 协作性:它允许单线程内的任务协作执行,避免传统线程中的竞争条件。
- 切换效率:不同于系统级的线程上下文切换,协程切换在用户级实现,效率更高,开销更小
- 非抢占式:协程依赖程序员显式地进行任务切换,而不是由操作系统判定,赋予了更大的控制自由度。
3、asyncio异步框架
在python中,协程通过asyncio标准库能够很方便地使用
3.1、核心概念
3.1.1、协程(Coroutine)
协程是一种特殊的函数,它允许在执行过程中暂停和恢复。在Python的asyncio库中,协程是通过async
关键字定义的。它和普通的函数不同,普通的函数在调用时会直接执行,而协程需要被事件循环调度执行。
特点
- 非阻塞:协程在执行过程中,当遇到需要等待的操作(如I/O操作)时,不会像传统线程那样阻塞整个程序。它会暂停自己的执行,让出控制权,让其他协程运行,从而实现高效的并发。
- 轻量级:创建和切换协程的开销比线程小得多。线程切换需要操作系统级别的资源调度,而协程的切换是在用户态完成的,所以可以创建大量的协程。
import asyncioasync def say_hello():print("Hello")await asyncio.sleep(1) # 模拟异步的I/O操作,暂停协程print("World")# 创建事件循环
loop = asyncio.get_event_loop()
# 运行协程
loop.run_until_complete(say_hello())
loop.close()
注意:直接调用协程函数不会运行里面的程序返回结果,而是会返回一个协程对象。
在这个例子中,say_hello
函数是一个协程。当执行到await asyncio.sleep(1)
时,协程会暂停执行。asyncio.sleep
是一个异步的等待函数,它模拟了I/O操作。如果在这个协程暂停期间有其他协程需要运行,事件循环就可以调度其他协程执行。
3.1.2、事件循环(Event Loop)
事件循环是asyncio程序的核心。它负责调度协程的执行,管理协程的暂停和恢复。事件循环会不断地检查协程的状态,当协程暂停时,它会记录下来,并在合适的时机(如I/O操作完成)恢复协程的执行。
功能
- 调度协程:事件循环根据协程的执行状态(如是否遇到
await
暂停)来安排协程的执行顺序。 - 处理I/O事件:它会监听各种I/O事件(如文件读写、网络通信等)。当I/O操作完成时,事件循环会唤醒等待该I/O操作的协程。
import asyncioasync def task1():print("Task 1 started")await asyncio.sleep(2) # 模拟耗时操作print("Task 1 finished")async def task2():print("Task 2 started")await asyncio.sleep(1) # 模拟耗时操作print("Task 2 finished")async def main():# 创建两个任务task1_obj = asyncio.create_task(task1())task2_obj = asyncio.create_task(task2())# 等待两个任务完成,await关键字用于获取任务执行的结果await task1_objawait task2_obj# 创建事件循环
loop = asyncio.get_event_loop()
# 运行主协程
loop.run_until_complete(main())
loop.close()
在这个例子中,事件循环负责调度task1
和task2
这两个协程。task1
和task2
都会在执行到await asyncio.sleep
时暂停。事件循环会先启动task1
,当task1
暂停后,它会启动task2
。当task2
的await asyncio.sleep(1)
完成时,task2
会恢复执行并完成。而task1
会在await asyncio.sleep(2)
完成时恢复执行并完成。通过这种方式,事件循环实现了两个协程的并发执行,而不是像传统线程那样需要复杂的线程切换和同步机制。
3.1.3、Future
Future是一个表示异步操作最终结果的对象。它是一个低层次的接口,用于表示异步操作的最终完成(或失败)以及其结果。主要用于底层库来封装异步操作的结果。它允许你注册回调函数,当异步操作完成时,这些回调函数会被调用。
特点:
Future
本身并不执行任何操作,它只是一个容器,用来存储异步操作的结果。- 你可以通过
Future
的方法(如add_done_callback
)来添加回调函数,当异步操作完成时,这些回调函数会被调用。 Future
的状态可以是PENDING
(等待中)、FINISHED
(已完成或已取消)。
import asyncioasync def main():loop = asyncio.get_running_loop()future = loop.create_future()print("Future state:", future._state) # PENDING# 设置 Future 的结果future.set_result("Hello, Future!")# 等待 Future 完成result = await futureprint("Future result:", result) # Hello, Future!asyncio.run(main())
3.1.4、任务(Task)
Task是Future的一个子类,它封装了一个协程(coroutine)的执行。它用于调度协程的运行,并且可以管理协程的执行状态。主要用于调度和管理协程的执行。它是asyncio中用于并发执行协程的主要方式。
特点:
Task
是一个高级的接口,它在内部封装了协程的执行逻辑。- 你可以通过
asyncio.create_task()
或loop.create_task()
来创建一个Task
。 Task
会自动处理协程的运行,包括调度、执行和结果管理。Task
的状态可以是PENDING
(等待中)、FINISHED
(已完成或已取消)。
import asyncioasync def my_coroutine():await asyncio.sleep(1)return "Hello, Task!"async def main():task = asyncio.create_task(my_coroutine()) # 创建 Taskprint("Task state:", task._state) # PENDING# 等待 Task 完成result = await taskprint("Task result:", result) # Hello, Task!asyncio.run(main())
3.1.5、Future和Task的区别
1、功能层次
Future
是一个低层次的接口,主要用于底层库来封装异步操作的结果。Task
是一个高层次的接口,主要用于调度和管理协程的执行。
2、用途
Future
是一个容器,用来存储异步操作的结果,通常由底层库创建和管理。Task
是用来封装和调度协程的执行,通常由用户代码创建和管理。
3、创建方式
-
Future
可以通过loop.create_future()
创建。Task
可以通过asyncio.create_task()
或loop.create_task()
创建。
4、执行逻辑
Future
本身并不执行任何操作,它只是一个容器。Task
会自动调度和执行封装的协程。
5、回调机制
Future
可以通过add_done_callback
添加回调函数。Task
也可以通过add_done_callback
添加回调函数,但通常不需要手动添加,因为Task
会自动处理协程的执行和结果。
在实际使用中,大多数情况下我们直接使用Task来调度协程的执行,而Future更多是被底层库使用。
3.2、基本用法
3.2.1、运行协程
要运行一个协程,你可以使用asyncio.run()函数。它会创建一个事件循环,并运行指定的协程。
import asyncioasync def main():print("Start")await asyncio.sleep(1)print("End")asyncio.run(main())
3.2.2、并发执行多个任务
你可以使用asyncio.gather()函数并发执行多个协程,并等待它们全部完成。
import asyncioasync def task1():print("Task 1 started")await asyncio.sleep(1)print("Task 1 finished")async def task2():print("Task 2 started")await asyncio.sleep(2)print("Task 2 finished")async def main():await asyncio.gather(task1(), task2())asyncio.run(main())
3.2.3、注意事项
1、Python版本:部分功能需Python3.7+(如asyncio.run())
2、阻塞操作:避免在协程中使用同步阻塞代码
4、代码示例
下面将使用asyncio模拟异步执行三个爬虫函数
import asyncio
import timeasync def spider1():print("Spider 1 started")await asyncio.sleep(1)print("Spider 1 finished")async def spider2():print("Spider 2 started")await asyncio.sleep(2)print("Spider 2 finished")async def spider3():print("Spider 3 started")await asyncio.sleep(3)print("Spider 3 finished")async def main():# 创建任务task1 = asyncio.create_task(spider1())task2 = asyncio.create_task(spider2())task3 = asyncio.create_task(spider3())# 等待所有任务完成await asyncio.gather(task1,task2,task3)if __name__ == '__main__':start_time = time.time()# 获取事件循环loop = asyncio.get_event_loop()# 运行主函数loop.run_until_complete(main())# 关闭事件循环loop.close()end_time = time.time()print(f"程序执行时间:{end_time-start_time}")
如果三个爬虫函数同步执行,则完成一共需要6s。通过协程异步执行,只需要3s多,极大地节省了程序的运行时间。
代码优化:使用asyncio.run()
从Python3.7开始,推荐使用asyncio.run()来运行主函数,这样可以简化事件循环的管理。
if __name__ == '__main__':start_time = time.time()asyncio.run(main()) # 使用 asyncio.run() 运行主函数end_time = time.time()print(f"程序执行时间:{end_time - start_time}")