
Conversation

@zhisbug (Contributor) commented Nov 19, 2020

Why are these changes needed?

An initial proposal for adding NCCL/MPI-backed collective communication to Ray.

Related issue number

Checks

  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

@zhisbug (Contributor, Author) commented Nov 19, 2020

Comments by Stephanie (@stephanie-wang):

Thanks, this is interesting. The API seems very MPI-like to me, which I suppose is good for faster prototyping since it's closer to the implementation code, but I hope we can come up with something that requires less boilerplate, is more compatible with ObjectRefs, and is less error-prone. So far, the Ray model of communication is through tasks and ObjectRefs only; obviously you can communicate between workers out-of-band through TCP, etc., but that is not recommended. So I worry about introducing an API that adds a second model of communication that the user has to think about.

I wrote a quick API proposal following the examples in the doc, where the collectives are declared at task/actor creation time instead of during execution, and the communication is specified through ObjectRefs. The main difference between this API and the standard ObjectRef API is that the user is limited in what they can do with the ObjectRefs: they can only use them with other objects/processes in the same collective group. https://gist.github.com/stephanie-wang/ae9a82b3f200989ba37749d3268c0907

Note that I'm not saying anything about how this should be implemented; these are just some initial thoughts. The main difficulty is that the system is now responsible for the collective group setup, object storage, and communication:

  • The boilerplate code for setting up communication groups, etc., can be run on the workers when the task or actor creation task is assigned.
  • For storage, we actually already have two types: an in-memory C++ store and the distributed object store. To support this API, we could add a third type, an in-memory Python store, meaning we would store the pointer directly and so wouldn't need to serialize.
  • For send/recv, we would implement a new protocol for retrieving objects that contacts the sender directly. We already have two protocols corresponding to the two existing types of storage, so in some sense we already have these abstractions. For collectives, the driver would send messages to all involved workers to start the operation, e.g., allreduce.
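
A minimal, self-contained sketch of the storage idea in the last two bullets: an in-process Python store that keeps live references (no serialization) and a driver that only sends "start the collective" messages to group members. All class and function names here are invented for illustration; none of this is Ray's actual internals.

```python
# Conceptual sketch only: an in-process "Python object store" that holds live
# references (no serialization), plus a driver that only tells group members
# to start a collective. All names are invented for illustration.
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class InMemoryPythonStore:
    # Maps an object id to the live Python object (e.g., a GPU tensor).
    # Keeping the reference avoids copying/serializing into the plasma store.
    objects: Dict[str, Any] = field(default_factory=dict)

    def put(self, obj_id: str, obj: Any) -> None:
        self.objects[obj_id] = obj

    def get(self, obj_id: str) -> Any:
        return self.objects[obj_id]


@dataclass
class GroupMember:
    rank: int
    store: InMemoryPythonStore = field(default_factory=InMemoryPythonStore)

    def start_allreduce(self, obj_id: str) -> Any:
        tensor = self.store.get(obj_id)
        # A real implementation would hand `tensor` to NCCL/MPI here.
        return tensor


def driver_allreduce(members: List[GroupMember], obj_id: str) -> None:
    # The driver only sends "start" messages; the tensor data never moves
    # through it.
    for member in members:
        member.start_allreduce(obj_id)
```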

@zhisbug (Contributor, Author) commented Nov 19, 2020

Posting an earlier conversation from Slack between Stephanie and me:

Hao:

My main takeaways from your comments (please correct me if I've misunderstood anything):

  • API-wise: move the collective group declaration to actor/task creation, using some signature like ray.actor.option(kwargs).remote(). Extend the APIs to work with ObjectRefs, or maybe in the future limit them to working only with ObjectRefs.
  • Implementation-wise: create a third Python object store (say, POS). Store in the POS the pointers to the contents to be collectively communicated (e.g., CUDA tensors to send/recv). Runtime: the driver sends messages to the members of a group to invoke a collective; each member retrieves the pointers from its local POS at execution time and launches the lower-level collective calls.

Actually, I discussed with Ion and Melih a bit before I wrote this RFC. We had a Non-goals section in the RFC saying this RFC probably won't support Ray ObjectRefs (partly because I'm not familiar with Ray core and don't know how to do it). We (kind of deliberately) limited its scope and were aware the proposal might be incompatible with ObjectRefs at this point.
That being said, here is what I am thinking: with your comments and help, maybe we should extend the scope of this RFC a bit and make it a two-stage one:

  • Stage (I): we implement what is described in the current RFC, bringing NCCL/MPI to Ray. NumS and RayML can use them to continue development. There are some other complications in the implementation that I didn't bring up in the RFC, such as managing multiple threads/CUDA streams and managing multiple collective groups (and hence the streams and communicators therein). We feel this part has immediate value for facilitating development in other ongoing projects.
  • Stage (II): make it compatible with ObjectRefs per your comments.

Thoughts? Do you think this is a viable roadmap? One concern of mine is that if we do (I) without worrying about (II) at this moment, I am unsure how far we'll get and how much code will need to be redone (e.g., the stream/communicator management is a bit tricky, based on my prototyping experience over the past several days), both in this project and in NumS and RayML (because they will collectively communicate tensors, not ObjectRefs).

Stephanie:

But yeah, I think the roadmap makes sense.
I do think we should try to figure out which parts can be done easily right now, though. I agree with pushing off the ObjectRef-compatible API for now, but we should strongly consider requiring collective groups to be declared at task/actor invocation time.

Hao:

Yeah, that part should be doable!

@zhisbug (Contributor, Author) commented Nov 19, 2020

Posting an earlier conversation from Slack between Richard (@richardliaw) and me:

Hao:

If working with ObjectRefs, then the contents to be reduced must be in the Ray object store. But the object store does not have CUDA awareness now, so it would be extremely slow and incompatible with NCCL. To make it work, we would need to add some CUDA runtime into the object store. Am I correct?

Richard:

Not necessarily. Stephanie is proposing a CPU object store so that it does not need to go into Plasma. It stores just a reference to the CUDA tensor, i.e., it doesn't touch it.

Hao:

Yeah, that might work. I am thinking of extending this project into two stages:

  • (first stage) do what I proposed in the RFC.
  • (second stage) make it compatible with refs, following some of Stephanie's ideas on creating a Python object store.

Richard:

What are the main overheads in stage I (implementing NCCL/MPI collectives on tensors in Ray)? How do streams interact with communicators?

Hao:

There are three complicated parts:

  1. We need a good caching mechanism for communicators; repeatedly declaring new communicators in NCCL is very wasteful and slow.
  2. For a single collective group, if we use the default stream for both compute and communication, we lose any chance to overlap them; we need to maintain a dedicated CUDA stream for communication and synchronize streams only when appropriate. This becomes more complicated when there are multiple GPUs, each with a few streams to synchronize.
  3. We need to plan a bit for the future. A distributed ML strategy that massively mixes data parallelism and model parallelism makes this more complicated, as one needs to create multiple collective groups and manage multiple groups of communicators (and hence threads or streams). We might do (3) later anyway.

A communicator can be assigned a stream to take care of the communication.
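
A minimal single-process, single-GPU sketch of points (1) and (2) above, assuming CuPy is built with NCCL support: the communicator is created once (and would be cached/reused), and the allreduce is launched on a dedicated stream that is synchronized only when the result is needed. A real multi-GPU setup would additionally exchange the NCCL unique id across processes.

```python
import cupy as cp
from cupy.cuda import nccl

# Create the communicator once and cache/reuse it; constructing NCCL
# communicators repeatedly is expensive (point 1 above).
# Single-process, single-GPU toy case: world size 1, rank 0.
comm = nccl.NcclCommunicator(1, nccl.get_unique_id(), 0)

# Dedicated stream for communication so it can overlap with compute that runs
# on the default stream (point 2 above).
comm_stream = cp.cuda.Stream(non_blocking=True)

x = cp.ones(2 ** 20, dtype=cp.float32)  # "gradients" to reduce
y = cp.empty_like(x)                    # output buffer

# Launch the allreduce on the dedicated stream.
comm.allReduce(
    x.data.ptr, y.data.ptr, x.size,
    nccl.NCCL_FLOAT32, nccl.NCCL_SUM, comm_stream.ptr,
)

# ...other compute could be queued on the default stream here...

# Synchronize only when the reduced tensor is actually needed.
comm_stream.synchronize()
```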

@richardliaw (Contributor) commented:
@zhisbug your comment about the 2 stages is now cut off

@robertnishihara (Collaborator) commented:
Thanks @zhisbug for sharing the proposal! The examples in the RFC are super helpful. In order to evaluate the APIs a bit more, it'd be nice to see them in the context of concrete use cases. For example, it'd be helpful to see how they would be used by SpaCy or other libraries.

Your proposal seems slightly more general than @stephanie-wang's (since you can have an allreduce with both PyTorch actors and CuPy actors). So it'd be interesting to know if there are use cases that benefit from that extra flexibility.

ray.collective.init_collective_group seems to require gang scheduling, so this might make sense to integrate with placement groups. In your design, that would be done at the application level by the user. In @stephanie-wang's API, it'd probably be done in the collective library.

Are there other collectives that make sense to support beyond allreduce?

Is point to point GPU communication out of scope for this doc?

@zhisbug (Contributor, Author) commented Nov 21, 2020

Hi @robertnishihara: really appreciate the feedback; see responses inline below.

Thanks @zhisbug for sharing the proposal! The examples in the RFC are super helpful. In order to evaluate the APIs a bit more, it'd be nice to see them in the context of concrete use cases. For example, it'd be helpful to see how they would be used by SpaCy or other libraries.

For SpaCy pipelines to benefit from this infrastructure, we can implement a high-level parameter server (PS) strategy or an AllReduce strategy on top of the Ray APIs and this set of collective APIs. Since the communication (especially on GPUs) will be handled by CCL libraries, performance can improve significantly compared to what they have now. I'll defer the PS implementation using these APIs to a later RFC, but you can think of these distributed ML strategies, such as a sharded PS, as just a series of CCL-based reduce + broadcast calls (much more optimized and efficient than RPCs).
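
To make the "series of reduce + broadcast calls" concrete, here is a toy, single-process sketch of one sharded-PS step; the NumPy stand-ins below are placeholders for the NCCL/MPI-backed reduce and broadcast this RFC proposes, not the eventual API.

```python
import numpy as np

# Toy stand-ins for the CCL-backed collectives; in a real implementation these
# would be the NCCL/MPI reduce and broadcast calls proposed in this RFC.
def reduce_sum(out_buf, worker_bufs):
    # "reduce": sum every worker's gradient shard into the shard owner's buffer.
    out_buf[:] = np.sum(worker_bufs, axis=0)

def broadcast(src_buf, worker_bufs):
    # "broadcast": copy the updated shard from the owner back to every worker.
    for buf in worker_bufs:
        buf[:] = src_buf

num_workers, shard_size, lr = 4, 8, 0.1
params_shard = np.zeros(shard_size)                      # owned by one PS shard
worker_grads = [np.ones(shard_size) for _ in range(num_workers)]
worker_params = [np.empty(shard_size) for _ in range(num_workers)]

grad_sum = np.empty(shard_size)
reduce_sum(grad_sum, worker_grads)            # gradients -> shard owner
params_shard -= lr * grad_sum / num_workers   # apply the update on the owner
broadcast(params_shard, worker_params)        # new params -> all workers
```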

Your proposal seems slightly more general than @stephanie-wang's (since you can have an allreduce with both PyTorch actors and CuPy actors). So it'd be interesting to know if there are use cases that benefit from that extra flexibility.

Consider a RAG- or REALM-like ML model where some parts of the code are written in CuPy, NumPy, or TensorFlow (e.g., an embedding or database look-up) while other parts are implemented in PyTorch (e.g., layers of transformers). When we model-parallelize this model across distributed GPUs, we might send tensors from a TensorFlow endpoint to a PyTorch endpoint. While I understand this is an imaginary case, it involves little effort to make this interoperability happen without making the API confusing.

ray.collective.init_collective_group seems to require gang scheduling, so this might make sense to integrate with placement groups. In your design, that would be done at the application level by the user. In @stephanie-wang's API, it'd probably be done in the collective library.

Good point. Yeah, gang scheduling might be needed to make sure there are no deadlocks with the proposed APIs. We'll look into it a bit.
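
For reference, one possible application-level approach, assuming Ray's existing placement group API (bundle strategies, .options(placement_group=...)): reserve one GPU bundle per collective-group member and wait on pg.ready() before creating the actors, so that either the whole group is placed or nothing runs. This is only a sketch of the integration being discussed, not part of the RFC.

```python
import ray
from ray.util.placement_group import placement_group

ray.init()

# Reserve one GPU bundle per collective-group member so that either all
# members can be scheduled or none are (gang scheduling, to avoid a deadlock
# where only part of the group ever enters the collective).
world_size = 4
pg = placement_group([{"GPU": 1}] * world_size, strategy="STRICT_SPREAD")
ray.get(pg.ready())  # block until all bundles are reserved

@ray.remote(num_gpus=1)
class CollectiveWorker:
    def setup(self, world_size, rank):
        # This is where the RFC's ray.collective.init_collective_group call
        # would go, once every member is known to be placed.
        return rank

workers = [
    CollectiveWorker.options(
        placement_group=pg, placement_group_bundle_index=i
    ).remote()
    for i in range(world_size)
]
ray.get([w.setup.remote(world_size, i) for i, w in enumerate(workers)])
```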

Are there other collectives that make sense to support beyond allreduce?

Is point to point GPU communication out of scope for this doc?

Yes, we will bring in all available APIs in NCCL/MPI. Taking NCCL as an example, this includes reduce, allreduce, broadcast, allgather, reducescatter, and GPU-to-GPU send/recv (NCCL >= 2.7.4). See here.

Regarding the proposed APIs and Stephanie's API suggestion: we might expose both, since the set of APIs this RFC proposes is slightly lower-level; hence we can implement Stephanie's APIs on top of it (i.e., collective group declaration happens at actor/task creation) and make those the recommended ones.
Supporting NCCL/MPI-backed collectives over ObjectRefs will be investigated in the second stage (maybe in another RFC later).
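
A rough sketch of how such a declarative layer could sit on top of the lower-level calls; the declare_collective_group wrapper and the stubbed init_collective_group body are hypothetical and only illustrate where the group setup would be triggered (right after actor creation, before any collective runs).

```python
# Hypothetical sketch of a declarative layer over the RFC's lower-level calls.
import ray


def init_collective_group(world_size, rank, backend, group_name):
    # Stand-in for the RFC's lower-level setup call (e.g., creating an NCCL
    # communicator for this process). Real logic omitted.
    print(f"[{group_name}] rank {rank}/{world_size} joined via {backend}")


def declare_collective_group(actors, backend="nccl", group_name="default"):
    # Declarative layer: the group is set up right after actor creation, so
    # gang-scheduling/initialization failures surface early.
    world_size = len(actors)
    return ray.get([
        a._setup_collective.remote(world_size, rank, backend, group_name)
        for rank, a in enumerate(actors)
    ])


@ray.remote(num_gpus=1)
class TrainerActor:
    def _setup_collective(self, world_size, rank, backend, group_name):
        init_collective_group(world_size, rank, backend, group_name)

    def train_step(self):
        # ...compute gradients, then call the lower-level allreduce here...
        pass


# Usage (on a cluster with GPUs):
# ray.init()
# actors = [TrainerActor.remote() for _ in range(4)]
# declare_collective_group(actors, backend="nccl", group_name="trainers")
```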

@stephanie-wang (Contributor) commented:
I agree with @zhisbug that the APIs are compatible. The one that I suggested with ObjectRefs is just a higher-level version that would allow the user to not have to think at the level of "send/recv".

Your proposal seems slightly more general than @stephanie-wang's (since you can have an allreduce with both PyTorch actors and CuPy actors). So it'd be interesting to know if there are use cases that benefit from that extra flexibility.

I don't think that's true. Both proposals would support such a use case (see the linked gist).

ray.collective.init_collective_group seems to require gang scheduling, so this might make sense to integrate with placement groups. In your design, that would be done at the application level by the user. In @stephanie-wang's API, it'd probably be done in the collective library.

Actually, in both cases I think this should be done by the user. Ray could provide some integration (e.g., automatically create a placement group for a collective), but I don't think it makes sense since a user can already do it easily with the existing placement group API.

In general, the reasoning behind the API that I proposed was to make it more declarative, which allows Ray to take on more functionality (by making the calls to the lower-level collective API internally). For example, part of the API that I proposed was to move the declaration of collective groups into task/actor invocation. This would let us provide better errors if gang scheduling fails, e.g., if an actor can't be scheduled. If the user is the one specifying when to initialize the communication group, then the system can't do anything except wait, since it doesn't know which task/actor was supposed to be part of the communication group.

@richardliaw (Contributor) commented:
I'm ok with merging this, though maybe we can just have a separate repo in the org for RFC/community stuff. Thoughts @anabranch @zhe-thoughts @ericl @edoakes ?

@zhisbug (Contributor, Author) commented Dec 21, 2020

Closing this PR. I am going to move it to the ray-project/RFC repo per our discussion last week.
