diff --git a/litellm/constants.py b/litellm/constants.py
index 3d2cebf2224..4c38ecd74b5 100644
--- a/litellm/constants.py
+++ b/litellm/constants.py
@@ -193,9 +193,9 @@
 
 # Aiohttp connection pooling - prevents memory leaks from unbounded connection growth
 # Set to 0 for unlimited (not recommended for production)
-AIOHTTP_CONNECTOR_LIMIT = int(os.getenv("AIOHTTP_CONNECTOR_LIMIT", 300))
+AIOHTTP_CONNECTOR_LIMIT = int(os.getenv("AIOHTTP_CONNECTOR_LIMIT", 1000))
 AIOHTTP_CONNECTOR_LIMIT_PER_HOST = int(
-    os.getenv("AIOHTTP_CONNECTOR_LIMIT_PER_HOST", 50)
+    os.getenv("AIOHTTP_CONNECTOR_LIMIT_PER_HOST", 500)
 )
 AIOHTTP_KEEPALIVE_TIMEOUT = int(os.getenv("AIOHTTP_KEEPALIVE_TIMEOUT", 120))
 AIOHTTP_TTL_DNS_CACHE = int(os.getenv("AIOHTTP_TTL_DNS_CACHE", 300))
diff --git a/litellm/litellm_core_utils/streaming_handler.py b/litellm/litellm_core_utils/streaming_handler.py
index baf274f2c62..3b75a56fcc9 100644
--- a/litellm/litellm_core_utils/streaming_handler.py
+++ b/litellm/litellm_core_utils/streaming_handler.py
@@ -1968,22 +1968,24 @@ async def __anext__(self) -> "ModelResponseStream":  # noqa: PLR0915
                 self.rules.post_call_rules(
                     input=self.response_uptil_now, model=self.model
                 )
-                # Store a shallow copy so usage stripping below
-                # does not mutate the stored chunk.
-                self.chunks.append(processed_chunk.model_copy())
-
                 # Add mcp_list_tools to first chunk if present
                 if not self.sent_first_chunk:
                     processed_chunk = self._add_mcp_list_tools_to_first_chunk(processed_chunk)
                     self.sent_first_chunk = True
-                if (
+
+                _has_usage = (
                     hasattr(processed_chunk, "usage")
                     and getattr(processed_chunk, "usage", None) is not None
-                ):
+                )
+
+                if _has_usage:
+                    # Store a copy ONLY when usage stripping below will mutate
+                    # the chunk. For non-usage chunks (vast majority), store
+                    # directly to avoid expensive model_copy() per chunk.
+                    self.chunks.append(processed_chunk.model_copy())
+
                     # Strip usage from the outgoing chunk so it's not sent twice
                     # (once in the chunk, once in _hidden_params).
-                    # Create a new object without usage, matching sync behavior.
-                    # The copy in self.chunks retains usage for calculate_total_usage().
                     obj_dict = processed_chunk.model_dump()
                     if "usage" in obj_dict:
                         del obj_dict["usage"]
@@ -1995,6 +1997,9 @@ async def __anext__(self) -> "ModelResponseStream":  # noqa: PLR0915
                     )
                     if is_empty:
                         continue
+                else:
+                    # No usage data — safe to store directly without copying
+                    self.chunks.append(processed_chunk)
 
                 # add usage as hidden param
                 if self.sent_last_chunk is True and self.stream_options is None:
diff --git a/litellm/proxy/proxy_server.py b/litellm/proxy/proxy_server.py
index 48025863641..bd5b5309e0f 100644
--- a/litellm/proxy/proxy_server.py
+++ b/litellm/proxy/proxy_server.py
@@ -5302,13 +5302,15 @@ async def async_data_generator(
 ):
     verbose_proxy_logger.debug("inside generator")
     try:
-        # Use a list to accumulate response segments to avoid O(n^2) string concatenation
-        str_so_far_parts: list[str] = []
         error_message: Optional[str] = None
         requested_model_from_client = _get_client_requested_model_for_streaming(
             request_data=request_data
         )
         model_mismatch_logged = False
+        # Use a running string instead of list + join to avoid O(n^2) overhead.
+        # Previously "".join(str_so_far_parts) was called every chunk, re-joining
+        # the entire accumulated response. String += is O(n) amortized total.
+        _str_so_far: str = ""
         async for chunk in proxy_logging_obj.async_post_call_streaming_iterator_hook(
             user_api_key_dict=user_api_key_dict,
             response=response,
@@ -5319,12 +5321,12 @@ async def async_data_generator(
                 user_api_key_dict=user_api_key_dict,
                 response=chunk,
                 data=request_data,
-                str_so_far="".join(str_so_far_parts),
+                str_so_far=_str_so_far if _str_so_far else None,
             )
 
             if isinstance(chunk, (ModelResponse, ModelResponseStream)):
                 response_str = litellm.get_response_string(response_obj=chunk)
-                str_so_far_parts.append(response_str)
+                _str_so_far += response_str
 
             chunk, model_mismatch_logged = _restamp_streaming_chunk_model(
                 chunk=chunk,
diff --git a/litellm/proxy/utils.py b/litellm/proxy/utils.py
index f6613b5548f..5e0d5336aa9 100644
--- a/litellm/proxy/utils.py
+++ b/litellm/proxy/utils.py
@@ -23,23 +23,31 @@
 )
 from litellm import _custom_logger_compatible_callbacks_literal
-from litellm.constants import (DEFAULT_MODEL_CREATED_AT_TIME,
-                               MAX_TEAM_LIST_LIMIT)
-from litellm.proxy._types import (DB_CONNECTION_ERROR_TYPES, CommonProxyErrors,
-                                  ProxyErrorTypes, ProxyException,
-                                  SpendLogsMetadata, SpendLogsPayload)
+from litellm.constants import DEFAULT_MODEL_CREATED_AT_TIME, MAX_TEAM_LIST_LIMIT
+from litellm.proxy._types import (
+    DB_CONNECTION_ERROR_TYPES,
+    CommonProxyErrors,
+    ProxyErrorTypes,
+    ProxyException,
+    SpendLogsMetadata,
+    SpendLogsPayload,
+)
 from litellm.types.guardrails import GuardrailEventHooks
 from litellm.types.utils import CallTypes, CallTypesLiteral
 
 try:
-    from litellm_enterprise.enterprise_callbacks.send_emails.base_email import \
-        BaseEmailLogger
-    from litellm_enterprise.enterprise_callbacks.send_emails.resend_email import \
-        ResendEmailLogger
-    from litellm_enterprise.enterprise_callbacks.send_emails.sendgrid_email import \
-        SendGridEmailLogger
-    from litellm_enterprise.enterprise_callbacks.send_emails.smtp_email import \
-        SMTPEmailLogger
+    from litellm_enterprise.enterprise_callbacks.send_emails.base_email import (
+        BaseEmailLogger,
+    )
+    from litellm_enterprise.enterprise_callbacks.send_emails.resend_email import (
+        ResendEmailLogger,
+    )
+    from litellm_enterprise.enterprise_callbacks.send_emails.sendgrid_email import (
+        SendGridEmailLogger,
+    )
+    from litellm_enterprise.enterprise_callbacks.send_emails.smtp_email import (
+        SMTPEmailLogger,
+    )
 except ImportError:
     BaseEmailLogger = None  # type: ignore
     SendGridEmailLogger = None  # type: ignore
@@ -58,56 +66,70 @@
 import litellm
 import litellm.litellm_core_utils
 import litellm.litellm_core_utils.litellm_logging
-from litellm import (EmbeddingResponse, ImageResponse, ModelResponse,
-                     ModelResponseStream, Router)
+from litellm import (
+    EmbeddingResponse,
+    ImageResponse,
+    ModelResponse,
+    ModelResponseStream,
+    Router,
+)
 from litellm._logging import verbose_proxy_logger
 from litellm._service_logger import ServiceLogging, ServiceTypes
 from litellm.caching.caching import DualCache, RedisCache
 from litellm.caching.dual_cache import LimitedSizeOrderedDict
 from litellm.exceptions import RejectedRequestError
-from litellm.integrations.custom_guardrail import (CustomGuardrail,
-                                                   ModifyResponseException)
+from litellm.integrations.custom_guardrail import (
+    CustomGuardrail,
+    ModifyResponseException,
+)
 from litellm.integrations.custom_logger import CustomLogger
 from litellm.integrations.SlackAlerting.slack_alerting import SlackAlerting
-from litellm.integrations.SlackAlerting.utils import \
-    _add_langfuse_trace_id_to_alert
+from litellm.integrations.SlackAlerting.utils import _add_langfuse_trace_id_to_alert
 from litellm.litellm_core_utils.litellm_logging import Logging
 from litellm.litellm_core_utils.safe_json_dumps import safe_dumps
 from litellm.litellm_core_utils.safe_json_loads import safe_json_loads
 from litellm.llms.custom_httpx.http_handler import AsyncHTTPHandler
-from litellm.proxy._types import (AlertType, CallInfo,
-                                  LiteLLM_VerificationTokenView, Member,
-                                  UserAPIKeyAuth)
+from litellm.proxy._types import (
+    AlertType,
+    CallInfo,
+    LiteLLM_VerificationTokenView,
+    Member,
+    UserAPIKeyAuth,
+)
 from litellm.proxy.auth.route_checks import RouteChecks
-from litellm.proxy.db.create_views import (create_missing_views,
-                                           should_create_missing_views)
+from litellm.proxy.db.create_views import (
+    create_missing_views,
+    should_create_missing_views,
+)
 from litellm.proxy.db.db_spend_update_writer import DBSpendUpdateWriter
 from litellm.proxy.db.exception_handler import PrismaDBExceptionHandler
 from litellm.proxy.db.log_db_metrics import log_db_metrics
 from litellm.proxy.db.prisma_client import PrismaWrapper
-from litellm.proxy.guardrails.guardrail_hooks.unified_guardrail.unified_guardrail import \
-    UnifiedLLMGuardrails
+from litellm.proxy.guardrails.guardrail_hooks.unified_guardrail.unified_guardrail import (
+    UnifiedLLMGuardrails,
+)
 from litellm.proxy.hooks import PROXY_HOOKS, get_proxy_hook
 from litellm.proxy.hooks.cache_control_check import _PROXY_CacheControlCheck
 from litellm.proxy.hooks.max_budget_limiter import _PROXY_MaxBudgetLimiter
-from litellm.proxy.hooks.parallel_request_limiter import \
-    _PROXY_MaxParallelRequestsHandler
+from litellm.proxy.hooks.parallel_request_limiter import (
+    _PROXY_MaxParallelRequestsHandler,
+)
 from litellm.proxy.litellm_pre_call_utils import LiteLLMProxyRequestSetup
 from litellm.proxy.policy_engine.pipeline_executor import PipelineExecutor
 from litellm.secret_managers.main import str_to_bool
 from litellm.types.integrations.slack_alerting import DEFAULT_ALERT_TYPES
-from litellm.types.mcp import (MCPDuringCallResponseObject,
-                               MCPPreCallRequestObject,
-                               MCPPreCallResponseObject)
-from litellm.types.proxy.policy_engine.pipeline_types import \
-    PipelineExecutionResult
+from litellm.types.mcp import (
+    MCPDuringCallResponseObject,
+    MCPPreCallRequestObject,
+    MCPPreCallResponseObject,
+)
+from litellm.types.proxy.policy_engine.pipeline_types import PipelineExecutionResult
 from litellm.types.utils import LLMResponseTypes, LoggedLiteLLMParams
 
 if TYPE_CHECKING:
     from opentelemetry.trace import Span as _Span
 
-    from litellm.litellm_core_utils.litellm_logging import \
-        Logging as LiteLLMLoggingObj
+    from litellm.litellm_core_utils.litellm_logging import Logging as LiteLLMLoggingObj
 
     Span = Union[_Span, Any]
 else:
@@ -1050,9 +1072,10 @@ async def _process_prompt_template(
     ):
         """Process prompt template if applicable."""
         from litellm.proxy.prompts.prompt_endpoints import (
-            construct_versioned_prompt_id, get_latest_version_prompt_id)
-        from litellm.proxy.prompts.prompt_registry import \
-            IN_MEMORY_PROMPT_REGISTRY
+            construct_versioned_prompt_id,
+            get_latest_version_prompt_id,
+        )
+        from litellm.proxy.prompts.prompt_registry import IN_MEMORY_PROMPT_REGISTRY
         from litellm.utils import get_non_default_completion_params
 
         if prompt_version is None:
@@ -1102,8 +1125,9 @@ async def _process_prompt_template(
 
     def _process_guardrail_metadata(self, data: dict) -> None:
         """Process guardrails from metadata and add to applied_guardrails."""
-        from litellm.proxy.common_utils.callback_utils import \
-            add_guardrail_to_applied_guardrails_header
+        from litellm.proxy.common_utils.callback_utils import (
+            add_guardrail_to_applied_guardrails_header,
+        )
 
         metadata_standard = data.get("metadata") or {}
         metadata_litellm = data.get("litellm_metadata") or {}
@@ -2000,27 +2024,32 @@ async def async_post_call_streaming_hook(
         if isinstance(response, (ModelResponse, ModelResponseStream)):
             response_str = litellm.get_response_string(response_obj=response)
         elif isinstance(response, dict) and self.is_a2a_streaming_response(response):
-            from litellm.llms.a2a.common_utils import \
-                extract_text_from_a2a_response
+            from litellm.llms.a2a.common_utils import extract_text_from_a2a_response
 
             response_str = extract_text_from_a2a_response(response)
 
         if response_str is not None:
+            # Cache model-level guardrails check per-request to avoid repeated
+            # dict lookups + llm_router.get_deployment() per callback per chunk.
+            _cached_guardrail_data: Optional[dict] = None
+            _guardrail_data_computed = False
+
             for callback in litellm.callbacks:
                 try:
                     _callback: Optional[CustomLogger] = None
                     if isinstance(callback, CustomGuardrail):
                         # Main - V2 Guardrails implementation
-                        from litellm.types.guardrails import \
-                            GuardrailEventHooks
+                        from litellm.types.guardrails import GuardrailEventHooks
 
-                        ## CHECK FOR MODEL-LEVEL GUARDRAILS
-                        modified_data = _check_and_merge_model_level_guardrails(
-                            data=data, llm_router=llm_router
-                        )
+                        ## CHECK FOR MODEL-LEVEL GUARDRAILS (cached per-request)
+                        if not _guardrail_data_computed:
+                            _cached_guardrail_data = _check_and_merge_model_level_guardrails(
+                                data=data, llm_router=llm_router
+                            )
+                            _guardrail_data_computed = True
 
                         if (
                             callback.should_run_guardrail(
-                                data=modified_data,
+                                data=_cached_guardrail_data,
                                 event_type=GuardrailEventHooks.post_call,
                             )
                             is not True
@@ -4626,8 +4655,9 @@ async def update_spend_logs_job(
         # Guardrail/policy usage tracking (same batch, outside spend-logs update)
         try:
-            from litellm.proxy.guardrails.usage_tracking import \
-                process_spend_logs_guardrail_usage
+            from litellm.proxy.guardrails.usage_tracking import (
+                process_spend_logs_guardrail_usage,
+            )
 
             await process_spend_logs_guardrail_usage(
                 prisma_client=prisma_client,
                 logs_to_process=logs_to_process,
@@ -4653,8 +4683,10 @@ async def _monitor_spend_logs_queue(
         db_writer_client: Optional HTTP handler for external spend logs endpoint
         proxy_logging_obj: Proxy logging object
     """
-    from litellm.constants import (SPEND_LOG_QUEUE_POLL_INTERVAL,
-                                   SPEND_LOG_QUEUE_SIZE_THRESHOLD)
+    from litellm.constants import (
+        SPEND_LOG_QUEUE_POLL_INTERVAL,
+        SPEND_LOG_QUEUE_SIZE_THRESHOLD,
+    )
 
     threshold = SPEND_LOG_QUEUE_SIZE_THRESHOLD
     base_interval = SPEND_LOG_QUEUE_POLL_INTERVAL
@@ -5175,11 +5207,12 @@ async def get_available_models_for_user(
         List of model names available to the user
     """
    from litellm.proxy.auth.auth_checks import get_team_object
-    from litellm.proxy.auth.model_checks import (get_complete_model_list,
-                                                 get_key_models,
-                                                 get_team_models)
-    from litellm.proxy.management_endpoints.team_endpoints import \
-        validate_membership
+    from litellm.proxy.auth.model_checks import (
+        get_complete_model_list,
+        get_key_models,
+        get_team_models,
+    )
+    from litellm.proxy.management_endpoints.team_endpoints import validate_membership
 
     # Get proxy model list and access groups
     if llm_router is None: