实战4. Python利用Bow、TF-iDF和预训练模型解决文本分类问题

发布于:2025-04-08 ⋅ 阅读:(42) ⋅ 点赞:(0)

在这个任务中,我们将尝试解决文本分类问题,并且在这样做的过程中,我们将尝试使用课程6中讨论的单词的信息向量描述。

解决思路:
我们将尝试建立一个基于神经网络和逻辑回归的二元分类器。将某个特征向量作为输入提供给它。输出是构建的逻辑回归模型的预测值(一个数字)。

建议使用文本的向量表示作为特征。这可以是经典的词袋模型,也可以是嵌入袋模型。因此,模型必须根据每个文本的给定向量表示来预测相应类别的概率。

我们需要使用 PyTorch 库自己实现 sklearn.linear_model.LogisticRegression 的类似功能。

功能包

# __________块的开始__________
from collections import Counter

import numpy as np
import pandas as pd

from sklearn.base import BaseEstimator
from sklearn.linear_model import LogisticRegression
from sklearn import naive_bayes
from sklearn.metrics import roc_auc_score, roc_curve, accuracy_score

import torch
from torch import nn
from torch.nn import functional as F
from torch.optim.lr_scheduler import StepLR, ReduceLROnPlateau



from IPython import display
import matplotlib.pyplot as plt
%matplotlib inline


out_dict = dict()
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
# __________块结束__________

文本预处理和标记化

首先,我们下载源数据:

df = pd.read_csv( # 读取原始数据集,制表符分隔符,无表头
    'https://github.com/clairett/pytorch-sentiment-classification/raw/master/data/SST2/train.tsv',
    delimiter='\t',
    header=None
)
df.head(5) # 我们来看一下读取数据集中的前 5 行

输出:
在这里插入图片描述
原始数据集是各种电影的评论列表,以及相应评论的情绪(正面或负面)。您需要解决情绪分析问题,即建立一个二元分类器,让我们根据已知的评论判断它是正面的还是负面的。

输入数据的预处理非常简单,在代码的注释中详细描述。

nltk 库在文本处理中被广泛使用。您可以在上面的链接找到它的详细描述和文档。

# __________start of block__________

texts_train = df[0].values[:5000] # 我们选择前5000个句子作为训练样本
y_train = df[1].values[:5000] # 每个句子都有一个对应的类别标签——一个整数
texts_test = df[0].values[5000:] # 我们使用所有剩余的句子作为测试样本
y_test = df[1].values[5000:]

from nltk.tokenize import WordPunctTokenizer
tokenizer = WordPunctTokenizer() # 我们将使用单个单词和标点符号作为标记

# 作为预处理步骤,我们将文本转换为小写
# 使用 tokenizer 对其进行分词,最后将分词结果用空格连接成一个新的字符串
preprocess = lambda text: ' '.join(tokenizer.tokenize(text.lower()))

text = 'How to be a grown-up at work: replace "I don\'t want to do that" with "Ok, great!".'
print("before:", text,)
print("after:", preprocess(text),) # 让我们看看给定字符串文本的预处理是如何进行的

# 使用列表推导式对 texts_train 和 texts_test 中的每个文本进行预处理
texts_train = [preprocess(text) for text in texts_train] # 获取训练样本的预处理表示
texts_test = [preprocess(text) for text in texts_test] # 类似地,我们获得测试样本的预处理表示

# 使用 assert 语句进行数据检查
# 对训练和测试样本的处理是否正确进行了小幅检查
assert texts_train[5] ==  'campanella gets the tone just right funny in the middle of sad in the middle of hopeful'
assert texts_test[74] == 'poetry in motion captured on film'
assert len(texts_test) == len(y_test)
# __________end of block__________

输出:
before: How to be a grown-up at work: replace “I don’t want to do that” with “Ok, great!”.
after: how to be a grown - up at work : replace " i don ’ t want to do that " with " ok , great !".

以下函数将帮助您可视化网络训练过程:

# __________start of block__________
# 绘制训练过程中的损失和准确率曲线
def plot_train_process(train_loss, val_loss, train_accuracy, val_accuracy, title_suffix=''):
    fig, axes = plt.subplots(1, 2, figsize=(15, 5))

    axes[0].set_title(' '.join(['Loss', title_suffix]))
    axes[0].plot(train_loss, label='train')
    axes[0].plot(val_loss, label='validation')
    axes[0].legend() # 方便区分训练集和验证集的曲线

    axes[1].set_title(' '.join(['Validation accuracy', title_suffix]))
    axes[1].plot(train_accuracy, label='train')
    axes[1].plot(val_accuracy, label='validation')
    axes[1].legend()
    plt.show()

# 绘制模型在训练集和测试集上的 ROC 曲线,并计算 AUC 值
def visualize_and_save_results(model, model_name, X_train, X_test, y_train, y_test, out_dict):
    for data_name, X, y, model in [
    ('train', X_train, y_train, model),
    ('test', X_test, y_test, model)
    ]:
        if isinstance(model, BaseEstimator):
            proba = model.predict_proba(X)[:, 1]
        elif isinstance(model, nn.Module):
            proba = model(X).detach().cpu().numpy()[:, 1]
        else:
            raise ValueError('Unrecognized model type')
            
		# 使用 roc_auc_score 函数计算真实标签和预测概率之间的 AUC 值
        auc = roc_auc_score(y, proba)

        out_dict['{}_{}'.format(model_name, data_name)] = auc
        # 使用 roc_curve 函数计算 ROC 曲线的坐标,并绘制曲线,同时添加标签显示数据集名称和 AUC 值。
        plt.plot(*roc_curve(y, proba)[:2], label='%s AUC=%.4f' % (data_name, auc))

    plt.plot([0, 1], [0, 1], '--', color='black',)
    plt.legend(fontsize='large')
    plt.title(model_name)
    plt.grid()
    return out_dict
# __________end of block__________

特别注意:

  • model参数可以是 sklearn 中的 BaseEstimator 类型,也可以是 torch.nn.Module 类型
  • 如果模型是 BaseEstimator 类型,使用 predict_proba 方法计算预测概率。
  • 如果模型是 nn.Module 类型,将输入数据传入模型,然后将输出转换为 NumPy 数组,并提取第二列的概率值。

任务1. 词袋模型(BoW)

课程6探讨了将单个标记表示为向量的方法。然而,在现实生活中我们处理的是完整的文本。

我们从课程6中了解到,文本是一系列的标记,其长度不固定。因此,如果每个文本都用一个矩阵表示,其中每一列都是下一个标记的向量描述,我们将收到不同文本的不同大小的矩阵。我们知道,这样的矩阵不适合传输到神经网络,因为网络输入数据的大小总是固定的。因此,我们需要学习将任何文本表示为某个固定大小的张量(向量或矩阵)。

最简单的方法是使用经典的文本矢量化方法:词袋(BoW)。课程6中提到,在这种方法中,每个标记都可以编码为一个稀疏向量,其大小将等于字典的大小。因此,为了获得整个文本的向量表示,只需将其中包含的所有标记的向量表示相加即可。

我们首先要看的是 One-hot 标记编码。在这种情况下,我们得出的结论是,文本将由一个长度等于字典大小的向量表示,该向量中的第 N 个值将是原始文本中索引为 N 的单词的使用次数。

为了实现这种方法,您可以使用sklearn中的CountVectorizer,也可以自己实现。请注意,此任务仅使用训练样本中出现频率最高的k个单词。

# __________start of block__________

# 我们只选择文本中最流行的 k 个单词
k = min(10000, len(set(' '.join(texts_train).split()))) 
# 此方法是如果词典中的单词少于 10,000 个,则我们将选取所有单词,否则我们将选择最受欢迎的 10,000 个单词

# 让我们建立一个包含训练样本中所有唯一单词的词典,只留下最流行的 k 个单词。

counts = Counter(' '.join(texts_train).split()) # 使用Counter统计训练集中每个单词出现的次数
bow_vocabulary = [key for key, val in counts.most_common(k)] # 选取出现次数最多的k个单词


def text_to_bow(text):
    """ 基于词袋模型将输入字符串转换为向量表示的函数。 """
    sent_vec = np.zeros(len(bow_vocabulary))
    counts = Counter(text.split())
    # 通过遍历词典中的每个单词token,检查其是否在输入文本的单词统计结果中。如果存在,则将sent_vec中对应位置的值设置为该单词在输入文本中的出现次数。
    for i, token in enumerate(bow_vocabulary):
        if token in counts:
            sent_vec[i] = counts[token]
    return np.array(sent_vec, 'float32')

# 使用map函数将text_to_bow函数应用到训练集texts_train的每个文本上
# 再用np.stack将其转换为二维数组
# np.stack用于将多个数组按指定维度堆叠起来,这里按默认的第 0 维堆叠,形成一个形状为(len(texts_train), len(bow_vocabulary))的数组。
X_train_bow = np.stack(list(map(text_to_bow, texts_train)))
X_test_bow = np.stack(list(map(text_to_bow, texts_test)))

# 小检查 - 如果您想实现自己的词袋模型,则需要这些。
k_max = len(set(' '.join(texts_train).split()))
assert X_train_bow.shape == (len(texts_train), min(k, k_max))
assert X_test_bow.shape == (len(texts_test), min(k, k_max))
assert np.all(X_train_bow[5:10].sum(-1) == np.array([len(s.split()) for s in  texts_train[5:10]]))
assert len(bow_vocabulary) <= min(k, k_max)
assert X_train_bow[65, bow_vocabulary.index('!')] == texts_train[65].split().count('!')


# 我们为获得的文本向量表示建立一个逻辑回归模型
bow_model = LogisticRegression(max_iter=1500).fit(X_train_bow, y_train) # 最大迭代次数为 1500 次

out_dict = visualize_and_save_results(bow_model, 'bow_log_reg_sklearn', X_train_bow, X_test_bow, y_train, y_test, out_dict)
# __________end of block__________

输出:
在这里插入图片描述
结果还不错,但是过度拟合明显可见。该结论可以通过训练样本相对于测试样本的显著质量优势(AUC ROC)来判断。此外,在训练样本上,质量趋向于一,而在延迟样本上,质量明显较低,即模型捕获了许多仅特定于训练样本的依赖关系。机器学习课程中讨论了过度拟合的问题。随着您学习课程的进展,将会多次更详细地讨论它。

我们稍后会处理这个过度拟合的问题。现在实现一个基于逻辑回归的解决方案,但使用 PyTorch。因此,您应该有一个经过训练的模型,可以预测这两个类别的概率。测试样本的质量应该不比逻辑回归差。

因此,我们需要实现一个单层线性网络,使用softmax作为激活函数,输出数量等于类数。

# 使用 PyTorch 实现逻辑回归模型(类似于 sklearn.linear_model.LogisticRegression)
model = nn.Sequential(
    nn.Linear(X_train_bow.shape[1], 2),  # 输入特征数量为X_train_bow的列数(即词袋模型中词典的大小),输出为2,因为是二分类问题
    nn.Softmax(dim=1) # dim=1表示在维度 1 上进行操作,即将每个样本的两个输出值转换为概率值,使它们的和为 1。
)

不要忘记损失函数:nn.CrossEntropyLoss 结合了 LogSoftMaxNLLLoss。也不要忘记将张量传输到所使用的device的需要。

loss_function = nn.CrossEntropyLoss() # 交叉熵损失(多分类使用的损失函数)
opt = torch.optim.Adam(model.parameters(), lr=1e-3) # Adam优化器,学习率为0.001
# 将 numpy 数组转换为 torch 张量
X_train_bow_torch = torch.tensor(X_train_bow, dtype=torch.float32) 
X_test_bow_torch = torch.tensor(X_test_bow, dtype=torch.float32) 

# torch.long的原因:交叉熵损失函数要求标签是长整型
y_train_torch = torch.tensor(y_train, dtype=torch.long) 
y_test_torch = torch.tensor(y_test, dtype=torch.long) 

以下函数将有助于训练模型:

def train_model(
    model,
    opt,
    X_train_torch,
    y_train_torch,
    X_val_torch,
    y_val_torch,
    n_iterations=500,
    batch_size=32,
    show_plots=True,
    eval_every=50
):
    train_loss_history = []
    train_acc_history = []
    val_loss_history = []
    val_acc_history = []

    local_train_loss_history = []
    local_train_acc_history = []
    # 进行n_iterations次迭代训练
    for i in range(n_iterations):

        # 获取大小为 batch_size 的随机批次用于训练
        ix = np.random.randint(0, len(X_train_torch), batch_size)
        # 根据随机索引获取一个小批次的训练数据
        x_batch = X_train_torch[ix]
        y_batch = y_train_torch[ix]

        # 预测响应(对数概率或 logits)
        y_predicted = model(x_batch)

        # 按照上述方法计算损失
        loss = loss_function(y_predicted, y_batch)

        # 计算梯度
        loss.backward()

        # Adam step
        opt.step()

        # clear gradients
        opt.zero_grad()

        local_train_loss_history.append(loss.item())
        local_train_acc_history.append(
            accuracy_score(
                y_batch.to('cpu').detach().numpy(),
                y_predicted.to('cpu').detach().numpy().argmax(axis=1)
            )
        )

        if i % eval_every == 0:
        	# 计算平均损失和平均准确率
            train_loss_history.append(np.mean(local_train_loss_history))
            train_acc_history.append(np.mean(local_train_acc_history))
            local_train_loss_history, local_train_acc_history = [], []

            predictions_val = model(X_val_torch) # 对验证集进行预测
            val_loss_history.append(loss_function(predictions_val, y_val_torch).to('cpu').detach().item())

            acc_score_val = accuracy_score(y_val_torch.cpu().numpy(), predictions_val.to('cpu').detach().numpy().argmax(axis=1))
            val_acc_history.append(acc_score_val)

            if show_plots:
                display.clear_output(wait=True)
                plot_train_process(train_loss_history, val_loss_history, train_acc_history, val_acc_history)
    return model

让我们来训练模型:

bow_nn_model = train_model(model, opt, X_train_bow_torch, y_train_torch, X_test_bow_torch, y_test_torch, n_iterations=3000)

输出:
在这里插入图片描述
查看是否符合要求:

# __________start of block__________
out_dict = visualize_and_save_results(bow_nn_model, 'bow_nn_torch', X_train_bow_torch, X_test_bow_torch, y_train, y_test, out_dict)

assert out_dict['bow_log_reg_sklearn_test'] - out_dict['bow_nn_torch_test'] < 0.01, 'AUC ROC on test data should be close to the sklearn implementation'
# 测试数据上的 AUC ROC 应该接近 sklearn 实现
# __________end of block__________

输出:
在这里插入图片描述
现在重复上述训练过程,但使用不同的“k”值——字典大小。在结果列表中,保存使用大小为“k”的词汇训练的模型的测试子集上的“AUC ROC”。

# 从 100 开始,到 5800 结束(不包含 5800),步长为 700 的整数序列
vocab_sizes_list = np.arange(100, 5800, 700)
results = []

for k in vocab_sizes_list:
    # your code here
    counts = Counter(' '.join(texts_train).split())
    bow_vocabulary = [key for key, val in counts.most_common(k)]
    def new_text_to_bow(text):
        sent_vec = np.zeros(len(bow_vocabulary))
        counts = Counter(text.split())
        for i, token in enumerate(bow_vocabulary):
            if token in counts:
                sent_vec[i] = counts[token]
        return np.array(sent_vec, 'float32')
    X_train_bow = np.stack(list(map(new_text_to_bow, texts_train)))
    X_test_bow = np.stack(list(map(new_text_to_bow, texts_test)))
    X_train_bow_torch = torch.tensor(X_train_bow, dtype=torch.float32)
    X_test_bow_torch = torch.tensor(X_test_bow, dtype=torch.float32)
    model = nn.Sequential(
        nn.Linear(X_train_bow.shape[1], 2),
        nn.Softmax(dim=1)
    )
    loss_function = nn.CrossEntropyLoss()
    opt = torch.optim.Adam(model.parameters(), lr=1e-3)
    model = train_model(model, opt, X_train_bow_torch, y_train_torch, X_test_bow_torch, y_test_torch,
                        n_iterations=3000, show_plots=False)
    
    # 在上下文管理器中禁用梯度计算,因为在测试阶段不需要计算梯度,这样可以节省内存和计算资源
    with torch.no_grad():
        predictions = model(X_test_bow_torch)
        predicted_probas_on_test_for_k_sized_dict = predictions[:, 1].cpu().numpy()
    assert predicted_probas_on_test_for_k_sized_dict is not None
    auc = roc_auc_score(y_test, predicted_probas_on_test_for_k_sized_dict)
    results.append(auc)
    # ---------------------------------
    # predicted_probas_on_test_for_k_sized_dict = None
    # assert predicted_probas_on_test_for_k_sized_dict is not None
    # auc = roc_auc_score(y_test, predicted_probas_on_test_for_k_sized_dict)
    # results.append(auc)

查看是否复合要求,输出结果:

# __________start of block__________
assert len(results) == len(vocab_sizes_list), 'Check the code above'
assert min(results) >= 0.65, 'Seems like the model is not trained well enough'
assert results[-1] > 0.84, 'Best AUC ROC should not be lower than 0.84'

plt.plot(vocab_sizes_list, results)
plt.xlabel('num of tokens')
plt.ylabel('AUC')
plt.grid()

out_dict['bow_k_vary'] = results
# __________end of block__________

输出:
在这里插入图片描述

任务2. 使用 TF-iDF 特征

TF-IDF(Term Frequency-Inverse Document Frequency,词频 - 逆文档频率)

您还可以使用 TF-iDF 对文本进行矢量化。这使得我们在评估文本差异时可以排除许多没有显著影响的词语。

课程6中已经提到了如何使用这种方法对单个标记进行矢量化。因此,文本的向量表示将是字典大小的向量,其中数字为 N 的分量将指示给定文本中索引为 N 的标记的 TF-IDF 值。

您可以阅读有关 TF-iDF 的更多信息,例如此处

您的任务:使用 TF-iDF(或 sklearn 中的 TfidfVectorizer,或自己实现)对文本进行矢量化,并使用 PyTorch 构建分类器,类似于任务1。

然后还使用 AUC ROC 评估不同词汇量大小的分类性能。

分类质量应不低于0.86 AUC ROC。

from sklearn.feature_extraction.text import TfidfVectorizer
# 创建 TfidfVectorizer 对象
# 对输入的文本进行预处理(如转换为小写、去除停用词等),并计算每个单词的 TF-IDF 值。
tfidf_vectorizer = TfidfVectorizer()

# 将 TF-IDF 应用于文本数据
X_train_tfidf = tfidf_vectorizer.fit_transform(texts_train).toarray()
X_test_tfidf = tfidf_vectorizer.transform(texts_test).toarray()

# 将 numpy 数组转换为 torch 张量
X_train_tfidf_torch = torch.tensor(X_train_tfidf, dtype=torch.float32)
X_test_tfidf_torch = torch.tensor(X_test_tfidf, dtype=torch.float32)
y_train_torch = torch.tensor(y_train, dtype=torch.long)
y_test_torch = torch.tensor(y_test, dtype=torch.long)

特别注意:

  • tfidf_vectorizer.fit_transform(texts_train):对训练集 texts_train 进行拟合(fit)和转换(transform)操作。拟合操作会学习训练集中的词汇表和每个单词的统计信息,转换操作会将文本数据转换为 TF-IDF 特征矩阵。toarray() 方法将结果转换为 NumPy 数组,存储在 X_train_tfidf 中。
  • tfidf_vectorizer.transform(texts_test):对测试集 texts_test 进行转换操作,使用在训练集上学习到的词汇表和统计信息将测试集文本转换为 TF-IDF 特征矩阵,同样使用 toarray() 方法转换为 NumPy 数组,存储在 X_test_tfidf 中。
model = nn.Sequential(
    nn.Linear(X_train_tfidf.shape[1], 2),
    nn.Softmax(dim=1)
)
loss_function = nn.CrossEntropyLoss() 
opt = torch.optim.Adam(model.parameters(), lr=1e-3)

model_tf_idf = train_model(model, opt, X_train_tfidf_torch, y_train_torch, X_test_tfidf_torch, y_test_torch, n_iterations=3000)

输出:
在这里插入图片描述
检查我们是否复合要求:

# __________start of block__________
out_dict = visualize_and_save_results(model_tf_idf, 'tf_idf_nn_torch', X_train_tfidf_torch, X_test_tfidf_torch, y_train, y_test, out_dict)

assert out_dict['tf_idf_nn_torch_test'] >= out_dict['bow_nn_torch_test'], 'AUC ROC on test data should be better or close to BoW for TF-iDF features'
# __________end of block__________

输出:
在这里插入图片描述
与任务1类似,对不同的k值(字典大小)重复训练过程,并将样本测试部分上的AUC ROC保存在results列表中。

vocab_sizes_list = np.arange(100, 5800, 700)
results = []

for k in vocab_sizes_list:
    # 使用 TF-IDF 对文本进行矢量化,只保留最常见的 k 个词
    vectorizer = TfidfVectorizer(max_features=k)
    X_train_tfidf = vectorizer.fit_transform(texts_train).toarray()
    X_test_tfidf = vectorizer.transform(texts_test).toarray()

    # 将 numpy 数组转换为 torch 张量
    X_train_tfidf_torch = torch.tensor(X_train_tfidf, dtype=torch.float32)
    X_test_tfidf_torch = torch.tensor(X_test_tfidf, dtype=torch.float32)
    y_train_torch = torch.tensor(y_train, dtype=torch.long)
    y_test_torch = torch.tensor(y_test, dtype=torch.long)

    # 定义模型
    model = nn.Sequential(
        nn.Linear(X_train_tfidf.shape[1], 2),
        nn.Softmax(dim=1)
    )

    # 定义损失函数
    loss_function = nn.CrossEntropyLoss()

    # 定义优化器
    opt = torch.optim.Adam(model.parameters(), lr=1e-3)

    # 训练模型
    model = train_model(model, opt, X_train_tfidf_torch, y_train_torch, X_test_tfidf_torch, y_test_torch,
                        n_iterations=3000, show_plots=False)

    # 在测试集上进行预测
    with torch.no_grad():
        predictions = model(X_test_tfidf_torch)
        predicted_probas_on_test_for_k_sized_dict = predictions[:, 1].cpu().numpy()

    assert predicted_probas_on_test_for_k_sized_dict is not None
    auc = roc_auc_score(y_test, predicted_probas_on_test_for_k_sized_dict)
    results.append(auc)

检查我们是否符合要求:

# __________start of block__________
assert len(results) == len(vocab_sizes_list), 'Check the code above'
assert min(results) >= 0.65, 'Seems like the model is not trained well enough'
assert results[-1] > 0.85, 'Best AUC ROC for TF-iDF should not be lower than 0.84'

plt.plot(vocab_sizes_list, results)
plt.xlabel('num of tokens')
plt.ylabel('AUC')
plt.grid()

out_dict['tf_idf_k_vary'] = results
# __________end of block__________

输出:
在这里插入图片描述

任务3. 与朴素贝叶斯分类器进行比较

经典模型在很多任务中仍然能够展现良好的效果。在用 BoW 和 TF-iDF 矢量化的文本上训练朴素贝叶斯分类器,并将结果与​​上述模型进行比较。

注释:请注意,需要为适合给定问题的特征选择先验分布,即从sklearn中选择正确的分类器版本:GaussianNBMultinomialNBComplementNBBernoulliNBCategoricalNB

from sklearn.naive_bayes import MultinomialNB
clf_nb_bow = MultinomialNB()
clf_nb_bow.fit(X_train_bow, y_train)

# __________start of block__________
out_dict = visualize_and_save_results(clf_nb_bow, 'bow_nb_sklearn', X_train_bow, X_test_bow, y_train, y_test, out_dict)
# __________end of block__________

输出:
在这里插入图片描述

特别注意:

  • clf_nb_bow = MultinomialNB():创建一个 MultinomialNB 类的实例 clf_nb_bow,即初始化一个多项式朴素贝叶斯分类器,使用默认参数。多项式朴素贝叶斯适用于处理离散特征数据,如文本数据的词袋表示。
  • clf_nb_bow.fit(X_train_bow, y_train):使用训练集的词袋特征矩阵 X_train_bow 和对应的标签 y_train 对朴素贝叶斯分类器 clf_nb_bow 进行训练。在训练过程中,分类器会学习每个特征(单词)与不同类别的概率关系。
clf_nb_tfidf = MultinomialNB()
clf_nb_tfidf.fit(X_train_tfidf, y_train)

# 不要更改下面块中的代码
# __________start of block__________
out_dict = visualize_and_save_results(clf_nb_tfidf, 'tf_idf_nb_sklearn', X_train_tfidf, X_test_tfidf, y_train, y_test, out_dict)
# __________end of block__________

输出:
在这里插入图片描述
我们能发现效果更好了些
检查是否符合要求:

# __________start of block__________
assert out_dict['tf_idf_nb_sklearn_test'] > out_dict['bow_nb_sklearn_test'],' TF-iDF results should be better'
assert out_dict['tf_idf_nb_sklearn_test'] > 0.86, 'TF-iDF Naive Bayes score should be above 0.86'
# __________end of block__________

任务4. 使用预先训练的嵌入

最后,我们将使用来自“gensim”库的预训练嵌入。它带有针对不同文本语料库进行预先训练的几个嵌入。完整列表可在此处找到。我们提醒您,最好使用那些在具有类似结构的文本上训练的嵌入。

您的任务:使用评论中所有标记的平均嵌入来训练模型(逻辑回归或两层神经网络就足够了),实现不比使用 BoW/TF-iDF 更差的质量并降低过度拟合的程度(训练和测试样本上的 AUC ROC 之间的差异)。

请注意!
Colab/其它云端服务器编程的环境,可能安装了过时版本的 gensim 库,因此其某些更新功能可能不受支持。
例如,可能会发生 AttributeError。
在这种情况下,您需要取消注释下面的行,执行此单元,然后重新执行所有后续单元:

!pip install --upgrade gensim

并且要求numpy版本符合要求,我们可以进行:

!pip install numpy==1.25

按照要求会进行重新连接响应(注意不是中断重启),即可使用

import gensim.downloader as api
gensim_embedding_model = api.load('word2vec-google-news-300')
# 这里我用的是基于 Google News 数据集训练的 Word2Vec 模型,每个词向量的维度为 300。
def text_to_average_embedding(text, gensim_embedding_model):
    tokens = text.split()
    valid_vectors = []
    for token in tokens:
        if token in gensim_embedding_model:
            valid_vectors.append(gensim_embedding_model[token])
    if len(valid_vectors) == 0:
        # 如果文本中没有有效的词向量,返回全零向量
        return np.zeros(gensim_embedding_model.vector_size)
    # 计算有效词向量的平均值
    embedding_for_text = np.mean(valid_vectors, axis=0)
    return embedding_for_text

这块详细解释一下这个文本向量化函数:

  • 定义了一个名为 text_to_average_embedding 的函数,该函数接受两个参数:text(要转换的文本)和 gensim_embedding_model(预训练的词向量模型)。
  • tokens = text.split():将输入的文本按空格分割成单词列表 tokens。
  • 通过遍历 tokens 中的每个单词 token,检查该单词是否在预训练模型gensim_embedding_model 中。如果在,则将其对应的词向量添加到 valid_vectors 列表中。
  • 如果 valid_vectors 列表为空,说明文本中的单词在预训练模型中都不存在,此时返回一个全零向量,其长度为预训练模型的向量维度(gensim_embedding_model.vector_size)。
  • 如果 valid_vectors 列表不为空,则计算这些有效词向量的平均值,得到该文本的向量表示 embedding_for_text,并返回。
X_train_emb = [text_to_average_embedding(text, gensim_embedding_model) for text in texts_train]
X_test_emb = [text_to_average_embedding(text, gensim_embedding_model) for text in texts_test]

assert len(X_train_emb[0]) == gensim_embedding_model.vector_size, 'Seems like the embedding shape is wrong'

转换向量:

X_train_emb_torch = torch.tensor(np.array(X_train_emb), dtype=torch.float32)
X_test_emb_torch = torch.tensor(np.array(X_test_emb), dtype=torch.float32)

y_train_torch = torch.tensor(y_train, dtype=torch.long)
y_test_torch = torch.tensor(y_test, dtype=torch.long)

模型:

model = nn.Sequential(
    nn.Linear(X_train_emb_torch.shape[1], 2),
    nn.Softmax(dim=1)
)
loss_function = nn.CrossEntropyLoss()
opt = torch.optim.Adam(model.parameters(), lr=0.001)

model = train_model(model, opt, X_train_emb_torch, y_train_torch, X_test_emb_torch, y_test_torch, n_iterations=3000)

输出:
在这里插入图片描述
检查结果是否符合:

# __________start of block__________

out_dict = visualize_and_save_results(model, 'emb_nn_torch', X_train_emb_torch, X_test_emb_torch, y_train, y_test, out_dict)
assert out_dict['emb_nn_torch_test'] > 0.87, 'AUC ROC on test data should be better than 0.87'
assert out_dict['emb_nn_torch_train'] - out_dict['emb_nn_torch_test'] < 0.1, 'AUC ROC on test and train data should not be different more than by 0.1'
# __________end of block__________

在这里插入图片描述

输出我们的模型

这会将之前所有的任务模型都保存在npy里:

# __________start of block__________

np.save('submission_dict_hw06.npy', out_dict, allow_pickle=True)
print('File saved to `submission_dict_hw06.npy`')
# __________end of block__________

注意:
下面简要说明一下这些测试以及无法通过的原因:

  1. test_tfidf_on_test - 将 TF-IDF 应用于测试数据集时,测试样本上的 ROC-AUC 值不够高(参见条件)。

  2. test_tfidf_nb_first - 可能为朴素贝叶斯分类器选择了错误的分布,BoW 的质量优于 TF-IDF。

  3. test_tfidf_nb_second - 可能为朴素贝叶斯分类器选择了错误的分布:测试样本的质量低于要求(参见条件)。

  4. test_embedding_on_test — 对于基于嵌入的模型来说质量不够高。

  5. test_embedding_overfit——使用嵌入的模型很可能过度拟合。

补充内容

什么是ROC曲线

请看视频:

  1. ROC曲线详解
  2. 模型评估: ROC曲线与AUC值

网站公告

今日签到

点亮在社区的每一天
去签到