diff --git a/vllm/distributed/elastic_ep/elastic_execute.py b/vllm/distributed/elastic_ep/elastic_execute.py
index 24979b62af6d..163cec47e4dd 100644
--- a/vllm/distributed/elastic_ep/elastic_execute.py
+++ b/vllm/distributed/elastic_ep/elastic_execute.py
@@ -566,3 +566,58 @@ def receive_expert_mapping(self) -> tuple[torch.Tensor, int, int]:
     def prepare_new_worker(self) -> None:
         with set_current_vllm_config(self.worker.vllm_config):
             prepare_communication_buffer_for_model(self.worker.model_runner.get_model())
+
+    def rewarm_workspace(self) -> None:
+        """Re-grow the MoE workspace and re-warm the model after a reshuffle.
+
+        Must run on every DP sibling in lockstep: _dummy_run calls
+        coordinate_batch_across_dp whenever data_parallel_size > 1 (see
+        gpu_model_runner.py), which deadlocks if any rank skips it.
+
+        The workspace is re-locked and the saved block tables are restored
+        even if warm-up raises.
+        """
+        # Save and clear block tables so profile_run/compile_or_warm_up_model
+        # don't write dummy slot mappings into real KV-cache blocks (mirrors
+        # switch_and_prepare's pattern).
+        multi_block_table = self.worker.model_runner.input_batch.block_table
+        saved_block_tables: list[tuple[torch.Tensor, torch.Tensor]] = [
+            (bt.block_table.gpu.clone(), bt.block_table.cpu.clone())
+            for bt in multi_block_table.block_tables
+        ]
+
+        try:
+            multi_block_table.clear()
+
+            # _ensure_workspace_size allocates a fresh tensor on grow, leaving
+            # captured CUDA graphs with stale data pointers; drop graphs before
+            # re-warm so captures realign with the resized buffer.
+            self._release_cuda_graphs()
+            unlock_workspace()
+            try:
+                # Grow the MoE workspace at max_num_tokens.
+                # compile_or_warm_up_model alone only exercises the
+                # cudagraph-capture sizes (≤64 tokens in testing, leaving the
+                # workspace at ~10-14 MB); the post-all-to-all per-rank token
+                # count under real post-reshuffle routing needs hundreds of
+                # MB. Use _dummy_run directly (rather than profile_run) with
+                # skip_eplb=True so dummy routing doesn't pollute the
+                # just-rebalanced EPLB stats — same convention
+                # compile_or_warm_up_model itself uses.
+                runner = self.worker.model_runner
+                runner._dummy_run(
+                    runner.max_num_tokens, is_profile=True, skip_eplb=True
+                )
+                self.worker.compile_or_warm_up_model()
+            finally:
+                # Re-lock even when warm-up raises so the workspace is never
+                # left unlocked.
+                lock_workspace()
+        finally:
+            # Restore the real block tables on success and on failure alike;
+            # without this an exception would leave them cleared.
+            for bt, (saved_gpu, saved_cpu) in zip(
+                multi_block_table.block_tables, saved_block_tables
+            ):
+                bt.block_table.gpu.copy_(saved_gpu)
+                bt.block_table.cpu.copy_(saved_cpu)
diff --git a/vllm/distributed/elastic_ep/elastic_state.py b/vllm/distributed/elastic_ep/elastic_state.py
index bace771a2ab6..256efe46a4a4 100644
--- a/vllm/distributed/elastic_ep/elastic_state.py
+++ b/vllm/distributed/elastic_ep/elastic_state.py
@@ -538,6 +538,11 @@ def _eplb_reshuffle(self):
         self.model_executor.collective_rpc(
             "elastic_ep_execute", args=("perform_eplb_reshuffle",)
         )
+        # Reshuffle changes per-rank token routing; the locked MoE workspace
+        # may now be too small. Rewarm covers both new and existing engines.
+        self.model_executor.collective_rpc(
+            "elastic_ep_execute", args=("rewarm_workspace",)
+        )
         assert self.new_dp_group is not None
         if self.new_dp_group.rank() == 0:
             logger.info("[Elastic EP] EPLB reshuffle completed")
diff --git a/vllm/v1/engine/utils.py b/vllm/v1/engine/utils.py
index 1f0b9bbb19d5..a28c1366dd84 100644
--- a/vllm/v1/engine/utils.py
+++ b/vllm/v1/engine/utils.py
@@ -308,6 +308,24 @@ def get_device_indices(
     return value
 
 
+def _apply_dp_identity_suffix(dp_vllm_config, dp_rank: int) -> None:
+    """Append ``_dp{dp_rank}`` to the per-engine identifiers, in place.
+
+    Ray actor names (RayExecutorV2) and KV-connector engine_ids must
+    be unique across sibling DP engines or registration collides.
+    Use the global DP rank, not a node-local rank, since sibling DP
+    engines can span multiple nodes.
+
+    Call this once per engine on that engine's own copy of the config:
+    every call appends another suffix to the current values.
+    """
+    dp_vllm_config.instance_id = f"{dp_vllm_config.instance_id}_dp{dp_rank}"
+    if dp_vllm_config.kv_transfer_config is not None:
+        dp_vllm_config.kv_transfer_config.engine_id = (
+            f"{dp_vllm_config.kv_transfer_config.engine_id}_dp{dp_rank}"
+        )
+
+
 class CoreEngineActorManager:
     """
     Utility class to handle creation, readiness, and shutdown
@@ -404,20 +422,10 @@ def __init__(
         ):
             dp_vllm_config = copy.deepcopy(vllm_config)
             if dp_size > 1:
-                # Append the DP rank to instance_id so that per-engine
-                # identifiers (e.g. Ray actor names in RayExecutorV2) are
-                # unique across DP replicas.
-                dp_vllm_config.instance_id = f"{dp_vllm_config.instance_id}_dp{index}"
+                _apply_dp_identity_suffix(dp_vllm_config, index)
             dp_vllm_config.parallel_config.placement_group = pg
             local_client = index < local_engine_count
 
-            if dp_size > 1 and dp_vllm_config.kv_transfer_config is not None:
-                # modify the engine_id and append the local_dp_rank to it to ensure
-                # that the kv_transfer_config is unique for each DP rank.
-                dp_vllm_config.kv_transfer_config.engine_id = (
-                    f"{dp_vllm_config.kv_transfer_config.engine_id}_dp{local_index}"
-                )
-
             # Ray XPU known issue: dpctl initializes the GPU runtime early, so
             # setting device env vars in Ray actor's initialization method
             # will not affect device selection. See:
@@ -789,6 +797,8 @@ def scale_up_elastic_ep(
         for i, (pg, local_rank) in enumerate(zip(placement_groups, local_dp_ranks)):
             rank = cur_data_parallel_size + i
             dp_vllm_config = copy.deepcopy(cur_vllm_config)
+            if new_data_parallel_size > 1:
+                _apply_dp_identity_suffix(dp_vllm_config, rank)
             dp_vllm_config.parallel_config.data_parallel_size = new_data_parallel_size
             dp_vllm_config.parallel_config.placement_group = pg
 