基于ReAct(Reasoning + Acting)框架的自主智能体
import re
from typing import List, Tuple
from langchain_community.chat_message_histories.in_memory import ChatMessageHistory
from langchain_core.language_models.chat_models import BaseChatModel
from langchain.output_parsers import PydanticOutputParser, OutputFixingParser
from langchain.schema.output_parser import StrOutputParser
from langchain.tools.base import BaseTool
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.tools import render_text_description
from pydantic import ValidationError
from langchain_core.prompts import HumanMessagePromptTemplate
from Agent.Action import Action
from Utils.CallbackHandlers import *
class ReActAgent:
"""AutoGPT:基于Langchain实现"""
@staticmethod
def __format_thought_observation(thought: str, action: Action, observation: str) -> str:
# 将全部JSON代码块替换为空
ret = re.sub(r'```json(.*?)```', '', thought, flags=re.DOTALL)
ret += "\n" + str(action) + "\n返回结果:\n" + observation
return ret
@staticmethod
def __extract_json_action(text: str) -> str | None:
# 匹配最后出现的JSON代码块
json_pattern = re.compile(r'```json(.*?)```', re.DOTALL)
matches = json_pattern.findall(text)
if matches:
last_json_str = matches[-1]
return last_json_str
return None
def __init__(
self,
llm: BaseChatModel,
tools: List[BaseTool],
work_dir: str,
main_prompt_file: str,
max_thought_steps: Optional[int] = 10,
):
self.llm = llm
self.tools = tools
self.work_dir = work_dir
self.max_thought_steps = max_thought_steps
# OutputFixingParser: 如果输出格式不正确,尝试修复
self.output_parser = PydanticOutputParser(pydantic_object=Action)
self.robust_parser = OutputFixingParser.from_llm(
parser=self.output_parser,
llm=llm
)
self.main_prompt_file = main_prompt_file
self.__init_prompt_templates()
self.__init_chains()
self.verbose_handler = ColoredPrintHandler(color=THOUGHT_COLOR)
def __init_prompt_templates(self):
with open(self.main_prompt_file, 'r', encoding='utf-8') as f:
self.prompt = ChatPromptTemplate.from_messages(
[
MessagesPlaceholder(variable_name="chat_history"),
HumanMessagePromptTemplate.from_template(f.read()),
]
).partial(
work_dir=self.work_dir,
tools=render_text_description(self.tools),
tool_names=','.join([tool.name for tool in self.tools]),
format_instructions=self.output_parser.get_format_instructions(),
)
def __init_chains(self):
# 主流程的chain
self.main_chain = (self.prompt | self.llm | StrOutputParser())
def __find_tool(self, tool_name: str) -> Optional[BaseTool]:
for tool in self.tools:
if tool.name == tool_name:
return tool
return None
def __step(self,
task,
short_term_memory,
chat_history,
verbose=False
) -> Tuple[Action, str]:
"""执行一步思考"""
inputs = {
"input": task,
"agent_scratchpad": "\n".join(short_term_memory),
"chat_history": chat_history.messages,
}
config = {
"callbacks": [self.verbose_handler]
if verbose else []
}
response = ""
for s in self.main_chain.stream(inputs, config=config):
response += s
# 提取JSON代码块
json_action = self.__extract_json_action(response)
# 带容错的解析
action = self.robust_parser.parse(
json_action if json_action else response
)
return action, response
def __exec_action(self, action: Action) -> str:
# 查找工具
tool = self.__find_tool(action.name)
if tool is None:
observation = (
f"Error: 找不到工具或指令 '{action.name}'. "
f"请从提供的工具/指令列表中选择,请确保按对顶格式输出。"
)
else:
try:
# 执行工具
observation = tool.run(action.args)
except ValidationError as e:
# 工具的入参异常
observation = (
f"Validation Error in args: {str(e)}, args: {action.args}"
)
except Exception as e:
# 工具执行异常
observation = f"Error: {str(e)}, {type(e).__name__}, args: {action.args}"
return observation
def run(
self,
task: str,
chat_history: ChatMessageHistory,
verbose=False
) -> str:
"""
运行智能体
:param task: 用户任务
:param chat_history: 对话上下文(长时记忆)
:param verbose: 是否显示详细信息
"""
# 初始化短时记忆: 记录推理过程
short_term_memory = []
# 思考步数
thought_step_count = 0
reply = ""
# 开始逐步思考
while thought_step_count < self.max_thought_steps:
if verbose:
self.verbose_handler.on_thought_start(thought_step_count)
# 执行一步思考
action, response = self.__step(
task=task,
short_term_memory=short_term_memory,
chat_history=chat_history,
verbose=verbose,
)
# 如果是结束指令,执行最后一步
if action.name == "FINISH":
reply = self.__exec_action(action)
break
# 执行动作
observation = self.__exec_action(action)
if verbose:
self.verbose_handler.on_tool_end(observation)
# 更新短时记忆
short_term_memory.append(
self.__format_thought_observation(
response, action, observation
)
)
thought_step_count += 1
if thought_step_count >= self.max_thought_steps:
# 如果思考步数达到上限,返回错误信息
reply = "抱歉,我没能完成您的任务。"
# 更新长时记忆
chat_history.add_user_message(task)
chat_history.add_ai_message(reply)
return reply
结合LangChain框架和工具调用能力来逐步解决用户任务。
以下是代码的逐模块解析:
1. 核心结构
class ReActAgent:
"""AutoGPT:基于Langchain实现"""
- 核心类:实现了ReAct范式(推理+行动循环)的自主智能体
- 核心能力:
- 多步思考推理
- 工具调用
- 长短期记忆管理
- 异常处理与自我修正
2. 关键静态方法
@staticmethod
def __format_thought_observation(...): # 格式化思考记录
@staticmethod
def __extract_json_action(...): # 提取JSON动作
- 功能:
__format_thought_observation
:将思考过程、动作执行和观察结果格式化为可读文本,存入短期记忆__extract_json_action
:用正则表达式提取模型输出中的最后一个JSON代码块(确保获取最新动作)
3. 初始化模块
def __init__(...):
# 核心组件初始化
self.llm = llm # 大语言模型
self.tools = tools # 可用工具列表
self.work_dir = work_dir # 工作目录
self.max_thought_steps = ... # 最大思考步数
# 输出解析系统
self.output_parser = PydanticOutputParser(pydantic_object=Action)
self.robust_parser = OutputFixingParser.from_llm(...)
# 提示工程
self.__init_prompt_templates()
self.__init_chains()
- 关键技术点:
- 双解析器机制:
OutputFixingParser
可在格式错误时自动修复输出 - Pydantic验证:确保动作符合预定义结构(Action模型)
- 工具描述渲染:
render_text_description
将工具转化为自然语言描述
- 双解析器机制:
4. 提示工程系统
def __init_prompt_templates(self):
with open(self.main_prompt_file) as f:
self.prompt = ChatPromptTemplate.from_messages(...)
.partial(
tools=..., # 工具描述
tool_names=..., # 工具名称列表
format_instructions=..., # 格式说明
)
- 核心要素:
- 动态加载提示模板文件
- 包含:
- 聊天历史占位符
- 工具使用说明
- 输出格式要求
- 工作目录上下文
5. 执行流程控制
def run(...):
while thought_step_count < self.max_thought_steps:
# 单步思考
action, response = self.__step(...)
if action.name == "FINISH":
break
# 执行动作
observation = self.__exec_action(action)
# 记忆更新
short_term_memory.append(...)
- ReAct循环:
- Reasoning:生成思考与动作(
__step
) - Acting:执行工具调用(
__exec_action
) - Observing:记录执行结果
- Loop:直到达到终止条件
- Reasoning:生成思考与动作(
6. 关键技术实现
6.1 单步推理 (__step
)
def __step(...):
inputs = {
"input": task,
"agent_scratchpad": "\n".join(short_term_memory),
"chat_history": chat_history.messages,
}
# 流式处理LLM输出
for s in self.main_chain.stream(inputs):
response += s
# 提取并解析动作
json_action = self.__extract_json_action(response)
action = self.robust_parser.parse(...)
- 输入组成:
- 任务目标
- 短期记忆(推理过程)
- 长期记忆(聊天历史)
- 流式处理:实时显示思考过程
- 错误恢复:自动修复格式错误的JSON输出
6.2 动作执行 (__exec_action
)
def __exec_action(...):
tool = self.__find_tool(action.name)
try:
observation = tool.run(action.args)
except ValidationError:
# 参数验证错误处理
except Exception:
# 通用错误处理
- 异常处理机制:
- 工具不存在
- 参数验证错误
- 运行时异常
- 观察反馈:将错误信息转化为自然语言,供后续推理使用
7. 记忆系统
# 短期记忆
short_term_memory = [] # 存储格式化的推理过程
# 长期记忆
chat_history = ChatMessageHistory() # 保存完整对话记录
- 记忆类型:
- 短期记忆:当前任务的推理过程(最多保留max_thought_steps步)
- 长期记忆:跨会话的完整对话历史
8. 关键设计亮点
自愈式输出解析:
- 通过
OutputFixingParser
实现格式错误自动修复 - 示例场景:当LLM返回非法JSON时,自动尝试修正
- 通过
渐进式推理:
# 示例输出格式 Thought: 我需要先查找用户信息 Action: {"name": "user_search", "args": {"id": 123}} Observation: 用户张三,年龄30岁
- 通过
agent_scratchpad
维护推理上下文
- 通过
工具发现机制:
- 动态渲染工具描述到提示词
- 支持工具的热插拔
多级异常处理:
- 工具不存在
- 参数验证错误
- 执行时异常
- 最大步数限制
9. 使用示例
# 初始化组件
llm = ChatOpenAI()
tools = [SearchTool(), Calculator()]
agent = ReActAgent(llm, tools, work_dir="/data")
# 执行任务
result = agent.run(
task="计算马云当前年龄的平方根",
chat_history=ChatMessageHistory(),
verbose=True
)
- 典型执行流程:
- 搜索"马云年龄" → 得到60岁
- 调用计算器计算√60 → 约7.746
- 返回最终结果
10. 可扩展性建议
增强记忆管理:
- 添加向量数据库长期记忆
- 实现记忆压缩/摘要
改进推理质量:
- 添加自我验证步骤
- 实现多路径推理
性能优化:
- 添加异步执行
- 实现工具并行调用
该实现展示了如何结合LangChain框架构建复杂的自主智能体系统,平衡了LLM的创造力和结构化工具调用的可靠性。