每一篇文章都短小精悍,不啰嗦。
今天我们来深入剖析Role
类的代码实现。在多智能体协作系统中,Role
(角色)就像现实世界中的 "员工",是执行具体任务、参与协作的基本单位。这段代码是 MetaGPT 框架的核心,它定义了一个角色从 "接收信息" 到 "做出决策" 再到 "执行任务" 的完整生命周期。
一、类的整体结构与核心定位
1. 继承关系:能力的组合
class Role(BaseRole, SerializationMixin, ContextMixin, BaseModel):
- BaseRole:抽象基类,定义了角色的核心接口(
think
/act
/react
等),强制实现基础能力; - SerializationMixin:提供序列化能力,支持角色状态的保存与恢复(断点续跑);
- ContextMixin:整合全局上下文(配置、成本管理器等),方便访问全局资源;
- BaseModel(Pydantic):提供数据校验、属性管理(如
model_config
),简化参数处理。
设计意图:通过多继承将不同维度的能力(接口规范、序列化、上下文访问、数据管理)分离,符合 "单一职责原则"。
2. 核心属性:角色的 "身份与配置"
属性 | 作用 | 类比现实 |
---|---|---|
name /profile | 角色名称(如 "张三")与身份(如 "产品经理") | 员工的姓名和职位 |
goal /constraints | 工作目标(如 "设计需求文档")与约束(如 "用中文输出") | 岗位目标和工作规范 |
actions | 角色可执行的动作(如WriteCode /AnalyzeRequirement ) | 员工的技能列表 |
rc (RoleContext) | 运行时上下文(消息缓冲区、记忆、当前状态等) | 员工的 "工作记忆"(待办、邮件、历史记录) |
react_mode | 决策模式(react /by_order /plan_and_act ) | 工作方式(灵活应变 / 按流程 / 先规划后执行) |
二、关键函数实现剖析
模块 1:初始化与基础配置
1. __init__
与_process_role_extra
:角色的 "入职配置"
@model_validator(mode="after")
def validate_role_extra(self):self._process_role_extra()return selfdef _process_role_extra(self):kwargs = self.model_extra or {}if self.is_human:self.llm = HumanProvider(None) # 人类角色用人类交互接口self._check_actions() # 初始化动作列表self.llm.system_prompt = self._get_prefix() # 设置大模型提示词前缀self.llm.cost_manager = self.context.cost_manager # 绑定成本管理器if not self.observe_all_msg_from_buffer:self._watch(kwargs.pop("watch", [UserRequirement])) # 默认关注用户需求if self.latest_observed_msg:self.recovered = True # 标记为恢复的角色
- 作用:完成角色初始化的收尾工作,包括 LLM 配置、动作校验、消息关注设置等。
- 关键步骤:
- 若为人类角色,绑定
HumanProvider
(接收人类输入); - 通过
_check_actions
初始化动作列表(确保每个动作正确绑定上下文和 LLM); - 设置 LLM 的系统提示词(
_get_prefix
生成),定义角色的 "人设"; - 默认关注
UserRequirement
(用户需求),确保角色能接收核心指令。
- 若为人类角色,绑定
2. _get_prefix
:角色的 "人设说明书"
def _get_prefix(self):if self.desc:return self.desc# 基础人设:身份、名称、目标prefix = PREFIX_TEMPLATE.format(profile=self.profile, name=self.name, goal=self.goal)# 约束条件(如"只能用中文")if self.constraints:prefix += CONSTRAINT_TEMPLATE.format(constraints=self.constraints)# 环境信息(所在团队的其他角色)if self.rc.env and self.rc.env.desc:all_roles = self.rc.env.role_names()other_role_names = ", ".join([r for r in all_roles if r != self.name])prefix += f"You are in {self.rc.env.desc} with roles({other_role_names})."return prefix
- 作用:生成 LLM 的系统提示词,定义角色的核心身份、目标和约束,是角色 "行为准则" 的基础。
- 设计亮点:动态整合环境信息(如团队中的其他角色),让角色知道 "自己在和谁协作",提升协作合理性。
模块 2:感知环境 ——_observe
:角色的 "读邮件"
async def _observe(self) -> int:# 1. 获取新消息(从缓冲区或恢复状态)news = []if self.recovered and self.latest_observed_msg:# 从恢复状态获取最近消息news = self.rc.memory.find_news(observed=[self.latest_observed_msg], k=10)if not news:# 从消息缓冲区取所有未处理消息news = self.rc.msg_buffer.pop_all()# 2. 过滤消息(只保留感兴趣的)old_messages = [] if not self.enable_memory else self.rc.memory.get() # 已处理的旧消息self.rc.news = [n for n in news if (n.cause_by in self.rc.watch or self.name in n.send_to) # 关注的动作或发给自己的消息and n not in old_messages # 排除已处理的]# 3. 存入记忆(避免重复处理)if self.observe_all_msg_from_buffer:self.rc.memory.add_batch(news) # 全量存入(无状态角色可能需要)else:self.rc.memory.add_batch(self.rc.news) # 只存感兴趣的# 4. 记录最新消息(用于断点恢复)self.latest_observed_msg = self.rc.news[-1] if self.rc.news else Nonelogger.debug(f"{self._setting} observed: {[f'{i.role}: {i.content[:20]}...' for i in self.rc.news]}")return len(self.rc.news)
- 作用:从消息缓冲区获取并处理新消息,是角色 "感知世界" 的入口。
- 核心逻辑:"取消息→过滤→存记忆",确保角色只关注与自己相关的信息(避免信息过载)。
- 设计亮点:
- 支持断点恢复(从
latest_observed_msg
继续处理); - 灵活的消息过滤机制(通过
rc.watch
和send_to
判断相关性); - 可配置是否全量存入记忆(
observe_all_msg_from_buffer
),适应不同角色需求(如管理者可能需要了解全局)。
- 支持断点恢复(从
模块 3:决策系统 ——_think
:角色的 "思考下一步"
_think
是角色的 "决策核心",根据react_mode
(反应模式)决定下一个动作,支持三种策略:
1. 单动作场景(只有一个Action
)
if len(self.actions) == 1:self._set_state(0) # 直接执行唯一动作return True
- 场景:如 "数据采集员" 只有
CollectData
一个动作,无需复杂决策。
2. 按顺序执行(by_order
模式)
if self.rc.react_mode == RoleReactMode.BY_ORDER:self._set_state(self.rc.state + 1) # 每次切换到下一个动作return self.rc.state >= 0 and self.rc.state < len(self.actions)
- 场景:流程固定的任务(如 "需求→设计→开发" 的线性流程),按预设顺序执行动作。
- 示例:
ProductManager
先执行WritePRD
,再执行ReviewPRD
。
3. 动态决策(react
模式):LLM 驱动
# 构建提示词:包含历史记录、当前状态、可选动作
prompt = self._get_prefix() + STATE_TEMPLATE.format(history=self.rc.history,states="\n".join(self.states), # 可选动作列表(如"0. 写需求 1. 评审需求")n_states=len(self.states) - 1,previous_state=self.rc.state
)# 调用LLM选择下一个状态(动作索引)
next_state = await self.llm.aask(prompt)
next_state = extract_state_value_from_output(next_state) # 提取纯数字结果# 校验并更新状态
if next_state not in range(-1, len(self.states)):next_state = -1 # 无效状态则终止
self._set_state(next_state) # 更新状态并绑定对应动作
- 场景:复杂、动态的任务(如应对用户频繁变更的需求),需要 LLM 根据历史对话动态判断。
- 设计亮点:
- 用
STATE_TEMPLATE
严格约束 LLM 输出格式(只返回数字),避免解析错误; - 失败处理(无效输出时设为 - 1 终止),保证系统健壮性。
- 用
4. 规划后执行(plan_and_act
模式)
由_plan_and_act
实现,先通过Planner
生成任务计划,再按计划执行动作(后续详解)。
模块 4:执行系统 ——_act
:角色的 "动手做事"
async def _act(self) -> Message:logger.info(f"{self._setting}: to do {self.rc.todo}({self.rc.todo.name})")# 1. 执行当前待办动作(如WriteCode.run())response = await self.rc.todo.run(self.rc.history) # 传入历史信息供参考# 2. 将结果封装为消息(便于协作)if isinstance(response, (ActionOutput, ActionNode)):# 动作输出→AIMessage(带结构化内容)msg = AIMessage(content=response.content,instruct_content=response.instruct_content, # 结构化指令内容(如JSON)cause_by=self.rc.todo, # 标记由哪个动作产生sent_from=self # 标记发送者)elif isinstance(response, Message):msg = response # 已是消息格式,直接使用else:# 其他类型→简单文本消息msg = AIMessage(content=response or "", cause_by=self.rc.todo, sent_from=self)# 3. 存入记忆(记录自己的执行结果)self.rc.memory.add(msg)return msg
- 作用:执行
_think
决策的动作,并将结果封装为消息(供其他角色接收)。 - 核心流程:"执行动作→封装结果→记录记忆",是角色产生价值的核心步骤。
- 设计亮点:
- 兼容多种输出类型(
ActionOutput
/Message
/ 文本),灵活适配不同Action
的实现; - 消息中记录
cause_by
和sent_from
,便于追踪 "动作来源" 和 "发送者",支持协作追溯。
- 兼容多种输出类型(
模块 5:主流程 ——run
与react
:角色的 "工作循环"
1. run
:完整工作流程
async def run(self, with_message=None) -> Message | None:# 1. 处理输入消息(如有外部消息,先放入缓冲区)if with_message:self.put_message(with_message)# 2. 感知新消息(无消息则等待)if not await self._observe():logger.debug(f"{self.name}:无新消息,等待中")return# 3. 反应(根据模式执行思考-行动循环)rsp = await self.react()# 4. 发送结果(广播给其他角色)self.publish_message(rsp)return rsp
- 作用:串联 "感知→决策→行动→反馈" 的完整闭环,是角色对外提供服务的入口。
- 类比:员工的 "工作日流程"—— 看邮件→想方案→做事情→发结果。
2. react
:反应策略分发
async def react(self) -> Message:if self.rc.react_mode in [RoleReactMode.REACT, RoleReactMode.BY_ORDER]:rsp = await self._react() # 执行思考-行动循环elif self.rc.react_mode == RoleReactMode.PLAN_AND_ACT:rsp = await self._plan_and_act() # 先规划再执行self._set_state(-1) # 重置状态return rsp
- 作用:根据
react_mode
调用对应的反应策略,是 "策略模式" 的典型应用。
3. _react
:react
/by_order
模式的执行循环
async def _react(self) -> Message:actions_taken = 0rsp = AIMessage(content="No actions taken yet", cause_by=Action)while actions_taken < self.rc.max_react_loop: # 限制最大循环次数(防无限执行)# 思考下一步has_todo = await self._think()if not has_todo:break# 执行动作rsp = await self._act()actions_taken += 1return rsp # 返回最后一个动作的结果
- 作用:实现 "思考→行动" 的循环,支持多轮决策(如先分析需求,再修改方案)。
- 安全机制:
max_react_loop
限制最大轮次,避免因 LLM 决策失误导致无限循环。
模块 6:消息协作 ——publish_message
与put_message
角色通过消息与其他角色协作,这两个方法是 "沟通工具":
1. put_message
:接收私信
def put_message(self, message):if not message:returnself.rc.msg_buffer.push(message) # 放入私有消息缓冲区
- 作用:接收发给自己的消息(如 "@产品经理 请补充需求"),存入私有缓冲区,供
_observe
处理。
2. publish_message
:广播消息
def publish_message(self, msg):if not msg:return# 处理"发给自己"的标记if MESSAGE_ROUTE_TO_SELF in msg.send_to:msg.send_to.add(any_to_str(self))msg.send_to.remove(MESSAGE_ROUTE_TO_SELF)# 发给自己的消息直接放入缓冲区if all(to in {any_to_str(self), self.name} for to in msg.send_to):self.put_message(msg)return# 否则通过环境广播(所有订阅者可见)if self.rc.env:self.rc.env.publish_message(msg)
- 作用:将消息广播到环境中,供其他角色接收(如 "产品经理发布需求文档,供架构师参考")。
- 设计亮点:
- 支持 "发给自己" 的特殊标记(
MESSAGE_ROUTE_TO_SELF
),方便角色自我记录; - 消息路由由环境(
env
)处理,符合 "单一职责原则"(角色不关心谁接收,只负责发送)。
- 支持 "发给自己" 的特殊标记(
模块 7:其他关键功能
1. 动作管理:set_actions
/set_action
def set_actions(self, actions: list[Union[Action, Type[Action]]]):self._reset() # 清空现有动作for action in actions:# 实例化动作(如果传入的是类)if not isinstance(action, Action):i = action(context=self.context)else:i = action# 初始化动作(绑定上下文、LLM、前缀)self._init_action(i)self.actions.append(i)self.states.append(f"{len(self.actions)-1}. {action}") # 记录动作状态描述
- 作用:为角色添加可执行的动作(如给
Engineer
添加WriteCode
和TestCode
)。 - 设计亮点:支持传入动作类或实例,自动初始化并绑定上下文,简化角色配置。
2. 记忆管理:get_memories
def get_memories(self, k=0) -> list[Message]:return self.rc.memory.get(k=k) # 返回最近k条记忆(k=0返回全部)
- 作用:获取历史消息(记忆),供决策和动作执行参考(如
_think
需要历史对话,_act
需要上下文信息)。
3. 规划执行:_plan_and_act
(plan_and_act
模式)
async def _plan_and_act(self) -> Message:# 1. 生成计划(基于目标和历史)if not self.planner.plan.goal:goal = self.rc.memory.get()[-1].content # 取最新需求作为目标await self.planner.update_plan(goal=goal) # LLM生成计划# 2. 按计划执行任务while self.planner.current_task:task = self.planner.current_tasktask_result = await self._act_on_task(task) # 执行任务await self.planner.process_task_result(task_result) # 处理结果(更新计划)# 3. 返回最终结果rsp = self.planner.get_useful_memories()[0]self.rc.memory.add(rsp)return rsp
- 作用:适用于复杂任务(如 "开发一个电商网站"),先通过
Planner
拆分任务,再逐个执行。 - 设计亮点:计划与执行分离,支持动态调整计划(如某任务失败后重新规划)。
三、设计亮点总结
- 策略模式:通过
react_mode
支持三种决策策略,适配不同场景; - 模块化设计:
_observe
/_think
/_act
分离,便于单独扩展(如优化决策逻辑只需改_think
); - 灵活性:动作可动态添加、记忆可配置、消息过滤可定制;
- 健壮性:无效输入处理(如 LLM 输出错误)、断点恢复(
recovered
状态); - 现实映射:设计贴近人类协作模式(记忆、消息、决策流程),降低理解成本。
四、学习要点
- 角色是多智能体系统的 "细胞":所有协作都通过角色的
run
方法串联,理解角色的生命周期是掌握多智能体系统的关键; - 设计模式的应用:策略模式(
react_mode
)、模板方法(run
定义流程)、观察者模式(消息订阅)等,是代码灵活性的核心; - 工程化细节:异常处理、状态持久化、配置驱动等,保证系统在复杂场景下的可靠性。
希望通过今天的剖析,大家能理解Role
类如何将 "智能体" 的抽象概念转化为可执行的代码,以及每个函数在其中的作用。后续可以尝试扩展角色(如实现一个Designer
角色),加深理解。