python 线性回归模型

发布于:2024-05-23 ⋅ 阅读:(38) ⋅ 点赞:(0)

教材链接-3.2. 线性回归的从零开始实现

c++实现

该博客仅用于记录一下自己的代码,可与c++实现作为对照

from d2l import torch as d2l
import torch
import random
# nn是神经网络的缩写
from torch import nn
from torch.utils import data

# 加载训练数据  
# 加载训练数据集 
simples = torch.load('datas.pt')
# 这里是加载了训练和测试数据集的真实权重和偏差,仅作为最后训练结果的验证使用
tw, tb = torch.load('wb.pt')
# 加载测试数据集  
tests = torch.load('test.pt')
# 获取训练数据集的样本数量  
simple_num = simples.shape[0]

# 获取数据读取迭代器  
def data_iter(batch_size, features, labels):
    # 计算数据的总数量
    num_examples = len(features)
    # 创建一个包含数据索引的列表  
    indices = list(range(num_examples))
    # 随机打乱索引列表,以实现随机读取样本,对训练结果意义不明
    # random.shuffle(indices)
    # 遍历打乱后的indices,每次取出batch_size个索引,用于构建一个小批量数据  
    for i in range(0, num_examples, batch_size):
        # 获取当前批次的索引号并以张量形式存储
        batch_indices = torch.tensor(indices[i: min(i + batch_size, num_examples)])
        # 根据索引从特征和标签中提取数据  
        yield features[batch_indices], labels[batch_indices]
# 在Python中,yield 是一个关键字,用于定义一个生成器(generator)。生成器是一种特殊的迭代器,它允许你定义一个可以记住上一次返回时在函数体中的位置的函数。对生成器函数的第二次(或第n次)调用将恢复函数的执行,并继续从上次挂起的位置开始。

# 定义一个函数来加载并批量处理数据,返回数据获取迭代器 
def load_array(data_arrays, batch_size, is_train=True):  #@save
    """构造一个PyTorch数据迭代器"""
    # 使用TensorDataset将多个tensor组合成一个数据集  
    dataset = data.TensorDataset(*data_arrays)
    # 使用DataLoader加载数据集,并指定批量大小和是否打乱数据
    return data.DataLoader(dataset, batch_size, shuffle=is_train)

# 定义线性回归模型  
def linreg(X, w, b):  #@save
    """线性回归模型"""
    # 使用矩阵乘法计算预测值,并加上偏差  
    return torch.matmul(X, w) + b

# 定义平方损失函数  
def squared_loss(y_hat, y):  #@save
    """均方损失"""
    # 计算预测值与实际值之间的平方差,并除以2(方便梯度计算)
    return (y_hat - y.reshape(y_hat.shape)) ** 2 / 2

# 定义交叉熵损失函数,线性回归模型用不到
def cross_entropy(y_hat, y):
    return - torch.log(y_hat[range(len(y_hat)), y])

# 定义一个鲁棒的损失函数,结合了平方损失和绝对值损失
def robust_loss(y_hat, y, delta=1.0):
    residual = torch.abs(y_hat - y)
    return torch.where(residual<delta, 0.5* residual **2, delta*(residual-0.5*delta))

# 绝对值损失函数  
def abs_loss(y_hat, y):
    return torch.abs(y_hat - y.reshape(y_hat.shape))
    
# 定义随机梯度下降函数  
def sgd(params, lr, batch_size):  #@save
    """小批量随机梯度下降"""
    with torch.no_grad():
        # 遍历模型参数 
        for param in params:
            # 更新参数值,使用学习率lr乘以参数的梯度,并除以批量大小 
            param -= lr * param.grad / batch_size
            # 清除参数的梯度,为下一轮迭代做准备  
            param.grad.zero_()

# 数据标准化处理  
def standard(X):
    X_mean = torch.mean(X, dim=0)
    X_std = torch.std(X, dim=0)
    return (X-X_mean)/X_std

# 数据最小最大归一化处理  
def min_max(X):
    X_min = torch.min(X, dim=0)[0]
    X_max = torch.max(X, dim=0)[0]
    return (X-X_min)/(X_max-X_min)

# 不进行任何处理,直接返回输入
def noProcess(X):
    return X
#Linear Regression Implementation from Scratch
if __name__ == '__main__':
    # 设置学习率和训练轮数  
    lr = 0.03
    num_epochs = 20
    # 这里其实net变量并没有定义为一个神经网络模型,而是一个函数  
    # 但为了与后续代码保持一致,我们仍然使用net来表示这个线性回归函数
    # loss同理
    net = linreg
    loss = squared_loss
    # 使用不进行任何处理的数据处理方式  
    data_process = noProcess
    # 将数据分成50个批次,计算每批数据的数量 
    batch_size = simple_num // 50
    # 提取特征和标签 
    # 提取最后一列作为标签  
    label = simples[:,-1]
    # 提取除最后一列外的所有列作为特征,并使用data_process进行处理
    feature=data_process(simples[:, :-1])
    # 初始化权重和偏差,权重使用正态分布初始化,偏差初始化为0  
    w = torch.normal(0, 1, size=(feature.shape[1], 1), requires_grad=True)
    # w = torch.tensor([0.3], requires_grad=True)
    b = torch.tensor([0.0], requires_grad=True)
    timer = d2l.Timer()
    # 开始训练  
    for epoch in range(num_epochs):
        # 通过data_iter遍历数据进行一轮训练
        for X,y in data_iter(batch_size, feature, label):
            # 计算预测值
            y_hat = net(X, w, b)
            # 计算损失
            l = loss(y_hat, y)
            # 反向传播计算梯度  
            l.sum().backward()
            # 使用随机梯度下降更新参数
            sgd([w,b], lr, batch_size)
        # 一轮训练结束后,计算整个训练集上的损失,用以监控训练效果
        # with torch.no_grad(): 告诉 PyTorch 在这个上下文内不要计算梯度,从而节省内存并加速计算。
        with torch.no_grad():
            label_hat = net(feature, w, b)
            epoch_loss = loss(label_hat, label)
            if epoch%5 == 0:
                print(f'in epoch{epoch+1}, loss is {epoch_loss.sum()}')

    # 在训练完成后,计算测试集上的预测值和损失  
    # 提取测试集的特征和标签 
    test_feature = data_process(tests[:, :-1])
    test_label = tests[:, -1]
    # 计算测试集上的预测值和损失 
    test_label_hat = net(test_feature, w, b)
    label_loss = loss(test_label_hat, test_label)
    print(f'in test epoch, loss is {label_loss.mean()}')
    print(f'true_w={tw}, true_b={tb}, w={w}, b={b}')
    print(f' {num_epochs} epoch, time {timer.stop():.2f} sec')
#Concise Implementation of Linear Regression
#the concise implementation have lower accuracy than from scratch
if __name__ == '__main2__':
    # 设置学习率、训练轮数、数据处理方式和批量大小  
    lr = 0.03
    num_epochs = 15
    # 使用不进行任何处理的数据处理方式  
    data_process = noProcess
    # 将数据分成50个批次,计算每批数据的数量  
    batch_size = simple_num // 50
    # 提取特征和标签 
    label = simples[:,-1]
    feature=data_process(simples[:, :-1])
    # 加载数据并创建数据迭代器  
    data_iter = load_array((feature, label), batch_size)
    # 构建神经网络模型,这里是一个简单的线性回归模型  
    net = nn.Sequential(nn.Linear(feature.shape[1], 1))
    # 我们的模型只包含一个层,因此实际上不需要Sequential
    # 不使用Sequential时,后面的net[0]需要改为net
    # net = nn.Linear(feature.shape[1], 1)
    # 初始化网络权重和偏置 
    net[0].weight.data.normal_(0, 0.01)
    net[0].bias.data.fill_(0)
    # 使用均方误差损失函数
    loss = nn.MSELoss()
    # 使用随机梯度下降优化器  
    trainer = torch.optim.SGD(net.parameters(), lr=lr)
    
    # 开始训练  
    for epoch in range(num_epochs):
        # 通过data_iter遍历数据进行一轮训练
        for X, y in data_iter:
            # 前向传播计算预测值
            y_hat = net(X)
            # 计算损失 
            l = loss(y_hat, y.reshape(y_hat.shape))
            # 梯度清零,为下一轮迭代计算做准备
            trainer.zero_grad()
            # 反向传播计算梯度   
            l.backward()
            # 使用随机梯度下降更新参数
            trainer.step()
        # 在每个epoch结束后,对整个数据集进行前向传播并计算损失,用于监控训练过程 
        label_hat = net(feature)
        epoch_loss = loss(label_hat, label.reshape(label_hat.shape))
        if epoch%5 == 0:
            print(f'in epoch{epoch+1}, loss is {epoch_loss.mean()}')
    
    # 在训练完成后,计算测试集上的预测值和损失  
    # 提取测试集的特征和标签 
    test_feature = data_process(tests[:, :-1])
    test_label = tests[:, -1]
    # 计算测试集上的预测值和损失 
    test_label_hat = net(test_feature)
    label_loss = loss(test_label_hat, test_label.reshape(test_label_hat.shape))
    print(f'in test epoch, loss is {label_loss.mean():f}')
    print(f'tw={tw}, tb={tb}, w={net[0].weight.data}, b={net[0].bias.data}')