Skip to content
Merged
Show file tree
Hide file tree
Changes from 50 commits
Commits
Show all changes
55 commits
Select commit Hold shift + click to select a range
18ea0e2
scaffold of the code
Nov 25, 2020
bf1051c
some scratch and options change
DachengLi1 Nov 26, 2020
3c5628a
NCCL mostly done, supporting API#1
Dec 1, 2020
0714c4a
interface 2.1 2.2 scratch
DachengLi1 Dec 1, 2020
20df179
put code into ray and fix some importing issues
Dec 1, 2020
5267df1
add an addtional Rendezvous class to safely meet at named actor
Dec 1, 2020
8ff63ad
fix some small bugs in nccl_util
Dec 1, 2020
88fbea1
some small fix
Dec 2, 2020
1e66354
scaffold of the code
Nov 25, 2020
c41f046
some scratch and options change
DachengLi1 Nov 26, 2020
912bd0f
NCCL mostly done, supporting API#1
Dec 1, 2020
5db388f
interface 2.1 2.2 scratch
DachengLi1 Dec 1, 2020
bd91da9
put code into ray and fix some importing issues
Dec 1, 2020
d971237
add an addtional Rendezvous class to safely meet at named actor
Dec 1, 2020
3f2f86b
fix some small bugs in nccl_util
Dec 1, 2020
135b9ec
some small fix
Dec 2, 2020
03e49e7
add a Backend class to make Backend string more robust
Dec 2, 2020
ec02002
fix some conflicts
Dec 2, 2020
5588322
add several useful APIs
Dec 2, 2020
49e59a3
add some tests
Dec 4, 2020
be40e84
added allreduce test
DachengLi1 Dec 4, 2020
0133c6a
fix typos
DachengLi1 Dec 4, 2020
cbeaafe
fix several bugs found via unittests
Dec 4, 2020
893142d
fix and update torch test
DachengLi1 Dec 4, 2020
ec1c07a
changed back actor
DachengLi1 Dec 4, 2020
8f15ba4
rearange a bit before importing distributed test
Dec 5, 2020
5b40ec3
add distributed test
Dec 5, 2020
c76a645
merge master
Dec 5, 2020
793830c
remove scratch code
zhisbug Dec 5, 2020
f8587df
auto-linting
zhisbug Dec 5, 2020
d7e4aee
linting 2
Dec 5, 2020
cd62a50
linting 2
Dec 5, 2020
bdb90de
linting 3
Dec 5, 2020
63973ec
linting 4
Dec 5, 2020
e027891
linting 5
zhisbug Dec 5, 2020
4136fa9
linting 6
Dec 5, 2020
ac603ad
2.1 2.2
DachengLi1 Dec 6, 2020
a8f6898
fix small bugs
DachengLi1 Dec 6, 2020
a3aafba
merge master
Dec 15, 2020
af15ca5
minor updates
Dec 15, 2020
9abf10f
linting again
Dec 15, 2020
6aad76d
auto linting
zhisbug Dec 15, 2020
82170bf
linting 2
Dec 15, 2020
93567cf
final linting
Dec 15, 2020
d521fcb
Update python/ray/util/collective_utils.py
DachengLi1 Dec 18, 2020
5a71f2c
Update python/ray/util/collective_utils.py
DachengLi1 Dec 18, 2020
38daf7b
Update python/ray/util/collective_utils.py
DachengLi1 Dec 18, 2020
c5c414a
added actor test
DachengLi1 Dec 18, 2020
b0ab663
lint
DachengLi1 Dec 18, 2020
322c822
remove local sh
DachengLi1 Dec 18, 2020
7c5f414
address most of richard's comments
Dec 28, 2020
9018ccd
minor update
Dec 28, 2020
f50758d
remove the actor.option() interface to avoid changes in ray core
Jan 4, 2021
1b3ba3b
minor updates
Jan 4, 2021
918c905
Merge branch 'master' into ray-collective-pr2
zhisbug Jan 4, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 13 additions & 3 deletions python/ray/actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import ray.worker
from ray.util.placement_group import (
PlacementGroup, check_placement_group_index, get_current_placement_group)
from ray.util.collective_utils import collective_to_envs

from ray import ActorClassID, Language
from ray._raylet import PythonFunctionDescriptor
Expand Down Expand Up @@ -424,7 +425,8 @@ def options(self,
placement_group=None,
placement_group_bundle_index=-1,
placement_group_capture_child_tasks=None,
override_environment_variables=None):
override_environment_variables=None,
collective=None):
"""Configures and overrides the actor instantiation parameters.

The arguments are the same as those that can be passed
Expand Down Expand Up @@ -466,7 +468,8 @@ def remote(self, *args, **kwargs):
placement_group_capture_child_tasks=(
placement_group_capture_child_tasks),
override_environment_variables=(
override_environment_variables))
override_environment_variables),
collective=collective)

return ActorOptionWrapper()

Expand All @@ -487,7 +490,8 @@ def _remote(self,
placement_group=None,
placement_group_bundle_index=-1,
placement_group_capture_child_tasks=None,
override_environment_variables=None):
override_environment_variables=None,
collective=None):
"""Create an actor.

This method allows more flexibility than the remote method because
Expand Down Expand Up @@ -527,6 +531,7 @@ def _remote(self,
override_environment_variables: Environment variables to override
and/or introduce for this actor. This is a dictionary mapping
variable names to their values.
collective: what colletive configuration to use

Returns:
A handle to the newly created actor.
Expand Down Expand Up @@ -657,6 +662,11 @@ def _remote(self,
function_signature = meta.method_meta.signatures["__init__"]
creation_args = signature.flatten_args(function_signature, args,
kwargs)

if collective:
override_environment_variables = collective_to_envs(
collective, override_environment_variables)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@stephanie-wang or @ericl can you take a quick look and give +1 if ok?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't seem necessary. Can we avoid making changes to core API?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ericl understood the requirement.

This modification over core APIs was intended to implement an declarative interface: actor.options(..., collective=options) per Stephanie's suggestions. See here

But I am actually fine not adding this one as we currently have had a declarative interface as declare_collective_group(actors, options, ...) in addition to the init_collective_group(...) interface.

@stephanie-wang please comment, thanks!


actor_id = worker.core_worker.create_actor(
meta.language,
meta.actor_creation_function_descriptor,
Expand Down
33 changes: 32 additions & 1 deletion python/ray/tests/test_actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import ray
import ray.test_utils
import ray.cluster_utils

# NOTE: We have to import setproctitle after ray because we bundle setproctitle
# with ray.
import setproctitle
Expand Down Expand Up @@ -238,6 +237,38 @@ def get_val(self):
assert ray.get(g.remote()) == num_remote_functions - 1


def test_collective_envs(ray_start_10_cpus):
@ray.remote
class Actor:
def __init__(self):
pass

def f(self):
_group_name = os.getenv("collective_group_name")
_rank = os.getenv("collective_rank")
_world_size = os.getenv("collective_world_size")
_backend = os.getenv("collective_backend")
return _group_name, _rank, _world_size, _backend

actors = []
for i in range(2):
_options = {
"group_name": "177",
"world_size": 2,
"rank": i,
"backend": "nccl"
}
actor = Actor.options(collective=_options).remote()
actors.append(actor)

_group_name, _rank, _world_size, _backend =\
ray.get(actors[0].f.remote())
assert _group_name == "177"
assert _world_size == "2"
assert _rank == "0"
assert _backend == "nccl"


def test_actor_method_metadata_cache(ray_start_regular):
class Actor(object):
pass
Expand Down
9 changes: 5 additions & 4 deletions python/ray/util/collective/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
from .collective import nccl_available, mpi_available, is_group_initialized, \
init_collective_group, destroy_collective_group, get_rank, \
get_world_size, allreduce, barrier
init_collective_group, declare_collective_group, \
destroy_collective_group, get_rank, get_world_size, allreduce, barrier

__all__ = [
"nccl_available", "mpi_available", "is_group_initialized",
"init_collective_group", "destroy_collective_group", "get_rank",
"get_world_size", "allreduce", "barrier"
"init_collective_group", "declare_collective_group",
"destroy_collective_group", "get_rank", "get_world_size", "allreduce",
"barrier"
]
77 changes: 75 additions & 2 deletions python/ray/util/collective/collective.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""APIs exposed under the namespace ray.util.collective."""
import logging
import os

import numpy as np
import ray
Expand Down Expand Up @@ -150,6 +151,54 @@ def init_collective_group(world_size: int,
_group_mgr.create_collective_group(backend, world_size, rank, group_name)


def declare_collective_group(actors, group_options):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we instead expand group_options here:

def declare_collective_group(actors, *, group_name, world_size, rank, backend):

"""
Declare a list of actors in a collective group with group options.

Note: This function should be called in a driver process.

Args:
actors (list): a list of actors to be set in a collective group.
group_options (dict): a dictionary that contains group_name(str),
world_size(int), rank(list of int, e.g. [0,1]
means the first actor is rank 0, and the second
actor is rank 1), backend(str).
"""
try:
group_name = group_options["group_name"]
world_size = group_options["world_size"]
rank = group_options["rank"]
backend = group_options["backend"]
except KeyError:
raise ValueError("group options incomplete.")

backend = types.Backend(backend)
_check_backend_availability(backend)

name = "info_" + group_name
try:
ray.get_actor(name)
raise RuntimeError("Trying to initialize a group twice.")
except ValueError:
pass

if len(rank) != len(actors):
raise RuntimeError("Each actor should correspond to one rank.")

if set(rank) != set(range(len(rank))):
raise RuntimeError("Rank must be a permutation from 0 to len-1.")

assert world_size > 0
assert all(rank) >= 0 and all(rank) < world_size

from ray.util.collective.util import Info
# store the information into a NamedActor that can be accessed later/
name = "info_" + group_name
actors_id = [a._ray_actor_id for a in actors]
info = Info.options(name=name, lifetime="detached").remote()
ray.wait([info.set_info.remote(actors_id, world_size, rank, backend)])


def destroy_collective_group(group_name: str = "default") -> None:
"""Destroy a collective group given its group name."""
_check_inside_actor()
Expand Down Expand Up @@ -231,9 +280,33 @@ def barrier(group_name):
def _check_and_get_group(group_name):
"""Check the existence and return the group handle."""
_check_inside_actor()
global _group_mgr
if not is_group_initialized(group_name):
raise RuntimeError("The collective group '{}' is not "
"initialized in the process.".format(group_name))
# try loading from remote info store
try:
# if the information is stored in an Info object,
# get and create the group.
name = "info_" + group_name
mgr = ray.get_actor(name=name)
ids, world_size, rank, backend = ray.get(mgr.get_info.remote())
worker = ray.worker.global_worker
id_ = worker.core_worker.get_actor_id()
r = rank[ids.index(id_)]
_group_mgr.create_collective_group(backend, world_size, r,
group_name)
except ValueError:
# check if this group is initialized using options()
if "collective_group_name" in os.environ and \
os.environ["collective_group_name"] == group_name:
rank = int(os.environ["collective_rank"])
world_size = int(os.environ["collective_world_size"])
backend = os.environ["collective_backend"]
_group_mgr.create_collective_group(backend, world_size, rank,
group_name)
else:
raise RuntimeError(
"The collective group '{}' is not "
"initialized in the process.".format(group_name))
g = _group_mgr.get_group_by_name(group_name)
return g

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import cupy as cp
import ray

import ray.util.collective as collective


@ray.remote(num_gpus=1)
class Worker:
def __init__(self):
self.send = cp.ones((4, ), dtype=cp.float32)

def compute(self):
collective.allreduce(self.send, "177")
return self.send


if __name__ == "__main__":
ray.init(num_gpus=2)

num_workers = 2
workers = []
for i in range(num_workers):
_options = {
"group_name": "177",
"world_size": 2,
"rank": i,
"backend": "nccl"
}
w = Worker.options(collective=_options).remote()
workers.append(w)
results = ray.get([w.compute.remote() for w in workers])
print(results)
ray.shutdown()
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import cupy as cp
import ray

import ray.util.collective as collective


@ray.remote(num_gpus=1)
class Worker:
def __init__(self):
self.send = cp.ones((4, ), dtype=cp.float32)

def compute(self):
collective.allreduce(self.send, "177")
return self.send


if __name__ == "__main__":
ray.init(num_gpus=2)

num_workers = 2
workers = []
for i in range(num_workers):
w = Worker.remote()
workers.append(w)
_options = {
"group_name": "177",
"world_size": 2,
"rank": [0, 1],
"backend": "nccl"
}
collective.declare_collective_group(workers, _options)
results = ray.get([w.compute.remote() for w in workers])
print(results)
ray.shutdown()
26 changes: 26 additions & 0 deletions python/ray/util/collective/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,29 @@ def get_id(self):
logger.warning("The NCCL ID has not been "
"set yet for store {}.".format(self.name))
return self.nccl_id


@ray.remote
class Info:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please rename? Also, this is just a makeshift kvstore right?

Can we maybe use ray.experimental.internal_kv instead of this actor?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@zhisbug let's chat offline about possible alternatives here? this may seem harmless but I think it could easily be a source of issues later on.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's talk tomorrow. If the internal_kv is good we can refactor all named_actor code to use this one (which might be better!)

"""
Store the group information created via `declare_collective_group`.

Note: Should be used as a NamedActor.
"""

def __init__(self):
self.ids = None
self.world_size = -1
self.rank = -1
self.backend = None

def set_info(self, ids, world_size, rank, backend):
"""Store collective information."""
self.ids = ids
self.world_size = world_size
self.rank = rank
self.backend = backend

def get_info(self):
"""Get previously stored collective information."""
return self.ids, self.world_size, self.rank, self.backend
28 changes: 28 additions & 0 deletions python/ray/util/collective_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
"""Collections of collective util functions"""


def collective_to_envs(collective, envs):
"""A helper method that get information from collective and add to envs.

Args:
collective (dict): collective information
envs (dict): os environment dict

Returns:
envs (dict): modified os environment dict
"""

if envs is not None:
assert not any(k in envs for k in [
"collective_group_name", "collective_rank",
"collective_world_size", "collective_backend"
])

else:
envs = {}
envs["collective_group_name"] = str(collective["group_name"])
envs["collective_rank"] = str(collective["rank"])
envs["collective_world_size"] = str(collective["world_size"])
envs["collective_backend"] = str(collective["backend"])

return envs