Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 13 additions & 31 deletions vllm/distributed/elastic_ep/elastic_execute.py
Original file line number Diff line number Diff line change
Expand Up @@ -432,23 +432,6 @@ def switch_and_prepare(self) -> None:
compilation_counter.stock_torch_compile_count += 1
self.worker.model_runner.model.compile(fullgraph=True, backend=backend)

multi_block_table = self.worker.model_runner.input_batch.block_table
saved_block_tables: list[tuple[torch.Tensor, torch.Tensor]] = []
for bt in multi_block_table.block_tables:
saved_block_tables.append(
(bt.block_table.gpu.clone(), bt.block_table.cpu.clone())
)
multi_block_table.clear()

unlock_workspace()
self.worker.compile_or_warm_up_model()
lock_workspace()

for bt, (saved_gpu, saved_cpu) in zip(
multi_block_table.block_tables, saved_block_tables
):
bt.block_table.gpu.copy_(saved_gpu)
bt.block_table.cpu.copy_(saved_cpu)
if new_dp_size < old_dp_size:
self._set_eplb_suppressed(False)

Expand Down Expand Up @@ -567,14 +550,13 @@ def prepare_new_worker(self) -> None:
with set_current_vllm_config(self.worker.vllm_config):
prepare_communication_buffer_for_model(self.worker.model_runner.get_model())

def rewarm_workspace(self) -> None:
def warm_and_capture(self) -> None:
# Must run on every DP sibling in lockstep: _dummy_run calls
# coordinate_batch_across_dp whenever data_parallel_size > 1
# (gpu_model_runner.py:3663), which deadlocks if any rank skips it.

# Save and clear block tables so profile_run/compile_or_warm_up_model
# don't write dummy slot mappings into real KV-cache blocks (mirrors
# switch_and_prepare's pattern).
# Save and clear block tables so the dummy MoE forward doesn't
# write dummy slot mappings into real KV-cache blocks.
multi_block_table = self.worker.model_runner.input_batch.block_table
saved_block_tables: list[tuple[torch.Tensor, torch.Tensor]] = []
for bt in multi_block_table.block_tables:
Expand All @@ -584,19 +566,19 @@ def rewarm_workspace(self) -> None:
multi_block_table.clear()

# _ensure_workspace_size allocates a fresh tensor on grow, leaving
# captured CUDA graphs with stale data pointers; drop graphs before
# re-warm so captures realign with the resized buffer.
# any captured CUDA graph with a stale data pointer; drop graphs
# before re-warm so captures realign with the resized buffer.
# No-op if switch_and_prepare already released them on this rank.
self._release_cuda_graphs()
unlock_workspace()

# Grow the MoE workspace at max_num_tokens.
# compile_or_warm_up_model alone only exercises cudagraph-capture
# sizes (≤64 tokens for this test) and leaves the workspace at
# ~10-14 MB; the post-all-to-all per-rank token count under real
# post-reshuffle routing needs hundreds of MB. Use _dummy_run
# directly (rather than profile_run) with skip_eplb=True so dummy
# routing doesn't pollute the just-rebalanced EPLB stats — same
# convention compile_or_warm_up_model itself uses.
# Grow the MoE workspace at max_num_tokens. compile_or_warm_up_model
# alone only exercises cudagraph-capture sizes (≤64 tokens) and
# leaves the workspace at ~10-14 MB; the post-all-to-all per-rank
# token count under real post-reshuffle routing needs hundreds of
# MB. Use _dummy_run directly with skip_eplb=True so dummy routing
# doesn't pollute the just-rebalanced EPLB stats — same convention
# compile_or_warm_up_model itself uses.
runner = self.worker.model_runner
runner._dummy_run(runner.max_num_tokens, is_profile=True, skip_eplb=True)
self.worker.compile_or_warm_up_model()
Expand Down
13 changes: 10 additions & 3 deletions vllm/distributed/elastic_ep/elastic_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,13 @@ def _progress_remaining_engine(self) -> bool:
# e.g., to drain in-batch requests.
self._create_standby_groups()
self._switch_and_prepare()
# MoE workspace was sized for the old (larger) EP; per-rank
# M_full grows after scale-down because each remaining rank
# now serves more local experts. Run the unified warm-and-
# capture phase to grow + recapture under the new topology.
self.model_executor.collective_rpc(
"elastic_ep_execute", args=("warm_and_capture",)
)
self._update_parallel_config()
self.state = ScaleDownRemainingEngineState.COMPLETE
return True
Expand Down Expand Up @@ -538,10 +545,10 @@ def _eplb_reshuffle(self):
self.model_executor.collective_rpc(
"elastic_ep_execute", args=("perform_eplb_reshuffle",)
)
# Reshuffle changes per-rank token routing; the locked MoE workspace
# may now be too small. Rewarm covers both new and existing engines.
# Single warm + cudagraph-capture phase for the new topology; covers
# both new and existing engines on scale-up.
self.model_executor.collective_rpc(
"elastic_ep_execute", args=("rewarm_workspace",)
"elastic_ep_execute", args=("warm_and_capture",)
)
assert self.new_dp_group is not None
if self.new_dp_group.rank() == 0:
Expand Down
8 changes: 7 additions & 1 deletion vllm/v1/engine/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -279,8 +279,14 @@ def _initialize_kv_caches(self, vllm_config: VllmConfig) -> KVCacheConfig:

vllm_config.validate_block_size()

# Initialize kv cache and warmup the execution
self.model_executor.initialize_from_config(kv_cache_configs)
# For elastic-EP scale-up, defer warmup: new-engine workers must not
# enter coordinate_batch_across_dp during init or they deadlock
# against existing workers (which haven't joined the new DP group
# yet). warm_and_capture in _eplb_reshuffle runs warmup later in
# lockstep across all DP ranks.
if not envs.VLLM_ELASTIC_EP_SCALE_UP_LAUNCH:
self.model_executor.compile_or_warm_up_model()

elapsed = time.time() - start
compile_time = vllm_config.compilation_config.compilation_time
Expand Down
8 changes: 4 additions & 4 deletions vllm/v1/executor/abstract.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,11 +116,11 @@ def _init_executor(self) -> None:
raise NotImplementedError

def initialize_from_config(self, kv_cache_configs: list[KVCacheConfig]) -> None:
"""
Initialize the KV caches and begin the model execution loop of the
underlying workers.
"""
"""Initialize the KV caches on the underlying workers."""
self.collective_rpc("initialize_from_config", args=(kv_cache_configs,))

def compile_or_warm_up_model(self) -> None:
"""Compile/warm up the model and capture cudagraphs on workers."""
compilation_times: list[CompilationTimes] = self.collective_rpc(
"compile_or_warm_up_model"
)
Expand Down
Loading