Skip to content
13 changes: 13 additions & 0 deletions tests/v1/engine/test_core_engine_actor_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@
from pathlib import Path
from types import SimpleNamespace
from typing import Any
from unittest.mock import Mock

import pytest
import ray
import zmq

from vllm.utils.network_utils import make_zmq_socket, split_zmq_path
from vllm.v1.engine.core import EngineCoreActorMixin
from vllm.v1.engine.core_client import BackgroundResources
from vllm.v1.engine.utils import (
CoreEngineActorManager,
EngineZmqAddresses,
Expand Down Expand Up @@ -99,6 +101,17 @@ class _DummyExecutor:
pass


def test_background_resources_passes_worker_shutdown_timeout(
monkeypatch: pytest.MonkeyPatch,
) -> None:
timeout = 7
monkeypatch.setenv("VLLM_WORKER_SHUTDOWN_TIMEOUT_SECONDS", str(timeout))
engine_manager = Mock()
resources = BackgroundResources(ctx=None, engine_manager=engine_manager)
resources()
engine_manager.shutdown.assert_called_once_with(timeout=timeout)


def _make_vllm_config() -> SimpleNamespace:
return SimpleNamespace(
parallel_config=SimpleNamespace(
Expand Down
45 changes: 45 additions & 0 deletions tests/v1/executor/test_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from vllm.sampling_params import SamplingParams
from vllm.v1.engine.async_llm import AsyncLLM
from vllm.v1.engine.llm_engine import LLMEngine
from vllm.v1.executor import multiproc_executor as multiproc_executor_module
from vllm.v1.executor.abstract import Executor
from vllm.v1.executor.multiproc_executor import MultiprocExecutor
from vllm.v1.executor.uniproc_executor import (
Expand Down Expand Up @@ -43,6 +44,50 @@ def test_supports_async_scheduling_multiproc_executor():
assert MultiprocExecutor.supports_async_scheduling() is True


class _FakeClock:
def __init__(self) -> None:
self.now = 0.0

def time(self) -> float:
return self.now

def sleep(self, seconds: float) -> None:
self.now += seconds


class _FakeProcess:
def __init__(self, clock: _FakeClock, exits_at: float) -> None:
self.clock = clock
self.exits_at = exits_at
self.terminate_called = False

def is_alive(self) -> bool:
return self.clock.time() < self.exits_at

def terminate(self) -> None:
self.terminate_called = True


@pytest.mark.parametrize(
("timeout", "exits_at", "expected_terminate"),
[
pytest.param(6, 5, False, id="worker-exits-before-timeout"),
pytest.param(6, 7, True, id="worker-exceeds-timeout"),
],
)
def test_multiproc_executor_worker_termination_timeout(
monkeypatch, timeout, exits_at, expected_terminate
):
monkeypatch.setenv("VLLM_WORKER_SHUTDOWN_TIMEOUT_SECONDS", str(timeout))
clock = _FakeClock()
monkeypatch.setattr(multiproc_executor_module.time, "time", clock.time)
monkeypatch.setattr(multiproc_executor_module.time, "sleep", clock.sleep)
executor = MultiprocExecutor.__new__(MultiprocExecutor)
proc = _FakeProcess(clock, exits_at=exits_at)
executor._ensure_worker_termination([proc])
assert proc.terminate_called is expected_terminate


class CustomMultiprocExecutor(MultiprocExecutor):
def collective_rpc(
self,
Expand Down
7 changes: 7 additions & 0 deletions vllm/envs.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@
VLLM_TOOL_PARSE_REGEX_TIMEOUT_SECONDS: int = 1
VLLM_MQ_MAX_CHUNK_BYTES_MB: int = 16
VLLM_EXECUTE_MODEL_TIMEOUT_SECONDS: int = 300
VLLM_WORKER_SHUTDOWN_TIMEOUT_SECONDS: int = 4
VLLM_KV_CACHE_LAYOUT: Literal["NHD", "HND"] | None = None
VLLM_SSM_CONV_STATE_LAYOUT: Literal["SD", "DS"] | None = None
VLLM_COMPUTE_NANS_IN_LOGITS: bool = False
Expand Down Expand Up @@ -1639,6 +1640,11 @@ def _resolve_rust_frontend_path() -> str | None:
"VLLM_EXECUTE_MODEL_TIMEOUT_SECONDS": lambda: int(
os.getenv("VLLM_EXECUTE_MODEL_TIMEOUT_SECONDS", "300")
),
# Timeout in seconds for worker process shutdown (only applies when TP >
# 1).
"VLLM_WORKER_SHUTDOWN_TIMEOUT_SECONDS": lambda: int(
os.getenv("VLLM_WORKER_SHUTDOWN_TIMEOUT_SECONDS", "4")
),
# KV Cache layout used throughout vllm.
# Some common values are:
# - NHD
Expand Down Expand Up @@ -2122,6 +2128,7 @@ def compile_factors() -> dict[str, object]:
"VLLM_ENGINE_ITERATION_TIMEOUT_S",
"VLLM_HTTP_TIMEOUT_KEEP_ALIVE",
"VLLM_EXECUTE_MODEL_TIMEOUT_SECONDS",
"VLLM_WORKER_SHUTDOWN_TIMEOUT_SECONDS",
"VLLM_KEEP_ALIVE_ON_ENGINE_DEATH",
"VLLM_IMAGE_FETCH_TIMEOUT",
"VLLM_VIDEO_FETCH_TIMEOUT",
Expand Down
5 changes: 4 additions & 1 deletion vllm/v1/engine/core_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import zmq
import zmq.asyncio

from vllm import envs
from vllm.config import VllmConfig
from vllm.envs import VLLM_ENGINE_READY_TIMEOUT_S
from vllm.logger import init_logger
Expand Down Expand Up @@ -394,7 +395,9 @@ def __call__(self):
logger.debug_once("[shutdown] MPClient: background resource cleanup start")
self.engine_dead = True
if self.engine_manager is not None:
self.engine_manager.shutdown()
self.engine_manager.shutdown(
timeout=envs.VLLM_WORKER_SHUTDOWN_TIMEOUT_SECONDS
)
if self.coordinator is not None:
self.coordinator.shutdown()

Expand Down
4 changes: 3 additions & 1 deletion vllm/v1/executor/multiproc_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,9 @@ def wait_for_termination(procs, timeout):
"[shutdown] Executor: waiting for worker exit count=%d",
initial_count,
)
if wait_for_termination(active_procs(), 4):
if wait_for_termination(
active_procs(), timeout=envs.VLLM_WORKER_SHUTDOWN_TIMEOUT_SECONDS
):
logger.info_once("[shutdown] Executor: all workers exited gracefully")
return

Expand Down
Loading