打卡第44天:无人机数据集分类

发布于:2025-06-14 ⋅ 阅读:(17) ⋅ 点赞:(0)

重复以下内容

作业:
在 Kaggle 上找到一个图像数据集,用 CNN 网络进行训练,并用 Grad-CAM 做可视化

进阶:
将代码拆分成多个文件

import os
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from torchvision import datasets, transforms, models
from PIL import Image  # 添加缺失的导入
import matplotlib.pyplot as plt
import numpy as np
from tqdm import tqdm

# Fix the PyTorch RNG seed so that repeated runs start from the same random state.
_SEED = 42
torch.manual_seed(_SEED)
if torch.cuda.is_available():
    # Seed every visible CUDA device as well.
    torch.cuda.manual_seed_all(_SEED)

class CustomDataset(Dataset):
    """Image-classification dataset read from an image folder and a label folder.

    Every ``.jpg``/``.jpeg``/``.png`` file in ``image_dir`` is paired with the
    ``.txt`` file in ``label_dir`` that has the same base name.  Each label
    file is parsed as one integer class id.

    NOTE: a second class named ``CustomDataset`` is defined directly after
    this one in this file, so the later definition is the one the rest of the
    script actually uses.

    Args:
        image_dir: Directory containing the image files.
        label_dir: Directory containing one ``.txt`` label file per image.
        transform: Optional callable applied to the loaded RGB image.

    Raises:
        ValueError: If the number of images and labels differ, or an image
            has no label file with the same base name.
    """

    def __init__(self, image_dir, label_dir, transform=None):
        self.image_paths = sorted(
            os.path.join(image_dir, f)
            for f in os.listdir(image_dir)
            if f.lower().endswith(('.jpg', '.jpeg', '.png'))
        )
        self.transform = transform

        label_names = sorted(
            f for f in os.listdir(label_dir) if f.lower().endswith('.txt')
        )

        # Explicit raise instead of assert: asserts disappear under ``python -O``.
        if len(self.image_paths) != len(label_names):
            raise ValueError("图像数量与标签数量不匹配")

        # Pair by base name rather than by position in two independently
        # sorted lists; positional pairing silently mislabels images as soon
        # as the two folders do not sort identically.
        label_by_stem = {}
        for name in label_names:
            label_by_stem.setdefault(os.path.splitext(name)[0], name)

        self.label_paths = []
        for image_path in self.image_paths:
            image_name = os.path.basename(image_path)
            stem = os.path.splitext(image_name)[0]
            if stem not in label_by_stem:
                raise ValueError(f"找不到图像 '{image_name}' 对应的标签文件")
            self.label_paths.append(os.path.join(label_dir, label_by_stem[stem]))

    def __len__(self):
        """Return the number of image/label pairs."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for position ``idx``.

        The image is converted to RGB and passed through ``transform`` when
        one was given; the label is the integer stored in the label file.
        """
        with Image.open(self.image_paths[idx]) as img:
            image = img.convert('RGB')
        with open(self.label_paths[idx], 'r') as f:
            label = int(f.read().strip())  # label file holds a single integer class id

        if self.transform:
            image = self.transform(image)

        return image, label
class CustomDataset(Dataset):
    """Image-classification dataset that pairs images and labels by base name.

    Every ``.jpg``/``.jpeg``/``.png`` file in ``image_dir`` is matched with the
    ``.txt`` file in ``label_dir`` whose name without extension is identical.
    Each label file is parsed as one integer class id.

    Args:
        image_dir: Directory containing the image files.
        label_dir: Directory containing the ``.txt`` label files.
        transform: Optional callable applied to the loaded RGB image.
        debug: When true, print the directories, file counts, the first ten
            file names of each folder and a warning per unmatched image.

    Attributes set by the constructor: ``image_dir``, ``label_dir``,
    ``transform``, ``debug``, ``image_files`` (sorted image file names),
    ``label_files`` (sorted label file names) and ``image_to_label``
    (image file name -> label file name).

    Raises:
        ValueError: If the image and label counts differ, or if some image
            has no label file with the same base name.
    """

    def __init__(self, image_dir, label_dir, transform=None, debug=False):
        self.image_dir = image_dir
        self.label_dir = label_dir
        self.transform = transform
        self.debug = debug

        # Collect the image and label file names in sorted order.
        self.image_files = sorted(
            f for f in os.listdir(image_dir)
            if f.lower().endswith(('.jpg', '.jpeg', '.png'))
        )
        self.label_files = sorted(
            f for f in os.listdir(label_dir) if f.lower().endswith('.txt')
        )

        if self.debug:
            print(f"图像目录: {image_dir}")
            print(f"标签目录: {label_dir}")
            print(f"找到 {len(self.image_files)} 个图像文件")
            print(f"找到 {len(self.label_files)} 个标签文件")

            # Show the first ten names of each folder so the pairing can be eyeballed.
            print("\n前10个图像文件:")
            for f in self.image_files[:10]:
                print(f"  {f}")

            print("\n前10个标签文件:")
            for f in self.label_files[:10]:
                print(f"  {f}")

        # Explicit raise instead of assert: asserts disappear under ``python -O``.
        if len(self.image_files) != len(self.label_files):
            raise ValueError(
                f"图像数量({len(self.image_files)})与标签数量({len(self.label_files)})不匹配"
            )

        # Index the labels by base name once, so each image is resolved with a
        # dictionary lookup instead of a scan over every label file.
        # ``setdefault`` keeps the first label (in sorted order) per base name.
        label_by_stem = {}
        for lbl_file in self.label_files:
            label_by_stem.setdefault(os.path.splitext(lbl_file)[0], lbl_file)

        self.image_to_label = {}
        for img_file in self.image_files:
            lbl_file = label_by_stem.get(os.path.splitext(img_file)[0])
            if lbl_file is not None:
                self.image_to_label[img_file] = lbl_file
            elif self.debug:
                print(f"警告: 找不到图像 '{img_file}' 对应的标签文件")

        # Every image must have found its label.
        if len(self.image_to_label) != len(self.image_files):
            raise ValueError(
                f"只有 {len(self.image_to_label)} 个图像找到了对应的标签,总数应为 {len(self.image_files)}"
            )

        if self.debug:
            print(f"成功建立 {len(self.image_to_label)} 个图像-标签映射")

    def __len__(self):
        """Return the number of images in the dataset."""
        return len(self.image_files)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for position ``idx``.

        The image is converted to RGB and passed through ``transform`` when
        one was given; the label is the integer stored in the label file.
        """
        img_file = self.image_files[idx]
        lbl_file = self.image_to_label[img_file]

        image_path = os.path.join(self.image_dir, img_file)
        label_path = os.path.join(self.label_dir, lbl_file)

        with Image.open(image_path) as img:
            image = img.convert('RGB')
        with open(label_path, 'r') as f:
            label = int(f.read().strip())

        if self.transform:
            image = self.transform(image)

        return image, label
# Dataset location and the per-split image / label folders.
data_dir = r"C:\Users\许兰\Desktop\打卡文件\mix20230204"  # replace with your dataset path
train_image_dir = os.path.join(data_dir, 'train', 'images')
train_label_dir = os.path.join(data_dir, 'train', 'labels')
val_image_dir = os.path.join(data_dir, 'validation', 'images')
val_label_dir = os.path.join(data_dir, 'validation', 'labels')
test_image_dir = os.path.join(data_dir, 'test', 'images')
test_label_dir = os.path.join(data_dir, 'test', 'labels')

# Per-channel normalisation shared by every split (these are the mean/std
# values commonly used with ImageNet-pretrained models).
_normalize = transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])

# Validation and test use the same deterministic resize + centre crop.
_eval_transform = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    _normalize,
])

# Pre-processing per split; only the training split is randomly augmented.
data_transforms = {
    'train': transforms.Compose([
        transforms.RandomResizedCrop(224),
        transforms.RandomHorizontalFlip(),
        transforms.RandomRotation(10),
        transforms.ColorJitter(brightness=0.2, contrast=0.2),
        transforms.ToTensor(),
        _normalize,
    ]),
    'val': _eval_transform,
    'test': _eval_transform,
}

# Load each split through the custom dataset class.
image_datasets = {
    'train': CustomDataset(train_image_dir, train_label_dir, data_transforms['train']),
    'val': CustomDataset(val_image_dir, val_label_dir, data_transforms['val']),
    'test': CustomDataset(test_image_dir, test_label_dir, data_transforms['test'])
}

# Build the data loaders.  On Windows, DataLoader workers are spawned and
# re-import the main module; this script has no ``if __name__ == "__main__":``
# guard, so load in the main process there instead of starting workers.
batch_size = 32
num_workers = 0 if os.name == 'nt' else 4
dataloaders = {
    'train': DataLoader(image_datasets['train'], batch_size=batch_size, shuffle=True, num_workers=num_workers),
    'val': DataLoader(image_datasets['val'], batch_size=batch_size, shuffle=False, num_workers=num_workers),
    'test': DataLoader(image_datasets['test'], batch_size=batch_size, shuffle=False, num_workers=num_workers)
}

# Number of classes, derived from the training label files only.  Reading the
# small text files directly avoids decoding and augmenting every training
# image just to look at its label.  ``max + 1`` (rather than the count of
# distinct ids) keeps the output layer large enough even if some class id
# does not occur in the training split; class ids are assumed to start at 0.
_train_dataset = image_datasets['train']
_train_labels = set()
for _label_file in _train_dataset.image_to_label.values():
    with open(os.path.join(_train_dataset.label_dir, _label_file), 'r') as _f:
        _train_labels.add(int(_f.read().strip()))
num_classes = max(_train_labels) + 1 if _train_labels else 0
print(f"数据集包含 {num_classes} 个类别")

# Use the first GPU when CUDA is available, otherwise the CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(f"使用设备: {device}")

# 定义CNN模型
class CNNModel(nn.Module):
    """VGG-style convolutional classifier for 3-channel images.

    The feature extractor is six 3x3 convolutions interleaved with four 2x2
    max-pools, so the spatial size is divided by 16 (a 224x224 input becomes
    a 512x14x14 map).  An adaptive average pool then brings the map to 7x7,
    which is the size the first fully connected layer expects; without it a
    224x224 input would not fit ``Linear(512 * 7 * 7, ...)``.  The pooling
    layer has no parameters, so ``state_dict`` keys are unaffected.

    Args:
        num_classes: Size of the final output layer (number of logits).
    """

    def __init__(self, num_classes):
        super().__init__()
        # Feature extractor.
        self.features = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(64, 128, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(128, 256, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(256, 256, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(256, 512, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(512, 512, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),
        )

        # Force a 7x7 feature map regardless of the input resolution so the
        # flattened size always equals 512 * 7 * 7.
        self.avgpool = nn.AdaptiveAvgPool2d((7, 7))

        # Classifier head.
        self.classifier = nn.Sequential(
            nn.Dropout(0.5),
            nn.Linear(512 * 7 * 7, 4096),
            nn.ReLU(inplace=True),
            nn.Dropout(0.5),
            nn.Linear(4096, 4096),
            nn.ReLU(inplace=True),
            nn.Linear(4096, num_classes)
        )

        # Initialise the weights.
        self._initialize_weights()

    def forward(self, x):
        """Return class logits of shape ``(batch, num_classes)`` for image batch ``x``."""
        x = self.features(x)
        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        x = self.classifier(x)
        return x

    def _initialize_weights(self):
        """Kaiming-normal init for conv weights, N(0, 0.01) for linear weights, zero biases."""
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
                if m.bias is not None:
                    nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.Linear):
                nn.init.normal_(m.weight, 0, 0.01)
                nn.init.constant_(m.bias, 0)

# Build the network and move it to the selected device.
model = CNNModel(num_classes).to(device)

# Loss, optimiser and learning-rate schedule: cross-entropy, SGD with momentum
# and weight decay, and a StepLR that scales the rate by 0.1 every 7 steps.
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(
    model.parameters(),
    lr=0.001,
    momentum=0.9,
    weight_decay=1e-4,
)
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=7, gamma=0.1)

# 训练模型
def _clone_state_dict(model):
    """Return a copy of ``model.state_dict()`` whose tensors own their storage."""
    return {name: tensor.detach().clone() for name, tensor in model.state_dict().items()}


def train_model(model, dataloaders, criterion, optimizer, scheduler, num_epochs=25,
                show_progress=True):
    """Train ``model`` and restore the weights of its best validation epoch.

    Each epoch runs a training pass over ``dataloaders['train']`` followed by
    an evaluation pass over ``dataloaders['val']``.  Whenever the validation
    accuracy exceeds the best seen so far, a copy of the weights is kept; the
    copy is loaded back into the model before returning.

    Args:
        model: Network to train.  Batches are moved to the device of its
            first parameter (CPU if it has no parameters).
        dataloaders: Mapping with ``'train'`` and ``'val'`` data loaders that
            yield ``(inputs, labels)`` batches.
        criterion: Loss called as ``criterion(outputs, labels)``.
        optimizer: Optimiser stepped once per training batch.
        scheduler: Learning-rate scheduler stepped once per epoch after the
            training pass, or ``None`` to skip scheduling.
        num_epochs: Number of epochs to run.
        show_progress: Wrap each pass in a ``tqdm`` progress bar when true.

    Returns:
        ``(model, history)`` where ``history`` maps ``'train_loss'``,
        ``'train_acc'``, ``'val_loss'`` and ``'val_acc'`` to per-epoch floats.
    """
    first_param = next(model.parameters(), None)
    run_device = first_param.device if first_param is not None else torch.device("cpu")

    # state_dict() hands out references to the live parameter tensors, so the
    # snapshot must be cloned or it would silently follow later updates.
    best_model_wts = _clone_state_dict(model)
    best_acc = 0.0

    # Per-epoch metrics.
    history = {
        'train_loss': [], 'train_acc': [],
        'val_loss': [], 'val_acc': []
    }

    for epoch in range(num_epochs):
        print(f'Epoch {epoch+1}/{num_epochs}')
        print('-' * 10)

        # Every epoch has a training phase followed by a validation phase.
        for phase in ['train', 'val']:
            is_train = phase == 'train'
            if is_train:
                model.train()
            else:
                model.eval()

            running_loss = 0.0
            running_corrects = 0

            batches = dataloaders[phase]
            if show_progress:
                batches = tqdm(batches)

            for inputs, labels in batches:
                inputs = inputs.to(run_device)
                labels = labels.to(run_device)

                optimizer.zero_grad()

                # Track gradients only while training.
                with torch.set_grad_enabled(is_train):
                    outputs = model(inputs)
                    _, preds = torch.max(outputs, 1)
                    loss = criterion(outputs, labels)

                    if is_train:
                        loss.backward()
                        optimizer.step()

                # The loss is a batch mean, so weight it by the batch size.
                running_loss += loss.item() * inputs.size(0)
                running_corrects += int((preds == labels).sum().item())

            if is_train and scheduler is not None:
                scheduler.step()

            num_samples = len(dataloaders[phase].dataset)
            epoch_loss = running_loss / num_samples
            epoch_acc = running_corrects / num_samples

            print(f'{phase} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')

            history[f'{phase}_loss'].append(epoch_loss)
            history[f'{phase}_acc'].append(epoch_acc)

            # Keep a copy of the weights of the best validation epoch.
            if phase == 'val' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = _clone_state_dict(model)
                print(f'保存最佳模型,准确率: {best_acc:.4f}')

        print()

    # Restore the best weights.
    model.load_state_dict(best_model_wts)
    return model, history

# Train for a fixed number of epochs; train_model returns the model together
# with the per-epoch loss / accuracy history.
num_epochs = 25
model, history = train_model(
    model,
    dataloaders,
    criterion,
    optimizer,
    scheduler,
    num_epochs=num_epochs,
)

# 在测试集上评估模型
def evaluate_model(model, dataloader):
    """Compute the classification accuracy of ``model`` over ``dataloader``.

    The model is switched to eval mode and run without gradient tracking.
    Batches are moved to the device of the model's first parameter (CPU if it
    has no parameters).

    Args:
        model: Network returning per-class scores of shape ``(batch, classes)``.
        dataloader: Loader yielding ``(inputs, labels)`` batches; its
            ``dataset`` length is used as the denominator.

    Returns:
        ``(accuracy, all_preds, all_labels)``: the accuracy as a float
        (``0.0`` for an empty dataset) and the predicted / true class ids of
        every sample as lists of NumPy integer scalars, in loader order.
    """
    first_param = next(model.parameters(), None)
    run_device = first_param.device if first_param is not None else torch.device("cpu")

    model.eval()
    correct = 0

    all_preds = []
    all_labels = []

    with torch.no_grad():
        for inputs, labels in dataloader:
            inputs = inputs.to(run_device)
            labels = labels.to(run_device)

            outputs = model(inputs)
            _, preds = torch.max(outputs, 1)

            correct += int((preds == labels).sum().item())

            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    total = len(dataloader.dataset)
    # An empty loader has no defined accuracy; report 0.0 instead of failing.
    accuracy = correct / total if total else 0.0
    print(f'测试集准确率: {accuracy:.4f}')

    return accuracy, all_preds, all_labels

# Score the trained model on the test split.
test_accuracy, predictions, true_labels = evaluate_model(model, dataloaders['test'])

# Plot the training curves side by side: loss on the left, accuracy on the right.
plt.figure(figsize=(12, 4))

_panels = (
    (1, 'loss', 'Training Loss', 'Validation Loss', 'Loss Over Time', 'Loss'),
    (2, 'acc', 'Training Accuracy', 'Validation Accuracy', 'Accuracy Over Time', 'Accuracy'),
)
for _position, _metric, _train_label, _val_label, _title, _y_label in _panels:
    plt.subplot(1, 2, _position)
    plt.plot(history[f'train_{_metric}'], label=_train_label)
    plt.plot(history[f'val_{_metric}'], label=_val_label)
    plt.title(_title)
    plt.xlabel('Epoch')
    plt.ylabel(_y_label)
    plt.legend()

plt.tight_layout()
plt.savefig('training_history.png')
plt.show()

# Persist the trained weights.
torch.save(model.state_dict(), 'cnn_image_classifier.pth')
print("模型已保存为 'cnn_image_classifier.pth'")

# 可选:使用预训练模型进行迁移学习
def use_pretrained_model(num_classes):
    """Optional transfer-learning alternative: a pretrained ResNet-18 with a new head.

    Loads torchvision's ResNet-18 with pretrained weights, freezes every
    parameter except the last four of the original network, replaces the
    final fully connected layer with a fresh ``num_classes``-way layer and
    moves the model to the module-level ``device``.

    Args:
        num_classes: Number of outputs of the replacement ``fc`` layer.

    Returns:
        The adapted model, placed on ``device``.
    """
    # Newer torchvision replaces the deprecated ``pretrained=True`` flag with
    # an explicit ``weights=`` enum; fall back to the old flag when the enum
    # does not exist.
    weights_enum = getattr(models, "ResNet18_Weights", None)
    if weights_enum is not None:
        model_ft = models.resnet18(weights=weights_enum.IMAGENET1K_V1)
    else:
        model_ft = models.resnet18(pretrained=True)

    # Freeze all but the last four parameter tensors.
    # NOTE(review): for ResNet-18 these four are presumably the final
    # BatchNorm weight/bias and the original fc weight/bias - confirm that
    # this is the intended set of trainable layers.
    for param in list(model_ft.parameters())[:-4]:
        param.requires_grad = False

    # Replace the classification head; the new layer is trainable by default.
    num_ftrs = model_ft.fc.in_features
    model_ft.fc = nn.Linear(num_ftrs, num_classes)

    return model_ft.to(device)