从RNN到BERT

发布于:2025-09-01 ⋅ 阅读:(22) ⋅ 点赞:(0)

目录

  1. 序列模型简介
  2. RNN循环神经网络
  3. LSTM长短期记忆网络
  4. Transformer架构
  5. BERT模型详解
  6. 实践项目

序列模型简介

什么是序列数据?

序列数据是按照特定顺序排列的数据,其中元素的顺序包含重要信息。常见的序列数据包括:

  • 文本:单词或字符的序列
  • 时间序列:股票价格、天气数据
  • 音频:声音信号的采样序列
  • 视频:图像帧的序列

为什么需要专门的序列模型?

传统的神经网络(如全连接网络)存在以下限制:

  1. 固定输入大小:无法处理变长序列
  2. 无记忆能力:不能利用序列中的时序信息
  3. 参数过多:对于长序列,参数量会爆炸式增长

RNN循环神经网络

1. RNN基本概念

1.1 核心思想

RNN(Recurrent Neural Network)的核心思想是让网络具有"记忆"能力。它通过在处理序列时维护一个隐藏状态(hidden state),将之前的信息传递到当前时刻。

RNN的数学表示:

h_t = tanh(W_hh * h_{t-1} + W_xh * x_t + b_h)
y_t = W_hy * h_t + b_y

其中:

  • x_t:时刻t的输入
  • h_t:时刻t的隐藏状态
  • y_t:时刻t的输出
  • W_hh, W_xh, W_hy:权重矩阵
  • b_h, b_y:偏置项
1.2 RNN的展开视图
输入:  x_1 → x_2 → x_3 → ... → x_t
        ↓     ↓     ↓           ↓
RNN:  [h_1]→[h_2]→[h_3]→ ... →[h_t]
        ↓     ↓     ↓           ↓
输出:  y_1   y_2   y_3   ...   y_t

每个时间步的RNN单元共享相同的参数(权重和偏置)。

1.3 Python实现示例
import numpy as np

class SimpleRNN:
    def __init__(self, input_size, hidden_size, output_size):
        # 初始化权重
        self.W_xh = np.random.randn(hidden_size, input_size) * 0.01
        self.W_hh = np.random.randn(hidden_size, hidden_size) * 0.01
        self.W_hy = np.random.randn(output_size, hidden_size) * 0.01
        self.b_h = np.zeros((hidden_size, 1))
        self.b_y = np.zeros((output_size, 1))
        
    def forward(self, inputs):
        """
        前向传播
        inputs: 输入序列,shape=(input_size, seq_length)
        """
        h = np.zeros((self.W_hh.shape[0], 1))  # 初始隐藏状态
        self.hidden_states = [h]
        outputs = []
        
        for t in range(inputs.shape[1]):
            x = inputs[:, t].reshape(-1, 1)
            # 计算新的隐藏状态
            h = np.tanh(np.dot(self.W_xh, x) + np.dot(self.W_hh, h) + self.b_h)
            # 计算输出
            y = np.dot(self.W_hy, h) + self.b_y
            
            self.hidden_states.append(h)
            outputs.append(y)
            
        return outputs, self.hidden_states

2. RNN的反向传播(BPTT)

2.1 什么是BPTT?

BPTT(Backpropagation Through Time)是RNN的训练算法,它将RNN在时间上展开,然后像普通神经网络一样进行反向传播。

2.2 BPTT的核心步骤
  1. 前向传播:计算所有时间步的输出和隐藏状态
  2. 计算损失:累积所有时间步的损失
  3. 反向传播:从最后一个时间步开始,逐步向前计算梯度
  4. 梯度累积:由于参数共享,需要累积所有时间步的梯度
2.3 梯度计算公式

对于损失函数L,梯度计算如下:

∂L/∂W_hy = Σ_t ∂L_t/∂W_hy
∂L/∂W_hh = Σ_t Σ_k ∂L_t/∂h_t * ∂h_t/∂h_k * ∂h_k/∂W_hh
∂L/∂W_xh = Σ_t ∂L_t/∂h_t * ∂h_t/∂W_xh
2.4 BPTT实现示例
def backward(self, targets, learning_rate=0.01):
    """
    反向传播算法
    targets: 目标序列
    """
    # 初始化梯度
    dW_xh = np.zeros_like(self.W_xh)
    dW_hh = np.zeros_like(self.W_hh)
    dW_hy = np.zeros_like(self.W_hy)
    db_h = np.zeros_like(self.b_h)
    db_y = np.zeros_like(self.b_y)
    
    dh_next = np.zeros_like(self.hidden_states[0])
    
    # 反向遍历时间步
    for t in reversed(range(len(outputs))):
        # 输出层梯度
        dy = outputs[t] - targets[t]
        dW_hy += np.dot(dy, self.hidden_states[t+1].T)
        db_y += dy
        
        # 隐藏层梯度
        dh = np.dot(self.W_hy.T, dy) + dh_next
        dh_raw = (1 - self.hidden_states[t+1]**2) * dh
        
        # 参数梯度
        db_h += dh_raw
        dW_xh += np.dot(dh_raw, inputs[t].T)
        dW_hh += np.dot(dh_raw, self.hidden_states[t].T)
        
        # 传递梯度到前一时间步
        dh_next = np.dot(self.W_hh.T, dh_raw)
    
    # 梯度裁剪(防止梯度爆炸)
    for dparam in [dW_xh, dW_hh, dW_hy, db_h, db_y]:
        np.clip(dparam, -5, 5, out=dparam)
    
    # 参数更新
    self.W_xh -= learning_rate * dW_xh
    self.W_hh -= learning_rate * dW_hh
    self.W_hy -= learning_rate * dW_hy
    self.b_h -= learning_rate * db_h
    self.b_y -= learning_rate * db_y

3. RNN的问题

3.1 梯度消失和梯度爆炸
  • 梯度消失:当序列很长时,早期时间步的梯度会变得极小,导致无法有效学习长期依赖
  • 梯度爆炸:梯度在反向传播过程中不断累积,可能变得极大
3.2 解决方案
  • 梯度裁剪(Gradient Clipping)
  • 使用ReLU激活函数
  • 更好的初始化方法
  • 使用LSTM或GRU(最有效的解决方案)

LSTM长短期记忆网络

1. LSTM的动机

LSTM(Long Short-Term Memory)是为了解决RNN的长期依赖问题而设计的。它通过引入"门控机制"来控制信息的流动。

2. LSTM的核心组件

2.1 细胞状态(Cell State)

细胞状态C_t是LSTM的核心,它像一条传送带,贯穿整个网络,只有少量的线性交互,信息可以轻松地流动。

2.2 三个门控机制

LSTM通过三个门来控制细胞状态:

1. 遗忘门(Forget Gate)

f_t = σ(W_f · [h_{t-1}, x_t] + b_f)
  • 决定从细胞状态中丢弃什么信息
  • 输出0到1之间的数值,0表示完全遗忘,1表示完全保留

2. 输入门(Input Gate)

i_t = σ(W_i · [h_{t-1}, x_t] + b_i)
C̃_t = tanh(W_C · [h_{t-1}, x_t] + b_C)
  • 决定什么新信息存储在细胞状态中
  • i_t决定更新哪些值
  • C̃_t创建新的候选值

3. 输出门(Output Gate)

o_t = σ(W_o · [h_{t-1}, x_t] + b_o)
h_t = o_t * tanh(C_t)
  • 决定输出什么信息
  • 基于细胞状态,但是经过过滤
2.3 LSTM的完整计算流程
import numpy as np

class LSTM:
    def __init__(self, input_size, hidden_size, output_size):
        # 初始化权重矩阵
        self.hidden_size = hidden_size
        
        # 遗忘门参数
        self.W_f = np.random.randn(hidden_size, input_size + hidden_size) * 0.01
        self.b_f = np.zeros((hidden_size, 1))
        
        # 输入门参数
        self.W_i = np.random.randn(hidden_size, input_size + hidden_size) * 0.01
        self.b_i = np.zeros((hidden_size, 1))
        
        # 候选值参数
        self.W_C = np.random.randn(hidden_size, input_size + hidden_size) * 0.01
        self.b_C = np.zeros((hidden_size, 1))
        
        # 输出门参数
        self.W_o = np.random.randn(hidden_size, input_size + hidden_size) * 0.01
        self.b_o = np.zeros((hidden_size, 1))
        
        # 输出层参数
        self.W_y = np.random.randn(output_size, hidden_size) * 0.01
        self.b_y = np.zeros((output_size, 1))
    
    def sigmoid(self, x):
        return 1 / (1 + np.exp(-x))
    
    def lstm_cell(self, x_t, h_prev, C_prev):
        """
        单个LSTM单元的前向传播
        """
        # 拼接输入和前一个隐藏状态
        concat = np.vstack((h_prev, x_t))
        
        # 遗忘门
        f_t = self.sigmoid(np.dot(self.W_f, concat) + self.b_f)
        
        # 输入门
        i_t = self.sigmoid(np.dot(self.W_i, concat) + self.b_i)
        C_tilde = np.tanh(np.dot(self.W_C, concat) + self.b_C)
        
        # 更新细胞状态
        C_t = f_t * C_prev + i_t * C_tilde
        
        # 输出门
        o_t = self.sigmoid(np.dot(self.W_o, concat) + self.b_o)
        h_t = o_t * np.tanh(C_t)
        
        # 计算输出
        y_t = np.dot(self.W_y, h_t) + self.b_y
        
        # 保存中间值用于反向传播
        cache = (x_t, h_prev, C_prev, f_t, i_t, C_tilde, o_t, C_t, h_t)
        
        return h_t, C_t, y_t, cache
    
    def forward(self, inputs):
        """
        LSTM的前向传播
        inputs: shape=(input_size, seq_length)
        """
        seq_length = inputs.shape[1]
        h_t = np.zeros((self.hidden_size, 1))
        C_t = np.zeros((self.hidden_size, 1))
        
        outputs = []
        caches = []
        
        for t in range(seq_length):
            x_t = inputs[:, t].reshape(-1, 1)
            h_t, C_t, y_t, cache = self.lstm_cell(x_t, h_t, C_t)
            outputs.append(y_t)
            caches.append(cache)
        
        return outputs, caches

3. LSTM的优势

3.1 长期记忆能力
  • 细胞状态可以携带信息穿越很长的序列
  • 门控机制精确控制信息的添加和删除
3.2 梯度流动
  • 细胞状态的线性特性使梯度能够更好地流动
  • 避免了梯度消失问题
3.3 选择性记忆
  • 能够学习何时记住、何时遗忘、何时输出
  • 对不同类型的序列模式有很好的适应性

4. LSTM的变体

4.1 GRU(Gated Recurrent Unit)

GRU是LSTM的简化版本,只有两个门:

  • 重置门(Reset Gate):决定忘记多少过去的信息
  • 更新门(Update Gate):决定保留多少过去的信息
# GRU的核心计算
z_t = σ(W_z · [h_{t-1}, x_t])  # 更新门
r_t = σ(W_r · [h_{t-1}, x_t])  # 重置门
h̃_t = tanh(W · [r_t * h_{t-1}, x_t])  # 候选隐藏状态
h_t = (1 - z_t) * h_{t-1} + z_t * h̃_t  # 最终隐藏状态
4.2 双向LSTM(Bidirectional LSTM)
  • 同时使用前向和后向两个LSTM
  • 能够利用未来的上下文信息
  • 在许多NLP任务中表现优秀

Transformer架构

1. Transformer的革命性创新

2017年,Google提出的Transformer架构彻底改变了序列建模的方式。其核心创新是完全基于注意力机制,抛弃了RNN的递归结构。

2. 自注意力机制(Self-Attention)

2.1 核心思想

自注意力允许模型在处理每个位置时,直接关注序列中的所有其他位置,而不需要通过递归。

2.2 注意力的数学表达

Query、Key、Value的计算:

Q = X · W_Q  # Query矩阵
K = X · W_K  # Key矩阵
V = X · W_V  # Value矩阵

Attention(Q, K, V) = softmax(QK^T / √d_k) · V

其中:

  • X:输入序列的嵌入表示
  • W_Q, W_K, W_V:学习的权重矩阵
  • d_k:Key向量的维度(用于缩放)
2.3 自注意力的直观理解

想象你在阅读句子"The cat sat on the mat":

  • 处理"sat"时,模型需要知道是"cat"在坐
  • 自注意力让"sat"直接查看所有词,并学习与"cat"的关联
  • 每个词都会计算与其他所有词的相关性分数
2.4 Python实现
import numpy as np

class SelfAttention:
    def __init__(self, d_model, d_k, d_v):
        """
        d_model: 输入维度
        d_k: Key的维度
        d_v: Value的维度
        """
        self.d_k = d_k
        self.W_Q = np.random.randn(d_model, d_k) * 0.01
        self.W_K = np.random.randn(d_model, d_k) * 0.01
        self.W_V = np.random.randn(d_model, d_v) * 0.01
    
    def forward(self, X):
        """
        X: 输入序列,shape=(seq_len, d_model)
        """
        # 计算Q, K, V
        Q = np.dot(X, self.W_Q)
        K = np.dot(X, self.W_K)
        V = np.dot(X, self.W_V)
        
        # 计算注意力分数
        scores = np.dot(Q, K.T) / np.sqrt(self.d_k)
        
        # Softmax归一化
        attention_weights = self.softmax(scores)
        
        # 加权求和
        output = np.dot(attention_weights, V)
        
        return output, attention_weights
    
    def softmax(self, x):
        exp_x = np.exp(x - np.max(x, axis=-1, keepdims=True))
        return exp_x / np.sum(exp_x, axis=-1, keepdims=True)

3. 多头注意力(Multi-Head Attention)

3.1 为什么需要多头?

单个注意力头可能只能捕捉一种类型的关系。多头注意力允许模型同时关注不同类型的信息。

3.2 多头注意力的实现
class MultiHeadAttention:
    def __init__(self, d_model, num_heads):
        self.num_heads = num_heads
        self.d_model = d_model
        self.d_k = d_model // num_heads
        
        # 为每个头创建独立的权重矩阵
        self.W_Q = np.random.randn(num_heads, d_model, self.d_k) * 0.01
        self.W_K = np.random.randn(num_heads, d_model, self.d_k) * 0.01
        self.W_V = np.random.randn(num_heads, d_model, self.d_k) * 0.01
        
        # 输出投影
        self.W_O = np.random.randn(d_model, d_model) * 0.01
    
    def forward(self, X):
        batch_size, seq_len = X.shape[0], X.shape[1]
        outputs = []
        
        for i in range(self.num_heads):
            # 每个头独立计算注意力
            Q = np.dot(X, self.W_Q[i])
            K = np.dot(X, self.W_K[i])
            V = np.dot(X, self.W_V[i])
            
            scores = np.dot(Q, K.T) / np.sqrt(self.d_k)
            attention = self.softmax(scores)
            head_output = np.dot(attention, V)
            outputs.append(head_output)
        
        # 拼接所有头的输出
        concat_output = np.concatenate(outputs, axis=-1)
        
        # 最终投影
        final_output = np.dot(concat_output, self.W_O)
        
        return final_output

4. 位置编码(Positional Encoding)

由于Transformer没有递归结构,需要额外的位置信息。

4.1 正弦位置编码
def positional_encoding(seq_len, d_model):
    """
    生成位置编码
    """
    PE = np.zeros((seq_len, d_model))
    
    for pos in range(seq_len):
        for i in range(0, d_model, 2):
            PE[pos, i] = np.sin(pos / (10000 ** (2 * i / d_model)))
            PE[pos, i + 1] = np.cos(pos / (10000 ** (2 * i / d_model)))
    
    return PE

5. Transformer的完整架构

5.1 编码器(Encoder)
class TransformerEncoder:
    def __init__(self, d_model, num_heads, d_ff, num_layers):
        self.num_layers = num_layers
        self.layers = []
        
        for _ in range(num_layers):
            layer = {
                'attention': MultiHeadAttention(d_model, num_heads),
                'norm1': LayerNorm(d_model),
                'feedforward': FeedForward(d_model, d_ff),
                'norm2': LayerNorm(d_model)
            }
            self.layers.append(layer)
    
    def forward(self, X):
        for layer in self.layers:
            # 多头注意力
            attn_output = layer['attention'].forward(X)
            X = layer['norm1'].forward(X + attn_output)  # 残差连接
            
            # 前馈网络
            ff_output = layer['feedforward'].forward(X)
            X = layer['norm2'].forward(X + ff_output)  # 残差连接
        
        return X
5.2 解码器(Decoder)

解码器除了自注意力外,还包含编码器-解码器注意力:

class TransformerDecoder:
    def __init__(self, d_model, num_heads, d_ff, num_layers):
        self.num_layers = num_layers
        self.layers = []
        
        for _ in range(num_layers):
            layer = {
                'self_attention': MultiHeadAttention(d_model, num_heads),
                'norm1': LayerNorm(d_model),
                'cross_attention': MultiHeadAttention(d_model, num_heads),
                'norm2': LayerNorm(d_model),
                'feedforward': FeedForward(d_model, d_ff),
                'norm3': LayerNorm(d_model)
            }
            self.layers.append(layer)
    
    def forward(self, X, encoder_output):
        for layer in self.layers:
            # 带掩码的自注意力
            self_attn = layer['self_attention'].forward(X, mask=True)
            X = layer['norm1'].forward(X + self_attn)
            
            # 编码器-解码器注意力
            cross_attn = layer['cross_attention'].forward(X, encoder_output)
            X = layer['norm2'].forward(X + cross_attn)
            
            # 前馈网络
            ff_output = layer['feedforward'].forward(X)
            X = layer['norm3'].forward(X + ff_output)
        
        return X

6. Transformer的优势

  1. 并行计算:所有位置可以同时计算,大大提高训练速度
  2. 长距离依赖:直接建模任意两个位置的关系
  3. 可解释性:注意力权重提供了模型决策的可视化
  4. 扩展性:容易扩展到更大的模型规模

BERT模型详解

1. BERT的创新之处

BERT(Bidirectional Encoder Representations from Transformers)是Google在2018年提出的预训练语言模型,它革命性地改变了NLP领域。

1.1 核心创新
  1. 双向性:同时利用左右上下文
  2. 预训练-微调范式:在大规模数据上预训练,然后针对特定任务微调
  3. 统一架构:同一个模型可以用于各种下游任务

2. BERT的架构

2.1 基础架构

BERT基于Transformer的编码器:

  • BERT-Base:12层,768维隐藏层,12个注意力头,110M参数
  • BERT-Large:24层,1024维隐藏层,16个注意力头,340M参数
2.2 输入表示

BERT的输入是三个嵌入的和:

class BERTEmbedding:
    def __init__(self, vocab_size, max_len, d_model):
        # 词嵌入
        self.token_embedding = np.random.randn(vocab_size, d_model) * 0.01
        # 段嵌入(用于区分句子A和句子B)
        self.segment_embedding = np.random.randn(2, d_model) * 0.01
        # 位置嵌入
        self.position_embedding = np.random.randn(max_len, d_model) * 0.01
    
    def forward(self, token_ids, segment_ids, position_ids):
        # 获取三种嵌入
        token_emb = self.token_embedding[token_ids]
        segment_emb = self.segment_embedding[segment_ids]
        position_emb = self.position_embedding[position_ids]
        
        # 相加得到最终嵌入
        embeddings = token_emb + segment_emb + position_emb
        
        return embeddings

3. BERT的预训练任务

3.1 掩码语言模型(Masked Language Model, MLM)

核心思想:随机遮蔽输入中15%的词,让模型预测被遮蔽的词。

实现细节

  • 80%的时间:用[MASK]标记替换
  • 10%的时间:用随机词替换
  • 10%的时间:保持不变
def create_mlm_data(tokens, vocab_size, mask_prob=0.15):
    """
    创建MLM训练数据
    """
    output_labels = []
    masked_tokens = tokens.copy()
    
    for i in range(len(tokens)):
        if np.random.random() < mask_prob:
            output_labels.append(tokens[i])
            
            # 80%概率用[MASK]替换
            if np.random.random() < 0.8:
                masked_tokens[i] = '[MASK]'
            # 10%概率用随机词替换
            elif np.random.random() < 0.5:
                masked_tokens[i] = np.random.randint(0, vocab_size)
            # 10%概率保持不变
            # else: keep original
        else:
            output_labels.append(-1)  # 不需要预测
    
    return masked_tokens, output_labels
3.2 下一句预测(Next Sentence Prediction, NSP)

核心思想:给定两个句子,预测第二个句子是否是第一个句子的下一句。

训练数据构造

  • 50%的时间:句子B确实是句子A的下一句(标签:IsNext)
  • 50%的时间:句子B是随机选择的(标签:NotNext)
def create_nsp_data(sentence_pairs):
    """
    创建NSP训练数据
    """
    nsp_data = []
    
    for i, (sent_a, sent_b) in enumerate(sentence_pairs):
        if np.random.random() < 0.5:
            # 正样本:真实的下一句
            nsp_data.append({
                'sent_a': sent_a,
                'sent_b': sent_b,
                'label': 1  # IsNext
            })
        else:
            # 负样本:随机句子
            random_idx = np.random.randint(len(sentence_pairs))
            while random_idx == i:
                random_idx = np.random.randint(len(sentence_pairs))
            
            nsp_data.append({
                'sent_a': sent_a,
                'sent_b': sentence_pairs[random_idx][0],  # 随机句子
                'label': 0  # NotNext
            })
    
    return nsp_data

4. BERT的完整实现

class BERT:
    def __init__(self, vocab_size, max_len=512, d_model=768, 
                 num_layers=12, num_heads=12, d_ff=3072):
        # 嵌入层
        self.embedding = BERTEmbedding(vocab_size, max_len, d_model)
        
        # Transformer编码器
        self.encoder = TransformerEncoder(d_model, num_heads, d_ff, num_layers)
        
        # MLM预测头
        self.mlm_head = MLMHead(d_model, vocab_size)
        
        # NSP预测头
        self.nsp_head = NSPHead(d_model)
    
    def forward(self, token_ids, segment_ids, masked_positions=None):
        # 获取嵌入
        seq_len = len(token_ids)
        position_ids = np.arange(seq_len)
        embeddings = self.embedding.forward(token_ids, segment_ids, position_ids)
        
        # 通过Transformer编码器
        encoded = self.encoder.forward(embeddings)
        
        # MLM预测
        mlm_predictions = None
        if masked_positions is not None:
            masked_encoded = encoded[masked_positions]
            mlm_predictions = self.mlm_head.forward(masked_encoded)
        
        # NSP预测(使用[CLS]标记的输出)
        cls_output = encoded[0]  # 第一个位置是[CLS]
        nsp_prediction = self.nsp_head.forward(cls_output)
        
        return mlm_predictions, nsp_prediction

class MLMHead:
    def __init__(self, d_model, vocab_size):
        self.dense = np.random.randn(d_model, d_model) * 0.01
        self.layer_norm = LayerNorm(d_model)
        self.decoder = np.random.randn(d_model, vocab_size) * 0.01
    
    def forward(self, hidden_states):
        x = np.dot(hidden_states, self.dense)
        x = self.gelu(x)
        x = self.layer_norm.forward(x)
        predictions = np.dot(x, self.decoder)
        return predictions
    
    def gelu(self, x):
        # GELU激活函数
        return 0.5 * x * (1 + np.tanh(np.sqrt(2 / np.pi) * (x + 0.044715 * x**3)))

class NSPHead:
    def __init__(self, d_model):
        self.classifier = np.random.randn(d_model, 2) * 0.01
    
    def forward(self, cls_output):
        return np.dot(cls_output, self.classifier)

5. BERT的微调

5.1 文本分类任务
class BERTForClassification:
    def __init__(self, bert_model, num_classes):
        self.bert = bert_model
        self.classifier = np.random.randn(768, num_classes) * 0.01
        self.dropout = 0.1
    
    def forward(self, token_ids, segment_ids):
        # 获取BERT的输出
        _, cls_output = self.bert.forward(token_ids, segment_ids)
        
        # Dropout
        if self.training:
            mask = np.random.binomial(1, 1-self.dropout, cls_output.shape)
            cls_output = cls_output * mask / (1-self.dropout)
        
        # 分类
        logits = np.dot(cls_output, self.classifier)
        return logits
5.2 问答任务
class BERTForQuestionAnswering:
    def __init__(self, bert_model):
        self.bert = bert_model
        # 预测答案的起始和结束位置
        self.qa_outputs = np.random.randn(768, 2) * 0.01
    
    def forward(self, token_ids, segment_ids):
        # 获取所有位置的输出
        sequence_output, _ = self.bert.forward(token_ids, segment_ids)
        
        # 预测起始和结束位置
        logits = np.dot(sequence_output, self.qa_outputs)
        start_logits = logits[:, 0]
        end_logits = logits[:, 1]
        
        return start_logits, end_logits

6. BERT的训练技巧

6.1 学习率调度
def get_linear_schedule_with_warmup(optimizer, num_warmup_steps, num_training_steps):
    """
    带预热的线性学习率调度
    """
    def lr_lambda(current_step):
        if current_step < num_warmup_steps:
            # 预热阶段:线性增加
            return float(current_step) / float(max(1, num_warmup_steps))
        # 线性衰减
        return max(0.0, float(num_training_steps - current_step) / 
                   float(max(1, num_training_steps - num_warmup_steps)))
    
    return lr_lambda
6.2 梯度累积

当批次太大无法放入内存时,使用梯度累积:

def train_with_gradient_accumulation(model, data_loader, accumulation_steps=4):
    optimizer.zero_grad()
    
    for i, batch in enumerate(data_loader):
        outputs = model.forward(batch)
        loss = compute_loss(outputs, batch['labels'])
        loss = loss / accumulation_steps  # 归一化损失
        loss.backward()
        
        if (i + 1) % accumulation_steps == 0:
            optimizer.step()
            optimizer.zero_grad()

7. BERT的变体和改进

7.1 RoBERTa
  • 去除NSP任务
  • 使用更大的批次和更多数据
  • 动态掩码
7.2 ALBERT
  • 参数共享(跨层共享)
  • 因式分解嵌入
  • 句子顺序预测(SOP)替代NSP
7.3 ELECTRA
  • 生成器-判别器架构
  • 替换词检测任务
  • 更高效的预训练

实践项目

项目1:使用RNN进行文本生成

目标

构建一个字符级RNN,生成莎士比亚风格的文本。

实现步骤
import numpy as np

class CharRNN:
    def __init__(self, vocab_size, hidden_size=128, seq_length=25):
        self.vocab_size = vocab_size
        self.hidden_size = hidden_size
        self.seq_length = seq_length
        
        # 初始化参数
        self.Wxh = np.random.randn(hidden_size, vocab_size) * 0.01
        self.Whh = np.random.randn(hidden_size, hidden_size) * 0.01
        self.Why = np.random.randn(vocab_size, hidden_size) * 0.01
        self.bh = np.zeros((hidden_size, 1))
        self.by = np.zeros((vocab_size, 1))
    
    def train(self, data, epochs=100, learning_rate=0.1):
        """训练模型"""
        for epoch in range(epochs):
            h_prev = np.zeros((self.hidden_size, 1))
            
            for t in range(0, len(data) - self.seq_length, self.seq_length):
                # 准备输入和目标
                inputs = [data[t+i] for i in range(self.seq_length)]
                targets = [data[t+i+1] for i in range(self.seq_length)]
                
                # 前向传播和反向传播
                loss, h_prev = self.train_step(inputs, targets, h_prev, learning_rate)
                
                if t % 1000 == 0:
                    print(f'Epoch {epoch}, Step {t}, Loss: {loss:.4f}')
    
    def generate(self, seed_char, length=100, temperature=1.0):
        """生成文本"""
        h = np.zeros((self.hidden_size, 1))
        x = np.zeros((self.vocab_size, 1))
        x[seed_char] = 1
        generated = []
        
        for _ in range(length):
            h = np.tanh(np.dot(self.Wxh, x) + np.dot(self.Whh, h) + self.bh)
            y = np.dot(self.Why, h) + self.by
            p = np.exp(y / temperature) / np.sum(np.exp(y / temperature))
            
            # 采样
            ix = np.random.choice(range(self.vocab_size), p=p.ravel())
            x = np.zeros((self.vocab_size, 1))
            x[ix] = 1
            generated.append(ix)
        
        return generated

项目2:使用LSTM进行情感分析

目标

构建LSTM模型对电影评论进行情感分类。

import torch
import torch.nn as nn

class LSTMSentimentClassifier(nn.Module):
    def __init__(self, vocab_size, embedding_dim=128, hidden_dim=256, 
                 num_layers=2, dropout=0.5):
        super().__init__()
        
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, num_layers, 
                           batch_first=True, dropout=dropout, 
                           bidirectional=True)
        self.dropout = nn.Dropout(dropout)
        self.fc = nn.Linear(hidden_dim * 2, 2)  # 二分类
    
    def forward(self, x):
        # x shape: (batch_size, seq_len)
        embedded = self.embedding(x)
        # embedded shape: (batch_size, seq_len, embedding_dim)
        
        lstm_out, (hidden, cell) = self.lstm(embedded)
        # 使用最后一个时间步的输出
        # lstm_out shape: (batch_size, seq_len, hidden_dim * 2)
        
        # 取最后时间步
        last_hidden = lstm_out[:, -1, :]
        
        dropped = self.dropout(last_hidden)
        output = self.fc(dropped)
        
        return output

# 训练函数
def train_sentiment_model(model, train_loader, val_loader, epochs=10):
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    criterion = nn.CrossEntropyLoss()
    
    for epoch in range(epochs):
        model.train()
        total_loss = 0
        
        for batch in train_loader:
            texts, labels = batch
            
            optimizer.zero_grad()
            outputs = model(texts)
            loss = criterion(outputs, labels)
            loss.backward()
            
            # 梯度裁剪
            torch.nn.utils.clip_grad_norm_(model.parameters(), 5)
            
            optimizer.step()
            total_loss += loss.item()
        
        # 验证
        model.eval()
        correct = 0
        total = 0
        
        with torch.no_grad():
            for batch in val_loader:
                texts, labels = batch
                outputs = model(texts)
                _, predicted = torch.max(outputs, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()
        
        accuracy = 100 * correct / total
        print(f'Epoch {epoch+1}, Loss: {total_loss:.4f}, Accuracy: {accuracy:.2f}%')

项目3:使用Transformer进行机器翻译

class TransformerTranslator(nn.Module):
    def __init__(self, src_vocab_size, tgt_vocab_size, d_model=512, 
                 num_heads=8, num_layers=6, d_ff=2048, max_len=100):
        super().__init__()
        
        # 源语言和目标语言的嵌入
        self.src_embedding = nn.Embedding(src_vocab_size, d_model)
        self.tgt_embedding = nn.Embedding(tgt_vocab_size, d_model)
        self.positional_encoding = PositionalEncoding(d_model, max_len)
        
        # Transformer
        self.transformer = nn.Transformer(d_model, num_heads, num_layers, 
                                         num_layers, d_ff)
        
        # 输出层
        self.output_layer = nn.Linear(d_model, tgt_vocab_size)
    
    def forward(self, src, tgt):
        # 嵌入和位置编码
        src_emb = self.positional_encoding(self.src_embedding(src))
        tgt_emb = self.positional_encoding(self.tgt_embedding(tgt))
        
        # 生成目标掩码(防止看到未来的词)
        tgt_mask = self.generate_square_subsequent_mask(tgt.size(1))
        
        # Transformer前向传播
        output = self.transformer(src_emb, tgt_emb, tgt_mask=tgt_mask)
        
        # 预测下一个词
        output = self.output_layer(output)
        
        return output
    
    def generate_square_subsequent_mask(self, sz):
        mask = torch.triu(torch.ones(sz, sz) * float('-inf'), diagonal=1)
        return mask

# 推理函数
def translate(model, src_sentence, src_vocab, tgt_vocab, max_len=50):
    model.eval()
    
    # 编码源句子
    src_tokens = [src_vocab[word] for word in src_sentence.split()]
    src_tensor = torch.tensor(src_tokens).unsqueeze(0)
    
    # 开始解码
    tgt_tokens = [tgt_vocab['<sos>']]
    
    for _ in range(max_len):
        tgt_tensor = torch.tensor(tgt_tokens).unsqueeze(0)
        
        with torch.no_grad():
            output = model(src_tensor, tgt_tensor)
            next_token = output[0, -1, :].argmax().item()
        
        tgt_tokens.append(next_token)
        
        if next_token == tgt_vocab['<eos>']:
            break
    
    # 转换回文字
    translation = [tgt_vocab.get_word(token) for token in tgt_tokens]
    return ' '.join(translation[1:-1])  # 去除<sos>和<eos>

项目4:使用BERT进行命名实体识别

from transformers import BertForTokenClassification, BertTokenizer

class BERTNERModel:
    def __init__(self, num_labels):
        self.model = BertForTokenClassification.from_pretrained(
            'bert-base-uncased', 
            num_labels=num_labels
        )
        self.tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
    
    def train(self, train_data, val_data, epochs=3):
        optimizer = torch.optim.AdamW(self.model.parameters(), lr=5e-5)
        
        for epoch in range(epochs):
            self.model.train()
            total_loss = 0
            
            for batch in train_data:
                # 准备输入
                inputs = self.tokenizer(batch['texts'], 
                                       padding=True, 
                                       truncation=True, 
                                       return_tensors="pt")
                labels = batch['labels']
                
                # 前向传播
                outputs = self.model(**inputs, labels=labels)
                loss = outputs.loss
                
                # 反向传播
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()
                
                total_loss += loss.item()
            
            # 验证
            self.evaluate(val_data)
    
    def predict(self, text):
        self.model.eval()
        
        # Tokenize
        inputs = self.tokenizer(text, return_tensors="pt")
        
        with torch.no_grad():
            outputs = self.model(**inputs)
            predictions = torch.argmax(outputs.logits, dim=-1)
        
        # 解码预测结果
        tokens = self.tokenizer.convert_ids_to_tokens(inputs["input_ids"][0])
        labels = predictions[0].cpu().numpy()
        
        # 合并结果
        entities = []
        current_entity = []
        current_label = None
        
        for token, label in zip(tokens, labels):
            if label != 0:  # 非O标签
                if current_label == label:
                    current_entity.append(token)
                else:
                    if current_entity:
                        entities.append((current_entity, current_label))
                    current_entity = [token]
                    current_label = label
            else:
                if current_entity:
                    entities.append((current_entity, current_label))
                    current_entity = []
                    current_label = None
        
        return entities

总结与展望

学习路线建议

  1. 基础阶段(1-2周)

    • 理解RNN的基本概念
    • 实现简单的RNN
    • 理解梯度消失问题
  2. 进阶阶段(2-3周)

    • 深入理解LSTM的门控机制
    • 实现LSTM并在实际任务中应用
    • 学习GRU等变体
  3. 深入阶段(3-4周)

    • 全面掌握Transformer架构
    • 理解自注意力机制
    • 实现简单的Transformer
  4. 应用阶段(持续)

    • 学习使用预训练模型
    • 微调BERT解决实际问题
    • 探索最新的模型架构

重要资源

  1. 论文

    • “Attention Is All You Need” (Transformer)
    • “BERT: Pre-training of Deep Bidirectional Transformers”
    • “GPT-3: Language Models are Few-Shot Learners”
  2. 开源框架

    • Hugging Face Transformers
    • PyTorch
    • TensorFlow
  3. 在线课程

    • Stanford CS224N: Natural Language Processing with Deep Learning
    • Fast.ai Practical Deep Learning
    • Andrew Ng’s Deep Learning Specialization
  4. 模型规模:GPT-4、PaLM等超大规模模型

  5. 效率优化:模型压缩、知识蒸馏

  6. 多模态学习:结合文本、图像、音频

  7. 持续学习:模型的在线更新和适应

  8. 可解释性:理解模型的决策过程