基于用户的协同过滤推荐系统实战项目

发布于:2025-04-19 ⋅ 阅读:(25) ⋅ 点赞:(0)

基于用户的协同过滤推荐系统实战项目

1. 推荐系统基础理论

1.1 协同过滤概述

协同过滤(Collaborative Filtering, CF)是推荐系统中最经典、应用最广泛的技术之一。其核心思想是利用群体的智慧来进行推荐,基于"相似的用户喜欢相似的物品"或"喜欢某物品的用户也喜欢其他相似物品"的假设。

协同过滤主要分为两大类:

  • 基于记忆的协同过滤(Memory-based CF):直接使用用户-物品交互数据进行推荐,包括基于用户的协同过滤(User-based CF)和基于物品的协同过滤(Item-based CF)。
  • 基于模型的协同过滤(Model-based CF):通过机器学习算法从数据中学习模型,如矩阵分解(Matrix Factorization)、奇异值分解(SVD)等。

本项目主要聚焦于基于用户的协同过滤。

1.2 基于用户的协同过滤原理

基于用户的协同过滤的工作流程如下:

  1. 构建用户-物品评分矩阵:每行代表一个用户,每列代表一个物品,矩阵中的元素表示用户对物品的评分。
  2. 计算用户相似度:寻找与目标用户具有相似品味的用户群体。
  3. 预测评分:基于相似用户的评分,预测目标用户对未评分物品的可能评分。
  4. 生成推荐:为用户推荐评分最高的未接触物品。

1.3 相似度计算方法

在协同过滤中,常用的相似度计算方法包括:

1.3.1 余弦相似度(Cosine Similarity)

余弦相似度计算两个向量之间夹角的余弦值,范围从-1到1,值越大表示越相似。

对于用户 u u u和用户 v v v,其余弦相似度计算公式为:

s i m c o s ( u , v ) = ∑ i ∈ I u v r u i ⋅ r v i ∑ i ∈ I u r u i 2 ⋅ ∑ i ∈ I v r v i 2 sim_{cos}(u, v) = \frac{\sum_{i \in I_{uv}} r_{ui} \cdot r_{vi}}{\sqrt{\sum_{i \in I_{u}} r_{ui}^2} \cdot \sqrt{\sum_{i \in I_{v}} r_{vi}^2}} simcos(u,v)=iIurui2 iIvrvi2 iIuvruirvi

其中:

  • I u v I_{uv} Iuv是用户 u u u和用户 v v v共同评分的物品集合
  • r u i r_{ui} rui是用户 u u u对物品 i i i的评分
  • r v i r_{vi} rvi是用户 v v v对物品 i i i的评分
  • I u I_u Iu是用户 u u u评分的所有物品集合
  • I v I_v Iv是用户 v v v评分的所有物品集合

在Python中,可以使用sklearn.metrics.pairwise中的cosine_similarity1 - pairwise_distances(X, metric='cosine')计算余弦相似度。

1.3.2 皮尔逊相关系数(Pearson Correlation)

皮尔逊相关系数衡量两个变量之间的线性相关性,范围从-1到1,也称为"皮尔逊积矩相关系数"。

计算公式为:

s i m p e a r s o n ( u , v ) = ∑ i ∈ I u v ( r u i − r ˉ u ) ⋅ ( r v i − r ˉ v ) ∑ i ∈ I u v ( r u i − r ˉ u ) 2 ⋅ ∑ i ∈ I u v ( r v i − r ˉ v ) 2 sim_{pearson}(u, v) = \frac{\sum_{i \in I_{uv}} (r_{ui} - \bar{r}_u) \cdot (r_{vi} - \bar{r}_v)}{\sqrt{\sum_{i \in I_{uv}} (r_{ui} - \bar{r}_u)^2} \cdot \sqrt{\sum_{i \in I_{uv}} (r_{vi} - \bar{r}_v)^2}} simpearson(u,v)=iIuv(ruirˉu)2 iIuv(rvirˉv)2 iIuv(ruirˉu)(rvirˉv)

其中:

  • r ˉ u \bar{r}_u rˉu是用户 u u u的平均评分
  • r ˉ v \bar{r}_v rˉv是用户 v v v的平均评分

皮尔逊相关系数考虑了用户评分的偏置(bias),能更好地处理用户评分标准不同的情况(有些用户倾向于给高分,有些用户倾向于给低分)。

在Python中,可以使用numpy.corrcoef()计算皮尔逊相关系数。

1.3.3 欧几里得距离(Euclidean Distance)

欧几里得距离直接计算两个向量在空间中的距离,距离越小表示越相似。

计算公式为:

d i s t a n c e e u c l i d e a n ( u , v ) = ∑ i ∈ I u v ( r u i − r v i ) 2 distance_{euclidean}(u, v) = \sqrt{\sum_{i \in I_{uv}} (r_{ui} - r_{vi})^2} distanceeuclidean(u,v)=iIuv(ruirvi)2

为了将距离转换为相似度,通常使用以下变换:

s i m e u c l i d e a n ( u , v ) = 1 1 + d i s t a n c e e u c l i d e a n ( u , v ) sim_{euclidean}(u, v) = \frac{1}{1 + distance_{euclidean}(u, v)} simeuclidean(u,v)=1+distanceeuclidean(u,v)1

在Python中,可以使用sklearn.metrics.pairwise中的euclidean_distances计算欧几里得距离。

1.3.4 调整余弦相似度(Adjusted Cosine Similarity)

调整余弦相似度在计算前先减去用户的平均评分,解决了用户评分标准不一致的问题:

s i m a d j _ c o s ( u , v ) = ∑ i ∈ I u v ( r u i − r ˉ u ) ⋅ ( r v i − r ˉ v ) ∑ i ∈ I u v ( r u i − r ˉ u ) 2 ⋅ ∑ i ∈ I u v ( r v i − r ˉ v ) 2 sim_{adj\_cos}(u, v) = \frac{\sum_{i \in I_{uv}} (r_{ui} - \bar{r}_u) \cdot (r_{vi} - \bar{r}_v)}{\sqrt{\sum_{i \in I_{uv}} (r_{ui} - \bar{r}_u)^2} \cdot \sqrt{\sum_{i \in I_{uv}} (r_{vi} - \bar{r}_v)^2}} simadj_cos(u,v)=iIuv(ruirˉu)2 iIuv(rvirˉv)2 iIuv(ruirˉu)(rvirˉv)

1.4 评分预测方法

在确定用户相似度后,需要预测目标用户对未评分物品的可能评分。传统的基于用户的协同过滤通常采用加权平均的方式进行预测。

1.4.1 简单加权平均

r ^ u i = ∑ v ∈ N u ( i ) s i m ( u , v ) ⋅ r v i ∑ v ∈ N u ( i ) ∣ s i m ( u , v ) ∣ \hat{r}_{ui} = \frac{\sum_{v \in N_u(i)} sim(u, v) \cdot r_{vi}}{\sum_{v \in N_u(i)} |sim(u, v)|} r^ui=vNu(i)sim(u,v)vNu(i)sim(u,v)rvi

其中:

  • r ^ u i \hat{r}_{ui} r^ui是预测的用户 u u u对物品 i i i的评分
  • N u ( i ) N_u(i) Nu(i)是与用户 u u u相似且评价过物品 i i i的用户集合
  • s i m ( u , v ) sim(u, v) sim(u,v)是用户 u u u与用户 v v v的相似度
  • r v i r_{vi} rvi是用户 v v v对物品 i i i的实际评分
1.4.2 考虑用户评分偏置的加权平均

为了解决不同用户评分标准不同的问题,可以使用考虑评分偏置的改进公式:

r ^ u i = r ˉ u + ∑ v ∈ N u ( i ) s i m ( u , v ) ⋅ ( r v i − r ˉ v ) ∑ v ∈ N u ( i ) ∣ s i m ( u , v ) ∣ \hat{r}_{ui} = \bar{r}_u + \frac{\sum_{v \in N_u(i)} sim(u, v) \cdot (r_{vi} - \bar{r}_v)}{\sum_{v \in N_u(i)} |sim(u, v)|} r^ui=rˉu+vNu(i)sim(u,v)vNu(i)sim(u,v)(rvirˉv)

其中:

  • r ˉ u \bar{r}_u rˉu是用户 u u u的平均评分
  • r ˉ v \bar{r}_v rˉv是用户 v v v的平均评分

这种方法不直接使用原始评分,而是使用评分与用户平均评分的偏差,能够更好地处理用户评分偏好不同的情况。

1.5 评估指标

推荐系统的性能评估通常使用以下指标:

1.5.1 均方根误差(RMSE)

RMSE是预测评分与实际评分之间差异的平方平均的平方根,值越小表示预测越准确。

R M S E = 1 ∣ T ∣ ∑ ( u , i ) ∈ T ( r ^ u i − r u i ) 2 RMSE = \sqrt{\frac{1}{|T|} \sum_{(u,i) \in T} (\hat{r}_{ui} - r_{ui})^2} RMSE=T1(u,i)T(r^uirui)2

其中:

  • T T T是测试集中的用户-物品对集合
  • r ^ u i \hat{r}_{ui} r^ui是预测的评分
  • r u i r_{ui} rui是实际评分
1.5.2 平均绝对误差(MAE)

MAE是预测评分与实际评分之间绝对差值的平均,同样值越小表示预测越准确。

M A E = 1 ∣ T ∣ ∑ ( u , i ) ∈ T ∣ r ^ u i − r u i ∣ MAE = \frac{1}{|T|} \sum_{(u,i) \in T} |\hat{r}_{ui} - r_{ui}| MAE=T1(u,i)Tr^uirui

与RMSE相比,MAE对异常值的敏感度较低,因此两个指标通常一起使用,以全面评估推荐系统的性能。

1.6 TopN推荐

在实际应用中,我们通常不仅关注评分预测的准确度,还关注能否为用户推荐最适合的N个物品,称为TopN推荐。为此,可以使用额外的评估指标:

1.6.1 精确率(Precision)和召回率(Recall)
  • 精确率:推荐的物品中实际相关的比例

    P r e c i s i o n @ k = ∣ 推荐列表 ∩ 相关物品 ∣ ∣ 推荐列表 ∣ Precision@k = \frac{|推荐列表 \cap 相关物品|}{|推荐列表|} Precision@k=推荐列表推荐列表相关物品

  • 召回率:实际相关物品中被成功推荐的比例

    R e c a l l @ k = ∣ 推荐列表 ∩ 相关物品 ∣ ∣ 相关物品 ∣ Recall@k = \frac{|推荐列表 \cap 相关物品|}{|相关物品|} Recall@k=相关物品推荐列表相关物品

1.6.2 F1分数

F1分数是精确率和召回率的调和平均数,综合考虑两个指标。

F 1 = 2 ⋅ P r e c i s i o n ⋅ R e c a l l P r e c i s i o n + R e c a l l F1 = 2 \cdot \frac{Precision \cdot Recall}{Precision + Recall} F1=2Precision+RecallPrecisionRecall

2. 项目介绍

协同过滤是推荐系统的核心算法之一,它基于用户行为数据来推荐物品。基于用户的协同过滤(User-Based Collaborative Filtering)算法通过寻找与目标用户相似的用户群体,然后推荐这些相似用户喜欢但目标用户尚未接触的物品。gitcode

本项目基于以下经典论文:

  • Resnick P, Iacovou N, Suchak M, et al. Grouplens: An open architecture for collaborative filtering of netnews[C]//Proceedings of the 1994 ACM conference on Computer supported cooperative work. 1994: 175-186. https://doi.org/10.1145/192844.192905
  • Breese J S, Heckerman D, Kadie C. Empirical analysis of predictive algorithms for collaborative filtering[J]. arXiv preprint arXiv:1301.7363, 2013. https://doi.org/10.48550/arXiv.1301.7363

2.1 数据集介绍

本项目使用经典的MovieLens 100K数据集,该数据集包含:

  • 943个用户
  • 1682部电影
  • 10万条评分数据(1-5分)
  • 用户的人口统计学特征
  • 电影的类型信息

2.2 项目架构设计

为了构建一个可维护、可扩展的推荐系统,本项目采用模块化设计,将核心功能按照职责分散到不同模块中。

2.2.1 模块化架构

项目由以下核心模块组成:

  1. 数据加载模块(data_loader.py):负责数据读取、预处理和矩阵构建
  2. 相似度计算模块(similarity.py):实现多种相似度计算方法
  3. 评分预测模块(prediction.py):实现评分预测算法
  4. 推荐生成模块(recommendation.py):生成个性化推荐列表
  5. 评估模块(evaluation.py):实现多种评估指标计算
  6. 日志模块(logger.py):管理系统日志记录
  7. 主模型模块(model.py):集成各个组件,提供统一接口
  8. 命令行接口(main.py):提供命令行交互功能
  9. Web应用(app.py):提供Web界面

3. 项目实施步骤

3.1 数据获取与探索

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import mean_squared_error
from sklearn.metrics.pairwise import pairwise_distances
from scipy.spatial.distance import cosine
import warnings
warnings.filterwarnings('ignore')

# 设置随机种子保证结果可重复
np.random.seed(42)

# 加载数据集
# 从 https://grouplens.org/datasets/movielens/100k/ 下载
# 或使用以下代码自动下载
!wget -nc http://files.grouplens.org/datasets/movielens/ml-100k.zip
!unzip -n ml-100k.zip

# 加载用户评分数据
column_names = ['user_id', 'item_id', 'rating', 'timestamp']
df = pd.read_csv('ml-100k/u.data', sep='\t', names=column_names)

# 加载电影信息
movies_column_names = ['movie_id', 'movie_title', 'release_date', 'video_release_date',
                      'IMDb_URL', 'unknown', 'Action', 'Adventure', 'Animation',
                      'Children', 'Comedy', 'Crime', 'Documentary', 'Drama', 'Fantasy',
                      'Film-Noir', 'Horror', 'Musical', 'Mystery', 'Romance', 'Sci-Fi',
                      'Thriller', 'War', 'Western']
movies_df = pd.read_csv('ml-100k/u.item', sep='|', names=movies_column_names, encoding='latin-1')

# 查看数据集基本信息
print(f"评分数据集形状: {df.shape}")
print(f"电影数据集形状: {movies_df.shape}")
print("\n评分数据预览:")
print(df.head())
print("\n电影数据预览:")
print(movies_df[['movie_id', 'movie_title', 'release_date']].head())

在这里插入图片描述

3.2 数据加载模块实现

我们将数据加载和处理功能封装在DataLoader类中,提供以下功能:

  1. 从文件加载数据
  2. 构建用户-物品评分矩阵
  3. 划分训练集和测试集
  4. 计算数据集统计信息
class DataLoader:
    """
    负责加载和预处理MovieLens数据集的类
    """
    def __init__(self, data_path="data/ml-100k"):
        """初始化DataLoader并设置数据路径"""
        self.data_path = data_path
        self.ratings_df = None
        self.movies_df = None
        self.n_users = None
        self.n_items = None
        self.ratings_matrix = None
        self.train_data = None
        self.test_data = None
        
    def load_data(self):
        """加载评分数据和电影数据"""
        # 检查数据路径
        if not os.path.exists(self.data_path):
            raise FileNotFoundError(f"数据目录 {self.data_path} 不存在")
        
        # 加载评分数据
        ratings_file = os.path.join(self.data_path, "u.data")
        if not os.path.exists(ratings_file):
            raise FileNotFoundError(f"评分文件 {ratings_file} 不存在")
            
        column_names = ["user_id", "item_id", "rating", "timestamp"]
        self.ratings_df = pd.read_csv(ratings_file, sep="\t", names=column_names)
        
        # 加载电影数据
        movies_file = os.path.join(self.data_path, "u.item")
        if not os.path.exists(movies_file):
            raise FileNotFoundError(f"电影文件 {movies_file} 不存在")
            
        movies_column_names = [
            "movie_id", "movie_title", "release_date", "video_release_date",
            "IMDb_URL", "unknown", "Action", "Adventure", "Animation",
            "Children", "Comedy", "Crime", "Documentary", "Drama", "Fantasy",
            "Film-Noir", "Horror", "Musical", "Mystery", "Romance", "Sci-Fi",
            "Thriller", "War", "Western"
        ]
        self.movies_df = pd.read_csv(
            movies_file, sep="|", names=movies_column_names, encoding="latin-1"
        )
        
        # 获取用户和电影数量
        self.n_users = self.ratings_df["user_id"].max()
        self.n_items = self.ratings_df["item_id"].max()
        
        return self.ratings_df, self.movies_df

3.3 构建评分矩阵和数据集拆分

def create_matrix(self):
    """创建用户-物品评分矩阵"""
    if self.ratings_df is None:
        self.load_data()
        
    # 创建矩阵
    self.ratings_matrix = np.zeros((self.n_users, self.n_items))
    
    # 填充评分
    for row in self.ratings_df.itertuples():
        # 调整为0-based索引
        self.ratings_matrix[row.user_id-1, row.item_id-1] = row.rating
        
    return self.ratings_matrix
    
def split_data(self, test_size=0.2, random_state=42):
    """将数据划分为训练集和测试集"""
    if self.ratings_df is None:
        self.load_data()
        
    from sklearn.model_selection import train_test_split
    
    # 划分数据
    self.train_data, self.test_data = train_test_split(
        self.ratings_df, test_size=test_size, random_state=random_state
    )
    
    # 创建训练集矩阵
    self.train_matrix = np.zeros((self.n_users, self.n_items))
    for row in self.train_data.itertuples():
        self.train_matrix[row.user_id-1, row.item_id-1] = row.rating
        
    # 创建测试集矩阵
    self.test_matrix = np.zeros((self.n_users, self.n_items))
    for row in self.test_data.itertuples():
        self.test_matrix[row.user_id-1, row.item_id-1] = row.rating
        
    return self.train_data, self.test_data, self.train_matrix, self.test_matrix

3.4 日志系统设计

为了跟踪系统运行状态、性能和错误,我们实现了一个灵活的日志模块:

import os
import logging
from logging.handlers import RotatingFileHandler
import time

def setup_logger(log_dir="results/logs", log_level=logging.INFO, silent=False):
    """
    配置并返回推荐系统的日志记录器
    
    参数:
    log_dir: 日志文件存储目录
    log_level: 日志级别
    silent: 是否静默模式(不记录日志)
    
    返回:
    logger: 配置好的日志记录器
    """
    # 创建日志记录器
    logger = logging.getLogger("recommendation_system")
    logger.setLevel(log_level)
    
    # 清除已有的处理器(避免重复日志)
    if logger.handlers:
        logger.handlers = []
        
    if silent:
        # 添加NullHandler以防止"找不到处理器"警告
        logger.addHandler(logging.NullHandler())
    else:
        # 创建日志目录
        os.makedirs(log_dir, exist_ok=True)
        
        # 使用时间戳创建唯一的日志文件名
        timestamp = time.strftime("%Y%m%d-%H%M%S")
        log_file = os.path.join(log_dir, f"recommender_{timestamp}.log")
        
        # 创建文件处理器(每文件最大10MB,保留5个备份)
        file_handler = RotatingFileHandler(
            log_file, maxBytes=10*1024*1024, backupCount=5, encoding="utf-8"
        )
        file_handler.setLevel(log_level)
        
        # 创建控制台处理器
        console_handler = logging.StreamHandler()
        console_handler.setLevel(log_level)
        
        # 创建格式化器并添加到处理器
        formatter = logging.Formatter(
            "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
        )
        file_handler.setFormatter(formatter)
        console_handler.setFormatter(formatter)
        
        # 将处理器添加到记录器
        logger.addHandler(file_handler)
        logger.addHandler(console_handler)
        
    return logger

def get_logger():
    """获取推荐系统日志记录器"""
    logger = logging.getLogger("recommendation_system")
    # 如果记录器没有处理器,设置一个默认的静默记录器
    if not logger.handlers:
        return setup_logger(silent=True)
    return logger

3.5 相似度计算模块

def calculate_similarity(ratings_matrix, method="cosine"):
    """
    计算用户相似度矩阵
    
    参数:
    ratings_matrix: 用户-物品评分矩阵
    method: 相似度计算方法('cosine', 'pearson', 'euclidean', 'adjusted_cosine')
    
    返回:
    similarity: 用户相似度矩阵
    """
    if method == "cosine":
        return cosine_similarity(ratings_matrix)
    elif method == "pearson":
        return pearson_similarity(ratings_matrix)
    elif method == "euclidean":
        return euclidean_similarity(ratings_matrix)
    elif method == "adjusted_cosine":
        return adjusted_cosine_similarity(ratings_matrix)
    else:
        raise ValueError(f"未知的相似度计算方法: {method}")

每种相似度计算方法的具体实现:

def cosine_similarity(ratings_matrix):
    """计算余弦相似度"""
    similarity = 1 - pairwise_distances(ratings_matrix, metric="cosine", n_jobs=-1)
    similarity = np.nan_to_num(similarity)  # 处理NaN值
    return similarity

def pearson_similarity(ratings_matrix):
    """计算皮尔逊相关系数"""
    similarity = np.corrcoef(ratings_matrix)
    similarity = np.nan_to_num(similarity)  # 处理NaN值
    return similarity

def euclidean_similarity(ratings_matrix):
    """计算基于欧几里得距离的相似度"""
    distances = pairwise_distances(ratings_matrix, metric="euclidean", n_jobs=-1)
    similarity = 1 / (1 + distances)  # 转换距离为相似度
    return similarity

def adjusted_cosine_similarity(ratings_matrix):
    """计算调整后的余弦相似度"""
    # 获取非零元素以计算平均值
    rated_mask = ratings_matrix != 0
    
    # 计算用户平均评分(仅考虑已评分项目)
    user_means = np.sum(ratings_matrix, axis=1) / np.sum(rated_mask, axis=1)
    
    # 通过减去用户平均值归一化评分(仅对已评分项目)
    normalized_matrix = np.zeros_like(ratings_matrix)
    for i, (user_ratings, mask, mean) in enumerate(zip(ratings_matrix, rated_mask, user_means)):
        normalized_matrix[i, mask] = user_ratings[mask] - mean
        
    # 计算余弦相似度
    similarity = 1 - pairwise_distances(normalized_matrix, metric="cosine", n_jobs=-1)
    
    # 处理NaN值
    similarity = np.nan_to_num(similarity)
    
    return similarity

3.6 评分预测模块

def predict_ratings(ratings_matrix, similarity_matrix, method="bias_weighted", k=10):
    """
    预测用户对物品的评分
    
    参数:
    ratings_matrix: 用户-物品评分矩阵
    similarity_matrix: 用户相似度矩阵
    method: 预测方法('simple_weighted', 'bias_weighted')
    k: 考虑的近邻数量
    
    返回:
    predicted_ratings: 预测评分矩阵
    """
    if method == "simple_weighted":
        return simple_weighted_average(ratings_matrix, similarity_matrix, k)
    elif method == "bias_weighted":
        return bias_weighted_average(ratings_matrix, similarity_matrix, k)
    else:
        raise ValueError(f"未知的预测方法: {method}")

def bias_weighted_average(ratings_matrix, similarity_matrix, k=10):
    """使用考虑用户偏置的加权平均预测评分"""
    n_users, n_items = ratings_matrix.shape
    predicted_ratings = np.zeros((n_users, n_items))
    
    # 计算用户平均评分
    user_rated_mask = ratings_matrix > 0
    user_ratings_count = np.sum(user_rated_mask, axis=1)
    user_ratings_sum = np.sum(ratings_matrix, axis=1)
    
    # 处理除零问题
    user_mean_ratings = np.where(
        user_ratings_count > 0, user_ratings_sum / user_ratings_count, 0
    )
    
    # 为每个用户预测评分
    for u in range(n_users):
        # 找到k个最相似用户(不包括自己)
        user_similarities = similarity_matrix[u]
        user_similarities[u] = -1  # 排除自己
        similar_users = np.argsort(user_similarities)[::-1][:k]
        
        # 用户的平均评分
        u_mean = user_mean_ratings[u]
        
        # 为每个物品预测评分
        for i in range(n_items):
            # 如果用户已经评分,保留原评分
            if ratings_matrix[u, i] > 0:
                predicted_ratings[u, i] = ratings_matrix[u, i]
                continue
                
            # 获取评价过该物品的相似用户
            sim_users_rated = [v for v in similar_users if ratings_matrix[v, i] > 0]
            
            # 如果没有相似用户评价过,使用用户平均评分
            if len(sim_users_rated) == 0:
                predicted_ratings[u, i] = u_mean if u_mean > 0 else np.mean(ratings_matrix[ratings_matrix > 0])
                continue
                
            # 计算考虑偏置的加权平均
            sim_sum = sum(abs(similarity_matrix[u, v]) for v in sim_users_rated)
            if sim_sum == 0:
                predicted_ratings[u, i] = u_mean
                continue
                
            weighted_sum = sum(
                similarity_matrix[u, v] * (ratings_matrix[v, i] - user_mean_ratings[v])
                for v in sim_users_rated
            )
            
            predicted_rating = u_mean + weighted_sum / sim_sum
            
            # 将预测评分限制在有效范围[1,5]
            predicted_ratings[u, i] = max(1, min(5, predicted_rating))
            
    return predicted_ratings

3.7 推荐生成模块

def recommend_items(user_id, ratings_matrix, predicted_ratings, movies_df, top_n=10):
    """
    为特定用户生成电影推荐
    
    参数:
    user_id: 用户ID(从1开始)
    ratings_matrix: 用户-物品评分矩阵
    predicted_ratings: 预测评分矩阵
    movies_df: 电影信息DataFrame
    top_n: 推荐数量
    
    返回:
    recommendations: 包含推荐电影的DataFrame
    """
    # 调整为0-based索引
    user_idx = user_id - 1
    
    # 获取用户评分
    user_ratings = ratings_matrix[user_idx]
    user_predictions = predicted_ratings[user_idx]
    
    # 找出用户未评分的物品
    unrated_items = np.where(user_ratings == 0)[0]
    
    # 如果用户已评分所有物品,返回空DataFrame
    if len(unrated_items) == 0:
        return pd.DataFrame(columns=["movie_id", "movie_title", "predicted_rating"])
        
    # 获取未评分物品的预测评分
    unrated_predictions = user_predictions[unrated_items]
    
    # 按预测评分降序排序
    sorted_indices = np.argsort(-unrated_predictions)
    
    # 获取top_n推荐
    top_item_indices = sorted_indices[:top_n]
    top_items = unrated_items[top_item_indices]
    top_ratings = unrated_predictions[top_item_indices]
    
    # 转换为1-based电影ID
    movie_ids = top_items + 1
    
    # 创建推荐DataFrame
    recommendations = pd.DataFrame({
        "movie_id": movie_ids,
        "predicted_rating": top_ratings
    })
    
    # 合并电影信息
    recommendations = recommendations.merge(
        movies_df[["movie_id", "movie_title"]], on="movie_id"
    )
    
    # 按预测评分降序排序
    recommendations = recommendations.sort_values("predicted_rating", ascending=False)
    
    return recommendations

在这里插入图片描述
在这里插入图片描述

3.8 评估模块

def evaluate_recommendations(test_data, predicted_ratings, ratings_matrix=None, k_values=[5, 10], threshold=3.5):
    """
    对推荐系统性能进行全面评估
    
    参数:
    test_data: 测试集DataFrame
    predicted_ratings: 预测评分矩阵
    ratings_matrix: 原始评分矩阵
    k_values: 评估的k值列表
    threshold: 判定物品相关性的评分阈值
    
    返回:
    results: 包含各项评估指标的字典
    """
    # 评估评分预测
    rating_metrics = evaluate_rating_predictions(test_data, predicted_ratings)
    
    # 初始化结果字典
    results = rating_metrics.copy()
    
    # 评估top-k推荐
    for k in k_values:
        pr_metrics = calculate_precision_recall_at_k(
            test_data, predicted_ratings, ratings_matrix=ratings_matrix, k=k, threshold=threshold
        )
        
        precision_key = f"precision@{k}"
        recall_key = f"recall@{k}"
        
        results[precision_key] = pr_metrics[precision_key]
        results[recall_key] = pr_metrics[recall_key]
        
        # 计算F1分数
        precision = pr_metrics[precision_key]
        recall = pr_metrics[recall_key]
        f1 = calculate_f1_score(precision, recall)
        results[f"f1@{k}"] = f1
        
    return results

3.9 完整推荐系统模型

class UserBasedCF:
    """
    基于用户的协同过滤推荐系统
    
    这个类实现了一个完整的推荐系统,使用基于用户的协同过滤算法。
    它通过识别具有相似评分模式的用户,并推荐这些相似用户喜欢但目标用户尚未体验的物品。
    """
    
    def __init__(self, similarity_method="cosine", prediction_method="bias_weighted", k=30):
        """
        初始化推荐系统模型
        
        参数:
        similarity_method: 计算用户相似度的方法
        prediction_method: 预测评分的方法
        k: 考虑的相似用户数量
        """
        self.logger = get_logger()
        self.logger.info(
            f"初始化UserBasedCF模型,相似度方法={similarity_method},"
            f"预测方法={prediction_method},k={k}"
        )
        
        self.similarity_method = similarity_method
        self.prediction_method = prediction_method
        self.k = k
        
        # 初始化数据属性
        self.ratings_df = None
        self.movies_df = None
        self.train_data = None
        self.test_data = None
        self.ratings_matrix = None
        self.user_similarity = None
        self.predicted_ratings = None
        self.n_users = None
        self.n_items = None
        
        # 跟踪模型是否已训练
        self.is_trained = False
        
    def fit(self, ratings_df, movies_df=None, test_size=0.2, random_state=42):
        """
        训练推荐模型
        
        参数:
        ratings_df: 包含评分数据的DataFrame
        movies_df: 包含电影信息的DataFrame
        test_size: 用于测试的数据比例
        random_state: 随机数种子
        
        返回:
        self: 训练后的模型实例
        """
        from sklearn.model_selection import train_test_split
        
        self.logger.info("开始模型训练")
        train_start = time.time()
        
        # 存储数据
        self.ratings_df = ratings_df
        self.movies_df = movies_df
        
        # 获取维度
        self.n_users = ratings_df["user_id"].max()
        self.n_items = ratings_df["item_id"].max()
        
        self.logger.info(f"使用{self.n_users}个用户和{self.n_items}个物品训练模型")
        
        # 如果test_size>0,划分数据
        if test_size > 0:
            self.logger.info(f"划分数据,test_size={test_size},random_state={random_state}")
            split_start = time.time()
            self.train_data, self.test_data = train_test_split(
                ratings_df, test_size=test_size, random_state=random_state
            )
            split_time = time.time() - split_start
            
            self.logger.info(
                f"数据划分耗时{split_time:.2f}秒: {len(self.train_data)}训练样本,{len(self.test_data)}测试样本"
            )
        else:
            self.train_data = ratings_df
            self.test_data = None
            self.logger.info("使用所有数据进行训练(无测试集)")
            
        # 创建评分矩阵
        self.logger.info("从训练数据创建评分矩阵")
        matrix_start = time.time()
        self.ratings_matrix = np.zeros((self.n_users, self.n_items))
        
        for row in self.train_data.itertuples():
            # 调整为0-based索引
            user_idx = row.user_id - 1
            item_idx = row.item_id - 1
            self.ratings_matrix[user_idx, item_idx] = row.rating
            
        matrix_time = time.time() - matrix_start
        self.logger.info(f"评分矩阵创建耗时{matrix_time:.2f}秒")
        
        # 计算矩阵密度
        n_ratings = np.sum(self.ratings_matrix > 0)
        density = n_ratings / (self.n_users * self.n_items)
        self.logger.info(f"评分矩阵密度: {density:.6f} ({n_ratings}条评分)")
        
        # 计算用户相似度
        self.logger.info(f"使用{self.similarity_method}方法计算用户相似度")
        sim_start = time.time()
        self.user_similarity = calculate_similarity(
            self.ratings_matrix, method=self.similarity_method
        )
        sim_time = time.time() - sim_start
        self.logger.info(f"用户相似度计算耗时{sim_time:.2f}秒")
        
        # 预测评分
        self.logger.info(f"使用{self.prediction_method}方法预测评分,k={self.k}")
        pred_start = time.time()
        self.predicted_ratings = predict_ratings(
            self.ratings_matrix,
            self.user_similarity,
            method=self.prediction_method,
            k=self.k
        )
        pred_time = time.time() - pred_start
        self.logger.info(f"评分预测耗时{pred_time:.2f}秒")
        
        # 标记模型为已训练
        self.is_trained = True
        
        total_time = time.time() - train_start
        self.logger.info(f"模型训练完成,总耗时{total_time:.2f}秒")
        return self

3.10 命令行接口设计

为了方便用户使用推荐系统,我们设计了一个功能丰富的命令行接口:

def parse_args():
    """解析命令行参数"""
    parser = argparse.ArgumentParser(
        description="基于用户的协同过滤推荐系统"
    )
    
    # 数据参数
    parser.add_argument(
        "--data_path",
        type=str,
        default="data/ml-100k",
        help="MovieLens数据集目录路径"
    )
    
    # 模型参数
    parser.add_argument(
        "--similarity",
        type=str,
        default="cosine",
        choices=["cosine", "pearson", "euclidean", "adjusted_cosine"],
        help="使用的相似度度量"
    )
    parser.add_argument(
        "--prediction",
        type=str,
        default="bias_weighted",
        choices=["simple_weighted", "bias_weighted"],
        help="使用的预测方法"
    )
    parser.add_argument(
        "--k",
        type=int,
        default=30,
        help="考虑的相似用户数量"
    )
    
    # 输出目录参数
    parser.add_argument(
        "--output_dir",
        type=str,
        default="results",
        help="保存可视化输出的目录"
    )
    parser.add_argument(
        "--log_dir",
        type=str,
        default="results/logs",
        help="保存日志文件的目录"
    )
    parser.add_argument(
        "--log_level",
        type=str,
        default="INFO",
        choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
        help="日志级别"
    )
    
    # 推荐参数
    parser.add_argument(
        "--user_id",
        type=int,
        required=False,
        help="为其生成推荐的用户ID"
    )
    parser.add_argument(
        "--num_recommendations",
        type=int,
        default=10,
        help="生成的推荐数量"
    )
    
    # 评估参数
    parser.add_argument(
        "--test_size",
        type=float,
        default=0.2,
        help="用于测试的数据比例"
    )
    parser.add_argument(
        "--evaluate",
        action="store_true",
        help="评估模型"
    )
    
    # 可视化参数
    parser.add_argument(
        "--visualize",
        action="store_true",
        help="可视化用户相似度和推荐"
    )
    
    # 模型保存/加载
    parser.add_argument(
        "--save_model",
        type=str,
        default=None,
        help="保存训练模型的路径"
    )
    parser.add_argument(
        "--load_model",
        type=str,
        default=None,
        help="加载预训练模型的路径"
    )
    
    # 参数调优
    parser.add_argument(
        "--tune",
        action="store_true",
        help="执行参数调优"
    )
    
    return parser.parse_args()

使用示例:

# 生成推荐
python main.py --user_id 123 --num_recommendations 10 --similarity cosine --k 30

# 评估模型
python main.py --evaluate --test_size 0.2

# 参数调优
python main.py --tune --output_dir results

# 保存和加载模型
python main.py --similarity pearson --k 40 --save_model models/my_model.pkl
python main.py --load_model models/my_model.pkl --user_id 123

在这里插入图片描述

3.11 模型序列化与反序列化

对于训练好的模型,我们提供了保存和加载功能,以便在不同环境中复用:

def save_model(self, filepath):
    """
    将训练好的模型保存到文件
    
    参数:
    filepath: 保存模型的路径
    """
    if not self.is_trained:
        error_msg = "模型未训练。保存前请先调用fit()"
        self.logger.error(error_msg)
        raise ValueError(error_msg)
        
    self.logger.info(f"保存模型到{filepath}")
    
    # 如果目录不存在则创建
    os.makedirs(os.path.dirname(filepath), exist_ok=True)
    
    import pickle
    
    model_data = {
        "similarity_method": self.similarity_method,
        "prediction_method": self.prediction_method,
        "k": self.k,
        "ratings_matrix": self.ratings_matrix,
        "user_similarity": self.user_similarity,
        "predicted_ratings": self.predicted_ratings,
        "n_users": self.n_users,
        "n_items": self.n_items,
        "is_trained": self.is_trained
    }
    
    save_start = time.time()
    with open(filepath, "wb") as f:
        pickle.dump(model_data, f)
    save_time = time.time() - save_start
    
    # 计算文件大小
    file_size = os.path.getsize(filepath) / (1024 * 1024)  # MB
    self.logger.info(
        f"模型已保存到{filepath} ({file_size:.2f} MB),耗时{save_time:.2f}秒"
    )
    
@classmethod
def load_model(cls, filepath, movies_df=None):
    """
    从文件加载训练好的模型
    
    参数:
    filepath: 模型文件路径
    movies_df: 电影信息DataFrame
    
    返回:
    model: 加载的模型实例
    """
    logger = get_logger()
    logger.info(f"从{filepath}加载模型")
    
    if not os.path.exists(filepath):
        error_msg = f"模型文件{filepath}不存在"
        logger.error(error_msg)
        raise FileNotFoundError(error_msg)
        
    import pickle
    
    load_start = time.time()
    with open(filepath, "rb") as f:
        model_data = pickle.load(f)
    load_time = time.time() - load_start
    
    # 创建新实例
    model = cls(
        similarity_method=model_data["similarity_method"],
        prediction_method=model_data["prediction_method"],
        k=model_data["k"]
    )
    
    # 恢复模型属性
    model.ratings_matrix = model_data["ratings_matrix"]
    model.user_similarity = model_data["user_similarity"]
    model.predicted_ratings = model_data["predicted_ratings"]
    model.n_users = model_data["n_users"]
    model.n_items = model_data["n_items"]
    model.is_trained = model_data["is_trained"]
    model.movies_df = movies_df
    
    # 计算文件大小
    file_size = os.path.getsize(filepath) / (1024 * 1024)  # MB
    logger.info(
        f"从{filepath}加载模型 ({file_size:.2f} MB),耗时{load_time:.2f}秒"
    )
    logger.info(f"模型维度: {model.n_users}用户, {model.n_items}物品")
    
    return model

3.12 Web应用开发

为了提供友好的用户界面,我们使用Flask开发了一个Web应用:

from flask import Flask, render_template, request, redirect, url_for, flash, jsonify, send_file

app = Flask(__name__)
app.secret_key = os.urandom(24)

# 全局变量
data_loader = None
model = None
MODEL_PATH = "../models/user_based_cf.pkl"
DATA_PATH = "../data/ml-100k"
OUTPUT_DIR = "../results"

# 创建目录
os.makedirs(os.path.dirname(MODEL_PATH), exist_ok=True)
os.makedirs(os.path.join(OUTPUT_DIR, "images"), exist_ok=True)

def initialize_data_and_model():
    """初始化数据加载器和模型"""
    global data_loader, model
    
    logger.info("初始化数据和模型...")
    
    # 初始化数据加载器并加载数据
    if data_loader is None:
        try:
            data_loader = DataLoader(data_path=DATA_PATH)
            ratings_df, movies_df = data_loader.load_data()
            logger.info(f"数据加载完成: {len(ratings_df)}条评分, {len(movies_df)}部电影")
        except Exception as e:
            logger.error(f"加载数据出错: {str(e)}")
            logger.error(traceback.format_exc())
            return False
            
    # 初始化模型(如果未加载)
    if model is None:
        try:
            # 检查是否存在预训练模型
            if os.path.exists(MODEL_PATH):
                logger.info(f"从{MODEL_PATH}加载预训练模型")
                model = UserBasedCF.load_model(MODEL_PATH, movies_df=data_loader.movies_df)
            else:
                logger.info("使用默认参数训练新模型")
                model = UserBasedCF(similarity_method="cosine", prediction_method="bias_weighted", k=30)
                model.fit(data_loader.ratings_df, data_loader.movies_df, test_size=0)
                # 保存模型以供将来使用
                model.save_model(MODEL_PATH)
                logger.info(f"模型训练完成并保存到{MODEL_PATH}")
        except Exception as e:
            logger.error(f"初始化模型出错: {str(e)}")
            logger.error(traceback.format_exc())
            return False
            
    return True

@app.route("/")
def index():
    """渲染首页"""
    # 初始化数据和模型
    if not initialize_data_and_model():
        flash("初始化数据和模型出错,请检查日志", "danger")
        
    # 如果模型已初始化,获取模型参数
    model_params = {}
    if model and model.is_trained:
        model_params = {
            "similarity_method": model.similarity_method,
            "prediction_method": model.prediction_method,
            "k": model.k,
            "n_users": model.n_users,
            "n_items": model.n_items
        }
        
    # 获取可用的相似度和预测方法
    similarity_methods = ["cosine", "pearson", "euclidean", "adjusted_cosine"]
    prediction_methods = ["simple_weighted", "bias_weighted"]
    
    # 获取数据集统计信息
    dataset_stats = {}
    if data_loader:
        dataset_stats = data_loader.get_dataset_stats()
        
    return render_template(
        "index.html",
        model_params=model_params,
        similarity_methods=similarity_methods,
        prediction_methods=prediction_methods,
        dataset_stats=dataset_stats
    )

@app.route("/recommend", methods=["GET", "POST"])
def recommend():
    """基于表单输入或URL参数生成推荐"""
    # 初始化数据和模型
    if not initialize_data_and_model():
        flash("初始化数据和模型出错,请检查日志", "danger")
        return redirect(url_for("index"))
        
    # 提取当前模型参数作为默认值
    current_similarity = model.similarity_method or "cosine"
    current_prediction = model.prediction_method or "bias_weighted"
    current_k = model.k or 30
    
    # 根据请求方法提取参数
    if request.method == "POST":
        # 获取表单数据
        user_id = int(request.form.get("user_id", 1))
        num_recommendations = int(request.form.get("num_recommendations", 10))
        similarity_method = request.form.get("similarity_method", current_similarity)
        prediction_method = request.form.get("prediction_method", current_prediction)
        k = int(request.form.get("k", current_k))
    else:  # GET请求
        # 获取URL参数
        user_id = int(float(request.args.get("user_id", "1")))
        # 验证user_id在有效范围内
        if user_id < 1 or user_id > model.n_users:
            flash(f"无效的用户ID: {user_id}。必须在1到{model.n_users}之间", "danger")
            return redirect(url_for("index"))
        num_recommendations = int(request.args.get("num_recommendations", 10))
        similarity_method = request.args.get("similarity_method", current_similarity)
        prediction_method = request.args.get("prediction_method", current_prediction)
        k = int(request.args.get("k", current_k))
        
    # 确保参数不为None
    similarity_method = similarity_method or "cosine"
    prediction_method = prediction_method or "bias_weighted"
    k = k or 30
    
    logger.info(f"推荐参数: user_id={user_id}, similarity={similarity_method}, prediction={prediction_method}, k={k}")
    
    # 检查是否需要重新训练模型
    retrain = model.similarity_method != similarity_method or model.prediction_method != prediction_method or model.k != k
    
    # 如需要,重新训练
    if retrain:
        try:
            logger.info(f"使用参数重新训练模型: similarity={similarity_method}, prediction={prediction_method}, k={k}")
            model = UserBasedCF(
                similarity_method=similarity_method,
                prediction_method=prediction_method,
                k=k
            )
            model.fit(data_loader.ratings_df, data_loader.movies_df, test_size=0)
            # 保存重新训练的模型
            model.save_model(MODEL_PATH)
            logger.info("模型重新训练完成并保存")
        except Exception as e:
            logger.error(f"重新训练模型出错: {str(e)}")
            logger.error(traceback.format_exc())
            flash(f"重新训练模型出错: {str(e)}", "danger")
            return redirect(url_for("index"))
            
    # 生成推荐
    try:
        start_time = time.time()
        recommendations = model.recommend(user_id, top_n=num_recommendations)
        generation_time = time.time() - start_time
        
        # 可视化推荐
        img_path = None
        if not recommendations.empty:
            img_path = visualize_recommendations(user_id, recommendations)
            
        # 获取用户当前评分(如果有)
        user_ratings = None
        if data_loader:
            try:
                user_ratings = data_loader.get_user_ratings(user_id)
                # 按评分降序排序
                user_ratings = user_ratings.sort_values("rating", ascending=False)
            except:
                pass
                
        # 获取相似用户
        similar_users = None
        try:
            similar_users = model.get_similar_users(user_id, top_n=5)
        except:
            pass
            
        return render_template(
            "recommendations.html",
            user_id=user_id,
            recommendations=recommendations,
            generation_time=generation_time,
            img_path=img_path,
            user_ratings=user_ratings,
            similar_users=similar_users,
            model_params={
                "similarity_method": model.similarity_method,
                "prediction_method": model.prediction_method,
                "k": model.k
            }
        )
    except Exception as e:
        logger.error(f"生成推荐出错: {str(e)}")
        logger.error(traceback.format_exc())
        flash(f"生成推荐出错: {str(e)}", "danger")
        return redirect(url_for("index"))

在这里插入图片描述
在这里插入图片描述

3.13 性能优化与错误处理

为了提高系统性能并增强鲁棒性,我们实现了以下优化与错误处理机制:

# 矩阵运算优化
# 使用向量化操作代替循环
def calculate_similarity_optimized(ratings_matrix, method="cosine"):
    """优化的相似度计算函数"""
    # 利用多核加速计算
    return 1 - pairwise_distances(ratings_matrix, metric=method, n_jobs=-1)

# 大规模数据处理:使用稀疏矩阵表示
def create_sparse_matrix(self):
    """创建稀疏评分矩阵以节省内存"""
    from scipy.sparse import csr_matrix
    
    rows, cols, data = [], [], []
    for row in self.ratings_df.itertuples():
        rows.append(row.user_id - 1)
        cols.append(row.item_id - 1)
        data.append(row.rating)
        
    self.sparse_ratings_matrix = csr_matrix(
        (data, (rows, cols)), shape=(self.n_users, self.n_items)
    )
    return self.sparse_ratings_matrix

# 异常处理与数据验证
def recommend(self, user_id, top_n=10):
    """带异常处理的推荐函数"""
    # 验证模型状态
    if not self.is_trained:
        raise ValueError("模型未训练,请先调用fit()")
        
    # 验证用户ID
    if user_id < 1 or user_id > self.n_users:
        raise ValueError(f"无效的用户ID: {user_id}。必须在1到{self.n_users}之间")
        
    # 验证参数
    if top_n < 1:
        raise ValueError(f"无效的推荐数量: {top_n}。必须大于0")
        
    # 验证必要的数据是否可用
    if self.movies_df is None:
        raise ValueError("缺少电影信息。请在fit()中提供movies_df")
        
    # 生成推荐
    try:
        return recommend_items(user_id, self.ratings_matrix, self.predicted_ratings, self.movies_df, top_n)
    except Exception as e:
        self.logger.error(f"为用户{user_id}生成推荐时出错: {str(e)}")
        # 重新抛出异常,附加上下文信息
        raise RuntimeError(f"推荐生成失败: {str(e)}")

5. 模块化设计的优势

采用模块化设计使得我们的推荐系统具有以下优势:

  1. 代码可维护性:每个模块专注于单一功能,便于理解和修改
  2. 可扩展性:可以轻松添加新的相似度计算方法或评分预测算法
  3. 可重用性:各个组件可以在其他项目中重用
  4. 测试便利性:可以独立测试每个模块的功能
  5. 团队协作:不同团队成员可以同时开发不同模块

6. 结论与未来工作

本项目成功实现了一个基于用户的协同过滤推荐系统,并在MovieLens 100K数据集上进行了评估。系统具有以下特点:

  1. 模块化设计:清晰的代码结构,便于维护和扩展
  2. 多种算法支持:实现了多种相似度计算和评分预测方法
  3. 完整的评估体系:使用RMSE、MAE、精确率、召回率等指标全面评估性能
  4. 用户友好界面:提供命令行和Web两种交互方式
  5. 日志和错误处理:完善的日志系统和健壮的错误处理机制

7. 参考资料

  1. Resnick, P., Iacovou, N., Suchak, M., Bergstrom, P., & Riedl, J. (1994). GroupLens: An Open Architecture for Collaborative Filtering of Netnews. Proceedings of the 1994 ACM Conference on Computer Supported Cooperative Work.
  2. Breese, J. S., Heckerman, D., & Kadie, C. (1998). Empirical Analysis of Predictive Algorithms for Collaborative Filtering. Proceedings of the 14th Conference on Uncertainty in Artificial Intelligence.
  3. Sarwar, B., Karypis, G., Konstan, J., & Riedl, J. (2001). Item-based collaborative filtering recommendation algorithms. Proceedings of the 10th International Conference on World Wide Web.
  4. Harper, F. M., & Konstan, J. A. (2015). The MovieLens Datasets: History and Context. ACM Transactions on Interactive Intelligent Systems (TiiS).
  5. Koren, Y., Bell, R., & Volinsky, C. (2009). Matrix factorization techniques for recommender systems. Computer, 42(8), 30-37.
  6. Ning, X., Desrosiers, C., & Karypis, G. (2015). A comprehensive survey of neighborhood-based recommendation methods. Recommender systems handbook, 37-76.

网站公告

今日签到

点亮在社区的每一天
去签到