Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion assisted_service_mcp/src/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,12 @@
server = AssistedServiceMCPServer()

# Choose the appropriate transport protocol based on settings
if settings.TRANSPORT == "streamable-http":
if settings.TRANSPORT == "stdio":
# For stdio transport, no HTTP app is needed
# The server will run directly via asyncio
app = None # type: ignore
log.info("Using stdio transport (for Claude Code and CLI clients)")
elif settings.TRANSPORT == "streamable-http":
app = server.mcp.streamable_http_app()
log.info("Using StreamableHTTP transport (stateless)")
else:
Expand Down
21 changes: 14 additions & 7 deletions assisted_service_mcp/src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
def main() -> None:
"""Start the MCP server.

Initializes the server, sets up metrics, and starts the uvicorn server.
Initializes the server, sets up metrics, and starts the server with
the configured transport (stdio, sse, or streamable-http).
"""
try:
log.info("Starting Assisted Service MCP Server")
Expand All @@ -26,12 +27,18 @@ def main() -> None:
initiate_metrics(tool_names)
log.info("Initialized metrics for %s tools", len(tool_names))

# Add metrics endpoint
app.add_route("/metrics", metrics)
log.info("Metrics endpoint available at /metrics")

# Start the server using settings
uvicorn.run(app, host=settings.MCP_HOST, port=settings.MCP_PORT)
# Handle stdio transport differently - no HTTP server needed
if settings.TRANSPORT == "stdio":
log.info("Running in stdio mode")
# Run the MCP server directly with stdio transport
server.mcp.run()
else:
# Add metrics endpoint for HTTP transports
app.add_route("/metrics", metrics)
log.info("Metrics endpoint available at /metrics")

# Start the HTTP server using uvicorn
uvicorn.run(app, host=settings.MCP_HOST, port=settings.MCP_PORT)

except KeyboardInterrupt:
log.info("Received keyboard interrupt, shutting down")
Expand Down
123 changes: 103 additions & 20 deletions assisted_service_mcp/src/service_client/assisted_service_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,42 +137,67 @@ async def get_presigned_cluster_logs_url(self, cluster_id: str) -> str:
@sanitize_exceptions
async def get_cluster(
self, cluster_id: str, get_unregistered_clusters: bool = False
) -> models.Cluster:
) -> models.Cluster | dict[str, Any]:
"""
Get cluster information by ID.

Automatically detects which API to use based on INVENTORY_URL:
- If URL contains 'clusters_mgmt' → OCM API (ROSA/ARO/OSD)
- Otherwise → Assisted Installer API (OCP/SNO)

Args:
cluster_id: The unique identifier of the cluster.
get_unregistered_clusters: Whether to include unregistered clusters.
get_unregistered_clusters: Whether to include unregistered clusters (Assisted Installer only).

Returns:
models.Cluster: The cluster object containing cluster information.
"""
log.info(
"Getting cluster %s (unregistered: %s)",
cluster_id,
get_unregistered_clusters,
)
result = await self._api_call(
self._installer_api().v2_get_cluster,
cluster_id=cluster_id,
get_unregistered_clusters=get_unregistered_clusters,
)
log.info("Successfully retrieved cluster %s", cluster_id)
return cast(models.Cluster, result)
models.Cluster | dict[str, Any]: The cluster object (Assisted Installer) or dictionary (OCM).
"""
log.info("Getting cluster %s from %s", cluster_id, self.inventory_url)

# Detect API type based on URL
if "clusters_mgmt" in self.inventory_url:
# Use OCM API for managed clusters (ROSA/ARO/OSD)
log.info("Detected OCM API - getting managed cluster")
return await self.ocm_get_cluster(cluster_id)
else:
# Use Assisted Installer API for self-managed clusters (OCP/SNO)
log.info(
"Detected Assisted Installer API - getting self-managed cluster (unregistered: %s)",
get_unregistered_clusters,
)
result = await self._api_call(
self._installer_api().v2_get_cluster,
cluster_id=cluster_id,
get_unregistered_clusters=get_unregistered_clusters,
)
log.info("Successfully retrieved cluster %s", cluster_id)
return cast(models.Cluster, result)

@sanitize_exceptions
async def list_clusters(self) -> list:
"""
List all clusters accessible to the authenticated user.

Automatically detects which API to use based on INVENTORY_URL:
- If URL contains 'clusters_mgmt' → OCM API (ROSA/ARO/OSD)
- Otherwise → Assisted Installer API (OCP/SNO)

Returns:
list: A list of cluster objects.
"""
log.info("Listing all clusters")
result = await self._api_call(self._installer_api().v2_list_clusters)
log.info("Successfully listed clusters")
return cast(list, result)
log.info("Listing all clusters from %s", self.inventory_url)

# Detect API type based on URL
if "clusters_mgmt" in self.inventory_url:
# Use OCM API for managed clusters (ROSA/ARO/OSD)
log.info("Detected OCM API - listing managed clusters")
return await self.ocm_list_clusters()
else:
# Use Assisted Installer API for self-managed clusters (OCP/SNO)
log.info("Detected Assisted Installer API - listing self-managed clusters")
result = await self._api_call(self._installer_api().v2_list_clusters)
log.info("Successfully listed clusters")
return cast(list, result)

@sanitize_exceptions
async def get_cluster_logs(
Expand Down Expand Up @@ -573,3 +598,61 @@ async def get_infra_env_download_url(
infra_env_id,
)
return cast(models.PresignedUrl, result)

@sanitize_exceptions
async def ocm_list_clusters(self) -> list[dict[str, Any]]:
"""
List all OCM managed clusters (ROSA, ARO, OSD).

This method should only be called when INVENTORY_URL points to the OCM API.
Uses the clusters_mgmt API endpoint to retrieve managed clusters.

Returns:
list[dict[str, Any]]: A list of OCM cluster dictionaries containing cluster information.
"""
url = f"{self.inventory_url}/clusters"
headers = {"Authorization": f"Bearer {self.access_token}"}

log.info("Listing OCM clusters from %s", url)
try:
response = await asyncio.to_thread(
requests.get, url, headers=headers, timeout=30
)
response.raise_for_status()
data = response.json()
clusters = data.get("items", [])
log.info("Successfully listed %d OCM clusters", len(clusters))
return clusters
except RequestException as e:
log.error("Error while listing OCM clusters from %s: %s", url, str(e))
raise

@sanitize_exceptions
async def ocm_get_cluster(self, cluster_id: str) -> dict[str, Any]:
"""
Get detailed information about an OCM managed cluster.

This method should only be called when INVENTORY_URL points to the OCM API.
Uses the clusters_mgmt API endpoint to retrieve managed cluster details.

Args:
cluster_id: The unique identifier of the OCM cluster.

Returns:
dict[str, Any]: OCM cluster information dictionary.
"""
url = f"{self.inventory_url}/clusters/{cluster_id}"
headers = {"Authorization": f"Bearer {self.access_token}"}

log.info("Getting OCM cluster %s from %s", cluster_id, url)
try:
response = await asyncio.to_thread(
requests.get, url, headers=headers, timeout=30
)
response.raise_for_status()
cluster = response.json()
log.info("Successfully retrieved OCM cluster %s", cluster_id)
return cluster
except RequestException as e:
log.error("Error while getting OCM cluster %s: %s", cluster_id, str(e))
raise
4 changes: 2 additions & 2 deletions assisted_service_mcp/src/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class Settings(BaseSettings):
)

# Transport Configuration
TRANSPORT: Literal["sse", "streamable-http"] = Field(
TRANSPORT: Literal["sse", "streamable-http", "stdio"] = Field(
default="sse",
json_schema_extra={
"env": "TRANSPORT",
Expand Down Expand Up @@ -185,7 +185,7 @@ def validate_config(cfg: Settings) -> None:
)

# Validate transport protocol
valid_transports = ["sse", "streamable-http"]
valid_transports = ["sse", "streamable-http", "stdio"]
if cfg.TRANSPORT not in valid_transports:
raise ValueError(
f"TRANSPORT must be one of {valid_transports}, got {cfg.TRANSPORT}"
Expand Down
80 changes: 62 additions & 18 deletions assisted_service_mcp/src/tools/cluster_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ async def cluster_info(
installation progress, and host information. Use this to check cluster state, verify
configuration, or monitor installation progress.

Automatically detects cluster type based on the configured INVENTORY_URL:
- Assisted Installer API → Returns self-managed cluster info (OCP, SNO)
- OCM API → Returns managed service cluster info (ROSA, ARO, OSD)

Prerequisites:
- Valid cluster UUID (from list_clusters or create_cluster)

Expand All @@ -40,7 +44,35 @@ async def cluster_info(
client = InventoryClient(get_access_token_func())
result = await client.get_cluster(cluster_id=cluster_id)
log.info("Successfully retrieved cluster information for %s", cluster_id)
return result.to_str()

# Handle both Assisted Installer (models.Cluster) and OCM (dict) responses
if isinstance(result, dict):
# OCM cluster - format manually
formatted_output = f"OCM Cluster Details: {result.get('name', 'Unknown')}\n\n"
formatted_output += f"Basic Information:\n"
formatted_output += f"- Cluster ID: {result.get('id', 'Unknown')}\n"
formatted_output += f"- State: {result.get('state', 'Unknown')}\n"
formatted_output += f"- OpenShift Version: {result.get('version', {}).get('raw_id', 'Unknown')}\n"
formatted_output += f"- Created: {result.get('creation_timestamp', 'Unknown')}\n\n"

formatted_output += f"Cloud Configuration:\n"
formatted_output += f"- Cloud Provider: {result.get('cloud_provider', {}).get('id', 'Unknown')}\n"
formatted_output += f"- Region: {result.get('region', {}).get('id', 'Unknown')}\n"
formatted_output += f"- Multi-AZ: {result.get('multi_az', False)}\n\n"

formatted_output += f"Access:\n"
formatted_output += f"- API URL: {result.get('api', {}).get('url', 'Unknown')}\n"
formatted_output += f"- Console URL: {result.get('console', {}).get('url', 'Unknown')}\n\n"

formatted_output += f"Node Information:\n"
nodes = result.get('nodes', {})
formatted_output += f"- Compute nodes: {nodes.get('compute', 0)}\n"
formatted_output += f"- Compute machine type: {nodes.get('compute_machine_type', {}).get('id', 'Unknown')}\n"

return formatted_output
else:
# Assisted Installer cluster - use to_str() method
return result.to_str()


@track_tool_usage()
Expand All @@ -51,36 +83,48 @@ async def list_clusters(get_access_token_func: Callable[[], str]) -> str:
basic information about each cluster (name, ID, version, status) without detailed
configuration. Use cluster_info() to get comprehensive details about a specific cluster.

Automatically detects cluster type based on the configured INVENTORY_URL:
- Assisted Installer API → Lists self-managed clusters (OCP, SNO)
- OCM API → Lists managed service clusters (ROSA, ARO, OSD)

Returns:
str: A formatted list of clusters, each containing:
- Cluster name
- Unique cluster ID
- OpenShift version
- Current cluster status (e.g., "ready", "installing", "error")
- Cloud provider and region (for OCM managed clusters)
"""
log.info("Retrieving list of all clusters")
client = InventoryClient(get_access_token_func())
clusters = await client.list_clusters()
resp = [
{
"name": cluster["name"],
"id": cluster["id"],
"openshift_version": cluster.get("openshift_version", "Unknown"),
"status": cluster["status"],
}
for cluster in clusters
]
log.info("Successfully retrieved %s clusters", len(resp))
if not resp:

if not clusters:
return "No clusters found."

formatted_output = ""
for cluster in resp:
formatted_output += f"{cluster['name']}\n"
formatted_output += f"- ID: {cluster['id']}\n"
formatted_output += f"- Openshift version: {cluster['openshift_version']}\n"
formatted_output += f"- Status: {cluster['status']}\n\n"

for cluster in clusters:
# Extract common fields
name = cluster.get("name", "Unknown")
cluster_id = cluster.get("id", "Unknown")
version = cluster.get("openshift_version") or cluster.get("version", {}).get("raw_id", "Unknown")
status = cluster.get("status") if isinstance(cluster.get("status"), str) else cluster.get("status", {}).get("state", cluster.get("state", "Unknown"))

# Format output
formatted_output += f"{name}\n"
formatted_output += f"- ID: {cluster_id}\n"
formatted_output += f"- OpenShift version: {version}\n"
formatted_output += f"- Status: {status}\n"

# Add OCM-specific fields if present
if "cloud_provider" in cluster:
formatted_output += f"- Cloud provider: {cluster.get('cloud_provider', {}).get('id', 'Unknown')}\n"
if "region" in cluster:
formatted_output += f"- Region: {cluster.get('region', {}).get('id', 'Unknown')}\n"

formatted_output += "\n"

log.info("Successfully retrieved %s clusters", len(clusters))
return formatted_output


Expand Down