已经有很多平台集成了 RAG 模式,例如 Dify、Cherry Studio 等。这里借助 AI 辅助,用 DeepSeek(DS)的 API 实现一个简单的 RAG 部署。框架的主要技术栈是 Chroma、LangChain 和 Streamlit,答案以流式方式输出,并为答案标注引用来源(索引)。支持 doc、docx、pdf、txt 四种文件格式。
RAG
import os
import streamlit as st
import chromadb
import fitz # PyMuPDF
import pypandoc # DOC解析
from docx import Document
from typing import List, Dict, Any, Generator
from langchain_chroma import Chroma
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.chains import RetrievalQA
from langchain.llms import BaseLLM
from pydantic import Field, BaseModel
from openai import OpenAI
from langchain.embeddings import HuggingFaceEmbeddings
from langchain_core.outputs import LLMResult, Generation
import shutil
import re
from tenacity import retry, wait_exponential, stop_after_attempt
# 每次运行会将chroma_db删除,就要重新构建知识库。
# shutil.rmtree("./chroma_db", ignore_errors=True)
# 自定义支持流式输出的DeepSeek LLM类
class DeepSeekLLM(BaseLLM, BaseModel):
    """LangChain LLM wrapper that streams chat completions from DeepSeek.

    The wrapper talks to the endpoint through the ``OpenAI`` client. Opening
    the stream is retried with exponential back-off; once tokens have started
    to flow, a failure is propagated to the caller instead of being retried,
    because already-yielded tokens cannot be taken back.
    """

    # Secret used to authenticate against the API.
    api_key: str = Field(..., description="DeepSeek API密钥")
    # Root URL of the API endpoint.
    base_url: str = Field(..., description="API基础地址")
    # Chat model to query; defaults to the model this wrapper always used.
    model: str = Field("deepseek-reasoner", description="Chat model name")

    @retry(
        wait=wait_exponential(multiplier=1, min=4, max=10),
        stop=stop_after_attempt(3),
        reraise=True,  # surface the real API error instead of tenacity's RetryError
    )
    def _open_stream(self, prompt: str, stop: List[str] = None):
        """Send the chat request and return the streaming response object.

        The retry decorator lives here rather than on ``_stream_call``:
        decorating a generator function only wraps the creation of the
        generator object, so failures of the actual request were never retried.

        Args:
            prompt: Text sent as the user message.
            stop: Optional stop sequences forwarded to the API when given.

        Returns:
            The iterable of streamed completion chunks.
        """
        client = OpenAI(api_key=self.api_key, base_url=self.base_url)
        request: Dict[str, Any] = {
            "model": self.model,
            "messages": [
                {"role": "system", "content": "你是各领域资深专家"},
                {"role": "user", "content": prompt},
            ],
            "temperature": 0.3,
            "stream": True,  # enable streaming output
        }
        if stop:
            # Only send the key when the caller asked for stop sequences.
            request["stop"] = stop
        return client.chat.completions.create(**request)

    def _stream_call(self, prompt: str, stop: List[str] = None) -> Generator[str, None, None]:
        """Yield the answer text piece by piece as the API streams it.

        Args:
            prompt: Text sent as the user message.
            stop: Optional stop sequences.

        Yields:
            Each non-empty content delta of the streamed completion.
        """
        for chunk in self._open_stream(prompt, stop):
            # Some chunks carry no choices or an empty delta; skip those.
            if chunk.choices and chunk.choices[0].delta.content:
                yield chunk.choices[0].delta.content

    def _call(self, prompt: str, stop: List[str] = None) -> str:
        """Return the complete answer for ``prompt`` as a single string."""
        return "".join(self._stream_call(prompt, stop))

    def _generate(self, prompts: List[str], stop: List[str] = None) -> LLMResult:
        """Answer every prompt in turn and wrap the texts in an ``LLMResult``."""
        generations = [
            [Generation(text=self._call(prompt, stop))] for prompt in prompts
        ]
        return LLMResult(generations=generations)

    @property
    def _llm_type(self) -> str:
        """Identifier string of this LLM implementation."""
        return "deepseek-legal-llm"

    @property
    def _identifying_params(self) -> Dict[str, Any]:
        """Parameters that identify this LLM configuration.

        The API key is deliberately left out so the secret cannot end up in
        logs, traces or cache keys built from these parameters.
        """
        return {"base_url": self.base_url, "model": self.model}
# Local Chinese embedding model (BGE, loaded from ./bge-large-zh-v1.5).
# NOTE(review): the earlier comment called this a ~300MB small model, but the
# path points at the "large" checkpoint - confirm the expected download size.
@st.cache_resource
def _load_embeddings():
    """Load the embedding model once and verify that it produces vectors.

    Streamlit re-executes the whole script on every interaction; caching the
    model as a resource avoids reloading it and re-running the probe embedding
    on each rerun.

    Returns:
        The ready-to-use ``HuggingFaceEmbeddings`` instance.

    Raises:
        RuntimeError: If the probe query yields an empty embedding.
    """
    model = HuggingFaceEmbeddings(
        model_name="./bge-large-zh-v1.5"
        # Pass model_kwargs={'device': 'cpu'} to force CPU and avoid a CUDA dependency.
    )
    if not model.embed_query("测试"):
        st.error("Embedding 生成失败,请检查本地模型路径是否正确!")
        raise RuntimeError("Embedding 生成失败")
    return model


embeddings_bge = _load_embeddings()

# API configuration: environment variables take precedence so the secret does
# not have to be written into the source; the placeholder remains the fallback.
DEEPSEEK_API_KEY = os.environ.get("DEEPSEEK_API_KEY", "你的密钥")
DEEPSEEK_BASE_URL = os.environ.get("DEEPSEEK_BASE_URL", "https://api.deepseek.com")

# Shared LLM instance used by the retrieval chain and the UI.
deepseek_llm = DeepSeekLLM(api_key=DEEPSEEK_API_KEY, base_url=DEEPSEEK_BASE_URL)
# 文档处理函数
@st.cache_data  # cache parsed text so reruns do not re-parse the same upload
def process_document(file) -> str:
    """Extract plain text from an uploaded PDF, DOCX, DOC or TXT file.

    Args:
        file: Uploaded file object exposing ``name`` and ``read()``.

    Returns:
        The extracted text, or an empty string when the extension is not
        supported or a legacy ``.doc`` file cannot be converted. Callers treat
        an empty result as a parse failure.
    """
    # Compare in lower case so "REPORT.PDF" is handled like "report.pdf".
    name = file.name.lower()

    if name.endswith(".pdf"):
        # Name the file type explicitly: the stream carries no extension.
        with fitz.open(stream=file.read(), filetype="pdf") as doc:
            return "\n".join(page.get_text() for page in doc)

    if name.endswith(".docx"):
        return "\n".join(p.text for p in Document(file).paragraphs)

    if name.endswith(".doc"):
        # NOTE(review): pandoc's documented input formats do not include the
        # legacy binary .doc format, so this conversion is expected to fail.
        # Report it as a parse failure instead of crashing the app.
        try:
            return pypandoc.convert_text(file.read(), 'plain', format='doc')
        except (RuntimeError, OSError):
            return ""

    if name.endswith(".txt"):
        raw = file.read()
        # Chinese text files are frequently GBK/GB18030 rather than UTF-8.
        for encoding in ("utf-8-sig", "gb18030"):
            try:
                return raw.decode(encoding)
            except UnicodeDecodeError:
                continue
        # Last resort: keep whatever is readable rather than raising.
        return raw.decode("utf-8", errors="replace")

    return ""
# RAG处理流程
# 修改后的流式RAG处理流程
def build_streaming_retrieval_chain():
    """Build a RetrievalQA chain over the persisted "legal_docs" collection.

    Returns:
        A "stuff" RetrievalQA chain that uses the module-level DeepSeek LLM,
        retrieves the top 3 chunks and also returns the source documents.
    """
    vectorstore = Chroma(
        collection_name="legal_docs",
        embedding_function=embeddings_bge,
        persist_directory="./chroma_db"
    )
    # The keyword was misspelled as "ssearch_kwargs", so the intended k=3 never
    # reached the retriever's search settings.
    # The previous "filter" entry was a metadata filter on a field named
    # "metadata_field", which ingestion never writes; applied as-is it would
    # match nothing, so it is dropped.
    # NOTE(review): a real similarity cut-off would need
    # search_type="similarity_score_threshold" with a "score_threshold";
    # a suitable value depends on the collection's distance metric.
    retriever = vectorstore.as_retriever(search_kwargs={"k": 3})
    return RetrievalQA.from_chain_type(
        llm=deepseek_llm,
        chain_type="stuff",
        retriever=retriever,
        return_source_documents=True
    )
# Streamlit UI
st.title("DeepSeek-RAG系统")

# Prompt used to answer from the retrieved context in a single streamed call.
# NOTE(review): worded after LangChain's stock "stuff" QA prompt - confirm it
# still matches the prompting you want.
QA_PROMPT_TEMPLATE = (
    "Use the following pieces of context to answer the question at the end. "
    "If you don't know the answer, just say that you don't know, "
    "don't try to make up an answer.\n\n"
    "{context}\n\n"
    "Question: {question}\n"
    "Helpful Answer:"
)


def _chunk_ids(file_name, chunks):
    """Return one deterministic id per chunk of ``file_name``.

    Stable ids mean that adding the same file again targets the same entries
    instead of appending duplicates (assuming the store overwrites on equal
    ids - verify against the Chroma wrapper in use). The chunk index is part
    of the id so identical chunks within one file still get distinct ids.
    """
    import hashlib  # local import keeps this script block self-contained

    return [
        hashlib.sha256(f"{file_name}\x00{index}\x00{chunk}".encode("utf-8")).hexdigest()
        for index, chunk in enumerate(chunks)
    ]


# File upload and ingestion
uploaded_files = st.file_uploader("上传文件",
                                  type=["pdf", "doc", "docx", "txt"],
                                  accept_multiple_files=True)
if uploaded_files:
    # Streamlit reruns the script on every interaction while the uploads stay
    # selected; remember what was ingested so files are not re-embedded and
    # re-added each time the user types a question.
    if "ingested_files" not in st.session_state:
        st.session_state["ingested_files"] = set()
    ingested_files = st.session_state["ingested_files"]

    text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
    # Add documents through LangChain's Chroma wrapper.
    vectorstore = Chroma(
        collection_name="legal_docs",
        embedding_function=embeddings_bge,
        persist_directory="./chroma_db"
    )
    loaded_count = 0
    for file in uploaded_files:
        file_key = (file.name, file.size)
        if file_key in ingested_files:
            loaded_count += 1
            continue
        text = process_document(file)
        if not text.strip():  # nothing could be extracted
            st.error(f"文件 {file.name} 解析失败,跳过")
            continue
        chunks = text_splitter.split_text(text)
        if not chunks:  # the splitter produced no chunks
            st.error(f"文件 {file.name} 无法进行文本切分,跳过")
            continue
        # Every chunk records the file it came from for later citation.
        metadatas = [{"source": file.name} for _ in chunks]
        vectorstore.add_texts(
            texts=chunks,
            metadatas=metadatas,
            ids=_chunk_ids(file.name, chunks),
        )
        ingested_files.add(file_key)
        loaded_count += 1
    # Report only files that really made it into the knowledge base.
    if loaded_count:
        st.success(f"已成功加载{loaded_count}份文件")

if query := st.text_input("请输入问题:"):
    # Placeholder that is rewritten as tokens arrive.
    answer_container = st.empty()
    full_answer = ""
    source_docs = []
    # The chain supplies the configured retriever.
    qa_chain = build_streaming_retrieval_chain()
    try:
        # Retrieve first, then stream ONE completion built from the context.
        # Previously the chain produced the answer and that answer was sent
        # back to the model as a new prompt, so the text shown was a reply to
        # the answer and every question cost two completions.
        source_docs = qa_chain.retriever.invoke(query)
        context = "\n\n".join(doc.page_content for doc in source_docs)
        prompt = QA_PROMPT_TEMPLATE.format(context=context, question=query)
        for token in deepseek_llm._stream_call(prompt):
            full_answer += token
            # Live update with a trailing cursor glyph.
            answer_container.markdown(f"**答案**:{full_answer}▌")
        # Final render without the cursor.
        answer_container.markdown(f"**答案**:{full_answer}")
    except Exception as e:
        # UI boundary: show the failure and keep whatever was streamed so far.
        st.error(f"生成中断: {str(e)}")
        full_answer += "\n\n(输出因错误中断)"
        answer_container.markdown(f"**答案**:{full_answer}")

    # Citation annotation
    if full_answer and source_docs:
        # Number each distinct source file in order of first appearance.
        doc_references = {}
        for doc in source_docs:
            source = doc.metadata.get('source', '未知来源')
            if source not in doc_references:
                doc_references[source] = len(doc_references) + 1

        # Paragraph boundaries: blank lines, numbered items, bullet items, or
        # whitespace that follows sentence-ending punctuation.
        segmentation_patterns = [
            r'\n{2,}',
            r'\n(?=\d+[\.、])',
            r'\n(?=[•\-*□▶])',
            # The alternation "(?<=。|!|?)" had a bare "?" branch, which makes
            # re.compile raise "nothing to repeat"; a character class is valid
            # and covers both fullwidth and ASCII marks.
            r'(?<=[。!?!?])\s+(?=.)'
        ]
        split_regex = re.compile('|'.join(segmentation_patterns))
        paragraphs = [p.strip() for p in split_regex.split(full_answer) if p.strip()]

        # Numbers were handed out in increasing order, so insertion order is
        # already the sorted order.
        sorted_citation = "".join(f"[{num}]" for num in doc_references.values())

        formatted_paragraphs = []
        for para in paragraphs:
            # Only sentence-like paragraphs get the citation markers.
            if re.search(r'[。!?!?]$', para) or '。' in para:
                formatted_paragraphs.append(para + f" {sorted_citation}")
            else:
                formatted_paragraphs.append(para)

        # Replace the streamed text with the annotated version.
        formatted_answer = '\n\n'.join(formatted_paragraphs)
        answer_container.markdown(f"**答案**:{formatted_answer}")

        # List the source files behind the citation numbers.
        st.subheader("依据文件")
        for source, num in doc_references.items():
            st.markdown(f"[{num}] {source}")