diff --git a/tests/v1/executor/test_executor.py b/tests/v1/executor/test_executor.py
index e9f635378e57..494e8aa67dd8 100644
--- a/tests/v1/executor/test_executor.py
+++ b/tests/v1/executor/test_executor.py
@@ -14,12 +14,35 @@
 from vllm.sampling_params import SamplingParams
 from vllm.v1.engine.async_llm import AsyncLLM
 from vllm.v1.engine.llm_engine import LLMEngine
+from vllm.v1.executor.abstract import Executor
 from vllm.v1.executor.multiproc_executor import MultiprocExecutor
+from vllm.v1.executor.uniproc_executor import (
+    ExecutorWithExternalLauncher,
+    UniProcExecutor,
+)
 
 
 class Mock: ...
 
 
+def test_supports_async_scheduling_base_executor():
+    assert Executor.supports_async_scheduling() is False
+
+
+def test_supports_async_scheduling_uniproc_executor():
+    assert UniProcExecutor.supports_async_scheduling() is True
+
+
+def test_supports_async_scheduling_executor_with_external_launcher():
+    # ExecutorWithExternalLauncher inherits from UniProcExecutor and does not
+    # override supports_async_scheduling, so it should return True.
+    assert ExecutorWithExternalLauncher.supports_async_scheduling() is True
+
+
+def test_supports_async_scheduling_multiproc_executor():
+    assert MultiprocExecutor.supports_async_scheduling() is True
+
+
 class CustomMultiprocExecutor(MultiprocExecutor):
     def collective_rpc(
         self,
diff --git a/vllm/config/vllm.py b/vllm/config/vllm.py
index 8cd114481053..948335d6cd61 100644
--- a/vllm/config/vllm.py
+++ b/vllm/config/vllm.py
@@ -682,12 +682,11 @@ def __post_init__(self):
                 self.model_config, self.load_config
             )
 
+        from vllm.v1.executor.abstract import Executor
+
         executor_backend = self.parallel_config.distributed_executor_backend
-        executor_supports_async_sched = executor_backend in (
-            "mp",
-            "uni",
-            "external_launcher",
-        )
+        executor_class = Executor.get_class(self)
+        executor_supports_async_sched = executor_class.supports_async_scheduling()
 
         if self.scheduler_config.async_scheduling:
             # Async scheduling explicitly enabled, hard fail any incompatibilities.
@@ -711,9 +710,7 @@ def __post_init__(self):
                 )
             if not executor_supports_async_sched:
                 raise ValueError(
-                    "Currently, async scheduling only supports `mp`, `uni`, or "
-                    "`external_launcher` distributed executor backend, but you chose "
-                    f"`{executor_backend}`."
+                    f"`{executor_backend}` does not support async scheduling yet."
                 )
         elif self.scheduler_config.async_scheduling is None:
             # Enable async scheduling unless there is an incompatible option.
@@ -742,8 +739,7 @@ def __post_init__(self):
             elif not executor_supports_async_sched:
                 logger.warning_once(
                     "Async scheduling will be disabled because it is not supported "
-                    "with the `%s` distributed executor backend (only `mp`, `uni`, and "
-                    "`external_launcher` are supported).",
+                    "with the `%s` distributed executor backend.",
                     executor_backend,
                     scope="local",
                 )
diff --git a/vllm/v1/executor/abstract.py b/vllm/v1/executor/abstract.py
index 8e7c48054554..2c3538d9ac26 100644
--- a/vllm/v1/executor/abstract.py
+++ b/vllm/v1/executor/abstract.py
@@ -353,6 +353,15 @@ def reinitialize_distributed(
     ) -> None:
         raise NotImplementedError
 
+    @classmethod
+    def supports_async_scheduling(cls) -> bool:
+        """Return whether this executor class supports async scheduling.
+
+        The base implementation returns ``False``; executor subclasses that
+        support async scheduling override this to return ``True``.
+        """
+        return False
+
 
 from vllm.v1.executor.uniproc_executor import (  # noqa: E402
     ExecutorWithExternalLauncher as _ExecutorWithExternalLauncher,
diff --git a/vllm/v1/executor/multiproc_executor.py b/vllm/v1/executor/multiproc_executor.py
index 95336034caf7..ab543e2e53b9 100644
--- a/vllm/v1/executor/multiproc_executor.py
+++ b/vllm/v1/executor/multiproc_executor.py
@@ -487,6 +487,11 @@ def _get_output_rank(self) -> int:
             * self.parallel_config.prefill_context_parallel_size
         )
 
+    @classmethod
+    def supports_async_scheduling(cls) -> bool:
+        """Report that this executor supports async scheduling."""
+        return True
+
 
 @dataclass
 class UnreadyWorkerProcHandle:
diff --git a/vllm/v1/executor/uniproc_executor.py b/vllm/v1/executor/uniproc_executor.py
index 2ae9821199ed..e90a1ab23915 100644
--- a/vllm/v1/executor/uniproc_executor.py
+++ b/vllm/v1/executor/uniproc_executor.py
@@ -134,6 +134,11 @@ def shutdown(self) -> None:
         if worker := self.driver_worker:
             worker.shutdown()
 
+    @classmethod
+    def supports_async_scheduling(cls) -> bool:
+        """Report that this executor supports async scheduling."""
+        return True
+
 
 class ExecutorWithExternalLauncher(UniProcExecutor):
     """An executor that uses external launchers to launch engines,