重复以下内容
作业:
在 Kaggle 上找到一个图像数据集,用 CNN 网络进行训练,并用 Grad-CAM 做可视化
进阶:
将代码拆分成多个文件
import os
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from torchvision import datasets, transforms, models
from PIL import Image # 添加缺失的导入
import matplotlib.pyplot as plt
import numpy as np
from tqdm import tqdm
# Fix the random seed so that runs are reproducible
torch.manual_seed(42)
if torch.cuda.is_available():
    # Also seed every visible CUDA device
    torch.cuda.manual_seed_all(42)
class CustomDataset(Dataset):
    """Image dataset whose labels live in one ``.txt`` file per image.

    Images and label files are each collected from their own directory,
    sorted by path, and paired purely by position in the sorted lists.

    NOTE: a second class with the same name is defined further down in this
    file; that later definition replaces this one once the module has been
    executed.
    """

    def __init__(self, image_dir, label_dir, transform=None):
        """Collect the sorted image and label paths.

        Args:
            image_dir: Directory scanned for ``.jpg`` / ``.jpeg`` / ``.png``
                files (extension match is case-insensitive).
            label_dir: Directory scanned for ``.txt`` files.
            transform: Optional callable applied to each loaded PIL image.

        Raises:
            AssertionError: If the number of images and labels differ.
        """
        image_suffixes = ('.jpg', '.jpeg', '.png')
        image_names = [
            name for name in os.listdir(image_dir)
            if name.lower().endswith(image_suffixes)
        ]
        label_names = [
            name for name in os.listdir(label_dir)
            if name.lower().endswith('.txt')
        ]
        self.image_paths = sorted(os.path.join(image_dir, name) for name in image_names)
        self.label_paths = sorted(os.path.join(label_dir, name) for name in label_names)
        self.transform = transform
        # Images and labels are paired by index, so the counts must agree
        assert len(self.image_paths) == len(self.label_paths), "图像数量与标签数量不匹配"

    def __len__(self):
        """Return the number of images."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for the sample at position ``idx``."""
        image = Image.open(self.image_paths[idx]).convert('RGB')
        with open(self.label_paths[idx], 'r') as label_file:
            # The label file is expected to hold a single integer class id
            label = int(label_file.read().strip())
        if self.transform:
            image = self.transform(image)
        return image, label
class CustomDataset(Dataset):
    """Image dataset whose labels live in one ``.txt`` file per image.

    An image and its label file are matched by file name without extension
    (``cat01.jpg`` <-> ``cat01.txt``). Each label file is expected to contain
    a single integer class id.

    Attributes set by ``__init__``:
        image_dir, label_dir: The directories passed in.
        transform: Optional callable applied to each loaded PIL image.
        debug: Whether diagnostic information is printed during construction.
        image_files: Sorted image file names found in ``image_dir``.
        label_files: Sorted ``.txt`` file names found in ``label_dir``.
        image_to_label: Mapping from image file name to label file name.
    """

    def __init__(self, image_dir, label_dir, transform=None, debug=False):
        """Scan both directories and build the image -> label mapping.

        Args:
            image_dir: Directory scanned for ``.jpg`` / ``.jpeg`` / ``.png``
                files (extension match is case-insensitive).
            label_dir: Directory scanned for ``.txt`` files.
            transform: Optional callable applied to each loaded PIL image.
            debug: When true, print what was found and warn about images
                that have no matching label file.

        Raises:
            AssertionError: If the image and label counts differ, or if any
                image has no label file with the same base name.
        """
        self.image_dir = image_dir
        self.label_dir = label_dir
        self.transform = transform
        self.debug = debug

        # Collect image and label file names
        self.image_files = sorted(
            f for f in os.listdir(image_dir)
            if f.lower().endswith(('.jpg', '.jpeg', '.png'))
        )
        self.label_files = sorted(
            f for f in os.listdir(label_dir) if f.lower().endswith('.txt')
        )

        # Print diagnostics
        if self.debug:
            print(f"图像目录: {image_dir}")
            print(f"标签目录: {label_dir}")
            print(f"找到 {len(self.image_files)} 个图像文件")
            print(f"找到 {len(self.label_files)} 个标签文件")
            # Show the first 10 entries of each list so the ordering can be checked
            print("\n前10个图像文件:")
            for f in self.image_files[:10]:
                print(f" {f}")
            print("\n前10个标签文件:")
            for f in self.label_files[:10]:
                print(f" {f}")

        # The counts must agree before any matching is attempted
        assert len(self.image_files) == len(self.label_files), \
            f"图像数量({len(self.image_files)})与标签数量({len(self.label_files)})不匹配"

        # Index the label files by base name once, so each image is matched
        # with a single dict lookup instead of a scan over every label file.
        # setdefault keeps the first file (in sorted order) for a given base
        # name, which is the file a linear scan would have found.
        label_by_stem = {}
        for lbl_file in self.label_files:
            label_by_stem.setdefault(os.path.splitext(lbl_file)[0], lbl_file)

        # Build the mapping (file names are assumed equal once the extension is removed)
        self.image_to_label = {}
        for img_file in self.image_files:
            lbl_file = label_by_stem.get(os.path.splitext(img_file)[0])
            if lbl_file is not None:
                self.image_to_label[img_file] = lbl_file
            elif self.debug:
                print(f"警告: 找不到图像 '{img_file}' 对应的标签文件")

        # Every image must have ended up with a label
        assert len(self.image_to_label) == len(self.image_files), \
            f"只有 {len(self.image_to_label)} 个图像找到了对应的标签,总数应为 {len(self.image_files)}"
        if self.debug:
            print(f"成功建立 {len(self.image_to_label)} 个图像-标签映射")

    def __len__(self):
        """Return the number of images."""
        return len(self.image_files)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for the sample at position ``idx``.

        The image is opened with PIL, converted to RGB and passed through
        ``transform`` when one was given; the label is the integer parsed
        from the matching label file.
        """
        img_file = self.image_files[idx]
        lbl_file = self.image_to_label[img_file]
        image_path = os.path.join(self.image_dir, img_file)
        label_path = os.path.join(self.label_dir, lbl_file)
        image = Image.open(image_path).convert('RGB')
        with open(label_path, 'r') as f:
            label = int(f.read().strip())
        if self.transform:
            image = self.transform(image)
        return image, label
# Dataset location and per-split directories
data_dir = r"C:\Users\许兰\Desktop\打卡文件\mix20230204"  # replace with your dataset path
train_image_dir = os.path.join(data_dir, 'train/images')
train_label_dir = os.path.join(data_dir, 'train/labels')
val_image_dir = os.path.join(data_dir, 'validation/images')
val_label_dir = os.path.join(data_dir, 'validation/labels')
test_image_dir = os.path.join(data_dir, 'test/images')
test_label_dir = os.path.join(data_dir, 'test/labels')

# Preprocessing and augmentation: random augmentation for training only,
# deterministic resize + centre crop for validation and test
data_transforms = {
    'train': transforms.Compose([
        transforms.RandomResizedCrop(224),
        transforms.RandomHorizontalFlip(),
        transforms.RandomRotation(10),
        transforms.ColorJitter(brightness=0.2, contrast=0.2),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
    'val': transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
    'test': transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
}

# Load each split with the custom dataset class
image_datasets = {
    'train': CustomDataset(train_image_dir, train_label_dir, data_transforms['train']),
    'val': CustomDataset(val_image_dir, val_label_dir, data_transforms['val']),
    'test': CustomDataset(test_image_dir, test_label_dir, data_transforms['test'])
}

# Build the data loaders.
# On Windows, DataLoader workers are started as fresh processes that re-import
# the main module. This script runs its training at module top level rather
# than under an `if __name__ == '__main__':` guard, so worker processes there
# would re-execute the whole script; load in the main process instead.
batch_size = 32
num_workers = 0 if os.name == 'nt' else 4
dataloaders = {
    'train': DataLoader(image_datasets['train'], batch_size=batch_size, shuffle=True, num_workers=num_workers),
    'val': DataLoader(image_datasets['val'], batch_size=batch_size, shuffle=False, num_workers=num_workers),
    'test': DataLoader(image_datasets['test'], batch_size=batch_size, shuffle=False, num_workers=num_workers)
}


def _read_label(path):
    """Return the integer class id stored in the label file at ``path``."""
    with open(path, 'r') as f:
        return int(f.read().strip())


# Number of classes = number of distinct labels in the training split (class
# ids are assumed to be consecutive integers starting at 0). The label files
# are read directly: indexing the dataset would also decode and augment every
# training image only for the image to be thrown away.
num_classes = len({
    _read_label(os.path.join(train_label_dir, label_file))
    for label_file in image_datasets['train'].image_to_label.values()
})
print(f"数据集包含 {num_classes} 个类别")

# Use the first GPU when one is available, otherwise the CPU
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(f"使用设备: {device}")
# CNN model definition
class CNNModel(nn.Module):
    """VGG-style convolutional classifier.

    ``features`` is a stack of 3x3 convolutions with four 2x2 max-pools, so
    the spatial size is divided by 16 (224x224 input -> 14x14 map with 512
    channels). ``avgpool`` then resamples that map to a fixed 7x7 grid, which
    is the size the first linear layer of ``classifier`` is built for. Without
    this step a 224x224 input produces 512*14*14 features and the classifier
    fails with a shape mismatch.

    ``avgpool`` has no parameters, so the ``state_dict`` keys are exactly
    those of ``features`` and ``classifier``.

    Args:
        num_classes: Number of output logits.
        hidden_dim: Width of the two hidden fully connected layers.
    """

    def __init__(self, num_classes, hidden_dim=4096):
        super(CNNModel, self).__init__()
        # Feature extractor
        self.features = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(64, 128, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(128, 256, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(256, 256, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(256, 512, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(512, 512, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),
        )
        # Bring the feature map to the 7x7 grid the classifier expects,
        # whatever the input resolution was
        self.avgpool = nn.AdaptiveAvgPool2d((7, 7))
        # Classifier head
        self.classifier = nn.Sequential(
            nn.Dropout(0.5),
            nn.Linear(512 * 7 * 7, hidden_dim),
            nn.ReLU(inplace=True),
            nn.Dropout(0.5),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(inplace=True),
            nn.Linear(hidden_dim, num_classes)
        )
        # Initialise weights
        self._initialize_weights()

    def forward(self, x):
        """Return class logits of shape ``(batch, num_classes)`` for image batch ``x``."""
        x = self.features(x)
        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        x = self.classifier(x)
        return x

    def _initialize_weights(self):
        """Kaiming-normal init for conv weights, N(0, 0.01) for linear weights, zero biases."""
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
                if m.bias is not None:
                    nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.Linear):
                nn.init.normal_(m.weight, 0, 0.01)
                nn.init.constant_(m.bias, 0)
# Build the network and place it on the selected device
model = CNNModel(num_classes).to(device)

# Loss function, optimizer and learning-rate schedule:
# cross-entropy, SGD with momentum and weight decay, and a step schedule that
# multiplies the learning rate by 0.1 every 7 scheduler steps
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(
    model.parameters(),
    lr=0.001,
    momentum=0.9,
    weight_decay=1e-4,
)
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=7, gamma=0.1)
# Model training
def train_model(model, dataloaders, criterion, optimizer, scheduler, num_epochs=25, *, progress=True):
    """Train ``model`` and restore the weights with the best validation accuracy.

    Every epoch runs a training pass over ``dataloaders['train']`` followed by
    an evaluation pass over ``dataloaders['val']``. Batches are moved to the
    device the model's parameters live on.

    Args:
        model: Network to train; updated in place.
        dataloaders: Mapping with ``'train'`` and ``'val'`` loaders. Each
            loader must expose ``.dataset`` with a length.
        criterion: Loss called as ``criterion(outputs, labels)``.
        optimizer: Optimizer stepped once per training batch.
        scheduler: Learning-rate scheduler stepped once per epoch after the
            training pass, or ``None``.
        num_epochs: Number of epochs to run.
        progress: Wrap each loader in a ``tqdm`` progress bar when true.

    Returns:
        ``(model, history)`` where ``history`` maps ``'train_loss'``,
        ``'train_acc'``, ``'val_loss'`` and ``'val_acc'`` to lists holding one
        float per epoch.
    """
    import copy  # local import: not part of this file's import header

    try:
        device = next(model.parameters()).device
    except StopIteration:
        # A model without parameters has no device of its own
        device = torch.device("cpu")

    # state_dict() returns references to the live parameter tensors, so the
    # snapshot has to be a deep copy; otherwise it silently follows every
    # later optimizer step and "restoring the best weights" does nothing.
    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0
    # Per-epoch metrics
    history = {
        'train_loss': [], 'train_acc': [],
        'val_loss': [], 'val_acc': []
    }
    for epoch in range(num_epochs):
        print(f'Epoch {epoch+1}/{num_epochs}')
        print('-' * 10)
        # Each epoch has a training phase and a validation phase
        for phase in ['train', 'val']:
            is_train = phase == 'train'
            if is_train:
                model.train()  # training mode
            else:
                model.eval()  # evaluation mode
            running_loss = 0.0
            running_corrects = 0
            loader = dataloaders[phase]
            batches = tqdm(loader) if progress else loader
            for inputs, labels in batches:
                inputs = inputs.to(device)
                labels = labels.to(device)
                # Reset gradients
                optimizer.zero_grad()
                # Track gradients only while training
                with torch.set_grad_enabled(is_train):
                    outputs = model(inputs)
                    _, preds = torch.max(outputs, 1)
                    loss = criterion(outputs, labels)
                    # Backward pass and optimizer step in the training phase only
                    if is_train:
                        loss.backward()
                        optimizer.step()
                # Statistics, accumulated as plain Python numbers so that an
                # empty loader does not leave a bare int where a tensor is expected
                running_loss += loss.item() * inputs.size(0)
                running_corrects += int((preds == labels).sum().item())
            if is_train and scheduler is not None:
                scheduler.step()
            num_samples = len(loader.dataset)
            epoch_loss = running_loss / num_samples if num_samples else 0.0
            epoch_acc = running_corrects / num_samples if num_samples else 0.0
            print(f'{phase} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')
            # Record history
            history[f'{phase}_loss'].append(epoch_loss)
            history[f'{phase}_acc'].append(epoch_acc)
            # Keep a snapshot of the best model seen on the validation split
            if phase == 'val' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = copy.deepcopy(model.state_dict())
                print(f'保存最佳模型,准确率: {best_acc:.4f}')
        print()
    # Load the best weights back into the model
    model.load_state_dict(best_model_wts)
    return model, history
# Run the training loop
num_epochs = 25
model, history = train_model(
    model,
    dataloaders,
    criterion,
    optimizer,
    scheduler,
    num_epochs,
)
# Evaluate the model on a held-out split
def evaluate_model(model, dataloader):
    """Compute the classification accuracy of ``model`` over ``dataloader``.

    The model is switched to evaluation mode and run without gradient
    tracking. Batches are moved to the device the model's parameters live on.

    Args:
        model: Network producing one row of class scores per input sample.
        dataloader: Loader yielding ``(inputs, labels)`` batches; it must
            expose ``.dataset`` with a length.

    Returns:
        ``(accuracy, all_preds, all_labels)``: the accuracy as a float
        (``0.0`` for an empty dataset) and the per-sample predicted and true
        class ids as flat lists.
    """
    try:
        device = next(model.parameters()).device
    except StopIteration:
        # A model without parameters has no device of its own
        device = torch.device("cpu")

    model.eval()
    # Plain int counter: stays valid even when the loader yields no batches
    running_corrects = 0
    all_preds = []
    all_labels = []
    with torch.no_grad():
        for inputs, labels in dataloader:
            inputs = inputs.to(device)
            labels = labels.to(device)
            outputs = model(inputs)
            _, preds = torch.max(outputs, 1)
            running_corrects += int((preds == labels).sum().item())
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())
    num_samples = len(dataloader.dataset)
    accuracy = running_corrects / num_samples if num_samples else 0.0
    print(f'测试集准确率: {accuracy:.4f}')
    return accuracy, all_preds, all_labels
# Evaluate on the test split
test_accuracy, predictions, true_labels = evaluate_model(model, dataloaders['test'])

# Plot the training history: loss on the left, accuracy on the right
plt.figure(figsize=(12, 4))
for panel_index, (metric, train_label, val_label, title, y_label) in enumerate(
    [
        ('loss', 'Training Loss', 'Validation Loss', 'Loss Over Time', 'Loss'),
        ('acc', 'Training Accuracy', 'Validation Accuracy', 'Accuracy Over Time', 'Accuracy'),
    ],
    start=1,
):
    plt.subplot(1, 2, panel_index)
    plt.plot(history[f'train_{metric}'], label=train_label)
    plt.plot(history[f'val_{metric}'], label=val_label)
    plt.title(title)
    plt.xlabel('Epoch')
    plt.ylabel(y_label)
    plt.legend()
plt.tight_layout()
plt.savefig('training_history.png')
plt.show()

# Save the trained weights
torch.save(model.state_dict(), 'cnn_image_classifier.pth')
print("模型已保存为 'cnn_image_classifier.pth'")
# Optional: transfer learning from a pretrained model
def use_pretrained_model(num_classes):
    """Return an ImageNet-pretrained ResNet-18 adapted to ``num_classes`` outputs.

    All parameter tensors except the last four (in ``parameters()`` order) are
    frozen, and the final fully connected layer is replaced by a new
    ``nn.Linear`` with ``num_classes`` outputs. The replacement layer is
    created after the freezing step and is therefore trainable. The model is
    moved to the module-level ``device`` before being returned.

    Args:
        num_classes: Number of output classes of the new head.
    """
    # The boolean `pretrained` argument is deprecated in torchvision in favour
    # of `weights`; IMAGENET1K_V1 is the checkpoint `pretrained=True` selected.
    # Fall back to the old argument on torchvision versions without the enum.
    weights_enum = getattr(models, 'ResNet18_Weights', None)
    if weights_enum is not None:
        model_ft = models.resnet18(weights=weights_enum.IMAGENET1K_V1)
    else:
        model_ft = models.resnet18(pretrained=True)
    # Freeze everything except the last four parameter tensors
    for param in list(model_ft.parameters())[:-4]:
        param.requires_grad = False
    # Replace the final fully connected layer
    num_ftrs = model_ft.fc.in_features
    model_ft.fc = nn.Linear(num_ftrs, num_classes)
    return model_ft.to(device)