diff --git a/tests/entrypoints/openai/responses/test_serving_stateless.py b/tests/entrypoints/openai/responses/test_serving_stateless.py new file mode 100644 index 000000000000..09e8d3c4b690 --- /dev/null +++ b/tests/entrypoints/openai/responses/test_serving_stateless.py @@ -0,0 +1,615 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright contributors to the vLLM project + +""" +Unit tests for stateless multi-turn Responses API (RFC #26934 + @grs). + +Covers: +- Protocol-level validation (pydantic model constraints on ResponsesRequest) +- State carrier injection helpers (_build_state_carrier / _extract_state_from_response) +- Error paths when enable_store=False (retrieve, cancel, previous_response_id) +- Error path when previous_response lacks a state carrier (no include= on prior turn) +- background=True rejected when previous_response is set +- utils.py skipping of state-carrier items +- construct_input_messages contract (prev_msg prepended before new turn) +""" + +from http import HTTPStatus +from unittest.mock import MagicMock + +import pytest + +# --------------------------------------------------------------------------- +# Signing key fixture — deterministic per test run +# --------------------------------------------------------------------------- + + +@pytest.fixture(autouse=True) +def set_signing_key(monkeypatch): + monkeypatch.setenv("VLLM_RESPONSES_STATE_SIGNING_KEY", "cc" * 32) + import vllm.entrypoints.openai.responses.state as state_mod + + state_mod._SIGNING_KEY = None + yield + state_mod._SIGNING_KEY = None + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _make_minimal_serving(enable_store: bool = False): + """Return an OpenAIServingResponses instance with only the attributes + exercised by the methods under test. We bypass __init__ via __new__ and + set exactly what is needed. 
+ + Attributes touched by retrieve_responses / cancel_responses: + enable_store, store, background_tasks, + log_error_stack (for create_error_response from base class) + Attributes touched by create_responses (up to store check): + models (for _check_model), engine_client (for errored check) + """ + from vllm.entrypoints.openai.responses.serving import OpenAIServingResponses + from vllm.entrypoints.openai.responses.store import InMemoryResponseStore + + obj = OpenAIServingResponses.__new__(OpenAIServingResponses) + obj.enable_store = enable_store + obj.store = InMemoryResponseStore() + obj.background_tasks = {} + obj.log_error_stack = False + # _check_model needs models.is_base_model to return True (model found) + obj.models = MagicMock() + obj.models.is_base_model.return_value = True + # _validate_create_responses_input checks this before hitting the store guard + obj.use_harmony = False + return obj + + +# --------------------------------------------------------------------------- +# Protocol validation — pydantic model constraints +# --------------------------------------------------------------------------- + + +class TestProtocolValidation: + def test_previous_response_and_id_mutually_exclusive(self): + from vllm.entrypoints.openai.responses.protocol import ( + ResponsesRequest, + ResponsesResponse, + ) + + # model_construct bypasses field validation so we get a typed instance + # without needing all required fields — Pydantic accepts it for type checks. 
+ fake_prev = ResponsesResponse.model_construct(id="resp_fake") + with pytest.raises(Exception, match="Cannot set both"): + ResponsesRequest( + model="test", + input="hello", + previous_response_id="resp_abc", + previous_response=fake_prev, + ) + + def test_previous_response_silently_clears_store(self): + from vllm.entrypoints.openai.responses.protocol import ( + ResponsesRequest, + ResponsesResponse, + ) + + fake_prev = ResponsesResponse.model_construct(id="resp_fake") + req = ResponsesRequest( + model="test", + input="hello", + store=True, + previous_response=fake_prev, + ) + assert req.store is False + + def test_background_with_previous_response_raises(self): + """background=True + previous_response must be rejected. + + The mode='before' validator allows background=True when store=True + (the default). Our mode='after' validator must then catch that + previous_response was also set and raise — otherwise the request + would produce an unretrievable background response (store gets + silently cleared to False, but background remains True). 
+ """ + from vllm.entrypoints.openai.responses.protocol import ( + ResponsesRequest, + ResponsesResponse, + ) + + fake_prev = ResponsesResponse.model_construct(id="resp_fake") + with pytest.raises(Exception, match="background"): + ResponsesRequest( + model="test", + input="hello", + background=True, + previous_response=fake_prev, + ) + + +# --------------------------------------------------------------------------- +# State carrier round-trip via serving helpers +# (thin wrappers over state.py — verified here to confirm the integration) +# --------------------------------------------------------------------------- + + +class TestStateCarrierHelpers: + def test_build_and_extract_roundtrip(self): + from openai.types.responses.response_reasoning_item import ResponseReasoningItem + + from vllm.entrypoints.openai.responses.serving import OpenAIServingResponses + + serving = _make_minimal_serving() + messages = [ + {"role": "user", "content": "hi"}, + {"role": "assistant", "content": "hello"}, + ] + + carrier = OpenAIServingResponses._build_state_carrier( + serving, messages, "test_req_id_12345" + ) + + assert isinstance(carrier, ResponseReasoningItem) + assert carrier.type == "reasoning" + assert carrier.status == "completed" + assert carrier.encrypted_content.startswith("vllm:1:") + + mock_response = MagicMock() + mock_response.output = [carrier] + + recovered = OpenAIServingResponses._extract_state_from_response( + serving, mock_response + ) + assert recovered == messages + + def test_extract_returns_none_when_no_carrier(self): + from openai.types.responses import ResponseOutputMessage, ResponseOutputText + + from vllm.entrypoints.openai.responses.serving import OpenAIServingResponses + + serving = _make_minimal_serving() + + text = ResponseOutputText(type="output_text", text="hi", annotations=[]) + msg = ResponseOutputMessage( + id="msg_1", + type="message", + role="assistant", + content=[text], + status="completed", + ) + mock_response = MagicMock() + 
mock_response.output = [msg] + + assert ( + OpenAIServingResponses._extract_state_from_response(serving, mock_response) + is None + ) + + def test_extract_raises_on_tampered_carrier(self): + from vllm.entrypoints.openai.responses.serving import OpenAIServingResponses + + serving = _make_minimal_serving() + carrier = OpenAIServingResponses._build_state_carrier( + serving, [{"role": "user", "content": "hi"}], "req" + ) + + parts = carrier.encrypted_content.split(":", 3) + parts[3] = "0" * 64 + carrier.encrypted_content = ":".join(parts) + + mock_response = MagicMock() + mock_response.output = [carrier] + + with pytest.raises(ValueError, match="HMAC verification failed"): + OpenAIServingResponses._extract_state_from_response(serving, mock_response) + + +# --------------------------------------------------------------------------- +# Error paths when enable_store=False +# --------------------------------------------------------------------------- + + +class TestStorelessErrorPaths: + @pytest.mark.asyncio + async def test_retrieve_without_store_returns_501(self): + from vllm.entrypoints.openai.engine.protocol import ErrorResponse + from vllm.entrypoints.openai.responses.serving import OpenAIServingResponses + + serving = _make_minimal_serving(enable_store=False) + result = await OpenAIServingResponses.retrieve_responses( + serving, "resp_123", None, False + ) + + assert isinstance(result, ErrorResponse) + assert result.error.code == HTTPStatus.NOT_IMPLEMENTED + assert "VLLM_ENABLE_RESPONSES_API_STORE" in result.error.message + + @pytest.mark.asyncio + async def test_cancel_without_store_unknown_id_returns_404(self): + """With store off and no matching background task, cancel returns 404.""" + from vllm.entrypoints.openai.engine.protocol import ErrorResponse + from vllm.entrypoints.openai.responses.serving import OpenAIServingResponses + + serving = _make_minimal_serving(enable_store=False) + result = await OpenAIServingResponses.cancel_responses(serving, "resp_123") + + 
assert isinstance(result, ErrorResponse) + assert result.error.code == HTTPStatus.NOT_FOUND + + @pytest.mark.asyncio + async def test_cancel_without_store_active_task_returns_400(self): + """With store off but a matching in-flight task, cancel cancels it and + returns 400 (not 404, not 501) — the task was cancelled but no stored + response object can be returned in stateless mode. + + This is the success branch of the stateless cancel path and was the + actual behavior added by the review fix. + """ + import asyncio + + from vllm.entrypoints.openai.engine.protocol import ErrorResponse + from vllm.entrypoints.openai.responses.serving import OpenAIServingResponses + + serving = _make_minimal_serving(enable_store=False) + + # Simulate an in-flight background task that blocks until cancelled. + async def _blocking(): + await asyncio.sleep(9999) + + task = asyncio.ensure_future(_blocking()) + # Yield once so the task starts and reaches its first await point; + # otherwise cancel() on an unstarted task may mark it cancelled before + # the coroutine body ever runs, but task.cancelled() is still True. + await asyncio.sleep(0) + + serving.background_tasks["resp_inflight"] = task + + result = await OpenAIServingResponses.cancel_responses(serving, "resp_inflight") + + assert isinstance(result, ErrorResponse) + assert result.error.code == HTTPStatus.BAD_REQUEST + assert "stateless mode" in result.error.message + assert task.cancelled(), "task was not cancelled" + + @pytest.mark.asyncio + async def test_previous_response_without_carrier_returns_400(self): + """previous_response with no state carrier + store disabled → 400. + + Regression test for P1 review finding: if a client passes + previous_response but the prior turn was generated without + include=['reasoning.encrypted_content'], _extract_state_from_response + returns None. With store disabled there is no msg_store fallback, so + we must return a clear 400 rather than letting msg_store[id] raise + KeyError → 500. 
+ """ + from openai.types.responses import ResponseOutputMessage, ResponseOutputText + + from vllm.entrypoints.openai.engine.protocol import ErrorResponse + from vllm.entrypoints.openai.responses.protocol import ( + ResponsesRequest, + ResponsesResponse, + ) + from vllm.entrypoints.openai.responses.serving import OpenAIServingResponses + + # Build a previous response whose output has NO state carrier item. + text = ResponseOutputText(type="output_text", text="Hello!", annotations=[]) + msg = ResponseOutputMessage( + id="msg_1", + type="message", + role="assistant", + content=[text], + status="completed", + ) + prev_resp = ResponsesResponse.model_construct(id="resp_old", output=[msg]) + + serving = _make_minimal_serving(enable_store=False) + serving.engine_client = MagicMock() + serving.engine_client.errored = False + + req = ResponsesRequest( + model="test", + input="What is my name?", + store=False, + previous_response=prev_resp, + ) + + result = await OpenAIServingResponses.create_responses(serving, req) + + assert isinstance(result, ErrorResponse) + assert result.error.code == HTTPStatus.BAD_REQUEST + assert "state carrier" in result.error.message + + @pytest.mark.asyncio + async def test_previous_response_without_carrier_store_enabled_returns_400(self): + """previous_response with no state carrier returns 400 even when store is on. + + Regression for Codex P1: the original guard was gated on + `not self.enable_store`, so with store=True and a carrierless + previous_response the code fell through to msg_store[prev_response.id] + which raised KeyError → 500. previous_response always means stateless + path; carrier absence is always a client error. 
+ """ + from openai.types.responses import ResponseOutputMessage, ResponseOutputText + + from vllm.entrypoints.openai.engine.protocol import ErrorResponse + from vllm.entrypoints.openai.responses.protocol import ( + ResponsesRequest, + ResponsesResponse, + ) + from vllm.entrypoints.openai.responses.serving import OpenAIServingResponses + + text = ResponseOutputText(type="output_text", text="Hello!", annotations=[]) + msg = ResponseOutputMessage( + id="msg_1", + type="message", + role="assistant", + content=[text], + status="completed", + ) + prev_resp = ResponsesResponse.model_construct(id="resp_old", output=[msg]) + + # store is ENABLED — this is the case the original guard missed + serving = _make_minimal_serving(enable_store=True) + serving.engine_client = MagicMock() + serving.engine_client.errored = False + + req = ResponsesRequest( + model="test", + input="What is my name?", + store=False, + previous_response=prev_resp, + ) + + result = await OpenAIServingResponses.create_responses(serving, req) + + assert isinstance(result, ErrorResponse) + assert result.error.code == HTTPStatus.BAD_REQUEST + assert "state carrier" in result.error.message + + @pytest.mark.asyncio + async def test_previous_response_id_without_store_returns_400(self): + """create_responses should reject previous_response_id when store is off.""" + from vllm.entrypoints.openai.engine.protocol import ErrorResponse + from vllm.entrypoints.openai.responses.protocol import ResponsesRequest + from vllm.entrypoints.openai.responses.serving import OpenAIServingResponses + + serving = _make_minimal_serving(enable_store=False) + # Minimal extra attributes that create_responses reads before the store check + serving.engine_client = MagicMock() + serving.engine_client.errored = False + + req = ResponsesRequest( + model="test", + input="hello", + previous_response_id="resp_old", + store=False, + ) + + result = await OpenAIServingResponses.create_responses(serving, req) + + assert isinstance(result, 
ErrorResponse) + assert result.error.code == HTTPStatus.BAD_REQUEST + assert "VLLM_ENABLE_RESPONSES_API_STORE" in result.error.message + + +# --------------------------------------------------------------------------- +# utils.py — state-carrier items are transparently skipped +# --------------------------------------------------------------------------- + + +class TestUtilsStateCarrierSkipping: + def test_construct_chat_messages_skips_state_carrier(self): + from openai.types.responses import ResponseOutputMessage, ResponseOutputText + from openai.types.responses.response_reasoning_item import ResponseReasoningItem + + from vllm.entrypoints.openai.responses.state import serialize_state + from vllm.entrypoints.openai.responses.utils import ( + construct_chat_messages_with_tool_call, + ) + + blob = serialize_state([{"role": "user", "content": "prev"}]) + carrier = ResponseReasoningItem( + id="rs_state_abc", + type="reasoning", + summary=[], + status="completed", + encrypted_content=blob, + ) + text = ResponseOutputText(type="output_text", text="Hello!", annotations=[]) + msg = ResponseOutputMessage( + id="msg_1", + type="message", + role="assistant", + content=[text], + status="completed", + ) + + result = construct_chat_messages_with_tool_call([carrier, msg]) + + assert len(result) == 1 + assert result[0]["role"] == "assistant" + assert result[0]["content"] == "Hello!" 
+ + def test_single_message_from_state_carrier_is_none(self): + from openai.types.responses.response_reasoning_item import ResponseReasoningItem + + from vllm.entrypoints.openai.responses.state import serialize_state + from vllm.entrypoints.openai.responses.utils import ( + _construct_single_message_from_response_item, + ) + + blob = serialize_state([{"role": "user", "content": "hi"}]) + carrier = ResponseReasoningItem( + id="rs_state_xyz", + type="reasoning", + summary=[], + status="completed", + encrypted_content=blob, + ) + assert _construct_single_message_from_response_item(carrier) is None + + def test_external_encrypted_content_still_raises(self): + from openai.types.responses.response_reasoning_item import ResponseReasoningItem + + from vllm.entrypoints.openai.responses.utils import ( + _construct_single_message_from_response_item, + ) + + external = ResponseReasoningItem( + id="rs_ext", + type="reasoning", + summary=[], + status="completed", + encrypted_content="opaque-blob-not-from-vllm", + ) + with pytest.raises(ValueError, match="not supported"): + _construct_single_message_from_response_item(external) + + +# --------------------------------------------------------------------------- +# construct_input_messages with prev_messages override +# --------------------------------------------------------------------------- + + +# --------------------------------------------------------------------------- +# Fix 1: Double-include prevention on stateless non-Harmony path +# --------------------------------------------------------------------------- + + +class TestDoubleIncludePrevention: + def test_stateless_path_does_not_duplicate_assistant_message(self): + """When prev_messages is provided (stateless path), the assistant + output from prev_response.output must NOT be appended again. + + The state carrier already contains the full conversation history + including assistant messages. Passing prev_response_output to + construct_input_messages would duplicate them. 
+ """ + from vllm.entrypoints.openai.responses.utils import construct_input_messages + + assistant_text = "I am a helpful assistant." + prev_messages = [ + {"role": "user", "content": "Who are you?"}, + {"role": "assistant", "content": assistant_text}, + ] + + # Simulate the stateless path: prev_messages is not None. + # prev_response_output should be None (the fix). + result = construct_input_messages( + request_instructions=None, + request_input="What did you say?", + prev_msg=prev_messages, + prev_response_output=None, + ) + + assistant_msgs = [m for m in result if m.get("role") == "assistant"] + assert len(assistant_msgs) == 1, ( + f"Expected exactly 1 assistant message, got {len(assistant_msgs)}: " + f"{assistant_msgs}" + ) + assert assistant_msgs[0]["content"] == assistant_text + + def test_store_path_still_includes_assistant_output(self): + """When prev_messages is None (store-backed path), prev_response_output + must still be used to add the assistant turn. + """ + from openai.types.responses import ResponseOutputMessage, ResponseOutputText + + from vllm.entrypoints.openai.responses.utils import construct_input_messages + + text = ResponseOutputText(type="output_text", text="Hello!", annotations=[]) + msg = ResponseOutputMessage( + id="msg_1", + type="message", + role="assistant", + content=[text], + status="completed", + ) + + result = construct_input_messages( + request_instructions=None, + request_input="What did you say?", + prev_msg=None, + prev_response_output=[msg], + ) + + assistant_msgs = [m for m in result if m.get("role") == "assistant"] + assert len(assistant_msgs) == 1 + assert assistant_msgs[0]["content"] == "Hello!" 
+ + +# --------------------------------------------------------------------------- +# Fix 2: Tampered state carrier returns 400, not 500 +# --------------------------------------------------------------------------- + + +class TestTamperedCarrierReturns400: + @pytest.mark.asyncio + async def test_tampered_hmac_returns_400(self): + """A previous_response with a tampered HMAC in its state carrier + must return a 400 error, not raise an uncaught ValueError → 500. + """ + from vllm.entrypoints.openai.engine.protocol import ErrorResponse + from vllm.entrypoints.openai.responses.protocol import ( + ResponsesRequest, + ResponsesResponse, + ) + from vllm.entrypoints.openai.responses.serving import OpenAIServingResponses + + serving = _make_minimal_serving(enable_store=False) + serving.engine_client = MagicMock() + serving.engine_client.errored = False + + # Build a valid carrier then corrupt the HMAC. + carrier = OpenAIServingResponses._build_state_carrier( + serving, + [{"role": "user", "content": "hi"}], + "req_123", + ) + parts = carrier.encrypted_content.split(":", 3) + parts[3] = "0" * 64 # Replace HMAC with garbage + carrier.encrypted_content = ":".join(parts) + + prev_resp = ResponsesResponse.model_construct( + id="resp_tampered", output=[carrier] + ) + + req = ResponsesRequest( + model="test", + input="follow up", + store=False, + previous_response=prev_resp, + ) + + result = await OpenAIServingResponses.create_responses(serving, req) + + assert isinstance(result, ErrorResponse) + assert result.error.code == HTTPStatus.BAD_REQUEST + assert "integrity" in result.error.message.lower() + + +class TestPrevMessagesOverride: + def test_construct_input_messages_prepends_prev_msg(self): + """construct_input_messages correctly prepends a deserialized history + list (prev_msg) before the new user turn. 
+ + This is the contract the stateless path relies on: after extracting + message history from the encrypted_content carrier, the history is + passed as prev_msg and must appear in order before the new input. + """ + from vllm.entrypoints.openai.responses.utils import construct_input_messages + + prev = [ + {"role": "user", "content": "Who are you?"}, + {"role": "assistant", "content": "I am helpful."}, + ] + result = construct_input_messages( + request_instructions=None, + request_input="What did you say?", + prev_msg=prev, + prev_response_output=None, + ) + assert result[0] == {"role": "user", "content": "Who are you?"} + assert result[1] == {"role": "assistant", "content": "I am helpful."} + assert result[2] == {"role": "user", "content": "What did you say?"} diff --git a/tests/entrypoints/openai/responses/test_state.py b/tests/entrypoints/openai/responses/test_state.py new file mode 100644 index 000000000000..c23ded58fff9 --- /dev/null +++ b/tests/entrypoints/openai/responses/test_state.py @@ -0,0 +1,252 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright contributors to the vLLM project + +"""Unit tests for the stateless Responses API state carrier (state.py).""" + +import pytest + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _reset_signing_key(): + """Force state.py to re-derive the signing key on next call.""" + import vllm.entrypoints.openai.responses.state as state_mod + + state_mod._SIGNING_KEY = None + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + + +@pytest.fixture(autouse=True) +def isolated_key(monkeypatch): + """Each test gets a fresh, deterministic signing key.""" + monkeypatch.setenv( + "VLLM_RESPONSES_STATE_SIGNING_KEY", + "ab" * 32, # 64 hex chars = 32 bytes + ) + 
_reset_signing_key() + yield + _reset_signing_key() + + +# --------------------------------------------------------------------------- +# Import after env is patched (in case module was already imported) +# --------------------------------------------------------------------------- + + +@pytest.fixture() +def state(): + import vllm.entrypoints.openai.responses.state as m + + return m + + +# --------------------------------------------------------------------------- +# Round-trip tests +# --------------------------------------------------------------------------- + + +def test_roundtrip_plain_dicts(state): + messages = [ + {"role": "user", "content": "Hello"}, + {"role": "assistant", "content": "Hi there!"}, + ] + blob = state.serialize_state(messages) + recovered = state.deserialize_state(blob) + assert recovered == messages + + +def test_roundtrip_empty_list(state): + blob = state.serialize_state([]) + recovered = state.deserialize_state(blob) + assert recovered == [] + + +def test_roundtrip_nested_structure(state): + messages = [ + { + "role": "user", + "content": [{"type": "text", "text": "What is 2+2?"}], + }, + {"role": "assistant", "content": "4", "extra": {"key": [1, 2, 3]}}, + ] + blob = state.serialize_state(messages) + recovered = state.deserialize_state(blob) + assert recovered == messages + + +def test_roundtrip_pydantic_model(state): + """Objects with model_dump() should serialize transparently.""" + + class FakeModel: + def model_dump(self): + return {"author": {"role": "user"}, "content": "hi"} + + messages = [FakeModel()] + blob = state.serialize_state(messages) + recovered = state.deserialize_state(blob) + # After JSON round-trip, FakeModel becomes a plain dict + assert recovered == [{"author": {"role": "user"}, "content": "hi"}] + + +# --------------------------------------------------------------------------- +# is_state_carrier +# --------------------------------------------------------------------------- + + +def 
test_is_state_carrier_true(state): + blob = state.serialize_state([{"role": "user", "content": "hi"}]) + + class FakeItem: + encrypted_content = blob + + assert state.is_state_carrier(FakeItem()) + + +def test_is_state_carrier_false_external(state): + """Real encrypted_content from external models should not be detected.""" + + class FakeItem: + encrypted_content = "some-opaque-blob-from-openai" + + assert not state.is_state_carrier(FakeItem()) + + +def test_is_state_carrier_false_no_field(state): + class FakeItem: + pass + + assert not state.is_state_carrier(FakeItem()) + + +def test_is_state_carrier_false_none(state): + class FakeItem: + encrypted_content = None + + assert not state.is_state_carrier(FakeItem()) + + +# --------------------------------------------------------------------------- +# deserialize_state — non-carrier inputs +# --------------------------------------------------------------------------- + + +def test_deserialize_returns_none_for_non_carrier(state): + assert state.deserialize_state("some-random-opaque-string") is None + + +def test_deserialize_returns_none_for_empty_string(state): + assert state.deserialize_state("") is None + + +# --------------------------------------------------------------------------- +# HMAC tamper detection +# --------------------------------------------------------------------------- + + +def test_tampered_payload_raises(state): + blob = state.serialize_state([{"role": "user", "content": "original"}]) + # Corrupt the payload part (index 2 when split on ':') + parts = blob.split(":", 3) + assert len(parts) == 4 + parts[2] = parts[2][:-4] + "XXXX" # corrupt the base64 payload + tampered = ":".join(parts) + with pytest.raises(ValueError, match="HMAC verification failed"): + state.deserialize_state(tampered) + + +def test_tampered_sig_raises(state): + blob = state.serialize_state([{"role": "user", "content": "hello"}]) + parts = blob.split(":", 3) + parts[3] = "0" * 64 # replace HMAC with zeros + tampered = 
":".join(parts) + with pytest.raises(ValueError, match="HMAC verification failed"): + state.deserialize_state(tampered) + + +def test_malformed_carrier_raises(state): + malformed = "vllm:1:onlythreeparts" + with pytest.raises(ValueError, match="Malformed vLLM state carrier"): + state.deserialize_state(malformed) + + +# --------------------------------------------------------------------------- +# Cross-key incompatibility +# --------------------------------------------------------------------------- + + +def test_different_keys_are_incompatible(monkeypatch): + """A blob signed with key A must not validate with key B.""" + import vllm.entrypoints.openai.responses.state as state_mod + + monkeypatch.setenv("VLLM_RESPONSES_STATE_SIGNING_KEY", "aa" * 32) + state_mod._SIGNING_KEY = None + blob = state_mod.serialize_state([{"role": "user", "content": "secret"}]) + + # Switch to a different key + monkeypatch.setenv("VLLM_RESPONSES_STATE_SIGNING_KEY", "bb" * 32) + state_mod._SIGNING_KEY = None + + with pytest.raises(ValueError, match="HMAC verification failed"): + state_mod.deserialize_state(blob) + + +# --------------------------------------------------------------------------- +# Random-key warning (no env var) +# --------------------------------------------------------------------------- + + +def test_no_env_var_generates_random_key(monkeypatch): + """Without the env var, a random 32-byte key is generated. + + The warning is emitted via vLLM's logger (visible in test output) but is + not capturable via capsys/caplog since vLLM writes to sys.__stdout__ directly. 
+    """
+    import vllm.entrypoints.openai.responses.state as state_mod
+
+    monkeypatch.delenv("VLLM_RESPONSES_STATE_SIGNING_KEY", raising=False)
+    state_mod._SIGNING_KEY = None
+
+    key = state_mod._get_signing_key()
+
+    assert key is not None
+    assert len(key) == 32
+    # A second call returns the same cached key (warning only fires once)
+    key2 = state_mod._get_signing_key()
+    assert key == key2
+
+
+# ---------------------------------------------------------------------------
+# Invalid hex key
+# ---------------------------------------------------------------------------
+
+
+def test_invalid_hex_key_raises(monkeypatch):
+    import vllm.entrypoints.openai.responses.state as state_mod
+
+    monkeypatch.setenv("VLLM_RESPONSES_STATE_SIGNING_KEY", "not-valid-hex!!")
+    state_mod._SIGNING_KEY = None
+
+    with pytest.raises(ValueError, match="valid hex string"):
+        state_mod._get_signing_key()
+
+
+def test_short_key_raises(monkeypatch):
+    """A key shorter than 32 bytes (64 hex chars) must be rejected.
+
+    Short HMAC keys weaken tamper protection; enforce minimum length so
+    misconfigured deployments fail loudly rather than silently degrading
+    security. '"aa" * 4' decodes to 4 bytes — well below the 32-byte minimum.
+    """
+    import vllm.entrypoints.openai.responses.state as state_mod
+
+    monkeypatch.setenv("VLLM_RESPONSES_STATE_SIGNING_KEY", "aa" * 4)  # 4 bytes
+    state_mod._SIGNING_KEY = None
+
+    with pytest.raises(ValueError, match="minimum of 32 bytes"):
+        state_mod._get_signing_key()
diff --git a/tests/entrypoints/openai/responses/test_store.py b/tests/entrypoints/openai/responses/test_store.py
new file mode 100644
index 000000000000..e6bebc0916c3
--- /dev/null
+++ b/tests/entrypoints/openai/responses/test_store.py
@@ -0,0 +1,298 @@
+# SPDX-License-Identifier: Apache-2.0
+# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
+
+"""Unit tests for the pluggable ResponseStore abstraction.
+ +Tests focus on the business-logic methods (unless_status guard, +update_response_status transitions) and the factory's env-var loading. +Integration tests verify that serving.py actually uses the store correctly +through its public methods. +""" + +from http import HTTPStatus + +import pytest + +from vllm.entrypoints.openai.responses.store import ( + InMemoryResponseStore, + ResponseStore, + create_response_store, +) + + +def _make_response(response_id: str = "resp_1", status: str = "completed"): + """Create a minimal ResponsesResponse via model_construct.""" + from vllm.entrypoints.openai.responses.protocol import ResponsesResponse + + return ResponsesResponse.model_construct(id=response_id, status=status) + + +# --------------------------------------------------------------------------- +# put_response unless_status guard — the cancelled-response protection +# --------------------------------------------------------------------------- + + +class TestPutResponseUnlessStatus: + """Tests for the unless_status guard on put_response. + + In production this prevents a completed response from overwriting + a cancelled one (serving.py finalize path). 
+ """ + + @pytest.mark.asyncio + async def test_skips_write_when_existing_status_matches(self): + store = InMemoryResponseStore() + await store.put_response("r1", _make_response("r1", "cancelled")) + + new = _make_response("r1", "completed") + ok = await store.put_response("r1", new, unless_status="cancelled") + + assert ok is False + assert (await store.get_response("r1")).status == "cancelled" + + @pytest.mark.asyncio + async def test_allows_write_when_existing_status_differs(self): + store = InMemoryResponseStore() + await store.put_response("r1", _make_response("r1", "queued")) + + new = _make_response("r1", "completed") + ok = await store.put_response("r1", new, unless_status="cancelled") + + assert ok is True + assert (await store.get_response("r1")).status == "completed" + + @pytest.mark.asyncio + async def test_allows_write_when_nothing_stored_yet(self): + store = InMemoryResponseStore() + new = _make_response("r1", "completed") + ok = await store.put_response("r1", new, unless_status="cancelled") + + assert ok is True + assert await store.get_response("r1") is new + + +# --------------------------------------------------------------------------- +# update_response_status — atomic status transitions +# --------------------------------------------------------------------------- + + +class TestUpdateResponseStatus: + """Tests for the atomic status transition method. 
+ + In production this is used by: + - _run_background_request: queued/in_progress -> failed + - cancel_responses: queued/in_progress -> cancelled + """ + + @pytest.mark.asyncio + async def test_transitions_when_current_status_is_allowed(self): + """The actual production path: status IS in allowed set.""" + store = InMemoryResponseStore() + await store.put_response("r1", _make_response("r1", "queued")) + + result = await store.update_response_status( + "r1", "failed", allowed_current_statuses={"queued", "in_progress"} + ) + + assert result is not None + assert result.status == "failed" + # Verify the stored object was mutated (not a copy) + assert (await store.get_response("r1")).status == "failed" + + @pytest.mark.asyncio + async def test_rejects_when_current_status_not_in_allowed_set(self): + """e.g. trying to fail a completed response — should be a no-op.""" + store = InMemoryResponseStore() + await store.put_response("r1", _make_response("r1", "completed")) + + result = await store.update_response_status( + "r1", "failed", allowed_current_statuses={"queued", "in_progress"} + ) + + assert result is None + assert (await store.get_response("r1")).status == "completed" + + @pytest.mark.asyncio + async def test_returns_none_when_not_found(self): + store = InMemoryResponseStore() + result = await store.update_response_status("ghost", "failed") + assert result is None + + @pytest.mark.asyncio + async def test_unconditional_update_when_no_allowed_set(self): + """Without allowed_current_statuses, any transition succeeds.""" + store = InMemoryResponseStore() + await store.put_response("r1", _make_response("r1", "completed")) + + result = await store.update_response_status("r1", "cancelled") + + assert result is not None + assert result.status == "cancelled" + + +# --------------------------------------------------------------------------- +# Factory — env var loading +# --------------------------------------------------------------------------- + + +class TestFactory: + 
def test_default_returns_in_memory(self, monkeypatch): + monkeypatch.delenv("VLLM_RESPONSES_STORE_BACKEND", raising=False) + import vllm.envs as envs_mod + + envs_mod._ENVS_INITIALIZED = False + store = create_response_store() + assert isinstance(store, InMemoryResponseStore) + + def test_custom_backend_loaded_via_qualname(self, monkeypatch): + monkeypatch.setenv( + "VLLM_RESPONSES_STORE_BACKEND", + "vllm.entrypoints.openai.responses.store.InMemoryResponseStore", + ) + import vllm.envs as envs_mod + + envs_mod._ENVS_INITIALIZED = False + store = create_response_store() + assert isinstance(store, InMemoryResponseStore) + + def test_non_subclass_raises_type_error(self, monkeypatch): + monkeypatch.setenv("VLLM_RESPONSES_STORE_BACKEND", "builtins.int") + import vllm.envs as envs_mod + + envs_mod._ENVS_INITIALIZED = False + with pytest.raises(TypeError, match="not a ResponseStore subclass"): + create_response_store() + + def test_abc_cannot_be_instantiated(self): + with pytest.raises(TypeError): + ResponseStore() # type: ignore[abstract] + + +# --------------------------------------------------------------------------- +# Integration: serving.py cancel_responses uses the store correctly +# --------------------------------------------------------------------------- + + +def _make_minimal_serving(enable_store: bool = True): + """Minimal OpenAIServingResponses with a real InMemoryResponseStore.""" + from vllm.entrypoints.openai.responses.serving import OpenAIServingResponses + + obj = OpenAIServingResponses.__new__(OpenAIServingResponses) + obj.enable_store = enable_store + obj.store = InMemoryResponseStore() + obj.background_tasks = {} + obj.event_store = {} + obj.log_error_stack = False + return obj + + +class TestServingCancelIntegration: + """Verify cancel_responses interacts with the store correctly. + + These tests exercise the refactored cancel_responses code that replaced + the old get-check-mutate-under-lock pattern with + store.update_response_status. 
+ """ + + @pytest.mark.asyncio + async def test_cancel_queued_response_succeeds(self): + from vllm.entrypoints.openai.responses.protocol import ResponsesResponse + + serving = _make_minimal_serving() + resp = _make_response("resp_bg", status="queued") + await serving.store.put_response("resp_bg", resp) + + result = await serving.cancel_responses("resp_bg") + + assert isinstance(result, ResponsesResponse) + assert result.status == "cancelled" + + @pytest.mark.asyncio + async def test_cancel_completed_response_returns_error(self): + from vllm.entrypoints.openai.engine.protocol import ErrorResponse + + serving = _make_minimal_serving() + resp = _make_response("resp_done", status="completed") + await serving.store.put_response("resp_done", resp) + + result = await serving.cancel_responses("resp_done") + + assert isinstance(result, ErrorResponse) + assert "Cannot cancel" in result.error.message + + @pytest.mark.asyncio + async def test_cancel_unknown_id_returns_404(self): + from vllm.entrypoints.openai.engine.protocol import ErrorResponse + + serving = _make_minimal_serving() + result = await serving.cancel_responses("resp_ghost") + + assert isinstance(result, ErrorResponse) + assert result.error.code == HTTPStatus.NOT_FOUND + + +class TestServingRetrieveIntegration: + """Verify retrieve_responses reads from the store correctly.""" + + @pytest.mark.asyncio + async def test_retrieve_stored_response(self): + from vllm.entrypoints.openai.responses.protocol import ResponsesResponse + + serving = _make_minimal_serving() + resp = _make_response("resp_1", status="completed") + await serving.store.put_response("resp_1", resp) + + result = await serving.retrieve_responses("resp_1", None, False) + + assert isinstance(result, ResponsesResponse) + assert result is resp + + @pytest.mark.asyncio + async def test_retrieve_unknown_id_returns_404(self): + from vllm.entrypoints.openai.engine.protocol import ErrorResponse + + serving = _make_minimal_serving() + result = await 
serving.retrieve_responses("resp_ghost", None, False) + + assert isinstance(result, ErrorResponse) + assert result.error.code == HTTPStatus.NOT_FOUND + + @pytest.mark.asyncio + async def test_stream_retrieve_completed_without_event_buffer_returns_response( + self, + ): + """Streaming retrieval for a terminal response that has no local + event buffer (e.g. processed on another replica) should fall back + to returning the full response instead of raising. + """ + from vllm.entrypoints.openai.responses.protocol import ResponsesResponse + + serving = _make_minimal_serving() + resp = _make_response("resp_remote", status="completed") + await serving.store.put_response("resp_remote", resp) + + # No entry in event_store — simulates cross-replica scenario. + result = await serving.retrieve_responses("resp_remote", None, True) + + assert isinstance(result, ResponsesResponse) + assert result is resp + + @pytest.mark.asyncio + async def test_stream_retrieve_in_progress_without_event_buffer_returns_400( + self, + ): + """Streaming retrieval for an in-progress response with no local + event buffer should return a 400 error directing the client to use + non-streaming retrieval. 
+ """ + from vllm.entrypoints.openai.engine.protocol import ErrorResponse + + serving = _make_minimal_serving() + resp = _make_response("resp_other", status="in_progress") + await serving.store.put_response("resp_other", resp) + + result = await serving.retrieve_responses("resp_other", None, True) + + assert isinstance(result, ErrorResponse) + assert result.error.code == HTTPStatus.BAD_REQUEST + assert "non-streaming" in result.error.message.lower() diff --git a/vllm/entrypoints/openai/engine/serving.py b/vllm/entrypoints/openai/engine/serving.py index 405db1a134c1..9d5ccd118581 100644 --- a/vllm/entrypoints/openai/engine/serving.py +++ b/vllm/entrypoints/openai/engine/serving.py @@ -593,6 +593,16 @@ def _convert_generation_error_to_streaming_response( status_code=e.status_code, ) + def _convert_generation_error_to_response( + self, e: GenerationError + ) -> ErrorResponse: + """Convert GenerationError to error response.""" + return self.create_error_response( + str(e), + err_type="InternalServerError", + status_code=e.status_code, + ) + async def _check_model( self, request: AnyRequest, diff --git a/vllm/entrypoints/openai/responses/protocol.py b/vllm/entrypoints/openai/responses/protocol.py index 43fbba1dd43f..d6e0389f9a02 100644 --- a/vllm/entrypoints/openai/responses/protocol.py +++ b/vllm/entrypoints/openai/responses/protocol.py @@ -236,6 +236,13 @@ class ResponsesRequest(OpenAIBaseModel): # this cannot be used in conjunction with previous_response_id # TODO: consider supporting non harmony messages as well previous_input_messages: list[OpenAIHarmonyMessage | dict] | None = None + # Accept full previous response for stateless multi-turn (RFC #26934 + @grs). + # The client stores the full response object and passes it back on the next + # turn instead of a previous_response_id. vLLM extracts the Harmony message + # history from the encrypted_content state carrier embedded in the response + # output, so no server-side store is required. 
+ # Cannot be set together with previous_response_id. + previous_response: "ResponsesResponse | None" = None structured_outputs: StructuredOutputsParams | None = Field( default=None, description="Additional kwargs for structured outputs", @@ -390,6 +397,33 @@ def is_include_output_logprobs(self) -> bool: and "message.output_text.logprobs" in self.include ) + @model_validator(mode="after") + def validate_previous_response_xor_id(self) -> "ResponsesRequest": + if self.previous_response_id and self.previous_response is not None: + raise ValueError( + "Cannot set both 'previous_response_id' and 'previous_response'. " + "Use 'previous_response_id' for store-backed multi-turn, or " + "'previous_response' (with include=['reasoning.encrypted_content'] " + "and store=false) for stateless multi-turn." + ) + if self.previous_response is not None: + if self.background: + # background mode requires store=True to retrieve the response + # later, but the stateless path forces store=False. Raise + # explicitly rather than silently producing an unretrievable + # background response. + raise ValueError( + "'background' mode cannot be used with 'previous_response'. " + "Stateless multi-turn (previous_response + " + "include=['reasoning.encrypted_content']) does not support " + "background responses." + ) + if self.store: + # Stateless path: store is meaningless (and would cause + # confusion). Mirror the silent-disable in create_responses. + self.store = False + return self + @model_validator(mode="before") @classmethod def validate_background(cls, data): @@ -667,3 +701,8 @@ class ResponseInProgressEvent(OpenAIResponseInProgressEvent): | ResponseMcpCallInProgressEvent | ResponseMcpCallCompletedEvent ) + +# Resolve forward reference: ResponsesRequest.previous_response -> ResponsesResponse +# Both classes are defined in this module; model_rebuild() is needed because +# ResponsesResponse did not exist when ResponsesRequest was first evaluated. 
+ResponsesRequest.model_rebuild() diff --git a/vllm/entrypoints/openai/responses/serving.py b/vllm/entrypoints/openai/responses/serving.py index 71d1945ae466..7a87b6a338d9 100644 --- a/vllm/entrypoints/openai/responses/serving.py +++ b/vllm/entrypoints/openai/responses/serving.py @@ -11,6 +11,7 @@ from http import HTTPStatus from typing import Any, Final +import jinja2 from fastapi import Request from openai.types.responses import ( ResponseContentPartAddedEvent, @@ -44,7 +45,6 @@ from vllm.config.utils import replace from vllm.engine.protocol import EngineClient from vllm.entrypoints.chat_utils import ( - ChatCompletionMessageParam, ChatTemplateContentFormatOption, get_tool_call_id_type, ) @@ -245,16 +245,12 @@ def __init__( self.tool_call_id_type = get_tool_call_id_type(self.model_config) self.enable_auto_tools = enable_auto_tools - # HACK(woosuk): This is a hack. We should use a better store. - # FIXME: If enable_store=True, this may cause a memory leak since we - # never remove responses from the store. - self.response_store: dict[str, ResponsesResponse] = {} - self.response_store_lock = asyncio.Lock() - # HACK(woosuk): This is a hack. We should use a better store. - # FIXME: If enable_store=True, this may cause a memory leak since we - # never remove messages from the store. - self.msg_store: dict[str, list[ChatCompletionMessageParam]] = {} + from vllm.entrypoints.openai.responses.store import ( + create_response_store, + ) + + self.store = create_response_store() # HACK(wuhang): This is a hack. We should use a better store. 
# FIXME: If enable_store=True, this may cause a memory leak since we @@ -321,6 +317,14 @@ def _validate_create_responses_input( status_code=HTTPStatus.BAD_REQUEST, param="previous_response_id", ) + if request.previous_input_messages and request.previous_response is not None: + return self.create_error_response( + err_type="invalid_request_error", + message="Only one of `previous_input_messages` and " + "`previous_response` can be set.", + status_code=HTTPStatus.BAD_REQUEST, + param="previous_response", + ) return None async def create_responses( @@ -355,25 +359,89 @@ async def create_responses( # value). request.store = False - # Handle the previous response ID. - prev_response_id = request.previous_response_id - if prev_response_id is not None: - async with self.response_store_lock: - prev_response = self.response_store.get(prev_response_id) + # Resolve the previous response (store-backed or stateless). + prev_response: ResponsesResponse | None = None + if request.previous_response_id is not None: + if not self.enable_store: + return self.create_error_response( + err_type="invalid_request_error", + message=( + "`previous_response_id` requires the response store to be " + "enabled (VLLM_ENABLE_RESPONSES_API_STORE=1). " + "For stateless multi-turn without a persistent store, pass " + "the full previous response object via `previous_response` " + "with `include=['reasoning.encrypted_content']` and " + "`store=false`." + ), + status_code=HTTPStatus.BAD_REQUEST, + ) + prev_response = await self.store.get_response(request.previous_response_id) if prev_response is None: - return self._make_not_found_error(prev_response_id) - else: - prev_response = None + return self._make_not_found_error(request.previous_response_id) + elif request.previous_response is not None: + prev_response = request.previous_response + + # For stateless multi-turn: extract Harmony/chat messages from the + # encrypted_content state carrier embedded in the previous response. 
+ # This replaces the store message lookup for subsequent turns. + prev_messages_from_state: list | None = None + if prev_response is not None and request.previous_response is not None: + try: + prev_messages_from_state = self._extract_state_from_response( + prev_response + ) + except ValueError as e: + return self.create_error_response( + err_type="invalid_request_error", + message=( + "The state carrier in 'previous_response' failed " + f"integrity validation: {e}. The response may have " + "been tampered with or signed with a different key." + ), + status_code=HTTPStatus.BAD_REQUEST, + ) + if prev_messages_from_state is None: + # The caller passed previous_response but the prior response + # contains no vLLM state carrier — most likely because + # include=['reasoning.encrypted_content'] was omitted on the + # prior turn. previous_response always implies the stateless + # path; there is no stored-message fallback regardless of + # enable_store (stored messages are keyed by response ID, not + # carried in the response object). Return a clear 400. + return self.create_error_response( + err_type="invalid_request_error", + message=( + "The provided 'previous_response' does not contain a " + "vLLM state carrier. To use stateless multi-turn, " + "generate the previous response with " + "include=['reasoning.encrypted_content'] and " + "store=false." 
+ ), + status_code=HTTPStatus.BAD_REQUEST, + ) - lora_request = self._maybe_get_adapters(request) - model_name = self.models.model_name(lora_request) + try: + lora_request = self._maybe_get_adapters(request) + model_name = self.models.model_name(lora_request) - if self.use_harmony: - messages, engine_prompts = self._make_request_with_harmony( - request, prev_response - ) - else: - messages, engine_prompts = await self._make_request(request, prev_response) + if self.use_harmony: + messages, engine_prompts = await self._make_request_with_harmony( + request, prev_response, prev_messages_from_state + ) + else: + messages, engine_prompts = await self._make_request( + request, prev_response, prev_messages_from_state + ) + + except ( + ValueError, + TypeError, + RuntimeError, + jinja2.TemplateError, + NotImplementedError, + ) as e: + logger.exception("Error in preprocessing prompt inputs") + return self.create_error_response(e) request_metadata = RequestResponseMetadata(request_id=request.request_id) if raw_request: @@ -494,7 +562,7 @@ async def create_responses( # Store the input messages. if request.store: - self.msg_store[request.request_id] = messages + await self.store.put_messages(request.request_id, messages) if request.background: created_time = int(time.time()) @@ -507,8 +575,7 @@ async def create_responses( status="queued", usage=None, ) - async with self.response_store_lock: - self.response_store[response.id] = response + await self.store.put_response(response.id, response) # Run the request in the background. 
if request.stream: @@ -560,30 +627,51 @@ async def create_responses( model_name, tokenizer, request_metadata, + messages=messages, ) - return await self.responses_full_generator( - request, - sampling_params, - result_generator, - context, - model_name, - tokenizer, - request_metadata, - ) + try: + return await self.responses_full_generator( + request, + sampling_params, + result_generator, + context, + model_name, + tokenizer, + request_metadata, + messages=messages, + ) + except GenerationError as e: + return self._convert_generation_error_to_response(e) + except Exception as e: + return self.create_error_response(e) async def _make_request( self, request: ResponsesRequest, prev_response: ResponsesResponse | None, + prev_messages: list | None = None, ): tool_dicts = construct_tool_dicts(request.tools, request.tool_choice) # Construct the input messages. + # For stateless multi-turn, prev_messages comes from the encrypted_content + # state carrier; otherwise fall back to the response store. 
+ prev_msg = ( + prev_messages + if prev_messages is not None + else ( + await self.store.get_messages(prev_response.id) + if prev_response + else None + ) + ) messages = construct_input_messages( request_instructions=request.instructions, request_input=request.input, - prev_msg=self.msg_store.get(prev_response.id) if prev_response else None, - prev_response_output=prev_response.output if prev_response else None, + prev_msg=prev_msg, + prev_response_output=prev_response.output + if prev_response and prev_messages is None + else None, ) _, engine_prompts = await self.openai_serving_render.preprocess_chat( @@ -700,10 +788,11 @@ async def _generate_with_builtin_tools( priority = orig_priority - 1 sub_request += 1 - def _make_request_with_harmony( + async def _make_request_with_harmony( self, request: ResponsesRequest, prev_response: ResponsesResponse | None, + prev_messages: list | None = None, ): if request.tool_choice != "auto": raise NotImplementedError( @@ -711,7 +800,9 @@ def _make_request_with_harmony( ) arrival_time = time.time() - messages = self._construct_input_messages_with_harmony(request, prev_response) + messages = await self._construct_input_messages_with_harmony( + request, prev_response, prev_messages + ) prompt_token_ids = render_for_completion(messages) engine_prompt = token_inputs(prompt_token_ids) engine_prompt["arrival_time"] = arrival_time @@ -748,6 +839,7 @@ async def responses_full_generator( tokenizer: TokenizerLike, request_metadata: RequestResponseMetadata, created_time: int | None = None, + messages: list | None = None, ) -> ErrorResponse | ResponsesResponse: if created_time is None: created_time = int(time.time()) @@ -878,12 +970,44 @@ async def responses_full_generator( kv_transfer_params=context.kv_transfer_params, ) + # Inject state carrier for stateless multi-turn (RFC #26934 + @grs). 
+ # When the client requests encrypted_content inclusion and has opted out + # of server-side storage, embed a signed blob of the full message history + # into a synthetic ReasoningItem appended to response.output. The client + # stores the response opaquely and passes it back as `previous_response` + # on the next turn; vLLM then deserializes the blob to recover history. + if ( + request.include + and "reasoning.encrypted_content" in request.include + and not request.store + ): + carrier_messages: list | None = None + if self.use_harmony and isinstance(context, HarmonyContext): + # Harmony context already contains the full history including + # the assistant turn that was just generated. + carrier_messages = list(context.messages) + elif messages is not None: + # Non-Harmony path: `messages` is the input to the LLM for + # this turn. Append the assistant's response output so the + # carrier contains the complete history (input + response) + # for the next turn to build on. + from vllm.entrypoints.openai.responses.utils import ( + construct_chat_messages_with_tool_call, + ) + + carrier_messages = messages + construct_chat_messages_with_tool_call( + response.output + ) + if carrier_messages is not None: + state_carrier = self._build_state_carrier( + carrier_messages, request.request_id + ) + response.output.append(state_carrier) + if request.store: - async with self.response_store_lock: - stored_response = self.response_store.get(response.id) - # If the response is already cancelled, don't update it. 
- if stored_response is None or stored_response.status != "cancelled": - self.response_store[response.id] = response + await self.store.put_response( + response.id, response, unless_status="cancelled" + ) return response def _topk_logprobs( @@ -1124,10 +1248,11 @@ def _construct_harmony_system_input_message( ) return sys_msg - def _construct_input_messages_with_harmony( + async def _construct_input_messages_with_harmony( self, request: ResponsesRequest, prev_response: ResponsesResponse | None, + prev_messages: list | None = None, ) -> list[OpenAIHarmonyMessage]: messages: list[OpenAIHarmonyMessage] = [] if prev_response is None: @@ -1150,7 +1275,15 @@ def _construct_input_messages_with_harmony( # Continue the previous conversation. # FIXME(woosuk): Currently, request params like reasoning and # instructions are ignored. - prev_msgs = self.msg_store[prev_response.id] + if prev_messages is not None: + # Stateless path: messages came from the encrypted_content + # state carrier; deserialize dicts back to OpenAIHarmonyMessage. 
+ prev_msgs = [ + OpenAIHarmonyMessage.model_validate(m) if isinstance(m, dict) else m + for m in prev_messages + ] + else: + prev_msgs = await self.store.get_messages(prev_response.id) or [] # FIXME(woosuk): The slice-delete-reappend cycle below is # currently a no-op --- it removes messages then puts them all @@ -1212,14 +1345,28 @@ async def _run_background_request_stream( event_deque: deque[StreamingResponsesResponse] = deque() new_event_signal = asyncio.Event() self.event_store[request.request_id] = (event_deque, new_event_signal) - generator = self.responses_stream_generator(request, *args, **kwargs) + response = None try: + generator = self.responses_stream_generator(request, *args, **kwargs) async for event in generator: event_deque.append(event) new_event_signal.set() # Signal new event available + except GenerationError as e: + response = self._convert_generation_error_to_response(e) + except Exception as e: + logger.exception("Background request failed for %s", request.request_id) + response = self.create_error_response(e) finally: new_event_signal.set() + if response is not None and isinstance(response, ErrorResponse): + # If the request has failed, update the status to "failed". + await self.store.update_response_status( + request.request_id, + "failed", + allowed_current_statuses={"queued", "in_progress"}, + ) + async def _run_background_request( self, request: ResponsesRequest, @@ -1230,12 +1377,11 @@ async def _run_background_request( if isinstance(response, ErrorResponse): # If the request has failed, update the status to "failed". 
- response_id = request.request_id - async with self.response_store_lock: - stored_response = self.response_store.get(response_id) - assert stored_response is not None - if stored_response.status not in ("completed", "cancelled"): - stored_response.status = "failed" + await self.store.update_response_status( + request.request_id, + "failed", + allowed_current_statuses={"queued", "in_progress"}, + ) async def responses_background_stream_generator( self, @@ -1276,13 +1422,40 @@ async def retrieve_responses( | ResponsesResponse | AsyncGenerator[StreamingResponsesResponse, None] ): - async with self.response_store_lock: - response = self.response_store.get(response_id) + if not self.enable_store: + return self.create_error_response( + err_type="invalid_request_error", + message=( + "Response retrieval requires the response store to be enabled " + "(VLLM_ENABLE_RESPONSES_API_STORE=1). " + "For stateless multi-turn, use `previous_response` with " + "`include=['reasoning.encrypted_content']` and `store=false` " + "instead of retrieving responses by ID." + ), + status_code=HTTPStatus.NOT_IMPLEMENTED, + ) + + response = await self.store.get_response(response_id) if response is None: return self._make_not_found_error(response_id) if stream: + if response_id not in self.event_store: + if response.status not in ("queued", "in_progress"): + # Terminal state on another replica — return full response. + return response + return self.create_error_response( + err_type="invalid_request_error", + message=( + f"Streaming retrieval is not available for response " + f"'{response_id}': no event buffer exists on this " + f"server instance. The response may have been " + f"processed on a different replica. Use non-streaming " + f"retrieval instead." 
+ ), + status_code=HTTPStatus.BAD_REQUEST, + ) return self.responses_background_stream_generator( response_id, starting_after, @@ -1293,23 +1466,57 @@ async def cancel_responses( self, response_id: str, ) -> ErrorResponse | ResponsesResponse: - async with self.response_store_lock: - response = self.response_store.get(response_id) - if response is None: + if not self.enable_store: + # Stateless mode: cancel in-flight tasks by ID only (no stored response). + task = self.background_tasks.get(response_id) + if not task: + # Mimic the stateful path's 404 when no task is found. return self._make_not_found_error(response_id) - prev_status = response.status - if prev_status not in ("queued", "in_progress"): - return self.create_error_response( - err_type="invalid_request_error", - message="Cannot cancel a synchronous response.", - param="response_id", + task.cancel() + try: + await task + except asyncio.CancelledError: + logger.info( + "Background task for %s was cancelled (stateless mode)", + response_id, ) - # Update the status to "cancelled". - response.status = "cancelled" + # Cancellation was initiated but we cannot return a full response + # object because no state is stored in stateless mode. + return self.create_error_response( + err_type="invalid_request_error", + message=( + "Response cancellation was initiated for the in-flight task, " + "but a full response object cannot be returned in stateless mode. " + "Enable VLLM_ENABLE_RESPONSES_API_STORE=1 for full cancel " + "support." + ), + status_code=HTTPStatus.BAD_REQUEST, + ) + + response = await self.store.update_response_status( + response_id, + "cancelled", + allowed_current_statuses={"queued", "in_progress"}, + ) + if response is None: + # Either not found or not in a cancellable state. 
+ stored = await self.store.get_response(response_id) + if stored is None: + return self._make_not_found_error(response_id) + return self.create_error_response( + err_type="invalid_request_error", + message="Cannot cancel a synchronous response.", + param="response_id", + ) # Abort the request. + # NOTE: With a shared ResponseStore across replicas, the CAS above + # marks the response as "cancelled" in the store, but the actual + # background task may be running on a different replica. We can only + # cancel process-local tasks here; the remote replica will discard its + # result at finalization via put_response(..., unless_status="cancelled"). if task := self.background_tasks.get(response_id): task.cancel() try: @@ -1326,6 +1533,63 @@ def _make_not_found_error(self, response_id: str) -> ErrorResponse: param="response_id", ) + def _build_state_carrier( + self, messages: list, request_id: str + ) -> ResponseReasoningItem: + """Build a synthetic ReasoningItem that carries serialized message history. + + The ``encrypted_content`` field holds a signed, base64-encoded JSON blob + of the full message list. On the next turn the client passes the full + response back as ``previous_response``; vLLM calls + ``_extract_state_from_response`` to recover history without a store. + """ + from vllm.entrypoints.openai.responses.state import serialize_state + + return ResponseReasoningItem( + id=f"rs_state_{request_id[-12:]}", + type="reasoning", + summary=[], + status="completed", + encrypted_content=serialize_state(messages), + ) + + def _extract_state_from_response(self, response: ResponsesResponse) -> list | None: + """Extract the serialized message history from a response's state carrier. + + Scans ``response.output`` for a vLLM state-carrier ReasoningItem and + deserializes its ``encrypted_content`` field back into a message list. + + Returns: + The deserialized message list (plain dicts), or ``None`` if no state + carrier is found (e.g. 
when the previous response was generated + without ``include=['reasoning.encrypted_content']``). + + Raises: + ValueError: If a state carrier is found but its HMAC is invalid. + """ + from vllm.entrypoints.openai.responses.state import ( + deserialize_state, + is_state_carrier, + ) + + for item in response.output: + if is_state_carrier(item): + return deserialize_state(item.encrypted_content) + return None + + def _make_store_not_supported_error(self) -> ErrorResponse: + return self.create_error_response( + err_type="invalid_request_error", + message=( + "`store=True` (default) is not supported. Please set " + "`store=False` in Responses API or set " + "`VLLM_ENABLE_RESPONSES_API_STORE=1` in the env var when " + "starting the vLLM server." + ), + status_code=HTTPStatus.BAD_REQUEST, + param="store", + ) + async def _process_simple_streaming_events( self, request: ResponsesRequest, @@ -1945,6 +2209,7 @@ async def responses_stream_generator( tokenizer: TokenizerLike, request_metadata: RequestResponseMetadata, created_time: int | None = None, + messages: list | None = None, ) -> AsyncGenerator[StreamingResponsesResponse, None]: # TODO: # 1. Handle disconnect @@ -2032,6 +2297,7 @@ async def empty_async_generator(): tokenizer, request_metadata, created_time=created_time, + messages=messages, ) yield _increment_sequence_number_and_return( ResponseCompletedEvent( diff --git a/vllm/entrypoints/openai/responses/state.py b/vllm/entrypoints/openai/responses/state.py new file mode 100644 index 000000000000..be7bf42cf0af --- /dev/null +++ b/vllm/entrypoints/openai/responses/state.py @@ -0,0 +1,158 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright contributors to the vLLM project + +""" +Stateless conversation state carrier for the Responses API. + +Serializes conversation message history into the ``encrypted_content`` field of +a synthetic ResponseReasoningItem, enabling multi-turn conversations without +server-side storage. 
Implements the approach proposed by @grs in +vllm-project/vllm#26934. + +Wire format: ``vllm:1::`` + +Security note: + The payload is **signed**, not encrypted. The HMAC-SHA256 signature + prevents client-side tampering with the serialized history, but the + contents (serialized message dicts) are readable by anyone who holds + the response object. This matches the spirit of the ``encrypted_content`` + field in the OpenAI protocol: an opaque blob the client stores and + returns verbatim. + +Multi-node deployments: + Set ``VLLM_RESPONSES_STATE_SIGNING_KEY`` to the same 64-character hex + string on all nodes. Without it each process generates a random key at + startup, making state carriers incompatible across restarts/replicas. +""" + +import hashlib +import hmac +import json +import os +from typing import Any + +import pybase64 as base64 + +from vllm.logger import init_logger + +logger = init_logger(__name__) + +_FORMAT_VERSION = "vllm:1" +_SIGNING_KEY: bytes | None = None + + +def _get_signing_key() -> bytes: + global _SIGNING_KEY + if _SIGNING_KEY is None: + key_hex = os.environ.get("VLLM_RESPONSES_STATE_SIGNING_KEY", "") + if key_hex: + try: + key_bytes = bytes.fromhex(key_hex) + except ValueError as exc: + raise ValueError( + "VLLM_RESPONSES_STATE_SIGNING_KEY must be a valid hex " + "string (e.g. a 64-char / 32-byte hex key)." + ) from exc + if len(key_bytes) < 32: + raise ValueError( + f"VLLM_RESPONSES_STATE_SIGNING_KEY decoded to only " + f"{len(key_bytes)} byte(s); a minimum of 32 bytes " + f"(64 hex characters) is required for a secure HMAC-SHA256 key." + ) + _SIGNING_KEY = key_bytes + else: + _SIGNING_KEY = os.urandom(32) + logger.warning( + "VLLM_RESPONSES_STATE_SIGNING_KEY is not set. " + "Stateless multi-turn state carriers are valid only for this " + "server instance and will break across restarts or on " + "multi-node deployments. " + "Set VLLM_RESPONSES_STATE_SIGNING_KEY to a shared 64-char " + "hex key to enable multi-node / restart-safe operation." 
+ ) + return _SIGNING_KEY + + +def _harmony_serializer(obj: Any) -> Any: + """JSON serializer for OpenAIHarmonyMessage and similar pydantic models.""" + if hasattr(obj, "model_dump"): + return obj.model_dump() + raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable") + + +def serialize_state(messages: list[Any]) -> str: + """Serialize *messages* into a signed, base64-encoded state carrier string. + + The returned string is suitable for embedding in + ``ResponseReasoningItem.encrypted_content``. + + Args: + messages: A list of message objects (OpenAIHarmonyMessage instances or + plain ``ChatCompletionMessageParam`` dicts). + + Returns: + A wire-format state carrier string: ``vllm:1::``. + """ + payload_b64 = base64.b64encode( + json.dumps(messages, default=_harmony_serializer).encode() + ).decode() + sig = hmac.new(_get_signing_key(), payload_b64.encode(), hashlib.sha256).hexdigest() + return f"{_FORMAT_VERSION}:{payload_b64}:{sig}" + + +def deserialize_state(encrypted_content: str) -> list[Any] | None: + """Deserialize a state carrier string back into a message list. + + Args: + encrypted_content: The value of ``ResponseReasoningItem.encrypted_content``. + + Returns: + The deserialized message list (as plain dicts), or ``None`` if the + string is not a vLLM state carrier (e.g. a real encrypted_content + from an external model). + + Raises: + ValueError: If the string looks like a vLLM state carrier but the + HMAC signature does not match (indicates tampering). + """ + if not encrypted_content.startswith(f"{_FORMAT_VERSION}:"): + return None + # Expected: "vllm:1::" + # Strip the prefix, then split payload from signature from the right + # so future format version changes (e.g. "vllm:2:alpha") don't break. 
+ suffix = encrypted_content[len(f"{_FORMAT_VERSION}:") :] + try: + payload_b64, sig = suffix.rsplit(":", 1) + except ValueError as exc: + raise ValueError( + "Malformed vLLM state carrier: expected " + f"'{_FORMAT_VERSION}::', got {encrypted_content!r}" + ) from exc + expected = hmac.new( + _get_signing_key(), payload_b64.encode(), hashlib.sha256 + ).hexdigest() + if not hmac.compare_digest(sig, expected): + raise ValueError( + "vLLM state carrier HMAC verification failed. " + "The state may have been tampered with, or was produced by a " + "different server instance (check VLLM_RESPONSES_STATE_SIGNING_KEY)." + ) + return json.loads(base64.b64decode(payload_b64)) + + +def is_state_carrier(item: Any) -> bool: + """Return True if *item* is a vLLM state-carrier ReasoningItem. + + A state carrier is a ``ResponseReasoningItem`` whose ``encrypted_content`` + field starts with the vLLM format prefix. These items are synthetic: + they carry serialized conversation history and should not be sent to the + LLM as part of the chat message list. + + Args: + item: Any object, typically a ``ResponseOutputItem``. + + Returns: + True if the item is a vLLM-generated state carrier. + """ + ec = getattr(item, "encrypted_content", None) + return bool(ec and isinstance(ec, str) and ec.startswith(f"{_FORMAT_VERSION}:")) diff --git a/vllm/entrypoints/openai/responses/store.py b/vllm/entrypoints/openai/responses/store.py new file mode 100644 index 000000000000..595281203f69 --- /dev/null +++ b/vllm/entrypoints/openai/responses/store.py @@ -0,0 +1,157 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright contributors to the vLLM project + +import asyncio +import logging +from abc import ABC, abstractmethod + +from openai.types.responses import ResponseStatus + +from vllm.entrypoints.openai.responses.protocol import ResponsesResponse + +logger = logging.getLogger(__name__) + + +class ResponseStore(ABC): + """Abstract interface for persisting Responses API state. 
+ + vLLM ships an in-memory default (``InMemoryResponseStore``). Users who + need persistence or multi-node consistency can implement this interface and + point ``VLLM_RESPONSES_STORE_BACKEND`` to their class's fully-qualified + name (e.g. ``mypackage.redis_store.RedisResponseStore``). + """ + + @abstractmethod + async def get_response(self, response_id: str) -> ResponsesResponse | None: + """Return the stored response, or ``None`` if not found.""" + + @abstractmethod + async def put_response( + self, + response_id: str, + response: ResponsesResponse, + *, + unless_status: ResponseStatus | None = None, + ) -> bool: + """Store *response* under *response_id*. + + If *unless_status* is given and the **currently stored** response + already has that status, the write is skipped and ``False`` is + returned. Otherwise the write succeeds and ``True`` is returned. + """ + + @abstractmethod + async def update_response_status( + self, + response_id: str, + new_status: ResponseStatus, + *, + allowed_current_statuses: set[ResponseStatus] | None = None, + ) -> ResponsesResponse | None: + """Atomically transition a response's status. + + If *allowed_current_statuses* is given the update only proceeds when + the current status is in the set; otherwise the call is a no-op and + returns ``None``. + + Returns the (possibly updated) ``ResponsesResponse``, or ``None`` + when the response was not found or the transition was rejected. + """ + + @abstractmethod + async def get_messages(self, response_id: str) -> list | None: + """Return the stored input messages for a response, or ``None``.""" + + @abstractmethod + async def put_messages(self, response_id: str, messages: list) -> None: + """Store the input messages for a response.""" + + async def close(self) -> None: + """Release resources. Override to clean up connections/pools.""" + return + + +class InMemoryResponseStore(ResponseStore): + """Default in-memory implementation — wraps the previous dict behaviour. 
+ + All response mutations are guarded by an internal ``asyncio.Lock`` so + callers no longer need an external ``response_store_lock``. + """ + + def __init__(self) -> None: + self._responses: dict[str, ResponsesResponse] = {} + self._messages: dict[str, list] = {} + self._lock = asyncio.Lock() + + async def get_response(self, response_id: str) -> ResponsesResponse | None: + async with self._lock: + return self._responses.get(response_id) + + async def put_response( + self, + response_id: str, + response: ResponsesResponse, + *, + unless_status: ResponseStatus | None = None, + ) -> bool: + async with self._lock: + existing = self._responses.get(response_id) + if ( + unless_status is not None + and existing is not None + and existing.status == unless_status + ): + return False + self._responses[response_id] = response + return True + + async def update_response_status( + self, + response_id: str, + new_status: ResponseStatus, + *, + allowed_current_statuses: set[ResponseStatus] | None = None, + ) -> ResponsesResponse | None: + async with self._lock: + response = self._responses.get(response_id) + if response is None: + return None + if ( + allowed_current_statuses is not None + and response.status not in allowed_current_statuses + ): + return None + response.status = new_status + return response + + async def get_messages(self, response_id: str) -> list | None: + async with self._lock: + return self._messages.get(response_id) + + async def put_messages(self, response_id: str, messages: list) -> None: + async with self._lock: + self._messages[response_id] = messages + + +def create_response_store() -> ResponseStore: + """Factory that returns a ``ResponseStore`` instance. + + Checks ``VLLM_RESPONSES_STORE_BACKEND`` — if set, loads the class via + ``resolve_obj_by_qualname``; otherwise returns ``InMemoryResponseStore``. 
+ """ + from vllm import envs + + backend = envs.VLLM_RESPONSES_STORE_BACKEND + if not backend: + return InMemoryResponseStore() + + from vllm.utils.import_utils import resolve_obj_by_qualname + + cls = resolve_obj_by_qualname(backend) + if not (isinstance(cls, type) and issubclass(cls, ResponseStore)): + raise TypeError( + f"VLLM_RESPONSES_STORE_BACKEND={backend!r} resolved to " + f"{cls!r}, which is not a ResponseStore subclass." + ) + logger.info("Using custom response store backend: %s", backend) + return cls() diff --git a/vllm/entrypoints/openai/responses/utils.py b/vllm/entrypoints/openai/responses/utils.py index 789a0e0b6be6..ee2f94a10603 100644 --- a/vllm/entrypoints/openai/responses/utils.py +++ b/vllm/entrypoints/openai/responses/utils.py @@ -167,14 +167,16 @@ def construct_chat_messages_with_tool_call( if maybe_combined_message is not None: messages[-1] = maybe_combined_message else: - messages.append(_construct_single_message_from_response_item(item)) + result = _construct_single_message_from_response_item(item) + if result is not None: # None means state carrier — skip it + messages.append(result) return messages def _construct_single_message_from_response_item( item: ResponseInputOutputItem, -) -> ChatCompletionMessageParam: +) -> ChatCompletionMessageParam | None: if isinstance(item, ResponseFunctionToolCall): # Append the function call as a tool call. return ChatCompletionAssistantMessageParam( @@ -191,9 +193,18 @@ def _construct_single_message_from_response_item( ], ) elif isinstance(item, ResponseReasoningItem): + from vllm.entrypoints.openai.responses.state import is_state_carrier + + if is_state_carrier(item): + # This is a vLLM state-carrier item: it carries serialized Harmony + # history in encrypted_content and must not be forwarded to the LLM. + return None reasoning = "" if item.encrypted_content: - raise ValueError("Encrypted content is not supported.") + raise ValueError( + "Encrypted content from external providers is not supported. 
" + "vLLM-generated state carriers are handled transparently." + ) elif item.content and len(item.content) >= 1: reasoning = item.content[0].text elif len(item.summary) >= 1: diff --git a/vllm/envs.py b/vllm/envs.py index 5c2a01482ffe..aa7b62c52ef7 100755 --- a/vllm/envs.py +++ b/vllm/envs.py @@ -201,6 +201,8 @@ VLLM_LOOPBACK_IP: str = "" VLLM_ALLOW_CHUNKED_LOCAL_ATTN_WITH_HYBRID_KV_CACHE: bool = True VLLM_ENABLE_RESPONSES_API_STORE: bool = False + VLLM_RESPONSES_STORE_BACKEND: str = "" + VLLM_RESPONSES_STATE_SIGNING_KEY: str = "" VLLM_NVFP4_GEMM_BACKEND: str | None = None VLLM_HAS_FLASHINFER_CUBIN: bool = False VLLM_USE_FLASHINFER_MOE_MXFP4_MXFP8: bool = False @@ -1466,6 +1468,18 @@ def _get_or_set_default() -> str: "VLLM_ENABLE_RESPONSES_API_STORE": lambda: bool( int(os.getenv("VLLM_ENABLE_RESPONSES_API_STORE", "0")) ), + # Fully-qualified class name of a custom ResponseStore backend. + # When set, vLLM loads this class instead of InMemoryResponseStore. + # Example: "mypackage.redis_store.RedisResponseStore" + "VLLM_RESPONSES_STORE_BACKEND": lambda: os.environ.get( + "VLLM_RESPONSES_STORE_BACKEND", "" + ), + # Hex-encoded 32-byte (64-char) signing key for stateless multi-turn + # state carriers (RFC #26934). If not set, a random key is generated at + # startup (incompatible across restarts / multi-node deployments). + "VLLM_RESPONSES_STATE_SIGNING_KEY": lambda: os.environ.get( + "VLLM_RESPONSES_STATE_SIGNING_KEY", "" + ), # If set, use the fp8 mfma in rocm paged attention. "VLLM_ROCM_FP8_MFMA_PAGE_ATTN": lambda: bool( int(os.getenv("VLLM_ROCM_FP8_MFMA_PAGE_ATTN", "0"))