LLM手撕

发布于:2024-09-05 ⋅ 阅读:(52) ⋅ 点赞:(0)

LayerNorm

import torch
from torch import nn

class LayerNorm(nn.Module):
    def __init__(self, hidden_size, eps=1e-6):
        super().__init__()
        self.hidden_size = hidden_size  # 隐藏状态的大小
        self.eps = eps  # 用于数值稳定性的一个小值
        
        # 初始化可学习的缩放和平移参数
        self.gamma = nn.Parameter(torch.ones(hidden_size))  # 缩放参数,初始值为全1
        self.beta = nn.Parameter(torch.zeros(hidden_size))  # 平移参数,初始值为全0
        
    def forward(self, x):
        # x 形状: (batch_size, seq_len, hidden_size)
        
        # 计算每个样本的均值和方差
        mean = x.mean(dim=-1, keepdim=True)  # 计算最后一个维度的均值,形状: (batch_size, seq_len, 1)
        variance = x.var(dim=-1, keepdim=True, unbiased=False)  # 计算最后一个维度的方差,形状: (batch_size, seq_len, 1)
        
        # 进行归一化
        x_normalized = (x - mean) / torch.sqrt(variance + self.eps)  # 归一化,形状: (batch_size, seq_len, hidden_size)
        
        # 应用缩放和平移参数
        output = self.gamma * x_normalized + self.beta  # 形状: (batch_size, seq_len, hidden_size)
        
        return output

def test_layer_norm():
    batch_size = 2
    seq_len = 4
    hidden_size = 8
    
    # 随机生成输入数据
    x = torch.randn(batch_size, seq_len, hidden_size)  # (batch_size, seq_len, hidden_size)
    
    # 创建 LayerNorm 模块
    layer_norm = LayerNorm(hidden_size)
    
    # 计算 LayerNorm 输出
    output = layer_norm(x)
    
    print("Input shape:", x.shape)
    print("Output shape:", output.shape)
    
if __name__ == "__main__":
	test_layer_norm()

BatchNorm

import torch
from torch import nn

class BatchNorm(nn.Module):
    def __init__(self, hidden_size, eps=1e-5, momentum=0.1):
        super().__init__()
        self.hidden_size = hidden_size  # 隐藏状态的大小
        self.eps = eps  # 用于数值稳定性的一个小值
        self.momentum = momentum  # 用于计算运行时均值和方差的动量
        
        # 初始化可学习的缩放和平移参数
        self.gamma = nn.Parameter(torch.ones(hidden_size))  # 缩放参数,初始值为全1
        self.beta = nn.Parameter(torch.zeros(hidden_size))  # 平移参数,初始值为全0
        
        # 初始化运行时均值和方差
        self.running_mean = torch.zeros(hidden_size)  # 运行时均值,初始值为全0
        self.running_var = torch.ones(hidden_size)  # 运行时方差,初始值为全1

    def forward(self, x):
        # x 形状: (batch_size, seq_len, hidden_size)
        if self.training:
            # 计算当前批次的均值和方差
            batch_mean = x.mean(dim=(0, 1), keepdim=False)  # 计算前两个维度的均值,形状: (hidden_size)
            batch_var = x.var(dim=(0, 1), keepdim=False, unbiased=False)  # 计算前两个维度的方差,形状: (hidden_size)

            # 更新运行时均值和方差
            self.running_mean = (1 - self.momentum) * self.running_mean + self.momentum * batch_mean
            self.running_var = (1 - self.momentum) * self.running_var + self.momentum * batch_var
            
            mean = batch_mean
            variance = batch_var
        else:
            # 使用运行时均值和方差
            mean = self.running_mean
            variance = self.running_var
        
        # 进行归一化
        x_normalized = (x - mean) / torch.sqrt(variance + self.eps)  # 归一化,形状: (batch_size, seq_len, hidden_size)
        
        # 应用缩放和平移参数
        output = self.gamma * x_normalized + self.beta  # 形状: (batch_size, seq_len, hidden_size)
        
        return output

def test_batch_norm():
    batch_size = 2
    seq_len = 4
    hidden_size = 8
    
    # 随机生成输入数据
    x = torch.randn(batch_size, seq_len, hidden_size)  # (batch_size, seq_len, hidden_size)
    
    # 创建 BatchNorm 模块
    batch_norm = BatchNorm(hidden_size)
    
    # 计算 BatchNorm 输出
    output = batch_norm(x)
    
    print("Input shape:", x.shape)
    print("Output shape:", output.shape)
    
if __name__ == "__main__":
	test_batch_norm()

Dropout

import torch
from torch import nn

class Dropout(nn.Module):
    def __init__(self, dropout_prob=0.1):
        super().__init__()
        self.dropout_prob = dropout_prob  # Dropout 的概率
    
    def forward(self, x):
        if self.training:
            # 生成与输入形状相同的掩码,元素为 0 或 1,按照 dropout_prob 的概率为 0
            mask = (torch.rand(x.shape) > self.dropout_prob).float()  # 掩码,形状与 x 相同
            # 归一化掩码,使得训练阶段和推理阶段的一致性
            output = mask * x / (1.0 - self.dropout_prob)  # 形状与 x 相同
        else:
            output = x  # 推理阶段,不进行 Dropout
        
        return output

def test_dropout():
    batch_size = 2
    seq_len = 4
    hidden_size = 8
    
    # 随机生成输入数据
    x = torch.randn(batch_size, seq_len, hidden_size)  # (batch_size, seq_len, hidden_size)
    
    # 创建 Dropout 模块
    dropout = Dropout(dropout_prob=0.1)
    
    # 设置为训练模式
    dropout.train()
    output_train = dropout(x)
    
    # 设置为推理模式
    dropout.eval()
    output_eval = dropout(x)
    
    print("Input shape:", x.shape)
    print("Output shape during training:", output_train.shape)
    print("Output shape during evaluation:", output_eval.shape)
    
if __name__ == "__main__":
	test_dropout()

Transformer位置编码

def sinusoidal_position_embedding(batch_size, nums_head, max_len, output_dim, device):
    # (max_len, 1)
    position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(-1)
    # (output_dim//2)
    ids = torch.arange(0, output_dim // 2, dtype=torch.float)  # 即公式里的i, i的范围是 [0,d/2]
    theta = torch.pow(10000, -2 * ids / output_dim)

    # (max_len, output_dim//2)
    embeddings = position * theta  # 即公式里的:pos / (10000^(2i/d))

    # (max_len, output_dim//2, 2)
    embeddings = torch.stack([torch.sin(embeddings), torch.cos(embeddings)], dim=-1)

    # (bs, head, max_len, output_dim//2, 2)
    embeddings = embeddings.repeat((batch_size, nums_head, *([1] * len(embeddings.shape))))  # 在bs维度重复,其他维度都是1不重复

    # (bs, head, max_len, output_dim)
    # reshape后就是:偶数sin, 奇数cos了
    embeddings = torch.reshape(embeddings, (batch_size, nums_head, max_len, output_dim))
    embeddings = embeddings.to(device)
    return embeddings

RoPE

Self-attention

from math import sqrt
import torch
import torch.nn as nn

class Self_Attention(nn.Module):
    def __init__(self, input_dim, dim_k, dim_v):
        super(Self_Attention, self).__init__()
        self.q = nn.Linear(input_dim, dim_k)
        self.k = nn.Linear(input_dim, dim_k)
        self.v = nn.Linear(input_dim, dim_v)
        self._norm_fact = 1 / sqrt(dim_k)
        
    def forward(self, x):
        Q = self.q(x)  # Q: batch_size * seq_len * dim_k
        K = self.k(x)  # K: batch_size * seq_len * dim_k
        V = self.v(x)  # V: batch_size * seq_len * dim_v
         
        # Q * K.T() / sqrt(dim_k)
        atten = torch.bmm(Q, K.permute(0, 2, 1)) * self._norm_fact  # batch_size * seq_len * seq_len
        
        # 计算 Softmax
        atten = torch.softmax(atten, dim=-1)
        
        # 计算输出
        output = torch.bmm(atten, V)  # Q * K.T() * V # batch_size * seq_len * dim_v
        
        return output

# 创建一个 Self_Attention 对象
input_dim = 64
dim_k = 32
dim_v = 32
self_attention = Self_Attention(input_dim, dim_k, dim_v)

# 创建一个示例输入张量,形状为 batch_size * seq_len * input_dim
batch_size = 2
seq_len = 10
x = torch.randn(batch_size, seq_len, input_dim)

# 运行前向传播
output = self_attention(x)

print("Input shape:", x.shape)
print("Output shape:", output.shape)

Scaled Cross Product

import torch
from torch import nn

class ScaledDotProductAttention(nn.Module):
    def __init__(self):
        super().__init__()

    def forward(self, query, key, value, attention_mask=None):
        # query, key, value 形状: (batch_size, seq_len, hidden_size)
        
        # 计算注意力分数
        # key.transpose(-1, -2) 将最后两个维度进行转置,以进行点积
        # attention_scores 形状: (batch_size, seq_len, seq_len)
        d_k = query.size(-1)  # 获取 hidden_size
        attention_scores = torch.matmul(query, key.transpose(-1, -2)) / torch.sqrt(torch.tensor(d_k, dtype=torch.float32))
        
        # 添加注意力掩码(seq_len, seq_len),掩码位置(1)的值为负无穷
        if attention_mask is not None:
            attention_scores += attention_mask * -1e9
                
        # 对注意力分数进行归一化,得到注意力概率
        attention_probs = torch.softmax(attention_scores, dim=-1)  # (batch_size, num_heads, seq_len, seq_len)
        
        # 计算注意力输出,通过注意力概率加权值
        attention_output = torch.matmul(attention_probs, value)  # (batch_size, num_heads, seq_len, hidden_size)
        
        return attention_output
    
def test_attn():
    batch_size = 128
    seq_len = 512
    hidden_size = 1024
    
    query = torch.randn(batch_size, seq_len, hidden_size)  # (batch_size, seq_len, hidden_size)
    key = torch.randn(batch_size, seq_len, hidden_size)    # (batch_size, seq_len, hidden_size)
    value = torch.randn(batch_size, seq_len, hidden_size)  # (batch_size, seq_len, hidden_size)

    sdpa = ScaledDotProductAttention()
    output = sdpa(query, key, value)
    
    print("Query shape:", query.shape)
    print("Key shape:", key.shape)
    print("Value shape:", value.shape)
    print("Output shape:", output.shape)
    
if __name__ == "__main__":
	test_attn()

MHA

import torch
from torch import nn

class MultiHeadAttention(torch.nn.Module):
    def __init__(self, hidden_size, num_heads):
        super().__init__()
        self.num_heads = num_heads
        self.head_dim = hidden_size // num_heads  # 每个头的维度,二者必须整除
        
        # 初始化 Q、K、V 的投影矩阵,将输入词向量线性变换为 Q、K、V,维度保持一致
        self.q_linear = nn.Linear(hidden_size, hidden_size) 
        self.k_linear = nn.Linear(hidden_size, hidden_size)
        self.v_linear = nn.Linear(hidden_size, hidden_size)
        
        # 输出线性层,将拼接后的多头注意力输出变换为所需的输出维度,这里维度保持一致
        self.o_linear = nn.Linear(hidden_size, hidden_size)
        
    def forward(self, hidden_state, attention_mask=None):
        # hidden_state 形状: (batch_size, seq_len, hidden_size)
        batch_size = hidden_state.size(0)  # 获取批量大小

        # 计算 Q、K、V,线性变换
        query = self.q_linear(hidden_state)  # (batch_size, seq_len, hidden_size)
        key = self.k_linear(hidden_state)    # (batch_size, seq_len, hidden_size)
        value = self.v_linear(hidden_state)  # (batch_size, seq_len, hidden_size)

        # 分割多头,将每个头的维度拆分出来
        query = self.split_head(query)  # (batch_size, num_heads, seq_len, head_dim)
        key = self.split_head(key)      # (batch_size, num_heads, seq_len, head_dim)
        value = self.split_head(value)  # (batch_size, num_heads, seq_len, head_dim)

        # 计算注意力分数,使用缩放点积注意力机制
        # attention_scores 形状: (batch_size, num_heads, seq_len, seq_len)
        attention_scores = torch.matmul(query, key.transpose(-1, -2)) / torch.sqrt(torch.tensor(self.head_dim, dtype=torch.float32))
        
        # 添加注意力掩码(seq_len, seq_len),掩码位置(1)的值为负无穷
        if attention_mask is not None:
            attention_scores += attention_mask * -1e9
        
        # 对注意力分数进行归一化,得到注意力概率
        attention_probs = torch.softmax(attention_scores, dim=-1)  # (batch_size, num_heads, seq_len, seq_len)

        # 计算注意力输出,通过注意力概率加权值
        output = torch.matmul(attention_probs, value)  # (batch_size, num_heads, seq_len, head_dim)
        
        # 对多头注意力输出进行拼接
        # output.transpose(1, 2) 将 num_heads 和 seq_len 维度转置
        # 将形状调整为 (batch_size, seq_len, hidden_size)
        output = output.transpose(1, 2).reshape(batch_size, -1, self.head_dim * self.num_heads)
        
        # 通过线性层将拼接后的输出变换为所需的输出维度
        output = self.o_linear(output)  # (batch_size, seq_len, hidden_size)
        
        return output

    def split_head(self, x):
        batch_size = x.size(0)  # 获取批量大小
        # x 形状: (batch_size, seq_len, hidden_size)
        # 将 hidden_size 分割为 num_heads 和 head_dim
        return x.reshape(batch_size, -1, self.num_heads, self.head_dim).transpose(1, 2)
        # 返回形状: (batch_size, num_heads, seq_len, head_dim)

def test_MHA():
    batch_size = 128
    seq_len = 512
    hidden_size = 1024
    num_heads = 8
    
    # 随机生成输入数据
    hidden_state = torch.randn(batch_size, seq_len, hidden_size)  # (batch_size, seq_len, hidden_size)
    
    # 创建多头注意力模块
    mha = MultiHeadAttention(hidden_size, num_heads)
    
    # 计算多头注意力输出
    output = mha(hidden_state)
    
    print("Input shape:", hidden_state.shape)
    print("Output shape:", output.shape)
    
if __name__ == "__main__":
	test_MHA()

Softmax

import torch

def softmax(x):
    # 计算输入张量的指数
    exp_x = torch.exp(x)
    
    # 计算所有指数之和
    sum_exp_x = torch.sum(exp_x, dim=0)
    
    # 将每个元素的指数除以总和
    softmax_x = exp_x / sum_exp_x
    
    return softmax_x

# 假设我们有一个张量
x = torch.tensor([1.0, 2.0, 3.0])
# 使用自己实现的 softmax 函数
softmax_x = softmax(x)
print(softmax_x)

MSE

import torch

def mse_loss(y_true, y_pred):
    # 计算平方误差
    squared_diff = (y_true - y_pred) ** 2
    
    # 返回平均平方误差
    return torch.mean(squared_diff)

# 测试均方误差损失函数
y_true = torch.tensor([3.0, -0.5, 2.0, 7.0])
y_pred = torch.tensor([2.5, 0.0, 2.0, 8.0])

loss = mse_loss(y_true, y_pred)
print(f"Mean Squared Error: {loss.item()}")

Cross entropy

import torch

def cross_entropy_loss(y_true, y_pred):
    # 防止 log(0) 的情况
    epsilon = 1e-12
    y_pred = torch.clamp(y_pred, epsilon, 1. - epsilon)
    
    # 计算交叉熵
    ce_loss = -torch.sum(y_true * torch.log(y_pred), dim=-1)
    
    # 返回平均损失
    return torch.mean(ce_loss)
y_true = torch.tensor([[1, 0, 0], [0, 1, 0]], dtype=torch.float32)
y_pred = torch.tensor([[0.8, 0.1, 0.1], [0.2, 0.7, 0.1]], dtype=torch.float32)

loss = cross_entropy_loss(y_true, y_pred)
print(f"Cross-Entropy Loss: {loss.item()}")