前面介绍了熵最小化、常用的权重函数汇总、半监督学习中的低密度分离假设 (Low-Density Separation Assumption)、标签平滑、信息最大化等相关的知识点,本文采用 MNIST 10 分类数据集来进一步体会它们的效果。
案例实施
对比方法
- 纯监督学习方法(“supervised”):仅含10%的标签数据。(损失函数=监督损失)
- 熵最小化方法(“entropy”):10%的标签数据+无监督数据。(损失函数=监督损失+熵最小化损失)
- 伪标签方法(“pseudo”):10%的标签数据+无监督数据(伪标签),即将模型预测置信度大的标签作为对应样本的伪标签。(损失函数=监督损失+伪标签监督损失)
- 熵最小化+伪标签(“pseudo_entropy”):10%的标签数据+无监督数据(生成伪标签)。(损失函数=监督损失+熵最小化+伪标签监督损失)
- 信息最大化(“inform_max”):10%的标签数据+无监督数据。损失函数=监督损失+信息最大化损失。
代码
说明:
- `create_semi_supervised_dataset()`:表示从全体训练数据集中按类别随机选取指定比例的带标签和不带标签数据,用于训练。
- `entropy_loss()`:熵最小化损失函数。
- `LabelSmoothingCrossEntropy()`:标签平滑交叉熵,效果可能会比交叉熵好,本文用的是 `timm` 包中的代码。
- `inform_max_loss()`:信息最大化损失函数。
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
import numpy as np
import matplotlib.pyplot as plt
import random
import argparse
from timm.loss import LabelSmoothingCrossEntropy
import torch.nn.functional as F
from torch.utils.data import TensorDataset, DataLoader

# Fall back to CPU so the script still runs on machines without CUDA.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


def _str2bool(value):
    """Parse a command-line boolean.

    ``type=bool`` in argparse treats every non-empty string (including
    ``"False"``) as True, so boolean flags need an explicit parser.

    Raises:
        argparse.ArgumentTypeError: if ``value`` is not a recognised boolean.
    """
    if isinstance(value, bool):
        return value
    text = str(value).strip().lower()
    if text in ('true', 't', 'yes', 'y', '1'):
        return True
    if text in ('false', 'f', 'no', 'n', '0'):
        return False
    raise argparse.ArgumentTypeError(f"expected a boolean value, got {value!r}")


def parse_args(argv=None):
    """Build and parse the command-line options of the experiment.

    Args:
        argv: optional list of argument strings. ``None`` (the default) makes
            argparse read ``sys.argv[1:]``, which is the original behaviour.

    Returns:
        The populated ``argparse.Namespace``.
    """
    parser = argparse.ArgumentParser(description='train')
    parser.add_argument('--compare', type=_str2bool, default=True, help='比较所有方法')
    parser.add_argument('--method', type=str, default='inform_max',
                        choices=['supervised', 'entropy', 'pseudo', 'pseudo_entropy', 'inform_max'])
    # Dataset options. Raw string: the Windows path contains backslashes that
    # would otherwise be invalid escape sequences.
    parser.add_argument('--data_path', type=str, default=r'G:\CV_opensource_code\datasets\mnist',
                        help='MNIST数据集路径')
    parser.add_argument('--download', type=_str2bool, default=False, help='是否下载MNIST数据集')
    # Training options.
    parser.add_argument('--epochs', type=int, default=1000, help='训练轮数')
    parser.add_argument('--bs', type=int, default=128, help='batch size')
    parser.add_argument('--optimizer', type=str, default='adam', choices=['adam', 'sgd'], help='优化器类型')
    parser.add_argument('--lr', type=float, default=0.001, help='学习率')
    parser.add_argument('--label_smooth', type=_str2bool, default=True, help='采用标签平滑损失或者交叉熵损失')
    # Semi-supervised options.
    parser.add_argument('--labeled_ratio', type=float, default=0.1, help='整个训练集中带标签样本比率')
    parser.add_argument('--lambda_et', type=float, default=.5, help='IM的熵损失权重')
    parser.add_argument('--lambda_div', type=float, default=.5, help='IM的多样化损失权重')
    parser.add_argument('--alpha', type=float, default=0.5, help='无标签损失的权重')
    parser.add_argument('--confidence_threshold', type=float, default=0.95, help='伪标签的置信度阈值')
    return parser.parse_args(argv)


def set_seed(seed):
    """Seed Python, NumPy and PyTorch RNGs and request deterministic cuDNN."""
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.benchmark = False
    torch.backends.cudnn.deterministic = True


def create_semi_supervised_dataset(dataset, labeled_ratio=0.1):
    """Split ``dataset`` per class into a labeled and an unlabeled subset.

    For every class found in ``dataset.targets`` the sample indices are
    shuffled with NumPy's global RNG; the first ``labeled_ratio`` fraction
    becomes labeled data and the remainder unlabeled data.

    Args:
        dataset: a dataset exposing a ``targets`` attribute convertible to a
            NumPy array of class ids.
        labeled_ratio: fraction of each class kept as labeled samples.

    Returns:
        ``(labeled_subset, unlabeled_subset)`` as ``torch.utils.data.Subset``.
    """
    labeled_indices = []
    unlabeled_indices = []
    targets = np.array(dataset.targets)
    # np.unique returns sorted class ids, so for MNIST this visits 0..9 in the
    # same order as a hard-coded range(10) would.
    for class_id in np.unique(targets):
        class_indices = np.where(targets == class_id)[0]
        np.random.shuffle(class_indices)
        n_labeled = int(len(class_indices) * labeled_ratio)
        # First n_labeled shuffled indices are labeled, the rest unlabeled.
        labeled_indices.extend(class_indices[:n_labeled].tolist())
        unlabeled_indices.extend(class_indices[n_labeled:].tolist())
    return (torch.utils.data.Subset(dataset, labeled_indices),
            torch.utils.data.Subset(dataset, unlabeled_indices))


class CNN(nn.Module):
    """Small two-convolution classifier producing 10 logits."""

    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(1, 32, 3, 1)
        self.conv2 = nn.Conv2d(32, 64, 3, 1)
        self.dropout1 = nn.Dropout2d(0.25)
        self.dropout2 = nn.Dropout(0.5)
        # 9216 = 64 * 12 * 12, i.e. the flattened feature size for 1x28x28
        # inputs after two unpadded 3x3 convolutions and a 2x2 max-pool.
        self.fc1 = nn.Linear(9216, 128)
        self.fc2 = nn.Linear(128, 10)

    def forward(self, x):
        """Return raw (unnormalised) class logits for the batch ``x``."""
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        x = F.max_pool2d(x, 2)
        x = self.dropout1(x)
        x = torch.flatten(x, 1)
        x = F.relu(self.fc1(x))
        x = self.dropout2(x)
        return self.fc2(x)


def entropy_loss(predictions):
    """Return the mean Shannon entropy of ``softmax(predictions, dim=1)``."""
    probabilities = torch.softmax(predictions, dim=1)
    log_probs = torch.log_softmax(predictions, dim=1)
    entropy = -torch.sum(probabilities * log_probs, dim=1)
    return torch.mean(entropy)


def inform_max_loss(logits, lambda_div=0.1, lambda_et=1., eps=1e-8):
    """Information-maximisation loss on a batch of logits.

    Computes ``lambda_et * H_cond - lambda_div * H_marg`` where ``H_cond`` is
    the mean per-sample prediction entropy (minimised, so predictions become
    confident) and ``H_marg`` is the entropy of the batch-averaged prediction
    (maximised, so predicted classes stay diverse).

    Args:
        logits: tensor whose dim 1 indexes classes.
        lambda_div: weight of the marginal-entropy (diversity) term.
        lambda_et: weight of the conditional-entropy term.
        eps: added inside ``log`` to avoid ``log(0)``.
    """
    probs = F.softmax(logits, dim=1)

    # 1. Conditional entropy: mean entropy of each sample's prediction.
    entropy_per_sample = -torch.sum(probs * torch.log(probs + eps), dim=1)
    conditional_entropy = torch.mean(entropy_per_sample)

    # 2. Marginal entropy: entropy of the batch-mean prediction.
    mean_probs = torch.mean(probs, dim=0)
    marginal_entropy = -torch.sum(mean_probs * torch.log(mean_probs + eps))

    return lambda_et * conditional_entropy - lambda_div * marginal_entropy


def evaluate(model, test_loader):
    """Return the top-1 accuracy of ``model`` on ``test_loader`` as a fraction.

    Switches the model to eval mode and leaves it there. Returns 0.0 when the
    loader yields no samples.
    """
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            predicted = model(inputs).argmax(dim=1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    return correct / total if total else 0.0


def _pseudo_label_loss(outputs_unlabeled, criterion, confidence_threshold):
    """Return ``(loss, selected_ratio)`` for confidence-thresholded pseudo-labels.

    Samples whose maximum softmax probability is >= ``confidence_threshold``
    are trained against their own argmax label. When no sample qualifies the
    loss is a zero scalar on the same device/dtype as the logits.
    """
    probs = torch.softmax(outputs_unlabeled, dim=1)
    max_probs, pseudo_labels = torch.max(probs, dim=1)
    mask = max_probs.ge(confidence_threshold)
    n_selected = int(mask.sum().item())
    if n_selected > 0:
        pl_loss = criterion(outputs_unlabeled[mask], pseudo_labels[mask])
    else:
        pl_loss = outputs_unlabeled.new_zeros(())
    return pl_loss, n_selected / outputs_unlabeled.size(0)


def loss_supervised(model, labeled_inputs, labels, **kwargs):
    """Supervised loss only. Requires ``kwargs['criterion']``.

    Returns ``(loss_tensor, metrics_dict)``.
    """
    outputs = model(labeled_inputs)
    labeled_loss = kwargs['criterion'](outputs, labels)
    return labeled_loss, {'labeled_loss': labeled_loss.item()}


def loss_entropy(model, labeled_inputs, labels, unlabeled_inputs, **kwargs):
    """Supervised loss + ``alpha`` * entropy of the unlabeled predictions.

    Requires ``kwargs['criterion']`` and ``kwargs['alpha']``.
    """
    labeled_loss = kwargs['criterion'](model(labeled_inputs), labels)
    ent_loss = entropy_loss(model(unlabeled_inputs))
    total_loss = labeled_loss + kwargs['alpha'] * ent_loss
    return total_loss, {'labeled_loss': labeled_loss.item(),
                        'entropy_loss': ent_loss.item()}


def loss_pseudo(model, labeled_inputs, labels, unlabeled_inputs, **kwargs):
    """Supervised loss + ``alpha`` * pseudo-label loss.

    Requires ``kwargs['criterion']``, ``kwargs['alpha']`` and
    ``kwargs['confidence_threshold']``.
    """
    criterion = kwargs['criterion']
    labeled_loss = criterion(model(labeled_inputs), labels)
    pl_loss, pseudo_ratio = _pseudo_label_loss(
        model(unlabeled_inputs), criterion, kwargs['confidence_threshold'])
    total_loss = labeled_loss + kwargs['alpha'] * pl_loss
    return total_loss, {'labeled_loss': labeled_loss.item(),
                        'pseudo_loss': pl_loss.item(),
                        'pseudo_ratio': pseudo_ratio}


def loss_pseudo_entropy(model, labeled_inputs, labels, unlabeled_inputs, **kwargs):
    """Supervised loss + ``alpha`` * (pseudo-label loss + entropy loss).

    Requires ``kwargs['criterion']``, ``kwargs['alpha']`` and
    ``kwargs['confidence_threshold']``.
    """
    criterion = kwargs['criterion']
    labeled_loss = criterion(model(labeled_inputs), labels)
    outputs_unlabeled = model(unlabeled_inputs)
    pl_loss, pseudo_ratio = _pseudo_label_loss(
        outputs_unlabeled, criterion, kwargs['confidence_threshold'])
    ent_loss = entropy_loss(outputs_unlabeled)
    total_loss = labeled_loss + kwargs['alpha'] * (pl_loss + ent_loss)
    return total_loss, {'labeled_loss': labeled_loss.item(),
                        'pseudo_loss': pl_loss.item(),
                        'entropy_loss': ent_loss.item(),
                        'pseudo_ratio': pseudo_ratio}


def loss_inform_max(model, labeled_inputs, labels, unlabeled_inputs, **kwargs):
    """Supervised loss + information-maximisation loss on unlabeled data.

    Requires ``kwargs['criterion']``, ``kwargs['lambda_et']`` and
    ``kwargs['lambda_div']``. Note that ``alpha`` is not applied here.
    """
    labeled_loss = kwargs['criterion'](model(labeled_inputs), labels)
    im_loss = inform_max_loss(model(unlabeled_inputs),
                              lambda_et=kwargs['lambda_et'],
                              lambda_div=kwargs['lambda_div'])
    total_loss = labeled_loss + im_loss
    return total_loss, {'labeled_loss': labeled_loss.item(),
                        'inform_max_loss': im_loss.item()}


def train(model, optimizer, criterion, train_loaders, test_loader, epochs, loss_function, **loss_kwargs):
    """Run the training loop and return the per-epoch history.

    One epoch is one pass over the labeled loader; each labeled batch is
    paired with the next unlabeled batch (the unlabeled iterator is restarted
    when exhausted). Metrics are averaged weighted by labeled batch size.

    Args:
        model: network to optimise.
        optimizer: optimiser bound to ``model``'s parameters.
        criterion: supervised loss passed to ``loss_function``.
        train_loaders: dict with keys ``'labeled'`` and ``'unlabeled'``; the
            latter may be ``None`` for purely supervised training.
        test_loader: loader used by ``evaluate`` after every epoch.
        epochs: number of epochs.
        loss_function: one of the ``loss_*`` callables.
        **loss_kwargs: extra keyword arguments forwarded to ``loss_function``.

    Returns:
        dict mapping metric name to a list with one value per epoch.
    """
    history = {'labeled_loss': [], 'entropy_loss': [], 'pseudo_loss': [], 'inform_max_loss': [],
               'total_loss': [], 'accuracy': [], 'pseudo_ratio': []}
    metric_keys = [key for key in history if key != 'accuracy']
    labeled_loader = train_loaders['labeled']
    unlabeled_loader = train_loaders['unlabeled']

    for epoch in range(epochs):
        model.train()
        epoch_metrics = dict.fromkeys(metric_keys, 0.0)
        n_labeled = 0
        # No unlabeled data at all for the purely supervised method.
        unlabeled_iter = iter(unlabeled_loader) if unlabeled_loader is not None else None

        for labeled_inputs, labels in labeled_loader:
            labeled_inputs, labels = labeled_inputs.to(device), labels.to(device)
            batch_size = labeled_inputs.size(0)
            n_labeled += batch_size

            unlabeled_inputs = None
            if unlabeled_iter is not None:
                try:
                    unlabeled_data, _ = next(unlabeled_iter)
                except StopIteration:
                    # Unlabeled loader exhausted before the labeled one: restart it.
                    unlabeled_iter = iter(unlabeled_loader)
                    unlabeled_data, _ = next(unlabeled_iter)
                unlabeled_inputs = unlabeled_data.to(device)

            optimizer.zero_grad()
            loss_args = {'model': model, 'labeled_inputs': labeled_inputs, 'labels': labels,
                         'criterion': criterion, **loss_kwargs}
            if unlabeled_inputs is not None:
                loss_args['unlabeled_inputs'] = unlabeled_inputs
            total_loss, loss_metrics = loss_function(**loss_args)
            total_loss.backward()
            optimizer.step()

            # Accumulate batch metrics weighted by the labeled batch size.
            for key, value in loss_metrics.items():
                if key in epoch_metrics:
                    epoch_metrics[key] += value * batch_size
            epoch_metrics['total_loss'] += total_loss.item() * batch_size

        # Guard against an empty labeled loader.
        denom = max(n_labeled, 1)
        for key, value in epoch_metrics.items():
            history[key].append(value / denom)

        test_acc = evaluate(model, test_loader)
        history['accuracy'].append(test_acc)

        print(f"Epoch {epoch + 1}/{epochs}:", end=' ')
        for key, value in history.items():
            if key != 'accuracy' and value:
                print(f"{key}: {value[-1]:.4f}", end=' ')
        print(f"Test Acc: {test_acc:.2%}")
    return history


def Trainer(args):
    """Build data, model, optimiser and loss for ``args.method`` and train.

    Returns:
        The history dict produced by ``train``.

    Raises:
        ValueError: if ``args.optimizer`` is neither ``'adam'`` nor ``'sgd'``.
    """
    # Datasets.
    transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))])
    train_ds = torchvision.datasets.MNIST(root=args.data_path, train=True, download=args.download,
                                          transform=transform)
    labeled_train_ds, unlabeled_train_ds = create_semi_supervised_dataset(train_ds, args.labeled_ratio)
    test_ds = torchvision.datasets.MNIST(root=args.data_path, train=False, download=args.download,
                                         transform=transform)
    print(f'labeled train data: {len(labeled_train_ds)}, unlabeled train data: {len(unlabeled_train_ds)}, test data: {len(test_ds)}')

    # Data loaders; the supervised baseline uses no unlabeled data.
    labeled_loader = DataLoader(labeled_train_ds, batch_size=args.bs, shuffle=True)
    if args.method == 'supervised':
        unlabeled_loader = None
    else:
        unlabeled_loader = DataLoader(unlabeled_train_ds, batch_size=args.bs, shuffle=True)
    train_loaders = {'labeled': labeled_loader, 'unlabeled': unlabeled_loader}
    test_loader = DataLoader(test_ds, batch_size=args.bs, shuffle=False)

    # Loss function for the selected method.
    loss_functions = {'supervised': loss_supervised, 'entropy': loss_entropy, 'pseudo': loss_pseudo,
                      'pseudo_entropy': loss_pseudo_entropy, 'inform_max': loss_inform_max}
    loss_function = loss_functions[args.method]

    model = CNN().to(device)

    if args.optimizer == 'adam':
        optimizer = optim.Adam(model.parameters(), lr=args.lr)
    elif args.optimizer == 'sgd':
        optimizer = optim.SGD(model.parameters(), lr=args.lr, momentum=0.9)
    else:
        raise ValueError(f"未知优化器: {args.optimizer}")

    # Supervised criterion: label smoothing or plain cross-entropy.
    if args.label_smooth:
        criterion = LabelSmoothingCrossEntropy().to(device)
    else:
        criterion = nn.CrossEntropyLoss().to(device)

    loss_kwargs = {'alpha': args.alpha, 'lambda_et': args.lambda_et, 'lambda_div': args.lambda_div,
                   'confidence_threshold': args.confidence_threshold}
    return train(model, optimizer, criterion, train_loaders, test_loader, args.epochs, loss_function,
                 **loss_kwargs)


def compare_methods(args):
    """Train every method in turn and plot a 2x2 comparison figure.

    ``args.method`` is temporarily overwritten for each run and restored
    afterwards, even if a run raises. The figure is saved to
    ``comparison_results.png`` and shown.
    """
    methods = ["supervised", "entropy", "pseudo", "pseudo_entropy", "inform_max"]
    results = {'best_acc': [], 'last_acc': [], 'final_loss': [], 'pseudo_ratio': []}
    histories = {}
    original_method = args.method
    try:
        for method in methods:
            print(f"\n训练方法: {method} method")
            args.method = method
            history = Trainer(args)
            histories[method] = history
            results['last_acc'].append(history['accuracy'][-1] * 100)
            results['best_acc'].append(max(history['accuracy']) * 100)
            results['final_loss'].append(history['total_loss'][-1])
            ratios = history.get('pseudo_ratio')
            results['pseudo_ratio'].append(ratios[-1] if ratios else 0.0)
    finally:
        args.method = original_method

    # Plot styling.
    plt.rcParams['axes.unicode_minus'] = False
    plt.rcParams['font.family'] = 'serif'
    plt.rcParams['font.serif'] = 'Times New Roman'
    plt.rcParams['font.weight'] = 'normal'
    plt.rcParams['font.size'] = 10
    plt.figure(figsize=(14, 10))
    colors = ['red', 'black', 'blue', 'g', 'magenta']
    line_st = ['-', '--', '-.', ':', (0, (3, 9, 1, 9))]

    # Training-loss curves.
    plt.subplot(2, 2, 1)
    for i, method in enumerate(methods):
        plt.plot(histories[method]['total_loss'], color=colors[i], linestyle=line_st[i], label=method,
                 linewidth=1.3)
    plt.title('Training Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()
    plt.grid(True)

    # Test-accuracy curves.
    plt.subplot(2, 2, 2)
    for i, method in enumerate(methods):
        plt.plot(histories[method]['accuracy'], color=colors[i], linestyle=line_st[i], label=method)
    plt.title('Test Accuracy')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy')
    plt.legend()
    plt.grid(True)

    # Grouped bars: last vs. best accuracy per method.
    plt.subplot(2, 2, 3)
    last_acc = results['last_acc']
    best_acc = results['best_acc']
    x = np.arange(len(methods))
    width = 0.35
    bar1 = plt.bar(x - width / 2, last_acc, width, label='Last Acc')
    bar2 = plt.bar(x + width / 2, best_acc, width, label='Best Acc')
    plt.ylabel('Accuracy')
    # Keep the 90-100 window when every bar fits in it; otherwise lower the
    # floor so that bars below 90% are not clipped out of view.
    y_floor = min(90.0, max(0.0, float(np.floor(min(last_acc + best_acc))) - 1.0))
    plt.ylim(y_floor, 100)
    plt.xticks(x, methods)
    plt.legend()

    def add_labels(bars):
        """Write each bar's height above it."""
        for bar in bars:
            height = bar.get_height()
            plt.text(bar.get_x() + bar.get_width() / 2, height, f'{height:.2f}',
                     ha='center', va='bottom', fontsize=8)

    add_labels(bar1)
    add_labels(bar2)

    # Fraction of unlabeled samples that received a pseudo-label (last epoch).
    plt.subplot(2, 2, 4)
    pseudo_names = ['pseudo', 'pseudo_entropy']
    pseudo_idx = [methods.index(name) for name in pseudo_names]
    pseudo_ratios = [results['pseudo_ratio'][i] for i in pseudo_idx]
    plt.bar(pseudo_names, pseudo_ratios, color=[colors[i] for i in pseudo_idx])
    plt.title('Pseudo Label Usage')
    plt.ylabel('Ratio')
    plt.ylim(0, 1)
    for i, v in enumerate(pseudo_ratios):
        plt.text(i, v + 0.02, f"{v:.2%}", ha='center')

    plt.tight_layout()
    plt.savefig('comparison_results.png', dpi=500)
    plt.show()


if __name__ == "__main__":
    set_seed(2025)
    args = parse_args()
    if args.compare:
        compare_methods(args)
    else:
        print(f'方法:{args.method} method')
        history = Trainer(args)
        print(f"\nBest acc (test): {max(history['accuracy']):.2%}, Last acc (test): {history['accuracy'][-1]:.2%}")
结论
- 观察发现,一般标签平滑 `LabelSmoothingCrossEntropy` 比 `CrossEntropy` 的效果有一定的提升。
- 对比五种方法,联合伪标签+熵最小化效果有微弱的提升,其余方法对比纯监督方法没有竞争力。
- 本案例没有任何调参,直接采用随机或者默认的参数,实际中可以采用学习率退火、变权重等技巧,可能会涨点。
最后,上述源代码第一版是由deepseek生成,本人做了部分修改。因此,代码仅供参考。