[parallel state Refactor 3/n] Unify communicator interface #21098

Open: DarkSharpness wants to merge 6 commits into sgl-project:main from DarkSharpness:refactor_parallel_state

Conversation

@DarkSharpness (Collaborator)

Motivation

#20866, #20871

Modifications

This PR refactors the distributed communication stack in srt to make communicator behavior explicit, composable, and easier to extend across backends.

The main change is introducing a unified BaseCommunicator abstraction and moving backend selection logic out of the large ad hoc branches in parallel_state.py into a reusable CommunicatorImpl dispatcher. This keeps the platform-specific implementations local to each communicator while preserving the existing runtime behavior at the group level.

Previously, communicator behavior was spread across backend-specific helpers and parallel_state.py, with different backends exposing slightly different methods such as custom_all_reduce, backend-specific should_* predicates, or implicit in-place assumptions.

With BaseCommunicator, each backend now exposes the same public contract:

  • can_all_reduce() reports whether a communicator can handle the input, and if so whether it supports in-place, out-of-place, or both modes.
  • all_reduce(..., inplace=...) is the single all-reduce entry point for all backends.
  • should_use_custom_op() tells the dispatcher whether the communicator should run through registered custom ops for torch.compile / piecewise-cuda-graph-friendly execution.
  • graph_capture_context() lets each communicator define any graph-capture-specific setup in one place.

This makes communicator selection data-driven instead of backend-name-driven, and removes a large amount of backend-specific branching from the group coordinator.

Accuracy Tests

Benchmarking and Profiling

Checklist

Review Process

  1. Ping Merge Oncalls to start the PR flow. See the PR Merge Process.
  2. Get approvals from CODEOWNERS and other reviewers.
  3. Trigger CI tests with comments or contact authorized users to do so.
    • /tag-run-ci-label, /rerun-failed-ci, /tag-and-rerun-ci
  4. After green CI and required approvals, ask Merge Oncalls to merge.

@gemini-code-assist (Contributor)

Summary of Changes

Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request introduces a significant refactoring of the distributed communication stack to enhance modularity, extensibility, and maintainability. By establishing a unified BaseCommunicator interface and a CommunicatorImpl dispatcher, it abstracts away backend-specific details, allowing for more consistent and data-driven selection of communication methods. This change simplifies the core parallel_state logic, making it easier to integrate new communication backends and manage existing ones, ultimately improving the overall robustness and flexibility of the distributed system.

Highlights

  • Unified Communicator Interface: Introduced a new BaseCommunicator abstract class and an AllReduceMode enum to define a standardized contract for distributed communication operations. This centralizes common logic and makes backend-specific implementations more explicit and consistent.
  • Centralized Dispatching Logic: Implemented a CommunicatorImpl class that acts as a dispatcher, selecting the appropriate backend communicator based on the operation and tensor properties. This moves complex selection logic out of parallel_state.py and into a reusable, data-driven component.
  • Refactored Existing Communicators: Updated CustomAllreduce, CustomAllReduceV2, HpuCommunicator, PyMscclppCommunicator, PyNcclCommunicator, QuickAllReduce, and TorchSymmMemCommunicator to inherit from BaseCommunicator and conform to its new interface, including can_all_reduce and a unified all_reduce method.
  • New Communicator Implementations: Added TorchDefaultCommunicator for standard PyTorch distributed operations, AiterCustomAllReduce as a wrapper for Aiter's custom all-reduce, and PyNcclSymmMemCommunicator for NCCL with symmetric memory allocation.
  • Simplified Parallel State Management: The GroupCoordinator in parallel_state.py was significantly refactored to leverage the new CommunicatorImpl, reducing its complexity and making it easier to manage various communication backends. Direct calls to specific communicators were replaced with calls to the unified self.impl interface.
  • Removed Deprecated Components: The NpuCommunicator and all_reduce_utils.py files were removed, streamlining the codebase and consolidating functionality within the new architecture.



@gemini-code-assist (Contributor) left a comment


Code Review

This pull request introduces a significant and well-executed refactoring of the distributed communication stack. The new BaseCommunicator abstraction and CommunicatorImpl dispatcher create a unified, data-driven, and more extensible system for handling different communication backends. The changes are clean, modular, and align perfectly with the stated goals. I have one suggestion regarding a potential issue with torch.compile compatibility in the new custom op registration.

@DarkSharpness (Collaborator, Author)

/gemini review

@gemini-code-assist (Contributor)

Warning

You have reached your daily quota limit. Please wait up to 24 hours and I will start processing your requests again!

@DarkSharpness (Collaborator, Author)

DarkSharpness commented Apr 3, 2026

/tag-and-rerun-ci try again

@github-actions github-actions bot added the run-ci label Apr 3, 2026
_GROUPS: Dict[str, Callable[[], Optional[CommunicatorImpl]]] = {}


def _register_group(group: CommunicatorImpl) -> None:
Collaborator:

I want to know why we use weakref here?

if mode is not None and _is_mode_supported(mode, inplace):
    if not comm.should_use_custom_op():
        return comm.all_reduce(input_, inplace=inplace)
    use_inplace = _can_use_inplace(mode) if inplace is None else inplace
Collaborator:

nits: I’m wondering whether users can tell that we’re using inplace operations here, and whether there’s a risk of it being misused.

Collaborator (Author):

If inplace is specified (not None), it must be strictly followed. Otherwise, it depends on the backend (it will choose the fastest possible implementation).
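The semantics described in this reply reduce to a small resolution rule, sketched below with hypothetical names: an explicit inplace flag is honored strictly, while inplace=None defers to whatever the backend reports as fastest.

```python
# Hedged sketch of the inplace-resolution rule described above.
def resolve_inplace(backend_prefers_inplace: bool, inplace=None) -> bool:
    if inplace is not None:
        return inplace  # caller's explicit choice is strictly followed
    return backend_prefers_inplace  # backend picks its fastest mode
```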

@BBuf (Collaborator) left a comment

This is a large change, so before merging we should make sure all CI checks pass.
