diff --git a/tests/entrypoints/openai/completion/test_shutdown.py b/tests/entrypoints/openai/completion/test_shutdown.py index bc929cf992ab..82a18c24eaa0 100644 --- a/tests/entrypoints/openai/completion/test_shutdown.py +++ b/tests/entrypoints/openai/completion/test_shutdown.py @@ -26,6 +26,9 @@ _PROCESS_EXIT_TIMEOUT = 15 _SHUTDOWN_DETECTION_TIMEOUT = 10 _CHILD_CLEANUP_TIMEOUT = 10 +_INFLIGHT_REQUEST_START_TIMEOUT = 5 +_INFLIGHT_REQUEST_POLL_INTERVAL = 0.1 +_ABORT_CLIENT_TIMEOUT = 3 def _get_child_pids(parent_pid: int) -> list[int]: @@ -71,6 +74,7 @@ class ShutdownState: requests_after_sigterm: int = 0 aborted_requests: int = 0 connection_errors: int = 0 + inflight_requests: int = 0 stop_requesting: bool = False errors: list[str] = field(default_factory=list) @@ -86,6 +90,7 @@ async def _concurrent_request_loop( async def single_request(): while not state.stop_requesting: try: + state.inflight_requests += 1 response = await client.completions.create( model=MODEL_NAME, prompt="Write a story: ", @@ -110,6 +115,8 @@ async def single_request(): except Exception as e: state.errors.append(f"Unexpected error: {e}") break + finally: + state.inflight_requests -= 1 await asyncio.sleep(0.01) tasks = [asyncio.create_task(single_request()) for _ in range(concurrency)] @@ -392,7 +399,7 @@ async def test_abort_timeout_fails_inflight_requests(): ] with RemoteOpenAIServer(MODEL_NAME, server_args) as remote_server: - client = remote_server.get_async_client() + client = remote_server.get_async_client(timeout=_ABORT_CLIENT_TIMEOUT) proc = remote_server.proc child_pids = _get_child_pids(proc.pid) @@ -403,7 +410,10 @@ async def test_abort_timeout_fails_inflight_requests(): _concurrent_request_loop(client, state, sigterm_sent, concurrency=10) ) - await asyncio.sleep(0.5) + deadline = time.time() + _INFLIGHT_REQUEST_START_TIMEOUT + while state.inflight_requests == 0 and time.time() < deadline: + await asyncio.sleep(_INFLIGHT_REQUEST_POLL_INTERVAL) + assert state.inflight_requests > 0 proc.send_signal(signal.SIGTERM) sigterm_sent.set() diff --git a/tests/entrypoints/openai/test_openai_schema.py b/tests/entrypoints/openai/test_openai_schema.py index d67c0ccd4251..56e4e9baf2e8 100644 --- a/tests/entrypoints/openai/test_openai_schema.py +++ b/tests/entrypoints/openai/test_openai_schema.py @@ -118,7 +118,9 @@ def no_invalid_types(case: schemathesis.models.Case): # the default filtered-vs-good ratio. The filter is intentional, so # suppress the health check rather than drop the filter — dropping it # exposes pre-existing server bugs out of scope here. - suppress_health_check=[HealthCheck.filter_too_much], + # The same nested schema can also trip Hypothesis' entropy budget while + # generating large-but-valid request bodies before vLLM is called. + suppress_health_check=[HealthCheck.filter_too_much, HealthCheck.data_too_large], ) def test_openapi_stateless(case: Case): key = ( diff --git a/tests/entrypoints/serve/disagg/test_generate_stream.py b/tests/entrypoints/serve/disagg/test_generate_stream.py index 49349bf4ba50..ac5b8bcd9158 100644 --- a/tests/entrypoints/serve/disagg/test_generate_stream.py +++ b/tests/entrypoints/serve/disagg/test_generate_stream.py @@ -69,10 +69,16 @@ class MockParallelConfig: _api_process_rank: int = 0 +@dataclass +class MockSchedulerConfig: + max_num_seqs: int = 128 + + @dataclass class MockVllmConfig: model_config: MockModelConfig parallel_config: MockParallelConfig + scheduler_config: MockSchedulerConfig = field(default_factory=MockSchedulerConfig) def _build_renderer(model_config: MockModelConfig): @@ -149,6 +155,9 @@ def _mock_engine() -> MagicMock: engine = MagicMock(spec=AsyncLLM) engine.errored = False engine.model_config = MockModelConfig() + engine.vllm_config = MockVllmConfig( + engine.model_config, parallel_config=MockParallelConfig() + ) engine.input_processor = MagicMock() engine.renderer = _build_renderer(engine.model_config) return engine diff --git a/tests/entrypoints/weight_transfer/test_weight_transfer_llm.py b/tests/entrypoints/weight_transfer/test_weight_transfer_llm.py index aea4c523eca8..6c6269865075 100644 --- a/tests/entrypoints/weight_transfer/test_weight_transfer_llm.py +++ b/tests/entrypoints/weight_transfer/test_weight_transfer_llm.py @@ -63,8 +63,8 @@ class MockWeightTransferEngine(WeightTransferEngine[MockInitInfo, MockUpdateInfo last_init_info: MockInitInfo | None = None last_update_info: MockUpdateInfo | None = None - def __init__(self, config, parallel_config): - super().__init__(config, parallel_config) + def __init__(self, config, parallel_config, model): + super().__init__(config, parallel_config, model) # Reset tracking on init MockWeightTransferEngine.init_transfer_engine_called = False MockWeightTransferEngine.receive_weights_called = False @@ -95,9 +95,9 @@ def trainer_send_weights(self, *args, **kwargs): pass -def mock_create_engine(config, parallel_config): +def mock_create_engine(config, parallel_config, model): """Mock factory function that returns our mock engine.""" - return MockWeightTransferEngine(config, parallel_config) + return MockWeightTransferEngine(config, parallel_config, model) # --- Tests --- diff --git a/vllm/entrypoints/chat_utils.py b/vllm/entrypoints/chat_utils.py index e4e44bc01a12..35256bc647d2 100644 --- a/vllm/entrypoints/chat_utils.py +++ b/vllm/entrypoints/chat_utils.py @@ -1818,12 +1818,28 @@ def _postprocess_messages(messages: list[ConversationMessage]) -> None: continue for item in tool_calls: + if not isinstance(item, dict): + raise VLLMValidationError( + "assistant tool_calls entries must be objects.", + parameter="tool_calls", + ) + + function = item.get("function") + if item.get("type", "function") != "function" or not isinstance( + function, dict + ): + raise VLLMValidationError( + "chat completions only support assistant tool_calls " + "of type 'function'.", + parameter="tool_calls", + ) + # if arguments is None or empty string, set to {} - if content := item["function"].get("arguments"): + if content := function.get("arguments"): if not isinstance(content, (dict, list)): - item["function"]["arguments"] = json.loads(content) + function["arguments"] = json.loads(content) else: - item["function"]["arguments"] = {} + function["arguments"] = {} def parse_chat_messages( diff --git a/vllm/entrypoints/openai/api_server.py b/vllm/entrypoints/openai/api_server.py index fc315a45658b..461128ed9053 100644 --- a/vllm/entrypoints/openai/api_server.py +++ b/vllm/entrypoints/openai/api_server.py @@ -151,7 +151,7 @@ async def build_async_engine_client_from_engine_args( yield async_llm finally: if async_llm: - async_llm.shutdown() + async_llm.shutdown(timeout=vllm_config.shutdown_timeout) def build_app( diff --git a/vllm/entrypoints/openai/chat_completion/batch_serving.py b/vllm/entrypoints/openai/chat_completion/batch_serving.py index cc49909b8361..0dfcdd925158 100644 --- a/vllm/entrypoints/openai/chat_completion/batch_serving.py +++ b/vllm/entrypoints/openai/chat_completion/batch_serving.py @@ -218,8 +218,6 @@ async def chat_completion_full_generator_batch( ``check_batch_mode`` validator, so neither needs to be handled here. """ created_time = int(time.time()) - role = self.get_chat_request_role(request) # type: ignore[arg-type] - final_results: dict[int, RequestOutput] = {} try: async for prompt_idx, res in merge_async_iterators(*generators): @@ -275,6 +273,12 @@ async def chat_completion_full_generator_batch( reasoning = None content = output.text + role = ( + self.response_role + if request.add_generation_prompt + else request.messages[prompt_idx][-1]["role"] + ) + message = ChatMessage(role=role, reasoning=reasoning, content=content) if request.echo: diff --git a/vllm/entrypoints/openai/chat_completion/protocol.py b/vllm/entrypoints/openai/chat_completion/protocol.py index eb5697442ca4..921f4a84a2c9 100644 --- a/vllm/entrypoints/openai/chat_completion/protocol.py +++ b/vllm/entrypoints/openai/chat_completion/protocol.py @@ -909,7 +909,9 @@ class BatchChatCompletionRequest(OpenAIBaseModel): - The ``n`` parameter must be 1 (or omitted). """ - messages: list[list[ChatCompletionMessageParam]] = Field(..., min_length=1) + messages: list[Annotated[list[ChatCompletionMessageParam], Field(min_length=1)]] = ( + Field(..., min_length=1) + ) model: str | None = None # Shared sampling / generation fields — mirror ChatCompletionRequest. diff --git a/vllm/entrypoints/serve/disagg/protocol.py b/vllm/entrypoints/serve/disagg/protocol.py index 7d468d5d9284..60d2a6424a00 100644 --- a/vllm/entrypoints/serve/disagg/protocol.py +++ b/vllm/entrypoints/serve/disagg/protocol.py @@ -68,7 +68,7 @@ class GenerateRequest(BaseModel): "through out the inference process and return in response." ), ) - token_ids: list[int] + token_ids: list[int] = Field(min_length=1) """The token ids to generate text from.""" @field_validator("token_ids") diff --git a/vllm/entrypoints/serve/disagg/serving.py b/vllm/entrypoints/serve/disagg/serving.py index ca49b6909f68..0cc227ee74db 100644 --- a/vllm/entrypoints/serve/disagg/serving.py +++ b/vllm/entrypoints/serve/disagg/serving.py @@ -8,6 +8,7 @@ from collections.abc import AsyncGenerator from collections.abc import Sequence as GenericSequence +import msgspec import numpy as np import pybase64 as base64 from fastapi import Request @@ -125,6 +126,18 @@ async def serve_tokens( if raw_request: raw_request.state.request_metadata = request_metadata + sampling_params = request.sampling_params + max_num_seqs = self.engine_client.vllm_config.scheduler_config.max_num_seqs + if sampling_params.n > max_num_seqs: + return self.create_error_response( + f"sampling_params.n must be at most the server's max_num_seqs " + f"({max_num_seqs}), got {sampling_params.n}." + ) + try: + msgspec.msgpack.encode(sampling_params) + except (OverflowError, TypeError, ValueError) as e: + return self.create_error_response(e) + engine_input: EngineInput if features := request.features: # Convert PlaceholderRangeInfo → PlaceholderRange per modality. @@ -164,7 +177,6 @@ async def serve_tokens( # Schedule the request and get the result generator. result_generator: AsyncGenerator[RequestOutput, None] | None = None - sampling_params = request.sampling_params # Apply server-side ``max_tokens`` defaulting when the client did # not set it, matching the OpenAI-compat endpoints. ``SamplingParams`` diff --git a/vllm/v1/utils.py b/vllm/v1/utils.py index ba1db3282216..afa621ae54d4 100644 --- a/vllm/v1/utils.py +++ b/vllm/v1/utils.py @@ -474,10 +474,9 @@ def shutdown(procs: list[BaseProcess], timeout: float | None = None) -> None: timeout: Maximum time in seconds to wait for graceful shutdown """ if timeout is None: - timeout = 0.0 - - # Allow at least 5 seconds for remaining procs to terminate. - timeout = max(timeout, 5.0) + # Keep a small grace period for best-effort cleanup paths that do not + # have a user-configured shutdown timeout. + timeout = 5.0 # Shutdown the process. for proc in procs: