From dbbd400b213fa0e2512bdcaa2ebcffb493c96c2c Mon Sep 17 00:00:00 2001 From: Alexsander Hamir Date: Sat, 24 Jan 2026 10:58:12 -0800 Subject: [PATCH 1/9] Add LangSmith mock client support - Create langsmith_mock_client.py following GCS and Langfuse patterns - Add mock mode detection via LANGSMITH_MOCK environment variable - Intercept LangSmith API calls via AsyncHTTPHandler.post patching - Add verbose logging throughout mock implementation - Update LangsmithLogger to initialize mock client when mock mode enabled - Supports configurable mock latency via LANGSMITH_MOCK_LATENCY_MS --- litellm/integrations/langsmith.py | 23 ++- litellm/integrations/langsmith_mock_client.py | 157 ++++++++++++++++++ 2 files changed, 177 insertions(+), 3 deletions(-) create mode 100644 litellm/integrations/langsmith_mock_client.py diff --git a/litellm/integrations/langsmith.py b/litellm/integrations/langsmith.py index 5893f14105d..ebd005f8804 100644 --- a/litellm/integrations/langsmith.py +++ b/litellm/integrations/langsmith.py @@ -15,6 +15,10 @@ import litellm from litellm._logging import verbose_logger from litellm.integrations.custom_batch_logger import CustomBatchLogger +from litellm.integrations.langsmith_mock_client import ( + should_use_langsmith_mock, + create_mock_langsmith_client, +) from litellm.llms.custom_httpx.http_handler import ( get_async_httpx_client, httpxSpecialProvider, @@ -45,6 +49,12 @@ def __init__( ): self.flush_lock = asyncio.Lock() super().__init__(**kwargs, flush_lock=self.flush_lock) + self.is_mock_mode = should_use_langsmith_mock() + + if self.is_mock_mode: + create_mock_langsmith_client() + verbose_logger.debug("[LANGSMITH MOCK] LangSmith logger initialized in mock mode") + self.default_credentials = self.get_credentials_from_env( langsmith_api_key=langsmith_api_key, langsmith_project=langsmith_project, @@ -388,6 +398,8 @@ async def _log_batch_on_langsmith( verbose_logger.debug( "Sending batch of %s runs to Langsmith", len(elements_to_log) ) + if self.is_mock_mode: + verbose_logger.debug("[LANGSMITH MOCK] Mock mode enabled - API calls will be intercepted") response = await self.async_httpx_client.post( url=url, json={"post": elements_to_log}, @@ -400,9 +412,14 @@ async def _log_batch_on_langsmith( f"Langsmith Error: {response.status_code} - {response.text}" ) else: - verbose_logger.debug( - f"Batch of {len(self.log_queue)} runs successfully created" - ) + if self.is_mock_mode: + verbose_logger.debug( + f"[LANGSMITH MOCK] Batch of {len(elements_to_log)} runs successfully mocked" + ) + else: + verbose_logger.debug( + f"Batch of {len(self.log_queue)} runs successfully created" + ) except httpx.HTTPStatusError as e: verbose_logger.exception( f"Langsmith HTTP Error: {e.response.status_code} - {e.response.text}" diff --git a/litellm/integrations/langsmith_mock_client.py b/litellm/integrations/langsmith_mock_client.py new file mode 100644 index 00000000000..a23d0b8bf94 --- /dev/null +++ b/litellm/integrations/langsmith_mock_client.py @@ -0,0 +1,157 @@ +""" +Mock client for LangSmith integration testing. + +This module intercepts LangSmith API calls and returns successful mock responses, +allowing full code execution without making actual network calls. + +Usage: + Set LANGSMITH_MOCK=true in environment variables or config to enable mock mode. +""" + +import httpx +import json +import asyncio +from datetime import timedelta +from typing import Dict, Optional + +from litellm._logging import verbose_logger + +# Store original methods for restoration +_original_async_handler_post = None + +# Track if mocks have been initialized to avoid duplicate initialization +_mocks_initialized = False + +# Default mock latency in seconds (simulates network round-trip) +# Typical LangSmith API calls take 50-150ms +_MOCK_LATENCY_SECONDS = float(__import__("os").getenv("LANGSMITH_MOCK_LATENCY_MS", "100")) / 1000.0 + + +class MockLangsmithResponse: + """Mock httpx.Response that satisfies LangSmith API requirements.""" + + def __init__(self, status_code: int = 200, json_data: Optional[Dict] = None, url: Optional[str] = None, elapsed_seconds: float = 0.0): + self.status_code = status_code + self._json_data = json_data or {"status": "success"} + self.headers = httpx.Headers({}) + self.is_success = status_code < 400 + self.is_error = status_code >= 400 + self.is_redirect = 300 <= status_code < 400 + self.url = httpx.URL(url) if url else httpx.URL("") + # Set realistic elapsed time based on mock latency + elapsed_time = elapsed_seconds if elapsed_seconds > 0 else _MOCK_LATENCY_SECONDS + self.elapsed = timedelta(seconds=elapsed_time) + self._text = json.dumps(self._json_data) + self._content = self._text.encode("utf-8") + + @property + def text(self) -> str: + """Return response text.""" + return self._text + + @property + def content(self) -> bytes: + """Return response content.""" + return self._content + + def json(self) -> Dict: + """Return JSON response data.""" + return self._json_data + + def read(self) -> bytes: + """Read response content.""" + return self._content + + def raise_for_status(self): + """Raise exception for error status codes.""" + if self.status_code >= 400: + raise Exception(f"HTTP {self.status_code}") + + +def _is_langsmith_url(url) -> bool: + """Check if URL is a LangSmith domain.""" + try: + parsed_url = httpx.URL(url) if isinstance(url, str) else url + hostname = parsed_url.host or "" + + return ( + hostname.endswith(".smith.langchain.com") or + hostname == "api.smith.langchain.com" or + "smith.langchain.com" in hostname or + (hostname in ("localhost", "127.0.0.1") and "langsmith" in str(parsed_url).lower()) + ) + except Exception: + return False + + +async def _mock_async_handler_post(self, url, data=None, json=None, params=None, headers=None, timeout=None, stream=False, logging_obj=None, files=None, content=None): + """Monkey-patched AsyncHTTPHandler.post that intercepts LangSmith calls.""" + # Only mock LangSmith API calls + if isinstance(url, str) and _is_langsmith_url(url): + verbose_logger.info(f"[LANGSMITH MOCK] POST to {url}") + # Simulate network latency + await asyncio.sleep(_MOCK_LATENCY_SECONDS) + return MockLangsmithResponse( + status_code=200, + json_data={"status": "success", "ids": ["mock-run-id"]}, + url=url, + elapsed_seconds=_MOCK_LATENCY_SECONDS + ) + # For non-LangSmith calls, use original method + if _original_async_handler_post is not None: + return await _original_async_handler_post(self, url=url, data=data, json=json, params=params, headers=headers, timeout=timeout, stream=stream, logging_obj=logging_obj, files=files, content=content) + # Fallback: if original not set, raise error + raise RuntimeError("Original AsyncHTTPHandler.post not available") + + +def create_mock_langsmith_client(): + """ + Monkey-patch AsyncHTTPHandler.post to intercept LangSmith calls. + + AsyncHTTPHandler is used by LiteLLM's get_async_httpx_client() which is what + LangsmithLogger uses for making API calls. + + This function is idempotent - it only initializes mocks once, even if called multiple times. + """ + global _original_async_handler_post + global _mocks_initialized + + # If already initialized, skip + if _mocks_initialized: + return + + verbose_logger.debug("[LANGSMITH MOCK] Initializing LangSmith mock client...") + + # Patch AsyncHTTPHandler.post (used by LiteLLM's custom httpx handler) + if _original_async_handler_post is None: + from litellm.llms.custom_httpx.http_handler import AsyncHTTPHandler + _original_async_handler_post = AsyncHTTPHandler.post + AsyncHTTPHandler.post = _mock_async_handler_post # type: ignore + verbose_logger.debug("[LANGSMITH MOCK] Patched AsyncHTTPHandler.post") + + verbose_logger.debug(f"[LANGSMITH MOCK] Mock latency set to {_MOCK_LATENCY_SECONDS*1000:.0f}ms") + verbose_logger.debug("[LANGSMITH MOCK] LangSmith mock client initialization complete") + + _mocks_initialized = True + + +def should_use_langsmith_mock() -> bool: + """ + Determine if LangSmith should run in mock mode. + + Checks the LANGSMITH_MOCK environment variable. + + Returns: + bool: True if mock mode should be enabled + """ + import os + from litellm.secret_managers.main import str_to_bool + + mock_mode = os.getenv("LANGSMITH_MOCK", "false") + result = str_to_bool(mock_mode) + result = bool(result) if result is not None else False + + if result: + verbose_logger.info("LangSmith Mock Mode: ENABLED - API calls will be mocked") + + return result From 70598a494423ecc7ec794b723518d8a3d165d041 Mon Sep 17 00:00:00 2001 From: Alexsander Hamir Date: Sat, 24 Jan 2026 11:04:34 -0800 Subject: [PATCH 2/9] Add Datadog mock client support - Create datadog_mock_client.py following GCS, Langfuse, and LangSmith patterns - Add mock mode detection via DATADOG_MOCK environment variable - Intercept Datadog API calls via AsyncHTTPHandler.post and httpx.Client.post patching - Add verbose logging throughout mock implementation - Update DataDogLogger and DataDogLLMObsLogger to initialize mock client when mock mode enabled - Supports both async and sync logging paths - Supports configurable mock latency via DATADOG_MOCK_LATENCY_MS --- litellm/integrations/datadog/datadog.py | 28 ++- .../integrations/datadog/datadog_llm_obs.py | 25 ++- .../datadog/datadog_mock_client.py | 176 ++++++++++++++++++ 3 files changed, 221 insertions(+), 8 deletions(-) create mode 100644 litellm/integrations/datadog/datadog_mock_client.py diff --git a/litellm/integrations/datadog/datadog.py b/litellm/integrations/datadog/datadog.py index 503e8d8c87a..08029f0da0a 100644 --- a/litellm/integrations/datadog/datadog.py +++ b/litellm/integrations/datadog/datadog.py @@ -27,6 +27,10 @@ from litellm._logging import verbose_logger from litellm._uuid import uuid from litellm.integrations.custom_batch_logger import CustomBatchLogger +from litellm.integrations.datadog.datadog_mock_client import ( + should_use_datadog_mock, + create_mock_datadog_client, +) from litellm.integrations.datadog.datadog_handler import ( get_datadog_hostname, get_datadog_service, @@ -80,6 +84,12 @@ def __init__( """ try: verbose_logger.debug("Datadog: in init datadog logger") + + self.is_mock_mode = should_use_datadog_mock() + + if self.is_mock_mode: + create_mock_datadog_client() + verbose_logger.debug("[DATADOG MOCK] Datadog logger initialized in mock mode") ######################################################### # Handle datadog_params set as litellm.datadog_params @@ -229,6 +239,9 @@ async def async_send_batch(self): len(self.log_queue), self.intake_url, ) + + if self.is_mock_mode: + verbose_logger.debug("[DATADOG MOCK] Mock mode enabled - API calls will be intercepted") response = await self.async_send_compressed_data(self.log_queue) if response.status_code == 413: @@ -241,11 +254,16 @@ async def async_send_batch(self): f"Response from datadog API status_code: {response.status_code}, text: {response.text}" ) - verbose_logger.debug( - "Datadog: Response from datadog API status_code: %s, text: %s", - response.status_code, - response.text, - ) + if self.is_mock_mode: + verbose_logger.debug( + f"[DATADOG MOCK] Batch of {len(self.log_queue)} events successfully mocked" + ) + else: + verbose_logger.debug( + "Datadog: Response from datadog API status_code: %s, text: %s", + response.status_code, + response.text, + ) except Exception as e: verbose_logger.exception( f"Datadog Error sending batch API - {str(e)}\n{traceback.format_exc()}" diff --git a/litellm/integrations/datadog/datadog_llm_obs.py b/litellm/integrations/datadog/datadog_llm_obs.py index 6ffdbc0a005..9291dfc2857 100644 --- a/litellm/integrations/datadog/datadog_llm_obs.py +++ b/litellm/integrations/datadog/datadog_llm_obs.py @@ -18,6 +18,10 @@ import litellm from litellm._logging import verbose_logger from litellm.integrations.custom_batch_logger import CustomBatchLogger +from litellm.integrations.datadog.datadog_mock_client import ( + should_use_datadog_mock, + create_mock_datadog_client, +) from litellm.integrations.datadog.datadog_handler import ( get_datadog_service, get_datadog_tags, @@ -43,6 +47,13 @@ class DataDogLLMObsLogger(CustomBatchLogger): def __init__(self, **kwargs): try: verbose_logger.debug("DataDogLLMObs: Initializing logger") + + self.is_mock_mode = should_use_datadog_mock() + + if self.is_mock_mode: + create_mock_datadog_client() + verbose_logger.debug("[DATADOG MOCK] DataDogLLMObs logger initialized in mock mode") + if os.getenv("DD_API_KEY", None) is None: raise Exception("DD_API_KEY is not set, set 'DD_API_KEY=<>'") if os.getenv("DD_SITE", None) is None: @@ -139,6 +150,9 @@ async def async_send_batch(self): verbose_logger.debug( f"DataDogLLMObs: Flushing {len(self.log_queue)} events" ) + + if self.is_mock_mode: + verbose_logger.debug("[DATADOG MOCK] Mock mode enabled - API calls will be intercepted") # Prepare the payload payload = { @@ -178,9 +192,14 @@ async def async_send_batch(self): f"DataDogLLMObs: Unexpected response - status_code: {response.status_code}, text: {response.text}" ) - verbose_logger.debug( - f"DataDogLLMObs: Successfully sent batch - status_code: {response.status_code}" - ) + if self.is_mock_mode: + verbose_logger.debug( + f"[DATADOG MOCK] Batch of {len(self.log_queue)} events successfully mocked" + ) + else: + verbose_logger.debug( + f"DataDogLLMObs: Successfully sent batch - status_code: {response.status_code}" + ) self.log_queue.clear() except httpx.HTTPStatusError as e: verbose_logger.exception( diff --git a/litellm/integrations/datadog/datadog_mock_client.py b/litellm/integrations/datadog/datadog_mock_client.py new file mode 100644 index 00000000000..80c1003510d --- /dev/null +++ b/litellm/integrations/datadog/datadog_mock_client.py @@ -0,0 +1,176 @@ +""" +Mock client for Datadog integration testing. + +This module intercepts Datadog API calls and returns successful mock responses, +allowing full code execution without making actual network calls. + +Usage: + Set DATADOG_MOCK=true in environment variables or config to enable mock mode. +""" + +import httpx +import json +import asyncio +from datetime import timedelta +from typing import Dict, Optional + +from litellm._logging import verbose_logger + +# Store original methods for restoration +_original_async_handler_post = None +_original_sync_client_post = None + +# Track if mocks have been initialized to avoid duplicate initialization +_mocks_initialized = False + +# Default mock latency in seconds (simulates network round-trip) +# Typical Datadog API calls take 50-150ms +_MOCK_LATENCY_SECONDS = float(__import__("os").getenv("DATADOG_MOCK_LATENCY_MS", "100")) / 1000.0 + + +class MockDatadogResponse: + """Mock httpx.Response that satisfies Datadog API requirements.""" + + def __init__(self, status_code: int = 202, json_data: Optional[Dict] = None, url: Optional[str] = None, elapsed_seconds: float = 0.0): + self.status_code = status_code + self._json_data = json_data or {"status": "ok"} + self.headers = httpx.Headers({}) + self.is_success = status_code < 400 + self.is_error = status_code >= 400 + self.is_redirect = 300 <= status_code < 400 + self.url = httpx.URL(url) if url else httpx.URL("") + # Set realistic elapsed time based on mock latency + elapsed_time = elapsed_seconds if elapsed_seconds > 0 else _MOCK_LATENCY_SECONDS + self.elapsed = timedelta(seconds=elapsed_time) + self._text = json.dumps(self._json_data) if json_data else "" + self._content = self._text.encode("utf-8") + + @property + def text(self) -> str: + """Return response text.""" + return self._text + + @property + def content(self) -> bytes: + """Return response content.""" + return self._content + + def json(self) -> Dict: + """Return JSON response data.""" + return self._json_data + + def read(self) -> bytes: + """Read response content.""" + return self._content + + def raise_for_status(self): + """Raise exception for error status codes.""" + if self.status_code >= 400: + raise Exception(f"HTTP {self.status_code}") + + +def _is_datadog_url(url) -> bool: + """Check if URL is a Datadog domain.""" + try: + parsed_url = httpx.URL(url) if isinstance(url, str) else url + hostname = parsed_url.host or "" + + return ( + hostname.endswith(".datadoghq.com") or + hostname == "datadoghq.com" or + "datadoghq.com" in hostname or + (hostname in ("localhost", "127.0.0.1") and "datadog" in str(parsed_url).lower()) + ) + except Exception: + return False + + +async def _mock_async_handler_post(self, url, data=None, json=None, params=None, headers=None, timeout=None, stream=False, logging_obj=None, files=None, content=None): + """Monkey-patched AsyncHTTPHandler.post that intercepts Datadog calls.""" + # Only mock Datadog API calls + if isinstance(url, str) and _is_datadog_url(url): + verbose_logger.info(f"[DATADOG MOCK] POST to {url}") + # Simulate network latency + await asyncio.sleep(_MOCK_LATENCY_SECONDS) + return MockDatadogResponse( + status_code=202, + json_data={"status": "ok"}, + url=url, + elapsed_seconds=_MOCK_LATENCY_SECONDS + ) + # For non-Datadog calls, use original method + if _original_async_handler_post is not None: + return await _original_async_handler_post(self, url=url, data=data, json=json, params=params, headers=headers, timeout=timeout, stream=stream, logging_obj=logging_obj, files=files, content=content) + # Fallback: if original not set, raise error + raise RuntimeError("Original AsyncHTTPHandler.post not available") + + +def _mock_sync_client_post(self, url, **kwargs): + """Monkey-patched httpx.Client.post that intercepts Datadog calls.""" + if _is_datadog_url(url): + verbose_logger.info(f"[DATADOG MOCK] POST to {url} (sync)") + return MockDatadogResponse(status_code=202, json_data={"status": "ok"}, url=url, elapsed_seconds=_MOCK_LATENCY_SECONDS) + + if _original_sync_client_post is not None: + return _original_sync_client_post(self, url, **kwargs) + + +def create_mock_datadog_client(): + """ + Monkey-patch AsyncHTTPHandler.post and httpx.Client.post to intercept Datadog calls. + + AsyncHTTPHandler is used by LiteLLM's get_async_httpx_client() which is what + DataDogLogger and DataDogLLMObsLogger use for making API calls. + + httpx.Client is used for sync logging in DataDogLogger. + + This function is idempotent - it only initializes mocks once, even if called multiple times. + """ + global _original_async_handler_post, _original_sync_client_post + global _mocks_initialized + + # If already initialized, skip + if _mocks_initialized: + return + + verbose_logger.debug("[DATADOG MOCK] Initializing Datadog mock client...") + + # Patch AsyncHTTPHandler.post (used by LiteLLM's custom httpx handler) + if _original_async_handler_post is None: + from litellm.llms.custom_httpx.http_handler import AsyncHTTPHandler + _original_async_handler_post = AsyncHTTPHandler.post + AsyncHTTPHandler.post = _mock_async_handler_post # type: ignore + verbose_logger.debug("[DATADOG MOCK] Patched AsyncHTTPHandler.post") + + # Patch httpx.Client.post (used for sync logging) + if _original_sync_client_post is None: + _original_sync_client_post = httpx.Client.post + httpx.Client.post = _mock_sync_client_post # type: ignore + verbose_logger.debug("[DATADOG MOCK] Patched httpx.Client.post") + + verbose_logger.debug(f"[DATADOG MOCK] Mock latency set to {_MOCK_LATENCY_SECONDS*1000:.0f}ms") + verbose_logger.debug("[DATADOG MOCK] Datadog mock client initialization complete") + + _mocks_initialized = True + + +def should_use_datadog_mock() -> bool: + """ + Determine if Datadog should run in mock mode. + + Checks the DATADOG_MOCK environment variable. + + Returns: + bool: True if mock mode should be enabled + """ + import os + from litellm.secret_managers.main import str_to_bool + + mock_mode = os.getenv("DATADOG_MOCK", "false") + result = str_to_bool(mock_mode) + result = bool(result) if result is not None else False + + if result: + verbose_logger.info("Datadog Mock Mode: ENABLED - API calls will be mocked") + + return result From e5bea6dd282d38fa4591fd55c8731c88e305ae9b Mon Sep 17 00:00:00 2001 From: Alexsander Hamir Date: Sat, 24 Jan 2026 11:17:36 -0800 Subject: [PATCH 3/9] refactor: consolidate mock client logic into factory pattern - Create mock_client_factory.py to centralize common mock HTTP client logic - Refactor GCS, Langfuse, LangSmith, and Datadog mock clients to use factory - Improve GET/DELETE mock accuracy for GCS (return valid StandardLoggingPayload) - Fix DELETE mock to return empty body (204 No Content) instead of JSON - Reduce code duplication across integration mock clients --- .../datadog/datadog_mock_client.py | 184 ++--------------- .../gcs_bucket/gcs_bucket_mock_client.py | 184 +++++++---------- .../langfuse/langfuse_mock_client.py | 130 ++---------- litellm/integrations/langsmith_mock_client.py | 166 ++------------- litellm/integrations/mock_client_factory.py | 191 ++++++++++++++++++ 5 files changed, 322 insertions(+), 533 deletions(-) create mode 100644 litellm/integrations/mock_client_factory.py diff --git a/litellm/integrations/datadog/datadog_mock_client.py b/litellm/integrations/datadog/datadog_mock_client.py index 80c1003510d..a0a760deb0b 100644 --- a/litellm/integrations/datadog/datadog_mock_client.py +++ b/litellm/integrations/datadog/datadog_mock_client.py @@ -8,169 +8,21 @@ Set DATADOG_MOCK=true in environment variables or config to enable mock mode. """ -import httpx -import json -import asyncio -from datetime import timedelta -from typing import Dict, Optional - -from litellm._logging import verbose_logger - -# Store original methods for restoration -_original_async_handler_post = None -_original_sync_client_post = None - -# Track if mocks have been initialized to avoid duplicate initialization -_mocks_initialized = False - -# Default mock latency in seconds (simulates network round-trip) -# Typical Datadog API calls take 50-150ms -_MOCK_LATENCY_SECONDS = float(__import__("os").getenv("DATADOG_MOCK_LATENCY_MS", "100")) / 1000.0 - - -class MockDatadogResponse: - """Mock httpx.Response that satisfies Datadog API requirements.""" - - def __init__(self, status_code: int = 202, json_data: Optional[Dict] = None, url: Optional[str] = None, elapsed_seconds: float = 0.0): - self.status_code = status_code - self._json_data = json_data or {"status": "ok"} - self.headers = httpx.Headers({}) - self.is_success = status_code < 400 - self.is_error = status_code >= 400 - self.is_redirect = 300 <= status_code < 400 - self.url = httpx.URL(url) if url else httpx.URL("") - # Set realistic elapsed time based on mock latency - elapsed_time = elapsed_seconds if elapsed_seconds > 0 else _MOCK_LATENCY_SECONDS - self.elapsed = timedelta(seconds=elapsed_time) - self._text = json.dumps(self._json_data) if json_data else "" - self._content = self._text.encode("utf-8") - - @property - def text(self) -> str: - """Return response text.""" - return self._text - - @property - def content(self) -> bytes: - """Return response content.""" - return self._content - - def json(self) -> Dict: - """Return JSON response data.""" - return self._json_data - - def read(self) -> bytes: - """Read response content.""" - return self._content - - def raise_for_status(self): - """Raise exception for error status codes.""" - if self.status_code >= 400: - raise Exception(f"HTTP {self.status_code}") - - -def _is_datadog_url(url) -> bool: - """Check if URL is a Datadog domain.""" - try: - parsed_url = httpx.URL(url) if isinstance(url, str) else url - hostname = parsed_url.host or "" - - return ( - hostname.endswith(".datadoghq.com") or - hostname == "datadoghq.com" or - "datadoghq.com" in hostname or - (hostname in ("localhost", "127.0.0.1") and "datadog" in str(parsed_url).lower()) - ) - except Exception: - return False - - -async def _mock_async_handler_post(self, url, data=None, json=None, params=None, headers=None, timeout=None, stream=False, logging_obj=None, files=None, content=None): - """Monkey-patched AsyncHTTPHandler.post that intercepts Datadog calls.""" - # Only mock Datadog API calls - if isinstance(url, str) and _is_datadog_url(url): - verbose_logger.info(f"[DATADOG MOCK] POST to {url}") - # Simulate network latency - await asyncio.sleep(_MOCK_LATENCY_SECONDS) - return MockDatadogResponse( - status_code=202, - json_data={"status": "ok"}, - url=url, - elapsed_seconds=_MOCK_LATENCY_SECONDS - ) - # For non-Datadog calls, use original method - if _original_async_handler_post is not None: - return await _original_async_handler_post(self, url=url, data=data, json=json, params=params, headers=headers, timeout=timeout, stream=stream, logging_obj=logging_obj, files=files, content=content) - # Fallback: if original not set, raise error - raise RuntimeError("Original AsyncHTTPHandler.post not available") - - -def _mock_sync_client_post(self, url, **kwargs): - """Monkey-patched httpx.Client.post that intercepts Datadog calls.""" - if _is_datadog_url(url): - verbose_logger.info(f"[DATADOG MOCK] POST to {url} (sync)") - return MockDatadogResponse(status_code=202, json_data={"status": "ok"}, url=url, elapsed_seconds=_MOCK_LATENCY_SECONDS) - - if _original_sync_client_post is not None: - return _original_sync_client_post(self, url, **kwargs) - - -def create_mock_datadog_client(): - """ - Monkey-patch AsyncHTTPHandler.post and httpx.Client.post to intercept Datadog calls. - - AsyncHTTPHandler is used by LiteLLM's get_async_httpx_client() which is what - DataDogLogger and DataDogLLMObsLogger use for making API calls. - - httpx.Client is used for sync logging in DataDogLogger. - - This function is idempotent - it only initializes mocks once, even if called multiple times. - """ - global _original_async_handler_post, _original_sync_client_post - global _mocks_initialized - - # If already initialized, skip - if _mocks_initialized: - return - - verbose_logger.debug("[DATADOG MOCK] Initializing Datadog mock client...") - - # Patch AsyncHTTPHandler.post (used by LiteLLM's custom httpx handler) - if _original_async_handler_post is None: - from litellm.llms.custom_httpx.http_handler import AsyncHTTPHandler - _original_async_handler_post = AsyncHTTPHandler.post - AsyncHTTPHandler.post = _mock_async_handler_post # type: ignore - verbose_logger.debug("[DATADOG MOCK] Patched AsyncHTTPHandler.post") - - # Patch httpx.Client.post (used for sync logging) - if _original_sync_client_post is None: - _original_sync_client_post = httpx.Client.post - httpx.Client.post = _mock_sync_client_post # type: ignore - verbose_logger.debug("[DATADOG MOCK] Patched httpx.Client.post") - - verbose_logger.debug(f"[DATADOG MOCK] Mock latency set to {_MOCK_LATENCY_SECONDS*1000:.0f}ms") - verbose_logger.debug("[DATADOG MOCK] Datadog mock client initialization complete") - - _mocks_initialized = True - - -def should_use_datadog_mock() -> bool: - """ - Determine if Datadog should run in mock mode. - - Checks the DATADOG_MOCK environment variable. - - Returns: - bool: True if mock mode should be enabled - """ - import os - from litellm.secret_managers.main import str_to_bool - - mock_mode = os.getenv("DATADOG_MOCK", "false") - result = str_to_bool(mock_mode) - result = bool(result) if result is not None else False - - if result: - verbose_logger.info("Datadog Mock Mode: ENABLED - API calls will be mocked") - - return result +from litellm.integrations.mock_client_factory import MockClientConfig, create_mock_client_factory + +# Create mock client using factory +_config = MockClientConfig( + name="DATADOG", + env_var="DATADOG_MOCK", + default_latency_ms=100, + default_status_code=202, + default_json_data={"status": "ok"}, + url_matchers=[ + ".datadoghq.com", + "datadoghq.com", + ], + patch_async_handler=True, + patch_sync_client=True, +) + +create_mock_datadog_client, should_use_datadog_mock = create_mock_client_factory(_config) diff --git a/litellm/integrations/gcs_bucket/gcs_bucket_mock_client.py b/litellm/integrations/gcs_bucket/gcs_bucket_mock_client.py index 6201dc343dc..de3e1739482 100644 --- a/litellm/integrations/gcs_bucket/gcs_bucket_mock_client.py +++ b/litellm/integrations/gcs_bucket/gcs_bucket_mock_client.py @@ -15,13 +15,25 @@ from typing import Dict, Optional from litellm._logging import verbose_logger - -# Store original methods for restoration -_original_async_handler_post = None +from litellm.integrations.mock_client_factory import MockClientConfig, create_mock_client_factory, MockResponse + +# Use factory for POST handler +_config = MockClientConfig( + name="GCS", + env_var="GCS_MOCK", + default_latency_ms=150, + default_status_code=200, + default_json_data={"kind": "storage#object", "name": "mock-object"}, + url_matchers=["storage.googleapis.com"], + patch_async_handler=True, + patch_sync_client=False, +) + +_create_mock_gcs_post, should_use_gcs_mock = create_mock_client_factory(_config) + +# Store original methods for GET/DELETE (GCS-specific) _original_async_handler_get = None _original_async_handler_delete = None - -# Track if mocks have been initialized to avoid duplicate initialization _mocks_initialized = False # Default mock latency in seconds (simulates network round-trip) @@ -29,84 +41,59 @@ _MOCK_LATENCY_SECONDS = float(__import__("os").getenv("GCS_MOCK_LATENCY_MS", "150")) / 1000.0 -class MockGCSResponse: - """Mock httpx.Response that satisfies GCS API requirements.""" - - def __init__(self, status_code: int = 200, json_data: Optional[Dict] = None, url: Optional[str] = None, elapsed_seconds: float = 0.0): - self.status_code = status_code - self._json_data = json_data or {"kind": "storage#object", "name": "mock-object"} - self.headers = httpx.Headers({}) - self.is_success = status_code < 400 - self.is_error = status_code >= 400 - self.is_redirect = 300 <= status_code < 400 - self.url = httpx.URL(url) if url else httpx.URL("") - # Set realistic elapsed time based on mock latency - elapsed_time = elapsed_seconds if elapsed_seconds > 0 else _MOCK_LATENCY_SECONDS - self.elapsed = timedelta(seconds=elapsed_time) - self._text = json.dumps(self._json_data) - self._content = self._text.encode("utf-8") - - @property - def text(self) -> str: - """Return response text.""" - return self._text - - @property - def content(self) -> bytes: - """Return response content.""" - return self._content - - def json(self) -> Dict: - """Return JSON response data.""" - return self._json_data - - def read(self) -> bytes: - """Read response content.""" - return self._content - - def raise_for_status(self): - """Raise exception for error status codes.""" - if self.status_code >= 400: - raise Exception(f"HTTP {self.status_code}") - - -async def _mock_async_handler_post(self, url, data=None, json=None, params=None, headers=None, timeout=None, stream=False, logging_obj=None, files=None, content=None): - """Monkey-patched AsyncHTTPHandler.post that intercepts GCS calls.""" - # Only mock GCS API calls - if isinstance(url, str) and "storage.googleapis.com" in url: - verbose_logger.info(f"[GCS MOCK] POST to {url}") - # Simulate network latency - await asyncio.sleep(_MOCK_LATENCY_SECONDS) - return MockGCSResponse( - status_code=200, - json_data={"kind": "storage#object", "name": "mock-object"}, - url=url, - elapsed_seconds=_MOCK_LATENCY_SECONDS - ) - # For non-GCS calls, use original method - if _original_async_handler_post is not None: - return await _original_async_handler_post(self, url=url, data=data, json=json, params=params, headers=headers, timeout=timeout, stream=stream, logging_obj=logging_obj, files=files, content=content) - # Fallback: if original not set, raise error - raise RuntimeError("Original AsyncHTTPHandler.post not available") - - async def _mock_async_handler_get(self, url, params=None, headers=None, follow_redirects=None): """Monkey-patched AsyncHTTPHandler.get that intercepts GCS calls.""" # Only mock GCS API calls if isinstance(url, str) and "storage.googleapis.com" in url: verbose_logger.info(f"[GCS MOCK] GET to {url}") - # Simulate network latency await asyncio.sleep(_MOCK_LATENCY_SECONDS) - return MockGCSResponse( - status_code=200, - json_data={"data": "mock-log-data"}, + # Return a minimal but valid StandardLoggingPayload JSON string as bytes + # This matches what GCS returns when downloading with ?alt=media + mock_payload = { + "id": "mock-request-id", + "trace_id": "mock-trace-id", + "call_type": "completion", + "stream": False, + "response_cost": 0.0, + "status": "success", + "status_fields": {"llm_api_status": "success"}, + "custom_llm_provider": "mock", + "total_tokens": 0, + "prompt_tokens": 0, + "completion_tokens": 0, + "startTime": 0.0, + "endTime": 0.0, + "completionStartTime": 0.0, + "response_time": 0.0, + "model_map_information": {"model": "mock-model"}, + "model": "mock-model", + "model_id": None, + "model_group": None, + "api_base": "https://api.mock.com", + "metadata": {}, + "cache_hit": None, + "cache_key": None, + "saved_cache_cost": 0.0, + "request_tags": [], + "end_user": None, + "requester_ip_address": None, + "messages": None, + "response": None, + "error_str": None, + "error_information": None, + "model_parameters": {}, + "hidden_params": {}, + "guardrail_information": None, + "standard_built_in_tools_params": None, + } + return MockResponse( + status_code=200, + json_data=mock_payload, url=url, elapsed_seconds=_MOCK_LATENCY_SECONDS ) - # For non-GCS calls, use original method if _original_async_handler_get is not None: return await _original_async_handler_get(self, url=url, params=params, headers=headers, follow_redirects=follow_redirects) - # Fallback: if original not set, raise error raise RuntimeError("Original AsyncHTTPHandler.get not available") @@ -115,18 +102,16 @@ async def _mock_async_handler_delete(self, url, data=None, json=None, params=Non # Only mock GCS API calls if isinstance(url, str) and "storage.googleapis.com" in url: verbose_logger.info(f"[GCS MOCK] DELETE to {url}") - # Simulate network latency await asyncio.sleep(_MOCK_LATENCY_SECONDS) - return MockGCSResponse( - status_code=204, - json_data={}, + # DELETE returns 204 No Content with empty body (not JSON) + return MockResponse( + status_code=204, + json_data=None, # Empty body for DELETE url=url, elapsed_seconds=_MOCK_LATENCY_SECONDS ) - # For non-GCS calls, use original method if _original_async_handler_delete is not None: return await _original_async_handler_delete(self, url=url, data=data, json=json, params=params, headers=headers, timeout=timeout, stream=stream, content=content) - # Fallback: if original not set, raise error raise RuntimeError("Original AsyncHTTPHandler.delete not available") @@ -139,30 +124,26 @@ def create_mock_gcs_client(): This function is idempotent - it only initializes mocks once, even if called multiple times. """ - global _original_async_handler_post, _original_async_handler_get, _original_async_handler_delete - global _mocks_initialized + global _original_async_handler_get, _original_async_handler_delete, _mocks_initialized - # If already initialized, skip + # Use factory for POST handler + _create_mock_gcs_post() + + # If already initialized, skip GET/DELETE patching if _mocks_initialized: return - verbose_logger.debug("[GCS MOCK] Initializing GCS mock client...") + verbose_logger.debug("[GCS MOCK] Initializing GCS GET/DELETE handlers...") - # Patch AsyncHTTPHandler methods (used by LiteLLM's custom httpx handler) - if _original_async_handler_post is None: - from litellm.llms.custom_httpx.http_handler import AsyncHTTPHandler - _original_async_handler_post = AsyncHTTPHandler.post - AsyncHTTPHandler.post = _mock_async_handler_post # type: ignore - verbose_logger.debug("[GCS MOCK] Patched AsyncHTTPHandler.post") + # Patch GET and DELETE handlers (GCS-specific) + from litellm.llms.custom_httpx.http_handler import AsyncHTTPHandler if _original_async_handler_get is None: - from litellm.llms.custom_httpx.http_handler import AsyncHTTPHandler _original_async_handler_get = AsyncHTTPHandler.get AsyncHTTPHandler.get = _mock_async_handler_get # type: ignore verbose_logger.debug("[GCS MOCK] Patched AsyncHTTPHandler.get") if _original_async_handler_delete is None: - from litellm.llms.custom_httpx.http_handler import AsyncHTTPHandler _original_async_handler_delete = AsyncHTTPHandler.delete AsyncHTTPHandler.delete = _mock_async_handler_delete # type: ignore verbose_logger.debug("[GCS MOCK] Patched AsyncHTTPHandler.delete") @@ -212,25 +193,4 @@ def _mock_get_token_and_url(self, model, auth_header, vertex_credentials, vertex verbose_logger.debug("[GCS MOCK] Patched Vertex AI auth methods") -def should_use_gcs_mock() -> bool: - """ - Determine if GCS should run in mock mode. - - Checks the GCS_MOCK environment variable. - - Returns: - bool: True if mock mode should be enabled - """ - import os - from litellm.secret_managers.main import str_to_bool - - mock_mode = os.getenv("GCS_MOCK", "false") - result = str_to_bool(mock_mode) - - # Ensure we return a bool, not None - result = bool(result) if result is not None else False - - if result: - verbose_logger.info("GCS Mock Mode: ENABLED - API calls will be mocked") - - return result +# should_use_gcs_mock is already created by the factory diff --git a/litellm/integrations/langfuse/langfuse_mock_client.py b/litellm/integrations/langfuse/langfuse_mock_client.py index 1dc739ea328..8ed6cff8d47 100644 --- a/litellm/integrations/langfuse/langfuse_mock_client.py +++ b/litellm/integrations/langfuse/langfuse_mock_client.py @@ -9,113 +9,27 @@ """ import httpx -import json -from datetime import timedelta -from typing import Dict, Optional - -from litellm._logging import verbose_logger - -_original_httpx_post = None - -# Default mock latency in seconds (simulates network round-trip) -# Typical Langfuse API calls take 50-150ms -_MOCK_LATENCY_SECONDS = float(__import__("os").getenv("LANGFUSE_MOCK_LATENCY_MS", "100")) / 1000.0 - - -class MockLangfuseResponse: - """Mock httpx.Response that satisfies Langfuse SDK requirements.""" - - def __init__(self, status_code: int = 200, json_data: Optional[Dict] = None, url: Optional[str] = None, elapsed_seconds: float = 0.0): - self.status_code = status_code - self._json_data = json_data or {"status": "success"} - self.headers = httpx.Headers({}) - self.is_success = status_code < 400 - self.is_error = status_code >= 400 - self.is_redirect = 300 <= status_code < 400 - self.url = httpx.URL(url) if url else httpx.URL("") - # Set realistic elapsed time based on mock latency - elapsed_time = elapsed_seconds if elapsed_seconds > 0 else _MOCK_LATENCY_SECONDS - self.elapsed = timedelta(seconds=elapsed_time) - self._text = json.dumps(self._json_data) - self._content = self._text.encode("utf-8") - - @property - def text(self) -> str: - return self._text - - @property - def content(self) -> bytes: - return self._content - - def json(self) -> Dict: - return self._json_data - - def read(self) -> bytes: - return self._content - - def raise_for_status(self): - if self.status_code >= 400: - raise Exception(f"HTTP {self.status_code}") - - -def _is_langfuse_url(url) -> bool: - """Check if URL is a Langfuse domain.""" - try: - parsed_url = httpx.URL(url) if isinstance(url, str) else url - hostname = parsed_url.host or "" - - return ( - hostname.endswith(".langfuse.com") or - hostname == "langfuse.com" or - (hostname in ("localhost", "127.0.0.1") and "langfuse" in str(parsed_url).lower()) - ) - except Exception: - return False - - -def _mock_httpx_post(self, url, **kwargs): - """Monkey-patched httpx.Client.post that intercepts Langfuse calls.""" - if _is_langfuse_url(url): - verbose_logger.info(f"[LANGFUSE MOCK] POST to {url}") - return MockLangfuseResponse(status_code=200, json_data={"status": "success"}, url=url, elapsed_seconds=_MOCK_LATENCY_SECONDS) - - if _original_httpx_post is not None: - return _original_httpx_post(self, url, **kwargs) - - +from litellm.integrations.mock_client_factory import MockClientConfig, create_mock_client_factory + +# Create mock client using factory +_config = MockClientConfig( + name="LANGFUSE", + env_var="LANGFUSE_MOCK", + default_latency_ms=100, + default_status_code=200, + default_json_data={"status": "success"}, + url_matchers=[ + ".langfuse.com", + "langfuse.com", + ], + patch_async_handler=False, + patch_sync_client=True, +) + +_create_mock_langfuse_client_internal, should_use_langfuse_mock = create_mock_client_factory(_config) + +# Langfuse needs to return an httpx.Client instance def create_mock_langfuse_client(): - """ - Monkey-patch httpx.Client.post to intercept Langfuse calls. - - Returns a real httpx.Client instance - the monkey-patch intercepts all calls. - """ - global _original_httpx_post - - if _original_httpx_post is None: - _original_httpx_post = httpx.Client.post - httpx.Client.post = _mock_httpx_post # type: ignore - verbose_logger.debug("[LANGFUSE MOCK] Patched httpx.Client.post") - + """Create and return an httpx.Client instance - the monkey-patch intercepts all calls.""" + _create_mock_langfuse_client_internal() return httpx.Client() - - -def should_use_langfuse_mock() -> bool: - """ - Determine if Langfuse should run in mock mode. - - Checks the LANGFUSE_MOCK environment variable. - - Returns: - bool: True if mock mode should be enabled - """ - import os - from litellm.secret_managers.main import str_to_bool - - mock_mode = os.getenv("LANGFUSE_MOCK", "false") - result = str_to_bool(mock_mode) - result = bool(result) if result is not None else False - - if result: - verbose_logger.info("Langfuse Mock Mode: ENABLED - API calls will be mocked") - - return result diff --git a/litellm/integrations/langsmith_mock_client.py b/litellm/integrations/langsmith_mock_client.py index a23d0b8bf94..ef602908231 100644 --- a/litellm/integrations/langsmith_mock_client.py +++ b/litellm/integrations/langsmith_mock_client.py @@ -8,150 +8,22 @@ Set LANGSMITH_MOCK=true in environment variables or config to enable mock mode. """ -import httpx -import json -import asyncio -from datetime import timedelta -from typing import Dict, Optional - -from litellm._logging import verbose_logger - -# Store original methods for restoration -_original_async_handler_post = None - -# Track if mocks have been initialized to avoid duplicate initialization -_mocks_initialized = False - -# Default mock latency in seconds (simulates network round-trip) -# Typical LangSmith API calls take 50-150ms -_MOCK_LATENCY_SECONDS = float(__import__("os").getenv("LANGSMITH_MOCK_LATENCY_MS", "100")) / 1000.0 - - -class MockLangsmithResponse: - """Mock httpx.Response that satisfies LangSmith API requirements.""" - - def __init__(self, status_code: int = 200, json_data: Optional[Dict] = None, url: Optional[str] = None, elapsed_seconds: float = 0.0): - self.status_code = status_code - self._json_data = json_data or {"status": "success"} - self.headers = httpx.Headers({}) - self.is_success = status_code < 400 - self.is_error = status_code >= 400 - self.is_redirect = 300 <= status_code < 400 - self.url = httpx.URL(url) if url else httpx.URL("") - # Set realistic elapsed time based on mock latency - elapsed_time = elapsed_seconds if elapsed_seconds > 0 else _MOCK_LATENCY_SECONDS - self.elapsed = timedelta(seconds=elapsed_time) - self._text = json.dumps(self._json_data) - self._content = self._text.encode("utf-8") - - @property - def text(self) -> str: - """Return response text.""" - return self._text - - @property - def content(self) -> bytes: - """Return response content.""" - return self._content - - def json(self) -> Dict: - """Return JSON response data.""" - return self._json_data - - def read(self) -> bytes: - """Read response content.""" - return self._content - - def raise_for_status(self): - """Raise exception for error status codes.""" - if self.status_code >= 400: - raise Exception(f"HTTP {self.status_code}") - - -def _is_langsmith_url(url) -> bool: - """Check if URL is a LangSmith domain.""" - try: - parsed_url = httpx.URL(url) if isinstance(url, str) else url - hostname = parsed_url.host or "" - - return ( - hostname.endswith(".smith.langchain.com") or - hostname == "api.smith.langchain.com" or - "smith.langchain.com" in hostname or - (hostname in ("localhost", "127.0.0.1") and "langsmith" in str(parsed_url).lower()) - ) - except Exception: - return False - - -async def _mock_async_handler_post(self, url, data=None, json=None, params=None, headers=None, timeout=None, stream=False, logging_obj=None, files=None, content=None): - """Monkey-patched AsyncHTTPHandler.post that intercepts LangSmith calls.""" - # Only mock LangSmith API calls - if isinstance(url, str) and _is_langsmith_url(url): - verbose_logger.info(f"[LANGSMITH MOCK] POST to {url}") - # Simulate network latency - await asyncio.sleep(_MOCK_LATENCY_SECONDS) - return MockLangsmithResponse( - status_code=200, - json_data={"status": "success", "ids": ["mock-run-id"]}, - url=url, - elapsed_seconds=_MOCK_LATENCY_SECONDS - ) - # For non-LangSmith calls, use original method - if _original_async_handler_post is not None: - return await _original_async_handler_post(self, url=url, data=data, json=json, params=params, headers=headers, timeout=timeout, stream=stream, logging_obj=logging_obj, files=files, content=content) - # Fallback: if original not set, raise error - raise RuntimeError("Original AsyncHTTPHandler.post not available") - - -def create_mock_langsmith_client(): - """ - Monkey-patch AsyncHTTPHandler.post to intercept LangSmith calls. - - AsyncHTTPHandler is used by LiteLLM's get_async_httpx_client() which is what - LangsmithLogger uses for making API calls. - - This function is idempotent - it only initializes mocks once, even if called multiple times. - """ - global _original_async_handler_post - global _mocks_initialized - - # If already initialized, skip - if _mocks_initialized: - return - - verbose_logger.debug("[LANGSMITH MOCK] Initializing LangSmith mock client...") - - # Patch AsyncHTTPHandler.post (used by LiteLLM's custom httpx handler) - if _original_async_handler_post is None: - from litellm.llms.custom_httpx.http_handler import AsyncHTTPHandler - _original_async_handler_post = AsyncHTTPHandler.post - AsyncHTTPHandler.post = _mock_async_handler_post # type: ignore - verbose_logger.debug("[LANGSMITH MOCK] Patched AsyncHTTPHandler.post") - - verbose_logger.debug(f"[LANGSMITH MOCK] Mock latency set to {_MOCK_LATENCY_SECONDS*1000:.0f}ms") - verbose_logger.debug("[LANGSMITH MOCK] LangSmith mock client initialization complete") - - _mocks_initialized = True - - -def should_use_langsmith_mock() -> bool: - """ - Determine if LangSmith should run in mock mode. - - Checks the LANGSMITH_MOCK environment variable. - - Returns: - bool: True if mock mode should be enabled - """ - import os - from litellm.secret_managers.main import str_to_bool - - mock_mode = os.getenv("LANGSMITH_MOCK", "false") - result = str_to_bool(mock_mode) - result = bool(result) if result is not None else False - - if result: - verbose_logger.info("LangSmith Mock Mode: ENABLED - API calls will be mocked") - - return result +from litellm.integrations.mock_client_factory import MockClientConfig, create_mock_client_factory + +# Create mock client using factory +_config = MockClientConfig( + name="LANGSMITH", + env_var="LANGSMITH_MOCK", + default_latency_ms=100, + default_status_code=200, + default_json_data={"status": "success", "ids": ["mock-run-id"]}, + url_matchers=[ + ".smith.langchain.com", + "api.smith.langchain.com", + "smith.langchain.com", + ], + patch_async_handler=True, + patch_sync_client=False, +) + +create_mock_langsmith_client, should_use_langsmith_mock = create_mock_client_factory(_config) diff --git a/litellm/integrations/mock_client_factory.py b/litellm/integrations/mock_client_factory.py new file mode 100644 index 00000000000..d788570b995 --- /dev/null +++ b/litellm/integrations/mock_client_factory.py @@ -0,0 +1,191 @@ +""" +Factory for creating mock HTTP clients for integration testing. + +This module provides a simple factory pattern to create mock clients that intercept +API calls and return successful mock responses, allowing full code execution without +making actual network calls. +""" + +import httpx +import json +import asyncio +from datetime import timedelta +from typing import Dict, Optional, Callable, List, cast +from dataclasses import dataclass + +from litellm._logging import verbose_logger + + +@dataclass +class MockClientConfig: + """Configuration for creating a mock client.""" + name: str # e.g., "GCS", "LANGFUSE", "LANGSMITH", "DATADOG" + env_var: str # e.g., "GCS_MOCK", "LANGFUSE_MOCK" + default_latency_ms: int = 100 # Default mock latency in milliseconds + default_status_code: int = 200 # Default HTTP status code + default_json_data: Optional[Dict] = None # Default JSON response data + url_matchers: Optional[List[str]] = None # List of strings to match in URLs (e.g., ["storage.googleapis.com"]) + patch_async_handler: bool = True # Whether to patch AsyncHTTPHandler.post + patch_sync_client: bool = False # Whether to patch httpx.Client.post + + def __post_init__(self): + """Ensure url_matchers is a list.""" + if self.url_matchers is None: + self.url_matchers = [] + + +class MockResponse: + """Generic mock httpx.Response that satisfies API requirements.""" + + def __init__(self, status_code: int = 200, json_data: Optional[Dict] = None, url: Optional[str] = None, elapsed_seconds: float = 0.0): + self.status_code = status_code + self._json_data = json_data or {"status": "success"} + self.headers = httpx.Headers({}) + self.is_success = status_code < 400 + self.is_error = status_code >= 400 + self.is_redirect = 300 <= status_code < 400 + self.url = httpx.URL(url) if url else httpx.URL("") + self.elapsed = timedelta(seconds=elapsed_seconds) + self._text = json.dumps(self._json_data) if json_data else "" + self._content = self._text.encode("utf-8") + + @property + def text(self) -> str: + """Return response text.""" + return self._text + + @property + def content(self) -> bytes: + """Return response content.""" + return self._content + + def json(self) -> Dict: + """Return JSON response data.""" + return self._json_data + + def read(self) -> bytes: + """Read response content.""" + return self._content + + def raise_for_status(self): + """Raise exception for error status codes.""" + if self.status_code >= 400: + raise Exception(f"HTTP {self.status_code}") + + +def _is_url_match(url, matchers: List[str]) -> bool: + """Check if URL matches any of the provided matchers.""" + try: + parsed_url = httpx.URL(url) if isinstance(url, str) else url + url_str = str(parsed_url).lower() + hostname = parsed_url.host or "" + + for matcher in matchers: + if matcher.lower() in url_str or matcher.lower() in hostname.lower(): + return True + + # Also check for localhost with matcher in path + if hostname in ("localhost", "127.0.0.1"): + for matcher in matchers: + if matcher.lower() in url_str: + return True + + return False + except Exception: + return False + + +def create_mock_client_factory(config: MockClientConfig): + """ + Factory function that creates mock client functions based on configuration. + + Returns: + tuple: (create_mock_client_func, should_use_mock_func) + """ + # Store original methods for restoration + _original_async_handler_post = None + _original_sync_client_post = None + _mocks_initialized = False + + # Calculate mock latency + import os + latency_env = f"{config.name.upper()}_MOCK_LATENCY_MS" + _MOCK_LATENCY_SECONDS = float(os.getenv(latency_env, str(config.default_latency_ms))) / 1000.0 + + # Create URL matcher function + def _is_mock_url(url) -> bool: + # url_matchers is guaranteed to be a list after __post_init__ + return _is_url_match(url, cast(List[str], config.url_matchers)) + + # Create async handler mock + async def _mock_async_handler_post(self, url, data=None, json=None, params=None, headers=None, timeout=None, stream=False, logging_obj=None, files=None, content=None): + """Monkey-patched AsyncHTTPHandler.post that intercepts API calls.""" + if isinstance(url, str) and _is_mock_url(url): + verbose_logger.info(f"[{config.name} MOCK] POST to {url}") + await asyncio.sleep(_MOCK_LATENCY_SECONDS) + return MockResponse( + status_code=config.default_status_code, + json_data=config.default_json_data, + url=url, + elapsed_seconds=_MOCK_LATENCY_SECONDS + ) + if _original_async_handler_post is not None: + return await _original_async_handler_post(self, url=url, data=data, json=json, params=params, headers=headers, timeout=timeout, stream=stream, logging_obj=logging_obj, files=files, content=content) + raise RuntimeError("Original AsyncHTTPHandler.post not available") + + # Create sync client mock + def _mock_sync_client_post(self, url, **kwargs): + """Monkey-patched httpx.Client.post that intercepts API calls.""" + if _is_mock_url(url): + verbose_logger.info(f"[{config.name} MOCK] POST to {url} (sync)") + return MockResponse( + status_code=config.default_status_code, + json_data=config.default_json_data, + url=url, + elapsed_seconds=_MOCK_LATENCY_SECONDS + ) + if _original_sync_client_post is not None: + return _original_sync_client_post(self, url, **kwargs) + + # Create mock client initialization function + def create_mock_client(): + """Initialize the mock client by patching HTTP handlers.""" + nonlocal _original_async_handler_post, _original_sync_client_post, _mocks_initialized + + if _mocks_initialized: + return + + verbose_logger.debug(f"[{config.name} MOCK] Initializing {config.name} mock client...") + + if config.patch_async_handler and _original_async_handler_post is None: + from litellm.llms.custom_httpx.http_handler import AsyncHTTPHandler + _original_async_handler_post = AsyncHTTPHandler.post + AsyncHTTPHandler.post = _mock_async_handler_post # type: ignore + verbose_logger.debug(f"[{config.name} MOCK] Patched AsyncHTTPHandler.post") + + if config.patch_sync_client and _original_sync_client_post is None: + _original_sync_client_post = httpx.Client.post + httpx.Client.post = _mock_sync_client_post # type: ignore + verbose_logger.debug(f"[{config.name} MOCK] Patched httpx.Client.post") + + verbose_logger.debug(f"[{config.name} MOCK] Mock latency set to {_MOCK_LATENCY_SECONDS*1000:.0f}ms") + verbose_logger.debug(f"[{config.name} MOCK] {config.name} mock client initialization complete") + + _mocks_initialized = True + + # Create should_use_mock function + def should_use_mock() -> bool: + """Determine if mock mode should be enabled.""" + import os + from litellm.secret_managers.main import str_to_bool + + mock_mode = os.getenv(config.env_var, "false") + result = str_to_bool(mock_mode) + result = bool(result) if result is not None else False + + if result: + verbose_logger.info(f"{config.name} Mock Mode: ENABLED - API calls will be mocked") + + return result + + return create_mock_client, should_use_mock From 05b39a02fbca74947c8d63d7c65cb7d96f65d938 Mon Sep 17 00:00:00 2001 From: Alexsander Hamir Date: Sat, 24 Jan 2026 11:26:52 -0800 Subject: [PATCH 4/9] feat: add PostHog mock client support - Create posthog_mock_client.py using factory pattern - Integrate mock client into PostHogLogger with mock mode detection - Add verbose logging for mock mode initialization and batch operations - Enable mock mode via POSTHOG_MOCK environment variable --- litellm/integrations/posthog.py | 38 +++++++++++++++++---- litellm/integrations/posthog_mock_client.py | 30 ++++++++++++++++ 2 files changed, 61 insertions(+), 7 deletions(-) create mode 100644 litellm/integrations/posthog_mock_client.py diff --git a/litellm/integrations/posthog.py b/litellm/integrations/posthog.py index 468b1a441fb..dd7c3627b87 100644 --- a/litellm/integrations/posthog.py +++ b/litellm/integrations/posthog.py @@ -17,6 +17,10 @@ from litellm._logging import verbose_logger from litellm._uuid import uuid from litellm.integrations.custom_batch_logger import CustomBatchLogger +from litellm.integrations.posthog_mock_client import ( + should_use_posthog_mock, + create_mock_posthog_client, +) from litellm.llms.custom_httpx.http_handler import ( _get_httpx_client, get_async_httpx_client, @@ -40,6 +44,12 @@ def __init__(self, **kwargs): """ try: verbose_logger.debug("PostHog: in init posthog logger") + + self.is_mock_mode = should_use_posthog_mock() + if self.is_mock_mode: + create_mock_posthog_client() + verbose_logger.debug("[POSTHOG MOCK] PostHog logger initialized in mock mode") + if os.getenv("POSTHOG_API_KEY", None) is None: raise Exception("POSTHOG_API_KEY is not set, set 'POSTHOG_API_KEY=<>'") @@ -100,7 +110,10 @@ def log_success_event(self, kwargs, response_obj, start_time, end_time): f"Response from PostHog API status_code: {response.status_code}, text: {response.text}" ) - verbose_logger.debug("PostHog: Sync event successfully sent") + if self.is_mock_mode: + verbose_logger.debug("[POSTHOG MOCK] Sync event successfully mocked") + else: + verbose_logger.debug("PostHog: Sync event successfully sent") except Exception as e: verbose_logger.exception(f"PostHog Sync Layer Error - {str(e)}") @@ -320,6 +333,9 @@ async def async_send_batch(self): verbose_logger.debug( f"PostHog: Sending batch of {len(self.log_queue)} events" ) + + if self.is_mock_mode: + verbose_logger.debug("[POSTHOG MOCK] Mock mode enabled - API calls will be intercepted") # Group events by credentials for batch sending batches_by_credentials: Dict[tuple[str, str], list] = {} @@ -350,9 +366,12 @@ async def async_send_batch(self): f"Response from PostHog API status_code: {response.status_code}, text: {response.text}" ) - verbose_logger.debug( - f"PostHog: Batch of {len(self.log_queue)} events successfully sent" - ) + if self.is_mock_mode: + verbose_logger.debug(f"[POSTHOG MOCK] Batch of {len(self.log_queue)} events successfully mocked") + else: + verbose_logger.debug( + f"PostHog: Batch of {len(self.log_queue)} events successfully sent" + ) except Exception as e: verbose_logger.exception(f"PostHog Error sending batch API - {str(e)}") @@ -429,9 +448,14 @@ def _flush_on_exit(self): f"PostHog: Failed to flush on exit - status {response.status_code}" ) - verbose_logger.debug( - f"PostHog: Successfully flushed {len(self.log_queue)} events on exit" - ) + if self.is_mock_mode: + verbose_logger.debug( + f"[POSTHOG MOCK] Successfully flushed {len(self.log_queue)} events on exit" + ) + else: + verbose_logger.debug( + f"PostHog: Successfully flushed {len(self.log_queue)} events on exit" + ) self.log_queue.clear() except Exception as e: diff --git a/litellm/integrations/posthog_mock_client.py b/litellm/integrations/posthog_mock_client.py new file mode 100644 index 00000000000..b713587ed6f --- /dev/null +++ b/litellm/integrations/posthog_mock_client.py @@ -0,0 +1,30 @@ +""" +Mock httpx client for PostHog integration testing. + +This module intercepts PostHog API calls and returns successful mock responses, +allowing full code execution without making actual network calls. + +Usage: + Set POSTHOG_MOCK=true in environment variables or config to enable mock mode. +""" + +from litellm.integrations.mock_client_factory import MockClientConfig, create_mock_client_factory + +# Create mock client using factory +_config = MockClientConfig( + name="POSTHOG", + env_var="POSTHOG_MOCK", + default_latency_ms=100, + default_status_code=200, + default_json_data={"status": "success"}, + url_matchers=[ + ".posthog.com", + "posthog.com", + "us.i.posthog.com", + "app.posthog.com", + ], + patch_async_handler=True, + patch_sync_client=True, +) + +create_mock_posthog_client, should_use_posthog_mock = create_mock_client_factory(_config) From 5706ba9fe23e803485261ebf910854a6ba48b3c2 Mon Sep 17 00:00:00 2001 From: Alexsander Hamir Date: Sat, 24 Jan 2026 11:47:35 -0800 Subject: [PATCH 5/9] Add Helicone mock client support - Created helicone_mock_client.py using factory pattern (similar to GCS) - Integrated mock mode detection and initialization in HeliconeLogger - Mock client patches HTTPHandler.post to intercept Helicone API calls - Uses factory pattern for should_use_mock and MockResponse utilities - Custom HTTPHandler.post patching required since HTTPHandler uses self.client.send() --- litellm/integrations/helicone.py | 15 ++- litellm/integrations/helicone_mock_client.py | 96 ++++++++++++++++++++ 2 files changed, 110 insertions(+), 1 deletion(-) create mode 100644 litellm/integrations/helicone_mock_client.py diff --git a/litellm/integrations/helicone.py b/litellm/integrations/helicone.py index 198cbaf4058..b996813b4e7 100644 --- a/litellm/integrations/helicone.py +++ b/litellm/integrations/helicone.py @@ -4,6 +4,11 @@ import traceback import litellm +from litellm._logging import verbose_logger +from litellm.integrations.helicone_mock_client import ( + should_use_helicone_mock, + create_mock_helicone_client, +) class HeliconeLogger: @@ -22,6 +27,11 @@ class HeliconeLogger: def __init__(self): # Instance variables + self.is_mock_mode = should_use_helicone_mock() + if self.is_mock_mode: + create_mock_helicone_client() + verbose_logger.info("[HELICONE MOCK] Helicone logger initialized in mock mode") + self.provider_url = "https://api.openai.com/v1" self.key = os.getenv("HELICONE_API_KEY") self.api_base = os.getenv("HELICONE_API_BASE") or "https://api.hconeai.com" @@ -185,7 +195,10 @@ def log_success( } response = litellm.module_level_client.post(url, headers=headers, json=data) if response.status_code == 200: - print_verbose("Helicone Logging - Success!") + if self.is_mock_mode: + print_verbose("[HELICONE MOCK] Helicone Logging - Successfully mocked!") + else: + print_verbose("Helicone Logging - Success!") else: print_verbose( f"Helicone Logging - Error Request was not successful. Status Code: {response.status_code}" diff --git a/litellm/integrations/helicone_mock_client.py b/litellm/integrations/helicone_mock_client.py new file mode 100644 index 00000000000..152e9c73cbe --- /dev/null +++ b/litellm/integrations/helicone_mock_client.py @@ -0,0 +1,96 @@ +""" +Mock HTTP client for Helicone integration testing. + +This module intercepts Helicone API calls and returns successful mock responses, +allowing full code execution without making actual network calls. + +Usage: + Set HELICONE_MOCK=true in environment variables or config to enable mock mode. +""" + +import os +import time + +from litellm._logging import verbose_logger +from litellm.integrations.mock_client_factory import MockClientConfig, MockResponse, create_mock_client_factory + +# Use factory for should_use_mock and MockResponse +# HTTPHandler uses self.client.send(), not self.client.post(), so we need custom patching +_config = MockClientConfig( + name="HELICONE", + env_var="HELICONE_MOCK", + default_latency_ms=100, + default_status_code=200, + default_json_data={"status": "success"}, + url_matchers=[ + ".hconeai.com", + "hconeai.com", + ".helicone.ai", + "helicone.ai", + ], + patch_async_handler=False, + patch_sync_client=False, # HTTPHandler uses self.client.send(), not self.client.post() +) + +# Get should_use_mock from factory (but don't use its patching since HTTPHandler is different) +_, should_use_helicone_mock = create_mock_client_factory(_config) + +# Store original HTTPHandler.post method (Helicone-specific) +_original_http_handler_post = None +_mocks_initialized = False + +# Default mock latency in seconds +_MOCK_LATENCY_SECONDS = float(os.getenv("HELICONE_MOCK_LATENCY_MS", "100")) / 1000.0 + + +def _is_helicone_url(url: str) -> bool: + """Check if URL is a Helicone API URL.""" + url_lower = url.lower() + return "hconeai.com" in url_lower or "helicone.ai" in url_lower + + +def _mock_http_handler_post(self, url, data=None, json=None, params=None, headers=None, timeout=None, stream=False, files=None, content=None, logging_obj=None): + """Monkey-patched HTTPHandler.post that intercepts Helicone calls.""" + # Only mock Helicone API calls + if isinstance(url, str) and _is_helicone_url(url): + verbose_logger.info(f"[HELICONE MOCK] POST to {url}") + time.sleep(_MOCK_LATENCY_SECONDS) + return MockResponse( + status_code=_config.default_status_code, + json_data=_config.default_json_data, + url=url, + elapsed_seconds=_MOCK_LATENCY_SECONDS + ) + if _original_http_handler_post is not None: + return _original_http_handler_post(self, url=url, data=data, json=json, params=params, headers=headers, timeout=timeout, stream=stream, files=files, content=content, logging_obj=logging_obj) + raise RuntimeError("Original HTTPHandler.post not available") + + +def create_mock_helicone_client(): + """ + Monkey-patch HTTPHandler.post to intercept Helicone calls. + + Helicone uses litellm.module_level_client which is an HTTPHandler instance. + HTTPHandler.post uses self.client.send(), not self.client.post(), so we need + custom patching (similar to how GCS has custom GET/DELETE handlers). + + This function is idempotent - it only initializes mocks once, even if called multiple times. + """ + global _original_http_handler_post, _mocks_initialized + + if _mocks_initialized: + return + + verbose_logger.debug("[HELICONE MOCK] Initializing Helicone mock client...") + + from litellm.llms.custom_httpx.http_handler import HTTPHandler + + if _original_http_handler_post is None: + _original_http_handler_post = HTTPHandler.post + HTTPHandler.post = _mock_http_handler_post # type: ignore + verbose_logger.debug("[HELICONE MOCK] Patched HTTPHandler.post") + + verbose_logger.debug(f"[HELICONE MOCK] Mock latency set to {_MOCK_LATENCY_SECONDS*1000:.0f}ms") + verbose_logger.debug("[HELICONE MOCK] Helicone mock client initialization complete") + + _mocks_initialized = True From 5c2f55bcd511c9c2d658df1903be11e76a6425fd Mon Sep 17 00:00:00 2001 From: Alexsander Hamir Date: Sat, 24 Jan 2026 12:09:27 -0800 Subject: [PATCH 6/9] Add mock support for Braintrust integration and extend mock client factory - Add braintrust_mock_client.py with mock HTTP client for Braintrust integration testing - Integrate mock client into BraintrustLogger with mock mode detection - Refactor Helicone mock client to fully utilize factory's HTTPHandler.post patching - Extend mock_client_factory to support patching HTTPHandler.post for sync calls - Enable endpoint-specific mock responses for Braintrust (/project vs /project_logs) - All mock clients now properly handle both async (AsyncHTTPHandler) and sync (HTTPHandler) calls --- litellm/integrations/braintrust_logging.py | 12 ++ .../integrations/braintrust_mock_client.py | 117 ++++++++++++++++++ litellm/integrations/helicone_mock_client.py | 74 +---------- litellm/integrations/mock_client_factory.py | 27 +++- 4 files changed, 160 insertions(+), 70 deletions(-) create mode 100644 litellm/integrations/braintrust_mock_client.py diff --git a/litellm/integrations/braintrust_logging.py b/litellm/integrations/braintrust_logging.py index 585de510e8b..42e9680a7fc 100644 --- a/litellm/integrations/braintrust_logging.py +++ b/litellm/integrations/braintrust_logging.py @@ -9,6 +9,10 @@ import litellm from litellm import verbose_logger +from litellm.integrations.braintrust_mock_client import ( + should_use_braintrust_mock, + create_mock_braintrust_client, +) from litellm.integrations.custom_logger import CustomLogger from litellm.llms.custom_httpx.http_handler import ( HTTPHandler, @@ -34,6 +38,10 @@ def __init__( self, api_key: Optional[str] = None, api_base: Optional[str] = None ) -> None: super().__init__() + self.is_mock_mode = should_use_braintrust_mock() + if self.is_mock_mode: + create_mock_braintrust_client() + verbose_logger.info("[BRAINTRUST MOCK] Braintrust logger initialized in mock mode") self.validate_environment(api_key=api_key) self.api_base = api_base or os.getenv("BRAINTRUST_API_BASE") or API_BASE self.default_project_id = None @@ -254,6 +262,8 @@ def log_success_event( # noqa: PLR0915 json={"events": [request_data]}, headers=self.headers, ) + if self.is_mock_mode: + print_verbose("[BRAINTRUST MOCK] Sync event successfully mocked") except httpx.HTTPStatusError as e: raise Exception(e.response.text) except Exception as e: @@ -399,6 +409,8 @@ async def async_log_success_event( # noqa: PLR0915 json={"events": [request_data]}, headers=self.headers, ) + if self.is_mock_mode: + print_verbose("[BRAINTRUST MOCK] Async event successfully mocked") except httpx.HTTPStatusError as e: raise Exception(e.response.text) except Exception as e: diff --git a/litellm/integrations/braintrust_mock_client.py b/litellm/integrations/braintrust_mock_client.py new file mode 100644 index 00000000000..5dd2d79211c --- /dev/null +++ b/litellm/integrations/braintrust_mock_client.py @@ -0,0 +1,117 @@ +""" +Mock HTTP client for Braintrust integration testing. + +This module intercepts Braintrust API calls and returns successful mock responses, +allowing full code execution without making actual network calls. + +Usage: + Set BRAINTRUST_MOCK=true in environment variables or config to enable mock mode. +""" + +import os +import time + +from litellm._logging import verbose_logger +from litellm.integrations.mock_client_factory import MockClientConfig, MockResponse, create_mock_client_factory + +# Use factory for should_use_mock and MockResponse +# Braintrust uses both HTTPHandler (sync) and AsyncHTTPHandler (async) +# Braintrust needs endpoint-specific responses, so we use custom HTTPHandler.post patching +_config = MockClientConfig( + name="BRAINTRUST", + env_var="BRAINTRUST_MOCK", + default_latency_ms=100, + default_status_code=200, + default_json_data={"id": "mock-project-id", "status": "success"}, + url_matchers=[ + ".braintrustdata.com", + "braintrustdata.com", + ".braintrust.dev", + "braintrust.dev", + ], + patch_async_handler=True, # Patch AsyncHTTPHandler.post for async calls + patch_sync_client=False, # HTTPHandler uses self.client.send(), not self.client.post() + patch_http_handler=False, # We use custom patching for endpoint-specific responses +) + +# Get should_use_mock and create_mock_client from factory +# We need to call the factory's create_mock_client to patch AsyncHTTPHandler.post +create_mock_braintrust_factory_client, should_use_braintrust_mock = create_mock_client_factory(_config) + +# Store original HTTPHandler.post method (Braintrust-specific for sync calls with custom logic) +_original_http_handler_post = None +_mocks_initialized = False + +# Default mock latency in seconds +_MOCK_LATENCY_SECONDS = float(os.getenv("BRAINTRUST_MOCK_LATENCY_MS", "100")) / 1000.0 + + +def _is_braintrust_url(url: str) -> bool: + """Check if URL is a Braintrust API URL.""" + url_lower = url.lower() + return "braintrustdata.com" in url_lower or "braintrust.dev" in url_lower + + +def _mock_http_handler_post(self, url, data=None, json=None, params=None, headers=None, timeout=None, stream=False, files=None, content=None, logging_obj=None): + """Monkey-patched HTTPHandler.post that intercepts Braintrust calls with endpoint-specific responses.""" + # Only mock Braintrust API calls + if isinstance(url, str) and _is_braintrust_url(url): + verbose_logger.info(f"[BRAINTRUST MOCK] POST to {url}") + time.sleep(_MOCK_LATENCY_SECONDS) + # Return appropriate mock response based on endpoint + if "/project" in url: + # Project creation/retrieval/register endpoint + project_name = json.get("name", "litellm") if json else "litellm" + mock_data = {"id": f"mock-project-id-{project_name}", "name": project_name} + elif "/project_logs" in url: + # Log insertion endpoint + mock_data = {"status": "success"} + else: + mock_data = _config.default_json_data + return MockResponse( + status_code=_config.default_status_code, + json_data=mock_data, + url=url, + elapsed_seconds=_MOCK_LATENCY_SECONDS + ) + if _original_http_handler_post is not None: + return _original_http_handler_post(self, url=url, data=data, json=json, params=params, headers=headers, timeout=timeout, stream=stream, files=files, content=content, logging_obj=logging_obj) + raise RuntimeError("Original HTTPHandler.post not available") + + +def create_mock_braintrust_client(): + """ + Monkey-patch HTTPHandler.post to intercept Braintrust sync calls. + + Braintrust uses HTTPHandler for sync calls and AsyncHTTPHandler for async calls. + HTTPHandler.post uses self.client.send(), not self.client.post(), so we need + custom patching for sync (similar to Helicone). + AsyncHTTPHandler.post is patched by the factory. + + We use custom patching instead of factory's patch_http_handler because we need + endpoint-specific responses (different for /project vs /project_logs). + + This function is idempotent - it only initializes mocks once, even if called multiple times. + """ + global _original_http_handler_post, _mocks_initialized + + if _mocks_initialized: + return + + verbose_logger.debug("[BRAINTRUST MOCK] Initializing Braintrust mock client...") + + from litellm.llms.custom_httpx.http_handler import HTTPHandler + + if _original_http_handler_post is None: + _original_http_handler_post = HTTPHandler.post + HTTPHandler.post = _mock_http_handler_post # type: ignore + verbose_logger.debug("[BRAINTRUST MOCK] Patched HTTPHandler.post") + + # CRITICAL: Call the factory's initialization function to patch AsyncHTTPHandler.post + # This is required for async calls to be mocked + create_mock_braintrust_factory_client() + + verbose_logger.debug(f"[BRAINTRUST MOCK] Mock latency set to {_MOCK_LATENCY_SECONDS*1000:.0f}ms") + verbose_logger.debug("[BRAINTRUST MOCK] Braintrust mock client initialization complete") + + _mocks_initialized = True diff --git a/litellm/integrations/helicone_mock_client.py b/litellm/integrations/helicone_mock_client.py index 152e9c73cbe..0f4670a1d2c 100644 --- a/litellm/integrations/helicone_mock_client.py +++ b/litellm/integrations/helicone_mock_client.py @@ -8,14 +8,10 @@ Set HELICONE_MOCK=true in environment variables or config to enable mock mode. """ -import os -import time +from litellm.integrations.mock_client_factory import MockClientConfig, create_mock_client_factory -from litellm._logging import verbose_logger -from litellm.integrations.mock_client_factory import MockClientConfig, MockResponse, create_mock_client_factory - -# Use factory for should_use_mock and MockResponse -# HTTPHandler uses self.client.send(), not self.client.post(), so we need custom patching +# Create mock client using factory +# Helicone uses HTTPHandler which internally uses httpx.Client.send(), not httpx.Client.post() _config = MockClientConfig( name="HELICONE", env_var="HELICONE_MOCK", @@ -30,67 +26,7 @@ ], patch_async_handler=False, patch_sync_client=False, # HTTPHandler uses self.client.send(), not self.client.post() + patch_http_handler=True, # Patch HTTPHandler.post directly ) -# Get should_use_mock from factory (but don't use its patching since HTTPHandler is different) -_, should_use_helicone_mock = create_mock_client_factory(_config) - -# Store original HTTPHandler.post method (Helicone-specific) -_original_http_handler_post = None -_mocks_initialized = False - -# Default mock latency in seconds -_MOCK_LATENCY_SECONDS = float(os.getenv("HELICONE_MOCK_LATENCY_MS", "100")) / 1000.0 - - -def _is_helicone_url(url: str) -> bool: - """Check if URL is a Helicone API URL.""" - url_lower = url.lower() - return "hconeai.com" in url_lower or "helicone.ai" in url_lower - - -def _mock_http_handler_post(self, url, data=None, json=None, params=None, headers=None, timeout=None, stream=False, files=None, content=None, logging_obj=None): - """Monkey-patched HTTPHandler.post that intercepts Helicone calls.""" - # Only mock Helicone API calls - if isinstance(url, str) and _is_helicone_url(url): - verbose_logger.info(f"[HELICONE MOCK] POST to {url}") - time.sleep(_MOCK_LATENCY_SECONDS) - return MockResponse( - status_code=_config.default_status_code, - json_data=_config.default_json_data, - url=url, - elapsed_seconds=_MOCK_LATENCY_SECONDS - ) - if _original_http_handler_post is not None: - return _original_http_handler_post(self, url=url, data=data, json=json, params=params, headers=headers, timeout=timeout, stream=stream, files=files, content=content, logging_obj=logging_obj) - raise RuntimeError("Original HTTPHandler.post not available") - - -def create_mock_helicone_client(): - """ - Monkey-patch HTTPHandler.post to intercept Helicone calls. - - Helicone uses litellm.module_level_client which is an HTTPHandler instance. - HTTPHandler.post uses self.client.send(), not self.client.post(), so we need - custom patching (similar to how GCS has custom GET/DELETE handlers). - - This function is idempotent - it only initializes mocks once, even if called multiple times. - """ - global _original_http_handler_post, _mocks_initialized - - if _mocks_initialized: - return - - verbose_logger.debug("[HELICONE MOCK] Initializing Helicone mock client...") - - from litellm.llms.custom_httpx.http_handler import HTTPHandler - - if _original_http_handler_post is None: - _original_http_handler_post = HTTPHandler.post - HTTPHandler.post = _mock_http_handler_post # type: ignore - verbose_logger.debug("[HELICONE MOCK] Patched HTTPHandler.post") - - verbose_logger.debug(f"[HELICONE MOCK] Mock latency set to {_MOCK_LATENCY_SECONDS*1000:.0f}ms") - verbose_logger.debug("[HELICONE MOCK] Helicone mock client initialization complete") - - _mocks_initialized = True +create_mock_helicone_client, should_use_helicone_mock = create_mock_client_factory(_config) diff --git a/litellm/integrations/mock_client_factory.py b/litellm/integrations/mock_client_factory.py index d788570b995..c45707c263d 100644 --- a/litellm/integrations/mock_client_factory.py +++ b/litellm/integrations/mock_client_factory.py @@ -27,6 +27,7 @@ class MockClientConfig: url_matchers: Optional[List[str]] = None # List of strings to match in URLs (e.g., ["storage.googleapis.com"]) patch_async_handler: bool = True # Whether to patch AsyncHTTPHandler.post patch_sync_client: bool = False # Whether to patch httpx.Client.post + patch_http_handler: bool = False # Whether to patch HTTPHandler.post (for sync calls that use HTTPHandler) def __post_init__(self): """Ensure url_matchers is a list.""" @@ -105,6 +106,7 @@ def create_mock_client_factory(config: MockClientConfig): # Store original methods for restoration _original_async_handler_post = None _original_sync_client_post = None + _original_http_handler_post = None _mocks_initialized = False # Calculate mock latency @@ -147,10 +149,27 @@ def _mock_sync_client_post(self, url, **kwargs): if _original_sync_client_post is not None: return _original_sync_client_post(self, url, **kwargs) + # Create HTTPHandler mock (for sync calls that use HTTPHandler.post) + def _mock_http_handler_post(self, url, data=None, json=None, params=None, headers=None, timeout=None, stream=False, files=None, content=None, logging_obj=None): + """Monkey-patched HTTPHandler.post that intercepts API calls.""" + if isinstance(url, str) and _is_mock_url(url): + verbose_logger.info(f"[{config.name} MOCK] POST to {url}") + import time + time.sleep(_MOCK_LATENCY_SECONDS) + return MockResponse( + status_code=config.default_status_code, + json_data=config.default_json_data, + url=url, + elapsed_seconds=_MOCK_LATENCY_SECONDS + ) + if _original_http_handler_post is not None: + return _original_http_handler_post(self, url=url, data=data, json=json, params=params, headers=headers, timeout=timeout, stream=stream, files=files, content=content, logging_obj=logging_obj) + raise RuntimeError("Original HTTPHandler.post not available") + # Create mock client initialization function def create_mock_client(): """Initialize the mock client by patching HTTP handlers.""" - nonlocal _original_async_handler_post, _original_sync_client_post, _mocks_initialized + nonlocal _original_async_handler_post, _original_sync_client_post, _original_http_handler_post, _mocks_initialized if _mocks_initialized: return @@ -168,6 +187,12 @@ def create_mock_client(): httpx.Client.post = _mock_sync_client_post # type: ignore verbose_logger.debug(f"[{config.name} MOCK] Patched httpx.Client.post") + if config.patch_http_handler and _original_http_handler_post is None: + from litellm.llms.custom_httpx.http_handler import HTTPHandler + _original_http_handler_post = HTTPHandler.post + HTTPHandler.post = _mock_http_handler_post # type: ignore + verbose_logger.debug(f"[{config.name} MOCK] Patched HTTPHandler.post") + verbose_logger.debug(f"[{config.name} MOCK] Mock latency set to {_MOCK_LATENCY_SECONDS*1000:.0f}ms") verbose_logger.debug(f"[{config.name} MOCK] {config.name} mock client initialization complete") From f60e59ab88aa7eb297ed0c3e062ae915660368c2 Mon Sep 17 00:00:00 2001 From: Alexsander Hamir Date: Sat, 24 Jan 2026 12:17:18 -0800 Subject: [PATCH 7/9] Fix linter errors: remove unused imports and suppress complexity warning - Remove unused imports from gcs_bucket_mock_client.py (httpx, json, timedelta, Dict, Optional) - Remove unused Callable import from mock_client_factory.py - Add noqa comment to suppress PLR0915 complexity warning for create_mock_client_factory function --- litellm/integrations/gcs_bucket/gcs_bucket_mock_client.py | 4 ---- litellm/integrations/mock_client_factory.py | 4 ++-- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/litellm/integrations/gcs_bucket/gcs_bucket_mock_client.py b/litellm/integrations/gcs_bucket/gcs_bucket_mock_client.py index de3e1739482..2d14f5eb962 100644 --- a/litellm/integrations/gcs_bucket/gcs_bucket_mock_client.py +++ b/litellm/integrations/gcs_bucket/gcs_bucket_mock_client.py @@ -8,11 +8,7 @@ Set GCS_MOCK=true in environment variables or config to enable mock mode. """ -import httpx -import json import asyncio -from datetime import timedelta -from typing import Dict, Optional from litellm._logging import verbose_logger from litellm.integrations.mock_client_factory import MockClientConfig, create_mock_client_factory, MockResponse diff --git a/litellm/integrations/mock_client_factory.py b/litellm/integrations/mock_client_factory.py index c45707c263d..2f04fae9f76 100644 --- a/litellm/integrations/mock_client_factory.py +++ b/litellm/integrations/mock_client_factory.py @@ -10,7 +10,7 @@ import json import asyncio from datetime import timedelta -from typing import Dict, Optional, Callable, List, cast +from typing import Dict, Optional, List, cast from dataclasses import dataclass from litellm._logging import verbose_logger @@ -96,7 +96,7 @@ def _is_url_match(url, matchers: List[str]) -> bool: return False -def create_mock_client_factory(config: MockClientConfig): +def create_mock_client_factory(config: MockClientConfig): # noqa: PLR0915 """ Factory function that creates mock client functions based on configuration. From 33c0ff768b883bd4c39013550726dcc8395a1b95 Mon Sep 17 00:00:00 2001 From: Alexsander Hamir Date: Sat, 24 Jan 2026 12:20:30 -0800 Subject: [PATCH 8/9] Document mock environment variables for PostHog, Helicone, Braintrust, Datadog, and Langsmith integrations - Add POSTHOG_MOCK and POSTHOG_MOCK_LATENCY_MS documentation - Add HELICONE_MOCK and HELICONE_MOCK_LATENCY_MS documentation - Add BRAINTRUST_MOCK and BRAINTRUST_MOCK_LATENCY_MS documentation - Add DATADOG_MOCK and DATADOG_MOCK_LATENCY_MS documentation - Add LANGSMITH_MOCK and LANGSMITH_MOCK_LATENCY_MS documentation All mock env vars follow the same pattern: enable mock mode for integration testing by intercepting API calls and returning mock responses without making actual network calls. --- docs/my-website/docs/proxy/config_settings.md | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/docs/my-website/docs/proxy/config_settings.md b/docs/my-website/docs/proxy/config_settings.md index 6d3fa206113..8b0d9c00078 100644 --- a/docs/my-website/docs/proxy/config_settings.md +++ b/docs/my-website/docs/proxy/config_settings.md @@ -452,6 +452,8 @@ router_settings: | BERRISPEND_ACCOUNT_ID | Account ID for BerriSpend service | BRAINTRUST_API_KEY | API key for Braintrust integration | BRAINTRUST_API_BASE | Base URL for Braintrust API. Default is https://api.braintrustdata.com/v1 +| BRAINTRUST_MOCK | Enable mock mode for Braintrust integration testing. When set to true, intercepts Braintrust API calls and returns mock responses without making actual network calls. Default is false +| BRAINTRUST_MOCK_LATENCY_MS | Mock latency in milliseconds for Braintrust API calls when mock mode is enabled. Simulates network round-trip time. Default is 100ms | CACHED_STREAMING_CHUNK_DELAY | Delay in seconds for cached streaming chunks. Default is 0.02 | CHATGPT_API_BASE | Base URL for ChatGPT API. Default is https://chatgpt.com/backend-api/codex | CHATGPT_AUTH_FILE | Filename for ChatGPT authentication data. Default is "auth.json" @@ -510,6 +512,8 @@ router_settings: | DD_ENV | Environment identifier for Datadog logs. Only supported for `datadog_llm_observability` callback | DD_SERVICE | Service identifier for Datadog logs. Defaults to "litellm-server" | DD_VERSION | Version identifier for Datadog logs. Defaults to "unknown" +| DATADOG_MOCK | Enable mock mode for Datadog integration testing. When set to true, intercepts Datadog API calls and returns mock responses without making actual network calls. Default is false +| DATADOG_MOCK_LATENCY_MS | Mock latency in milliseconds for Datadog API calls when mock mode is enabled. Simulates network round-trip time. Default is 100ms | DEBUG_OTEL | Enable debug mode for OpenTelemetry | DEFAULT_ALLOWED_FAILS | Maximum failures allowed before cooling down a model. Default is 3 | DEFAULT_A2A_AGENT_TIMEOUT | Default timeout in seconds for A2A (Agent-to-Agent) protocol requests. Default is 6000 @@ -674,6 +678,8 @@ router_settings: | HCP_VAULT_CERT_ROLE | Role for [Hashicorp Vault Secret Manager Auth](../secret.md#hashicorp-vault) | HELICONE_API_KEY | API key for Helicone service | HELICONE_API_BASE | Base URL for Helicone service, defaults to `https://api.helicone.ai` +| HELICONE_MOCK | Enable mock mode for Helicone integration testing. When set to true, intercepts Helicone API calls and returns mock responses without making actual network calls. Default is false +| HELICONE_MOCK_LATENCY_MS | Mock latency in milliseconds for Helicone API calls when mock mode is enabled. Simulates network round-trip time. Default is 100ms | HOSTNAME | Hostname for the server, this will be [emitted to `datadog` logs](https://docs.litellm.ai/docs/proxy/logging#datadog) | HOURS_IN_A_DAY | Hours in a day for calculation purposes. Default is 24 | HIDDENLAYER_API_BASE | Base URL for HiddenLayer API. Defaults to `https://api.hiddenlayer.ai` @@ -712,6 +718,8 @@ router_settings: | LANGSMITH_PROJECT | Project name for Langsmith integration | LANGSMITH_SAMPLING_RATE | Sampling rate for Langsmith logging | LANGSMITH_TENANT_ID | Tenant ID for Langsmith multi-tenant deployments +| LANGSMITH_MOCK | Enable mock mode for Langsmith integration testing. When set to true, intercepts Langsmith API calls and returns mock responses without making actual network calls. Default is false +| LANGSMITH_MOCK_LATENCY_MS | Mock latency in milliseconds for Langsmith API calls when mock mode is enabled. Simulates network round-trip time. Default is 100ms | LANGTRACE_API_KEY | API key for Langtrace service | LASSO_API_BASE | Base URL for Lasso API | LASSO_API_KEY | API key for Lasso service @@ -843,6 +851,8 @@ router_settings: | POD_NAME | Pod name for the server, this will be [emitted to `datadog` logs](https://docs.litellm.ai/docs/proxy/logging#datadog) as `POD_NAME` | POSTHOG_API_KEY | API key for PostHog analytics integration | POSTHOG_API_URL | Base URL for PostHog API (defaults to https://us.i.posthog.com) +| POSTHOG_MOCK | Enable mock mode for PostHog integration testing. When set to true, intercepts PostHog API calls and returns mock responses without making actual network calls. Default is false +| POSTHOG_MOCK_LATENCY_MS | Mock latency in milliseconds for PostHog API calls when mock mode is enabled. Simulates network round-trip time. Default is 100ms | PREDIBASE_API_BASE | Base URL for Predibase API | PRESIDIO_ANALYZER_API_BASE | Base URL for Presidio Analyzer service | PRESIDIO_ANONYMIZER_API_BASE | Base URL for Presidio Anonymizer service From 597bb37d5eca1ddbc3f741ec3ea06b9efd047d9b Mon Sep 17 00:00:00 2001 From: Alexsander Hamir Date: Fri, 30 Jan 2026 09:49:20 -0800 Subject: [PATCH 9/9] Fix security issue --- .../integrations/braintrust_mock_client.py | 22 +++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/litellm/integrations/braintrust_mock_client.py b/litellm/integrations/braintrust_mock_client.py index 5dd2d79211c..030aa62cd0f 100644 --- a/litellm/integrations/braintrust_mock_client.py +++ b/litellm/integrations/braintrust_mock_client.py @@ -10,6 +10,7 @@ import os import time +from urllib.parse import urlparse from litellm._logging import verbose_logger from litellm.integrations.mock_client_factory import MockClientConfig, MockResponse, create_mock_client_factory @@ -18,8 +19,8 @@ # Braintrust uses both HTTPHandler (sync) and AsyncHTTPHandler (async) # Braintrust needs endpoint-specific responses, so we use custom HTTPHandler.post patching _config = MockClientConfig( - name="BRAINTRUST", - env_var="BRAINTRUST_MOCK", + "BRAINTRUST", + "BRAINTRUST_MOCK", default_latency_ms=100, default_status_code=200, default_json_data={"id": "mock-project-id", "status": "success"}, @@ -48,8 +49,21 @@ def _is_braintrust_url(url: str) -> bool: """Check if URL is a Braintrust API URL.""" - url_lower = url.lower() - return "braintrustdata.com" in url_lower or "braintrust.dev" in url_lower + if not isinstance(url, str): + return False + + parsed = urlparse(url) + host = (parsed.hostname or "").lower() + + if not host: + return False + + return ( + host == "braintrustdata.com" + or host.endswith(".braintrustdata.com") + or host == "braintrust.dev" + or host.endswith(".braintrust.dev") + ) def _mock_http_handler_post(self, url, data=None, json=None, params=None, headers=None, timeout=None, stream=False, files=None, content=None, logging_obj=None):