基于Transformer的多资产收益预测模型实战(附PyTorch模型训练及可视化完整代码)
一、项目背景与目标
在量化投资领域,利用时间序列数据预测资产收益是核心任务之一。传统方法如LSTM难以捕捉资产间的复杂依赖关系,而Transformer架构通过自注意力机制能有效建模多资产间的联动效应。
本文将从零开始构建一个基于PyTorch的多资产收益预测模型,涵盖数据生成、特征工程、模型设计、训练及可视化全流程,适合深度学习与量化投资的初学者入门。
二、核心技术栈
- 数据处理:Pandas/Numpy(数据生成与预处理)
- 深度学习框架:PyTorch(模型构建与训练)
- 可视化:Matplotlib(结果分析)
- 核心算法:Transformer(自注意力机制)
三、数据生成与预处理
1. 模拟金融数据生成
我们通过以下步骤生成包含5只资产的时间序列数据:
- 市场基准因子:模拟市场整体趋势(几何布朗运动)
- 行业因子:引入周期性波动区分不同行业(如科技、消费、能源)
- 特质因子:每只资产的独立噪声
def generate_market_data(days=2000, n_assets=5):
np.random.seed(42)
market = np.cumprod(1 + np.random.normal(0.0003, 0.015, days)) # 市场基准
assets = []
sector_map = {0: "Tech", 1: "Tech", 2: "Consume", 3: "Consume", 4: "Energy"}
for i in range(n_assets):
sector_factor = 0.3 * np.sin(i * 0.8 + np.linspace(0, 10 * np.pi, days)) # 行业周期因子
idiosyncratic = np.cumprod(1 + np.random.normal(0.0002, 0.02, days)) # 特质因子
price = market * (1 + sector_factor) * idiosyncratic # 价格合成
assets.append(price)
dates = pd.date_range("2015-01-01", periods=days)
return pd.DataFrame(np.array(assets).T, index=dates, columns=[f"Asset_{i}" for i in range(n_assets)])
2. 数据形状说明
生成的DataFrame形状为[2000天, 5资产]
,索引为时间戳,列名为Asset_0到Asset_4。
四、特征工程:从价格到可训练数据
1. 基础时间序列特征
为每只资产计算以下特征:
- 收益率(Return):相邻日价格变化率
- 波动率(Volatility):20日滚动标准差年化
- 移动平均(MA10):10日价格移动平均
- 行业相对强弱(Sector_RS):资产价格与所属行业平均价格的比值
def create_features(data, lookback=60):
n_assets = data.shape[1]
sector_map = {0: "Tech", 1: "Tech", 2: "Consume", 3: "Consume", 4: "Energy"}
features = []
for i, asset in enumerate(data.columns):
df = pd.DataFrame()
df["Return"] = data[asset].pct_change()
df["Volatility"] = df["Return"].rolling(20).std() * np.sqrt(252) # 年化波动率
df["MA10"] = data[asset].rolling(10).mean()
# 计算行业相对强弱
sector = sector_map[i]
sector_cols = [col for col in data.columns if sector_map[int(col.split("_")[1])] == sector]
df["Sector_RS"] = data[asset] / data[sector_cols].mean(axis=1)
features.append(df.dropna()) # 去除NaN
# 对齐时间索引
common_idx = features[0].index
for df in features[1:]:
common_idx = common_idx.intersection(df.index)
features = [df.loc[common_idx] for df in features]
# 构建3D特征张量 [样本数, 时间步, 资产数, 特征数]
X = np.stack([np.stack([feat.iloc[i-lookback:i] for i in range(lookback, len(feat))], axis=0) for feat in features], axis=2)
# 标签:未来5日平均收益率
y = np.array([data.loc[common_idx].iloc[i:i+5].pct_change().mean().values for i in range(lookback, len(common_idx))])
return X, y
2. 输入输出形状
- 特征张量
X
形状:[样本数, 时间步(60), 资产数(5), 特征数(4)]
- 标签
y
形状:[样本数, 资产数(5)]
(每个样本对应5只资产的未来5日平均收益率)
五、Transformer模型构建:核心架构解析
1. 模型设计目标
- 处理多资产时间序列:同时输入5只资产的历史数据
- 捕捉时间依赖与资产间依赖:通过位置编码和自注意力机制
- 输出多资产收益预测:回归问题,使用MSE损失
2. 关键组件解析
(1)资产嵌入层(Asset Embedding)
将每个资产的4维特征映射到64维隐空间:
self.asset_embed = nn.Linear(n_features=4, d_model=64)
输入形状:(batch, seq_len, assets, features)
→ 输出:(batch, seq_len, assets, d_model)
(2)位置编码(Positional Embedding)
由于Transformer无内置时序信息,需手动添加位置编码:
self.time_pos = nn.Parameter(torch.randn(1, lookback=60, 1, d_model=64)) # 时间位置编码
self.asset_pos = nn.Parameter(torch.randn(1, 1, n_assets=5, d_model=64)) # 资产位置编码
- 通过广播机制与资产嵌入相加,分别捕获时间和资产维度的位置信息。
(3)自定义Transformer编码器层(Custom Transformer Encoder Layer)
继承PyTorch原生层,返回注意力权重以可视化:
class CustomTransformerEncoderLayer(nn.TransformerEncoderLayer):
def __init__(self, d_model, nhead, dim_feedforward=256, dropout=0.1):
super().__init__(d_model, nhead, dim_feedforward, dropout, batch_first=True) # 显式启用batch_first
def forward(self, src, src_mask=None, src_key_padding_mask=None):
src2, attn_weights = self.self_attn(src, src, src, need_weights=True) # 获取注意力权重
src = src + self.dropout1(src2)
src = self.norm1(src)
src2 = self.linear2(self.dropout(self.activation(self.linear1(src))))
src = src + self.dropout2(src2)
src = self.norm2(src)
return src, attn_weights
(4)维度调整核心逻辑
在进入Encoder前,将张量形状从(batch, seq, assets, d_model)
调整为(batch, seq*assets, d_model)
,以便Encoder处理:
x = x + self.time_pos + self.asset_pos # 叠加位置编码
x = x.permute(0, 1, 3, 2) # 调整维度为 [batch, seq, d_model, assets]
x = x.reshape(batch_size, seq_len * n_assets, d_model) # 合并资产与序列维度
x, attn_weights = self.encoder(x) # Encoder输出形状:(batch, seq*assets, d_model)
x = x.reshape(batch_size, seq_len, n_assets, d_model) # 恢复维度
(5)解码器(Decoder)
提取最后时间步特征,拼接后映射到5维收益空间:
self.decoder = nn.Linear(d_model * n_assets, n_assets) # 输入320维,输出5维
六、模型训练:从数据加载到优化
1. 数据加载器
使用PyTorch的DataLoader处理批量数据:
train_dataset = TensorDataset(torch.FloatTensor(X_train), torch.FloatTensor(y_train))
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
2. 训练配置
- 损失函数:均方误差(MSE)
- 优化器:Adam(学习率
1e-4
) - 训练循环:50个epoch,记录训练/验证损失
model = AssetTransformer(n_features=4, n_assets=5, lookback=60)
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.0001)
七、可视化与结果分析
1. 自注意力矩阵热力图(Cross-Asset Attention Matrix)
提取第一层编码器的注意力权重,展示资产间的依赖关系(以最后时间步为例):
def plot_attention(attention_weights, asset_names):
num_assets = len(asset_names)
# 取第一个样本、第一个注意力头的权重,形状为 [batch, heads, query, key]
attn_matrix = attention_weights[0][0].detach().numpy() # 假设batch=1
attn_matrix = attn_matrix.reshape(num_assets, num_assets) # 恢复资产间注意力矩阵
plt.imshow(attn_matrix, cmap="viridis", aspect="auto")
plt.colorbar()
plt.title("Cross-Asset Attention Matrix (First Layer)")
plt.xlabel("Key Assets")
plt.ylabel("Query Assets")
plt.xticks(range(num_assets), asset_names)
plt.yticks(range(num_assets), asset_names)
plt.tight_layout()
plt.show()
核心作用:可视化Transformer编码器中资产间的依赖关系,揭示模型如何通过自注意力机制捕捉不同资产的联动效应。
(1)数据提取逻辑
- 注意力权重形状:通过自定义编码器层获取的注意力权重形状为
[batch_size, n_heads, query_length, key_length]
,其中query_length
和key_length
均为时间步×资产数
(本例中为60×5=300
)。 - 关键筛选:聚焦最后一个时间步(当前预测时刻)的资产间注意力,提取最后
n_assets
个查询和键的权重,重塑为[资产数, 资产数]
矩阵(本例中为5×5
),消除时间维度的干扰。
(2)图形元素解析
- 横轴/纵轴:均为资产名称(Asset_0 到 Asset_4),横轴表示“键资产”(Key),纵轴表示“查询资产”(Query)。例如,纵轴Asset_0对应横轴Asset_1的单元格值,表示“资产0在计算时对资产1的关注度”。
- 颜色映射:使用
viridis
色卡,深色(如紫色/蓝色)代表高注意力权重(接近1),浅色(如黄色/白色)代表低权重(接近0)。 - 行业分组验证:
- 同一行业资产(如Tech行业的Asset_0和Asset_1)的交叉区域颜色应更深,表明模型关注行业内协同效应。
- 跨行业资产(如Tech与Energy)的权重可能较低,颜色较浅,符合数据生成时的行业因子设计(行业间波动相关性较低)。
(3)金融意义
- 资产联动捕捉:高权重表示模型认为这两项资产的历史特征对预测当前收益更重要,可能用于构建资产组合时的相关性分析。
- 注意力异常排查:若某资产对自身的注意力权重显著高于其他资产(对角线元素突出),可能意味着模型过度依赖单一资产,存在过拟合风险。
2. 预测结果散点图(Predicted vs Actual Returns)
散点图对比实际收益与预测收益,理想情况下点分布在y=x直线附近:
plt.figure(figsize=(12, 6))
for i in range(5):
plt.scatter(y_test[:, i], pred_test[:, i], alpha=0.5, label=f"Asset_{i}")
plt.plot([-0.1, 0.1], [-0.1, 0.1], "k--", label="Perfect Prediction")
plt.title("Predicted vs Actual 5-Day Returns")
plt.xlabel("Actual Returns")
plt.ylabel("Predicted Returns")
plt.legend()
plt.grid(alpha=0.3)
plt.show()
核心作用:评估模型对每只资产未来5日平均收益率的预测精度,直观展示预测值与实际值的偏离程度。
(1)图形结构
- 横轴(X轴):实际收益率(Actual 5-Day Returns),范围约
[-0.1, 0.1]
(覆盖多数金融资产的短期波动区间)。 - 纵轴(Y轴):预测收益率(Predicted Returns),与横轴范围一致,便于对比。
- 散点分布:
- 每个资产(5只)用不同颜色区分(如Asset_0为蓝色,Asset_1为橙色),标签清晰标注。
- 理想情况下,散点应紧密分布在黑色虚线
y=x
附近,表明预测值与实际值接近;散点越偏离虚线,预测误差越大。
(2)量化指标辅助解读
- MSE损失:训练日志中显示的Test Loss(如0.0006)对应散点的整体离散程度,值越小,散点越集中。
- 资产差异:若某资产(如Asset_4)的散点明显偏离对角线,可能是该资产的特质因子噪声较大,或模型对其行业特征的捕捉不足。
(3)实战意义
- 预测可靠性判断:若多数散点位于对角线附近,且各资产分布均匀,说明模型泛化能力较强,可用于实际收益预测;反之,需优化特征工程或调整模型结构。
3. 策略回测累计收益曲线(Long-Short Portfolio Performance)
通过预测结果构建多空策略(买入前20%预测收益资产,卖空后20%),对比策略与市场收益:
def backtest_strategy(pred_returns, data, lookback=60):
long_thresh = np.quantile(pred_returns, 0.8, axis=1)[:, None]
short_thresh = np.quantile(pred_returns, 0.2, axis=1)[:, None]
long_mask = pred_returns >= long_thresh
short_mask = pred_returns <= short_thresh
# 标准化仓位(多头/空头资产等权分配)
position = (long_mask / long_mask.sum(axis=1)[:, None]) - (short_mask / short_mask.sum(axis=1)[:, None])
# 计算收益
returns = data.pct_change().iloc[lookback+1:lookback+1+len(position)]
strategy_ret = (position[:-1] * returns.iloc[1:]).sum(axis=1)
market_ret = returns.mean(axis=1)
return strategy_ret.cumsum(), market_ret.cumsum()
核心作用:验证基于预测结果的多空策略能否获取超额收益,对比策略表现与市场基准。
(1)策略构建逻辑
- 多空筛选:
- 多头:预测收益前20%的资产(高于80%分位数),等权买入。
- 空头:预测收益后20%的资产(低于20%分位数),等权卖空。
- 仓位管理:多头和空头仓位分别标准化(权重和为1),确保风险中性。
(2)曲线元素解析
- 蓝色曲线(Transformer Strategy):
- 若曲线向上倾斜且斜率大于市场曲线,表明策略有效,能通过多空操作获取超额收益。
- 若曲线波动较大,可能受限于模拟数据的高噪声,或需增加风险控制(如止损)。
- 黄色曲线(Market Index):
- 市场平均收益(所有资产等权平均),作为基准线。若策略曲线长期位于其上方,说明模型具备实际应用价值。
(3)金融指标延伸
- 夏普比率:可进一步计算策略的风险调整后收益(假设无风险利率为0),公式为
(策略年化收益) / (策略收益标准差)
,值越高表明风险收益比越好。 - 最大回撤:曲线中的回调幅度,反映策略的抗风险能力,与累计收益结合评估策略稳定性。
4. 可视化总结表
图形类型 | 核心价值 | 理想结果特征 | 常见异常信号 |
---|---|---|---|
注意力热力图 | 资产间依赖关系建模验证 | 同行业资产权重高,跨行业权重低 | 对角线权重异常高(过拟合) |
预测散点图 | 模型预测精度评估 | 散点紧密分布于y=x附近,MSE低 | 某资产散点显著偏离(特征不足) |
回测收益曲线 | 策略有效性验证 | 策略曲线持续跑赢市场,夏普比率>1 | 曲线长期低于市场(模型失效) |
通过这三类图形,可从模型机理(注意力)、预测能力(散点)、实战价值(回测)三个维度全面评估Transformer在多资产收益预测中的表现,为后续优化提供明确方向。
八、常见错误与解决方案
1. Transformer输入维度不匹配
错误信息:TypeError: CustomTransformerEncoderLayer.forward() got an unexpected keyword argument 'is_causal'
原因:未正确处理PyTorch Transformer的隐含参数。
解决:在自定义编码器层的forward
方法中添加is_causal=False
默认参数,兼容框架逻辑。
2. 注意力权重形状错误
错误信息:ValueError: cannot reshape array of size 300 into shape (5,5)
原因:未正确提取最后时间步的注意力权重,误将全序列权重直接重塑。
解决:先筛选最后时间步的查询/键资产权重,再重塑为[资产数, 资产数]
矩阵:
# 正确提取最后时间步(假设序列长度60,资产数5)
last_step_attn = attn_matrix[-5:, -5:] # 取最后5个查询(资产)对最后5个键(资产)的权重
3. 位置编码广播失败
错误信息:RuntimeError: The size of tensor a (60) must match the size of tensor b (1) at non-singleton dimension 1
解决:确保位置编码维度包含资产轴(如[1, lookback, 1, d_model]
),通过广播自动适配资产数。
九、项目总结与扩展方向
1. 项目亮点
- 多维度建模:同时捕捉时间序列依赖(位置编码)和资产间依赖(自注意力)。
- 完整量化流程:从数据生成到策略回测,复现真实量化研究闭环。
- 维度调整详解:通过
view()
/reshape()
处理复杂张量变换,解决PyTorch常见形状问题。
2. 改进方向
- 数据增强:添加滑动窗口、噪声注入等技术提升模型鲁棒性。
- 混合架构:结合CNN提取局部特征,或LSTM捕捉长期趋势。
- 真实场景适配:接入股票/期货高频数据,优化数据预处理流程(如复权处理)。
十、给初学者的建议
- 分步调试:先用小数据集(如
days=100, n_assets=2
)验证代码逻辑,再扩展规模。 - 维度打印:在模型各关键节点添加
print(x.shape)
,确保输入输出形状符合预期。 - 官方文档优先:PyTorch Transformer文档是理解注意力机制的最佳材料,重点关注
batch_first
参数和输入维度要求。
通过本项目,读者可掌握多变量时间序列建模的核心思路,理解Transformer在序列建模中的优势,为后续量化模型开发打下坚实基础。
完整代码
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
import os
os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE"
# ==================== 数据生成与预处理 ====================
def generate_market_data(days=2000, n_assets=5):
np.random.seed(42)
market = np.cumprod(1 + np.random.normal(0.0003, 0.015, days))
assets = []
for i in range(n_assets):
sector_factor = 0.3 * np.sin(i * 0.8 + np.linspace(0, 10 * np.pi, days))
idiosyncratic = np.cumprod(1 + np.random.normal(0.0002, 0.02, days))
price = market * (1 + sector_factor) * idiosyncratic
assets.append(price)
dates = pd.date_range("2015-01-01", periods=days)
return pd.DataFrame(
np.array(assets).T, index=dates, columns=[f"Asset_{i}" for i in range(n_assets)]
)
data = generate_market_data()
# ==================== 特征工程 ====================
def create_features(data, lookback=60):
n_assets = data.shape[1]
features = []
sector_map = {0: "Tech", 1: "Tech", 2: "Consume", 3: "Consume", 4: "Energy"}
for i, asset in enumerate(data.columns):
df = pd.DataFrame()
df["Return"] = data[asset].pct_change()
df["Volatility"] = df["Return"].rolling(20).std() * np.sqrt(252)
df["MA10"] = data[asset].rolling(10).mean()
sector = sector_map[i]
sector_cols = [
col for col in data.columns if sector_map[int(col.split("_")[1])] == sector
]
df["Sector_RS"] = data[asset] / data[sector_cols].mean(axis=1)
features.append(df.dropna())
common_idx = features[0].index
for df in features[1:]:
common_idx = common_idx.intersection(df.index)
features = [df.loc[common_idx] for df in features]
X = np.stack(
[
np.stack(
[feat.iloc[i - lookback : i] for i in range(lookback, len(feat))],
axis=0,
)
for feat in features
],
axis=2,
)
y = np.array(
[
data.loc[common_idx].iloc[i : i + 5].pct_change().mean().values
for i in range(lookback, len(common_idx))
]
)
valid_indices = ~np.isnan(y).any(axis=1) & ~np.isinf(y).any(axis=1)
return X[valid_indices], y[valid_indices]
X, y = create_features(data)
print(f"数据形状: X={X.shape}, y={y.shape}")
# ==================== 模型定义 ====================
class CustomTransformerEncoderLayer(nn.TransformerEncoderLayer):
"""自定义编码器层以返回注意力权重"""
def __init__(
self, d_model, nhead, dim_feedforward=256, dropout=0.1, activation="relu"
):
super().__init__(
d_model=d_model,
nhead=nhead,
dim_feedforward=dim_feedforward,
dropout=dropout,
activation=activation,
batch_first=True,
)
self.norm1 = nn.LayerNorm(d_model)
self.norm2 = nn.LayerNorm(d_model)
def forward(self, src, src_mask=None, src_key_padding_mask=None):
src2, attn_weights = self.self_attn(
src,
src,
src,
attn_mask=src_mask,
key_padding_mask=src_key_padding_mask,
need_weights=True,
)
src = src + self.dropout1(src2)
src = self.norm1(src)
src2 = self.linear2(self.dropout(self.activation(self.linear1(src))))
src = src + self.dropout2(src2)
src = self.norm2(src)
return src, attn_weights
class AssetTransformer(nn.Module):
def __init__(self, n_features, n_assets, lookback, d_model=64, nhead=4):
super().__init__()
self.asset_embed = nn.Linear(n_features, d_model)
self.time_pos = nn.Parameter(torch.randn(1, lookback, 1, d_model))
self.asset_pos = nn.Parameter(torch.randn(1, 1, n_assets, d_model))
self.final_norm = nn.LayerNorm(d_model)
encoder_layer = CustomTransformerEncoderLayer(
d_model=d_model, nhead=nhead, dim_feedforward=256
)
self.encoder = nn.TransformerEncoder(
encoder_layer, num_layers=3, norm=nn.LayerNorm(d_model)
)
self.decoder = nn.Linear(d_model * n_assets, n_assets)
self.attention_weights = None
def forward(self, x):
batch_size = x.size(0)
seq_len = x.size(1)
n_assets = x.size(2)
x = self.asset_embed(x)
x = x + self.time_pos + self.asset_pos
x = x.view(batch_size, seq_len * n_assets, -1)
attn_weights_list = []
for layer in self.encoder.layers:
x, attn_weights = layer(x)
attn_weights_list.append(attn_weights)
self.attention_weights = attn_weights_list
x = x.view(batch_size, n_assets, seq_len, -1)
x = x[:, :, -1, :]
x = x.reshape(batch_size, -1)
return self.decoder(x)
# ==================== 模型训练 ====================
def train_model(X, y, lookback=60, n_epochs=50):
split = int(0.8 * len(X))
X_train, X_test = X[:split], X[split:]
y_train, y_test = y[:split], y[split:]
train_dataset = TensorDataset(
torch.FloatTensor(X_train), torch.FloatTensor(y_train)
)
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
model = AssetTransformer(n_features=4, n_assets=5, lookback=lookback)
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.0001)
for epoch in range(n_epochs):
model.train()
total_loss = 0
for batch_x, batch_y in train_loader:
optimizer.zero_grad()
outputs = model(batch_x)
loss = criterion(outputs, batch_y)
loss.backward()
optimizer.step()
total_loss += loss.item()
model.eval()
with torch.no_grad():
test_pred = model(torch.FloatTensor(X_test))
test_loss = criterion(test_pred, torch.FloatTensor(y_test))
print(
f"Epoch {epoch + 1}/{n_epochs} | "
f"Train Loss: {total_loss / len(train_loader):.4f} | "
f"Test Loss: {test_loss:.4f}"
)
return model, y_test, test_pred.detach().numpy()
model, y_test, pred_test = train_model(X, y)
# ==================== 可视化部分 ====================
def plot_attention(attention_weights, asset_names):
plt.figure(figsize=(12, 8))
# 取第一个样本、第一个注意力头的注意力权重
num_assets = len(asset_names)
num_seq = 60 # 假设序列长度为 60
layer_weights = attention_weights[0][0, :, : num_assets * num_seq].detach().numpy()
layer_weights = layer_weights.reshape(num_assets * num_seq, num_assets * num_seq)
# 取每个资产最后一个时间步的注意力权重
last_step_weights = layer_weights[-num_assets:, -num_assets:]
plt.imshow(last_step_weights, cmap="viridis", aspect="auto")
plt.colorbar()
plt.title("Cross-Asset Attention Matrix (First Layer)")
plt.xlabel("Key Assets")
plt.ylabel("Query Assets")
plt.xticks(range(num_assets), asset_names)
plt.yticks(range(num_assets), asset_names)
plt.tight_layout()
plt.show()
if model.attention_weights:
asset_names = data.columns.tolist()
plot_attention(model.attention_weights, asset_names)
else:
print("未能捕获注意力权重")
# 预测结果散点图
plt.figure(figsize=(12, 6))
for i in range(5):
plt.scatter(y_test[:, i], pred_test[:, i], alpha=0.5, label=f"Asset_{i}")
plt.plot([-0.1, 0.1], [-0.1, 0.1], "k--")
plt.title("Predicted vs Actual Returns")
plt.xlabel("Actual 5-Day Returns")
plt.ylabel("Predicted Returns")
plt.legend()
plt.grid(alpha=0.3)
plt.show()
# ==================== 策略回测 ====================
def backtest_strategy(pred_returns, data, lookback=60):
long_thresh = np.quantile(pred_returns, 0.8, axis=1)
short_thresh = np.quantile(pred_returns, 0.2, axis=1)
long_mask = pred_returns >= long_thresh[:, None]
short_mask = pred_returns <= short_thresh[:, None]
position = long_mask.astype(float) / long_mask.sum(axis=1)[:, None]
position -= short_mask.astype(float) / short_mask.sum(axis=1)[:, None]
returns = data.pct_change().iloc[lookback + 1 : lookback + 1 + len(position)]
strategy_ret = (position[:-1] * returns.iloc[1:]).sum(axis=1)
market_ret = returns.mean(axis=1)
return strategy_ret.cumsum(), market_ret.cumsum()
strategy_cum, market_cum = backtest_strategy(pred_test, data)
plt.figure(figsize=(12, 6))
plt.plot(strategy_cum, label="Transformer Strategy")
plt.plot(market_cum, label="Market Index", alpha=0.7)
plt.title("Long-Short Portfolio Performance")
plt.xlabel("Trading Days")
plt.ylabel("Cumulative Returns")
plt.legend()
plt.grid(alpha=0.3)
plt.tight_layout()
plt.show()