diff --git a/vllm/config/scheduler.py b/vllm/config/scheduler.py
index 93002012799a..b22af27049af 100644
--- a/vllm/config/scheduler.py
+++ b/vllm/config/scheduler.py
@@ -33,6 +33,11 @@ class SchedulerConfig:
     runner_type: RunnerType = "generate"
     """The runner type to launch for the model."""
 
+    max_waiting_queue_length: Optional[int] = None
+    """The maximum number of requests allowed in the waiting queue.
+    If the waiting queue is full, new requests will be rejected.
+    If None, the waiting queue size is unlimited.
+    """
     max_num_batched_tokens: SkipValidation[int] = None  # type: ignore
     """Maximum number of tokens to be processed in a single iteration.
@@ -300,5 +305,9 @@ def _verify_args(self) -> Self:
                 f"max_long_partial_prefills ({self.max_long_partial_prefills}) "
                 "must be greater than or equal to 1 and less than or equal to "
                 f"max_num_partial_prefills ({self.max_num_partial_prefills}).")
-
+        if self.max_waiting_queue_length is not None and \
+                self.max_waiting_queue_length < 1:
+            raise ValueError(
+                f"max_waiting_queue_length ({self.max_waiting_queue_length})"
+                " must be greater than or equal to 1 or None.")
         return self
diff --git a/vllm/engine/arg_utils.py b/vllm/engine/arg_utils.py
index 9841876bc8e8..4b9f954498a9 100644
--- a/vllm/engine/arg_utils.py
+++ b/vllm/engine/arg_utils.py
@@ -467,6 +467,9 @@ class EngineArgs:
     kv_sharing_fast_prefill: bool = \
         CacheConfig.kv_sharing_fast_prefill
 
+    max_waiting_queue_length: Optional[int] = (
+        SchedulerConfig.max_waiting_queue_length)
+
     def __post_init__(self):
         # support `EngineArgs(compilation_config={...})`
         # without having to manually construct a
@@ -848,6 +851,9 @@ def add_cli_args(parser: FlexibleArgumentParser) -> FlexibleArgumentParser:
             title="SchedulerConfig",
             description=SchedulerConfig.__doc__,
         )
+        scheduler_group.add_argument(
+            "--max-waiting-queue-length",
+            **scheduler_kwargs["max_waiting_queue_length"])
         scheduler_group.add_argument(
             "--max-num-batched-tokens",
             **scheduler_kwargs["max_num_batched_tokens"])
@@ -1352,6 +1358,7 @@ def create_engine_config(
             max_num_batched_tokens=self.max_num_batched_tokens,
             max_num_seqs=self.max_num_seqs,
             max_model_len=model_config.max_model_len,
+            max_waiting_queue_length=self.max_waiting_queue_length,
             cuda_graph_sizes=self.cuda_graph_sizes,
             num_lookahead_slots=num_lookahead_slots,
             delay_factor=self.scheduler_delay_factor,
diff --git a/vllm/entrypoints/openai/api_server.py b/vllm/entrypoints/openai/api_server.py
index c159bcee315f..be755732f63a 100644
--- a/vllm/entrypoints/openai/api_server.py
+++ b/vllm/entrypoints/openai/api_server.py
@@ -681,7 +681,10 @@ async def cancel_responses(response_id: str, raw_request: Request):
         },
         HTTPStatus.INTERNAL_SERVER_ERROR.value: {
             "model": ErrorResponse
-        }
+        },
+        HTTPStatus.SERVICE_UNAVAILABLE.value: {
+            "model": ErrorResponse
+        },
     })
 @with_cancellation
 @load_aware_call
@@ -723,6 +726,9 @@ async def create_chat_completion(request: ChatCompletionRequest,
         HTTPStatus.INTERNAL_SERVER_ERROR.value: {
             "model": ErrorResponse
         },
+        HTTPStatus.SERVICE_UNAVAILABLE.value: {
+            "model": ErrorResponse
+        },
     })
 @with_cancellation
 @load_aware_call
diff --git a/vllm/entrypoints/openai/serving_chat.py b/vllm/entrypoints/openai/serving_chat.py
index 579f6f537ee2..8599cfca3df4 100644
--- a/vllm/entrypoints/openai/serving_chat.py
+++ b/vllm/entrypoints/openai/serving_chat.py
@@ -332,7 +332,7 @@ async def create_chat_completion(
                 conversation, tokenizer, request_metadata)
         except ValueError as e:
             # TODO: Use a vllm-specific Validation Error
-            return self.create_error_response(str(e))
+            return self.create_error_response(e)
 
     def get_chat_request_role(self, request: ChatCompletionRequest) -> str:
         if request.add_generation_prompt:
@@ -1133,8 +1133,7 @@ async def chat_completion_stream_generator(
 
         except Exception as e:
             # TODO: Use a vllm-specific Validation Error
-            logger.exception("Error in chat completion stream generator.")
-            data = self.create_streaming_error_response(str(e))
+            data = self.create_streaming_error_response(e)
             yield f"data: {data}\n\n"
         # Send the final done message after all response.n are finished
         yield "data: [DONE]\n\n"
@@ -1160,7 +1159,7 @@ async def chat_completion_full_generator(
                 return self.create_error_response("Client disconnected")
         except ValueError as e:
             # TODO: Use a vllm-specific Validation Error
-            return self.create_error_response(str(e))
+            return self.create_error_response(e)
 
         assert final_res is not None
diff --git a/vllm/entrypoints/openai/serving_completion.py b/vllm/entrypoints/openai/serving_completion.py
index c2de449a9699..535606df9dcd 100644
--- a/vllm/entrypoints/openai/serving_completion.py
+++ b/vllm/entrypoints/openai/serving_completion.py
@@ -228,7 +228,7 @@ async def create_completion(
                 generators.append(generator)
         except ValueError as e:
             # TODO: Use a vllm-specific Validation Error
-            return self.create_error_response(str(e))
+            return self.create_error_response(e)
 
         result_generator = merge_async_iterators(*generators)
@@ -290,7 +290,7 @@ async def create_completion(
                     return self.create_error_response("Client disconnected")
             except ValueError as e:
                 # TODO: Use a vllm-specific Validation Error
-                return self.create_error_response(str(e))
+                return self.create_error_response(e)
 
         # When user requests streaming but we don't stream, we still need to
         # return a streaming response with a single event.
@@ -485,7 +485,7 @@ async def completion_stream_generator(
 
         except Exception as e:
             # TODO: Use a vllm-specific Validation Error
-            data = self.create_streaming_error_response(str(e))
+            data = self.create_streaming_error_response(e)
             yield f"data: {data}\n\n"
         yield "data: [DONE]\n\n"
diff --git a/vllm/entrypoints/openai/serving_engine.py b/vllm/entrypoints/openai/serving_engine.py
index d391cc50ad23..48dca54aa7c9 100644
--- a/vllm/entrypoints/openai/serving_engine.py
+++ b/vllm/entrypoints/openai/serving_engine.py
@@ -76,6 +76,7 @@
 from vllm.transformers_utils.tokenizer import AnyTokenizer, MistralTokenizer
 from vllm.utils import (AsyncMicrobatchTokenizer, is_list_of,
                         merge_async_iterators, random_uuid)
+from vllm.v1.engine.exceptions import SchedulerWaitingQueueFullError
 
 logger = init_logger(__name__)
@@ -399,7 +400,7 @@ async def _prepare_generators(
 
         except Exception as e:
             # TODO: Use a vllm-specific Validation Error
-            return self.create_error_response(str(e))
+            return self.create_error_response(e)
 
     async def _collect_batch(
         self,
@@ -438,10 +439,20 @@ async def _collect_batch(
 
     def create_error_response(
         self,
-        message: str,
+        message: Union[str, Exception],
        err_type: str = "BadRequestError",
         status_code: HTTPStatus = HTTPStatus.BAD_REQUEST,
     ) -> ErrorResponse:
+        if isinstance(message, SchedulerWaitingQueueFullError):
+            return ErrorResponse(error=ErrorInfo(
+                message=str(message),
+                type="ServiceUnavailableError",
+                code=HTTPStatus.SERVICE_UNAVAILABLE.value,
+            ))
+        elif isinstance(message, Exception):
+            message_str = str(message)
+        else:
+            message_str = message
         if self.log_error_stack:
             exc_type, _, _ = sys.exc_info()
             if exc_type is not None:
@@ -449,11 +460,11 @@
             else:
                 traceback.print_stack()
         return ErrorResponse(error=ErrorInfo(
-            message=message, type=err_type, code=status_code.value))
+            message=message_str, type=err_type, code=status_code.value))
 
     def create_streaming_error_response(
         self,
-        message: str,
+        message: Union[str, Exception],
         err_type: str = "BadRequestError",
         status_code: HTTPStatus = HTTPStatus.BAD_REQUEST,
     ) -> str:
diff --git a/vllm/entrypoints/openai/serving_responses.py b/vllm/entrypoints/openai/serving_responses.py
index 401ba6c53331..6a9d0b80d6c2 100644
--- a/vllm/entrypoints/openai/serving_responses.py
+++ b/vllm/entrypoints/openai/serving_responses.py
@@ -307,7 +307,7 @@ async def create_responses(
                 generators.append(generator)
         except ValueError as e:
             # TODO: Use a vllm-specific Validation Error
-            return self.create_error_response(str(e))
+            return self.create_error_response(e)
 
         assert len(generators) == 1
         result_generator, = generators
@@ -459,7 +459,7 @@ async def responses_full_generator(
                 return self.create_error_response("Client disconnected")
         except ValueError as e:
             # TODO: Use a vllm-specific Validation Error
-            return self.create_error_response(str(e))
+            return self.create_error_response(e)
 
         if self.use_harmony:
             assert isinstance(context, HarmonyContext)
diff --git a/vllm/v1/core/sched/scheduler.py b/vllm/v1/core/sched/scheduler.py
index aa45f6669207..e5ed56c12848 100644
--- a/vllm/v1/core/sched/scheduler.py
+++ b/vllm/v1/core/sched/scheduler.py
@@ -28,6 +28,7 @@
 from vllm.v1.core.sched.utils import check_stop, remove_all
 from vllm.v1.engine import (EngineCoreEventType, EngineCoreOutput,
                             EngineCoreOutputs)
+from vllm.v1.engine.exceptions import SchedulerWaitingQueueFullError
 from vllm.v1.kv_cache_interface import KVCacheConfig
 from vllm.v1.metrics.stats import SchedulerStats
 from vllm.v1.outputs import DraftTokenIds, KVConnectorOutput, ModelRunnerOutput
@@ -1092,6 +1093,10 @@ def get_request_counts(self) -> tuple[int, int]:
         return len(self.running), len(self.waiting)
 
     def add_request(self, request: Request) -> None:
+        if (self.scheduler_config.max_waiting_queue_length
+                and len(self.waiting)
+                >= self.scheduler_config.max_waiting_queue_length):
+            raise SchedulerWaitingQueueFullError(request_id=request.request_id)
         self.waiting.add_request(request)
         self.requests[request.request_id] = request
         if self.log_stats:
diff --git a/vllm/v1/engine/__init__.py b/vllm/v1/engine/__init__.py
index dec4abec519b..c97ab9441af1 100644
--- a/vllm/v1/engine/__init__.py
+++ b/vllm/v1/engine/__init__.py
@@ -142,6 +142,13 @@ class UtilityOutput(
     result: Optional[UtilityResult] = None
 
 
+class EngineErrorPayload(msgspec.Struct):
+    exc_type: str
+    exc_module: str
+    exc_args: list
+    exc_traceback: str
+
+
 class EngineCoreOutputs(
         msgspec.Struct,
         array_like=True,  # type: ignore[call-arg]
@@ -168,6 +175,8 @@ class EngineCoreOutputs(
     # "old" wave, so the next wave needs to be started in other engines.
     start_wave: Optional[int] = None
 
+    engine_error: Optional[EngineErrorPayload] = None
+
     def __post_init__(self):
         if self.timestamp == 0.0:
             self.timestamp = time.monotonic()
diff --git a/vllm/v1/engine/async_llm.py b/vllm/v1/engine/async_llm.py
index e467f40f6eab..6360609a9e8c 100644
--- a/vllm/v1/engine/async_llm.py
+++ b/vllm/v1/engine/async_llm.py
@@ -35,8 +35,9 @@
 from vllm.utils import (Device, as_list, cancel_task_threadsafe, cdiv,
                         deprecate_kwargs)
 from vllm.v1.engine import EngineCoreRequest
-from vllm.v1.engine.core_client import EngineCoreClient
-from vllm.v1.engine.exceptions import EngineDeadError, EngineGenerateError
+from vllm.v1.engine.core_client import EngineCoreClient, process_engine_error
+from vllm.v1.engine.exceptions import (EngineDeadError, EngineGenerateError,
+                                       SchedulerWaitingQueueFullError)
 from vllm.v1.engine.output_processor import (OutputProcessor,
                                              RequestOutputCollector)
 from vllm.v1.engine.parallel_sampling import ParentRequest
@@ -410,13 +411,16 @@ async def generate(
                 if self.log_requests:
                     logger.info("Request %s failed (engine dead).", request_id)
                 raise
-
+            except SchedulerWaitingQueueFullError:
+                if self.log_requests:
+                    logger.info("Request %s failed (waiting queue full).",
+                                request_id)
+                raise
             # Request validation error.
             except ValueError:
                 if self.log_requests:
                     logger.info("Request %s failed (bad request).", request_id)
                 raise
-
             # Unexpected error in the generate() task (possibly recoverable).
             except Exception as e:
                 await self.abort(request_id)
@@ -442,6 +446,10 @@ async def output_handler():
             while True:
                 # 1) Pull EngineCoreOutputs from the EngineCore.
                 outputs = await engine_core.get_output_async()
+                if outputs.engine_error:
+                    output_processor.propagate_error(
+                        process_engine_error(outputs.engine_error))
+                    continue
                 num_outputs = len(outputs.outputs)
 
                 iteration_stats = IterationStats() if (
@@ -574,6 +582,12 @@ async def encode(
                 logger.info("Request %s failed (engine dead).", request_id)
                 raise
 
+            except SchedulerWaitingQueueFullError:
+                if self.log_requests:
+                    logger.info("Request %s failed (waiting queue full).",
+                                request_id)
+                raise
+
             # Request validation error.
             except ValueError:
                 if self.log_requests:
diff --git a/vllm/v1/engine/core.py b/vllm/v1/engine/core.py
index b46ae72ccdf1..1f7ea2fbace9 100644
--- a/vllm/v1/engine/core.py
+++ b/vllm/v1/engine/core.py
@@ -6,6 +6,7 @@
 import signal
 import threading
 import time
+import traceback
 from collections import deque
 from collections.abc import Generator
 from concurrent.futures import Future
@@ -37,7 +38,7 @@
 from vllm.v1.core.sched.output import SchedulerOutput
 from vllm.v1.core.sched.scheduler import Scheduler as V1Scheduler
 from vllm.v1.engine import (EngineCoreOutputs, EngineCoreRequest,
-                            EngineCoreRequestType,
+                            EngineCoreRequestType, EngineErrorPayload,
                             ReconfigureDistributedRequest, ReconfigureRankType,
                             UtilityOutput, UtilityResult)
 from vllm.v1.engine.utils import (EngineHandshakeMetadata, EngineZmqAddresses,
@@ -707,9 +708,11 @@ def signal_handler(signum, frame):
         set_process_title("EngineCore")
         decorate_logs()
         engine_core = EngineCoreProc(*args, **kwargs)
-
-        engine_core.run_busy_loop()
-
+        while True:
+            try:
+                engine_core.run_busy_loop()
+            except ValueError as e:
+                engine_core._send_engine_error(e)
     except SystemExit:
         logger.debug("EngineCore exiting.")
         raise
@@ -824,6 +827,20 @@ def _send_engine_dead(self):
             logger.fatal("vLLM shutdown signal from EngineCore failed "
Please report this issue.") + def _send_engine_error(self, exc: BaseException): + """Send CustomEngineError status to the EngineCoreClient.""" + + # Put CustomEngineError in the queue. + self.output_queue.put_nowait(( + 0, + EngineCoreOutputs(engine_error=EngineErrorPayload( + exc_type=type(exc).__name__, + exc_module=type(exc).__module__, + exc_args=list(exc.args), + exc_traceback=traceback.format_exc(), + )), + )) + def process_input_sockets(self, input_addresses: list[str], coord_input_address: Optional[str], identity: bytes, ready_event: threading.Event): diff --git a/vllm/v1/engine/core_client.py b/vllm/v1/engine/core_client.py index 605bedaf10e6..15d1a922035b 100644 --- a/vllm/v1/engine/core_client.py +++ b/vllm/v1/engine/core_client.py @@ -26,7 +26,7 @@ from vllm.utils import (close_sockets, get_open_port, get_open_zmq_inproc_path, in_loop, make_zmq_socket) from vllm.v1.engine import (EngineCoreOutputs, EngineCoreRequest, - EngineCoreRequestType, + EngineCoreRequestType, EngineErrorPayload, ReconfigureDistributedRequest, ReconfigureRankType, UtilityOutput) from vllm.v1.engine.coordinator import DPCoordinator @@ -811,6 +811,9 @@ async def process_outputs_socket(): frames = await output_socket.recv_multipart(copy=False) resources.validate_alive(frames) outputs: EngineCoreOutputs = decoder.decode(frames) + if outputs.engine_error: + outputs_queue.put_nowait(outputs) + continue if outputs.utility_output: _process_utility_output(outputs.utility_output, utility_results) @@ -1331,3 +1334,14 @@ async def _scale_down_elastic_ep(self, cur_data_parallel_size: int, logger.info( "[Elastic EP] Scale down completed, new data parallel size: %s", new_data_parallel_size) + + +def process_engine_error(engine_error: EngineErrorPayload) -> Exception: + """Process an engine error payload and raise an exception.""" + try: + module = sys.modules.get(engine_error.exc_module) + exc_class = getattr(module, engine_error.exc_type) + except Exception: + exc_class = RuntimeError # fallback + exc = exc_class(*engine_error.exc_args) + return exc diff --git a/vllm/v1/engine/exceptions.py b/vllm/v1/engine/exceptions.py index 692ba9dc840f..ccd625fb9138 100644 --- a/vllm/v1/engine/exceptions.py +++ b/vllm/v1/engine/exceptions.py @@ -2,6 +2,7 @@ # SPDX-FileCopyrightText: Copyright contributors to the vLLM project class EngineGenerateError(Exception): """Raised when a AsyncLLM.generate() fails. Recoverable.""" + pass @@ -15,3 +16,12 @@ def __init__(self, *args, suppress_context: bool = False, **kwargs): # Make stack trace clearer when using with LLMEngine by # silencing irrelevant ZMQError. 
         self.__suppress_context__ = suppress_context
+
+
+class SchedulerWaitingQueueFullError(ValueError):
+    """Raised when the scheduler's waiting queue is full and cannot accept
+    new requests."""
+
+    def __init__(self, request_id: str):
+        super().__init__(request_id)
+        self.request_id = request_id
diff --git a/vllm/v1/engine/output_processor.py b/vllm/v1/engine/output_processor.py
index 02c8c61cb909..54e4202e0278 100644
--- a/vllm/v1/engine/output_processor.py
+++ b/vllm/v1/engine/output_processor.py
@@ -17,6 +17,7 @@
 from vllm.transformers_utils.tokenizer_group import TokenizerGroup
 from vllm.v1.engine import EngineCoreOutput, EngineCoreRequest, FinishReason
 from vllm.v1.engine.detokenizer import IncrementalDetokenizer
+from vllm.v1.engine.exceptions import SchedulerWaitingQueueFullError
 from vllm.v1.engine.logprobs import LogprobsProcessor
 from vllm.v1.engine.parallel_sampling import ParentRequest
 from vllm.v1.metrics.stats import (IterationStats, LoRARequestStates,
@@ -306,10 +307,16 @@ def has_unfinished_requests(self) -> bool:
 
     def propagate_error(self, e: Exception):
         """Propagate error to all generate() tasks."""
-
-        for _, state in self.request_states.items():
-            assert state.queue is not None
-            state.queue.put(e)
+        if isinstance(e, SchedulerWaitingQueueFullError):
+            request_id = e.request_id
+            if request_id in self.request_states:
+                state = self.request_states.get(request_id)
+                if state and state.queue is not None:
+                    state.queue.put(e)
+        else:
+            for _, state in self.request_states.items():
+                assert state.queue is not None
+                state.queue.put(e)
 
     def abort_requests(
         self,
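
Note (reviewer sketch, not part of the patch): with this change, a server started with the new flag, e.g. `vllm serve <model> --max-waiting-queue-length 128`, answers HTTP 503 once the scheduler's waiting queue is full instead of accepting the request. A minimal client-side retry sketch, assuming a local OpenAI-compatible endpoint at port 8000 and a placeholder model name:

import time

import requests  # any HTTP client works; requests is assumed here


def complete_with_retry(prompt: str, max_retries: int = 5) -> dict:
    """Retry a completion when the server reports a full waiting queue (503)."""
    payload = {"model": "my-model", "prompt": prompt, "max_tokens": 64}
    for attempt in range(max_retries):
        resp = requests.post("http://localhost:8000/v1/completions",
                             json=payload, timeout=60)
        if resp.status_code == 503:
            # Waiting queue full: back off exponentially and try again.
            time.sleep(2 ** attempt)
            continue
        resp.raise_for_status()
        return resp.json()
    raise RuntimeError("server still overloaded after retries")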
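
A second self-contained sketch (hypothetical names, not the patch's own classes) of the round trip that EngineErrorPayload enables: the engine process records the exception's type, module, and args, and the client side rebuilds an equivalent exception to deliver to the waiting request. The patch resolves the class via sys.modules; importlib is used here only to keep the example standalone.

import importlib
import traceback
from dataclasses import dataclass


@dataclass
class ErrorPayload:  # hypothetical stand-in for EngineErrorPayload
    exc_type: str
    exc_module: str
    exc_args: list
    exc_traceback: str


def serialize(exc: BaseException) -> ErrorPayload:
    # Engine side: record enough to rebuild the exception in another process.
    return ErrorPayload(
        exc_type=type(exc).__name__,
        exc_module=type(exc).__module__,
        exc_args=list(exc.args),
        exc_traceback="".join(
            traceback.format_exception(type(exc), exc, exc.__traceback__)),
    )


def deserialize(payload: ErrorPayload) -> Exception:
    # Client side: resolve the class by name; fall back to RuntimeError.
    try:
        module = importlib.import_module(payload.exc_module)
        exc_class = getattr(module, payload.exc_type)
    except Exception:
        exc_class = RuntimeError
    return exc_class(*payload.exc_args)


rebuilt = deserialize(serialize(ValueError("waiting queue full")))
assert isinstance(rebuilt, ValueError)
assert rebuilt.args == ("waiting queue full",)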