2 changes: 1 addition & 1 deletion .github/workflows/clang-format.yml
@@ -18,7 +18,7 @@ jobs:
FILE_PATTERN='\.(cpp|h|cc|c|cxx|hpp|cu|cuh)$'

echo "### Modified C/C++ files:"
-FILES=$(git diff --name-only HEAD^1 HEAD | grep -E "$FILE_PATTERN") || true
+FILES=$(git diff --name-only HEAD^1 HEAD -- . ':(exclude)examples/device/ep' | grep -E "$FILE_PATTERN") || true
[ -z "$FILES" ] && echo "(none)" || echo "$FILES"

echo "### clang format errors:"
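The combined pathspec exclude and `grep -E` filter can be sanity-checked offline. The following Python sketch mirrors the selection logic; the helper name and the sample file list are hypothetical:

```python
import re

# Mirrors the workflow's FILE_PATTERN grep plus the ':(exclude)examples/device/ep'
# pathspec: keep C/C++ sources, drop anything under the excluded tree.
FILE_PATTERN = re.compile(r'\.(cpp|h|cc|c|cxx|hpp|cu|cuh)$')
EXCLUDE_PREFIX = 'examples/device/ep/'

def files_to_format(changed_files):
    return [f for f in changed_files
            if FILE_PATTERN.search(f) and not f.startswith(EXCLUDE_PREFIX)]

changed = ['src/agent.cpp', 'docs/README.md',
           'examples/device/ep/csrc/config.hpp', 'include/nixl.h']
print(files_to_format(changed))  # → ['src/agent.cpp', 'include/nixl.h']
```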
6 changes: 3 additions & 3 deletions .github/workflows/copyright-check.sh
Expand Up @@ -21,7 +21,7 @@ for f in $(git ls-files); do
*.png|*.jpg|*.jpeg|*.gif|*.ico|*.zip|*.rst|*.pyc|*.lock|*.md|*.svg|*.wrap|*.in|*.json|*.template|*.gitignore|*.python-version|*py.typed)
continue
;;
-CODEOWNERS|LICENSE|Doxyfile|.clang-format|.clang-tidy|.codespellrc)
+CODEOWNERS|*LICENSE*|Doxyfile|.clang-format|.clang-tidy|.codespellrc)
continue
;;
esac
@@ -39,7 +39,7 @@ for f in $(git ls-files); do

# Extract copyright years (handles YYYY or YYYY-YYYY)
copyright_years=$(echo "$header" | \
-grep -Eo 'Copyright \(c\) [0-9]{4}(-[0-9]{4})?' | \
+grep NVIDIA | grep -Eo 'Copyright \(c\) [0-9]{4}(-[0-9]{4})?' | \
sed -E 's/.* ([0-9]{4})(-[0-9]{4})?/\1\2/' || true)

if [[ -z "$copyright_years" ]]; then
@@ -57,7 +57,7 @@ for f in $(git ls-files); do
fi

# License line must exist
-if ! echo "$header" | grep -Eq '^[[:space:]]*(#|//|\*|/\*|<!--)[[:space:]]*SPDX-License-Identifier:[[:space:]]*Apache-2\.0'; then
+if ! echo "$header" | grep -Eq '^[[:space:]]*(#|//|\*|/\*|<!--)[[:space:]]*SPDX-License-Identifier:.*Apache-2\.0'; then
failures+=("$f (missing license)")
continue
fi
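The changed pipeline (`grep NVIDIA | grep -Eo ... | sed`) now only considers NVIDIA copyright lines when extracting years. A rough Python equivalent, with a hypothetical helper name and sample header:

```python
import re

def extract_copyright_years(header):
    """Keep only NVIDIA lines, then pull 'YYYY' or 'YYYY-YYYY' out of
    'Copyright (c) ...', mirroring the script's grep/sed pipeline."""
    years = []
    for line in header.splitlines():
        if 'NVIDIA' not in line:  # `grep NVIDIA`
            continue
        m = re.search(r'Copyright \(c\) ([0-9]{4}(-[0-9]{4})?)', line)
        if m:
            years.append(m.group(1))
    return years

header = ("# SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION\n"
          "# SPDX-FileCopyrightText: Copyright (c) 2025 DeepSeek\n")
print(extract_copyright_years(header))  # → ['2024-2025']
```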
1 change: 1 addition & 0 deletions .pre-commit-config.yaml
@@ -32,6 +32,7 @@ repos:
hooks:
- id: black
types_or: [python, cython]
exclude: ^examples/device/ep/

- repo: https://github.com/PyCQA/flake8
rev: 7.1.2
4 changes: 4 additions & 0 deletions CONTRIBUTING.md
@@ -167,6 +167,10 @@ All code must adhere to the style guide in `docs/CodeStyle.md` and be formatted

## Contributing Process

Contributions to the code under `./examples/device/ep` (which is derived
from DeepEP and licensed under the MIT License) must be licensed under Apache
2.0.

Contributions that fix documentation errors or that make small changes
to existing code can be contributed directly by following the rules
below and submitting an appropriate PR.
9 changes: 9 additions & 0 deletions LICENSE
@@ -1,3 +1,12 @@
NOTICE: The code under ./examples/device/ep is derived from DeepEP
(originally developed by DeepSeek) and has been modified for use in this
project. The original code was obtained from:
https://github.com/deepseek-ai/DeepEP (commit f0d34aabcb7bdcb3a05d022e7d11b3bf4ccf8ee8)
This code is licensed under the MIT License. The full text of the MIT
License can be found in ./examples/device/ep/LICENSE-DeepEP.
The rest of this codebase is licensed under the Apache License 2.0 as
described below.

Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
21 changes: 21 additions & 0 deletions examples/device/ep/LICENSE-DeepEP
@@ -0,0 +1,21 @@
MIT License

Copyright (c) 2025 DeepSeek

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
102 changes: 102 additions & 0 deletions examples/device/ep/README.md
@@ -0,0 +1,102 @@
# NIXL EP: Expert-Parallel Communication Example

## Overview

NIXL EP is a complete example implementation of expert-parallel communication for Mixture of Experts (MoE) models, built on top of [NIXL](https://github.com/ai-dynamo/nixl)'s device API. It provides elastic scaling, enabling processes (ranks) to be added or removed dynamically at runtime without disrupting existing connections, and leverages NIXL's RDMA and NVLink support for optimal performance.

## Features
- **Dispatch and Combine support**: Supports dispatch and combine operations for MoE inference
- **RDMA and NVLink support**: Utilizes NIXL's abstractions to support both RDMA and NVLink transports for optimal performance
- **Elastic Scaling**: Dynamically add or remove ranks during runtime

## Buffer Initialization

NIXL EP provides a flexible buffer initialization pattern that supports dynamic rank management:

```python
import nixl_ep

# Initialize buffer with dynamic rank support
buffer = nixl_ep.Buffer(rank, explicitly_destroy=True)
buffer.update_memory_buffers(num_ranks, num_experts_per_rank, rdma_bytes)
buffer.connect_ranks(initial_ranks)

# Dispatch & Combine calls
buffer.dispatch(...)
buffer.combine(...)

# Later: Connect new ranks dynamically
buffer.connect_ranks(ranks)

# Dispatch & Combine calls
buffer.dispatch(...)
buffer.combine(...)

# Disconnect ranks when scaling down
buffer.disconnect_ranks(ranks)
```

## Key APIs

- `Buffer(rank_id, nvlink_backend, explicitly_destroy)`: Initialize the NIXL communication buffer
- `update_memory_buffers(num_ranks, num_experts_per_rank, num_rdma_bytes)`: Prepare buffers for up to `num_ranks` ranks and `num_experts_per_rank` experts
- `connect_ranks(remote_ranks)`: Establish NIXL connections to new peers (can be called multiple times)
- `disconnect_ranks(remote_ranks)`: Clean up connections to departing peers

## Testing

The elastic test suite in `tests/elastic/` validates dynamic scaling capabilities:
- Plan files define scaling phases (representing an orchestrator)
- Tests validate correctness and measure bandwidth between scaling phases

**Example Plan** (`expansion_contraction.json`):
```json
[
[0, 1, 2, 3],
[0, 1, 2, 3, 4, 5, 6, 7],
[0, 1, 2, 3, 4, 5]
]
```
This plan defines three phases:
- **Phase 0**: Initial state with ranks 0-3
- **Phase 1**: Ranks 4-7 are added dynamically (launched independently from initial ranks)
- **Phase 2**: Ranks 6-7 are removed dynamically
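
Between phases, an orchestrator driving such a plan calls `connect_ranks` for newly added ranks and `disconnect_ranks` for removed ones. A small Python sketch of that bookkeeping (the function name is hypothetical; the plan format is the JSON shown above):

```python
import json

def phase_transitions(plan):
    """For each consecutive pair of phases, compute which ranks to connect
    and which to disconnect."""
    steps = []
    for prev, cur in zip(plan, plan[1:]):
        steps.append({'connect': sorted(set(cur) - set(prev)),
                      'disconnect': sorted(set(prev) - set(cur))})
    return steps

plan = json.loads('[[0, 1, 2, 3], [0, 1, 2, 3, 4, 5, 6, 7], [0, 1, 2, 3, 4, 5]]')
print(phase_transitions(plan))
# → [{'connect': [4, 5, 6, 7], 'disconnect': []},
#    {'connect': [], 'disconnect': [6, 7]}]
```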

## Getting Started

#### Build NIXL with NIXL EP:

First, configure the pkg-config paths (only needed when dependencies are installed to non-default locations):

```bash
export PKG_CONFIG_PATH=<path to rdma-core install>/lib/pkgconfig:$PKG_CONFIG_PATH
export PKG_CONFIG_PATH=<path to UCX install>/lib/pkgconfig:$PKG_CONFIG_PATH
export PKG_CONFIG_PATH=<path to DOCA install>/lib/x86_64-linux-gnu/pkgconfig:$PKG_CONFIG_PATH
```

Then, configure the NIXL plugin directory so NIXL can find the UCX plugin, and set `LD_LIBRARY_PATH` so UCX can find rdma-core:
```bash
export NIXL_PLUGIN_DIR=<path to NIXL install directory>/lib/x86_64-linux-gnu/plugins
export LD_LIBRARY_PATH=<path to rdma-core install>/lib:$LD_LIBRARY_PATH
```

Build and install:

```bash
meson setup build \
-Ducx_path=<path to UCX install> \
-Dprefix=<path to NIXL install directory> \
-Dbuildtype=release \
-Dbuild_nixl_ep=true

cd build
ninja install
```


Finally, configure PYTHONPATH to use NIXL EP:
```bash
export PYTHONPATH=<path to NIXL build directory>/examples/device/ep
```

Refer to [tests/elastic/README.md](tests/elastic/README.md) for detailed instructions on how to run the elastic test suite.
130 changes: 130 additions & 0 deletions examples/device/ep/csrc/config.hpp
@@ -0,0 +1,130 @@
/*
* SPDX-FileCopyrightText: Copyright (c) 2025 DeepSeek
* SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
*
* This file incorporates material from the DeepSeek project, licensed under the MIT License.
* The modifications made by NVIDIA are licensed under the Apache License, Version 2.0.
*
* SPDX-License-Identifier: MIT AND Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include "kernels/api.cuh"
#include "kernels/exception.cuh"

namespace nixl_ep {

template <typename dtype_t>
dtype_t ceil_div(dtype_t a, dtype_t b) {
return (a + b - 1) / b;
}

template <typename dtype_t>
dtype_t align(dtype_t a, dtype_t b) {
return ceil_div<dtype_t>(a, b) * b;
}

struct EPBuffer {
int num_clean_int = 0;

void* dispatch_rdma_send_buffer = nullptr;
void* dispatch_rdma_recv_data_buffer = nullptr;
int* dispatch_rdma_recv_count_buffer = nullptr;

void* combine_rdma_send_buffer = nullptr;
void* combine_rdma_recv_data_buffer = nullptr;
int* combine_rdma_recv_flag_buffer = nullptr;

void* combine_rdma_send_buffer_data_start = nullptr;
size_t num_bytes_per_combine_msg = 0;

std::pair<int*, int> clean_meta() {
EP_HOST_ASSERT(dispatch_rdma_recv_count_buffer == combine_rdma_recv_flag_buffer);
return {dispatch_rdma_recv_count_buffer, num_clean_int};
}
};

struct EPLayout {
size_t total_bytes = 0;
EPBuffer buffers[2];

template <typename out_ptr_t = void*, typename count_ptr_t = uint8_t*, typename in_ptr_t = void*>
out_ptr_t advance(const in_ptr_t& ptr, size_t count) {
return reinterpret_cast<out_ptr_t>(reinterpret_cast<count_ptr_t>(ptr) + count);
}

EPLayout(void* rdma_buffer, int num_max_dispatch_tokens_per_rank, int hidden, int num_ranks, int num_experts) {
const int num_scales = hidden / 128;

// Dispatch and combine layout:
// - 2 symmetric odd/even send buffers
// - 2 symmetric odd/even receive buffers
// - 2 symmetric odd/even signaling buffers

// Message sizes
// NOTES: you should add a control `int4` for combine messages if you want to do data transformation
// NOTES: `num_scales * sizeof(nv_bfloat162)` means the per-128-channel min/max
EP_HOST_ASSERT(num_scales * sizeof(float) <= hidden);
size_t num_bytes_per_dispatch_msg = sizeof(int4) + std::max(hidden * sizeof(nv_bfloat16), hidden + num_scales * sizeof(float));
size_t num_bytes_per_combine_msg = num_scales * sizeof(nv_bfloat162) + hidden * sizeof(nv_bfloat16);

// Send buffer
size_t dispatch_send_buffer_bytes = num_max_dispatch_tokens_per_rank * num_bytes_per_dispatch_msg;
size_t combine_send_buffer_bytes = num_experts * num_max_dispatch_tokens_per_rank * num_bytes_per_combine_msg;
size_t send_buffer_bytes = std::max(dispatch_send_buffer_bytes, combine_send_buffer_bytes);
EP_HOST_ASSERT(send_buffer_bytes % sizeof(int4) == 0);
total_bytes += send_buffer_bytes * 2;

// Symmetric receive buffers
// TODO: optimize memory usages
size_t dispatch_recv_data_buffer_bytes = num_experts * num_max_dispatch_tokens_per_rank * num_bytes_per_dispatch_msg;
size_t combine_recv_buffer_bytes = num_experts * num_max_dispatch_tokens_per_rank * num_bytes_per_combine_msg;
size_t recv_buffer_bytes = std::max(dispatch_recv_data_buffer_bytes, combine_recv_buffer_bytes);
EP_HOST_ASSERT(recv_buffer_bytes % sizeof(int4) == 0);
total_bytes += recv_buffer_bytes * 2;

// Symmetric signaling buffers
size_t dispatch_recv_count_buffer_bytes = num_experts * sizeof(int);
size_t combine_recv_flag_buffer_bytes = dispatch_recv_count_buffer_bytes;
size_t signaling_buffer_bytes = std::max(dispatch_recv_count_buffer_bytes, combine_recv_flag_buffer_bytes);
size_t signaling_buffer_bytes_aligned = align<size_t>(signaling_buffer_bytes, 128);
total_bytes += signaling_buffer_bytes_aligned * 2;

// Assign pointers
// NOTES: we still leave some space for distinguishing dispatch/combine buffer,
// so you may see some parameters are duplicated
for (int i = 0; i < 2; ++ i) {
buffers[i] = {
static_cast<int>(signaling_buffer_bytes / sizeof(int)),
advance(rdma_buffer, signaling_buffer_bytes_aligned * 2 + send_buffer_bytes * i),
advance(rdma_buffer, signaling_buffer_bytes_aligned * 2 + send_buffer_bytes * 2 + recv_buffer_bytes * i),
advance<int*>(rdma_buffer, signaling_buffer_bytes_aligned * i),
advance(rdma_buffer, signaling_buffer_bytes_aligned * 2 + send_buffer_bytes * i),
advance(rdma_buffer, signaling_buffer_bytes_aligned * 2 + send_buffer_bytes * 2 + recv_buffer_bytes * i),
advance<int*>(rdma_buffer, signaling_buffer_bytes_aligned * i),
advance(rdma_buffer, signaling_buffer_bytes_aligned * 2 + send_buffer_bytes * i),
num_bytes_per_combine_msg
};
}
}
};

size_t get_rdma_size_hint(int num_max_dispatch_tokens_per_rank, int hidden, int num_ranks, int num_experts) {
auto num_bytes = EPLayout(nullptr, num_max_dispatch_tokens_per_rank, hidden, num_ranks, num_experts).total_bytes;
return ((num_bytes + NUM_BUFFER_ALIGNMENT_BYTES) / NUM_BUFFER_ALIGNMENT_BYTES) * NUM_BUFFER_ALIGNMENT_BYTES;
}

} // namespace nixl_ep
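
The byte accounting in `EPLayout` and `get_rdma_size_hint` can be mirrored in plain Python for sanity-checking buffer sizes. This is a sketch, not part of the module's API: it assumes `NUM_BUFFER_ALIGNMENT_BYTES` is 4096 (the actual value lives in the kernel headers), and it omits `num_ranks`, which the visible C++ math does not use:

```python
def ceil_div(a, b):
    return (a + b - 1) // b

def align(a, b):
    # Round `a` up to a multiple of `b`, like the C++ template helpers.
    return ceil_div(a, b) * b

def rdma_size_hint(num_max_dispatch_tokens_per_rank, hidden, num_experts,
                   alignment=4096):  # NUM_BUFFER_ALIGNMENT_BYTES: assumed value
    num_scales = hidden // 128
    # sizeof(int4) = 16, bf16 = 2 bytes, float = 4 bytes, nv_bfloat162 = 4 bytes
    per_dispatch_msg = 16 + max(hidden * 2, hidden + num_scales * 4)
    per_combine_msg = num_scales * 4 + hidden * 2
    send = max(num_max_dispatch_tokens_per_rank * per_dispatch_msg,
               num_experts * num_max_dispatch_tokens_per_rank * per_combine_msg)
    recv = max(num_experts * num_max_dispatch_tokens_per_rank * per_dispatch_msg,
               num_experts * num_max_dispatch_tokens_per_rank * per_combine_msg)
    signaling = align(num_experts * 4, 128)  # num_experts ints, 128-byte aligned
    total = 2 * (send + recv + signaling)    # odd/even double buffering
    return ((total + alignment) // alignment) * alignment
```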
65 changes: 65 additions & 0 deletions examples/device/ep/csrc/event.hpp
@@ -0,0 +1,65 @@
/*
* SPDX-FileCopyrightText: Copyright (c) 2025 DeepSeek
* SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
*
* This file incorporates material from the DeepSeek project, licensed under the MIT License.
* The modifications made by NVIDIA are licensed under the Apache License, Version 2.0.
*
* SPDX-License-Identifier: MIT AND Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <ATen/cuda/CUDAContext.h>
#include <memory>

#include "kernels/exception.cuh"

namespace nixl_ep {

struct EventHandle {
std::shared_ptr<torch::Event> event;

EventHandle() {
event = std::make_shared<torch::Event>(torch::kCUDA);
event->record(at::cuda::getCurrentCUDAStream());
}

explicit EventHandle(const at::cuda::CUDAStream& stream) {
event = std::make_shared<torch::Event>(torch::kCUDA);
event->record(stream);
}

EventHandle(const EventHandle& other) = default;

void current_stream_wait() const {
at::cuda::getCurrentCUDAStream().unwrap().wait(*event);
}
};

torch::Event create_event(const at::cuda::CUDAStream &s) {
auto event = torch::Event(torch::kCUDA);
event.record(s);
return event;
}

void stream_wait(const at::cuda::CUDAStream& s_0, const at::cuda::CUDAStream& s_1) {
EP_HOST_ASSERT(s_0.id() != s_1.id());
s_0.unwrap().wait(create_event(s_1));
}

void stream_wait(const at::cuda::CUDAStream& s, const EventHandle& event) {
s.unwrap().wait(*event.event);
}

} // namespace nixl_ep