From d3d54b7a0f56ed9792952df48dfee7298657dffc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrej=20Chud=C3=BD?= Date: Fri, 18 Jul 2025 12:54:27 +0200 Subject: [PATCH 1/3] Implement max_waiting_queue_length arguemnt MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Andrej Chudý Signed-off-by: Andrej Chudý --- tests/core/test_scheduler.py | 65 ++++++++++++++++++- vllm/config.py | 5 ++ vllm/core/scheduler.py | 10 +++ vllm/engine/arg_utils.py | 5 ++ vllm/engine/async_llm_engine.py | 9 ++- vllm/entrypoints/openai/serving_chat.py | 14 ++++ vllm/entrypoints/openai/serving_completion.py | 14 ++++ vllm/entrypoints/openai/serving_engine.py | 7 ++ 8 files changed, 127 insertions(+), 2 deletions(-) diff --git a/tests/core/test_scheduler.py b/tests/core/test_scheduler.py index 591e1780c11c..86c836bd5a39 100644 --- a/tests/core/test_scheduler.py +++ b/tests/core/test_scheduler.py @@ -12,7 +12,7 @@ from vllm.config import CacheConfig, LoRAConfig, SchedulerConfig from vllm.core.interfaces import AllocStatus -from vllm.core.scheduler import Scheduler, SchedulingBudget +from vllm.core.scheduler import Scheduler, SchedulingBudget, SchedulerWaitingQueueFullError from vllm.lora.request import LoRARequest from vllm.sequence import SequenceGroup, SequenceStatus @@ -71,6 +71,69 @@ def test_scheduler_abort_seq_group(): assert scheduler.get_num_unfinished_seq_groups() == 0 +def test_scheduler_max_waiting_queue_length(): + """Test that scheduler respects max_waiting_queue_length setting.""" + block_size = 4 + max_waiting_queue_length = 2 + scheduler_config = SchedulerConfig( + "generate", + max_num_batched_tokens=100, + max_num_seqs=64, + max_model_len=1, + max_waiting_queue_length=max_waiting_queue_length, + ) + cache_config = CacheConfig(block_size, 1.0, 1, "auto") + cache_config.num_cpu_blocks = 4 + cache_config.num_gpu_blocks = 4 + scheduler = Scheduler(scheduler_config, cache_config, None) + + # Add seq groups up to the limit + for i in range(max_waiting_queue_length): + _, seq_group = create_dummy_prompt(str(i), + block_size, + block_size=block_size) + scheduler.add_seq_group(seq_group) + assert scheduler.get_num_unfinished_seq_groups() == i + 1 + + # Adding one more should raise SchedulerWaitingQueueFullError + _, seq_group = create_dummy_prompt(str(max_waiting_queue_length), + block_size, + block_size=block_size) + with pytest.raises(SchedulerWaitingQueueFullError) as excinfo: + scheduler.add_seq_group(seq_group) + + assert "Scheduler waiting queue is full" in str(excinfo.value) + assert f"request {max_waiting_queue_length}" in str(excinfo.value) + + # Verify that the number of unfinished seq groups hasn't changed + assert scheduler.get_num_unfinished_seq_groups() == max_waiting_queue_length + + +def test_scheduler_max_waiting_queue_length_disabled(): + """Test that scheduler allows unlimited queue when max_waiting_queue_length is None.""" + block_size = 4 + scheduler_config = SchedulerConfig( + "generate", + max_num_batched_tokens=100, + max_num_seqs=64, + max_model_len=1, + max_waiting_queue_length=None, # No limit + ) + cache_config = CacheConfig(block_size, 1.0, 1, "auto") + cache_config.num_cpu_blocks = 4 + cache_config.num_gpu_blocks = 4 + scheduler = Scheduler(scheduler_config, cache_config, None) + + # Add many seq groups - should not raise an exception + num_seq_groups = 10 + for i in range(num_seq_groups): + _, seq_group = create_dummy_prompt(str(i), + block_size, + block_size=block_size) + scheduler.add_seq_group(seq_group) + assert 
scheduler.get_num_unfinished_seq_groups() == i + 1 + + def test_scheduler_schedule_simple(): block_size = 4 num_seq_group = 4 diff --git a/vllm/config.py b/vllm/config.py index f94c08c32536..983a3f9fe609 100644 --- a/vllm/config.py +++ b/vllm/config.py @@ -2292,6 +2292,11 @@ class SchedulerConfig: structured outputs, speculative decoding, and pipeline parallelism. """ + max_waiting_queue_length: Optional[int] = None + """Maximum number of requests that can be in the waiting queue. + When the queue reaches this limit, new requests will be rejected + with HTTP 503 error. If None, no limit is enforced.""" + def compute_hash(self) -> str: """ WARNING: Whenever a new field is added to this config, diff --git a/vllm/core/scheduler.py b/vllm/core/scheduler.py index 0ef0396996b6..5e798e2ddcbd 100644 --- a/vllm/core/scheduler.py +++ b/vllm/core/scheduler.py @@ -24,6 +24,11 @@ logger = init_logger(__name__) + +class SchedulerWaitingQueueFullError(Exception): + """Raised when the scheduler waiting queue is full and cannot accept new requests.""" + pass + # Test-only. If configured, decode is preempted with # ARTIFICIAL_PREEMPTION_PROB% probability. ENABLE_ARTIFICIAL_PREEMPT = bool( @@ -551,6 +556,11 @@ def num_decoding_tokens_per_seq(self) -> int: def add_seq_group(self, seq_group: SequenceGroup) -> None: # Add sequence groups to the waiting queue. + if (self.scheduler_config.max_waiting_queue_length is not None and + len(self.waiting) >= self.scheduler_config.max_waiting_queue_length): + raise SchedulerWaitingQueueFullError( + f"Scheduler waiting queue is full. Cannot add request {seq_group.request_id}." + ) self.waiting.append(seq_group) def _add_seq_group_to_running(self, seq_group: SequenceGroup) -> None: diff --git a/vllm/engine/arg_utils.py b/vllm/engine/arg_utils.py index b20defde73ed..1cb0c345e0bf 100644 --- a/vllm/engine/arg_utils.py +++ b/vllm/engine/arg_utils.py @@ -441,6 +441,8 @@ class EngineArgs: async_scheduling: bool = SchedulerConfig.async_scheduling + max_waiting_queue_length: Optional[int] = SchedulerConfig.max_waiting_queue_length + def __post_init__(self): # support `EngineArgs(compilation_config={...})` # without having to manually construct a @@ -843,6 +845,8 @@ def add_cli_args(parser: FlexibleArgumentParser) -> FlexibleArgumentParser: **scheduler_kwargs["disable_hybrid_kv_cache_manager"]) scheduler_group.add_argument("--async-scheduling", **scheduler_kwargs["async_scheduling"]) + scheduler_group.add_argument("--max-waiting-queue-length", + **scheduler_kwargs["max_waiting_queue_length"]) # vLLM arguments vllm_kwargs = get_kwargs(VllmConfig) @@ -1231,6 +1235,7 @@ def create_engine_config( disable_hybrid_kv_cache_manager=self. 
disable_hybrid_kv_cache_manager, async_scheduling=self.async_scheduling, + max_waiting_queue_length=self.max_waiting_queue_length, ) if not model_config.is_multimodal_model and self.default_mm_loras: diff --git a/vllm/engine/async_llm_engine.py b/vllm/engine/async_llm_engine.py index 3d7d28055dd0..4dc0ce226757 100644 --- a/vllm/engine/async_llm_engine.py +++ b/vllm/engine/async_llm_engine.py @@ -13,7 +13,7 @@ import vllm.envs as envs from vllm.config import (DecodingConfig, LoRAConfig, ModelConfig, ParallelConfig, SchedulerConfig, VllmConfig) -from vllm.core.scheduler import SchedulerOutputs +from vllm.core.scheduler import SchedulerOutputs, SchedulerWaitingQueueFullError from vllm.engine.arg_utils import AsyncEngineArgs from vllm.engine.async_timeout import asyncio_timeout from vllm.engine.llm_engine import LLMEngine, SchedulerOutputState @@ -750,6 +750,13 @@ async def engine_step(self, virtual_engine: int) -> bool: e, verbose=self.log_requests, ) + except SchedulerWaitingQueueFullError as e: + # Handle scheduler queue full error + self._request_tracker.process_exception( + new_request["request_id"], + e, + verbose=self.log_requests, + ) if aborted_requests: await self._engine_abort(aborted_requests) diff --git a/vllm/entrypoints/openai/serving_chat.py b/vllm/entrypoints/openai/serving_chat.py index a5eb16a53976..90399640c566 100644 --- a/vllm/entrypoints/openai/serving_chat.py +++ b/vllm/entrypoints/openai/serving_chat.py @@ -6,6 +6,7 @@ import time from collections.abc import AsyncGenerator, AsyncIterator from collections.abc import Sequence as GenericSequence +from http import HTTPStatus from typing import Callable, Final, Optional, Union import jinja2 @@ -15,6 +16,7 @@ from pydantic import TypeAdapter from vllm.config import ModelConfig +from vllm.core.scheduler import SchedulerWaitingQueueFullError from vllm.engine.protocol import EngineClient from vllm.entrypoints.chat_utils import (ChatTemplateContentFormatOption, ConversationMessage, @@ -267,6 +269,12 @@ async def create_chat_completion( except ValueError as e: # TODO: Use a vllm-specific Validation Error return self.create_error_response(str(e)) + except SchedulerWaitingQueueFullError as e: + return self.create_error_response( + str(e), + err_type="ServiceUnavailableError", + status_code=HTTPStatus.SERVICE_UNAVAILABLE + ) assert len(generators) == 1 result_generator, = generators @@ -290,6 +298,12 @@ async def create_chat_completion( except ValueError as e: # TODO: Use a vllm-specific Validation Error return self.create_error_response(str(e)) + except SchedulerWaitingQueueFullError as e: + return self.create_error_response( + str(e), + err_type="ServiceUnavailableError", + status_code=HTTPStatus.SERVICE_UNAVAILABLE + ) def get_chat_request_role(self, request: ChatCompletionRequest) -> str: if request.add_generation_prompt: diff --git a/vllm/entrypoints/openai/serving_completion.py b/vllm/entrypoints/openai/serving_completion.py index 1e1f655022f0..c2101d94ba1b 100644 --- a/vllm/entrypoints/openai/serving_completion.py +++ b/vllm/entrypoints/openai/serving_completion.py @@ -5,6 +5,7 @@ import time from collections.abc import AsyncGenerator, AsyncIterator from collections.abc import Sequence as GenericSequence +from http import HTTPStatus from typing import Optional, Union, cast import jinja2 @@ -12,6 +13,7 @@ from typing_extensions import assert_never from vllm.config import ModelConfig +from vllm.core.scheduler import SchedulerWaitingQueueFullError from vllm.engine.protocol import EngineClient from vllm.entrypoints.logger import 
RequestLogger # yapf conflicts with isort for this block @@ -230,6 +232,12 @@ async def create_completion( except ValueError as e: # TODO: Use a vllm-specific Validation Error return self.create_error_response(str(e)) + except SchedulerWaitingQueueFullError as e: + return self.create_error_response( + str(e), + err_type="ServiceUnavailableError", + status_code=HTTPStatus.SERVICE_UNAVAILABLE + ) result_generator = merge_async_iterators(*generators) @@ -294,6 +302,12 @@ async def create_completion( except ValueError as e: # TODO: Use a vllm-specific Validation Error return self.create_error_response(str(e)) + except SchedulerWaitingQueueFullError as e: + return self.create_error_response( + str(e), + err_type="ServiceUnavailableError", + status_code=HTTPStatus.SERVICE_UNAVAILABLE + ) # When user requests streaming but we don't stream, we still need to # return a streaming response with a single event. diff --git a/vllm/entrypoints/openai/serving_engine.py b/vllm/entrypoints/openai/serving_engine.py index 462317a0878c..7a4f0ca9d046 100644 --- a/vllm/entrypoints/openai/serving_engine.py +++ b/vllm/entrypoints/openai/serving_engine.py @@ -25,6 +25,7 @@ import vllm.envs as envs from vllm.config import ModelConfig +from vllm.core.scheduler import SchedulerWaitingQueueFullError from vllm.engine.protocol import EngineClient # yapf conflicts with isort for this block # yapf: disable @@ -363,6 +364,12 @@ async def _prepare_generators( return None + except SchedulerWaitingQueueFullError as e: + return self.create_error_response( + str(e), + err_type="ServiceUnavailableError", + status_code=HTTPStatus.SERVICE_UNAVAILABLE + ) except Exception as e: # TODO: Use a vllm-specific Validation Error return self.create_error_response(str(e)) From ab4d9408a4a570e1702f59c1e74b5ba9d34e039d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrej=20Chud=C3=BD?= Date: Mon, 21 Jul 2025 01:52:44 +0200 Subject: [PATCH 2/3] Catch the error in the handler & pass the error object instead of a string MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Andrej Chudý --- tests/core/test_scheduler.py | 10 ++++--- vllm/core/scheduler.py | 6 ++-- vllm/engine/arg_utils.py | 8 +++-- vllm/engine/async_llm_engine.py | 3 +- vllm/entrypoints/openai/serving_chat.py | 30 +++++-------------- .../openai/serving_classification.py | 2 +- vllm/entrypoints/openai/serving_completion.py | 28 +++++------------ vllm/entrypoints/openai/serving_embedding.py | 4 +-- vllm/entrypoints/openai/serving_engine.py | 26 +++++++++------- vllm/entrypoints/openai/serving_pooling.py | 6 ++-- vllm/entrypoints/openai/serving_responses.py | 10 +++---- vllm/entrypoints/openai/serving_score.py | 4 +-- vllm/entrypoints/openai/speech_to_text.py | 8 ++--- 13 files changed, 64 insertions(+), 81 deletions(-) diff --git a/tests/core/test_scheduler.py b/tests/core/test_scheduler.py index 86c836bd5a39..036da63e5b45 100644 --- a/tests/core/test_scheduler.py +++ b/tests/core/test_scheduler.py @@ -12,7 +12,8 @@ from vllm.config import CacheConfig, LoRAConfig, SchedulerConfig from vllm.core.interfaces import AllocStatus -from vllm.core.scheduler import Scheduler, SchedulingBudget, SchedulerWaitingQueueFullError +from vllm.core.scheduler import (Scheduler, SchedulerWaitingQueueFullError, + SchedulingBudget) from vllm.lora.request import LoRARequest from vllm.sequence import SequenceGroup, SequenceStatus @@ -101,12 +102,13 @@ def test_scheduler_max_waiting_queue_length(): block_size=block_size) with 
pytest.raises(SchedulerWaitingQueueFullError) as excinfo: scheduler.add_seq_group(seq_group) - + assert "Scheduler waiting queue is full" in str(excinfo.value) assert f"request {max_waiting_queue_length}" in str(excinfo.value) - + # Verify that the number of unfinished seq groups hasn't changed - assert scheduler.get_num_unfinished_seq_groups() == max_waiting_queue_length + assert scheduler.get_num_unfinished_seq_groups( + ) == max_waiting_queue_length def test_scheduler_max_waiting_queue_length_disabled(): diff --git a/vllm/core/scheduler.py b/vllm/core/scheduler.py index 5e798e2ddcbd..61b61477e827 100644 --- a/vllm/core/scheduler.py +++ b/vllm/core/scheduler.py @@ -29,6 +29,7 @@ class SchedulerWaitingQueueFullError(Exception): """Raised when the scheduler waiting queue is full and cannot accept new requests.""" pass + # Test-only. If configured, decode is preempted with # ARTIFICIAL_PREEMPTION_PROB% probability. ENABLE_ARTIFICIAL_PREEMPT = bool( @@ -556,8 +557,9 @@ def num_decoding_tokens_per_seq(self) -> int: def add_seq_group(self, seq_group: SequenceGroup) -> None: # Add sequence groups to the waiting queue. - if (self.scheduler_config.max_waiting_queue_length is not None and - len(self.waiting) >= self.scheduler_config.max_waiting_queue_length): + if (self.scheduler_config.max_waiting_queue_length is not None + and len(self.waiting) + >= self.scheduler_config.max_waiting_queue_length): raise SchedulerWaitingQueueFullError( f"Scheduler waiting queue is full. Cannot add request {seq_group.request_id}." ) diff --git a/vllm/engine/arg_utils.py b/vllm/engine/arg_utils.py index 1cb0c345e0bf..63203c1d5c00 100644 --- a/vllm/engine/arg_utils.py +++ b/vllm/engine/arg_utils.py @@ -441,7 +441,8 @@ class EngineArgs: async_scheduling: bool = SchedulerConfig.async_scheduling - max_waiting_queue_length: Optional[int] = SchedulerConfig.max_waiting_queue_length + max_waiting_queue_length: Optional[ + int] = SchedulerConfig.max_waiting_queue_length def __post_init__(self): # support `EngineArgs(compilation_config={...})` @@ -845,8 +846,9 @@ def add_cli_args(parser: FlexibleArgumentParser) -> FlexibleArgumentParser: **scheduler_kwargs["disable_hybrid_kv_cache_manager"]) scheduler_group.add_argument("--async-scheduling", **scheduler_kwargs["async_scheduling"]) - scheduler_group.add_argument("--max-waiting-queue-length", - **scheduler_kwargs["max_waiting_queue_length"]) + scheduler_group.add_argument( + "--max-waiting-queue-length", + **scheduler_kwargs["max_waiting_queue_length"]) # vLLM arguments vllm_kwargs = get_kwargs(VllmConfig) diff --git a/vllm/engine/async_llm_engine.py b/vllm/engine/async_llm_engine.py index 4dc0ce226757..820ba04a30ff 100644 --- a/vllm/engine/async_llm_engine.py +++ b/vllm/engine/async_llm_engine.py @@ -13,7 +13,8 @@ import vllm.envs as envs from vllm.config import (DecodingConfig, LoRAConfig, ModelConfig, ParallelConfig, SchedulerConfig, VllmConfig) -from vllm.core.scheduler import SchedulerOutputs, SchedulerWaitingQueueFullError +from vllm.core.scheduler import (SchedulerOutputs, + SchedulerWaitingQueueFullError) from vllm.engine.arg_utils import AsyncEngineArgs from vllm.engine.async_timeout import asyncio_timeout from vllm.engine.llm_engine import LLMEngine, SchedulerOutputState diff --git a/vllm/entrypoints/openai/serving_chat.py b/vllm/entrypoints/openai/serving_chat.py index 90399640c566..ca2eb179eee5 100644 --- a/vllm/entrypoints/openai/serving_chat.py +++ b/vllm/entrypoints/openai/serving_chat.py @@ -6,7 +6,6 @@ import time from collections.abc import AsyncGenerator, 
AsyncIterator from collections.abc import Sequence as GenericSequence -from http import HTTPStatus from typing import Callable, Final, Optional, Union import jinja2 @@ -16,7 +15,6 @@ from pydantic import TypeAdapter from vllm.config import ModelConfig -from vllm.core.scheduler import SchedulerWaitingQueueFullError from vllm.engine.protocol import EngineClient from vllm.entrypoints.chat_utils import (ChatTemplateContentFormatOption, ConversationMessage, @@ -268,13 +266,7 @@ async def create_chat_completion( generators.append(generator) except ValueError as e: # TODO: Use a vllm-specific Validation Error - return self.create_error_response(str(e)) - except SchedulerWaitingQueueFullError as e: - return self.create_error_response( - str(e), - err_type="ServiceUnavailableError", - status_code=HTTPStatus.SERVICE_UNAVAILABLE - ) + return self.create_error_response(e) assert len(generators) == 1 result_generator, = generators @@ -297,13 +289,7 @@ async def create_chat_completion( conversation, tokenizer, request_metadata) except ValueError as e: # TODO: Use a vllm-specific Validation Error - return self.create_error_response(str(e)) - except SchedulerWaitingQueueFullError as e: - return self.create_error_response( - str(e), - err_type="ServiceUnavailableError", - status_code=HTTPStatus.SERVICE_UNAVAILABLE - ) + return self.create_error_response(e) def get_chat_request_role(self, request: ChatCompletionRequest) -> str: if request.add_generation_prompt: @@ -484,7 +470,7 @@ async def chat_completion_stream_generator( reasoning_parser = self.reasoning_parser(tokenizer) except RuntimeError as e: logger.exception("Error in reasoning parser creation.") - data = self.create_streaming_error_response(str(e)) + data = self.create_streaming_error_response(e) yield f"data: {data}\n\n" yield "data: [DONE]\n\n" return @@ -498,7 +484,7 @@ async def chat_completion_stream_generator( tool_parsers = [None] * num_choices except Exception as e: logger.exception("Error in tool parser creation.") - data = self.create_streaming_error_response(str(e)) + data = self.create_streaming_error_response(e) yield f"data: {data}\n\n" yield "data: [DONE]\n\n" return @@ -949,7 +935,7 @@ async def chat_completion_stream_generator( except Exception as e: # TODO: Use a vllm-specific Validation Error logger.exception("Error in chat completion stream generator.") - data = self.create_streaming_error_response(str(e)) + data = self.create_streaming_error_response(e) yield f"data: {data}\n\n" # Send the final done message after all response.n are finished yield "data: [DONE]\n\n" @@ -975,7 +961,7 @@ async def chat_completion_full_generator( return self.create_error_response("Client disconnected") except ValueError as e: # TODO: Use a vllm-specific Validation Error - return self.create_error_response(str(e)) + return self.create_error_response(e) assert final_res is not None @@ -1004,7 +990,7 @@ async def chat_completion_full_generator( reasoning_parser = self.reasoning_parser(tokenizer) except RuntimeError as e: logger.exception("Error in reasoning parser creation.") - return self.create_error_response(str(e)) + return self.create_error_response(e) # If the reasoning parser is enabled, # tool calls are extracted exclusively from the content. 
reasoning_content, content = ( @@ -1079,7 +1065,7 @@ async def chat_completion_full_generator( tool_parser = self.tool_parser(tokenizer) except RuntimeError as e: logger.exception("Error in tool parser creation.") - return self.create_error_response(str(e)) + return self.create_error_response(e) tool_call_info = tool_parser.extract_tool_calls( content if content is not None else "", request=request) diff --git a/vllm/entrypoints/openai/serving_classification.py b/vllm/entrypoints/openai/serving_classification.py index 3ac4f01ea602..16376a478b65 100644 --- a/vllm/entrypoints/openai/serving_classification.py +++ b/vllm/entrypoints/openai/serving_classification.py @@ -73,7 +73,7 @@ async def _preprocess( except (ValueError, TypeError) as e: logger.exception("Error in preprocessing prompt inputs") - return self.create_error_response(str(e)) + return self.create_error_response(e) def _build_response( self, diff --git a/vllm/entrypoints/openai/serving_completion.py b/vllm/entrypoints/openai/serving_completion.py index c2101d94ba1b..8e0e117347eb 100644 --- a/vllm/entrypoints/openai/serving_completion.py +++ b/vllm/entrypoints/openai/serving_completion.py @@ -5,7 +5,6 @@ import time from collections.abc import AsyncGenerator, AsyncIterator from collections.abc import Sequence as GenericSequence -from http import HTTPStatus from typing import Optional, Union, cast import jinja2 @@ -13,7 +12,6 @@ from typing_extensions import assert_never from vllm.config import ModelConfig -from vllm.core.scheduler import SchedulerWaitingQueueFullError from vllm.engine.protocol import EngineClient from vllm.entrypoints.logger import RequestLogger # yapf conflicts with isort for this block @@ -139,16 +137,16 @@ async def create_completion( ) except ValueError as e: logger.exception("Error in preprocessing prompt inputs") - return self.create_error_response(str(e)) + return self.create_error_response(e) except TypeError as e: logger.exception("Error in preprocessing prompt inputs") - return self.create_error_response(str(e)) + return self.create_error_response(e) except RuntimeError as e: logger.exception("Error in preprocessing prompt inputs") - return self.create_error_response(str(e)) + return self.create_error_response(e) except jinja2.TemplateError as e: logger.exception("Error in preprocessing prompt inputs") - return self.create_error_response(str(e)) + return self.create_error_response(e) # Schedule the request and get the result generator. generators: list[AsyncGenerator[RequestOutput, None]] = [] @@ -231,13 +229,7 @@ async def create_completion( generators.append(generator) except ValueError as e: # TODO: Use a vllm-specific Validation Error - return self.create_error_response(str(e)) - except SchedulerWaitingQueueFullError as e: - return self.create_error_response( - str(e), - err_type="ServiceUnavailableError", - status_code=HTTPStatus.SERVICE_UNAVAILABLE - ) + return self.create_error_response(e) result_generator = merge_async_iterators(*generators) @@ -301,13 +293,7 @@ async def create_completion( return self.create_error_response("Client disconnected") except ValueError as e: # TODO: Use a vllm-specific Validation Error - return self.create_error_response(str(e)) - except SchedulerWaitingQueueFullError as e: - return self.create_error_response( - str(e), - err_type="ServiceUnavailableError", - status_code=HTTPStatus.SERVICE_UNAVAILABLE - ) + return self.create_error_response(e) # When user requests streaming but we don't stream, we still need to # return a streaming response with a single event. 
@@ -489,7 +475,7 @@ async def completion_stream_generator( except Exception as e: # TODO: Use a vllm-specific Validation Error - data = self.create_streaming_error_response(str(e)) + data = self.create_streaming_error_response(e) yield f"data: {data}\n\n" yield "data: [DONE]\n\n" diff --git a/vllm/entrypoints/openai/serving_embedding.py b/vllm/entrypoints/openai/serving_embedding.py index e87decfe636a..9938567c7652 100644 --- a/vllm/entrypoints/openai/serving_embedding.py +++ b/vllm/entrypoints/openai/serving_embedding.py @@ -95,7 +95,7 @@ async def _preprocess( return None except (ValueError, TypeError) as e: logger.exception("Error in preprocessing prompt inputs") - return self.create_error_response(str(e)) + return self.create_error_response(e) def _build_response( self, @@ -196,6 +196,6 @@ def _validate_request( try: pooling_params.verify(self.model_config) except ValueError as e: - return self.create_error_response(str(e)) + return self.create_error_response(e) return None diff --git a/vllm/entrypoints/openai/serving_engine.py b/vllm/entrypoints/openai/serving_engine.py index 7a4f0ca9d046..ecd7645a2c4e 100644 --- a/vllm/entrypoints/openai/serving_engine.py +++ b/vllm/entrypoints/openai/serving_engine.py @@ -364,15 +364,9 @@ async def _prepare_generators( return None - except SchedulerWaitingQueueFullError as e: - return self.create_error_response( - str(e), - err_type="ServiceUnavailableError", - status_code=HTTPStatus.SERVICE_UNAVAILABLE - ) except Exception as e: # TODO: Use a vllm-specific Validation Error - return self.create_error_response(str(e)) + return self.create_error_response(e) async def _collect_batch( self, @@ -407,20 +401,30 @@ async def _collect_batch( return None except Exception as e: - return self.create_error_response(str(e)) + return self.create_error_response(e) def create_error_response( self, - message: str, + message: Union[str, Exception], err_type: str = "BadRequestError", status_code: HTTPStatus = HTTPStatus.BAD_REQUEST) -> ErrorResponse: - return ErrorResponse(message=message, + # Handle SchedulerWaitingQueueFullError automatically + if isinstance(message, SchedulerWaitingQueueFullError): + return ErrorResponse(message=str(message), + type="ServiceUnavailableError", + code=HTTPStatus.SERVICE_UNAVAILABLE.value) + elif isinstance(message, Exception): + message_str = str(message) + else: + message_str = message + + return ErrorResponse(message=message_str, type=err_type, code=status_code.value) def create_streaming_error_response( self, - message: str, + message: Union[str, Exception], err_type: str = "BadRequestError", status_code: HTTPStatus = HTTPStatus.BAD_REQUEST) -> str: json_str = json.dumps({ diff --git a/vllm/entrypoints/openai/serving_pooling.py b/vllm/entrypoints/openai/serving_pooling.py index c2ed50d04d12..8b8d38daaf6d 100644 --- a/vllm/entrypoints/openai/serving_pooling.py +++ b/vllm/entrypoints/openai/serving_pooling.py @@ -135,7 +135,7 @@ async def create_pooling( ) except (ValueError, TypeError, jinja2.TemplateError) as e: logger.exception("Error in preprocessing prompt inputs") - return self.create_error_response(str(e)) + return self.create_error_response(e) # Schedule the request and get the result generator. 
generators: list[AsyncGenerator[PoolingRequestOutput, None]] = [] @@ -166,7 +166,7 @@ async def create_pooling( generators.append(generator) except ValueError as e: # TODO: Use a vllm-specific Validation Error - return self.create_error_response(str(e)) + return self.create_error_response(e) result_generator = merge_async_iterators(*generators) @@ -195,7 +195,7 @@ async def create_pooling( return self.create_error_response("Client disconnected") except ValueError as e: # TODO: Use a vllm-specific Validation Error - return self.create_error_response(str(e)) + return self.create_error_response(e) return response diff --git a/vllm/entrypoints/openai/serving_responses.py b/vllm/entrypoints/openai/serving_responses.py index a359371848ce..1e8e253a7e8a 100644 --- a/vllm/entrypoints/openai/serving_responses.py +++ b/vllm/entrypoints/openai/serving_responses.py @@ -187,7 +187,7 @@ async def create_responses( generators.append(generator) except ValueError as e: # TODO: Use a vllm-specific Validation Error - return self.create_error_response(str(e)) + return self.create_error_response(e) assert len(generators) == 1 result_generator, = generators @@ -244,7 +244,7 @@ async def create_responses( request_metadata, ) except Exception as e: - return self.create_error_response(str(e)) + return self.create_error_response(e) async def responses_full_generator( self, @@ -267,7 +267,7 @@ async def responses_full_generator( return self.create_error_response("Client disconnected") except ValueError as e: # TODO: Use a vllm-specific Validation Error - return self.create_error_response(str(e)) + return self.create_error_response(e) assert final_res is not None assert len(final_res.outputs) == 1 @@ -278,7 +278,7 @@ async def responses_full_generator( reasoning_parser = self.reasoning_parser(tokenizer) except RuntimeError as e: logger.exception("Error in reasoning parser creation.") - return self.create_error_response(str(e)) + return self.create_error_response(e) reasoning_content, content = ( reasoning_parser.extract_reasoning_content(final_output.text, @@ -391,7 +391,7 @@ async def _run_background_request( except Exception as e: logger.exception("Background request failed for %s", request.request_id) - response = self.create_error_response(str(e)) + response = self.create_error_response(e) if isinstance(response, ErrorResponse): # If the request has failed, update the status to "failed". 
diff --git a/vllm/entrypoints/openai/serving_score.py b/vllm/entrypoints/openai/serving_score.py index 8d47a417f9cd..9005350eacdf 100644 --- a/vllm/entrypoints/openai/serving_score.py +++ b/vllm/entrypoints/openai/serving_score.py @@ -385,7 +385,7 @@ async def create_score( return self.create_error_response("Client disconnected") except ValueError as e: # TODO: Use a vllm-specific Validation Error - return self.create_error_response(str(e)) + return self.create_error_response(e) async def do_rerank( self, @@ -431,7 +431,7 @@ async def do_rerank( return self.create_error_response("Client disconnected") except ValueError as e: # TODO: Use a vllm-specific Validation Error - return self.create_error_response(str(e)) + return self.create_error_response(e) def request_output_to_score_response( self, diff --git a/vllm/entrypoints/openai/speech_to_text.py b/vllm/entrypoints/openai/speech_to_text.py index 09b346dcef6b..d5cd70d6de0b 100644 --- a/vllm/entrypoints/openai/speech_to_text.py +++ b/vllm/entrypoints/openai/speech_to_text.py @@ -171,7 +171,7 @@ async def _create_speech_to_text( except ValueError as e: logger.exception("Error in preprocessing prompt inputs") - return self.create_error_response(str(e)) + return self.create_error_response(e) list_result_generator: Optional[list[AsyncGenerator[RequestOutput, None]]] = None @@ -200,7 +200,7 @@ async def _create_speech_to_text( ] except ValueError as e: # TODO: Use a vllm-specific Validation Error - return self.create_error_response(str(e)) + return self.create_error_response(e) if request.stream: return stream_generator_method(request, list_result_generator, @@ -218,7 +218,7 @@ async def _create_speech_to_text( return self.create_error_response("Client disconnected") except ValueError as e: # TODO: Use a vllm-specific Validation Error - return self.create_error_response(str(e)) + return self.create_error_response(e) async def _speech_to_text_stream_generator( self, @@ -324,7 +324,7 @@ async def _speech_to_text_stream_generator( except Exception as e: # TODO: Use a vllm-specific Validation Error logger.exception("Error in %s stream generator.", self.task_type) - data = self.create_streaming_error_response(str(e)) + data = self.create_streaming_error_response(e) yield f"data: {data}\n\n" # Send the final done message after all response.n are finished yield "data: [DONE]\n\n" From 891d9ba4f08d1a1fc6a27f66eaccc8c6f854a17e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrej=20Chud=C3=BD?= Date: Mon, 21 Jul 2025 11:05:06 +0200 Subject: [PATCH 3/3] Move implemented logic to V1 codepath MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Andrej Chudý --- tests/core/test_scheduler.py | 67 +------------- tests/v1/core/test_scheduler.py | 107 ++++++++++++++++++++++ tests/v1/core/utils.py | 2 + vllm/config.py | 13 +++ vllm/core/scheduler.py | 12 --- vllm/engine/arg_utils.py | 7 -- vllm/engine/async_llm_engine.py | 10 +- vllm/entrypoints/openai/serving_engine.py | 16 ++-- vllm/v1/core/sched/scheduler.py | 9 ++ vllm/v1/engine/async_llm.py | 15 ++- vllm/v1/engine/exceptions.py | 6 ++ 11 files changed, 162 insertions(+), 102 deletions(-) diff --git a/tests/core/test_scheduler.py b/tests/core/test_scheduler.py index 036da63e5b45..591e1780c11c 100644 --- a/tests/core/test_scheduler.py +++ b/tests/core/test_scheduler.py @@ -12,8 +12,7 @@ from vllm.config import CacheConfig, LoRAConfig, SchedulerConfig from vllm.core.interfaces import AllocStatus -from vllm.core.scheduler import (Scheduler, 
SchedulerWaitingQueueFullError, - SchedulingBudget) +from vllm.core.scheduler import Scheduler, SchedulingBudget from vllm.lora.request import LoRARequest from vllm.sequence import SequenceGroup, SequenceStatus @@ -72,70 +71,6 @@ def test_scheduler_abort_seq_group(): assert scheduler.get_num_unfinished_seq_groups() == 0 -def test_scheduler_max_waiting_queue_length(): - """Test that scheduler respects max_waiting_queue_length setting.""" - block_size = 4 - max_waiting_queue_length = 2 - scheduler_config = SchedulerConfig( - "generate", - max_num_batched_tokens=100, - max_num_seqs=64, - max_model_len=1, - max_waiting_queue_length=max_waiting_queue_length, - ) - cache_config = CacheConfig(block_size, 1.0, 1, "auto") - cache_config.num_cpu_blocks = 4 - cache_config.num_gpu_blocks = 4 - scheduler = Scheduler(scheduler_config, cache_config, None) - - # Add seq groups up to the limit - for i in range(max_waiting_queue_length): - _, seq_group = create_dummy_prompt(str(i), - block_size, - block_size=block_size) - scheduler.add_seq_group(seq_group) - assert scheduler.get_num_unfinished_seq_groups() == i + 1 - - # Adding one more should raise SchedulerWaitingQueueFullError - _, seq_group = create_dummy_prompt(str(max_waiting_queue_length), - block_size, - block_size=block_size) - with pytest.raises(SchedulerWaitingQueueFullError) as excinfo: - scheduler.add_seq_group(seq_group) - - assert "Scheduler waiting queue is full" in str(excinfo.value) - assert f"request {max_waiting_queue_length}" in str(excinfo.value) - - # Verify that the number of unfinished seq groups hasn't changed - assert scheduler.get_num_unfinished_seq_groups( - ) == max_waiting_queue_length - - -def test_scheduler_max_waiting_queue_length_disabled(): - """Test that scheduler allows unlimited queue when max_waiting_queue_length is None.""" - block_size = 4 - scheduler_config = SchedulerConfig( - "generate", - max_num_batched_tokens=100, - max_num_seqs=64, - max_model_len=1, - max_waiting_queue_length=None, # No limit - ) - cache_config = CacheConfig(block_size, 1.0, 1, "auto") - cache_config.num_cpu_blocks = 4 - cache_config.num_gpu_blocks = 4 - scheduler = Scheduler(scheduler_config, cache_config, None) - - # Add many seq groups - should not raise an exception - num_seq_groups = 10 - for i in range(num_seq_groups): - _, seq_group = create_dummy_prompt(str(i), - block_size, - block_size=block_size) - scheduler.add_seq_group(seq_group) - assert scheduler.get_num_unfinished_seq_groups() == i + 1 - - def test_scheduler_schedule_simple(): block_size = 4 num_seq_group = 4 diff --git a/tests/v1/core/test_scheduler.py b/tests/v1/core/test_scheduler.py index a858a4d8c823..ec928c0fe104 100644 --- a/tests/v1/core/test_scheduler.py +++ b/tests/v1/core/test_scheduler.py @@ -12,6 +12,7 @@ from vllm.sampling_params import GuidedDecodingParams, SamplingParams from vllm.v1.core.sched.output import CachedRequestData, SchedulerOutput from vllm.v1.core.sched.scheduler import Scheduler +from vllm.v1.engine.exceptions import SchedulerWaitingQueueFullError from vllm.v1.kv_cache_interface import (FullAttentionSpec, KVCacheConfig, KVCacheGroupSpec) from vllm.v1.outputs import ModelRunnerOutput @@ -1832,3 +1833,109 @@ def test_schedule_skip_tokenizer_init_structured_output_request(): assert len(output.scheduled_new_reqs) == 0 assert len(scheduler.running) == 0 assert len(scheduler.waiting) == 1 + + +def test_scheduler_max_waiting_queue_length(): + """Test that V1 scheduler respects max_waiting_queue_length setting.""" + max_waiting_queue_length = 2 + 
scheduler = create_scheduler( + max_num_seqs=64, + max_num_batched_tokens=100, + max_waiting_queue_length=max_waiting_queue_length, + ) + requests = create_requests(num_requests=max_waiting_queue_length) + + # Add requests up to the limit + for i, request in enumerate(requests): + scheduler.add_request(request) + assert len(scheduler.waiting) == i + 1 + + assert len(scheduler.waiting) == max_waiting_queue_length + # Try to add one more request - should raise exception + overflow_request = create_requests(num_requests=1)[0] + overflow_request.request_id = "overflow" + + with pytest.raises(SchedulerWaitingQueueFullError, + match="Scheduler waiting queue is full"): + scheduler.add_request(overflow_request) + + # Verify that the queue size hasn't changed + assert len(scheduler.waiting) == max_waiting_queue_length + + +def test_scheduler_max_waiting_queue_length_disabled(): + """Test that V1 scheduler allows unlimited queue when + max_waiting_queue_length is None.""" + scheduler = create_scheduler( + max_num_seqs=64, + max_num_batched_tokens=100, + max_waiting_queue_length=None, # No limit + ) + + # Add many requests - should not raise an exception + num_requests = 10 + requests = create_requests(num_requests=num_requests) + for i, request in enumerate(requests): + scheduler.add_request(request) + assert len(scheduler.waiting) == i + 1 + + +def test_scheduler_max_waiting_queue_length_with_scheduling(): + """Test max_waiting_queue_length behavior when requests are being + scheduled.""" + + max_waiting_queue_length = 2 + scheduler = create_scheduler( + max_num_seqs=1, # Only 1 can run at once, forcing others to wait + max_num_batched_tokens=100, + max_waiting_queue_length=max_waiting_queue_length, + ) + + # Add requests up to the waiting queue limit + requests = create_requests(num_requests=max_waiting_queue_length) + + # Add requests up to the limit + for request in requests: + scheduler.add_request(request) + + # All requests should be in waiting queue initially + assert len(scheduler.waiting) == max_waiting_queue_length + assert len(scheduler.running) == 0 + + # Schedule one request (should move 1 from waiting to running) + output = scheduler.schedule() + assert len(output.scheduled_new_reqs) == 1 # max_num_seqs = 1 + assert len(scheduler.running) == 1 + assert len( + scheduler.waiting) == max_waiting_queue_length - 1 # 1 left in waiting + + # Now add one more request to fill the waiting queue back to its limit + additional_request = create_requests(num_requests=1)[0] + additional_request.request_id = "additional" + scheduler.add_request(additional_request) + + assert len( + scheduler.waiting) == max_waiting_queue_length # back to full capacity + + # Try to add one more request - should raise exception + overflow_request = create_requests(num_requests=1)[0] + overflow_request.request_id = "overflow" + + with pytest.raises(SchedulerWaitingQueueFullError, + match="Scheduler waiting queue is full"): + scheduler.add_request(overflow_request) + + # Verify queue sizes are unchanged + assert len(scheduler.waiting) == max_waiting_queue_length + assert len(scheduler.running) == 1 + + +def test_scheduler_max_waiting_queue_length_zero(): + """Test that max_waiting_queue_length=0 raises ValueError.""" + with pytest.raises(ValueError, + match="max_waiting_queue_length cannot be 0"): + create_scheduler( + max_num_seqs=1, # Only 1 can run at once + max_num_batched_tokens=100, + max_waiting_queue_length=0, # Should raise ValueError + ) diff --git a/tests/v1/core/utils.py b/tests/v1/core/utils.py index 
0b7d8251b640..07175396b0d0 100644 --- a/tests/v1/core/utils.py +++ b/tests/v1/core/utils.py @@ -32,6 +32,7 @@ def create_scheduler( num_speculative_tokens: Optional[int] = None, skip_tokenizer_init: bool = False, async_scheduling: bool = False, + max_waiting_queue_length: Optional[int] = None, ) -> Union[Scheduler, AsyncScheduler]: '''Create scheduler under test. @@ -56,6 +57,7 @@ def create_scheduler( disable_chunked_mm_input=disable_chunked_mm_input, enable_chunked_prefill=True, async_scheduling=async_scheduling, + max_waiting_queue_length=max_waiting_queue_length, ) model_config = ModelConfig( model=model, diff --git a/vllm/config.py b/vllm/config.py index 983a3f9fe609..272e6a317826 100644 --- a/vllm/config.py +++ b/vllm/config.py @@ -2460,6 +2460,19 @@ def _verify_args(self) -> Self: def is_multi_step(self) -> bool: return self.num_scheduler_steps > 1 + @field_validator("max_waiting_queue_length") + @classmethod + def validate_max_waiting_queue_length( + cls, value: Optional[int]) -> Optional[int]: + if value == 0: + raise ValueError( + "max_waiting_queue_length cannot be 0. Use None for unlimited " + "queue or a positive integer for a limited queue.") + if value is not None and value < 0: + raise ValueError( + "max_waiting_queue_length must be None or a positive integer") + return value + Device = Literal["auto", "cuda", "neuron", "cpu", "tpu", "xpu"] diff --git a/vllm/core/scheduler.py b/vllm/core/scheduler.py index 61b61477e827..0ef0396996b6 100644 --- a/vllm/core/scheduler.py +++ b/vllm/core/scheduler.py @@ -24,12 +24,6 @@ logger = init_logger(__name__) - -class SchedulerWaitingQueueFullError(Exception): - """Raised when the scheduler waiting queue is full and cannot accept new requests.""" - pass - - # Test-only. If configured, decode is preempted with # ARTIFICIAL_PREEMPTION_PROB% probability. ENABLE_ARTIFICIAL_PREEMPT = bool( @@ -557,12 +551,6 @@ def num_decoding_tokens_per_seq(self) -> int: def add_seq_group(self, seq_group: SequenceGroup) -> None: # Add sequence groups to the waiting queue. - if (self.scheduler_config.max_waiting_queue_length is not None - and len(self.waiting) - >= self.scheduler_config.max_waiting_queue_length): - raise SchedulerWaitingQueueFullError( - f"Scheduler waiting queue is full. Cannot add request {seq_group.request_id}." - ) self.waiting.append(seq_group) def _add_seq_group_to_running(self, seq_group: SequenceGroup) -> None: diff --git a/vllm/engine/arg_utils.py b/vllm/engine/arg_utils.py index 63203c1d5c00..b20defde73ed 100644 --- a/vllm/engine/arg_utils.py +++ b/vllm/engine/arg_utils.py @@ -441,9 +441,6 @@ class EngineArgs: async_scheduling: bool = SchedulerConfig.async_scheduling - max_waiting_queue_length: Optional[ - int] = SchedulerConfig.max_waiting_queue_length - def __post_init__(self): # support `EngineArgs(compilation_config={...})` # without having to manually construct a @@ -846,9 +843,6 @@ def add_cli_args(parser: FlexibleArgumentParser) -> FlexibleArgumentParser: **scheduler_kwargs["disable_hybrid_kv_cache_manager"]) scheduler_group.add_argument("--async-scheduling", **scheduler_kwargs["async_scheduling"]) - scheduler_group.add_argument( - "--max-waiting-queue-length", - **scheduler_kwargs["max_waiting_queue_length"]) # vLLM arguments vllm_kwargs = get_kwargs(VllmConfig) @@ -1237,7 +1231,6 @@ def create_engine_config( disable_hybrid_kv_cache_manager=self. 
disable_hybrid_kv_cache_manager, async_scheduling=self.async_scheduling, - max_waiting_queue_length=self.max_waiting_queue_length, ) if not model_config.is_multimodal_model and self.default_mm_loras: diff --git a/vllm/engine/async_llm_engine.py b/vllm/engine/async_llm_engine.py index 820ba04a30ff..3d7d28055dd0 100644 --- a/vllm/engine/async_llm_engine.py +++ b/vllm/engine/async_llm_engine.py @@ -13,8 +13,7 @@ import vllm.envs as envs from vllm.config import (DecodingConfig, LoRAConfig, ModelConfig, ParallelConfig, SchedulerConfig, VllmConfig) -from vllm.core.scheduler import (SchedulerOutputs, - SchedulerWaitingQueueFullError) +from vllm.core.scheduler import SchedulerOutputs from vllm.engine.arg_utils import AsyncEngineArgs from vllm.engine.async_timeout import asyncio_timeout from vllm.engine.llm_engine import LLMEngine, SchedulerOutputState @@ -751,13 +750,6 @@ async def engine_step(self, virtual_engine: int) -> bool: e, verbose=self.log_requests, ) - except SchedulerWaitingQueueFullError as e: - # Handle scheduler queue full error - self._request_tracker.process_exception( - new_request["request_id"], - e, - verbose=self.log_requests, - ) if aborted_requests: await self._engine_abort(aborted_requests) diff --git a/vllm/entrypoints/openai/serving_engine.py b/vllm/entrypoints/openai/serving_engine.py index ecd7645a2c4e..49ecb07b3525 100644 --- a/vllm/entrypoints/openai/serving_engine.py +++ b/vllm/entrypoints/openai/serving_engine.py @@ -25,7 +25,6 @@ import vllm.envs as envs from vllm.config import ModelConfig -from vllm.core.scheduler import SchedulerWaitingQueueFullError from vllm.engine.protocol import EngineClient # yapf conflicts with isort for this block # yapf: disable @@ -77,6 +76,7 @@ from vllm.transformers_utils.tokenizer import AnyTokenizer, MistralTokenizer from vllm.utils import (AsyncMicrobatchTokenizer, is_list_of, merge_async_iterators, random_uuid) +from vllm.v1.engine.exceptions import SchedulerWaitingQueueFullError logger = init_logger(__name__) @@ -366,7 +366,7 @@ async def _prepare_generators( except Exception as e: # TODO: Use a vllm-specific Validation Error - return self.create_error_response(e) + return self.create_error_response(str(e)) async def _collect_batch( self, @@ -401,18 +401,20 @@ async def _collect_batch( return None except Exception as e: - return self.create_error_response(e) + return self.create_error_response(str(e)) def create_error_response( self, message: Union[str, Exception], err_type: str = "BadRequestError", status_code: HTTPStatus = HTTPStatus.BAD_REQUEST) -> ErrorResponse: - # Handle SchedulerWaitingQueueFullError automatically + if isinstance(message, SchedulerWaitingQueueFullError): - return ErrorResponse(message=str(message), - type="ServiceUnavailableError", - code=HTTPStatus.SERVICE_UNAVAILABLE.value) + return ErrorResponse( + message=str(message), + type="ServiceUnavailableError", + code=HTTPStatus.SERVICE_UNAVAILABLE.value, + ) elif isinstance(message, Exception): message_str = str(message) else: diff --git a/vllm/v1/core/sched/scheduler.py b/vllm/v1/core/sched/scheduler.py index 446f98034cb8..73043857a33a 100644 --- a/vllm/v1/core/sched/scheduler.py +++ b/vllm/v1/core/sched/scheduler.py @@ -28,6 +28,7 @@ from vllm.v1.core.sched.utils import check_stop from vllm.v1.engine import (EngineCoreEventType, EngineCoreOutput, EngineCoreOutputs) +from vllm.v1.engine.exceptions import SchedulerWaitingQueueFullError from vllm.v1.kv_cache_interface import KVCacheConfig from vllm.v1.metrics.stats import SchedulerStats from vllm.v1.outputs 
import ModelRunnerOutput @@ -957,6 +958,14 @@ def get_request_counts(self) -> tuple[int, int]: return len(self.running), len(self.waiting) def add_request(self, request: Request) -> None: + # Check if the waiting queue has reached its maximum capacity + if (self.scheduler_config.max_waiting_queue_length is not None + and len(self.waiting) + >= self.scheduler_config.max_waiting_queue_length): + raise SchedulerWaitingQueueFullError( + f"Scheduler waiting queue is full. Cannot add request " + f"{request.request_id}.") + self.waiting.add_request(request) self.requests[request.request_id] = request if self.log_stats: diff --git a/vllm/v1/engine/async_llm.py b/vllm/v1/engine/async_llm.py index 3754570dfaaa..562b9ea04654 100644 --- a/vllm/v1/engine/async_llm.py +++ b/vllm/v1/engine/async_llm.py @@ -29,7 +29,8 @@ from vllm.utils import Device, cdiv from vllm.v1.engine import EngineCoreRequest from vllm.v1.engine.core_client import EngineCoreClient -from vllm.v1.engine.exceptions import EngineDeadError, EngineGenerateError +from vllm.v1.engine.exceptions import (EngineDeadError, EngineGenerateError, + SchedulerWaitingQueueFullError) from vllm.v1.engine.output_processor import (OutputProcessor, RequestOutputCollector) from vllm.v1.engine.parallel_sampling import ParentRequest @@ -351,6 +352,12 @@ async def generate( logger.info("Request %s failed (bad request).", request_id) raise + # Scheduler waiting queue is full. + except SchedulerWaitingQueueFullError: + if self.log_requests: + logger.info("Request %s failed (queue full).", request_id) + raise + # Unexpected error in the generate() task (possibly recoverable). except Exception as e: await self.abort(request_id) @@ -513,6 +520,12 @@ async def encode( logger.info("Request %s failed (bad request).", request_id) raise + # Scheduler waiting queue is full. + except SchedulerWaitingQueueFullError: + if self.log_requests: + logger.info("Request %s failed (queue full).", request_id) + raise + # Unexpected error in the generate() task (possibly recoverable). except Exception as e: await self.abort(request_id) diff --git a/vllm/v1/engine/exceptions.py b/vllm/v1/engine/exceptions.py index 692ba9dc840f..6da128a0ef6c 100644 --- a/vllm/v1/engine/exceptions.py +++ b/vllm/v1/engine/exceptions.py @@ -15,3 +15,9 @@ def __init__(self, *args, suppress_context: bool = False, **kwargs): # Make stack trace clearer when using with LLMEngine by # silencing irrelevant ZMQError. self.__suppress_context__ = suppress_context + + +class SchedulerWaitingQueueFullError(Exception): + """Raised when the scheduler's waiting queue is full and cannot accept + new requests.""" + pass
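
---

Note on the end state of this series: after patch 3, the limit lives in `SchedulerConfig.max_waiting_queue_length`, the V1 scheduler raises `SchedulerWaitingQueueFullError` in `add_request()` when the waiting queue is at capacity, and the OpenAI-compatible serving layer maps that exception to an `ErrorResponse` of type `ServiceUnavailableError` with HTTP 503. This lets clients distinguish transient back-pressure (503) from validation errors (400). The CLI plumbing added in patch 1 is removed again in patch 3, so how the limit is wired in at deployment time is outside this series.

Below is a minimal client-side sketch of how a caller might react to that 503. It is illustrative only: the server URL, model name, and retry policy are assumptions, not part of the patches; the only behavior it relies on is the 503 `ServiceUnavailableError` response that the serving layer in this series returns when the waiting queue is full.

```python
# Hypothetical client-side handling of the 503 returned when the scheduler's
# waiting queue is full. URL, model name, and backoff policy are placeholders.
import time

import requests

BASE_URL = "http://localhost:8000/v1"  # assumed vLLM OpenAI-compatible server
MODEL = "my-model"                     # placeholder model name


def complete_with_backoff(prompt: str, max_retries: int = 5) -> dict:
    """Send a completion request, backing off while the waiting queue is full."""
    payload = {"model": MODEL, "prompt": prompt, "max_tokens": 32}
    delay = 0.5
    for _ in range(max_retries):
        resp = requests.post(f"{BASE_URL}/completions", json=payload, timeout=60)
        if resp.status_code == 503:
            # ServiceUnavailableError: the scheduler rejected the request
            # because its waiting queue reached max_waiting_queue_length.
            time.sleep(delay)
            delay *= 2  # exponential backoff before retrying
            continue
        resp.raise_for_status()
        return resp.json()
    raise RuntimeError("waiting queue stayed full after retries")


if __name__ == "__main__":
    print(complete_with_backoff("Hello, world!"))
```

Treating queue overflow as a retryable 503 rather than a generic 400 is what makes this kind of client-side backoff possible; a load balancer in front of several vLLM replicas could use the same signal to route the request elsewhere.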