From df7d348cb50253651bebc56a8026126f5cc5546b Mon Sep 17 00:00:00 2001 From: yuyangding Date: Mon, 24 Nov 2025 21:25:39 +0800 Subject: [PATCH 1/5] update --- .../test_split_resource_pool.py | 149 ++++++++++++++++++ verl/single_controller/ray/base.py | 145 ++++++++++++++++- 2 files changed, 292 insertions(+), 2 deletions(-) create mode 100644 tests/single_controller/test_split_resource_pool.py diff --git a/tests/single_controller/test_split_resource_pool.py b/tests/single_controller/test_split_resource_pool.py new file mode 100644 index 00000000000..04b7bc74df1 --- /dev/null +++ b/tests/single_controller/test_split_resource_pool.py @@ -0,0 +1,149 @@ +# Copyright 2024 Bytedance Ltd. and/or its affiliates +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import os + +import ray +import torch + +from verl import DataProto +from verl.single_controller.base import Worker +from verl.single_controller.base.decorator import Dispatch, register +from verl.single_controller.ray.base import ( + RayClassWithInitArgs, + RayResourcePool, + RayWorkerGroup, + split_resource_pool, +) + + +@ray.remote +class Actor(Worker): + def __init__(self, worker_id) -> None: + super().__init__() + self.worker_id = worker_id + self.temp_tensor = torch.rand(4096, 4096).to("cuda") + + if not torch.distributed.is_initialized(): + rank = int(os.environ.get("RANK", 0)) + world_size = int(os.environ.get("WORLD_SIZE", 1)) + torch.distributed.init_process_group(backend="nccl", world_size=world_size, rank=rank) + + @register(dispatch_mode=Dispatch.DP_COMPUTE_PROTO) + def add(self, data: DataProto): + data.batch["a"] += self.rank + self.worker_id + return data + + +def test_split_resource_pool_with_split_size(): + ray.init() + # assume we have 2 nodes, with 4 GPUs each + global_resource_pool = RayResourcePool(process_on_nodes=[4, 4]) + global_resource_pool.get_placement_groups() + + # first 4 gpus for actor_1, last 4 gpus for actor_2 + actor_1_resource_pool, actor_2_resource_pool = split_resource_pool( + resource_pool=global_resource_pool, split_size=4 + ) + actor_cls_1 = RayClassWithInitArgs(cls=Actor, worker_id=0) + actor_cls_2 = RayClassWithInitArgs(cls=Actor, worker_id=100) + actor_worker_1 = RayWorkerGroup( + resource_pool=actor_1_resource_pool, + ray_cls_with_init=actor_cls_1, + ) + actor_worker_2 = RayWorkerGroup( + resource_pool=actor_2_resource_pool, + ray_cls_with_init=actor_cls_2, + ) + assert actor_worker_1.world_size == 4 + assert actor_worker_2.world_size == 4 + + data = DataProto.from_dict({"a": torch.zeros(8)}) + actor_output_1 = actor_worker_1.add(data) + actor_output_2 = actor_worker_2.add(data) + assert actor_output_1.batch["a"].tolist() == [0, 0, 1, 1, 2, 2, 3, 3] + assert actor_output_2.batch["a"].tolist() == [100, 100, 101, 101, 
102, 102, 103, 103] + + ray.shutdown() + + +def test_split_resource_pool_with_split_size_list(): + ray.init() + # assume we have 4 nodes, with 2 GPUs each + global_resource_pool = RayResourcePool(process_on_nodes=[2, 2, 2, 2]) + global_resource_pool.get_placement_groups() + + # first 2 gpus for actor_1, last 6 gpus for actor_2 + actor_1_resource_pool, actor_2_resource_pool = split_resource_pool( + resource_pool=global_resource_pool, split_size=[2, 6], + ) + actor_cls_1 = RayClassWithInitArgs(cls=Actor, worker_id=0) + actor_cls_2 = RayClassWithInitArgs(cls=Actor, worker_id=100) + actor_worker_1 = RayWorkerGroup( + resource_pool=actor_1_resource_pool, + ray_cls_with_init=actor_cls_1, + ) + actor_worker_2 = RayWorkerGroup( + resource_pool=actor_2_resource_pool, + ray_cls_with_init=actor_cls_2, + ) + assert actor_worker_1.world_size == 2 + assert actor_worker_2.world_size == 6 + + data_1 = DataProto.from_dict({"a": torch.zeros(4)}) + data_2 = DataProto.from_dict({"a": torch.zeros(12)}) + actor_output_1 = actor_worker_1.add(data_1) + actor_output_2 = actor_worker_2.add(data_2) + print(actor_output_1.batch["a"].tolist()) + print(actor_output_2.batch["a"].tolist()) + assert actor_output_1.batch["a"].tolist() == [0.0, 0.0, 1.0, 1.0] + assert actor_output_2.batch["a"].tolist() == [100.0, 100.0, 101.0, 101.0, 102.0, 102.0, 103.0, 103.0, 104.0, 104.0, 105.0, 105.0] + + ray.shutdown() + +def test_split_resource_pool_with_split_twice(): + ray.init() + + # assume we have 4 nodes, with 2 GPUs each + global_resource_pool = RayResourcePool(process_on_nodes=[2, 2, 2, 2]) + global_resource_pool.get_placement_groups() + + # first 2 gpus for actor_1, last 6 gpus for actor_2 + rp_1, rp_2, rp_3 = split_resource_pool( + resource_pool=global_resource_pool, split_size=[2, 4, 2], + ) + rp_2_1, rp_2_2, rp_2_3, rp_2_4 = split_resource_pool( + resource_pool=rp_2, split_size=1, + ) + fp_list = [rp_1, rp_2_1, rp_2_2, rp_2_3, rp_2_4, rp_3] + correct_world_size = [2, 1, 1, 1, 1, 2] + correct_output 
= [ + [100.0, 100.0, 100.0, 100.0], + [200.0, 200.0, 200.0, 200.0], + [300.0, 300.0, 300.0, 300.0], + [400.0, 400.0, 400.0, 400.0], + [500.0, 500.0, 501.0, 501.0], + ] + for idx, rp in enumerate(fp_list): + actor_cls = RayClassWithInitArgs(cls=Actor, worker_id=idx * 100) + actor_worker = RayWorkerGroup( + resource_pool=rp, + ray_cls_with_init=actor_cls, + ) + data = DataProto.from_dict({"a": torch.zeros(4)}) + actor_output = actor_worker.add(data) + assert actor_worker.world_size == correct_world_size[idx] + print(actor_output.batch["a"].tolist()) + + ray.shutdown() diff --git a/verl/single_controller/ray/base.py b/verl/single_controller/ray/base.py index 3c6c1eab153..40569af7f8f 100644 --- a/verl/single_controller/ray/base.py +++ b/verl/single_controller/ray/base.py @@ -17,6 +17,7 @@ from copy import deepcopy from typing import Any, Optional +import numpy as np import ray from ray.experimental.state.api import get_actor from ray.util.placement_group import PlacementGroup, placement_group @@ -135,10 +136,28 @@ def get_placement_groups(self, strategy="STRICT_PACK", name=None, device_name="c ray.get([pg.ready() for pg in pgs]) - self.pgs = pgs + self.pgs = sort_placement_group_by_node_ip(pgs) return pgs +class SubRayResourcePool(RayResourcePool): + def __init__( + self, + placement_groups: PlacementGroup, + start_bundle_index: int, + subgroup_world_size: int, + **kwargs, + ) -> None: + super().__init__(**kwargs) + self.pgs = placement_groups + self.start_bundle_index = start_bundle_index + self.subgroup_world_size = subgroup_world_size + + @property + def world_size(self): + return self.subgroup_world_size + + def extract_pg_from_exist( resource_pools: dict[str, RayResourcePool], src_role_names: list[str], resource_pool: RayResourcePool ) -> list: @@ -165,6 +184,41 @@ def extract_pg_from_exist( return [pg for _, pg in sorted(unsorted_pgs)] +# split a RayResourcePool or SubRayResourcePool into multiple SubRayResourcePool +def split_resource_pool( + resource_pool: 
RayResourcePool | SubRayResourcePool, split_size: int | list[int] +) -> list[SubRayResourcePool]: + # convert split_size to list[int] + if isinstance(split_size, int): + assert resource_pool.world_size % split_size == 0, "split_size must be a divisor of world_size" + num_replica = resource_pool.world_size // split_size + split_size_list = [split_size] * num_replica + else: + split_size_list = split_size + + assert sum(split_size_list) == resource_pool.world_size, "split_size must sum up to world_size" + + # check whether this resource pool has already been split + if isinstance(resource_pool, SubRayResourcePool): + start_bundle_idx_list = np.cumsum([resource_pool.start_bundle_index] + split_size_list[:-1]) + else: + start_bundle_idx_list = np.cumsum([0] + split_size_list[:-1]) + + split_resource_pools = [ + SubRayResourcePool( + process_on_nodes=resource_pool.store, + use_gpu=resource_pool.use_gpu, + name_prefix=f"{resource_pool.name_prefix}_split_{split_idx}", + max_colocate_count=resource_pool.max_colocate_count, + placement_groups=resource_pool.pgs, + start_bundle_index=start_bundle_idx_list[split_idx], + subgroup_world_size=split_size_list[split_idx], + ) + for split_idx in range(len(split_size_list)) + ] + return split_resource_pools + + def merge_resource_pool(rp1: RayResourcePool, rp2: RayResourcePool) -> RayResourcePool: assert rp1.use_gpu == rp2.use_gpu, "Both RayResourcePool must either use_gpu or not" assert rp1.max_colocate_count == rp2.max_colocate_count, "Both RayResourcePool must has the same max_colocate_count" @@ -313,7 +367,8 @@ def __init__( if self._is_init_with_detached_workers: self._init_with_detached_workers(worker_names=worker_names, worker_handles=worker_handles) - else: + elif not isinstance(resource_pool, SubRayResourcePool): + assert isinstance(resource_pool, RayResourcePool) self._init_with_resource_pool( resource_pool=resource_pool, ray_cls_with_init=ray_cls_with_init, @@ -321,6 +376,15 @@ def __init__( detached=detached,
worker_env=self.customized_worker_env, ) + else: + assert isinstance(resource_pool, SubRayResourcePool) + self._init_with_subresource_pool( + resource_pool=resource_pool, + ray_cls_with_init=ray_cls_with_init, + bin_pack=bin_pack, + detached=detached, + worker_env=self.customized_worker_env, + ) if ray_cls_with_init is not None: self._bind_worker_method(self.ray_cls_with_init.cls, func_generator) @@ -443,6 +507,83 @@ def _init_with_resource_pool(self, resource_pool, ray_cls_with_init, bin_pack, d self._workers.append(worker) self._worker_names.append(name) + def _init_with_subresource_pool(self, resource_pool, ray_cls_with_init, bin_pack, detached, worker_env=None): + """Initialize the worker group by creating new workers from a resource pool or sub resource pool. + Args: + resource_pool: Resource pool for worker allocation + ray_cls_with_init: Class with initialization arguments for workers + bin_pack: Whether to use strict bin packing for resource allocation + detached: Whether workers should be detached + """ + use_gpu = resource_pool.use_gpu + strategy = "PACK" + if bin_pack: + strategy = "STRICT_PACK" + pgs = resource_pool.get_placement_groups(strategy=strategy, device_name=self.device_name) + world_size = resource_pool.world_size + self._world_size = world_size + num_gpus = 1 / resource_pool.max_colocate_count + + rank = -1 + local_world_size = resource_pool.store[0] + self._get_master_addr_port(pgs[0]) + for curr_rank in range(resource_pool.start_bundle_index, resource_pool.start_bundle_index + world_size): + pg_idx = curr_rank // local_world_size + pg = pgs[pg_idx] + local_rank = curr_rank % local_world_size + assert local_world_size <= pg.bundle_count, f"when generating for {self.name_prefix}, for the " + rank += 1 + # we pass in environment variable at option so that Worker can use environment variable to set + env_vars = { + "WORLD_SIZE": str(world_size), + "RANK": str(rank), + "WG_PREFIX": self.name_prefix, + "WG_BACKEND": "ray", + 
"RAY_LOCAL_WORLD_SIZE": str(local_world_size), + "MASTER_ADDR": self._master_addr, + "MASTER_PORT": self._master_port, + } + if worker_env is not None: + logging.debug(f"Appending ray class env, origin: {env_vars}, customized env: {worker_env}") + conflict_env_vars = set(env_vars.keys()) & set(worker_env.keys()) + if len(conflict_env_vars) > 0: + logging.error( + f"User customized env vars conflict with system env: {conflict_env_vars} " + f"Overriding may cause unexpected behavior." + ) + raise ValueError(f"Cannot override protected system env: {conflict_env_vars}") + env_vars.update(worker_env) + import re + + cia_name = type(ray_cls_with_init.cls).__name__ + match = re.search(r"ActorClass\(([^)]+)\)", cia_name) # ray.remote(Obj) -> "ActorClass(Obj)" + cia_name = match.group(1) if match else cia_name # "ActorClass(Obj)" -> "Obj" + name = f"{self.name_prefix}{cia_name}_{pg_idx}:{local_rank}" # e.g. Worker_2:5 + if self.profile_steps and self.device_name == "cuda": + ray_cls_with_init.update_options( + { + "runtime_env": { + "env_vars": env_vars, + "nsight": self.worker_nsight_options, + }, + "name": name, + } + ) + else: + ray_cls_with_init.update_options({"runtime_env": {"env_vars": env_vars}, "name": name}) + if detached: + ray_cls_with_init.update_options({"lifetime": "detached"}) + # create a worker + worker = ray_cls_with_init( + placement_group=pg, + placement_group_bundle_idx=local_rank, + use_gpu=use_gpu, + num_gpus=num_gpus, + device_name=self.device_name, + ) + self._workers.append(worker) + self._worker_names.append(name) + @property def worker_names(self): return self._worker_names From 34c07c1b4d59db4defc4142e7aee6095fd06838e Mon Sep 17 00:00:00 2001 From: yuyangding Date: Mon, 24 Nov 2025 22:32:18 +0800 Subject: [PATCH 2/5] update --- .../test_split_resource_pool.py | 72 +++++-- verl/single_controller/ray/base.py | 198 ++++++++---------- 2 files changed, 149 insertions(+), 121 deletions(-) diff --git 
a/tests/single_controller/test_split_resource_pool.py b/tests/single_controller/test_split_resource_pool.py index 04b7bc74df1..7977258073f 100644 --- a/tests/single_controller/test_split_resource_pool.py +++ b/tests/single_controller/test_split_resource_pool.py @@ -53,9 +53,7 @@ def test_split_resource_pool_with_split_size(): global_resource_pool.get_placement_groups() # first 4 gpus for actor_1, last 4 gpus for actor_2 - actor_1_resource_pool, actor_2_resource_pool = split_resource_pool( - resource_pool=global_resource_pool, split_size=4 - ) + actor_1_resource_pool, actor_2_resource_pool = split_resource_pool(resource_pool=global_resource_pool, split_size=4) actor_cls_1 = RayClassWithInitArgs(cls=Actor, worker_id=0) actor_cls_2 = RayClassWithInitArgs(cls=Actor, worker_id=100) actor_worker_1 = RayWorkerGroup( @@ -86,7 +84,8 @@ def test_split_resource_pool_with_split_size_list(): # first 2 gpus for actor_1, last 6 gpus for actor_2 actor_1_resource_pool, actor_2_resource_pool = split_resource_pool( - resource_pool=global_resource_pool, split_size=[2, 6], + resource_pool=global_resource_pool, + split_size=[2, 6], ) actor_cls_1 = RayClassWithInitArgs(cls=Actor, worker_id=0) actor_cls_2 = RayClassWithInitArgs(cls=Actor, worker_id=100) @@ -102,16 +101,54 @@ def test_split_resource_pool_with_split_size_list(): assert actor_worker_2.world_size == 6 data_1 = DataProto.from_dict({"a": torch.zeros(4)}) - data_2 = DataProto.from_dict({"a": torch.zeros(12)}) + data_2 = DataProto.from_dict({"a": torch.zeros(6)}) actor_output_1 = actor_worker_1.add(data_1) actor_output_2 = actor_worker_2.add(data_2) print(actor_output_1.batch["a"].tolist()) print(actor_output_2.batch["a"].tolist()) - assert actor_output_1.batch["a"].tolist() == [0.0, 0.0, 1.0, 1.0] - assert actor_output_2.batch["a"].tolist() == [100.0, 100.0, 101.0, 101.0, 102.0, 102.0, 103.0, 103.0, 104.0, 104.0, 105.0, 105.0] + assert actor_output_1.batch["a"].tolist() == [0, 0, 1, 1] + assert actor_output_2.batch["a"].tolist() 
== [100, 101, 102, 103, 104, 105] ray.shutdown() + +def test_split_resource_pool_with_split_size_list_cross_nodes(): + ray.init() + # assume we have 2 nodes, with 4 GPUs each + global_resource_pool = RayResourcePool(process_on_nodes=[4, 4]) + global_resource_pool.get_placement_groups() + + # first 2 gpus for actor_1, last 6 gpus for actor_2 + actor_1_resource_pool, actor_2_resource_pool = split_resource_pool( + resource_pool=global_resource_pool, + split_size=[2, 6], + ) + actor_cls_1 = RayClassWithInitArgs(cls=Actor, worker_id=0) + actor_cls_2 = RayClassWithInitArgs(cls=Actor, worker_id=100) + actor_worker_1 = RayWorkerGroup( + resource_pool=actor_1_resource_pool, + ray_cls_with_init=actor_cls_1, + ) + actor_worker_2 = RayWorkerGroup( + resource_pool=actor_2_resource_pool, + ray_cls_with_init=actor_cls_2, + ) + + assert actor_worker_1.world_size == 2 + assert actor_worker_2.world_size == 6 + + data_1 = DataProto.from_dict({"a": torch.zeros(4)}) + data_2 = DataProto.from_dict({"a": torch.zeros(6)}) + actor_output_1 = actor_worker_1.add(data_1) + actor_output_2 = actor_worker_2.add(data_2) + print(actor_output_1.batch["a"].tolist()) + print(actor_output_2.batch["a"].tolist()) + assert actor_output_1.batch["a"].tolist() == [0, 0, 1, 1] + assert actor_output_2.batch["a"].tolist() == [100, 101, 102, 103, 104, 105] + + ray.shutdown() + + def test_split_resource_pool_with_split_twice(): ray.init() @@ -119,21 +156,24 @@ def test_split_resource_pool_with_split_twice(): global_resource_pool = RayResourcePool(process_on_nodes=[2, 2, 2, 2]) global_resource_pool.get_placement_groups() - # first 2 gpus for actor_1, last 6 gpus for actor_2 + # actors with [2, 1, 1, 1, 1, 2] (split twice) rp_1, rp_2, rp_3 = split_resource_pool( - resource_pool=global_resource_pool, split_size=[2, 4, 2], + resource_pool=global_resource_pool, + split_size=[2, 4, 2], ) rp_2_1, rp_2_2, rp_2_3, rp_2_4 = split_resource_pool( - resource_pool=rp_2, split_size=1, + resource_pool=rp_2, + split_size=1, )
fp_list = [rp_1, rp_2_1, rp_2_2, rp_2_3, rp_2_4, rp_3] correct_world_size = [2, 1, 1, 1, 1, 2] correct_output = [ - [100.0, 100.0, 100.0, 100.0], - [200.0, 200.0, 200.0, 200.0], - [300.0, 300.0, 300.0, 300.0], - [400.0, 400.0, 400.0, 400.0], - [500.0, 500.0, 501.0, 501.0], + [0.0, 0.0, 1.0, 1.0], # 2 worker + [100.0, 100.0, 100.0, 100.0], # 1 worker + [200.0, 200.0, 200.0, 200.0], # 1 worker + [300.0, 300.0, 300.0, 300.0], # 1 worker + [400.0, 400.0, 400.0, 400.0], # 1 worker + [500.0, 500.0, 501.0, 501.0], # 2 worker ] for idx, rp in enumerate(fp_list): actor_cls = RayClassWithInitArgs(cls=Actor, worker_id=idx * 100) @@ -144,6 +184,6 @@ def test_split_resource_pool_with_split_twice(): data = DataProto.from_dict({"a": torch.zeros(4)}) actor_output = actor_worker.add(data) assert actor_worker.world_size == correct_world_size[idx] - print(actor_output.batch["a"].tolist()) + assert actor_output.batch["a"].tolist() == correct_output[idx] ray.shutdown() diff --git a/verl/single_controller/ray/base.py b/verl/single_controller/ray/base.py index 40569af7f8f..5c1232d2723 100644 --- a/verl/single_controller/ray/base.py +++ b/verl/single_controller/ray/base.py @@ -188,6 +188,18 @@ def extract_pg_from_exist( def split_resource_pool( resource_pool: RayResourcePool | SubRayResourcePool, split_size: int | list[int] ) -> list[SubRayResourcePool]: + """ + Split a RayResourcePool into multiple SubRayResourcePool. + resource_pool can also be a SubRayResourcePool (one that has already been split), which allows splitting multiple times. + + Args: + resource_pool (RayResourcePool | SubRayResourcePool): The resource pool to split. + split_size (int | list[int]): The size of each split. If int, all splits will have the same size. + If list[int], each element in the list represents the size of a split. + + Returns: + list[SubRayResourcePool]: A list of SubRayResourcePool after splitting.
+ """ # convert split_size to list[int] if isinstance(split_size, int): assert resource_pool.world_size % split_size == 0, "split_size must be a divisor of world_size" @@ -432,7 +444,7 @@ def _init_with_resource_pool(self, resource_pool, ray_cls_with_init, bin_pack, d bin_pack: Whether to use strict bin packing for resource allocation detached: Whether workers should be detached """ - use_gpu = resource_pool.use_gpu + self.resource_pool = resource_pool strategy = "PACK" if bin_pack: @@ -441,7 +453,6 @@ def _init_with_resource_pool(self, resource_pool, ray_cls_with_init, bin_pack, d world_size = resource_pool.world_size self._world_size = world_size # cia.add_kwarg("_world_size", world_size) - num_gpus = 1 / resource_pool.max_colocate_count rank = -1 local_world_size = resource_pool.store[0] @@ -452,60 +463,16 @@ def _init_with_resource_pool(self, resource_pool, ray_cls_with_init, bin_pack, d for local_rank in range(local_world_size): rank += 1 - - # we pass in environment variable at option so that Worker can use environment variable to set - env_vars = { - "WORLD_SIZE": str(world_size), - "RANK": str(rank), - "WG_PREFIX": self.name_prefix, - "WG_BACKEND": "ray", - "RAY_LOCAL_WORLD_SIZE": str(local_world_size), - "MASTER_ADDR": self._master_addr, - "MASTER_PORT": self._master_port, - } - if worker_env is not None: - logging.debug(f"Appending ray class env, origin: {env_vars}, customized env: {worker_env}") - conflict_env_vars = set(env_vars.keys()) & set(worker_env.keys()) - if len(conflict_env_vars) > 0: - logging.error( - f"User customized env vars conflict with system env: {conflict_env_vars} " - f"Overriding may cause unexpected behavior." 
- ) - raise ValueError(f"Cannot override protected system env: {conflict_env_vars}") - env_vars.update(worker_env) - import re - - cia_name = type(ray_cls_with_init.cls).__name__ - match = re.search(r"ActorClass\(([^)]+)\)", cia_name) # ray.remote(Obj) -> "ActorClass(Obj)" - cia_name = match.group(1) if match else cia_name # "ActorClass(Obj)" -> "Obj" - name = f"{self.name_prefix}{cia_name}_{pg_idx}:{local_rank}" # e.g. Worker_2:5 - - if self.profile_steps and self.device_name == "cuda": - ray_cls_with_init.update_options( - { - "runtime_env": { - "env_vars": env_vars, - "nsight": self.worker_nsight_options, - }, - "name": name, - } - ) - else: - ray_cls_with_init.update_options({"runtime_env": {"env_vars": env_vars}, "name": name}) - - if detached: - ray_cls_with_init.update_options({"lifetime": "detached"}) - - # create a worker - worker = ray_cls_with_init( - placement_group=pg, - placement_group_bundle_idx=local_rank, - use_gpu=use_gpu, - num_gpus=num_gpus, - device_name=self.device_name, + self.register_single_bundle( + rank=rank, + pg_idx=pg_idx, + pg=pg, + local_rank=local_rank, + resource_pool=resource_pool, + ray_cls_with_init=ray_cls_with_init, + worker_env=worker_env, + detached=detached, ) - self._workers.append(worker) - self._worker_names.append(name) def _init_with_subresource_pool(self, resource_pool, ray_cls_with_init, bin_pack, detached, worker_env=None): """Initialize the worker group by creating new workers from a resource pool or sub resource pool. 
@@ -515,14 +482,12 @@ def _init_with_subresource_pool(self, resource_pool, ray_cls_with_init, bin_pack bin_pack: Whether to use strict bin packing for resource allocation detached: Whether workers should be detached """ - use_gpu = resource_pool.use_gpu strategy = "PACK" if bin_pack: strategy = "STRICT_PACK" pgs = resource_pool.get_placement_groups(strategy=strategy, device_name=self.device_name) world_size = resource_pool.world_size self._world_size = world_size - num_gpus = 1 / resource_pool.max_colocate_count rank = -1 local_world_size = resource_pool.store[0] @@ -532,57 +497,80 @@ def _init_with_subresource_pool(self, resource_pool, ray_cls_with_init, bin_pack pg = pgs[pg_idx] local_rank = curr_rank % local_world_size assert local_world_size <= pg.bundle_count, f"when generating for {self.name_prefix}, for the " + rank += 1 - # we pass in environment variable at option so that Worker can use environment variable to set - env_vars = { - "WORLD_SIZE": str(world_size), - "RANK": str(rank), - "WG_PREFIX": self.name_prefix, - "WG_BACKEND": "ray", - "RAY_LOCAL_WORLD_SIZE": str(local_world_size), - "MASTER_ADDR": self._master_addr, - "MASTER_PORT": self._master_port, - } - if worker_env is not None: - logging.debug(f"Appending ray class env, origin: {env_vars}, customized env: {worker_env}") - conflict_env_vars = set(env_vars.keys()) & set(worker_env.keys()) - if len(conflict_env_vars) > 0: - logging.error( - f"User customized env vars conflict with system env: {conflict_env_vars} " - f"Overriding may cause unexpected behavior." - ) - raise ValueError(f"Cannot override protected system env: {conflict_env_vars}") - env_vars.update(worker_env) - import re - - cia_name = type(ray_cls_with_init.cls).__name__ - match = re.search(r"ActorClass\(([^)]+)\)", cia_name) # ray.remote(Obj) -> "ActorClass(Obj)" - cia_name = match.group(1) if match else cia_name # "ActorClass(Obj)" -> "Obj" - name = f"{self.name_prefix}{cia_name}_{pg_idx}:{local_rank}" # e.g. 
Worker_2:5 - if self.profile_steps and self.device_name == "cuda": - ray_cls_with_init.update_options( - { - "runtime_env": { - "env_vars": env_vars, - "nsight": self.worker_nsight_options, - }, - "name": name, - } + self.register_single_bundle( + rank=rank, + pg_idx=pg_idx, + pg=pg, + local_rank=local_rank, + resource_pool=resource_pool, + ray_cls_with_init=ray_cls_with_init, + worker_env=worker_env, + detached=detached, + ) + + def register_single_bundle( + self, rank, pg_idx, pg, local_rank, resource_pool, ray_cls_with_init, worker_env, detached + ): + world_size = resource_pool.world_size + use_gpu = resource_pool.use_gpu + local_world_size = resource_pool.store[0] + num_gpus = 1 / resource_pool.max_colocate_count + + # we pass in environment variable at option so that Worker can use environment variable to set + env_vars = { + "WORLD_SIZE": str(world_size), + "RANK": str(rank), + "WG_PREFIX": self.name_prefix, + "WG_BACKEND": "ray", + "RAY_LOCAL_WORLD_SIZE": str(local_world_size), + "MASTER_ADDR": self._master_addr, + "MASTER_PORT": self._master_port, + } + if worker_env is not None: + logging.debug(f"Appending ray class env, origin: {env_vars}, customized env: {worker_env}") + conflict_env_vars = set(env_vars.keys()) & set(worker_env.keys()) + if len(conflict_env_vars) > 0: + logging.error( + f"User customized env vars conflict with system env: {conflict_env_vars} " + f"Overriding may cause unexpected behavior." 
) - else: - ray_cls_with_init.update_options({"runtime_env": {"env_vars": env_vars}, "name": name}) - if detached: - ray_cls_with_init.update_options({"lifetime": "detached"}) - # create a worker - worker = ray_cls_with_init( - placement_group=pg, - placement_group_bundle_idx=local_rank, - use_gpu=use_gpu, - num_gpus=num_gpus, - device_name=self.device_name, + raise ValueError(f"Cannot override protected system env: {conflict_env_vars}") + env_vars.update(worker_env) + import re + + cia_name = type(ray_cls_with_init.cls).__name__ + match = re.search(r"ActorClass\(([^)]+)\)", cia_name) # ray.remote(Obj) -> "ActorClass(Obj)" + cia_name = match.group(1) if match else cia_name # "ActorClass(Obj)" -> "Obj" + name = f"{self.name_prefix}{cia_name}_{pg_idx}:{local_rank}" # e.g. Worker_2:5 + + if self.profile_steps and self.device_name == "cuda": + ray_cls_with_init.update_options( + { + "runtime_env": { + "env_vars": env_vars, + "nsight": self.worker_nsight_options, + }, + "name": name, + } ) - self._workers.append(worker) - self._worker_names.append(name) + else: + ray_cls_with_init.update_options({"runtime_env": {"env_vars": env_vars}, "name": name}) + + if detached: + ray_cls_with_init.update_options({"lifetime": "detached"}) + + # create a worker + worker = ray_cls_with_init( + placement_group=pg, + placement_group_bundle_idx=local_rank, + use_gpu=use_gpu, + num_gpus=num_gpus, + device_name=self.device_name, + ) + self._workers.append(worker) + self._worker_names.append(name) @property def worker_names(self): From 2ef67c4edd5d2a1865dd9bb7792490cf17e2602c Mon Sep 17 00:00:00 2001 From: yuyangding Date: Mon, 24 Nov 2025 22:39:23 +0800 Subject: [PATCH 3/5] update --- verl/single_controller/ray/base.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/verl/single_controller/ray/base.py b/verl/single_controller/ray/base.py index 5c1232d2723..31f8ed8fa65 100644 --- a/verl/single_controller/ray/base.py +++ b/verl/single_controller/ray/base.py @@ -463,7 
+463,7 @@ def _init_with_resource_pool(self, resource_pool, ray_cls_with_init, bin_pack, d for local_rank in range(local_world_size): rank += 1 - self.register_single_bundle( + self.create_worker( rank=rank, pg_idx=pg_idx, pg=pg, @@ -499,7 +499,7 @@ def _init_with_subresource_pool(self, resource_pool, ray_cls_with_init, bin_pack assert local_world_size <= pg.bundle_count, f"when generating for {self.name_prefix}, for the " rank += 1 - self.register_single_bundle( + self.create_worker( rank=rank, pg_idx=pg_idx, pg=pg, @@ -510,7 +510,7 @@ def _init_with_subresource_pool(self, resource_pool, ray_cls_with_init, bin_pack detached=detached, ) - def register_single_bundle( + def create_worker( self, rank, pg_idx, pg, local_rank, resource_pool, ray_cls_with_init, worker_env, detached ): world_size = resource_pool.world_size From 3e9fdd09cd0d31380b336dfdb6004ab809f836d9 Mon Sep 17 00:00:00 2001 From: yuyangding Date: Tue, 25 Nov 2025 10:49:12 +0800 Subject: [PATCH 4/5] fix --- verl/single_controller/ray/base.py | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/verl/single_controller/ray/base.py b/verl/single_controller/ray/base.py index 31f8ed8fa65..5bf183de385 100644 --- a/verl/single_controller/ray/base.py +++ b/verl/single_controller/ray/base.py @@ -379,9 +379,8 @@ def __init__( if self._is_init_with_detached_workers: self._init_with_detached_workers(worker_names=worker_names, worker_handles=worker_handles) - elif not isinstance(resource_pool, SubRayResourcePool): - assert isinstance(resource_pool, RayResourcePool) - self._init_with_resource_pool( + elif isinstance(resource_pool, SubRayResourcePool): + self._init_with_subresource_pool( resource_pool=resource_pool, ray_cls_with_init=ray_cls_with_init, bin_pack=bin_pack, @@ -389,8 +388,7 @@ def __init__( worker_env=self.customized_worker_env, ) else: - assert isinstance(resource_pool, SubRayResourcePool) - self._init_with_subresource_pool( + self._init_with_resource_pool( 
resource_pool=resource_pool, ray_cls_with_init=ray_cls_with_init, bin_pack=bin_pack, @@ -463,7 +461,7 @@ def _init_with_resource_pool(self, resource_pool, ray_cls_with_init, bin_pack, d for local_rank in range(local_world_size): rank += 1 - self.create_worker( + self._create_worker( rank=rank, pg_idx=pg_idx, pg=pg, @@ -499,7 +497,7 @@ def _init_with_subresource_pool(self, resource_pool, ray_cls_with_init, bin_pack assert local_world_size <= pg.bundle_count, f"when generating for {self.name_prefix}, for the " rank += 1 - self.create_worker( + self._create_worker( rank=rank, pg_idx=pg_idx, pg=pg, @@ -510,9 +508,7 @@ def _init_with_subresource_pool(self, resource_pool, ray_cls_with_init, bin_pack detached=detached, ) - def create_worker( - self, rank, pg_idx, pg, local_rank, resource_pool, ray_cls_with_init, worker_env, detached - ): + def _create_worker(self, rank, pg_idx, pg, local_rank, resource_pool, ray_cls_with_init, worker_env, detached): world_size = resource_pool.world_size use_gpu = resource_pool.use_gpu local_world_size = resource_pool.store[0] From c6202acb95800a58c9816227afe200c19ff573f4 Mon Sep 17 00:00:00 2001 From: yuyangding Date: Tue, 25 Nov 2025 10:56:11 +0800 Subject: [PATCH 5/5] fix --- verl/single_controller/ray/base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/verl/single_controller/ray/base.py b/verl/single_controller/ray/base.py index 5bf183de385..7d39de3b33c 100644 --- a/verl/single_controller/ray/base.py +++ b/verl/single_controller/ray/base.py @@ -143,7 +143,7 @@ def get_placement_groups(self, strategy="STRICT_PACK", name=None, device_name="c class SubRayResourcePool(RayResourcePool): def __init__( self, - placement_groups: PlacementGroup, + placement_groups: list[PlacementGroup], start_bundle_index: int, subgroup_world_size: int, **kwargs,