11 changes: 11 additions & 0 deletions litellm/proxy/proxy_server.py
@@ -1115,6 +1115,8 @@ def swagger_monkey_patch(*args, **kwargs):
redis_usage_cache: Optional[RedisCache] = (
None # redis cache used for tracking spend, tpm/rpm limits
)
polling_via_cache_enabled: Union[Literal["all"], List[str], bool] = False
polling_cache_ttl: int = 3600 # Default 1 hour TTL for polling cache
user_custom_auth = None
user_custom_key_generate = None
user_custom_sso = None
@@ -2317,6 +2319,15 @@ async def load_config( # noqa: PLR0915
# this is set in the cache branch
# see usage here: https://docs.litellm.ai/docs/proxy/caching
pass
elif key == "responses":
# Initialize global polling via cache settings
global polling_via_cache_enabled, polling_cache_ttl
background_mode = value.get("background_mode", {})
polling_via_cache_enabled = background_mode.get("polling_via_cache", False)
polling_cache_ttl = background_mode.get("ttl", 3600)
verbose_proxy_logger.debug(
f"{blue_color_code} Initialized polling via cache: enabled={polling_via_cache_enabled}, ttl={polling_cache_ttl}{reset_color_code}"
)
elif key == "default_team_settings":
for idx, team_setting in enumerate(
value
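For reference, a sketch of the config shape this new `responses` branch expects — the exact YAML keys are inferred from the parsing above, not spelled out elsewhere in this PR:

```python
# Inferred from the load_config branch above (not an official config reference):
# a config.yaml block like
#   responses:
#     background_mode:
#       polling_via_cache: true    # or "all", or a list of model names
#       ttl: 1800
# reaches this branch as the following dict.
value = {"background_mode": {"polling_via_cache": True, "ttl": 1800}}

background_mode = value.get("background_mode", {})
polling_via_cache_enabled = background_mode.get("polling_via_cache", False)  # True
polling_cache_ttl = background_mode.get("ttl", 3600)  # 1800; defaults to 1 hour
```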
223 changes: 219 additions & 4 deletions litellm/proxy/response_api_endpoints/endpoints.py
@@ -1,8 +1,12 @@
from fastapi import APIRouter, Depends, Request, Response
import asyncio

from fastapi import APIRouter, Depends, HTTPException, Request, Response

from litellm._logging import verbose_proxy_logger
from litellm.proxy._types import *
from litellm.proxy.auth.user_api_key_auth import UserAPIKeyAuth, user_api_key_auth
from litellm.proxy.common_request_processing import ProxyBaseLLMRequestProcessing
from litellm.types.responses.main import DeleteResponseResult

router = APIRouter()

@@ -30,22 +34,40 @@ async def responses_api(
"""
Follows the OpenAI Responses API spec: https://platform.openai.com/docs/api-reference/responses

Supports background mode with polling_via_cache for partial response retrieval.
When background=true and polling_via_cache is enabled, returns a polling_id immediately
and streams the response in the background, updating the Redis cache as it progresses.

```bash
# Normal request
curl -X POST http://localhost:4000/v1/responses \
-H "Content-Type: application/json" \
-H "Authorization: Bearer sk-1234" \
-d '{
"model": "gpt-4o",
"input": "Tell me about AI"
}'

# Background request with polling
curl -X POST http://localhost:4000/v1/responses \
-H "Content-Type: application/json" \
-H "Authorization: Bearer sk-1234" \
-d '{
"model": "gpt-4o",
"input": "Tell me about AI",
"background": true
}'
```
"""
from litellm.proxy.proxy_server import (
_read_request_body,
general_settings,
llm_router,
polling_cache_ttl,
polling_via_cache_enabled,
proxy_config,
proxy_logging_obj,
redis_usage_cache,
select_data_generator,
user_api_base,
user_max_tokens,
@@ -56,6 +78,74 @@
)

data = await _read_request_body(request=request)

# Check if polling via cache should be used for this request
from litellm.proxy.response_polling.polling_handler import should_use_polling_for_request

should_use_polling = should_use_polling_for_request(
background_mode=data.get("background", False),
polling_via_cache_enabled=polling_via_cache_enabled,
redis_cache=redis_usage_cache,
model=data.get("model", ""),
llm_router=llm_router,
)

# If polling is enabled, use polling mode
if should_use_polling:
from litellm.proxy.response_polling.polling_handler import (
ResponsePollingHandler,
)
from litellm.proxy.response_polling.background_streaming import (
background_streaming_task,
)

verbose_proxy_logger.info(
f"Starting background response with polling for model={data.get('model')}"
)

# Initialize polling handler with configured TTL (from global config)
polling_handler = ResponsePollingHandler(
redis_cache=redis_usage_cache,
ttl=polling_cache_ttl # Global var set at startup
)

# Generate polling ID
polling_id = ResponsePollingHandler.generate_polling_id()

# Create initial state in Redis
initial_state = await polling_handler.create_initial_state(
polling_id=polling_id,
request_data=data,
)

# Start background task to stream and update cache
asyncio.create_task(
background_streaming_task(
polling_id=polling_id,
data=data.copy(),
polling_handler=polling_handler,
request=request,
fastapi_response=fastapi_response,
user_api_key_dict=user_api_key_dict,
general_settings=general_settings,
llm_router=llm_router,
proxy_config=proxy_config,
proxy_logging_obj=proxy_logging_obj,
select_data_generator=select_data_generator,
user_model=user_model,
user_temperature=user_temperature,
user_request_timeout=user_request_timeout,
user_max_tokens=user_max_tokens,
user_api_base=user_api_base,
version=version,
)
)

# Return OpenAI Response object format (initial state)
# https://platform.openai.com/docs/api-reference/responses/object
return initial_state

# Normal response flow
processor = ProxyBaseLLMRequestProcessing(data=data)
try:
return await processor.base_process_llm_request(
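As a usage aid, a minimal client-side sketch of the flow this branch enables, using the placeholder URL and key from the docstring above; the `status` values checked here are assumptions based on the OpenAI Response object format the code links to:

```python
import time

import requests

BASE_URL = "http://localhost:4000"
HEADERS = {"Authorization": "Bearer sk-1234", "Content-Type": "application/json"}

# Submit a background request. With polling_via_cache enabled, the proxy
# returns the initial state (including a litellm_poll_* id) immediately.
submit = requests.post(
    f"{BASE_URL}/v1/responses",
    headers=HEADERS,
    json={"model": "gpt-4o", "input": "Tell me about AI", "background": True},
)
submit.raise_for_status()
polling_id = submit.json()["id"]

# Poll the cached state until the background stream reaches a terminal status.
while True:
    state = requests.get(f"{BASE_URL}/v1/responses/{polling_id}", headers=HEADERS).json()
    if state.get("status") in ("completed", "failed", "cancelled"):
        break
    time.sleep(2)

print(state.get("status"))
```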
@@ -109,9 +199,18 @@ async def get_response(
"""
Get a response by ID.

Supports both:
- Polling IDs (litellm_poll_*): Returns cumulative cached content from background responses
- Provider response IDs: Passes through to provider API

Follows the OpenAI Responses API spec: https://platform.openai.com/docs/api-reference/responses/get

```bash
# Get polling response
curl -X GET http://localhost:4000/v1/responses/litellm_poll_abc123 \
-H "Authorization: Bearer sk-1234"

# Get provider response
curl -X GET http://localhost:4000/v1/responses/resp_abc123 \
-H "Authorization: Bearer sk-1234"
```
Expand All @@ -122,6 +221,7 @@ async def get_response(
llm_router,
proxy_config,
proxy_logging_obj,
redis_usage_cache,
select_data_generator,
user_api_base,
user_max_tokens,
@@ -130,7 +230,33 @@
user_temperature,
version,
)

from litellm.proxy.response_polling.polling_handler import ResponsePollingHandler

# Check if this is a polling ID
if ResponsePollingHandler.is_polling_id(response_id):
# Handle polling response
if not redis_usage_cache:
raise HTTPException(
status_code=500,
detail="Redis cache not configured. Polling requires Redis."
)

polling_handler = ResponsePollingHandler(redis_cache=redis_usage_cache)

# Get current state from cache
state = await polling_handler.get_state(response_id)

if not state:
raise HTTPException(
status_code=404,
detail=f"Polling response {response_id} not found or expired"
)

# Return the whole state directly (OpenAI Response object format)
# https://platform.openai.com/docs/api-reference/responses/object
return state

# Normal provider response flow
data = await _read_request_body(request=request)
data["response_id"] = response_id
processor = ProxyBaseLLMRequestProcessing(data=data)
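For orientation, a rough sketch of what the cached state returned by the polling branch above could look like; the endpoint only guarantees the OpenAI Response object format it links to, so the concrete fields and values below are illustrative assumptions:

```python
# Hypothetical cached polling state (illustrative only; shape follows the
# OpenAI Response object the code references, values are made up).
state = {
    "id": "litellm_poll_abc123",
    "object": "response",
    "status": "in_progress",  # becomes "completed" once the background stream ends
    "model": "gpt-4o",
    "output": [
        {
            "type": "message",
            "role": "assistant",
            "content": [{"type": "output_text", "text": "AI is ..."}],
        }
    ],
}
```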
@@ -186,6 +312,10 @@ async def delete_response(
"""
Delete a response by ID.

Supports both:
- Polling IDs (litellm_poll_*): Deletes from Redis cache
- Provider response IDs: Passes through to provider API

Follows the OpenAI Responses API spec: https://platform.openai.com/docs/api-reference/responses/delete

```bash
@@ -199,6 +329,7 @@
llm_router,
proxy_config,
proxy_logging_obj,
redis_usage_cache,
select_data_generator,
user_api_base,
user_max_tokens,
@@ -207,7 +338,44 @@
user_temperature,
version,
)

from litellm.proxy.response_polling.polling_handler import ResponsePollingHandler

# Check if this is a polling ID
if ResponsePollingHandler.is_polling_id(response_id):
# Handle polling response deletion
if not redis_usage_cache:
raise HTTPException(
status_code=500,
detail="Redis cache not configured."
)

polling_handler = ResponsePollingHandler(redis_cache=redis_usage_cache)

# Get state to verify access
state = await polling_handler.get_state(response_id)

if not state:
raise HTTPException(
status_code=404,
detail=f"Polling response {response_id} not found"
)

# Delete from cache
success = await polling_handler.delete_polling(response_id)

if success:
return DeleteResponseResult(
id=response_id,
object="response",
deleted=True
)
else:
raise HTTPException(
status_code=500,
detail="Failed to delete polling response"
)

# Normal provider response flow
data = await _read_request_body(request=request)
data["response_id"] = response_id
processor = ProxyBaseLLMRequestProcessing(data=data)
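A short client-side sketch of deleting a cached polling response (placeholder URL and key as in the docstrings; the expected response fields mirror the `DeleteResponseResult` constructed above):

```python
import requests

resp = requests.delete(
    "http://localhost:4000/v1/responses/litellm_poll_abc123",
    headers={"Authorization": "Bearer sk-1234"},
)
resp.raise_for_status()
# Expected per the branch above:
# {"id": "litellm_poll_abc123", "object": "response", "deleted": true}
print(resp.json().get("deleted"))
```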
@@ -331,9 +499,18 @@ async def cancel_response(
"""
Cancel a response by ID.

Supports both:
- Polling IDs (litellm_poll_*): Cancels background response and updates status in Redis
- Provider response IDs: Passes through to provider API

Follows the OpenAI Responses API spec: https://platform.openai.com/docs/api-reference/responses/cancel

```bash
# Cancel polling response
curl -X POST http://localhost:4000/v1/responses/litellm_poll_abc123/cancel \
-H "Authorization: Bearer sk-1234"

# Cancel provider response
curl -X POST http://localhost:4000/v1/responses/resp_abc123/cancel \
-H "Authorization: Bearer sk-1234"
```
@@ -344,6 +521,7 @@
llm_router,
proxy_config,
proxy_logging_obj,
redis_usage_cache,
select_data_generator,
user_api_base,
user_max_tokens,
@@ -352,7 +530,44 @@
user_temperature,
version,
)

from litellm.proxy.response_polling.polling_handler import ResponsePollingHandler

# Check if this is a polling ID
if ResponsePollingHandler.is_polling_id(response_id):
# Handle polling response cancellation
if not redis_usage_cache:
raise HTTPException(
status_code=500,
detail="Redis cache not configured."
)

polling_handler = ResponsePollingHandler(redis_cache=redis_usage_cache)

# Get current state to verify it exists
state = await polling_handler.get_state(response_id)

if not state:
raise HTTPException(
status_code=404,
detail=f"Polling response {response_id} not found"
)

# Cancel the polling response (sets status to "cancelled")
success = await polling_handler.cancel_polling(response_id)

if success:
# Fetch the updated state with cancelled status
updated_state = await polling_handler.get_state(response_id)

# Return the whole state directly (now with status="cancelled")
return updated_state
else:
raise HTTPException(
status_code=500,
detail="Failed to cancel polling response"
)

# Normal provider response flow
data = await _read_request_body(request=request)
data["response_id"] = response_id
processor = ProxyBaseLLMRequestProcessing(data=data)
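And the matching sketch for cancellation; per the branch above, the returned state should come back with its status set to "cancelled":

```python
import requests

resp = requests.post(
    "http://localhost:4000/v1/responses/litellm_poll_abc123/cancel",
    headers={"Authorization": "Bearer sk-1234"},
)
resp.raise_for_status()
print(resp.json().get("status"))  # expected: "cancelled"
```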
16 changes: 16 additions & 0 deletions litellm/proxy/response_polling/__init__.py
@@ -0,0 +1,16 @@
"""
Response Polling Module for Background Responses with Cache
"""
from litellm.proxy.response_polling.background_streaming import (
background_streaming_task,
)
from litellm.proxy.response_polling.polling_handler import (
ResponsePollingHandler,
should_use_polling_for_request,
)

__all__ = [
"ResponsePollingHandler",
"background_streaming_task",
"should_use_polling_for_request",
]