Skip to content
Merged
35 changes: 35 additions & 0 deletions litellm/llms/custom_httpx/aiohttp_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,41 @@ async def close(self):
# Ignore errors during transport cleanup
pass

def __del__(self):
"""
Cleanup: close aiohttp session on instance destruction.

Provides defense-in-depth for issue #12443 - ensures cleanup happens
even if atexit handler doesn't run (abnormal termination).
"""
if (
self.client_session is not None
and not self.client_session.closed
and self._owns_session
):
try:
import asyncio

try:
loop = asyncio.get_event_loop()
if loop.is_running():
# Event loop is running - schedule cleanup task
asyncio.create_task(self.close())
else:
# Event loop exists but not running - run cleanup
loop.run_until_complete(self.close())
except RuntimeError:
# No event loop available - create one for cleanup
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(self.close())
finally:
loop.close()
except Exception:
# Silently ignore errors during __del__ to avoid issues
pass

async def _make_common_async_call(
self,
async_client_session: Optional[ClientSession],
Expand Down
46 changes: 30 additions & 16 deletions litellm/llms/custom_httpx/async_client_cleanup.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ async def close_litellm_async_clients():
Close all cached async HTTP clients to prevent resource leaks.

This function iterates through all cached clients in litellm's in-memory cache
and closes any aiohttp client sessions that are still open.
and closes any aiohttp client sessions that are still open. Also closes the
global base_llm_aiohttp_handler instance (issue #12443).
"""
# Import here to avoid circular import
import litellm
Expand All @@ -25,7 +26,7 @@ async def close_litellm_async_clients():
except Exception:
# Silently ignore errors during cleanup
pass

# Handle AsyncHTTPHandler instances (used by Gemini and other providers)
elif hasattr(handler, 'client'):
client = handler.client
Expand All @@ -43,7 +44,7 @@ async def close_litellm_async_clients():
except Exception:
# Silently ignore errors during cleanup
pass

# Handle any other objects with aclose method
elif hasattr(handler, 'aclose'):
try:
Expand All @@ -52,6 +53,17 @@ async def close_litellm_async_clients():
# Silently ignore errors during cleanup
pass

# Close the global base_llm_aiohttp_handler instance (issue #12443)
# This is used by Gemini and other providers that use aiohttp
if hasattr(litellm, 'base_llm_aiohttp_handler'):
base_handler = getattr(litellm, 'base_llm_aiohttp_handler', None)
if isinstance(base_handler, BaseLLMAIOHTTPHandler) and hasattr(base_handler, 'close'):
try:
await base_handler.close()
except Exception:
# Silently ignore errors during cleanup
pass


def register_async_client_cleanup():
"""
Expand All @@ -62,22 +74,24 @@ def register_async_client_cleanup():
import atexit

def cleanup_wrapper():
"""
Cleanup wrapper that creates a fresh event loop for atexit cleanup.

At exit time, the main event loop is often already closed. Creating a new
event loop ensures cleanup runs successfully (fixes issue #12443).
"""
try:
loop = asyncio.get_event_loop()
if loop.is_running():
# Schedule the cleanup coroutine
loop.create_task(close_litellm_async_clients())
else:
# Run the cleanup coroutine
loop.run_until_complete(close_litellm_async_clients())
except Exception:
# If we can't get an event loop or it's already closed, try creating a new one
# Always create a fresh event loop at exit time
# Don't use get_event_loop() - it may be closed or unavailable
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop = asyncio.new_event_loop()
loop.run_until_complete(close_litellm_async_clients())
finally:
# Clean up the loop we created
loop.close()
except Exception:
# Silently ignore errors during cleanup
pass
except Exception:
# Silently ignore errors during cleanup to avoid exit handler failures
pass

atexit.register(cleanup_wrapper)
76 changes: 57 additions & 19 deletions litellm/llms/openai/common_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@
from aiohttp import ClientSession

import litellm
from litellm._logging import verbose_logger
from litellm.llms.base_llm.chat.transformation import BaseLLMException
from litellm.llms.custom_httpx.http_handler import (
_DEFAULT_TTL_FOR_HTTPX_CLIENTS,
AsyncHTTPHandler,
get_ssl_configuration,
)
from litellm.types.utils import LlmProviders


class OpenAIError(BaseLLMException):
Expand Down Expand Up @@ -203,30 +205,66 @@ def _get_async_http_client(
if litellm.aclient_session is not None:
return litellm.aclient_session

# Get unified SSL configuration
ssl_config = get_ssl_configuration()
# Use the global cached client system to prevent memory leaks (issue #14540)
# This routes through get_async_httpx_client() which provides TTL-based caching
from litellm.llms.custom_httpx.http_handler import get_async_httpx_client

return httpx.AsyncClient(
verify=ssl_config,
transport=AsyncHTTPHandler._create_async_transport(
ssl_context=ssl_config
if isinstance(ssl_config, ssl.SSLContext)
else None,
ssl_verify=ssl_config if isinstance(ssl_config, bool) else None,
try:
# Get SSL config and include in params for proper cache key
ssl_config = get_ssl_configuration()
params = {"ssl_verify": ssl_config} if ssl_config is not None else None

# Get a cached AsyncHTTPHandler which manages the httpx.AsyncClient
cached_handler = get_async_httpx_client(
llm_provider=LlmProviders.OPENAI, # Cache key includes provider
params=params, # Include SSL config in cache key
shared_session=shared_session,
),
follow_redirects=True,
)
)
# Return the underlying httpx client from the handler
return cached_handler.client
except (ImportError, AttributeError, KeyError) as e:
# Fallback to creating a client directly if caching system unavailable
# This preserves backwards compatibility
verbose_logger.debug(
f"Client caching unavailable ({type(e).__name__}), using direct client creation"
)
ssl_config = get_ssl_configuration()
return httpx.AsyncClient(
verify=ssl_config,
transport=AsyncHTTPHandler._create_async_transport(
ssl_context=ssl_config
if isinstance(ssl_config, ssl.SSLContext)
else None,
ssl_verify=ssl_config if isinstance(ssl_config, bool) else None,
shared_session=shared_session,
),
follow_redirects=True,
)

@staticmethod
def _get_sync_http_client() -> Optional[httpx.Client]:
    """
    Return the sync httpx client to use for OpenAI-compatible calls.

    A user-supplied ``litellm.client_session`` always takes precedence.
    Otherwise, route through litellm's global cached-client system so
    repeated calls reuse one httpx.Client instead of leaking a new one
    per call (issue #14540). Falls back to direct client creation if the
    caching system is unavailable, preserving backwards compatibility.
    """
    # A user-configured session always wins.
    if litellm.client_session is not None:
        return litellm.client_session

    # Use the global cached client system to prevent memory leaks (issue #14540)
    from litellm.llms.custom_httpx.http_handler import _get_httpx_client

    try:
        # Get SSL config and include it in params so it participates in
        # the cache key (different SSL configs must not share a client).
        ssl_config = get_ssl_configuration()
        params = {"ssl_verify": ssl_config} if ssl_config is not None else None

        # Get a cached HTTPHandler which manages the httpx.Client
        cached_handler = _get_httpx_client(params=params)
        # Return the underlying httpx client from the handler
        return cached_handler.client
    except (ImportError, AttributeError, KeyError) as e:
        # Fallback to creating a client directly if caching system unavailable
        verbose_logger.debug(
            f"Client caching unavailable ({type(e).__name__}), using direct client creation"
        )
        ssl_config = get_ssl_configuration()
        return httpx.Client(
            verify=ssl_config,
            follow_redirects=True,
        )
Loading
Loading