Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion litellm/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,9 +242,13 @@
REDIS_DAILY_AGENT_SPEND_UPDATE_BUFFER_KEY = "litellm_daily_agent_spend_update_buffer"
REDIS_DAILY_TAG_SPEND_UPDATE_BUFFER_KEY = "litellm_daily_tag_spend_update_buffer"
MAX_REDIS_BUFFER_DEQUEUE_COUNT = int(os.getenv("MAX_REDIS_BUFFER_DEQUEUE_COUNT", 100))
MAX_SIZE_IN_MEMORY_QUEUE = int(os.getenv("MAX_SIZE_IN_MEMORY_QUEUE", 2000))
# Bounds asyncio.Queue() instances (log queues, spend update queues, etc.) to prevent unbounded memory growth
LITELLM_ASYNCIO_QUEUE_MAXSIZE = int(os.getenv("LITELLM_ASYNCIO_QUEUE_MAXSIZE", 1000))
# Aggregation threshold: default to 80% of the asyncio queue maxsize so the check can always trigger.
# Must be < LITELLM_ASYNCIO_QUEUE_MAXSIZE; if set higher the aggregation logic will never fire.
MAX_SIZE_IN_MEMORY_QUEUE = int(
os.getenv("MAX_SIZE_IN_MEMORY_QUEUE", int(LITELLM_ASYNCIO_QUEUE_MAXSIZE * 0.8))
)
MAX_IN_MEMORY_QUEUE_FLUSH_COUNT = int(
os.getenv("MAX_IN_MEMORY_QUEUE_FLUSH_COUNT", 1000)
)
Expand Down
9 changes: 9 additions & 0 deletions litellm/proxy/db/db_transaction_queue/base_update_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,15 @@ class BaseUpdateQueue:
def __init__(self):
self.update_queue = asyncio.Queue(maxsize=LITELLM_ASYNCIO_QUEUE_MAXSIZE)
self.MAX_SIZE_IN_MEMORY_QUEUE = MAX_SIZE_IN_MEMORY_QUEUE
if MAX_SIZE_IN_MEMORY_QUEUE >= LITELLM_ASYNCIO_QUEUE_MAXSIZE:
verbose_proxy_logger.warning(
"Misconfigured queue thresholds: MAX_SIZE_IN_MEMORY_QUEUE (%d) >= LITELLM_ASYNCIO_QUEUE_MAXSIZE (%d). "
"The spend aggregation check will never trigger because the asyncio.Queue blocks at %d items. "
"Set MAX_SIZE_IN_MEMORY_QUEUE to a value less than LITELLM_ASYNCIO_QUEUE_MAXSIZE (recommended: 80%% of it).",
MAX_SIZE_IN_MEMORY_QUEUE,
LITELLM_ASYNCIO_QUEUE_MAXSIZE,
LITELLM_ASYNCIO_QUEUE_MAXSIZE,
)

async def add_update(self, update):
"""Enqueue an update."""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import json
import os
import sys
from unittest.mock import patch

import pytest
from fastapi.testclient import TestClient
Expand Down Expand Up @@ -42,3 +43,20 @@ async def test_queue_flush_limit():
assert (
queue.update_queue.qsize() == 100
), "Expected 100 items to remain in the queue"


def test_misconfigured_queue_thresholds_warns():
"""
Test that a warning is logged when MAX_SIZE_IN_MEMORY_QUEUE >= LITELLM_ASYNCIO_QUEUE_MAXSIZE.

This misconfiguration causes the spend aggregation check in SpendUpdateQueue.add_update()
to never trigger because asyncio.Queue blocks before qsize() can reach the threshold.
"""
import litellm.proxy.db.db_transaction_queue.base_update_queue as bq_module

with patch.object(bq_module, "MAX_SIZE_IN_MEMORY_QUEUE", 2000), patch.object(
bq_module, "LITELLM_ASYNCIO_QUEUE_MAXSIZE", 1000
), patch.object(bq_module.verbose_proxy_logger, "warning") as mock_warning:
BaseUpdateQueue()
mock_warning.assert_called_once()
assert "Misconfigured queue thresholds" in mock_warning.call_args[0][0]
Loading