From 53223f2c82611ae8a336b6272734e366730c9b87 Mon Sep 17 00:00:00 2001 From: Andreas Karatzas Date: Mon, 18 May 2026 17:01:29 -0500 Subject: [PATCH 1/8] [ROCm][CI] Stabilize 400 error return code for invalid schema inputs Signed-off-by: Andreas Karatzas --- vllm/entrypoints/chat_utils.py | 22 ++++- .../openai/chat_completion/batch_serving.py | 84 +++++++++++-------- .../openai/chat_completion/protocol.py | 4 +- vllm/entrypoints/openai/completion/serving.py | 2 + vllm/entrypoints/openai/engine/serving.py | 10 +++ vllm/entrypoints/serve/disagg/protocol.py | 2 +- vllm/entrypoints/serve/disagg/serving.py | 26 ++++-- vllm/entrypoints/serve/render/serving.py | 49 ++++++----- 8 files changed, 133 insertions(+), 66 deletions(-) diff --git a/vllm/entrypoints/chat_utils.py b/vllm/entrypoints/chat_utils.py index e3b727597d86..c4d5025203be 100644 --- a/vllm/entrypoints/chat_utils.py +++ b/vllm/entrypoints/chat_utils.py @@ -1818,12 +1818,28 @@ def _postprocess_messages(messages: list[ConversationMessage]) -> None: continue for item in tool_calls: + if not isinstance(item, dict): + raise VLLMValidationError( + "assistant tool_calls entries must be objects.", + parameter="tool_calls", + ) + + function = item.get("function") + if item.get("type", "function") != "function" or not isinstance( + function, dict + ): + raise VLLMValidationError( + "chat completions only support assistant tool_calls " + "of type 'function'.", + parameter="tool_calls", + ) + # if arguments is None or empty string, set to {} - if content := item["function"].get("arguments"): + if content := function.get("arguments"): if not isinstance(content, (dict, list)): - item["function"]["arguments"] = json.loads(content) + function["arguments"] = json.loads(content) else: - item["function"]["arguments"] = {} + function["arguments"] = {} def parse_chat_messages( diff --git a/vllm/entrypoints/openai/chat_completion/batch_serving.py b/vllm/entrypoints/openai/chat_completion/batch_serving.py index cc49909b8361..5c28d20ce419 100644 --- a/vllm/entrypoints/openai/chat_completion/batch_serving.py +++ b/vllm/entrypoints/openai/chat_completion/batch_serving.py @@ -7,6 +7,7 @@ from http import HTTPStatus from fastapi import Request +from pydantic import ValidationError from vllm.entrypoints.chat_utils import ConversationMessage from vllm.entrypoints.openai.chat_completion.protocol import ( @@ -81,21 +82,24 @@ async def render_batch_chat_request( all_engine_prompts: list[EngineInput] = [] for messages in request.messages: - single_request = request.to_chat_completion_request(messages) - if render.use_harmony: - conversation, engine_prompts = render._make_request_with_harmony( - single_request, should_include_tools=tool_dicts is not None - ) - else: - conversation, engine_prompts = await render.preprocess_chat( - single_request, - messages, - default_template=render.chat_template, - default_template_content_format=render.chat_template_content_format, - default_template_kwargs=render.default_chat_template_kwargs, - tool_dicts=tool_dicts, - tool_parser=tool_parser, - ) + try: + single_request = request.to_chat_completion_request(messages) + if render.use_harmony: + conversation, engine_prompts = render._make_request_with_harmony( + single_request, should_include_tools=tool_dicts is not None + ) + else: + conversation, engine_prompts = await render.preprocess_chat( + single_request, + messages, + default_template=render.chat_template, + default_template_content_format=render.chat_template_content_format, + default_template_kwargs=render.default_chat_template_kwargs, + tool_dicts=tool_dicts, + tool_parser=tool_parser, + ) + except (ValidationError, ValueError) as e: + return self.create_error_response(e) all_conversations.append(conversation) all_engine_prompts.append(engine_prompts[0]) @@ -114,10 +118,13 @@ async def create_batch_chat_completion( """ tokenizer = self.renderer.tokenizer assert tokenizer is not None - single_requests = [ - request.to_chat_completion_request(messages) - for messages in request.messages - ] + try: + single_requests = [ + request.to_chat_completion_request(messages) + for messages in request.messages + ] + except (ValidationError, ValueError) as e: + return self.create_error_response(e) reasoning_parser: ReasoningParser | None = None if self.reasoning_parser_cls: @@ -149,19 +156,22 @@ async def create_batch_chat_completion( generators: list[AsyncGenerator[RequestOutput, None]] = [] for i, engine_prompt in enumerate(engine_prompts): sub_request_id = f"{request_id}_{i}" - max_tokens = get_max_tokens( - max_model_len, - request.max_completion_tokens - if request.max_completion_tokens is not None - else request.max_tokens, - self._extract_prompt_len(engine_prompt), - self.default_sampling_params, - self.override_max_tokens, - ) - single_request = single_requests[i] - sampling_params = single_request.to_sampling_params( - max_tokens, self.default_sampling_params - ) + try: + max_tokens = get_max_tokens( + max_model_len, + request.max_completion_tokens + if request.max_completion_tokens is not None + else request.max_tokens, + self._extract_prompt_len(engine_prompt), + self.default_sampling_params, + self.override_max_tokens, + ) + single_request = single_requests[i] + sampling_params = single_request.to_sampling_params( + max_tokens, self.default_sampling_params + ) + except ValueError as e: + return self.create_error_response(e) self._log_inputs( sub_request_id, engine_prompt, @@ -218,14 +228,14 @@ async def chat_completion_full_generator_batch( ``check_batch_mode`` validator, so neither needs to be handled here. """ created_time = int(time.time()) - role = self.get_chat_request_role(request) # type: ignore[arg-type] - final_results: dict[int, RequestOutput] = {} try: async for prompt_idx, res in merge_async_iterators(*generators): final_results[prompt_idx] = res except asyncio.CancelledError: return self.create_error_response("Client disconnected") + except ValueError as e: + return self.create_error_response(e) choices: list[ChatCompletionResponseChoice] = [] total_prompt_tokens = 0 @@ -275,6 +285,12 @@ async def chat_completion_full_generator_batch( reasoning = None content = output.text + role = ( + self.response_role + if request.add_generation_prompt + else request.messages[prompt_idx][-1]["role"] + ) + message = ChatMessage(role=role, reasoning=reasoning, content=content) if request.echo: diff --git a/vllm/entrypoints/openai/chat_completion/protocol.py b/vllm/entrypoints/openai/chat_completion/protocol.py index cf843f6f1d3f..57e7a4321462 100644 --- a/vllm/entrypoints/openai/chat_completion/protocol.py +++ b/vllm/entrypoints/openai/chat_completion/protocol.py @@ -886,7 +886,9 @@ class BatchChatCompletionRequest(OpenAIBaseModel): - The ``n`` parameter must be 1 (or omitted). """ - messages: list[list[ChatCompletionMessageParam]] = Field(..., min_length=1) + messages: list[Annotated[list[ChatCompletionMessageParam], Field(min_length=1)]] = ( + Field(..., min_length=1) + ) model: str | None = None # Shared sampling / generation fields — mirror ChatCompletionRequest. diff --git a/vllm/entrypoints/openai/completion/serving.py b/vllm/entrypoints/openai/completion/serving.py index bfe9fb226762..370deeb56de8 100644 --- a/vllm/entrypoints/openai/completion/serving.py +++ b/vllm/entrypoints/openai/completion/serving.py @@ -259,6 +259,8 @@ async def _create_completion( ) except asyncio.CancelledError: return self.create_error_response("Client disconnected") + except ValueError as e: + return self.create_error_response(e) # When user requests streaming but we don't stream, we still need to # return a streaming response with a single event. diff --git a/vllm/entrypoints/openai/engine/serving.py b/vllm/entrypoints/openai/engine/serving.py index 6152f915cc3a..c0cbac46f831 100644 --- a/vllm/entrypoints/openai/engine/serving.py +++ b/vllm/entrypoints/openai/engine/serving.py @@ -9,6 +9,7 @@ from http import HTTPStatus from typing import Any, ClassVar, Generic, Protocol, TypeAlias, TypeVar +import msgspec import numpy as np from fastapi import Request from openai.types.responses import ToolChoiceFunction @@ -629,6 +630,15 @@ async def _with_kv_transfer_rejection_cleanup( """Wrap a `create_*` coroutine so that, if it raises or returns an ErrorResponse (i.e. the request never reached the engine), the KV connector is notified to free any pinned remote-prefill blocks.""" + if request.kv_transfer_params is not None: + try: + msgspec.msgpack.encode(request.kv_transfer_params) + except (OverflowError, TypeError, ValueError) as e: + close = getattr(awaitable, "close", None) + if close is not None: + close() + return self.create_error_response(e) # type: ignore[return-value] + kv_transfer_params = self.has_kv_connector and request.kv_transfer_params if not kv_transfer_params or not kv_transfer_params.get("do_remote_prefill"): return await awaitable diff --git a/vllm/entrypoints/serve/disagg/protocol.py b/vllm/entrypoints/serve/disagg/protocol.py index 7d468d5d9284..60d2a6424a00 100644 --- a/vllm/entrypoints/serve/disagg/protocol.py +++ b/vllm/entrypoints/serve/disagg/protocol.py @@ -68,7 +68,7 @@ class GenerateRequest(BaseModel): "through out the inference process and return in response." ), ) - token_ids: list[int] + token_ids: list[int] = Field(min_length=1) """The token ids to generate text from.""" @field_validator("token_ids") diff --git a/vllm/entrypoints/serve/disagg/serving.py b/vllm/entrypoints/serve/disagg/serving.py index 0edfa6148711..45dcaf935c57 100644 --- a/vllm/entrypoints/serve/disagg/serving.py +++ b/vllm/entrypoints/serve/disagg/serving.py @@ -125,6 +125,14 @@ async def serve_tokens( if raw_request: raw_request.state.request_metadata = request_metadata + sampling_params = request.sampling_params + max_num_seqs = self.engine_client.vllm_config.scheduler_config.max_num_seqs + if sampling_params.n > max_num_seqs: + return self.create_error_response( + f"sampling_params.n must be at most the server's max_num_seqs " + f"({max_num_seqs}), got {sampling_params.n}." + ) + engine_input: EngineInput if features := request.features: # Convert PlaceholderRangeInfo → PlaceholderRange per modality. @@ -155,16 +163,20 @@ async def serve_tokens( cache_salt=request.cache_salt, ) else: - (engine_input,) = await self.openai_serving_render.preprocess_completion( - request, - prompt_input=request.token_ids, - prompt_embeds=None, - skip_mm_cache=True, - ) + try: + ( + engine_input, + ) = await self.openai_serving_render.preprocess_completion( + request, + prompt_input=request.token_ids, + prompt_embeds=None, + skip_mm_cache=True, + ) + except ValueError as e: + return self.create_error_response(e) # Schedule the request and get the result generator. result_generator: AsyncGenerator[RequestOutput, None] | None = None - sampling_params = request.sampling_params # Apply server-side ``max_tokens`` defaulting when the client did # not set it, matching the OpenAI-compat endpoints. ``SamplingParams`` diff --git a/vllm/entrypoints/serve/render/serving.py b/vllm/entrypoints/serve/render/serving.py index 782b2eaea24b..889d89ea159c 100644 --- a/vllm/entrypoints/serve/render/serving.py +++ b/vllm/entrypoints/serve/render/serving.py @@ -247,23 +247,29 @@ async def render_chat( if error_check_ret is not None: return error_check_ret - conversation, engine_inputs = await self.preprocess_chat( - request, - request.messages, - default_template=self.chat_template, - default_template_content_format=self.chat_template_content_format, - default_template_kwargs=self.default_chat_template_kwargs, - tool_dicts=tool_dicts, - tool_parser=tool_parser, - skip_mm_cache=skip_mm_cache, - reasoning_parser=self.reasoning_parser, - ) + try: + conversation, engine_inputs = await self.preprocess_chat( + request, + request.messages, + default_template=self.chat_template, + default_template_content_format=self.chat_template_content_format, + default_template_kwargs=self.default_chat_template_kwargs, + tool_dicts=tool_dicts, + tool_parser=tool_parser, + skip_mm_cache=skip_mm_cache, + reasoning_parser=self.reasoning_parser, + ) + except ValueError as e: + return self.create_error_response(e) else: # For GPT-OSS. should_include_tools = tool_dicts is not None - conversation, engine_inputs = self._make_request_with_harmony( - request, should_include_tools - ) + try: + conversation, engine_inputs = self._make_request_with_harmony( + request, should_include_tools + ) + except ValueError as e: + return self.create_error_response(e) return conversation, engine_inputs @@ -346,12 +352,15 @@ async def render_completion( "prompt_logprobs is not compatible with prompt embeds." ) - engine_inputs = await self.preprocess_completion( - request, - prompt_input=request.prompt, - prompt_embeds=request.prompt_embeds, - skip_mm_cache=skip_mm_cache, - ) + try: + engine_inputs = await self.preprocess_completion( + request, + prompt_input=request.prompt, + prompt_embeds=request.prompt_embeds, + skip_mm_cache=skip_mm_cache, + ) + except ValueError as e: + return self.create_error_response(e) return engine_inputs From 01e5eac3c0b5327701cba39f02c6855d48c56ec4 Mon Sep 17 00:00:00 2001 From: Andreas Karatzas Date: Tue, 19 May 2026 04:04:40 -0500 Subject: [PATCH 2/8] [ROCm][CI] Stabilize 400 error return code for invalid schema inputs Signed-off-by: Andreas Karatzas --- .../openai/chat_completion/batch_serving.py | 76 ++++++++----------- vllm/entrypoints/openai/completion/serving.py | 2 - vllm/entrypoints/serve/disagg/serving.py | 17 ++--- vllm/entrypoints/serve/render/serving.py | 49 +++++------- 4 files changed, 58 insertions(+), 86 deletions(-) diff --git a/vllm/entrypoints/openai/chat_completion/batch_serving.py b/vllm/entrypoints/openai/chat_completion/batch_serving.py index 5c28d20ce419..0dfcdd925158 100644 --- a/vllm/entrypoints/openai/chat_completion/batch_serving.py +++ b/vllm/entrypoints/openai/chat_completion/batch_serving.py @@ -7,7 +7,6 @@ from http import HTTPStatus from fastapi import Request -from pydantic import ValidationError from vllm.entrypoints.chat_utils import ConversationMessage from vllm.entrypoints.openai.chat_completion.protocol import ( @@ -82,24 +81,21 @@ async def render_batch_chat_request( all_engine_prompts: list[EngineInput] = [] for messages in request.messages: - try: - single_request = request.to_chat_completion_request(messages) - if render.use_harmony: - conversation, engine_prompts = render._make_request_with_harmony( - single_request, should_include_tools=tool_dicts is not None - ) - else: - conversation, engine_prompts = await render.preprocess_chat( - single_request, - messages, - default_template=render.chat_template, - default_template_content_format=render.chat_template_content_format, - default_template_kwargs=render.default_chat_template_kwargs, - tool_dicts=tool_dicts, - tool_parser=tool_parser, - ) - except (ValidationError, ValueError) as e: - return self.create_error_response(e) + single_request = request.to_chat_completion_request(messages) + if render.use_harmony: + conversation, engine_prompts = render._make_request_with_harmony( + single_request, should_include_tools=tool_dicts is not None + ) + else: + conversation, engine_prompts = await render.preprocess_chat( + single_request, + messages, + default_template=render.chat_template, + default_template_content_format=render.chat_template_content_format, + default_template_kwargs=render.default_chat_template_kwargs, + tool_dicts=tool_dicts, + tool_parser=tool_parser, + ) all_conversations.append(conversation) all_engine_prompts.append(engine_prompts[0]) @@ -118,13 +114,10 @@ async def create_batch_chat_completion( """ tokenizer = self.renderer.tokenizer assert tokenizer is not None - try: - single_requests = [ - request.to_chat_completion_request(messages) - for messages in request.messages - ] - except (ValidationError, ValueError) as e: - return self.create_error_response(e) + single_requests = [ + request.to_chat_completion_request(messages) + for messages in request.messages + ] reasoning_parser: ReasoningParser | None = None if self.reasoning_parser_cls: @@ -156,22 +149,19 @@ async def create_batch_chat_completion( generators: list[AsyncGenerator[RequestOutput, None]] = [] for i, engine_prompt in enumerate(engine_prompts): sub_request_id = f"{request_id}_{i}" - try: - max_tokens = get_max_tokens( - max_model_len, - request.max_completion_tokens - if request.max_completion_tokens is not None - else request.max_tokens, - self._extract_prompt_len(engine_prompt), - self.default_sampling_params, - self.override_max_tokens, - ) - single_request = single_requests[i] - sampling_params = single_request.to_sampling_params( - max_tokens, self.default_sampling_params - ) - except ValueError as e: - return self.create_error_response(e) + max_tokens = get_max_tokens( + max_model_len, + request.max_completion_tokens + if request.max_completion_tokens is not None + else request.max_tokens, + self._extract_prompt_len(engine_prompt), + self.default_sampling_params, + self.override_max_tokens, + ) + single_request = single_requests[i] + sampling_params = single_request.to_sampling_params( + max_tokens, self.default_sampling_params + ) self._log_inputs( sub_request_id, engine_prompt, @@ -234,8 +224,6 @@ async def chat_completion_full_generator_batch( final_results[prompt_idx] = res except asyncio.CancelledError: return self.create_error_response("Client disconnected") - except ValueError as e: - return self.create_error_response(e) choices: list[ChatCompletionResponseChoice] = [] total_prompt_tokens = 0 diff --git a/vllm/entrypoints/openai/completion/serving.py b/vllm/entrypoints/openai/completion/serving.py index 370deeb56de8..bfe9fb226762 100644 --- a/vllm/entrypoints/openai/completion/serving.py +++ b/vllm/entrypoints/openai/completion/serving.py @@ -259,8 +259,6 @@ async def _create_completion( ) except asyncio.CancelledError: return self.create_error_response("Client disconnected") - except ValueError as e: - return self.create_error_response(e) # When user requests streaming but we don't stream, we still need to # return a streaming response with a single event. diff --git a/vllm/entrypoints/serve/disagg/serving.py b/vllm/entrypoints/serve/disagg/serving.py index 45dcaf935c57..d7f76adce553 100644 --- a/vllm/entrypoints/serve/disagg/serving.py +++ b/vllm/entrypoints/serve/disagg/serving.py @@ -163,17 +163,12 @@ async def serve_tokens( cache_salt=request.cache_salt, ) else: - try: - ( - engine_input, - ) = await self.openai_serving_render.preprocess_completion( - request, - prompt_input=request.token_ids, - prompt_embeds=None, - skip_mm_cache=True, - ) - except ValueError as e: - return self.create_error_response(e) + (engine_input,) = await self.openai_serving_render.preprocess_completion( + request, + prompt_input=request.token_ids, + prompt_embeds=None, + skip_mm_cache=True, + ) # Schedule the request and get the result generator. result_generator: AsyncGenerator[RequestOutput, None] | None = None diff --git a/vllm/entrypoints/serve/render/serving.py b/vllm/entrypoints/serve/render/serving.py index 889d89ea159c..782b2eaea24b 100644 --- a/vllm/entrypoints/serve/render/serving.py +++ b/vllm/entrypoints/serve/render/serving.py @@ -247,29 +247,23 @@ async def render_chat( if error_check_ret is not None: return error_check_ret - try: - conversation, engine_inputs = await self.preprocess_chat( - request, - request.messages, - default_template=self.chat_template, - default_template_content_format=self.chat_template_content_format, - default_template_kwargs=self.default_chat_template_kwargs, - tool_dicts=tool_dicts, - tool_parser=tool_parser, - skip_mm_cache=skip_mm_cache, - reasoning_parser=self.reasoning_parser, - ) - except ValueError as e: - return self.create_error_response(e) + conversation, engine_inputs = await self.preprocess_chat( + request, + request.messages, + default_template=self.chat_template, + default_template_content_format=self.chat_template_content_format, + default_template_kwargs=self.default_chat_template_kwargs, + tool_dicts=tool_dicts, + tool_parser=tool_parser, + skip_mm_cache=skip_mm_cache, + reasoning_parser=self.reasoning_parser, + ) else: # For GPT-OSS. should_include_tools = tool_dicts is not None - try: - conversation, engine_inputs = self._make_request_with_harmony( - request, should_include_tools - ) - except ValueError as e: - return self.create_error_response(e) + conversation, engine_inputs = self._make_request_with_harmony( + request, should_include_tools + ) return conversation, engine_inputs @@ -352,15 +346,12 @@ async def render_completion( "prompt_logprobs is not compatible with prompt embeds." ) - try: - engine_inputs = await self.preprocess_completion( - request, - prompt_input=request.prompt, - prompt_embeds=request.prompt_embeds, - skip_mm_cache=skip_mm_cache, - ) - except ValueError as e: - return self.create_error_response(e) + engine_inputs = await self.preprocess_completion( + request, + prompt_input=request.prompt, + prompt_embeds=request.prompt_embeds, + skip_mm_cache=skip_mm_cache, + ) return engine_inputs From 41abe0745de0124c756dff533a37d9465a2218b3 Mon Sep 17 00:00:00 2001 From: Andreas Karatzas Date: Tue, 19 May 2026 13:14:17 -0500 Subject: [PATCH 3/8] [ROCm][CI] Stabilize 400 error return code for invalid schema inputs Signed-off-by: Andreas Karatzas --- vllm/entrypoints/serve/disagg/serving.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/vllm/entrypoints/serve/disagg/serving.py b/vllm/entrypoints/serve/disagg/serving.py index 37aa3e9a2f55..b48e072ab676 100644 --- a/vllm/entrypoints/serve/disagg/serving.py +++ b/vllm/entrypoints/serve/disagg/serving.py @@ -8,6 +8,7 @@ from collections.abc import AsyncGenerator from collections.abc import Sequence as GenericSequence +import msgspec import numpy as np import pybase64 as base64 from fastapi import Request @@ -132,6 +133,10 @@ async def serve_tokens( f"sampling_params.n must be at most the server's max_num_seqs " f"({max_num_seqs}), got {sampling_params.n}." ) + try: + msgspec.msgpack.encode(sampling_params) + except (OverflowError, TypeError, ValueError) as e: + return self.create_error_response(e) engine_input: EngineInput if features := request.features: @@ -467,12 +472,10 @@ def _create_tokens_logprobs( logprob=max(step_token.logprob, -9999.0), top_logprobs=[ ChatCompletionLogProb( - token=f"token_id:{token_id}", - logprob=max(logprob.logprob, -9999.0), - ) - for i, (token_id, logprob) in enumerate( - step_top_logprobs.items() + token=token, + logprob=max(p[1].logprob, -9999.0), ) + for i, p in enumerate(step_top_logprobs.items()) if num_output_top_logprobs is not None and i < max(num_output_top_logprobs, 1) ], From d596a9f59dcb17812c7ae249dc7af04cc99cd25e Mon Sep 17 00:00:00 2001 From: Andreas Karatzas Date: Tue, 19 May 2026 21:32:52 -0500 Subject: [PATCH 4/8] [ROCm][CI] Stabilize 400 error return code for invalid schema inputs Signed-off-by: Andreas Karatzas --- tests/entrypoints/serve/disagg/test_generate_stream.py | 9 +++++++++ vllm/entrypoints/serve/disagg/serving.py | 2 +- 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/tests/entrypoints/serve/disagg/test_generate_stream.py b/tests/entrypoints/serve/disagg/test_generate_stream.py index 49349bf4ba50..ac5b8bcd9158 100644 --- a/tests/entrypoints/serve/disagg/test_generate_stream.py +++ b/tests/entrypoints/serve/disagg/test_generate_stream.py @@ -69,10 +69,16 @@ class MockParallelConfig: _api_process_rank: int = 0 +@dataclass +class MockSchedulerConfig: + max_num_seqs: int = 128 + + @dataclass class MockVllmConfig: model_config: MockModelConfig parallel_config: MockParallelConfig + scheduler_config: MockSchedulerConfig = field(default_factory=MockSchedulerConfig) def _build_renderer(model_config: MockModelConfig): @@ -149,6 +155,9 @@ def _mock_engine() -> MagicMock: engine = MagicMock(spec=AsyncLLM) engine.errored = False engine.model_config = MockModelConfig() + engine.vllm_config = MockVllmConfig( + engine.model_config, parallel_config=MockParallelConfig() + ) engine.input_processor = MagicMock() engine.renderer = _build_renderer(engine.model_config) return engine diff --git a/vllm/entrypoints/serve/disagg/serving.py b/vllm/entrypoints/serve/disagg/serving.py index b48e072ab676..1793eb4e8ad1 100644 --- a/vllm/entrypoints/serve/disagg/serving.py +++ b/vllm/entrypoints/serve/disagg/serving.py @@ -472,7 +472,7 @@ def _create_tokens_logprobs( logprob=max(step_token.logprob, -9999.0), top_logprobs=[ ChatCompletionLogProb( - token=token, + token=f"token_id:{p[0]}", logprob=max(p[1].logprob, -9999.0), ) for i, p in enumerate(step_top_logprobs.items()) From 80e73ce442f225d0c38d7b052a7e3f207035b839 Mon Sep 17 00:00:00 2001 From: Andreas Karatzas Date: Wed, 20 May 2026 00:42:52 -0500 Subject: [PATCH 5/8] [ROCm][CI] Stabilize 400 error return code for invalid schema inputs Signed-off-by: Andreas Karatzas --- tests/entrypoints/openai/test_openai_schema.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/entrypoints/openai/test_openai_schema.py b/tests/entrypoints/openai/test_openai_schema.py index d67c0ccd4251..56e4e9baf2e8 100644 --- a/tests/entrypoints/openai/test_openai_schema.py +++ b/tests/entrypoints/openai/test_openai_schema.py @@ -118,7 +118,9 @@ def no_invalid_types(case: schemathesis.models.Case): # the default filtered-vs-good ratio. The filter is intentional, so # suppress the health check rather than drop the filter — dropping it # exposes pre-existing server bugs out of scope here. - suppress_health_check=[HealthCheck.filter_too_much], + # The same nested schema can also trip Hypothesis' entropy budget while + # generating large-but-valid request bodies before vLLM is called. + suppress_health_check=[HealthCheck.filter_too_much, HealthCheck.data_too_large], ) def test_openapi_stateless(case: Case): key = ( From 0f205a909e2acde8e1770fb4b98b49ac4b0c8269 Mon Sep 17 00:00:00 2001 From: Andreas Karatzas Date: Sat, 23 May 2026 18:19:22 -0500 Subject: [PATCH 6/8] Reverting bad merge Signed-off-by: Andreas Karatzas --- vllm/entrypoints/serve/disagg/serving.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/vllm/entrypoints/serve/disagg/serving.py b/vllm/entrypoints/serve/disagg/serving.py index 994dbed5c5b0..0cc227ee74db 100644 --- a/vllm/entrypoints/serve/disagg/serving.py +++ b/vllm/entrypoints/serve/disagg/serving.py @@ -476,10 +476,12 @@ def _create_tokens_logprobs( logprob=max(step_token.logprob, -9999.0), top_logprobs=[ ChatCompletionLogProb( - token=f"token_id:{p[0]}", - logprob=max(p[1].logprob, -9999.0), + token=f"token_id:{token_id}", + logprob=max(logprob.logprob, -9999.0), + ) + for i, (token_id, logprob) in enumerate( + step_top_logprobs.items() ) - for i, p in enumerate(step_top_logprobs.items()) if num_output_top_logprobs is not None and i < max(num_output_top_logprobs, 1) ], From ef7bea373379c2df1f333ef3165becfb495089f2 Mon Sep 17 00:00:00 2001 From: Andreas Karatzas Date: Sat, 23 May 2026 23:29:15 -0500 Subject: [PATCH 7/8] Honor shutdown timeouts and update weight-transfer mocks Signed-off-by: Andreas Karatzas --- .../entrypoints/openai/completion/test_shutdown.py | 14 ++++++++++++-- .../weight_transfer/test_weight_transfer_llm.py | 8 ++++---- vllm/entrypoints/openai/api_server.py | 2 +- vllm/v1/utils.py | 7 +++---- 4 files changed, 20 insertions(+), 11 deletions(-) diff --git a/tests/entrypoints/openai/completion/test_shutdown.py b/tests/entrypoints/openai/completion/test_shutdown.py index bc929cf992ab..82a18c24eaa0 100644 --- a/tests/entrypoints/openai/completion/test_shutdown.py +++ b/tests/entrypoints/openai/completion/test_shutdown.py @@ -26,6 +26,9 @@ _PROCESS_EXIT_TIMEOUT = 15 _SHUTDOWN_DETECTION_TIMEOUT = 10 _CHILD_CLEANUP_TIMEOUT = 10 +_INFLIGHT_REQUEST_START_TIMEOUT = 5 +_INFLIGHT_REQUEST_POLL_INTERVAL = 0.1 +_ABORT_CLIENT_TIMEOUT = 3 def _get_child_pids(parent_pid: int) -> list[int]: @@ -71,6 +74,7 @@ class ShutdownState: requests_after_sigterm: int = 0 aborted_requests: int = 0 connection_errors: int = 0 + inflight_requests: int = 0 stop_requesting: bool = False errors: list[str] = field(default_factory=list) @@ -86,6 +90,7 @@ async def _concurrent_request_loop( async def single_request(): while not state.stop_requesting: try: + state.inflight_requests += 1 response = await client.completions.create( model=MODEL_NAME, prompt="Write a story: ", @@ -110,6 +115,8 @@ async def single_request(): except Exception as e: state.errors.append(f"Unexpected error: {e}") break + finally: + state.inflight_requests -= 1 await asyncio.sleep(0.01) tasks = [asyncio.create_task(single_request()) for _ in range(concurrency)] @@ -392,7 +399,7 @@ async def test_abort_timeout_fails_inflight_requests(): ] with RemoteOpenAIServer(MODEL_NAME, server_args) as remote_server: - client = remote_server.get_async_client() + client = remote_server.get_async_client(timeout=_ABORT_CLIENT_TIMEOUT) proc = remote_server.proc child_pids = _get_child_pids(proc.pid) @@ -403,7 +410,10 @@ async def test_abort_timeout_fails_inflight_requests(): _concurrent_request_loop(client, state, sigterm_sent, concurrency=10) ) - await asyncio.sleep(0.5) + deadline = time.time() + _INFLIGHT_REQUEST_START_TIMEOUT + while state.inflight_requests == 0 and time.time() < deadline: + await asyncio.sleep(_INFLIGHT_REQUEST_POLL_INTERVAL) + assert state.inflight_requests > 0 proc.send_signal(signal.SIGTERM) sigterm_sent.set() diff --git a/tests/entrypoints/weight_transfer/test_weight_transfer_llm.py b/tests/entrypoints/weight_transfer/test_weight_transfer_llm.py index aea4c523eca8..6c6269865075 100644 --- a/tests/entrypoints/weight_transfer/test_weight_transfer_llm.py +++ b/tests/entrypoints/weight_transfer/test_weight_transfer_llm.py @@ -63,8 +63,8 @@ class MockWeightTransferEngine(WeightTransferEngine[MockInitInfo, MockUpdateInfo last_init_info: MockInitInfo | None = None last_update_info: MockUpdateInfo | None = None - def __init__(self, config, parallel_config): - super().__init__(config, parallel_config) + def __init__(self, config, parallel_config, model): + super().__init__(config, parallel_config, model) # Reset tracking on init MockWeightTransferEngine.init_transfer_engine_called = False MockWeightTransferEngine.receive_weights_called = False @@ -95,9 +95,9 @@ def trainer_send_weights(self, *args, **kwargs): pass -def mock_create_engine(config, parallel_config): +def mock_create_engine(config, parallel_config, model): """Mock factory function that returns our mock engine.""" - return MockWeightTransferEngine(config, parallel_config) + return MockWeightTransferEngine(config, parallel_config, model) # --- Tests --- diff --git a/vllm/entrypoints/openai/api_server.py b/vllm/entrypoints/openai/api_server.py index fc315a45658b..461128ed9053 100644 --- a/vllm/entrypoints/openai/api_server.py +++ b/vllm/entrypoints/openai/api_server.py @@ -151,7 +151,7 @@ async def build_async_engine_client_from_engine_args( yield async_llm finally: if async_llm: - async_llm.shutdown() + async_llm.shutdown(timeout=vllm_config.shutdown_timeout) def build_app( diff --git a/vllm/v1/utils.py b/vllm/v1/utils.py index ba1db3282216..afa621ae54d4 100644 --- a/vllm/v1/utils.py +++ b/vllm/v1/utils.py @@ -474,10 +474,9 @@ def shutdown(procs: list[BaseProcess], timeout: float | None = None) -> None: timeout: Maximum time in seconds to wait for graceful shutdown """ if timeout is None: - timeout = 0.0 - - # Allow at least 5 seconds for remaining procs to terminate. - timeout = max(timeout, 5.0) + # Keep a small grace period for best-effort cleanup paths that do not + # have a user-configured shutdown timeout. + timeout = 5.0 # Shutdown the process. for proc in procs: From 5f133567ae1fd4d408e3e302179efbd059a06230 Mon Sep 17 00:00:00 2001 From: Andreas Karatzas Date: Sun, 24 May 2026 02:51:19 -0500 Subject: [PATCH 8/8] Removing eager check Signed-off-by: Andreas Karatzas --- vllm/entrypoints/openai/engine/serving.py | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/vllm/entrypoints/openai/engine/serving.py b/vllm/entrypoints/openai/engine/serving.py index 9ccef79c2c22..ff67575fcc6c 100644 --- a/vllm/entrypoints/openai/engine/serving.py +++ b/vllm/entrypoints/openai/engine/serving.py @@ -8,7 +8,6 @@ from http import HTTPStatus from typing import Any, ClassVar, Generic, Protocol, TypeAlias, TypeVar -import msgspec from fastapi import Request from openai.types.responses import ToolChoiceFunction from pydantic import ConfigDict, TypeAdapter, ValidationError @@ -427,15 +426,6 @@ async def _with_kv_transfer_rejection_cleanup( """Wrap a `create_*` coroutine so that, if it raises or returns an ErrorResponse (i.e. the request never reached the engine), the KV connector is notified to free any pinned remote-prefill blocks.""" - if request.kv_transfer_params is not None: - try: - msgspec.msgpack.encode(request.kv_transfer_params) - except (OverflowError, TypeError, ValueError) as e: - close = getattr(awaitable, "close", None) - if close is not None: - close() - return self.create_error_response(e) # type: ignore[return-value] - kv_transfer_params = self.has_kv_connector and request.kv_transfer_params if not kv_transfer_params or not kv_transfer_params.get("do_remote_prefill"): return await awaitable