Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions .github/workflows/reward_model_sglang.yml
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ jobs:
- name: Install the current repository
run: |
pip3 install -e .[test]
pip3 install sglang-router==0.2.2
pip3 install sglang-router==0.1.8
- name: Prepare gsm8k dataset
run: |
ray stop --force
Expand All @@ -111,10 +111,6 @@ jobs:
run: |
unset http_proxy https_proxy HTTP_PROXY HTTPS_PROXY
ROLLOUT_NAME=sglang pytest -s -x tests/experimental/reward/test_agent_loop_reward_manager.py
- name: Running sglang agent loop with reward model colocate tests on 8 L20 GPUs
run: |
unset http_proxy https_proxy HTTP_PROXY HTTPS_PROXY
ROLLOUT_NAME=sglang pytest -s -x tests/experimental/reward/test_agent_reward_loop_colocate.py

cleanup:
runs-on: ubuntu-latest
Expand Down
4 changes: 0 additions & 4 deletions .github/workflows/reward_model_vllm.yml
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,6 @@ jobs:
run: |
unset http_proxy https_proxy HTTP_PROXY HTTPS_PROXY
ROLLOUT_NAME=vllm pytest -s -x tests/experimental/reward/test_agent_loop_reward_manager.py
- name: Running vllm agent loop with reward model colocate tests on 8 L20 GPUs
run: |
unset http_proxy https_proxy HTTP_PROXY HTTPS_PROXY
ROLLOUT_NAME=vllm pytest -s -x tests/experimental/reward/test_agent_reward_loop_colocate.py

cleanup:
runs-on: ubuntu-latest
Expand Down
7 changes: 4 additions & 3 deletions tests/experimental/agent_loop/agent_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,14 +79,15 @@ def init_agent_loop_manager(config: DictConfig) -> AgentLoopManager | RayWorkerG
return actor_rollout_wg

if config.reward_model.enable_resource_pool and config.reward_model.enable:
rm_resource_pool = resource_pool_manager.get_resource_pool(Role.RewardModel)
rm_wg = all_wg["rm"]
rm_wg.init_model()
else:
rm_resource_pool = None
rm_wg = None
# =========================== 2. Create AgentLoopManager ===========================
agent_loop_manager = AgentLoopManager(
config=config,
worker_group=actor_rollout_wg,
rm_resource_pool=rm_resource_pool,
rm_wg=rm_wg,
)

return agent_loop_manager
136 changes: 0 additions & 136 deletions tests/experimental/reward/test_agent_reward_loop_colocate.py

This file was deleted.

11 changes: 3 additions & 8 deletions verl/experimental/agent_loop/agent_loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
from verl.experimental.agent_loop.utils import resolve_config_path
from verl.experimental.reward import RewardManagerWorker
from verl.protocol import DataProto
from verl.single_controller.ray.base import RayResourcePool, RayWorkerGroup
from verl.single_controller.ray.base import RayWorkerGroup
from verl.utils import hf_processor, hf_tokenizer
from verl.utils.fs import copy_to_local
from verl.utils.model import compute_position_id_with_mask
Expand Down Expand Up @@ -686,15 +686,12 @@ async def get_trajectory_info(step, index, validate):
class AgentLoopManager:
"""Agent loop manager that manages a group of agent loop workers."""

def __init__(
self, config: DictConfig, worker_group: RayWorkerGroup = None, rm_resource_pool: RayResourcePool = None
):
def __init__(self, config: DictConfig, worker_group: RayWorkerGroup = None, rm_wg: RayWorkerGroup = None):
"""Initialize agent loop manager.

Args:
config (DictConfig): trainer config.
worker_group (RayWorkerGroup): ActorRolloutRef worker group for hybrid mode; None for standalone mode.
rm_resource_pool (RayResourcePool): Resource pool for reward model (Standalone mode).
"""
self.config = config
self.worker_group = worker_group
Expand All @@ -703,9 +700,7 @@ def __init__(
if self.config.reward_model.enable and self.config.reward_model.enable_resource_pool:
from verl.experimental.reward import RewardModelManager

# TODO (dyy): current rm is colocated with the legacy fsdp/megatron rm
# future PR will deprecate fsdp/megatron rm and init RewardModelManager in standalone mode
self.reward_model_manager = RewardModelManager(config.reward_model, rm_resource_pool)
self.reward_model_manager = RewardModelManager(config.reward_model, rm_wg)
self.reward_router_address = self.reward_model_manager.get_router_address()

# for recipe to change
Expand Down
21 changes: 8 additions & 13 deletions verl/experimental/reward/reward_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from openai.types.chat import ChatCompletion

from verl import DataProto
from verl.single_controller.ray.base import RayResourcePool
from verl.single_controller.ray.base import RayWorkerGroup
from verl.workers.config import HFModelConfig, RewardModelConfig
from verl.workers.rollout.replica import get_rollout_replica_class

Expand All @@ -32,16 +32,16 @@
class RewardModelManager:
"""Reward model manager."""

def __init__(self, config: RewardModelConfig, resource_pool: RayResourcePool = None):
def __init__(self, config: RewardModelConfig, worker_group: RayWorkerGroup = None):
"""
Initialize the reward model manager.

Args:
config (RewardModelConfig): Reward model configuration.
resource_pool (RayResourcePool, optional): Resource pool. Defaults to None.
worker_group (RayWorkerGroup, optional): Worker group. Defaults to None.
"""
self.config = config
self.resource_pool = resource_pool
self.worker_group = worker_group
self._initialize_llm_servers()
self._initialize_router()
if self.config.rollout.free_cache_engine:
Expand All @@ -50,8 +50,8 @@ def __init__(self, config: RewardModelConfig, resource_pool: RayResourcePool = N
def _initialize_llm_servers(self):
rollout_world_size = self.config.rollout.tensor_model_parallel_size
world_size = (
self.resource_pool.world_size
if self.resource_pool # colocate mode
self.worker_group.world_size
if self.worker_group # colocate mode
else self.config.n_gpus_per_node * self.config.nnodes # standalone mode
)
num_replicas = world_size // rollout_world_size
Expand All @@ -74,11 +74,10 @@ def _initialize_llm_servers(self):
)
for replica_rank in range(num_replicas)
]
if self.resource_pool:
self._run_all([server.init_colocated(self.resource_pool) for server in self.rollout_replicas])
if self.worker_group:
self._run_all([server.init_colocated(self.worker_group) for server in self.rollout_replicas])
else:
self._run_all([server.init_standalone() for server in self.rollout_replicas])

self.server_handles = [server._server_handle for server in self.rollout_replicas]
self.server_addresses = [server._server_address for server in self.rollout_replicas]

Expand Down Expand Up @@ -124,8 +123,6 @@ async def chat_complete(self, chat_complete_request: dict):
await session.close()

def generate_sequences(self, prompts: DataProto, sampling_params: dict):
if self.config.rollout.free_cache_engine:
self.wake_up()
chat_complete_requests = [
{
"model": self.config.model.path,
Expand All @@ -136,6 +133,4 @@ def generate_sequences(self, prompts: DataProto, sampling_params: dict):
]
tasks = [self.chat_complete(chat_complete_request) for chat_complete_request in chat_complete_requests]
results = self._run_all(tasks)
if self.config.rollout.free_cache_engine:
self.sleep()
return results
12 changes: 2 additions & 10 deletions verl/trainer/ppo/ray_trainer.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,8 @@ def create_resource_pool(self):
# For FSDP backend, we recommend using max_colocate_count=1 that merge all WorkerGroups into one.
# For Megatron backend, we recommend using max_colocate_count>1
# that can utilize different WorkerGroup for different models
# max_colocate_count = 3: actor_critic_ref, rollout, reward model (optional)
resource_pool = RayResourcePool(
process_on_nodes=process_on_nodes, use_gpu=True, max_colocate_count=3, name_prefix=resource_pool_name
process_on_nodes=process_on_nodes, use_gpu=True, max_colocate_count=1, name_prefix=resource_pool_name
)
self.resource_pool_dict[resource_pool_name] = resource_pool

Expand Down Expand Up @@ -779,15 +778,8 @@ def init_workers(self):
from verl.experimental.agent_loop import AgentLoopManager

self.async_rollout_mode = True
if self.config.reward_model.enable and self.config.reward_model.enable_resource_pool:
rm_resource_pool = self.resource_pool_manager.get_resource_pool(Role.RewardModel)
else:
rm_resource_pool = None

self.async_rollout_manager = AgentLoopManager(
config=self.config,
worker_group=self.actor_rollout_wg,
rm_resource_pool=rm_resource_pool,
config=self.config, worker_group=self.actor_rollout_wg, rm_wg=self.rm_wg
)

def _save_checkpoint(self):
Expand Down
19 changes: 5 additions & 14 deletions verl/workers/rollout/replica.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,28 +120,19 @@ async def init_hybrid(self, worker_group: RayWorkerGroup):
]
await self.launch_servers()

# TODO(@dyy): init with resource_pool?
# TODO(sgm): this should be the default solution, but need to make the RolloutMode more clear.
async def init_colocated(self, resource_pool: RayResourcePool):
async def init_colocated(self, worker_group: RayWorkerGroup):
"""Init colocated rollout server, rollout engine and hybrid engine colocated in same ray placement group
but in separate processes.

Args:
resource_pool: RayResourcePool, ray placement group where hybrid engine processes have been launched.
"""
self.rollout_mode = RolloutMode.COLOCATED
self.resource_pool = resource_pool

worker_group = RayWorkerGroup(
resource_pool=self.resource_pool,
ray_cls_with_init=self.get_ray_class_with_init_args(),
bin_pack=False,
name_prefix=f"rollout_colocate_{self.replica_rank}"
if not self.is_reward_model
else f"rollout_reward_colocate_{self.replica_rank}",
replica_rank=self.replica_rank,
replica_world_size=self.world_size,
)
self.workers = worker_group.workers
self.workers = worker_group.workers[
self.world_size * self.replica_rank : self.world_size * (self.replica_rank + 1)
]
await self.launch_servers()

async def init_standalone(self):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ async def wake_up(self):
await asyncio.gather(*[worker.wake_up.remote() for worker in self.workers])
elif self.rollout_mode == RolloutMode.COLOCATED:
# Directly call engine to wake up without sync weights.
# FIXME(@wuxibin): sglang seems to resume with random weights.
obj = ResumeMemoryOccupationReqInput(tags=["kv_cache", "weights"])
await self.tokenizer_manager.resume_memory_occupation(obj, None)
await self.tokenizer_manager.flush_cache()
Expand Down
2 changes: 2 additions & 0 deletions verl/workers/rollout/vllm_rollout/vllm_rollout_spmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -623,6 +623,8 @@ async def _execute_method(self, method: str | bytes, *args, **kwargs):
return self._init_worker(*args, **kwargs)
elif method == "load_model":
return self._load_model(*args, **kwargs)
elif method == "sleep" or method == "wake_up":
raise ValueError("wake_up and sleep should not be called through ZeroMQ")
else:
return self.inference_engine.execute_method(method, *args, **kwargs)

Expand Down
Loading