神经网络与深度学习:案例与实践——第三章(2)

发布于:2025-04-06 ⋅ 阅读:(17) ⋅ 点赞:(0)

神经网络与深度学习:案例与实践——第三章(2)

基于Softmax回归的多分类任务

Logistic回归可以有效地解决二分类问题,但在分类任务中,还有一类多分类问题,即类别数
C大于2 的分类问题。Softmax回归就是Logistic回归在多分类问题上的推广。
使用Softmax回归模型对一个简单的数据集进行多分类实验。

数据集构建

我们首先构建一个简单的多分类任务,并构建训练集、验证集和测试集。 本任务的数据来自3个不同的簇,每个簇对一个类别。采集1000条样本,每个样本包含2个特征。

数据集的构建函数make_multi的代码实现如下:

import numpy as np
import paddle
def make_multiclass_classification(n_samples=100, n_features=2, n_classes=3, shuffle=True, noise=0.1):
    """
    生成带噪音的多类别数据
    输入:
        - n_samples:数据量大小,数据类型为int
        - n_features:特征数量,数据类型为int
        - shuffle:是否打乱数据,数据类型为bool
        - noise:以多大的程度增加噪声,数据类型为None或float,noise为None时表示不增加噪声
    输出:
        - X:特征数据,shape=[n_samples,2]
        - y:标签数据, shape=[n_samples,1]
    """
    # 计算每个类别的样本数量
    n_samples_per_class = [int(n_samples / n_classes) for k in range(n_classes)]
    for i in range(n_samples - sum(n_samples_per_class)):
        n_samples_per_class[i % n_classes] += 1
    
    # 将特征和标签初始化为0
    X = paddle.zeros([n_samples, n_features])
    y = paddle.zeros([n_samples], dtype='int32')

    # 随机生成3个簇中心作为类别中心
    #paddle.randperm(2 ** n_features),生成 0 到 2^n_features - 1 的随机排列(整数序列)。
    # [:n_classes]:选择前 n_classes 个不同的索引作为类别中心。
    centroids = paddle.randperm(2 ** n_features)[:n_classes]

    #centroids.numpy().astype('uint8'):将 Paddle 张量转为 NumPy 数组,并转换为 uint8 类型(8位无符号整数)。
    # np.unpackbits():将每个整数解包为 8 位二进制(如 3 → [0, 0, 0, 0, 0, 0, 1, 1])。
    # reshape((-1, 8)):将解包后的二进制数组重塑为 (n_classes, 8) 的形状。
    # [:, -n_features:]:仅保留最后 n_features 位(如 n_features=2 时取最后 2 位 [1,1])
    centroids_bin = np.unpackbits(centroids.numpy().astype('uint8')).reshape((-1, 8))[:, -n_features:]
    centroids = paddle.to_tensor(centroids_bin, dtype='float32')

    # 控制簇中心的分离程度
    #在之前俺 centroids 变为了二进制数,所以现在只有1,0两种数字。
    #经过下面的计算,使得变换后的值被映射到 [-1, 0.5] 区间。
    centroids = 1.5 * centroids - 1

    # 随机生成特征值
    X[:, :n_features] = paddle.randn(shape=[n_samples, n_features])

    stop = 0
    # 将每个类的特征值控制在簇中心附近
    #生成多类别分类数据的核心部分,负责为每个类别生成具有特定分布的特征数据
    #k:类别索引(0 到 n_classes-1)
    # centroid:当前类别的中心点坐标(形状 [n_features,])
    for k, centroid in enumerate(centroids):
        #确定当前类别的样本在总数据中的位置范围
        start, stop = stop, stop + n_samples_per_class[k]
        # 指定标签值,[start:stop] 范围内 的标签设为 k
        y[start:stop] = k % n_classes
        # X_k:当前类别的特征数据子集(形状 [n_samples_per_class[k], n_features])
        X_k = X[start:stop, :n_features]
        # 控制每个类别特征值的分散程度
        # paddle.rand:生成 [0,1) 均匀分布随机数
        # 变换 2*A-1:将值域映射到 [-1, 1),使变换矩阵包含收缩/拉伸和旋转
        A = 2 * paddle.rand(shape=[n_features, n_features]) - 1
        X_k[...] = paddle.matmul(X_k, A)
        #将变换后的数据平移至当前类别的中心点位置
        X_k += centroid
        #将处理后的类别数据存回总数组
        X[start:stop, :n_features] = X_k

    # 如果noise不为None,则给特征加入噪声
    if noise > 0.0:
        # 生成noise掩膜,用来指定给那些样本加入噪声
        noise_mask = paddle.rand([n_samples]) < noise
        for i in range(len(noise_mask)):
            if noise_mask[i]:
                # 给加噪声的样本随机赋标签值
                y[i] = paddle.randint(n_classes, shape=[1]).astype('int32')
    # 如果shuffle为True,将所有数据打乱
    if shuffle:
        idx = paddle.randperm(X.shape[0])
        X = X[idx]
        y = y[idx]

    return X, y

随机采集1000个样本,并进行可视化。代码如下:

# 固定随机种子,保持每次运行结果一致
paddle.seed(102)
# 采样1000个样本
n_samples = 1000
X, y = make_multiclass_classification(n_samples=n_samples, n_features=2, n_classes=3, noise=0.2)

# 可视化生产的数据集,不同颜色代表不同类别
plt.figure(figsize=(5,5))
plt.scatter(x=X[:, 0].tolist(), y=X[:, 1].tolist(), marker='*', c=y.tolist())
plt.show()

输出·结果:
在这里插入图片描述
将实验数据拆分成训练集、验证集和测试集。其中训练集640条、验证集160条、测试集200条。
代码如下:

num_train = 640
num_dev = 160
num_test = 200

X_train, y_train = X[:num_train], y[:num_train]
X_dev, y_dev = X[num_train:num_train + num_dev], y[num_train:num_train + num_dev]
X_test, y_test = X[num_train + num_dev:], y[num_train + num_dev:]

# 打印X_train和y_train的维度
print("X_train shape: ", X_train.shape, "y_train shape: ", y_train.shape)

就完成了Multi1000数据集的构建。

模型构建

Softmax回归的核心思想
功能:用于多类别分类问题,预测输入样本属于每个类别的概率。
输出:一个长度为类别数 C 的概率向量,每个元素表示对应类别的预测概率。
对比Logistic回归:
Logistic回归:二分类,输出单个概率值(通过Sigmoid函数)。
Softmax回归:多分类,输出概率分布(通过Softmax函数)。

Softmax函数

该函数可以将多个标量映射为一个概率分布。
在这里插入图片描述
在这里插入图片描述
解决方案:
在这里插入图片描述
Softmax函数的代码实现如下:
这段代码实现了一个数值稳定的Softmax函数,用于将输入张量(Tensor)转换为概率分布。

#数值稳定的Softmax函数实现代码:
# x为tensor
def softmax(X):
    """
    输入:
        - X:shape=[N, C],N为向量数量,C为向量维度
    输出:
        形状相同的张量,每行是一个概率分布(所有元素和为1)。
    """
    # paddle.max:沿 axis=1(每行)求最大值。
    # keepdim=True:保持维度,使结果形状为 [N, 1](便于广播)。
    x_max = paddle.max(X, axis=1, keepdim=True)#N,1
    
    x_exp = paddle.exp(X - x_max)
    partition = paddle.sum(x_exp, axis=1, keepdim=True)#N,1
    return x_exp / partition

# 观察softmax的计算方式
X = paddle.to_tensor([[0.1, 0.2, 0.3, 0.4],[1,2,3,4]])
predict = softmax(X)
print(predict)

Softmax回归算子

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

softmax回归算子代码实现:

#softmax回归算子代码实现:
# 观察softmax的计算方式
X = paddle.to_tensor([[0.1, 0.2, 0.3, 0.4],[1,2,3,4]])
predict = softmax(X)
print(predict)

class model_SR(op.Op):
    def __init__(self, input_dim, output_dim):
        super(model_SR, self).__init__()
        self.params = {}
        # 将线性层的权重参数全部初始化为0
        self.params['W'] = paddle.zeros(shape=[input_dim, output_dim])
        # self.params['W'] = paddle.normal(mean=0, std=0.01, shape=[input_dim, output_dim])
        # 将线性层的偏置参数初始化为0
        self.params['b'] = paddle.zeros(shape=[output_dim])
        self.outputs = None

    def __call__(self, inputs):
        return self.forward(inputs)

    def forward(self, inputs):
        """
        输入:
            - inputs: shape=[N,D], N是样本数量,D是特征维度
        输出:
            - outputs:预测值,shape=[N,C],C是类别数
        """
        # 线性计算
        score = paddle.matmul(inputs, self.params['W']) + self.params['b']
        # Softmax 函数
        self.outputs = softmax(score)
        return self.outputs

# 随机生成1条长度为4的数据
inputs = paddle.randn(shape=[1,4])
print('Input is:', inputs)
# 实例化模型,这里令输入长度为4,输出类别数为3
model = model_SR(input_dim=4, output_dim=3)
outputs = model(inputs)
print('Output is:', outputs)

损失函数

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

多分类交叉熵损失函数实现代码:

#多分类交叉熵实现代码:
class MultiCrossEntropyLoss(op.Op):
    def __init__(self):
        self.predicts = None
        self.labels = None
        self.num = None

    def __call__(self, predicts, labels):
        return self.forward(predicts, labels)

    def forward(self, predicts, labels):
        """
        输入:
            - predicts:预测值,shape=[N, 1],N为样本数量
            - labels:真实标签,shape=[N, 1]
        输出:
            - 损失值:shape=[1]
        """
        self.predicts = predicts
        self.labels = labels
        self.num = self.predicts.shape[0]
        loss = 0
        for i in range(0, self.num):
            index = self.labels[i]
            loss -= paddle.log(self.predicts[i][index])
        return loss / self.num

# 测试一下
# 假设真实标签为第1类
labels = paddle.to_tensor([0])
# 计算风险函数
mce_loss = MultiCrossEntropyLoss()
print(mce_loss(outputs, labels))

模型优化

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

将上述计算方法定义在模型的backward函数中,代码实现如下:

#优化代码实现:
#对比于回归算子,多了参数梯度存储和反向传播算法

class model_SR(op.Op):
    def __init__(self, input_dim, output_dim):
        super(model_SR, self).__init__()
        self.params = {}
        # 将线性层的权重参数全部初始化为0
        self.params['W'] = paddle.zeros(shape=[input_dim, output_dim])
        # self.params['W'] = paddle.normal(mean=0, std=0.01, shape=[input_dim, output_dim])
        # 将线性层的偏置参数初始化为0
        self.params['b'] = paddle.zeros(shape=[output_dim])
        # 存放参数的梯度
        self.grads = {}
        self.X = None
        self.outputs = None
        self.output_dim = output_dim

    def __call__(self, inputs):
        return self.forward(inputs)

    def forward(self, inputs):
        self.X = inputs
        # 线性计算
        score = paddle.matmul(self.X, self.params['W']) + self.params['b']
        # Softmax 函数
        self.outputs = softmax(score)
        return self.outputs

    def backward(self, labels):
        """
        输入:
            - labels:真实标签,shape=[N, 1],其中N为样本数量
        """
        # 计算偏导数
        N =labels.shape[0]
        labels = paddle.nn.functional.one_hot(labels, self.output_dim)
        self.grads['W'] = -1 / N * paddle.matmul(self.X.t(), (labels-self.outputs))
        self.grads['b'] = -1 / N * paddle.matmul(paddle.ones(shape=[N]), (labels-self.outputs))

参数更新:
在这里插入图片描述

#实现一个梯度下降法的优化器函数SimpleBatchGD来执行参数更新过程。
# 其中step函数从模型的grads属性取出参数的梯度并更新。代码实现如下:
#参数更新实现:
class Optimizer(object):
    def __init__(self, init_lr, model):
        """
        优化器类初始化
        """
        # 初始化学习率,用于参数更新的计算
        self.init_lr = init_lr
        # 指定优化器需要优化的模型
        self.model = model

    @abstractmethod
    def step(self):
        """
        定义每次迭代如何更新参数
        """
        pass

class SimpleBatchGD(Optimizer):
    def __init__(self, init_lr, model):
        super(SimpleBatchGD, self).__init__(init_lr=init_lr, model=model)

    def step(self):
        # 参数更新
        # 遍历所有参数,按照公式(3.8)和(3.9)更新参数
        if isinstance(self.model.params, dict):
            for key in self.model.params.keys():
                self.model.params[key] = self.model.params[key] - self.init_lr * self.model.grads[key]

模型训练

实例化RunnerV2类,并传入训练配置。使用训练集和验证集进行模型训练,共训练500个epoch。每隔50个epoch打印训练集上的指标。代码实现如下:

class RunnerV2(object):
    def __init__(self, model, optimizer, metric, loss_fn):
        '''
        -model: 包含前向计算和反向传播方法的模型
        -optimizer: 负责参数更新的优化器
        -metric: 评估模型的性能函数(准确率)
        -loss_fn: 计算模型损失函数(交叉熵)
        '''
        self.model = model
        self.optimizer = optimizer
        self.loss_fn = loss_fn
        self.metric = metric
        # 记录训练过程(训练集和验证集)中的评价指标变化情况
        self.train_scores = []
        self.dev_scores = []
        # 记录训练过程(训练集和验证集)中的损失函数变化情况
        self.train_loss = []
        self.dev_loss = []

    def train(self, train_set, dev_set, **kwargs):
        # 传入训练轮数,如果没有传入值则默认为0
        num_epochs = kwargs.get("num_epochs", 0)
        # 传入log打印频率,如果没有传入值则默认为100
        log_epochs = kwargs.get("log_epochs", 100)
        # 传入模型保存路径,如果没有传入值则默认为"best_model.pdparams"
        save_path = kwargs.get("save_path", "best_model.pdparams")
        # 梯度打印函数,如果没有传入则默认为"None"
        print_grads = kwargs.get("print_grads", None)
        # 记录全局最优指标
        best_score = 0
        # 进行num_epochs轮训练
        for epoch in range(num_epochs):
            X, y = train_set
            # 前向计算,获取模型预测
            logits = self.model(X)
            # 计算交叉熵损失
            trn_loss = self.loss_fn(logits, y).item()
            self.train_loss.append(trn_loss)
            # 计算评价指标
            trn_score = self.metric(logits, y).item()
            self.train_scores.append(trn_score)
            # 反向传播与优化:计算参数梯度
            self.model.backward(y)
            if print_grads is not None:
                # 打印每一层的梯度
                print_grads(self.model)
            # 更新模型参数
            self.optimizer.step() # 参数更新
            #验证步骤
            dev_score, dev_loss = self.evaluate(dev_set)
            # 如果当前指标为最优指标,保存该模型
            if dev_score > best_score:
                self.save_model(save_path)
                print(f"best accuracy performence has been updated: {best_score:.5f} --> {dev_score:.5f}")
                best_score = dev_score
            #日志输出
            if epoch % log_epochs == 0:
                print(f"[Train] epoch: {epoch}, loss: {trn_loss}, score: {trn_score}")
                print(f"[Dev] epoch: {epoch}, loss: {dev_loss}, score: {dev_score}")
    #再给定数据集上评估模型性能,返回评估指标和损失值;记录历史评估结果         
    def evaluate(self, data_set):
        X, y = data_set
        # 计算模型输出
        logits = self.model(X)
        # 计算损失函数
        loss = self.loss_fn(logits, y).item()
        self.dev_loss.append(loss)
        # 计算评价指标
        score = self.metric(logits, y).item()
        self.dev_scores.append(score)
        return score, loss

    #模型预测
    #对输入数据进行预测,返回模型输出
    def predict(self, X):
        return self.model(X)

    # 模型保存
    def save_model(self, save_path):
        paddle.save(self.model.paramloadel_models, save_path)

    # 模型加载(loadel_model)
    def load_model(self, model_path):
        self.model.params = paddle.load(model_path)

# 固定随机种子,保持每次运行结果一致
paddle.seed(102)

# 特征维度
input_dim = 2
# 类别数
output_dim = 3
# 学习率
lr = 0.1

# 实例化模型
model = model_SR(input_dim=input_dim, output_dim=output_dim)
# 指定优化器
optimizer = SimpleBatchGD(init_lr=lr, model=model)
# 指定损失函数
loss_fn = MultiCrossEntropyLoss()
# 指定评价方式
metric = accuracy
# 实例化RunnerV2类
runner = RunnerV2(model, optimizer, metric, loss_fn)

# 模型训练
runner.train([X_train, y_train], [X_dev, y_dev], num_epochs=500, log_eopchs=50, eval_epochs=1, save_path="best_model.pdparams")

# 可视化观察训练集与验证集的准确率变化情况
plt(runner,fig_name='linear-acc2.pdf')

可视化结果:
在这里插入图片描述
在这里插入图片描述

模型评价

使用测试集对训练完成后的最终模型进行评价,观察模型在测试集上的准确率。代码实现如下:

score, loss = runner.evaluate([X_test, y_test])
print("[Test] score/loss: {:.4f}/{:.4f}".format(score, loss))

可视化观察类别划分结果。代码如下:


# 均匀生成40000个数据点
x1, x2 = paddle.meshgrid(paddle.linspace(-3.5, 2, 200), paddle.linspace(-4.5, 3.5, 200))
x = paddle.stack([paddle.flatten(x1), paddle.flatten(x2)], axis=1)
# 预测对应类别
y = runner.predict(x)
y = paddle.argmax(y, axis=1)
# 绘制类别区域
plt.ylabel('x2')
plt.xlabel('x1')
plt.scatter(x[:,0].tolist(), x[:,1].tolist(), c=y.tolist(), cmap=plt.cm.Spectral)

paddle.seed(102)
n_samples = 1000
X, y = make_multiclass_classification(n_samples=n_samples, n_features=2, n_classes=3, noise=0.2)

plt.scatter(X[:, 0].tolist(), X[:, 1].tolist(), marker='*', c=y.tolist())

可视化结果如下:
在这里插入图片描述