LangChain教程 - RAG - PDF问答

发布于:2025-02-27 ⋅ 阅读:(27) ⋅ 点赞:(0)

系列文章索引
LangChain教程 - 系列文章

在现代自然语言处理(NLP)中,基于文档内容的问答系统变得愈发重要,尤其是当我们需要从大量文档中提取信息时。通过结合文档检索和生成模型(如RAG,Retrieval-Augmented Generation),我们可以构建强大的问答系统。本博客将详细介绍如何使用FastAPI和LangChain框架,创建一个基于PDF文档的RAG问答API。


一、背景

在许多实际应用中,用户可能需要基于大量的PDF文件进行快速的问答查询。LangChain作为一个强大的框架,支持将各种数据源与生成模型集成,而FastAPI则是一个轻量级的Web框架,适用于构建高性能的API。在本案例中,我们将使用FastAPI作为API服务端,LangChain来处理文档加载、文本切分、向量存储和问答生成任务。

二、技术栈

  • FastAPI:用于构建Web服务。
  • LangChain:提供构建问答系统的工具,涉及文档加载、文本切分、向量存储、RAG链构建等功能。
  • Ollama Embeddings:用于将文本转换为向量。
  • Chroma:用于存储和检索文本向量的数据库。
  • Starlette:FastAPI的底层库,用于支持流式响应。

三、实现步骤

1. 环境配置

首先,我们需要安装必需的库。你可以通过以下命令来安装:

pip install fastapi langchain langchain-chroma langchain-ollama langchain-community starlette uvicorn nest_asyncio

安装完毕后,我们可以开始构建我们的API。

2. 加载PDF并处理文本

我们从一个PDF文件加载文档,并将其切分成适合处理的小块。这样可以更高效地将文本转化为向量,并存储到数据库中。

from langchain_community.document_loaders import PyPDFLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter

# 设置PDF文件路径
pdf_path = "../../files/pdf/en/Transformer.pdf"

# 加载PDF文档并分割文本
loader = PyPDFLoader(pdf_path)
docs = loader.load()

# 使用递归文本切分器来切分文档
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=100)
splits = text_splitter.split_documents(docs)

在这段代码中,PyPDFLoader被用来加载PDF文件,而RecursiveCharacterTextSplitter则将文档切分为多个小块,确保每个块之间有200个字符的重叠,以便保持上下文的连贯性。

3. 存储向量到数据库

接下来,我们使用Chroma来存储文档的向量表示。我们利用OllamaEmbeddings模型将文本块转化为向量,并将它们存储在Chroma数据库中。

from langchain_chroma import Chroma
from langchain_ollama import OllamaEmbeddings

# 存储分割后的文档到向量数据库
vectorstore = Chroma.from_documents(documents=splits, embedding=OllamaEmbeddings(model="nomic-embed-text"))

这里,我们通过OllamaEmbeddings将文档切分块转换为嵌入向量,并使用Chroma将这些向量存储到数据库中。这样,我们就可以通过相似度检索来快速找到与用户查询相关的文档。

4. 构建检索器

为了支持从数据库中检索相关文档,我们将构建一个基于相似度搜索的检索器。

# 构建检索器
retriever = vectorstore.as_retriever(search_type="similarity", search_kwargs={"k": 3})

这段代码通过as_retriever方法创建了一个检索器,能够基于向量的相似度从数据库中返回与查询最相关的文档。

5. 定义RAG链

我们使用RAG(检索增强生成)技术,将检索到的文档与生成模型(如ChatOllama)结合,生成最终的答案。hub.pull("rlm/rag-prompt")方法提供了一个预定义的RAG提示模板。

from langchain import hub
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
from langchain_ollama import ChatOllama

# 定义RAG提示模板
prompt = hub.pull("rlm/rag-prompt")

# 格式化检索到的文档
def format_docs(docs):
    return "\n\n".join(doc.page_content for doc in docs)

# 定义RAG链
rag_chain = (
    {"context": retriever | format_docs, "question": RunnablePassthrough()}
    | prompt
    | ChatOllama(model="deepseek-r1:7b")
    | StrOutputParser()
)

在这里,我们将检索到的文档内容格式化为字符串,并将其与用户的查询一起传递到生成模型中,以生成最终的答案。

6. 生成答案和流式响应

我们定义了两个方法:一个是生成完整答案,另一个是生成流式响应。

import json

# 生成答案函数
async def generate_answer(question: str):
    response = await rag_chain.ainvoke(question)
    return response

# 生成流式响应
async def generate_streaming_response(question: str):
    async for chunk in rag_chain.astream(question):  # 使用astream逐块获取响应
        yield json.dumps({"answer chunk": chunk}) + "\n"  # 按流式返回每一块内容

在这部分代码中,generate_answer方法会返回完整的答案,而generate_streaming_response方法则返回流式响应,每次返回一个内容块。

7. 创建FastAPI应用

最后,我们使用FastAPI创建一个Web应用,提供一个POST接口来接收用户查询,并返回答案。

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from starlette.responses import StreamingResponse

# 创建FastAPI应用
app = FastAPI()

# 定义输入模型
class QueryModel(BaseModel):
    question: str
    stream: bool = False  # 默认不流式返回

# 创建POST路由处理查询
@app.post("/query/")
async def query_question(query: QueryModel):
    try:
        if query.stream:
            # 如果stream为True,使用流式响应
            return StreamingResponse(generate_streaming_response(query.question), media_type="text/json")
        else:
            # 否则直接返回完整答案
            answer = await generate_answer(query.question)  # 使用await获取完整的答案
            return {"answer": answer}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

此API提供了一个接口,用户可以通过发送带有问题的POST请求来获取答案。如果请求中指定streamTrue,系统将返回流式的答案。

8. 启动(jupyter)

notebooks下载地址: https://github.com/flower-trees/langchain-example/blob/master/pdf/jupyter/chat_pdf_api.ipynb

import nest_asyncio
import uvicorn
nest_asyncio.apply()
uvicorn.run(app, host="127.0.0.1", port=8000)

9. 提问

curl -X POST http://127.0.0.1:8000/query/ \
  -H "Content-Type: application/json" \
  -d '{
	"question": "Why is masking necessary in the decoder’s self-attention mechanism?",
	"stream": true
  }'

10. 清理向量数据库

vectorstore.delete_collection()

四、完整代码实例

代码下载地址: https://github.com/flower-trees/langchain-example/blob/master/pdf/chat_pdf_api.py

import json
from contextlib import asynccontextmanager

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from langchain import hub
from langchain_chroma import Chroma
from langchain_community.document_loaders import PyPDFLoader
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
from langchain_ollama import ChatOllama, OllamaEmbeddings
from langchain_text_splitters import RecursiveCharacterTextSplitter
from starlette.responses import StreamingResponse

# 设置 PDF 文件路径
pdf_path = "../files/pdf/en/Transformer.pdf"

# 加载 PDF 文档并分割文本
loader = PyPDFLoader(pdf_path)
docs = loader.load()
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=100)
splits = text_splitter.split_documents(docs)

# 存储分割后的文档到向量数据库
vectorstore = Chroma.from_documents(documents=splits, embedding=OllamaEmbeddings(model="nomic-embed-text"))

# 构建检索器
retriever = vectorstore.as_retriever(search_type="similarity", search_kwargs={"k": 3})

# 定义 RAG 提示模板
prompt = hub.pull("rlm/rag-prompt")


# 格式化检索到的文档
def format_docs(docs):
    return "\n\n".join(doc.page_content for doc in docs)


# 定义 RAG 链
rag_chain = (
        {"context": retriever | format_docs, "question": RunnablePassthrough()}
        | prompt
        | ChatOllama(model="deepseek-r1:7b")
        | StrOutputParser()
)

print("RAG ready")


# 生成答案函数
async def generate_answer(question: str):
    response = await rag_chain.ainvoke(question)
    return response

# 生成流式响应
async def generate_streaming_response(question: str):
    async for chunk in rag_chain.astream(question):  # 使用 astream 逐块获取响应
        yield json.dumps({"answer chunk": chunk}) + "\n"  # 按流式返回每一块内容

# 8. 清理向量数据库
def clear_vectorstore():
    vectorstore.delete_collection()

@asynccontextmanager
async def lifespan(app: FastAPI):
    # 在应用启动时执行的代码
    yield
    # 在应用关闭时执行的代码
    clear_vectorstore()
    print("Vectorstore cleaned up successfully!")

# 创建 FastAPI 应用
app = FastAPI(lifespan=lifespan)

# 定义输入模型
class QueryModel(BaseModel):
    question: str
    stream: bool = False  # 默认不流式返回

# 创建 POST 路由处理查询
@app.post("/query/")
async def query_question(query: QueryModel):
    try:
        if query.stream:
            # 如果 `stream` 为 True,使用流式响应
            return StreamingResponse(generate_streaming_response(query.question), media_type="text/json")
        else:
            # 否则直接返回完整答案
            answer = await generate_answer(query.question)  # 使用 await 获取完整的答案
            return {"answer": answer}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

# 启动 FastAPI 应用(适用于开发环境)
# uvicorn chat_pdf_api:app --reload

五、总结

通过本教程,我们展示了如何使用FastAPI和LangChain框架,结合检索增强生成(RAG)技术,构建一个基于PDF文档的问答系统。系统支持两种查询方式:普通的完整答案返回和流式答案返回。借助LangChain提供的强大工具集,我们能够轻松地实现文档加载、文本切分、向量存储与检索等功能。FastAPI则让我们能够高效地将这些功能封装为一个Web API,供用户使用。

在实际应用中,这种基于文档的问答系统可以广泛应用于客户支持、知识库管理、教育培训等领域,为用户提供智能化的答案生成服务。