diff --git a/assisted_service_mcp/src/api.py b/assisted_service_mcp/src/api.py index 955d9a1..16136f6 100644 --- a/assisted_service_mcp/src/api.py +++ b/assisted_service_mcp/src/api.py @@ -15,7 +15,12 @@ server = AssistedServiceMCPServer() # Choose the appropriate transport protocol based on settings -if settings.TRANSPORT == "streamable-http": +if settings.TRANSPORT == "stdio": + # For stdio transport, no HTTP app is needed + # The server will run directly via asyncio + app = None # type: ignore + log.info("Using stdio transport (for Claude Code and CLI clients)") +elif settings.TRANSPORT == "streamable-http": app = server.mcp.streamable_http_app() log.info("Using StreamableHTTP transport (stateless)") else: diff --git a/assisted_service_mcp/src/main.py b/assisted_service_mcp/src/main.py index 96af2e4..62c0051 100644 --- a/assisted_service_mcp/src/main.py +++ b/assisted_service_mcp/src/main.py @@ -10,7 +10,8 @@ def main() -> None: """Start the MCP server. - Initializes the server, sets up metrics, and starts the uvicorn server. + Initializes the server, sets up metrics, and starts the server with + the configured transport (stdio, sse, or streamable-http). """ try: log.info("Starting Assisted Service MCP Server") @@ -26,12 +27,18 @@ def main() -> None: initiate_metrics(tool_names) log.info("Initialized metrics for %s tools", len(tool_names)) - # Add metrics endpoint - app.add_route("/metrics", metrics) - log.info("Metrics endpoint available at /metrics") - - # Start the server using settings - uvicorn.run(app, host=settings.MCP_HOST, port=settings.MCP_PORT) + # Handle stdio transport differently - no HTTP server needed + if settings.TRANSPORT == "stdio": + log.info("Running in stdio mode") + # Run the MCP server directly with stdio transport + server.mcp.run() + else: + # Add metrics endpoint for HTTP transports + app.add_route("/metrics", metrics) + log.info("Metrics endpoint available at /metrics") + + # Start the HTTP server using uvicorn + uvicorn.run(app, host=settings.MCP_HOST, port=settings.MCP_PORT) except KeyboardInterrupt: log.info("Received keyboard interrupt, shutting down") diff --git a/assisted_service_mcp/src/service_client/assisted_service_api.py b/assisted_service_mcp/src/service_client/assisted_service_api.py index baa4521..08f9552 100644 --- a/assisted_service_mcp/src/service_client/assisted_service_api.py +++ b/assisted_service_mcp/src/service_client/assisted_service_api.py @@ -137,42 +137,67 @@ async def get_presigned_cluster_logs_url(self, cluster_id: str) -> str: @sanitize_exceptions async def get_cluster( self, cluster_id: str, get_unregistered_clusters: bool = False - ) -> models.Cluster: + ) -> models.Cluster | dict[str, Any]: """ Get cluster information by ID. + Automatically detects which API to use based on INVENTORY_URL: + - If URL contains 'clusters_mgmt' → OCM API (ROSA/ARO/OSD) + - Otherwise → Assisted Installer API (OCP/SNO) + Args: cluster_id: The unique identifier of the cluster. - get_unregistered_clusters: Whether to include unregistered clusters. + get_unregistered_clusters: Whether to include unregistered clusters (Assisted Installer only). Returns: - models.Cluster: The cluster object containing cluster information. - """ - log.info( - "Getting cluster %s (unregistered: %s)", - cluster_id, - get_unregistered_clusters, - ) - result = await self._api_call( - self._installer_api().v2_get_cluster, - cluster_id=cluster_id, - get_unregistered_clusters=get_unregistered_clusters, - ) - log.info("Successfully retrieved cluster %s", cluster_id) - return cast(models.Cluster, result) + models.Cluster | dict[str, Any]: The cluster object (Assisted Installer) or dictionary (OCM). + """ + log.info("Getting cluster %s from %s", cluster_id, self.inventory_url) + + # Detect API type based on URL + if "clusters_mgmt" in self.inventory_url: + # Use OCM API for managed clusters (ROSA/ARO/OSD) + log.info("Detected OCM API - getting managed cluster") + return await self.ocm_get_cluster(cluster_id) + else: + # Use Assisted Installer API for self-managed clusters (OCP/SNO) + log.info( + "Detected Assisted Installer API - getting self-managed cluster (unregistered: %s)", + get_unregistered_clusters, + ) + result = await self._api_call( + self._installer_api().v2_get_cluster, + cluster_id=cluster_id, + get_unregistered_clusters=get_unregistered_clusters, + ) + log.info("Successfully retrieved cluster %s", cluster_id) + return cast(models.Cluster, result) @sanitize_exceptions async def list_clusters(self) -> list: """ List all clusters accessible to the authenticated user. + Automatically detects which API to use based on INVENTORY_URL: + - If URL contains 'clusters_mgmt' → OCM API (ROSA/ARO/OSD) + - Otherwise → Assisted Installer API (OCP/SNO) + Returns: list: A list of cluster objects. """ - log.info("Listing all clusters") - result = await self._api_call(self._installer_api().v2_list_clusters) - log.info("Successfully listed clusters") - return cast(list, result) + log.info("Listing all clusters from %s", self.inventory_url) + + # Detect API type based on URL + if "clusters_mgmt" in self.inventory_url: + # Use OCM API for managed clusters (ROSA/ARO/OSD) + log.info("Detected OCM API - listing managed clusters") + return await self.ocm_list_clusters() + else: + # Use Assisted Installer API for self-managed clusters (OCP/SNO) + log.info("Detected Assisted Installer API - listing self-managed clusters") + result = await self._api_call(self._installer_api().v2_list_clusters) + log.info("Successfully listed clusters") + return cast(list, result) @sanitize_exceptions async def get_cluster_logs( @@ -573,3 +598,61 @@ async def get_infra_env_download_url( infra_env_id, ) return cast(models.PresignedUrl, result) + + @sanitize_exceptions + async def ocm_list_clusters(self) -> list[dict[str, Any]]: + """ + List all OCM managed clusters (ROSA, ARO, OSD). + + This method should only be called when INVENTORY_URL points to the OCM API. + Uses the clusters_mgmt API endpoint to retrieve managed clusters. + + Returns: + list[dict[str, Any]]: A list of OCM cluster dictionaries containing cluster information. + """ + url = f"{self.inventory_url}/clusters" + headers = {"Authorization": f"Bearer {self.access_token}"} + + log.info("Listing OCM clusters from %s", url) + try: + response = await asyncio.to_thread( + requests.get, url, headers=headers, timeout=30 + ) + response.raise_for_status() + data = response.json() + clusters = data.get("items", []) + log.info("Successfully listed %d OCM clusters", len(clusters)) + return clusters + except RequestException as e: + log.error("Error while listing OCM clusters from %s: %s", url, str(e)) + raise + + @sanitize_exceptions + async def ocm_get_cluster(self, cluster_id: str) -> dict[str, Any]: + """ + Get detailed information about an OCM managed cluster. + + This method should only be called when INVENTORY_URL points to the OCM API. + Uses the clusters_mgmt API endpoint to retrieve managed cluster details. + + Args: + cluster_id: The unique identifier of the OCM cluster. + + Returns: + dict[str, Any]: OCM cluster information dictionary. + """ + url = f"{self.inventory_url}/clusters/{cluster_id}" + headers = {"Authorization": f"Bearer {self.access_token}"} + + log.info("Getting OCM cluster %s from %s", cluster_id, url) + try: + response = await asyncio.to_thread( + requests.get, url, headers=headers, timeout=30 + ) + response.raise_for_status() + cluster = response.json() + log.info("Successfully retrieved OCM cluster %s", cluster_id) + return cluster + except RequestException as e: + log.error("Error while getting OCM cluster %s: %s", cluster_id, str(e)) + raise diff --git a/assisted_service_mcp/src/settings.py b/assisted_service_mcp/src/settings.py index f871ddf..7b86202 100644 --- a/assisted_service_mcp/src/settings.py +++ b/assisted_service_mcp/src/settings.py @@ -49,7 +49,7 @@ class Settings(BaseSettings): ) # Transport Configuration - TRANSPORT: Literal["sse", "streamable-http"] = Field( + TRANSPORT: Literal["sse", "streamable-http", "stdio"] = Field( default="sse", json_schema_extra={ "env": "TRANSPORT", @@ -185,7 +185,7 @@ def validate_config(cfg: Settings) -> None: ) # Validate transport protocol - valid_transports = ["sse", "streamable-http"] + valid_transports = ["sse", "streamable-http", "stdio"] if cfg.TRANSPORT not in valid_transports: raise ValueError( f"TRANSPORT must be one of {valid_transports}, got {cfg.TRANSPORT}" diff --git a/assisted_service_mcp/src/tools/cluster_tools.py b/assisted_service_mcp/src/tools/cluster_tools.py index 1c842a6..11c1bef 100644 --- a/assisted_service_mcp/src/tools/cluster_tools.py +++ b/assisted_service_mcp/src/tools/cluster_tools.py @@ -26,6 +26,10 @@ async def cluster_info( installation progress, and host information. Use this to check cluster state, verify configuration, or monitor installation progress. + Automatically detects cluster type based on the configured INVENTORY_URL: + - Assisted Installer API → Returns self-managed cluster info (OCP, SNO) + - OCM API → Returns managed service cluster info (ROSA, ARO, OSD) + Prerequisites: - Valid cluster UUID (from list_clusters or create_cluster) @@ -40,7 +44,35 @@ async def cluster_info( client = InventoryClient(get_access_token_func()) result = await client.get_cluster(cluster_id=cluster_id) log.info("Successfully retrieved cluster information for %s", cluster_id) - return result.to_str() + + # Handle both Assisted Installer (models.Cluster) and OCM (dict) responses + if isinstance(result, dict): + # OCM cluster - format manually + formatted_output = f"OCM Cluster Details: {result.get('name', 'Unknown')}\n\n" + formatted_output += f"Basic Information:\n" + formatted_output += f"- Cluster ID: {result.get('id', 'Unknown')}\n" + formatted_output += f"- State: {result.get('state', 'Unknown')}\n" + formatted_output += f"- OpenShift Version: {result.get('version', {}).get('raw_id', 'Unknown')}\n" + formatted_output += f"- Created: {result.get('creation_timestamp', 'Unknown')}\n\n" + + formatted_output += f"Cloud Configuration:\n" + formatted_output += f"- Cloud Provider: {result.get('cloud_provider', {}).get('id', 'Unknown')}\n" + formatted_output += f"- Region: {result.get('region', {}).get('id', 'Unknown')}\n" + formatted_output += f"- Multi-AZ: {result.get('multi_az', False)}\n\n" + + formatted_output += f"Access:\n" + formatted_output += f"- API URL: {result.get('api', {}).get('url', 'Unknown')}\n" + formatted_output += f"- Console URL: {result.get('console', {}).get('url', 'Unknown')}\n\n" + + formatted_output += f"Node Information:\n" + nodes = result.get('nodes', {}) + formatted_output += f"- Compute nodes: {nodes.get('compute', 0)}\n" + formatted_output += f"- Compute machine type: {nodes.get('compute_machine_type', {}).get('id', 'Unknown')}\n" + + return formatted_output + else: + # Assisted Installer cluster - use to_str() method + return result.to_str() @track_tool_usage() @@ -51,36 +83,48 @@ async def list_clusters(get_access_token_func: Callable[[], str]) -> str: basic information about each cluster (name, ID, version, status) without detailed configuration. Use cluster_info() to get comprehensive details about a specific cluster. + Automatically detects cluster type based on the configured INVENTORY_URL: + - Assisted Installer API → Lists self-managed clusters (OCP, SNO) + - OCM API → Lists managed service clusters (ROSA, ARO, OSD) + Returns: str: A formatted list of clusters, each containing: - Cluster name - Unique cluster ID - OpenShift version - Current cluster status (e.g., "ready", "installing", "error") + - Cloud provider and region (for OCM managed clusters) """ log.info("Retrieving list of all clusters") client = InventoryClient(get_access_token_func()) clusters = await client.list_clusters() - resp = [ - { - "name": cluster["name"], - "id": cluster["id"], - "openshift_version": cluster.get("openshift_version", "Unknown"), - "status": cluster["status"], - } - for cluster in clusters - ] - log.info("Successfully retrieved %s clusters", len(resp)) - if not resp: + + if not clusters: return "No clusters found." formatted_output = "" - for cluster in resp: - formatted_output += f"{cluster['name']}\n" - formatted_output += f"- ID: {cluster['id']}\n" - formatted_output += f"- Openshift version: {cluster['openshift_version']}\n" - formatted_output += f"- Status: {cluster['status']}\n\n" - + for cluster in clusters: + # Extract common fields + name = cluster.get("name", "Unknown") + cluster_id = cluster.get("id", "Unknown") + version = cluster.get("openshift_version") or cluster.get("version", {}).get("raw_id", "Unknown") + status = cluster.get("status") if isinstance(cluster.get("status"), str) else cluster.get("status", {}).get("state", cluster.get("state", "Unknown")) + + # Format output + formatted_output += f"{name}\n" + formatted_output += f"- ID: {cluster_id}\n" + formatted_output += f"- OpenShift version: {version}\n" + formatted_output += f"- Status: {status}\n" + + # Add OCM-specific fields if present + if "cloud_provider" in cluster: + formatted_output += f"- Cloud provider: {cluster.get('cloud_provider', {}).get('id', 'Unknown')}\n" + if "region" in cluster: + formatted_output += f"- Region: {cluster.get('region', {}).get('id', 'Unknown')}\n" + + formatted_output += "\n" + + log.info("Successfully retrieved %s clusters", len(clusters)) return formatted_output