A Simple A2A Demo (Using a Qwen Model), Part 1

Published: 2025-05-10

Project setup

Initialize the project

uv init --package my-project
cd my-project

Create a virtual environment

uv venv .venv

Activate the virtual environment:

# For Windows
.venv\Scripts\activate

# For macOS/Linux
source .venv/bin/activate

Add the A2A library

uv add git+https://github.com/djsamseng/A2A#subdirectory=samples/python --branch prefixPythonPackage

Create the agent and task_manager files

touch src/my_project/agent.py
touch src/my_project/task_manager.py

Defining Agent Skills

Basic structure

First, we define the basic structure of a skill. For example:

{
  "id": "my-project-echo-skill",
  "name": "Echo Tool",
  "description": "Echos the input given",
  "tags": ["echo", "repeater"],
  "examples": ["I will see this echoed back to me"],
  "inputModes": ["text"],
  "outputModes": ["text"]
}
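As a rough illustration, the skill definition above can be loaded and checked for the fields it is expected to carry. This is only a sketch: the helper name missing_skill_fields is hypothetical, and treating every field in the example as required is an assumption made here, not something the A2A library enforces in this way.

```python
import json

# Field names are taken from the skill example above. Treating all of them
# as required is an assumption made for this sketch.
EXPECTED_SKILL_FIELDS = (
    "id", "name", "description", "tags",
    "examples", "inputModes", "outputModes",
)


def missing_skill_fields(skill: dict) -> list[str]:
    """Return the expected fields that are absent from a skill definition."""
    return [field for field in EXPECTED_SKILL_FIELDS if field not in skill]


skill_json = """
{
  "id": "my-project-echo-skill",
  "name": "Echo Tool",
  "description": "Echos the input given",
  "tags": ["echo", "repeater"],
  "examples": ["I will see this echoed back to me"],
  "inputModes": ["text"],
  "outputModes": ["text"]
}
"""

skill = json.loads(skill_json)
print(missing_skill_fields(skill))
```

A check like this can catch a misspelled key before the skill is handed to the library's own types.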

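Defining the Agent Card

Basic concepts

An Agent Card is a JSON document that describes an agent's capabilities and skills, and it also covers the authentication mechanism. Through the Agent Card, outside callers can learn what the agent does and how to interact with it.

The Agent Card mainly serves to:

  • Advertise the agent's capabilities and skills.
  • Provide the information needed to interact with the agent (for example, the agent's URL and input/output modes).
  • Ensure that other systems or users can call and use the agent's capabilities correctly.

# Add the agent capabilities and agent card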
    capabilities = AgentCapabilities()
    agent_card = AgentCard(
        name="Echo Agent",
        description="This agent echos the input given",
        url=f"http://{host}:{port}/",
        version="0.1.0",
        defaultInputModes=["text"],
        defaultOutputModes=["text"],
        capabilities=capabilities,
        skills=[skill]
    )
    logging.info(agent_card)
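
Since the Agent Card is ultimately a JSON document, it can help to see roughly what the fields above look like once serialized. The sketch below builds the card as a plain dictionary with the standard library only. The helper name build_agent_card is hypothetical, and the exact JSON the library emits (field order, extra defaults) may differ.

```python
import json


def build_agent_card(host: str, port: int, skills: list[dict]) -> dict:
    """Assemble an Agent Card as a plain dict (illustrative only)."""
    return {
        "name": "Echo Agent",
        "description": "This agent echos the input given",
        "url": f"http://{host}:{port}/",
        "version": "0.1.0",
        "defaultInputModes": ["text"],
        "defaultOutputModes": ["text"],
        # Assumption: streaming is disabled, matching the non-streaming demo.
        "capabilities": {"streaming": False},
        "skills": skills,
    }


card = build_agent_card(
    "localhost",
    10002,
    [{"id": "my-project-echo-skill", "name": "Echo Tool"}],
)
print(json.dumps(card, indent=2))
```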

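Task Manager

Basic concepts

Before creating the server, we need to implement a task manager. The task manager handles incoming requests, manages task state, and returns responses.

Our task manager builds on InMemoryTaskManager by subclassing it, which requires implementing the following two async methods: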

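async def on_send_task(
  self,
  request: SendTaskRequest
) -> SendTaskResponse:
  """
  Queries or creates an agent task.
  The caller receives exactly one response.
  Use this method when you need the result of a single task run.
  The result is returned as soon as the task completes.
  """
  pass

async def on_send_task_subscribe(
  self,
  request: SendTaskStreamingRequest
) -> AsyncIterable[SendTaskStreamingResponse] | JSONRPCResponse:
  """
  Subscribes to subsequent updates for a task.
  The caller receives an initial response, then receives updates
  over the established client-server session.
  Suited to cases that need to monitor task progress continuously,
  such as real-time state changes and intermediate results,
  including long-running tasks.
  """
  pass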
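
To make the difference between the two methods concrete, here is a minimal standard-library sketch, independent of the A2A types: one coroutine returns a single result, while an async generator yields several updates before finishing. The names send_once and send_and_subscribe are hypothetical stand-ins, not library functions.

```python
import asyncio
from typing import AsyncIterator


async def send_once(text: str) -> str:
    """Request/response style: the caller awaits exactly one result."""
    return f"done: {text}"


async def send_and_subscribe(text: str) -> AsyncIterator[str]:
    """Streaming style: the caller receives several updates over time."""
    for state in ("working", "working", "completed"):
        await asyncio.sleep(0)  # stand-in for real work between updates
        yield f"{state}: {text}"


async def demo() -> tuple[str, list[str]]:
    single = await send_once("hello")
    updates = [update async for update in send_and_subscribe("hello")]
    return single, updates


single, updates = asyncio.run(demo())
print(single)
print(updates)
```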

The task manager for this demo lives in src/my_project/task_manager.py:

import asyncio
from typing import AsyncIterable
from my_project.agent import ReimbursementAgent
from google_a2a.common.server.task_manager import InMemoryTaskManager
from google_a2a.common.types import (
  Artifact,  # task output/result
  JSONRPCResponse,  # JSON-RPC response
  Message,  # message object
  SendTaskRequest,  # send-task request
  SendTaskResponse,  # send-task response
  SendTaskStreamingRequest,  # streaming send-task request
  SendTaskStreamingResponse,  # streaming send-task response
  Task,  # task object
  TaskState,  # task state
  TaskStatus,  # task status information
  TaskStatusUpdateEvent,  # task status update event
)

class MyAgentTaskManager(InMemoryTaskManager):
  """Custom task manager that subclasses InMemoryTaskManager."""
  def __init__(self, agent: ReimbursementAgent):
    super().__init__()
    self.agent = agent

  async def on_send_task(self, request: SendTaskRequest) -> SendTaskResponse:
    """
    Queries or creates an agent task.
    The caller receives exactly one response.
    Use this method when you need the result of a single task run.
    The result is returned as soon as the task completes.
    """
    # Save the task in the in-memory task manager
    await self.upsert_task(request.params)

    task_id = request.params.id
    # Extract the text the client sent
    query = request.params.message.parts[0].text

    try:
      # Ask the agent for a reply; the session ID keeps the conversation context
      result = self.agent.invoke(query, request.params.sessionId)
    except Exception as e:
      print(f"Error invoking agent: {e}")
      raise ValueError(f"Error invoking agent: {e}") from e

    # Mark the task as completed and attach the agent's reply
    task = await self._update_task(
      task_id=task_id,
      task_state=TaskState.COMPLETED,
      response_text=f"on_send_task received: {result}"
    )

    # Return the task response
    return SendTaskResponse(id=request.id, result=task)

  async def _stream_3_messages(self, request: SendTaskStreamingRequest):
    # Async helper: send 3 messages in order
    task_id = request.params.id  # task ID
    received_text = request.params.message.parts[0].text  # text sent by the user

    text_messages = ["one", "two", "three"]  # the 3 messages to send
    for text in text_messages:
      # Build the response content for each message
      parts = [
        {
          "type": "text",
          "text": f"{received_text}: {text}",  # combine user input with the current message
        }
      ]
      message = Message(role="agent", parts=parts)  # create the message object
      is_last = text == text_messages[-1]  # is this the last message?
      task_state = TaskState.COMPLETED if is_last else TaskState.WORKING  # completed only on the last message
      task_status = TaskStatus(
        state=task_state,
        message=message
      )  # create the task status object
      task_update_event = TaskStatusUpdateEvent(
        id=task_id,
        status=task_status,
        final=is_last,
      )  # create the task update event
      await self.enqueue_events_for_sse(
        task_id,
        task_update_event
      )  # queue the event for delivery

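  async def on_send_task_subscribe(
    self,
    request: SendTaskStreamingRequest
  ) -> AsyncIterable[SendTaskStreamingResponse] | JSONRPCResponse:
    """
    Handles streaming task requests.
    This demo only streams three fixed messages; real streaming logic
    can be added later as needed.
    Streaming lets a client subscribe to the server and receive multiple
    updates rather than a single response. This is useful for long-running
    agent tasks, or when several artifacts must be sent back to the client.
    The caller receives an initial response, then receives updates over
    the established client-server session, such as task state changes
    and intermediate results.
    """
    # Save the task in the in-memory task manager
    await self.upsert_task(request.params)
    task_id = request.params.id
    # Create an event queue for this task
    sse_event_queue = await self.setup_sse_consumer(task_id=task_id)

    # Start a background task that produces the message stream
    asyncio.create_task(self._stream_3_messages(request))

    # Return the event stream so the client knows to expect streamed responses
    return self.dequeue_events_for_sse(
      request_id=request.id,
      task_id=task_id,
      sse_event_queue=sse_event_queue,
    )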

  async def _update_task(
    self,
    task_id: str,  # task ID
    task_state: TaskState,  # task state
    response_text: str,  # response text
  ) -> Task:
    """
    Internal helper that updates a task's state.
    Args:
      task_id: ID of the task to update
      task_state: the new task state
      response_text: the response text
    Returns:
      The updated task object
    """
    # Look up the task in the task dictionary
    task = self.tasks[task_id]
    # Build the response parts
    agent_response_parts = [
      {
        "type": "text",
        "text": response_text,
      }
    ]
    # Update the task status
    task.status = TaskStatus(
      state=task_state,
      message=Message(
        role="agent",
        parts=agent_response_parts,
      )
    )
    # Update the task artifacts
    task.artifacts = [
      Artifact(
        parts=agent_response_parts,
      )
    ]
    return task

If you are not using streaming, you only need to implement on_send_task; on_send_task_subscribe is used mainly for streaming.
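
The streaming path in the task manager follows a producer/consumer pattern: a background task enqueues update events, and the response iterator dequeues them until it sees a final event. The sketch below reproduces that idea with asyncio.Queue only. The names produce_updates and consume_updates and the dictionary event shape are assumptions for illustration, not the library's internals.

```python
import asyncio
from typing import AsyncIterator


async def produce_updates(queue: asyncio.Queue, received_text: str) -> None:
    """Enqueue three updates; only the last one is marked final."""
    messages = ["one", "two", "three"]
    for text in messages:
        is_last = text == messages[-1]
        await queue.put({"text": f"{received_text}: {text}", "final": is_last})


async def consume_updates(queue: asyncio.Queue) -> AsyncIterator[dict]:
    """Yield events from the queue until a final event arrives."""
    while True:
        event = await queue.get()
        yield event
        if event["final"]:
            break


async def demo(received_text: str) -> list[dict]:
    queue: asyncio.Queue = asyncio.Queue()
    # Start the producer in the background, as the task manager does.
    producer = asyncio.create_task(produce_updates(queue, received_text))
    events = [event async for event in consume_updates(queue)]
    await producer
    return events


events = asyncio.run(demo("hi"))
for event in events:
    print(event)
```

Decoupling the producer from the consumer this way lets the handler return the event stream immediately while the work continues in the background.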

Agent functionality

Implement src/my_project/agent.py to add the agent's functionality:

import json
import random
from typing import Any, AsyncIterable, Dict, Optional
from google.adk.agents.llm_agent import LlmAgent
from google.adk.tools.tool_context import ToolContext
from google.adk.artifacts import InMemoryArtifactService
from google.adk.memory.in_memory_memory_service import InMemoryMemoryService
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.genai import types
from my_project.qwen import llm

# Local cache of created request_ids for demo purposes.
request_ids = set()

def create_request_form(date: Optional[str] = None, amount: Optional[str] = None, purpose: Optional[str] = None) -> dict[str, Any]:
  """
   Create a request form for the employee to fill out.
   
   Args:
       date (str): The date of the request. Can be an empty string.
       amount (str): The requested amount. Can be an empty string.
       purpose (str): The purpose of the request. Can be an empty string.
       
   Returns:
       dict[str, Any]: A dictionary containing the request form data.
   """
  request_id = "request_id_" + str(random.randint(1000000, 9999999))
  request_ids.add(request_id)
  return {
      "request_id": request_id,
      "date": "<transaction date>" if not date else date,
      "amount": "<transaction dollar amount>" if not amount else amount,
      "purpose": "<business justification/purpose of the transaction>" if not purpose else purpose,
  }

def return_form(
    form_request: dict[str, Any],    
    tool_context: ToolContext,
    instructions: Optional[str] = None) -> dict[str, Any]:
  """
   Returns a structured json object indicating a form to complete.
   
   Args:
       form_request (dict[str, Any]): The request form data.
       tool_context (ToolContext): The context in which the tool operates.
       instructions (str): Instructions for processing the form. Can be an empty string.       
       
   Returns:
       dict[str, Any]: A JSON dictionary for the form response.
   """  
  if isinstance(form_request, str):
    form_request = json.loads(form_request)

  tool_context.actions.skip_summarization = True
  tool_context.actions.escalate = True
  form_dict = {
      'type': 'form',
      'form': {
        'type': 'object',
        'properties': {
            'date': {
                'type': 'string',
                'format': 'date',
                'description': 'Date of expense',
                'title': 'Date',
            },
            'amount': {
                'type': 'string',
                'format': 'number',
                'description': 'Amount of expense',
                'title': 'Amount',
            },
            'purpose': {
                'type': 'string',
                'description': 'Purpose of expense',
                'title': 'Purpose',
            },
            'request_id': {
                'type': 'string',
                'description': 'Request id',
                'title': 'Request ID',
            },
        },
        'required': list(form_request.keys()),
      },
      'form_data': form_request,
      'instructions': instructions,
  }
  return json.dumps(form_dict)

def reimburse(request_id: str) -> dict[str, Any]:
  """Reimburse the amount of money to the employee for a given request_id."""
  if request_id not in request_ids:
    return {"request_id": request_id, "status": "Error: Invalid request_id."}
  return {"request_id": request_id, "status": "approved"}

class ReimbursementAgent:
  """An agent that handles reimbursement requests."""

  SUPPORTED_CONTENT_TYPES = ["text", "text/plain"]

  def __init__(self):
    self._agent = self._build_agent()
    self._user_id = "remote_agent"
    self._runner = Runner(
        app_name=self._agent.name,
        agent=self._agent,
        artifact_service=InMemoryArtifactService(),
        session_service=InMemorySessionService(),
        memory_service=InMemoryMemoryService(),
    )

  def invoke(self, query, session_id) -> str:
    session = self._runner.session_service.get_session(
        app_name=self._agent.name, user_id=self._user_id, session_id=session_id
    )
    content = types.Content(
        role="user", parts=[types.Part.from_text(text=query)]
    )
    if session is None:
      session = self._runner.session_service.create_session(
          app_name=self._agent.name,
          user_id=self._user_id,
          state={},
          session_id=session_id,
      )
    events = list(self._runner.run(
        user_id=self._user_id, session_id=session.id, new_message=content
    ))
    if not events or not events[-1].content or not events[-1].content.parts:
      return ""
    return "\n".join([p.text for p in events[-1].content.parts if p.text])

  async def stream(self, query, session_id) -> AsyncIterable[Dict[str, Any]]:
    session = self._runner.session_service.get_session(
        app_name=self._agent.name, user_id=self._user_id, session_id=session_id
    )
    content = types.Content(
        role="user", parts=[types.Part.from_text(text=query)]
    )
    if session is None:
      session = self._runner.session_service.create_session(
          app_name=self._agent.name,
          user_id=self._user_id,
          state={},
          session_id=session_id,
      )
    async for event in self._runner.run_async(
        user_id=self._user_id, session_id=session.id, new_message=content
    ):
      if event.is_final_response():
        response = ""
        if (
            event.content
            and event.content.parts
            and event.content.parts[0].text
        ):
          response = "\n".join([p.text for p in event.content.parts if p.text])
        elif (
            event.content
            and event.content.parts
            and any([True for p in event.content.parts if p.function_response])):
          response = next((p.function_response.model_dump() for p in event.content.parts))
        yield {
            "is_task_complete": True,
            "content": response,
        }
      else:
        yield {
            "is_task_complete": False,
            "updates": "Processing the reimbursement request...",
        }

  def _build_agent(self) -> LlmAgent:
    """Builds the LLM agent for the reimbursement agent."""
    return LlmAgent(
        model=llm,
        name="reimbursement_agent",
        description=(
            "This agent handles the reimbursement process for the employees"
            " given the amount and purpose of the reimbursement."
        ),
        instruction="""
    You are an agent who handles the reimbursement process for employees.

    When you receive a reimbursement request, you should first create a new request form using create_request_form(). Only provide default values if they are provided by the user, otherwise use an empty string as the default value.
      1. 'Date': the date of the transaction.
      2. 'Amount': the dollar amount of the transaction.
      3. 'Business Justification/Purpose': the reason for the reimbursement.

    Once you created the form, you should return the result of calling return_form with the form data from the create_request_form call.

    Once you received the filled-out form back from the user, you should then check the form contains all required information:
      1. 'Date': the date of the transaction.
      2. 'Amount': the value of the amount of the reimbursement being requested.
      3. 'Business Justification/Purpose': the item/object/artifact of the reimbursement.

    If you don't have all of the information, you should reject the request directly by calling the request_form method, providing the missing fields.


    For valid reimbursement requests, you can then use reimburse() to reimburse the employee.
      * In your response, you should include the request_id and the status of the reimbursement request.

    """,
        tools=[
            create_request_form,
            reimburse,
            return_form,
        ],
    )
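
In the agent above, the check for missing form information is left to the model via the instruction text. For comparison, a deterministic version of that check might look like the sketch below. The helper missing_form_fields is hypothetical and not part of the original code; it assumes unfilled fields are either empty or still hold the angle-bracket placeholders produced by create_request_form.

```python
REQUIRED_FORM_FIELDS = ("date", "amount", "purpose")


def missing_form_fields(form: dict) -> list[str]:
    """Return required fields that are empty or still hold a placeholder."""
    missing = []
    for field in REQUIRED_FORM_FIELDS:
        value = str(form.get(field, "")).strip()
        # Placeholders look like "<transaction date>" in create_request_form.
        is_placeholder = value.startswith("<") and value.endswith(">")
        if not value or is_placeholder:
            missing.append(field)
    return missing


draft = {
    "request_id": "request_id_1234567",
    "date": "<transaction date>",
    "amount": "20",
    "purpose": "",
}
print(missing_form_fields(draft))
```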

Main function

Implement src/my_project/__init__.py with a main function that starts the service:

import logging
import click
from dotenv import load_dotenv
import google_a2a
from google_a2a.common.types import AgentSkill,AgentCapabilities, AgentCard
from google_a2a.common.server import A2AServer
from my_project.task_manager import MyAgentTaskManager
from my_project.agent import ReimbursementAgent

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@click.command()
@click.option("--host", default="localhost")
@click.option("--port", default=10002)
def main(host, port):
    skill = AgentSkill(
        id="my-project-echo-skill",
        name="Echo Tool",
        description="Echos the input given",
        tags=["echo", "repeater"],
        examples=["I will see this echoed back to me"],
        inputModes=["text"],
        outputModes=["text"],
    )
    logging.info(skill)
    # Add the agent capabilities and agent card
    capabilities = AgentCapabilities(
        streaming=False
    )
    agent_card = AgentCard(
        name="Echo Agent",
        description="This agent echos the input given",
        url=f"http://{host}:{port}/",
        version="0.1.0",
        defaultInputModes=["text"],
        defaultOutputModes=["text"],
        capabilities=capabilities,
        skills=[skill]
    )
    logging.info(agent_card)
    task_manager = MyAgentTaskManager(agent=ReimbursementAgent())  # create the task manager instance
    server = A2AServer(
        agent_card=agent_card,
        task_manager=task_manager,
        host=host,
        port=port,
    )
    server.start()  # start the server
    
if __name__ == "__main__":
    main()

Using the LLM

I use Qwen.

The src/my_project/qwen.py file:

from google.adk.models.lite_llm import LiteLlm

llm = LiteLlm(
    model="openai/qwen-max-2025-01-25",
    api_key='your-alibaba-cloud-api-key',
    base_url='https://dashscope.aliyuncs.com/compatible-mode/v1',
    temperature=0.3,
)
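
Rather than hard-coding the key in the source file, it could be read from the environment. The sketch below is one way to do that. The helper name load_api_key is hypothetical and the variable name DASHSCOPE_API_KEY is an assumption, so use whatever name suits your setup.

```python
import os


def load_api_key(env_var: str = "DASHSCOPE_API_KEY") -> str:
    """Read the API key from an environment variable, failing loudly if unset."""
    key = os.environ.get(env_var, "").strip()
    if not key:
        raise RuntimeError(
            f"Set the {env_var} environment variable before starting the service"
        )
    return key


# Possible usage in qwen.py (not executed here):
#   llm = LiteLlm(model="openai/qwen-max-2025-01-25", api_key=load_api_key(), ...)
```

This keeps the key out of version control and makes a missing key fail at startup rather than on the first request.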

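Testing

Start the service:

uv run my-project

The service is now up and running.

Then start the client that ships with Google's A2A samples:

uv run google-a2a-cli --agent http://localhost:10002

After entering "你好" (hello):

Server-side log output: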
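
For reference, the request the client sends can be approximated as a JSON-RPC payload whose shape mirrors the fields the task manager reads (request.id, params.id, params.sessionId, and params.message.parts[0].text). This is a sketch only: the helper build_send_task_request is hypothetical, and the method name "tasks/send" is an assumption about the protocol version used by this sample.

```python
import json
import uuid


def build_send_task_request(text: str, session_id: str) -> dict:
    """Build a JSON-RPC payload shaped like the fields the task manager reads."""
    return {
        "jsonrpc": "2.0",
        "id": uuid.uuid4().hex,  # request ID, read as request.id
        "method": "tasks/send",  # assumed method name for non-streaming sends
        "params": {
            "id": uuid.uuid4().hex,  # task ID, read as request.params.id
            "sessionId": session_id,
            "message": {
                "role": "user",
                "parts": [{"type": "text", "text": text}],
            },
        },
    }


payload = build_send_task_request("你好", session_id="demo-session")
print(json.dumps(payload, ensure_ascii=False, indent=2))
```

A payload like this could be posted to the server URL with any HTTP client, although the bundled CLI already handles that for you.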

 

