An Open Standard for Lineage Metadata Collection: OpenLineage Integrations, Apache Airflow Preflight Check DAG & Class

Published: 2025-08-29

OpenLineage

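OpenLineage is an open standard for metadata and lineage collection, designed to capture data dynamically while jobs are running. It defines a generic model of Job, Run, and Dataset entities identified through a consistent naming strategy, and enriches those entities with metadata through an extensible Facets mechanism.
The project is a Graduated project of the LF AI & Data Foundation, is under active development, and welcomes community contributions.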
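To make the Job / Run / Dataset model more concrete, here is a minimal sketch of a run event built as a plain Python dictionary. It shows only a subset of fields, with names following the OpenLineage event model; the helper `build_run_event`, the namespace, job name, dataset names, and producer URL are all made-up placeholders rather than values from this article.

```python
from __future__ import annotations

import json
import uuid
from datetime import datetime, timezone


def build_run_event(event_type: str, namespace: str, job_name: str,
                    inputs: list[str], outputs: list[str]) -> dict:
    """Build a simplified OpenLineage-style run event (illustrative subset of fields)."""

    def dataset(name: str) -> dict:
        # Facets are optional metadata attached to an entity; left empty here.
        return {"namespace": namespace, "name": name, "facets": {}}

    return {
        "eventType": event_type,  # for example START, COMPLETE, or FAIL
        "eventTime": datetime.now(timezone.utc).isoformat(),
        "run": {"runId": str(uuid.uuid4()), "facets": {}},
        "job": {"namespace": namespace, "name": job_name, "facets": {}},
        "inputs": [dataset(n) for n in inputs],
        "outputs": [dataset(n) for n in outputs],
        "producer": "https://example.com/hypothetical-producer",  # placeholder
    }


event = build_run_event("COMPLETE", "demo_namespace", "demo_dag.demo_task",
                        ["raw.orders"], ["analytics.orders_daily"])
print(json.dumps(event, indent=2))
```

The same job name and namespace would appear on every run of that job, while each run gets its own `runId`.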

Preflight Check DAG

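Purpose

The preflight check DAG verifies the installation and configuration of OpenLineage in an Airflow environment. It checks the Airflow version, the versions of the installed OpenLineage packages, and the configuration settings read by the OpenLineage listener. After integrating OpenLineage with Airflow and setting the necessary environment variables, users usually need confirmation that the setup is correct before they can expect to receive OL events, which is why this validation matters.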

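Configuration Variables

The DAG exposes two variables that users can set as needed:

  • BYPASS_LATEST_VERSION_CHECK: Set to True to skip the check for the latest OpenLineage version. Useful when the PyPI URL is not reachable or when the user does not intend to upgrade.
  • LINEAGE_BACKEND: Specifies the backend that receives OpenLineage events. Defaults to MARQUEZ. For a custom backend, implement your own check logic in the _verify_custom_backend function.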
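For a custom backend, one possible shape for that check is sketched below. It assumes the transport config is a dictionary with a `url` key (which is how the Marquez check in the DAG code reads it) and that a plain GET against that URL is a meaningful health probe for your backend. Both are assumptions, and `verify_custom_backend` and its `timeout` parameter are hypothetical names used for illustration only.

```python
import logging
import urllib.error
import urllib.request

log = logging.getLogger(__name__)


def verify_custom_backend(config: dict, timeout: float = 5.0) -> None:
    """Hypothetical reachability check for a custom HTTP lineage backend."""
    url = config.get("url")
    if not url:
        raise ValueError("Transport config has no `url`; nothing to check.")
    try:
        # urlopen raises HTTPError for 4xx/5xx responses and URLError on network failures.
        with urllib.request.urlopen(url, timeout=timeout) as response:
            log.info("Backend at `%s` answered with HTTP %s", url, response.status)
    except (urllib.error.URLError, OSError) as e:
        raise ConnectionError(f"Failed to connect to custom backend at `{url}`") from e
```

A real backend may need authentication headers or a dedicated health endpoint, so treat this as a starting point rather than a complete check.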

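Implementation

The DAG is built from several key functions, each performing a specific validation:

  1. Version check: Depending on the BYPASS_LATEST_VERSION_CHECK flag, verifies that the installed OpenLineage package is the latest version available on PyPI.
  2. Airflow version compatibility: Ensures that the Airflow version is compatible with OpenLineage. The stated requirement is Airflow ≥ 2.1, although the code below rejects any version below 2.5.
  3. Transport and configuration validation: Checks that the transport settings and configuration OpenLineage needs to communicate with the specified backend are in place.
  4. Backend connectivity: Verifies the connection to the specified LINEAGE_BACKEND so that OpenLineage can send events successfully.
  5. Listener accessibility and plugin check: Ensures that the OpenLineage listener is accessible and that OpenLineage has not been disabled through an environment variable or configuration.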
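The version gate in step 2 can be sketched without Airflow installed. The thresholds 2.5 and 2.7 are taken from `_provider_can_be_used` in the DAG code below; the helper names `parse_release` and `choose_integration` are hypothetical, and the tuple-based parsing is a simplification that only handles plain dotted versions such as `2.7.3` (the DAG itself uses `packaging.version.Version`).

```python
from __future__ import annotations


def parse_release(version: str) -> tuple[int, ...]:
    """Parse a plain dotted version such as '2.7.3' into a tuple of ints."""
    return tuple(int(part) for part in version.split("."))


def choose_integration(airflow_version: str) -> str:
    """Return the package name the preflight check would look for."""
    release = parse_release(airflow_version)
    if release < (2, 5):
        raise RuntimeError("OpenLineage is not supported in Airflow versions <2.5")
    if release >= (2, 7):
        # Airflow 2.7 and later: the provider package is used.
        return "apache-airflow-providers-openlineage"
    # Airflow 2.5 and 2.6: fall back to the standalone integration package.
    return "openlineage-airflow"


for v in ("2.5.3", "2.6.0", "2.7.0", "2.9.1"):
    print(v, "->", choose_integration(v))
```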

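DAG Tasks

The DAG defines three main tasks that run sequentially:

  1. validate_ol_installation: Confirms that OpenLineage is installed correctly and is up to date.
  2. is_ol_accessible_and_enabled: Checks that OpenLineage is accessible and enabled within Airflow.
  3. validate_connection: Verifies the connection to the specified lineage backend.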

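Deployment and Execution

To use the DAG:

  1. Make sure OpenLineage is installed in your Airflow environment.
  2. Set the necessary environment variables, such as the namespace and the URL or transport mechanism; see the provider package docs or the OL docs.
  3. Configure BYPASS_LATEST_VERSION_CHECK and LINEAGE_BACKEND as needed.
  4. Place the DAG file in your Airflow DAGs folder.
  5. Trigger the DAG manually, or simply enable it and let it run once on its schedule (@once) to perform the preflight check.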
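Before triggering the DAG, a quick look at the relevant environment variables can save a failed run. The sketch below only reports which variables are present. The variable names are the ones the DAG code below reads; the helper name `report_ol_env` is hypothetical, and settings made in `airflow.cfg` are not covered.

```python
from __future__ import annotations

import os

# Environment variables that the preflight check code inspects.
OL_ENV_VARS = (
    "OPENLINEAGE_URL",
    "OPENLINEAGE_ENDPOINT",
    "OPENLINEAGE_CONFIG",
    "OPENLINEAGE_DISABLED",
    "AIRFLOW__OPENLINEAGE__TRANSPORT",
)


def report_ol_env(environ=None) -> dict[str, bool]:
    """Map each variable name to True if it is set to a non-empty value."""
    environ = os.environ if environ is None else environ
    return {name: bool(environ.get(name)) for name in OL_ENV_VARS}


for name, is_set in report_ol_env().items():
    print(f"{name}: {'set' if is_set else 'not set'}")
```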

Preflight Check DAG Code

from __future__ import annotations

import logging
import os
import attr

from packaging.version import Version

from airflow import DAG
from airflow.configuration import conf
from airflow import __version__ as airflow_version
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

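# Set this to True to bypass the latest version check for the OpenLineage package.
# The version check is also skipped automatically if the PyPI URL is not accessible.
BYPASS_LATEST_VERSION_CHECK = False
# Change this to `CUSTOM` if another backend is used to receive OpenLineage events.
# When using a custom transport, implement custom checks in the _verify_custom_backend function.
LINEAGE_BACKEND = "MARQUEZ"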

log = logging.getLogger(__name__)


def _get_latest_package_version(library_name: str) -> Version | None:
    try:
        import requests

        # A timeout keeps the task from hanging if PyPI is unreachable.
        response = requests.get(f"https://pypi.org/pypi/{library_name}/json", timeout=10)
        response.raise_for_status()
        version_string = response.json()["info"]["version"]
        return Version(version_string)
    except Exception as e:
        log.error(f"Failed to fetch latest version for `{library_name}` from PyPI: {e}")
        return None


def _get_installed_package_version(library_name) -> Version | None:
    try:
        from importlib.metadata import version

        return Version(version(library_name))
    except Exception as e:
        raise ModuleNotFoundError(f"`{library_name}` is not installed") from e


def _provider_can_be_used() -> bool:
    parsed_version = Version(airflow_version)
    if parsed_version < Version("2.5"):
        raise RuntimeError("OpenLineage is not supported in Airflow versions <2.5")
    elif parsed_version >= Version("2.7"):
        return True
    return False


def validate_ol_installation() -> None:
    library_name = "openlineage-airflow"
    if _provider_can_be_used():
        library_name = "apache-airflow-providers-openlineage"
        library_version = _get_installed_package_version(library_name)
        
        if Version(airflow_version) >= Version("2.9.0") and library_version < Version("2.0.0"):
            raise ValueError(
                f"Airflow version `{airflow_version}` requires `{library_name}` version >=2.0.0. "
                f"Installed version: `{library_version}`. "
                f"Please upgrade the package using `pip install --upgrade {library_name}`"
            )
        elif Version(airflow_version) >= Version("2.8.0") and library_version < Version("1.11.0"):
            raise ValueError(
                f"Airflow version `{airflow_version}` requires `{library_name}` version >=1.11.0. "
                f"Installed version: `{library_version}`. "
                f"Please upgrade the package using `pip install --upgrade {library_name}`"
            )

        if BYPASS_LATEST_VERSION_CHECK:
            log.info(f"Bypassing the latest version check for `{library_name}`")
            return

        latest_version = _get_latest_package_version(library_name)
        if latest_version is None:
            log.warning(f"Failed to fetch the latest version for `{library_name}`. Skipping version check.")
            return

        if library_version < latest_version:
            raise ValueError(
                f"`{library_name}` is out of date. "
                f"Installed version: `{library_version}`, "
                f"Required version: `{latest_version}`. "
                f"Please upgrade the package using `pip install --upgrade {library_name}` "
                f"or set BYPASS_LATEST_VERSION_CHECK to True"
            )

    else:
        library_version = _get_installed_package_version(library_name)
        # Note: _provider_can_be_used() already raises for Airflow <2.5, so this
        # branch only runs for Airflow 2.5 and 2.6 and the check below cannot trigger.
        if Version(airflow_version) < Version("1.11.0"):
            raise ValueError(
                f"Airflow version `{airflow_version}` is no longer supported as of October 2022. "
                f"Consider upgrading to a more recent version of Airflow. "
                f"If upgrading to Airflow >=2.7.0, use the OpenLineage Airflow Provider. "
            )


def _is_transport_set() -> None:
    transport = conf.get("openlineage", "transport", fallback="")
    if transport:
        # Exceptions do not apply %-style arguments, so build the message with an f-string.
        raise ValueError(
            f"Transport value found: `{transport}`\n"
            "Please check the format at "
            "https://openlineage.io/docs/client/python/#built-in-transport-types"
        )
    log.info("Airflow OL transport is not set.")
    return


def _is_config_set(provider: bool = True) -> None:
    if provider:
        config_path = conf.get("openlineage", "config_path", fallback="")
    else:
        config_path = os.getenv("OPENLINEAGE_CONFIG", "")

    if config_path and not _check_openlineage_yml(config_path):
        raise ValueError(f"Config file is empty or does not exist: `{config_path}`")

    log.info("OL config is not set.")
    return


def _check_openlineage_yml(file_path) -> bool:
    file_path = os.path.expanduser(file_path)
    if os.path.exists(file_path):
        with open(file_path, "r") as file:
            content = file.read()
        if not content:
            raise ValueError(f"Empty file: `{file_path}`")
        # Note: a non-empty file also raises, surfacing its content for inspection.
        raise ValueError(
            f"File found at `{file_path}` with the following content: `{content}`. "
            "Make sure the configuration is correct."
        )
    log.info("File not found: `%s`", file_path)
    return False


def _check_http_env_vars() -> None:
    from urllib.parse import urljoin

    final_url = urljoin(os.getenv("OPENLINEAGE_URL", ""), os.getenv("OPENLINEAGE_ENDPOINT", ""))
    if final_url:
        # Exceptions do not apply %-style arguments, so build the message with an f-string.
        raise ValueError(f"OPENLINEAGE_URL and OPENLINEAGE_ENDPOINT are set to: {final_url}")
    else:
        log.info(
            "OPENLINEAGE_URL and OPENLINEAGE_ENDPOINT are not set. "
            "Please set up OpenLineage using documentation at "
            "https://airflow.apache.org/docs/apache-airflow-providers-openlineage/stable/guides/user.html"
        )

    transport_var = os.getenv("AIRFLOW__OPENLINEAGE__TRANSPORT", "")
    if transport_var:
        log.info("AIRFLOW__OPENLINEAGE__TRANSPORT is set to: %s", transport_var)
    else:
        log.info("AIRFLOW__OPENLINEAGE__TRANSPORT variable is not set.")

    return


def _debug_missing_transport():
    if _provider_can_be_used():
        _is_config_set(provider=True)
        _is_transport_set()
    _is_config_set(provider=False)
    _check_openlineage_yml("openlineage.yml")
    _check_openlineage_yml("~/.openlineage/openlineage.yml")
    _check_http_env_vars()
    raise ValueError("OpenLineage is missing configuration, please refer to the OL setup docs.")


def _is_listener_accessible():
    if _provider_can_be_used():
        try:
            from airflow.providers.openlineage.plugins.openlineage import OpenLineageProviderPlugin as plugin
        except ImportError as e:
            raise ValueError("OpenLineage provider is not accessible") from e
    else:
        try:
            from openlineage.airflow.plugin import OpenLineagePlugin as plugin
        except ImportError as e:
            raise ValueError("OpenLineage is not accessible") from e

    if len(plugin.listeners) == 1:
        return True

    return False


def _is_ol_disabled():
    if _provider_can_be_used():
        try:
            # apache-airflow-providers-openlineage >= 1.7.0
            from airflow.providers.openlineage.conf import is_disabled
        except ImportError:
            # apache-airflow-providers-openlineage < 1.7.0
            from airflow.providers.openlineage.plugins.openlineage import _is_disabled as is_disabled
    else:
        from openlineage.airflow.plugin import _is_disabled as is_disabled

    if is_disabled():
        if _provider_can_be_used() and conf.getboolean("openlineage", "disabled", fallback=False):
            raise ValueError("OpenLineage is disabled in airflow.cfg: openlineage.disabled")
        elif os.getenv("OPENLINEAGE_DISABLED", "false").lower() == "true":
            raise ValueError(
                "OpenLineage is disabled due to the environment variable OPENLINEAGE_DISABLED"
            )
        raise ValueError(
            "OpenLineage is disabled because required config/env variables are not set. "
            "Please refer to "
            "https://airflow.apache.org/docs/apache-airflow-providers-openlineage/stable/guides/user.html"
        )
    return False


def _get_transport():
    if _provider_can_be_used():
        from airflow.providers.openlineage.plugins.openlineage import OpenLineageProviderPlugin
        transport = OpenLineageProviderPlugin().listeners[0].adapter.get_or_create_openlineage_client().transport
    else:
        from openlineage.airflow.plugin import OpenLineagePlugin
        transport = (
            OpenLineagePlugin.listeners[0].adapter.get_or_create_openlineage_client().transport
        )
    return transport


def is_ol_accessible_and_enabled():
    if not _is_listener_accessible():
        _is_ol_disabled()

    try:
        transport = _get_transport()
    except Exception as e:
        raise ValueError("There was an error when trying to build transport.") from e

    if transport is None or transport.kind in ("noop", "console"):
        _debug_missing_transport()


def validate_connection():
    transport = _get_transport()
    config = attr.asdict(transport.config)
    verify_backend(LINEAGE_BACKEND, config)


def verify_backend(backend_type: str, config: dict):
    backend_type = backend_type.lower()
    if backend_type == "marquez":
        return _verify_marquez_http_backend(config)
    elif backend_type == "atlan":
        return _verify_atlan_http_backend(config)
    elif backend_type == "custom":
        return _verify_custom_backend(config)
    raise ValueError(f"Unsupported backend type: {backend_type}")


def _verify_marquez_http_backend(config):
    log.info("Checking Marquez setup")
    ol_url = config["url"]
    ol_endpoint = config["endpoint"]  # "api/v1/lineage"
    marquez_prefix_path = ol_endpoint[: ol_endpoint.rfind("/") + 1]  # "api/v1/"
    list_namespace_url = ol_url + "/" + marquez_prefix_path + "namespaces"
    import requests

    try:
        # A timeout keeps the task from hanging if the backend does not respond.
        response = requests.get(list_namespace_url, timeout=10)
        response.raise_for_status()
    except Exception as e:
        raise ConnectionError(f"Failed to connect to Marquez at `{list_namespace_url}`") from e
    log.info("Airflow is able to access the URL")


def _verify_atlan_http_backend(config):
    raise NotImplementedError("This feature is not implemented yet")


def _verify_custom_backend(config):
    raise NotImplementedError("This feature is not implemented yet")


with DAG(
    dag_id="openlineage_preflight_check_dag",
    start_date=days_ago(1),
    description="A DAG to check OpenLineage setup and configurations",
    schedule_interval="@once",
) as dag:
    validate_ol_installation_task = PythonOperator(
        task_id="validate_ol_installation",
        python_callable=validate_ol_installation,
    )

    is_ol_accessible_and_enabled_task = PythonOperator(
        task_id="is_ol_accessible_and_enabled",
        python_callable=is_ol_accessible_and_enabled,
    )

    validate_connection_task = PythonOperator(
        task_id="validate_connection",
        python_callable=validate_connection,
    )

    validate_ol_installation_task >> is_ol_accessible_and_enabled_task
    is_ol_accessible_and_enabled_task >> validate_connection_task
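The URL handling in `_verify_marquez_http_backend` is easy to get subtly wrong, so here is the derivation in isolation as a small sketch. The function name `marquez_namespaces_url` is hypothetical, the host and port are example values, and the sketch also strips a trailing slash from the base URL and a leading slash from the endpoint, which is a choice made here for illustration.

```python
def marquez_namespaces_url(ol_url: str, ol_endpoint: str) -> str:
    """Derive the Marquez 'list namespaces' URL from the transport URL and endpoint."""
    # Keep everything up to and including the last slash: "api/v1/lineage" -> "api/v1/"
    prefix = ol_endpoint[: ol_endpoint.rfind("/") + 1]
    return ol_url.rstrip("/") + "/" + prefix.lstrip("/") + "namespaces"


print(marquez_namespaces_url("http://localhost:5000", "api/v1/lineage"))
# http://localhost:5000/api/v1/namespaces
```

The check then only needs a successful GET on that URL to conclude that Airflow can reach the backend.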

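Conclusion

The OpenLineage preflight check DAG is a key tool for verifying that OpenLineage is installed and configured correctly in Airflow. By following the instructions and configuration described here, users can complete their setup with confidence and start using OpenLineage to monitor and manage data lineage in their Airflow workflows.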

Preflight Check Class

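Purpose

In some cases you may want to verify your OpenLineage setup in Airflow without starting the Airflow services or triggering a pipeline. Or you may want to verify OpenLineage from inside a task rather than as a DAG. In these cases, you can use this Python class, which is based on the Preflight Check DAG, in place of the DAG.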
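As a rough illustration of that use case, the sketch below runs a sequence of named checks in order and stops at the first failure, mirroring the serial task order of the DAG. `run_checks` is a hypothetical helper, and the stand-in lambdas exist only so the snippet runs on its own.

```python
from __future__ import annotations

import logging
from typing import Callable

log = logging.getLogger(__name__)


def run_checks(checks: list[tuple[str, Callable[[], object]]]) -> list[str]:
    """Run checks in order; return the names that passed, re-raising the first failure."""
    passed: list[str] = []
    for name, check in checks:
        try:
            check()
        except Exception:
            log.error("Preflight check `%s` failed", name)
            raise
        passed.append(name)
    return passed


# Stand-in checks that always pass, used here only for demonstration.
demo_checks = [
    ("validate_ol_installation", lambda: None),
    ("is_ol_accessible_and_enabled", lambda: None),
    ("validate_connection", lambda: None),
]
print(run_checks(demo_checks))
```

With the class from this article, the list would instead be built from an instance, for example `checker = CheckOpenLineage()` and then `("validate_ol_installation", checker.validate_ol_installation)` and so on for the other two public methods.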

Preflight Check Class Code

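# Keeps annotations such as `Version | None` valid on Python versions before 3.10.
from __future__ import annotations

import logging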
import os
import attr

from packaging.version import Version
from airflow.configuration import conf

log = logging.getLogger(__name__)

BYPASS_LATEST_VERSION_CHECK: bool = False
LINEAGE_BACKEND: str = "MARQUEZ"

class CheckOpenLineage:
    """
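    The CheckOpenLineage class verifies the OpenLineage setup in an Apache Airflow environment.
    It checks the Airflow version, the version of the installed OpenLineage package, and the
    configuration settings read by the OpenLineage listener. This validation is important because,
    after setting up OpenLineage with Airflow and configuring the necessary environment variables,
    users need confirmation that their OpenLineage consumer will start receiving OpenLineage events.

    The class is based on the preflight check DAG in the OpenLineage docs:
    https://openlineage.io/docs/integrations/airflow/preflight-check-dag.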
    """    

    def _get_latest_package_version(self, library_name: str) -> Version | None:
        """
        Fetch the latest available version of the OpenLineage package (`library_name`)
        from the PyPI.org API.
        """
        try:
            import requests

            # A timeout keeps the call from hanging if PyPI is unreachable.
            response = requests.get(f"https://pypi.org/pypi/{library_name}/json", timeout=10)
            response.raise_for_status()
            version_string = response.json()["info"]["version"]
            return Version(version_string)
        except Exception as e:
            log.error(
                f"Failed to fetch latest version for `{library_name}` from PyPI: {e}"
                )
            return None


    def _get_installed_package_version(self, library_name) -> Version | None:
        """
        Get the version of the locally installed OpenLineage package (`library_name`).
        """
        try:
            from importlib.metadata import version

            # Use a separate name so the imported `version` function is not shadowed.
            installed_version = Version(version(library_name))
            log.info(f"Installed {library_name} version is {installed_version}.")
            return installed_version
        except Exception as e:
            raise ModuleNotFoundError(
                f"`{library_name}` is not installed"
            ) from e


    def _provider_can_be_used(self) -> tuple[bool, str]:
        """
        Get the version of the locally installed Apache Airflow to determine whether the
        Apache Airflow OpenLineage Provider can be used.

        Returns a (provider_usable, airflow_version) tuple. Callers must index it,
        because a non-empty tuple is always truthy.
        """
        import subprocess

        app_name = "airflow"
        version_flag = "version"
        process = subprocess.run(
            [app_name, version_flag],
            capture_output=True,
            text=True,
            check=True,
        )
        version_output = process.stdout.strip()
        parsed_version = Version(version_output)
        if parsed_version < Version("2.5"):
            raise RuntimeError(
                "OpenLineage is not supported in Airflow versions <2.5"
            )
        elif parsed_version >= Version("2.7"):
            log.info("OpenLineage Provider can be used.")
            return True, version_output
        return False, version_output


    def validate_ol_installation(self) -> None:
        """
        Validate the OpenLineage installation by verifying that the OpenLineage integration
        is compatible with the locally installed Apache Airflow.
        """
        library_name = "openlineage-airflow"
        provider_status = self._provider_can_be_used()
        if provider_status[0]:
            library_name = "apache-airflow-providers-openlineage"
            library_version = self._get_installed_package_version(library_name)

            if Version(provider_status[1]) >= Version("2.9.0") and library_version < Version("2.0.0"):
                raise ValueError(
                    f"Airflow version `{provider_status[1]}` requires `{library_name}` version >=2.0.0. "
                    f"Installed version: `{library_version}`. "
                    f"Please upgrade the package using `pip install --upgrade {library_name}`"
                )
            elif Version(provider_status[1]) >= Version("2.8.0") and library_version < Version("1.11.0"):
                raise ValueError(
                    f"Airflow version `{provider_status[1]}` requires `{library_name}` version >=1.11.0. "
                    f"Installed version: `{library_version}`. "
                    f"Please upgrade the package using `pip install --upgrade {library_name}`"
                )

            if BYPASS_LATEST_VERSION_CHECK:
                log.info(f"Bypassing the latest version check for `{library_name}`")
                return

            latest_version = self._get_latest_package_version(library_name)
            if latest_version is None:
                log.warning(f"Failed to fetch the latest version for `{library_name}`. Skipping version check.")
                return

            if library_version < latest_version:
                raise ValueError(
                    f"`{library_name}` is out of date. "
                    f"Installed version: `{library_version}`, "
                    f"Required version: `{latest_version}`. "
                    f"Please upgrade the package using `pip install --upgrade {library_name}` "
                    f"or set BYPASS_LATEST_VERSION_CHECK to True"
                )

        else:
            library_version = self._get_installed_package_version(library_name)
            # Note: _provider_can_be_used() already raises for Airflow <2.5, so this
            # branch only runs for Airflow 2.5 and 2.6 and the check below cannot trigger.
            if Version(provider_status[1]) < Version("1.11.0"):
                raise ValueError(
                    f"Airflow version `{provider_status[1]}` is no longer supported as of October 2022. "
                    f"Consider upgrading to a more recent version of Airflow. "
                    f"If upgrading to Airflow >=2.7.0, use the OpenLineage Airflow Provider. "
                )

    def _is_transport_set(self) -> None:
        """Check whether the OpenLineage transport is set."""
        transport = conf.get("openlineage", "transport", fallback="")
        log.info(f"Transport: {transport}")
        if transport:
            # Exceptions do not apply %-style arguments, so build the message with an f-string.
            raise ValueError(
                f"Transport value found: `{transport}`\n"
                "Please check the format at "
                "https://openlineage.io/docs/client/python/#built-in-transport-types"
            )
        log.info("Airflow OpenLineage transport is not set.")
        return


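    def _is_config_set(self, provider: bool = True) -> None:
        """Check whether an OpenLineage config file is set."""
        if provider:
            config_path = conf.get("openlineage", "config_path", fallback="")
        else:
            config_path = os.getenv("OPENLINEAGE_CONFIG", "")

        # Same check as in the preflight check DAG; without it `config_path` would go unused.
        if config_path and not self._check_openlineage_yml(config_path):
            raise ValueError(f"Config file is empty or does not exist: `{config_path}`")

        log.info("OpenLineage config is not set.")
        return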

    def _check_openlineage_yml(self, file_path: str) -> bool:
        """Check whether an OpenLineage YAML config file exists at the given path."""
        file_path = os.path.expanduser(file_path)
        if os.path.exists(file_path):
            with open(file_path, "r") as file:
                content = file.read()
            if not content:
                raise ValueError(f"Empty file: `{file_path}`")
            # Note: a non-empty file also raises, surfacing its content for inspection.
            raise ValueError(
                f"File found at `{file_path}` with the following content: `{content}`. "
                "Make sure the configuration is correct."
            )
        log.info("File not found: `%s`", file_path)
        return False

    def _check_http_env_vars(self) -> None:
        """
        Check whether the OpenLineage URL and endpoint environment variables are set.
        """
        from urllib.parse import urljoin

        final_url = urljoin(os.getenv("OPENLINEAGE_URL", ""), os.getenv("OPENLINEAGE_ENDPOINT", ""))
        if final_url:
            log.info("OPENLINEAGE_URL and OPENLINEAGE_ENDPOINT are set to: %s", final_url)
        else:
            raise ValueError(
                "OPENLINEAGE_URL and OPENLINEAGE_ENDPOINT are not set. "
                "Please set up OpenLineage using documentation at "
                "https://airflow.apache.org/docs/apache-airflow-providers-openlineage/stable/guides/user.html"
            )

        transport_var = os.getenv("AIRFLOW__OPENLINEAGE__TRANSPORT", "")
        if transport_var:
            log.info("AIRFLOW__OPENLINEAGE__TRANSPORT is set to: %s", transport_var)
        else:
            log.info("AIRFLOW__OPENLINEAGE__TRANSPORT variable is not set.")

        return


    def _debug_missing_transport(self):
        """Debug a missing transport."""
        # _provider_can_be_used() returns a (bool, str) tuple, so index the flag explicitly.
        if self._provider_can_be_used()[0]:
            self._is_config_set(provider=True)
            self._is_transport_set()
        self._is_config_set(provider=False)
        self._check_openlineage_yml("openlineage.yml")
        self._check_openlineage_yml("~/.openlineage/openlineage.yml")
        self._check_http_env_vars()
        raise ValueError(
            "OpenLineage is missing configuration, please refer to the OL setup docs."
        )


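    def _is_listener_accessible(self):
        """Check whether the OpenLineage listener is accessible."""
        # _provider_can_be_used() returns a (bool, str) tuple, so index the flag explicitly.
        if self._provider_can_be_used()[0]: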
            try:
                from airflow.providers.openlineage.plugins.openlineage import OpenLineageProviderPlugin as plugin
            except ImportError as e:
                raise ValueError("OpenLineage provider is not accessible") from e
        else:
            try:
                from openlineage.airflow.plugin import OpenLineagePlugin as plugin
            except ImportError as e:
                raise ValueError("OpenLineage is not accessible") from e

        if len(plugin.listeners) == 1:
            return True

        return False


    def _is_ol_disabled(self):
        """
        Confirm that OpenLineage is not disabled, and inspect the configuration to suggest a fix.
        """
        # _provider_can_be_used() returns a (bool, str) tuple, so index the flag explicitly.
        provider_usable = self._provider_can_be_used()[0]
        if provider_usable:
            try:
                # apache-airflow-providers-openlineage >= 1.7.0
                from airflow.providers.openlineage.conf import is_disabled
            except ImportError:
                # apache-airflow-providers-openlineage < 1.7.0
                from airflow.providers.openlineage.plugins.openlineage import _is_disabled as is_disabled
        else:
            from openlineage.airflow.plugin import _is_disabled as is_disabled

        if is_disabled():
            if provider_usable and conf.getboolean("openlineage", "disabled", fallback=False):
                raise ValueError("OpenLineage is disabled in airflow.cfg: openlineage.disabled")
            elif os.getenv("OPENLINEAGE_DISABLED", "false").lower() == "true":
                raise ValueError(
                    "OpenLineage is disabled due to the environment variable OPENLINEAGE_DISABLED"
                    )
            raise ValueError(
                "OpenLineage is disabled because required config/env variables are not set. "
                "Please refer to "
                "https://airflow.apache.org/docs/apache-airflow-providers-openlineage/stable/guides/user.html"
                )
        log.info("OpenLineage is not disabled.")
        return False


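    def _get_transport(self):
        """Get the configured transport from the OpenLineage plugin."""
        # _provider_can_be_used() returns a (bool, str) tuple, so index the flag explicitly.
        if self._provider_can_be_used()[0]: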
            from airflow.providers.openlineage.plugins.openlineage import OpenLineageProviderPlugin

            transport = OpenLineageProviderPlugin().listeners[0].adapter.get_or_create_openlineage_client().transport
        else:
            from openlineage.airflow.plugin import OpenLineagePlugin

            transport = (
                OpenLineagePlugin.listeners[0].adapter.get_or_create_openlineage_client().transport
                )
        return transport

    def is_ol_accessible_and_enabled(self):
        """
        Confirm that OpenLineage is accessible and enabled by attempting to build the transport.
        """
        if not self._is_listener_accessible():
            self._is_ol_disabled()

        try:
            transport = self._get_transport()
        except Exception as e:
            raise ValueError("There was an error when trying to build transport.") from e

        if transport is None or transport.kind in ("noop", "console"):
            self._debug_missing_transport()


    def validate_connection(self):
        """Validate the connection to the lineage backend."""
        transport = self._get_transport()
        config = attr.asdict(transport.config)
        self._verify_backend(LINEAGE_BACKEND, config)


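    def _verify_backend(self, backend_type: str, config: dict):
        """Verify the lineage backend."""
        backend_type = backend_type.lower()
        if backend_type == "marquez":
            # Unlike the DAG, this class only logs the backend type for Marquez
            # and does not make a connectivity request.
            log.info("Backend type: Marquez")
            return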
        elif backend_type == "atlan":
            log.info("Backend type: Atlan")
            return self._verify_atlan_http_backend(config)
        elif backend_type == "custom":
            log.info("Backend type: custom")
            return self._verify_custom_backend(config)
        raise ValueError(f"Unsupported backend type: {backend_type}")


    def _verify_atlan_http_backend(self, config):
        raise NotImplementedError("This feature is not implemented yet")


    def _verify_custom_backend(self, config):
        raise NotImplementedError("This feature is not implemented yet")
