Airflow量化入门系列:第一章 Apache Airflow 基础

发布于:2025-04-03 ⋅ 阅读:(28) ⋅ 点赞:(0)

Airflow量化入门系列:第一章 Apache Airflow 基础

本教程系统性地讲解了 Apache Airflow 在 A 股量化交易中的应用,覆盖从基础安装到高级功能的完整知识体系。通过六章内容,读者将掌握 Airflow 的核心概念、任务调度、数据处理、技术指标计算、策略回测及工作流监控等关键技能。教程整合 Tushare 数据源、TA-Lib 技术指标库和 VectorBT 策略回测工具,提供丰富实战案例,帮助构建高效、可靠的工作流,助力量化交易实践。
文中内容仅限技术学习与代码实践参考,市场存在不确定性,技术分析需谨慎验证,不构成任何投资建议。适合量化新手建立系统认知,为策略开发打下基础。

AirFlow

学习对象

  • 中高级水平的开发者
  • 具备 Python 编程基础,熟悉基本的 ETL 流程和数据分析工具
  • 希望掌握 Airflow 在量化交易场景中的应用

教程目标

  • 系统掌握 Apache Airflow 的核心功能与高级特性
  • 深入理解 Airflow 在 A 股量化交易工作流中的应用
  • 能够独立设计、部署和维护复杂的量化交易工作流

教程目录

第一章:Apache Airflow 基础

1.1 Airflow 简介与安装
1.2 Airflow 核心概念
1.3 Airflow Web UI 使用与管理
1.4 Airflow 配置与环境搭建

第二章:Airflow 任务调度与依赖管理

2.1 DAG 定义与任务依赖关系
2.2 Task 类型
2.3 任务调度策略与优先级
2.4 动态任务生成与条件分支

第三章:A 股市场数据处理与存储

3.1 使用 Tushare 获取 A 股数据
3.2 数据清洗与预处理
3.3 Parquet 文件存储与读取
3.4 数据存储优化与性能提升

第四章:技术指标计算与策略实现

4.1 TA-Lib 基础与常用技术指标
4.2 技术指标计算流程设计
4.3 策略回测与评估(使用 Vector BT)
4.4 策略优化与参数调整

第五章:Airflow 高级功能与最佳实践

5.1 Airflow 与 Docker 集成
5.2 Airflow 任务监控与日志分析
5.3 Airflow 任务容错与重试机制
5.4 Airflow 任务性能优化

第六章:完整案例:A 股量化交易工作流

6.1 数据获取与存储工作流
6.2 技术指标计算工作流
6.3 策略回测与结果存储工作流
6.4 工作流监控与维护

第一章 Apache Airflow 基础

1.1 Airflow 简介与安装

理论详解

Apache Airflow 是一个开源的工作流管理平台,用于编写、调度和监控复杂的工作流。它具有以下核心功能:

  • 任务调度:支持复杂的任务依赖关系和调度策略。
  • 任务监控:通过 Web UI 实时监控任务状态和日志。
  • 可扩展性:支持多种任务类型(如 Bash、Python、HTTP 等)和多种存储后端(如 SQLite、MySQL、PostgreSQL)。

应用场景

  • 数据管道管理
  • ETL 流程自动化
  • 量化交易策略执行与监控

基础架构:Basic Airflow Architecture1

Basic Airflow Architecture

  • Metadata Database:Airflow 使用 SQL 数据库来存储有关正在运行的数据管道的元数据。在上图中,使用 Postgres数据库,它在 Airflow 中非常流行。Airflow 默认数据库为 SQLite。
  • Webserver:Airflow Web 服务器和调度器是单独的进程,在本例中运行在本地机器上,并与上面提到的数据库交互。
  • Executor:在图中,执行器被单独列出,因为它是 Airflow 中经常被讨论的内容,也是教程中提到的关键部分。但实际上,执行器并不是一个单独的进程,而是运行在调度器内部。
  • Worker(s):工作进程是单独的进程,它们也与其他 Airflow 架构组件以及元数据存储库进行交互。
  • DAG(s):包含 Python 代码的文件,用于表示 Airflow 要运行的数据管道。这些文件的位置在 Airflow 配置文件中指定,但它们需要能够被 Web Server、Scheduler 和 Workers 访问。
  • airflow.cfg 是 Airflow 配置文件,由 Web Server、Scheduler和 Worker 访问。

优势

  • 灵活性:支持多种任务类型和调度策略。
  • 可扩展性:支持分布式执行和多种存储后端。
  • 易用性:通过 Web UI 直观管理任务。

实战示例

安装 Airflow

# 安装 Airflow
pip install apache-airflow

# 初始化 Airflow 数据库,默认:SQLite
airflow db init

# 创建默认用户
airflow users create --role Admin --username admin --email admin@example.com --firstname admin --lastname admin --password admin

# 启动 Airflow Web Server,默认端口:8080
airflow webserver 

# 启动 Airflow Scheduler
airflow scheduler

1.2 Airflow 核心概念

理论详解

DAG(Directed Acyclic Graph)

  • 任务流程的定义,表示任务之间的依赖关系。
  • 无环图,确保任务可以按顺序执行。

Task

  • 任务单元,可以是 Bash、Python、HTTP 等类型。
  • 任务可以设置优先级、重试机制等。

Operator

  • 任务的执行逻辑。
  • 常见的 Operator 包括:
    • BashOperator:执行 Bash 命令。
    • PythonOperator:执行 Python 函数。
    • BranchOperator:根据条件分支任务。

Executor

  • 任务调度机制。
  • 常见的 Executor 包括:
    • SequentialExecutor:默认执行器,单进程顺序执行任务,适用于调试和开发环境。
    • LocalExecutor:所有任务都在同一个进程中运行,适用于开发和测试环境。
    • CeleryExecutor:使用 Celery 作为任务队列,支持多台机器上的分布式任务执行,多用于生产场景,使用时需要配置消息队列。
    • KubernetesExecutor:在 Kubernetes 集群中动态创建 Pod 来执行任务,每个任务在一个独立的 Pod 中运行,需要配置 Kubernetes API 和相关资源。
    • DaskExecutor:使用 Dask 分布式计算库,支持并行和分布式计算,适用于数据密集型任务。
    • LocalKubernetesExecutor:对于本地任务使用 LocalExecutor,对于 Kubernetes 任务使用 KubernetesExecutor,适用于混合环境,既可以在本地运行一些任务,又可以在 Kubernetes 上运行其他任务。

任务实例 Task Instances2

任务的一个实例是该任务在特定 DAG(以及特定数据时间间隔)中的一次具体运行。任务实例也是具有状态的任务的表示,反映了它所处的生命周期阶段。

Task Lifecycle Diagram

任务实例的可能状态包括:

  • none:任务尚未被排队执行(其依赖条件尚未满足)。
  • scheduled:调度器已确定任务的依赖条件已满足,可以运行。
  • queued:任务已被分配给执行器,正在等待工作进程。
  • running:任务正在工作进程上运行(或在本地/同步执行器上运行)。
  • success:任务成功完成,没有错误。
  • restarting:任务在运行时被外部请求重新启动。
  • failed:任务在执行过程中出现错误,未能运行。
  • skipped:由于分支、LatestOnly 或类似原因,任务被跳过。
  • upstream_failed:上游任务失败,触发规则要求需要它。
  • up_for_retry:任务失败,但还有重试机会,将被重新调度。
  • up_for_reschedule:任务是一个处于重新调度模式的传感器。
  • deferred:任务已委托给触发器。
  • removed:任务在运行开始后从 DAG 中消失。

理想情况下,任务应从 none 状态开始,依次经过 scheduled、queued、running,最终达到 success 状态。

实战示例

设计一个简单的 DAG

创建一个 DAG,包含多个任务依赖关系。

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator


def task1(**kwargs):
    """
    任务 1:打印任务开始信息。

    :param kwargs: Airflow 任务参数
    :return: None
    """
    print("Task 1 started")


def task2(**kwargs):
    """
    任务 2:打印任务执行信息。

    :param kwargs: Airflow 任务参数
    :return: None
    """
    print("Task 2 executed")


def task3(**kwargs):
    """
    任务 3:打印任务结束信息。

    :param kwargs: Airflow 任务参数
    :return: None
    """
    print("Task 3 completed")


# 定义 DAG
with DAG(
    "simple_dag",
    description="A simple DAG with multiple tasks",
    schedule_interval=None,
    tags=["quant", "tutorial"],
) as dag:
    # 定义任务
    task1 = PythonOperator(task_id="task1", python_callable=task1, provide_context=True)

    task2 = PythonOperator(task_id="task2", python_callable=task2, provide_context=True)

    task3 = PythonOperator(task_id="task3", python_callable=task3, provide_context=True)

    # 设置任务依赖关系
    task1 >> task2 >> task3

运行 DAG

  1. 将 DAG 文件保存到 Airflow 的 dags 目录。
  2. 在 Airflow Web UI 中触发 DAG,查看任务状态和日志。

1.3 Airflow Web UI 使用与管理

Airflow Web UI

理论详解

Airflow Web UI3 是一个直观的管理工具,支持以下功能:

  • DAGs:查看所有 DAG 的状态和执行历史。
  • Tree:查看 DAG 的任务依赖关系。
  • Graph:以图形化方式展示任务执行状态。
  • Gantt:以甘特图方式展示任务调度计划。
  • Logs:查看任务日志。

实战示例

通过 Web UI 触发 DAG

  1. 打开 Airflow Web UI(默认地址:http://localhost:8080)。
  2. 在 DAG 列表中找到 fetch_stock_data DAG。
  3. 点击 “Trigger DAG” 按钮,输入任务参数(如股票代码、日期范围)。
  4. 查看任务状态和日志。

查看任务日志

  1. 在 DAG 的 “Graph” 视图中找到任务实例。
  2. 点击任务实例,进入任务详情页面。
  3. 在 “Logs” 标签页中查看任务日志。

1.4 Airflow 配置与环境搭建

理论详解

Airflow 的配置文件 airflow.cfg 和环境变量用于管理 Airflow 的运行环境。常见的配置包括:

  1. 基本配置

    • [core] 部分:

      • dags_folder: 指定存放 DAG 文件的目录路径。
      • load_examples: 设置是否加载示例 DAG(默认为 True)。生产环境中建议设为 False。
      • executor: 定义使用的执行器类型,如 LocalExecutor, CeleryExecutor 或 KubernetesExecutor。
      • sql_alchemy_conn: 数据库连接字符串,指定 Airflow 使用哪个数据库来存储元数据。
    • [webserver] 部分:

      • web_server_host: Web 服务器监听的 IP 地址。
      • web_server_port: Web 服务器监听的端口号,默认为 8080。
      • base_url: 如果需要的话,可以设置 Web 界面的基础 URL。
    • [scheduler] 部分:

      • scheduler_heartbeat_sec: 调度器心跳间隔时间(秒)。
      • min_file_process_interval: 文件处理最小间隔时间(秒),防止频繁检查文件变化。
      • dag_dir_list_interval: DAG 目录列表刷新频率(秒)。
  2. 安全性相关配置

    • [webserver] 部分:

      • authenticate: 是否启用身份验证功能。
      • auth_backend: 自定义认证后端模块的位置。
      • secret_key: 用于加密会话的密钥。
    • [api] 部分:

      • auth_backend: API 认证后端的选择。
  3. 性能调优

    • [celery] 部分 (当使用 CeleryExecutor 时):

      • broker_url: 消息队列服务的地址。
      • result_backend: 结果存储的位置。
      • worker_concurrency: 工作者进程的数量。
    • [kubernetes] 部分 (当使用 KubernetesExecutor 时):

      • kube_config: Kubernetes 配置文件路径。
      • namespace: 运行任务所在的命名空间。
      • delete_worker_pods: 完成后是否删除 worker pods。
  4. 邮件通知

    • [smtp] 部分:
      • smtp_host, smtp_starttls, smtp_ssl, smtp_user, smtp_password, smtp_port, smtp_mail_from: 邮件发送相关的配置参数。
  5. 日志管理

    • [logging] 部分:
      • logging_level: 日志级别。
      • base_log_folder: 存放日志的基本目录。
      • remote_logging: 是否启用远程日志记录。
  6. 其他重要配置

    • [operators][hooks] 部分: 可以自定义特定操作符或钩子的行为。
    • 环境变量: 除了直接修改配置文件外,许多配置项也可以通过环境变量来设置,这在容器化部署中非常有用。

实战示例

配置 Airflow 使用 Tushare API

  1. airflow.cfg 文件中添加 Tushare API 配置:

    [core]
    load_examples = False
    [tushare]
    api_token = your_tushare_token
    data_folder = /data
    
  2. 在 DAG 中读取配置:

    from airflow.configuration import conf
    
    tushare_token = conf.get("tushare", "api_token")
    ts.set_token(tushare_token)
    

完整示例

import os
from datetime import datetime

import tushare as ts

from airflow import DAG
from airflow.configuration import conf
from airflow.operators.python import PythonOperator

# 配置 Tushare API
tushare_token = conf.get("tushare", "api_token")
ts.set_token(tushare_token)
pro = ts.pro_api()


def fetch_stock_data(**kwargs):
    """
    获取特定股票的历史数据并存储为 Parquet 文件。

    :param kwargs: Airflow 任务参数
    :return: None
    """
    stock_code = kwargs["dag_run"].conf.get("stock_code", "000001.SZ")
    start_date = kwargs["dag_run"].conf.get("start_date", "20230101")
    end_date = kwargs["dag_run"].conf.get("end_date", "20241231")
    print(f"Fetch data for {stock_code} from {start_date} to {end_date}")

    # 获取股票数据
    df = pro.daily(ts_code=stock_code, start_date=start_date, end_date=end_date)

    # 数据存储目录
    data_dir = conf.get("tushare", "data_folder")
    if not os.path.exists(data_dir):
        os.makedirs(data_dir)

    # 存储为 Parquet 文件
    file_path = os.path.join(data_dir, f"stock_data_{stock_code}.parquet")
    df.to_parquet(file_path, index=False)
    print(f"Data saved to {file_path}")


# 定义 DAG
with DAG(
    "fetch_stock_data",
    description="Fetch stock data from Tushare and save as Parquet",
    schedule_interval=None,
    start_date=datetime(2025, 1, 1),
    tags=["quant", "stock"],
) as dag:
    # 定义任务
    fetch_task = PythonOperator(
        task_id="fetch_stock_data",
        python_callable=fetch_stock_data,
        provide_context=True,
    )

# 设置任务依赖关系
fetch_task

运行 DAG

  1. 将 DAG 文件保存到 Airflow 的 dags 目录。
  2. 在 Airflow Web UI 中触发 DAG,查看任务状态和日志。

总结

通过本章的学习,您已经掌握了 Apache Airflow 的基础概念和基本操作。在接下来的章节中,我们将深入探讨 Airflow 的任务调度、依赖管理以及在 A 股量化交易中的应用。

风险提示与免责声明
本文内容基于公开信息研究整理,不构成任何形式的投资建议。历史表现不应作为未来收益保证,市场存在不可预见的波动风险。投资者需结合自身财务状况及风险承受能力独立决策,并自行承担交易结果。作者及发布方不对任何依据本文操作导致的损失承担法律责任。市场有风险,投资须谨慎。


  1. https://airflow.apache.org/docs/apache-airflow/2.0.1/concepts.html ↩︎

  2. https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/tasks.html ↩︎

  3. https://airflow.apache.org/docs/apache-airflow/stable/ui.html ↩︎