- 🍨 本文为🔗365天深度学习训练营 中的学习记录博客
- 🍖 原作者:K同学啊
Pytorch文本分类入门
本周主要学习了文本分类,学习使用一个简单的模型来进行文本分类,同时学习了NLP中的文本嵌入
# 自定义数据集类
import torch
from torch import nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
class MyDataset(Dataset):
def __init__(self, texts, labels):
self.texts = texts
self.labels = labels
def __len__(self):
return len(self.labels)
def __getitem__(self, idx):
texts = self.texts[idx]
labels = self.labels[idx]
return texts, labels
## 定义填充函数
def collate_batch(batch):
texts, labels = zip(*batch)
max_len = max(len(text) for text in texts)
padded_texts = [F.pad(text, (0, max_len - len(text)), value = 0) for text in texts]
padded_texts = torch.stack(padded_texts)
labels = torch.tensor(labels, dtype=torch.float).unsqueeze(1)
return padded_texts, labels
# 准备数据和数据加载器
# 假设有以下3个样本,分别由不同数量的单词索引组成
text_data = [
torch.tensor([1, 1, 1, 1], dtype=torch.long), # 样本1
torch.tensor([2, 2, 2], dtype=torch.long), # 样本2
torch.tensor([3, 3], dtype=torch.long) # 样本3
]
# 对应的标签
labels = torch.tensor([4, 5, 6], dtype=torch.float)
# 创建数据集和数据加载器
my_dataset = MyDataset(text_data, labels)
data_loader = DataLoader(my_dataset, batch_size=2, shuffle=True, collate_fn=collate_batch)
for batch in data_loader:
print(batch)
# 定义模型
class EmbeddingModel(nn.Module):
def __init__(self, vocab_size, embed_dim):
super(EmbeddingModel, self).__init__()
self.embedding = nn.Embedding(vocab_size, embed_dim)
self.fc = nn.Linear(embed_dim, 1) # 假设我们做一个二分类任务
def forward(self, text):
print("embedding输入文本是", text)
print("embedding输入文本是shape:", text.shape)
embedding = self.embedding(text)
embedding_mean = embedding.mean(dim=1) # 对每个样本的嵌入向量进行平均
print("embedding输出文本shape:", embedding_mean.shape)
return self.fc(embedding_mean)
# 训练模型
# 示例词典大小和嵌入维度
vocab_size = 10
embed_dim = 6
# 创建模型实例
model = EmbeddingModel(vocab_size, embed_dim)
# 定义一个简单的损失函数和优化器
criterion = nn.BCEWithLogitsLoss()
optimizer = optim.SGD(model.parameters(), lr=0.01)
# 训练模型
for epoch in range(1): # 训练1个epoch
for batch in data_loader:
texts, labels = batch
# 前向传播
outputs = model(texts)
loss = criterion(outputs, labels)
# 反向传播和优化
optimizer.zero_grad()
loss.backward()
optimizer.step()
print(f'Epoch {epoch+1}, Loss: {loss.item()}')
# 加载数据
import torch
# 强制使用 CPU
device = torch.device("cpu")
print(f"Forcing use of device: {device}")
# 确保模型和数据都使用 CPU
# model = model.to(device)
# data = data.to(device)
import torch
import torch.nn as nn
import torchvision
from torchvision import transforms, datasets
import os, PIL, pathlib, warnings
from torchtext.datasets import AG_NEWS
train_iter = AG_NEWS(split='train')
torchtext.datasets.AG_NEWS()是一个用于加载 AG News 数据集的 TorchText 数据集类。
AG News 数据集是一个用于文本分类任务的常见数据集,其中包含四个类别的新闻文章:世界、科技、体育和商业。
torchtext.datasets.AG_NEWS() 类加载的数据集是一个列表,其中每个条目都是一个元组,
包含以下两个元素:一条新闻文章的文本内容。新闻文章所属的类别(一个整数,从1到4,分别对应世界、科技、体育和商业)。
# 构建词典
from torchtext.data.utils import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator
tokenizer = get_tokenizer('basic_english') # 返回分词器函数
def yield_tokens(data_iter):
for _, text in data_iter:
yield tokenizer(text)
vocab = build_vocab_from_iterator(yield_tokens(train_iter),
specials=['<unk>'])
vocab.set_default_index(vocab['<unk>']) # 设置默认索引
vocab(['here', 'is', 'an', 'example'])
text_pipeline = lambda x: vocab(tokenizer(x))
label_pipeline = lambda x: int(x) - 1
text_pipeline('here is the an example')
label_pipeline('10')
# 生成数据批次和迭代器
from torch.utils.data import DataLoader
def collate_batch(batch):
label_list, text_list, offsets = [], [], [0]
for (_label, _text) in batch:
# 标签列表
label_list.append(label_pipeline(_label))
# 文本列表
processed_text = torch.tensor(text_pipeline(_text), dtype=torch.int64)
text_list.append(processed_text)
# 偏移量, 即语句的总词汇量
offsets.append(processed_text.size(0))
label_list = torch.tensor(label_list, dtype=torch.int64)
text_list = torch.cat(text_list)
offsets = torch.tensor(offsets[:-1]).cumsum(dim=0) # 返回维度dim中输入元素的累积和
return label_list.to(device), text_list.to(device), offsets.to(device)
# 数据加载器
data_loader = DataLoader(train_iter,
batch_size=8,
shuffle=False,
collate_fn=collate_batch)
# 定义模型
from torch import nn
class TextClassificationModel(nn.Module):
def __init__(self, vocab_size, embed_dim, num_class):
super(TextClassificationModel, self).__init__()
self.embedding = nn.EmbeddingBag(vocab_size, # 词典大小
embed_dim, # 嵌入的维度
sparse=False) #
self.fc = nn.Linear(embed_dim, num_class)
self.init_weights()
def init_weights(self):
initrange = 0.5
self.embedding.weight.data.uniform_(-initrange, initrange)
self.fc.weight.data.uniform_(-initrange, initrange)
self.fc.bias.data.zero_()
def forward(self, text, offsets):
embedded = self.embedding(text, offsets)
return self.fc(embedded)
# 定义实例
num_class = len(set([label for (label, text) in train_iter]))
vocab_size = len(vocab)
em_size = 64
model = TextClassificationModel(vocab_size, em_size, num_class).to(device)
# 定义训练函数与评估函数
import time
def train(dataloader):
model.train() # 切换为训练模式
total_acc, train_loss, total_count = 0, 0, 0
log_interval = 500
start_time = time.time()
for idx, (label, text, offsets) in enumerate(dataloader):
predicted_label = model(text, offsets)
optimizer.zero_grad() # grad属性归零
loss = criterion(predicted_label, label) # 计算网络输出和真实值之间的差距,label为真实值
loss.backward() # 反向传播
optimizer.step() # 每一步自动更新
# 记录acc与loss
total_acc += (predicted_label.argmax(1) == label).sum().item()
train_loss += loss.item()
total_count += label.size(0)
if idx % log_interval == 0 and idx > 0:
elapsed = time.time() - start_time
print('| epoch {:1d} | {:4d}/{:4d} batches'
'| train_acc {:4.3f} train_loss {:4.5f}'.format(epoch, idx, len(dataloader),
total_acc/total_count, train_loss/total_count))
total_acc, train_loss, total_count = 0, 0, 0
start_time = time.time()
def evaluate(dataloader):
model.eval() # 切换为测试模式
total_acc, train_loss, total_count = 0, 0, 0
with torch.no_grad():
for idx, (label, text, offsets) in enumerate(dataloader):
predicted_label = model(text, offsets)
loss = criterion(predicted_label, label) # 计算loss值
# 记录测试数据
total_acc += (predicted_label.argmax(1) == label).sum().item()
train_loss += loss.item()
total_count += label.size(0)
return total_acc/total_count, train_loss/total_count
# 训练模型
from torch.utils.data.dataset import random_split
from torchtext.data.functional import to_map_style_dataset
# 超参数
EPOCHS = 10 # epoch
LR = 5 # 学习率
BATCH_SIZE = 64 # batch size for training
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr = LR)
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, 1.0, gamma=0.1)
total_accu = None
train_iter, test_iter = AG_NEWS() # 加载数据
train_dataset = to_map_style_dataset(train_iter)
test_dataset = to_map_style_dataset(test_iter)
num_train = int(len(train_dataset) * 0.95)
split_train_, split_valid_ = random_split(train_dataset,
[num_train, len(train_dataset) - num_train])
train_dataloader = DataLoader(split_train_, batch_size=BATCH_SIZE,
shuffle = True, collate_fn=collate_batch)
valid_dataloader = DataLoader(split_valid_, batch_size=BATCH_SIZE,
shuffle = True, collate_fn=collate_batch)
test_dataloader = DataLoader(test_dataset, batch_size=BATCH_SIZE,
shuffle = True, collate_fn=collate_batch)
for epoch in range(1, EPOCHS + 1):
epoch_start_time = time.time()
train(train_datasetloader)
val_acc, val_loss = evaluate(valid_dataloader)
if total_accu is not None and total_accu > val_acc:
scheduler.step()
else:
total_accu = val_acc
print('-' * 69)
print('| epoch {:1d} | time:{:4.2f}s |'
'valid_acc {:4.3f} | valid_loss {:4.3f}'.format(epoch, time.time() - epoch_start_time, val_acc, val_loss))
print('-' * 69)
个人总结
- 学习了torchtext.data.utils.get_tokenizer()等分词相关函数
- 了解到词嵌入层的作用是将离散的单词表示(通常为整数索引)映射为固定大小的连续向量。这些向量捕捉了单词之间的语义关系,并作为网络的输入。
- 初始化词嵌入层的权重,可以使得模型在训练开始时具有一定的随机性,有助于避免梯度消失或梯度爆炸等问题
- torchtext.data.functional.to_map_style_dataset 函数的作用是将一个迭代式的数据集(转换为映射式的数据集。使得我们可以通过索引更方便地访问数据集中的元素。
- torchtext.data.functional 中的 to_map_style_dataset 函数可以将一个 Iterable-style 数据集转换为一个易于操作的 Map-style 数据集