文章目录
迁移学习是什么?
迁移学习(Transfer Learning)是机器学习中的一个重要概念,旨在将从一个任务(源任务)中学习到的知识应用到另一个相关的任务(目标任务)中,以提高目标任务的学习效率和性能。
在传统的机器学习中,模型通常是在特定的数据集上进行训练,并且只能在相似的任务上表现良好。如果遇到新的任务,就需要重新收集和标注大量的数据,并从头开始训练模型。迁移学习的出现,为解决这些问题提供了一种有效的方法。
迁移学习的基本思想是:不同任务之间可能存在一些共同的特征或模式,通过学习源任务中的这些共同部分,可以将其迁移到目标任务中,从而减少目标任务对大量标注数据的依赖。
- 迁移学习的方法主要有以下几类:
- 基于实例的迁移学习:选择源任务中与目标任务相似的实例,将其用于目标任务的训练。例如,在图像分类中,可以选择与目标图像相似的源图像及其标签,来辅助目标任务的学习。
- 基于同构空间的迁移学习:在源任务和目标任务的特征空间之间建立映射关系,使得源任务的知识能够在目标任务的特征空间中得到应用。
- 基于模型的迁移学习:将在源任务上预训练好的模型参数,迁移到目标任务中进行微调。例如,在自然语言处理中,常常使用预训练的语言模型(如 BERT),然后在特定的任务(如文本分类、情感分析)上进行微调。
- 基于关系的迁移学习:学习源任务和目标任务之间的关系,利用这种关系来进行知识的迁移。
迁移学习的步骤
- 选择预训练的模型和适当的层:通常,我们会选择在大规模图像数据集(如 ImageNet)上预训练的模型,如 VGG、ResNet 等。然后,根据新数据集的特点,选择需要微调的模型层。对于低级特征的任务(如边缘检测),最好使用浅层模型的层,而对于高级特征的任务(如分类),则应选择更深层次的模型。
- 冻结预训练模型的参数:保持预训练模型的权重不变,只训练新增加的层或者微调一些层,避免因为在数据集中过拟合导致预训练模型过度拟合。
- 在新数据集上训练新增加的层:在冻结预训练模型的参数情况下,训练新增加的层。这样,可以使新模型适应新的任务,从而获得更高的性能。
- 微调预训练模型的层:在新层上进行训练后,可以解冻一些已经训练过的层,并且将它们作为微调的目标。这样做可以提高模型在新数据集上的性能。
- 评估和测试:在训练完成之后,使用测试集对模型进行评估。如果模型的性能仍然不够好,可以尝试调整超参数或者更改微调层。
迁移学习实例
需求:使用ResNet-18模型对食物图片进行分类处理
共有20个类别,图片保存在food_dataset2的test、train目录下:
导入所需库
import torch
import torchvision.models as models
from torch import nn
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from PIL import Image
import numpy as np
import os
定义生成训练集和测试集的文本文件的函数并生成文件
# 定义一个函数,用于生成训练集和测试集的文本文件,文件中包含图像路径和对应的标签
def train_test_file(root, dir):
# 打开一个文本文件用于写入图像路径和标签
file_txt = open(dir + '.txt', 'w')
# 拼接根目录和子目录(train或test)的路径
path = os.path.join(root, dir)
# 遍历指定路径下的所有文件和文件夹
for roots, directories, files in os.walk(path):
if len(directories) != 0:
# 如果存在子文件夹,记录这些子文件夹的名称,作为类别标签
dirs = directories
else:
# 获取当前文件夹的名称
now_dir = roots.split('\\')
for file in files:
# 拼接图像文件的完整路径
path_1 = os.path.join(roots, file)
print(path_1)
# 将图像路径和对应的类别标签写入文本文件
file_txt.write(path_1 + ' ' + str(dirs.index(now_dir[-1])) + '\n')
# 关闭文件
file_txt.close()
# 定义数据集的根目录
root = r'.\食物分类\food_dataset2'
# 定义训练集文件夹名称
train_dir = 'train'
# 定义测试集文件夹名称
test_dir = 'test'
# 生成训练集的文本文件
train_test_file(root, train_dir)
# 生成测试集的文本文件
train_test_file(root, test_dir)
加载ResNet-18模型
# 加载预训练的ResNet-18模型
resnet_model = models.resnet18(weights=models.ResNet18_Weights.DEFAULT)
冻结参数
# 遍历模型的所有参数
for param in resnet_model.parameters():
# 打印参数
print(param)
# 冻结参数,使其在训练过程中不更新
param.requires_grad = False
修改模型
# 获取全连接层的输入特征数
in_features = resnet_model.fc.in_features
# 替换全连接层,将输出维度改为20,以适应20个类别的分类任务
resnet_model.fc = nn.Linear(in_features, 20)
更新参数
# 定义一个列表,用于存储需要更新的参数
param_to_updata = []
# 遍历模型的所有参数
for param in resnet_model.parameters():
if param.requires_grad == True:
# 如果参数需要更新,则将其添加到列表中
param_to_updata.append(param)
定义数据预处理转换
# 定义训练集和验证集的数据预处理转换
data_transforms = {
'train':
transforms.Compose([
# 将图像调整为300x300的大小
transforms.Resize([300, 300]),
# 随机旋转图像,旋转角度在-45到45度之间
transforms.RandomRotation(45),
# 从图像中心裁剪出256x256的区域
transforms.CenterCrop(256),
# 以0.5的概率随机水平翻转图像
transforms.RandomHorizontalFlip(p=0.5),
# 以0.5的概率随机垂直翻转图像
transforms.RandomVerticalFlip(p=0.5),
# 随机调整图像的亮度、对比度、饱和度和色调
transforms.ColorJitter(brightness=0.2, contrast=0.1, saturation=0.1, hue=0.1),
# 以0.1的概率将图像转换为灰度图
transforms.RandomGrayscale(p=0.1),
# 将图像转换为张量
transforms.ToTensor(),
# 对图像进行归一化处理
transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
]),
'valid':
transforms.Compose([
# 将图像调整为256x256的大小
transforms.Resize([256, 256]),
# 将图像转换为张量
transforms.ToTensor(),
# 对图像进行归一化处理
transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])
}
定义自定义数据集类
# 定义自定义数据集类
class food_dataset(Dataset):
def __init__(self, file_path, transform=None):
# 存储文本文件的路径
self.file_path = file_path
# 存储图像路径的列表
self.imgs = []
# 存储图像标签的列表
self.labels = []
# 存储数据预处理转换
self.transform = transform
# 打开文本文件
with open(self.file_path) as f:
# 读取文本文件中的每一行,并按空格分割成图像路径和标签
samples = [x.strip().split(' ') for x in f.readlines()]
for img_path, label in samples:
# 将图像路径添加到列表中
self.imgs.append(img_path)
# 将标签添加到列表中
self.labels.append(label)
def __len__(self):
# 返回数据集的大小
return len(self.imgs)
def __getitem__(self, idx):
# 打开图像文件
image = Image.open(self.imgs[idx])
if self.transform:
# 对图像进行预处理转换
image = self.transform(image)
# 获取图像的标签
label = self.labels[idx]
# 将标签转换为PyTorch张量
label = torch.from_numpy(np.array(label, dtype=np.int64))
return image, label
创建训练集和测试集数据集对象
# 创建训练集数据集对象
training_data = food_dataset(file_path='./train.txt', transform=data_transforms['train'])
# 创建测试集数据集对象
test_data = food_dataset(file_path='./test.txt', transform=data_transforms['valid'])
创建数据加载器
# 创建训练集数据加载器,设置批量大小为64,并打乱数据顺序
train_dataloader = DataLoader(training_data, batch_size=64, shuffle=True)
# 创建测试集数据加载器,设置批量大小为64,并打乱数据顺序
test_dataloader = DataLoader(test_data, batch_size=64, shuffle=True)
判断是否使用GPU并将模型移动到GPU
# 判断是否支持GPU,如果支持则使用CUDA,否则检查是否支持MPS,若都不支持则使用CPU
device = 'cuda' if torch.cuda.is_available() else 'mps' if torch.backends.mps.is_available() else 'cpu'
print(f'Using {device} device')
# 将模型移动到指定设备上
model = resnet_model.to(device)
创建损失函数、优化器、学习调度器
# 创建交叉熵损失函数对象,用于多分类任务
loss_fn = nn.CrossEntropyLoss()
# 创建Adam优化器,只更新需要更新的参数,学习率为0.001
optimizer = torch.optim.Adam(param_to_updata, lr=0.001)
# 创建学习率调度器,每5个epoch将学习率乘以0.5
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.5)
定义训练函数和测试函数
# 定义训练函数
def train(dataloader, model, loss_fn, optimizer):
# 将模型设置为训练模式
model.train()
# 记录当前批次的编号
batch_size_num = 1
for x, y in dataloader: # 遍历数据加载器中的每个批次
# 将输入数据和标签移动到指定设备上
x, y = x.to(device), y.to(device)
# 前向传播,计算模型的预测结果
pred = model.forward(x)
# 计算损失值
loss = loss_fn(pred, y)
# 梯度清零
optimizer.zero_grad()
# 反向传播,计算梯度
loss.backward()
# 根据梯度更新模型参数
optimizer.step()
# 获取损失值
loss_value = loss.item()
if batch_size_num % 10 == 0:
# 每10个批次打印一次损失值
print(f'loss:{loss_value:7f} [number:{batch_size_num}]')
batch_size_num += 1
# 定义最优准确率
best_acc = 0
# 定义测试函数
def test(dataloader, model, loss_fn):
global best_acc
# 获取数据集的大小
size = len(dataloader.dataset)
# 获取数据加载器中的批次数量
num_batches = len(dataloader)
# 将模型设置为评估模式
model.eval()
# 初始化测试损失和正确预测的数量
test_loss, correct = 0, 0
# 关闭梯度计算
with torch.no_grad():
for x, y in dataloader:
# 将输入数据和标签移动到指定设备上
x, y = x.to(device), y.to(device)
# 前向传播,计算模型的预测结果
pred = model.forward(x)
# 累加测试损失
test_loss += loss_fn(pred, y).item()
# 计算正确预测的数量
correct += (pred.argmax(1) == y).type(torch.float).sum().item()
# 计算平均测试损失
test_loss /= num_batches
# 计算准确率
correct /= size
if correct > best_acc:
# 如果当前准确率高于最优准确率,则更新最优准确率
best_acc = correct
# 打印模型的参数名称
print(model.state_dict().keys())
# 保存模型的参数
torch.save(model.state_dict(), 'best_resnet.pth')
# 保存整个模型
torch.save(model, 'best1_resnet.pt')
# 打印测试结果
print(f'Test result: \n Accuracy:{(100 * correct)}%,Avg loss:{test_loss}')
训练模型
# 定义训练的轮数
epochs = 30
# 用于存储每一轮的准确率
acc_s = []
# 用于存储每一轮的损失值
loss_s = []
for t in range(epochs):
print(f'epoch{t + 1}\n--------------------')
# 训练模型
train(train_dataloader, model, loss_fn, optimizer)
# 更新学习率
scheduler.step()
# 测试模型
test(test_dataloader, model, loss_fn)
# 打印最优训练结果
print('最优训练结果:', best_acc)