VectorBT:使用PyTorch+LSTM训练和回测股票模型 进阶三

发布于:2025-03-30 ⋅ 阅读:(43) ⋅ 点赞:(0)

VectorBT:使用PyTorch+LSTM训练和回测股票模型 进阶三

本方案融合 LSTM 时序预测与动态风险控制。系统采用混合架构,离线训练构建多尺度特征工程和双均线策略,结合在线增量更新持续优化模型。技术要点包括三层特征筛选、波动率动态仓位管理、混合精度训练提升效率,以及用 VectorBT 验证收益。
文中内容仅限技术学习与代码实践参考,市场存在不确定性,技术分析需谨慎验证,不构成任何投资建议。适合量化新手建立系统认知,为策略开发打下基础。

Backtest Strategy

本文是进阶指南🚀,推荐先阅读了解基础知识‼️

一、方案设计

1.1 方案概述

1.1.1 系统原理

本方案采用LSTM神经网络预测股票收益率,结合双均线交易策略和VectorBT回测框架,构建端到端的量化交易系统。核心流程包括:

  1. 特征工程:通过TA-Lib生成多维技术指标
  2. 特征选择:采用互信息+随机森林+基于模型特性的三阶段特征筛选
  3. 模型训练:LSTM网络学习收益率时序模式
  4. 策略生成:基于预测收益率的双均线策略
  5. 动态仓位:根据预测置信度和波动率调整仓位
  6. 回测验证:VectorBT实现交易回测

1.1.2 架构设计

在线更新
离线训练
增量数据
模型增量更新
原始数据
LSTM预测
特征选择
特征工程
双均线策略
动态仓位管理
VectorBT回测
绩效分析

1.1.3 核心特点

  1. 增量训练:支持在线更新模型参数
  2. 多维特征:包含8大类技术指标
  3. 动态风控:基于波动率的仓位控制
  4. 交易成本:考虑手续费和滑点
  5. 可解释性:特征重要性分析

1.1.4 注意事项

  1. 数据频率需保持一致性(默认日频)
  2. 增量更新时需保证特征分布一致性
  3. 避免在特征工程中使用未来数据
  4. 回测结果需进行多次样本外测试

1.2 关键步骤详解

1.2.1 工作流程

DataProcessor FeatureEngineer Model Trainer Backtester 生成特征 定义模型 初始训练 回测 反馈结果 增量训练 再次回测 DataProcessor FeatureEngineer Model Trainer Backtester

1.2.2 步骤说明

1. 数据处理

  • 加载数据:从指定路径加载股票数据,并进行基本的预处理,如日期格式转换、计算收益率等。
  • 生成技术指标:使用TA-Lib库生成各种技术指标,如移动平均线、相对强弱指数(RSI)等。

2. 特征工程

  • 特征生成:根据不同的特征组(如趋势类、动量类、波动率类等),生成相应的技术指标。
  • 特征选择:使用互信息、随机森林和模型特性进行三阶段特征选择,保留最重要的特征。

3. 模型定义

  • LSTM模型:定义一个单尺度的LSTM模型,用于预测未来的收益率。

4. 训练

  • 初始训练:使用历史数据进行初始训练,并保存模型和特征处理器。
  • 增量训练:在已有模型的基础上,使用新数据进行增量训练。

5. 回测

  • 信号生成:根据预测的收益率和波动率,生成交易信号。
  • 策略执行:使用VectorBT库执行交易策略,并评估策略的表现。

1.2.3 分类应用要点

  • 策略适配:趋势策略侧重MA/MACD,反转策略依赖RSI/KDJ。
  • 参数优化:避免过度拟合,需结合市场周期调整。
  • 多指标协同:例如用ADX过滤RSI信号(仅在强趋势下交易超买超卖)。

1.2.4 归一化技术要点

参数 作用 推荐选择
RobustScaler 用IQR抗异常值 数据含离群点时使用
MinMaxScaler 缩放到[0,1]区间 需要固定范围时
StandardScaler 标准正态分布化 数据分布较对称时

1.3 关键类和函数详解

1.3.1 data_processing.py

数据加载和处理模块

def load_data(ts_code, data_path="./data"):
    """加载预处理后的股票数据

    :param ts_code: 股票代码(如"600000.SH")
    :param data_path: 数据存储路径
    :return: 处理后的DataFrame

    处理步骤:
    1. 读取parquet格式的本地数据
    2. 转换交易日期格式
    3. 计算次日收益率(目标变量)
    4. 删除缺失值
    """
    # 关键代码解释:
    # 使用shift(-1)计算次日收益率,避免未来数据泄露
    df["returns"] = df["close"].pct_change().shift(-1)

1.3.2 model_definition.py

模型定义模块

class SingleScaleLSTM(nn.Module):
    def __init__(self, input_dim, hidden_dim=128):
        """单尺度LSTM预测模型

        :param input_dim: 输入特征维度
        :param hidden_dim: LSTM隐藏层维度, defaults to 128
        """
        
    def forward(self, x):
        """前向传播

        :param x: 输入张量 [batch_size, seq_len, input_dim]
        :return: 预测收益率张量 [batch_size, 1]

        网络结构:
        1. LSTM层提取时序特征
        2. 批归一化层稳定训练
        3. 两层全连接网络进行预测
        4. 使用ReLU激活和Dropout正则化
        """
        # 关键代码解释:
        # 仅使用LSTM最后一个时间步的输出进行预测
        out = out[:, -1, :]

模型结构说明:

输入窗口
LSTM层
BatchNorm
全连接层1
ReLU
Dropout
全连接层2
预测输出

1.3.3 training.py

模型训练模块,包含特征工程、数据处理、增量训练和模型保存等功能。

class SingleWindowDataset(Dataset):
    """
    单窗口数据集类,用于将时间序列数据转换为PyTorch数据集格式。
    :param X_window: 滑动窗口特征数据
    :param y: 目标变量
    """

class TrainingStateManager:
    """
    训练状态管理器类,负责模型和预处理器的保存和加载。
    :param model_dir: 模型保存目录,defaults to "./models"
    """

class OnlineFeatureEngineer:
    """在线特征工程生成器

    :param windows: 滑动窗口列表(用于特征生成), defaults to [5]
    """
    
    def generate_features(self, df, refit=False):
        """生成技术指标特征

        :param df: 原始数据DataFrame
        :param refit: 是否重新拟合标准化器, defaults to False
        :return: 特征DataFrame

        生成7大类技术指标:
        1. 趋势类指标(MA, MACD等)
        2. 动量类指标(RSI, KDJ等)
        3. 波动率指标(布林带, ATR等)
        4. 成交量指标(OBV, MFI等)
        5. 市场情绪类指标 (需要外部数据) -- 忽略
        6. 支撑阻力指标(斐波那契回撤等)
        7. 统计指标(线性回归斜率等)
        8. 复合指标(Ichimoku云图等)
        """
        # 关键代码解释:
        # 使用TA-Lib计算技术指标
        df[f"MA20"] = talib.SMA(df["close"], timeperiod=20)
        ...
        
class IncrementalDataHandler:
    """增量数据处理器

    :param feature_engineer: 特征工程对象
    :param feature_selector: 特征选择器
    :param window_size: 滑动窗口大小, defaults to 3
    """
    def sliding_window(self, data, window_size):
        """生成时序滑动窗口数据

        :param data: 原始特征数据
        :param window_size: 窗口长度
        :return: 窗口序列数据 [n_samples, window_size, n_features]
        """
        # 关键代码解释:
        # 创建时间步长为window_size的序列数据
        for i in range(window_size, len(data)):
            sequences.append(data[i-window_size:i])
        
class IncrementalTrainer:
    """增量训练控制器

    :param device: 计算设备(CPU/GPU)
    """
    def initial_train(self, df, train_config):
        """初始训练流程"""
    def incremental_update(self, new_df):
        """在线更新流程"""
    def _train(self, X_windows, y, train_config):
        """模型训练内部方法
        
        关键配置:
        - 使用AdamW优化器
        - 指数衰减学习率
        - Huber损失函数
        - 梯度裁剪
        """

1.3.4 backtesting.py

回测策略模块,包含信号生成和回测执行逻辑。

class DualMovingAverageStrategy:
    def __init__(self, pred_returns, volatility, params):
        """双均线交易策略生成器

        :param pred_returns: 模型预测的收益率序列
        :param volatility: 波动率序列(用于仓位计算)
        :param params: 策略参数字典
        """
        
    def generate_signals(self):
        """生成交易信号和仓位大小
        
        :return: (交易信号序列, 仓位大小序列)
        
        实现逻辑:
        1. 计算长短周期移动平均线
        2. 通过均线交叉产生买卖信号
        3. 基于预测收益率和波动率计算动态仓位
        4. 输出信号分布统计信息
        """
        # 关键代码解释:
        # 使用tanh函数压缩仓位大小,实现非线性映射
        position_size = np.tanh(self.pred_returns.abs() / self.volatility).clip(0.1, 0.8)

class BacktestStrategy:
    def __init__(self, model, device):
        """回测执行引擎

        :param model: 训练好的预测模型
        :param device: 计算设备(CPU/GPU)
        """
        
    def _vbt_settings(self):
        """配置VectorBT全局参数
        设置包括:
        - 时间频率为日频
        - 使用暗色主题
        - 初始资金10万元
        - 交易成本(手续费+滑点)共0.5%
        """
        
    def run(self, test_data, df, feature_engineer, feature_selector):
        """执行完整回测流程

        :param test_data: 测试数据集元组(X_test, y_test)
        :param df: 原始数据DataFrame
        :param feature_engineer: 特征工程对象
        :param feature_selector: 特征选择器
        :return: (组合对象, 结果DataFrame)

        关键步骤:
        1. 模型预测收益率
        2. 构建包含价格、预测值和波动率的结果DataFrame
        3. 生成交易信号和仓位大小
        4. 使用VectorBT执行组合回测
        """
        # 关键代码解释:
        # 使用rolling窗口计算动态波动率
        result_df["volatility"] = df["atr"].values[-len(preds):] / df["close"].values[-len(preds):]

1.3.5 main.py

主执行模块

def set_random_seed(seed=42):
    """设置全局随机种子

    :param seed: 随机种子, defaults to 42

    影响范围:
    - Python内置随机模块
    - Numpy随机数生成
    - PyTorch CPU/CUDA随机种子
    """
    # 关键代码解释:
    # 确保CUDA卷积确定性
    torch.backends.cudnn.deterministic = True
def train_model(train_data=None, incremental=False):
    """模型训练入口函数

    :param train_data: 训练数据, defaults to None
    :param incremental: 是否为增量训练模式, defaults to False
    :return: 训练完成的模型

    执行逻辑:
    1. 检测可用计算设备(优先使用CUDA)
    2. 初始化训练器
    3. 根据模式选择初始训练或增量更新
    """
    # 关键代码解释:
    # 多设备支持配置
    device = torch.device("cuda" if torch.cuda.is_available() 
                         else "mps" if torch.backends.mps.is_available() 
                         else "cpu")

执行流程:

初始训练
增量更新
设置随机种子
加载数据
模式判断
完整训练
在线更新
保存模型
回测验证

参数配置说明表:

参数名称 默认值 作用域 说明
hidden_dim 128 模型结构 神经网络隐藏层的维度,决定模型的复杂度和表达能力
lr 1e-4 优化器 学习率(Learning Rate),控制参数更新的步长大小
batch_size 64 训练过程 单次训练使用的样本量,影响内存消耗和梯度更新频率
weight_decay 1e-6 正则化 L2正则化系数,用于防止过拟合
epochs 100 训练控制 完整遍历训练数据的总轮次
window_size 5 数据预处理 时间序列窗口大小,用于构建输入序列的滑动窗口长度

参数依赖关系:

window_size
输入序列长度
hidden_dim
模型复杂度
需要调整 lr/weight_decay

参数调试建议:

  1. 优先调参顺序
    window_sizebatch_sizelrhidden_dimweight_decay

  2. 快速验证组合

    initial_config = {
        "hidden_dim": 64,      # 降低模型复杂度
        "lr": 3e-4,            # 适度增大学习率
        "batch_size": 32,      # 加快迭代速度
        "window_size": 7       # 平衡长短期特征
        "weight_decay": 1e-5,  # 增强模型泛化性
    }
    

性能与效果的权衡:

参数调整方向 可能收益 潜在风险
增大 hidden_dim 提升模型拟合能力 增加过拟合风险
减小 batch_size 提高梯度更新频率 增加训练波动性
增大 weight_decay 增强模型泛化性 可能削弱特征学习能力

1.4 优化建议

模型层面优化:

模型改进
多尺度LSTM
注意力机制
残差连接
波动率预测分支

策略层面优化:

优化方向 具体方法
信号过滤 结合波动率阈值过滤虚假信号
动态参数 使用强化学习优化均线参数
组合优化 引入风险平价仓位分配
止损机制 加入动态跟踪止损模块

二、代码实现

3.1 数据准备

import warnings
import pandas as pd
import numpy as np
import tushare as ts
import talib as ta

warnings.filterwarnings("ignore", category=FutureWarning)

# 初始化tushare
ts.set_token("your_token")  # 替换为你的tushare token
pro = ts.pro_api()

# 股票代码
# 平安银行(000001.SZ)
ts_code = "000001.SZ"
# start_date = "20250101"
# end_date = "20250228"
start_date = "20100101"
end_date = "20241231"

# 获取A股数据
def get_tushare_data(ts_code, start_date, end_date):
    df = pro.daily(ts_code=ts_code, start_date=start_date, end_date=end_date)
    return df

tushare_data = get_tushare_data(ts_code, start_date, end_date)
tushare_data = tushare_data.sort_values("trade_date")

# 将数据存储为Parquet文件
tushare_data.to_parquet(f"./data/{ts_code}.parquet", index=False)

# 读取Parquet文件
df = pd.read_parquet(f"./data/{ts_code}.parquet")

df["trade_date"] = pd.to_datetime(df["trade_date"], format="%Y%m%d")

"""计算8类技术指标"""

# ===== 1. 趋势类指标 =====
df["MA20"] = ta.SMA(df["close"], timeperiod=20)  # 20日均线
df["EMA12"] = ta.EMA(df["close"], timeperiod=12)
macd, macdsignal, macdhist = ta.MACD(df["close"])
df["MACD"] = macd
df["ADX"] = ta.ADX(df["high"], df["low"], df["close"], timeperiod=14)
df["SAR"] = ta.SAR(df["high"], df["low"], acceleration=0.02, maximum=0.2)

# ===== 2. 动量类指标 =====
df["RSI"] = ta.RSI(df["close"], timeperiod=14)
# KDJ计算(TA-Lib的STOCH返回K,D值)
slowk, slowd = ta.STOCH(
    df["high"], df["low"], df["close"], fastk_period=9, slowk_period=3, slowd_period=3
)
df["KDJ_K"] = slowk
df["KDJ_D"] = slowd
df["KDJ_J"] = 3 * slowk - 2 * slowd  # J值=3K-2D
df["CCI"] = ta.CCI(df["high"], df["low"], df["close"], timeperiod=14)
df["WILLR"] = ta.WILLR(df["high"], df["low"], df["close"], timeperiod=14)

# ===== 3. 波动率类指标 =====
upper, middle, lower = ta.BBANDS(df["close"], timeperiod=20, nbdevup=2, nbdevdn=2)
df["BB_upper"] = upper
df["BB_middle"] = middle
df["BB_lower"] = lower
df["ATR"] = ta.ATR(df["high"], df["low"], df["close"], timeperiod=14)
df["STD20"] = ta.STDDEV(df["close"], timeperiod=20, nbdev=1)  # 20日标准差

# ===== 4. 成交量类指标 =====
df["OBV"] = ta.OBV(df["close"], df["vol"])
df["MFI"] = ta.MFI(df["high"], df["low"], df["close"], df["vol"], timeperiod=14)

# ===== 5. 市场情绪类指标 =====
# 需要额外数据(如VIX),此处仅作示例
# df['VIX'] = external_vix_data

# ===== 6. 支撑阻力类 =====
# 斐波那契回撤(手动实现)
recent_high = df["high"][-50:].max()  # 假设最近50周期最高价
recent_low = df["low"][-50:].min()
df["Fib_0.382"] = recent_high - (recent_high - recent_low) * 0.382
df["Fib_0.618"] = recent_high - (recent_high - recent_low) * 0.618

# 枢轴点(Pivot Points)
df["Pivot"] = (
    df["high"].shift(1) + df["low"].shift(1) + df["close"].shift(1)
) / 3  # 前一日枢轴点

# ===== 7. 统计类指标 =====
# 线性回归通道(TA-Lib的LINEARREG)
df["LR_slope"] = ta.LINEARREG_SLOPE(df["close"], timeperiod=20)
df["LR_angle"] = np.arctan(df["LR_slope"]) * (180 / np.pi)  # 转换为角度

# ===== 8. 复合型指标 =====
# Ichimoku云图
df["Ichimoku_tenkan"] = (
    ta.MAX(df["high"], timeperiod=9) + ta.MIN(df["low"], timeperiod=9)
) / 2  # 转换线 (Tenkan-sen)
df["Ichimoku_kijun"] = (
    ta.MAX(df["high"], timeperiod=26) + ta.MIN(df["low"], timeperiod=26)
) / 2  #  基准线 (Kijun-sen)
# 先行带A (Senkou Span A)
df["Ichimoku_senkou_a"] = ((df["Ichimoku_tenkan"] + df["Ichimoku_kijun"]) / 2).shift(26)
# 先行带B (Senkou Span B)
df["Ichimoku_senkou_b"] = (
    (ta.MAX(df["high"], timeperiod=52) + ta.MIN(df["low"], timeperiod=52)) / 2
).shift(26)
# 延迟线 (Chikou Span)
df["Ichimoku_chikou"] = df["close"].shift(-26)

# 存储为Parquet文件
df.to_parquet(f"./data/processed_{ts_code}.parquet", index=False)

# 读取Parquet文件
df = pd.read_parquet(f"./data/processed_{ts_code}.parquet")

df.dropna(inplace=True)

print(df.head())

3.2 工程代码

目录结构:

data/
├── 000001.SZ.parquet
├── processed_000001.SZ.parquet
models/
├── vectorbt_3_model.pth
├── vectorbt_3_preprocessors.pkl
src/
└── vectorbt-3/
    ├── data_processing.py
    ├── model_definition.py
    ├── training.py
    ├── backtesting.py
    ├── main.py
    └── __init__.py

3.2.1 data_processing.py

import pandas as pd


def load_data(ts_code, data_path="./data"):
    """加载预处理后的股票数据

    :param ts_code: 股票代码(如"600000.SH")
    :param data_path: 数据存储路径
    :return: 处理后的DataFrame

    处理步骤:
    1. 读取parquet格式的本地数据
    2. 转换交易日期格式
    3. 计算次日收益率(目标变量)
    4. 删除缺失值
    """
    df = pd.read_parquet(f"{data_path}/processed_{ts_code}.parquet")
    df["trade_date"] = pd.to_datetime(df["trade_date"], format="%Y%m%d")
    df.set_index("trade_date", inplace=True)
    # 使用shift(-1)计算次日收益率,避免未来数据泄露
    df["returns"] = df["close"].pct_change().shift(-1)
    df.dropna(inplace=True)
    return df

3.2.2 model_definition.py

import torch.nn as nn


class SingleScaleLSTM(nn.Module):
    def __init__(self, input_dim, hidden_dim=128):
        """单尺度LSTM预测模型

        :param input_dim: 输入特征维度
        :param hidden_dim: LSTM隐藏层维度, defaults to 128
        """
        super().__init__()
        # 定义LSTM层,输入维度为input_dim,隐藏层维度为hidden_dim,batch_first=True表示输入张量的形状为[batch_size, seq_len, input_dim]
        self.lstm = nn.LSTM(input_dim, hidden_dim, batch_first=True)
        # 定义全连接层序列,用于处理LSTM的输出
        self.fc = nn.Sequential(
            # 批归一化层,稳定训练过程
            nn.BatchNorm1d(hidden_dim),
            # 第一个全连接层,将隐藏层维度映射到128维
            nn.Linear(hidden_dim, 128),
            # ReLU激活函数
            nn.ReLU(),
            # Dropout层,防止过拟合,丢弃概率为0.3
            nn.Dropout(0.3),
            # 最后一个全连接层,将128维映射到1维,即最终的预测值
            nn.Linear(128, 1),
        )

    def forward(self, x):
        """前向传播

        :param x: 输入张量 [batch_size, seq_len, input_dim]
        :return: 预测收益率张量 [batch_size, 1]

        网络结构:
        1. LSTM层提取时序特征
        2. 批归一化层稳定训练
        3. 两层全连接网络进行预测
        4. 使用ReLU激活和Dropout正则化
        """
        # 通过LSTM层处理输入张量x,得到输出out和隐藏状态h
        out, _ = self.lstm(x)
        # 仅使用LSTM最后一个时间步的输出进行预测,形状为[batch_size, hidden_dim]
        out = out[:, -1, :]
        # 通过全连接层序列处理LSTM的输出,得到最终的预测结果
        return self.fc(out)

3.2.3 training.py

import os

import joblib
import numpy as np
import pandas as pd
import talib
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.ensemble import RandomForestRegressor
from sklearn.feature_selection import SelectFromModel, mutual_info_regression
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler, RobustScaler, StandardScaler
from torch.utils.data import DataLoader, Dataset
from tqdm.auto import tqdm

from vectorbt_3.backtesting import BacktestStrategy
from vectorbt_3.model_definition import SingleScaleLSTM


class SingleWindowDataset(Dataset):
    def __init__(self, X_window, y):
        """单窗口数据集类,用于将时间序列数据转换为PyTorch数据集格式。

        :param X_window: 滑动窗口特征数据
        :param y: 目标变量
        """
        self.X_window = X_window.astype(np.float32)
        self.y = y.astype(np.float32)

    def __len__(self):
        return len(self.y)

    def __getitem__(self, idx):
        x = torch.from_numpy(self.X_window[idx]).float()
        label = torch.tensor(self.y[idx], dtype=torch.float32)
        return x, label


class TrainingStateManager:
    def __init__(self, model_dir="./models"):
        """训练状态管理器类,负责模型和预处理器的保存和加载。

        :param model_dir: 模型保存目录,defaults to "./models"
        """
        self.model_dir = model_dir
        self.model_path = f"{self.model_dir}/vectorbt_3_model.pth"
        self.preprocessors_path = f"{self.model_dir}/vectorbt_3_preprocessors.pkl"
        os.makedirs(model_dir, exist_ok=True)

    def save(self, model, feature_engineer, feature_selector, config):
        """保存模型和预处理器。

        :param model: 训练好的模型
        :param feature_engineer: 特征工程对象
        :param feature_selector: 特征选择器
        :param config: 模型配置
        """
        torch.save(
            {"model_state_dict": model.state_dict(), "config": config},
            self.model_path,
        )
        joblib.dump(
            {
                "feature_engineer": feature_engineer,
                "feature_selector": feature_selector,
                "config": config,
            },
            self.preprocessors_path,
        )
        print(f"Model saved to {self.model_path}")
        print(f"Preprocessor saved to {self.preprocessors_path}")

    def load(self, device):
        """加载模型和预处理器。

        :param device: 计算设备(CPU/GPU)
        :return: 加载的模型、特征工程对象、特征选择器和配置
        """
        model = None
        feature_engineer = None
        feature_selector = None
        config = None

        if os.path.exists(self.model_path):
            model_data = torch.load(
                self.model_path, weights_only=False, map_location=device
            )
            config = model_data["config"]
            model = SingleScaleLSTM(**config).to(device)
            model.load_state_dict(model_data["model_state_dict"])

        if os.path.exists(self.preprocessors_path):
            preprocess = joblib.load(self.preprocessors_path)
            feature_engineer = preprocess["feature_engineer"]
            feature_selector = preprocess["feature_selector"]
            config = preprocess["config"]

        return model, feature_engineer, feature_selector, config


class OnlineFeatureEngineer:
    def __init__(self, windows=[5]):
        """在线特征工程生成器类,用于生成技术指标特征并进行标准化。

        :param windows: 滑动窗口列表(用于特征生成), defaults to [5]
        """
        self.windows = windows
        self.n_features = 10
        self.scalers = {
            "Trend": RobustScaler(),
            "Momentum": MinMaxScaler(),
            "Volatility": RobustScaler(),
            "Volume": StandardScaler(),
            # "Sentiment": StandardScaler(),
            "SupportResistance": MinMaxScaler(),
            "Statistical": StandardScaler(),
            "Composite": RobustScaler(),
        }
        # "price": ["open", "high", "low", "close"],
        self.feature_groups = {
            # Trend-Following Indicators 趋势类指标
            "Trend": ["MA20", "EMA12", "MACD", "ADX", "SAR"],
            # Momentum Indicators 动量类指标
            "Momentum": ["RSI", "KDJ_K", "KDJ_D", "KDJ_J", "CCI", "WILLR"],
            # Volatility Indicators 波动率类指标
            "Volatility": ["BB_upper", "BB_middle", "BB_lower", "ATR", "STD20"],
            # Volume Indicators 成交量类指标
            "Volume": ["OBV", "MFI"],
            # Market Sentiment Indicators 市场情绪类指标 (需要外部数据)
            # "Sentiment": [],
            # Support/Resistance Indicators 支撑阻力类指标
            "SupportResistance": ["Fib_0.382", "Fib_0.618", "Pivot"],
            # Statistical Indicators 统计类指标
            "Statistical": ["LR_slope", "LR_angle"],
            # Composite Indicators 复合型指标
            "Composite": [
                "Ichimoku_tenkan",
                "Ichimoku_kijun",
                "Ichimoku_senkou_a",
                "Ichimoku_senkou_b",
                "Ichimoku_chikou",
            ],
        }

    def partial_fit(self, new_df):
        """对新数据进行部分拟合。

        :param new_df: 新数据DataFrame
        """
        new_features = self.generate_features(new_df, refit=True)
        for group in self.scalers:
            if hasattr(self.scalers[group], "partial_fit"):
                self.scalers[group].partial_fit(
                    new_features[self.feature_groups[group]]
                )

    def generate_features(self, df, refit=False):
        """生成技术指标特征。

        :param df: 原始数据DataFrame
        :param refit: 是否重新拟合标准化器, defaults to False
        :return: 特征DataFrame

        生成7大类技术指标:
        1. 趋势类指标(MA, MACD等)
        2. 动量类指标(RSI, KDJ等)
        3. 波动率指标(布林带, ATR等)
        4. 成交量指标(OBV, MFI等)
        5. 市场情绪类指标 (需要外部数据) -- 忽略
        6. 支撑阻力指标(斐波那契回撤等)
        7. 统计指标(线性回归斜率等)
        8. 复合指标(Ichimoku云图等)
        """
        for window in self.windows:
            df[f"return_{window}d"] = df["close"].pct_change(window)
            df[f"vol_ma_{window}"] = talib.SMA(df["vol"], timeperiod=window)
            df[f"close_ma_{window}"] = talib.SMA(df["close"], timeperiod=window)

        processed = []
        for group in self.feature_groups:
            features = self.feature_groups[group]
            scaler = self.scalers[group]

            if not refit:
                scaler.fit(df[features])

            scaled = scaler.transform(df[features])
            processed.append(scaled)

        all_features = [
            feature for sublist in self.feature_groups.values() for feature in sublist
        ]
        return pd.DataFrame(
            np.hstack(processed), index=df.index, columns=all_features
        ).dropna()

    def feature_selection(self, X, y):
        """进行特征选择。

        :param X: 特征数据
        :param y: 目标变量
        :return: 选择后的特征数据
        """
        mi_scores = mutual_info_regression(X, y)
        selected = mi_scores > np.quantile(mi_scores, 0.2)
        X_mi = X[:, selected].astype(np.float32)
        print(
            f"[Stage 1] Number of features after mutual information filtering: {X_mi.shape[1]}"
        )

        rf = RandomForestRegressor(n_estimators=50, n_jobs=-1)
        rf.fit(X_mi, y)
        importances = rf.feature_importances_
        rf_selected = importances > np.mean(importances)
        X_rf = X_mi[:, rf_selected].astype(np.float32)
        print(
            f"[Stage 2] Number of features after random forest filtering: {X_rf.shape[1]}"
        )

        X_selected = X_rf.astype(np.float32)
        y_tensor = y.astype(np.float32).ravel()

        n_features_to_select = min(self.n_features, X_selected.shape[1])
        if n_features_to_select < X_selected.shape[1]:
            self.selector = SelectFromModel(rf, max_features=n_features_to_select)
            self.selector.fit(X_selected, y_tensor)
            final_features = self.selector.transform(X_selected)
            print(
                f"[Stage 3] Number of features after model filtering: {final_features.shape[1]}"
            )
        else:
            final_features = X_selected
            print(
                f"[Stage 3] No further filtering was performed, the number of features: {final_features.shape[1]}"
            )

        self.selected_features = final_features.astype(np.float32)

        return self.selected_features


class IncrementalDataHandler:
    def __init__(self, feature_engineer, feature_selector, window_size=3):
        """增量数据处理器类,用于处理增量数据并生成滑动窗口数据。

        :param feature_engineer: 特征工程对象
        :param feature_selector: 特征选择器
        :param window_size: 滑动窗口大小, defaults to 3
        """
        self.window_size = window_size
        self.buffer = pd.DataFrame()
        self.feature_engineer = feature_engineer
        self.feature_selector = feature_selector

    def update_buffer(self, new_df):
        """更新数据缓冲区。

        :param new_df: 新数据DataFrame
        """
        self.buffer = pd.concat([self.buffer, new_df])

    def prepare_incremental_data(self):
        """准备增量数据。

        :return: 训练数据和测试数据
        """
        processed_df = self.feature_engineer.generate_features(self.buffer)
        X_window = self.sliding_window(processed_df.values, self.window_size)
        y = self.buffer["returns"].values[-len(X_window) :]
        X_train, X_test, y_train, y_test = train_test_split(
            X_window, y, test_size=0.2, random_state=42
        )
        return (X_train, y_train), (X_test, y_test)

    def sliding_window(self, data, window_size):
        """生成时序滑动窗口数据

        :param data: 原始特征数据
        :param window_size: 窗口长度
        :return: 窗口序列数据 [n_samples, window_size, n_features]
        """
        sequences = []
        for i in range(window_size, len(data)):
            sequences.append(data[i - window_size : i])
        return np.array(sequences, dtype=np.float32)


class IncrementalTrainer:
    def __init__(self, device):
        """增量训练控制器类,负责模型的初始训练和增量更新。

        :param device: 计算设备(CPU/GPU)
        """
        self.device = device
        self.state_manager = TrainingStateManager()
        self.model = None
        self.feature_engineer = None
        self.feature_selector = None
        self.config = None
        self.backtest_strategy = BacktestStrategy(self.model, self.device)
        self.load_state()

    def load_state(self):
        """加载模型和预处理器。"""
        (self.model, self.feature_engineer, self.feature_selector, self.config) = (
            self.state_manager.load(self.device)
        )
        if self.model:
            self.backtest_strategy.model = self.model

    def initial_train(self, df, train_config, backtesting=True):
        """初始训练模型。

        :param df: 原始数据DataFrame
        :param train_config: 训练配置
        :param backtesting: 是否进行回测,defaults to True
        """
        self.feature_engineer = OnlineFeatureEngineer(
            windows=[train_config["window_size"]]
        )
        processed_df = self.feature_engineer.generate_features(df)

        X, y = processed_df.values, df.reindex(processed_df.index)["returns"].values
        self.feature_selector = self.feature_engineer.feature_selection(X, y)

        data_handler = IncrementalDataHandler(
            feature_engineer=self.feature_engineer,
            feature_selector=self.feature_selector,
            window_size=train_config["window_size"],
        )
        data_handler.buffer = df

        (X_train, y_train), (X_test, y_test) = data_handler.prepare_incremental_data()

        self.config = {
            "input_dim": X_train[0].shape[-1],
            "hidden_dim": train_config.get("hidden_dim", 128),
        }
        self.model = SingleScaleLSTM(**self.config).to(self.device)
        self.backtest_strategy.model = self.model

        print(f"Initial Train Config: {self.config}")

        self._train(X_train, y_train, train_config)

        print("\nModel Evaluation:")
        self._evaluate(X_test, y_test)

        print("\nTraining Save:")
        self.state_manager.save(
            self.model, self.feature_engineer, self.feature_selector, self.config
        )

        if backtesting:
            print("\nStrategy Backtesting:")
            self.backtest_strategy.run(
                (X_test, y_test),
                data_handler.buffer,
                self.feature_engineer,
                self.feature_selector,
            )

    def incremental_update(self, new_df, backtesting=True):
        """增量更新模型。

        :param new_df: 新数据DataFrame
        :param backtesting: 是否进行回测,defaults to True
        """
        if not self.model:
            print("The model does not exist, please train it first.")
            return

        self.feature_engineer.partial_fit(new_df)

        data_handler = IncrementalDataHandler(
            self.feature_engineer, self.feature_selector
        )
        data_handler.update_buffer(new_df)

        (X_train, y_train), (X_test, y_test) = data_handler.prepare_incremental_data()

        print(f"Incremental Update Config: {self.config}")

        self._train(
            X_train,
            y_train,
            {
                "lr": 1e-4,
                "batch_size": 64,
                "weight_decay": 1e-6,
                "epochs": 100,
            },
        )

        print("\nModel Evaluation:")
        self._evaluate(X_test, y_test)

        print("\nTraining Save:")
        self.state_manager.save(
            self.model, self.feature_engineer, self.feature_selector, self.config
        )

        if backtesting:
            print("\nStrategy Backtesting:")
            self.backtest_strategy.run(
                (X_test, y_test),
                data_handler.buffer,
                self.feature_engineer,
                self.feature_selector,
            )

    def _train(self, X_windows, y, train_config):
        """模型训练内部方法。

        关键配置:
        - 使用AdamW优化器
        - 指数衰减学习率
        - Huber损失函数
        - 梯度裁剪
        """
        dataset = SingleWindowDataset(X_windows, y)
        loader = DataLoader(
            dataset,
            batch_size=train_config["batch_size"],
            shuffle=False,
        )

        optimizer = optim.AdamW(
            self.model.parameters(),
            lr=train_config["lr"],
            weight_decay=train_config["weight_decay"],
        )
        scheduler = optim.lr_scheduler.ExponentialLR(optimizer, gamma=0.95)
        criterion = nn.HuberLoss()

        epochs = train_config["epochs"]
        train_loss = []
        for epoch in tqdm(range(train_config["epochs"]), desc="Training"):
            self.model.train()
            total_loss = 0
            for X_batch, y_batch in loader:
                X_batch = X_batch.to(self.device)
                y_batch = y_batch.to(self.device).unsqueeze(1)

                optimizer.zero_grad()
                preds = self.model(X_batch)
                loss = criterion(preds, y_batch)
                loss.backward()
                nn.utils.clip_grad_norm_(self.model.parameters(), 1.0)
                optimizer.step()
            scheduler.step()
            total_loss += loss.item()

            if (epoch + 1) % 10 == 0:
                train_loss.append(
                    f"Epoch [{epoch+1}/{epochs}], Train Loss: {total_loss:.4f}"
                )
        for loss in train_loss:
            print(loss)

    def _evaluate(self, X_test_windows, y_test):
        """模型评估内部方法。

        :param X_test_windows: 测试集滑动窗口特征数据
        :param y_test: 测试集目标变量
        :return: 平均损失
        """
        test_dataset = SingleWindowDataset(X_test_windows, y_test)
        test_loader = DataLoader(
            test_dataset,
            batch_size=256,
            shuffle=False,
        )
        self.model.eval()
        total_loss = 0
        criterion = nn.HuberLoss()
        with torch.no_grad():
            for X_batch, y_batch in test_loader:
                X_batch = X_batch.to(self.device)
                y_batch = y_batch.to(self.device).unsqueeze(1)
                preds = self.model(X_batch)
                loss = criterion(preds, y_batch)
                total_loss += loss.item() * len(y_batch)
        avg_loss = total_loss / len(test_dataset)
        print(f"Val Avg Loss: {avg_loss:.4f}")
        return avg_loss

3.2.4 backtesting.py

import numpy as np
import pandas as pd
import torch
import vectorbt as vbt


class DualMovingAverageStrategy:
    def __init__(self, pred_returns, volatility, params):
        """双均线交易策略生成器。

        :param pred_returns: 模型预测的收益率序列
        :param volatility: 波动率序列(用于仓位计算)
        :param params: 策略参数字典
        """
        self.pred_returns = pred_returns
        self.volatility = volatility.clip(lower=0.01)  # 防止波动率为0,导致除零错误
        self.params = params

    def generate_signals(self):
        """生成交易信号和仓位大小。

        :return: (交易信号序列, 仓位大小序列)

        实现逻辑:
        1. 计算长短周期移动平均线
        2. 通过均线交叉产生买卖信号
        3. 基于预测收益率和波动率计算动态仓位
        4. 输出信号分布统计信息
        """
        short_window = self.params.get("short_window", 5)
        long_window = self.params.get("long_window", 20)

        short_mavg = self.pred_returns.rolling(
            window=short_window, min_periods=1
        ).mean()
        long_mavg = self.pred_returns.rolling(window=long_window, min_periods=1).mean()

        signals = pd.Series(0, index=self.pred_returns.index)
        signals[
            (short_mavg > long_mavg) & (short_mavg.shift(1) <= long_mavg.shift(1))
        ] = 1
        signals[
            (short_mavg < long_mavg) & (short_mavg.shift(1) >= long_mavg.shift(1))
        ] = -1

        # 使用tanh函数压缩仓位大小,实现非线性映射
        position_size = np.tanh(self.pred_returns.abs() / self.volatility).clip(
            0.1, 0.8
        )

        print(f"Long Conditions: {(signals == 1).sum()}")
        print(
            f"Short Conditions: {(signals == -1).sum()}",
        )
        print("Signals Distribution:")
        print(signals.value_counts())

        return signals, position_size


class BacktestStrategy:
    def __init__(self, model, device):
        """回测执行引擎。

        :param model: 训练好的预测模型
        :param device: 计算设备(CPU/GPU)
        """
        self.model = model
        self.device = device
        self._vbt_settings()

    def _vbt_settings(self):
        """配置VectorBT全局参数
        设置包括:
        - 时间频率为日频
        - 使用暗色主题
        - 初始资金10万元
        - 交易成本(手续费+滑点)共0.5%
        """
        vbt.settings.array_wrapper["freq"] = "D"
        vbt.settings.plotting["layout"]["template"] = "vbt_dark"
        vbt.settings.plotting["layout"]["width"] = 1200
        vbt.settings.portfolio["init_cash"] = 100000.0  # CNY
        vbt.settings.portfolio["fees"] = 0.0025  # 0.25%
        vbt.settings.portfolio["slippage"] = 0.0025  # 0.25%

    def run(self, test_data, df, feature_engineer, feature_selector):
        """执行完整回测流程。

        :param test_data: 测试数据集元组(X_test, y_test)
        :param df: 原始数据DataFrame
        :param feature_engineer: 特征工程对象
        :param feature_selector: 特征选择器
        :return: (组合对象, 结果DataFrame)

        关键步骤:
        1. 模型预测收益率
        2. 构建包含价格、预测值和波动率的结果DataFrame
        3. 生成交易信号和仓位大小
        4. 使用VectorBT执行组合回测
        """
        X_test, y_test = test_data
        self.model.eval()
        with torch.no_grad():
            test_tensor = torch.FloatTensor(X_test).to(self.device)
            preds = self.model(test_tensor).cpu().numpy().flatten()
        test_dates = df.index[-len(preds) :]
        result_df = pd.DataFrame(
            {
                "close": df["close"].values[-len(preds) :],
                "pred_returns": preds,
                # 使用rolling窗口计算动态波动率
                "volatility": df["ATR"].values[-len(preds) :]
                / df["close"].values[-len(preds) :],
            },
            index=test_dates,
        )
        strategy = DualMovingAverageStrategy(
            pred_returns=result_df["pred_returns"],
            volatility=result_df["volatility"],
            params={"short_window": 5, "long_window": 20},
        )
        signals, position_size = strategy.generate_signals()
        pf = vbt.Portfolio.from_signals(
            close=result_df["close"],
            entries=signals == 1,
            exits=signals == -1,
            size=position_size.abs(),
            size_type="percent",
            freq="D",
        )
        print("Backtest Results:")
        print(pf.stats())
        # pf.plot().show()
        return pf, result_df

3.2.5 main.py

import random

import numpy as np
import torch

from vectorbt_3.data_processing import load_data
from vectorbt_3.training import IncrementalTrainer


def set_random_seed(seed=42):
    """设置全局随机种子

    :param seed: 随机种子, defaults to 42

    影响范围:
    - Python内置随机模块
    - Numpy随机数生成
    - PyTorch CPU/CUDA随机种子
    """
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)  # 如果使用多个GPU
        torch.backends.cudnn.deterministic = True
        torch.backends.cudnn.benchmark = False


def train_model(train_data=None, incremental=False):
    """模型训练入口函数

    :param train_data: 训练数据, defaults to None
    :param incremental: 是否为增量训练模式, defaults to False
    :return: 训练完成的模型

    执行逻辑:
    1. 检测可用计算设备(优先使用CUDA)
    2. 初始化训练器
    3. 根据模式选择初始训练或增量更新
    """
    # 检测可用计算设备(优先使用CUDA)
    device = torch.device(
        "cuda"
        if torch.cuda.is_available()
        else "mps" if torch.backends.mps.is_available() else "cpu"
    )
    # 初始化训练器
    trainer = IncrementalTrainer(device)

    # 根据模式选择初始训练或增量更新
    if incremental:
        if train_data is not None:
            trainer.incremental_update(train_data)
    else:
        initial_config = {
            "hidden_dim": 128,
            "lr": 1e-4,
            "batch_size": 64,
            "weight_decay": 1e-6,
            "window_size": 5,
            "epochs": 200,
        }
        trainer.initial_train(train_data, initial_config)
    return trainer.model


if __name__ == "__main__":
    # 设置随机种子
    # 函数确保了整个训练过程的可重复性。
    # 通过设置相同的随机种子,可以保证每次运行时生成的随机数序列一致,这对于调试和实验验证非常重要。
    set_random_seed()

    # 初始化训示例
    ts_code = "600000.SH" # 股票代码 平安银行(000001.SZ)
    df = load_data(ts_code)
    model = train_model(df)

    # 增量训练示例
    # new_df = load_data(ts_code)
    # model = train_model(new_df, incremental=True)

附录 技术指标知识点

在量化交易中,技术指标通常根据其功能和应用场景分为以下几大类别,每类指标在策略设计中承担不同的角色:

1. 趋势类指标(Trend-Following Indicators)

用于识别和跟踪市场趋势方向,适用于趋势跟踪策略。

  • 典型指标
    • 移动平均线(MA, EMA):平滑价格波动,显示趋势方向。
    • MACD(异同移动平均线):通过快慢均线差值判断趋势强度。
    • ADX(平均趋向指数):量化趋势强度(非方向)。
    • 抛物线转向(SAR):标记潜在趋势反转点。
  • 用途:判断多空趋势、入场/离场时机。

2. 动量类指标(Momentum Indicators)

衡量价格变化速度和强度,捕捉超买超卖或反转信号。

  • 典型指标
    • RSI(相对强弱指数):0-100区间内反映价格动能。
    • 随机振荡器(KDJ):比较收盘价与价格区间位置。
    • CCI(商品通道指数):衡量价格偏离统计平均的程度。
    • 威廉姆斯%R:反向显示超买超卖水平。
  • 用途:预测反转、确认趋势延续性。

3. 波动率类指标(Volatility Indicators)

反映价格波动幅度,辅助风险管理。

  • 典型指标
    • 布林带(Bollinger Bands):通过标准差构造价格通道。
    • ATR(平均真实波幅):量化市场波动程度。
    • 标准差(Standard Deviation):衡量价格离散度。
  • 用途:设定止损/止盈、识别波动率突变。

4. 成交量类指标(Volume Indicators)

结合交易量验证价格变动的可信度。

  • 典型指标
    • OBV(能量潮):通过成交量累积/派发判断资金流向。
    • 成交量加权平均价(VWAP):机构常用基准价。
    • 资金流量指标(MFI):类似RSI但结合成交量。
    • Chaikin资金流(CMF):结合价格与成交量的资金强度。
  • 用途:确认趋势强度、探测主力行为。

5. 市场情绪类指标(Market Sentiment Indicators)

量化市场参与者的心理状态。

  • 典型指标
    • VIX(恐慌指数):标普500隐含波动率,反映市场情绪。
    • Put/Call比率:期权多空比例。
    • 融资融券余额:A股市场杠杆情绪指标。
  • 用途:逆向交易、极端情绪预警。

6. 支撑阻力类指标(Support & Resistance Indicators)

识别关键价格区域,辅助突破/回调策略。

  • 典型指标
    • 斐波那契回撤/扩展:基于黄金分割比例预测支撑阻力。
    • 枢轴点(Pivot Points):日内交易常用静态关键位。
  • 用途:设定入场/离场目标位。

7. 统计类指标(Statistical Indicators)

基于概率分布或回归分析,挖掘统计规律。

  • 典型指标
    • 线性回归通道:价格围绕回归线分布的统计区间。
    • 协整性分析:配对交易中判断价差平稳性。
  • 用途:均值回归策略、统计套利。

8. 复合型指标(Composite Indicators)

整合多种指标形成综合信号。

  • 典型指标
    • Ichimoku云图:结合趋势、动量、支撑阻力的多维度系统。
    • TD序列:德马克理论中的序列计数信号。
  • 用途:多因子策略、减少单一指标噪音。

理解每类指标的核心逻辑,结合回测验证其有效性,是量化策略开发的关键步骤。

风险提示与免责声明
本文内容基于公开信息研究整理,不构成任何形式的投资建议。历史表现不应作为未来收益保证,市场存在不可预见的波动风险。投资者需结合自身财务状况及风险承受能力独立决策,并自行承担交易结果。作者及发布方不对任何依据本文操作导致的损失承担法律责任。市场有风险,投资须谨慎。


网站公告

今日签到

点亮在社区的每一天
去签到