(深度学习记录)第TR6周:Transformer实战-单词预测

发布于:2024-06-26 ⋅ 阅读:(49) ⋅ 点赞:(0)

🏡我的环境:

  • 语言环境:Python3.11.4
  • 编译器:Jupyter Notebook
  • torcch版本:2.0.1

1.定义模型(数据准备)

from torchtext.datasets import WikiText2
from torchtext.data.utils import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator
from torch.utils.data import dataset
from torch import nn, Tensor
import math, os, torch
from torch.nn import TransformerEncoder, TransformerEncoderLayer
from tempfile import TemporaryDirectory

# 全局设备对象
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# 加载训练集,创建词汇表
train_iter = WikiText2(split='train', root='.')
tokenizer = get_tokenizer('basic_english')
vocab = build_vocab_from_iterator(map(tokenizer, train_iter), specials=['<unk>'])

def data_process(raw_text_iter: dataset.IterableDataset) -> Tensor:
    """将原始文本转换成扁平的张量"""
    data = [torch.tensor(vocab(tokenizer(item)), dtype=torch.long) for item in raw_text_iter]
    return torch.cat(tuple(filter(lambda t: t.numel() > 0, data)))

def batchify(data: Tensor, bsz: int) -> Tensor:
    """将数据划分为bsz个单独的序列,去除不能完全容纳的额外元素
    参数:
        data: Tensor, 形状为``[N]``
        bsz: int, 批大小
    返回:
        形状为 [N // bsz, bsz] 的张量
    """
    seq_len = data.size(0) // bsz
    data = data[:seq_len*bsz]
    data = data.view(bsz, seq_len).t().contiguous()
    return data.to(device)

# 创建数据集
train_iter, val_iter, test_iter = WikiText2(root='.')
train_data = data_process(train_iter)
val_data = data_process(val_iter)
test_data = data_process(test_iter)


batch_size = 20
eval_batch_size = 10

# 将三类数据集都处理成固定长度
train_data = batchify(train_data, batch_size)
val_data = batchify(val_data, batch_size)
test_data = batchify(test_data, batch_size)

# 编写数据集取值函数(就像CV里的data_loader一样)
bptt = 35

def get_batch(source: Tensor, i: int) -> tuple[Tensor, Tensor]:
    """获取批次数据
    参数:
        source: Tensor, 形状为 ``[full_seq_len, batch_size]``
        i: int, 当前批次索引
    返回:
        tuple(data, target),
        - data形状为[seq_len, batch_size]
        - target形状为[seq_len * batch_size]
    """
    # 计算当前批次的序列长度,最大为bptt,确保不超过source的长度
    seq_len = min(bptt, len(source) - 1 - i)
    # 获取data,从i开始,长度为seq_len
    data = source[i:i+seq_len]
    # 获取target,从i+1开始,长度为seq_len,并将其形状转换为一维张量
    target = source[i+1:i+1+seq_len].reshape(-1)

    return data, target

 2.模型搭建

# 位置编码
class PositionalEncoding(nn.Module):
    def __init__(self, d_model: int, dropout: float = 0.1, max_len: int = 5000):
        super().__init__()

        self.dropout = nn.Dropout(p=dropout)

        # 生成位置编码的位置张量
        position = torch.arange(max_len).unsqueeze(1)
        # 计算位置编码的除数项
        div_term = torch.exp(torch.arange(0, d_model, 2) * (-math.log(10000.0) / d_model))
        # 创建位置编码张量
        pe = torch.zeros(max_len, 1, d_model)
        # 使用正弦函数计算位置编码中的奇数维度部分
        pe[:, 0, 0::2] = torch.sin(position * div_term)
        # 使用余弦函数计算位置编码中的偶数维度部分
        pe[:, 0, 1::2] = torch.cos(position * div_term)
        self.register_buffer('pe', pe)

    def forward(self, x: Tensor) -> Tensor:
        """Arguments:
            x: Tensor, 形状为 [seq_len, batch_size, embedding_dim]
        """
        # 将位置编码添加到输入张量
        x = x + self.pe[:x.size(0)]
        # 应用dropout
        return self.dropout(x)
        
# Transformer模型
class TransformerModel(nn.Module):
    def __init__(self, ntoken: int, d_model: int, nhead: int, d_hid: int, nlayers: int, dropout: float = 0.5):
        super().__init__()

		# 位置编码
        self.pos_encoder = PositionalEncoding(d_model, dropout)

        # 定义编码器层
        encoder_layers = TransformerEncoderLayer(d_model, nhead, d_hid, dropout)

        # 定义编码器
        self.transformer_encoder = TransformerEncoder(encoder_layers, nlayers)
        self.embedding = nn.Embedding(ntoken, d_model)
        self.d_model = d_model
        self.linear = nn.Linear(d_model, ntoken)

        self.init_weights()

    def init_weights(self) -> None:
        initrange = 0.1
        self.embedding.weight.data.uniform_(-initrange, initrange)
        self.linear.bias.data.zero_()
        self.linear.weight.data.uniform_(-initrange, initrange)

    def forward(self, src: Tensor, src_mask: Tensor = None) -> Tensor:
        """Arguments
            src: Tensor, 形状为 [seq_len, batch_size]
            src_mask: Tensor, 形状为 [seq_len, seq_len]

        Returns:
            输出的Tensor, 形状为 [seq_len, batch_size, ntoken]
        """
        src = self.embedding(src) * math.sqrt(self.d_model)
        src = self.pos_encoder(src)
        output = self.transformer_encoder(src, src_mask)
        output = self.linear(output)
        return output

3.创建模型

ntokens = len(vocab) # 词汇表的大小
emsize = 200 # 嵌入维度
d_hid = 200 # TransformerEncoder中前馈网络模型的维度
nlayers = 2 # TransformerEncoder中EncoderLayer层数
nhead = 2 # Transformer中的头数
dropout = 0.2 # 丢弃概率

model = TransformerModel(ntokens, emsize, nhead, d_hid, nlayers, dropout).to(device)

4. 训练模型

import time

# 定义交叉熵损失函数
criterion = nn.CrossEntropyLoss() 
# 学习率
lr = 5.0 
# 使用随机梯度下降(SGD)优化器,将模型参数传入优化器
optimizer = torch.optim.SGD(model.parameters(), lr=lr)
# 使用学习率调度器,每隔1个epoch,将学习率按0.95的比例进行衰减
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, 1.0, gamma=0.95)
def train(model: nn.Module) -> None:
	"""单轮训练过程"""
    model.train()
    total_loss = 0
    log_interval = 200 # 每隔200个batch打印一次日志
    start_time = time.time()

    # 计算总的batch数量
    num_batches = len(train_data)
    for batch, i in enumerate(range(0, train_data.size(0) - 1, bptt)):
        data, targets = get_batch(train_data, i)
        output = model(data)
        output_flat = output.view(-1, ntokens)
        loss = criterion(output_flat, targets)

        optimizer.zero_grad()
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 0.5)# 梯度裁剪,防止梯度爆炸
        optimizer.step()

        total_loss += loss.item()
        if batch % log_interval == 0 and batch > 0:
            lr = scheduler.get_last_lr()[0]

            ms_per_batch = (time.time() - start_time) * 1000 / log_interval
            cur_loss = total_loss / log_interval
            ppl = math.exp(cur_loss) # 计算困惑度
            print(f"|epoch {epoch:3d} | {batch:5d}/{num_batches:5d} batches | "
                  f"lr {lr:02.2f} | ms/batch {ms_per_batch:5.2f} | "
                  f"loss {cur_loss:5.2f} | ppl {ppl:8.2f}")

            total_loss = 0
            start_time = time.time()

def evaluate(model: nn.Module, eval_data: Tensor) -> None:
	"""单轮评估过程"""
    model.eval()
    total_loss = 0
    with torch.no_grad():
        for i in range(0, eval_data.size(0) - 1, bptt):
            data, targets = get_batch(eval_data, i )
            seq_len = data.size(0)
            output = model(data)
            output_flat = output.view(-1, ntokens)
            total_loss += seq_len * criterion(output_flat, targets).item()
    return total_loss / (len(eval_data) -1)
best_val_loss = float('inf')
epochs = 1

with TemporaryDirectory() as tempdir: # 创建临时目录来保存最佳模型参数
    # 最佳模型参数的实际保存路径
    best_model_params_path = os.path.join(tempdir, 'best_model_params.pth')
    for epoch in range(1, epochs + 1):
        epoch_start_time = time.time()
        train(model)
        val_loss = evaluate(model, val_data)
        val_ppl = math.exp(val_loss)
        elapsed = time.time() - epoch_start_time # 计算当前epoch的耗时

        print('-'*89)
        print(f'| end of epoch {epoch:3d} | time: {elapsed:5.2f}s | '
              f'valid loss {val_loss:5.2f} | valid ppl {val_ppl:8.2f}')
        print('-'*89)

        if val_loss < best_val_loss:
            best_val_loss = val_loss
            torch.save(model.state_dict(), best_model_params_path)

        scheduler.step()
    # 退出前加载性能最好的模型
    model.load_state_dict(torch.load(best_model_params_path))


 5.训练过程

|epoch   1 |   200/102499 batches | lr 5.00 | ms/batch  6.75 | loss  8.14 | ppl  3426.82
|epoch   1 |   400/102499 batches | lr 5.00 | ms/batch  5.60 | loss  6.25 | ppl   517.82
|epoch   1 |   600/102499 batches | lr 5.00 | ms/batch  6.05 | loss  5.61 | ppl   272.82
|epoch   1 |   800/102499 batches | lr 5.00 | ms/batch  5.80 | loss  5.27 | ppl   194.54
|epoch   1 |  1000/102499 batches | lr 5.00 | ms/batch  6.78 | loss  4.90 | ppl   133.77
|epoch   1 |  1200/102499 batches | lr 5.00 | ms/batch  7.06 | loss  4.51 | ppl    91.22
|epoch   1 |  1400/102499 batches | lr 5.00 | ms/batch  6.35 | loss  4.20 | ppl    66.74
|epoch   1 |  1600/102499 batches | lr 5.00 | ms/batch  6.72 | loss  4.00 | ppl    54.55
|epoch   1 |  1800/102499 batches | lr 5.00 | ms/batch  5.59 | loss  3.76 | ppl    42.92
|epoch   1 |  2000/102499 batches | lr 5.00 | ms/batch  6.63 | loss  3.63 | ppl    37.74
|epoch   1 |  2200/102499 batches | lr 5.00 | ms/batch  6.46 | loss  3.47 | ppl    32.27
|epoch   1 |  2400/102499 batches | lr 5.00 | ms/batch  6.50 | loss  3.45 | ppl    31.41
|epoch   1 |  2600/102499 batches | lr 5.00 | ms/batch  6.94 | loss  3.41 | ppl    30.35
|epoch   1 |  2800/102499 batches | lr 5.00 | ms/batch  6.64 | loss  3.27 | ppl    26.42
-----------------------------------------------------------------------------------------
| end of epoch   1 | time: 19.20s | valid loss  1.95 | valid ppl     7.01
-----------------------------------------------------------------------------------------

6.模型效果

test_loss = evaluate(model, test_data)
test_ppl = math.exp(test_loss)
print('='*89)
print(f'| End of training | test loss {test_loss:5.2f} | '
      f'test ppl {test_ppl:8.2f}')
print('='*89)

7.测试结果

=========================================================================================
| End of training | test loss  1.93 | test ppl     6.88
=========================================================================================

8.总结 

在数据的导入步中多次报错,推荐下载地址“https://aistudio.baidu.com/datasetdetail/230431
”先下载数据在进行测试。


网站公告

今日签到

点亮在社区的每一天
去签到