Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions tests/v1/executor/test_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,35 @@
from vllm.sampling_params import SamplingParams
from vllm.v1.engine.async_llm import AsyncLLM
from vllm.v1.engine.llm_engine import LLMEngine
from vllm.v1.executor.abstract import Executor
from vllm.v1.executor.multiproc_executor import MultiprocExecutor
from vllm.v1.executor.uniproc_executor import (
ExecutorWithExternalLauncher,
UniProcExecutor,
)


class Mock: ...


def test_supports_async_scheduling_base_executor():
assert Executor.supports_async_scheduling() is False


def test_supports_async_scheduling_uniproc_executor():
assert UniProcExecutor.supports_async_scheduling() is True


def test_supports_async_scheduling_executor_with_external_launcher():
# ExecutorWithExternalLauncher inherits from UniProcExecutor and does not
# override supports_async_scheduling, so it should return True.
assert ExecutorWithExternalLauncher.supports_async_scheduling() is True


def test_supports_async_scheduling_multiproc_executor():
assert MultiprocExecutor.supports_async_scheduling() is True


class CustomMultiprocExecutor(MultiprocExecutor):
def collective_rpc(
self,
Expand Down
16 changes: 6 additions & 10 deletions vllm/config/vllm.py
Original file line number Diff line number Diff line change
Expand Up @@ -682,12 +682,11 @@ def __post_init__(self):
self.model_config, self.load_config
)

from vllm.v1.executor.abstract import Executor

executor_backend = self.parallel_config.distributed_executor_backend
executor_supports_async_sched = executor_backend in (
"mp",
"uni",
"external_launcher",
)
executor_class = Executor.get_class(self)
executor_supports_async_sched = executor_class.supports_async_scheduling()

if self.scheduler_config.async_scheduling:
# Async scheduling explicitly enabled, hard fail any incompatibilities.
Expand All @@ -711,9 +710,7 @@ def __post_init__(self):
)
if not executor_supports_async_sched:
raise ValueError(
"Currently, async scheduling only supports `mp`, `uni`, or "
"`external_launcher` distributed executor backend, but you chose "
f"`{executor_backend}`."
f"`{executor_backend}` does not support async scheduling yet."
)
elif self.scheduler_config.async_scheduling is None:
# Enable async scheduling unless there is an incompatible option.
Expand Down Expand Up @@ -742,8 +739,7 @@ def __post_init__(self):
elif not executor_supports_async_sched:
logger.warning_once(
"Async scheduling will be disabled because it is not supported "
"with the `%s` distributed executor backend (only `mp`, `uni`, and "
"`external_launcher` are supported).",
"with the `%s` distributed executor backend. ",
executor_backend,
scope="local",
)
Expand Down
7 changes: 7 additions & 0 deletions vllm/v1/executor/abstract.py
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,13 @@ def reinitialize_distributed(
) -> None:
raise NotImplementedError

@classmethod
def supports_async_scheduling(cls) -> bool:
"""
Whether the executor supports async scheduling.
"""
return False


from vllm.v1.executor.uniproc_executor import ( # noqa: E402
ExecutorWithExternalLauncher as _ExecutorWithExternalLauncher,
Expand Down
4 changes: 4 additions & 0 deletions vllm/v1/executor/multiproc_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,10 @@ def _get_output_rank(self) -> int:
* self.parallel_config.prefill_context_parallel_size
)

@classmethod
def supports_async_scheduling(cls) -> bool:
return True


@dataclass
class UnreadyWorkerProcHandle:
Expand Down
4 changes: 4 additions & 0 deletions vllm/v1/executor/uniproc_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,10 @@ def shutdown(self) -> None:
if worker := self.driver_worker:
worker.shutdown()

@classmethod
def supports_async_scheduling(cls) -> bool:
return True


class ExecutorWithExternalLauncher(UniProcExecutor):
"""An executor that uses external launchers to launch engines,
Expand Down
Loading