PyTorch Deep Learning Framework: 60-Day Advanced Study Plan - Day 46: Automated Model Design (Part 1)


Part 1: Generating an Image Classification Network with the ENAS Algorithm

Hello everyone! Welcome to Day 46 of our 60-day advanced study plan for the PyTorch deep learning framework. Today we dig into one topic: using the Efficient Neural Architecture Search (ENAS) algorithm to automatically design image classification networks.

1. Introduction to ENAS

ENAS is an efficient neural architecture search method proposed by the Google Brain team in 2018. Compared with traditional NAS and with DARTS, its main innovation is a parameter-sharing mechanism that greatly improves search efficiency.

The core idea of ENAS is to treat the entire search space as one very large computation graph (a supergraph), of which every possible model architecture is a subgraph. By letting different child networks share parameters, ENAS avoids the huge cost of training every candidate architecture separately.

The main difference from DARTS: DARTS applies a differentiable relaxation so the architecture can be optimized by gradient descent, whereas ENAS uses reinforcement learning to learn how to sample high-performing architectures from the supergraph.

2. How the ENAS Algorithm Works

The ENAS algorithm alternates between two steps:

  1. Child-model sampling and training: a controller samples child architectures from the search space, and the corresponding child models are trained.
  2. Controller update: based on the children's performance, the controller is updated with the policy gradient method so that it becomes more likely to sample high-performing architectures.

This process can be pictured as a collaboration between an architect (the controller) and workers (child-model training): the architect supplies blueprints, the workers build to the blueprints and report back, and the architect refines the blueprints based on that feedback.
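At the heart of the controller update is the REINFORCE policy gradient. A standard form, with a baseline b subtracted from the reward to reduce variance (as in the ENAS paper), is:

$$\nabla_{\theta} J(\theta) \approx \frac{1}{m}\sum_{k=1}^{m}\sum_{t=1}^{T} \nabla_{\theta}\log \pi\left(a^{(k)}_{t}\,\middle|\,a^{(k)}_{1:t-1};\theta\right)\,(R_k - b)$$

where θ are the controller parameters, a_t^(k) is the t-th architectural decision of the k-th sampled architecture, R_k is that architecture's validation reward, and b is typically a moving average of past rewards.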

3. The ENAS Search Space

The ENAS search space is usually a directed acyclic graph (DAG) in which nodes represent feature maps and edges represent operations. For convolutional networks, common candidate operations include:

| Operation | Description | PyTorch implementation |
| --- | --- | --- |
| 3x3 standard convolution | Basic convolution | nn.Conv2d(C, C, 3, padding=1) |
| 5x5 standard convolution | Convolution with a larger receptive field | nn.Conv2d(C, C, 5, padding=2) |
| 3x3 depthwise separable convolution | Convolution with fewer parameters | SepConv(C, C, 3, 1) |
| 5x5 depthwise separable convolution | Separable convolution with a larger receptive field | SepConv(C, C, 5, 2) |
| 3x3 max pooling | Local max aggregation (stride 1 keeps resolution) | nn.MaxPool2d(3, stride=1, padding=1) |
| 3x3 average pooling | Local average aggregation (stride 1 keeps resolution) | nn.AvgPool2d(3, stride=1, padding=1) |
| Identity | Direct connection | Identity() |
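Note that every operation in the table preserves spatial resolution (stride 1 with matching padding); this is what later allows a node to sum features arriving along different edges. A quick sanity check of the built-in candidates (a minimal sketch; the SepConv module defined in Section 5 behaves the same way):

import torch
import torch.nn as nn

C = 16
candidate_ops = [
    nn.Conv2d(C, C, 3, padding=1),         # 3x3 standard convolution
    nn.Conv2d(C, C, 5, padding=2),         # 5x5 standard convolution
    nn.MaxPool2d(3, stride=1, padding=1),  # 3x3 max pooling
    nn.AvgPool2d(3, stride=1, padding=1),  # 3x3 average pooling
    nn.Identity(),                         # identity
]
x = torch.randn(2, C, 32, 32)
for op in candidate_ops:
    assert op(x).shape == x.shape  # every candidate keeps (N, C, H, W)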

4. ENAS Controller Design

The ENAS controller is usually a recurrent neural network, specifically an LSTM, that learns to generate network architectures. From its current state it outputs action probabilities at each step, then samples an action (for example, which operation to use).

The basic controller design:

import torch
import torch.nn as nn
import torch.nn.functional as F

class Controller(nn.Module):
    """ENAS controller that generates network architectures."""
    def __init__(self, num_nodes, num_ops, lstm_size=100, lstm_num_layers=1):
        super(Controller, self).__init__()
        self.num_nodes = num_nodes
        self.num_ops = num_ops
        self.lstm_size = lstm_size
        self.lstm_num_layers = lstm_num_layers
        
        # input embedding: one entry per node id plus one per op id
        self.embed = nn.Embedding(num_nodes + num_ops, lstm_size)
        
        # LSTM controller
        self.lstm = nn.LSTMCell(lstm_size, lstm_size)
        
        # predecessor-node selector
        self.node_selector = nn.Linear(lstm_size, num_nodes)
        
        # operation selector
        self.op_selector = nn.Linear(lstm_size, num_ops)
        
        # storage for the architecture decisions
        self.sampled_arch = []
        self.sampled_probs = []
        
    def forward(self, temperature=1.0):
        """Sample one architecture."""
        # initialize the LSTM hidden state
        h = torch.zeros(1, self.lstm_size).cuda()
        c = torch.zeros(1, self.lstm_size).cuda()
        
        # initial input token
        x = torch.zeros(1).long().cuda()
        
        # clear previous samples
        self.sampled_arch = []
        self.sampled_probs = []
        
        # generate the connections and operations for each node
        for node_idx in range(2, self.num_nodes):  # skip the two input nodes
            # choose a predecessor for each incoming edge of this node
            for i in range(node_idx):
                # advance the LSTM
                embed = self.embed(x)
                h, c = self.lstm(embed, (h, c))
                
                # predecessor probabilities, masked to the nodes computed so
                # far (0 .. node_idx-1); unmasked logits could select
                # not-yet-computed nodes
                logits = self.node_selector(h)[:, :node_idx] / temperature
                probs = F.softmax(logits, dim=-1)
                
                # sample the predecessor node
                prev_node = torch.multinomial(probs, 1).item()
                self.sampled_arch.append(prev_node)
                self.sampled_probs.append(probs[0, prev_node])
                
                # choose an operation for this edge
                x = torch.tensor([prev_node]).cuda()
                embed = self.embed(x)
                h, c = self.lstm(embed, (h, c))
                
                # operation probabilities
                logits = self.op_selector(h) / temperature
                probs = F.softmax(logits, dim=-1)
                
                # sample the operation
                op_id = torch.multinomial(probs, 1).item()
                self.sampled_arch.append(op_id)
                self.sampled_probs.append(probs[0, op_id])
                
                # next input token (op ids are offset past the node ids)
                x = torch.tensor([op_id + self.num_nodes]).cuda()
        
        return self.sampled_arch, torch.stack(self.sampled_probs)
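A quick usage sketch (hypothetical values; a CUDA device is required because forward() allocates tensors with .cuda()). For each node n in 2..num_nodes-1, the controller makes one (predecessor, operation) decision per incoming edge, so the sampled list holds two entries per edge:

controller = Controller(num_nodes=7, num_ops=7).cuda()
arch, probs = controller()

num_edges = sum(range(2, 7))       # nodes 2..6 contribute 2+3+4+5+6 = 20 edges
assert len(arch) == 2 * num_edges  # one (predecessor, op) pair per edge
assert probs.shape == (2 * num_edges,)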

5. ENAS Child-Model Design

An ENAS child model is a concrete architecture sampled from the supergraph. A basic implementation:

class ENASModel(nn.Module):
    """ENAS child model."""
    def __init__(self, arch, num_nodes, num_ops, C):
        super(ENASModel, self).__init__()
        self.arch = arch
        self.num_nodes = num_nodes
        self.num_ops = num_ops
        self.C = C  # channel count
        
        # candidate operations
        self.OPS = nn.ModuleList([
            nn.Sequential(
                nn.Conv2d(C, C, 3, padding=1, bias=False),
                nn.BatchNorm2d(C),
                nn.ReLU(inplace=False)
            ),  # 3x3 standard convolution
            nn.Sequential(
                nn.Conv2d(C, C, 5, padding=2, bias=False),
                nn.BatchNorm2d(C),
                nn.ReLU(inplace=False)
            ),  # 5x5 standard convolution
            SepConv(C, C, 3, 1),  # 3x3 depthwise separable convolution
            SepConv(C, C, 5, 2),  # 5x5 depthwise separable convolution
            nn.MaxPool2d(3, stride=1, padding=1),  # 3x3 max pooling
            nn.AvgPool2d(3, stride=1, padding=1),  # 3x3 average pooling
            nn.Identity()  # identity
        ])
        
        # the two fixed input nodes
        self.nodes = nn.ModuleList([
            nn.Sequential(
                nn.Conv2d(3, C, 3, padding=1, bias=False),
                nn.BatchNorm2d(C),
                nn.ReLU(inplace=False)
            ),  # node 0 (input stem)
            nn.Sequential(
                nn.Conv2d(C, C, 3, padding=1, bias=False),
                nn.BatchNorm2d(C),
                nn.ReLU(inplace=False)
            )   # node 1
        ])
        
        # classifier
        self.classifier = nn.Linear(C, 10)  # CIFAR-10 classes
        
    def forward(self, x):
        # compute the two input nodes
        node_features = [None] * self.num_nodes
        node_features[0] = self.nodes[0](x)
        node_features[1] = self.nodes[1](node_features[0])
        
        # build the computation graph described by the architecture
        idx = 0
        for node_idx in range(2, self.num_nodes):
            # gather all inputs of this node
            node_inputs = []
            for i in range(node_idx):
                # decode the (predecessor, operation) pair for this edge
                prev_node = self.arch[idx]
                op_id = self.arch[idx + 1]
                idx += 2
                
                # compute this edge's contribution
                node_input = self.OPS[op_id](node_features[prev_node])
                node_inputs.append(node_input)
            
            # node feature = sum of all incoming edges
            node_features[node_idx] = sum(node_inputs)
        
        # global average pooling
        out = F.adaptive_avg_pool2d(node_features[-1], 1)
        out = out.view(out.size(0), -1)
        
        # classification
        logits = self.classifier(out)
        
        return logits

# Depthwise separable convolution (applied twice, DARTS-style)
class SepConv(nn.Module):
    def __init__(self, C_in, C_out, kernel_size, padding):
        super(SepConv, self).__init__()
        self.op = nn.Sequential(
            nn.ReLU(inplace=False),
            nn.Conv2d(C_in, C_in, kernel_size=kernel_size, stride=1, 
                      padding=padding, groups=C_in, bias=False),
            nn.Conv2d(C_in, C_out, kernel_size=1, padding=0, bias=False),
            nn.BatchNorm2d(C_out),
            nn.ReLU(inplace=False),
            nn.Conv2d(C_out, C_out, kernel_size=kernel_size, stride=1, 
                      padding=padding, groups=C_out, bias=False),
            nn.Conv2d(C_out, C_out, kernel_size=1, padding=0, bias=False),
            nn.BatchNorm2d(C_out)
        )

    def forward(self, x):
        return self.op(x)
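A sketch of running the child model with a randomly encoded architecture, with the classes above in scope. The encoding must match the controller's output format: each node n in 2..num_nodes-1 contributes n (predecessor, operation) pairs, and predecessors are restricted to already-computed nodes:

import random

num_nodes, num_ops, C = 5, 7, 16
arch = []
for n in range(2, num_nodes):
    for _ in range(n):
        arch += [random.randrange(n), random.randrange(num_ops)]

model = ENASModel(arch, num_nodes, num_ops, C)
logits = model(torch.randn(2, 3, 32, 32))
print(logits.shape)  # torch.Size([2, 10])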

6. The ENAS Training Process

ENAS training alternates between two phases: child-model training and controller training. A simplified ENAS training loop:

def train_enas(controller, shared_model, train_queue, valid_queue, controller_optimizer, 
               shared_optimizer, epochs, device='cuda'):
    """Main ENAS training loop.

    Note: shared_model is assumed to accept the sampled architecture as a
    second forward argument, i.e. shared_model(x, arch).
    """
    for epoch in range(epochs):
        # 1. Train the shared parameters
        shared_model.train()
        controller.eval()
        
        for step, (x, target) in enumerate(train_queue):
            x, target = x.to(device), target.to(device)
            
            # sample an architecture
            with torch.no_grad():
                arch, _ = controller()
            
            # forward pass through the sampled subgraph
            shared_optimizer.zero_grad()
            logits = shared_model(x, arch)
            loss = F.cross_entropy(logits, target)
            loss.backward()
            shared_optimizer.step()
        
        # 2. Train the controller
        shared_model.eval()
        controller.train()
        
        # evaluate sampled architectures on the validation set
        for step, (x, target) in enumerate(valid_queue):
            x, target = x.to(device), target.to(device)
            
            # sample an architecture and keep its probabilities
            controller_optimizer.zero_grad()
            arch, probs = controller()
            
            # forward pass with the sampled architecture
            with torch.no_grad():
                logits = shared_model(x, arch)
                reward = compute_reward(logits, target)  # e.g. validation accuracy
            
            # REINFORCE update of the controller
            log_prob = torch.sum(torch.log(probs))
            loss = -log_prob * reward
            loss.backward()
            controller_optimizer.step()
        
        # print the current best architecture
        with torch.no_grad():
            best_arch, _ = controller()
            print(f"Epoch {epoch}, Best Architecture: {best_arch}")

def compute_reward(logits, target):
    """Reward for an architecture (here: batch accuracy on validation data)."""
    _, predicted = torch.max(logits, 1)
    correct = (predicted == target).sum().item()
    reward = correct / target.size(0)
    return reward
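The loop above uses the raw reward, which gives high-variance gradients. The ENAS paper subtracts a baseline; a common choice is an exponential moving average of past rewards. A minimal sketch (the decay value 0.95 is an illustrative choice, not from the source):

class EMABaseline:
    """Exponential moving average of rewards, used to reduce REINFORCE variance."""
    def __init__(self, decay=0.95):
        self.decay = decay
        self.value = None

    def update(self, reward):
        if self.value is None:
            self.value = reward
        else:
            self.value = self.decay * self.value + (1 - self.decay) * reward
        return self.value

# Inside the controller phase, replace the raw-reward loss with:
#   baseline = ema.update(reward)
#   loss = -log_prob * (reward - baseline)
ema = EMABaseline()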

7. The ENAS Architecture Search Flow

In outline, one round of ENAS search proceeds as follows: the controller samples an architecture → a child model is instantiated from the supergraph with shared weights → the shared weights are trained on training batches → freshly sampled architectures are evaluated on validation data → validation accuracy serves as the REINFORCE reward for updating the controller → repeat until the search budget is exhausted, then retrain the best architecture from scratch.

8. Advantages of ENAS over DARTS

ENAS and DARTS are both efficient neural architecture search methods, but they have different characteristics:

| Aspect | ENAS | DARTS |
| --- | --- | --- |
| Search method | Reinforcement learning (discrete) | Gradient descent (continuous relaxation) |
| Parameter sharing | Full sharing | Soft weight sharing |
| Compute cost | High efficiency (~0.5 GPU-days) | Medium (1-4 GPU-days) |
| Memory footprint | Low (one sampled subgraph at a time) | High (second-order derivatives required) |
| Architecture discretization | Not needed | Needed (continuous → discrete) |
| Implementation complexity | Medium | Higher |

Part 2: The Impact of Search-Space Design on Model Performance

9. Why Search-Space Design Matters

Search-space design is one of the most critical factors in neural architecture search. A good search space should satisfy the following conditions:

  1. Coverage: it contains sufficiently diverse architectures, including potentially high-performing ones
  2. Efficiency: it is not so broad that searching it becomes intractable
  3. Structural soundness: it respects basic principles of network design and prior knowledge

Search-space design directly affects final model performance. If the space contains no high-performing architecture, even the best search algorithm cannot find a good model; conversely, if the space is too large, search efficiency drops sharply. Even a modest-looking cell space is combinatorially huge, as the quick count below shows.
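A back-of-the-envelope count under the (predecessor, operation) encoding used in this post: each node n in 2..num_nodes-1 picks, for each of its n incoming edges, one of n valid predecessors and one of num_ops operations. The helper below is illustrative:

from math import prod

def cell_space_size(num_nodes: int, num_ops: int) -> int:
    """Number of distinct cells under the (predecessor, op) encoding."""
    # node n contributes n edges, each with n predecessor choices x num_ops op choices
    return prod((n * num_ops) ** n for n in range(2, num_nodes))

print(cell_space_size(7, 7))  # ≈ 3.2e29 candidate cells with 7 nodes and 7 ops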

10. Common Types of Search Space

Search spaces can be grouped by how they are designed:

  1. Macro search space: the connectivity and operation types of the entire network are searched directly; maximal flexibility, but also the hardest search.
  2. Micro search space: the macro structure of the network (e.g. the number of layers) is fixed in advance and only the internal structure of a repeated cell is searched; this balances flexibility and search efficiency.
  3. Hierarchical search space: combines macro and micro search, defining the space in a layered fashion.

A comparison of the search-space types:

| Search-space type | Flexibility | Search efficiency | Representative methods | Typical use |
| --- | --- | --- | --- | --- |
| Macro | High | Low | Early NAS | Task-specific customization |
| Micro (cell-based) | Medium | High | ENAS / DARTS | General vision tasks |
| Hierarchical | High | Medium | Auto-DeepLab | Complex tasks |

11. Implementing Different ENAS Search Spaces

Let's implement several different ENAS search spaces and compare their effects. Note that these classes assume a controller interface (sample_op, sample_node, sample_cells_count, and so on) that would extend the Section 4 controller, and that the controller's output format matches what each build_model expects; they sketch the sampling logic rather than serve as drop-in code:

11.1 Chain-Structured Search Space
class ChainSearchSpace:
    """Chain-structured search space: each node connects only to its predecessor."""
    def __init__(self, num_layers, num_ops):
        self.num_layers = num_layers
        self.num_ops = num_ops
    
    def sample_arch(self, controller):
        """Sample an architecture from the controller."""
        # only the operation type of each layer needs to be sampled
        arch = []
        for i in range(self.num_layers):
            op_id = controller.sample_op()
            arch.append(op_id)
        return arch
    
    def build_model(self, arch, C, num_classes):
        """Build a model from an architecture description."""
        layers = []
        in_channels = 3  # input image channels
        
        # stem
        layers.append(nn.Conv2d(in_channels, C, 3, padding=1))
        layers.append(nn.BatchNorm2d(C))
        layers.append(nn.ReLU(inplace=True))
        
        # main layers
        for i, op_id in enumerate(arch):
            # operation of the current layer
            if op_id == 0:  # 3x3 convolution
                layers.append(nn.Conv2d(C, C, 3, padding=1))
                layers.append(nn.BatchNorm2d(C))
                layers.append(nn.ReLU(inplace=True))
            elif op_id == 1:  # 5x5 convolution
                layers.append(nn.Conv2d(C, C, 5, padding=2))
                layers.append(nn.BatchNorm2d(C))
                layers.append(nn.ReLU(inplace=True))
            elif op_id == 2:  # 3x3 max pooling
                layers.append(nn.MaxPool2d(3, stride=1, padding=1))
            elif op_id == 3:  # 3x3 average pooling
                layers.append(nn.AvgPool2d(3, stride=1, padding=1))
            # further operation types can be added here
            
            # downsample every few layers
            if i > 0 and i % 3 == 0:
                layers.append(nn.MaxPool2d(2, stride=2))
                
        # classification head (flatten before the linear layer, otherwise the
        # classifier would receive a 4-D tensor)
        layers.append(nn.AdaptiveAvgPool2d(1))
        layers.append(nn.Flatten())
        layers.append(nn.Linear(C, num_classes))
        
        return nn.Sequential(*layers)
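A quick sketch of using the chain space with a hand-written architecture (the op ids follow the if/elif mapping above; torch is imported earlier in this post):

space = ChainSearchSpace(num_layers=6, num_ops=4)
arch = [0, 1, 2, 0, 3, 0]  # conv3, conv5, maxpool, conv3, avgpool, conv3
model = space.build_model(arch, C=16, num_classes=10)
out = model(torch.randn(2, 3, 32, 32))
print(out.shape)           # torch.Size([2, 10])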
11.2 Cell-Based Search Space
class CellSearchSpace:
    """Cell-based search space: search the internal structure of a repeated cell."""
    def __init__(self, num_cells, num_nodes, num_ops):
        self.num_cells = num_cells  # number of cells
        self.num_nodes = num_nodes  # nodes per cell
        self.num_ops = num_ops      # number of candidate operations
    
    def sample_arch(self, controller):
        """Sample an architecture from the controller."""
        arch = []
        for i in range(self.num_cells):
            cell_arch = []
            # sample for every node in the cell
            for j in range(2, self.num_nodes):  # skip the two input nodes
                # sample a predecessor and an operation for each incoming edge
                for k in range(j):
                    prev_node = controller.sample_node(k)
                    op_id = controller.sample_op()
                    cell_arch.extend([prev_node, op_id])
            arch.append(cell_arch)
        return arch
    
    def build_model(self, arch, C, num_classes):
        """Build a model from an architecture description."""
        model = CellBasedNetwork(arch, self.num_cells, self.num_nodes, 
                                 self.num_ops, C, num_classes)
        return model

class CellBasedNetwork(nn.Module):
    """Cell-based network model."""
    def __init__(self, arch, num_cells, num_nodes, num_ops, C, num_classes):
        super(CellBasedNetwork, self).__init__()
        self.arch = arch
        self.num_cells = num_cells
        self.num_nodes = num_nodes
        self.num_ops = num_ops
        self.C = C
        
        # stem
        self.stem = nn.Sequential(
            nn.Conv2d(3, C, 3, padding=1, bias=False),
            nn.BatchNorm2d(C)
        )
        
        # cells
        self.cells = nn.ModuleList()
        multiplier = num_nodes - 2  # each cell concatenates its intermediate nodes
        C_prev, C_curr = C, C
        for i in range(num_cells):
            # downsample at one third and two thirds of the depth
            if i in [num_cells//3, 2*num_cells//3]:
                C_curr *= 2
                reduction = True
            else:
                reduction = False
            
            cell = Cell(arch[i], C_prev, C_curr, reduction, num_nodes, num_ops)
            self.cells.append(cell)
            C_prev = C_curr * multiplier  # cell output channels
        
        # classifier
        self.global_pooling = nn.AdaptiveAvgPool2d(1)
        self.classifier = nn.Linear(C_prev, num_classes)
    
    def forward(self, x):
        # stem
        x = self.stem(x)
        
        # pass through all cells
        for cell in self.cells:
            x = cell(x)
        
        # classification
        out = self.global_pooling(x)
        out = out.view(out.size(0), -1)
        logits = self.classifier(out)
        
        return logits

class Cell(nn.Module):
    """Basic cell of the network."""
    def __init__(self, arch, C_in, C_out, reduction, num_nodes, num_ops):
        super(Cell, self).__init__()
        self.arch = arch
        self.reduction = reduction
        self.num_nodes = num_nodes
        
        # preprocess the input
        stride = 2 if reduction else 1
        self.preprocess = nn.Sequential(
            nn.ReLU(inplace=False),
            nn.Conv2d(C_in, C_out, 1, stride=stride, bias=False),
            nn.BatchNorm2d(C_out)
        )
        
        # candidate operations
        self.ops = nn.ModuleList()
        for i in range(num_ops):
            if i == 0:  # 3x3 convolution
                op = nn.Sequential(
                    nn.ReLU(inplace=False),
                    nn.Conv2d(C_out, C_out, 3, padding=1, bias=False),
                    nn.BatchNorm2d(C_out)
                )
            elif i == 1:  # 5x5 convolution
                op = nn.Sequential(
                    nn.ReLU(inplace=False),
                    nn.Conv2d(C_out, C_out, 5, padding=2, bias=False),
                    nn.BatchNorm2d(C_out)
                )
            elif i == 2:  # 3x3 separable convolution
                op = SepConv(C_out, C_out, 3, 1)
            elif i == 3:  # 5x5 separable convolution
                op = SepConv(C_out, C_out, 5, 2)
            elif i == 4:  # 3x3 max pooling
                op = nn.MaxPool2d(3, stride=1, padding=1)
            elif i == 5:  # 3x3 average pooling
                op = nn.AvgPool2d(3, stride=1, padding=1)
            elif i == 6:  # identity
                op = nn.Identity()
            
            self.ops.append(op)
    
    def forward(self, x):
        # preprocess the input
        x = self.preprocess(x)
        
        # both input nodes (0 and 1) are the preprocessed feature in this
        # simplified single-input cell
        nodes = [x, x]
        
        # build the computation graph from the architecture description
        idx = 0
        for i in range(2, self.num_nodes):
            # gather all inputs of the current node
            node_inputs = []
            for j in range(i):
                prev_node = self.arch[idx]
                op_id = self.arch[idx + 1]
                idx += 2
                
                # contribution of this edge
                node_input = self.ops[op_id](nodes[prev_node])
                node_inputs.append(node_input)
            
            # node feature = sum of all inputs
            nodes.append(sum(node_inputs))
        
        # concatenate the intermediate nodes: (num_nodes - 2) * C_out channels
        output = torch.cat(nodes[2:], dim=1)
        return output
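A shape check for the cell, sketched with a random architecture whose predecessors are constrained to already-computed nodes:

import random

num_nodes, num_ops, C_out = 5, 7, 16
cell_arch = []
for n in range(2, num_nodes):
    for _ in range(n):
        cell_arch += [random.randrange(n), random.randrange(num_ops)]

cell = Cell(cell_arch, C_in=16, C_out=C_out, reduction=False,
            num_nodes=num_nodes, num_ops=num_ops)
y = cell(torch.randn(2, 16, 32, 32))
print(y.shape)  # torch.Size([2, 48, 32, 32]): (num_nodes - 2) * C_out channels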
11.3 Hierarchical Search Space
class HierarchicalSearchSpace:
    """Hierarchical search space: search the network layout and cell structure jointly."""
    def __init__(self, num_blocks, num_cells_per_block, num_nodes, num_ops):
        self.num_blocks = num_blocks
        self.num_cells_per_block = num_cells_per_block
        self.num_nodes = num_nodes
        self.num_ops = num_ops
    
    def sample_arch(self, controller):
        """Sample an architecture from the controller."""
        # sample the overall network layout
        network_arch = []
        for i in range(self.num_blocks):
            # sample a (variable) number of cells for each block
            num_cells = controller.sample_cells_count()
            
            # sample the downsampling strategy for each block
            downsample = controller.sample_downsample()
            
            network_arch.append((num_cells, downsample))
        
        # sample the internal cell structure
        cell_arch = []
        for j in range(2, self.num_nodes):
            # sample a predecessor and an operation for each incoming edge
            for k in range(j):
                prev_node = controller.sample_node(k)
                op_id = controller.sample_op()
                cell_arch.extend([prev_node, op_id])
        
        return (network_arch, cell_arch)
    
    def build_model(self, arch, C, num_classes):
        """Build a model from an architecture description."""
        network_arch, cell_arch = arch
        model = HierarchicalNetwork(network_arch, cell_arch, self.num_nodes, 
                                    self.num_ops, C, num_classes)
        return model

class HierarchicalNetwork(nn.Module):
    """Network model for the hierarchical search space."""
    def __init__(self, network_arch, cell_arch, num_nodes, num_ops, C, num_classes):
        super(HierarchicalNetwork, self).__init__()
        self.network_arch = network_arch
        self.cell_arch = cell_arch
        self.num_nodes = num_nodes
        self.num_ops = num_ops
        self.C = C
        
        # stem
        self.stem = nn.Sequential(
            nn.Conv2d(3, C, 3, padding=1, bias=False),
            nn.BatchNorm2d(C)
        )
        
        # network body
        self.blocks = nn.ModuleList()
        multiplier = num_nodes - 2  # each cell concatenates its intermediate nodes
        cell_channels = C           # per-node channel width inside a cell
        in_channels = C             # channels entering the next cell
        for block_id, (num_cells, downsample) in enumerate(network_arch):
            block = nn.ModuleList()
            
            # decide downsampling and channel width for this block
            if block_id > 0 and downsample:
                cell_channels *= 2
                first_reduction = True
            else:
                first_reduction = False
            
            # add all cells of this block
            for cell_id in range(num_cells):
                # only the first cell of a block may downsample
                reduction = first_reduction and cell_id == 0
                cell = Cell(cell_arch, in_channels, cell_channels, reduction,
                            num_nodes, num_ops)
                block.append(cell)
                in_channels = cell_channels * multiplier  # cell output channels
            
            self.blocks.append(block)
        
        # classifier
        self.global_pooling = nn.AdaptiveAvgPool2d(1)
        self.classifier = nn.Linear(in_channels, num_classes)
    
    def forward(self, x):
        # stem
        x = self.stem(x)
        
        # pass through all blocks and cells
        for block in self.blocks:
            for cell in block:
                x = cell(x)
        
        # classification
        out = self.global_pooling(x)
        out = out.view(out.size(0), -1)
        logits = self.classifier(out)
        
        return logits
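A minimal end-to-end sketch with a hand-picked network_arch and a random cell_arch, assuming the Cell class above:

import random

num_nodes, num_ops = 5, 7
cell_arch = []
for n in range(2, num_nodes):
    for _ in range(n):
        cell_arch += [random.randrange(n), random.randrange(num_ops)]

# three blocks of (num_cells, downsample)
network_arch = [(2, False), (2, True), (2, True)]
net = HierarchicalNetwork(network_arch, cell_arch, num_nodes, num_ops,
                          C=16, num_classes=10)
print(net(torch.randn(2, 3, 32, 32)).shape)  # torch.Size([2, 10])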

12. Comparing the Search Spaces Experimentally

Let's design an experiment to compare how the different search spaces affect ENAS. We will use CIFAR-10 and run the ENAS algorithm in each of the three search spaces:

import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.datasets as datasets
import torchvision.transforms as transforms
from torch.utils.data import DataLoader
import time
import numpy as np

def compare_search_spaces():
    """Compare the performance of different search spaces."""
    # hyperparameters
    C = 36  # initial channels
    num_classes = 10  # CIFAR-10
    epochs = 50
    
    # data pipelines: augmentation for training only; the test set uses a
    # plain ToTensor + Normalize pipeline
    train_transform = transforms.Compose([
        transforms.RandomCrop(32, padding=4),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010))
    ])
    test_transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010))
    ])
    
    train_data = datasets.CIFAR10(root='./data', train=True, 
                                  download=True, transform=train_transform)
    test_data = datasets.CIFAR10(root='./data', train=False, 
                                 download=True, transform=test_transform)
    
    # split the training set into train and validation parts
    indices = list(range(len(train_data)))
    np.random.shuffle(indices)
    split = int(0.8 * len(indices))
    train_indices, valid_indices = indices[:split], indices[split:]
    
    train_queue = DataLoader(
        train_data, batch_size=128,
        sampler=torch.utils.data.sampler.SubsetRandomSampler(train_indices)
    )
    
    valid_queue = DataLoader(
        train_data, batch_size=128,
        sampler=torch.utils.data.sampler.SubsetRandomSampler(valid_indices)
    )
    
    test_queue = DataLoader(test_data, batch_size=128)
    
    # search spaces to compare
    search_spaces = {
        'chain': ChainSearchSpace(num_layers=15, num_ops=7),
        'cell': CellSearchSpace(num_cells=8, num_nodes=7, num_ops=7),
        'hierarchical': HierarchicalSearchSpace(num_blocks=3, 
                                               num_cells_per_block=3, 
                                               num_nodes=7, 
                                               num_ops=7)
    }
    
    # result storage
    results = {}
    
    # run ENAS in each search space
    for name, search_space in search_spaces.items():
        print(f"Testing search space: {name}")
        
        # controller
        controller = Controller(
            num_nodes=7, 
            num_ops=7, 
            lstm_size=100, 
            lstm_num_layers=1
        ).cuda()
        
        # controller optimizer
        controller_optimizer = optim.Adam(
            controller.parameters(),
            lr=0.001
        )
        
        # track search time and best accuracy
        start_time = time.time()
        
        # run the ENAS search
        best_arch, best_acc = run_enas_search(
            controller, 
            search_space,
            train_queue, 
            valid_queue, 
            controller_optimizer,
            epochs,
            C,
            num_classes
        )
        
        # elapsed search time
        search_time = time.time() - start_time
        
        # retrain the best architecture from scratch
        final_model = search_space.build_model(best_arch, C, num_classes).cuda()
        final_acc = train_from_scratch(final_model, train_queue, test_queue, epochs=100)
        
        # record results
        results[name] = {
            'search_time': search_time,
            'search_acc': best_acc,
            'final_acc': final_acc
        }
    
    # print results
    print("\nResults:")
    print("-" * 50)
    print(f"{'Search Space':<15} {'Search Time(h)':<15} {'Search Acc(%)':<15} {'Final Acc(%)':<15}")
    print("-" * 50)
    for name, result in results.items():
        print(f"{name:<15} {result['search_time']/3600:<15.2f} {result['search_acc']:<15.2f} {result['final_acc']:<15.2f}")
    
    return results

def run_enas_search(controller, search_space, train_queue, valid_queue, 
                    controller_optimizer, epochs, C, num_classes):
    """Run the ENAS search loop."""
    best_arch = None
    best_acc = 0
    
    # Shared parameters live in SharedModel. In this simplified write-back
    # scheme each sampled child gets a fresh optimizer and its updated
    # weights are copied back afterwards; a real ENAS implementation keeps a
    # single supergraph module so one optimizer (with momentum) is reused.
    shared_model = SharedModel(search_space, C, num_classes).cuda()
    
    for epoch in range(epochs):
        # train the shared parameters
        for step, (x, target) in enumerate(train_queue):
            shared_model.train()
            controller.eval()
            
            x, target = x.cuda(), target.cuda(non_blocking=True)
            
            # sample an architecture
            with torch.no_grad():
                arch, _ = controller()
            
            # build a temporary child model and copy in the shared weights
            # (strict=False skips parameters whose names don't match)
            model = search_space.build_model(arch, C, num_classes).cuda()
            model.load_state_dict(shared_model.state_dict(), strict=False)
            
            # optimize the child's parameters
            child_optimizer = optim.SGD(model.parameters(), lr=0.05,
                                        momentum=0.9, weight_decay=3e-4)
            child_optimizer.zero_grad()
            logits = model(x)
            loss = nn.CrossEntropyLoss()(logits, target)
            loss.backward()
            child_optimizer.step()
            
            # write the updated weights back into the shared store
            shared_model.load_state_dict(model.state_dict(), strict=False)
        
        # train the controller
        controller.train()
        shared_model.eval()
        
        # sample several architectures, evaluate them, and keep the sampling
        # log-probabilities for the REINFORCE update
        sampled_log_probs = []
        accuracies = []
        
        for _ in range(10):  # sample 10 architectures
            arch, probs = controller()
            sampled_log_probs.append(torch.sum(torch.log(probs)))
            
            # build a temporary child model with the shared weights
            model = search_space.build_model(arch, C, num_classes).cuda()
            model.load_state_dict(shared_model.state_dict(), strict=False)
            
            # evaluate on the validation set
            model.eval()
            correct = 0
            total = 0
            with torch.no_grad():
                for x, target in valid_queue:
                    x, target = x.cuda(), target.cuda(non_blocking=True)
                    logits = model(x)
                    _, predicted = torch.max(logits, 1)
                    total += target.size(0)
                    correct += (predicted == target).sum().item()
            
            acc = 100 * correct / total
            accuracies.append(acc)
            
            # track the best architecture
            if acc > best_acc:
                best_acc = acc
                best_arch = arch
        
        # REINFORCE update with a mean-accuracy baseline
        controller_optimizer.zero_grad()
        baseline = sum(accuracies) / len(accuracies)
        
        loss = 0
        for log_prob, acc in zip(sampled_log_probs, accuracies):
            reward = acc - baseline
            loss = loss - log_prob * reward
        
        loss = loss / len(accuracies)
        loss.backward()
        controller_optimizer.step()
        
        print(f"Epoch {epoch}: best_acc={best_acc:.2f}%")
    
    return best_arch, best_acc

class SharedModel(nn.Module):
    """Container for the shared parameters (schematic).

    For the weight sharing to take effect, the parameter names stored here
    must line up with the parameter names of the models built by the search
    space; load_state_dict(..., strict=False) silently skips mismatches.
    """
    def __init__(self, search_space, C, num_classes):
        super(SharedModel, self).__init__()
        self.search_space = search_space
        self.C = C
        self.num_classes = num_classes
        
        # shared parameter store (ParameterDict keys may not contain '.',
        # so underscores are used; small random init instead of zeros so the
        # convolutions are not dead at the start)
        self.shared_params = nn.ParameterDict()
        
        # stem parameters
        self.shared_params['stem_weight'] = nn.Parameter(
            torch.randn(C, 3, 3, 3) * 0.1
        )
        self.shared_params['stem_bn_weight'] = nn.Parameter(
            torch.ones(C)
        )
        self.shared_params['stem_bn_bias'] = nn.Parameter(
            torch.zeros(C)
        )
        
        # per-operation parameters
        for i in range(7):  # 7 candidate operations
            self.shared_params[f'op{i}_weight'] = nn.Parameter(
                torch.randn(C, C, 3, 3) * 0.1
            )
            self.shared_params[f'op{i}_bn_weight'] = nn.Parameter(
                torch.ones(C)
            )
            self.shared_params[f'op{i}_bn_bias'] = nn.Parameter(
                torch.zeros(C)
            )
        
        # classifier parameters
        self.shared_params['classifier_weight'] = nn.Parameter(
            torch.zeros(num_classes, C)
        )
        self.shared_params['classifier_bias'] = nn.Parameter(
            torch.zeros(num_classes)
        )
    
    def forward(self, x):
        # a concrete architecture is required for a forward pass
        raise NotImplementedError("SharedModel needs a concrete architecture for a forward pass")
    
    def state_dict(self, *args, **kwargs):
        # simplified override: expose the shared parameters directly
        return {name: p for name, p in self.shared_params.items()}

def train_from_scratch(model, train_queue, test_queue, epochs=100):
    """Train the final model from scratch."""
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(
        model.parameters(),
        lr=0.025,
        momentum=0.9,
        weight_decay=3e-4
    )
    scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, epochs)
    
    best_acc = 0
    for epoch in range(epochs):
        # train
        model.train()
        for step, (x, target) in enumerate(train_queue):
            x, target = x.cuda(), target.cuda(non_blocking=True)
            
            optimizer.zero_grad()
            logits = model(x)
            loss = criterion(logits, target)
            loss.backward()
            optimizer.step()
        
        # evaluate
        model.eval()
        correct = 0
        total = 0
        with torch.no_grad():
            for x, target in test_queue:
                x, target = x.cuda(), target.cuda(non_blocking=True)
                logits = model(x)
                _, predicted = torch.max(logits, 1)
                total += target.size(0)
                correct += (predicted == target).sum().item()
        
        acc = 100 * correct / total
        if acc > best_acc:
            best_acc = acc
        
        scheduler.step()
        
        if epoch % 10 == 0:
            print(f"Epoch {epoch}: acc={acc:.2f}%, best_acc={best_acc:.2f}%")
    
    return best_acc

13. Analyzing the Search-Space Comparison

From the comparison results we can analyze the strengths and weaknesses of each search space and draw the following conclusions:

  1. Trade-off between search-space complexity and performance

    • The chain space is the simplest and fastest to search, but its final performance is limited
    • The hierarchical space is the most complex and slowest to search, but it can find the best-performing models
    • The cell-based space strikes a good balance between search efficiency and model performance
  2. Model size and computational cost

    • Hierarchical search tends to produce larger models with higher parameter counts and inference latency
    • Chain-structured models are the smallest, but their expressive power is limited
    • In practice, pick the search space that fits the constraints of the deployment environment
  3. Search stability

    • The cell-based space is usually the most stable, with small performance variance across runs
    • Because the hierarchical space is so large, several search runs may be needed to find the best architecture
