diff --git a/tests/v1/engine/test_core_engine_actor_manager.py b/tests/v1/engine/test_core_engine_actor_manager.py index f60f8c94e7e2..a986bc07a3e8 100644 --- a/tests/v1/engine/test_core_engine_actor_manager.py +++ b/tests/v1/engine/test_core_engine_actor_manager.py @@ -8,6 +8,7 @@ from pathlib import Path from types import SimpleNamespace from typing import Any +from unittest.mock import Mock import pytest import ray @@ -15,6 +16,7 @@ from vllm.utils.network_utils import make_zmq_socket, split_zmq_path from vllm.v1.engine.core import EngineCoreActorMixin +from vllm.v1.engine.core_client import BackgroundResources from vllm.v1.engine.utils import ( CoreEngineActorManager, EngineZmqAddresses, @@ -99,6 +101,17 @@ class _DummyExecutor: pass +def test_background_resources_passes_worker_shutdown_timeout( + monkeypatch: pytest.MonkeyPatch, +) -> None: + timeout = 7 + monkeypatch.setenv("VLLM_WORKER_SHUTDOWN_TIMEOUT_SECONDS", str(timeout)) + engine_manager = Mock() + resources = BackgroundResources(ctx=None, engine_manager=engine_manager) + resources() + engine_manager.shutdown.assert_called_once_with(timeout=timeout) + + def _make_vllm_config() -> SimpleNamespace: return SimpleNamespace( parallel_config=SimpleNamespace( diff --git a/tests/v1/executor/test_executor.py b/tests/v1/executor/test_executor.py index 494e8aa67dd8..c529c3204d50 100644 --- a/tests/v1/executor/test_executor.py +++ b/tests/v1/executor/test_executor.py @@ -14,6 +14,7 @@ from vllm.sampling_params import SamplingParams from vllm.v1.engine.async_llm import AsyncLLM from vllm.v1.engine.llm_engine import LLMEngine +from vllm.v1.executor import multiproc_executor as multiproc_executor_module from vllm.v1.executor.abstract import Executor from vllm.v1.executor.multiproc_executor import MultiprocExecutor from vllm.v1.executor.uniproc_executor import ( @@ -43,6 +44,50 @@ def test_supports_async_scheduling_multiproc_executor(): assert MultiprocExecutor.supports_async_scheduling() is True +class _FakeClock: + def __init__(self) -> None: + self.now = 0.0 + + def time(self) -> float: + return self.now + + def sleep(self, seconds: float) -> None: + self.now += seconds + + +class _FakeProcess: + def __init__(self, clock: _FakeClock, exits_at: float) -> None: + self.clock = clock + self.exits_at = exits_at + self.terminate_called = False + + def is_alive(self) -> bool: + return self.clock.time() < self.exits_at + + def terminate(self) -> None: + self.terminate_called = True + + +@pytest.mark.parametrize( + ("timeout", "exits_at", "expected_terminate"), + [ + pytest.param(6, 5, False, id="worker-exits-before-timeout"), + pytest.param(6, 7, True, id="worker-exceeds-timeout"), + ], +) +def test_multiproc_executor_worker_termination_timeout( + monkeypatch, timeout, exits_at, expected_terminate +): + monkeypatch.setenv("VLLM_WORKER_SHUTDOWN_TIMEOUT_SECONDS", str(timeout)) + clock = _FakeClock() + monkeypatch.setattr(multiproc_executor_module.time, "time", clock.time) + monkeypatch.setattr(multiproc_executor_module.time, "sleep", clock.sleep) + executor = MultiprocExecutor.__new__(MultiprocExecutor) + proc = _FakeProcess(clock, exits_at=exits_at) + executor._ensure_worker_termination([proc]) + assert proc.terminate_called is expected_terminate + + class CustomMultiprocExecutor(MultiprocExecutor): def collective_rpc( self, diff --git a/vllm/envs.py b/vllm/envs.py index a32f055a0283..250d636c4131 100755 --- a/vllm/envs.py +++ b/vllm/envs.py @@ -207,6 +207,7 @@ VLLM_TOOL_PARSE_REGEX_TIMEOUT_SECONDS: int = 1 VLLM_MQ_MAX_CHUNK_BYTES_MB: int = 16 VLLM_EXECUTE_MODEL_TIMEOUT_SECONDS: int = 300 + VLLM_WORKER_SHUTDOWN_TIMEOUT_SECONDS: int = 4 VLLM_KV_CACHE_LAYOUT: Literal["NHD", "HND"] | None = None VLLM_SSM_CONV_STATE_LAYOUT: Literal["SD", "DS"] | None = None VLLM_COMPUTE_NANS_IN_LOGITS: bool = False @@ -1639,6 +1640,11 @@ def _resolve_rust_frontend_path() -> str | None: "VLLM_EXECUTE_MODEL_TIMEOUT_SECONDS": lambda: int( os.getenv("VLLM_EXECUTE_MODEL_TIMEOUT_SECONDS", "300") ), + # Timeout in seconds for worker process shutdown (only applies when TP > + # 1). + "VLLM_WORKER_SHUTDOWN_TIMEOUT_SECONDS": lambda: int( + os.getenv("VLLM_WORKER_SHUTDOWN_TIMEOUT_SECONDS", "4") + ), # KV Cache layout used throughout vllm. # Some common values are: # - NHD @@ -2122,6 +2128,7 @@ def compile_factors() -> dict[str, object]: "VLLM_ENGINE_ITERATION_TIMEOUT_S", "VLLM_HTTP_TIMEOUT_KEEP_ALIVE", "VLLM_EXECUTE_MODEL_TIMEOUT_SECONDS", + "VLLM_WORKER_SHUTDOWN_TIMEOUT_SECONDS", "VLLM_KEEP_ALIVE_ON_ENGINE_DEATH", "VLLM_IMAGE_FETCH_TIMEOUT", "VLLM_VIDEO_FETCH_TIMEOUT", diff --git a/vllm/v1/engine/core_client.py b/vllm/v1/engine/core_client.py index 32f2d091eb30..aa083dd3502b 100644 --- a/vllm/v1/engine/core_client.py +++ b/vllm/v1/engine/core_client.py @@ -20,6 +20,7 @@ import zmq import zmq.asyncio +from vllm import envs from vllm.config import VllmConfig from vllm.envs import VLLM_ENGINE_READY_TIMEOUT_S from vllm.logger import init_logger @@ -394,7 +395,9 @@ def __call__(self): logger.debug_once("[shutdown] MPClient: background resource cleanup start") self.engine_dead = True if self.engine_manager is not None: - self.engine_manager.shutdown() + self.engine_manager.shutdown( + timeout=envs.VLLM_WORKER_SHUTDOWN_TIMEOUT_SECONDS + ) if self.coordinator is not None: self.coordinator.shutdown() diff --git a/vllm/v1/executor/multiproc_executor.py b/vllm/v1/executor/multiproc_executor.py index 66564bebdb67..b0100c3d66ae 100644 --- a/vllm/v1/executor/multiproc_executor.py +++ b/vllm/v1/executor/multiproc_executor.py @@ -429,7 +429,9 @@ def wait_for_termination(procs, timeout): "[shutdown] Executor: waiting for worker exit count=%d", initial_count, ) - if wait_for_termination(active_procs(), 4): + if wait_for_termination( + active_procs(), timeout=envs.VLLM_WORKER_SHUTDOWN_TIMEOUT_SECONDS + ): logger.info_once("[shutdown] Executor: all workers exited gracefully") return