VectorBT:使用PyTorch+LSTM训练和回测股票模型 进阶四

发布于:2025-04-03 ⋅ 阅读:(22) ⋅ 点赞:(0)

VectorBT:使用PyTorch+LSTM训练和回测股票模型 进阶四

本方案融合 LSTM 时序预测与动态风险控制。系统采用混合架构,离线训练构建多尺度特征工程和双均线策略,结合在线增量更新持续优化模型。技术要点包括三层特征筛选、波动率动态仓位管理、混合精度训练提升效率,以及用 VectorBT 验证收益。
文中内容仅限技术学习与代码实践参考,市场存在不确定性,技术分析需谨慎验证,不构成任何投资建议。适合量化新手建立系统认知,为策略开发打下基础。

Backtest Strategy

本文是进阶指南🚀,推荐先阅读了解基础知识‼️

1. 方案概述

本方案设计了一个基于PyTorch和LSTM的短期交易模型,支持多股票增量数据训练和单股数据回测。该模型利用了时间序列预测、特征工程、波动率动态仓位管理、超参数优化以及回测等技术手段,以实现高效的交易策略生成和评估。

原理

LSTM预测模型

  • 采用多尺度LSTM架构,包含短期(5天)和中期(10天)两个时间维度
  • 引入时间注意力机制,自动捕捉重要时间节点
  • 使用Huber Loss作为损失函数,增强对异常值的鲁棒性

双EMA策略

  • 基于预测收益率生成快慢EMA交叉信号
  • 动态参数优化机制:使用Optuna框架进行参数寻优
  • 风险控制:波动率动态仓位管理,双重交易成本(0.5%总成本 ‼️示例数据‼️)

Optuna: 用于自动化的超参数优化。

特点

  1. 增量学习架构
新数据
特征在线更新
模型增量训练
参数动态调整
策略实时更新
  1. 多维度特征体系
  • 8大类技术指标
  • 自适应特征选择机制
  • 分层标准化策略
  1. 工程化设计
  • 状态持久化(模型+预处理器)
  • 设备自适应(CUDA/MPS/CPU)
  • 全流程随机种子控制

注意事项

  1. 数据泄漏防范
  • 严格按时间序列划分数据集
  • 分组计算收益率(groupby ts_code)
  • 在线特征工程采用滞后窗口
  1. 参数过拟合风险
  • Optuna优化次数限制(n_trials=10)
  • 使用Walk-Forward验证
  • 利润目标与夏普率结合评估
  1. 计算资源考量
  • GPU显存管理(batch_size=128)
  • 数据批处理(window_size=5)
  • 特征维度压缩(SelectFromModel)

2. 序列图

Main DataProcessing Training Model Backtesting DualEMACrossover vbt.Portfolio load_data() initial_train() MultiScaleLSTM() Model Save incremental_update() Incremental training MultiScaleLSTM() Model Update BacktestStrategy() _optimize_parameters() generate_signals() signals, position_size from_signals() pf.stats() Main DataProcessing Training Model Backtesting DualEMACrossover vbt.Portfolio

3. 工程代码

目录结构:

data/
├── processed_600000.SH.parquet
├── processed_600036.SH.parquet
├── processed_600519.SH.parquet
├── processed_000001.SZ.parquet
models/
├── vectorbt_4_model.pth
├── vectorbt_4_preprocessors.pkl
src/
└── vectorbt_4/
    ├── data_processing.py
    ├── model_definition.py
    ├── training.py
    ├── backtesting.py
    ├── main.py
    └── __init__.py

3.1 data_processing.py

import pandas as pd


def load_data(ts_codes, data_path="./data"):
    """加载预处理后的股票数据

    :param ts_code: 股票代码(如["600000.SH", "600036.SH", "000001.SZ"])
    :param data_path: 数据存储路径
    :return: 合并后的DataFrame(含ts_code列标识股票)

    处理步骤:
    1. 读取parquet格式的本地数据
    2. 转换交易日期格式
    3. 计算次日收益率(目标变量)
    4. 删除缺失值
    """
    dfs = []
    for code in ts_codes:
        df = pd.read_parquet(f"{data_path}/processed_{code}.parquet")
        df["ts_code"] = code
        dfs.append(df)

    combined_df = pd.concat(dfs)
    combined_df["trade_date"] = pd.to_datetime(
        combined_df["trade_date"], format="%Y%m%d"
    )
    combined_df.set_index("trade_date", inplace=True)
    combined_df.sort_index(inplace=True)

    # 按股票分组计算收益率,避免跨股票计算,严格避免未来信息
    combined_df["returns"] = combined_df.groupby("ts_code", group_keys=False)[
        "close"
    ].apply(
        lambda x: x.pct_change().shift(-1).clip(-0.1, 0.1)  # 添加收益率截断
    )
    combined_df.dropna(inplace=True)

    return combined_df

3.2 model_definition.py

import torch
import torch.nn as nn
import torch.nn.functional as F


class TemporalAttention(nn.Module):

    def __init__(self, hidden_dim):
        """时间注意力机制模块。

        :param hidden_dim: 隐藏层维度
        """
        super().__init__()
        # 定义线性变换W,用于计算注意力得分
        self.W = nn.Linear(hidden_dim, hidden_dim)
        # 定义线性变换V,用于将得分转换为权重
        self.V = nn.Linear(hidden_dim, 1)

    def forward(self, hidden):
        """前向传播函数。

        :param hidden: 输入隐藏状态 (batch, seq_len, hidden_dim)
        :return: 上下文向量 (batch, hidden_dim)
        """
        # 对hidden应用W变换,并使用tanh激活函数
        score = torch.tanh(self.W(hidden))  # (batch, seq_len, hidden_dim)
        # 计算每个时间步的注意力权重
        attention_weights = F.softmax(self.V(score), dim=1)  # (batch, seq_len, 1)
        # 加权求和得到上下文向量
        context = torch.sum(attention_weights * hidden, dim=1)  # (batch, hidden_dim)
        return context


class MultiScaleLSTM(nn.Module):

    def __init__(self, input_dim, hidden_dim=256, num_layers=3):
        """多尺度LSTM模型。

        :param input_dim: 输入特征维度
        :param hidden_dim: LSTM隐藏层维度,defaults to 256
        :param num_layers: LSTM层数,defaults to 3
        """
        super().__init__()
        # 短期LSTM配置
        self.lstm_short = nn.LSTM(
            input_dim,
            hidden_dim // 2,
            num_layers=num_layers,
            bidirectional=True,
            batch_first=True,
        )

        # 中期LSTM配置
        self.lstm_mid = nn.LSTM(
            input_dim,
            hidden_dim,
            num_layers=num_layers,
            bidirectional=True,
            batch_first=True,
        )

        # 短期特征的时间注意力机制
        self.temp_attn_short = TemporalAttention(hidden_dim)
        # 中期特征的时间注意力机制
        self.temp_attn_mid = TemporalAttention(hidden_dim * 2)

        # 动态计算融合后的维度
        self.fusion_dim = hidden_dim + hidden_dim * 2

        # 收益预测头
        self.return_head = nn.Sequential(
            nn.Linear(self.fusion_dim, 128),
            nn.LayerNorm(128),
            nn.GELU(),
            nn.Dropout(0.3),
            nn.Linear(128, 1),  # 输出维度为1
        )

    def forward(self, x):
        """前向传播函数。

        :param x: 输入数据 (batch, seq_len, input_dim)
        :return: 预测收益 (batch_size, 1)
        """
        # 短期特征处理
        out_short, _ = self.lstm_short(x)  # (batch, seq_len, hidden_dim)
        context_short = self.temp_attn_short(out_short)  # (batch, hidden_dim)

        # 中期特征处理(带降采样)
        x_mid = F.max_pool1d(x.transpose(1, 2), kernel_size=2).transpose(
            1, 2
        )  # (batch, seq_len/2, input_dim)
        out_mid, _ = self.lstm_mid(x_mid)  # (batch, seq_len/2, hidden_dim*2)
        context_mid = self.temp_attn_mid(out_mid)  # (batch, hidden_dim*2)

        # 特征融合
        combined = torch.cat(
            [context_short, context_mid], dim=-1
        )  # (batch, fusion_dim)

        # 单任务输出
        return self.return_head(combined)  # 输出形状 [batch_size, 1]

3.3 training.py

import os

import joblib
import numpy as np
import optuna
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.ensemble import RandomForestRegressor
from sklearn.feature_selection import SelectFromModel
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler, RobustScaler, StandardScaler
from torch.utils.data import DataLoader, Dataset
from tqdm.auto import tqdm

from vectorbt_4.model_definition import MultiScaleLSTM


class SingleWindowDataset(Dataset):
    def __init__(self, X, y):
        """单窗口数据集类,用于将时间序列数据转换为PyTorch数据集格式。

        :param X: 滑动窗口特征数据
        :param y: 目标变量
        """
        self.X = X.astype(np.float32)  # 将滑动窗口特征数据转换为float32类型
        self.y = y.astype(np.float32)  # 将目标变量转换为float32类型

    def __len__(self):
        return len(self.y)  # 返回目标变量的长度

    def __getitem__(self, idx):
        x = torch.from_numpy(
            self.X[idx]
        ).float()  # 将滑动窗口特征数据转换为PyTorch张量
        label = torch.tensor(
            self.y[idx], dtype=torch.float32
        )  # 将目标变量转换为PyTorch张量
        return x, label  # 返回特征和标签


class TrainingStateManager:
    def __init__(self, model_dir="./models"):
        """训练状态管理器类,负责模型和预处理器的保存和加载。

        :param model_dir: 模型保存目录,defaults to "./models"
        """
        self.model_dir = model_dir  # 设置模型保存目录
        self.model_path = f"{self.model_dir}/vectorbt_4_model.pth"  # 设置模型文件路径
        self.preprocessors_path = (
            f"{self.model_dir}/vectorbt_4_preprocessors.pkl"  # 设置预处理器文件路径
        )
        os.makedirs(model_dir, exist_ok=True)  # 创建模型保存目录(如果不存在)

    def save(self, model, feature_engineer, feature_selector, config):
        """保存模型和预处理器。

        :param model: 训练好的模型
        :param feature_engineer: 特征工程对象
        :param feature_selector: 特征选择器
        :param config: 模型配置
        """
        torch.save(model.state_dict(), self.model_path)  # 保存模型参数
        joblib.dump(
            {
                "feature_engineer": feature_engineer,
                "feature_selector": feature_selector,
                "config": config,
            },
            self.preprocessors_path,
        )  # 保存预处理器和配置
        print(f"Model saved to {self.model_path}")  # 打印模型保存路径
        print(
            f"Preprocessor saved to {self.preprocessors_path}"
        )  # 打印预处理器保存路径

    def load(self, device):
        """加载模型和预处理器。

        :param device: 计算设备(CPU/GPU)
        :return: 加载的模型、特征工程对象、特征选择器和模型配置
        """
        model = None  # 初始化模型
        feature_engineer = None  # 初始化特征工程对象
        feature_selector = None  # 初始化特征选择器
        config = None  # 初始化配置

        if os.path.exists(self.preprocessors_path):  # 如果预处理器文件存在
            preprocess = joblib.load(self.preprocessors_path)  # 加载预处理器
            feature_engineer = preprocess["feature_engineer"]  # 获取特征工程对象
            feature_selector = preprocess["feature_selector"]  # 获取特征选择器
            config = preprocess["config"]  # 获取配置

        if os.path.exists(self.model_path):  # 如果模型文件存在
            model_weights = torch.load(
                self.model_path, weights_only=False, map_location=device
            )  # 加载模型参数
            model = MultiScaleLSTM(
                input_dim=config["input_dim"], hidden_dim=config["hidden_dim"]
            ).to(
                device
            )  # 初始化模型并移动到指定设备
            model.load_state_dict(model_weights)  # 加载模型参数

        return (
            model,
            feature_engineer,
            feature_selector,
            config,
        )  # 返回加载的模型、特征工程对象、特征选择器和配置


class OnlineFeatureEngineer:
    def __init__(self, windows=[5]):
        """在线特征工程生成器类,用于生成技术指标特征并进行标准化。

        :param windows: 滑动窗口列表(用于特征生成),defaults to [5]
        """
        self.windows = windows  # 设置滑动窗口列表
        self.n_features = 10  # 设置特征数量
        self.scalers = {
            "Trend": RobustScaler(),  # 趋势类指标使用RobustScaler
            "Momentum": MinMaxScaler(),  # 动量类指标使用MinMaxScaler
            "Volatility": RobustScaler(),  # 波动率类指标使用RobustScaler
            "Volume": StandardScaler(),  # 成交量类指标使用StandardScaler
            # "Sentiment": StandardScaler(),  # 市场情绪类指标使用StandardScaler
            "SupportResistance": MinMaxScaler(),  # 支撑阻力类指标使用MinMaxScaler
            "Statistical": StandardScaler(),  # 统计类指标使用StandardScaler
            "Composite": RobustScaler(),  # 复合型指标使用RobustScaler
        }
        # "price": ["open", "high", "low", "close"],
        self.feature_groups = {
            # Trend-Following Indicators 趋势类指标
            "Trend": ["MA20", "EMA12", "MACD", "ADX", "SAR"],
            # Momentum Indicators 动量类指标
            "Momentum": ["RSI", "KDJ_K", "KDJ_D", "KDJ_J", "CCI", "WILLR"],
            # Volatility Indicators 波动率类指标
            "Volatility": ["BB_upper", "BB_middle", "BB_lower", "ATR", "STD20"],
            # Volume Indicators 成交量类指标
            "Volume": ["OBV", "MFI"],
            # Market Sentiment Indicators 市场情绪类指标 (需要外部数据)
            # "Sentiment": [],
            # Support/Resistance Indicators 支撑阻力类指标
            "SupportResistance": ["Fib_0.382", "Fib_0.618", "Pivot"],
            # Statistical Indicators 统计类指标
            "Statistical": ["LR_slope", "LR_angle"],
            # Composite Indicators 复合型指标
            "Composite": [
                "Ichimoku_tenkan",
                "Ichimoku_kijun",
                "Ichimoku_senkou_a",
                "Ichimoku_senkou_b",
                "Ichimoku_chikou",
            ],
        }
        self.all_features = [
            f for sublist in self.feature_groups.values() for f in sublist
        ]  # 生成所有特征列表

    def partial_fit(self, new_df):
        """对新数据进行部分拟合。

        :param new_df: 新数据DataFrame
        """
        new_features = self.generate_features(new_df, refit=True)  # 生成新数据的特征
        for group, features in self.feature_groups.items():  # 遍历每个特征组
            if hasattr(self.scalers[group], "partial_fit"):  # 如果该缩放器支持部分拟合
                self.scalers[group].partial_fit(
                    new_features[features]
                )  # 对新数据进行部分拟合

    def generate_features(self, df, refit=False):
        """生成技术指标特征。

        :param df: 原始数据DataFrame
        :param refit: 是否重新拟合标准化器,defaults to False
        :return: 特征DataFrame

        生成8大类技术指标:
        1. 趋势类指标(MA, MACD等)
        2. 动量类指标(RSI, KDJ等)
        3. 波动率指标(布林带, ATR等)
        4. 成交量指标(OBV, MFI等)
        5. 市场情绪类指标 (需要外部数据) -- 忽略
        6. 支撑阻力指标(斐波那契回撤等)
        7. 统计指标(线性回归斜率等)
        8. 复合指标(Ichimoku云图等)
        """

        processed = []  # 初始化处理后的特征列表
        for group, features in self.feature_groups.items():  # 遍历每个特征组
            scaler = self.scalers[group]  # 获取对应的缩放器
            if not refit:  # 如果不重新拟合
                scaler.fit(df[features])  # 拟合缩放器
            scaled = scaler.transform(df[features])  # 标准化特征
            processed.append(scaled)  # 添加到处理后的特征列表

        processed_df = pd.DataFrame(
            np.hstack(processed), index=df.index, columns=self.all_features
        )  # 将处理后的特征合并为DataFrame

        processed_df["ts_code"] = df["ts_code"]  # 添加股票代码
        processed_df["returns"] = df["returns"]  # 添加收益

        return processed_df.dropna()  # 删除缺失值

    def feature_selection(self, X, y):
        """进行特征选择。

        :param X: 特征数据
        :param y: 目标变量
        :return: 选择后的特征数据
        """
        selector = RandomForestRegressor(
            n_estimators=50, n_jobs=-1
        )  # 初始化随机森林回归器
        selector.fit(X, y)  # 拟合随机森林回归器
        feature_selector = SelectFromModel(selector, prefit=True)  # 初始化特征选择器
        return feature_selector  # 返回特征选择器


class IncrementalDataHandler:
    def __init__(self, feature_engineer, feature_selector, window_size=5):
        """增量数据处理器类,用于处理增量数据并生成滑动窗口数据。

        :param feature_engineer: 特征工程对象
        :param feature_selector: 特征选择器
        :param window_size: 滑动窗口大小,defaults to 5
        """
        self.feature_engineer = feature_engineer  # 设置特征工程对象
        self.feature_selector = feature_selector  # 设置特征选择器
        self.window_size = window_size  # 设置滑动窗口大小
        self.buffer = pd.DataFrame()  # 初始化数据缓冲区

    def update_buffer(self, new_df):
        """更新数据缓冲区。

        :param new_df: 新数据DataFrame
        """
        self.buffer = pd.concat(
            [self.buffer, new_df]
        ).sort_index()  # 更新数据缓冲区并排序

    def prepare_incremental_data(self):
        """准备增量数据。

        :return: 训练数据和测试数据
        """
        processed_df = self.feature_engineer.generate_features(self.buffer)  # 生成特征
        X_selected = self.feature_selector.transform(
            processed_df[self.feature_engineer.all_features].values
        )  # 选择特征
        X, y = self.sliding_window(processed_df, X_selected)  # 生成滑动窗口数据

        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, random_state=42, shuffle=False
        )  # 划分训练集和测试集
        return (X_train, y_train), (X_test, y_test)  # 返回训练集和测试集

    def sliding_window(self, df, features):
        """生成时序滑动窗口数据"""
        sequences = []  # 初始化序列列表
        labels = []  # 初始化标签列表
        for ts_code, group in df.groupby("ts_code"):  # 按股票代码分组
            group_features = features[df["ts_code"] == ts_code]  # 获取该股票的特征
            for i in range(self.window_size, len(group)):  # 生成滑动窗口
                sequences.append(
                    group_features[i - self.window_size : i]
                )  # 添加滑动窗口特征
                labels.append(group["returns"].iloc[i])  # 添加标签
        return np.array(sequences, dtype=np.float32), np.array(
            labels, dtype=np.float32
        )  # 返回滑动窗口特征和标签


class IncrementalTrainer:
    def __init__(self, device, window_size=5):
        """增量训练控制器类,负责模型的初始训练和增量更新。

        :param device: 计算设备(CPU/GPU)
        """
        self.device = device  # 设置计算设备
        self.window_size = window_size  # 设置滑动窗口大小
        self.state_manager = TrainingStateManager()  # 初始化训练状态管理器
        self.model = None  # 初始化模型
        self.feature_engineer = None  # 初始化特征工程对象
        self.feature_selector = None  # 初始化特征选择器
        self.config = None  # 初始化配置
        self.load_state()  # 加载状态

    def load_state(self):
        """加载模型和预处理器。"""
        (
            self.model,
            self.feature_engineer,
            self.feature_selector,
            self.config,
        ) = self.state_manager.load(
            self.device
        )  # 加载模型、特征工程对象、特征选择器和配置

    def initial_train(self, df):
        """初始训练模型。

        :param df: 原始数据DataFrame
        """

        # 特征生成
        self.feature_engineer = OnlineFeatureEngineer(
            windows=[self.window_size]
        )  # 初始化特征工程对象
        processed_df = self.feature_engineer.generate_features(df)  # 生成特征

        # 特征选择
        X, y = processed_df[self.feature_engineer.all_features], df["returns"].reindex(
            processed_df.index
        )  # 准备特征和标签
        self.feature_selector = self.feature_engineer.feature_selection(
            X, y
        )  # 选择特征

        # 准备训练数据
        data_handler = IncrementalDataHandler(
            feature_engineer=self.feature_engineer,
            feature_selector=self.feature_selector,
            window_size=self.window_size,
        )  # 初始化增量数据处理器
        data_handler.buffer = df  # 更新数据缓冲区

        (X_train, y_train), (X_test, y_test) = (
            data_handler.prepare_incremental_data()
        )  # 准备训练数据

        # Optuna超参优化
        self._optimize_parameters(X_train, y_train, X_test, y_test)  # 优化超参数

        # 最佳模型训练
        print(f"Initial Model Config: {self.config}")  # 打印初始模型配置
        self._train(X_train, y_train)  # 训练模型

        print("Initial Model Evaluation:")  # 打印初始模型评估
        self._evaluate(X_test, y_test)  # 评估模型

        print("Initial Model Save:")  # 打印初始模型保存
        self.state_manager.save(
            self.model, self.feature_engineer, self.feature_selector, self.config
        )  # 保存模型

    def incremental_update(self, new_df):
        """增量更新模型。

        :param new_df: 新数据DataFrame
        """
        if not self.model:
            print(
                "The model does not exist, please train it first."
            )  # 如果模型不存在,提示先训练模型
            return

        self.feature_engineer.partial_fit(new_df)  # 对新数据进行部分拟合

        data_handler = IncrementalDataHandler(
            self.feature_engineer, self.feature_selector
        )  # 初始化增量数据处理器
        data_handler.update_buffer(new_df)  # 更新数据缓冲区

        (X_train, y_train), (X_test, y_test) = (
            data_handler.prepare_incremental_data()
        )  # 准备增量数据

        print(f"Incremental Model Config: {self.config}")  # 打印增量模型配置

        self._train(X_train, y_train)  # 训练模型

        print("\nIncremental Model Evaluation:")  # 打印增量模型评估
        self._evaluate(X_test, y_test)  # 评估模型

        print("\nIncremental Model Save:")  # 打印增量模型保存
        self.state_manager.save(
            self.model, self.feature_engineer, self.feature_selector, self.config
        )  # 保存模型

    def _optimize_parameters(self, X_train, y_train, X_test, y_test):
        """Optuna超参优化

        :param X_train: 训练集滑动窗口特征数据
        :param y_train: 训练集目标变量
        :param X_test: 测试集滑动窗口特征数据
        :param y_test: 测试集目标变量
        """

        def objective(trial):
            self.config = {
                "hidden_dim": trial.suggest_int(
                    "hidden_dim", 64, 256
                ),  # 建议隐藏层维度
                "lr": trial.suggest_float("lr", 1e-4, 1e-3, log=True),  # 建议学习率
                "batch_size": trial.suggest_categorical(
                    "batch_size", [32, 64, 128]
                ),  # 建议批量大小
                "weight_decay": trial.suggest_float(
                    "weight_decay", 1e-6, 1e-4
                ),  # 建议权重衰减
                "input_dim": X_train[0].shape[-1],  # 输入特征维度
                "epochs": 100,  # 训练轮数
                "window_size": self.window_size,  # 滑动窗口大小
            }
            self._train(X_train, y_train)  # 训练模型
            val_loss = self._evaluate(X_test, y_test)  # 评估模型
            print(f"Val Loss: {val_loss:.4f}")  # 打印验证损失
            return val_loss  # 返回验证损失

        # 超参优化
        study = optuna.create_study(direction="minimize")  # 创建研究
        study.optimize(objective, n_trials=1, show_progress_bar=True)  # 优化目标函数
        print(f"Training Best params: {study.best_params}")  # 打印最佳参数
        # 最佳模型参数
        self.config.update(study.best_params)  # 更新配置

    def _train(self, X_train, y_train):
        """模型训练内部方法。

        :param X_train: 训练集滑动窗口特征数据
        :param y_train: 训练集目标变量
        """
        print(f"Training Model Config: {self.config}")  # 打印模型配置
        lr = self.config.get("lr", 1e-4)  # 获取学习率
        epochs = self.config.get("epochs", 100)  # 获取训练轮数
        batch_size = self.config.get("batch_size", 128)  # 获取批量大小
        weight_decay = self.config.get("weight_decay", 1e-6)  # 获取权重衰减
        hidden_dim = self.config.get("hidden_dim", 128)  # 获取隐藏层维度
        input_dim = self.config.get(
            "input_dim", X_train[0].shape[-1]
        )  # 获取输入特征维度

        dataset = SingleWindowDataset(X_train, y_train)  # 初始化数据集
        loader = DataLoader(
            dataset,
            batch_size=batch_size,
            shuffle=False,
        )  # 初始化数据加载器

        # 初始化模型
        self.model = MultiScaleLSTM(input_dim=input_dim, hidden_dim=hidden_dim).to(
            self.device
        )  # 初始化模型并移动到指定设备

        optimizer = optim.AdamW(
            self.model.parameters(),
            lr=lr,
            weight_decay=weight_decay,
        )  # 初始化优化器
        scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
            optimizer, "min", patience=5
        )  # 初始化学习率调度器
        criterion = nn.HuberLoss()  # 初始化损失函数

        # 训练循环
        for epoch in tqdm(range(epochs), desc="Training"):  # 进行训练
            self.model.train()  # 设置模型为训练模式
            total_loss = 0  # 初始化总损失
            for X_batch, y_batch in loader:  # 遍历数据加载器
                X_batch = X_batch.to(self.device)  # 将特征数据移动到指定设备
                y_batch = y_batch.to(self.device).unsqueeze(
                    1
                )  # 将标签数据移动到指定设备并扩展维度

                optimizer.zero_grad()  # 清零梯度
                preds = self.model(X_batch)  # 前向传播
                loss = criterion(preds, y_batch)  # 计算损失
                loss.backward()  # 反向传播
                nn.utils.clip_grad_norm_(self.model.parameters(), 1.0)  # 梯度裁剪
                optimizer.step()  # 更新参数
                total_loss += loss.item()  # 累加损失

            # 学习率调整
            avg_loss = total_loss / len(loader)  # 计算平均损失
            scheduler.step(avg_loss)  # 更新学习率

    def _evaluate(self, X_test, y_test):
        """模型评估内部方法。

        :param X_test: 测试集滑动窗口特征数据
        :param y_test: 测试集目标变量
        :return: 平均损失
        """
        test_dataset = SingleWindowDataset(X_test, y_test)  # 初始化测试数据集
        test_loader = DataLoader(
            test_dataset,
            batch_size=128,
            shuffle=False,
        )  # 初始化测试数据加载器
        self.model.eval()  # 设置模型为评估模式
        total_loss = 0  # 初始化总损失
        criterion = nn.HuberLoss()  # 初始化损失函数
        with torch.no_grad():  # 关闭梯度计算
            for X_batch, y_batch in test_loader:  # 遍历测试数据加载器
                X_batch = X_batch.to(self.device)  # 将特征数据移动到指定设备
                y_batch = y_batch.to(self.device).unsqueeze(
                    1
                )  # 将标签数据移动到指定设备并扩展维度
                preds = self.model(X_batch)  # 前向传播
                loss = criterion(preds, y_batch)  # 计算损失
                total_loss += loss.item() * len(y_batch)  # 累加损失

        test_loss = total_loss / len(test_dataset)  # 计算平均损失
        print(f"Test Loss: {test_loss}")  # 打印测试损失
        return test_loss  # 返回测试损失

3.4 backtesting.py

import numpy as np
import optuna
import pandas as pd
import torch
import vectorbt as vbt


class DualEMACrossoverStrategy:

    def __init__(self, pred_returns, volatility, params):
        """双EMA交叉策略类。

        :param pred_returns: 预测的收益率序列
        :param volatility: 波动率序列(用于仓位计算)
        :param params: 参数字典,包含快慢EMA的时间跨度
        """
        self.pred_returns = pred_returns  # 预测的收益率序列
        self.volatility = volatility.clip(lower=0.01)  # 防止波动率为0,导致除零错误
        self.fast_span = params["fast_span"]  # 快速EMA的时间跨度
        self.slow_span = params["slow_span"]  # 慢速EMA的时间跨度

    def generate_signals(self):
        """生成交易信号。

        :return: (交易信号, 仓位大小)
        """
        ema_fast = self.pred_returns.ewm(
            span=self.fast_span, min_periods=self.fast_span
        ).mean()  # 计算快速EMA
        ema_slow = self.pred_returns.ewm(
            span=self.slow_span, min_periods=self.slow_span
        ).mean()  # 计算慢速EMA
        signals = pd.Series(0, index=self.pred_returns.index)  # 初始化信号序列
        signals[(ema_fast > ema_slow) & (ema_fast.shift(1) <= ema_slow.shift(1))] = (
            1  # 买入信号
        )
        signals[(ema_fast < ema_slow) & (ema_fast.shift(1) >= ema_slow.shift(1))] = (
            -1
        )  # 卖出信号

        # 使用tanh函数压缩仓位大小,实现非线性映射
        position_size = np.tanh(self.pred_returns.abs() / self.volatility).clip(
            0.3, 0.8
        )

        return signals, position_size  # 返回信号和仓位大小


class BacktestStrategy:

    def __init__(self, model, device):
        """回测执行引擎。

        :param model: 训练好的预测模型
        :param device: 计算设备(CPU/GPU)
        """
        self.model = model  # 训练好的预测模型
        self.device = device  # 计算设备
        self._configure_vbt()  # 配置VectorBT全局参数

    def _configure_vbt(self):
        """配置VectorBT全局参数。"""
        vbt.settings.array_wrapper["freq"] = "D"  # 设置时间频率为日频
        vbt.settings.plotting["layout"]["template"] = "vbt_dark"  # 使用暗色主题
        vbt.settings.plotting["layout"]["width"] = 1200  # 设置图表宽度
        vbt.settings.portfolio["init_cash"] = 100000.0  # 初始资金10万元
        vbt.settings.portfolio["fees"] = 0.0025  # 交易成本(手续费)0.25%
        vbt.settings.portfolio["slippage"] = 0.0025  # 交易成本(滑点)0.25%

    def _optimize_parameters(self, result_df):
        """优化参数逻辑调整。

        :param result_df: 包含预测收益率的结果DataFrame
        :return: 最优参数
        """

        def objective(trial):
            params = {
                "fast_span": trial.suggest_int(
                    "fast_span", 10, 30
                ),  # 建议快速EMA的时间跨度
                "slow_span": trial.suggest_int(
                    "slow_span", 50, 100
                ),  # 建议慢速EMA的时间跨度
            }

            strategy = DualEMACrossoverStrategy(
                pred_returns=result_df["pred_returns"],
                volatility=result_df["volatility"],
                params=params,
            )  # 创建策略实例
            signals, position_size = strategy.generate_signals()  # 生成交易信号

            pf = vbt.Portfolio.from_signals(
                close=result_df["close"],
                entries=signals.shift(1) == 1,  # 买入信号
                exits=signals.shift(1) == -1,  # 卖出信号
                size=position_size,  # 固定仓位
                freq="D",
            )

            return pf.total_profit()  # 返回总利润

        study = optuna.create_study(direction="maximize")  # 创建Optuna研究
        study.optimize(objective, n_trials=10, show_progress_bar=True)  # 优化参数
        print(f"Strategy Best params: {study.best_params}")  # 打印最优参数
        return study.best_params  # 返回最优参数

    def run(self, test_data, df):
        """执行完整回测流程。

        :param test_data: 测试数据集元组(X_test, y_test)
        :param df: 原始数据DataFrame
        :return: (组合对象, 结果DataFrame)
        """
        X_test = test_data  # 测试数据
        self.model.eval()  # 将模型设置为评估模式
        with torch.no_grad():  # 禁用梯度计算
            test_tensor = torch.FloatTensor(X_test).to(
                self.device
            )  # 转换为Tensor并移动到指定设备
            preds = (
                self.model(test_tensor).cpu().numpy().flatten()
            )  # 获取预测值并转换为NumPy数组
        test_dates = df.index[-len(preds) :]  # 获取测试日期
        result_df = pd.DataFrame(
            {
                "close": df["close"].values[-len(preds) :],
                "pred_returns": preds,  # 使用rolling窗口计算动态波动率
                "volatility": df["ATR"].values[-len(preds) :]
                / df["close"].values[-len(preds) :],
            },
            index=test_dates,
        )  # 创建结果DataFrame
        best_params = self._optimize_parameters(result_df)  # 运行参数优化
        strategy = DualEMACrossoverStrategy(
            pred_returns=result_df["pred_returns"],
            volatility=result_df["volatility"],
            params=best_params,
        )  # 创建策略实例
        signals, position_size = strategy.generate_signals()  # 生成交易信号
        return vbt.Portfolio.from_signals(
            close=result_df["close"],
            entries=signals == 1,  # 买入信号
            exits=signals == -1,  # 卖出信号
            size=position_size,
            size_type="percent",
            freq="D",
        )  # 执行组合回测

3.5 main.py

import random

import numpy as np
import torch

from vectorbt_4.backtesting import BacktestStrategy
from vectorbt_4.data_processing import load_data
from vectorbt_4.training import IncrementalTrainer, TrainingStateManager


def set_random_seed(seed=42):
    """设置全局随机种子

    :param seed: 随机种子, defaults to 42

    影响范围:
    - Python内置随机模块
    - Numpy随机数生成
    - PyTorch CPU/CUDA随机种子
    """
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)  # 如果使用多个GPU
        torch.backends.cudnn.deterministic = True
        torch.backends.cudnn.benchmark = False


def prepare_backtest_data(ts_code, device):
    """准备回测数据。

    :param ts_code: 股票代码
    :param device: 计算设备(CPU/GPU)
    :return: 滑动窗口特征数据、原始数据DataFrame、模型、特征工程对象、特征选择器和配置
    """
    state_manager = TrainingStateManager()  # 初始化训练状态管理器
    model, feature_engineer, feature_selector, config = state_manager.load(
        device
    )  # 加载模型和预处理器

    print(f"Model Config: {config}")  # 打印模型配置

    # 加载并处理数据
    test_df = load_data([ts_code])  # 加载数据
    test_df = test_df[-300:]  # 取最近300条数据
    processed_df = feature_engineer.generate_features(test_df)  # 生成特征
    X_selected = feature_selector.transform(
        processed_df[feature_engineer.all_features].values
    )  # 选择特征

    # 构建滑动窗口
    window_size = config["window_size"]  # 获取滑动窗口大小
    sequences = []  # 初始化序列列表
    for i in range(window_size, len(X_selected)):  # 遍历数据
        sequences.append(X_selected[i - window_size : i])  # 添加滑动窗口特征

    return (
        np.array(sequences, dtype=np.float32),  # 返回滑动窗口特征数据
        test_df,  # 返回原始数据DataFrame
        model,  # 返回模型
        feature_engineer,  # 返回特征工程对象
        feature_selector,  # 返回特征选择器
        config,  # 返回配置
    )


if __name__ == "__main__":
    # 设置随机种子
    # 函数确保了整个训练过程的可重复性。
    # 通过设置相同的随机种子,可以保证每次运行时生成的随机数序列一致,这对于调试和实验验证非常重要。
    set_random_seed()

    # 检测可用计算设备(优先使用CUDA)
    device = torch.device(
        "cuda"
        if torch.cuda.is_available()
        else "mps" if torch.backends.mps.is_available() else "cpu"
    )

    # 股票行情
    # start_date = "20100101"
    # end_date = "20241231"

    trainer = IncrementalTrainer(device)

    # 多股票初始训练示例
    # 浦发银行(600000.SH)
    # 招商银行(600036.SH)
    # 平安银行(000001.SZ)
    train_codes = ["600000.SH", "600036.SH", "000001.SZ"]
    train_df = load_data(train_codes)
    model = trainer.initial_train(train_df)

    # 增量训练示例
    # 贵州茅台(600519.SH)
    # new_df = load_data(["600519.SH"])
    # model = trainer.incremental_update(new_df)

    # 单股票回测
    (test_data, test_df, model, feature_engineer, feature_selector, config) = (
        prepare_backtest_data("600519.SH", device)
    )

    backtester = BacktestStrategy(model, device)
    pf = backtester.run(test_data, test_df)

    print("回测结果统计:")
    print(pf.stats())
    pf.plot().show()

风险提示与免责声明
本文内容基于公开信息研究整理,不构成任何形式的投资建议。历史表现不应作为未来收益保证,市场存在不可预见的波动风险。投资者需结合自身财务状况及风险承受能力独立决策,并自行承担交易结果。作者及发布方不对任何依据本文操作导致的损失承担法律责任。市场有风险,投资须谨慎。


网站公告

今日签到

点亮在社区的每一天
去签到