VectorBT:使用PyTorch+LSTM训练和回测股票模型 进阶一

发布于:2025-03-24 ⋅ 阅读:(27) ⋅ 点赞:(0)

VectorBT:使用PyTorch+LSTM训练和回测股票模型 进阶一

本文介绍了如何使用PyTorch和LSTM模型进行股票数据的训练和回测。涵盖了数据预处理、特征选择、LSTM模型构建、模型训练与验证、动态阈值策略生成交易信号以及使用VectorBT进行回测和绩效分析。
文中内容仅限技术学习与代码实践参考,市场存在不确定性,技术分析需谨慎验证,不构成任何投资建议。适合量化新手建立系统认知,为策略开发打下基础。

Backtest Strategy

本文是 🚀 VectorBT:Python量化交易策略开发与回测评估详解 🔥的进阶指南,推荐先阅读了解基础知识‼️

1. 知识点总结

本文将介绍如何使用PyTorch+LSTM模型结合VectorBT来训练和回测股票数据。

将涵盖以下知识点

  • 数据预处理
  • 特征选择
  • LSTM模型构建
  • 模型训练与验证
  • 动态阈值策略
  • 回测与绩效分析

2. 关键步骤讲解

2.1 数据预处理

数据预处理是机器学习中的关键步骤,包括读取数据、处理缺失值、标准化等。

  • Pandas:用于数据处理和操作。
  • MinMaxScaler:用于数据标准化,将特征缩放到0到1之间。

2.2 特征选择

特征选择是为了减少特征维度,提高模型性能。这里使用互信息法进行特征选择。

  • mutual_info_regression:计算特征与目标变量之间的互信息。

2.3 LSTM模型构建

LSTM(长短期记忆网络)是一种特殊的RNN,适用于时间序列预测。

  • EnhancedLSTM:自定义的LSTM模型,包含注意力机制。
  • nn.LSTM:PyTorch中的LSTM层。
  • nn.Sequential:用于构建神经网络层的顺序容器。

2.4 模型训练与验证

  • AdamW优化器:一种基于梯度下降的优化算法。
  • OneCycleLR调度器:动态调整学习率。
  • HuberLoss损失函数:对异常值不敏感的损失函数。

2.5 动态阈值策略

动态阈值策略根据历史数据生成交易信号。

  • AdaptiveStrategy:根据预测收益率和波动率生成交易信号。

2.6 回测与绩效分析

  • VectorBT:用于回测和绩效分析的库。
  • Portfolio.from_signals:根据交易信号构建投资组合。

3. 代码实现

3.1 环境设置

import torch
import vectorbt as vbt

print(f"PyTorch版本: {torch.__version__}")
print(f"VectorBT版本: {vbt.__version__}")

vbt.settings.array_wrapper["freq"] = "D"
vbt.settings.plotting["layout"]["template"] = "vbt_dark"
vbt.settings.plotting["layout"]["width"] = 1200
vbt.settings.portfolio["init_cash"] = 100000.0  # 100000 CNY
vbt.settings.portfolio["fees"] = 0.0025  # 0.25%
vbt.settings.portfolio["slippage"] = 0.0025  # 0.25%

device = torch.device(
    "cuda"
    if torch.cuda.is_available()
    else "mps" if torch.backends.mps.is_available() else "cpu"
)

3.2 数据准备

import pandas as pd

# 股票代码
# 贵州茅台(600519.SH)
ts_code = "600519.SH"

# 读取Parquet文件
df = pd.read_parquet(f"./data/processed_{ts_code}.parquet")

df["trade_date"] = pd.to_datetime(df["trade_date"], format="%Y%m%d")
df.set_index("trade_date", inplace=True)

df.dropna(inplace=True)

print(df.head())
print(df.shape)

3.3 数据预处理

  • 特征筛选
  • 目标变量
  • 平稳性检验
  • 时序分割
  • 时序分割
  • 标准化
  • 创建时间序列数据
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import optuna
import numpy as np
from sklearn.preprocessing import MinMaxScaler
from sklearn.feature_selection import mutual_info_regression
from statsmodels.tsa.stattools import adfuller
from tqdm.auto import tqdm

# 滑动窗口生成
seq_length = 30


def prepare_data(df):
    # 特征工程(示例)
    features = [
        "open",
        "high",
        "low",
        "vol",
        "close",
        "ma5",
        "ma10",
        "ma20",
        "rsi",
        "macd",
        "macdsignal",
        "macdhist",
        "bb_upper",
        "bb_middle",
        "bb_lower",
        "momentum",
        "roc",
        "atr",
        "obv",
    ]

    # 特征筛选(互信息法)
    target = df["close"].pct_change().shift(-1).dropna()
    selected_features = []
    for feature in features:
        mi = mutual_info_regression(
            df.iloc[:-1][feature].values.reshape(-1, 1), target.iloc[: len(df)].values
        )
        if mi > 0.05:
            selected_features.append(feature)

    if len(selected_features) > 0:
        features = selected_features
    print(f"特征维度: {len(features)}列: {features}")

    # 目标变量转换为收益率
    df["returns"] = df["close"].pct_change().shift(-1)  # 下一日收益率
    df.dropna(inplace=True)

    # 平稳性检验
    adf_result = adfuller(df["returns"].dropna())
    if adf_result[1] > 0.05:
        df["returns"] = df["returns"].diff().fillna(0)  # 一阶差分

    # 时序分割
    split_ratios = (0.6, 0.2, 0.2)  # train/val/test
    splits = np.cumsum(split_ratios) * len(df)
    splits = splits.astype(int)
    train_df = df.iloc[: splits[0]]
    val_df = df.iloc[splits[0] : splits[1]]
    test_df = df.iloc[splits[1] :]

    # 标准化
    X_scaler = MinMaxScaler().fit(train_df[features])
    y_scaler = MinMaxScaler().fit(train_df[["returns"]])

    # 创建时间序列数据
    def create_sequences(data, target):
        X, y = [], []
        for i in range(len(data) - seq_length):
            X.append(data[i : i + seq_length])  # 滑动窗口
            y.append(target[i + seq_length - 1])  # 预测下一个时间步
        return np.array(X), np.array(y)

    X_train, y_train = create_sequences(
        X_scaler.transform(train_df[features]),
        y_scaler.transform(train_df[["returns"]]),
    )
    X_val, y_val = create_sequences(
        X_scaler.transform(val_df[features]), y_scaler.transform(val_df[["returns"]])
    )
    X_test, y_test = create_sequences(
        X_scaler.transform(test_df[features]), y_scaler.transform(test_df[["returns"]])
    )

    print(f"训练集维度: X{X_train.shape} y{y_train.shape}")
    print(f"验证集维度: X{X_val.shape} y{y_val.shape}")
    print(f"测试集维度: X{X_test.shape} y{y_test.shape}")
    print(f"总样本数: {len(X_train)+len(X_val)+len(X_test)}")

    return (
        (X_train, y_train),
        (X_val, y_val),
        (X_test, y_test),
        X_scaler,
        y_scaler,
        test_df,
    )

3.4 LSTM模型定义

class EnhancedLSTM(nn.Module):

    def __init__(self, input_dim, hidden_dim=128, num_layers=2, dropout=0.3):
        super().__init__()
        self.lstm = nn.LSTM(
            input_size=input_dim,
            hidden_size=hidden_dim,
            num_layers=num_layers,
            batch_first=True,
            dropout=dropout if num_layers > 1 else 0,
        )
        self.attention = nn.Sequential(
            nn.Linear(hidden_dim, 16), nn.Tanh(), nn.Linear(16, 1), nn.Softmax(dim=1)
        )
        self.fc = nn.Sequential(
            nn.Linear(hidden_dim, 32), nn.ReLU(), nn.Dropout(dropout), nn.Linear(32, 1)
        )

    def forward(self, x):
        out, _ = self.lstm(x)  # [batch, seq_len, hidden]
        attn_weights = self.attention(out)  # [batch, seq_len, 1]
        context = torch.sum(attn_weights * out, dim=1)  # [batch, hidden]
        return self.fc(context)

3.5 动态阈值策略

class AdaptiveStrategy:
    def __init__(self, pred_returns, volatility, params):
        self.pred_returns = pred_returns
        self.volatility = volatility
        self.params = params

    def generate_signals(self):
        # 动态阈值
        lookback = self.params["threshold_lookback"]
        upper_q = self.pred_returns.rolling(lookback).quantile(0.7)
        lower_q = self.pred_returns.rolling(lookback).quantile(0.3)

        # 波动率调整仓位(凯利公式变体)
        position_size = 0.5 * self.pred_returns.abs() / (self.volatility + 1e-6)
        position_size = position_size.clip(0.1, 0.8)

        signals = pd.Series(0, index=self.pred_returns.index)
        long_signals = (self.pred_returns > upper_q) & (position_size > 0.15)
        exit_signals = self.pred_returns < lower_q

        signals[long_signals] = 1
        signals[exit_signals] = -1

        return signals, position_size

3.6 模型训练和评估

# 模型训练
def train_model(config, train_data, val_data):
    X_train, y_train = train_data

    input_dim = X_train.shape[-1]
    hidden_dim = config["hidden_dim"]
    num_layers = config["num_layers"]
    dropout = config["dropout"]
    batch_size = config["batch_size"]
    lr = config["lr"]
    weight_decay = config["weight_decay"]
    epochs = config["epochs"]

    # 初始化模型
    model = EnhancedLSTM(input_dim, hidden_dim, num_layers, dropout).to(device)

    optimizer = optim.AdamW(model.parameters(), lr=lr, weight_decay=weight_decay)
    scheduler = optim.lr_scheduler.OneCycleLR(
        optimizer,
        max_lr=config["lr"],
        total_steps=config["epochs"] * len(X_train) // config["batch_size"],
    )
    criterion = nn.HuberLoss()

    train_loader = DataLoader(
        TensorDataset(torch.FloatTensor(X_train), torch.FloatTensor(y_train)),
        batch_size=config["batch_size"],
        shuffle=True,
        drop_last=True,
    )

    # 训练循环
    best_loss = float("inf")
    early_stop_counter = 0
    for epoch in tqdm(range(epochs), desc="Training"):
        model.train()
        train_loss = 0
        for X_batch, y_batch in train_loader:
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)
            optimizer.zero_grad()
            preds = model(X_batch)
            loss = criterion(preds, y_batch)
            loss.backward()
            nn.utils.clip_grad_norm_(model.parameters(), 1.0)
            optimizer.step()
            scheduler.step()
            train_loss += loss.item()

        # 验证评估
        val_loss = evaluate_model(model, val_data)

        if (epoch + 1) % 10 == 0:
            print(
                f"Epoch [{epoch+1}/{epochs}], Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}"
            )

        # 早停机制
        if val_loss < best_loss:
            best_loss = val_loss
            early_stop_counter = 0
        else:
            early_stop_counter += 1
            if early_stop_counter >= 15:
                break

    # 训练完成后保存
    model_path = "./models/pytorch_lstm_model.pth"
    torch.save(
        {
            "model_state_dict": model.state_dict(),
            "input_dim": input_dim,
            "hidden_dim": hidden_dim,
            "num_layers": num_layers,
            "dropout": dropout,
            "batch_size": batch_size,
            "lr": lr,
            "weight_decay": weight_decay,
            "epochs": epochs,
        },
        model_path,
    )
    print(f"PyTorch LSTM model and parameters saved to {model_path}")

    return model

# 模型评估
def evaluate_model(model, val_data):
    X_val, y_val = val_data
    val_loader = DataLoader(
        TensorDataset(torch.FloatTensor(X_val), torch.FloatTensor(y_val)),
        batch_size=256,
        shuffle=False,
    )
    model.eval()
    total_loss = 0
    criterion = nn.HuberLoss()
    with torch.no_grad():
        for X_batch, y_batch in val_loader:
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)
            preds = model(X_batch)
            loss = criterion(preds, y_batch)
            total_loss += loss.item()
    return total_loss / len(val_loader)

3.6 回测引擎

def backtest_strategy(model, test_data, X_scaler, y_scaler, test_df):
    X_test, y_test = test_data
    # 生成预测
    model.eval()
    with torch.no_grad():
        test_tensor = torch.FloatTensor(X_test).to(device)
        preds = model(test_tensor).cpu().numpy()

    # 反归一化
    pred_returns = y_scaler.inverse_transform(preds.reshape(-1, 1)).flatten()

    # 对齐时间索引
    test_dates = test_df.index[seq_length:-1]
    df = pd.DataFrame(
        {
            "close": test_df["close"].iloc[seq_length:-1],
            "pred_returns": pred_returns[:-1],
            "volatility": test_df["atr"].iloc[seq_length:-1]
            / test_df["close"].iloc[seq_length:-1],
        },
        index=test_dates,
    )

    # 生成信号
    strategy = AdaptiveStrategy(
        pred_returns=df["pred_returns"],
        volatility=df["volatility"],
        params={"threshold_lookback": 60},
    )
    signals, position_size = strategy.generate_signals()

    # 输入校验
    assert len(df["close"]) == len(signals), "数据长度不一致"
    assert signals.isin([-1, 0, 1]).all(), "信号包含非法值"

    # 构建投资组合
    pf = vbt.Portfolio.from_signals(
        close=df["close"],
        size=np.abs(position_size),  # 明确方向控制
        size_type="percent",
        entries=signals == 1,
        exits=signals == -1,
        freq="D",
        # 增强参数
        accumulate=False,  # 禁止累积仓位
        log=True,  # 记录交易日志
        call_seq="auto",  # 自动处理订单顺序
    )

    return pf, df

3.7 主程序运行

# 数据准备
train_data, val_data, test_data, X_scaler, y_scaler, test_df = prepare_data(df)

# Optuna超参优化
def objective(trial):
    config = {
        "hidden_dim": trial.suggest_int("hidden_dim", 64, 256),
        "num_layers": trial.suggest_int("num_layers", 1, 3),
        "dropout": trial.suggest_float("dropout", 0.1, 0.5),
        "batch_size": trial.suggest_categorical("batch_size", [32, 64, 128]),
        "lr": trial.suggest_float("lr", 1e-4, 1e-3, log=True),
        "weight_decay": trial.suggest_float("weight_decay", 1e-6, 1e-4),
        "epochs": 100,
    }
    model = train_model(config, train_data, val_data)
    val_loss = evaluate_model(model, val_data)
    return val_loss


# 超参优化
study = optuna.create_study(direction="minimize")
study.optimize(
    objective, n_trials=10, show_progress_bar=True, timeout=3600
)  # 1小时超时

# 加载最佳模型
checkpoint = torch.load("./models/pytorch_lstm_model.pth", map_location=device)

best_model = EnhancedLSTM(
    input_dim=train_data[0].shape[-1],
    **{
        k: v
        for k, v in checkpoint.items()
        if k in ["hidden_dim", "num_layers", "dropout"]
    }
).to(device)
best_model.load_state_dict(checkpoint["model_state_dict"])

# 回测
pf, result_df = backtest_strategy(best_model, test_data, X_scaler, y_scaler, test_df)

# 绩效分析
print(pf.stats())
pf.plot().show()

4. 关键类和函数说明

4.1 prepare_data 函数

  • 功能:数据预处理,包括特征选择、标准化、时间序列分割等。
  • 参数
    • df:原始数据框。
  • 返回值
    • train_data:训练集数据。
    • val_data:验证集数据。
    • test_data:测试集数据。
    • X_scaler:特征标准化器。
    • y_scaler:目标变量标准化器。
    • test_df:测试集数据框。

4.2 EnhancedLSTM

  • 功能:定义一个增强的LSTM模型,包含注意力机制。
  • 参数
    • input_dim:输入特征维度。
    • hidden_dim:隐藏层维度,默认128。
    • num_layers:LSTM层数,默认2。
    • dropout:Dropout概率,默认0.3。
  • 方法
    • forward:前向传播函数。

4.3 AdaptiveStrategy

  • 功能:根据预测收益率和波动率生成交易信号。
  • 参数
    • pred_returns:预测收益率。
    • volatility:波动率。
    • params:参数字典,包含threshold_lookback
  • 方法
    • generate_signals:生成交易信号。

4.4 train_model 函数

  • 功能:训练LSTM模型。
  • 参数
    • config:配置字典,包含模型参数。
    • train_data:训练集数据。
    • val_data:验证集数据。
  • 返回值
    • model:训练好的模型。

4.5 evaluate_model 函数

  • 功能:评估模型在验证集上的性能。
  • 参数
    • model:模型。
    • val_data:验证集数据。
  • 返回值
    • val_loss:验证集损失。

4.6 backtest_strategy 函数

  • 功能:回测策略并生成投资组合。
  • 参数
    • model:模型。
    • test_data:测试集数据。
    • X_scaler:特征标准化器。
    • y_scaler:目标变量标准化器。
    • test_df:测试集数据框。
  • 返回值
    • pf:投资组合。
    • result_df:结果数据框。

风险提示与免责声明
本文内容基于公开信息研究整理,不构成任何形式的投资建议。历史表现不应作为未来收益保证,市场存在不可预见的波动风险。投资者需结合自身财务状况及风险承受能力独立决策,并自行承担交易结果。作者及发布方不对任何依据本文操作导致的损失承担法律责任。市场有风险,投资须谨慎。