diff --git a/docs/deployment/frameworks/runpod.md b/docs/deployment/frameworks/runpod.md
new file mode 100644
index 000000000000..1bd39131e23c
--- /dev/null
+++ b/docs/deployment/frameworks/runpod.md
@@ -0,0 +1,87 @@
+# RunPod
+
+vLLM can be deployed on [RunPod](https://www.runpod.io/), a cloud GPU platform that provides on-demand and serverless GPU instances for AI inference workloads.
+
+## Prerequisites
+
+- A RunPod account with GPU pod access
+- A GPU pod running a CUDA-compatible template (e.g., `runpod/pytorch`)
+
+## Starting the Server
+
+SSH into your RunPod pod and launch the vLLM OpenAI-compatible server:
+
+```bash
+python -m vllm.entrypoints.openai.api_server \
+    --model <model-name> \
+    --host 0.0.0.0 \
+    --port 8000
+```
+
+!!! note
+
+    Use `--host 0.0.0.0` to bind to all interfaces so the server is reachable from outside the container.
+
+## Exposing Port 8000
+
+RunPod exposes HTTP services through its proxy. To make port 8000 accessible:
+
+1. In the RunPod dashboard, navigate to your pod settings.
+2. Add `8000` to the list of exposed HTTP ports.
+3. After the pod restarts, RunPod provides a public URL in the format:
+
+    ```text
+    https://<pod-id>-8000.proxy.runpod.net
+    ```
+
+## Troubleshooting 502 Bad Gateway
+
+A `502 Bad Gateway` error from the RunPod proxy typically means the server is not yet listening. Common causes:
+
+- **Model still loading** — Large models take time to download and load into GPU memory. Check the pod logs for progress.
+- **Wrong host binding** — Ensure you passed `--host 0.0.0.0`. Binding to `127.0.0.1` (the default) makes the server unreachable from the proxy.
+- **Port mismatch** — Verify the `--port` value matches the port exposed in the RunPod dashboard.
+- **Out of GPU memory** — The model may be too large for the allocated GPU. Check logs for CUDA OOM errors and consider using a larger instance or adding `--tensor-parallel-size` for multi-GPU pods.
+
+## Verifying the Deployment
+
+Once the server is running, test it with a curl request:
+
+??? console "Command"
+
+    ```bash
+    curl https://<pod-id>-8000.proxy.runpod.net/v1/chat/completions \
+        -H "Content-Type: application/json" \
+        -d '{
+            "model": "<model-name>",
+            "messages": [
+                {"role": "user", "content": "Hello, how are you?"}
+            ],
+            "max_tokens": 50
+        }'
+    ```
+
+??? console "Response"
+
+    ```json
+    {
+        "id": "chat-abc123",
+        "object": "chat.completion",
+        "choices": [
+            {
+                "message": {
+                    "role": "assistant",
+                    "content": "I'm doing well, thank you for asking! How can I help you today?"
+                },
+                "index": 0,
+                "finish_reason": "stop"
+            }
+        ]
+    }
+    ```
+
+You can also check the server health endpoint:
+
+```bash
+curl https://<pod-id>-8000.proxy.runpod.net/health
+```
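Beyond the curl check in the page above, the same proxy URL can be exercised from the official `openai` Python client. This is an illustrative sketch rather than part of the docs file being added; `<pod-id>` and `<model-name>` are placeholders, and the dummy API key assumes the server was started without `--api-key`.

```python
# Hypothetical client-side smoke test against a vLLM server behind the RunPod proxy.
# "<pod-id>" and "<model-name>" are placeholders for your pod and served model.
from openai import OpenAI

client = OpenAI(
    base_url="https://<pod-id>-8000.proxy.runpod.net/v1",
    api_key="EMPTY",  # vLLM ignores the key unless --api-key was passed
)

completion = client.chat.completions.create(
    model="<model-name>",
    messages=[{"role": "user", "content": "Hello, how are you?"}],
    max_tokens=50,
)
print(completion.choices[0].message.content)
```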
diff --git a/tests/v1/entrypoints/openai/serving_responses/test_store_eviction.py b/tests/v1/entrypoints/openai/serving_responses/test_store_eviction.py
new file mode 100644
index 000000000000..67b42b4253ab
--- /dev/null
+++ b/tests/v1/entrypoints/openai/serving_responses/test_store_eviction.py
@@ -0,0 +1,215 @@
+# SPDX-License-Identifier: Apache-2.0
+# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
+"""Unit tests for Responses API store LRU eviction.
+
+These tests verify the eviction and deletion logic without importing the
+full vLLM stack (no torch/CUDA dependency), by reimplementing the core
+methods under test and binding them to lightweight mock objects.
+"""
+
+import asyncio
+import logging
+from collections import OrderedDict, deque
+from unittest.mock import MagicMock
+
+import pytest
+
+# ---------------------------------------------------------------------------
+# Replicate the two methods under test so we can run without torch/vllm deps.
+# These must stay in sync with OpenAIServingResponses in serving.py.
+# ---------------------------------------------------------------------------
+
+logger = logging.getLogger(__name__)
+
+
+def _evict_oldest_store_entries(self) -> None:
+    while len(self.response_store) > self.store_max_size:
+        evicted_id, _ = self.response_store.popitem(last=False)
+        self.msg_store.pop(evicted_id, None)
+        self.event_store.pop(evicted_id, None)
+
+        # Cancel background task if it exists
+        if task := self.background_tasks.pop(evicted_id, None):
+            task.cancel()
+
+        logger.debug("Evicted response %s from store (LRU)", evicted_id)
+
+
+async def _delete_response(self, response_id: str):
+    async with self.response_store_lock:
+        response = self.response_store.pop(response_id, None)
+        if response is None:
+            return self._make_not_found_error(response_id)
+        self.msg_store.pop(response_id, None)
+        self.event_store.pop(response_id, None)
+
+        if task := self.background_tasks.pop(response_id, None):
+            task.cancel()
+
+    response.status = "cancelled"
+    return response
+
+
+# ---------------------------------------------------------------------------
+# Helpers
+# ---------------------------------------------------------------------------
+
+
+def _make_mock_response(response_id: str) -> MagicMock:
+    resp = MagicMock()
+    resp.id = response_id
+    resp.status = "completed"
+    return resp
+
+
+def _make_serving(max_size: int = 3):
+    serving = MagicMock()
+    serving.response_store = OrderedDict()
+    serving.msg_store = OrderedDict()
+    serving.event_store = OrderedDict()
+    serving.store_max_size = max_size
+    serving.response_store_lock = asyncio.Lock()
+    serving.background_tasks = {}
+    serving._evict_oldest_store_entries = _evict_oldest_store_entries.__get__(
+        serving, type(serving)
+    )
+    serving.delete_response = _delete_response.__get__(serving, type(serving))
+    serving._make_not_found_error = MagicMock(
+        return_value=MagicMock(error=MagicMock(code=404))
+    )
+    return serving
+
+
+# ---------------------------------------------------------------------------
+# Tests
+# ---------------------------------------------------------------------------
+
+
+class TestStoreEviction:
+    """Test LRU eviction logic for response_store, msg_store, event_store."""
+
+    def test_eviction_removes_oldest(self):
+        """When store exceeds max_size, oldest entries are evicted."""
+        serving = _make_serving(max_size=2)
+
+        for i in range(3):
+            rid = f"resp_{i}"
+            serving.response_store[rid] = _make_mock_response(rid)
+            serving.msg_store[rid] = [{"role": "user", "content": f"msg_{i}"}]
+
+        serving._evict_oldest_store_entries()
+
+        assert len(serving.response_store) == 2
+        assert "resp_0" not in serving.response_store
+        assert "resp_1" in serving.response_store
+        assert "resp_2" in serving.response_store
+        assert "resp_0" not in serving.msg_store
+
+    def test_eviction_cleans_all_stores(self):
+        """Eviction removes entries from all three stores."""
+        serving = _make_serving(max_size=1)
+
+        rid = "resp_old"
+        serving.response_store[rid] = _make_mock_response(rid)
+        serving.msg_store[rid] = [{"role": "user", "content": "old"}]
+        serving.event_store[rid] = (deque(), asyncio.Event())
+
+        rid_new = "resp_new"
+        serving.response_store[rid_new] = _make_mock_response(rid_new)
+
+        serving._evict_oldest_store_entries()
+
+        assert "resp_old" not in serving.response_store
+        assert "resp_old" not in serving.msg_store
+        assert "resp_old" not in serving.event_store
+        assert "resp_new" in serving.response_store
+
+    def test_no_eviction_under_limit(self):
+        """No eviction when store is under max_size."""
+        serving = _make_serving(max_size=5)
+
+        for i in range(3):
+            rid = f"resp_{i}"
+            serving.response_store[rid] = _make_mock_response(rid)
+
+        serving._evict_oldest_store_entries()
+
+        assert len(serving.response_store) == 3
+
+    def test_eviction_bulk(self):
+        """Eviction handles large overshoot correctly."""
+        serving = _make_serving(max_size=2)
+
+        for i in range(100):
+            rid = f"resp_{i}"
+            serving.response_store[rid] = _make_mock_response(rid)
+
+        serving._evict_oldest_store_entries()
+
+        assert len(serving.response_store) == 2
+        assert "resp_98" in serving.response_store
+        assert "resp_99" in serving.response_store
+
+    def test_eviction_cancels_background_tasks(self):
+        """Eviction cancels background tasks for evicted responses."""
+        serving = _make_serving(max_size=1)
+
+        # Add response with background task
+        rid_old = "resp_old"
+        serving.response_store[rid_old] = _make_mock_response(rid_old)
+        mock_task = MagicMock()
+        serving.background_tasks[rid_old] = mock_task
+
+        # Add new response to trigger eviction
+        rid_new = "resp_new"
+        serving.response_store[rid_new] = _make_mock_response(rid_new)
+        serving._evict_oldest_store_entries()
+
+        # Verify old task was cancelled
+        mock_task.cancel.assert_called_once()
+        assert rid_old not in serving.background_tasks
+
+    @pytest.mark.asyncio
+    async def test_delete_response(self):
+        """delete_response removes from all stores."""
+        serving = _make_serving()
+
+        rid = "resp_to_delete"
+        serving.response_store[rid] = _make_mock_response(rid)
+        serving.msg_store[rid] = [{"role": "user", "content": "hi"}]
+        serving.event_store[rid] = (deque(), asyncio.Event())
+
+        result = await serving.delete_response(rid)
+
+        assert result.status == "cancelled"
+        assert rid not in serving.response_store
+        assert rid not in serving.msg_store
+        assert rid not in serving.event_store
+
+    @pytest.mark.asyncio
+    async def test_delete_nonexistent_response(self):
+        """delete_response returns error for unknown ID."""
+        serving = _make_serving()
+
+        await serving.delete_response("nonexistent")
+
+        serving._make_not_found_error.assert_called_once_with("nonexistent")
+
+    def test_lru_move_to_end(self):
+        """Accessing an entry moves it to end, protecting it from eviction."""
+        serving = _make_serving(max_size=2)
+
+        serving.response_store["resp_a"] = _make_mock_response("resp_a")
+        serving.response_store["resp_b"] = _make_mock_response("resp_b")
+
+        # Access resp_a, moving it to end (simulates retrieve)
+        serving.response_store.move_to_end("resp_a")
+
+        # Add a new entry, triggering eviction
+        serving.response_store["resp_c"] = _make_mock_response("resp_c")
+        serving._evict_oldest_store_entries()
+
+        # resp_b should be evicted (oldest), resp_a should survive
+        assert "resp_b" not in serving.response_store
+        assert "resp_a" in serving.response_store
+        assert "resp_c" in serving.response_store
diff --git a/vllm/entrypoints/openai/responses/api_router.py b/vllm/entrypoints/openai/responses/api_router.py
index 62328c045df4..fd4d81be9631 100644
--- a/vllm/entrypoints/openai/responses/api_router.py
+++ b/vllm/entrypoints/openai/responses/api_router.py
@@ -137,5 +137,27 @@ async def cancel_responses(response_id: str, raw_request: Request):
     return JSONResponse(content=response.model_dump())
 
 
+@router.delete("/v1/responses/{response_id}")
+@load_aware_call
+async def delete_responses(response_id: str, raw_request: Request):
+    handler = responses(raw_request)
+    if handler is None:
+        base_server = raw_request.app.state.openai_serving_tokenization
+        return base_server.create_error_response(
+            message="The model does not support Responses API"
+        )
+
+    try:
+        response = await handler.delete_response(response_id)
+    except Exception as e:
+        return handler.create_error_response(e)
+
+    if isinstance(response, ErrorResponse):
+        return JSONResponse(
+            content=response.model_dump(), status_code=response.error.code
+        )
+    return JSONResponse(content=response.model_dump())
+
+
 def attach_router(app: FastAPI):
     app.include_router(router)
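The route above registers `DELETE /v1/responses/{response_id}` alongside the existing retrieve and cancel endpoints. A hedged sketch of how a client might exercise it with `requests`; the base URL and response ID are illustrative, and it assumes the server runs with the Responses API store enabled so there are stored responses to delete.

```python
# Illustrative call to the new DELETE route. The URL and response ID are
# placeholders; the server must be running with VLLM_ENABLE_RESPONSES_API_STORE=1
# for stored responses to exist at all.
import requests

base_url = "http://localhost:8000"
response_id = "resp_abc123"  # an ID previously returned by POST /v1/responses

resp = requests.delete(f"{base_url}/v1/responses/{response_id}")
if resp.ok:
    body = resp.json()
    print(body["id"], body["status"])  # status is set to "cancelled" on delete
else:
    print(resp.status_code, resp.json())  # not-found error body for unknown IDs
```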
diff --git a/vllm/entrypoints/openai/responses/serving.py b/vllm/entrypoints/openai/responses/serving.py
index ea422a2b73a2..7f06a6f3ceab 100644
--- a/vllm/entrypoints/openai/responses/serving.py
+++ b/vllm/entrypoints/openai/responses/serving.py
@@ -5,7 +5,7 @@
 import json
 import time
 import uuid
-from collections import deque
+from collections import OrderedDict, deque
 from collections.abc import AsyncGenerator, AsyncIterator, Callable, Sequence
 from contextlib import AsyncExitStack
 from copy import copy
@@ -241,11 +241,12 @@ def __init__(
         # behavior in OpenAI's Responses API is to store the response, but
         # vLLM's default behavior is not.
         self.enable_store = envs.VLLM_ENABLE_RESPONSES_API_STORE
+        self.store_max_size = envs.VLLM_RESPONSES_API_STORE_MAX_SIZE
         if self.enable_store:
-            logger.warning_once(
-                "`VLLM_ENABLE_RESPONSES_API_STORE` is enabled. This may "
-                "cause a memory leak since we never remove responses from "
-                "the store."
+            logger.info(
+                "Responses API store is enabled (max_size=%d). "
+                "Oldest entries will be evicted when the limit is reached.",
+                self.store_max_size,
             )
 
         self.use_harmony = self.model_config.hf_config.model_type == "gpt_oss"
@@ -273,23 +274,18 @@
         self.tool_call_id_type = "random"
         self.enable_auto_tools = enable_auto_tools
 
-        # HACK(woosuk): This is a hack. We should use a better store.
-        # FIXME: If enable_store=True, this may cause a memory leak since we
-        # never remove responses from the store.
-        self.response_store: dict[str, ResponsesResponse] = {}
+        # In-memory stores with LRU eviction to bound memory usage.
+        # Oldest entries are evicted when store_max_size is exceeded.
+        self.response_store: OrderedDict[str, ResponsesResponse] = OrderedDict()
         self.response_store_lock = asyncio.Lock()
 
-        # HACK(woosuk): This is a hack. We should use a better store.
-        # FIXME: If enable_store=True, this may cause a memory leak since we
-        # never remove messages from the store.
-        self.msg_store: dict[str, list[ChatCompletionMessageParam]] = {}
+        self.msg_store: OrderedDict[str, list[ChatCompletionMessageParam]] = (
+            OrderedDict()
+        )
 
-        # HACK(wuhang): This is a hack. We should use a better store.
-        # FIXME: If enable_store=True, this may cause a memory leak since we
-        # never remove events from the store.
-        self.event_store: dict[
+        self.event_store: OrderedDict[
             str, tuple[deque[StreamingResponsesResponse], asyncio.Event]
-        ] = {}
+        ] = OrderedDict()
 
         self.background_tasks: dict[str, asyncio.Task] = {}
 
@@ -539,6 +535,7 @@ async def create_responses(
             )
             async with self.response_store_lock:
                 self.response_store[response.id] = response
+                self._evict_oldest_store_entries()
 
         # Run the request in the background.
         if request.stream:
@@ -1226,6 +1223,9 @@ async def retrieve_responses(
     ):
         async with self.response_store_lock:
             response = self.response_store.get(response_id)
+            if response is not None:
+                # Move to end so recently accessed entries are kept.
+                self.response_store.move_to_end(response_id)
 
         if response is None:
             return self._make_not_found_error(response_id)
@@ -1287,6 +1287,40 @@ def _make_store_not_supported_error(self) -> ErrorResponse:
             param="store",
         )
 
+    def _evict_oldest_store_entries(self) -> None:
+        """Evict oldest entries from all stores when capacity is exceeded.
+
+        Must be called while holding response_store_lock.
+        """
+        while len(self.response_store) > self.store_max_size:
+            evicted_id, _ = self.response_store.popitem(last=False)
+            self.msg_store.pop(evicted_id, None)
+            self.event_store.pop(evicted_id, None)
+
+            # Cancel background task if it exists
+            if task := self.background_tasks.pop(evicted_id, None):
+                task.cancel()
+
+            logger.debug("Evicted response %s from store (LRU)", evicted_id)
+
+    async def delete_response(
+        self,
+        response_id: str,
+    ) -> ErrorResponse | ResponsesResponse:
+        async with self.response_store_lock:
+            response = self.response_store.pop(response_id, None)
+            if response is None:
+                return self._make_not_found_error(response_id)
+            self.msg_store.pop(response_id, None)
+            self.event_store.pop(response_id, None)
+
+            # Cancel any background task for this response.
+            if task := self.background_tasks.pop(response_id, None):
+                task.cancel()
+
+        response.status = "cancelled"
+        return response
+
     async def _process_simple_streaming_events(
         self,
         request: ResponsesRequest,
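The store changes above lean entirely on plain `collections.OrderedDict` semantics: `popitem(last=False)` pops the oldest insertion, and `move_to_end()` refreshes an entry on access so retrieval keeps it alive. A self-contained illustration of that pattern follows; the keys and capacity are toy values, not vLLM code.

```python
# Self-contained sketch of the OrderedDict-based LRU pattern used above:
# popitem(last=False) evicts the oldest entry, move_to_end() protects a
# recently accessed one. Keys and capacity are toy values.
from collections import OrderedDict

store: OrderedDict[str, str] = OrderedDict()
max_size = 2

for rid in ("resp_a", "resp_b"):
    store[rid] = f"payload for {rid}"

store.move_to_end("resp_a")       # simulate a retrieve: resp_a becomes newest

store["resp_c"] = "payload for resp_c"
while len(store) > max_size:      # same loop shape as _evict_oldest_store_entries
    evicted_id, _ = store.popitem(last=False)
    print("evicted", evicted_id)  # -> evicted resp_b

print(list(store))                # -> ['resp_a', 'resp_c']
```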
diff --git a/vllm/envs.py b/vllm/envs.py
index b32683ecb994..3875d79b2306 100755
--- a/vllm/envs.py
+++ b/vllm/envs.py
@@ -198,6 +198,7 @@
     VLLM_LOOPBACK_IP: str = ""
     VLLM_ALLOW_CHUNKED_LOCAL_ATTN_WITH_HYBRID_KV_CACHE: bool = True
     VLLM_ENABLE_RESPONSES_API_STORE: bool = False
+    VLLM_RESPONSES_API_STORE_MAX_SIZE: int = 10000
     VLLM_NVFP4_GEMM_BACKEND: str | None = None
     VLLM_HAS_FLASHINFER_CUBIN: bool = False
     VLLM_USE_FLASHINFER_MOE_MXFP4_MXFP8: bool = False
@@ -1426,11 +1427,17 @@ def _get_or_set_default() -> str:
     # NOTE/WARNING:
     # 1. Messages are kept in memory only (not persisted to disk) and will be
     #    lost when the vLLM server shuts down.
-    # 2. Enabling this option will cause a memory leak, as stored messages are
-    #    never removed from memory until the server terminates.
+    # 2. The store uses LRU eviction to limit memory usage. The maximum number
+    #    of stored responses is controlled by VLLM_RESPONSES_API_STORE_MAX_SIZE.
     "VLLM_ENABLE_RESPONSES_API_STORE": lambda: bool(
         int(os.getenv("VLLM_ENABLE_RESPONSES_API_STORE", "0"))
    ),
+    # Maximum number of responses to keep in the in-memory store for the
+    # Responses API. When the limit is reached, the oldest entries are evicted.
+    # Only effective when VLLM_ENABLE_RESPONSES_API_STORE is enabled.
+    "VLLM_RESPONSES_API_STORE_MAX_SIZE": lambda: int(
+        os.getenv("VLLM_RESPONSES_API_STORE_MAX_SIZE", "10000")
+    ),
     # If set, use the fp8 mfma in rocm paged attention.
     "VLLM_ROCM_FP8_MFMA_PAGE_ATTN": lambda: bool(
         int(os.getenv("VLLM_ROCM_FP8_MFMA_PAGE_ATTN", "0"))
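End to end, the cap is client-visible: once more than `VLLM_RESPONSES_API_STORE_MAX_SIZE` responses have been stored, the oldest IDs stop resolving. The sketch below illustrates that behavior under stated assumptions: a local server launched with `VLLM_ENABLE_RESPONSES_API_STORE=1` and a deliberately tiny `VLLM_RESPONSES_API_STORE_MAX_SIZE=2`; the URL, model name, and request payload are illustrative.

```python
# Sketch of the client-visible effect of the new env vars. Assumes the server
# was launched with VLLM_ENABLE_RESPONSES_API_STORE=1 and
# VLLM_RESPONSES_API_STORE_MAX_SIZE=2 (a deliberately tiny cap for illustration).
import requests

base_url = "http://localhost:8000"
ids = []
for i in range(3):  # one more response than the store can hold
    r = requests.post(
        f"{base_url}/v1/responses",
        json={"model": "<model-name>", "input": f"request {i}", "store": True},
    )
    ids.append(r.json()["id"])

# The oldest response has been evicted (LRU), so retrieving it should now
# return a not-found error, while the newest is still retrievable.
print(requests.get(f"{base_url}/v1/responses/{ids[0]}").status_code)   # expected: not found
print(requests.get(f"{base_url}/v1/responses/{ids[-1]}").status_code)  # expected: 200
```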