HBU深度学习实验16-循环神经网络(3)

发布于:2024-12-22 ⋅ 阅读:(21) ⋅ 点赞:(0)

基于双向LSTM模型完成文本分类任务

模型训练、预测、评价

import os
import torch
import torch.nn as nn
from torch.utils.data import Dataset
from data import load_vocab
from functools import partial
import time
import random
import numpy as np
from nndl import Accuracy, RunnerV3

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


# 加载数据集
def load_imdb_data(path):
    assert os.path.exists(path)
    trainset, devset, testset = [], [], []
    with open(os.path.join(path, "train.txt"), "r", encoding='utf-8') as fr:
        for line in fr:
            sentence_label, sentence = line.strip().lower().split("\t", maxsplit=1)
            trainset.append((sentence, sentence_label))

    with open(os.path.join(path, "dev.txt"), "r", encoding='utf-8') as fr:
        for line in fr:
            sentence_label, sentence = line.strip().lower().split("\t", maxsplit=1)
            devset.append((sentence, sentence_label))

    with open(os.path.join(path, "test.txt"), "r", encoding='utf-8') as fr:
        for line in fr:
            sentence_label, sentence = line.strip().lower().split("\t", maxsplit=1)
            testset.append((sentence, sentence_label))

    return trainset, devset, testset


# 加载IMDB数据集
train_data, dev_data, test_data = load_imdb_data("./dataset/")
# # 打印一下加载后的数据样式
print(train_data[4])


class IMDBDataset(Dataset):
    def __init__(self, examples, word2id_dict):
        super(IMDBDataset, self).__init__()
        # 词典,用于将单词转为字典索引的数字
        self.word2id_dict = word2id_dict
        # 加载后的数据集
        self.examples = self.words_to_id(examples)

    def words_to_id(self, examples):
        tmp_examples = []
        for idx, example in enumerate(examples):
            seq, label = example
            # 将单词映射为字典索引的ID, 对于词典中没有的单词用[UNK]对应的ID进行替代
            seq = [self.word2id_dict.get(word, self.word2id_dict['[UNK]']) for word in seq.split(" ")]
            label = int(label)
            tmp_examples.append([seq, label])
        return tmp_examples

    def __getitem__(self, idx):
        seq, label = self.examples[idx]
        return seq, label

    def __len__(self):
        return len(self.examples)


# 加载词表
word2id_dict = load_vocab("./dataset/vocab.txt")

# 实例化Dataset
train_set = IMDBDataset(train_data, word2id_dict)
dev_set = IMDBDataset(dev_data, word2id_dict)
test_set = IMDBDataset(test_data, word2id_dict)

print('训练集样本数:', len(train_set))
print('样本示例:', train_set[4])


def collate_fn(batch_data, pad_val=0, max_seq_len=256):
    seqs, seq_lens, labels = [], [], []
    max_len = 0
    for example in batch_data:
        seq, label = example
        # 对数据序列进行截断
        seq = seq[:max_seq_len]
        # 对数据截断并保存于seqs中
        seqs.append(seq)
        seq_lens.append(len(seq))
        labels.append(label)
        # 保存序列最大长度
        max_len = max(max_len, len(seq))
    # 对数据序列进行填充至最大长度
    for i in range(len(seqs)):
        seqs[i] = seqs[i] + [pad_val] * (max_len - len(seqs[i]))

    # return (torch.tensor(seqs), torch.tensor(seq_lens)), torch.tensor(labels)
    return (torch.tensor(seqs).to(device), torch.tensor(seq_lens)), torch.tensor(labels).to(device)


max_seq_len = 5
batch_data = [[[1, 2, 3, 4, 5, 6], 1], [[2, 4, 6], 0]]
(seqs, seq_lens), labels = collate_fn(batch_data, pad_val=word2id_dict["[PAD]"], max_seq_len=max_seq_len)
print("seqs: ", seqs)
print("seq_lens: ", seq_lens)
print("labels: ", labels)

max_seq_len = 256
batch_size = 128
collate_fn = partial(collate_fn, pad_val=word2id_dict["[PAD]"], max_seq_len=max_seq_len)
train_loader = torch.utils.data.DataLoader(train_set, batch_size=batch_size,
                                           shuffle=True, drop_last=False, collate_fn=collate_fn)
dev_loader = torch.utils.data.DataLoader(dev_set, batch_size=batch_size,
                                         shuffle=False, drop_last=False, collate_fn=collate_fn)
test_loader = torch.utils.data.DataLoader(test_set, batch_size=batch_size,
                                          shuffle=False, drop_last=False, collate_fn=collate_fn)


class AveragePooling(nn.Module):
    def __init__(self):
        super(AveragePooling, self).__init__()

    def forward(self, sequence_output, sequence_length):
        # 假设 sequence_length 是一个 PyTorch 张量
        sequence_length = sequence_length.unsqueeze(-1).to(torch.float32)
        # 根据sequence_length生成mask矩阵,用于对Padding位置的信息进行mask
        max_len = sequence_output.shape[1]

        mask = torch.arange(max_len, device='cuda') < sequence_length.to('cuda')
        mask = mask.to(torch.float32).unsqueeze(-1)
        # 对序列中paddling部分进行mask

        sequence_output = torch.multiply(sequence_output, mask.to('cuda'))
        # 对序列中的向量取均值
        batch_mean_hidden = torch.divide(torch.sum(sequence_output, dim=1), sequence_length.to('cuda'))
        return batch_mean_hidden


class Model_BiLSTM_FC(nn.Module):
    def __init__(self, num_embeddings, input_size, hidden_size, num_classes=2):
        super(Model_BiLSTM_FC, self).__init__()
        # 词典大小
        self.num_embeddings = num_embeddings
        # 单词向量的维度
        self.input_size = input_size
        # LSTM隐藏单元数量
        self.hidden_size = hidden_size
        # 情感分类类别数量
        self.num_classes = num_classes
        # 实例化嵌入层
        self.embedding_layer = nn.Embedding(num_embeddings, input_size, padding_idx=0)
        # 实例化LSTM层
        self.lstm_layer = nn.LSTM(input_size, hidden_size, batch_first=True, bidirectional=True)
        # 实例化聚合层
        self.average_layer = AveragePooling()
        # 实例化输出层
        self.output_layer = nn.Linear(hidden_size * 2, num_classes)

    def forward(self, inputs):
        # 对模型输入拆分为序列数据和mask
        input_ids, sequence_length = inputs
        # 获取词向量
        inputs_emb = self.embedding_layer(input_ids)

        packed_input = nn.utils.rnn.pack_padded_sequence(inputs_emb, sequence_length.cpu(), batch_first=True,
                                                         enforce_sorted=False)
        # 使用lstm处理数据
        packed_output, _ = self.lstm_layer(packed_input)
        # 解包输出
        sequence_output, _ = nn.utils.rnn.pad_packed_sequence(packed_output, batch_first=True)
        # 使用聚合层聚合sequence_output
        batch_mean_hidden = self.average_layer(sequence_output, sequence_length)
        # 输出文本分类logits
        logits = self.output_layer(batch_mean_hidden)
        return logits


np.random.seed(0)
random.seed(0)
torch.seed()

# 指定训练轮次
num_epochs = 3
# 指定学习率
learning_rate = 0.001
# 指定embedding的数量为词表长度
num_embeddings = len(word2id_dict)
# embedding向量的维度
input_size = 256
# LSTM网络隐状态向量的维度
hidden_size = 256

# 实例化模型
model = Model_BiLSTM_FC(num_embeddings, input_size, hidden_size).to(device)
# 指定优化器
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate, betas=(0.9, 0.999))
# 指定损失函数
loss_fn = nn.CrossEntropyLoss()
# 指定评估指标
metric = Accuracy()
# 实例化Runner
runner = RunnerV3(model, optimizer, loss_fn, metric)
# 模型训练
start_time = time.time()
runner.train(train_loader, dev_loader, num_epochs=num_epochs, eval_steps=10, log_steps=10,
             save_path="./checkpoints/best.pdparams")
end_time = time.time()
print("time: ", (end_time - start_time))

from nndl import plot_training_loss_acc

# 图像名字
fig_name = "./images/6.16.pdf"
# sample_step: 训练损失的采样step,即每隔多少个点选择1个点绘制
# loss_legend_loc: loss 图像的图例放置位置
# acc_legend_loc: acc 图像的图例放置位置
plot_training_loss_acc(runner, fig_name, fig_size=(16, 6), sample_step=10, loss_legend_loc="lower left",
                       acc_legend_loc="lower right")

model_path = "./checkpoints/best.pdparams"
runner.load_model(model_path)
accuracy, _ = runner.evaluate(test_loader)
print(f"Evaluate on test set, Accuracy: {accuracy:.5f}")

id2label = {0: "消极情绪", 1: "积极情绪"}
text = "this movie is so great. I watched it three times already"
# 处理单条文本
sentence = text.split(" ")
words = [word2id_dict[word] if word in word2id_dict else word2id_dict['[UNK]'] for word in sentence]
words = words[:max_seq_len]
sequence_length = torch.tensor([len(words)], dtype=torch.int64)
words = torch.tensor(words, dtype=torch.int64).unsqueeze(0)
# 使用模型进行预测
logits = runner.predict((words.to(device), sequence_length.to(device)))
max_label_id = torch.argmax(logits, dim=-1).cpu().numpy()[0]
pred_label = id2label[max_label_id]
print("Label: ", pred_label)

模型预测

import torch
import torch.nn as nn
import os
from torch.utils.data import Dataset
from data import load_vocab
from functools import partial

# 设备选择,优先使用GPU,如果不可用则使用CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# 加载词表,这里假设你的词表加载函数 `load_vocab` 是正确定义的,和你之前代码里的保持一致
word2id_dict = load_vocab("./dataset/vocab.txt")
id2label = {0: "消极情绪", 1: "积极情绪"}

# 定义一个函数,用于将输入的文本转换为模型可接受的输入格式(类似之前在数据处理中的操作)
def preprocess_text(text, word2id_dict, max_seq_len=256):
    sentence = text.split(" ")
    words = [word2id_dict[word] if word in word2id_dict else word2id_dict['[UNK]'] for word in sentence]
    words = words[:max_seq_len]
    sequence_length = torch.tensor([len(words)], dtype=torch.int64)
    words = torch.tensor(words, dtype=torch.int64).unsqueeze(0)
    return words.to(device), sequence_length.to(device)

class AveragePooling(nn.Module):
    def __init__(self):
        super(AveragePooling, self).__init__()

    def forward(self, sequence_output, sequence_length):
        # 假设 sequence_length 是一个 PyTorch 张量
        sequence_length = sequence_length.unsqueeze(-1).to(torch.float32)
        # 根据sequence_length生成mask矩阵,用于对Padding位置的信息进行mask
        max_len = sequence_output.shape[1]

        mask = torch.arange(max_len, device='cuda') < sequence_length.to('cuda')
        mask = mask.to(torch.float32).unsqueeze(-1)
        # 对序列中paddling部分进行mask

        sequence_output = torch.multiply(sequence_output, mask.to('cuda'))
        # 对序列中的向量取均值
        batch_mean_hidden = torch.divide(torch.sum(sequence_output, dim=1), sequence_length.to('cuda'))
        return batch_mean_hidden

# 定义模型类,和你之前训练时定义的 `Model_BiLSTM_FC` 保持一致,这里直接复制过来,确保你的模型结构正确定义
class Model_BiLSTM_FC(nn.Module):
    def __init__(self, num_embeddings, input_size, hidden_size, num_classes=2):
        super(Model_BiLSTM_FC, self).__init__()
        self.num_embeddings = num_embeddings
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.num_classes = num_classes
        self.embedding_layer = nn.Embedding(num_embeddings, input_size, padding_idx=0)
        self.lstm_layer = nn.LSTM(input_size, hidden_size, batch_first=True, bidirectional=True)
        self.average_layer = AveragePooling()
        self.output_layer = nn.Linear(hidden_size * 2, num_classes)

    def forward(self, inputs):
        input_ids, sequence_length = inputs
        inputs_emb = self.embedding_layer(input_ids)

        packed_input = nn.utils.rnn.pack_padded_sequence(inputs_emb, sequence_length.cpu(), batch_first=True,
                                                         enforce_sorted=False)
        packed_output, _ = self.lstm_layer(packed_input)
        sequence_output, _ = nn.utils.rnn.pad_packed_sequence(packed_output, batch_first=True)
        batch_mean_hidden = self.average_layer(sequence_output, sequence_length)
        logits = self.output_layer(batch_mean_hidden)
        return logits

# 实例化模型对象
num_embeddings = len(word2id_dict)
input_size = 256
hidden_size = 256
model = Model_BiLSTM_FC(num_embeddings, input_size, hidden_size).to(device)

# 加载训练好的模型参数,指定你的模型参数文件路径
model_path = "./checkpoints/best.pdparams"
if os.path.exists(model_path):
    model.load_state_dict(torch.load(model_path, map_location=device))
    model.eval()  # 设置为评估模式
else:
    print(f"模型参数文件 {model_path} 不存在,请检查路径!")

# 待预测的新文本示例,你可以替换为实际想要预测的文本内容
text = "这个电影总体来说还不错,但是中间有一段的台词太尬了"
# 对文本进行预处理,转换为模型输入格式
input_data, sequence_length = preprocess_text(text, word2id_dict)

# 使用模型进行预测
with torch.no_grad():  # 不需要计算梯度,加快预测速度并节省内存
    logits = model((input_data, sequence_length))
    max_label_id = torch.argmax(logits, dim=-1).cpu().numpy()[0]
    pred_label = id2label[max_label_id]
    print("预测结果:", pred_label)

其他代码文件 

data.py

import os


def load_vocab(path):
    assert os.path.exists(path)
    words = []
    with open(path, "r", encoding="utf-8") as f:
        words = f.readlines()
        words = [word.strip() for word in words if word.strip()]
    word2id = dict(zip(words, range(len(words))))
    return word2id

nndl.py

import torch
import matplotlib.pyplot as plt


def draw_process(title, color, iters, data, label):
    plt.title(title, fontsize=24)
    plt.xlabel("iter", fontsize=20)
    plt.ylabel(label, fontsize=20)
    plt.plot(iters, data, color=color, label=label)
    plt.legend()
    plt.grid()
    print(plt.show())


def plot_training_loss_acc(runner, fig_name, fig_size=(16, 6), sample_step=10, loss_legend_loc="lower left",
                           acc_legend_loc="lower left"):
    plt.figure(figsize=fig_size)

    plt.subplot(1, 2, 1)
    train_items = runner.train_step_losses[::sample_step]
    train_steps = [x[0] for x in train_items]
    train_losses = [x[1] for x in train_items]

    plt.plot(train_steps, train_losses, color='#8E004D', label="Train loss")

    while runner.dev_losses[-1][0] == -1:
        runner.dev_losses.pop()
        runner.dev_scores.pop()
    dev_steps = [x[0] for x in runner.dev_losses]
    dev_losses = [x[1] for x in runner.dev_losses]
    plt.plot(dev_steps, dev_losses, color='#E20079', linestyle='--', label="Dev loss")
    # 绘制坐标轴和图例
    plt.ylabel("loss", fontsize='x-large')
    plt.xlabel("step", fontsize='x-large')
    plt.legend(loc=loss_legend_loc, fontsize='x-large')

    plt.subplot(1, 2, 2)
    # 绘制评价准确率变化曲线
    plt.plot(dev_steps, runner.dev_scores, color='#E20079', linestyle="--", label="Dev accuracy")

    # 绘制坐标轴和图例
    plt.ylabel("score", fontsize='x-large')
    plt.xlabel("step", fontsize='x-large')
    plt.legend(loc=acc_legend_loc, fontsize='x-large')

    plt.savefig(fig_name)
    plt.show()


class RunnerV3(object):
    def __init__(self, model, optimizer, loss_fn, metric, **kwargs):
        self.model = model
        self.optimizer = optimizer
        self.loss_fn = loss_fn
        self.metric = metric  # 只用于计算评价指标

        # 记录训练过程中的评价指标变化情况
        self.dev_scores = []

        # 记录训练过程中的损失函数变化情况
        self.train_epoch_losses = []  # 一个epoch记录一次loss
        self.train_step_losses = []  # 一个step记录一次loss
        self.dev_losses = []

        # 记录全局最优指标
        self.best_score = 0

    def train(self, train_loader, dev_loader=None, **kwargs):
        # 将模型切换为训练模式
        self.model.train()

        # 传入训练轮数,如果没有传入值则默认为0
        num_epochs = kwargs.get("num_epochs", 0)
        # 传入log打印频率,如果没有传入值则默认为100
        log_steps = kwargs.get("log_steps", 100)
        # 评价频率
        eval_steps = kwargs.get("eval_steps", 0)

        # 传入模型保存路径,如果没有传入值则默认为"best_model.pdparams"
        save_path = kwargs.get("save_path", "best_model.pdparams")

        custom_print_log = kwargs.get("custom_print_log", None)

        # 训练总的步数
        num_training_steps = num_epochs * len(train_loader)

        if eval_steps:
            if self.metric is None:
                raise RuntimeError('Error: Metric can not be None!')
            if dev_loader is None:
                raise RuntimeError('Error: dev_loader can not be None!')

        # 运行的step数目
        global_step = 0
        total_acces = []
        total_losses = []
        Iters = []

        # 进行num_epochs轮训练
        for epoch in range(num_epochs):
            # 用于统计训练集的损失
            total_loss = 0

            for step, data in enumerate(train_loader):
                X, y = data
                # 获取模型预测
                # 计算logits
                logits = self.model(X)

                # 将y转换为和logits相同的形状
                acc_y = y.view(-1, 1)

                # 计算准确率
                probs = torch.softmax(logits, dim=1)
                pred = torch.argmax(probs, dim=1)
                correct = (pred == acc_y).sum().item()
                total = acc_y.size(0)
                acc = correct / total
                total_acces.append(acc)
                # print(acc.numpy()[0])

                loss = self.loss_fn(logits, y)  # 默认求mean
                total_loss += loss
                total_losses.append(loss.item())
                Iters.append(global_step)

                # 训练过程中,每个step的loss进行保存
                self.train_step_losses.append((global_step, loss.item()))

                if log_steps and global_step % log_steps == 0:
                    print(
                        f"[Train] epoch: {epoch}/{num_epochs}, step: {global_step}/{num_training_steps}, loss: {loss.item():.5f}")
                # 梯度反向传播,计算每个参数的梯度值
                loss.backward()

                if custom_print_log:
                    custom_print_log(self)

                # 小批量梯度下降进行参数更新
                self.optimizer.step()
                # 梯度归零
                self.optimizer.zero_grad()

                # 判断是否需要评价
                if eval_steps > 0 and global_step != 0 and \
                        (global_step % eval_steps == 0 or global_step == (num_training_steps - 1)):

                    dev_score, dev_loss = self.evaluate(dev_loader, global_step=global_step)
                    print(f"[Evaluate]  dev score: {dev_score:.5f}, dev loss: {dev_loss:.5f}")

                    # 将模型切换为训练模式
                    self.model.train()

                    # 如果当前指标为最优指标,保存该模型
                    if dev_score > self.best_score:
                        self.save_model(save_path)
                        print(
                            f"[Evaluate] best accuracy performence has been updated: {self.best_score:.5f} --> {dev_score:.5f}")
                        self.best_score = dev_score

                global_step += 1

            # 当前epoch 训练loss累计值
            trn_loss = (total_loss / len(train_loader)).item()
            # epoch粒度的训练loss保存
            self.train_epoch_losses.append(trn_loss)

        draw_process("trainning acc", "green", Iters, total_acces, "trainning acc")
        print("total_acc:")
        print(total_acces)
        print("total_loss:")

        print(total_losses)

        print("[Train] Training done!")

    # 模型评估阶段,使用'paddle.no_grad()'控制不计算和存储梯度
    @torch.no_grad()
    def evaluate(self, dev_loader, **kwargs):
        assert self.metric is not None

        # 将模型设置为评估模式
        self.model.eval()

        global_step = kwargs.get("global_step", -1)

        # 用于统计训练集的损失
        total_loss = 0

        # 重置评价
        self.metric.reset()

        # 遍历验证集每个批次
        for batch_id, data in enumerate(dev_loader):
            X, y = data

            # 计算模型输出
            logits = self.model(X)

            # 计算损失函数
            loss = self.loss_fn(logits, y).item()
            # 累积损失
            total_loss += loss

            # 累积评价
            self.metric.update(logits, y)

        dev_loss = (total_loss / len(dev_loader))
        self.dev_losses.append((global_step, dev_loss))

        dev_score = self.metric.accumulate()
        self.dev_scores.append(dev_score)

        return dev_score, dev_loss

    # 模型评估阶段,使用'paddle.no_grad()'控制不计算和存储梯度
    @torch.no_grad()
    def predict(self, x, **kwargs):
        # 将模型设置为评估模式
        self.model.eval()
        # 运行模型前向计算,得到预测值
        logits = self.model(x)
        return logits

    def save_model(self, save_path):
        torch.save(self.model.state_dict(), save_path)

    def load_model(self, model_path):
        model_state_dict = torch.load(model_path)
        self.model.load_state_dict(model_state_dict)


class Accuracy():
    def __init__(self, is_logist=True):
        # 用于统计正确的样本个数
        self.num_correct = 0
        # 用于统计样本的总数
        self.num_count = 0

        self.is_logist = is_logist

    def update(self, outputs, labels):

        # 判断是二分类任务还是多分类任务,shape[1]=1时为二分类任务,shape[1]>1时为多分类任务
        if outputs.shape[1] == 1:  # 二分类
            outputs = torch.squeeze(outputs, dim=-1)
            if self.is_logist:
                # logist判断是否大于0
                preds = torch.tensor((outputs >= 0), dtype=torch.float32)
            else:
                # 如果不是logist,判断每个概率值是否大于0.5,当大于0.5时,类别为1,否则类别为0
                preds = torch.tensor((outputs >= 0.5), dtype=torch.float32)
        else:
            # 多分类时,使用'torch.argmax'计算最大元素索引作为类别
            preds = torch.argmax(outputs, dim=1)

        # 获取本批数据中预测正确的样本个数
        labels = torch.squeeze(labels, dim=-1)
        batch_correct = torch.sum(torch.tensor(preds == labels, dtype=torch.float32)).cpu().numpy()
        batch_count = len(labels)

        # 更新num_correct 和 num_count
        self.num_correct += batch_correct
        self.num_count += batch_count

    def accumulate(self):
        # 使用累计的数据,计算总的指标
        if self.num_count == 0:
            return 0
        return self.num_correct / self.num_count

    def reset(self):
        # 重置正确的数目和总数
        self.num_correct = 0
        self.num_count = 0

    def name(self):
        return "Accuracy"

基于双向LSTM和注意力机制的文本分类

注意力机制:

使用加性注意力模型进行实验 

import os
import time
import torch
from matplotlib import pyplot as plt
from torch import nn
from torch.optim import Adam
from torch.utils.data import Dataset
from functools import partial
from nndl import Accuracy, plot,draw_process

plt.rcParams['font.sans-serif'] = ['SimHei']
torch.seed()
def load_imdb_data(path):
    assert os.path.exists(path)
    trainset, devset, testset = [], [], []

    # 加载训练集
    with open(os.path.join(path, "train.txt"), "r", encoding="utf-8") as fr:
        for line in fr:
            sentence_label, sentence = line.strip().lower().split("\t", maxsplit=1)
            trainset.append((sentence, sentence_label))

    # 加载验证集
    with open(os.path.join(path, "dev.txt"), "r", encoding="utf-8") as fr:
        for line in fr:
            sentence_label, sentence = line.strip().lower().split("\t", maxsplit=1)
            devset.append((sentence, sentence_label))

    # 加载测试集
    with open(os.path.join(path, "test.txt"), "r", encoding="utf-8") as fr:
        for line in fr:
            sentence_label, sentence = line.strip().lower().split("\t", maxsplit=1)
            testset.append((sentence, sentence_label))

    return trainset, devset, testset

# 加载IMDB数据集
train_data, dev_data, test_data = load_imdb_data("./dataset/")
print("Train data:")
text,label=train_data[0]
print(f"Text: {text}; Label {label}")
class IMDBDataset(Dataset):
    def __init__(self, examples, word2id_dict):

        super(IMDBDataset, self).__init__()
        self.word2id_dict = word2id_dict
        self.examples = self.words_to_id(examples)

    def words_to_id(self, examples):
        tmp_examples = []
        for seq, label in examples:
            seq = [self.word2id_dict.get(word, self.word2id_dict['[UNK]']) for word in seq.split(" ")]
            label = int(label)
            tmp_examples.append((seq, label))
        return tmp_examples

    def __getitem__(self, idx):
        seq, label = self.examples[idx]
        return torch.tensor(seq, dtype=torch.long), torch.tensor(label, dtype=torch.long)

    def __len__(self):
        return len(self.examples)
def load_vocab(vocab_file):

    word2id = {}
    with open(vocab_file, 'r', encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            if not line or "\t" not in line:  # 跳过空行或无TAB分隔的行
                continue
            try:
                word, idx = line.split("\t")  # 按TAB分割单词和ID
                word2id[word] = int(idx)  # 映射单词到整数ID
            except ValueError:
                print(f"Skipping invalid line: {line}")  # 打印格式错误的行
    return word2id

# 加载词表
word2id_dict = load_vocab("./dataset/vocab.txt")

batch_size = 128
max_seq_len = 256
train_set = IMDBDataset(train_data, word2id_dict)
dev_set = IMDBDataset(dev_data, word2id_dict)
test_set = IMDBDataset(test_data, word2id_dict)

print('训练集样本数:', len(train_set))
print('处理后的第一条样本:', train_set[0])
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
def collate_fn(batch_data, pad_val=0, max_seq_len=256):
    seqs, seq_lens, labels = [], [], []
    max_len = 0
    for example in batch_data:
        seq, label = example
        # 对数据序列进行截断
        seq = seq[:max_seq_len]
        seqs.append(seq)
        seq_lens.append(len(seq))
        labels.append(label)
        max_len = max(max_len, len(seq))

    for i in range(len(seqs)):
        seqs[i] = list(seqs[i]) + [pad_val] * (max_len - len(seqs[i]))

    # 确保所有数据都被转移到同一设备(GPU或CPU)
    return (torch.tensor(seqs).to(device), torch.tensor(seq_lens).to(device)), torch.tensor(labels).to(device)


collate_fn = partial(collate_fn, pad_val=word2id_dict["[PAD]"], max_seq_len=max_seq_len)
train_loader = torch.utils.data.DataLoader(train_set, batch_size=batch_size,shuffle=False, drop_last=False, collate_fn=collate_fn)
dev_loader = torch.utils.data.DataLoader(dev_set, batch_size=batch_size,shuffle=False, drop_last=False, collate_fn=collate_fn)
test_loader = torch.utils.data.DataLoader(test_set, batch_size=batch_size,shuffle=False, drop_last=False, collate_fn=collate_fn)
# 在训练时,将输入和标签都移动到相同的设备
for ix, batch in enumerate(train_loader):
    (inputs, seq_lens), labels = batch
    inputs, seq_lens, labels = inputs.to(device), seq_lens.to(device), labels.to(device)
    break  # 只打印第一个batch
class AdditiveScore(nn.Module):
    def __init__(self, hidden_size):
        super(AdditiveScore, self).__init__()
        self.fc_W = nn.Linear(hidden_size, hidden_size, bias=False)
        self.fc_U = nn.Linear(hidden_size, hidden_size, bias=False)
        self.fc_v = nn.Linear(hidden_size, 1, bias=False)
        # 查询向量使用均匀分布随机初始化
        self.q = nn.Parameter(torch.empty(1, hidden_size).uniform_(-0.5, 0.5))

    def forward(self, inputs):
        batch_size, seq_len, hidden_size = inputs.shape
        # scores: [batch_size, seq_len, hidden_size]
        scores = torch.tanh(self.fc_W(inputs) + self.fc_U(self.q))
        # scores: [batch_size, seq_len]
        scores = self.fc_v(scores).squeeze(-1)
        return scores

# 设置随机种子
torch.manual_seed(2021)

# 示例输入
inputs = torch.rand(1, 3, 3)

# 创建AdditiveScore模型实例并运行
additiveScore = AdditiveScore(hidden_size=3)
scores = additiveScore(inputs)

# 打印结果
print(scores)
class DotProductScore(nn.Module):
    def __init__(self, hidden_size):
        super(DotProductScore, self).__init__()
        # 使用均匀分布随机初始化一个查询向量
        self.q = nn.Parameter(torch.empty(hidden_size, 1).uniform_(-0.5, 0.5))

    def forward(self, inputs):
        # inputs: [batch_size, seq_length, hidden_size]
        # scores: [batch_size, seq_length, 1]
        scores = torch.matmul(inputs, self.q)
        # scores: [batch_size, seq_length]
        scores = scores.squeeze(-1)
        return scores

# 设置随机种子
torch.manual_seed(2021)

# 示例输入
inputs = torch.rand(1, 3, 3)

# 创建 DotProductScore 模型实例并运行
dotScore = DotProductScore(hidden_size=3)
scores = dotScore(inputs)

# 打印结果
print(scores)
import torch
import torch.nn as nn
import torch.nn.functional as F


class Attention(nn.Module):
    def __init__(self, hidden_size, use_additive=False):
        super(Attention, self).__init__()
        self.use_additive = use_additive
        # 使用加性模型或者点积模型
        if self.use_additive:
            self.scores = AdditiveScore(hidden_size)  # 确保 AdditiveScore 已经定义
        else:
            self.scores = DotProductScore(hidden_size)  # 确保 DotProductScore 已经定义
        self._attention_weights = None

    def forward(self, X, valid_lens):
        """
        输入:
            - X:输入矩阵,shape=[batch_size, seq_len, hidden_size]
            - valid_lens:长度矩阵,shape=[batch_size]
        输出:
            - context :输出矩阵,表示的是注意力的加权平均的结果
        """
        # scores: [batch_size, seq_len]
        scores = self.scores(X)

        # arrange: [1, seq_len], 比如 seq_len=4, arrange 会变为 [0, 1, 2, 3]
        arrange = torch.arange(scores.shape[1], dtype=torch.float32, device=X.device).unsqueeze(0)

        # valid_lens: [batch_size, 1]
        valid_lens = valid_lens.unsqueeze(1)

        # mask: [batch_size, seq_len]
        mask = arrange < valid_lens

        # scores: [batch_size, seq_len],对无效的长度位置使用一个极小的值(-1e9)
        y = torch.full_like(scores, -1e9)
        scores = torch.where(mask, scores, y)

        # attn_weights: [batch_size, seq_len]
        attn_weights = F.softmax(scores, dim=-1)
        self._attention_weights = attn_weights

        # context: [batch_size, 1, hidden_size]
        context = torch.matmul(attn_weights.unsqueeze(1), X)

        # context: [batch_size, hidden_size]
        context = context.squeeze(1)

        return context

    @property
    def attention_weights(self):
        return self._attention_weights

# 设置随机种子
torch.manual_seed(2021)
# 输入向量
X = torch.rand(1, 3, 3)
# 长度矩阵 valid_lens
valid_lens = torch.tensor([2])
# 打印输入向量
print("输入向量为 {}".format(X.detach().numpy()))
# 创建 Attention 实例并运行
add_atten = Attention(hidden_size=3, use_additive=True)  # 可设置 use_additive=False 使用点积模型
context = add_atten(X, valid_lens)
# 打印注意力输出
print("注意力的输出为 : {}".format(context.detach().numpy()))
# 打印注意力权重
print("注意力权重为 : {}".format(add_atten.attention_weights.detach().numpy()))

class Model_LSTMAttention(nn.Module):
    def __init__(
        self,
        hidden_size,
        embedding_size,
        vocab_size,
        n_classes=10,
        n_layers=1,
        use_additive=False,
    ):
        super(Model_LSTMAttention, self).__init__()
        # LSTM单元的隐藏神经元数量,它也将用来表示hidden和cell向量状态的维度
        self.hidden_size = hidden_size
        # 词向量的维度
        self.embedding_size = embedding_size
        # 词典的单词数量
        self.vocab_size = vocab_size
        # 文本分类的类别数量
        self.n_classes = n_classes
        # LSTM的层数
        self.n_layers = n_layers

        # 定义embedding层
        self.embedding = nn.Embedding(self.vocab_size, self.embedding_size)

        # 定义LSTM层,双向LSTM
        self.lstm = nn.LSTM(
            input_size=self.embedding_size,
            hidden_size=self.hidden_size,
            num_layers=self.n_layers,
            bidirectional=True,
            batch_first=True,
        )

        # LSTM的输出维度,因双向LSTM,所以乘以2
        output_size = self.hidden_size * 2

        # 定义Attention层
        self.attention = Attention(output_size, use_additive=use_additive)

        # 定义分类层,用于将语义向量映射到相应的类别
        self.cls_fc = nn.Linear(output_size, self.n_classes)

    def forward(self, inputs):
        input_ids, valid_lens = inputs
        # 获取batch_size
        batch_size = input_ids.shape[0]

        # 获取词向量并且进行dropout(这里没有明确指定dropout,默认embedding层是没有dropout的)
        embedded_input = self.embedding(input_ids)

        # 使用LSTM进行语义编码,输出为LSTM的最后一层的隐藏状态
        last_layers_hiddens, (last_step_hiddens, last_step_cells) = self.lstm(embedded_input)

        # 使用注意力机制
        last_layers_hiddens = self.attention(last_layers_hiddens, valid_lens)

        # 通过分类层获得类别数值
        logits = self.cls_fc(last_layers_hiddens)

        return logits
class RunnerV3(object):
    def __init__(self, model, optimizer, loss_fn, metric, **kwargs):
        self.model = model
        self.optimizer = optimizer
        self.loss_fn = loss_fn
        self.metric = metric  # 只用于计算评价指标

        # 记录训练过程中的评价指标变化情况
        self.dev_scores = []

        # 记录训练过程中的损失函数变化情况
        self.train_epoch_losses = []  # 一个epoch记录一次loss
        self.train_step_losses = []  # 一个step记录一次loss
        self.dev_losses = []

        # 记录全局最优指标
        self.best_score = 0

    def train(self, train_loader, dev_loader=None, **kwargs):
        # 将模型切换为训练模式
        self.model.train()

        # 传入训练轮数,如果没有传入值则默认为0
        num_epochs = kwargs.get("num_epochs", 0)
        # 传入log打印频率,如果没有传入值则默认为100
        log_steps = kwargs.get("log_steps", 100)
        # 评价频率
        eval_steps = kwargs.get("eval_steps", 0)

        # 传入模型保存路径,如果没有传入值则默认为"best_model.pdparams"
        save_path = kwargs.get("save_path", "best_model.pdparams")

        custom_print_log = kwargs.get("custom_print_log", None)

        # 训练总的步数
        num_training_steps = num_epochs * len(train_loader)

        if eval_steps:
            if self.metric is None:
                raise RuntimeError('Error: Metric can not be None!')
            if dev_loader is None:
                raise RuntimeError('Error: dev_loader can not be None!')

        # 运行的step数目
        global_step = 0
        total_acces = []
        total_losses = []
        Iters = []

        # 进行num_epochs轮训练
        for epoch in range(num_epochs):
            # 用于统计训练集的损失
            total_loss = 0

            for step, data in enumerate(train_loader):
                X, y = data
                # 获取模型预测
                # 计算logits
                logits = self.model(X)

                # 将y转换为和logits相同的形状
                acc_y = y.view(-1, 1)

                # 计算准确率
                probs = torch.softmax(logits, dim=1)
                pred = torch.argmax(probs, dim=1)
                correct = (pred == acc_y).sum().item()
                total = acc_y.size(0)
                acc = correct / total
                total_acces.append(acc)
                # print(acc.numpy()[0])

                loss = self.loss_fn(logits, y)  # 默认求mean
                total_loss += loss
                total_losses.append(loss.item())
                Iters.append(global_step)

                # 训练过程中,每个step的loss进行保存
                self.train_step_losses.append((global_step, loss.item()))

                if log_steps and global_step % log_steps == 0:
                    print(
                        f"[Train] epoch: {epoch}/{num_epochs}, step: {global_step}/{num_training_steps}, loss: {loss.item():.5f}")
                # 梯度反向传播,计算每个参数的梯度值
                loss.backward()

                if custom_print_log:
                    custom_print_log(self)

                # 小批量梯度下降进行参数更新
                self.optimizer.step()
                # 梯度归零
                self.optimizer.zero_grad()

                # 判断是否需要评价
                if eval_steps > 0 and global_step != 0 and \
                        (global_step % eval_steps == 0 or global_step == (num_training_steps - 1)):

                    dev_score, dev_loss = self.evaluate(dev_loader, global_step=global_step)
                    print(f"[Evaluate]  dev score: {dev_score:.5f}, dev loss: {dev_loss:.5f}")

                    # 将模型切换为训练模式
                    self.model.train()

                    # 如果当前指标为最优指标,保存该模型
                    if dev_score > self.best_score:
                        self.save_model(save_path)
                        print(
                            f"[Evaluate] best accuracy performence has been updated: {self.best_score:.5f} --> {dev_score:.5f}")
                        self.best_score = dev_score

                global_step += 1

            # 当前epoch 训练loss累计值
            trn_loss = (total_loss / len(train_loader)).item()
            # epoch粒度的训练loss保存
            self.train_epoch_losses.append(trn_loss)

        draw_process("训练准确率变化曲线", "red", Iters, total_acces, "trainning acc")
        print("total_acc:")
        print(total_acces)
        print("total_loss:")

        print(total_losses)

        print("[Train] Training done!")

    # 模型评估阶段,使用'paddle.no_grad()'控制不计算和存储梯度
    @torch.no_grad()
    def evaluate(self, dev_loader, **kwargs):
        assert self.metric is not None

        # 将模型设置为评估模式
        self.model.eval()

        global_step = kwargs.get("global_step", -1)

        # 用于统计训练集的损失
        total_loss = 0

        # 重置评价
        self.metric.reset()

        # 遍历验证集每个批次
        for batch_id, data in enumerate(dev_loader):
            X, y = data

            # 计算模型输出
            logits = self.model(X)

            # 计算损失函数
            loss = self.loss_fn(logits, y).item()
            # 累积损失
            total_loss += loss

            # 累积评价
            self.metric.update(logits, y)

        dev_loss = (total_loss / len(dev_loader))
        self.dev_losses.append((global_step, dev_loss))

        dev_score = self.metric.accumulate()
        self.dev_scores.append(dev_score)

        return dev_score, dev_loss

    # 模型评估阶段,使用'paddle.no_grad()'控制不计算和存储梯度
    @torch.no_grad()
    def predict(self, x, **kwargs):
        # 将模型设置为评估模式
        self.model.eval()
        # 运行模型前向计算,得到预测值
        logits = self.model(x)
        return logits

    def save_model(self, save_path):
        torch.save(self.model.state_dict(), save_path)

    def load_model(self, model_path):
        model_state_dict = torch.load(model_path)
        self.model.load_state_dict(model_state_dict)

# 迭代的epoch数
epochs = 2
# 词汇表的大小
vocab_size = len(word2id_dict)
# lstm的输出单元的大小
hidden_size = 128
# embedding的维度
embedding_size = 128
# 类别数
n_classes = 2
# lstm的层数
n_layers = 1
# 学习率
learning_rate = 0.001
# 定义交叉熵损失
criterion = nn.CrossEntropyLoss()
# 指定评价指标
metric = Accuracy()
# 实例化基于LSTM的注意力模型
model_atten = Model_LSTMAttention(hidden_size,embedding_size,vocab_size,n_classes=n_classes,n_layers=n_layers,use_additive=True,).to(device)
# 定义优化器
optimizer = Adam(params=model_atten.parameters(), lr=learning_rate)
# 实例化RunnerV3
runner = RunnerV3(model_atten, optimizer, criterion, metric)
start_time = time.time()
# 训练
runner.train(train_loader,dev_loader,num_epochs=epochs,log_steps=10,eval_steps=10,save_path="./checkpoint/model_best.pdparams")

end_time = time.time()
print("训练时间:{}".format(end_time-start_time))

plot(runner, 'att-loss-acc.pdf')

model_path = "checkpoint/model_best.pdparams"
runner.load_model(model_path)
accuracy, _ =  runner.evaluate(test_loader)
print(f"Evaluate on test set, Accuracy: {accuracy:.5f}")

 nndl.py

import os

import torch
import matplotlib.pyplot as plt
from torch import nn
from torch.utils.data import Dataset

class RunnerV3(object):
    def __init__(self, model, optimizer, loss_fn, metric, **kwargs):
        self.model = model
        self.optimizer = optimizer
        self.loss_fn = loss_fn
        self.metric = metric  # 只用于计算评价指标

        # 记录训练过程中的评价指标变化情况
        self.dev_scores = []

        # 记录训练过程中的损失函数变化情况
        self.train_epoch_losses = []  # 一个epoch记录一次loss
        self.train_step_losses = []  # 一个step记录一次loss
        self.dev_losses = []

        # 记录全局最优指标
        self.best_score = 0

    def train(self, train_loader, dev_loader=None, **kwargs):
        # 将模型切换为训练模式
        self.model.train()

        # 传入训练轮数,如果没有传入值则默认为0
        num_epochs = kwargs.get("num_epochs", 0)
        # 传入log打印频率,如果没有传入值则默认为100
        log_steps = kwargs.get("log_steps", 100)
        # 评价频率
        eval_steps = kwargs.get("eval_steps", 0)

        # 传入模型保存路径,如果没有传入值则默认为"best_model.pdparams"
        save_path = kwargs.get("save_path", "best_model.pdparams")

        custom_print_log = kwargs.get("custom_print_log", None)

        # 训练总的步数
        num_training_steps = num_epochs * len(train_loader)

        if eval_steps:
            if self.metric is None:
                raise RuntimeError('Error: Metric can not be None!')
            if dev_loader is None:
                raise RuntimeError('Error: dev_loader can not be None!')

        # 运行的step数目
        global_step = 0

        # 进行num_epochs轮训练
        for epoch in range(num_epochs):
            # 用于统计训练集的损失
            total_loss = 0
            for step, data in enumerate(train_loader):
                X, y = data
                # 获取模型预测
                logits = self.model(X)
                loss = self.loss_fn(logits, y.long())  # 默认求mean
                total_loss += loss

                # 训练过程中,每个step的loss进行保存
                self.train_step_losses.append((global_step, loss.item()))

                if log_steps and global_step % log_steps == 0:
                    print(
                        f"[Train] epoch: {epoch}/{num_epochs}, step: {global_step}/{num_training_steps}, loss: {loss.item():.5f}")

                # 梯度反向传播,计算每个参数的梯度值
                loss.backward()

                if custom_print_log:
                    custom_print_log(self)

                # 小批量梯度下降进行参数更新
                self.optimizer.step()
                # 梯度归零
                self.optimizer.zero_grad()

                # 判断是否需要评价
                if eval_steps > 0 and global_step > 0 and \
                        (global_step % eval_steps == 0 or global_step == (num_training_steps - 1)):

                    dev_score, dev_loss = self.evaluate(dev_loader, global_step=global_step)
                    print(f"[Evaluate]  dev score: {dev_score:.5f}, dev loss: {dev_loss:.5f}")

                    # 将模型切换为训练模式
                    self.model.train()

                    # 如果当前指标为最优指标,保存该模型
                    if dev_score > self.best_score:
                        self.save_model(save_path)
                        print(
                            f"[Evaluate] best accuracy performence has been updated: {self.best_score:.5f} --> {dev_score:.5f}")
                        self.best_score = dev_score

                global_step += 1

            # 当前epoch 训练loss累计值
            trn_loss = (total_loss / len(train_loader)).item()
            # epoch粒度的训练loss保存
            self.train_epoch_losses.append(trn_loss)

        print("[Train] Training done!")

    # 模型评估阶段,使用'torch.no_grad()'控制不计算和存储梯度
    @torch.no_grad()
    def evaluate(self, dev_loader, **kwargs):
        assert self.metric is not None

        # 将模型设置为评估模式
        self.model.eval()

        global_step = kwargs.get("global_step", -1)

        # 用于统计训练集的损失
        total_loss = 0

        # 重置评价
        self.metric.reset()

        # 遍历验证集每个批次
        for batch_id, data in enumerate(dev_loader):
            X, y = data

            # 计算模型输出
            logits = self.model(X)

            # 计算损失函数
            loss = self.loss_fn(logits, y.long()).item()
            # 累积损失
            total_loss += loss

            # 累积评价
            self.metric.update(logits, y)

        dev_loss = (total_loss / len(dev_loader))
        dev_score = self.metric.accumulate()

        # 记录验证集loss
        if global_step != -1:
            self.dev_losses.append((global_step, dev_loss))
            self.dev_scores.append(dev_score)

        return dev_score, dev_loss

    # 模型评估阶段,使用'torch.no_grad()'控制不计算和存储梯度
    @torch.no_grad()
    def predict(self, x, **kwargs):
        # 将模型设置为评估模式
        self.model.eval()
        # 运行模型前向计算,得到预测值
        logits = self.model(x)
        return logits

    def save_model(self, save_path):
        torch.save(self.model.state_dict(), save_path)

    def load_model(self, model_path):
        state_dict = torch.load(model_path)
        self.model.load_state_dict(state_dict)


class Accuracy():
    def __init__(self, is_logist=True):
        # 用于统计正确的样本个数
        self.num_correct = 0
        # 用于统计样本的总数
        self.num_count = 0

        self.is_logist = is_logist

    def update(self, outputs, labels):

        # 判断是二分类任务还是多分类任务,shape[1]=1时为二分类任务,shape[1]>1时为多分类任务
        if outputs.shape[1] == 1:  # 二分类
            outputs = torch.squeeze(outputs, dim=-1)
            if self.is_logist:
                # logist判断是否大于0
                preds = torch.tensor((outputs >= 0), dtype=torch.float32)
            else:
                # 如果不是logist,判断每个概率值是否大于0.5,当大于0.5时,类别为1,否则类别为0
                preds = torch.tensor((outputs >= 0.5), dtype=torch.float32)
        else:
            # 多分类时,使用'torch.argmax'计算最大元素索引作为类别
            preds = torch.argmax(outputs, dim=1)

        # 获取本批数据中预测正确的样本个数
        labels = torch.squeeze(labels, dim=-1)
        batch_correct = torch.sum(torch.tensor(preds == labels, dtype=torch.float32)).cpu().numpy()
        batch_count = len(labels)

        # 更新num_correct 和 num_count
        self.num_correct += batch_correct
        self.num_count += batch_count

    def accumulate(self):
        # 使用累计的数据,计算总的指标
        if self.num_count == 0:
            return 0
        return self.num_correct / self.num_count

    def reset(self):
        # 重置正确的数目和总数
        self.num_correct = 0
        self.num_count = 0

    def name(self):
        return "Accuracy"
def plot(runner, fig_name):
    plt.figure(figsize=(10, 5))

    plt.subplot(1, 2, 1)
    train_items = runner.train_step_losses[::30]
    train_steps = [x[0] for x in train_items]
    train_losses = [x[1] for x in train_items]

    plt.plot(train_steps, train_losses, color='#8E004D', label="Train loss")
    if runner.dev_losses[0][0] != -1:
        dev_steps = [x[0] for x in runner.dev_losses]
        dev_losses = [x[1] for x in runner.dev_losses]
        plt.plot(dev_steps, dev_losses, color='#E20079', linestyle='--', label="Dev loss")
    # 绘制坐标轴和图例
    plt.ylabel("loss", fontsize='x-large')
    plt.xlabel("step", fontsize='x-large')
    plt.legend(loc='upper right', fontsize='x-large')

    plt.subplot(1, 2, 2)
    # 绘制评价准确率变化曲线
    if runner.dev_losses[0][0] != -1:
        plt.plot(dev_steps, runner.dev_scores,
                 color='#E20079', linestyle="--", label="Dev accuracy")
    else:
        plt.plot(list(range(len(runner.dev_scores))), runner.dev_scores,
                 color='#E20079', linestyle="--", label="Dev accuracy")
    # 绘制坐标轴和图例
    plt.ylabel("score", fontsize='x-large')
    plt.xlabel("step", fontsize='x-large')
    plt.legend(loc='lower right', fontsize='x-large')
    plt.show()

def plot_training_loss(runner, fig_name, sample_step):
    plt.figure()
    train_items = runner.train_step_losses[::sample_step]
    train_steps = [x[0] for x in train_items]
    train_losses = [x[1] for x in train_items]
    plt.plot(train_steps, train_losses, color='#e4007f', label="Train loss")

    dev_steps = [x[0] for x in runner.dev_losses]
    dev_losses = [x[1] for x in runner.dev_losses]
    plt.plot(dev_steps, dev_losses, color='#f19ec2', linestyle='--', label="Dev loss")

    # 绘制坐标轴和图例
    plt.ylabel("loss", fontsize='large')
    plt.xlabel("step", fontsize='large')
    plt.legend(loc='upper right', fontsize='x-large')

    plt.savefig(fig_name)
    plt.show()
def plot_grad(W_list, U_list, b_list, save_path, keep_steps=40):
    # 开始绘制图片
    plt.figure()
    # 默认保留前40步的结果
    steps = list(range(keep_steps))
    plt.plot(steps, W_list[:keep_steps], "r-", color="#e4007f", label="W_grad_l2")
    plt.plot(steps, U_list[:keep_steps], "-.", color="#f19ec2", label="U_grad_l2")
    plt.plot(steps, b_list[:keep_steps], "--", color="#000000", label="b_grad_l2")

    plt.xlabel("step")
    plt.ylabel("L2 Norm")
    plt.legend(loc="upper right")
    plt.show()
    plt.savefig(save_path)
    print("image has been saved to: ", save_path)
# 嵌入层
class Embedding(nn.Module):
    def __init__(self, num_embeddings, embedding_dim):
        super(Embedding, self).__init__()
        # 定义嵌入矩阵
        self.W = nn.init.xavier_uniform_(torch.empty(num_embeddings, embedding_dim), gain=1.0)
    def forward(self, inputs):
        # 根据索引获取对应词向量
        embs = self.W[inputs]
        return embs
class DigitSumDataset(Dataset):
    def __init__(self, data):
        self.data = data

    def __getitem__(self, idx):
        example = self.data[idx]
        seq = torch.tensor(example[0], dtype=torch.int64)
        label = torch.tensor(example[1], dtype=torch.int64)
        return seq, label

    def __len__(self):
        return len(self.data)
def load_data(data_path):
    # 加载训练集
    train_examples = []
    train_path = os.path.join(data_path, "train.txt")
    with open(train_path, "r", encoding="utf-8") as f:
        for line in f.readlines():
            # 解析一行数据,将其处理为数字序列seq和标签label
            items = line.strip().split("\t")
            seq = [int(i) for i in items[0].split(" ")]
            label = int(items[1])
            train_examples.append((seq, label))

    # 加载验证集
    dev_examples = []
    dev_path = os.path.join(data_path, "dev.txt")
    with open(dev_path, "r", encoding="utf-8") as f:
        for line in f.readlines():
            # 解析一行数据,将其处理为数字序列seq和标签label
            items = line.strip().split("\t")
            seq = [int(i) for i in items[0].split(" ")]
            label = int(items[1])
            dev_examples.append((seq, label))

    # 加载测试集
    test_examples = []
    test_path = os.path.join(data_path, "test.txt")
    with open(test_path, "r", encoding="utf-8") as f:
        for line in f.readlines():
            # 解析一行数据,将其处理为数字序列seq和标签label
            items = line.strip().split("\t")
            seq = [int(i) for i in items[0].split(" ")]
            label = int(items[1])
            test_examples.append((seq, label))

    return train_examples, dev_examples, test_examples
class Model_RNN4SeqClass(nn.Module):
    def __init__(self, model, num_digits, input_size, hidden_size, num_classes):
        super(Model_RNN4SeqClass, self).__init__()
        # 传入实例化的RNN层,例如SRN
        self.rnn_model = model
        # 词典大小
        self.num_digits = num_digits
        # 嵌入向量的维度
        self.input_size = input_size
        # 定义Embedding层
        self.embedding = Embedding(num_digits, input_size)
        # 定义线性层
        self.linear = nn.Linear(hidden_size, num_classes)

    def forward(self, inputs):
        # 将数字序列映射为相应向量
        inputs_emb = self.embedding(inputs)
        # 调用RNN模型
        hidden_state = self.rnn_model(inputs_emb)
        # 使用最后一个时刻的状态进行数字预测
        logits = self.linear(hidden_state)
        return logits


import torch
import matplotlib.pyplot as plt


class RunnerV3(object):
    def __init__(self, model, optimizer, loss_fn, metric, **kwargs):
        self.model = model
        self.optimizer = optimizer
        self.loss_fn = loss_fn
        self.metric = metric  # 只用于计算评价指标

        # 记录训练过程中的评价指标变化情况
        self.dev_scores = []

        # 记录训练过程中的损失函数变化情况
        self.train_epoch_losses = []  # 一个epoch记录一次loss
        self.train_step_losses = []  # 一个step记录一次loss
        self.dev_losses = []

        # 记录全局最优指标
        self.best_score = 0

    def train(self, train_loader, dev_loader=None, **kwargs):
        # 将模型切换为训练模式
        self.model.train()

        # 传入训练轮数,如果没有传入值则默认为0
        num_epochs = kwargs.get("num_epochs", 0)
        # 传入log打印频率,如果没有传入值则默认为100
        log_steps = kwargs.get("log_steps", 100)
        # 评价频率
        eval_steps = kwargs.get("eval_steps", 0)

        # 传入模型保存路径,如果没有传入值则默认为"best_model.pdparams"
        save_path = kwargs.get("save_path", "best_model.pdparams")

        custom_print_log = kwargs.get("custom_print_log", None)

        # 训练总的步数
        num_training_steps = num_epochs * len(train_loader)

        if eval_steps:
            if self.metric is None:
                raise RuntimeError('Error: Metric can not be None!')
            if dev_loader is None:
                raise RuntimeError('Error: dev_loader can not be None!')

        # 运行的step数目
        global_step = 0
        total_acces = []
        total_losses = []
        Iters = []

        # 进行num_epochs轮训练
        for epoch in range(num_epochs):
            # 用于统计训练集的损失
            total_loss = 0

            for step, data in enumerate(train_loader):
                X, y = data
                # 获取模型预测
                # 计算logits
                logits = self.model(X)

                # 将y转换为和logits相同的形状
                acc_y = y.view(-1, 1)

                # 计算准确率
                probs = torch.softmax(logits, dim=1)
                pred = torch.argmax(probs, dim=1)
                correct = (pred == acc_y).sum().item()
                total = acc_y.size(0)
                acc = correct / total
                total_acces.append(acc)
                # print(acc.numpy()[0])

                loss = self.loss_fn(logits, y)  # 默认求mean
                total_loss += loss
                total_losses.append(loss.item())
                Iters.append(global_step)

                # 训练过程中,每个step的loss进行保存
                self.train_step_losses.append((global_step, loss.item()))

                if log_steps and global_step % log_steps == 0:
                    print(
                        f"[Train] epoch: {epoch}/{num_epochs}, step: {global_step}/{num_training_steps}, loss: {loss.item():.5f}")
                # 梯度反向传播,计算每个参数的梯度值
                loss.backward()

                if custom_print_log:
                    custom_print_log(self)

                # 小批量梯度下降进行参数更新
                self.optimizer.step()
                # 梯度归零
                self.optimizer.zero_grad()

                # 判断是否需要评价
                if eval_steps > 0 and global_step != 0 and \
                        (global_step % eval_steps == 0 or global_step == (num_training_steps - 1)):

                    dev_score, dev_loss = self.evaluate(dev_loader, global_step=global_step)
                    print(f"[Evaluate]  dev score: {dev_score:.5f}, dev loss: {dev_loss:.5f}")

                    # 将模型切换为训练模式
                    self.model.train()

                    # 如果当前指标为最优指标,保存该模型
                    if dev_score > self.best_score:
                        self.save_model(save_path)
                        print(
                            f"[Evaluate] best accuracy performence has been updated: {self.best_score:.5f} --> {dev_score:.5f}")
                        self.best_score = dev_score

                global_step += 1

            # 当前epoch 训练loss累计值
            trn_loss = (total_loss / len(train_loader)).item()
            # epoch粒度的训练loss保存
            self.train_epoch_losses.append(trn_loss)

        draw_process("trainning acc", "green", Iters, total_acces, "trainning acc")
        print("total_acc:")
        print(total_acces)
        print("total_loss:")

        print(total_losses)

        print("[Train] Training done!")

    # 模型评估阶段,使用'paddle.no_grad()'控制不计算和存储梯度
    @torch.no_grad()
    def evaluate(self, dev_loader, **kwargs):
        assert self.metric is not None

        # 将模型设置为评估模式
        self.model.eval()

        global_step = kwargs.get("global_step", -1)

        # 用于统计训练集的损失
        total_loss = 0

        # 重置评价
        self.metric.reset()

        # 遍历验证集每个批次
        for batch_id, data in enumerate(dev_loader):
            X, y = data

            # 计算模型输出
            logits = self.model(X)

            # 计算损失函数
            loss = self.loss_fn(logits, y).item()
            # 累积损失
            total_loss += loss

            # 累积评价
            self.metric.update(logits, y)

        dev_loss = (total_loss / len(dev_loader))
        self.dev_losses.append((global_step, dev_loss))

        dev_score = self.metric.accumulate()
        self.dev_scores.append(dev_score)

        return dev_score, dev_loss

    # 模型评估阶段,使用'paddle.no_grad()'控制不计算和存储梯度
    @torch.no_grad()
    def predict(self, x, **kwargs):
        # 将模型设置为评估模式
        self.model.eval()
        # 运行模型前向计算,得到预测值
        logits = self.model(x)
        return logits

    def save_model(self, save_path):
        torch.save(self.model.state_dict(), save_path)

    def load_model(self, model_path):
        model_state_dict = torch.load(model_path)
        self.model.load_state_dict(model_state_dict)


class Accuracy():
    def __init__(self, is_logist=True):
        # 用于统计正确的样本个数
        self.num_correct = 0
        # 用于统计样本的总数
        self.num_count = 0

        self.is_logist = is_logist

    def update(self, outputs, labels):

        # 判断是二分类任务还是多分类任务,shape[1]=1时为二分类任务,shape[1]>1时为多分类任务
        if outputs.shape[1] == 1:  # 二分类
            outputs = torch.squeeze(outputs, dim=-1)
            if self.is_logist:
                # logist判断是否大于0
                preds = torch.tensor((outputs >= 0), dtype=torch.float32)
            else:
                # 如果不是logist,判断每个概率值是否大于0.5,当大于0.5时,类别为1,否则类别为0
                preds = torch.tensor((outputs >= 0.5), dtype=torch.float32)
        else:
            # 多分类时,使用'torch.argmax'计算最大元素索引作为类别
            preds = torch.argmax(outputs, dim=1)

        # 获取本批数据中预测正确的样本个数
        labels = torch.squeeze(labels, dim=-1)
        batch_correct = torch.sum(torch.tensor(preds == labels, dtype=torch.float32)).cpu().numpy()
        batch_count = len(labels)

        # 更新num_correct 和 num_count
        self.num_correct += batch_correct
        self.num_count += batch_count

    def accumulate(self):
        # 使用累计的数据,计算总的指标
        if self.num_count == 0:
            return 0
        return self.num_correct / self.num_count

    def reset(self):
        # 重置正确的数目和总数
        self.num_correct = 0
        self.num_count = 0

    def name(self):
        return "Accuracy"


def draw_process(title, color, iters, data, label):
    plt.title(title, fontsize=24)
    plt.xlabel("iter", fontsize=20)
    plt.ylabel(label, fontsize=20)
    plt.plot(iters, data, color=color, label=label)
    plt.legend()
    plt.grid()
    print(plt.show())


def plot_training_loss_acc(runner, fig_name, fig_size=(16, 6), sample_step=10, loss_legend_loc="lower left",
                           acc_legend_loc="lower left"):
    plt.figure(figsize=fig_size)

    plt.subplot(1, 2, 1)
    train_items = runner.train_step_losses[::sample_step]
    train_steps = [x[0] for x in train_items]
    train_losses = [x[1] for x in train_items]

    plt.plot(train_steps, train_losses, color='#8E004D', label="Train loss")

    while runner.dev_losses[-1][0] == -1:
        runner.dev_losses.pop()
        runner.dev_scores.pop()
    dev_steps = [x[0] for x in runner.dev_losses]
    dev_losses = [x[1] for x in runner.dev_losses]
    plt.plot(dev_steps, dev_losses, color='#E20079', linestyle='--', label="Dev loss")
    # 绘制坐标轴和图例
    plt.ylabel("loss", fontsize='x-large')
    plt.xlabel("step", fontsize='x-large')
    plt.legend(loc=loss_legend_loc, fontsize='x-large')

    plt.subplot(1, 2, 2)
    # 绘制评价准确率变化曲线
    plt.plot(dev_steps, runner.dev_scores, color='#E20079', linestyle="--", label="Dev accuracy")

    # 绘制坐标轴和图例
    plt.ylabel("score", fontsize='x-large')
    plt.xlabel("step", fontsize='x-large')
    plt.legend(loc=acc_legend_loc, fontsize='x-large')

    plt.savefig(fig_name)
    plt.show()

使用点积注意力模型进行实验 

 

注意力可视化 

这部分一直在报错,设备显示冲突

参考链接:

通俗易懂理解注意力机制(Attention Mechanism)-CSDN博客

【Transformer系列(2)】注意力机制、自注意力机制、多头注意力机制、通道注意力机制、空间注意力机制超详细讲解-CSDN博客


网站公告

今日签到

点亮在社区的每一天
去签到