diff --git a/sgl-router/py_test/e2e_response_api/backends/test_grpc_backend.py b/sgl-router/py_test/e2e_response_api/backends/test_grpc_backend.py deleted file mode 100644 index 363f0ec83f5c..000000000000 --- a/sgl-router/py_test/e2e_response_api/backends/test_grpc_backend.py +++ /dev/null @@ -1,177 +0,0 @@ -""" -gRPC backend tests for Response API (including Harmony). - -Run with: - python3 -m pytest py_test/e2e_response_api/backends/test_grpc_backend.py -v - python3 -m unittest e2e_response_api.backends.test_grpc_backend.TestGrpcBackend -""" - -import json -import sys -import unittest -from pathlib import Path - -import openai - -# Add e2e_response_api directory for imports -_TEST_DIR = Path(__file__).parent.parent -sys.path.insert(0, str(_TEST_DIR)) - -# Import local modules -from mixins.function_call import FunctionCallingBaseTest -from mixins.mcp import MCPTests -from mixins.state_management import StateManagementTests -from mixins.structured_output import StructuredOutputBaseTest -from router_fixtures import popen_launch_workers_and_router -from util import kill_process_tree - - -class TestGrpcBackend(StateManagementTests, MCPTests, StructuredOutputBaseTest): - """End to end tests for gRPC backend (Regular backend with Qwen2.5).""" - - @classmethod - def setUpClass(cls): - cls.model = "/home/ubuntu/models/Qwen/Qwen2.5-14B-Instruct" - cls.base_url_port = "http://127.0.0.1:30030" - - cls.cluster = popen_launch_workers_and_router( - cls.model, - cls.base_url_port, - timeout=90, - num_workers=1, - tp_size=2, - policy="round_robin", - worker_args=[ - "--context-length=1000", - ], - router_args=[ - "--history-backend", - "memory", - "--tool-call-parser", - "qwen", - ], - ) - - cls.base_url = cls.cluster["base_url"] - cls.client = openai.Client(api_key=cls.api_key, base_url=cls.base_url + "/v1") - - @classmethod - def tearDownClass(cls): - kill_process_tree(cls.cluster["router"].pid) - for worker in cls.cluster.get("workers", []): - kill_process_tree(worker.pid) - - @unittest.skip("TODO: return 501 Not Implemented") - def test_conversation_with_multiple_turns(self): - super().test_conversation_with_multiple_turns() - - def test_structured_output_json_schema(self): - """Override with simpler schema for Llama model (complex schemas not well supported).""" - params = { - "input": [ - { - "role": "system", - "content": "You are a math solver. Return ONLY a JSON object that matches the schema—no extra text.", - }, - { - "role": "user", - "content": "What is 1 + 1?", - }, - ], - "text": { - "format": { - "type": "json_schema", - "name": "math_answer", - "schema": { - "type": "object", - "properties": {"answer": {"type": "string"}}, - "required": ["answer"], - }, - } - }, - } - - create_resp = self.create_response(**params) - self.assertIsNone(create_resp.error) - self.assertIsNotNone(create_resp.id) - self.assertIsNotNone(create_resp.output) - self.assertIsNotNone(create_resp.text) - - # Verify text format was echoed back correctly - self.assertIsNotNone(create_resp.text.format) - self.assertEqual(create_resp.text.format.type, "json_schema") - self.assertEqual(create_resp.text.format.name, "math_answer") - self.assertIsNotNone(create_resp.text.format.schema_) - - # Find the message output - output_text = next( - ( - content.text - for item in create_resp.output - if item.type == "message" - for content in item.content - if content.type == "output_text" - ), - None, - ) - - self.assertIsNotNone(output_text, "No output_text found in response") - self.assertTrue(output_text.strip(), "output_text is empty") - - # Parse JSON output - output_json = json.loads(output_text) - - # Verify simple schema structure (just answer field) - self.assertIn("answer", output_json) - self.assertIsInstance(output_json["answer"], str) - self.assertTrue(output_json["answer"], "Answer is empty") - - -class TestGrpcHarmonyBackend( - StateManagementTests, MCPTests, FunctionCallingBaseTest, StructuredOutputBaseTest -): - """End to end tests for Harmony backend.""" - - @classmethod - def setUpClass(cls): - cls.model = "/home/ubuntu/models/openai/gpt-oss-20b" - cls.base_url_port = "http://127.0.0.1:30030" - - cls.cluster = popen_launch_workers_and_router( - cls.model, - cls.base_url_port, - timeout=90, - num_workers=1, - tp_size=2, - policy="round_robin", - worker_args=[ - "--reasoning-parser=gpt-oss", - ], - router_args=[ - "--history-backend", - "memory", - ], - ) - - cls.base_url = cls.cluster["base_url"] - cls.client = openai.Client(api_key=cls.api_key, base_url=cls.base_url + "/v1") - - @classmethod - def tearDownClass(cls): - kill_process_tree(cls.cluster["router"].pid) - for worker in cls.cluster.get("workers", []): - kill_process_tree(worker.pid) - - @unittest.skip("TODO: 501 Not Implemented") - def test_conversation_with_multiple_turns(self): - super().test_conversation_with_multiple_turns() - - # Inherited from MCPTests: - # - test_mcp_basic_tool_call - # - test_mcp_basic_tool_call_streaming - # - test_mixed_mcp_and_function_tools (requires external MCP server) - # - test_mixed_mcp_and_function_tools_streaming (requires external MCP server) - - -if __name__ == "__main__": - unittest.main() diff --git a/sgl-router/py_test/e2e_response_api/backends/test_http_backend.py b/sgl-router/py_test/e2e_response_api/backends/test_http_backend.py deleted file mode 100644 index 7bab564aff92..000000000000 --- a/sgl-router/py_test/e2e_response_api/backends/test_http_backend.py +++ /dev/null @@ -1,107 +0,0 @@ -""" -HTTP backend tests for Response API (OpenAI and XAI). - -Run with: - export OPENAI_API_KEY=your_key - export XAI_API_KEY=your_key - python3 -m pytest py_test/e2e_response_api/backends/test_http_backend.py -v - python3 -m unittest e2e_response_api.backends.test_http_backend.TestOpenaiBackend -""" - -import os -import sys -import unittest -from pathlib import Path - -import openai - -# Add e2e_response_api directory for imports -_TEST_DIR = Path(__file__).parent.parent -sys.path.insert(0, str(_TEST_DIR)) - -# Import local modules -from mixins.basic_crud import ConversationCRUDBaseTest, ResponseCRUDBaseTest -from mixins.function_call import FunctionCallingBaseTest -from mixins.mcp import MCPTests -from mixins.state_management import StateManagementTests -from mixins.structured_output import StructuredOutputBaseTest -from router_fixtures import popen_launch_openai_xai_router -from util import kill_process_tree - - -class TestOpenaiBackend( - ResponseCRUDBaseTest, - ConversationCRUDBaseTest, - StateManagementTests, - MCPTests, - FunctionCallingBaseTest, - StructuredOutputBaseTest, -): - """End to end tests for OpenAI backend.""" - - api_key = os.environ.get("OPENAI_API_KEY") - mcp_validation_mode = "strict" # Enable strict validation for HTTP backend - - @classmethod - def setUpClass(cls): - cls.model = "gpt-5-nano" - cls.base_url_port = "http://127.0.0.1:30010" - - cls.cluster = popen_launch_openai_xai_router( - backend="openai", - base_url=cls.base_url_port, - history_backend="memory", - ) - - cls.base_url = cls.cluster["base_url"] - cls.client = openai.Client(api_key=cls.api_key, base_url=cls.base_url + "/v1") - - @classmethod - def tearDownClass(cls): - kill_process_tree(cls.cluster["router"].pid) - - # Inherited from MCPTests: - # - test_mcp_basic_tool_call (with strict validation) - # - test_mcp_basic_tool_call_streaming (with strict validation) - # - test_mixed_mcp_and_function_tools (requires external MCP server) - # - test_mixed_mcp_and_function_tools_streaming (requires external MCP server) - - @unittest.skip( - "Requires external MCP server (deepwiki) - may not be accessible in CI" - ) - def test_mixed_mcp_and_function_tools(self): - super().test_mixed_mcp_and_function_tools() - - @unittest.skip( - "Requires external MCP server (deepwiki) - may not be accessible in CI" - ) - def test_mixed_mcp_and_function_tools_streaming(self): - super().test_mixed_mcp_and_function_tools_streaming() - - -class TestXaiBackend(StateManagementTests): - """End to end tests for XAI backend.""" - - api_key = os.environ.get("XAI_API_KEY") - - @classmethod - def setUpClass(cls): - cls.model = "grok-4-fast" - cls.base_url_port = "http://127.0.0.1:30023" - - cls.cluster = popen_launch_openai_xai_router( - backend="xai", - base_url=cls.base_url_port, - history_backend="memory", - ) - - cls.base_url = cls.cluster["base_url"] - cls.client = openai.Client(api_key=cls.api_key, base_url=cls.base_url + "/v1") - - @classmethod - def tearDownClass(cls): - kill_process_tree(cls.cluster["router"].pid) - - -if __name__ == "__main__": - unittest.main() diff --git a/sgl-router/py_test/e2e_response_api/conftest.py b/sgl-router/py_test/e2e_response_api/conftest.py index 19c082a42de3..786c4a9120ef 100644 --- a/sgl-router/py_test/e2e_response_api/conftest.py +++ b/sgl-router/py_test/e2e_response_api/conftest.py @@ -4,40 +4,129 @@ This configures pytest to not collect base test classes that are meant to be inherited. """ +import os + +import openai import pytest # noqa: F401 +from router_fixtures import ( + popen_launch_openai_xai_router, + popen_launch_workers_and_router, +) +from util import kill_process_tree + +# ------------------------------ +# Backend Configuration Map +# ------------------------------ +BACKENDS = { + "openai": { + "model": "gpt-5-nano", + "base_url_port": "http://127.0.0.1:30010", + "launcher": popen_launch_openai_xai_router, + "launcher_kwargs": { + "backend": "openai", + "history_backend": "memory", + }, + "api_key_env": "OPENAI_API_KEY", + "needs_workers": False, + }, + "xai": { + "model": "grok-4-fast", + "base_url_port": "http://127.0.0.1:30023", + "launcher": popen_launch_openai_xai_router, + "launcher_kwargs": { + "backend": "xai", + "history_backend": "memory", + }, + "api_key_env": "XAI_API_KEY", + "needs_workers": False, + }, + "grpc": { + "model": "/home/ubuntu/models/Qwen/Qwen2.5-14B-Instruct", + "base_url_port": "http://127.0.0.1:30030", + "launcher": popen_launch_workers_and_router, + "launcher_kwargs": { + "timeout": 90, + "num_workers": 1, + "tp_size": 2, + "policy": "round_robin", + "worker_args": ["--context-length=1000"], + "router_args": [ + "--history-backend", + "memory", + "--tool-call-parser", + "qwen", + ], + }, + "api_key_env": None, # grpc does not use API keys + "needs_workers": True, + }, + "grpc_harmony": { + "model": "/home/ubuntu/models/openai/gpt-oss-20b", + "base_url_port": "http://127.0.0.1:30030", + "launcher": popen_launch_workers_and_router, + "launcher_kwargs": { + "timeout": 90, + "num_workers": 1, + "tp_size": 2, + "policy": "round_robin", + "worker_args": ["--reasoning-parser=gpt-oss"], + "router_args": ["--history-backend", "memory"], + }, + "api_key_env": None, + "needs_workers": True, + }, + "oracle_store": { + "model": "gpt-5-nano", + "base_url_port": "http://127.0.0.1:30040", + "launcher": popen_launch_openai_xai_router, + "launcher_kwargs": { + "backend": "openai", + "history_backend": "oracle", + }, + "api_key_env": "OPENAI_API_KEY", + "needs_workers": False, + }, +} + + +@pytest.fixture(scope="class") +def setup_backend(request): + backend = request.param + if backend not in BACKENDS: + raise RuntimeError(f"Unknown backend {backend}") + + cfg = BACKENDS[backend] + + # Launch cluster + cluster = ( + cfg["launcher"]( + cfg["model"], + cfg["base_url_port"], + **cfg["launcher_kwargs"], + ) + if cfg["launcher"] is popen_launch_workers_and_router + else cfg["launcher"]( + backend=cfg["launcher_kwargs"]["backend"], + base_url=cfg["base_url_port"], + history_backend=cfg["launcher_kwargs"]["history_backend"], + ) + ) + + # Build client + api_key = os.environ.get(cfg["api_key_env"]) if cfg["api_key_env"] else None + client = openai.Client( + api_key=api_key, + base_url=cluster["base_url"] + "/v1", + ) + # Yield data to test + try: + yield backend, cfg["model"], client + finally: + # Always kill router + kill_process_tree(cluster["router"].pid) -def pytest_collection_modifyitems(config, items): - """ - Modify test collection to exclude base test classes. - - Base test classes are meant to be inherited, not run directly. - We exclude any test that comes from these base classes: - - StateManagementBaseTest - - ResponseCRUDBaseTest - - ConversationCRUDBaseTest - - MCPTests - - StateManagementTests - - FunctionCallingBaseTest - - StructuredOutputBaseTest - """ - base_class_names = { - "StateManagementBaseTest", - "ResponseCRUDBaseTest", - "ConversationCRUDBaseTest", - "MCPTests", - "StateManagementTests", - "FunctionCallingBaseTest", - "StructuredOutputBaseTest", - } - - # Filter out tests from base classes - filtered_items = [] - for item in items: - # Check if the test's parent class is a base class - parent_name = item.parent.name if hasattr(item, "parent") else None - if parent_name not in base_class_names: - filtered_items.append(item) - - # Update items list - items[:] = filtered_items + # If workers exist, kill them as well + if cfg["needs_workers"]: + for w in cluster.get("workers", []): + kill_process_tree(w.pid) diff --git a/sgl-router/py_test/e2e_response_api/features/test_basic_crud.py b/sgl-router/py_test/e2e_response_api/features/test_basic_crud.py new file mode 100644 index 000000000000..0695feaa2ef3 --- /dev/null +++ b/sgl-router/py_test/e2e_response_api/features/test_basic_crud.py @@ -0,0 +1,241 @@ +""" +Base test class for Response API e2e tests. + +This module provides base test classes that can be reused across different backends +(OpenAI, XAI, gRPC) with common test logic. +""" + +import sys +import time +from pathlib import Path + +import openai +import pytest +from openai import OpenAI +from openai.types import responses + +# Add current directory for local imports +_TEST_DIR = Path(__file__).parent +sys.path.insert(0, str(_TEST_DIR)) + + +@pytest.mark.parametrize("setup_backend", ["openai", "oracle_store"], indirect=True) +class TestResponseCRUD: + """Base class for Response API CRUD tests.""" + + def test_create_and_get_response(self, setup_backend): + """Test creating response and retrieving it.""" + _, model, client = setup_backend + + # Create response + create_resp = client.responses.create(model=model, input="Hello, world!") + assert create_resp.id is not None + assert create_resp.error is None + assert create_resp.status == "completed" + assert len(create_resp.output_text) > 0 + response_id = create_resp.id + + # Get response + get_resp = client.responses.retrieve(response_id=response_id) + assert get_resp.error is None + assert get_resp.id == response_id + assert get_resp.status == "completed" + + input_resp = client.responses.input_items.list(response_id=get_resp.id) + assert input_resp.data is not None + assert len(input_resp.data) > 0 + + @pytest.mark.skip(reason="TODO: Add delete response feature") + def test_delete_response(self, setup_backend): + """Test deleting response.""" + _, model, client = setup_backend + + # Create response + create_resp = client.responses.create(model=model, input="Test deletion") + assert create_resp.id is not None + assert create_resp.error is None + assert create_resp.status == "completed" + assert len(create_resp.output_text) > 0 + + response_id = create_resp.id + + # Delete response + client.responses.delete(response_id=response_id) + + # Verify it's deleted (should return 404) + with pytest.raises(openai.NotFoundError): + client.responses.retrieve(response_id=response_id) + + @pytest.mark.skip(reason="TODO: Add background response feature") + def test_background_response(self, setup_backend): + """Test background response execution.""" + _, model, client = setup_backend + + # Create background response + create_resp = client.responses.create( + model=model, + input="Write a short story", + background=True, + max_output_tokens=100, + ) + assert create_resp.id is not None + assert create_resp.error is None + assert create_resp.status in ["in_progress", "queued"] + + response_id = create_resp.id + + # Wait for completion + final_data = wait_for_background_task(client, response_id, timeout=60) + assert final_data.status == "completed" + + +@pytest.mark.parametrize("setup_backend", ["openai", "oracle_store"], indirect=True) +class TestConversationCRUD: + """Base class for Conversation API CRUD tests.""" + + def test_create_and_get_conversation(self, setup_backend): + """Test creating and retrieving conversation.""" + _, model, client = setup_backend + + # Create conversation + create_resp = client.conversations.create(metadata={"user": "test_user"}) + assert create_resp.id is not None + assert create_resp.created_at is not None + + create_data = create_resp.metadata + assert create_data["user"] == "test_user" + conversation_id = create_resp.id + + # Get conversation + get_resp = client.conversations.retrieve(conversation_id=conversation_id) + assert get_resp.id is not None + assert get_resp.created_at is not None + + get_data = get_resp.metadata + assert get_resp.id == conversation_id + assert get_data["user"] == "test_user" + + def test_update_conversation(self, setup_backend): + """Test updating conversation metadata.""" + _, model, client = setup_backend + + # Create conversation + create_resp = client.conversations.create(metadata={"key1": "value1"}) + assert create_resp.id is not None + assert create_resp.created_at is not None + + create_data = create_resp.metadata + assert create_data["key1"] == "value1" + assert "key2" not in create_data + conversation_id = create_resp.id + + # Update conversation + update_resp = client.conversations.update( + conversation_id=conversation_id, + metadata={"key1": "value1", "key2": "value2"}, + ) + assert update_resp.id == conversation_id + update_data = update_resp.metadata + assert update_data["key1"] == "value1" + assert update_data["key2"] == "value2" + + # Verify update + get_resp = client.conversations.retrieve(conversation_id=conversation_id) + get_data = get_resp.metadata + assert update_data["key1"] == "value1" + assert update_data["key2"] == "value2" + + def test_delete_conversation(self, setup_backend): + """Test deleting conversation.""" + _, model, client = setup_backend + + # Create conversation + create_resp = client.conversations.create() + assert create_resp.id is not None + assert create_resp.created_at is not None + conversation_id = create_resp.id + + # Delete conversation + delete_resp = client.conversations.delete(conversation_id=conversation_id) + assert delete_resp.id is not None + assert delete_resp.deleted + + # Verify deletion + with pytest.raises(openai.NotFoundError): + client.conversations.retrieve(conversation_id=conversation_id) + + def test_list_conversation_items(self, setup_backend): + """Test listing conversation items.""" + _, model, client = setup_backend + + # Create conversation + conv_resp = client.conversations.create() + assert conv_resp.id is not None + conversation_id = conv_resp.id + + # Create response with conversation + resp1 = client.responses.create( + model=model, + input="First message", + conversation=conversation_id, + max_output_tokens=50, + ) + assert resp1.error is None + resp2 = client.responses.create( + model=model, + input="Second message", + conversation=conversation_id, + max_output_tokens=50, + ) + assert resp2.error is None + + # List items + list_resp = client.conversations.items.list(conversation_id=conversation_id) + assert list_resp is not None + assert list_resp.data is not None + + list_data = list_resp.data + # Should have at least 4 items (2 inputs + 2 outputs) + assert len(list_data) >= 4 + + +def wait_for_background_task( + client: OpenAI, response_id: str, timeout: int = 30, poll_interval: float = 0.5 +) -> responses.Response: + """ + Wait for background task to complete. + + Args: + client: openai client + response_id: Response ID to poll + timeout: Max seconds to wait + poll_interval: Seconds between polls + + Returns: + Final response data + + Raises: + TimeoutError: If task doesn't complete in time + AssertionError: If task fails + """ + start_time = time.time() + + while time.time() - start_time < timeout: + resp = client.responses.retrieve(response_id=response_id) + assert resp.error is None + assert resp.id == response_id + + status = resp.status + + if status == "completed": + return resp + elif status == "failed": + raise AssertionError(f"Background task failed: {resp.error}") + elif status == "cancelled": + raise AssertionError("Background task was cancelled") + + time.sleep(poll_interval) + + raise TimeoutError( + f"Background task {response_id} did not complete within {timeout}s" + ) diff --git a/sgl-router/py_test/e2e_response_api/mixins/function_call.py b/sgl-router/py_test/e2e_response_api/features/test_function_call.py similarity index 73% rename from sgl-router/py_test/e2e_response_api/mixins/function_call.py rename to sgl-router/py_test/e2e_response_api/features/test_function_call.py index a5b401fa9965..d530be9b6d87 100644 --- a/sgl-router/py_test/e2e_response_api/mixins/function_call.py +++ b/sgl-router/py_test/e2e_response_api/features/test_function_call.py @@ -9,16 +9,17 @@ import sys from pathlib import Path +import pytest + # Add current directory for local imports _TEST_DIR = Path(__file__).parent sys.path.insert(0, str(_TEST_DIR)) -from basic_crud import ResponseAPIBaseTest - -class FunctionCallingBaseTest(ResponseAPIBaseTest): +@pytest.mark.parametrize("setup_backend", ["openai", "grpc_harmony"], indirect=True) +class TestFunctionCalling: - def test_basic_function_call(self): + def test_basic_function_call(self, setup_backend): """ Test basic function calling workflow. @@ -29,6 +30,8 @@ def test_basic_function_call(self): 4. Execute function locally and provide output 5. Model should generate final response using the function output """ + _, model, client = setup_backend + # 1. Define a list of callable tools for the model tools = [ { @@ -60,38 +63,38 @@ def test_basic_function_call(self): ] # 2. Prompt the model with tools defined - resp = self.create_response(input=input_list, tools=tools) + resp = client.responses.create(model=model, input=input_list, tools=tools) # Should successfully make the request - self.assertIsNone(resp.error) + assert resp.error is None # Basic response structure - self.assertIsNotNone(resp.id) - self.assertEqual(resp.status, "completed") - self.assertIsNotNone(resp.output) + assert resp.id is not None + assert resp.status == "completed" + assert resp.output is not None # Verify output array is not empty output = resp.output - self.assertIsInstance(output, list) - self.assertGreater(len(output), 0) + assert isinstance(output, list) + assert len(output) > 0 # Check for function_call in output function_calls = [item for item in output if item.type == "function_call"] - self.assertGreater( - len(function_calls), 0, "Response should contain at least one function_call" - ) + assert ( + len(function_calls) > 0 + ), "Response should contain at least one function_call" # Verify function_call structure function_call = function_calls[0] - self.assertIsNotNone(function_call.call_id) - self.assertIsNotNone(function_call.name) - self.assertEqual(function_call.name, "get_horoscope") - self.assertIsNotNone(function_call.arguments) + assert function_call.call_id is not None + assert function_call.name is not None + assert function_call.name == "get_horoscope" + assert function_call.arguments is not None # Parse arguments args = json.loads(function_call.arguments) - self.assertIn("sign", args) - self.assertEqual(args["sign"].lower(), "aquarius") + assert "sign" in args + assert args["sign"].lower() == "aquarius" # 3. Save function call outputs for subsequent requests input_list.append(function_call) @@ -109,36 +112,34 @@ def test_basic_function_call(self): ) # 6. Make second request with function output - resp2 = self.create_response( + resp2 = client.responses.create( + model=model, input=input_list, instructions="Respond only with a horoscope generated by a tool.", tools=tools, ) - self.assertIsNone(resp2.error) - self.assertEqual(resp2.status, "completed") + assert resp2.error is None + assert resp2.status == "completed" # The model should be able to give a response using the function output output2 = resp2.output - self.assertGreater(len(output2), 0) + assert len(output2) > 0 # Find message output messages = [item for item in output2 if item.type == "message"] - self.assertGreater( - len(messages), 0, "Response should contain at least one message" - ) + assert len(messages) > 0, "Response should contain at least one message" # Verify message contains the horoscope message = messages[0] - self.assertIsNotNone(message.content) + assert message.content is not None content_parts = message.content - self.assertGreater(len(content_parts), 0) + assert len(content_parts) > 0 # Get text from content text_parts = [part.text for part in content_parts if part.type == "output_text"] full_text = " ".join(text_parts).lower() # Should mention the horoscope or baby otter - self.assertTrue( - "baby otter" in full_text or "aquarius" in full_text, - "Response should reference the horoscope content", - ) + assert ( + "baby otter" in full_text or "aquarius" in full_text + ), "Response should reference the horoscope content" diff --git a/sgl-router/py_test/e2e_response_api/features/test_mcp.py b/sgl-router/py_test/e2e_response_api/features/test_mcp.py new file mode 100644 index 000000000000..68f19ae3b6f2 --- /dev/null +++ b/sgl-router/py_test/e2e_response_api/features/test_mcp.py @@ -0,0 +1,356 @@ +""" +MCP (Model Context Protocol) tests for Response API. + +Tests MCP tool calling in both streaming and non-streaming modes. +These tests should work across all backends that support MCP (OpenAI, XAI). +""" + +import json +import time + +import pytest + + +@pytest.mark.parametrize( + "setup_backend", ["openai", "grpc", "grpc_harmony"], indirect=True +) +class TestMcp: + """Tests for MCP tool calling in both streaming and non-streaming modes.""" + + # Class attribute to control validation strictness + # Subclasses can override this to enable strict validation + mcp_validation_mode = "relaxed" + + # Shared constants for MCP tests + BRAVE_MCP_TOOL = { + "type": "mcp", + "server_label": "brave", + "server_description": "A Tool to do web search", + "server_url": "http://localhost:8001/sse", + "require_approval": "never", + } + + MCP_TEST_PROMPT = ( + "show me some news about sglang router, use the tool to just search " + "one result and return one sentence response" + ) + + SYSTEM_DIAGNOSTICS_FUNCTION = { + "type": "function", + "name": "get_system_diagnostics", + "description": "Retrieve real-time diagnostics for a spacecraft system.", + "parameters": { + "type": "object", + "properties": { + "system_name": { + "type": "string", + "description": "Name of the spacecraft system to query. " + "Example: 'Astra-7 Core Reactor'.", + } + }, + "required": ["system_name"], + }, + } + + def test_mcp_basic_tool_call(self, setup_backend): + """Test basic MCP tool call (non-streaming). + + Validation strictness is controlled by parameter `backend` from setup_backend fixture. + Set to "strict" if backend is http. + """ + backend, model, client = setup_backend + + # To avoid being rate-limited by brave search server + time.sleep(2) + + resp = client.responses.create( + model=model, + input=self.MCP_TEST_PROMPT, + tools=[self.BRAVE_MCP_TOOL], + stream=False, + reasoning={"effort": "low"}, + ) + + # Should successfully make the request + assert resp.error is None + + # Basic response structure + assert resp.id is not None + assert resp.status == "completed" + assert resp.model is not None + assert resp.output is not None + + # Verify output array is not empty + assert len(resp.output_text) > 0 + + # Check for MCP-specific output types + output_types = [item.type for item in resp.output] + + # Should have mcp_list_tools - tools are listed before calling + assert ( + "mcp_list_tools" in output_types + ), "Response should contain mcp_list_tools" + + # Should have at least one mcp_call + mcp_calls = [item for item in resp.output if item.type == "mcp_call"] + assert len(mcp_calls) > 0, "Response should contain at least one mcp_call" + + # Verify mcp_call structure + for mcp_call in mcp_calls: + assert mcp_call.id is not None + assert mcp_call.error is None + assert mcp_call.status == "completed" + assert mcp_call.server_label == "brave" + assert mcp_call.name is not None + assert mcp_call.arguments is not None + assert mcp_call.output is not None + + # Strict mode: additional validation for HTTP backends + if backend == "openai": + # Should have final message output + messages = [item for item in resp.output if item.type == "message"] + assert len(messages) > 0, "Response should contain at least one message" + # Verify message structure + for msg in messages: + assert msg.content is not None + assert isinstance(msg.content, list) + + # Check content has text + for content_item in msg.content: + if content_item.type == "output_text": + assert content_item.text is not None + assert isinstance(content_item.text, str) + assert len(content_item.text) > 0 + + def test_mcp_basic_tool_call_streaming(self, setup_backend): + """Test basic MCP tool call (streaming). + + Validation strictness is controlled by the class attribute `mcp_validation_mode`. + Set to "strict" in subclasses for additional HTTP-specific validation. + """ + backend, model, client = setup_backend + + # To avoid being rate-limited by brave search server + time.sleep(2) + + resp = client.responses.create( + model=model, + input=self.MCP_TEST_PROMPT, + tools=[self.BRAVE_MCP_TOOL], + stream=True, + reasoning={"effort": "low"}, + ) + + # Should successfully make the request + events = [event for event in resp] + assert len(events) > 0 + + event_types = [event.type for event in events] + # Check for lifecycle events + assert "response.created" in event_types, "Should have response.created event" + assert ( + "response.completed" in event_types + ), "Should have response.completed event" + + # Check for MCP list tools events + assert ( + "response.output_item.added" in event_types + ), "Should have output_item.added events" + assert ( + "response.mcp_list_tools.in_progress" in event_types + ), "Should have mcp_list_tools.in_progress event" + assert ( + "response.mcp_list_tools.completed" in event_types + ), "Should have mcp_list_tools.completed event" + + # Check for MCP call events + assert ( + "response.mcp_call.in_progress" in event_types + ), "Should have mcp_call.in_progress event" + assert ( + "response.mcp_call_arguments.delta" in event_types + ), "Should have mcp_call_arguments.delta event" + assert ( + "response.mcp_call_arguments.done" in event_types + ), "Should have mcp_call_arguments.done event" + assert ( + "response.mcp_call.completed" in event_types + ), "Should have mcp_call.completed event" + + # Verify final completed event has full response + completed_events = [e for e in events if e.type == "response.completed"] + assert len(completed_events) == 1 + + final_response = completed_events[0].response + assert final_response.id is not None + assert final_response.status == "completed" + assert final_response.output is not None + + # Verify final output contains expected items + final_output = final_response.output + final_output_types = [item.type for item in final_output] + + assert "mcp_list_tools" in final_output_types + assert "mcp_call" in final_output_types + + # Verify mcp_call items in final output + mcp_calls = [item for item in final_output if item.type == "mcp_call"] + assert len(mcp_calls) > 0 + + for mcp_call in mcp_calls: + assert mcp_call.error is None + assert mcp_call.status == "completed" + assert mcp_call.server_label == "brave" + assert mcp_call.name is not None + assert mcp_call.arguments is not None + assert mcp_call.output is not None + + # Strict mode: additional validation for HTTP backends + if backend == "openai": + # Check for text output events + assert ( + "response.content_part.added" in event_types + ), "Should have content_part.added event" + assert ( + "response.output_text.delta" in event_types + ), "Should have output_text.delta events" + assert ( + "response.output_text.done" in event_types + ), "Should have output_text.done event" + assert ( + "response.content_part.done" in event_types + ), "Should have content_part.done event" + + assert "message" in final_output_types + + # Verify text deltas combine to final message + text_deltas = [ + e.delta for e in events if e.type == "response.output_text.delta" + ] + assert len(text_deltas) > 0, "Should have text deltas" + + # Get final text from output_text.done event + text_done_events = [ + e for e in events if e.type == "response.output_text.done" + ] + assert len(text_done_events) > 0 + + final_text = text_done_events[0].text + assert len(final_text) > 0, "Final text should not be empty" + + def test_mixed_mcp_and_function_tools(self, setup_backend): + """Test mixed MCP and function tools (non-streaming).""" + backend, model, client = setup_backend + + if backend in ["openai"]: + pytest.skip( + "Requires external MCP server (deepwiki) - may not be accessible in CI" + ) + + resp = client.responses.create( + model=model, + input="Give me diagnostics for the Astra-7 Core Reactor.", + tools=[self.BRAVE_MCP_TOOL, self.SYSTEM_DIAGNOSTICS_FUNCTION], + stream=False, + tool_choice="auto", + ) + + # Should successfully make the request + assert resp.error is None + + # Basic response structure + assert resp.id is not None + assert resp.status is not None + assert resp.output is not None + + # Verify output array is not empty + output = resp.output + assert isinstance(output, list) + assert len(output) > 0 + + # Check for function_call (not mcp_call for get_system_diagnostics) + function_calls = [item for item in output if item.type == "function_call"] + assert ( + len(function_calls) > 0 + ), "Response should contain at least one function_call" + + # Verify function_call structure for get_system_diagnostics + system_diagnostics_call = function_calls[0] + assert system_diagnostics_call.name == "get_system_diagnostics" + assert system_diagnostics_call.call_id is not None + assert system_diagnostics_call.arguments is not None + assert system_diagnostics_call.status is not None + + # Parse and verify arguments + args = json.loads(system_diagnostics_call.arguments) + assert "system_name" in args + assert "astra-7" in args["system_name"].lower() + + def test_mixed_mcp_and_function_tools_streaming(self, setup_backend): + """Test mixed MCP and function tools (streaming).""" + backend, model, client = setup_backend + + if backend in ["openai"]: + pytest.skip( + "Requires external MCP server (deepwiki) - may not be accessible in CI" + ) + + resp = client.responses.create( + model=model, + input="Give me diagnostics for the Astra-7 Core Reactor.", + tools=[self.BRAVE_MCP_TOOL, self.SYSTEM_DIAGNOSTICS_FUNCTION], + stream=True, + tool_choice="auto", # Encourage tool usage + ) + + # Should successfully make the request + events = [event for event in resp] + assert len(events) > 0 + + event_types = [e.type for e in events] + + # Check for lifecycle events + assert "response.created" in event_types, "Should have response.created event" + + # Should have mcp_list_tools events + assert ( + "response.mcp_list_tools.completed" in event_types + ), "Should have mcp_list_tools.completed event" + + # Should have function_call_arguments events (not mcp_call_arguments) + assert ( + "response.function_call_arguments.delta" in event_types + ), "Should have function_call_arguments.delta event for function tools" + assert ( + "response.function_call_arguments.done" in event_types + ), "Should have function_call_arguments.done event for function tools" + + # Should NOT have mcp_call_arguments events for function tools + # (get_system_diagnostics should use function_call_arguments, not mcp_call_arguments) + mcp_call_arg_events = [ + e + for e in events + if e.type == "response.mcp_call_arguments.delta" + and "get_system_diagnostics" in str(e.delta) + ] + assert ( + len(mcp_call_arg_events) == 0 + ), "Should NOT emit mcp_call_arguments.delta for function tools (get_system_diagnostics)" + + # Verify function_call_arguments.delta event structure + func_arg_deltas = [ + e for e in events if e.type == "response.function_call_arguments.delta" + ] + assert ( + len(func_arg_deltas) > 0 + ), "Should have function_call_arguments.delta events" + + # Check that delta event contains system_name arguments + full_delta_event = "" + for event in func_arg_deltas: + full_delta_event += event.delta + + assert ( + "system_name" in full_delta_event.lower() + and "astra-7" in full_delta_event.lower() + ), "function_call_arguments.delta should contain system_name and astra-7" diff --git a/sgl-router/py_test/e2e_response_api/features/test_state_management.py b/sgl-router/py_test/e2e_response_api/features/test_state_management.py new file mode 100644 index 000000000000..88f5346a9e04 --- /dev/null +++ b/sgl-router/py_test/e2e_response_api/features/test_state_management.py @@ -0,0 +1,161 @@ +""" +State management tests for Response API. + +Tests both previous_response_id and conversation-based state management. +These tests should work across all backends (OpenAI, XAI, gRPC). +""" + +import openai +import pytest + + +@pytest.mark.parametrize( + "setup_backend", ["openai", "xai", "grpc", "grpc_harmony"], indirect=True +) +class TestStateManagement: + """Tests for state management using previous_response_id and conversation.""" + + def test_basic_response_creation(self, setup_backend): + """Test basic response creation without state.""" + _, model, client = setup_backend + + resp = client.responses.create(model=model, input="What is 2+2?") + + assert resp.id is not None + assert resp.error is None + assert resp.status == "completed" + assert len(resp.output_text) > 0 + assert resp.usage is not None + + def test_streaming_response(self, setup_backend): + """Test streaming response.""" + _, model, client = setup_backend + + resp = client.responses.create( + model=model, input="Count to 5", stream=True, max_output_tokens=50 + ) + + # Check for response.created event + events = [event for event in resp] + created_events = [event for event in events if event.type == "response.created"] + assert len(created_events) > 0 + + # Check for final completed event or in_progress events + assert any( + event.type in ["response.completed", "response.in_progress"] + for event in events + ) + + def test_previous_response_id_chaining(self, setup_backend): + """Test chaining responses using previous_response_id.""" + _, model, client = setup_backend + # First response + resp1 = client.responses.create( + model=model, input="My name is Alice and my friend is Bob. Remember it." + ) + assert resp1.error is None + assert resp1.status == "completed" + response1_id = resp1.id + + # Second response referencing first + resp2 = client.responses.create( + model=model, input="What is my name", previous_response_id=response1_id + ) + assert resp2.error is None + assert resp2.status == "completed" + + # The model should remember the name from previous response + assert "Alice" in resp2.output_text + + # Third response referencing second + resp3 = client.responses.create( + model=model, + input="What is my friend name?", + previous_response_id=resp2.id, + ) + assert resp3.error is None + assert resp3.status == "completed" + assert "Bob" in resp3.output_text + + @pytest.mark.skip(reason="TODO: Add the invalid previous_response_id check") + def test_previous_response_id_invalid(self, setup_backend): + """Test using invalid previous_response_id.""" + _, model, client = setup_backend + with pytest.raises(openai.BadRequestError): + client.responses.create( + model=model, + input="Test", + previous_response_id="resp_invalid123", + max_output_tokens=50, + ) + + def test_conversation_with_multiple_turns(self, setup_backend): + """Test state management using conversation ID.""" + backend, model, client = setup_backend + + if backend in ["grpc", "grpc_harmony"]: + pytest.skip("TODO: 501 Not Implemented") + + # Create conversation + conv_resp = client.conversations.create(metadata={"topic": "math"}) + assert conv_resp.id is not None + assert conv_resp.created_at is not None + + conversation_id = conv_resp.id + + # First response in conversation + resp1 = client.responses.create( + model=model, input="I have 5 apples.", conversation=conversation_id + ) + assert resp1.error is None + assert resp1.status == "completed" + + # Second response in same conversation + resp2 = client.responses.create( + model=model, + input="How many apples do I have?", + conversation=conversation_id, + ) + assert resp2.error is None + assert resp2.status == "completed" + output_text = resp2.output_text + + # Should remember "5 apples" + assert "5" in output_text or "five" in output_text.lower() + + # Third response in same conversation + resp3 = client.responses.create( + model=model, + input="If I get 3 more, how many total?", + conversation=conversation_id, + ) + assert resp3.error is None + assert resp3.status == "completed" + output_text = resp3.output_text + + # Should calculate 5 + 3 = 8 + assert "8" in output_text or "eight" in output_text.lower() + list_resp = client.conversations.items.list(conversation_id) + assert list_resp.data is not None + items = list_resp.data + # Should have at least 6 items (3 inputs + 3 outputs) + assert len(items) >= 6 + + def test_mutually_exclusive_parameters(self, setup_backend): + """Test that previous_response_id and conversation are mutually exclusive.""" + _, model, client = setup_backend + + # TODO: Remove this once the conversation API is implemented for GRPC backend + conversation_id = "conv_123" + + resp1 = client.responses.create(model=model, input="Test") + response1_id = resp1.id + + # Try to use both parameters + with pytest.raises(openai.BadRequestError): + client.responses.create( + model=model, + input="This should fail", + previous_response_id=response1_id, + conversation=conversation_id, + ) diff --git a/sgl-router/py_test/e2e_response_api/features/test_structured_output.py b/sgl-router/py_test/e2e_response_api/features/test_structured_output.py new file mode 100644 index 000000000000..adfce1e03ce6 --- /dev/null +++ b/sgl-router/py_test/e2e_response_api/features/test_structured_output.py @@ -0,0 +1,172 @@ +""" +Structured output tests for Response API. + +Tests for text.format field with json_object and json_schema formats. +""" + +import json +import sys +from pathlib import Path + +import pytest + +# Add current directory for local imports +_TEST_DIR = Path(__file__).parent +sys.path.insert(0, str(_TEST_DIR)) + + +@pytest.mark.parametrize("setup_backend", ["openai", "grpc_harmony"], indirect=True) +class TestStructuredOutput: + + def test_structured_output_json_schema(self, setup_backend): + """Test structured output with json_schema format.""" + _, model, client = setup_backend + + # Create response with structured output + params = { + "model": model, + "input": [ + { + "role": "system", + "content": "You are a helpful math tutor. Guide the user through the solution step by step.", + }, + {"role": "user", "content": "how can I solve 8x + 7 = -23"}, + ], + "text": { + "format": { + "type": "json_schema", + "name": "math_reasoning", + "schema": { + "type": "object", + "properties": { + "steps": { + "type": "array", + "items": { + "type": "object", + "properties": { + "explanation": {"type": "string"}, + "output": {"type": "string"}, + }, + "required": ["explanation", "output"], + "additionalProperties": False, + }, + }, + "final_answer": {"type": "string"}, + }, + "required": ["steps", "final_answer"], + "additionalProperties": False, + }, + "strict": True, + } + }, + } + + create_resp = client.responses.create(**params) + assert create_resp.error is None + assert create_resp.id is not None + assert create_resp.output is not None + assert create_resp.text is not None + + # Verify text format was echoed back correctly + assert create_resp.text.format is not None + assert create_resp.text.format.type == "json_schema" + assert create_resp.text.format.name == "math_reasoning" + assert create_resp.text.format.schema_ is not None + assert create_resp.text.format.strict + + # Find the message output (output[0] may be reasoning, output[1] is message) + output_text = next( + ( + content.text + for item in create_resp.output + if item.type == "message" + for content in item.content + if content.type == "output_text" + ), + None, + ) + + assert output_text is not None, "No output_text found in response" + assert output_text.strip(), "output_text is empty" + + # Parse JSON output + output_json = json.loads(output_text) + + # Verify schema structure + assert "steps" in output_json + assert "final_answer" in output_json + assert isinstance(output_json["steps"], list) + assert len(output_json["steps"]) > 0 + + # Verify each step has required fields + for step in output_json["steps"]: + assert "explanation" in step + assert "output" in step + + +@pytest.mark.parametrize("setup_backend", ["grpc"], indirect=True) +class TestSimpleSchemaStructuredOutput: + + def test_structured_output_json_schema(self, setup_backend): + """Override with simpler schema for Llama model (complex schemas not well supported).""" + _, model, client = setup_backend + + params = { + "model": model, + "input": [ + { + "role": "system", + "content": "You are a math solver. Return ONLY a JSON object that matches the schema—no extra text.", + }, + { + "role": "user", + "content": "What is 1 + 1?", + }, + ], + "text": { + "format": { + "type": "json_schema", + "name": "math_answer", + "schema": { + "type": "object", + "properties": {"answer": {"type": "string"}}, + "required": ["answer"], + }, + } + }, + } + + create_resp = client.responses.create(**params) + assert create_resp.error is None + assert create_resp.id is not None + assert create_resp.output is not None + assert create_resp.text is not None + + # Verify text format was echoed back correctly + assert create_resp.text.format is not None + assert create_resp.text.format.type == "json_schema" + assert create_resp.text.format.name == "math_answer" + assert create_resp.text.format.schema_ is not None + + # Find the message output + output_text = next( + ( + content.text + for item in create_resp.output + if item.type == "message" + for content in item.content + if content.type == "output_text" + ), + None, + ) + + assert output_text is not None, "No output_text found in response" + assert output_text.strip(), "output_text is empty" + + # Parse JSON output + output_json = json.loads(output_text) + + # Verify simple schema structure (just answer field) + assert "answer" in output_json + assert isinstance(output_json["answer"], str) + assert output_json["answer"], "Answer is empty" diff --git a/sgl-router/py_test/e2e_response_api/mixins/basic_crud.py b/sgl-router/py_test/e2e_response_api/mixins/basic_crud.py deleted file mode 100644 index 2c4536608c9e..000000000000 --- a/sgl-router/py_test/e2e_response_api/mixins/basic_crud.py +++ /dev/null @@ -1,408 +0,0 @@ -""" -Base test class for Response API e2e tests. - -This module provides base test classes that can be reused across different backends -(OpenAI, XAI, gRPC) with common test logic. -""" - -from __future__ import annotations - -import sys -import time -import unittest -from pathlib import Path -from typing import Optional, Union - -import openai -from openai.types import conversations, responses - -# Add current directory for local imports -_TEST_DIR = Path(__file__).parent -sys.path.insert(0, str(_TEST_DIR)) - -from util import CustomTestCase - - -class ResponseAPIBaseTest(CustomTestCase): - """Base class for Response API tests with common utilities.""" - - # To be set by subclasses - base_url: str = None - api_key: str = None - model: str = None - client: openai.OpenAI = None - - def create_response( - self, - input: Union[str, responses.ResponseInputParam], - instructions: Optional[str] = None, - stream: bool = False, - max_output_tokens: Optional[int] = None, - temperature: Optional[float] = None, - previous_response_id: Optional[str] = None, - conversation: Optional[str] = None, - tools: Optional[list] = None, - background: bool = False, - **kwargs, - ) -> responses.Response | openai.Stream[responses.ResponseStreamEvent]: - """ - Create a response via POST /v1/responses. - - Args: - input: User input - instructions: Optional system instructions - stream: Whether to stream response - max_output_tokens: Optional max tokens to generate - temperature: Sampling temperature - previous_response_id: Optional previous response ID for state management - conversation: Optional conversation ID for state management - tools: Optional list of MCP tools - background: Whether to run in background mode - **kwargs: Additional request parameters - - Returns: - Response object for non-stream request - ResponseStreamEvent for stream request - """ - params = { - "model": self.model, - "input": input, - "stream": stream, - **kwargs, - } - - if instructions: - params["instructions"] = instructions - - if max_output_tokens is not None: - params["max_output_tokens"] = max_output_tokens - - if temperature is not None: - params["temperature"] = temperature - - if previous_response_id: - params["previous_response_id"] = previous_response_id - - if conversation: - params["conversation"] = conversation - - if tools: - params["tools"] = tools - - if background: - params["background"] = background - - return self.client.responses.create(**params) - - def get_response( - self, response_id: str - ) -> responses.Response | openai.Stream[responses.ResponseStreamEvent]: - """Get response by ID via GET /v1/responses/{response_id}.""" - return self.client.responses.retrieve(response_id=response_id) - - def delete_response(self, response_id: str) -> None: - """Delete response by ID via DELETE /v1/responses/{response_id}.""" - return self.client.responses.delete(response_id=response_id) - - def cancel_response(self, response_id: str) -> responses.Response: - """Cancel response by ID via POST /v1/responses/{response_id}/cancel.""" - return self.client.responses.cancel(response_id=response_id) - - def get_response_input_items( - self, response_id: str - ) -> openai.pagination.SyncCursorPage[responses.ResponseItem]: - """Get response input items via GET /v1/responses/{response_id}/input_items.""" - return self.client.responses.input_items.list(response_id=response_id) - - def create_conversation( - self, metadata: Optional[dict] = None - ) -> conversations.Conversation: - """Create conversation via POST /v1/conversations.""" - params = {} - if metadata: - params["metadata"] = metadata - return self.client.conversations.create(**params) - - def get_conversation(self, conversation_id: str) -> conversations.Conversation: - """Get conversation by ID via GET /v1/conversations/{conversation_id}.""" - return self.client.conversations.retrieve(conversation_id=conversation_id) - - def update_conversation( - self, conversation_id: str, metadata: dict - ) -> conversations.Conversation: - """Update conversation via POST /v1/conversations/{conversation_id}.""" - return self.client.conversations.update( - conversation_id=conversation_id, metadata=metadata - ) - - def delete_conversation( - self, conversation_id: str - ) -> conversations.ConversationDeletedResource: - """Delete conversation via DELETE /v1/conversations/{conversation_id}.""" - return self.client.conversations.delete(conversation_id=conversation_id) - - def list_conversation_items( - self, - conversation_id: str, - limit: Optional[int] = None, - after: Optional[str] = None, - order: str = "asc", - ) -> openai.pagination.SyncConversationCursorPage[conversations.ConversationItem]: - """List conversation items via GET /v1/conversations/{conversation_id}/items.""" - params = {"conversation_id": conversation_id, "order": order} - if limit: - params["limit"] = limit - if after: - params["after"] = after - return self.client.conversations.items.list(**params) - - def create_conversation_items( - self, conversation_id: str, items: list - ) -> conversations.ConversationItemList: - """Create conversation items via POST /v1/conversations/{conversation_id}/items.""" - return self.client.conversations.items.create( - conversation_id=conversation_id, items=items - ) - - def get_conversation_item( - self, conversation_id: str, item_id: str - ) -> conversations.ConversationItem: - """Get conversation item via GET /v1/conversations/{conversation_id}/items/{item_id}.""" - return self.client.conversations.items.retrieve( - conversation_id=conversation_id, item_id=item_id - ) - - def delete_conversation_item( - self, conversation_id: str, item_id: str - ) -> conversations.Conversation: - """Delete conversation item via DELETE /v1/conversations/{conversation_id}/items/{item_id}.""" - return self.client.conversations.items.delete( - conversation_id=conversation_id, item_id=item_id - ) - - def wait_for_background_task( - self, response_id: str, timeout: int = 30, poll_interval: float = 0.5 - ) -> responses.Response: - """ - Wait for background task to complete. - - Args: - response_id: Response ID to poll - timeout: Max seconds to wait - poll_interval: Seconds between polls - - Returns: - Final response data - - Raises: - TimeoutError: If task doesn't complete in time - AssertionError: If task fails - """ - start_time = time.time() - - while time.time() - start_time < timeout: - resp = self.get_response(response_id) - self.assertIsNone(resp.error) - self.assertEqual(resp.id, response_id) - - status = resp.status - - if status == "completed": - return resp - elif status == "failed": - raise AssertionError(f"Background task failed: {resp.error}") - elif status == "cancelled": - raise AssertionError("Background task was cancelled") - - time.sleep(poll_interval) - - raise TimeoutError( - f"Background task {response_id} did not complete within {timeout}s" - ) - - -class StateManagementBaseTest(ResponseAPIBaseTest): - """Base class for state management tests (previous_response_id and conversation).""" - - def test_basic_response_creation(self): - """Test basic response creation without state.""" - resp = self.create_response("What is 2+2?", max_output_tokens=50) - - self.assertIsNotNone(resp.id) - self.assertIsNone(resp.error) - self.assertEqual(resp.status, "completed") - self.assertGreater(len(resp.output_text), 0) - self.assertGreater(resp.usage.input_tokens, 0) - self.assertGreater(resp.usage.output_tokens, 0) - self.assertGreater(resp.usage.total_tokens, 0) - - def test_streaming_response(self): - """Test streaming response.""" - resp = self.create_response("Count to 5", stream=True, max_output_tokens=50) - - # Check for response.created event - events = [event for event in resp] - created_events = [event for event in events if event.type == "response.created"] - self.assertGreater(len(created_events), 0) - - # Check for final completed event or in_progress events - self.assertTrue( - any( - event.type in ["response.completed", "response.in_progress"] - for event in events - ) - ) - - -class ResponseCRUDBaseTest(ResponseAPIBaseTest): - """Base class for Response API CRUD tests.""" - - def test_create_and_get_response(self): - """Test creating response and retrieving it.""" - # Create response - create_resp = self.create_response("Hello, world!") - self.assertIsNotNone(create_resp.id) - self.assertIsNone(create_resp.error) - self.assertEqual(create_resp.status, "completed") - self.assertGreater(len(create_resp.output_text), 0) - response_id = create_resp.id - - # Get response - get_resp = self.get_response(response_id) - self.assertIsNone(get_resp.error) - self.assertEqual(get_resp.id, response_id) - self.assertEqual(get_resp.status, "completed") - - input_resp = self.get_response_input_items(get_resp.id) - self.assertIsNotNone(input_resp.data) - self.assertGreater(len(input_resp.data), 0) - - @unittest.skip("TODO: Add delete response feature") - def test_delete_response(self): - """Test deleting response.""" - # Create response - create_resp = self.create_response("Test deletion") - self.assertIsNotNone(create_resp.id) - self.assertIsNone(create_resp.error) - self.assertEqual(create_resp.status, "completed") - self.assertGreater(len(create_resp.output_text), 0) - - response_id = create_resp.id - - # Delete response - self.delete_response(response_id) - - # Verify it's deleted (should return 404) - with self.assertRaises(openai.NotFoundError): - self.get_response(response_id) - - @unittest.skip("TODO: Add background response feature") - def test_background_response(self): - """Test background response execution.""" - # Create background response - create_resp = self.create_response( - "Write a short story", background=True, max_output_tokens=100 - ) - self.assertIsNotNone(create_resp.id) - self.assertIsNone(create_resp.error) - self.assertIn(create_resp.status, ["in_progress", "queued"]) - - response_id = create_resp.id - - # Wait for completion - final_data = self.wait_for_background_task(response_id, timeout=60) - self.assertEqual(final_data.status, "completed") - - -class ConversationCRUDBaseTest(ResponseAPIBaseTest): - """Base class for Conversation API CRUD tests.""" - - def test_create_and_get_conversation(self): - """Test creating and retrieving conversation.""" - # Create conversation - create_resp = self.create_conversation(metadata={"user": "test_user"}) - self.assertIsNotNone(create_resp.id) - self.assertIsNotNone(create_resp.created_at) - - create_data = create_resp.metadata - self.assertEqual(create_data["user"], "test_user") - conversation_id = create_resp.id - - # Get conversation - get_resp = self.get_conversation(conversation_id) - self.assertIsNotNone(get_resp.id) - self.assertIsNotNone(get_resp.created_at) - - get_data = get_resp.metadata - self.assertEqual(get_resp.id, conversation_id) - self.assertEqual(get_data["user"], "test_user") - - def test_update_conversation(self): - """Test updating conversation metadata.""" - # Create conversation - create_resp = self.create_conversation(metadata={"key1": "value1"}) - self.assertIsNotNone(create_resp.id) - self.assertIsNotNone(create_resp.created_at) - - create_data = create_resp.metadata - self.assertEqual(create_data["key1"], "value1") - self.assertNotIn("key2", create_data) - conversation_id = create_resp.id - - # Update conversation - update_resp = self.update_conversation( - conversation_id, metadata={"key1": "value1", "key2": "value2"} - ) - self.assertEqual(update_resp.id, conversation_id) - update_data = update_resp.metadata - self.assertEqual(update_data["key1"], "value1") - self.assertEqual(update_data["key2"], "value2") - - # Verify update - get_resp = self.get_conversation(conversation_id) - get_data = get_resp.metadata - self.assertEqual(get_data["key1"], "value1") - self.assertEqual(get_data["key2"], "value2") - - def test_delete_conversation(self): - """Test deleting conversation.""" - # Create conversation - create_resp = self.create_conversation() - self.assertIsNotNone(create_resp.id) - self.assertIsNotNone(create_resp.created_at) - conversation_id = create_resp.id - - # Delete conversation - delete_resp = self.delete_conversation(conversation_id) - self.assertIsNotNone(delete_resp.id) - self.assertTrue(delete_resp.deleted) - - # Verify deletion - with self.assertRaises(openai.NotFoundError): - self.get_conversation(conversation_id) - - def test_list_conversation_items(self): - """Test listing conversation items.""" - # Create conversation - conv_resp = self.create_conversation() - self.assertIsNotNone(conv_resp.id) - conversation_id = conv_resp.id - - # Create response with conversation - resp1 = self.create_response( - "First message", conversation=conversation_id, max_output_tokens=50 - ) - self.assertIsNone(resp1.error) - resp2 = self.create_response( - "Second message", conversation=conversation_id, max_output_tokens=50 - ) - self.assertIsNone(resp2.error) - - # List items - list_resp = self.list_conversation_items(conversation_id) - self.assertIsNotNone(list_resp) - self.assertIsNotNone(list_resp.data) - - list_data = list_resp.data - # Should have at least 4 items (2 inputs + 2 outputs) - self.assertGreaterEqual(len(list_data), 4) diff --git a/sgl-router/py_test/e2e_response_api/mixins/mcp.py b/sgl-router/py_test/e2e_response_api/mixins/mcp.py deleted file mode 100644 index 0912c2a819ee..000000000000 --- a/sgl-router/py_test/e2e_response_api/mixins/mcp.py +++ /dev/null @@ -1,357 +0,0 @@ -""" -MCP (Model Context Protocol) tests for Response API. - -Tests MCP tool calling in both streaming and non-streaming modes. -These tests should work across all backends that support MCP (OpenAI, XAI). -""" - -import json - -from basic_crud import ResponseAPIBaseTest - - -class MCPTests(ResponseAPIBaseTest): - """Tests for MCP tool calling in both streaming and non-streaming modes.""" - - # Class attribute to control validation strictness - # Subclasses can override this to enable strict validation - mcp_validation_mode = "relaxed" - - # Shared constants for MCP tests - BRAVE_MCP_TOOL = { - "type": "mcp", - "server_label": "brave", - "server_description": "A Tool to do web search", - "server_url": "http://localhost:8001/sse", - "require_approval": "never", - } - - MCP_TEST_PROMPT = ( - "show me some news about sglang router, use the tool to just search " - "one result and return one sentence response" - ) - - GET_WEATHER_FUNCTION = { - "type": "function", - "name": "get_weather", - "description": "Get the current weather in a given location", - "parameters": { - "type": "object", - "properties": {"location": {"type": "string"}}, - "required": ["location"], - }, - } - - def test_mcp_basic_tool_call(self): - """Test basic MCP tool call (non-streaming). - - Validation strictness is controlled by the class attribute `mcp_validation_mode`. - Set to "strict" in subclasses for additional HTTP-specific validation. - """ - resp = self.create_response( - self.MCP_TEST_PROMPT, - tools=[self.BRAVE_MCP_TOOL], - stream=False, - reasoning={"effort": "low"}, - ) - - # Should successfully make the request - self.assertIsNone(resp.error) - - # Basic response structure - self.assertIsNotNone(resp.id) - self.assertEqual(resp.status, "completed") - self.assertIsNotNone(resp.model) - self.assertIsNotNone(resp.output) - - # Verify output array is not empty - self.assertGreater(len(resp.output_text), 0) - - # Check for MCP-specific output types - output_types = [item.type for item in resp.output] - - # Should have mcp_list_tools - tools are listed before calling - self.assertIn( - "mcp_list_tools", output_types, "Response should contain mcp_list_tools" - ) - - # Should have at least one mcp_call - mcp_calls = [item for item in resp.output if item.type == "mcp_call"] - self.assertGreater( - len(mcp_calls), 0, "Response should contain at least one mcp_call" - ) - - # Verify mcp_call structure - for mcp_call in mcp_calls: - self.assertIsNotNone(mcp_call.id) - self.assertEqual(mcp_call.status, "completed") - self.assertEqual(mcp_call.server_label, "brave") - self.assertIsNotNone(mcp_call.name) - self.assertIsNotNone(mcp_call.arguments) - self.assertIsNotNone(mcp_call.output) - - # Strict mode: additional validation for HTTP backends - if self.mcp_validation_mode == "strict": - # Should have final message output - messages = [item for item in resp.output if item.type == "message"] - self.assertGreater( - len(messages), 0, "Response should contain at least one message" - ) - # Verify message structure - for msg in messages: - self.assertIsNotNone(msg.content) - self.assertIsInstance(msg.content, list) - - # Check content has text - for content_item in msg.content: - if content_item.type == "output_text": - self.assertIsNotNone(content_item.text) - self.assertIsInstance(content_item.text, str) - self.assertGreater(len(content_item.text), 0) - - def test_mcp_basic_tool_call_streaming(self): - """Test basic MCP tool call (streaming). - - Validation strictness is controlled by the class attribute `mcp_validation_mode`. - Set to "strict" in subclasses for additional HTTP-specific validation. - """ - resp = self.create_response( - self.MCP_TEST_PROMPT, - tools=[self.BRAVE_MCP_TOOL], - stream=True, - reasoning={"effort": "low"}, - ) - - # Should successfully make the request - events = [event for event in resp] - self.assertGreater(len(events), 0) - - event_types = [event.type for event in events] - # Check for lifecycle events - self.assertIn( - "response.created", event_types, "Should have response.created event" - ) - self.assertIn( - "response.completed", event_types, "Should have response.completed event" - ) - - # Check for MCP list tools events - self.assertIn( - "response.output_item.added", - event_types, - "Should have output_item.added events", - ) - self.assertIn( - "response.mcp_list_tools.in_progress", - event_types, - "Should have mcp_list_tools.in_progress event", - ) - self.assertIn( - "response.mcp_list_tools.completed", - event_types, - "Should have mcp_list_tools.completed event", - ) - - # Check for MCP call events - self.assertIn( - "response.mcp_call.in_progress", - event_types, - "Should have mcp_call.in_progress event", - ) - self.assertIn( - "response.mcp_call_arguments.delta", - event_types, - "Should have mcp_call_arguments.delta event", - ) - self.assertIn( - "response.mcp_call_arguments.done", - event_types, - "Should have mcp_call_arguments.done event", - ) - self.assertIn( - "response.mcp_call.completed", - event_types, - "Should have mcp_call.completed event", - ) - - # Verify final completed event has full response - completed_events = [e for e in events if e.type == "response.completed"] - self.assertEqual(len(completed_events), 1) - - final_response = completed_events[0].response - self.assertIsNotNone(final_response.id) - self.assertEqual(final_response.status, "completed") - self.assertIsNotNone(final_response.output) - - # Verify final output contains expected items - final_output = final_response.output - final_output_types = [item.type for item in final_output] - - self.assertIn("mcp_list_tools", final_output_types) - self.assertIn("mcp_call", final_output_types) - - # Verify mcp_call items in final output - mcp_calls = [item for item in final_output if item.type == "mcp_call"] - self.assertGreater(len(mcp_calls), 0) - - for mcp_call in mcp_calls: - self.assertEqual(mcp_call.status, "completed") - self.assertEqual(mcp_call.server_label, "brave") - self.assertIsNotNone(mcp_call.name) - self.assertIsNotNone(mcp_call.arguments) - self.assertIsNotNone(mcp_call.output) - - # Strict mode: additional validation for HTTP backends - if self.mcp_validation_mode == "strict": - # Check for text output events - self.assertIn( - "response.content_part.added", - event_types, - "Should have content_part.added event", - ) - self.assertIn( - "response.output_text.delta", - event_types, - "Should have output_text.delta events", - ) - self.assertIn( - "response.output_text.done", - event_types, - "Should have output_text.done event", - ) - self.assertIn( - "response.content_part.done", - event_types, - "Should have content_part.done event", - ) - - self.assertIn("message", final_output_types) - - # Verify text deltas combine to final message - text_deltas = [ - e.delta for e in events if e.type == "response.output_text.delta" - ] - self.assertGreater(len(text_deltas), 0, "Should have text deltas") - - # Get final text from output_text.done event - text_done_events = [ - e for e in events if e.type == "response.output_text.done" - ] - self.assertGreater(len(text_done_events), 0) - - final_text = text_done_events[0].text - self.assertGreater(len(final_text), 0, "Final text should not be empty") - - def test_mixed_mcp_and_function_tools(self): - """Test mixed MCP and function tools (non-streaming).""" - resp = self.create_response( - "What is the weather in seattle now?", - tools=[self.BRAVE_MCP_TOOL, self.GET_WEATHER_FUNCTION], - stream=False, - tool_choice="auto", - ) - - # Should successfully make the request - self.assertIsNone(resp.error) - - # Basic response structure - self.assertIsNotNone(resp.id) - self.assertIsNotNone(resp.status) - self.assertIsNotNone(resp.output) - - # Verify output array is not empty - output = resp.output - self.assertIsInstance(output, list) - self.assertGreater(len(output), 0) - - # Check for function_call (not mcp_call for get_weather) - function_calls = [item for item in output if item.type == "function_call"] - self.assertGreater( - len(function_calls), 0, "Response should contain at least one function_call" - ) - - # Verify function_call structure for get_weather - weather_call = function_calls[0] - self.assertEqual(weather_call.name, "get_weather") - self.assertIsNotNone(weather_call.call_id) - self.assertIsNotNone(weather_call.arguments) - self.assertIsNotNone(weather_call.status) - - # Parse and verify arguments - args = json.loads(weather_call.arguments) - self.assertIn("location", args) - self.assertIn("seattle", args["location"].lower()) - - def test_mixed_mcp_and_function_tools_streaming(self): - """Test mixed MCP and function tools (streaming).""" - resp = self.create_response( - "What is the weather in seattle now?", - tools=[self.BRAVE_MCP_TOOL, self.GET_WEATHER_FUNCTION], - stream=True, - tool_choice="auto", # Encourage tool usage - ) - - # Should successfully make the request - events = [event for event in resp] - self.assertGreater(len(events), 0) - - event_types = [e.type for e in events] - - # Check for lifecycle events - self.assertIn( - "response.created", event_types, "Should have response.created event" - ) - - # Should have mcp_list_tools events - self.assertIn( - "response.mcp_list_tools.completed", - event_types, - "Should have mcp_list_tools.completed event", - ) - - # Should have function_call_arguments events (not mcp_call_arguments) - self.assertIn( - "response.function_call_arguments.delta", - event_types, - "Should have function_call_arguments.delta event for function tools", - ) - self.assertIn( - "response.function_call_arguments.done", - event_types, - "Should have function_call_arguments.done event for function tools", - ) - - # Should NOT have mcp_call_arguments events for function tools - # (get_weather should use function_call_arguments, not mcp_call_arguments) - mcp_call_arg_events = [ - e - for e in events - if e.type == "response.mcp_call_arguments.delta" - and "get_weather" in str(e.delta) - ] - self.assertEqual( - len(mcp_call_arg_events), - 0, - "Should NOT emit mcp_call_arguments.delta for function tools (get_weather)", - ) - - # Verify function_call_arguments.delta event structure - func_arg_deltas = [ - e for e in events if e.type == "response.function_call_arguments.delta" - ] - self.assertGreater( - len(func_arg_deltas), 0, "Should have function_call_arguments.delta events" - ) - - # Check that at least one delta event contains location arguments - has_location = False - for event in func_arg_deltas: - delta = event.delta - if "location" in delta.lower() or "seattle" in delta.lower(): - has_location = True - break - - self.assertTrue( - has_location, - "function_call_arguments.delta should contain location/seattle", - ) diff --git a/sgl-router/py_test/e2e_response_api/mixins/state_management.py b/sgl-router/py_test/e2e_response_api/mixins/state_management.py deleted file mode 100644 index f12bd971b3c7..000000000000 --- a/sgl-router/py_test/e2e_response_api/mixins/state_management.py +++ /dev/null @@ -1,128 +0,0 @@ -""" -State management tests for Response API. - -Tests both previous_response_id and conversation-based state management. -These tests should work across all backends (OpenAI, XAI, gRPC). -""" - -import unittest - -import openai -from basic_crud import ResponseAPIBaseTest - - -class StateManagementTests(ResponseAPIBaseTest): - """Tests for state management using previous_response_id and conversation.""" - - def test_previous_response_id_chaining(self): - """Test chaining responses using previous_response_id.""" - # First response - resp1 = self.create_response( - "My name is Alice and my friend is Bob. Remember it." - ) - self.assertIsNone(resp1.error) - self.assertEqual(resp1.status, "completed") - response1_id = resp1.id - - # Second response referencing first - resp2 = self.create_response( - "What is my name", previous_response_id=response1_id - ) - self.assertIsNone(resp2.error) - self.assertEqual(resp2.status, "completed") - - # The model should remember the name from previous response - self.assertIn("Alice", resp2.output_text) - - # Third response referencing second - resp3 = self.create_response( - "What is my friend name?", - previous_response_id=resp2.id, - ) - self.assertIsNone(resp3.error) - self.assertEqual(resp3.status, "completed") - self.assertIn("Bob", resp3.output_text) - - @unittest.skip("TODO: Add the invalid previous_response_id check") - def test_previous_response_id_invalid(self): - """Test using invalid previous_response_id.""" - with self.assertRaises(openai.BadRequestError): - self.create_response( - "Test", previous_response_id="resp_invalid123", max_output_tokens=50 - ) - - def test_conversation_with_multiple_turns(self): - """Test state management using conversation ID.""" - # Create conversation - conv_resp = self.create_conversation(metadata={"topic": "math"}) - self.assertIsNotNone(conv_resp.id) - self.assertIsNotNone(conv_resp.created_at) - - conversation_id = conv_resp.id - - # First response in conversation - resp1 = self.create_response("I have 5 apples.", conversation=conversation_id) - self.assertIsNone(resp1.error) - self.assertEqual(resp1.status, "completed") - - # Second response in same conversation - resp2 = self.create_response( - "How many apples do I have?", - conversation=conversation_id, - ) - self.assertIsNone(resp2.error) - self.assertEqual(resp2.status, "completed") - output_text = resp2.output_text - - # Should remember "5 apples" - self.assertTrue("5" in output_text or "five" in output_text.lower()) - - # Third response in same conversation - resp3 = self.create_response( - "If I get 3 more, how many total?", - conversation=conversation_id, - ) - self.assertIsNone(resp3.error) - self.assertEqual(resp3.status, "completed") - output_text = resp3.output_text - - # Should calculate 5 + 3 = 8 - self.assertTrue("8" in output_text or "eight" in output_text.lower()) - list_resp = self.list_conversation_items(conversation_id) - self.assertIsNotNone(list_resp.data) - items = list_resp.data - # Should have at least 6 items (3 inputs + 3 outputs) - self.assertGreaterEqual(len(items), 6) - - def test_mutually_exclusive_parameters(self): - """Test that previous_response_id and conversation are mutually exclusive.""" - # TODO: Remove this once the conversation API is implemented for GRPC backend - conversation_id = "conv_123" - - resp1 = self.create_response("Test") - response1_id = resp1.id - - # Try to use both parameters - with self.assertRaises(openai.BadRequestError): - self.create_response( - "This should fail", - previous_response_id=response1_id, - conversation=conversation_id, - ) - - # Helper methods - - def _extract_output_text(self, response_data: dict) -> str: - """Extract text content from response output.""" - output = response_data.get("output", []) - if not output: - return "" - - text_parts = [] - for item in output: - content = item.get("content", []) - for part in content: - if part.get("type") == "output_text": - text_parts.append(part.get("text", "")) - - return " ".join(text_parts) diff --git a/sgl-router/py_test/e2e_response_api/mixins/structured_output.py b/sgl-router/py_test/e2e_response_api/mixins/structured_output.py deleted file mode 100644 index e71342ec367e..000000000000 --- a/sgl-router/py_test/e2e_response_api/mixins/structured_output.py +++ /dev/null @@ -1,101 +0,0 @@ -""" -Structured output tests for Response API. - -Tests for text.format field with json_object and json_schema formats. -""" - -import json -import sys -from pathlib import Path - -# Add current directory for local imports -_TEST_DIR = Path(__file__).parent -sys.path.insert(0, str(_TEST_DIR)) - -from basic_crud import ResponseAPIBaseTest - - -class StructuredOutputBaseTest(ResponseAPIBaseTest): - - def test_structured_output_json_schema(self): - """Test structured output with json_schema format.""" - - # Create response with structured output - params = { - "input": [ - { - "role": "system", - "content": "You are a helpful math tutor. Guide the user through the solution step by step.", - }, - {"role": "user", "content": "how can I solve 8x + 7 = -23"}, - ], - "text": { - "format": { - "type": "json_schema", - "name": "math_reasoning", - "schema": { - "type": "object", - "properties": { - "steps": { - "type": "array", - "items": { - "type": "object", - "properties": { - "explanation": {"type": "string"}, - "output": {"type": "string"}, - }, - "required": ["explanation", "output"], - "additionalProperties": False, - }, - }, - "final_answer": {"type": "string"}, - }, - "required": ["steps", "final_answer"], - "additionalProperties": False, - }, - "strict": True, - } - }, - } - - create_resp = self.create_response(**params) - self.assertIsNone(create_resp.error) - self.assertIsNotNone(create_resp.id) - self.assertIsNotNone(create_resp.output) - self.assertIsNotNone(create_resp.text) - - # Verify text format was echoed back correctly - self.assertIsNotNone(create_resp.text.format) - self.assertEqual(create_resp.text.format.type, "json_schema") - self.assertEqual(create_resp.text.format.name, "math_reasoning") - self.assertIsNotNone(create_resp.text.format.schema_) - self.assertEqual(create_resp.text.format.strict, True) - - # Find the message output (output[0] may be reasoning, output[1] is message) - output_text = next( - ( - content.text - for item in create_resp.output - if item.type == "message" - for content in item.content - if content.type == "output_text" - ), - None, - ) - - self.assertIsNotNone(output_text, "No output_text found in response") - self.assertTrue(output_text.strip(), "output_text is empty") - - # Parse JSON output - output_json = json.loads(output_text) - - # Verify schema structure - self.assertIn("steps", output_json) - self.assertIn("final_answer", output_json) - self.assertIsInstance(output_json["steps"], list) - self.assertGreater(len(output_json["steps"]), 0) - - # Verify each step has required fields - for step in output_json["steps"]: - self.assertIn("explanation", step) - self.assertIn("output", step) diff --git a/sgl-router/py_test/e2e_response_api/persistence/test_oracle_store.py b/sgl-router/py_test/e2e_response_api/persistence/test_oracle_store.py deleted file mode 100644 index ea1634450a1c..000000000000 --- a/sgl-router/py_test/e2e_response_api/persistence/test_oracle_store.py +++ /dev/null @@ -1,52 +0,0 @@ -""" -Oracle database storage backend tests for Response API. - -Run with: - export OPENAI_API_KEY=your_key - python3 -m pytest py_test/e2e_response_api/persistence/test_oracle_store.py -v - python3 -m unittest e2e_response_api.persistence.test_oracle_store.TestOracleStore -""" - -import os -import sys -import unittest -from pathlib import Path - -import openai - -# Add e2e_response_api directory for imports -_TEST_DIR = Path(__file__).parent.parent -sys.path.insert(0, str(_TEST_DIR)) - -# Import local modules -from mixins.basic_crud import ConversationCRUDBaseTest, ResponseCRUDBaseTest -from router_fixtures import popen_launch_openai_xai_router -from util import kill_process_tree - - -class TestOracleStore(ResponseCRUDBaseTest, ConversationCRUDBaseTest): - """End to end tests for Oracle database storage backend.""" - - api_key = os.environ.get("OPENAI_API_KEY") - - @classmethod - def setUpClass(cls): - cls.model = "gpt-5-nano" - cls.base_url_port = "http://127.0.0.1:30040" - - cls.cluster = popen_launch_openai_xai_router( - backend="openai", - base_url=cls.base_url_port, - history_backend="oracle", - ) - - cls.base_url = cls.cluster["base_url"] - cls.client = openai.Client(api_key=cls.api_key, base_url=cls.base_url + "/v1") - - @classmethod - def tearDownClass(cls): - kill_process_tree(cls.cluster["router"].pid) - - -if __name__ == "__main__": - unittest.main()