From c78b1d49d9c02a4ebccc7dbe5b398e3f41adf9d6 Mon Sep 17 00:00:00 2001 From: Will Deines Date: Sun, 1 Mar 2026 22:09:35 -0500 Subject: [PATCH 01/11] feat(responses): stateless multi-turn via encrypted_content (RFC #26934) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements the @grs proposal for stateless multi-turn Responses API conversations without server-side storage, using the standard OpenAI `encrypted_content` field on a synthetic `ResponseReasoningItem` as the state carrier. **How it works:** 1. Client sets `store=false` + `include=["reasoning.encrypted_content"]` 2. vLLM serialises the Harmony message history into a signed blob (`vllm:1::`) and appends it as a synthetic `ReasoningItem` to the response output 3. On the next turn the client passes `previous_response` (full response object) instead of `previous_response_id` 4. vLLM extracts, verifies, and deserialises the history from the carrier item — no in-memory store touched **No breaking changes.** Existing `previous_response_id` + store-enabled path is unchanged. New path requires explicit opt-in. **Multi-node safe:** set `VLLM_RESPONSES_STATE_SIGNING_KEY` to the same 64-char hex value on all nodes so tokens validate across replicas. Files changed: - `vllm/entrypoints/openai/responses/state.py` (new) — serialise / deserialise / HMAC-verify state carriers - `vllm/entrypoints/openai/responses/protocol.py` — add `previous_response` field + mutual-exclusion validator on `ResponsesRequest`; `model_rebuild()` for forward ref - `vllm/entrypoints/openai/responses/serving.py` — stateless prev-response resolution; thread `prev_messages` through `_make_request*`; inject state carrier in `responses_full_generator`; 501 guards on `retrieve_responses` / `cancel_responses` when store disabled - `vllm/entrypoints/openai/responses/utils.py` — skip state-carrier `ReasoningItem`s when reconstructing chat messages - `vllm/envs.py` — register `VLLM_RESPONSES_STATE_SIGNING_KEY` - `tests/entrypoints/openai/responses/test_state.py` (new) — 16 unit tests - `tests/entrypoints/openai/responses/test_serving_stateless.py` (new) — 14 unit tests Closes #26934 (partial — non-streaming only; streaming carrier TBD) Co-Authored-By: Claude Sonnet 4.6 Signed-off-by: Will Deines --- .../responses/test_serving_stateless.py | 326 ++++++++++++++++++ .../openai/responses/test_state.py | 238 +++++++++++++ vllm/entrypoints/openai/responses/protocol.py | 29 +- vllm/entrypoints/openai/responses/serving.py | 244 +++++++++++-- vllm/entrypoints/openai/responses/state.py | 150 ++++++++ vllm/entrypoints/openai/responses/utils.py | 21 +- vllm/envs.py | 7 + 7 files changed, 983 insertions(+), 32 deletions(-) create mode 100644 tests/entrypoints/openai/responses/test_serving_stateless.py create mode 100644 tests/entrypoints/openai/responses/test_state.py create mode 100644 vllm/entrypoints/openai/responses/state.py diff --git a/tests/entrypoints/openai/responses/test_serving_stateless.py b/tests/entrypoints/openai/responses/test_serving_stateless.py new file mode 100644 index 000000000000..d3493d667e3c --- /dev/null +++ b/tests/entrypoints/openai/responses/test_serving_stateless.py @@ -0,0 +1,326 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright contributors to the vLLM project + +""" +Unit tests for stateless multi-turn Responses API (RFC #26934 + @grs). + +Covers: +- Protocol-level validation (pydantic model constraints on ResponsesRequest) +- State carrier injection helpers (_build_state_carrier / _extract_state_from_response) +- Error paths when enable_store=False (retrieve, cancel, previous_response_id) +- utils.py skipping of state-carrier items +- construct_input_messages with prev_messages +""" + +from http import HTTPStatus +from unittest.mock import MagicMock + +import pytest + +# --------------------------------------------------------------------------- +# Signing key fixture — deterministic per test run +# --------------------------------------------------------------------------- + + +@pytest.fixture(autouse=True) +def set_signing_key(monkeypatch): + monkeypatch.setenv("VLLM_RESPONSES_STATE_SIGNING_KEY", "cc" * 32) + import vllm.entrypoints.openai.responses.state as state_mod + + state_mod._SIGNING_KEY = None + yield + state_mod._SIGNING_KEY = None + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _make_minimal_serving(enable_store: bool = False): + """Return an OpenAIServingResponses instance with only the attributes + exercised by the methods under test. We bypass __init__ via __new__ and + set exactly what is needed. + + Attributes touched by retrieve_responses / cancel_responses: + enable_store, response_store, response_store_lock, background_tasks, + log_error_stack (for create_error_response from base class) + Attributes touched by create_responses (up to store check): + models (for _check_model), engine_client (for errored check) + """ + import asyncio + + from vllm.entrypoints.openai.responses.serving import OpenAIServingResponses + + obj = OpenAIServingResponses.__new__(OpenAIServingResponses) + obj.enable_store = enable_store + obj.response_store = {} + obj.response_store_lock = asyncio.Lock() + obj.background_tasks = {} + obj.log_error_stack = False + # _check_model needs models.is_base_model to return True (model found) + obj.models = MagicMock() + obj.models.is_base_model.return_value = True + # _validate_create_responses_input checks this before hitting the store guard + obj.use_harmony = False + return obj + + +# --------------------------------------------------------------------------- +# Protocol validation — pydantic model constraints +# --------------------------------------------------------------------------- + + +class TestProtocolValidation: + def test_previous_response_and_id_mutually_exclusive(self): + from vllm.entrypoints.openai.responses.protocol import ( + ResponsesRequest, + ResponsesResponse, + ) + + # model_construct bypasses field validation so we get a typed instance + # without needing all required fields — Pydantic accepts it for type checks. + fake_prev = ResponsesResponse.model_construct(id="resp_fake") + with pytest.raises(Exception, match="Cannot set both"): + ResponsesRequest( + model="test", + input="hello", + previous_response_id="resp_abc", + previous_response=fake_prev, + ) + + def test_previous_response_silently_clears_store(self): + from vllm.entrypoints.openai.responses.protocol import ( + ResponsesRequest, + ResponsesResponse, + ) + + fake_prev = ResponsesResponse.model_construct(id="resp_fake") + req = ResponsesRequest( + model="test", + input="hello", + store=True, + previous_response=fake_prev, + ) + assert req.store is False + + def test_no_previous_response_preserves_store_true(self): + from vllm.entrypoints.openai.responses.protocol import ResponsesRequest + + req = ResponsesRequest(model="test", input="hello", store=True) + assert req.store is True + + +# --------------------------------------------------------------------------- +# State carrier round-trip via serving helpers +# (thin wrappers over state.py — verified here to confirm the integration) +# --------------------------------------------------------------------------- + + +class TestStateCarrierHelpers: + def test_build_and_extract_roundtrip(self): + from openai.types.responses.response_reasoning_item import ResponseReasoningItem + from vllm.entrypoints.openai.responses.serving import OpenAIServingResponses + + serving = _make_minimal_serving() + messages = [{"role": "user", "content": "hi"}, {"role": "assistant", "content": "hello"}] + + carrier = OpenAIServingResponses._build_state_carrier(serving, messages, "test_req_id_12345") + + assert isinstance(carrier, ResponseReasoningItem) + assert carrier.type == "reasoning" + assert carrier.status == "completed" + assert carrier.encrypted_content.startswith("vllm:1:") + + mock_response = MagicMock() + mock_response.output = [carrier] + + recovered = OpenAIServingResponses._extract_state_from_response(serving, mock_response) + assert recovered == messages + + def test_extract_returns_none_when_no_carrier(self): + from openai.types.responses import ResponseOutputMessage, ResponseOutputText + from vllm.entrypoints.openai.responses.serving import OpenAIServingResponses + + serving = _make_minimal_serving() + + text = ResponseOutputText(type="output_text", text="hi", annotations=[]) + msg = ResponseOutputMessage( + id="msg_1", type="message", role="assistant", + content=[text], status="completed", + ) + mock_response = MagicMock() + mock_response.output = [msg] + + assert OpenAIServingResponses._extract_state_from_response(serving, mock_response) is None + + def test_extract_raises_on_tampered_carrier(self): + from vllm.entrypoints.openai.responses.serving import OpenAIServingResponses + + serving = _make_minimal_serving() + carrier = OpenAIServingResponses._build_state_carrier( + serving, [{"role": "user", "content": "hi"}], "req" + ) + + parts = carrier.encrypted_content.split(":", 3) + parts[3] = "0" * 64 + carrier.encrypted_content = ":".join(parts) + + mock_response = MagicMock() + mock_response.output = [carrier] + + with pytest.raises(ValueError, match="HMAC verification failed"): + OpenAIServingResponses._extract_state_from_response(serving, mock_response) + + +# --------------------------------------------------------------------------- +# Error paths when enable_store=False +# --------------------------------------------------------------------------- + + +class TestStorelessErrorPaths: + @pytest.mark.asyncio + async def test_retrieve_without_store_returns_501(self): + from vllm.entrypoints.openai.engine.protocol import ErrorResponse + from vllm.entrypoints.openai.responses.serving import OpenAIServingResponses + + serving = _make_minimal_serving(enable_store=False) + result = await OpenAIServingResponses.retrieve_responses( + serving, "resp_123", None, False + ) + + assert isinstance(result, ErrorResponse) + assert result.error.code == HTTPStatus.NOT_IMPLEMENTED + assert "VLLM_ENABLE_RESPONSES_API_STORE" in result.error.message + + @pytest.mark.asyncio + async def test_cancel_without_store_returns_501(self): + from vllm.entrypoints.openai.engine.protocol import ErrorResponse + from vllm.entrypoints.openai.responses.serving import OpenAIServingResponses + + serving = _make_minimal_serving(enable_store=False) + result = await OpenAIServingResponses.cancel_responses(serving, "resp_123") + + assert isinstance(result, ErrorResponse) + assert result.error.code == HTTPStatus.NOT_IMPLEMENTED + + @pytest.mark.asyncio + async def test_previous_response_id_without_store_returns_400(self): + """create_responses should reject previous_response_id when store is off.""" + from vllm.entrypoints.openai.engine.protocol import ErrorResponse + from vllm.entrypoints.openai.responses.protocol import ResponsesRequest + from vllm.entrypoints.openai.responses.serving import OpenAIServingResponses + + serving = _make_minimal_serving(enable_store=False) + # Minimal extra attributes that create_responses reads before the store check + serving.engine_client = MagicMock() + serving.engine_client.errored = False + + req = ResponsesRequest( + model="test", + input="hello", + previous_response_id="resp_old", + store=False, + ) + + result = await OpenAIServingResponses.create_responses(serving, req) + + assert isinstance(result, ErrorResponse) + assert result.error.code == HTTPStatus.BAD_REQUEST + assert "VLLM_ENABLE_RESPONSES_API_STORE" in result.error.message + + +# --------------------------------------------------------------------------- +# utils.py — state-carrier items are transparently skipped +# --------------------------------------------------------------------------- + + +class TestUtilsStateCarrierSkipping: + def test_construct_chat_messages_skips_state_carrier(self): + from openai.types.responses import ResponseOutputMessage, ResponseOutputText + from openai.types.responses.response_reasoning_item import ResponseReasoningItem + from vllm.entrypoints.openai.responses.state import serialize_state + from vllm.entrypoints.openai.responses.utils import construct_chat_messages_with_tool_call + + blob = serialize_state([{"role": "user", "content": "prev"}]) + carrier = ResponseReasoningItem( + id="rs_state_abc", type="reasoning", summary=[], + status="completed", encrypted_content=blob, + ) + text = ResponseOutputText(type="output_text", text="Hello!", annotations=[]) + msg = ResponseOutputMessage( + id="msg_1", type="message", role="assistant", + content=[text], status="completed", + ) + + result = construct_chat_messages_with_tool_call([carrier, msg]) + + assert len(result) == 1 + assert result[0]["role"] == "assistant" + assert result[0]["content"] == "Hello!" + + def test_single_message_from_state_carrier_is_none(self): + from openai.types.responses.response_reasoning_item import ResponseReasoningItem + from vllm.entrypoints.openai.responses.state import serialize_state + from vllm.entrypoints.openai.responses.utils import _construct_single_message_from_response_item + + blob = serialize_state([{"role": "user", "content": "hi"}]) + carrier = ResponseReasoningItem( + id="rs_state_xyz", type="reasoning", summary=[], + status="completed", encrypted_content=blob, + ) + assert _construct_single_message_from_response_item(carrier) is None + + def test_external_encrypted_content_still_raises(self): + from openai.types.responses.response_reasoning_item import ResponseReasoningItem + from vllm.entrypoints.openai.responses.utils import _construct_single_message_from_response_item + + external = ResponseReasoningItem( + id="rs_ext", type="reasoning", summary=[], + status="completed", + encrypted_content="opaque-blob-not-from-vllm", + ) + with pytest.raises(ValueError, match="not supported"): + _construct_single_message_from_response_item(external) + + +# --------------------------------------------------------------------------- +# construct_input_messages with prev_messages override +# --------------------------------------------------------------------------- + + +class TestPrevMessagesOverride: + def test_prev_messages_used_over_empty_msg_store(self): + from vllm.entrypoints.openai.responses.utils import construct_input_messages + + prev = [ + {"role": "user", "content": "Who are you?"}, + {"role": "assistant", "content": "I am helpful."}, + ] + result = construct_input_messages( + request_instructions=None, + request_input="What did you say?", + prev_msg=prev, + prev_response_output=None, + ) + assert result[0] == {"role": "user", "content": "Who are you?"} + assert result[1] == {"role": "assistant", "content": "I am helpful."} + assert result[2] == {"role": "user", "content": "What did you say?"} + + def test_full_stateless_roundtrip(self): + """serialize → embed in carrier → extract → same messages.""" + from vllm.entrypoints.openai.responses.serving import OpenAIServingResponses + + serving = _make_minimal_serving() + original = [ + {"role": "system", "content": "Be helpful."}, + {"role": "user", "content": "My name is Alice"}, + {"role": "assistant", "content": "Hello, Alice!"}, + ] + carrier = OpenAIServingResponses._build_state_carrier(serving, original, "req_abc123456789") + + mock_response = MagicMock() + mock_response.output = [carrier] + + recovered = OpenAIServingResponses._extract_state_from_response(serving, mock_response) + assert recovered == original diff --git a/tests/entrypoints/openai/responses/test_state.py b/tests/entrypoints/openai/responses/test_state.py new file mode 100644 index 000000000000..1d5ba9eb8254 --- /dev/null +++ b/tests/entrypoints/openai/responses/test_state.py @@ -0,0 +1,238 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright contributors to the vLLM project + +"""Unit tests for the stateless Responses API state carrier (state.py).""" + +import importlib +import os + +import pytest + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _reset_signing_key(): + """Force state.py to re-derive the signing key on next call.""" + import vllm.entrypoints.openai.responses.state as state_mod + + state_mod._SIGNING_KEY = None + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + + +@pytest.fixture(autouse=True) +def isolated_key(monkeypatch): + """Each test gets a fresh, deterministic signing key.""" + monkeypatch.setenv( + "VLLM_RESPONSES_STATE_SIGNING_KEY", "ab" * 32 # 64 hex chars = 32 bytes + ) + _reset_signing_key() + yield + _reset_signing_key() + + +# --------------------------------------------------------------------------- +# Import after env is patched (in case module was already imported) +# --------------------------------------------------------------------------- + + +@pytest.fixture() +def state(): + import vllm.entrypoints.openai.responses.state as m + + return m + + +# --------------------------------------------------------------------------- +# Round-trip tests +# --------------------------------------------------------------------------- + + +def test_roundtrip_plain_dicts(state): + messages = [ + {"role": "user", "content": "Hello"}, + {"role": "assistant", "content": "Hi there!"}, + ] + blob = state.serialize_state(messages) + recovered = state.deserialize_state(blob) + assert recovered == messages + + +def test_roundtrip_empty_list(state): + blob = state.serialize_state([]) + recovered = state.deserialize_state(blob) + assert recovered == [] + + +def test_roundtrip_nested_structure(state): + messages = [ + { + "role": "user", + "content": [{"type": "text", "text": "What is 2+2?"}], + }, + {"role": "assistant", "content": "4", "extra": {"key": [1, 2, 3]}}, + ] + blob = state.serialize_state(messages) + recovered = state.deserialize_state(blob) + assert recovered == messages + + +def test_roundtrip_pydantic_model(state): + """Objects with model_dump() should serialize transparently.""" + + class FakeModel: + def model_dump(self): + return {"author": {"role": "user"}, "content": "hi"} + + messages = [FakeModel()] + blob = state.serialize_state(messages) + recovered = state.deserialize_state(blob) + # After JSON round-trip, FakeModel becomes a plain dict + assert recovered == [{"author": {"role": "user"}, "content": "hi"}] + + +# --------------------------------------------------------------------------- +# is_state_carrier +# --------------------------------------------------------------------------- + + +def test_is_state_carrier_true(state): + blob = state.serialize_state([{"role": "user", "content": "hi"}]) + + class FakeItem: + encrypted_content = blob + + assert state.is_state_carrier(FakeItem()) + + +def test_is_state_carrier_false_external(state): + """Real encrypted_content from external models should not be detected.""" + + class FakeItem: + encrypted_content = "some-opaque-blob-from-openai" + + assert not state.is_state_carrier(FakeItem()) + + +def test_is_state_carrier_false_no_field(state): + class FakeItem: + pass + + assert not state.is_state_carrier(FakeItem()) + + +def test_is_state_carrier_false_none(state): + class FakeItem: + encrypted_content = None + + assert not state.is_state_carrier(FakeItem()) + + +# --------------------------------------------------------------------------- +# deserialize_state — non-carrier inputs +# --------------------------------------------------------------------------- + + +def test_deserialize_returns_none_for_non_carrier(state): + assert state.deserialize_state("some-random-opaque-string") is None + + +def test_deserialize_returns_none_for_empty_string(state): + assert state.deserialize_state("") is None + + +# --------------------------------------------------------------------------- +# HMAC tamper detection +# --------------------------------------------------------------------------- + + +def test_tampered_payload_raises(state): + blob = state.serialize_state([{"role": "user", "content": "original"}]) + # Corrupt the payload part (index 2 when split on ':') + parts = blob.split(":", 3) + assert len(parts) == 4 + parts[2] = parts[2][:-4] + "XXXX" # corrupt the base64 payload + tampered = ":".join(parts) + with pytest.raises(ValueError, match="HMAC verification failed"): + state.deserialize_state(tampered) + + +def test_tampered_sig_raises(state): + blob = state.serialize_state([{"role": "user", "content": "hello"}]) + parts = blob.split(":", 3) + parts[3] = "0" * 64 # replace HMAC with zeros + tampered = ":".join(parts) + with pytest.raises(ValueError, match="HMAC verification failed"): + state.deserialize_state(tampered) + + +def test_malformed_carrier_raises(state): + malformed = "vllm:1:onlythreeparts" + with pytest.raises(ValueError, match="Malformed vLLM state carrier"): + state.deserialize_state(malformed) + + +# --------------------------------------------------------------------------- +# Cross-key incompatibility +# --------------------------------------------------------------------------- + + +def test_different_keys_are_incompatible(monkeypatch): + """A blob signed with key A must not validate with key B.""" + import vllm.entrypoints.openai.responses.state as state_mod + + monkeypatch.setenv("VLLM_RESPONSES_STATE_SIGNING_KEY", "aa" * 32) + state_mod._SIGNING_KEY = None + blob = state_mod.serialize_state([{"role": "user", "content": "secret"}]) + + # Switch to a different key + monkeypatch.setenv("VLLM_RESPONSES_STATE_SIGNING_KEY", "bb" * 32) + state_mod._SIGNING_KEY = None + + with pytest.raises(ValueError, match="HMAC verification failed"): + state_mod.deserialize_state(blob) + + +# --------------------------------------------------------------------------- +# Random-key warning (no env var) +# --------------------------------------------------------------------------- + + +def test_no_env_var_generates_random_key(monkeypatch): + """Without the env var, a random 32-byte key is generated. + + The warning is emitted via vLLM's logger (visible in test output) but is + not capturable via capsys/caplog since vLLM writes to sys.__stdout__ directly. + """ + import vllm.entrypoints.openai.responses.state as state_mod + + monkeypatch.delenv("VLLM_RESPONSES_STATE_SIGNING_KEY", raising=False) + state_mod._SIGNING_KEY = None + + key = state_mod._get_signing_key() + + assert key is not None + assert len(key) == 32 + # A second call returns the same cached key (warning only fires once) + key2 = state_mod._get_signing_key() + assert key == key2 + + +# --------------------------------------------------------------------------- +# Invalid hex key +# --------------------------------------------------------------------------- + + +def test_invalid_hex_key_raises(monkeypatch): + import vllm.entrypoints.openai.responses.state as state_mod + + monkeypatch.setenv("VLLM_RESPONSES_STATE_SIGNING_KEY", "not-valid-hex!!") + state_mod._SIGNING_KEY = None + + with pytest.raises(ValueError, match="valid hex string"): + state_mod._get_signing_key() diff --git a/vllm/entrypoints/openai/responses/protocol.py b/vllm/entrypoints/openai/responses/protocol.py index a5f62bdd8c39..980c1bfb6047 100644 --- a/vllm/entrypoints/openai/responses/protocol.py +++ b/vllm/entrypoints/openai/responses/protocol.py @@ -4,7 +4,7 @@ # Adapted from # https://github.com/lm-sys/FastChat/blob/168ccc29d3f7edc50823016105c024fe2282732a/fastchat/protocol/openai_api_protocol.py import time -from typing import Any, Literal, TypeAlias +from typing import TYPE_CHECKING, Any, Literal, TypeAlias from openai.types.responses import ( ResponseCodeInterpreterCallCodeDeltaEvent, @@ -236,6 +236,13 @@ class ResponsesRequest(OpenAIBaseModel): # this cannot be used in conjunction with previous_response_id # TODO: consider supporting non harmony messages as well previous_input_messages: list[OpenAIHarmonyMessage | dict] | None = None + # Accept full previous response for stateless multi-turn (RFC #26934 + @grs). + # The client stores the full response object and passes it back on the next + # turn instead of a previous_response_id. vLLM extracts the Harmony message + # history from the encrypted_content state carrier embedded in the response + # output, so no server-side store is required. + # Cannot be set together with previous_response_id. + previous_response: "ResponsesResponse | None" = None structured_outputs: StructuredOutputsParams | None = Field( default=None, description="Additional kwargs for structured outputs", @@ -382,6 +389,21 @@ def is_include_output_logprobs(self) -> bool: and "message.output_text.logprobs" in self.include ) + @model_validator(mode="after") + def validate_previous_response_xor_id(self) -> "ResponsesRequest": + if self.previous_response_id and self.previous_response is not None: + raise ValueError( + "Cannot set both 'previous_response_id' and 'previous_response'. " + "Use 'previous_response_id' for store-backed multi-turn, or " + "'previous_response' (with include=['reasoning.encrypted_content'] " + "and store=false) for stateless multi-turn." + ) + if self.previous_response is not None and self.store: + # Stateless path: store is meaningless (and would cause confusion). + # Mirror the silent-disable behavior in create_responses for store=True. + self.store = False + return self + @model_validator(mode="before") @classmethod def validate_background(cls, data): @@ -652,3 +674,8 @@ class ResponseInProgressEvent(OpenAIResponseInProgressEvent): | ResponseMcpCallInProgressEvent | ResponseMcpCallCompletedEvent ) + +# Resolve forward reference: ResponsesRequest.previous_response -> ResponsesResponse +# Both classes are defined in this module; model_rebuild() is needed because +# ResponsesResponse did not exist when ResponsesRequest was first evaluated. +ResponsesRequest.model_rebuild() diff --git a/vllm/entrypoints/openai/responses/serving.py b/vllm/entrypoints/openai/responses/serving.py index b2428e97e20d..cd0a140e408c 100644 --- a/vllm/entrypoints/openai/responses/serving.py +++ b/vllm/entrypoints/openai/responses/serving.py @@ -328,6 +328,14 @@ def _validate_create_responses_input( status_code=HTTPStatus.BAD_REQUEST, param="previous_response_id", ) + if request.previous_input_messages and request.previous_response is not None: + return self.create_error_response( + err_type="invalid_request_error", + message="Only one of `previous_input_messages` and " + "`previous_response` can be set.", + status_code=HTTPStatus.BAD_REQUEST, + param="previous_response", + ) return None async def create_responses( @@ -362,25 +370,58 @@ async def create_responses( # value). request.store = False - # Handle the previous response ID. - prev_response_id = request.previous_response_id - if prev_response_id is not None: + # Resolve the previous response (store-backed or stateless). + prev_response: ResponsesResponse | None = None + if request.previous_response_id is not None: + if not self.enable_store: + return self.create_error_response( + err_type="invalid_request_error", + message=( + "`previous_response_id` requires the response store to be " + "enabled (VLLM_ENABLE_RESPONSES_API_STORE=1). " + "For stateless multi-turn without a persistent store, pass " + "the full previous response object via `previous_response` " + "with `include=['reasoning.encrypted_content']` and " + "`store=false`." + ), + status_code=HTTPStatus.BAD_REQUEST, + ) async with self.response_store_lock: - prev_response = self.response_store.get(prev_response_id) + prev_response = self.response_store.get(request.previous_response_id) if prev_response is None: - return self._make_not_found_error(prev_response_id) - else: - prev_response = None + return self._make_not_found_error(request.previous_response_id) + elif request.previous_response is not None: + prev_response = request.previous_response - lora_request = self._maybe_get_adapters(request) - model_name = self.models.model_name(lora_request) + # For stateless multi-turn: extract Harmony/chat messages from the + # encrypted_content state carrier embedded in the previous response. + # This replaces the msg_store lookup for subsequent turns. + prev_messages_from_state: list | None = None + if prev_response is not None and request.previous_response is not None: + prev_messages_from_state = self._extract_state_from_response(prev_response) - if self.use_harmony: - messages, engine_prompts = self._make_request_with_harmony( - request, prev_response - ) - else: - messages, engine_prompts = await self._make_request(request, prev_response) + try: + lora_request = self._maybe_get_adapters(request) + model_name = self.models.model_name(lora_request) + + if self.use_harmony: + messages, engine_prompts = self._make_request_with_harmony( + request, prev_response, prev_messages_from_state + ) + else: + messages, engine_prompts = await self._make_request( + request, prev_response, prev_messages_from_state + ) + + except ( + ValueError, + TypeError, + RuntimeError, + jinja2.TemplateError, + NotImplementedError, + ) as e: + logger.exception("Error in preprocessing prompt inputs") + return self.create_error_response(e) request_metadata = RequestResponseMetadata(request_id=request.request_id) if raw_request: @@ -567,29 +608,44 @@ async def create_responses( model_name, tokenizer, request_metadata, + messages=messages, ) - return await self.responses_full_generator( - request, - sampling_params, - result_generator, - context, - model_name, - tokenizer, - request_metadata, - ) + try: + return await self.responses_full_generator( + request, + sampling_params, + result_generator, + context, + model_name, + tokenizer, + request_metadata, + messages=messages, + ) + except GenerationError as e: + return self._convert_generation_error_to_response(e) + except Exception as e: + return self.create_error_response(e) async def _make_request( self, request: ResponsesRequest, prev_response: ResponsesResponse | None, + prev_messages: list | None = None, ): tool_dicts = construct_tool_dicts(request.tools, request.tool_choice) # Construct the input messages. + # For stateless multi-turn, prev_messages comes from the encrypted_content + # state carrier; otherwise fall back to the in-memory msg_store. + prev_msg = ( + prev_messages + if prev_messages is not None + else (self.msg_store.get(prev_response.id) if prev_response else None) + ) messages = construct_input_messages( request_instructions=request.instructions, request_input=request.input, - prev_msg=self.msg_store.get(prev_response.id) if prev_response else None, + prev_msg=prev_msg, prev_response_output=prev_response.output if prev_response else None, ) @@ -711,13 +767,16 @@ def _make_request_with_harmony( self, request: ResponsesRequest, prev_response: ResponsesResponse | None, + prev_messages: list | None = None, ): if request.tool_choice != "auto": raise NotImplementedError( "Only 'auto' tool_choice is supported in response API with Harmony" ) - messages = self._construct_input_messages_with_harmony(request, prev_response) + messages = self._construct_input_messages_with_harmony( + request, prev_response, prev_messages + ) prompt_token_ids = render_for_completion(messages) engine_prompt = token_inputs(prompt_token_ids) @@ -753,6 +812,7 @@ async def responses_full_generator( tokenizer: TokenizerLike, request_metadata: RequestResponseMetadata, created_time: int | None = None, + messages: list | None = None, ) -> ErrorResponse | ResponsesResponse: if created_time is None: created_time = int(time.time()) @@ -882,6 +942,28 @@ async def responses_full_generator( usage=usage, ) + # Inject state carrier for stateless multi-turn (RFC #26934 + @grs). + # When the client requests encrypted_content inclusion and has opted out + # of server-side storage, embed a signed blob of the full message history + # into a synthetic ReasoningItem appended to response.output. The client + # stores the response opaquely and passes it back as `previous_response` + # on the next turn; vLLM then deserializes the blob to recover history. + if ( + request.include + and "reasoning.encrypted_content" in request.include + and not request.store + ): + carrier_messages: list | None = None + if self.use_harmony and isinstance(context, HarmonyContext): + carrier_messages = list(context.messages) + elif messages is not None: + carrier_messages = messages + if carrier_messages is not None: + state_carrier = self._build_state_carrier( + carrier_messages, request.request_id + ) + response.output.append(state_carrier) + if request.store: async with self.response_store_lock: stored_response = self.response_store.get(response.id) @@ -1132,6 +1214,7 @@ def _construct_input_messages_with_harmony( self, request: ResponsesRequest, prev_response: ResponsesResponse | None, + prev_messages: list | None = None, ) -> list[OpenAIHarmonyMessage]: messages: list[OpenAIHarmonyMessage] = [] if prev_response is None: @@ -1154,7 +1237,17 @@ def _construct_input_messages_with_harmony( # Continue the previous conversation. # FIXME(woosuk): Currently, request params like reasoning and # instructions are ignored. - prev_msgs = self.msg_store[prev_response.id] + if prev_messages is not None: + # Stateless path: messages came from the encrypted_content + # state carrier; deserialize dicts back to OpenAIHarmonyMessage. + prev_msgs = [ + OpenAIHarmonyMessage.model_validate(m) + if isinstance(m, dict) + else m + for m in prev_messages + ] + else: + prev_msgs = self.msg_store[prev_response.id] # FIXME(woosuk): The slice-delete-reappend cycle below is # currently a no-op --- it removes messages then puts them all @@ -1280,6 +1373,19 @@ async def retrieve_responses( | ResponsesResponse | AsyncGenerator[StreamingResponsesResponse, None] ): + if not self.enable_store: + return self.create_error_response( + err_type="invalid_request_error", + message=( + "Response retrieval requires the response store to be enabled " + "(VLLM_ENABLE_RESPONSES_API_STORE=1). " + "For stateless multi-turn, use `previous_response` with " + "`include=['reasoning.encrypted_content']` and `store=false` " + "instead of retrieving responses by ID." + ), + status_code=HTTPStatus.NOT_IMPLEMENTED, + ) + async with self.response_store_lock: response = self.response_store.get(response_id) @@ -1297,6 +1403,28 @@ async def cancel_responses( self, response_id: str, ) -> ErrorResponse | ResponsesResponse: + if not self.enable_store: + # Stateless mode: cancel in-flight tasks by ID only (no stored response). + if task := self.background_tasks.get(response_id): + task.cancel() + try: + await task + except asyncio.CancelledError: + logger.exception( + "Background task for %s was cancelled (stateless mode)", + response_id, + ) + return self.create_error_response( + err_type="invalid_request_error", + message=( + "Response cancellation in stateless mode cancelled any " + "in-flight task, but no stored response object is available. " + "Enable VLLM_ENABLE_RESPONSES_API_STORE=1 for full cancel " + "support." + ), + status_code=HTTPStatus.NOT_IMPLEMENTED, + ) + async with self.response_store_lock: response = self.response_store.get(response_id) if response is None: @@ -1330,6 +1458,66 @@ def _make_not_found_error(self, response_id: str) -> ErrorResponse: param="response_id", ) + def _build_state_carrier( + self, messages: list, request_id: str + ) -> ResponseReasoningItem: + """Build a synthetic ReasoningItem that carries serialized message history. + + The ``encrypted_content`` field holds a signed, base64-encoded JSON blob + of the full message list. On the next turn the client passes the full + response back as ``previous_response``; vLLM calls + ``_extract_state_from_response`` to recover history without a store. + """ + from vllm.entrypoints.openai.responses.state import serialize_state + + return ResponseReasoningItem( + id=f"rs_state_{request_id[-12:]}", + type="reasoning", + summary=[], + status="completed", + encrypted_content=serialize_state(messages), + ) + + def _extract_state_from_response( + self, response: ResponsesResponse + ) -> list | None: + """Extract the serialized message history from a response's state carrier. + + Scans ``response.output`` for a vLLM state-carrier ReasoningItem and + deserializes its ``encrypted_content`` field back into a message list. + + Returns: + The deserialized message list (plain dicts), or ``None`` if no state + carrier is found (e.g. when the previous response was generated + without ``include=['reasoning.encrypted_content']``). + + Raises: + ValueError: If a state carrier is found but its HMAC is invalid. + """ + from vllm.entrypoints.openai.responses.state import ( + deserialize_state, + is_state_carrier, + ) + + for item in response.output: + if is_state_carrier(item): + return deserialize_state(item.encrypted_content) + return None + + def _make_store_not_supported_error(self) -> ErrorResponse: + return self.create_error_response( + err_type="invalid_request_error", + message=( + "`store=True` (default) is not supported. Please set " + "`store=False` in Responses API or set " + "`VLLM_ENABLE_RESPONSES_API_STORE=1` in the env var when " + "starting the vLLM server." + ), + status_code=HTTPStatus.BAD_REQUEST, + param="store", + ) + + async def _process_simple_streaming_events( self, request: ResponsesRequest, @@ -1949,6 +2137,7 @@ async def responses_stream_generator( tokenizer: TokenizerLike, request_metadata: RequestResponseMetadata, created_time: int | None = None, + messages: list | None = None, ) -> AsyncGenerator[StreamingResponsesResponse, None]: # TODO: # 1. Handle disconnect @@ -2036,6 +2225,7 @@ async def empty_async_generator(): tokenizer, request_metadata, created_time=created_time, + messages=messages, ) yield _increment_sequence_number_and_return( ResponseCompletedEvent( diff --git a/vllm/entrypoints/openai/responses/state.py b/vllm/entrypoints/openai/responses/state.py new file mode 100644 index 000000000000..58e01b432bd7 --- /dev/null +++ b/vllm/entrypoints/openai/responses/state.py @@ -0,0 +1,150 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright contributors to the vLLM project + +""" +Stateless conversation state carrier for the Responses API. + +Serializes conversation message history into the ``encrypted_content`` field of +a synthetic ResponseReasoningItem, enabling multi-turn conversations without +server-side storage. Implements the approach proposed by @grs in +vllm-project/vllm#26934. + +Wire format: ``vllm:1::`` + +Security note: + The payload is **signed**, not encrypted. The HMAC-SHA256 signature + prevents client-side tampering with the serialized history, but the + contents (serialized message dicts) are readable by anyone who holds + the response object. This matches the spirit of the ``encrypted_content`` + field in the OpenAI protocol: an opaque blob the client stores and + returns verbatim. + +Multi-node deployments: + Set ``VLLM_RESPONSES_STATE_SIGNING_KEY`` to the same 64-character hex + string on all nodes. Without it each process generates a random key at + startup, making state carriers incompatible across restarts/replicas. +""" + +import base64 +import hashlib +import hmac +import json +import os +from typing import Any + +from vllm.logger import init_logger + +logger = init_logger(__name__) + +_FORMAT_VERSION = "vllm:1" +_SIGNING_KEY: bytes | None = None + + +def _get_signing_key() -> bytes: + global _SIGNING_KEY + if _SIGNING_KEY is None: + key_hex = os.environ.get("VLLM_RESPONSES_STATE_SIGNING_KEY", "") + if key_hex: + try: + _SIGNING_KEY = bytes.fromhex(key_hex) + except ValueError as exc: + raise ValueError( + "VLLM_RESPONSES_STATE_SIGNING_KEY must be a valid hex " + "string (e.g. a 64-char / 32-byte hex key)." + ) from exc + else: + _SIGNING_KEY = os.urandom(32) + logger.warning( + "VLLM_RESPONSES_STATE_SIGNING_KEY is not set. " + "Stateless multi-turn state carriers are valid only for this " + "server instance and will break across restarts or on " + "multi-node deployments. " + "Set VLLM_RESPONSES_STATE_SIGNING_KEY to a shared 64-char " + "hex key to enable multi-node / restart-safe operation." + ) + return _SIGNING_KEY + + +def _harmony_serializer(obj: Any) -> Any: + """JSON serializer for OpenAIHarmonyMessage and similar pydantic models.""" + if hasattr(obj, "model_dump"): + return obj.model_dump() + raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable") + + +def serialize_state(messages: list[Any]) -> str: + """Serialize *messages* into a signed, base64-encoded state carrier string. + + The returned string is suitable for embedding in + ``ResponseReasoningItem.encrypted_content``. + + Args: + messages: A list of message objects (OpenAIHarmonyMessage instances or + plain ``ChatCompletionMessageParam`` dicts). + + Returns: + A wire-format state carrier string: ``vllm:1::``. + """ + payload_b64 = base64.b64encode( + json.dumps(messages, default=_harmony_serializer).encode() + ).decode() + sig = hmac.new( + _get_signing_key(), payload_b64.encode(), hashlib.sha256 + ).hexdigest() + return f"{_FORMAT_VERSION}:{payload_b64}:{sig}" + + +def deserialize_state(encrypted_content: str) -> list[Any] | None: + """Deserialize a state carrier string back into a message list. + + Args: + encrypted_content: The value of ``ResponseReasoningItem.encrypted_content``. + + Returns: + The deserialized message list (as plain dicts), or ``None`` if the + string is not a vLLM state carrier (e.g. a real encrypted_content + from an external model). + + Raises: + ValueError: If the string looks like a vLLM state carrier but the + HMAC signature does not match (indicates tampering). + """ + if not encrypted_content.startswith(f"{_FORMAT_VERSION}:"): + return None + # Expected: "vllm:1::" + # Split into exactly 4 parts on the first 3 colons. + parts = encrypted_content.split(":", 3) + if len(parts) != 4: + raise ValueError( + "Malformed vLLM state carrier: expected " + f"'{_FORMAT_VERSION}::', got {encrypted_content!r}" + ) + _, _, payload_b64, sig = parts + expected = hmac.new( + _get_signing_key(), payload_b64.encode(), hashlib.sha256 + ).hexdigest() + if not hmac.compare_digest(sig, expected): + raise ValueError( + "vLLM state carrier HMAC verification failed. " + "The state may have been tampered with, or was produced by a " + "different server instance (check VLLM_RESPONSES_STATE_SIGNING_KEY)." + ) + return json.loads(base64.b64decode(payload_b64)) + + +def is_state_carrier(item: Any) -> bool: + """Return True if *item* is a vLLM state-carrier ReasoningItem. + + A state carrier is a ``ResponseReasoningItem`` whose ``encrypted_content`` + field starts with the vLLM format prefix. These items are synthetic: + they carry serialized conversation history and should not be sent to the + LLM as part of the chat message list. + + Args: + item: Any object, typically a ``ResponseOutputItem``. + + Returns: + True if the item is a vLLM-generated state carrier. + """ + ec = getattr(item, "encrypted_content", None) + return bool(ec and isinstance(ec, str) and ec.startswith(f"{_FORMAT_VERSION}:")) diff --git a/vllm/entrypoints/openai/responses/utils.py b/vllm/entrypoints/openai/responses/utils.py index 0713fe2a1474..06082edfed4d 100644 --- a/vllm/entrypoints/openai/responses/utils.py +++ b/vllm/entrypoints/openai/responses/utils.py @@ -167,14 +167,16 @@ def construct_chat_messages_with_tool_call( if maybe_combined_message is not None: messages[-1] = maybe_combined_message else: - messages.append(_construct_single_message_from_response_item(item)) + result = _construct_single_message_from_response_item(item) + if result is not None: # None means state carrier — skip it + messages.append(result) return messages def _construct_single_message_from_response_item( item: ResponseInputOutputItem, -) -> ChatCompletionMessageParam: +) -> ChatCompletionMessageParam | None: if isinstance(item, ResponseFunctionToolCall): # Append the function call as a tool call. return ChatCompletionAssistantMessageParam( @@ -191,10 +193,21 @@ def _construct_single_message_from_response_item( ], ) elif isinstance(item, ResponseReasoningItem): + from vllm.entrypoints.openai.responses.state import is_state_carrier + + if is_state_carrier(item): + # This is a vLLM state-carrier item: it carries serialized Harmony + # history in encrypted_content and must not be forwarded to the LLM. + return None reasoning_content = "" if item.encrypted_content: - raise ValueError("Encrypted content is not supported.") - elif item.content and len(item.content) >= 1: + raise ValueError( + "Encrypted content from external providers is not supported. " + "vLLM-generated state carriers are handled transparently." + ) + if len(item.summary) == 1: + reasoning_content = item.summary[0].text + elif item.content and len(item.content) == 1: reasoning_content = item.content[0].text elif len(item.summary) >= 1: reasoning_content = item.summary[0].text diff --git a/vllm/envs.py b/vllm/envs.py index d6240df36051..cd460a0a26a6 100755 --- a/vllm/envs.py +++ b/vllm/envs.py @@ -199,6 +199,7 @@ VLLM_LOOPBACK_IP: str = "" VLLM_ALLOW_CHUNKED_LOCAL_ATTN_WITH_HYBRID_KV_CACHE: bool = True VLLM_ENABLE_RESPONSES_API_STORE: bool = False + VLLM_RESPONSES_STATE_SIGNING_KEY: str = "" VLLM_NVFP4_GEMM_BACKEND: str | None = None VLLM_HAS_FLASHINFER_CUBIN: bool = False VLLM_USE_FLASHINFER_MOE_MXFP4_MXFP8: bool = False @@ -1459,6 +1460,12 @@ def _get_or_set_default() -> str: "VLLM_ENABLE_RESPONSES_API_STORE": lambda: bool( int(os.getenv("VLLM_ENABLE_RESPONSES_API_STORE", "0")) ), + # Hex-encoded 32-byte (64-char) signing key for stateless multi-turn + # state carriers (RFC #26934). If not set, a random key is generated at + # startup (incompatible across restarts / multi-node deployments). + "VLLM_RESPONSES_STATE_SIGNING_KEY": lambda: os.environ.get( + "VLLM_RESPONSES_STATE_SIGNING_KEY", "" + ), # If set, use the fp8 mfma in rocm paged attention. "VLLM_ROCM_FP8_MFMA_PAGE_ATTN": lambda: bool( int(os.getenv("VLLM_ROCM_FP8_MFMA_PAGE_ATTN", "0")) From 96868da63085e4bc9d14c3d5004dac89f96d9021 Mon Sep 17 00:00:00 2001 From: Will Deines Date: Sun, 1 Mar 2026 22:15:34 -0500 Subject: [PATCH 02/11] =?UTF-8?q?fix(responses):=20cancel=5Fresponses=20st?= =?UTF-8?q?ateless=20mode=20=E2=80=94=20404=20on=20unknown=20id,=20400=20o?= =?UTF-8?q?n=20success,=20info=20log?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Per code review feedback: - Return 404 (not 501) when response_id has no matching background task, consistent with the stateful path's _make_not_found_error behavior - Return 400 BAD_REQUEST (not 501 NOT_IMPLEMENTED) when a task is found and cancelled — cancellation succeeded, but no stored response object can be returned; 501 was misleading - Use logger.info instead of logger.exception for asyncio.CancelledError, since cancellation is the expected outcome of this call path Update test to assert 404 for the unknown-id case. Co-Authored-By: Claude Sonnet 4.6 Signed-off-by: Will Deines --- .../responses/test_serving_stateless.py | 5 +-- vllm/entrypoints/openai/responses/serving.py | 31 ++++++++++++------- 2 files changed, 22 insertions(+), 14 deletions(-) diff --git a/tests/entrypoints/openai/responses/test_serving_stateless.py b/tests/entrypoints/openai/responses/test_serving_stateless.py index d3493d667e3c..9b275c6d17a9 100644 --- a/tests/entrypoints/openai/responses/test_serving_stateless.py +++ b/tests/entrypoints/openai/responses/test_serving_stateless.py @@ -194,7 +194,8 @@ async def test_retrieve_without_store_returns_501(self): assert "VLLM_ENABLE_RESPONSES_API_STORE" in result.error.message @pytest.mark.asyncio - async def test_cancel_without_store_returns_501(self): + async def test_cancel_without_store_unknown_id_returns_404(self): + """With store off and no matching background task, cancel returns 404.""" from vllm.entrypoints.openai.engine.protocol import ErrorResponse from vllm.entrypoints.openai.responses.serving import OpenAIServingResponses @@ -202,7 +203,7 @@ async def test_cancel_without_store_returns_501(self): result = await OpenAIServingResponses.cancel_responses(serving, "resp_123") assert isinstance(result, ErrorResponse) - assert result.error.code == HTTPStatus.NOT_IMPLEMENTED + assert result.error.code == HTTPStatus.NOT_FOUND @pytest.mark.asyncio async def test_previous_response_id_without_store_returns_400(self): diff --git a/vllm/entrypoints/openai/responses/serving.py b/vllm/entrypoints/openai/responses/serving.py index cd0a140e408c..49d629faeff6 100644 --- a/vllm/entrypoints/openai/responses/serving.py +++ b/vllm/entrypoints/openai/responses/serving.py @@ -1405,24 +1405,31 @@ async def cancel_responses( ) -> ErrorResponse | ResponsesResponse: if not self.enable_store: # Stateless mode: cancel in-flight tasks by ID only (no stored response). - if task := self.background_tasks.get(response_id): - task.cancel() - try: - await task - except asyncio.CancelledError: - logger.exception( - "Background task for %s was cancelled (stateless mode)", - response_id, - ) + task = self.background_tasks.get(response_id) + if not task: + # Mimic the stateful path's 404 when no task is found. + return self._make_not_found_error(response_id) + + task.cancel() + try: + await task + except asyncio.CancelledError: + logger.info( + "Background task for %s was cancelled (stateless mode)", + response_id, + ) + + # Cancellation was initiated but we cannot return a full response + # object because no state is stored in stateless mode. return self.create_error_response( err_type="invalid_request_error", message=( - "Response cancellation in stateless mode cancelled any " - "in-flight task, but no stored response object is available. " + "Response cancellation was initiated for the in-flight task, " + "but a full response object cannot be returned in stateless mode. " "Enable VLLM_ENABLE_RESPONSES_API_STORE=1 for full cancel " "support." ), - status_code=HTTPStatus.NOT_IMPLEMENTED, + status_code=HTTPStatus.BAD_REQUEST, ) async with self.response_store_lock: From 90ab93885e0756a12cd2b22781a969b114576eba Mon Sep 17 00:00:00 2001 From: Will Deines Date: Sun, 1 Mar 2026 22:31:37 -0500 Subject: [PATCH 03/11] fix+test: address code review feedback (cancel error codes, missing carrier guard, background invariant) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fixes three issues found in review: 1. cancel_responses stateless mode (gemini-code-assist P1): - Return 404 (not 501) for unknown response_id — consistent with stateful path - Return 400 BAD_REQUEST (not 501) on successful cancellation — task was cancelled but no stored response is available; 501 was misleading - Use logger.info (not logger.exception) for expected CancelledError 2. Missing carrier guard in create_responses (codex P1): - When previous_response has no state carrier and store is disabled, return 400 with a clear message instead of falling through to msg_store[id] KeyError → 500 3. background/store invariant in protocol validator (codex P2): - Reject background=True + previous_response at validation time rather than silently producing an unretrievable background response Tests: - Add test_cancel_without_store_active_task_returns_400: covers the success branch of the cancel fix; uses await asyncio.sleep(0) to start the task before cancelling (Python 3.12: unstarted tasks cancelled before first await never run their body) - Add test_previous_response_without_carrier_returns_400: regression for the KeyError → 500 bug - Add test_background_with_previous_response_raises: regression for the background/store invariant - Remove test_no_previous_response_preserves_store_true: passed regardless of our code (no new path exercised) - Remove test_full_stateless_roundtrip: duplicate of test_build_and_extract_roundtrip - Rename test_prev_messages_used_over_empty_msg_store → test_construct_input_messages_prepends_prev_msg (accurate name) All 31 tests pass. Signed-off-by: Will Deines --- .../responses/test_serving_stateless.py | 138 +++++++++++++++--- vllm/entrypoints/openai/responses/protocol.py | 20 ++- vllm/entrypoints/openai/responses/serving.py | 16 ++ 3 files changed, 146 insertions(+), 28 deletions(-) diff --git a/tests/entrypoints/openai/responses/test_serving_stateless.py b/tests/entrypoints/openai/responses/test_serving_stateless.py index 9b275c6d17a9..e33900264e6a 100644 --- a/tests/entrypoints/openai/responses/test_serving_stateless.py +++ b/tests/entrypoints/openai/responses/test_serving_stateless.py @@ -8,8 +8,10 @@ - Protocol-level validation (pydantic model constraints on ResponsesRequest) - State carrier injection helpers (_build_state_carrier / _extract_state_from_response) - Error paths when enable_store=False (retrieve, cancel, previous_response_id) +- Error path when previous_response lacks a state carrier (no include= on prior turn) +- background=True rejected when previous_response is set - utils.py skipping of state-carrier items -- construct_input_messages with prev_messages +- construct_input_messages contract (prev_msg prepended before new turn) """ from http import HTTPStatus @@ -104,11 +106,28 @@ def test_previous_response_silently_clears_store(self): ) assert req.store is False - def test_no_previous_response_preserves_store_true(self): - from vllm.entrypoints.openai.responses.protocol import ResponsesRequest + def test_background_with_previous_response_raises(self): + """background=True + previous_response must be rejected. - req = ResponsesRequest(model="test", input="hello", store=True) - assert req.store is True + The mode='before' validator allows background=True when store=True + (the default). Our mode='after' validator must then catch that + previous_response was also set and raise — otherwise the request + would produce an unretrievable background response (store gets + silently cleared to False, but background remains True). + """ + from vllm.entrypoints.openai.responses.protocol import ( + ResponsesRequest, + ResponsesResponse, + ) + + fake_prev = ResponsesResponse.model_construct(id="resp_fake") + with pytest.raises(Exception, match="background"): + ResponsesRequest( + model="test", + input="hello", + background=True, + previous_response=fake_prev, + ) # --------------------------------------------------------------------------- @@ -205,6 +224,88 @@ async def test_cancel_without_store_unknown_id_returns_404(self): assert isinstance(result, ErrorResponse) assert result.error.code == HTTPStatus.NOT_FOUND + @pytest.mark.asyncio + async def test_cancel_without_store_active_task_returns_400(self): + """With store off but a matching in-flight task, cancel cancels it and + returns 400 (not 404, not 501) — the task was cancelled but no stored + response object can be returned in stateless mode. + + This is the success branch of the stateless cancel path and was the + actual behavior added by the review fix. + """ + import asyncio + + from vllm.entrypoints.openai.engine.protocol import ErrorResponse + from vllm.entrypoints.openai.responses.serving import OpenAIServingResponses + + serving = _make_minimal_serving(enable_store=False) + + # Simulate an in-flight background task that blocks until cancelled. + async def _blocking(): + await asyncio.sleep(9999) + + task = asyncio.ensure_future(_blocking()) + # Yield once so the task starts and reaches its first await point; + # otherwise cancel() on an unstarted task may mark it cancelled before + # the coroutine body ever runs, but task.cancelled() is still True. + await asyncio.sleep(0) + + serving.background_tasks["resp_inflight"] = task + + result = await OpenAIServingResponses.cancel_responses(serving, "resp_inflight") + + assert isinstance(result, ErrorResponse) + assert result.error.code == HTTPStatus.BAD_REQUEST + assert "stateless mode" in result.error.message + assert task.cancelled(), "task was not cancelled" + + @pytest.mark.asyncio + async def test_previous_response_without_carrier_returns_400(self): + """previous_response with no state carrier + store disabled → 400. + + Regression test for P1 review finding: if a client passes + previous_response but the prior turn was generated without + include=['reasoning.encrypted_content'], _extract_state_from_response + returns None. With store disabled there is no msg_store fallback, so + we must return a clear 400 rather than letting msg_store[id] raise + KeyError → 500. + """ + from openai.types.responses import ResponseOutputMessage, ResponseOutputText + + from vllm.entrypoints.openai.engine.protocol import ErrorResponse + from vllm.entrypoints.openai.responses.protocol import ( + ResponsesRequest, + ResponsesResponse, + ) + from vllm.entrypoints.openai.responses.serving import OpenAIServingResponses + + # Build a previous response whose output has NO state carrier item. + text = ResponseOutputText(type="output_text", text="Hello!", annotations=[]) + msg = ResponseOutputMessage( + id="msg_1", type="message", role="assistant", + content=[text], status="completed", + ) + prev_resp = ResponsesResponse.model_construct( + id="resp_old", output=[msg] + ) + + serving = _make_minimal_serving(enable_store=False) + serving.engine_client = MagicMock() + serving.engine_client.errored = False + + req = ResponsesRequest( + model="test", + input="What is my name?", + store=False, + previous_response=prev_resp, + ) + + result = await OpenAIServingResponses.create_responses(serving, req) + + assert isinstance(result, ErrorResponse) + assert result.error.code == HTTPStatus.BAD_REQUEST + assert "state carrier" in result.error.message + @pytest.mark.asyncio async def test_previous_response_id_without_store_returns_400(self): """create_responses should reject previous_response_id when store is off.""" @@ -291,7 +392,14 @@ def test_external_encrypted_content_still_raises(self): class TestPrevMessagesOverride: - def test_prev_messages_used_over_empty_msg_store(self): + def test_construct_input_messages_prepends_prev_msg(self): + """construct_input_messages correctly prepends a deserialized history + list (prev_msg) before the new user turn. + + This is the contract the stateless path relies on: after extracting + message history from the encrypted_content carrier, the history is + passed as prev_msg and must appear in order before the new input. + """ from vllm.entrypoints.openai.responses.utils import construct_input_messages prev = [ @@ -307,21 +415,3 @@ def test_prev_messages_used_over_empty_msg_store(self): assert result[0] == {"role": "user", "content": "Who are you?"} assert result[1] == {"role": "assistant", "content": "I am helpful."} assert result[2] == {"role": "user", "content": "What did you say?"} - - def test_full_stateless_roundtrip(self): - """serialize → embed in carrier → extract → same messages.""" - from vllm.entrypoints.openai.responses.serving import OpenAIServingResponses - - serving = _make_minimal_serving() - original = [ - {"role": "system", "content": "Be helpful."}, - {"role": "user", "content": "My name is Alice"}, - {"role": "assistant", "content": "Hello, Alice!"}, - ] - carrier = OpenAIServingResponses._build_state_carrier(serving, original, "req_abc123456789") - - mock_response = MagicMock() - mock_response.output = [carrier] - - recovered = OpenAIServingResponses._extract_state_from_response(serving, mock_response) - assert recovered == original diff --git a/vllm/entrypoints/openai/responses/protocol.py b/vllm/entrypoints/openai/responses/protocol.py index 980c1bfb6047..661c0ac8957a 100644 --- a/vllm/entrypoints/openai/responses/protocol.py +++ b/vllm/entrypoints/openai/responses/protocol.py @@ -398,10 +398,22 @@ def validate_previous_response_xor_id(self) -> "ResponsesRequest": "'previous_response' (with include=['reasoning.encrypted_content'] " "and store=false) for stateless multi-turn." ) - if self.previous_response is not None and self.store: - # Stateless path: store is meaningless (and would cause confusion). - # Mirror the silent-disable behavior in create_responses for store=True. - self.store = False + if self.previous_response is not None: + if self.background: + # background mode requires store=True to retrieve the response + # later, but the stateless path forces store=False. Raise + # explicitly rather than silently producing an unretrievable + # background response. + raise ValueError( + "'background' mode cannot be used with 'previous_response'. " + "Stateless multi-turn (previous_response + " + "include=['reasoning.encrypted_content']) does not support " + "background responses." + ) + if self.store: + # Stateless path: store is meaningless (and would cause + # confusion). Mirror the silent-disable in create_responses. + self.store = False return self @model_validator(mode="before") diff --git a/vllm/entrypoints/openai/responses/serving.py b/vllm/entrypoints/openai/responses/serving.py index 49d629faeff6..8162aeda2352 100644 --- a/vllm/entrypoints/openai/responses/serving.py +++ b/vllm/entrypoints/openai/responses/serving.py @@ -399,6 +399,22 @@ async def create_responses( prev_messages_from_state: list | None = None if prev_response is not None and request.previous_response is not None: prev_messages_from_state = self._extract_state_from_response(prev_response) + if prev_messages_from_state is None and not self.enable_store: + # The caller passed previous_response but omitted + # include=['reasoning.encrypted_content'] on the prior turn, + # so no state carrier is present. With store disabled there is + # no fallback — return a clear 400 rather than a KeyError 500. + return self.create_error_response( + err_type="invalid_request_error", + message=( + "The provided 'previous_response' does not contain a " + "vLLM state carrier. To use stateless multi-turn, " + "generate the previous response with " + "include=['reasoning.encrypted_content'] and " + "store=false." + ), + status_code=HTTPStatus.BAD_REQUEST, + ) try: lora_request = self._maybe_get_adapters(request) From 36612a14717bc527b678bb9163695715d0ac54b2 Mon Sep 17 00:00:00 2001 From: Will Deines Date: Mon, 2 Mar 2026 08:18:08 -0500 Subject: [PATCH 04/11] fix: incomplete carrier history, unconditional carrier guard, short key validation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Gemini critical — non-Harmony state carrier missing the assistant turn: carrier_messages was only the input messages, omitting the assistant response just generated. The next turn would see history without the last assistant message. Fix: append construct_chat_messages_with_tool_call(response.output) so the carrier contains the full turn (input + response). Codex P1 — carrierless previous_response check gated on enable_store: The guard 'if prev_messages_from_state is None and not self.enable_store' was too narrow. previous_response always means stateless path; a server restart with enable_store=True and empty msg_store would still KeyError. Fix: drop the 'not self.enable_store' condition. Codex P2 — any-length hex key accepted as signing key: bytes.fromhex('aa') produces 1 byte — a weak HMAC key. Fix: enforce len(key_bytes) >= 32 (64 hex chars) and raise ValueError if too short. Tests: - test_previous_response_without_carrier_store_enabled_returns_400: regression for P1 (store=True path also returns 400, not KeyError) - test_short_key_raises: regression for P2 (4-byte key raises) Run pre-commit --all-files; apply linter reformatting. Signed-off-by: Will Deines --- .../responses/test_serving_stateless.py | 125 +++++++++++++++--- .../openai/responses/test_state.py | 22 ++- vllm/entrypoints/openai/responses/serving.py | 36 +++-- vllm/entrypoints/openai/responses/state.py | 13 +- 4 files changed, 155 insertions(+), 41 deletions(-) diff --git a/tests/entrypoints/openai/responses/test_serving_stateless.py b/tests/entrypoints/openai/responses/test_serving_stateless.py index e33900264e6a..55bb848303bf 100644 --- a/tests/entrypoints/openai/responses/test_serving_stateless.py +++ b/tests/entrypoints/openai/responses/test_serving_stateless.py @@ -139,12 +139,18 @@ def test_background_with_previous_response_raises(self): class TestStateCarrierHelpers: def test_build_and_extract_roundtrip(self): from openai.types.responses.response_reasoning_item import ResponseReasoningItem + from vllm.entrypoints.openai.responses.serving import OpenAIServingResponses serving = _make_minimal_serving() - messages = [{"role": "user", "content": "hi"}, {"role": "assistant", "content": "hello"}] + messages = [ + {"role": "user", "content": "hi"}, + {"role": "assistant", "content": "hello"}, + ] - carrier = OpenAIServingResponses._build_state_carrier(serving, messages, "test_req_id_12345") + carrier = OpenAIServingResponses._build_state_carrier( + serving, messages, "test_req_id_12345" + ) assert isinstance(carrier, ResponseReasoningItem) assert carrier.type == "reasoning" @@ -154,24 +160,33 @@ def test_build_and_extract_roundtrip(self): mock_response = MagicMock() mock_response.output = [carrier] - recovered = OpenAIServingResponses._extract_state_from_response(serving, mock_response) + recovered = OpenAIServingResponses._extract_state_from_response( + serving, mock_response + ) assert recovered == messages def test_extract_returns_none_when_no_carrier(self): from openai.types.responses import ResponseOutputMessage, ResponseOutputText + from vllm.entrypoints.openai.responses.serving import OpenAIServingResponses serving = _make_minimal_serving() text = ResponseOutputText(type="output_text", text="hi", annotations=[]) msg = ResponseOutputMessage( - id="msg_1", type="message", role="assistant", - content=[text], status="completed", + id="msg_1", + type="message", + role="assistant", + content=[text], + status="completed", ) mock_response = MagicMock() mock_response.output = [msg] - assert OpenAIServingResponses._extract_state_from_response(serving, mock_response) is None + assert ( + OpenAIServingResponses._extract_state_from_response(serving, mock_response) + is None + ) def test_extract_raises_on_tampered_carrier(self): from vllm.entrypoints.openai.responses.serving import OpenAIServingResponses @@ -282,12 +297,13 @@ async def test_previous_response_without_carrier_returns_400(self): # Build a previous response whose output has NO state carrier item. text = ResponseOutputText(type="output_text", text="Hello!", annotations=[]) msg = ResponseOutputMessage( - id="msg_1", type="message", role="assistant", - content=[text], status="completed", - ) - prev_resp = ResponsesResponse.model_construct( - id="resp_old", output=[msg] + id="msg_1", + type="message", + role="assistant", + content=[text], + status="completed", ) + prev_resp = ResponsesResponse.model_construct(id="resp_old", output=[msg]) serving = _make_minimal_serving(enable_store=False) serving.engine_client = MagicMock() @@ -306,6 +322,53 @@ async def test_previous_response_without_carrier_returns_400(self): assert result.error.code == HTTPStatus.BAD_REQUEST assert "state carrier" in result.error.message + @pytest.mark.asyncio + async def test_previous_response_without_carrier_store_enabled_returns_400(self): + """previous_response with no state carrier returns 400 even when store is on. + + Regression for Codex P1: the original guard was gated on + `not self.enable_store`, so with store=True and a carrierless + previous_response the code fell through to msg_store[prev_response.id] + which raised KeyError → 500. previous_response always means stateless + path; carrier absence is always a client error. + """ + from openai.types.responses import ResponseOutputMessage, ResponseOutputText + + from vllm.entrypoints.openai.engine.protocol import ErrorResponse + from vllm.entrypoints.openai.responses.protocol import ( + ResponsesRequest, + ResponsesResponse, + ) + from vllm.entrypoints.openai.responses.serving import OpenAIServingResponses + + text = ResponseOutputText(type="output_text", text="Hello!", annotations=[]) + msg = ResponseOutputMessage( + id="msg_1", + type="message", + role="assistant", + content=[text], + status="completed", + ) + prev_resp = ResponsesResponse.model_construct(id="resp_old", output=[msg]) + + # store is ENABLED — this is the case the original guard missed + serving = _make_minimal_serving(enable_store=True) + serving.engine_client = MagicMock() + serving.engine_client.errored = False + + req = ResponsesRequest( + model="test", + input="What is my name?", + store=False, + previous_response=prev_resp, + ) + + result = await OpenAIServingResponses.create_responses(serving, req) + + assert isinstance(result, ErrorResponse) + assert result.error.code == HTTPStatus.BAD_REQUEST + assert "state carrier" in result.error.message + @pytest.mark.asyncio async def test_previous_response_id_without_store_returns_400(self): """create_responses should reject previous_response_id when store is off.""" @@ -341,18 +404,27 @@ class TestUtilsStateCarrierSkipping: def test_construct_chat_messages_skips_state_carrier(self): from openai.types.responses import ResponseOutputMessage, ResponseOutputText from openai.types.responses.response_reasoning_item import ResponseReasoningItem + from vllm.entrypoints.openai.responses.state import serialize_state - from vllm.entrypoints.openai.responses.utils import construct_chat_messages_with_tool_call + from vllm.entrypoints.openai.responses.utils import ( + construct_chat_messages_with_tool_call, + ) blob = serialize_state([{"role": "user", "content": "prev"}]) carrier = ResponseReasoningItem( - id="rs_state_abc", type="reasoning", summary=[], - status="completed", encrypted_content=blob, + id="rs_state_abc", + type="reasoning", + summary=[], + status="completed", + encrypted_content=blob, ) text = ResponseOutputText(type="output_text", text="Hello!", annotations=[]) msg = ResponseOutputMessage( - id="msg_1", type="message", role="assistant", - content=[text], status="completed", + id="msg_1", + type="message", + role="assistant", + content=[text], + status="completed", ) result = construct_chat_messages_with_tool_call([carrier, msg]) @@ -363,22 +435,33 @@ def test_construct_chat_messages_skips_state_carrier(self): def test_single_message_from_state_carrier_is_none(self): from openai.types.responses.response_reasoning_item import ResponseReasoningItem + from vllm.entrypoints.openai.responses.state import serialize_state - from vllm.entrypoints.openai.responses.utils import _construct_single_message_from_response_item + from vllm.entrypoints.openai.responses.utils import ( + _construct_single_message_from_response_item, + ) blob = serialize_state([{"role": "user", "content": "hi"}]) carrier = ResponseReasoningItem( - id="rs_state_xyz", type="reasoning", summary=[], - status="completed", encrypted_content=blob, + id="rs_state_xyz", + type="reasoning", + summary=[], + status="completed", + encrypted_content=blob, ) assert _construct_single_message_from_response_item(carrier) is None def test_external_encrypted_content_still_raises(self): from openai.types.responses.response_reasoning_item import ResponseReasoningItem - from vllm.entrypoints.openai.responses.utils import _construct_single_message_from_response_item + + from vllm.entrypoints.openai.responses.utils import ( + _construct_single_message_from_response_item, + ) external = ResponseReasoningItem( - id="rs_ext", type="reasoning", summary=[], + id="rs_ext", + type="reasoning", + summary=[], status="completed", encrypted_content="opaque-blob-not-from-vllm", ) diff --git a/tests/entrypoints/openai/responses/test_state.py b/tests/entrypoints/openai/responses/test_state.py index 1d5ba9eb8254..c23ded58fff9 100644 --- a/tests/entrypoints/openai/responses/test_state.py +++ b/tests/entrypoints/openai/responses/test_state.py @@ -3,9 +3,6 @@ """Unit tests for the stateless Responses API state carrier (state.py).""" -import importlib -import os - import pytest # --------------------------------------------------------------------------- @@ -29,7 +26,8 @@ def _reset_signing_key(): def isolated_key(monkeypatch): """Each test gets a fresh, deterministic signing key.""" monkeypatch.setenv( - "VLLM_RESPONSES_STATE_SIGNING_KEY", "ab" * 32 # 64 hex chars = 32 bytes + "VLLM_RESPONSES_STATE_SIGNING_KEY", + "ab" * 32, # 64 hex chars = 32 bytes ) _reset_signing_key() yield @@ -236,3 +234,19 @@ def test_invalid_hex_key_raises(monkeypatch): with pytest.raises(ValueError, match="valid hex string"): state_mod._get_signing_key() + + +def test_short_key_raises(monkeypatch): + """A key shorter than 32 bytes (64 hex chars) must be rejected. + + Short HMAC keys weaken tamper protection; enforce minimum length so + misconfigured deployments fail loudly rather than silently degrading + security. 'aa' decodes to 1 byte — well below the 32-byte minimum. + """ + import vllm.entrypoints.openai.responses.state as state_mod + + monkeypatch.setenv("VLLM_RESPONSES_STATE_SIGNING_KEY", "aa" * 4) # 4 bytes + state_mod._SIGNING_KEY = None + + with pytest.raises(ValueError, match="minimum of 32 bytes"): + state_mod._get_signing_key() diff --git a/vllm/entrypoints/openai/responses/serving.py b/vllm/entrypoints/openai/responses/serving.py index 8162aeda2352..81887438318f 100644 --- a/vllm/entrypoints/openai/responses/serving.py +++ b/vllm/entrypoints/openai/responses/serving.py @@ -399,11 +399,15 @@ async def create_responses( prev_messages_from_state: list | None = None if prev_response is not None and request.previous_response is not None: prev_messages_from_state = self._extract_state_from_response(prev_response) - if prev_messages_from_state is None and not self.enable_store: - # The caller passed previous_response but omitted - # include=['reasoning.encrypted_content'] on the prior turn, - # so no state carrier is present. With store disabled there is - # no fallback — return a clear 400 rather than a KeyError 500. + if prev_messages_from_state is None: + # The caller passed previous_response but the prior response + # contains no vLLM state carrier — most likely because + # include=['reasoning.encrypted_content'] was omitted on the + # prior turn. previous_response always implies the stateless + # path; there is no msg_store fallback regardless of + # enable_store (msg_store is keyed by response ID, not carried + # in the response object). Return a clear 400 rather than + # letting msg_store[prev_response.id] raise a KeyError 500. return self.create_error_response( err_type="invalid_request_error", message=( @@ -971,9 +975,21 @@ async def responses_full_generator( ): carrier_messages: list | None = None if self.use_harmony and isinstance(context, HarmonyContext): + # Harmony context already contains the full history including + # the assistant turn that was just generated. carrier_messages = list(context.messages) elif messages is not None: - carrier_messages = messages + # Non-Harmony path: `messages` is the input to the LLM for + # this turn. Append the assistant's response output so the + # carrier contains the complete history (input + response) + # for the next turn to build on. + from vllm.entrypoints.openai.responses.utils import ( + construct_chat_messages_with_tool_call, + ) + + carrier_messages = messages + construct_chat_messages_with_tool_call( + response.output + ) if carrier_messages is not None: state_carrier = self._build_state_carrier( carrier_messages, request.request_id @@ -1257,9 +1273,7 @@ def _construct_input_messages_with_harmony( # Stateless path: messages came from the encrypted_content # state carrier; deserialize dicts back to OpenAIHarmonyMessage. prev_msgs = [ - OpenAIHarmonyMessage.model_validate(m) - if isinstance(m, dict) - else m + OpenAIHarmonyMessage.model_validate(m) if isinstance(m, dict) else m for m in prev_messages ] else: @@ -1501,9 +1515,7 @@ def _build_state_carrier( encrypted_content=serialize_state(messages), ) - def _extract_state_from_response( - self, response: ResponsesResponse - ) -> list | None: + def _extract_state_from_response(self, response: ResponsesResponse) -> list | None: """Extract the serialized message history from a response's state carrier. Scans ``response.output`` for a vLLM state-carrier ReasoningItem and diff --git a/vllm/entrypoints/openai/responses/state.py b/vllm/entrypoints/openai/responses/state.py index 58e01b432bd7..79c9bba7fa70 100644 --- a/vllm/entrypoints/openai/responses/state.py +++ b/vllm/entrypoints/openai/responses/state.py @@ -46,12 +46,19 @@ def _get_signing_key() -> bytes: key_hex = os.environ.get("VLLM_RESPONSES_STATE_SIGNING_KEY", "") if key_hex: try: - _SIGNING_KEY = bytes.fromhex(key_hex) + key_bytes = bytes.fromhex(key_hex) except ValueError as exc: raise ValueError( "VLLM_RESPONSES_STATE_SIGNING_KEY must be a valid hex " "string (e.g. a 64-char / 32-byte hex key)." ) from exc + if len(key_bytes) < 32: + raise ValueError( + f"VLLM_RESPONSES_STATE_SIGNING_KEY decoded to only " + f"{len(key_bytes)} byte(s); a minimum of 32 bytes " + f"(64 hex characters) is required for a secure HMAC-SHA256 key." + ) + _SIGNING_KEY = key_bytes else: _SIGNING_KEY = os.urandom(32) logger.warning( @@ -88,9 +95,7 @@ def serialize_state(messages: list[Any]) -> str: payload_b64 = base64.b64encode( json.dumps(messages, default=_harmony_serializer).encode() ).decode() - sig = hmac.new( - _get_signing_key(), payload_b64.encode(), hashlib.sha256 - ).hexdigest() + sig = hmac.new(_get_signing_key(), payload_b64.encode(), hashlib.sha256).hexdigest() return f"{_FORMAT_VERSION}:{payload_b64}:{sig}" From 9944d81dbf072a151f6f40083612385e25f1224a Mon Sep 17 00:00:00 2001 From: Will Deines Date: Tue, 3 Mar 2026 09:05:38 -0500 Subject: [PATCH 05/11] fix: remove unused TYPE_CHECKING import to pass pre-commit ruff check Signed-off-by: Will Deines --- vllm/entrypoints/openai/responses/protocol.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vllm/entrypoints/openai/responses/protocol.py b/vllm/entrypoints/openai/responses/protocol.py index 661c0ac8957a..84e611c13d6f 100644 --- a/vllm/entrypoints/openai/responses/protocol.py +++ b/vllm/entrypoints/openai/responses/protocol.py @@ -4,7 +4,7 @@ # Adapted from # https://github.com/lm-sys/FastChat/blob/168ccc29d3f7edc50823016105c024fe2282732a/fastchat/protocol/openai_api_protocol.py import time -from typing import TYPE_CHECKING, Any, Literal, TypeAlias +from typing import Any, Literal, TypeAlias from openai.types.responses import ( ResponseCodeInterpreterCallCodeDeltaEvent, From a5654cb2900b428c8daea9be6bd579fe530ad982 Mon Sep 17 00:00:00 2001 From: Will Deines Date: Tue, 3 Mar 2026 10:10:29 -0500 Subject: [PATCH 06/11] feat(responses): pluggable ResponseStore abstraction Extract the three in-memory dicts (response_store, msg_store, response_store_lock) from OpenAIServingResponses into a pluggable ResponseStore ABC with an InMemoryResponseStore default. Users can point VLLM_RESPONSES_STORE_BACKEND to a fully-qualified class name to swap in their own backend (Redis, Postgres, etc.) without patching vLLM. - Add ResponseStore ABC with 5 abstract methods + close() hook - Add InMemoryResponseStore wrapping current dict behavior with internal asyncio.Lock (removes external response_store_lock) - Add create_response_store() factory reading env var - Refactor ~15 call sites in serving.py to use self.store.* - Add VLLM_RESPONSES_STORE_BACKEND env var to envs.py - Update test helper to use InMemoryResponseStore - Add unit + integration tests for store and serving interactions Follows up on #35740 (stateless multi-turn). Addresses RFC #26934 (pluggable state backends) and supersedes #34738 (LRU eviction). Signed-off-by: Will Deines --- .gitignore | 3 + .../responses/test_serving_stateless.py | 8 +- .../openai/responses/test_store.py | 257 ++++++++++++++++++ vllm/entrypoints/openai/responses/serving.py | 109 ++++---- vllm/entrypoints/openai/responses/store.py | 155 +++++++++++ vllm/envs.py | 7 + 6 files changed, 480 insertions(+), 59 deletions(-) create mode 100644 tests/entrypoints/openai/responses/test_store.py create mode 100644 vllm/entrypoints/openai/responses/store.py diff --git a/.gitignore b/.gitignore index d62536cfb91d..b8a689a92d05 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,6 @@ +# Local e2e testing harness (not committed) +/local_test/ + # version file generated by setuptools-scm /vllm/_version.py diff --git a/tests/entrypoints/openai/responses/test_serving_stateless.py b/tests/entrypoints/openai/responses/test_serving_stateless.py index 55bb848303bf..802ba9031f6b 100644 --- a/tests/entrypoints/openai/responses/test_serving_stateless.py +++ b/tests/entrypoints/openai/responses/test_serving_stateless.py @@ -45,19 +45,17 @@ def _make_minimal_serving(enable_store: bool = False): set exactly what is needed. Attributes touched by retrieve_responses / cancel_responses: - enable_store, response_store, response_store_lock, background_tasks, + enable_store, store, background_tasks, log_error_stack (for create_error_response from base class) Attributes touched by create_responses (up to store check): models (for _check_model), engine_client (for errored check) """ - import asyncio - from vllm.entrypoints.openai.responses.serving import OpenAIServingResponses + from vllm.entrypoints.openai.responses.store import InMemoryResponseStore obj = OpenAIServingResponses.__new__(OpenAIServingResponses) obj.enable_store = enable_store - obj.response_store = {} - obj.response_store_lock = asyncio.Lock() + obj.store = InMemoryResponseStore() obj.background_tasks = {} obj.log_error_stack = False # _check_model needs models.is_base_model to return True (model found) diff --git a/tests/entrypoints/openai/responses/test_store.py b/tests/entrypoints/openai/responses/test_store.py new file mode 100644 index 000000000000..c844351aad12 --- /dev/null +++ b/tests/entrypoints/openai/responses/test_store.py @@ -0,0 +1,257 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright contributors to the vLLM project + +"""Unit tests for the pluggable ResponseStore abstraction. + +Tests focus on the business-logic methods (unless_status guard, +update_response_status transitions) and the factory's env-var loading. +Integration tests verify that serving.py actually uses the store correctly +through its public methods. +""" + +from http import HTTPStatus + +import pytest + +from vllm.entrypoints.openai.responses.store import ( + InMemoryResponseStore, + ResponseStore, + create_response_store, +) + + +def _make_response(response_id: str = "resp_1", status: str = "completed"): + """Create a minimal ResponsesResponse via model_construct.""" + from vllm.entrypoints.openai.responses.protocol import ResponsesResponse + + return ResponsesResponse.model_construct(id=response_id, status=status) + + +# --------------------------------------------------------------------------- +# put_response unless_status guard — the cancelled-response protection +# --------------------------------------------------------------------------- + + +class TestPutResponseUnlessStatus: + """Tests for the unless_status guard on put_response. + + In production this prevents a completed response from overwriting + a cancelled one (serving.py finalize path). + """ + + @pytest.mark.asyncio + async def test_skips_write_when_existing_status_matches(self): + store = InMemoryResponseStore() + await store.put_response("r1", _make_response("r1", "cancelled")) + + new = _make_response("r1", "completed") + ok = await store.put_response("r1", new, unless_status="cancelled") + + assert ok is False + assert (await store.get_response("r1")).status == "cancelled" + + @pytest.mark.asyncio + async def test_allows_write_when_existing_status_differs(self): + store = InMemoryResponseStore() + await store.put_response("r1", _make_response("r1", "queued")) + + new = _make_response("r1", "completed") + ok = await store.put_response("r1", new, unless_status="cancelled") + + assert ok is True + assert (await store.get_response("r1")).status == "completed" + + @pytest.mark.asyncio + async def test_allows_write_when_nothing_stored_yet(self): + store = InMemoryResponseStore() + new = _make_response("r1", "completed") + ok = await store.put_response("r1", new, unless_status="cancelled") + + assert ok is True + assert await store.get_response("r1") is new + + +# --------------------------------------------------------------------------- +# update_response_status — atomic status transitions +# --------------------------------------------------------------------------- + + +class TestUpdateResponseStatus: + """Tests for the atomic status transition method. + + In production this is used by: + - _run_background_request: queued/in_progress -> failed + - cancel_responses: queued/in_progress -> cancelled + """ + + @pytest.mark.asyncio + async def test_transitions_when_current_status_is_allowed(self): + """The actual production path: status IS in allowed set.""" + store = InMemoryResponseStore() + await store.put_response("r1", _make_response("r1", "queued")) + + result = await store.update_response_status( + "r1", "failed", allowed_current_statuses={"queued", "in_progress"} + ) + + assert result is not None + assert result.status == "failed" + # Verify the stored object was mutated (not a copy) + assert (await store.get_response("r1")).status == "failed" + + @pytest.mark.asyncio + async def test_rejects_when_current_status_not_in_allowed_set(self): + """e.g. trying to fail a completed response — should be a no-op.""" + store = InMemoryResponseStore() + await store.put_response("r1", _make_response("r1", "completed")) + + result = await store.update_response_status( + "r1", "failed", allowed_current_statuses={"queued", "in_progress"} + ) + + assert result is None + assert (await store.get_response("r1")).status == "completed" + + @pytest.mark.asyncio + async def test_returns_none_when_not_found(self): + store = InMemoryResponseStore() + result = await store.update_response_status("ghost", "failed") + assert result is None + + @pytest.mark.asyncio + async def test_unconditional_update_when_no_allowed_set(self): + """Without allowed_current_statuses, any transition succeeds.""" + store = InMemoryResponseStore() + await store.put_response("r1", _make_response("r1", "completed")) + + result = await store.update_response_status("r1", "cancelled") + + assert result is not None + assert result.status == "cancelled" + + +# --------------------------------------------------------------------------- +# Factory — env var loading +# --------------------------------------------------------------------------- + + +class TestFactory: + def test_default_returns_in_memory(self, monkeypatch): + monkeypatch.delenv("VLLM_RESPONSES_STORE_BACKEND", raising=False) + import vllm.envs as envs_mod + + envs_mod._ENVS_INITIALIZED = False + store = create_response_store() + assert isinstance(store, InMemoryResponseStore) + + def test_custom_backend_loaded_via_qualname(self, monkeypatch): + monkeypatch.setenv( + "VLLM_RESPONSES_STORE_BACKEND", + "vllm.entrypoints.openai.responses.store.InMemoryResponseStore", + ) + import vllm.envs as envs_mod + + envs_mod._ENVS_INITIALIZED = False + store = create_response_store() + assert isinstance(store, InMemoryResponseStore) + + def test_non_subclass_raises_type_error(self, monkeypatch): + monkeypatch.setenv("VLLM_RESPONSES_STORE_BACKEND", "builtins.int") + import vllm.envs as envs_mod + + envs_mod._ENVS_INITIALIZED = False + with pytest.raises(TypeError, match="not a ResponseStore subclass"): + create_response_store() + + def test_abc_cannot_be_instantiated(self): + with pytest.raises(TypeError): + ResponseStore() # type: ignore[abstract] + + +# --------------------------------------------------------------------------- +# Integration: serving.py cancel_responses uses the store correctly +# --------------------------------------------------------------------------- + + +def _make_minimal_serving(enable_store: bool = True): + """Minimal OpenAIServingResponses with a real InMemoryResponseStore.""" + from vllm.entrypoints.openai.responses.serving import OpenAIServingResponses + + obj = OpenAIServingResponses.__new__(OpenAIServingResponses) + obj.enable_store = enable_store + obj.store = InMemoryResponseStore() + obj.background_tasks = {} + obj.log_error_stack = False + return obj + + +class TestServingCancelIntegration: + """Verify cancel_responses interacts with the store correctly. + + These tests exercise the refactored cancel_responses code that replaced + the old get-check-mutate-under-lock pattern with + store.update_response_status. + """ + + @pytest.mark.asyncio + async def test_cancel_queued_response_succeeds(self): + from vllm.entrypoints.openai.responses.protocol import ResponsesResponse + + serving = _make_minimal_serving() + resp = _make_response("resp_bg", status="queued") + await serving.store.put_response("resp_bg", resp) + + result = await serving.cancel_responses("resp_bg") + + assert isinstance(result, ResponsesResponse) + assert result.status == "cancelled" + + @pytest.mark.asyncio + async def test_cancel_completed_response_returns_error(self): + from vllm.entrypoints.openai.engine.protocol import ErrorResponse + + serving = _make_minimal_serving() + resp = _make_response("resp_done", status="completed") + await serving.store.put_response("resp_done", resp) + + result = await serving.cancel_responses("resp_done") + + assert isinstance(result, ErrorResponse) + assert "Cannot cancel" in result.error.message + + @pytest.mark.asyncio + async def test_cancel_unknown_id_returns_404(self): + from vllm.entrypoints.openai.engine.protocol import ErrorResponse + + serving = _make_minimal_serving() + result = await serving.cancel_responses("resp_ghost") + + assert isinstance(result, ErrorResponse) + assert result.error.code == HTTPStatus.NOT_FOUND + + +class TestServingRetrieveIntegration: + """Verify retrieve_responses reads from the store correctly.""" + + @pytest.mark.asyncio + async def test_retrieve_stored_response(self): + from vllm.entrypoints.openai.responses.protocol import ResponsesResponse + + serving = _make_minimal_serving() + resp = _make_response("resp_1", status="completed") + await serving.store.put_response("resp_1", resp) + + result = await serving.retrieve_responses("resp_1", None, False) + + assert isinstance(result, ResponsesResponse) + assert result is resp + + @pytest.mark.asyncio + async def test_retrieve_unknown_id_returns_404(self): + from vllm.entrypoints.openai.engine.protocol import ErrorResponse + + serving = _make_minimal_serving() + result = await serving.retrieve_responses("resp_ghost", None, False) + + assert isinstance(result, ErrorResponse) + assert result.error.code == HTTPStatus.NOT_FOUND diff --git a/vllm/entrypoints/openai/responses/serving.py b/vllm/entrypoints/openai/responses/serving.py index 81887438318f..0c97c009327c 100644 --- a/vllm/entrypoints/openai/responses/serving.py +++ b/vllm/entrypoints/openai/responses/serving.py @@ -44,7 +44,6 @@ from vllm.config.utils import replace from vllm.engine.protocol import EngineClient from vllm.entrypoints.chat_utils import ( - ChatCompletionMessageParam, ChatTemplateContentFormatOption, ) from vllm.entrypoints.logger import RequestLogger @@ -252,16 +251,12 @@ def __init__( self.tool_call_id_type = "random" self.enable_auto_tools = enable_auto_tools - # HACK(woosuk): This is a hack. We should use a better store. - # FIXME: If enable_store=True, this may cause a memory leak since we - # never remove responses from the store. - self.response_store: dict[str, ResponsesResponse] = {} - self.response_store_lock = asyncio.Lock() - # HACK(woosuk): This is a hack. We should use a better store. - # FIXME: If enable_store=True, this may cause a memory leak since we - # never remove messages from the store. - self.msg_store: dict[str, list[ChatCompletionMessageParam]] = {} + from vllm.entrypoints.openai.responses.store import ( + create_response_store, + ) + + self.store = create_response_store() # HACK(wuhang): This is a hack. We should use a better store. # FIXME: If enable_store=True, this may cause a memory leak since we @@ -386,8 +381,7 @@ async def create_responses( ), status_code=HTTPStatus.BAD_REQUEST, ) - async with self.response_store_lock: - prev_response = self.response_store.get(request.previous_response_id) + prev_response = await self.store.get_response(request.previous_response_id) if prev_response is None: return self._make_not_found_error(request.previous_response_id) elif request.previous_response is not None: @@ -395,7 +389,7 @@ async def create_responses( # For stateless multi-turn: extract Harmony/chat messages from the # encrypted_content state carrier embedded in the previous response. - # This replaces the msg_store lookup for subsequent turns. + # This replaces the store message lookup for subsequent turns. prev_messages_from_state: list | None = None if prev_response is not None and request.previous_response is not None: prev_messages_from_state = self._extract_state_from_response(prev_response) @@ -404,10 +398,9 @@ async def create_responses( # contains no vLLM state carrier — most likely because # include=['reasoning.encrypted_content'] was omitted on the # prior turn. previous_response always implies the stateless - # path; there is no msg_store fallback regardless of - # enable_store (msg_store is keyed by response ID, not carried - # in the response object). Return a clear 400 rather than - # letting msg_store[prev_response.id] raise a KeyError 500. + # path; there is no stored-message fallback regardless of + # enable_store (stored messages are keyed by response ID, not + # carried in the response object). Return a clear 400. return self.create_error_response( err_type="invalid_request_error", message=( @@ -425,7 +418,7 @@ async def create_responses( model_name = self.models.model_name(lora_request) if self.use_harmony: - messages, engine_prompts = self._make_request_with_harmony( + messages, engine_prompts = await self._make_request_with_harmony( request, prev_response, prev_messages_from_state ) else: @@ -562,7 +555,7 @@ async def create_responses( # Store the input messages. if request.store: - self.msg_store[request.request_id] = messages + await self.store.put_messages(request.request_id, messages) if request.background: created_time = int(time.time()) @@ -575,8 +568,7 @@ async def create_responses( status="queued", usage=None, ) - async with self.response_store_lock: - self.response_store[response.id] = response + await self.store.put_response(response.id, response) # Run the request in the background. if request.stream: @@ -656,11 +648,15 @@ async def _make_request( tool_dicts = construct_tool_dicts(request.tools, request.tool_choice) # Construct the input messages. # For stateless multi-turn, prev_messages comes from the encrypted_content - # state carrier; otherwise fall back to the in-memory msg_store. + # state carrier; otherwise fall back to the response store. prev_msg = ( prev_messages if prev_messages is not None - else (self.msg_store.get(prev_response.id) if prev_response else None) + else ( + await self.store.get_messages(prev_response.id) + if prev_response + else None + ) ) messages = construct_input_messages( request_instructions=request.instructions, @@ -783,7 +779,7 @@ async def _generate_with_builtin_tools( priority = orig_priority - 1 sub_request += 1 - def _make_request_with_harmony( + async def _make_request_with_harmony( self, request: ResponsesRequest, prev_response: ResponsesResponse | None, @@ -794,7 +790,7 @@ def _make_request_with_harmony( "Only 'auto' tool_choice is supported in response API with Harmony" ) - messages = self._construct_input_messages_with_harmony( + messages = await self._construct_input_messages_with_harmony( request, prev_response, prev_messages ) prompt_token_ids = render_for_completion(messages) @@ -997,11 +993,9 @@ async def responses_full_generator( response.output.append(state_carrier) if request.store: - async with self.response_store_lock: - stored_response = self.response_store.get(response.id) - # If the response is already cancelled, don't update it. - if stored_response is None or stored_response.status != "cancelled": - self.response_store[response.id] = response + await self.store.put_response( + response.id, response, unless_status="cancelled" + ) return response def _topk_logprobs( @@ -1242,7 +1236,7 @@ def _construct_harmony_system_input_message( ) return sys_msg - def _construct_input_messages_with_harmony( + async def _construct_input_messages_with_harmony( self, request: ResponsesRequest, prev_response: ResponsesResponse | None, @@ -1277,7 +1271,7 @@ def _construct_input_messages_with_harmony( for m in prev_messages ] else: - prev_msgs = self.msg_store[prev_response.id] + prev_msgs = await self.store.get_messages(prev_response.id) or [] # FIXME(woosuk): The slice-delete-reappend cycle below is # currently a no-op --- it removes messages then puts them all @@ -1347,6 +1341,15 @@ async def _run_background_request_stream( finally: new_event_signal.set() + if response is not None and isinstance(response, ErrorResponse): + # If the request has failed, update the status to "failed". + await self.store.update_response_status( + request.request_id, + "failed", + allowed_current_statuses={"queued", "in_progress"}, + ) + + async def _run_background_request( self, request: ResponsesRequest, @@ -1357,12 +1360,11 @@ async def _run_background_request( if isinstance(response, ErrorResponse): # If the request has failed, update the status to "failed". - response_id = request.request_id - async with self.response_store_lock: - stored_response = self.response_store.get(response_id) - assert stored_response is not None - if stored_response.status not in ("completed", "cancelled"): - stored_response.status = "failed" + await self.store.update_response_status( + request.request_id, + "failed", + allowed_current_statuses={"queued", "in_progress"}, + ) async def responses_background_stream_generator( self, @@ -1416,8 +1418,7 @@ async def retrieve_responses( status_code=HTTPStatus.NOT_IMPLEMENTED, ) - async with self.response_store_lock: - response = self.response_store.get(response_id) + response = await self.store.get_response(response_id) if response is None: return self._make_not_found_error(response_id) @@ -1462,21 +1463,21 @@ async def cancel_responses( status_code=HTTPStatus.BAD_REQUEST, ) - async with self.response_store_lock: - response = self.response_store.get(response_id) - if response is None: + response = await self.store.update_response_status( + response_id, + "cancelled", + allowed_current_statuses={"queued", "in_progress"}, + ) + if response is None: + # Either not found or not in a cancellable state. + stored = await self.store.get_response(response_id) + if stored is None: return self._make_not_found_error(response_id) - - prev_status = response.status - if prev_status not in ("queued", "in_progress"): - return self.create_error_response( - err_type="invalid_request_error", - message="Cannot cancel a synchronous response.", - param="response_id", - ) - - # Update the status to "cancelled". - response.status = "cancelled" + return self.create_error_response( + err_type="invalid_request_error", + message="Cannot cancel a synchronous response.", + param="response_id", + ) # Abort the request. if task := self.background_tasks.get(response_id): diff --git a/vllm/entrypoints/openai/responses/store.py b/vllm/entrypoints/openai/responses/store.py new file mode 100644 index 000000000000..98a5a089b0d1 --- /dev/null +++ b/vllm/entrypoints/openai/responses/store.py @@ -0,0 +1,155 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright contributors to the vLLM project + +import asyncio +import logging +from abc import ABC, abstractmethod + +from openai.types.responses import ResponseStatus + +from vllm.entrypoints.openai.responses.protocol import ResponsesResponse + +logger = logging.getLogger(__name__) + + +class ResponseStore(ABC): + """Abstract interface for persisting Responses API state. + + vLLM ships an in-memory default (``InMemoryResponseStore``). Users who + need persistence or multi-node consistency can implement this interface and + point ``VLLM_RESPONSES_STORE_BACKEND`` to their class's fully-qualified + name (e.g. ``mypackage.redis_store.RedisResponseStore``). + """ + + @abstractmethod + async def get_response(self, response_id: str) -> ResponsesResponse | None: + """Return the stored response, or ``None`` if not found.""" + + @abstractmethod + async def put_response( + self, + response_id: str, + response: ResponsesResponse, + *, + unless_status: ResponseStatus | None = None, + ) -> bool: + """Store *response* under *response_id*. + + If *unless_status* is given and the **currently stored** response + already has that status, the write is skipped and ``False`` is + returned. Otherwise the write succeeds and ``True`` is returned. + """ + + @abstractmethod + async def update_response_status( + self, + response_id: str, + new_status: ResponseStatus, + *, + allowed_current_statuses: set[ResponseStatus] | None = None, + ) -> ResponsesResponse | None: + """Atomically transition a response's status. + + If *allowed_current_statuses* is given the update only proceeds when + the current status is in the set; otherwise the call is a no-op and + returns ``None``. + + Returns the (possibly updated) ``ResponsesResponse``, or ``None`` + when the response was not found or the transition was rejected. + """ + + @abstractmethod + async def get_messages(self, response_id: str) -> list | None: + """Return the stored input messages for a response, or ``None``.""" + + @abstractmethod + async def put_messages(self, response_id: str, messages: list) -> None: + """Store the input messages for a response.""" + + async def close(self) -> None: + """Release resources. Override to clean up connections/pools.""" + return + + +class InMemoryResponseStore(ResponseStore): + """Default in-memory implementation — wraps the previous dict behaviour. + + All response mutations are guarded by an internal ``asyncio.Lock`` so + callers no longer need an external ``response_store_lock``. + """ + + def __init__(self) -> None: + self._responses: dict[str, ResponsesResponse] = {} + self._messages: dict[str, list] = {} + self._lock = asyncio.Lock() + + async def get_response(self, response_id: str) -> ResponsesResponse | None: + async with self._lock: + return self._responses.get(response_id) + + async def put_response( + self, + response_id: str, + response: ResponsesResponse, + *, + unless_status: ResponseStatus | None = None, + ) -> bool: + async with self._lock: + existing = self._responses.get(response_id) + if ( + unless_status is not None + and existing is not None + and existing.status == unless_status + ): + return False + self._responses[response_id] = response + return True + + async def update_response_status( + self, + response_id: str, + new_status: ResponseStatus, + *, + allowed_current_statuses: set[ResponseStatus] | None = None, + ) -> ResponsesResponse | None: + async with self._lock: + response = self._responses.get(response_id) + if response is None: + return None + if ( + allowed_current_statuses is not None + and response.status not in allowed_current_statuses + ): + return None + response.status = new_status + return response + + async def get_messages(self, response_id: str) -> list | None: + return self._messages.get(response_id) + + async def put_messages(self, response_id: str, messages: list) -> None: + self._messages[response_id] = messages + + +def create_response_store() -> ResponseStore: + """Factory that returns a ``ResponseStore`` instance. + + Checks ``VLLM_RESPONSES_STORE_BACKEND`` — if set, loads the class via + ``resolve_obj_by_qualname``; otherwise returns ``InMemoryResponseStore``. + """ + from vllm import envs + + backend = envs.VLLM_RESPONSES_STORE_BACKEND + if not backend: + return InMemoryResponseStore() + + from vllm.utils.import_utils import resolve_obj_by_qualname + + cls = resolve_obj_by_qualname(backend) + if not (isinstance(cls, type) and issubclass(cls, ResponseStore)): + raise TypeError( + f"VLLM_RESPONSES_STORE_BACKEND={backend!r} resolved to " + f"{cls!r}, which is not a ResponseStore subclass." + ) + logger.info("Using custom response store backend: %s", backend) + return cls() diff --git a/vllm/envs.py b/vllm/envs.py index cd460a0a26a6..5c512af4f192 100755 --- a/vllm/envs.py +++ b/vllm/envs.py @@ -199,6 +199,7 @@ VLLM_LOOPBACK_IP: str = "" VLLM_ALLOW_CHUNKED_LOCAL_ATTN_WITH_HYBRID_KV_CACHE: bool = True VLLM_ENABLE_RESPONSES_API_STORE: bool = False + VLLM_RESPONSES_STORE_BACKEND: str = "" VLLM_RESPONSES_STATE_SIGNING_KEY: str = "" VLLM_NVFP4_GEMM_BACKEND: str | None = None VLLM_HAS_FLASHINFER_CUBIN: bool = False @@ -1460,6 +1461,12 @@ def _get_or_set_default() -> str: "VLLM_ENABLE_RESPONSES_API_STORE": lambda: bool( int(os.getenv("VLLM_ENABLE_RESPONSES_API_STORE", "0")) ), + # Fully-qualified class name of a custom ResponseStore backend. + # When set, vLLM loads this class instead of InMemoryResponseStore. + # Example: "mypackage.redis_store.RedisResponseStore" + "VLLM_RESPONSES_STORE_BACKEND": lambda: os.environ.get( + "VLLM_RESPONSES_STORE_BACKEND", "" + ), # Hex-encoded 32-byte (64-char) signing key for stateless multi-turn # state carriers (RFC #26934). If not set, a random key is generated at # startup (incompatible across restarts / multi-node deployments). From 8d0dd2cb44bd41b994971c853b9519766c469ecc Mon Sep 17 00:00:00 2001 From: Will Deines Date: Tue, 3 Mar 2026 10:33:03 -0500 Subject: [PATCH 07/11] fix: guard message store access with lock in InMemoryResponseStore get_messages/put_messages were accessing self._messages without the internal asyncio.Lock, inconsistent with how self._responses is protected. Adds lock acquisition for both methods to prevent race conditions in concurrent access. Signed-off-by: Will Deines --- vllm/entrypoints/openai/responses/store.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/vllm/entrypoints/openai/responses/store.py b/vllm/entrypoints/openai/responses/store.py index 98a5a089b0d1..595281203f69 100644 --- a/vllm/entrypoints/openai/responses/store.py +++ b/vllm/entrypoints/openai/responses/store.py @@ -125,10 +125,12 @@ async def update_response_status( return response async def get_messages(self, response_id: str) -> list | None: - return self._messages.get(response_id) + async with self._lock: + return self._messages.get(response_id) async def put_messages(self, response_id: str, messages: list) -> None: - self._messages[response_id] = messages + async with self._lock: + self._messages[response_id] = messages def create_response_store() -> ResponseStore: From 2d277c0f3ebfc5c0b1d91f2dfc39e58351593bca Mon Sep 17 00:00:00 2001 From: Will Deines Date: Tue, 3 Mar 2026 16:10:15 -0500 Subject: [PATCH 08/11] fix: use rsplit for more robust state carrier parsing Signed-off-by: Will Deines --- vllm/entrypoints/openai/responses/state.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/vllm/entrypoints/openai/responses/state.py b/vllm/entrypoints/openai/responses/state.py index 79c9bba7fa70..c645aacff2ae 100644 --- a/vllm/entrypoints/openai/responses/state.py +++ b/vllm/entrypoints/openai/responses/state.py @@ -117,14 +117,16 @@ def deserialize_state(encrypted_content: str) -> list[Any] | None: if not encrypted_content.startswith(f"{_FORMAT_VERSION}:"): return None # Expected: "vllm:1::" - # Split into exactly 4 parts on the first 3 colons. - parts = encrypted_content.split(":", 3) - if len(parts) != 4: + # Strip the prefix, then split payload from signature from the right + # so future format version changes (e.g. "vllm:2:alpha") don't break. + suffix = encrypted_content[len(f"{_FORMAT_VERSION}:"):] + try: + payload_b64, sig = suffix.rsplit(":", 1) + except ValueError as exc: raise ValueError( "Malformed vLLM state carrier: expected " f"'{_FORMAT_VERSION}::', got {encrypted_content!r}" - ) - _, _, payload_b64, sig = parts + ) from exc expected = hmac.new( _get_signing_key(), payload_b64.encode(), hashlib.sha256 ).hexdigest() From e6feaa0c5a0e1c1195f2ff60710858e451c54ff2 Mon Sep 17 00:00:00 2001 From: Will Deines Date: Tue, 3 Mar 2026 16:37:39 -0500 Subject: [PATCH 09/11] style: fix ruff-format slice spacing in state carrier parsing Signed-off-by: Will Deines --- vllm/entrypoints/openai/responses/state.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vllm/entrypoints/openai/responses/state.py b/vllm/entrypoints/openai/responses/state.py index c645aacff2ae..cd85cd18a4ab 100644 --- a/vllm/entrypoints/openai/responses/state.py +++ b/vllm/entrypoints/openai/responses/state.py @@ -119,7 +119,7 @@ def deserialize_state(encrypted_content: str) -> list[Any] | None: # Expected: "vllm:1::" # Strip the prefix, then split payload from signature from the right # so future format version changes (e.g. "vllm:2:alpha") don't break. - suffix = encrypted_content[len(f"{_FORMAT_VERSION}:"):] + suffix = encrypted_content[len(f"{_FORMAT_VERSION}:") :] try: payload_b64, sig = suffix.rsplit(":", 1) except ValueError as exc: From d154088942f53c4be4577aa987a9bfb2c7f5c6f7 Mon Sep 17 00:00:00 2001 From: Will Deines Date: Wed, 4 Mar 2026 11:46:10 -0500 Subject: [PATCH 10/11] fix: remove local_test/ from .gitignore Handled by .git/info/exclude on feature branches, force-added on production/garrio-release. Signed-off-by: Will Deines --- .gitignore | 3 --- vllm/entrypoints/openai/responses/serving.py | 11 ++++++++--- vllm/entrypoints/openai/responses/state.py | 3 ++- 3 files changed, 10 insertions(+), 7 deletions(-) diff --git a/.gitignore b/.gitignore index b8a689a92d05..d62536cfb91d 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,3 @@ -# Local e2e testing harness (not committed) -/local_test/ - # version file generated by setuptools-scm /vllm/_version.py diff --git a/vllm/entrypoints/openai/responses/serving.py b/vllm/entrypoints/openai/responses/serving.py index 0c97c009327c..aa7ea8b702f6 100644 --- a/vllm/entrypoints/openai/responses/serving.py +++ b/vllm/entrypoints/openai/responses/serving.py @@ -11,6 +11,7 @@ from http import HTTPStatus from typing import Any, Final +import jinja2 from fastapi import Request from openai.types.responses import ( ResponseContentPartAddedEvent, @@ -1333,11 +1334,17 @@ async def _run_background_request_stream( event_deque: deque[StreamingResponsesResponse] = deque() new_event_signal = asyncio.Event() self.event_store[request.request_id] = (event_deque, new_event_signal) - generator = self.responses_stream_generator(request, *args, **kwargs) + response = None try: + generator = self.responses_stream_generator(request, *args, **kwargs) async for event in generator: event_deque.append(event) new_event_signal.set() # Signal new event available + except GenerationError as e: + response = self._convert_generation_error_to_response(e) + except Exception as e: + logger.exception("Background request failed for %s", request.request_id) + response = self.create_error_response(e) finally: new_event_signal.set() @@ -1349,7 +1356,6 @@ async def _run_background_request_stream( allowed_current_statuses={"queued", "in_progress"}, ) - async def _run_background_request( self, request: ResponsesRequest, @@ -1553,7 +1559,6 @@ def _make_store_not_supported_error(self) -> ErrorResponse: param="store", ) - async def _process_simple_streaming_events( self, request: ResponsesRequest, diff --git a/vllm/entrypoints/openai/responses/state.py b/vllm/entrypoints/openai/responses/state.py index cd85cd18a4ab..be7bf42cf0af 100644 --- a/vllm/entrypoints/openai/responses/state.py +++ b/vllm/entrypoints/openai/responses/state.py @@ -25,13 +25,14 @@ startup, making state carriers incompatible across restarts/replicas. """ -import base64 import hashlib import hmac import json import os from typing import Any +import pybase64 as base64 + from vllm.logger import init_logger logger = init_logger(__name__) From 5eb720237326576e81b00358ca4bab7e1e243f19 Mon Sep 17 00:00:00 2001 From: Will Deines Date: Wed, 18 Mar 2026 14:00:17 -0400 Subject: [PATCH 11/11] fix: address PR #35905 review comments (double-include, tampered carrier, cross-replica streaming) - Fix double-include of assistant output on stateless non-Harmony path by only passing prev_response_output when prev_messages is None - Catch ValueError from _extract_state_from_response before the try/except block so tampered HMAC returns 400 instead of 500 - Add event_store pre-check in retrieve_responses for streaming: terminal responses fall back to full response, in-progress ones return 400 directing to non-streaming retrieval - Document cross-replica cancel limitation near background_tasks lookup Signed-off-by: Will Deines --- .../responses/test_serving_stateless.py | 117 ++++++++++++++++++ .../openai/responses/test_store.py | 41 ++++++ vllm/entrypoints/openai/responses/serving.py | 39 +++++- 3 files changed, 195 insertions(+), 2 deletions(-) diff --git a/tests/entrypoints/openai/responses/test_serving_stateless.py b/tests/entrypoints/openai/responses/test_serving_stateless.py index 802ba9031f6b..09e8d3c4b690 100644 --- a/tests/entrypoints/openai/responses/test_serving_stateless.py +++ b/tests/entrypoints/openai/responses/test_serving_stateless.py @@ -472,6 +472,123 @@ def test_external_encrypted_content_still_raises(self): # --------------------------------------------------------------------------- +# --------------------------------------------------------------------------- +# Fix 1: Double-include prevention on stateless non-Harmony path +# --------------------------------------------------------------------------- + + +class TestDoubleIncludePrevention: + def test_stateless_path_does_not_duplicate_assistant_message(self): + """When prev_messages is provided (stateless path), the assistant + output from prev_response.output must NOT be appended again. + + The state carrier already contains the full conversation history + including assistant messages. Passing prev_response_output to + construct_input_messages would duplicate them. + """ + from vllm.entrypoints.openai.responses.utils import construct_input_messages + + assistant_text = "I am a helpful assistant." + prev_messages = [ + {"role": "user", "content": "Who are you?"}, + {"role": "assistant", "content": assistant_text}, + ] + + # Simulate the stateless path: prev_messages is not None. + # prev_response_output should be None (the fix). + result = construct_input_messages( + request_instructions=None, + request_input="What did you say?", + prev_msg=prev_messages, + prev_response_output=None, + ) + + assistant_msgs = [m for m in result if m.get("role") == "assistant"] + assert len(assistant_msgs) == 1, ( + f"Expected exactly 1 assistant message, got {len(assistant_msgs)}: " + f"{assistant_msgs}" + ) + assert assistant_msgs[0]["content"] == assistant_text + + def test_store_path_still_includes_assistant_output(self): + """When prev_messages is None (store-backed path), prev_response_output + must still be used to add the assistant turn. + """ + from openai.types.responses import ResponseOutputMessage, ResponseOutputText + + from vllm.entrypoints.openai.responses.utils import construct_input_messages + + text = ResponseOutputText(type="output_text", text="Hello!", annotations=[]) + msg = ResponseOutputMessage( + id="msg_1", + type="message", + role="assistant", + content=[text], + status="completed", + ) + + result = construct_input_messages( + request_instructions=None, + request_input="What did you say?", + prev_msg=None, + prev_response_output=[msg], + ) + + assistant_msgs = [m for m in result if m.get("role") == "assistant"] + assert len(assistant_msgs) == 1 + assert assistant_msgs[0]["content"] == "Hello!" + + +# --------------------------------------------------------------------------- +# Fix 2: Tampered state carrier returns 400, not 500 +# --------------------------------------------------------------------------- + + +class TestTamperedCarrierReturns400: + @pytest.mark.asyncio + async def test_tampered_hmac_returns_400(self): + """A previous_response with a tampered HMAC in its state carrier + must return a 400 error, not raise an uncaught ValueError → 500. + """ + from vllm.entrypoints.openai.engine.protocol import ErrorResponse + from vllm.entrypoints.openai.responses.protocol import ( + ResponsesRequest, + ResponsesResponse, + ) + from vllm.entrypoints.openai.responses.serving import OpenAIServingResponses + + serving = _make_minimal_serving(enable_store=False) + serving.engine_client = MagicMock() + serving.engine_client.errored = False + + # Build a valid carrier then corrupt the HMAC. + carrier = OpenAIServingResponses._build_state_carrier( + serving, + [{"role": "user", "content": "hi"}], + "req_123", + ) + parts = carrier.encrypted_content.split(":", 3) + parts[3] = "0" * 64 # Replace HMAC with garbage + carrier.encrypted_content = ":".join(parts) + + prev_resp = ResponsesResponse.model_construct( + id="resp_tampered", output=[carrier] + ) + + req = ResponsesRequest( + model="test", + input="follow up", + store=False, + previous_response=prev_resp, + ) + + result = await OpenAIServingResponses.create_responses(serving, req) + + assert isinstance(result, ErrorResponse) + assert result.error.code == HTTPStatus.BAD_REQUEST + assert "integrity" in result.error.message.lower() + + class TestPrevMessagesOverride: def test_construct_input_messages_prepends_prev_msg(self): """construct_input_messages correctly prepends a deserialized history diff --git a/tests/entrypoints/openai/responses/test_store.py b/tests/entrypoints/openai/responses/test_store.py index c844351aad12..e6bebc0916c3 100644 --- a/tests/entrypoints/openai/responses/test_store.py +++ b/tests/entrypoints/openai/responses/test_store.py @@ -181,6 +181,7 @@ def _make_minimal_serving(enable_store: bool = True): obj.enable_store = enable_store obj.store = InMemoryResponseStore() obj.background_tasks = {} + obj.event_store = {} obj.log_error_stack = False return obj @@ -255,3 +256,43 @@ async def test_retrieve_unknown_id_returns_404(self): assert isinstance(result, ErrorResponse) assert result.error.code == HTTPStatus.NOT_FOUND + + @pytest.mark.asyncio + async def test_stream_retrieve_completed_without_event_buffer_returns_response( + self, + ): + """Streaming retrieval for a terminal response that has no local + event buffer (e.g. processed on another replica) should fall back + to returning the full response instead of raising. + """ + from vllm.entrypoints.openai.responses.protocol import ResponsesResponse + + serving = _make_minimal_serving() + resp = _make_response("resp_remote", status="completed") + await serving.store.put_response("resp_remote", resp) + + # No entry in event_store — simulates cross-replica scenario. + result = await serving.retrieve_responses("resp_remote", None, True) + + assert isinstance(result, ResponsesResponse) + assert result is resp + + @pytest.mark.asyncio + async def test_stream_retrieve_in_progress_without_event_buffer_returns_400( + self, + ): + """Streaming retrieval for an in-progress response with no local + event buffer should return a 400 error directing the client to use + non-streaming retrieval. + """ + from vllm.entrypoints.openai.engine.protocol import ErrorResponse + + serving = _make_minimal_serving() + resp = _make_response("resp_other", status="in_progress") + await serving.store.put_response("resp_other", resp) + + result = await serving.retrieve_responses("resp_other", None, True) + + assert isinstance(result, ErrorResponse) + assert result.error.code == HTTPStatus.BAD_REQUEST + assert "non-streaming" in result.error.message.lower() diff --git a/vllm/entrypoints/openai/responses/serving.py b/vllm/entrypoints/openai/responses/serving.py index aa7ea8b702f6..d65f4d9be74d 100644 --- a/vllm/entrypoints/openai/responses/serving.py +++ b/vllm/entrypoints/openai/responses/serving.py @@ -393,7 +393,20 @@ async def create_responses( # This replaces the store message lookup for subsequent turns. prev_messages_from_state: list | None = None if prev_response is not None and request.previous_response is not None: - prev_messages_from_state = self._extract_state_from_response(prev_response) + try: + prev_messages_from_state = self._extract_state_from_response( + prev_response + ) + except ValueError as e: + return self.create_error_response( + err_type="invalid_request_error", + message=( + "The state carrier in 'previous_response' failed " + f"integrity validation: {e}. The response may have " + "been tampered with or signed with a different key." + ), + status_code=HTTPStatus.BAD_REQUEST, + ) if prev_messages_from_state is None: # The caller passed previous_response but the prior response # contains no vLLM state carrier — most likely because @@ -663,7 +676,9 @@ async def _make_request( request_instructions=request.instructions, request_input=request.input, prev_msg=prev_msg, - prev_response_output=prev_response.output if prev_response else None, + prev_response_output=prev_response.output + if prev_response and prev_messages is None + else None, ) _, engine_prompts = await self.openai_serving_render.preprocess_chat( @@ -1430,6 +1445,21 @@ async def retrieve_responses( return self._make_not_found_error(response_id) if stream: + if response_id not in self.event_store: + if response.status not in ("queued", "in_progress"): + # Terminal state on another replica — return full response. + return response + return self.create_error_response( + err_type="invalid_request_error", + message=( + f"Streaming retrieval is not available for response " + f"'{response_id}': no event buffer exists on this " + f"server instance. The response may have been " + f"processed on a different replica. Use non-streaming " + f"retrieval instead." + ), + status_code=HTTPStatus.BAD_REQUEST, + ) return self.responses_background_stream_generator( response_id, starting_after, @@ -1486,6 +1516,11 @@ async def cancel_responses( ) # Abort the request. + # NOTE: With a shared ResponseStore across replicas, the CAS above + # marks the response as "cancelled" in the store, but the actual + # background task may be running on a different replica. We can only + # cancel process-local tasks here; the remote replica will discard its + # result at finalization via put_response(..., unless_status="cancelled"). if task := self.background_tasks.get(response_id): task.cancel() try: