导购APP的离线计算架构:基于Spark的用户画像与推荐模型训练
大家好,我是阿可,微赚淘客系统及省赚客APP创始人,是个冬天不穿秋裤,天冷也要风度的程序猿!
导购APP的核心竞争力在于精准推荐——通过分析用户行为(浏览、收藏、下单)生成个性化商品推荐,可提升转化率30%以上。当用户量突破100万、日均行为数据达5000万条时,传统单机计算无法处理海量数据。基于此,我们构建Spark离线计算架构,实现用户画像标签生成与推荐模型训练,每日凌晨批量计算并更新推荐结果,将推荐点击率从8%提升至15%。以下从架构设计、用户画像实现、推荐模型训练三方面展开,附完整代码示例。
一、离线计算整体架构
1.1 技术栈选型
针对导购APP的离线计算场景(高数据量、低实时性、周期性任务),选型如下:
- 计算引擎:Spark 3.3.2(支持DataFrame API与MLlib机器学习库);
- 数据存储:HDFS(原始数据存储)、Hive(数据仓库)、Redis(画像标签与推荐结果缓存);
- 任务调度:Airflow 2.5.3(按日调度计算任务,支持依赖管理);
- 模型管理:MLflow 2.3.1(模型版本控制与部署)。
1.2 数据流向
- 每日凌晨2点,Airflow触发计算任务;
- Spark从Hive读取用户行为数据(近30天)与商品基础数据;
- 第一步:计算用户标签(如“母婴用品偏好”“高频返利用户”);
- 第二步:训练协同过滤推荐模型,生成个性化商品推荐列表;
- 将用户画像标签与推荐结果写入Redis,供线上服务查询。
二、用户画像标签计算实现
用户画像以“标签-权重”形式表示用户偏好,如“女装:0.8”“数码:0.3”,权重越高表示偏好越强。
2.1 数据准备(Hive表结构)
-- 用户行为表(dwd.user_behavior)
CREATE TABLE dwd.user_behavior (
user_id STRING COMMENT '用户ID',
product_id STRING COMMENT '商品ID',
category_id STRING COMMENT '商品类目ID',
behavior_type STRING COMMENT '行为类型:view/collect/buy',
behavior_time STRING COMMENT '行为时间',
behavior_weight DOUBLE COMMENT '行为权重(view:1, collect:3, buy:5)'
)
PARTITIONED BY (dt STRING)
STORED AS PARQUET;
-- 商品类目表(dim.category)
CREATE TABLE dim.category (
category_id STRING COMMENT '类目ID',
category_name STRING COMMENT '类目名称',
parent_id STRING COMMENT '父类目ID'
)
STORED AS PARQUET;
2.2 Spark计算用户类目偏好标签
package cn.juwatech.userportrait.job;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import java.util.Calendar;
import java.util.Date;
/**
* 用户类目偏好标签计算任务(每日执行)
*/
public class UserCategoryPreferenceJob {
public static void main(String[] args) {
// 初始化SparkSession
SparkSession spark = SparkSession.builder()
.appName("UserCategoryPreferenceJob")
.enableHiveSupport()
.getOrCreate();
// 日期参数(默认处理昨天数据,计算近30天偏好)
String endDt = args.length > 0 ? args[0] : getYesterday();
String startDt = getDateBefore(endDt, 30);
// 1. 读取近30天用户行为数据
Dataset<Row> behaviorDF = spark.sql(String.format(
"SELECT user_id, category_id, behavior_weight " +
"FROM dwd.user_behavior " +
"WHERE dt >= '%s' AND dt <= '%s'", startDt, endDt
));
// 2. 按用户+类目聚合,计算总权重(偏好分数)
Dataset<Row> categoryScoreDF = behaviorDF
.groupBy("user_id", "category_id")
.agg(functions.sum("behavior_weight").alias("total_weight"));
// 3. 归一化处理(将权重缩放到0-1范围)
Dataset<Row> maxScoreDF = categoryScoreDF
.groupBy("user_id")
.agg(functions.max("total_weight").alias("max_weight"));
Dataset<Row> normalizedDF = categoryScoreDF
.join(maxScoreDF, "user_id")
.withColumn("preference_score",
functions.col("total_weight").divide(functions.col("max_weight")))
.select(
functions.col("user_id"),
functions.col("category_id"),
functions.round(functions.col("preference_score"), 2).alias("score")
);
// 4. 关联类目名称,生成最终标签
Dataset<Row> tagDF = normalizedDF
.join(spark.table("dim.category"), "category_id")
.select(
functions.col("user_id"),
functions.col("category_name").alias("tag_name"),
functions.col("score")
)
.where(functions.col("score").gt(0.1)) // 过滤低权重标签
.orderBy(functions.col("user_id"), functions.col("score").desc());
// 5. 写入Redis(用户ID为key,标签列表为value)
tagDF.foreachPartition(partition -> {
// 初始化Redis连接(实际应使用连接池)
RedisClient redisClient = new RedisClient("redis-host", 6379);
while (partition.hasNext()) {
Row row = partition.next();
String userId = row.getString(0);
String tagName = row.getString(1);
double score = row.getDouble(2);
// 存储格式:user:tag:{userId} -> 哈希表(tagName:score)
redisClient.hset(
String.format("user:tag:%s", userId),
tagName,
String.valueOf(score)
);
}
redisClient.close();
});
spark.stop();
}
// 获取昨天日期(yyyy-MM-dd)
private static String getYesterday() {
Calendar cal = Calendar.getInstance();
cal.setTime(new Date());
cal.add(Calendar.DATE, -1);
return String.format("%d-%02d-%02d",
cal.get(Calendar.YEAR),
cal.get(Calendar.MONTH) + 1,
cal.get(Calendar.DATE));
}
// 获取指定日期前n天的日期
private static String getDateBefore(String date, int n) {
// 实现日期计算逻辑(略)
return date;
}
}
2.3 用户价值标签计算
除偏好标签外,需计算用户价值标签(如“高价值用户”“流失风险用户”):
// 计算用户消费能力标签
Dataset<Row> userValueDF = spark.sql("SELECT user_id, sum(order_amount) as total_amount FROM dwd.order GROUP BY user_id");
Dataset<Row> valueTagDF = userValueDF
.withColumn("value_tag",
functions.when(functions.col("total_amount").gt(10000), "high_value")
.when(functions.col("total_amount").gt(1000), "medium_value")
.otherwise("low_value"))
.select("user_id", "value_tag");
三、推荐模型训练与预测
基于Spark MLlib实现协同过滤推荐模型,根据用户历史行为预测商品偏好分数。
3.1 协同过滤模型训练
package cn.juwatech.recommend.job;
import org.apache.spark.ml.evaluation.RegressionEvaluator;
import org.apache.spark.ml.recommendation.ALS;
import org.apache.spark.ml.recommendation.ALSModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.functions;
/**
* 基于ALS的商品推荐模型训练任务
*/
public class ProductRecommendationJob {
public static void main(String[] args) {
SparkSession spark = SparkSession.builder()
.appName("ProductRecommendationJob")
.enableHiveSupport()
.getOrCreate();
// 1. 读取用户-商品行为数据,转换为评分矩阵
Dataset<Row> ratingDF = spark.sql(
"SELECT " +
" cast(user_id as integer) as user_id, " +
" cast(product_id as integer) as product_id, " +
" sum(behavior_weight) as rating " + // 行为权重作为评分
"FROM dwd.user_behavior " +
"WHERE dt >= date_sub(current_date(), 90) " + // 近90天数据
"GROUP BY user_id, product_id"
).filter("rating > 0"); // 过滤无效评分
// 2. 划分训练集与测试集(8:2)
Dataset<Row>[] splits = ratingDF.randomSplit(new double[]{0.8, 0.2});
Dataset<Row> trainingDF = splits[0];
Dataset<Row> testDF = splits[1];
// 3. 训练ALS模型
ALS als = new ALS()
.setMaxIter(10) // 迭代次数
.setRank(20) // 特征维度
.setRegParam(0.1) // 正则化参数
.setUserCol("user_id")
.setItemCol("product_id")
.setRatingCol("rating")
.setColdStartStrategy("drop"); // 冷启动策略:丢弃无评分用户/商品
ALSModel model = als.fit(trainingDF);
// 4. 模型评估(RMSE指标)
Dataset<Row> predictions = model.transform(testDF);
RegressionEvaluator evaluator = new RegressionEvaluator()
.setMetricName("rmse")
.setLabelCol("rating")
.setPredictionCol("prediction");
double rmse = evaluator.evaluate(predictions);
System.out.println("Root-mean-square error = " + rmse);
// 5. 为每个用户推荐10个商品
Dataset<Row> userRecs = model.recommendForAllUsers(10);
// 6. 处理推荐结果(提取商品ID列表)
Dataset<Row> recommendationsDF = userRecs
.withColumn("recommendations",
functions.col("recommendations")
.getField("product_id")
.cast(DataTypes.StringType))
.select(
functions.col("user_id").cast(DataTypes.StringType).alias("user_id"),
functions.col("recommendations")
);
// 7. 写入Redis(用户ID为key,推荐商品ID列表为value)
recommendationsDF.foreachPartition(partition -> {
RedisClient redisClient = new RedisClient("redis-host", 6379);
while (partition.hasNext()) {
Row row = partition.next();
String userId = row.getString(0);
String productIds = row.getString(1);
// 存储格式:user:rec:{userId} -> 商品ID列表(逗号分隔)
redisClient.set(
String.format("user:rec:%s", userId),
productIds
);
// 设置过期时间(24小时)
redisClient.expire(String.format("user:rec:%s", userId), 86400);
}
redisClient.close();
});
// 8. 保存模型到HDFS(供后续增量训练)
model.save("hdfs:///models/als-recommender/" + System.currentTimeMillis());
spark.stop();
}
}
3.2 模型优化策略
- 特征工程优化:在评分矩阵中加入商品类目、价格区间等特征,使用ALS-WR模型提升精度;
- 冷启动处理:对新用户,基于注册信息(如性别、年龄)匹配相似用户的推荐列表;
- 增量训练:每日仅用新增数据更新模型,减少计算量(Spark MLlib支持模型加载与增量训练);
- 结果过滤:过滤用户已购买商品、低评分商品,提升推荐相关性。
四、任务调度与监控
4.1 Airflow任务调度(DAG定义)
# dags/user_portrait_and_recommend.py
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'data-team',
'depends_on_past': False,
'start_date': datetime(2023, 1, 1),
'email_on_failure': True,
'email': ['data@juwatech.cn'],
'retries': 1,
'retry_delay': timedelta(minutes=5)
}
# 定义DAG:每日凌晨2点执行,先运行用户画像任务,再运行推荐模型任务
dag = DAG(
'user_portrait_and_recommend',
default_args=default_args,
schedule_interval='0 2 * * *',
catchup=False
)
# 任务1:用户画像标签计算
user_tag_task = BashOperator(
task_id='user_category_preference',
bash_command='spark-submit --class cn.juwatech.userportrait.job.UserCategoryPreferenceJob '
'--master yarn --deploy-mode cluster '
'/opt/jobs/user-portrait-1.0.0.jar {{ ds }}',
dag=dag
)
# 任务2:推荐模型训练
recommend_task = BashOperator(
task_id='product_recommendation_train',
bash_command='spark-submit --class cn.juwatech.recommend.job.ProductRecommendationJob '
'--master yarn --deploy-mode cluster '
'--executor-memory 8G --num-executors 10 '
'/opt/jobs/recommend-1.0.0.jar',
dag=dag
)
# 任务依赖:先计算用户画像,再训练推荐模型
user_tag_task >> recommend_task
4.2 计算任务监控
- 资源监控:通过YARN ResourceManager监控Spark任务的内存、CPU使用情况,避免资源不足导致失败;
- 质量监控:对比每日推荐结果数量、用户覆盖率,低于阈值时触发告警;
- 效果监控:线上收集推荐点击率(CTR)、转化率(CVR),定期反馈模型优化方向。
本文著作权归聚娃科技省赚客app开发者团队,转载请注明出处!