Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions vllm/distributed/elastic_ep/elastic_execute.py
Original file line number Diff line number Diff line change
Expand Up @@ -566,3 +566,45 @@ def receive_expert_mapping(self) -> tuple[torch.Tensor, int, int]:
def prepare_new_worker(self) -> None:
    """Prepare the model's communication buffers under this worker's config.

    The model is fetched from the worker's model runner and handed to
    ``prepare_communication_buffer_for_model`` while the worker's
    ``vllm_config`` is installed as the current vLLM config.
    """
    worker = self.worker
    with set_current_vllm_config(worker.vllm_config):
        # Fetch the model inside the config context, as the buffer
        # preparation itself must also run inside it.
        model = worker.model_runner.get_model()
        prepare_communication_buffer_for_model(model)

def rewarm_workspace(self) -> None:
    """Re-grow the MoE workspace and re-warm the model after a reshuffle.

    Sequence: snapshot and clear the block tables, release CUDA graphs,
    unlock the workspace, run a dummy forward at ``max_num_tokens``, call
    ``compile_or_warm_up_model``, re-lock the workspace, and finally
    restore the block tables.

    Must run on every DP sibling in lockstep: ``_dummy_run`` calls
    ``coordinate_batch_across_dp`` whenever ``data_parallel_size > 1``
    (see gpu_model_runner.py), which deadlocks if any rank skips it.

    Raises:
        Any exception raised by the dummy run or the warm-up is propagated
        unchanged, but only after the workspace has been re-locked and the
        saved block tables have been copied back.
    """
    runner = self.worker.model_runner

    # Save and clear block tables so the dummy run and
    # compile_or_warm_up_model don't write dummy slot mappings into real
    # KV-cache blocks (mirrors switch_and_prepare's pattern).
    # NOTE(review): a reviewer asked whether any other model-runner state
    # needs the same save/restore treatment. This logic is duplicated from
    # switch_and_prepare; a shared model_runner.prepare_for_warmup() /
    # restore_after_warmup() pair was proposed to remove the duplication.
    multi_block_table = runner.input_batch.block_table
    saved_block_tables: list[tuple[torch.Tensor, torch.Tensor]] = [
        (bt.block_table.gpu.clone(), bt.block_table.cpu.clone())
        for bt in multi_block_table.block_tables
    ]
    multi_block_table.clear()

    try:
        # _ensure_workspace_size allocates a fresh tensor on grow, leaving
        # captured CUDA graphs with stale data pointers; drop graphs before
        # re-warm so captures realign with the resized buffer.
        self._release_cuda_graphs()
        unlock_workspace()
        try:
            # Grow the MoE workspace at max_num_tokens.
            # compile_or_warm_up_model alone only exercises
            # cudagraph-capture sizes (≤64 tokens for this test) and leaves
            # the workspace at ~10-14 MB; the post-all-to-all per-rank
            # token count under real post-reshuffle routing needs hundreds
            # of MB. Use _dummy_run directly (rather than profile_run) with
            # skip_eplb=True so dummy routing doesn't pollute the
            # just-rebalanced EPLB stats — same convention
            # compile_or_warm_up_model itself uses.
            runner._dummy_run(
                runner.max_num_tokens, is_profile=True, skip_eplb=True
            )
            self.worker.compile_or_warm_up_model()
        finally:
            # Re-lock only once the workspace has actually been unlocked,
            # and do so even when the warm-up fails.
            lock_workspace()
    finally:
        # Always put the real block tables back, so a failed warm-up does
        # not leave the runner with cleared tables.
        for bt, (saved_gpu, saved_cpu) in zip(
            multi_block_table.block_tables, saved_block_tables
        ):
            bt.block_table.gpu.copy_(saved_gpu)
            bt.block_table.cpu.copy_(saved_cpu)
5 changes: 5 additions & 0 deletions vllm/distributed/elastic_ep/elastic_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,11 @@ def _eplb_reshuffle(self):
self.model_executor.collective_rpc(
"elastic_ep_execute", args=("perform_eplb_reshuffle",)
)
# Reshuffle changes per-rank token routing; the locked MoE workspace
# may now be too small. Rewarm covers both new and existing engines.
self.model_executor.collective_rpc(
"elastic_ep_execute", args=("rewarm_workspace",)
)
assert self.new_dp_group is not None
if self.new_dp_group.rank() == 0:
logger.info("[Elastic EP] EPLB reshuffle completed")
Expand Down
26 changes: 15 additions & 11 deletions vllm/v1/engine/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,18 @@ def get_device_indices(
return value


def _apply_dp_identity_suffix(dp_vllm_config, dp_rank: int) -> None:
# Ray actor names (RayExecutorV2) and KV-connector engine_ids must
# be unique across sibling DP engines or registration collides.
# Use the global DP rank, not a node-local rank, since sibling DP
# engines can span multiple nodes.
dp_vllm_config.instance_id = f"{dp_vllm_config.instance_id}_dp{dp_rank}"
if dp_vllm_config.kv_transfer_config is not None:
dp_vllm_config.kv_transfer_config.engine_id = (
f"{dp_vllm_config.kv_transfer_config.engine_id}_dp{dp_rank}"
)


class CoreEngineActorManager:
"""
Utility class to handle creation, readiness, and shutdown
Expand Down Expand Up @@ -404,20 +416,10 @@ def __init__(
):
dp_vllm_config = copy.deepcopy(vllm_config)
if dp_size > 1:
# Append the DP rank to instance_id so that per-engine
# identifiers (e.g. Ray actor names in RayExecutorV2) are
# unique across DP replicas.
dp_vllm_config.instance_id = f"{dp_vllm_config.instance_id}_dp{index}"
_apply_dp_identity_suffix(dp_vllm_config, index)
dp_vllm_config.parallel_config.placement_group = pg
local_client = index < local_engine_count

if dp_size > 1 and dp_vllm_config.kv_transfer_config is not None:
# modify the engine_id and append the local_dp_rank to it to ensure
# that the kv_transfer_config is unique for each DP rank.
dp_vllm_config.kv_transfer_config.engine_id = (
f"{dp_vllm_config.kv_transfer_config.engine_id}_dp{local_index}"
)

# Ray XPU known issue: dpctl initializes the GPU runtime early, so
# setting device env vars in Ray actor's initialization method
# will not affect device selection. See:
Expand Down Expand Up @@ -789,6 +791,8 @@ def scale_up_elastic_ep(
for i, (pg, local_rank) in enumerate(zip(placement_groups, local_dp_ranks)):
rank = cur_data_parallel_size + i
dp_vllm_config = copy.deepcopy(cur_vllm_config)
if new_data_parallel_size > 1:
_apply_dp_identity_suffix(dp_vllm_config, rank)
dp_vllm_config.parallel_config.data_parallel_size = new_data_parallel_size
dp_vllm_config.parallel_config.placement_group = pg

Expand Down
Loading