diff --git a/tests/v1/engine/test_abort_drain.py b/tests/v1/engine/test_abort_drain.py
new file mode 100644
index 000000000000..6beaf883dc30
--- /dev/null
+++ b/tests/v1/engine/test_abort_drain.py
@@ -0,0 +1,116 @@
+# SPDX-License-Identifier: Apache-2.0
+# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
+"""Unit tests for abort-and-drain shutdown behavior in EngineCoreProc.
+
+These tests validate that:
+1. In-flight requests are aborted during graceful shutdown.
+2. Abort outputs are sent to clients before the engine core exits.
+3. The output thread is drained (joined) before shutdown completes.
+"""
+
+import queue
+import threading
+from unittest.mock import MagicMock
+
+import pytest
+
+from vllm.v1.engine import EngineCoreOutputs, FinishReason
+from vllm.v1.engine.core import EngineCoreProc
+from vllm.v1.request import RequestStatus
+
+
+@pytest.fixture
+def mock_engine_core():
+    """Create a minimal mock of EngineCoreProc for testing shutdown."""
+    engine = MagicMock(spec=EngineCoreProc)
+    engine.output_queue = queue.Queue()
+
+    # Use real methods for the ones we're testing.
+    engine._abort_and_drain_outputs = EngineCoreProc._abort_and_drain_outputs.__get__(
+        engine
+    )
+    engine._send_abort_outputs = EngineCoreProc._send_abort_outputs.__get__(engine)
+
+    return engine
+
+
+class TestAbortAndDrainOutputs:
+    def test_aborts_all_in_flight_requests(self, mock_engine_core):
+        """Verify all in-flight requests are aborted via scheduler."""
+        mock_engine_core.scheduler.finish_requests.return_value = [
+            ("req-1", 0),
+            ("req-2", 0),
+        ]
+        # No output thread.
+        del mock_engine_core.output_thread
+
+        mock_engine_core._abort_and_drain_outputs()
+
+        mock_engine_core.scheduler.finish_requests.assert_called_once_with(
+            None, RequestStatus.FINISHED_ABORTED
+        )
+
+    def test_sends_abort_outputs_to_clients(self, mock_engine_core):
+        """Verify abort outputs are placed in the output queue."""
+        mock_engine_core.scheduler.finish_requests.return_value = [
+            ("req-1", 0),
+            ("req-2", 1),
+        ]
+        del mock_engine_core.output_thread
+
+        mock_engine_core._abort_and_drain_outputs()
+
+        # Should have 2 output entries (one per client_index).
+        items = []
+        while not mock_engine_core.output_queue.empty():
+            items.append(mock_engine_core.output_queue.get_nowait())
+
+        # Filter out ENGINE_CORE_DEAD sentinel if present.
+        output_items = [i for i in items if not isinstance(i, bytes)]
+        assert len(output_items) == 2
+
+        # Check that abort outputs have the right finish reason.
+        for client_index, eco in output_items:
+            assert isinstance(eco, EngineCoreOutputs)
+            for output in eco.outputs:
+                assert output.finish_reason == FinishReason.ABORT
+
+    def test_drains_output_thread(self, mock_engine_core):
+        """Verify output thread is signaled and joined."""
+        mock_engine_core.scheduler.finish_requests.return_value = []
+        mock_thread = MagicMock(spec=threading.Thread)
+        mock_thread.is_alive.return_value = False
+        mock_engine_core.output_thread = mock_thread
+
+        mock_engine_core._abort_and_drain_outputs()
+
+        # ENGINE_CORE_DEAD sentinel should be in the queue.
+        sentinel = mock_engine_core.output_queue.get_nowait()
+        assert sentinel == EngineCoreProc.ENGINE_CORE_DEAD
+
+        # Output thread should be joined.
+        mock_thread.join.assert_called_once_with(timeout=5.0)
+
+    def test_no_output_thread_still_works(self, mock_engine_core):
+        """Verify graceful handling when output_thread doesn't exist."""
+        mock_engine_core.scheduler.finish_requests.return_value = [
+            ("req-1", 0),
+        ]
+        del mock_engine_core.output_thread
+
+        # Should not raise.
+        mock_engine_core._abort_and_drain_outputs()
+
+    def test_no_requests_to_abort(self, mock_engine_core):
+        """Verify clean shutdown when no requests are in flight."""
+        mock_engine_core.scheduler.finish_requests.return_value = []
+        mock_thread = MagicMock(spec=threading.Thread)
+        mock_thread.is_alive.return_value = False
+        mock_engine_core.output_thread = mock_thread
+
+        mock_engine_core._abort_and_drain_outputs()
+
+        # Should still signal the output thread to drain.
+        sentinel = mock_engine_core.output_queue.get_nowait()
+        assert sentinel == EngineCoreProc.ENGINE_CORE_DEAD
+        mock_thread.join.assert_called_once_with(timeout=5.0)
diff --git a/vllm/v1/engine/core.py b/vllm/v1/engine/core.py
index 11f24cb1990a..d7ea8f661826 100644
--- a/vllm/v1/engine/core.py
+++ b/vllm/v1/engine/core.py
@@ -1082,6 +1082,15 @@ def signal_handler(signum, frame):
 
         except SystemExit:
             logger.debug("EngineCore exiting.")
+            if engine_core is not None:
+                try:
+                    engine_core._abort_and_drain_outputs()
+                except Exception:
+                    # Best-effort cleanup: a failure here must not replace
+                    # the SystemExit that is about to be re-raised.
+                    logger.exception(
+                        "Failed to abort and drain outputs during shutdown."
+                    )
             raise
         except Exception as e:
             if engine_core is None:
@@ -1235,6 +1244,39 @@ def _convert_msgspec_args(method, args):
             for v, p in zip(args, arg_types)
         )
 
+    def _abort_and_drain_outputs(self, timeout: float = 5.0):
+        """Abort all in-flight requests and drain the output queue.
+
+        Called during graceful shutdown (SIGTERM/SIGINT) to ensure clients
+        receive abort responses before the engine core process exits.
+
+        Args:
+            timeout: Maximum number of seconds to wait for the output
+                thread to exit after it has been signaled.
+        """
+        aborted_reqs = self.scheduler.finish_requests(
+            None, RequestStatus.FINISHED_ABORTED
+        )
+        if aborted_reqs:
+            logger.info(
+                "Aborting %d in-flight request(s) during shutdown.",
+                len(aborted_reqs),
+            )
+            self._send_abort_outputs(aborted_reqs)
+
+        if not hasattr(self, "output_thread"):
+            return
+
+        # Signal the output thread to exit and wait for it to flush
+        # all pending messages (including the abort outputs above).
+        self.output_queue.put_nowait(EngineCoreProc.ENGINE_CORE_DEAD)
+        self.output_thread.join(timeout=timeout)
+        if self.output_thread.is_alive():
+            logger.warning(
+                "Output thread did not drain within %g s during shutdown.",
+                timeout,
+            )
+
     def _send_engine_dead(self):
         """Send EngineDead status to the EngineCoreClient."""
 