在神经机器翻译领域,注意力机制的引入极大地提升了模型对长句子的翻译质量。本文将通过一个具体的英译法案例,详细解析基于RNN的Encoder-Decoder架构中注意力机制的实现原理和工作流程,帮助读者直观理解注意力机制如何让模型"聚焦"于输入序列的关键部分。
相关文章
RNN(Recurrent Neural Network,循环神经网络)家族详解:网页链接
人工智能概念:RNN中的基础Encoder-Decoder框架:网页链接
人工智能概念:RNN中的注意力机制详解:网页链接
RNN中的注意力机制代码实现:网页链接
一、案例简介以及程序总体结构
1.1 案例背景
本案例旨在实现一个基于注意力机制的英法翻译模型,输入为英文句子,输出为对应的法文翻译。通过该案例,我们可以清晰观察到:当模型翻译法文单词时,会自动"关注"英文原句中与之相关的单词,这种动态聚焦能力正是注意力机制的核心价值。
1.2 技术选型
- 模型架构:采用Encoder-Decoder框架
- 编码器:LSTM网络,负责将输入的英文序列编码为上下文向量
- 解码器:带注意力机制的LSTM网络,负责生成法文翻译序列
- 核心技术:注意力机制(Attention Mechanism),使解码器在生成每个词时能动态关注编码器输出的相关部分
- 开发框架:PyTorch,用于构建和训练神经网络
1.3 程序总体结构
整个程序可分为五个核心模块:
- 数据处理模块:负责文本清洗、词表构建和数据加载
- 模型定义模块:包括编码器、基础解码器和带注意力的解码器
- 注意力机制模块:实现解码器对编码器输出的动态关注逻辑
- 模型训练模块:定义训练流程、损失计算和参数优化
- 模型预测模块:利用训练好的模型进行翻译并可视化注意力权重
程序的整体流程为:数据预处理→模型构建→模型训练→翻译预测→结果可视化。
二、数据处理部分代码详解
数据处理是神经网络训练的基础环节,其核心目标是将原始文本数据转化为模型可直接处理的结构化格式。本案例的数据处理严格遵循“文本规范化→词表构建→数据加载”的流程,确保输入数据的一致性和有效性。以下是各步骤的详细解析:
2.1 文本规范化(normal_to_string
函数)
文本规范化是数据预处理的第一步,旨在统一文本格式、去除噪声,为后续词表构建和模型输入奠定基础。其流程完全匹配预设的数据处理逻辑:
def normal_to_string(s):
'''
对输入文本进行规范化处理,包括小写转换、标点分隔和特殊字符过滤
参数: s (str): 待处理的原始文本字符串
返回: str: 规范化处理后的文本字符串
'''
s = s.lower().strip() # 转为小写并去除首尾空白
s = re.sub(r"([.!?])", r" \1", s) # 标点符号与前后内容用空格分隔
s = re.sub(r"[^a-zA-Z.!?]+", r" ", s) # 过滤非英文和非标点字符
return s
流程解析:
- 输入处理:接收单句文本(如原始英文或法文句子)作为输入。
- 格式统一:通过
lower()
将所有字母转为小写,消除大小写差异;通过strip()
去除首尾空白字符(空格、换行符等)。 - 标点处理:使用正则表达式将标点符号(
.!?
)与前后文本用空格分隔(如将“hello!”转换为“hello !”),便于后续按词分割。 - 噪声过滤:过滤非英文字母和非标点符号的字符(如中文、数字、特殊符号等),仅保留有效文本内容。
- 输出结果:返回规范化后的句子,确保所有文本格式一致。
2.2 数据加载与词表构建(get_data
函数)
该函数实现了从文件加载数据、分割句子、规范化文本、构建词表的完整流程,严格对应预设的“加载→分割→规范化→词表构建”逻辑:
def get_data(path):
'''加载并预处理双语平行语料,构建源语言和目标语言的词表映射'''
# 1. 根据路径加载数据集
with open(path, 'r', encoding='utf-8') as f:
lines = f.readlines() # 按行读取原始数据
# 2. 分割数据集成为句子列表,并进行文本规范化处理
pairs = [[normal_to_string(s) for s in line.strip().split('\t')] for line in lines]
# 每行按制表符(\t)分割为英文和法文句子,再用normal_to_string规范化
# 3. 初始化词表,添加开始标记和结束标记
english_word_to_index = {'<SOS>': 0, '<EOS>': 1} # 英文词→索引映射
french_word_to_index = {'<SOS>': 0, '<EOS>': 1} # 法文词→索引映射
english_word_n = 2 # 英文词表大小(初始含2个特殊标记)
french_word_n = 2 # 法文词表大小
# 4. 遍历规范化后的文本列表,去重后逐一添加到词表
for pair in pairs:
# 处理英文句子,构建英文词表
for word in pair[0].split(' '): # 按空格分割单词
if word not in english_word_to_index: # 去重:仅添加新词
english_word_to_index[word] = english_word_n
english_word_n += 1
# 处理法文句子,构建法文词表
for word in pair[1].split(' '):
if word not in french_word_to_index: # 去重:仅添加新词
french_word_to_index[word] = french_word_n
french_word_n += 1
# 5. 完成索引对单词的词表转换(反向映射)
english_index_to_word = {v: k for k, v in english_word_to_index.items()} # 英文索引→词
french_index_to_word = {v: k for k, v in french_word_to_index.items()} # 法文索引→词
# 输出:词表、词表大小、规范化后的句子列表
return (english_word_to_index, english_index_to_word, english_word_n,
french_word_to_index, french_index_to_word, french_word_n, pairs)
流程解析:
- 数据加载:从指定路径读取原始双语语料(每行格式为“英文句子\t法文句子”)。
- 句子分割与规范化:按制表符分割每行文本为英文和法文句子,再调用
normal_to_string
进行规范化,得到干净的双语对列表pairs
。 - 词表初始化:为英文和法文分别创建词表字典,初始包含特殊标记
<SOS>
(句子开始,索引0)和<EOS>
(句子结束,索引1)。 - 词表填充:遍历所有规范化后的句子,按空格分割为单词,去重后将新词添加到词表并分配唯一索引(从2开始递增),确保每个单词对应唯一索引。
- 反向映射构建:创建“索引→单词”的反向字典,用于后续将模型输出的索引转换为可读文本。
- 输出结果:返回完整的词表(正向和反向映射)、词表大小及规范化后的句子列表,为数据加载做准备。
2.3 自定义数据集与数据加载器(MyDataset
类与my_datasetloader
函数)
该部分实现了数据的批量加载和格式转换,将文本索引序列转化为张量,严格遵循“索引处理→句子提取→标记添加→索引转换→序列填充→张量转换”的流程:
- 自定义数据集类(
MyDataset
)
class MyDataset(Dataset):
'''自定义PyTorch数据集类,用于处理双语平行语料'''
def __init__(self, pairs):
super(MyDataset, self).__init__()
self.pairs = pairs # 存储规范化后的双语对列表
self.len_pairs = len(pairs) # 记录数据集长度
def __len__(self):
# 输出数据集长度,支持用len(数据集对象)获取
return self.len_pairs
def __getitem__(self, index):
# 1. 索引处理,防止越界
index = min(max(index, -self.len_pairs), self.len_pairs - 1)
# 2. 根据索引获取对应的英法句子
x = self.pairs[index][0] # 英文句子(源语言)
y = self.pairs[index][1] # 法文句子(目标语言)
# 3. 为每个句子前后添加开始标记和结束标记
x = '<SOS> ' + x + ' <EOS>'
y = '<SOS> ' + y + ' <EOS>'
# 4. 根据空格切分句子成列表,利用单词表将句子列表变成索引序列
x = [[english_word_to_index[word] for word in x.split(' ')]] # 英文→索引
y = [[french_word_to_index[word] for word in y.split(' ')]] # 法文→索引
# 5. 对序列进行填充,对齐长度(统一为MAX_LENGTH)
x = sequence.pad_sequences(sequences=x, maxlen=MAX_LENGTH,
padding='post', dtype=int, value=1)
y = sequence.pad_sequences(sequences=y, maxlen=MAX_LENGTH,
padding='post', dtype=int, value=1)
# 6. 将列表转换成张量,移动到指定设备(CPU/GPU)
tx = torch.tensor(x, dtype=torch.long, device=device)
ty = torch.tensor(y, dtype=torch.long, device=device)
# 输出:英法文句子索引张量(去除冗余维度)
return tx[0], ty[0]
流程解析:
__init__
方法:初始化数据集,存储双语对列表并记录数据集长度。__len__
方法:返回数据集长度,支持len(dataset)
操作,为数据加载器提供迭代范围。__getitem__
方法(核心):- 索引安全处理:确保输入索引在有效范围内(非负且不超过数据集长度)。
- 句子提取:根据索引获取对应的英文和法文句子。
- 标记添加:在句子首尾添加
<SOS>
和<EOS>
,明确序列边界。 - 索引转换:按空格分割句子为单词,通过词表将单词转换为索引序列(如“hello”→
[5]
)。 - 序列填充:使用
sequence.pad_sequences
将不同长度的索引序列统一为MAX_LENGTH
(不足部分在末尾填充索引1,对应未登录词)。 - 张量转换:将填充后的索引序列转换为PyTorch张量,并移动到指定计算设备(CPU/GPU),便于模型计算。
- 数据加载器(
my_datasetloader
函数)
def my_datasetloader(pairs):
'''创建数据加载器,支持批量处理和数据打乱'''
my_dataset = MyDataset(pairs) # 1. 将规范化后的句子列表传入自定义数据集
# 2. 将实例化后的自定义数据集传入DataLoader,设置批次大小和打乱
my_dataloader = DataLoader(my_dataset, batch_size=batch_size, shuffle=True)
return my_dataloader # 输出:实例化后的DataLoader
流程解析:
- 数据集实例化:将规范化后的双语对列表传入
MyDataset
,创建自定义数据集对象。 - 数据加载器创建:通过
DataLoader
对数据集进行封装,支持按batch_size
批量读取数据,并在每个epoch前打乱数据顺序,提升模型训练效果。 - 输出结果:返回可迭代的数据加载器,供模型训练时按批次获取数据。
三、注意力机制部分代码详解
3.1 带注意力与不带注意力的解码器
在序列到序列(Seq2Seq)模型中,解码器是否引入注意力机制,直接决定了模型对长序列的处理能力。以下先通过核心差异对比建立认知,再深入解析注意力机制实现:
对比维度 | 不带注意力的解码器(DecoderLSTM ) |
带注意力的解码器(AttentionDecoderLSTM ) |
---|---|---|
信息利用方式 | 仅依赖编码器最终隐藏状态(“一视同仁”) | 动态聚焦编码器不同位置输出(“精准关注”) |
长序列表现 | 易丢失远端信息,长文本翻译/生成误差大 | 捕捉长序列关键关联,提升长文本任务准确性 |
核心逻辑 | 编码器最终隐藏状态 → 解码器逐词生成 | 编码器全序列输出 + 注意力权重 → 动态融合上下文 |
适用场景 | 短序列任务(如短语翻译、简单对话) | 长序列任务(如长文本翻译、摘要、问答) |
3.2 编码器:统一输入编码(EncoderLSTM
)
无论解码器是否带注意力,编码器逻辑完全一致——将输入序列编码为隐藏状态,为解码器提供基础信息。
class EncoderLSTM(nn.Module):
def __init__(self, vocab_size, hidden_size, batch_size, num_layers):
super(EncoderLSTM, self).__init__()
self.vocab_size = vocab_size # 输入词表大小(如英文词表)
self.hidden_size = hidden_size # 隐藏层维度(如 256)
self.batch_size = batch_size # 批次大小(如 9)
self.num_layers = num_layers # LSTM 层数(如 1)
self.embed = nn.Embedding(self.vocab_size, self.hidden_size) # 词嵌入
self.lstm = nn.LSTM(self.hidden_size, self.hidden_size,
self.num_layers, batch_first=True) # LSTM 编码
def forward(self, input_x, hidden):
embed_x = self.embed(input_x) # 词索引 → 词向量 [batch, seq_len, hidden]
output, hidden = self.lstm(embed_x, hidden) # 生成全序列隐藏状态 + 最终状态
return output, hidden # output: 编码器全序列输出;hidden: 最终隐藏状态
def init_hidden(self):
# 初始化隐藏状态(h0)和细胞状态(c0)为全零
h0 = torch.zeros(self.num_layers, self.batch_size, self.hidden_size, device=device)
c0 = torch.zeros(self.num_layers, self.batch_size, self.hidden_size, device=device)
return (h0, c0)
作用:
- 为解码器提供两种信息:
output
(编码器全序列隐藏状态):带注意力解码器的Key
和Value
。hidden
(编码器最终隐藏状态):不带注意力解码器的唯一输入(解码器初始状态)。
3.3 不带注意力的解码器(DecoderLSTM
)
完全依赖 编码器最终隐藏状态 生成输出,逻辑简单但长序列表现差。
class DecoderLSTM(nn.Module):
"""基础LSTM解码器(无注意力机制)"""
def __init__(self, french_vocab_size, hidden_size, batch_size, num_layers):
super(DecoderLSTM, self).__init__()
self.french_vocab_size = french_vocab_size # 目标词表大小(如法文词表)
self.hidden_size = hidden_size # 隐藏层维度(与编码器一致)
self.embed = nn.Embedding(self.french_vocab_size, self.hidden_size) # 词嵌入
self.lstm = nn.LSTM(self.hidden_size, self.hidden_size, batch_first=True) # LSTM 解码
self.out = nn.Linear(self.hidden_size, self.french_vocab_size) # 输出层
self.softmax = nn.LogSoftmax(dim=1) # 概率归一化
def forward(self, input_y, hidden):
embed_y = self.embed(input_y) # 词索引 → 词向量 [batch, 1, hidden]
embed_y = F.relu(embed_y) # 非线性激活(可选)
output, hn = self.lstm(embed_y, hidden) # LSTM 解码,更新隐藏状态
output = self.out(output) # 映射到目标词表 [batch, 1, vocab_size]
return output, hn # 返回预测输出 + 新隐藏状态
核心逻辑:
- 输入
input_y
(当前时间步的目标词索引,如<SOS>
或上一步预测词)→ 词嵌入embed_y
。 embed_y
与 编码器最终隐藏状态hidden
一起输入 LSTM,生成新隐藏状态hn
。- LSTM 输出映射为目标词表概率,直接预测当前词。
缺陷:
- 仅用编码器最终隐藏状态(
hidden
),丢失了输入序列中 远端词的细节信息(如长句中开头的词)。 - 生成每个词时“一视同仁”,无法聚焦输入序列的关键位置,长文本任务(如长句翻译)误差大。
3.4 带注意力机制的解码器(AttentionDecoderLSTM
)
通过 注意力权重动态关联编码器全序列输出,解决长序列信息丢失问题。结合流程图,拆解实现细节:
- 初始化:为注意力计算做准备
class AttentionDecoderLSTM(nn.Module):
def __init__(self, french_vocab_size, hidden_size, batch_size, num_layers, max_length, dropout=0.1):
super(AttentionDecoderLSTM, self).__init__()
self.french_vocab_size = french_vocab_size # 目标词表大小
self.hidden_size = hidden_size # 隐藏层维度
self.batch_size = batch_size # 批次大小
self.num_layers = num_layers # LSTM 层数
self.dropout = dropout # Dropout 概率
self.max_length = max_length # 最大序列长度(如 12)
self.embed = nn.Embedding(self.french_vocab_size, self.hidden_size) # 词嵌入
self.dropout_layer = nn.Dropout(self.dropout) # Dropout 层
# 注意力计算层:
# attn_1:将 QK 拼接向量映射为注意力分数(维度 = max_length)
self.attn_1 = nn.Linear(self.hidden_size * 2, max_length)
# attn_2:将 CQ 拼接向量映射为融合特征(维度 = hidden_size)
self.attn_2 = nn.Linear(self.hidden_size * 2, self.hidden_size)
self.lstm = nn.LSTM(self.hidden_size, self.hidden_size,
self.num_layers, batch_first=True) # LSTM 解码
self.out = nn.Linear(self.hidden_size, self.french_vocab_size) # 输出层
self.softmax = nn.LogSoftmax(dim=-1) # 概率归一化
新增组件:
attn_1
、attn_2
:用于计算注意力权重和融合上下文信息。max_length
:输入序列的最大长度(需提前规范),确保注意力权重维度匹配。
- 前向传播:注意力机制的核心逻辑
def forward(self, input_y, hidden, encoder_output):
# 1. 生成 Query(解码器词向量)
embed_y = self.embed(input_y) # [batch, 1, hidden] → 当前词向量(Query)
embed_y = self.dropout_layer(embed_y) # Dropout 防止过拟合
# 2. 构建 QK 向量(融合 Query 和 Key)
# hidden[0]:解码器隐藏状态(Key 的一部分),转置后与 Query 拼接
hidden_transposed = hidden[0].transpose(0, 1) # [batch, 1, hidden]
QK = torch.cat([embed_y, hidden_transposed], dim=-1) # [batch, 1, 2*hidden]
# 3. 计算注意力权重(attn_weights)
attn_scores = self.attn_1(QK) # [batch, 1, max_length] → 注意力分数
attn_weights = self.softmax(attn_scores) # 归一化 → 注意力权重(关注程度)
# 4. 生成上下文向量(Context)
# encoder_output:编码器全序列输出(Value),与注意力权重加权求和
context = torch.bmm(attn_weights, encoder_output) # [batch, 1, hidden] → 上下文向量
# 5. 融合上下文与 Query(CQ 向量)
CQ = torch.cat([context, embed_y], dim=-1) # [batch, 1, 2*hidden] → 融合向量
attention_output = self.attn_2(CQ) # [batch, 1, hidden] → 注意力输出
# 6. LSTM 解码与输出预测
lstm_output, hn = self.lstm(attention_output, hidden) # 更新隐藏状态
output = self.out(lstm_output) # 映射到目标词表
# output = F.relu(output) # 实验发现,本案例中不使用relu效果更佳
output = self.softmax(output[:, 0, :]) # 归一化概率
return output, hn, attn_weights # 返回预测、隐藏状态、注意力权重
注意力逻辑拆解(结合流程图):
- Query 生成:
embed_y
(当前解码词的向量表示)。 - Key 构建:
hidden_transposed
(解码器隐藏状态,含历史信息)。 - 注意力权重计算:
QK
拼接 →attn_1
线性变换 →softmax
归一化 → 得到attn_weights
(解码器对编码器各位置的关注程度)。 - Value 利用:
encoder_output
(编码器全序列输出)与attn_weights
加权求和 → 得到context
(聚焦关键信息的上下文向量)。 - 信息融合:
context
(全局信息) +embed_y
(当前信息) →CQ
拼接 →attn_2
线性变换 → 得到attention_output
(融合后特征)。 - 解码预测:
attention_output
输入 LSTM → 生成新隐藏状态 → 映射为目标词概率。
3.5 核心差异对比:注意力如何解决长序列问题
场景 | 不带注意力的解码器(DecoderLSTM ) |
带注意力的解码器(AttentionDecoderLSTM ) |
---|---|---|
信息来源 | 仅用编码器最终隐藏状态(丢失长序列远端信息) | 用编码器全序列输出 + 注意力动态聚焦(保留关键细节) |
长句翻译示例 | 输入“Long sentences are hard to translate” 输出可能丢失开头“Long”的关联,翻译为“Phrases courtes sont difficiles à traduire”(错误) |
输入“Long sentences are hard to translate” 通过注意力聚焦“Long”,翻译为“Phrases longues sont difficiles à traduire”(正确) |
权重可视化 | 无注意力权重,无法解释模型决策 | 可可视化 attn_weights ,直观看到模型关注的输入位置(如翻译“longues”时,权重集中在“Long”) |
3.5 注意力机制的价值
带注意力的解码器通过 动态聚焦编码器全序列输出,解决了传统模型“长序列信息丢失”的核心问题,使模型在长文本任务(如翻译、摘要)中表现显著提升。其本质是让解码器生成每个词时,能“主动选择”输入序列的关键部分,而非被动依赖单一的最终隐藏状态。
结合代码和对比,可清晰看到注意力机制如何通过 Query-Key-Value
范式,实现“精准关注→信息融合→准确生成”的完整逻辑,这也是现代 NLP 模型处理长序列的核心思路。
四、模型训练部分代码详解
模型训练是将“数据→模型→损失函数”串联,通过迭代优化参数让模型学习翻译规律的核心流程。结合流程图,以下从 训练框架搭建、单批次训练(train_iter
)、完整训练循环(train_model
) 三个维度拆解实现:
4.1 流程总览
模型训练严格遵循“数据加载→模型初始化→多轮迭代训练→参数更新→模型保存”的逻辑,核心是通过 train_iter
实现单批次的“前向传播→损失计算→反向传播”,再由 train_model
控制多轮迭代。
4.2 单批次训练:train_iter
函数
train_iter
实现单批次数据的完整训练流程(前向传播、损失计算、反向传播),是模型学习的“最小单元”。
def train_iter(x, y, encoder, decoder, encoder_adam, decoder_adam, loss_def):
"""执行单批次训练:前向传播→损失计算→反向传播"""
# 1. 编码器前向传播:输入英文序列,生成隐藏状态
encoder_output, encoder_hidden = encoder(x, encoder.init_hidden())
decoder_hidden = encoder_hidden # 解码器初始状态 = 编码器最终状态
total_loss = 0.0 # 初始化批次总损失
y_len = 0 # 记录有效序列长度(避免填充符影响)
# 2. 决定是否启用 Teacher Forcing(加速训练的策略)
use_teacher_forcing = random.random() < teacher_forcing_ratio # 随机概率启用
if use_teacher_forcing:
# 模式1:Teacher Forcing → 用真实标签作为解码器下一时间步输入
for idx in range(MAX_LENGTH):
input_y = y[:, idx].reshape(-1, 1) # 当前时间步输入(真实标签)
# 解码器前向传播(带注意力时需 encoder_output)
decoder_output, decoder_hidden, _ = decoder(input_y, decoder_hidden, encoder_output)
# 计算损失(用下一时间步的真实标签)
if idx + 1 < MAX_LENGTH:
target_y = y[:, idx + 1] # 目标标签
total_loss += loss_def(decoder_output, target_y) # 累加损失
y_len += 1 # 有效长度+1
# 提前终止:所有样本都预测出 <EOS> 时停止
topv, topi = torch.topk(decoder_output, k=1)
if (topi == EOS_TOKEN).all():
break
else:
# 模式2:非Teacher Forcing → 用模型预测作为下一时间步输入
for idx in range(MAX_LENGTH):
# 初始输入用真实标签(<SOS>)
if idx == 0:
input_y = y[:, idx].reshape(-1, 1)
# 解码器前向传播
decoder_output, decoder_hidden, _ = decoder(input_y, decoder_hidden, encoder_output)
# 计算损失
if idx + 1 < MAX_LENGTH:
target_y = y[:, idx + 1]
total_loss += loss_def(decoder_output, target_y)
y_len += 1
# 获取预测结果,作为下一时间步输入
topv, topi = torch.topk(decoder_output, k=1)
input_y = topi # 预测值 → 下一时间步输入
# 提前终止
if (topi == EOS_TOKEN).all():
break
# 3. 反向传播与参数更新
encoder_adam.zero_grad() # 编码器梯度清零
decoder_adam.zero_grad() # 解码器梯度清零
total_loss.backward() # 反向传播计算梯度
encoder_adam.step() # 更新编码器参数
decoder_adam.step() # 更新解码器参数
# 返回平均损失(总损失 / 有效长度)
return total_loss.item() / y_len if y_len > 0 else 0.0
流程图对应关系:
输入:英语句子索引(x), 法语句子索引(y), 模型, 优化器, 损失函数
→ 函数参数将英语句子索引送入编码器
→encoder(x, ...)
随机启用teacher_forcing机制
→use_teacher_forcing = random...
Teacher Forcing 模式
→if use_teacher_forcing
分支(用真实标签做输入)非Teacher Forcing 模式
→else
分支(用预测标签做输入)梯度清零→反向传播→梯度更新
→zero_grad()→backward()→step()
输出:平均损失
→return total_loss.item() / y_len
4.3 完整训练循环:train_model
函数
train_model
实现多轮(epoch)、多批次(batch) 的完整训练流程,控制训练的整体节奏。
def train_model():
"""模型训练主流程:数据加载→多轮迭代→模型保存"""
# 1. 创建数据加载器(对应流程图“创建数据加载器”)
my_dataloader = my_datasetloader(pairs) # 加载规范化后的双语对
# 2. 实例化模型(对应流程图“实例化编码器和解码器”)
encoder = EncoderLSTM(english_word_n, hidden_size, batch_size, num_layers)
decoder = AttentionDecoderLSTM(french_word_n, hidden_size, batch_size, num_layers, MAX_LENGTH)
# 移动模型到 GPU/CPU(加速计算)
encoder = encoder.to(device)
decoder = decoder.to(device)
# 3. 实例化优化器(对应流程图“实例化优化器”)
encoder_adam = optim.Adam(encoder.parameters(), lr=my_lr) # 编码器优化器
decoder_adam = optim.Adam(decoder.parameters(), lr=my_lr) # 解码器优化器
# 4. 实例化损失函数(对应流程图“实例化损失函数”)
loss_def = nn.NLLLoss() # 负对数似然损失(与解码器的 LogSoftmax 配合)
# 5. 定义日志参数(对应流程图“定义日志参数”)
plot_loss_list = [] # 记录损失趋势,用于绘图
print_loss_total = 0.0 # 批次损失累加
plot_loss_total = 0.0 # 绘图用损失累加
# 6. 多轮训练循环(对应流程图“定义第一个for循环,控制训练批次”)
for epoch_idx in range(1, 1 + epochs):
start_time = time.time() # 记录轮次开始时间
# 7. 遍历批次(对应流程图“第二个for循环提取数据集”)
for batch_idx, (x, y) in enumerate(tqdm(my_dataloader, desc=f'Epoch {epoch_idx}')):
# 8. 单批次训练(调用 train_iter,对应流程图“调用迭代训练函数”)
batch_loss = train_iter(x, y, encoder, decoder, encoder_adam, decoder_adam, loss_def)
# 9. 日志记录(对应流程图“打印日志”)
print_loss_total += batch_loss # 累加批次损失
plot_loss_total += batch_loss # 累加绘图损失
# 每 100 批次打印一次平均损失
if (batch_idx + 1) % 100 == 0:
avg_loss = print_loss_total / 100
print_loss_total = 0.0
print(f'轮次:{epoch_idx}, 批次:{batch_idx+1}, 平均损失:{avg_loss:.4f}, '
f'耗时:{time.time()-start_time:.2f}秒')
# 每 10 批次记录一次损失(用于绘图)
if (batch_idx + 1) % 10 == 0:
plot_avg_loss = plot_loss_total / 10
plot_loss_list.append(plot_avg_loss)
plot_loss_total = 0
# 10. 保存模型(对应流程图“保存模型”)
torch.save(encoder.state_dict(), f'encoder_epoch{epoch_idx}.pth')
torch.save(decoder.state_dict(), f'decoder_epoch{epoch_idx}.pth')
# 绘制损失曲线(可视化训练趋势)
plt.figure()
plt.plot(plot_loss_list)
plt.xlabel('Iterations (per 10 batches)')
plt.ylabel('Average Loss')
plt.title('Training Loss Over Time')
plt.savefig('training_loss.png')
plt.show()
流程图对应关系:
创建数据加载器
→my_dataloader = my_datasetloader(pairs)
实例化编码器和解码器
→encoder = EncoderLSTM(...); decoder = AttentionDecoderLSTM(...)
实例化优化器
→encoder_adam = optim.Adam(...); decoder_adam = optim.Adam(...)
实例化损失函数
→loss_def = nn.NLLLoss()
定义日志参数
→plot_loss_list, print_loss_total, plot_loss_total
第一个for循环(控制 epochs)
→for epoch_idx in range(1, 1 + epochs)
第二个for循环(提取批次)
→for batch_idx, (x, y) in enumerate(...)
调用迭代训练函数
→batch_loss = train_iter(...)
打印日志
→print(f'轮次:{epoch_idx}...')
保存模型
→torch.save(...)
4.4 核心策略解析
Teacher Forcing:加速训练的“脚手架”
- 作用:以一定概率(
teacher_forcing_ratio
)让解码器用真实标签而非“预测标签”作为下一时间步输入,避免模型因预测错误陷入“错误累积”,加速收敛。 - 缺陷:测试时无真实标签,可能导致模型“依赖”标签,需通过概率控制平衡训练效率和泛化能力。
- 作用:以一定概率(
梯度更新:参数优化的“核心动力”
- 梯度清零(
zero_grad()
):每次反向传播前清零梯度,避免梯度累积影响参数更新方向。 - 反向传播(
backward()
):计算损失对模型参数的梯度,反映“参数如何调整能减少损失”。 - 参数更新(
step()
):根据梯度调整模型参数,逐步优化翻译能力。
- 梯度清零(
多轮迭代(Epoch):让模型“学透”数据
- 每轮(Epoch)遍历全部训练数据,通过多轮迭代让模型从不同角度学习数据规律,提升泛化能力。
五、模型预测部分代码详解
模型预测是训练流程的“最终验收”,核心是用训练好的模型生成翻译结果,并通过注意力可视化验证模型是否“正确聚焦”输入。结合流程图,以下从 单句翻译生成(seq2seq_evalute
)、批量预测与可视化(model_predict
) 两个维度拆解:
5.1 流程总览
模型预测严格遵循“数据预处理→模型加载→单句翻译→结果可视化”的逻辑,核心是 seq2seq_evalute
实现翻译生成,model_predict
控制批量预测和可视化。
5.2 单句翻译生成:seq2seq_evalute
函数
seq2seq_evalute
实现从英文输入到法文输出的完整翻译流程,包括编码、解码、注意力记录。
def seq2seq_evalute(x, encoder, decoder):
"""使用训练好的模型生成翻译结果,记录注意力权重"""
# 1. 动态调整 batch_size(适配输入数据)
if x.shape[0] != encoder.batch_size:
encoder.batch_size = x.shape[0] # 确保 batch_size 匹配输入
with torch.no_grad(): # 禁用梯度计算(加速推理,节省内存)
# 2. 编码器前向传播:英文输入 → 隐藏状态
encoder_output, encoder_hidden = encoder(x, encoder.init_hidden())
decoder_hidden = encoder_hidden # 解码器初始状态 = 编码器最终状态
# 3. 初始化变量:存储预测结果和注意力权重
decoded_words = [] # 存储法文预测词索引
decoded_attention_weights = [] # 存储每一步的注意力权重
# 4. 解码循环:逐词生成法文翻译
for idx in range(MAX_LENGTH):
# 初始输入:第一个时间步用 <SOS>(或输入的起始标记)
if idx == 0:
input_y = x[:, idx].reshape(-1, 1) # 取输入的第一个词(通常是 <SOS>)
# 5. 解码器前向传播(带注意力)
decoder_output, decoder_hidden, attention_weight = decoder(
input_y, decoder_hidden, encoder_output
)
# 6. 记录注意力权重(用于可视化)
decoded_attention_weights.append(attention_weight)
# 7. 获取预测结果:取概率最大的词索引
topv, topi = torch.topk(decoder_output, k=1) # 取top1预测
decoded_words.append(topi) # 记录预测词索引
# 8. 提前终止:所有样本预测出 <EOS> 时停止
if (topi == EOS_TOKEN).all():
break
# 9. 下一时间步输入:用当前预测结果
input_y = topi # 预测词 → 下一时间步输入
# 整理结果:转换为张量,便于后续处理
decoded_words = torch.cat(decoded_words, dim=1) # 拼接预测序列
decoded_attention_weights = torch.cat(decoded_attention_weights, dim=0) # 拼接注意力权重
return decoded_words, decoded_attention_weights
流程图对应关系:
输入:英语句子(x), 编码器, 解码器
→ 函数参数设置动态batch_size
→encoder.batch_size = x.shape[0]
开启不计算梯度
→with torch.no_grad()
英语句子输入编码器
→encoder_output, encoder_hidden = encoder(x, ...)
定义for循环,开始解码
→for idx in range(MAX_LENGTH)
索引为0时传递开始标记
→if idx == 0: input_y = x[:, idx]...
输入解码器,获取预测值和注意力权重
→decoder_output, ..., attention_weight = decoder(...)
预测值作为下一时间步输入
→input_y = topi
输出:预测词表, 权重矩阵
→return decoded_words, decoded_attention_weights
5.3 批量预测与可视化:model_predict
函数
model_predict
实现测试数据加载、模型预测、结果可视化的完整流程,验证模型泛化能力。
def model_predict():
"""模型预测主流程:加载数据→生成翻译→可视化注意力"""
# 1. 自定义测试数据集(对应流程图“自定义测试数据集”)
test_samples = [
'i m impressed with your french .\tje suis impressionne par votre francais .\n',
'i m more than a friend .\tje suis plus qu une amie .\n',
'she is beautiful like her mother .\telle est belle comme sa mere .\n'
]
# 2. 规范化数据集(对应流程图“规范化数据集”)
test_pairs = [
[normal_to_string(s) for s in line.strip().split('\t')]
for line in test_samples
]
# 3. 创建数据加载器(对应流程图“创建数据加载器”)
test_dataloader = my_datasetloader(test_pairs) # 加载测试数据
# 4. 加载预训练模型(对应流程图“调用模型”)
encoder = EncoderLSTM(english_word_n, hidden_size, batch_size, num_layers)
decoder = AttentionDecoderLSTM(french_word_n, hidden_size, batch_size, num_layers, MAX_LENGTH)
# 加载训练好的参数
encoder.load_state_dict(torch.load('encoder.pth'))
decoder.load_state_dict(torch.load('decoder.pth'))
# 移动模型到设备(GPU/CPU)
encoder = encoder.to(device)
decoder = decoder.to(device)
# 5. 批量预测循环(对应流程图“定义for循环提取数据”)
for batch_idx, (x, y) in enumerate(test_dataloader):
# 6. 调用内部评估函数(对应流程图“调用内部评估函数”)
decoded_words, atten_weights = seq2seq_evalute(x, encoder, decoder)
# 7. 打印结果(对应流程图“打印结果”)
for i in range(x.shape[0]): # 遍历批次内每个样本
# 英文输入转换为文本
english_input = ' '.join([
english_index_to_word[int(token)]
for token in x[i] if token != PAD_TOKEN
])
# 法文目标转换为文本
french_target = ' '.join([
french_index_to_word[int(token)]
for token in y[i] if token != PAD_TOKEN
])
# 模型预测转换为文本
french_pred = ' '.join([
french_index_to_word[int(token)]
for token in decoded_words[i] if token != PAD_TOKEN
])
# 打印翻译结果
print(f'英文输入:{english_input}')
print(f'法文目标:{french_target}')
print(f'模型预测:{french_pred}')
print('-' * 50)
# 8. 注意力可视化(热力图)
plt.figure(figsize=(8, 6))
# 注意力权重矩阵维度:[解码步数, batch_size, 编码步数]
# 取当前样本(i)的注意力权重,转置后更直观
attn_map = atten_weights[:, i, :].numpy().T
plt.imshow(attn_map, cmap='hot', interpolation='nearest')
plt.xlabel('法文输出位置')
plt.ylabel('英文输入位置')
plt.title('注意力权重热力图')
plt.colorbar()
plt.savefig(f'attention_heatmap_{batch_idx}_{i}.png')
plt.show()
流程图对应关系:
自定义测试数据集
→test_samples = [...]
规范化数据集
→test_pairs = [normal_to_string(...)...]
创建数据加载器
→test_dataloader = my_datasetloader(...)
调用模型
→encoder.load_state_dict(...); decoder.load_state_dict(...)
定义for循环提取数据
→for batch_idx, (x, y) in enumerate(...)
调用内部评估函数
→decoded_words, atten_weights = seq2seq_evalute(...)
打印结果
→print(f'英文输入:{english_input}')...
输出:预测结果+可视化
→ 控制台打印 + 注意力热力图保存
5.4 核心价值:从“能翻译”到“可解释”
自回归生成:逐词构建翻译
解码器通过“预测词→下一时间步输入”的自回归方式,递归生成完整翻译序列,模拟人类“逐词翻译”的思维。注意力可视化:验证模型“理解”
通过热力图可视化注意力权重,可直观观察:- 解码器生成每个法文词时,关注英文原句的哪个位置(如翻译“français”时,是否关注“french”)。
- 颜色越深,表明模型认为该位置的英文词与当前法文词关联性越强,验证模型是否“正确学习”到翻译对齐关系。
六、运行结果分析
6.1 带注意力机制的模型运行结果
损失曲线:
注意力热力图:
预测结果:
****************************************************************************************************
当前x的句子对是:<SOS> i m more than a friend . <EOS> <EOS> <EOS> <EOS>
当前y的句子对是:<SOS> je suis plus qu une amie . <EOS> <EOS> <EOS> <EOS>
当前预测的句子对是:<SOS> je suis plus qu un amie . <EOS> <EOS> <EOS> <EOS> <EOS>
****************************************************************************************************
当前x的句子对是:<SOS> she is beautiful like her mother . <EOS> <EOS> <EOS> <EOS>
当前y的句子对是:<SOS> elle est belle comme sa mere . <EOS> <EOS> <EOS> <EOS>
当前预测的句子对是:<SOS> elle est belle comme sa mere . <EOS> <EOS> <EOS> <EOS> <EOS>
****************************************************************************************************
当前x的句子对是:<SOS> i m impressed with your french . <EOS> <EOS> <EOS> <EOS>
当前y的句子对是:<SOS> je suis impressionne par votre francais . <EOS> <EOS> <EOS> <EOS>
当前预测的句子对是:<SOS> je suis impressionnee par votre francais . <EOS> <EOS> <EOS> <EOS> <EOS>
****************************************************************************************************
6.2 标准模型的运行结果
损失曲线:
预测结果:
****************************************************************************************************
当前x的句子对是:<SOS> i m impressed with your french . <EOS> <EOS> <EOS> <EOS>
当前y的句子对是:<SOS> je suis impressionne par votre francais . <EOS> <EOS> <EOS> <EOS>
当前预测的句子对是:<SOS> je suis heureux par ton francais . <EOS> <EOS> <EOS> <EOS> <EOS>
****************************************************************************************************
当前x的句子对是:<SOS> she is beautiful like her mother . <EOS> <EOS> <EOS> <EOS>
当前y的句子对是:<SOS> elle est belle comme sa mere . <EOS> <EOS> <EOS> <EOS>
当前预测的句子对是:<SOS> elle est une belle de sa . <EOS> <EOS> <EOS> <EOS> <EOS>
****************************************************************************************************
当前x的句子对是:<SOS> i m more than a friend . <EOS> <EOS> <EOS> <EOS>
当前y的句子对是:<SOS> je suis plus qu une amie . <EOS> <EOS> <EOS> <EOS>
当前预测的句子对是:<SOS> je suis plus qu un ami . <EOS> <EOS> <EOS> <EOS> <EOS>
****************************************************************************************************
6.3 运行结果分析
通过对比带注意力机制的模型与标准模型(无注意力机制)的运行结果,可从损失收敛效率、翻译准确性、注意力聚焦逻辑三个维度清晰看出注意力机制对模型性能的提升作用:
- 损失曲线:注意力机制显著提升收敛效率与稳定性
- 带注意力机制的模型:损失曲线稳定下降并最终收敛在0.5左右,波动较小。这表明模型能更高效地学习输入(英文)与输出(法文)之间的语义关联,通过动态聚焦关键信息减少无效学习,收敛过程更稳定。
- 标准模型:损失曲线下降缓慢,最终稳定在1左右,且迭代过程中波动较大。原因是标准模型仅依赖编码器的最终隐藏状态,无法捕捉长序列中的细节关联,导致学习效率低、收敛难度大,损失始终高于带注意力的模型。
结论:注意力机制通过“精准聚焦”输入序列的关键信息,帮助模型更快找到优化方向,显著提升收敛效率和训练稳定性。
- 翻译准确性:注意力机制大幅提升语义一致性与细节精度
对比两组模型的预测结果,带注意力机制的模型在核心语义保留和细节对齐上优势明显:
输入英文句子 | 带注意力模型的预测结果 | 标准模型的预测结果 | 差异分析 |
---|---|---|---|
“i m impressed with your french .” | “je suis impressionnee par votre francais .” | “je suis heureux par ton francais .” | 带注意力模型准确翻译“impressed”为“impressionnee”(仅性别后缀差异),且“your→votre”人称一致;标准模型误将“impressed”译为“heureux(高兴)”,“your→ton”人称混乱,核心语义丢失。 |
“she is beautiful like her mother .” | “elle est belle comme sa mere .” | “elle est une belle de sa .” | 带注意力模型完全匹配目标翻译,准确捕捉“beautiful→belle”“like→comme”“her mother→sa mere”的对应关系;标准模型语法混乱,丢失“like”的核心语义,且“her mother”翻译错误。 |
“i m more than a friend .” | “je suis plus qu un amie .” | “je suis plus qu un ami .” | 两者均保留核心语义“more than”,但带注意力模型更贴近目标(仅“un→une”性别差异);标准模型“ami(男性朋友)”与目标“amie(女性朋友)”存在性别偏差,细节准确性稍差。 |
结论:注意力机制通过聚焦输入序列中的关键单词(如“impressed”“like”“mother”),实现了更精准的跨语言语义映射,尤其在长句和复杂短语翻译中优势显著。
- 注意力热力图:对角线分布验证“词级对齐”学习能力
带注意力机制的模型生成的热力图呈现明显的对角线分布,这一特征揭示了模型的聚焦逻辑:
- 热力图中颜色较深的区域集中在对角线附近,表明模型在生成法文单词时,会重点关注英文序列中位置对应的单词(如生成“belle”时聚焦“beautiful”,生成“comme”时聚焦“like”)。
- 这种“词级对齐”能力是标准模型不具备的——标准模型因无法关联输入序列的具体位置,只能依赖整体语义模糊推测,导致翻译误差较大。
结论:热力图的对角线分布验证了注意力机制能帮助模型学习“英文单词→法文单词”的精准对应关系,这是其翻译准确性优于标准模型的核心原因。
七、完整代码
数据集下载:百度云链接
注意:数据集需要放在项目根目录下的data
文件夹内,运行程序前务必确保项目根目录有save_model
文件夹
完整代码:
# 导入正则表达式模块,用于文本预处理中的字符匹配、替换操作
import re
# 导入 PyTorch 核心库,用于构建神经网络层、张量运算、自动求导等
import torch
import torch.nn as nn # 包含常用神经网络层(如 Linear、LSTM 等)的定义
import torch.nn.functional as F # 包含常用激活函数、损失函数等功能函数
from torch.utils.data import Dataset, DataLoader # 用于构建自定义数据集和数据加载器
import torch.optim as optim # 提供多种优化算法(如 Adam、SGD 等)
import time # 用于记录时间(如训练耗时)
import random # 用于随机数相关操作(如 teacher forcing 策略的随机选择)
import matplotlib.pyplot as plt # 用于绘制训练损失曲线,可视化训练过程
from tqdm import tqdm # 显示进度条,方便查看数据加载、训练等过程的进度
from tensorflow.keras.preprocessing import sequence # 用于对序列进行填充,使不同长度序列对齐(虽与 PyTorch 混合,这里为便捷实现序列填充)
# 定义文本规范化函数,将输入字符串处理成统一格式,便于后续构建词表和模型输入
def normal_to_string(s):
# 将字符串中所有字母转为小写,并去除首尾空白字符(如空格、换行符等)
s = s.lower().strip()
# 使用正则表达式,将标点符号(.!?)与前后内容用空格分隔,方便后续按词分割
s = re.sub(r"([.!?])", r" \1", s)
# 将非英文字母(a-zA-Z)和非标点符号(.!?)的字符替换为空格,过滤无关符号
s = re.sub(r"[^a-zA-Z.!?]+", r" ", s)
return s
# 数据加载与预处理函数,从文件读取数据,构建词表,返回处理后的数据和词表信息
def get_data(path):
# 以utf-8编码打开文件,按行读取内容,lines 存储所有文本行
with open(path, 'r', encoding='utf-8') as f:
lines = f.readlines()
# 打印原始数据集的大小(行数)
print(f'原始数据集大小为:{len(lines)}')
# 打印原始数据集的前 5 行内容,查看数据格式
print(f'原始数据集的前5行数据为:{lines[:5]}')
# 对每行数据按制表符(\t)分割成英文和法文部分,并用 normal_to_string 函数清洗
pairs = [[normal_to_string(s) for s in line.strip().split('\t')] for line in lines]
# 打印处理后数据集的前 5 行,查看清洗效果
print(f'处理后的数据集的前5行数据为:{pairs[:5]}')
# 初始化英文和法文的词到索引的映射字典,包含起始符<SOS>和结束符<EOS>
english_word_to_index = {'<SOS>': SOS_TOKEN, '<EOS>': EOS_TOKEN}
french_word_to_index = {'<SOS>': SOS_TOKEN, '<EOS>': EOS_TOKEN}
english_word_n = 2 # 初始已有 2 个特殊符号,新单词索引从 2 开始
french_word_n = 2
# 遍历所有双语对,填充词表
for pair in pairs:
# 处理英文句子,填充英文词表
for word in pair[0].split(' '):
if word not in english_word_to_index: # 若单词未在词表中
english_word_to_index[word] = english_word_n # 为单词分配索引
english_word_n += 1 # 索引自增,准备下一个新单词
# 处理法文句子,填充法文词表
for word in pair[1].split(' '):
if word not in french_word_to_index: # 若单词未在词表中
french_word_to_index[word] = french_word_n # 为单词分配索引
french_word_n += 1 # 索引自增,准备下一个新单词
# 构建索引到词的映射字典,方便后续将模型输出的索引转回单词,查看翻译结果
english_index_to_word = {v: k for k, v in english_word_to_index.items()}
french_index_to_word = {v: k for k, v in french_word_to_index.items()}
# 返回词表相关信息和处理后的双语对数据
return (english_word_to_index, english_index_to_word, english_word_n,
french_word_to_index, french_index_to_word, french_word_n, pairs)
# 自定义数据集类,继承自 torch.utils.data.Dataset,实现读取、处理单条样本的逻辑
class MyDataset(Dataset):
def __init__(self, pairs):
super(MyDataset, self).__init__() # 调用父类构造函数
self.pairs = pairs # 存储处理后的双语对数据
self.len_pairs = len(pairs) # 记录数据集的样本总数
def __len__(self):
# 返回数据集的长度,DataLoader 会调用该方法获取数据集大小
return self.len_pairs
def __getitem__(self, index):
# 处理索引,防止传入负数或超出范围的索引,确保索引有效
index = min(max(index, -self.len_pairs), self.len_pairs - 1)
x = self.pairs[index][0] # 获取英文句子(源语言)
y = self.pairs[index][1] # 获取法文句子(目标语言)
# 为句子前后添加起始符<SOS>和结束符<EOS>,明确序列的开始和结束
x = '<SOS> ' + x + ' <EOS>'
y = '<SOS> ' + y + ' <EOS>'
# 将句子按空格分割成单词,再转换为对应的词索引序列
x = [[english_word_to_index[word] for word in x.split(' ')]]
y = [[french_word_to_index[word] for word in y.split(' ')]]
# 使用 sequence.pad_sequences 对序列进行填充,使所有样本长度对齐到 MAX_LENGTH
# padding='post' 表示在序列末尾填充,value=1 表示用索引 1 填充(这里 1 对应未在词表中?实际看词表初始化,可能是预留或默认填充值 )
x = sequence.pad_sequences(sequences=x, maxlen=MAX_LENGTH, padding='post', dtype=int, value=1)
y = sequence.pad_sequences(sequences=y, maxlen=MAX_LENGTH, padding='post', dtype=int, value=1)
# 将填充后的序列转换为 PyTorch 张量,指定数据类型为 long(整数类型,适合词索引),并移动到指定设备(CPU/GPU)
tx = torch.tensor(x, dtype=torch.long, device=device)
ty = torch.tensor(y, dtype=torch.long, device=device)
# 返回单条样本的张量,注意去除 batch 维度(因为 DataLoader 会自动封装成批次)
return tx[0], ty[0]
# 构建数据加载器的函数,返回可迭代的 DataLoader,用于分批读取数据进行训练或测试
def my_datasetloader(pairs):
my_dataset = MyDataset(pairs) # 实例化自定义数据集
# 创建 DataLoader,设置批次大小、是否打乱数据;shuffle=True 表示每个 epoch 都会打乱数据顺序
# drop_last=True开启最后一个批次大小不等于batch_size的批次直接丢弃
my_dataloader = DataLoader(my_dataset, batch_size=batch_size, shuffle=True)
return my_dataloader
# 自定义编码器类,基于 LSTM 实现,将输入的源语言序列(英文)编码为隐藏状态
class EncoderLSTM(nn.Module):
def __init__(self, vocab_size, hidden_size, batch_size, num_layers):
super(EncoderLSTM, self).__init__() # 调用父类构造函数,初始化 nn.Module
self.vocab_size = vocab_size # 源语言(英文)词表大小
self.hidden_size = hidden_size # LSTM 隐藏层维度
self.batch_size = batch_size # 批次大小
self.num_layers = num_layers # LSTM 的层数
# 定义 Embedding 层,将词索引转换为词向量,维度为 (vocab_size, hidden_size)
self.embed = nn.Embedding(self.vocab_size, self.hidden_size)
# 定义 LSTM 层,输入维度为 hidden_size(词向量维度),输出维度为 hidden_size,num_layers 层,
# batch_first=True 表示输入和输出的张量维度为 (batch_size, seq_len, hidden_size)
self.lstm = nn.LSTM(self.hidden_size, self.hidden_size,
self.num_layers, batch_first=True)
def forward(self, input_x, hidden):
# 将输入的词索引序列通过 Embedding 层转换为词向量
embed_x = self.embed(input_x)
# 将词向量输入 LSTM 层,得到输出(所有时间步的隐藏状态)和更新后的隐藏状态(hn, cn)
output, hidden = self.lstm(embed_x, hidden)
return output, hidden # 返回 LSTM 输出和隐藏状态
def init_hidden(self):
# 初始化 LSTM 的隐藏状态 h0 和细胞状态 c0,维度为 (num_layers, batch_size, hidden_size),
# 并移动到指定设备(device)上
h0 = torch.zeros(self.num_layers, self.batch_size, self.hidden_size, device=device)
c0 = torch.zeros(self.num_layers, self.batch_size, self.hidden_size, device=device)
return (h0, c0) # 返回初始化的隐藏状态和细胞状态,以元组形式
# 基础解码器类(无注意力机制),基于 LSTM 实现,用于将编码器输出的隐藏状态解码为目标语言序列(法文)
class DecoderLSTM(nn.Module):
def __init__(self, french_vocab_size, hidden_size, batch_size, num_layers):
super(DecoderLSTM, self).__init__() # 调用父类构造函数
self.french_vocab_size = french_vocab_size # 目标语言(法文)词表大小
self.hidden_size = hidden_size # LSTM 隐藏层维度
# 定义 Embedding 层,将目标语言的词索引转换为词向量,维度为 (french_vocab_size, hidden_size)
self.embed = nn.Embedding(self.french_vocab_size, self.hidden_size)
# 定义 LSTM 层,输入维度为 hidden_size(词向量维度),输出维度为 hidden_size,
# batch_first=True 表示输入和输出的张量维度为 (batch_size, seq_len, hidden_size)
self.lstm = nn.LSTM(self.hidden_size, self.hidden_size, batch_first=True)
# 定义输出层,将 LSTM 隐藏状态映射到目标语言词表大小的维度,用于分类预测
self.out = nn.Linear(self.hidden_size, self.french_vocab_size)
# 定义 LogSoftmax 层,用于计算分类概率(常与 NLLLoss 损失函数配合使用)
self.softmax = nn.LogSoftmax(dim=1)
def forward(self, input_y, hidden):
# 将输入的目标语言词索引序列通过 Embedding 层转换为词向量
embed_y = self.embed(input_y)
# 对词向量应用 ReLU 激活函数,增加非线性(这里对词向量直接激活不太常见,可根据需求调整)
embed_y = F.relu(embed_y)
# 将激活后的词向量输入 LSTM 层,得到输出和更新后的隐藏状态(hn)
output, hn = self.lstm(embed_y, hidden)
# 将 LSTM 输出通过全连接层映射到目标语言词表大小的维度
output = self.out(output)
output = self.softmax(output[:, 0, :])
return output, hn # 返回输出和隐藏状态
def init_hidden(self):
# 初始化 LSTM 的隐藏状态 h0 和细胞状态 c0,维度为 (num_layers, batch_size, hidden_size)
# 注意:这里未指定设备,若模型在 GPU 上运行,建议加上 device=device
h0 = torch.zeros(self.num_layers, self.batch_size, self.hidden_size)
c0 = torch.zeros(self.num_layers, self.batch_size, self.hidden_size)
return (h0, c0) # 返回初始化的隐藏状态和细胞状态,以元组形式
# 定义带注意力机制的解码器类,通过注意力动态关联编码器输出,提升翻译效果
class AttentionDecoderLSTM(nn.Module):
"""
基于LSTM的注意力机制解码器模块
参数:
french_vocab_size (int): 目标语言(法语)词表大小
hidden_size (int): 隐藏层维度
batch_size (int): 批处理大小
num_layers (int): LSTM层数
max_length (int): 最大序列长度(用于注意力计算)
dropout (float, optional): Dropout概率,默认0.1
属性:
embed (nn.Embedding): 词嵌入层
attn_1 (nn.Linear): 注意力权重计算层
attn_2 (nn.Linear): 注意力融合层
lstm (nn.LSTM): LSTM循环层
out (nn.Linear): 输出分类层
"""
def __init__(self, french_vocab_size, hidden_size, batch_size, num_layers,max_length,
dropout=0.1, ):
super(AttentionDecoderLSTM, self).__init__() # 调用父类构造函数
# 初始化属性,存储解码器的配置参数
self.french_vocab_size = french_vocab_size
self.hidden_size = hidden_size
self.batch_size = batch_size
self.num_layers = num_layers
self.dropout = dropout # Dropout 概率,用于正则化
self.max_length = max_length # 最大序列长度,用于注意力计算
# 定义 Embedding 层,将目标语言的词索引转换为词向量
self.embed = nn.Embedding(self.french_vocab_size, self.hidden_size)
# 定义 Dropout 层,随机置零部分神经元,防止过拟合
self.dropout = nn.Dropout(self.dropout)
# 定义第一个全连接层,用于计算注意力权重:将拼接后的 Q-K 向量映射到 max_length 维度
self.attn_1 = nn.Linear(self.hidden_size * 2, max_length)
# 定义第二个全连接层,用于融合注意力信息:将拼接后的 C-Q 向量映射到 hidden_size 维度
self.attn_2 = nn.Linear(self.hidden_size * 2, self.hidden_size)
# 定义 LSTM 层,输入维度为 hidden_size,输出维度为 hidden_size,batch_first=True
self.lstm = nn.LSTM(self.hidden_size, self.hidden_size,
self.num_layers, batch_first=True)
# 定义输出层,将 LSTM 输出映射到目标语言词表大小的维度
self.out = nn.Linear(self.hidden_size, self.french_vocab_size)
# 定义 LogSoftmax 层,用于计算分类概率
self.softmax = nn.LogSoftmax(dim=-1)
def forward(self, input_y, hidden, encoder_output):
"""
解码器前向传播过程
参数:
input_y (Tensor): 目标语言输入序列,形状 [batch_size, 1]
hidden (Tuple[Tensor, Tensor]): LSTM初始隐藏状态(h_n, c_n)
encoder_output (Tensor): 编码器输出序列,形状 [batch_size, max_length, hidden_size]
返回:
output (Tensor): 预测概率分布,形状 [batch_size, french_vocab_size]
hn (Tuple[Tensor, Tensor]): 更新后的LSTM隐藏状态
a (Tensor): 注意力权重矩阵,形状 [batch_size, 1, max_length]
"""
# 将输入的目标语言词索引序列通过 Embedding 层转换为词向量
embed_y = self.embed(input_y)
# embed_y的形状为 [9, 1, 256]
# 应用 Dropout,随机置零部分词向量元素,防止过拟合,形状不变
embed_y = self.dropout(embed_y)
# 构建 Q(解码器当前时间步的词向量)和 K(解码器的隐藏状态 h_n),并沿最后一维拼接
# hidden[0] 是 LSTM 的隐藏状态
# 构建 Q(解码器当前时间步的词向量)和 K(解码器的隐藏状态 h_n),并沿最后一维拼接
# hidden[0] 是 LSTM 的隐藏状态,形状为 (num_layers, batch_size, hidden_size)
# 转置为 (batch_size, num_layers, hidden_size) 以便与 embed_y 拼接
QK = torch.cat([embed_y, hidden[0].transpose(0, 1)], dim=-1)
# 将拼接后的 QK 向量通过第一个全连接层和 softmax 计算注意力权重
# 权重 a 表示解码器对编码器各位置输出的关注程度
a = self.softmax(self.attn_1(QK))
# 权重a的型状:[batch_size, 1, max_length][9, 1, 12]
# 应用注意力权重,将权重 a 与编码器输出 V(encoder_output)相乘
# 得到上下文向量 C,它是编码器输出的加权和,反映了当前解码步骤应关注的重点
# encoder_output的型状:[batch_size, max_length, hidden_size][9, 12, 256]
C = torch.bmm(a, encoder_output)
# C的形状为 [batch_size, 1, hidden_size][9,1,256]
# 将上下文向量 C 与解码器当前词向量 Q 再次拼接,融合全局和局部信息
CQ = torch.cat([C, embed_y], dim=-1)
# CQ的形状为 [batch_size, 1, hidden_size * 2][9,1,512]
# 将拼接后的 CQ 向量通过第二个全连接层,压缩维度并引入非线性变换
attention_output = self.attn_2(CQ)
# 将注意力输出和上一时间步的隐藏状态送入 LSTM 层
# 得到当前时间步的 LSTM 输出和新的隐藏状态
lstm_output, hn = self.lstm(attention_output, hidden)
# 将 LSTM 输出通过全连接层映射到词表大小的维度
# 并应用 LogSoftmax 计算每个词的对数概率分布
output = self.out(lstm_output)
# output = F.relu(output) # 实验发现,本案例中不使用relu效果更佳
# output形状:[batch_size,1,french_vocab_size][9, 1, 4345]
output = self.softmax(output[:, 0, :])
# output形状:[batch_size,french_vocab_size][9, 4345]
# 返回预测输出、新的隐藏状态和注意力权重(用于可视化)
# hn形状:[num_layers, batch_size, hidden_size][1, 9, 256]
# 权重a的型状:[batch_size, 1, max_length][9, 1, 12]
return output, hn, a
# 测试注意力机制的脚本,验证解码器的注意力计算过程和输出维度
def attention_test():
# 创建数据加载器
my_dataloader = my_datasetloader(pairs)
# 实例化编码器和解码器,并移至指定设备
encoder = EncoderLSTM(english_word_n, hidden_size, batch_size, num_layers)
decoder = AttentionDecoderLSTM(french_word_n, hidden_size, batch_size, num_layers,MAX_LENGTH)
encoder = encoder.to(device)
decoder = decoder.to(device)
# 从数据加载器获取一批数据
for x, y in my_dataloader:
print(f'y的值为: {y}')
# 将源语言序列输入编码器,获取编码器输出和隐藏状态
encoder_output, encoder_hidden = encoder(x, encoder.init_hidden())
print(f'encoder_outputs的维度为: {encoder_output.shape}')
print(f'encoder_hidden的维度为: {encoder_hidden[0].shape}')
print(f'encoder_cell的维度为: {encoder_hidden[1].shape}')
# 准备编码器输出的副本,用于注意力计算
# 这里将编码器输出的第一个样本(batch=0)复制到一个新张量中
encoder_output_c = torch.zeros(MAX_LENGTH, encoder.hidden_size, device=device)
print(f'encoder_output_c的初始维度为: {encoder_output_c.shape}')
# 将编码器输出的第一个样本的每个时间步复制到 encoder_output_c
for i in range(encoder_output.shape[1]):
encoder_output_c[i] = encoder_output[0, i]
print(f'encoder_output_c处理后的形状为: {encoder_output_c.shape}')
# 开始解码过程,初始隐藏状态为编码器的最终隐藏状态
hidden = encoder_hidden
# 逐个解码目标序列的每个词
for j in range(y.shape[1]):
# 取当前时间步的目标词作为解码器输入
decoder_input = y[:, j].view(1, -1)
# 调用解码器,获取预测输出、新隐藏状态和注意力权重
decoder_output, hidden, attention_weight = decoder(
decoder_input, hidden, encoder_output_c)
# 打印各输出的维度,用于调试
print(f'decoder_output的维度为: {decoder_output.shape}')
print(f'hidden的维度为: {hidden[0].shape}')
print(f'cell的维度为: {hidden[1].shape}')
print(f'attention_weight的维度为: {attention_weight.shape}')
# 只测试一个样本,跳出循环
break
# 定义训练迭代函数,执行单批次的前向传播和反向传播
def train_iter(x, y, encoder, decoder, encoder_adam, decoder_adam, loss_def):
# 动态改变batch_size,注意,epochs大于1时,结束时要修改,开始时也要修改,故用!=
# if x.shape[0] != encoder.batch_size:
# encoder.batch_size = x.shape[0] # 如果源文本长度小于MAX_LENGTH,则将batch_size设置为源文本长度
# 1. 将源文本输入(英文)送入编码器,获取编码结果
encoder_output, encoder_hidden = encoder(x, encoder.init_hidden())
# 2. 准备解码器的输入参数
# 解码器的初始隐藏状态为编码器的最终隐藏状态
decoder_hidden = encoder_hidden
# 3. 初始化总损失
total_loss = 0.0
# 4. 决定是否使用 teacher forcing 策略(以一定概率使用真实标签作为下一时间步的输入)
use_teacher_forcing = True if random.random() < teacher_forcing_ratio else False
# 5. 根据 teacher forcing 策略执行解码和损失计算
if use_teacher_forcing:
# Teacher Forcing 模式:使用真实标签作为下一时间步的输入
for idx in range(MAX_LENGTH):
# 获取当前时间步的目标词作为输入
# y的形状为[batch_size, MAX_LENGTH][9,12]
input_y = y[:, idx].reshape(-1, 1)
# input_y的形状为[batch_size, 1[9,1]
# 调用解码器,获取预测输出、新隐藏状态和注意力权重
decoder_output, decoder_hidden, attention_weight = decoder(input_y, decoder_hidden, encoder_output)
# 获取下一时间步的真实目标词(用于计算损失)
if idx + 1 < MAX_LENGTH:
target_y = y[:, idx + 1]
# 计算当前时间步的损失并累加到总损失
# decoder_output的形状为[batch_size, french_vocab_size][9,4345]
# target_y的形状为[batch_size, 1[9,1]
total_loss += loss_def(decoder_output, target_y)
# 检查是否遇到结束符(EOS_TOKEN),如果是则提前结束解码
topv, topi = torch.topk(decoder_output, k=1)
y_len = idx + 1
if (topi == EOS_TOKEN).all():
break
else:
# 非 Teacher Forcing 模式:使用模型自身的预测作为下一时间步的输入
for idx in range(MAX_LENGTH):
# 第一个时间步使用真实标签,之后使用预测结果
if idx == 0:
# y的形状为[batch_size, MAX_LENGTH][9,12]
input_y = y[:, idx].reshape(-1, 1)
# input_y的形状为[batch_size, 1[9,1]
# 调用解码器,获取预测输出、新隐藏状态和注意力权重
decoder_output, decoder_hidden, attention_weight = decoder(input_y, decoder_hidden, encoder_output)
# 获取下一时间步的真实目标词(用于计算损失)
if idx + 1 < MAX_LENGTH:
target_y = y[:, idx + 1]
# 计算当前时间步的损失并累加到总损失
# decoder_output的形状为[batch_size, french_vocab_size][9,4345]
# target_y的形状为[batch_size, 1[9,1]
total_loss += loss_def(decoder_output, target_y)
# 获取预测概率最高的词的索引
topv, topi = torch.topk(decoder_output, k=1)
y_len = idx + 1
# 检查是否遇到结束符,如果是则提前结束解码
if (topi == EOS_TOKEN).all():
break
# 将当前预测作为下一时间步的输入
input_y = topi
# 6. 梯度清零,防止梯度累积
encoder_adam.zero_grad()
decoder_adam.zero_grad()
# 7. 反向传播,计算梯度
total_loss.backward()
# 8. 梯度更新(注意:这里重复调用了两次 step(),可能是笔误)
encoder_adam.step()
decoder_adam.step()
# 9. 返回平均损失(总损失除以序列长度)
return total_loss.item() / y_len
# 定义完整的训练函数,控制训练流程和参数更新
def train_model():
# 创建数据加载器
my_dataloader = my_datasetloader(pairs)
# 实例化编码器和解码器,并移至指定设备
encoder = EncoderLSTM(english_word_n, hidden_size, batch_size, num_layers)
decoder = AttentionDecoderLSTM(french_word_n, hidden_size, batch_size, num_layers,MAX_LENGTH)
encoder = encoder.to(device)
decoder = decoder.to(device)
# 实例化优化器,使用 Adam 优化算法
encoder_adam = torch.optim.Adam(encoder.parameters(), lr=my_lr)
decoder_adam = torch.optim.Adam(decoder.parameters(), lr=my_lr)
# 实例化损失函数,使用负对数似然损失(适合分类问题)
loss_def = nn.NLLLoss()
# 存储训练损失,用于后续绘图
plot_loss_list = []
# 开始训练主循环
for epoch_idx in range(1, 1 + epochs):
# 初始化损失累积变量
print_loss_total, plot_loss_total = 0.0, 0.0
# 记录开始时间
start = time.time()
# 使用 tqdm 显示进度条,遍历数据加载器中的每个批次
for item, (x, y) in enumerate(tqdm(my_dataloader), start=1):
# 调用训练迭代函数,执行前向传播和反向传播,获取当前批次的平均损失
total_loss = train_iter(x, y, encoder, decoder, encoder_adam, decoder_adam, loss_def)
# 累积损失,用于打印和绘图
print_loss_total += total_loss
plot_loss_total += total_loss
# 每 100 个批次打印一次平均损失和训练进度
if item % 100 == 0:
avg_loss = print_loss_total / 100
print_loss_total = 0.0
print(
f'当前训练的轮次为:{epoch_idx}, 平均损失为:{avg_loss:.4f}, 训练耗时:{time.time() - start:.2f}秒'
)
# 每 10 个批次记录一次损失,用于绘制损失曲线
if item % 10 == 0:
plot_avg_loss = plot_loss_total / 10
plot_loss_list.append(plot_avg_loss)
plot_loss_total = 0
# 每个 epoch 保存一次模型参数
torch.save(encoder.state_dict(), f'./save_model/encoder_{epoch_idx}.pth')
torch.save(decoder.state_dict(), f'./save_model/decoder_{epoch_idx}.pth')
# 训练结束后,绘制损失曲线并保存图像
plt.figure()
plt.plot(plot_loss_list)
plt.xlabel('Iterations (per 10 batches)')
plt.ylabel('Average Loss')
plt.title('Training Loss over Time')
plt.savefig('seq2seq_eng2french.png')
plt.show()
# 定义模型内部评估函数
def seq2seq_evalute(x, encoder, decoder):
# 设置动态batch_size
if x.shape[0] != encoder.batch_size:
encoder.batch_size = x.shape[0] # 如果源文本长度小于batch_size,则将batch_size设置为源文本长度
with torch.no_grad():
# 1.得到编码器的输出结果:encoder_output-_>[batch_size, seq_len, hidden_size]-->[1, 6, 256]
encoder_output, encoder_hidden = encoder(x, encoder.init_hidden())
# 2.准备解码器
decoder_hidden = encoder_hidden
# 3.准备变量
decoded_words = torch.ones(encoder.batch_size, MAX_LENGTH)
decoded_attention_weights = torch.zeros(MAX_LENGTH, encoder.batch_size, MAX_LENGTH)
# 4.开始解码
for idx in range(MAX_LENGTH):
# 第一个时间步使用真实标签,之后使用预测结果
if idx == 0:
# y的形状为[batch_size, MAX_LENGTH][9,12]
input_y = x[:, idx].reshape(-1, 1)
# input_y的形状为[batch_size, 1[9,1]
# 调用解码器,获取预测输出、新隐藏状态和注意力权重
decoder_output, decoder_hidden, attention_weight = decoder(input_y, decoder_hidden, encoder_output)
# 计算当前时间步的损失并累加到总损失
# decoder_output的形状为[batch_size, french_vocab_size][9,4345]
# target_y的形状为[batch_size, 1[9,1]
# 获取预测概率最高的词的索引
topv, topi = torch.topk(decoder_output, k=1)
# attention_weight形状[batch_size,1,MAX_LENGTH][3,1,12]
# decoded_attention_weights形状[12,3,12]
decoded_attention_weights[idx] = attention_weight[:, 0, :]
# 检查是否遇到结束符,如果是则提前结束解码
if (topi == EOS_TOKEN).all():
decoded_words[:, idx] = 1
break
else:
print(decoded_words[:, idx].shape)
# decoded_words形状[3,12]
# topi形状[batch_size,k][3,1]
decoded_words[:, idx] = topi[:, 0]
print(decoded_words)
input_y = topi
return decoded_words, decoded_attention_weights[:idx + 1]
def model_predict():
# 构建自定义测试数据集
my_samplepars = ['i m impressed with your french .\tje suis impressionne par votre francais .\n',
'i m more than a friend .\tje suis plus qu une amie .\n',
'she is beautiful like her mother .\telle est belle comme sa mere .\n']
# 设置batch_size
# 注意:由于测试集与训练集大小不一致,需要重新设置batch_size
# 2.实例化编码器和解码器模型对象
my_pairs = [[normal_to_string(s) for s in line.strip().split('\t')] for line in my_samplepars]
my_predataloader = my_datasetloader(my_pairs)
encoder = EncoderLSTM(english_word_n, hidden_size, batch_size, num_layers)
decoder = AttentionDecoderLSTM(french_word_n, hidden_size, batch_size, num_layers,MAX_LENGTH)
encoder.load_state_dict(torch.load(encoder_path))
decoder.load_state_dict(torch.load(decoder_path))
encoder = encoder.to(device)
decoder = decoder.to(device)
print(encoder)
print(decoder)
# 一个样本一个样本地去预测
for item, (x, y) in enumerate(my_predataloader):
decoder_words, atten_weights = seq2seq_evalute(x, encoder, decoder)
print(f'当前x的句子对是{x}')
print(f'当前y的句子对是{y}')
print(f'当前预测的句子对是{decoder_words}')
print('*' * 100)
for i in range(decoder_words.shape[0]):
# words = [] # 创建一个空列表,用于存储单词
#
# for xi in x[i]: # 遍历当前样本的每个词索引
# index = int(xi) # 将 tensor 转为整数索引
# word = english_index_to_word[index] # 查词表得到对应的英文单词
# words.append(word) # 把单词加入列表
#
# sentence = " ".join(words) # 用空格拼接成一句话
#
# print(f'当前x的句子对是:{sentence}')
print(f'当前x的句子对是:{" ".join([english_index_to_word[int(xi)] for xi in x[i]])}')
print(f'当前y的句子对是:{" ".join([french_index_to_word[int(yi)] for yi in y[i]])}')
print(
f'当前预测的句子对是:{"<SOS> " + " ".join([french_index_to_word[int(pi)] for pi in decoder_words[i]])}')
print('*' * 100)
# 可视化
plt.matshow(atten_weights[:, i, :].numpy())
plt.savefig(f'./seq2seq_attention_{i}.png')
plt.show()
################################################
## 标准解码器训练和测试代码 ##
################################################
'''
# 定义训练迭代函数,执行单批次的前向传播和反向传播
def train_iter(x, y, encoder, decoder, encoder_adam, decoder_adam, loss_def):
# 动态改变batch_size,注意,epochs大于1时,结束时要修改,开始时也要修改,故用!=
if x.shape[0] != encoder.batch_size:
encoder.batch_size = x.shape[0] # 如果源文本长度小于MAX_LENGTH,则将batch_size设置为源文本长度
# 1. 将源文本输入(英文)送入编码器,获取编码结果
encoder_output, encoder_hidden = encoder(x, encoder.init_hidden())
# 2. 准备解码器的初始隐藏状态为编码器的最终隐藏状态
decoder_hidden = encoder_hidden
# 3. 初始化总损失
total_loss = 0.0
# 4. 决定是否使用 teacher forcing 策略(以一定概率使用真实标签作为下一时间步的输入)
use_teacher_forcing = True if random.random() < teacher_forcing_ratio else False
# 5. 根据 teacher forcing 策略执行解码和损失计算
if use_teacher_forcing:
# Teacher Forcing 模式:使用真实标签作为下一时间步的输入
for idx in range(MAX_LENGTH):
# 获取当前时间步的目标词作为输入
input_y = y[:, idx].reshape(-1, 1) # [batch_size]
# 调用解码器,获取预测输出、新隐藏状态
decoder_output, decoder_hidden = decoder(input_y, decoder_hidden)
# 获取下一时间步的真实目标词(用于计算损失)
if idx + 1 < MAX_LENGTH:
target_y = y[:, idx + 1]
# 计算当前时间步的损失并累加到总损失
total_loss += loss_def(decoder_output, target_y)
# 检查是否遇到结束符(EOS_TOKEN),如果是则提前结束解码
topv, topi = torch.topk(decoder_output, k=1)
y_len = idx + 1
if (topi == EOS_TOKEN).all():
break
else:
# 非 Teacher Forcing 模式:使用模型自身的预测作为下一时间步的输入
for idx in range(MAX_LENGTH):
# 第一个时间步使用真实标签,之后使用预测结果
if idx == 0:
input_y = y[:, idx].reshape(-1, 1)
# 调用解码器,获取预测输出、新隐藏状态
decoder_output, decoder_hidden = decoder(input_y, decoder_hidden)
# 获取下一时间步的真实目标词(用于计算损失)
if idx + 1 < MAX_LENGTH:
target_y = y[:, idx + 1]
# 计算当前时间步的损失并累加到总损失
total_loss += loss_def(decoder_output, target_y)
# 获取预测概率最高的词的索引
topv, topi = torch.topk(decoder_output, k=1)
y_len = idx + 1
# 检查是否遇到结束符,如果是则提前结束解码
if (topi == EOS_TOKEN).all():
break
# 将当前预测作为下一时间步的输入
input_y = topi
# 6. 梯度清零,防止梯度累积
encoder_adam.zero_grad()
decoder_adam.zero_grad()
# 7. 反向传播,计算梯度
total_loss.backward()
# 8. 梯度更新
torch.nn.utils.clip_grad_norm_(encoder.parameters(), 1.0)
torch.nn.utils.clip_grad_norm_(decoder.parameters(), 1.0)
encoder_adam.step()
decoder_adam.step()
# 9. 返回平均损失(总损失除以序列长度)
return total_loss.item() / y_len
# 定义完整的训练函数,控制训练流程和参数更新
def train_model():
# 创建数据加载器
my_dataloader = my_datasetloader(pairs)
# 实例化编码器和解码器,并移至指定设备
encoder = EncoderLSTM(english_word_n, hidden_size, batch_size, num_layers)
decoder = DecoderLSTM(french_word_n, hidden_size, batch_size, num_layers)
encoder = encoder.to(device)
decoder = decoder.to(device)
# 实例化优化器,使用 Adam 优化算法
encoder_adam = torch.optim.Adam(encoder.parameters(), lr=my_lr)
decoder_adam = torch.optim.Adam(decoder.parameters(), lr=my_lr)
# 实例化损失函数,使用负对数似然损失(适合分类问题)
loss_def = nn.NLLLoss()
# 存储训练损失,用于后续绘图
plot_loss_list = []
# 开始训练主循环
for epoch_idx in range(1, 1 + epochs):
# 初始化损失累积变量
print_loss_total, plot_loss_total = 0.0, 0.0
# 记录开始时间
start = time.time()
# 使用 tqdm 显示进度条,遍历数据加载器中的每个批次
for item, (x, y) in enumerate(tqdm(my_dataloader), start=1):
# 调用训练迭代函数,执行前向传播和反向传播,获取当前批次的平均损失
total_loss = train_iter(x, y, encoder, decoder, encoder_adam, decoder_adam, loss_def)
# 累积损失,用于打印和绘图
print_loss_total += total_loss
plot_loss_total += total_loss
# 每 100 个批次打印一次平均损失和训练进度
if item % 100 == 0:
avg_loss = print_loss_total / 100
print_loss_total = 0.0
print(
f'当前训练的轮次为:{epoch_idx}, 平均损失为:{avg_loss:.4f}, 训练耗时:{time.time() - start:.2f}秒'
)
# 每 10 个批次记录一次损失,用于绘制损失曲线
if item % 10 == 0:
plot_avg_loss = plot_loss_total / 10
plot_loss_list.append(plot_avg_loss)
plot_loss_total = 0
# 每个 epoch 保存一次模型参数
torch.save(encoder.state_dict(), f'./save_model/encoder_{epoch_idx}.pth')
torch.save(decoder.state_dict(), f'./save_model/decoder_{epoch_idx}.pth')
# 训练结束后,绘制损失曲线并保存图像
plt.figure()
plt.plot(plot_loss_list)
plt.xlabel('Iterations (per 10 batches)')
plt.ylabel('Average Loss')
plt.title('Training Loss over Time')
plt.savefig('seq2seq_eng2french.png')
plt.show()
# 定义模型内部评估函数
def seq2seq_evalute(x, encoder, decoder):
# 设置动态batch_size
if x.shape[0] != encoder.batch_size:
encoder.batch_size = x.shape[0] # 如果源文本长度小于MAX_LENGTH,则将batch_size设置为源文本长度
with torch.no_grad():
# 1.得到编码器的输出结果
encoder_output, encoder_hidden = encoder(x, encoder.init_hidden())
# 2.准备解码器
decoder_hidden = encoder_hidden
# 3.准备变量
decoded_words = torch.ones(encoder.batch_size, MAX_LENGTH)
# 4.开始解码
for idx in range(MAX_LENGTH):
# 第一个时间步使用真实标签,之后使用预测结果
if idx == 0:
input_y = x[:, idx].reshape(-1, 1)
# 调用解码器,获取预测输出、新隐藏状态
decoder_output, decoder_hidden = decoder(input_y, decoder_hidden)
# 获取预测概率最高的词的索引
topv, topi = torch.topk(decoder_output, k=1)
# 检查是否遇到结束符,如果是则提前结束解码
if (topi == EOS_TOKEN).all():
decoded_words[:, idx] = EOS_TOKEN
break
else:
decoded_words[:, idx] = topi[:, 0]
input_y = topi
return decoded_words
def model_predict():
# 构建自定义测试数据集
my_samplepars = ['i m impressed with your french .\tje suis impressionne par votre francais .\n',
'i m more than a friend .\tje suis plus qu une amie .\n',
'she is beautiful like her mother .\telle est belle comme sa mere .\n']
# 设置batch_size
# 注意:由于测试集与训练集大小不一致,需要重新设置batch_size
# 2.实例化编码器和解码器模型对象
my_pairs = [[normal_to_string(s) for s in line.strip().split('\t')] for line in my_samplepars]
my_predataloader = my_datasetloader(my_pairs)
encoder = EncoderLSTM(english_word_n, hidden_size, batch_size, num_layers)
decoder = DecoderLSTM(french_word_n, hidden_size, batch_size, num_layers)
encoder.load_state_dict(torch.load(encoder_path))
decoder.load_state_dict(torch.load(decoder_path))
encoder = encoder.to(device)
decoder = decoder.to(device)
print(encoder)
print(decoder)
# 一个样本一个样本地去预测
for item, (x, y) in enumerate(my_predataloader):
decoder_words = seq2seq_evalute(x, encoder, decoder)
print(f'当前x的句子对是{x}')
print(f'当前y的句子对是{y}')
print(f'当前预测的句子对是{decoder_words}')
print('*' * 100)
for i in range(decoder_words.shape[0]):
print(f'当前x的句子对是:{" ".join([english_index_to_word[int(xi)] for xi in x[i]])}')
print(f'当前y的句子对是:{" ".join([french_index_to_word[int(yi)] for yi in y[i]])}')
print(f'当前预测的句子对是:{"<SOS> " + " ".join([french_index_to_word[int(pi)] for pi in decoder_words[i]])}')
print('*' * 100)
'''
if __name__ == '__main__':
########################################
## 设定参数 ##
########################################
# 选择设备(GPU 或 CPU)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f'当前设备运行在:{device}')
# 定义特殊标记:开始符号和结束符号的索引
SOS_TOKEN = 0
EOS_TOKEN = 1
# 定义最大句子长度,超过此长度的句子将被截断,不足的将被填充
MAX_LENGTH = 12
data_path = 'data/eng-fra-v2.txt'
# 设定超参数
# 加载数据并构建词表
(english_word_to_index, english_index_to_word, english_word_n,
french_word_to_index, french_index_to_word, french_word_n, pairs) = get_data(data_path)
# 源语言(英文)和目标语言(法文)的词表大小
# LSTM 隐藏层维度
hidden_size = 256
# 最后一个批次可能会因为批次大小不等于8而导致hidden_size的维度不一致,可以动态改变hidden_size,
# 或在DataLoader开启drop_last=True最后一个批次大小不等于batch_size的批次直接丢弃
# 或在令batch_size 能够整除数据集
batch_size = 9 # 设置能整除的batch_size
# LSTM 层数
num_layers = 1
# 学习率
my_lr = 1e-4
# 训练轮数
epochs = 4
# Teacher Forcing 比例:控制在训练时使用真实标签作为输入的概率
teacher_forcing_ratio = 0.5
# 定义原始文本数据路径
encoder_path = f'./save_model/encoder_{epochs}.pth'
decoder_path = f'./save_model/decoder_{epochs}.pth'
# 测试文本规范化函数
print(normal_to_string('Hello你好 World世界!'))
# 执行模型训练
train_model()
model_predict()