Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
ef5723c
eep phase: stateless group + CUDA graph support
libertyeagle Sep 26, 2025
66ce382
fix precommit
tlrmchlsmth Jan 5, 2026
f2d6300
require current vllm config to be set in init_model_parallel as well
tlrmchlsmth Jan 14, 2026
a02b00e
partially handle review comments from cursor
tlrmchlsmth Jan 14, 2026
d76dfea
use allgather_reducescatter instead of pplx
tlrmchlsmth Jan 14, 2026
38ffbf8
update
tlrmchlsmth Jan 14, 2026
7e0cd87
world_size -> world_size_across_dp for executor selection
tlrmchlsmth Jan 15, 2026
65829d7
dummy_weights -> load_dummy_weights and fix cpu model runner
tlrmchlsmth Jan 26, 2026
31b3bb7
fixup
tlrmchlsmth Jan 27, 2026
998169b
set current vllm config in tests
tlrmchlsmth Jan 27, 2026
1700fcf
more vllm config wrestling
tlrmchlsmth Jan 27, 2026
d9914ff
More current_vllm_config fixes
tlrmchlsmth Jan 28, 2026
fced7ec
precommit
tlrmchlsmth Jan 28, 2026
dad7698
[CI Fix] Fix tests to set vllm_config before initialize_model_parallel
rtourgeman Feb 15, 2026
5718393
[CI Fix] Fix circular reference between AsyncLLM and output_handler
rtourgeman Feb 15, 2026
f685cf2
Remove support dynamo
itayalroy Feb 15, 2026
e169d1e
Create a stateless EPLB group for elastic EP
itayalroy Feb 15, 2026
88c8aeb
Pass sp_size to FusedMoEParallelConfig.make
itayalroy Feb 15, 2026
ffcfa2c
Fix eplb is async field name
itayalroy Feb 15, 2026
3ae0a4e
Fix is_ep_communicator check
itayalroy Feb 17, 2026
5553e32
Reinit modular kernel on EP scaling events
itayalroy Feb 16, 2026
47ad0b0
Implement destroy for DeepEP
itayalroy Feb 16, 2026
cbdce04
Torch recompile on existing ranks after scale up/down
itayalroy Feb 17, 2026
60fb2a5
Single api server in test elastic ep
itayalroy Feb 17, 2026
037ba81
Reduce memory util in test_elastic_ep.py
itayalroy Feb 17, 2026
30bbd48
Force stop ray procs between tests
itayalroy Feb 17, 2026
5ac19a9
Defer elastic EP port allocation to after ray.init()
itayalroy Feb 18, 2026
de2fec6
Graceful comm group destruction on scale-down
itayalroy Feb 18, 2026
833bbc8
Move elastic EP standby groups into dedicated module
itayalroy Feb 20, 2026
69363f0
Deduplicate local_all_ranks calculation
itayalroy Feb 20, 2026
9643d19
Fix vllm config in tests
itayalroy Feb 22, 2026
ac220cb
Require PP=1 for elastic EP
itayalroy Feb 22, 2026
35dad3f
Fix ZMQ port TOCTOU in MPClient
itayalroy Feb 22, 2026
8394d65
Revert pplx kernels installation changes
itayalroy Feb 24, 2026
369bbd3
Fix dp weight transfer send/recv mismatch
itayalroy Feb 25, 2026
1d8a0f4
Remove unused param max_concurrent_workers
itayalroy Feb 25, 2026
942b156
Update dp_size in vllm_config used in create_standby_groups
itayalroy Feb 25, 2026
bb00d97
Fixed staged barrier possible race
itayalroy Feb 25, 2026
9da3694
Fix test_elastic_ep path
itayalroy Feb 26, 2026
d87fe82
Merge branch 'main' into eep_m2_rebase
tlrmchlsmth Feb 27, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion .buildkite/test_areas/expert_parallelism.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,19 @@ steps:
- tests/distributed/test_eplb_execute.py
commands:
- pytest -v -s distributed/test_eplb_execute.py
- pytest -v -s distributed/test_eplb_spec_decode.py
- pytest -v -s distributed/test_eplb_spec_decode.py

- label: Elastic EP Scaling Test
timeout_in_minutes: 20
device: b200
optional: true
working_dir: "/vllm-workspace/tests"
num_devices: 4
source_file_dependencies:
- vllm/distributed/
- vllm/engine/
- vllm/executor/
- vllm/compilation/
- tests/distributed/
commands:
- pytest -v -s distributed/test_elastic_ep.py
8 changes: 3 additions & 5 deletions tests/compile/passes/distributed/test_async_tp.py
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,6 @@ def async_tp_pass_on_test_model(

# initialize distributed
init_distributed_environment()
initialize_model_parallel(tensor_model_parallel_size=world_size)

# configure vllm config for SequenceParallelismPass
vllm_config = VllmConfig()
Expand All @@ -334,11 +333,10 @@ def async_tp_pass_on_test_model(
model=model_name, trust_remote_code=True, dtype=dtype, seed=42
)

async_tp_pass = AsyncTPPass(vllm_config)

# Set the global vllm_config for TestBackend which calls
# get_current_vllm_config()
with set_current_vllm_config(vllm_config):
initialize_model_parallel(tensor_model_parallel_size=world_size)

async_tp_pass = AsyncTPPass(vllm_config)
backend = TestBackend(async_tp_pass)

assert (
Expand Down
2 changes: 1 addition & 1 deletion tests/compile/passes/distributed/test_fusion_all_reduce.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,6 @@ def all_reduce_fusion_pass_on_test_model(
)

init_distributed_environment()
initialize_model_parallel(tensor_model_parallel_size=world_size)

custom_ops = []
if enable_rms_norm_custom_op:
Expand All @@ -304,6 +303,7 @@ def all_reduce_fusion_pass_on_test_model(
model=model_name, trust_remote_code=True, dtype=dtype, seed=42
)
with set_current_vllm_config(vllm_config):
initialize_model_parallel(tensor_model_parallel_size=world_size)
all_reduce_fusion_pass = AllReduceFusionPass(vllm_config)
noop_pass = NoOpEliminationPass(vllm_config)
func_pass = FixFunctionalizationPass(vllm_config)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,6 @@ def sequence_parallelism_pass_on_test_model(

# initialize distributed
init_distributed_environment()
initialize_model_parallel(tensor_model_parallel_size=world_size)

# configure vllm config for SequenceParallelismPass
custom_ops_list = custom_ops.split(",") if custom_ops else []
Expand Down Expand Up @@ -272,6 +271,7 @@ def sequence_parallelism_pass_on_test_model(
)

with set_current_vllm_config(vllm_config):
initialize_model_parallel(tensor_model_parallel_size=world_size)
noop_pass = NoOpEliminationPass(vllm_config)
sequence_parallelism_pass = SequenceParallelismPass(vllm_config)
cleanup_pass = PostCleanupPass(vllm_config)
Expand Down
22 changes: 13 additions & 9 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,16 +176,20 @@ def init_test_http_connection():

@pytest.fixture
def dist_init():
from tests.utils import ensure_current_vllm_config

temp_file = tempfile.mkstemp()[1]
init_distributed_environment(
world_size=1,
rank=0,
distributed_init_method=f"file://{temp_file}",
local_rank=0,
backend="nccl",
)
initialize_model_parallel(1, 1)
yield

with ensure_current_vllm_config():
init_distributed_environment(
world_size=1,
rank=0,
distributed_init_method=f"file://{temp_file}",
local_rank=0,
backend="nccl",
)
initialize_model_parallel(1, 1)
yield
cleanup_dist_env_and_memory()


Expand Down
7 changes: 6 additions & 1 deletion tests/distributed/eplb_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import torch
import torch.multiprocessing as mp

from vllm.config import VllmConfig, set_current_vllm_config
from vllm.distributed.parallel_state import (
init_distributed_environment,
)
Expand Down Expand Up @@ -42,7 +43,11 @@ def set_env_vars_and_device(env: dict[str, str]) -> None:
local_rank = os.environ["LOCAL_RANK"]
device = torch.device(f"cuda:{local_rank}")
torch.cuda.set_device(device)
init_distributed_environment()

# Create a minimal vllm config for init_distributed_environment
vllm_config = VllmConfig()
with set_current_vllm_config(vllm_config):
init_distributed_environment()

# Ensure each worker process has the same random seed
random.seed(42)
Expand Down
202 changes: 202 additions & 0 deletions tests/distributed/test_elastic_ep.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project

import os
import subprocess
import time

import pytest
import requests

from ..evals.gsm8k.gsm8k_eval import evaluate_gsm8k
from ..utils import RemoteOpenAIServer, multi_gpu_test


@pytest.fixture(autouse=True)
def cleanup_ray_between_tests():
    """Kill any leftover Ray processes so each test starts from a clean slate."""
    # Best-effort: force-stop Ray daemons a previous test may have left behind.
    subprocess.run(["ray", "stop", "--force"], capture_output=True, timeout=30)
    # Give the OS a moment to release ports/resources before the next test.
    time.sleep(5)
    yield


# Small MoE model so the test fits on modest GPUs while still exercising EP.
MODEL_NAME = "deepseek-ai/DeepSeek-V2-Lite-Chat"

# GSM8K evaluation parameters shared by both scaling tests below.
NUM_GSM8K_QUESTIONS = 256
# Hard accuracy floor asserted inside _run_gsm8k_eval for every stage.
EXPECTED_ACCURACY = 0.58
# Maximum accuracy drop tolerated after a scale up/down event.
ACCURACY_TOL = 0.08
MAX_NUM_SEQS = 32


def _send_scale_command(
    server: RemoteOpenAIServer, new_dp_size: int, *, timeout: float = 300
) -> bool:
    """POST a scale request to the server's elastic-EP endpoint.

    Args:
        server: Running vLLM OpenAI-compatible server under test.
        new_dp_size: Target data-parallel size to scale to.
        timeout: Seconds to wait for the (potentially slow) scaling request.

    Returns:
        True iff the server acknowledged the request with HTTP 200;
        False on any non-200 status or network-level failure.
    """
    url = server.url_for("scale_elastic_ep")
    payload = {"new_data_parallel_size": new_dp_size}
    headers = {"Content-Type": "application/json"}

    try:
        response = requests.post(url, json=payload, headers=headers, timeout=timeout)
    except requests.exceptions.RequestException:
        # Treat connectivity/timeout failures as an unsuccessful scale attempt
        # so callers can assert on the boolean result.
        return False
    return response.status_code == 200


def _run_gsm8k_eval(server: RemoteOpenAIServer, stage: str) -> float:
    """Run the GSM8K eval against *server*, enforce the accuracy floor, and
    return the measured accuracy for the given *stage* label."""
    assert server.port is not None
    eval_result = evaluate_gsm8k(
        num_questions=NUM_GSM8K_QUESTIONS,
        host=f"http://{server.host}",
        port=server.port,
    )
    accuracy = eval_result["accuracy"]
    print(
        f"[{stage}] GSM8K accuracy: {accuracy:.3f} "
        f"({eval_result['num_questions']} questions)"
    )
    # Every stage (initial, post-scale-up, post-scale-down) must clear the
    # same absolute floor, independent of the relative-tolerance checks.
    assert accuracy >= EXPECTED_ACCURACY, (
        f"[{stage}] GSM8K accuracy {accuracy:.3f} is below "
        f"expected threshold {EXPECTED_ACCURACY}"
    )
    return accuracy


@multi_gpu_test(num_gpus=4)
def test_elastic_ep_scaling():
    """Scale an elastic-EP deployment 2 -> 4 -> 2 DP ranks.

    After each scaling event the server must still answer GSM8K queries with
    accuracy within ACCURACY_TOL of the initial 2-GPU run.
    """
    # NOTE(review): a single API server is used because elastic EP keeps its
    # scaling state in one API server (see PR discussion on --api-server-count).
    vllm_serve_args = [
        "--trust-remote-code",
        "--tensor-parallel-size", "1",
        "--gpu-memory-utilization", "0.8",
        "--max-model-len", "4096",
        "--max-num-seqs", str(MAX_NUM_SEQS),
        "--enable-expert-parallel",
        "--all2all-backend", "allgather_reducescatter",
        "--enable-elastic-ep",
        "--enable-eplb",
        "--eplb-config.num_redundant_experts", "0",
        "--data-parallel-backend", "ray",
        "--data-parallel-size", "2",
        "--api-server-count", "1",
    ]

    # Multi-node runs inject the DP leader address via the environment.
    if leader_address := os.environ.get("LEADER_ADDRESS"):
        vllm_serve_args += ["--data-parallel-address", leader_address]

    with RemoteOpenAIServer(
        MODEL_NAME, vllm_serve_args, env_dict={}, max_wait_seconds=1200
    ) as server:
        initial_accuracy = _run_gsm8k_eval(server, "Initial (2 GPUs)")

        # Scale up 2 -> 4, let the new ranks settle, then re-evaluate.
        assert _send_scale_command(server, 4)
        time.sleep(10)
        scale_up_accuracy = _run_gsm8k_eval(server, "After scale up (4 GPUs)")
        assert scale_up_accuracy >= initial_accuracy - ACCURACY_TOL, (
            f"Scale up accuracy {scale_up_accuracy:.3f} dropped more than "
            f"{ACCURACY_TOL} below initial accuracy {initial_accuracy:.3f}"
        )

        # Scale back down 4 -> 2 and re-evaluate.
        assert _send_scale_command(server, 2)
        time.sleep(5)
        scale_down_accuracy = _run_gsm8k_eval(server, "After scale down (2 GPUs)")
        assert scale_down_accuracy >= initial_accuracy - ACCURACY_TOL, (
            f"Scale down accuracy {scale_down_accuracy:.3f} dropped more than "
            f"{ACCURACY_TOL} below initial accuracy {initial_accuracy:.3f}"
        )

        print("\nAccuracy Summary:")
        print(f" Initial: {initial_accuracy:.3f}")
        for label, acc in (
            ("Scale up", scale_up_accuracy),
            ("Scale down", scale_down_accuracy),
        ):
            print(f" {label}: {acc:.3f} (diff: {acc - initial_accuracy:+.3f})")
        print(f" Tolerance: {ACCURACY_TOL:.3f}")


@multi_gpu_test(num_gpus=4)
def test_elastic_ep_scaling_uneven():
    """Test scale up with uneven worker distribution.

    This tests the case where num_new_workers % old_dp_size != 0,
    specifically 2 -> 3 where remainder = 1 % 2 = 1.
    This exercises the remainder handling in sender-receiver pairing.
    """
    vllm_serve_args = [
        "--trust-remote-code",
        "--tensor-parallel-size", "1",
        "--gpu-memory-utilization", "0.8",
        "--max-model-len", "4096",
        "--max-num-seqs", str(MAX_NUM_SEQS),
        "--enable-expert-parallel",
        "--all2all-backend", "allgather_reducescatter",
        "--enable-elastic-ep",
        "--enable-eplb",
        "--eplb-config.num_redundant_experts", "0",
        "--data-parallel-backend", "ray",
        "--data-parallel-size", "2",
        "--api-server-count", "1",
    ]

    # Multi-node runs inject the DP leader address via the environment.
    if leader_address := os.environ.get("LEADER_ADDRESS"):
        vllm_serve_args += ["--data-parallel-address", leader_address]

    with RemoteOpenAIServer(
        MODEL_NAME, vllm_serve_args, env_dict={}, max_wait_seconds=1200
    ) as server:
        initial_accuracy = _run_gsm8k_eval(server, "Initial (2 GPUs)")

        # Scale 2 -> 3: remainder = 1 % 2 = 1, exercising the uneven
        # sender-receiver pairing path in the weight transfer.
        assert _send_scale_command(server, 3)
        time.sleep(10)
        scale_up_accuracy = _run_gsm8k_eval(server, "After scale up (3 GPUs)")
        assert scale_up_accuracy >= initial_accuracy - ACCURACY_TOL, (
            f"Scale up accuracy {scale_up_accuracy:.3f} dropped more than "
            f"{ACCURACY_TOL} below initial accuracy {initial_accuracy:.3f}"
        )

        # Scale back down to 2 and re-evaluate.
        assert _send_scale_command(server, 2)
        time.sleep(5)
        scale_down_accuracy = _run_gsm8k_eval(server, "After scale down (2 GPUs)")
        assert scale_down_accuracy >= initial_accuracy - ACCURACY_TOL, (
            f"Scale down accuracy {scale_down_accuracy:.3f} dropped more than "
            f"{ACCURACY_TOL} below initial accuracy {initial_accuracy:.3f}"
        )

        print("\nAccuracy Summary (Uneven Scaling):")
        print(f" Initial: {initial_accuracy:.3f}")
        for label, acc in (
            ("Scale up", scale_up_accuracy),
            ("Scale down", scale_down_accuracy),
        ):
            print(f" {label}: {acc:.3f} (diff: {acc - initial_accuracy:+.3f})")
        print(f" Tolerance: {ACCURACY_TOL:.3f}")
Loading