Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions litellm/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,6 @@
if litellm_mode == "DEV":
dotenv.load_dotenv()

# Import default_encoding to ensure environment variables are initialized at import time
from litellm.litellm_core_utils import default_encoding # noqa: F401

####################################################
if set_verbose:
Expand Down
31 changes: 20 additions & 11 deletions litellm/proxy/guardrails/guardrail_hooks/presidio.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,9 @@ def __init__(

self._main_thread_id = threading.get_ident()

# Loop-bound session cache for background threads
self._loop_sessions: Dict[asyncio.AbstractEventLoop, aiohttp.ClientSession] = {}

if mock_testing is True: # for testing purposes only
return

Expand Down Expand Up @@ -189,9 +192,9 @@ async def _get_session_iterator(
Logic:
1. If running in the main thread (where the object was initialized/destined to live normally),
use the shared `self._http_session` (protected by a lock).
2. If running in a background thread (e.g. logging hook), yield a NEW ephemeral session
and ensure it is closed after use.
2. If running in a background thread (e.g. logging hook), use a cached session for that loop.
"""
current_loop = asyncio.get_running_loop()

# Check if we are in the stored main thread
if threading.get_ident() == self._main_thread_id:
Expand All @@ -201,22 +204,27 @@ async def _get_session_iterator(
self._http_session = aiohttp.ClientSession()
yield self._http_session
else:
# Background thread -> create ephemeral session
# Background thread/loop -> use loop-bound session cache
# This avoids "attached to a different loop" or "no running event loop" errors
# when accessing the shared session created in the main loop
session = aiohttp.ClientSession()
try:
yield session
finally:
if not session.closed:
await session.close()
if (
current_loop not in self._loop_sessions
or self._loop_sessions[current_loop].closed
):
self._loop_sessions[current_loop] = aiohttp.ClientSession()
yield self._loop_sessions[current_loop]

async def _close_http_session(self) -> None:
    """Close the shared HTTP session and every cached loop-bound session.

    The shared ``self._http_session`` is closed and reset to ``None`` when it
    is still open. Every session held in ``self._loop_sessions`` is then
    closed (already-closed ones are skipped) and the cache is emptied.
    """
    if self._http_session is not None and not self._http_session.closed:
        await self._http_session.close()
        self._http_session = None

    # Detach the cached sessions BEFORE awaiting. Each ``close()`` yields to
    # the event loop, and an insert into ``self._loop_sessions`` during that
    # window would make a live ``.values()`` iteration raise
    # "dictionary changed size during iteration". Clearing first also means
    # the cache never keeps references to sessions that are being torn down,
    # even if one ``close()`` raises.
    cached_sessions = list(self._loop_sessions.values())
    self._loop_sessions.clear()

    # NOTE(review): these sessions were created on other event loops; closing
    # them from the current loop may not be safe -- confirm against aiohttp.
    for session in cached_sessions:
        if not session.closed:
            await session.close()

def __del__(self):
    """Intentionally a no-op finalizer.

    Closing the HTTP sessions requires awaiting, and doing async cleanup in
    ``__del__`` is risky, so nothing is closed here.
    """
Expand Down Expand Up @@ -501,12 +509,13 @@ async def check_pii(
)

# Then anonymize the text using the analysis results
return await self.anonymize_text(
anonymized_text = await self.anonymize_text(
text=text,
analyze_results=analyze_results,
output_parse_pii=output_parse_pii,
masked_entity_count=masked_entity_count,
)
return anonymized_text
return redacted_text["text"]
except Exception as e:
status = "guardrail_failed_to_respond"
Expand Down
73 changes: 73 additions & 0 deletions tests/test_presidio_latency.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@

import asyncio
import aiohttp
import pytest
from unittest.mock import MagicMock, patch
from litellm.proxy.guardrails.guardrail_hooks.presidio import _OPTIONAL_PresidioPIIMasking

@pytest.mark.asyncio
async def test_sanity_presidio_session_reuse_main_thread():
    """
    SANITY CHECK:
    Verify that Presidio guardrail reuses sessions in the main thread.
    This ensures we don't break existing session pooling functionality.

    The test counts calls to ``aiohttp.ClientSession.__init__`` across ten
    uses of ``_get_session_iterator`` and expects exactly one.
    """
    presidio = _OPTIONAL_PresidioPIIMasking(
        mock_testing=True,
        presidio_analyzer_api_base="http://mock-analyzer",
        presidio_anonymizer_api_base="http://mock-anonymizer",
    )

    session_creations = 0
    original_init = aiohttp.ClientSession.__init__

    def mocked_init(self, *args, **kwargs):
        # Count the construction, then defer to the real initializer so the
        # session object is fully usable (and closable) afterwards.
        nonlocal session_creations
        session_creations += 1
        original_init(self, *args, **kwargs)

    try:
        with patch.object(
            aiohttp.ClientSession,
            "__init__",
            side_effect=mocked_init,
            autospec=True,
        ):
            for _ in range(10):
                async with presidio._get_session_iterator():
                    pass

        # Expected: Only 1 session created for all 10 calls.
        assert session_creations == 1
    finally:
        # Always release the session, even when the assertion above fails,
        # so a failing run does not leak an unclosed aiohttp session.
        await presidio._close_http_session()

@pytest.mark.asyncio
async def test_bug_presidio_session_explosion_background_thread_causes_latency():
    """
    BUG REPRODUCTION:
    Verify that background threads (like logging hooks) REUSE sessions.
    Previously, each call in a background loop created a NEW ephemeral session,
    leading to socket exhaustion and the reported 97s latency spike.

    The background-thread path is simulated by overwriting
    ``_main_thread_id`` with a value that differs from the current thread id.
    """
    import threading

    presidio = _OPTIONAL_PresidioPIIMasking(
        mock_testing=True,
        presidio_analyzer_api_base="http://mock-analyzer",
        presidio_anonymizer_api_base="http://mock-anonymizer",
    )

    # Force the code to think it's in a background thread
    presidio._main_thread_id = threading.get_ident() + 1

    session_creations = 0
    original_init = aiohttp.ClientSession.__init__

    def mocked_init(self, *args, **kwargs):
        # Count the construction, then defer to the real initializer so the
        # session object is fully usable (and closable) afterwards.
        nonlocal session_creations
        session_creations += 1
        original_init(self, *args, **kwargs)

    try:
        with patch.object(
            aiohttp.ClientSession,
            "__init__",
            side_effect=mocked_init,
            autospec=True,
        ):
            for _ in range(10):
                async with presidio._get_session_iterator():
                    pass

        # FIX VERIFICATION: Should now be 1 session (reused) instead of 10.
        assert session_creations == 1
    finally:
        # Always release cached sessions, even when the assertion above
        # fails, so a failing run does not leak unclosed aiohttp sessions.
        await presidio._close_http_session()
Loading