基于Elasticsearch的短视频平台个性化推荐系统设计与实现

发布于:2025-07-07 ⋅ 阅读:(16) ⋅ 点赞:(0)

在当今内容爆炸的时代,个性化推荐系统已成为短视频平台的核心竞争力之一。本文将详细介绍如何利用Elasticsearch(ES)构建一个高效、可扩展的短视频个性化推荐系统。

一、系统架构概述

我们的推荐系统将采用混合推荐策略,结合协同过滤、内容相似度和热度推荐等多种方法。Elasticsearch作为核心搜索引擎和数据存储,将承担以下职责:

  1. 用户画像存储与查询
  2. 视频内容索引与检索
  3. 实时行为日志分析
  4. 推荐结果计算与排序

二、数据模型设计

1. 用户数据模型

{
  "mappings": {
    "properties": {
      "user_id": {"type": "keyword"},
      "age": {"type": "integer"},
      "gender": {"type": "keyword"},
      "location": {"type": "geo_point"},
      "interests": {"type": "keyword"},
      "watch_history": {
        "type": "nested",
        "properties": {
          "video_id": {"type": "keyword"},
          "watch_time": {"type": "date"},
          "duration": {"type": "float"},
          "interaction": {
            "type": "nested",
            "properties": {
              "type": {"type": "keyword"}, // like, share, comment, etc.
              "timestamp": {"type": "date"}
            }
          }
        }
      },
      "followers": {"type": "keyword"},
      "following": {"type": "keyword"},
      "created_at": {"type": "date"}
    }
  }
}

2. 视频数据模型

{
  "mappings": {
    "properties": {
      "video_id": {"type": "keyword"},
      "title": {
        "type": "text",
        "analyzer": "ik_max_word",
        "search_analyzer": "ik_smart"
      },
      "description": {
        "type": "text",
        "analyzer": "ik_max_word"
      },
      "tags": {"type": "keyword"},
      "category": {"type": "keyword"},
      "creator_id": {"type": "keyword"},
      "duration": {"type": "integer"},
      "created_at": {"type": "date"},
      "location": {"type": "geo_point"},
      "stats": {
        "properties": {
          "views": {"type": "integer"},
          "likes": {"type": "integer"},
          "shares": {"type": "integer"},
          "comments": {"type": "integer"},
          "watch_time_avg": {"type": "float"}
        }
      },
      "embedding": {
        "type": "dense_vector",
        "dims": 512
      }
    }
  }
}

三、核心推荐算法实现

1. 基于用户画像的内容推荐

from elasticsearch import Elasticsearch
from datetime import datetime, timedelta

es = Elasticsearch(["localhost:9200"])

def get_content_based_recommendations(user_id, size=10):
    # 获取用户画像
    user_profile = es.get(index="user_profiles", id=user_id)['_source']
    
    # 构建查询
    query = {
        "bool": {
            "should": [
                {"terms": {"tags": user_profile.get("interests", [])}},
                {"term": {"category": user_profile.get("primary_interest")}},
                {"geo_distance": {
                    "distance": "100km",
                    "location": user_profile.get("location")
                }}
            ],
            "must_not": [
                {"terms": {
                    "video_id": [h['video_id'] for h in user_profile.get('watch_history', [])]
                }}
            ]
        }
    }
    
    # 添加时间衰减因子 - 优先推荐新内容
    recency_script = {
        "script_score": {
            "script": {
                "source": """
                    double decay = 0.5;
                    double scale = 7;
                    double offset = 0;
                    double decayValue = decay * Math.exp(-Math.max(
                        doc['created_at'].value.toInstant().toEpochMilli() - params.now, 0) / scale);
                    return decayValue + _score;
                """,
                "params": {
                    "now": datetime.now().timestamp() * 1000
                }
            }
        }
    }
    
    response = es.search(
        index="videos",
        body={
            "query": {
                "function_score": {
                    "query": query,
                    "functions": [recency_script],
                    "score_mode": "sum"
                }
            },
            "size": size
        }
    )
    
    return [hit['_source'] for hit in response['hits']['hits']]

2. 基于协同过滤的相似用户推荐

def find_similar_users(user_id, size=5):
    # 获取目标用户观看历史
    target_user = es.get(index="user_profiles", id=user_id)['_source']
    target_videos = {h['video_id'] for h in target_user.get('watch_history', [])}
    
    # 查找观看过相同视频的用户
    query = {
        "bool": {
            "must": [
                {"nested": {
                    "path": "watch_history",
                    "query": {
                        "terms": {
                            "watch_history.video_id": list(target_videos)[:100]  # 限制数量防止查询过大
                        }
                    }
                }},
                {"range": {
                    "watch_history.watch_time": {
                        "gte": "now-30d/d"
                    }
                }}
            ],
            "must_not": [
                {"term": {"user_id": user_id}}
            ]
        }
    }
    
    # 使用脚本评分计算相似度
    script = {
        "script_score": {
            "script": {
                "source": """
                    double score = 0;
                    for (def item : params.target_videos) {
                        for (def wh : doc['watch_history']) {
                            if (wh.video_id == item) {
                                score += 1;
                                break;
                            }
                        }
                    }
                    return score;
                """,
                "params": {
                    "target_videos": list(target_videos)
                }
            }
        }
    }
    
    response = es.search(
        index="user_profiles",
        body={
            "query": {
                "function_score": {
                    "query": query,
                    "functions": [script],
                    "score_mode": "sum"
                }
            },
            "size": size
        }
    )
    
    return [hit['_source']['user_id'] for hit in response['hits']['hits']]

def get_collaborative_recommendations(user_id, size=10):
    similar_users = find_similar_users(user_id)
    
    # 获取相似用户观看但目标用户未观看的视频
    query = {
        "bool": {
            "must": [
                {"terms": {"creator_id": similar_users}},
                {"nested": {
                    "path": "watch_history",
                    "query": {
                        "terms": {
                            "watch_history.user_id": similar_users
                        }
                    }
                }}
            ],
            "must_not": [
                {"term": {"watch_history.user_id": user_id}}
            ]
        }
    }
    
    # 根据观看次数和互动率排序
    response = es.search(
        index="videos",
        body={
            "query": query,
            "sort": [
                {"stats.likes": {"order": "desc"}},
                {"stats.watch_time_avg": {"order": "desc"}}
            ],
            "size": size
        }
    )
    
    return [hit['_source'] for hit in response['hits']['hits']]

3. 基于向量相似度的深度推荐

import numpy as np
from sentence_transformers import SentenceTransformer

# 初始化模型
model = SentenceTransformer('paraphrase-multilingual-MiniLM-L12-v2')

def get_video_embeddings(video_ids):
    # 从ES获取视频文本内容
    response = es.mget(
        index="videos",
        body={"ids": video_ids}
    )
    
    videos = [doc['_source'] for doc in response['docs'] if doc['found']]
    texts = [
        f"{v['title']} {v['description']} {' '.join(v.get('tags', []))}"
        for v in videos
    ]
    
    # 生成嵌入向量
    embeddings = model.encode(texts, convert_to_tensor=False)
    
    # 更新ES中的视频嵌入
    for i, vid in enumerate(video_ids):
        es.update(
            index="videos",
            id=vid,
            body={"doc": {"embedding": embeddings[i].tolist()}}
        )
    
    return dict(zip(video_ids, embeddings))

def get_semantic_recommendations(user_id, size=10):
    # 获取用户最近观看的视频
    user = es.get(index="user_profiles", id=user_id)['_source']
    recent_watched = [
        h for h in sorted(
            user.get('watch_history', []),
            key=lambda x: x.get('watch_time', 0),
            reverse=True
        )[:5]
    ]
    
    if not recent_watched:
        return []
    
    # 获取这些视频的嵌入向量
    video_ids = [h['video_id'] for h in recent_watched]
    video_embeddings = get_video_embeddings(video_ids)
    
    # 计算平均用户兴趣向量
    user_vector = np.mean([video_embeddings[vid] for vid in video_ids], axis=0)
    
    # 在ES中搜索相似视频
    script_query = {
        "script_score": {
            "query": {"match_all": {}},
            "script": {
                "source": """
                    double similarity = cosineSimilarity(params.user_vector, 'embedding');
                    return similarity;
                """,
                "params": {
                    "user_vector": user_vector.tolist()
                }
            }
        }
    }
    
    response = es.search(
        index="videos",
        body={
            "query": script_query,
            "size": size,
            "_source": ["video_id", "title", "description", "tags"]
        }
    )
    
    return [hit['_source'] for hit in response['hits']['hits']]

四、混合推荐策略

def hybrid_recommendation(user_id, size=20):
    # 获取各种推荐结果
    content_based = get_content_based_recommendations(user_id, size//4)
    collaborative = get_collaborative_recommendations(user_id, size//4)
    semantic = get_semantic_recommendations(user_id, size//4)
    
    # 获取热门推荐作为补充
    popular = get_popular_videos(size//4)
    
    # 合并结果并去重
    all_recs = {}
    for rec_list in [content_based, collaborative, semantic, popular]:
        for rec in rec_list:
            vid = rec['video_id']
            if vid not in all_recs:
                all_recs[vid] = rec
    
    # 个性化排序
    ranked = personalize_ranking(user_id, list(all_recs.values()))
    
    return ranked[:size]

def personalize_ranking(user_id, recommendations):
    user = es.get(index="user_profiles", id=user_id)['_source']
    
    # 为每个推荐项计算个性化分数
    for rec in recommendations:
        score = 0
        
        # 内容匹配分数
        content_score = 0
        if 'interests' in user and 'tags' in rec:
            common_tags = set(user['interests']) & set(rec['tags'])
            content_score = len(common_tags) * 0.2
        
        # 创作者关注分数
        creator_score = 1 if rec['creator_id'] in user.get('following', []) else 0
        
        # 热度分数
        popularity_score = min(rec['stats']['likes'] / 1000, 5)
        
        # 时间衰减
        recency = (datetime.now() - datetime.fromisoformat(rec['created_at'])).days
        recency_score = max(0, 1 - recency / 30)
        
        # 综合分数
        rec['personal_score'] = (
            0.4 * content_score +
            0.3 * creator_score +
            0.2 * popularity_score +
            0.1 * recency_score
        )
    
    # 按分数排序
    return sorted(recommendations, key=lambda x: x['personal_score'], reverse=True)

def get_popular_videos(size=5, time_range="7d"):
    response = es.search(
        index="videos",
        body={
            "query": {
                "range": {
                    "created_at": {
                        "gte": f"now-{time_range}/d"
                    }
                }
            },
            "sort": [
                {"stats.likes": {"order": "desc"}},
                {"stats.views": {"order": "desc"}}
            ],
            "size": size
        }
    )
    return [hit['_source'] for hit in response['hits']['hits']]

五、实时反馈与模型更新

def log_user_interaction(user_id, video_id, interaction_type):
    # 记录用户交互
    timestamp = datetime.utcnow().isoformat()
    
    script = """
        if (ctx._source.watch_history == null) {
            ctx._source.watch_history = [];
        }
        
        boolean found = false;
        for (int i = 0; i < ctx._source.watch_history.size(); i++) {
            if (ctx._source.watch_history[i].video_id == params.video_id) {
                ctx._source.watch_history[i].last_watched = params.timestamp;
                if (params.interaction_type == 'watch') {
                    ctx._source.watch_history[i].watch_count += 1;
                } else {
                    if (ctx._source.watch_history[i].interactions == null) {
                        ctx._source.watch_history[i].interactions = [];
                    }
                    ctx._source.watch_history[i].interactions.add(
                        {
                            'type': params.interaction_type,
                            'timestamp': params.timestamp
                        }
                    );
                }
                found = true;
                break;
            }
        }
        
        if (!found && params.interaction_type == 'watch') {
            ctx._source.watch_history.add(
                {
                    'video_id': params.video_id,
                    'first_watched': params.timestamp,
                    'last_watched': params.timestamp,
                    'watch_count': 1,
                    'interactions': []
                }
            );
        }
    """
    
    es.update(
        index="user_profiles",
        id=user_id,
        body={
            "script": {
                "source": script,
                "lang": "painless",
                "params": {
                    "video_id": video_id,
                    "interaction_type": interaction_type,
                    "timestamp": timestamp
                }
            }
        }
    )
    
    # 更新视频统计
    if interaction_type in ['like', 'share', 'comment']:
        es.update(
            index="videos",
            id=video_id,
            body={
                "script": {
                    "source": f"ctx._source.stats.{interaction_type}s += 1",
                    "lang": "painless"
                }
            }
        )

六、性能优化与扩展

  1. 索引优化

    • 为常用查询字段设置合适的mapping类型
    • 使用index sorting预排序
    • 合理设置分片数和副本数
  2. 查询优化

    • 使用filter context缓存常用过滤条件
    • 合理使用bool查询的must/should/filter组合
    • 限制返回字段数量
  3. 缓存策略

    • 使用Redis缓存热门推荐结果
    • 实现用户推荐结果的短期缓存
    • 对向量相似度计算实现近似最近邻(ANN)搜索
  4. 扩展性考虑

    • 实现AB测试框架评估不同推荐策略
    • 设计插件式架构便于添加新的推荐算法
    • 考虑使用Elasticsearch的机器学习功能进行异常检测

七、总结

本文详细介绍了基于Elasticsearch构建短视频平台个性化推荐系统的完整方案。通过结合内容推荐、协同过滤和语义向量相似度等多种技术,我们能够为用户提供精准的个性化内容推荐。Elasticsearch的强大搜索和分析能力使其成为构建推荐系统的理想选择。

实际应用中,还需要考虑以下方面:

  • 冷启动问题的解决方案
  • 推荐多样性与惊喜度的平衡
  • 实时推荐与批量推荐的结合
  • 推荐结果的解释性

网站公告

今日签到

点亮在社区的每一天
去签到