Tracing through the Dify source code: a document uploaded by a user is first handled by the API layer and then passed to the file-processing service layer. For knowledge management, an uploaded PDF enters the indexing pipeline via IndexingRunner (indexing_runner.py); this step normally runs asynchronously as a Celery task (document_indexing_task.py). ExtractProcessor acts as the central dispatcher for document processing and selects a concrete Extractor based on the document format. The PdfExtractor class is dedicated to PDF files: it uses pypdfium2, an efficient PDF-parsing library, to read PDF content page by page.
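To make the page-by-page reading concrete, here is a minimal sketch of the underlying pypdfium2 pattern. This is not Dify's PdfExtractor itself, only the library-level idiom it builds on; sample.pdf is a placeholder path.

```python
import pypdfium2 as pdfium

# Open a PDF and collect the text of each page in order.
pdf = pdfium.PdfDocument("sample.pdf")  # placeholder path
pages_text = []
for page in pdf:
    textpage = page.get_textpage()
    pages_text.append(textpage.get_text_range())  # full text of this page
    textpage.close()
    page.close()
pdf.close()

print(f"extracted {len(pages_text)} pages")
```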
PDF Document Parsing Workflow
Dify's file parsing is a layered architecture built around a few core components:
Core Architecture
1. Base abstract class
Dify defines an abstract base class, BaseExtractor, which gives every file extractor a unified interface:
"""Abstract interface for document loader implementations."""
from abc import ABC, abstractmethod
class BaseExtractor(ABC):
"""Interface for extract files."""
@abstractmethod
def extract(self):
raise NotImplementedError
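To show how this contract is meant to be used, here is a minimal hypothetical subclass. PlainTextExtractor is an invented name for illustration, and the import paths are assumptions based on Dify's layout, not verified excerpts:

```python
from core.rag.extractor.extractor_base import BaseExtractor  # assumed module path
from core.rag.models.document import Document  # assumed module path


class PlainTextExtractor(BaseExtractor):
    """Hypothetical extractor: load a UTF-8 text file as a single Document."""

    def __init__(self, file_path: str):
        self._file_path = file_path

    def extract(self) -> list[Document]:
        with open(self._file_path, encoding="utf-8") as f:
            text = f.read()
        # A Dify Document carries the text in page_content plus a metadata dict.
        return [Document(page_content=text, metadata={"source": self._file_path})]
```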
2. Central dispatcher
The ExtractProcessor class is the core coordinator: based on the file type, it selects the appropriate extractor to handle files in different formats.
Key methods (a usage sketch follows the list):
- load_from_upload_file
  Input: an UploadFile object.
  Behavior: extracts content from the uploaded file, returning either a list of Document objects or plain text.
- load_from_url
  Input: the URL of a file or web page.
  Behavior: fetches the remote content through ssrf_proxy, infers the file type automatically, saves it to a local temporary file, and then extracts it.
- extract
  Input: an ExtractSetting (extraction settings) plus an optional file_path.
  Behavior: based on the data-source type (local file, Notion, or website) and the file type, selects the appropriate extractor, runs it, and returns a list of Document objects.
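A hedged sketch of how these entry points are typically driven. The import paths are assumptions based on Dify's layout, and upload_file stands in for an UploadFile record loaded elsewhere (e.g. from the database), so this illustrates the call shapes rather than being a standalone script:

```python
from core.rag.extractor.entity.extract_setting import ExtractSetting  # assumed path
from core.rag.extractor.extract_processor import ExtractProcessor  # assumed path

# upload_file: an UploadFile record obtained from Dify's database (assumed here).
documents = ExtractProcessor.load_from_upload_file(upload_file)
text = ExtractProcessor.load_from_upload_file(upload_file, return_text=True)

# Equivalently, drive extract() directly with an explicit ExtractSetting:
extract_setting = ExtractSetting(
    datasource_type="upload_file",
    upload_file=upload_file,
    document_model="text_model",
)
documents = ExtractProcessor.extract(extract_setting)
```

The full implementation follows.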
```python
# Excerpt: ExtractProcessor from Dify's extract_processor.py (imports omitted).
SUPPORT_URL_CONTENT_TYPES = ["application/pdf", "text/plain", "application/json"]
USER_AGENT = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124"
    " Safari/537.36"
)


class ExtractProcessor:
    @classmethod
    def load_from_upload_file(
        cls, upload_file: UploadFile, return_text: bool = False, is_automatic: bool = False
    ) -> Union[list[Document], str]:
        extract_setting = ExtractSetting(
            datasource_type="upload_file", upload_file=upload_file, document_model="text_model"
        )
        if return_text:
            delimiter = "\n"
            return delimiter.join([document.page_content for document in cls.extract(extract_setting, is_automatic)])
        else:
            return cls.extract(extract_setting, is_automatic)

    @classmethod
    def load_from_url(cls, url: str, return_text: bool = False) -> Union[list[Document], str]:
        response = ssrf_proxy.get(url, headers={"User-Agent": USER_AGENT})

        with tempfile.TemporaryDirectory() as temp_dir:
            suffix = Path(url).suffix
            if not suffix and suffix != ".":
                # get content-type
                if response.headers.get("Content-Type"):
                    suffix = "." + response.headers.get("Content-Type").split("/")[-1]
                else:
                    content_disposition = response.headers.get("Content-Disposition")
                    filename_match = re.search(r'filename="([^"]+)"', content_disposition)
                    if filename_match:
                        filename = unquote(filename_match.group(1))
                        match = re.search(r"\.(\w+)$", filename)
                        if match:
                            suffix = "." + match.group(1)
                        else:
                            suffix = ""
            # FIXME mypy: Cannot determine type of 'tempfile._get_candidate_names' better not use it here
            file_path = f"{temp_dir}/{next(tempfile._get_candidate_names())}{suffix}"  # type: ignore
            Path(file_path).write_bytes(response.content)
            extract_setting = ExtractSetting(datasource_type="upload_file", document_model="text_model")
            if return_text:
                delimiter = "\n"
                return delimiter.join(
                    [
                        document.page_content
                        for document in cls.extract(extract_setting=extract_setting, file_path=file_path)
                    ]
                )
            else:
                return cls.extract(extract_setting=extract_setting, file_path=file_path)

    @classmethod
    def extract(
        cls, extract_setting: ExtractSetting, is_automatic: bool = False, file_path: Optional[str] = None
    ) -> list[Document]:
        # Local file datasource: download from storage if needed, then dispatch by extension.
        if extract_setting.datasource_type == DatasourceType.FILE.value:
            with tempfile.TemporaryDirectory() as temp_dir:
                if not file_path:
                    assert extract_setting.upload_file is not None, "upload_file is required"
                    upload_file: UploadFile = extract_setting.upload_file
                    suffix = Path(upload_file.key).suffix
                    # FIXME mypy: Cannot determine type of 'tempfile._get_candidate_names' better not use it here
                    file_path = f"{temp_dir}/{next(tempfile._get_candidate_names())}{suffix}"  # type: ignore
                    storage.download(upload_file.key, file_path)
                input_file = Path(file_path)
                file_extension = input_file.suffix.lower()
                etl_type = dify_config.ETL_TYPE
                extractor: Optional[BaseExtractor] = None
                if etl_type == "Unstructured":
                    unstructured_api_url = dify_config.UNSTRUCTURED_API_URL or ""
                    unstructured_api_key = dify_config.UNSTRUCTURED_API_KEY or ""
                    if file_extension in {".xlsx", ".xls"}:
                        extractor = ExcelExtractor(file_path)
                    elif file_extension == ".pdf":
                        extractor = PdfExtractor(file_path)
                    elif file_extension in {".md", ".markdown", ".mdx"}:
                        extractor = (
                            UnstructuredMarkdownExtractor(file_path, unstructured_api_url, unstructured_api_key)
                            if is_automatic
                            else MarkdownExtractor(file_path, autodetect_encoding=True)
                        )
                    # Select the concrete extractor class for the document type
                    # (further format branches are elided in this excerpt).
                    elif file_extension == ".epub":
                        extractor = UnstructuredEpubExtractor(file_path, unstructured_api_url, unstructured_api_key)
                    else:
                        # txt
                        extractor = TextExtractor(file_path, autodetect_encoding=True)
                else:
                    if file_extension in {".xlsx", ".xls"}:
                        extractor = ExcelExtractor(file_path)
                    elif file_extension == ".pdf":
                        extractor = PdfExtractor(file_path)
                    elif file_extension in {".md", ".markdown", ".mdx"}:
                        extractor = MarkdownExtractor(file_path, autodetect_encoding=True)
                    elif file_extension in {".htm", ".html"}:
                        extractor = HtmlExtractor(file_path)
                    elif file_extension == ".docx":
                        extractor = WordExtractor(file_path, upload_file.tenant_id, upload_file.created_by)
                    elif file_extension == ".csv":
                        extractor = CSVExtractor(file_path, autodetect_encoding=True)
                    elif file_extension == ".epub":
                        extractor = UnstructuredEpubExtractor(file_path)
                    else:
                        # txt
                        extractor = TextExtractor(file_path, autodetect_encoding=True)
                return extractor.extract()
        # Notion datasource.
        elif extract_setting.datasource_type == DatasourceType.NOTION.value:
            assert extract_setting.notion_info is not None, "notion_info is required"
            extractor = NotionExtractor(
                notion_workspace_id=extract_setting.notion_info.notion_workspace_id,
                notion_obj_id=extract_setting.notion_info.notion_obj_id,
                notion_page_type=extract_setting.notion_info.notion_page_type,
                document_model=extract_setting.notion_info.document,
                tenant_id=extract_setting.notion_info.tenant_id,
            )
            return extractor.extract()
        # Website datasource: dispatch by crawl provider.
        elif extract_setting.datasource_type == DatasourceType.WEBSITE.value:
            assert extract_setting.website_info is not None, "website_info is required"
            if extract_setting.website_info.provider == "firecrawl":
                extractor = FirecrawlWebExtractor(
                    url=extract_setting.website_info.url,
                    job_id=extract_setting.website_info.job_id,
                    tenant_id=extract_setting.website_info.tenant_id,
                    mode=extract_setting.website_info.mode,
                    only_main_content=extract_setting.website_info.only_main_content,
                )
                return extractor.extract()
            elif extract_setting.website_info.provider == "watercrawl":
                extractor = WaterCrawlWebExtractor(
                    url=extract_setting.website_info.url,
                    job_id=extract_setting.website_info.job_id,
                    tenant_id=extract_setting.website_info.tenant_id,
                    mode=extract_setting.website_info.mode,
                    only_main_content=extract_setting.website_info.only_main_content,
                )
                return extractor.extract()
            elif extract_setting.website_info.provider == "jinareader":
                extractor = JinaReaderWebExtractor(
                    url=extract_setting.website_info.url,
                    job_id=extract_setting.website_info.job_id,
                    tenant_id=extract_setting.website_info.tenant_id,
                    mode=extract_setting.website_info.mode,
                    only_main_content=extract_setting.website_info.only_main_content,
                )
                return extractor.extract()
            else:
                raise ValueError(f"Unsupported website provider: {extract_setting.website_info.provider}")
        else:
            raise ValueError(f"Unsupported datasource type: {extract_setting.datasource_type}")
```
This extraction layer is deeply integrated into Dify's RAG and workflow systems, providing a strong foundation for knowledge-base construction and document processing.