Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
cc2df6b
add xdg ulysses
Jun 9, 2025
95f3466
add grpo scripts
Jun 9, 2025
86a66a6
适配redmoe+mcore by光速
Jun 26, 2025
75bd461
Bump from guangsu
Jul 12, 2025
28a0dd9
[feat] Add async-rl with param-sync and async-pipeline
ziqi-wlb Jul 14, 2025
c1f94a5
Update README
ziqi-wlb Aug 19, 2025
6a3e533
Refine code
ziqi-wlb Aug 19, 2025
ae10015
rebase to main
ziqi-wlb Aug 20, 2025
15e7718
add offload-grad for megatron-worker
ziqi-wlb Aug 20, 2025
ad39348
Refine code
ziqi-wlb Aug 20, 2025
c7e0216
Refine code
ziqi-wlb Aug 20, 2025
d1914e5
Refine code
ziqi-wlb Aug 20, 2025
a9cab3f
[trainer, megatron, rollout, sglang, model] feat: Support Async rl st…
ziqi-wlb Aug 20, 2025
6e42f66
Fix save checkpoint
ziqi-wlb Aug 21, 2025
f319332
Merge from feat/async-ref-logp
ziqi-wlb Aug 29, 2025
e4619d7
Fix pp param-sync
ziqi-wlb Aug 29, 2025
56a34c1
Fallback to per-tensor-generator and fix load-checkpoint
ziqi-wlb Sep 2, 2025
c3ecc80
Support valid skip first-val-step
ziqi-wlb Sep 2, 2025
68fdedc
Fix ref-model path for resume
ziqi-wlb Sep 2, 2025
06a9493
Fix cpu-oom for large model
ziqi-wlb Sep 4, 2025
c2f4988
Add memory_efficient_mode to fallback to single buffer for param-update
ziqi-wlb Sep 4, 2025
d7ae3ca
Add clear buffer for param-update
ziqi-wlb Sep 4, 2025
ce683e3
Add overlap logp and recv
ziqi-wlb Sep 4, 2025
9bce5a0
Add nccl-sync for param-update
ziqi-wlb Sep 4, 2025
b77c224
WIP: debug for nccl-sync
ziqi-wlb Sep 4, 2025
ecf14f8
WIP: hang at step2 param-update
ziqi-wlb Sep 5, 2025
ee4db3a
Fix hang for param-update when nccl-sync
ziqi-wlb Sep 5, 2025
dd9b5dc
Porting: support dots model register for engine
ziqi-wlb Sep 5, 2025
54716bd
Fix hang for infer_tp>1
ziqi-wlb Sep 8, 2025
94ad7da
Refine code for async-param
ziqi-wlb Sep 8, 2025
473669d
optimize for param-update nccl
ziqi-wlb Sep 9, 2025
3306bf4
optimize for param-update nccl: 3.5s->2s
ziqi-wlb Sep 9, 2025
247e30a
enable mem clear and refine log
ziqi-wlb Sep 10, 2025
e4f945b
refine mem clear
ziqi-wlb Sep 10, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
**/*.tar.gz
**/playground
**/wandb

**/tensorboard_log
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
Expand Down
314 changes: 314 additions & 0 deletions recipe/grpo/grpo_ray_trainer.py

Large diffs are not rendered by default.

265 changes: 265 additions & 0 deletions recipe/grpo/main_grpo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,265 @@
# Copyright 2024 Bytedance Ltd. and/or its affiliates
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Note that we don't combine the main with ray_trainer as ray_trainer is used by other main.
"""

import hydra
import ray

import pandas as pd
from .grpo_ray_trainer import RayGRPOTrainer
from verl.trainer.ppo.reward import load_reward_manager
from torch.utils.data import Dataset
from verl.utils.dataset.rl_dataset import RLHFDataset as OriginalRLHFDataset


class RLHFDataset(OriginalRLHFDataset):
    """verl RLHFDataset variant backed by a pandas DataFrame.

    Adds support for json/jsonl data files (in addition to parquet) and an
    optional ``system_prompt`` entry in the data config that is prepended to
    every conversation before length filtering.
    """

    def _read_files_and_tokenize(self):
        """Read all data files into ``self.dataframe`` and pre-filter rows.

        Steps:
          1. Load every file in ``self.data_files`` (parquet/json/jsonl).
          2. Optionally prepend a system-prompt message to each conversation.
          3. Optionally drop rows whose chat-templated prompt exceeds
             ``self.max_prompt_length`` tokens.

        Raises:
            ValueError: if a data file has an unsupported extension.
        """
        dataframes = []
        for data_file in self.data_files:
            # read data files and cache
            if data_file.endswith("parquet"):
                dataframe = pd.read_parquet(data_file)
            elif data_file.endswith("jsonl"):
                # read jsonl in chunks to bound peak memory on large files
                chunks = list(pd.read_json(data_file, lines=True, chunksize=10000))
                dataframe = pd.concat(chunks, ignore_index=True)
            elif data_file.endswith("json"):
                dataframe = pd.read_json(data_file)
            else:
                # BUG FIX: the original used a bare `raise` with no active
                # exception, which itself fails with a RuntimeError.
                raise ValueError(f"Unsupported data file: {data_file}")
            dataframes.append(dataframe)
        self.dataframe = pd.concat(dataframes)

        print(f"dataset len: {len(self.dataframe)}")

        # BUG FIX: create_rl_dataset() passes the *data* config as `config`,
        # so `system_prompt` lives at the top level of self.config, not under
        # a nested `.data` attribute.
        system_prompt = self.config.get("system_prompt", None)
        if system_prompt is not None:
            # list(msgs) also tolerates array-like message columns from parquet
            self.dataframe[self.prompt_key] = self.dataframe[self.prompt_key].apply(
                lambda msgs: [{"role": "system", "content": system_prompt}] + list(msgs)
            )

        # filter out too long prompts
        if self.filter_overlong_prompts:
            tokenizer = self.tokenizer
            prompt_key = self.prompt_key
            max_len = self.max_prompt_length
            # BUG FIX: pandas DataFrame.filter() selects rows/columns by label
            # and accepts no predicate / num_proc / desc (that is the HF
            # `datasets` API); use a boolean mask instead. This also resolves
            # the CI-flagged E501 on the old one-line lambda.
            keep = self.dataframe[prompt_key].apply(
                lambda msgs: len(tokenizer.apply_chat_template(msgs, add_generation_prompt=True)) <= max_len
            )
            self.dataframe = self.dataframe[keep]

            print(f"filter dataset len: {len(self.dataframe)}")

@hydra.main(config_path="config", config_name="ppo_trainer", version_base=None)
def main(config):
    """Hydra CLI entry point: forward the composed config to :func:`run_grpo`."""
    run_grpo(config)


def run_grpo(config) -> None:
    """Initialise Ray (if not already running) and execute GRPO training.

    Spawns a remote :class:`TaskRunner`, blocks until training completes, and
    optionally dumps a Ray timeline trace for performance analysis.
    """
    if not ray.is_initialized():
        # this is for local ray cluster
        env_vars = {
            "TOKENIZERS_PARALLELISM": "true",
            "NCCL_DEBUG": "WARN",
            "VLLM_LOGGING_LEVEL": "WARN",
            "VLLM_ALLOW_RUNTIME_LORA_UPDATING": "true",
        }
        ray.init(runtime_env={"env_vars": env_vars}, num_cpus=config.ray_init.num_cpus)

    runner = TaskRunner.remote()
    ray.get(runner.run.remote(config))

    # create a timeline trace file to analyze the performance
    trace_file = config.ray_init.get("timeline_json_file", None)
    if trace_file:
        ray.timeline(filename=trace_file)


@ray.remote(num_cpus=1)  # please make sure main_task is not scheduled on head
class TaskRunner:
    """Ray actor that assembles all GRPO components and runs training."""

    def run(self, config):
        """Build tokenizer, workers, reward functions, datasets and trainer, then fit.

        Args:
            config: The composed hydra/omegaconf config for the whole run.

        Raises:
            NotImplementedError: for unsupported actor/reward strategies, or
                LoRA rollout on vllm < 0.7.3.
        """
        # print initial config
        from pprint import pprint

        from omegaconf import OmegaConf

        from verl.utils.fs import copy_to_local

        pprint(OmegaConf.to_container(config, resolve=True))  # resolve=True will eval symbol values
        OmegaConf.resolve(config)

        # download the checkpoint from hdfs
        local_path = copy_to_local(
            config.actor_rollout_ref.model.path,
            use_shm=config.actor_rollout_ref.model.get("use_shm", False),
        )

        # instantiate tokenizer
        from verl.utils import hf_processor, hf_tokenizer

        trust_remote_code = config.data.get("trust_remote_code", False)
        tokenizer = hf_tokenizer(local_path, trust_remote_code=trust_remote_code)
        # processor is used for multimodal LLMs and may be None
        processor = hf_processor(local_path, trust_remote_code=trust_remote_code, use_fast=True)

        # vllm early verify
        if config.actor_rollout_ref.rollout.name in ["vllm"]:
            from verl.utils.vllm_utils import is_version_ge

            if config.actor_rollout_ref.model.get("lora_rank", 0) > 0:
                if not is_version_ge(pkg="vllm", minver="0.7.3"):
                    raise NotImplementedError("PPO LoRA is not supported before vllm 0.7.3")

        # define worker classes
        if config.actor_rollout_ref.actor.strategy in ["fsdp", "fsdp2"]:
            assert config.critic.strategy in ["fsdp", "fsdp2"]
            from verl.single_controller.ray import RayWorkerGroup
            from verl.workers.fsdp_workers import ActorRolloutRefWorker, AsyncActorRolloutRefWorker, CriticWorker

            actor_rollout_cls = (
                AsyncActorRolloutRefWorker
                if config.actor_rollout_ref.rollout.mode == "async"
                else ActorRolloutRefWorker
            )
            ray_worker_group_cls = RayWorkerGroup

        elif config.actor_rollout_ref.actor.strategy == "megatron":
            assert config.actor_rollout_ref.actor.strategy == config.critic.strategy
            from verl.single_controller.ray.megatron import NVMegatronRayWorkerGroup
            from verl.workers.megatron_workers import ActorRolloutRefWorker, AsyncActorRolloutRefWorker, CriticWorker

            actor_rollout_cls = (
                AsyncActorRolloutRefWorker
                if config.actor_rollout_ref.rollout.mode == "async"
                else ActorRolloutRefWorker
            )
            ray_worker_group_cls = NVMegatronRayWorkerGroup

        else:
            raise NotImplementedError

        from verl.trainer.ppo.ray_trainer import ResourcePoolManager, Role

        role_worker_mapping = {
            Role.ActorRollout: ray.remote(actor_rollout_cls),
            Role.Critic: ray.remote(CriticWorker),
        }

        global_pool_id = "global_pool"
        resource_pool_spec = {
            global_pool_id: [config.trainer.n_gpus_per_node] * config.trainer.nnodes,
        }
        mapping = {
            Role.ActorRollout: global_pool_id,
            Role.Critic: global_pool_id,
        }

        # we should adopt a multi-source reward function here
        # - for rule-based rm, we directly call a reward score
        # - for model-based rm, we call a model
        # - for code related prompt, we send to a sandbox if there are test cases
        # - finally, we combine all the rewards together
        # - The reward type depends on the tag of the data
        if config.reward_model.enable:
            if config.reward_model.strategy in ["fsdp", "fsdp2"]:
                from verl.workers.fsdp_workers import RewardModelWorker
            elif config.reward_model.strategy == "megatron":
                from verl.workers.megatron_workers import RewardModelWorker
            else:
                raise NotImplementedError
            role_worker_mapping[Role.RewardModel] = ray.remote(RewardModelWorker)
            mapping[Role.RewardModel] = global_pool_id

        # use reference model
        if config.algorithm.use_kl_in_reward or config.actor_rollout_ref.actor.use_kl_loss:
            role_worker_mapping[Role.RefPolicy] = ray.remote(ActorRolloutRefWorker)
            mapping[Role.RefPolicy] = global_pool_id

        reward_fn = load_reward_manager(config, tokenizer, num_examine=0, **config.reward_model.get("reward_kwargs", {}))
        val_reward_fn = load_reward_manager(config, tokenizer, num_examine=1, **config.reward_model.get("reward_kwargs", {}))
        resource_pool_manager = ResourcePoolManager(resource_pool_spec=resource_pool_spec, mapping=mapping)

        from verl.utils.dataset.rl_dataset import collate_fn

        train_dataset = create_rl_dataset(config.data.train_files, config.data, tokenizer, processor)
        val_dataset = create_rl_dataset(config.data.val_files, config.data, tokenizer, processor)
        train_sampler = create_rl_sampler(config.data, train_dataset)
        # BUG FIX: the original instantiated `RayPPOTrainer`, a name never
        # imported in this module (NameError at runtime). The intended class
        # is `RayGRPOTrainer`, imported at the top of the file and otherwise
        # unused.
        trainer = RayGRPOTrainer(
            config=config,
            tokenizer=tokenizer,
            processor=processor,
            role_worker_mapping=role_worker_mapping,
            resource_pool_manager=resource_pool_manager,
            ray_worker_group_cls=ray_worker_group_cls,
            reward_fn=reward_fn,
            val_reward_fn=val_reward_fn,
            train_dataset=train_dataset,
            val_dataset=val_dataset,
            collate_fn=collate_fn,
            train_sampler=train_sampler,
            device_name=config.trainer.device,
        )
        trainer.init_workers()
        trainer.fit()


def create_rl_dataset(data_paths, data_config, tokenizer, processor):
    """Instantiate the RL dataset, honouring an optional custom dataset class.

    Arguments:
        data_paths: The data file path(s), forwarded as ``data_files``.
        data_config: The data config; may declare ``custom_cls.path``/``name``
            pointing at an external Dataset subclass.
        tokenizer (Tokenizer): The tokenizer.
        processor (Processor): The processor (multimodal; may be None).

    Returns:
        dataset (Dataset): The instantiated dataset.

    Raises:
        TypeError: if the custom class is not a torch Dataset subclass.
    """

    use_custom_cls = "custom_cls" in data_config and data_config.custom_cls.get("path", None) is not None
    if use_custom_cls:
        from verl.utils.import_utils import load_extern_type

        dataset_cls = load_extern_type(data_config.custom_cls.path, data_config.custom_cls.name)
        if not issubclass(dataset_cls, Dataset):
            raise TypeError(
                f"The custom dataset class '{data_config.custom_cls.name}' from "
                f"'{data_config.custom_cls.path}' must inherit from torch.utils.data.Dataset"
            )
    else:
        dataset_cls = RLHFDataset
    print(f"Using dataset class: {dataset_cls.__name__}")

    return dataset_cls(
        data_files=data_paths,
        tokenizer=tokenizer,
        processor=processor,
        config=data_config,
    )


def create_rl_sampler(data_config, dataset):
    """Build the dataloader sampler for *dataset*.

    When ``data_config.shuffle`` is truthy, a ``RandomSampler`` driven by a
    generator seeded from ``data_config.seed`` (default 1) is returned — the
    fixed seed keeps shuffling reproducible across checkpoint resumes.
    Otherwise a plain ``SequentialSampler`` is used.

    Arguments:
        data_config: The data config (reads ``shuffle`` and optional ``seed``).
        dataset (Dataset): The dataset to sample from.

    Returns:
        sampler (Sampler): The configured sampler.
    """
    import torch
    from torch.utils.data import RandomSampler, SequentialSampler

    # use sampler for better ckpt resume
    if not data_config.shuffle:
        return SequentialSampler(data_source=dataset)

    generator = torch.Generator()
    generator.manual_seed(data_config.get("seed", 1))
    return RandomSampler(data_source=dataset, generator=generator)


if __name__ == "__main__":
main()
47 changes: 47 additions & 0 deletions recipe/grpo/scripts/run_grpo_debug.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
set -x

# Debug launch script for GRPO training on a single 8-GPU node.
# Usage: run_grpo_debug.sh [extra hydra overrides...]

# If you are using vllm<=0.6.3, you might need to set the following environment variable to avoid bugs:
# export VLLM_ATTENTION_BACKEND=XFORMERS

# BUG FIX: the hydra.run.dir override is single-quoted so the ${now:...} and
# ${env:...} interpolations reach hydra unexpanded; unquoted, bash aborts with
# "bad substitution" on ${now:%Y-%m-%d}.
# BUG FIX: "$@" (quoted) at the end so extra overrides containing spaces are
# forwarded intact.
python3 -m verl.trainer.main_ppo \
    '++hydra.run.dir=outputs/${now:%Y-%m-%d}/${now:%H-%M-%S}-${env:RANK,0}' \
    algorithm.adv_estimator=grpo \
    data.train_files=/cpfs/user/liuyanjiang/hf_datasets/DeepScaleR-Preview-Dataset/deepscaler.parquet \
    data.val_files=/cpfs/user/liuyanjiang/hf_datasets/DeepScaleR-Preview-Dataset/deepscaler.parquet \
    data.train_batch_size=512 \
    data.max_prompt_length=512 \
    data.max_response_length=16384 \
    data.filter_overlong_prompts=True \
    data.truncation='error' \
    data.system_prompt='A conversation between User and Assistant. The user asks a question, and the Assistant solves it. The assistant first thinks about the reasoning process in the mind and then provides the user with the answer. The reasoning process and answer are enclosed within <think> </think> and <answer> </answer> tags, respectively, i.e., <think> reasoning process here </think> <answer> answer here </answer>' \
    actor_rollout_ref.model.path=/cpfs/user/liuyanjiang/hf_models/Qwen2.5-1.5B-Instruct \
    actor_rollout_ref.actor.optim.lr=1e-6 \
    actor_rollout_ref.model.use_remove_padding=True \
    actor_rollout_ref.actor.ppo_mini_batch_size=512 \
    actor_rollout_ref.actor.ppo_micro_batch_size_per_gpu=40 \
    actor_rollout_ref.actor.use_kl_loss=True \
    actor_rollout_ref.actor.kl_loss_coef=0.001 \
    actor_rollout_ref.actor.kl_loss_type=low_var_kl \
    actor_rollout_ref.actor.entropy_coeff=0 \
    actor_rollout_ref.model.enable_gradient_checkpointing=True \
    actor_rollout_ref.actor.fsdp_config.param_offload=False \
    actor_rollout_ref.actor.fsdp_config.optimizer_offload=False \
    actor_rollout_ref.rollout.log_prob_micro_batch_size_per_gpu=40 \
    actor_rollout_ref.rollout.tensor_model_parallel_size=2 \
    actor_rollout_ref.rollout.name=vllm \
    actor_rollout_ref.rollout.gpu_memory_utilization=0.6 \
    actor_rollout_ref.rollout.n=8 \
    actor_rollout_ref.ref.log_prob_micro_batch_size_per_gpu=40 \
    actor_rollout_ref.ref.fsdp_config.param_offload=True \
    custom_reward_function.path=verl.custom_reward_functions.grpo_custom_reward_function \
    algorithm.use_kl_in_reward=False \
    trainer.critic_warmup=0 \
    trainer.logger=['console','tensorboard'] \
    trainer.project_name='grpo' \
    trainer.experiment_name='debug' \
    trainer.n_gpus_per_node=8 \
    trainer.nnodes=1 \
    trainer.default_local_dir=/newcpfs/user/liuyanjiang/ckpts \
    trainer.save_freq=20 \
    trainer.test_freq=5 \
    trainer.total_epochs=15 "$@"
13 changes: 13 additions & 0 deletions recipe/moe/moe_trainer/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Copyright 2024 Bytedance Ltd. and/or its affiliates
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
Loading
Loading