Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
55 commits
Select commit Hold shift + click to select a range
18ea0e2
scaffold of the code
Nov 25, 2020
bf1051c
some scratch and options change
DachengLi1 Nov 26, 2020
3c5628a
NCCL mostly done, supporting API#1
Dec 1, 2020
0714c4a
interface 2.1 2.2 scratch
DachengLi1 Dec 1, 2020
20df179
put code into ray and fix some importing issues
Dec 1, 2020
5267df1
add an addtional Rendezvous class to safely meet at named actor
Dec 1, 2020
8ff63ad
fix some small bugs in nccl_util
Dec 1, 2020
88fbea1
some small fix
Dec 2, 2020
1e66354
scaffold of the code
Nov 25, 2020
c41f046
some scratch and options change
DachengLi1 Nov 26, 2020
912bd0f
NCCL mostly done, supporting API#1
Dec 1, 2020
5db388f
interface 2.1 2.2 scratch
DachengLi1 Dec 1, 2020
bd91da9
put code into ray and fix some importing issues
Dec 1, 2020
d971237
add an addtional Rendezvous class to safely meet at named actor
Dec 1, 2020
3f2f86b
fix some small bugs in nccl_util
Dec 1, 2020
135b9ec
some small fix
Dec 2, 2020
03e49e7
add a Backend class to make Backend string more robust
Dec 2, 2020
ec02002
fix some conflicts
Dec 2, 2020
5588322
add several useful APIs
Dec 2, 2020
49e59a3
add some tests
Dec 4, 2020
be40e84
added allreduce test
DachengLi1 Dec 4, 2020
0133c6a
fix typos
DachengLi1 Dec 4, 2020
cbeaafe
fix several bugs found via unittests
Dec 4, 2020
893142d
fix and update torch test
DachengLi1 Dec 4, 2020
ec1c07a
changed back actor
DachengLi1 Dec 4, 2020
8f15ba4
rearange a bit before importing distributed test
Dec 5, 2020
5b40ec3
add distributed test
Dec 5, 2020
c76a645
merge master
Dec 5, 2020
793830c
remove scratch code
zhisbug Dec 5, 2020
f8587df
auto-linting
zhisbug Dec 5, 2020
d7e4aee
linting 2
Dec 5, 2020
cd62a50
linting 2
Dec 5, 2020
bdb90de
linting 3
Dec 5, 2020
63973ec
linting 4
Dec 5, 2020
e027891
linting 5
zhisbug Dec 5, 2020
4136fa9
linting 6
Dec 5, 2020
ac603ad
2.1 2.2
DachengLi1 Dec 6, 2020
a8f6898
fix small bugs
DachengLi1 Dec 6, 2020
a3aafba
merge master
Dec 15, 2020
af15ca5
minor updates
Dec 15, 2020
9abf10f
linting again
Dec 15, 2020
6aad76d
auto linting
zhisbug Dec 15, 2020
82170bf
linting 2
Dec 15, 2020
93567cf
final linting
Dec 15, 2020
d521fcb
Update python/ray/util/collective_utils.py
DachengLi1 Dec 18, 2020
5a71f2c
Update python/ray/util/collective_utils.py
DachengLi1 Dec 18, 2020
38daf7b
Update python/ray/util/collective_utils.py
DachengLi1 Dec 18, 2020
c5c414a
added actor test
DachengLi1 Dec 18, 2020
b0ab663
lint
DachengLi1 Dec 18, 2020
322c822
remove local sh
DachengLi1 Dec 18, 2020
7c5f414
address most of richard's comments
Dec 28, 2020
9018ccd
minor update
Dec 28, 2020
f50758d
remove the actor.option() interface to avoid changes in ray core
Jan 4, 2021
1b3ba3b
minor updates
Jan 4, 2021
918c905
Merge branch 'master' into ray-collective-pr2
zhisbug Jan 4, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 71 additions & 2 deletions python/ray/util/collective/collective.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
"""APIs exposed under the namespace ray.util.collective."""
import logging
import os
from typing import List

import numpy as np
import ray
Expand Down Expand Up @@ -124,6 +126,49 @@ def init_collective_group(world_size: int,
_group_mgr.create_collective_group(backend, world_size, rank, group_name)


def declare_collective_group(actors,
world_size: int,
ranks: List[int],
backend=types.Backend.NCCL,
group_name: str = "default"):
"""Declare a list of actors as a collective group.

Note: This function should be called in a driver process.

Args:
actors (list): a list of actors to be set in a collective group.
group_options (dict): a dictionary that contains group_name(str),
world_size(int), rank(list of int, e.g. [0,1]
means the first actor is rank 0, and the second
actor is rank 1), backend(str).
"""
backend = types.Backend(backend)
_check_backend_availability(backend)

name = "info_" + group_name
try:
ray.get_actor(name)
raise RuntimeError("Trying to initialize a group twice.")
except ValueError:
pass

if len(ranks) != len(actors):
raise RuntimeError("Each actor should correspond to one rank.")

if set(ranks) != set(range(len(ranks))):
raise RuntimeError("Rank must be a permutation from 0 to len-1.")

assert world_size > 0
assert all(ranks) >= 0 and all(ranks) < world_size

from ray.util.collective.util import Info
# store the information into a NamedActor that can be accessed later/
name = "info_" + group_name
actors_id = [a._ray_actor_id for a in actors]
info = Info.options(name=name, lifetime="detached").remote()
ray.wait([info.set_info.remote(actors_id, world_size, ranks, backend)])


def destroy_collective_group(group_name: str = "default") -> None:
"""Destroy a collective group given its group name."""
_check_inside_actor()
Expand Down Expand Up @@ -342,9 +387,33 @@ def recv(tensor, src_rank: int, group_name: str = "default"):
def _check_and_get_group(group_name):
"""Check the existence and return the group handle."""
_check_inside_actor()
global _group_mgr
if not is_group_initialized(group_name):
raise RuntimeError("The collective group '{}' is not "
"initialized in the process.".format(group_name))
# try loading from remote info store
try:
# if the information is stored in an Info object,
# get and create the group.
name = "info_" + group_name
mgr = ray.get_actor(name=name)
ids, world_size, rank, backend = ray.get(mgr.get_info.remote())
worker = ray.worker.global_worker
id_ = worker.core_worker.get_actor_id()
r = rank[ids.index(id_)]
_group_mgr.create_collective_group(backend, world_size, r,
group_name)
except ValueError as exc:
# check if this group is initialized using options()
if "collective_group_name" in os.environ and \
os.environ["collective_group_name"] == group_name:
rank = int(os.environ["collective_rank"])
world_size = int(os.environ["collective_world_size"])
backend = os.environ["collective_backend"]
_group_mgr.create_collective_group(backend, world_size, rank,
group_name)
else:
raise RuntimeError(
"The collective group '{}' is not "
"initialized in the process.".format(group_name)) from exc
g = _group_mgr.get_group_by_name(group_name)
return g

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import cupy as cp
import ray

import ray.util.collective as collective


@ray.remote(num_gpus=1)
class Worker:
def __init__(self):
self.send = cp.ones((4, ), dtype=cp.float32)

def compute(self):
collective.allreduce(self.send, "177")
return self.send


if __name__ == "__main__":
ray.init(num_gpus=2)

num_workers = 2
workers = []
for i in range(num_workers):
w = Worker.remote()
workers.append(w)
_options = {
"group_name": "177",
"world_size": 2,
"ranks": [0, 1],
"backend": "nccl"
}
collective.declare_collective_group(workers, **_options)
results = ray.get([w.compute.remote() for w in workers])
print(results)
ray.shutdown()
25 changes: 25 additions & 0 deletions python/ray/util/collective/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,28 @@ def get_id(self):
logger.warning("The NCCL ID has not been "
"set yet for store {}.".format(self.name))
return self.nccl_id


@ray.remote
class Info:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please rename? Also, this is just a makeshift kvstore right?

Can we maybe use ray.experimental.internal_kv instead of this actor?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@zhisbug let's chat offline about possible alternatives here? this may seem harmless but I think it could easily be a source of issues later on.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's talk tomorrow. If the internal_kv is good we can refactor all named_actor code to use this one (which might be better!)

"""Store the group information created via `declare_collective_group`.

Note: Should be used as a NamedActor.
"""

def __init__(self):
self.ids = None
self.world_size = -1
self.rank = -1
self.backend = None

def set_info(self, ids, world_size, rank, backend):
"""Store collective information."""
self.ids = ids
self.world_size = world_size
self.rank = rank
self.backend = backend

def get_info(self):
"""Get previously stored collective information."""
return self.ids, self.world_size, self.rank, self.backend