Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
**/*.tar.gz
**/playground
**/wandb

**/tensorboard_log
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
Expand Down
314 changes: 314 additions & 0 deletions recipe/grpo/grpo_ray_trainer.py

Large diffs are not rendered by default.

265 changes: 265 additions & 0 deletions recipe/grpo/main_grpo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,265 @@
# Copyright 2024 Bytedance Ltd. and/or its affiliates
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Note that we don't combine this main entry point with the ray trainer module, because the ray trainer is also used by other main entry points.
"""

import hydra
import ray

import pandas as pd
from .grpo_ray_trainer import RayGRPOTrainer
from verl.trainer.ppo.reward import load_reward_manager
from torch.utils.data import Dataset
from verl.utils.dataset.rl_dataset import RLHFDataset as OriginalRLHFDataset


class RLHFDataset(OriginalRLHFDataset):
    """RLHF dataset variant that adds json/jsonl loading, an optional system
    prompt prepended to every conversation, and pandas-based filtering of
    over-long prompts."""

    def _read_files_and_tokenize(self):
        """Load every file in ``self.data_files`` into one pandas DataFrame.

        Supports ``.parquet``, ``.json``, and ``.jsonl`` files. After loading,
        optionally prepends a system-prompt message to each prompt and drops
        rows whose tokenized prompt exceeds ``self.max_prompt_length``.

        Raises:
            ValueError: if a data file has an unsupported extension.
        """
        dataframes = []
        for parquet_file in self.data_files:
            # read parquet files and cache
            if parquet_file.endswith('parquet'):
                dataframe = pd.read_parquet(parquet_file)
            elif parquet_file.endswith('json'):
                dataframe = pd.read_json(parquet_file)
            elif parquet_file.endswith('jsonl'):
                # stream large jsonl files in chunks to bound peak memory
                chunks = []
                for chunk in pd.read_json(
                    parquet_file,
                    lines=True,
                    chunksize=10000,
                ):
                    chunks.append(chunk)

                dataframe = pd.concat(chunks, ignore_index=True)
            else:
                # BUG FIX: was a bare `raise` outside an except block, which
                # itself raises "RuntimeError: No active exception to re-raise".
                raise ValueError(f"Unsupported data file format: {parquet_file}")
            dataframes.append(dataframe)
        self.dataframe = pd.concat(dataframes)

        print(f"dataset len: {len(self.dataframe)}")

        # NOTE(review): this reads config.data.system_prompt — assumes the
        # `config` passed to this dataset carries a `data` sub-config; confirm
        # against how the dataset is constructed (create_rl_dataset passes
        # config.data itself, in which case this should be self.config directly).
        if self.config.data.get('system_prompt', None) is not None:
            system_prompt = self.config.data.system_prompt
            self.dataframe[self.prompt_key] = self.dataframe[self.prompt_key].apply(
                lambda x: [{'role': 'system', 'content': system_prompt}] + x
            )
        # filter out too long prompts
        # BUG FIX: self.dataframe is a pandas DataFrame (result of pd.concat);
        # pandas DataFrame.filter() selects rows/columns by *label* and accepts
        # neither a callable nor num_proc/desc (that is the HF `datasets` API).
        # Use a boolean mask over the prompt column instead.
        if self.filter_overlong_prompts:
            tokenizer = self.tokenizer
            max_prompt_length = self.max_prompt_length
            keep = self.dataframe[self.prompt_key].apply(
                lambda doc: len(tokenizer.apply_chat_template(doc, add_generation_prompt=True)) <= max_prompt_length
            )
            self.dataframe = self.dataframe[keep]

            print(f"filter dataset len: {len(self.dataframe)}")

@hydra.main(config_path="config", config_name="ppo_trainer", version_base=None)
def main(config):
    """Hydra entry point: load the `ppo_trainer` config and launch GRPO training."""
    run_grpo(config)


def run_grpo(config) -> None:
    """Initialize Ray (if needed), run the training task, and optionally dump a
    Ray timeline trace for performance analysis.

    Arguments:
        config: The resolved hydra/omegaconf training configuration.
    """
    if not ray.is_initialized():
        # this is for local ray cluster
        worker_env = {
            "TOKENIZERS_PARALLELISM": "true",
            "NCCL_DEBUG": "WARN",
            "VLLM_LOGGING_LEVEL": "WARN",
            "VLLM_ALLOW_RUNTIME_LORA_UPDATING": "true",
        }
        ray.init(runtime_env={"env_vars": worker_env}, num_cpus=config.ray_init.num_cpus)

    task = TaskRunner.remote()
    ray.get(task.run.remote(config))

    # create a timeline trace file to analyze the performance
    trace_file = config.ray_init.get("timeline_json_file", None)
    if trace_file:
        ray.timeline(filename=trace_file)


@ray.remote(num_cpus=1)  # please make sure main_task is not scheduled on head
class TaskRunner:
    """Ray actor that drives one full GRPO training run: it resolves the config,
    builds tokenizer/processor, assembles worker classes and resource pools,
    constructs reward functions and datasets, and runs the trainer."""

    def run(self, config):
        """Execute the training task.

        Arguments:
            config: The hydra/omegaconf training configuration.

        Raises:
            NotImplementedError: for unsupported actor/reward strategies, or
                LoRA with vllm < 0.7.3.
        """
        # print initial config
        from pprint import pprint

        from omegaconf import OmegaConf

        from verl.utils.fs import copy_to_local

        pprint(OmegaConf.to_container(config, resolve=True))  # resolve=True will eval symbol values
        OmegaConf.resolve(config)

        # download the checkpoint from hdfs
        local_path = copy_to_local(config.actor_rollout_ref.model.path, use_shm=config.actor_rollout_ref.model.get("use_shm", False))

        # instantiate tokenizer
        from verl.utils import hf_processor, hf_tokenizer

        trust_remote_code = config.data.get("trust_remote_code", False)
        tokenizer = hf_tokenizer(local_path, trust_remote_code=trust_remote_code)
        processor = hf_processor(local_path, trust_remote_code=trust_remote_code, use_fast=True)  # used for multimodal LLM, could be none

        # vllm early verify: LoRA hot-loading needs a recent vllm
        if config.actor_rollout_ref.rollout.name in ["vllm"]:
            from verl.utils.vllm_utils import is_version_ge

            if config.actor_rollout_ref.model.get("lora_rank", 0) > 0:
                if not is_version_ge(pkg="vllm", minver="0.7.3"):
                    raise NotImplementedError("PPO LoRA is not supported before vllm 0.7.3")

        # define worker classes for the chosen distributed strategy
        if config.actor_rollout_ref.actor.strategy in ["fsdp", "fsdp2"]:
            assert config.critic.strategy in ["fsdp", "fsdp2"]
            from verl.single_controller.ray import RayWorkerGroup
            from verl.workers.fsdp_workers import ActorRolloutRefWorker, AsyncActorRolloutRefWorker, CriticWorker

            actor_rollout_cls = AsyncActorRolloutRefWorker if config.actor_rollout_ref.rollout.mode == "async" else ActorRolloutRefWorker
            ray_worker_group_cls = RayWorkerGroup

        elif config.actor_rollout_ref.actor.strategy == "megatron":
            assert config.actor_rollout_ref.actor.strategy == config.critic.strategy
            from verl.single_controller.ray.megatron import NVMegatronRayWorkerGroup
            from verl.workers.megatron_workers import ActorRolloutRefWorker, AsyncActorRolloutRefWorker, CriticWorker

            actor_rollout_cls = AsyncActorRolloutRefWorker if config.actor_rollout_ref.rollout.mode == "async" else ActorRolloutRefWorker
            ray_worker_group_cls = NVMegatronRayWorkerGroup

        else:
            raise NotImplementedError

        from verl.trainer.ppo.ray_trainer import ResourcePoolManager, Role

        role_worker_mapping = {
            Role.ActorRollout: ray.remote(actor_rollout_cls),
            Role.Critic: ray.remote(CriticWorker),
        }

        # one shared pool across all nodes; every role is co-located on it
        global_pool_id = "global_pool"
        resource_pool_spec = {
            global_pool_id: [config.trainer.n_gpus_per_node] * config.trainer.nnodes,
        }
        mapping = {
            Role.ActorRollout: global_pool_id,
            Role.Critic: global_pool_id,
        }

        # we should adopt a multi-source reward function here
        # - for rule-based rm, we directly call a reward score
        # - for model-based rm, we call a model
        # - for code related prompt, we send to a sandbox if there are test cases
        # - finally, we combine all the rewards together
        # - The reward type depends on the tag of the data
        if config.reward_model.enable:
            if config.reward_model.strategy in ["fsdp", "fsdp2"]:
                from verl.workers.fsdp_workers import RewardModelWorker
            elif config.reward_model.strategy == "megatron":
                from verl.workers.megatron_workers import RewardModelWorker
            else:
                raise NotImplementedError
            role_worker_mapping[Role.RewardModel] = ray.remote(RewardModelWorker)
            mapping[Role.RewardModel] = global_pool_id

        # use reference model only when some KL term actually needs it
        if config.algorithm.use_kl_in_reward or config.actor_rollout_ref.actor.use_kl_loss:
            role_worker_mapping[Role.RefPolicy] = ray.remote(ActorRolloutRefWorker)
            mapping[Role.RefPolicy] = global_pool_id

        # num_examine: how many decoded samples to print for inspection
        reward_fn = load_reward_manager(config, tokenizer, num_examine=0, **config.reward_model.get("reward_kwargs", {}))
        val_reward_fn = load_reward_manager(config, tokenizer, num_examine=1, **config.reward_model.get("reward_kwargs", {}))
        resource_pool_manager = ResourcePoolManager(resource_pool_spec=resource_pool_spec, mapping=mapping)

        from verl.utils.dataset.rl_dataset import collate_fn

        train_dataset = create_rl_dataset(config.data.train_files, config.data, tokenizer, processor)
        val_dataset = create_rl_dataset(config.data.val_files, config.data, tokenizer, processor)
        train_sampler = create_rl_sampler(config.data, train_dataset)
        # BUG FIX: was `RayPPOTrainer`, which is never imported in this module
        # (NameError at runtime); the module imports RayGRPOTrainer for this.
        trainer = RayGRPOTrainer(
            config=config,
            tokenizer=tokenizer,
            processor=processor,
            role_worker_mapping=role_worker_mapping,
            resource_pool_manager=resource_pool_manager,
            ray_worker_group_cls=ray_worker_group_cls,
            reward_fn=reward_fn,
            val_reward_fn=val_reward_fn,
            train_dataset=train_dataset,
            val_dataset=val_dataset,
            collate_fn=collate_fn,
            train_sampler=train_sampler,
            device_name=config.trainer.device,
        )
        trainer.init_workers()
        trainer.fit()


def create_rl_dataset(data_paths, data_config, tokenizer, processor):
    """Build the RL dataset, honoring an optional custom dataset class.

    Arguments:
        data_paths: Path(s) to the data files to load.
        data_config: The data config; may declare ``custom_cls.path``/``name``.
        tokenizer (Tokenizer): The tokenizer.
        processor (Processor): The processor.

    Returns:
        dataset (Dataset): The dataset.

    Raises:
        TypeError: if the custom class is not a ``torch.utils.data.Dataset``.
    """
    # default to the local RLHFDataset unless a custom class path is configured
    dataset_cls = RLHFDataset
    if "custom_cls" in data_config and data_config.custom_cls.get("path", None) is not None:
        from verl.utils.import_utils import load_extern_type

        dataset_cls = load_extern_type(data_config.custom_cls.path, data_config.custom_cls.name)
        if not issubclass(dataset_cls, Dataset):
            raise TypeError(f"The custom dataset class '{data_config.custom_cls.name}' from '{data_config.custom_cls.path}' must inherit from torch.utils.data.Dataset")
    print(f"Using dataset class: {dataset_cls.__name__}")

    return dataset_cls(
        data_files=data_paths,
        tokenizer=tokenizer,
        processor=processor,
        config=data_config,
    )


def create_rl_sampler(data_config, dataset):
    """Build a sampler for the dataset.

    A seeded generator backs the random sampler so that checkpoint resume
    replays the same data order.

    Arguments:
        data_config: The data config (reads ``shuffle`` and optional ``seed``).
        dataset (Dataset): The dataset.

    Returns:
        sampler (Sampler): The sampler.
    """
    import torch
    from torch.utils.data import RandomSampler, SequentialSampler

    if not data_config.shuffle:
        return SequentialSampler(data_source=dataset)

    generator = torch.Generator()
    generator.manual_seed(data_config.get("seed", 1))
    return RandomSampler(data_source=dataset, generator=generator)


# Script entry point: hydra parses CLI overrides and injects the config.
if __name__ == "__main__":
    main()
47 changes: 47 additions & 0 deletions recipe/grpo/scripts/run_grpo_debug.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
set -x

# If you are using vllm<=0.6.3, you might need to set the following environment variable to avoid bugs:
# export VLLM_ATTENTION_BACKEND=XFORMERS

# NOTE(review): this invokes verl.trainer.main_ppo; confirm whether the recipe's
# own entry point (recipe.grpo.main_grpo) was intended for this GRPO debug run.
#
# BUG FIX: the hydra.run.dir override is single-quoted so that hydra
# interpolations like ${now:...} and ${env:RANK,0} reach hydra verbatim —
# unquoted, bash attempts its own ${var:offset} substring expansion and fails
# with "bad substitution". Likewise, "$@" is quoted so forwarded arguments
# containing spaces are not word-split.
python3 -m verl.trainer.main_ppo \
    '++hydra.run.dir=outputs/${now:%Y-%m-%d}/${now:%H-%M-%S}-${env:RANK,0}' \
    algorithm.adv_estimator=grpo \
    data.train_files=/cpfs/user/liuyanjiang/hf_datasets/DeepScaleR-Preview-Dataset/deepscaler.parquet \
    data.val_files=/cpfs/user/liuyanjiang/hf_datasets/DeepScaleR-Preview-Dataset/deepscaler.parquet \
    data.train_batch_size=512 \
    data.max_prompt_length=512 \
    data.max_response_length=16384 \
    data.filter_overlong_prompts=True \
    data.truncation='error' \
    data.system_prompt='A conversation between User and Assistant. The user asks a question, and the Assistant solves it. The assistant first thinks about the reasoning process in the mind and then provides the user with the answer. The reasoning process and answer are enclosed within <think> </think> and <answer> </answer> tags, respectively, i.e., <think> reasoning process here </think> <answer> answer here </answer>' \
    actor_rollout_ref.model.path=/cpfs/user/liuyanjiang/hf_models/Qwen2.5-1.5B-Instruct \
    actor_rollout_ref.actor.optim.lr=1e-6 \
    actor_rollout_ref.model.use_remove_padding=True \
    actor_rollout_ref.actor.ppo_mini_batch_size=512 \
    actor_rollout_ref.actor.ppo_micro_batch_size_per_gpu=40 \
    actor_rollout_ref.actor.use_kl_loss=True \
    actor_rollout_ref.actor.kl_loss_coef=0.001 \
    actor_rollout_ref.actor.kl_loss_type=low_var_kl \
    actor_rollout_ref.actor.entropy_coeff=0 \
    actor_rollout_ref.model.enable_gradient_checkpointing=True \
    actor_rollout_ref.actor.fsdp_config.param_offload=False \
    actor_rollout_ref.actor.fsdp_config.optimizer_offload=False \
    actor_rollout_ref.rollout.log_prob_micro_batch_size_per_gpu=40 \
    actor_rollout_ref.rollout.tensor_model_parallel_size=2 \
    actor_rollout_ref.rollout.name=vllm \
    actor_rollout_ref.rollout.gpu_memory_utilization=0.6 \
    actor_rollout_ref.rollout.n=8 \
    actor_rollout_ref.ref.log_prob_micro_batch_size_per_gpu=40 \
    actor_rollout_ref.ref.fsdp_config.param_offload=True \
    custom_reward_function.path=verl.custom_reward_functions.grpo_custom_reward_function \
    algorithm.use_kl_in_reward=False \
    trainer.critic_warmup=0 \
    trainer.logger=['console','tensorboard'] \
    trainer.project_name='grpo' \
    trainer.experiment_name='debug' \
    trainer.n_gpus_per_node=8 \
    trainer.nnodes=1 \
    trainer.default_local_dir=/newcpfs/user/liuyanjiang/ckpts \
    trainer.save_freq=20 \
    trainer.test_freq=5 \
    trainer.total_epochs=15 "$@"
13 changes: 13 additions & 0 deletions recipe/moe/moe_trainer/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Copyright 2024 Bytedance Ltd. and/or its affiliates
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
Loading