DAY 37 早停策略和模型权重的保存
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
import time
import matplotlib.pyplot as plt
from tqdm import tqdm
import warnings
warnings.filterwarnings('ignore')
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
print(f'使用设备: {device}')
iris = load_iris()
X = iris.data
y = iris.target
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
scaler = MinMaxScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)
X_train = torch.FloatTensor(X_train).to(device)
y_train = torch.LongTensor(y_train).to(device)
X_test = torch.FloatTensor(X_test).to(device)
y_test = torch.LongTensor(y_test).to(device)
class MLP(nn.Module):
def __init__(self):
super(MLP, self).__init__()
self.fc1 = nn.Linear(4, 10)
self.relu = nn.ReLU()
self.fc2 = nn.Linear(10, 3)
def forward(self, x):
out = self.fc1(x)
out = self.relu(out)
out = self.fc2(out)
return out
model = MLP().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.01)
num_epochs = 20000
losses = []
epochs = []
start_time = time.time()
with tqdm(total=num_epochs, desc='训练进度', unit='epoch') as pbar:
for epoch in range(num_epochs):
outputs = model(X_train)
loss = criterion(outputs, y_train)
optimizer.zero_grad()
loss.backward()
optimizer.step()
if (epoch + 1) % 200 == 0:
losses.append(loss.item())
epochs.append(epoch + 1)
pbar.set_postfix({'Loss': f'{loss.item():.4f}'})
if (epoch + 1) % 1000 == 0:
pbar.update(1000)
if pbar.n < num_epochs:
pbar.update(num_epochs - pbar.n)
time_all = time.time() - start_time
print(f'Training time: {time_all:.2f} seconds')
plt.figure(figsize=(10, 6))
plt.plot(epochs, losses)
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Training Loss over Epochs')
plt.grid(True)
plt.show()
model.eval()
with torch.no_grad():
outputs = model(X_test)
_, predicted = torch.max(outputs, 1)
correct = (predicted == y_test).sum().item()
accuracy = correct / y_test.size(0)
print(f'测试集准确率: {accuracy * 100:.2f}%')
使用设备: cpu
训练进度: 100%|██████████| 20000/20000 [00:10<00:00, 1929.56epoch/s, Loss=0.0626]
Training time: 10.37 seconds

测试集准确率: 96.67%
1.过拟合的判断:测试集和训练集同步打印指标
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
import time
import matplotlib.pyplot as plt
from tqdm import tqdm
import warnings
warnings.filterwarnings('ignore')
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
print(f'使用设备: {device}')
iris = load_iris()
X = iris.data
y = iris.target
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
scaler = MinMaxScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)
X_train = torch.FloatTensor(X_train).to(device)
y_train = torch.LongTensor(y_train).to(device)
X_test = torch.FloatTensor(X_test).to(device)
y_test = torch.LongTensor(y_test).to(device)
class MLP(nn.Module):
def __init__(self):
super(MLP, self).__init__()
self.fc1 = nn.Linear(4, 10)
self.relu = nn.ReLU()
self.fc2 = nn.Linear(10, 3)
def forward(self, x):
out = self.fc1(x)
out = self.relu(out)
out = self.fc2(out)
return out
model = MLP().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.01)
num_epochs = 20000
train_losses = []
test_losses = []
epochs = []
start_time = time.time()
with tqdm(total=num_epochs, desc='训练进度', unit='epoch') as pbar:
for epoch in range(num_epochs):
outputs = model(X_train)
train_loss = criterion(outputs, y_train)
optimizer.zero_grad()
train_loss.backward()
optimizer.step()
if (epoch + 1) % 200 == 0:
model.eval()
with torch.no_grad():
test_outputs = model(X_test)
test_loss = criterion(test_outputs, y_test)
model.train()
train_losses.append(train_loss.item())
test_losses.append(test_loss.item())
epochs.append(epoch + 1)
pbar.set_postfix({'Train Loss': f'{train_loss.item():.4f}', 'Test Loss': f'{test_loss.item():.4f}'})
if (epoch + 1) % 1000 == 0:
pbar.update(1000)
if pbar.n < num_epochs:
pbar.update(num_epochs - pbar.n)
time_all = time.time() - start_time
print(f'Training time: {time_all:.2f} seconds')
plt.figure(figsize=(10, 6))
plt.plot(epochs, train_losses, label='Train Loss')
plt.plot(epochs, test_losses, label='Test Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Training and Test Loss over Epochs')
plt.legend()
plt.grid(True)
plt.show()
model.eval()
with torch.no_grad():
outputs = model(X_test)
_, predicted = torch.max(outputs, 1)
correct = (predicted == y_test).sum().item()
accuracy = correct / y_test.size(0)
print(f'测试集准确率: {accuracy * 100:.2f}%')
使用设备: cpu
训练进度: 100%|██████████| 20000/20000 [00:10<00:00, 1902.74epoch/s, Train Loss=0.0623, Test Loss=0.0553]
Training time: 10.51 seconds

测试集准确率: 96.67%
2.模型的保存和加载
a.仅保存权重
torch.save(model.state_dict(), 'model_weights.pth')
model = MLP()
model.load_state_dict(torch.load('model_weights.pth'))
<All keys matched successfully>
b.保存权重和模型
torch.save(model, 'full_model.pth')
model = torch.load('full_model.pth')
model.eval()
MLP(
(fc1): Linear(in_features=4, out_features=10, bias=True)
(relu): ReLU()
(fc2): Linear(in_features=10, out_features=3, bias=True)
)
c.保存全部信息checkpoint,还包含训练状态
3.早停策略
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
import time
import matplotlib.pyplot as plt
from tqdm import tqdm
import warnings
warnings.filterwarnings('ignore')
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
print(f'使用设备: {device}')
iris = load_iris()
X = iris.data
y = iris.target
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
scaler = MinMaxScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)
X_train = torch.FloatTensor(X_train).to(device)
y_train = torch.LongTensor(y_train).to(device)
X_test = torch.FloatTensor(X_test).to(device)
y_test = torch.LongTensor(y_test).to(device)
class MLP(nn.Module):
def __init__(self):
super(MLP, self).__init__()
self.fc1 = nn.Linear(4, 10)
self.relu = nn.ReLU()
self.fc2 = nn.Linear(10, 3)
def forward(self, x):
out = self.fc1(x)
out = self.relu(out)
out = self.fc2(out)
return out
model = MLP().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.01)
num_epochs = 20000
train_losses = []
test_losses = []
epochs = []
best_test_loss = float('inf')
best_epoch = 0
patience = 50
counter = 0
early_stopped = False
start_time = time.time()
with tqdm(total=num_epochs, desc='训练进度', unit='epoch') as pbar:
for epoch in range(num_epochs):
outputs = model(X_train)
train_loss = criterion(outputs, y_train)
optimizer.zero_grad()
train_loss.backward()
optimizer.step()
if (epoch + 1) % 200 == 0:
model.eval()
with torch.no_grad():
test_outputs = model(X_test)
test_loss = criterion(test_outputs, y_test)
model.train()
train_losses.append(train_loss.item())
test_losses.append(test_loss.item())
epochs.append(epoch + 1)
pbar.set_postfix({'Train Loss': f'{train_loss.item():.4f}', 'Test Loss': f'{test_loss.item():.4f}'})
if test_loss.item() < best_test_loss:
best_test_loss = test_loss.item()
best_epoch = epoch + 1
counter = 0
torch.save(model.state_dict(), 'best_model.pth')
else:
counter += 1
if counter >= patience:
print(f'早停触发!在第{epoch+1}轮, 测试集损失已有{patience}轮未改善')
print(f'最佳测试集损失出现在第{best_epoch}轮, 损失值为{best_test_loss:.4f}')
early_stopped = True
break
if (epoch + 1) % 1000 == 0:
pbar.update(1000)
if pbar.n < num_epochs:
pbar.update(num_epochs - pbar.n)
time_all = time.time() - start_time
print(f'Training time: {time_all:.2f} seconds')
if early_stopped:
print(f'加载第{best_epoch}轮的最佳模型进行最终评估')
model.load_state_dict(torch.load('best_model.pth'))
plt.figure(figsize=(10, 6))
plt.plot(epochs, train_losses, label='Train Loss')
plt.plot(epochs, test_losses, label='Test Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Training and Test Loss over Epochs')
plt.legend()
plt.grid(True)
plt.show()
model.eval()
with torch.no_grad():
outputs = model(X_test)
_, predicted = torch.max(outputs, 1)
correct = (predicted == y_test).sum().item()
accuracy = correct / y_test.size(0)
print(f'测试集准确率: {accuracy * 100:.2f}%')
使用设备: cpu
训练进度: 0%| | 0/20000 [00:00<?, ?epoch/s]
训练进度: 100%|██████████| 20000/20000 [00:11<00:00, 1802.94epoch/s, Train Loss=0.0623, Test Loss=0.0576]
Training time: 11.09 seconds

测试集准确率: 96.67%
作业:对信贷数据集训练后保存权重,加载权重后继续训练50轮,并采取早停策略
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
import time
import matplotlib.pyplot as plt
from tqdm import tqdm
import pandas as pd
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False
data = pd.read_csv(r'data.csv')
list_discrete = data.select_dtypes(include=['object']).columns.tolist()
home_ownership_mapping = {'Own Home': 1, 'Rent': 2,
'Have Mortgage': 3, 'Home Mortgage': 4}
data['Home Ownership'] = data['Home Ownership'].map(home_ownership_mapping)
years_in_job_mapping = {'< 1 year': 1, '1 year': 2, '2 years': 3, '3 years': 4, '4 years': 5,
'5 years': 6, '6 years': 7, '7 years': 8, '8 years': 9, '9 years': 10, '10+ years': 11}
data['Years in current job'] = data['Years in current job'].map(
years_in_job_mapping)
data = pd.get_dummies(data, columns=['Purpose'])
data2 = pd.read_csv(r'data.csv')
list_new = []
for i in data.columns:
if i not in data2.columns:
list_new.append(i)
for i in list_new:
data[i] = data[i].astype(int)
term_mapping = {'Short Term': 0, 'Long Term': 1}
data['Term'] = data['Term'].map(term_mapping)
data.rename(columns={'Term': 'Long Term'}, inplace=True)
list_continuous = data.select_dtypes(
include=['int64', 'float64']).columns.tolist()
for i in list_continuous:
median_value = data[i].median()
data[i] = data[i].fillna(median_value)
X = data.drop(['Credit Default'], axis=1)
y = data['Credit Default']
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
print(f'使用设备: {device}\n')
scaler = MinMaxScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)
X_train = torch.FloatTensor(X_train).to(device)
y_train = torch.LongTensor(y_train.values).to(device)
X_test = torch.FloatTensor(X_test).to(device)
y_test = torch.LongTensor(y_test.values).to(device)
class MLP(nn.Module):
def __init__(self):
super(MLP, self).__init__()
self.fc1 = nn.Linear(31, 64)
self.relu = nn.ReLU()
self.fc2 = nn.Linear(64, 3)
def forward(self, x):
out = self.fc1(x)
out = self.relu(out)
out = self.fc2(out)
return out
model = MLP().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.01)
num_epochs = 20000
train_losses = []
test_losses = []
epochs = []
best_test_loss = float('inf')
best_epoch = 0
patience = 50
counter = 0
early_stopped = False
start_time = time.time()
with tqdm(total=num_epochs, desc='训练进度', unit='epoch') as pbar:
for epoch in range(num_epochs):
outputs = model(X_train)
train_loss = criterion(outputs, y_train)
optimizer.zero_grad()
train_loss.backward()
optimizer.step()
if (epoch + 1) % 200 == 0:
model.eval()
with torch.no_grad():
test_outputs = model(X_test)
test_loss = criterion(test_outputs, y_test)
model.train()
train_losses.append(train_loss.item())
test_losses.append(test_loss.item())
epochs.append(epoch + 1)
pbar.set_postfix({'Train Loss': f'{train_loss.item():.4f}', 'Test Loss': f'{test_loss.item():.4f}'})
if test_loss.item() < best_test_loss:
best_test_loss = test_loss.item()
best_epoch = epoch + 1
counter = 0
torch.save(model.state_dict(), 'best_model.pth')
else:
counter += 1
if counter >= patience:
print(f'早停触发!在第{epoch+1}轮, 测试集损失已有{patience}轮未改善')
print(f'最佳测试集损失出现在第{best_epoch}轮, 损失值为{best_test_loss:.4f}')
early_stopped = True
break
if (epoch + 1) % 1000 == 0:
pbar.update(1000)
if pbar.n < num_epochs:
pbar.update(num_epochs - pbar.n)
time_all = time.time() - start_time
print(f'Training time: {time_all:.2f} seconds')
if early_stopped:
print(f'加载第{best_epoch}轮的最佳模型进行最终评估')
model.load_state_dict(torch.load('best_model.pth'))
plt.figure(figsize=(10, 6))
plt.plot(epochs, train_losses, label='Train Loss')
plt.plot(epochs, test_losses, label='Test Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Training and Test Loss over Epochs')
plt.legend()
plt.grid(True)
plt.show()
model.eval()
with torch.no_grad():
outputs = model(X_test)
_, predicted = torch.max(outputs, 1)
correct = (predicted == y_test).sum().item()
accuracy = correct / y_test.size(0)
print(f'测试集准确率: {accuracy * 100:.2f}%')
使用设备: cpu
训练进度: 100%|██████████| 20000/20000 [00:49<00:00, 407.11epoch/s, Train Loss=0.4646, Test Loss=0.4723]
Training time: 49.13 seconds

测试集准确率: 76.80%
@浙大疏锦行