Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
f4df2af
Wip
ilmarkov Nov 24, 2025
a46c72a
Optimize weight rearrange with numpy
ilmarkov Nov 25, 2025
561b427
Add preserve expert on the same slot within gpu optimization
ilmarkov Nov 25, 2025
691f090
Merge branch 'main' into imarkov/eplb_optimizations
ilmarkov Nov 25, 2025
b853314
Merge branch 'main' into imarkov/eplb_optimizations
ilmarkov Nov 26, 2025
30bab97
Edit config and fix config post_init
ilmarkov Nov 26, 2025
c6f14d1
Optimize after codex review
ilmarkov Nov 26, 2025
0808374
Vectorize get_ep_ranks_with_experts
ilmarkov Nov 26, 2025
599648b
Merge branch 'main' into imarkov/eplb_optimizations
ilmarkov Dec 9, 2025
b462872
Fix pre-commit
ilmarkov Dec 9, 2025
60f744d
Merge branch 'main' into imarkov/eplb_optimizations
ilmarkov Dec 9, 2025
cfac6b3
Remove eplb config fix
ilmarkov Dec 9, 2025
6b2a1de
Updates after review
ilmarkov Dec 10, 2025
fc54d76
Correct eplb state logs
ilmarkov Dec 11, 2025
f28720d
Remove layer grouping
ilmarkov Dec 11, 2025
ab0ca86
Further optimize rearrange
ilmarkov Dec 11, 2025
b57a045
Merge branch 'main' into imarkov/eplb_optimizations
ilmarkov Dec 11, 2025
208b51b
Upd
ilmarkov Dec 11, 2025
a5ecdc1
Add comments
ilmarkov Dec 11, 2025
040ae89
Address review comments
ilmarkov Dec 12, 2025
9a41b91
Fix precommit
ilmarkov Dec 12, 2025
7d0ab7d
Refactor tests and address nits
ilmarkov Dec 15, 2025
11c492a
Merge branch 'main' into imarkov/eplb_optimizations
ilmarkov Dec 15, 2025
1f90b1f
Update eplb config checks
ilmarkov Dec 16, 2025
def3415
Merge branch 'main' into imarkov/eplb_optimizations
tlrmchlsmth Jan 6, 2026
2134c9b
Fix test
ilmarkov Jan 7, 2026
4dc455b
Prettify the log
ilmarkov Jan 7, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 140 additions & 0 deletions tests/distributed/test_eplb_algo.py
Original file line number Diff line number Diff line change
Expand Up @@ -310,3 +310,143 @@ def test_additional_cases():
print(phy2log)

test_basic_rebalance()


def _make_phy_replicas_idx_from_phy2log(phy2log: torch.Tensor) -> torch.Tensor:
"""Create replicas indices mapping from phy2log"""
pr = torch.zeros_like(phy2log)
for layer in range(phy2log.shape[0]):
seen: dict[int, int] = {}
row = phy2log[layer].tolist()
for i, expert in enumerate(row):
r = seen.get(expert, 0)
pr[layer, i] = r
seen[expert] = r + 1
return pr


def _validate_intragpu_rearrangement(
old_global_expert_indices: torch.Tensor,
new_phy2log: torch.Tensor,
new_phy_replicas_idx: torch.Tensor,
post_phy2log: torch.Tensor,
post_phy_replicas_idx: torch.Tensor,
num_ranks: int,
slots_per_gpu: int,
):
# Per-GPU checks
for gpu_idx in range(num_ranks):
start = gpu_idx * slots_per_gpu
end = start + slots_per_gpu
old_seg = old_global_expert_indices[0, start:end]
new_seg = new_phy2log[0, start:end]
new_rnk = new_phy_replicas_idx[0, start:end]
post_seg = post_phy2log[0, start:end]
post_rnk = post_phy_replicas_idx[0, start:end]

# Pairwise equality for (expert, rank) pairs to ensure nothing is lost
def sorted_pairs(seg: torch.Tensor, rnk: torch.Tensor):
pairs = list(zip(seg.tolist(), rnk.tolist()))
pairs.sort()
return pairs

assert sorted_pairs(post_seg, post_rnk) == sorted_pairs(new_seg, new_rnk), (
f"Per-GPU pairs of (expert,rank) must match new mapping for GPU {gpu_idx}"
)

# For experts that remain on the same GPU, the old slot is preserved
# for at least one occurrence; rank at that slot must be valid for that expert
old_list = old_seg.tolist()
new_list = new_seg.tolist()
post_list = post_seg.tolist()
remained = set(old_list) & set(new_list)
new_ranks_for_expert: dict[int, list[int]] = {}
for v, r in zip(new_list, new_rnk.tolist()):
new_ranks_for_expert.setdefault(v, []).append(r)
for expert in remained:
old_pos = old_list.index(expert)
assert post_list[old_pos] == expert, (
f"Expert {expert} on GPU {gpu_idx} should stay at old slot {old_pos}"
)
# Rank at preserved slot must be one of the ranks
# the expert has in new mapping
assert post_rnk.tolist()[old_pos] in new_ranks_for_expert[expert], (
f"Rank for expert {expert} at preserved slot on GPU {gpu_idx} "
"must come from new mapping"
)


@pytest.mark.parametrize(
    "num_ranks, slots_per_gpu, old_phy2log, new_phy2log",
    [
        pytest.param(
            # Setup: 2 GPUs, 4 slots each, 1 layer
            # Old mapping: GPU0 -> [0,1,2,3], GPU1 -> [4,5,6,7]
            # New mapping shuffles within GPU0 and brings 4,5 into GPU0.
            # GPU0 new -> [1,5,0,4]; GPU1 new -> [6,2,7,3]
            2,
            4,
            torch.tensor([[0, 1, 2, 3, 4, 5, 6, 7]]),
            torch.tensor([[1, 5, 0, 4, 6, 2, 7, 3]]),
            id="simple",
        ),
        pytest.param(
            # Setup: 2 GPUs, 5 slots each (total 10 physical experts), 1 layer
            # Old mapping:
            #   GPU0 -> [0, 1, 0, 2, 3] (expert 0 duplicated)
            #   GPU1 -> [4, 5, 6, 1, 2]
            # New mapping reorders within GPUs and moves some experts across GPUs,
            # while still including duplicates:
            #   GPU0 new -> [0, 5, 4, 0, 1] (expert 0 duplicated, 4/5 incoming)
            #   GPU1 new -> [6, 2, 3, 2, 1] (expert 2 duplicated)
            2,
            5,
            torch.tensor([[0, 1, 0, 2, 3, 4, 5, 6, 1, 2]]),
            torch.tensor([[0, 5, 4, 0, 1, 6, 2, 3, 2, 1]]),
            id="duplicates",
        ),
        pytest.param(
            # Setup: 3 GPUs, 4 slots each (total 12 physical experts), 1 layer
            # Old mapping:
            #   GPU0 -> [0, 1, 2, 3]
            #   GPU1 -> [0, 1, 2, 3]
            #   GPU2 -> [0, 1, 2, 3]
            # New mapping decides to use one expert on 2 GPUs and shuffles
            # experts on the third GPU,
            #   GPU0 new -> [0, 0, 0, 0]
            #   GPU1 new -> [0, 0, 0, 0]
            #   GPU2 new -> [1, 2, 3, 0]
            3,
            4,
            torch.tensor([[0, 1, 2, 3, 0, 1, 2, 3, 0, 1, 2, 3]]),
            torch.tensor([[0, 0, 0, 0, 0, 0, 0, 0, 1, 2, 3, 0]]),
            id="skewed_expert",
        ),
    ],
)
def test_preserve_intragpu_slots(
    num_ranks: int,
    slots_per_gpu: int,
    old_phy2log: torch.Tensor,
    new_phy2log: torch.Tensor,
):
    """Experts that stay on a GPU keep their old slots; incoming not lost.

    Exercises ``DefaultEplbPolicy.preserve_intragpu_slots`` on 1-layer
    mappings: replica indices are derived from ``new_phy2log``, the
    post-processed mapping must keep the tensor shapes, and the
    per-GPU validation helper checks slot preservation and that the
    multiset of (expert, replica) pairs per GPU is unchanged.
    """
    # Derive replica indices (occurrence counters) from the new mapping.
    phy_replicas_idx = _make_phy_replicas_idx_from_phy2log(new_phy2log)

    post_phy2log, post_phy_replicas_idx = DefaultEplbPolicy.preserve_intragpu_slots(
        new_phy2log, phy_replicas_idx, num_ranks, old_phy2log
    )

    # Shapes preserved
    assert post_phy2log.shape == new_phy2log.shape
    assert post_phy_replicas_idx.shape == phy_replicas_idx.shape

    # Per-GPU invariants: slot preservation and pair conservation.
    _validate_intragpu_rearrangement(
        old_phy2log,
        new_phy2log,
        phy_replicas_idx,
        post_phy2log,
        post_phy_replicas_idx,
        num_ranks,
        slots_per_gpu,
    )
17 changes: 9 additions & 8 deletions tests/distributed/test_eplb_execute.py
Original file line number Diff line number Diff line change
Expand Up @@ -286,32 +286,33 @@ def _test_async_transfer_layer_without_mtp_worker(
device,
old_indices,
)
old_indices_cpu = old_indices.cpu()
new_indices_cpu = new_indices.cpu()

expert_buffer = [torch.empty_like(w) for w in expert_weights[0]]
cuda_stream = torch.cuda.Stream(device=device)

for layer_idx in range(num_layers):
is_unchanged, is_received_locally, experts_recv_loc = asyncio.run(
is_unchanged, is_received_locally, recv_metadata = asyncio.run(
transfer_layer(
old_global_expert_indices=old_indices,
new_global_expert_indices=new_indices,
old_global_expert_indices=old_indices_cpu,
new_global_expert_indices=new_indices_cpu,
expert_weights=expert_weights,
expert_weights_buffer=expert_buffer,
ep_group=ep_group,
layer=layer_idx,
cuda_stream=cuda_stream,
)
)

cuda_stream.synchronize()
move_from_buffer(
expert_weights=expert_weights[layer_idx],
expert_weights_buffer=expert_buffer,
expert_weights_buffers=expert_buffer,
is_unchanged=is_unchanged,
is_received_locally=is_received_locally,
experts_recv_loc=experts_recv_loc,
new_indices=new_indices[layer_idx].tolist(),
ep_group=ep_group,
recv_metadata=recv_metadata,
new_indices=new_indices_cpu[layer_idx],
ep_rank=ep_rank,
)

verify_expert_weights_after_shuffle(
Expand Down
12 changes: 12 additions & 0 deletions vllm/config/parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ class EPLBConfig:
Log the balancedness each step of expert parallelism.
This is turned off by default since it will cause communication overhead.
"""
log_balancedness_interval: int = 1
"""
Interval for logging the balancedness.
"""
use_async: bool = False
"""
Whether to use non-blocking EPLB.
Expand All @@ -77,6 +81,14 @@ class EPLBConfig:
policy: EPLBPolicyOption = "default"
"""The policy type for expert parallel load balancing (EPLB)."""

@model_validator(mode="after")
def _validate_eplb_config(self) -> Self:
    """Validate cross-field EPLB configuration constraints.

    Runs after model construction (pydantic ``mode="after"``) and
    raises ``ValueError`` on invalid field combinations.
    """
    # NOTE(review): async rearrangement appears to be implemented only
    # for the default policy — confirm against the async worker.
    if self.use_async and self.policy != "default":
        raise ValueError("Async EPLB is only supported with the default policy.")
    # The interval is only consumed when balancedness logging is on,
    # so it is validated only in that case.
    if self.log_balancedness and self.log_balancedness_interval <= 0:
        raise ValueError("log_balancedness_interval must be greater than 0.")
    return self


@config
@dataclass
Expand Down
2 changes: 1 addition & 1 deletion vllm/distributed/eplb/async_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ async def transfer_run_periodically(
(
model_state.is_unchanged,
model_state.is_received_locally,
model_state.experts_recv_loc,
model_state.recv_metadata,
) = await transfer_layer(
old_global_expert_indices=model_state.physical_to_logical_map,
new_global_expert_indices=model_state.new_physical_to_logical_map,
Expand Down
Loading