diff --git a/tests/single_controller/test_split_resource_pool.py b/tests/single_controller/test_split_resource_pool.py new file mode 100644 index 00000000000..7977258073f --- /dev/null +++ b/tests/single_controller/test_split_resource_pool.py @@ -0,0 +1,189 @@ +# Copyright 2024 Bytedance Ltd. and/or its affiliates +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os + +import ray +import torch + +from verl import DataProto +from verl.single_controller.base import Worker +from verl.single_controller.base.decorator import Dispatch, register +from verl.single_controller.ray.base import ( + RayClassWithInitArgs, + RayResourcePool, + RayWorkerGroup, + split_resource_pool, +) + + +@ray.remote +class Actor(Worker): + def __init__(self, worker_id) -> None: + super().__init__() + self.worker_id = worker_id + self.temp_tensor = torch.rand(4096, 4096).to("cuda") + + if not torch.distributed.is_initialized(): + rank = int(os.environ.get("RANK", 0)) + world_size = int(os.environ.get("WORLD_SIZE", 1)) + torch.distributed.init_process_group(backend="nccl", world_size=world_size, rank=rank) + + @register(dispatch_mode=Dispatch.DP_COMPUTE_PROTO) + def add(self, data: DataProto): + data.batch["a"] += self.rank + self.worker_id + return data + + +def test_split_resource_pool_with_split_size(): + ray.init() + # assume we have 2 nodes, with 4 GPUs each + global_resource_pool = RayResourcePool(process_on_nodes=[4, 4]) + global_resource_pool.get_placement_groups() + + # first 4 gpus for actor_1, last 4 gpus for actor_2 + actor_1_resource_pool, actor_2_resource_pool = split_resource_pool(resource_pool=global_resource_pool, split_size=4) + actor_cls_1 = RayClassWithInitArgs(cls=Actor, worker_id=0) + actor_cls_2 = RayClassWithInitArgs(cls=Actor, worker_id=100) + actor_worker_1 = RayWorkerGroup( + resource_pool=actor_1_resource_pool, + ray_cls_with_init=actor_cls_1, + ) + actor_worker_2 = RayWorkerGroup( + resource_pool=actor_2_resource_pool, + ray_cls_with_init=actor_cls_2, + ) + assert actor_worker_1.world_size == 4 + assert actor_worker_2.world_size == 4 + + data = DataProto.from_dict({"a": torch.zeros(8)}) + actor_output_1 = actor_worker_1.add(data) + actor_output_2 = actor_worker_2.add(data) + assert actor_output_1.batch["a"].tolist() == [0, 0, 1, 1, 2, 2, 3, 3] + assert actor_output_2.batch["a"].tolist() == [100, 100, 101, 101, 102, 102, 103, 103] + + ray.shutdown() + + +def test_split_resource_pool_with_split_size_list(): + ray.init() + # assume we have 4 nodes, with 2 GPUs each + global_resource_pool = RayResourcePool(process_on_nodes=[2, 2, 2, 2]) + global_resource_pool.get_placement_groups() + + # first 2 gpus for actor_1, last 6 gpus for actor_2 + actor_1_resource_pool, actor_2_resource_pool = split_resource_pool( + resource_pool=global_resource_pool, + split_size=[2, 6], + ) + actor_cls_1 = RayClassWithInitArgs(cls=Actor, worker_id=0) + actor_cls_2 = RayClassWithInitArgs(cls=Actor, worker_id=100) + actor_worker_1 = RayWorkerGroup( + resource_pool=actor_1_resource_pool, + ray_cls_with_init=actor_cls_1, + ) + actor_worker_2 = RayWorkerGroup( + resource_pool=actor_2_resource_pool, + ray_cls_with_init=actor_cls_2, + ) + assert actor_worker_1.world_size == 2 + assert actor_worker_2.world_size == 6 + + data_1 = DataProto.from_dict({"a": torch.zeros(4)}) + data_2 = DataProto.from_dict({"a": torch.zeros(6)}) + actor_output_1 = actor_worker_1.add(data_1) + actor_output_2 = actor_worker_2.add(data_2) + print(actor_output_1.batch["a"].tolist()) + print(actor_output_2.batch["a"].tolist()) + assert actor_output_1.batch["a"].tolist() == [0, 0, 1, 1] + assert actor_output_2.batch["a"].tolist() == [100, 101, 102, 103, 104, 105] + + ray.shutdown() + + +def test_split_resource_pool_with_split_size_list_cross_nodes(): + ray.init() + # assume we have 4 nodes, with 2 GPUs each + global_resource_pool = RayResourcePool(process_on_nodes=[4, 4]) + global_resource_pool.get_placement_groups() + + # first 2 gpus for actor_1, last 6 gpus for actor_2 + actor_1_resource_pool, actor_2_resource_pool = split_resource_pool( + resource_pool=global_resource_pool, + split_size=[2, 6], + ) + actor_cls_1 = RayClassWithInitArgs(cls=Actor, worker_id=0) + actor_cls_2 = RayClassWithInitArgs(cls=Actor, worker_id=100) + actor_worker_1 = RayWorkerGroup( + resource_pool=actor_1_resource_pool, + ray_cls_with_init=actor_cls_1, + ) + actor_worker_2 = RayWorkerGroup( + resource_pool=actor_2_resource_pool, + ray_cls_with_init=actor_cls_2, + ) + + assert actor_worker_1.world_size == 2 + assert actor_worker_2.world_size == 6 + + data_1 = DataProto.from_dict({"a": torch.zeros(4)}) + data_2 = DataProto.from_dict({"a": torch.zeros(6)}) + actor_output_1 = actor_worker_1.add(data_1) + actor_output_2 = actor_worker_2.add(data_2) + print(actor_output_1.batch["a"].tolist()) + print(actor_output_2.batch["a"].tolist()) + assert actor_output_1.batch["a"].tolist() == [0, 0, 1, 1] + assert actor_output_2.batch["a"].tolist() == [100, 101, 102, 103, 104, 105] + + ray.shutdown() + + +def test_split_resource_pool_with_split_twice(): + ray.init() + + # assume we have 4 nodes, with 2 GPUs each + global_resource_pool = RayResourcePool(process_on_nodes=[2, 2, 2, 2]) + global_resource_pool.get_placement_groups() + + # actors with [2, 1, 1, 1, 1, 2] (split twice) + rp_1, rp_2, rp_3 = split_resource_pool( + resource_pool=global_resource_pool, + split_size=[2, 4, 2], + ) + rp_2_1, rp_2_2, rp_2_3, rp_2_4 = split_resource_pool( + resource_pool=rp_2, + split_size=1, + ) + fp_list = [rp_1, rp_2_1, rp_2_2, rp_2_3, rp_2_4, rp_3] + correct_world_size = [2, 1, 1, 1, 1, 2] + correct_output = [ + [0.0, 0.0, 1.0, 1.0], # 2 worker + [100.0, 100.0, 100.0, 100.0], # 1 worker + [200.0, 200.0, 200.0, 200.0], # 1 worker + [300.0, 300.0, 300.0, 300.0], # 1 worker + [400.0, 400.0, 400.0, 400.0], # 1 worker + [500.0, 500.0, 501.0, 501.0], # 2 worker + ] + for idx, rp in enumerate(fp_list): + actor_cls = RayClassWithInitArgs(cls=Actor, worker_id=idx * 100) + actor_worker = RayWorkerGroup( + resource_pool=rp, + ray_cls_with_init=actor_cls, + ) + data = DataProto.from_dict({"a": torch.zeros(4)}) + actor_output = actor_worker.add(data) + assert actor_worker.world_size == correct_world_size[idx] + assert actor_output.batch["a"].tolist() == correct_output[idx] + + ray.shutdown() diff --git a/verl/single_controller/ray/base.py b/verl/single_controller/ray/base.py index 3c6c1eab153..7d39de3b33c 100644 --- a/verl/single_controller/ray/base.py +++ b/verl/single_controller/ray/base.py @@ -17,6 +17,7 @@ from copy import deepcopy from typing import Any, Optional +import numpy as np import ray from ray.experimental.state.api import get_actor from ray.util.placement_group import PlacementGroup, placement_group @@ -135,10 +136,28 @@ def get_placement_groups(self, strategy="STRICT_PACK", name=None, device_name="c ray.get([pg.ready() for pg in pgs]) - self.pgs = pgs + self.pgs = sort_placement_group_by_node_ip(pgs) return pgs +class SubRayResourcePool(RayResourcePool): + def __init__( + self, + placement_groups: list[PlacementGroup], + start_bundle_index: int, + subgroup_world_size: int, + **kwargs, + ) -> None: + super().__init__(**kwargs) + self.pgs = placement_groups + self.start_bundle_index = start_bundle_index + self.subgroup_world_size = subgroup_world_size + + @property + def world_size(self): + return self.subgroup_world_size + + def extract_pg_from_exist( resource_pools: dict[str, RayResourcePool], src_role_names: list[str], resource_pool: RayResourcePool ) -> list: @@ -165,6 +184,53 @@ def extract_pg_from_exist( return [pg for _, pg in sorted(unsorted_pgs)] +# split a RayResourcePool or SubRayResourcePool into multiple SubRayResourcePool +def split_resource_pool( + resource_pool: RayResourcePool | SubRayResourcePool, split_size: int | list[int] +) -> list[SubRayResourcePool]: + """ + Split a RayResourcePool into multiple SubRayResourcePool. + resouce_pool can also be a SubRayResourcePool (have been splited) for multiple-time spliting. + + Args: + resource_pool (RayResourcePool | SubRayResourcePool): The resource pool to split. + split_size (int | list[int]): The size of each split. If int, all splits will have the same size. + If list[int], each element in the list represents the size of a split. + + Returns: + list[SubRayResourcePool]: A list of SubRayResourcePool after splitting. + """ + # convert split_size to list[int] + if isinstance(split_size, int): + assert resource_pool.world_size % split_size == 0, "split_size must be a divisor of world_size" + num_replica = resource_pool.world_size // split_size + split_size_list = [split_size] * num_replica + else: + split_size_list = split_size + + assert sum(split_size_list) == resource_pool.world_size, "split_size must sum up to world_size" + + # judge if this resource pool has been splited + if isinstance(resource_pool, SubRayResourcePool): + start_bundle_idx_list = np.cumsum([resource_pool.start_bundle_index] + split_size_list[:-1]) + else: + start_bundle_idx_list = np.cumsum([0] + split_size_list[:-1]) + + split_resource_pools = [ + SubRayResourcePool( + process_on_nodes=resource_pool.store, + use_gpu=resource_pool.use_gpu, + name_prefix=f"{resource_pool.name_prefix}_split_{split_idx}", + max_colocate_count=resource_pool.max_colocate_count, + placement_groups=resource_pool.pgs, + start_bundle_index=start_bundle_idx_list[split_idx], + subgroup_world_size=split_size_list[split_idx], + ) + for split_idx in range(len(split_size_list)) + ] + return split_resource_pools + + def merge_resource_pool(rp1: RayResourcePool, rp2: RayResourcePool) -> RayResourcePool: assert rp1.use_gpu == rp2.use_gpu, "Both RayResourcePool must either use_gpu or not" assert rp1.max_colocate_count == rp2.max_colocate_count, "Both RayResourcePool must has the same max_colocate_count" @@ -313,6 +379,14 @@ def __init__( if self._is_init_with_detached_workers: self._init_with_detached_workers(worker_names=worker_names, worker_handles=worker_handles) + elif isinstance(resource_pool, SubRayResourcePool): + self._init_with_subresource_pool( + resource_pool=resource_pool, + ray_cls_with_init=ray_cls_with_init, + bin_pack=bin_pack, + detached=detached, + worker_env=self.customized_worker_env, + ) else: self._init_with_resource_pool( resource_pool=resource_pool, @@ -368,7 +442,7 @@ def _init_with_resource_pool(self, resource_pool, ray_cls_with_init, bin_pack, d bin_pack: Whether to use strict bin packing for resource allocation detached: Whether workers should be detached """ - use_gpu = resource_pool.use_gpu + self.resource_pool = resource_pool strategy = "PACK" if bin_pack: @@ -377,7 +451,6 @@ def _init_with_resource_pool(self, resource_pool, ray_cls_with_init, bin_pack, d world_size = resource_pool.world_size self._world_size = world_size # cia.add_kwarg("_world_size", world_size) - num_gpus = 1 / resource_pool.max_colocate_count rank = -1 local_world_size = resource_pool.store[0] @@ -388,60 +461,112 @@ def _init_with_resource_pool(self, resource_pool, ray_cls_with_init, bin_pack, d for local_rank in range(local_world_size): rank += 1 + self._create_worker( + rank=rank, + pg_idx=pg_idx, + pg=pg, + local_rank=local_rank, + resource_pool=resource_pool, + ray_cls_with_init=ray_cls_with_init, + worker_env=worker_env, + detached=detached, + ) - # we pass in environment variable at option so that Worker can use environment variable to set - env_vars = { - "WORLD_SIZE": str(world_size), - "RANK": str(rank), - "WG_PREFIX": self.name_prefix, - "WG_BACKEND": "ray", - "RAY_LOCAL_WORLD_SIZE": str(local_world_size), - "MASTER_ADDR": self._master_addr, - "MASTER_PORT": self._master_port, - } - if worker_env is not None: - logging.debug(f"Appending ray class env, origin: {env_vars}, customized env: {worker_env}") - conflict_env_vars = set(env_vars.keys()) & set(worker_env.keys()) - if len(conflict_env_vars) > 0: - logging.error( - f"User customized env vars conflict with system env: {conflict_env_vars} " - f"Overriding may cause unexpected behavior." - ) - raise ValueError(f"Cannot override protected system env: {conflict_env_vars}") - env_vars.update(worker_env) - import re - - cia_name = type(ray_cls_with_init.cls).__name__ - match = re.search(r"ActorClass\(([^)]+)\)", cia_name) # ray.remote(Obj) -> "ActorClass(Obj)" - cia_name = match.group(1) if match else cia_name # "ActorClass(Obj)" -> "Obj" - name = f"{self.name_prefix}{cia_name}_{pg_idx}:{local_rank}" # e.g. Worker_2:5 - - if self.profile_steps and self.device_name == "cuda": - ray_cls_with_init.update_options( - { - "runtime_env": { - "env_vars": env_vars, - "nsight": self.worker_nsight_options, - }, - "name": name, - } - ) - else: - ray_cls_with_init.update_options({"runtime_env": {"env_vars": env_vars}, "name": name}) - - if detached: - ray_cls_with_init.update_options({"lifetime": "detached"}) - - # create a worker - worker = ray_cls_with_init( - placement_group=pg, - placement_group_bundle_idx=local_rank, - use_gpu=use_gpu, - num_gpus=num_gpus, - device_name=self.device_name, + def _init_with_subresource_pool(self, resource_pool, ray_cls_with_init, bin_pack, detached, worker_env=None): + """Initialize the worker group by creating new workers from a resource pool or sub resource pool. + Args: + resource_pool: Resource pool for worker allocation + ray_cls_with_init: Class with initialization arguments for workers + bin_pack: Whether to use strict bin packing for resource allocation + detached: Whether workers should be detached + """ + strategy = "PACK" + if bin_pack: + strategy = "STRICT_PACK" + pgs = resource_pool.get_placement_groups(strategy=strategy, device_name=self.device_name) + world_size = resource_pool.world_size + self._world_size = world_size + + rank = -1 + local_world_size = resource_pool.store[0] + self._get_master_addr_port(pgs[0]) + for curr_rank in range(resource_pool.start_bundle_index, resource_pool.start_bundle_index + world_size): + pg_idx = curr_rank // local_world_size + pg = pgs[pg_idx] + local_rank = curr_rank % local_world_size + assert local_world_size <= pg.bundle_count, f"when generating for {self.name_prefix}, for the " + + rank += 1 + self._create_worker( + rank=rank, + pg_idx=pg_idx, + pg=pg, + local_rank=local_rank, + resource_pool=resource_pool, + ray_cls_with_init=ray_cls_with_init, + worker_env=worker_env, + detached=detached, + ) + + def _create_worker(self, rank, pg_idx, pg, local_rank, resource_pool, ray_cls_with_init, worker_env, detached): + world_size = resource_pool.world_size + use_gpu = resource_pool.use_gpu + local_world_size = resource_pool.store[0] + num_gpus = 1 / resource_pool.max_colocate_count + + # we pass in environment variable at option so that Worker can use environment variable to set + env_vars = { + "WORLD_SIZE": str(world_size), + "RANK": str(rank), + "WG_PREFIX": self.name_prefix, + "WG_BACKEND": "ray", + "RAY_LOCAL_WORLD_SIZE": str(local_world_size), + "MASTER_ADDR": self._master_addr, + "MASTER_PORT": self._master_port, + } + if worker_env is not None: + logging.debug(f"Appending ray class env, origin: {env_vars}, customized env: {worker_env}") + conflict_env_vars = set(env_vars.keys()) & set(worker_env.keys()) + if len(conflict_env_vars) > 0: + logging.error( + f"User customized env vars conflict with system env: {conflict_env_vars} " + f"Overriding may cause unexpected behavior." ) - self._workers.append(worker) - self._worker_names.append(name) + raise ValueError(f"Cannot override protected system env: {conflict_env_vars}") + env_vars.update(worker_env) + import re + + cia_name = type(ray_cls_with_init.cls).__name__ + match = re.search(r"ActorClass\(([^)]+)\)", cia_name) # ray.remote(Obj) -> "ActorClass(Obj)" + cia_name = match.group(1) if match else cia_name # "ActorClass(Obj)" -> "Obj" + name = f"{self.name_prefix}{cia_name}_{pg_idx}:{local_rank}" # e.g. Worker_2:5 + + if self.profile_steps and self.device_name == "cuda": + ray_cls_with_init.update_options( + { + "runtime_env": { + "env_vars": env_vars, + "nsight": self.worker_nsight_options, + }, + "name": name, + } + ) + else: + ray_cls_with_init.update_options({"runtime_env": {"env_vars": env_vars}, "name": name}) + + if detached: + ray_cls_with_init.update_options({"lifetime": "detached"}) + + # create a worker + worker = ray_cls_with_init( + placement_group=pg, + placement_group_bundle_idx=local_rank, + use_gpu=use_gpu, + num_gpus=num_gpus, + device_name=self.device_name, + ) + self._workers.append(worker) + self._worker_names.append(name) @property def worker_names(self):