Merged
1 change: 1 addition & 0 deletions .buildkite/test-pipeline.yaml
@@ -1037,3 +1037,4 @@ steps:
num_gpus: 2
commands:
- pytest -v -s tests/distributed/test_context_parallel.py
- pytest -v -s tests/distributed/test_nccl_symm_mem_allreduce.py
26 changes: 24 additions & 2 deletions benchmarks/kernels/benchmark_device_communicators.py
@@ -7,6 +7,10 @@
CustomAllreduce (oneshot, twoshot), PyNcclCommunicator,
and SymmMemCommunicator (multimem, two-shot).

For NCCL symmetric memory you need to set the environment variables
NCCL_NVLS_ENABLE=1 NCCL_CUMEM_ENABLE=1 VLLM_USE_NCCL_SYMM_MEM=1; otherwise NCCL
does not use the fast NVLS implementation for all-reduce.

Usage:
torchrun --nproc_per_node=<N> benchmark_device_communicators.py [options]
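For reference, a minimal Python sketch of setting these variables before any NCCL
communicator is created (illustration only; in practice they are usually exported in
the launch environment, as the test in this PR does via monkeypatch.setenv):

import os

# Must be set before the first NCCL communicator is initialized.
os.environ["NCCL_NVLS_ENABLE"] = "1"
os.environ["NCCL_CUMEM_ENABLE"] = "1"
os.environ["VLLM_USE_NCCL_SYMM_MEM"] = "1"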

@@ -26,7 +30,13 @@
from torch.distributed import ProcessGroup

from vllm.distributed.device_communicators.custom_all_reduce import CustomAllreduce
from vllm.distributed.device_communicators.pynccl import PyNcclCommunicator
from vllm.distributed.device_communicators.pynccl import (
    PyNcclCommunicator,
    register_nccl_symmetric_ops,
)
from vllm.distributed.device_communicators.pynccl_allocator import (
    set_graph_pool_id,
)
from vllm.distributed.device_communicators.symm_mem import SymmMemCommunicator
from vllm.logger import init_logger
from vllm.utils import FlexibleArgumentParser
@@ -98,6 +108,7 @@ def _init_communicators(self):
        )
        if not self.pynccl_comm.disabled:
            logger.info("Rank %s: PyNcclCommunicator initialized", self.rank)
            register_nccl_symmetric_ops(self.pynccl_comm)
        else:
            logger.info("Rank %s: PyNcclCommunicator disabled", self.rank)
            self.pynccl_comm = None
@@ -194,6 +205,15 @@ def benchmark_allreduce(
                    None,  # no env variable needed
                )
            )
            communicators.append(
                (
                    "pynccl-symm",
                    lambda t: torch.ops.vllm.all_reduce_symmetric_with_copy(t),
                    lambda t: True,  # Always available if initialized
                    nullcontext(),
                    None,  # no env variable needed
                )
            )

if self.symm_mem_comm_multimem is not None:
comm = self.symm_mem_comm_multimem
@@ -271,7 +291,9 @@ def benchmark_allreduce_single(
# Capture the graph using context manager
with context:
graph = torch.cuda.CUDAGraph()
with torch.cuda.graph(graph):
graph_pool = torch.cuda.graph_pool_handle()
set_graph_pool_id(graph_pool)
with torch.cuda.graph(graph, pool=graph_pool):
for _ in range(CUDA_GRAPH_CAPTURE_CYCLES):
allreduce_fn(graph_input)

94 changes: 94 additions & 0 deletions tests/distributed/test_nccl_symm_mem_allreduce.py
Member:
This test is not actually run in CI as we need to add it to a job in .buildkite/test-pipeline.yaml

It doesn't seem to work on my L40S or H100 node, so do we need to restrict it to some compute capability or library availability? We can enable this in a follow-up PR if this is complex.

@Amir-19 (Contributor Author), Sep 22, 2025:
Does it fail on your L40S and H100 nodes, or is it skipped?

Member:
It fails to compile and then fails

WARNING 09-22 17:26:06 [pynccl_allocator.py:95] Failed to compile NCCL memory allocator. Symmetric memory will be disabled. This is expected if NCCL headers are not available. optionally set VLLM_NCCL_INCLUDE_PATH to point to a directory containing the NCCL header. Error: Error building extension 'nccl_allocator': [1/2] c++ -MMD -MF main.o.d -DTORCH_EXTENSION_NAME=nccl_allocator -DTORCH_API_INCLUDE_EXTENSION_H -DPYBIND11_COMPILER_TYPE=\"_gcc\" -DPYBIND11_STDLIB=\"_libstdcpp\" -DPYBIND11_BUILD_ABI=\"_cxxabi1018\" -I/home/mgoin/venvs/vllm/lib/python3.12/site-packages/nvidia/nccl/include -isystem /home/mgoin/venvs/vllm/lib/python3.12/site-packages/torch/include -isystem /home/mgoin/venvs/vllm/lib/python3.12/site-packages/torch/include/torch/csrc/api/include -isystem /usr/local/cuda-12.5/include -isystem /home/mgoin/.local/share/uv/python/cpython-3.12.10-linux-x86_64-gnu/include/python3.12 -fPIC -std=c++17 -c /tmp/main.cpp -o main.o 
WARNING 09-22 17:26:06 [pynccl_allocator.py:95] [2/2] c++ main.o -shared -lnccl -L/home/mgoin/venvs/vllm/lib/python3.12/site-packages/torch/lib -lc10 -lc10_cuda -ltorch_cpu -ltorch_cuda -ltorch -ltorch_python -L/usr/local/cuda-12.5/lib64 -lcudart -o nccl_allocator.so
WARNING 09-22 17:26:06 [pynccl_allocator.py:95] FAILED: [code=1] nccl_allocator.so 
WARNING 09-22 17:26:06 [pynccl_allocator.py:95] c++ main.o -shared -lnccl -L/home/mgoin/venvs/vllm/lib/python3.12/site-packages/torch/lib -lc10 -lc10_cuda -ltorch_cpu -ltorch_cuda -ltorch -ltorch_python -L/usr/local/cuda-12.5/lib64 -lcudart -o nccl_allocator.so
WARNING 09-22 17:26:06 [pynccl_allocator.py:95] /usr/bin/ld: cannot find -lnccl: No such file or directory
WARNING 09-22 17:26:06 [pynccl_allocator.py:95] collect2: error: ld returned 1 exit status
WARNING 09-22 17:26:06 [pynccl_allocator.py:95] ninja: build stopped: subcommand failed.
WARNING 09-22 17:26:06 [pynccl_allocator.py:95] 
WARNING 09-22 17:26:06 [pynccl_allocator.py:95] Failed to compile NCCL memory allocator. Symmetric memory will be disabled. This is expected if NCCL headers are not available. optionally set VLLM_NCCL_INCLUDE_PATH to point to a directory containing the NCCL header. Error: /tmp/nccl_allocator.so: cannot open shared object file: No such file or directory
-------------------------------------------------------------------------------------------------------------------------------------------------------------- Captured stderr call ---------------------------------------------------------------------------------------------------------------------------------------------------------------
/home/mgoin/venvs/vllm/lib/python3.12/site-packages/torch/cuda/__init__.py:63: FutureWarning: The pynvml package is deprecated. Please install nvidia-ml-py instead. If you did not install pynvml directly, please report this to the maintainers of the package that installed pynvml for you.
  import pynvml  # type: ignore[import]
/home/mgoin/venvs/vllm/lib/python3.12/site-packages/torch/cuda/__init__.py:63: FutureWarning: The pynvml package is deprecated. Please install nvidia-ml-py instead. If you did not install pynvml directly, please report this to the maintainers of the package that installed pynvml for you.
  import pynvml  # type: ignore[import]
[rank1]:W0922 17:25:56.180000 2714803 torch/utils/cpp_extension.py:2425] TORCH_CUDA_ARCH_LIST is not set, all archs for visible cards are included for compilation. 
[rank1]:W0922 17:25:56.180000 2714803 torch/utils/cpp_extension.py:2425] If this is not desired, please set os.environ['TORCH_CUDA_ARCH_LIST'] to specific architectures.
[rank0]:[W922 17:26:06.579409377 ProcessGroupNCCL.cpp:1538] Warning: WARNING: destroy_process_group() was not called before program exit, which can leak resources. For more info, please see https://pytorch.org/docs/stable/distributed.html#shutdown (function operator())
W0922 17:26:07.139000 2714610 torch/multiprocessing/spawn.py:169] Terminating process 2714802 via signal SIGTERM
================================================================================================================================================================ warnings summary =================================================================================================================================================================
../../venvs/vllm/lib/python3.12/site-packages/torch/cuda/__init__.py:63
  /home/mgoin/venvs/vllm/lib/python3.12/site-packages/torch/cuda/__init__.py:63: FutureWarning: The pynvml package is deprecated. Please install nvidia-ml-py instead. If you did not install pynvml directly, please report this to the maintainers of the package that installed pynvml for you.
    import pynvml  # type: ignore[import]

-- Docs: https://docs.pytest.org/en/stable/how-to/capture-warnings.html
============================================================================================================================================================= short test summary info =============================================================================================================================================================
FAILED tests/distributed/test_nccl_symm_mem_allreduce.py::test_nccl_symm_mem_allreduce[2] - torch.multiprocessing.spawn.ProcessRaisedException: 
========================================================================================================================================================== 1 failed, 1 warning in 17.51s ==========================================================================================================================================================
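One possible shape for the gating follow-up discussed above (a hypothetical sketch, not part of this PR; it assumes get_nccl_mem_pool() is safe to call in the parent process before workers are spawned):

import pytest

from vllm.distributed.device_communicators import pynccl_allocator

# Hypothetical: skip at collection time so a missing NCCL dev setup surfaces as
# SKIPPED in the parent process instead of a spawn failure inside the workers.
pytestmark = pytest.mark.skipif(
    pynccl_allocator.get_nccl_mem_pool() is None,
    reason="NCCL symmetric memory allocator unavailable (e.g. missing NCCL headers)",
)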

@@ -0,0 +1,94 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project

import random
import typing

import pytest
import torch
import torch.distributed as dist
import torch.multiprocessing as mp

import vllm.envs as envs
from vllm.distributed import cleanup_dist_env_and_memory
from vllm.distributed.device_communicators.cuda_communicator import (
    CudaCommunicator)
from vllm.distributed.device_communicators.pynccl import (
    register_nccl_symmetric_ops)
from vllm.distributed.device_communicators.pynccl_allocator import (
    get_nccl_mem_pool, is_symmetric_memory_enabled)
from vllm.distributed.parallel_state import (get_tp_group,
                                             init_distributed_environment,
                                             initialize_model_parallel)
from vllm.platforms import current_platform
from vllm.utils import update_environment_variables

torch.manual_seed(42)
random.seed(44)

test_size_elements = 4 * 1024 * 1024


def nccl_symm_mem_allreduce_worker(local_rank: int, world_size: int):
    monkeypatch = pytest.MonkeyPatch()
    with monkeypatch.context() as m:
        m.delenv("CUDA_VISIBLE_DEVICES", raising=False)
        dtype = torch.bfloat16
        device = torch.device(f"cuda:{local_rank}")
        torch.cuda.set_device(device)
        torch.set_default_device(device)
        torch.set_default_dtype(dtype)
        update_environment_variables({
            "RANK": str(local_rank),
            "LOCAL_RANK": str(local_rank),
            "WORLD_SIZE": str(world_size),
            "MASTER_ADDR": "localhost",
            "MASTER_PORT": "12345",
        })

        init_distributed_environment()
        initialize_model_parallel(tensor_model_parallel_size=world_size)

        cuda_communicator = typing.cast(CudaCommunicator,
                                        get_tp_group().device_communicator)
        pynccl_comm = cuda_communicator.pynccl_comm
        if get_nccl_mem_pool() is None:
            pytest.skip("NCCL allocator compilation failed "
                        "(probably missing NCCL headers).")
        if not is_symmetric_memory_enabled():
            pytest.skip("NCCL symmetric memory allreduce is disabled.")

        register_nccl_symmetric_ops(pynccl_comm)
        input = torch.randint(1,
                              23, (test_size_elements, ),
                              dtype=dtype,
                              device=device)
        input_clone = input.clone()
        output = torch.ops.vllm.all_reduce_symmetric_with_copy(input)
        assert output is not None

        group = get_tp_group().device_group
        dist.all_reduce(input_clone, group=group)
        torch.testing.assert_close(output, input_clone, atol=2.5, rtol=0.1)


@pytest.mark.skipif(
    not current_platform.is_cuda(),
    reason="NCCLSymmMemAllreduce is only available for CUDA platforms.",
)
@pytest.mark.parametrize("world_size", [2])
@pytest.mark.skipif(envs.VLLM_TARGET_DEVICE not in ["cuda"],
                    reason="Only test on CUDA")
def test_nccl_symm_mem_allreduce(monkeypatch: pytest.MonkeyPatch, world_size):
    if world_size > torch.cuda.device_count():
        pytest.skip("Not enough GPUs to run the test.")

    # Enable SymmMemCommunicator
    monkeypatch.setenv("VLLM_USE_NCCL_SYMM_MEM", "1")
    monkeypatch.setenv("NCCL_NVLS_ENABLE", "1")
    monkeypatch.setenv("NCCL_CUMEM_ENABLE", "1")

    mp.spawn(nccl_symm_mem_allreduce_worker,
             args=(world_size, ),
             nprocs=world_size)
    cleanup_dist_env_and_memory()
6 changes: 6 additions & 0 deletions vllm/compilation/cuda_graph.py
@@ -12,6 +12,8 @@
from vllm.compilation.counter import compilation_counter
from vllm.compilation.monitor import validate_cudagraph_capturing_enabled
from vllm.config import CUDAGraphMode, VllmConfig
from vllm.distributed.device_communicators.pynccl_allocator import (
    set_graph_pool_id)
from vllm.forward_context import BatchDescriptor, get_forward_context
from vllm.logger import init_logger
from vllm.platforms import current_platform
@@ -154,6 +156,10 @@ def __call__(self, *args, **kwargs):
stack.enter_context(
patch("torch.cuda.empty_cache", lambda: None))

if self.graph_pool is not None:
set_graph_pool_id(self.graph_pool)
else:
set_graph_pool_id(current_platform.graph_pool_handle())
# mind-exploding: carefully manage the reference and memory.
with torch.cuda.graph(cudagraph, pool=self.graph_pool):
# `output` is managed by pytorch's cudagraph pool
27 changes: 26 additions & 1 deletion vllm/distributed/device_communicators/all_reduce_utils.py
@@ -10,8 +10,9 @@
import tempfile
from collections.abc import Sequence
from itertools import product
from typing import Optional
from typing import Any, Optional

import torch
import torch.distributed as dist
import torch.multiprocessing as mp

@@ -56,6 +57,30 @@
}
}

NCCL_SYMM_MEM_ALL_REDUCE_CONFIG: dict[str, Any] = {
    "min_world_size": 4,
    "thresholds": {
        4: 2 * MiB,  # 2 MiB
        8: 1 * MiB,  # 1 MiB
    },
    "always_use_above_world_size": 8  # Always use symm mem for world_size > 8
}


def should_nccl_symm_mem_allreduce(world_size: int,
                                   input_tensor: torch.Tensor) -> bool:
    from vllm.distributed.device_communicators.pynccl_allocator import (
        is_symmetric_memory_enabled)
    if not is_symmetric_memory_enabled():
        return False
    if world_size < NCCL_SYMM_MEM_ALL_REDUCE_CONFIG["min_world_size"]:
        return False
    threshold = NCCL_SYMM_MEM_ALL_REDUCE_CONFIG["thresholds"].get(world_size)
    if threshold is not None and input_tensor.nbytes >= threshold:
        return True
    return (world_size
            > NCCL_SYMM_MEM_ALL_REDUCE_CONFIG["always_use_above_world_size"])
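To make the dispatch rule above concrete, a standalone sketch that mirrors the constants in NCCL_SYMM_MEM_ALL_REDUCE_CONFIG (illustration only, nothing here is imported from vLLM):

MiB = 1024 * 1024


def use_symm_mem(world_size: int, nbytes: int) -> bool:
    # Mirrors should_nccl_symm_mem_allreduce, assuming symmetric memory is enabled.
    thresholds = {4: 2 * MiB, 8: 1 * MiB}
    if world_size < 4:
        return False
    threshold = thresholds.get(world_size)
    if threshold is not None and nbytes >= threshold:
        return True
    return world_size > 8


assert use_symm_mem(2, 4 * MiB) is False    # below min_world_size
assert use_symm_mem(4, 1 * MiB) is False    # under the 2 MiB threshold for TP=4
assert use_symm_mem(8, 1 * MiB) is True     # meets the 1 MiB threshold for TP=8
assert use_symm_mem(16, 64 * 1024) is True  # world_size > 8: always use symm mem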


def producer(batch_src: Sequence[int],
producer_queue,
15 changes: 15 additions & 0 deletions vllm/distributed/device_communicators/cuda_communicator.py
@@ -7,6 +7,12 @@
from torch.distributed import ProcessGroup

import vllm.envs as envs
from vllm.distributed.device_communicators.all_reduce_utils import (
    should_nccl_symm_mem_allreduce)
from vllm.distributed.device_communicators.pynccl import (
    register_nccl_symmetric_ops)
from vllm.distributed.device_communicators.pynccl_allocator import (
    is_symmetric_memory_enabled)
from vllm.logger import init_logger
from vllm.platforms import current_platform

@@ -53,6 +59,8 @@ def __init__(self,
                group=self.cpu_group,
                device=self.device,
            )
            if is_symmetric_memory_enabled():
                register_nccl_symmetric_ops(self.pynccl_comm)

self.ca_comm: Optional[CustomAllreduce] = None
self.qr_comm: Optional[QuickAllReduce] = None
@@ -107,6 +115,13 @@ def __init__(self,
raise ValueError(f"Unknown all2all backend: {all2all_backend}")

    def all_reduce(self, input_):
        # Since we currently copy input -> symm_input, run an out-of-place
        # all-reduce, and return symm_output, we don't need to check whether
        # the input itself is symmetric.
        if self.pynccl_comm is not None and \
                should_nccl_symm_mem_allreduce(self.pynccl_comm.world_size,
                                               input_):
            out = torch.ops.vllm.all_reduce_symmetric_with_copy(input_)
            if out is not None:
                return out
Member (on lines +122 to +124):
Shouldn't we assert that out is not None? When/why would we want to fall through to another method?

@Amir-19 (Contributor Author):
The logic for the regular NCCL all-reduce is:

out = pynccl_comm.all_reduce(input_)
if out is None:
  # fall back to the default all-reduce using PyTorch.

and with NCCL symmetric memory we are still calling pynccl_comm.all_reduce(input_), so the same fallback path applies.

# always try quick reduce first, then custom allreduce,
# and then pynccl. (quick reduce just for ROCM MI3*)
qr_comm = self.qr_comm
53 changes: 52 additions & 1 deletion vllm/distributed/device_communicators/pynccl.py
@@ -17,6 +17,39 @@

logger = init_logger(__name__)

_NCCL_SYMM_OPS_REGISTERED = False


def register_nccl_symmetric_ops(pynccl_comm):
    from vllm.distributed.device_communicators.pynccl_allocator import (
        nccl_symm_mem_context)
    from vllm.utils import direct_register_custom_op

    global _NCCL_SYMM_OPS_REGISTERED
    if _NCCL_SYMM_OPS_REGISTERED:
        return
    _NCCL_SYMM_OPS_REGISTERED = True

    def all_reduce_symmetric_with_copy_impl(
            input_tensor: torch.Tensor) -> torch.Tensor:
        with nccl_symm_mem_context(pynccl_comm):
            symm_input = torch.empty_like(input_tensor)
            symm_output = torch.empty_like(input_tensor)
        symm_input.copy_(input_tensor)
        symm_output = pynccl_comm.all_reduce(symm_input, symm_output)
        return symm_output

    def all_reduce_symmetric_with_copy_fake(
            input_tensor: torch.Tensor) -> torch.Tensor:
        return torch.empty_like(input_tensor)

    direct_register_custom_op(
        op_name="all_reduce_symmetric_with_copy",
        op_func=all_reduce_symmetric_with_copy_impl,
        mutates_args=[],
        fake_impl=all_reduce_symmetric_with_copy_fake,
    )
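A usage sketch of the registered op, mirroring what the benchmark and test in this PR do (assumes a tensor-parallel group is already initialized and pynccl_comm is its PyNcclCommunicator):

register_nccl_symmetric_ops(pynccl_comm)
x = torch.randn(4 * 1024 * 1024, dtype=torch.bfloat16, device="cuda")
# Copies x into NCCL symmetric memory, runs the out-of-place all-reduce,
# and returns the symmetric output buffer.
reduced = torch.ops.vllm.all_reduce_symmetric_with_copy(x)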


class PyNcclCommunicator:

@@ -67,6 +100,7 @@ def __init__(
self.available = True
self.disabled = False

self.nccl_version = self.nccl.ncclGetRawVersion()
logger.info("vLLM is using nccl==%s", self.nccl.ncclGetVersion())

if self.rank == 0:
@@ -109,6 +143,7 @@ def __init__(

def all_reduce(self,
in_tensor: torch.Tensor,
out_tensor: torch.Tensor = None,
op: ReduceOp = ReduceOp.SUM,
stream=None) -> torch.Tensor:
if self.disabled:
@@ -120,7 +155,8 @@ def all_reduce(self,
f"this nccl communicator is created to work on {self.device}, "
f"but the input tensor is on {in_tensor.device}")

out_tensor = torch.empty_like(in_tensor)
if out_tensor is None:
out_tensor = torch.empty_like(in_tensor)

if stream is None:
stream = current_stream()
@@ -288,3 +324,18 @@ def group_start(self):

def group_end(self):
self.nccl.ncclGroupEnd()

    def register_comm_window(self, tensor: torch.Tensor):
        return self.nccl.ncclCommWindowRegister(
            self.comm,
            buffer_type(tensor.data_ptr()),
            tensor.numel() * tensor.element_size(),
            1,
        )

    def register_comm_window_raw(self, ptr: int, size: int):
        return self.nccl.ncclCommWindowRegister(self.comm, buffer_type(ptr),
                                                size, 1)

    def deregister_comm_window(self, window):
        return self.nccl.ncclCommWindowDeregister(self.comm, window)
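A hypothetical lifecycle sketch for these helpers (the names comm and buf are assumptions for illustration; NCCL only accepts window registration for suitably allocated buffers, e.g. ones coming from the NCCL memory pool used by nccl_symm_mem_context):

buf = torch.empty(1 << 20, dtype=torch.uint8, device="cuda")
window = comm.register_comm_window(buf)  # wraps ncclCommWindowRegister
# ... buf can now back symmetric-memory collectives on this communicator ...
comm.deregister_comm_window(window)      # wraps ncclCommWindowDeregister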