系列文章索引
LangChain教程 - 系列文章
在现代自然语言处理(NLP)中,基于文档内容的问答系统变得愈发重要,尤其是当我们需要从大量文档中提取信息时。通过结合文档检索和生成模型(如RAG,Retrieval-Augmented Generation),我们可以构建强大的问答系统。本博客将详细介绍如何使用FastAPI和LangChain框架,创建一个基于PDF文档的RAG问答API。
一、背景
在许多实际应用中,用户可能需要基于大量的PDF文件进行快速的问答查询。LangChain作为一个强大的框架,支持将各种数据源与生成模型集成,而FastAPI则是一个轻量级的Web框架,适用于构建高性能的API。在本案例中,我们将使用FastAPI作为API服务端,LangChain来处理文档加载、文本切分、向量存储和问答生成任务。
二、技术栈
- FastAPI:用于构建Web服务。
- LangChain:提供构建问答系统的工具,涉及文档加载、文本切分、向量存储、RAG链构建等功能。
- Ollama Embeddings:用于将文本转换为向量。
- Chroma:用于存储和检索文本向量的数据库。
- Starlette:FastAPI的底层库,用于支持流式响应。
三、实现步骤
1. 环境配置
首先,我们需要安装必需的库。你可以通过以下命令来安装:
pip install fastapi langchain langchain-chroma langchain-ollama langchain-community starlette uvicorn nest_asyncio
安装完毕后,我们可以开始构建我们的API。
2. 加载PDF并处理文本
我们从一个PDF文件加载文档,并将其切分成适合处理的小块。这样可以更高效地将文本转化为向量,并存储到数据库中。
from langchain_community.document_loaders import PyPDFLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
# 设置PDF文件路径
pdf_path = "../../files/pdf/en/Transformer.pdf"
# 加载PDF文档并分割文本
loader = PyPDFLoader(pdf_path)
docs = loader.load()
# 使用递归文本切分器来切分文档
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=100)
splits = text_splitter.split_documents(docs)
在这段代码中,PyPDFLoader
被用来加载PDF文件,而RecursiveCharacterTextSplitter
则将文档切分为多个小块,确保每个块之间有200个字符的重叠,以便保持上下文的连贯性。
3. 存储向量到数据库
接下来,我们使用Chroma
来存储文档的向量表示。我们利用OllamaEmbeddings
模型将文本块转化为向量,并将它们存储在Chroma数据库中。
from langchain_chroma import Chroma
from langchain_ollama import OllamaEmbeddings
# 存储分割后的文档到向量数据库
vectorstore = Chroma.from_documents(documents=splits, embedding=OllamaEmbeddings(model="nomic-embed-text"))
这里,我们通过OllamaEmbeddings
将文档切分块转换为嵌入向量,并使用Chroma
将这些向量存储到数据库中。这样,我们就可以通过相似度检索来快速找到与用户查询相关的文档。
4. 构建检索器
为了支持从数据库中检索相关文档,我们将构建一个基于相似度搜索的检索器。
# 构建检索器
retriever = vectorstore.as_retriever(search_type="similarity", search_kwargs={"k": 3})
这段代码通过as_retriever
方法创建了一个检索器,能够基于向量的相似度从数据库中返回与查询最相关的文档。
5. 定义RAG链
我们使用RAG(检索增强生成)技术,将检索到的文档与生成模型(如ChatOllama
)结合,生成最终的答案。hub.pull("rlm/rag-prompt")
方法提供了一个预定义的RAG提示模板。
from langchain import hub
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
from langchain_ollama import ChatOllama
# 定义RAG提示模板
prompt = hub.pull("rlm/rag-prompt")
# 格式化检索到的文档
def format_docs(docs):
return "\n\n".join(doc.page_content for doc in docs)
# 定义RAG链
rag_chain = (
{"context": retriever | format_docs, "question": RunnablePassthrough()}
| prompt
| ChatOllama(model="deepseek-r1:7b")
| StrOutputParser()
)
在这里,我们将检索到的文档内容格式化为字符串,并将其与用户的查询一起传递到生成模型中,以生成最终的答案。
6. 生成答案和流式响应
我们定义了两个方法:一个是生成完整答案,另一个是生成流式响应。
import json
# 生成答案函数
async def generate_answer(question: str):
response = await rag_chain.ainvoke(question)
return response
# 生成流式响应
async def generate_streaming_response(question: str):
async for chunk in rag_chain.astream(question): # 使用astream逐块获取响应
yield json.dumps({"answer chunk": chunk}) + "\n" # 按流式返回每一块内容
在这部分代码中,generate_answer
方法会返回完整的答案,而generate_streaming_response
方法则返回流式响应,每次返回一个内容块。
7. 创建FastAPI应用
最后,我们使用FastAPI创建一个Web应用,提供一个POST接口来接收用户查询,并返回答案。
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from starlette.responses import StreamingResponse
# 创建FastAPI应用
app = FastAPI()
# 定义输入模型
class QueryModel(BaseModel):
question: str
stream: bool = False # 默认不流式返回
# 创建POST路由处理查询
@app.post("/query/")
async def query_question(query: QueryModel):
try:
if query.stream:
# 如果stream为True,使用流式响应
return StreamingResponse(generate_streaming_response(query.question), media_type="text/json")
else:
# 否则直接返回完整答案
answer = await generate_answer(query.question) # 使用await获取完整的答案
return {"answer": answer}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
此API提供了一个接口,用户可以通过发送带有问题的POST请求来获取答案。如果请求中指定stream
为True
,系统将返回流式的答案。
8. 启动(jupyter)
notebooks下载地址: https://github.com/flower-trees/langchain-example/blob/master/pdf/jupyter/chat_pdf_api.ipynb
import nest_asyncio
import uvicorn
nest_asyncio.apply()
uvicorn.run(app, host="127.0.0.1", port=8000)
9. 提问
curl -X POST http://127.0.0.1:8000/query/ \
-H "Content-Type: application/json" \
-d '{
"question": "Why is masking necessary in the decoder’s self-attention mechanism?",
"stream": true
}'
10. 清理向量数据库
vectorstore.delete_collection()
四、完整代码实例
代码下载地址: https://github.com/flower-trees/langchain-example/blob/master/pdf/chat_pdf_api.py
import json
from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from langchain import hub
from langchain_chroma import Chroma
from langchain_community.document_loaders import PyPDFLoader
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
from langchain_ollama import ChatOllama, OllamaEmbeddings
from langchain_text_splitters import RecursiveCharacterTextSplitter
from starlette.responses import StreamingResponse
# 设置 PDF 文件路径
pdf_path = "../files/pdf/en/Transformer.pdf"
# 加载 PDF 文档并分割文本
loader = PyPDFLoader(pdf_path)
docs = loader.load()
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=100)
splits = text_splitter.split_documents(docs)
# 存储分割后的文档到向量数据库
vectorstore = Chroma.from_documents(documents=splits, embedding=OllamaEmbeddings(model="nomic-embed-text"))
# 构建检索器
retriever = vectorstore.as_retriever(search_type="similarity", search_kwargs={"k": 3})
# 定义 RAG 提示模板
prompt = hub.pull("rlm/rag-prompt")
# 格式化检索到的文档
def format_docs(docs):
return "\n\n".join(doc.page_content for doc in docs)
# 定义 RAG 链
rag_chain = (
{"context": retriever | format_docs, "question": RunnablePassthrough()}
| prompt
| ChatOllama(model="deepseek-r1:7b")
| StrOutputParser()
)
print("RAG ready")
# 生成答案函数
async def generate_answer(question: str):
response = await rag_chain.ainvoke(question)
return response
# 生成流式响应
async def generate_streaming_response(question: str):
async for chunk in rag_chain.astream(question): # 使用 astream 逐块获取响应
yield json.dumps({"answer chunk": chunk}) + "\n" # 按流式返回每一块内容
# 8. 清理向量数据库
def clear_vectorstore():
vectorstore.delete_collection()
@asynccontextmanager
async def lifespan(app: FastAPI):
# 在应用启动时执行的代码
yield
# 在应用关闭时执行的代码
clear_vectorstore()
print("Vectorstore cleaned up successfully!")
# 创建 FastAPI 应用
app = FastAPI(lifespan=lifespan)
# 定义输入模型
class QueryModel(BaseModel):
question: str
stream: bool = False # 默认不流式返回
# 创建 POST 路由处理查询
@app.post("/query/")
async def query_question(query: QueryModel):
try:
if query.stream:
# 如果 `stream` 为 True,使用流式响应
return StreamingResponse(generate_streaming_response(query.question), media_type="text/json")
else:
# 否则直接返回完整答案
answer = await generate_answer(query.question) # 使用 await 获取完整的答案
return {"answer": answer}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
# 启动 FastAPI 应用(适用于开发环境)
# uvicorn chat_pdf_api:app --reload
五、总结
通过本教程,我们展示了如何使用FastAPI和LangChain框架,结合检索增强生成(RAG)技术,构建一个基于PDF文档的问答系统。系统支持两种查询方式:普通的完整答案返回和流式答案返回。借助LangChain提供的强大工具集,我们能够轻松地实现文档加载、文本切分、向量存储与检索等功能。FastAPI则让我们能够高效地将这些功能封装为一个Web API,供用户使用。
在实际应用中,这种基于文档的问答系统可以广泛应用于客户支持、知识库管理、教育培训等领域,为用户提供智能化的答案生成服务。