1. 总体架构概览
Ragflow 的文档处理流程可以分为以下几个核心阶段:
文档上传 → 解析文档构建为Chunk → 数据增强阶段 → Embedding计算 → ES存储
- 文档上传:用户通过前端界面上传原始文档,并配置解析参数,如:切片方法、自动关键词提取、自动问题提取数量等。
- 解析文档构建为Chunk:后端根据设定的切片方法选择合适的解析器,将文档内容切分为结构化的知识片段(Chunk)
- 数据增强阶段:对切分后的 Chunk 进行数据增强,比如对每个 Chunk 通过AI提取一些关键词、检索问题等
- Embedding计算:对每个 Chunk 计算向量表示(Embedding)。
- ES存储:将完整的 Chunk 信息写入 Elasticsearch,在后续知识检索时被使用。信息包括每个 Chunk 的文本、AI提取的关键词、AI提取的问题、Embedding等
源码主要对应于:
- Task Executor (
rag/svr/task_executor.py
):解析任务执行 - Parser Factory (
rag/app/
):各种文档解析器
2. 解析文档构建Chunk阶段 (build_chunks
)
这个阶段负责将原始文档转换为结构化的知识片段。
不同类型的文档(如PDF、Word、Excel等)需要采用不同的解析策略来保证内容提取的准确性。
Ragflow 前端界面提供了多种切片方法选项,每种方法都对应特定的Python解析器类。下面详细说明前端切片方法与后端源码的对应关系:
前端切片方法与源码映射表
前端显示名称 | 对应源码文件 | 读取文档进行分开的函数名 | 适用场景 |
---|---|---|---|
General | rag/app/naive.py |
naive.chunk() |
通用文档解析,支持PDF、Word、HTML等常规格式 |
Resume | rag/app/resume.py |
resume.chunk() |
简历文档专用,能识别姓名、经历、技能等结构化信息 |
Manual | rag/app/manual.py |
manual.chunk() |
手册类文档,支持层次化章节结构解析 |
Table | rag/app/table.py |
table.chunk() |
表格数据专用,保持表格结构和关系 |
Paper | rag/app/paper.py |
paper.chunk() |
学术论文专用,能识别摘要、作者、章节等学术结构 |
Book | rag/app/book.py |
book.chunk() |
长篇书籍文档,支持目录识别和智能章节分割 |
Laws | rag/app/laws.py |
laws.chunk() |
法律法规文档,按条款和章节进行结构化分割 |
Presentation | rag/app/presentation.py |
presentation.chunk() |
PPT演示文档,按页面分割并保留图像信息 |
One | rag/app/one.py |
one.chunk() |
整文档模式,将整个文档作为单一chunk保持完整性 |
Picture | rag/app/picture.py |
picture.chunk() |
图片文档解析,提取图像信息和OCR文本 |
Audio | rag/app/audio.py |
audio.chunk() |
音频文档解析,语音转文字处理 |
rag/app/email.py |
email.chunk() |
邮件文档解析,提取邮件头、正文等结构化信息 | |
Q&A | rag/app/qa.py |
qa.chunk() |
问答对文档解析,识别问题-答案结构 |
Tag | rag/app/tag.py |
tag.chunk() |
标签文档解析,处理标签化的结构数据 |
Knowledge Graph | rag/app/naive.py |
naive.chunk() |
知识图谱模式,复用通用解析器但支持图谱构建 |
工厂模式实现
在 rag/svr/task_executor.py
中,系统通过工厂模式将前端选择映射到具体的解析器:
from rag.app import laws, paper, presentation, manual, qa, table, book, resume, picture, naive, one, audio, email, tag
FACTORY = {
"general": naive,
ParserType.NAIVE.value: naive, // 通用文档解析器,适用于PDF、Word、HTML等常规格式
ParserType.PAPER.value: paper, // 学术论文解析器,能识别摘要、作者、章节等学术结构
ParserType.BOOK.value: book, // 书籍解析器,支持目录识别和章节分割
ParserType.PRESENTATION.value: presentation, // 演示文稿(PPT)解析器,按页面分割并保留图像信息
ParserType.MANUAL.value: manual, // 手册类文档解析器,支持层次化章节结构
ParserType.LAWS.value: laws, // 法律法规解析器,按条款和章节结构化分割
ParserType.QA.value: qa, // 问答类文档解析器
ParserType.TABLE.value: table, // 表格数据解析器,保持表格结构和关系
ParserType.RESUME.value: resume, // 简历解析器,能识别姓名、经历、技能等结构化信息
ParserType.PICTURE.value: picture, // 图片文档解析器
ParserType.ONE.value: one, // 整文档模式,将整个文档作为单一chunk
ParserType.AUDIO.value: audio, // 音频文档解析器
ParserType.EMAIL.value: email, // 邮件文档解析器
ParserType.KG.value: naive, // 知识图谱模式,复用naive解析器
ParserType.TAG.value: tag // 标签文档解析器
}
当用户在前端选择某种切片方法时,系统会根据选择调用对应的解析器:
# 根据用户选择获取解析器
chunker = FACTORY[task["parser_id"].lower()] // task["parser_id"].lower() 示例:table
# 调用对应解析器的chunk方法
chunks = chunker.chunk(filename, binary=binary, **params)
Chunk数据格式详解
每个解析器的 chunk()
方法都会返回一个chunk列表,每个chunk是一个包含标准化字段的字典对象。不同类型的解析器会生成一些特定的字段,但所有chunk都包含以下核心字段:
基础字段(所有chunk共有):
字段名 | 数据类型 | 说明 | 示例 |
---|---|---|---|
docnm_kwd |
string | 源文档文件名(关键词化) | "技术文档.pdf" |
title_tks |
list | 文档标题的分词结果 | ["技术", "文档"] |
title_sm_tks |
list | 文档标题的细粒度分词 | ["技", "术", "文", "档"] |
content_with_weight |
string | 带权重的内容文本 | "这是文档的主要内容..." |
content_ltks |
list | 内容的标准分词结果 | ["这是", "文档", "的", "主要", "内容"] |
content_sm_ltks |
list | 内容的细粒度分词 | ["这", "是", "文", "档", "的", "主", "要", "内", "容"] |
位置字段(PDF等结构化文档):
字段名 | 数据类型 | 说明 | 示例 |
---|---|---|---|
page_num_int |
list[int] | chunk所在页面编号列表 | [1, 2] |
position_int |
list[tuple] | 精确位置坐标(页面, 左, 右, 顶, 底) | [(1, 100, 500, 200, 300)] |
top_int |
list[int] | chunk在页面中的顶部位置 | [200] |
图像字段(包含图片的chunk):
字段名 | 数据类型 | 说明 | 示例 |
---|---|---|---|
image |
PIL.Image | 关联的图像对象 | <PIL.Image.Image> |
doc_type_kwd |
string | 文档类型标识 | "image" |
其它解析器不再一一展开介绍,需要用到的时候可以读对应源码。
3. 数据增强阶段(通过设定的AI Promt提示词生成新的数据)
如果切片方法选择 General、Book、One 等,可以在前端界面上指定 自动关键词提取、自动问题提取 数量。如果指定数量不为 0,那么在基础的文档解析完成后,Ragflow 还会调用AI服务来增强chunk的语义信息。
3.1 自动关键词提取
当配置 自动关键词提取(auto_keywords) 数量大于 0 时,系统会为每个chunk自动生成指定数量的关键词。
关键词提取的prompt (rag/prompts/keyword_prompt.md
) 如下:
## Role
You are a text analyzer.
## Task
Extract the most important keywords/phrases of a given piece of text content.
## Requirements
- Summarize the text content, and give the top {{ topn }} important keywords/phrases.
- The keywords MUST be in the same language as the given piece of text content.
- The keywords are delimited by ENGLISH COMMA.
- Output keywords ONLY.
---
## Text Content
{{ content }}
其中 content 是 Chunk 的文本,topn 是用户配置的关键词数量。
3.2 自动问题提取
当配置 自动问题提取(auto_questions) 数量大于 0 时,系统会为每个chunk自动生成指定数量的问题。
问题提取的prompt (rag/prompts/question_prompt.md
) 如下:
## Role
You are a text analyzer.
## Task
Propose {{ topn }} questions about a given piece of text content.
## Requirements
- Understand and summarize the text content, and propose the top {{ topn }} important questions.
- The questions SHOULD NOT have overlapping meanings.
- The questions SHOULD cover the main content of the text as much as possible.
- The questions MUST be in the same language as the given piece of text content.
- One question per line.
- Output questions ONLY.
---
## Text Content
{{ content }}
其中 content 也是 Chunk 的文本,topn 是用户配置的问题数量。
4. Embedding计算阶段
Embedding计算是将文本转换为向量表示的关键步骤,这些向量将用于后续的语义检索。Ragflow 采用了标题和内容加权融合的embedding策略,具体实现位于 rag/svr/task_executor.py
的 embedding
函数中。
Ragflow 计算 Chunk 的 embedding 时,计算策略的伪代码如下:
content = AI提取的问题 if exists(AI提取的问题) else 原始Chunk文本
final_embeding = encode_as_embeding(Chunk所在文档的标题文本) * 0.1 + encode_as_embeding(content) * 0.9
说明:每个 Chunk 的 embedding 向量为 文档标题文本的 embedding 向量乘以 0.1,和 Chunk 的内容文本的 embedding 向量乘以 0.9 的和。其中 Chunk 的内容文本优先设置为对该Chunk的AI提取出的问题,如果没有指定自动问题提取数量,则问题为空,此时才会使用原始的 Chunk 文本。
具体源码的注释如下:
async def embedding(docs, mdl, parser_config=None, callback=None):
"""
为文档chunks计算embedding向量
Args:
docs: chunk列表,每个chunk是一个字典
mdl: embedding模型对象
parser_config: 解析器配置,包含embedding权重等参数
callback: 进度回调函数
Returns:
tk_count: 总token消耗数量
vector_size: embedding向量的维度
"""
if parser_config is None:
parser_config = {}
# 第一步:准备每个Chunk的标题文本和内容文本
tts, cnts = [], [] # tts=titles标题列表, cnts=contents内容列表
for d in docs:
# 标题文本:使用文档名称,所有chunk共享同一个文档标题
tts.append(d.get("docnm_kwd", "Title"))
# 内容文本:优先使用AI生成的问题,如果没有则使用原始chunk内容
c = "\n".join(d.get("question_kwd", [])) # 将AI生成的问题列表合并为一个字符串
if not c: # 如果没有AI问题,则使用原始内容
c = d["content_with_weight"]
c = re.sub(r"</?(table|td|caption|tr|th)( [^<>]{0,12})?>", " ", c)
if not c:
c = "None"
cnts.append(c)
# 第二步:计算标题embedding向量(只计算一次,然后复制给相同文档的所有chunk)
tk_count = 0
if len(tts) == len(cnts):
# 只对第一个标题进行embedding编码
vts, c = await trio.to_thread.run_sync(lambda: mdl.encode(tts[0: 1]))
# 将标题向量复制给所有chunk(因为同一文档的所有chunk共享标题)
tts = np.concatenate([vts for _ in range(len(tts))], axis=0)
tk_count += c
# 第三步:分批计算内容embedding向量(避免内存溢出)
cnts_ = np.array([]) # 存储所有内容向量
for i in range(0, len(cnts), EMBEDDING_BATCH_SIZE):
async with embed_limiter:
# 对当前批次的内容进行embedding
vts, c = await trio.to_thread.run_sync(
lambda: mdl.encode([
truncate(c, mdl.max_length-10)
for c in cnts[i: i + EMBEDDING_BATCH_SIZE]
])
)
# 拼接当前批次的向量到总向量数组
if len(cnts_) == 0:
cnts_ = vts
else:
cnts_ = np.concatenate((cnts_, vts), axis=0)
tk_count += c
callback(prog=0.7 + 0.2 * (i + 1) / len(cnts), msg="")
cnts = cnts_ # 将批量计算出的 content embeding 结果赋值给cnts
# 第四步:加权融合标题向量和内容向量
# 获取标题权重配置,默认为0.1(即10%)
filename_embd_weight = parser_config.get("filename_embd_weight", 0.1)
if not filename_embd_weight: # 处理None值的情况
filename_embd_weight = 0.1
title_w = float(filename_embd_weight) # 标题权重
logging.info("标题权重 title_w: %s, 内容权重 1-title_w: %s", title_w, 1-title_w)
# 加权融合公式:final_vector = title_weight * title_vector + content_weight * content_vector
# 如果标题和内容向量数量一致,则进行融合;否则只使用内容向量
vects = (title_w * tts + (1 - title_w) * cnts) if len(tts) == len(cnts) else cnts
# 第五步:将最终embedding向量存储到每个chunk中
assert len(vects) == len(docs) # 确保向量数量与chunk数量一致
vector_size = 0
for i, d in enumerate(docs):
v = vects[i].tolist() # 将numpy数组转换为Python列表,便于JSON序列化
vector_size = len(v) # 记录向量维度
# 存储向量到chunk中,字段名格式为 q_{维度}_vec(如q_768_vec、q_1024_vec)
d["q_%d_vec" % len(v)] = v
return tk_count, vector_size # 返回token消耗和向量维度
5. ES存储阶段
最终处理后的每个Chunk数据会被存入 Doc Store 中,默认是 ES (rag\utils\es_conn.py)。
最终 ES Chunk 结构示例
{
"id": "abc123...",
"doc_id": "doc_001",
"kb_id": "kb_001",
"docnm_kwd": "技术文档.pdf",
"title_tks": "技术 文档",
"title_sm_tks": "技 术 文 档",
"content_with_weight": "这是一段技术内容...",
"content_ltks": "这是 一段 技术 内容",
"content_sm_ltks": "这 是 一 段 技 术 内 容",
"important_kwd": ["技术", "内容", "文档"],
"important_tks": "技术 内容 文档",
"question_kwd": ["什么是技术?", "如何使用?"],
"question_tks": "什么 是 技术 如何 使用",
"q_768_vec": [0.1, 0.2, ..., 0.8], // 768维embedding向量
"page_num_int": [1],
"position_int": [(1, 100, 500, 200, 300)],
"create_time": "2024-01-01 12:00:00",
"create_timestamp_flt": 1704067200.0
}