导购APP的离线计算架构:基于Spark的用户画像与推荐模型训练

发布于:2025-09-15 ⋅ 阅读:(19) ⋅ 点赞:(0)

导购APP的离线计算架构:基于Spark的用户画像与推荐模型训练

大家好,我是阿可,微赚淘客系统及省赚客APP创始人,是个冬天不穿秋裤,天冷也要风度的程序猿!

导购APP的核心竞争力在于精准推荐——通过分析用户行为(浏览、收藏、下单)生成个性化商品推荐,可提升转化率30%以上。当用户量突破100万、日均行为数据达5000万条时,传统单机计算无法处理海量数据。基于此,我们构建Spark离线计算架构,实现用户画像标签生成与推荐模型训练,每日凌晨批量计算并更新推荐结果,将推荐点击率从8%提升至15%。以下从架构设计、用户画像实现、推荐模型训练三方面展开,附完整代码示例。
导购APP

一、离线计算整体架构

1.1 技术栈选型

针对导购APP的离线计算场景(高数据量、低实时性、周期性任务),选型如下:

  • 计算引擎:Spark 3.3.2(支持DataFrame API与MLlib机器学习库);
  • 数据存储:HDFS(原始数据存储)、Hive(数据仓库)、Redis(画像标签与推荐结果缓存);
  • 任务调度:Airflow 2.5.3(按日调度计算任务,支持依赖管理);
  • 模型管理:MLflow 2.3.1(模型版本控制与部署)。

1.2 数据流向

  1. 每日凌晨2点,Airflow触发计算任务;
  2. Spark从Hive读取用户行为数据(近30天)与商品基础数据;
  3. 第一步:计算用户标签(如“母婴用品偏好”“高频返利用户”);
  4. 第二步:训练协同过滤推荐模型,生成个性化商品推荐列表;
  5. 将用户画像标签与推荐结果写入Redis,供线上服务查询。

二、用户画像标签计算实现

用户画像以“标签-权重”形式表示用户偏好,如“女装:0.8”“数码:0.3”,权重越高表示偏好越强。

2.1 数据准备(Hive表结构)

-- 用户行为表(dwd.user_behavior)
CREATE TABLE dwd.user_behavior (
  user_id STRING COMMENT '用户ID',
  product_id STRING COMMENT '商品ID',
  category_id STRING COMMENT '商品类目ID',
  behavior_type STRING COMMENT '行为类型:view/collect/buy',
  behavior_time STRING COMMENT '行为时间',
  behavior_weight DOUBLE COMMENT '行为权重(view:1, collect:3, buy:5)'
)
PARTITIONED BY (dt STRING)
STORED AS PARQUET;

-- 商品类目表(dim.category)
CREATE TABLE dim.category (
  category_id STRING COMMENT '类目ID',
  category_name STRING COMMENT '类目名称',
  parent_id STRING COMMENT '父类目ID'
)
STORED AS PARQUET;

2.2 Spark计算用户类目偏好标签

package cn.juwatech.userportrait.job;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import java.util.Calendar;
import java.util.Date;

/**
 * 用户类目偏好标签计算任务(每日执行)
 */
public class UserCategoryPreferenceJob {

    public static void main(String[] args) {
        // 初始化SparkSession
        SparkSession spark = SparkSession.builder()
                .appName("UserCategoryPreferenceJob")
                .enableHiveSupport()
                .getOrCreate();

        // 日期参数(默认处理昨天数据,计算近30天偏好)
        String endDt = args.length > 0 ? args[0] : getYesterday();
        String startDt = getDateBefore(endDt, 30);

        // 1. 读取近30天用户行为数据
        Dataset<Row> behaviorDF = spark.sql(String.format(
                "SELECT user_id, category_id, behavior_weight " +
                "FROM dwd.user_behavior " +
                "WHERE dt >= '%s' AND dt <= '%s'", startDt, endDt
        ));

        // 2. 按用户+类目聚合,计算总权重(偏好分数)
        Dataset<Row> categoryScoreDF = behaviorDF
                .groupBy("user_id", "category_id")
                .agg(functions.sum("behavior_weight").alias("total_weight"));

        // 3. 归一化处理(将权重缩放到0-1范围)
        Dataset<Row> maxScoreDF = categoryScoreDF
                .groupBy("user_id")
                .agg(functions.max("total_weight").alias("max_weight"));

        Dataset<Row> normalizedDF = categoryScoreDF
                .join(maxScoreDF, "user_id")
                .withColumn("preference_score",
                        functions.col("total_weight").divide(functions.col("max_weight")))
                .select(
                        functions.col("user_id"),
                        functions.col("category_id"),
                        functions.round(functions.col("preference_score"), 2).alias("score")
                );

        // 4. 关联类目名称,生成最终标签
        Dataset<Row> tagDF = normalizedDF
                .join(spark.table("dim.category"), "category_id")
                .select(
                        functions.col("user_id"),
                        functions.col("category_name").alias("tag_name"),
                        functions.col("score")
                )
                .where(functions.col("score").gt(0.1)) // 过滤低权重标签
                .orderBy(functions.col("user_id"), functions.col("score").desc());

        // 5. 写入Redis(用户ID为key,标签列表为value)
        tagDF.foreachPartition(partition -> {
            // 初始化Redis连接(实际应使用连接池)
            RedisClient redisClient = new RedisClient("redis-host", 6379);
            while (partition.hasNext()) {
                Row row = partition.next();
                String userId = row.getString(0);
                String tagName = row.getString(1);
                double score = row.getDouble(2);
                // 存储格式:user:tag:{userId} -> 哈希表(tagName:score)
                redisClient.hset(
                        String.format("user:tag:%s", userId),
                        tagName,
                        String.valueOf(score)
                );
            }
            redisClient.close();
        });

        spark.stop();
    }

    // 获取昨天日期(yyyy-MM-dd)
    private static String getYesterday() {
        Calendar cal = Calendar.getInstance();
        cal.setTime(new Date());
        cal.add(Calendar.DATE, -1);
        return String.format("%d-%02d-%02d",
                cal.get(Calendar.YEAR),
                cal.get(Calendar.MONTH) + 1,
                cal.get(Calendar.DATE));
    }

    // 获取指定日期前n天的日期
    private static String getDateBefore(String date, int n) {
        // 实现日期计算逻辑(略)
        return date;
    }
}

2.3 用户价值标签计算

除偏好标签外,需计算用户价值标签(如“高价值用户”“流失风险用户”):

// 计算用户消费能力标签
Dataset<Row> userValueDF = spark.sql("SELECT user_id, sum(order_amount) as total_amount FROM dwd.order GROUP BY user_id");
Dataset<Row> valueTagDF = userValueDF
        .withColumn("value_tag",
                functions.when(functions.col("total_amount").gt(10000), "high_value")
                        .when(functions.col("total_amount").gt(1000), "medium_value")
                        .otherwise("low_value"))
        .select("user_id", "value_tag");

三、推荐模型训练与预测

基于Spark MLlib实现协同过滤推荐模型,根据用户历史行为预测商品偏好分数。

3.1 协同过滤模型训练

package cn.juwatech.recommend.job;

import org.apache.spark.ml.evaluation.RegressionEvaluator;
import org.apache.spark.ml.recommendation.ALS;
import org.apache.spark.ml.recommendation.ALSModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.functions;

/**
 * 基于ALS的商品推荐模型训练任务
 */
public class ProductRecommendationJob {

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("ProductRecommendationJob")
                .enableHiveSupport()
                .getOrCreate();

        // 1. 读取用户-商品行为数据,转换为评分矩阵
        Dataset<Row> ratingDF = spark.sql(
                "SELECT " +
                "  cast(user_id as integer) as user_id, " +
                "  cast(product_id as integer) as product_id, " +
                "  sum(behavior_weight) as rating " + // 行为权重作为评分
                "FROM dwd.user_behavior " +
                "WHERE dt >= date_sub(current_date(), 90) " + // 近90天数据
                "GROUP BY user_id, product_id"
        ).filter("rating > 0"); // 过滤无效评分

        // 2. 划分训练集与测试集(8:2)
        Dataset<Row>[] splits = ratingDF.randomSplit(new double[]{0.8, 0.2});
        Dataset<Row> trainingDF = splits[0];
        Dataset<Row> testDF = splits[1];

        // 3. 训练ALS模型
        ALS als = new ALS()
                .setMaxIter(10) // 迭代次数
                .setRank(20) // 特征维度
                .setRegParam(0.1) // 正则化参数
                .setUserCol("user_id")
                .setItemCol("product_id")
                .setRatingCol("rating")
                .setColdStartStrategy("drop"); // 冷启动策略:丢弃无评分用户/商品

        ALSModel model = als.fit(trainingDF);

        // 4. 模型评估(RMSE指标)
        Dataset<Row> predictions = model.transform(testDF);
        RegressionEvaluator evaluator = new RegressionEvaluator()
                .setMetricName("rmse")
                .setLabelCol("rating")
                .setPredictionCol("prediction");
        double rmse = evaluator.evaluate(predictions);
        System.out.println("Root-mean-square error = " + rmse);

        // 5. 为每个用户推荐10个商品
        Dataset<Row> userRecs = model.recommendForAllUsers(10);

        // 6. 处理推荐结果(提取商品ID列表)
        Dataset<Row> recommendationsDF = userRecs
                .withColumn("recommendations",
                        functions.col("recommendations")
                                .getField("product_id")
                                .cast(DataTypes.StringType))
                .select(
                        functions.col("user_id").cast(DataTypes.StringType).alias("user_id"),
                        functions.col("recommendations")
                );

        // 7. 写入Redis(用户ID为key,推荐商品ID列表为value)
        recommendationsDF.foreachPartition(partition -> {
            RedisClient redisClient = new RedisClient("redis-host", 6379);
            while (partition.hasNext()) {
                Row row = partition.next();
                String userId = row.getString(0);
                String productIds = row.getString(1);
                // 存储格式:user:rec:{userId} -> 商品ID列表(逗号分隔)
                redisClient.set(
                        String.format("user:rec:%s", userId),
                        productIds
                );
                // 设置过期时间(24小时)
                redisClient.expire(String.format("user:rec:%s", userId), 86400);
            }
            redisClient.close();
        });

        // 8. 保存模型到HDFS(供后续增量训练)
        model.save("hdfs:///models/als-recommender/" + System.currentTimeMillis());

        spark.stop();
    }
}

3.2 模型优化策略

  1. 特征工程优化:在评分矩阵中加入商品类目、价格区间等特征,使用ALS-WR模型提升精度;
  2. 冷启动处理:对新用户,基于注册信息(如性别、年龄)匹配相似用户的推荐列表;
  3. 增量训练:每日仅用新增数据更新模型,减少计算量(Spark MLlib支持模型加载与增量训练);
  4. 结果过滤:过滤用户已购买商品、低评分商品,提升推荐相关性。

四、任务调度与监控

4.1 Airflow任务调度(DAG定义)

# dags/user_portrait_and_recommend.py
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'start_date': datetime(2023, 1, 1),
    'email_on_failure': True,
    'email': ['data@juwatech.cn'],
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

# 定义DAG:每日凌晨2点执行,先运行用户画像任务,再运行推荐模型任务
dag = DAG(
    'user_portrait_and_recommend',
    default_args=default_args,
    schedule_interval='0 2 * * *',
    catchup=False
)

# 任务1:用户画像标签计算
user_tag_task = BashOperator(
    task_id='user_category_preference',
    bash_command='spark-submit --class cn.juwatech.userportrait.job.UserCategoryPreferenceJob '
                 '--master yarn --deploy-mode cluster '
                 '/opt/jobs/user-portrait-1.0.0.jar {{ ds }}',
    dag=dag
)

# 任务2:推荐模型训练
recommend_task = BashOperator(
    task_id='product_recommendation_train',
    bash_command='spark-submit --class cn.juwatech.recommend.job.ProductRecommendationJob '
                 '--master yarn --deploy-mode cluster '
                 '--executor-memory 8G --num-executors 10 '
                 '/opt/jobs/recommend-1.0.0.jar',
    dag=dag
)

# 任务依赖:先计算用户画像,再训练推荐模型
user_tag_task >> recommend_task

4.2 计算任务监控

  1. 资源监控:通过YARN ResourceManager监控Spark任务的内存、CPU使用情况,避免资源不足导致失败;
  2. 质量监控:对比每日推荐结果数量、用户覆盖率,低于阈值时触发告警;
  3. 效果监控:线上收集推荐点击率(CTR)、转化率(CVR),定期反馈模型优化方向。

本文著作权归聚娃科技省赚客app开发者团队,转载请注明出处!