本文是《LangChain实战课》系列的第二十一篇,将带领您构建一个完整的自动化AI客服系统。通过结合对话记忆、工具调用和业务知识库,我们将创建一个能够处理复杂客户查询的智能客服解决方案。
前言
在现代商业环境中,客户服务是企业成功的关键因素之一。传统客服系统往往面临响应速度慢、人力成本高、服务时间有限等挑战。通过结合LangChain的先进技术,我们可以构建一个智能的自动化客服系统,提供24/7的高效服务,同时保持人性化的交互体验。
系统架构设计
核心组件
我们的自动化AI客服系统包含以下核心组件:
对话管理:处理多轮对话,维护对话上下文
工具调用:集成外部系统和API,执行具体操作
知识库检索:从企业文档中检索相关信息
意图识别:理解用户查询的真实意图
响应生成:生成自然、准确的回复
系统架构图
用户接口 → 对话管理器 → 意图识别器 → 工具执行器
                ↓              ↓
           知识库检索      外部系统集成
                ↓              ↓
           响应生成器  ←   结果处理器
环境准备与安装
首先安装必要的依赖包:
# 安装核心库
pip install langchain openai python-dotenv
# 安装向量数据库和嵌入模型
pip install chromadb sentence-transformers
# 安装文档处理工具
pip install pymupdf python-pptx python-docx
# 安装Web框架(可选,用于API接口)
pip install flask fastapi
# 安装其他工具库
pip install requests beautifulsoup4
设置环境变量:
export OPENAI_API_KEY="your-openai-api-key"
构建核心组件
1. 对话记忆管理
from langchain.memory import ConversationBufferWindowMemory, CombinedMemory
from langchain.schema import BaseMemory
from typing import Dict, List, Any
import json
class EnhancedConversationMemory:
    """Conversation memory manager for the customer-service bot.

    Combines three tiers:
      * short-term memory — a windowed buffer of the last ``k`` exchanges;
      * long-term memory  — a per-user transcript (in-memory dict here;
        a real deployment would use a database);
      * user profile      — arbitrary key/value attributes per user.
    """

    def __init__(self, k: int = 10):
        """
        Args:
            k: number of recent exchanges kept in the short-term window.
        """
        # Short-term memory: keeps only the most recent k exchanges.
        self.short_term_memory = ConversationBufferWindowMemory(
            memory_key="short_term",
            k=k,
            return_messages=True,
        )
        # Long-term memory: user_id -> list of {timestamp, query, response}.
        self.long_term_memory: Dict[str, List[Dict]] = {}
        # User profile memory: user_id -> {attribute: value}.
        self.user_profile_memory: Dict[str, Dict] = {}

    def save_conversation(self, user_id: str, query: str, response: str) -> None:
        """Record one query/response exchange in both memory tiers."""
        # Short-term window (shared across users in this simplified design).
        self.short_term_memory.save_context(
            {"input": query},
            {"output": response},
        )
        # Append to the per-user long-term transcript.
        self.long_term_memory.setdefault(user_id, []).append({
            "timestamp": self._get_timestamp(),
            "query": query,
            "response": response,
        })

    def get_conversation_history(self, user_id: str, limit: int = 5) -> List[Dict]:
        """Return up to ``limit`` most recent exchanges for ``user_id``."""
        return self.long_term_memory.get(user_id, [])[-limit:]

    def update_user_profile(self, user_id: str, key: str, value: Any) -> None:
        """Set one profile attribute for ``user_id``."""
        self.user_profile_memory.setdefault(user_id, {})[key] = value

    def get_user_profile(self, user_id: str) -> Dict:
        """Return the profile dict for ``user_id`` (empty if unknown)."""
        return self.user_profile_memory.get(user_id, {})

    def get_memory_context(self, user_id: str) -> str:
        """Render the user's profile and recent history as prompt-ready text."""
        context_parts: List[str] = []

        # Profile attributes, one bullet per key.
        profile = self.get_user_profile(user_id)
        if profile:
            context_parts.append("用户信息:")
            for key, value in profile.items():
                context_parts.append(f"- {key}: {value}")

        # Only the 3 most recent exchanges are included in the prompt
        # (original fetched 5 then sliced [-3:]; fetching 3 is equivalent).
        history = self.get_conversation_history(user_id, limit=3)
        if history:
            context_parts.append("对话历史:")
            for i, item in enumerate(history, 1):
                context_parts.append(f"{i}. 用户: {item['query']}")
                context_parts.append(f"   助手: {item['response']}")

        return "\n".join(context_parts) if context_parts else "无历史记录"

    def _get_timestamp(self) -> str:
        """Return an ISO-8601 timestamp for the current local time."""
        from datetime import datetime  # local import, as in the original
        return datetime.now().isoformat()
# Usage example: exercise the memory manager with a two-turn conversation.
memory_manager = EnhancedConversationMemory(k=8)

user_id = "user_123"
for query, reply in [
    ("你好,我想查询订单状态", "好的,请提供您的订单号"),
    ("订单号是ORD123456", "正在查询订单ORD123456..."),
]:
    memory_manager.save_conversation(user_id, query, reply)

# Render the accumulated memory as prompt context.
context = memory_manager.get_memory_context(user_id)
print("记忆上下文:\n", context)
2. 业务知识库构建
from langchain.vectorstores import Chroma
from langchain.embeddings import OpenAIEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.document_loaders import TextLoader, DirectoryLoader
import os
class BusinessKnowledgeBase:
    """Vector-store-backed knowledge base over business documents.

    Text files are split into overlapping chunks, embedded with OpenAI
    embeddings, and stored in a persistent Chroma collection on disk.
    """

    def __init__(self, persist_directory: str = "./knowledge_db"):
        """
        Args:
            persist_directory: on-disk location of the Chroma collection.
        """
        self.embeddings = OpenAIEmbeddings()
        self.persist_directory = persist_directory
        # Set by build_knowledge_base() or load_knowledge_base().
        self.vectorstore = None
        # 1000-char chunks with 200-char overlap keep context across splits.
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200,
        )

    def build_knowledge_base(self, data_directory: str) -> None:
        """Load every ``*.txt`` under ``data_directory``, chunk, embed, persist."""
        loader = DirectoryLoader(
            data_directory,
            glob="**/*.txt",
            loader_cls=TextLoader,
        )
        documents = loader.load()
        texts = self.text_splitter.split_documents(documents)
        self.vectorstore = Chroma.from_documents(
            documents=texts,
            embedding=self.embeddings,
            persist_directory=self.persist_directory,
        )
        self.vectorstore.persist()
        print(f"知识库构建完成,包含 {len(texts)} 个文档块")

    def load_knowledge_base(self) -> None:
        """Re-open a previously persisted Chroma collection."""
        self.vectorstore = Chroma(
            persist_directory=self.persist_directory,
            embedding_function=self.embeddings,
        )

    def search_relevant_info(self, query: str, k: int = 3) -> List[str]:
        """Return the page contents of the top-``k`` chunks most similar to ``query``."""
        if self.vectorstore is None:
            return ["知识库未初始化"]
        results = self.vectorstore.similarity_search(query, k=k)
        return [doc.page_content for doc in results]

    def add_document(self, document_path: str) -> None:
        """Chunk a single text file and append it to the existing store."""
        if self.vectorstore is None:
            self.load_knowledge_base()
        loader = TextLoader(document_path)
        documents = loader.load()
        texts = self.text_splitter.split_documents(documents)
        self.vectorstore.add_documents(texts)
        self.vectorstore.persist()
        print(f"添加了 {len(texts)} 个新文档块")
# Usage example: open an existing knowledge base and run a query.
knowledge_base = BusinessKnowledgeBase()

# First run only: build the store from the raw documents.
# knowledge_base.build_knowledge_base("./knowledge_documents")

# Subsequent runs: reuse the persisted collection.
knowledge_base.load_knowledge_base()

results = knowledge_base.search_relevant_info("退货政策是什么?")
if results:
    print("相关知识:", results[0])
else:
    print("相关知识:", "未找到相关信息")
3. 工具集成系统
from langchain.tools import BaseTool
from pydantic import BaseModel, Field
from typing import Type, Optional
import requests
import json
class OrderQueryInput(BaseModel):
    """Input schema for the order-query tool: a single order-number field."""
    order_id: str = Field(description="订单编号")
class