@浙大疏锦行 Python day45.
TensorBoard can render visualizations in real time while training runs.
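While training runs, launch TensorBoard from the project root and point it at the log directory used by the SummaryWriter below, then open http://localhost:6006 in a browser:

tensorboard --logdir=runs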
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
from torch.utils.tensorboard import SummaryWriter
import numpy as np
import matplotlib.pyplot as plt
import os
# Set random seeds so results are reproducible
torch.manual_seed(42)
np.random.seed(42)
BATCH_SIZE = 64
EPOCHS = 100
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"使用device: {DEVICE}")
# Define the data transforms
train_transform = transforms.Compose([
    transforms.RandomCrop(32, padding=4),
    transforms.RandomHorizontalFlip(),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
    transforms.RandomRotation(15),
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])
test_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])
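# Normalize with mean=0.5 and std=0.5 per channel rescales ToTensor's [0, 1] output
# to [-1, 1]: x' = (x - 0.5) / 0.5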
# Load the CIFAR-10 dataset
train_dataset = datasets.CIFAR10(
    root='./data',
    train=True,
    download=True,
    transform=train_transform
)
test_dataset = datasets.CIFAR10(
    root='./data',
    train=False,
    transform=test_transform
)
# DataLoaders
train_dataloader = DataLoader(
    dataset=train_dataset,
    shuffle=True,
    batch_size=BATCH_SIZE,
    num_workers=2
)
test_dataloader = DataLoader(
    dataset=test_dataset,
    shuffle=False,
    batch_size=BATCH_SIZE,
    num_workers=2
)
# CIFAR-10 class names
classes = ('plane', 'car', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck')
# Define the model
class CNN(nn.Module):
    def __init__(self):
        super().__init__()
        # --- layer 1 ---
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=32, kernel_size=3, padding=1)  # convolution
        self.bn1 = nn.BatchNorm2d(num_features=32)  # batch normalization
        self.relu1 = nn.ReLU()
        self.pool1 = nn.MaxPool2d(kernel_size=2, stride=2)  # 32x32 -> 16x16
        # --- layer 2 ---
        self.conv2 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, padding=1)
        self.bn2 = nn.BatchNorm2d(num_features=64)
        self.relu2 = nn.ReLU()
        self.pool2 = nn.MaxPool2d(kernel_size=2, stride=2)  # 16x16 -> 8x8
        # --- layer 3 ---
        self.conv3 = nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3, padding=1)
        self.bn3 = nn.BatchNorm2d(num_features=128)
        self.relu3 = nn.ReLU()
        self.pool3 = nn.MaxPool2d(kernel_size=2)  # halves spatial size: 8x8 -> 4x4
        # feature map is now 128 x 4 x 4 (C x H x W)
        # # (alternative head, commented out) fully convolutional classifier:
        # self.conv4 = nn.Conv2d(in_channels=128, out_channels=10, kernel_size=1)
        # self.pool4 = nn.MaxPool2d(kernel_size=2)
        # self.pool5 = nn.MaxPool2d(kernel_size=2)  # 1 x 1 x 10
        self.fc1 = nn.Linear(in_features=128 * 4 * 4, out_features=512)
        self.dropout = nn.Dropout(p=0.5)
        self.fc2 = nn.Linear(in_features=512, out_features=10)
    def forward(self, x):
        # Input: [batch_size, 3, 32, 32] (batch size, channels, 32x32 image)
        # ---------- conv block 1 ----------
        x = self.conv1(x)  # [batch_size, 32, 32, 32] (padding=1 keeps spatial size)
        x = self.bn1(x)    # batch norm, shape unchanged
        x = self.relu1(x)  # activation, shape unchanged
        x = self.pool1(x)  # [batch_size, 32, 16, 16] (2x2 pooling halves H and W)
        # ---------- conv block 2 ----------
        x = self.conv2(x)  # [batch_size, 64, 16, 16] (padding=1 keeps spatial size)
        x = self.bn2(x)
        x = self.relu2(x)
        x = self.pool2(x)  # [batch_size, 64, 8, 8]
        # ---------- conv block 3 ----------
        x = self.conv3(x)  # [batch_size, 128, 8, 8] (padding=1 keeps spatial size)
        x = self.bn3(x)
        x = self.relu3(x)
        x = self.pool3(x)  # [batch_size, 128, 4, 4]
        # ---------- flatten and fully connected layers ----------
        # Flatten the feature map to a vector: [batch_size, 128*4*4] = [batch_size, 2048]
        x = x.view(x.size(0), -1)  # keep the batch dimension, flatten the rest
        x = self.fc1(x)      # fully connected: 2048 -> 512
        x = self.relu3(x)    # activation (reuses relu3; ReLU is stateless, so sharing it is safe)
        x = self.dropout(x)  # randomly zeroes units during training, shape unchanged
        x = self.fc2(x)      # fully connected: 512 -> 10, raw logits (no activation)
        return x  # logits feed straight into CrossEntropyLoss, which applies log-softmax internally
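# A quick way to verify the shape arithmetic in forward() is a dummy forward pass
# (optional sanity check, commented out like the other optional snippets here):
# x = torch.randn(2, 3, 32, 32)  # two fake CIFAR-10-sized inputs
# print(CNN()(x).shape)          # expected: torch.Size([2, 10])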
# Instantiate the model and move it to the device
model = CNN()
model.to(DEVICE)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(params=model.parameters(), lr=0.001)
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer=optimizer, mode='min', patience=3, factor=0.5)  # halve the LR when the monitored loss stops improving for 3 epochs
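# Note: unlike step-based schedulers, ReduceLROnPlateau must be fed the monitored
# metric once per epoch, e.g. scheduler.step(val_loss) after evaluation.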
# TensorBoard
log_dir = 'runs/cifar10_cnn_exp'
writer = SummaryWriter(log_dir=log_dir)
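# (optional) log a grid of augmented training images so the TensorBoard Images tab
# shows what the network actually sees during training:
# images, _ = next(iter(train_dataloader))
# img_grid = torchvision.utils.make_grid(images[:16], normalize=True)
# writer.add_image('Train/sample_batch', img_grid, 0)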
# Train the model
def train(model, train_dataloader, criterion, optimizer, scheduler, device, epochs, writer):
    model.train()
    all_iter_losses = []
    iter_idx = []
    global_step = 0
    # # (optional) log the model graph: run one real batch through the model so
    # # TensorBoard can trace the computation graph
    # dataiter = iter(train_dataloader)
    # images, labels = next(dataiter)
    # images = images.to(device)
    # writer.add_graph(model, images)  # write the model structure to TensorBoard
    for epoch in range(epochs):
        running_loss = 0.0
        correct = 0
        total = 0
        for batch_idx, (data, target) in enumerate(train_dataloader):
            data, target = data.to(device), target.to(device)
            optimizer.zero_grad()
            # forward pass
            output = model(data)
            # compute the loss
            loss = criterion(output, target)
            # backward pass
            loss.backward()
            # update parameters
            optimizer.step()
            # record the per-iteration loss
            iter_loss = loss.item()
            all_iter_losses.append(iter_loss)
            iter_idx.append(global_step + 1)
            running_loss += iter_loss
            _, predicted = output.max(1)
            total += target.size(0)
            correct += predicted.eq(target).sum().item()
            batch_acc = 100. * correct / total  # running accuracy over the epoch so far
            writer.add_scalar('Train/Batch Loss', iter_loss, global_step)
            writer.add_scalar('Train/Batch Accuracy', batch_acc, global_step)
            # log the learning rate (optional)
            writer.add_scalar('Train/Learning Rate', optimizer.param_groups[0]['lr'], global_step)
            # log parameter histograms every 200 batches (optional, somewhat slow)
            if (batch_idx + 1) % 200 == 0:
                for name, param in model.named_parameters():
                    writer.add_histogram(f'Weights/{name}', param, global_step)
                    if param.grad is not None:
                        writer.add_histogram(f'Gradients/{name}', param.grad, global_step)
            # print a console log every 100 batches
            if (batch_idx + 1) % 100 == 0:
                print(f'Epoch: {epoch+1}/{epochs} | Batch: {batch_idx+1}/{len(train_dataloader)} '
                      f'| batch loss: {iter_loss:.4f} | running avg loss: {running_loss/(batch_idx+1):.4f}')
            global_step += 1  # advance the global step counter
        # epoch-level training metrics
        epoch_train_loss = running_loss / len(train_dataloader)
        epoch_train_acc = 100. * correct / total
        # ======================== TensorBoard epoch-level scalars ========================
        writer.add_scalar('Train/Epoch Loss', epoch_train_loss, epoch)
        writer.add_scalar('Train/Epoch Accuracy', epoch_train_acc, epoch)
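        # (sketch, not from the original) the scheduler and validation are not wired in yet;
        # a natural end-of-epoch hook, assuming val() is extended to return the mean test loss:
        #     val_loss = val(model, test_dataloader)
        #     scheduler.step(val_loss)
        #     model.train()  # val() puts the model in eval mode, so switch back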
def val(model, test_dataloader):
    model.eval()
    test_loss = 0
    correct_test = 0
    total_test = 0
    wrong_images = []  # store wrongly predicted samples (for visualization)
    wrong_labels = []
    wrong_preds = []
    with torch.no_grad():  # no gradients needed during evaluation
        for data, target in test_dataloader:
            data, target = data.to(DEVICE), target.to(DEVICE)
            output = model(data)
            test_loss += criterion(output, target).item()
            _, predicted = output.max(1)
            total_test += target.size(0)
            correct_test += predicted.eq(target).sum().item()