diff --git a/tests/entrypoints/openai/chat_completion/test_chat_error.py b/tests/entrypoints/openai/chat_completion/test_chat_error.py
index 5fd7bc09c273..fb20601c030d 100644
--- a/tests/entrypoints/openai/chat_completion/test_chat_error.py
+++ b/tests/entrypoints/openai/chat_completion/test_chat_error.py
@@ -10,10 +10,10 @@
 from vllm.config.multimodal import MultiModalConfig
 from vllm.entrypoints.openai.chat_completion.protocol import ChatCompletionRequest
 from vllm.entrypoints.openai.chat_completion.serving import OpenAIServingChat
-from vllm.entrypoints.openai.engine.protocol import GenerationError
 from vllm.entrypoints.openai.models.protocol import BaseModelPath
 from vllm.entrypoints.openai.models.serving import OpenAIServingModels
 from vllm.entrypoints.serve.render.serving import OpenAIServingRender
+from vllm.exceptions import GenerationError
 from vllm.outputs import CompletionOutput, RequestOutput
 from vllm.renderers.hf import HfRenderer
 from vllm.tokenizers.registry import tokenizer_args_from_config
diff --git a/tests/entrypoints/openai/completion/test_completion_error.py b/tests/entrypoints/openai/completion/test_completion_error.py
index c914e427d59c..d7b53f62c2ba 100644
--- a/tests/entrypoints/openai/completion/test_completion_error.py
+++ b/tests/entrypoints/openai/completion/test_completion_error.py
@@ -10,10 +10,10 @@
 from vllm.config.multimodal import MultiModalConfig
 from vllm.entrypoints.openai.completion.protocol import CompletionRequest
 from vllm.entrypoints.openai.completion.serving import OpenAIServingCompletion
-from vllm.entrypoints.openai.engine.protocol import GenerationError
 from vllm.entrypoints.openai.models.protocol import BaseModelPath
 from vllm.entrypoints.openai.models.serving import OpenAIServingModels
 from vllm.entrypoints.serve.render.serving import OpenAIServingRender
+from vllm.exceptions import GenerationError
 from vllm.outputs import CompletionOutput, RequestOutput
 from vllm.renderers.hf import HfRenderer
 from vllm.tokenizers.registry import tokenizer_args_from_config
diff --git a/tests/v1/core/test_scheduler.py b/tests/v1/core/test_scheduler.py
index 2fe45242153c..1e12a5d9335f 100644
--- a/tests/v1/core/test_scheduler.py
+++ b/tests/v1/core/test_scheduler.py
@@ -4140,3 +4140,45 @@ def test_eagle3_mm_encoder_cache_with_shift():
         f"shifted_end={scheduled_end_with_shift}) overlapping MM at "
         f"{start_pos}. The fix must schedule encoder inputs."
     )
+
+
+def test_max_waiting_queue_length():
+    """Test that requests are rejected when waiting queue is full."""
+    # Create scheduler with max_waiting_queue_length=5
+    scheduler = create_scheduler(max_waiting_queue_length=5)
+
+    # Add 5 requests (should succeed)
+    requests = create_requests(num_requests=5)
+    for request in requests:
+        scheduler.add_request(request)
+
+    assert len(scheduler.waiting) == 5
+    assert len(scheduler.rejected) == 0
+
+    # Add 1 more request (should be rejected)
+    extra_request = create_requests(num_requests=1, req_ids=["extra"])
+    scheduler.add_request(extra_request[0])
+
+    # The extra request should be rejected
+    assert len(scheduler.waiting) == 5
+    assert len(scheduler.rejected) == 1
+    assert scheduler.rejected[0].request_id == "extra"
+    assert scheduler.rejected[0].status == RequestStatus.FINISHED_REJECTED
+
+    # Schedule: the 5 waiting requests are scheduled
+    output = scheduler.schedule()
+    assert len(output.scheduled_new_reqs) == 5
+    # rejected request not in finished_req_ids yet (handled in update_from_output)
+    assert "extra" not in output.finished_req_ids
+
+
+def test_max_waiting_queue_length_default():
+    """Test that default max_waiting_queue_length works."""
+    # Default is 4096, should handle large number of requests
+    scheduler = create_scheduler(max_waiting_queue_length=4096)
+    requests = create_requests(num_requests=100)
+    for request in requests:
+        scheduler.add_request(request)
+
+    assert len(scheduler.waiting) == 100
+    assert len(scheduler.rejected) == 0
diff --git a/tests/v1/core/utils.py b/tests/v1/core/utils.py
index 2d9834d2e3a6..69ae7413d521 100644
--- a/tests/v1/core/utils.py
+++ b/tests/v1/core/utils.py
@@ -57,6 +57,7 @@ def create_scheduler(
     pipeline_parallel_size: int = 1,
     use_ec_connector: bool = False,
     ec_role: str | None = None,
+    max_waiting_queue_length: int = 4096,
 ) -> Scheduler | AsyncScheduler:
     """Create scheduler under test.
@@ -89,6 +90,7 @@
        enable_chunked_prefill=enable_chunked_prefill,
        async_scheduling=async_scheduling,
        is_encoder_decoder=model_config.is_encoder_decoder,
+        max_waiting_queue_length=max_waiting_queue_length,
     )
     # Cache config, optionally force APC
     cache_config = CacheConfig(
diff --git a/vllm/config/scheduler.py b/vllm/config/scheduler.py
index 9f6284c4b389..33d3d7fc19da 100644
--- a/vllm/config/scheduler.py
+++ b/vllm/config/scheduler.py
@@ -41,7 +41,7 @@ class SchedulerConfig:
     DEFAULT_MAX_NUM_BATCHED_TOKENS: ClassVar[int] = 2048
     DEFAULT_MAX_NUM_SEQS: ClassVar[int] = 128
-
+    DEFAULT_MAX_WAITING_QUEUE_LEN: ClassVar[int] = 4096
 
     runner_type: RunnerType = "generate"
     """The runner type to launch for the model."""
@@ -66,6 +66,15 @@
     In real usage, this should be set in `EngineArgs.create_engine_config`.
     """
 
+    max_waiting_queue_length: int = Field(default=DEFAULT_MAX_WAITING_QUEUE_LEN, ge=1)
+    """
+    The maximum number of requests allowed in the waiting queue.
+    If the waiting queue is full, new incoming requests will be rejected.
+
+    [Experimental] This parameter is an experimental feature and
+    may change or be removed in future releases.
+ """ + max_num_partial_prefills: int = Field(default=1, ge=1) """For chunked prefill, the maximum number of sequences that can be partially prefilled concurrently.""" diff --git a/vllm/engine/arg_utils.py b/vllm/engine/arg_utils.py index d0bdd4916144..3c3ad17699ba 100644 --- a/vllm/engine/arg_utils.py +++ b/vllm/engine/arg_utils.py @@ -461,6 +461,9 @@ class EngineArgs: max_long_partial_prefills: int = SchedulerConfig.max_long_partial_prefills long_prefill_token_threshold: int = SchedulerConfig.long_prefill_token_threshold max_num_seqs: int | None = None + max_waiting_queue_length: int = get_field( + SchedulerConfig, "max_waiting_queue_length" + ) max_logprobs: int = ModelConfig.max_logprobs logprobs_mode: LogprobsMode = ModelConfig.logprobs_mode disable_log_stats: bool = False @@ -1196,6 +1199,9 @@ def add_cli_args(parser: FlexibleArgumentParser) -> FlexibleArgumentParser: "--max-long-partial-prefills", **scheduler_kwargs["max_long_partial_prefills"], ) + scheduler_group.add_argument( + "--max-waiting-queue-length", **scheduler_kwargs["max_waiting_queue_length"] + ) scheduler_group.add_argument( "--long-prefill-token-threshold", **scheduler_kwargs["long_prefill_token_threshold"], @@ -1782,6 +1788,7 @@ def create_engine_config( runner_type=model_config.runner_type, max_num_batched_tokens=self.max_num_batched_tokens, max_num_seqs=self.max_num_seqs, + max_waiting_queue_length=self.max_waiting_queue_length, max_model_len=model_config.max_model_len, enable_chunked_prefill=self.enable_chunked_prefill, disable_chunked_mm_input=self.disable_chunked_mm_input, diff --git a/vllm/entrypoints/openai/api_server.py b/vllm/entrypoints/openai/api_server.py index 39e9076a7cc6..0a2686700dd9 100644 --- a/vllm/entrypoints/openai/api_server.py +++ b/vllm/entrypoints/openai/api_server.py @@ -29,7 +29,6 @@ from vllm.entrypoints.launcher import serve_http from vllm.entrypoints.logger import RequestLogger from vllm.entrypoints.openai.cli_args import make_arg_parser, validate_parsed_serve_args -from vllm.entrypoints.openai.engine.protocol import GenerationError from vllm.entrypoints.openai.models.protocol import BaseModelPath from vllm.entrypoints.openai.models.serving import OpenAIServingModels from vllm.entrypoints.openai.server_utils import ( @@ -54,6 +53,7 @@ log_version_and_model, process_lora_modules, ) +from vllm.exceptions import GenerationError from vllm.logger import init_logger from vllm.reasoning import ReasoningParserManager from vllm.tasks import POOLING_TASKS, SupportedTask diff --git a/vllm/entrypoints/openai/engine/protocol.py b/vllm/entrypoints/openai/engine/protocol.py index 8f6cdb3e6241..8949d3ef92ad 100644 --- a/vllm/entrypoints/openai/engine/protocol.py +++ b/vllm/entrypoints/openai/engine/protocol.py @@ -4,7 +4,6 @@ # Adapted from # https://github.com/lm-sys/FastChat/blob/168ccc29d3f7edc50823016105c024fe2282732a/fastchat/protocol/openai_api_protocol.py import time -from http import HTTPStatus from typing import Any, ClassVar, Literal, TypeAlias import regex as re @@ -260,11 +259,3 @@ class DeltaMessage(OpenAIBaseModel): content: str | None = None reasoning: str | None = None tool_calls: list[DeltaToolCall] = Field(default_factory=list) - - -class GenerationError(Exception): - """raised when finish_reason indicates internal server error (500)""" - - def __init__(self, message: str = "Internal server error"): - super().__init__(message) - self.status_code = HTTPStatus.INTERNAL_SERVER_ERROR diff --git a/vllm/entrypoints/openai/engine/serving.py b/vllm/entrypoints/openai/engine/serving.py index 
--- a/vllm/entrypoints/openai/engine/serving.py
+++ b/vllm/entrypoints/openai/engine/serving.py
@@ -38,7 +38,6 @@
     ErrorResponse,
     FunctionCall,
     FunctionDefinition,
-    GenerationError,
 )
 from vllm.entrypoints.openai.models.serving import OpenAIServingModels
 from vllm.entrypoints.openai.responses.protocol import (
@@ -71,7 +70,7 @@
     TokenizeResponse,
 )
 from vllm.entrypoints.utils import create_error_response
-from vllm.exceptions import VLLMValidationError
+from vllm.exceptions import GenerationError, RequestRejectedError, VLLMValidationError
 from vllm.inputs.data import (
     ProcessorInputs,
     PromptType,
@@ -575,23 +574,32 @@
         return json_str
 
     def _raise_if_error(self, finish_reason: str | None, request_id: str) -> None:
-        """Raise GenerationError if finish_reason indicates an error."""
+        """
+        Raise appropriate exception if finish_reason indicates an error or rejection.
+        """
         if finish_reason == "error":
             logger.error(
                 "Request %s failed with an internal error during generation",
                 request_id,
             )
             raise GenerationError("Internal server error")
+        if finish_reason == "rejected":
+            logger.warning_every_n(
+                "Request %s was rejected due to "
+                "a full waiting queue (log every 100 requests)",
+                request_id,
+            )
+            raise RequestRejectedError()
 
-    def _convert_generation_error_to_streaming_response(
-        self, e: GenerationError
-    ) -> str:
-        """Convert GenerationError to streaming error response."""
-        return self.create_streaming_error_response(
-            str(e),
-            err_type="InternalServerError",
-            status_code=e.status_code,
-        )
+    def _convert_generation_error_to_streaming_response(self, e: Exception) -> str:
+        """
+        Convert GenerationError or RequestRejectedError to streaming error response.
+        """
+        if isinstance(e, GenerationError):
+            return self.create_streaming_error_response(
+                str(e), err_type=e.err_type, status_code=e.status_code
+            )
+        return self.create_streaming_error_response(str(e))
 
     async def _check_model(
         self,
diff --git a/vllm/entrypoints/openai/server_utils.py b/vllm/entrypoints/openai/server_utils.py
index 02b8c3352621..1ff45e86c0a4 100644
--- a/vllm/entrypoints/openai/server_utils.py
+++ b/vllm/entrypoints/openai/server_utils.py
@@ -24,10 +24,9 @@
 from vllm.entrypoints.openai.engine.protocol import (
     ErrorInfo,
     ErrorResponse,
-    GenerationError,
 )
 from vllm.entrypoints.utils import create_error_response, sanitize_message
-from vllm.exceptions import VLLMValidationError
+from vllm.exceptions import GenerationError, VLLMValidationError
 from vllm.logger import init_logger
 from vllm.utils.gc_utils import freeze_gc_heap
 from vllm.v1.engine.exceptions import EngineDeadError, EngineGenerateError
diff --git a/vllm/entrypoints/utils.py b/vllm/entrypoints/utils.py
index d5ecb75992fb..cec53118f567 100644
--- a/vllm/entrypoints/utils.py
+++ b/vllm/entrypoints/utils.py
@@ -20,10 +20,10 @@
 from vllm.entrypoints.openai.engine.protocol import (
     ErrorInfo,
     ErrorResponse,
-    GenerationError,
     StreamOptions,
 )
 from vllm.entrypoints.openai.models.protocol import LoRAModulePath
+from vllm.exceptions import GenerationError
 from vllm.logger import current_formatter_type, init_logger
 from vllm.platforms import current_platform
 from vllm.utils.argparse_utils import FlexibleArgumentParser
diff --git a/vllm/exceptions.py b/vllm/exceptions.py
index 931040b8ceb0..1f3412538447 100644
--- a/vllm/exceptions.py
+++ b/vllm/exceptions.py
@@ -3,6 +3,7 @@
 """Custom exceptions for vLLM."""
 
+from http import HTTPStatus
 from typing import Any
@@ -64,3 +65,29 @@
     def __str__(self):
         return self.message
+
+
+class GenerationError(Exception):
+    """Raised when finish_reason indicates internal server error (500)"""
+
+    def __init__(
+        self,
+        message: str = "Internal server error",
+        err_type: str = "InternalServerError",
+        status_code: HTTPStatus = HTTPStatus.INTERNAL_SERVER_ERROR,
+    ):
+        super().__init__(message)
+        self.err_type = err_type
+        self.status_code = status_code
+
+
+class RequestRejectedError(GenerationError):
+    """Raised when finish_reason indicates the request was rejected
+    (e.g., queue full, rate-limited, etc)."""
+
+    def __init__(self):
+        super().__init__(
+            message="Request was rejected",
+            err_type="ServiceUnavailableError",
+            status_code=HTTPStatus.SERVICE_UNAVAILABLE,
+        )
diff --git a/vllm/logger.py b/vllm/logger.py
index e8aecead3adc..01a05ec3dc1a 100644
--- a/vllm/logger.py
+++ b/vllm/logger.py
@@ -7,12 +7,14 @@
 import logging
 import os
 import sys
-from collections.abc import Generator, Hashable
+from collections import defaultdict
+from collections.abc import Callable, Generator, Hashable
 from contextlib import contextmanager
 from functools import lru_cache, partial
 from logging import Logger
 from logging.config import dictConfig
 from os import path
+from threading import Lock
 from types import MethodType
 from typing import Any, Literal, cast
@@ -92,6 +94,24 @@ def _print_warning_once(logger: Logger, msg: str, *args: Hashable) -> None:
 
 LogScope = Literal["process", "global", "local"]
 
+_warning_every_n_counters: defaultdict[tuple[LogScope, int, str], int] = defaultdict(
+    int
+)
+_warning_every_n_lock: Lock = Lock()
+
+
+def _print_warning_every_n(
+    logger: Logger, msg: str, *args: Hashable, n: int = 100, scope: LogScope = "process"
+) -> None:
+    key = (scope, id(logger), msg)
+
+    with _warning_every_n_lock:
+        _warning_every_n_counters[key] += 1
+        count = _warning_every_n_counters[key]
+
+    if count == 1 or count % n == 0:
+        logger.warning(msg, *args)
+
 
 def _should_log_with_scope(scope: LogScope) -> bool:
     """Decide whether to log based on scope"""
@@ -147,12 +167,25 @@ def warning_once(
 
         return _print_warning_once(self, msg, *args)
 
+    def warning_every_n(
+        self, msg: str, *args: Hashable, n: int = 100, scope: LogScope = "process"
+    ) -> None:
+        """
+        As `warning`, but only logs once every `n` calls.
+        """
+
+        if not _should_log_with_scope(scope):
+            return
+
+        _print_warning_every_n(self, msg, *args, n=n)
+
 
 # Pre-defined methods mapping to avoid repeated dictionary creation
-_METHODS_TO_PATCH = {
+_METHODS_TO_PATCH: dict[str, Callable[..., Any]] = {
     "debug_once": _VllmLogger.debug_once,
     "info_once": _VllmLogger.info_once,
     "warning_once": _VllmLogger.warning_once,
+    "warning_every_n": _VllmLogger.warning_every_n,
 }
diff --git a/vllm/v1/core/sched/scheduler.py b/vllm/v1/core/sched/scheduler.py
index 486ce8debc88..43044482846b 100644
--- a/vllm/v1/core/sched/scheduler.py
+++ b/vllm/v1/core/sched/scheduler.py
@@ -113,7 +113,7 @@ def __init__(
             self.kv_events_config is not None
             and self.kv_events_config.enable_kv_cache_events
         )
-
+        self.max_waiting_queue_length = self.scheduler_config.max_waiting_queue_length
         # Create KVConnector for the Scheduler. Note that each Worker
         # will have a corresponding KVConnector with Role=WORKER.
         # KV Connector pushes/pull of remote KVs for P/D and offloading.
@@ -174,6 +174,9 @@ def __init__(
         # This is flushed at the end of each scheduling step.
         self.finished_req_ids: set[str] = set()
 
+        # Requests that are rejected due to a full waiting queue.
+        self.rejected: list[Request] = []
+
         # Counter for requests waiting for streaming input. Used to calculate
         # number of unfinished requests
         self.num_waiting_for_streaming_input: int = 0
@@ -1463,21 +1466,17 @@ def update_from_output(
             requests = [self.requests[req_id] for req_id in failed_kv_load_req_ids]
             self.finish_requests(failed_kv_load_req_ids, RequestStatus.FINISHED_ERROR)
             for request in requests:
-                outputs[request.client_index].append(
-                    EngineCoreOutput(
-                        request_id=request.request_id,
-                        new_token_ids=[],
-                        finish_reason=request.get_finished_reason(),
-                        events=request.take_events(),
-                        trace_headers=request.trace_headers,
-                        num_cached_tokens=request.num_cached_tokens,
-                    )
-                )
+                self._append_failed_or_rejected_output(outputs, request)
 
         # KV Connector: update state for finished KV Transfers.
         if kv_connector_output:
             self._update_from_kv_xfer_finished(kv_connector_output)
-
+        # Handle rejected requests.
+        if rejected_reqs := self.rejected:
+            # Create EngineCoreOutputs for all rejected requests.
+            for request in rejected_reqs:
+                self._append_failed_or_rejected_output(outputs, request)
+            rejected_reqs.clear()
         # collect KV cache events from KV cache manager
         events = self.kv_cache_manager.take_events()
@@ -1556,6 +1555,25 @@ def _select_waiting_queue_for_scheduling(self) -> RequestQueue | None:
 
         return self.waiting or self.skipped_waiting or None
 
+    def _append_failed_or_rejected_output(
+        self,
+        outputs: dict[int, list[EngineCoreOutput]],
+        request: "Request",
+    ) -> None:
+        """
+        Appends an EngineCoreOutput for a failed KV load or rejected request.
+        """
+        output = EngineCoreOutput(
+            request_id=request.request_id,
+            new_token_ids=[],
+            finish_reason=request.get_finished_reason(),
+            events=request.take_events(),
+            trace_headers=request.trace_headers,
+            stop_reason=request.stop_reason,
+            num_cached_tokens=max(0, request.num_cached_tokens),
+        )
+        outputs[request.client_index].append(output)
+
     def _handle_stopped_request(self, request: Request) -> bool:
         """Return True if finished (can be False for resumable requests)."""
         if not request.resumable:
@@ -1720,6 +1738,12 @@ def add_request(self, request: Request) -> None:
             # Streaming-input session finished.
             self.finish_requests(request.request_id, RequestStatus.FINISHED_ABORTED)
         else:
+            if len(self.waiting) >= self.max_waiting_queue_length:
+                request.status = RequestStatus.FINISHED_REJECTED
+                self.rejected.append(request)
+                if self.log_stats:
+                    request.record_event(EngineCoreEventType.REJECTED)
+                return
             if request.resumable:
                 request.streaming_queue = deque()
             self._enqueue_waiting_request(request)
diff --git a/vllm/v1/engine/__init__.py b/vllm/v1/engine/__init__.py
index d76948bc277d..472312bdcc86 100644
--- a/vllm/v1/engine/__init__.py
+++ b/vllm/v1/engine/__init__.py
@@ -26,7 +26,7 @@
 # These are possible values of RequestOutput.finish_reason,
 # so form part of the external API.
-FINISH_REASON_STRINGS = ("stop", "length", "abort", "error", "repetition")
+FINISH_REASON_STRINGS = ("stop", "length", "abort", "error", "repetition", "rejected")
 
 EEP_NOTIFICATION_CALL_ID = -1
@@ -58,6 +58,7 @@ class FinishReason(enum.IntEnum):
     ABORT = 2
     ERROR = 3
     REPETITION = 4
+    REJECTED = 5
 
     def __str__(self):
         return FINISH_REASON_STRINGS[self.value]
@@ -116,6 +117,7 @@ class EngineCoreEventType(enum.IntEnum):
     QUEUED = 1
     SCHEDULED = 2
     PREEMPTED = 3
+    REJECTED = 4
 
 
 class EngineCoreEvent(msgspec.Struct):
diff --git a/vllm/v1/request.py b/vllm/v1/request.py
index f2ee33b49f22..2e455b3c47e4 100644
--- a/vllm/v1/request.py
+++ b/vllm/v1/request.py
@@ -309,6 +309,7 @@ class RequestStatus(enum.IntEnum):
     FINISHED_IGNORED = enum.auto()
     FINISHED_ERROR = enum.auto()
     FINISHED_REPETITION = enum.auto()
+    FINISHED_REJECTED = enum.auto()
 
     def __str__(self) -> str:
         return self.name
@@ -334,4 +335,5 @@ def get_finished_reason(status: "RequestStatus") -> FinishReason | None:
     RequestStatus.FINISHED_ERROR: FinishReason.ERROR,
     RequestStatus.WAITING_FOR_STREAMING_REQ: FinishReason.STOP,
     RequestStatus.FINISHED_REPETITION: FinishReason.REPETITION,
+    RequestStatus.FINISHED_REJECTED: FinishReason.REJECTED,
 }
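
Not part of the diff: a minimal client-side sketch of how the new `--max-waiting-queue-length` behavior could be observed against a running OpenAI-compatible server. The base URL, model name, and queue length below are placeholder assumptions; the 503 check follows from `RequestRejectedError` carrying `HTTPStatus.SERVICE_UNAVAILABLE` in the `vllm/exceptions.py` change above, and the exact error body shape is not guaranteed by this diff.

# Hypothetical usage check, assuming a server started with something like:
#   vllm serve <model> --max-waiting-queue-length 16
# <model> and the base URL are placeholders, not values taken from this diff.
import requests

BASE_URL = "http://localhost:8000"

resp = requests.post(
    f"{BASE_URL}/v1/completions",
    json={"model": "<model>", "prompt": "Hello", "max_tokens": 16},
)

if resp.status_code == 503:
    # A full waiting queue is expected to surface as a rejection (HTTP 503),
    # per the ServiceUnavailableError mapping introduced in this diff.
    print("request rejected:", resp.text)
else:
    resp.raise_for_status()
    print(resp.json()["choices"][0]["text"])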