kaggle地址:
https://www.kaggle.com/competitions/ml2021spring-hw1/overview
初始版本:
import csv
import time
import numpy as np
import torch
import torch.nn as nn
from matplotlib import pyplot as plt
from torch import optim
from torch.utils.data import Dataset, DataLoader
# 第一步:处理数据
# 继承 Dataset,表示它是一个 PyTorch 可用的数据集类【不用调用父类的init方法是因为Dataset.__init__()为空】
class Covid_dataset(Dataset):
def __init__(self, file_path, mode): # mode 指定数据集类型(train, val, test)
with open(file_path, "r") as f: # 打开 file_path 指定的文件,以只读模式 ("r") 进行读取,并赋值给 f 变量
csv_data = list(csv.reader(f)) # 读取 CSV 文件
data = np.array(csv_data[1:]) # 跳过表头(即第一行),将数据转换为 NumPy 数组
# 划分训练集和验证集
if mode == "train":
indices = [i for i in range(len(data)) if i % 5 != 0] # 80% 数据用于训练
elif mode == "val":
indices = [i for i in range(len(data)) if i % 5 == 0] # 20% 数据用于验证
# 从表中取数据
if mode == "test":
x = data[:, 1:].astype(float) # 取所有行,第2列到最后一列(测试集没有y只有x,且前面已经去掉第一行了)
x = torch.tensor(x) # 转化为张量
else:
x = data[indices, 1:-1].astype(float) # 取indices行,x为第2列到倒数第2列
x = torch.tensor(x)
y = data[indices, -1].astype(float) # y为最后一列
self.y = torch.tensor(y) # 加self的目的是让局部变量xy变成self的属性,可以在其他函数内通过self来访问
# 对x进行标准化,将数据每一列的均值变为0,标准差变为1,避免某些特征因为量纲(比如10和0.001)不同导致训练的不稳定
self.x = (x - x.mean(dim=0, keepdim=True)) / x.std(dim=0, keepdim=True)
self.mode = mode
def __getitem__(self, item):
if self.mode == "test":
return self.x[item].float() # 测试集只返回 x(没有 y),同时注意data要转为模型需要的float32型
else:
return self.x[item].float(), self.y[item].float() # 训练/验证集返回 (x, y)
def __len__(self):
return len(self.x) # 返回数据集的样本数,让 DataLoader 知道数据集的大小,便于批量训练
# 第二步:定义模型
# 继承自 nn.Module,是一个 PyTorch 全连接神经网络(MLP),用于执行回归或二分类任务
class myModel(nn.Module):
def __init__(self, dim): # dim 是输入特征的维度
super(myModel, self).__init__() # 调用父类的init方法,保证 myModel 正确继承 torch.nn.Module 的功能
self.fc1 = nn.Linear(dim, 100) # 第一层:全连接层,输入维度 dim,输出 64 维
self.relu = nn.ReLU() # 激活函数 ReLU
self.fc2 = nn.Linear(100, 1) # 第二层:全连接层,输出 1 维(用于回归或二分类)
def forward(self, x): # 定义前向传播
x = self.fc1(x) # 通过第一层全连接
x = self.relu(x) # 通过 ReLU 激活函数
x = self.fc2(x) # 通过第二层全连接
# 处理输出维度问题,确保 x 变为 一维张量(batch_size,) 而不是 (batch_size, 1)
if len(x.size()) > 1:
return x.squeeze(1) # 去掉多余的维度
else:
return x
# 第三步:定义超参数
device = "cuda" if torch.cuda.is_available() else "cpu"
dim = 93
config = {
"lr": 0.001, # 学习率
"momentum": 0.9, # 动量:加速收敛,减少梯度震荡,momentum=0.9代表90%的上一次梯度方向+10%的当前梯度方向
"epochs": 20, # 训练轮数
"save_path": "model_save/model.pth", # 模型保存路径
"rel_path": "pred.csv" # 预测结果的保存路径
}
# 加载数据文件
train_file = "covid.train.csv"
test_file = "covid.test.csv"
# 实例化了前面的数据集类
train_data = Covid_dataset(train_file, "train")
val_data = Covid_dataset(train_file, "val")
test_data = Covid_dataset(test_file, "test")
# 数据加载器,对训练集和验证集来说每次随机取16条数据,测试集不需要随机打乱,且一条一条处理数据
train_loader = DataLoader(train_data, batch_size=16, shuffle=True)
val_loader = DataLoader(val_data, batch_size=16, shuffle=True)
test_loader = DataLoader(test_data, batch_size=1, shuffle=False)
# 【前向传播】初始化模型 myModel
model = myModel(dim)
# 【计算偏差】均方误差损失函数,计算预测值和真实值的平方误差的平均值
loss = nn.MSELoss()
# 【梯度下降/更新参数】优化器
optimizer = optim.SGD(model.parameters(), lr=config["lr"], momentum=config["momentum"])
# 第四步:开始训练
# 用于训练 & 验证的函数
def train_val(model, train_loader, val_loader, device, epochs, optimizer, loss, save_path):
model = model.to(device) # 将model移动到device上进行计算
plt_train_loss = [] # 后面画图要用的
plt_val_loss = []
min_val_loss = 9999999999 # 只要有比最大值更小的就更新loss值
for epoch in range(epochs):
train_loss = 0.0
val_loss = 0.0
start_time = time.time()
model.train() # 启用训练模式(反正就是必须启用就对了)
# optimizer内部不进行梯度计算,使用的是loss.backward()算好的梯度,因此后者本身是需要梯度计算的,不能torch.no_grad()
for batch_x, batch_y in train_loader:
x, target = batch_x.to(device), batch_y.to(device) # 数据迁移到device
pred = model(x) # 前向传播
train_bat_loss = loss(pred, target) # 计算损失,调用的是前面定义的loss = nn.MSELoss(),但接口是这样写的嗯
train_bat_loss.backward() # 反向传播
optimizer.step() # 更新参数(反正也是必须这么用就对了,反向传播之后就这样)
optimizer.zero_grad() # 清空梯度,防止累积
train_loss += train_bat_loss.cpu().item() # 记录损失,先将loss从GPU转移到CPU,再从张量类型转换为标量float
# 计算每个epoch的平均训练损失,并将其添加到plt_train_loss列表中,用于后续绘制训练损失的曲线
plt_train_loss.append(train_loss / train_loader.dataset.__len__())
model.eval() # 启用验证模式(反正就是必须启用就对了)
# 验证集主要任务是计算输出并计算损失,过程中不需要更新参数,也不需要计算梯度,为了提高性能可以直接全部torch.no_grad()
with torch.no_grad():
for batch_x, batch_y in val_loader:
x, target = batch_x.to(device), batch_y.to(device)
pred = model(x)
val_bat_loss = loss(pred, target)
val_loss += val_bat_loss.cpu().item()
plt_val_loss.append(val_loss / val_loader.dataset.__len__())
# 验证损失(val_loss)比之前的最小验证损失(min_val_loss)更小时保存模型,即保存最优的模型参数
if val_loss < min_val_loss:
torch.save(model, save_path)
min_val_loss = val_loss
print('[%03d/%03d] %2.2f sec(s) TrainLoss : %.6f | valLoss: %.6f' % \
(epoch, epochs, time.time() - start_time, plt_train_loss[-1], plt_val_loss[-1])
) # 打印当前轮次训练结果。03d用于打印三位整数;2.2f表示总宽度为2个字符,且保留2位小数;.6f保留6位小数
# 全部轮次执行结束后画图
plt.plot(plt_train_loss)
plt.plot(plt_val_loss)
plt.title("loss")
plt.legend(["train", "val"]) # 图形中有两条曲线,分别标记为train和val
plt.show()
train_val(model, train_loader, val_loader, device, config["epochs"], optimizer, loss, config["save_path"])
# 用于测试的函数
def evaluate(save_path, device, test_loader, rel_path):
model = torch.load(save_path).to(device) # 加载模型并移到指定设备
rel = [] # 用于保存预测结果
model.eval() # 启用验证模式
with torch.no_grad(): # 禁用梯度计算
for x in test_loader: # 遍历测试集中的数据
# print(x)
# print(x.to(device))
pred = model(x.to(device)) # 对数据进行预测,移到设备上
# print(pred)
rel.append(pred.cpu().item()) # 将预测结果移回CPU,并转换为标量,最后添加到结果列表rel中
print(rel) # 打印预测结果
# 将预测结果保存到CSV文件
# 使用csv.writer时,csv默认行结束符为 \r\n,导致每写一行后会多出一个空行
with open(rel_path, "w", newline='') as f:
csv_writer = csv.writer(f)
csv_writer.writerow(["id", "tested_positive"]) # 写入表头,第一列是样本的ID,第二列是预测值
for i in range(len(rel)):
csv_writer.writerow([str(i), str(rel[i])]) # 写入每个预测值
print("文件已经保存到" + rel_path) # 打印保存路径
evaluate(config["save_path"], device, test_loader, config["rel_path"])
ps:
需要注意的是,在数据集中,某些列的数值只有零点几,而有些列的数值达到了八九十,列与列之间的数据差距过大会导致大数值主导了整个计算,最终模型可能学不到正确的模式,导致所有输出值都接近某个固定值(如均值),这就是一开始忘记归一化之后所有预测数据都是一样的原因!
事实上我们需要关注的只是每一列内部的数据变化情况,所以通过标准化,使得所有特征的均值为 0,标准差为 1,这样模型就不会偏向某些特征了
结果:
但可以看到结果还是有比较大的偏差的,这时可以通过以下几种方法进行进一步的优化:
1.正则化:
正则化(Regularization)是一种防止机器学习模型过拟合的方法。它的核心思想是在优化过程中增加对模型复杂度的约束,从而提高模型的泛化能力,使其在未见过的数据上表现更好。(防止模型"死记硬背"数据,而是学到真正有用的模式)
其中一种方法为:L1 和 L2 正则化:
L1 正则化会在 损失函数 中加入 权重(参数)w的 L1 范数(绝对值之和):
L o s s = O r i g i n a l L o s s + λ ∑ ∣ w i ∣ Loss=OriginalLoss+λ∑∣wi∣ Loss=OriginalLoss+λ∑∣wi∣
- 特点:可以让部分权重变成 0,实现 特征选择。
L2 正则化会在 损失函数 中加入 权重(参数)w的 L2 范数(平方和):
L o s s = L o s s o r i g i n a l + λ ∑ w i ∗ w i Loss=Lossoriginal+λ∑wi*wi Loss=Lossoriginal+λ∑wi∗wi
- 特点:防止权重变得过大,但不会让权重变为 0,而是趋向于较小的值,通常被称为 权重衰减
def mseLoss(pred, target, model):
loss = nn.MSELoss(reduction='mean')
''' Calculate loss '''
regularization_loss = 0 # 正则项
for param in model.parameters():
# TODO: you may implement L1/L2 regularization here
# 使用L2正则项
# regularization_loss += torch.sum(abs(param))
regularization_loss += torch.sum(param ** 2) # 计算所有参数平方
return loss(pred, target) + 0.0001 * regularization_loss # 返回损失。
loss = mseLoss
2.相关系数:
并不是每一列的数据都同等重要,甚至有些列完全是噪声。所以通过计算出每一列和结果的相关系数,只保留相关性最高的几列,其他的全部去掉【用别人写好的现成的代码即可】
# 计算相关系数,找到相关性最高的6列
def get_feature_importance(feature_data, label_data, k=6, column=None):
"""
feature_data, label_data 要求字符串形式
k为选择的特征数量
如果需要打印column,需要传入行名
此处省略 feature_data, label_data 的生成代码。
如果是 CSV 文件,可通过 read_csv() 函数获得特征和标签。
这个函数的目的是, 找到所有的特征种, 比较有用的k个特征, 并打印这些列的名字。
"""
model = SelectKBest(chi2, k=k) # 定义一个选择k个最佳特征的函数
feature_data = np.array(feature_data, dtype=np.float64)
X_new = model.fit_transform(feature_data, label_data) # 用这个函数选择k个最佳特征
# feature_data是特征数据,label_data是标签数据,该函数可以选择出k个特征
# print('x_new', X_new)
scores = model.scores_ # scores即每一列与结果的相关性
# 按重要性排序,选出最重要的 k 个
indices = np.argsort(scores)[::-1] # [::-1]表示反转一个列表或者矩阵。
# argsort这个函数, 可以矩阵排序后的下标。 比如 indices[0]表示的是,scores中最小值的下标。
if column: # 如果需要打印选中的列名字
k_best_features = [column[i] for i in indices[0:k].tolist()] # 选中这些列 打印
print('k best features are: ', k_best_features)
return X_new, indices[0:k] # 返回选中列的特征和他们的下标。
3.主成分分析PCA(但本项目未使用)
PCA是一种 降维 方法,主要用于 数据压缩、去噪、可视化,以及消除特征之间的冗余信息。它的核心思想是找到数据中的主成分(最重要的方向),并仅保留主要的特征信息,同时减少数据的维【想象成在高维空间里找到一组最佳坐标轴,让数据投影到这些轴上,使得不同数据之间区分度最大】
优化结果:
【第一次正则时系数使用为0.00075,反而效果变差了,后续调小系数为0.0001,效果变好】
完整代码:
import csv
import time
import numpy as np
import torch
import torch.nn as nn
from matplotlib import pyplot as plt
from sklearn.feature_selection import SelectKBest, chi2
from torch import optim
from torch.utils.data import Dataset, DataLoader
# 第一步:处理数据
# 计算相关系数,找到相关性最高的6列
def get_feature_importance(feature_data, label_data, k=6, column=None):
"""
feature_data, label_data 要求字符串形式
k为选择的特征数量
如果需要打印column,需要传入行名
此处省略 feature_data, label_data 的生成代码。
如果是 CSV 文件,可通过 read_csv() 函数获得特征和标签。
这个函数的目的是, 找到所有的特征种, 比较有用的k个特征, 并打印这些列的名字。
"""
model = SelectKBest(chi2, k=k) # 定义一个选择k个最佳特征的函数
feature_data = np.array(feature_data, dtype=np.float64)
X_new = model.fit_transform(feature_data, label_data) # 用这个函数选择k个最佳特征
# feature_data是特征数据,label_data是标签数据,该函数可以选择出k个特征
# print('x_new', X_new)
scores = model.scores_ # scores即每一列与结果的相关性
# 按重要性排序,选出最重要的 k 个
indices = np.argsort(scores)[::-1] # [::-1]表示反转一个列表或者矩阵。
# argsort这个函数, 可以矩阵排序后的下标。 比如 indices[0]表示的是,scores中最小值的下标。
if column: # 如果需要打印选中的列名字
k_best_features = [column[i] for i in indices[0:k].tolist()] # 选中这些列 打印
print('k best features are: ', k_best_features)
return X_new, indices[0:k] # 返回选中列的特征和他们的下标。
# 继承 Dataset,表示它是一个 PyTorch 可用的数据集类【不用调用父类的init方法是因为Dataset.__init__()为空】
class Covid_dataset(Dataset):
def __init__(self, file_path, mode): # mode 指定数据集类型(train, val, test)
with open(file_path, "r") as f: # 打开 file_path 指定的文件,以只读模式 ("r") 进行读取,并赋值给 f 变量
csv_data = list(csv.reader(f)) # 读取 CSV 文件
data = np.array(csv_data[1:]) # 跳过表头(即第一行),将数据转换为 NumPy 数组
# 相关系数
feature_data = data[:, 1:-1] # 特征就是x,在去掉第一行的基础上,再去掉第一列和最后一列
label_data = data[:, -1] # 标签就是y,在去掉第一行的基础上,只取最后一列
_, col = get_feature_importance(feature_data, label_data)
# 该函数返回两个值,X_new和indices[0:k],但是前者后续过程中不准备用,所以用下划线作为变量名
col = col.tolist() # 把col从数组转化为列表,便于后续使用
# 划分训练集和验证集
if mode == "train":
indices = [i for i in range(len(data)) if i % 5 != 0] # 80% 数据用于训练
elif mode == "val":
indices = [i for i in range(len(data)) if i % 5 == 0] # 20% 数据用于验证
# 从表中取数据
if mode == "test":
x = data[:, 1:].astype(float) # 取所有行,第2列到最后一列(测试集没有y只有x,且前面已经去掉第一行了)
x = torch.tensor(x) # 转化为张量
else:
x = data[indices, 1:-1].astype(float) # 取indices行,x为第2列到倒数第2列
x = torch.tensor(x)
y = data[indices, -1].astype(float) # y为最后一列
self.y = torch.tensor(y) # 加self的目的是让局部变量xy变成self的属性,可以在其他函数内通过self来访问
x = x[:, col] # 取相关系数最强的几列
# 对x进行标准化,将数据每一列的均值变为0,标准差变为1,避免某些特征因为量纲(比如10和0.001)不同导致训练的不稳定
self.x = (x - x.mean(dim=0, keepdim=True)) / x.std(dim=0, keepdim=True)
self.mode = mode
def __getitem__(self, item):
if self.mode == "test":
return self.x[item].float() # 测试集只返回 x(没有 y),同时注意data要转为模型需要的float32型
else:
return self.x[item].float(), self.y[item].float() # 训练/验证集返回 (x, y)
def __len__(self):
return len(self.x) # 返回数据集的样本数,让 DataLoader 知道数据集的大小,便于批量训练
# 第二步:定义模型
# 继承自 nn.Module,是一个 PyTorch 全连接神经网络(MLP),用于执行回归或二分类任务
class myModel(nn.Module):
def __init__(self, dim): # dim 是输入特征的维度
super(myModel, self).__init__() # 调用父类的init方法,保证 myModel 正确继承 torch.nn.Module 的功能
self.fc1 = nn.Linear(dim, 100) # 第一层:全连接层,输入维度 dim,输出 64 维
self.relu = nn.ReLU() # 激活函数 ReLU
self.fc2 = nn.Linear(100, 1) # 第二层:全连接层,输出 1 维(用于回归或二分类)
def forward(self, x): # 定义前向传播
x = self.fc1(x) # 通过第一层全连接
x = self.relu(x) # 通过 ReLU 激活函数
x = self.fc2(x) # 通过第二层全连接
# 处理输出维度问题,确保 x 变为 一维张量(batch_size,) 而不是 (batch_size, 1)
if len(x.size()) > 1:
return x.squeeze(1) # 去掉多余的维度
else:
return x
# 第三步:定义超参数
device = "cuda" if torch.cuda.is_available() else "cpu"
# dim = 93
dim = 6
config = {
"lr": 0.001, # 学习率
"momentum": 0.9, # 动量:加速收敛,减少梯度震荡,momentum=0.9代表90%的上一次梯度方向+10%的当前梯度方向
"epochs": 20, # 训练轮数
"save_path": "model_save/model.pth", # 模型保存路径
"rel_path": "pred.csv" # 预测结果的保存路径
}
# 加载数据文件
train_file = "covid.train.csv"
test_file = "covid.test.csv"
# 实例化了前面的数据集类
train_data = Covid_dataset(train_file, "train")
val_data = Covid_dataset(train_file, "val")
test_data = Covid_dataset(test_file, "test")
# 数据加载器,对训练集和验证集来说每次随机取16条数据,测试集不需要随机打乱,且一条一条处理数据
train_loader = DataLoader(train_data, batch_size=16, shuffle=True)
val_loader = DataLoader(val_data, batch_size=16, shuffle=True)
test_loader = DataLoader(test_data, batch_size=1, shuffle=False)
# 【前向传播】初始化模型 myModel
model = myModel(dim)
# 【计算偏差】均方误差损失函数,计算预测值和真实值的平方误差的平均值
# loss = nn.MSELoss()
def mseLoss(pred, target, model):
loss = nn.MSELoss(reduction='mean')
''' Calculate loss '''
regularization_loss = 0 # 正则项
for param in model.parameters():
# TODO: you may implement L1/L2 regularization here
# 使用L2正则项
# regularization_loss += torch.sum(abs(param))
regularization_loss += torch.sum(param ** 2) # 计算所有参数平方
return loss(pred, target) + 0.0001 * regularization_loss # 返回损失。
loss = mseLoss
# 【梯度下降/更新参数】优化器
optimizer = optim.SGD(model.parameters(), lr=config["lr"], momentum=config["momentum"])
# 第四步:开始训练
# 用于训练 & 验证的函数
def train_val(model, train_loader, val_loader, device, epochs, optimizer, loss, save_path):
model = model.to(device) # 将model移动到device上进行计算
plt_train_loss = [] # 后面画图要用的
plt_val_loss = []
min_val_loss = 9999999999 # 只要有比最大值更小的就更新loss值
for epoch in range(epochs):
train_loss = 0.0
val_loss = 0.0
start_time = time.time()
model.train() # 启用训练模式(反正就是必须启用就对了)
# optimizer内部不进行梯度计算,使用的是loss.backward()算好的梯度,因此后者本身是需要梯度计算的,不能torch.no_grad()
for batch_x, batch_y in train_loader:
x, target = batch_x.to(device), batch_y.to(device) # 数据迁移到device
pred = model(x) # 前向传播
train_bat_loss = loss(pred, target, model) # 计算损失,调用的是前面定义的loss = nn.MSELoss(),但接口是这样写的嗯
train_bat_loss.backward() # 反向传播
optimizer.step() # 更新参数(反正也是必须这么用就对了,反向传播之后就这样)
optimizer.zero_grad() # 清空梯度,防止累积
train_loss += train_bat_loss.cpu().item() # 记录损失,先将loss从GPU转移到CPU,再从张量类型转换为标量float
# 计算每个epoch的平均训练损失,并将其添加到plt_train_loss列表中,用于后续绘制训练损失的曲线
plt_train_loss.append(train_loss / train_loader.dataset.__len__())
model.eval() # 启用验证模式(反正就是必须启用就对了)
# 验证集主要任务是计算输出并计算损失,过程中不需要更新参数,也不需要计算梯度,为了提高性能可以直接全部torch.no_grad()
with torch.no_grad():
for batch_x, batch_y in val_loader:
x, target = batch_x.to(device), batch_y.to(device)
pred = model(x)
val_bat_loss = loss(pred, target, model)
val_loss += val_bat_loss.cpu().item()
plt_val_loss.append(val_loss / val_loader.dataset.__len__())
# 验证损失(val_loss)比之前的最小验证损失(min_val_loss)更小时保存模型,即保存最优的模型参数
if val_loss < min_val_loss:
torch.save(model, save_path)
min_val_loss = val_loss
print('[%03d/%03d] %2.2f sec(s) TrainLoss : %.6f | valLoss: %.6f' % \
(epoch, epochs, time.time() - start_time, plt_train_loss[-1], plt_val_loss[-1])
) # 打印当前轮次训练结果。03d用于打印三位整数;2.2f表示总宽度为2个字符,且保留2位小数;.6f保留6位小数
# 全部轮次执行结束后画图
plt.plot(plt_train_loss)
plt.plot(plt_val_loss)
plt.title("loss")
plt.legend(["train", "val"]) # 图形中有两条曲线,分别标记为train和val
plt.show()
train_val(model, train_loader, val_loader, device, config["epochs"], optimizer, loss, config["save_path"])
# 用于测试的函数
def evaluate(save_path, device, test_loader, rel_path):
model = torch.load(save_path).to(device) # 加载模型并移到指定设备
rel = [] # 用于保存预测结果
model.eval() # 启用验证模式
with torch.no_grad(): # 禁用梯度计算
for x in test_loader: # 遍历测试集中的数据
# print(x)
# print(x.to(device))
pred = model(x.to(device)) # 对数据进行预测,移到设备上
# print(pred)
rel.append(pred.cpu().item()) # 将预测结果移回CPU,并转换为标量,最后添加到结果列表rel中
print(rel) # 打印预测结果
# 将预测结果保存到CSV文件
# 使用csv.writer时,csv默认行结束符为 \r\n,导致每写一行后会多出一个空行
with open(rel_path, "w", newline='') as f:
csv_writer = csv.writer(f)
csv_writer.writerow(["id", "tested_positive"]) # 写入表头,第一列是样本的ID,第二列是预测值
for i in range(len(rel)):
csv_writer.writerow([str(i), str(rel[i])]) # 写入每个预测值
print("文件已经保存到" + rel_path) # 打印保存路径
evaluate(config["save_path"], device, test_loader, config["rel_path"])