diff --git a/python/ray/util/collective/collective.py b/python/ray/util/collective/collective.py
index e2263648b76f..360498425109 100644
--- a/python/ray/util/collective/collective.py
+++ b/python/ray/util/collective/collective.py
@@ -1,5 +1,7 @@
 """APIs exposed under the namespace ray.util.collective."""
 import logging
+import os
+from typing import List
 
 import numpy as np
 import ray
@@ -124,6 +126,49 @@ def init_collective_group(world_size: int,
     _group_mgr.create_collective_group(backend, world_size, rank, group_name)
 
 
+def declare_collective_group(actors,
+                             world_size: int,
+                             ranks: List[int],
+                             backend=types.Backend.NCCL,
+                             group_name: str = "default"):
+    """Declare a list of actors as a collective group.
+
+    Note: This function should be called in a driver process.
+
+    Args:
+        actors (list): a list of actors to be set in a collective group.
+        world_size (int): the total number of processes in the group.
+        ranks (List[int]): the rank of each actor, e.g. [0, 1] means
+            the first actor is rank 0 and the second actor is rank 1.
+        backend (str): the name of the communication backend.
+        group_name (str): the name of the collective group.
+    """
+    backend = types.Backend(backend)
+    _check_backend_availability(backend)
+
+    name = "info_" + group_name
+    try:
+        ray.get_actor(name)
+        raise RuntimeError("Trying to initialize a group twice.")
+    except ValueError:
+        pass
+
+    if len(ranks) != len(actors):
+        raise RuntimeError("Each actor should correspond to one rank.")
+
+    if set(ranks) != set(range(len(ranks))):
+        raise RuntimeError("Ranks must be a permutation of 0..len(ranks)-1.")
+
+    assert world_size > 0
+    assert all(0 <= rank < world_size for rank in ranks)
+
+    from ray.util.collective.util import Info
+    # Store the group information in a named actor for later retrieval.
+    actors_id = [a._ray_actor_id for a in actors]
+    info = Info.options(name=name, lifetime="detached").remote()
+    ray.wait([info.set_info.remote(actors_id, world_size, ranks, backend)])
+
+
 def destroy_collective_group(group_name: str = "default") -> None:
     """Destroy a collective group given its group name."""
     _check_inside_actor()
@@ -342,9 +387,33 @@ def recv(tensor, src_rank: int, group_name: str = "default"):
 
 def _check_and_get_group(group_name):
     """Check the existence and return the group handle."""
     _check_inside_actor()
+    global _group_mgr
     if not is_group_initialized(group_name):
-        raise RuntimeError("The collective group '{}' is not "
-                           "initialized in the process.".format(group_name))
+        # Try loading the group information from the remote info store.
+        try:
+            # If the information is stored in an Info object,
+            # fetch it and create the group.
+ name = "info_" + group_name + mgr = ray.get_actor(name=name) + ids, world_size, rank, backend = ray.get(mgr.get_info.remote()) + worker = ray.worker.global_worker + id_ = worker.core_worker.get_actor_id() + r = rank[ids.index(id_)] + _group_mgr.create_collective_group(backend, world_size, r, + group_name) + except ValueError as exc: + # check if this group is initialized using options() + if "collective_group_name" in os.environ and \ + os.environ["collective_group_name"] == group_name: + rank = int(os.environ["collective_rank"]) + world_size = int(os.environ["collective_world_size"]) + backend = os.environ["collective_backend"] + _group_mgr.create_collective_group(backend, world_size, rank, + group_name) + else: + raise RuntimeError( + "The collective group '{}' is not " + "initialized in the process.".format(group_name)) from exc g = _group_mgr.get_group_by_name(group_name) return g diff --git a/python/ray/util/collective/examples/nccl_allreduce_example_declare_collective_group.py b/python/ray/util/collective/examples/nccl_allreduce_example_declare_collective_group.py new file mode 100644 index 000000000000..9d0335dbab11 --- /dev/null +++ b/python/ray/util/collective/examples/nccl_allreduce_example_declare_collective_group.py @@ -0,0 +1,34 @@ +import cupy as cp +import ray + +import ray.util.collective as collective + + +@ray.remote(num_gpus=1) +class Worker: + def __init__(self): + self.send = cp.ones((4, ), dtype=cp.float32) + + def compute(self): + collective.allreduce(self.send, "177") + return self.send + + +if __name__ == "__main__": + ray.init(num_gpus=2) + + num_workers = 2 + workers = [] + for i in range(num_workers): + w = Worker.remote() + workers.append(w) + _options = { + "group_name": "177", + "world_size": 2, + "ranks": [0, 1], + "backend": "nccl" + } + collective.declare_collective_group(workers, **_options) + results = ray.get([w.compute.remote() for w in workers]) + print(results) + ray.shutdown() diff --git a/python/ray/util/collective/util.py b/python/ray/util/collective/util.py index e591e9b93f0b..56a80673a88d 100644 --- a/python/ray/util/collective/util.py +++ b/python/ray/util/collective/util.py @@ -40,3 +40,28 @@ def get_id(self): logger.warning("The NCCL ID has not been " "set yet for store {}.".format(self.name)) return self.nccl_id + + +@ray.remote +class Info: + """Store the group information created via `declare_collective_group`. + + Note: Should be used as a NamedActor. + """ + + def __init__(self): + self.ids = None + self.world_size = -1 + self.rank = -1 + self.backend = None + + def set_info(self, ids, world_size, rank, backend): + """Store collective information.""" + self.ids = ids + self.world_size = world_size + self.rank = rank + self.backend = backend + + def get_info(self): + """Get previously stored collective information.""" + return self.ids, self.world_size, self.rank, self.backend