14 - 大语言模型 — 抽取式问答系统 “成长记”:靠 BERT 学本事,从文本里精准 “揪” 答案的全过程(呆瓜版-1号)

发布于:2025-07-30 ⋅ 阅读:(22) ⋅ 点赞:(0)

目录

1、什么是问答系统?

2、问答系统的核心工作流程

2.1、理解问题:把问题 “翻译” 成机器能懂的形式

2.2、 寻找答案:从信息中定位答案

2.3、生成答案:整理并输出结果

2.4、优化迭代:让系统更 “聪明”

3、主流技术:让系统 “变聪明” 的关键

4、问答系统的常见类型

5、现实挑战:问答系统还需要突破什么?

6、问答系统的应用场景

7、英文版回答系统(显示上下文)

8、中文版回答系统(显示上下文)

9、中文版回答系统(隐藏上下文)

10、中英文版回答系统(隐藏上下文)


1、什么是问答系统?

简单来说,问答系统是一种能 “听懂” 人类问题,并给出准确答案的智能系统。比如我们平时用的智能助手(如 Siri、小爱同学),输入 “今天天气怎么样?” 就能得到答案,这就是最常见的问答系统应用。

从技术角度看,它的核心任务是:接收自然语言问题,结合已有信息(如上下文、知识库),返回简洁准确的答案

2、问答系统的核心工作流程

一个完整的问答系统,无论复杂与否,都离不开以下 4 个关键步骤:

2.1、理解问题:把问题 “翻译” 成机器能懂的形式

人类用自然语言提问(比如 “法国的首都是什么?”),机器首先需要理解问题的核心含义:

  • 识别关键词:提取问题中的关键信息(如 “法国”“首都”)。
  • 判断问题类型:是问事实(“是什么”)、原因(“为什么”),还是方法(“怎么做”)?不同类型的问题,寻找答案的方式不同。
  • 转换为机器语言:通过技术手段(如词向量、预训练模型)将文字转为机器能处理的数字形式(向量)。

比如 “法国的首都是什么?”,机器会识别出 “法国” 是主体,“首都” 是要查询的属性,核心是寻找 “法国” 对应的 “首都” 名称。

2.2、 寻找答案:从信息中定位答案

理解问题后,系统需要从 “信息源” 中找答案。常见的信息源有两种:

  • 给定的上下文:比如阅读一篇文章后回答相关问题(如考试中的阅读理解),答案一定在文章里。
  • 外部知识库:比如回答 “地球自转一周需要多久?”,系统需要调用已有的知识储备(如百科全书、数据库)。

以 “从上下文找答案” 为例,机器需要完成:

  • 定位相关段落:在上下文里筛选出可能包含答案的部分(比如问题问 “法国首都”,就重点看提到 “法国” 的段落)。
  • 精确匹配答案:在相关段落中,找到与问题关键词匹配的内容(比如 “巴黎是法国的首都”,这里 “巴黎” 就是答案)。

2.3、生成答案:整理并输出结果

找到答案后,系统需要把它整理成人类能理解的自然语言:

  • 提取核心信息:如果答案藏在长句子里(如 “法国的首都是巴黎,它是欧洲的重要城市”),需要提炼出 “巴黎” 这个核心答案。
  • 确保答案准确:检查答案是否完全匹配问题(比如问题问 “首都”,就不能返回 “法国的货币是欧元”)。
  • 格式化输出:用简洁的语言呈现(比如直接说 “巴黎”,而不是重复整句话)。

2.4、优化迭代:让系统更 “聪明”

问答系统不是一成不变的,需要通过不断优化提升效果:

  • 学习新数据:用更多的问答样本(如 “问题 - 答案 - 上下文” 组合)训练系统,让它见过更多场景。
  • 修正错误:如果系统答错了(比如把 “法国首都” 说成 “伦敦”),通过人工标注或算法调整纠正错误。
  • 适应复杂场景:逐步处理更难的问题(如需要推理的问题 “小明有 3 个苹果,吃了 1 个,又买了 2 个,现在有几个?”)。

3、主流技术:让系统 “变聪明” 的关键

目前,问答系统的核心技术依赖自然语言处理(NLP) 和机器学习,其中最具代表性的是 “预训练语言模型”(如 BERT、GPT 等):

  • BERT 模型:擅长从上下文中找答案。它能像人类阅读一样,理解句子中每个词的含义,以及词与词之间的关系(比如 “它” 指的是前文的 “巴黎”)。因此,在 “给定上下文找答案” 的场景中表现突出。
  • GPT 模型:擅长生成答案。它不仅能理解问题,还能结合自己 “学过” 的海量知识,生成全新的答案(即使上下文里没有直接提到),更接近人类的 “思考” 过程。

4、问答系统的常见类型

根据应用场景和技术特点,问答系统可以分为几类:

  1. 抽取式问答:答案一定来自给定的上下文(如阅读理解),系统的任务是 “抽取” 出其中的片段。
  2. 生成式问答:答案可以是全新的句子(不一定来自上下文),系统需要 “创造” 答案(如智能助手回答常识问题)。
  3. 知识库问答:专门从结构化的知识库(如百科数据库)中查询答案,适合精准的事实性问题(如 “李白是哪一年出生的?”)。
  4. 对话式问答:能进行多轮对话(比如 “明天会下雨吗?”→“会下小雨。”→“那需要带伞吗?”→“需要。”),需要记住之前的对话内容。

5、现实挑战:问答系统还需要突破什么?

尽管现在的问答系统已经很实用,但仍有不少难题:

  • 复杂问题推理:对于需要多步思考的问题(如 “小明的妈妈是小红的姑姑,小红和小明是什么关系?”),系统往往难以处理。
  • 歧义问题理解:同一个问题可能有多种含义(如 “苹果多少钱?” 可能指水果,也可能指手机),系统需要结合语境判断。
  • 常识与经验依赖:很多问题需要生活常识(如 “为什么天热会出汗?”),而机器缺乏人类的生活经验,容易答错。
  • 对抗性问题干扰:遇到故意设计的 “陷阱问题”(如语法混乱、包含错误信息的问题),系统可能被误导。

6、问答系统的应用场景

除了我们熟悉的智能助手,问答系统还广泛应用在:

  • 客服领域:自动回答用户关于产品的常见问题(如 “退货流程是什么?”),减少人工成本。
  • 教育领域:作为 “智能辅导老师”,解答学生的学科问题(如 “数学公式怎么推导?”)。
  • 医疗领域:辅助医生查询医学知识(如 “这个症状可能是什么病?”),但需严格验证准确性。
  • 搜索引擎:比如百度、谷歌的 “直接回答” 功能,在搜索结果顶部直接显示问题答案(如搜索 “地球半径”,结果顶部会直接给出数字)。

7、英文版回答系统(显示上下文)

# 导入PyTorch深度学习框架,用于张量运算和模型训练
import torch
# 导入datasets库,用于加载和处理数据集;Dataset类用于构建自定义数据集
from datasets import load_dataset, Dataset
# 导入os库用于文件路径操作,shutil库用于文件删除/移动等操作
import os
import shutil
# 从transformers库导入所需组件:
# BertForQuestionAnswering:用于问答任务的BERT模型
# BertTokenizerFast:快速BERT分词器(支持offset_mapping等功能)
# TrainingArguments:训练参数配置类
# Trainer:模型训练器(封装了训练循环)
# DataCollatorWithPadding:用于批量数据填充的工具
from transformers import (
    BertForQuestionAnswering,
    BertTokenizerFast,
    TrainingArguments,
    Trainer,
    DataCollatorWithPadding
)

# 设置环境变量:解决protobuf库的潜在兼容性问题(部分环境下可能出现导入错误)
os.environ["PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION"] = "python"

# 检查是否有可用GPU,优先使用GPU加速训练/推理,否则使用CPU
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"使用设备: {device}")


# 1. 加载本地JSON格式的SQuAD风格数据集,并展开嵌套结构为单个问答样本
def load_and_expand_squad(json_path):
    """
    加载SQuAD格式的JSON数据集,并将嵌套结构展开为扁平的问答样本列表
    参数:
        json_path: 本地JSON文件路径(SQuAD 1.1/2.0格式)
    返回:
        Dataset对象:包含"question"(问题)、"context"(上下文)、"answers"(答案)的样本集合
    """
    # 加载原始JSON数据(SQuAD格式的JSON通常包含"data"字段,内嵌套篇章、段落、问答对)
    try:
        # 使用load_dataset加载JSON文件,指定field="data"提取数据核心字段,取"train"拆分(实际可根据文件内容调整)
        raw_dataset = load_dataset("json", data_files=json_path, field="data")["train"]
    except Exception as e:
        print(f"加载数据集失败: {e}")
        print(f"请确保 {json_path} 文件存在且格式正确(SQuAD风格)")
        exit(1)  # 加载失败则退出程序

    # 展开嵌套结构:SQuAD格式为 篇章(article)→段落(paragraphs)→问答对(qas),需展开为单个样本
    expanded = []  # 存储展开后的样本
    invalid_count = 0  # 统计无效样本数量

    for article in raw_dataset:  # 遍历每个篇章
        try:
            for paragraph in article["paragraphs"]:  # 遍历篇章中的每个段落
                context = paragraph["context"]  # 提取段落文本作为上下文
                for qa in paragraph["qas"]:  # 遍历段落中的每个问答对
                    # 构建单个样本:包含问题、上下文、答案(确保答案格式为{"text": [答案文本], "answer_start": [起始位置]})
                    expanded.append({
                        "question": qa["question"],  # 问题文本
                        "context": context,  # 上下文文本
                        "answers": {
                            # 取第一个答案(SQuAD可能有多个答案,此处简化为单答案)
                            "text": [qa["answers"][0]["text"]],
                            "answer_start": [qa["answers"][0]["answer_start"]]
                        }
                    })
        except Exception as e:
            print(f"解析样本时出错: {e},跳过该样本")
            invalid_count += 1
            continue  # 跳过无效样本

    print(f"加载完成,有效样本数: {len(expanded)},无效样本数: {invalid_count}")
    return Dataset.from_list(expanded)  # 转换为Dataset对象返回


# 2. 加载并准备训练集和验证集
print("加载训练集...")
# 加载训练集(假设本地有SQuAD格式的train-v2.0.json)
dataset = load_and_expand_squad("train-v2.0.json")
print("加载验证集...")
# 加载验证集(假设本地有SQuAD格式的dev-v2.0.json)
val_dataset = load_and_expand_squad("dev-v2.0.json")

# 取1%样本用于快速测试(仅为调试用,实际训练可注释掉以使用全量数据)
# train_test_split(test_size=0.01)表示取1%数据作为测试集(此处用作快速训练样本)
dataset = dataset.train_test_split(test_size=0.01)["test"]
val_dataset = val_dataset.train_test_split(test_size=0.01)["test"]

print(f"训练集大小: {len(dataset)},验证集大小: {len(val_dataset)}")


# 3. 加载预训练模型和分词器(基于BERT-base-uncased)
model_name = "bert-base-uncased"  # 模型名称:BERT-base无大小写区分版本(适合英文任务)
local_model_dir = "./bert-base-uncased"  # 本地缓存模型的路径

# 清理旧缓存(可选,仅在需要强制重新下载模型时启用)
# if os.path.exists(local_model_dir):
#     print(f"清理模型缓存: {local_model_dir}")
#     shutil.rmtree(local_model_dir, ignore_errors=True)  # 删除旧缓存目录

# 加载分词器和模型
try:
    # 加载快速分词器(BertTokenizerFast支持return_offsets_mapping等功能,是QA任务必需的)
    tokenizer = BertTokenizerFast.from_pretrained(
        model_name,
        cache_dir=local_model_dir,  # 缓存到本地目录,避免重复下载
        use_fast=True  # 强制使用快速分词器
    )
    # 加载用于问答任务的BERT模型(输出start_logits和end_logits,用于预测答案位置)
    model = BertForQuestionAnswering.from_pretrained(
        model_name,
        cache_dir=local_model_dir
    ).to(device)  # 将模型移动到指定设备(GPU/CPU)
except Exception as e:
    print(f"加载模型失败: {e}")
    print(f"请确保 {local_model_dir} 目录存在或网络连接正常(首次运行需下载模型)")
    exit(1)

# 验证分词器类型(快速分词器是QA任务必需的,否则无法获取offset_mapping)
print(f"加载的分词器类型: {type(tokenizer).__name__}")
if type(tokenizer).__name__ != "BertTokenizerFast":
    print("警告: 当前使用的不是快速分词器,可能导致性能问题或功能缺失(QA任务需要快速分词器)")


# 4. 数据预处理函数(将文本转换为模型可接受的输入格式)
def preprocess_function(examples):
    """
    预处理函数:将原始文本(问题、上下文、答案)转换为模型输入格式
    参数:
        examples: 包含"question"、"context"、"answers"的批量样本
    返回:
        处理后的字典:包含input_ids、attention_mask、start_positions、end_positions
    """
    # 编码问题和上下文:使用分词器将文本转换为token ID
    inputs = tokenizer(
        examples["question"],  # 问题文本(列表)
        examples["context"],  # 上下文文本(列表)
        max_length=512,  # 最大序列长度(BERT-base支持最长512token)
        truncation="only_second",  # 仅截断上下文(question在前,context在后,优先保留问题)
        return_offsets_mapping=True,  # 返回offset_mapping(token与原始文本字符的映射关系,用于定位答案)
        padding="max_length"  # 填充到max_length长度
    )

    # 提取offset_mapping(每个token对应的原始文本起始/结束字符位置)
    offset_mapping = inputs.pop("offset_mapping")  # 移除offset_mapping(不输入模型,仅用于计算答案位置)
    start_positions = []  # 存储每个样本的答案起始token位置
    end_positions = []  # 存储每个样本的答案结束token位置

    # 遍历每个样本的offset_mapping,计算答案的start和end token位置
    for i, offsets in enumerate(offset_mapping):
        answer = examples["answers"][i]  # 当前样本的答案

        # 处理无答案情况(SQuAD 2.0支持无答案样本,用(0,0)表示)
        if len(answer["text"]) == 0:
            start_positions.append(0)  # 无答案时起始位置设为0([CLS] token)
            end_positions.append(0)
            continue

        # 获取答案在原始上下文的字符位置(start_char:起始字符索引;end_char:结束字符索引)
        start_char = answer["answer_start"][0]  # 答案起始字符位置
        end_char = start_char + len(answer["text"][0])  # 答案结束字符位置(起始+长度)

        # 定位上下文在token序列中的范围(通过sequence_ids区分问题和上下文)
        # sequence_ids:每个token对应的序列类型(0=问题,1=上下文,None=特殊token如[CLS]、[SEP])
        sequence_ids = inputs.sequence_ids(i)  # 获取第i个样本的sequence_ids
        # 找到上下文的起始token索引(第一个sequence_id=1的位置)
        context_start = 0
        while sequence_ids[context_start] != 1:
            context_start += 1
        # 找到上下文的结束token索引(最后一个sequence_id=1的位置)
        context_end = len(sequence_ids) - 1
        while sequence_ids[context_end] != 1:
            context_end -= 1

        # 查找答案在token序列中的起始和结束位置
        start_token = None  # 答案起始token索引
        end_token = None    # 答案结束token索引

        # 检查答案是否超出上下文范围(若答案不在上下文中,视为无答案)
        if offsets[context_start][0] > end_char or offsets[context_end][1] < start_char:
            start_positions.append(0)
            end_positions.append(0)
        else:
            # 寻找答案的起始token:找到第一个包含start_char的token
            for idx in range(context_start, context_end + 1):
                token_start, token_end = offsets[idx]  # 当前token的字符范围
                if token_start <= start_char <= token_end:
                    start_token = idx  # 记录起始token索引
                    break

            # 寻找答案的结束token:找到最后一个包含end_char的token
            for idx in range(context_end, context_start - 1, -1):
                token_start, token_end = offsets[idx]  # 当前token的字符范围
                if token_start <= end_char <= token_end:
                    end_token = idx  # 记录结束token索引
                    break

            # 若找不到答案位置(如tokenization导致的偏移),设为[CLS] token(0)
            start_positions.append(start_token if start_token is not None else 0)
            end_positions.append(end_token if end_token is not None else 0)

    # 将计算好的start和end位置添加到输入中(作为模型训练的标签)
    inputs["start_positions"] = start_positions
    inputs["end_positions"] = end_positions
    return inputs


# 5. 应用预处理函数到训练集和验证集
print("预处理训练数据...")
# 对训练集应用预处理:batched=True表示批量处理,remove_columns移除原始文本列(模型不需要)
tokenized_dataset = dataset.map(preprocess_function, batched=True, remove_columns=dataset.column_names)
print("预处理验证数据...")
# 对验证集应用同样的预处理
tokenized_val_dataset = val_dataset.map(preprocess_function, batched=True, remove_columns=val_dataset.column_names)

# 将数据集转换为PyTorch张量格式(模型需要张量输入)
# 只保留模型需要的列:input_ids(token ID)、attention_mask(掩码,区分有效token和填充)、start/end_positions(标签)
tokenized_dataset.set_format("torch", columns=["input_ids", "attention_mask", "start_positions", "end_positions"])
tokenized_val_dataset.set_format("torch", columns=["input_ids", "attention_mask", "start_positions", "end_positions"])


# 6. 配置训练参数
training_args = TrainingArguments(
    output_dir="./qa_model",  # 模型和日志输出目录
    evaluation_strategy="epoch",  # 每轮结束后进行验证
    learning_rate=3e-5,  # 学习率(BERT微调常用3e-5)
    per_device_train_batch_size=4,  # 每个设备的训练批大小(GPU内存小则设小些)
    per_device_eval_batch_size=4,   # 每个设备的验证批大小
    num_train_epochs=2,  # 训练轮次(小样本快速测试用2轮,实际训练可增至3-5轮)
    weight_decay=0.01,  # 权重衰减(L2正则化,防止过拟合)
    logging_dir="./logs",  # 日志目录(可通过tensorboard查看)
    logging_steps=10,  # 每10步记录一次日志
    save_strategy="epoch",  # 每轮结束后保存模型
    fp16=True if device == "cuda" else False,  # 若使用GPU,启用混合精度训练(加速训练并减少内存占用)
)

# 数据整理器:用于批量处理时自动填充(确保同批次样本长度一致)
data_collator = DataCollatorWithPadding(tokenizer=tokenizer)


# 7. 初始化Trainer并开始训练
print("开始训练模型...")
trainer = Trainer(
    model=model,  # 待训练的模型
    args=training_args,  # 训练参数
    train_dataset=tokenized_dataset,  # 训练集
    eval_dataset=tokenized_val_dataset,  # 验证集
    tokenizer=tokenizer,  # 分词器(用于保存模型时一同保存)
    data_collator=data_collator,  # 数据整理器
)

# 开始训练(封装了完整的训练循环:前向传播、损失计算、反向传播、参数更新)
trainer.train()

# 训练完成后保存模型(Trainer会自动保存,这里再次确认)
print(f"训练完成,模型已保存到: {training_args.output_dir}")
trainer.save_model(training_args.output_dir)  # 手动保存模型(包含配置、分词器、权重)


# 8. 答案生成函数(使用训练好的模型进行问答推理)
def generate_answer(question, context):
    """
    生成答案:根据问题和上下文,使用模型预测答案
    参数:
        question: 输入的问题文本
        context: 相关的上下文文本
    返回:
        模型预测的答案文本
    """
    # 确保模型在正确的设备上,并设置为评估模式(关闭dropout等训练特有的层)
    model.to(device)
    model.eval()

    # 编码输入:将问题和上下文转换为模型输入格式(返回张量)
    inputs = tokenizer(
        question,  # 单个问题
        context,   # 单个上下文
        max_length=512,
        truncation="only_second",  # 仅截断上下文
        return_tensors="pt",  # 返回PyTorch张量
        padding="max_length"  # 填充到max_length
    ).to(device)  # 移动到指定设备

    # 模型推理(关闭梯度计算,节省内存)
    with torch.no_grad():
        outputs = model(** inputs)  # 输入解包(input_ids、attention_mask等)

    # 从输出中获取预测的答案起始和结束token索引(取概率最大的位置)
    start_idx = torch.argmax(outputs.start_logits).item()  # 起始token索引
    end_idx = torch.argmax(outputs.end_logits).item()      # 结束token索引

    # 将token ID转换为原始token(用于拼接答案)
    tokens = tokenizer.convert_ids_to_tokens(inputs["input_ids"][0])  # 取第一个样本(仅输入一个样本)
    answer_tokens = tokens[start_idx:end_idx + 1]  # 截取答案对应的token序列

    #答案对应的token序列

    # 处理无答案情况(若start和end都为0,对应[CLS] token,视为无答案)
    if start_idx == 0 and end_idx == 0:
        return "No answer found"  # 无答案时返回提示

    # 将token转换为文本,并清理特殊token(如[CLS]、[SEP])
    answer = tokenizer.convert_tokens_to_string(answer_tokens)
    # 移除可能残留的特殊token,并strip去空格
    return answer.replace("[CLS]", "").replace("[SEP]", "").strip()


# 9. 交互式问答测试(方便用户手动输入问题和上下文进行测试)
if __name__ == "__main__":
    print("\n=== 问答测试 ===")

    # 示例问题(用于快速验证模型功能)
    test_questions = [
        {
            "question": "What is the capital of France?",  # 问题:法国的首都是什么?
            "context": "France is a country in Europe. Its capital is Paris, which is also a major cultural center."  # 上下文
        },
        {
            "question": "Who developed BERT?",  # 问题:谁开发了BERT?
            "context": "BERT is a language model developed by Google in 2018. It uses a Transformer architecture."  # 上下文
        }
    ]

    # 运行示例问题测试
    for i, test in enumerate(test_questions):
        print(f"\n问题 {i + 1}: {test['question']}")
        print(f"答案: {generate_answer(test['question'], test['context'])}")

    # 交互式问答(允许用户自定义输入)
    while True:
        print("\n=== 交互式问答 ===")
        question = input("请输入问题 (输入q退出): ")
        if question.lower() == "q":  # 输入q则退出
            break
        context = input("请输入相关上下文: ")
        if not context:  # 上下文不能为空
            print("上下文不能为空!")
            continue

        # 生成并打印答案
        answer = generate_answer(question, context)
        print(f"模型回答: {answer}")

8、中文版回答系统(显示上下文)

# 导入JSON模块用于处理JSON格式数据(数据集通常为JSON格式)
import json
# 导入os模块用于文件路径操作(创建目录、检查文件是否存在等)
import os
# 导入PyTorch深度学习框架,用于张量运算和模型训练
import torch
# 从torch.utils.data导入Dataset(自定义数据集基类)和DataLoader(数据加载器)
from torch.utils.data import Dataset, DataLoader
# 从transformers库导入中文问答任务所需组件:
# BertTokenizerFast:快速BERT分词器(支持offset_mapping,用于定位答案在原始文本中的位置)
# BertForQuestionAnswering:用于问答任务的BERT模型(输出start_logits和end_logits,预测答案起止位置)
# AdamW:适用于Transformer模型的优化器
# get_linear_schedule_with_warmup:学习率调度器(控制训练过程中学习率的变化)
from transformers import (
    BertTokenizerFast,
    BertForQuestionAnswering,
    AdamW,
    get_linear_schedule_with_warmup
)
# 导入tqdm用于显示训练进度条
from tqdm import tqdm


# 设置计算设备:优先使用GPU(cuda)加速训练/推理,若无则使用CPU
# 说明:GPU的并行计算能力可大幅缩短训练时间,尤其适合BERT等大型模型
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"使用设备: {device}")


# 自定义数据集类:将预处理后的编码数据转换为PyTorch可识别的数据集格式
# 作用:统一数据接口,方便DataLoader按批次加载数据
class QADataset(Dataset):
    def __init__(self, encodings):
        # encodings:包含input_ids、attention_mask、start_positions、end_positions的字典
        self.encodings = encodings

    # 按索引获取单个样本(PyTorch数据集必需实现的方法)
    def __getitem__(self, idx):
        # 将每个字段转换为PyTorch张量(模型输入需为张量格式)
        return {key: torch.tensor(val[idx]) for key, val in self.encodings.items()}

    # 返回数据集总样本数(PyTorch数据集必需实现的方法)
    def __len__(self):
        # 以input_ids的长度为准(所有字段长度相同)
        return len(self.encodings.input_ids)


# 加载并验证数据集:确保数据格式正确,为训练提供可靠输入
def load_existing_dataset(train_path='data/train_dataset.json', val_path='data/val_dataset.json'):
    print(f"加载数据集: {train_path} 和 {val_path}")

    # 内部函数:加载单个数据集并验证格式
    def validate_dataset(path):
        # 检查文件是否存在
        if not os.path.exists(path):
            raise FileNotFoundError(f"数据集文件不存在: {path}")

        # 加载JSON数据并解析
        with open(path, 'r', encoding='utf-8') as f:
            try:
                dataset = json.load(f)  # 加载为列表格式(每个元素是一个样本)
            except json.JSONDecodeError as e:
                raise ValueError(f"JSON解析错误: {path}, 错误: {e}")

        # 验证数据集是否为列表(JSON数组)
        if not isinstance(dataset, list):
            raise ValueError(f"数据集格式错误: {path} 应为JSON数组(样本列表)")

        # 验证数据集不为空
        if not dataset:
            raise ValueError(f"数据集为空: {path}(至少需要一个样本)")

        # 验证每个样本的结构是否符合要求
        required_fields = ['question', 'context', 'answers']  # 每个样本必须包含的字段
        for i, sample in enumerate(dataset):
            # 检查是否包含必要字段
            for field in required_fields:
                if field not in sample:
                    raise ValueError(f"样本 {i} 缺少必要字段: {field}(必须包含问题、上下文、答案)")

            # 验证answers字段的结构(必须是字典)
            answers = sample['answers']
            if not isinstance(answers, dict):
                raise TypeError(f"样本 {i} 的answers字段应为字典,实际为: {type(answers).__name__}")

            # 检查answers是否包含text和answer_start子字段(抽取式问答必需)
            if 'text' not in answers or 'answer_start' not in answers:
                raise ValueError(f"样本 {i} 的answers字段缺少必要子字段(需包含text和answer_start)")

            # 验证text和answer_start是否为列表(允许空列表表示无答案)
            if not isinstance(answers['text'], list) or not isinstance(answers['answer_start'], list):
                raise TypeError(f"样本 {i} 的answers.text或answers.answer_start应为列表(存储答案文本和起始位置)")

            # 关键校验:无答案样本需text和answer_start同时为空,有答案则同时非空
            if (not answers['text'] and answers['answer_start']) or (answers['text'] and not answers['answer_start']):
                raise ValueError(f"样本 {i} 的answers.text和answer_start需同时为空(无答案)或同时非空(有答案)")

            # 对有答案的样本,进一步验证answer_start的类型和位置
            if answers['answer_start']:  # 仅处理有答案的样本
                # 验证answer_start中的元素是否为整数(位置必须是整数索引)
                if not all(isinstance(pos, int) for pos in answers['answer_start']):
                    raise TypeError(f"样本 {i} 的answers.answer_start包含非整数类型(必须为字符起始位置)")

                # 验证答案文本与上下文的匹配性(确保答案确实在上下文中)
                context = sample['context']
                for j, (text, pos) in enumerate(zip(answers['text'], answers['answer_start'])):
                    if not isinstance(text, str):
                        raise TypeError(f"样本 {i} 的answers.text[{j}]不是字符串(答案必须是文本)")

                    # 检查答案起始位置是否超出上下文范围
                    if pos < 0 or pos >= len(context):
                        raise ValueError(f"样本 {i} 的answers.answer_start[{j}]={pos}超出上下文长度(上下文长度为{len(context)})")

                    # 检查答案文本是否与上下文对应位置的内容一致(防止标注错误)
                    if context[pos:pos + len(text)] != text:
                        print(f"警告: 样本 {i} 的答案文本与上下文不匹配")
                        print(f"  上下文: {context}")
                        print(f"  答案文本: '{text}', 起始位置: {pos}(上下文对应位置为'{context[pos:pos + len(text)]}')")

        print(f"已验证数据集: {path}, 样本数: {len(dataset)}")
        return dataset, len(dataset)  # 返回验证后的数据集和样本数

    # 验证训练集和验证集
    train_data, train_size = validate_dataset(train_path)
    val_data, val_size = validate_dataset(val_path)

    # 保存验证后的数据集(确保后续使用的是格式正确的数据)
    with open(train_path, 'w', encoding='utf-8') as f:
        json.dump(train_data, f, ensure_ascii=False, indent=2)
    with open(val_path, 'w', encoding='utf-8') as f:
        json.dump(val_data, f, ensure_ascii=False, indent=2)

    print(f"训练集规模: {train_size}, 验证集规模: {val_size}")
    return train_path, val_path  # 返回验证后的数据集路径


# 数据预处理:将原始文本转换为BERT可接受的输入格式(核心步骤)
def prepare_data(dataset_path, tokenizer, max_length=512):
    # 加载数据集
    with open(dataset_path, 'r', encoding='utf-8') as f:
        dataset = json.load(f)

    # 打印第一个样本的结构(方便调试和理解数据格式)
    if dataset:
        print(f"\n样本结构示例:")
        for key, value in dataset[0].items():
            if key == 'answers':
                print(f"  {key}: {type(value).__name__}({list(value.keys())})")  # 显示answers的子字段
            else:
                print(f"  {key}: {type(value).__name__}")  # 显示问题和上下文的类型

    # 过滤无效样本(保留格式正确的样本用于训练)
    valid_samples = []
    for i, item in enumerate(dataset):
        try:
            # 基本字段类型检查(问题和上下文必须是字符串)
            if not isinstance(item['question'], str) or not isinstance(item['context'], str):
                raise TypeError(f"问题或上下文不是字符串类型(必须为文本)")

            ans = item['answers']
            if not isinstance(ans, dict):
                raise TypeError(f"answers字段不是字典类型(格式错误)")

            # 验证答案字段的格式(必须是列表)
            if not isinstance(ans.get('text', []), list) or not isinstance(ans.get('answer_start', []), list):
                raise ValueError(f"answers.text或answer_start格式错误(必须为列表)")

            # 处理无答案样本(text和answer_start均为空)
            if not ans['text'] and not ans['answer_start']:
                valid_samples.append({
                    'question': item['question'],
                    'context': item['context'],
                    'answers': {'text': [], 'answer_start': []}  # 保留无答案样本
                })
                continue

            # 有答案样本必须同时包含text和answer_start(排除格式矛盾的样本)
            if not ans['text'] or not ans['answer_start']:
                raise ValueError(f"answers.text和answer_start需同时为空或非空(格式矛盾)")

            # 取第一个答案(通常抽取式问答每个问题对应一个答案)
            answer_text = ans['text'][0]
            answer_start = ans['answer_start'][0]

            # 确保answer_start是整数(若为字符串则尝试转换)
            if not isinstance(answer_start, int):
                try:
                    answer_start = int(answer_start)
                except:
                    raise TypeError(f"answer_start不是整数(无法转换为位置索引)")

            # 验证答案起始位置是否在上下文范围内
            context = item['context']
            if answer_start < 0 or answer_start >= len(context):
                raise ValueError(f"answer_start超出上下文范围(上下文长度为{len(context)})")

            # 修正答案文本与上下文的匹配性(若标注错误,以上下文实际内容为准)
            actual_text = context[answer_start:answer_start + len(answer_text)]
            if actual_text != answer_text:
                print(f"警告: 样本 {i} 的答案文本不匹配")
                print(f"  预期: '{answer_text}', 实际: '{actual_text}'")
                answer_text = actual_text  # 修正为上下文实际内容

            # 将验证通过的样本加入有效列表
            valid_samples.append({
                'question': item['question'],
                'context': context,
                'answers': {
                    'text': [answer_text],
                    'answer_start': [answer_start]
                }
            })

        except Exception as e:
            print(f"跳过无效样本 {i}: {e}")  # 打印错误信息并跳过无效样本

    print(f"有效样本数: {len(valid_samples)}/{len(dataset)}")  # 统计有效样本比例

    # 提取问题、上下文、答案列表(用于批量处理)
    questions = [item['question'] for item in valid_samples]
    contexts = [item['context'] for item in valid_samples]
    answers = [item['answers'] for item in valid_samples]

    # 用BERT分词器编码文本:将问题和上下文转换为模型可识别的token ID
    # 关键参数说明:
    # - truncation="only_first": 仅截断上下文(保留问题完整,因问题更关键)
    # - return_offsets_mapping: 返回token与原始文本字符的映射关系(用于定位答案)
    # - return_token_type_ids: 返回token类型(0表示问题,1表示上下文,BERT需要区分)
    encodings = tokenizer(
        contexts,  # 第一个参数:上下文列表
        questions,  # 第二个参数:问题列表(BERT要求上下文在前,问题在后)
        truncation="only_first",
        padding='max_length',  # 填充至max_length(512)
        max_length=max_length,  # BERT-base模型最大支持512个token
        return_tensors='pt',  # 返回PyTorch张量
        return_offsets_mapping=True,  # 核心:token与原始字符的映射(抽取答案必需)
        return_token_type_ids=True
    )

    # 存储每个样本的答案在token序列中的起始和结束位置(模型训练的标签)
    start_positions = []
    end_positions = []

    for i in range(len(answers)):
        answer = answers[i]
        # 无答案样本:将start和end位置设为0(对应[CLS] token,BERT中常用0表示无答案)
        if not answer['text'] or not answer['answer_start']:
            start_positions.append(0)
            end_positions.append(0)
            continue

        # 有答案样本:计算答案在token序列中的位置
        start_char = answer['answer_start'][0]  # 答案在上下文的起始字符索引
        end_char = start_char + len(answer['text'][0]) - 1  # 答案在上下文的结束字符索引(减1避免超出)

        # 定位上下文在token序列中的范围(通过token_type_ids区分问题和上下文)
        sequence_ids = encodings.sequence_ids(i)  # 第i个样本的token类型(0=问题,1=上下文,None=特殊token)
        context_start = sequence_ids.index(1)  # 上下文的第一个token索引
        context_end = len(sequence_ids) - 1 - sequence_ids[::-1].index(1)  # 上下文的最后一个token索引

        # 查找答案对应的token起始和结束位置
        start_token = None  # 答案起始token索引
        end_token = None    # 答案结束token索引
        offsets = encodings.offset_mapping[i]  # 第i个样本的token-字符映射(每个token对应(起始字符, 结束字符))

        # 寻找答案起始token:找到第一个包含start_char的token
        for idx in range(context_start, context_end + 1):
            token_start, token_end = offsets[idx]  # 当前token的字符范围
            if token_start <= start_char < token_end:  # start_char在当前token范围内
                start_token = idx
                break

        # 寻找答案结束token:找到最后一个包含end_char的token
        for idx in range(context_end, context_start - 1, -1):
            token_start, token_end = offsets[idx]  # 当前token的字符范围
            if token_start <= end_char < token_end:  # end_char在当前token范围内
                end_token = idx
                break

        # 兜底处理:若未找到答案位置(如tokenization导致偏移),视为无答案
        if start_token is None or end_token is None:
            print(f"警告:样本 {i} 的答案无法映射到tokens,设为无答案")
            start_positions.append(0)
            end_positions.append(0)
        else:
            start_positions.append(start_token)
            end_positions.append(end_token)

    # 移除offset_mapping(模型不需要该字段,仅预处理时用)
    encodings.pop('offset_mapping')

    # 将start_positions和end_positions加入编码结果(作为训练标签)
    encodings.update({
        'start_positions': start_positions,  # 答案起始token索引
        'end_positions': end_positions      # 答案结束token索引
    })

    # 返回自定义数据集对象(封装编码后的数据)
    return QADataset(encodings)


# 模型训练函数:定义训练流程,优化模型参数
def train_model(model, tokenizer, train_loader, val_loader, optimizer, scheduler, epochs, model_save_path):
    # 将模型设为训练模式(启用dropout等训练特有的层)
    model.train()
    # 创建模型保存目录(若不存在)
    os.makedirs(model_save_path, exist_ok=True)

    # 遍历训练轮次
    for epoch in range(epochs):
        total_loss = 0  # 累计训练损失
        # 用tqdm显示训练进度
        progress_bar = tqdm(enumerate(train_loader), total=len(train_loader))

        # 遍历每个批次的训练数据
        for step, batch in progress_bar:
            # 将批次数据移至指定设备(GPU/CPU)
            input_ids = batch['input_ids'].to(device)  # token ID序列
            attention_mask = batch['attention_mask'].to(device)  # 注意力掩码(区分有效token和填充)
            start_positions = batch['start_positions'].to(device)  # 答案起始位置标签
            end_positions = batch['end_positions'].to(device)      # 答案结束位置标签
            # 获取token_type_ids(区分问题和上下文,BERT需要)
            token_type_ids = batch.get('token_type_ids', None)
            if token_type_ids is not None:
                token_type_ids = token_type_ids.to(device)

            # 前向传播:将输入传入模型,得到输出
            outputs = model(
                input_ids=input_ids,
                attention_mask=attention_mask,
                token_type_ids=token_type_ids,
                start_positions=start_positions,  # 传入标签用于计算损失
                end_positions=end_positions
            )

            # 提取损失值(BertForQuestionAnswering会自动计算start和end位置的联合损失)
            loss = outputs.loss
            total_loss += loss.item()  # 累计损失(转换为Python数值)

            # 反向传播:计算梯度
            optimizer.zero_grad()  # 清空之前的梯度(避免累积)
            loss.backward()        # 计算当前损失对参数的梯度
            optimizer.step()       # 根据梯度更新参数
            scheduler.step()       # 调整学习率(按预设策略)

            # 更新进度条显示当前轮次和损失
            progress_bar.set_description(f'Epoch {epoch + 1}/{epochs}, Loss: {loss.item():.4f}')

        # 计算本轮平均训练损失
        avg_train_loss = total_loss / len(train_loader)
        # 在验证集上评估模型性能(计算验证损失)
        val_loss = evaluate_model(model, val_loader)
        # 打印本轮训练和验证损失(用于判断模型是否过拟合)
        print(f'Epoch {epoch + 1}, Train Loss: {avg_train_loss:.4f}, Val Loss: {val_loss:.4f}')

    # 训练完成后保存模型和分词器(模型权重、配置、分词器词汇表)
    model.save_pretrained(model_save_path)
    tokenizer.save_pretrained(model_save_path)
    print(f"模型已保存至: {model_save_path}")


# 模型评估函数:在验证集上计算损失,评估模型泛化能力
def evaluate_model(model, val_loader):
    # 将模型设为评估模式(关闭dropout等层,确保结果稳定)
    model.eval()
    total_loss = 0  # 累计验证损失

    # 关闭梯度计算(评估时无需更新参数,节省内存)
    with torch.no_grad():
        # 遍历验证集批次
        for batch in val_loader:
            # 数据移至设备
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            start_positions = batch['start_positions'].to(device)
            end_positions = batch['end_positions'].to(device)
            token_type_ids = batch.get('token_type_ids', None)
            if token_type_ids is not None:
                token_type_ids = token_type_ids.to(device)

            # 前向传播(仅计算输出,不更新参数)
            outputs = model(
                input_ids=input_ids,
                attention_mask=attention_mask,
                token_type_ids=token_type_ids,
                start_positions=start_positions,
                end_positions=end_positions
            )

            # 累计验证损失
            total_loss += outputs.loss.item()

    # 返回平均验证损失(越低表示模型在未见过的数据上表现越好)
    return total_loss / len(val_loader)


# 答案预测函数:根据问题和上下文,用训练好的模型生成答案
def predict_answer(question, context, model, tokenizer, confidence_threshold=0.1, debug=True):
    # 将模型设为评估模式
    model.eval()

    # 用分词器编码输入(问题和上下文)
    inputs = tokenizer(
        context,  # 上下文在前
        question,  # 问题在后(符合BERT输入格式)
        truncation=True,  # 超过最大长度则截断
        max_length=512,
        return_offsets_mapping=True,  # 保留token与字符的映射,用于提取答案
        return_tensors='pt'  # 返回PyTorch张量
    )
    # 提取offset_mapping并转换为numpy数组(用于后续映射)
    offset_mapping = inputs.pop('offset_mapping').numpy()[0]  # 形状:(token数, 2)
    # 将输入移至模型所在设备
    inputs = {k: v.to(device) for k, v in inputs.items()}

    # 关闭梯度计算,进行推理
    with torch.no_grad():
        outputs = model(** inputs)  # 传入所有输入字段

    # 提取模型输出的logits(未归一化的概率)
    start_logits = outputs.start_logits[0].cpu().numpy()  # 每个token作为答案起始的分数
    end_logits = outputs.end_logits[0].cpu().numpy()      # 每个token作为答案结束的分数
    # 找到分数最高的起始和结束token索引
    start_idx = start_logits.argmax()
    end_idx = end_logits.argmax()

    # 调试信息:打印中间结果(帮助分析预测错误原因)
    if debug:
        print(f"\n===== 预测调试信息 =====")
        print(f"问题: {question}")
        print(f"上下文: {context}")
        # 打印预测的token索引和对应分数(分数越高表示模型越有把握)
        # print(f"start_idx={start_idx}, end_idx={end_idx}")
        # print(f"start_logits[start_idx]={start_logits[start_idx]:.4f}, end_logits[end_idx]={end_logits[end_idx]:.4f}")

        # 将token ID转换为文字(查看模型关注的token)
        tokens = tokenizer.convert_ids_to_tokens(inputs['input_ids'][0].cpu().numpy())
        # print(f"预测的起始token: {tokens[start_idx]}")
        # print(f"预测的结束token: {tokens[end_idx]}")
        # print(f"start_idx的offset: {offset_mapping[start_idx]}")
        # print(f"end_idx的offset: {offset_mapping[end_idx]}")

    # 过滤低置信度或无效的答案(无答案逻辑)
    if (start_logits[start_idx] < confidence_threshold or  # 起始位置分数过低
            end_logits[end_idx] < confidence_threshold or  # 结束位置分数过低
            start_idx > end_idx or  # 起始位置在结束位置之后(无效)
            (start_idx == 0 and end_idx == 0)):  # 起始和结束均为0(对应[CLS],视为无答案)
        if debug:
            print(f"答案过滤: 置信度不足或无答案")
        return "未找到答案"

    # 边界扩展:适当延长答案结束位置(若下一个token的分数较高)
    max_end_idx = len(end_logits) - 1
    while end_idx < max_end_idx and end_logits[end_idx + 1] > end_logits[end_idx] * 0.8:
        end_idx += 1
        if debug:
            print(f"扩展end_idx至: {end_idx}, end_logits={end_logits[end_idx]:.4f}")

    # 根据offset_mapping将token位置映射回原始文本的字符位置
    start_char = offset_mapping[start_idx][0]  # 答案起始字符索引
    end_char = offset_mapping[end_idx][1]      # 答案结束字符索引(取token的结束位置)

    # 调试:打印提取的字符范围和文本
    if debug:
        # print(f"提取字符范围: [{start_char}, {end_char})")
        print(f"提取的答案文本: '{context[start_char:end_char]}'")

    # 从上下文提取答案文本
    answer = context[start_char:end_char]
    return answer if answer else "未找到答案"


# 主函数:整合数据加载、模型训练和交互式问答流程
def main():
    try:
        # 加载并验证训练集和验证集
        train_path, val_path = load_existing_dataset()

        # 加载中文BERT分词器和模型(专为中文优化的抽取式问答模型)
        # BertTokenizerFast:快速分词器,支持offset_mapping(抽取式任务必需)
        # BertForQuestionAnswering:在BERT基础上添加了start/end logits输出层,用于预测答案位置
        tokenizer = BertTokenizerFast.from_pretrained('bert-base-chinese')
        model = BertForQuestionAnswering.from_pretrained('bert-base-chinese').to(device)

        # 准备训练数据(转换为模型输入格式)
        print("\n准备训练数据...")
        train_dataset = prepare_data(train_path, tokenizer)
        # 准备验证数据
        print("\n准备验证数据...")
        val_dataset = prepare_data(val_path, tokenizer)

        # 创建数据加载器(按批次加载数据,支持打乱和并行加载)
        train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True)  # 训练集打乱顺序
        val_loader = DataLoader(val_dataset, batch_size=8)  # 验证集无需打乱

        # 配置优化器和学习率调度器
        optimizer = AdamW(model.parameters(), lr=1e-5)  # 学习率1e-5(BERT微调常用)
        epochs = 20  # 训练轮次(根据数据量调整,20轮适合中小规模数据集)
        total_steps = len(train_loader) * epochs  # 总训练步数
        # 线性学习率调度器:从初始学习率线性衰减到0(避免后期波动)
        scheduler = get_linear_schedule_with_warmup(optimizer, 0, total_steps)

        # 开始训练模型
        print("\n开始训练...")
        train_model(model, tokenizer, train_loader, val_loader, optimizer, scheduler, epochs, './chinese_qa_model')

        # 在验证集上评估最终模型性能
        val_loss = evaluate_model(model, val_loader)
        print(f"\n评估结果: eval_loss={val_loss:.4f}(损失越低越好)")

        # 交互式问答测试(允许用户输入问题和上下文,查看模型预测结果)
        print("\n=== 交互式问答 ===")
        print("提示: 输入q退出,输入d切换调试模式,输入r重新加载模型")
        debug_mode = True  # 默认开启调试模式(显示中间过程)

        while True:
            question = input("\n问题 (输入q退出, d切换调试模式, r重新加载模型): ").strip()

            # 处理用户指令
            if question.lower() == 'q':
                break  # 退出程序
            if question.lower() == 'd':
                debug_mode = not debug_mode  # 切换调试模式
                print(f"调试模式已切换为: {'开启' if debug_mode else '关闭'}")
                continue
            if question.lower() == 'r':
                print("重新加载模型...")  # 重新加载训练好的模型
                model = BertForQuestionAnswering.from_pretrained('./chinese_qa_model').to(device)
                print("模型已重新加载")
                continue

            # 获取用户输入的上下文
            context = input("上下文: ").strip()
            if not context:
                print("上下文不能为空!")
                continue

            # 调用预测函数生成答案
            answer = predict_answer(question, context, model, tokenizer, debug=debug_mode)
            print(f"\n答案: {answer}")

    except Exception as e:
        print(f"\n错误: {e}")
        import traceback
        traceback.print_exc()  # 打印完整错误堆栈,方便调试


# 程序入口:当脚本直接运行时执行main函数
if __name__ == "__main__":
    main()

9、中文版回答系统(隐藏上下文)

模型和分词器

# 导入JSON处理模块(用于加载和解析数据集)
import json
# 导入OS模块(用于文件路径操作和目录创建)
import os
# 导入PyTorch深度学习框架
import torch
# 导入数据集和数据加载器(用于批量处理训练数据)
from torch.utils.data import Dataset, DataLoader
# 导入transformers库中的自动加载器和模型组件
# AutoTokenizer:自动加载与模型匹配的分词器
# AutoModelForSeq2SeqLM:自动加载适合序列到序列任务的语言模型(如T5)
# AdamW:优化器(带权重衰减的Adam)
# get_linear_schedule_with_warmup:学习率调度器
from transformers import (
    AutoTokenizer,
    AutoModelForSeq2SeqLM,
    AdamW,
    get_linear_schedule_with_warmup
)
# 导入进度条库(用于显示训练进度)
from tqdm import tqdm

# 设置计算设备(优先使用GPU,否则使用CPU)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"使用设备: {device}")


# 自定义数据集类(处理问答对,生成模型可接受的输入格式)
class QADataset(Dataset):
    def __init__(self, data, tokenizer, max_length=128):
        # 原始问答数据
        self.data = data
        # 分词器(将文本转换为模型可接受的token IDs)
        self.tokenizer = tokenizer
        # 最大序列长度(超出部分将被截断)
        self.max_length = max_length

    def __len__(self):
        # 返回数据集大小
        return len(self.data)

    def __getitem__(self, idx):
        # 获取单个样本
        item = self.data[idx]
        # 提取问题文本
        question = item['question']
        # 提取答案文本(取第一个答案,若无答案则为空字符串)
        answer = item['answers']['text'][0] if item['answers']['text'] else ""

        # 编码问题(添加"question: "前缀,明确任务类型)
        # 注:T5模型通过前缀提示任务类型(如"translate English to German: ...")
        inputs = self.tokenizer(
            f"question: {question}",  # 添加任务前缀
            max_length=self.max_length,
            truncation=True,  # 截断超出长度的文本
            padding='max_length',  # 填充至最大长度
            return_tensors='pt'  # 返回PyTorch张量
        )

        # 编码答案(作为模型的目标输出)
        labels = self.tokenizer(
            answer,
            max_length=self.max_length,
            truncation=True,
            padding='max_length',
            return_tensors='pt'
        )

        # 将padding位置的标签设为-100(PyTorch交叉熵损失会忽略-100位置)
        # 这样在计算损失时会忽略填充部分,只关注有效token
        labels = labels['input_ids'].squeeze()
        labels[labels == self.tokenizer.pad_token_id] = -100

        return {
            'input_ids': inputs['input_ids'].squeeze(),  # 问题的token IDs
            'attention_mask': inputs['attention_mask'].squeeze(),  # 注意力掩码(标识有效token)
            'labels': labels  # 答案的token IDs(padding位置为-100)
        }


# 加载并过滤数据集(仅保留有答案的样本)
def load_data(train_path='data/train_dataset.json', val_path='data/val_dataset.json'):
    def load_and_filter(path):
        # 读取JSON格式的数据集
        with open(path, 'r', encoding='utf-8') as f:
            data = json.load(f)
        # 过滤无答案的样本(生成式模型需要明确的答案)
        filtered = [item for item in data if item['answers']['text']]
        print(f"加载{path},有效样本数: {len(filtered)}/{len(data)}")
        return filtered

    # 加载训练集和验证集
    train_data = load_and_filter(train_path)
    val_data = load_and_filter(val_path)
    return train_data, val_data


# 模型训练函数
def train_model(model, tokenizer, train_loader, val_loader, optimizer, scheduler, epochs, save_path):
    # 设置模型为训练模式(启用dropout等训练特有的层)
    model.train()
    # 创建保存模型的目录
    os.makedirs(save_path, exist_ok=True)

    # 训练主循环
    for epoch in range(epochs):
        total_loss = 0  # 累计训练损失
        # 使用tqdm显示训练进度
        progress_bar = tqdm(enumerate(train_loader), total=len(train_loader))

        for step, batch in progress_bar:
            # 将数据移至指定设备(GPU/CPU)
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['labels'].to(device)

            # 前向传播(计算模型输出和损失)
            outputs = model(
                input_ids=input_ids,
                attention_mask=attention_mask,
                labels=labels  # 传入目标答案,模型会自动计算生成损失
            )

            loss = outputs.loss
            total_loss += loss.item()  # 累计损失值

            # 反向传播(计算梯度并更新模型参数)
            optimizer.zero_grad()  # 清空梯度
            loss.backward()  # 计算梯度
            optimizer.step()  # 更新参数
            scheduler.step()  # 更新学习率

            # 更新进度条显示当前损失
            progress_bar.set_description(f'Epoch {epoch + 1}/{epochs}, Loss: {loss.item():.4f}')

        # 计算平均训练损失
        avg_train_loss = total_loss / len(train_loader)
        # 在验证集上评估模型
        val_loss = evaluate_model(model, val_loader)
        print(f'Epoch {epoch + 1}, 训练损失: {avg_train_loss:.4f}, 验证损失: {val_loss:.4f}')

    # 保存训练好的模型和分词器
    model.save_pretrained(save_path)
    tokenizer.save_pretrained(save_path)
    print(f"模型已保存至: {save_path}")


# 模型评估函数(计算验证集上的平均损失)
def evaluate_model(model, val_loader):
    # 设置模型为评估模式(关闭dropout等)
    model.eval()
    total_loss = 0

    with torch.no_grad():  # 不计算梯度(节省内存和计算资源)
        for batch in val_loader:
            # 将数据移至指定设备
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['labels'].to(device)

            # 前向传播计算损失
            outputs = model(
                input_ids=input_ids,
                attention_mask=attention_mask,
                labels=labels
            )

            total_loss += outputs.loss.item()

    # 返回平均验证损失
    return total_loss / len(val_loader)


# 答案预测函数(给定问题,生成答案)
def predict_answer(question, model, tokenizer, max_length=128, temperature=0.7):
    # 设置模型为评估模式
    model.eval()

    # 准备输入文本(添加任务前缀)
    input_text = f"question: {question}"
    # 编码输入文本
    inputs = tokenizer(
        input_text,
        max_length=max_length,
        truncation=True,
        padding='max_length',
        return_tensors='pt'
    ).to(device)

    # 生成答案(使用beam search生成更连贯的文本)
    with torch.no_grad():
        outputs = model.generate(
            **inputs,  # 传入编码后的输入
            max_length=max_length,  # 最大生成长度
            num_beams=5,  # beam search的宽度(生成质量和速度的权衡)
            temperature=temperature,  # 控制生成的随机性(值越小越确定性)
            no_repeat_ngram_size=2,  # 避免生成重复的2-gram短语
            early_stopping=True  # 当所有beam都生成结束标记时停止
        )

    # 解码生成的答案(将token IDs转换为文本)
    answer = tokenizer.decode(outputs[0], skip_special_tokens=True).strip()
    return answer if answer else "无法生成答案"


# 主函数(程序入口点)
def main():
    # 加载预训练的中文T5模型和分词器
    # Langboat/mengzi-t5-base:专为中文优化的T5模型,适合生成任务
    model_name = "Langboat/mengzi-t5-base"
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = AutoModelForSeq2SeqLM.from_pretrained(model_name).to(device)

    # 加载训练数据和验证数据
    train_data, val_data = load_data(
        train_path='data/train_dataset.json',
        val_path='data/val_dataset.json'
    )

    # 创建数据集和数据加载器
    train_dataset = QADataset(train_data, tokenizer)
    val_dataset = QADataset(val_data, tokenizer)
    train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True)  # 训练集打乱顺序
    val_loader = DataLoader(val_dataset, batch_size=8)  # 验证集不打乱

    # 配置优化器和学习率调度器
    optimizer = AdamW(model.parameters(), lr=5e-5)  # 学习率设置为5e-5
    epochs = 15  # 训练15个轮次
    total_steps = len(train_loader) * epochs  # 计算总训练步数
    scheduler = get_linear_schedule_with_warmup(optimizer, num_warmup_steps=0, num_training_steps=total_steps)

    # 开始训练模型
    print("开始训练生成式问答模型...")
    train_model(model, tokenizer, train_loader, val_loader, optimizer, scheduler, epochs, './chinese_qa_generator')

    # 交互式问答测试(无需提供上下文,模型直接生成答案)
    print("\n=== 交互式问答(无需上下文) ===")
    print("提示: 输入问题(输入q退出)")
    while True:
        question = input("\n问题: ").strip()
        if question.lower() == 'q':
            break  # 退出程序
        # 调用预测函数生成答案
        answer = predict_answer(question, model, tokenizer)
        print(f"答案: {answer}")


# 程序入口点(当脚本直接运行时执行main函数)
if __name__ == "__main__":
    main()

10、中英文版回答系统(隐藏上下文)(需要调试)

"""
多语言生成式问答模型(修复数据集合并与索引错误)
"""
import torch
import os
import random
import json
from datasets import load_dataset, Dataset, concatenate_datasets  # 新增合并工具
from transformers import (
    AutoModelForSeq2SeqLM,
    AutoTokenizer,
    TrainingArguments,
    Trainer
)

device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"使用设备: {device}")


# 1. 数据加载函数(修复索引错误)
def load_qa_data(json_path, sample_ratio=0.01, is_chinese=False):
    """
    加载问答数据,修复:
    - 中文样本的list index out of range错误
    - 数据集合并方法
    """
    try:
        if is_chinese:
            # 中文数据:手动加载扁平JSON
            with open(json_path, 'r', encoding='utf-8') as f:
                raw_data = json.load(f)
            if not isinstance(raw_data, list):
                print(f"中文数据错误:{json_path} 必须是JSON列表")
                exit(1)
            dataset = Dataset.from_list(raw_data)
        else:
            # 英文数据:SQuAD格式
            dataset = load_dataset("json", data_files=json_path, field="data")["train"]
    except Exception as e:
        print(f"加载数据失败: {e}")
        exit(1)

    # 提取问题和答案(修复索引错误)
    samples = []
    for idx, item in enumerate(dataset):
        try:
            if is_chinese:  # 中文数据处理(核心修复)
                question = item.get("question", "").strip()
                answers = item.get("answers", {})  # 默认为字典,避免列表索引错误

                # 修复:先判断answers类型,再提取text
                if isinstance(answers, list):
                    # 处理列表格式:answers = [{"text": "..."}]
                    if len(answers) == 0:
                        print(f"中文样本{idx}错误:answers为空列表")
                        continue
                    answer_text = answers[0].get("text", "").strip()
                elif isinstance(answers, dict):
                    # 处理字典格式:answers = {"text": ["..."]}
                    text_list = answers.get("text", [])
                    if not isinstance(text_list, list) or len(text_list) == 0:
                        print(f"中文样本{idx}错误:answers.text为空列表")
                        continue
                    answer_text = text_list[0].strip()
                else:
                    print(f"中文样本{idx}错误:answers格式错误(非列表/字典)")
                    continue

                # 验证有效样本
                if question and answer_text:
                    samples.append({"question": question, "answer": answer_text})
                else:
                    print(f"中文样本{idx}无效:问题或答案为空")
            else:  # 英文数据处理
                for para in item.get("paragraphs", []):
                    for qa in para.get("qas", []):
                        question = qa.get("question", "").strip()
                        answers = qa.get("answers", [{"text": ""}])
                        if len(answers) == 0:
                            continue
                        answer_text = answers[0]["text"].strip()
                        if question and answer_text:
                            samples.append({"question": question, "answer": answer_text})
        except Exception as e:
            print(f"样本{idx}处理错误: {e},已跳过")
            continue

    # 抽样逻辑(安全处理)
    if len(samples) == 0:
        print(f"错误:{json_path} 未提取到有效样本")
        exit(1)
    sample_size = min(int(len(samples) * sample_ratio), len(samples))
    sample_size = max(1, sample_size)
    sampled = samples if len(samples) <= sample_size else random.sample(samples, sample_size)
    print(f"{ '中文' if is_chinese else '英文' }数据:原始{len(samples)}条,抽样后{len(sampled)}条")

    # 打印示例
    if sampled:
        print(f"样本示例 - 问题: {sampled[0]['question'][:50]}...")
        print(f"样本示例 - 答案: {sampled[0]['answer'][:50]}...")
    return Dataset.from_list(sampled)


# 2. 预处理函数(保持不变)
def preprocess_data(examples, tokenizer, max_length=128):
    inputs = tokenizer(
        examples["question"],
        max_length=max_length,
        truncation=True,
        padding="max_length",
        return_tensors="pt"
    )

    labels = tokenizer(
        examples["answer"],
        max_length=max_length,
        truncation=True,
        padding="max_length",
        return_tensors="pt"
    )["input_ids"]

    labels[labels == tokenizer.pad_token_id] = -100
    return {
        "input_ids": inputs["input_ids"].squeeze(),
        "attention_mask": inputs["attention_mask"].squeeze(),
        "labels": labels.squeeze()
    }


# 3. 训练函数(保持不变)
def train_model(model, tokenizer, train_data, val_data, output_dir):
    print("预处理训练数据...")
    tokenized_train = train_data.map(
        lambda x: preprocess_data(x, tokenizer),
        batched=True,
        remove_columns=train_data.column_names,
        desc="预处理训练集"
    )

    print("预处理验证数据...")
    tokenized_val = val_data.map(
        lambda x: preprocess_data(x, tokenizer),
        batched=True,
        remove_columns=val_data.column_names,
        desc="预处理验证集"
    )

    training_args = TrainingArguments(
        output_dir=output_dir,
        evaluation_strategy="epoch",
        learning_rate=3e-5,
        per_device_train_batch_size=4,
        per_device_eval_batch_size=4,
        num_train_epochs=3,
        weight_decay=0.01,
        logging_steps=5,
        save_strategy="epoch",
        fp16=device == "cuda",
        load_best_model_at_end=True
    )

    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=tokenized_train,
        eval_dataset=tokenized_val,
        tokenizer=tokenizer
    )

    print("开始训练模型...")
    trainer.train()
    trainer.save_model(output_dir)
    print(f"模型已保存至: {output_dir}")
    return model


# 4. 预测函数(保持不变)
def predict_answer(question, model, tokenizer):
    model.eval()
    input_text = f"answer the question: {question}"
    inputs = tokenizer(
        input_text,
        max_length=128,
        truncation=True,
        return_tensors="pt"
    ).to(device)

    with torch.no_grad():
        outputs = model.generate(
            **inputs,
            max_length=128,
            num_beams=3,
            temperature=0.7,
            no_repeat_ngram_size=2,
            early_stopping=True
        )

    return tokenizer.decode(outputs[0].cpu(), skip_special_tokens=True).strip() or "无法生成答案"


# 主函数(修复数据集合并)
def main():
    model_config = {
        "model_name": "google/mt5-base",
        "train_data": {
            "english": "train-v2.0.json",
            "chinese": "train_dataset.json"
        },
        "val_data": {
            "english": "dev-v2.0.json",
            "chinese": "val_dataset.json"
        },
        "output_dir": "./multilingual_qa_model"
    }

    print(f"加载模型: {model_config['model_name']}")
    try:
        tokenizer = AutoTokenizer.from_pretrained(model_config["model_name"])
        model = AutoModelForSeq2SeqLM.from_pretrained(model_config["model_name"]).to(device)
    except Exception as e:
        print(f"加载模型失败: {e}")
        return

    # 加载英文数据
    print("\n加载英文训练数据...")
    english_train = load_qa_data(model_config["train_data"]["english"], sample_ratio=0.05, is_chinese=False)
    print("加载英文验证数据...")
    english_val = load_qa_data(model_config["val_data"]["english"], sample_ratio=0.05, is_chinese=False)

    # 加载中文数据
    print("\n加载中文训练数据...")
    chinese_train = load_qa_data(model_config["train_data"]["chinese"], sample_ratio=1.0, is_chinese=True)
    print("加载中文验证数据...")
    chinese_val = load_qa_data(model_config["val_data"]["chinese"], sample_ratio=1.0, is_chinese=True)

    # 修复:使用concatenate_datasets合并数据集(兼容所有版本)
    print("\n合并中英文数据集...")
    train_data = concatenate_datasets([english_train, chinese_train])
    val_data = concatenate_datasets([english_val, chinese_val])
    print(f"最终训练样本数: {len(train_data)},验证样本数: {len(val_data)}")

    # 训练模型
    model = train_model(
        model, tokenizer, train_data, val_data, model_config["output_dir"]
    )

    # 交互式测试
    print("\n=== 双语问答测试 ===")
    while True:
        question = input("\n请输入问题 (输入q退出): ").strip()
        if question.lower() == "q":
            break
        print(f"答案: {predict_answer(question, model, tokenizer)}")


if __name__ == "__main__":
    main()


网站公告

今日签到

点亮在社区的每一天
去签到