VectorBT:使用PyTorch+LSTM训练和回测股票模型 进阶一
本文介绍了如何使用PyTorch和LSTM模型进行股票数据的训练和回测。涵盖了数据预处理、特征选择、LSTM模型构建、模型训练与验证、动态阈值策略生成交易信号以及使用VectorBT进行回测和绩效分析。
文中内容仅限技术学习与代码实践参考,市场存在不确定性,技术分析需谨慎验证,不构成任何投资建议。适合量化新手建立系统认知,为策略开发打下基础。
本文是 🚀 VectorBT:Python量化交易策略开发与回测评估详解 🔥的进阶指南,推荐先阅读了解基础知识‼️
1. 知识点总结
本文将介绍如何使用PyTorch+LSTM模型结合VectorBT来训练和回测股票数据。
将涵盖以下知识点:
- 数据预处理
- 特征选择
- LSTM模型构建
- 模型训练与验证
- 动态阈值策略
- 回测与绩效分析
2. 关键步骤讲解
2.1 数据预处理
数据预处理是机器学习中的关键步骤,包括读取数据、处理缺失值、标准化等。
- Pandas:用于数据处理和操作。
- MinMaxScaler:用于数据标准化,将特征缩放到0到1之间。
2.2 特征选择
特征选择是为了减少特征维度,提高模型性能。这里使用互信息法进行特征选择。
- mutual_info_regression:计算特征与目标变量之间的互信息。
2.3 LSTM模型构建
LSTM(长短期记忆网络)是一种特殊的RNN,适用于时间序列预测。
- EnhancedLSTM:自定义的LSTM模型,包含注意力机制。
- nn.LSTM:PyTorch中的LSTM层。
- nn.Sequential:用于构建神经网络层的顺序容器。
2.4 模型训练与验证
- AdamW优化器:一种基于梯度下降的优化算法。
- OneCycleLR调度器:动态调整学习率。
- HuberLoss损失函数:对异常值不敏感的损失函数。
2.5 动态阈值策略
动态阈值策略根据历史数据生成交易信号。
- AdaptiveStrategy:根据预测收益率和波动率生成交易信号。
2.6 回测与绩效分析
- VectorBT:用于回测和绩效分析的库。
- Portfolio.from_signals:根据交易信号构建投资组合。
3. 代码实现
3.1 环境设置
import torch
import vectorbt as vbt
print(f"PyTorch版本: {torch.__version__}")
print(f"VectorBT版本: {vbt.__version__}")
vbt.settings.array_wrapper["freq"] = "D"
vbt.settings.plotting["layout"]["template"] = "vbt_dark"
vbt.settings.plotting["layout"]["width"] = 1200
vbt.settings.portfolio["init_cash"] = 100000.0 # 100000 CNY
vbt.settings.portfolio["fees"] = 0.0025 # 0.25%
vbt.settings.portfolio["slippage"] = 0.0025 # 0.25%
device = torch.device(
"cuda"
if torch.cuda.is_available()
else "mps" if torch.backends.mps.is_available() else "cpu"
)
3.2 数据准备
import pandas as pd
# 股票代码
# 贵州茅台(600519.SH)
ts_code = "600519.SH"
# 读取Parquet文件
df = pd.read_parquet(f"./data/processed_{ts_code}.parquet")
df["trade_date"] = pd.to_datetime(df["trade_date"], format="%Y%m%d")
df.set_index("trade_date", inplace=True)
df.dropna(inplace=True)
print(df.head())
print(df.shape)
3.3 数据预处理
- 特征筛选
- 目标变量
- 平稳性检验
- 时序分割
- 时序分割
- 标准化
- 创建时间序列数据
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import optuna
import numpy as np
from sklearn.preprocessing import MinMaxScaler
from sklearn.feature_selection import mutual_info_regression
from statsmodels.tsa.stattools import adfuller
from tqdm.auto import tqdm
# 滑动窗口生成
seq_length = 30
def prepare_data(df):
# 特征工程(示例)
features = [
"open",
"high",
"low",
"vol",
"close",
"ma5",
"ma10",
"ma20",
"rsi",
"macd",
"macdsignal",
"macdhist",
"bb_upper",
"bb_middle",
"bb_lower",
"momentum",
"roc",
"atr",
"obv",
]
# 特征筛选(互信息法)
target = df["close"].pct_change().shift(-1).dropna()
selected_features = []
for feature in features:
mi = mutual_info_regression(
df.iloc[:-1][feature].values.reshape(-1, 1), target.iloc[: len(df)].values
)
if mi > 0.05:
selected_features.append(feature)
if len(selected_features) > 0:
features = selected_features
print(f"特征维度: {len(features)}列: {features}")
# 目标变量转换为收益率
df["returns"] = df["close"].pct_change().shift(-1) # 下一日收益率
df.dropna(inplace=True)
# 平稳性检验
adf_result = adfuller(df["returns"].dropna())
if adf_result[1] > 0.05:
df["returns"] = df["returns"].diff().fillna(0) # 一阶差分
# 时序分割
split_ratios = (0.6, 0.2, 0.2) # train/val/test
splits = np.cumsum(split_ratios) * len(df)
splits = splits.astype(int)
train_df = df.iloc[: splits[0]]
val_df = df.iloc[splits[0] : splits[1]]
test_df = df.iloc[splits[1] :]
# 标准化
X_scaler = MinMaxScaler().fit(train_df[features])
y_scaler = MinMaxScaler().fit(train_df[["returns"]])
# 创建时间序列数据
def create_sequences(data, target):
X, y = [], []
for i in range(len(data) - seq_length):
X.append(data[i : i + seq_length]) # 滑动窗口
y.append(target[i + seq_length - 1]) # 预测下一个时间步
return np.array(X), np.array(y)
X_train, y_train = create_sequences(
X_scaler.transform(train_df[features]),
y_scaler.transform(train_df[["returns"]]),
)
X_val, y_val = create_sequences(
X_scaler.transform(val_df[features]), y_scaler.transform(val_df[["returns"]])
)
X_test, y_test = create_sequences(
X_scaler.transform(test_df[features]), y_scaler.transform(test_df[["returns"]])
)
print(f"训练集维度: X{X_train.shape} y{y_train.shape}")
print(f"验证集维度: X{X_val.shape} y{y_val.shape}")
print(f"测试集维度: X{X_test.shape} y{y_test.shape}")
print(f"总样本数: {len(X_train)+len(X_val)+len(X_test)}")
return (
(X_train, y_train),
(X_val, y_val),
(X_test, y_test),
X_scaler,
y_scaler,
test_df,
)
3.4 LSTM模型定义
class EnhancedLSTM(nn.Module):
def __init__(self, input_dim, hidden_dim=128, num_layers=2, dropout=0.3):
super().__init__()
self.lstm = nn.LSTM(
input_size=input_dim,
hidden_size=hidden_dim,
num_layers=num_layers,
batch_first=True,
dropout=dropout if num_layers > 1 else 0,
)
self.attention = nn.Sequential(
nn.Linear(hidden_dim, 16), nn.Tanh(), nn.Linear(16, 1), nn.Softmax(dim=1)
)
self.fc = nn.Sequential(
nn.Linear(hidden_dim, 32), nn.ReLU(), nn.Dropout(dropout), nn.Linear(32, 1)
)
def forward(self, x):
out, _ = self.lstm(x) # [batch, seq_len, hidden]
attn_weights = self.attention(out) # [batch, seq_len, 1]
context = torch.sum(attn_weights * out, dim=1) # [batch, hidden]
return self.fc(context)
3.5 动态阈值策略
class AdaptiveStrategy:
def __init__(self, pred_returns, volatility, params):
self.pred_returns = pred_returns
self.volatility = volatility
self.params = params
def generate_signals(self):
# 动态阈值
lookback = self.params["threshold_lookback"]
upper_q = self.pred_returns.rolling(lookback).quantile(0.7)
lower_q = self.pred_returns.rolling(lookback).quantile(0.3)
# 波动率调整仓位(凯利公式变体)
position_size = 0.5 * self.pred_returns.abs() / (self.volatility + 1e-6)
position_size = position_size.clip(0.1, 0.8)
signals = pd.Series(0, index=self.pred_returns.index)
long_signals = (self.pred_returns > upper_q) & (position_size > 0.15)
exit_signals = self.pred_returns < lower_q
signals[long_signals] = 1
signals[exit_signals] = -1
return signals, position_size
3.6 模型训练和评估
# 模型训练
def train_model(config, train_data, val_data):
X_train, y_train = train_data
input_dim = X_train.shape[-1]
hidden_dim = config["hidden_dim"]
num_layers = config["num_layers"]
dropout = config["dropout"]
batch_size = config["batch_size"]
lr = config["lr"]
weight_decay = config["weight_decay"]
epochs = config["epochs"]
# 初始化模型
model = EnhancedLSTM(input_dim, hidden_dim, num_layers, dropout).to(device)
optimizer = optim.AdamW(model.parameters(), lr=lr, weight_decay=weight_decay)
scheduler = optim.lr_scheduler.OneCycleLR(
optimizer,
max_lr=config["lr"],
total_steps=config["epochs"] * len(X_train) // config["batch_size"],
)
criterion = nn.HuberLoss()
train_loader = DataLoader(
TensorDataset(torch.FloatTensor(X_train), torch.FloatTensor(y_train)),
batch_size=config["batch_size"],
shuffle=True,
drop_last=True,
)
# 训练循环
best_loss = float("inf")
early_stop_counter = 0
for epoch in tqdm(range(epochs), desc="Training"):
model.train()
train_loss = 0
for X_batch, y_batch in train_loader:
X_batch, y_batch = X_batch.to(device), y_batch.to(device)
optimizer.zero_grad()
preds = model(X_batch)
loss = criterion(preds, y_batch)
loss.backward()
nn.utils.clip_grad_norm_(model.parameters(), 1.0)
optimizer.step()
scheduler.step()
train_loss += loss.item()
# 验证评估
val_loss = evaluate_model(model, val_data)
if (epoch + 1) % 10 == 0:
print(
f"Epoch [{epoch+1}/{epochs}], Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}"
)
# 早停机制
if val_loss < best_loss:
best_loss = val_loss
early_stop_counter = 0
else:
early_stop_counter += 1
if early_stop_counter >= 15:
break
# 训练完成后保存
model_path = "./models/pytorch_lstm_model.pth"
torch.save(
{
"model_state_dict": model.state_dict(),
"input_dim": input_dim,
"hidden_dim": hidden_dim,
"num_layers": num_layers,
"dropout": dropout,
"batch_size": batch_size,
"lr": lr,
"weight_decay": weight_decay,
"epochs": epochs,
},
model_path,
)
print(f"PyTorch LSTM model and parameters saved to {model_path}")
return model
# 模型评估
def evaluate_model(model, val_data):
X_val, y_val = val_data
val_loader = DataLoader(
TensorDataset(torch.FloatTensor(X_val), torch.FloatTensor(y_val)),
batch_size=256,
shuffle=False,
)
model.eval()
total_loss = 0
criterion = nn.HuberLoss()
with torch.no_grad():
for X_batch, y_batch in val_loader:
X_batch, y_batch = X_batch.to(device), y_batch.to(device)
preds = model(X_batch)
loss = criterion(preds, y_batch)
total_loss += loss.item()
return total_loss / len(val_loader)
3.6 回测引擎
def backtest_strategy(model, test_data, X_scaler, y_scaler, test_df):
X_test, y_test = test_data
# 生成预测
model.eval()
with torch.no_grad():
test_tensor = torch.FloatTensor(X_test).to(device)
preds = model(test_tensor).cpu().numpy()
# 反归一化
pred_returns = y_scaler.inverse_transform(preds.reshape(-1, 1)).flatten()
# 对齐时间索引
test_dates = test_df.index[seq_length:-1]
df = pd.DataFrame(
{
"close": test_df["close"].iloc[seq_length:-1],
"pred_returns": pred_returns[:-1],
"volatility": test_df["atr"].iloc[seq_length:-1]
/ test_df["close"].iloc[seq_length:-1],
},
index=test_dates,
)
# 生成信号
strategy = AdaptiveStrategy(
pred_returns=df["pred_returns"],
volatility=df["volatility"],
params={"threshold_lookback": 60},
)
signals, position_size = strategy.generate_signals()
# 输入校验
assert len(df["close"]) == len(signals), "数据长度不一致"
assert signals.isin([-1, 0, 1]).all(), "信号包含非法值"
# 构建投资组合
pf = vbt.Portfolio.from_signals(
close=df["close"],
size=np.abs(position_size), # 明确方向控制
size_type="percent",
entries=signals == 1,
exits=signals == -1,
freq="D",
# 增强参数
accumulate=False, # 禁止累积仓位
log=True, # 记录交易日志
call_seq="auto", # 自动处理订单顺序
)
return pf, df
3.7 主程序运行
# 数据准备
train_data, val_data, test_data, X_scaler, y_scaler, test_df = prepare_data(df)
# Optuna超参优化
def objective(trial):
config = {
"hidden_dim": trial.suggest_int("hidden_dim", 64, 256),
"num_layers": trial.suggest_int("num_layers", 1, 3),
"dropout": trial.suggest_float("dropout", 0.1, 0.5),
"batch_size": trial.suggest_categorical("batch_size", [32, 64, 128]),
"lr": trial.suggest_float("lr", 1e-4, 1e-3, log=True),
"weight_decay": trial.suggest_float("weight_decay", 1e-6, 1e-4),
"epochs": 100,
}
model = train_model(config, train_data, val_data)
val_loss = evaluate_model(model, val_data)
return val_loss
# 超参优化
study = optuna.create_study(direction="minimize")
study.optimize(
objective, n_trials=10, show_progress_bar=True, timeout=3600
) # 1小时超时
# 加载最佳模型
checkpoint = torch.load("./models/pytorch_lstm_model.pth", map_location=device)
best_model = EnhancedLSTM(
input_dim=train_data[0].shape[-1],
**{
k: v
for k, v in checkpoint.items()
if k in ["hidden_dim", "num_layers", "dropout"]
}
).to(device)
best_model.load_state_dict(checkpoint["model_state_dict"])
# 回测
pf, result_df = backtest_strategy(best_model, test_data, X_scaler, y_scaler, test_df)
# 绩效分析
print(pf.stats())
pf.plot().show()
4. 关键类和函数说明
4.1 prepare_data
函数
- 功能:数据预处理,包括特征选择、标准化、时间序列分割等。
- 参数:
df
:原始数据框。
- 返回值:
train_data
:训练集数据。val_data
:验证集数据。test_data
:测试集数据。X_scaler
:特征标准化器。y_scaler
:目标变量标准化器。test_df
:测试集数据框。
4.2 EnhancedLSTM
类
- 功能:定义一个增强的LSTM模型,包含注意力机制。
- 参数:
input_dim
:输入特征维度。hidden_dim
:隐藏层维度,默认128。num_layers
:LSTM层数,默认2。dropout
:Dropout概率,默认0.3。
- 方法:
forward
:前向传播函数。
4.3 AdaptiveStrategy
类
- 功能:根据预测收益率和波动率生成交易信号。
- 参数:
pred_returns
:预测收益率。volatility
:波动率。params
:参数字典,包含threshold_lookback
。
- 方法:
generate_signals
:生成交易信号。
4.4 train_model
函数
- 功能:训练LSTM模型。
- 参数:
config
:配置字典,包含模型参数。train_data
:训练集数据。val_data
:验证集数据。
- 返回值:
model
:训练好的模型。
4.5 evaluate_model
函数
- 功能:评估模型在验证集上的性能。
- 参数:
model
:模型。val_data
:验证集数据。
- 返回值:
val_loss
:验证集损失。
4.6 backtest_strategy
函数
- 功能:回测策略并生成投资组合。
- 参数:
model
:模型。test_data
:测试集数据。X_scaler
:特征标准化器。y_scaler
:目标变量标准化器。test_df
:测试集数据框。
- 返回值:
pf
:投资组合。result_df
:结果数据框。
风险提示与免责声明
本文内容基于公开信息研究整理,不构成任何形式的投资建议。历史表现不应作为未来收益保证,市场存在不可预见的波动风险。投资者需结合自身财务状况及风险承受能力独立决策,并自行承担交易结果。作者及发布方不对任何依据本文操作导致的损失承担法律责任。市场有风险,投资须谨慎。