VectorBT:使用PyTorch+LSTM训练和回测股票模型 进阶四
本方案融合 LSTM 时序预测与动态风险控制。系统采用混合架构,离线训练构建多尺度特征工程和双均线策略,结合在线增量更新持续优化模型。技术要点包括三层特征筛选、波动率动态仓位管理、混合精度训练提升效率,以及用 VectorBT 验证收益。
文中内容仅限技术学习与代码实践参考,市场存在不确定性,技术分析需谨慎验证,不构成任何投资建议。适合量化新手建立系统认知,为策略开发打下基础。
本文是进阶指南🚀,推荐先阅读了解基础知识‼️
- VectorBT:Python量化交易策略开发与回测评估详解 🔥
- VectorBT:使用PyTorch+LSTM训练和回测股票模型 进阶一 🔥
- VectorBT:使用PyTorch+LSTM训练和回测股票模型 进阶二 🔥
- VectorBT:使用PyTorch+LSTM训练和回测股票模型 进阶三 🔥
1. 方案概述
本方案设计了一个基于PyTorch和LSTM的短期交易模型,支持多股票增量数据训练和单股数据回测。该模型利用了时间序列预测、特征工程、波动率动态仓位管理、超参数优化以及回测等技术手段,以实现高效的交易策略生成和评估。
原理
LSTM预测模型:
- 采用多尺度LSTM架构,包含短期(5天)和中期(10天)两个时间维度
- 引入时间注意力机制,自动捕捉重要时间节点
- 使用Huber Loss作为损失函数,增强对异常值的鲁棒性
双EMA策略:
- 基于预测收益率生成快慢EMA交叉信号
- 动态参数优化机制:使用Optuna框架进行参数寻优
- 风险控制:波动率动态仓位管理,双重交易成本(0.5%总成本 ‼️示例数据‼️)
Optuna: 用于自动化的超参数优化。
特点
- 增量学习架构:
- 多维度特征体系:
- 8大类技术指标
- 自适应特征选择机制
- 分层标准化策略
- 工程化设计:
- 状态持久化(模型+预处理器)
- 设备自适应(CUDA/MPS/CPU)
- 全流程随机种子控制
注意事项
- 数据泄漏防范:
- 严格按时间序列划分数据集
- 分组计算收益率(groupby ts_code)
- 在线特征工程采用滞后窗口
- 参数过拟合风险:
- Optuna优化次数限制(n_trials=10)
- 使用Walk-Forward验证
- 利润目标与夏普率结合评估
- 计算资源考量:
- GPU显存管理(batch_size=128)
- 数据批处理(window_size=5)
- 特征维度压缩(SelectFromModel)
2. 序列图
3. 工程代码
目录结构:
data/
├── processed_600000.SH.parquet
├── processed_600036.SH.parquet
├── processed_600519.SH.parquet
├── processed_000001.SZ.parquet
models/
├── vectorbt_4_model.pth
├── vectorbt_4_preprocessors.pkl
src/
└── vectorbt_4/
├── data_processing.py
├── model_definition.py
├── training.py
├── backtesting.py
├── main.py
└── __init__.py
3.1 data_processing.py
import pandas as pd
def load_data(ts_codes, data_path="./data"):
"""加载预处理后的股票数据
:param ts_code: 股票代码(如["600000.SH", "600036.SH", "000001.SZ"])
:param data_path: 数据存储路径
:return: 合并后的DataFrame(含ts_code列标识股票)
处理步骤:
1. 读取parquet格式的本地数据
2. 转换交易日期格式
3. 计算次日收益率(目标变量)
4. 删除缺失值
"""
dfs = []
for code in ts_codes:
df = pd.read_parquet(f"{data_path}/processed_{code}.parquet")
df["ts_code"] = code
dfs.append(df)
combined_df = pd.concat(dfs)
combined_df["trade_date"] = pd.to_datetime(
combined_df["trade_date"], format="%Y%m%d"
)
combined_df.set_index("trade_date", inplace=True)
combined_df.sort_index(inplace=True)
# 按股票分组计算收益率,避免跨股票计算,严格避免未来信息
combined_df["returns"] = combined_df.groupby("ts_code", group_keys=False)[
"close"
].apply(
lambda x: x.pct_change().shift(-1).clip(-0.1, 0.1) # 添加收益率截断
)
combined_df.dropna(inplace=True)
return combined_df
3.2 model_definition.py
import torch
import torch.nn as nn
import torch.nn.functional as F
class TemporalAttention(nn.Module):
def __init__(self, hidden_dim):
"""时间注意力机制模块。
:param hidden_dim: 隐藏层维度
"""
super().__init__()
# 定义线性变换W,用于计算注意力得分
self.W = nn.Linear(hidden_dim, hidden_dim)
# 定义线性变换V,用于将得分转换为权重
self.V = nn.Linear(hidden_dim, 1)
def forward(self, hidden):
"""前向传播函数。
:param hidden: 输入隐藏状态 (batch, seq_len, hidden_dim)
:return: 上下文向量 (batch, hidden_dim)
"""
# 对hidden应用W变换,并使用tanh激活函数
score = torch.tanh(self.W(hidden)) # (batch, seq_len, hidden_dim)
# 计算每个时间步的注意力权重
attention_weights = F.softmax(self.V(score), dim=1) # (batch, seq_len, 1)
# 加权求和得到上下文向量
context = torch.sum(attention_weights * hidden, dim=1) # (batch, hidden_dim)
return context
class MultiScaleLSTM(nn.Module):
def __init__(self, input_dim, hidden_dim=256, num_layers=3):
"""多尺度LSTM模型。
:param input_dim: 输入特征维度
:param hidden_dim: LSTM隐藏层维度,defaults to 256
:param num_layers: LSTM层数,defaults to 3
"""
super().__init__()
# 短期LSTM配置
self.lstm_short = nn.LSTM(
input_dim,
hidden_dim // 2,
num_layers=num_layers,
bidirectional=True,
batch_first=True,
)
# 中期LSTM配置
self.lstm_mid = nn.LSTM(
input_dim,
hidden_dim,
num_layers=num_layers,
bidirectional=True,
batch_first=True,
)
# 短期特征的时间注意力机制
self.temp_attn_short = TemporalAttention(hidden_dim)
# 中期特征的时间注意力机制
self.temp_attn_mid = TemporalAttention(hidden_dim * 2)
# 动态计算融合后的维度
self.fusion_dim = hidden_dim + hidden_dim * 2
# 收益预测头
self.return_head = nn.Sequential(
nn.Linear(self.fusion_dim, 128),
nn.LayerNorm(128),
nn.GELU(),
nn.Dropout(0.3),
nn.Linear(128, 1), # 输出维度为1
)
def forward(self, x):
"""前向传播函数。
:param x: 输入数据 (batch, seq_len, input_dim)
:return: 预测收益 (batch_size, 1)
"""
# 短期特征处理
out_short, _ = self.lstm_short(x) # (batch, seq_len, hidden_dim)
context_short = self.temp_attn_short(out_short) # (batch, hidden_dim)
# 中期特征处理(带降采样)
x_mid = F.max_pool1d(x.transpose(1, 2), kernel_size=2).transpose(
1, 2
) # (batch, seq_len/2, input_dim)
out_mid, _ = self.lstm_mid(x_mid) # (batch, seq_len/2, hidden_dim*2)
context_mid = self.temp_attn_mid(out_mid) # (batch, hidden_dim*2)
# 特征融合
combined = torch.cat(
[context_short, context_mid], dim=-1
) # (batch, fusion_dim)
# 单任务输出
return self.return_head(combined) # 输出形状 [batch_size, 1]
3.3 training.py
import os
import joblib
import numpy as np
import optuna
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.ensemble import RandomForestRegressor
from sklearn.feature_selection import SelectFromModel
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler, RobustScaler, StandardScaler
from torch.utils.data import DataLoader, Dataset
from tqdm.auto import tqdm
from vectorbt_4.model_definition import MultiScaleLSTM
class SingleWindowDataset(Dataset):
def __init__(self, X, y):
"""单窗口数据集类,用于将时间序列数据转换为PyTorch数据集格式。
:param X: 滑动窗口特征数据
:param y: 目标变量
"""
self.X = X.astype(np.float32) # 将滑动窗口特征数据转换为float32类型
self.y = y.astype(np.float32) # 将目标变量转换为float32类型
def __len__(self):
return len(self.y) # 返回目标变量的长度
def __getitem__(self, idx):
x = torch.from_numpy(
self.X[idx]
).float() # 将滑动窗口特征数据转换为PyTorch张量
label = torch.tensor(
self.y[idx], dtype=torch.float32
) # 将目标变量转换为PyTorch张量
return x, label # 返回特征和标签
class TrainingStateManager:
def __init__(self, model_dir="./models"):
"""训练状态管理器类,负责模型和预处理器的保存和加载。
:param model_dir: 模型保存目录,defaults to "./models"
"""
self.model_dir = model_dir # 设置模型保存目录
self.model_path = f"{self.model_dir}/vectorbt_4_model.pth" # 设置模型文件路径
self.preprocessors_path = (
f"{self.model_dir}/vectorbt_4_preprocessors.pkl" # 设置预处理器文件路径
)
os.makedirs(model_dir, exist_ok=True) # 创建模型保存目录(如果不存在)
def save(self, model, feature_engineer, feature_selector, config):
"""保存模型和预处理器。
:param model: 训练好的模型
:param feature_engineer: 特征工程对象
:param feature_selector: 特征选择器
:param config: 模型配置
"""
torch.save(model.state_dict(), self.model_path) # 保存模型参数
joblib.dump(
{
"feature_engineer": feature_engineer,
"feature_selector": feature_selector,
"config": config,
},
self.preprocessors_path,
) # 保存预处理器和配置
print(f"Model saved to {self.model_path}") # 打印模型保存路径
print(
f"Preprocessor saved to {self.preprocessors_path}"
) # 打印预处理器保存路径
def load(self, device):
"""加载模型和预处理器。
:param device: 计算设备(CPU/GPU)
:return: 加载的模型、特征工程对象、特征选择器和模型配置
"""
model = None # 初始化模型
feature_engineer = None # 初始化特征工程对象
feature_selector = None # 初始化特征选择器
config = None # 初始化配置
if os.path.exists(self.preprocessors_path): # 如果预处理器文件存在
preprocess = joblib.load(self.preprocessors_path) # 加载预处理器
feature_engineer = preprocess["feature_engineer"] # 获取特征工程对象
feature_selector = preprocess["feature_selector"] # 获取特征选择器
config = preprocess["config"] # 获取配置
if os.path.exists(self.model_path): # 如果模型文件存在
model_weights = torch.load(
self.model_path, weights_only=False, map_location=device
) # 加载模型参数
model = MultiScaleLSTM(
input_dim=config["input_dim"], hidden_dim=config["hidden_dim"]
).to(
device
) # 初始化模型并移动到指定设备
model.load_state_dict(model_weights) # 加载模型参数
return (
model,
feature_engineer,
feature_selector,
config,
) # 返回加载的模型、特征工程对象、特征选择器和配置
class OnlineFeatureEngineer:
def __init__(self, windows=[5]):
"""在线特征工程生成器类,用于生成技术指标特征并进行标准化。
:param windows: 滑动窗口列表(用于特征生成),defaults to [5]
"""
self.windows = windows # 设置滑动窗口列表
self.n_features = 10 # 设置特征数量
self.scalers = {
"Trend": RobustScaler(), # 趋势类指标使用RobustScaler
"Momentum": MinMaxScaler(), # 动量类指标使用MinMaxScaler
"Volatility": RobustScaler(), # 波动率类指标使用RobustScaler
"Volume": StandardScaler(), # 成交量类指标使用StandardScaler
# "Sentiment": StandardScaler(), # 市场情绪类指标使用StandardScaler
"SupportResistance": MinMaxScaler(), # 支撑阻力类指标使用MinMaxScaler
"Statistical": StandardScaler(), # 统计类指标使用StandardScaler
"Composite": RobustScaler(), # 复合型指标使用RobustScaler
}
# "price": ["open", "high", "low", "close"],
self.feature_groups = {
# Trend-Following Indicators 趋势类指标
"Trend": ["MA20", "EMA12", "MACD", "ADX", "SAR"],
# Momentum Indicators 动量类指标
"Momentum": ["RSI", "KDJ_K", "KDJ_D", "KDJ_J", "CCI", "WILLR"],
# Volatility Indicators 波动率类指标
"Volatility": ["BB_upper", "BB_middle", "BB_lower", "ATR", "STD20"],
# Volume Indicators 成交量类指标
"Volume": ["OBV", "MFI"],
# Market Sentiment Indicators 市场情绪类指标 (需要外部数据)
# "Sentiment": [],
# Support/Resistance Indicators 支撑阻力类指标
"SupportResistance": ["Fib_0.382", "Fib_0.618", "Pivot"],
# Statistical Indicators 统计类指标
"Statistical": ["LR_slope", "LR_angle"],
# Composite Indicators 复合型指标
"Composite": [
"Ichimoku_tenkan",
"Ichimoku_kijun",
"Ichimoku_senkou_a",
"Ichimoku_senkou_b",
"Ichimoku_chikou",
],
}
self.all_features = [
f for sublist in self.feature_groups.values() for f in sublist
] # 生成所有特征列表
def partial_fit(self, new_df):
"""对新数据进行部分拟合。
:param new_df: 新数据DataFrame
"""
new_features = self.generate_features(new_df, refit=True) # 生成新数据的特征
for group, features in self.feature_groups.items(): # 遍历每个特征组
if hasattr(self.scalers[group], "partial_fit"): # 如果该缩放器支持部分拟合
self.scalers[group].partial_fit(
new_features[features]
) # 对新数据进行部分拟合
def generate_features(self, df, refit=False):
"""生成技术指标特征。
:param df: 原始数据DataFrame
:param refit: 是否重新拟合标准化器,defaults to False
:return: 特征DataFrame
生成8大类技术指标:
1. 趋势类指标(MA, MACD等)
2. 动量类指标(RSI, KDJ等)
3. 波动率指标(布林带, ATR等)
4. 成交量指标(OBV, MFI等)
5. 市场情绪类指标 (需要外部数据) -- 忽略
6. 支撑阻力指标(斐波那契回撤等)
7. 统计指标(线性回归斜率等)
8. 复合指标(Ichimoku云图等)
"""
processed = [] # 初始化处理后的特征列表
for group, features in self.feature_groups.items(): # 遍历每个特征组
scaler = self.scalers[group] # 获取对应的缩放器
if not refit: # 如果不重新拟合
scaler.fit(df[features]) # 拟合缩放器
scaled = scaler.transform(df[features]) # 标准化特征
processed.append(scaled) # 添加到处理后的特征列表
processed_df = pd.DataFrame(
np.hstack(processed), index=df.index, columns=self.all_features
) # 将处理后的特征合并为DataFrame
processed_df["ts_code"] = df["ts_code"] # 添加股票代码
processed_df["returns"] = df["returns"] # 添加收益
return processed_df.dropna() # 删除缺失值
def feature_selection(self, X, y):
"""进行特征选择。
:param X: 特征数据
:param y: 目标变量
:return: 选择后的特征数据
"""
selector = RandomForestRegressor(
n_estimators=50, n_jobs=-1
) # 初始化随机森林回归器
selector.fit(X, y) # 拟合随机森林回归器
feature_selector = SelectFromModel(selector, prefit=True) # 初始化特征选择器
return feature_selector # 返回特征选择器
class IncrementalDataHandler:
def __init__(self, feature_engineer, feature_selector, window_size=5):
"""增量数据处理器类,用于处理增量数据并生成滑动窗口数据。
:param feature_engineer: 特征工程对象
:param feature_selector: 特征选择器
:param window_size: 滑动窗口大小,defaults to 5
"""
self.feature_engineer = feature_engineer # 设置特征工程对象
self.feature_selector = feature_selector # 设置特征选择器
self.window_size = window_size # 设置滑动窗口大小
self.buffer = pd.DataFrame() # 初始化数据缓冲区
def update_buffer(self, new_df):
"""更新数据缓冲区。
:param new_df: 新数据DataFrame
"""
self.buffer = pd.concat(
[self.buffer, new_df]
).sort_index() # 更新数据缓冲区并排序
def prepare_incremental_data(self):
"""准备增量数据。
:return: 训练数据和测试数据
"""
processed_df = self.feature_engineer.generate_features(self.buffer) # 生成特征
X_selected = self.feature_selector.transform(
processed_df[self.feature_engineer.all_features].values
) # 选择特征
X, y = self.sliding_window(processed_df, X_selected) # 生成滑动窗口数据
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42, shuffle=False
) # 划分训练集和测试集
return (X_train, y_train), (X_test, y_test) # 返回训练集和测试集
def sliding_window(self, df, features):
"""生成时序滑动窗口数据"""
sequences = [] # 初始化序列列表
labels = [] # 初始化标签列表
for ts_code, group in df.groupby("ts_code"): # 按股票代码分组
group_features = features[df["ts_code"] == ts_code] # 获取该股票的特征
for i in range(self.window_size, len(group)): # 生成滑动窗口
sequences.append(
group_features[i - self.window_size : i]
) # 添加滑动窗口特征
labels.append(group["returns"].iloc[i]) # 添加标签
return np.array(sequences, dtype=np.float32), np.array(
labels, dtype=np.float32
) # 返回滑动窗口特征和标签
class IncrementalTrainer:
def __init__(self, device, window_size=5):
"""增量训练控制器类,负责模型的初始训练和增量更新。
:param device: 计算设备(CPU/GPU)
"""
self.device = device # 设置计算设备
self.window_size = window_size # 设置滑动窗口大小
self.state_manager = TrainingStateManager() # 初始化训练状态管理器
self.model = None # 初始化模型
self.feature_engineer = None # 初始化特征工程对象
self.feature_selector = None # 初始化特征选择器
self.config = None # 初始化配置
self.load_state() # 加载状态
def load_state(self):
"""加载模型和预处理器。"""
(
self.model,
self.feature_engineer,
self.feature_selector,
self.config,
) = self.state_manager.load(
self.device
) # 加载模型、特征工程对象、特征选择器和配置
def initial_train(self, df):
"""初始训练模型。
:param df: 原始数据DataFrame
"""
# 特征生成
self.feature_engineer = OnlineFeatureEngineer(
windows=[self.window_size]
) # 初始化特征工程对象
processed_df = self.feature_engineer.generate_features(df) # 生成特征
# 特征选择
X, y = processed_df[self.feature_engineer.all_features], df["returns"].reindex(
processed_df.index
) # 准备特征和标签
self.feature_selector = self.feature_engineer.feature_selection(
X, y
) # 选择特征
# 准备训练数据
data_handler = IncrementalDataHandler(
feature_engineer=self.feature_engineer,
feature_selector=self.feature_selector,
window_size=self.window_size,
) # 初始化增量数据处理器
data_handler.buffer = df # 更新数据缓冲区
(X_train, y_train), (X_test, y_test) = (
data_handler.prepare_incremental_data()
) # 准备训练数据
# Optuna超参优化
self._optimize_parameters(X_train, y_train, X_test, y_test) # 优化超参数
# 最佳模型训练
print(f"Initial Model Config: {self.config}") # 打印初始模型配置
self._train(X_train, y_train) # 训练模型
print("Initial Model Evaluation:") # 打印初始模型评估
self._evaluate(X_test, y_test) # 评估模型
print("Initial Model Save:") # 打印初始模型保存
self.state_manager.save(
self.model, self.feature_engineer, self.feature_selector, self.config
) # 保存模型
def incremental_update(self, new_df):
"""增量更新模型。
:param new_df: 新数据DataFrame
"""
if not self.model:
print(
"The model does not exist, please train it first."
) # 如果模型不存在,提示先训练模型
return
self.feature_engineer.partial_fit(new_df) # 对新数据进行部分拟合
data_handler = IncrementalDataHandler(
self.feature_engineer, self.feature_selector
) # 初始化增量数据处理器
data_handler.update_buffer(new_df) # 更新数据缓冲区
(X_train, y_train), (X_test, y_test) = (
data_handler.prepare_incremental_data()
) # 准备增量数据
print(f"Incremental Model Config: {self.config}") # 打印增量模型配置
self._train(X_train, y_train) # 训练模型
print("\nIncremental Model Evaluation:") # 打印增量模型评估
self._evaluate(X_test, y_test) # 评估模型
print("\nIncremental Model Save:") # 打印增量模型保存
self.state_manager.save(
self.model, self.feature_engineer, self.feature_selector, self.config
) # 保存模型
def _optimize_parameters(self, X_train, y_train, X_test, y_test):
"""Optuna超参优化
:param X_train: 训练集滑动窗口特征数据
:param y_train: 训练集目标变量
:param X_test: 测试集滑动窗口特征数据
:param y_test: 测试集目标变量
"""
def objective(trial):
self.config = {
"hidden_dim": trial.suggest_int(
"hidden_dim", 64, 256
), # 建议隐藏层维度
"lr": trial.suggest_float("lr", 1e-4, 1e-3, log=True), # 建议学习率
"batch_size": trial.suggest_categorical(
"batch_size", [32, 64, 128]
), # 建议批量大小
"weight_decay": trial.suggest_float(
"weight_decay", 1e-6, 1e-4
), # 建议权重衰减
"input_dim": X_train[0].shape[-1], # 输入特征维度
"epochs": 100, # 训练轮数
"window_size": self.window_size, # 滑动窗口大小
}
self._train(X_train, y_train) # 训练模型
val_loss = self._evaluate(X_test, y_test) # 评估模型
print(f"Val Loss: {val_loss:.4f}") # 打印验证损失
return val_loss # 返回验证损失
# 超参优化
study = optuna.create_study(direction="minimize") # 创建研究
study.optimize(objective, n_trials=1, show_progress_bar=True) # 优化目标函数
print(f"Training Best params: {study.best_params}") # 打印最佳参数
# 最佳模型参数
self.config.update(study.best_params) # 更新配置
def _train(self, X_train, y_train):
"""模型训练内部方法。
:param X_train: 训练集滑动窗口特征数据
:param y_train: 训练集目标变量
"""
print(f"Training Model Config: {self.config}") # 打印模型配置
lr = self.config.get("lr", 1e-4) # 获取学习率
epochs = self.config.get("epochs", 100) # 获取训练轮数
batch_size = self.config.get("batch_size", 128) # 获取批量大小
weight_decay = self.config.get("weight_decay", 1e-6) # 获取权重衰减
hidden_dim = self.config.get("hidden_dim", 128) # 获取隐藏层维度
input_dim = self.config.get(
"input_dim", X_train[0].shape[-1]
) # 获取输入特征维度
dataset = SingleWindowDataset(X_train, y_train) # 初始化数据集
loader = DataLoader(
dataset,
batch_size=batch_size,
shuffle=False,
) # 初始化数据加载器
# 初始化模型
self.model = MultiScaleLSTM(input_dim=input_dim, hidden_dim=hidden_dim).to(
self.device
) # 初始化模型并移动到指定设备
optimizer = optim.AdamW(
self.model.parameters(),
lr=lr,
weight_decay=weight_decay,
) # 初始化优化器
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
optimizer, "min", patience=5
) # 初始化学习率调度器
criterion = nn.HuberLoss() # 初始化损失函数
# 训练循环
for epoch in tqdm(range(epochs), desc="Training"): # 进行训练
self.model.train() # 设置模型为训练模式
total_loss = 0 # 初始化总损失
for X_batch, y_batch in loader: # 遍历数据加载器
X_batch = X_batch.to(self.device) # 将特征数据移动到指定设备
y_batch = y_batch.to(self.device).unsqueeze(
1
) # 将标签数据移动到指定设备并扩展维度
optimizer.zero_grad() # 清零梯度
preds = self.model(X_batch) # 前向传播
loss = criterion(preds, y_batch) # 计算损失
loss.backward() # 反向传播
nn.utils.clip_grad_norm_(self.model.parameters(), 1.0) # 梯度裁剪
optimizer.step() # 更新参数
total_loss += loss.item() # 累加损失
# 学习率调整
avg_loss = total_loss / len(loader) # 计算平均损失
scheduler.step(avg_loss) # 更新学习率
def _evaluate(self, X_test, y_test):
"""模型评估内部方法。
:param X_test: 测试集滑动窗口特征数据
:param y_test: 测试集目标变量
:return: 平均损失
"""
test_dataset = SingleWindowDataset(X_test, y_test) # 初始化测试数据集
test_loader = DataLoader(
test_dataset,
batch_size=128,
shuffle=False,
) # 初始化测试数据加载器
self.model.eval() # 设置模型为评估模式
total_loss = 0 # 初始化总损失
criterion = nn.HuberLoss() # 初始化损失函数
with torch.no_grad(): # 关闭梯度计算
for X_batch, y_batch in test_loader: # 遍历测试数据加载器
X_batch = X_batch.to(self.device) # 将特征数据移动到指定设备
y_batch = y_batch.to(self.device).unsqueeze(
1
) # 将标签数据移动到指定设备并扩展维度
preds = self.model(X_batch) # 前向传播
loss = criterion(preds, y_batch) # 计算损失
total_loss += loss.item() * len(y_batch) # 累加损失
test_loss = total_loss / len(test_dataset) # 计算平均损失
print(f"Test Loss: {test_loss}") # 打印测试损失
return test_loss # 返回测试损失
3.4 backtesting.py
import numpy as np
import optuna
import pandas as pd
import torch
import vectorbt as vbt
class DualEMACrossoverStrategy:
def __init__(self, pred_returns, volatility, params):
"""双EMA交叉策略类。
:param pred_returns: 预测的收益率序列
:param volatility: 波动率序列(用于仓位计算)
:param params: 参数字典,包含快慢EMA的时间跨度
"""
self.pred_returns = pred_returns # 预测的收益率序列
self.volatility = volatility.clip(lower=0.01) # 防止波动率为0,导致除零错误
self.fast_span = params["fast_span"] # 快速EMA的时间跨度
self.slow_span = params["slow_span"] # 慢速EMA的时间跨度
def generate_signals(self):
"""生成交易信号。
:return: (交易信号, 仓位大小)
"""
ema_fast = self.pred_returns.ewm(
span=self.fast_span, min_periods=self.fast_span
).mean() # 计算快速EMA
ema_slow = self.pred_returns.ewm(
span=self.slow_span, min_periods=self.slow_span
).mean() # 计算慢速EMA
signals = pd.Series(0, index=self.pred_returns.index) # 初始化信号序列
signals[(ema_fast > ema_slow) & (ema_fast.shift(1) <= ema_slow.shift(1))] = (
1 # 买入信号
)
signals[(ema_fast < ema_slow) & (ema_fast.shift(1) >= ema_slow.shift(1))] = (
-1
) # 卖出信号
# 使用tanh函数压缩仓位大小,实现非线性映射
position_size = np.tanh(self.pred_returns.abs() / self.volatility).clip(
0.3, 0.8
)
return signals, position_size # 返回信号和仓位大小
class BacktestStrategy:
def __init__(self, model, device):
"""回测执行引擎。
:param model: 训练好的预测模型
:param device: 计算设备(CPU/GPU)
"""
self.model = model # 训练好的预测模型
self.device = device # 计算设备
self._configure_vbt() # 配置VectorBT全局参数
def _configure_vbt(self):
"""配置VectorBT全局参数。"""
vbt.settings.array_wrapper["freq"] = "D" # 设置时间频率为日频
vbt.settings.plotting["layout"]["template"] = "vbt_dark" # 使用暗色主题
vbt.settings.plotting["layout"]["width"] = 1200 # 设置图表宽度
vbt.settings.portfolio["init_cash"] = 100000.0 # 初始资金10万元
vbt.settings.portfolio["fees"] = 0.0025 # 交易成本(手续费)0.25%
vbt.settings.portfolio["slippage"] = 0.0025 # 交易成本(滑点)0.25%
def _optimize_parameters(self, result_df):
"""优化参数逻辑调整。
:param result_df: 包含预测收益率的结果DataFrame
:return: 最优参数
"""
def objective(trial):
params = {
"fast_span": trial.suggest_int(
"fast_span", 10, 30
), # 建议快速EMA的时间跨度
"slow_span": trial.suggest_int(
"slow_span", 50, 100
), # 建议慢速EMA的时间跨度
}
strategy = DualEMACrossoverStrategy(
pred_returns=result_df["pred_returns"],
volatility=result_df["volatility"],
params=params,
) # 创建策略实例
signals, position_size = strategy.generate_signals() # 生成交易信号
pf = vbt.Portfolio.from_signals(
close=result_df["close"],
entries=signals.shift(1) == 1, # 买入信号
exits=signals.shift(1) == -1, # 卖出信号
size=position_size, # 固定仓位
freq="D",
)
return pf.total_profit() # 返回总利润
study = optuna.create_study(direction="maximize") # 创建Optuna研究
study.optimize(objective, n_trials=10, show_progress_bar=True) # 优化参数
print(f"Strategy Best params: {study.best_params}") # 打印最优参数
return study.best_params # 返回最优参数
def run(self, test_data, df):
"""执行完整回测流程。
:param test_data: 测试数据集元组(X_test, y_test)
:param df: 原始数据DataFrame
:return: (组合对象, 结果DataFrame)
"""
X_test = test_data # 测试数据
self.model.eval() # 将模型设置为评估模式
with torch.no_grad(): # 禁用梯度计算
test_tensor = torch.FloatTensor(X_test).to(
self.device
) # 转换为Tensor并移动到指定设备
preds = (
self.model(test_tensor).cpu().numpy().flatten()
) # 获取预测值并转换为NumPy数组
test_dates = df.index[-len(preds) :] # 获取测试日期
result_df = pd.DataFrame(
{
"close": df["close"].values[-len(preds) :],
"pred_returns": preds, # 使用rolling窗口计算动态波动率
"volatility": df["ATR"].values[-len(preds) :]
/ df["close"].values[-len(preds) :],
},
index=test_dates,
) # 创建结果DataFrame
best_params = self._optimize_parameters(result_df) # 运行参数优化
strategy = DualEMACrossoverStrategy(
pred_returns=result_df["pred_returns"],
volatility=result_df["volatility"],
params=best_params,
) # 创建策略实例
signals, position_size = strategy.generate_signals() # 生成交易信号
return vbt.Portfolio.from_signals(
close=result_df["close"],
entries=signals == 1, # 买入信号
exits=signals == -1, # 卖出信号
size=position_size,
size_type="percent",
freq="D",
) # 执行组合回测
3.5 main.py
import random
import numpy as np
import torch
from vectorbt_4.backtesting import BacktestStrategy
from vectorbt_4.data_processing import load_data
from vectorbt_4.training import IncrementalTrainer, TrainingStateManager
def set_random_seed(seed=42):
"""设置全局随机种子
:param seed: 随机种子, defaults to 42
影响范围:
- Python内置随机模块
- Numpy随机数生成
- PyTorch CPU/CUDA随机种子
"""
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)
if torch.cuda.is_available():
torch.cuda.manual_seed(seed)
torch.cuda.manual_seed_all(seed) # 如果使用多个GPU
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False
def prepare_backtest_data(ts_code, device):
"""准备回测数据。
:param ts_code: 股票代码
:param device: 计算设备(CPU/GPU)
:return: 滑动窗口特征数据、原始数据DataFrame、模型、特征工程对象、特征选择器和配置
"""
state_manager = TrainingStateManager() # 初始化训练状态管理器
model, feature_engineer, feature_selector, config = state_manager.load(
device
) # 加载模型和预处理器
print(f"Model Config: {config}") # 打印模型配置
# 加载并处理数据
test_df = load_data([ts_code]) # 加载数据
test_df = test_df[-300:] # 取最近300条数据
processed_df = feature_engineer.generate_features(test_df) # 生成特征
X_selected = feature_selector.transform(
processed_df[feature_engineer.all_features].values
) # 选择特征
# 构建滑动窗口
window_size = config["window_size"] # 获取滑动窗口大小
sequences = [] # 初始化序列列表
for i in range(window_size, len(X_selected)): # 遍历数据
sequences.append(X_selected[i - window_size : i]) # 添加滑动窗口特征
return (
np.array(sequences, dtype=np.float32), # 返回滑动窗口特征数据
test_df, # 返回原始数据DataFrame
model, # 返回模型
feature_engineer, # 返回特征工程对象
feature_selector, # 返回特征选择器
config, # 返回配置
)
if __name__ == "__main__":
# 设置随机种子
# 函数确保了整个训练过程的可重复性。
# 通过设置相同的随机种子,可以保证每次运行时生成的随机数序列一致,这对于调试和实验验证非常重要。
set_random_seed()
# 检测可用计算设备(优先使用CUDA)
device = torch.device(
"cuda"
if torch.cuda.is_available()
else "mps" if torch.backends.mps.is_available() else "cpu"
)
# 股票行情
# start_date = "20100101"
# end_date = "20241231"
trainer = IncrementalTrainer(device)
# 多股票初始训练示例
# 浦发银行(600000.SH)
# 招商银行(600036.SH)
# 平安银行(000001.SZ)
train_codes = ["600000.SH", "600036.SH", "000001.SZ"]
train_df = load_data(train_codes)
model = trainer.initial_train(train_df)
# 增量训练示例
# 贵州茅台(600519.SH)
# new_df = load_data(["600519.SH"])
# model = trainer.incremental_update(new_df)
# 单股票回测
(test_data, test_df, model, feature_engineer, feature_selector, config) = (
prepare_backtest_data("600519.SH", device)
)
backtester = BacktestStrategy(model, device)
pf = backtester.run(test_data, test_df)
print("回测结果统计:")
print(pf.stats())
pf.plot().show()
风险提示与免责声明
本文内容基于公开信息研究整理,不构成任何形式的投资建议。历史表现不应作为未来收益保证,市场存在不可预见的波动风险。投资者需结合自身财务状况及风险承受能力独立决策,并自行承担交易结果。作者及发布方不对任何依据本文操作导致的损失承担法律责任。市场有风险,投资须谨慎。