VectorBT:使用PyTorch+Transformer训练和回测股票模型 进阶五
本方案基于PyTorch框架与Transformer模型,结合VectorBT回测引擎构建多股票量化交易系统,采用滑动窗口技术构建时序特征,通过自注意力机制捕捉市场规律预测收益率,集成双EMA交叉策略动态生成交易信号,利用Optuna优化模型超参与策略参数,支持增量训练更新特征分布,结合波动率调整非线性仓位,并通过分组标准化与股票分组计算严格规避数据泄漏风险,实现端到端的量化策略研发闭环。
文中内容仅限技术学习与代码实践参考,市场存在不确定性,技术分析需谨慎验证,不构成任何投资建议。适合量化新手建立系统认知,为策略开发打下基础。
本文是进阶指南🚀,推荐先阅读了解基础知识‼️
- VectorBT:Python量化交易策略开发与回测评估详解 🔥
- VectorBT:使用PyTorch+LSTM训练和回测股票模型 进阶一 🔥
- VectorBT:使用PyTorch+LSTM训练和回测股票模型 进阶二 🔥
- VectorBT:使用PyTorch+LSTM训练和回测股票模型 进阶三 🔥
- VectorBT:使用PyTorch+LSTM训练和回测股票模型 进阶四 🔥
1. 方案概述
本方案基于PyTorch框架与Transformer模型,结合VectorBT回测引擎构建多股票量化交易系统,采用滑动窗口技术构建时序特征,通过自注意力机制捕捉市场规律预测收益率,集成双EMA交叉策略动态生成交易信号,利用Optuna优化模型超参与策略参数,支持增量训练更新特征分布,结合波动率调整非线性仓位,并通过分组标准化与股票分组计算严格规避数据泄漏风险,实现端到端的量化策略研发闭环。
1.1 核心原理
- 多时序特征编码:通过滑动窗口技术构建三维特征矩阵(样本×时间步×特征)
- Transformer建模:利用自注意力机制捕捉时序依赖关系
- 动态仓位管理:结合波动率调整仓位大小(使用tanh函数压缩)
- 增量学习机制:支持在线更新模型参数和特征分布
1.2 关键特点
- 多股票联合训练:共享特征表示,提升模型泛化能力
- 非线性仓位控制:position_size = tanh(|return|/volatility)
- 参数自动优化:使用Optuna进行双重优化(模型超参+策略参数)
- 特征鲁棒处理:分组标准化(趋势类用RobustScaler,成交量用StandardScaler)
1.3 注意事项
- 数据泄漏风险:严格按股票分组计算收益率和技术指标
- 设备兼容性:支持CUDA/MPS/CPU多设备自动切换
- 内存管理:滑动窗口生成时需控制窗口大小(默认5天)
- 过拟合预防:采用早停机制和学习率动态调整
2. 系统架构
架构说明:
应用层:通过
main.py
整合- 统一训练/回测接口
- 设备自动检测
- 全流程种子控制
数据层:通过
data_processing.py
实现- 多股票数据加载与合并
- 收益率计算与异常值处理
- 严格的时间序列管理
模型层:定义于
model_definition.py
- Transformer架构实现时序预测
- 包含位置编码和多头注意力机制
- 支持动态维度调整的Encoder结构
训练层:通过
training.py
实现- 增量式训练框架
- 在线特征工程系统
- 自适应特征选择机制
- Optuna超参优化集成
回测层:定义于
backtesting.py
- 双EMA交叉策略引擎
- 波动率自适应仓位管理
- 策略参数动态优化模块
2.1 数据层(Data Layer)
对应代码:data_processing.py
def load_data(ts_codes, data_path="./data", test=False):
# 核心数据加载逻辑
combined_df["returns"] = combined_df.groupby("ts_code")["close"].pct_change().shift(-1)
- 数据源:本地存储的Parquet文件(含复权处理)
- 关键处理:
- 跨股票数据合并与日期对齐
- 严格避免未来信息:按股票分组计算次日收益率
- 收益率截断(±10%边界)
- 输出格式:带时间戳的DataFrame,包含
open/high/low/close/vol
等原始字段
2.2 特征工程(Feature Engineering)
对应代码:training.py
class OnlineFeatureEngineer:
def generate_features(self, df):
# 生成8大类技术指标
self.feature_groups = {
"Trend": ["MA20","EMA12","MACD"...],
"Momentum": ["RSI","KDJ_K"...],
...
}
动态特征生成:
- 趋势类(MA20, MACD, …)
- 动量类(RSI, KDJ, …)
- 波动率(布林带, ATR, …)
- 成交量(OBV, MFI, …)
标准化策略:
self.scalers = { "Trend": RobustScaler(), "Momentum": MinMaxScaler(), "Volatility": RobustScaler(), ... }
特征选择:
selector = RandomForestRegressor(n_estimators=50) feature_selector = SelectFromModel(selector)
2.3 模型系统(Model System)
对应代码:model_definition.py
class TransformerModel(nn.Module):
def __init__(self, input_size, d_model=64, num_heads=4...):
# Encoder-Only结构
self.encoder_layers = nn.ModuleList([
EncoderLayer(d_model, num_heads, dff)
for _ in range(num_layers)
])
核心架构:
- 可配置的Encoder层数(2-4层)
- 多头注意力(4-8头)
- 位置编码使用正弦/余弦混合编码
训练机制:
criterion = nn.HuberLoss() optimizer = torch.optim.Adam(model.parameters(), lr=1e-4) scheduler = ReduceLROnPlateau(optimizer, 'min', patience=5)
2.4 交易策略(Trading Strategy)
对应代码:backtesting.py
class DualEMACrossoverStrategy:
def generate_signals(self):
ema_fast = pred_returns.ewm(span=fast_span).mean()
ema_slow = pred_returns.ewm(span=slow_span).mean()
position_size = np.tanh(pred_returns.abs()/volatility)
- 动态参数:
- 快线EMA周期:10-30日
- 慢线EMA周期:50-100日
- 仓位控制:
- 基于波动率的tanh函数非线性映射
- 最大仓位限制在30%-80%之间
2.5 回测引擎(Backtesting)
对应代码:backtesting.py
class BacktestStrategy:
def _configure_vbt(self):
vbt.settings.portfolio["fees"] = 0.0025
vbt.settings.portfolio["slippage"] = 0.0025
成本模型:
- 双向0.25%佣金
- 0.25%滑点成本
执行机制:
Portfolio.from_signals( entries=signals.shift(1) == 1, # T+1信号执行 size=position_size, size_type="percent" )
3. 系统流程
3.1 训练流程序列图
3.2 回测流程序列图
3.3 流程关键点说明
训练流程:
- 采用两阶段优化:特征选择 → 超参优化
- 使用Optuna进行贝叶斯优化
- 支持断点续训的模型保存机制
回测流程:
- 策略参数动态搜索空间
- 基于波动率的非线性仓位控制
- 交易成本的双向收取模型
- 结果可视化自动适配暗色主题
跨模块协作:
- 特征工程与模型输入的维度一致性保证
- 训练/推理的设备自动适配
- 时间序列的严格对齐机制
4. 总结与优化建议
4.1 方案优势
- 架构灵活性:模块化设计支持策略快速迭代
- 计算效率:GPU加速训练+VectorBT高效回测
- 风险控制:动态波动率调整+仓位限制(0.3-0.8)
4.2 优化方向
维度 | 当前实现 | 优化建议 |
---|---|---|
特征工程 | 8类技术指标 | 增加市场情绪数据(新闻舆情、资金流向) |
模型结构 | Encoder-only | 增加Decoder实现Seq2Seq预测 |
仓位策略 | 固定比例 | 引入强化学习动态调整 |
数据增强 | 原始序列 | 添加随机时频变换增强 |
风险控制 | 波动率约束 | 加入VaR压力测试模块 |
4.3 实践建议
- 增量更新频率:建议每月更新模型,每周更新特征分布
- 参数搜索空间:可扩展EMA参数范围(快线5-50,慢线30-200)
- 硬件配置:推荐使用至少16GB显存的GPU设备
- 回测验证:建议采用Walk-Forward分析法,避免过拟合
5. 工程代码
目录结构:
data/
├── processed_600000.SH.parquet
├── processed_600036.SH.parquet
├── processed_600519.SH.parquet
├── processed_000001.SZ.parquet
models/
├── vectorbt_5_model.pth
├── vectorbt_5_preprocessors.pkl
src/
└── vectorbt_5/
├── data_processing.py
├── model_definition.py
├── training.py
├── backtesting.py
├── main.py
└── __init__.py
5.1 data_processing.py
import pandas as pd
def load_data(ts_codes, data_path="./data"):
"""加载预处理后的股票数据
:param ts_code: 股票代码(如["600000.SH", "600519.SH", "000001.SZ"])
:param data_path: 数据存储路径)
:return: 合并后的DataFrame(含ts_code列标识股票)
处理步骤:
1. 读取parquet格式的本地数据
2. 转换交易日期格式
3. 计算次日收益率(目标变量)
4. 删除缺失值
"""
dfs = []
for ts_code in ts_codes:
file_path = f"{data_path}/processed_{ts_code}.parquet"
df = pd.read_parquet(file_path)
df["ts_code"] = ts_code
dfs.append(df)
combined_df = pd.concat(dfs)
combined_df["trade_date"] = pd.to_datetime(
combined_df["trade_date"], format="%Y%m%d"
)
combined_df.set_index("trade_date", inplace=True)
combined_df.sort_index(inplace=True)
# 按股票分组计算收益率,避免跨股票计算,严格避免未来信息
combined_df["returns"] = combined_df.groupby("ts_code", group_keys=False)[
"close"
].apply(
lambda x: x.pct_change().shift(-1).clip(-0.1, 0.1) # 添加收益率截断
)
combined_df.dropna(inplace=True)
return combined_df
5.2 model_definition.py
import math
import torch
import torch.nn as nn
class MultiHeadAttention(nn.Module):
"""多头注意力机制。
:param d_model: 输入和输出的维度
:param num_heads: 注意力头的数量
"""
def __init__(self, d_model, num_heads):
super().__init__()
self.num_heads = num_heads # 注意力头的数量
self.d_model = d_model # 输入和输出的维度
self.depth = d_model // num_heads # 每个头的维度
self.wq = nn.Linear(d_model, d_model) # 查询线性变换
self.wk = nn.Linear(d_model, d_model) # 键线性变换
self.wv = nn.Linear(d_model, d_model) # 值线性变换
self.dense = nn.Linear(d_model, d_model) # 输出线性变换
def split_heads(self, x, batch_size):
"""将输入张量分割成多个头。
:param x: 输入张量 (batch_size, seq_len, d_model)
:param batch_size: 批次大小
:return: 分割后的张量 (batch_size, num_heads, seq_len, depth)
"""
x = x.view(
batch_size, -1, self.num_heads, self.depth
) # 将d_model维度拆分成num_heads和depth
return x.permute(
0, 2, 1, 3
) # 调整维度顺序为 (batch_size, num_heads, seq_len, depth)
def forward(self, q, k, v):
"""前向传播函数。
:param q: 查询张量 (batch_size, seq_len, d_model)
:param k: 键张量 (batch_size, seq_len, d_model)
:param v: 值张量 (batch_size, seq_len, d_model)
:return: 输出张量 (batch_size, seq_len, d_model) 和注意力权重 (batch_size, num_heads, seq_len, seq_len)
"""
batch_size = q.size(0) # 获取批次大小
q = self.wq(q) # 对查询进行线性变换
k = self.wk(k) # 对键进行线性变换
v = self.wv(v) # 对值进行线性变换
q = self.split_heads(q, batch_size) # 将查询张量分割成多个头
k = self.split_heads(k, batch_size) # 将键张量分割成多个头
v = self.split_heads(v, batch_size) # 将值张量分割成多个头
# 计算注意力
scaled_attention, attention_weights = self.scaled_dot_product_attention(q, k, v)
scaled_attention = scaled_attention.permute(
0, 2, 1, 3
) # 调整维度顺序为 (batch_size, seq_len, num_heads, depth)
concat_attention = scaled_attention.reshape(
batch_size, -1, self.d_model
) # 合并头,恢复原始维度
output = self.dense(concat_attention) # 进行最终的线性变换
return output, attention_weights
def scaled_dot_product_attention(self, q, k, v):
"""计算缩放点积注意力。
:param q: 查询张量 (batch_size, num_heads, seq_len, depth)
:param k: 键张量 (batch_size, num_heads, seq_len, depth)
:param v: 值张量 (batch_size, num_heads, seq_len, depth)
:return: 注意力输出 (batch_size, num_heads, seq_len, depth) 和注意力权重 (batch_size, num_heads, seq_len, seq_len)
"""
matmul_qk = torch.matmul(q, k.transpose(-1, -2)) # 计算Q和K的点积
dk = torch.tensor(k.size(-1), dtype=torch.float32) # 获取深度dk
scaled_attention_logits = matmul_qk / torch.sqrt(dk) # 缩放点积
attention_weights = torch.softmax(
scaled_attention_logits, dim=-1
) # 计算注意力权重
output = torch.matmul(attention_weights, v) # 应用注意力权重到值上
return output, attention_weights
class EncoderLayer(nn.Module):
"""编码器层。
:param d_model: 输入和输出的维度
:param num_heads: 注意力头的数量
:param dff: 前馈神经网络的中间维度
:param dropout: dropout 概率,默认为 0.1
"""
def __init__(self, d_model, num_heads, dff, dropout=0.1):
super().__init__()
self.mha = MultiHeadAttention(d_model, num_heads) # 多头注意力机制
self.ffn = nn.Sequential(
nn.Linear(d_model, dff), # 线性变换到dff维度
nn.GELU(), # GELU激活函数
nn.Linear(dff, d_model), # 线性变换回d_model维度
)
self.layer_norm1 = nn.LayerNorm(d_model) # 第一个层归一化
self.layer_norm2 = nn.LayerNorm(d_model) # 第二个层归一化
self.dropout = nn.Dropout(dropout) # Dropout层
def forward(self, x):
"""前向传播函数。
:param x: 输入张量 (batch_size, seq_len, d_model)
:return: 输出张量 (batch_size, seq_len, d_model)
"""
# 多头注意力
attn_output, _ = self.mha(x, x, x) # 计算多头注意力
attn_output = self.dropout(attn_output) # 应用dropout
out1 = self.layer_norm1(x + attn_output) # 残差连接和层归一化
# 前馈神经网络
ffn_output = self.ffn(out1) # 前馈神经网络
ffn_output = self.dropout(ffn_output) # 应用dropout
out2 = self.layer_norm2(out1 + ffn_output) # 残差连接和层归一化
return out2
class DecoderLayer(nn.Module):
"""解码器层。
:param d_model: 输入和输出的维度
:param num_heads: 注意力头的数量
:param dff: 前馈神经网络的中间维度
:param dropout: dropout 概率,默认为 0.1
"""
def __init__(self, d_model, num_heads, dff, dropout=0.1):
super().__init__()
self.mha1 = MultiHeadAttention(d_model, num_heads) # 掩码多头注意力
self.mha2 = MultiHeadAttention(d_model, num_heads) # 编码器-解码器注意力
self.ffn = nn.Sequential(
nn.Linear(d_model, dff), # 线性变换到dff维度
nn.GELU(), # GELU激活函数
nn.Linear(dff, d_model), # 线性变换回d_model维度
)
self.layer_norm1 = nn.LayerNorm(d_model) # 第一个层归一化
self.layer_norm2 = nn.LayerNorm(d_model) # 第二个层归一化
self.layer_norm3 = nn.LayerNorm(d_model) # 第三个层归一化
self.dropout = nn.Dropout(dropout) # Dropout层
def forward(self, x, enc_output):
"""前向传播函数。
:param x: 输入张量 (batch_size, seq_len, d_model)
:param enc_output: 编码器输出 (batch_size, src_seq_len, d_model)
:return: 输出张量 (batch_size, seq_len, d_model) 和两个注意力权重
"""
# 掩码多头注意力
attn1, attn_weights1 = self.mha1(x, x, x) # 计算掩码多头注意力
attn1 = self.dropout(attn1) # 应用dropout
out1 = self.layer_norm1(x + attn1) # 残差连接和层归一化
# 编码器-解码器注意力
attn2, attn_weights2 = self.mha2(
out1, enc_output, enc_output
) # 计算编码器-解码器注意力
attn2 = self.dropout(attn2) # 应用dropout
out2 = self.layer_norm2(out1 + attn2) # 残差连接和层归一化
# 前馈神经网络
ffn_output = self.ffn(out2) # 前馈神经网络
ffn_output = self.dropout(ffn_output) # 应用dropout
out3 = self.layer_norm3(out2 + ffn_output) # 残差连接和层归一化
return out3, attn_weights1, attn_weights2
class PositionalEncoding(nn.Module):
"""位置编码。
:param d_model: 输入和输出的维度
:param max_len: 最大序列长度,默认为5000
:param dropout: dropout 概率,默认为 0.1
"""
def __init__(self, d_model, max_len=5000, dropout=0.1):
super().__init__()
self.dropout = nn.Dropout(dropout) # Dropout层
pe = torch.zeros(max_len, d_model) # 初始化位置编码
position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1) # 位置索引
div_term = torch.exp(
torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model)
) # 用于计算正弦和余弦的位置项
pe[:, 0::2] = torch.sin(position * div_term) # 正弦位置编码
pe[:, 1::2] = torch.cos(position * div_term) # 余弦位置编码
pe = pe.unsqueeze(0) # 增加批次维度
self.register_buffer("pe", pe) # 注册位置编码为缓冲区
def forward(self, x):
"""前向传播函数。
:param x: 输入张量 (batch_size, seq_len, d_model)
:return: 添加了位置编码的张量 (batch_size, seq_len, d_model)
"""
x = x + self.pe[:, : x.size(1)] # 添加位置编码
return self.dropout(x) # 应用dropout
class TransformerModel(nn.Module):
"""Transformer模型。
:param input_size: 输入特征的维度
:param d_model: 输入和输出的维度
:param num_heads: 注意力头的数量
:param num_layers: 编码器层数
:param dff: 前馈神经网络的中间维度
:param dropout: dropout 概率,默认为 0.1
"""
def __init__(self, input_size, d_model, num_heads, num_layers, dff, dropout=0.1):
super().__init__()
self.embedding = nn.Linear(input_size, d_model) # 输入嵌入
self.pos_encoding = PositionalEncoding(d_model, dropout=dropout) # 位置编码
self.encoder_layers = nn.ModuleList(
[
EncoderLayer(d_model, num_heads, dff, dropout)
for _ in range(num_layers)
] # 编码器层
)
self.fc = nn.Linear(d_model, 1) # 全连接层
def forward(self, x):
"""前向传播函数。
:param x: 输入张量 (batch_size, seq_len, input_size)
:return: 输出张量 (batch_size, 1)
"""
x = self.embedding(x) # 输入嵌入
x = self.pos_encoding(x) # 位置编码
for enc_layer in self.encoder_layers:
x = enc_layer(x) # 通过每个编码器层
x = self.fc(x.mean(dim=1)) # 使用全局平均池化并进行最终线性变换
return x
5.3 training.py
import os
import joblib
import numpy as np
import optuna
import pandas as pd
import torch
import torch.nn as nn
from sklearn.ensemble import RandomForestRegressor
from sklearn.feature_selection import SelectFromModel
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler, RobustScaler, StandardScaler
from torch.utils.data import DataLoader, Dataset
from tqdm.auto import tqdm
from vectorbt_5.model_definition import TransformerModel
class SingleWindowDataset(Dataset):
def __init__(self, X, y):
"""单窗口数据集类,用于将时间序列数据转换为PyTorch数据集格式。
:param X: 滑动窗口特征数据
:param y: 目标变量
"""
self.X = X.astype(np.float32) # 将滑动窗口特征数据转换为float32类型
self.y = y.astype(np.float32) # 将目标变量转换为float32类型
def __len__(self):
return len(self.y) # 返回目标变量的长度
def __getitem__(self, idx):
x = torch.from_numpy(self.X[idx]).float() # 将滑动窗口特征数据转换为PyTorch张量
label = torch.tensor(
self.y[idx], dtype=torch.float32
) # 将目标变量转换为PyTorch张量
return x, label # 返回特征和标签
class TrainingStateManager:
def __init__(self, model_dir="./models"):
"""训练状态管理器类,负责模型和预处理器的保存和加载。
:param model_dir: 模型保存目录,defaults to "./models"
"""
self.model_dir = model_dir # 设置模型保存目录
self.model_path = f"{self.model_dir}/vectorbt_5_model.pth" # 设置模型文件路径
self.preprocessors_path = (
f"{self.model_dir}/vectorbt_5_preprocessors.pkl" # 设置预处理器文件路径
)
os.makedirs(model_dir, exist_ok=True) # 创建模型保存目录(如果不存在)
def save(self, model, feature_engineer, feature_selector, config):
"""保存模型和预处理器。
:param model: 训练好的模型
:param feature_engineer: 特征工程对象
:param feature_selector: 特征选择器
:param config: 模型配置
"""
torch.save(model.state_dict(), self.model_path) # 保存模型参数
joblib.dump(
{
"feature_engineer": feature_engineer,
"feature_selector": feature_selector,
"config": config,
},
self.preprocessors_path,
) # 保存预处理器和配置
print(f"Model saved to {self.model_path}") # 打印模型保存路径
print(
f"Preprocessor saved to {self.preprocessors_path}"
) # 打印预处理器保存路径
def load(self, device):
"""加载模型和预处理器。
:param device: 计算设备(CPU/GPU)
:return: 加载的模型、特征工程对象、特征选择器和模型配置
"""
model = None # 初始化模型
feature_engineer = None # 初始化特征工程对象
feature_selector = None # 初始化特征选择器
config = None # 初始化配置
if os.path.exists(self.preprocessors_path): # 如果预处理器文件存在
preprocess = joblib.load(self.preprocessors_path) # 加载预处理器
feature_engineer = preprocess["feature_engineer"] # 获取特征工程对象
feature_selector = preprocess["feature_selector"] # 获取特征选择器
config = preprocess["config"] # 获取配置
if os.path.exists(self.model_path): # 如果模型文件存在
model = TransformerModel(
input_size=config["input_size"],
d_model=config["d_model"],
num_heads=config["num_heads"],
num_layers=config["num_layers"],
dff=config["dff"],
dropout=config["dropout"],
).to(device)
model.load_state_dict(
torch.load(self.model_path, weights_only=False, map_location=device)
) # 加载模型参数
return (
model,
feature_engineer,
feature_selector,
config,
) # 返回加载的模型、特征工程对象、特征选择器和配置
class OnlineFeatureEngineer:
def __init__(self, windows=[5]):
"""在线特征工程生成器类,用于生成技术指标特征并进行标准化。
:param windows: 滑动窗口列表(用于特征生成),defaults to [5]
"""
self.windows = windows # 设置滑动窗口列表
self.n_features = 10 # 设置特征数量
self.scalers = {
"Trend": RobustScaler(), # 趋势类指标使用RobustScaler
"Momentum": MinMaxScaler(), # 动量类指标使用MinMaxScaler
"Volatility": RobustScaler(), # 波动率类指标使用RobustScaler
"Volume": StandardScaler(), # 成交量类指标使用StandardScaler
# "Sentiment": StandardScaler(), # 市场情绪类指标使用StandardScaler
"SupportResistance": MinMaxScaler(), # 支撑阻力类指标使用MinMaxScaler
"Statistical": StandardScaler(), # 统计类指标使用StandardScaler
"Composite": RobustScaler(), # 复合型指标使用RobustScaler
}
# "price": ["open", "high", "low", "close"],
self.feature_groups = {
# Trend-Following Indicators 趋势类指标
"Trend": ["MA20", "EMA12", "MACD", "ADX", "SAR"],
# Momentum Indicators 动量类指标
"Momentum": ["RSI", "KDJ_K", "KDJ_D", "KDJ_J", "CCI", "WILLR"],
# Volatility Indicators 波动率类指标
"Volatility": ["BB_upper", "BB_middle", "BB_lower", "ATR", "STD20"],
# Volume Indicators 成交量类指标
"Volume": ["OBV", "MFI"],
# Market Sentiment Indicators 市场情绪类指标 (需要外部数据)
# "Sentiment": [],
# Support/Resistance Indicators 支撑阻力类指标
"SupportResistance": ["Fib_0.382", "Fib_0.618", "Pivot"],
# Statistical Indicators 统计类指标
"Statistical": ["LR_slope", "LR_angle"],
# Composite Indicators 复合型指标
"Composite": [
"Ichimoku_tenkan",
"Ichimoku_kijun",
"Ichimoku_senkou_a",
"Ichimoku_senkou_b",
"Ichimoku_chikou",
],
}
self.all_features = [
f for sublist in self.feature_groups.values() for f in sublist
] # 生成所有特征列表
def partial_fit(self, new_df):
"""对新数据进行部分拟合。
:param new_df: 新数据DataFrame
"""
new_features = self.generate_features(new_df, refit=True) # 生成新数据的特征
for group, features in self.feature_groups.items(): # 遍历每个特征组
if hasattr(self.scalers[group], "partial_fit"): # 如果该缩放器支持部分拟合
self.scalers[group].partial_fit(
new_features[features]
) # 对新数据进行部分拟合
def generate_features(self, df, refit=False):
"""生成技术指标特征。
:param df: 原始数据DataFrame
:param refit: 是否重新拟合标准化器,defaults to False
:return: 特征DataFrame
生成8大类技术指标:
1. 趋势类指标(MA, MACD等)
2. 动量类指标(RSI, KDJ等)
3. 波动率指标(布林带, ATR等)
4. 成交量指标(OBV, MFI等)
5. 市场情绪类指标 (需要外部数据) -- 忽略
6. 支撑阻力指标(斐波那契回撤等)
7. 统计指标(线性回归斜率等)
8. 复合指标(Ichimoku云图等)
"""
processed = [] # 初始化处理后的特征列表
for group, features in self.feature_groups.items(): # 遍历每个特征组
scaler = self.scalers[group] # 获取对应的缩放器
if not refit: # 如果不重新拟合
scaler.fit(df[features]) # 拟合缩放器
scaled = scaler.transform(df[features]) # 标准化特征
processed.append(scaled) # 添加到处理后的特征列表
processed_df = pd.DataFrame(
np.hstack(processed), index=df.index, columns=self.all_features
) # 将处理后的特征合并为DataFrame
processed_df["ts_code"] = df["ts_code"] # 添加股票代码
processed_df["returns"] = df["returns"] # 添加收益
return processed_df.dropna() # 删除缺失值
def feature_selection(self, X, y):
"""进行特征选择。
:param X: 特征数据
:param y: 目标变量
:return: 选择后的特征数据
"""
selector = RandomForestRegressor(
n_estimators=50, n_jobs=-1
) # 初始化随机森林回归器
selector.fit(X, y) # 拟合随机森林回归器
feature_selector = SelectFromModel(selector, prefit=True) # 初始化特征选择器
return feature_selector # 返回特征选择器
class IncrementalDataHandler:
def __init__(self, feature_engineer, feature_selector, window_size=5):
"""增量数据处理器类,用于处理增量数据并生成滑动窗口数据。
:param feature_engineer: 特征工程对象
:param feature_selector: 特征选择器
:param window_size: 滑动窗口大小,defaults to 5
"""
self.feature_engineer = feature_engineer # 设置特征工程对象
self.feature_selector = feature_selector # 设置特征选择器
self.window_size = window_size # 设置滑动窗口大小
self.buffer = pd.DataFrame() # 初始化数据缓冲区
def update_buffer(self, new_df):
"""更新数据缓冲区。
:param new_df: 新数据DataFrame
"""
self.buffer = pd.concat(
[self.buffer, new_df]
).sort_index() # 更新数据缓冲区并排序
def prepare_incremental_data(self):
"""准备增量数据。
:return: 训练数据和测试数据
"""
processed_df = self.feature_engineer.generate_features(self.buffer) # 生成特征
X_selected = self.feature_selector.transform(
processed_df[self.feature_engineer.all_features].values
) # 选择特征
X, y = self.sliding_window(processed_df, X_selected) # 生成滑动窗口数据
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42, shuffle=False
) # 划分训练集和测试集
return (X_train, y_train), (X_test, y_test) # 返回训练集和测试集
def sliding_window(self, df, features):
"""生成时序滑动窗口数据"""
sequences = [] # 初始化序列列表
labels = [] # 初始化标签列表
for ts_code, group in df.groupby("ts_code"): # 按股票代码分组
group_features = features[df["ts_code"] == ts_code] # 获取该股票的特征
for i in range(self.window_size, len(group)): # 生成滑动窗口
sequences.append(
group_features[i - self.window_size : i]
) # 添加滑动窗口特征
labels.append(group["returns"].iloc[i]) # 添加标签
return np.array(sequences, dtype=np.float32), np.array(
labels, dtype=np.float32
) # 返回滑动窗口特征和标签
class IncrementalTrainer:
def __init__(self, device, window_size=5):
"""增量训练控制器类,负责模型的初始训练和增量更新。
:param device: 计算设备(CPU/GPU)
"""
self.device = device # 设置计算设备
self.window_size = window_size # 设置滑动窗口大小
self.state_manager = TrainingStateManager() # 初始化训练状态管理器
self.model = None # 初始化模型
self.feature_engineer = None # 初始化特征工程对象
self.feature_selector = None # 初始化特征选择器
self.config = None # 初始化配置
self.load_state() # 加载状态
def load_state(self):
"""加载模型和预处理器。"""
(
self.model,
self.feature_engineer,
self.feature_selector,
self.config,
) = self.state_manager.load(
self.device
) # 加载模型、特征工程对象、特征选择器和配置
def initial_train(self, df):
"""初始训练模型。
:param df: 原始数据DataFrame
"""
# 特征生成
self.feature_engineer = OnlineFeatureEngineer(
windows=[self.window_size]
) # 初始化特征工程对象
processed_df = self.feature_engineer.generate_features(df) # 生成特征
# 特征选择
X, y = processed_df[self.feature_engineer.all_features], df["returns"].reindex(
processed_df.index
) # 准备特征和标签
self.feature_selector = self.feature_engineer.feature_selection(
X, y
) # 选择特征
# 准备训练数据
data_handler = IncrementalDataHandler(
feature_engineer=self.feature_engineer,
feature_selector=self.feature_selector,
window_size=self.window_size,
) # 初始化增量数据处理器
data_handler.buffer = df # 更新数据缓冲区
(X_train, y_train), (X_test, y_test) = (
data_handler.prepare_incremental_data()
) # 准备训练数据
# Optuna超参优化
self._optimize_parameters(X_train, y_train, X_test, y_test) # 优化超参数
# 最佳模型训练
print(f"Initial Model Config: {self.config}") # 打印初始模型配置
self._train(X_train, y_train) # 训练模型
print("Initial Model Evaluation:") # 打印初始模型评估
self._evaluate(X_test, y_test) # 评估模型
print("Initial Model Save:") # 打印初始模型保存
self.state_manager.save(
self.model, self.feature_engineer, self.feature_selector, self.config
) # 保存模型
def incremental_update(self, new_df):
"""增量更新模型。
:param new_df: 新数据DataFrame
"""
if not self.model:
print(
"The model does not exist, please train it first."
) # 如果模型不存在,提示先训练模型
return
self.feature_engineer.partial_fit(new_df) # 对新数据进行部分拟合
data_handler = IncrementalDataHandler(
self.feature_engineer, self.feature_selector
) # 初始化增量数据处理器
data_handler.update_buffer(new_df) # 更新数据缓冲区
(X_train, y_train), (X_test, y_test) = (
data_handler.prepare_incremental_data()
) # 准备增量数据
print(f"Incremental Model Config: {self.config}") # 打印增量模型配置
self._train(X_train, y_train) # 训练模型
print("\nIncremental Model Evaluation:") # 打印增量模型评估
self._evaluate(X_test, y_test) # 评估模型
print("\nIncremental Model Save:") # 打印增量模型保存
self.state_manager.save(
self.model, self.feature_engineer, self.feature_selector, self.config
) # 保存模型
def _optimize_parameters(self, X_train, y_train, X_test, y_test):
"""Optuna超参优化
:param X_train: 训练集滑动窗口特征数据
:param y_train: 训练集目标变量
:param X_test: 测试集滑动窗口特征数据
:param y_test: 测试集目标变量
"""
def objective(trial):
self.config = {
"num_heads": trial.suggest_categorical("num_heads", [4, 8]),
"d_model": trial.suggest_int("d_model", 64, 256, step=32), # 以32为步长
"num_layers": trial.suggest_int("num_layers", 2, 4),
"dff": trial.suggest_int("dff", 128, 512, step=64),
"dropout": trial.suggest_float("dropout", 0.1, 0.3),
"lr": trial.suggest_float("lr", 1e-4, 1e-3, log=True),
"input_size": X_train[0].shape[-1], # 输入特征维度
"batch_size": trial.suggest_categorical("batch_size", [32, 64, 128]),
"epochs": 100, # 训练轮数
"window_size": self.window_size, # 滑动窗口大小
}
self._train(X_train, y_train) # 训练模型
val_loss = self._evaluate(X_test, y_test) # 评估模型
print(f"Val Loss: {val_loss:.4f}") # 打印验证损失
return val_loss # 返回验证损失
# 超参优化
study = optuna.create_study(direction="minimize") # 创建研究
study.optimize(objective, n_trials=10, show_progress_bar=True) # 优化目标函数
print(f"Training Best params: {study.best_params}") # 打印最佳参数
# 最佳模型参数
self.config.update(study.best_params) # 更新配置
def _train(self, X_train, y_train):
"""模型训练内部方法。
:param X_train: 训练集滑动窗口特征数据
:param y_train: 训练集目标变量
"""
print(f"Training Model Config: {self.config}") # 打印模型配置
num_heads = self.config.get("num_heads", 8)
d_model = self.config.get("d_model", 64)
d_model = (d_model // num_heads) * num_heads
num_layers = self.config.get("num_layers", 3)
dff = self.config.get("dff", 256)
dropout = self.config.get("dropout", 0.1)
lr = self.config.get("lr", 1e-4)
input_size = self.config.get(
"input_dim", X_train[0].shape[-1]
) # 获取输入特征维度
epochs = self.config.get("epochs", 100) # 获取训练轮数
batch_size = self.config.get("batch_size", 128)
# 强制维度约束
if d_model % num_heads != 0:
d_model = (d_model // num_heads) * num_heads # 自动调整为最近的可整除数
dataset = SingleWindowDataset(X_train, y_train) # 初始化数据集
loader = DataLoader(
dataset,
batch_size=batch_size,
shuffle=False,
) # 初始化数据加载器
# 初始化模型
self.model = TransformerModel(
input_size, d_model, num_heads, num_layers, dff, dropout
).to(self.device)
optimizer = torch.optim.Adam(self.model.parameters(), lr=lr) # 初始化优化器
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
optimizer, "min", patience=5
) # 初始化学习率调度器
criterion = nn.HuberLoss() # 初始化损失函数
# 训练循环
for epoch in tqdm(range(epochs), desc="Training"): # 进行训练
self.model.train() # 设置模型为训练模式
total_loss = 0 # 初始化总损失
for X_batch, y_batch in loader: # 遍历数据加载器
X_batch = X_batch.to(self.device) # 将特征数据移动到指定设备
y_batch = y_batch.to(self.device).unsqueeze(
1
) # 将标签数据移动到指定设备并扩展维度
optimizer.zero_grad() # 清零梯度
preds = self.model(X_batch) # 前向传播
loss = criterion(preds, y_batch) # 计算损失
loss.backward() # 反向传播
nn.utils.clip_grad_norm_(self.model.parameters(), 1.0) # 梯度裁剪
optimizer.step() # 更新参数
total_loss += loss.item() # 累加损失
# 学习率调整
avg_loss = total_loss / len(loader) # 计算平均损失
scheduler.step(avg_loss) # 更新学习率
def _evaluate(self, X_test, y_test):
"""模型评估内部方法。
:param X_test: 测试集滑动窗口特征数据
:param y_test: 测试集目标变量
:return: 平均损失
"""
test_dataset = SingleWindowDataset(X_test, y_test) # 初始化测试数据集
test_loader = DataLoader(
test_dataset,
batch_size=128,
shuffle=False,
) # 初始化测试数据加载器
self.model.eval() # 设置模型为评估模式
total_loss = 0 # 初始化总损失
criterion = nn.HuberLoss() # 初始化损失函数
with torch.no_grad(): # 关闭梯度计算
for X_batch, y_batch in test_loader: # 遍历测试数据加载器
X_batch = X_batch.to(self.device) # 将特征数据移动到指定设备
y_batch = y_batch.to(self.device).unsqueeze(
1
) # 将标签数据移动到指定设备并扩展维度
preds = self.model(X_batch) # 前向传播
loss = criterion(preds, y_batch) # 计算损失
total_loss += loss.item() * len(y_batch) # 累加损失
test_loss = total_loss / len(test_dataset) # 计算平均损失
print(f"Test Loss: {test_loss}") # 打印测试损失
return test_loss # 返回测试损失
5.4 backtesting.py
import numpy as np
import optuna
import pandas as pd
import torch
import vectorbt as vbt
class DualEMACrossoverStrategy:
def __init__(self, pred_returns, volatility, params):
"""双EMA交叉策略类。
:param pred_returns: 预测的收益率序列
:param volatility: 波动率序列(用于仓位计算)
:param params: 参数字典,包含快慢EMA的时间跨度
"""
self.pred_returns = pred_returns # 预测的收益率序列
self.volatility = volatility.clip(lower=0.01) # 防止波动率为0,导致除零错误
self.fast_span = params["fast_span"] # 快速EMA的时间跨度
self.slow_span = params["slow_span"] # 慢速EMA的时间跨度
def generate_signals(self):
"""生成交易信号。
:return: (交易信号, 仓位大小)
"""
ema_fast = self.pred_returns.ewm(
span=self.fast_span, min_periods=self.fast_span
).mean() # 计算快速EMA
ema_slow = self.pred_returns.ewm(
span=self.slow_span, min_periods=self.slow_span
).mean() # 计算慢速EMA
signals = pd.Series(0, index=self.pred_returns.index) # 初始化信号序列
signals[(ema_fast > ema_slow) & (ema_fast.shift(1) <= ema_slow.shift(1))] = (
1 # 买入信号
)
signals[(ema_fast < ema_slow) & (ema_fast.shift(1) >= ema_slow.shift(1))] = (
-1
) # 卖出信号
# 使用tanh函数压缩仓位大小,实现非线性映射
position_size = np.tanh(self.pred_returns.abs() / self.volatility).clip(
0.3, 0.8
)
return signals, position_size # 返回信号和仓位大小
class BacktestStrategy:
def __init__(self, model, device):
"""回测执行引擎。
:param model: 训练好的预测模型
:param device: 计算设备(CPU/GPU)
"""
self.model = model # 训练好的预测模型
self.device = device # 计算设备
self._configure_vbt() # 配置VectorBT全局参数
def _configure_vbt(self):
"""配置VectorBT全局参数。"""
vbt.settings.array_wrapper["freq"] = "D" # 设置时间频率为日频
vbt.settings.plotting["layout"]["template"] = "vbt_dark" # 使用暗色主题
vbt.settings.plotting["layout"]["width"] = 1200 # 设置图表宽度
vbt.settings.portfolio["init_cash"] = 100000.0 # 初始资金10万元
vbt.settings.portfolio["fees"] = 0.0025 # 交易成本(手续费)0.25%
vbt.settings.portfolio["slippage"] = 0.0025 # 交易成本(滑点)0.25%
def _optimize_parameters(self, result_df):
"""优化参数逻辑调整。
:param result_df: 包含预测收益率的结果DataFrame
:return: 最优参数
"""
def objective(trial):
params = {
"fast_span": trial.suggest_int(
"fast_span", 10, 30
), # 建议快速EMA的时间跨度
"slow_span": trial.suggest_int(
"slow_span", 50, 100
), # 建议慢速EMA的时间跨度
}
strategy = DualEMACrossoverStrategy(
pred_returns=result_df["pred_returns"],
volatility=result_df["volatility"],
params=params,
) # 创建策略实例
signals, position_size = strategy.generate_signals() # 生成交易信号
pf = vbt.Portfolio.from_signals(
close=result_df["close"],
entries=signals.shift(1) == 1, # 买入信号
exits=signals.shift(1) == -1, # 卖出信号
size=position_size, # 固定仓位
freq="D",
)
return pf.total_profit() # 返回总利润
study = optuna.create_study(direction="maximize") # 创建Optuna研究
study.optimize(objective, n_trials=10, show_progress_bar=True) # 优化参数
print(f"Strategy Best params: {study.best_params}") # 打印最优参数
return study.best_params # 返回最优参数
def run(self, test_data, df):
"""执行完整回测流程。
:param test_data: 测试数据集元组(X_test, y_test)
:param df: 原始数据DataFrame
:return: (组合对象, 结果DataFrame)
"""
X_test = test_data # 测试数据
self.model.eval() # 将模型设置为评估模式
with torch.no_grad(): # 禁用梯度计算
test_tensor = torch.FloatTensor(X_test).to(
self.device
) # 转换为Tensor并移动到指定设备
preds = (
self.model(test_tensor).cpu().numpy().flatten()
) # 获取预测值并转换为NumPy数组
test_dates = df.index[-len(preds) :] # 获取测试日期
result_df = pd.DataFrame(
{
"close": df["close"].values[-len(preds) :],
"pred_returns": preds, # 使用rolling窗口计算动态波动率
"volatility": df["ATR"].values[-len(preds) :]
/ df["close"].values[-len(preds) :],
},
index=test_dates,
) # 创建结果DataFrame
best_params = self._optimize_parameters(result_df) # 运行参数优化
strategy = DualEMACrossoverStrategy(
pred_returns=result_df["pred_returns"],
volatility=result_df["volatility"],
params=best_params,
) # 创建策略实例
signals, position_size = strategy.generate_signals() # 生成交易信号
return vbt.Portfolio.from_signals(
close=result_df["close"],
entries=signals == 1, # 买入信号
exits=signals == -1, # 卖出信号
size=position_size,
size_type="percent",
freq="D",
) # 执行组合回测
5.5 main.py
import random
import numpy as np
import torch
from vectorbt_5.backtesting import BacktestStrategy
from vectorbt_5.data_processing import load_data
from vectorbt_5.training import IncrementalTrainer, TrainingStateManager
def set_random_seed(seed=42):
"""设置全局随机种子
:param seed: 随机种子, defaults to 42
影响范围:
- Python内置随机模块
- Numpy随机数生成
- PyTorch CPU/CUDA随机种子
"""
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)
if torch.cuda.is_available():
torch.cuda.manual_seed(seed)
torch.cuda.manual_seed_all(seed) # 如果使用多个GPU
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False
def prepare_backtest_data(ts_code, device):
"""准备回测数据。
:param ts_code: 股票代码
:param device: 计算设备(CPU/GPU)
:return: 滑动窗口特征数据、原始数据DataFrame、模型、特征工程对象、特征选择器和配置
"""
state_manager = TrainingStateManager() # 初始化训练状态管理器
model, feature_engineer, feature_selector, config = state_manager.load(
device
) # 加载模型和预处理器
print(f"Model Config: {config}") # 打印模型配置
# 加载并处理数据
test_df = load_data([ts_code]) # 加载数据
test_df = test_df[-300:] # 取最近300条数据
processed_df = feature_engineer.generate_features(test_df) # 生成特征
X_selected = feature_selector.transform(
processed_df[feature_engineer.all_features].values
) # 选择特征
# 构建滑动窗口
window_size = config["window_size"] # 获取滑动窗口大小
sequences = [] # 初始化序列列表
for i in range(window_size, len(X_selected)): # 遍历数据
sequences.append(X_selected[i - window_size : i]) # 添加滑动窗口特征
return (
np.array(sequences, dtype=np.float32), # 返回滑动窗口特征数据
test_df, # 返回原始数据DataFrame
model, # 返回模型
feature_engineer, # 返回特征工程对象
feature_selector, # 返回特征选择器
config, # 返回配置
)
if __name__ == "__main__":
# 设置随机种子
# 函数确保了整个训练过程的可重复性。
# 通过设置相同的随机种子,可以保证每次运行时生成的随机数序列一致,这对于调试和实验验证非常重要。
set_random_seed()
# 检测可用计算设备(优先使用CUDA)
device = torch.device(
"cuda"
if torch.cuda.is_available()
else "mps" if torch.backends.mps.is_available() else "cpu"
)
# 股票行情
# start_date = "20180101"
# end_date = "20241231"
trainer = IncrementalTrainer(device)
# 多股票初始训练示例
# 浦发银行(600000.SH)
# 招商银行(600036.SH)
# 平安银行(000001.SZ)
train_codes = ["600000.SH", "600036.SH", "000001.SZ"]
train_df = load_data(train_codes)
model = trainer.initial_train(train_df)
# 增量训练示例
# 贵州茅台(600519.SH)
# new_df = load_data(["600519.SH"])
# model = trainer.incremental_update(new_df)
# 单股票回测
(test_data, test_df, model, feature_engineer, feature_selector, config) = (
prepare_backtest_data("600036.SH", device)
)
backtester = BacktestStrategy(model, device)
pf = backtester.run(test_data, test_df)
print("回测结果统计:")
print(pf.stats())
pf.plot().show()
风险提示与免责声明
本文内容基于公开信息研究整理,不构成任何形式的投资建议。历史表现不应作为未来收益保证,市场存在不可预见的波动风险。投资者需结合自身财务状况及风险承受能力独立决策,并自行承担交易结果。作者及发布方不对任何依据本文操作导致的损失承担法律责任。市场有风险,投资须谨慎。