前面介绍了熵最小化、常用的权重函数汇总、半监督学习中的低密度分离假设 (Low-Density Separation Assumption)、标签平滑、信息最大化等相关的知识点,本文采用 MNIST 十分类数据集来进一步体会它们的效果。
案例实施
对比方法
- 纯监督学习方法(“supervised”):仅使用10%的标签数据。(损失函数=监督损失)
- 熵最小化方法(“entropy”):10%的标签数据+无监督数据。(损失函数=监督损失+熵最小化损失)
- 伪标签方法(“pseudo”):10%的标签数据+无监督数据(伪标签),即当模型对某个无标签样本的预测置信度不低于阈值时,将预测类别作为该样本的伪标签。(损失函数=监督损失+伪标签监督损失)
- 熵最小化+伪标签(“pseudo_entropy”):10%的标签数据+无监督数据(生成伪标签)。(损失函数=监督损失+熵最小化+伪标签监督损失)
- 信息最大化(“inform_max”):10%的标签数据+无监督数据。(损失函数=监督损失+信息最大化损失)
代码
说明:
- `create_semi_supervised_dataset()`:表示从全体训练数据集中按类别随机选取指定比例的样本作为带标签数据,其余作为不带标签数据,用于训练。
- `entropy_loss()`:熵最小化损失函数。
- `LabelSmoothingCrossEntropy()`:标签平滑交叉熵,效果可能会比交叉熵好,本文用的是 `timm` 包中的代码。
- `inform_max_loss()`:信息最大化损失函数。
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
import numpy as np
import matplotlib.pyplot as plt
import random
import argparse
from timm.loss import LabelSmoothingCrossEntropy
import torch.nn.functional as F
from torch.utils.data import TensorDataset, DataLoader
# Prefer the GPU when PyTorch can see one; otherwise fall back to the CPU so
# the script still runs on machines without CUDA.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
def _str2bool(value):
    """Convert command-line text such as ``true``/``False``/``1``/``no`` to a bool.

    ``argparse`` with ``type=bool`` calls ``bool()`` on the raw string, so any
    non-empty text (including ``"False"``) becomes ``True``. This converter
    interprets the text instead.

    Raises:
        argparse.ArgumentTypeError: if the text is not a recognised boolean.
    """
    if isinstance(value, bool):
        return value
    text = str(value).strip().lower()
    if text in ('true', 't', 'yes', 'y', '1'):
        return True
    if text in ('false', 'f', 'no', 'n', '0'):
        return False
    raise argparse.ArgumentTypeError(f'expected a boolean value, got {value!r}')


def parse_args():
    """Build the command-line parser and return the parsed arguments.

    Returns:
        argparse.Namespace with the experiment, dataset, optimisation and
        semi-supervised options defined below.
    """
    parser = argparse.ArgumentParser(description='train')
    parser.add_argument('--compare', type=_str2bool, default=True, help='比较所有方法')
    parser.add_argument('--method', type=str, default='inform_max',
                        choices=['supervised', 'entropy', 'pseudo', 'pseudo_entropy', 'inform_max'])
    # Dataset options. The raw string keeps the backslashes of the Windows path
    # literal without relying on invalid escape sequences.
    parser.add_argument('--data_path', type=str, default=r'G:\CV_opensource_code\datasets\mnist', help='MNIST数据集路径')
    parser.add_argument('--download', type=_str2bool, default=False, help='是否下载MNIST数据集')
    # Optimisation options.
    parser.add_argument('--epochs', type=int, default=1000, help='训练轮数')
    parser.add_argument('--bs', type=int, default=128, help='batch size')
    parser.add_argument('--optimizer', type=str, default='adam', choices=['adam', 'sgd'], help='优化器类型')
    parser.add_argument('--lr', type=float, default=0.001, help='学习率')
    parser.add_argument('--label_smooth', type=_str2bool, default=True, help='采用标签平滑损失或者交叉熵损失')
    # Semi-supervised options.
    parser.add_argument('--labeled_ratio', type=float, default=0.1, help='整个训练集中带标签样本比率')
    parser.add_argument('--lambda_et', type=float, default=.5, help='IM的熵损失权重')
    parser.add_argument('--lambda_div', type=float, default=.5, help='IM的多样化损失权重')
    parser.add_argument('--alpha', type=float, default=0.5, help='无标签损失的权重')
    parser.add_argument('--confidence_threshold', type=float, default=0.95, help='伪标签的置信度阈值')
    return parser.parse_args()
def set_seed(seed):
    """Seed the Python, NumPy and PyTorch random number generators.

    Args:
        seed: integer seed passed unchanged to every generator.
    """
    for seeder in (random.seed, np.random.seed, torch.manual_seed):
        seeder(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)
    # Ask cuDNN for deterministic kernels and disable its auto-tuner.
    cudnn = torch.backends.cudnn
    cudnn.benchmark = False
    cudnn.deterministic = True
def create_semi_supervised_dataset(dataset, labeled_ratio=0.1):
    """Split ``dataset`` into a labeled and an unlabeled subset, class by class.

    For every distinct value found in ``dataset.targets`` the indices of that
    class are shuffled with ``np.random.shuffle``; the first
    ``int(n_class * labeled_ratio)`` become labeled and the rest unlabeled.
    Classes are visited in sorted order, so for labels 0..9 the random stream
    is consumed exactly as a ``range(10)`` loop would consume it.

    Args:
        dataset: object exposing a ``targets`` sequence of class labels.
        labeled_ratio: fraction of each class kept as labeled data.

    Returns:
        ``(labeled_subset, unlabeled_subset)`` as ``torch.utils.data.Subset``
        views over ``dataset``.
    """
    targets = np.array(dataset.targets)  # class label of every sample
    labeled_indices = []
    unlabeled_indices = []
    # Iterate over the classes actually present instead of assuming ten of them.
    for cls in np.unique(targets):
        class_indices = np.where(targets == cls)[0]
        np.random.shuffle(class_indices)
        n_labeled = int(len(class_indices) * labeled_ratio)
        # Store plain Python ints so the Subset indices are ordinary list entries.
        labeled_indices.extend(class_indices[:n_labeled].tolist())
        unlabeled_indices.extend(class_indices[n_labeled:].tolist())
    return torch.utils.data.Subset(dataset, labeled_indices), torch.utils.data.Subset(dataset, unlabeled_indices)
class CNN(nn.Module):
    """Small convolutional classifier: two 3x3 convolutions, one 2x2 max-pool
    and two fully connected layers producing 10 logits.

    ``fc1`` expects 9216 flattened features, i.e. 64 channels of 12x12 after
    the pooling step (the size obtained from a 1x28x28 input).
    """

    def __init__(self):
        super().__init__()
        # Feature extractor.
        self.conv1 = nn.Conv2d(1, 32, 3, 1)
        self.conv2 = nn.Conv2d(32, 64, 3, 1)
        # Regularisation.
        self.dropout1 = nn.Dropout2d(0.25)
        self.dropout2 = nn.Dropout(0.5)
        # Classifier head.
        self.fc1 = nn.Linear(9216, 128)
        self.fc2 = nn.Linear(128, 10)

    def forward(self, x):
        """Return the raw class logits for the image batch ``x``."""
        features = F.relu(self.conv1(x))
        features = F.relu(self.conv2(features))
        features = self.dropout1(F.max_pool2d(features, 2))
        hidden = F.relu(self.fc1(torch.flatten(features, 1)))
        return self.fc2(self.dropout2(hidden))
def entropy_loss(predictions):
    """Return the mean entropy of the softmax distributions of ``predictions``.

    The softmax and log-softmax are both taken over dim 1; the per-row entropy
    ``-sum(p * log p)`` is then averaged over all rows.
    """
    log_p = torch.log_softmax(predictions, dim=1)
    p = torch.softmax(predictions, dim=1)
    per_sample_entropy = -(p * log_p).sum(dim=1)
    return per_sample_entropy.mean()
def inform_max_loss(logits, lambda_div=0.1, lambda_et=1., eps=1e-8):
    """Information-maximisation loss computed from raw ``logits``.

    Combines two entropies of the softmax output (softmax taken over dim 1):

    * the mean per-sample entropy, which is minimised so that individual
      predictions become confident;
    * the entropy of the batch-averaged prediction, which is maximised (it
      enters with a minus sign) so that predictions spread over the classes.

    Args:
        logits: raw model outputs.
        lambda_div: weight of the batch-average (diversity) entropy.
        lambda_et: weight of the per-sample entropy.
        eps: constant added inside the logarithms to avoid ``log(0)``.

    Returns:
        ``lambda_et * H_sample - lambda_div * H_marginal`` as a scalar tensor.
    """
    probs = F.softmax(logits, dim=1)
    # Per-sample entropy, averaged over the batch.
    sample_entropy = (-(probs * (probs + eps).log()).sum(dim=1)).mean()
    # Entropy of the batch-averaged prediction (mean over dim 0).
    marginal = probs.mean(dim=0)
    marginal_entropy = -(marginal * (marginal + eps).log()).sum()
    return lambda_et * sample_entropy - lambda_div * marginal_entropy
def evaluate(model, test_loader):
    """Return the fraction of correctly classified samples from ``test_loader``.

    The model is switched to eval mode and left in that mode. Each batch is
    moved to the module-level ``device``, and the predicted class is the index
    of the largest logit along dim 1.
    """
    model.eval()
    n_correct, n_seen = 0, 0
    with torch.no_grad():
        for batch, targets in test_loader:
            batch = batch.to(device)
            targets = targets.to(device)
            predicted = torch.max(model(batch), 1).indices
            n_seen += targets.size(0)
            n_correct += (predicted == targets).sum().item()
    return n_correct / n_seen
def loss_supervised(model, labeled_inputs, labels, **kwargs):
    """Purely supervised objective.

    Args:
        model: callable mapping ``labeled_inputs`` to logits.
        labeled_inputs: labeled batch.
        labels: targets for ``labeled_inputs``.
        **kwargs: must contain ``criterion``; other entries are ignored.

    Returns:
        ``(loss, metrics)`` where ``metrics`` holds the loss value under
        ``'labeled_loss'``.
    """
    criterion = kwargs['criterion']
    labeled_loss = criterion(model(labeled_inputs), labels)
    metrics = {'labeled_loss': labeled_loss.item()}
    return labeled_loss, metrics
def loss_entropy(model, labeled_inputs, labels, unlabeled_inputs, **kwargs):
    """Supervised loss plus entropy minimisation on the unlabeled batch.

    Args:
        model: callable mapping inputs to logits.
        labeled_inputs: labeled batch.
        labels: targets for ``labeled_inputs``.
        unlabeled_inputs: unlabeled batch.
        **kwargs: must contain ``criterion`` and ``alpha`` (entropy weight).

    Returns:
        ``(total_loss, metrics)`` with ``'labeled_loss'`` and
        ``'entropy_loss'`` in ``metrics``.
    """
    criterion, alpha = kwargs['criterion'], kwargs['alpha']
    # The labeled batch is forwarded first, then the unlabeled one.
    labeled_loss = criterion(model(labeled_inputs), labels)
    ent_loss = entropy_loss(model(unlabeled_inputs))
    metrics = {
        'labeled_loss': labeled_loss.item(),
        'entropy_loss': ent_loss.item(),
    }
    return labeled_loss + alpha * ent_loss, metrics
def loss_pseudo(model, labeled_inputs, labels, unlabeled_inputs, **kwargs):
    """Supervised loss plus a pseudo-label loss on confident unlabeled samples.

    An unlabeled sample is selected when its largest softmax probability is at
    least ``confidence_threshold``; the arg-max class is then used as its
    target for ``criterion``.

    Args:
        model: callable mapping inputs to logits.
        labeled_inputs: labeled batch.
        labels: targets for ``labeled_inputs``.
        unlabeled_inputs: unlabeled batch.
        **kwargs: must contain ``criterion``, ``alpha`` (pseudo-label weight)
            and ``confidence_threshold``.

    Returns:
        ``(total_loss, metrics)`` with ``'labeled_loss'``, ``'pseudo_loss'``
        and ``'pseudo_ratio'`` (selected fraction of the unlabeled batch).
    """
    criterion = kwargs['criterion']
    outputs_labeled = model(labeled_inputs)
    labeled_loss = criterion(outputs_labeled, labels)

    outputs_unlabeled = model(unlabeled_inputs)
    unlabeled_probs = torch.softmax(outputs_unlabeled, dim=1)
    max_probs, pseudo_labels = torch.max(unlabeled_probs, dim=1)
    mask = max_probs.ge(kwargs['confidence_threshold'])
    # Count the selected samples once; each .sum() on a GPU tensor forces a sync.
    n_selected = int(mask.sum().item())

    if n_selected > 0:
        pl_loss = criterion(outputs_unlabeled[mask], pseudo_labels[mask])
    else:
        # Zero scalar on the logits' own device/dtype, so the function does not
        # depend on a module-level device matching the model's placement.
        pl_loss = outputs_unlabeled.new_zeros(())

    total_loss = labeled_loss + kwargs['alpha'] * pl_loss
    return total_loss, {
        'labeled_loss': labeled_loss.item(),
        'pseudo_loss': pl_loss.item(),
        'pseudo_ratio': n_selected / unlabeled_inputs.size(0)
    }
def loss_pseudo_entropy(model, labeled_inputs, labels, unlabeled_inputs, **kwargs):
    """Supervised loss plus pseudo-label loss plus entropy minimisation.

    Unlabeled samples whose largest softmax probability is at least
    ``confidence_threshold`` contribute a pseudo-label loss (target = arg-max
    class). The entropy term is computed on the whole unlabeled batch. Both
    unlabeled terms share the weight ``alpha``.

    Args:
        model: callable mapping inputs to logits.
        labeled_inputs: labeled batch.
        labels: targets for ``labeled_inputs``.
        unlabeled_inputs: unlabeled batch.
        **kwargs: must contain ``criterion``, ``alpha`` and
            ``confidence_threshold``.

    Returns:
        ``(total_loss, metrics)`` with ``'labeled_loss'``, ``'pseudo_loss'``,
        ``'entropy_loss'`` and ``'pseudo_ratio'``.
    """
    criterion = kwargs['criterion']
    outputs_labeled = model(labeled_inputs)
    labeled_loss = criterion(outputs_labeled, labels)

    outputs_unlabeled = model(unlabeled_inputs)
    unlabeled_probs = torch.softmax(outputs_unlabeled, dim=1)
    max_probs, pseudo_labels = torch.max(unlabeled_probs, dim=1)
    mask = max_probs.ge(kwargs['confidence_threshold'])
    # Count the selected samples once; each .sum() on a GPU tensor forces a sync.
    n_selected = int(mask.sum().item())

    if n_selected > 0:
        pl_loss = criterion(outputs_unlabeled[mask], pseudo_labels[mask])
    else:
        # Zero scalar on the logits' own device/dtype, so the function does not
        # depend on a module-level device matching the model's placement.
        pl_loss = outputs_unlabeled.new_zeros(())

    ent_loss = entropy_loss(outputs_unlabeled)
    total_loss = labeled_loss + kwargs['alpha'] * (pl_loss + ent_loss)
    return total_loss, {
        'labeled_loss': labeled_loss.item(),
        'pseudo_loss': pl_loss.item(),
        'entropy_loss': ent_loss.item(),
        'pseudo_ratio': n_selected / unlabeled_inputs.size(0)
    }
def loss_inform_max(model, labeled_inputs, labels, unlabeled_inputs, **kwargs):
    """Supervised loss plus the information-maximisation loss on unlabeled data.

    Args:
        model: callable mapping inputs to logits.
        labeled_inputs: labeled batch.
        labels: targets for ``labeled_inputs``.
        unlabeled_inputs: unlabeled batch.
        **kwargs: must contain ``criterion``, ``lambda_et`` and ``lambda_div``.

    Returns:
        ``(total_loss, metrics)`` with ``'labeled_loss'`` and
        ``'inform_max_loss'`` in ``metrics``. The unlabeled term is added with
        weight 1; its internal weights are ``lambda_et`` and ``lambda_div``.
    """
    # The labeled batch is forwarded first, then the unlabeled one.
    labeled_loss = kwargs['criterion'](model(labeled_inputs), labels)
    unlabeled_logits = model(unlabeled_inputs)
    im_loss = inform_max_loss(
        unlabeled_logits,
        lambda_et=kwargs['lambda_et'],
        lambda_div=kwargs['lambda_div'],
    )
    metrics = {
        'labeled_loss': labeled_loss.item(),
        'inform_max_loss': im_loss.item(),
    }
    return labeled_loss + im_loss, metrics
def train(model, optimizer, criterion, train_loaders, test_loader, epochs, loss_function, **loss_kwargs):
    """Train ``model`` for ``epochs`` epochs and evaluate it after each one.

    One epoch is one pass over the labeled loader. When an unlabeled loader is
    given, every labeled batch is paired with the next unlabeled batch from an
    iterator that is re-created at the start of each epoch and restarted if it
    runs out.

    Args:
        model: network to optimise.
        optimizer: optimiser stepping ``model``'s parameters.
        criterion: supervised criterion, forwarded to ``loss_function``.
        train_loaders: dict with keys ``'labeled'`` and ``'unlabeled'``; the
            latter may be ``None`` for purely supervised training.
        test_loader: loader passed to ``evaluate`` after every epoch.
        epochs: number of epochs.
        loss_function: callable returning ``(total_loss, metrics_dict)``.
        **loss_kwargs: extra keyword arguments forwarded to ``loss_function``.

    Returns:
        dict mapping each metric name to a list with one value per epoch.
        Loss metrics (and ``pseudo_ratio``) are averages weighted by the
        labeled batch size; metrics a method never reports stay at 0.0.
    """
    history = {'labeled_loss': [], 'entropy_loss': [], 'pseudo_loss': [], 'inform_max_loss': [], 'total_loss': [], 'accuracy': [], 'pseudo_ratio': []}
    labeled_loader = train_loaders['labeled']
    unlabeled_loader = train_loaders['unlabeled']
    metric_keys = [key for key in history if key != 'accuracy']

    for epoch in range(epochs):
        model.train()
        epoch_metrics = dict.fromkeys(metric_keys, 0.0)
        n_labeled = 0
        # No unlabeled iterator at all for the supervised method.
        unlabeled_iter = iter(unlabeled_loader) if unlabeled_loader is not None else None

        for labeled_inputs, labels in labeled_loader:
            labeled_inputs, labels = labeled_inputs.to(device), labels.to(device)
            batch_size = labeled_inputs.size(0)
            n_labeled += batch_size

            loss_args = {'model': model, 'labeled_inputs': labeled_inputs, 'labels': labels, 'criterion': criterion, **loss_kwargs}
            if unlabeled_iter is not None:
                try:
                    unlabeled_data, _ = next(unlabeled_iter)
                except StopIteration:
                    # Unlabeled data exhausted before the labeled data: restart it.
                    unlabeled_iter = iter(unlabeled_loader)
                    unlabeled_data, _ = next(unlabeled_iter)
                loss_args['unlabeled_inputs'] = unlabeled_data.to(device)

            optimizer.zero_grad()
            total_loss, loss_metrics = loss_function(**loss_args)
            total_loss.backward()
            optimizer.step()

            # Accumulate batch-size-weighted sums so the epoch value is a
            # per-sample average even when the last batch is smaller.
            for key, value in loss_metrics.items():
                if key in epoch_metrics:
                    epoch_metrics[key] += value * batch_size
            epoch_metrics['total_loss'] += total_loss.item() * batch_size

        # Guard against an empty labeled loader (would otherwise divide by zero).
        denom = max(n_labeled, 1)
        for key, value in epoch_metrics.items():
            history[key].append(value / denom)

        # Evaluate on the test loader after every epoch.
        test_acc = evaluate(model, test_loader)
        history['accuracy'].append(test_acc)

        # Progress report.
        print(f"Epoch {epoch + 1}/{epochs}:", end=' ')
        for key, value in history.items():
            if key != 'accuracy' and value:
                print(f"{key}: {value[-1]:.4f}", end=' ')
        print(f"Test Acc: {test_acc:.2%}")
    return history
def Trainer(args):
    """Run one complete experiment for ``args.method`` and return its history.

    Loads MNIST from ``args.data_path``, splits the training set into labeled
    and unlabeled parts, builds the loaders, model, optimiser and criterion
    selected by ``args``, then delegates to ``train``.

    Raises:
        ValueError: if ``args.optimizer`` is neither ``'adam'`` nor ``'sgd'``.
    """
    # Datasets: the same preprocessing is applied to both splits.
    transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))])
    mnist_kwargs = {'root': args.data_path, 'download': args.download, 'transform': transform}
    train_ds = torchvision.datasets.MNIST(train=True, **mnist_kwargs)
    labeled_train_ds, unlabeled_train_ds = create_semi_supervised_dataset(train_ds, args.labeled_ratio)
    test_ds = torchvision.datasets.MNIST(train=False, **mnist_kwargs)
    print(f'labeled train data: {len(labeled_train_ds)}, unlabeled train data: {len(unlabeled_train_ds)}, test data: {len(test_ds)}')

    # Loaders: the supervised baseline gets no unlabeled loader.
    uses_unlabeled = args.method != 'supervised'
    train_loaders = {
        'labeled': DataLoader(labeled_train_ds, batch_size=args.bs, shuffle=True),
        'unlabeled': DataLoader(unlabeled_train_ds, batch_size=args.bs, shuffle=True) if uses_unlabeled else None,
    }
    test_loader = DataLoader(test_ds, batch_size=args.bs, shuffle=False)

    # Objective for the requested method.
    loss_function = {
        'supervised': loss_supervised,
        'entropy': loss_entropy,
        'pseudo': loss_pseudo,
        'pseudo_entropy': loss_pseudo_entropy,
        'inform_max': loss_inform_max,
    }[args.method]

    # Model and optimiser.
    model = CNN().to(device)
    if args.optimizer not in ('adam', 'sgd'):
        raise ValueError(f"未知优化器: {args.optimizer}")
    if args.optimizer == 'adam':
        optimizer = optim.Adam(model.parameters(), lr=args.lr)
    else:
        optimizer = optim.SGD(model.parameters(), lr=args.lr, momentum=0.9)

    # Supervised criterion: label smoothing or plain cross-entropy.
    criterion_cls = LabelSmoothingCrossEntropy if args.label_smooth else nn.CrossEntropyLoss
    criterion = criterion_cls().to(device)

    # Hyper-parameters forwarded to the loss function.
    loss_kwargs = {name: getattr(args, name) for name in ('alpha', 'lambda_et', 'lambda_div', 'confidence_threshold')}
    return train(model, optimizer, criterion, train_loaders, test_loader, args.epochs, loss_function, **loss_kwargs)
def _snapshot_rng_state():
    """Capture the current Python, NumPy and PyTorch (CPU and CUDA) RNG states."""
    state = {
        'python': random.getstate(),
        'numpy': np.random.get_state(),
        'torch': torch.get_rng_state(),
    }
    if torch.cuda.is_available():
        state['cuda'] = torch.cuda.get_rng_state_all()
    return state


def _restore_rng_state(state):
    """Restore RNG states previously captured by ``_snapshot_rng_state``."""
    random.setstate(state['python'])
    np.random.set_state(state['numpy'])
    torch.set_rng_state(state['torch'])
    if 'cuda' in state:
        torch.cuda.set_rng_state_all(state['cuda'])


def _plot_comparison(methods, histories, results):
    """Draw the 2x2 comparison figure, save it as ``comparison_results.png`` and show it."""
    plt.rcParams['axes.unicode_minus'] = False
    plt.rcParams['font.family'] = 'serif'
    plt.rcParams['font.serif'] = 'Times New Roman'
    plt.rcParams['font.weight'] = 'normal'
    plt.rcParams['font.size'] = 10
    plt.figure(figsize=(14, 10))
    colors = ['red', 'black', 'blue', 'g', 'magenta']
    line_st = ['-', '--', '-.', ':', (0, (3, 9, 1, 9))]

    # Training-loss curves.
    plt.subplot(2, 2, 1)
    for i, method in enumerate(methods):
        plt.plot(histories[method]['total_loss'], color=colors[i], linestyle=line_st[i], label=method, linewidth=1.3)
    plt.title('Training Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()
    plt.grid(True)

    # Test-accuracy curves.
    plt.subplot(2, 2, 2)
    for i, method in enumerate(methods):
        plt.plot(histories[method]['accuracy'], color=colors[i], linestyle=line_st[i], label=method)
    plt.title('Test Accuracy')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy')
    plt.legend()
    plt.grid(True)

    # Grouped bars: last-epoch vs. best accuracy per method.
    plt.subplot(2, 2, 3)
    last_acc = results['last_acc']
    best_acc = results['best_acc']
    x = np.arange(len(methods))  # bar group positions
    width = 0.35  # bar width
    bar1 = plt.bar(x - width / 2, last_acc, width, label='Last Acc')
    bar2 = plt.bar(x + width / 2, best_acc, width, label='Best Acc')
    plt.ylabel('Accuracy')
    plt.ylim(90, 100)
    plt.xticks(x, methods)
    plt.legend()

    def add_labels(bars):
        # Write each bar's height just above it.
        for bar in bars:
            height = bar.get_height()
            plt.text(bar.get_x() + bar.get_width() / 2, height, f'{height:.2f}',
                     ha='center', va='bottom', fontsize=8)

    add_labels(bar1)
    add_labels(bar2)

    # Share of unlabeled samples that received a pseudo label (the two
    # pseudo-label methods sit at positions 2 and 3 of ``methods``).
    plt.subplot(2, 2, 4)
    pseudo_names = [methods[2], methods[3]]
    pseudo_ratios = [results['pseudo_ratio'][2], results['pseudo_ratio'][3]]
    plt.bar(pseudo_names, pseudo_ratios, color=[colors[2], colors[3]])
    plt.title('Pseudo Label Usage')
    plt.ylabel('Ratio')
    plt.ylim(0, 1)
    for i, v in enumerate(pseudo_ratios):
        plt.text(i, v + 0.02, f"{v:.2%}", ha='center')

    plt.tight_layout()
    plt.savefig('comparison_results.png', dpi=500)
    plt.show()


def compare_methods(args):
    """Train every method in turn with the settings in ``args`` and plot the results.

    The random-number-generator state present on entry is restored before each
    method is trained, so all methods start from the same random stream instead
    of whatever state the previous run left behind. ``args.method`` is
    overwritten during the loop and restored afterwards, even if a run raises.

    Args:
        args: namespace accepted by ``Trainer``; ``args.method`` is ignored.
    """
    methods = ["supervised", "entropy", "pseudo", "pseudo_entropy", "inform_max"]
    results = {'best_acc': [], 'last_acc': [], 'final_loss': [], 'pseudo_ratio': []}
    histories = {}
    original_method = args.method
    rng_state = _snapshot_rng_state()
    try:
        for method in methods:
            print(f"\n训练方法: {method} method")
            # Same starting RNG state for every method keeps the comparison fair.
            _restore_rng_state(rng_state)
            args.method = method
            history = Trainer(args)
            histories[method] = history
            # Collect the summary numbers used by the bar charts.
            results['last_acc'].append(history['accuracy'][-1]*100)
            results['best_acc'].append(max(history['accuracy'])*100)
            results['final_loss'].append(history['total_loss'][-1])
            results['pseudo_ratio'].append(history['pseudo_ratio'][-1] if 'pseudo_ratio' in history else 0.0)
    finally:
        args.method = original_method
    _plot_comparison(methods, histories, results)
if __name__ == "__main__":
    # Seed before anything else, then read the command-line options.
    set_seed(2025)
    args = parse_args()
    if not args.compare:
        # Single-method run: train once and report best / last test accuracy.
        print(f'方法:{args.method} method')
        history = Trainer(args)
        print(f"\nBest acc (test): {max(history['accuracy']):.2%}, Last acc (test): {history['accuracy'][-1]:.2%}")
    else:
        # Train all methods and draw the comparison figure.
        compare_methods(args)
结论
- 观察发现,一般标签平滑 `LabelSmoothingCrossEntropy` 比 `CrossEntropy` 的效果有一定的提升。
- 对比五种方法,联合伪标签+熵最小化效果有微弱的提升,其余方法对比纯监督方法没有竞争力。
- 本案例没有任何调参,直接采用随机或者默认的参数,实际中可以采用学习率退火、变权重等技巧,可能会涨点。
最后,上述源代码第一版是由deepseek生成,本人做了部分修改。因此,代码仅供参考。