Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions litellm/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,9 +193,9 @@

# Aiohttp connection pooling - prevents memory leaks from unbounded connection growth
# Set to 0 for unlimited (not recommended for production)
AIOHTTP_CONNECTOR_LIMIT = int(os.getenv("AIOHTTP_CONNECTOR_LIMIT", 300))
AIOHTTP_CONNECTOR_LIMIT = int(os.getenv("AIOHTTP_CONNECTOR_LIMIT", 1000))
AIOHTTP_CONNECTOR_LIMIT_PER_HOST = int(
os.getenv("AIOHTTP_CONNECTOR_LIMIT_PER_HOST", 50)
os.getenv("AIOHTTP_CONNECTOR_LIMIT_PER_HOST", 500)
)
AIOHTTP_KEEPALIVE_TIMEOUT = int(os.getenv("AIOHTTP_KEEPALIVE_TIMEOUT", 120))
AIOHTTP_TTL_DNS_CACHE = int(os.getenv("AIOHTTP_TTL_DNS_CACHE", 300))
Expand Down
21 changes: 13 additions & 8 deletions litellm/litellm_core_utils/streaming_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1968,22 +1968,24 @@ async def __anext__(self) -> "ModelResponseStream": # noqa: PLR0915
self.rules.post_call_rules(
input=self.response_uptil_now, model=self.model
)
# Store a shallow copy so usage stripping below
# does not mutate the stored chunk.
self.chunks.append(processed_chunk.model_copy())

# Add mcp_list_tools to first chunk if present
if not self.sent_first_chunk:
processed_chunk = self._add_mcp_list_tools_to_first_chunk(processed_chunk)
self.sent_first_chunk = True
if (

_has_usage = (
hasattr(processed_chunk, "usage")
and getattr(processed_chunk, "usage", None) is not None
):
)

if _has_usage:
# Store a copy ONLY when usage stripping below will mutate
# the chunk. For non-usage chunks (vast majority), store
# directly to avoid expensive model_copy() per chunk.
self.chunks.append(processed_chunk.model_copy())

# Strip usage from the outgoing chunk so it's not sent twice
# (once in the chunk, once in _hidden_params).
# Create a new object without usage, matching sync behavior.
# The copy in self.chunks retains usage for calculate_total_usage().
obj_dict = processed_chunk.model_dump()
if "usage" in obj_dict:
del obj_dict["usage"]
Expand All @@ -1995,6 +1997,9 @@ async def __anext__(self) -> "ModelResponseStream": # noqa: PLR0915
)
if is_empty:
continue
else:
# No usage data — safe to store directly without copying
self.chunks.append(processed_chunk)

# add usage as hidden param
if self.sent_last_chunk is True and self.stream_options is None:
Expand Down
10 changes: 6 additions & 4 deletions litellm/proxy/proxy_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -5302,13 +5302,15 @@ async def async_data_generator(
):
verbose_proxy_logger.debug("inside generator")
try:
# Use a list to accumulate response segments to avoid O(n^2) string concatenation
str_so_far_parts: list[str] = []
error_message: Optional[str] = None
requested_model_from_client = _get_client_requested_model_for_streaming(
request_data=request_data
)
model_mismatch_logged = False
# Use a running string instead of list + join to avoid O(n^2) overhead.
# Previously "".join(str_so_far_parts) was called every chunk, re-joining
# the entire accumulated response. In CPython, string += is amortized O(1)
# per chunk, so total accumulation cost is O(n) (implementation-specific;
# other interpreters may not have this fast path).
_str_so_far: str = ""
async for chunk in proxy_logging_obj.async_post_call_streaming_iterator_hook(
user_api_key_dict=user_api_key_dict,
response=response,
Expand All @@ -5319,12 +5321,12 @@ async def async_data_generator(
user_api_key_dict=user_api_key_dict,
response=chunk,
data=request_data,
str_so_far="".join(str_so_far_parts),
str_so_far=_str_so_far if _str_so_far else None,
)

if isinstance(chunk, (ModelResponse, ModelResponseStream)):
response_str = litellm.get_response_string(response_obj=chunk)
str_so_far_parts.append(response_str)
_str_so_far += response_str

chunk, model_mismatch_logged = _restamp_streaming_chunk_model(
chunk=chunk,
Expand Down
149 changes: 91 additions & 58 deletions litellm/proxy/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,23 +23,31 @@
)

from litellm import _custom_logger_compatible_callbacks_literal
from litellm.constants import (DEFAULT_MODEL_CREATED_AT_TIME,
MAX_TEAM_LIST_LIMIT)
from litellm.proxy._types import (DB_CONNECTION_ERROR_TYPES, CommonProxyErrors,
ProxyErrorTypes, ProxyException,
SpendLogsMetadata, SpendLogsPayload)
from litellm.constants import DEFAULT_MODEL_CREATED_AT_TIME, MAX_TEAM_LIST_LIMIT
from litellm.proxy._types import (
DB_CONNECTION_ERROR_TYPES,
CommonProxyErrors,
ProxyErrorTypes,
ProxyException,
SpendLogsMetadata,
SpendLogsPayload,
)
from litellm.types.guardrails import GuardrailEventHooks
from litellm.types.utils import CallTypes, CallTypesLiteral

try:
from litellm_enterprise.enterprise_callbacks.send_emails.base_email import \
BaseEmailLogger
from litellm_enterprise.enterprise_callbacks.send_emails.resend_email import \
ResendEmailLogger
from litellm_enterprise.enterprise_callbacks.send_emails.sendgrid_email import \
SendGridEmailLogger
from litellm_enterprise.enterprise_callbacks.send_emails.smtp_email import \
SMTPEmailLogger
from litellm_enterprise.enterprise_callbacks.send_emails.base_email import (
BaseEmailLogger,
)
from litellm_enterprise.enterprise_callbacks.send_emails.resend_email import (
ResendEmailLogger,
)
from litellm_enterprise.enterprise_callbacks.send_emails.sendgrid_email import (
SendGridEmailLogger,
)
from litellm_enterprise.enterprise_callbacks.send_emails.smtp_email import (
SMTPEmailLogger,
)
except ImportError:
BaseEmailLogger = None # type: ignore
SendGridEmailLogger = None # type: ignore
Expand All @@ -58,56 +66,70 @@
import litellm
import litellm.litellm_core_utils
import litellm.litellm_core_utils.litellm_logging
from litellm import (EmbeddingResponse, ImageResponse, ModelResponse,
ModelResponseStream, Router)
from litellm import (
EmbeddingResponse,
ImageResponse,
ModelResponse,
ModelResponseStream,
Router,
)
from litellm._logging import verbose_proxy_logger
from litellm._service_logger import ServiceLogging, ServiceTypes
from litellm.caching.caching import DualCache, RedisCache
from litellm.caching.dual_cache import LimitedSizeOrderedDict
from litellm.exceptions import RejectedRequestError
from litellm.integrations.custom_guardrail import (CustomGuardrail,
ModifyResponseException)
from litellm.integrations.custom_guardrail import (
CustomGuardrail,
ModifyResponseException,
)
from litellm.integrations.custom_logger import CustomLogger
from litellm.integrations.SlackAlerting.slack_alerting import SlackAlerting
from litellm.integrations.SlackAlerting.utils import \
_add_langfuse_trace_id_to_alert
from litellm.integrations.SlackAlerting.utils import _add_langfuse_trace_id_to_alert
from litellm.litellm_core_utils.litellm_logging import Logging
from litellm.litellm_core_utils.safe_json_dumps import safe_dumps
from litellm.litellm_core_utils.safe_json_loads import safe_json_loads
from litellm.llms.custom_httpx.http_handler import AsyncHTTPHandler
from litellm.proxy._types import (AlertType, CallInfo,
LiteLLM_VerificationTokenView, Member,
UserAPIKeyAuth)
from litellm.proxy._types import (
AlertType,
CallInfo,
LiteLLM_VerificationTokenView,
Member,
UserAPIKeyAuth,
)
from litellm.proxy.auth.route_checks import RouteChecks
from litellm.proxy.db.create_views import (create_missing_views,
should_create_missing_views)
from litellm.proxy.db.create_views import (
create_missing_views,
should_create_missing_views,
)
from litellm.proxy.db.db_spend_update_writer import DBSpendUpdateWriter
from litellm.proxy.db.exception_handler import PrismaDBExceptionHandler
from litellm.proxy.db.log_db_metrics import log_db_metrics
from litellm.proxy.db.prisma_client import PrismaWrapper
from litellm.proxy.guardrails.guardrail_hooks.unified_guardrail.unified_guardrail import \
UnifiedLLMGuardrails
from litellm.proxy.guardrails.guardrail_hooks.unified_guardrail.unified_guardrail import (
UnifiedLLMGuardrails,
)
from litellm.proxy.hooks import PROXY_HOOKS, get_proxy_hook
from litellm.proxy.hooks.cache_control_check import _PROXY_CacheControlCheck
from litellm.proxy.hooks.max_budget_limiter import _PROXY_MaxBudgetLimiter
from litellm.proxy.hooks.parallel_request_limiter import \
_PROXY_MaxParallelRequestsHandler
from litellm.proxy.hooks.parallel_request_limiter import (
_PROXY_MaxParallelRequestsHandler,
)
from litellm.proxy.litellm_pre_call_utils import LiteLLMProxyRequestSetup
from litellm.proxy.policy_engine.pipeline_executor import PipelineExecutor
from litellm.secret_managers.main import str_to_bool
from litellm.types.integrations.slack_alerting import DEFAULT_ALERT_TYPES
from litellm.types.mcp import (MCPDuringCallResponseObject,
MCPPreCallRequestObject,
MCPPreCallResponseObject)
from litellm.types.proxy.policy_engine.pipeline_types import \
PipelineExecutionResult
from litellm.types.mcp import (
MCPDuringCallResponseObject,
MCPPreCallRequestObject,
MCPPreCallResponseObject,
)
from litellm.types.proxy.policy_engine.pipeline_types import PipelineExecutionResult
from litellm.types.utils import LLMResponseTypes, LoggedLiteLLMParams

if TYPE_CHECKING:
from opentelemetry.trace import Span as _Span

from litellm.litellm_core_utils.litellm_logging import \
Logging as LiteLLMLoggingObj
from litellm.litellm_core_utils.litellm_logging import Logging as LiteLLMLoggingObj

Span = Union[_Span, Any]
else:
Expand Down Expand Up @@ -1050,9 +1072,10 @@ async def _process_prompt_template(
"""Process prompt template if applicable."""

from litellm.proxy.prompts.prompt_endpoints import (
construct_versioned_prompt_id, get_latest_version_prompt_id)
from litellm.proxy.prompts.prompt_registry import \
IN_MEMORY_PROMPT_REGISTRY
construct_versioned_prompt_id,
get_latest_version_prompt_id,
)
from litellm.proxy.prompts.prompt_registry import IN_MEMORY_PROMPT_REGISTRY
from litellm.utils import get_non_default_completion_params

if prompt_version is None:
Expand Down Expand Up @@ -1102,8 +1125,9 @@ async def _process_prompt_template(

def _process_guardrail_metadata(self, data: dict) -> None:
"""Process guardrails from metadata and add to applied_guardrails."""
from litellm.proxy.common_utils.callback_utils import \
add_guardrail_to_applied_guardrails_header
from litellm.proxy.common_utils.callback_utils import (
add_guardrail_to_applied_guardrails_header,
)

metadata_standard = data.get("metadata") or {}
metadata_litellm = data.get("litellm_metadata") or {}
Expand Down Expand Up @@ -2000,27 +2024,32 @@ async def async_post_call_streaming_hook(
if isinstance(response, (ModelResponse, ModelResponseStream)):
response_str = litellm.get_response_string(response_obj=response)
elif isinstance(response, dict) and self.is_a2a_streaming_response(response):
from litellm.llms.a2a.common_utils import \
extract_text_from_a2a_response
from litellm.llms.a2a.common_utils import extract_text_from_a2a_response

response_str = extract_text_from_a2a_response(response)
if response_str is not None:
# Cache model-level guardrails check per-request to avoid repeated
# dict lookups + llm_router.get_deployment() per callback per chunk.
_cached_guardrail_data: Optional[dict] = None
_guardrail_data_computed = False
Comment on lines +2031 to +2034
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cache scope is per-chunk, not per-request

The comment says "Cache model-level guardrails check per-request" but the variables _cached_guardrail_data and _guardrail_data_computed are local to async_post_call_streaming_hook, which is called once per chunk (from async_data_generator). So the cache is only effective within a single invocation — it avoids calling _check_and_merge_model_level_guardrails() once per CustomGuardrail callback within the same chunk, but still calls it once per chunk.

This is still a useful optimization if there are multiple guardrail callbacks, but the PR description claims it's "computed once per streaming response and cached for the rest," which is not accurate. To truly cache per-request, the cache would need to be stored outside this method (e.g., on self or passed in as state from the caller).


for callback in litellm.callbacks:
try:
_callback: Optional[CustomLogger] = None
if isinstance(callback, CustomGuardrail):
# Main - V2 Guardrails implementation
from litellm.types.guardrails import \
GuardrailEventHooks
from litellm.types.guardrails import GuardrailEventHooks

## CHECK FOR MODEL-LEVEL GUARDRAILS
modified_data = _check_and_merge_model_level_guardrails(
data=data, llm_router=llm_router
)
## CHECK FOR MODEL-LEVEL GUARDRAILS (cached per-request)
if not _guardrail_data_computed:
_cached_guardrail_data = _check_and_merge_model_level_guardrails(
data=data, llm_router=llm_router
)
_guardrail_data_computed = True

if (
callback.should_run_guardrail(
data=modified_data,
data=_cached_guardrail_data,
event_type=GuardrailEventHooks.post_call,
)
is not True
Expand Down Expand Up @@ -4626,8 +4655,9 @@ async def update_spend_logs_job(

# Guardrail/policy usage tracking (same batch, outside spend-logs update)
try:
from litellm.proxy.guardrails.usage_tracking import \
process_spend_logs_guardrail_usage
from litellm.proxy.guardrails.usage_tracking import (
process_spend_logs_guardrail_usage,
)
await process_spend_logs_guardrail_usage(
prisma_client=prisma_client,
logs_to_process=logs_to_process,
Expand All @@ -4653,8 +4683,10 @@ async def _monitor_spend_logs_queue(
db_writer_client: Optional HTTP handler for external spend logs endpoint
proxy_logging_obj: Proxy logging object
"""
from litellm.constants import (SPEND_LOG_QUEUE_POLL_INTERVAL,
SPEND_LOG_QUEUE_SIZE_THRESHOLD)
from litellm.constants import (
SPEND_LOG_QUEUE_POLL_INTERVAL,
SPEND_LOG_QUEUE_SIZE_THRESHOLD,
)

threshold = SPEND_LOG_QUEUE_SIZE_THRESHOLD
base_interval = SPEND_LOG_QUEUE_POLL_INTERVAL
Expand Down Expand Up @@ -5175,11 +5207,12 @@ async def get_available_models_for_user(
List of model names available to the user
"""
from litellm.proxy.auth.auth_checks import get_team_object
from litellm.proxy.auth.model_checks import (get_complete_model_list,
get_key_models,
get_team_models)
from litellm.proxy.management_endpoints.team_endpoints import \
validate_membership
from litellm.proxy.auth.model_checks import (
get_complete_model_list,
get_key_models,
get_team_models,
)
from litellm.proxy.management_endpoints.team_endpoints import validate_membership

# Get proxy model list and access groups
if llm_router is None:
Expand Down
Loading