102.1 Kaggle Competition: Optiver - Trading at the Close, a Walkthrough of the Project's Python Code

Published: 2025-02-11

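0. Preface

For a fuller and clearer picture of how the overall "evolution of financial portfolio models" series is organized, see:
0. The Complete Illustrated Guide to the Evolution of Financial Portfolio Models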

1. Competition Background and Objective


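1.1 Competition Background

For the data structure and the detailed competition requirements, see the competition page (not repeated here):
Optiver - Trading at the Close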

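1.2 About Optiver

Optiver is one of the world's leading market makers, and this competition focuses on trading during the closing phase of the stock market. In the US equity market the closing price is set through the Closing Auction mechanism, and forecasting prices during this phase matters a great deal to institutional investors.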

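1.3 Competition Objective

Predict how stock prices move during the closing auction. This phase has the following characteristics:

  1. Closing auction mechanism

    • Time window: takes place in the last 10 minutes of the trading day
    • Order accumulation: the system keeps accepting buy and sell orders but does not match them immediately
    • Price discovery: the final closing price is computed from supply and demand
    • Order types: limit orders, market orders, and on-close orders can be submitted
  2. Prediction target

    • Using real-time market data, predict the percentage change in the stock price up to the close
    • The prediction horizon extends from the regular trading session into the closing auction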
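To make the price discovery step more concrete, the following is a minimal sketch of how an auction could choose a single clearing price: it picks the candidate price that maximizes matched volume. The toy order book, the function name `uncross`, and the tie-breaking rule (smallest remaining imbalance) are illustrative assumptions, not the exchange's actual rulebook.

```python
def uncross(bids, asks):
    """Pick the price that maximizes matched volume in a toy auction.

    bids / asks are lists of (limit_price, size). A bid trades at any
    price <= its limit, an ask at any price >= its limit.
    """
    candidates = sorted({p for p, _ in bids} | {p for p, _ in asks})
    best = None
    for price in candidates:
        demand = sum(size for p, size in bids if p >= price)
        supply = sum(size for p, size in asks if p <= price)
        matched = min(demand, supply)
        # Prefer more matched volume, then a smaller leftover imbalance
        key = (-matched, abs(demand - supply))
        if best is None or key < best[0]:
            best = (key, price, matched, demand - supply)
    _, price, matched, signed_imbalance = best
    return price, matched, signed_imbalance


bids = [(10.02, 300), (10.01, 200), (10.00, 500)]
asks = [(9.99, 400), (10.01, 300), (10.03, 600)]
price, matched, imbalance = uncross(bids, asks)
print(price, matched, imbalance)
```

In this toy book the matched volume and the signed leftover play roles loosely analogous to the `matched_size` and `imbalance_size` columns used later in the feature code.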

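2. Project Framework

Closing price prediction system

  1. Data preprocessing

    • Data loading and cleaning
    • Memory optimization
  2. Feature engineering

    • Market microstructure features
    • Time-series features
    • Statistical features
  3. Model training

    • LightGBM training
    • Model ensembling
  4. Prediction deployment

    • Real-time feature generation
    • Ensemble prediction
    • Cache management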

3. Detailed Code Walkthrough

3.1 Data Preprocessing Module

  1. Data loading and preprocessing (Data Loading)
import gc, os, time, warnings
from itertools import combinations
from warnings import simplefilter
import joblib
import lightgbm as lgb
import numpy as np
import pandas as pd
from numba import njit, prange  # used by the triplet imbalance kernel
from sklearn.metrics import mean_absolute_error
from sklearn.model_selection import KFold, TimeSeriesSplit
import polars as pl
import pickle

# Basic configuration
warnings.filterwarnings("ignore")
simplefilter(action="ignore", category=pd.errors.PerformanceWarning)

# Run mode configuration
is_offline = False 
LGB = True
NN = False
is_train = True  
is_infer = True 
max_lookback = np.nan
  2. Read the training data and apply basic cleaning
df = pd.read_csv("/kaggle/input/optiver-trading-at-the-close/train.csv")
df = df.dropna(subset=["target"])
df.reset_index(drop=True, inplace=True)
df_shape = df.shape
# Previous day's target for the same stock at the same second of the auction
df['target_shift1'] = df.groupby(['stock_id','seconds_in_bucket'])['target'].shift(1)
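The grouped shift above can be hard to picture, so here is a small sketch on a hypothetical toy frame (the values are made up). Because rows are grouped by `stock_id` and `seconds_in_bucket`, `shift(1)` pulls the target from the previous `date_id` at the same time of day, and the first day of each group has no predecessor.

```python
import pandas as pd

# Toy data: two days for stock 0 (two time buckets) and stock 1 (one bucket)
toy = pd.DataFrame({
    "stock_id":          [0, 0, 0, 0, 1, 1],
    "date_id":           [0, 0, 1, 1, 0, 1],
    "seconds_in_bucket": [0, 10, 0, 10, 0, 0],
    "target":            [1.0, 2.0, 3.0, 4.0, 5.0, 6.0],
})

# Same expression as in the article, applied to the toy frame
toy["target_shift1"] = (
    toy.groupby(["stock_id", "seconds_in_bucket"])["target"].shift(1)
)
print(toy)
```

Note that this relies on the rows already being ordered by date within each group, which is assumed here for the training file.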
  3. Memory optimization and dtype conversion
def reduce_mem_usage(df, verbose=0):
    """Downcast numeric columns to shrink the DataFrame's memory footprint."""
    start_mem = df.memory_usage().sum() / 1024**2
    for col in df.columns:
        col_type = df[col].dtype
        if col_type != object:
            c_min = df[col].min()
            c_max = df[col].max()

            if str(col_type)[:3] == "int":
                # Use the narrowest signed integer type that holds the column's range
                if c_min > np.iinfo(np.int8).min and c_max < np.iinfo(np.int8).max:
                    df[col] = df[col].astype(np.int8)
                elif c_min > np.iinfo(np.int16).min and c_max < np.iinfo(np.int16).max:
                    df[col] = df[col].astype(np.int16)
                elif c_min > np.iinfo(np.int32).min and c_max < np.iinfo(np.int32).max:
                    df[col] = df[col].astype(np.int32)
            else:
                # Every non-integer column is stored as float32; float16 is never used
                df[col] = df[col].astype(np.float32)
    if verbose:
        end_mem = df.memory_usage().sum() / 1024**2
        print(f"Memory usage: {start_mem:.2f} MB -> {end_mem:.2f} MB")
    return df
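One plausible reason the function never downcasts to float16, even when the value range would allow it, is precision: prices in this kind of data sit close to 1.0, where float16 can only resolve steps of roughly 1e-3. The sketch below uses made-up prices to illustrate the effect; it is an illustration, not a statement of the author's stated motivation.

```python
import numpy as np

# Hypothetical normalized prices that differ by a few basis points
prices = np.array([1.000123, 0.999877, 1.000456])

as_f16 = prices.astype(np.float16)
as_f32 = prices.astype(np.float32)

# Count how many distinct values survive each cast
distinct_f16 = np.unique(as_f16).size
distinct_f32 = np.unique(as_f32).size

# Worst-case rounding error introduced by each cast
err_f16 = np.abs(as_f16.astype(np.float64) - prices).max()
err_f32 = np.abs(as_f32.astype(np.float64) - prices).max()

print(distinct_f16, distinct_f32, err_f16, err_f32)
```

With float16 all three prices collapse to the same value, while float32 keeps them apart at half the memory of float64.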

3.2 Feature Engineering Module

  1. Stock weights and global feature definitions
# Per-stock weights
weights = [
    0.004, 0.001, 0.002, 0.006, 0.004, 0.004, 0.002, 0.006, 0.006, 0.002, 0.002, 0.008,
    0.006, 0.002, 0.008, 0.006, 0.002, 0.006, 0.004, 0.002, 0.004, 0.001, 0.006, 0.004,
    # ... (middle part omitted for brevity)
    0.002, 0.008, 0.002, 0.004, 0.001, 0.004, 0.006, 0.004,
]
weights = {int(k):v for k,v in enumerate(weights)}

# Global per-stock features, computed once on the training data
# (df is the training frame loaded in section 3.1)
global_stock_id_feats = {
    "median_size": df.groupby("stock_id")["bid_size"].median() + df.groupby("stock_id")["ask_size"].median(),
    "std_size": df.groupby("stock_id")["bid_size"].std() + df.groupby("stock_id")["ask_size"].std(),
    "ptp_size": df.groupby("stock_id")["bid_size"].max() - df.groupby("stock_id")["bid_size"].min(),
    "median_price": df.groupby("stock_id")["bid_price"].median() + df.groupby("stock_id")["ask_price"].median(),
    "std_price": df.groupby("stock_id")["bid_price"].std() + df.groupby("stock_id")["ask_price"].std(),
    "ptp_price": df.groupby("stock_id")["bid_price"].max() - df.groupby("stock_id")["ask_price"].min(),
}
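As a small illustration of how one of these global features is built and later attached to each row, the sketch below computes `median_size` on a hypothetical two-stock frame and maps it back by `stock_id`. The data values are invented for the example.

```python
import pandas as pd

# Toy order book sizes for two stocks
toy = pd.DataFrame({
    "stock_id": [0, 0, 0, 1, 1, 1],
    "bid_size": [100.0, 300.0, 200.0, 10.0, 30.0, 20.0],
    "ask_size": [150.0, 250.0, 50.0, 40.0, 20.0, 60.0],
})

# One value per stock: median bid size plus median ask size
median_size = (
    toy.groupby("stock_id")["bid_size"].median()
    + toy.groupby("stock_id")["ask_size"].median()
)

# Broadcast the per-stock value back onto every row of that stock
toy["global_median_size"] = toy["stock_id"].map(median_size.to_dict())
print(toy)
```

Because these statistics are computed once from the training set, the same dictionary can be reused unchanged at inference time.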
  2. Building market microstructure features
def imbalance_features(df):
    prices = ["reference_price", "far_price", "near_price", "ask_price", "bid_price", "wap"]
    sizes = ["matched_size", "bid_size", "ask_size", "imbalance_size"]
    
    # Basic features
    df["volume"] = df.eval("ask_size + bid_size")
    df["mid_price"] = df.eval("(ask_price + bid_price) / 2")
    df["liquidity_imbalance"] = df.eval("(bid_size-ask_size)/(bid_size+ask_size)")
    df["matched_imbalance"] = df.eval("(imbalance_size-matched_size)/(matched_size+imbalance_size)")
    df["size_imbalance"] = df.eval("bid_size / ask_size")

    # Pairwise price imbalance features
    for c in combinations(prices, 2):
        df[f"{c[0]}_{c[1]}_imb"] = df.eval(f"({c[0]} - {c[1]})/({c[0]} + {c[1]})")

    # Triplet imbalance features (over prices and over sizes)
    for c in [['ask_price', 'bid_price', 'wap', 'reference_price'], sizes]:
        triplet_feature = calculate_triplet_imbalance_numba(c, df)
        df[triplet_feature.columns] = triplet_feature.values

    # Weighted features: the weighted-average WAP per time_id acts as a market index
    df["stock_weights"] = df["stock_id"].map(weights)
    df["weighted_wap"] = df["stock_weights"] * df["wap"]
    ss = df.groupby('time_id')['weighted_wap'].sum()/df.groupby('time_id')['stock_weights'].sum()
    ss = ss.reset_index()
    ss.columns = ['time_id','wapindex']
    df = pd.merge(df,ss,how='left',on='time_id')
    df['wapdiff'] = df['wap'] - df['wapindex']

    # Momentum features
    df['wap_momentum'] = df.groupby('stock_id')['weighted_wap'].pct_change(periods=6)
    df["imbalance_momentum"] = df.groupby(['stock_id'])['imbalance_size'].diff(periods=1) / df['matched_size']
    
    # Spread and price pressure features
    df["price_spread"] = df["ask_price"] - df["bid_price"]  # bid-ask spread, used by market_urgency and the diffs below
    df['price_pressure'] = df['imbalance_size'] * (df['ask_price'] - df['bid_price'])
    df['market_urgency'] = df['price_spread'] * df['liquidity_imbalance']
    df['depth_pressure'] = (df['ask_size'] - df['bid_size']) * (df['far_price'] - df['near_price'])

    # Row-wise statistics across the price columns and the size columns
    for func in ["mean", "std", "skew", "kurt"]:
        df[f"all_prices_{func}"] = df[prices].agg(func, axis=1)
        df[f"all_sizes_{func}"] = df[sizes].agg(func, axis=1)
        
    # Lagged differences per stock
    for col in ['ask_price', 'bid_price', 'ask_size', 'bid_size', 'weighted_wap','price_spread']:
        for window in [1,3,5,10]:
            df[f"{col}_diff_{window}"] = df.groupby("stock_id")[col].diff(window)

    return df
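To show what the imbalance formulas produce, here is a minimal sketch on a hypothetical two-row frame. It computes the liquidity imbalance, the pairwise price imbalances, and a plain NumPy version of the triplet imbalance `(max - mid) / (mid - min)`. The helper `triplet_imbalance` is an illustrative stand-in for the Numba kernel in the full listing, not the same function.

```python
from itertools import combinations

import numpy as np
import pandas as pd

toy = pd.DataFrame({
    "bid_price": [0.9990, 1.0000],
    "ask_price": [1.0010, 1.0004],
    "wap":       [1.0005, 1.0001],
    "bid_size":  [300.0, 100.0],
    "ask_size":  [100.0, 300.0],
})

# Positive when the bid side is heavier, negative when the ask side is heavier
toy["liquidity_imbalance"] = (toy["bid_size"] - toy["ask_size"]) / (
    toy["bid_size"] + toy["ask_size"]
)

# Pairwise normalized differences between price columns
for a, b in combinations(["ask_price", "bid_price", "wap"], 2):
    toy[f"{a}_{b}_imb"] = (toy[a] - toy[b]) / (toy[a] + toy[b])


def triplet_imbalance(values):
    """Row-wise (max - mid) / (mid - min); NaN where mid equals min."""
    s = np.sort(values, axis=1)
    lo, mid, hi = s[:, 0], s[:, 1], s[:, 2]
    denom = mid - lo
    out = np.full(len(values), np.nan)
    ok = denom != 0
    out[ok] = (hi[ok] - mid[ok]) / denom[ok]
    return out


toy["ask_price_bid_price_wap_imb2"] = triplet_imbalance(
    toy[["ask_price", "bid_price", "wap"]].to_numpy()
)
print(toy.T)
```

A triplet value below 1 means the middle quantity sits closer to the maximum, and a value above 1 means it sits closer to the minimum.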
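  3. Feature generation entry point
def generate_all_features(df):
    # Generate market microstructure features
    df = imbalance_features(df)
    gc.collect() 
    
    # Generate the remaining (time and global) features
    df = other_features(df)
    gc.collect()  
    
    # Drop identifier columns and the label; everything else is a model input
    feature_name = [i for i in df.columns if i not in ["row_id", "target", "time_id", "date_id"]]
    return df[feature_name]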
  4. The other_features function
def other_features(df):
    df["dow"] = df["date_id"] % 5  # Day of the week
    df["seconds"] = df["seconds_in_bucket"] % 60  
    df["minute"] = df["seconds_in_bucket"] // 60  
    df['time_to_market_close'] = 540 - df['seconds_in_bucket']
    
    for key, value in global_stock_id_feats.items():
        df[f"global_{key}"] = df["stock_id"].map(value.to_dict())
    return df
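The time-derived columns are simple arithmetic on `date_id` and `seconds_in_bucket`. The sketch below applies the same expressions to a few hypothetical rows. One caveat worth stating as an assumption: `date_id % 5` only lines up with the real day of the week if `date_id` counts trading days with no holiday gaps.

```python
import pandas as pd

toy = pd.DataFrame({
    "date_id": [0, 4, 5, 7],
    "seconds_in_bucket": [0, 70, 300, 540],
})

toy["dow"] = toy["date_id"] % 5                       # position within a 5-day cycle
toy["seconds"] = toy["seconds_in_bucket"] % 60        # seconds within the current minute
toy["minute"] = toy["seconds_in_bucket"] // 60        # minutes since the auction window opened
toy["time_to_market_close"] = 540 - toy["seconds_in_bucket"]
print(toy)
```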

3.3 Model Training Module

  1. LightGBM configuration and training
lgb_params = {
    "objective": "mae",
    "n_estimators": 7000,
    "num_leaves": 256,
    "subsample": 0.6,
    "colsample_bytree": 0.8,
    "learning_rate": 0.01,
    "max_depth": 11,
    "n_jobs": 4,
    "device": "gpu",
    "verbosity": -1,
    "importance_type": "gain",
    "reg_alpha": 0.2,
    "reg_lambda": 3.25
}

# Training loop
models = []
models_cbt = []
scores = []

model_save_path = 'modelitos_para_despues' 
if not os.path.exists(model_save_path):
    os.makedirs(model_save_path)

for i in range(num_folds):
    # df_fold_train / df_fold_valid and their targets come from the date-based
    # split shown in the full listing (section 4)

    # Train the model
    lgb_model = lgb.LGBMRegressor(**lgb_params)
    lgb_model.fit(
        df_fold_train[feature_columns],
        df_fold_train_target,
        eval_set=[(df_fold_valid[feature_columns], df_fold_valid_target)],
        callbacks=[
            lgb.callback.early_stopping(stopping_rounds=100),
            lgb.callback.log_evaluation(period=100),
        ],
    )
    
    # Save the model
    models.append(lgb_model)
    model_filename = os.path.join(model_save_path, f'doblez_{i+1}.txt')
    lgb_model.booster_.save_model(model_filename)
    
    # Evaluate on the validation fold
    fold_predictions = lgb_model.predict(df_fold_valid[feature_columns])
    fold_score = mean_absolute_error(df_fold_valid_target, fold_predictions)
    scores.append(fold_score)
    print(f"LGB Fold {i+1} MAE: {fold_score}")
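The fold frames used in this loop are built from boolean masks over `date_id` in the full listing (section 4). The sketch below reproduces that mask logic on a plain array of dates so the resulting ranges can be inspected. It assumes `date_id` runs from 0 to 480 with `num_folds = 5`, `fold_size = 96`, and `gap = 5`, as in the listing. `past_only_masks` is a hypothetical alternative added purely for comparison, not part of the original code.

```python
import numpy as np


def fold_masks(date_ids, i, num_folds=5, fold_size=96, gap=5):
    """Train / validation masks for fold i, following the listing's logic."""
    start = i * fold_size
    end = start + fold_size
    if i < num_folds - 1:
        purged_start = end - 2
        purged_end = end + gap + 2
        # `&` binds tighter than `|`, so this is (A & B) | C
        train = ((date_ids >= start) & (date_ids < purged_start)) | (date_ids > purged_end)
    else:
        train = (date_ids >= start) & (date_ids < end)
    valid = (date_ids >= end) & (date_ids < end + fold_size)
    return train, valid


def past_only_masks(date_ids, i, fold_size=96, gap=5):
    """Hypothetical variant: train only on dates before the validation block."""
    valid_start = (i + 1) * fold_size
    train = date_ids < valid_start - gap
    valid = (date_ids >= valid_start) & (date_ids < valid_start + fold_size)
    return train, valid


date_ids = np.arange(481)  # one entry per day, assuming date_id spans 0..480

train, valid = fold_masks(date_ids, 0)
overlap_days = int((train & valid).sum())      # days present in both masks

_, valid_last = fold_masks(date_ids, 4)
valid_last_days = int(valid_last.sum())        # size of the last validation block

train_alt, valid_alt = past_only_masks(date_ids, 0)
overlap_alt = int((train_alt & valid_alt).sum())

print(overlap_days, valid_last_days, overlap_alt)
```

Under these assumptions, the first fold's training mask also covers most of its own validation window, and the last fold validates on a single day, so the reported fold scores should be read with that in mind.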

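  2. Final model training
```python
# Average the best iteration count across the fold models
average_best_iteration = int(np.mean([model.best_iteration_ for model in models]))

# Assumption: the final model reuses lgb_params with n_estimators set to the
# averaged best iteration (final_model_params is not defined elsewhere in this article)
final_model_params = lgb_params.copy()
final_model_params["n_estimators"] = average_best_iteration

# Train the final model on the full dataset
num_model = 1
for i in range(num_model):
    final_model = lgb.LGBMRegressor(**final_model_params)
    final_model.fit(
        df_train_feats[feature_columns],
        df['target'],
        callbacks=[
            lgb.callback.log_evaluation(period=100),
        ],
    )
    models.append(final_model)
    model_filename = os.path.join(model_save_path, 'doblez.txt')
    final_model.booster_.save_model(model_filename)
```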

3.4 Prediction System Module

  1. Ensemble weight calculation
def weighted_average(a):
    # Equal weights: every model in the list contributes 1 / n
    n = len(a)
    return [1 / n] * n
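With equal weights, the ensemble prediction is simply the mean of the individual model outputs. The sketch below blends two hypothetical prediction vectors in the same way the prediction loop does. The numbers are invented.

```python
import numpy as np


def weighted_average(a):
    # Equal weights, matching the behavior of the function in the article
    n = len(a)
    return [1 / n] * n


# Stand-ins for model.predict(...) outputs from two fold models
fold_preds = [np.array([1.0, -2.0, 4.0]), np.array([3.0, 0.0, 2.0])]

blend_weights = weighted_average(fold_preds)
blend = np.zeros(3)
for pred, weight in zip(fold_preds, blend_weights):
    blend += weight * pred

print(blend_weights, blend)
```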
  2. Prediction loop
# Initialize the prediction environment
import optiver2023
env = optiver2023.make_env()
iter_test = env.iter_test()

counter = 0
y_min, y_max = -64, 64
qps, predictions = [], []
cache = pd.DataFrame()
cachey = pd.DataFrame()
stocklist = list(df.stock_id.unique())

lgb_model_weights = weighted_average(models)

for (test, revealed_targets, sample_prediction) in iter_test:
    now_time = time.time()
    columns_given = ['seconds_in_bucket', 'imbalance_size',
                    'imbalance_buy_sell_flag', 'reference_price', 'matched_size',
                    'far_price', 'near_price', 'bid_price', 'bid_size',
                    'ask_price', 'ask_size', 'wap']
    test[columns_given] = test[columns_given].astype(float)
    test['time_id'] = test['date_id'].astype(str) +'_'+test['seconds_in_bucket'].astype(str)

    # cache and cachey are refreshed at this point (see the cache management step)

    if not test.currently_scored.iloc[0]:
        # Batches that are not scored get a zero prediction and skip inference
        sample_prediction['target'] = 0
    else:
        try:
            feat = generate_all_features(cache)[-len(test):]
            lgb_predictions = np.zeros(len(test))
            for model, weight in zip(models, lgb_model_weights):
                lgb_predictions += weight * model.predict(feat[feature_columns])

            # Center the predictions, then clip them to the allowed range
            final_predictions = lgb_predictions - np.mean(lgb_predictions)
            clipped_predictions = np.clip(final_predictions, y_min, y_max)
            sample_prediction['target'] = clipped_predictions
        except Exception:
            # Fall back to a neutral prediction if feature generation or inference fails
            sample_prediction['target'] = 0

    env.predict(sample_prediction)
    counter += 1
    qps.append(time.time() - now_time)
    if counter % 10 == 0:
        print(counter, 'qps:', np.mean(qps))
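The post-processing in the loop, mean subtraction followed by clipping, can be checked in isolation. The sketch below wraps it in a hypothetical helper `postprocess` and runs it on made-up predictions. Centering is presumably motivated by the target being measured relative to a market-wide index, which is an interpretation rather than something the code states.

```python
import numpy as np


def postprocess(preds, y_min=-64, y_max=64):
    """Center predictions across stocks, then clip to [y_min, y_max]."""
    centered = preds - np.mean(preds)
    return np.clip(centered, y_min, y_max)


raw = np.array([3.0, -1.0, 100.0, 2.0])
out = postprocess(raw)
print(out, out.mean())
```

Note that clipping happens after centering, so once an outlier is clipped the output no longer averages exactly to zero.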

  3. Cache management
```python
# Cache of revealed historical targets
cachey = pd.concat([cachey, revealed_targets], ignore_index=True, axis=0)
cachey = cachey[cachey.stock_id.isin(stocklist)].sort_values(by=['date_id', 'seconds_in_bucket', 'stock_id'])
cachey['revealed_target'] = cachey.revealed_target.astype(float)
# shift(0) leaves the values unchanged; the column is named to match the training feature
cachey['target_shift1'] = cachey.groupby(['stock_id','seconds_in_bucket'])['revealed_target'].shift(0)

# Attach the lagged target to the current batch
test = pd.merge(test,cachey[['date_id', 'stock_id', 'seconds_in_bucket','target_shift1']],\
                how='left',on = ['date_id', 'stock_id','seconds_in_bucket'])

# Append the batch, then keep only the 21 most recent rows per stock
cache = pd.concat([cache, test], ignore_index=True, axis=0)
if counter > 0:
    cache = cache.groupby(['stock_id']).tail(21).sort_values(by=['date_id', 'seconds_in_bucket', 'stock_id']).reset_index(drop=True)
```
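The rolling cache can be exercised on synthetic batches to confirm that it stays bounded. The sketch below wraps the same `groupby(...).tail(21)` idea in a hypothetical helper `update_cache` and feeds it 30 fake time steps for two stocks. Since the longest lookback in the feature code is `diff(10)`, 21 rows per stock leaves some margin.

```python
import pandas as pd


def update_cache(cache, batch, keep=21):
    """Append a batch and keep only the `keep` most recent rows per stock."""
    merged = batch if cache.empty else pd.concat([cache, batch], ignore_index=True)
    return (
        merged.groupby("stock_id").tail(keep)
        .sort_values(["date_id", "seconds_in_bucket", "stock_id"])
        .reset_index(drop=True)
    )


cache = pd.DataFrame()
for step in range(30):
    # Synthetic batch: one row per stock for this time step
    batch = pd.DataFrame({
        "stock_id": [0, 1],
        "date_id": [0, 0],
        "seconds_in_bucket": [step * 10, step * 10],
        "wap": [1.0 + step * 1e-4, 1.0 - step * 1e-4],
    })
    cache = update_cache(cache, batch)

print(len(cache), cache["seconds_in_bucket"].min(), cache["seconds_in_bucket"].max())
```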

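3.5 Performance Monitoring

  1. Execution time statistics
if counter % 10 == 0:
    print(counter, 'qps:', np.mean(qps))

# Scale the mean per-iteration latency (seconds) to an estimated total in hours
time_cost = 1.146 * np.mean(qps)
print(f"The code will take approximately {np.round(time_cost, 4)} hours to run inference")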
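The constant 1.146 is not explained in the code. One reading that reproduces it is "number of API iterations divided by 3600": with 55 time steps per day (`seconds_in_bucket` from 0 to 540 in steps of 10) and an assumed 75 days of test data, there are 4125 iterations, and 4125 / 3600 ≈ 1.146. The 75-day figure is an assumption chosen to match the constant, and the latencies below are invented.

```python
import numpy as np


def estimate_hours(latencies_sec, n_iterations):
    """Total runtime in hours given the mean per-iteration latency in seconds."""
    return n_iterations * float(np.mean(latencies_sec)) / 3600.0


steps_per_day = len(range(0, 541, 10))   # 0, 10, ..., 540
assumed_days = 75                        # assumption, not stated in the article
n_iterations = steps_per_day * assumed_days
hours_per_second = n_iterations / 3600.0

sample_latencies = [0.21, 0.19, 0.20]    # hypothetical per-iteration timings
hours = estimate_hours(sample_latencies, n_iterations)
print(steps_per_day, round(hours_per_second, 3), round(hours, 4))
```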

4. Full Code Listing

# Import required libraries
import gc, os, time, warnings
from itertools import combinations
from warnings import simplefilter
import joblib
import lightgbm as lgb
import numpy as np
import pandas as pd
from numba import njit, prange  # required by compute_triplet_imbalance
from sklearn.metrics import mean_absolute_error
from sklearn.model_selection import KFold, TimeSeriesSplit
import polars as pl
import pickle

# Basic configuration
warnings.filterwarnings("ignore")
simplefilter(action="ignore", category=pd.errors.PerformanceWarning)

# Run mode configuration
is_offline = False 
LGB = True
NN = False
is_train = True  
is_infer = True 
max_lookback = np.nan

# Memory optimization function
def reduce_mem_usage(df, verbose=0):
    start_mem = df.memory_usage().sum() / 1024**2
    for col in df.columns:
        col_type = df[col].dtype
        if col_type != object:
            c_min = df[col].min()
            c_max = df[col].max()
            
            if str(col_type)[:3] == "int":
                if c_min > np.iinfo(np.int8).min and c_max < np.iinfo(np.int8).max:
                    df[col] = df[col].astype(np.int8)
                elif c_min > np.iinfo(np.int16).min and c_max < np.iinfo(np.int16).max:
                    df[col] = df[col].astype(np.int16)
                elif c_min > np.iinfo(np.int32).min and c_max < np.iinfo(np.int32).max:
                    df[col] = df[col].astype(np.int32)
                elif c_min > np.iinfo(np.int64).min and c_max < np.iinfo(np.int64).max:
                    df[col] = df[col].astype(np.int64)
            else:
                if c_min > np.finfo(np.float16).min and c_max < np.finfo(np.float16).max:
                    df[col] = df[col].astype(np.float32)
                elif c_min > np.finfo(np.float32).min and c_max < np.finfo(np.float32).max:
                    df[col] = df[col].astype(np.float32)
                else:
                    df[col] = df[col].astype(np.float32)
    return df

# Triplet imbalance computation
@njit(parallel=True)
def compute_triplet_imbalance(df_values, comb_indices):
    num_rows = df_values.shape[0]
    num_combinations = len(comb_indices)
    imbalance_features = np.empty((num_rows, num_combinations))
    for i in prange(num_combinations):
        a, b, c = comb_indices[i]
        for j in range(num_rows):
            max_val = max(df_values[j, a], df_values[j, b], df_values[j, c])
            min_val = min(df_values[j, a], df_values[j, b], df_values[j, c])
            mid_val = df_values[j, a] + df_values[j, b] + df_values[j, c] - min_val - max_val
            
            if mid_val == min_val:
                imbalance_features[j, i] = np.nan
            else:
                imbalance_features[j, i] = (max_val - mid_val) / (mid_val - min_val)
    return imbalance_features

def calculate_triplet_imbalance_numba(price, df):
    df_values = df[price].values
    comb_indices = [(price.index(a), price.index(b), price.index(c)) for a, b, c in combinations(price, 3)]
    features_array = compute_triplet_imbalance(df_values, comb_indices)
    columns = [f"{a}_{b}_{c}_imb2" for a, b, c in combinations(price, 3)]
    features = pd.DataFrame(features_array, columns=columns)
    return features

# Feature engineering functions
# `weights` is the per-stock weight dict defined in section 3.2
def imbalance_features(df):
    prices = ["reference_price", "far_price", "near_price", "ask_price", "bid_price", "wap"]
    sizes = ["matched_size", "bid_size", "ask_size", "imbalance_size"]
    
    # Basic features
    df["volume"] = df.eval("ask_size + bid_size")
    df["mid_price"] = df.eval("(ask_price + bid_price) / 2")
    df["liquidity_imbalance"] = df.eval("(bid_size-ask_size)/(bid_size+ask_size)")
    df["matched_imbalance"] = df.eval("(imbalance_size-matched_size)/(matched_size+imbalance_size)")
    df["size_imbalance"] = df.eval("bid_size / ask_size")

    # Pairwise price imbalance features
    for c in combinations(prices, 2):
        df[f"{c[0]}_{c[1]}_imb"] = df.eval(f"({c[0]} - {c[1]})/({c[0]} + {c[1]})")

    # Triplet imbalance features (over prices and over sizes)
    for c in [['ask_price', 'bid_price', 'wap', 'reference_price'], sizes]:
        triplet_feature = calculate_triplet_imbalance_numba(c, df)
        df[triplet_feature.columns] = triplet_feature.values

    # Weighted features: the weighted-average WAP per time_id acts as a market index
    df["stock_weights"] = df["stock_id"].map(weights)
    df["weighted_wap"] = df["stock_weights"] * df["wap"]
    ss = df.groupby('time_id')['weighted_wap'].sum()/df.groupby('time_id')['stock_weights'].sum()
    ss = ss.reset_index()
    ss.columns = ['time_id','wapindex']
    df = pd.merge(df,ss,how='left',on='time_id')
    df['wapdiff'] = df['wap'] - df['wapindex']

    # Momentum features
    df['wap_momentum'] = df.groupby('stock_id')['weighted_wap'].pct_change(periods=6)
    df["imbalance_momentum"] = df.groupby(['stock_id'])['imbalance_size'].diff(periods=1) / df['matched_size']
    
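    # Spread and price pressure features
    df["price_spread"] = df["ask_price"] - df["bid_price"]  # bid-ask spread, used by market_urgency and the diffs below
    df['price_pressure'] = df['imbalance_size'] * (df['ask_price'] - df['bid_price'])
    df['market_urgency'] = df['price_spread'] * df['liquidity_imbalance']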
    df['depth_pressure'] = (df['ask_size'] - df['bid_size']) * (df['far_price'] - df['near_price'])

    # Row-wise statistics across the price columns and the size columns
    for func in ["mean", "std", "skew", "kurt"]:
        df[f"all_prices_{func}"] = df[prices].agg(func, axis=1)
        df[f"all_sizes_{func}"] = df[sizes].agg(func, axis=1)
        
    # Lagged differences per stock
    for col in ['ask_price', 'bid_price', 'ask_size', 'bid_size', 'weighted_wap','price_spread']:
        for window in [1,3,5,10]:
            df[f"{col}_diff_{window}"] = df.groupby("stock_id")[col].diff(window)

    return df

def other_features(df):
    df["dow"] = df["date_id"] % 5  # Day of the week
    df["seconds"] = df["seconds_in_bucket"] % 60  
    df["minute"] = df["seconds_in_bucket"] // 60  
    df['time_to_market_close'] = 540 - df['seconds_in_bucket']
    
    for key, value in global_stock_id_feats.items():
        df[f"global_{key}"] = df["stock_id"].map(value.to_dict())
    return df

def generate_all_features(df):
    # Generate market microstructure features
    df = imbalance_features(df)
    gc.collect() 
    
    # Generate the remaining (time and global) features
    df = other_features(df)
    gc.collect()  
    
    feature_name = [i for i in df.columns if i not in ["row_id", "target", "time_id", "date_id"]]
    return df[feature_name]

# Model training configuration
lgb_params = {
    "objective": "mae",
    "n_estimators": 7000,
    "num_leaves": 256,
    "subsample": 0.6,
    "colsample_bytree": 0.8,
    "learning_rate": 0.01,
    "max_depth": 11,
    "n_jobs": 4,
    "device": "gpu",
    "verbosity": -1,
    "importance_type": "gain",
    "reg_alpha": 0.2,
    "reg_lambda": 3.25
}

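# Ensemble weights (equal weighting, as in section 3.4)
def weighted_average(a):
    n = len(a)
    return [1 / n] * n

# Main program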
if __name__ == "__main__":
    # 1. Data loading and preprocessing
    df = pd.read_csv("/kaggle/input/optiver-trading-at-the-close/train.csv")
    df = df.dropna(subset=["target"])
    df.reset_index(drop=True, inplace=True)
    df['target_shift1'] = df.groupby(['stock_id','seconds_in_bucket'])['target'].shift(1)
    
    # 2. Global per-stock features
    global_stock_id_feats = {
        "median_size": df.groupby("stock_id")["bid_size"].median() + df.groupby("stock_id")["ask_size"].median(),
        "std_size": df.groupby("stock_id")["bid_size"].std() + df.groupby("stock_id")["ask_size"].std(),
        "ptp_size": df.groupby("stock_id")["bid_size"].max() - df.groupby("stock_id")["bid_size"].min(),
        "median_price": df.groupby("stock_id")["bid_price"].median() + df.groupby("stock_id")["ask_price"].median(),
        "std_price": df.groupby("stock_id")["bid_price"].std() + df.groupby("stock_id")["ask_price"].std(),
        "ptp_price": df.groupby("stock_id")["bid_price"].max() - df.groupby("stock_id")["ask_price"].min(),
    }
    
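    # 3. Feature generation
    df_train_feats = generate_all_features(df)
    df_train_feats = reduce_mem_usage(df_train_feats)
    # Assumption: every generated column is used as a model input
    feature_columns = list(df_train_feats.columns)
    
    # 4. Data split and model training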
    num_folds = 5
    fold_size = 480 // num_folds
    gap = 5
    date_ids = df['date_id'].values
    
    models = []
    models_cbt = []
    scores = []
    
    model_save_path = 'modelitos_para_despues' 
    if not os.path.exists(model_save_path):
        os.makedirs(model_save_path)

    for i in range(num_folds):
        start = i * fold_size
        end = start + fold_size
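        if i < num_folds - 1:
            purged_start = end - 2
            purged_end = end + gap + 2
            # Evaluates as (A & B) | C: besides [start, purged_start), every date after
            # purged_end is also used for training, including dates in the validation window
            train_indices = ((date_ids >= start) & (date_ids < purged_start)) | (date_ids > purged_end)
        else:
            train_indices = (date_ids >= start) & (date_ids < end)
        test_indices = (date_ids >= end) & (date_ids < end + fold_size)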
        
        df_fold_train = df_train_feats[train_indices]
        df_fold_train_target = df['target'][train_indices]
        df_fold_valid = df_train_feats[test_indices]
        df_fold_valid_target = df['target'][test_indices]
        
        lgb_model = lgb.LGBMRegressor(**lgb_params)
        lgb_model.fit(
            df_fold_train[feature_columns],
            df_fold_train_target,
            eval_set=[(df_fold_valid[feature_columns], df_fold_valid_target)],
            callbacks=[
                lgb.callback.early_stopping(stopping_rounds=100),
                lgb.callback.log_evaluation(period=100),
            ],
        )
        
        models.append(lgb_model)
        model_filename = os.path.join(model_save_path, f'doblez_{i+1}.txt')
        lgb_model.booster_.save_model(model_filename)
        
        fold_predictions = lgb_model.predict(df_fold_valid[feature_columns])
        fold_score = mean_absolute_error(df_fold_valid_target, fold_predictions)
        scores.append(fold_score)
        print(f"LGB Fold {i+1} MAE: {fold_score}")
    
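    # 5. Prediction system
    import optiver2023  # Kaggle competition API, available inside the competition environment
    env = optiver2023.make_env()
    iter_test = env.iter_test()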
    
    counter = 0
    y_min, y_max = -64, 64
    qps, predictions = [], []
    cache = pd.DataFrame()
    cachey = pd.DataFrame()
    stocklist = list(df.stock_id.unique())
    
    lgb_model_weights = weighted_average(models)
    
    for (test, revealed_targets, sample_prediction) in iter_test:
        now_time = time.time()
        columns_given = ['seconds_in_bucket', 'imbalance_size',
                        'imbalance_buy_sell_flag', 'reference_price', 'matched_size',
                        'far_price', 'near_price', 'bid_price', 'bid_size',
                        'ask_price', 'ask_size', 'wap']
        test[columns_given] = test[columns_given].astype(float)
        test['time_id'] = test['date_id'].astype(str) +'_'+test['seconds_in_bucket'].astype(str)

        cachey = pd.concat([cachey, revealed_targets], ignore_index=True, axis=0)
        cachey = cachey[cachey.stock_id.isin(stocklist)].sort_values(by=['date_id', 'seconds_in_bucket', 'stock_id'])
        cachey['revealed_target'] = cachey.revealed_target.astype(float)
        cachey['target_shift1'] = cachey.groupby(['stock_id','seconds_in_bucket'])['revealed_target'].shift(0)
        
        test = pd.merge(test,cachey[['date_id', 'stock_id', 'seconds_in_bucket','target_shift1']],\
                        how='left',on = ['date_id', 'stock_id','seconds_in_bucket'])
        
        cache = pd.concat([cache, test], ignore_index=True, axis=0)
        if counter > 0:
            cache = cache.groupby(['stock_id']).tail(21).sort_values(by=['date_id', 'seconds_in_bucket', 'stock_id']).reset_index(drop=True)

        try:
            feat = generate_all_features(cache)[-len(test):]
            lgb_predictions = np.zeros(len(test))
            for model, weight in zip(models, lgb_model_weights):
                lgb_predictions += weight * model.predict(feat[feature_columns])
                
            final_predictions = lgb_predictions - np.mean(lgb_predictions)
            clipped_predictions = np.clip(final_predictions, y_min, y_max)
            sample_prediction['target'] = clipped_predictions
        except Exception:
            # Fall back to a neutral prediction if feature generation or inference fails
            sample_prediction['target'] = 0

        env.predict(sample_prediction)
        counter += 1
        qps.append(time.time() - now_time)
        if counter % 10 == 0:
            print(counter, 'qps:', np.mean(qps))

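5. Project Summary

5.1 Technical Highlights

  1. Depth and breadth of feature engineering

    • Builds a rich set of market microstructure features
    • Uses triplet imbalance features to capture deeper market information
    • Combines time-series and statistical features to improve predictions
  2. Stability of model training

    • Uses date-based cross-validation folds to assess model stability
    • Uses model ensembling to reduce prediction variance
    • Applies a zero-sum constraint to the predictions (mean subtraction before clipping)
  3. Efficiency of the implementation

    • Uses numba to speed up feature computation
    • Manages memory efficiently through dtype downcasting and a bounded cache
    • Trains the models with GPU acceleration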
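5.2 Lessons Learned

This project builds a complete system for predicting closing prices in the stock market. Through in-depth feature engineering and stable model training, it achieves reasonably good predictive performance. Many of the techniques and lessons here, such as constructing market microstructure features and handling time-series data, carry over to other financial forecasting tasks. The project also shows why, in a real trading setting, prediction accuracy has to be balanced against system efficiency.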