基于Bert-base-chinese训练多分类文本模型(代码详解)

发布于:2024-09-05 ⋅ 阅读:(93) ⋅ 点赞:(0)

目录

一、简介

二、模型训练

三、模型推理


一、简介

BERT(Bidirectional Encoder Representations from Transformers)是基于深度学习在自然语言处理(NLP)领域近几年出现的、影响深远的创新模型之一。在BERT之前,已经有许多预训练语言模型,如ELMO和GPT,它们展示了预训练模型在NLP任务中的强大性能。然而,这些模型通常基于单向的上下文信息,即只考虑文本中的前向或后向信息,这限制了它们对文本的全局理解BERT旨在通过引入双向上下文信息来解决这一问题,从而更准确地表示文本中的语义信息

与传统的单向语言模型相比,BERT 的核心优势在于:

  • 双向性:BERT通过使用Transformer的编码器结构,能够同时从文本的左右两个方向学习上下文信息,使模型能够更好地理解句子中的每个词的语义。
  • 预训练与微调:通过预训练任务,BERT 可以在多种下游任务上进行快速微调。

其中,Bert-base-chinese模型是一个在简体和繁体中文文本上训练得到的预训练模型

二、模型训练

数据示例如下,现在有一个data.csv文件,包含两列分别是特征(feature)和标签(label)。其中,标签可能是多个分类。

第一步:读取数据并提取出特征和标签
data = pd.read_csv('./data/data.csv', encoding='utf-8')   # 如果表格数据是gbk格式,则修改encoding='gbk'
X = data['feature']   # 特征列
y = data['label'].values   # 标签列
第二步:对标签数据进行编码转换
label_encoder = LabelEncoder()  # 初始化
y_encoded = label_encoder.fit_transform(y)
joblib.dump(label_encoder, './data/encoder.joblib') # 保存 label_encoder 以便以后使用
print(f'分类数:{len(label_encoder.classes_)} \n')  # 标签的类别数量
第三步:划分训练数据集和测试数据集
X_train, X_val, y_train, y_val = train_test_split(X, y_encoded, test_size=0.1,random_state=42)  # test_size=0.1表示训练集和测试集划分比例是9:1 
# random_state=42表示固定随机种子为42,保证每一次分割数据集都是一样的结果
第四步:加载BERT分词器
local_model_path = './bert-base-chinese'   # 模型地址
tokenizer = BertTokenizer.from_pretrained(local_model_path)
tokenizer.save_pretrained(best_model_path)
第五步:将文本数据转换成BERT模型能够理解的格式
def preprocess_for_bert(data, labels):
    input_ids = []
    attention_masks = []

    for sent in data:  # 对每个句子(sent)进行编码处理
        encoded_sent = tokenizer.encode_plus(
            text=sent,  # 要处理的句子
            add_special_tokens=True,  # 添加特殊标记,如句子的起始标记和结束标记
            max_length=256,  # 句子的最大长度为256个标记,超出部分将被截断,不足部分将被填充
            padding='max_length',  # 将句子填充到固定长度(256),不足部分会用0补齐
            return_attention_mask=True,  # 返回注意力掩码,用于标记哪些位置是填充部分,哪些位置是实际的句子内容
            truncation=True  # 如果句子超过了最大长度,进行截断
        )

        input_ids.append(encoded_sent.get('input_ids'))
        attention_masks.append(encoded_sent.get('attention_mask'))

    # 转换为PyTorch张量(tensor),以便后续可以输入到模型中进行训练或推理
    input_ids = torch.tensor(input_ids)
    attention_masks = torch.tensor(attention_masks)
    labels = torch.tensor(labels)

    return input_ids, attention_masks, labels


train_inputs, train_masks, train_labels = preprocess_for_bert(X_train, y_train)
val_inputs, val_masks, val_labels = preprocess_for_bert(X_val, y_val)
第六步:创建训练集DataLoader和测试集DataLoader
train_data = TensorDataset(train_inputs, train_masks, train_labels)
train_sampler = RandomSampler(train_data)
train_dataloader = DataLoader(train_data, sampler=train_sampler, batch_size=8)

validation_data = TensorDataset(val_inputs, val_masks, val_labels)
validation_sampler = SequentialSampler(validation_data)
validation_dataloader = DataLoader(validation_data, sampler=validation_sampler, batch_size=8)

第七步:加载BERT模型
model = BertForSequenceClassification.from_pretrained(local_model_path, num_labels=len(label_encoder.classes_),ignore_mismatched_sizes=True)
model.cuda()  # 默认使用第一张显卡,如果没有显卡,则可以注释改行代码
第八步:设置优化器和调度器
EPOCHS = 5   # 训练次数,可以自定义修改
optimizer = AdamW(model.parameters(), lr=2e-5, eps=1e-8)   # 优化器
total_steps = len(train_dataloader) * EPOCHS   # 训练步数
scheduler = get_linear_schedule_with_warmup(optimizer, num_warmup_steps=0, num_training_steps=total_steps)  # 调度器
第九步:设置精确度的计算方式
def flat_accuracy(preds, labels):
    pred_flat = np.argmax(preds, axis=1).flatten()
    labels_flat = labels.flatten()
    return np.sum(pred_flat == labels_flat) / len(labels_flat)  # 通过比较预测类别和实际标签的相同之处,计算出预测正确的比例
第十步:训练和评估
best_model_path = './model' # 最优模型训练结果的保存路径
best_val_accuracy = 0  # 初始化最优精确度
for epoch in range(EPOCHS):
    model.train()  # 第一步:将模型设置为训练模式
    total_train_loss = 0  # 初始化训练总损失为0

    for step, batch in enumerate(train_dataloader):  # 第二步:加载训练集DataLoader
        b_input_ids = batch[0].cuda()  # 如果没有显卡,则可以将.cuda给删除了
        b_input_mask = batch[1].cuda()  # 如果没有显卡,则可以将.cuda给删除了
        b_labels = batch[2].cuda().long()  # 如果没有显卡,则可以将.cuda给删除了

        model.zero_grad() # 清除模型的梯度

        outputs = model(b_input_ids, token_type_ids=None, attention_mask=b_input_mask, labels=b_labels)  # 第三步:将输入数据传递给模型,得到模型的输出
        loss = outputs.loss # 第四步:提取出损失值,用于后续的反向传播
        total_train_loss += loss.item()
        loss.backward() # 第五步:进行反向传播,计算梯度
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        optimizer.step()
        scheduler.step()

    avg_train_loss = total_train_loss / len(train_dataloader)  # 第六步:更新学习率

    torch.cuda.empty_cache()  # 训练一轮就清空一次显卡缓存,如果没有显卡,则注释

    # 第七步:模型测试,计算准确度,处理逻辑和训练差不多

    model.eval()
    total_eval_accuracy = 0
    total_eval_loss = 0

    for batch in validation_dataloader:  # 加载测试集DataLoader
        b_input_ids = batch[0].cuda()
        b_input_mask = batch[1].cuda()
        b_labels = batch[2].cuda().long()

        with torch.no_grad():
            outputs = model(b_input_ids, token_type_ids=None, attention_mask=b_input_mask, labels=b_labels)

        loss = outputs.loss
        total_eval_loss += loss.item()
        logits = outputs.logits
        logits = logits.detach().cpu().numpy()
        label_ids = b_labels.to('cpu').numpy()
        total_eval_accuracy += flat_accuracy(logits, label_ids)

    avg_val_accuracy = total_eval_accuracy / len(validation_dataloader)
    avg_val_loss = total_eval_loss / len(validation_dataloader)

    torch.cuda.empty_cache() # 验证一轮就清空一次显卡缓存,如果没有显卡,则注释

    print(f'Training loss: {avg_train_loss}')
    print(f'Validation loss: {avg_val_loss}')
    print(f'Validation Accuracy: {avg_val_accuracy}')   # 主要看这个精度,一般准确率90%以上就可以投入实际生产环境中

    # 在验证集上计算准确率
    if avg_val_accuracy > best_val_accuracy:
        best_val_accuracy = avg_val_accuracy
        # 保存模型
        model.save_pretrained(best_model_path)   # 根据训练次数,保存最优的一个模型结果
完整代码如下:
import pandas as pd
import numpy as np
import joblib
import torch
import time

from transformers import BertTokenizer, BertForSequenceClassification
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
from torch.utils.data import DataLoader, RandomSampler, SequentialSampler, TensorDataset, random_split
from torch.optim import AdamW
from transformers import get_linear_schedule_with_warmup

# 读取数据
data = pd.read_csv('./data/data.csv', encoding='utf-8')   # 如果表格数据是gbk,则修改encoding='gbk'

# 最优模型训练结果的保存路径
best_model_path = './model'

X = data['feature']   # 特征列
y = data['label'].values   # 标签列

# 对标签数据进行编码转换
print("1、开始编码转换啦~")
label_encoder = LabelEncoder()  # 初始化
#label_encoder = joblib.load('./data/encoder.joblib')   # 当你使用同样的data第二次运行脚本时,就可以直接加载上一次保存的编码结果,而不需要重复编码(除非两次加载的数据有变动)
y_encoded = label_encoder.fit_transform(y)
print(f'分类数:{len(label_encoder.classes_)} \n')  # 标签的类别数量

# 保存 label_encoder 以便以后使用
joblib.dump(label_encoder, './data/encoder.joblib')

# 分割数据集
X_train, X_val, y_train, y_val = train_test_split(X, y_encoded, test_size=0.1,random_state=42)  # 这里训练和测试数据集比例是9:1,test_size=0.2或者0.3  固定随机种子42,保证每一次分割数据集都是一样的

# 加载BERT分词器
local_model_path = './bert-base-chinese'
tokenizer = BertTokenizer.from_pretrained(local_model_path)
tokenizer.save_pretrained(best_model_path)

# BERT预处理 -- 将文本数据转换成BERT模型能够理解的格式
def preprocess_for_bert(data, labels):
    input_ids = []
    attention_masks = []

    for sent in data:  # 对每个句子(sent)进行编码处理
        encoded_sent = tokenizer.encode_plus(
            text=sent,  # 要处理的句子
            add_special_tokens=True,  # 添加特殊标记,如句子的起始标记和结束标记
            max_length=256,  # 句子的最大长度为256个标记,超出部分将被截断,不足部分将被填充
            padding='max_length',  # 将句子填充到固定长度(256),不足部分会用0补齐
            return_attention_mask=True,  # 返回注意力掩码,用于标记哪些位置是填充部分,哪些位置是实际的句子内容
            truncation=True  # 如果句子超过了最大长度,进行截断
        )

        input_ids.append(encoded_sent.get('input_ids'))
        attention_masks.append(encoded_sent.get('attention_mask'))

    # 转换为PyTorch张量(tensor),以便后续可以输入到模型中进行训练或推理
    input_ids = torch.tensor(input_ids)
    attention_masks = torch.tensor(attention_masks)
    labels = torch.tensor(labels)

    return input_ids, attention_masks, labels


# 预处理数据
print("2、开始预处理数据啦~")
train_inputs, train_masks, train_labels = preprocess_for_bert(X_train, y_train)
val_inputs, val_masks, val_labels = preprocess_for_bert(X_val, y_val)

# 创建DataLoader
train_data = TensorDataset(train_inputs, train_masks, train_labels)
train_sampler = RandomSampler(train_data)
train_dataloader = DataLoader(train_data, sampler=train_sampler, batch_size=8)

validation_data = TensorDataset(val_inputs, val_masks, val_labels)
validation_sampler = SequentialSampler(validation_data)
validation_dataloader = DataLoader(validation_data, sampler=validation_sampler, batch_size=8)

# 加载BERT模型
print("3、开始预加载模型啦~")
model = BertForSequenceClassification.from_pretrained(local_model_path, num_labels=len(label_encoder.classes_),ignore_mismatched_sizes=True)
model.cuda()  # 默认使用第一张显卡

# 设置优化器和调度器
EPOCHS = 5   # 训练次数,可以先训练5次看看效果,可以自定义修改
optimizer = AdamW(model.parameters(), lr=2e-5, eps=1e-8)   # 优化器
total_steps = len(train_dataloader) * EPOCHS
scheduler = get_linear_schedule_with_warmup(optimizer, num_warmup_steps=0, num_training_steps=total_steps)

# 计算精确度 -- 通过比较预测类别和实际标签的相同之处,计算出预测正确的比例
def flat_accuracy(preds, labels):
    pred_flat = np.argmax(preds, axis=1).flatten()
    labels_flat = labels.flatten()
    return np.sum(pred_flat == labels_flat) / len(labels_flat)

# 训练和评估
print("4、开始训练啦~")
best_val_accuracy = 0
for epoch in range(EPOCHS):
    print(f'Epoch {epoch + 1}')
    now_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())   # 记录每一轮的训练开始时间和结束时间
    print("start time:", now_time)

    model.train()  # 模型设置为训练模式
    total_train_loss = 0  # 初始化训练总损失为0

    for step, batch in enumerate(train_dataloader):
        b_input_ids = batch[0].cuda()
        b_input_mask = batch[1].cuda()
        b_labels = batch[2].cuda().long()

        model.zero_grad() # 清除模型的梯度

        outputs = model(b_input_ids, token_type_ids=None, attention_mask=b_input_mask, labels=b_labels)  # 将输入数据传递给模型,得到模型的输出
        loss = outputs.loss # 提取出损失值,用于后续的反向传播
        total_train_loss += loss.item()
        loss.backward() # 进行反向传播,计算梯度
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        optimizer.step()
        scheduler.step()

    avg_train_loss = total_train_loss / len(train_dataloader)  # 更新学习率

    torch.cuda.empty_cache()  # 训练一轮就清空一次显卡缓存
    # 模型测试,计算准确度
    model.eval()
    total_eval_accuracy = 0
    total_eval_loss = 0

    for batch in validation_dataloader:
        b_input_ids = batch[0].cuda()
        b_input_mask = batch[1].cuda()
        b_labels = batch[2].cuda().long()

        with torch.no_grad():
            outputs = model(b_input_ids, token_type_ids=None, attention_mask=b_input_mask, labels=b_labels)

        loss = outputs.loss
        total_eval_loss += loss.item()
        logits = outputs.logits
        logits = logits.detach().cpu().numpy()
        label_ids = b_labels.to('cpu').numpy()
        total_eval_accuracy += flat_accuracy(logits, label_ids)

    avg_val_accuracy = total_eval_accuracy / len(validation_dataloader)
    avg_val_loss = total_eval_loss / len(validation_dataloader)

    torch.cuda.empty_cache() # 验证一轮就清空一次显卡缓存

    print(f'Training loss: {avg_train_loss}')
    print(f'Validation loss: {avg_val_loss}')
    print(f'Validation Accuracy: {avg_val_accuracy}')   # 主要看这个精度,一般准确率90%以上就可以投入实际生产环境中

    # 在验证集上计算准确率
    if avg_val_accuracy > best_val_accuracy:
        best_val_accuracy = avg_val_accuracy
        # 保存模型
        model.save_pretrained(best_model_path)   # 根据训练次数,保存最优的一个模型结果

    now_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
    print("end time:",now_time)
    print("-------------------")


三、模型推理

模型训练完成后,现在有一批新数据,你想要使用训练好的模型预测该文本数据的分类结果,则可以使用下面的推理代码,详解看注释。

import pandas as pd
import time
import torch
import joblib
from transformers import BertTokenizer, BertForSequenceClassification
import torch.nn.functional as F

# 第一步:加载数据
file_path = './data/detect.csv'  # 要推理的数据路径
df = pd.read_csv(file_path, encoding='utf-8')

# 第二步:加载训练好的模型
best_model_path = './model'
model = BertForSequenceClassification.from_pretrained(best_model_path)
tokenizer = BertTokenizer.from_pretrained(best_model_path)

# 第三步:加载编码(训练时保存的结果)
label_encoder = joblib.load('./data/encoder.joblib')

predictions = []  # 预测值
confidence_scores = []  # 可信度,一般可信度大于0.9说明效果比较准确

# 第四步:遍历推理数据
for row in df.iterrows():
    content = row[1]['feature']  # 特征列(推理样本)
    inputs = tokenizer(content, return_tensors="pt", padding=True, truncation=True, max_length=256)
    outputs = model(**inputs)
    probs = F.softmax(outputs.logits, dim=1)
    pred = torch.argmax(probs, dim=1)
    confidence = torch.max(probs, dim=1)   # 获取置信度的值

    predictions.append(pred.item())
    confidence_scores.append(confidence.values.item())

# 第五步:将预测结果解码为类别标签
decoded_categories = label_encoder.inverse_transform(predictions)

# 第六步:创建一个空的DataFrame来存储推理结果
df['pred'] = decoded_categories
df['confidence_score'] = confidence_scores

# 将结果保存到本地
output_file_path = './data/detect_pred.csv'  # 保存推理结果的路径
df.to_csv(output_file_path, index=False)

参考文章链接:https://blog.csdn.net/yihong23/article/details/138543746