The Best Choice for a Distributed A2A Strands Agents SDK Architecture: Sharing Agent Memory with OpenSearch

Published: 2025-09-12

For a dynamic memory bank in a distributed agent deployment, OpenSearch is an excellent choice, and for many use cases it outperforms DynamoDB or Redis. Here is why, and how to implement it:

Why OpenSearch Works Better as an Agent Memory Bank

  1. Semantic memory search
    Unlike DynamoDB's key-value lookups, OpenSearch supports semantic memory retrieval:

import os
import json
import time

import boto3
from opensearchpy import OpenSearch, RequestsHttpConnection, AWSV4SignerAuth
from strands import Agent, tool

class OpenSearchMemoryBank:
    def __init__(self, endpoint: str, region: str = 'us-east-1'):
        # SigV4 authentication for the Amazon OpenSearch Service domain
        credentials = boto3.Session().get_credentials()
        awsauth = AWSV4SignerAuth(credentials, region, 'es')
        
        self.client = OpenSearch(
            hosts=[{'host': endpoint.replace('https://', ''), 'port': 443}],
            http_auth=awsauth,
            use_ssl=True,
            verify_certs=True,
            connection_class=RequestsHttpConnection
        )
        
        self.memory_index = "agent-shared-memory"  # index shared by all agents
        self.create_memory_index()
    
    def create_memory_index(self):
        """Create the index with a vector field for semantic search"""
        index_body = {
            "settings": {
                "index": {
                    "knn": True,
                    "knn.algo_param.ef_search": 100
                }
            },
            "mappings": {
                "properties": {
                    "memory_key": {"type": "keyword"},
                    "content": {"type": "text"},
                    "description": {"type": "text"},
                    "agent_id": {"type": "keyword"},
                    "timestamp": {"type": "date"},
                    "tags": {"type": "keyword"},
                    "memory_vector": {
                        "type": "knn_vector",
                        "dimension": 1536,  # Titan Text Embeddings v1 output dimension
                        "method": {
                            "name": "hnsw",
                            "space_type": "cosinesimil",  # cosine similarity
                            "engine": "nmslib"
                        }
                    },
                    "context": {"type": "object"},
                    "importance_score": {"type": "float"}
                }
            }
        }
        
        if not self.client.indices.exists(index=self.memory_index):
            self.client.indices.create(index=self.memory_index, body=index_body)
    
    def store_memory(self, key: str, content: str, agent_id: str, 
                    description: str = "", tags: list = None, 
                    context: dict = None, importance: float = 1.0):
        """Store a memory together with its semantic vector"""
        # Generate an embedding for semantic search
        embedding = self.generate_embedding(f"{description} {content}")
        
        doc = {
            "memory_key": key,
            "content": content,
            "description": description,
            "agent_id": agent_id,
            "timestamp": int(time.time() * 1000),  # epoch milliseconds for the date field
            "tags": tags or [],
            "memory_vector": embedding,
            "context": context or {},
            "importance_score": importance
        }
        
        self.client.index(
            index=self.memory_index,
            id=key,
            body=doc
        )
    
    def semantic_search(self, query: str, k: int = 5, agent_filter: str = None):
        """Search memories by semantic similarity"""
        query_vector = self.generate_embedding(query)
        
        search_body = {
            "size": k,
            "query": {
                "bool": {
                    "must": [
                        {
                            "knn": {
                                "memory_vector": {
                                    "vector": query_vector,
                                    "k": k
                                }
                            }
                        }
                    ]
                }
            },
            # Keep relevance scores even though results are re-ranked below
            "track_scores": True,
            # Re-rank hits by importance and recency rather than raw similarity
            "sort": [
                {"importance_score": {"order": "desc"}},
                {"timestamp": {"order": "desc"}}
            ]
        }
        
        # Restrict to a single agent's memories when a filter is given
        if agent_filter:
            search_body["query"]["bool"]["filter"] = [
                {"term": {"agent_id": agent_filter}}
            ]
        
        response = self.client.search(index=self.memory_index, body=search_body)
        return response['hits']['hits']
    
    def keyword_search(self, query: str, tags: list = None):
        """Traditional keyword search"""
        search_body = {
            "query": {
                "bool": {
                    "should": [
                        {"match": {"content": query}},
                        {"match": {"description": query}}
                    ]
                }
            }
        }
        
        if tags:
            search_body["query"]["bool"]["filter"] = [
                {"terms": {"tags": tags}}
            ]
        
        response = self.client.search(index=self.memory_index, body=search_body)
        return response['hits']['hits']
    
    def hybrid_search(self, query: str, k: int = 5):
        """Combine semantic and keyword search"""
        query_vector = self.generate_embedding(query)
        
        search_body = {
            "size": k,
            "query": {
                "bool": {
                    "should": [
                        {
                            "knn": {
                                "memory_vector": {
                                    "vector": query_vector,
                                    "k": k,
                                    "boost": 2.0  # weight semantic matches higher
                                }
                            }
                        },
                        {
                            "multi_match": {
                                "query": query,
                                "fields": ["content", "description"],
                                "boost": 1.0
                            }
                        }
                    ]
                }
            }
        }
        
        response = self.client.search(index=self.memory_index, body=search_body)
        return response['hits']['hits']
    
    def generate_embedding(self, text: str):
        """Generate an embedding vector with Amazon Bedrock"""
        bedrock = boto3.client('bedrock-runtime')
        
        response = bedrock.invoke_model(
            modelId='amazon.titan-embed-text-v1',
            body=json.dumps({"inputText": text})
        )
        
        result = json.loads(response['body'].read())
        return result['embedding']

# Initialize the OpenSearch memory bank
opensearch_memory = OpenSearchMemoryBank(
    endpoint=os.getenv('OPENSEARCH_ENDPOINT'),     # domain endpoint from the environment
    region=os.getenv('AWS_REGION', 'us-east-1')    # region from the environment, default us-east-1
)
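
As a quick illustration of how this class is used directly, the sketch below stores one memory and retrieves it by meaning. The key, texts, and tags are made up for the example; only the method signatures come from the class above.

# Hypothetical usage sketch: store one memory and retrieve it semantically
opensearch_memory.store_memory(
    key="customer_churn_analysis_2025",   # example key, not from the article
    content="Churn spiked after the June pricing change; annual plans were unaffected.",
    agent_id="data_specialist",
    description="Findings from the Q2 churn analysis",
    tags=["churn", "pricing"],
    importance=0.9
)

# A semantically related query can surface the memory even without shared keywords
hits = opensearch_memory.semantic_search("why are subscribers cancelling?", k=3)
for hit in hits:
    print(hit['_source']['memory_key'], hit['_score'])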
  2. Advanced memory tools for agents

@tool
def store_semantic_memory(key: str, content: str, description: str = "", 
                         tags: str = "", importance: float = 1.0) -> str:
    """Store a memory that can later be found via semantic search"""
    tag_list = [t.strip() for t in tags.split(",")] if tags else []
    
    opensearch_memory.store_memory(
        key=key,
        content=content,
        agent_id=os.getenv('AGENT_ID', 'unknown'),  # agent ID from the environment, default 'unknown'
        description=description,
        tags=tag_list,
        importance=importance
    )
    return f"Stored semantic memory: {key}"

@tool
def find_similar_memories(query: str, limit: int = 5) -> str:
    """Find memories similar to the query using semantic search"""
    results = opensearch_memory.semantic_search(query, k=limit)
    
    if not results:
        return "No similar memories found"
    
    memories = []
    for hit in results:
        source = hit['_source']
        score = hit['_score']
        memories.append(
            f"Key: {source['memory_key']}\n"
            f"Description: {source['description']}\n"
            f"Content: {source['content'][:200]}...\n"  # first 200 characters
            f"Relevance: {score:.2f}\n"
            f"Agent: {source['agent_id']}\n---"
        )
    
    return "\n".join(memories)

@tool
def search_memory_by_context(query: str, context_type: str = "") -> str:
    """Search memories with hybrid (semantic + keyword) search"""
    results = opensearch_memory.hybrid_search(query, k=5)
    
    memories = []
    for hit in results:
        source = hit['_source']
        if context_type and context_type not in source.get('tags', []):
            continue
            
        memories.append(
            f"📝 {source['description']}\n"
            f"💭 {source['content']}\n"
            f"🏷️ Tags: {', '.join(source.get('tags', []))}\n"
            f"🤖 Agent: {source['agent_id']}\n"
        )
    
    return "\n\n".join(memories) if memories else "No relevant memories found"

@tool
def update_memory_importance(key: str, new_importance: float) -> str:
    """Update a memory's importance score"""
    opensearch_memory.client.update(
        index=opensearch_memory.memory_index,
        id=key,
        body={"doc": {"importance_score": new_importance}}
    )
    return f"Updated importance of {key} to {new_importance}"
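
Before wiring these tools into the full production setup below, they can be attached to a plain Strands agent for a quick smoke test. This is a minimal sketch: the agent name and prompt are illustrative, and it assumes the default model configuration for Agent.

# Minimal sketch: give a Strands agent the OpenSearch memory tools and exercise them
memory_test_agent = Agent(
    name="memory-smoke-test",
    tools=[store_semantic_memory, find_similar_memories, search_memory_by_context]
)

# The agent decides when to call the memory tools based on the prompt
memory_test_agent(
    "Store a memory that the nightly ETL job now finishes around 02:30 UTC, "
    "then search for anything we know about batch job schedules."
)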
  3. Production deployment with OpenSearch

# production_agent_with_opensearch.py
import os
import time
import asyncio
from strands import Agent
from strands.multiagent.a2a import A2AServer
from strands.session.s3_session_manager import S3SessionManager

# OpenSearchMemoryBank and the memory tools are defined in the earlier snippets
# and assumed to be importable here

class ProductionAgentWithOpenSearch:
    def __init__(self, agent_id: str):
        self.agent_id = agent_id
        
        # OpenSearch memory bank
        self.memory_bank = OpenSearchMemoryBank(
            endpoint=os.getenv('OPENSEARCH_ENDPOINT'),
            region=os.getenv('AWS_REGION', 'us-east-1')
        )
        
        # S3 session manager for conversation history
        self.session_manager = S3SessionManager(
            session_id=f"prod_{agent_id}",
            bucket=os.getenv('AGENT_MEMORY_BUCKET'),  # agent memory bucket
            prefix=f"sessions/{agent_id}"             # per-agent prefix
        )
        
        # Create the agent with the OpenSearch memory tools
        self.agent = Agent(
            name=f"Production Agent {agent_id}",
            session_manager=self.session_manager,
            tools=[
                store_semantic_memory,
                find_similar_memories,
                search_memory_by_context,
                update_memory_importance,
                *self.get_specialized_tools()  # agent-specific tools
            ]
        )
    
    def get_specialized_tools(self):
        """Override to provide agent-specific tools"""
        return []
    
    async def start_service(self):
        """Start the A2A service together with memory maintenance"""
        port = int(os.getenv('PORT', 9000))  # port, default 9000
        server = A2AServer(agent=self.agent, port=port)
        
        # Start the background memory-maintenance task
        asyncio.create_task(self.memory_maintenance_task())
        
        server.serve()
    
    async def memory_maintenance_task(self):
        """Background task for memory management"""
        while True:
            try:
                # Remove old, low-importance memories
                await self.cleanup_old_memories()
                
                # Update memory importance based on usage
                await self.update_memory_scores()
                
                await asyncio.sleep(3600)  # run hourly
            except Exception as e:
                print(f"Memory maintenance error: {e}")
                await asyncio.sleep(60)  # back off instead of spinning on repeated errors
    
    async def update_memory_scores(self):
        """Placeholder: recompute importance scores from usage signals"""
        pass
    
    async def cleanup_old_memories(self):
        """Delete old, unimportant memories"""
        cutoff_time = int((time.time() - 30 * 24 * 3600) * 1000)  # 30 days ago, epoch millis
        
        search_body = {
            "query": {
                "bool": {
                    "must": [
                        {"range": {"timestamp": {"lt": cutoff_time}}},   # older than the cutoff
                        {"range": {"importance_score": {"lt": 0.3}}}     # importance below 0.3
                    ]
                }
            }
        }
        
        response = self.memory_bank.client.search(
            index=self.memory_bank.memory_index,
            body=search_body
        )
        
        for hit in response['hits']['hits']:
            self.memory_bank.client.delete(
                index=self.memory_bank.memory_index,
                id=hit['_id']
            )
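
A minimal entry point for running one of these agents inside a container might look like the following sketch; the default agent ID is illustrative and not part of the original code.

# Hypothetical entry point for a single containerized agent
if __name__ == "__main__":
    agent = ProductionAgentWithOpenSearch(os.getenv('AGENT_ID', 'data_specialist'))
    asyncio.run(agent.start_service())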
  4. Multi-agent memory collaboration

class CollaborativeMemorySystem:
    def __init__(self):
        self.opensearch = OpenSearchMemoryBank(
            endpoint=os.getenv('OPENSEARCH_ENDPOINT')
        )
    
    async def cross_agent_memory_sharing(self, source_agent: str, 
                                       target_agents: list, 
                                       memory_query: str):
        """Share relevant memories between agents"""
        
        # Find relevant memories from the source agent
        memories = self.opensearch.semantic_search(
            query=memory_query,
            agent_filter=source_agent,
            k=10
        )
        
        # Create shared memory entries for the target agents
        for memory in memories:
            source = memory['_source']
            
            for target_agent in target_agents:
                # Include the target agent in the key so copies do not overwrite each other
                shared_key = f"shared_{source['memory_key']}_{target_agent}_{int(time.time())}"
                self.opensearch.store_memory(
                    key=shared_key,
                    content=source['content'],
                    agent_id=target_agent,
                    description=f"Shared from {source_agent}: {source['description']}",
                    tags=source['tags'] + ['shared_memory'],     # tag as shared memory
                    importance=source['importance_score'] * 0.8  # slightly lower importance
                )

# Enhanced agent with collaborative memory
class CollaborativeAgent(ProductionAgentWithOpenSearch):
    def __init__(self, agent_id: str):
        super().__init__(agent_id)
        self.memory_system = CollaborativeMemorySystem()
    
    @tool
    def share_memories_with_agents(self, topic: str, target_agents: str) -> str:
        """Share relevant memories about a topic with other agents"""
        agent_list = [a.strip() for a in target_agents.split(",")]  # target agent IDs
        
        asyncio.create_task(
            self.memory_system.cross_agent_memory_sharing(
                source_agent=self.agent_id,
                target_agents=agent_list,
                memory_query=topic
            )
        )
        
        return f"Sharing memories about '{topic}' with {len(agent_list)} agents"
  5. OpenSearch Serverless for auto-scaling

# terraform/opensearch.tf
resource "aws_opensearchserverless_collection" "agent_memory" {
  name = "agent-shared-memory"
  type = "VECTORSEARCH" # vector search collection
  
  tags = {
    Environment = "production"
    Purpose     = "agent-memory-bank"
  }
}

resource "aws_opensearchserverless_access_policy" "agent_memory_access" {
  name = "agent-memory-access"
  type = "data" # data access policy
  
  policy = jsonencode([
    {
      Rules = [
        {
          ResourceType = "index"
          Resource     = ["index/agent-shared-memory/*"]
          Permission   = ["aoss:*"]
        }
      ]
      Principal = [aws_iam_role.agent_execution_role.arn] # agent execution role ARN
    }
  ])
}
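
One detail not shown in the snippets above: OpenSearch Serverless endpoints are signed with the `aoss` service name rather than `es`, so the memory-bank client from earlier would need a small change along these lines (the collection host below is a placeholder):

# Sketch: constructing the client for an OpenSearch Serverless collection
import boto3
from opensearchpy import OpenSearch, RequestsHttpConnection, AWSV4SignerAuth

credentials = boto3.Session().get_credentials()
aoss_auth = AWSV4SignerAuth(credentials, 'us-east-1', 'aoss')  # 'aoss' instead of 'es'

serverless_client = OpenSearch(
    hosts=[{'host': 'your-collection-id.us-east-1.aoss.amazonaws.com', 'port': 443}],  # placeholder host
    http_auth=aoss_auth,
    use_ssl=True,
    verify_certs=True,
    connection_class=RequestsHttpConnection
)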
  6. Complete Docker deployment

# docker-compose.yml
version: '3.8'
services:
  opensearch:
    image: opensearchproject/opensearch:2.11.0
    environment:
      - discovery.type=single-node
      - plugins.security.disabled=true # development only; keep security enabled in production
      - "OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m"
    ports:
      - "9200:9200"
    volumes:
      - opensearch_data:/usr/share/opensearch/data

  orchestrator:
    build: ./orchestrator
    environment:
      - AGENT_ID=orchestrator
      - OPENSEARCH_ENDPOINT=http://opensearch:9200
      - AGENT_MEMORY_BUCKET=shared-agent-memory
    ports:
      - "9000:9000"
    depends_on:
      - opensearch

  data-specialist:
    build: ./data-specialist
    environment:
      - AGENT_ID=data_specialist
      - OPENSEARCH_ENDPOINT=http://opensearch:9200
      - AGENT_MEMORY_BUCKET=shared-agent-memory
    ports:
      - "9001:9001"
    depends_on:
      - opensearch

volumes:
  opensearch_data:
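
Note that the OpenSearchMemoryBank class above assumes a SigV4-signed HTTPS endpoint on port 443, while the local container exposes plain HTTP on port 9200 with the security plugin disabled. For local development, a sketch of an unauthenticated client might look like this (the endpoint parsing and environment check are assumptions, not part of the original code):

# Sketch: build a local, unauthenticated client when pointing at the dev container
import os
from opensearchpy import OpenSearch

endpoint = os.getenv('OPENSEARCH_ENDPOINT', 'http://opensearch:9200')

if endpoint.startswith('http://'):
    hostname, _, port = endpoint.replace('http://', '').partition(':')
    local_client = OpenSearch(
        hosts=[{'host': hostname, 'port': int(port or 9200)}],
        use_ssl=False,
        verify_certs=False
    )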

Key advantages of OpenSearch as an agent memory bank

  • Semantic search: find memories by meaning, not just by keyword

  • Vector storage: native support for embedding vectors and similarity search

  • Hybrid search: combine semantic and traditional keyword search

  • Scalability: scales to billions of memory entries

  • Real-time: near-instant indexing and search

  • Analytics: built-in dashboards and monitoring (see the aggregation sketch after this list)

  • Cost-effective: a pay-per-use serverless option is available

  • AWS integration: native integration with Bedrock, Lambda, and other AWS services
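
As a concrete example of the analytics point above, here is a hedged sketch of an aggregation over the memory index defined earlier, counting stored memories per agent and averaging importance. The field names come from the mapping above; the query itself is illustrative.

# Sketch: count memories per agent for a simple monitoring view
agg_body = {
    "size": 0,
    "aggs": {
        "memories_per_agent": {
            "terms": {"field": "agent_id", "size": 20}
        },
        "avg_importance": {
            "avg": {"field": "importance_score"}
        }
    }
}

response = opensearch_memory.client.search(
    index=opensearch_memory.memory_index,
    body=agg_body
)
for bucket in response['aggregations']['memories_per_agent']['buckets']:
    print(bucket['key'], bucket['doc_count'])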

For a dynamic memory bank in a distributed agent deployment, especially when you need intelligent memory retrieval and semantic understanding, OpenSearch is the stronger choice.

Sources

  • Vector search - Amazon OpenSearch Service

  • Unlock the power of Amazon OpenSearch Service: Your learning guide for search, analytics, and generative AI solutions | AWS Training and Certification Blog

  • Vector Embeddings for Search, RAG, Chatbots, Agents, and Generative AI - Vector Database for Amazon OpenSearch Service - AWS

  • Open-Source Search Engine - Amazon OpenSearch Service Managed Service - AWS

  • Meta-tools - AWS Prescriptive Guidance

