-
-
Notifications
You must be signed in to change notification settings - Fork 6.9k
[Fix] Privilege Escalation on /key/block, /key/unblock, and /key/update max_budget #23781
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -54,6 +54,7 @@ | |
| from litellm.proxy.common_utils.timezone_utils import get_budget_reset_time | ||
| from litellm.proxy.hooks.key_management_event_hooks import KeyManagementEventHooks | ||
| from litellm.proxy.management_endpoints.common_utils import ( | ||
| _is_user_org_admin_for_team, | ||
| _is_user_team_admin, | ||
| _set_object_metadata_field, | ||
| ) | ||
|
|
@@ -1836,6 +1837,18 @@ async def _validate_update_key_data( | |
| user_api_key_cache=user_api_key_cache, | ||
| ) | ||
|
|
||
| # Admin-only: only proxy admins, team admins, or org admins can modify max_budget | ||
| if data.max_budget is not None and data.max_budget != existing_key_row.max_budget: | ||
| if prisma_client is not None: | ||
| hashed_key = existing_key_row.token | ||
| await _check_key_admin_access( | ||
| user_api_key_dict=user_api_key_dict, | ||
| hashed_token=hashed_key, | ||
| prisma_client=prisma_client, | ||
| user_api_key_cache=user_api_key_cache, | ||
| route="/key/update (max_budget)", | ||
| ) | ||
|
|
||
| # Check team limits if key has a team_id (from request or existing key) | ||
| team_obj: Optional[LiteLLM_TeamTableCachedObj] = None | ||
| _team_id_to_check = data.team_id or getattr(existing_key_row, "team_id", None) | ||
|
|
@@ -4733,6 +4746,64 @@ def _get_condition_to_filter_out_ui_session_tokens() -> Dict[str, Any]: | |
| } | ||
|
|
||
|
|
||
| async def _check_key_admin_access( | ||
| user_api_key_dict: UserAPIKeyAuth, | ||
| hashed_token: str, | ||
| prisma_client: Any, | ||
| user_api_key_cache: DualCache, | ||
| route: str, | ||
| ) -> None: | ||
| """ | ||
| Check that the caller has admin privileges for the target key. | ||
|
|
||
| Allowed callers: | ||
| - Proxy admin | ||
| - Team admin for the key's team | ||
| - Org admin for the key's team's organization | ||
|
|
||
| Raises HTTPException(403) if the caller is not authorized. | ||
| """ | ||
|
|
||
| if user_api_key_dict.user_role == LitellmUserRoles.PROXY_ADMIN.value: | ||
| return | ||
|
|
||
| # Look up the target key to find its team | ||
| target_key_row = await prisma_client.db.litellm_verificationtoken.find_unique( | ||
| where={"token": hashed_token} | ||
| ) | ||
|
Comment on lines
+4770
to
+4773
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Redundant DB fetch in the When A simple improvement would be to add an optional |
||
| if target_key_row is None: | ||
| raise HTTPException( | ||
| status_code=404, | ||
| detail={"error": f"Key not found: {hashed_token}"}, | ||
| ) | ||
|
|
||
| # If the key belongs to a team, check team admin / org admin | ||
| if target_key_row.team_id: | ||
| team_obj = await get_team_object( | ||
| team_id=target_key_row.team_id, | ||
| prisma_client=prisma_client, | ||
| user_api_key_cache=user_api_key_cache, | ||
| check_db_only=True, | ||
| ) | ||
|
Comment on lines
+4782
to
+4787
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Passing Consider dropping |
||
| if team_obj is not None: | ||
| if _is_user_team_admin( | ||
| user_api_key_dict=user_api_key_dict, team_obj=team_obj | ||
| ): | ||
| return | ||
| if await _is_user_org_admin_for_team( | ||
| user_api_key_dict=user_api_key_dict, team_obj=team_obj | ||
| ): | ||
| return | ||
|
Comment on lines
+4780
to
+4796
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Team admin silently denied when team lookup returns When the target key has a The safer approach is to check |
||
|
|
||
| raise HTTPException( | ||
| status_code=403, | ||
| detail={ | ||
| "error": f"Only proxy admins, team admins, or org admins can call {route}. " | ||
| f"user_role={user_api_key_dict.user_role}, user_id={user_api_key_dict.user_id}" | ||
| }, | ||
| ) | ||
|
|
||
|
|
||
| @router.post( | ||
| "/key/block", tags=["key management"], dependencies=[Depends(user_api_key_auth)] | ||
| ) | ||
|
|
@@ -4762,7 +4833,7 @@ async def block_key( | |
| }' | ||
| ``` | ||
|
|
||
| Note: This is an admin-only endpoint. Only proxy admins can block keys. | ||
| Note: This is an admin-only endpoint. Only proxy admins, team admins, or org admins can block keys. | ||
| """ | ||
| from litellm.proxy.proxy_server import ( | ||
| create_audit_log_for_update, | ||
|
|
@@ -4788,6 +4859,15 @@ async def block_key( | |
| else: | ||
| hashed_token = data.key | ||
|
|
||
| # Admin-only: only proxy admins, team admins, or org admins can block keys | ||
| await _check_key_admin_access( | ||
| user_api_key_dict=user_api_key_dict, | ||
| hashed_token=hashed_token, | ||
| prisma_client=prisma_client, | ||
| user_api_key_cache=user_api_key_cache, | ||
| route="/key/block", | ||
| ) | ||
|
|
||
| if litellm.store_audit_logs is True: | ||
| # make an audit log for key update | ||
| record = await prisma_client.db.litellm_verificationtoken.find_unique( | ||
|
|
@@ -4876,7 +4956,7 @@ async def unblock_key( | |
| }' | ||
| ``` | ||
|
|
||
| Note: This is an admin-only endpoint. Only proxy admins can unblock keys. | ||
| Note: This is an admin-only endpoint. Only proxy admins, team admins, or org admins can unblock keys. | ||
| """ | ||
| from litellm.proxy.proxy_server import ( | ||
| create_audit_log_for_update, | ||
|
|
@@ -4902,6 +4982,15 @@ async def unblock_key( | |
| else: | ||
| hashed_token = data.key | ||
|
|
||
| # Admin-only: only proxy admins, team admins, or org admins can unblock keys | ||
| await _check_key_admin_access( | ||
| user_api_key_dict=user_api_key_dict, | ||
| hashed_token=hashed_token, | ||
| prisma_client=prisma_client, | ||
| user_api_key_cache=user_api_key_cache, | ||
| route="/key/unblock", | ||
| ) | ||
|
|
||
| if litellm.store_audit_logs is True: | ||
| # make an audit log for key update | ||
| record = await prisma_client.db.litellm_verificationtoken.find_unique( | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Admin check silently skipped when
prisma_clientisNoneThe
if prisma_client is not None:guard means that if the DB is somehow unavailable at this call site, themax_budgetprivilege check is bypassed entirely and a non-admin user can silently raise their own budget.While
update_key_fnraises earlier whenprisma_client is None,_validate_update_key_datais a reusable helper that could be called from other call sites without that guarantee. The safer approach is to fail loudly rather than silently permit the mutation — for example, raising anHTTPException(status_code=500)whenprisma_client is Noneand a budget change is requested, so the absence of DB connectivity is an explicit error rather than a hidden privilege bypass.