【从0到1搞懂大模型】chatGPT 中的对齐优化(RLHF)讲解与实战(9)

发布于:2025-05-25 ⋅ 阅读:(19) ⋅ 点赞:(0)

GPT系列模型的演进

chatgpt系列模型演进的重要节点包含下面几个模型(当然,这两年模型发展太快了,4o这些推理模型我就先不写了)
(Transformer) → GPT-1 → GPT-2 → GPT-3 → InstructGPT/ChatGPT(GPT-3.5) → GPT-4
在这里插入图片描述
下面介绍一下各个模型之前的重点差异

(1)Transformer(2017)

  • 定位:NLP基础架构革命,奠定GPT系列技术底座
  • 核心创新:
    • 多头自注意力机制:替代RNN/CNN,解决长距离依赖问题
    • 位置编码:通过正弦函数或可学习向量表征序列位置
    • 并行计算架构:突破序列处理的效率瓶颈
  • 局限:未形成完整生成模型,需配合任务微调

(2)GPT-1(2018)

  • 技术定位:首个基于Transformer的生成式预训练模型
  • 核心改进:
    • 单向掩码机制:仅允许左向注意力,实现自回归文本生成
    • 两阶段训练:无监督预训练(BooksCorpus)+ 下游任务微调
    • 参数规模:1.17亿
  • 应用场景:文本续写、简单问答

(3)GPT-2(2019)

  • 技术跃迁:验证"规模扩展+零样本学习"可行性
  • 核心改进:
    • 参数爆炸:15亿参数,较GPT-1增长12.8倍
    • 零样本迁移:无需微调即可完成翻译、摘要等任务
    • WebText数据集:800万网页数据提升多样性
  • 局限:生成文本存在重复和不连贯现象

(4)GPT-3(2020)

  • 技术突破:定义"大模型即服务"范式
  • 核心创新:
    • 超大规模参数:1750亿参数,开启千亿级模型时代
    • 上下文学习:通过Prompt工程实现少样本/单样本学习
    • 混合训练数据:融合Common Crawl、书籍、维基百科等
  • 局限:存在事实性错误和伦理风险

(5)InstructGPT/ChatGPT(GPT-3.5, 2022)

  • 技术定位:首个实现人类对齐的对话模型
  • 核心改进:
    • 三阶段对齐流程:SFT → RM → PPO
    • RLHF技术:通过人类反馈强化学习优化输出安全性
    • 指令微调:使用人工标注指令-答案对提升任务理解
    • 参数规模:保持1750亿参数,但训练数据量扩展

(6)GPT-4(2023)

  • 技术革命:多模态+超智能体架构
  • 核心创新:
    • 多模态处理:支持图像输入与文本生成联动
    • 混合专家模型:推测采用MoE架构,参数达1.8万亿
    • 动态推理优化:思维链(CoT)增强复杂问题解决能力
    • 安全增强:毒性输出较GPT-3.5降低50%以上
    • 工程突破:32K上下文窗口支持长文档处理

可以见的,从 GPT1 到 GPT3,最主要的技术进步就是参数量和预训练数据的扩大,这也验证可 Scale Law(规模法则)即模型性能随模型规模的增长而提高
而从 GPT3 到可以直接对话的 chatgpt,对齐优化则是最重要的突破,RLHF作为对齐阶段的里程碑技术,通过三阶段流程(SFT→RM→PPO)将模型输出与人类偏好深度绑定,标志着语言模型从“通用生成”到“可控服务”的范式转变。
下面 图片就演示了 RLHF,而本篇文章重点讲解一下 RLHF(Reinforcement Learning from Human Feedback,基于人类反馈的强化学习)

RLHF讲解与实战

整体流程介绍

在这里插入图片描述
ChatGPT是怎么变聪明的?​​
想象一下,ChatGPT一开始就像个刚学说话的小孩,虽然懂一些知识,但回答得不太好。科学家们为了让它的回答更符合人类喜好,用了​​三步训练法​​,让它像打游戏升级一样,越练越强!

  • 第1步:先教它“标准答案”(监督学习)​​
    ​​方法​​:从网上找一大堆问题和答案(比如“怎么煮咖啡?”),让ChatGPT学习正确的回答方式。
    ​​结果​​:它学会了基本的对话能力,但还不够聪明,回答可能很死板或者不讨喜。这时候的版本叫​​“弱弱的ChatGPT”​​。
  • 第2步:教它“哪种回答更讨喜”(奖励模型)​​
    ​​方法​​:让“弱弱的ChatGPT”对同一个问题生成多个答案(比如回答“煮咖啡”时,有的详细,有的简短),然后请人类给这些答案打分(哪个更好?哪个更差?)。
    ​​训练奖励模型​​:用这些打分数据训练一个​​“评分AI”​​,让它学会人类的喜好,以后能自动给ChatGPT的回答打分。
  • 第3步:让它“自己和自己比赛”(强化学习)​​
    ​​方法​​:让“弱弱的ChatGPT”继续回答问题,但这次用​​“评分AI”​​给它打分,然后告诉它:“这个回答得分高,下次多这样答;那个回答得分低,下次别这样了。”
    ​​升级版ChatGPT​​:经过反复调整,它的回答越来越符合人类喜好,变得更自然、更聪明。
  • 循环升级:越练越强!​​
    升级后的ChatGPT可以​​重新训练“评分AI”​​(因为它的回答更好了)。
    更好的“评分AI”又能帮ChatGPT​​进一步优化回答​​……
    这样​​循环训练​​,就像武侠小说里的高手​​左右手互搏​​,越练越厉害,最终成为惊艳世界的ChatGPT!

强化学习基础知识

强化学习(Reinforcement Learning, RL) 是一种通过试错学习实现目标的人工智能范式。其核心是智能体(Agent)在与环境(Environment)的交互中,通过最大化累积奖励(Reward)来学习最优策略(Policy)。

  • 关键要素:
    • 智能体(Agent):决策主体(如机器人、游戏角色)。
    • 环境(Environment):智能体交互的物理或虚拟世界。
    • 状态(State):环境的当前描述(如迷宫中的位置)。
    • 动作(Action):智能体可执行的操作(如移动方向)。
    • 奖励(Reward):环境对动作的即时反馈(如到达终点+100分,撞墙-10分)。
    • 策略(Policy):状态到动作的映射规则(如“遇到障碍物时左转”)。
  • 与其他机器学习的区别:
    • 监督学习:依赖标注数据(输入-答案对),优化目标是预测误差最小化。
    • 强化学习:无需标注数据,通过试错优化长期累积奖励,注重延迟反馈(如围棋中某一步可能在几十步后才决定胜负)

在训练ChatGPT时,OpenAI让一组人类评估者来评价模型的回答。这些评估者拿到了一组指导方针,告诉他们什么样的回答应该被高度评价,什么样的回答应该被低度评价。在评估者评价的过程中,通过不断的试错和学习,机器人试图找到一种策略,使得在与用户交谈过程中获取的总奖励最大。这就是通过基于人类反馈的强化学习调优ChatGPT的基本思想。

PPO讲解

PPO 是一种强化学习算法,专门用于​​让AI模型(比如ChatGPT)通过试错和反馈优化自己的策略(即参数)​​。它的核心思想是:
​​“小步调整参数,避免一次更新太大导致模型崩溃”​​(就像健身时循序渐进,而不是突然举100kg受伤)。

PPO的输入
  • 输入​​:
    ​​当前模型(策略)​​:比如“弱弱的ChatGPT”,参数为θ。
    ​​奖励模型(RM)​​:能给ChatGPT的回答打分(比如1~10分)。
    ​​- 输出​​:
    ​​优化后的新模型​​:参数θ’,生成的回答更符合人类偏好。
PPO分步骤讲解
  • 步骤1:生成回答并打分​​
    从Prompt库抽样一个问题(比如“怎么煮咖啡?”)。
    让当前ChatGPT生成多个答案(比如答案A、B、C)。
    奖励模型RM给这些答案打分(比如A=7分,B=3分,C=5分)

  • 步骤2:计算“优势”(Advantage)​​
    ​​关键问题​​:当前回答比“平均水平”好多少?
    ​​公式​​:优势A = 当前回答得分 - 平均预期得分
    如果A=7,平均预期=5 → 优势=+2(鼓励这类回答)。
    如果B=3,平均预期=5 → 优势=-2(抑制这类回答)。

  • 步骤3:计算“策略比率”(Policy Ratio)​​
    ​​关键问题​​:新参数θ’生成的回答,和旧参数θ生成的回答概率相差多少?
    ​​公式​​:比率 = Pθ’(回答) / Pθ(回答)
    如果比率≈1:新旧策略对回答的选择概率相似。
    如果比率>1:新策略更倾向于生成该回答。
    如果比率<1:新策略更抑制该回答。

  • 步骤4:PPO的核心目标函数​​
    PPO通过以下公式调整参数θ’,​​同时限制更新幅度​​(避免突变):

**目标函数 = min(比率×优势, clip(比率, 1-ε, 1+ε)×优势)**

​​clip函数​​:强制比率在[1-ε, 1+ε]范围内(比如ε=0.2 → 比率限制在0.8~1.2)。
如果比率=1.5(更新太大)→ 被clip到1.2,防止参数剧烈变化。
如果比率=0.9(安全范围)→ 保持不变。

  • 步骤5:梯度下降更新参数​​
    最大化目标函数 → 让高优势回答的概率增加,低优势回答的概率降低。
    但通过clip限制步长,保证训练稳定。

PPO 的整体公式就是

L(θ) = E[ min(ratio * Advantage, clip(ratio, 1-ε, 1+ε) * Advantage) ] - β * Entropy
  • ratio = Pθ_new(a|s) / Pθ_old(a|s)
  • Entropy 是熵奖励,鼓励生成多样性回答。

这个本身是一个相对复杂的算法,建议看看网上更专业的讲解奥
推荐一下李宏毅老师的课程:http://speech.ee.ntu.edu.tw/~tlkagk/courses_MLDS18.html

ppo代码示例
import torch
import torch.nn as nn
import torch.optim as optim
from torch.distributions import Categorical
import numpy as np
# 设置随机种子
torch.manual_seed(42)
np.random.seed(42)

class LanguageModel(nn.Module):
    def __init__(self, vocab_size, embedding_dim=128, hidden_dim=256):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, batch_first=True)
        self.fc = nn.Linear(hidden_dim, vocab_size)  # 输出每个词的概率

    def forward(self, x):
        # x: [batch_size, seq_len]
        x = self.embedding(x)  # [batch_size, seq_len, embedding_dim]
        lstm_out, _ = self.lstm(x)  # [batch_size, seq_len, hidden_dim]
        logits = self.fc(lstm_out)  # [batch_size, seq_len, vocab_size]
        return logits

    def generate(self, prompt, max_len=20):
        """生成文本(类似ChatGPT的推理过程)"""
        with torch.no_grad():
            tokens = prompt
            for _ in range(max_len):
                logits = self.forward(tokens)  # [batch_size, seq_len, vocab_size]
                next_token_logits = logits[:, -1, :]  # 取最后一个词的logits
                probs = torch.softmax(next_token_logits, dim=-1)
                next_token = torch.multinomial(probs, 1)  # 按概率采样
                tokens = torch.cat([tokens, next_token], dim=1)
            return tokens

class RewardModel(nn.Module):
    def __init__(self, vocab_size, embedding_dim=128, hidden_dim=256):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, batch_first=True)
        self.fc = nn.Linear(hidden_dim, 1)  # 输出单个奖励值

    def forward(self, x):
        # x: [batch_size, seq_len]
        x = self.embedding(x)  # [batch_size, seq_len, embedding_dim]
        lstm_out, _ = self.lstm(x)  # [batch_size, seq_len, hidden_dim]
        # 使用最后一个时间步的隐藏状态来预测奖励
        rewards = self.fc(lstm_out[:, -1, :])  # [batch_size, 1]
        return rewards.squeeze(-1)  # [batch_size]

def ppo_train(
    actor_model,          # 语言模型(策略网络)
    reward_model,         # 奖励模型
    optimizer,            # 优化器(如Adam)
    prompts,              # 输入的prompt(问题)
    num_epochs=10,        # PPO训练轮数
    clip_epsilon=0.2,     # PPO的clip参数(通常0.1~0.3)
    gamma=0.99,           # 折扣因子
    batch_size=32         # 每批数据量
):
    actor_model.train()  # 切换到训练模式
    reward_model.eval()  # 奖励模型设置为评估模式
    
    for epoch in range(num_epochs):
        # 1. 生成回答(采样)
        with torch.no_grad():
            responses = actor_model.generate(prompts)  # [batch_size, seq_len]
            # 计算旧策略的概率
            old_logits = actor_model(responses)  # [batch_size, seq_len, vocab_size]
            old_log_probs = torch.log_softmax(old_logits, dim=-1)
            old_log_probs = old_log_probs.gather(-1, responses.unsqueeze(-1)).squeeze(-1)  # [batch_size, seq_len]
            old_log_probs = old_log_probs.mean(dim=1)  # 平均每个token的log_prob

        # 2. 计算奖励(用奖励模型)
        with torch.no_grad():
            rewards = reward_model(responses)  # [batch_size]
        
        # 3. 计算优势(Advantage)
        # 这里简化计算:Advantage ≈ 归一化的Reward
        advantages = (rewards - rewards.mean()) / (rewards.std() + 1e-8)
        
        # 4. 计算新策略的概率
        new_logits = actor_model(responses)  # [batch_size, seq_len, vocab_size]
        new_log_probs = torch.log_softmax(new_logits, dim=-1)
        new_log_probs = new_log_probs.gather(-1, responses.unsqueeze(-1)).squeeze(-1)  # [batch_size, seq_len]
        new_log_probs = new_log_probs.mean(dim=1)  # 平均每个token的log_prob
        
        # 5. 计算策略比率(Policy Ratio)
        ratios = torch.exp(new_log_probs - old_log_probs)  # e^{log(π_new/π_old)}
        
        # 6. PPO 目标函数(Clipped Surrogate Objective)
        surr1 = ratios * advantages
        surr2 = torch.clamp(ratios, 1 - clip_epsilon, 1 + clip_epsilon) * advantages
        policy_loss = -torch.min(surr1, surr2).mean()  # 取min防止更新过大
        
        # 7. 计算熵正则化(鼓励探索)
        entropy = Categorical(logits=new_logits).entropy().mean()
        entropy_bonus = 0.01 * entropy  # 调节系数
        
        # 8. 总损失 = Policy Loss - 熵奖励
        loss = policy_loss - entropy_bonus
        
        # 9. 反向传播 & 优化
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        print(f"Epoch {epoch+1}, Loss: {loss.item():.4f}, Avg Reward: {rewards.mean().item():.4f}")

# 设置参数
vocab_size = 10000  # 词汇表大小
embedding_dim = 128
hidden_dim = 256

# 创建模型
actor_model = LanguageModel(vocab_size, embedding_dim, hidden_dim)
reward_model = RewardModel(vocab_size, embedding_dim, hidden_dim)

# 优化器
optimizer = optim.Adam(actor_model.parameters(), lr=1e-5)

# 创建示例输入数据
batch_size = 32
seq_len = 5
prompts = torch.randint(0, vocab_size, (batch_size, seq_len))

# 开始PPO训练
ppo_train(actor_model, reward_model, optimizer, prompts, num_epochs=10)

简单RLHF实战

下面实现了一个完整的RLHF系统,包含了完整的RLHF三阶段流程

  • 预训练阶段:训练基础语言模型
  • 奖励模型训练:基于人类偏好数据训练奖励模型
  • PPO强化学习:使用PPO算法优化策略

学习思路即可 目前效果确实一般

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
import numpy as np
import random
from collections import deque
import json
import os
from typing import List, Dict, Tuple, Optional
import logging

# 设置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# 设置设备
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
logger.info(f"使用设备: {device}")

class SimpleTokenizer:
    """简单的分词器"""
    def __init__(self, vocab_size=5000):
        self.vocab_size = vocab_size
        self.vocab = {"<pad>": 0, "<unk>": 1, "<start>": 2, "<end>": 3}
        self.reverse_vocab = {0: "<pad>", 1: "<unk>", 2: "<start>", 3: "<end>"}
        
    def build_vocab(self, texts):
        """构建词汇表"""
        word_freq = {}
        for text in texts:
            words = text.lower().split()
            for word in words:
                word_freq[word] = word_freq.get(word, 0) + 1
        
        # 按频率排序,取前vocab_size-4个词
        sorted_words = sorted(word_freq.items(), key=lambda x: x[1], reverse=True)
        for i, (word, _) in enumerate(sorted_words[:self.vocab_size-4]):
            idx = i + 4
            self.vocab[word] = idx
            self.reverse_vocab[idx] = word
        
        # 确保测试词在词汇表中
        test_words = ["今天", "天气", "人工", "智能", "学习", "健康", "工作"]
        for word in test_words:
            if word not in self.vocab:
                idx = len(self.vocab)
                self.vocab[word] = idx
                self.reverse_vocab[idx] = word
        
        logger.info(f"构建词汇表完成,词汇量: {len(self.vocab)}")
        logger.info(f"测试词是否在词汇表中: {[word in self.vocab for word in test_words]}")
    
    def encode(self, text, max_length=128):
        """编码文本"""
        words = text.lower().split()
        tokens = [self.vocab.get(word, 1) for word in words]  # 1是<unk>
        
        # 添加开始和结束标记
        tokens = [2] + tokens + [3]  # 2是<start>, 3是<end>
        
        # 填充或截断
        if len(tokens) < max_length:
            tokens += [0] * (max_length - len(tokens))  # 0是<pad>
        else:
            tokens = tokens[:max_length]
            tokens[-1] = 3  # 确保以<end>结尾
        
        return tokens
    
    def decode(self, tokens):
        """解码令牌"""
        words = []
        for token in tokens:
            if token in [0, 2]:  # 跳过<pad>和<start>
                continue
            if token == 3:  # <end>
                break
            words.append(self.reverse_vocab.get(token, "<unk>"))
        return " ".join(words)

class SimpleTransformer(nn.Module):
    """简化的Transformer模型"""
    def __init__(self, vocab_size, d_model=256, nhead=8, num_layers=4, max_length=128):
        super().__init__()
        self.d_model = d_model
        self.vocab_size = vocab_size
        self.max_length = max_length
        
        # 嵌入层
        self.embedding = nn.Embedding(vocab_size, d_model)
        self.pos_encoding = nn.Parameter(torch.randn(max_length, d_model))
        
        # Transformer层
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=d_model, nhead=nhead, dim_feedforward=d_model*4,
            dropout=0.1, batch_first=True
        )
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)
        
        # 输出头
        self.lm_head = nn.Linear(d_model, vocab_size)
        
        # 初始化权重
        self._init_weights()
        
    def _init_weights(self):
        """初始化权重"""
        for module in self.modules():
            if isinstance(module, nn.Linear):
                nn.init.normal_(module.weight, mean=0.0, std=0.02)
                if module.bias is not None:
                    nn.init.zeros_(module.bias)
            elif isinstance(module, nn.Embedding):
                nn.init.normal_(module.weight, mean=0.0, std=0.02)
        
    def forward(self, input_ids, attention_mask=None):
        seq_length = input_ids.size(1)
        
        # 嵌入和位置编码
        embeddings = self.embedding(input_ids)
        # 确保位置编码维度匹配
        pos_enc = self.pos_encoding[:seq_length].unsqueeze(0).expand(embeddings.size(0), -1, -1)
        embeddings = embeddings + pos_enc
        
        # 注意力掩码
        if attention_mask is None:
            attention_mask = (input_ids != 0).float()
        
        # Transformer
        mask = (attention_mask == 0)
        hidden_states = self.transformer(embeddings, src_key_padding_mask=mask)
        
        # 语言模型头
        logits = self.lm_head(hidden_states)
        
        return logits

class RewardModel(nn.Module):
    """奖励模型"""
    def __init__(self, vocab_size, d_model=256, nhead=8, num_layers=4, max_length=128):
        super().__init__()
        self.d_model = d_model
        
        # 嵌入层
        self.embedding = nn.Embedding(vocab_size, d_model)
        self.pos_encoding = nn.Parameter(torch.randn(max_length, d_model))
        
        # Transformer层
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=d_model, nhead=nhead, dim_feedforward=d_model*4,
            dropout=0.1, batch_first=True
        )
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)
        
        # 奖励头
        self.reward_head = nn.Sequential(
            nn.Linear(d_model, d_model // 2),
            nn.ReLU(),
            nn.Linear(d_model // 2, 1)
        )
        
    def forward(self, input_ids, attention_mask=None):
        seq_length = input_ids.size(1)
        
        # 嵌入和位置编码
        embeddings = self.embedding(input_ids)
        # 确保位置编码维度匹配
        pos_enc = self.pos_encoding[:seq_length].unsqueeze(0).expand(embeddings.size(0), -1, -1)
        embeddings = embeddings + pos_enc
        
        # 注意力掩码
        if attention_mask is None:
            attention_mask = (input_ids != 0).float()
        
        # Transformer
        mask = (attention_mask == 0)
        hidden_states = self.transformer(embeddings, src_key_padding_mask=mask)
        
        # 使用最后一个非填充位置的隐藏状态
        batch_size = input_ids.size(0)
        # 计算每个序列的实际长度(非padding token的数量)
        seq_lengths = attention_mask.sum(dim=1).long()
        # 避免索引为0的情况,至少为1
        last_positions = torch.clamp(seq_lengths - 1, min=0)
        
        # 使用torch.arange确保索引类型正确
        batch_indices = torch.arange(batch_size, device=input_ids.device)
        last_hidden = hidden_states[batch_indices, last_positions]
        
        # 奖励分数
        reward = self.reward_head(last_hidden).squeeze(-1)
        
        return reward

class PreferenceDataset(Dataset):
    """偏好数据集"""
    def __init__(self, preferences, tokenizer, max_length=128):
        self.preferences = preferences
        self.tokenizer = tokenizer
        self.max_length = max_length
    
    def __len__(self):
        return len(self.preferences)
    
    def __getitem__(self, idx):
        pref = self.preferences[idx]
        prompt = pref["prompt"]
        chosen = pref["chosen"]
        rejected = pref["rejected"]
        
        # 编码
        prompt_tokens = self.tokenizer.encode(prompt, self.max_length)
        chosen_tokens = self.tokenizer.encode(prompt + " " + chosen, self.max_length)
        rejected_tokens = self.tokenizer.encode(prompt + " " + rejected, self.max_length)
        
        return {
            "prompt": torch.tensor(prompt_tokens, dtype=torch.long),
            "chosen": torch.tensor(chosen_tokens, dtype=torch.long),
            "rejected": torch.tensor(rejected_tokens, dtype=torch.long)
        }

class PPOTrainer:
    """PPO训练器"""
    def __init__(self, policy_model, reward_model, tokenizer, lr=1e-4):
        self.policy_model = policy_model
        self.reward_model = reward_model
        self.tokenizer = tokenizer
        
        # 创建参考模型(冻结的策略模型副本)
        self.ref_model = SimpleTransformer(
            vocab_size=policy_model.vocab_size,
            d_model=policy_model.d_model,
            max_length=policy_model.max_length
        ).to(device)
        self.ref_model.load_state_dict(policy_model.state_dict())
        self.ref_model.eval()
        
        # 优化器
        self.optimizer = optim.Adam(policy_model.parameters(), lr=lr)
        
        # PPO参数
        self.clip_ratio = 0.2
        self.kl_coeff = 0.02
        self.value_coeff = 0.1
        self.entropy_coeff = 0.01
        
    def generate_response(self, prompt_tokens, max_new_tokens=50, temperature=0.8):
        """生成响应"""
        self.policy_model.eval()
        with torch.no_grad():
            input_ids = prompt_tokens.clone()
            
            for _ in range(max_new_tokens):
                logits = self.policy_model(input_ids)
                next_token_logits = logits[:, -1, :] / temperature
                probs = F.softmax(next_token_logits, dim=-1)
                next_token = torch.multinomial(probs, 1)
                
                input_ids = torch.cat([input_ids, next_token], dim=-1)
                
                # 检查是否所有序列都生成了结束标记
                if (next_token == 3).all():  # <end>
                    break
                    
                # 防止序列过长
                if input_ids.size(1) >= self.policy_model.max_length:
                    break
            
            return input_ids
    
    def compute_rewards(self, sequences):
        """计算奖励"""
        self.reward_model.eval()
        with torch.no_grad():
            rewards = self.reward_model(sequences)
        return rewards
    
    def compute_log_probs(self, model, sequences, actions):
        """计算动作的对数概率"""
        # 确保序列长度足够
        if sequences.size(1) <= 1:
            return torch.zeros(actions.size(), device=actions.device)
            
        logits = model(sequences[:, :-1])  # 不包括最后一个token
        log_probs = F.log_softmax(logits, dim=-1)
        
        # 获取动作的对数概率
        action_log_probs = log_probs.gather(2, actions.unsqueeze(-1)).squeeze(-1)
        
        return action_log_probs
    
    def ppo_step(self, prompts, responses, rewards, old_log_probs):
        """PPO更新步骤"""
        self.policy_model.train()
        
        # 检查响应是否为空
        if responses.size(1) == 0:
            return {
                "policy_loss": 0.0,
                "kl_penalty": 0.0,
                "entropy": 0.0,
                "total_loss": 0.0
            }
        
        # 计算新的对数概率
        full_sequences = torch.cat([prompts, responses], dim=1)
        new_log_probs = self.compute_log_probs(self.policy_model, full_sequences, responses)
        
        # 计算参考模型的对数概率(用于KL散度)
        with torch.no_grad():
            ref_log_probs = self.compute_log_probs(self.ref_model, full_sequences, responses)
        
        # 计算比率
        ratio = torch.exp(new_log_probs - old_log_probs)
        
        # PPO裁剪损失
        advantages = rewards.unsqueeze(-1) - rewards.mean()
        clipped_ratio = torch.clamp(ratio, 1 - self.clip_ratio, 1 + self.clip_ratio)
        policy_loss = -torch.min(ratio * advantages, clipped_ratio * advantages).mean()
        
        # KL散度惩罚
        kl_penalty = self.kl_coeff * (new_log_probs - ref_log_probs).mean()
        
        # 熵奖励
        entropy = -(torch.exp(new_log_probs) * new_log_probs).sum(-1).mean()
        entropy_bonus = self.entropy_coeff * entropy
        
        # 总损失
        total_loss = policy_loss + kl_penalty - entropy_bonus
        
        # 反向传播
        self.optimizer.zero_grad()
        total_loss.backward()
        torch.nn.utils.clip_grad_norm_(self.policy_model.parameters(), 1.0)
        self.optimizer.step()
        
        return {
            "policy_loss": policy_loss.item(),
            "kl_penalty": kl_penalty.item(),
            "entropy": entropy.item(),
            "total_loss": total_loss.item()
        }

class RLHFTrainer:
    """完整的RLHF训练器"""
    def __init__(self, vocab_size=5000, d_model=256, max_length=128):
        self.vocab_size = vocab_size
        self.d_model = d_model
        self.max_length = max_length
        
        # 初始化组件
        self.tokenizer = SimpleTokenizer(vocab_size)
        self.policy_model = SimpleTransformer(vocab_size, d_model, max_length=max_length).to(device)
        self.reward_model = RewardModel(vocab_size, d_model, max_length=max_length).to(device)
        
        logger.info("RLHF训练器初始化完成")
    
    def prepare_data(self):
        """准备示例数据 - 增加更多样化的数据"""
        # 预训练数据 - 增加数据量和多样性
        pretrain_texts = [
            "今天天气很好,阳光明媚,适合出门散步",
            "我喜欢学习新知识,这让我感到充实和快乐",
            "人工智能很有趣,它正在改变我们的生活",
            "编程是一门艺术,需要逻辑思维和创造力",
            "音乐让人放松,能够舒缓心情和压力",
            "读书使人进步,开拓视野增长见识",
            "运动有益健康,保持身体活力和精神状态",
            "美食令人愉悦,带来味觉和心灵的享受",
            "旅行开阔视野,体验不同文化和风景",
            "友谊珍贵无比,真挚的友情值得珍惜",
            "工作需要专注,认真负责才能做好",
            "家庭很重要,亲情是最温暖的港湾",
            "创新推动发展,新思路带来新机遇",
            "学习永无止境,持续进步才能成长",
            "沟通很关键,理解彼此才能合作",
            "时间很宝贵,珍惜当下把握机会",
            "健康最重要,身体是革命的本钱",
            "梦想值得追求,坚持努力终会实现",
            "思考很重要,深度思考带来智慧",
            "合作共赢好,团结协作力量大"
        ] * 20  # 增加重复次数
        
        # 偏好数据 - 增加数据量
        preferences = [
            {
                "prompt": "今天天气",
                "chosen": "今天天气很好,阳光明媚,适合出门游玩和散步",
                "rejected": "今天天气不好,下雨了"
            },
            {
                "prompt": "学习的意义",
                "chosen": "学习能够丰富知识,提升能力,让人变得更加智慧",
                "rejected": "学习很累很困难"
            },
            {
                "prompt": "人工智能",
                "chosen": "人工智能是未来科技发展的重要方向,将改变生活",
                "rejected": "人工智能很难懂很复杂"
            },
            {
                "prompt": "工作态度",
                "chosen": "认真负责的工作态度是成功的关键因素",
                "rejected": "工作很辛苦很累"
            },
            {
                "prompt": "健康生活",
                "chosen": "健康的生活方式包括运动、营养和良好作息",
                "rejected": "健康生活很难坚持"
            },
            {
                "prompt": "友谊价值",
                "chosen": "真挚的友谊珍贵无比,朋友间相互支持很重要",
                "rejected": "朋友关系很复杂"
            }
        ] * 10  # 增加重复次数
        
        return pretrain_texts, preferences
    
    def pretrain(self, texts, epochs=20, batch_size=8, lr=1e-3):
        """预训练阶段 - 增加训练轮数"""
        logger.info("开始预训练...")
        
        # 构建词汇表
        self.tokenizer.build_vocab(texts)
        
        # 准备数据 - 使用与模型一致的最大长度
        encoded_texts = [self.tokenizer.encode(text, self.max_length) for text in texts]
        dataset = torch.utils.data.TensorDataset(torch.tensor(encoded_texts, dtype=torch.long))
        dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)
        
        # 优化器 - 调整学习率
        optimizer = optim.Adam(self.policy_model.parameters(), lr=lr, weight_decay=1e-5)
        criterion = nn.CrossEntropyLoss(ignore_index=0)  # 忽略padding
        
        # 学习率调度器
        scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=epochs)
        
        self.policy_model.train()
        for epoch in range(epochs):
            total_loss = 0
            num_batches = 0
            for batch in dataloader:
                input_ids = batch[0].to(device)
                
                # 前向传播
                logits = self.policy_model(input_ids)
                
                # 计算损失(预测下一个token)
                shift_logits = logits[:, :-1, :].contiguous()
                shift_labels = input_ids[:, 1:].contiguous()
                
                loss = criterion(shift_logits.view(-1, self.vocab_size), shift_labels.view(-1))
                
                # 反向传播
                optimizer.zero_grad()
                loss.backward()
                torch.nn.utils.clip_grad_norm_(self.policy_model.parameters(), 1.0)
                optimizer.step()
                
                total_loss += loss.item()
                num_batches += 1
            
            scheduler.step()
            avg_loss = total_loss / num_batches
            logger.info(f"预训练 Epoch {epoch+1}/{epochs}, Loss: {avg_loss:.4f}, LR: {scheduler.get_last_lr()[0]:.6f}")
        
        logger.info("预训练完成")
    
    def train_reward_model(self, preferences, epochs=30, batch_size=4, lr=1e-4):
        """训练奖励模型 - 增加训练轮数"""
        logger.info("开始训练奖励模型...")
        
        dataset = PreferenceDataset(preferences, self.tokenizer, self.max_length)
        dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)
        
        optimizer = optim.Adam(self.reward_model.parameters(), lr=lr, weight_decay=1e-5)
        
        self.reward_model.train()
        for epoch in range(epochs):
            total_loss = 0
            correct = 0
            total = 0
            
            for batch in dataloader:
                chosen_ids = batch["chosen"].to(device)
                rejected_ids = batch["rejected"].to(device)
                
                # 计算奖励分数
                chosen_rewards = self.reward_model(chosen_ids)
                rejected_rewards = self.reward_model(rejected_ids)
                
                # 排序损失(chosen应该比rejected得分更高)
                loss = -torch.log(torch.sigmoid(chosen_rewards - rejected_rewards)).mean()
                
                # 反向传播
                optimizer.zero_grad()
                loss.backward()
                torch.nn.utils.clip_grad_norm_(self.reward_model.parameters(), 1.0)
                optimizer.step()
                
                total_loss += loss.item()
                
                # 计算准确率
                correct += (chosen_rewards > rejected_rewards).sum().item()
                total += chosen_rewards.size(0)
            
            avg_loss = total_loss / len(dataloader)
            accuracy = correct / total
            logger.info(f"奖励模型 Epoch {epoch+1}/{epochs}, Loss: {avg_loss:.4f}, Acc: {accuracy:.4f}")
        
        logger.info("奖励模型训练完成")
    
    def rl_training(self, prompts, epochs=15, batch_size=2):
        """强化学习训练阶段 - 增加训练轮数"""
        logger.info("开始PPO强化学习训练...")
        
        ppo_trainer = PPOTrainer(self.policy_model, self.reward_model, self.tokenizer, lr=5e-5)
        
        # 编码提示
        encoded_prompts = [self.tokenizer.encode(prompt, self.max_length//2) for prompt in prompts]
        prompt_dataset = torch.utils.data.TensorDataset(torch.tensor(encoded_prompts, dtype=torch.long))
        prompt_dataloader = DataLoader(prompt_dataset, batch_size=batch_size, shuffle=True)
        
        for epoch in range(epochs):
            epoch_stats = {"policy_loss": 0, "kl_penalty": 0, "entropy": 0, "total_loss": 0}
            num_batches = 0
            
            for batch in prompt_dataloader:
                prompt_tokens = batch[0].to(device)
                
                # 生成响应
                with torch.no_grad():
                    full_sequences = ppo_trainer.generate_response(prompt_tokens, max_new_tokens=20)
                    
                    # 确保响应序列不为空
                    if full_sequences.size(1) <= prompt_tokens.size(1):
                        continue
                        
                    responses = full_sequences[:, prompt_tokens.size(1):]
                    
                    # 计算奖励
                    rewards = ppo_trainer.compute_rewards(full_sequences)
                    
                    # 计算旧的对数概率
                    old_log_probs = ppo_trainer.compute_log_probs(ppo_trainer.policy_model, full_sequences, responses)
                
                # PPO更新
                stats = ppo_trainer.ppo_step(prompt_tokens, responses, rewards, old_log_probs)
                
                for key in epoch_stats:
                    epoch_stats[key] += stats[key]
                num_batches += 1
            
            # 记录平均统计
            if num_batches > 0:
                for key in epoch_stats:
                    epoch_stats[key] /= num_batches
                
                logger.info(f"PPO Epoch {epoch+1}/{epochs}: {epoch_stats}")
            else:
                logger.info(f"PPO Epoch {epoch+1}/{epochs}: 没有有效的批次")
        
        logger.info("PPO训练完成")
    
    def generate_text(self, prompt, max_length=30, temperature=0.7, top_k=50):
        """生成文本 - 改进解码策略"""
        self.policy_model.eval()
        
        # 确保提示词在词汇表中
        prompt_tokens = torch.tensor([self.tokenizer.encode(prompt, self.max_length//2)], dtype=torch.long).to(device)
        
        with torch.no_grad():
            input_ids = prompt_tokens.clone()
            generated_tokens = []
            
            for _ in range(max_length):
                # 获取模型预测
                logits = self.policy_model(input_ids)
                next_token_logits = logits[:, -1, :] / temperature
                
                # 应用top-k过滤
                if top_k > 0:
                    indices_to_remove = next_token_logits < torch.topk(next_token_logits, top_k)[0][..., -1, None]
                    next_token_logits[indices_to_remove] = -float('Inf')
                
                # 计算概率分布
                probs = F.softmax(next_token_logits, dim=-1)
                
                # 采样下一个token
                next_token = torch.multinomial(probs, 1)
                
                # 如果生成了结束标记,停止生成
                if next_token.item() == 3:  # <end>
                    break
                
                # 如果生成了未知标记,尝试重新采样
                if next_token.item() == 1:  # <unk>
                    continue
                
                # 添加新生成的token
                input_ids = torch.cat([input_ids, next_token], dim=-1)
                generated_tokens.append(next_token.item())
                
                # 检查序列长度
                if input_ids.size(1) >= self.max_length:
                    break
            
            # 如果没有生成任何token,返回原始提示
            if not generated_tokens:
                return prompt
            
            # 解码生成的文本
            generated_text = self.tokenizer.decode(generated_tokens)
            
            # 如果生成了空文本,返回原始提示
            if not generated_text.strip():
                return prompt
            
            return prompt + " " + generated_text
    
    def save_models(self, save_dir="./rlhf_models"):
        """保存模型"""
        os.makedirs(save_dir, exist_ok=True)
        
        torch.save(self.policy_model.state_dict(), os.path.join(save_dir, "policy_model.pt"))
        torch.save(self.reward_model.state_dict(), os.path.join(save_dir, "reward_model.pt"))
        
        # 保存分词器
        with open(os.path.join(save_dir, "tokenizer.json"), "w", encoding="utf-8") as f:
            json.dump({
                "vocab": self.tokenizer.vocab,
                "reverse_vocab": self.tokenizer.reverse_vocab
            }, f, ensure_ascii=False, indent=2)
        
        logger.info(f"模型已保存到 {save_dir}")
    
    def load_models(self, save_dir="./rlhf_models"):
        """加载模型"""
        self.policy_model.load_state_dict(torch.load(os.path.join(save_dir, "policy_model.pt"), map_location=device))
        self.reward_model.load_state_dict(torch.load(os.path.join(save_dir, "reward_model.pt"), map_location=device))
        
        # 加载分词器
        with open(os.path.join(save_dir, "tokenizer.json"), "r", encoding="utf-8") as f:
            tokenizer_data = json.load(f)
            self.tokenizer.vocab = tokenizer_data["vocab"]
            self.tokenizer.reverse_vocab = {int(k): v for k, v in tokenizer_data["reverse_vocab"].items()}
        
        logger.info(f"模型已从 {save_dir} 加载")

def main():
    """主函数 - 完整的RLHF训练流程"""
    logger.info("开始完整的RLHF训练流程")
    
    # 初始化训练器 - 调整参数
    trainer = RLHFTrainer(vocab_size=2000, d_model=128, max_length=64)
    
    # 准备数据
    pretrain_texts, preferences = trainer.prepare_data()
    
    # 阶段1:预训练
    trainer.pretrain(pretrain_texts, epochs=15, batch_size=4)
    
    # 阶段2:训练奖励模型
    trainer.train_reward_model(preferences, epochs=20, batch_size=2)
    
    # 阶段3:PPO强化学习
    prompts = ["今天天气", "学习的意义", "人工智能", "编程语言", "音乐的魅力", "健康生活", "工作态度"]
    trainer.rl_training(prompts, epochs=10, batch_size=2)
    
    # 测试生成
    logger.info("\n=== 生成测试 ===")
    test_prompts = ["今天天气", "人工智能", "学习", "健康", "工作"]
    
    for prompt in test_prompts:
        generated = trainer.generate_text(prompt, max_length=20)
        logger.info(f"提示: {prompt}")
        logger.info(f"生成: {generated}")
        logger.info("-" * 50)
    
    # 保存模型
    trainer.save_models()
    
    logger.info("RLHF训练流程完成!")

if __name__ == "__main__":
    main()

网站公告

今日签到

点亮在社区的每一天
去签到