diff --git a/docs/my-website/docs/proxy/config_settings.md b/docs/my-website/docs/proxy/config_settings.md index 37e58d5540..8797e71da7 100644 --- a/docs/my-website/docs/proxy/config_settings.md +++ b/docs/my-website/docs/proxy/config_settings.md @@ -452,6 +452,8 @@ router_settings: | BERRISPEND_ACCOUNT_ID | Account ID for BerriSpend service | BRAINTRUST_API_KEY | API key for Braintrust integration | BRAINTRUST_API_BASE | Base URL for Braintrust API. Default is https://api.braintrustdata.com/v1 +| BRAINTRUST_MOCK | Enable mock mode for Braintrust integration testing. When set to true, intercepts Braintrust API calls and returns mock responses without making actual network calls. Default is false +| BRAINTRUST_MOCK_LATENCY_MS | Mock latency in milliseconds for Braintrust API calls when mock mode is enabled. Simulates network round-trip time. Default is 100ms | CACHED_STREAMING_CHUNK_DELAY | Delay in seconds for cached streaming chunks. Default is 0.02 | CHATGPT_API_BASE | Base URL for ChatGPT API. Default is https://chatgpt.com/backend-api/codex | CHATGPT_AUTH_FILE | Filename for ChatGPT authentication data. Default is "auth.json" @@ -511,6 +513,8 @@ router_settings: | DD_ENV | Environment identifier for Datadog logs. Only supported for `datadog_llm_observability` callback | DD_SERVICE | Service identifier for Datadog logs. Defaults to "litellm-server" | DD_VERSION | Version identifier for Datadog logs. Defaults to "unknown" +| DATADOG_MOCK | Enable mock mode for Datadog integration testing. When set to true, intercepts Datadog API calls and returns mock responses without making actual network calls. Default is false +| DATADOG_MOCK_LATENCY_MS | Mock latency in milliseconds for Datadog API calls when mock mode is enabled. Simulates network round-trip time. Default is 100ms | DEBUG_OTEL | Enable debug mode for OpenTelemetry | DEFAULT_ALLOWED_FAILS | Maximum failures allowed before cooling down a model. Default is 3 | DEFAULT_A2A_AGENT_TIMEOUT | Default timeout in seconds for A2A (Agent-to-Agent) protocol requests. Default is 6000 @@ -675,6 +679,8 @@ router_settings: | HCP_VAULT_CERT_ROLE | Role for [Hashicorp Vault Secret Manager Auth](../secret.md#hashicorp-vault) | HELICONE_API_KEY | API key for Helicone service | HELICONE_API_BASE | Base URL for Helicone service, defaults to `https://api.helicone.ai` +| HELICONE_MOCK | Enable mock mode for Helicone integration testing. When set to true, intercepts Helicone API calls and returns mock responses without making actual network calls. Default is false +| HELICONE_MOCK_LATENCY_MS | Mock latency in milliseconds for Helicone API calls when mock mode is enabled. Simulates network round-trip time. Default is 100ms | HOSTNAME | Hostname for the server, this will be [emitted to `datadog` logs](https://docs.litellm.ai/docs/proxy/logging#datadog) | HOURS_IN_A_DAY | Hours in a day for calculation purposes. Default is 24 | HIDDENLAYER_API_BASE | Base URL for HiddenLayer API. Defaults to `https://api.hiddenlayer.ai` @@ -713,6 +719,8 @@ router_settings: | LANGSMITH_PROJECT | Project name for Langsmith integration | LANGSMITH_SAMPLING_RATE | Sampling rate for Langsmith logging | LANGSMITH_TENANT_ID | Tenant ID for Langsmith multi-tenant deployments +| LANGSMITH_MOCK | Enable mock mode for Langsmith integration testing. When set to true, intercepts Langsmith API calls and returns mock responses without making actual network calls. Default is false +| LANGSMITH_MOCK_LATENCY_MS | Mock latency in milliseconds for Langsmith API calls when mock mode is enabled. Simulates network round-trip time. Default is 100ms | LANGTRACE_API_KEY | API key for Langtrace service | LASSO_API_BASE | Base URL for Lasso API | LASSO_API_KEY | API key for Lasso service @@ -845,6 +853,8 @@ router_settings: | POD_NAME | Pod name for the server, this will be [emitted to `datadog` logs](https://docs.litellm.ai/docs/proxy/logging#datadog) as `POD_NAME` | POSTHOG_API_KEY | API key for PostHog analytics integration | POSTHOG_API_URL | Base URL for PostHog API (defaults to https://us.i.posthog.com) +| POSTHOG_MOCK | Enable mock mode for PostHog integration testing. When set to true, intercepts PostHog API calls and returns mock responses without making actual network calls. Default is false +| POSTHOG_MOCK_LATENCY_MS | Mock latency in milliseconds for PostHog API calls when mock mode is enabled. Simulates network round-trip time. Default is 100ms | PREDIBASE_API_BASE | Base URL for Predibase API | PRESIDIO_ANALYZER_API_BASE | Base URL for Presidio Analyzer service | PRESIDIO_ANONYMIZER_API_BASE | Base URL for Presidio Anonymizer service diff --git a/litellm/integrations/braintrust_logging.py b/litellm/integrations/braintrust_logging.py index 585de510e8..42e9680a7f 100644 --- a/litellm/integrations/braintrust_logging.py +++ b/litellm/integrations/braintrust_logging.py @@ -9,6 +9,10 @@ import litellm from litellm import verbose_logger +from litellm.integrations.braintrust_mock_client import ( + should_use_braintrust_mock, + create_mock_braintrust_client, +) from litellm.integrations.custom_logger import CustomLogger from litellm.llms.custom_httpx.http_handler import ( HTTPHandler, @@ -34,6 +38,10 @@ def __init__( self, api_key: Optional[str] = None, api_base: Optional[str] = None ) -> None: super().__init__() + self.is_mock_mode = should_use_braintrust_mock() + if self.is_mock_mode: + create_mock_braintrust_client() + verbose_logger.info("[BRAINTRUST MOCK] Braintrust logger initialized in mock mode") self.validate_environment(api_key=api_key) self.api_base = api_base or os.getenv("BRAINTRUST_API_BASE") or API_BASE self.default_project_id = None @@ -254,6 +262,8 @@ def log_success_event( # noqa: PLR0915 json={"events": [request_data]}, headers=self.headers, ) + if self.is_mock_mode: + print_verbose("[BRAINTRUST MOCK] Sync event successfully mocked") except httpx.HTTPStatusError as e: raise Exception(e.response.text) except Exception as e: @@ -399,6 +409,8 @@ async def async_log_success_event( # noqa: PLR0915 json={"events": [request_data]}, headers=self.headers, ) + if self.is_mock_mode: + print_verbose("[BRAINTRUST MOCK] Async event successfully mocked") except httpx.HTTPStatusError as e: raise Exception(e.response.text) except Exception as e: diff --git a/litellm/integrations/braintrust_mock_client.py b/litellm/integrations/braintrust_mock_client.py new file mode 100644 index 0000000000..030aa62cd0 --- /dev/null +++ b/litellm/integrations/braintrust_mock_client.py @@ -0,0 +1,131 @@ +""" +Mock HTTP client for Braintrust integration testing. + +This module intercepts Braintrust API calls and returns successful mock responses, +allowing full code execution without making actual network calls. + +Usage: + Set BRAINTRUST_MOCK=true in environment variables or config to enable mock mode. +""" + +import os +import time +from urllib.parse import urlparse + +from litellm._logging import verbose_logger +from litellm.integrations.mock_client_factory import MockClientConfig, MockResponse, create_mock_client_factory + +# Use factory for should_use_mock and MockResponse +# Braintrust uses both HTTPHandler (sync) and AsyncHTTPHandler (async) +# Braintrust needs endpoint-specific responses, so we use custom HTTPHandler.post patching +_config = MockClientConfig( + "BRAINTRUST", + "BRAINTRUST_MOCK", + default_latency_ms=100, + default_status_code=200, + default_json_data={"id": "mock-project-id", "status": "success"}, + url_matchers=[ + ".braintrustdata.com", + "braintrustdata.com", + ".braintrust.dev", + "braintrust.dev", + ], + patch_async_handler=True, # Patch AsyncHTTPHandler.post for async calls + patch_sync_client=False, # HTTPHandler uses self.client.send(), not self.client.post() + patch_http_handler=False, # We use custom patching for endpoint-specific responses +) + +# Get should_use_mock and create_mock_client from factory +# We need to call the factory's create_mock_client to patch AsyncHTTPHandler.post +create_mock_braintrust_factory_client, should_use_braintrust_mock = create_mock_client_factory(_config) + +# Store original HTTPHandler.post method (Braintrust-specific for sync calls with custom logic) +_original_http_handler_post = None +_mocks_initialized = False + +# Default mock latency in seconds +_MOCK_LATENCY_SECONDS = float(os.getenv("BRAINTRUST_MOCK_LATENCY_MS", "100")) / 1000.0 + + +def _is_braintrust_url(url: str) -> bool: + """Check if URL is a Braintrust API URL.""" + if not isinstance(url, str): + return False + + parsed = urlparse(url) + host = (parsed.hostname or "").lower() + + if not host: + return False + + return ( + host == "braintrustdata.com" + or host.endswith(".braintrustdata.com") + or host == "braintrust.dev" + or host.endswith(".braintrust.dev") + ) + + +def _mock_http_handler_post(self, url, data=None, json=None, params=None, headers=None, timeout=None, stream=False, files=None, content=None, logging_obj=None): + """Monkey-patched HTTPHandler.post that intercepts Braintrust calls with endpoint-specific responses.""" + # Only mock Braintrust API calls + if isinstance(url, str) and _is_braintrust_url(url): + verbose_logger.info(f"[BRAINTRUST MOCK] POST to {url}") + time.sleep(_MOCK_LATENCY_SECONDS) + # Return appropriate mock response based on endpoint + if "/project" in url: + # Project creation/retrieval/register endpoint + project_name = json.get("name", "litellm") if json else "litellm" + mock_data = {"id": f"mock-project-id-{project_name}", "name": project_name} + elif "/project_logs" in url: + # Log insertion endpoint + mock_data = {"status": "success"} + else: + mock_data = _config.default_json_data + return MockResponse( + status_code=_config.default_status_code, + json_data=mock_data, + url=url, + elapsed_seconds=_MOCK_LATENCY_SECONDS + ) + if _original_http_handler_post is not None: + return _original_http_handler_post(self, url=url, data=data, json=json, params=params, headers=headers, timeout=timeout, stream=stream, files=files, content=content, logging_obj=logging_obj) + raise RuntimeError("Original HTTPHandler.post not available") + + +def create_mock_braintrust_client(): + """ + Monkey-patch HTTPHandler.post to intercept Braintrust sync calls. + + Braintrust uses HTTPHandler for sync calls and AsyncHTTPHandler for async calls. + HTTPHandler.post uses self.client.send(), not self.client.post(), so we need + custom patching for sync (similar to Helicone). + AsyncHTTPHandler.post is patched by the factory. + + We use custom patching instead of factory's patch_http_handler because we need + endpoint-specific responses (different for /project vs /project_logs). + + This function is idempotent - it only initializes mocks once, even if called multiple times. + """ + global _original_http_handler_post, _mocks_initialized + + if _mocks_initialized: + return + + verbose_logger.debug("[BRAINTRUST MOCK] Initializing Braintrust mock client...") + + from litellm.llms.custom_httpx.http_handler import HTTPHandler + + if _original_http_handler_post is None: + _original_http_handler_post = HTTPHandler.post + HTTPHandler.post = _mock_http_handler_post # type: ignore + verbose_logger.debug("[BRAINTRUST MOCK] Patched HTTPHandler.post") + + # CRITICAL: Call the factory's initialization function to patch AsyncHTTPHandler.post + # This is required for async calls to be mocked + create_mock_braintrust_factory_client() + + verbose_logger.debug(f"[BRAINTRUST MOCK] Mock latency set to {_MOCK_LATENCY_SECONDS*1000:.0f}ms") + verbose_logger.debug("[BRAINTRUST MOCK] Braintrust mock client initialization complete") + + _mocks_initialized = True diff --git a/litellm/integrations/datadog/datadog.py b/litellm/integrations/datadog/datadog.py index 735d1005d2..127b0e53fa 100644 --- a/litellm/integrations/datadog/datadog.py +++ b/litellm/integrations/datadog/datadog.py @@ -27,6 +27,10 @@ from litellm._logging import verbose_logger from litellm._uuid import uuid from litellm.integrations.custom_batch_logger import CustomBatchLogger +from litellm.integrations.datadog.datadog_mock_client import ( + should_use_datadog_mock, + create_mock_datadog_client, +) from litellm.integrations.datadog.datadog_handler import ( get_datadog_hostname, get_datadog_service, @@ -81,6 +85,12 @@ def __init__( """ try: verbose_logger.debug("Datadog: in init datadog logger") + + self.is_mock_mode = should_use_datadog_mock() + + if self.is_mock_mode: + create_mock_datadog_client() + verbose_logger.debug("[DATADOG MOCK] Datadog logger initialized in mock mode") ######################################################### # Handle datadog_params set as litellm.datadog_params @@ -220,6 +230,9 @@ async def async_send_batch(self): len(self.log_queue), self.intake_url, ) + + if self.is_mock_mode: + verbose_logger.debug("[DATADOG MOCK] Mock mode enabled - API calls will be intercepted") response = await self.async_send_compressed_data(self.log_queue) if response.status_code == 413: @@ -232,11 +245,16 @@ async def async_send_batch(self): f"Response from datadog API status_code: {response.status_code}, text: {response.text}" ) - verbose_logger.debug( - "Datadog: Response from datadog API status_code: %s, text: %s", - response.status_code, - response.text, - ) + if self.is_mock_mode: + verbose_logger.debug( + f"[DATADOG MOCK] Batch of {len(self.log_queue)} events successfully mocked" + ) + else: + verbose_logger.debug( + "Datadog: Response from datadog API status_code: %s, text: %s", + response.status_code, + response.text, + ) except Exception as e: verbose_logger.exception( f"Datadog Error sending batch API - {str(e)}\n{traceback.format_exc()}" diff --git a/litellm/integrations/datadog/datadog_llm_obs.py b/litellm/integrations/datadog/datadog_llm_obs.py index 4f6a5b339a..076a147c60 100644 --- a/litellm/integrations/datadog/datadog_llm_obs.py +++ b/litellm/integrations/datadog/datadog_llm_obs.py @@ -18,6 +18,10 @@ import litellm from litellm._logging import verbose_logger from litellm.integrations.custom_batch_logger import CustomBatchLogger +from litellm.integrations.datadog.datadog_mock_client import ( + should_use_datadog_mock, + create_mock_datadog_client, +) from litellm.integrations.datadog.datadog_handler import ( get_datadog_service, get_datadog_tags, @@ -44,6 +48,19 @@ class DataDogLLMObsLogger(CustomBatchLogger): def __init__(self, **kwargs): try: verbose_logger.debug("DataDogLLMObs: Initializing logger") + + self.is_mock_mode = should_use_datadog_mock() + + if self.is_mock_mode: + create_mock_datadog_client() + verbose_logger.debug("[DATADOG MOCK] DataDogLLMObs logger initialized in mock mode") + + if os.getenv("DD_API_KEY", None) is None: + raise Exception("DD_API_KEY is not set, set 'DD_API_KEY=<>'") + if os.getenv("DD_SITE", None) is None: + raise Exception( + "DD_SITE is not set, set 'DD_SITE=<>', example sit = `us5.datadoghq.com`" + ) # Configure DataDog endpoint (Agent or Direct API) # Use LITELLM_DD_AGENT_HOST to avoid conflicts with ddtrace's DD_AGENT_HOST dd_agent_host = os.getenv("LITELLM_DD_AGENT_HOST") @@ -170,6 +187,9 @@ async def async_send_batch(self): verbose_logger.debug( f"DataDogLLMObs: Flushing {len(self.log_queue)} events" ) + + if self.is_mock_mode: + verbose_logger.debug("[DATADOG MOCK] Mock mode enabled - API calls will be intercepted") # Prepare the payload payload = { @@ -210,9 +230,14 @@ async def async_send_batch(self): f"DataDogLLMObs: Unexpected response - status_code: {response.status_code}, text: {response.text}" ) - verbose_logger.debug( - f"DataDogLLMObs: Successfully sent batch - status_code: {response.status_code}" - ) + if self.is_mock_mode: + verbose_logger.debug( + f"[DATADOG MOCK] Batch of {len(self.log_queue)} events successfully mocked" + ) + else: + verbose_logger.debug( + f"DataDogLLMObs: Successfully sent batch - status_code: {response.status_code}" + ) self.log_queue.clear() except httpx.HTTPStatusError as e: verbose_logger.exception( diff --git a/litellm/integrations/datadog/datadog_mock_client.py b/litellm/integrations/datadog/datadog_mock_client.py new file mode 100644 index 0000000000..a0a760deb0 --- /dev/null +++ b/litellm/integrations/datadog/datadog_mock_client.py @@ -0,0 +1,28 @@ +""" +Mock client for Datadog integration testing. + +This module intercepts Datadog API calls and returns successful mock responses, +allowing full code execution without making actual network calls. + +Usage: + Set DATADOG_MOCK=true in environment variables or config to enable mock mode. +""" + +from litellm.integrations.mock_client_factory import MockClientConfig, create_mock_client_factory + +# Create mock client using factory +_config = MockClientConfig( + name="DATADOG", + env_var="DATADOG_MOCK", + default_latency_ms=100, + default_status_code=202, + default_json_data={"status": "ok"}, + url_matchers=[ + ".datadoghq.com", + "datadoghq.com", + ], + patch_async_handler=True, + patch_sync_client=True, +) + +create_mock_datadog_client, should_use_datadog_mock = create_mock_client_factory(_config) diff --git a/litellm/integrations/gcs_bucket/gcs_bucket_mock_client.py b/litellm/integrations/gcs_bucket/gcs_bucket_mock_client.py index 6201dc343d..2d14f5eb96 100644 --- a/litellm/integrations/gcs_bucket/gcs_bucket_mock_client.py +++ b/litellm/integrations/gcs_bucket/gcs_bucket_mock_client.py @@ -8,20 +8,28 @@ Set GCS_MOCK=true in environment variables or config to enable mock mode. """ -import httpx -import json import asyncio -from datetime import timedelta -from typing import Dict, Optional from litellm._logging import verbose_logger - -# Store original methods for restoration -_original_async_handler_post = None +from litellm.integrations.mock_client_factory import MockClientConfig, create_mock_client_factory, MockResponse + +# Use factory for POST handler +_config = MockClientConfig( + name="GCS", + env_var="GCS_MOCK", + default_latency_ms=150, + default_status_code=200, + default_json_data={"kind": "storage#object", "name": "mock-object"}, + url_matchers=["storage.googleapis.com"], + patch_async_handler=True, + patch_sync_client=False, +) + +_create_mock_gcs_post, should_use_gcs_mock = create_mock_client_factory(_config) + +# Store original methods for GET/DELETE (GCS-specific) _original_async_handler_get = None _original_async_handler_delete = None - -# Track if mocks have been initialized to avoid duplicate initialization _mocks_initialized = False # Default mock latency in seconds (simulates network round-trip) @@ -29,84 +37,59 @@ _MOCK_LATENCY_SECONDS = float(__import__("os").getenv("GCS_MOCK_LATENCY_MS", "150")) / 1000.0 -class MockGCSResponse: - """Mock httpx.Response that satisfies GCS API requirements.""" - - def __init__(self, status_code: int = 200, json_data: Optional[Dict] = None, url: Optional[str] = None, elapsed_seconds: float = 0.0): - self.status_code = status_code - self._json_data = json_data or {"kind": "storage#object", "name": "mock-object"} - self.headers = httpx.Headers({}) - self.is_success = status_code < 400 - self.is_error = status_code >= 400 - self.is_redirect = 300 <= status_code < 400 - self.url = httpx.URL(url) if url else httpx.URL("") - # Set realistic elapsed time based on mock latency - elapsed_time = elapsed_seconds if elapsed_seconds > 0 else _MOCK_LATENCY_SECONDS - self.elapsed = timedelta(seconds=elapsed_time) - self._text = json.dumps(self._json_data) - self._content = self._text.encode("utf-8") - - @property - def text(self) -> str: - """Return response text.""" - return self._text - - @property - def content(self) -> bytes: - """Return response content.""" - return self._content - - def json(self) -> Dict: - """Return JSON response data.""" - return self._json_data - - def read(self) -> bytes: - """Read response content.""" - return self._content - - def raise_for_status(self): - """Raise exception for error status codes.""" - if self.status_code >= 400: - raise Exception(f"HTTP {self.status_code}") - - -async def _mock_async_handler_post(self, url, data=None, json=None, params=None, headers=None, timeout=None, stream=False, logging_obj=None, files=None, content=None): - """Monkey-patched AsyncHTTPHandler.post that intercepts GCS calls.""" - # Only mock GCS API calls - if isinstance(url, str) and "storage.googleapis.com" in url: - verbose_logger.info(f"[GCS MOCK] POST to {url}") - # Simulate network latency - await asyncio.sleep(_MOCK_LATENCY_SECONDS) - return MockGCSResponse( - status_code=200, - json_data={"kind": "storage#object", "name": "mock-object"}, - url=url, - elapsed_seconds=_MOCK_LATENCY_SECONDS - ) - # For non-GCS calls, use original method - if _original_async_handler_post is not None: - return await _original_async_handler_post(self, url=url, data=data, json=json, params=params, headers=headers, timeout=timeout, stream=stream, logging_obj=logging_obj, files=files, content=content) - # Fallback: if original not set, raise error - raise RuntimeError("Original AsyncHTTPHandler.post not available") - - async def _mock_async_handler_get(self, url, params=None, headers=None, follow_redirects=None): """Monkey-patched AsyncHTTPHandler.get that intercepts GCS calls.""" # Only mock GCS API calls if isinstance(url, str) and "storage.googleapis.com" in url: verbose_logger.info(f"[GCS MOCK] GET to {url}") - # Simulate network latency await asyncio.sleep(_MOCK_LATENCY_SECONDS) - return MockGCSResponse( - status_code=200, - json_data={"data": "mock-log-data"}, + # Return a minimal but valid StandardLoggingPayload JSON string as bytes + # This matches what GCS returns when downloading with ?alt=media + mock_payload = { + "id": "mock-request-id", + "trace_id": "mock-trace-id", + "call_type": "completion", + "stream": False, + "response_cost": 0.0, + "status": "success", + "status_fields": {"llm_api_status": "success"}, + "custom_llm_provider": "mock", + "total_tokens": 0, + "prompt_tokens": 0, + "completion_tokens": 0, + "startTime": 0.0, + "endTime": 0.0, + "completionStartTime": 0.0, + "response_time": 0.0, + "model_map_information": {"model": "mock-model"}, + "model": "mock-model", + "model_id": None, + "model_group": None, + "api_base": "https://api.mock.com", + "metadata": {}, + "cache_hit": None, + "cache_key": None, + "saved_cache_cost": 0.0, + "request_tags": [], + "end_user": None, + "requester_ip_address": None, + "messages": None, + "response": None, + "error_str": None, + "error_information": None, + "model_parameters": {}, + "hidden_params": {}, + "guardrail_information": None, + "standard_built_in_tools_params": None, + } + return MockResponse( + status_code=200, + json_data=mock_payload, url=url, elapsed_seconds=_MOCK_LATENCY_SECONDS ) - # For non-GCS calls, use original method if _original_async_handler_get is not None: return await _original_async_handler_get(self, url=url, params=params, headers=headers, follow_redirects=follow_redirects) - # Fallback: if original not set, raise error raise RuntimeError("Original AsyncHTTPHandler.get not available") @@ -115,18 +98,16 @@ async def _mock_async_handler_delete(self, url, data=None, json=None, params=Non # Only mock GCS API calls if isinstance(url, str) and "storage.googleapis.com" in url: verbose_logger.info(f"[GCS MOCK] DELETE to {url}") - # Simulate network latency await asyncio.sleep(_MOCK_LATENCY_SECONDS) - return MockGCSResponse( - status_code=204, - json_data={}, + # DELETE returns 204 No Content with empty body (not JSON) + return MockResponse( + status_code=204, + json_data=None, # Empty body for DELETE url=url, elapsed_seconds=_MOCK_LATENCY_SECONDS ) - # For non-GCS calls, use original method if _original_async_handler_delete is not None: return await _original_async_handler_delete(self, url=url, data=data, json=json, params=params, headers=headers, timeout=timeout, stream=stream, content=content) - # Fallback: if original not set, raise error raise RuntimeError("Original AsyncHTTPHandler.delete not available") @@ -139,30 +120,26 @@ def create_mock_gcs_client(): This function is idempotent - it only initializes mocks once, even if called multiple times. """ - global _original_async_handler_post, _original_async_handler_get, _original_async_handler_delete - global _mocks_initialized + global _original_async_handler_get, _original_async_handler_delete, _mocks_initialized - # If already initialized, skip + # Use factory for POST handler + _create_mock_gcs_post() + + # If already initialized, skip GET/DELETE patching if _mocks_initialized: return - verbose_logger.debug("[GCS MOCK] Initializing GCS mock client...") + verbose_logger.debug("[GCS MOCK] Initializing GCS GET/DELETE handlers...") - # Patch AsyncHTTPHandler methods (used by LiteLLM's custom httpx handler) - if _original_async_handler_post is None: - from litellm.llms.custom_httpx.http_handler import AsyncHTTPHandler - _original_async_handler_post = AsyncHTTPHandler.post - AsyncHTTPHandler.post = _mock_async_handler_post # type: ignore - verbose_logger.debug("[GCS MOCK] Patched AsyncHTTPHandler.post") + # Patch GET and DELETE handlers (GCS-specific) + from litellm.llms.custom_httpx.http_handler import AsyncHTTPHandler if _original_async_handler_get is None: - from litellm.llms.custom_httpx.http_handler import AsyncHTTPHandler _original_async_handler_get = AsyncHTTPHandler.get AsyncHTTPHandler.get = _mock_async_handler_get # type: ignore verbose_logger.debug("[GCS MOCK] Patched AsyncHTTPHandler.get") if _original_async_handler_delete is None: - from litellm.llms.custom_httpx.http_handler import AsyncHTTPHandler _original_async_handler_delete = AsyncHTTPHandler.delete AsyncHTTPHandler.delete = _mock_async_handler_delete # type: ignore verbose_logger.debug("[GCS MOCK] Patched AsyncHTTPHandler.delete") @@ -212,25 +189,4 @@ def _mock_get_token_and_url(self, model, auth_header, vertex_credentials, vertex verbose_logger.debug("[GCS MOCK] Patched Vertex AI auth methods") -def should_use_gcs_mock() -> bool: - """ - Determine if GCS should run in mock mode. - - Checks the GCS_MOCK environment variable. - - Returns: - bool: True if mock mode should be enabled - """ - import os - from litellm.secret_managers.main import str_to_bool - - mock_mode = os.getenv("GCS_MOCK", "false") - result = str_to_bool(mock_mode) - - # Ensure we return a bool, not None - result = bool(result) if result is not None else False - - if result: - verbose_logger.info("GCS Mock Mode: ENABLED - API calls will be mocked") - - return result +# should_use_gcs_mock is already created by the factory diff --git a/litellm/integrations/helicone.py b/litellm/integrations/helicone.py index 198cbaf405..b996813b4e 100644 --- a/litellm/integrations/helicone.py +++ b/litellm/integrations/helicone.py @@ -4,6 +4,11 @@ import traceback import litellm +from litellm._logging import verbose_logger +from litellm.integrations.helicone_mock_client import ( + should_use_helicone_mock, + create_mock_helicone_client, +) class HeliconeLogger: @@ -22,6 +27,11 @@ class HeliconeLogger: def __init__(self): # Instance variables + self.is_mock_mode = should_use_helicone_mock() + if self.is_mock_mode: + create_mock_helicone_client() + verbose_logger.info("[HELICONE MOCK] Helicone logger initialized in mock mode") + self.provider_url = "https://api.openai.com/v1" self.key = os.getenv("HELICONE_API_KEY") self.api_base = os.getenv("HELICONE_API_BASE") or "https://api.hconeai.com" @@ -185,7 +195,10 @@ def log_success( } response = litellm.module_level_client.post(url, headers=headers, json=data) if response.status_code == 200: - print_verbose("Helicone Logging - Success!") + if self.is_mock_mode: + print_verbose("[HELICONE MOCK] Helicone Logging - Successfully mocked!") + else: + print_verbose("Helicone Logging - Success!") else: print_verbose( f"Helicone Logging - Error Request was not successful. Status Code: {response.status_code}" diff --git a/litellm/integrations/helicone_mock_client.py b/litellm/integrations/helicone_mock_client.py new file mode 100644 index 0000000000..0f4670a1d2 --- /dev/null +++ b/litellm/integrations/helicone_mock_client.py @@ -0,0 +1,32 @@ +""" +Mock HTTP client for Helicone integration testing. + +This module intercepts Helicone API calls and returns successful mock responses, +allowing full code execution without making actual network calls. + +Usage: + Set HELICONE_MOCK=true in environment variables or config to enable mock mode. +""" + +from litellm.integrations.mock_client_factory import MockClientConfig, create_mock_client_factory + +# Create mock client using factory +# Helicone uses HTTPHandler which internally uses httpx.Client.send(), not httpx.Client.post() +_config = MockClientConfig( + name="HELICONE", + env_var="HELICONE_MOCK", + default_latency_ms=100, + default_status_code=200, + default_json_data={"status": "success"}, + url_matchers=[ + ".hconeai.com", + "hconeai.com", + ".helicone.ai", + "helicone.ai", + ], + patch_async_handler=False, + patch_sync_client=False, # HTTPHandler uses self.client.send(), not self.client.post() + patch_http_handler=True, # Patch HTTPHandler.post directly +) + +create_mock_helicone_client, should_use_helicone_mock = create_mock_client_factory(_config) diff --git a/litellm/integrations/langfuse/langfuse_mock_client.py b/litellm/integrations/langfuse/langfuse_mock_client.py index 1dc739ea32..8ed6cff8d4 100644 --- a/litellm/integrations/langfuse/langfuse_mock_client.py +++ b/litellm/integrations/langfuse/langfuse_mock_client.py @@ -9,113 +9,27 @@ """ import httpx -import json -from datetime import timedelta -from typing import Dict, Optional - -from litellm._logging import verbose_logger - -_original_httpx_post = None - -# Default mock latency in seconds (simulates network round-trip) -# Typical Langfuse API calls take 50-150ms -_MOCK_LATENCY_SECONDS = float(__import__("os").getenv("LANGFUSE_MOCK_LATENCY_MS", "100")) / 1000.0 - - -class MockLangfuseResponse: - """Mock httpx.Response that satisfies Langfuse SDK requirements.""" - - def __init__(self, status_code: int = 200, json_data: Optional[Dict] = None, url: Optional[str] = None, elapsed_seconds: float = 0.0): - self.status_code = status_code - self._json_data = json_data or {"status": "success"} - self.headers = httpx.Headers({}) - self.is_success = status_code < 400 - self.is_error = status_code >= 400 - self.is_redirect = 300 <= status_code < 400 - self.url = httpx.URL(url) if url else httpx.URL("") - # Set realistic elapsed time based on mock latency - elapsed_time = elapsed_seconds if elapsed_seconds > 0 else _MOCK_LATENCY_SECONDS - self.elapsed = timedelta(seconds=elapsed_time) - self._text = json.dumps(self._json_data) - self._content = self._text.encode("utf-8") - - @property - def text(self) -> str: - return self._text - - @property - def content(self) -> bytes: - return self._content - - def json(self) -> Dict: - return self._json_data - - def read(self) -> bytes: - return self._content - - def raise_for_status(self): - if self.status_code >= 400: - raise Exception(f"HTTP {self.status_code}") - - -def _is_langfuse_url(url) -> bool: - """Check if URL is a Langfuse domain.""" - try: - parsed_url = httpx.URL(url) if isinstance(url, str) else url - hostname = parsed_url.host or "" - - return ( - hostname.endswith(".langfuse.com") or - hostname == "langfuse.com" or - (hostname in ("localhost", "127.0.0.1") and "langfuse" in str(parsed_url).lower()) - ) - except Exception: - return False - - -def _mock_httpx_post(self, url, **kwargs): - """Monkey-patched httpx.Client.post that intercepts Langfuse calls.""" - if _is_langfuse_url(url): - verbose_logger.info(f"[LANGFUSE MOCK] POST to {url}") - return MockLangfuseResponse(status_code=200, json_data={"status": "success"}, url=url, elapsed_seconds=_MOCK_LATENCY_SECONDS) - - if _original_httpx_post is not None: - return _original_httpx_post(self, url, **kwargs) - - +from litellm.integrations.mock_client_factory import MockClientConfig, create_mock_client_factory + +# Create mock client using factory +_config = MockClientConfig( + name="LANGFUSE", + env_var="LANGFUSE_MOCK", + default_latency_ms=100, + default_status_code=200, + default_json_data={"status": "success"}, + url_matchers=[ + ".langfuse.com", + "langfuse.com", + ], + patch_async_handler=False, + patch_sync_client=True, +) + +_create_mock_langfuse_client_internal, should_use_langfuse_mock = create_mock_client_factory(_config) + +# Langfuse needs to return an httpx.Client instance def create_mock_langfuse_client(): - """ - Monkey-patch httpx.Client.post to intercept Langfuse calls. - - Returns a real httpx.Client instance - the monkey-patch intercepts all calls. - """ - global _original_httpx_post - - if _original_httpx_post is None: - _original_httpx_post = httpx.Client.post - httpx.Client.post = _mock_httpx_post # type: ignore - verbose_logger.debug("[LANGFUSE MOCK] Patched httpx.Client.post") - + """Create and return an httpx.Client instance - the monkey-patch intercepts all calls.""" + _create_mock_langfuse_client_internal() return httpx.Client() - - -def should_use_langfuse_mock() -> bool: - """ - Determine if Langfuse should run in mock mode. - - Checks the LANGFUSE_MOCK environment variable. - - Returns: - bool: True if mock mode should be enabled - """ - import os - from litellm.secret_managers.main import str_to_bool - - mock_mode = os.getenv("LANGFUSE_MOCK", "false") - result = str_to_bool(mock_mode) - result = bool(result) if result is not None else False - - if result: - verbose_logger.info("Langfuse Mock Mode: ENABLED - API calls will be mocked") - - return result diff --git a/litellm/integrations/langsmith.py b/litellm/integrations/langsmith.py index 5893f14105..ebd005f880 100644 --- a/litellm/integrations/langsmith.py +++ b/litellm/integrations/langsmith.py @@ -15,6 +15,10 @@ import litellm from litellm._logging import verbose_logger from litellm.integrations.custom_batch_logger import CustomBatchLogger +from litellm.integrations.langsmith_mock_client import ( + should_use_langsmith_mock, + create_mock_langsmith_client, +) from litellm.llms.custom_httpx.http_handler import ( get_async_httpx_client, httpxSpecialProvider, @@ -45,6 +49,12 @@ def __init__( ): self.flush_lock = asyncio.Lock() super().__init__(**kwargs, flush_lock=self.flush_lock) + self.is_mock_mode = should_use_langsmith_mock() + + if self.is_mock_mode: + create_mock_langsmith_client() + verbose_logger.debug("[LANGSMITH MOCK] LangSmith logger initialized in mock mode") + self.default_credentials = self.get_credentials_from_env( langsmith_api_key=langsmith_api_key, langsmith_project=langsmith_project, @@ -388,6 +398,8 @@ async def _log_batch_on_langsmith( verbose_logger.debug( "Sending batch of %s runs to Langsmith", len(elements_to_log) ) + if self.is_mock_mode: + verbose_logger.debug("[LANGSMITH MOCK] Mock mode enabled - API calls will be intercepted") response = await self.async_httpx_client.post( url=url, json={"post": elements_to_log}, @@ -400,9 +412,14 @@ async def _log_batch_on_langsmith( f"Langsmith Error: {response.status_code} - {response.text}" ) else: - verbose_logger.debug( - f"Batch of {len(self.log_queue)} runs successfully created" - ) + if self.is_mock_mode: + verbose_logger.debug( + f"[LANGSMITH MOCK] Batch of {len(elements_to_log)} runs successfully mocked" + ) + else: + verbose_logger.debug( + f"Batch of {len(self.log_queue)} runs successfully created" + ) except httpx.HTTPStatusError as e: verbose_logger.exception( f"Langsmith HTTP Error: {e.response.status_code} - {e.response.text}" diff --git a/litellm/integrations/langsmith_mock_client.py b/litellm/integrations/langsmith_mock_client.py new file mode 100644 index 0000000000..ef60290823 --- /dev/null +++ b/litellm/integrations/langsmith_mock_client.py @@ -0,0 +1,29 @@ +""" +Mock client for LangSmith integration testing. + +This module intercepts LangSmith API calls and returns successful mock responses, +allowing full code execution without making actual network calls. + +Usage: + Set LANGSMITH_MOCK=true in environment variables or config to enable mock mode. +""" + +from litellm.integrations.mock_client_factory import MockClientConfig, create_mock_client_factory + +# Create mock client using factory +_config = MockClientConfig( + name="LANGSMITH", + env_var="LANGSMITH_MOCK", + default_latency_ms=100, + default_status_code=200, + default_json_data={"status": "success", "ids": ["mock-run-id"]}, + url_matchers=[ + ".smith.langchain.com", + "api.smith.langchain.com", + "smith.langchain.com", + ], + patch_async_handler=True, + patch_sync_client=False, +) + +create_mock_langsmith_client, should_use_langsmith_mock = create_mock_client_factory(_config) diff --git a/litellm/integrations/mock_client_factory.py b/litellm/integrations/mock_client_factory.py new file mode 100644 index 0000000000..2f04fae9f7 --- /dev/null +++ b/litellm/integrations/mock_client_factory.py @@ -0,0 +1,216 @@ +""" +Factory for creating mock HTTP clients for integration testing. + +This module provides a simple factory pattern to create mock clients that intercept +API calls and return successful mock responses, allowing full code execution without +making actual network calls. +""" + +import httpx +import json +import asyncio +from datetime import timedelta +from typing import Dict, Optional, List, cast +from dataclasses import dataclass + +from litellm._logging import verbose_logger + + +@dataclass +class MockClientConfig: + """Configuration for creating a mock client.""" + name: str # e.g., "GCS", "LANGFUSE", "LANGSMITH", "DATADOG" + env_var: str # e.g., "GCS_MOCK", "LANGFUSE_MOCK" + default_latency_ms: int = 100 # Default mock latency in milliseconds + default_status_code: int = 200 # Default HTTP status code + default_json_data: Optional[Dict] = None # Default JSON response data + url_matchers: Optional[List[str]] = None # List of strings to match in URLs (e.g., ["storage.googleapis.com"]) + patch_async_handler: bool = True # Whether to patch AsyncHTTPHandler.post + patch_sync_client: bool = False # Whether to patch httpx.Client.post + patch_http_handler: bool = False # Whether to patch HTTPHandler.post (for sync calls that use HTTPHandler) + + def __post_init__(self): + """Ensure url_matchers is a list.""" + if self.url_matchers is None: + self.url_matchers = [] + + +class MockResponse: + """Generic mock httpx.Response that satisfies API requirements.""" + + def __init__(self, status_code: int = 200, json_data: Optional[Dict] = None, url: Optional[str] = None, elapsed_seconds: float = 0.0): + self.status_code = status_code + self._json_data = json_data or {"status": "success"} + self.headers = httpx.Headers({}) + self.is_success = status_code < 400 + self.is_error = status_code >= 400 + self.is_redirect = 300 <= status_code < 400 + self.url = httpx.URL(url) if url else httpx.URL("") + self.elapsed = timedelta(seconds=elapsed_seconds) + self._text = json.dumps(self._json_data) if json_data else "" + self._content = self._text.encode("utf-8") + + @property + def text(self) -> str: + """Return response text.""" + return self._text + + @property + def content(self) -> bytes: + """Return response content.""" + return self._content + + def json(self) -> Dict: + """Return JSON response data.""" + return self._json_data + + def read(self) -> bytes: + """Read response content.""" + return self._content + + def raise_for_status(self): + """Raise exception for error status codes.""" + if self.status_code >= 400: + raise Exception(f"HTTP {self.status_code}") + + +def _is_url_match(url, matchers: List[str]) -> bool: + """Check if URL matches any of the provided matchers.""" + try: + parsed_url = httpx.URL(url) if isinstance(url, str) else url + url_str = str(parsed_url).lower() + hostname = parsed_url.host or "" + + for matcher in matchers: + if matcher.lower() in url_str or matcher.lower() in hostname.lower(): + return True + + # Also check for localhost with matcher in path + if hostname in ("localhost", "127.0.0.1"): + for matcher in matchers: + if matcher.lower() in url_str: + return True + + return False + except Exception: + return False + + +def create_mock_client_factory(config: MockClientConfig): # noqa: PLR0915 + """ + Factory function that creates mock client functions based on configuration. + + Returns: + tuple: (create_mock_client_func, should_use_mock_func) + """ + # Store original methods for restoration + _original_async_handler_post = None + _original_sync_client_post = None + _original_http_handler_post = None + _mocks_initialized = False + + # Calculate mock latency + import os + latency_env = f"{config.name.upper()}_MOCK_LATENCY_MS" + _MOCK_LATENCY_SECONDS = float(os.getenv(latency_env, str(config.default_latency_ms))) / 1000.0 + + # Create URL matcher function + def _is_mock_url(url) -> bool: + # url_matchers is guaranteed to be a list after __post_init__ + return _is_url_match(url, cast(List[str], config.url_matchers)) + + # Create async handler mock + async def _mock_async_handler_post(self, url, data=None, json=None, params=None, headers=None, timeout=None, stream=False, logging_obj=None, files=None, content=None): + """Monkey-patched AsyncHTTPHandler.post that intercepts API calls.""" + if isinstance(url, str) and _is_mock_url(url): + verbose_logger.info(f"[{config.name} MOCK] POST to {url}") + await asyncio.sleep(_MOCK_LATENCY_SECONDS) + return MockResponse( + status_code=config.default_status_code, + json_data=config.default_json_data, + url=url, + elapsed_seconds=_MOCK_LATENCY_SECONDS + ) + if _original_async_handler_post is not None: + return await _original_async_handler_post(self, url=url, data=data, json=json, params=params, headers=headers, timeout=timeout, stream=stream, logging_obj=logging_obj, files=files, content=content) + raise RuntimeError("Original AsyncHTTPHandler.post not available") + + # Create sync client mock + def _mock_sync_client_post(self, url, **kwargs): + """Monkey-patched httpx.Client.post that intercepts API calls.""" + if _is_mock_url(url): + verbose_logger.info(f"[{config.name} MOCK] POST to {url} (sync)") + return MockResponse( + status_code=config.default_status_code, + json_data=config.default_json_data, + url=url, + elapsed_seconds=_MOCK_LATENCY_SECONDS + ) + if _original_sync_client_post is not None: + return _original_sync_client_post(self, url, **kwargs) + + # Create HTTPHandler mock (for sync calls that use HTTPHandler.post) + def _mock_http_handler_post(self, url, data=None, json=None, params=None, headers=None, timeout=None, stream=False, files=None, content=None, logging_obj=None): + """Monkey-patched HTTPHandler.post that intercepts API calls.""" + if isinstance(url, str) and _is_mock_url(url): + verbose_logger.info(f"[{config.name} MOCK] POST to {url}") + import time + time.sleep(_MOCK_LATENCY_SECONDS) + return MockResponse( + status_code=config.default_status_code, + json_data=config.default_json_data, + url=url, + elapsed_seconds=_MOCK_LATENCY_SECONDS + ) + if _original_http_handler_post is not None: + return _original_http_handler_post(self, url=url, data=data, json=json, params=params, headers=headers, timeout=timeout, stream=stream, files=files, content=content, logging_obj=logging_obj) + raise RuntimeError("Original HTTPHandler.post not available") + + # Create mock client initialization function + def create_mock_client(): + """Initialize the mock client by patching HTTP handlers.""" + nonlocal _original_async_handler_post, _original_sync_client_post, _original_http_handler_post, _mocks_initialized + + if _mocks_initialized: + return + + verbose_logger.debug(f"[{config.name} MOCK] Initializing {config.name} mock client...") + + if config.patch_async_handler and _original_async_handler_post is None: + from litellm.llms.custom_httpx.http_handler import AsyncHTTPHandler + _original_async_handler_post = AsyncHTTPHandler.post + AsyncHTTPHandler.post = _mock_async_handler_post # type: ignore + verbose_logger.debug(f"[{config.name} MOCK] Patched AsyncHTTPHandler.post") + + if config.patch_sync_client and _original_sync_client_post is None: + _original_sync_client_post = httpx.Client.post + httpx.Client.post = _mock_sync_client_post # type: ignore + verbose_logger.debug(f"[{config.name} MOCK] Patched httpx.Client.post") + + if config.patch_http_handler and _original_http_handler_post is None: + from litellm.llms.custom_httpx.http_handler import HTTPHandler + _original_http_handler_post = HTTPHandler.post + HTTPHandler.post = _mock_http_handler_post # type: ignore + verbose_logger.debug(f"[{config.name} MOCK] Patched HTTPHandler.post") + + verbose_logger.debug(f"[{config.name} MOCK] Mock latency set to {_MOCK_LATENCY_SECONDS*1000:.0f}ms") + verbose_logger.debug(f"[{config.name} MOCK] {config.name} mock client initialization complete") + + _mocks_initialized = True + + # Create should_use_mock function + def should_use_mock() -> bool: + """Determine if mock mode should be enabled.""" + import os + from litellm.secret_managers.main import str_to_bool + + mock_mode = os.getenv(config.env_var, "false") + result = str_to_bool(mock_mode) + result = bool(result) if result is not None else False + + if result: + verbose_logger.info(f"{config.name} Mock Mode: ENABLED - API calls will be mocked") + + return result + + return create_mock_client, should_use_mock diff --git a/litellm/integrations/posthog.py b/litellm/integrations/posthog.py index 468b1a441f..dd7c3627b8 100644 --- a/litellm/integrations/posthog.py +++ b/litellm/integrations/posthog.py @@ -17,6 +17,10 @@ from litellm._logging import verbose_logger from litellm._uuid import uuid from litellm.integrations.custom_batch_logger import CustomBatchLogger +from litellm.integrations.posthog_mock_client import ( + should_use_posthog_mock, + create_mock_posthog_client, +) from litellm.llms.custom_httpx.http_handler import ( _get_httpx_client, get_async_httpx_client, @@ -40,6 +44,12 @@ def __init__(self, **kwargs): """ try: verbose_logger.debug("PostHog: in init posthog logger") + + self.is_mock_mode = should_use_posthog_mock() + if self.is_mock_mode: + create_mock_posthog_client() + verbose_logger.debug("[POSTHOG MOCK] PostHog logger initialized in mock mode") + if os.getenv("POSTHOG_API_KEY", None) is None: raise Exception("POSTHOG_API_KEY is not set, set 'POSTHOG_API_KEY=<>'") @@ -100,7 +110,10 @@ def log_success_event(self, kwargs, response_obj, start_time, end_time): f"Response from PostHog API status_code: {response.status_code}, text: {response.text}" ) - verbose_logger.debug("PostHog: Sync event successfully sent") + if self.is_mock_mode: + verbose_logger.debug("[POSTHOG MOCK] Sync event successfully mocked") + else: + verbose_logger.debug("PostHog: Sync event successfully sent") except Exception as e: verbose_logger.exception(f"PostHog Sync Layer Error - {str(e)}") @@ -320,6 +333,9 @@ async def async_send_batch(self): verbose_logger.debug( f"PostHog: Sending batch of {len(self.log_queue)} events" ) + + if self.is_mock_mode: + verbose_logger.debug("[POSTHOG MOCK] Mock mode enabled - API calls will be intercepted") # Group events by credentials for batch sending batches_by_credentials: Dict[tuple[str, str], list] = {} @@ -350,9 +366,12 @@ async def async_send_batch(self): f"Response from PostHog API status_code: {response.status_code}, text: {response.text}" ) - verbose_logger.debug( - f"PostHog: Batch of {len(self.log_queue)} events successfully sent" - ) + if self.is_mock_mode: + verbose_logger.debug(f"[POSTHOG MOCK] Batch of {len(self.log_queue)} events successfully mocked") + else: + verbose_logger.debug( + f"PostHog: Batch of {len(self.log_queue)} events successfully sent" + ) except Exception as e: verbose_logger.exception(f"PostHog Error sending batch API - {str(e)}") @@ -429,9 +448,14 @@ def _flush_on_exit(self): f"PostHog: Failed to flush on exit - status {response.status_code}" ) - verbose_logger.debug( - f"PostHog: Successfully flushed {len(self.log_queue)} events on exit" - ) + if self.is_mock_mode: + verbose_logger.debug( + f"[POSTHOG MOCK] Successfully flushed {len(self.log_queue)} events on exit" + ) + else: + verbose_logger.debug( + f"PostHog: Successfully flushed {len(self.log_queue)} events on exit" + ) self.log_queue.clear() except Exception as e: diff --git a/litellm/integrations/posthog_mock_client.py b/litellm/integrations/posthog_mock_client.py new file mode 100644 index 0000000000..b713587ed6 --- /dev/null +++ b/litellm/integrations/posthog_mock_client.py @@ -0,0 +1,30 @@ +""" +Mock httpx client for PostHog integration testing. + +This module intercepts PostHog API calls and returns successful mock responses, +allowing full code execution without making actual network calls. + +Usage: + Set POSTHOG_MOCK=true in environment variables or config to enable mock mode. +""" + +from litellm.integrations.mock_client_factory import MockClientConfig, create_mock_client_factory + +# Create mock client using factory +_config = MockClientConfig( + name="POSTHOG", + env_var="POSTHOG_MOCK", + default_latency_ms=100, + default_status_code=200, + default_json_data={"status": "success"}, + url_matchers=[ + ".posthog.com", + "posthog.com", + "us.i.posthog.com", + "app.posthog.com", + ], + patch_async_handler=True, + patch_sync_client=True, +) + +create_mock_posthog_client, should_use_posthog_mock = create_mock_client_factory(_config)