RAG实战之dify源码文件解析-pdf文件解析流程

发布于:2025-07-10 ⋅ 阅读:(22) ⋅ 点赞:(0)

通过对dify源码的解析,用户上传的文档首先经过api处理后传递给文件处理服务层,对于知识管理,上传的 PDF 通过 IndexingRunnerindexing_runner.py进入索引管道。 这个过程通常通过 Celery tasksdocument_indexing_task.py 异步执行。ExtractProcessor作为文档处理的中央处理器根据文档的格式选择具体的ExtractorPdfExtractor 类专门用于 PDF 文件,利用使用 pypdfium2 这个高效的 PDF 解析库,按页读取 PDF 内容。
在这里插入图片描述

PDF格式文档解析工作流程

User "Upload API" "FileService" "IndexingRunner" "ExtractProcessor" "PdfExtractor" "CleanProcessor" Upload PDF file Validate and save file Return file metadata Trigger indexing process Extract text from document Process PDF file Use pypdfium2 to extract text Return extracted text Clean extracted text Return processed text Store in knowledge base User "Upload API" "FileService" "IndexingRunner" "ExtractProcessor" "PdfExtractor" "CleanProcessor"

Dify的文件解析功能是一个分层的系统架构,主要通过以下几个核心组件来实现:

核心架构

1. 基础抽象类

Dify定义了一个抽象基类BaseExtractor,为所有文件提取器提供统一的接口

"""Abstract interface for document loader implementations."""

from abc import ABC, abstractmethod


class BaseExtractor(ABC):
    """Interface for extract files."""

    @abstractmethod
    def extract(self):
        raise NotImplementedError

2. 中央处理器

ExtractProcessor类作为核心协调器,负责根据文件类型选择合适的提取器来处理不同格式的文件 。
主要方法说明

  • load_from_upload_file
    输入:UploadFile 对象
    功能:从上传的文件中抽取内容。可以返回 Document 列表,也可以只返回文本内容。
  • load_from_url
    输入:文件/网页的 URL
    功能:通过 ssrf_proxy 获取远程文件内容,自动推断文件类型,保存到本地临时文件后进行抽取。
  • extract
    输入:ExtractSetting(抽取设置),可选 file_path
    功能:根据数据源类型(本地文件、Notion、网站)和文件类型,选择合适的抽取器并执行抽取,返回 Document 列表。
SUPPORT_URL_CONTENT_TYPES = ["application/pdf", "text/plain", "application/json"]
USER_AGENT = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124"
    " Safari/537.36"
)


class ExtractProcessor:
    @classmethod
    def load_from_upload_file(
        cls, upload_file: UploadFile, return_text: bool = False, is_automatic: bool = False
    ) -> Union[list[Document], str]:
        extract_setting = ExtractSetting(
            datasource_type="upload_file", upload_file=upload_file, document_model="text_model"
        )
        if return_text:
            delimiter = "\n"
            return delimiter.join([document.page_content for document in cls.extract(extract_setting, is_automatic)])
        else:
            return cls.extract(extract_setting, is_automatic)

    @classmethod
    def load_from_url(cls, url: str, return_text: bool = False) -> Union[list[Document], str]:
        response = ssrf_proxy.get(url, headers={"User-Agent": USER_AGENT})

        with tempfile.TemporaryDirectory() as temp_dir:
            suffix = Path(url).suffix
            if not suffix and suffix != ".":
                # get content-type
                if response.headers.get("Content-Type"):
                    suffix = "." + response.headers.get("Content-Type").split("/")[-1]
                else:
                    content_disposition = response.headers.get("Content-Disposition")
                    filename_match = re.search(r'filename="([^"]+)"', content_disposition)
                    if filename_match:
                        filename = unquote(filename_match.group(1))
                        match = re.search(r"\.(\w+)$", filename)
                        if match:
                            suffix = "." + match.group(1)
                        else:
                            suffix = ""
            # FIXME mypy: Cannot determine type of 'tempfile._get_candidate_names' better not use it here
            file_path = f"{temp_dir}/{next(tempfile._get_candidate_names())}{suffix}"  # type: ignore
            Path(file_path).write_bytes(response.content)
            extract_setting = ExtractSetting(datasource_type="upload_file", document_model="text_model")
            if return_text:
                delimiter = "\n"
                return delimiter.join(
                    [
                        document.page_content
                        for document in cls.extract(extract_setting=extract_setting, file_path=file_path)
                    ]
                )
            else:
                return cls.extract(extract_setting=extract_setting, file_path=file_path)

    @classmethod
    def extract(
        cls, extract_setting: ExtractSetting, is_automatic: bool = False, file_path: Optional[str] = None
    ) -> list[Document]:
        if extract_setting.datasource_type == DatasourceType.FILE.value:
            with tempfile.TemporaryDirectory() as temp_dir:
                if not file_path:
                    assert extract_setting.upload_file is not None, "upload_file is required"
                    upload_file: UploadFile = extract_setting.upload_file
                    suffix = Path(upload_file.key).suffix
                    # FIXME mypy: Cannot determine type of 'tempfile._get_candidate_names' better not use it here
                    file_path = f"{temp_dir}/{next(tempfile._get_candidate_names())}{suffix}"  # type: ignore
                    storage.download(upload_file.key, file_path)
                input_file = Path(file_path)
                file_extension = input_file.suffix.lower()
                etl_type = dify_config.ETL_TYPE
                extractor: Optional[BaseExtractor] = None
                if etl_type == "Unstructured":
                    unstructured_api_url = dify_config.UNSTRUCTURED_API_URL or ""
                    unstructured_api_key = dify_config.UNSTRUCTURED_API_KEY or ""

                    if file_extension in {".xlsx", ".xls"}:
                        extractor = ExcelExtractor(file_path)
                    elif file_extension == ".pdf":
                        extractor = PdfExtractor(file_path)
                    elif file_extension in {".md", ".markdown", ".mdx"}:
                        extractor = (
                            UnstructuredMarkdownExtractor(file_path, unstructured_api_url, unstructured_api_key)
                            if is_automatic
                            else MarkdownExtractor(file_path, autodetect_encoding=True)
                        )
                   /**
                   选择具体文档提取类
                   **/
                    elif file_extension == ".epub":
                        extractor = UnstructuredEpubExtractor(file_path, unstructured_api_url, unstructured_api_key)
                    else:
                        # txt
                        extractor = TextExtractor(file_path, autodetect_encoding=True)
                else:
                    if file_extension in {".xlsx", ".xls"}:
                        extractor = ExcelExtractor(file_path)
                    elif file_extension == ".pdf":
                        extractor = PdfExtractor(file_path)
                    elif file_extension in {".md", ".markdown", ".mdx"}:
                        extractor = MarkdownExtractor(file_path, autodetect_encoding=True)
                    elif file_extension in {".htm", ".html"}:
                        extractor = HtmlExtractor(file_path)
                    elif file_extension == ".docx":
                        extractor = WordExtractor(file_path, upload_file.tenant_id, upload_file.created_by)
                    elif file_extension == ".csv":
                        extractor = CSVExtractor(file_path, autodetect_encoding=True)
                    elif file_extension == ".epub":
                        extractor = UnstructuredEpubExtractor(file_path)
                    else:
                        # txt
                        extractor = TextExtractor(file_path, autodetect_encoding=True)
                return extractor.extract()
        elif extract_setting.datasource_type == DatasourceType.NOTION.value:
            assert extract_setting.notion_info is not None, "notion_info is required"
            extractor = NotionExtractor(
                notion_workspace_id=extract_setting.notion_info.notion_workspace_id,
                notion_obj_id=extract_setting.notion_info.notion_obj_id,
                notion_page_type=extract_setting.notion_info.notion_page_type,
                document_model=extract_setting.notion_info.document,
                tenant_id=extract_setting.notion_info.tenant_id,
            )
            return extractor.extract()
        elif extract_setting.datasource_type == DatasourceType.WEBSITE.value:
            assert extract_setting.website_info is not None, "website_info is required"
            if extract_setting.website_info.provider == "firecrawl":
                extractor = FirecrawlWebExtractor(
                    url=extract_setting.website_info.url,
                    job_id=extract_setting.website_info.job_id,
                    tenant_id=extract_setting.website_info.tenant_id,
                    mode=extract_setting.website_info.mode,
                    only_main_content=extract_setting.website_info.only_main_content,
                )
                return extractor.extract()
            elif extract_setting.website_info.provider == "watercrawl":
                extractor = WaterCrawlWebExtractor(
                    url=extract_setting.website_info.url,
                    job_id=extract_setting.website_info.job_id,
                    tenant_id=extract_setting.website_info.tenant_id,
                    mode=extract_setting.website_info.mode,
                    only_main_content=extract_setting.website_info.only_main_content,
                )
                return extractor.extract()
            elif extract_setting.website_info.provider == "jinareader":
                extractor = JinaReaderWebExtractor(
                    url=extract_setting.website_info.url,
                    job_id=extract_setting.website_info.job_id,
                    tenant_id=extract_setting.website_info.tenant_id,
                    mode=extract_setting.website_info.mode,
                    only_main_content=extract_setting.website_info.only_main_content,
                )
                return extractor.extract()
            else:
                raise ValueError(f"Unsupported website provider: {extract_setting.website_info.provider}")
        else:
            raise ValueError(f"Unsupported datasource type: {extract_setting.datasource_type}")

深度集成到Dify的RAG系统和工作流系统中,为知识库构建和文档处理提供了强大的基础能力。


网站公告

今日签到

点亮在社区的每一天
去签到