Airflow量化入门系列:第一章 Apache Airflow 基础
本教程系统性地讲解了 Apache Airflow 在 A 股量化交易中的应用,覆盖从基础安装到高级功能的完整知识体系。通过六章内容,读者将掌握 Airflow 的核心概念、任务调度、数据处理、技术指标计算、策略回测及工作流监控等关键技能。教程整合 Tushare 数据源、TA-Lib 技术指标库和 VectorBT 策略回测工具,提供丰富实战案例,帮助构建高效、可靠的工作流,助力量化交易实践。
文中内容仅限技术学习与代码实践参考,市场存在不确定性,技术分析需谨慎验证,不构成任何投资建议。适合量化新手建立系统认知,为策略开发打下基础。
学习对象
- 中高级水平的开发者
- 具备 Python 编程基础,熟悉基本的 ETL 流程和数据分析工具
- 希望掌握 Airflow 在量化交易场景中的应用
教程目标
- 系统掌握 Apache Airflow 的核心功能与高级特性
- 深入理解 Airflow 在 A 股量化交易工作流中的应用
- 能够独立设计、部署和维护复杂的量化交易工作流
教程目录
第一章:Apache Airflow 基础
1.1 Airflow 简介与安装
1.2 Airflow 核心概念
1.3 Airflow Web UI 使用与管理
1.4 Airflow 配置与环境搭建
第二章:Airflow 任务调度与依赖管理
2.1 DAG 定义与任务依赖关系
2.2 Task 类型
2.3 任务调度策略与优先级
2.4 动态任务生成与条件分支
第三章:A 股市场数据处理与存储
3.1 使用 Tushare 获取 A 股数据
3.2 数据清洗与预处理
3.3 Parquet 文件存储与读取
3.4 数据存储优化与性能提升
第四章:技术指标计算与策略实现
4.1 TA-Lib 基础与常用技术指标
4.2 技术指标计算流程设计
4.3 策略回测与评估(使用 Vector BT)
4.4 策略优化与参数调整
第五章:Airflow 高级功能与最佳实践
5.1 Airflow 与 Docker 集成
5.2 Airflow 任务监控与日志分析
5.3 Airflow 任务容错与重试机制
5.4 Airflow 任务性能优化
第六章:完整案例:A 股量化交易工作流
6.1 数据获取与存储工作流
6.2 技术指标计算工作流
6.3 策略回测与结果存储工作流
6.4 工作流监控与维护
第一章 Apache Airflow 基础
1.1 Airflow 简介与安装
理论详解
Apache Airflow 是一个开源的工作流管理平台,用于编写、调度和监控复杂的工作流。它具有以下核心功能:
- 任务调度:支持复杂的任务依赖关系和调度策略。
- 任务监控:通过 Web UI 实时监控任务状态和日志。
- 可扩展性:支持多种任务类型(如 Bash、Python、HTTP 等)和多种存储后端(如 SQLite、MySQL、PostgreSQL)。
应用场景:
- 数据管道管理
- ETL 流程自动化
- 量化交易策略执行与监控
基础架构:Basic Airflow Architecture1
- Metadata Database:Airflow 使用 SQL 数据库来存储有关正在运行的数据管道的元数据。在上图中,使用 Postgres数据库,它在 Airflow 中非常流行。Airflow 默认数据库为 SQLite。
- Webserver:Airflow Web 服务器和调度器是单独的进程,在本例中运行在本地机器上,并与上面提到的数据库交互。
- Executor:在图中,执行器被单独列出,因为它是 Airflow 中经常被讨论的内容,也是教程中提到的关键部分。但实际上,执行器并不是一个单独的进程,而是运行在调度器内部。
- Worker(s):工作进程是单独的进程,它们也与其他 Airflow 架构组件以及元数据存储库进行交互。
- DAG(s):包含 Python 代码的文件,用于表示 Airflow 要运行的数据管道。这些文件的位置在 Airflow 配置文件中指定,但它们需要能够被 Web Server、Scheduler 和 Workers 访问。
- airflow.cfg 是 Airflow 配置文件,由 Web Server、Scheduler和 Worker 访问。
优势:
- 灵活性:支持多种任务类型和调度策略。
- 可扩展性:支持分布式执行和多种存储后端。
- 易用性:通过 Web UI 直观管理任务。
实战示例
安装 Airflow:
# 安装 Airflow
pip install apache-airflow
# 初始化 Airflow 数据库,默认:SQLite
airflow db init
# 创建默认用户
airflow users create --role Admin --username admin --email admin@example.com --firstname admin --lastname admin --password admin
# 启动 Airflow Web Server,默认端口:8080
airflow webserver
# 启动 Airflow Scheduler
airflow scheduler
1.2 Airflow 核心概念
理论详解
DAG(Directed Acyclic Graph):
- 任务流程的定义,表示任务之间的依赖关系。
- 无环图,确保任务可以按顺序执行。
Task:
- 任务单元,可以是 Bash、Python、HTTP 等类型。
- 任务可以设置优先级、重试机制等。
Operator:
- 任务的执行逻辑。
- 常见的 Operator 包括:
- BashOperator:执行 Bash 命令。
- PythonOperator:执行 Python 函数。
- BranchOperator:根据条件分支任务。
Executor:
- 任务调度机制。
- 常见的 Executor 包括:
- SequentialExecutor:默认执行器,单进程顺序执行任务,适用于调试和开发环境。
- LocalExecutor:所有任务都在同一个进程中运行,适用于开发和测试环境。
- CeleryExecutor:使用 Celery 作为任务队列,支持多台机器上的分布式任务执行,多用于生产场景,使用时需要配置消息队列。
- KubernetesExecutor:在 Kubernetes 集群中动态创建 Pod 来执行任务,每个任务在一个独立的 Pod 中运行,需要配置 Kubernetes API 和相关资源。
- DaskExecutor:使用 Dask 分布式计算库,支持并行和分布式计算,适用于数据密集型任务。
- LocalKubernetesExecutor:对于本地任务使用 LocalExecutor,对于 Kubernetes 任务使用 KubernetesExecutor,适用于混合环境,既可以在本地运行一些任务,又可以在 Kubernetes 上运行其他任务。
任务实例 Task Instances2
任务的一个实例是该任务在特定 DAG(以及特定数据时间间隔)中的一次具体运行。任务实例也是具有状态的任务的表示,反映了它所处的生命周期阶段。
任务实例的可能状态包括:
- none:任务尚未被排队执行(其依赖条件尚未满足)。
- scheduled:调度器已确定任务的依赖条件已满足,可以运行。
- queued:任务已被分配给执行器,正在等待工作进程。
- running:任务正在工作进程上运行(或在本地/同步执行器上运行)。
- success:任务成功完成,没有错误。
- restarting:任务在运行时被外部请求重新启动。
- failed:任务在执行过程中出现错误,未能运行。
- skipped:由于分支、LatestOnly 或类似原因,任务被跳过。
- upstream_failed:上游任务失败,触发规则要求需要它。
- up_for_retry:任务失败,但还有重试机会,将被重新调度。
- up_for_reschedule:任务是一个处于重新调度模式的传感器。
- deferred:任务已委托给触发器。
- removed:任务在运行开始后从 DAG 中消失。
理想情况下,任务应从 none 状态开始,依次经过 scheduled、queued、running,最终达到 success 状态。
实战示例
设计一个简单的 DAG:
创建一个 DAG,包含多个任务依赖关系。
from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
def task1(**kwargs):
"""
任务 1:打印任务开始信息。
:param kwargs: Airflow 任务参数
:return: None
"""
print("Task 1 started")
def task2(**kwargs):
"""
任务 2:打印任务执行信息。
:param kwargs: Airflow 任务参数
:return: None
"""
print("Task 2 executed")
def task3(**kwargs):
"""
任务 3:打印任务结束信息。
:param kwargs: Airflow 任务参数
:return: None
"""
print("Task 3 completed")
# 定义 DAG
with DAG(
"simple_dag",
description="A simple DAG with multiple tasks",
schedule_interval=None,
tags=["quant", "tutorial"],
) as dag:
# 定义任务
task1 = PythonOperator(task_id="task1", python_callable=task1, provide_context=True)
task2 = PythonOperator(task_id="task2", python_callable=task2, provide_context=True)
task3 = PythonOperator(task_id="task3", python_callable=task3, provide_context=True)
# 设置任务依赖关系
task1 >> task2 >> task3
运行 DAG:
- 将 DAG 文件保存到 Airflow 的
dags
目录。 - 在 Airflow Web UI 中触发 DAG,查看任务状态和日志。
1.3 Airflow Web UI 使用与管理
理论详解
Airflow Web UI3 是一个直观的管理工具,支持以下功能:
- DAGs:查看所有 DAG 的状态和执行历史。
- Tree:查看 DAG 的任务依赖关系。
- Graph:以图形化方式展示任务执行状态。
- Gantt:以甘特图方式展示任务调度计划。
- Logs:查看任务日志。
实战示例
通过 Web UI 触发 DAG:
- 打开 Airflow Web UI(默认地址:
http://localhost:8080
)。 - 在 DAG 列表中找到
fetch_stock_data
DAG。 - 点击 “Trigger DAG” 按钮,输入任务参数(如股票代码、日期范围)。
- 查看任务状态和日志。
查看任务日志:
- 在 DAG 的 “Graph” 视图中找到任务实例。
- 点击任务实例,进入任务详情页面。
- 在 “Logs” 标签页中查看任务日志。
1.4 Airflow 配置与环境搭建
理论详解
Airflow 的配置文件 airflow.cfg
和环境变量用于管理 Airflow 的运行环境。常见的配置包括:
基本配置
[core]
部分:dags_folder
: 指定存放 DAG 文件的目录路径。load_examples
: 设置是否加载示例 DAG(默认为 True)。生产环境中建议设为 False。executor
: 定义使用的执行器类型,如 LocalExecutor, CeleryExecutor 或 KubernetesExecutor。sql_alchemy_conn
: 数据库连接字符串,指定 Airflow 使用哪个数据库来存储元数据。
[webserver]
部分:web_server_host
: Web 服务器监听的 IP 地址。web_server_port
: Web 服务器监听的端口号,默认为 8080。base_url
: 如果需要的话,可以设置 Web 界面的基础 URL。
[scheduler]
部分:scheduler_heartbeat_sec
: 调度器心跳间隔时间(秒)。min_file_process_interval
: 文件处理最小间隔时间(秒),防止频繁检查文件变化。dag_dir_list_interval
: DAG 目录列表刷新频率(秒)。
安全性相关配置
[webserver]
部分:authenticate
: 是否启用身份验证功能。auth_backend
: 自定义认证后端模块的位置。secret_key
: 用于加密会话的密钥。
[api]
部分:auth_backend
: API 认证后端的选择。
性能调优
[celery]
部分 (当使用 CeleryExecutor 时):broker_url
: 消息队列服务的地址。result_backend
: 结果存储的位置。worker_concurrency
: 工作者进程的数量。
[kubernetes]
部分 (当使用 KubernetesExecutor 时):kube_config
: Kubernetes 配置文件路径。namespace
: 运行任务所在的命名空间。delete_worker_pods
: 完成后是否删除 worker pods。
邮件通知
[smtp]
部分:smtp_host
,smtp_starttls
,smtp_ssl
,smtp_user
,smtp_password
,smtp_port
,smtp_mail_from
: 邮件发送相关的配置参数。
日志管理
[logging]
部分:logging_level
: 日志级别。base_log_folder
: 存放日志的基本目录。remote_logging
: 是否启用远程日志记录。
其他重要配置
[operators]
和[hooks]
部分: 可以自定义特定操作符或钩子的行为。- 环境变量: 除了直接修改配置文件外,许多配置项也可以通过环境变量来设置,这在容器化部署中非常有用。
实战示例
配置 Airflow 使用 Tushare API:
在
airflow.cfg
文件中添加 Tushare API 配置:[core] load_examples = False [tushare] api_token = your_tushare_token data_folder = /data
在 DAG 中读取配置:
from airflow.configuration import conf tushare_token = conf.get("tushare", "api_token") ts.set_token(tushare_token)
完整示例:
import os
from datetime import datetime
import tushare as ts
from airflow import DAG
from airflow.configuration import conf
from airflow.operators.python import PythonOperator
# 配置 Tushare API
tushare_token = conf.get("tushare", "api_token")
ts.set_token(tushare_token)
pro = ts.pro_api()
def fetch_stock_data(**kwargs):
"""
获取特定股票的历史数据并存储为 Parquet 文件。
:param kwargs: Airflow 任务参数
:return: None
"""
stock_code = kwargs["dag_run"].conf.get("stock_code", "000001.SZ")
start_date = kwargs["dag_run"].conf.get("start_date", "20230101")
end_date = kwargs["dag_run"].conf.get("end_date", "20241231")
print(f"Fetch data for {stock_code} from {start_date} to {end_date}")
# 获取股票数据
df = pro.daily(ts_code=stock_code, start_date=start_date, end_date=end_date)
# 数据存储目录
data_dir = conf.get("tushare", "data_folder")
if not os.path.exists(data_dir):
os.makedirs(data_dir)
# 存储为 Parquet 文件
file_path = os.path.join(data_dir, f"stock_data_{stock_code}.parquet")
df.to_parquet(file_path, index=False)
print(f"Data saved to {file_path}")
# 定义 DAG
with DAG(
"fetch_stock_data",
description="Fetch stock data from Tushare and save as Parquet",
schedule_interval=None,
start_date=datetime(2025, 1, 1),
tags=["quant", "stock"],
) as dag:
# 定义任务
fetch_task = PythonOperator(
task_id="fetch_stock_data",
python_callable=fetch_stock_data,
provide_context=True,
)
# 设置任务依赖关系
fetch_task
运行 DAG:
- 将 DAG 文件保存到 Airflow 的
dags
目录。 - 在 Airflow Web UI 中触发 DAG,查看任务状态和日志。
总结
通过本章的学习,您已经掌握了 Apache Airflow 的基础概念和基本操作。在接下来的章节中,我们将深入探讨 Airflow 的任务调度、依赖管理以及在 A 股量化交易中的应用。
风险提示与免责声明
本文内容基于公开信息研究整理,不构成任何形式的投资建议。历史表现不应作为未来收益保证,市场存在不可预见的波动风险。投资者需结合自身财务状况及风险承受能力独立决策,并自行承担交易结果。作者及发布方不对任何依据本文操作导致的损失承担法律责任。市场有风险,投资须谨慎。