diff --git a/litellm/constants.py b/litellm/constants.py
index 89992b459c2d..ee79f2fa56fd 100644
--- a/litellm/constants.py
+++ b/litellm/constants.py
@@ -242,9 +242,13 @@
 REDIS_DAILY_AGENT_SPEND_UPDATE_BUFFER_KEY = "litellm_daily_agent_spend_update_buffer"
 REDIS_DAILY_TAG_SPEND_UPDATE_BUFFER_KEY = "litellm_daily_tag_spend_update_buffer"
 MAX_REDIS_BUFFER_DEQUEUE_COUNT = int(os.getenv("MAX_REDIS_BUFFER_DEQUEUE_COUNT", 100))
-MAX_SIZE_IN_MEMORY_QUEUE = int(os.getenv("MAX_SIZE_IN_MEMORY_QUEUE", 2000))
 # Bounds asyncio.Queue() instances (log queues, spend update queues, etc.) to prevent unbounded memory growth
 LITELLM_ASYNCIO_QUEUE_MAXSIZE = int(os.getenv("LITELLM_ASYNCIO_QUEUE_MAXSIZE", 1000))
+# Aggregation threshold: default to 80% of the asyncio queue maxsize so the check can always trigger.
+# Must be < LITELLM_ASYNCIO_QUEUE_MAXSIZE; if set equal or higher, the aggregation logic will never fire.
+MAX_SIZE_IN_MEMORY_QUEUE = int(
+    os.getenv("MAX_SIZE_IN_MEMORY_QUEUE", int(LITELLM_ASYNCIO_QUEUE_MAXSIZE * 0.8))
+)
 MAX_IN_MEMORY_QUEUE_FLUSH_COUNT = int(
     os.getenv("MAX_IN_MEMORY_QUEUE_FLUSH_COUNT", 1000)
 )
diff --git a/litellm/proxy/db/db_transaction_queue/base_update_queue.py b/litellm/proxy/db/db_transaction_queue/base_update_queue.py
index a5ec1c3eaf45..e37200c02e98 100644
--- a/litellm/proxy/db/db_transaction_queue/base_update_queue.py
+++ b/litellm/proxy/db/db_transaction_queue/base_update_queue.py
@@ -23,6 +23,15 @@ class BaseUpdateQueue:
     def __init__(self):
         self.update_queue = asyncio.Queue(maxsize=LITELLM_ASYNCIO_QUEUE_MAXSIZE)
         self.MAX_SIZE_IN_MEMORY_QUEUE = MAX_SIZE_IN_MEMORY_QUEUE
+        if MAX_SIZE_IN_MEMORY_QUEUE >= LITELLM_ASYNCIO_QUEUE_MAXSIZE:
+            verbose_proxy_logger.warning(
+                "Misconfigured queue thresholds: MAX_SIZE_IN_MEMORY_QUEUE (%d) >= LITELLM_ASYNCIO_QUEUE_MAXSIZE (%d). "
+                "The spend aggregation check will never trigger because the asyncio.Queue blocks at %d items. "
+                "Set MAX_SIZE_IN_MEMORY_QUEUE to a value less than LITELLM_ASYNCIO_QUEUE_MAXSIZE (recommended: 80%% of it).",
+                MAX_SIZE_IN_MEMORY_QUEUE,
+                LITELLM_ASYNCIO_QUEUE_MAXSIZE,
+                LITELLM_ASYNCIO_QUEUE_MAXSIZE,
+            )
 
     async def add_update(self, update):
         """Enqueue an update."""
diff --git a/tests/test_litellm/proxy/db/db_transaction_queue/test_base_update_queue.py b/tests/test_litellm/proxy/db/db_transaction_queue/test_base_update_queue.py
index 6ab5a4a46002..c3807b5f79a3 100644
--- a/tests/test_litellm/proxy/db/db_transaction_queue/test_base_update_queue.py
+++ b/tests/test_litellm/proxy/db/db_transaction_queue/test_base_update_queue.py
@@ -2,6 +2,7 @@
 import json
 import os
 import sys
+from unittest.mock import patch
 
 import pytest
 from fastapi.testclient import TestClient
@@ -42,3 +43,20 @@ async def test_queue_flush_limit():
     assert (
         queue.update_queue.qsize() == 100
     ), "Expected 100 items to remain in the queue"
+
+
+def test_misconfigured_queue_thresholds_warns():
+    """
+    Test that a warning is logged when MAX_SIZE_IN_MEMORY_QUEUE >= LITELLM_ASYNCIO_QUEUE_MAXSIZE.
+
+    This misconfiguration causes the spend aggregation check in SpendUpdateQueue.add_update()
+    to never trigger because asyncio.Queue blocks before qsize() can reach the threshold.
+    """
+    import litellm.proxy.db.db_transaction_queue.base_update_queue as bq_module
+
+    with patch.object(bq_module, "MAX_SIZE_IN_MEMORY_QUEUE", 2000), patch.object(
+        bq_module, "LITELLM_ASYNCIO_QUEUE_MAXSIZE", 1000
+    ), patch.object(bq_module.verbose_proxy_logger, "warning") as mock_warning:
+        BaseUpdateQueue()
+        mock_warning.assert_called_once()
+        assert "Misconfigured queue thresholds" in mock_warning.call_args[0][0]