总体架构
输入部分
代码实现:
导包
# -*-coding:utf-8-*-
import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn
# -*-coding:utf-8-*-
import copy
import torch.nn.functional as F
import math
位置编码器部分
词嵌入WordEmbedding
# todo 作用:输入数据进行词嵌入升维处理
class Embeddings(nn.Module):
def __init__(self, vocab_size, embed_dim):
super().__init__()
# vocab_size:代表单词的总个数
self.vocab_size = vocab_size
# embed_dim:代表词嵌入维度
self.embed_dim = embed_dim
# 定义Embedding层
self.embed = nn.Embedding(vocab_size, embed_dim)
def forward(self, x):
# x--》[batch_size, seq_len]
return self.embed(x) * math.sqrt(self.embed_dim)
位置编码模型PositionEncoding
# todo 作用:生成位置编码矩阵,与输入数据x进行融合,并输出-->加入了位置编码信息的词嵌入张量
class PositionEncoding(nn.Module):
def __init__(self, d_model, dropout_p, max_len=60):
super().__init__()
# d_model:代表词嵌入维度
self.d_model = d_model
# dropout_p:代表随机失活的系数
self.dropout_p = dropout_p
# max_len:代表最大句子长度
self.max_len = max_len
# 定义dropout层
self.dropout = nn.Dropout(p=dropout_p)
# 根据三角函数的公式实现位置的编码
# 定义位置编码矩阵[max_len, d_model]-->[60, 512]
pe = torch.zeros(max_len, d_model)
# 定义位置列矩阵--》[max_len, 1]-->[60, 1]
position = torch.arange(0, max_len).unsqueeze(dim=1)
# 定义转换矩阵:根据三角函数的计算公式,是其中的除了pos之外的系数(频率)
# temp_vec-->[256]
temp_vec = torch.exp(torch.arange(0, d_model, 2) * -(math.log(10000)/d_model))
# 根据三角函数的计算公式,计算角度:pos_vec-->[60, 256]
pos_vec = position * temp_vec
# 将奇数位用sin处理,偶数位用cos处理
pe[:, 0::2] = torch.sin(pos_vec)
pe[:, 1::2] = torch.cos(pos_vec)
# 需要对上述的位置编码结果升维:pe-->[1, max_len, d_model]-->[1, 60, 512]
#todo pe就是位置编码矩阵 似乎每次结果一样
pe = pe.unsqueeze(dim=0)
# pe位置编码结果不随着模型的训练而更新,因此需要进行注册到缓存区
self.register_buffer('pe', pe)
def forward(self, x):
# x--》来自于embedding之后的结果--》[batch_size, seq_len, embed_dim]-->[2, 4, 512]
# 将x和位置编码的信息进行融合
# todo x.size()[1]是指句子有多长pe就取出多长与x相加,pe的形状为[1, max_len, d_model]-->[1, 4, 512]
x = x + self.pe[:, :x.size()[1]]
return self.dropout(x)
编码器部分
掩码矩阵部分:生成掩码矩阵
# 生成一个下三角矩阵(sentence_mask)
def generate_triu(size):
# a = np.triu(m=np.ones((1, size, size)), k=1).astype(int)
# return torch.from_numpy(1-a)
return 1-torch.triu(torch.ones(1, size, size, dtype=torch.int), 1)
# 生成掩码矩阵(padding_mask)
def generate_padding_mask(tensor_x):
# tensor_x-->注意力权重分数--》张量
tensor_x[tensor_x == 0] = 0
tensor_x[tensor_x != 0] = 1
return tensor_x.to(dtype=torch.int)
# 绘图:生成下三角矩阵
def show__triu():
plt.figure(figsize=(5, 5))
plt.imshow(generate_triu(20)[0])
plt.show()
attention:基础注意力计算方式,muti_head_atten将调用次模组
def attention(query, key, value, mask=None, dropout=None):
# query/key/value-->[batch_size, seq_len, embed_dim]
# mask-->shape-->[batch_size, seq_len, seq_len]
# dropout--》实例化的对象
# 第一步:获得词嵌入表达的维度
d_k = query.size(-1)
# 第二步:计算query和key之间的相似性分数(注意力权重分数(未经过softmax归一化的结果))
# query-->[2, 4, 512];key-->[2, 4, 512]-->转置--》[2, 512,4]. 相乘后--》scores-->[2, 4, 4]
scores = torch.matmul(query, torch.transpose(key, -1, -2)) / math.sqrt(d_k)
# 第三步:判断是否需要mask
if mask is not None:
scores = scores.masked_fill(mask==0, -1e9)
# print(f'未归一化的scores--》{scores}')
# 第四步:进行softmax归一化
atten_weights = F.softmax(scores, dim=-1)
# print(f'atten_weights--》{atten_weights}')
# 第五步:如果有dropout 就进行随机失活防止过拟合
if dropout is not None:
atten_weights = dropout(atten_weights)
return torch.matmul(atten_weights, value), atten_weights # todo 返回注意力输出,以及注意力权重
多头注意力类与clones
多头注意力机制原理(核心)
1. 输入与线性变换
输入序列(如词向量)通过三个独立的线性变换层生成查询(Query, Q)、键(Key, K)和值(Value, V)矩阵:
- 自注意力机制:输入为同一矩阵 X,通过不同权重矩阵 WQh,WKh,WVh 生成Q、K、V。
- 交叉注意力机制:输入为两个不同矩阵(如 Xq 和 Xkv),分别生成Q和K、V
2. 分头处理与并行计算
- 分头:将Q、K、V按头的数量 h 拆分为多个子矩阵,每个子矩阵对应一个注意力头。例如,将 Q 拆分为 [Q1,Q2,...,Qh],每个 Qi 的维度为 dk 。
- 并行计算:每个头独立计算缩放点积注意力(Scaled Dot-Product Attention):
3. 多头输出的拼接与融合
- 拼接:将所有头的输出 head1,head2,...,headh 沿特征维度拼接,形成组合输出。
- 线性变换:通过权重矩阵 WO 将拼接后的结果映射回原始维度:
原论文中是先把qkv从[2,6,512]变成[2,6,8,64]后各经过8个权重矩阵总共24个权重矩阵得到变换后的qkv再进行注意力计算再concat拼接起来
这里的代码实现是qkv[[2,6,512]]各经过1个矩阵总共3个矩阵得到[2,6,512]再变成[2,6,8,64]进行注意力计算再concat拼接起来
编码器实例化一个多头注意力类且无masked
# clones 的作用:将一个模块复制N次,并返回一个ModuleList,ModuleList是一个Module的子类,可以迭代,并且可以保存多个Module
def clones(module, N):
return nn.ModuleList([copy.deepcopy(module) for _ in range(N)])
# todo:2. 定义多头注意力类,注意 编码器层的q=k=v=原句子 解码器层的mask_muti_head输入的q=k=v=预测的句子 解码层的第二个muti_head的q=k=编码器输出,v=mask_muti_head输出
class MutiHeadAttention(nn.Module):
def __init__(self, head, embed_dim, dropout_p=0.1):
super().__init__()
# 第一步:确定embed_dim是否能被head整除
assert embed_dim % head == 0
# 第二步:确定每个head应该处理多少维度特征
self.d_k = embed_dim // head
# 第三步:定义head的属性
self.head = head
# 第四步:定义4个全连接层
self.linears = clones(nn.Linear(embed_dim, embed_dim), 4)
# 第五步:定义atten权重属性
self.atten = None
# 第六步:实例化dropout对象
self.dropout = nn.Dropout(p=dropout_p)
def forward(self, query, key, value, mask=None):
# 需要对mask的形状进行升维度
# mask-->输入的形状--》[head, seq_len, seq_len]-->[8, 4, 4],升维之后--》[1, 8, 4, 4]
if mask is not None:
mask = mask.unsqueeze(dim=0)
# 获取当前输入的batch_size
batch_size = query.size(0)
# 开始处理query,key,value,都要经过线性变化并且切分为8个头
# model(x)-->就是将数据经过linear层处理x-->[2, 4, 512]-->经过Linear-->[2, 4, 512]-->分割--》[2, 4, 8, 64]-->transpose-->[2, 8, 4, 64]
# query,key,value--》shape-->[2, 8, 4, 64]
query, key, value = [model(x).view(batch_size, -1, self.head, self.d_k).transpose(1, 2)
for model, x in zip(self.linears, (query, key, value))]
# 接下来将上述处理后的query,key,value--》shape-->[2, 8, 4, 64]送入attention方法进行注意力的计算:
# query--》[2, 8, 4, 64]和key--》[2, 8, 4, 64]转置结果[2, 8, 64, 4]进行相乘--》shape--》[2,8, 4, 4](所以传的mask矩阵是4维的)
# [2, 8, 4, 4]要和value-->[2, 8, 4, 64]-->相乘--》shape--》x-->[2, 8, 4, 64]
x, self.atten = attention(query, key, value, mask=mask, dropout=self.dropout)
# 需要将多头注意力的结果进行合并
# x.transpose(1, 2)-->【2,4, 8, 64】
# y 合并后的结果-->[2, 4, 512]
y = x.transpose(1, 2).contiguous().view(batch_size, -1, self.head*self.d_k)
# 经过线性变化得到指定输出维度的结果
return self.linears[-1](y)
前馈全连接层
两层线性层 作用:进行特征提取,进行非线性映射,简单来说就是经过两个线性层一个relu激活函数加入非线性,再dropout随机失活防止过拟合
class FeedForward(nn.Module):
def __init__(self, d_model, d_ff, dropout_p=0.1):
super().__init__()
# d_model:第一个全连接层输入的特征维度;第二个全连接层输出的特征维度
self.d_model = d_model
# d_ff: 第一个全连接层输出的特征维度;第二个全连接层输入的特征维度
self.d_ff = d_ff
# 定义第一个全连接层
self.linear1 = nn.Linear(d_model, d_ff)
# 定义第二个全连接层
self.linear2 = nn.Linear(d_ff, d_model)
# 定义dropout层
self.dropout = nn.Dropout(p=dropout_p)
def forward(self, x):
return self.linear2(self.dropout(F.relu(self.linear1(x))))
规范化层
让数据符合标准正态分布 作用机制:self.a * (x - x_mean) / (x_std + self.eps) + self.b, eps:防止分母为0
Add中是把原始经过embedding+position后得到的x与经过(多头注意力层或者前馈全连接层)再进行规范化之后的结果进行相加得到残差链接
残差链接的作用:通过跨层连接(如恒等映射),梯度可直接通过“捷径”回传,避免因多层非线性变换导致的信号衰减或放大
class LayerNorm(nn.Module):
def __init__(self, features, eps=1e-6):
super().__init__()
# 定义属性
self.features = features # 代表词嵌入维度
# eps
self.eps = eps
# 定义一个模型的参数(系数)
self.a = nn.Parameter(torch.ones(features))
self.b = nn.Parameter(torch.zeros(features))
def forward(self, x):
# x--->[2, 4, 512]
# 1.求出均值:x_mean-->[2, 4, 1]
x_mean = torch.mean(x, dim=-1, keepdim=True)
# 2.求出标准差
x_std = torch.std(x, dim=-1, keepdim=True)
return self.a * (x - x_mean) / (x_std + self.eps) + self.b
子层链接结构
定义子层连接结构 把norm&add这一层与feedforward层或者muti_head_atten层进行连接,取决与输入的sublayer是什么层
class SublayerConnection(nn.Module):
def __init__(self, size, dropout_p=0.1):
super().__init__()
# 定义size属性:词嵌入的维度大小
self.size = size
# 实例化规范化层
self.layer_norm = LayerNorm(features=size)
# 实例化dropout层
self.dropout = nn.Dropout(p=dropout_p)
def forward(self, x, sublayer):
# x--》来自于输入部分:positionEncoding+WordEmbedding;[batch_size, seq_len, embed_dim]-->[2, 4, 512]
# sublayer-->代表函数的对象:可以是处理多头自注意力机制函数的对象,也可以是前馈全连接层对象
# post_norm
x1 = x + self.dropout(self.layer_norm(sublayer(x)))
# pre_norm
# x1 = x + self.dropout(sublayer(self.layer_norm(x)))
return x1
编码器层
定义编码器层 #超级拼装:先试用子层链接拼成(norm&add+feedforward)层与(muti_head_atten+norm&add)层 把这两个子层拼起来就是编码器结构
输入的是输入部分:positionEncoding+WordEmbedding;[batch_size, seq_len, embed_dim]-->[2, 4, 512],输出的是编码器的结果-->送给解码器当k和v使用
class EncoderLayer(nn.Module):
def __init__(self, size, self_atten, feed_forward, dropout_p):
super().__init__()
# size:代表词嵌入的维度
self.size = size
# self_atten:代表多头自注意力机制的对象
self.self_atten = self_atten
# feed_forward:代表前馈全连接层的对象
self.feed_forward = feed_forward
# 定义两层子层连接结构
self.sub_layers = clones(SublayerConnection(size, dropout_p), 2)
def forward(self, x, mask):
# x-->来自输入部分--》[batch_size, seq_len, embed_dim]:[2, 4, 512]
# mask-->[head, seq_len, seq_len]-=-->[8, 4, 4]
# 经过第一个子层连接结构:先经过多头自注意力层--》然后经过norm-->最后残差连接
x1 = self.sub_layers[0](x, lambda x: self.self_atten(x, x, x, mask))
# 经过第二个子层连接结构:先经过前馈全连接层--》然后经过norm-->最后残差连接
x2 = self.sub_layers[1](x1, self.feed_forward)
return x2
编码器
定义编码器 超级拼装2.0 n个编码器层构成一个编码器,按照这里的代码,多个编码器层是串联执行,上一个编码器的输出作为下一个编码器的输入,最终输出编码器的结果给解码器当v使用
class Encoder(nn.Module):
def __init__(self, layer, N):
super().__init__()
# layer:代表编码器层
self.layer = layer
# N:代表有几个编码器层
# 定义N个编码器层
self.layers = clones(layer, N)
# 实例化规范化层
self.norm = LayerNorm(features=layer.size)
def forward(self, x, mask):
# x-->来自输入部分--》[batch_size, seq_len, embed_dim]:[2, 4, 512]
# mask-->[head, seq_len, seq_len]-=-->[8, 4, 4]
# for循环迭代N个编码器层得到最终的结果
for layer in self.layers:
x = layer(x, mask)
return self.norm(x)
解码器部分
解码器层
依旧超级拼装:先试用子层链接拼成(norm&add+feedforward)层,(muti_head_atten+norm&add)层,(mask_muti_head_atten+norm&add)层 ,然后把这三个子层拼起来就是解码器结构
class DecoderLayer(nn.Module):
def __init__(self, size, self_atten, src_atten, feed_forward, dropout_p):
super().__init__()
# size:代表词嵌入维度的大小
self.size = size
# self_atten:自注意力机制的对象:Q=K=V
self.self_atten = self_atten
# src_atten:一般注意力机制的对象:Q!=K=V
self.src_atten = src_atten
# feed_forward:前馈全连接层对象
self.feed_forward = feed_forward
# 定义三个子层连接结构
self.sub_layers = clones(SublayerConnection(size, dropout_p), 3)
def forward(self, y, encoder_output, source_mask, target_mask):
# y:代表解码器的输入--》[batch_size, seq_len, embed_dim]
# encoder_output:代表编码器的输出结果--》[batch_size, seq_len, emebed_dim]
# target_mask防止未来信息被提前看到/target_mask-->[head, y_seq_len, y_seq_len]
# source_mask消除padding的影响# source_mask--shape-->[head, y_seq_len, x_seq_len]
# 经过第一个子层连接结构 todo 看图写作:第一个子层连接结构是带mask掩码滴,输入是q=k=v==预测值y的positionEncoding+WordEmbedding输出,
y1 = self.sub_layers[0](y, lambda x: self.self_atten(x, x, x, target_mask))
# 经过第二个子层连接结构 todo 第二个子层链接是不带mask掩码,输入的k=v==(源文本嵌入+位置编码)再经过编码器的输出,v是第一个子层结构的输出
# query--》[2,6,512]-->[2, 8, 6, 64],key/value-->[2, 4, 512]-->[2, 8, 4, 64]
# [2, 8, 6, 64]--和[2, 8, 4, 64]转置[2,8, 64, 4]-->[2, 8, 6, 4]
y2 = self.sub_layers[1](y1, lambda x: self.src_atten(x, encoder_output, encoder_output, source_mask))
# 经过第三个子层连接结构 todo 这一层就是feed+norm&add 输入什么维度输出就是什么维度
y3 = self.sub_layers[2](y2, self.feed_forward)
return y3
解码器
拼拼拼:n个解码器层构成一个解码器,这里的n==6 按照这里的代码,多个解码器层是串联执行,上一个解码器的输出作为下一个解码器的输入,最终输出解码器的结果给输出层
class Decoder(nn.Module):
def __init__(self, layer, N):
super().__init__()
# layer:代表解码器层
self.layer = layer
# N:代表有几个解码器层
# 定义N个解码层
self.layers = clones(layer, N)
# 实例化规范化层
self.norm = LayerNorm(features=layer.size)
def forward(self, y, encoder_output, source_mask, target_mask):
# y:代表解码器的输入--》[batch_size, seq_len, embed_dim]
# encoder_output:代表编码器的输出结果--》[batch_size, seq_len, emebed_dim]
# target_mask防止未来信息被提前看到/target_mask-->[head, y_seq_len, y_seq_len]
# source_mask消除padding的影响# source_mask--shape-->[head, y_seq_len, x_seq_len]
# for循环迭代N个编码器层得到最终的结果
for layer in self.layers:
y = layer(y, encoder_output, source_mask, target_mask)
return self.norm(y)
输出部分
生成器generator
输出部分:将解码器输出经过一个线性层,再经过softmax,得到当前预测的结果
输出[batch_size,seq_len,vocab_size] vocab_size是词表词个数,概率最大的为预测结果
class Generator(nn.Module):
def __init__(self, d_model, vocab_size):
# 参数d_model 线性层输入特征尺寸大小
# 参数vocab_size 线层输出尺寸大小
super(Generator, self).__init__()
# 定义线性层
self.project = nn.Linear(d_model, vocab_size)
def forward(self, x):
# 数据经过线性层 最后一个维度归一化 log方式
x = F.log_softmax(self.project(x), dim=-1)
return x
使用部分
模拟使用,仅预测一个单词
源文本输入:也就是要翻译的文本,两个句子4个单词
x = torch.tensor([[1, 40, 28, 100], [45, 89, 39, 10]])
上一步预测值输入:如果为头单词,则为sos_token的词向量表达
y0 = torch.tensor([[2, 4, 10, 29, 67, 89],[34, 56, 78, 20, 19, 6]])
词表大小:要翻译的语言总共有多少个单词
vocab_size = 1000
词向量维度
embed_dim = 512
总体流程概述:
编码器部分:源文本输入x经过embedding后与positionnal_encoding相加结果输入encoder()
在每一个编码器层经过两个子层:
1.多头自注意力子层+(残差链接+规范化)
在编码器的多头注意力层中q=k=v
- 前馈全连接层+(残差链接+规范化)
前馈全连接层的作用: 通过增加两层网络来增强模型的能力.
输出x1
经过n个编码器层后输出xn给解码器
解码器部分:前一步的实际值或预测值 y0 = torch.tensor([[2, 4, 10, 29, 67, 89],[34, 56, 78, 20, 19, 6]])经过embedding后与positionnal_encoding相加输入结果decoder()
在每一个解码器层经过三个子层:
1.带掩码的多头自注意力子层+(残差链接+规范化)
在此层q=k=v mask的作用是防止未来信息被提前看见
2.多头注意力子层+(残差链接+规范化)
在此层k=v = 编码器的输出xn , q=上一个子层的输入
- 前馈全连接子层+(残差链接+规范化)
前馈全连接层的作用: 通过增加两层网络来增强模型的能力.
经过n个解码器层后输出yn给输出部分
输出部分:经过一个线性层和一个softmax层
线性层:通过对上一步的线性变化得到指定维度的输出, 也就是转换维度的作用.
softmax层:使最后一维的向量中的数字缩放到0-1的概率值域内, 并满足他们的和为1.
输出预测值result[batch_size,seq_len,vocab_size],batch_size句seq_len个单词,其中概率最大的值为预测结果
def usb_position(): #todo 生成位置编码
vocab_size = 1000 # 定义词汇大小
embed_dim = 512 # 词嵌入维度
my_embed = Embeddings(vocab_size, embed_dim)
x = torch.tensor([[1, 40, 28, 100], [45, 89, 39, 10]])
embed_result = my_embed(x)
my_position = PositionEncoding(d_model=512, dropout_p=0.1)
position_result = my_position(embed_result)
print(f'position_result-->{position_result.shape}')
# print(f'position_result-->{position_result}')
return position_result
def use_encoder(): # todo 输入编码器输入(待翻译的句子)以及位置编码,得到编码器输出
# 获取编码器输入部分:[2, 4, 512]
position_result = usb_position()
# 实例化多头注意力机制对象
mutiHead_atten = MutiHeadAttention(head=8, embed_dim=512)
# 实例化前馈全连接层对象
ff = FeedForward(d_model=512, d_ff=1024)
mask = torch.zeros(8, 4, 4)
# 实例化编码器层对象
encoder_layer = EncoderLayer(size=512, self_atten=mutiHead_atten, feed_forward=ff, dropout_p=0.1)
# 实例化编码器对象
encoder = Encoder(layer=encoder_layer, N=6)
# 将数据送入编码器 todo position_result先
encoder_output = encoder(position_result, mask)
print(f'encoder_output编码器得到的结果--》{encoder_output}')
print(f'encoder_output编码器得到的结果--》{encoder_output.shape}')
# todo 编码器输出
return encoder_output
def use_decoder(): #todo 输入编码器输出,得到解码器输出
# 定义解码器端的输入 todo 编码器的输入是对应原始输入序列(如待翻译的源语言句子),解码器输入的是解码器的输入是右移(shifted right)的目标序列(如已生成的部分目标语言句子)。
# todo 解码器的输出是解码器逐步生成的目标序列(如翻译结果),每次预测一个词。
y0 = torch.tensor([[2, 4, 10, 29, 67, 89],
[34, 56, 78, 20, 19, 6]])
vocab_size = 1000
embed_dim = 512
# 实例化Embedding层
embed = Embeddings(vocab_size, embed_dim)
# embed_y-->[2, 6, 512]
embed_y = embed(y0)
# # 实例化PositionEncoding层
position_encode = PositionEncoding(d_model=512, dropout_p=0.1)
# todo position_y 是预测的目标序列,位置编码对预测结果进行编码,从而提高预测效果。若为首字母则对应sos_token
position_y = position_encode(embed_y)
# 实例化多头注意力机制的对象
muti_head_atten = MutiHeadAttention(head=8, embed_dim=512)
self_atten = copy.deepcopy(muti_head_atten)
src_atten = copy.deepcopy(muti_head_atten)
# 实例化前馈全连接的对象
ff = FeedForward(d_model=512, d_ff=1024)
# 实例化解码器层的对象
decoder_layer = DecoderLayer(size=512, self_atten=self_atten, src_atten=src_atten, feed_forward=ff, dropout_p=0.1)
# 准备数据
# todo encoder_output是编码器输出结果,对应源语言句子
encoder_output = use_encoder()
source_mask = torch.zeros(8, 6, 4)
target_mask = torch.zeros(8, 6, 6)
# 实例化解码器的对象
decoder = Decoder(layer=decoder_layer, N=6)
result = decoder(position_y, encoder_output, source_mask, target_mask)
# print(f'解码器得到的结果--》{result}')
# print(f'解码器得到的结果--》{result.shape}')
return result
def use_generator():
x=use_decoder()
my_generator = Generator
result = my_generator(512, 1000)(x)
print(f'生成器得到结果--》{result}')
print(f'生成器得到结果--》{result.shape}')
return result
if __name__ == '__main__':
# use_decoder()
use_generator()