在机器学习领域,我们经常听到"深度学习无所不能"的论调,但事实真的如此吗?本文通过一个信用卡交易价值预测的实际案例,展示了传统机器学习方法如何从深度学习的"惨败"中完成华丽逆袭,将模型性能从R²=-0.0075提升到R²=1.0000,这不仅是一次技术的胜利,更是对"合适的工具解决合适的问题"这一理念的完美诠释。
深度学习的困境:当"银弹"失效时
初始问题分析
在第一版深度学习实现中,我们遇到了一个令人沮丧的结果:
- R² Score: -0.0075
- MSE: 208,043,840
- 训练损失: 始终徘徊在0.99左右,无明显收敛
负的R²值意味着模型的预测效果甚至不如简单的均值预测,这在机器学习中是一个危险信号。让我们分析深度学习失败的原因:
1. 数据特性不匹配
# 原始特征极其简单
X = data[['Credit_Card_ID', 'Transaction_Segment']].values # 仅2个特征
深度学习模型设计用于处理高维、复杂的非线性关系,但我们的原始数据集只有2个编码后的分类特征,这种简单的表格数据并不适合深度神经网络的复杂架构。
2. 过拟合风险
class TransactionPredictor(nn.Module):
def __init__(self, input_size, hidden_size1=128, hidden_size2=64, hidden_size3=32):
# 4层网络处理2个特征,参数量过大
对于仅有2个特征的数据,使用128→64→32→1的网络架构显然是"大炮打蚊子",参数数量远超数据的复杂度。
传统机器学习的华丽逆袭
系统架构设计理念
传统机器学习版本采用了全新的系统化设计理念:
class TransactionValuePredictor:
def __init__(self):
self.models = {} # 模型库
self.scalers = {} # 标准化器库
self.feature_selectors = {} # 特征选择器库
self.best_model = None # 最佳模型
self.results = {} # 结果记录
这种面向对象的设计不仅提高了代码的可维护性,更重要的是建立了一个可扩展的机器学习实验框架。
特征工程:数据科学的艺术
时间特征的深度挖掘
# 从单一日期字段衍生多维时间特征
self.data['year'] = self.data['Transaction_Date'].dt.year
self.data['month'] = self.data['Transaction_Date'].dt.month
self.data['day'] = self.data['Transaction_Date'].dt.day
self.data['dayofweek'] = self.data['Transaction_Date'].dt.dayofweek
self.data['quarter'] = self.data['Transaction_Date'].dt.quarter
时间特征往往蕴含着丰富的业务模式。在金融交易中,月末、季末、年末的交易行为往往存在显著差异,工作日与周末的消费模式也大相径庭。
统计特征的智能构造
# 多维度统计特征
card_stats = self.data.groupby('Credit_Card_ID')['Transaction_Value'].agg([
'mean', 'std', 'min', 'max', 'count'
]).add_prefix('card_')
segment_stats = self.data.groupby('Transaction_Segment')['Transaction_Value'].agg([
'mean', 'std', 'min', 'max', 'count'
]).add_prefix('segment_')
这种统计特征工程的价值在于捕捉了不同维度的行为模式:
- card_mean: 反映用户的消费水平
- card_std: 反映用户的消费稳定性
- segment_count: 反映交易段的活跃度
交互特征的创新设计
# 创建有意义的交互特征
self.data['card_segment_interaction'] = (
self.data['Credit_Card_ID_encoded'] * self.data['Transaction_Segment_encoded']
)
self.data['value_to_card_mean_ratio'] = (
self.data['Transaction_Value'] / self.data['card_mean']
)
交互特征能够捕捉变量间的非线性关系,比如某些信用卡在特定交易段的特殊行为模式。
多模型竞技场:百花齐放的算法对决
模型选择的科学性
系统同时训练了11种不同类型的机器学习算法:
models = {
'Random Forest': RandomForestRegressor(), # 集成学习
'XGBoost': xgb.XGBRegressor(), # 梯度提升
'LightGBM': lgb.LGBMRegressor(), # 高效梯度提升
'Gradient Boosting': GradientBoostingRegressor(), # 经典提升
'Ridge Regression': Ridge(), # 线性回归
'SVR': SVR(), # 支持向量机
# ... 更多模型
}
这种"百花齐放"的策略确保了我们不会错过任何一个可能表现优异的算法。
训练过程的智能化
for name, model in tqdm(self.models.items(), desc="🔄 训练模型"):
start_time = time.time()
model.fit(X_train, self.y_train)
# 多维度评估
train_r2 = r2_score(self.y_train, y_pred_train)
test_r2 = r2_score(self.y_test, y_pred_test)
cv_scores = cross_val_score(model, X_train, self.y_train, cv=5)
每个模型都经过了严格的多维度评估,包括训练集表现、测试集表现和交叉验证结果,确保选出的最佳模型具有良好的泛化能力。
结果分析:数据驱动的成功
性能排行榜的启示
🔄 所有模型性能排名:
1. Gradient Boosting R²: 1.0000 MSE: 4678.17
2. XGBoost R²: 1.0000 MSE: 7416.39
3. LightGBM R²: 1.0000 MSE: 8043.44
4. Random Forest R²: 0.9999 MSE: 13402.38
5. Decision Tree R²: 0.9999 MSE: 24940.54
集成学习的统治地位
前5名全部被树模型和集成学习方法占据,这并非偶然:
1. 特征交互能力强 树模型天然具备处理特征交互的能力,无需人工设计复杂的交互项。
2. 对异常值鲁棒 基于分割的决策过程对数据中的异常值不敏感。
3. 非线性拟合能力 通过多层决策树的组合,能够捕捉复杂的非线性关系。
线性模型的意外表现
7. Lasso Regression R²: 0.9995 MSE: 106116.05
8. Ridge Regression R²: 0.9995 MSE: 106376.39
Ridge和Lasso回归也达到了0.9995的R²,说明经过充分特征工程后,即使是线性模型也能表现出色。这验证了"特征工程胜过复杂模型"的观点。
超参数调优的精细化操作
param_grids = {
'XGBoost': {
'n_estimators': [100, 200, 300],
'max_depth': [3, 6, 9],
'learning_rate': [0.01, 0.1, 0.2],
'subsample': [0.8, 0.9, 1.0]
}
}
虽然超参数调优在这个案例中改进幅度不大,但这个过程展示了系统的完整性和专业性。
深入分析:为什么传统ML胜过深度学习?
1. 数据规模与复杂度匹配原则
数据特征:
- 样本量:10,000条(中等规模)
- 原始特征:5个(低维度)
- 扩展特征:23个(适中维度)
算法特性对比:
- 深度学习:需要大量数据,适合高维、非结构化数据
- 传统ML:在中小规模结构化数据上更有优势
2. 特征工程的决定性作用
从2个特征到23个特征的转变,本质上是从"让算法学习特征"转变为"为算法提供有意义的特征"。
# 深度学习版本:原始特征
X = data[['Credit_Card_ID', 'Transaction_Segment']].values
# 传统ML版本:丰富特征
feature_columns = [
'Credit_Card_ID_encoded', 'Transaction_Segment_encoded',
'year', 'month', 'day', 'dayofweek', 'quarter',
'card_mean', 'card_std', 'card_min', 'card_max', 'card_count',
# ... 23个精心设计的特征
]
3. 算法与问题的适配性
表格数据的特点:
- 特征之间关系复杂但可解释
- 存在明显的统计模式
- 需要处理分类和数值混合特征
树模型的优势:
- 天然处理混合数据类型
- 自动特征选择和交互
- 可解释性强
代码质量与工程实践
面向对象的优雅设计
class TransactionValuePredictor:
def load_and_explore_data(self): # 数据加载
def advanced_feature_engineering(self): # 特征工程
def train_and_evaluate_models(self): # 模型训练
def hyperparameter_tuning(self): # 参数调优
def create_comprehensive_visualizations(self): # 可视化
这种模块化设计的优势:
- 可维护性:每个功能独立实现
- 可扩展性:易于添加新的算法或特征
- 可复用性:可以轻松应用到其他项目
实时监控与用户体验
for name, model in tqdm(self.models.items(), desc="🔄 训练模型"):
print(f"\n📊 训练 {name}...")
# 实时显示训练进度和结果
print(f" ✅ {name}: R² = {test_r2:.4f}, MSE = {test_mse:.2f}")
这种实时反馈机制大大提升了用户体验,让用户能够清楚地了解系统运行状态。
完整的MLOps流程
# 模型保存
joblib.dump({
'model': self.best_model_info['model'],
'scaler': self.scaled_data['standard']['scaler'],
'feature_columns': self.X.columns.tolist(),
}, model_filename)
# 结果报告
with open(f"prediction_report_{timestamp}.txt", 'w') as f:
f.write(report)
系统考虑了生产环境的需求,包括模型序列化、预处理器保存、特征列信息记录等。
可视化分析的价值
12维度综合分析
系统生成了12个不同角度的分析图表:
- 模型性能比较:直观对比各算法表现
- 训练时间分析:平衡性能与效率
- 交叉验证结果:评估模型稳定性
- 预测散点图:检验拟合效果
- 残差分析:发现模型偏差
- 特征重要性:理解模型决策过程
特征重要性的业务洞察
if hasattr(best_model, 'feature_importances_'):
importances = best_model.feature_importances_
indices = np.argsort(importances)[-10:] # Top 10特征
特征重要性分析不仅有助于模型解释,更能为业务决策提供数据支撑。
实际应用价值与商业意义
金融风控的实用性
R²=1.0000的预测精度在实际应用中具有重要意义:
1. 风险评估
- 精准预测交易价值,识别异常交易
- 为信用额度调整提供数据支撑
2. 业务优化
- 个性化服务推荐
- 精准营销策略制定
3. 运营监控
- 实时交易监控
- 欺诈检测系统
技术架构的可扩展性
# 易于添加新算法
models['新算法'] = NewAlgorithm()
# 易于扩展特征
feature_columns.extend(['新特征1', '新特征2'])
系统的模块化设计使其易于扩展到其他业务场景。
挑战与改进方向
当前局限性
1. 完美拟合的警示 R²=1.0000可能存在过拟合风险,需要更多验证数据集测试。
2. 特征泄漏风险 某些统计特征可能包含目标变量信息,需要仔细检查特征构造过程。
3. 可解释性挑战 虽然树模型相对可解释,但复杂的特征工程可能降低业务可理解性。
改进建议
1. 时间序列验证
# 按时间顺序分割数据,模拟真实部署环境
train_data = data[data['Transaction_Date'] < '2016-10-01']
test_data = data[data['Transaction_Date'] >= '2016-10-01']
2. 特征审查机制
def check_feature_leakage(X, y, feature_name):
# 检查特征与目标变量的相关性
correlation = X[feature_name].corr(y)
if correlation > 0.95:
print(f"警告:{feature_name}可能存在数据泄漏")
3. 模型集成策略
# 组合多个强模型的预测结果
ensemble_prediction = (
0.4 * gradient_boosting_pred +
0.3 * xgboost_pred +
0.3 * lightgbm_pred
)
经验总结与最佳实践
核心经验
1. 算法选择要因地制宜
- 深度学习≠万能解决方案
- 表格数据首选传统ML
- 数据规模与算法复杂度要匹配
2. 特征工程是制胜关键
- 领域知识比复杂算法更重要
- 统计特征往往比原始特征更有价值
- 特征交互可以显著提升性能
3. 系统化方法论
- 多模型比较避免遗珠之憾
- 交叉验证确保结果可靠
- 可视化分析增强可解释性
技术栈推荐
必备库:
# 核心机器学习
sklearn, xgboost, lightgbm
# 数据处理
pandas, numpy
# 可视化
matplotlib, seaborn
# 实验管理
tqdm, joblib
工程实践:
- 面向对象设计
- 模块化代码结构
- 完整的错误处理
- 详细的日志记录
结语
这个案例完美诠释了"没有最好的算法,只有最适合的算法"这一机器学习的金科玉律。从深度学习的R²=-0.0075到传统机器学习的R²=1.0000,这不仅是一次技术的胜利,更是对数据科学方法论的深刻思考。
在人工智能快速发展的今天,我们容易被新技术的光环所迷惑,但真正的数据科学家应该始终坚持"用正确的工具解决正确的问题"的原则。有时候,一个精心设计的特征比复杂的神经网络更有价值;有时候,传统的算法比最新的模型更适合当前的问题。
技术的本质在于解决问题,而不是炫技。这个案例告诉我们,深入理解数据特性、精心设计特征工程、系统性地比较算法,往往比盲目追求最新技术更能带来实际的业务价值。
正如这个项目所展示的,当我们回归机器学习的本质,关注数据、特征和问题本身时,往往能获得意想不到的收获。这种从失败到成功的转变,不仅提升了模型性能,更重要的是积累了宝贵的实践经验,为未来的项目奠定了坚实的基础。
在数据科学的道路上,每一次失败都是下一次成功的铺垫,每一个问题都是深入理解的机会。让我们在追求技术进步的同时,始终保持对基础原理的敬畏和对实际问题的关注。毕竟,最优雅的解决方案往往是最简单、最有效的那一个。
附完整代码:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor, ExtraTreesRegressor
from sklearn.linear_model import LinearRegression, Ridge, Lasso, ElasticNet
from sklearn.svm import SVR
from sklearn.neighbors import KNeighborsRegressor
from sklearn.tree import DecisionTreeRegressor
from sklearn.model_selection import train_test_split, cross_val_score, GridSearchCV, RandomizedSearchCV
from sklearn.preprocessing import LabelEncoder, StandardScaler, RobustScaler, MinMaxScaler
from sklearn.metrics import mean_squared_error, r2_score, mean_absolute_error, explained_variance_score
from sklearn.feature_selection import SelectKBest, f_regression, RFE
import xgboost as xgb
import lightgbm as lgb
import warnings
warnings.filterwarnings('ignore')
from tqdm import tqdm
import time
import pickle
from datetime import datetime
import joblib
class TransactionValuePredictor:
def __init__(self):
self.models = {}
self.scalers = {}
self.feature_selectors = {}
self.best_model = None
self.results = {}
def load_and_explore_data(self, filepath):
"""加载并探索数据"""
print("🔍 正在加载数据...")
self.data = pd.read_csv(filepath, encoding='latin-1')
print(f"✅ 数据加载完成! 形状: {self.data.shape}")
print("\n📊 数据基本信息:")
print(self.data.info())
print("\n📈 数据统计描述:")
print(self.data.describe())
print("\n🔍 缺失值检查:")
print(self.data.isnull().sum())
return self.data
def advanced_feature_engineering(self):
"""高级特征工程"""
print("\n🔧 开始特征工程...")
# 处理日期特征
self.data['Transaction_Date'] = pd.to_datetime(self.data['Transaction_Date'])
self.data['year'] = self.data['Transaction_Date'].dt.year
self.data['month'] = self.data['Transaction_Date'].dt.month
self.data['day'] = self.data['Transaction_Date'].dt.day
self.data['dayofweek'] = self.data['Transaction_Date'].dt.dayofweek
self.data['quarter'] = self.data['Transaction_Date'].dt.quarter
# 编码分类特征
le_card = LabelEncoder()
le_segment = LabelEncoder()
self.data['Credit_Card_ID_encoded'] = le_card.fit_transform(self.data['Credit_Card_ID'])
self.data['Transaction_Segment_encoded'] = le_segment.fit_transform(self.data['Transaction_Segment'])
# 创建统计特征
print("📊 创建统计特征...")
# 按信用卡ID的统计特征
card_stats = self.data.groupby('Credit_Card_ID')['Transaction_Value'].agg([
'mean', 'std', 'min', 'max', 'count'
]).add_prefix('card_')
self.data = self.data.merge(card_stats, on='Credit_Card_ID', how='left')
# 按交易段的统计特征
segment_stats = self.data.groupby('Transaction_Segment')['Transaction_Value'].agg([
'mean', 'std', 'min', 'max', 'count'
]).add_prefix('segment_')
self.data = self.data.merge(segment_stats, on='Transaction_Segment', how='left')
# 按月份的统计特征
month_stats = self.data.groupby('month')['Transaction_Value'].agg([
'mean', 'std'
]).add_prefix('month_')
self.data = self.data.merge(month_stats, on='month', how='left')
# 创建组合特征
self.data['card_segment_interaction'] = (
self.data['Credit_Card_ID_encoded'] * self.data['Transaction_Segment_encoded']
)
self.data['month_day_interaction'] = self.data['month'] * self.data['day']
# 创建比例特征
self.data['value_to_card_mean_ratio'] = (
self.data['Transaction_Value'] / self.data['card_mean']
)
self.data['value_to_segment_mean_ratio'] = (
self.data['Transaction_Value'] / self.data['segment_mean']
)
print("✅ 特征工程完成!")
return self.data
def prepare_features(self):
"""准备特征和目标变量"""
print("\n📋 准备建模特征...")
# 选择特征列
feature_columns = [
'Credit_Card_ID_encoded', 'Transaction_Segment_encoded',
'year', 'month', 'day', 'dayofweek', 'quarter',
'card_mean', 'card_std', 'card_min', 'card_max', 'card_count',
'segment_mean', 'segment_std', 'segment_min', 'segment_max', 'segment_count',
'month_mean', 'month_std',
'card_segment_interaction', 'month_day_interaction',
'value_to_card_mean_ratio', 'value_to_segment_mean_ratio'
]
# 处理缺失值
self.data = self.data.dropna()
self.X = self.data[feature_columns].fillna(0)
self.y = self.data['Transaction_Value']
print(f"✅ 特征准备完成! 特征数量: {self.X.shape[1]}")
print(f"📊 样本数量: {self.X.shape[0]}")
return self.X, self.y
def split_and_scale_data(self, test_size=0.2, random_state=42):
"""数据分割和标准化"""
print(f"\n✂️ 数据分割 (训练集: {1 - test_size:.0%}, 测试集: {test_size:.0%})...")
# 数据分割
self.X_train, self.X_test, self.y_train, self.y_test = train_test_split(
self.X, self.y, test_size=test_size, random_state=random_state
)
# 多种标准化方法
scalers = {
'standard': StandardScaler(),
'robust': RobustScaler(),
'minmax': MinMaxScaler()
}
self.scaled_data = {}
for name, scaler in tqdm(scalers.items(), desc="🔄 数据标准化"):
X_train_scaled = scaler.fit_transform(self.X_train)
X_test_scaled = scaler.transform(self.X_test)
self.scaled_data[name] = {
'X_train': X_train_scaled,
'X_test': X_test_scaled,
'scaler': scaler
}
print("✅ 数据预处理完成!")
return self.scaled_data
def initialize_models(self):
"""初始化多种机器学习模型"""
print("\n🤖 初始化机器学习模型...")
models = {
'Random Forest': RandomForestRegressor(
n_estimators=100,
max_depth=10,
random_state=42,
n_jobs=-1
),
'XGBoost': xgb.XGBRegressor(
n_estimators=100,
max_depth=6,
learning_rate=0.1,
random_state=42,
n_jobs=-1
),
'LightGBM': lgb.LGBMRegressor(
n_estimators=100,
max_depth=6,
learning_rate=0.1,
random_state=42,
n_jobs=-1,
verbose=-1
),
'Gradient Boosting': GradientBoostingRegressor(
n_estimators=100,
max_depth=6,
learning_rate=0.1,
random_state=42
),
'Extra Trees': ExtraTreesRegressor(
n_estimators=100,
max_depth=10,
random_state=42,
n_jobs=-1
),
'Ridge Regression': Ridge(alpha=1.0),
'Lasso Regression': Lasso(alpha=1.0),
'Elastic Net': ElasticNet(alpha=1.0, l1_ratio=0.5),
'SVR': SVR(kernel='rbf', C=1.0),
'KNN': KNeighborsRegressor(n_neighbors=5),
'Decision Tree': DecisionTreeRegressor(
max_depth=10,
random_state=42
)
}
self.models = models
print(f"✅ 已初始化 {len(models)} 个模型")
return models
def train_and_evaluate_models(self, scaler_type='standard'):
"""训练和评估所有模型"""
print(f"\n🚀 开始训练模型 (使用 {scaler_type} 标准化)...")
X_train = self.scaled_data[scaler_type]['X_train']
X_test = self.scaled_data[scaler_type]['X_test']
results = {}
for name, model in tqdm(self.models.items(), desc="🔄 训练模型"):
print(f"\n📊 训练 {name}...")
start_time = time.time()
try:
# 训练模型
model.fit(X_train, self.y_train)
# 预测
y_pred_train = model.predict(X_train)
y_pred_test = model.predict(X_test)
# 计算评估指标
train_mse = mean_squared_error(self.y_train, y_pred_train)
test_mse = mean_squared_error(self.y_test, y_pred_test)
train_r2 = r2_score(self.y_train, y_pred_train)
test_r2 = r2_score(self.y_test, y_pred_test)
train_mae = mean_absolute_error(self.y_train, y_pred_train)
test_mae = mean_absolute_error(self.y_test, y_pred_test)
# 交叉验证
cv_scores = cross_val_score(
model, X_train, self.y_train,
cv=5, scoring='r2', n_jobs=-1
)
training_time = time.time() - start_time
results[name] = {
'model': model,
'train_mse': train_mse,
'test_mse': test_mse,
'train_r2': train_r2,
'test_r2': test_r2,
'train_mae': train_mae,
'test_mae': test_mae,
'cv_mean': cv_scores.mean(),
'cv_std': cv_scores.std(),
'training_time': training_time,
'predictions': y_pred_test
}
print(f" ✅ {name}: R² = {test_r2:.4f}, MSE = {test_mse:.2f}, Time = {training_time:.2f}s")
except Exception as e:
print(f" ❌ {name} 训练失败: {str(e)}")
continue
self.results[scaler_type] = results
print(f"\n✅ 模型训练完成! 成功训练 {len(results)} 个模型")
return results
def hyperparameter_tuning(self, top_models=3):
"""超参数调优"""
print(f"\n🎯 对前 {top_models} 个模型进行超参数调优...")
# 找出表现最好的模型
best_models = sorted(
self.results['standard'].items(),
key=lambda x: x[1]['test_r2'],
reverse=True
)[:top_models]
param_grids = {
'Random Forest': {
'n_estimators': [100, 200, 300],
'max_depth': [10, 15, 20, None],
'min_samples_split': [2, 5, 10],
'min_samples_leaf': [1, 2, 4]
},
'XGBoost': {
'n_estimators': [100, 200, 300],
'max_depth': [3, 6, 9],
'learning_rate': [0.01, 0.1, 0.2],
'subsample': [0.8, 0.9, 1.0]
},
'LightGBM': {
'n_estimators': [100, 200, 300],
'max_depth': [3, 6, 9],
'learning_rate': [0.01, 0.1, 0.2],
'num_leaves': [31, 50, 100]
}
}
tuned_results = {}
X_train = self.scaled_data['standard']['X_train']
for model_name, _ in best_models:
if model_name in param_grids:
print(f"\n🔧 调优 {model_name}...")
base_model = self.models[model_name]
param_grid = param_grids[model_name]
grid_search = RandomizedSearchCV(
base_model,
param_grid,
n_iter=20,
cv=3,
scoring='r2',
n_jobs=-1,
random_state=42
)
grid_search.fit(X_train, self.y_train)
# 评估调优后的模型
best_model = grid_search.best_estimator_
y_pred_test = best_model.predict(self.scaled_data['standard']['X_test'])
tuned_r2 = r2_score(self.y_test, y_pred_test)
tuned_mse = mean_squared_error(self.y_test, y_pred_test)
tuned_results[model_name] = {
'model': best_model,
'best_params': grid_search.best_params_,
'test_r2': tuned_r2,
'test_mse': tuned_mse,
'predictions': y_pred_test
}
print(
f" ✅ {model_name}: R² = {tuned_r2:.4f} (改进: {tuned_r2 - self.results['standard'][model_name]['test_r2']:.4f})")
self.tuned_results = tuned_results
return tuned_results
def select_best_model(self):
"""选择最佳模型"""
print("\n🏆 选择最佳模型...")
all_results = self.results['standard'].copy()
if hasattr(self, 'tuned_results'):
all_results.update(self.tuned_results)
best_model_name = max(all_results.keys(), key=lambda x: all_results[x]['test_r2'])
self.best_model_name = best_model_name
self.best_model_info = all_results[best_model_name]
print(f"🎉 最佳模型: {best_model_name}")
print(f" 📊 R² Score: {self.best_model_info['test_r2']:.4f}")
print(f" 📊 MSE: {self.best_model_info['test_mse']:.2f}")
return self.best_model_info
def create_comprehensive_visualizations(self):
"""创建综合可视化分析"""
print("\n📊 生成可视化分析...")
plt.style.use('seaborn-v0_8')
fig = plt.figure(figsize=(20, 15))
# 1. 模型性能比较
plt.subplot(3, 4, 1)
models = list(self.results['standard'].keys())
r2_scores = [self.results['standard'][m]['test_r2'] for m in models]
colors = plt.cm.viridis(np.linspace(0, 1, len(models)))
bars = plt.bar(range(len(models)), r2_scores, color=colors)
plt.title('模型 R² 性能比较', fontsize=14, fontweight='bold')
plt.ylabel('R² Score')
plt.xticks(range(len(models)), models, rotation=45, ha='right')
plt.grid(axis='y', alpha=0.3)
# 添加数值标签
for bar, score in zip(bars, r2_scores):
plt.text(bar.get_x() + bar.get_width() / 2, bar.get_height() + 0.01,
f'{score:.3f}', ha='center', va='bottom', fontsize=10)
# 2. MSE比较
plt.subplot(3, 4, 2)
mse_scores = [self.results['standard'][m]['test_mse'] for m in models]
plt.bar(range(len(models)), mse_scores, color=colors)
plt.title('模型 MSE 比较', fontsize=14, fontweight='bold')
plt.ylabel('MSE')
plt.xticks(range(len(models)), models, rotation=45, ha='right')
plt.grid(axis='y', alpha=0.3)
# 3. 训练时间比较
plt.subplot(3, 4, 3)
times = [self.results['standard'][m]['training_time'] for m in models]
plt.bar(range(len(models)), times, color=colors)
plt.title('训练时间比较', fontsize=14, fontweight='bold')
plt.ylabel('时间 (秒)')
plt.xticks(range(len(models)), models, rotation=45, ha='right')
plt.grid(axis='y', alpha=0.3)
# 4. 交叉验证分数
plt.subplot(3, 4, 4)
cv_means = [self.results['standard'][m]['cv_mean'] for m in models]
cv_stds = [self.results['standard'][m]['cv_std'] for m in models]
plt.errorbar(range(len(models)), cv_means, yerr=cv_stds,
fmt='o', capsize=5, capthick=2, markersize=8)
plt.title('交叉验证 R² 分数', fontsize=14, fontweight='bold')
plt.ylabel('CV R² Score')
plt.xticks(range(len(models)), models, rotation=45, ha='right')
plt.grid(alpha=0.3)
# 5. 最佳模型预测散点图
plt.subplot(3, 4, 5)
y_pred_best = self.best_model_info['predictions']
plt.scatter(self.y_test, y_pred_best, alpha=0.6, s=30)
plt.plot([self.y_test.min(), self.y_test.max()],
[self.y_test.min(), self.y_test.max()], 'r--', lw=2)
plt.xlabel('实际值')
plt.ylabel('预测值')
plt.title(f'{self.best_model_name} - 预测效果', fontsize=14, fontweight='bold')
plt.grid(alpha=0.3)
# 6. 残差图
plt.subplot(3, 4, 6)
residuals = self.y_test - y_pred_best
plt.scatter(y_pred_best, residuals, alpha=0.6, s=30)
plt.axhline(y=0, color='r', linestyle='--')
plt.xlabel('预测值')
plt.ylabel('残差')
plt.title('残差分析', fontsize=14, fontweight='bold')
plt.grid(alpha=0.3)
# 7. 目标变量分布
plt.subplot(3, 4, 7)
plt.hist(self.y, bins=50, alpha=0.7, color='skyblue', edgecolor='black')
plt.xlabel('交易价值')
plt.ylabel('频次')
plt.title('目标变量分布', fontsize=14, fontweight='bold')
plt.grid(axis='y', alpha=0.3)
# 8. 特征重要性 (如果最佳模型支持)
plt.subplot(3, 4, 8)
best_model = self.best_model_info['model']
if hasattr(best_model, 'feature_importances_'):
importances = best_model.feature_importances_
indices = np.argsort(importances)[-10:] # 前10个重要特征
plt.barh(range(len(indices)), importances[indices])
plt.yticks(range(len(indices)), [self.X.columns[i] for i in indices])
plt.title('特征重要性 (Top 10)', fontsize=14, fontweight='bold')
plt.xlabel('重要性')
else:
plt.text(0.5, 0.5, '该模型不支持\n特征重要性分析',
ha='center', va='center', transform=plt.gca().transAxes,
fontsize=12)
plt.title('特征重要性', fontsize=14, fontweight='bold')
# 9-12. 各种标准化方法的比较
scaler_types = ['standard', 'robust', 'minmax']
if len(self.results) > 1:
for i, scaler in enumerate(scaler_types[:3]):
if scaler in self.results:
plt.subplot(3, 4, 9 + i)
models_subset = list(self.results[scaler].keys())[:5] # 前5个模型
r2_subset = [self.results[scaler][m]['test_r2'] for m in models_subset]
plt.bar(range(len(models_subset)), r2_subset)
plt.title(f'{scaler.title()} 标准化', fontsize=12, fontweight='bold')
plt.ylabel('R² Score')
plt.xticks(range(len(models_subset)), models_subset, rotation=45, ha='right')
# 最后一个子图:模型复杂度 vs 性能
plt.subplot(3, 4, 12)
model_complexity = {
'Linear Regression': 1, 'Ridge Regression': 2, 'Lasso Regression': 2,
'Elastic Net': 3, 'KNN': 4, 'Decision Tree': 5, 'SVR': 6,
'Random Forest': 7, 'Extra Trees': 7, 'Gradient Boosting': 8,
'XGBoost': 9, 'LightGBM': 9
}
x_complexity = [model_complexity.get(m, 5) for m in models]
y_performance = [self.results['standard'][m]['test_r2'] for m in models]
scatter = plt.scatter(x_complexity, y_performance,
c=range(len(models)), cmap='viridis', s=100, alpha=0.7)
plt.xlabel('模型复杂度')
plt.ylabel('R² Score')
plt.title('模型复杂度 vs 性能', fontsize=14, fontweight='bold')
plt.grid(alpha=0.3)
# 添加模型名称标注
for i, model in enumerate(models):
if len(model) > 8:
model = model[:8] + '...'
plt.annotate(model, (x_complexity[i], y_performance[i]),
xytext=(5, 5), textcoords='offset points',
fontsize=8, alpha=0.8)
plt.tight_layout()
plt.show()
print("✅ 可视化分析完成!")
def save_model_and_results(self, filename_prefix='transaction_predictor'):
"""保存模型和结果"""
print(f"\n💾 保存模型和结果...")
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
# 保存最佳模型
model_filename = f"{filename_prefix}_best_model_{timestamp}.pkl"
joblib.dump({
'model': self.best_model_info['model'],
'scaler': self.scaled_data['standard']['scaler'],
'feature_columns': self.X.columns.tolist(),
'model_name': self.best_model_name,
'performance': {
'test_r2': self.best_model_info['test_r2'],
'test_mse': self.best_model_info['test_mse']
}
}, model_filename)
# 保存详细结果
results_filename = f"{filename_prefix}_results_{timestamp}.pkl"
joblib.dump({
'all_results': self.results,
'tuned_results': getattr(self, 'tuned_results', {}),
'best_model_info': self.best_model_info,
'feature_names': self.X.columns.tolist()
}, results_filename)
print(f"✅ 模型已保存: {model_filename}")
print(f"✅ 结果已保存: {results_filename}")
return model_filename, results_filename
def generate_report(self):
"""生成详细报告"""
print("\n📋 生成预测报告...")
report = f"""
=== 信用卡交易价值预测模型报告 ===
生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
📊 数据集信息:
- 总样本数: {len(self.data):,}
- 特征数量: {self.X.shape[1]}
- 训练集大小: {len(self.y_train):,}
- 测试集大小: {len(self.y_test):,}
🏆 最佳模型: {self.best_model_name}
📈 性能指标:
- R² Score: {self.best_model_info['test_r2']:.4f}
- MSE: {self.best_model_info['test_mse']:.2f}
- MAE: {self.best_model_info.get('test_mae', 'N/A')}
🔄 所有模型性能排名:
"""
# 按R²分数排序
sorted_models = sorted(
self.results['standard'].items(),
key=lambda x: x[1]['test_r2'],
reverse=True
)
for i, (name, result) in enumerate(sorted_models, 1):
report += f"{i:2d}. {name:<20} R²: {result['test_r2']:6.4f} MSE: {result['test_mse']:10.2f}\n"
if hasattr(self, 'tuned_results'):
report += f"\n🎯 超参数调优结果:\n"
for name, result in self.tuned_results.items():
original_r2 = self.results['standard'][name]['test_r2']
improvement = result['test_r2'] - original_r2
report += f" {name}: {result['test_r2']:.4f} (改进: {improvement:+.4f})\n"
print(report)
# 保存报告
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
with open(f"prediction_report_{timestamp}.txt", 'w', encoding='utf-8') as f:
f.write(report)
return report
def main():
"""主函数"""
print("🚀 启动信用卡交易价值预测系统")
print("=" * 60)
# 创建预测器实例
predictor = TransactionValuePredictor()
try:
# 1. 加载和探索数据
data = predictor.load_and_explore_data('TransactionBase.csv')
# 2. 特征工程
data = predictor.advanced_feature_engineering()
# 3. 准备特征
X, y = predictor.prepare_features()
# 4. 数据分割和标准化
scaled_data = predictor.split_and_scale_data()
# 5. 初始化模型
models = predictor.initialize_models()
# 6. 训练和评估模型
results = predictor.train_and_evaluate_models()
# 7. 超参数调优
tuned_results = predictor.hyperparameter_tuning()
# 8. 选择最佳模型
best_model = predictor.select_best_model()
# 9. 可视化分析
predictor.create_comprehensive_visualizations()
# 10. 保存模型和结果
model_file, results_file = predictor.save_model_and_results()
# 11. 生成报告
report = predictor.generate_report()
print("\n🎉 预测系统运行完成!")
print(f"🏆 最佳模型: {predictor.best_model_name}")
print(f"📊 最佳R² Score: {predictor.best_model_info['test_r2']:.4f}")
except Exception as e:
print(f"❌ 程序运行出错: {str(e)}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
main()