✨✨ 欢迎大家来访Srlua的博文(づ ̄3 ̄)づ╭❤~✨✨🌟🌟 欢迎各位亲爱的读者,感谢你们抽出宝贵的时间来阅读我的文章。
我是Srlua小谢,在这里我会分享我的知识和经验。🎥
希望在这里,我们能一起探索IT世界的奥妙,提升我们的技能。🔮
记得先点赞👍后阅读哦~ 👏👏
📘📚 所属专栏:传知代码论文复现
欢迎访问我的主页:Srlua小谢 获取更多信息和资源。✨✨🌙🌙
目录
LightFR: Lightweight Federated Recommendation with Privacy-preserving Matrix Factorization
本文所有资源均可在该地址处获取。
LightFR: Lightweight Federated Recommendation with Privacy-preserving Matrix Factorization
TOIS-2023 CCF-A
论文介绍
本文的主要内容是介绍了一种名为LightFR的轻量级联邦推荐系统,该系统采用隐私保护的矩阵分解技术。文章首先回顾了矩阵分解、学习哈希和联邦推荐系统等相关领域的研究,然后详细介绍了LightFR的框架和算法,并从存储/通信效率、推荐效率和隐私保护等多个方面论证了其优越性。接下来,文章通过实验验证了LightFR的有效性和效率,并讨论了其对用户隐私的保护能力。
主要创新点
- 提出了一种轻量级、隐私保护的联邦推荐系统(FRS),名为LightFR,以解决现有工作中的效率和隐私问题。
- 通过学习哈希技术获得用户和项目的二进制表示,以有效解决联邦设置中的离散优化问题。
- 设计了一种高效的联邦离散算法,以在服务器和客户端之间嵌入用户偏好,同时减少资源消耗和保护隐私。
- 解释了为什么现有的方法没有考虑效率和隐私问题,以及它们在实际FRS中的主要挑战。
论文解决主要问题
现有的方法存在的问题:
- 现有的联邦推荐系统(FRS)在资源效率和隐私保护方面存在挑战。
- 高成本的资源消耗和通信开销限制了现有方法在大规模推荐场景中的应用。
- 提高用户隐私保护的能力对于现实世界的FRS至关重要,但现有方法在这方面的考虑不足。
本论文试图解决以下问题:
- 开发一种轻量级且隐私保护的联邦推荐系统(FRS),以降低资源成本并提高隐私保护能力。
- 通过学习哈希技术获得用户和项目的二进制表示,从而有效解决效率和隐私问题。
- 设计一种在
服务器和客户端之间高效的联邦离散算法
,以嵌入用户偏好到离散汉明空间,同时降低服务器和客户端的资源利用,保护用户隐私。
论文提出了一种名为LightFR的轻量级联邦推荐方法,它通过矩阵分解和隐私保护的学习哈希技术实现轻量级、高效且安全的推荐。
模型的实验是如何设计的?
实验设计包括以下几个方面:
- 数据集:使用四个不同规模的公开数据集(MovieLens-1M、Filmtrust、Douban-Movie 和 Ciao)进行实验分析,以全面反映模型性能。
- 评估指标:使用两个常用的评估指标,即 Hit Ratio(HR)和 Normalized Discounted Cumulative Gain(NDCG),以评估模型性能和验证其有效性。
- 实验方法:对比中心化经典矩阵分解模型(如PMF、SVD++和DDL)和联邦矩阵分解基线(如FederatedMF)与提出的 LightFR 模型。分析不同超参数(如二进制编码长度 f、权衡参数 λ 和选定客户比例 p)对性能的影响。
本文最核心的创新点:
本文的最核心创新点包括以下几点:
- 提出了一种轻量级的联邦推荐系统(LightFR),它通过学习哈希技术获得用户和项目的二进制表示,从而有效解决资源效率和隐私保护问题。
- 设计了一种高效的联邦离散算法,用于在服务器和客户端之间进行训练离散参数,实现了在保护隐私的方式下提高资源效率。
- 从理论角度证明了LightFR在存储/通信效率、推理效率和隐私保护方面的优越性。
- 通过大量实验证明,LightFR在推荐准确性、资源节约和数据隐私方面明显优于现有的联邦推荐方法。
代码解读
Client.py
import numpy as np
from Metrics import Metrics
class Client:
def __init__(self, configs):
self.bu = None #客户端的哈希表示
self.D = None # 与项目相关的全局参数
self.data_u = None #特定客户端的用户数据
self.data_bin_u = None #特定客户端的用户数据的二进制表示
self.data_len_u = None #特定客户端的用户数据的长度
self.configs = configs
def client_update(self, client, master_flag):
'''
client process, could be implemented in parallel
:param master_flag:
:param bu:
:param D:
:param data_u:
:param data_bin_u:
:param l:
:return:
'''
while True:
flag = 0
for k in range(self.configs.code_len):
dk = client.D[:, k]
buk_hat = np.sum(
( client.data_u - np.dot(client.D, client.bu.T)) * dk * client.data_bin_u) + 2 * self.configs.lambdaa * client.data_len_u * client.bu[k]
buk_new = np.sign(self.K(buk_hat, client.bu[k]))
if (client.bu[k] != buk_new):
flag = 1
client.bu[k] = buk_new
if (flag == 0):
break
master_flag = 1
return client.bu, master_flag
def get_inter_params(self, i, k):
di = self.D[i, :]
grads = (self.data_u[i] - np.dot(self.bu, di.T)) * self.bu[k] * self.data_bin_u[i]
grads_len = self.data_bin_u[i]
return grads, grads_len
def K(self, x, y):
return x if x != 0 else y
def calculate_loss(self):
local_loss = np.sum((self.data_u - np.dot(self.D, self.bu)) ** 2 * self.data_bin_u)
return local_loss
def evaluate_local(self, items, val_data):
configs = {'top_k': 10, 'num_negative_test': 49, }
metric = Metrics(configs)
bus = self.bu
dis = self.D[items]
rating_pred = np.multiply(bus, dis)
preds = np.sum(rating_pred, axis=1)
val_data['pred'] = preds.tolist()
hr = metric.get_hit_ratio(val_data)
ndcg = metric.get_ndcg(val_data)
return hr, ndcg
Client.py
定义解读
属性:
self.bu
: 客户端的哈希表示,代表用户的特征。self.D
:与项目相关的全局参数,可能是由服务端维护并与客户端共享的项目特征。self.data_u
: 特定客户端的用户数据,如用户的评分或交互数据。self.data_bin_u
: 用户数据的二进制表示,用于某些类型的计算。self.data_len_u
: 用户数据的长度,可能用于计算或正则化。self.configs
:客户端的配置设置,如哈希码的长度、正则化参数等。
方法client_update
:
- 这是客户端更新其哈希表示的主要方法。它通过迭代过程调整用户的哈希码,以更好地反映用户数据。
- 使用的主要计算包括评分预测的误差计算和正则化。
- 更新条件基于哈希码的每一位是否发生变化。
get_inter_params
:
- 用于计算与特定项目和用户哈希码位相关的梯度信息。
- 这个信息可能用于服务端的聚合过程。
K
- 一个辅助函数,用于处理特殊情况,比如当输入为0时返回另一个值。
calculate_loss
:
- 计算客户端的本地损失,基于用户的实际评分和通过哈希码和项目特征预测的评分之间的差异。
evaluate_local
:
- 在本地数据上评估模型的性能,如计算命中率(HR)和归一化累积增益(NDCG)。这对于评估客户端模型的推荐质量很重要。
总结:
这个 Client 类体现了联邦学习在推荐系统中的应用,其中每个客户端独立地更新其模型(哈希表示),并可以在本地进行模型性能评估。整个过程旨在优化用户的哈希表示,使其能够更好地捕捉用户的偏好和行为模式,同时在联邦学习的框架下保持用户数据的隐私性。
Base.py
# -*- coding: utf-8 -*-
import numpy as np
import scipy.linalg as la
from collections import defaultdict
from math import log
import pandas as pd
import torch
from DataLoader import DataLoaderCenter
from Metrics import Metrics
class Base:
def __init__(self):
self.user = {}
self.item = {}
self.id2user = {}
self.id2item = {}
self.u_i_r = defaultdict(dict)
self.i_u_r = defaultdict(dict)
self.minVal = 0.5
self.maxVal = 4
self.dataset_name = 'filmtrust'
self.federated_train_data_path = 'data/' + self.dataset_name + '/' + self.dataset_name + '_train.csv'
self.federated_valid_data_path = 'data/' + self.dataset_name + '/' + self.dataset_name + '_val.csv'
self.federated_test_data_path = 'data/' + self.dataset_name + '/' + self.dataset_name + '_test.csv'
pass
def init_model(self):
self.generate_vocabulary()
self.rating_matrix, self.rating_matrix_bin, self.globalmean = self.get_rating_matrix()
self.B = np.sign(np.array(np.random.randn(len(self.user), self.configs.code_len) / (self.configs.code_len ** 0.5)))
self.D = np.sign(np.array(np.random.randn(len(self.item), self.configs.code_len) / (self.configs.code_len ** 0.5)))
self.loss, self.last_delta_loss = 0.0, 0.0
def trainSet(self):
with open(self.federated_train_data_path, 'r') as f:
for index, line in enumerate(f):
if index != 0: # 去除headers
u, i, r = line.strip('\r\n').split(',')
r = 2 * self.configs.code_len * (float(r)) - self.configs.code_len
yield (int(u), int(i), float(r))
def containUser(self, user_id):
if user_id in self.user:
return True
else:
return False
def containItem(self, item_id):
if item_id in self.item:
return True
else:
return False
def valid_test_Set(self, path):
with open(path, 'r') as f:
for index, line in enumerate(f):
if index != 0: # 去除headers
u, i, r = line.strip('\r\n').split(',')
# r = 2 * self.code_len * (float(int(r) - self.minVal) / (self.maxVal - self.minVal) + 0.01) - self.code_len
yield (int(u), int(i), float(r))
def read_federated_valid_dataset(self, path):
data_val = pd.read_csv(path)
return data_val
def generate_vocabulary(self):
for index, line in enumerate(self.trainSet()):
user_id, item_id, rating = line
self.u_i_r[user_id][item_id] = rating
self.i_u_r[item_id][user_id] = rating
if user_id not in self.user:
self.user[user_id] = len(self.user)
self.id2user[self.user[user_id]] = user_id
if item_id not in self.item:
self.item[item_id] = len(self.item)
self.id2item[self.item[item_id]] = item_id
for index, line in enumerate(self.valid_test_Set(self.federated_valid_data_path)):
user_id, item_id, rating = line
self.u_i_r[user_id][item_id] = rating
self.i_u_r[item_id][user_id] = rating
if user_id not in self.user:
self.user[user_id] = len(self.user)
self.id2user[self.user[user_id]] = user_id
if item_id not in self.item:
self.item[item_id] = len(self.item)
self.id2item[self.item[item_id]] = item_id
for index, line in enumerate(self.valid_test_Set(self.federated_test_data_path)):
user_id, item_id, rating = line
self.u_i_r[user_id][item_id] = rating
self.i_u_r[item_id][user_id] = rating
if user_id not in self.user:
self.user[user_id] = len(self.user)
self.id2user[self.user[user_id]] = user_id
if item_id not in self.item:
self.item[item_id] = len(self.item)
self.id2item[self.item[item_id]] = item_id
def get_rating_matrix(self):
rating_matrix = np.zeros((len(self.user), len(self.item))) # (943, 1596)
globalmean = 0.0
lens = 0
for index, line in enumerate(self.trainSet()):
lens += 1
user_id, item_id, rating = line
globalmean += rating
rating_matrix[self.user[user_id]][self.item[item_id]] = int(rating)
rating_matrix_bin = (rating_matrix > 0).astype('int')
globalmean = globalmean / (lens)
return rating_matrix, rating_matrix_bin, globalmean
def K(self, x, y):
return x if x != 0 else y
def valid_test_model(self, path):
pre_true_dict = defaultdict(list)
for index, line in enumerate(self.valid_test_Set(path)):
user_id, item_id, rating = line
if (self.containUser(user_id) and self.containItem(item_id)):
bu = self.B[self.user[user_id], :]
di = self.D[self.item[item_id], :]
pre = np.dot(bu, di)
elif (self.containUser(user_id) and not self.containItem(item_id)):
pre = sum(self.u_i_r[user_id].values()) / float(len(self.u_i_r[user_id]))
elif (not self.containUser(user_id) and self.containItem(item_id)):
pre = sum(self.i_u_r[item_id].values()) / float(len(self.i_u_r[item_id]))
else:
pre = self.globalmean
pre_true_dict[user_id].append([pre, rating])
metrics = Metrics()
ndcg_10 = metrics.calDCG_k(pre_true_dict, 10)
return ndcg_10
Base.py
定义解读
这段代码定义了一个名为 Base
的类,它是一个推荐系统的基础架构。这个类包括初始化、构建词汇表、生成评分矩阵以及评估模型的方法。以下是对代码中各个部分的详细解读:类初始化 (__init__)
初始化中定义了多个字典和路径变量,用于存储用户和项目的信息以及训练、验证和测试数据的路径。生成词汇表 (generate_vocabulary)
从训练集、验证集和测试集中提取用户-项目评分信息,建立两个双向映射:用户ID与内部索引的映射 (self.user
和 self.id2user
),以及项目ID与内部索引的映射 (self.item
和 self.id2item
)。
构建用户到项目 (self.u_i_r
) 和项目到用户 (self.i_u_r
) 的评分字典。
生成评分矩阵 (get_rating_matrix)
创建一个用户-项目评分矩阵 (rating_matrix),其中每个元素代表用户对项目的评分。
生成一个二值评分矩阵 (rating_matrix_bin),表示用户是否对项目进行了评分。
计算全局平均评分 (globalmean)。
模型初始化 (init_model)
调用 generate_vocabulary
方法并生成评分矩阵。
初始化用户和项目的隐特征矩阵 (self.B 和 self.D),这些矩阵用随机值填充并通过符号函数处理。
训练集和验证/测试集的处理 (trainSet, valid_test_Set)
这些方法从指定路径读取训练集和验证/测试集数据。用户和项目存在性检查 (containUser, containItem)
检查特定的用户ID或项目ID是否存在于已定义的用户或项目字典中。评估模型 (valid_test_model)
使用验证或测试集数据评估模型的性能。
计算每个用户的预测评分和实际评分,然后使用这些数据计算归一化累积增益(NDCG)。总结
Base
类提供了一个推荐系统基本框架,包括数据处理、模型初始化和评估。这个类能够处理用户和项目的交互数据,生成评分矩阵,并对推荐模型的性能进行评估。通过这种方式,它为构建更复杂的推荐系统模型提供了基础。
Configs.py
class Configs:
def __init__(self):
self.code_len = 64
self.threshold = 1e-4
self.global_rounds = 50
self.client_ratio = 0.6
self.lambdaa = 0.6
Configs
类的定义Configs
类的属性self.code_len:
表示哈希码的长度。在这个上下文中,它可能指的是用户或项目的哈希表示中使用的二进制位的数量。
在这个例子中,哈希码长度设置为 64,这意味着每个用户或项目将被表示为一个包含 64 位的向量。self.threshold:
这是一个阈值参数,可能用于确定训练过程中的收敛标准或用于其他类型的判断。
在这里,阈值设置为 1e-4(即 0.0001),这可能表示当模型在连续迭代中的改进低于这个值时,训练可以停止。self.global_rounds:
指定全局训练轮数。在联邦学习的背景下,这可能指的是所有客户端参与模型更新的总轮数。
这里设置为 50,意味着整个训练过程将进行 50 轮迭代。self.client_ratio:
这个参数可能用于确定在每轮训练中参与的客户端比例。
0.6 表示每轮有 60% 的客户端将被随机选中参与模型的更新。self.lambdaa:
这是正则化参数,通常用于控制模型复杂性,以避免过拟合。
设置为 0.6,这个参数在计算损失函数或进行参数更新时可能被用作正则化项的权重。
总结:Configs
类作为一个配置存储器,提供了一种便捷的方式来管理和调整模型训练过程中使用的多个参数。通过调整这些参数,可以控制模型的训练行为,如迭代次数、客户端参与度以及正则化程度等,这对于优化模型的性能和效率至关重要。
DataLoader类:
import torch
import numpy as np
from torch.utils.data import Dataset, DataLoader as TDataLoader
class DataLoader():
def __init__(self, configs, client_data):
self.configs = configs
self.train_data, self.val_data, self.test_data = client_data['train'], client_data['val'], client_data[
'test']
def get_train_dataloader(self):
users, items, labels = torch.LongTensor(np.array(self.train_data['user_id'])), torch.LongTensor(
np.array(self.train_data['item_id'])), torch.FloatTensor(np.array(self.train_data['ratings']))
dataset = UserItemRatingDataset(user_tensor=users, item_tensor=items, target_tensor=labels)
return TDataLoader(dataset, batch_size=self.configs['local_batch_size'], shuffle=True)
def get_val_dataloader(self):
if self.val_data.empty:
users, items, labels = torch.LongTensor(self.val_data['user_id']), torch.LongTensor(
self.val_data['item_id']), torch.FloatTensor(self.val_data['ratings'])
else:
users, items, labels = torch.LongTensor(np.array(self.val_data['user_id'])), torch.LongTensor(
np.array(self.val_data['item_id'])), torch.FloatTensor(np.array(self.val_data['ratings']))
dataset = UserItemRatingDataset(user_tensor=users, item_tensor=items, target_tensor=labels)
client_data_len = len(items) # 100 for implicit feedback, actual length for explicit feedback during validation in each local client
return TDataLoader(dataset, batch_size=client_data_len, shuffle=False)
def get_test_dataloader(self):
if self.test_data.empty:
users, items, labels = torch.LongTensor(self.test_data['user_id']), torch.LongTensor(
self.test_data['item_id']), torch.FloatTensor(self.test_data['ratings'])
else:
users, items, labels = torch.LongTensor(np.array(self.test_data['user_id'])), torch.LongTensor(
np.array(self.test_data['item_id'])), torch.FloatTensor(np.array(self.test_data['ratings']))
dataset = UserItemRatingDataset(user_tensor=users, item_tensor=items, target_tensor=labels)
client_data_len = len(items)
return TDataLoader(dataset, batch_size=client_data_len, shuffle=False)
class DataLoaderCenter():
def __init__(self, configs, val_data):
self.configs = configs
self.val_data= val_data
def get_train_dataloader(self):
users, items, labels = torch.LongTensor(np.array(self.train_data['user_id'], dtype='int32')), torch.LongTensor(
np.array(self.train_data['item_id'], dtype='int32')), torch.FloatTensor(
np.array(self.train_data['ratings'], dtype='float32'))
dataset = UserItemRatingDataset(user_tensor=users, item_tensor=items, target_tensor=labels)
return TDataLoader(dataset, batch_size=self.configs['local_batch_size'], shuffle=True)
def get_val_dataloader(self):
if self.val_data.empty:
users, items, labels = torch.LongTensor(self.val_data['user_id']), torch.LongTensor(
self.val_data['item_id']), torch.FloatTensor(self.val_data['ratings'])
else:
users, items, labels = torch.LongTensor(np.array(self.val_data['user_id'], dtype='int32')), torch.LongTensor(
np.array(self.val_data['item_id'], dtype='int32')), torch.FloatTensor(np.array(self.val_data['ratings'], dtype='float32'))
dataset = UserItemRatingDataset(user_tensor=users, item_tensor=items, target_tensor=labels)
data_len = self.configs['num_negative_test'] + 1
return TDataLoader(dataset, batch_size=data_len, shuffle=False)
def get_test_dataloader(self):
if self.test_data.empty:
users, items, labels = torch.LongTensor(self.test_data['user_id']), torch.LongTensor(
self.test_data['item_id']), torch.FloatTensor(self.test_data['ratings'])
else:
users, items, labels = torch.LongTensor(np.array(self.test_data['user_id'], dtype='int32')), torch.LongTensor(
np.array(self.test_data['item_id'], dtype='int32')), torch.FloatTensor(np.array(self.test_data['ratings'], dtype='float32'))
dataset = UserItemRatingDataset(user_tensor=users, item_tensor=items, target_tensor=labels)
data_len = self.configs['num_negative_test'] + 1
return TDataLoader(dataset, batch_size=data_len, shuffle=False)
class UserItemRatingDataset(Dataset):
"""Wrapper, convert <user, item, rating> Tensor into Pytorch Dataset"""
def __init__(self, user_tensor, item_tensor, target_tensor):
"""
args:
target_tensor: torch.Tensor, the corresponding rating for <user, item> pair
"""
self.user_tensor = user_tensor
self.item_tensor = item_tensor
self.target_tensor = target_tensor
def __getitem__(self, index):
return self.user_tensor[index], self.item_tensor[index], self.target_tensor[index]
def __len__(self):
return self.user_tensor.size(0)
if __name__ == '__main__':
configs = {
'dataset': 'ml-1m',
'data_type': 'explicit',
'num_negative_train': 4,
'num_negative_test': 49,
'local_batch_size': 100,
'cold_nums': 10
}
dr = DataReader(configs)
# client_data = dr.get_data_by_client(0)
data = dr.get_train_val_test_data()
dl_center = DataLoaderCenter(configs, data)
td = dl_center.get_train_dataloader()
vd = dl_center.get_val_dataloader()
for index, data in enumerate(vd):
if index == 0:
print(data)
这段代码定义了一个数据加载的类 DataLoader
和 DataLoaderCenter
,以及一个PyTorch Dataset
子类 UserItemRatingDataset
。这些类用于将推荐系统中的用户、项目和评分数据转换为适用于机器学习模型训练和评估的格式。以下是对代码中各个部分的详细解读:DataLoader
和 DataLoaderCenter
类
这两个类的作用是从提供的数据中创建可用于训练、验证和测试的数据加载器 (DataLoader)。
1.初始化
- 接收配置参数 (configs) 和客户端数据 (client_data)。
- 分别处理训练集、验证集和测试集的数据。
2.获取数据加载器:get_train_dataloader, get_val_dataloader, get_test_dataloader
方法分别用于创建训练集、验证集和测试集的DataLoader
。
这些方法首先将数据转换为适用于PyTorch
的张量格式,然后创建一个 UserItemRatingDataset
实例,最后使用 TDataLoader
返回一个数据加载器。
3.UserItemRatingDataset类
总结:这段代码主要用于数据的预处理和加载,为推荐系统的机器学习模型和训练评估提供了必要的数据输入。通过将原始的数据转换为Pytorch模型的格式,这些类和方法使得模型训练过程更加高效和灵活。
main.py
# -*- coding: utf-8 -*-
import numpy as np
import pandas as pd
import torch
from Base import Base
from Client import Client
from Configs import Configs
from DataLoader import DataLoaderCenter
class LightFR(Base):
def __init__(self):
super(LightFR, self).__init__()
self.configs = Configs()
pass
def get_random_client_list(self):
size = int(len(self.user) * self.configs.client_ratio)
random_client_list = np.random.choice(list(self.user.values()), size)
return random_client_list
def get_client_data(self, client_id):
client = Client(self.configs)
client.bu = self.B[client_id, :]
client.D = self.D
client.data_u = self.rating_matrix[client_id, :]
client.data_bin_u = self.rating_matrix_bin[client_id, :]
client.data_len_u = len(self.u_i_r[self.id2user[client_id]])
return client
def train_model(self):
current_round = 0
last_loss = 0.0
while (current_round < self.configs.global_rounds-40):
master_flag = 0
current_round += 1
sampled_clients = self.get_random_client_list()
#runing on clients, could be implemented in parallel
for u in sampled_clients:
client = self.get_client_data(u)
bu, master_flag = client.client_update(client, master_flag)
# running on the server
for i in range(len(self.item)):
while True:
flag = 0
di = self.D[i, :]
for k in range(self.configs.code_len):
# The following can be uploaded by the client side, and we upload the intermediate gradients, i.e., grads_a and grads_b, instead of the raw rating or the user codes. We can use the client-style computation as descriped in the paper, such as B[u,k], rating_matrix[u,i] and rating_matrix_bin[u,i], but it runs slowly.
# For efficient training, we use the batch-style computation to calculate the gradients.
# The intermediate gradients can be divided into multiple clients, that is loss_total=(self.rating_matrix[:, i] - np.dot(self.B, di.T)) can be reformulated into loss_user=(self.rating_matrix[u, i] - np.dot(self.B[u,:], di.T)), so the loss_total can be regarded as the aggregation from the multiple local loss_user.
bk = self.B[sampled_clients, k]
grads_a = (self.rating_matrix[sampled_clients, i] - np.dot(self.B[sampled_clients], di.T)) * bk * self.rating_matrix_bin[sampled_clients, i]
grads_b = len(self.rating_matrix_bin[sampled_clients, i])
# the following performs the simulated aggregation process
dik_hat = np.sum(grads_a) + grads_b * di[k]
dik_new = np.sign(self.K(dik_hat, di[k]))
if (di[k] != dik_new):
flag = 1
di[k] = dik_new
if (flag == 0):
break
self.D[i, :] = di
master_flag = 1
# calculating the loss for all the clients and upload its loss into the server and then aggregate them
self.loss = 0.0
for u in range(len(self.user)):
client = self.get_client_data(u)
local_loss = client.calculate_loss()
self.loss += local_loss
federated_valid_hr_10, federated_valid_ndcg_10 = self.federated_valid_test_model(
self.federated_valid_data_path)
delta_loss = self.loss - last_loss
print('current_round %d: current_loss = %.5f, delta_loss = %.5f valid_HR@10=%.5f valid_NDCG@10=%.5f' %
(current_round, self.loss, delta_loss, federated_valid_hr_10, federated_valid_ndcg_10))
if (master_flag == 0):
break
if (abs(delta_loss) < self.configs.threshold or abs(delta_loss) == abs(self.last_delta_loss)):
break
self.last_delta_loss = delta_loss
last_loss = self.loss
federated_valid_hr_10, federated_valid_ndcg_10 = self.federated_valid_test_model(self.federated_test_data_path)
print('test HR@10 = %.5f, NGCD@10 = %.5f' % (federated_valid_hr_10, federated_valid_ndcg_10))
def federated_valid_test_model(self, path):
val_data = self.read_federated_valid_dataset(path)
configs = {'top_k': 10, 'num_negative_test': 49, }
dl = DataLoaderCenter(configs, val_data)
val_dataloader = dl.get_val_dataloader()
hr_10, ndcg_10 = 0.0, 0.0
len = 0
# one batch represents a client since there is the same user in a batch
for batch_id, batch in enumerate(val_dataloader):
len += 1
assert isinstance(batch[0], torch.LongTensor)
users, items, ratings = batch[0], batch[1], batch[2]
val_data = pd.DataFrame(zip(users.tolist(), items.tolist(), ratings.tolist()),
columns=['user_id', 'item_id', 'ratings'])
items = [self.item[item] for item in items.tolist()]
user_id = self.user[int(users[0])]
client = self.get_client_data(user_id)
hr, ndcg = client.evaluate_local(items, val_data)
hr_10 += hr[10]
ndcg_10 += ndcg[10]
hr_10 /= len
ndcg_10 /= len
return hr_10, ndcg_10
def main(self):
self.init_model()
self.train_model()
if __name__ == '__main__':
model = LightFR()
model.main()
main.py类的定义:
这段代码定义了LightFR
类,它继承自Base
类并实现了一个联邦学习框架。LightFR
类中包含了模型的初始化、训练过程和验证/测试方法。以下是对主要方法的详细解读:
- 初始化 (
__init__
)- 调用基类 Base 的构造函数。
- 初始化配置参数 (self.configs)。
- 获取随机客户端列表 (
get_random_client_list
)- 基于配置中的客户端比例 (self.configs.client_ratio) 随机选择一部分用户作为参与训练的客户端。
- 获取客户端数据 (get_client_data)
- 创建一个 Client 类实例,用于处理特定客户端的数据。
- 初始化客户端相关的属性,如哈希表示、用户和项目数据。
- 训练模型
这是模型训练的主要过程。- 客户端更新:遍历随机选择的客户端列表,并调用 client_update 方法更新客户端的哈希表示。
- 服务端更新:遍历所有项目,并更新全局项目特征 (self.D)。
- 损失计算:计算所有客户端的累计损失,并在每轮更新后检查是否满足终止条件。
import pandas as pd
import torch
import numpy as np
import math
from sklearn import metrics as sk_metrics
# from reader.data_reader import DataReader
# from loader.data_loader import DataLoader
class Metrics(object):
def __init__(self, configs):
super(Metrics, self).__init__()
self.configs = configs
def get_hit_ratio(self, test_data: pd.DataFrame): # for implicit feedback
top_k = self.configs['top_k']
hrs = {}
if test_data.empty:
for current_top_k in range(1, top_k + 1):
hrs[current_top_k] = 0.0
return hrs
assert 'pred' in test_data.columns, "没有预测值"
test_data['rank'] = test_data['pred'].rank(method='first', ascending=False)
test_data_rank = int(test_data.head(1)['rank'])
for current_top_k in range(1, top_k + 1):
if test_data_rank <= current_top_k:
hrs[current_top_k] = 1.0
else:
hrs[current_top_k] = 0.0
return hrs
def get_ndcg(self, test_data: pd.DataFrame): # for implicit feedback
top_k = self.configs['top_k']
ndcgs = {}
if test_data.empty:
for current_top_k in range(1, top_k + 1):
ndcgs[current_top_k] = 0.0
return ndcgs
assert 'pred' in test_data.columns, "没有预测值"
test_data['rank'] = test_data['pred'].rank(method='first', ascending=False)
test_data_rank = int(test_data.head(1)['rank'])
for current_top_k in range(1, top_k + 1):
if test_data_rank <= current_top_k:
ndcgs[current_top_k] = math.log(2) * 1.0 / math.log(1 + test_data_rank)
else:
ndcgs[current_top_k] = 0.0
return ndcgs
def get_hit_ratio_explicit_client(self, test_data: pd.DataFrame): # for explicit feedback
top_k = self.configs['top_k']
hrs = {}
if test_data.empty:
for current_top_k in range(1, top_k + 1):
hrs[current_top_k] = 0.0
return hrs
assert 'pred' in test_data.columns, "没有预测值"
data = test_data[['pred', 'ratings']].to_numpy()
real_value_list = sorted(data, key=lambda x: x[1], reverse=True)
predict_value_list = sorted(data, key=lambda x: x[0], reverse=True)
test_data['rank'] = test_data['pred'].rank(method='first', ascending=False)
test_data_rank = int(test_data.head(1)['rank'])
for current_top_k in range(1, top_k + 1):
if test_data_rank <= current_top_k:
hrs[current_top_k] = 1.0
else:
hrs[current_top_k] = 0.0
return hrs
def get_ndcg_explicit_client(self, test_data: pd.DataFrame): # for explicit feedback
top_k = self.configs['top_k']
ndcgs = {}
if test_data.empty:
for current_top_k in range(1, top_k + 1):
ndcgs[current_top_k] = 0.0
return ndcgs
assert 'pred' in test_data.columns, "没有预测值"
data = test_data[['pred', 'ratings']].to_numpy()
real_value_list = sorted(data, key=lambda x: x[1], reverse=True)
predict_value_list = sorted(data, key=lambda x: x[0], reverse=True)
for current_top_k in range(1, top_k + 1):
if len(real_value_list) >= current_top_k:
idcg, dcg = 0.0, 0.0
for i in range(current_top_k):
idcg += (pow(2, real_value_list[i][1]) - 1) / (math.log(i + 2, 2))
dcg += (pow(2, predict_value_list[i][1]) - 1) / (math.log(i + 2, 2))
if idcg != 0:
ndcgs[current_top_k] = float(dcg / idcg)
else:
ndcgs[current_top_k] = 0.0
else:
ndcgs[current_top_k] = 0.0
return ndcgs
def get_auc(self, test_data: pd.DataFrame):
pass
def get_mrr(self, test_data: pd.DataFrame):
pass
def get_rmse(self, test_data: pd.DataFrame):
assert 'pred' in test_data.columns, "没有预测值"
y = test_data['ratings']
y_hat = test_data['pred']
value = sk_metrics.mean_squared_error(y, y_hat) ** 0.5
return value
def get_mae(self, test_data: pd.DataFrame):
assert 'pred' in test_data.columns, "没有预测值"
y = test_data['ratings']
y_hat = test_data['pred']
value = sk_metrics.mean_absolute_error(y, y_hat)
return value
def get_rmse_client(self, test_data: pd.DataFrame):
assert 'pred' in test_data.columns, "没有预测值"
y = test_data['ratings']
y_hat = test_data['pred']
l = len(y)
value = abs(y - y_hat) ** 2
value = value.sum()
result = math.sqrt(value / l)
return result
def get_mae_client(self, test_data: pd.DataFrame):
assert 'pred' in test_data.columns, "没有预测值"
y = test_data['ratings']
y_hat = test_data['pred']
l = len(y)
value = abs(y - y_hat)
value = value.sum()
result = value / l
return result
def calDCG_k(self, dictdata, k):
nDCG = []
for key in dictdata.keys():
listdata = dictdata[key]
real_value_list = sorted(listdata, key=lambda x: x[1], reverse=True)
idcg = 0.0
predict_value_list = sorted(listdata, key=lambda x: x[0], reverse=True)
dcg = 0.0
if len(listdata) >= k:
for i in range(k):
idcg += (pow(2, real_value_list[i][1]) - 1) / (log(i + 2, 2))
dcg += (pow(2, predict_value_list[i][1]) - 1) / (log(i + 2, 2))
if (idcg != 0):
nDCG.append(float(dcg / idcg))
else:
continue
ave_ndcg = np.mean(nDCG)
# print(nDCG)
return ave_ndcg
if __name__ == '__main__':
configs = {
'dataset': 'filmtrust',
'data_type': 'implicit',
'num_negative_train': 4,
'num_negative_test': 99,
'local_batch_size': 100,
'top_k': 10
}
dr = DataReader(configs)
client_data = dr.get_data_by_client(0)
dl = DataLoader(configs, client_data)
test_data = dl.get_test_dataloader()
metric = Metrics(configs)
for batch_id, batch in enumerate(test_data):
assert isinstance(batch[0], torch.LongTensor)
users, items, labels = batch[0], batch[1], batch[2]
if batch_id == 0:
pred = np.random.uniform(0, 1, 100)
test_data = pd.DataFrame(
{'user_id': users,
'item_id': items,
'label': labels,
'pred': pred}
)
print(test_data)
value1 = metric.get_hit_ratio(test_data)
print(f'value1:{value1}')
value2 = metric.get_ndcg(test_data)
print(f'value2:{value2}')
value3 = metric.get_rmse(test_data)
print(f'value3:{value3}')
value4 = metric.get_mae(test_data)
print(f'value4:{value4}')
这段代码中定义了一个名为‘Metrics‘的类,用于计算推荐系统中的多种性能指标,包括命中率 (Hit Ratio)、归一化累积增益 (NDCG)、均方根误差 (RMSE) 和平均绝对误差 (MAE)。以下是对这个类中主要方法的解读:
Metrics
类
初始化 (init):接收配置参数 configs,这些配置参数包含了评估指标时所需的信息,如 top_k。
性能评估方法
客户端与服务端如何完成梯传输与更新
客户端的梯度更新
更新过程:在 LightFR 类的 train_model 方法中,首先从所有客户端中随机选择一部分进行训练(基于
self.configs.client_ratio)。对于每个选中的客户端,创建一个 Client 实例,并使用 client_update 方法更新客户端的哈希表示(client.bu)。
client_update方法:
这个方法是在 Client 类中定义的。它对客户端的哈希表示进行迭代更新,以更好地反映用户的偏好和行为。
更新是基于用户的实际评分数据和全局项目特征(client.D)。
更新过程涉及计算预测误差、应用正则化项,并使用符号函数(np.sign)调整哈希码的每一位。客户端与服务端的梯度传输
客户端到服务端: 在 LightFR 的 train_model 方法中,客户端完成哈希表示的更新后,这些更新信息(哈希表示)在内存中保留。
代码中没有直接显示客户端将梯度信息发送到服务端的过程。但从逻辑上讲,在一个实际的联邦学习环境中,客户端会将其更新的哈希表示或相关梯度信息发送给服务端。服务端更新
服务端根据收到的客户端信息更新全局项目特征(self.D)。
服务端更新涉及遍历所有项目并对它们的特征进行批量更新,这可能是基于从多个客户端聚合的信息。损失计算和反馈:
在每轮更新后,服务端计算整体损失并可能根据此调整全局更新策略。
损失信息可以反馈给客户端,以指导后续的本地更新。
总结:
在提供的代码中,客户端使用其本地数据独立更新哈希表示。这些更新在内存中保留,并可以被服务端用来更新全局模型。尽管代码中没有直接显示客户端和服务端之间的通信过程,但在一个完整的联邦学习系统中,客户端的更新通常会被发送到服务端,服务端则根据这些信息进行全局模型的更新。这样的设计旨在优化模型的整体性能,同时保护每个客户端的数据隐私。