diff --git a/docs/my-website/docs/proxy/alerting.md b/docs/my-website/docs/proxy/alerting.md
index 38d6d47be445..e9afe2d99390 100644
--- a/docs/my-website/docs/proxy/alerting.md
+++ b/docs/my-website/docs/proxy/alerting.md
@@ -438,6 +438,59 @@ curl -X GET --location 'http://0.0.0.0:4000/health/services?service=webhook' \
 - `event_message` *str*: A human-readable description of the event.
+### Digest Mode (Reducing Alert Noise)
+
+By default, LiteLLM sends a separate Slack message for **every** alert event. For high-frequency alert types like `llm_requests_hanging` or `llm_too_slow`, this can produce hundreds of duplicate messages per day.
+
+**Digest mode** aggregates duplicate alerts within a configurable time window and emits a single summary message with the total count and time range.
+
+#### Configuration
+
+Use `alert_type_config` in `general_settings` to enable digest mode per alert type:
+
+```yaml
+general_settings:
+  alerting: ["slack"]
+  alert_type_config:
+    llm_requests_hanging:
+      digest: true
+      digest_interval: 86400  # 24 hours (default)
+    llm_too_slow:
+      digest: true
+      digest_interval: 3600  # 1 hour
+    llm_exceptions:
+      digest: true
+      # uses default interval (86400 seconds / 24 hours)
+```
+
+| Parameter | Type | Default | Description |
+|-----------|------|---------|-------------|
+| `digest` | bool | `false` | Enable digest mode for this alert type |
+| `digest_interval` | int | `86400` (24h) | Time window in seconds. Alerts are aggregated within this interval. |
+
+#### How It Works
+
+1. When an alert fires for a digest-enabled type, it is **grouped** by `(alert_type, request_model, api_base)` instead of being sent immediately (see the grouping example below).
+2. A counter tracks how many times the alert fires within the interval.
+3. Once the interval expires, the next periodic flush emits a **single summary message**:
+
+```
+Alert type: `llm_requests_hanging` (Digest)
+Level: `Medium`
+Start: `2026-02-19 03:27:39`
+End: `2026-02-20 03:27:39`
+Count: `847`
+
+Message: `Requests are hanging - 600s+ request time`
+Request Model: `gemini-2.5-flash`
+API Base: `None`
+```
+
+#### Limitations
+
+- **Per-instance**: Digest state is held in memory per proxy instance. If you run multiple instances (e.g., Cloud Run with autoscaling), each instance maintains its own digest and emits its own summary.
+- **Not durable**: If an instance is terminated before the digest interval expires, the aggregated alerts for that instance are lost.
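+
+#### Example: Grouping Behavior
+
+Digest buckets are keyed by `alert_type:request_model:api_base`, with empty strings when the model or API base is not provided. Below is a minimal, illustrative sketch of that keying (it mirrors the internal `digest_key` construction in `slack_alerting.py` and is not a public API):
+
+```python
+# Illustrative sketch of how digest buckets are keyed.
+from typing import Optional
+
+
+def digest_key(
+    alert_type: str, request_model: Optional[str], api_base: Optional[str]
+) -> str:
+    return f"{alert_type}:{request_model or ''}:{api_base or ''}"
+
+
+# Same model and api_base -> same bucket: the counter increments, no new message.
+assert digest_key("llm_requests_hanging", "gemini-2.5-flash", None) == \
+    digest_key("llm_requests_hanging", "gemini-2.5-flash", None)
+
+# A different api_base (hypothetical URLs) -> a separate bucket and its own summary.
+assert digest_key("llm_too_slow", "gpt-4", "https://api.openai.com") != \
+    digest_key("llm_too_slow", "gpt-4", "https://eu.example.com")
+```
+
+So five hanging-request alerts for `gemini-2.5-flash` within one interval produce a single summary with a count of 5, while alerts for a second model or API base produce their own summary.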
+ ## Region-outage alerting (✨ Enterprise feature) :::info diff --git a/litellm/integrations/SlackAlerting/hanging_request_check.py b/litellm/integrations/SlackAlerting/hanging_request_check.py index 713e790ba901..d2f70c9caf14 100644 --- a/litellm/integrations/SlackAlerting/hanging_request_check.py +++ b/litellm/integrations/SlackAlerting/hanging_request_check.py @@ -172,4 +172,6 @@ async def send_hanging_request_alert( level="Medium", alert_type=AlertType.llm_requests_hanging, alerting_metadata=hanging_request_data.alerting_metadata or {}, + request_model=hanging_request_data.model, + api_base=hanging_request_data.api_base, ) diff --git a/litellm/integrations/SlackAlerting/slack_alerting.py b/litellm/integrations/SlackAlerting/slack_alerting.py index 8fb3e132ded4..03d23a98633c 100644 --- a/litellm/integrations/SlackAlerting/slack_alerting.py +++ b/litellm/integrations/SlackAlerting/slack_alerting.py @@ -70,6 +70,7 @@ def __init__( ] = None, # if user wants to separate alerts to diff channels alerting_args={}, default_webhook_url: Optional[str] = None, + alert_type_config: Optional[Dict[str, dict]] = None, **kwargs, ): if alerting_threshold is None: @@ -92,6 +93,12 @@ def __init__( self.hanging_request_check = AlertingHangingRequestCheck( slack_alerting_object=self, ) + self.alert_type_config: Dict[str, AlertTypeConfig] = {} + if alert_type_config: + for key, val in alert_type_config.items(): + self.alert_type_config[key] = AlertTypeConfig(**val) if isinstance(val, dict) else val + self.digest_buckets: Dict[str, DigestEntry] = {} + self.digest_lock = asyncio.Lock() super().__init__(**kwargs, flush_lock=self.flush_lock) def update_values( @@ -102,6 +109,7 @@ def update_values( alert_to_webhook_url: Optional[Dict[AlertType, Union[List[str], str]]] = None, alerting_args: Optional[Dict] = None, llm_router: Optional[Router] = None, + alert_type_config: Optional[Dict[str, dict]] = None, ): if alerting is not None: self.alerting = alerting @@ -116,6 +124,9 @@ def update_values( if not self.periodic_started: asyncio.create_task(self.periodic_flush()) self.periodic_started = True + if alert_type_config is not None: + for key, val in alert_type_config.items(): + self.alert_type_config[key] = AlertTypeConfig(**val) if isinstance(val, dict) else val if alert_to_webhook_url is not None: # update the dict @@ -284,6 +295,8 @@ async def response_taking_too_long_callback( level="Low", alert_type=AlertType.llm_too_slow, alerting_metadata=alerting_metadata, + request_model=model, + api_base=api_base, ) async def async_update_daily_reports( @@ -1360,6 +1373,8 @@ async def send_alert( alert_type: AlertType, alerting_metadata: dict, user_info: Optional[WebhookEvent] = None, + request_model: Optional[str] = None, + api_base: Optional[str] = None, **kwargs, ): """ @@ -1375,6 +1390,8 @@ async def send_alert( Parameters: level: str - Low|Medium|High - if calls might fail (Medium) or are failing (High); Currently, no alerts would be 'Low'. 
message: str - what is the alert about + request_model: Optional[str] - model name for digest grouping + api_base: Optional[str] - api base for digest grouping """ if self.alerting is None: return @@ -1408,6 +1425,44 @@ async def send_alert( from datetime import datetime + # Check if digest mode is enabled for this alert type + alert_type_name_str = getattr(alert_type, "value", str(alert_type)) + _atc = self.alert_type_config.get(alert_type_name_str) + if _atc is not None and _atc.digest: + # Resolve webhook URL for this alert type (needed for digest entry) + if ( + self.alert_to_webhook_url is not None + and alert_type in self.alert_to_webhook_url + ): + _digest_webhook: Optional[Union[str, List[str]]] = self.alert_to_webhook_url[alert_type] + elif self.default_webhook_url is not None: + _digest_webhook = self.default_webhook_url + else: + _digest_webhook = os.getenv("SLACK_WEBHOOK_URL", None) + if _digest_webhook is None: + raise ValueError("Missing SLACK_WEBHOOK_URL from environment") + + digest_key = f"{alert_type_name_str}:{request_model or ''}:{api_base or ''}" + + async with self.digest_lock: + now = datetime.now() + if digest_key in self.digest_buckets: + self.digest_buckets[digest_key]["count"] += 1 + self.digest_buckets[digest_key]["last_time"] = now + else: + self.digest_buckets[digest_key] = DigestEntry( + alert_type=alert_type_name_str, + request_model=request_model or "", + api_base=api_base or "", + first_message=message, + level=level, + count=1, + start_time=now, + last_time=now, + webhook_url=_digest_webhook, + ) + return # Suppress immediate alert; will be emitted by _flush_digest_buckets + # Get the current timestamp current_time = datetime.now().strftime("%H:%M:%S") _proxy_base_url = os.getenv("PROXY_BASE_URL", None) @@ -1483,6 +1538,72 @@ async def async_send_batch(self): await asyncio.gather(*tasks) self.log_queue.clear() + async def _flush_digest_buckets(self): + """Flush any digest buckets whose interval has expired. + + For each expired bucket, formats a digest summary message and + appends it to the log_queue for delivery via the normal batching path. 
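+
+        Buckets are keyed by alert_type:request_model:api_base. A bucket whose
+        alert type no longer has a digest config is skipped and left in place;
+        expired buckets are removed once their summary has been queued.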
+ """ + from datetime import datetime + + now = datetime.now() + flushed_keys: List[str] = [] + + async with self.digest_lock: + for key, entry in self.digest_buckets.items(): + alert_type_name = entry["alert_type"] + _atc = self.alert_type_config.get(alert_type_name) + if _atc is None: + continue + elapsed = (now - entry["start_time"]).total_seconds() + if elapsed < _atc.digest_interval: + continue + + # Build digest summary message + start_ts = entry["start_time"].strftime("%H:%M:%S") + end_ts = entry["last_time"].strftime("%H:%M:%S") + start_date = entry["start_time"].strftime("%Y-%m-%d") + end_date = entry["last_time"].strftime("%Y-%m-%d") + formatted_message = ( + f"Alert type: `{alert_type_name}` (Digest)\n" + f"Level: `{entry['level']}`\n" + f"Start: `{start_date} {start_ts}`\n" + f"End: `{end_date} {end_ts}`\n" + f"Count: `{entry['count']}`\n\n" + f"Message: {entry['first_message']}" + ) + _proxy_base_url = os.getenv("PROXY_BASE_URL", None) + if _proxy_base_url is not None: + formatted_message += f"\n\nProxy URL: `{_proxy_base_url}`" + + payload = {"text": formatted_message} + headers = {"Content-type": "application/json"} + webhook_url = entry["webhook_url"] + + if isinstance(webhook_url, list): + for url in webhook_url: + self.log_queue.append( + {"url": url, "headers": headers, "payload": payload, "alert_type": alert_type_name} + ) + else: + self.log_queue.append( + {"url": webhook_url, "headers": headers, "payload": payload, "alert_type": alert_type_name} + ) + flushed_keys.append(key) + + for key in flushed_keys: + del self.digest_buckets[key] + + async def periodic_flush(self): + """Override base periodic_flush to also flush digest buckets.""" + while True: + await asyncio.sleep(self.flush_interval) + try: + await self._flush_digest_buckets() + except Exception as e: + verbose_proxy_logger.debug(f"Error flushing digest buckets: {str(e)}") + await self.flush_queue() + async def async_log_success_event(self, kwargs, response_obj, start_time, end_time): """Log deployment latency""" try: diff --git a/litellm/proxy/proxy_server.py b/litellm/proxy/proxy_server.py index 22d635799865..1a14293f90ac 100644 --- a/litellm/proxy/proxy_server.py +++ b/litellm/proxy/proxy_server.py @@ -2982,6 +2982,7 @@ def _load_alerting_settings(self, general_settings: dict): alert_types=general_settings.get("alert_types", None), alert_to_webhook_url=general_settings.get("alert_to_webhook_url", None), alerting_args=general_settings.get("alerting_args", None), + alert_type_config=general_settings.get("alert_type_config", None), redis_cache=redis_usage_cache, ) diff --git a/litellm/proxy/utils.py b/litellm/proxy/utils.py index 0aace65ff6b5..a4cb30f6f728 100644 --- a/litellm/proxy/utils.py +++ b/litellm/proxy/utils.py @@ -378,6 +378,7 @@ def update_values( alert_types: Optional[List[AlertType]] = None, alerting_args: Optional[dict] = None, alert_to_webhook_url: Optional[dict] = None, + alert_type_config: Optional[dict] = None, ): updated_slack_alerting: bool = False if alerting is not None: @@ -392,6 +393,8 @@ def update_values( if alert_to_webhook_url is not None: self.alert_to_webhook_url = alert_to_webhook_url updated_slack_alerting = True + if alert_type_config is not None: + updated_slack_alerting = True if updated_slack_alerting is True: self.slack_alerting_instance.update_values( @@ -400,6 +403,7 @@ def update_values( alert_types=self.alert_types, alerting_args=alerting_args, alert_to_webhook_url=self.alert_to_webhook_url, + alert_type_config=alert_type_config, ) if self.alerting is not None and 
"slack" in self.alerting: diff --git a/litellm/types/integrations/slack_alerting.py b/litellm/types/integrations/slack_alerting.py index 856640638c27..078e7953ad8a 100644 --- a/litellm/types/integrations/slack_alerting.py +++ b/litellm/types/integrations/slack_alerting.py @@ -1,13 +1,15 @@ import os from datetime import datetime as dt from enum import Enum -from typing import Any, Dict, List, Literal, Optional, Set +from typing import Any, Dict, List, Literal, Optional, Set, Union from pydantic import BaseModel, Field from typing_extensions import TypedDict from litellm.types.utils import LiteLLMPydanticObjectBase +DEFAULT_DIGEST_INTERVAL = 86400 # 24 hours in seconds + SLACK_ALERTING_THRESHOLD_5_PERCENT = 0.05 SLACK_ALERTING_THRESHOLD_15_PERCENT = 0.15 MAX_OLDEST_HANGING_REQUESTS_TO_CHECK = 20 @@ -199,3 +201,30 @@ class HangingRequestData(BaseModel): key_alias: Optional[str] = None team_alias: Optional[str] = None alerting_metadata: Optional[dict] = None + + +class AlertTypeConfig(LiteLLMPydanticObjectBase): + """Per-alert-type configuration, including digest mode settings.""" + + digest: bool = Field( + default=False, + description="Enable digest mode for this alert type. When enabled, duplicate alerts are aggregated into a single summary message.", + ) + digest_interval: int = Field( + default=DEFAULT_DIGEST_INTERVAL, + description="Digest window in seconds. Alerts are aggregated within this interval. Default 24 hours.", + ) + + +class DigestEntry(TypedDict): + """Tracks an in-flight digest bucket for a unique (alert_type, model, api_base) combination.""" + + alert_type: str + request_model: str + api_base: str + first_message: str + level: str + count: int + start_time: dt + last_time: dt + webhook_url: Union[str, List[str]] diff --git a/tests/test_litellm/integrations/SlackAlerting/test_slack_alerting_digest.py b/tests/test_litellm/integrations/SlackAlerting/test_slack_alerting_digest.py new file mode 100644 index 000000000000..eeb3640dd87f --- /dev/null +++ b/tests/test_litellm/integrations/SlackAlerting/test_slack_alerting_digest.py @@ -0,0 +1,235 @@ +""" +Tests for Slack Alert Digest Mode + +Verifies that: +- Digest mode suppresses duplicate alerts within the interval +- Digest summary is emitted after the interval expires +- Non-digest alert types are unaffected +- Different (model, api_base) combos get separate digest entries +- The digest message format includes Start/End timestamps and Count +""" + +import os +import sys +import unittest +from datetime import datetime, timedelta + +sys.path.insert(0, os.path.abspath("../../..")) + +from litellm.integrations.SlackAlerting.slack_alerting import SlackAlerting +from litellm.proxy._types import AlertType +from litellm.types.integrations.slack_alerting import AlertTypeConfig + + +class TestDigestMode(unittest.IsolatedAsyncioTestCase): + """Test digest mode in SlackAlerting.send_alert().""" + + def setUp(self): + os.environ["SLACK_WEBHOOK_URL"] = "https://hooks.slack.com/test" + self.slack_alerting = SlackAlerting( + alerting=["slack"], + alert_type_config={ + "llm_requests_hanging": {"digest": True, "digest_interval": 60}, + }, + ) + # Prevent periodic flush from starting + self.slack_alerting.periodic_started = True + + def tearDown(self): + os.environ.pop("SLACK_WEBHOOK_URL", None) + + async def test_digest_suppresses_duplicate_alerts(self): + """Sending the same alert type + model + api_base multiple times should NOT add to log_queue.""" + message = "`Requests are hanging`\nRequest Model: `gemini-2.5-flash`\nAPI Base: `None`" + + 
for _ in range(5): + await self.slack_alerting.send_alert( + message=message, + level="Medium", + alert_type=AlertType.llm_requests_hanging, + alerting_metadata={}, + request_model="gemini-2.5-flash", + api_base="None", + ) + + # No messages should be in the log queue - they're all in digest_buckets + self.assertEqual(len(self.slack_alerting.log_queue), 0) + # Should have exactly 1 digest bucket entry + self.assertEqual(len(self.slack_alerting.digest_buckets), 1) + # Count should be 5 + bucket = list(self.slack_alerting.digest_buckets.values())[0] + self.assertEqual(bucket["count"], 5) + + async def test_different_models_get_separate_digests(self): + """Different models should produce separate digest entries.""" + await self.slack_alerting.send_alert( + message="`Requests are hanging`", + level="Medium", + alert_type=AlertType.llm_requests_hanging, + alerting_metadata={}, + request_model="gemini-2.5-flash", + api_base="None", + ) + await self.slack_alerting.send_alert( + message="`Requests are hanging`", + level="Medium", + alert_type=AlertType.llm_requests_hanging, + alerting_metadata={}, + request_model="gpt-4", + api_base="https://api.openai.com", + ) + + self.assertEqual(len(self.slack_alerting.digest_buckets), 2) + + async def test_non_digest_alert_goes_to_queue(self): + """Alert types without digest enabled should go straight to the log queue.""" + message = "Budget exceeded" + + await self.slack_alerting.send_alert( + message=message, + level="High", + alert_type=AlertType.budget_alerts, + alerting_metadata={}, + ) + + # Should be in log_queue, not digest_buckets + self.assertGreater(len(self.slack_alerting.log_queue), 0) + self.assertEqual(len(self.slack_alerting.digest_buckets), 0) + + async def test_flush_digest_buckets_emits_after_interval(self): + """After the digest interval expires, _flush_digest_buckets should emit a summary.""" + message = "`Requests are hanging`\nRequest Model: `gemini-2.5-flash`\nAPI Base: `None`" + + # Send 3 alerts + for _ in range(3): + await self.slack_alerting.send_alert( + message=message, + level="Medium", + alert_type=AlertType.llm_requests_hanging, + alerting_metadata={}, + request_model="gemini-2.5-flash", + api_base="None", + ) + + self.assertEqual(len(self.slack_alerting.log_queue), 0) + self.assertEqual(len(self.slack_alerting.digest_buckets), 1) + + # Manually backdate the start_time to simulate interval expiration + key = list(self.slack_alerting.digest_buckets.keys())[0] + self.slack_alerting.digest_buckets[key]["start_time"] = datetime.now() - timedelta(seconds=120) + + # Flush digest buckets + await self.slack_alerting._flush_digest_buckets() + + # Digest bucket should be cleared + self.assertEqual(len(self.slack_alerting.digest_buckets), 0) + # And a summary message should be in the log queue + self.assertEqual(len(self.slack_alerting.log_queue), 1) + payload_text = self.slack_alerting.log_queue[0]["payload"]["text"] + self.assertIn("(Digest)", payload_text) + self.assertIn("Count: `3`", payload_text) + self.assertIn("Start:", payload_text) + self.assertIn("End:", payload_text) + + async def test_flush_does_not_emit_before_interval(self): + """Digest buckets should NOT be flushed before the interval expires.""" + message = "`Requests are hanging`" + + await self.slack_alerting.send_alert( + message=message, + level="Medium", + alert_type=AlertType.llm_requests_hanging, + alerting_metadata={}, + request_model="gemini-2.5-flash", + ) + + # Flush immediately (interval hasn't expired) + await self.slack_alerting._flush_digest_buckets() + + 
# Bucket should still be there + self.assertEqual(len(self.slack_alerting.digest_buckets), 1) + self.assertEqual(len(self.slack_alerting.log_queue), 0) + + async def test_digest_message_format(self): + """Verify the digest summary message format.""" + message = "`Requests are hanging - 600s+ request time`\nRequest Model: `gemini-2.5-flash`\nAPI Base: `None`" + + await self.slack_alerting.send_alert( + message=message, + level="Medium", + alert_type=AlertType.llm_requests_hanging, + alerting_metadata={}, + request_model="gemini-2.5-flash", + api_base="None", + ) + + # Backdate and flush + key = list(self.slack_alerting.digest_buckets.keys())[0] + self.slack_alerting.digest_buckets[key]["start_time"] = datetime.now() - timedelta(seconds=120) + + await self.slack_alerting._flush_digest_buckets() + + payload_text = self.slack_alerting.log_queue[0]["payload"]["text"] + self.assertIn("Alert type: `llm_requests_hanging` (Digest)", payload_text) + self.assertIn("Level: `Medium`", payload_text) + self.assertIn("Count: `1`", payload_text) + self.assertIn("`Requests are hanging - 600s+ request time`", payload_text) + + async def test_digest_without_model_groups_by_alert_type_only(self): + """When request_model is not provided, alerts group by alert type alone.""" + for _ in range(3): + await self.slack_alerting.send_alert( + message="Some hanging request", + level="Medium", + alert_type=AlertType.llm_requests_hanging, + alerting_metadata={}, + ) + + # All 3 should be in the same bucket (empty model and api_base) + self.assertEqual(len(self.slack_alerting.digest_buckets), 1) + bucket = list(self.slack_alerting.digest_buckets.values())[0] + self.assertEqual(bucket["count"], 3) + self.assertEqual(bucket["request_model"], "") + self.assertEqual(bucket["api_base"], "") + + +class TestAlertTypeConfig(unittest.TestCase): + """Test AlertTypeConfig model and initialization.""" + + def test_default_values(self): + config = AlertTypeConfig() + self.assertFalse(config.digest) + self.assertEqual(config.digest_interval, 86400) + + def test_custom_values(self): + config = AlertTypeConfig(digest=True, digest_interval=3600) + self.assertTrue(config.digest) + self.assertEqual(config.digest_interval, 3600) + + def test_slack_alerting_init_with_config(self): + sa = SlackAlerting( + alerting=["slack"], + alert_type_config={ + "llm_requests_hanging": {"digest": True, "digest_interval": 7200}, + "llm_too_slow": {"digest": True}, + }, + ) + self.assertIn("llm_requests_hanging", sa.alert_type_config) + self.assertIn("llm_too_slow", sa.alert_type_config) + self.assertTrue(sa.alert_type_config["llm_requests_hanging"].digest) + self.assertEqual(sa.alert_type_config["llm_requests_hanging"].digest_interval, 7200) + self.assertEqual(sa.alert_type_config["llm_too_slow"].digest_interval, 86400) + + def test_update_values_with_config(self): + sa = SlackAlerting(alerting=["slack"]) + self.assertEqual(len(sa.alert_type_config), 0) + + sa.update_values( + alert_type_config={"llm_exceptions": {"digest": True, "digest_interval": 1800}}, + ) + self.assertIn("llm_exceptions", sa.alert_type_config) + self.assertTrue(sa.alert_type_config["llm_exceptions"].digest) + + +if __name__ == "__main__": + unittest.main()