Skip to content

Commit 4150970

Browse files
zhisbugDachengLi1richardliaw
authored
[Collective][PR 2/6] Driver program declarative interfaces (#12874)
* scaffold of the code * some scratch and options change * NCCL mostly done, supporting API#1 * interface 2.1 2.2 scratch * put code into ray and fix some importing issues * add an additional Rendezvous class to safely meet at named actor * fix some small bugs in nccl_util * some small fix * scaffold of the code * some scratch and options change * NCCL mostly done, supporting API#1 * interface 2.1 2.2 scratch * put code into ray and fix some importing issues * add an additional Rendezvous class to safely meet at named actor * fix some small bugs in nccl_util * some small fix * add a Backend class to make Backend string more robust * add several useful APIs * add some tests * added allreduce test * fix typos * fix several bugs found via unittests * fix and update torch test * changed back actor * rearrange a bit before importing distributed test * add distributed test * remove scratch code * auto-linting * linting 2 * linting 2 * linting 3 * linting 4 * linting 5 * linting 6 * 2.1 2.2 * fix small bugs * minor updates * linting again * auto linting * linting 2 * final linting * Update python/ray/util/collective_utils.py Co-authored-by: Richard Liaw <[email protected]> * Update python/ray/util/collective_utils.py Co-authored-by: Richard Liaw <[email protected]> * Update python/ray/util/collective_utils.py Co-authored-by: Richard Liaw <[email protected]> * added actor test * lint * remove local sh * address most of richard's comments * minor update * remove the actor.option() interface to avoid changes in ray core * minor updates Co-authored-by: YLJALDC <[email protected]> Co-authored-by: Richard Liaw <[email protected]>
1 parent c617291 commit 4150970

File tree

3 files changed

+130
-2
lines changed

3 files changed

+130
-2
lines changed

python/ray/util/collective/collective.py

Lines changed: 71 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
"""APIs exposed under the namespace ray.util.collective."""
22
import logging
3+
import os
4+
from typing import List
35

46
import numpy as np
57
import ray
@@ -124,6 +126,49 @@ def init_collective_group(world_size: int,
124126
_group_mgr.create_collective_group(backend, world_size, rank, group_name)
125127

126128

129+
def declare_collective_group(actors,
                             world_size: int,
                             ranks: List[int],
                             backend=types.Backend.NCCL,
                             group_name: str = "default"):
    """Declare a list of actors as a collective group.

    The group description is stored in a detached named actor called
    ``"info_" + group_name`` so that it can be looked up by name later.

    Note: This function should be called in a driver process.

    Args:
        actors (list): a list of actors to be set in a collective group.
        world_size (int): the size of the collective group; must be
            greater than zero.
        ranks (List[int]): the rank of each actor, e.g. [0, 1] means the
            first actor is rank 0 and the second actor is rank 1. Must be
            a permutation of ``0 .. len(actors) - 1`` and every rank must
            be smaller than ``world_size``.
        backend: the collective backend; it is normalized through
            ``types.Backend`` and checked for availability.
        group_name (str): the name of the collective group.

    Raises:
        RuntimeError: if a group with this name has already been
            declared, or if ``actors``, ``ranks`` and ``world_size`` are
            inconsistent with each other.
    """
    backend = types.Backend(backend)
    _check_backend_availability(backend)

    name = "info_" + group_name
    try:
        ray.get_actor(name)
    except ValueError:
        # The lookup failed, i.e. no info actor is registered under this
        # name yet, so the group can be declared.
        pass
    else:
        raise RuntimeError("Trying to initialize a group twice.")

    if len(ranks) != len(actors):
        raise RuntimeError("Each actor should correspond to one rank.")

    if set(ranks) != set(range(len(ranks))):
        raise RuntimeError("Rank must be a permutation from 0 to len-1.")

    # Explicit exceptions instead of `assert`, which is stripped under -O.
    if world_size <= 0:
        raise RuntimeError("World size must be greater than zero.")

    # Check every rank individually; `all(ranks) >= 0` would only compare
    # a single boolean against the bound.
    if not all(0 <= rank < world_size for rank in ranks):
        raise RuntimeError("Ranks must be in the range [0, world_size).")

    from ray.util.collective.util import Info
    # Store the information into a NamedActor that can be accessed later.
    actors_id = [a._ray_actor_id for a in actors]
    info = Info.options(name=name, lifetime="detached").remote()
    # ray.get (rather than ray.wait) so that a failure while storing the
    # information is raised here instead of being silently ignored.
    ray.get([info.set_info.remote(actors_id, world_size, ranks, backend)])
170+
171+
127172
def destroy_collective_group(group_name: str = "default") -> None:
128173
"""Destroy a collective group given its group name."""
129174
_check_inside_actor()
@@ -342,9 +387,33 @@ def recv(tensor, src_rank: int, group_name: str = "default"):
342387
def _check_and_get_group(group_name):
    """Check the existence and return the group handle.

    If the group is not yet initialized in this process, two fallbacks are
    tried in order:

    1. Look up the named actor ``"info_" + group_name``, read the stored
       group description from it and create the group with the rank that
       corresponds to the current actor id.
    2. If that raises ``ValueError``, read the group description from the
       ``collective_*`` environment variables, provided that
       ``collective_group_name`` matches ``group_name``.

    Args:
        group_name (str): the name of the collective group.

    Returns:
        The group handle returned by ``_group_mgr.get_group_by_name``.

    Raises:
        RuntimeError: if the group is not initialized and neither fallback
            can provide its description.
    """
    _check_inside_actor()
    if is_group_initialized(group_name):
        return _group_mgr.get_group_by_name(group_name)

    # Not initialized locally: try loading from the remote info store.
    try:
        info_actor = ray.get_actor(name="info_" + group_name)
        actor_ids, world_size, ranks, backend = ray.get(
            info_actor.get_info.remote())
        current_id = ray.worker.global_worker.core_worker.get_actor_id()
        # `ranks` is aligned with `actor_ids`; pick this actor's entry.
        own_rank = ranks[actor_ids.index(current_id)]
        _group_mgr.create_collective_group(backend, world_size, own_rank,
                                           group_name)
    except ValueError as exc:
        # Check if this group is initialized using options(), i.e. via
        # environment variables.
        declared_in_env = (
            "collective_group_name" in os.environ
            and os.environ["collective_group_name"] == group_name)
        if not declared_in_env:
            raise RuntimeError(
                "The collective group '{}' is not "
                "initialized in the process.".format(group_name)) from exc
        env_rank = int(os.environ["collective_rank"])
        env_world_size = int(os.environ["collective_world_size"])
        env_backend = os.environ["collective_backend"]
        _group_mgr.create_collective_group(env_backend, env_world_size,
                                           env_rank, group_name)
    return _group_mgr.get_group_by_name(group_name)
350419

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
import cupy as cp
2+
import ray
3+
4+
import ray.util.collective as collective
5+
6+
7+
@ray.remote(num_gpus=1)
8+
class Worker:
9+
def __init__(self):
10+
self.send = cp.ones((4, ), dtype=cp.float32)
11+
12+
def compute(self):
13+
collective.allreduce(self.send, "177")
14+
return self.send
15+
16+
17+
if __name__ == "__main__":
    # Driver: start Ray, create the workers, declare them as one
    # collective group, then trigger the allreduce on every worker.
    ray.init(num_gpus=2)

    num_workers = 2
    workers = [Worker.remote() for _ in range(num_workers)]

    # The i-th worker gets the i-th rank in the group named "177".
    collective.declare_collective_group(
        workers,
        world_size=2,
        ranks=[0, 1],
        backend="nccl",
        group_name="177")

    pending = [worker.compute.remote() for worker in workers]
    results = ray.get(pending)
    print(results)
    ray.shutdown()

python/ray/util/collective/util.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,3 +40,28 @@ def get_id(self):
4040
logger.warning("The NCCL ID has not been "
4141
"set yet for store {}.".format(self.name))
4242
return self.nccl_id
43+
44+
45+
@ray.remote
46+
class Info:
47+
"""Store the group information created via `declare_collective_group`.
48+
49+
Note: Should be used as a NamedActor.
50+
"""
51+
52+
def __init__(self):
53+
self.ids = None
54+
self.world_size = -1
55+
self.rank = -1
56+
self.backend = None
57+
58+
def set_info(self, ids, world_size, rank, backend):
59+
"""Store collective information."""
60+
self.ids = ids
61+
self.world_size = world_size
62+
self.rank = rank
63+
self.backend = backend
64+
65+
def get_info(self):
66+
"""Get previously stored collective information."""
67+
return self.ids, self.world_size, self.rank, self.backend

0 commit comments

Comments
 (0)