MetaGPT源码剖析(三):多智能体系统的 “智能角色“ 核心实现——Role类

发布于:2025-07-24 ⋅ 阅读:(32) ⋅ 点赞:(0)

每一篇文章都短小精悍,不啰嗦。

今天我们来深入剖析Role类的代码实现。在多智能体协作系统中,Role(角色)就像现实世界中的 "员工",是执行具体任务、参与协作的基本单位。这段代码是 MetaGPT 框架的核心,它定义了一个角色从 "接收信息" 到 "做出决策" 再到 "执行任务" 的完整生命周期。

一、类的整体结构与核心定位

1. 继承关系:能力的组合

class Role(BaseRole, SerializationMixin, ContextMixin, BaseModel):
  • BaseRole:抽象基类,定义了角色的核心接口(think/act/react等),强制实现基础能力;
  • SerializationMixin:提供序列化能力,支持角色状态的保存与恢复(断点续跑);
  • ContextMixin:整合全局上下文(配置、成本管理器等),方便访问全局资源;
  • BaseModel(Pydantic):提供数据校验、属性管理(如model_config),简化参数处理。

设计意图:通过多继承将不同维度的能力(接口规范、序列化、上下文访问、数据管理)分离,符合 "单一职责原则"。

2. 核心属性:角色的 "身份与配置"

属性 作用 类比现实
name/profile 角色名称(如 "张三")与身份(如 "产品经理") 员工的姓名和职位
goal/constraints 工作目标(如 "设计需求文档")与约束(如 "用中文输出") 岗位目标和工作规范
actions 角色可执行的动作(如WriteCode/AnalyzeRequirement 员工的技能列表
rc(RoleContext) 运行时上下文(消息缓冲区、记忆、当前状态等) 员工的 "工作记忆"(待办、邮件、历史记录)
react_mode 决策模式(react/by_order/plan_and_act 工作方式(灵活应变 / 按流程 / 先规划后执行)

二、关键函数实现剖析

模块 1:初始化与基础配置

1. __init___process_role_extra:角色的 "入职配置"
@model_validator(mode="after")
def validate_role_extra(self):
    self._process_role_extra()
    return self

def _process_role_extra(self):
    kwargs = self.model_extra or {}
    if self.is_human:
        self.llm = HumanProvider(None)  # 人类角色用人类交互接口
    self._check_actions()  # 初始化动作列表
    self.llm.system_prompt = self._get_prefix()  # 设置大模型提示词前缀
    self.llm.cost_manager = self.context.cost_manager  # 绑定成本管理器
    if not self.observe_all_msg_from_buffer:
        self._watch(kwargs.pop("watch", [UserRequirement]))  # 默认关注用户需求
    if self.latest_observed_msg:
        self.recovered = True  # 标记为恢复的角色
  • 作用:完成角色初始化的收尾工作,包括 LLM 配置、动作校验、消息关注设置等。
  • 关键步骤
    • 若为人类角色,绑定HumanProvider(接收人类输入);
    • 通过_check_actions初始化动作列表(确保每个动作正确绑定上下文和 LLM);
    • 设置 LLM 的系统提示词(_get_prefix生成),定义角色的 "人设";
    • 默认关注UserRequirement(用户需求),确保角色能接收核心指令。
2. _get_prefix:角色的 "人设说明书"
def _get_prefix(self):
    if self.desc:
        return self.desc
    # 基础人设:身份、名称、目标
    prefix = PREFIX_TEMPLATE.format(profile=self.profile, name=self.name, goal=self.goal)
    # 约束条件(如"只能用中文")
    if self.constraints:
        prefix += CONSTRAINT_TEMPLATE.format(constraints=self.constraints)
    # 环境信息(所在团队的其他角色)
    if self.rc.env and self.rc.env.desc:
        all_roles = self.rc.env.role_names()
        other_role_names = ", ".join([r for r in all_roles if r != self.name])
        prefix += f"You are in {self.rc.env.desc} with roles({other_role_names})."
    return prefix
  • 作用:生成 LLM 的系统提示词,定义角色的核心身份、目标和约束,是角色 "行为准则" 的基础。
  • 设计亮点:动态整合环境信息(如团队中的其他角色),让角色知道 "自己在和谁协作",提升协作合理性。

模块 2:感知环境 ——_observe:角色的 "读邮件"

async def _observe(self) -> int:
    # 1. 获取新消息(从缓冲区或恢复状态)
    news = []
    if self.recovered and self.latest_observed_msg:
        # 从恢复状态获取最近消息
        news = self.rc.memory.find_news(observed=[self.latest_observed_msg], k=10)
    if not news:
        # 从消息缓冲区取所有未处理消息
        news = self.rc.msg_buffer.pop_all()
    
    # 2. 过滤消息(只保留感兴趣的)
    old_messages = [] if not self.enable_memory else self.rc.memory.get()  # 已处理的旧消息
    self.rc.news = [
        n for n in news 
        if (n.cause_by in self.rc.watch or self.name in n.send_to)  # 关注的动作或发给自己的消息
        and n not in old_messages  # 排除已处理的
    ]
    
    # 3. 存入记忆(避免重复处理)
    if self.observe_all_msg_from_buffer:
        self.rc.memory.add_batch(news)  # 全量存入(无状态角色可能需要)
    else:
        self.rc.memory.add_batch(self.rc.news)  # 只存感兴趣的
    
    # 4. 记录最新消息(用于断点恢复)
    self.latest_observed_msg = self.rc.news[-1] if self.rc.news else None
    
    logger.debug(f"{self._setting} observed: {[f'{i.role}: {i.content[:20]}...' for i in self.rc.news]}")
    return len(self.rc.news)
  • 作用:从消息缓冲区获取并处理新消息,是角色 "感知世界" 的入口。
  • 核心逻辑:"取消息→过滤→存记忆",确保角色只关注与自己相关的信息(避免信息过载)。
  • 设计亮点
    • 支持断点恢复(从latest_observed_msg继续处理);
    • 灵活的消息过滤机制(通过rc.watchsend_to判断相关性);
    • 可配置是否全量存入记忆(observe_all_msg_from_buffer),适应不同角色需求(如管理者可能需要了解全局)。

模块 3:决策系统 ——_think:角色的 "思考下一步"

_think是角色的 "决策核心",根据react_mode(反应模式)决定下一个动作,支持三种策略:

1. 单动作场景(只有一个Action
if len(self.actions) == 1:
    self._set_state(0)  # 直接执行唯一动作
    return True
  • 场景:如 "数据采集员" 只有CollectData一个动作,无需复杂决策。
2. 按顺序执行(by_order模式)
if self.rc.react_mode == RoleReactMode.BY_ORDER:
    self._set_state(self.rc.state + 1)  # 每次切换到下一个动作
    return self.rc.state >= 0 and self.rc.state < len(self.actions)
  • 场景:流程固定的任务(如 "需求→设计→开发" 的线性流程),按预设顺序执行动作。
  • 示例ProductManager先执行WritePRD,再执行ReviewPRD
3. 动态决策(react模式):LLM 驱动
# 构建提示词:包含历史记录、当前状态、可选动作
prompt = self._get_prefix() + STATE_TEMPLATE.format(
    history=self.rc.history,
    states="\n".join(self.states),  # 可选动作列表(如"0. 写需求 1. 评审需求")
    n_states=len(self.states) - 1,
    previous_state=self.rc.state
)

# 调用LLM选择下一个状态(动作索引)
next_state = await self.llm.aask(prompt)
next_state = extract_state_value_from_output(next_state)  # 提取纯数字结果

# 校验并更新状态
if next_state not in range(-1, len(self.states)):
    next_state = -1  # 无效状态则终止
self._set_state(next_state)  # 更新状态并绑定对应动作

  • 场景:复杂、动态的任务(如应对用户频繁变更的需求),需要 LLM 根据历史对话动态判断。
  • 设计亮点
    • STATE_TEMPLATE严格约束 LLM 输出格式(只返回数字),避免解析错误;
    • 失败处理(无效输出时设为 - 1 终止),保证系统健壮性。
4. 规划后执行(plan_and_act模式)

_plan_and_act实现,先通过Planner生成任务计划,再按计划执行动作(后续详解)。

模块 4:执行系统 ——_act:角色的 "动手做事"

async def _act(self) -> Message:
    logger.info(f"{self._setting}: to do {self.rc.todo}({self.rc.todo.name})")
    # 1. 执行当前待办动作(如WriteCode.run())
    response = await self.rc.todo.run(self.rc.history)  # 传入历史信息供参考
    
    # 2. 将结果封装为消息(便于协作)
    if isinstance(response, (ActionOutput, ActionNode)):
        # 动作输出→AIMessage(带结构化内容)
        msg = AIMessage(
            content=response.content,
            instruct_content=response.instruct_content,  # 结构化指令内容(如JSON)
            cause_by=self.rc.todo,  # 标记由哪个动作产生
            sent_from=self  # 标记发送者
        )
    elif isinstance(response, Message):
        msg = response  # 已是消息格式,直接使用
    else:
        # 其他类型→简单文本消息
        msg = AIMessage(content=response or "", cause_by=self.rc.todo, sent_from=self)
    
    # 3. 存入记忆(记录自己的执行结果)
    self.rc.memory.add(msg)
    return msg

  • 作用:执行_think决策的动作,并将结果封装为消息(供其他角色接收)。
  • 核心流程:"执行动作→封装结果→记录记忆",是角色产生价值的核心步骤。
  • 设计亮点
    • 兼容多种输出类型(ActionOutput/Message/ 文本),灵活适配不同Action的实现;
    • 消息中记录cause_bysent_from,便于追踪 "动作来源" 和 "发送者",支持协作追溯。

模块 5:主流程 ——runreact:角色的 "工作循环"

1. run:完整工作流程
async def run(self, with_message=None) -> Message | None:
    # 1. 处理输入消息(如有外部消息,先放入缓冲区)
    if with_message:
        self.put_message(with_message)
    
    # 2. 感知新消息(无消息则等待)
    if not await self._observe():
        logger.debug(f"{self.name}:无新消息,等待中")
        return
    
    # 3. 反应(根据模式执行思考-行动循环)
    rsp = await self.react()
    
    # 4. 发送结果(广播给其他角色)
    self.publish_message(rsp)
    return rsp

  • 作用:串联 "感知→决策→行动→反馈" 的完整闭环,是角色对外提供服务的入口。
  • 类比:员工的 "工作日流程"—— 看邮件→想方案→做事情→发结果。
2. react:反应策略分发
async def react(self) -> Message:
    if self.rc.react_mode in [RoleReactMode.REACT, RoleReactMode.BY_ORDER]:
        rsp = await self._react()  # 执行思考-行动循环
    elif self.rc.react_mode == RoleReactMode.PLAN_AND_ACT:
        rsp = await self._plan_and_act()  # 先规划再执行
    self._set_state(-1)  # 重置状态
    return rsp

  • 作用:根据react_mode调用对应的反应策略,是 "策略模式" 的典型应用。
3. _reactreact/by_order模式的执行循环
async def _react(self) -> Message:
    actions_taken = 0
    rsp = AIMessage(content="No actions taken yet", cause_by=Action)
    while actions_taken < self.rc.max_react_loop:  # 限制最大循环次数(防无限执行)
        # 思考下一步
        has_todo = await self._think()
        if not has_todo:
            break
        # 执行动作
        rsp = await self._act()
        actions_taken += 1
    return rsp  # 返回最后一个动作的结果

  • 作用:实现 "思考→行动" 的循环,支持多轮决策(如先分析需求,再修改方案)。
  • 安全机制max_react_loop限制最大轮次,避免因 LLM 决策失误导致无限循环。

模块 6:消息协作 ——publish_messageput_message

角色通过消息与其他角色协作,这两个方法是 "沟通工具":

1. put_message:接收私信
def put_message(self, message):
    if not message:
        return
    self.rc.msg_buffer.push(message)  # 放入私有消息缓冲区

  • 作用:接收发给自己的消息(如 "@产品经理 请补充需求"),存入私有缓冲区,供_observe处理。
2. publish_message:广播消息
def publish_message(self, msg):
    if not msg:
        return
    # 处理"发给自己"的标记
    if MESSAGE_ROUTE_TO_SELF in msg.send_to:
        msg.send_to.add(any_to_str(self))
        msg.send_to.remove(MESSAGE_ROUTE_TO_SELF)
    # 发给自己的消息直接放入缓冲区
    if all(to in {any_to_str(self), self.name} for to in msg.send_to):
        self.put_message(msg)
        return
    # 否则通过环境广播(所有订阅者可见)
    if self.rc.env:
        self.rc.env.publish_message(msg)

  • 作用:将消息广播到环境中,供其他角色接收(如 "产品经理发布需求文档,供架构师参考")。
  • 设计亮点
    • 支持 "发给自己" 的特殊标记(MESSAGE_ROUTE_TO_SELF),方便角色自我记录;
    • 消息路由由环境(env)处理,符合 "单一职责原则"(角色不关心谁接收,只负责发送)。

模块 7:其他关键功能

1. 动作管理:set_actions/set_action
def set_actions(self, actions: list[Union[Action, Type[Action]]]):
    self._reset()  # 清空现有动作
    for action in actions:
        # 实例化动作(如果传入的是类)
        if not isinstance(action, Action):
            i = action(context=self.context)
        else:
            i = action
        # 初始化动作(绑定上下文、LLM、前缀)
        self._init_action(i)
        self.actions.append(i)
        self.states.append(f"{len(self.actions)-1}. {action}")  # 记录动作状态描述

  • 作用:为角色添加可执行的动作(如给Engineer添加WriteCodeTestCode)。
  • 设计亮点:支持传入动作类或实例,自动初始化并绑定上下文,简化角色配置。
2. 记忆管理:get_memories
def get_memories(self, k=0) -> list[Message]:
    return self.rc.memory.get(k=k)  # 返回最近k条记忆(k=0返回全部)

  • 作用:获取历史消息(记忆),供决策和动作执行参考(如_think需要历史对话,_act需要上下文信息)。
3. 规划执行:_plan_and_actplan_and_act模式)
async def _plan_and_act(self) -> Message:
    # 1. 生成计划(基于目标和历史)
    if not self.planner.plan.goal:
        goal = self.rc.memory.get()[-1].content  # 取最新需求作为目标
        await self.planner.update_plan(goal=goal)  # LLM生成计划
    
    # 2. 按计划执行任务
    while self.planner.current_task:
        task = self.planner.current_task
        task_result = await self._act_on_task(task)  # 执行任务
        await self.planner.process_task_result(task_result)  # 处理结果(更新计划)
    
    # 3. 返回最终结果
    rsp = self.planner.get_useful_memories()[0]
    self.rc.memory.add(rsp)
    return rsp

  • 作用:适用于复杂任务(如 "开发一个电商网站"),先通过Planner拆分任务,再逐个执行。
  • 设计亮点:计划与执行分离,支持动态调整计划(如某任务失败后重新规划)。

三、设计亮点总结

  1. 策略模式:通过react_mode支持三种决策策略,适配不同场景;
  2. 模块化设计_observe/_think/_act分离,便于单独扩展(如优化决策逻辑只需改_think);
  3. 灵活性:动作可动态添加、记忆可配置、消息过滤可定制;
  4. 健壮性:无效输入处理(如 LLM 输出错误)、断点恢复(recovered状态);
  5. 现实映射:设计贴近人类协作模式(记忆、消息、决策流程),降低理解成本。

四、学习要点

  • 角色是多智能体系统的 "细胞":所有协作都通过角色的run方法串联,理解角色的生命周期是掌握多智能体系统的关键;
  • 设计模式的应用:策略模式(react_mode)、模板方法(run定义流程)、观察者模式(消息订阅)等,是代码灵活性的核心;
  • 工程化细节:异常处理、状态持久化、配置驱动等,保证系统在复杂场景下的可靠性。

希望通过今天的剖析,大家能理解Role类如何将 "智能体" 的抽象概念转化为可执行的代码,以及每个函数在其中的作用。后续可以尝试扩展角色(如实现一个Designer角色),加深理解。


网站公告

今日签到

点亮在社区的每一天
去签到