DAY 37 早停策略和模型权重的保存

发布于:2025-06-22 ⋅ 阅读:(11) ⋅ 点赞:(0)

DAY 37 早停策略和模型权重的保存

import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
import time
import matplotlib.pyplot as plt
from tqdm import tqdm
import warnings
warnings.filterwarnings('ignore')

# Prefer the first CUDA device when PyTorch can see one; otherwise use the CPU.
device = torch.device('cuda:0') if torch.cuda.is_available() else torch.device('cpu')
print(f'使用设备: {device}')

# Iris features and integer class labels.
iris = load_iris()
X, y = iris.data, iris.target

# 80/20 split with a fixed seed so the split is reproducible.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

# The scaler is fitted on the training split only and then reused on the test split.
scaler = MinMaxScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)

# Convert to tensors on the chosen device: float features, long labels.
X_train, X_test = (torch.FloatTensor(arr).to(device) for arr in (X_train, X_test))
y_train, y_test = (torch.LongTensor(arr).to(device) for arr in (y_train, y_test))

class MLP(nn.Module):
    """Small perceptron: 4 inputs -> 10 hidden units (ReLU) -> 3 output logits."""

    def __init__(self):
        super().__init__()
        self.fc1 = nn.Linear(4, 10)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(10, 3)

    def forward(self, x):
        """Return the raw logits produced by fc1 -> ReLU -> fc2 for input ``x``."""
        hidden = self.relu(self.fc1(x))
        return self.fc2(hidden)

# Model, loss and optimiser for full-batch training on the training tensors.
model = MLP().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.01)
num_epochs = 20000
losses, epochs = [], []  # loss value and 1-based epoch number, sampled every 200 epochs
start_time = time.time()

with tqdm(total=num_epochs, desc='训练进度', unit='epoch') as pbar:
    for epoch in range(num_epochs):
        # One gradient step on the whole training set.
        optimizer.zero_grad()
        outputs = model(X_train)
        loss = criterion(outputs, y_train)
        loss.backward()
        optimizer.step()

        done = epoch + 1

        # Record the loss and refresh the bar's postfix every 200 epochs.
        if done % 200 == 0:
            current = loss.item()
            losses.append(current)
            epochs.append(done)
            pbar.set_postfix({'Loss': f'{current:.4f}'})

        # Advance the bar in steps of 1000 rather than on every epoch.
        if done % 1000 == 0:
            pbar.update(1000)

    # Account for any epochs not covered by the 1000-step updates.
    remaining = num_epochs - pbar.n
    if remaining > 0:
        pbar.update(remaining)

time_all = time.time() - start_time
print(f'Training time: {time_all:.2f} seconds')

# Loss curve built from the values sampled during training.
plt.figure(figsize=(10, 6))
plt.plot(epochs, losses)
plt.title('Training Loss over Epochs')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.grid(True)
plt.show()

# Accuracy on the held-out split; gradients are not needed for inference.
model.eval()
with torch.no_grad():
    outputs = model(X_test)
    predicted = torch.max(outputs, 1)[1]  # index of the largest logit per row
    correct = (predicted == y_test).sum().item()
    accuracy = correct / y_test.shape[0]
    print(f'测试集准确率: {accuracy * 100:.2f}%')

使用设备: cpu


训练进度: 100%|██████████| 20000/20000 [00:10<00:00, 1929.56epoch/s, Loss=0.0626]


Training time: 10.37 seconds

(图:训练损失随 Epoch 变化的曲线)

测试集准确率: 96.67%
1.过拟合的判断:测试集和训练集同步打印指标
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
import time
import matplotlib.pyplot as plt
from tqdm import tqdm
import warnings
warnings.filterwarnings('ignore')

# Use the first CUDA device if available, else the CPU.
device = torch.device('cuda:0') if torch.cuda.is_available() else torch.device('cpu')
print(f'使用设备: {device}')

# Load Iris and hold out 20% of it with a fixed seed.
iris = load_iris()
X, y = iris.data, iris.target
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

# Min-max scaling: statistics come from the training split only.
scaler = MinMaxScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)

# Tensors on the working device (float features, long labels).
X_train, X_test = (torch.FloatTensor(arr).to(device) for arr in (X_train, X_test))
y_train, y_test = (torch.LongTensor(arr).to(device) for arr in (y_train, y_test))

class MLP(nn.Module):
    """Perceptron with one hidden layer: Linear(4, 10) -> ReLU -> Linear(10, 3)."""

    def __init__(self):
        super().__init__()
        self.fc1 = nn.Linear(4, 10)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(10, 3)

    def forward(self, x):
        """Apply the three layers in order and return the output of ``fc2``."""
        activated = self.relu(self.fc1(x))
        return self.fc2(activated)

# Full-batch training that also samples the loss on the held-out split.
model = MLP().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.01)
num_epochs = 20000
train_losses, test_losses, epochs = [], [], []
start_time = time.time()

with tqdm(total=num_epochs, desc='训练进度', unit='epoch') as pbar:
    for epoch in range(num_epochs):
        # One optimisation step on the whole training set.
        optimizer.zero_grad()
        outputs = model(X_train)
        train_loss = criterion(outputs, y_train)
        train_loss.backward()
        optimizer.step()

        finished = epoch + 1

        if finished % 200 == 0:
            # Test loss is computed without gradients, then training mode is restored.
            model.eval()
            with torch.no_grad():
                test_outputs = model(X_test)
                test_loss = criterion(test_outputs, y_test)
            model.train()

            train_value, test_value = train_loss.item(), test_loss.item()
            train_losses.append(train_value)
            test_losses.append(test_value)
            epochs.append(finished)
            pbar.set_postfix({'Train Loss': f'{train_value:.4f}', 'Test Loss': f'{test_value:.4f}'})

        # The bar advances in blocks of 1000 epochs.
        if finished % 1000 == 0:
            pbar.update(1000)

    # Cover whatever the 1000-epoch blocks did not.
    leftover = num_epochs - pbar.n
    if leftover > 0:
        pbar.update(leftover)

time_all = time.time() - start_time
print(f'Training time: {time_all:.2f} seconds')

# Train and test loss curves on a shared axis.
plt.figure(figsize=(10, 6))
for series, label in ((train_losses, 'Train Loss'), (test_losses, 'Test Loss')):
    plt.plot(epochs, series, label=label)
plt.title('Training and Test Loss over Epochs')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.grid(True)
plt.show()

# Final accuracy on the held-out split.
model.eval()
with torch.no_grad():
    outputs = model(X_test)
    predicted = torch.max(outputs, 1)[1]  # index of the largest logit per row
    correct = (predicted == y_test).sum().item()
    accuracy = correct / y_test.shape[0]
    print(f'测试集准确率: {accuracy * 100:.2f}%')
 
使用设备: cpu


训练进度: 100%|██████████| 20000/20000 [00:10<00:00, 1902.74epoch/s, Train Loss=0.0623, Test Loss=0.0553]


Training time: 10.51 seconds

(图:训练集与测试集损失随 Epoch 变化的曲线)

测试集准确率: 96.67%
2.模型的保存和加载
a.仅保存权重
# Save only the learned parameters (the state_dict), not the Python class.
torch.save(model.state_dict(), 'model_weights.pth')

# Rebuild the architecture on the working device before restoring the weights.
# map_location remaps the stored tensors onto `device`, so a file written from
# a CUDA model can still be loaded on a CPU-only machine.
model = MLP().to(device)
model.load_state_dict(torch.load('model_weights.pth', map_location=device))

<All keys matched successfully>
b.保存权重和模型
# Pickle the whole module object (architecture reference + weights).
torch.save(model, 'full_model.pth')
# Since PyTorch 2.6 torch.load defaults to weights_only=True, which refuses to
# unpickle a full nn.Module; weights_only=False restores the old behaviour.
# NOTE: full unpickling can execute arbitrary code - only load files you trust.
model = torch.load('full_model.pth', weights_only=False)
model.eval()

MLP(
  (fc1): Linear(in_features=4, out_features=10, bias=True)
  (relu): ReLU()
  (fc2): Linear(in_features=10, out_features=3, bias=True)
)
c.保存全部信息checkpoint,还包含训练状态
# Example only (kept commented out): write a checkpoint that bundles the model
# weights, the optimizer state, the current epoch and a loss value.
# `best_loss` and `train(...)` are placeholders that are not defined in the
# code shown here.
# checkpoint = {
#     'model_state_dict': model.state_dict(),
#     'optimizer_state_dict': optimizer.state_dict(),
#     'epoch': epoch,
#     'loss': best_loss,
# }
# torch.save(checkpoint, 'checkpoint.pth')

# Resuming: rebuild the model and optimizer, then restore both states.
# NOTE(review): the optimizer created here should match the type of the
# optimizer whose state was saved - confirm before reusing this snippet.
# model = MLP()
# optimizer = torch.optim.Adam(model.parameters())
# checkpoint = torch.load('checkpoint.pth')

# model.load_state_dict(checkpoint['model_state_dict'])
# optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
# start_epoch = checkpoint['epoch'] + 1
# best_loss = checkpoint['loss']

# Continue from the epoch after the one stored in the checkpoint.
# for epoch in range(start_epoch, num_epochs):
#     train(model, optimizer, ...)

3.早停策略
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
import time
import matplotlib.pyplot as plt
from tqdm import tqdm
import warnings
warnings.filterwarnings('ignore')

# Select the compute device: first CUDA GPU when present, CPU otherwise.
device = torch.device('cuda:0') if torch.cuda.is_available() else torch.device('cpu')
print(f'使用设备: {device}')

# Iris data, split 80/20 with a fixed random seed.
iris = load_iris()
X, y = iris.data, iris.target
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

# Fit the scaler on the training split and apply the same transform to the test split.
scaler = MinMaxScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)

# Move features (float) and labels (long) to the device as tensors.
X_train, X_test = (torch.FloatTensor(arr).to(device) for arr in (X_train, X_test))
y_train, y_test = (torch.LongTensor(arr).to(device) for arr in (y_train, y_test))

class MLP(nn.Module):
    """Classifier network: Linear(4, 10), ReLU, Linear(10, 3)."""

    def __init__(self):
        super().__init__()
        self.fc1 = nn.Linear(4, 10)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(10, 3)

    def forward(self, x):
        """Run ``x`` through fc1, ReLU and fc2, returning the fc2 output."""
        return self.fc2(self.relu(self.fc1(x)))

# Full-batch training with early stopping driven by the held-out loss.
model = MLP().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.01)
num_epochs = 20000
eval_interval = 200  # epochs between two evaluations on the held-out split
train_losses = []
test_losses = []
epochs = []
best_test_loss = float('inf')
best_epoch = 0
# Patience is counted in evaluations, not epochs: training stops after
# `patience` consecutive evaluations (patience * eval_interval epochs)
# without a new best held-out loss.
patience = 50
counter = 0
early_stopped = False
start_time = time.time()

with tqdm(total=num_epochs, desc='训练进度', unit='epoch') as pbar:
    for epoch in range(num_epochs):
        outputs = model(X_train)
        train_loss = criterion(outputs, y_train)
        optimizer.zero_grad()
        train_loss.backward()
        optimizer.step()

        if (epoch + 1) % eval_interval == 0:
            model.eval()
            with torch.no_grad():
                test_outputs = model(X_test)
                test_loss = criterion(test_outputs, y_test)
            model.train()
            train_losses.append(train_loss.item())
            test_losses.append(test_loss.item())
            epochs.append(epoch + 1)
            pbar.set_postfix({'Train Loss': f'{train_loss.item():.4f}', 'Test Loss': f'{test_loss.item():.4f}'})

            if test_loss.item() < best_test_loss:
                # New best: remember it and checkpoint the weights.
                best_test_loss = test_loss.item()
                best_epoch = epoch + 1
                counter = 0
                torch.save(model.state_dict(), 'best_model.pth')
            else:
                counter += 1
                if counter >= patience:
                    # Report both the number of evaluations and the epochs they span.
                    print(f'早停触发!在第{epoch+1}轮, 测试集损失已连续{counter}次评估(共{counter * eval_interval}轮)未改善')
                    print(f'最佳测试集损失出现在第{best_epoch}轮, 损失值为{best_test_loss:.4f}')
                    early_stopped = True
                    break

        if (epoch + 1) % 1000 == 0:
            pbar.update(1000)

    # Advance the bar only to the number of epochs actually run, so an early
    # stop is not displayed as a completed 100% run.
    completed = epoch + 1 if early_stopped else num_epochs
    if pbar.n < completed:
        pbar.update(completed - pbar.n)

time_all = time.time() - start_time
print(f'Training time: {time_all:.2f} seconds')

# Restore the best checkpoint whenever one was written, not only after an
# early stop: the weights from the last epoch are not necessarily the best.
# map_location keeps the load valid on whichever device is in use.
if best_epoch > 0:
    print(f'加载第{best_epoch}轮的最佳模型进行最终评估')
    model.load_state_dict(torch.load('best_model.pth', map_location=device))

# Plot both sampled loss series against the epoch at which they were recorded.
plt.figure(figsize=(10, 6))
for series, label in ((train_losses, 'Train Loss'), (test_losses, 'Test Loss')):
    plt.plot(epochs, series, label=label)
plt.title('Training and Test Loss over Epochs')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.grid(True)
plt.show()

# Accuracy of the current model weights on the held-out split.
model.eval()
with torch.no_grad():
    outputs = model(X_test)
    predicted = torch.max(outputs, 1)[1]  # index of the largest logit per row
    correct = (predicted == y_test).sum().item()
    accuracy = correct / y_test.shape[0]
    print(f'测试集准确率: {accuracy * 100:.2f}%')

使用设备: cpu


训练进度:   0%|          | 0/20000 [00:00<?, ?epoch/s]

训练进度: 100%|██████████| 20000/20000 [00:11<00:00, 1802.94epoch/s, Train Loss=0.0623, Test Loss=0.0576]


Training time: 11.09 seconds

(图:早停策略下训练集与测试集损失随 Epoch 变化的曲线)

测试集准确率: 96.67%
作业:对信贷数据集训练后保存权重,加载权重后继续训练50轮,并采取早停策略
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
import time
import matplotlib.pyplot as plt
from tqdm import tqdm 
import pandas as pd

# Matplotlib configuration: SimHei as the sans-serif font, ASCII minus sign.
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False

data = pd.read_csv(r'data.csv')
# Snapshot of the raw table, used below to detect the columns created by
# one-hot encoding. An in-memory copy replaces a second read of data.csv.
data2 = data.copy()

# Columns stored as strings in the raw file.
list_discrete = data.select_dtypes(include=['object']).columns.tolist()

# Ordinal encodings; values missing from a mapping become NaN and are filled
# by the median imputation further down.
home_ownership_mapping = {'Own Home': 1, 'Rent': 2,
                          'Have Mortgage': 3, 'Home Mortgage': 4}
data['Home Ownership'] = data['Home Ownership'].map(home_ownership_mapping)

years_in_job_mapping = {'< 1 year': 1, '1 year': 2, '2 years': 3, '3 years': 4, '4 years': 5,
                        '5 years': 6, '6 years': 7, '7 years': 8, '8 years': 9, '9 years': 10, '10+ years': 11}
data['Years in current job'] = data['Years in current job'].map(
    years_in_job_mapping)

# One-hot encode 'Purpose', then cast the newly created columns to int.
data = pd.get_dummies(data, columns=['Purpose'])
list_new = [column for column in data.columns if column not in data2.columns]
for column in list_new:
    data[column] = data[column].astype(int)

# Binary encoding of the loan term; the column is renamed to say what 1 means.
term_mapping = {'Short Term': 0, 'Long Term': 1}
data['Term'] = data['Term'].map(term_mapping)
data.rename(columns={'Term': 'Long Term'}, inplace=True)

list_continuous = data.select_dtypes(
    include=['int64', 'float64']).columns.tolist()

# Median imputation for every int64/float64 column.
# NOTE(review): the medians are computed on the full table before the
# train/test split, so test rows influence the imputed values.
for column in list_continuous:
    data[column] = data[column].fillna(data[column].median())

X = data.drop(['Credit Default'], axis=1)
y = data['Credit Default']

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
print(f'使用设备: {device}\n')

# Scale with statistics from the training split only, then build tensors on
# the chosen device (float features, long labels).
scaler = MinMaxScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)
X_train = torch.FloatTensor(X_train).to(device)
y_train = torch.LongTensor(y_train.values).to(device)
X_test = torch.FloatTensor(X_test).to(device)
y_test = torch.LongTensor(y_test.values).to(device)

class MLP(nn.Module):
    """Perceptron with one hidden ReLU layer and configurable layer sizes.

    Args:
        input_dim: Number of input features (default 31).
        hidden_dim: Width of the hidden layer (default 64).
        num_classes: Number of output logits (default 3).

    The defaults reproduce the original fixed architecture, so ``MLP()`` and
    previously saved state dicts keep working.
    """

    def __init__(self, input_dim=31, hidden_dim=64, num_classes=3):
        super().__init__()
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.relu = nn.ReLU()
        # NOTE(review): the target column 'Credit Default' looks binary; if so,
        # num_classes=2 would be enough - confirm against the data.
        self.fc2 = nn.Linear(hidden_dim, num_classes)

    def forward(self, x):
        """Return the raw logits produced by fc1 -> ReLU -> fc2 for input ``x``."""
        out = self.fc1(x)
        out = self.relu(out)
        out = self.fc2(out)
        return out

# Full-batch training on the credit data with early stopping on the held-out loss.
model = MLP().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.01)
num_epochs = 20000
eval_interval = 200  # epochs between two evaluations on the held-out split
train_losses = []
test_losses = []
epochs = []
best_test_loss = float('inf')
best_epoch = 0
# Patience is counted in evaluations, not epochs: training stops after
# `patience` consecutive evaluations (patience * eval_interval epochs)
# without a new best held-out loss.
patience = 50
counter = 0
early_stopped = False
start_time = time.time()

with tqdm(total=num_epochs, desc='训练进度', unit='epoch') as pbar:
    for epoch in range(num_epochs):
        outputs = model(X_train)
        train_loss = criterion(outputs, y_train)
        optimizer.zero_grad()
        train_loss.backward()
        optimizer.step()

        if (epoch + 1) % eval_interval == 0:
            model.eval()
            with torch.no_grad():
                test_outputs = model(X_test)
                test_loss = criterion(test_outputs, y_test)
            model.train()
            train_losses.append(train_loss.item())
            test_losses.append(test_loss.item())
            epochs.append(epoch + 1)
            pbar.set_postfix({'Train Loss': f'{train_loss.item():.4f}', 'Test Loss': f'{test_loss.item():.4f}'})

            if test_loss.item() < best_test_loss:
                # New best: remember it and checkpoint the weights.
                best_test_loss = test_loss.item()
                best_epoch = epoch + 1
                counter = 0
                torch.save(model.state_dict(), 'best_model.pth')
            else:
                counter += 1
                if counter >= patience:
                    # Report both the number of evaluations and the epochs they span.
                    print(f'早停触发!在第{epoch+1}轮, 测试集损失已连续{counter}次评估(共{counter * eval_interval}轮)未改善')
                    print(f'最佳测试集损失出现在第{best_epoch}轮, 损失值为{best_test_loss:.4f}')
                    early_stopped = True
                    break

        if (epoch + 1) % 1000 == 0:
            pbar.update(1000)

    # Advance the bar only to the number of epochs actually run, so an early
    # stop is not displayed as a completed 100% run.
    completed = epoch + 1 if early_stopped else num_epochs
    if pbar.n < completed:
        pbar.update(completed - pbar.n)

time_all = time.time() - start_time
print(f'Training time: {time_all:.2f} seconds')

# Restore the best checkpoint whenever one was written, not only after an
# early stop: the weights from the last epoch are not necessarily the best.
# map_location keeps the load valid on whichever device is in use.
if best_epoch > 0:
    print(f'加载第{best_epoch}轮的最佳模型进行最终评估')
    model.load_state_dict(torch.load('best_model.pth', map_location=device))

# Train and test loss curves for the credit-data run.
plt.figure(figsize=(10, 6))
for series, label in ((train_losses, 'Train Loss'), (test_losses, 'Test Loss')):
    plt.plot(epochs, series, label=label)
plt.title('Training and Test Loss over Epochs')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.grid(True)
plt.show()

# Accuracy of the current model weights on the held-out split.
model.eval()
with torch.no_grad():
    outputs = model(X_test)
    predicted = torch.max(outputs, 1)[1]  # index of the largest logit per row
    correct = (predicted == y_test).sum().item()
    accuracy = correct / y_test.shape[0]
    print(f'测试集准确率: {accuracy * 100:.2f}%')

使用设备: cpu



训练进度: 100%|██████████| 20000/20000 [00:49<00:00, 407.11epoch/s, Train Loss=0.4646, Test Loss=0.4723]

Training time: 49.13 seconds

(图:信贷数据集上训练集与测试集损失随 Epoch 变化的曲线)

测试集准确率: 76.80%

@浙大疏锦行


网站公告

今日签到

点亮在社区的每一天
去签到