I. Introduction
In the previous article, verl: a flexible LLM post-training framework that unifies SFT and RL (quick start), we took a first look at the basic usage of the verl framework. In real industrial-scale training, distributed training is usually unavoidable, so this article focuses on the details of using verl in single-node multi-GPU and multi-node multi-GPU settings, and aims to be approachable even for beginners.
As in the previous article, both the SFT and the RL (using GRPO as the example) scenarios are covered. The config files shown last time are a few months old by now, and verl has since added some fields to the RL config, so those new fields are also introduced here.
II. SFT
2.1 Single node, multiple GPUs
In verl, training starts from a yaml config file. I usually put yaml files under verl/verl/trainer/config, but any path works, as long as you pass the config path to the script as described in the previous article. Assume there is a sft.yaml file under the config path:
data:
  train_batch_size: 64
  micro_batch_size: null  # will be deprecated, use micro_batch_size_per_gpu
  micro_batch_size_per_gpu: 1  # batch size per GPU
  train_files: data/alpaca_zh.parquet  # parquet file
  val_files: data/alpaca_zh.val.parquet
  prompt_key: prompt  # prompt field
  response_key: response  # response field
  max_length: 12288  # max input length of 12K tokens, i.e. prompt + response is at most 12K
  truncation: right  # truncate from the right when longer than 12K
  balance_dp_token: False
  chat_template: null
model:
  partial_pretrain: models/Qwen2.5-7B-Instruct  # path of the model to fine-tune
  fsdp_config:
    wrap_policy:
      min_num_params: 0
    cpu_offload: False  # whether to use CPU offload
    offload_params: False  # whether to offload parameters; set both to True if GPU memory is tight
  external_lib: null
  enable_gradient_checkpointing: True  # saves some GPU memory when enabled
  trust_remote_code: False
  lora_rank: 0  # Set to positive value to enable LoRA (e.g., 32)
  lora_alpha: 16  # LoRA scaling factor
  target_modules: all-linear  # Target modules for LoRA adaptation
  use_liger: False
optim:
  lr: 1e-6  # learning rate
  betas: [0.9, 0.95]
  weight_decay: 0.01
  warmup_steps_ratio: 0.1
  clip_grad: 1.0
ulysses_sequence_parallel_size: 2
use_remove_padding: True
trainer:
  default_local_dir: verl_sft/Qwen25_7b_sft  # where the trained model is saved
  default_hdfs_dir: null  # change the hdfs path here
  resume_path: null
  project_name: ""
  experiment_name: ""
  total_epochs: 3  # number of epochs
  save_freq: 100  # save a checkpoint every N steps; recompute this for your dataset size (see the note after this config)
  total_training_steps: null
  logger: ['console']
  seed: 1
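A quick aside on save_freq: it counts optimizer steps, so it helps to estimate how many steps one epoch takes before picking a value. A tiny, purely illustrative calculation (the dataset size below is made up; substitute your own):

dataset_size = 48_000          # hypothetical number of training examples
train_batch_size = 64          # data.train_batch_size above
steps_per_epoch = dataset_size // train_batch_size
print(steps_per_epoch)         # 750 optimizer steps per epoch
print(steps_per_epoch // 100)  # with save_freq=100, roughly 7 checkpoints per epoch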
Next, write a run script run_sft.sh and put it under the script folder:
set -x

CONFIG_PATH="/xxx/verl/verl/trainer/config/sft.yaml"  # the yaml file written above; use its absolute path
nproc_per_node=8  # number of GPUs on this machine

torchrun --standalone --nnodes=1 --nproc_per_node=$nproc_per_node --master_port=29500 \
    -m verl.trainer.fsdp_sft_trainer \
    --config_path=$CONFIG_PATH
# note: the master port must be a free port in the valid range (at most 65535)

# bash scripts/run_sft.sh 2>&1 | tee -a scripts/log/sft_log_file.txt
Then run
bash scripts/run_sft.sh 2>&1 | tee -a scripts/log/sft_log_file.txt
to start the single-node multi-GPU job; the training log is written to scripts/log/sft_log_file.txt.
During training I usually use swanlab to record metrics. To enable it, change the logger value in the config file to ['console', 'swanlab']. The next run will then prompt you to log in to an account; if you do not want to log in, set the following:
export SWANLAB_MODE=local  # four modes: cloud (default, online tracking), cloud-only (online only, no local files), local (local tracking only), disabled (record nothing, for debugging)
export SWANLAB_LOG_DIR=/xxx/RL/swanlab_log  # local log directory
For various reasons these settings may not take effect directly. My workaround was to modify the source code directly in verl/verl/utils/tracking.py, where the logger backends are initialized.
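For reference, the kind of change I mean is forcing the mode and log directory at the point where the swanlab logger is created. The snippet below is only a sketch and assumes swanlab.init accepts the mode and logdir keyword arguments; check your swanlab version and the actual call site in tracking.py:

import os
import swanlab

# hedged sketch: pass mode/logdir explicitly instead of relying on the env vars
swanlab.init(
    project="verl_sft",                                   # any project name
    experiment_name="qwen25_7b_sft",                      # any experiment name
    mode=os.environ.get("SWANLAB_MODE", "local"),         # force local tracking
    logdir=os.environ.get("SWANLAB_LOG_DIR", "./swanlab_log"),  # local log directory
)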
2.2 Multiple nodes, multiple GPUs
Multi-node SFT is very simple. Set up the dataset, the model, and the verl repo on every machine, then put a run_multi_node_7b_sft.sh script on each machine:
set -x

nnodes=4  # number of machines
nproc_per_node=8  # number of GPUs per machine
CONFIG_PATH="verl/verl/trainer/config/sft.yaml"  # must be replaced with the absolute path on your own machine
MAIN_NODE_IP=00.00.00.00  # IP of the head machine, e.g. the first machine
node_rank=0  # rank of this machine: 0 for the first one, then 1, 2, 3 for the rest
port=8324

python3 -m torch.distributed.run --nnodes=$nnodes --nproc_per_node=$nproc_per_node \
    --node_rank=$node_rank --master_addr=$MAIN_NODE_IP --master_port=$port \
    -m verl.trainer.fsdp_sft_trainer \
    --config_path=$CONFIG_PATH

# bash scripts/run_multi_node_7b_sft.sh 2>&1 | tee -a scripts/log/qwen_7b_sft.txt
Here we assume four machines; the script above is identical on all of them except for node_rank.
To launch the multi-node training job, run the following on each machine in turn:
bash scripts/run_multi_node_7b_sft.sh 2>&1 | tee -a scripts/log/qwen_7b_sft.txt
The training logs and checkpoints end up on the machine whose IP is MAIN_NODE_IP.
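As a rough mental model (this is my reading of the SFT trainer, so treat it as an assumption rather than a guarantee): data.train_batch_size is the global batch size, it is split evenly across the data-parallel ranks, and each GPU covers its share with micro batches of micro_batch_size_per_gpu via gradient accumulation. For the 4 x 8 GPU setup above, ignoring sequence parallelism:

nnodes = 4
nproc_per_node = 8
train_batch_size = 64          # data.train_batch_size (global)
micro_batch_size_per_gpu = 1   # data.micro_batch_size_per_gpu

# assumption: every GPU is a data-parallel rank here; with
# ulysses_sequence_parallel_size > 1 the data-parallel group is smaller
world_size = nnodes * nproc_per_node            # 32 GPUs
assert train_batch_size % world_size == 0       # each rank needs an equal share
per_gpu_batch = train_batch_size // world_size  # 2 samples per GPU per step
grad_accum = per_gpu_batch // micro_batch_size_per_gpu
print(per_gpu_batch, grad_accum)                # 2, 2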
III. RL (GRPO)
For GRPO, this article uses the widely available gsm8k dataset. After downloading it from huggingface to a local folder, make sure to preprocess the data with the script provided by verl:
import re
import os
import datasets


def extract_solution(solution_str):
    solution = re.search("#### (\\-?[0-9\\.\\,]+)", solution_str)
    assert solution is not None
    final_solution = solution.group(0)
    final_solution = final_solution.split('#### ')[1].replace(',', '')
    return final_solution


if __name__ == '__main__':
    data_source = 'data/gsm8k'

    dataset = datasets.load_dataset(data_source, 'main')

    train_dataset = dataset['train']
    test_dataset = dataset['test']

    instruction_following = "Let's think step by step and output the final answer after \"####\"."

    # add a row to each data item that represents a unique id
    def make_map_fn(split):

        def process_fn(example, idx):
            question_raw = example.pop('question')
            question = question_raw + ' ' + instruction_following
            answer_raw = example.pop('answer')
            solution = extract_solution(answer_raw)
            data = {
                "data_source": data_source,
                "prompt": [{
                    "role": "user",
                    "content": question,
                }],
                "ability": "math",
                "reward_model": {
                    "style": "rule",
                    "ground_truth": solution
                },
                "extra_info": {
                    'split': split,
                    'index': idx,
                    'answer': answer_raw,
                    "question": question_raw,
                }
            }
            return data

        return process_fn

    train_dataset = train_dataset.map(function=make_map_fn('train'), with_indices=True)
    test_dataset = test_dataset.map(function=make_map_fn('test'), with_indices=True)

    train_dataset.to_parquet('data/gsm8k_train.parquet')
    test_dataset.to_parquet('data/gsm8k_test.parquet')
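Before moving on, it is worth loading the generated parquet once to check that the fields look the way verl expects (the paths match the script above):

import datasets

ds = datasets.load_dataset("parquet", data_files="data/gsm8k_train.parquet", split="train")
example = ds[0]
print(example["prompt"])                        # [{'role': 'user', 'content': '...'}]
print(example["reward_model"]["ground_truth"])  # the extracted numeric answer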
The code above wraps each question as:
[{
    "role": "user",
    "content": question,
}]
This matters because verl applies a chat template to the prompt during training. If the prompt field of your own dataset is a plain str, make absolutely sure to convert it to the format above first, otherwise you will run into a fatal error.
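For example, if your private dataset's prompt column is a plain string, a small conversion pass like the one below (the file names are hypothetical) produces the expected structure:

import datasets

ds = datasets.load_dataset("parquet", data_files="data/my_private_train.parquet", split="train")

def to_chat_format(example):
    # wrap the raw string prompt into the chat-message list verl expects
    example["prompt"] = [{"role": "user", "content": example["prompt"]}]
    return example

ds = ds.map(to_chat_format)
ds.to_parquet("data/my_private_train_chat.parquet")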
3.1 Single node, multiple GPUs
As with SFT, we again need a yaml config file; assume it lives at verl/verl/trainer/config/grpo_trainer.yaml:
data:
  tokenizer: null
  train_files: data/gsm8k_train.parquet  # training set path
  val_files: data/gsm8k_test.parquet  # validation set path
  prompt_key: prompt  # prompt field
  max_prompt_length: 512  # max prompt length
  max_response_length: 512  # max response length
  train_batch_size: 64  # batch size
  val_batch_size: null  # DEPRECATED: Validation datasets are sent to inference engines as a whole batch, which will schedule the memory themselves
  return_raw_input_ids: False  # This should be set to true when the tokenizer between policy and rm differs
  return_raw_chat: False
  shuffle: True  # whether to shuffle the training set
  filter_overlong_prompts: False  # for large-scale datasets, filtering overlong prompts could be time-consuming. You should disable this and set `truncation='left'`
  truncation: error  # truncation strategy; usually error here, to ensure prompts stay within max_prompt_length
  image_key: images
actor_rollout_ref:
  hybrid_engine: True
  model:
    path: models/Qwen2.5-7B-Instruct  # path of the SFT model
    external_lib: null
    override_config: { }
    enable_gradient_checkpointing: True
    use_remove_padding: True
  actor:
    strategy: fsdp  # This is for backward-compatibility
    ppo_mini_batch_size: 64
    ppo_micro_batch_size: null  # will be deprecated, use ppo_micro_batch_size_per_gpu
    ppo_micro_batch_size_per_gpu: 1
    use_dynamic_bsz: True  # if False, gradient accumulation is used instead
    ppo_max_token_len_per_gpu: 16384  # n * ${data.max_prompt_length} + ${data.max_response_length}
    grad_clip: 1.0
    clip_ratio: 0.2
    entropy_coeff: 0.001
    use_kl_loss: True  # True for GRPO
    use_torch_compile: True  # False to disable torch compile
    kl_loss_coef: 0.04  # coefficient of the KL loss, for GRPO
    kl_loss_type: low_var_kl  # for GRPO
    ppo_epochs: 1
    shuffle: False
    ulysses_sequence_parallel_size: 2  # sp size
    checkpoint:
      contents: ['model', 'hf_model', 'optimizer', 'extra']  # with 'hf_model' you can save whole model as hf format, now only use sharded model checkpoint to save space
    optim:
      lr: 1e-6  # learning rate
      lr_warmup_steps: -1  # Prioritized. Negative values mean delegating to lr_warmup_steps_ratio.
      lr_warmup_steps_ratio: 0.1  # the total steps will be injected during runtime
      min_lr_ratio: null  # only useful for warmup with cosine
      warmup_style: cosine  # select from constant/cosine
      total_training_steps: -1  # must be overridden by the program
    fsdp_config:
      wrap_policy:
        # transformer_layer_cls_to_wrap: None
        min_num_params: 0
      param_offload: True
      optimizer_offload: True
      fsdp_size: -1
  ref:
    fsdp_config:
      param_offload: True
      wrap_policy:
        # transformer_layer_cls_to_wrap: None
        min_num_params: 0
    log_prob_micro_batch_size: null  # will be deprecated, use log_prob_micro_batch_size_per_gpu
    log_prob_micro_batch_size_per_gpu: null
    log_prob_use_dynamic_bsz: ${actor_rollout_ref.actor.use_dynamic_bsz}
    log_prob_max_token_len_per_gpu: ${actor_rollout_ref.actor.ppo_max_token_len_per_gpu}
    ulysses_sequence_parallel_size: ${actor_rollout_ref.actor.ulysses_sequence_parallel_size}  # sp size
  rollout:
    name: vllm
    temperature: 0.9
    top_k: -1  # 0 for hf rollout, -1 for vllm rollout
    top_p: 0.95
    use_fire_sampling: False  # https://arxiv.org/abs/2410.21236
    prompt_length: ${data.max_prompt_length}  # not used for opensource
    response_length: ${data.max_response_length}
    # for vllm rollout
    dtype: bfloat16  # should align with FSDP
    gpu_memory_utilization: 0.8
    ignore_eos: False
    enforce_eager: True  # needs to be turned off (False) for vLLM 0.8.1
    free_cache_engine: True  # needs to be turned off (False) for vLLM 0.8.1
    load_format: dummy_dtensor
    tensor_model_parallel_size: 4
    max_num_batched_tokens: 12288
    max_model_len: 12288
    max_num_seqs: 1024
    log_prob_micro_batch_size: null  # will be deprecated, use log_prob_micro_batch_size_per_gpu
    log_prob_micro_batch_size_per_gpu: null
    log_prob_use_dynamic_bsz: ${actor_rollout_ref.actor.use_dynamic_bsz}
    log_prob_max_token_len_per_gpu: ${actor_rollout_ref.actor.ppo_max_token_len_per_gpu}
    disable_log_stats: True
    enable_chunked_prefill: False  # may get higher throughput when set to True. When activated, please increase max_num_batched_tokens or decrease max_model_len.
    # for hf rollout
    do_sample: True
    # number of responses (i.e. num sample times)
    n: 16  # > 1 for grpo
    val_kwargs:
      # sampling parameters for validation
      top_k: -1  # 0 for hf rollout, -1 for vllm rollout
      top_p: 1.0
      temperature: 0
      n: 1
      do_sample: False  # default eager for validation
critic:
  strategy: fsdp
  optim:
    lr: 1e-6
    lr_warmup_steps_ratio: 0.  # the total steps will be injected during runtime
    min_lr_ratio: null  # only useful for warmup with cosine
    warmup_style: constant  # select from constant/cosine
    total_training_steps: -1  # must be overridden by the program
  model:
    path: ~/models/deepseek-llm-7b-chat
    tokenizer_path: ${actor_rollout_ref.model.path}
    override_config: { }
    external_lib: ${actor_rollout_ref.model.external_lib}
    enable_gradient_checkpointing: True
    use_remove_padding: False
    fsdp_config:
      param_offload: False
      optimizer_offload: False
      wrap_policy:
        # transformer_layer_cls_to_wrap: None
        min_num_params: 0
      fsdp_size: -1
  ppo_mini_batch_size: ${actor_rollout_ref.actor.ppo_mini_batch_size}
  ppo_micro_batch_size: null  # will be deprecated, use ppo_micro_batch_size_per_gpu
  ppo_micro_batch_size_per_gpu: null
  forward_micro_batch_size: ${critic.ppo_micro_batch_size}
  forward_micro_batch_size_per_gpu: ${critic.ppo_micro_batch_size_per_gpu}
  use_dynamic_bsz: ${actor_rollout_ref.actor.use_dynamic_bsz}
  ppo_max_token_len_per_gpu: 32768  # (${actor_rollout_ref.actor.ppo_max_token_len_per_gpu}) * 2
  forward_max_token_len_per_gpu: ${critic.ppo_max_token_len_per_gpu}
  ulysses_sequence_parallel_size: 1  # sp size
  ppo_epochs: ${actor_rollout_ref.actor.ppo_epochs}
  shuffle: ${actor_rollout_ref.actor.shuffle}
  grad_clip: 1.0
  cliprange_value: 0.5
  checkpoint:
    contents: ['model', 'hf_model', 'optimizer', 'extra']  # with 'hf_model' you can save whole model as hf format, now only use sharded model checkpoint to save space
reward_model:
  enable: False
  strategy: fsdp
  model:
    input_tokenizer: ${actor_rollout_ref.model.path}  # set this to null if the chat template is identical
    path: ~/models/FsfairX-LLaMA3-RM-v0.1
    external_lib: ${actor_rollout_ref.model.external_lib}
    use_remove_padding: False
    fsdp_config:
      wrap_policy:
        min_num_params: 0
      param_offload: False
      fsdp_size: -1
  micro_batch_size: null  # will be deprecated, use micro_batch_size_per_gpu
  micro_batch_size_per_gpu: null  # set a number
  max_length: null
  ulysses_sequence_parallel_size: 1  # sp size
  use_dynamic_bsz: ${critic.use_dynamic_bsz}
  forward_max_token_len_per_gpu: ${critic.forward_max_token_len_per_gpu}
  reward_manager: naive
custom_reward_function:
  path: null
  name: compute_score
algorithm:
  gamma: 1.0
  lam: 1.0
  adv_estimator: grpo
  kl_penalty: kl  # how to estimate kl divergence
  kl_ctrl:
    type: fixed
    kl_coef: 0.04
trainer:
  balance_batch: True
  total_epochs: 1
  total_training_steps: null
  project_name: "xxx"
  experiment_name: "xxx"
  logger: ['console', 'swanlab']  # remove wandb
  val_generations_to_log_to_wandb: 0
  nnodes: 1
  n_gpus_per_node: 8
  save_freq: 10
  # auto: find the last ckpt to resume. If can't find, start from scratch
  resume_mode: disable  # auto / disable / resume_path (requires resume_from_path to be set)
  val_before_train: False
  resume_from_path: null
  test_freq: 0
  critic_warmup: 0
  default_hdfs_dir: null
  del_local_ckpt_after_load: False
  remove_previous_ckpt_in_save: False
  default_local_dir: output/Qwen2.5-7B-GRPO  # where checkpoints are saved
  max_actor_ckpt_to_keep: null
  max_critic_ckpt_to_keep: null
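Before launching, it can help to sanity-check a few relations between the fields above. verl validates the config itself at startup; the sketch below only mirrors the pitfalls I run into most often and copies the values from this config:

train_batch_size = 64              # data.train_batch_size
ppo_mini_batch_size = 64           # actor_rollout_ref.actor.ppo_mini_batch_size
n = 16                             # actor_rollout_ref.rollout.n, responses per prompt
max_prompt_length = 512
max_response_length = 512
ppo_max_token_len_per_gpu = 16384  # actor_rollout_ref.actor.ppo_max_token_len_per_gpu
n_gpus = 8                         # trainer.nnodes * trainer.n_gpus_per_node
tensor_model_parallel_size = 4     # actor_rollout_ref.rollout.tensor_model_parallel_size

assert train_batch_size % ppo_mini_batch_size == 0      # mini batches must tile the batch
assert ppo_max_token_len_per_gpu >= max_prompt_length + max_response_length
assert n_gpus % tensor_model_parallel_size == 0          # vLLM TP groups must fit into the GPUs
print("rollout sequences per RL step:", train_batch_size * n)  # 1024 generations per step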
Next, write a launch script as before, say scripts/run_grpo.sh:
set -x
export VLLM_ATTENTION_BACKEND=XFORMERS
export CUDA_LAUNCH_BLOCKING=1

nproc_per_node=8  # number of GPUs
CONFIG_PATH="verl/verl/trainer/config/grpo_trainer.yaml"  # replace with the absolute path of the config file above

python3 -m verl.trainer.main_ppo \
    --config_path=$CONFIG_PATH

# bash scripts/run_grpo.sh 2>&1 | tee -a scripts/log/grpo_log_file.txt
Finally, start training:
bash scripts/run_grpo.sh 2>&1 | tee -a scripts/log/grpo_log_file.txt
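One of the newer config fields worth calling out is custom_reward_function. With path: null, the built-in rule-based scorer selected by data_source is used; for your own data you can point path at a Python file that defines the function named by name. The argument names below are my assumption about the interface, so double-check them against your verl version:

# my_reward.py -- hypothetical file referenced by custom_reward_function.path
def compute_score(data_source, solution_str, ground_truth, extra_info=None):
    # toy rule-based reward: 1.0 if the ground-truth answer appears in the response
    return 1.0 if ground_truth in str(solution_str) else 0.0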
3.2 Multiple nodes, multiple GPUs
Multi-node RL is slightly more involved than SFT. First, as before, set up the environment on every machine, and make sure the dataset and the model exist on every machine under the same paths, i.e. the paths used in the config file.
We use two machines A and B as an example, with A as the head node, i.e. the machine that launches training. Then create a ray startup script worker_start.sh on both machines:
#!/bin/bash
export VLLM_ATTENTION_BACKEND=XFORMERS

HEAD_IP=192.168.0.1  # IP of the head node, i.e. machine A
LOCAL_IP=192.168.0.1  # IP of this machine
PORT=8888  # must be the same on every machine

# ray status

# check whether this machine is the head node
if [ "$LOCAL_IP" == "$HEAD_IP" ]; then
    echo "This machine ($LOCAL_IP) is the head node, starting the head node..."
    ray start --head --port=$PORT --min-worker-port=20122 --max-worker-port=20999
else
    echo "This machine ($LOCAL_IP) is a worker node, connecting to head node $HEAD_IP..."
    ray start --address=$HEAD_IP:$PORT --min-worker-port=20122 --max-worker-port=20999
fi
The only difference between the two machines' versions of this script is LOCAL_IP; everything else stays the same. Run the script on machine A first, then on machine B. Once both are up, running ray status on machine A shows how many nodes have joined the cluster.
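If you prefer checking from Python rather than the ray status CLI, a small script run on machine A does the same thing:

import ray

ray.init(address="auto")  # attach to the cluster started by worker_start.sh
alive_nodes = [node for node in ray.nodes() if node["Alive"]]
print(f"{len(alive_nodes)} node(s) alive in the Ray cluster")  # expect 2 for machines A and B
ray.shutdown()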
Finally, launch the multi-node training on machine A:
set -x
export VLLM_ATTENTION_BACKEND=XFORMERS
export NCCL_DEBUG=INFO

CONFIG_PATH="verl/verl/trainer/config/multi_node_grpo.yaml"  # replace with the absolute path

python3 -m verl.trainer.main_ppo \
    --config_path=$CONFIG_PATH

# bash scripts/run_multi_node_7b_grpo.sh 2>&1 | tee -a scripts/log/multi_node_7b_grpo.txt
The only difference between multi_node_grpo.yaml and the single-node config is the two parameters nnodes (number of machines) and n_gpus_per_node (number of GPUs per machine); just update those two values.
3.3 Model conversion
After RL training finishes, the checkpoint is sharded across multiple machines and is not in HF format, so it needs to be converted. Concretely, move the model*.pt files under the chosen global_step folder from all the other machines to the head machine, then run the following script to merge them:
from typing import List, Tuple, Dict
import re
import os
import torch
import argparse
from transformers import AutoConfig, AutoModelForCausalLM, AutoModelForTokenClassification, AutoTokenizer
from concurrent.futures import ThreadPoolExecutor
from torch.distributed._tensor import DTensor, Shard, Placement


def merge_by_placement(tensors: List[torch.Tensor], placement: Placement):
    if placement.is_replicate():
        return tensors[0]
    elif placement.is_partial():
        raise NotImplementedError("Partial placement is not supported yet")
    elif placement.is_shard():
        return torch.cat(tensors, dim=placement.dim).contiguous()
    else:
        raise ValueError(f"Unsupported placement: {placement}")


if __name__ == '__main__':
    step = 100
    local_dir = f"output/Qwen2.5-7B-GRPO/global_step_{step}/actor"  # replace with an absolute path
    hf_path = f"output/Qwen2.5-7B-GRPO/global_step_{step}/actor/huggingface"  # replace with an absolute path
    output_path = "models/Qwen2.5-7B-Instruct-GRPO"  # replace with an absolute path

    # copy rank zero to find the shape of (dp, fsdp)
    rank = 0
    world_size = 0
    for filename in os.listdir(local_dir):
        match = re.match(r"model_world_size_(\d+)_rank_0\.pt", filename)
        if match:
            world_size = match.group(1)
            break
    assert world_size, "No model file with the proper format"

    state_dict = torch.load(os.path.join(local_dir, f'model_world_size_{world_size}_rank_{rank}.pt'),
                            map_location='cpu')
    pivot_key = sorted(list(state_dict.keys()))[0]
    weight = state_dict[pivot_key]
    assert isinstance(weight, torch.distributed._tensor.DTensor)

    # get sharding info
    device_mesh = weight.device_mesh
    mesh = device_mesh.mesh
    mesh_dim_names = device_mesh.mesh_dim_names
    print(f'Got device mesh {mesh}, mesh_dim_names {mesh_dim_names}')

    assert mesh_dim_names in (
        ('fsdp',),
    ), f'Unsupported mesh_dim_names {mesh_dim_names}'

    if 'tp' in mesh_dim_names:
        # fsdp * tp
        total_shards = mesh.shape[-1] * mesh.shape[-2]
        mesh_shape = (mesh.shape[-2], mesh.shape[-1])
    else:
        # fsdp
        total_shards = mesh.shape[-1]
        mesh_shape = (mesh.shape[-1],)

    print(f'Processing model shards with {total_shards} {mesh_shape} in total')

    model_state_dict_lst = []
    model_state_dict_lst.append(state_dict)
    model_state_dict_lst.extend([""] * (total_shards - 1))

    def process_one_shard(rank):
        model_path = os.path.join(local_dir, f'model_world_size_{world_size}_rank_{rank}.pt')
        state_dict = torch.load(model_path, map_location='cpu', weights_only=False)
        model_state_dict_lst[rank] = state_dict
        return state_dict

    with ThreadPoolExecutor(max_workers=min(32, os.cpu_count())) as executor:
        for rank in range(1, total_shards):
            executor.submit(process_one_shard, rank)

    state_dict = {}
    param_placements: Dict[str, List[Placement]] = {}
    keys = set(model_state_dict_lst[0].keys())

    for key in keys:
        state_dict[key] = []
        for model_state_dict in model_state_dict_lst:
            try:
                tensor = model_state_dict.pop(key)
            except KeyError:
                # this shard is missing the key; print it for debugging and skip it
                print("-" * 30)
                print(model_state_dict)
                continue
            if isinstance(tensor, DTensor):
                state_dict[key].append(tensor._local_tensor.bfloat16())
                placements = tuple(tensor.placements)
                # replicated placement at dp dimension can be discarded
                if mesh_dim_names[0] == 'dp':
                    placements = placements[1:]
                if key not in param_placements:
                    param_placements[key] = placements
                else:
                    assert param_placements[key] == placements
            else:
                state_dict[key] = tensor.bfloat16()

    del model_state_dict_lst

    for key in sorted(state_dict):
        if not isinstance(state_dict[key], list):
            print(f"No need to merge key {key}")
            continue
        # merge shards
        placements: Tuple[Shard] = param_placements[key]
        if len(mesh_shape) == 1:
            # 1-D list, FSDP without TP
            assert len(placements) == 1
            shards = state_dict[key]
            state_dict[key] = merge_by_placement(shards, placements[0])
        else:
            # 2-D list, FSDP + TP
            raise NotImplementedError("FSDP + TP is not supported yet")

    print('Writing to local disk')
    hf_path = os.path.join(local_dir, 'huggingface')
    config = AutoConfig.from_pretrained(hf_path)

    if 'ForTokenClassification' in config.architectures[0]:
        auto_model = AutoModelForTokenClassification
    elif 'ForCausalLM' in config.architectures[0]:
        auto_model = AutoModelForCausalLM
    else:
        raise NotImplementedError(f'Unknown architecture {config.architectures}')

    with torch.device('meta'):
        model = auto_model.from_config(config, torch_dtype=torch.bfloat16)
    model.to_empty(device='cpu')

    print(f'Saving model to {output_path}')
    tokenizer = AutoTokenizer.from_pretrained(hf_path)
    tokenizer.save_pretrained(output_path)
    model.save_pretrained(output_path, state_dict=state_dict)
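After the merge finishes, a quick smoke test confirms the converted checkpoint loads like an ordinary HF model (the path matches output_path in the script above):

import torch
from transformers import AutoModelForCausalLM, AutoTokenizer

output_path = "models/Qwen2.5-7B-Instruct-GRPO"
tokenizer = AutoTokenizer.from_pretrained(output_path)
model = AutoModelForCausalLM.from_pretrained(output_path, torch_dtype=torch.bfloat16)

messages = [{"role": "user", "content": "What is 12 * 17? Let's think step by step and output the final answer after \"####\"."}]
inputs = tokenizer.apply_chat_template(messages, add_generation_prompt=True, return_tensors="pt")
output = model.generate(inputs, max_new_tokens=128)
print(tokenizer.decode(output[0][inputs.shape[-1]:], skip_special_tokens=True))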