diff --git a/tests/entrypoints/openai/test_response_api_harmony_input_output.py b/tests/entrypoints/openai/test_response_api_harmony_input_output.py new file mode 100644 index 000000000000..b8d2192638f5 --- /dev/null +++ b/tests/entrypoints/openai/test_response_api_harmony_input_output.py @@ -0,0 +1,858 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright contributors to the vLLM project + +import httpx +import pytest +import pytest_asyncio +from openai import OpenAI +from openai_harmony import Message, Role + +from ...utils import RemoteOpenAIServer + +MODEL_NAME = "openai/gpt-oss-20b" + + +def harmony_message_to_dict(message: Message) -> dict: + """ + Convert a harmony Message to a dictionary format suitable for vLLM API. + + This adds the required "type": "harmony" field that vLLM's field validator + expects for proper deserialization. + + Args: + message: OpenAI Harmony Message object + + Returns: + Dictionary representation with proper type field for vLLM validation + """ + message_dict = message.to_dict() + message_dict["type"] = "harmony" + return message_dict + + +def harmony_messages_to_dicts(messages: list[Message]) -> list[dict]: + """ + Convert a list of harmony Messages to dictionaries suitable for vLLM API. + + Args: + messages: List of OpenAI Harmony Message objects + + Returns: + List of dictionary representations with proper type fields + """ + return [harmony_message_to_dict(msg) for msg in messages] + + +@pytest.fixture(scope="module") +def monkeypatch_module(): + from _pytest.monkeypatch import MonkeyPatch + mpatch = MonkeyPatch() + yield mpatch + mpatch.undo() + + +@pytest.fixture(scope="module") +def server(monkeypatch_module: pytest.MonkeyPatch): + args = ["--enforce-eager", "--tool-server", "demo"] + + with monkeypatch_module.context() as m: + with RemoteOpenAIServer(MODEL_NAME, args) as remote_server: + yield remote_server + + +@pytest_asyncio.fixture +async def client(server): + async with server.get_async_client() as async_client: + yield async_client + + +async def send_responses_request(server, data: dict) -> dict: + """Helper function to send requests using HTTP.""" + async with httpx.AsyncClient(timeout=120.0) as http_client: + response = await http_client.post( + f"{server.url_root}/v1/responses", + json=data, + headers={"Authorization": f"Bearer {server.DUMMY_API_KEY}"}) + + response.raise_for_status() + return response.json() + + +class ResponsesApiResponse: + """Helper class to make HTTP response look like OpenAI client response.""" + + def __init__(self, data: dict): + self.status = data["status"] + self.input_messages = data.get("input_messages", []) + self.output_messages = data.get("output_messages", []) + self.id = data.get("id") + + +@pytest.mark.asyncio +@pytest.mark.parametrize("model_name", [MODEL_NAME]) +async def test_harmony_message_deserialization(client: OpenAI, model_name: str, + server): + """Test that harmony messages can be properly deserialized from JSON.""" + + # Create some harmony messages manually (as they would come from a client) + previous_harmony_messages = [{ + "role": + "user", + "content": [{ + "type": "text", + "text": "What is the capital of France?" + }], + "channel": + None, + "recipient": + None, + "content_type": + None + }, { + "role": + "assistant", + "content": [{ + "type": "text", + "text": "The capital of France is Paris." 
+ }], + "channel": + None, + "recipient": + None, + "content_type": + None + }] + + # Use direct HTTP request since OpenAI client doesn't support custom params + response_json = await send_responses_request( + server, { + "model": model_name, + "input": "Tell me more about that city.", + "previous_response_messages": previous_harmony_messages, + "enable_response_messages": True + }) + + response = ResponsesApiResponse(response_json) + + assert response is not None + assert response.status == "completed" + + # Verify the response includes both the previous and new messages + all_messages = (response.input_messages + response.output_messages) + + # Verify that all messages have proper serialization + all_messages = (response.input_messages + response.output_messages) + for msg in all_messages: + assert "role" in msg + assert "content" in msg + assert isinstance(msg["content"], list) + + # Ensure content is not empty objects + for content_item in msg["content"]: + assert isinstance(content_item, dict) + assert len(content_item) > 0 # Should not be empty {} + + +@pytest.mark.asyncio +@pytest.mark.parametrize("model_name", [MODEL_NAME]) +async def test_harmony_message_round_trip(client: OpenAI, model_name: str, + server): + """Test full round-trip: get harmony messages from response, send back.""" + + # First request using HTTP since we need enable_response_messages + response1_json = await send_responses_request( + server, { + "model": model_name, + "input": "What is 2 + 2?", + "instructions": "Provide a simple answer.", + "enable_response_messages": True + }) + + response1 = ResponsesApiResponse(response1_json) + assert response1 is not None + assert response1.status == "completed" + + # Extract harmony messages from first response + first_input_messages = response1.input_messages + first_output_messages = response1.output_messages + + # Combine all messages from first conversation + all_first_messages = first_input_messages + first_output_messages + + # Second request using harmony messages from first response - use HTTP + response2_json = await send_responses_request( + server, { + "model": model_name, + "input": "Now what is 3 + 3?", + "instructions": "Continue the math conversation.", + "previous_response_messages": all_first_messages, + "previous_response_output": response1_json.get("output", []), + "enable_response_messages": True + }) + + response2 = ResponsesApiResponse(response2_json) + + assert response2 is not None + assert response2.status == "completed" + + # Verify that second response contains more messages (original + new) + second_input_messages = response2.input_messages + second_output_messages = response2.output_messages + + # Should have at least the messages from the first conversation plus new + assert len(second_input_messages) > len(first_input_messages) + + # Verify all messages in the full conversation have proper content + all_second_messages = second_input_messages + second_output_messages + text_message_count = 0 + + for msg in all_second_messages: + assert "role" in msg + assert "content" in msg + + for content_item in msg["content"]: + if content_item.get("type") == "text": + assert "text" in content_item + assert len(content_item["text"].strip()) > 0 + text_message_count += 1 + + # Should have at least some text messages in the conversation + assert text_message_count > 0 + + +@pytest.mark.asyncio +@pytest.mark.parametrize("model_name", [MODEL_NAME]) +async def test_harmony_message_context_continuation(client: OpenAI, + model_name: str, server): + """Test that 
harmony messages provide proper context continuation.""" + + # First establish context with a specific topic + response1_json = await send_responses_request( + server, { + "model": model_name, + "input": + "I'm planning a trip to Tokyo. What's the best time to visit?", + "instructions": "Provide travel advice.", + "enable_response_messages": True + }) + + response1 = ResponsesApiResponse(response1_json) + assert response1.status == "completed" + + # Get all messages from the first conversation + all_messages = (response1.input_messages + response1.output_messages) + + # Continue the conversation with a follow-up question + response2_json = await send_responses_request( + server, { + "model": model_name, + "input": "What about food recommendations for that city?", + "instructions": "Continue helping with travel planning.", + "previous_response_messages": all_messages, + "previous_response_output": response1_json.get("output", []), + "enable_response_messages": True + }) + + response2 = ResponsesApiResponse(response2_json) + assert response2.status == "completed" + + # Verify context is maintained - should have more messages now + assert len(response2.input_messages) > len(response1.input_messages) + + # The conversation should contain references to the original topic + all_content = [] + for msg in (response2.input_messages + response2.output_messages): + for content_item in msg["content"]: + if content_item.get("type") == "text" and "text" in content_item: + all_content.append(content_item["text"].lower()) + + # Should contain references to the original context (Tokyo/trip) + conversation_text = " ".join(all_content) + assert ("tokyo" in conversation_text or "trip" in conversation_text + or "travel" in conversation_text) + + +@pytest.mark.asyncio +@pytest.mark.parametrize("model_name", [MODEL_NAME]) +async def test_harmony_message_empty_list(client: OpenAI, model_name: str, + server): + """Test that empty harmony messages list works properly.""" + + response_json = await send_responses_request( + server, + { + "model": model_name, + "input": "What's 5 + 5?", + "instructions": "Answer the math question.", + "previous_response_messages": [], # Empty list + "enable_response_messages": True + }) + + response = ResponsesApiResponse(response_json) + assert response.status == "completed" + assert len(response.input_messages) > 0 + assert len(response.output_messages) > 0 + + +@pytest.mark.asyncio +@pytest.mark.parametrize("model_name", [MODEL_NAME]) +async def test_harmony_message_none_parameter(client: OpenAI, model_name: str, + server): + """Test that None harmony messages parameter works (same as omitting).""" + + response_json = await send_responses_request( + server, { + "model": model_name, + "input": "What's 7 + 8?", + "instructions": "Answer the math question.", + "previous_response_messages": None, + "enable_response_messages": True + }) + + response = ResponsesApiResponse(response_json) + assert response.status == "completed" + + +@pytest.mark.asyncio +@pytest.mark.parametrize("model_name", [MODEL_NAME]) +async def test_tools_presence_across_requests(client: OpenAI, model_name: str, + server): + """Test that tools are properly handled when present in first request but not second.""" + + # First request with tools defined + response1_json = await send_responses_request( + server, { + "model": model_name, + "input": "Calculate 25 + 17 using Python.", + "tools": [{ + "type": "code_interpreter", + "container": { + "type": "auto" + } + }], + "enable_response_messages": True + }) + + 
response1 = ResponsesApiResponse(response1_json) + assert response1.status == "completed" + assert len(response1.input_messages) > 0 + assert len(response1.output_messages) > 0 + + # Get all messages from first response + all_messages_1 = response1.input_messages + response1.output_messages + + # Second request with NO tools defined, but using messages from first request + response2_json = await send_responses_request( + server, + { + "model": model_name, + "input": "What was that calculation result again?", + "instructions": "Use the previous conversation context.", + "previous_response_messages": all_messages_1, + # Note: explicitly NOT including tools here + "enable_response_messages": True + }) + + response2 = ResponsesApiResponse(response2_json) + assert response2.status == "completed" + assert len(response2.input_messages) > 0 + assert len(response2.output_messages) > 0 + + # Validate that input_messages from second response do not contain tool definitions + tool_definitions_found = 0 + for msg in response2.input_messages: + assert "role" in msg + assert "content" in msg + + # Check that there are no tool-related keys in the message structure + assert "tools" not in msg + assert "tool_calls" not in msg + + # Check content items don't contain tool definitions + if isinstance(msg["content"], list): + for content_item in msg["content"]: + # Tool definitions should not be present in content + if content_item.get("type") == "tool_definition": + tool_definitions_found += 1 + # But tool_call and tool_result types may still be present from previous conversation + # We're specifically checking that new tool definitions aren't added + + assert tool_definitions_found == 0, "No tool definitions should be in second request input" + + # Verify the second response has more messages (accumulated context) + assert len(response2.input_messages) > len(response1.input_messages) + + +@pytest.mark.asyncio +@pytest.mark.parametrize("model_name", [MODEL_NAME]) +async def test_tools_introduction_in_second_request(client: OpenAI, + model_name: str, server): + """Test introducing tools in second request when first had none.""" + + # First request with NO tools + response1_json = await send_responses_request( + server, + { + "model": model_name, + "input": "I need to do a calculation later: 42 + 58.", + "instructions": "Just acknowledge the request for now.", + # Note: explicitly NOT including tools here + "enable_response_messages": True + }) + + response1 = ResponsesApiResponse(response1_json) + assert response1.status == "completed" + assert len(response1.input_messages) > 0 + assert len(response1.output_messages) > 0 + + # Get all messages from first response + all_messages_1 = response1.input_messages + response1.output_messages + + # Second request WITH tools defined, using messages from first request + response2_json = await send_responses_request( + server, { + "model": model_name, + "input": + "Now please calculate that addition I mentioned using the python tool.", + "instructions": + "Use the previous conversation context and perform the calculation.", + "previous_response_messages": all_messages_1, + "tools": [{ + "type": "code_interpreter", + "container": { + "type": "auto" + } + }], + "enable_response_messages": True + }) + + response2 = ResponsesApiResponse(response2_json) + assert response2.status == "completed" + assert len(response2.input_messages) > 0 + assert len(response2.output_messages) > 0 + + # Check for tool calls or tool results in the output messages + has_tool_call = False + 
tool_evidence = [] + for msg in response2.output_messages: + if "content" in msg and isinstance(msg["content"], list): + for content_item in msg["content"]: + content_type = content_item.get("type", "") + # Look for tool-related content types + if content_type in [ + "tool_call", "tool_result", "code_interpreter" + ]: + has_tool_call = True + tool_evidence.append(f"Found {content_type}") + break + # Also check for text content that indicates code execution + elif (content_type == "text" and "text" in content_item): + text_content = content_item["text"].lower() + if any( + keyword in text_content for keyword in + ["python", "execute", "calculate", "42", "58", "100"]): + has_tool_call = True + tool_evidence.append( + "Found calculation evidence in text") + break + if has_tool_call: + break + + # Verify that tool functionality was engaged + assert has_tool_call, "Expected tool call or calculation evidence in output messages" + + # Verify the second response has more messages (accumulated context) + assert len(response2.input_messages) > len(response1.input_messages) + + # Verify conversation continuity - should reference the original calculation + all_content = [] + for msg in response2.input_messages + response2.output_messages: + if "content" in msg and isinstance(msg["content"], list): + for content_item in msg["content"]: + if content_item.get( + "type") == "text" and "text" in content_item: + all_content.append(content_item["text"].lower()) + + conversation_text = " ".join(all_content) + # Should contain references to the calculation mentioned in first request + calculation_refs = [ + kw for kw in ["42", "58", "calculation", "addition"] + if kw in conversation_text + ] + + assert len( + calculation_refs + ) > 0, f"Expected calculation references, found: {calculation_refs}" + + +@pytest.mark.asyncio +@pytest.mark.parametrize("model_name", [MODEL_NAME]) +async def test_code_interpreter_tool_structure(client: OpenAI, model_name: str, + server): + """Test code_interpreter tool has correct structure and appears in output_messages with analysis channel.""" + + # Request that should trigger code_interpreter tool usage + response_json = await send_responses_request( + server, { + "model": model_name, + "input": "Calculate the factorial of 5 using Python code.", + "tools": [{ + "type": "code_interpreter", + "container": { + "type": "auto" + } + }], + "enable_response_messages": True + }) + + response = ResponsesApiResponse(response_json) + assert response.status == "completed" + assert len(response.output_messages) > 0 + + # Check that response.output doesn't contain tool calls (they should be in output_messages) + response_output = response_json.get("output", []) + has_tool_call_in_output = False + for item in response_output: + if isinstance(item, dict) and item.get("type") == "tool_call": + has_tool_call_in_output = True + break + + # Tool calls should NOT appear in response.output + assert not has_tool_call_in_output, "Tool calls should not appear in response.output" + + # Find code_interpreter tool calls in output_messages + code_interpreter_calls = [] + analysis_channel_messages = [] + + for msg in response.output_messages: + # Check for analysis channel + if msg.get("channel") == "analysis": + analysis_channel_messages.append(msg) + + # Look for code_interpreter tool calls in message content + if "content" in msg and isinstance(msg["content"], list): + for content_item in msg["content"]: + if content_item.get("type") == "tool_call": + # Check if it's a code_interpreter tool call + 
tool_call = content_item.get("tool_call", {}) + if tool_call.get("type") == "code_interpreter": + code_interpreter_calls.append(content_item) + + # For now, just verify we have analysis channel messages (tool calls may vary by implementation) + assert len(analysis_channel_messages + ) > 0, "Expected messages with channel='analysis'" + + # Validate structure of code_interpreter tool calls if they exist + for i, tool_call_item in enumerate(code_interpreter_calls): + assert tool_call_item["type"] == "tool_call" + tool_call = tool_call_item["tool_call"] + + # Verify it's a code_interpreter type + assert tool_call["type"] == "code_interpreter" + + # Should have required fields for code_interpreter + assert "id" in tool_call + assert "code_interpreter" in tool_call + + code_interpreter = tool_call["code_interpreter"] + assert "input" in code_interpreter # Should have code input + + # The code should be related to factorial calculation + code_input = code_interpreter["input"].lower() + factorial_keywords = [ + kw for kw in ["factorial", "5", "*", "math"] if kw in code_input + ] + + assert len(factorial_keywords) > 0, \ + f"Code should be related to factorial calculation, got: {code_input}" + + # Verify analysis channel messages have proper structure + for i, msg in enumerate(analysis_channel_messages): + assert msg["channel"] == "analysis" + assert "role" in msg + assert "content" in msg + assert isinstance(msg["content"], list) + + +@pytest.mark.asyncio +@pytest.mark.parametrize("model_name", [MODEL_NAME]) +async def test_harmony_message_chain_conversation(client: OpenAI, + model_name: str, server): + """Test chaining multiple requests with harmony messages.""" + + # Start a conversation + response1_json = await send_responses_request( + server, { + "model": model_name, + "input": "My favorite color is blue.", + "instructions": "Remember this information.", + "enable_response_messages": True + }) + + response1 = ResponsesApiResponse(response1_json) + assert response1.status == "completed" + + # Continue with context from first response + messages_after_1 = (response1.input_messages + response1.output_messages) + + response2_json = await send_responses_request( + server, { + "model": model_name, + "input": "What's my favorite color?", + "instructions": "Use the previous context.", + "previous_response_messages": messages_after_1, + "enable_response_messages": True + }) + + response2 = ResponsesApiResponse(response2_json) + assert response2.status == "completed" + + # Continue with context from second response + messages_after_2 = (response2.input_messages + response2.output_messages) + + response3_json = await send_responses_request( + server, { + "model": model_name, + "input": "What about my favorite number? 
It's 42.", + "instructions": "Remember this new information too.", + "previous_response_messages": messages_after_2, + "enable_response_messages": True + }) + + response3 = ResponsesApiResponse(response3_json) + assert response3.status == "completed" + + # Final request should have context from all previous messages + messages_after_3 = (response3.input_messages + response3.output_messages) + + response4_json = await send_responses_request( + server, { + "model": model_name, + "input": "What are my favorite color and number?", + "instructions": "Recall both pieces of information.", + "previous_response_messages": messages_after_3, + "enable_response_messages": True + }) + + response4 = ResponsesApiResponse(response4_json) + assert response4.status == "completed" + + # Verify the conversation has grown with each interaction + assert len(response4.input_messages) > len(response3.input_messages) + assert len(response3.input_messages) > len(response2.input_messages) + assert len(response2.input_messages) > len(response1.input_messages) + + +@pytest.mark.asyncio +@pytest.mark.parametrize("model_name", [MODEL_NAME]) +async def test_harmony_message_with_python_tool(client: OpenAI, + model_name: str, server): + """Test harmony messages with Python tool usage and context preservation.""" + # First request that should trigger Python tool usage using proper tool spec + response1_json = await send_responses_request( + server, { + "model": model_name, + "input": "Calculate the square root of 144 using Python code.", + "tools": [{ + "type": "code_interpreter", + "container": { + "type": "auto" + } + }], + "enable_response_messages": True + }) + + response1 = ResponsesApiResponse(response1_json) + assert response1.status == "completed" + + # Verify we have harmony messages from the first response + assert len(response1.input_messages) > 0 + assert len(response1.output_messages) > 0 + + # Look for tool usage in the messages + all_messages_1 = (response1.input_messages + response1.output_messages) + + # Check if any message contains tool calls or tool results + has_tool_content = False + for msg in all_messages_1: + if "content" in msg and isinstance(msg["content"], list): + for content_item in msg["content"]: + if (content_item.get("type") + in ["tool_call", "tool_result", "code_interpreter"] + or ("text" in content_item and + ("python" in content_item["text"].lower() + or "calculate" in content_item["text"].lower()))): + has_tool_content = True + break + if has_tool_content: + break + + # Continue conversation with tool context + response2_json = await send_responses_request( + server, { + "model": model_name, + "input": "Now calculate the square of that result.", + "instructions": "Use the result from the previous calculation.", + "previous_response_messages": all_messages_1, + "tools": [{ + "type": "code_interpreter", + "container": { + "type": "auto" + } + }], + "enable_response_messages": True + }) + + response2 = ResponsesApiResponse(response2_json) + assert response2.status == "completed" + + # Verify second response has more messages than first (accumulated context) + assert len(response2.input_messages) > len(response1.input_messages) + + # Verify all messages maintain proper structure + all_messages_2 = (response2.input_messages + response2.output_messages) + + for msg in all_messages_2: + assert "role" in msg + assert "content" in msg + assert isinstance(msg["content"], list) + + for content_item in msg["content"]: + assert isinstance(content_item, dict) + assert len(content_item) > 0 # Should 
not be empty + + # Verify we have more total messages in the second response + assert len(all_messages_2) > len(all_messages_1) + + +@pytest.mark.asyncio +@pytest.mark.parametrize("model_name", [MODEL_NAME]) +async def test_harmony_library_input_messages(client: OpenAI, model_name: str, + server): + """Test using harmony library to create messages and send via previous_response_messages for single-turn conversations.""" + + # Create harmony messages using the library + user_message = Message.from_role_and_content( + Role.USER, + "Hello! Can you help me understand how Python functions work?") + + # Convert to dict format for API using helper function + harmony_messages = [harmony_message_to_dict(user_message)] + + # Send request with harmony messages in previous_response_messages and current input + response_json = await send_responses_request( + server, + { + "model": model_name, + "input": + "Can you explain more about Python functions?", # Non-empty input field + "previous_response_messages": + harmony_messages, # Using previous_response_messages to provide context + "instructions": + "Provide a helpful explanation about Python functions.", + "enable_response_messages": True + }) + + response = ResponsesApiResponse(response_json) + assert response.status == "completed" + + # Verify the harmony message was processed correctly + assert len(response.input_messages) > 0 + assert len(response.output_messages) > 0 + + # Check that our original harmony message is in the input_messages + found_original_message = False + for msg in response.input_messages: + if (msg.get("role") == "user" and "content" in msg + and isinstance(msg["content"], list)): + for content_item in msg["content"]: + if (content_item.get("type") == "text" and "python functions" + in content_item.get("text", "").lower()): + found_original_message = True + break + if found_original_message: + break + + assert found_original_message, "Original harmony message should be preserved in input_messages" + + # Verify response contains relevant content about Python functions + response_text = "" + for msg in response.output_messages: + if "content" in msg and isinstance(msg["content"], list): + for content_item in msg["content"]: + if content_item.get( + "type") == "text" and "text" in content_item: + response_text += content_item["text"].lower() + + python_keywords_found = [ + kw for kw in ["function", "python", "def", "return"] + if kw in response_text + ] + + assert len(python_keywords_found + ) > 0, "Response should contain Python function information" + + +@pytest.mark.asyncio +@pytest.mark.parametrize("model_name", [MODEL_NAME]) +async def test_harmony_library_multi_message_input(client: OpenAI, + model_name: str, server): + """Test using harmony library to create multiple messages via input_messages.""" + + # Create a conversation using harmony library + user_msg1 = Message.from_role_and_content(Role.USER, + "What's the capital of Japan?") + + assistant_msg1 = Message.from_role_and_content( + Role.ASSISTANT, "The capital of Japan is Tokyo.") + + user_msg2 = Message.from_role_and_content( + Role.USER, "What's the population of that city?") + + # Convert to dict format for API using helper function + harmony_messages = harmony_messages_to_dicts( + [user_msg1, assistant_msg1, user_msg2]) + + # Send request with multiple harmony messages + response_json = await send_responses_request( + server, + { + "model": model_name, + "input": "", # Empty input field + "previous_response_messages": harmony_messages, + "instructions": "Continue 
the conversation about Tokyo.", + "enable_response_messages": True + }) + + response = ResponsesApiResponse(response_json) + assert response.status == "completed" + + # Verify all original messages are preserved + assert len(response.input_messages) >= len(harmony_messages) + + # Check that conversation context is maintained + response_text = "" + for msg in response.output_messages: + if "content" in msg and isinstance(msg["content"], list): + for content_item in msg["content"]: + if content_item.get( + "type") == "text" and "text" in content_item: + response_text += content_item["text"].lower() + + # Should reference Tokyo or population in the response + context_keywords = [ + kw for kw in ["tokyo", "population", "million", "people", "city"] + if kw in response_text + ] + + assert len( + context_keywords) > 0, "Response should maintain conversation context" + + # Verify message structure integrity + for msg in response.input_messages: + assert "role" in msg + assert "content" in msg + assert isinstance(msg["content"], list) + for content_item in msg["content"]: + assert isinstance(content_item, dict) + assert len(content_item) > 0 diff --git a/vllm/entrypoints/chat_utils.py b/vllm/entrypoints/chat_utils.py index b53dbfb3a26a..803f1620ce10 100644 --- a/vllm/entrypoints/chat_utils.py +++ b/vllm/entrypoints/chat_utils.py @@ -254,6 +254,25 @@ class CustomChatCompletionMessageParam(TypedDict, total=False): OpenAIHarmonyMessage, ] +def chat_completion_message_param_to_dict( + param: ChatCompletionMessageParam +) -> dict[str, Any]: + if isinstance(param, OpenAIHarmonyMessage): + out = param.to_dict() + out["type"] = "harmony" + return out + elif isinstance(param, dict): + return dict(param) + else: + raise ValueError(f"Unknown message param type: {type(param)}") + +def chat_completion_message_param_from_dict( + param: dict[str, Any] +) -> ChatCompletionMessageParam: + if param.get("type", None) == "harmony": + return OpenAIHarmonyMessage.from_dict(param) + else: + return cast(ChatCompletionMessageParam, param) # TODO: Make fields ReadOnly once mypy supports it class ConversationMessage(TypedDict, total=False): diff --git a/vllm/entrypoints/context.py b/vllm/entrypoints/context.py index 7723c5d5cbcf..f4436851be3f 100644 --- a/vllm/entrypoints/context.py +++ b/vllm/entrypoints/context.py @@ -4,16 +4,30 @@ import logging from abc import ABC, abstractmethod from contextlib import AsyncExitStack -from typing import TYPE_CHECKING, Optional, Union +from typing import Callable, TYPE_CHECKING, Optional, Union, Tuple +from openai.types.responses import (ResponseOutputItem, + ResponseOutputMessage, ResponseOutputText, + ResponseReasoningItem) +from openai.types.responses.response_reasoning_item import ( + Content as ResponseReasoningTextContent) from openai_harmony import Author, Message, Role, StreamState, TextContent +from vllm.entrypoints.chat_utils import ChatCompletionMessageParam from vllm.entrypoints.harmony_utils import ( - get_encoding, get_streamable_parser_for_assistant, render_for_completion) + convert_harmony_message_to_output_item, get_encoding, + get_streamable_parser_for_assistant, + parse_remaining_state_into_output_items, + render_for_completion,) +from vllm.entrypoints.logger import RequestLogger +from vllm.entrypoints.openai.protocol import ResponsesRequest from vllm.entrypoints.tool import Tool from vllm.entrypoints.tool_server import ToolServer +from vllm.entrypoints.openai.logprobs_utils import create_response_logprobs from vllm.outputs import RequestOutput - +from vllm.reasoning 
import ReasoningParser +from vllm.transformers_utils.tokenizer import AnyTokenizer +from vllm.utils import random_uuid if TYPE_CHECKING: from mcp.client import ClientSession @@ -60,16 +74,32 @@ async def init_tool_sessions(self, tool_server: Optional[ToolServer], exit_stack: AsyncExitStack) -> None: pass + @abstractmethod + def get_input_messages(self) -> list[ChatCompletionMessageParam]: + pass -class SimpleContext(ConversationContext): + @abstractmethod + def get_output_and_output_messages(self, request: ResponsesRequest) -> Tuple[list[ResponseOutputItem], list[ChatCompletionMessageParam]]: + pass - def __init__(self): +# Standard functionality for ChatCompletion based models +class ChatContext(ConversationContext): + def __init__(self, input_messages: list[ChatCompletionMessageParam], + tokenizer: AnyTokenizer, + reasoning_parser: Optional[Callable[[AnyTokenizer], + ReasoningParser]], + request_logger: Optional[RequestLogger] = None): + self.request_logger = request_logger self.last_output = None + self.tokenizer = tokenizer + self._input_messages = input_messages.copy() self.num_prompt_tokens = 0 self.num_output_tokens = 0 self.num_cached_tokens = 0 # todo num_reasoning_tokens is not implemented yet. self.num_reasoning_tokens = 0 + self.reasoning_parser = reasoning_parser + # TODO: Tool parser for tool calling for other models def append_output(self, output) -> None: self.last_output = output @@ -92,6 +122,103 @@ async def init_tool_sessions(self, tool_server: Optional[ToolServer], exit_stack: AsyncExitStack) -> None: pass + def get_input_messages(self) -> list[ChatCompletionMessageParam]: + return self._input_messages + + # TODO: Ideally this class only deals with messages, but since we don't have + # a way to represent reasoning as messages yet + # This is very important as once Responses specific concepts are removed + # then this can be used to handle tool calling for completions API as well + def get_output_and_output_messages(self, + request: ResponsesRequest) -> Tuple[ + list[ResponseOutputItem], + list[ChatCompletionMessageParam]]: + + if self.last_output is None: + return [], [] + assert len(self.last_output.outputs) == 1 + final_output = self.last_output.outputs[0] + output_items = [] + output_messages = [] + if self.reasoning_parser: + try: + reasoning_parser = self.reasoning_parser(self.tokenizer) + except RuntimeError as e: + logger.exception("Error in reasoning parser creation.") + raise e + # TODO: Figure out how to get number of reasoning tokens here + # without tokenizing again + reasoning_content, content = ( + reasoning_parser.extract_reasoning_content(final_output.text, + request=request)) + else: + reasoning_content = None + self.num_reasoning_tokens = 0 + content = final_output.text + + # Log complete response if output logging is provided + # matches previous functionality + if self.request_logger is not None: + output_text = "" + if content: + output_text = content + elif reasoning_content: + output_text = f"[reasoning: {reasoning_content}]" + + if output_text: + self.request_logger.log_outputs( + request_id=request.request_id, + outputs=output_text, + output_token_ids=final_output.token_ids, + finish_reason=final_output.finish_reason, + is_streaming=False, + delta=False, + ) + + + if reasoning_content: + # TODO: Make a ResponseOutputItem but skip a reasoning message + # since there is no direct match in OpenAI spec and + # functionality drops them between API requests at the moment + reasoning_item = ResponseReasoningItem( + id=f"rs_{random_uuid()}", + 
summary=[], + type="reasoning", + content=[ + ResponseReasoningTextContent(text=reasoning_content, + type="reasoning_text") + ], + status=None, # NOTE: Only the last output item has status. + ) + + output_items.append(reasoning_item) + if content: + output_text = ResponseOutputText( + text=content, + annotations=[], # TODO + type="output_text", + logprobs=create_response_logprobs( + token_ids=final_output.token_ids, + logprobs=final_output.logprobs, + tokenizer=self.tokenizer, + top_logprobs=request.top_logprobs, + ) if request.is_include_output_logprobs() else None, + ) + message = ResponseOutputMessage( + id=f"msg_{random_uuid()}", + content=[output_text], + role="assistant", + status="completed", + type="message", + ) + output_items.append(message) + # It is always an assistant message, which is a typed_dict + output_messages.append({ + "role": "assistant", + "content": content, + }) + + return output_items, output_messages class HarmonyContext(ConversationContext): @@ -101,11 +228,11 @@ def __init__( available_tools: list[str], ): self._messages = messages + self._input_messages = messages.copy() self.available_tools = available_tools self._tool_sessions: dict[str, Union[ClientSession, Tool]] = {} self.parser = get_streamable_parser_for_assistant() - self.num_init_messages = len(messages) self.num_prompt_tokens = 0 self.num_output_tokens = 0 self.num_cached_tokens = 0 @@ -298,6 +425,22 @@ async def init_tool_sessions(self, tool_server: Optional[ToolServer], tool_name] = await exit_stack.enter_async_context( tool_server.new_session(tool_name)) + def get_input_messages(self) -> list[ChatCompletionMessageParam]: + return self._input_messages + + def get_output_and_output_messages(self, request: ResponsesRequest) -> Tuple[list[ResponseOutputItem], list[ChatCompletionMessageParam]]: + output_items = [] + output_messages = self.messages[len(self._input_messages):] + for msg in output_messages: + output_items.extend(convert_harmony_message_to_output_item(msg)) + # Handle the generation stopped in the middle (if any). + # TODO: This will not result in any messages, so the next API + # request will not see these outputs, should this be kept? + last_items = parse_remaining_state_into_output_items(self.parser) + if last_items: + output_items.extend(last_items) + return output_items, output_messages + class StreamingHarmonyContext(HarmonyContext): diff --git a/vllm/entrypoints/harmony_utils.py b/vllm/entrypoints/harmony_utils.py index a3693ce60e4e..3c6fd8334ebd 100644 --- a/vllm/entrypoints/harmony_utils.py +++ b/vllm/entrypoints/harmony_utils.py @@ -111,7 +111,6 @@ def get_developer_message( def get_user_message(content: str) -> Message: return Message.from_role_and_content(Role.USER, content) - def parse_response_input( response_msg: ResponseInputOutputItem, prev_responses: list[Union[ResponseOutputItem, ResponseReasoningItem]] @@ -213,7 +212,7 @@ def render_for_completion(messages: list[Message]) -> list[int]: return token_ids -def parse_output_message(message: Message) -> list[ResponseOutputItem]: +def convert_harmony_message_to_output_item(message: Message) -> list[ResponseOutputItem]: """ Parse a Harmony message into a list of output response items. 
""" @@ -315,7 +314,7 @@ def parse_output_message(message: Message) -> list[ResponseOutputItem]: return output_items -def parse_remaining_state( +def parse_remaining_state_into_output_items( parser: StreamableParser) -> list[ResponseOutputItem]: if not parser.current_content: return [] diff --git a/vllm/entrypoints/openai/logprobs_utils.py b/vllm/entrypoints/openai/logprobs_utils.py new file mode 100644 index 000000000000..9faca38865b4 --- /dev/null +++ b/vllm/entrypoints/openai/logprobs_utils.py @@ -0,0 +1,83 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright contributors to the vLLM project + +"""Utility functions for converting vLLM logprobs to OpenAI formats.""" + +from collections.abc import Sequence +from typing import Optional + +from openai.types.responses.response_output_text import (Logprob, + LogprobTopLogprob) +from openai.types.responses import response_text_delta_event + +from vllm.logprobs import Logprob as SampleLogprob +from vllm.logprobs import SampleLogprobs +from vllm.transformers_utils.tokenizer import AnyTokenizer + + +def _topk_logprobs(logprobs: dict[int, SampleLogprob], top_logprobs: int, + tokenizer: AnyTokenizer) -> list[LogprobTopLogprob]: + """Returns the top-k logprobs from the logprobs dictionary.""" + out = [] + for i, (token_id, _logprob) in enumerate(logprobs.items()): + if i >= top_logprobs: + break + text = _logprob.decoded_token if _logprob.decoded_token \ + is not None else tokenizer.decode([token_id]) + out.append( + LogprobTopLogprob( + token=text, + logprob=max(_logprob.logprob, -9999.0), + bytes=list(text.encode("utf-8", errors="replace")), + )) + return out + + +def create_stream_response_logprobs( + token_ids: Sequence[int], + logprobs: Optional[SampleLogprobs], + tokenizer: AnyTokenizer, + top_logprobs: Optional[int] = None +) -> list[response_text_delta_event.Logprob]: + """Create streaming response logprobs for OpenAI Responses API.""" + lgs = create_response_logprobs(token_ids=token_ids, + logprobs=logprobs, + tokenizer=tokenizer, + top_logprobs=top_logprobs) + return [ + response_text_delta_event.Logprob( + token=lg.token, + logprob=lg.logprob, + top_logprobs=[ + response_text_delta_event.LogprobTopLogprob( + token=tl.token, logprob=tl.logprob) + for tl in lg.top_logprobs + ]) for lg in lgs + ] + + +def create_response_logprobs( + token_ids: Sequence[int], + logprobs: Optional[SampleLogprobs], + tokenizer: AnyTokenizer, + top_logprobs: Optional[int] = None) -> list[Logprob]: + assert logprobs is not None, "logprobs must be provided" + assert len(token_ids) == len(logprobs), ( + "token_ids and logprobs.token_ids must have the same length") + out = [] + for i, token_id in enumerate(token_ids): + logprob = logprobs[i] + token_logprob = logprob[token_id] + text = token_logprob.decoded_token if token_logprob.decoded_token \ + is not None else tokenizer.decode([token_id]) + out.append( + Logprob( + token=text, + logprob=max(token_logprob.logprob, -9999.0), + bytes=list(text.encode("utf-8", errors="replace")), + top_logprobs=_topk_logprobs(logprob, + top_logprobs=top_logprobs, + tokenizer=tokenizer) + if top_logprobs else [], + )) + return out diff --git a/vllm/entrypoints/openai/protocol.py b/vllm/entrypoints/openai/protocol.py index c56c68cf7644..b366572491a5 100644 --- a/vllm/entrypoints/openai/protocol.py +++ b/vllm/entrypoints/openai/protocol.py @@ -33,12 +33,15 @@ from openai.types.responses.response import ToolChoice from openai.types.responses.tool import Tool from openai.types.shared import Metadata, Reasoning 
+from openai_harmony import Message
 from pydantic import (BaseModel, ConfigDict, Field, TypeAdapter,
                       ValidationInfo, field_validator, model_validator)
 from typing_extensions import TypeAlias
 
 from vllm import envs
 from vllm.entrypoints.chat_utils import (ChatCompletionMessageParam,
+                                         chat_completion_message_param_from_dict,
+                                         chat_completion_message_param_to_dict,
                                          make_tool_call_id)
 from vllm.entrypoints.score_utils import (ScoreContentPartParam,
                                           ScoreMultiModalParam)
@@ -54,6 +57,8 @@
 _LONG_INFO = torch.iinfo(torch.long)
 
+
+
 
 class OpenAIBaseModel(BaseModel):
     # OpenAI API does allow extra fields
     model_config = ConfigDict(extra="allow")
@@ -273,7 +278,19 @@ class ResponsesRequest(OpenAIBaseModel):
     metadata: Optional[Metadata] = None
     model: Optional[str] = None
     parallel_tool_calls: Optional[bool] = True
+    # If previous_response_id is set, cannot set
+    # previous_response_messages or previous_response_output
+    # as it will be looked up from the stores instead
     previous_response_id: Optional[str] = None
+    # These can be used when the store is disabled but you want to
+    # be able to continue a Responses API thread
+    # if the item is a dict, the type field is expected to exist
+    previous_response_messages: Optional[list[ChatCompletionMessageParam]] = None
+    # This is used exclusively to match function calls to outputs
+    # for the function name
+    previous_response_output: Optional[list[ResponseOutputItem]] = None
+    # Dictates whether or not to return Messages as part of the response object
+    enable_response_messages: bool = False
     prompt: Optional[ResponsePrompt] = None
     reasoning: Optional[Reasoning] = None
     service_tier: Literal["auto", "default", "flex", "scale",
@@ -374,6 +391,22 @@ def is_include_output_logprobs(self) -> bool:
             self.include, list) and "message.output_text.logprobs" \
                 in self.include
 
+    @field_validator("previous_response_messages", mode="before")
+    @classmethod
+    def deserialize_harmony_messages(cls, v):
+        """Convert incoming ChatCompletionMessageParam objects from dictionaries if required."""
+        if v is None:
+            return v
+        if isinstance(v, list):
+            result = []
+            for item in v:
+                if isinstance(item, dict):
+                    result.append(chat_completion_message_param_from_dict(item))
+                else:
+                    result.append(item)
+            return result
+        raise ValueError(f"Invalid type for harmony messages: {type(v)}")
+
     @model_validator(mode="before")
     def validate_background(cls, data):
         if not data.get("background"):
@@ -1856,6 +1889,9 @@ class ResponseUsage(OpenAIBaseModel):
 class ResponsesResponse(OpenAIBaseModel):
     id: str = Field(default_factory=lambda: f"resp_{random_uuid()}")
     created_at: int = Field(default_factory=lambda: int(time.time()))
+    # These are populated when enable_response_messages is set to True
+    input_messages: Optional[list[dict[str, Any]]] = None
+    output_messages: Optional[list[dict[str, Any]]] = None
     # error: Optional[ResponseError] = None
     # incomplete_details: Optional[IncompleteDetails] = None
     instructions: Optional[str] = None
@@ -1891,12 +1927,16 @@ def from_request(
         created_time: int,
         output: list[ResponseOutputItem],
         status: ResponseStatus,
+        input_messages: Optional[list[ChatCompletionMessageParam]] = None,
+        output_messages: Optional[list[ChatCompletionMessageParam]] = None,
         usage: Optional[ResponseUsage] = None,
     ) -> "ResponsesResponse":
         return cls(
             id=request.request_id,
             created_at=created_time,
             instructions=request.instructions,
+            input_messages=[chat_completion_message_param_to_dict(message) for message in input_messages] if input_messages else None,
+            output_messages=[chat_completion_message_param_to_dict(message) for message in output_messages] if output_messages else None,
             metadata=request.metadata,
             model=model_name,
             output=output,
@@ -2098,7 +2138,7 @@ class DetokenizeResponse(OpenAIBaseModel):
 class TokenizerInfoResponse(OpenAIBaseModel):
     """
-    Response containing tokenizer configuration 
+    Response containing tokenizer configuration
     equivalent to tokenizer_config.json
     """
 
@@ -2188,7 +2228,7 @@ class TranscriptionRequest(OpenAIBaseModel):
     to_language: Optional[str] = None
     """The language of the output audio we transcribe to.
 
-    Please note that this is not currently used by supported models at this 
+    Please note that this is not currently used by supported models at this
     time, but it is a placeholder for future use, matching translation api.
     """
 
diff --git a/vllm/entrypoints/openai/serving_responses.py b/vllm/entrypoints/openai/serving_responses.py
index a102d4a4a5e6..f97ef5de7e2c 100644
--- a/vllm/entrypoints/openai/serving_responses.py
+++ b/vllm/entrypoints/openai/serving_responses.py
@@ -10,7 +10,7 @@
 from contextlib import AsyncExitStack
 from copy import copy
 from http import HTTPStatus
-from typing import Callable, Final, Optional, Union
+from typing import Callable, Final, Optional, Tuple, Union
 
 import jinja2
 import openai.types.responses as openai_responses_types
@@ -28,12 +28,10 @@
                                       ResponseReasoningTextDeltaEvent,
                                       ResponseReasoningTextDoneEvent,
                                       response_text_delta_event)
-from openai.types.responses.response_output_text import (Logprob,
-                                                          LogprobTopLogprob)
 # yapf: enable
 from openai.types.responses.response_reasoning_item import (
     Content as ResponseReasoningTextContent)
-from openai_harmony import Message as OpenAIHarmonyMessage
+from openai_harmony import Message as OpenAIHarmonyMessage, Role
 
 from vllm import envs
 from vllm.config import ModelConfig
@@ -41,11 +39,11 @@
 from vllm.entrypoints.chat_utils import (ChatCompletionMessageParam,
                                          ChatTemplateContentFormatOption)
 from vllm.entrypoints.context import (ConversationContext, HarmonyContext,
-                                      SimpleContext, StreamingHarmonyContext)
+                                      ChatContext, StreamingHarmonyContext)
 from vllm.entrypoints.harmony_utils import (
     get_developer_message, get_stop_tokens_for_assistant_actions,
-    get_system_message, get_user_message, parse_output_message,
-    parse_remaining_state, parse_response_input, render_for_completion)
+    get_system_message, get_user_message,
+    parse_remaining_state_into_output_items, parse_response_input, render_for_completion)
 from vllm.entrypoints.logger import RequestLogger
 # yapf conflicts with isort for this block
 # yapf: disable
@@ -56,14 +54,13 @@
                                               ResponsesRequest,
                                               ResponsesResponse, ResponseUsage)
 # yapf: enable
-from vllm.entrypoints.openai.serving_engine import OpenAIServing
+from vllm.entrypoints.openai.logprobs_utils import (create_response_logprobs,
+                                                    create_stream_response_logprobs)
+from vllm.entrypoints.openai.serving_engine import OpenAIServing, RequestPrompt
 from vllm.entrypoints.openai.serving_models import OpenAIServingModels
 from vllm.entrypoints.tool_server import ToolServer
 from vllm.inputs.data import TokensPrompt as EngineTokensPrompt
 from vllm.logger import init_logger
-from vllm.logprobs import Logprob as SampleLogprob
-from vllm.logprobs import SampleLogprobs
-from vllm.outputs import CompletionOutput
 from vllm.reasoning import ReasoningParser, ReasoningParserManager
 from vllm.sampling_params import SamplingParams
 from vllm.transformers_utils.tokenizer import AnyTokenizer
@@ -222,8 +219,17 @@ async def create_responses(
                     status_code=HTTPStatus.BAD_REQUEST,
                 )
 
+        if (request.previous_response_messages or request.previous_response_output) and request.previous_response_id:
+            return self.create_error_response(
+                err_type="invalid_request_error",
+                message="Cannot use previous_response_id when previous_response_messages or previous_response_output is set",
+                status_code=HTTPStatus.BAD_REQUEST,
+            )
+
         # Handle the previous response ID.
         prev_response_id = request.previous_response_id
+        prev_msgs = []
+        prev_response_output = []
         if prev_response_id is not None:
             if not prev_response_id.startswith("resp_"):
                 return self._make_invalid_id_error(prev_response_id)
@@ -231,20 +237,23 @@
             prev_response = self.response_store.get(prev_response_id)
             if prev_response is None:
                 return self._make_not_found_error(prev_response_id)
+            prev_response_output = prev_response.output
+            prev_msgs = self.msg_store[prev_response_id]
         else:
-            prev_response = None
+            prev_response_output = request.previous_response_output if request.previous_response_output else []
+            prev_msgs = request.previous_response_messages if request.previous_response_messages else []
 
         try:
             lora_request = self._maybe_get_adapters(request)
             model_name = self._get_model_name(request.model, lora_request)
             tokenizer = await self.engine_client.get_tokenizer(lora_request)
-
+            input_messages: list[ChatCompletionMessageParam] = []
             if self.use_harmony:
-                messages, request_prompts, engine_prompts = (
-                    self._make_request_with_harmony(request, prev_response))
+                input_messages, request_prompts, engine_prompts = (
+                    self._make_request_with_harmony(request, prev_response_output, prev_msgs))  # type: ignore
             else:
-                messages, request_prompts, engine_prompts = (
-                    await self._make_request(request, prev_response,
+                input_messages, request_prompts, engine_prompts = (
+                    await self._make_request(request, prev_response_output, prev_msgs,
                                              tokenizer))
 
         except (ValueError, TypeError, RuntimeError, jinja2.TemplateError,
@@ -286,11 +295,14 @@
             if self.use_harmony:
                 if request.stream:
                     context = StreamingHarmonyContext(
-                        messages, available_tools)
+                        input_messages, available_tools)
                 else:
-                    context = HarmonyContext(messages, available_tools)
+                    context = HarmonyContext(input_messages, available_tools)
             else:
-                context = SimpleContext()
+                # TODO: Output logging is only happening for this case and only reasoning tokens
+                # Adding it in here for now, but should be pulled back out and actually log everything
+                request_logger = self.request_logger if self.enable_log_outputs else None
+                context = ChatContext(input_messages, tokenizer, self.reasoning_parser, request_logger)
             generator = self._generate_with_builtin_tools(
                 request_id=request.request_id,
                 request_prompt=request_prompts[i],
@@ -309,10 +321,6 @@
         assert len(generators) == 1
         result_generator, = generators
 
-        # Store the input messages.
-        if request.store:
-            self.msg_store[request.request_id] = messages
-
         if request.background:
             created_time = int(time.time())
             response = ResponsesResponse.from_request(
@@ -395,14 +403,15 @@
     async def _make_request(
         self,
         request: ResponsesRequest,
-        prev_response: Optional[ResponsesResponse],
+        prev_response_output: list[ResponseOutputItem],
+        prev_msgs: list[ChatCompletionMessageParam],
         tokenizer: AnyTokenizer,
     ):
         if len(request.tools) > 0:
             raise NotImplementedError(
                 "Tool use is not supported in Responses API without Harmony")
         # Construct the input messages.
-        messages = self._construct_input_messages(request, prev_response)
+        messages = self._construct_input_messages(request, prev_response_output, prev_msgs)
         _, request_prompts, engine_prompts = await self._preprocess_chat(
             request,
             tokenizer,
@@ -415,14 +424,17 @@
     def _make_request_with_harmony(
         self,
         request: ResponsesRequest,
-        prev_response: Optional[ResponsesResponse],
-    ):
+        prev_response_output: list[ResponseOutputItem],
+        prev_msgs: list[ChatCompletionMessageParam]
+    ) -> Tuple[list[OpenAIHarmonyMessage],
+               Sequence[RequestPrompt],
+               list[EngineTokensPrompt]]:
         if request.tool_choice != "auto":
             raise NotImplementedError(
                 "Only 'auto' tool_choice is supported in "
                 "response API with Harmony")
         messages = self._construct_input_messages_with_harmony(
-            request, prev_response)
+            request, prev_response_output, prev_msgs)
         prompt_token_ids = render_for_completion(messages)
 
         engine_prompt = EngineTokensPrompt(prompt_token_ids=prompt_token_ids)
@@ -432,6 +444,8 @@ def _make_request_with_harmony(
         return messages, [prompt_token_ids], [engine_prompt]
 
 
+
+
     async def responses_full_generator(
         self,
         request: ResponsesRequest,
@@ -457,25 +471,19 @@ async def responses_full_generator(
 
                 # TODO: Use a vllm-specific Validation Error
                 return self.create_error_response(str(e))
 
+        # TODO: In the future this should only return output_messages
+        # and the conversion to items happens here
+        output_items, output_messages = context.get_output_and_output_messages(request)
+        input_messages = context.get_input_messages()
         if self.use_harmony:
             assert isinstance(context, HarmonyContext)
-            output = self._make_response_output_items_with_harmony(context)
             num_tool_output_tokens = context.num_tool_output_tokens
         else:
-            assert isinstance(context, SimpleContext)
-            final_res = context.last_output
-            assert final_res is not None
-            assert len(final_res.outputs) == 1
-            final_output = final_res.outputs[0]
-
-            output = self._make_response_output_items(request, final_output,
-                                                      tokenizer)
-
+            assert isinstance(context, ChatContext)
         # Calculate usage.
- assert final_res.prompt_token_ids is not None num_tool_output_tokens = 0 - assert isinstance(context, (SimpleContext, HarmonyContext)) + assert isinstance(context, (ChatContext, HarmonyContext)) num_prompt_tokens = context.num_prompt_tokens num_generated_tokens = context.num_output_tokens num_cached_tokens = context.num_cached_tokens @@ -496,7 +504,10 @@ async def responses_full_generator( sampling_params, model_name=model_name, created_time=created_time, - output=output, + output=output_items, + # Only if request flag is enabled + input_messages=input_messages if request.enable_response_messages else None, + output_messages=output_messages if request.enable_response_messages else None, status="completed", usage=usage, ) @@ -508,189 +519,36 @@ async def responses_full_generator( if (stored_response is None or stored_response.status != "cancelled"): self.response_store[response.id] = response + # Final store call moved here + # Now stores input and output messages so the ResponseOutputItems + # aren't needed to populate messages on future turns (because they are lossy) + self.msg_store[response.id] = input_messages + output_messages return response - def _topk_logprobs(self, logprobs: dict[int, - SampleLogprob], top_logprobs: int, - tokenizer: AnyTokenizer) -> list[LogprobTopLogprob]: - """Returns the top-k logprobs from the logprobs dictionary.""" - out = [] - for i, (token_id, _logprob) in enumerate(logprobs.items()): - if i >= top_logprobs: - break - text = _logprob.decoded_token if _logprob.decoded_token \ - is not None else tokenizer.decode([token_id]) - out.append( - LogprobTopLogprob( - token=text, - logprob=max(_logprob.logprob, -9999.0), - bytes=list(text.encode("utf-8", errors="replace")), - )) - return out - - def _create_response_logprobs( - self, - token_ids: Sequence[int], - logprobs: Optional[SampleLogprobs], - tokenizer: AnyTokenizer, - top_logprobs: Optional[int] = None) -> list[Logprob]: - assert logprobs is not None, "logprobs must be provided" - assert len(token_ids) == len(logprobs), ( - "token_ids and logprobs.token_ids must have the same length") - out = [] - for i, token_id in enumerate(token_ids): - logprob = logprobs[i] - token_logprob = logprob[token_id] - text = token_logprob.decoded_token if token_logprob.decoded_token \ - is not None else tokenizer.decode([token_id]) - out.append( - Logprob( - token=text, - logprob=max(token_logprob.logprob, -9999.0), - bytes=list(text.encode("utf-8", errors="replace")), - top_logprobs=self._topk_logprobs(logprob, - top_logprobs=top_logprobs, - tokenizer=tokenizer) - if top_logprobs else [], - )) - return out - - def _create_stream_response_logprobs( - self, - token_ids: Sequence[int], - logprobs: Optional[SampleLogprobs], - tokenizer: AnyTokenizer, - top_logprobs: Optional[int] = None - ) -> list[response_text_delta_event.Logprob]: - lgs = self._create_response_logprobs(token_ids=token_ids, - logprobs=logprobs, - tokenizer=tokenizer, - top_logprobs=top_logprobs) - return [ - response_text_delta_event.Logprob( - token=lg.token, - logprob=lg.logprob, - top_logprobs=[ - response_text_delta_event.LogprobTopLogprob( - token=tl.token, logprob=tl.logprob) - for tl in lg.top_logprobs - ]) for lg in lgs - ] - - def _make_response_output_items( - self, - request: ResponsesRequest, - final_output: CompletionOutput, - tokenizer: AnyTokenizer, - ) -> list[ResponseOutputItem]: - if self.reasoning_parser: - try: - reasoning_parser = self.reasoning_parser(tokenizer) - except RuntimeError as e: - logger.exception("Error in reasoning parser 
creation.") - raise e - - reasoning_content, content = ( - reasoning_parser.extract_reasoning_content(final_output.text, - request=request)) - else: - reasoning_content = None - content = final_output.text - - # Log complete response if output logging is enabled - if self.enable_log_outputs and self.request_logger: - output_text = "" - if content: - output_text = content - elif reasoning_content: - output_text = f"[reasoning: {reasoning_content}]" - - if output_text: - self.request_logger.log_outputs( - request_id=request.request_id, - outputs=output_text, - output_token_ids=final_output.token_ids, - finish_reason=final_output.finish_reason, - is_streaming=False, - delta=False, - ) - - output = [] - if reasoning_content: - reasoning_item = ResponseReasoningItem( - id=f"rs_{random_uuid()}", - summary=[], - type="reasoning", - content=[ - ResponseReasoningTextContent(text=reasoning_content, - type="reasoning_text") - ], - status=None, # NOTE: Only the last output item has status. - ) - output.append(reasoning_item) - if content: - output_text = ResponseOutputText( - text=content, - annotations=[], # TODO - type="output_text", - logprobs=self._create_response_logprobs( - token_ids=final_output.token_ids, - logprobs=final_output.logprobs, - tokenizer=tokenizer, - top_logprobs=request.top_logprobs, - ) if request.is_include_output_logprobs() else None, - ) - message = ResponseOutputMessage( - id=f"msg_{random_uuid()}", - content=[output_text], - role="assistant", - status="completed", - type="message", - ) - output.append(message) - return output - - def _make_response_output_items_with_harmony( - self, - context: HarmonyContext, - ) -> list[ResponseOutputItem]: - output_items = [] - num_init_messages = context.num_init_messages - for msg in context.messages[num_init_messages:]: - output_items.extend(parse_output_message(msg)) - # Handle the generation stopped in the middle (if any). - last_items = parse_remaining_state(context.parser) - if last_items: - output_items.extend(last_items) - return output_items def _construct_input_messages( self, request: ResponsesRequest, - prev_response: Optional[ResponsesResponse] = None, - ) -> list[ChatCompletionMessageParam]: + prev_response_output: list[ResponseOutputItem], + prev_msgs: list[ChatCompletionMessageParam]) -> list[ChatCompletionMessageParam]: messages: list[ChatCompletionMessageParam] = [] if request.instructions: + # TODO: Implement replacing system and developer messages + # with new information from tools as well messages.append({ "role": "system", "content": request.instructions, }) - # Prepend the conversation history. - if prev_response is not None: - # Add the previous messages. - prev_msg = self.msg_store[prev_response.id] - messages.extend(prev_msg) - - # Add the previous output. - for output_item in prev_response.output: - # NOTE: We skip the reasoning output. - if isinstance(output_item, ResponseOutputMessage): - for content in output_item.content: - messages.append({ - "role": "assistant", - "content": content.text, - }) + for message in prev_msgs: + assert isinstance(message, dict), 'Harmony input message seen for non-harmony model' + # Do not reuse previous system messages + if message["role"] == "system": + continue + messages.append(message) + + # TODO: Is there any tool call id matching that needs to happen + # here like in harmony? # Append the new input. # Responses API supports simple text inputs without chat format. 
@@ -703,68 +561,67 @@ def _construct_input_messages( def _construct_input_messages_with_harmony( self, request: ResponsesRequest, - prev_response: Optional[ResponsesResponse], + prev_response_output: list[ResponseOutputItem], + prev_msgs: list[ChatCompletionMessageParam], ) -> list[OpenAIHarmonyMessage]: messages: list[OpenAIHarmonyMessage] = [] - if prev_response is None: - # New conversation. - reasoning_effort = (request.reasoning.effort - if request.reasoning else None) - tool_types = [tool.type for tool in request.tools] - enable_browser = ("web_search_preview" in tool_types - and self.tool_server is not None - and self.tool_server.has_tool("browser")) - enable_code_interpreter = ("code_interpreter" in tool_types - and self.tool_server is not None - and self.tool_server.has_tool("python")) - sys_msg = get_system_message( - reasoning_effort=reasoning_effort, - browser_description=self.tool_server.get_tool_description( - "browser") - if enable_browser and self.tool_server is not None else None, - python_description=self.tool_server.get_tool_description( - "python") if enable_code_interpreter - and self.tool_server is not None else None, - ) - messages.append(sys_msg) - dev_msg = get_developer_message(request.instructions, - request.tools) - messages.append(dev_msg) - else: - # Continue the previous conversation. - # FIXME(woosuk): Currently, request params like reasoning and - # instructions are ignored. - prev_msgs = self.msg_store[prev_response.id] - # Remove the previous chain-of-thoughts if there is a new "final" - # message. Note that this also removes these messages from the - # msg_store. - if len(prev_msgs) > 0: - last_msg = prev_msgs[-1] - assert isinstance(last_msg, OpenAIHarmonyMessage) - if last_msg.channel == "final": - prev_final_msg_idx = -1 - for i in range(len(prev_msgs) - 2, -1, -1): - prev_msg_i = prev_msgs[i] - assert isinstance(prev_msg_i, OpenAIHarmonyMessage) - if prev_msg_i.channel == "final": - prev_final_msg_idx = i - break - recent_turn_msgs = prev_msgs[prev_final_msg_idx + 1:] - del prev_msgs[prev_final_msg_idx + 1:] - for msg in recent_turn_msgs: - assert isinstance(msg, OpenAIHarmonyMessage) - if msg.channel != "analysis": - prev_msgs.append(msg) - messages.extend(prev_msgs) + # To match OpenAI, instructions, reasoning and tools are + # always taken from the most recent Responses API request, + # not carried over from previous requests that are being continued. + reasoning_effort = (request.reasoning.effort + if request.reasoning else None) + tool_types = [tool.type for tool in request.tools] + enable_browser = "web_search_preview" in tool_types + enable_code_interpreter = "code_interpreter" in tool_types + # If response messages are enabled, the user + # may be handling built-in tool calls client-side. + # In this case, we want to enable the tools + # in the prompt, even if we are not calling them server-side.
+ if not request.enable_response_messages: + enable_browser = (enable_browser + and self.tool_server is not None + and self.tool_server.has_tool("browser")) + enable_code_interpreter = (enable_code_interpreter + and self.tool_server is not None + and self.tool_server.has_tool("python")) + sys_msg = get_system_message( + reasoning_effort=reasoning_effort, + browser_description=self.tool_server.get_tool_description( + "browser") + if enable_browser and self.tool_server is not None else None, + python_description=self.tool_server.get_tool_description( + "python") if enable_code_interpreter + and self.tool_server is not None else None, + ) + messages.append(sys_msg) + dev_msg = get_developer_message(request.instructions, + request.tools) + messages.append(dev_msg) + + from vllm.entrypoints.harmony_utils import parse_chat_input + for message in prev_msgs: + # Handle both OpenAIHarmonyMessage objects and dictionary inputs + if isinstance(message, OpenAIHarmonyMessage): + message_role = message.author.role + # Don't use the previous system or developer messages + if message_role in (Role.SYSTEM, Role.DEVELOPER): + continue + messages.append(message) + else: + # Convert dictionary to harmony message using parse_chat_input + harmony_messages = parse_chat_input(message) + for harmony_msg in harmony_messages: + message_role = harmony_msg.author.role + # Don't use the previous system or developer messages + if message_role in (Role.SYSTEM, Role.DEVELOPER): + continue + messages.append(harmony_msg) # Append the new input. # Responses API supports simple text inputs without chat format. if isinstance(request.input, str): messages.append(get_user_message(request.input)) else: - if prev_response is not None: - prev_outputs = copy(prev_response.output) - else: - prev_outputs = [] + prev_outputs = prev_response_output for response_msg in request.input: messages.append( parse_response_input(response_msg, prev_outputs)) @@ -959,7 +818,7 @@ async def _process_simple_streaming_events( first_delta_sent = False previous_delta_messages: list[DeltaMessage] = [] async for ctx in result_generator: - assert isinstance(ctx, SimpleContext) + assert isinstance(ctx, ChatContext) if ctx.last_output is None: continue if ctx.last_output.outputs: @@ -1123,7 +982,7 @@ async def _process_simple_streaming_events( output_index=current_output_index, item_id=current_item_id, delta=delta_message.content, - logprobs=self._create_stream_response_logprobs( + logprobs=create_stream_response_logprobs( token_ids=output.token_ids, logprobs=output.logprobs, tokenizer=tokenizer,