diff --git a/tests/test_litellm/proxy/management_endpoints/test_internal_user_endpoints.py b/tests/test_litellm/proxy/management_endpoints/test_internal_user_endpoints.py index 33f2a75fac6..397a6af556f 100644 --- a/tests/test_litellm/proxy/management_endpoints/test_internal_user_endpoints.py +++ b/tests/test_litellm/proxy/management_endpoints/test_internal_user_endpoints.py @@ -12,6 +12,7 @@ from litellm.proxy._types import ( LiteLLM_UserTableFiltered, + LitellmUserRoles, NewUserRequest, ProxyException, UpdateUserRequest, @@ -306,6 +307,88 @@ async def mock_check_duplicate_user_id(*args, **kwargs): mock_license_check.is_over_limit.assert_called_once_with(total_users=1000) +@pytest.mark.asyncio +async def test_new_user_non_admin_cannot_create_admin(mocker): + """ + Test that non-admin users cannot create administrative users (PROXY_ADMIN or PROXY_ADMIN_VIEW_ONLY). + This prevents privilege escalation vulnerabilities. + """ + from litellm.proxy.management_endpoints.internal_user_endpoints import new_user + + # Mock the prisma client + mock_prisma_client = mocker.MagicMock() + + # Setup the mock count response (under license limit) + async def mock_count(*args, **kwargs): + return 5 # Low user count, under limit + + mock_prisma_client.db.litellm_usertable.count = mock_count + + # Mock duplicate checks to pass + async def mock_check_duplicate_user_email(*args, **kwargs): + return None # No duplicate found + + async def mock_check_duplicate_user_id(*args, **kwargs): + return None # No duplicate found + + mocker.patch( + "litellm.proxy.management_endpoints.internal_user_endpoints._check_duplicate_user_email", + mock_check_duplicate_user_email, + ) + mocker.patch( + "litellm.proxy.management_endpoints.internal_user_endpoints._check_duplicate_user_id", + mock_check_duplicate_user_id, + ) + + # Mock the license check to return False (under limit) + mock_license_check = mocker.MagicMock() + mock_license_check.is_over_limit.return_value = False + + # Patch the imports in the endpoint + mocker.patch("litellm.proxy.proxy_server.prisma_client", mock_prisma_client) + mocker.patch("litellm.proxy.proxy_server._license_check", mock_license_check) + + # Test Case 1: INTERNAL_USER trying to create PROXY_ADMIN + user_request = NewUserRequest( + user_email="admin@example.com", user_role=LitellmUserRoles.PROXY_ADMIN + ) + + # Mock user_api_key_dict with non-admin role + mock_user_api_key_dict = UserAPIKeyAuth( + user_id="test_internal_user", user_role=LitellmUserRoles.INTERNAL_USER + ) + + # Call new_user function and expect ProxyException + with pytest.raises(ProxyException) as exc_info: + await new_user(data=user_request, user_api_key_dict=mock_user_api_key_dict) + + # Verify the exception details + assert exc_info.value.code == 403 or exc_info.value.code == "403" + assert "Only proxy admins can create administrative users" in str(exc_info.value.message) + assert "proxy_admin" in str(exc_info.value.message) + assert "proxy_admin_viewer" in str(exc_info.value.message) + assert str(LitellmUserRoles.PROXY_ADMIN) in str(exc_info.value.message) + assert str(LitellmUserRoles.INTERNAL_USER) in str(exc_info.value.message) + + # Test Case 2: INTERNAL_USER trying to create PROXY_ADMIN_VIEW_ONLY + user_request_viewer = NewUserRequest( + user_email="admin_viewer@example.com", + user_role=LitellmUserRoles.PROXY_ADMIN_VIEW_ONLY, + ) + + with pytest.raises(ProxyException) as exc_info2: + await new_user( + data=user_request_viewer, user_api_key_dict=mock_user_api_key_dict + ) + + # Verify the exception details + assert exc_info2.value.code == 403 or exc_info2.value.code == "403" + assert "Only proxy admins can create administrative users" in str( + exc_info2.value.message + ) + assert str(LitellmUserRoles.PROXY_ADMIN_VIEW_ONLY) in str(exc_info2.value.message) + + @pytest.mark.asyncio async def test_user_info_url_encoding_plus_character(mocker): """