Since the code is fairly long, the best approach is to implement it module by module:
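All the snippets below assume the usual imports (collected here, inferred from the code that follows, so that each block is runnable):

import math
import inspect
from dataclasses import dataclass

import torch
import torch.nn as nn
import torch.nn.functional as F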
1.LayerNorm
class LayerNorm(nn.Module):
    # LayerNorm with an optional bias, a thin wrapper around F.layer_norm
    def __init__(self, ndim, bias=None):
        super().__init__()
        self.weight = nn.Parameter(torch.ones(ndim))
        self.bias = nn.Parameter(torch.zeros(ndim)) if bias else None

    def forward(self, input):
        return F.layer_norm(input, self.weight.shape, self.weight, self.bias, 1e-5)
Test:

x = torch.randn(4, 2, 8)
ndim = 8
my_layernorm = LayerNorm(ndim)
my_output = my_layernorm(x)
layernorm = nn.LayerNorm(ndim)
layernorm.weight.data = my_layernorm.weight.clone()
if my_layernorm.bias is not None:
    layernorm.bias.data = my_layernorm.bias.clone()
official_output = layernorm(x)
print("difference is negligible:", torch.allclose(my_output, official_output, atol=1e-6))
2.Attention
class CausalSelfAttention(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.embd = config.n_embd
        self.n_head = config.n_head
        assert self.embd % self.n_head == 0
        self.dropout = config.dropout
        # qkv projection and output projection
        self.c_attn = nn.Linear(self.embd, 3 * self.embd, bias=config.bias)
        self.c_proj = nn.Linear(self.embd, self.embd, bias=config.bias)
        self.resid_dropout = nn.Dropout(self.dropout)
        self.attn_dropout = nn.Dropout(self.dropout)
        self.flash = hasattr(torch.nn.functional, 'scaled_dot_product_attention')
        if not self.flash:
            print("Warning! Current torch doesn't have scaled_dot_product_attention")
            # causal mask, only needed for the slow attention path
            self.register_buffer("bias", torch.tril(torch.ones(config.block_size, config.block_size))
                                 .view(1, 1, config.block_size, config.block_size))

    def forward(self, x):
        B, T, C = x.size()
        qkv = self.c_attn(x)
        q, k, v = torch.split(qkv, self.embd, dim=2)
        q = q.view(B, T, self.n_head, C // self.n_head).transpose(1, 2)
        k = k.view(B, T, self.n_head, C // self.n_head).transpose(1, 2)
        v = v.view(B, T, self.n_head, C // self.n_head).transpose(1, 2)
        if self.flash:
            y = F.scaled_dot_product_attention(q, k, v, attn_mask=None, is_causal=True, dropout_p=self.dropout if self.training else 0)
        else:
            att = (q @ k.transpose(-2, -1)) * (1.0 / math.sqrt(k.size(-1)))
            att = att.masked_fill(self.bias[:, :, :T, :T] == 0, float('-inf'))
            att = F.softmax(att, dim=-1)
            att = self.attn_dropout(att)
            y = att @ v
        y = y.transpose(1, 2).contiguous().view(B, T, C)
        y = self.resid_dropout(self.c_proj(y))
        return y
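A quick shape check in the spirit of the LayerNorm test above. The config values here are made-up test values, and because the module is causal there is no one-line official reference to compare against, so this only verifies shapes and that the module runs:

from types import SimpleNamespace

cfg = SimpleNamespace(n_embd=768, n_head=12, dropout=0.0, bias=True, block_size=1024)
attn = CausalSelfAttention(cfg)
x = torch.randn(4, 16, cfg.n_embd)
print(attn(x).shape)    # expected: torch.Size([4, 16, 768])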
3.MLP, Block and Config
class MLP(nn.Module):
    def __init__(self, config):
        super().__init__()
        # names follow GPT-2 (c_fc / c_proj) so that from_pretrained and the
        # c_proj-specific init in GPT.__init__ below match these layers
        self.c_fc = nn.Linear(config.n_embd, 4 * config.n_embd, bias=config.bias)
        self.gelu = nn.GELU()
        self.c_proj = nn.Linear(4 * config.n_embd, config.n_embd, bias=config.bias)
        self.dropout = nn.Dropout(config.dropout)

    def forward(self, x):
        x = self.c_fc(x)
        x = self.gelu(x)
        x = self.c_proj(x)
        x = self.dropout(x)
        return x
class Block(nn.Module):
    def __init__(self, config):
        super().__init__()
        # the names ln_1 / attn / ln_2 match the GPT-2 checkpoints used in from_pretrained
        self.ln_1 = LayerNorm(config.n_embd, config.bias)
        self.attn = CausalSelfAttention(config)
        self.ln_2 = LayerNorm(config.n_embd, config.bias)
        self.mlp = MLP(config)

    def forward(self, x):
        x = x + self.attn(self.ln_1(x))
        x = x + self.mlp(self.ln_2(x))
        return x
@dataclass
class Config:
    n_head: int = 12
    n_embd: int = 768
    bias: bool = True
    dropout: float = 0.0
    block_size: int = 1024
    n_layer: int = 12          # GPT-2 small defaults, matching from_pretrained below
    vocab_size: int = 50257
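A minimal smoke test for MLP/Block with the Config above (shape check only; the dropout override is arbitrary):

cfg = Config(dropout=0.1)
block = Block(cfg)
x = torch.randn(2, 32, cfg.n_embd)
print(block(x).shape)    # expected: torch.Size([2, 32, 768])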
Common mistakes when reproducing the modules above:
(1) Using x.shape() in CausalSelfAttention.forward

Wrong:
B, T, C = x.shape()
Should be:
B, T, C = x.shape
Reason: shape is an attribute, not a method (x.size(), used in the code above, is a method call).
(2) When using scaled_dot_product_attention, the dropout rate must depend on whether the model is in training or eval mode; at eval time no dropout should be applied:
y = F.scaled_dot_product_attention(q, k, v, dropout_p=self.dropout if self.training else 0, is_causal=True)
⚠️ Explanation: the other dropout calls don't need this check because they go through nn.Dropout(), which checks the module's training/eval state automatically.
(3) Building the causal mask with self.bias = torch.tril(...) on a list is wrong; the argument of tril must be a tensor
self.register_buffer("bias", torch.tril([config.block_size, config.block_size])...)
Should be:
self.register_buffer("bias", torch.tril(torch.ones(config.block_size, config.block_size))
                     .view(1, 1, config.block_size, config.block_size))
torch.tril() expects a Tensor, not a list.
(4) Don't forget super().__init__() when writing a module's __init__ (illustrated below).
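A tiny hypothetical module showing the failure mode when super().__init__() is missing:

class Broken(nn.Module):
    def __init__(self):
        # super().__init__() forgotten here
        self.fc = nn.Linear(4, 4)

# Broken() fails with:
# AttributeError: cannot assign module before Module.__init__() call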
4.GPT
First, the three core methods of the GPT model: __init__, forward and generate (all of the methods in this part live inside class GPT(nn.Module)).
(1)__init__()
def __init__(self, config):
    super().__init__()
    assert config.vocab_size is not None
    assert config.block_size is not None
    self.config = config
    self.transformer = nn.ModuleDict(dict(
        wte = nn.Embedding(config.vocab_size, config.n_embd),
        wpe = nn.Embedding(config.block_size, config.n_embd),
        h = nn.ModuleList([Block(config) for _ in range(config.n_layer)]),
        drop = nn.Dropout(config.dropout),
        ln_f = LayerNorm(config.n_embd, bias=config.bias)
    ))
    self.lm_head = nn.Linear(config.n_embd, config.vocab_size, bias=config.bias)
    self.transformer.wte.weight = self.lm_head.weight  # weight tying
    # initialize weights
    self.apply(self.init_weight_)
    # residual projections get a smaller std, scaled down by the number of layers
    for np, p in self.named_parameters():
        if np.endswith('c_proj.weight'):
            torch.nn.init.normal_(p, mean=0.0, std=0.02 / math.sqrt(2 * config.n_layer))
    print("num of parameters: %.2fM" % (self.get_num_params()/1e6,))
On weight sharing: the recommendation is to have lm_head's weight overwrite wte's weight (i.e. self.transformer.wte.weight = self.lm_head.weight); done the other way around, lm_head's output weight gets wiped out and the model breaks.
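A one-line sanity check of the tying direction (assuming a GPT instance named model):

assert model.lm_head.weight is model.transformer.wte.weight   # one shared Parameter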
(2)forward
def forward(self, x, target=None):
    device = x.device
    b, t = x.size()
    assert t <= self.config.block_size
    pos = torch.arange(0, t, dtype=torch.long, device=device)
    token_emb = self.transformer.wte(x)
    pos_emb = self.transformer.wpe(pos)
    x = self.transformer.drop(token_emb + pos_emb)
    for block in self.transformer.h:
        x = block(x)
    x = self.transformer.ln_f(x)
    if target is not None:
        logits = self.lm_head(x)
        loss = F.cross_entropy(logits.view(-1, logits.size(-1)), target.view(-1), ignore_index=-1)
        # view(-1, ...): flatten every dimension except the last one
    else:
        logits = self.lm_head(x[:, [-1], :])
        loss = None
    return logits, loss
Checking whether target is None is effectively checking whether we are in training mode. In the inference branch, logits = self.lm_head(x[:, [-1], :]) indexes with [-1] (a list) rather than -1 so that the time dimension is kept and the logits have the same number of dimensions as in the target-is-not-None case.
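A small sketch of the two call modes (a deliberately tiny Config; the values are only for the demo):

cfg = Config(n_layer=2, n_head=2, n_embd=64, block_size=32)
model = GPT(cfg)
idx = torch.randint(0, cfg.vocab_size, (2, 16))
logits, loss = model(idx, target=idx)   # training-style: logits (2, 16, 50257), loss is a scalar
logits, loss = model(idx)               # inference:      logits (2, 1, 50257),  loss is None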
(3)generate
@torch.no_grad()
def generate(self, idx, max_new_tokens, temperature=1.0, top_k=None):
    for _ in range(max_new_tokens):
        # crop the context to at most block_size tokens
        idx_cond = idx if idx.size(1) <= self.config.block_size else idx[:, -self.config.block_size:]
        logits, _ = self(idx_cond)
        logits = logits[:, -1, :]
        logits = logits / temperature
        if top_k is not None:
            # keep only the top_k logits, set the rest to -inf
            v, _ = torch.topk(logits, min(top_k, logits.size(-1)))
            logits[logits < v[:, [-1]]] = -float('inf')
        probs = F.softmax(logits, dim=-1)
        idx_next = torch.multinomial(probs, num_samples=1)
        idx = torch.cat((idx, idx_next), dim=1)
    return idx
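Usage sketch, continuing with the tiny model from the sketch above (the sampling parameters are arbitrary):

model.eval()
start = torch.zeros((1, 1), dtype=torch.long)   # a batch of one, starting from token id 0
out = model.generate(start, max_new_tokens=20, temperature=0.8, top_k=50)
print(out.shape)                                # expected: torch.Size([1, 21])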
(4) crop_block_size, configure_optimizers, init_weight_, get_num_params, from_pretrained, estimate_mfu
The remaining methods exist because the train/sample scripts need them. For example, crop_block_size handles the case where the desired block_size is smaller than the one in the config; from_pretrained loads a previously trained GPT-2 model; configure_optimizers builds the optimizer; estimate_mfu is used in bench to evaluate hardware utilization.
def init_weight_(self, module):
    if isinstance(module, nn.Linear):
        torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)
        if module.bias is not None:
            torch.nn.init.zeros_(module.bias)
    elif isinstance(module, nn.Embedding):
        torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)
def get_num_params(self, non_embedding=True):
    n_params = sum(p.numel() for p in self.parameters())
    if non_embedding:
        n_params -= self.transformer.wpe.weight.numel()
        # wte is not subtracted here because its weight is shared with lm_head
    return n_params
def crop_block_size(self, block_size):
    assert block_size <= self.config.block_size
    self.config.block_size = block_size
    self.transformer.wpe.weight = nn.Parameter(self.transformer.wpe.weight[:block_size])
    for block in self.transformer.h:
        if hasattr(block.attn, 'bias'):
            block.attn.bias = block.attn.bias[:, :, :block_size, :block_size]
@classmethod
def from_pretrained(cls, model_type, override_arg=None):
    # classmethod: builds and returns a new model, no existing instance needed
    assert model_type in {'gpt2', 'gpt2-medium', 'gpt2-large', 'gpt2-xl'}
    override_arg = override_arg or {}
    assert all(k == 'dropout' for k in override_arg)
    from transformers import GPT2LMHeadModel
    print("Loading model from pretrained_model %s" % model_type)
    # build the config
    config_args = {
        'gpt2':        dict(n_layer=12, n_head=12, n_embd=768),
        'gpt2-medium': dict(n_layer=24, n_head=16, n_embd=1024),
        'gpt2-large':  dict(n_layer=36, n_head=20, n_embd=1280),
        'gpt2-xl':     dict(n_layer=48, n_head=25, n_embd=1600),
    }[model_type]
    print("Enforcing bias=True, block_size=1024, vocab_size=50257")
    config_args['bias'] = True
    config_args['block_size'] = 1024
    config_args['vocab_size'] = 50257
    if 'dropout' in override_arg:
        print(f"updating the dropout rate to {override_arg['dropout']}")
        config_args['dropout'] = override_arg['dropout']
    # build the model
    config = Config(**config_args)
    model = cls(config)
    # copy over the weights
    sd = model.state_dict()
    sd_keys = sd.keys()
    sd_keys = [k for k in sd_keys if not k.endswith('.attn.bias')]
    model_hf = GPT2LMHeadModel.from_pretrained(model_type)
    sd_hf = model_hf.state_dict()
    sd_hf_keys = sd_hf.keys()
    sd_hf_keys = [k for k in sd_hf_keys if not k.endswith('.attn.bias')]
    sd_hf_keys = [k for k in sd_hf_keys if not k.endswith('.attn.masked_bias')]
    # these weights are stored transposed in the HF checkpoint (Conv1D vs Linear)
    transposed = ['attn.c_attn.weight', 'attn.c_proj.weight', 'mlp.c_fc.weight', 'mlp.c_proj.weight']
    assert len(sd_hf_keys) == len(sd_keys)
    for k in sd_hf_keys:
        if any(k.endswith(w) for w in transposed):
            assert sd[k].shape[::-1] == sd_hf[k].shape
            with torch.no_grad():
                sd[k].copy_(sd_hf[k].t())
        else:
            assert sd[k].shape == sd_hf[k].shape
            with torch.no_grad():
                sd[k].copy_(sd_hf[k])
    return model
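Usage sketch (requires the transformers package; the first call downloads the GPT-2 checkpoint):

model = GPT.from_pretrained('gpt2', override_arg={'dropout': 0.0})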
def configure_optimizers(self, weight_decay, learning_rate, betas, device_type):
    params = {np: p for np, p in self.named_parameters()}
    params = {np: p for np, p in params.items() if p.requires_grad}
    # 2D+ parameters (matrices, embeddings) get weight decay; biases and layernorm params don't
    params_decay = [p for np, p in params.items() if p.dim() >= 2]
    params_nodecay = [p for np, p in params.items() if p.dim() < 2]
    optim_groups = [
        {'params': params_decay, 'weight_decay': weight_decay},
        {'params': params_nodecay, 'weight_decay': 0.0}
    ]
    num_decay_params = sum(p.numel() for p in params_decay)
    num_nodecay_params = sum(p.numel() for p in params_nodecay)
    print(f"num decayed parameter tensors: {len(params_decay)}, with {num_decay_params} parameters")
    print(f"num non-decayed parameter tensors: {len(params_nodecay)}, with {num_nodecay_params} parameters")
    # use the fused AdamW kernel when available and running on CUDA
    fused_available = 'fused' in inspect.signature(torch.optim.AdamW).parameters
    use_fused = device_type == 'cuda' and fused_available
    extra_arg = dict(fused=True) if use_fused else dict()
    optimizer = torch.optim.AdamW(optim_groups, lr=learning_rate, betas=betas, **extra_arg)
    return optimizer
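Call sketch (the hyperparameter values are arbitrary placeholders):

optimizer = model.configure_optimizers(weight_decay=0.1, learning_rate=6e-4,
                                       betas=(0.9, 0.95), device_type='cpu')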
def estimate_mfu(self, fwdbwd_per_iter, dt):
    # flops estimate: 6N per token plus 12*L*H*Q*T for attention
    N = self.get_num_params()
    cfg = self.config
    L, H, Q, T = cfg.n_layer, cfg.n_head, cfg.n_embd // cfg.n_head, cfg.block_size
    flops_per_token = 6 * N + 12 * L * H * Q * T
    flops_per_fwdbwd = flops_per_token * T
    flops_per_iter = flops_per_fwdbwd * fwdbwd_per_iter
    flops_achieved = flops_per_iter * (1.0 / dt)   # dt: seconds per iteration
    flop_promised = 312e12                         # A100 bfloat16 peak: 312 TFLOPS
    mfu = flops_achieved / flop_promised
    return mfu
Mistakes that can come up in the GPT class
(1) The order of the weight-tying assignment (see the note under __init__ above).
(2) When initializing weights, don't forget that c_proj.weight gets a further reduced std.
(3) In init_weight_, if the module is a Linear, check whether it has a bias before initializing it.
(4) In get_num_params, wte is not subtracted because its weight is shared with lm_head.
(5) In crop_block_size, only transformer.h's attn.bias and wpe.weight are modified, because those are the only two places in the model that depend on block_size.
(6) When testing sub-modules such as LayerNorm or Block, tensors generated with torch.randint must be converted to float before being fed in; the full GPT model, by contrast, takes integer (long) token indices directly, since wte is an nn.Embedding (see the sketch below).
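A small illustration of point (6), continuing with the tiny cfg/model from the earlier sketch:

# sub-module test: LayerNorm / Block expect floating-point inputs
x = torch.randint(0, 10, (2, 16, cfg.n_embd)).float()
_ = Block(cfg)(x)

# full model: token indices stay integer (long) for the embedding lookup
idx = torch.randint(0, cfg.vocab_size, (2, 16))
_ = model(idx)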