N3-5学习打卡

发布于:2025-03-30 ⋅ 阅读:(32) ⋅ 点赞:(0)

Pytorch文本分类入门

本周主要学习了文本分类,学习使用一个简单的模型来进行文本分类,同时学习了NLP中的文本嵌入

# 自定义数据集类
import torch
from torch import nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset

class MyDataset(Dataset):
    def __init__(self, texts, labels):
        self.texts = texts
        self.labels = labels

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        texts = self.texts[idx]
        labels = self.labels[idx]

        return texts, labels
## 定义填充函数
def collate_batch(batch):
    texts, labels = zip(*batch)
    max_len = max(len(text) for text in texts)
    padded_texts = [F.pad(text, (0, max_len - len(text)), value = 0) for text in texts]
    padded_texts = torch.stack(padded_texts)
    labels = torch.tensor(labels, dtype=torch.float).unsqueeze(1)
    return padded_texts, labels

# 准备数据和数据加载器
# 假设有以下3个样本,分别由不同数量的单词索引组成
text_data = [
    torch.tensor([1, 1, 1, 1], dtype=torch.long),  # 样本1
    torch.tensor([2, 2, 2], dtype=torch.long),     # 样本2
    torch.tensor([3, 3], dtype=torch.long)         # 样本3
]

# 对应的标签
labels = torch.tensor([4, 5, 6], dtype=torch.float)

# 创建数据集和数据加载器
my_dataset = MyDataset(text_data, labels)
data_loader = DataLoader(my_dataset, batch_size=2, shuffle=True, collate_fn=collate_batch)

for batch in data_loader:
    print(batch)
# 定义模型
class EmbeddingModel(nn.Module):
    def __init__(self, vocab_size, embed_dim):
        super(EmbeddingModel, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim)
        self.fc = nn.Linear(embed_dim, 1) # 假设我们做一个二分类任务

    def forward(self, text):
        print("embedding输入文本是", text)
        print("embedding输入文本是shape:", text.shape)
        embedding = self.embedding(text)
        embedding_mean = embedding.mean(dim=1) # 对每个样本的嵌入向量进行平均
        print("embedding输出文本shape:", embedding_mean.shape)
        return self.fc(embedding_mean)

# 训练模型
# 示例词典大小和嵌入维度
vocab_size = 10
embed_dim = 6

# 创建模型实例
model = EmbeddingModel(vocab_size, embed_dim)

# 定义一个简单的损失函数和优化器
criterion = nn.BCEWithLogitsLoss()
optimizer = optim.SGD(model.parameters(), lr=0.01)

# 训练模型
for epoch in range(1): # 训练1个epoch
    for batch in data_loader:
        texts, labels = batch

        # 前向传播
        outputs = model(texts)
        loss = criterion(outputs, labels)

        # 反向传播和优化
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    print(f'Epoch {epoch+1}, Loss: {loss.item()}')

# 加载数据
import torch

# 强制使用 CPU
device = torch.device("cpu")
print(f"Forcing use of device: {device}")

# 确保模型和数据都使用 CPU
# model = model.to(device)
# data = data.to(device)
import torch
import torch.nn as nn
import torchvision
from torchvision import transforms, datasets
import os, PIL, pathlib, warnings

from torchtext.datasets import AG_NEWS

train_iter = AG_NEWS(split='train')

torchtext.datasets.AG_NEWS()是一个用于加载 AG News 数据集的 TorchText 数据集类。
AG News 数据集是一个用于文本分类任务的常见数据集,其中包含四个类别的新闻文章:世界、科技、体育和商业。
torchtext.datasets.AG_NEWS() 类加载的数据集是一个列表,其中每个条目都是一个元组,
包含以下两个元素:一条新闻文章的文本内容。新闻文章所属的类别(一个整数,从1到4,分别对应世界、科技、体育和商业)。


# 构建词典
from torchtext.data.utils import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator

tokenizer = get_tokenizer('basic_english') # 返回分词器函数

def yield_tokens(data_iter):
    for _, text in data_iter:
        yield tokenizer(text)

vocab = build_vocab_from_iterator(yield_tokens(train_iter),
                                  specials=['<unk>'])

vocab.set_default_index(vocab['<unk>'])  # 设置默认索引
vocab(['here', 'is', 'an', 'example'])
text_pipeline = lambda x: vocab(tokenizer(x))
label_pipeline = lambda x: int(x) - 1

text_pipeline('here is the an example')
label_pipeline('10')

# 生成数据批次和迭代器
from torch.utils.data import DataLoader

def collate_batch(batch):
    label_list, text_list, offsets = [], [], [0]

    for (_label, _text) in batch:
        # 标签列表
        label_list.append(label_pipeline(_label))

        # 文本列表
        processed_text = torch.tensor(text_pipeline(_text), dtype=torch.int64)
        text_list.append(processed_text)

        # 偏移量, 即语句的总词汇量
        offsets.append(processed_text.size(0))

    label_list = torch.tensor(label_list, dtype=torch.int64)
    text_list = torch.cat(text_list)
    offsets = torch.tensor(offsets[:-1]).cumsum(dim=0) # 返回维度dim中输入元素的累积和

    return label_list.to(device), text_list.to(device), offsets.to(device)

# 数据加载器
data_loader = DataLoader(train_iter,
                         batch_size=8,
                         shuffle=False,
                         collate_fn=collate_batch)
# 定义模型
from torch import nn

class TextClassificationModel(nn.Module):

    def __init__(self, vocab_size, embed_dim, num_class):
        super(TextClassificationModel, self).__init__()

        self.embedding = nn.EmbeddingBag(vocab_size,    # 词典大小
                                         embed_dim,     # 嵌入的维度
                                         sparse=False)   #

        self.fc = nn.Linear(embed_dim, num_class)
        self.init_weights()

    def init_weights(self):
        initrange = 0.5
        self.embedding.weight.data.uniform_(-initrange, initrange)
        self.fc.weight.data.uniform_(-initrange, initrange)
        self.fc.bias.data.zero_()

    def forward(self, text, offsets):
        embedded = self.embedding(text, offsets)
        return self.fc(embedded)
# 定义实例
num_class = len(set([label for (label, text) in train_iter]))
vocab_size = len(vocab)
em_size = 64
model = TextClassificationModel(vocab_size, em_size, num_class).to(device)

# 定义训练函数与评估函数
import time

def train(dataloader):
    model.train()  # 切换为训练模式
    total_acc, train_loss, total_count = 0, 0, 0
    log_interval = 500
    start_time = time.time()

    for idx, (label, text, offsets) in enumerate(dataloader):

        predicted_label = model(text, offsets)

        optimizer.zero_grad()                      # grad属性归零
        loss = criterion(predicted_label, label)   # 计算网络输出和真实值之间的差距,label为真实值
        loss.backward()                            # 反向传播
        optimizer.step()                          # 每一步自动更新

        # 记录acc与loss
        total_acc += (predicted_label.argmax(1) == label).sum().item()
        train_loss += loss.item()
        total_count += label.size(0)

        if idx % log_interval == 0 and idx > 0:
            elapsed = time.time() - start_time
            print('| epoch {:1d} | {:4d}/{:4d} batches'
                  '| train_acc {:4.3f} train_loss {:4.5f}'.format(epoch, idx, len(dataloader),
                                              total_acc/total_count, train_loss/total_count))
            total_acc, train_loss, total_count = 0, 0, 0
            start_time = time.time()

def evaluate(dataloader):
    model.eval()   # 切换为测试模式
    total_acc, train_loss, total_count = 0, 0, 0

    with torch.no_grad():
         for idx, (label, text, offsets) in enumerate(dataloader):
             predicted_label = model(text, offsets)

             loss = criterion(predicted_label, label) # 计算loss值
             # 记录测试数据
             total_acc += (predicted_label.argmax(1) == label).sum().item()
             train_loss += loss.item()
             total_count += label.size(0)

    return total_acc/total_count, train_loss/total_count

# 训练模型
from torch.utils.data.dataset import random_split
from torchtext.data.functional import to_map_style_dataset

# 超参数
EPOCHS = 10 # epoch
LR = 5      # 学习率
BATCH_SIZE = 64  # batch size for training

criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr = LR)
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, 1.0, gamma=0.1)
total_accu = None

train_iter, test_iter = AG_NEWS() # 加载数据
train_dataset = to_map_style_dataset(train_iter)
test_dataset = to_map_style_dataset(test_iter)
num_train = int(len(train_dataset) * 0.95)

split_train_, split_valid_ = random_split(train_dataset,
                                          [num_train, len(train_dataset) - num_train])

train_dataloader = DataLoader(split_train_, batch_size=BATCH_SIZE,
                                 shuffle = True, collate_fn=collate_batch)
valid_dataloader = DataLoader(split_valid_, batch_size=BATCH_SIZE,
                                 shuffle = True, collate_fn=collate_batch)
test_dataloader = DataLoader(test_dataset, batch_size=BATCH_SIZE,
                             shuffle = True, collate_fn=collate_batch)

for epoch in range(1, EPOCHS + 1):
    epoch_start_time = time.time()
    train(train_datasetloader)
    val_acc, val_loss = evaluate(valid_dataloader)

    if total_accu is not None and total_accu > val_acc:
        scheduler.step()
    else:
        total_accu = val_acc
    print('-' * 69)
    print('| epoch {:1d} | time:{:4.2f}s |'
          'valid_acc {:4.3f} | valid_loss {:4.3f}'.format(epoch, time.time() - epoch_start_time, val_acc, val_loss))
    print('-' * 69)


个人总结

  • 学习了torchtext.data.utils.get_tokenizer()等分词相关函数
  • 了解到词嵌入层的作用是将离散的单词表示(通常为整数索引)映射为固定大小的连续向量。这些向量捕捉了单词之间的语义关系,并作为网络的输入。
  • 初始化词嵌入层的权重,可以使得模型在训练开始时具有一定的随机性,有助于避免梯度消失或梯度爆炸等问题
  • torchtext.data.functional.to_map_style_dataset 函数的作用是将一个迭代式的数据集(转换为映射式的数据集。使得我们可以通过索引更方便地访问数据集中的元素。
  • torchtext.data.functional 中的 to_map_style_dataset 函数可以将一个 Iterable-style 数据集转换为一个易于操作的 Map-style 数据集