From 881b06feb60faaa5edeb4f672419ac2e0b962d0d Mon Sep 17 00:00:00 2001 From: ahao-anyscale Date: Wed, 1 Apr 2026 22:04:29 +0000 Subject: [PATCH 1/2] x Signed-off-by: ahao-anyscale --- vllm/v1/engine/coordinator.py | 8 ++------ vllm/v1/engine/core.py | 8 ++++---- vllm/v1/engine/core_client.py | 4 ---- 3 files changed, 6 insertions(+), 14 deletions(-) diff --git a/vllm/v1/engine/coordinator.py b/vllm/v1/engine/coordinator.py index 8ebf976c5fa1..3f8abc3379ef 100644 --- a/vllm/v1/engine/coordinator.py +++ b/vllm/v1/engine/coordinator.py @@ -41,12 +41,8 @@ class DPCoordinator: DPEngineCoreProc._has_global_unfinished_reqs method. * Broadcasts the START_DP_WAVE message to engines to move them from paused - to running state when one engine receives a new request. This can happen - in two cases: - 1) A front-end sending a new request while the engines are paused will - concurrently notify the coordinator. - 2) An engine receiving a request for a stale request wave while in paused - state will notify the coordinator. + to running state when one engine receives a new request while idle and + unpaused. The receiving engine notifies the coordinator. Engines will move into running state when receiving a new request or START_DP_WAVE message. diff --git a/vllm/v1/engine/core.py b/vllm/v1/engine/core.py index 0fa59579ee76..f52f7b1ecee8 100644 --- a/vllm/v1/engine/core.py +++ b/vllm/v1/engine/core.py @@ -1639,16 +1639,16 @@ def shutdown(self): def add_request(self, request: Request, request_wave: int = 0): super().add_request(request, request_wave) - if self.has_coordinator and request_wave != self.current_wave: + if self.has_coordinator: if request_wave > self.current_wave: self.current_wave = request_wave - elif ( + if ( not self.engines_running and self.scheduler.pause_state == PauseState.UNPAUSED ): self.engines_running = True - # Request received for an already-completed wave, notify - # front-end that we need to start the next one. 
+ # An unpaused idle engine received a request; notify the + # coordinator to wake other DP engines for this wave. self.output_queue.put_nowait( (-1, EngineCoreOutputs(start_wave=self.current_wave)) ) diff --git a/vllm/v1/engine/core_client.py b/vllm/v1/engine/core_client.py index 1d73c12eda29..2eadbcee8e32 100644 --- a/vllm/v1/engine/core_client.py +++ b/vllm/v1/engine/core_client.py @@ -1273,10 +1273,6 @@ async def add_request_async(self, request: EngineCoreRequest) -> None: chosen_engine = self.get_core_engine_for_request(request) to_await = self._send_input(EngineCoreRequestType.ADD, request, chosen_engine) - if not self.engines_running: - # Notify coordinator that we're sending a request - req_msg = msgspec.msgpack.encode(("FIRST_REQ", chosen_engine)) - await self.first_req_send_socket.send(req_msg) await to_await From 3ee833ba4f4e72e5f69cdc6deb7714760243641b Mon Sep 17 00:00:00 2001 From: ahao-anyscale Date: Thu, 2 Apr 2026 07:24:56 +0000 Subject: [PATCH 2/2] x Signed-off-by: ahao-anyscale --- tests/v1/distributed/test_async_llm_dp.py | 121 ++++++++++++++++++++++ vllm/v1/engine/core.py | 6 ++ 2 files changed, 127 insertions(+) diff --git a/tests/v1/distributed/test_async_llm_dp.py b/tests/v1/distributed/test_async_llm_dp.py index 1b7739d2f071..16f7a5ccd758 100644 --- a/tests/v1/distributed/test_async_llm_dp.py +++ b/tests/v1/distributed/test_async_llm_dp.py @@ -398,3 +398,124 @@ async def consume_gen(req_id: str) -> None: assert not await engine.is_paused() # Let the two requests we sent mid-pause complete await asyncio.gather(*mid_pause_tasks) + + +@pytest.mark.asyncio +async def test_dp_pause_barrier_request_deadlock(): + """Reproduce the cross-group deadlock fixed by moving wave-start + decisions from the frontend to the engine core. + + Old (buggy) code: the frontend sends FIRST_REQ to the coordinator + whenever add_request_async is called with engines_running=False, + regardless of scheduler pause state. 
The coordinator then broadcasts + START_DP_WAVE, which wakes engine 1 into execute_dummy_batch() (EP + all-to-all) while engine 0 is blocked in a dist.barrier(dp_group). + Neither engine can participate in the other's collective — deadlock. + + Fixed code: the engine core only sends start_wave when the scheduler + is UNPAUSED, so no START_DP_WAVE is broadcast while paused. Engine 1 + stays idle and processes the barrier normally when it arrives. + + Sequence: + 1. Pause all engines (PAUSED_ALL). + 2. Send barrier to engine 0 only — blocks in dist.barrier(dp_group). + 3. Send a request routed to engine 1. + 4. Wait for any (buggy) START_DP_WAVE propagation. + 5. Send barrier to engine 1 — completes in fixed code, deadlocks + in buggy code because engine 1 is stuck in EP all-to-all. + """ + if DP_SIZE != 2: + pytest.skip("requires DP_SIZE=2") + + with ExitStack() as after: + engine_args = _get_dp_pause_engine_args(expert_parallel=True) + engine = AsyncLLM.from_engine_args(engine_args) + after.callback(engine.shutdown) + + client = engine.engine_core + + # Cache get_supported_tasks so that generate() won't need to + # send a utility call to all engines (which would hang once + # engine 0 is blocked in the barrier). + await engine.get_supported_tasks() + + # Pause all engines normally — no staggering. + await engine.pause_generation(mode="keep") + assert await engine.is_paused() + + original_call_utility = client.call_utility_async + mid_barrier_tasks: list[asyncio.Task] = [] + + async def staggered_barrier(method: str, *args) -> Any: + if method != "barrier": + return await original_call_utility(method, *args) + + # Send barrier to engine 0 only — it blocks in + # dist.barrier(dp_group) waiting for engine 1. + barrier_0 = asyncio.create_task( + client._call_utility_async(method, *args, engine=client.core_engines[0]) + ) + await asyncio.sleep(1) + + # While engine 0 is blocked, send a request routed + # specifically to engine 1. 
+ sp = SamplingParams(max_tokens=5, ignore_eos=True) + + engine_1 = client.core_engines[1] + original_get_engine = client.get_core_engine_for_request + + def route_to_engine_1(req): + client.reqs_in_flight[req.request_id] = engine_1 + return engine_1 + + client.get_core_engine_for_request = route_to_engine_1 + + async def consume_gen(req_id: str) -> None: + async for _ in engine.generate( + request_id=req_id, + prompt=DP_PAUSE_PROMPT, + sampling_params=sp, + ): + pass + + t1 = asyncio.create_task(consume_gen("race-1")) + mid_barrier_tasks.append(t1) + + # Yield so generate() preprocessing completes and + # add_request_async is called (which, in buggy code, + # would send FIRST_REQ and wake engine 1). + for _ in range(200): + await asyncio.sleep(0) + + client.get_core_engine_for_request = original_get_engine + + # Wait for any START_DP_WAVE to propagate and for + # engine 1 to potentially enter execute_dummy_batch. + await asyncio.sleep(5) + + # Now send barrier to engine 1. In buggy code engine 1 + # is stuck in execute_dummy_batch (EP all-to-all) while + # engine 0 is stuck in dist.barrier(dp_group) — deadlock. + result = await client._call_utility_async( + method, *args, engine=client.core_engines[1] + ) + await barrier_0 + return result + + client.call_utility_async = staggered_barrier + + # Drive the staggered barrier. Old code deadlocks here. + try: + await asyncio.wait_for(client.call_utility_async("barrier"), timeout=30) + except asyncio.TimeoutError: + for t in mid_barrier_tasks: + t.cancel() + pytest.fail( + "Staggered barrier deadlocked — FIRST_REQ sent while " + "paused caused collective-op mismatch between engines" + ) + + await engine.resume_generation() + assert not await engine.is_paused() + # Let the request we sent mid-barrier complete. 
+ await asyncio.gather(*mid_barrier_tasks) diff --git a/vllm/v1/engine/core.py b/vllm/v1/engine/core.py index f52f7b1ecee8..90c87e07bd30 100644 --- a/vllm/v1/engine/core.py +++ b/vllm/v1/engine/core.py @@ -1665,6 +1665,12 @@ def resume_scheduler(self): (-1, EngineCoreOutputs(start_wave=self.current_wave)) ) + def barrier(self): + """Blocking barrier on the DP process group (test-only utility).""" + import torch.distributed as dist + + dist.barrier(group=self.dp_group) + def _handle_client_request( self, request_type: EngineCoreRequestType, request: Any ) -> None: