除了循环和分支之外,工作流还可以并发地执行步骤。当你有多个可以相互独立运行的步骤,并且这些步骤中包含需要等待的耗时操作时,这种并发执行的方式就非常有用,因为它允许其他步骤并行运行。
触发多个事件
到目前为止,在我们的示例中,每个步骤只触发了一个事件。但在很多情况下,你可能希望并行执行多个步骤。要做到这一点,你需要触发多个事件。你可以通过 send_event 来实现这一点:
import asyncio
import randomfrom llama_index.core.workflow import Workflow, step, Context, StartEvent, Event, StopEvent
from llama_index.utils.workflow import draw_all_possible_flowsclass StepTowEvent(Event):payload: strquery: strclass ParallelFlow(Workflow):@stepasync def step_one(self, ctx: Context, ev: StartEvent)-> StepTowEvent:print(ev.payload)ctx.send_event(StepTowEvent(payload="跳转到第二步", query="查询 1"))ctx.send_event(StepTowEvent(payload="跳转到第二步", query="查询 2"))ctx.send_event(StepTowEvent(payload="跳转到第二步", query="查询 3"))return None@step(num_workers=4)async def step_tow(self, ctx: Context, ev: StepTowEvent) -> StopEvent:print(ev.query)await asyncio.sleep(random.randint(1, 5))return StopEvent(result="结束!")async def run_workflow():w = ParallelFlow(timeout=60, verbose=True)result = await w.run(payload="开始工作流...")print(result)if __name__ == '__main__':asyncio.run(run_workflow())# 工作流可视化工具
draw_all_possible_flows(ParallelFlow, filename="parallel_workflow.html")
运行结果:
Running step step_one
开始工作流...
Step step_one produced no event
Running step step_tow
查询 1
Running step step_tow
查询 2
Running step step_tow
查询 3
Step step_tow produced event StopEvent
结束!
parallel_workflow.html
流程图:
在这个示例中,我们的起始步骤触发了三个 StepTwoEvents。step_two 步骤使用了 num_workers=4 进行装饰,这告诉工作流最多可以同时运行 4 个该步骤的实例(这也是默认值)。
收集事件
如果你执行前面的示例,你会发现工作流会在第一个完成的查询后停止。有时候这种行为是有用的,但在其他情况下,你可能希望在继续执行下一步之前,等待所有耗时操作都完成。你可以通过 collect_events 来实现这一点:
import asyncio
import randomfrom llama_index.core.workflow import Workflow, Event, step, StartEvent, Context, StopEventfrom llama_index.utils.workflow import draw_all_possible_flowsclass StepTwoEvent(Event):payload: strquery: strclass StepThreeEvent(Event):payload: strclass ConcurrentFlow(Workflow):@stepasync def step_one(self, ctx: Context, ev: StartEvent)-> StepTwoEvent:print(ev.payload)ctx.send_event(StepTwoEvent(payload="", query="查询 1"))ctx.send_event(StepTwoEvent(payload="", query="查询 2"))ctx.send_event(StepTwoEvent(payload="", query="查询 3"))return None@step(num_workers=4)async def step_two(self, ctx: Context, ev: StepTwoEvent)-> StepThreeEvent:print("第二步...", ev.query)await asyncio.sleep(random.randint(1, 5))return StepThreeEvent(payload="来自第二步")@stepasync def step_three(self, ctx: Context, ev: StepThreeEvent)-> StopEvent:print("运行第三步...")# 等待直到我们接收到 3 个事件result = ctx.collect_events(ev, [StepThreeEvent] * 3)if result is None:return None# 将这三个结果一并进行处理print("将这三个结果一并进行处理", result)return StopEvent(result="Done")async def run_workflow():w = ConcurrentFlow(timeout=60, verbose=True)result = await w.run(payload="开始工作流..")print(result)if __name__ == '__main__':asyncio.run(run_workflow())# 工作流可视化工具
draw_all_possible_flows(ConcurrentFlow, filename="concurrent_workflow.html")
运行结果:
Running step step_one
开始工作流..
Step step_one produced no event
Running step step_two
第二步... 查询 1
Running step step_two
第二步... 查询 2
Running step step_two
第二步... 查询 3
Step step_two produced event StepThreeEvent
Step step_two produced event StepThreeEvent
Step step_two produced event StepThreeEvent
Running step step_three
运行第三步...
Step step_three produced no event
Running step step_three
运行第三步...
Step step_three produced no event
Running step step_three
运行第三步...
将这三个结果一并进行处理 [StepThreeEvent(payload='来自第二步'), StepThreeEvent(payload='来自第二步'), StepThreeEvent(payload='来自第二步')]
Step step_three produced event StopEvent
Done
concurrent_workflow.html
流程图:
collect_events 方法位于 Context 上,它接收触发当前步骤的事件以及一个需要等待的事件类型数组作为参数。在本例中,我们正在等待 3 个相同类型的 StepThreeEvent 事件。
每当接收到一个 StepThreeEvent 事件时,step_three 步骤就会被触发,但 collect_events 在接收到全部 3 个事件之前会一直返回 None。一旦收集到所有 3 个事件,该步骤将继续执行,并可以将这三个结果一并进行处理。
collect_events 返回的结果是一个按接收顺序排列的事件数组,其中包含所收集到的所有事件。
多种事件类型
当然,你并不要求必须等待相同类型的事件。你可以根据需要等待任意组合的事件,例如在以下示例中:
import asynciofrom llama_index.core.workflow import Event, Workflow, step, Context, StartEvent, StopEvent
from llama_index.utils.workflow import draw_all_possible_flowsclass StepAEvent(Event):query: str
class StepBEvent(Event):query: str
class StepCEvent(Event):query: strclass StepACompleteEvent(Event):query: str
class StepBCompleteEvent(Event):query: str
class StepCCompleteEvent(Event):query: strclass ConcurrentFlow(Workflow):@stepasync def step_one(self, ctx: Context, ev: StartEvent)-> StepAEvent | StepBEvent | StepCEvent:print(ev.payload)ctx.send_event(StepAEvent(query="A 查询..."))ctx.send_event(StepBEvent(query="B 查询..."))ctx.send_event(StepCEvent(query="C 查询..."))return None@stepasync def step_a(self, ctx: Context, ev: StepAEvent)-> StepACompleteEvent:print(ev.query)return StepACompleteEvent(query="A 查询...完成")@stepasync def step_b(self, ctx: Context, ev: StepBEvent)-> StepBCompleteEvent:print(ev.query)return StepBCompleteEvent(query="B 查询...完成")@stepasync def step_c(self, ctx: Context, ev: StepCEvent)-> StepCCompleteEvent:print(ev.query)return StepCCompleteEvent(query="C 查询...完成")@stepasync def step_tow(self,ctx: Context,ev: StepACompleteEvent | StepBCompleteEvent | StepCCompleteEvent,) -> StopEvent:print("接收到的事件 ", ev.query)# 等待直到我们接收到 3 个事件result = ctx.collect_events(ev, [StepCCompleteEvent, StepACompleteEvent, StepBCompleteEvent],)if result is None:return None# do something with all 3 results togetherreturn StopEvent(result="Done")async def run_workflow():w = ConcurrentFlow(timeout=60, verbose=True)result = await w.run(payload="开始工作流..")print(result)if __name__ == '__main__':asyncio.run(run_workflow())# 工作流可视化工具
draw_all_possible_flows(ConcurrentFlow, filename="concurrent_workflow.html")
运行结果:
Running step step_one
开始工作流..
Step step_one produced no event
Running step step_a
A 查询...
Step step_a produced event StepACompleteEvent
Running step step_b
B 查询...
Step step_b produced event StepBCompleteEvent
Running step step_c
C 查询...
Step step_c produced event StepCCompleteEvent
Running step step_tow
接收到的事件 A 查询...完成
Step step_tow produced no event
Running step step_tow
接收到的事件 B 查询...完成
Step step_tow produced no event
Running step step_tow
接收到的事件 C 查询...完成
Step step_tow produced event StopEvent
Done
concurrent_workflow.html
流程图: