6 changes: 5 additions & 1 deletion .github/workflows/reward_model_sglang.yml
@@ -98,7 +98,7 @@ jobs:
- name: Install the current repository
run: |
pip3 install -e .[test]
-          pip3 install sglang-router==0.1.8
+          pip3 install sglang-router==0.2.2
- name: Prepare gsm8k dataset
run: |
ray stop --force
@@ -111,6 +111,10 @@ jobs:
run: |
unset http_proxy https_proxy HTTP_PROXY HTTPS_PROXY
ROLLOUT_NAME=sglang pytest -s -x tests/experimental/reward/test_agent_loop_reward_manager.py
+      - name: Running sglang agent loop with reward model colocate tests on 8 L20 GPUs
+        run: |
+          unset http_proxy https_proxy HTTP_PROXY HTTPS_PROXY
+          ROLLOUT_NAME=sglang pytest -s -x tests/experimental/reward/test_agent_reward_loop_colocate.py

cleanup:
runs-on: ubuntu-latest
4 changes: 4 additions & 0 deletions .github/workflows/reward_model_vllm.yml
@@ -110,6 +110,10 @@ jobs:
run: |
unset http_proxy https_proxy HTTP_PROXY HTTPS_PROXY
ROLLOUT_NAME=vllm pytest -s -x tests/experimental/reward/test_agent_loop_reward_manager.py
+      - name: Running vllm agent loop with reward model colocate tests on 8 L20 GPUs
+        run: |
+          unset http_proxy https_proxy HTTP_PROXY HTTPS_PROXY
+          ROLLOUT_NAME=vllm pytest -s -x tests/experimental/reward/test_agent_reward_loop_colocate.py

cleanup:
runs-on: ubuntu-latest
7 changes: 3 additions & 4 deletions tests/experimental/agent_loop/agent_utils.py
@@ -79,15 +79,14 @@ def init_agent_loop_manager(config: DictConfig) -> AgentLoopManager | RayWorkerG
return actor_rollout_wg

if config.reward_model.enable_resource_pool and config.reward_model.enable:
-        rm_wg = all_wg["rm"]
-        rm_wg.init_model()
+        rm_resource_pool = resource_pool_manager.get_resource_pool(Role.RewardModel)
else:
-        rm_wg = None
+        rm_resource_pool = None
# =========================== 2. Create AgentLoopManager ===========================
agent_loop_manager = AgentLoopManager(
config=config,
worker_group=actor_rollout_wg,
-        rm_wg=rm_wg,
+        rm_resource_pool=rm_resource_pool,
)

return agent_loop_manager
136 changes: 136 additions & 0 deletions tests/experimental/reward/test_agent_reward_loop_colocate.py
@@ -0,0 +1,136 @@
# Copyright 2024 Bytedance Ltd. and/or its affiliates
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os

import ray
from hydra import compose, initialize_config_dir
from torchdata.stateful_dataloader import StatefulDataLoader
from transformers import AutoTokenizer

from verl.experimental.agent_loop import AgentLoopManager
from verl.experimental.reward.reward_model import RewardModelManager
from verl.protocol import DataProto
from verl.single_controller.ray import RayClassWithInitArgs, RayWorkerGroup
from verl.trainer.main_ppo import create_rl_sampler
from verl.trainer.ppo.ray_trainer import ResourcePoolManager
from verl.utils.dataset.rl_dataset import RLHFDataset, collate_fn
from verl.workers.fsdp_workers import ActorRolloutRefWorker, AsyncActorRolloutRefWorker


def test_agent_loop_reward_manager():
ray.init(
runtime_env={
"env_vars": {
"TOKENIZERS_PARALLELISM": "true",
"NCCL_DEBUG": "WARN",
"VLLM_LOGGING_LEVEL": "INFO",
"VLLM_USE_V1": "1",
}
}
)
with initialize_config_dir(config_dir=os.path.abspath("recipe/fapo/config")):
config = compose("rm_config")

rollout_model_path = os.path.expanduser("~/models/Qwen/Qwen2.5-0.5B-Instruct")
reward_model_path = os.path.expanduser("~/models/Qwen/Qwen2.5-1.5B-Instruct")
Comment on lines +45 to +46 (Contributor review, critical):

The test uses hardcoded paths to local model files within the user's home directory (~/models/...). This makes the test non-portable and will cause it to fail in CI environments or on other developers' machines. Tests should be self-contained and not rely on a specific local file structure.

To fix this, consider using a smaller, publicly available model from the Hugging Face Hub that can be downloaded automatically during the test setup. If a specific model architecture is needed, a mock model could be created.
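A minimal sketch of that suggestion, assuming a local checkout should still be preferred when present; the `resolve_model_path` helper and its environment variables are hypothetical, not part of this PR:

```python
import os


def resolve_model_path(env_var: str, hub_id: str) -> str:
    """Prefer a local checkout if it exists, else fall back to the HF Hub id."""
    local_path = os.path.expanduser(os.getenv(env_var, f"~/models/{hub_id}"))
    # AutoTokenizer and model loaders accept either a filesystem path or a
    # Hub id, so returning the Hub id keeps the test runnable in CI.
    return local_path if os.path.exists(local_path) else hub_id


rollout_model_path = resolve_model_path("ROLLOUT_MODEL_PATH", "Qwen/Qwen2.5-0.5B-Instruct")
reward_model_path = resolve_model_path("REWARD_MODEL_PATH", "Qwen/Qwen2.5-1.5B-Instruct")
```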


# actor_rollout_ref config
config.data.return_raw_chat = True
config.data.max_prompt_length = 1024
config.data.max_response_length = 4096
config.actor_rollout_ref.model.path = rollout_model_path
config.actor_rollout_ref.actor.use_dynamic_bsz = True
config.actor_rollout_ref.rollout.name = os.getenv("ROLLOUT_NAME", "vllm")
config.actor_rollout_ref.rollout.mode = "async"
config.actor_rollout_ref.rollout.tensor_model_parallel_size = 2
config.actor_rollout_ref.rollout.gpu_memory_utilization = 0.8
config.actor_rollout_ref.rollout.enforce_eager = True
config.actor_rollout_ref.rollout.prompt_length = 1024
config.actor_rollout_ref.rollout.response_length = 4096
config.actor_rollout_ref.rollout.skip_tokenizer_init = True
config.trainer.n_gpus_per_node = 8
config.trainer.nnodes = 1

config.reward_model.reward_manager = "dapo"
config.reward_model.enable = True
config.reward_model.enable_resource_pool = False
config.reward_model.n_gpus_per_node = 8
config.reward_model.model.path = reward_model_path
config.reward_model.rollout.name = os.getenv("ROLLOUT_NAME", "vllm")
config.reward_model.rollout.gpu_memory_utilization = 0.8
config.reward_model.rollout.tensor_model_parallel_size = 2
config.reward_model.rollout.skip_tokenizer_init = False
config.reward_model.rollout.prompt_length = 5120
config.reward_model.rollout.response_length = 4096
config.custom_reward_function.path = "tests/experimental/reward/reward_fn.py"
config.custom_reward_function.name = "compute_score_gsm8k"

# 1. init reward model manager
actor_rollout_cls = (
AsyncActorRolloutRefWorker if config.actor_rollout_ref.rollout.mode == "async" else ActorRolloutRefWorker
)
global_pool_id = "global_pool"
resource_pool_spec = {
global_pool_id: [config.trainer.n_gpus_per_node] * config.trainer.nnodes,
}
resource_pool_manager = ResourcePoolManager(resource_pool_spec=resource_pool_spec, mapping=None)
resource_pool_manager.create_resource_pool()
resource_pool = resource_pool_manager.resource_pool_dict[global_pool_id]
actor_rollout_cls = RayClassWithInitArgs(
cls=ray.remote(actor_rollout_cls), config=config.actor_rollout_ref, role="actor_rollout"
)
actor_rollout_wg = RayWorkerGroup(
resource_pool=resource_pool,
ray_cls_with_init=actor_rollout_cls,
)
actor_rollout_wg.init_model()

agent_loop_manager = AgentLoopManager(config, worker_group=actor_rollout_wg)
reward_model_manager = RewardModelManager(config.reward_model, resource_pool=resource_pool)

# 2. init test data
local_folder = os.path.expanduser("~/data/gsm8k/")
Contributor review (critical):

Similar to the model paths, this data path is hardcoded to a local directory in the user's home folder (~/data/...). This will cause the test to fail on other machines. Please include a small sample of the data directly in the test suite, or download it from a public source to a temporary directory during test setup.
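A sketch of the download-to-a-temporary-directory option, assuming the `datasets` package is available in the test environment; note the raw Hub dataset would still need the repository's gsm8k preprocessing to match the schema RLHFDataset expects, so this only illustrates the portability mechanism:

```python
import os
import tempfile

from datasets import load_dataset

# Hypothetical fallback: if the expected local parquet is missing, materialize
# a small GSM8K sample in a temporary directory instead of failing outright.
local_folder = os.path.expanduser("~/data/gsm8k/")
if not os.path.exists(os.path.join(local_folder, "train.parquet")):
    local_folder = tempfile.mkdtemp(prefix="gsm8k_")
    sample = load_dataset("openai/gsm8k", "main", split="train[:64]")
    sample.to_parquet(os.path.join(local_folder, "train.parquet"))
```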

data_files = [os.path.join(local_folder, "train.parquet")]
tokenizer = AutoTokenizer.from_pretrained(rollout_model_path)

dataset = RLHFDataset(
data_files=data_files,
tokenizer=tokenizer,
config=config.data,
processor=None,
)

batch_size = 64
sampler = create_rl_sampler(config.data, dataset)
dataloader = StatefulDataLoader(
dataset=dataset,
batch_size=batch_size,
num_workers=config.data.dataloader_num_workers,
drop_last=True,
collate_fn=collate_fn,
sampler=sampler,
)

# 3. generate responses
batch_dict = next(iter(dataloader))
batch = DataProto.from_single_dict(batch_dict)
gen_batch = agent_loop_manager.generate_sequences(prompts=batch)
sampling_params = {"temperature": 0.0, "top_p": 1.0, "max_tokens": 1024}
genrm_outputs = reward_model_manager.generate_sequences(gen_batch, sampling_params=sampling_params)

print(genrm_outputs[0])

print("done")

ray.shutdown()
11 changes: 8 additions & 3 deletions verl/experimental/agent_loop/agent_loop.py
@@ -34,7 +34,7 @@
from verl.experimental.agent_loop.utils import resolve_config_path
from verl.experimental.reward import RewardManagerWorker
from verl.protocol import DataProto
-from verl.single_controller.ray.base import RayWorkerGroup
+from verl.single_controller.ray.base import RayResourcePool, RayWorkerGroup
from verl.utils import hf_processor, hf_tokenizer
from verl.utils.fs import copy_to_local
from verl.utils.model import compute_position_id_with_mask
@@ -686,12 +686,15 @@ async def get_trajectory_info(step, index, validate):
class AgentLoopManager:
"""Agent loop manager that manages a group of agent loop workers."""

-    def __init__(self, config: DictConfig, worker_group: RayWorkerGroup = None, rm_wg: RayWorkerGroup = None):
+    def __init__(
+        self, config: DictConfig, worker_group: RayWorkerGroup = None, rm_resource_pool: RayResourcePool = None
+    ):
"""Initialize agent loop manager.

Args:
config (DictConfig): trainer config.
worker_group (RayWorkerGroup): ActorRolloutRef worker group for hybrid mode; None for standalone mode.
+            rm_resource_pool (RayResourcePool): Resource pool for the reward model (standalone mode).
"""
self.config = config
self.worker_group = worker_group
@@ -700,7 +703,9 @@ def __init__(self, config: DictConfig, worker_group: RayWorkerGroup = None, rm_w
if self.config.reward_model.enable and self.config.reward_model.enable_resource_pool:
from verl.experimental.reward import RewardModelManager

-            self.reward_model_manager = RewardModelManager(config.reward_model, rm_wg)
+            # TODO (dyy): the RM is currently colocated with the legacy fsdp/megatron RM;
+            # a future PR will deprecate the fsdp/megatron RM and init RewardModelManager in standalone mode.
+            self.reward_model_manager = RewardModelManager(config.reward_model, rm_resource_pool)
self.reward_router_address = self.reward_model_manager.get_router_address()

# for recipe to change
21 changes: 13 additions & 8 deletions verl/experimental/reward/reward_model.py
@@ -21,7 +21,7 @@
from openai.types.chat import ChatCompletion

from verl import DataProto
-from verl.single_controller.ray.base import RayWorkerGroup
+from verl.single_controller.ray.base import RayResourcePool
from verl.workers.config import HFModelConfig, RewardModelConfig
from verl.workers.rollout.replica import get_rollout_replica_class

@@ -32,16 +32,16 @@
class RewardModelManager:
"""Reward model manager."""

-    def __init__(self, config: RewardModelConfig, worker_group: RayWorkerGroup = None):
+    def __init__(self, config: RewardModelConfig, resource_pool: RayResourcePool = None):
"""
Initialize the reward model manager.

Args:
config (RewardModelConfig): Reward model configuration.
-            worker_group (RayWorkerGroup, optional): Worker group. Defaults to None.
+            resource_pool (RayResourcePool, optional): Resource pool. Defaults to None.
"""
self.config = config
-        self.worker_group = worker_group
+        self.resource_pool = resource_pool
self._initialize_llm_servers()
self._initialize_router()
if self.config.rollout.free_cache_engine:
@@ -50,8 +50,8 @@ def __init__(self, config: RewardModelConfig, worker_group: RayWorkerGroup = Non
def _initialize_llm_servers(self):
rollout_world_size = self.config.rollout.tensor_model_parallel_size
world_size = (
-            self.worker_group.world_size
-            if self.worker_group  # colocate mode
+            self.resource_pool.world_size
+            if self.resource_pool  # colocate mode
else self.config.n_gpus_per_node * self.config.nnodes # standalone mode
)
num_replicas = world_size // rollout_world_size
@@ -74,10 +74,11 @@ )
)
for replica_rank in range(num_replicas)
]
-        if self.worker_group:
-            self._run_all([server.init_colocated(self.worker_group) for server in self.rollout_replicas])
+        if self.resource_pool:
+            self._run_all([server.init_colocated(self.resource_pool) for server in self.rollout_replicas])
else:
self._run_all([server.init_standalone() for server in self.rollout_replicas])

self.server_handles = [server._server_handle for server in self.rollout_replicas]
self.server_addresses = [server._server_address for server in self.rollout_replicas]

@@ -123,6 +124,8 @@ async def chat_complete(self, chat_complete_request: dict):
await session.close()

def generate_sequences(self, prompts: DataProto, sampling_params: dict):
+        if self.config.rollout.free_cache_engine:
+            self.wake_up()
chat_complete_requests = [
{
"model": self.config.model.path,
@@ -133,4 +136,6 @@ def generate_sequences(self, prompts: DataProto, sampling_params: dict):
]
tasks = [self.chat_complete(chat_complete_request) for chat_complete_request in chat_complete_requests]
results = self._run_all(tasks)
+        if self.config.rollout.free_cache_engine:
+            self.sleep()
return results
12 changes: 10 additions & 2 deletions verl/trainer/ppo/ray_trainer.py
@@ -84,8 +84,9 @@ def create_resource_pool(self):
# For FSDP backend, we recommend using max_colocate_count=1 that merge all WorkerGroups into one.
# For Megatron backend, we recommend using max_colocate_count>1
            # that can utilize different WorkerGroup for different models
+            # max_colocate_count = 3: actor_critic_ref, rollout, reward model (optional)
resource_pool = RayResourcePool(
-                process_on_nodes=process_on_nodes, use_gpu=True, max_colocate_count=1, name_prefix=resource_pool_name
+                process_on_nodes=process_on_nodes, use_gpu=True, max_colocate_count=3, name_prefix=resource_pool_name
)
self.resource_pool_dict[resource_pool_name] = resource_pool

@@ -770,8 +771,15 @@ def init_workers(self):
from verl.experimental.agent_loop import AgentLoopManager

self.async_rollout_mode = True
+            if self.config.reward_model.enable and self.config.reward_model.enable_resource_pool:
+                rm_resource_pool = self.resource_pool_manager.get_resource_pool(Role.RewardModel)
+            else:
+                rm_resource_pool = None
+
self.async_rollout_manager = AgentLoopManager(
-                config=self.config, worker_group=self.actor_rollout_wg, rm_wg=self.rm_wg
+                config=self.config,
+                worker_group=self.actor_rollout_wg,
+                rm_resource_pool=rm_resource_pool,
)

def _save_checkpoint(self):
19 changes: 14 additions & 5 deletions verl/workers/rollout/replica.py
@@ -120,19 +120,28 @@ async def init_hybrid(self, worker_group: RayWorkerGroup):
]
await self.launch_servers()

-    # TODO(@dyy): init with resource_pool?
    # TODO(sgm): this should be the default solution, but need to make the RolloutMode more clear.
-    async def init_colocated(self, worker_group: RayWorkerGroup):
+    async def init_colocated(self, resource_pool: RayResourcePool):
"""Init colocated rollout server, rollout engine and hybrid engine colocated in same ray placement group
but in separate processes.

Args:
resource_pool: RayResourcePool, ray placement group where hybrid engine processes have been launched.
"""
self.rollout_mode = RolloutMode.COLOCATED
-        self.workers = worker_group.workers[
-            self.world_size * self.replica_rank : self.world_size * (self.replica_rank + 1)
-        ]
+        self.resource_pool = resource_pool

+        worker_group = RayWorkerGroup(
+            resource_pool=self.resource_pool,
+            ray_cls_with_init=self.get_ray_class_with_init_args(),
+            bin_pack=False,
+            name_prefix=f"rollout_colocate_{self.replica_rank}"
+            if not self.is_reward_model
+            else f"rollout_reward_colocate_{self.replica_rank}",
+            replica_rank=self.replica_rank,
+            replica_world_size=self.world_size,
+        )
+        self.workers = worker_group.workers
await self.launch_servers()

async def init_standalone(self):
@@ -205,7 +205,6 @@ async def wake_up(self):
await asyncio.gather(*[worker.wake_up.remote() for worker in self.workers])
elif self.rollout_mode == RolloutMode.COLOCATED:
# Directly call engine to wake up without sync weights.
-            # FIXME(@wuxibin): sglang seems resume with random weights.
obj = ResumeMemoryOccupationReqInput(tags=["kv_cache", "weights"])
await self.tokenizer_manager.resume_memory_occupation(obj, None)
await self.tokenizer_manager.flush_cache()
2 changes: 0 additions & 2 deletions verl/workers/rollout/vllm_rollout/vllm_rollout_spmd.py
@@ -623,8 +623,6 @@ async def _execute_method(self, method: str | bytes, *args, **kwargs):
return self._init_worker(*args, **kwargs)
elif method == "load_model":
return self._load_model(*args, **kwargs)
-        elif method == "sleep" or method == "wake_up":
-            raise ValueError("wake_up and sleep should not be called through ZeroMQ")
else:
return self.inference_engine.execute_method(method, *args, **kwargs)
