diff --git a/litellm/proxy/management_endpoints/key_management_endpoints.py b/litellm/proxy/management_endpoints/key_management_endpoints.py index 1c0c212b60b..6572e3406fe 100644 --- a/litellm/proxy/management_endpoints/key_management_endpoints.py +++ b/litellm/proxy/management_endpoints/key_management_endpoints.py @@ -54,6 +54,7 @@ from litellm.proxy.common_utils.timezone_utils import get_budget_reset_time from litellm.proxy.hooks.key_management_event_hooks import KeyManagementEventHooks from litellm.proxy.management_endpoints.common_utils import ( + _is_user_org_admin_for_team, _is_user_team_admin, _set_object_metadata_field, ) @@ -1836,6 +1837,18 @@ async def _validate_update_key_data( user_api_key_cache=user_api_key_cache, ) + # Admin-only: only proxy admins, team admins, or org admins can modify max_budget + if data.max_budget is not None and data.max_budget != existing_key_row.max_budget: + if prisma_client is not None: + hashed_key = existing_key_row.token + await _check_key_admin_access( + user_api_key_dict=user_api_key_dict, + hashed_token=hashed_key, + prisma_client=prisma_client, + user_api_key_cache=user_api_key_cache, + route="/key/update (max_budget)", + ) + # Check team limits if key has a team_id (from request or existing key) team_obj: Optional[LiteLLM_TeamTableCachedObj] = None _team_id_to_check = data.team_id or getattr(existing_key_row, "team_id", None) @@ -4733,6 +4746,64 @@ def _get_condition_to_filter_out_ui_session_tokens() -> Dict[str, Any]: } +async def _check_key_admin_access( + user_api_key_dict: UserAPIKeyAuth, + hashed_token: str, + prisma_client: Any, + user_api_key_cache: DualCache, + route: str, +) -> None: + """ + Check that the caller has admin privileges for the target key. + + Allowed callers: + - Proxy admin + - Team admin for the key's team + - Org admin for the key's team's organization + + Raises HTTPException(403) if the caller is not authorized. + """ + + if user_api_key_dict.user_role == LitellmUserRoles.PROXY_ADMIN.value: + return + + # Look up the target key to find its team + target_key_row = await prisma_client.db.litellm_verificationtoken.find_unique( + where={"token": hashed_token} + ) + if target_key_row is None: + raise HTTPException( + status_code=404, + detail={"error": f"Key not found: {hashed_token}"}, + ) + + # If the key belongs to a team, check team admin / org admin + if target_key_row.team_id: + team_obj = await get_team_object( + team_id=target_key_row.team_id, + prisma_client=prisma_client, + user_api_key_cache=user_api_key_cache, + check_db_only=True, + ) + if team_obj is not None: + if _is_user_team_admin( + user_api_key_dict=user_api_key_dict, team_obj=team_obj + ): + return + if await _is_user_org_admin_for_team( + user_api_key_dict=user_api_key_dict, team_obj=team_obj + ): + return + + raise HTTPException( + status_code=403, + detail={ + "error": f"Only proxy admins, team admins, or org admins can call {route}. " + f"user_role={user_api_key_dict.user_role}, user_id={user_api_key_dict.user_id}" + }, + ) + + @router.post( "/key/block", tags=["key management"], dependencies=[Depends(user_api_key_auth)] ) @@ -4762,7 +4833,7 @@ async def block_key( }' ``` - Note: This is an admin-only endpoint. Only proxy admins can block keys. + Note: This is an admin-only endpoint. Only proxy admins, team admins, or org admins can block keys. """ from litellm.proxy.proxy_server import ( create_audit_log_for_update, @@ -4788,6 +4859,15 @@ async def block_key( else: hashed_token = data.key + # Admin-only: only proxy admins, team admins, or org admins can block keys + await _check_key_admin_access( + user_api_key_dict=user_api_key_dict, + hashed_token=hashed_token, + prisma_client=prisma_client, + user_api_key_cache=user_api_key_cache, + route="/key/block", + ) + if litellm.store_audit_logs is True: # make an audit log for key update record = await prisma_client.db.litellm_verificationtoken.find_unique( @@ -4876,7 +4956,7 @@ async def unblock_key( }' ``` - Note: This is an admin-only endpoint. Only proxy admins can unblock keys. + Note: This is an admin-only endpoint. Only proxy admins, team admins, or org admins can unblock keys. """ from litellm.proxy.proxy_server import ( create_audit_log_for_update, @@ -4902,6 +4982,15 @@ async def unblock_key( else: hashed_token = data.key + # Admin-only: only proxy admins, team admins, or org admins can unblock keys + await _check_key_admin_access( + user_api_key_dict=user_api_key_dict, + hashed_token=hashed_token, + prisma_client=prisma_client, + user_api_key_cache=user_api_key_cache, + route="/key/unblock", + ) + if litellm.store_audit_logs is True: # make an audit log for key update record = await prisma_client.db.litellm_verificationtoken.find_unique( diff --git a/tests/test_litellm/proxy/management_endpoints/test_key_management_endpoints.py b/tests/test_litellm/proxy/management_endpoints/test_key_management_endpoints.py index cfc16808afb..4bb0cd72ed2 100644 --- a/tests/test_litellm/proxy/management_endpoints/test_key_management_endpoints.py +++ b/tests/test_litellm/proxy/management_endpoints/test_key_management_endpoints.py @@ -7185,3 +7185,353 @@ def test_update_key_request_has_organization_id(): # Also verify it defaults to None data_no_org = UpdateKeyRequest(key="sk-test-key") assert data_no_org.organization_id is None + + +# ============================================================================ +# Tests for admin-only access on /key/block, /key/unblock, /key/update max_budget +# ============================================================================ + + +def _setup_block_unblock_mocks(monkeypatch, mock_key_team_id=None): + """Helper to set up common mocks for block/unblock tests.""" + mock_prisma_client = AsyncMock() + mock_user_api_key_cache = MagicMock() + mock_proxy_logging_obj = MagicMock() + + test_hashed_token = ( + "a1b2c3d4e5f6789012345678901234567890123456789012345678901234abcd" + ) + + mock_key_record = MagicMock() + mock_key_record.token = test_hashed_token + mock_key_record.blocked = False + mock_key_record.team_id = mock_key_team_id + mock_key_record.model_dump_json.return_value = ( + f'{{"token": "{test_hashed_token}", "blocked": false}}' + ) + + mock_prisma_client.db.litellm_verificationtoken.find_unique = AsyncMock( + return_value=mock_key_record + ) + mock_prisma_client.db.litellm_verificationtoken.update = AsyncMock( + return_value=mock_key_record + ) + + mock_key_object = MagicMock() + mock_key_object.blocked = True + + def mock_hash_token(token): + if token.startswith("sk-"): + return test_hashed_token + return token + + monkeypatch.setattr("litellm.proxy.proxy_server.prisma_client", mock_prisma_client) + monkeypatch.setattr( + "litellm.proxy.proxy_server.user_api_key_cache", mock_user_api_key_cache + ) + monkeypatch.setattr( + "litellm.proxy.proxy_server.proxy_logging_obj", mock_proxy_logging_obj + ) + monkeypatch.setattr("litellm.proxy.proxy_server.hash_token", mock_hash_token) + monkeypatch.setattr("litellm.store_audit_logs", False) + + async def mock_get_key_object(**kwargs): + return mock_key_object + + async def mock_cache_key_object(**kwargs): + pass + + monkeypatch.setattr( + "litellm.proxy.management_endpoints.key_management_endpoints.get_key_object", + mock_get_key_object, + ) + monkeypatch.setattr( + "litellm.proxy.management_endpoints.key_management_endpoints._cache_key_object", + mock_cache_key_object, + ) + + return mock_prisma_client, test_hashed_token + + +@pytest.mark.asyncio +async def test_block_key_rejected_for_internal_user(monkeypatch): + """Internal users should not be able to block keys.""" + from litellm.proxy._types import BlockKeyRequest + from litellm.proxy.management_endpoints.key_management_endpoints import block_key + + _setup_block_unblock_mocks(monkeypatch) + + mock_request = MagicMock() + user_api_key_dict = UserAPIKeyAuth( + user_role=LitellmUserRoles.INTERNAL_USER, + api_key="sk-internal", + user_id="internal_user", + ) + + with pytest.raises(HTTPException) as exc: + await block_key( + data=BlockKeyRequest(key="sk-test123456789"), + http_request=mock_request, + user_api_key_dict=user_api_key_dict, + litellm_changed_by=None, + ) + + assert exc.value.status_code == 403 + assert "Only proxy admins, team admins, or org admins" in str(exc.value.detail) + + +@pytest.mark.asyncio +async def test_unblock_key_rejected_for_internal_user(monkeypatch): + """Internal users should not be able to unblock keys.""" + from litellm.proxy._types import BlockKeyRequest + from litellm.proxy.management_endpoints.key_management_endpoints import unblock_key + + _setup_block_unblock_mocks(monkeypatch) + + mock_request = MagicMock() + user_api_key_dict = UserAPIKeyAuth( + user_role=LitellmUserRoles.INTERNAL_USER, + api_key="sk-internal", + user_id="internal_user", + ) + + with pytest.raises(HTTPException) as exc: + await unblock_key( + data=BlockKeyRequest(key="sk-test123456789"), + http_request=mock_request, + user_api_key_dict=user_api_key_dict, + litellm_changed_by=None, + ) + + assert exc.value.status_code == 403 + assert "Only proxy admins, team admins, or org admins" in str(exc.value.detail) + + +@pytest.mark.asyncio +async def test_block_key_allowed_for_proxy_admin(monkeypatch): + """Proxy admins should be able to block keys.""" + from litellm.proxy._types import BlockKeyRequest + from litellm.proxy.management_endpoints.key_management_endpoints import block_key + + _setup_block_unblock_mocks(monkeypatch) + + mock_request = MagicMock() + user_api_key_dict = UserAPIKeyAuth( + user_role=LitellmUserRoles.PROXY_ADMIN, + api_key="sk-admin", + user_id="admin_user", + ) + + result = await block_key( + data=BlockKeyRequest(key="sk-test123456789"), + http_request=mock_request, + user_api_key_dict=user_api_key_dict, + litellm_changed_by=None, + ) + assert result is not None + + +@pytest.mark.asyncio +async def test_block_key_allowed_for_team_admin(monkeypatch): + """Team admins should be able to block keys belonging to their team.""" + from litellm.proxy._types import BlockKeyRequest + from litellm.proxy.management_endpoints.key_management_endpoints import block_key + + team_id = "team-123" + _setup_block_unblock_mocks(monkeypatch, mock_key_team_id=team_id) + + # Mock get_team_object to return a team where the user is admin + team_obj = LiteLLM_TeamTableCachedObj( + team_id=team_id, + members_with_roles=[ + Member(user_id="team_admin_user", role="admin"), + ], + ) + + async def mock_get_team_object(**kwargs): + return team_obj + + monkeypatch.setattr( + "litellm.proxy.management_endpoints.key_management_endpoints.get_team_object", + mock_get_team_object, + ) + + mock_request = MagicMock() + user_api_key_dict = UserAPIKeyAuth( + user_role=LitellmUserRoles.INTERNAL_USER, + api_key="sk-teamadmin", + user_id="team_admin_user", + ) + + result = await block_key( + data=BlockKeyRequest(key="sk-test123456789"), + http_request=mock_request, + user_api_key_dict=user_api_key_dict, + litellm_changed_by=None, + ) + assert result is not None + + +@pytest.mark.asyncio +async def test_update_key_max_budget_rejected_for_internal_user(monkeypatch): + """Internal users should not be able to modify max_budget on keys.""" + from litellm.proxy.management_endpoints.key_management_endpoints import ( + update_key_fn, + ) + + mock_prisma_client = AsyncMock() + mock_user_api_key_cache = AsyncMock() + mock_proxy_logging_obj = MagicMock() + + test_hashed_token = ( + "a1b2c3d4e5f6789012345678901234567890123456789012345678901234abcd" + ) + + # Mock existing key row + mock_existing_key = MagicMock() + mock_existing_key.token = test_hashed_token + mock_existing_key.user_id = "internal_user" + mock_existing_key.team_id = None + mock_existing_key.project_id = None + mock_existing_key.max_budget = 10.0 + mock_existing_key.models = [] + mock_existing_key.model_dump.return_value = { + "token": test_hashed_token, + "user_id": "internal_user", + "team_id": None, + "max_budget": 10.0, + } + + mock_prisma_client.get_data = AsyncMock(return_value=mock_existing_key) + mock_prisma_client.db.litellm_verificationtoken.find_unique = AsyncMock( + return_value=mock_existing_key + ) + + monkeypatch.setattr("litellm.proxy.proxy_server.prisma_client", mock_prisma_client) + monkeypatch.setattr( + "litellm.proxy.proxy_server.user_api_key_cache", mock_user_api_key_cache + ) + monkeypatch.setattr( + "litellm.proxy.proxy_server.proxy_logging_obj", mock_proxy_logging_obj + ) + monkeypatch.setattr("litellm.proxy.proxy_server.llm_router", None) + monkeypatch.setattr("litellm.proxy.proxy_server.premium_user", True) + + mock_request = MagicMock() + mock_request.query_params = {} + user_api_key_dict = UserAPIKeyAuth( + user_role=LitellmUserRoles.INTERNAL_USER, + api_key="sk-internal", + user_id="internal_user", + ) + + with pytest.raises(ProxyException) as exc: + await update_key_fn( + request=mock_request, + data=UpdateKeyRequest(key=test_hashed_token, max_budget=999999), + user_api_key_dict=user_api_key_dict, + litellm_changed_by=None, + ) + + assert str(exc.value.code) == "403" + assert "Only proxy admins, team admins, or org admins" in str(exc.value.message) + + +@pytest.mark.asyncio +async def test_update_key_non_budget_fields_allowed_for_internal_user(monkeypatch): + """Internal users should still be able to update non-budget fields on their own keys.""" + from litellm.proxy.management_endpoints.key_management_endpoints import ( + update_key_fn, + ) + + mock_prisma_client = AsyncMock() + mock_user_api_key_cache = AsyncMock() + mock_proxy_logging_obj = MagicMock() + + test_hashed_token = ( + "a1b2c3d4e5f6789012345678901234567890123456789012345678901234abcd" + ) + + # Mock existing key row + mock_existing_key = MagicMock() + mock_existing_key.token = test_hashed_token + mock_existing_key.user_id = "internal_user" + mock_existing_key.team_id = None + mock_existing_key.project_id = None + mock_existing_key.max_budget = 10.0 + mock_existing_key.key_alias = None + mock_existing_key.models = [] + mock_existing_key.model_dump.return_value = { + "token": test_hashed_token, + "user_id": "internal_user", + "team_id": None, + "max_budget": 10.0, + } + + mock_updated_key = MagicMock() + mock_updated_key.token = test_hashed_token + mock_updated_key.key_alias = "my-alias" + + mock_prisma_client.get_data = AsyncMock(return_value=mock_existing_key) + mock_prisma_client.update_data = AsyncMock(return_value=mock_updated_key) + mock_prisma_client.db.litellm_verificationtoken.find_unique = AsyncMock( + return_value=mock_existing_key + ) + + monkeypatch.setattr("litellm.proxy.proxy_server.prisma_client", mock_prisma_client) + monkeypatch.setattr( + "litellm.proxy.proxy_server.user_api_key_cache", mock_user_api_key_cache + ) + monkeypatch.setattr( + "litellm.proxy.proxy_server.proxy_logging_obj", mock_proxy_logging_obj + ) + monkeypatch.setattr("litellm.proxy.proxy_server.llm_router", None) + monkeypatch.setattr("litellm.proxy.proxy_server.premium_user", True) + monkeypatch.setattr("litellm.store_audit_logs", False) + + def mock_hash_token(token): + return test_hashed_token + + monkeypatch.setattr("litellm.proxy.proxy_server.hash_token", mock_hash_token) + + async def mock_cache_key_object(**kwargs): + pass + + async def mock_delete_cache_key_object(**kwargs): + pass + + monkeypatch.setattr( + "litellm.proxy.management_endpoints.key_management_endpoints._cache_key_object", + mock_cache_key_object, + ) + monkeypatch.setattr( + "litellm.proxy.management_endpoints.key_management_endpoints._delete_cache_key_object", + mock_delete_cache_key_object, + ) + + # Mock _enforce_unique_key_alias to avoid DB call + async def mock_enforce_unique_key_alias(**kwargs): + pass + + monkeypatch.setattr( + "litellm.proxy.management_endpoints.key_management_endpoints._enforce_unique_key_alias", + mock_enforce_unique_key_alias, + ) + + mock_request = MagicMock() + mock_request.query_params = {} + user_api_key_dict = UserAPIKeyAuth( + user_role=LitellmUserRoles.INTERNAL_USER, + api_key="sk-internal", + user_id="internal_user", + ) + + # Updating key_alias (non-budget field) should succeed + result = await update_key_fn( + request=mock_request, + data=UpdateKeyRequest(key=test_hashed_token, key_alias="my-alias"), + user_api_key_dict=user_api_key_dict, + litellm_changed_by=None, + ) + + assert result is not None