今天给大家分享一个NLP(自然语言处理)中的一个小案例,本案例讲解了RNN、LSTM、GRU模型是如何使用并进行预测的,
一、案例架构
人名分类器的实现可分为以下五个步骤:
- 第一步: 导入必备的工具包
- 第二步: 对data文件中的数据进行处理,满足训练要求
- 第三步: 构建RNN模型(包括传统RNN, LSTM以及GRU)
- 第四步: 构建训练函数并进行训练
- 五步第: 构建评估函数并进行预测
二 、实现步骤
1.导包
# 导入torch工具
import json
import torch
# 导入nn准备构建模型
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
# 导入torch的数据源 数据迭代器工具包
from torch.utils.data import Dataset, DataLoader
# 用于获得常见字母及字符规范化
import string
# 导入时间工具包
import time
# 引入制图工具包
import matplotlib.pyplot as plt
from tqdm import tqdm
2.获取常用的字符数量
all_letters = string.ascii_letters+" .,;'"
print(f'all_letters--》{all_letters}')
print(f'{all_letters.find("A")}')
n_letters = len(all_letters)
print('字符的总个数', n_letters)
3.获取国家的类别个数
# 国家名 种类数
categorys = ['Italian', 'English', 'Arabic', 'Spanish', 'Scottish', 'Irish', 'Chinese', 'Vietnamese', 'Japanese',
'French', 'Greek', 'Dutch', 'Korean', 'Polish', 'Portuguese', 'Russian', 'Czech', 'German']
# 国家名 个数
categorynum = len(categorys)
print('categorys--->', categorynum)
4.读取数据到内存中
def read_data(filename):
# 定义两个空列表,分别存储人名和国家名
my_list_x, my_list_y = [], []
# 读取数据
with open(filename, 'r', encoding='utf-8') as fr:
for line in fr.readlines():
# 数据清洗
if len(line) <= 5:
continue
x, y = line.strip().split('\t')
my_list_x.append(x)
my_list_y.append(y)
return my_list_x, my_list_y
5.构建dataset类
定义数据集,并重写三种魔法方法:
def __init__(self, my_list_x, my_list_y):
# 如果继承的父类没有__init__方法,那么此时可以省略掉super().__init__(),当然要是写上也不会报错
# super().__init__()
# 获取样本x
self.my_list_x = my_list_x
# 获取样本x对应的标签y
self.my_list_y = my_list_y
# 获取样本的长度
self.sample_len = len(my_list_x)
def __len__(self):
return self.sample_len
def __getitem__(self, item):
# item代表就是索引
index = min(max(item, 0), self.sample_len-1)
# 根据索引取出对应的x和y
x = self.my_list_x[index]
# print(f'x---->{x}')
y = self.my_list_y[index]
# print(f'y---》{y}')
# 初始化全零的一个张量
tensor_x = torch.zeros(len(x), n_letters)
# 遍历人名的每个字母变成one-hot编码
for idx, letter in enumerate(x):
tensor_x[idx][all_letters.find(letter)] = 1
# print(f'tensor_x--》{tensor_x}')
tensor_y = torch.tensor(categorys.index(y), dtype=torch.long)
return tensor_x, tensor_y
6.实例化DataLoader
def get_dataloader():
my_list_x, my_list_y = read_data(filename='./data/name_classfication.txt')
nameClass_dataset = NameClassDataset(my_list_x, my_list_y)
# 实例化Dataloader
train_dataloader = DataLoader(dataset=nameClass_dataset,
batch_size=1,
shuffle=True)
return train_dataloader
7.定义RNN模型
定义RNN类,以下是继承Module父类,重写三种魔法方法:
init方法
def __init__(self, input_size, hidden_size, output_size, num_layers=1):
super().__init__()
# input_size-->输入x单词的词嵌入维度
self.input_size = input_size
# hidden_size-->RNN输出的隐藏层维度
self.hidden_size = hidden_size
# output_size-->国家类别的总个数
self.output_size = output_size
# num_layers:几层隐藏层
self.num_layers = num_layers
# 实例化RNN对象
# batch_first=False,默认
self.rnn = nn.RNN(input_size, hidden_size, num_layers)
# 定义输出层
self.out = nn.Linear(hidden_size, output_size)
# 定义logsoftmax层
self.softmax = nn.LogSoftmax(dim=-1)
forward方法
def forward(self, input_x, hidden):
# input_x按照讲义,输入的时候,是个二维的【seq_len, input_size】-->[6, 57]
# hidden:初始化隐藏层的值:[1, 1, 128]
# 升维度,在dim=1的维度进行升维:input_x-->[6, 1, 57]
input_x = input_x.unsqueeze(dim=1)
# 将input_x和hidden送入rnn:rnn_output-->shape--[6,1,128];hn-->[1, 1, 128]
rnn_output, hn = self.rnn(input_x, hidden)
# 将上述rnn的结果经过输出层;rnn_output[-1]获取最后一个单词的词向量代表整个句子的语意
# temp_vec-->[1, 128]
temp_vec = rnn_output[-1]
# 将temp_vec送入输出层:result-->[1, 18]
result = self.out(temp_vec)
return self.softmax(result), hn
inithidden方法
def inithidden(self):
return torch.zeros(self.num_layers, 1, self.hidden_size)
8.定义LSTM模型
定义LSTM类,以下是重写三种魔法方法:
init方法
def __init__(self, input_size, hidden_size, output_size, num_layers=1):
super().__init__()
# input_size-->输入x单词的词嵌入维度
self.input_size = input_size
# hidden_size-->RNN输出的隐藏层维度
self.hidden_size = hidden_size
# output_size-->国家类别的总个数
self.output_size = output_size
# num_layers:几层隐藏层
self.num_layers = num_layers
# 实例化LSTM对象
# batch_first=False,默认
self.lstm = nn.LSTM(input_size, hidden_size, num_layers)
# 定义输出层
self.out = nn.Linear(hidden_size, output_size)
# 定义logsoftmax层
self.softmax = nn.LogSoftmax(dim=-1)
forward方法
def forward(self, input_x, h0, c0):
# input_x-shape-->[seq_len, embed_dim]-->[6, 57]
# h0,c0-->shape-->[num_layers, batch_size, hidden_size]-->[1, 1, 128]
# 1.先对input_x升维度;shape-->[6, 1, 57]
input_x = input_x.unsqueeze(dim=1)
# 2.将input_x, h0,c0送入lstm模型
# lstm_output-->shape=-->[6, 1, 128]
lstm_output, (hn, cn) = self.lstm(input_x, (h0, c0))
# 3.取出lstm输出结果中最后一个单词对应的隐藏层输出结果送入输出层
# temp_vec-->shape-->[1, 128]
temp_vec = lstm_output[-1]
# 送入输出层result->shape-->[1, 18]
result = self.out(temp_vec)
return self.softmax(result), hn, cn
inithidden方法
def inithidden(self):
h0 = torch.zeros(self.num_layers, 1, self.hidden_size)
c0 = torch.zeros(self.num_layers, 1, self.hidden_size)
return h0, c0
9.定义GRU模型
GRU模型的类跟RNN大差不差,这里就不展示了哈,就是将RNN内的名称改成GRU
10.定义RNN的训练函数
def train_rnn():
# 1.读取txt文档数据
my_list_x, my_list_y = read_data(filename='./data/name_classfication.txt')
# 2.实例化Dataset对象
name_dataset = NameClassDataset(my_list_x, my_list_y)
# 3.实例化自定义的RNN模型对象
input_size = n_letters # 57
hidden_size = 128
output_size = categorynum # 18
name_rnn = NameRNN(input_size, hidden_size, output_size)
# 4.实例化损失函数对象以及优化器对象
rnn_nllloss = nn.NLLLoss()
rnn_adam = optim.Adam(name_rnn.parameters(), lr=my_lr)
# 5.定义训练日志的参数
start_time = time.time()
total_iter_num = 0 # 已经训练的样本的总数
total_loss = 0.0 # 已经训练的样本的总损失
total_loss_list = [] # 每隔100个样本我们计算一下平均损失,并保存
total_acc_num = 0 # 已经训练的样本中预测正确的个数
total_acc_list = []# 每隔100个样本我们计算一下平均准确率,并保存
# 6. 开始外部迭代
for epoch_idx in range(epochs):
# 6.1 实例化Dataloader的对象
train_dataloader = DataLoader(dataset=name_dataset, batch_size=1, shuffle=True)
# 6.2 开始遍历迭代器,内部迭代
for idx, (tensor_x, tensor_y) in enumerate(tqdm(train_dataloader)):
# 6.3 准备模型需要的数据
tensor_x0 = tensor_x[0] # [seq_length, input_size]
h0 = name_rnn.inithidden()
# print(f'tensor_y--》{tensor_y}')
# 6.4 将数据送入模型得到预测结果output-->shape-->[1, 18]
output, hn = name_rnn(tensor_x0, h0)
# print(f'output--》{output}')
# 6.5 计算损失
my_loss = rnn_nllloss(output, tensor_y)
# print(f'my_loss--》{my_loss}')
# 6.6 梯度清零
rnn_adam.zero_grad()
# 6.7 反向传播
my_loss.backward()
# 6.8 梯度更新
rnn_adam.step()
# 6.9 统计已经训练的样本的总个数
total_iter_num = total_iter_num + 1
# 6.10 统计已经训练的样本总损失
total_loss = total_loss + my_loss.item()
# 6.11 统计已经训练的样本中预测正确的样本总个数
# torch.argmax(output)取出预测结果中最大概率值对应的索引
predict_id = 1 if torch.argmax(output).item() == tensor_y.item() else 0
total_acc_num = total_acc_num + predict_id
# 6.12 每间隔100步,保存平均损失已经平均准确率
if (total_iter_num % 100) == 0:
# 计算平均损失并保存
avg_loss = total_loss / total_iter_num
total_loss_list.append(avg_loss)
# 计算平均准确率并保存
avg_acc = total_acc_num / total_iter_num
total_acc_list.append(avg_acc)
# 6.13 每间隔2000步,打印日志
if (total_iter_num % 2000) == 0:
temp_avg_loss = total_loss / total_iter_num
temp_avg_acc = total_acc_num / total_iter_num
print('轮次:%d, 损失:%.6f, 时间:%d,准确率:%.3f' %(epoch_idx+1, temp_avg_loss, time.time() - start_time, temp_avg_acc))
# 7保存模型
torch.save(name_rnn.state_dict(), './save_model/szAI_%d.bin'%(epoch_idx+1))
# 8. 计算训练的总时间
total_time = time.time() - start_time
# 9. 将损失列表和准确率列表以及时间保存到字典并存储到文件里
rnn_dict = {"total_loss_list":total_loss_list,
"total_time":total_time,
"total_acc_list":total_acc_list}
with open('name_classify_rnn.json', 'w', encoding='utf-8')as fw:
fw.write(json.dumps(rnn_dict))
11.定义LSTM模型的训练函数
def train_lstm():
# 1.读取txt文档数据
my_list_x, my_list_y = read_data(filename='./data/name_classfication.txt')
# 2.实例化Dataset对象
name_dataset = NameClassDataset(my_list_x, my_list_y)
# 3.实例化自定义的RNN模型对象
input_size = n_letters # 57
hidden_size = 128
output_size = categorynum # 18
name_lstm = NameLSTM(input_size, hidden_size, output_size)
# 4.实例化损失函数对象以及优化器对象
rnn_nllloss = nn.NLLLoss()
rnn_adam = optim.Adam(name_lstm.parameters(), lr=my_lr)
# 5.定义训练日志的参数
start_time = time.time()
total_iter_num = 0 # 已经训练的样本的总数
total_loss = 0.0 # 已经训练的样本的总损失
total_loss_list = [] # 每隔100个样本我们计算一下平均损失,并保存
total_acc_num = 0 # 已经训练的样本中预测正确的个数
total_acc_list = []# 每隔100个样本我们计算一下平均准确率,并保存
12.定义GRU模型的训练函数
这里GRU模型不写了,跟RNN大差不差,一样只改名字就好了
13.对比不同模型训练的结果
def compare_results():
# 1.读取rnn模型训练的结果
with open('name_classify_rnn.json', 'r')as fr:
rnn_result = fr.read()
rnn_dict = json.loads(rnn_result)
# print(f'rnn_dict["total_loss_list"]-->{rnn_dict["total_loss_list"]}')
# print(f'rnn_dict["total_time"]-->{rnn_dict["total_time"]}')
# print(f'rnn_dict["total_acc_list"]-->{rnn_dict["total_acc_list"]}')
# 2.读取lstm模型训练的结果
with open('name_classify_lstm.json', 'r') as fr:
lstm_result = fr.read()
lstm_dict = json.loads(lstm_result)
# 3.读取gru模型训练的结果
with open('name_classify_gru.json', 'r') as fr:
gru_result = fr.read()
gru_dict = json.loads(gru_result)
# 4. 对比不同模型损失结果
plt.figure(0)
plt.plot(rnn_dict["total_loss_list"], label="RNN")
plt.plot(lstm_dict["total_loss_list"], color='red', label="LSTM")
plt.plot(gru_dict["total_loss_list"], color='orange', label="GRU")
plt.legend(loc='upper left')
plt.savefig('name_classify_loss.png')
plt.show()
# 5. 对比不同模型时间结果
plt.figure(1)
x_data = ["RNN", "LSTM", "GRU"]
y_data = [rnn_dict["total_time"], lstm_dict["total_time"], gru_dict["total_time"]]
plt.bar(range(len(x_data)), y_data, tick_label=x_data)
plt.savefig('name_classify_time.png')
plt.show()
# 6. 对比不同模型准确率结果
plt.figure(0)
plt.plot(rnn_dict["total_acc_list"], label="RNN")
plt.plot(lstm_dict["total_acc_list"], color='red', label="LSTM")
plt.plot(gru_dict["total_acc_list"], color='orange', label="GRU")
plt.legend(loc='upper left')
plt.savefig('name_classify_acc.png')
plt.show()
# 6. 开始外部迭代
for epoch_idx in range(epochs):
# 6.1 实例化Dataloader的对象
train_dataloader = DataLoader(dataset=name_dataset, batch_size=1, shuffle=True)
# 6.2 开始遍历迭代器,内部迭代
for idx, (tensor_x, tensor_y) in enumerate(tqdm(train_dataloader)):
# 6.3 准备模型需要的数据
tensor_x0 = tensor_x[0] # [seq_length, input_size]
h0, c0 = name_lstm.inithidden()
# print(f'tensor_y--》{tensor_y}')
# 6.4 将数据送入模型得到预测结果output-->shape-->[1, 18]
output, hn, cn = name_lstm(tensor_x0, h0, c0)
# print(f'output--》{output}')
# 6.5 计算损失
my_loss = rnn_nllloss(output, tensor_y)
# print(f'my_loss--》{my_loss}')
# 6.6 梯度清零
rnn_adam.zero_grad()
# 6.7 反向传播
my_loss.backward()
# 6.8 梯度更新
rnn_adam.step()
# 6.9 统计已经训练的样本的总个数
total_iter_num = total_iter_num + 1
# 6.10 统计已经训练的样本总损失
total_loss = total_loss + my_loss.item()
# 6.11 统计已经训练的样本中预测正确的样本总个数
# torch.argmax(output)取出预测结果中最大概率值对应的索引
predict_id = 1 if torch.argmax(output).item() == tensor_y.item() else 0
total_acc_num = total_acc_num + predict_id
# 6.12 每间隔100步,保存平均损失已经平均准确率
if (total_iter_num % 100) == 0:
# 计算平均损失并保存
avg_loss = total_loss / total_iter_num
total_loss_list.append(avg_loss)
# 计算平均准确率并保存
avg_acc = total_acc_num / total_iter_num
total_acc_list.append(avg_acc)
# 6.13 每间隔2000步,打印日志
if (total_iter_num % 2000) == 0:
temp_avg_loss = total_loss / total_iter_num
temp_avg_acc = total_acc_num / total_iter_num
print('轮次:%d, 损失:%.6f, 时间:%d,准确率:%.3f' %(epoch_idx+1, temp_avg_loss, time.time() - start_time, temp_avg_acc))
# 7保存模型
torch.save(name_lstm.state_dict(), './save_model/szAI_lstm_%d.bin'%(epoch_idx+1))
# 8. 计算训练的总时间
total_time = time.time() - start_time
# 9. 将损失列表和准确率列表以及时间保存到字典并存储到文件里
lstm_dict = {"total_loss_list":total_loss_list,
"total_time":total_time,
"total_acc_list":total_acc_list}
with open('name_classify_lstm.json', 'w', encoding='utf-8')as fw:
fw.write(json.dumps(lstm_dict))
14.实现模型的预测
这一定义了三种路径,这个是上面模型生成出来的,可以通过读的方式获取这些模型,读就需要路径,所以要指出来,后面会用。
rnn_model_path = './save_model/szAI_1.bin'
lstm_model_path = './save_model/szAI_lstm_1.bin'
gru_model_path = './save_model/szAI_gru_1.bin'
这里的话定义数据的预处理函数,将人名转换为向量。
def line2tensor(x):
# x--》代表人名--》"zhang"
tensor_x = torch.zeros(len(x), n_letters)
# 将上述的tensor_x变成one-hot
for idx, letter in enumerate(x):
tensor_x[idx][all_letters.find(letter)] = 1
return tensor_x
定义rnn模型的预测函数
def rnn2predict(x):
# 将x人名进行向量的转换
tensor_x = line2tensor(x)
# 实例化模型
input_size = n_letters # 57
hidden_size = 128
output_size = categorynum # 18
my_rnn = NameRNN(input_size, hidden_size, output_size)
# 加载训练好的模型的参数
my_rnn.load_state_dict(torch.load(rnn_model_path))
# 将数据送入模型得到预测结果
with torch.no_grad():
output, hn = my_rnn(tensor_x, my_rnn.inithidden())
# print(f'output--》{output}')
# 取出预测结果的前三个最大的值以及对应的索引
values, indexes = torch.topk(output, k=3, dim=1)
# print(f'values--》{values}')
# print(f'indexes--》{indexes}')
print('rnn预测的结果=========》')
for i in range(3):
value = values[0][i]
idx = indexes[0][i]
predict_label = categorys[idx]
print(f'当前的人名是--》{x}, 预测的国家为:{predict_label}')
定义lstm模型的预测函数
def lstm2predict(x):
# 将x人名进行向量的转换
tensor_x = line2tensor(x)
# 实例化模型
input_size = n_letters # 57
hidden_size = 128
output_size = categorynum # 18
my_rnn = NameLSTM(input_size, hidden_size, output_size)
# 加载训练好的模型的参数
my_rnn.load_state_dict(torch.load(lstm_model_path))
# 将数据送入模型得到预测结果
with torch.no_grad():
h0, c0 = my_rnn.inithidden()
output, hn, cn = my_rnn(tensor_x, h0, c0)
# print(f'output--》{output}')
# 取出预测结果的前三个最大的值以及对应的索引
values, indexes = torch.topk(output, k=3, dim=1)
# print(f'values--》{values}')
# print(f'indexes--》{indexes}')
print('lstm预测的结果=========》')
for i in range(3):
value = values[0][i]
idx = indexes[0][i]
predict_label = categorys[idx]
print(f'当前的人名是--》{x}, 预测的国家为:{predict_label}')
定义gru模型的预测函数
def gru2predict(x):
# 将x人名进行向量的转换
tensor_x = line2tensor(x)
# 实例化模型
input_size = n_letters # 57
hidden_size = 128
output_size = categorynum # 18
my_rnn = NameGRU(input_size, hidden_size, output_size)
# 加载训练好的模型的参数
my_rnn.load_state_dict(torch.load(gru_model_path))
# 将数据送入模型得到预测结果
with torch.no_grad():
output, hn = my_rnn(tensor_x, my_rnn.inithidden())
# print(f'output--》{output}')
# 取出预测结果的前三个最大的值以及对应的索引
values, indexes = torch.topk(output, k=3, dim=1)
# print(f'values--》{values}')
# print(f'indexes--》{indexes}')
print('gru预测的结果=========》')
for i in range(3):
value = values[0][i]
idx = indexes[0][i]
predict_label = categorys[idx]
print(f'当前的人名是--》{x}, 预测的国家为:{predict_label}')
三、总结
以上就是RNN案例人名分类器核心代码。以一个人名为输入, 使用模型帮助我们判断它最有可能是来自哪一个国家的人名, 这在某些国际化公司的业务中具有重要意义, 在用户注册过程中, 会根据用户填写的名字直接给他分配可能的国家或地区选项, 以及该国家或地区的国旗, 限制手机号码位数等等。通过这篇博客,希望可以帮助大家对RNN、LSTM、GRU有更深的理解。
ps:其中有些方法(不一一列举了),我的其他博客有解释,可以去浏览一下。