139-Twitter评论情绪基础RNN模型分类
143-LSTM文本分类模型
【参考文档】17-3Twitter评论情绪分类.ipynb
【导出代码】
# %% [markdown]
# # 139-Twitter评论情绪分类
# %% [markdown]
# ## 数据读取处理
# %%
import torch
import torchtext
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import pandas as pd
# %%
data = pd.read_csv('Tweets.csv')
# %%
data.head()
# %% [markdown]
# Keep two columns: the sentiment label and the tweet text
# %%
data = data[['airline_sentiment', 'text']]
# %%
data
# %% [markdown]
# Inspect the distinct sentiment values
# %%
data.airline_sentiment.unique()
# %%
data.info()  # check for missing values
# %%
data.duplicated().sum()  # count duplicated rows
# %%
# Rebind instead of dropping in place: `data` is a column selection of the
# frame read above, and an in-place update on such a derived frame can raise
# SettingWithCopyWarning in pandas versions without copy-on-write.
data = data.drop_duplicates()  # remove duplicated rows
# %%
data.airline_sentiment.value_counts()  # number of samples per class
# %% [markdown]
# Encode the sentiment as integer codes 0, 1, 2
# %%
label = pd.factorize(data.airline_sentiment)[0]
# %% [markdown]
# Simplify the text: lowercase everything and drop special characters
# %%
data.text
# %%
import re
pat = re.compile('[A-Za-z]+')
# %%
# Text pre-processing helper
def pre_text(text):
    """Split ``text`` into lowercase alphabetic tokens.

    Every maximal run of ASCII letters matched by ``pat`` becomes one token;
    digits, punctuation and any other characters act as separators and are
    discarded.

    Args:
        text: the raw string to tokenise.

    Returns:
        A list of lowercase tokens, in order of appearance (empty when
        ``text`` contains no ASCII letters).
    """
    return [token.lower() for token in pat.findall(text)]
# %%
x = data.text.apply(pre_text)  # tokenise every tweet
# %%
x
# %% [markdown]
# ## Build the vocabulary
# vocab: assign an integer id to every word
# %%
# x holds one token list per tweet; collect every distinct token
word_set = {word for tokens in x for word in tokens}
# %%
word_set  # all unique words
# %%
max_word = len(word_set)+1  # +1 because ids start at 1 (0 is used for padding)
# %%
# Sort the vocabulary so the word -> id mapping is identical on every run;
# iterating a set of strings directly gives an order that can change between
# interpreter sessions.
word_list = sorted(word_set)
# %%
word_list.index('you')
# %%
# Build the lookup in one pass with enumerate (ids start at 1) instead of
# calling word_list.index for every word, which is quadratic in vocabulary size.
word_index = {w: i for i, w in enumerate(word_list, start=1)}
# %%
word_index
# %%
# Replace every token by its vocabulary id (0 when the token is not in word_index)
x = x.apply(lambda tokens: [word_index.get(tok, 0) for tok in tokens])
# %%
x
# %%
max_len = max(map(len, x))  # length of the longest encoded tweet
# %%
max_len
# %%
# Right-pad every id sequence with zeros until it is max_len long
pad_x = [ids + [0] * (max_len - len(ids)) for ids in x]
# %%
pad_x = np.array(pad_x)  # list of equal-length lists -> 2-D array
# %%
pad_x.shape  # the original notebook noted 14452 rows, each of length 34
# %%
label.shape
# %% [markdown]
# ## Split into training and test data
# %% [markdown]
# pip install scikit-learn: machine-learning library (imported as `sklearn`)
# %%
from sklearn.model_selection import train_test_split
# %%
# Split the padded sequences and their labels together so rows stay aligned.
# No test_size is given, so scikit-learn's default split ratio applies, and no
# random_state is given, so the split differs from run to run.
x_train, x_test, y_train, y_test = train_test_split(pad_x, label)
# %%
x_train.shape, x_test.shape
# %% [markdown]
# 创建DataSet类
# %%
class Mydataset(torch.utils.data.Dataset):
    """Map-style dataset pairing token-id sequences with their labels.

    Args:
        text_array: indexable collection of integer id sequences.
        label_array: indexable collection of labels; item ``i`` is the label
            of ``text_array[i]``.
    """

    def __init__(self, text_array, label_array):
        self.text_array = text_array
        self.label_array = label_array

    def __getitem__(self, index):
        """Return ``(token_ids, label)`` for the sample at ``index``.

        ``token_ids`` is an int64 tensor copied from ``text_array[index]``;
        ``label`` is returned exactly as stored in ``label_array``.
        """
        # torch.tensor with an explicit dtype replaces the legacy
        # torch.LongTensor constructor, which interprets a bare integer
        # argument as a tensor *size* rather than as data.
        text = torch.tensor(self.text_array[index], dtype=torch.long)
        label = self.label_array[index]
        return text, label

    def __len__(self):
        """Return the number of samples (the length of ``label_array``)."""
        return len(self.label_array)
# %%
# Wrap the train / test arrays in datasets that yield (token_ids, label) pairs
train_ds = Mydataset(x_train, y_train)
test_ds = Mydataset(x_test, y_test)
# %%
BATCH_SIZE = 32
# %%
# Shuffle only the training data; the test loader keeps a fixed order
train_dl = torch.utils.data.DataLoader(train_ds, batch_size=BATCH_SIZE, shuffle=True)
test_dl = torch.utils.data.DataLoader(test_ds, batch_size=BATCH_SIZE, shuffle=False)
# %% [markdown]
# ## 基础文本分类模型
# %%
embedding_dim = 100
# %%
# Does not model sequential (time-step) relations: the sentence is treated as one flat vector
class Basic_Net(nn.Module):
    """Baseline classifier: embed every token, flatten, two linear layers.

    Args:
        num_words: size of the embedding table; defaults to the module-level
            ``max_word``.
        embed_dim: width of each token embedding; defaults to the
            module-level ``embedding_dim``.
        seq_len: number of tokens in every input row; defaults to the
            module-level ``max_len``.

    The first linear layer consumes the flattened embeddings, so inputs must
    contain exactly ``seq_len`` token ids per row.
    """

    def __init__(self, num_words=None, embed_dim=None, seq_len=None):
        super().__init__()
        # Fall back to the notebook globals so that Basic_Net() keeps working.
        num_words = max_word if num_words is None else num_words
        embed_dim = embedding_dim if embed_dim is None else embed_dim
        seq_len = max_len if seq_len is None else seq_len
        # Embedding layer: every token id becomes an embed_dim-wide vector
        self.embedding = nn.Embedding(num_embeddings=num_words, embedding_dim=embed_dim)
        # Hidden linear layer; its input width is derived from embed_dim
        # rather than a hard-coded 100, so the two can never disagree.
        self.fc1 = nn.Linear(seq_len * embed_dim, 1024)
        # Output layer: 3 logits
        self.fc2 = nn.Linear(1024, 3)

    def forward(self, x):
        """Map a batch of token-id rows to a (batch, 3) tensor of logits."""
        x = self.embedding(x)
        x = x.view(x.size(0), -1)  # flatten the per-token embeddings into one vector per sample
        x = F.relu(self.fc1(x))
        x = self.fc2(x)  # output layer: no activation, raw logits
        return x
# %%
# Use the GPU when CUDA is available, otherwise the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = Basic_Net().to(device)
# %%
# Cross-entropy over the logits, optimised with plain SGD
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=0.01)
# %%
def _run_epoch(model, data_loader, criterion, dev, opt=None):
    """Run one pass over ``data_loader`` and return ``(mean_loss, accuracy)``.

    The model is trained when ``opt`` is given and only evaluated otherwise.
    Both values are ``nan`` when the loader yields no samples.
    """
    training = opt is not None
    model.train(training)
    loss_sum = 0.0
    n_correct = 0
    n_seen = 0
    with torch.set_grad_enabled(training):
        for x, y in data_loader:
            x, y = x.to(dev), y.to(dev)
            y_pred = model(x)
            loss = criterion(y_pred, y)
            if training:
                opt.zero_grad()
                loss.backward()
                opt.step()
            batch_size = y.size(0)
            # The criterion is assumed to return the batch *mean* (the default
            # reduction), so weight it by the batch size before summing; the
            # final division by the sample count then gives a true per-sample
            # average even when the last batch is smaller.
            loss_sum += loss.item() * batch_size
            n_correct += (y_pred.argmax(dim=1) == y).sum().item()
            n_seen += batch_size
    if n_seen == 0:
        return float("nan"), float("nan")
    return loss_sum / n_seen, n_correct / n_seen


def fit(epoch, model, train_dl, test_dl, *, criterion=None, opt=None, dev=None):
    """Train ``model`` for one epoch, evaluate it, and print the metrics.

    Args:
        epoch: zero-based epoch index; only used in the printed report.
        model: the network to train and evaluate.
        train_dl: iterable of ``(inputs, targets)`` training batches.
        test_dl: iterable of ``(inputs, targets)`` evaluation batches.
        criterion: loss function; defaults to the module-level ``loss_fn``.
        opt: optimizer; defaults to the module-level ``optimizer``.
        dev: device the batches are moved to; defaults to the module-level
            ``device``.

    Returns:
        ``(epoch_loss, epoch_acc, test_loss, test_acc)``, where each loss is
        the mean loss per sample and each accuracy is the fraction of
        correctly classified samples.
    """
    # Fall back to the notebook globals so existing fit(...) calls keep working.
    criterion = loss_fn if criterion is None else criterion
    opt = optimizer if opt is None else opt
    dev = device if dev is None else dev

    epoch_loss, epoch_acc = _run_epoch(model, train_dl, criterion, dev, opt)
    test_loss, test_acc = _run_epoch(model, test_dl, criterion, dev)
    print(f"Epoch {epoch+1} loss: {epoch_loss:.4f} acc: {epoch_acc:.4f} | Test loss: {test_loss:.4f} acc: {test_acc:.4f}")
    return epoch_loss, epoch_acc, test_loss, test_acc
# %%
epochs = 10
# %%
# One history list per metric, extended once per epoch
train_loss, train_acc, test_loss, test_acc = [], [], [], []
for epoch in range(epochs):
    epoch_loss, epoch_acc, epoch_test_loss, epoch_test_acc = fit(epoch, model, train_dl, test_dl)
    train_loss += [epoch_loss]
    train_acc += [epoch_acc]
    test_loss += [epoch_test_loss]
    test_acc += [epoch_test_acc]
# %% [markdown]
# 严重的过拟合
# %% [markdown]
# # 143-LSTM文本分类模型
# %%
embedding_dim = 100
hidden_size = 200
# %%
class LSTM_Net(nn.Module):
    """LSTM classifier: embed tokens, run an LSTM, classify its last real output.

    Args:
        max_word: size of the embedding table.
        embedding_dim: width of each token embedding.
        hidden_dim: LSTM hidden width; defaults to the module-level
            ``hidden_size``.
    """

    def __init__(self, max_word, embedding_dim, hidden_dim=None):
        super().__init__()
        # Fall back to the notebook global so existing two-argument calls keep working.
        hidden_dim = hidden_size if hidden_dim is None else hidden_dim
        self.embedding = nn.Embedding(max_word, embedding_dim)  # -> batch * maxlen * embedding_dim
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, batch_first=True)
        self.fc1 = nn.Linear(hidden_dim, 256)
        self.fc2 = nn.Linear(256, 3)

    def forward(self, x):
        """Map a batch of zero-padded token-id rows to a (batch, 3) tensor of logits."""
        # Sequences are right-padded with id 0, so the final time step is
        # padding for every row shorter than the longest one. Find the
        # position of the last non-zero id per row and read the LSTM output
        # there instead of at index -1.
        steps = torch.arange(1, x.size(1) + 1, device=x.device)
        last_step = ((x != 0).long() * steps).max(dim=1).values  # 1-based; 0 when a row is all padding
        last_step = (last_step - 1).clamp(min=0)  # all-padding rows fall back to step 0
        out, _ = self.lstm(self.embedding(x))  # out: batch, time_step, hidden
        rows = torch.arange(out.size(0), device=out.device)
        x = out[rows, last_step]
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return x
# %%
# Rebind `model` to the LSTM classifier and move it to the selected device
model = LSTM_Net(max_word, embedding_dim).to(device)
# %%
# Fresh loss and optimizer bound to the new model's parameters
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=0.01)
def _run_epoch(model, data_loader, criterion, dev, opt=None):
    """Run one pass over ``data_loader`` and return ``(mean_loss, accuracy)``.

    The model is trained when ``opt`` is given and only evaluated otherwise.
    Both values are ``nan`` when the loader yields no samples.
    """
    training = opt is not None
    model.train(training)
    loss_sum = 0.0
    n_correct = 0
    n_seen = 0
    with torch.set_grad_enabled(training):
        for x, y in data_loader:
            x, y = x.to(dev), y.to(dev)
            y_pred = model(x)
            loss = criterion(y_pred, y)
            if training:
                opt.zero_grad()
                loss.backward()
                opt.step()
            batch_size = y.size(0)
            # The criterion is assumed to return the batch *mean* (the default
            # reduction), so weight it by the batch size before summing; the
            # final division by the sample count then gives a true per-sample
            # average even when the last batch is smaller.
            loss_sum += loss.item() * batch_size
            n_correct += (y_pred.argmax(dim=1) == y).sum().item()
            n_seen += batch_size
    if n_seen == 0:
        return float("nan"), float("nan")
    return loss_sum / n_seen, n_correct / n_seen


def fit(epoch, model, train_dl, test_dl, *, criterion=None, opt=None, dev=None):
    """Train ``model`` for one epoch, evaluate it, and print the metrics.

    Args:
        epoch: zero-based epoch index; only used in the printed report.
        model: the network to train and evaluate.
        train_dl: iterable of ``(inputs, targets)`` training batches.
        test_dl: iterable of ``(inputs, targets)`` evaluation batches.
        criterion: loss function; defaults to the module-level ``loss_fn``.
        opt: optimizer; defaults to the module-level ``optimizer``.
        dev: device the batches are moved to; defaults to the module-level
            ``device``.

    Returns:
        ``(epoch_loss, epoch_acc, test_loss, test_acc)``, where each loss is
        the mean loss per sample and each accuracy is the fraction of
        correctly classified samples.
    """
    # Fall back to the notebook globals so existing fit(...) calls keep working.
    criterion = loss_fn if criterion is None else criterion
    opt = optimizer if opt is None else opt
    dev = device if dev is None else dev

    epoch_loss, epoch_acc = _run_epoch(model, train_dl, criterion, dev, opt)
    test_loss, test_acc = _run_epoch(model, test_dl, criterion, dev)
    print(f"Epoch {epoch+1} loss: {epoch_loss:.4f} acc: {epoch_acc:.4f} | Test loss: {test_loss:.4f} acc: {test_acc:.4f}")
    return epoch_loss, epoch_acc, test_loss, test_acc
epochs = 10
# One history list per metric, extended once per epoch
train_loss, train_acc, test_loss, test_acc = [], [], [], []
for epoch in range(epochs):
    epoch_loss, epoch_acc, epoch_test_loss, epoch_test_acc = fit(epoch, model, train_dl, test_dl)
    train_loss += [epoch_loss]
    train_acc += [epoch_acc]
    test_loss += [epoch_test_loss]
    test_acc += [epoch_test_acc]
# %%
import matplotlib.pyplot as plt
# %%
# Plot per-epoch training accuracy (red) against test accuracy (blue);
# the x axis is the zero-based epoch index.
plt.plot(range(epochs), train_acc, c='r', label='Training Accuracy')
plt.plot(range(epochs), test_acc, c='b', label='Test Accuracy')
plt.title('Training and Test Accuracy')
plt.xlabel('Epochs')
plt.ylabel('Accuracy')
plt.legend()
plt.show()
# %%