对于分布式智能体部署中的动态记忆库而言,OpenSearch 是一个绝佳的选择;在许多用例中,它甚至比 DynamoDB/Redis 更合适。下面说明原因及实现方法:
为何 OpenSearch 更适合作为智能体记忆库
语义记忆搜索
与 DynamoDB 的键值查找不同,OpenSearch 支持语义记忆检索:
import json
import os
import time
from urllib.parse import urlparse

import boto3
from aws_requests_auth.aws_auth import AWSRequestsAuth  # noqa: F401  (no longer used for signing; kept for existing importers)
from opensearchpy import AWSV4SignerAuth, OpenSearch, RequestError, RequestsHttpConnection
from strands import Agent, tool


class OpenSearchMemoryBank:
    """Shared agent memory stored in an OpenSearch index with a k-NN vector field.

    Every memory is embedded with Amazon Bedrock (Titan text embeddings) so it
    can be retrieved by semantic similarity, by keywords, or by both.
    """

    def __init__(self, endpoint: str, region: str = 'us-east-1', service: str = 'es'):
        """Connect to OpenSearch and ensure the shared memory index exists.

        Args:
            endpoint: Cluster URL such as ``https://search-xyz.us-east-1.es.amazonaws.com``
                or ``http://opensearch:9200``. A bare hostname is treated as HTTPS.
            region: AWS region used for SigV4 request signing.
            service: SigV4 service name: ``'es'`` for Amazon OpenSearch Service
                domains, ``'aoss'`` for OpenSearch Serverless collections.

        Raises:
            ValueError: If ``endpoint`` is empty or None.
        """
        if not endpoint:
            raise ValueError("OpenSearch endpoint is required (set OPENSEARCH_ENDPOINT)")

        # Parse the URL instead of stripping "https://" and hard-coding port 443,
        # which turned "http://opensearch:9200" into an unusable host.
        parsed = urlparse(endpoint if "://" in endpoint else f"https://{endpoint}")
        use_ssl = parsed.scheme == "https"
        port = parsed.port or (443 if use_ssl else 80)

        # Only HTTPS endpoints are SigV4-signed; a plain-HTTP endpoint (e.g. a local
        # container with the security plugin disabled) is used without auth.
        http_auth = None
        if use_ssl:
            credentials = boto3.Session().get_credentials()
            # AWSV4SignerAuth takes (credentials, region, service); the previous
            # AWSRequestsAuth(credentials, region, 'es') call did not match the
            # constructor of that class.
            http_auth = AWSV4SignerAuth(credentials, region, service)

        self.client = OpenSearch(
            hosts=[{'host': parsed.hostname, 'port': port}],
            http_auth=http_auth,
            use_ssl=use_ssl,
            verify_certs=use_ssl,
            connection_class=RequestsHttpConnection,
        )
        self.memory_index = "agent-shared-memory"  # index shared by all agents
        self.create_memory_index()

    def create_memory_index(self):
        """Create the memory index (with a k-NN vector field) if it does not exist."""
        index_body = {
            "settings": {
                "index": {
                    "knn": True,
                    "knn.algo_param.ef_search": 100,
                }
            },
            "mappings": {
                "properties": {
                    "memory_key": {"type": "keyword"},
                    "content": {"type": "text"},
                    "description": {"type": "text"},
                    "agent_id": {"type": "keyword"},
                    # store_memory writes time.time() (epoch *seconds*); without an
                    # explicit format a numeric date is interpreted as epoch millis.
                    "timestamp": {
                        "type": "date",
                        "format": "strict_date_optional_time||epoch_second",
                    },
                    "tags": {"type": "keyword"},
                    "memory_vector": {
                        "type": "knn_vector",
                        # Output size of amazon.titan-embed-text-v1, the model used
                        # by generate_embedding.
                        "dimension": 1536,
                        "method": {
                            "name": "hnsw",
                            # nmslib's identifier for cosine similarity ("cosine" is not valid).
                            "space_type": "cosinesimil",
                            "engine": "nmslib",
                        },
                    },
                    "context": {"type": "object"},
                    "importance_score": {"type": "float"},
                }
            },
        }
        if self.client.indices.exists(index=self.memory_index):
            return
        try:
            self.client.indices.create(index=self.memory_index, body=index_body)
        except RequestError as err:
            # Several agents may start at once; losing the exists()/create() race is fine.
            if err.error != "resource_already_exists_exception":
                raise

    def store_memory(self, key: str, content: str, agent_id: str,
                     description: str = "", tags: list = None,
                     context: dict = None, importance: float = 1.0):
        """Index a memory together with its embedding.

        ``key`` is used as the document id, so storing the same key again
        overwrites the previous memory.
        """
        embedding = self.generate_embedding(f"{description} {content}")
        doc = {
            "memory_key": key,
            "content": content,
            "description": description,
            "agent_id": agent_id,
            "timestamp": time.time(),
            "tags": tags or [],
            "memory_vector": embedding,
            "context": context or {},
            "importance_score": importance,
        }
        self.client.index(index=self.memory_index, id=key, body=doc)

    def semantic_search(self, query: str, k: int = 5, agent_filter: str = None):
        """Return up to ``k`` hits ranked by vector similarity to ``query``.

        Ties are broken by importance, then recency. If ``agent_filter`` is set,
        only that agent's memories are returned.
        """
        query_vector = self.generate_embedding(query)
        search_body = {
            "size": k,
            # Sorting on fields normally disables scoring and leaves "_score" null;
            # keep similarity as the primary key and always return scores.
            "track_scores": True,
            "query": {
                "bool": {
                    "must": [
                        {"knn": {"memory_vector": {"vector": query_vector, "k": k}}}
                    ]
                }
            },
            "sort": [
                {"_score": {"order": "desc"}},
                {"importance_score": {"order": "desc"}},
                {"timestamp": {"order": "desc"}},
            ],
        }
        if agent_filter:
            # NOTE: with the nmslib engine this filter is applied to the k nearest
            # neighbours, so fewer than k hits may be returned.
            search_body["query"]["bool"]["filter"] = [
                {"term": {"agent_id": agent_filter}}
            ]
        response = self.client.search(index=self.memory_index, body=search_body)
        return response['hits']['hits']

    def keyword_search(self, query: str, tags: list = None):
        """Full-text search over content/description, optionally restricted to ``tags``."""
        search_body = {
            "query": {
                "bool": {
                    "should": [
                        {"match": {"content": query}},
                        {"match": {"description": query}},
                    ],
                    # Once a filter clause is present, should-clauses default to
                    # optional; require at least one text match.
                    "minimum_should_match": 1,
                }
            }
        }
        if tags:
            search_body["query"]["bool"]["filter"] = [{"terms": {"tags": tags}}]
        response = self.client.search(index=self.memory_index, body=search_body)
        return response['hits']['hits']

    def hybrid_search(self, query: str, k: int = 5):
        """Combine k-NN similarity (boost 2.0) with keyword relevance (boost 1.0)."""
        query_vector = self.generate_embedding(query)
        search_body = {
            "size": k,
            "query": {
                "bool": {
                    "should": [
                        {
                            "knn": {
                                "memory_vector": {
                                    "vector": query_vector,
                                    "k": k,
                                    "boost": 2.0,
                                }
                            }
                        },
                        {
                            "multi_match": {
                                "query": query,
                                "fields": ["content", "description"],
                                "boost": 1.0,
                            }
                        },
                    ]
                }
            },
        }
        response = self.client.search(index=self.memory_index, body=search_body)
        return response['hits']['hits']

    def generate_embedding(self, text: str):
        """Embed ``text`` with Amazon Titan text embeddings via Bedrock; returns a list of floats."""
        # Reuse one Bedrock client per memory bank instead of creating one per call.
        bedrock = getattr(self, "_bedrock", None)
        if bedrock is None:
            bedrock = self._bedrock = boto3.client('bedrock-runtime')
        response = bedrock.invoke_model(
            modelId='amazon.titan-embed-text-v1',
            body=json.dumps({"inputText": text}),
            contentType='application/json',
            accept='application/json',
        )
        result = json.loads(response['body'].read())
        return result['embedding']


# Module-level memory bank used by the agent tools.
opensearch_memory = OpenSearchMemoryBank(
    endpoint=os.getenv('OPENSEARCH_ENDPOINT'),
    region=os.getenv('AWS_REGION', 'us-east-1'),
    # Set to 'aoss' when OPENSEARCH_ENDPOINT is an OpenSearch Serverless collection.
    service=os.getenv('OPENSEARCH_SERVICE', 'es'),
)
用于智能体的高级记忆工具
import os

# Agent-facing memory tools. They all operate on the module-level
# ``opensearch_memory`` bank created next to OpenSearchMemoryBank.

_PREVIEW_CHARS = 200  # characters of content shown per hit by find_similar_memories


def _format_similar_hit(hit: dict) -> str:
    """Render one semantic-search hit as a short, human-readable entry."""
    source = hit['_source']
    content = source.get('content', '')
    # Only add an ellipsis when the content was actually truncated.
    preview = content[:_PREVIEW_CHARS] + ("..." if len(content) > _PREVIEW_CHARS else "")
    score = hit.get('_score')
    # OpenSearch returns "_score": null when hits are sorted by fields without
    # track_scores; formatting None with ":.2f" would raise TypeError.
    score_text = f"{score:.2f}" if score is not None else "N/A"
    return (
        f"键: {source.get('memory_key', hit.get('_id'))}\n"
        f"描述: {source.get('description', '')}\n"
        f"内容: {preview}\n"
        f"相关性: {score_text}\n"
        f"智能体: {source.get('agent_id', 'unknown')}\n---"
    )


@tool
def store_semantic_memory(key: str, content: str, description: str = "",
                          tags: str = "", importance: float = 1.0) -> str:
    """Store a memory that can later be found by semantic search.

    Args:
        key: Unique memory key; storing an existing key overwrites that memory.
        content: The memory text.
        description: Short summary, embedded together with the content.
        tags: Comma-separated tags, e.g. "project,decision".
        importance: Importance score stored with the memory.
    """
    # Drop blanks so inputs like "a,,b" or "a, " don't create empty tags.
    tag_list = [t.strip() for t in tags.split(",") if t.strip()] if tags else []
    opensearch_memory.store_memory(
        key=key,
        content=content,
        agent_id=os.getenv('AGENT_ID', 'unknown'),
        description=description,
        tags=tag_list,
        importance=importance,
    )
    return f"已存储语义记忆: {key}"


@tool
def find_similar_memories(query: str, limit: int = 5) -> str:
    """Find memories semantically similar to a query.

    Args:
        query: Natural-language description of what to look for.
        limit: Maximum number of memories to return.
    """
    results = opensearch_memory.semantic_search(query, k=limit)
    if not results:
        return "未找到相似记忆"
    return "\n".join(_format_similar_hit(hit) for hit in results)


@tool
def search_memory_by_context(query: str, context_type: str = "") -> str:
    """Search memories with hybrid (semantic + keyword) retrieval.

    Args:
        query: Natural-language search text.
        context_type: Optional tag; when set, only memories carrying this tag are returned.
    """
    max_results = 5
    # The tag filter is applied client-side, so fetch a larger candidate pool when
    # filtering; otherwise matching memories just outside the top 5 would be lost.
    pool_size = max_results * 4 if context_type else max_results
    memories = []
    for hit in opensearch_memory.hybrid_search(query, k=pool_size):
        source = hit['_source']
        tags = source.get('tags', [])
        if context_type and context_type not in tags:
            continue
        memories.append(
            f"📝 {source.get('description', '')}\n"
            f"💭 {source.get('content', '')}\n"
            f"🏷️ 标签: {', '.join(tags)}\n"
            f"🤖 智能体: {source.get('agent_id', 'unknown')}\n"
        )
        if len(memories) == max_results:
            break
    return "\n\n".join(memories) if memories else "未找到相关记忆"


@tool
def update_memory_importance(key: str, new_importance: float) -> str:
    """Update the importance score of an existing memory.

    Args:
        key: Key of the memory to update.
        new_importance: New importance score.
    """
    # Partial update: only importance_score changes; raises if the key does not exist.
    opensearch_memory.client.update(
        index=opensearch_memory.memory_index,
        id=key,
        body={"doc": {"importance_score": new_importance}},
    )
    return f"已将 {key} 的重要性更新为 {new_importance}"
使用 OpenSearch 进行生产环境部署
python
# production_agent_with_opensearch.py
import asyncio
import logging
import os
import time

from opensearchpy import NotFoundError
from strands import Agent
from strands.multiagent.a2a import A2AServer
from strands.session.s3_session_manager import S3SessionManager

logger = logging.getLogger(__name__)


class ProductionAgentWithOpenSearch:
    """A2A-served agent with OpenSearch-backed shared memory and S3 session history."""

    def __init__(self, agent_id: str):
        self.agent_id = agent_id

        # Shared semantic memory in OpenSearch.
        self.memory_bank = OpenSearchMemoryBank(
            endpoint=os.getenv('OPENSEARCH_ENDPOINT'),
            region=os.getenv('AWS_REGION', 'us-east-1'),
        )

        # Per-agent conversation history in S3.
        self.session_manager = S3SessionManager(
            session_id=f"prod_{agent_id}",
            bucket=os.getenv('AGENT_MEMORY_BUCKET'),
            prefix=f"sessions/{agent_id}",
        )

        self.agent = Agent(
            name=f"生产环境智能体 {agent_id}",
            session_manager=self.session_manager,
            tools=[
                store_semantic_memory,
                find_similar_memories,
                search_memory_by_context,
                update_memory_importance,
                *self.get_specialized_tools(),
            ],
        )
        # Strong reference to the background task so it is not garbage-collected.
        self._maintenance_task = None

    def get_specialized_tools(self):
        """Override to contribute agent-specific tools; the base class adds none."""
        return []

    async def start_service(self):
        """Start the maintenance task, then serve the agent over A2A on $PORT (default 9000)."""
        port = int(os.getenv('PORT', 9000))
        server = A2AServer(agent=self.agent, port=port)

        self._maintenance_task = asyncio.create_task(self.memory_maintenance_task())
        # server.serve() is a synchronous call; calling it directly here would block
        # this event loop and the maintenance task would never get to run.
        await asyncio.to_thread(server.serve)

    async def memory_maintenance_task(self, interval_seconds: float = 3600):
        """Periodically clean up and re-score memories, every ``interval_seconds`` (default hourly)."""
        while True:
            try:
                await self.cleanup_old_memories()
                await self.update_memory_scores()
            except Exception:
                # Best-effort maintenance: log with traceback and retry next cycle.
                logger.exception("内存维护错误")
            # Sleep outside the try so a failing step cannot turn this into a busy loop.
            await asyncio.sleep(interval_seconds)

    async def update_memory_scores(self):
        """Hook for usage-based importance re-scoring; a no-op unless overridden."""
        return None

    async def cleanup_old_memories(self, max_age_days: float = 30,
                                   min_importance: float = 0.3,
                                   batch_size: int = 1000) -> int:
        """Delete memories older than ``max_age_days`` with importance below ``min_importance``.

        At most ``batch_size`` memories are removed per call. Returns the number
        of memories deleted.
        """
        # The OpenSearch client is synchronous; keep its I/O off the event loop.
        return await asyncio.to_thread(
            self._delete_stale_memories, max_age_days, min_importance, batch_size
        )

    def _delete_stale_memories(self, max_age_days: float, min_importance: float,
                               batch_size: int) -> int:
        """Blocking search-then-delete used by cleanup_old_memories."""
        # Same unit as store_memory's timestamp (epoch seconds).
        cutoff_time = time.time() - max_age_days * 24 * 3600
        search_body = {
            # Without an explicit size the search returns only 10 hits.
            "size": batch_size,
            "_source": False,
            "query": {
                "bool": {
                    "filter": [
                        {"range": {"timestamp": {"lt": cutoff_time}}},
                        {"range": {"importance_score": {"lt": min_importance}}},
                    ]
                }
            },
        }
        response = self.memory_bank.client.search(
            index=self.memory_bank.memory_index, body=search_body
        )
        deleted = 0
        for hit in response['hits']['hits']:
            try:
                self.memory_bank.client.delete(
                    index=self.memory_bank.memory_index, id=hit['_id']
                )
                deleted += 1
            except NotFoundError:
                # Another agent sharing the index already removed it.
                continue
        return deleted
多智能体记忆协作
import asyncio
import os
import time


class CollaborativeMemorySystem:
    """Copies relevant memories from one agent's namespace to other agents."""

    def __init__(self):
        self.opensearch = OpenSearchMemoryBank(
            endpoint=os.getenv('OPENSEARCH_ENDPOINT'),
            region=os.getenv('AWS_REGION', 'us-east-1'),
        )

    async def cross_agent_memory_sharing(self, source_agent: str, target_agents: list,
                                         memory_query: str) -> int:
        """Copy up to 10 of ``source_agent``'s memories matching ``memory_query`` to each target.

        Copies get the ``shared_memory`` tag and 80% of the original importance.
        Returns the number of memory copies written.
        """
        memories = self.opensearch.semantic_search(
            query=memory_query, agent_filter=source_agent, k=10
        )
        shared_at = int(time.time())
        copies = 0
        for memory in memories:
            source = memory['_source']
            tags = list(source.get('tags', []))
            if 'shared_memory' not in tags:  # avoid duplicate tag when re-sharing
                tags.append('shared_memory')
            for target_agent in target_agents:
                # store_memory uses the key as document id; the key must be unique per
                # target, otherwise each target overwrites the previous target's copy.
                shared_key = f"shared_{source['memory_key']}_{target_agent}_{shared_at}"
                self.opensearch.store_memory(
                    key=shared_key,
                    content=source['content'],
                    agent_id=target_agent,
                    description=f"从 {source_agent} 共享: {source.get('description', '')}",
                    tags=tags,
                    importance=source.get('importance_score', 1.0) * 0.8,
                )
                copies += 1
        return copies


class CollaborativeAgent(ProductionAgentWithOpenSearch):
    """Production agent that can push its memories to other agents."""

    def __init__(self, agent_id: str):
        super().__init__(agent_id)
        self.memory_system = CollaborativeMemorySystem()
        # Strong references to in-flight sharing tasks (asyncio keeps only weak ones).
        self._background_tasks = set()

    def get_specialized_tools(self):
        """Register the sharing tool; without this it was never passed to Agent(tools=...)."""
        # Called from the base __init__; the tool only touches memory_system when invoked.
        return [*super().get_specialized_tools(), self.share_memories_with_agents]

    @tool
    def share_memories_with_agents(self, topic: str, target_agents: str) -> str:
        """Share this agent's memories about a topic with other agents.

        Args:
            topic: Topic used to select relevant memories.
            target_agents: Comma-separated agent IDs, e.g. "agent_a,agent_b".
        """
        agent_list = [a.strip() for a in target_agents.split(",") if a.strip()]
        if not agent_list:
            return "未指定目标智能体"
        coro = self.memory_system.cross_agent_memory_sharing(
            source_agent=self.agent_id,
            target_agents=agent_list,
            memory_query=topic,
        )
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            # Tool invoked outside an event loop (e.g. in a worker thread): run inline.
            asyncio.run(coro)
        else:
            task = loop.create_task(coro)
            self._background_tasks.add(task)
            task.add_done_callback(self._background_tasks.discard)
        return f"正在与 {len(agent_list)} 个智能体共享关于 '{topic}' 的记忆"
用于自动扩展的 OpenSearch Serverless
# terraform/opensearch.tf
#
# NOTE: clients must sign requests with SigV4 service "aoss" (not "es"), and the
# execution role also needs an IAM identity policy allowing aoss:APIAccessAll.

# OpenSearch Serverless refuses to create a collection unless an encryption
# policy matching its name already exists.
resource "aws_opensearchserverless_security_policy" "agent_memory_encryption" {
  name = "agent-memory-encryption"
  type = "encryption"
  policy = jsonencode({
    Rules = [
      {
        ResourceType = "collection"
        Resource     = ["collection/agent-shared-memory"]
      }
    ]
    AWSOwnedKey = true
  })
}

# Without a network policy the collection endpoint is unreachable.
# NOTE(review): public access (still IAM-authenticated) is used for simplicity;
# prefer VPC endpoints (SourceVPCEs) for production.
resource "aws_opensearchserverless_security_policy" "agent_memory_network" {
  name = "agent-memory-network"
  type = "network"
  policy = jsonencode([
    {
      Rules = [
        {
          ResourceType = "collection"
          Resource     = ["collection/agent-shared-memory"]
        }
      ]
      AllowFromPublic = true
    }
  ])
}

resource "aws_opensearchserverless_collection" "agent_memory" {
  name = "agent-shared-memory" # collection name
  type = "VECTORSEARCH"        # vector search collection

  tags = {
    Environment = "production"
    Purpose     = "agent-memory-bank"
  }

  depends_on = [
    aws_opensearchserverless_security_policy.agent_memory_encryption,
    aws_opensearchserverless_security_policy.agent_memory_network,
  ]
}

# Data access policy: lets the agent execution role manage the collection's indexes.
resource "aws_opensearchserverless_access_policy" "agent_memory_access" {
  name = "agent-memory-access"
  type = "data"
  policy = jsonencode([
    {
      Rules = [
        {
          ResourceType = "collection"
          Resource     = ["collection/agent-shared-memory"]
          Permission   = ["aoss:DescribeCollectionItems"]
        },
        {
          ResourceType = "index"
          Resource     = ["index/agent-shared-memory/*"]
          Permission   = ["aoss:*"]
        }
      ]
      Principal = [aws_iam_role.agent_execution_role.arn]
    }
  ])
}
完整的 Docker Compose 部署(仅用于本地开发:已禁用 OpenSearch 安全插件)
# docker-compose.yml — local development only (OpenSearch security plugin disabled).
# The top-level "version" key is obsolete in the Compose Specification; the
# depends_on "condition" form below requires Docker Compose v2.
# NOTE: the agents also need AWS credentials/region for Bedrock embeddings and the
# S3 session bucket (e.g. via an env_file) — not configured here.
services:
  opensearch:
    image: opensearchproject/opensearch:2.11.0
    environment:
      - discovery.type=single-node
      - plugins.security.disabled=true # development only
      - "OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m"
    ports:
      - "9200:9200"
    volumes:
      - opensearch_data:/usr/share/opensearch/data
    healthcheck:
      # Agents create the memory index at startup, so they must wait until the API answers.
      test: ["CMD-SHELL", "curl -fs http://localhost:9200/_cluster/health || exit 1"]
      interval: 10s
      timeout: 5s
      retries: 12
      start_period: 30s

  orchestrator:
    build: ./orchestrator
    environment:
      - AGENT_ID=orchestrator
      - PORT=9000 # must match the container side of the port mapping
      - OPENSEARCH_ENDPOINT=http://opensearch:9200
      - AGENT_MEMORY_BUCKET=shared-agent-memory
    ports:
      - "9000:9000"
    depends_on:
      opensearch:
        condition: service_healthy

  data-specialist:
    build: ./data-specialist
    environment:
      - AGENT_ID=data_specialist
      - PORT=9001 # the agent defaults to 9000, which would not match "9001:9001"
      - OPENSEARCH_ENDPOINT=http://opensearch:9200
      - AGENT_MEMORY_BUCKET=shared-agent-memory
    ports:
      - "9001:9001"
    depends_on:
      opensearch:
        condition: service_healthy

volumes:
  opensearch_data:
OpenSearch 作为智能体记忆库的主要优势
语义搜索 (Semantic Search): 按含义而非仅关键字查找记忆
向量存储 (Vector Storage): 原生支持嵌入向量和相似性搜索
混合搜索 (Hybrid Search): 结合语义和传统搜索
可扩展性 (Scalability): 可处理数十亿条记忆条目
实时性 (Real-time): 即时索引和搜索
分析能力 (Analytics): 内置仪表板和监控
成本效益 (Cost-Effective): 提供按 OCU(OpenSearch 计算单元)用量计费的无服务器选项(注意:Serverless 存在最低 OCU 容量费用,负载很低时未必比 DynamoDB/Redis 便宜)
AWS 集成 (AWS Integration): 与 Bedrock、Lambda 等原生集成
对于分布式智能体部署中的动态记忆库,尤其是当您需要智能记忆检索和语义理解时,OpenSearch 通常是比 DynamoDB/Redis 等键值存储更合适的选择。
资料来源
Vector search - Amazon OpenSearch Service (向量搜索 - Amazon OpenSearch Service)
Unlock the power of Amazon OpenSearch Service: Your learning guide for search, analytics, and generative AI solutions | AWS Training and Certification Blog (释放 Amazon OpenSearch Service 的力量:您的搜索、分析和生成式 AI 解决方案学习指南 | AWS 培训与认证博客)
Vector Embeddings for Search, RAG, Chatbots, Agents, and Generative AI - Vector Database for Amazon OpenSearch Service - AWS (用于搜索、RAG、聊天机器人、智能体和生成式 AI 的向量嵌入 - Amazon OpenSearch Service 的向量数据库 - AWS)
Open-Source Search Engine - Amazon OpenSearch Service Managed Service - AWS (开源搜索引擎 - Amazon OpenSearch Service 托管服务 - AWS)
Meta-tools - AWS Prescriptive Guidance (元工具 - AWS 规范性指南)