import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms
from torch.utils.data import DataLoader, random_split
import matplotlib.pyplot as plt
import numpy as np
# Configure a CJK-capable font so Chinese plot labels render correctly
plt.rcParams["font.family"] = ["SimHei"]
plt.rcParams['axes.unicode_minus'] = False
# Prefer GPU acceleration when CUDA is available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"使用设备: {device}")
# 1. Preprocessing for 128x128 inputs. Augmentation is applied to the
#    training split only; the test split gets a deterministic resize.
train_transform = transforms.Compose([
    transforms.RandomResizedCrop(128, scale=(0.6, 1.0)),  # random crop, resized to 128x128
    transforms.RandomHorizontalFlip(),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
    transforms.RandomRotation(15),
    transforms.ToTensor(),
    # NOTE(review): these are the CIFAR-10 statistics — consider recomputing
    # mean/std on this fish dataset.
    transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010))
])
test_transform = transforms.Compose([
    transforms.Resize((128, 128)),  # deterministic resize for evaluation
    transforms.ToTensor(),
    transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010))
])
# 2. 80/20 train/test split.
# BUG FIX: random_split returns Subsets that share ONE underlying dataset,
# so the test split previously went through the *augmented* train_transform.
# We build a second ImageFolder view with test_transform and re-point the
# test subset's indices at it, so evaluation is deterministic.
data_root = r"D:\python_learning\cyl_python\day43CNN_kaggle\BengaliFishImages\fish_images"
full_dataset = datasets.ImageFolder(root=data_root, transform=train_transform)
eval_view = datasets.ImageFolder(root=data_root, transform=test_transform)
train_size = int(0.8 * len(full_dataset))
test_size = len(full_dataset) - train_size
# Fixed seed makes the split reproducible across runs.
split_gen = torch.Generator().manual_seed(42)
train_dataset, test_dataset = random_split(full_dataset, [train_size, test_size], generator=split_gen)
# Same held-out indices, but served through the deterministic eval transform.
test_dataset = torch.utils.data.Subset(eval_view, test_dataset.indices)
# 3. Batch loaders. batch_size kept modest because of the larger image size.
batch_size = 32
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
# 4. 定义适配 128×128 尺寸的 CNN 模型(需重新计算全连接层输入维度)
class CNN(nn.Module):
    """CNN classifier for 128x128 RGB images, 20 output classes.

    Four Conv -> BatchNorm -> ReLU -> MaxPool blocks halve the spatial
    size each time (128 -> 64 -> 32 -> 16 -> 8), so the flattened feature
    vector entering the classifier head is 256 * 8 * 8.
    """

    def __init__(self):
        super(CNN, self).__init__()
        # Block 1: 3 -> 32 channels, 128 -> 64 spatial
        self.conv1 = nn.Conv2d(3, 32, 3, padding=1)
        self.bn1 = nn.BatchNorm2d(32)
        self.relu1 = nn.ReLU()
        self.pool1 = nn.MaxPool2d(2, 2)
        # Block 2: 32 -> 64 channels, 64 -> 32 spatial
        self.conv2 = nn.Conv2d(32, 64, 3, padding=1)
        self.bn2 = nn.BatchNorm2d(64)
        self.relu2 = nn.ReLU()
        self.pool2 = nn.MaxPool2d(2)
        # Block 3: 64 -> 128 channels, 32 -> 16 spatial
        self.conv3 = nn.Conv2d(64, 128, 3, padding=1)
        self.bn3 = nn.BatchNorm2d(128)
        self.relu3 = nn.ReLU()
        self.pool3 = nn.MaxPool2d(2)
        # Block 4: 128 -> 256 channels, 16 -> 8 spatial
        self.conv4 = nn.Conv2d(128, 256, 3, padding=1)
        self.bn4 = nn.BatchNorm2d(256)
        self.relu4 = nn.ReLU()
        self.pool4 = nn.MaxPool2d(2)
        # Classifier head. 256 * 8 * 8 follows from the four poolings above;
        # if a pooling stage is added/removed this must be recomputed.
        self.fc1 = nn.Linear(256 * 8 * 8, 512)
        # Dedicated activation for the head (ReLU is stateless, so this adds
        # no parameters and keeps the state_dict compatible); previously the
        # forward pass confusingly reused relu3 here.
        self.relu_fc = nn.ReLU()
        self.dropout = nn.Dropout(p=0.5)
        self.fc2 = nn.Linear(512, 20)  # 20 classes in the dataset

    def forward(self, x):
        # Convolutional feature extractor
        x = self.pool1(self.relu1(self.bn1(self.conv1(x))))
        x = self.pool2(self.relu2(self.bn2(self.conv2(x))))
        x = self.pool3(self.relu3(self.bn3(self.conv3(x))))
        x = self.pool4(self.relu4(self.bn4(self.conv4(x))))
        # Flatten all but the batch dimension. Unlike view(-1, N), this
        # raises loudly on a shape mismatch instead of silently corrupting
        # the batch dimension.
        x = torch.flatten(x, 1)
        x = self.fc1(x)
        x = self.relu_fc(x)
        x = self.dropout(x)
        x = self.fc2(x)
        return x
# Instantiate the network and move it onto the selected device.
model = CNN().to(device)
# Loss / optimizer / LR schedule: halve the learning rate whenever the
# monitored (test) loss fails to improve for 3 consecutive epochs.
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-3)
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=3)
# 5. 训练与测试函数(复用逻辑,无需修改)
def train(model, train_loader, test_loader, criterion, optimizer, scheduler, device, epochs):
model.train() # 设置为训练模式
all_iter_losses = [] # 记录每个 batch 的损失
iter_indices = [] # 记录 iteration 序号
train_acc_history = [] # 训练集准确率历史
test_acc_history = [] # 测试集准确率历史
train_loss_history = [] # 训练集损失历史
test_loss_history = [] # 测试集损失历史
for epoch in range(epochs):
running_loss = 0.0
correct = 0
total = 0
for batch_idx, (data, target) in enumerate(train_loader):
data, target = data.to(device), target.to(device) # 数据移到 GPU
optimizer.zero_grad() # 梯度清零
output = model(data) # 前向传播
loss = criterion(output, target) # 计算损失
loss.backward() # 反向传播
optimizer.step() # 更新参数
# 记录当前 iteration 损失
iter_loss = loss.item()
all_iter_losses.append(iter_loss)
iter_indices.append(epoch * len(train_loader) + batch_idx + 1)
# 统计训练集准确率
running_loss += iter_loss
_, predicted = output.max(1)
total += target.size(0)
correct += predicted.eq(target).sum().item()
# 每 100 个批次打印训练信息
if (batch_idx + 1) % 100 == 0:
print(f'Epoch: {epoch+1}/{epochs} | Batch: {batch_idx+1}/{len(train_loader)} '
f'| 单Batch损失: {iter_loss:.4f} | 累计平均损失: {running_loss/(batch_idx+1):.4f}')
# 计算当前 epoch 训练集指标
epoch_train_loss = running_loss / len(train_loader)
epoch_train_acc = 100. * correct / total
train_acc_history.append(epoch_train_acc)
train_loss_history.append(epoch_train_loss)
# 测试阶段
model.eval() # 切换为评估模式
test_loss = 0
correct_test = 0
total_test = 0
with torch.no_grad(): # 测试时无需计算梯度
for data, target in test_loader:
data, target = data.to(device), target.to(device)
output = model(data)
test_loss += criterion(output, target).item()
_, predicted = output.max(1)
total_test += target.size(0)
correct_test += predicted.eq(target).sum().item()
# 计算当前 epoch 测试集指标
epoch_test_loss = test_loss / len(test_loader)
epoch_test_acc = 100. * correct_test / total_test
test_acc_history.append(epoch_test_acc)
test_loss_history.append(epoch_test_loss)
# 更新学习率调度器
scheduler.step(epoch_test_loss)
print(f'Epoch {epoch+1}/{epochs} 完成 | 训练准确率: {epoch_train_acc:.2f}% | 测试准确率: {epoch_test_acc:.2f}%')
# 绘制 iteration 损失曲线
plot_iter_losses(all_iter_losses, iter_indices)
# 绘制 epoch 指标曲线
plot_epoch_metrics(train_acc_history, test_acc_history, train_loss_history, test_loss_history)
return epoch_test_acc
# 6. 绘图函数(可视化训练过程,无需修改)
def plot_iter_losses(losses, indices):
    """Plot the training loss of every iteration (batch) as one curve."""
    fig, ax = plt.subplots(figsize=(10, 4))
    ax.plot(indices, losses, 'b-', alpha=0.7, label='Iteration Loss')
    ax.set_xlabel('Iteration(Batch序号)')
    ax.set_ylabel('损失值')
    ax.set_title('每个 Iteration 的训练损失')
    ax.legend()
    ax.grid(True)
    fig.tight_layout()
    plt.show()
def plot_epoch_metrics(train_acc, test_acc, train_loss, test_loss):
    """Draw side-by-side accuracy and loss curves over epochs."""
    xs = range(1, len(train_acc) + 1)
    fig, (ax_acc, ax_loss) = plt.subplots(1, 2, figsize=(12, 4))
    # Left panel: accuracy
    ax_acc.plot(xs, train_acc, 'b-', label='训练准确率')
    ax_acc.plot(xs, test_acc, 'r-', label='测试准确率')
    ax_acc.set_xlabel('Epoch')
    ax_acc.set_ylabel('准确率 (%)')
    ax_acc.set_title('训练和测试准确率')
    ax_acc.legend()
    ax_acc.grid(True)
    # Right panel: loss
    ax_loss.plot(xs, train_loss, 'b-', label='训练损失')
    ax_loss.plot(xs, test_loss, 'r-', label='测试损失')
    ax_loss.set_xlabel('Epoch')
    ax_loss.set_ylabel('损失值')
    ax_loss.set_title('训练和测试损失')
    ax_loss.legend()
    ax_loss.grid(True)
    fig.tight_layout()
    plt.show()
# 7. Run training (tune epochs / batch_size to the available GPU memory)
epochs = 80  # large images train slowly; consider a short trial run first
print("开始使用 CNN 训练自定义鱼类数据集(128×128 尺寸)...")
final_accuracy = train(model, train_loader, test_loader, criterion, optimizer, scheduler, device, epochs)
print(f"训练完成!最终测试准确率: {final_accuracy:.2f}%")
# Optionally persist the trained weights:
# torch.save(model.state_dict(), 'bengali_fish_cnn_128.pth')
import warnings
warnings.filterwarnings("ignore")
# NOTE(review): the imports below duplicate those at the top of the file —
# this section was originally a separate script appended here.
import matplotlib.pyplot as plt
import numpy as np
import torch
from torchvision import transforms
from PIL import Image
# Re-apply the CJK-capable font configuration
plt.rcParams["font.family"] = ["SimHei"]
plt.rcParams['axes.unicode_minus'] = False  # render minus signs correctly with CJK fonts
# Grad-CAM implementation
class GradCAM:
    """Grad-CAM heatmap generator for a single conv layer of a model.

    Registers a forward hook (to capture the layer's feature maps) and a
    backward hook (to capture the gradient of the prediction w.r.t. those
    maps), then weights the maps by the spatially-averaged gradients.
    Call the instance with a 1-image batch; call remove_hooks() when done.
    """

    def __init__(self, model, target_layer):
        self.model = model
        self.target_layer = target_layer
        self.feature_maps = None
        self.gradient = None
        self.hook_handles = []

        # Forward hook: cache the target layer's output feature maps.
        def forward_hook(module, input, output):
            self.feature_maps = output.detach()

        # Backward hook: cache the gradient flowing into the layer's output.
        def backward_hook(module, grad_in, grad_out):
            self.gradient = grad_out[0].detach()

        self.hook_handles.append(target_layer.register_forward_hook(forward_hook))
        # register_full_backward_hook replaces the deprecated
        # register_backward_hook, which could report incorrect gradients for
        # modules composed of several autograd ops.
        self.hook_handles.append(target_layer.register_full_backward_hook(backward_hook))
        # Inference only: freeze BatchNorm stats / disable Dropout.
        self.model.eval()

    def __call__(self, x):
        """Return (heatmap ndarray at the input's HxW, predicted class id)."""
        # Derive the device from the model instead of relying on a global.
        device = next(self.model.parameters()).device
        x = x.to(device)
        self.model.zero_grad()
        output = self.model(x)
        # Predicted class of the (single) input image.
        pred_class = torch.argmax(output, dim=1).item()
        # Backprop a one-hot vector for the predicted class
        # (assumes batch size 1).
        one_hot = torch.zeros_like(output)
        one_hot[0, pred_class] = 1
        output.backward(gradient=one_hot, retain_graph=True)
        # Channel weights: global average pooling of the gradients.
        weights = torch.mean(self.gradient, dim=(2, 3), keepdim=True)
        # Weighted combination of feature maps.
        cam = torch.sum(weights * self.feature_maps, dim=1).squeeze()
        # Keep only regions contributing positively to the prediction.
        cam = torch.relu(cam)
        # Normalize to [0, 1] (guard against an all-zero map).
        if torch.max(cam) > 0:
            cam = cam / torch.max(cam)
        # Upsample to the input's spatial size (previously hard-coded to
        # 128x128; this generalizes to any input resolution).
        cam = torch.nn.functional.interpolate(
            cam.unsqueeze(0).unsqueeze(0),
            size=x.shape[-2:],
            mode='bilinear',
            align_corners=False
        ).squeeze()
        return cam.cpu().numpy(), pred_class

    def remove_hooks(self):
        """Detach both hooks from the target layer."""
        for handle in self.hook_handles:
            handle.remove()
# Convert a normalized image tensor back to a displayable array.
def tensor_to_np(tensor):
    """Undo the training normalization; return an HxWxC float image in [0, 1]."""
    # CHW tensor -> HWC numpy array
    arr = tensor.cpu().numpy().transpose(1, 2, 0)
    # Channel statistics used by the train/test transforms.
    mean = np.array([0.4914, 0.4822, 0.4465])
    std = np.array([0.2023, 0.1994, 0.2010])
    denormalized = arr * std + mean
    # Clamp so matplotlib gets valid pixel intensities.
    return np.clip(denormalized, 0.0, 1.0)
# Pick one image from the test split
# idx = np.random.randint(len(test_dataset))
idx = 110  # fixed index into the test set (0-based)
image, label = test_dataset[idx]
# Class names in ImageFolder's alphabetical folder order; with random_split
# they live on the wrapped dataset, hence test_dataset.dataset
classes = test_dataset.dataset.classes
print(f"选择的图像类别: {classes[label]}")
# Add a batch dimension and move to the compute device
input_tensor = image.unsqueeze(0).to(device)
# Build Grad-CAM against the last conv block (conv4)
# grad_cam = GradCAM(model, model.conv3)
grad_cam = GradCAM(model, model.conv4)
# Generate the heatmap by calling the object directly
heatmap, pred_class = grad_cam(input_tensor)
# Visualization: original | heatmap | overlay
plt.figure(figsize=(15, 5))
# Panel 1: original image
plt.subplot(1, 3, 1)
plt.imshow(tensor_to_np(image))
plt.title(f"原始图像: {classes[label]}")
plt.axis('off')
# Panel 2: raw heatmap
plt.subplot(1, 3, 2)
plt.imshow(heatmap, cmap='jet')
plt.title(f"Grad-CAM热力图: {classes[pred_class]}")
plt.axis('off')
# Panel 3: heatmap superimposed on the image
plt.subplot(1, 3, 3)
img = tensor_to_np(image)
heatmap_resized = np.uint8(255 * heatmap)  # scale to colormap index range
heatmap_colored = plt.cm.jet(heatmap_resized)[:, :, :3]  # drop alpha channel
superimposed_img = heatmap_colored * 0.4 + img * 0.6  # 40/60 blend
plt.imshow(superimposed_img)
plt.title("叠加热力图")
plt.axis('off')
plt.tight_layout()
plt.savefig('grad_cam_result_128_crop_100.png')
plt.show()
print("Grad-CAM可视化完成。已保存为grad_cam_result_128_crop_100.png")
# Attribution: @浙大疏锦行 (commented out — the bare line was a SyntaxError)