MetaGPT源码剖析(三):多智能体系统的 “智能角色“ 核心实现——Role类

每一篇文章都短小精悍,不啰嗦。

今天我们来深入剖析Role类的代码实现。在多智能体协作系统中,Role(角色)就像现实世界中的 "员工",是执行具体任务、参与协作的基本单位。这段代码是 MetaGPT 框架的核心,它定义了一个角色从 "接收信息" 到 "做出决策" 再到 "执行任务" 的完整生命周期。

一、类的整体结构与核心定位

1. 继承关系:能力的组合

class Role(BaseRole, SerializationMixin, ContextMixin, BaseModel):
  • BaseRole:抽象基类,定义了角色的核心接口(think/act/react等),强制实现基础能力;
  • SerializationMixin:提供序列化能力,支持角色状态的保存与恢复(断点续跑);
  • ContextMixin:整合全局上下文(配置、成本管理器等),方便访问全局资源;
  • BaseModel(Pydantic):提供数据校验、属性管理(如model_config),简化参数处理。

设计意图:通过多继承将不同维度的能力(接口规范、序列化、上下文访问、数据管理)分离,符合 "单一职责原则"。

2. 核心属性:角色的 "身份与配置"

属性作用类比现实
name/profile角色名称(如 "张三")与身份(如 "产品经理")员工的姓名和职位
goal/constraints工作目标(如 "设计需求文档")与约束(如 "用中文输出")岗位目标和工作规范
actions角色可执行的动作(如WriteCode/AnalyzeRequirement员工的技能列表
rc(RoleContext)运行时上下文(消息缓冲区、记忆、当前状态等)员工的 "工作记忆"(待办、邮件、历史记录)
react_mode决策模式(react/by_order/plan_and_act工作方式(灵活应变 / 按流程 / 先规划后执行)

二、关键函数实现剖析

模块 1:初始化与基础配置

1. __init___process_role_extra:角色的 "入职配置"
@model_validator(mode="after")
def validate_role_extra(self):self._process_role_extra()return selfdef _process_role_extra(self):kwargs = self.model_extra or {}if self.is_human:self.llm = HumanProvider(None)  # 人类角色用人类交互接口self._check_actions()  # 初始化动作列表self.llm.system_prompt = self._get_prefix()  # 设置大模型提示词前缀self.llm.cost_manager = self.context.cost_manager  # 绑定成本管理器if not self.observe_all_msg_from_buffer:self._watch(kwargs.pop("watch", [UserRequirement]))  # 默认关注用户需求if self.latest_observed_msg:self.recovered = True  # 标记为恢复的角色
  • 作用:完成角色初始化的收尾工作,包括 LLM 配置、动作校验、消息关注设置等。
  • 关键步骤
    • 若为人类角色,绑定HumanProvider(接收人类输入);
    • 通过_check_actions初始化动作列表(确保每个动作正确绑定上下文和 LLM);
    • 设置 LLM 的系统提示词(_get_prefix生成),定义角色的 "人设";
    • 默认关注UserRequirement(用户需求),确保角色能接收核心指令。
2. _get_prefix:角色的 "人设说明书"
def _get_prefix(self):if self.desc:return self.desc# 基础人设:身份、名称、目标prefix = PREFIX_TEMPLATE.format(profile=self.profile, name=self.name, goal=self.goal)# 约束条件(如"只能用中文")if self.constraints:prefix += CONSTRAINT_TEMPLATE.format(constraints=self.constraints)# 环境信息(所在团队的其他角色)if self.rc.env and self.rc.env.desc:all_roles = self.rc.env.role_names()other_role_names = ", ".join([r for r in all_roles if r != self.name])prefix += f"You are in {self.rc.env.desc} with roles({other_role_names})."return prefix
  • 作用:生成 LLM 的系统提示词,定义角色的核心身份、目标和约束,是角色 "行为准则" 的基础。
  • 设计亮点:动态整合环境信息(如团队中的其他角色),让角色知道 "自己在和谁协作",提升协作合理性。

模块 2:感知环境 ——_observe:角色的 "读邮件"

async def _observe(self) -> int:# 1. 获取新消息(从缓冲区或恢复状态)news = []if self.recovered and self.latest_observed_msg:# 从恢复状态获取最近消息news = self.rc.memory.find_news(observed=[self.latest_observed_msg], k=10)if not news:# 从消息缓冲区取所有未处理消息news = self.rc.msg_buffer.pop_all()# 2. 过滤消息(只保留感兴趣的)old_messages = [] if not self.enable_memory else self.rc.memory.get()  # 已处理的旧消息self.rc.news = [n for n in news if (n.cause_by in self.rc.watch or self.name in n.send_to)  # 关注的动作或发给自己的消息and n not in old_messages  # 排除已处理的]# 3. 存入记忆(避免重复处理)if self.observe_all_msg_from_buffer:self.rc.memory.add_batch(news)  # 全量存入(无状态角色可能需要)else:self.rc.memory.add_batch(self.rc.news)  # 只存感兴趣的# 4. 记录最新消息(用于断点恢复)self.latest_observed_msg = self.rc.news[-1] if self.rc.news else Nonelogger.debug(f"{self._setting} observed: {[f'{i.role}: {i.content[:20]}...' for i in self.rc.news]}")return len(self.rc.news)
  • 作用:从消息缓冲区获取并处理新消息,是角色 "感知世界" 的入口。
  • 核心逻辑:"取消息→过滤→存记忆",确保角色只关注与自己相关的信息(避免信息过载)。
  • 设计亮点
    • 支持断点恢复(从latest_observed_msg继续处理);
    • 灵活的消息过滤机制(通过rc.watchsend_to判断相关性);
    • 可配置是否全量存入记忆(observe_all_msg_from_buffer),适应不同角色需求(如管理者可能需要了解全局)。

模块 3:决策系统 ——_think:角色的 "思考下一步"

_think是角色的 "决策核心",根据react_mode(反应模式)决定下一个动作,支持三种策略:

1. 单动作场景(只有一个Action
if len(self.actions) == 1:self._set_state(0)  # 直接执行唯一动作return True
  • 场景:如 "数据采集员" 只有CollectData一个动作,无需复杂决策。
2. 按顺序执行(by_order模式)
if self.rc.react_mode == RoleReactMode.BY_ORDER:self._set_state(self.rc.state + 1)  # 每次切换到下一个动作return self.rc.state >= 0 and self.rc.state < len(self.actions)
  • 场景:流程固定的任务(如 "需求→设计→开发" 的线性流程),按预设顺序执行动作。
  • 示例ProductManager先执行WritePRD,再执行ReviewPRD
3. 动态决策(react模式):LLM 驱动
# 构建提示词:包含历史记录、当前状态、可选动作
prompt = self._get_prefix() + STATE_TEMPLATE.format(history=self.rc.history,states="\n".join(self.states),  # 可选动作列表(如"0. 写需求 1. 评审需求")n_states=len(self.states) - 1,previous_state=self.rc.state
)# 调用LLM选择下一个状态(动作索引)
next_state = await self.llm.aask(prompt)
next_state = extract_state_value_from_output(next_state)  # 提取纯数字结果# 校验并更新状态
if next_state not in range(-1, len(self.states)):next_state = -1  # 无效状态则终止
self._set_state(next_state)  # 更新状态并绑定对应动作

  • 场景:复杂、动态的任务(如应对用户频繁变更的需求),需要 LLM 根据历史对话动态判断。
  • 设计亮点
    • STATE_TEMPLATE严格约束 LLM 输出格式(只返回数字),避免解析错误;
    • 失败处理(无效输出时设为 - 1 终止),保证系统健壮性。
4. 规划后执行(plan_and_act模式)

_plan_and_act实现,先通过Planner生成任务计划,再按计划执行动作(后续详解)。

模块 4:执行系统 ——_act:角色的 "动手做事"

async def _act(self) -> Message:logger.info(f"{self._setting}: to do {self.rc.todo}({self.rc.todo.name})")# 1. 执行当前待办动作(如WriteCode.run())response = await self.rc.todo.run(self.rc.history)  # 传入历史信息供参考# 2. 将结果封装为消息(便于协作)if isinstance(response, (ActionOutput, ActionNode)):# 动作输出→AIMessage(带结构化内容)msg = AIMessage(content=response.content,instruct_content=response.instruct_content,  # 结构化指令内容(如JSON)cause_by=self.rc.todo,  # 标记由哪个动作产生sent_from=self  # 标记发送者)elif isinstance(response, Message):msg = response  # 已是消息格式,直接使用else:# 其他类型→简单文本消息msg = AIMessage(content=response or "", cause_by=self.rc.todo, sent_from=self)# 3. 存入记忆(记录自己的执行结果)self.rc.memory.add(msg)return msg

  • 作用:执行_think决策的动作,并将结果封装为消息(供其他角色接收)。
  • 核心流程:"执行动作→封装结果→记录记忆",是角色产生价值的核心步骤。
  • 设计亮点
    • 兼容多种输出类型(ActionOutput/Message/ 文本),灵活适配不同Action的实现;
    • 消息中记录cause_bysent_from,便于追踪 "动作来源" 和 "发送者",支持协作追溯。

模块 5:主流程 ——runreact:角色的 "工作循环"

1. run:完整工作流程
async def run(self, with_message=None) -> Message | None:# 1. 处理输入消息(如有外部消息,先放入缓冲区)if with_message:self.put_message(with_message)# 2. 感知新消息(无消息则等待)if not await self._observe():logger.debug(f"{self.name}:无新消息,等待中")return# 3. 反应(根据模式执行思考-行动循环)rsp = await self.react()# 4. 发送结果(广播给其他角色)self.publish_message(rsp)return rsp

  • 作用:串联 "感知→决策→行动→反馈" 的完整闭环,是角色对外提供服务的入口。
  • 类比:员工的 "工作日流程"—— 看邮件→想方案→做事情→发结果。
2. react:反应策略分发
async def react(self) -> Message:if self.rc.react_mode in [RoleReactMode.REACT, RoleReactMode.BY_ORDER]:rsp = await self._react()  # 执行思考-行动循环elif self.rc.react_mode == RoleReactMode.PLAN_AND_ACT:rsp = await self._plan_and_act()  # 先规划再执行self._set_state(-1)  # 重置状态return rsp

  • 作用:根据react_mode调用对应的反应策略,是 "策略模式" 的典型应用。
3. _reactreact/by_order模式的执行循环
async def _react(self) -> Message:actions_taken = 0rsp = AIMessage(content="No actions taken yet", cause_by=Action)while actions_taken < self.rc.max_react_loop:  # 限制最大循环次数(防无限执行)# 思考下一步has_todo = await self._think()if not has_todo:break# 执行动作rsp = await self._act()actions_taken += 1return rsp  # 返回最后一个动作的结果

  • 作用:实现 "思考→行动" 的循环,支持多轮决策(如先分析需求,再修改方案)。
  • 安全机制max_react_loop限制最大轮次,避免因 LLM 决策失误导致无限循环。

模块 6:消息协作 ——publish_messageput_message

角色通过消息与其他角色协作,这两个方法是 "沟通工具":

1. put_message:接收私信
def put_message(self, message):if not message:returnself.rc.msg_buffer.push(message)  # 放入私有消息缓冲区

  • 作用:接收发给自己的消息(如 "@产品经理 请补充需求"),存入私有缓冲区,供_observe处理。
2. publish_message:广播消息
def publish_message(self, msg):if not msg:return# 处理"发给自己"的标记if MESSAGE_ROUTE_TO_SELF in msg.send_to:msg.send_to.add(any_to_str(self))msg.send_to.remove(MESSAGE_ROUTE_TO_SELF)# 发给自己的消息直接放入缓冲区if all(to in {any_to_str(self), self.name} for to in msg.send_to):self.put_message(msg)return# 否则通过环境广播(所有订阅者可见)if self.rc.env:self.rc.env.publish_message(msg)

  • 作用:将消息广播到环境中,供其他角色接收(如 "产品经理发布需求文档,供架构师参考")。
  • 设计亮点
    • 支持 "发给自己" 的特殊标记(MESSAGE_ROUTE_TO_SELF),方便角色自我记录;
    • 消息路由由环境(env)处理,符合 "单一职责原则"(角色不关心谁接收,只负责发送)。

模块 7:其他关键功能

1. 动作管理:set_actions/set_action
def set_actions(self, actions: list[Union[Action, Type[Action]]]):self._reset()  # 清空现有动作for action in actions:# 实例化动作(如果传入的是类)if not isinstance(action, Action):i = action(context=self.context)else:i = action# 初始化动作(绑定上下文、LLM、前缀)self._init_action(i)self.actions.append(i)self.states.append(f"{len(self.actions)-1}. {action}")  # 记录动作状态描述

  • 作用:为角色添加可执行的动作(如给Engineer添加WriteCodeTestCode)。
  • 设计亮点:支持传入动作类或实例,自动初始化并绑定上下文,简化角色配置。
2. 记忆管理:get_memories
def get_memories(self, k=0) -> list[Message]:return self.rc.memory.get(k=k)  # 返回最近k条记忆(k=0返回全部)

  • 作用:获取历史消息(记忆),供决策和动作执行参考(如_think需要历史对话,_act需要上下文信息)。
3. 规划执行:_plan_and_actplan_and_act模式)
async def _plan_and_act(self) -> Message:# 1. 生成计划(基于目标和历史)if not self.planner.plan.goal:goal = self.rc.memory.get()[-1].content  # 取最新需求作为目标await self.planner.update_plan(goal=goal)  # LLM生成计划# 2. 按计划执行任务while self.planner.current_task:task = self.planner.current_tasktask_result = await self._act_on_task(task)  # 执行任务await self.planner.process_task_result(task_result)  # 处理结果(更新计划)# 3. 返回最终结果rsp = self.planner.get_useful_memories()[0]self.rc.memory.add(rsp)return rsp

  • 作用:适用于复杂任务(如 "开发一个电商网站"),先通过Planner拆分任务,再逐个执行。
  • 设计亮点:计划与执行分离,支持动态调整计划(如某任务失败后重新规划)。

三、设计亮点总结

  1. 策略模式:通过react_mode支持三种决策策略,适配不同场景;
  2. 模块化设计_observe/_think/_act分离,便于单独扩展(如优化决策逻辑只需改_think);
  3. 灵活性:动作可动态添加、记忆可配置、消息过滤可定制;
  4. 健壮性:无效输入处理(如 LLM 输出错误)、断点恢复(recovered状态);
  5. 现实映射:设计贴近人类协作模式(记忆、消息、决策流程),降低理解成本。

四、学习要点

  • 角色是多智能体系统的 "细胞":所有协作都通过角色的run方法串联,理解角色的生命周期是掌握多智能体系统的关键;
  • 设计模式的应用:策略模式(react_mode)、模板方法(run定义流程)、观察者模式(消息订阅)等,是代码灵活性的核心;
  • 工程化细节:异常处理、状态持久化、配置驱动等,保证系统在复杂场景下的可靠性。

希望通过今天的剖析,大家能理解Role类如何将 "智能体" 的抽象概念转化为可执行的代码,以及每个函数在其中的作用。后续可以尝试扩展角色(如实现一个Designer角色),加深理解。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/diannao/92414.shtml
繁体地址,请注明出处:http://hk.pswp.cn/diannao/92414.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【项目经验】小智ai MCP学习笔记

理论 1、什么是MCP MCP(Model Context Protocol&#xff0c;模型上下文协议)是一种开放式协议&#xff0c;它实现了LLM与各种工具的调用。使LLM从对话、生成式AI变成了拥有调用三方工具的AI。用官方的比喻&#xff0c;MCP就是USB-C接口&#xff0c;只要实现了这个接口&#x…

Matlab学习笔记:矩阵基础

MATLAB学习笔记:矩阵基础 作为MATLAB的核心,矩阵是处理数据的基础工具。矩阵本质上是一个二维数组,由行和列组成,用于存储和操作数值数据。在本节中,我将详细讲解矩阵的所有知识点,包括创建、索引、运算、函数等,确保内容通俗易懂。我会在关键地方添加MATLAB代码示例,…

技术演进中的开发沉思-38 MFC系列:关于打印

打印程序也是MFC开发中不能忽视的一个环节&#xff0c;现在做打印开发so easy。但当年做打印开发还是挺麻烦。在当年的桌面程序里就像拼图的最后一块&#xff0c;看着简单&#xff0c;实则要把屏幕上的像素世界&#xff0c;准确映射到打印机的物理纸张上。而MFC 的打印机制就像…

Apache Ignite 长事务终止机制

这段内容讲的是 Apache Ignite 中长事务终止机制&#xff08;Long Running Transactions Termination&#xff09;&#xff0c;特别是关于分区映射交换&#xff08;Partition Map Exchange&#xff09;与事务超时设置&#xff08;Transaction Timeout&#xff09;之间的关系。下…

网络编程---TCP协议

TCP协议基础知识TCP&#xff08;Transmission Control Protocol&#xff0c;传输控制协议&#xff09;是互联网核心协议之一&#xff0c;位于传输层&#xff08;OSI第4层&#xff09;&#xff0c;为应用层提供可靠的、面向连接的、基于字节流的数据传输服务。它与IP协议共同构成…

K 近邻算法(K-Nearest Neighbors, KNN)详解及案例

K近邻算法&#xff08;K-Nearest Neighbors, KNN&#xff09;详解及案例 一、基本原理 K近邻算法是一种监督学习算法&#xff0c;核心思想是“物以类聚&#xff0c;人以群分”&#xff1a;对于一个新样本&#xff0c;通过计算它与训练集中所有样本的“距离”&#xff0c;找出距…

深入理解 Redis 集群化看门狗机制:原理、实践与风险

在分布式系统中&#xff0c;我们常常需要执行一些关键任务&#xff0c;这些任务要么必须成功执行&#xff0c;要么失败后需要明确的状态&#xff08;如回滚&#xff09;&#xff0c;并且它们的执行时间可能难以精确预测。如何确保这些任务不会被意外中断&#xff0c;或者在长时…

Python机器学习:从零基础到项目实战

目录第一部分&#xff1a;思想与基石——万法归宗&#xff0c;筑基问道第1章&#xff1a;初探智慧之境——机器学习世界观1.1 何为学习&#xff1f;从人类学习到机器智能1.2 机器学习的“前世今生”&#xff1a;一部思想与技术的演进史1.3 为何是Python&#xff1f;——数据科学…

数据库:库的操作

1&#xff1a;查看所有数据库SHOW DATABASES;2&#xff1a;创建数据库CREATE DATABASE [ IF NOT EXISTS ] 数据库名 [ CHARACTER SET 字符集编码 | COLLATE 字符集校验规则 | ENCRYPTION { Y | N } ];[]&#xff1a;可写可不写{}&#xff1a;必选一个|&#xff1a;n 选 1ENCR…

AngularJS 动画

AngularJS 动画 引言 AngularJS 是一个流行的JavaScript框架,它为开发者提供了一种构建动态Web应用的方式。在AngularJS中,动画是一个强大的功能,可以帮助我们创建出更加生动和引人注目的用户界面。本文将详细介绍AngularJS动画的原理、用法以及最佳实践。 AngularJS 动画…

SonarQube 代码分析工具

💖亲爱的技术爱好者们,热烈欢迎来到 Kant2048 的博客!我是 Thomas Kant,很开心能在CSDN上与你们相遇~💖 本博客的精华专栏: 【自动化测试】 【测试经验】 【人工智能】 【Python】 🧠全面掌握 SonarQube:企业代码质量保障的利器 🚀 在当今 DevOps 流水线中,代码…

vmware vsphere esxi6.5 使用工具导出镜像

注&#xff1a;为什么使用这个工具&#xff0c;我这边主要因为esxi6.5自身bug导致web导出镜像会失败一、下载VMware-ovftool到本地系统&#xff08;根据你的操作系统版本到官网下载安装&#xff0c;此处略&#xff09;以下内容默认将VMware-ovftool安装到windows 本地系统为例。…

ES 踩坑记:Set Processor 字段更新引发的 _source 污染

问题背景 社区的一个伙伴想对一个 integer 的字段类型添加一个 keyword 类型的子字段&#xff0c;然后进行精确匹配的查询优化&#xff0c;提高查询的速度。 整个索引数据量不大&#xff0c;并不想进行 reindex 这样的复杂操作&#xff0c;就想到了使用 update_by_query 的存量…

如何彻底搞定 PyCharm 中 pip install 报错 ModuleNotFoundError: No module named ‘requests’ 的问题

如何彻底搞定 PyCharm 中 pip install 报错 ModuleNotFoundError: No module named ‘requests’ 的问题 在使用 PyCharm 开发 Python 项目时&#xff0c;ModuleNotFoundError: No module named requests 是一个常见但令人头疼的问题。本篇博文将从环境配置、原因分析到多种解…

powerquery如何实现表的拼接主键

在做表过程中&#xff0c;有时候没有基表&#xff0c;这个时候就要构造完整的主键&#xff0c;这样才可以使之后匹配的数据不会因为主键不全而丢失数据 我的处理方法是吧多个表的主键拼在一起然后去重&#xff0c;构造一个单单之后之间的表作为基表去匹配数据 所以就哟啊用到自…

今日Github热门仓库推荐 第八期

今日Github热门仓库推荐2025-07-22 如果让AI分别扮演 后端开发人员和前端开发人员&#xff0c;然后看看他们分别对github每天的trending仓库感兴趣的有哪些&#xff0c;并且给出他感兴趣的理由&#xff0c;那会发生什么呢&#xff1f; 本内容通过Python AI生成&#xff0c;项…

Dify-13: 文本生成API端点

本文档提供了有关 Dify 中与文本生成相关的 API 端点的全面信息。文本生成 API 支持无会话持久性的单次请求文本生成&#xff0c;使其适用于翻译、摘要、文章写作等非对话式人工智能应用场景。 概述 文本生成 API 端点允许开发人员将 Dify 的文本生成功能集成到不需要维护对话上…

Leetcode 3620. Network Recovery Pathways

Leetcode 3620. Network Recovery Pathways 1. 解题思路2. 代码实现 题目链接&#xff1a;3620. Network Recovery Pathways 1. 解题思路 这一题我最开始想的是遍历一下所有的网络路径&#xff0c;不过遇到了超时的情况。因此后来调整了一下处理思路&#xff0c;使用二分法的…

链路备份技术(链路聚合、RSTP)

一、链路聚合&#xff01;链路备份技术之一-----链路聚合&#xff08;Link Aggregation&#xff09;被视为链路备份技术&#xff0c;核心原因在于它能通过多条物理链路的捆绑&#xff0c;实现 “一条链路故障时&#xff0c;其他链路自动接管流量” 的冗余备份效果&#xff0c;同…

PyTorch新手实操 安装

PyTorch简介 PyTorch 是一个基于 Python 的开源深度学习框架&#xff0c;由 Meta AI&#xff08;原 Facebook AI&#xff09;主导开发&#xff0c;以动态计算图&#xff08;Define-by-Run&#xff09;为核心&#xff0c;支持灵活构建和训练神经网络模型。其设计理念高度契合科…