Ragflow 文档处理深度解析:从解析到存储的完整流程

发布于:2025-07-31 ⋅ 阅读:(21) ⋅ 点赞:(0)

1. 总体架构概览

Ragflow 的文档处理流程可以分为以下几个核心阶段:

文档上传 → 解析文档构建为Chunk → 数据增强阶段 → Embedding计算 → ES存储
  • 文档上传:用户通过前端界面上传原始文档,并配置解析参数,如:切片方法、自动关键词提取、自动问题提取数量等。
  • 解析文档构建为Chunk:后端根据设定的切片方法选择合适的解析器,将文档内容切分为结构化的知识片段(Chunk)
  • 数据增强阶段:对切分后的 Chunk 进行数据增强,比如对每个 Chunk 通过AI提取一些关键词、检索问题等
  • Embedding计算:对每个 Chunk 计算向量表示(Embedding)。
  • ES存储:将完整的 Chunk 信息写入 Elasticsearch,在后续知识检索时被使用。信息包括每个 Chunk 的文本、AI提取的关键词、AI提取的问题、Embedding等

源码主要对应于:

  • Task Executor (rag/svr/task_executor.py):解析任务执行
  • Parser Factory (rag/app/):各种文档解析器

2. 解析文档构建Chunk阶段 (build_chunks)

这个阶段负责将原始文档转换为结构化的知识片段。

不同类型的文档(如PDF、Word、Excel等)需要采用不同的解析策略来保证内容提取的准确性。

Ragflow 前端界面提供了多种切片方法选项,每种方法都对应特定的Python解析器类。下面详细说明前端切片方法与后端源码的对应关系:

前端切片方法与源码映射表
前端显示名称 对应源码文件 读取文档进行分开的函数名 适用场景
General rag/app/naive.py naive.chunk() 通用文档解析,支持PDF、Word、HTML等常规格式
Resume rag/app/resume.py resume.chunk() 简历文档专用,能识别姓名、经历、技能等结构化信息
Manual rag/app/manual.py manual.chunk() 手册类文档,支持层次化章节结构解析
Table rag/app/table.py table.chunk() 表格数据专用,保持表格结构和关系
Paper rag/app/paper.py paper.chunk() 学术论文专用,能识别摘要、作者、章节等学术结构
Book rag/app/book.py book.chunk() 长篇书籍文档,支持目录识别和智能章节分割
Laws rag/app/laws.py laws.chunk() 法律法规文档,按条款和章节进行结构化分割
Presentation rag/app/presentation.py presentation.chunk() PPT演示文档,按页面分割并保留图像信息
One rag/app/one.py one.chunk() 整文档模式,将整个文档作为单一chunk保持完整性
Picture rag/app/picture.py picture.chunk() 图片文档解析,提取图像信息和OCR文本
Audio rag/app/audio.py audio.chunk() 音频文档解析,语音转文字处理
Email rag/app/email.py email.chunk() 邮件文档解析,提取邮件头、正文等结构化信息
Q&A rag/app/qa.py qa.chunk() 问答对文档解析,识别问题-答案结构
Tag rag/app/tag.py tag.chunk() 标签文档解析,处理标签化的结构数据
Knowledge Graph rag/app/naive.py naive.chunk() 知识图谱模式,复用通用解析器但支持图谱构建
工厂模式实现

rag/svr/task_executor.py 中,系统通过工厂模式将前端选择映射到具体的解析器:

from rag.app import laws, paper, presentation, manual, qa, table, book, resume, picture, naive, one, audio, email, tag

FACTORY = {
    "general": naive,
    ParserType.NAIVE.value: naive,           // 通用文档解析器,适用于PDF、Word、HTML等常规格式
    ParserType.PAPER.value: paper,           // 学术论文解析器,能识别摘要、作者、章节等学术结构
    ParserType.BOOK.value: book,             // 书籍解析器,支持目录识别和章节分割
    ParserType.PRESENTATION.value: presentation, // 演示文稿(PPT)解析器,按页面分割并保留图像信息
    ParserType.MANUAL.value: manual,         // 手册类文档解析器,支持层次化章节结构
    ParserType.LAWS.value: laws,             // 法律法规解析器,按条款和章节结构化分割
    ParserType.QA.value: qa,                 // 问答类文档解析器
    ParserType.TABLE.value: table,           // 表格数据解析器,保持表格结构和关系
    ParserType.RESUME.value: resume,         // 简历解析器,能识别姓名、经历、技能等结构化信息
    ParserType.PICTURE.value: picture,       // 图片文档解析器
    ParserType.ONE.value: one,               // 整文档模式,将整个文档作为单一chunk
    ParserType.AUDIO.value: audio,           // 音频文档解析器
    ParserType.EMAIL.value: email,           // 邮件文档解析器
    ParserType.KG.value: naive,              // 知识图谱模式,复用naive解析器
    ParserType.TAG.value: tag                // 标签文档解析器
}

当用户在前端选择某种切片方法时,系统会根据选择调用对应的解析器:

# 根据用户选择获取解析器
chunker = FACTORY[task["parser_id"].lower()] // task["parser_id"].lower() 示例:table
# 调用对应解析器的chunk方法
chunks = chunker.chunk(filename, binary=binary, **params)
Chunk数据格式详解

每个解析器的 chunk() 方法都会返回一个chunk列表,每个chunk是一个包含标准化字段的字典对象。不同类型的解析器会生成一些特定的字段,但所有chunk都包含以下核心字段:

基础字段(所有chunk共有):

字段名 数据类型 说明 示例
docnm_kwd string 源文档文件名(关键词化) "技术文档.pdf"
title_tks list 文档标题的分词结果 ["技术", "文档"]
title_sm_tks list 文档标题的细粒度分词 ["技", "术", "文", "档"]
content_with_weight string 带权重的内容文本 "这是文档的主要内容..."
content_ltks list 内容的标准分词结果 ["这是", "文档", "的", "主要", "内容"]
content_sm_ltks list 内容的细粒度分词 ["这", "是", "文", "档", "的", "主", "要", "内", "容"]

位置字段(PDF等结构化文档):

字段名 数据类型 说明 示例
page_num_int list[int] chunk所在页面编号列表 [1, 2]
position_int list[tuple] 精确位置坐标(页面, 左, 右, 顶, 底) [(1, 100, 500, 200, 300)]
top_int list[int] chunk在页面中的顶部位置 [200]

图像字段(包含图片的chunk):

字段名 数据类型 说明 示例
image PIL.Image 关联的图像对象 <PIL.Image.Image>
doc_type_kwd string 文档类型标识 "image"

其它解析器不再一一展开介绍,需要用到的时候可以读对应源码。

3. 数据增强阶段(通过设定的AI Promt提示词生成新的数据)

如果切片方法选择 General、Book、One 等,可以在前端界面上指定 自动关键词提取、自动问题提取 数量。如果指定数量不为 0,那么在基础的文档解析完成后,Ragflow 还会调用AI服务来增强chunk的语义信息。

3.1 自动关键词提取

当配置 自动关键词提取(auto_keywords) 数量大于 0 时,系统会为每个chunk自动生成指定数量的关键词。

关键词提取的prompt (rag/prompts/keyword_prompt.md) 如下:

## Role
You are a text analyzer.

## Task
Extract the most important keywords/phrases of a given piece of text content.

## Requirements
- Summarize the text content, and give the top {{ topn }} important keywords/phrases.
- The keywords MUST be in the same language as the given piece of text content.
- The keywords are delimited by ENGLISH COMMA.
- Output keywords ONLY.

---

## Text Content
{{ content }}

其中 content 是 Chunk 的文本,topn 是用户配置的关键词数量。

3.2 自动问题提取

当配置 自动问题提取(auto_questions) 数量大于 0 时,系统会为每个chunk自动生成指定数量的问题。

问题提取的prompt (rag/prompts/question_prompt.md) 如下:

## Role
You are a text analyzer.

## Task
Propose {{ topn }} questions about a given piece of text content.

## Requirements
- Understand and summarize the text content, and propose the top {{ topn }} important questions.
- The questions SHOULD NOT have overlapping meanings.
- The questions SHOULD cover the main content of the text as much as possible.
- The questions MUST be in the same language as the given piece of text content.
- One question per line.
- Output questions ONLY.

---

## Text Content
{{ content }}

其中 content 也是 Chunk 的文本,topn 是用户配置的问题数量。

4. Embedding计算阶段

Embedding计算是将文本转换为向量表示的关键步骤,这些向量将用于后续的语义检索。Ragflow 采用了标题和内容加权融合的embedding策略,具体实现位于 rag/svr/task_executor.pyembedding 函数中。

Ragflow 计算 Chunk 的 embedding 时,计算策略的伪代码如下:

content = AI提取的问题 if exists(AI提取的问题) else 原始Chunk文本
final_embeding = encode_as_embeding(Chunk所在文档的标题文本) * 0.1 + encode_as_embeding(content) * 0.9

说明:每个 Chunk 的 embedding 向量为 文档标题文本的 embedding 向量乘以 0.1,和 Chunk 的内容文本的 embedding 向量乘以 0.9 的和。其中 Chunk 的内容文本优先设置为对该Chunk的AI提取出的问题,如果没有指定自动问题提取数量,则问题为空,此时才会使用原始的 Chunk 文本。

具体源码的注释如下:

async def embedding(docs, mdl, parser_config=None, callback=None):
    """
    为文档chunks计算embedding向量
    Args:
        docs: chunk列表,每个chunk是一个字典
        mdl: embedding模型对象
        parser_config: 解析器配置,包含embedding权重等参数
        callback: 进度回调函数
    Returns:
        tk_count: 总token消耗数量
        vector_size: embedding向量的维度
    """
    if parser_config is None:
        parser_config = {}
    
    # 第一步:准备每个Chunk的标题文本和内容文本
    tts, cnts = [], []  # tts=titles标题列表, cnts=contents内容列表
    
    for d in docs:
        # 标题文本:使用文档名称,所有chunk共享同一个文档标题
        tts.append(d.get("docnm_kwd", "Title"))
        
        # 内容文本:优先使用AI生成的问题,如果没有则使用原始chunk内容
        c = "\n".join(d.get("question_kwd", []))  # 将AI生成的问题列表合并为一个字符串
        if not c:  # 如果没有AI问题,则使用原始内容
            c = d["content_with_weight"]
        
        c = re.sub(r"</?(table|td|caption|tr|th)( [^<>]{0,12})?>", " ", c)
        if not c:  
            c = "None"
        cnts.append(c)

    # 第二步:计算标题embedding向量(只计算一次,然后复制给相同文档的所有chunk)
    tk_count = 0  
    if len(tts) == len(cnts):  
        # 只对第一个标题进行embedding编码
        vts, c = await trio.to_thread.run_sync(lambda: mdl.encode(tts[0: 1]))
        # 将标题向量复制给所有chunk(因为同一文档的所有chunk共享标题)
        tts = np.concatenate([vts for _ in range(len(tts))], axis=0)
        tk_count += c  

    # 第三步:分批计算内容embedding向量(避免内存溢出)
    cnts_ = np.array([])  # 存储所有内容向量
    for i in range(0, len(cnts), EMBEDDING_BATCH_SIZE):
        async with embed_limiter: 
            # 对当前批次的内容进行embedding
            vts, c = await trio.to_thread.run_sync(
                lambda: mdl.encode([
                    truncate(c, mdl.max_length-10)  
                    for c in cnts[i: i + EMBEDDING_BATCH_SIZE]
                ])
            )
        # 拼接当前批次的向量到总向量数组
        if len(cnts_) == 0:
            cnts_ = vts
        else:
            cnts_ = np.concatenate((cnts_, vts), axis=0)
        tk_count += c  
        callback(prog=0.7 + 0.2 * (i + 1) / len(cnts), msg="")
    cnts = cnts_  # 将批量计算出的 content embeding 结果赋值给cnts
    
    # 第四步:加权融合标题向量和内容向量
    # 获取标题权重配置,默认为0.1(即10%)
    filename_embd_weight = parser_config.get("filename_embd_weight", 0.1)
    if not filename_embd_weight:  # 处理None值的情况
        filename_embd_weight = 0.1
    title_w = float(filename_embd_weight)  # 标题权重
    logging.info("标题权重 title_w: %s, 内容权重 1-title_w: %s", title_w, 1-title_w)
    
    # 加权融合公式:final_vector = title_weight * title_vector + content_weight * content_vector
    # 如果标题和内容向量数量一致,则进行融合;否则只使用内容向量
    vects = (title_w * tts + (1 - title_w) * cnts) if len(tts) == len(cnts) else cnts

    # 第五步:将最终embedding向量存储到每个chunk中
    assert len(vects) == len(docs)  # 确保向量数量与chunk数量一致
    vector_size = 0
    for i, d in enumerate(docs):
        v = vects[i].tolist()  # 将numpy数组转换为Python列表,便于JSON序列化
        vector_size = len(v)  # 记录向量维度
        # 存储向量到chunk中,字段名格式为 q_{维度}_vec(如q_768_vec、q_1024_vec)
        d["q_%d_vec" % len(v)] = v
    
    return tk_count, vector_size  # 返回token消耗和向量维度

5. ES存储阶段

最终处理后的每个Chunk数据会被存入 Doc Store 中,默认是 ES (rag\utils\es_conn.py)。

最终 ES Chunk 结构示例

{
  "id": "abc123...",
  "doc_id": "doc_001",
  "kb_id": "kb_001",
  "docnm_kwd": "技术文档.pdf",
  "title_tks": "技术 文档",
  "title_sm_tks": "技 术 文 档",
  "content_with_weight": "这是一段技术内容...",
  "content_ltks": "这是 一段 技术 内容",
  "content_sm_ltks": "这 是 一 段 技 术 内 容",
  "important_kwd": ["技术", "内容", "文档"],
  "important_tks": "技术 内容 文档",
  "question_kwd": ["什么是技术?", "如何使用?"],
  "question_tks": "什么 是 技术 如何 使用",
  "q_768_vec": [0.1, 0.2, ..., 0.8],  // 768维embedding向量
  "page_num_int": [1],
  "position_int": [(1, 100, 500, 200, 300)],
  "create_time": "2024-01-01 12:00:00",
  "create_timestamp_flt": 1704067200.0
}

网站公告

今日签到

点亮在社区的每一天
去签到