diff --git a/litellm/proxy/guardrails/guardrail_hooks/presidio.py b/litellm/proxy/guardrails/guardrail_hooks/presidio.py
--- a/litellm/proxy/guardrails/guardrail_hooks/presidio.py
+++ b/litellm/proxy/guardrails/guardrail_hooks/presidio.py
@@ -114,6 +114,9 @@ def __init__(
 
         self._main_thread_id = threading.get_ident()
 
+        # Per-event-loop session cache for calls made outside the main thread
+        self._loop_sessions: Dict[asyncio.AbstractEventLoop, aiohttp.ClientSession] = {}
+
         if mock_testing is True:  # for testing purposes only
             return
 
@@ -189,9 +192,10 @@ async def _get_session_iterator(
         Logic:
         1. If running in the main thread (where the object was initialized/destined to live normally),
            use the shared `self._http_session` (protected by a lock).
-        2. If running in a background thread (e.g. logging hook), yield a NEW ephemeral session
-           and ensure it is closed after use.
+        2. If running in a background thread (e.g. logging hook), reuse the session cached for
+           the running event loop; entries whose loop has been closed are dropped first.
         """
+        current_loop = asyncio.get_running_loop()
 
         # Check if we are in the stored main thread
         if threading.get_ident() == self._main_thread_id:
@@ -201,22 +205,36 @@ async def _get_session_iterator(
                     self._http_session = aiohttp.ClientSession()
                 yield self._http_session
         else:
-            # Background thread -> create ephemeral session
+            # Background thread/loop -> use loop-bound session cache
             # This avoids "attached to a different loop" or "no running event loop" errors
             # when accessing the shared session created in the main loop
-            session = aiohttp.ClientSession()
-            try:
-                yield session
-            finally:
-                if not session.closed:
-                    await session.close()
+            # Forget sessions whose loop is closed: they can no longer be used or
+            # awaited, and keeping them would grow the cache without bound.
+            for stale_loop in [
+                loop for loop in list(self._loop_sessions) if loop.is_closed()
+            ]:
+                self._loop_sessions.pop(stale_loop, None)
+
+            session = self._loop_sessions.get(current_loop)
+            if session is None or session.closed:
+                session = aiohttp.ClientSession()
+                self._loop_sessions[current_loop] = session
+            yield session
 
     async def _close_http_session(self) -> None:
-        """Close the shared HTTP session if it exists."""
+        """Close the shared session and all loop-bound cached sessions."""
         if self._http_session is not None and not self._http_session.closed:
             await self._http_session.close()
             self._http_session = None
 
+        # Snapshot and clear before awaiting so an insert from another thread
+        # cannot change the dict while it is being iterated.
+        cached_sessions = list(self._loop_sessions.values())
+        self._loop_sessions.clear()
+        for session in cached_sessions:
+            if not session.closed:
+                await session.close()
+
     def __del__(self):
         """Cleanup: we try to close, but doing async cleanup in __del__ is risky."""
         pass
diff --git a/tests/test_presidio_latency.py b/tests/test_presidio_latency.py
new file mode 100644
--- /dev/null
+++ b/tests/test_presidio_latency.py
@@ -0,0 +1,87 @@
+import asyncio
+import threading
+from unittest.mock import MagicMock, patch
+
+import aiohttp
+import pytest
+
+from litellm.proxy.guardrails.guardrail_hooks.presidio import (
+    _OPTIONAL_PresidioPIIMasking,
+)
+
+
+def _make_guardrail() -> _OPTIONAL_PresidioPIIMasking:
+    """Build a guardrail in mock mode so no Presidio service is contacted."""
+    return _OPTIONAL_PresidioPIIMasking(
+        mock_testing=True,
+        presidio_analyzer_api_base="http://mock-analyzer",
+        presidio_anonymizer_api_base="http://mock-anonymizer",
+    )
+
+
+async def _count_session_creations(presidio, calls: int = 10) -> int:
+    """Return how many aiohttp sessions are constructed over `calls` acquisitions."""
+    session_creations = 0
+    original_init = aiohttp.ClientSession.__init__
+
+    def mocked_init(self, *args, **kwargs):
+        nonlocal session_creations
+        session_creations += 1
+        original_init(self, *args, **kwargs)
+
+    with patch.object(
+        aiohttp.ClientSession, "__init__", side_effect=mocked_init, autospec=True
+    ):
+        for _ in range(calls):
+            async with presidio._get_session_iterator():
+                pass
+    return session_creations
+
+
+@pytest.mark.asyncio
+async def test_sanity_presidio_session_reuse_main_thread():
+    """
+    SANITY CHECK:
+    Verify that Presidio guardrail reuses sessions in the main thread.
+    This ensures we don't break existing session pooling functionality.
+    """
+    presidio = _make_guardrail()
+    try:
+        # Expected: Only 1 session created for all 10 calls.
+        assert await _count_session_creations(presidio) == 1
+    finally:
+        await presidio._close_http_session()
+
+
+@pytest.mark.asyncio
+async def test_bug_presidio_session_explosion_background_thread_causes_latency():
+    """
+    BUG REPRODUCTION:
+    Verify that background threads (like logging hooks) REUSE sessions.
+    Previously, each call in a background loop created a NEW ephemeral session,
+    leading to socket exhaustion and the reported 97s latency spike.
+    """
+    presidio = _make_guardrail()
+    # Force the code to think it's in a background thread
+    presidio._main_thread_id = threading.get_ident() + 1
+    try:
+        # FIX VERIFICATION: Should now be 1 session (reused) instead of 10.
+        assert await _count_session_creations(presidio) == 1
+    finally:
+        await presidio._close_http_session()
+
+
+@pytest.mark.asyncio
+async def test_presidio_drops_cached_sessions_of_closed_loops():
+    """Entries keyed by a closed event loop must not accumulate in the cache."""
+    presidio = _make_guardrail()
+    presidio._main_thread_id = threading.get_ident() + 1
+    dead_loop = asyncio.new_event_loop()
+    dead_loop.close()
+    presidio._loop_sessions[dead_loop] = MagicMock(closed=True)
+    try:
+        async with presidio._get_session_iterator():
+            pass
+        assert dead_loop not in presidio._loop_sessions
+    finally:
+        await presidio._close_http_session()