Reading Hive Table Data with PySpark and Modeling with LightGBM

Published: 2025-09-14

This post builds a model on the sparse vectors generated in the previous post.

Because neither from pyspark_lightgbm import LGBMClassifier nor from synapse.ml.lightgbm import LightGBMClassifier is installed on the cluster, native LightGBM is used for modeling instead. (In theory the first two are more efficient, since they train in parallel across the cluster, whereas native LightGBM processes all the data on a single machine.)

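For reference, the distributed route would look roughly like the sketch below if SynapseML were available; this is a minimal sketch, assuming train_df is a Spark DataFrame holding the sparse vectors in an aggregated_vector column and the label in a label column:

from synapse.ml.lightgbm import LightGBMClassifier

# Fits across the Spark executors instead of collecting everything to the driver
clf = LightGBMClassifier(
    objective="binary",
    featuresCol="aggregated_vector",
    labelCol="label",
)
model = clf.fit(train_df)
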
# Grid search over hyperparameters
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import os
import sys
import time
from functools import wraps
import numpy as np
import lightgbm as lgb
import joblib
from scipy.sparse import csr_matrix
from sklearn.metrics import (
    roc_auc_score,
    average_precision_score,
    f1_score,
    precision_score,
    recall_score,
    accuracy_score,
    confusion_matrix
)
from sklearn.model_selection import train_test_split, GridSearchCV

# Configure environment variables
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
# Driver memory and maxResultSize are set high because the full dataset is
# later collected to the driver via toPandas()
spark = SparkSession.builder.config("spark.metrics.conf",
                                    "/opt/mobdata/spark/spark-2.4.3.mob1-bin-2.6.5/conf/metrics.properties") \
    .config("spark.driver.memory", "48g") \
    .config("spark.driver.maxResultSize", "16g") \
    .appName("test_djj") \
    .enableHiveSupport() \
    .getOrCreate()


# Timing decorator
def timeit(func):
    @wraps(func)  # preserve the wrapped function's metadata (name, docstring)
    def wrapper(*args, **kwargs):
        start_time = time.time()
        print(f"Start: {func.__name__}...")
        result = func(*args, **kwargs)
        end_time = time.time()
        elapsed = end_time - start_time
        print(f"Done: {func.__name__} | elapsed: {elapsed:.2f}s")
        return result

    return wrapper


# 1. Load the data and convert it to a CSR matrix
@timeit
def load_and_prepare_data():
    print("Loading data and converting to a CSR matrix...")
    # Read the pre-labeled samples directly (label=1 positive, label=0 negative)
    df = spark.sql(
        "SELECT id, aggregated_vector, label FROM database.table WHERE label IN (0, 1)")

    # Collect the data to the driver
    print("Collecting data to the driver...")
    pdf = df.select(
        F.col("aggregated_vector").alias("features"),
        F.col("label").alias("label")
    ).toPandas()

    print("提取特征和标签...")
    # 准备CSR矩阵的数据
    row_indices = []
    col_indices = []
    data_values = []
    labels = []

    # Iterate over all samples and collect (row, col, value) triplets
    for i, row in pdf.iterrows():
        vec = row['features']
        size = vec['size']
        indices = vec['indices']
        values = vec['values']
        label = row['label']

        for j in range(len(indices)):
            row_indices.append(i)
            col_indices.append(indices[j])
            data_values.append(values[j])

        labels.append(label)
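
    # Note: this row-by-row Python loop is easy to follow but slow for large
    # tables; a vectorized build (an assumed alternative, not from the original
    # post) would concatenate the index/value arrays with numpy instead:
    #   sizes = pdf["features"].map(lambda v: len(v["indices"]))
    #   row_indices = np.repeat(np.arange(len(pdf)), sizes)
    #   col_indices = np.concatenate([np.asarray(v["indices"]) for v in pdf["features"]])
    #   data_values = np.concatenate([np.asarray(v["values"]) for v in pdf["features"]])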

    # Feature dimensionality (every sparse vector carries the same size)
    num_features = pdf["features"].iloc[0]['size']
    num_samples = len(pdf)

    # Build the CSR matrix from the collected triplets
    X = csr_matrix((data_values, (row_indices, col_indices)),
                   shape=(num_samples, num_features))

    y = np.array(labels)

    print(f"数据矩阵形状: {X.shape}")

    return X, y


# 2. Prepare the training data (using the pre-labeled samples directly)
@timeit
def prepare_training_data(X, y):
    print("Preparing training data...")

    # Compute class-balanced sample weights
    positive_count = sum(y == 1)
    negative_count = sum(y == 0)
    total_count = len(y)

    print(f"正样本数量: {positive_count:,}")
    print(f"负样本数量: {negative_count:,}")
    print(f"总量: {total_count:,}")

    # 创建样本权重数组
    sample_weight = np.zeros(total_count)
    sample_weight[y == 1] = total_count / (2.0 * positive_count)
    sample_weight[y == 0] = total_count / (2.0 * negative_count)
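
    # Equivalent one-liner via scikit-learn (an assumed alternative, not used here):
    #   from sklearn.utils.class_weight import compute_sample_weight
    #   sample_weight = compute_sample_weight(class_weight="balanced", y=y)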

    return X, y, sample_weight


# 3. Train the LightGBM model (with a slimmed-down grid search)
@timeit
def train_lightgbm_model(X_train, y_train, sample_weight):
    print("Training LightGBM model (slimmed-down grid search)...")

    # Define the base model
    base_model = lgb.LGBMClassifier(
        objective='binary',
        random_state=42,
        n_jobs=-1  # use all CPU cores
    )

    # Slimmed-down parameter grid - focus on the most impactful parameters
    param_grid = {
        'num_leaves': [31, 63],  # controls model complexity
        'min_child_samples': [20, 50],  # guards against overfitting
        'reg_alpha': [0, 0.1],  # L1 regularization
        'reg_lambda': [0, 0.1],  # L2 regularization
    }
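    # Search space: 2 * 2 * 2 * 2 = 16 candidate combinations; with cv=3 below,
    # the grid search fits 16 * 3 = 48 models in total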

    # Create the grid-search object
    grid_search = GridSearchCV(
        estimator=base_model,
        param_grid=param_grid,
        scoring='roc_auc',
        cv=3,  # 3-fold cross-validation
        verbose=2,
        n_jobs=-1  # note: combined with the estimator's n_jobs=-1 this can oversubscribe CPU cores
    )

    # Run the grid search
    start_time = time.time()
    grid_search.fit(X_train, y_train, sample_weight=sample_weight)
    training_time = time.time() - start_time

    # Report the best parameters
    print("Best parameter combination:")
    for param, value in grid_search.best_params_.items():
        print(f"{param}: {value}")
    print(f"Best AUC score: {grid_search.best_score_:.4f}")

    # Retrieve the best model
    best_model = grid_search.best_estimator_

    return best_model, training_time


# 4. Model evaluation
@timeit
def evaluate_model(model, X_test, y_test):
    print("Evaluating model...")

    # Predict with the fitted model
    start_time = time.time()
    y_pred_proba = model.predict_proba(X_test)[:, 1]  # probability of the positive class
    y_pred = (y_pred_proba > 0.5).astype(int)
    prediction_time = time.time() - start_time

    # Compute the overall evaluation metrics
    auc = roc_auc_score(y_test, y_pred_proba)
    auprc = average_precision_score(y_test, y_pred_proba)
    f1 = f1_score(y_test, y_pred)
    precision = precision_score(y_test, y_pred)
    recall = recall_score(y_test, y_pred)
    accuracy = accuracy_score(y_test, y_pred)

    # Build the confusion matrix
    cm = confusion_matrix(y_test, y_pred)
    tn, fp, fn, tp = cm.ravel()

    # Per-class metrics derived from the confusion matrix
    precision_pos = tp / (tp + fp) if (tp + fp) > 0 else 0
    recall_pos = tp / (tp + fn) if (tp + fn) > 0 else 0
    f1_pos = 2 * (precision_pos * recall_pos) / (precision_pos + recall_pos) if (precision_pos + recall_pos) > 0 else 0

    precision_neg = tn / (tn + fn) if (tn + fn) > 0 else 0
    recall_neg = tn / (tn + fp) if (tn + fp) > 0 else 0
    f1_neg = 2 * (precision_neg * recall_neg) / (precision_neg + recall_neg) if (precision_neg + recall_neg) > 0 else 0
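
    # Sanity note: precision_neg and recall_neg match sklearn's
    # precision_score(y_test, y_pred, pos_label=0) and recall_score(y_test, y_pred, pos_label=0)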

    # Build the classification report (all evaluation metrics in one place)
    report = f"""
    ================= Classification Report =================
    Confusion matrix:
    [[{tn} {fp}]
     [{fn} {tp}]]

    Positive class (1):
        Precision: {precision_pos:.4f}
        Recall:    {recall_pos:.4f}
        F1 Score:  {f1_pos:.4f}

    Negative class (0):
        Precision: {precision_neg:.4f}
        Recall:    {recall_neg:.4f}
        F1 Score:  {f1_neg:.4f}

    Overall metrics:
        Accuracy:  {accuracy:.4f}
        AUC:       {auc:.4f}
        AUPRC:     {auprc:.4f}
        F1 Score:  {f1:.4f}
        Precision: {precision:.4f}
        Recall:    {recall:.4f}
    """

    # Return timing information and the report
    return {
               "training_time": None,  # set later in main()
               "prediction_time": prediction_time
           }, report


# Main entry point
def main():
    # 1. Load the data and convert it to a CSR matrix
    X, y = load_and_prepare_data()

    # 2. Prepare the training data
    X, y, sample_weight = prepare_training_data(X, y)

    # Negative-to-positive ratio (could feed a scale_pos_weight setting;
    # sample weights are used here instead)
    positive_count = sum(y == 1)
    negative_count = sum(y == 0)
    print(f"Negative:positive ratio: {negative_count / positive_count:.2f}:1")

    # 3. Train/test split
    print("Splitting the data...")
    X_train, X_test, y_train, y_test, sample_weight_train, _ = train_test_split(
        X, y, sample_weight, test_size=0.2, random_state=42
    )
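    # With imbalanced labels, adding stratify=y above (an assumed tweak, not in
    # the original run) would keep the class ratio identical in both splits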

    print(f"训练集大小: {X_train.shape[0]}")
    print(f"测试集大小: {X_test.shape[0]}")

    # 4. Train the LightGBM model
    print(f"\n{'=' * 50}")
    print("Starting LightGBM training (slimmed-down grid search)")
    print(f"{'=' * 50}")

    model, training_time = train_lightgbm_model(X_train, y_train, sample_weight_train)

    # 5. Evaluate the model
    results, report = evaluate_model(model, X_test, y_test)
    results["training_time"] = training_time

    # Print the report (all evaluation metrics)
    print(report)

    # Print timing information
    print(f"\n===== Timing =====")
    print(f"Training time: {results['training_time']:.2f}s")
    print(f"Prediction time: {results['prediction_time']:.2f}s")

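    # Persist the best model with joblib (the filename is illustrative; the run
    # log below reports "Model saved successfully!", so a save step like this is assumed)
    joblib.dump(model, "lgb_best_model.pkl")
    print("Model saved successfully!")
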
    print("建模流程完成!")
    spark.stop()
    # [Info] Best parameter combination:
    # [Info] min_child_samples: 50
    # [Info] num_leaves: 63
    # [Info] reg_alpha: 0.1
    # [Info] reg_lambda: 0
    # [Info] Best AUC score: 0.6631
    # [Info] Done: train_lightgbm_model | elapsed: 15272.61s
    # [Info] Model saved successfully!


if __name__ == "__main__":
    main()