项目准备
初始化项目
uv init --package my-project
cd my-project
创建虚拟环境
uv venv .venv
激活虚拟环境:
# For Windows
.venv\Scripts\activate
# For macOS/Linux
source .venv/bin/activate
添加A2A库
uv add git+https://github.com/djsamseng/A2A#subdirectory=samples/python --branch prefixPythonPackage
创建agent和task_manager文件
touch src/my_project/agent.py
touch src/my_project/task_manager.py
定义代理技能(Agent Skills)
基本结构
首先,我们定义技能的基本结构。示例:
{
"id": "my-project-echo-skill",
"name": "Echo Tool",
"description": "Echos the input given",
"tags": ["echo", "repeater"],
"examples": ["I will see this echoed back to me"],
"inputModes": ["text"],
"outputModes": ["text"]
}
定义 Agent Card(代理名片)
基本概念
代理卡片(Agent Card)是描述代理能力和技能的 JSON 格式文档,它还可以声明与代理交互时所需的身份验证方式。通过代理卡片,外界可以了解代理的功能以及如何与其交互。
代理卡片的主要作用是:
- 公布代理的功能和技能。
- 提供与代理交互时所需的信息(例如,代理的 URL 和输入/输出模式)。
- 确保其他系统或用户能够正确调用和利用代理的能力。
# Describe what the agent can do (default capabilities) and publish it,
# together with its skills, in an Agent Card served at the agent's URL.
capabilities = AgentCapabilities()
card_fields = dict(
    name="Echo Agent",
    description="This agent echos the input given",
    url=f"http://{host}:{port}/",
    version="0.1.0",
    defaultInputModes=["text"],
    defaultOutputModes=["text"],
    capabilities=capabilities,
    skills=[skill],
)
agent_card = AgentCard(**card_fields)
logging.info(agent_card)
任务管理器(Task Manager)
基本概念
在创建服务器之前,我们需要实现一个任务管理器。任务管理器的职责是处理传入的请求,管理任务的状态并返回响应。
本示例的任务管理器通过继承 InMemoryTaskManager 类来实现任务管理器接口。
这需要实现以下两个异步方法:
async def on_send_task(self, request: SendTaskRequest) -> SendTaskResponse:
    """Query or create an agent task and answer with one definitive response.

    Use this when the caller needs the result of a single task execution;
    the processing result is returned as soon as the task completes.
    """
    ...
async def on_send_task_subscribe(
    self,
    request: SendTaskStreamingRequest
) -> AsyncIterable[SendTaskStreamingResponse] | JSONRPCResponse:
    """Subscribe to follow-up updates for a task.

    The caller receives an initial response and then keeps receiving
    subscription updates over the established client-server session. Use
    this when task progress must be monitored continuously: state changes
    and intermediate results arrive in real time, which suits long-running
    tasks.
    """
    pass
本次 demo 的任务管理器实现位于 src/my_project/task_manager.py 文件:
import asyncio
from typing import AsyncIterable
from my_project.agent import ReimbursementAgent
import google_a2a
from google_a2a.common.server.task_manager import InMemoryTaskManager
from google_a2a.common.types import (
Artifact, # 任务产物/结果
JSONRPCResponse, # JSON-RPC响应
Message, # 消息对象
SendTaskRequest, # 发送任务请求
SendTaskResponse, # 发送任务响应
SendTaskStreamingRequest, # 流式发送任务请求
SendTaskStreamingResponse, # 流式发送任务响应
Task, # 任务对象
TaskState, # 任务状态
TaskStatus, # 任务状态信息
TaskStatusUpdateEvent, # 任务状态更新事件
)
from my_project.qwen import llm
class MyAgentTaskManager(InMemoryTaskManager):
    """Task manager that routes A2A task requests to a ReimbursementAgent.

    Task storage and SSE queue plumbing come from InMemoryTaskManager.
    Non-streaming requests (on_send_task) are answered by the wrapped agent;
    streaming requests (on_send_task_subscribe) currently run a demo that
    echoes the user's text back in three status updates.
    """

    def __init__(self, agent: ReimbursementAgent):
        super().__init__()
        self.agent = agent
        # Strong references to fire-and-forget background tasks. The event
        # loop keeps only weak references to tasks, so without this set a
        # streaming task could be garbage-collected before it finishes.
        self._background_tasks: set[asyncio.Task] = set()

    async def on_send_task(self, request: SendTaskRequest) -> SendTaskResponse:
        """Handle a non-streaming task request and return the finished task.

        The task is stored, the text of the first message part is sent to
        the agent, and the task is marked COMPLETED with the agent's answer.

        Args:
            request: The incoming SendTaskRequest.

        Returns:
            A SendTaskResponse carrying the updated task.

        Raises:
            ValueError: If invoking the agent fails (the original exception
                is chained as the cause).
        """
        # Persist the task in the in-memory store before processing it.
        await self.upsert_task(request.params)
        task_id = request.params.id
        # Only the first message part's text is used as the query.
        query = request.params.message.parts[0].text
        try:
            # agent.invoke is a synchronous call that drains the ADK runner;
            # run it in a worker thread so the event loop is not blocked for
            # the whole LLM round-trip.
            result = await asyncio.to_thread(
                self.agent.invoke, query, request.params.sessionId
            )
        except Exception as e:
            print(f"Error invoking agent: {e}")
            raise ValueError(f"Error invoking agent: {e}") from e
        # Mark the task COMPLETED and attach the agent's answer.
        task = await self._update_task(
            task_id=task_id,
            task_state=TaskState.COMPLETED,
            response_text=f"on_send_task received: {result}",
        )
        return SendTaskResponse(id=request.id, result=task)

    async def _stream_3_messages(self, request: SendTaskStreamingRequest):
        """Demo stream: enqueue three status updates echoing the user's text.

        The first two updates are WORKING; the last one is COMPLETED and is
        flagged as final so the SSE stream ends.
        """
        task_id = request.params.id
        received_text = request.params.message.parts[0].text
        text_messages = ["one", "two", "three"]
        last_index = len(text_messages) - 1
        for index, text in enumerate(text_messages):
            parts = [
                {
                    "type": "text",
                    "text": f"{received_text}: {text}",
                }
            ]
            message = Message(role="agent", parts=parts)
            is_last = index == last_index
            task_state = TaskState.COMPLETED if is_last else TaskState.WORKING
            task_update_event = TaskStatusUpdateEvent(
                id=task_id,
                status=TaskStatus(state=task_state, message=message),
                final=is_last,
            )
            # Queue the event for delivery to the SSE subscriber of this task.
            await self.enqueue_events_for_sse(task_id, task_update_event)

    async def on_send_task_subscribe(
        self,
        request: SendTaskStreamingRequest
    ) -> AsyncIterable[SendTaskStreamingResponse] | JSONRPCResponse:
        """Handle a streaming task request.

        Streaming lets the client subscribe and receive several updates
        instead of a single response, which helps with long-running tasks
        or when several artifacts must be sent back. This implementation
        runs the _stream_3_messages demo in the background and returns the
        SSE event stream for the task.

        Args:
            request: The incoming SendTaskStreamingRequest.

        Returns:
            The event stream produced by dequeue_events_for_sse.
        """
        # Persist the task, then create the SSE queue before any producer
        # can enqueue events into it.
        await self.upsert_task(request.params)
        task_id = request.params.id
        sse_event_queue = await self.setup_sse_consumer(task_id=task_id)
        # Produce the updates in the background, keeping a reference until
        # the task is done.
        background = asyncio.create_task(self._stream_3_messages(request))
        self._background_tasks.add(background)
        background.add_done_callback(self._background_tasks.discard)
        return self.dequeue_events_for_sse(
            request_id=request.id,
            task_id=task_id,
            sse_event_queue=sse_event_queue,
        )

    async def _update_task(
        self,
        task_id: str,
        task_state: TaskState,
        response_text: str,
    ) -> Task:
        """Set a stored task's status and artifacts to a single text reply.

        Args:
            task_id: ID of the task to update (must already be in self.tasks).
            task_state: New state for the task.
            response_text: Text used for both the status message and the
                single artifact.

        Returns:
            The updated Task object (mutated in place).

        Raises:
            KeyError: If task_id is not present in self.tasks.
        """
        task = self.tasks[task_id]
        agent_response_parts = [
            {
                "type": "text",
                "text": response_text,
            }
        ]
        task.status = TaskStatus(
            state=task_state,
            message=Message(
                role="agent",
                parts=agent_response_parts,
            ),
        )
        task.artifacts = [
            Artifact(
                parts=agent_response_parts,
            )
        ]
        return task
如果不使用流式传输,只需实现 on_send_task 方法;on_send_task_subscribe 主要用于流式传输场景。
Agent功能
在 src/my_project/agent.py 文件中实现 agent 的功能:
import json
import random
from typing import Any, AsyncIterable, Dict, Optional
from google.adk.agents.llm_agent import LlmAgent
from google.adk.tools.tool_context import ToolContext
from google.adk.artifacts import InMemoryArtifactService
from google.adk.memory.in_memory_memory_service import InMemoryMemoryService
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.genai import types
from my_project.qwen import llm
# Process-local registry of every request_id issued so far (demo only; it is
# consulted by reimburse to validate ids).
request_ids: set[str] = set()


def create_request_form(date: Optional[str] = None, amount: Optional[str] = None, purpose: Optional[str] = None) -> dict[str, Any]:
    """
    Create a request form for the employee to fill out.
    Args:
        date (str): The date of the request. Can be an empty string.
        amount (str): The requested amount. Can be an empty string.
        purpose (str): The purpose of the request. Can be an empty string.
    Returns:
        dict[str, Any]: A dictionary containing the request form data.
    """
    # NOTE: the docstring above is the tool description ADK passes to the
    # LLM, so it is kept verbatim.
    new_id = f"request_id_{random.randint(1000000, 9999999)}"
    request_ids.add(new_id)
    # Falsy values (None or "") are replaced by human-readable placeholders.
    return {
        "request_id": new_id,
        "date": date or "<transaction date>",
        "amount": amount or "<transaction dollar amount>",
        "purpose": purpose or "<business justification/purpose of the transaction>",
    }
def return_form(
    form_request: dict[str, Any],
    tool_context: ToolContext,
    instructions: Optional[str] = None) -> str:
    """
    Returns a structured json object indicating a form to complete.
    Args:
        form_request (dict[str, Any]): The request form data.
        tool_context (ToolContext): The context in which the tool operates.
        instructions (str): Instructions for processing the form. Can be an empty string.
    Returns:
        str: A JSON string encoding the form response.
    """
    # The function has always returned json.dumps(...) (a str); the return
    # annotation and docstring now say so instead of claiming a dict.
    # The LLM may pass the form as a JSON string rather than an object.
    if isinstance(form_request, str):
        form_request = json.loads(form_request)
    # ADK action flags: skip LLM summarization of this tool's result and
    # request escalation.
    tool_context.actions.skip_summarization = True
    tool_context.actions.escalate = True
    form_dict = {
        'type': 'form',
        'form': {
            'type': 'object',
            'properties': {
                'date': {
                    'type': 'string',
                    'format': 'date',
                    'description': 'Date of expense',
                    'title': 'Date',
                },
                'amount': {
                    'type': 'string',
                    'format': 'number',
                    'description': 'Amount of expense',
                    'title': 'Amount',
                },
                'purpose': {
                    'type': 'string',
                    'description': 'Purpose of expense',
                    'title': 'Purpose',
                },
                'request_id': {
                    'type': 'string',
                    'description': 'Request id',
                    'title': 'Request ID',
                },
            },
            # Every field present in the incoming form is marked required.
            'required': list(form_request.keys()),
        },
        'form_data': form_request,
        'instructions': instructions,
    }
    return json.dumps(form_dict)
def reimburse(request_id: str) -> dict[str, Any]:
    """Reimburse the amount of money to the employee for a given request_id."""
    # Only ids previously recorded in the module-level request_ids set are
    # approved; anything else gets an error status.
    status = "approved" if request_id in request_ids else "Error: Invalid request_id."
    return {"request_id": request_id, "status": status}
class ReimbursementAgent:
    """An agent that handles reimbursement requests.

    Wraps an ADK LlmAgent in a Runner backed by in-memory artifact, session
    and memory services. It exposes a blocking `invoke` and an async
    `stream` entry point.
    """

    SUPPORTED_CONTENT_TYPES = ["text", "text/plain"]

    def __init__(self):
        self._agent = self._build_agent()
        self._user_id = "remote_agent"
        self._runner = Runner(
            app_name=self._agent.name,
            agent=self._agent,
            artifact_service=InMemoryArtifactService(),
            session_service=InMemorySessionService(),
            memory_service=InMemoryMemoryService(),
        )

    def _get_or_create_session(self, session_id):
        """Return the runner session for session_id, creating it if absent."""
        session = self._runner.session_service.get_session(
            app_name=self._agent.name, user_id=self._user_id, session_id=session_id
        )
        if session is None:
            session = self._runner.session_service.create_session(
                app_name=self._agent.name,
                user_id=self._user_id,
                state={},
                session_id=session_id,
            )
        return session

    @staticmethod
    def _user_content(query):
        """Wrap the user's query text as a single-part user Content."""
        return types.Content(
            role="user", parts=[types.Part.from_text(text=query)]
        )

    def invoke(self, query, session_id) -> str:
        """Run the agent to completion and return its final answer as text.

        Returns the newline-joined text parts of the last event. If that
        event has no text but carries a function response, that response is
        returned as JSON. Returns "" when there is nothing to report.
        """
        session = self._get_or_create_session(session_id)
        events = list(self._runner.run(
            user_id=self._user_id,
            session_id=session.id,
            new_message=self._user_content(query),
        ))
        if not events or not events[-1].content or not events[-1].content.parts:
            return ""
        parts = events[-1].content.parts
        text = "\n".join(p.text for p in parts if p.text)
        if text:
            return text
        # Presumably the run ended on a tool's function response (e.g.
        # return_form sets skip_summarization) — surface that payload instead
        # of returning an empty answer. NOTE(review): confirm against ADK's
        # skip_summarization semantics.
        function_response = next(
            (p.function_response for p in parts if p.function_response), None
        )
        if function_response is None:
            return ""
        return json.dumps(function_response.model_dump(mode="json"), ensure_ascii=False)

    async def stream(self, query, session_id) -> AsyncIterable[Dict[str, Any]]:
        """Run the agent and yield progress dicts, then the final result.

        Non-final events yield {"is_task_complete": False, "updates": ...}.
        The final event yields {"is_task_complete": True, "content": ...}.
        The content is the joined text, or the first function response
        (as a dict) when the first part has no text.
        """
        session = self._get_or_create_session(session_id)
        async for event in self._runner.run_async(
            user_id=self._user_id,
            session_id=session.id,
            new_message=self._user_content(query),
        ):
            if not event.is_final_response():
                yield {
                    "is_task_complete": False,
                    "updates": "Processing the reimbursement request...",
                }
                continue
            response = ""
            parts = event.content.parts if event.content else None
            if parts and parts[0].text:
                response = "\n".join(p.text for p in parts if p.text)
            elif parts and any(p.function_response for p in parts):
                # Filter to parts that actually carry a function response;
                # an unfiltered next() would hit a part whose
                # function_response is None and raise AttributeError.
                response = next(
                    p.function_response.model_dump()
                    for p in parts
                    if p.function_response
                )
            yield {
                "is_task_complete": True,
                "content": response,
            }

    def _build_agent(self) -> LlmAgent:
        """Builds the LLM agent for the reimbursement agent."""
        # The instruction referenced a non-existent `request_form` tool; it
        # now names return_form, which is the registered form-returning tool.
        return LlmAgent(
            model=llm,
            name="reimbursement_agent",
            description=(
                "This agent handles the reimbursement process for the employees"
                " given the amount and purpose of the reimbursement."
            ),
            instruction="""
You are an agent who handles the reimbursement process for employees.
When you receive an reimbursement request, you should first create a new request form using create_request_form(). Only provide default values if they are provided by the user, otherwise use an empty string as the default value.
1. 'Date': the date of the transaction.
2. 'Amount': the dollar amount of the transaction.
3. 'Business Justification/Purpose': the reason for the reimbursement.
Once you created the form, you should return the result of calling return_form with the form data from the create_request_form call.
Once you received the filled-out form back from the user, you should then check the form contains all required information:
1. 'Date': the date of the transaction.
2. 'Amount': the value of the amount of the reimbursement being requested.
3. 'Business Justification/Purpose': the item/object/artifact of the reimbursement.
If you don't have all of the information, you should reject the request directly by calling the return_form method, providing the missing fields.
For valid reimbursement requests, you can then use reimburse() to reimburse the employee.
* In your response, you should include the request_id and the status of the reimbursement request.
""",
            tools=[
                create_request_form,
                reimburse,
                return_form,
            ],
        )
主函数
在 src/my_project/__init__.py 文件中实现主函数,用于启动服务:
import logging
import click
from dotenv import load_dotenv
import google_a2a
from google_a2a.common.types import AgentSkill,AgentCapabilities, AgentCard
from google_a2a.common.server import A2AServer
from my_project.task_manager import MyAgentTaskManager
from my_project.agent import ReimbursementAgent
# Configure process-wide logging and a module-level logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@click.command()
@click.option("--host", default="localhost", help="Host/interface the A2A server binds to.")
@click.option("--port", default=10002, type=int, help="Port the A2A server listens on.")
def main(host, port):
    """Start an A2A server that serves the reimbursement agent."""
    # NOTE(review): the skill and card below still describe the tutorial's
    # "Echo" agent, while the task manager wraps ReimbursementAgent; consider
    # updating them so A2A clients see an accurate description.
    skill = AgentSkill(
        id="my-project-echo-skill",
        name="Echo Tool",
        description="Echos the input given",
        tags=["echo", "repeater"],
        examples=["I will see this echoed back to me"],
        inputModes=["text"],
        outputModes=["text"],
    )
    logger.info(skill)
    # Capabilities advertised in the card: streaming is declared disabled.
    capabilities = AgentCapabilities(
        streaming=False
    )
    agent_card = AgentCard(
        name="Echo Agent",
        description="This agent echos the input given",
        url=f"http://{host}:{port}/",
        version="0.1.0",
        defaultInputModes=["text"],
        defaultOutputModes=["text"],
        capabilities=capabilities,
        skills=[skill],
    )
    logger.info(agent_card)
    task_manager = MyAgentTaskManager(agent=ReimbursementAgent())
    server = A2AServer(
        agent_card=agent_card,
        task_manager=task_manager,
        host=host,
        port=port,
    )
    server.start()


if __name__ == "__main__":
    main()
大模型llm使用
我使用的是 Qwen(通义千问)模型,配置位于 src/my_project/qwen.py 文件:
from google.adk.models.lite_llm import LiteLlm
import os

# LiteLLM model handle for Qwen through DashScope's OpenAI-compatible endpoint.
# The API key is read from the DASHSCOPE_API_KEY environment variable so it
# does not have to be committed to source control. The original placeholder
# is kept as the fallback, so behavior is unchanged when the variable is unset.
llm = LiteLlm(
    model="openai/qwen-max-2025-01-25",
    api_key=os.environ.get("DASHSCOPE_API_KEY", '自己的阿里api——key'),
    base_url='https://dashscope.aliyuncs.com/compatible-mode/v1',
    temperature=0.3,
)
测试功能
启动服务:
uv run my-project
看到服务启动日志,即表示服务启动成功。
然后启动 google_a2a 库自带的命令行客户端:
uv run google-a2a-cli --agent http://localhost:10002
在客户端输入“你好”后,服务端后台的输出如下: