PyTorch深度学习框架60天进阶学习计划 - 第46天:自动化模型设计(二)
14. 搜索空间设计的最佳实践
基于上述分析,我们总结出ENAS搜索空间设计的几点最佳实践:
适当约束搜索空间:
- 过大的搜索空间不一定带来更好的性能,反而会增加搜索难度
- 利用领域知识合理限制搜索空间,如设置最大深度、最大宽度等
层次化设计:
- 将搜索空间分为多个层次,从粗粒度到细粒度
- 先固定宏观结构,再优化细节
加入先验知识:
- 纳入已知有效的网络模块,如残差连接、瓶颈结构等
- 限制搜索空间中不合理的连接模式
平衡多样性和有效性:
- 确保搜索空间包含多样化的架构
- 避免包含过多低效架构
下面是一个结合最佳实践的ENAS搜索空间实现:
class EnhancedCellSearchSpace:
    """Cell-based ENAS search space whose sampling is biased by simple priors.

    An architecture is a list with one entry per cell.  Each cell is a flat
    list of ``prev_node, op_id`` pairs: node ``j`` (``2 <= j < num_nodes``)
    contributes ``j`` pairs, emitted in order.  When ``residual`` is set and
    the identity op id exists in this space, one extra ``0, IDENTITY_OP``
    pair is appended to every cell.

    Args:
        num_cells: Number of cells in a sampled architecture.
        num_nodes: Number of nodes per cell.
        num_ops: Number of candidate operations.
    """

    # Op id tagged onto each cell as the residual (identity) connection.
    IDENTITY_OP = 6

    def __init__(self, num_cells=8, num_nodes=7, num_ops=7):
        self.num_cells = num_cells
        self.num_nodes = num_nodes
        self.num_ops = num_ops
        # Every sampled cell is tagged with a residual connection.
        self.residual = True

        # Connection prior: entry [i, j] (j < i) shrinks as the distance
        # i - j grows, so nearby nodes are more likely to be connected.
        self.connection_priors = np.zeros((num_nodes, num_nodes))
        for i in range(1, num_nodes):
            for j in range(i):
                self.connection_priors[i, j] = 1.0 / (1 + (i - j))

        # Operation prior: the first four op ids (the convolutions) get a
        # 1.5x weight before the vector is normalised to sum to one.
        self.op_priors = np.ones(num_ops)
        self.op_priors[0:4] *= 1.5
        self.op_priors = self.op_priors / np.sum(self.op_priors)

    def sample_arch(self, controller=None):
        """Sample one architecture.

        Args:
            controller: Optional object exposing
                ``sample_node(k, prior=...)`` and ``sample_op(prior=...)``.
                When omitted, decisions are drawn with ``np.random`` from
                the priors stored on this object.

        Returns:
            A list of ``num_cells`` cell descriptions (see class docstring).
        """
        arch = [self._sample_cell(controller) for _ in range(self.num_cells)]

        # Only tag the residual pair when the identity op id is a valid op
        # in this space; otherwise the architecture would carry an op id
        # that no cell can resolve.
        if self.residual and self.IDENTITY_OP < self.num_ops:
            for cell_arch in arch:
                cell_arch.extend([0, self.IDENTITY_OP])
        return arch

    def _sample_cell(self, controller):
        """Sample the ``prev_node, op_id`` pairs of a single cell."""
        cell_arch = []
        for j in range(2, self.num_nodes):
            for k in range(j):
                if controller is not None:
                    prev_node = controller.sample_node(
                        k, prior=self.connection_priors[j, k])
                    op_id = controller.sample_op(prior=self.op_priors)
                else:
                    # The k-th input of node j may come from nodes 0..k.
                    prev_probs = self.connection_priors[j, :k + 1]
                    prev_probs = prev_probs / np.sum(prev_probs)
                    # Cast to plain ints so the encoding holds no numpy scalars.
                    prev_node = int(np.random.choice(k + 1, p=prev_probs))
                    op_id = int(np.random.choice(self.num_ops, p=self.op_priors))
                cell_arch.extend([prev_node, op_id])
        return cell_arch

    def build_model(self, arch, C, num_classes):
        """Instantiate an ``EnhancedCellNetwork`` for ``arch``.

        Args:
            arch: Architecture in the per-cell format of ``sample_arch``.
            C: Initial channel width.
            num_classes: Number of output classes.
        """
        return EnhancedCellNetwork(arch, self.num_cells, self.num_nodes,
                                   self.num_ops, C, num_classes, self.residual)
class EnhancedCellNetwork(nn.Module):
    """Image classifier assembled from a stack of ``EnhancedCell`` modules.

    Layout: stem conv -> ``num_cells`` cells (a 1x1 adapter conv sits in
    front of every cell except the first) -> channel attention -> global
    average pooling -> dropout -> linear classifier.  The cells at indices
    ``num_cells // 3`` and ``2 * num_cells // 3`` are reduction cells; the
    base width doubles at each of them.

    Args:
        arch: Per-cell architecture; ``arch[i]`` is handed to cell ``i``.
        num_cells: Number of cells to stack.
        num_nodes: Nodes per cell.
        num_ops: Number of candidate operations per cell.
        C: Channel width produced by the stem.
        num_classes: Size of the classifier output.
        residual: When true, add a skip connection around each cell whose
            input and output have the same spatial size.
    """

    def __init__(self, arch, num_cells, num_nodes, num_ops, C, num_classes, residual=True):
        super().__init__()
        self.arch = arch
        self.num_cells = num_cells
        self.num_nodes = num_nodes
        self.num_ops = num_ops
        self.C = C
        self.residual = residual

        # Stem: 3-channel input image -> C feature maps.
        self.stem = nn.Sequential(
            nn.Conv2d(3, C, 3, padding=1, bias=False),
            nn.BatchNorm2d(C),
            nn.ReLU(inplace=True)
        )

        self.cells = nn.ModuleList()
        # connections[i - 1] adapts the output of cell i - 1 to the width
        # expected by cell i.
        self.connections = nn.ModuleList()
        # Registered 1x1 projections for the residual path, keyed by cell
        # index.  They must live on the module so they are trained, moved
        # with .to()/.cuda() and saved in the state dict.
        self.shortcuts = nn.ModuleDict()

        reduction_cells = {num_cells // 3, 2 * num_cells // 3}
        C_prev, C_curr = C, C
        for i in range(num_cells):
            reduction = i in reduction_cells
            if reduction:
                C_curr *= 2

            if i > 0:
                self.connections.append(nn.Conv2d(C_prev, C_curr, 1, bias=False))
                cell_in = C_curr  # the cell sees the adapter output
            else:
                cell_in = C_prev  # the first cell sees the stem output

            self.cells.append(
                EnhancedCell(arch[i], cell_in, C_curr, reduction, num_nodes, num_ops))

            # EnhancedCell concatenates its num_nodes - 2 computed nodes,
            # each C_curr wide, along the channel axis.
            C_prev = C_curr * (num_nodes - 2)

            # Reduction cells change the spatial size, so forward() never
            # adds a residual around them; no projection is needed there.
            if residual and not reduction and cell_in != C_prev:
                self.shortcuts[str(i)] = nn.Conv2d(cell_in, C_prev, 1, bias=False)

        # Squeeze-and-excitation style channel attention.  The bottleneck
        # is clamped to at least one channel so narrow networks stay valid.
        hidden = max(C_prev // 16, 1)
        self.attention = nn.Sequential(
            nn.AdaptiveAvgPool2d(1),
            nn.Conv2d(C_prev, hidden, 1),
            nn.ReLU(inplace=True),
            nn.Conv2d(hidden, C_prev, 1),
            nn.Sigmoid()
        )

        # Classifier head.
        self.global_pooling = nn.AdaptiveAvgPool2d(1)
        self.dropout = nn.Dropout(0.2)
        self.classifier = nn.Linear(C_prev, num_classes)

    def forward(self, x):
        """Return class logits for a batch of images ``x``."""
        x = self.stem(x)

        for i, cell in enumerate(self.cells):
            if i > 0:
                x = self.connections[i - 1](x)
            cell_out = cell(x)

            if self.residual:
                key = str(i)
                same_spatial = x.shape[2:] == cell_out.shape[2:]
                if key in self.shortcuts and same_spatial:
                    # Channel counts differ: use the trained projection.
                    cell_out = cell_out + self.shortcuts[key](x)
                elif x.shape == cell_out.shape:
                    cell_out = cell_out + x
            x = cell_out

        # Re-weight channels with the attention gate.
        x = x * self.attention(x)

        out = self.global_pooling(x)
        out = out.view(out.size(0), -1)
        out = self.dropout(out)
        return self.classifier(out)
class EnhancedCell(nn.Module):
    """A single searched cell.

    The cell pre-processes its input with ReLU -> 1x1 conv -> BN (stride 2
    for a reduction cell) and then evaluates the node graph described by
    ``arch``.  ``arch`` is a flat sequence of ``prev_node, op_id`` pairs:
    node ``i`` (``2 <= i < num_nodes``) consumes ``i`` consecutive pairs and
    is the sum of ``ops[op_id](nodes[prev_node])`` over them.  Entries after
    the last consumed pair are ignored.

    The output concatenates the ``num_nodes - 2`` computed nodes along the
    channel axis, i.e. it has ``C_out * (num_nodes - 2)`` channels.

    Args:
        arch: Flat ``prev_node, op_id`` sequence for this cell.
        C_in: Input channel count.
        C_out: Channel count of every node inside the cell.
        reduction: Whether the cell halves the spatial resolution.
        num_nodes: Number of nodes, counting the two input slots.
        num_ops: Number of candidate operations to instantiate.
    """

    def __init__(self, arch, C_in, C_out, reduction, num_nodes, num_ops):
        super().__init__()
        self.arch = arch
        self.reduction = reduction
        self.num_nodes = num_nodes

        stride = 2 if reduction else 1
        self.preprocess = nn.Sequential(
            nn.ReLU(inplace=False),
            nn.Conv2d(C_in, C_out, 1, stride=stride, bias=False),
            nn.BatchNorm2d(C_out)
        )

        self.ops = nn.ModuleList(
            [self._make_op(op_id, C_out) for op_id in range(num_ops)])

    @staticmethod
    def _relu_conv_bn(channels, kernel_size, padding, dilation=1):
        """Build a shape-preserving ReLU -> Conv -> BN block."""
        return nn.Sequential(
            nn.ReLU(inplace=False),
            nn.Conv2d(channels, channels, kernel_size, padding=padding,
                      dilation=dilation, bias=False),
            nn.BatchNorm2d(channels)
        )

    @classmethod
    def _make_op(cls, op_id, channels):
        """Create the candidate operation registered under ``op_id``."""
        if op_id == 0:  # 3x3 convolution
            return cls._relu_conv_bn(channels, 3, 1)
        if op_id == 1:  # 5x5 convolution
            return cls._relu_conv_bn(channels, 5, 2)
        if op_id == 2:  # 3x3 depthwise-separable convolution
            return SepConv(channels, channels, 3, 1)
        if op_id == 3:  # 5x5 depthwise-separable convolution
            return SepConv(channels, channels, 5, 2)
        if op_id == 4:  # 3x3 dilated convolution
            return cls._relu_conv_bn(channels, 3, 2, dilation=2)
        if op_id == 5:  # placeholder for a deformable conv: a plain 3x3 conv
            return cls._relu_conv_bn(channels, 3, 1)
        # Op 6 is the identity mapping.  Ids beyond 6 have no dedicated
        # operation and also resolve to an identity, each with its own
        # module instance instead of re-registering a previous one.
        return nn.Identity()

    def forward(self, x):
        """Evaluate the cell on ``x`` and return the concatenated nodes.

        Raises:
            IndexError: If ``arch`` references a node that has not been
                computed yet.
        """
        x = self.preprocess(x)

        # The encoding lets node i pick predecessors 0..i-1, so slots 0 and
        # 1 are both cell inputs.  This cell has one input tensor, hence
        # both slots hold the pre-processed input.
        nodes = [x, x]

        idx = 0
        for i in range(2, self.num_nodes):
            node_inputs = []
            for _ in range(i):
                prev_node = self.arch[idx]
                op_id = self.arch[idx + 1]
                idx += 2
                if not 0 <= prev_node < len(nodes):
                    raise IndexError(
                        f"node {i} references predecessor {prev_node}, "
                        f"but only nodes 0..{len(nodes) - 1} exist")
                node_inputs.append(self.ops[op_id](nodes[prev_node]))
            # A node is the sum of all of its transformed inputs.
            nodes.append(sum(node_inputs))

        # Concatenate the computed nodes only; the input slots are skipped.
        return torch.cat(nodes[2:], dim=1)
# Factorized reduction: spatial down-sampling by two offset strided convs.
class FactorizedReduce(nn.Module):
    """Halve the spatial resolution with two offset 1x1 stride-2 convs.

    One conv samples the input at even positions, the other samples the
    input shifted by one pixel in both spatial dimensions.  Each produces
    ``C_out // 2`` channels; the results are concatenated and batch-normed.

    Args:
        C_in: Input channel count.
        C_out: Output channel count; must be even.

    Raises:
        ValueError: If ``C_out`` is odd.
    """

    def __init__(self, C_in, C_out):
        super().__init__()
        if C_out % 2 != 0:
            raise ValueError(f"C_out must be even, got {C_out}")
        self.relu = nn.ReLU(inplace=False)
        self.conv_1 = nn.Conv2d(C_in, C_out // 2, 1, stride=2, padding=0, bias=False)
        self.conv_2 = nn.Conv2d(C_in, C_out // 2, 1, stride=2, padding=0, bias=False)
        self.bn = nn.BatchNorm2d(C_out)

    def forward(self, x):
        """Return the down-sampled feature map for ``x``."""
        x = self.relu(x)
        # Pad one row/column at the bottom/right before shifting so both
        # branches yield the same spatial size for odd H or W as well.  For
        # even sizes the padded row/column is never sampled, so the result
        # equals the unpadded ``x[:, :, 1:, 1:]`` path.
        shifted = nn.functional.pad(x, (0, 1, 0, 1))[:, :, 1:, 1:]
        out = torch.cat([self.conv_1(x), self.conv_2(shifted)], dim=1)
        return self.bn(out)
15. 针对图像分类的ENAS优化
基于对搜索空间的分析,我们可以进一步优化ENAS算法,使其更适合图像分类任务。下面是一些针对性优化策略:
15.1 特征增强策略
图像分类任务中,特征提取至关重要。我们可以在ENAS搜索空间中增加以下特性:
- 多尺度特征融合:添加从前面层到当前层的跳跃连接,捕获多尺度信息
- 注意力机制:引入通道注意力和空间注意力,增强关键特征
- 特征金字塔:在网络中添加特征金字塔结构,处理不同尺度的目标
下面是优化后的ENAS实现:
class EnhancedENASController(nn.Module):
    """LSTM controller that emits one cell description plus two extras.

    A sample is a flat list: for every node ``n`` in ``2..num_nodes - 1``
    there are ``n`` ``prev_node, op_id`` pairs, followed by one attention
    id and one scale id.  Alongside the decisions the controller returns
    the probability it assigned to each of them.

    Args:
        num_nodes: Nodes per cell.
        num_ops: Number of candidate operations.
        lstm_size: Hidden size of the LSTM cell and of the embeddings.
        attention_ops: Number of attention choices.
    """

    def __init__(self, num_nodes, num_ops, lstm_size=100, attention_ops=2):
        super().__init__()
        self.num_nodes = num_nodes
        self.num_ops = num_ops
        self.attention_ops = attention_ops
        self.lstm_size = lstm_size

        # One embedding table for all token kinds: node ids first, then op
        # ids offset by num_nodes, then attention ids offset by
        # num_nodes + num_ops.
        self.embed = nn.Embedding(num_nodes + num_ops + attention_ops, lstm_size)
        self.lstm = nn.LSTMCell(lstm_size, lstm_size)

        # One linear head per decision type.
        self.node_selector = nn.Linear(lstm_size, num_nodes)
        self.op_selector = nn.Linear(lstm_size, num_ops)
        self.attention_selector = nn.Linear(lstm_size, attention_ops)
        self.scale_selector = nn.Linear(lstm_size, 3)  # three scale choices

        # Decisions and probabilities of the most recent forward() call.
        self.sampled_arch = []
        self.sampled_probs = []

    def _decide(self, selector, token, state, temperature, num_valid=None, forced=None):
        """Run one LSTM step and make (or replay) one decision.

        Args:
            selector: Linear head producing the logits for this decision.
            token: Integer id fed through the embedding as LSTM input.
            state: ``(h, c)`` LSTM state before the step.
            temperature: Divisor applied to the logits.
            num_valid: If given, only choices ``< num_valid`` are allowed.
            forced: If given, use this choice instead of sampling.

        Returns:
            ``(choice, probability_of_choice, new_state)``.
        """
        device = self.embed.weight.device
        token = torch.tensor([token], dtype=torch.long, device=device)
        h, c = self.lstm(self.embed(token), state)
        logits = selector(h) / temperature

        num_choices = logits.size(-1)
        if num_valid is not None:
            invalid = torch.arange(num_choices, device=device) >= num_valid
            logits = logits.masked_fill(invalid, float('-inf'))
            num_choices = min(num_choices, num_valid)

        probs = F.softmax(logits, dim=-1)
        if forced is None:
            choice = torch.multinomial(probs, 1).item()
        else:
            choice = int(forced)
            if not 0 <= choice < num_choices:
                raise ValueError(
                    f"decision {choice} is outside the valid range 0..{num_choices - 1}")
        return choice, probs[0, choice], (h, c)

    def forward(self, temperature=1.0, arch=None):
        """Sample an architecture, or score a given one.

        Args:
            temperature: Softmax temperature; logits are divided by it.
            arch: Optional flat decision list in the format this method
                returns.  When given, nothing is sampled: the decisions are
                replayed and their probabilities under the current
                parameters are returned.

        Returns:
            ``(decisions, probs)`` where ``probs`` is a 1-D tensor holding
            the probability of each decision.

        Raises:
            ValueError: If ``arch`` has the wrong length or contains a
                decision outside its valid range.
        """
        # Create state on the module's own device instead of assuming CUDA.
        device = self.embed.weight.device
        state = (torch.zeros(1, self.lstm_size, device=device),
                 torch.zeros(1, self.lstm_size, device=device))

        expected_len = sum(2 * n for n in range(2, self.num_nodes)) + 2
        if arch is not None and len(arch) != expected_len:
            raise ValueError(
                f"arch must contain {expected_len} decisions, got {len(arch)}")
        replay = iter(arch) if arch is not None else None

        def next_forced():
            return next(replay) if replay is not None else None

        decisions = []
        probs = []
        token = 0  # start token

        # 1. Input connections and operations of every node.
        for node_idx in range(2, self.num_nodes):
            for _ in range(node_idx):
                # Only nodes that already exist (ids < node_idx) may be
                # chosen as predecessor.
                prev_node, prob, state = self._decide(
                    self.node_selector, token, state, temperature,
                    num_valid=node_idx, forced=next_forced())
                decisions.append(prev_node)
                probs.append(prob)

                op_id, prob, state = self._decide(
                    self.op_selector, prev_node, state, temperature,
                    forced=next_forced())
                decisions.append(op_id)
                probs.append(prob)
                token = op_id + self.num_nodes

        # 2. Attention decision.
        attention_id, prob, state = self._decide(
            self.attention_selector, token, state, temperature,
            forced=next_forced())
        decisions.append(attention_id)
        probs.append(prob)

        # 3. Multi-scale connection decision.
        token = attention_id + self.num_nodes + self.num_ops
        scale_id, prob, state = self._decide(
            self.scale_selector, token, state, temperature,
            forced=next_forced())
        decisions.append(scale_id)
        probs.append(prob)

        self.sampled_arch = decisions
        self.sampled_probs = probs
        return self.sampled_arch, torch.stack(self.sampled_probs)
15.2 完整的图像分类ENAS实现
结合上述优化,我们可以实现一个完整的、针对图像分类优化的ENAS系统:
class ImageClassificationENAS:
    """ENAS driver for CIFAR-style image classification.

    Builds the data loaders, the search space and the controller, then
    alternates between training shared weights on sampled architectures and
    updating the controller with REINFORCE on validation accuracy.

    Args:
        dataset: ``'cifar10'`` or ``'cifar100'``.
        batch_size: Batch size of all data loaders.
        epochs: Number of search epochs.
    """

    # dataset key -> (torchvision dataset class name, mean, std, num_classes)
    _DATASET_SPECS = {
        'cifar10': ('CIFAR10', [0.4914, 0.4822, 0.4465], [0.2023, 0.1994, 0.2010], 10),
        'cifar100': ('CIFAR100', [0.5071, 0.4867, 0.4408], [0.2675, 0.2565, 0.2761], 100),
    }
    # Initial channel width of every model built by this class.
    _INIT_CHANNELS = 36

    def __init__(self, dataset='cifar10', batch_size=128, epochs=50):
        self.dataset = dataset
        self.batch_size = batch_size
        self.epochs = epochs
        # Fall back to the CPU when no GPU is available.
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

        # Also sets self.num_classes.
        self.train_queue, self.valid_queue, self.test_queue = self._get_data_loaders()

        self.search_space = EnhancedCellSearchSpace(
            num_cells=8,
            num_nodes=7,
            num_ops=7
        )
        self.controller = EnhancedENASController(
            num_nodes=7,
            num_ops=7,
            lstm_size=100,
            attention_ops=3
        ).to(self.device)
        self.controller_optimizer = optim.Adam(
            self.controller.parameters(),
            lr=0.001
        )
        # (architecture, validation accuracy) of every evaluated sample.
        self.search_history = []

    def _get_data_loaders(self):
        """Create the train / validation / test loaders.

        The training set is shuffled and split 80/20 into train and
        validation subsets; the dataset's ``train=False`` split is the test
        loader.  As a side effect ``self.num_classes`` is set.

        Returns:
            ``(train_queue, valid_queue, test_queue)``.

        Raises:
            ValueError: If ``self.dataset`` is not supported.
        """
        try:
            dataset_name, mean, std, num_classes = self._DATASET_SPECS[self.dataset]
        except KeyError:
            raise ValueError(f'不支持的数据集: {self.dataset}') from None
        self.num_classes = num_classes
        dataset_cls = getattr(datasets, dataset_name)

        train_transform = transforms.Compose([
            transforms.RandomCrop(32, padding=4),
            transforms.RandomHorizontalFlip(),
            transforms.ToTensor(),
            transforms.Normalize(mean, std)
        ])
        valid_transform = transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize(mean, std)
        ])
        train_data = dataset_cls(root='./data', train=True,
                                 download=True, transform=train_transform)
        valid_data = dataset_cls(root='./data', train=False,
                                 download=True, transform=valid_transform)

        indices = list(range(len(train_data)))
        np.random.shuffle(indices)
        split = int(0.8 * len(indices))
        train_indices, valid_indices = indices[:split], indices[split:]

        train_queue = DataLoader(
            train_data, batch_size=self.batch_size,
            sampler=torch.utils.data.sampler.SubsetRandomSampler(train_indices)
        )
        valid_queue = DataLoader(
            train_data, batch_size=self.batch_size,
            sampler=torch.utils.data.sampler.SubsetRandomSampler(valid_indices)
        )
        test_queue = DataLoader(valid_data, batch_size=self.batch_size)
        return train_queue, valid_queue, test_queue

    def _to_cell_arch(self, arch):
        """Convert a controller sample to the per-cell format.

        The controller returns one flat decision list whose last two
        entries are the attention and scale ids, whereas
        ``search_space.build_model`` indexes ``arch[i]`` per cell.

        NOTE(review): reusing the single sampled cell for every cell and
        dropping the attention/scale ids is an assumption; the network
        constructor takes no argument for those two decisions - confirm.

        An ``arch`` that is already a list of per-cell lists is returned
        unchanged.
        """
        if arch and isinstance(arch[0], (list, tuple)):
            return arch
        cell = list(arch[:-2])
        return [list(cell) for _ in range(self.search_space.num_cells)]

    def _build_model(self, arch, shared_model=None):
        """Build the network for ``arch``, optionally loading shared weights."""
        model = self.search_space.build_model(
            arch, self._INIT_CHANNELS, self.num_classes).to(self.device)
        if shared_model is not None:
            model.load_state_dict(shared_model.state_dict(), strict=False)
        return model

    @staticmethod
    def _transfer_gradients(source, target):
        """Copy gradients from ``source`` onto same-named ``target`` parameters."""
        target_params = dict(target.named_parameters())
        for name, param in source.named_parameters():
            shared = target_params.get(name)
            if shared is None or param.grad is None or shared.shape != param.shape:
                continue
            shared.grad = param.grad.detach().clone()

    @staticmethod
    def _sync_buffers(source, target):
        """Copy buffers (e.g. BN running stats) from ``source`` to ``target``."""
        target_buffers = dict(target.named_buffers())
        with torch.no_grad():
            for name, buf in source.named_buffers():
                shared = target_buffers.get(name)
                if shared is not None and shared.shape == buf.shape:
                    shared.copy_(buf)

    def search(self):
        """Run the architecture search.

        Returns:
            ``(best_arch, best_acc)``: the per-cell architecture with the
            highest validation accuracy seen, and that accuracy in percent.
        """
        shared_model = SharedModel(
            self.search_space, self._INIT_CHANNELS, self.num_classes).to(self.device)
        shared_optimizer = optim.SGD(
            shared_model.parameters(),
            lr=0.05,
            momentum=0.9,
            weight_decay=3e-4
        )
        criterion = nn.CrossEntropyLoss()
        best_arch = None
        best_acc = 0

        for epoch in range(self.epochs):
            # Phase 1: train the shared weights on sampled architectures.
            shared_model.train()
            self.controller.eval()
            for x, target in self.train_queue:
                x = x.to(self.device)
                target = target.to(self.device, non_blocking=True)

                with torch.no_grad():
                    sampled, _ = self.controller()
                model = self._build_model(self._to_cell_arch(sampled), shared_model)

                shared_optimizer.zero_grad()
                loss = criterion(model(x), target)
                loss.backward()
                # backward() fills gradients on the temporary model, but the
                # optimizer owns the shared model's parameters: move the
                # gradients over before stepping, otherwise the step is a
                # no-op and the shared weights never change.
                self._transfer_gradients(model, shared_model)
                shared_optimizer.step()
                self._sync_buffers(model, shared_model)

            # Phase 2: update the controller with REINFORCE.
            self.controller.train()
            shared_model.eval()
            log_probs = []
            accuracies = []
            for _ in range(10):  # evaluate 10 sampled architectures
                sampled, probs = self.controller()
                arch = self._to_cell_arch(sampled)
                model = self._build_model(arch, shared_model)
                acc = self._validate(model, self.valid_queue)

                accuracies.append(acc)
                # Keep the log-probability of this very sample; it already
                # carries the graph needed for the policy gradient.
                log_probs.append(torch.sum(torch.log(probs)))
                self.search_history.append((arch, acc))
                if acc > best_acc:
                    best_acc = acc
                    best_arch = arch

            self.controller_optimizer.zero_grad()
            baseline = sum(accuracies) / len(accuracies)
            loss = 0
            for log_prob, acc in zip(log_probs, accuracies):
                loss = loss - log_prob * (acc - baseline)
            loss = loss / len(log_probs)
            loss.backward()
            self.controller_optimizer.step()

            print(f"Epoch {epoch}: best_acc={best_acc:.2f}%")

        return best_arch, best_acc

    def _validate(self, model, dataloader):
        """Return the top-1 accuracy (in percent) of ``model`` on ``dataloader``.

        Returns 0.0 when the loader yields no samples.
        """
        model.eval()
        first_param = next(model.parameters(), None)
        device = first_param.device if first_param is not None else self.device
        correct = 0
        total = 0
        with torch.no_grad():
            for x, target in dataloader:
                x = x.to(device)
                target = target.to(device, non_blocking=True)
                logits = model(x)
                _, predicted = torch.max(logits, 1)
                total += target.size(0)
                correct += (predicted == target).sum().item()
        if total == 0:
            return 0.0
        return 100 * correct / total

    def evaluate_best_model(self, arch, epochs=200):
        """Train ``arch`` from scratch and report its best test accuracy.

        Args:
            arch: Architecture as returned by ``search`` (a flat controller
                sample is accepted too).
            epochs: Number of training epochs; also the cosine-annealing
                period.

        Returns:
            The highest test accuracy (percent) observed after any epoch.
        """
        final_model = self._build_model(self._to_cell_arch(arch))
        optimizer = optim.SGD(
            final_model.parameters(),
            lr=0.025,
            momentum=0.9,
            weight_decay=3e-4
        )
        scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, epochs)
        criterion = nn.CrossEntropyLoss()

        best_acc = 0
        for epoch in range(epochs):
            final_model.train()
            for x, target in self.train_queue:
                x = x.to(self.device)
                target = target.to(self.device, non_blocking=True)
                optimizer.zero_grad()
                loss = criterion(final_model(x), target)
                loss.backward()
                optimizer.step()

            test_acc = self._validate(final_model, self.test_queue)
            best_acc = max(best_acc, test_acc)
            scheduler.step()

            if epoch % 20 == 0:
                print(f"Epoch {epoch}: test_acc={test_acc:.2f}%, best_acc={best_acc:.2f}%")

        return best_acc
16. ENAS搜索过程可视化
为了更好地理解ENAS搜索过程,我们可以实现一个可视化工具:
17. 总结与最佳实践
通过本文的详细探讨,我们对ENAS算法及其在图像分类任务中的应用有了深入理解。现在让我们总结一些关键发现和最佳实践:
17.1 ENAS与其他NAS方法的比较
| 方法 | 搜索策略 | 参数共享 | 搜索时间 | 计算需求 | 最终性能 |
|---|---|---|---|---|---|
| 传统NAS | 强化学习 | 无 | 1000+ GPU天 | 极高 | 优秀 |
| ENAS | 强化学习 | 完全共享 | 0.5 GPU天 | 低 | 很好 |
| DARTS | 梯度下降 | 软权重共享 | 1-4 GPU天 | 中 | 很好 |
| PC-DARTS | 梯度下降 | 部分通道共享 | 0.1 GPU天 | 低 | 优秀 |
17.2 不同搜索空间的适用场景
链式结构搜索空间
- 适用场景:资源受限设备,需要轻量级模型
- 优点:搜索快速,模型简单
- 缺点:表达能力有限,性能上限较低
基于单元的搜索空间
- 适用场景:通用图像分类任务,需要平衡效率和性能
- 优点:搜索效率高,模型性能好,易于迁移到其他数据集
- 缺点:结构有限制,可能不如手工设计的特定模型
分层搜索空间
- 适用场景:复杂任务,需要高性能模型
- 优点:表达能力强,性能上限高
- 缺点:搜索开销大,模型较大
17.3 ENAS在实际项目中的最佳实践
搜索空间设计
- 根据任务复杂度和计算资源选择适当的搜索空间
- 加入领域知识和先验约束
- 避免过大或过小的搜索空间
控制器训练
- 使用小批量数据评估架构
- 应用熵正则化防止过早收敛
- 采用温度退火策略平衡探索与利用
参数共享策略
- 确保不同架构之间的公平评估
- 适当调整共享范围,避免架构干扰
- 周期性重置共享参数,防止偏差累积
最终模型训练
- 使用更大的模型和更长的训练时间
- 应用常见的训练技巧:学习率调度、数据增强等
- 考虑集成多个发现的架构
结论
本文详细介绍了ENAS算法在图像分类任务中的应用,分析了不同搜索空间对模型性能的影响,并提供了完整的PyTorch实现代码。通过实验比较,我们发现ENAS能够在极短的搜索时间内发现性能接近手工设计模型的神经网络架构。
基于单元的搜索空间在大多数图像分类任务中表现出最好的平衡性,它既保持了较高的搜索效率,又能产生高性能的模型。特别地,通过添加先验知识和适当的结构约束,可以进一步提高ENAS的搜索效率和所得模型的性能。
随着硬件性能的提升和算法的改进,自动化模型设计将成为深度学习发展的重要方向。通过ENAS等高效神经架构搜索方法,我们可以更容易地为特定任务定制最佳神经网络架构,减少人工设计的工作量,同时获得更好的性能。
最后,值得注意的是,虽然自动化模型设计取得了显著进展,但人工知识和领域经验在搜索空间设计和结果解释方面仍然至关重要。结合人工智能与人类智慧,我们能够创造出更加强大和高效的深度学习模型。
需要清华大学《DeepSeek教程》(全五版)完整文档的朋友,关注我并私信"deepseek"即可获得。
怎么样今天的内容还满意吗?再次感谢朋友们的观看,关注GZH:凡人的AI工具箱,回复666,送您价值199的AI大礼包。最后,祝您早日实现财务自由,还请给个赞,谢谢!