Reproducing nanoGPT — model.py

Published: 2025-07-03

Since the code is fairly long, the best approach is to implement it module by module:

1. LayerNorm

import math
import inspect
from dataclasses import dataclass
import torch
import torch.nn as nn
import torch.nn.functional as F

class LayerNorm(nn.Module):
    def __init__(self, ndim, bias=None):
        super().__init__()
        self.weight = nn.Parameter(torch.ones(ndim))
        self.bias = nn.Parameter(torch.zeros(ndim)) if bias else None
    def forward(self, input):
        return F.layer_norm(input, self.weight.shape, self.weight, self.bias, 1e-5)

Test:
x = torch.randn(4, 2, 8)
ndim = 8

my_layernorm = LayerNorm(ndim, bias=True)
my_output = my_layernorm(x)

layernorm = nn.LayerNorm(ndim)
layernorm.weight.data = my_layernorm.weight.clone()
if my_layernorm.bias is not None:
    layernorm.bias.data = my_layernorm.bias.clone()

official_output = layernorm(x)
print("差异是否很小:", torch.allclose(my_output, official_output, atol=1e-6))

2. Attention

class CausalSelfAttention(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.embd = config.n_embd
        self.n_head = config.n_head
        assert self.embd % self.n_head == 0
        self.dropout = config.dropout

        # note: don't store config.bias as self.bias here, or it will clash with
        # the "bias" buffer (the causal mask) registered below
        self.c_attn = nn.Linear(self.embd, 3 * self.embd, bias=config.bias)
        self.c_proj = nn.Linear(self.embd, self.embd, bias=config.bias)

        self.resid_dropout = nn.Dropout(self.dropout)
        self.attn_dropout = nn.Dropout(self.dropout)

        self.flash = hasattr(torch.nn.functional, 'scaled_dot_product_attention')
        if not self.flash:
            print("Warning! Current torch doesn't have scaled_dot_product_attention")
            self.register_buffer("bias", torch.tril(torch.ones(config.block_size, config.block_size))
                                 .view(1, 1, config.block_size, config.block_size))

    def forward(self, x):
        B, T, C = x.size()
        qkv = self.c_attn(x)
        q, k, v = torch.split(qkv, self.embd, dim=2)
        q = q.view(B, T, self.n_head, C // self.n_head).transpose(1, 2)
        k = k.view(B, T, self.n_head, C // self.n_head).transpose(1, 2)
        v = v.view(B, T, self.n_head, C // self.n_head).transpose(1, 2)
        if self.flash:
            y = F.scaled_dot_product_attention(q, k, v, attn_mask=None, is_causal=True, dropout_p=self.dropout if self.training else 0)
        else:
            att = (q @ k.transpose(-2, -1)) * (1.0 / math.sqrt(k.size(-1)))
            att = att.masked_fill(self.bias[:, :, :T, :T] == 0, float('-inf'))
            att = F.softmax(att, dim=-1)
            att = self.attn_dropout(att)
            y = att @ v
        y = y.transpose(1, 2).contiguous().view(B, T, C)
        y = self.resid_dropout(self.c_proj(y))
        return y
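
Test (a quick sanity check in the same spirit as the LayerNorm test above; TestConfig is just a throwaway stand-in for the real config defined later):

@dataclass
class TestConfig:
    n_embd: int = 64
    n_head: int = 4
    dropout: float = 0.0
    bias: bool = True
    block_size: int = 16

attn_layer = CausalSelfAttention(TestConfig())
x = torch.randn(2, 16, 64)                       # (B, T, C)
y = attn_layer(x)
print("output shape:", y.shape)                  # torch.Size([2, 16, 64])

# causality check: changing only the last token must not affect earlier positions
x2 = x.clone()
x2[:, -1, :] = 0.0
y2 = attn_layer(x2)
print("causal:", torch.allclose(y[:, :-1], y2[:, :-1], atol=1e-6))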

3. MLP, Block, and GPTConfig

class MLP(nn.Module):
    def __init__(self, config):
        super().__init__()
        # name the layers c_fc / c_proj so they line up with the HF GPT-2
        # checkpoint keys and with the special c_proj init in GPT.__init__
        self.c_fc = nn.Linear(config.n_embd, 4 * config.n_embd, bias=config.bias)
        self.gelu = nn.GELU()
        self.c_proj = nn.Linear(4 * config.n_embd, config.n_embd, bias=config.bias)
        self.dropout = nn.Dropout(config.dropout)
    def forward(self, x):
        x = self.c_fc(x)
        x = self.gelu(x)
        x = self.c_proj(x)
        x = self.dropout(x)
        return x

class Block(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.ln_1 = LayerNorm(config.n_embd, config.bias)
        self.attn = CausalSelfAttention(config)
        self.ln_2 = LayerNorm(config.n_embd, config.bias)
        self.mlp = MLP(config)

    def forward(self, x):
        x = x + self.attn(self.ln_1(x))
        x = x + self.mlp(self.ln_2(x))
        return x

@dataclass
class GPTConfig:
    block_size: int = 1024
    vocab_size: int = 50257   # GPT-2 BPE vocabulary
    n_layer: int = 12
    n_head: int = 12
    n_embd: int = 768
    dropout: float = 0.0
    bias: bool = True
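
With all three pieces in place, a minimal smoke test of one Block using the default GPTConfig (shapes only) might look like this:

cfg = GPTConfig()
block = Block(cfg)
x = torch.randn(2, 10, cfg.n_embd)    # (B, T, C)
print(block(x).shape)                  # torch.Size([2, 10, 768])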

Common mistakes when reproducing the first four parts:

(1) Using x.shape() in CausalSelfAttention.forward
B, T, C = x.shape()

should be:

B, T, C = x.shape

Reason: shape is an attribute, not a method (x.size(), with parentheses, also works).


(2) When using scaled_dot_product_attention, the dropout_p argument must check whether the model is in training or eval mode; no dropout should be applied at eval time:
y = F.scaled_dot_product_attention(q, k, v, dropout_p=self.dropout if self.training else 0, is_causal=True)

⚠️ Explanation: the other dropouts don't need this check because they go through nn.Dropout(), which looks at the module's training flag on its own and decides whether to drop anything (see the short sketch after this list).


(3) Writing the causal mask with a list; torch.tril expects a tensor
self.register_buffer("bias", torch.tril([config.block_size, config.block_size])...)

should be:

self.register_buffer("bias", torch.tril(torch.ones(config.block_size, config.block_size))
                                 .view(1, 1, config.block_size, config.block_size))

torch.tril() takes a Tensor, not a list.


(4) Don't forget super().__init__() when defining each class
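
The sketch promised in (2): nn.Dropout switches itself off in eval mode, which is why only the functional scaled_dot_product_attention call needs an explicit self.training check.

drop = nn.Dropout(0.5)
x = torch.ones(4)

drop.train()
print(drop(x))    # roughly half the entries are zeroed, the survivors scaled to 2.0
drop.eval()
print(drop(x))    # identity: tensor([1., 1., 1., 1.])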

4. GPT

Let's start with the GPT model's three most important methods: __init__, forward, and generate.

(1) __init__()

class GPT(nn.Module):
    def __init__(self, config):
        super().__init__()
        assert config.vocab_size is not None
        assert config.block_size is not None
        self.config = config

        self.transformer = nn.ModuleDict(dict(
            wte = nn.Embedding(config.vocab_size, config.n_embd),
            wpe = nn.Embedding(config.block_size, config.n_embd),
            h = nn.ModuleList([Block(config) for _ in range(config.n_layer)]),
            drop = nn.Dropout(config.dropout),
            ln_f = LayerNorm(config.n_embd, bias=config.bias)
        ))

        # no bias on the output head (matches GPT-2 and keeps from_pretrained key-compatible)
        self.lm_head = nn.Linear(config.n_embd, config.vocab_size, bias=False)
        self.transformer.wte.weight = self.lm_head.weight  # weight tying
        # initialize weights; residual projections get an extra scaled-down std
        self.apply(self.init_weight_)
        for pn, p in self.named_parameters():
            if pn.endswith('c_proj.weight'):
                torch.nn.init.normal_(p, mean=0.0, std=0.02 / math.sqrt(2 * config.n_layer))
        print("num of parameters: %.2fM" % (self.get_num_params()/1e6,))

About weight tying: the recommendation is to have lm_head's weight overwrite wte's weight (self.transformer.wte.weight = self.lm_head.weight); the direction of the assignment decides which of the two freshly created tensors survives as the single shared Parameter.
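
A one-line check that the tying worked (a sketch with a deliberately tiny config so the model builds instantly; it assumes the full GPT class, since init_weight_ and get_num_params are defined further down in this post):

tiny = GPTConfig(n_layer=2, n_head=2, n_embd=64, block_size=32, vocab_size=100)
model = GPT(tiny)
# both modules point at the exact same storage after tying
print(model.lm_head.weight.data_ptr() == model.transformer.wte.weight.data_ptr())   # True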

(2) forward

    def forward(self, x, target=None):
        device = x.device
        b, t = x.size()
        assert t <= self.config.block_size
        pos = torch.arange(0, t, dtype=torch.long, device=device)

        token_emb = self.transformer.wte(x)
        pos_emb = self.transformer.wpe(pos)
        x = self.transformer.drop(token_emb + pos_emb)
        for block in self.transformer.h:
            x = block(x)
        x = self.transformer.ln_f(x)

        if target is not None:
            logits = self.lm_head(x)
            loss = F.cross_entropy(logits.view(-1, logits.size(-1)), target.view(-1), ignore_index=-1)
            # view(-1, ...): flatten every dimension except the last (vocab) one
        else:
            logits = self.lm_head(x[:, [-1], :])
            loss = None
        return logits, loss

Checking whether target is None is effectively checking whether we are training or doing inference. In the inference branch, logits = self.lm_head(x[:, [-1], :]) keeps the time dimension (note the [-1] in brackets) so the logits have the same number of dimensions as in the branch where target is not None.
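
Shape check for the two branches (a sketch with the tiny config from above; note the inputs are long token indices, not floats):

tiny = GPTConfig(n_layer=2, n_head=2, n_embd=64, block_size=32, vocab_size=100)
model = GPT(tiny)
idx = torch.randint(0, tiny.vocab_size, (2, 8))     # (B, T), dtype long

logits, loss = model(idx, target=idx)               # training-style call
print(logits.shape, round(loss.item(), 2))          # torch.Size([2, 8, 100]), loss ≈ ln(100) ≈ 4.6 at init

logits, loss = model(idx)                           # inference-style call
print(logits.shape, loss)                           # torch.Size([2, 1, 100]) None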

(3) generate

    @torch.no_grad()   # sampling never needs gradients
    def generate(self, idx, max_new_tokens, temperature=1.0, top_k=None):
        for _ in range(max_new_tokens):
            idx_cond = idx if idx.size(1) <= self.config.block_size else idx[:, -self.config.block_size:]
            logits, _ = self(idx_cond)
            logits = logits[:, -1, :]
            logits = logits / temperature

            if top_k is not None:
                v, _ = torch.topk(logits, min(top_k, logits.size(-1)))
                logits[logits < v[:, [-1]]] = -float('inf')
            probs = F.softmax(logits, dim=-1)
            idx_next = torch.multinomial(probs, num_samples=1)
            idx = torch.cat((idx, idx_next), dim=1)
        return idx
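
Usage sketch (again with a tiny config; in practice idx would be an encoded prompt rather than a dummy token):

tiny = GPTConfig(n_layer=2, n_head=2, n_embd=64, block_size=32, vocab_size=100)
model = GPT(tiny).eval()
prompt = torch.zeros((1, 1), dtype=torch.long)      # a single dummy start token
out = model.generate(prompt, max_new_tokens=20, temperature=0.8, top_k=10)
print(out.shape)                                     # torch.Size([1, 21])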

(4) crop_block_size, configure_optimizers, init_weight_, get_num_params, from_pretrained, estimate_mfu

The remaining methods exist because train.py and sample.py need them: crop_block_size handles the case where the block_size you want to use is smaller than the one in config, from_pretrained builds a model from a previously trained (Hugging Face GPT-2) checkpoint, configure_optimizers creates the optimizer, and estimate_mfu is used in benchmarking to evaluate the model.

    def init_weight_(self, module):
        if isinstance(module, nn.Linear):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)
            if module.bias is not None:
                torch.nn.init.zeros_(module.bias)
        elif isinstance(module, nn.Embedding):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)
    def get_num_params(self, non_embedding=True):
        n_params = sum(p.numel() for p in self.parameters())
        if non_embedding:
            n_params -= self.transformer.wpe.weight.numel()
        # wte is not subtracted here because its weight is shared with lm_head
        return n_params

    def crop_block_size(self, block_size):
        assert block_size <= self.config.block_size
        self.config.block_size = block_size
        self.transformer.wpe.weight=nn.Parameter(self.transformer.wpe.weight[:block_size])
        for block in self.transformer.h:
            if hasattr(block.attn, 'bias'):
                block.attn.bias = block.attn.bias[:, :, :block_size, :block_size]


    @classmethod
    def from_pretrained(cls, model_type, override_arg=None):
        assert model_type in {'gpt2', 'gpt2-medium', 'gpt2-large', 'gpt2-xl'}
        override_arg = override_arg or {}
        assert all(k == 'dropout' for k in override_arg)
        from transformers import GPT2LMHeadModel
        print("Loading weights from pretrained model %s" % model_type)
        # model config
        config_args = {
            'gpt2':        dict(n_layer=12, n_head=12, n_embd=768),
            'gpt2-medium': dict(n_layer=24, n_head=16, n_embd=1024),
            'gpt2-large':  dict(n_layer=36, n_head=20, n_embd=1280),
            'gpt2-xl':     dict(n_layer=48, n_head=25, n_embd=1600),
        }[model_type]
        print("forcing bias=True, block_size=1024, vocab_size=50257")
        config_args['bias'] = True
        config_args['block_size'] = 1024
        config_args['vocab_size'] = 50257
        if 'dropout' in override_arg:
            print(f"overriding dropout rate to {override_arg['dropout']}")
            config_args['dropout'] = override_arg['dropout']
        # build the model
        config = GPTConfig(**config_args)
        model = cls(config)

        # copy over the weights
        sd = model.state_dict()
        sd_keys = sd.keys()
        sd_keys = [k for k in sd_keys if not k.endswith('.attn.bias')]

        model_hf = GPT2LMHeadModel.from_pretrained(model_type)
        sd_hf = model_hf.state_dict()
        sd_hf_keys = sd_hf.keys()
        sd_hf_keys = [k for k in sd_hf_keys if not k.endswith('.attn.bias')]
        sd_hf_keys = [k for k in sd_hf_keys if not k.endswith('.attn.masked_bias')]
        # the OpenAI checkpoints use Conv1D, so these weights need a transpose
        transposed = ['attn.c_attn.weight', 'attn.c_proj.weight', 'mlp.c_fc.weight', 'mlp.c_proj.weight']

        assert len(sd_hf_keys) == len(sd_keys)
        for k in sd_hf_keys:
            if any(k.endswith(w) for w in transposed):
                assert sd[k].shape[::-1] == sd_hf[k].shape
                with torch.no_grad():
                    sd[k].copy_(sd_hf[k].t())
            else:
                assert sd[k].shape == sd_hf[k].shape
                with torch.no_grad():
                    sd[k].copy_(sd_hf[k])
        return model

    def configure_optimizers(self, weight_decay, learning_rate, betas, device_type):
        # all trainable parameters; 2D+ tensors get weight decay, the rest (biases, LayerNorm) don't
        params = {pn: p for pn, p in self.named_parameters()}
        params = {pn: p for pn, p in params.items() if p.requires_grad}
        params_decay = [p for pn, p in params.items() if p.dim() >= 2]
        params_nodecay = [p for pn, p in params.items() if p.dim() < 2]

        optim_groups = [
            {'params': params_decay, 'weight_decay': weight_decay},
            {'params': params_nodecay, 'weight_decay': 0.0}
        ]
        num_decay_params = sum(p.numel() for p in params_decay)
        num_nodecay_params = sum(p.numel() for p in params_nodecay)
        print(f"num decayed parameter tensors: {len(params_decay)}, with {num_decay_params} parameters")
        print(f"num non-decayed parameter tensors: {len(params_nodecay)}, with {num_nodecay_params} parameters")

        # use the fused AdamW kernel when it is available and we are on CUDA
        fused_available = 'fused' in inspect.signature(torch.optim.AdamW).parameters
        use_fused = device_type == 'cuda' and fused_available
        extra_arg = dict(fused=True) if use_fused else dict()
        optimizer = torch.optim.AdamW(optim_groups, lr=learning_rate, betas=betas, **extra_arg)
        return optimizer
    
    def estimate_mfu(self, fwdbwd_per_iter, dt):
        # estimate model FLOPs utilization (MFU) against an A100's bfloat16 peak
        N = self.get_num_params()
        cfg = self.config
        L, H, Q, T = cfg.n_layer, cfg.n_head, cfg.n_embd // cfg.n_head, cfg.block_size
        flops_per_token = 6 * N + 12 * L * H * Q * T
        flops_per_fwdbwd = flops_per_token * T
        flops_per_iter = flops_per_fwdbwd * fwdbwd_per_iter
        flops_achieved = flops_per_iter * (1.0 / dt)   # FLOPS actually sustained per second
        flops_promised = 312e12                        # A100 bfloat16 peak: 312 TFLOPS
        mfu = flops_achieved / flops_promised
        return mfu
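
How the remaining utilities fit together (a sketch; the optimizer hyperparameters below are the usual nanoGPT-style training defaults rather than values fixed anywhere in this post, and from_pretrained needs the transformers package plus a network connection):

model = GPT.from_pretrained('gpt2')                  # the 124M-parameter GPT-2
model.crop_block_size(512)                           # shrink the context window for shorter training sequences

optimizer = model.configure_optimizers(
    weight_decay=0.1, learning_rate=6e-4,
    betas=(0.9, 0.95), device_type='cpu')

# suppose one iteration does 4 forward/backward passes and takes dt seconds
mfu = model.estimate_mfu(fwdbwd_per_iter=4, dt=1.0)
print(f"estimated MFU: {mfu * 100:.2f}%")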

Possible mistakes in the GPT module:

(1) The order of the weight-tying assignment

(2) When initializing weights, don't forget that c_proj.weight gets its std scaled down further, to 0.02 / sqrt(2 * n_layer)

(3) In init_weight_, if the module is a Linear, check whether it actually has a bias before zeroing it

(4) get_num_params does not subtract wte because wte shares its weight with lm_head

(5) crop_block_size only modifies attn.bias inside transformer.h and wpe.weight, because those are the only two places in the model that use block_size

(6) Also, when testing the sub-modules, tensors generated with torch.randint must be cast to float before being fed in (nn.Linear expects floating-point inputs); the full GPT model, in contrast, takes long token indices.

