Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions litellm/proxy/_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,7 @@ class LiteLLMRoutes(enum.Enum):

agent_routes = [
"/v1/agents",
"/v1/agents/{agent_id}",
"/agents",
"/a2a/{agent_id}",
"/a2a/{agent_id}/message/send",
Expand Down
30 changes: 29 additions & 1 deletion litellm/proxy/agent_endpoints/endpoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,23 @@
router = APIRouter()


def _check_agent_management_permission(user_api_key_dict: UserAPIKeyAuth) -> None:
"""
Raises HTTP 403 if the caller does not have permission to create, update,
or delete agents. Only PROXY_ADMIN users are allowed to perform these
write operations.
"""
if user_api_key_dict.user_role != LitellmUserRoles.PROXY_ADMIN:
raise HTTPException(
status_code=403,
detail={
"error": "Only proxy admins can create, update, or delete agents. Your role={}".format(
user_api_key_dict.user_role
)
},
)


@router.get(
"/v1/agents",
tags=["[beta] A2A Agents"],
Expand Down Expand Up @@ -164,6 +181,8 @@ async def create_agent(
"""
from litellm.proxy.proxy_server import prisma_client

_check_agent_management_permission(user_api_key_dict)

if prisma_client is None:
raise HTTPException(status_code=500, detail="Prisma client not initialized")

Expand Down Expand Up @@ -302,6 +321,8 @@ async def update_agent(
"""
from litellm.proxy.proxy_server import prisma_client

_check_agent_management_permission(user_api_key_dict)

if prisma_client is None:
raise HTTPException(
status_code=500, detail=CommonProxyErrors.db_not_connected_error.value
Expand Down Expand Up @@ -391,6 +412,8 @@ async def patch_agent(
"""
from litellm.proxy.proxy_server import prisma_client

_check_agent_management_permission(user_api_key_dict)

if prisma_client is None:
raise HTTPException(
status_code=500, detail=CommonProxyErrors.db_not_connected_error.value
Expand Down Expand Up @@ -441,7 +464,10 @@ async def patch_agent(
tags=["Agents"],
dependencies=[Depends(user_api_key_auth)],
)
async def delete_agent(agent_id: str):
async def delete_agent(
agent_id: str,
user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth),
):
"""
Delete an agent

Expand All @@ -460,6 +486,8 @@ async def delete_agent(agent_id: str):
"""
from litellm.proxy.proxy_server import prisma_client

_check_agent_management_permission(user_api_key_dict)

if prisma_client is None:
raise HTTPException(status_code=500, detail="Prisma client not initialized")

Expand Down
181 changes: 181 additions & 0 deletions tests/test_litellm/proxy/agent_endpoints/test_endpoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from litellm.proxy._types import LitellmUserRoles, UserAPIKeyAuth
from litellm.proxy.agent_endpoints import endpoints as agent_endpoints
from litellm.proxy.agent_endpoints.endpoints import (
_check_agent_management_permission,
get_agent_daily_activity,
router,
user_api_key_auth,
Expand Down Expand Up @@ -47,6 +48,16 @@ def _sample_agent_response(
)


def _make_app_with_role(role: LitellmUserRoles) -> TestClient:
"""Create a TestClient where the auth dependency returns the given role."""
test_app = FastAPI()
test_app.include_router(router)
test_app.dependency_overrides[user_api_key_auth] = lambda: UserAPIKeyAuth(
user_id="test-user", user_role=role
)
return TestClient(test_app)


app = FastAPI()
app.include_router(router)
app.dependency_overrides[user_api_key_auth] = lambda: UserAPIKeyAuth(
Expand Down Expand Up @@ -258,3 +269,173 @@ async def test_get_agent_daily_activity_with_agent_names(monkeypatch):
"agent-1": {"agent_name": "First Agent"},
"agent-2": {"agent_name": "Second Agent"},
}


# ---------- RBAC enforcement tests ----------


class TestAgentRBACInternalUser:
"""Internal users should be able to read agents but not create/update/delete."""

@pytest.fixture(autouse=True)
def _setup(self, monkeypatch):
self.internal_client = _make_app_with_role(LitellmUserRoles.INTERNAL_USER)
self.mock_registry = MagicMock()
monkeypatch.setattr(agent_endpoints, "AGENT_REGISTRY", self.mock_registry)

def test_should_allow_internal_user_to_list_agents(self, monkeypatch):
self.mock_registry.get_agent_list = MagicMock(return_value=[])
resp = self.internal_client.get(
"/v1/agents", headers={"Authorization": "Bearer k"}
)
assert resp.status_code == 200

def test_should_allow_internal_user_to_get_agent_by_id(self, monkeypatch):
self.mock_registry.get_agent_by_id = MagicMock(
return_value=_sample_agent_response()
)
with patch("litellm.proxy.proxy_server.prisma_client") as mock_prisma:
resp = self.internal_client.get(
"/v1/agents/agent-123", headers={"Authorization": "Bearer k"}
)
assert resp.status_code == 200

def test_should_block_internal_user_from_creating_agent(self):
resp = self.internal_client.post(
"/v1/agents",
json=_sample_agent_config(),
headers={"Authorization": "Bearer k"},
)
assert resp.status_code == 403
assert "Only proxy admins" in resp.json()["detail"]["error"]

def test_should_block_internal_user_from_updating_agent(self):
resp = self.internal_client.put(
"/v1/agents/agent-123",
json=_sample_agent_config(),
headers={"Authorization": "Bearer k"},
)
assert resp.status_code == 403

def test_should_block_internal_user_from_patching_agent(self):
resp = self.internal_client.patch(
"/v1/agents/agent-123",
json={"agent_name": "new-name"},
headers={"Authorization": "Bearer k"},
)
assert resp.status_code == 403

def test_should_block_internal_user_from_deleting_agent(self):
resp = self.internal_client.delete(
"/v1/agents/agent-123", headers={"Authorization": "Bearer k"}
)
assert resp.status_code == 403


class TestAgentRBACInternalUserViewOnly:
"""View-only internal users should only be able to read agents."""

@pytest.fixture(autouse=True)
def _setup(self, monkeypatch):
self.viewer_client = _make_app_with_role(
LitellmUserRoles.INTERNAL_USER_VIEW_ONLY
)
self.mock_registry = MagicMock()
monkeypatch.setattr(agent_endpoints, "AGENT_REGISTRY", self.mock_registry)

def test_should_allow_view_only_user_to_list_agents(self):
self.mock_registry.get_agent_list = MagicMock(return_value=[])
resp = self.viewer_client.get(
"/v1/agents", headers={"Authorization": "Bearer k"}
)
assert resp.status_code == 200

def test_should_block_view_only_user_from_creating_agent(self):
resp = self.viewer_client.post(
"/v1/agents",
json=_sample_agent_config(),
headers={"Authorization": "Bearer k"},
)
assert resp.status_code == 403

def test_should_block_view_only_user_from_deleting_agent(self):
resp = self.viewer_client.delete(
"/v1/agents/agent-123", headers={"Authorization": "Bearer k"}
)
assert resp.status_code == 403


class TestAgentRBACProxyAdmin:
"""Proxy admins should have full CRUD access to agents."""

@pytest.fixture(autouse=True)
def _setup(self, monkeypatch):
self.admin_client = _make_app_with_role(LitellmUserRoles.PROXY_ADMIN)
self.mock_registry = MagicMock()
monkeypatch.setattr(agent_endpoints, "AGENT_REGISTRY", self.mock_registry)

def test_should_allow_admin_to_create_agent(self, monkeypatch):
with patch("litellm.proxy.proxy_server.prisma_client") as mock_prisma:
self.mock_registry.get_agent_by_name = MagicMock(return_value=None)
self.mock_registry.add_agent_to_db = AsyncMock(
return_value=_sample_agent_response()
)
self.mock_registry.register_agent = MagicMock()
resp = self.admin_client.post(
"/v1/agents",
json=_sample_agent_config(),
headers={"Authorization": "Bearer k"},
)
assert resp.status_code == 200

def test_should_allow_admin_to_delete_agent(self):
existing = {
"agent_id": "agent-123",
"agent_name": "Existing Agent",
"agent_card_params": _sample_agent_card_params(),
}
with patch("litellm.proxy.proxy_server.prisma_client") as mock_prisma:
mock_prisma.db.litellm_agentstable.find_unique = AsyncMock(
return_value=existing
)
self.mock_registry.delete_agent_from_db = AsyncMock()
self.mock_registry.deregister_agent = MagicMock()
resp = self.admin_client.delete(
"/v1/agents/agent-123", headers={"Authorization": "Bearer k"}
)
assert resp.status_code == 200


class TestCheckAgentManagementPermission:
"""Unit tests for the _check_agent_management_permission helper."""

def test_should_allow_proxy_admin(self):
auth = UserAPIKeyAuth(
user_id="admin", user_role=LitellmUserRoles.PROXY_ADMIN
)
_check_agent_management_permission(auth)

@pytest.mark.parametrize(
"role",
[
LitellmUserRoles.INTERNAL_USER,
LitellmUserRoles.INTERNAL_USER_VIEW_ONLY,
LitellmUserRoles.PROXY_ADMIN_VIEW_ONLY,
],
)
def test_should_block_non_admin_roles(self, role):
from fastapi import HTTPException

auth = UserAPIKeyAuth(user_id="user", user_role=role)
with pytest.raises(HTTPException) as exc_info:
_check_agent_management_permission(auth)
assert exc_info.value.status_code == 403


class TestAgentRoutesIncludesAgentIdPattern:
"""Verify that agent_routes includes the {agent_id} pattern for route access."""

def test_should_include_agent_id_pattern(self):
from litellm.proxy._types import LiteLLMRoutes

assert "/v1/agents/{agent_id}" in LiteLLMRoutes.agent_routes.value
Loading
Loading