入门pytorch-联邦学习

发布于:2025-07-02 ⋅ 阅读:(16) ⋅ 点赞:(0)

本文联邦学习的代码引用于https://github.com/shaoxiongji/federated-learning

本篇文章相当于带大家读一遍联邦学习的代码,同时加深了大家对联邦学习Pytorch框架的理解。

这里想简单介绍一下联邦学习。

联邦学习说白了,就是假如有 N N N个数据拥有者 F 1 , . . . , F N {F_1,...,F_N} F1,...,FN,他们希望使用这些数据来训练机器学习模型,但是又各自想隐藏自己的数据不被别人所知道(隐私保护),这个过程每个用户传递本地模型参数到中心服务器训练模型 M F E D M_{FED} MFED,该过程中任何数据拥有者 F i F_i Fi都不会暴露其数据 D i D_i Di给其他人。而传统的方法将所有的数据放到一起(中心服务器)并使用 D = D 1 ∪ D 2 . . . D N D=D_1 \cup D_2...D_N D=D1D2...DN训练模型 M S U M M_{SUM} MSUM,但是在传统方法的过程中,中心服务器会得知所有用户的数据,故有了联邦学习这个概念,并由此衍生出了针对联邦学习的攻击与防御等。

在这里,我们对比模型 M F E D M_{FED} MFED和模型 M S U M M_{SUM} MSUM的精度 V F E D V_{FED} VFED V S U M V_{SUM} VSUM应该非常接近,如果其精度有了损失,可能会因隐私保护而得不偿失了,下面的 δ \delta δ是联邦学习算法的精度值损失 ∣ V F E D − V S U M ∣ < δ |V_{FED}-V_{SUM}|<\delta VFEDVSUM<δ


整体架构(main函数)

首先,我们先从整体进行大览整体逻辑。首先初始化全局模型,然后划分每个用户的本地数据集,开始训练,由每个客户端进行本地训练,然后将参数传递给中心服务器,进行全局平均更新模型参数并将将新的参数传递给每个客户端。迭代数轮,最终就训练好了模型

if __name__ == '__main__':
    # parse args
    args = args_parser() # 参数解析
    args.device = torch.device('cuda:{}'.format(args.gpu) if torch.cuda.is_available() and args.gpu != -1 else 'cpu') # 切换设备

    # load dataset and split users
    if args.dataset == 'mnist': # 加载数据集
        trans_mnist = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))])
        dataset_train = datasets.MNIST('./data/mnist/', train=True, download=True, transform=trans_mnist)
        dataset_test = datasets.MNIST('./data/mnist/', train=False, download=True, transform=trans_mnist)
        # sample users
        if args.iid: # 是否服从独立同分布的划分数据集
            dict_users = mnist_iid(dataset_train, args.num_users)
        else:
            dict_users = mnist_noniid(dataset_train, args.num_users)
    elif args.dataset == 'cifar': # 加载数据集
        trans_cifar = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])
        dataset_train = datasets.CIFAR10('./data/cifar', train=True, download=True, transform=trans_cifar)
        dataset_test = datasets.CIFAR10('./data/cifar', train=False, download=True, transform=trans_cifar)
        if args.iid:
            dict_users = cifar_iid(dataset_train, args.num_users)
        else:
            exit('Error: only consider IID setting in CIFAR10')
    else:
        exit('Error: unrecognized dataset')
    img_size = dataset_train[0][0].shape

    # build model
    if args.model == 'cnn' and args.dataset == 'cifar': # 选定模型
        net_glob = CNNCifar(args=args).to(args.device)
    elif args.model == 'cnn' and args.dataset == 'mnist':
        net_glob = CNNMnist(args=args).to(args.device)
    elif args.model == 'mlp':
        len_in = 1
        for x in img_size:
            len_in *= x
        net_glob = MLP(dim_in=len_in, dim_hidden=200, dim_out=args.num_classes).to(args.device)
    else:
        exit('Error: unrecognized model')
    print(net_glob)
    net_glob.train()

    # copy weights
    w_glob = net_glob.state_dict() # 获取全局权重

    # training
    loss_train = []
    cv_loss, cv_acc = [], []
    val_loss_pre, counter = 0, 0
    net_best = None
    best_loss = None
    val_acc_list, net_list = [], []

    if args.all_clients: 
        print("Aggregation over all clients") # 所有客户端的聚合
        w_locals = [w_glob for i in range(args.num_users)] # 将初始化权重分配给每个用户
    for iter in range(args.epochs): # 总的训练轮次
        loss_locals = [] #
        if not args.all_clients:
            w_locals = []
        ''' args.frac每次梯度下降的比例	args.num_users客户端数量 '''
        m = max(int(args.frac * args.num_users), 1) # 选择需要进行梯度下降的用户数量
        idxs_users = np.random.choice(range(args.num_users), m, replace=False) # 随机选择
        for idx in idxs_users:
            local = LocalUpdate(args=args, dataset=dataset_train, idxs=dict_users[idx])
            w, loss = local.train(net=copy.deepcopy(net_glob).to(args.device)) # 获取全局模型并开始本地训练
            if args.all_clients:
                w_locals[idx] = copy.deepcopy(w) # 获取每个客户端的本地参数
            else:
                w_locals.append(copy.deepcopy(w))
            loss_locals.append(copy.deepcopy(loss)) # 获取损失函数
        # update global weights
        w_glob = FedAvg(w_locals) # 进行聚合平均

        # copy weight to net_glob
        net_glob.load_state_dict(w_glob) # 更新参数

        # print loss
        loss_avg = sum(loss_locals) / len(loss_locals)
        print('Round {:3d}, Average loss {:.3f}'.format(iter, loss_avg))
        loss_train.append(loss_avg)

    # plot loss curve
    plt.figure()
    plt.plot(range(len(loss_train)), loss_train)
    plt.ylabel('train_loss')
    plt.savefig('./save/fed_{}_{}_{}_C{}_iid{}.png'.format(args.dataset, args.model, args.epochs, args.frac, args.iid))

    # testing
    net_glob.eval()
    acc_train, loss_train = test_img(net_glob, dataset_train, args)
    acc_test, loss_test = test_img(net_glob, dataset_test, args)
    print("Training accuracy: {:.2f}".format(acc_train))
    print("Testing accuracy: {:.2f}".format(acc_test))

参数处理

这里给出我们所使用到的参数,还有一些参数在代码中并没有使用到

参数 解释
epochs 中心服务器训练的轮次
num_users 客户端数量
frac 每次进行梯度下降的比例
local_ep 本地训练模型的轮次
local_bs 本地批量大小
lr 学习率
momentum SGD梯度下降法的动量大小
model 选用模型
dataset 所用数据集
iid 数据集划分是否符合独立同分布
num_classes 模型的通道数
gpu 选用模型
stopping_rounds 选用模型
verbose 详细打印
seed 随机种子
all_clients 聚合所有的客户端
def args_parser():
    parser = argparse.ArgumentParser()
    # federated arguments
    parser.add_argument('--epochs', type=int, default=10, help="rounds of training")
    parser.add_argument('--num_users', type=int, default=100, help="number of users: K")
    parser.add_argument('--frac', type=float, default=0.1, help="the fraction of clients: C")
    parser.add_argument('--local_ep', type=int, default=5, help="the number of local epochs: E")
    parser.add_argument('--local_bs', type=int, default=10, help="local batch size: B")
    parser.add_argument('--bs', type=int, default=128, help="test batch size")
    parser.add_argument('--lr', type=float, default=0.01, help="learning rate")
    parser.add_argument('--momentum', type=float, default=0.5, help="SGD momentum (default: 0.5)")
    parser.add_argument('--split', type=str, default='user', help="train-test split type, user or sample")

    # model arguments
    parser.add_argument('--model', type=str, default='mlp', help='model name')
    parser.add_argument('--kernel_num', type=int, default=9, help='number of each kind of kernel')
    parser.add_argument('--kernel_sizes', type=str, default='3,4,5',
                        help='comma-separated kernel size to use for convolution')
    parser.add_argument('--norm', type=str, default='batch_norm', help="batch_norm, layer_norm, or None")
    parser.add_argument('--num_filters', type=int, default=32, help="number of filters for conv nets")
    parser.add_argument('--max_pool', type=str, default='True',
                        help="Whether use max pooling rather than strided convolutions")

    # other arguments
    parser.add_argument('--dataset', type=str, default='mnist', help="name of dataset")
    parser.add_argument('--iid', action='store_true', help='whether i.i.d or not')
    parser.add_argument('--num_classes', type=int, default=10, help="number of classes")
    parser.add_argument('--num_channels', type=int, default=3, help="number of channels of imges")
    parser.add_argument('--gpu', type=int, default=-1, help="GPU ID, -1 for CPU")
    parser.add_argument('--stopping_rounds', type=int, default=10, help='rounds of early stopping')
    parser.add_argument('--verbose', action='store_true', help='verbose print')
    parser.add_argument('--seed', type=int, default=1, help='random seed (default: 1)')
    parser.add_argument('--all_clients', action='store_true', help='aggregation over all clients')
    args = parser.parse_args()
    return args

独立同分布划分数据集

可以设置独立同分布还是非独立同分布划分数据集(其实这块内容基本使用不到),在目前学术界大都采用Dirichlet分布或者Pathological分布

其实这块内容从某个方面来看,在常规阶段提点无法有较大提升,所以提出了新的场景(Dirichlet分布Pathological分布),在新的场景中可以完成显著的提点,当然这个场景本身是没有问题的,以及衍生的算法也没有问题。但是我们可以学习到一个学术思路,将我们创新出的算法放置到一个新场景或许有意料之外的效果。其实也有点先射箭后画靶的意思了。好了,这里不再过多谈论了

def mnist_iid(dataset, num_users): # 独立同分布划分
    """
    Sample I.I.D. client data from MNIST dataset
    :param dataset:
    :param num_users:
    :return: dict of image index
    """
    num_items = int(len(dataset)/num_users)
    dict_users, all_idxs = {}, [i for i in range(len(dataset))]
    for i in range(num_users): # 遍历每个用户
        dict_users[i] = set(np.random.choice(all_idxs, num_items, replace=False)) # 进行抽取
        all_idxs = list(set(all_idxs) - dict_users[i]) # 删除已经抽取过的数据
    return dict_users # 返回划分好的字典


def mnist_noniid(dataset, num_users): # 非独立同分布划分
    """
    Sample non-I.I.D client data from MNIST dataset
    :param dataset:
    :param num_users:
    :return:
    """
    num_shards, num_imgs = 200, 300
    idx_shard = [i for i in range(num_shards)]
    dict_users = {i: np.array([], dtype='int64') for i in range(num_users)}
    idxs = np.arange(num_shards*num_imgs)
    labels = dataset.train_labels.numpy()

    # sort labels
    idxs_labels = np.vstack((idxs, labels))
    idxs_labels = idxs_labels[:,idxs_labels[1,:].argsort()]
    idxs = idxs_labels[0,:]

    # divide and assign
    for i in range(num_users):
        rand_set = set(np.random.choice(idx_shard, 2, replace=False))
        idx_shard = list(set(idx_shard) - rand_set)
        for rand in rand_set:
            dict_users[i] = np.concatenate((dict_users[i], idxs[rand*num_imgs:(rand+1)*num_imgs]), axis=0)
    return dict_users

模型

MLP模型

class MLP(nn.Module):
    def __init__(self, dim_in, dim_hidden, dim_out):
        super(MLP, self).__init__()
        self.layer_input = nn.Linear(dim_in, dim_hidden)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout()
        self.layer_hidden = nn.Linear(dim_hidden, dim_out)

    def forward(self, x):
        x = x.view(-1, x.shape[1]*x.shape[-2]*x.shape[-1])
        x = self.layer_input(x)
        x = self.dropout(x)
        x = self.relu(x)
        x = self.layer_hidden(x)
        return x

卷积模型

class CNNMnist(nn.Module):
    def __init__(self, args):
        super(CNNMnist, self).__init__()
        self.conv1 = nn.Conv2d(args.num_channels, 10, kernel_size=5)
        self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
        self.conv2_drop = nn.Dropout2d()
        self.fc1 = nn.Linear(320, 50)
        self.fc2 = nn.Linear(50, args.num_classes)

    def forward(self, x):
        x = F.relu(F.max_pool2d(self.conv1(x), 2))
        x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
        x = x.view(-1, x.shape[1]*x.shape[2]*x.shape[3])
        x = F.relu(self.fc1(x))
        x = F.dropout(x, training=self.training)
        x = self.fc2(x)
        return x

class CNNCifar(nn.Module):
    def __init__(self, args):
        super(CNNCifar, self).__init__()
        self.conv1 = nn.Conv2d(3, 6, 5)
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(6, 16, 5)
        self.fc1 = nn.Linear(16 * 5 * 5, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, args.num_classes)

    def forward(self, x):
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = x.view(-1, 16 * 5 * 5)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x

本地模型训练

每个客户端经过本地训练,上传模型置中央服务器上,服务器进行聚合并将聚合后的模型下发到各个客户端。迭代数次,一个泛化性强大的模型便训练好了。

这里谈到了泛化性,我便多说一点,与之对应的便是个性化。此时引出来了个性化联邦学习PFL,其实所谓的PFL从技术层面上看就是取巧了,更多的强调的是个性化,那么它是否丧失了泛化性呢?说实话,其实大部分这方面论文早已丧失了泛化性,本身就是本地训练个模型,但是将其中的某些层经过聚合,实际上不经过聚合,模型的点数也很高。其实这里已经给出了为什么PFL的点数如此之高

当然,是否存在两者兼具的算法呢?当让存在,只不过其它方面又存在一些问题。学术其实就是这样,不断打补丁

class LocalUpdate(object):
    def __init__(self, args, dataset=None, idxs=None):
        self.args = args
        self.loss_func = nn.CrossEntropyLoss()
        self.selected_clients = []
        self.ldr_train = DataLoader(DatasetSplit(dataset, idxs), batch_size=self.args.local_bs, shuffle=True)

    def train(self, net):
        net.train() # 设置为训练模式
        # train and update
        optimizer = torch.optim.SGD(net.parameters(), lr=self.args.lr, momentum=self.args.momentum)

        epoch_loss = []
        for iter in range(self.args.local_ep): # 本地训练的轮次
            batch_loss = []
            for batch_idx, (images, labels) in enumerate(self.ldr_train):
                images, labels = images.to(self.args.device), labels.to(self.args.device)
                net.zero_grad() # 梯度清零
                log_probs = net(images) # 预测
                loss = self.loss_func(log_probs, labels) # 计算损失函数
                loss.backward() # 反向传播
                optimizer.step() # 进行优化
                if self.args.verbose and batch_idx % 10 == 0: # 详细打印程度
                    print('Update Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                        iter, batch_idx * len(images), len(self.ldr_train.dataset),
                               100. * batch_idx / len(self.ldr_train), loss.item()))
                batch_loss.append(loss.item())
            epoch_loss.append(sum(batch_loss)/len(batch_loss))
        return net.state_dict(), sum(epoch_loss) / len(epoch_loss)

聚合权重

中心服务器每轮接受到了客户端传递的参数,进行平均聚合(同样的,也可以采用加权聚合),然后再下发给每个客户端

def FedAvg(w):
    w_avg = copy.deepcopy(w[0]) # 对第一个客户端进行深层拷贝
    for k in w_avg.keys(): # 遍历每一个参数
        for i in range(1, len(w)): # 遍历每一个客户端并相加
            w_avg[k] += w[i][k]
        w_avg[k] = torch.div(w_avg[k], len(w)) # 最后求平均
    return w_avg

个性化联邦学习(Personalized federated learning, PFL)

谈论到了联邦学习(Federated Learning),那就不得不谈论PFL了,代码引用于https://github.com/TsingZ0/PFLlib

上述的PFLib库不仅仅有PFL,同样也集成了FL。跑联邦的实验很好用,建议使用此框架完成对比实验,这里就不再详细介绍代码了,PFLib作者本身做的还是很不错的,代码架构清晰明了。

看别人的代码清晰明了,看自己的代码不堪入目ε(┬┬﹏┬┬)3

实际上,PFL是个伪需求(未来有可能成为真是需求)。我们回顾一下FL的诞生,FL诞生之前是分布式学习,摇身一变成为了联邦学习。再过几年,个性化联邦学习应运而生。FL是为了隐私保护,保护各个客户端的数据不被其他人获取,但是依然希望获取一个泛化性能较强的模型,而PFL为了追求个性化,其实是在泛化性和个性化作了一个平衡,但是随时间各个论文为了提点,不得不更偏向于个性化

过度个性化 ≈ \approx 本地训练模型

从这里就可以看到,目前近几年部分论文在底层上,实际与本地训练个模型别无二致,无非添加了些概念(截至2025.6)

谈论到这里,其实还一种学术思路,便是将其他领域的方法嫁接到本领域当中,比如对抗学习、元学习、对比学习、知识蒸馏等等(这些都已有论文了)。所以往往不要在本领域寻找创新点,创新点可能在其它领域中等待着发现。


网站公告

今日签到

点亮在社区的每一天
去签到