Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 116 additions & 0 deletions tests/v1/engine/test_abort_drain.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
"""Unit tests for abort-and-drain shutdown behavior in EngineCoreProc.

These tests validate that:
1. In-flight requests are aborted during graceful shutdown.
2. Abort outputs are sent to clients before the engine core exits.
3. The output thread is drained (joined) before shutdown completes.
"""

import queue
import threading
from unittest.mock import MagicMock

import pytest

from vllm.v1.engine import EngineCoreOutputs, FinishReason
from vllm.v1.engine.core import EngineCoreProc
from vllm.v1.request import RequestStatus


@pytest.fixture
def mock_engine_core():
    """Build a spec'd ``EngineCoreProc`` mock wired up for shutdown tests.

    The mock is given a real ``queue.Queue`` as ``output_queue`` and has the
    genuine ``_abort_and_drain_outputs`` and ``_send_abort_outputs``
    implementations bound to it, so tests run the production code paths
    while every other attribute stays a mock.
    """
    core = MagicMock(spec=EngineCoreProc)
    core.output_queue = queue.Queue()

    # Bind the real implementations under test to the mock instance.
    for method_name in ("_abort_and_drain_outputs", "_send_abort_outputs"):
        unbound = getattr(EngineCoreProc, method_name)
        setattr(core, method_name, unbound.__get__(core))

    return core


class TestAbortAndDrainOutputs:
    """Exercise ``EngineCoreProc._abort_and_drain_outputs`` on a mocked core."""

    @staticmethod
    def _drain_queue(output_queue):
        """Return every item currently in *output_queue*, in FIFO order."""
        drained = []
        while True:
            try:
                drained.append(output_queue.get_nowait())
            except queue.Empty:
                return drained

    def test_aborts_all_in_flight_requests(self, mock_engine_core):
        """Verify all in-flight requests are aborted via scheduler."""
        mock_engine_core.scheduler.finish_requests.return_value = [
            ("req-1", 0),
            ("req-2", 0),
        ]
        # No output thread.
        del mock_engine_core.output_thread

        mock_engine_core._abort_and_drain_outputs()

        mock_engine_core.scheduler.finish_requests.assert_called_once_with(
            None, RequestStatus.FINISHED_ABORTED
        )

    def test_sends_abort_outputs_to_clients(self, mock_engine_core):
        """Verify abort outputs are placed in the output queue."""
        mock_engine_core.scheduler.finish_requests.return_value = [
            ("req-1", 0),
            ("req-2", 1),
        ]
        del mock_engine_core.output_thread

        mock_engine_core._abort_and_drain_outputs()

        items = self._drain_queue(mock_engine_core.output_queue)

        # Filter out ENGINE_CORE_DEAD sentinel if present.
        output_items = [item for item in items if not isinstance(item, bytes)]

        # Should have 2 output entries (one per client_index), and they must
        # be addressed to exactly the clients that owned the requests.
        assert len(output_items) == 2
        assert {client_index for client_index, _ in output_items} == {0, 1}

        # Check that abort outputs have the right finish reason.
        for _, eco in output_items:
            assert isinstance(eco, EngineCoreOutputs)
            # An empty batch would make the loop below pass vacuously.
            assert eco.outputs
            for output in eco.outputs:
                assert output.finish_reason == FinishReason.ABORT

    def test_drains_output_thread(self, mock_engine_core):
        """Verify output thread is signaled after abort outputs, then joined."""
        mock_engine_core.scheduler.finish_requests.return_value = [
            ("req-1", 0),
        ]
        mock_thread = MagicMock(spec=threading.Thread)
        mock_thread.is_alive.return_value = False
        mock_engine_core.output_thread = mock_thread

        mock_engine_core._abort_and_drain_outputs()

        items = self._drain_queue(mock_engine_core.output_queue)

        # The abort output must be queued ahead of the ENGINE_CORE_DEAD
        # sentinel; otherwise the output thread could exit before sending it.
        assert len(items) == 2
        assert not isinstance(items[0], bytes)
        assert items[-1] == EngineCoreProc.ENGINE_CORE_DEAD

        # Output thread should be joined.
        mock_thread.join.assert_called_once_with(timeout=5.0)

    def test_no_output_thread_still_works(self, mock_engine_core):
        """Verify graceful handling when output_thread doesn't exist."""
        mock_engine_core.scheduler.finish_requests.return_value = [
            ("req-1", 0),
        ]
        del mock_engine_core.output_thread

        # Should not raise.
        mock_engine_core._abort_and_drain_outputs()

        # With no output thread to signal, no sentinel should be enqueued.
        items = self._drain_queue(mock_engine_core.output_queue)
        assert not any(isinstance(item, bytes) for item in items)

    def test_no_requests_to_abort(self, mock_engine_core):
        """Verify clean shutdown when no requests are in flight."""
        mock_engine_core.scheduler.finish_requests.return_value = []
        mock_thread = MagicMock(spec=threading.Thread)
        mock_thread.is_alive.return_value = False
        mock_engine_core.output_thread = mock_thread

        mock_engine_core._abort_and_drain_outputs()

        # Should still signal the output thread to drain, and nothing else
        # should have been queued.
        items = self._drain_queue(mock_engine_core.output_queue)
        assert items == [EngineCoreProc.ENGINE_CORE_DEAD]
        mock_thread.join.assert_called_once_with(timeout=5.0)
28 changes: 28 additions & 0 deletions vllm/v1/engine/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -1082,6 +1082,8 @@ def signal_handler(signum, frame):

except SystemExit:
logger.debug("EngineCore exiting.")
if engine_core is not None:
engine_core._abort_and_drain_outputs()
raise
except Exception as e:
if engine_core is None:
Expand Down Expand Up @@ -1235,6 +1237,32 @@ def _convert_msgspec_args(method, args):
for v, p in zip(args, arg_types)
)

def _abort_and_drain_outputs(self):
"""Abort all in-flight requests and drain the output queue.

Called during graceful shutdown (SIGTERM/SIGINT) to ensure clients
receive abort responses before the engine core process exits.
"""
# Ask the scheduler to finish requests with FINISHED_ABORTED status.
# Passing None as the first argument presumably selects every request —
# TODO confirm against the scheduler's finish_requests contract.
aborted_reqs = self.scheduler.finish_requests(
None, RequestStatus.FINISHED_ABORTED
)
# Log how many requests the scheduler reported as aborted, if any.
if aborted_reqs:
logger.info(
"Aborting %d in-flight request(s) during shutdown.",
len(aborted_reqs),
)
self._send_abort_outputs(aborted_reqs)

# Without an output_thread attribute there is nothing to signal or join.
if not hasattr(self, "output_thread"):
return

# Signal the output thread to exit and wait for it to flush
# all pending messages (including the abort outputs above).
# The sentinel is enqueued only after _send_abort_outputs has returned.
self.output_queue.put_nowait(EngineCoreProc.ENGINE_CORE_DEAD)
# NOTE(review): the join is bounded by a hardcoded 5.0 s timeout; join()
# returns once it elapses even if the thread is still running. Consider
# a named constant so the value is defined in one place.
self.output_thread.join(timeout=5.0)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The 5-second timeout for draining the output thread is hardcoded. In scenarios with high load or a slow network, this might not be enough time to send all pending messages, including the abort responses. This could result in some clients not receiving an abort notification, which is the problem this PR aims to solve. This same hardcoded value is also used in _send_engine_dead.

To improve robustness and maintainability, this timeout should be extracted into a constant. A future improvement could be to make this value configurable, for example, via an environment variable.

# join() has already returned; a thread that is still alive at this point
# means the join timeout elapsed before the output thread finished.
if self.output_thread.is_alive():
logger.warning("Output thread did not drain within 5 s during shutdown.")

def _send_engine_dead(self):
"""Send EngineDead status to the EngineCoreClient."""

Expand Down
Loading