AI工作流的导言:
工作流系统(Workflow System)是 Dify 的核心组成部分,它通过可视化编程界面支持创建复杂的 AI 应用程序。用户可以将不同的功能块连接起来,从而设计出用于处理数据、与 AI 模型交互、管理条件以及执行各种操作的工作流。
Dify 工作流的实现机制,通过分析代码实现、数据流动和执行过程,充分理解工作流的实现原理。
1、工作流系统概述
1.1 核心概念
Dify 工作流系统是一个以图(Graph)为基础的执行引擎,它使用户能够通过可视化界面来设计和执行复杂的 AI 工作流。工作流由多种类型的节点(Node)构成,这些节点通过边(Edge)相互连接,构成一个有向图结构。
1.2 系统架构
工作流系统的核心构成模块
1.2.1: 流程引擎(Workflow Engine)
作为整个工作流的“大脑”,负责解析流程定义,构建有向无环图(DAG),并按规则调度和执行各个节点。它还控制节点之间的流转逻辑与执行顺序,确保流程按预期推进。
1.2.2:节点组件(Node Implementations)
提供多种功能类型的节点实现,如大语言模型调用(LLM)、数据库查询、知识库检索、条件判断、代码执行、HTTP 请求等,构成流程中的最小功能单元。
1.2.3:上下文与变量系统(Context & Variable Management)
负责在流程执行过程中管理数据的传递与共享,包括输入参数、中间结果、输出值以及全局/局部变量的作用域控制,确保节点间数据流动安全高效。
1.2.4:执行日志与监控(Execution Tracing & Monitoring)
记录每次流程运行的详细轨迹,包括每个节点的执行状态、耗时、输入输出内容等信息,为调试、审计、性能优化和可视化提供数据支撑。
2、数据模型设计
2.1 工作流数据模型
Dify 采用多种模型来描述工作流及其运行状况:
WorkflowModel:涵盖工作流的基础信息,如 ID、名称、描述、配置等。
WorkflowRunModel:记录工作流的运行情况,包括运行状态、启动时间、完成时间等。
WorkflowNodeExecutionModel:记录节点的运行情况,包括节点类型、输入、输出、状态等。
ConversationVariable:用于存储会话变量,包含名称、值类型、值等信息。
WorkflowDraftVariable:用于存储草稿工作流中的变量,涵盖会话变量、系统变量和节点变量。
2.2 工作流节点类型
Dify 工作流支持多种类型的节点,每种节点有不同的功能和配置:
Dify 提供了丰富多样的节点类型,以满足不同场景下的工作流构建需求:
START:作为工作流的入口点,标志着流程的开始。
END:表示工作流的终点,流程在此结束。
LLM:大语言模型节点,用于生成文本内容。
KNOWLEDGE_RETRIEVAL:知识检索节点,用于从知识库中检索所需信息。
IF_ELSE:条件分支节点,根据设定条件选择不同的执行路径。
CODE:代码执行节点,用于运行自定义代码。
HTTP_REQUEST:HTTP 请求节点,用于与外部 API 进行交互。
TOOL:工具节点,用于调用预定义的工具。
AGENT:代理节点,用于执行复杂的任务。
3、工作流执行机制
3.1 工作流执行流程
初始化工作流运行记录
解析工作流配置,构建执行图
从起始节点开始执行
根据图的边定义,确定下一个要执行的节点
执行节点,记录执行结果
重复步骤 4-5,直到达到结束节点或出现错误
完成工作流执行,更新运行记录
3.2 图引擎执行机制
图引擎作为工作流执行的核心组件,承担着以下关键职责:
解析节点和边配置:对工作流中定义的节点和边的配置信息进行分析和处理。
构建边映射和反向边映射:创建边的正向和反向映射关系,以便高效地管理和查询节点之间的连接。
识别根节点和叶子节点:确定工作流的起始节点(根节点)和结束节点(叶子节点)。
检查节点连接性和循环:验证节点之间的连接是否正确,检测是否存在循环依赖,确保工作流的执行逻辑合理。
管理并行执行:协调多个节点的并行执行,优化执行效率。
控制执行流程:根据工作流的定义和运行时条件,控制工作流的执行顺序和流程走向。
4、变量管理机制
4.1 变量池设计
Dify 工作流通过变量池(VariablePool)来管理工作流执行过程中的变量。变量池涵盖了以下几类变量:
系统变量:以 `sys.` 为前缀,例如 `sys.query`(用户输入)、`sys.files`(用户上传的文件)。
环境变量:在工作流级别进行配置的变量。
会话变量:在会话中保持持久化的变量。
节点变量:各节点的输入和输出变量。
4.2 变量传递机制
在 Dify 工作流中,节点之间通过变量池进行数据传递。具体机制如下:
节点执行后:将输出数据存储到变量池中。
下一个节点执行时:从变量池中读取所需的输入变量。
支持变量引用:通过选择器和模板字符串引用变量。
支持文件类型变量:允许传递文件类型的变量。
变量引用语法:使用 `{{#node_id.variable_name#}}` 的模板语法来引用变量。
5、节点实现机制
为了保证工作流中各类节点具备统一的执行接口,系统设计上采用面向对象的方式,让所有节点继承自一个共同的基类 —— BaseNode
。每个节点必须实现自己的 _run
方法,作为其在流程中实际执行时的核心逻辑。
5.1 基础节点结构
所有节点都继承自 BaseNode
类,实现以下方法:
-
_run:节点的具体执行逻辑
-
_get_inputs:获取节点的输入变量
-
_get_outputs:处理节点的输出变量
这是 BaseNode
类的核心实现:
BaseNode
类详解:所有节点的抽象基类
作为整个工作流系统中所有节点的统一基类,BaseNode
定义了节点运行所需的基础结构和核心接口,确保各类节点在统一规范下执行。
1. 初始化方法(__init__
)
构造函数接收必要的参数,包括节点唯一标识(node_id
)、配置信息(config
)、图引擎引用(graph_engine
)等,并据此初始化节点的基本属性。这些属性通常包括节点状态、输入输出定义以及运行时所需的上下文环境。
2. 抽象执行方法(_run
)
这是一个由子类必须实现的抽象方法,用于封装节点的核心执行逻辑。每个具体类型的节点(如 LLM 节点、条件判断节点等)都需要根据自身职责重写该方法,返回执行结果或抛出异常。
3. 统一执行入口(run
)
该方法是节点对外暴露的标准执行接口。它负责调用 _run
方法,并统一处理执行过程中的异常情况。最终,执行结果将被封装为标准事件(Event)对象返回,供流程引擎进行后续处理或记录。
通过这种设计,Dify 的工作流系统实现了高度可扩展的节点体系,使得新增节点类型变得简单高效,同时保证了整体执行流程的一致性和可控性
5.2 节点类型实现
5.2.1 StartNode 实现
StartNode
是工作流的起始节点,负责将用户输入和系统变量作为节点的输出:
StartNode
的实现非常简单,它主要完成以下工作:
-
从变量池中获取用户输入和系统变量
-
将系统变量添加到节点输入中,以
SYSTEM_VARIABLE_NODE_ID.var
的形式作为键 -
返回包含这些输入和输出的
NodeRunResult
,状态为成功
5.2.2 IfElseNode 实现
IfElseNode
是条件分支节点,根据条件选择执行路径:
IfElseNode
的实现主要完成以下工作:
-
使用
ConditionProcessor
处理条件逻辑 -
遍历
cases
结构中的条件组,并根据结果确定selected_case_id
-
如果使用旧的结构,则调用
_should_not_use_old_function
进行兼容处理 -
返回包含条件结果的
NodeRunResult
,并设置edge_source_handle
以指示下一个要执行的节点
5.2.3 LLM 节点实现
作为工作流中的关键节点,LLM 节点主要承担着调用大语言模型以生成文本的任务。
LLMNode
类的部分实现:
LLMNode
是一个典型的节点实现,负责调用大语言模型:
-
初始化节点参数和模型配置
-
处理输入变量和文件
-
构建提示消息
-
调用 LLM 模型
-
处理模型返回的结果
-
生成节点执行结果
5.2.4 ToolNode 实现
ToolNode
是工具节点,负责调用预定义的工具:
ToolNode
的实现主要完成以下工作:
-
获取工具信息和工具运行时
-
生成工具参数
-
获取会话 ID
-
通过
ToolEngine.generic_invoke
调用工具 -
处理工具返回的结果
-
生成节点执行结果
5.2.5 KnowledgeRetrievalNode 实现
KnowledgeRetrievalNode
是知识检索节点,负责从知识库中检索相关信息:
KnowledgeRetrievalNode
的实现主要完成以下工作:
-
从变量池中提取查询变量
-
检查查询是否为空
-
进行速率限制检查
-
定义检索模型配置
-
执行知识检索
-
处理检索结果
-
生成节点执行结果
5.2.6 CodeNode 实现
CodeNode
是代码执行节点,负责执行用户定义的代码:
CodeNode
的实现主要完成以下工作:
-
获取代码语言和代码内容
-
从变量池中获取输入变量
-
通过
CodeExecutor.execute_workflow_code_template
执行代码 -
检查输出变量的类型和长度
-
处理执行结果和潜在的异常
-
生成节点执行结果
5.2.7 AgentNode 实现
AgentNode
是代理节点,负责调用 AI 代理执行复杂任务:
AgentNode
的实现主要完成以下工作:
-
获取代理策略
-
生成代理参数
-
获取会话 ID
-
通过
strategy.invoke
调用代理 -
处理代理返回的结果
-
生成节点执行结果
5.2.8 HttpRequestNode 实现
HttpRequestNode
是 HTTP 请求节点,负责发送 HTTP 请求并处理响应:
HttpRequestNode
的实现主要完成以下工作:
-
获取默认配置
-
初始化
HttpRequestExecutor
-
执行 HTTP 请求
-
处理响应(包括成功和失败情况)
-
提取文件
-
生成节点执行结果
6、工作流数据流动
6.1 工作流创建和发布

-
用户在界面上设计工作流,定义节点和连接
-
系统将设计转换为工作流配置
-
创建工作流模型和草稿变量
-
发布工作流,使其可被调用
6.2 工作流调试和执行
-
用户触发工作流执行
-
系统创建工作流运行记录
-
图引擎解析工作流配置,构建执行图
-
按照图的定义执行节点
-
记录每个节点的执行状态和结果
-
完成工作流执行,更新运行记录
7、图引擎机制
图引擎是工作流执行的核心,负责解析工作流图结构并执行节点。
7.1 图引擎实现
以下是 GraphEngine
类的部分实现:
GraphEngineThreadPool
是一个继承自 ThreadPoolExecutor
的线程池,用于管理工作流的并行执行:
7.2 图结构解析
图引擎首先解析工作流的图结构,包括:
-
解析节点:解析工作流中的所有节点,包括节点类型、配置等
-
解析边:解析节点之间的连接关系,包括源节点、目标节点、源端口、目标端口等
-
构建节点映射:构建节点ID到节点对象的映射
-
构建边映射:构建边ID到边对象的映射
7.3 图结构实现
以下是 Graph
类的部分实现,它负责解析工作流配置并执行节点:
7.4 节点执行
图引擎根据图结构执行节点:
-
确定起始节点:通常是START节点
-
执行节点:调用节点的run方法
-
处理节点结果:根据节点执行结果确定下一个要执行的节点
-
处理并行执行:如果有多个分支,可以并行执行
7.4.1 节点执行的主要流程
节点执行的主要流程如下:
-
发出节点开始事件:触发
NodeRunStartedEvent
,通知系统节点开始执行 -
调用节点的
run
方法:执行节点的具体逻辑 -
处理节点事件:
-
处理
RunCompletedEvent
:获取节点执行结果 -
处理
RunStreamChunkEvent
:处理流式输出 -
处理
RunRetrieverResourceEvent
:处理检索资源
-
-
处理重试逻辑:如果节点执行失败且配置了重试,则进行重试
-
更新变量池:将节点输出变量添加到变量池中
-
发出节点完成事件:根据执行结果触发相应事件
-
成功:触发
NodeRunSucceededEvent
-
失败:触发
NodeRunFailedEvent
-
异常但继续:触发
NodeRunExceptionEvent
-
-
查找下一个要执行的节点:根据边映射和条件确定下一个节点
-
执行下一个节点:可能是串行执行或并行执行
7.4.2 查找下一个节点的机制
在工作流执行过程中,确定下一个要执行的节点是关键步骤。GraphEngine
类的 _run
方法实现了这一机制:
1:获取边映射:通过 self.graph.edge_mapping.get(next_node_id)
获取当前节点的所有出边
2:单边处理:如果只有一条出边,直接获取目标节点ID
3:多边处理:如果有多条出边,需要根据条件或并行策略确定下一个节点
条件分支:如果边有运行条件,根据条件结果确定要执行的分支
并行分支:如果没有条件或条件满足,可能需要并行执行多个分支
4:并行分支执行:通过 _run_parallel_branches
方法处理并行分支
-
创建线程池和队列管理并行执行
-
为每个分支创建一个线程执行
-
收集并处理所有分支的执行结果
5:检查节点是否在当前并行分支内:确保节点执行不会跨越并行分支边界
通过这种机制,工作流系统能够灵活地处理各种复杂的执行路径,包括条件分支和并行执行,确保工作流按照设计的逻辑正确执行。
8、图引擎与节点执行的通信机制
在工作流执行过程中,图引擎与节点执行之间的通信是基于事件驱动机制来实现的。这种机制让各个组件能够以松耦合的方式进行交互,从而显著提升了系统的可扩展性和可维护性。
8.1 事件驱动架构
工作流系统采用事件驱动架构,通过定义和传递各种事件来实现图引擎与节点之间的通信。该架构具备以下优势:
- 松耦合:图引擎与节点之间通过事件进行交互,而非直接调用,从而降低了组件间的依赖性。
- 可扩展:新的节点类型和事件类型可以轻松集成到系统中,无需对现有代码进行修改。
- 异步处理:事件可以异步处理,从而提高系统的响应性和吞吐量。
- 状态追踪:通过事件可以追踪工作流的执行状态和历史记录。
8.2 核心事件类型
工作流系统定义了多种事件类型,用于表示工作流执行过程中的不同状态和操作:
8.2.1 图级事件
-
GraphRunStartedEvent:工作流开始执行。
-
GraphRunSucceededEvent:工作流成功完成。
-
GraphRunFailedEvent:工作流执行失败。
-
GraphRunPartialSucceededEvent:工作流部分成功(部分节点失败但不影响整体结果)。
8.2.2 节点级事件
-
NodeRunStartedEvent:节点开始执行。
-
NodeRunSucceededEvent:节点执行成功。
-
NodeRunFailedEvent:节点执行失败。
-
NodeRunExceptionEvent:节点执行异常但继续执行。
-
NodeRunRetryEvent:节点重试执行。
-
NodeRunStreamChunkEvent:节点产生流式输出。
-
NodeRunRetrieverResourceEvent:节点检索资源。
8.2.3 并行分支事件
-
ParallelBranchRunStartedEvent:并行分支开始执行。
-
ParallelBranchRunSucceededEvent:并行分支执行成功。
-
ParallelBranchRunFailedEvent:并行分支执行失败。
8.2.4 迭代和循环事件
-
IterationRunStartedEvent:迭代开始。
-
IterationRunNextEvent:迭代下一步。
-
IterationRunSucceededEvent:迭代成功完成。
-
IterationRunFailedEvent:迭代失败。
-
LoopRunStartedEvent:循环开始。
-
LoopRunNextEvent:循环下一步。
-
LoopRunSucceededEvent:循环成功完成。
-
LoopRunFailedEvent:循环失败。
8.3 事件传递机制
在工作流系统中,事件的传递遵循以下流程:
8.3.1事件生成:图引擎或节点执行器生成事件:
8.3.2事件传递:通过 Python 生成器(Generator)机制传递事件:
8.3.3事件处理:工作流入口点(WorkflowEntry)接收事件并分发给回调处理器
8.3.4回调处理:回调处理器根据事件类型执行相应的操作
8.4 事件处理回调
工作流系统提供了回调接口,使得外部系统能够注册回调函数,以便处理工作流事件。
系统内置了多种回调实现,具体如下:
-
WorkflowLoggingCallback:用于记录工作流的执行日志。
-
WorkflowAppRunnerCallback:用于处理应用层面的工作流事件。
8.5 事件与状态管理
事件不仅用于通信,还用于管理工作流的状态:
8.5.1节点状态追踪:通过事件记录节点的执行状态和结果
8.5.2变量传递:事件携带节点的输入和输出变量
8.5.3错误处理:事件携带错误信息,用于错误处理和重试
8.6 事件转换与应用集成
工作流应用运行器(WorkflowAppRunner)负责将工作流事件转化为应用层面的队列事件,从而实现与应用系统的无缝集成。
这种转换机制让工作流系统在与外部应用系统实现无缝对接的同时,确保了自身内部实现的独立性。
8.7 事件通信示例
以下是一个完整的事件通信流程示例,详细展示了从节点执行到事件处理的全过程:
8.7.1 节点执行与事件产生
在图引擎执行节点的过程中,会触发并产生一系列事件:
8.7.2 事件传递与处理
事件通过工作流入口点传递给回调处理器:
8.7.3 回调处理器处理事件
回调处理器根据事件类型执行相应的操作:
8.7.4 应用运行器处理事件
应用运行器将工作流事件转换为应用级别的队列事件:
8.8 事件通信的优势
基于事件的通信机制在图引擎与节点执行器之间具有以下显著优势:
-
组件解耦:图引擎与节点执行器通过事件进行交互,而非直接调用,从而降低了组件间的耦合度。
-
简化调试:事件中包含完整的上下文信息,便于进行调试和问题排查。
-
支持异步执行:事件可以异步处理,支持并行执行和分布式部署。
-
可扩展性:新的节点类型和事件类型可以轻松集成到系统中,无需修改现有代码。
-
状态追踪:通过事件可以完整记录工作流的执行状态和历史,便于监控和审计。
-
错误处理:事件携带错误信息,支持灵活的错误处理策略和重试机制。
9、错误处理机制
9.1 错误处理策略
工作流系统提供了两种主要的错误处理策略:
-
FAIL_BRANCH:当节点执行失败时,沿着失败分支继续执行。
-
将错误信息和类型添加到变量池中。
-
设置
edge_source_handle
为FAILED
,使工作流可以沿着专门处理失败情况的分支继续执行。 -
适用于需要针对失败情况执行特定逻辑的场景。
-
-
DEFAULT_VALUE:当节点执行失败时,使用预定义的默认值继续执行。
-
将错误信息和类型添加到变量池中。
-
使用节点配置中预定义的默认值作为节点输出。
-
适用于即使失败也需要提供某种结果的场景。
-
9.2 节点重试机制
系统为某些类型的节点提供了执行失败时的重试功能,具体如下:
重试配置:
-
max_retries:最大重试次数。
-
retry_interval_seconds:重试间隔时间(秒)。
重试流程:
-
节点执行失败后,系统会检查是否配置了重试。
-
如果当前重试次数小于最大重试次数,系统会触发 NodeRunRetryEvent 事件。
-
系统等待指定的重试间隔时间。
-
系统重新执行节点。
重试事件:
系统会触发 NodeRunRetryEvent 事件,该事件包含重试索引、开始时间等信息,可用于监控和记录重试情况。
9.3 特殊错误处理行为的节点类型
系统定义了具有特定错误处理行为的节点类型,具体如下:
-
可继续执行的节点类型(CONTINUE_ON_ERROR_NODE_TYPE):
-
即使执行失败,工作流仍可继续执行。
-
例如:HTTP 请求节点、LLM 节点等。
-
这些节点可以配置错误策略(FAIL_BRANCH 或 DEFAULT_VALUE)。
-
-
可重试的节点类型(RETRY_ON_ERROR_NODE_TYPE):
-
执行失败时可以自动重试。
-
例如:HTTP 请求节点、数据库操作节点等。
-
这些节点可以配置最大重试次数和重试间隔。
-
通过这些机制,工作流系统能够灵活应对各种错误情况,从而提高工作流的健壮性和可靠性
10、变量管理机制
10.1 变量池
变量池是工作流中所有变量的集合,涵盖了以下几类:
-
用户输入变量:由用户提供的输入数据。
-
系统变量:由系统提供的变量,例如时间戳、会话ID等。
-
环境变量:与运行环境相关的变量。
-
会话变量:与当前会话相关的变量。
-
节点输出变量:节点执行完成后产生的输出变量。
-
以下是
VariablePool
类的部分实现: -
class VariablePool:
"""
Variable Pool
"""
def __init__(
self,
user_inputs: dict[str, Any],
system_variables: dict[str, Any],
environment_variables: dict[str, Any],
session_variables: dict[str, Any],
) -> None:
"""
Initialize variable pool
"""
self.user_inputs = user_inputs
self.system_variables = system_variables
self.environment_variables = environment_variables
self.session_variables = session_variables
# Initialize variable dictionary
self.variable_dictionary: dict[str, Any] = {}
def add(self, node_id: str, variable_name: str, variable_value: Any) -> None:
"""
Add variable to variable pool
"""
# Check if variable value is File
if isinstance(variable_value, File):
# Convert File to dict
variable_value = variable_value.to_dict()
# Add variable to variable dictionary
self.variable_dictionary[f"{node_id}.{variable_name}"] = variable_value
def get_variable(self, variable_selector: str) -> Any:
"""
Get variable from variable pool
"""
# Check if variable selector is empty
ifnot variable_selector:
returnNone
# Check if variable selector is system variable
if variable_selector.startswith(SYSTEM_VARIABLE_NODE_ID):
# Get system variable
variable_name = variable_selector.split(".", 1)[1]
return self.get_system_variable(variable_name)
# Check if variable selector is user input
if variable_selector.startswith(USER_INPUT_NODE_ID):
# Get user input
variable_name = variable_selector.split(".", 1)[1]
return self.get_user_input(variable_name)
# Check if variable selector is environment variable
if variable_selector.startswith(ENVIRONMENT_VARIABLE_NODE_ID):
# Get environment variable
variable_name = variable_selector.split(".", 1)[1]
return self.get_environment_variable(variable_name)
# Check if variable selector is session variable
if variable_selector.startswith(SESSION_VARIABLE_NODE_ID):
# Get session variable
variable_name = variable_selector.split(".", 1)[1]
return self.get_session_variable(variable_name)
# Get variable from variable dictionary
return self.variable_dictionary.get(variable_selector)
10.2 变量传递
变量在节点之间的传递遵循以下规则:
-
变量选择器:通过变量选择器来指定需要使用的变量。
-
变量作用域:变量的作用域覆盖整个工作流。
-
变量覆盖:后执行的节点可以覆盖先执行的节点的变量。
变量选择器的格式为 node_id.variable_name
,例如:
-
system.conversation_id
:系统变量中的会话ID。 -
user_input.query
:用户输入中的查询。 -
node_1.result
:节点1的输出变量result
。
11、并行执行机制
工作流系统支持多个分支的并行执行,这一机制是通过 GraphEngineThreadPool 来实现的。
Dify 工作流支持多个分支的并行执行:
-
通过 GraphParallel 模型来定义并行分支。
-
使用 parallel_mapping 和 node_parallel_mapping 来管理并行关系。
-
支持条件分支,可以根据条件选择执行路径。
-
限制并行层级,以避免执行图过于复杂。
12:结论
Dify 工作流系统是一款强大的可视化 AI 工作流引擎,它采用图结构来组织节点的执行流程,并通过变量池来管理数据的流动。系统支持多种节点类型、错误处理机制以及并行执行功能。Dify 工作流系统的核心组件包括:
-
工作流服务:负责管理整个工作流的生命周期。
-
工作流入口:作为工作流执行的起始点。
-
图引擎:承担节点的调度与执行任务。
-
变量池:负责管理工作流中的各类变量。
-
节点实现:提供各类节点的具体实现方式。
这些组件相互协作,使得 Dify 工作流系统能够应对从简单到复杂的各种 AI 应用场景,为用户提供了灵活且强大的工作流设计与执行能力。