DeepSeek+RAG局域网部署

发布于:2025-03-26 ⋅ 阅读:(15) ⋅ 点赞:(0)

已经有很多平台集成RAG模式,dify,cherrystudio等,这里通过AI辅助,用DS的API实现一个简单的RAG部署。框架主要技术栈是Chroma,langchain,streamlit,答案流式输出,并且对答案加上索引。支持doc,docx,pdf,txt。

RAG

import os
import streamlit as st
import chromadb
import fitz  # PyMuPDF
import pypandoc  # DOC解析
from docx import Document
from typing import List, Dict, Any, Generator
from langchain_chroma import Chroma
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.chains import RetrievalQA
from langchain.llms import BaseLLM
from pydantic import Field, BaseModel
from openai import OpenAI
from langchain.embeddings import HuggingFaceEmbeddings
from langchain_core.outputs import LLMResult, Generation
import shutil
import re
from tenacity import retry, wait_exponential, stop_after_attempt


# 每次运行会将chroma_db删除,就要重新构建知识库。
# shutil.rmtree("./chroma_db", ignore_errors=True)


# 自定义支持流式输出的DeepSeek LLM类
class DeepSeekLLM(BaseLLM, BaseModel):
    api_key: str = Field(..., description="DeepSeek API密钥")
    base_url: str = Field(..., description="API基础地址")

    @retry(wait=wait_exponential(multiplier=1, min=4, max=10), stop=stop_after_attempt(3))
    def _stream_call(self, prompt: str, stop: List[str] = None) -> Generator[str, None, None]:
        client = OpenAI(api_key=self.api_key, base_url=self.base_url)
        response = client.chat.completions.create(
            model="deepseek-reasoner",
            messages=[
                {"role": "system", "content": "你是各领域资深专家"},
                {"role": "user", "content": prompt},
            ],
            temperature=0.3,
            stream=True  # 启用流式输出
        )
        for chunk in response:
            if chunk.choices and chunk.choices[0].delta.content:
                yield chunk.choices[0].delta.content

    def _call(self, prompt: str, stop: List[str] = None) -> str:
        return "".join(self._stream_call(prompt, stop))

    def _generate(self, prompts: List[str], stop: List[str] = None) -> LLMResult:
        generations = []
        for prompt in prompts:
            stream_output = list(self._stream_call(prompt, stop))
            generations.append([Generation(text="".join(stream_output))])
        return LLMResult(generations=generations)

    @property
    def _llm_type(self) -> str:
        return "deepseek-legal-llm"

    @property
    def _identifying_params(self) -> Dict[str, Any]:
        return {"api_key": self.api_key, "base_url": self.base_url}

# 配置中文小模型(约300MB)
embeddings_bge = HuggingFaceEmbeddings(
    model_name="./bge-large-zh-v1.5"
    # model_kwargs={
    #     'device': 'cpu'  # 强制使用CPU,避免CUDA依赖
    # }
)
test_embedding = embeddings_bge.embed_query("测试")
if not test_embedding or len(test_embedding) == 0:
    st.error("Embedding 生成失败,请检查本地模型路径是否正确!")
    raise RuntimeError("Embedding 生成失败")

# API配置
DEEPSEEK_API_KEY = "你的密钥"
DEEPSEEK_BASE_URL = "https://api.deepseek.com"

# 初始化组件
deepseek_llm = DeepSeekLLM(api_key=DEEPSEEK_API_KEY, base_url=DEEPSEEK_BASE_URL)


# 文档处理函数
@st.cache_data # 在文件上传逻辑添加缓存(避免重复处理)
def process_document(file) -> str:
    """支持PDF/DOC/DOCX/TXT的解析"""
    if file.name.endswith(".pdf"):
        with fitz.open(stream=file.read()) as doc:
            return "\n".join([page.get_text() for page in doc])
    elif file.name.endswith(".docx"):
        return "\n".join([p.text for p in Document(file).paragraphs])
    elif file.name.endswith(".doc"):
        return pypandoc.convert_text(file.read(), 'plain', format='doc')
    elif file.name.endswith(".txt"):
        return file.read().decode()
    return ""


# RAG处理流程
# 修改后的流式RAG处理流程
def build_streaming_retrieval_chain():
    vectorstore = Chroma(
        collection_name="legal_docs",
        embedding_function=embeddings_bge,
        persist_directory="./chroma_db"
    )
    return RetrievalQA.from_chain_type(
        llm=deepseek_llm,
        chain_type="stuff",
        retriever=vectorstore.as_retriever(ssearch_kwargs={
        "k": 3,
        "filter": {"metadata_field": {"$gte": 0.7}}}), # 返回Top3且相似度>0.7
        return_source_documents=True
    )



# Streamlit界面
st.title("DeepSeek-RAG系统")

# 文件上传处理
uploaded_files = st.file_uploader("上传文件",
                                  type=["pdf", "doc", "docx", "txt"],
                                  accept_multiple_files=True)
if uploaded_files:
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
    # 使用LangChain的Chroma添加文档
    vectorstore = Chroma(
        collection_name="legal_docs",
        embedding_function=embeddings_bge,
        persist_directory="./chroma_db"
    )
    # if vectorstore._collection.count() == 0:
    #     st.warning("警告:Chroma 数据库为空,可能未成功加载任何数据")
    for file in uploaded_files:
        text = process_document(file)
        if not text.strip():  # 检查解析后文本是否为空
            st.error(f"文件 {file.name} 解析失败,跳过")
            continue

        chunks = text_splitter.split_text(text)
        if not chunks:  # 确保 chunks 不是空列表
            st.error(f"文件 {file.name} 无法进行文本切分,跳过")
            continue
        metadatas = [{"source": file.name} for _ in chunks]  # 让每个 chunk 记录来源文件名
        vectorstore.add_texts(texts=chunks, metadatas=metadatas)  # 添加文本和元数据
    st.success(f"已成功加载{len(uploaded_files)}份文件")

if query := st.text_input("请输入问题:"):
    # 初始化流式输出容器
    answer_container = st.empty()
    full_answer = ""
    source_docs = []

    # 创建QA链
    qa_chain = build_streaming_retrieval_chain()

    try:
        # 执行查询并获取流式响应
        result = qa_chain.invoke({"query": query})
        source_docs = result['source_documents']

        # 流式输出处理
        for token in deepseek_llm._stream_call(result['result']):
            full_answer += token
            # 实时更新显示(带光标效果)
            answer_container.markdown(f"**答案**:{full_answer}▌")

        # 最终显示完整答案
        answer_container.markdown(f"**答案**:{full_answer}")

    except Exception as e:
        st.error(f"生成中断: {str(e)}")
        full_answer += "\n\n(输出因错误中断)"
        answer_container.markdown(f"**答案**:{full_answer}")

    # 处理引用标注(修改后的版本)
    if full_answer and source_docs:
        doc_references = {}
        doc_counter = 1
        for doc in source_docs:
            source = doc.metadata.get('source', '未知来源')
            if source not in doc_references:
                doc_references[source] = str(doc_counter)
                doc_counter += 1

        # 智能分段与标注逻辑
        segmentation_patterns = [
            r'\n{2,}',
            r'\n(?=\d+[\.、])',
            r'\n(?=[•\-*□▶])',
            r'(?<=。|!|?)\s+(?=.)'
        ]
        split_regex = re.compile('|'.join(segmentation_patterns))
        paragraphs = [p.strip() for p in split_regex.split(full_answer) if p.strip()]

        formatted_paragraphs = []
        sorted_citation = "".join(
            f"[{num}]" for num in
            sorted(doc_references.values(), key=lambda x: int(x))
        )

        for para in paragraphs:
            if re.search(r'[。!?]$', para) or '。' in para:
                formatted_para = para + f" {sorted_citation}"
            else:
                formatted_para = para
            formatted_paragraphs.append(formatted_para)

        # 更新显示带引用的答案
        formatted_answer = '\n\n'.join(formatted_paragraphs)
        answer_container.markdown(f"**答案**:{formatted_answer}")

        # 显示来源文档
        st.subheader("依据文件")
        for source, num in sorted(doc_references.items(), key=lambda x: int(x[1])):
            st.markdown(f"[{num}] {source}")