Add pynccl all-gatherv and reducescatterv #20154
Conversation
👋 Hi! Thank you for contributing to the vLLM project. 💬 Join our developer Slack at https://slack.vllm.ai to discuss your PR in #pr-reviews, coordinate on features in #feat- channels, or join special interest groups in #sig- channels. Just a reminder: PRs do not trigger a full CI run by default; only a limited subset of checks runs automatically. Once the PR is approved and ready to go, your PR reviewer(s) can run CI to test the changes comprehensively before merging. To run CI, PR reviewers can either: Add 🚀
Summary of Changes
Hello @trevor-m, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!
This pull request significantly enhances the distributed communication capabilities by introducing variable-sized all_gather and reduce_scatter operations (all_gatherv and reducescatterv) to the pynccl backend. This change is vital for optimizing distributed training workflows, particularly for Mixture-of-Experts (MoE) models, by allowing for more efficient data handling without the overhead of padding.
Highlights
- New Collective Operations: Implemented `all_gatherv` and `reducescatterv` within the `PyNcclCommunicator` to support distributed tensor operations where input/output sizes can vary per rank (see the sketch after this list for the intended semantics). This is a crucial enhancement for flexible distributed training.
- MoE Optimization: These new variable-sized collectives are specifically designed to enable more efficient Mixture-of-Experts (MoE) dispatch and combine operations in data-parallel (DP) settings by eliminating the need for padding, which can lead to significant performance gains.
- NCCL Primitives Integration: Extended the `pynccl` wrapper to expose the underlying `ncclReduce`, `ncclGroupStart`, and `ncclGroupEnd` NCCL primitives. These low-level functions implement the variable-sized `all_gatherv` (via `ncclBroadcast` calls within a group) and `reducescatterv` (via `ncclReduce` calls within a group).
- Comprehensive Testing: Added new unit tests (`test_pynccl_all_gatherv` and `test_pynccl_reduce_scatterv`) to `tests/distributed/test_pynccl.py` to thoroughly validate the correctness and functionality of the newly introduced variable-sized collective operations across multiple GPUs.
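For intuition, here is a pure-PyTorch sketch of what the variable-sized gather is expected to compute. The function name and signature are illustrative only, not this PR's actual API:

```python
import torch

def all_gatherv_reference(inputs: list[torch.Tensor],
                          sizes: list[int]) -> torch.Tensor:
    """Reference semantics only: rank r contributes a tensor whose dim-0
    length is sizes[r]; every rank ends up with the unpadded concatenation."""
    assert [t.shape[0] for t in inputs] == list(sizes)
    return torch.cat(inputs, dim=0)
```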
Code Review
The pull request introduces new collective communication operations (all-gatherv and reduce-scatterv) using PyNCCL. The changes include modifications to the base device communicator, CUDA communicator, and PyNCCL wrapper to support these new operations. Tests have also been added to verify the correctness of the new implementations. The code appears to be well-structured and addresses the intended functionality. However, there are some areas where improvements can be made to enhance efficiency, error handling, and code clarity.
The assertion `assert dim == 0, "only dim 0 all-gatherv is supported"` raises an AssertionError whenever `dim` is not 0. It might be better to raise a `NotImplementedError` to indicate that other dimensions are not supported.
```diff
-        assert dim == 0, "only dim 0 all-gatherv is supported"
+        if dim != 0:
+            raise NotImplementedError("only dim 0 all-gatherv is supported")
         world_size = self.world_size
         pynccl_comm = self.pynccl_comm
         assert pynccl_comm is not None and not pynccl_comm.disabled
```
+1, better to raise rather than assert here
The condition sizes is not None is checked twice, once here and once in the all_gather function. Consider refactoring to avoid this redundancy and improve maintainability.
```diff
         if sizes is not None:
             assert output_tensor.shape[0] == sum(sizes)
             numel_base = int(np.prod(output_tensor.shape[1:]))
             split_offset = 0
             self.nccl.ncclGroupStart()
             for root, split_size in enumerate(sizes):
                 dst_slice = output_tensor[split_offset:split_offset +
                                           split_size]
                 self.nccl.ncclBroadcast(
                     buffer_type(input_tensor.data_ptr()),
                     buffer_type(dst_slice.data_ptr()),
                     split_size * numel_base,
                     ncclDataTypeEnum.from_torch(input_tensor.dtype),
                     root,
                     self.comm,
                     cudaStream_t(stream.cuda_stream),
                 )
                 split_offset += split_size
             self.nccl.ncclGroupEnd()
+            return  # early return to avoid duplicate code
```
The logic for reduce_scatter when sizes is not None uses ncclReduce in a loop. This might not be the most efficient way to perform a reduce-scatter with varying sizes; it could be worth investigating alternative approaches or optimized NCCL functions that handle this operation more efficiently. Also, it is not immediately obvious that the root argument of ncclReduce is used correctly, since it is simply the loop index from enumerate(sizes).
```python
        if sizes is not None:
            numel_base = int(np.prod(input_tensor.shape[1:]))
            split_offset = 0
            self.nccl.ncclGroupStart()
            for root, split_size in enumerate(sizes):
                chunk = input_tensor[split_offset:split_offset + split_size, ...]
                self.nccl.ncclReduce(
                    buffer_type(chunk.data_ptr()),
                    buffer_type(output_tensor.data_ptr()),
                    split_size * numel_base,
                    ncclDataTypeEnum.from_torch(input_tensor.dtype),
                    ncclRedOpTypeEnum.from_torch(op),
                    root,
                    self.comm,
                    cudaStream_t(stream.cuda_stream)
                )
                split_offset += split_size
            self.nccl.ncclGroupEnd()
```
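On the root question: chunk i of every rank's input is reduced onto rank i, so the loop index doubles as the NCCL root. A rough pure-PyTorch reference of the intended semantics (illustrative only, not the PR's code):

```python
import torch

def reduce_scatterv_reference(inputs: list[torch.Tensor],
                              sizes: list[int],
                              rank: int) -> torch.Tensor:
    """Reference semantics only: element-wise sum over all ranks' inputs
    (each of dim-0 length sum(sizes)), then rank i keeps chunk i."""
    summed = torch.stack(inputs).sum(dim=0)     # reduce across ranks
    chunks = torch.split(summed, sizes, dim=0)  # variable-sized split
    return chunks[rank]                         # NCCL root == chunk index
```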
QQ - how should I think about these kernels in comparison to PPLX and DeepEP?

These collectives are used by TRT-LLM for the "min latency mode" path. Do pplx/deepep target latency or max throughput? @robertgshaw2-redhat

Allgatherv will outperform the current naive_multicast used for MoE dispatch, and reducescatterv will outperform the current allreduce+slice combine. Being able to all-gather on a list of tensors will also allow us to quantize to fp4 before the all-gather.
tlrmchlsmth
left a comment
please run the pre-commit to fix the linter errors https://docs.vllm.ai/en/stable/contributing/#code-quality
+1, better to raise rather than assert here
This looks interesting -- How do ncclGroupStart() and ncclGroupEnd() work?
These functions allow you to batch multiple NCCL calls into a single launch.
You can read more about it here: https://docs.nvidia.com/deeplearning/nccl/user-guide/docs/usage/groups.html
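A minimal sketch of the grouping pattern, assuming the pynccl wrapper also exposes `ncclSend`/`ncclRecv` for point-to-point transfers (the helper below is hypothetical; it only shows that nothing launches until `ncclGroupEnd` returns):

```python
# Hypothetical helper: pair a send and a receive with the same peer so that
# neither side blocks waiting for the other. Import path assumed from vLLM's
# current layout; `nccl`, `comm`, and `stream` are the same wrapper objects
# used elsewhere in this PR.
from vllm.distributed.device_communicators.pynccl_wrapper import (
    buffer_type, cudaStream_t, ncclDataTypeEnum)

def grouped_send_recv(nccl, comm, stream, send_buf, recv_buf, peer):
    nccl.ncclGroupStart()            # calls below are recorded, not launched
    nccl.ncclSend(buffer_type(send_buf.data_ptr()), send_buf.numel(),
                  ncclDataTypeEnum.from_torch(send_buf.dtype), peer, comm,
                  cudaStream_t(stream.cuda_stream))
    nccl.ncclRecv(buffer_type(recv_buf.data_ptr()), recv_buf.numel(),
                  ncclDataTypeEnum.from_torch(recv_buf.dtype), peer, comm,
                  cudaStream_t(stream.cuda_stream))
    nccl.ncclGroupEnd()              # both operations launch as one fused call
```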
Signed-off-by: Trevor Morris <[email protected]>
One last comment - I think it would be cleaner and clearer if the all_gather and all_gatherv implementations were completely separate. Right now it's slightly awkward that all_gatherv calls pynccl all_gather with a list of sizes. Ditto for reduce_scatter/reduce_scatterv.
Otherwise looks good to me, thanks!
Thanks, done!
@tlrmchlsmth Could you take a look?
Signed-off-by: Trevor Morris <[email protected]>
Signed-off-by: Trevor Morris <[email protected]>
Signed-off-by: Trevor Morris <[email protected]>
Signed-off-by: Trevor Morris <[email protected]>
Signed-off-by: Trevor Morris <[email protected]>
Signed-off-by: Trevor Morris <[email protected]>
```python
                   dim: int) -> torch.Tensor:
        return self.device_communicator.all_gather(input_, dim)

    def all_gatherv(self,
```
The self.use_custom_call_op in this file is set in a way that the newly added ag and rs features cannot be enabled simply by toggling an environment variable.
mgoin
left a comment
Looks reasonable to me, just one nit
tlrmchlsmth
left a comment
LGTM, thank you!
Signed-off-by: Trevor Morris <[email protected]>
@trevor-m It seems there is an issue with your tests on the CI, PTAL: https://buildkite.com/vllm/ci/builds/23741/steps/canvas?sid=0197faaf-9704-4b36-a45d-24076ea18d02#0197faaf-97ff-4e37-b73c-8d7912f280a9/6-10388

EDIT: I pushed a fix since it was just the func rename.
Signed-off-by: mgoin <[email protected]>
Thank you @mgoin!
Signed-off-by: Trevor Morris <[email protected]> Signed-off-by: mgoin <[email protected]> Co-authored-by: mgoin <[email protected]> Signed-off-by: x22x22 <[email protected]>
Signed-off-by: Trevor Morris <[email protected]> Signed-off-by: mgoin <[email protected]> Co-authored-by: mgoin <[email protected]>
Signed-off-by: Trevor Morris <[email protected]> Signed-off-by: mgoin <[email protected]> Co-authored-by: mgoin <[email protected]>
Signed-off-by: Trevor Morris <[email protected]> Signed-off-by: mgoin <[email protected]> Co-authored-by: mgoin <[email protected]> Signed-off-by: Jinzhen Lin <[email protected]>
Signed-off-by: Trevor Morris <[email protected]> Signed-off-by: mgoin <[email protected]> Co-authored-by: mgoin <[email protected]> Signed-off-by: Paul Pak <[email protected]>
Signed-off-by: Trevor Morris <[email protected]> Signed-off-by: mgoin <[email protected]> Co-authored-by: mgoin <[email protected]> Signed-off-by: Diego-Castan <[email protected]>
Signed-off-by: Trevor Morris <[email protected]> Signed-off-by: mgoin <[email protected]> Co-authored-by: mgoin <[email protected]>
Essential Elements of an Effective PR Description Checklist
supported_models.md and examples for a new model.

Purpose
Adds all-gatherv, which supports varying sizes per rank and a list of input tensors.
Adds reduce-scatterv, which supports varying sizes per rank.
These new collectives can be used for MoE dispatch/combine for DP without padding. See #20037 for usage.
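A hypothetical dispatch/combine sketch showing how the variable sizes would be used; the method names and signatures here follow this description, not necessarily the final vLLM call sites (see #20037 for the real usage):

```python
import torch

def moe_dispatch_combine(pynccl_comm, rank: int, hidden: int,
                         tokens_per_rank: list[int]) -> torch.Tensor:
    # Each DP rank has a different number of routed tokens, so no padding to
    # max(tokens_per_rank) is needed with the variable-sized collectives.
    local_tokens = torch.randn(tokens_per_rank[rank], hidden, device="cuda")

    # Dispatch: gather every rank's tokens, concatenated along dim 0.
    gathered = pynccl_comm.all_gatherv(local_tokens, dim=0,
                                       sizes=tokens_per_rank)

    # ... run the local experts on `gathered` here ...
    expert_out = gathered

    # Combine: sum expert outputs across ranks, then keep this rank's slice.
    combined = pynccl_comm.reduce_scatterv(expert_out, dim=0,
                                           sizes=tokens_per_rank)
    return combined  # shape: (tokens_per_rank[rank], hidden)
```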
Test Plan
Added tests to test_pynccl.py
Test Result
(Optional) Documentation Update