继续学习字节家的VeRL,今天来看看VeRL的RL,是VeRL系列的第三篇文章(话说近期好多大事儿,我司发布了Longcat、韩立结婴、阿里周五发布了QWen-Next都是好东西啊,学不过来了damn)
- 底层分布式能力基础Ray(点击查看):VeRL分布式能力的基础,框架Ray
- VeRL的原理(点击查看):HybridFlow
- VeRL的使用(点击查看):普通RL(PPO)
- VeRL的使用,Agentic RL(多轮RL)
- VeRL的魔改
前两篇文章分别介绍了VeRL的分布式基础和其底层原理,下面就以RL的PPO为例,同时结合源码,看看具体的使用。
安装
- 使用docker的话,verl提供了诸多版本可以使用,例如纯净的只包含Verl/CUDA/PyTorch等依赖的base镜像,也有整合了vLLM/SGLang/FSDP/Megatron的application镜像
- 手动安装的话,要从CUDA/cuDNN等基础库开始,一定会遇到冲突(嗯,一定…)
使用
- 首先在Ray的Head节点上执行
ray start --head --dashboard-host=0.0.0.0
,之后会得到两个address:
- 一个是集群内head/worker之间通信用的 GCS address
- 一个是提交与查看任务/资源监控/查看日志的dashboard地址(使用VSCode插件进行debug的地址也是它)
- 然后在每个Ray Worker节点上执行
ray start --address=gcs_address
- 最后提交job任务
ray job submit --address=dashboard_address -- python3 -m verl.trainer.main_ppo trainer.n_gpus_per_node=8 ...
就可以在dashboard里看到各种信息了
启动后,整体架构如图,前两篇文章介绍过了,就不赘述了:
- 其中driver进行代表single-controller
- 其他的 actor/critic/rollout/ref/reward 那些 workers 代表 multi-controller,均对应着各自的 resource group
下面直接看源码。
源码
首先是入口函数,即main_ppo.py,主要做定义、初始化:
- 初始化 Ray cluster 环境
- 通过
@ray.remote
定义了一个 远程执行的class TaskRunner
- 定义
actor/rollout worker
:通过配置指定使用fsdp
、megatron
,并构建mapping
,role_worker_mapping[Role.ActorRollout] = ray.remote(actor_rollout_cls)
- 定义
criticworker
- 将上述两个
worker
映射到resourece
资源上:mapping[Role.ActorRollout] = global_pool_id
、mapping[Role.Critic] = global_pool_id
- 定义
rewardworker
:role_worker_mapping[Role.RewardModel] = ray.remote(RewardModelWorker)
的同时映射资源mapping[Role.RewardModel] = "global_pool"
- 定义
refworker
:role_worker_mapping[Role.RefPolicy] = ray.remote(ref_policy_cls)
的同时映射资源mapping[Role.RefPolicy] = "global_pool"
- 执行PPO workflow:加载模型、准备dataset、构建RayPPOTrainer、执行
RayPPOTrainer.init_workers()
、执行RayPPOTrainer.fit()
# Initialize the PPO trainer.
trainer = RayPPOTrainer(config=config,tokenizer=tokenizer,processor=processor,role_worker_mapping=self.role_worker_mapping,resource_pool_manager=resource_pool_manager,ray_worker_group_cls=ray_worker_group_cls,reward_fn=reward_fn,val_reward_fn=val_reward_fn,train_dataset=train_dataset,val_dataset=val_dataset,collate_fn=collate_fn,train_sampler=train_sampler,
)
# Initialize the workers of the trainer.
trainer.init_workers()
# Start the training process.
trainer.fit()
然后执行的是核心的RayPPOTrainer
,主要就是俩函数,一个是init_workers()
,一个是fit()
先看init_workers()
:
- 根据config配置的资源创建
resource pool
- 创建
hybrid_engine
,这是actor
和rollout
的 colocate的复合体
resource_pool = self.resource_pool_manager.get_resource_pool(Role.ActorRollout)
actor_rollout_cls = RayClassWithInitArgs(cls=self.role_worker_mapping[Role.ActorRollout],config=self.config.actor_rollout_ref,role="actor_rollout",
)
self.resource_pool_to_cls[resource_pool]["actor_rollout"] = actor_rollout_cls
- 创建
critic
resource_pool = self.resource_pool_manager.get_resource_pool(Role.Critic)
critic_cfg = omega_conf_to_dataclass(self.config.critic)
critic_cls = RayClassWithInitArgs(cls=self.role_worker_mapping[Role.Critic], config=critic_cfg)
self.resource_pool_to_cls[resource_pool]["critic"] = critic_cls
- 创建
ref
resource_pool = self.resource_pool_manager.get_resource_pool(Role.RefPolicy)
ref_policy_cls = RayClassWithInitArgs(self.role_worker_mapping[Role.RefPolicy],config=self.config.actor_rollout_ref,role="ref",
)
self.resource_pool_to_cls[resource_pool]["ref"] = ref_policy_cls
- 创建
reward
,下面设置用的是reward model
非function
:
resource_pool = self.resource_pool_manager.get_resource_pool(Role.RewardModel)
rm_cls = RayClassWithInitArgs(self.role_worker_mapping[Role.RewardModel], config=self.config.reward_model)
self.resource_pool_to_cls[resource_pool]["rm"] = rm_cls
- 创建各自的
wroker group
,WorkerGroup
是一组Wroker
的抽象集合,使得driver可以和底层的多个worker进行交互:
for resource_pool, class_dict in self.resource_pool_to_cls.items():worker_dict_cls = create_colocated_worker_cls(class_dict=class_dict)wg_dict = self.ray_worker_group_cls(resource_pool=resource_pool,ray_cls_with_init=worker_dict_cls,**wg_kwargs,)spawn_wg = wg_dict.spawn(prefix_set=class_dict.keys())all_wg.update(spawn_wg)if self.use_critic:self.critic_wg = all_wg["critic"]self.critic_wg.init_model()if self.use_reference_policy and not self.ref_in_actor: # 需要关注self.ref_policy_wg = all_wg["ref"]self.ref_policy_wg.init_model()if self.use_rm:self.rm_wg = all_wg["rm"]self.rm_wg.init_model()
这里需要注意的是:
actor
和rollout
进行colocate
的目的:是在rollout和train两个阶段间高效更新参数权重- 但是否也同样也
colocate ref
,取决于是否用了LoRA
,因为ref
和actor
它们的base基座模型一样,只不过actor
lora
多了一层lora
的适配层,也就是BA矩阵,所以如果用LoRA
,可以把rollout/actor/ref
同时colocate
到一起,更省资源
之后再看fit()
,其实就是标准的PPO实现了,下面提取出关键信息:
for prompt in dataloader:output = actor_rollout_ref_wg.generate_sequences(prompt) # old_log_prob = actor_rollout_ref_wg.compute_log_prob(output)ref_log_prob = actor_rollout_ref_wg.compute_ref_log_prob(output)values = critic_wg.compute_values(output)rewards = reward_wg.compute_scores(output)advantages = compute_advantages(values, rewards)output = output.union(old_log_prob).union(ref_log_prob).union(values).union(rewards).union(advantages)actor_rollout_ref_wg.update_actor(output)critic.update_critic(output)
另外,关于driver
和wroker
的数据交互,大致可以分成3步:
driver
把数据按DP
数量进行切分- 把数据分发给每个
worker
- 每个
worker
再将执行的结果进行整合,所以VeRL这里搞了一个语法糖@register
class ActorRolloutRefWorker(Worker):@register(dispatch_mode=Dispatch.DP_COMPUTE_PROTO)def generate_sequences(self, prompts: DataProto):prompts = prompts.to(torch.cuda.current_device())
上面的注解@register
装饰了方法generate_sequence
,包含了 dispatch_mode
对应的:
dispatch_func
:把输入dispatch到worker group中的各个workercollect_func
:把worker group的各个worker的response collect到一起
下篇文章介绍下如何使用VeRL
进行Agentic RL
,也就是多轮RL。