This series is also published on Juejin. For more articles, follow the author's Juejin account: 人肉推土机.
AutoGen Technical Blog Series (8): A Deep Dive into Swarm, a New Paradigm for Agent Collaboration
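Among the multi-agent patterns that AutoGen offers, Swarm stands out as a new approach to agent collaboration. In this post we take a close look at how it works and how to use it.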
I. Swarm Mode: The Core Engine of Intelligent Collaboration
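The idea behind Swarm is that agents hand tasks off to other, better-suited agents based on their own capabilities, while all agents work within the same shared message context. Because each agent decides for itself when to hand off and to whom, task planning and allocation happen autonomously, which makes the system more flexible and adaptable.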
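At runtime, agents take turns generating a response and broadcasting it to the team. Which agent speaks next is determined by the most recent HandoffMessage in the context. This means every agent must be able to generate a HandoffMessage that states where the task should go next. In AssistantAgent, for example, the handoffs parameter specifies which agents it may hand off to, and Handoff can be used to further customize the message content and the handoff behavior.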
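In more detail: when the team receives a task, the first speaker agent starts working on it and decides, based on the task and its own capabilities, whether to hand off and to whom. As soon as an agent generates a HandoffMessage, the receiving agent takes over and continues working in the same message context. This message-driven handoff mechanism keeps the task coherent from one agent to the next.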
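The selection rule described above can be sketched in plain Python. This is a toy model, not AutoGen's implementation: `ToyTextMessage`, `ToyHandoffMessage`, and `select_speaker` are hypothetical stand-ins, and the fallback to the first participant when no handoff exists yet is an assumption of this sketch.

```python
from dataclasses import dataclass
from typing import List, Union


@dataclass
class ToyTextMessage:
    """Hypothetical stand-in for an ordinary chat message."""
    source: str
    content: str


@dataclass
class ToyHandoffMessage:
    """Hypothetical stand-in for a handoff message."""
    source: str
    target: str
    content: str


ToyMessage = Union[ToyTextMessage, ToyHandoffMessage]


def select_speaker(context: List[ToyMessage], participants: List[str]) -> str:
    """Pick the next speaker from the most recent handoff in the shared context."""
    for message in reversed(context):
        if isinstance(message, ToyHandoffMessage) and message.target in participants:
            return message.target
    # Assumption of this sketch: with no handoff yet, the first participant starts.
    return participants[0]


participants = ["travel_agent", "flights_refunder"]
context: List[ToyMessage] = [ToyTextMessage("user", "I need to refund my flight.")]
first = select_speaker(context, participants)

context.append(
    ToyHandoffMessage("travel_agent", "flights_refunder", "Transferred to flights_refunder.")
)
second = select_speaker(context, participants)

# A plain text reply does not change the speaker; only a new handoff does.
context.append(ToyTextMessage("flights_refunder", "Which flight should I refund?"))
third = select_speaker(context, participants)
```

The point of the sketch is that ordinary replies never move control: the speaker changes only when a new handoff message enters the shared context.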
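Note that AssistantAgent relies on the model's tool-calling capability to perform handoffs, so the model must support tool calling. If the model makes parallel tool calls, the result may be unexpected behavior. To avoid this when using OpenAIChatCompletionClient or AzureOpenAIChatCompletionClient, set parallel_tool_calls=False to disable parallel tool calling.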
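To see why parallel tool calls are a concern, consider a single model response that requests two handoffs at once: it is no longer clear which agent should speak next. The sketch below is a hypothetical guard, not part of AutoGen; the only thing it borrows from the article is the `transfer_to_` prefix that the handoff tool names carry in the run logs.

```python
from typing import List

# Prefix of the handoff tool names as they appear in this article's run logs.
HANDOFF_PREFIX = "transfer_to_"


def handoff_targets(tool_call_names: List[str]) -> List[str]:
    """Return the handoff targets requested in one model response, in order."""
    return [
        name[len(HANDOFF_PREFIX):]
        for name in tool_call_names
        if name.startswith(HANDOFF_PREFIX)
    ]


def is_ambiguous(tool_call_names: List[str]) -> bool:
    """True when a single response asks for more than one handoff."""
    return len(handoff_targets(tool_call_names)) > 1


# One handoff plus a regular tool call: a single, clear next speaker.
single = ["refund_flight", "transfer_to_travel_agent"]

# Two handoffs in one response: the next speaker is ambiguous.
parallel = ["transfer_to_user", "transfer_to_travel_agent"]
```

With parallel tool calling disabled, the model issues at most one tool call per response, so a case like `parallel` cannot arise.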
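II. Hands-On Examples: From Flight Refunds to Stock Research
(1) Customer Support Example: An Automated Flight Refund Workflow
[Image source: AutoGen official documentation]
In the flight refund scenario, we build a system with two agents, a travel agent (Travel Agent) and a flight refund specialist (Flights Refunder), and let the user step in when needed.
The travel agent is the entry point: it starts the conversation and evaluates the user's request. For refund-related tasks it hands off to the flight refund specialist; if it needs more information from the user, such as a flight number, it hands off to the user. The flight refund specialist handles refunds with the refund_flight tool. When it needs user input, team execution pauses until the user provides the missing information.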
The full implementation follows:
from typing import Any, Dict, List

from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.conditions import HandoffTermination, TextMentionTermination
from autogen_agentchat.messages import HandoffMessage
from autogen_agentchat.teams import Swarm
from autogen_agentchat.ui import Console
from autogen_ext.models.openai import OpenAIChatCompletionClient


# Refund tool (a stub that simulates the refund)
def refund_flight(flight_id: str) -> str:
    return f"Flight {flight_id} refunded"


# Create the model client
model_client = OpenAIChatCompletionClient(
    model="gpt-4o",
    # api_key="YOUR_API_KEY",
)

# Create the travel agent
travel_agent = AssistantAgent(
    "travel_agent",
    model_client=model_client,
    handoffs=["flights_refunder", "user"],
    system_message="""You are a travel agent. The flights_refunder is in charge of refunding flights. If you need information from the user, you must first send your message, then you can handoff to the user. Use TERMINATE when the travel planning is complete.""",
)

# Create the flight refund specialist
flights_refunder = AssistantAgent(
    "flights_refunder",
    model_client=model_client,
    handoffs=["travel_agent", "user"],
    tools=[refund_flight],
    system_message="""You are an agent specialized in refunding flights. You only need flight reference numbers to refund a flight. You have the ability to refund a flight using the refund_flight tool. If you need information from the user, you must first send your message, then you can handoff to the user. When the transaction is complete, handoff to the travel agent to finalize.""",
)

# Termination: stop on a handoff to the user, or when "TERMINATE" is mentioned
termination = HandoffTermination(target="user") | TextMentionTermination("TERMINATE")
team = Swarm([travel_agent, flights_refunder], termination_condition=termination)

# Define the task
task = "I need to refund my flight."


async def run_team_stream():
    task_result = await Console(team.run_stream(task=task))
    last_message = task_result.messages[-1]

    # While the team stopped on a handoff to the user, collect input and resume
    while isinstance(last_message, HandoffMessage) and last_message.target == "user":
        user_message = input("User: ")
        task_result = await Console(
            team.run_stream(task=HandoffMessage(source="user", target=last_message.source, content=user_message))
        )
        last_message = task_result.messages[-1]


# Run the task
# Use asyncio.run(...) if you are running this in a script.
await run_team_stream()
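Code walkthrough:
First, we import the required modules and classes, including AssistantAgent, HandoffTermination, and TextMentionTermination. These are the key building blocks for creating a Swarm team, handing off tasks, and deciding when to stop. Next we define the refund_flight function, which simulates the refund: it takes a flight number and returns a success message. We then create an OpenAIChatCompletionClient as the model client through which the agents talk to the language model, here using gpt-4o. After that we create the two agents, travel_agent and flights_refunder. The system message of travel_agent describes its role as a travel agent and its handoff rules, while that of flights_refunder defines its duties as the refund specialist and how it uses its tool. The termination condition, termination, combines HandoffTermination (triggered when the handoff target is the user) with TextMentionTermination (triggered when a message mentions "TERMINATE"), so the run stops on whichever fires first. Finally, we define the task "I need to refund my flight." and run the team through run_team_stream. Whenever the run stops on a handoff to the user, the function waits for user input and then resumes the team by sending that input back as a HandoffMessage addressed to the agent that handed off.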
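How the two termination conditions combine can be sketched without the library. This is a simplified model under stated assumptions: messages are plain dictionaries, each condition inspects only the latest message, and `handoff_to`, `text_mention`, and `any_of` are hypothetical helpers that merely mimic the effect of `HandoffTermination(target="user") | TextMentionTermination("TERMINATE")` in the code above.

```python
from typing import Callable, Dict, List

Message = Dict[str, str]
Condition = Callable[[List[Message]], bool]


def handoff_to(target: str) -> Condition:
    """Stop when the latest message is a handoff addressed to `target`."""
    def check(messages: List[Message]) -> bool:
        if not messages:
            return False
        last = messages[-1]
        return last.get("type") == "handoff" and last.get("target") == target
    return check


def text_mention(text: str) -> Condition:
    """Stop when the latest message contains `text`."""
    def check(messages: List[Message]) -> bool:
        return bool(messages) and text in messages[-1].get("content", "")
    return check


def any_of(*conditions: Condition) -> Condition:
    """OR-combination: stop as soon as any single condition fires."""
    def check(messages: List[Message]) -> bool:
        return any(condition(messages) for condition in conditions)
    return check


should_stop = any_of(handoff_to("user"), text_mention("TERMINATE"))

paused_for_user = [
    {"type": "handoff", "source": "flights_refunder", "target": "user",
     "content": "Transferred to user."},
]
finished = [
    {"type": "text", "source": "travel_agent",
     "content": "Your flight has been refunded. TERMINATE"},
]
still_running = [
    {"type": "text", "source": "user", "content": "I need to refund my flight."},
]
```

In this model `should_stop` is true for both `paused_for_user` and `finished`, which corresponds to the two ways the refund conversation can stop: waiting for the user, or finishing for good.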
The run produces the following output.
When the user enters "I need to refund my flight.", the travel agent responds first:
---------- user ----------
I need to refund my flight.
---------- travel_agent ----------
[FunctionCall(id='call_ZQ2rGjq4Z29pd0yP2sNcuyd2', arguments='{}', name='transfer_to_flights_refunder')]
[Prompt tokens: 119, Completion tokens: 14]
---------- travel_agent ----------
[FunctionExecutionResult(content='Transferred to flights_refunder, adopting the role of flights_refunder immediately.', call_id='call_ZQ2rGjq4Z29pd0yP2sNcuyd2')]
---------- travel_agent ----------
Transferred to flights_refunder, adopting the role of flights_refunder immediately.
The flight refund specialist then takes over and asks the user for the flight number:
---------- flights_refunder ----------
Could you please provide me with the flight reference number so I can process the refund for you?
[Prompt tokens: 191, Completion tokens: 20]
---------- flights_refunder ----------
[FunctionCall(id='call_1iRfzNpxTJhRTW2ww9aQJ8sK', arguments='{}', name='transfer_to_user')]
[Prompt tokens: 219, Completion tokens: 11]
---------- flights_refunder ----------
[FunctionExecutionResult(content='Transferred to user, adopting the role of user immediately.', call_id='call_1iRfzNpxTJhRTW2ww9aQJ8sK')]
---------- flights_refunder ----------
Transferred to user, adopting the role of user immediately.
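At this point the system pauses and waits for the user to enter the flight number. Suppose the user enters "507811"; the flight refund specialist then continues: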
---------- user ----------
Sure, it's 507811
---------- flights_refunder ----------
[FunctionCall(id='call_UKCsoEBdflkvpuT9Bi2xlvTd', arguments='{"flight_id":"507811"}', name='refund_flight')]
[Prompt tokens: 266, Completion tokens: 18]
---------- flights_refunder ----------
[FunctionExecutionResult(content='Flight 507811 refunded', call_id='call_UKCsoEBdflkvpuT9Bi2xlvTd')]
---------- flights_refunder ----------
Tool calls:
refund_flight({"flight_id":"507811"}) = Flight 507811 refunded
Once the refund is done, the flight refund specialist hands the task back to the travel agent:
---------- flights_refunder ----------
[FunctionCall(id='call_MQ2CXR8UhVtjNc6jG3wSQp2W', arguments='{}', name='transfer_to_travel_agent')]
[Prompt tokens: 303, Completion tokens: 13]
---------- flights_refunder ----------
[FunctionExecutionResult(content='Transferred to travel_agent, adopting the role of travel_agent immediately.', call_id='call_MQ2CXR8UhVtjNc6jG3wSQp2W')]
---------- flights_refunder ----------
Transferred to travel_agent, adopting the role of travel_agent immediately.
Finally, the travel agent confirms the refund and terminates the flow:
---------- travel_agent ----------
Your flight with reference number 507811 has been successfully refunded. If you need anything else, feel free to let me know. Safe travels! TERMINATE
[Prompt tokens: 272, Completion tokens: 32]
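This example shows Swarm at work in a customer support scenario: the task passes from the travel agent to the refund specialist, to the user, and back again, and the user's request is resolved without any hard-coded routing.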
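(2) Stock Research Example: Coordinating Multiple Agents
For stock research, we design a system of four agents: a planner (Planner), a financial analyst (Financial Analyst), a news analyst (News Analyst), and a writer (Writer).
[Image source: AutoGen official documentation]
The planner is the central coordinator: it assigns specific tasks to the specialist agents according to what the research requires and keeps the workflow moving. The financial analyst uses the get_stock_data tool to analyze stock data and financial metrics. The news analyst uses the get_news tool to collect and summarize recent news about the stock and extract key market insights. The writer compiles the financial and news analysis into a final report.
After finishing its own task, each agent returns control to the planner, which decides from the overall progress whether to assign more work or end the flow. This loop lets every agent contribute its own specialty while the planner keeps the research on track.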
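The hub-and-spoke shape of this workflow can be checked with a small, library-free sketch. The `handoffs` dictionary mirrors the handoffs settings used in this example's code; `unknown_targets` and `is_hub_and_spoke` are hypothetical helpers written for illustration, not AutoGen functions.

```python
from typing import Dict, List, Tuple

# Handoff targets per agent, mirroring the configuration in this example.
handoffs: Dict[str, List[str]] = {
    "planner": ["financial_analyst", "news_analyst", "writer"],
    "financial_analyst": ["planner"],
    "news_analyst": ["planner"],
    "writer": ["planner"],
}


def unknown_targets(graph: Dict[str, List[str]]) -> List[Tuple[str, str]]:
    """List (source, target) pairs whose target is not a known agent."""
    return [
        (source, target)
        for source, targets in graph.items()
        for target in targets
        if target not in graph
    ]


def is_hub_and_spoke(graph: Dict[str, List[str]], hub: str) -> bool:
    """True when the hub can reach every other agent and each one returns only to the hub."""
    spokes = set(graph) - {hub}
    hub_reaches_all = set(graph[hub]) == spokes
    spokes_return = all(graph[name] == [hub] for name in spokes)
    return hub_reaches_all and spokes_return


# A misconfigured variant: the writer hands off to an agent that does not exist.
broken = dict(handoffs, writer=["editor"])
```

A check like this catches a misspelled handoff target before any model call is made, which matters because a handoff to an agent outside the team cannot be served.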
The full implementation follows:
from typing import Any, Dict, List

from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.conditions import HandoffTermination, TextMentionTermination
from autogen_agentchat.messages import HandoffMessage
from autogen_agentchat.teams import Swarm
from autogen_agentchat.ui import Console
from autogen_ext.models.openai import OpenAIChatCompletionClient


# Stock data tool (a stub that returns fixed sample values)
async def get_stock_data(symbol: str) -> Dict[str, Any]:
    return {"price": 180.25, "volume": 1000000, "pe_ratio": 65.4, "market_cap": "700B"}


# News tool (a stub that returns fixed sample articles)
async def get_news(query: str) -> List[Dict[str, str]]:
    return [
        {
            "title": "Tesla Expands Cybertruck Production",
            "date": "2024-03-20",
            "summary": "Tesla ramps up Cybertruck manufacturing capacity at Gigafactory Texas, aiming to meet strong demand.",
        },
        {
            "title": "Tesla FSD Beta Shows Promise",
            "date": "2024-03-19",
            "summary": "Latest Full Self-Driving beta demonstrates significant improvements in urban navigation and safety features.",
        },
        {
            "title": "Model Y Dominates Global EV Sales",
            "date": "2024-03-18",
            "summary": "Tesla's Model Y becomes best-selling electric vehicle worldwide, capturing significant market share.",
        },
    ]


# Create the model client
model_client = OpenAIChatCompletionClient(
    model="gpt-4o",
    # api_key="YOUR_API_KEY",
)

# Create the planner
planner = AssistantAgent(
    "planner",
    model_client=model_client,
    handoffs=["financial_analyst", "news_analyst", "writer"],
    system_message="""You are a research planning coordinator. Coordinate market research by delegating to specialized agents: - Financial Analyst: For stock data analysis - News Analyst: For news gathering and analysis - Writer: For compiling final report Always send your plan first, then handoff to appropriate agent. Always handoff to a single agent at a time. Use TERMINATE when research is complete.""",
)

# Create the financial analyst
financial_analyst = AssistantAgent(
    "financial_analyst",
    model_client=model_client,
    handoffs=["planner"],
    tools=[get_stock_data],
    system_message="""You are a financial analyst. Analyze stock market data using the get_stock_data tool. Provide insights on financial metrics. Always handoff back to planner when analysis is complete.""",
)

# Create the news analyst
news_analyst = AssistantAgent(
    "news_analyst",
    model_client=model_client,
    handoffs=["planner"],
    tools=[get_news],
    system_message="""You are a news analyst. Gather and analyze relevant news using the get_news tool. Summarize key market insights from news. Always handoff back to planner when analysis is complete.""",
)

# Create the writer
writer = AssistantAgent(
    "writer",
    model_client=model_client,
    handoffs=["planner"],
    system_message="""You are a financial report writer. Compile research findings into clear, concise reports. Always handoff back to planner when writing is complete.""",
)

# Termination: stop when "TERMINATE" is mentioned
text_termination = TextMentionTermination("TERMINATE")
termination = text_termination

research_team = Swarm(
    participants=[planner, financial_analyst, news_analyst, writer], termination_condition=termination
)

# Define the task
task = "Conduct market research for TSLA stock"

# Use asyncio.run(...) with an async wrapper if you are running this in a script.
await Console(research_team.run_stream(task=task))
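Code walkthrough:
As before, we first import the required modules and classes, then define two tool functions, get_stock_data and get_news, which simulate fetching stock data and news. We create an OpenAIChatCompletionClient model client and set the model to gpt-4o. Next come the four agents: planner is the coordinator, and its system message spells out the delegation rules and workflow; financial_analyst is equipped with the get_stock_data tool for stock data analysis; news_analyst uses the get_news tool to gather and analyze news; writer produces the report. Each agent is given its own handoff targets and system message. The termination condition is TextMentionTermination("TERMINATE"), so the run ends when a message mentions "TERMINATE". Finally, we define the task "Conduct market research for TSLA stock" and run the research team, with the agents collaborating according to the configured flow.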
The run produces the following output.
When the user enters "Conduct market research for TSLA stock", the planner starts by delegating:
---------- user ----------
Conduct market research for TSLA stock
---------- planner ----------
[FunctionCall(id='call_BX5QaRuhmB8CxTsBlqCUIXPb', arguments='{}', name='transfer_to_financial_analyst')]
[Prompt tokens: 169, Completion tokens: 166]
---------- planner ----------
[FunctionExecutionResult(content='Transferred to financial_analyst, adopting the role of financial_analyst immediately.', call_id='call_BX5QaRuhmB8CxTsBlqCUIXPb')]
---------- planner ----------
Transferred to financial_analyst, adopting the role of financial_analyst immediately.
The financial analyst receives the task and calls its tool to fetch the stock data:
---------- financial_analyst ----------
[FunctionCall(id='call_SAXy1ebtA9mnaZo4ztp