From 02ad6c702914766a1347ee825304c1430e456944 Mon Sep 17 00:00:00 2001 From: Harshit Jain Date: Sat, 24 Jan 2026 12:55:57 +0530 Subject: [PATCH] fix: token calculations and refactor --- litellm/cost_calculator.py | 56 +++-- .../litellm_core_utils/llm_cost_calc/utils.py | 16 +- tests/test_litellm/test_cost_calculator.py | 233 +++++++++++------- 3 files changed, 202 insertions(+), 103 deletions(-) diff --git a/litellm/cost_calculator.py b/litellm/cost_calculator.py index f18e8d62aa9..20a78280595 100644 --- a/litellm/cost_calculator.py +++ b/litellm/cost_calculator.py @@ -23,7 +23,11 @@ from litellm.litellm_core_utils.llm_cost_calc.utils import ( CostCalculatorUtils, _generic_cost_per_character, + _get_service_tier_cost_key, + _parse_prompt_tokens_details, + calculate_cost_component, generic_cost_per_token, + get_billable_input_tokens, select_cost_metric_for_model, ) from litellm.llms.anthropic.cost_calculation import ( @@ -427,12 +431,18 @@ def cost_per_token( # noqa: PLR0915 model=model, custom_llm_provider=custom_llm_provider ) - if model_info["input_cost_per_token"] > 0: - ## COST PER TOKEN ## - prompt_tokens_cost_usd_dollar = ( - model_info["input_cost_per_token"] * prompt_tokens + if ( + model_info.get("input_cost_per_token", 0) > 0 + or model_info.get("output_cost_per_token", 0) > 0 + ): + return generic_cost_per_token( + model=model, + usage=usage_block, + custom_llm_provider=custom_llm_provider, + service_tier=service_tier, ) - elif ( + + if ( model_info.get("input_cost_per_second", None) is not None and response_time_ms is not None ): @@ -447,11 +457,7 @@ def cost_per_token( # noqa: PLR0915 model_info["input_cost_per_second"] * response_time_ms / 1000 # type: ignore ) - if model_info["output_cost_per_token"] > 0: - completion_tokens_cost_usd_dollar = ( - model_info["output_cost_per_token"] * completion_tokens - ) - elif ( + if ( model_info.get("output_cost_per_second", None) is not None and response_time_ms is not None ): @@ -951,7 +957,10 @@ def completion_cost( # noqa: PLR0915 router_model_id=router_model_id, ) - potential_model_names = [selected_model, _get_response_model(completion_response)] + potential_model_names = [ + selected_model, + _get_response_model(completion_response), + ] if model is not None: potential_model_names.append(model) @@ -1706,10 +1715,16 @@ def default_image_cost_calculator( ) # Priority 1: Use per-image pricing if available (for gpt-image-1 and similar models) - if "input_cost_per_image" in cost_info and cost_info["input_cost_per_image"] is not None: + if ( + "input_cost_per_image" in cost_info + and cost_info["input_cost_per_image"] is not None + ): return cost_info["input_cost_per_image"] * n # Priority 2: Fall back to per-pixel pricing for backward compatibility - elif "input_cost_per_pixel" in cost_info and cost_info["input_cost_per_pixel"] is not None: + elif ( + "input_cost_per_pixel" in cost_info + and cost_info["input_cost_per_pixel"] is not None + ): return cost_info["input_cost_per_pixel"] * height * width * n else: raise Exception( @@ -1829,9 +1844,22 @@ def batch_cost_calculator( if input_cost_per_token_batches: total_prompt_cost = usage.prompt_tokens * input_cost_per_token_batches elif input_cost_per_token: + # Subtract cached tokens from prompt_tokens before calculating cost + # Fixes issue where cached tokens are being charged again total_prompt_cost = ( - usage.prompt_tokens * (input_cost_per_token) / 2 + get_billable_input_tokens(usage) * (input_cost_per_token) / 2 ) # batch cost is usually half of the regular token cost + + # Add cache 
read cost if applicable + details = _parse_prompt_tokens_details(usage) + cache_read_tokens = details["cache_hit_tokens"] + cache_read_cost_key = _get_service_tier_cost_key( + "cache_read_input_token_cost", None + ) + total_prompt_cost += ( + calculate_cost_component(model_info, cache_read_cost_key, cache_read_tokens) + / 2 + ) if output_cost_per_token_batches: total_completion_cost = usage.completion_tokens * output_cost_per_token_batches elif output_cost_per_token: diff --git a/litellm/litellm_core_utils/llm_cost_calc/utils.py b/litellm/litellm_core_utils/llm_cost_calc/utils.py index 65e77f014a3..eee69924618 100644 --- a/litellm/litellm_core_utils/llm_cost_calc/utils.py +++ b/litellm/litellm_core_utils/llm_cost_calc/utils.py @@ -23,6 +23,15 @@ def _is_above_128k(tokens: float) -> bool: return False +def get_billable_input_tokens(usage: Usage) -> int: + """ + Returns the number of billable input tokens. + Subtracts cached tokens from prompt tokens if applicable. + """ + details = _parse_prompt_tokens_details(usage) + return usage.prompt_tokens - details["cache_hit_tokens"] + + def select_cost_metric_for_model( model_info: ModelInfo, ) -> Literal["cost_per_character", "cost_per_token"]: @@ -190,7 +199,6 @@ def _get_token_base_cost( 1000 if "k" in threshold_str else 1 ) if usage.prompt_tokens > threshold: - prompt_base_cost = cast( float, _get_cost_per_unit(model_info, key, prompt_base_cost) ) @@ -619,7 +627,11 @@ def generic_cost_per_token( # noqa: PLR0915 # Calculate text tokens as remainder when we have a breakdown # This handles cases like OpenAI's reasoning models where text_tokens isn't provided text_tokens = max( - 0, usage.completion_tokens - reasoning_tokens - audio_tokens - image_tokens + 0, + usage.completion_tokens + - reasoning_tokens + - audio_tokens + - image_tokens, ) else: # No breakdown at all, all tokens are text tokens diff --git a/tests/test_litellm/test_cost_calculator.py b/tests/test_litellm/test_cost_calculator.py index 4d6599fc1b5..61251ca1976 100644 --- a/tests/test_litellm/test_cost_calculator.py +++ b/tests/test_litellm/test_cost_calculator.py @@ -1,4 +1,3 @@ -import json import os import sys @@ -8,7 +7,6 @@ 0, os.path.abspath("../..") ) # Adds the parent directory to the system path -from unittest.mock import MagicMock, patch from pydantic import BaseModel @@ -70,8 +68,6 @@ class MockResponse(BaseModel): def test_cost_calculator_with_usage(monkeypatch): - from litellm import get_model_info - os.environ["LITELLM_LOCAL_MODEL_COST_MAP"] = "True" litellm.model_cost = litellm.get_model_cost_map(url="") @@ -79,7 +75,9 @@ def test_cost_calculator_with_usage(monkeypatch): prompt_tokens=120, completion_tokens=100, prompt_tokens_details=PromptTokensDetailsWrapper( - text_tokens=10, audio_tokens=90, image_tokens=20, + text_tokens=10, + audio_tokens=90, + image_tokens=20, ), ) mr = ModelResponse(usage=usage, model="gemini-2.0-flash-001") @@ -98,7 +96,9 @@ def test_cost_calculator_with_usage(monkeypatch): # Step 1: Test a model where input_cost_per_image_token is not set. # In this case the calculation should use input_cost_per_token as fallback. 
- assert model_info.get("input_cost_per_image_token") is None, "Test case expects that input_cost_per_image_token is not set" + assert ( + model_info.get("input_cost_per_image_token") is None + ), "Test case expects that input_cost_per_image_token is not set" expected_cost = ( usage.prompt_tokens_details.audio_tokens @@ -118,9 +118,7 @@ def test_cost_calculator_with_usage(monkeypatch): monkeypatch.setattr( litellm, "model_cost", - { - "gemini-2.0-flash-001": temp_model_info_object - }, + {"gemini-2.0-flash-001": temp_model_info_object}, ) result = response_cost_calculator( @@ -136,8 +134,10 @@ def test_cost_calculator_with_usage(monkeypatch): expected_cost = ( usage.prompt_tokens_details.audio_tokens * temp_model_info_object["input_cost_per_audio_token"] - + usage.prompt_tokens_details.text_tokens * temp_model_info_object["input_cost_per_token"] - + usage.prompt_tokens_details.image_tokens * temp_model_info_object["input_cost_per_image_token"] + + usage.prompt_tokens_details.text_tokens + * temp_model_info_object["input_cost_per_token"] + + usage.prompt_tokens_details.image_tokens + * temp_model_info_object["input_cost_per_image_token"] + usage.completion_tokens * temp_model_info_object["output_cost_per_token"] ) @@ -329,8 +329,6 @@ def test_custom_pricing_with_router_model_id(): def test_azure_realtime_cost_calculator(): - from litellm import get_model_info - os.environ["LITELLM_LOCAL_MODEL_COST_MAP"] = "True" litellm.model_cost = litellm.get_model_cost_map(url="") @@ -387,9 +385,7 @@ def test_cost_calculator_with_cache_creation(): from litellm import completion_cost from litellm.types.utils import ( Choices, - CompletionTokensDetailsWrapper, Message, - PromptTokensDetailsWrapper, Usage, ) @@ -445,7 +441,7 @@ def test_cost_calculator_with_cache_creation(): def test_bedrock_cost_calculator_comparison_with_without_cache(): """Test that Bedrock caching reduces costs compared to non-cached requests""" from litellm import completion_cost - from litellm.types.utils import Choices, Message, PromptTokensDetailsWrapper, Usage + from litellm.types.utils import Choices, Message, Usage # Response WITHOUT caching response_no_cache = ModelResponse( @@ -696,7 +692,7 @@ def test_log_context_cost_calculation(): f"DEBUG: Tiered input cost per token (>200k): ${input_cost_above_200k:.2e}" ) else: - print(f"DEBUG: No tiered input pricing available, using base pricing") + print("DEBUG: No tiered input pricing available, using base pricing") input_cost_above_200k = input_cost_per_token if output_cost_above_200k is not None: @@ -704,7 +700,7 @@ def test_log_context_cost_calculation(): f"DEBUG: Tiered output cost per token (>200k): ${output_cost_above_200k:.2e}" ) else: - print(f"DEBUG: No tiered output pricing available, using base pricing") + print("DEBUG: No tiered output pricing available, using base pricing") output_cost_above_200k = output_cost_per_token if cache_creation_above_200k is not None: @@ -712,7 +708,7 @@ def test_log_context_cost_calculation(): f"DEBUG: Tiered cache creation cost per token (>200k): ${cache_creation_above_200k:.2e}" ) else: - print(f"DEBUG: No tiered cache creation pricing available, using base pricing") + print("DEBUG: No tiered cache creation pricing available, using base pricing") cache_creation_above_200k = cache_creation_cost_per_token # Since we're above 200k tokens, we should use tiered pricing if available @@ -848,7 +844,7 @@ def test_cost_discount_vertex_ai(): expected_cost = cost_without_discount * 0.95 assert cost_with_discount == pytest.approx(expected_cost, rel=1e-9) 
- print(f"✓ Cost discount test passed:") + print("✓ Cost discount test passed:") print(f" - Original cost: ${cost_without_discount:.6f}") print(f" - Discounted cost (5% off): ${cost_with_discount:.6f}") print(f" - Savings: ${cost_without_discount - cost_with_discount:.6f}") @@ -898,7 +894,7 @@ def test_cost_discount_not_applied_to_other_providers(): # Costs should be the same (no discount applied to OpenAI) assert cost_with_selective_discount == cost_without_discount - print(f"✓ Selective discount test passed:") + print("✓ Selective discount test passed:") print(f" - OpenAI cost (no discount configured): ${cost_without_discount:.6f}") print(f" - Cost remains unchanged: ${cost_with_selective_discount:.6f}") @@ -948,7 +944,7 @@ def test_cost_margin_percentage(): expected_cost = cost_without_margin * 1.10 assert cost_with_margin == pytest.approx(expected_cost, rel=1e-9) - print(f"✓ Cost margin percentage test passed:") + print("✓ Cost margin percentage test passed:") print(f" - Original cost: ${cost_without_margin:.6f}") print(f" - Cost with margin (10%): ${cost_with_margin:.6f}") print(f" - Margin added: ${cost_with_margin - cost_without_margin:.6f}") @@ -999,7 +995,7 @@ def test_cost_margin_fixed_amount(): expected_cost = cost_without_margin + 0.001 assert cost_with_margin == pytest.approx(expected_cost, rel=1e-9) - print(f"✓ Cost margin fixed amount test passed:") + print("✓ Cost margin fixed amount test passed:") print(f" - Original cost: ${cost_without_margin:.6f}") print(f" - Cost with margin ($0.001): ${cost_with_margin:.6f}") print(f" - Margin added: ${cost_with_margin - cost_without_margin:.6f}") @@ -1034,7 +1030,9 @@ def test_cost_margin_combined(): ) # Set 8% margin + $0.0005 fixed for openai - litellm.cost_margin_config = {"openai": {"percentage": 0.08, "fixed_amount": 0.0005}} + litellm.cost_margin_config = { + "openai": {"percentage": 0.08, "fixed_amount": 0.0005} + } # Calculate cost with margin cost_with_margin = completion_cost( @@ -1050,7 +1048,7 @@ def test_cost_margin_combined(): expected_cost = cost_without_margin * 1.08 + 0.0005 assert cost_with_margin == pytest.approx(expected_cost, rel=1e-9) - print(f"✓ Cost margin combined test passed:") + print("✓ Cost margin combined test passed:") print(f" - Original cost: ${cost_without_margin:.6f}") print(f" - Cost with margin (8% + $0.0005): ${cost_with_margin:.6f}") print(f" - Margin added: ${cost_with_margin - cost_without_margin:.6f}") @@ -1101,7 +1099,7 @@ def test_cost_margin_global(): expected_cost = cost_without_margin * 1.05 assert cost_with_global_margin == pytest.approx(expected_cost, rel=1e-9) - print(f"✓ Cost margin global test passed:") + print("✓ Cost margin global test passed:") print(f" - Original cost: ${cost_without_margin:.6f}") print(f" - Cost with global margin (5%): ${cost_with_global_margin:.6f}") print(f" - Margin added: ${cost_with_global_margin - cost_without_margin:.6f}") @@ -1152,9 +1150,11 @@ def test_cost_margin_provider_overrides_global(): expected_cost = cost_without_margin * 1.10 # 10% from provider, not 5% from global assert cost_with_provider_margin == pytest.approx(expected_cost, rel=1e-9) - print(f"✓ Cost margin provider override test passed:") + print("✓ Cost margin provider override test passed:") print(f" - Original cost: ${cost_without_margin:.6f}") - print(f" - Cost with provider margin (10%, overrides 5% global): ${cost_with_provider_margin:.6f}") + print( + f" - Cost with provider margin (10%, overrides 5% global): ${cost_with_provider_margin:.6f}" + ) print(f" - Margin added: 
${cost_with_provider_margin - cost_without_margin:.6f}") @@ -1208,7 +1208,7 @@ def test_cost_margin_with_discount(): expected_cost = base_cost * 0.95 * 1.10 assert cost_with_both == pytest.approx(expected_cost, rel=1e-9) - print(f"✓ Cost margin with discount test passed:") + print("✓ Cost margin with discount test passed:") print(f" - Base cost: ${base_cost:.6f}") print(f" - Cost with 5% discount + 10% margin: ${cost_with_both:.6f}") print(f" - Expected: ${expected_cost:.6f}") @@ -1277,14 +1277,10 @@ def test_completion_cost_extracts_service_tier_from_response(): # Test with gpt-5-nano which has flex pricing model = "gpt-5-nano" - + # Create usage object - usage = Usage( - prompt_tokens=1000, - completion_tokens=500, - total_tokens=1500 - ) - + usage = Usage(prompt_tokens=1000, completion_tokens=500, total_tokens=1500) + # Create ModelResponse with service_tier in the response object response_with_service_tier = ModelResponse( usage=usage, @@ -1292,34 +1288,36 @@ def test_completion_cost_extracts_service_tier_from_response(): ) # Set service_tier as an attribute on the response setattr(response_with_service_tier, "service_tier", "flex") - + # Test that flex pricing is used when service_tier is in response flex_cost = completion_cost( completion_response=response_with_service_tier, model=model, custom_llm_provider="openai", ) - + # Create ModelResponse without service_tier (should use standard pricing) response_without_service_tier = ModelResponse( usage=usage, model=model, ) - + # Test that standard pricing is used when service_tier is not in response standard_cost = completion_cost( completion_response=response_without_service_tier, model=model, custom_llm_provider="openai", ) - + # Flex should be approximately 50% of standard assert flex_cost > 0, "Flex cost should be greater than 0" assert standard_cost > 0, "Standard cost should be greater than 0" assert flex_cost < standard_cost, "Flex cost should be less than standard cost" - + flex_ratio = flex_cost / standard_cost - assert 0.45 <= flex_ratio <= 0.55, f"Flex pricing should be ~50% of standard, got {flex_ratio:.2f}" + assert ( + 0.45 <= flex_ratio <= 0.55 + ), f"Flex pricing should be ~50% of standard, got {flex_ratio:.2f}" def test_completion_cost_extracts_service_tier_from_usage(): @@ -1331,56 +1329,54 @@ def test_completion_cost_extracts_service_tier_from_usage(): # Test with gpt-5-nano which has flex pricing model = "gpt-5-nano" - + # Create usage object with service_tier usage_with_service_tier = Usage( - prompt_tokens=1000, - completion_tokens=500, - total_tokens=1500 + prompt_tokens=1000, completion_tokens=500, total_tokens=1500 ) # Set service_tier as an attribute on the usage object setattr(usage_with_service_tier, "service_tier", "flex") - + # Create ModelResponse with usage containing service_tier response = ModelResponse( usage=usage_with_service_tier, model=model, ) - + # Test that flex pricing is used when service_tier is in usage flex_cost = completion_cost( completion_response=response, model=model, custom_llm_provider="openai", ) - + # Create usage object without service_tier usage_without_service_tier = Usage( - prompt_tokens=1000, - completion_tokens=500, - total_tokens=1500 + prompt_tokens=1000, completion_tokens=500, total_tokens=1500 ) - + # Create ModelResponse with usage without service_tier response_standard = ModelResponse( usage=usage_without_service_tier, model=model, ) - + # Test that standard pricing is used when service_tier is not in usage standard_cost = completion_cost( 
completion_response=response_standard, model=model, custom_llm_provider="openai", ) - + # Flex should be approximately 50% of standard assert flex_cost > 0, "Flex cost should be greater than 0" assert standard_cost > 0, "Standard cost should be greater than 0" assert flex_cost < standard_cost, "Flex cost should be less than standard cost" - + flex_ratio = flex_cost / standard_cost - assert 0.45 <= flex_ratio <= 0.55, f"Flex pricing should be ~50% of standard, got {flex_ratio:.2f}" + assert ( + 0.45 <= flex_ratio <= 0.55 + ), f"Flex pricing should be ~50% of standard, got {flex_ratio:.2f}" def test_completion_cost_service_tier_priority(): @@ -1392,22 +1388,18 @@ def test_completion_cost_service_tier_priority(): # Test with gpt-5-nano which has flex pricing model = "gpt-5-nano" - + # Create usage object with service_tier="flex" - usage = Usage( - prompt_tokens=1000, - completion_tokens=500, - total_tokens=1500 - ) + usage = Usage(prompt_tokens=1000, completion_tokens=500, total_tokens=1500) setattr(usage, "service_tier", "flex") - + # Create response with service_tier="priority" response = ModelResponse( usage=usage, model=model, ) setattr(response, "service_tier", "priority") - + # Test that optional_params takes priority over response and usage cost_from_params = completion_cost( completion_response=response, @@ -1415,14 +1407,14 @@ def test_completion_cost_service_tier_priority(): custom_llm_provider="openai", optional_params={"service_tier": "flex"}, ) - + # Test that response takes priority over usage when optional_params is not provided - cost_from_response = completion_cost( + completion_cost( completion_response=response, model=model, custom_llm_provider="openai", ) - + # Test that usage is used when neither optional_params nor response have service_tier # Create a new response without service_tier attribute response_no_tier = ModelResponse( @@ -1430,25 +1422,27 @@ def test_completion_cost_service_tier_priority(): model=model, ) # Don't set service_tier on response, so it will fall back to usage - + cost_from_usage = completion_cost( completion_response=response_no_tier, model=model, custom_llm_provider="openai", ) - + # All should use flex pricing (from different sources) assert cost_from_params > 0, "Cost from params should be greater than 0" assert cost_from_usage > 0, "Cost from usage should be greater than 0" - + # Costs should be similar (all using flex) - assert abs(cost_from_params - cost_from_usage) < 1e-6, "Costs from params and usage should be similar (both flex)" + assert ( + abs(cost_from_params - cost_from_usage) < 1e-6 + ), "Costs from params and usage should be similar (both flex)" def test_gemini_cache_tokens_details_no_negative_values(): """ Test for Issue #18750: Negative text_tokens with Gemini caching - + When using Gemini with explicit caching, the response includes cacheTokensDetails which breaks down cached tokens by modality. This test ensures that: 1. 
text_tokens is never negative @@ -1469,41 +1463,47 @@ def test_gemini_cache_tokens_details_no_negative_values(): # Total tokens by modality (includes cached + non-cached) "promptTokensDetails": [ {"modality": "TEXT", "tokenCount": 9402}, - {"modality": "IMAGE", "tokenCount": 258} + {"modality": "IMAGE", "tokenCount": 258}, ], # Breakdown of cached tokens by modality "cacheTokensDetails": [ {"modality": "TEXT", "tokenCount": 9393}, - {"modality": "IMAGE", "tokenCount": 258} - ] + {"modality": "IMAGE", "tokenCount": 258}, + ], } } usage = VertexGeminiConfig._calculate_usage(completion_response) # Text tokens should be non-cached text only: 9402 - 9393 = 9 - assert usage.prompt_tokens_details.text_tokens == 9, \ - f"Expected text_tokens=9, got {usage.prompt_tokens_details.text_tokens}" + assert ( + usage.prompt_tokens_details.text_tokens == 9 + ), f"Expected text_tokens=9, got {usage.prompt_tokens_details.text_tokens}" # Image tokens should be non-cached image only: 258 - 258 = 0 - assert usage.prompt_tokens_details.image_tokens == 0, \ - f"Expected image_tokens=0, got {usage.prompt_tokens_details.image_tokens}" + assert ( + usage.prompt_tokens_details.image_tokens == 0 + ), f"Expected image_tokens=0, got {usage.prompt_tokens_details.image_tokens}" # Total cached should match - assert usage.prompt_tokens_details.cached_tokens == 9651, \ - f"Expected cached_tokens=9651, got {usage.prompt_tokens_details.cached_tokens}" + assert ( + usage.prompt_tokens_details.cached_tokens == 9651 + ), f"Expected cached_tokens=9651, got {usage.prompt_tokens_details.cached_tokens}" # MOST IMPORTANT: text_tokens should NEVER be negative - assert usage.prompt_tokens_details.text_tokens >= 0, \ - f"BUG: text_tokens is negative ({usage.prompt_tokens_details.text_tokens})! This was the issue in #18750" + assert ( + usage.prompt_tokens_details.text_tokens >= 0 + ), f"BUG: text_tokens is negative ({usage.prompt_tokens_details.text_tokens})! This was the issue in #18750" - print("✅ Issue #18750 fix verified: text_tokens is correctly calculated and non-negative") + print( + "✅ Issue #18750 fix verified: text_tokens is correctly calculated and non-negative" + ) def test_gemini_without_cache_tokens_details(): """ Test Gemini response without cacheTokensDetails (implicit caching or no cache) - + When cacheTokensDetails is not present, we should use promptTokensDetails as-is without subtracting anything. """ @@ -1518,7 +1518,7 @@ def test_gemini_without_cache_tokens_details(): "totalTokenCount": 279, "promptTokensDetails": [ {"modality": "TEXT", "tokenCount": 6}, - {"modality": "IMAGE", "tokenCount": 258} + {"modality": "IMAGE", "tokenCount": 258}, ] # No cacheTokensDetails } @@ -1532,3 +1532,62 @@ def test_gemini_without_cache_tokens_details(): assert usage.prompt_tokens_details.text_tokens >= 0 print("✅ Gemini without cacheTokensDetails works correctly") + + +def test_generic_provider_cached_token_cost(): + """ + Test that the generic cost calculator correctly handles cached tokens + for providers like z.ai/deepseek that are not explicitly handled. 
+ """ + from litellm.cost_calculator import completion_cost + from litellm.types.utils import ModelResponse, PromptTokensDetailsWrapper, Usage + + # Setup model cost for a generic provider + # We use a name that will bypass complex provider mapping logic + model_name = "custom-cached-model" + litellm.model_cost[model_name] = { + "input_cost_per_token": 0.0000006, + "output_cost_per_token": 0.0000006, + "cache_read_input_token_cost": 0.0000001, + "litellm_provider": "openai", + } + + # Case 1: Standard nested cached tokens (prompt_tokens_details.cached_tokens) + usage = Usage( + prompt_tokens=10000, + completion_tokens=0, + prompt_tokens_details=PromptTokensDetailsWrapper(cached_tokens=9000), + ) + response = ModelResponse(usage=usage, model=model_name) + + cost = completion_cost( + completion_response=response, + model=model_name, + custom_llm_provider="openai", # Explicitly set provider to trigger generic path + ) + + # Expected: (1000 * 0.0000006) + (9000 * 0.0000001) = 0.0006 + 0.0009 = 0.0015 + expected_cost = 0.0015 + assert ( + abs(cost - expected_cost) < 1e-9 + ), f"Nested cache cost failed. Got {cost}, expected {expected_cost}" + + # Case 2: Top-level cached tokens (cache_read_input_tokens) + usage_top = Usage( + prompt_tokens=10000, + completion_tokens=0, + cache_read_input_tokens=9000, + ) + response_top = ModelResponse(usage=usage_top, model=model_name) + + cost_top = completion_cost( + completion_response=response_top, + model=model_name, + custom_llm_provider="openai", + ) + + assert ( + abs(cost_top - expected_cost) < 1e-9 + ), f"Top-level cache cost failed. Got {cost_top}, expected {expected_cost}" + + print("✅ Generic provider cached token cost verified")
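
For reviewers, the expected values in test_generic_provider_cached_token_cost follow directly from the billable-token arithmetic this patch introduces: non-cached prompt tokens are billed at input_cost_per_token, while cached tokens are billed at cache_read_input_token_cost. The sketch below is illustrative only (estimate_prompt_cost is a hypothetical helper, not part of the litellm API); it reuses the prices from the "custom-cached-model" fixture in the new test.

# Minimal standalone sketch of the cached-token pricing this patch implements.
# estimate_prompt_cost is a hypothetical helper written for illustration; the
# real calculation lives in generic_cost_per_token / get_billable_input_tokens.


def estimate_prompt_cost(
    prompt_tokens: int,
    cached_tokens: int,
    input_cost_per_token: float,
    cache_read_input_token_cost: float,
) -> float:
    """Bill non-cached tokens at the full input rate and cached tokens at the cache-read rate."""
    billable_tokens = prompt_tokens - cached_tokens  # mirrors get_billable_input_tokens()
    return (
        billable_tokens * input_cost_per_token
        + cached_tokens * cache_read_input_token_cost
    )


if __name__ == "__main__":
    # Same numbers as the new test: 10,000 prompt tokens, 9,000 of them served
    # from cache, priced like the "custom-cached-model" fixture.
    cost = estimate_prompt_cost(
        prompt_tokens=10_000,
        cached_tokens=9_000,
        input_cost_per_token=0.0000006,
        cache_read_input_token_cost=0.0000001,
    )
    # (1,000 * 6e-7) + (9,000 * 1e-7) = 0.0006 + 0.0009 = 0.0015
    assert abs(cost - 0.0015) < 1e-9
    print(f"estimated prompt cost: ${cost:.6f}")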