目标
本文基于预训练模型bert
分词器BertTokenizer
,将输入的文本以文本对的形式,送入到分词器中得到文本对的词嵌入向量,之后经过若干网络层,输出在已知2类别匹配或不匹配的概率分布,从而实现一个简单的句子对级别的匹配任务。
数据准备
预训练模型bert-base-chinese预训练模型
类别标签文件schema.json
{
"停机保号": 0,
"密码重置": 1,
"宽泛业务问题": 2,
"亲情号码设置与修改": 3,
"固话密码修改": 4,
"来电显示开通": 5,
"亲情号码查询": 6,
"密码修改": 7,
"无线套餐变更": 8,
"月返费查询": 9,
"移动密码修改": 10,
"固定宽带服务密码修改": 11,
"UIM反查手机号": 12,
"有限宽带障碍报修": 13,
"畅聊套餐变更": 14,
"呼叫转移设置": 15,
"短信套餐取消": 16,
"套餐余量查询": 17,
"紧急停机": 18,
"VIP密码修改": 19,
"移动密码重置": 20,
"彩信套餐变更": 21,
"积分查询": 22,
"话费查询": 23,
"短信套餐开通立即生效": 24,
"固话密码重置": 25,
"解挂失": 26,
"挂失": 27,
"无线宽带密码修改": 28
}
训练集数据train.json训练集数据
验证集数据valid.json验证集数据
参数配置
config.py
# -*- coding: utf-8 -*-
"""
配置参数信息
"""
Config = {
"model_path": "model_output",
"schema_path": "../data/schema.json",
"train_data_path": "../data/train.json",
"valid_data_path": "../data/valid.json",
"pretrain_model_path":r"../../../bert-base-chinese",
"vocab_path":r"../../../bert-base-chinese/vocab.txt",
"max_length": 20,
"hidden_size": 256,
"epoch": 10,
"batch_size": 128,
"epoch_data_size": 10000, #每轮训练中采样数量
"positive_sample_rate":0.5, #正样本比例
"optimizer": "adam",
"learning_rate": 1e-3,
}
数据处理
loader.py
# -*- coding: utf-8 -*-
import json
import re
import os
import torch
import random
import logging
from torch.utils.data import Dataset, DataLoader
from collections import defaultdict
from transformers import BertTokenizer
"""
数据加载
"""
logging.getLogger("transformers").setLevel(logging.ERROR)
class DataGenerator:
def __init__(self, data_path, config):
self.config = config
self.path = data_path
self.tokenizer = load_vocab(config["vocab_path"])
self.config["vocab_size"] = len(self.tokenizer.vocab)
self.schema = load_schema(config["schema_path"])
self.train_data_size = config["epoch_data_size"] #由于采取随机采样,所以需要设定一个采样数量,否则可以一直采
self.max_length = config["max_length"]
self.data_type = None #用来标识加载的是训练集还是测试集 "train" or "test"
self.load()
def load(self):
self.data = []
self.knwb = defaultdict(list)
with open(self.path, encoding="utf8") as f:
for line in f:
line = json.loads(line)
#加载训练集
if isinstance(line, dict):
self.data_type = "train"
questions = line["questions"]
label = line["target"]
for question in questions:
self.knwb[self.schema[label]].append(question)
#加载测试集
else:
self.data_type = "test"
assert isinstance(line, list)
question, label = line
label_index = torch.LongTensor([self.schema[label]])
self.data.append([question, label_index])
return
#每次加载两个文本,输出他们的拼接后编码
def encode_sentence(self, text1, text2):
input_id = self.tokenizer.encode(text1, text2,
truncation='longest_first',
max_length=self.max_length,
padding='max_length',
)
return input_id
def __len__(self):
if self.data_type == "train":
return self.config["epoch_data_size"]
else:
assert self.data_type == "test", self.data_type
return len(self.data)
def __getitem__(self, index):
if self.data_type == "train":
return self.random_train_sample() #随机生成一个训练样本
else:
return self.data[index]
#依照一定概率生成负样本或正样本
#负样本从随机两个不同的标准问题中各随机选取一个
#正样本从随机一个标准问题中随机选取两个
def random_train_sample(self):
standard_question_index = list(self.knwb.keys())
#随机正样本
if random.random() <= self.config["positive_sample_rate"]:
p = random.choice(standard_question_index)
#如果选取到的标准问下不足两个问题,则无法选取,所以重新随机一次
if len(self.knwb[p]) < 2:
return self.random_train_sample()
else:
s1, s2 = random.sample(self.knwb[p], 2)
input_ids = self.encode_sentence(s1, s2)
input_ids = torch.LongTensor(input_ids)
return [input_ids, torch.LongTensor([1])]
#随机负样本
else:
p, n = random.sample(standard_question_index, 2)
s1 = random.choice(self.knwb[p])
s2 = random.choice(self.knwb[n])
input_ids = self.encode_sentence(s1, s2)
input_ids = torch.LongTensor(input_ids)
return [input_ids, torch.LongTensor([0])]
#加载字表或词表
def load_vocab(vocab_path):
tokenizer = BertTokenizer(vocab_path)
return tokenizer
#加载schema
def load_schema(schema_path):
with open(schema_path, encoding="utf8") as f:
return json.loads(f.read())
#用torch自带的DataLoader类封装数据
def load_data(data_path, config, shuffle=True):
dg = DataGenerator(data_path, config)
dl = DataLoader(dg, batch_size=config["batch_size"], shuffle=shuffle)
return dl
这段目的是用于构建一个数据加载和处理系统,主要针对自然语言处理(NLP)任务中的文本对比训练,例如句子相似度、问答匹配等任务。具体来说,代码实现了以下功能:
数据加载与解析:
- 通过
DataGenerator
类从指定的文件路径加载数据,支持训练集和测试集的加载。 - 对于训练集数据,支持从标准问题中随机选择问题对,生成正样本(相同标签的两个问题)和负样本(不同标签的两个问题)。
- 对于测试集数据,加载并按序存储问题和标签。
- 通过
数据编码:
- 使用
BertTokenizer
对文本进行分词和编码,将输入文本(这里输入的是文本对
)转化为BERT模型可以处理的格式(即input_ids
)。
- 使用
正负样本生成:
- 通过
random_train_sample
函数,生成正样本和负样本。正样本是从同一类别中随机选择两个问题,而负样本是从不同类别中各选择一个问题。
- 通过
数据批量加载:
- 利用
torch.utils.data.DataLoader
将数据封装成批量加载格式,方便在训练过程中按批次加载数据,提高训练效率。
- 利用
总结
核心目的是为训练深度学习模型(如BERT等基于Transformer的模型)提供高效、灵活的数据加载和处理流程,特别是用于训练包含正负样本对比学习任务的模型。它支持从外部文件加载文本数据,进行编码,并生成带有标签的数据对,以供模型训练和评估使用。
模型构建
model.py
# -*- coding: utf-8 -*-
import torch
import torch.nn as nn
from torch.optim import Adam, SGD
from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence
from transformers import BertModel, BertConfig
"""
建立网络模型结构
"""
class GetFirst(nn.Module):
def __init__(self):
super(GetFirst, self).__init__()
def forward(self, x):
return x[0]
class SentenceMatchNetwork(nn.Module):
def __init__(self, config):
super(SentenceMatchNetwork, self).__init__()
# 可以用bert,参考下面
# pretrain_model_path = config["pretrain_model_path"]
# self.bert_encoder = BertModel.from_pretrained(pretrain_model_path)
# 常规的embedding + layer
hidden_size = config["hidden_size"]
#20000应为词表大小,这里借用bert的词表,没有用它精确的数字,因为里面有很多无用词,舍弃一部分,不影响效果
self.embedding = nn.Embedding(20000, hidden_size)
#一种多层按顺序执行的写法,具体的层可以换
#unidirection:batch_size, max_len, hidden_size
#bidirection:batch_size, max_len, hidden_size * 2
self.encoder = nn.Sequential(nn.LSTM(hidden_size, hidden_size, bidirectional=True, batch_first=True),
GetFirst(),
nn.ReLU(),
nn.Linear(hidden_size * 2, hidden_size), #batch_size, max_len, hidden_size
nn.ReLU(),
)
self.classify_layer = nn.Linear(hidden_size, 2)
self.loss = nn.CrossEntropyLoss()
# 同时传入两个句子的拼接编码
# 输出一个相似度预测,不匹配的概率
def forward(self, input_ids, target=None):
# x = self.bert_encoder(input_ids)[1]
#input_ids = batch_size, max_length
x = self.embedding(input_ids) #x:batch_size, max_length, embedding_size
x = self.encoder(x) #
#x: batch_size, max_len, hidden_size
x = nn.MaxPool1d(x.shape[1])(x.transpose(1,2)).squeeze()
#x: batch_size, hidden_size
x = self.classify_layer(x)
#x: batch_size, 2
#如果有标签,则计算loss
if target is not None:
return self.loss(x, target.squeeze())
#如果无标签,预测相似度
else:
return torch.softmax(x, dim=-1)[:, 1] #如果改为x[:,0]则是两句话不匹配的概率
def choose_optimizer(config, model):
optimizer = config["optimizer"]
learning_rate = config["learning_rate"]
if optimizer == "adam":
return Adam(model.parameters(), lr=learning_rate)
elif optimizer == "sgd":
return SGD(model.parameters(), lr=learning_rate)
实现一个用于句子匹配任务的神经网络模型,具体结构包括以下部分:
GetFirst类:该类用于提取LSTM输出的第一个时间步的结果。它的作用是从LSTM的双向输出中获取第一个词的向量表示。
SentenceMatchNetwork类:
- 初始化:该模型首先定义了一个嵌入层(
nn.Embedding
),将输入的单词ID转化为词向量。然后通过双向LSTM进行编码,提取句子的特征。 - 编码器:使用LSTM(
nn.LSTM
)对输入句子进行处理,并应用GetFirst
提取LSTM输出的第一个时间步的特征。接着通过全连接层进行处理和降维。 - 分类层:将LSTM的输出传入一个全连接层,输出两类标签(匹配或不匹配)。使用
CrossEntropyLoss
计算损失。
- 初始化:该模型首先定义了一个嵌入层(
forward方法:此方法接收输入句子的ID和标签(如果有)。如果提供了标签,则计算损失;如果没有标签,则返回句子匹配的概率。
choose_optimizer函数:根据配置选择优化器(Adam或SGD)及学习率。
主程序
main.py
# -*- coding: utf-8 -*-
import torch
import os
import random
import os
import numpy as np
import logging
from config import Config
from model import SentenceMatchNetwork, choose_optimizer
from evaluate import Evaluator
from loader import load_data
logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
"""
模型训练主程序
"""
def main(config):
#创建保存模型的目录
if not os.path.isdir(config["model_path"]):
os.mkdir(config["model_path"])
#加载训练数据
train_data = load_data(config["train_data_path"], config)
#加载模型
model = SentenceMatchNetwork(config)
# 标识是否使用gpu
cuda_flag = torch.cuda.is_available()
if cuda_flag:
logger.info("gpu可以使用,迁移模型至gpu")
model = model.cuda()
#加载优化器
optimizer = choose_optimizer(config, model)
#加载效果测试类
evaluator = Evaluator(config, model, logger)
#训练
for epoch in range(config["epoch"]):
epoch += 1
model.train()
logger.info("epoch %d begin" % epoch)
train_loss = []
for index, batch_data in enumerate(train_data):
optimizer.zero_grad()
if cuda_flag: #如果gpu可用则使用gpu加速
batch_data = [d.cuda() for d in batch_data]
input_ids, labels = batch_data
loss = model(input_ids, labels) #计算loss
train_loss.append(loss.item())
#每轮训练一半的时候输出一下loss,观察下降情况
if index % int(len(train_data) / 2) == 0:
logger.info("batch loss %f" % loss)
loss.backward() #梯度计算
optimizer.step() #梯度更新
logger.info("epoch average loss: %f" % np.mean(train_loss))
evaluator.eval(config["epoch"])
model_path = os.path.join(config["model_path"], "epoch_%d.pth" % epoch)
torch.save(model.state_dict(), model_path)
return
if __name__ == "__main__":
main(Config)
实现模型训练过程。首先,它加载训练数据并初始化模型,选择是否使用GPU加速。训练过程中,模型计算损失并通过反向传播更新参数,每个epoch结束后输出平均损失值。优化器用于更新模型权重,并通过Evaluator
类评估模型效果。最终,训练完成后,模型权重被保存到指定路径。整个过程确保了训练的高效性和可持续性,适用于深度学习任务中的模型训练与保存。
测试与评估
evaluate.py
# -*- coding: utf-8 -*-
import torch
from loader import load_data
import numpy as np
"""
模型效果测试
"""
class Evaluator:
def __init__(self, config, model, logger):
self.config = config
self.model = model
self.logger = logger
self.valid_data = load_data(config["valid_data_path"], config, shuffle=False)
# 由于效果测试需要训练集当做知识库,再次加载训练集。
# 事实上可以通过传参把前面加载的训练集传进来更合理,但是为了主流程代码改动量小,在这里重新加载一遍
self.train_data = load_data(config["train_data_path"], config)
self.tokenizer = self.train_data.dataset.tokenizer
self.stats_dict = {"correct":0, "wrong":0} #用于存储测试结果
#将知识库中的问题向量化,为匹配做准备
#每轮训练的模型参数不一样,生成的向量也不一样,所以需要每轮测试都重新进行向量化
def knwb_to_vector(self):
self.question_index_to_standard_question_index = {}
self.questions = []
for standard_question_index, questions in self.train_data.dataset.knwb.items():
for question in questions:
#记录问题编号到标准问题标号的映射,用来确认答案是否正确
self.question_index_to_standard_question_index[len(self.questions)] = standard_question_index
self.questions.append(question)
return
def eval(self, epoch):
self.logger.info("开始测试第%d轮模型效果:" % epoch)
self.stats_dict = {"correct":0, "wrong":0} #清空前一轮的测试结果
self.model.eval()
self.knwb_to_vector()
for index, batch_data in enumerate(self.valid_data):
test_questions, labels = batch_data
predicts = []
for test_question in test_questions:
input_ids = []
for question in self.questions:
input_ids.append(self.train_data.dataset.encode_sentence(test_question, question))
with torch.no_grad():
input_ids = torch.LongTensor(input_ids)
if torch.cuda.is_available():
input_ids = input_ids.cuda()
scores = self.model(input_ids).detach().cpu().tolist()
hit_index = np.argmax(scores)
# print(hit_index)
predicts.append(hit_index)
self.write_stats(predicts, labels)
self.show_stats()
return
def write_stats(self, predicts, labels):
assert len(labels) == len(predicts)
for hit_index, label in zip(predicts, labels):
hit_index = self.question_index_to_standard_question_index[hit_index] #转化成标准问编号
if int(hit_index) == int(label):
self.stats_dict["correct"] += 1
else:
self.stats_dict["wrong"] += 1
return
def show_stats(self):
correct = self.stats_dict["correct"]
wrong = self.stats_dict["wrong"]
self.logger.info("预测集合条目总量:%d" % (correct +wrong))
self.logger.info("预测正确条目:%d,预测错误条目:%d" % (correct, wrong))
self.logger.info("预测准确率:%f" % (correct / (correct + wrong)))
self.logger.info("--------------------")
return
实现一个用于模型效果评估的Evaluator
类。该类初始化时加载了验证数据和训练数据,其中训练数据被当作知识库用于匹配。knwb_to_vector
方法将训练数据中的问题向量化,并建立问题编号与标准问题编号的映射,以便在评估时正确匹配答案。eval
方法执行模型的评估过程,遍历验证数据集并将每个问题与知识库中的问题进行匹配,通过模型预测结果并计算正确与错误的预测。每次评估完成后,write_stats
方法记录正确和错误的预测次数,show_stats
方法则输出最终的评估结果,包括预测正确率。
总结
基于交互型文本匹配,每次都以句子对或文本对输入,输入到交互层,统一编码形式后经过网络层,最后到二分类表示层(当然也可以设置为多种关系,比如中性等,只不过这时数据采样部分的标签也要同步,损失函数也要对应适配),表示文本的配对情况。交互型文本匹配重点把握文本对之间的潜在关系,实际上也是一种对比学习思想。