diff --git a/server.py b/server.py index cf75860..20565a2 100644 --- a/server.py +++ b/server.py @@ -1,12 +1,13 @@ from mcp.server.fastmcp import FastMCP import json import os +import requests from service_client import InventoryClient mcp = FastMCP("AssistedService", host="0.0.0.0") -def get_offline_token(): +def get_offline_token() -> str: """Retrieve the offline token from environment variables or request headers. This function attempts to get the Red Hat OpenShift Cluster Manager (OCM) offline token @@ -31,6 +32,39 @@ def get_offline_token(): raise RuntimeError("No offline token found in environment or request headers") +def get_access_token() -> str: + """Retrieve the access token. + + This function tries to get the Red Hat OpenShift Cluster Manager (OCM) access token. First + it tries to extract it from the authorization header, and if it isn't there then it tries + to generate a new one using the offline token. + + Returns: + str: The access token. + + Raises: + RuntimeError: If it isn't possible to obtain or generate the access token. + """ + # First try to get the token from the authorization header: + request = mcp.get_context().request_context.request + if request is not None: + header = request.headers.get("Authorization") + if header is not None: + parts = header.split() + if len(parts) == 2 and parts[0].lower() == "bearer": + return parts[1] + + # Now try to get the offline token, and generate a new access token from it: + params = { + "client_id": "cloud-services", + "grant_type": "refresh_token", + "refresh_token": get_offline_token(), + } + sso_url = os.environ.get("SSO_URL", "https://sso.redhat.com/auth/realms/redhat-external/protocol/openid-connect/token") + response = requests.post(sso_url, data=params) + response.raise_for_status() + return response.json()["access_token"] + @mcp.tool() def cluster_info(cluster_id: str) -> str: """Get comprehensive information about a specific assisted installer cluster. @@ -49,7 +83,7 @@ def cluster_info(cluster_id: str) -> str: - Network configuration (VIPs, subnets) - Host information and roles """ - return InventoryClient(get_offline_token()).get_cluster(cluster_id=cluster_id).to_str() + return InventoryClient(get_access_token()).get_cluster(cluster_id=cluster_id).to_str() @mcp.tool() def list_clusters() -> str: @@ -67,7 +101,7 @@ def list_clusters() -> str: - openshift_version (str): The OpenShift version being installed - status (str): Current cluster status (e.g., 'ready', 'installing', 'error') """ - clusters = InventoryClient(get_offline_token()).list_clusters() + clusters = InventoryClient(get_access_token()).list_clusters() resp = [{"name": cluster["name"], "id": cluster["id"], "openshift_version": cluster["openshift_version"], "status": cluster["status"]} for cluster in clusters] return json.dumps(resp) @@ -86,7 +120,7 @@ def cluster_events(cluster_id: str) -> str: str: A JSON-formatted string containing cluster events with timestamps, event types, and descriptive messages about cluster activities. """ - return InventoryClient(get_offline_token()).get_events(cluster_id=cluster_id) + return InventoryClient(get_access_token()).get_events(cluster_id=cluster_id) @mcp.tool() def host_events(cluster_id: str, host_id: str) -> str: @@ -103,7 +137,7 @@ def host_events(cluster_id: str, host_id: str) -> str: str: A JSON-formatted string containing host-specific events including hardware validation results, installation steps, and error messages. """ - return InventoryClient(get_offline_token()).get_events(cluster_id=cluster_id, host_id=host_id) + return InventoryClient(get_access_token()).get_events(cluster_id=cluster_id, host_id=host_id) @mcp.tool() def infraenv_info(infraenv_id: str) -> str: @@ -124,7 +158,7 @@ def infraenv_info(infraenv_id: str) -> str: - Associated cluster information - Static network configuration if applicable """ - return InventoryClient(get_offline_token()).get_infra_env(infraenv_id).to_str() + return InventoryClient(get_access_token()).get_infra_env(infraenv_id).to_str() @mcp.tool() def create_cluster(name: str, version: str, base_domain: str, single_node: bool) -> str: @@ -148,7 +182,7 @@ def create_cluster(name: str, version: str, base_domain: str, single_node: bool) - cluster_id (str): The unique identifier of the created cluster - infraenv_id (str): The unique identifier of the created InfraEnv """ - client = InventoryClient(get_offline_token()) + client = InventoryClient(get_access_token()) cluster = client.create_cluster(name, version, single_node, base_dns_domain=base_domain) infraenv = client.create_infra_env(name, cluster_id=cluster.id, openshift_version=cluster.openshift_version) return json.dumps({'cluster_id': cluster.id, 'infraenv_id': infraenv.id}) @@ -172,7 +206,7 @@ def set_cluster_vips(cluster_id: str, api_vip: str, ingress_vip: str) -> str: str: A formatted string containing the updated cluster configuration showing the newly set VIP addresses. """ - return InventoryClient(get_offline_token()).update_cluster(cluster_id, api_vip=api_vip, ingress_vip=ingress_vip).to_str() + return InventoryClient(get_access_token()).update_cluster(cluster_id, api_vip=api_vip, ingress_vip=ingress_vip).to_str() @mcp.tool() def install_cluster(cluster_id: str) -> str: @@ -195,7 +229,7 @@ def install_cluster(cluster_id: str) -> str: - Network configuration is complete (VIPs set if required) - All cluster validations pass """ - return InventoryClient(get_offline_token()).install_cluster(cluster_id).to_str() + return InventoryClient(get_access_token()).install_cluster(cluster_id).to_str() @mcp.tool() def list_versions() -> str: @@ -209,7 +243,7 @@ def list_versions() -> str: str: A JSON string containing available OpenShift versions with metadata including version numbers, release dates, and support status. """ - return json.dumps(InventoryClient(get_offline_token()).get_openshift_versions(True)) + return json.dumps(InventoryClient(get_access_token()).get_openshift_versions(True)) @mcp.tool() def list_operator_bundles() -> str: @@ -223,7 +257,7 @@ def list_operator_bundles() -> str: str: A JSON string containing available operator bundles with metadata including bundle names, descriptions, and operator details. """ - return json.dumps(InventoryClient(get_offline_token()).get_operator_bundles()) + return json.dumps(InventoryClient(get_access_token()).get_operator_bundles()) @mcp.tool() def add_operator_bundle_to_cluster(cluster_id: str, bundle_name: str) -> str: @@ -242,7 +276,7 @@ def add_operator_bundle_to_cluster(cluster_id: str, bundle_name: str) -> str: str: A formatted string containing the updated cluster configuration showing the newly added operator bundle. """ - return InventoryClient(get_offline_token()).add_operator_bundle_to_cluster(cluster_id, bundle_name).to_str() + return InventoryClient(get_access_token()).add_operator_bundle_to_cluster(cluster_id, bundle_name).to_str() @mcp.tool() def set_host_role(host_id: str, infraenv_id: str, role: str) -> str: @@ -263,7 +297,7 @@ def set_host_role(host_id: str, infraenv_id: str, role: str) -> str: str: A formatted string containing the updated host configuration showing the newly assigned role. """ - return InventoryClient(get_offline_token()).update_host(host_id, infraenv_id, host_role=role).to_str() + return InventoryClient(get_access_token()).update_host(host_id, infraenv_id, host_role=role).to_str() if __name__ == "__main__": mcp.run(transport="sse") diff --git a/service_client/assisted_service_api.py b/service_client/assisted_service_api.py index 4e7d36e..16d920b 100644 --- a/service_client/assisted_service_api.py +++ b/service_client/assisted_service_api.py @@ -9,27 +9,15 @@ from service_client.logger import log class InventoryClient(object): - def __init__(self, offline_token: str): + def __init__(self, access_token: str): + self.access_token = access_token + self.pull_secret = self._get_pull_secret() self.inventory_url = os.environ.get("INVENTORY_URL", "https://api.openshift.com/api/assisted-install/v2") - self.offline_token = offline_token - self.access_token = self._get_access_token(self.offline_token) - self.pull_secret = self._get_pull_secret(self.access_token) self.client_debug = os.environ.get("CLIENT_DEBUG", "False").lower() == "true" - def _get_access_token(self, offline_token: str) -> str: - params = { - "client_id": "cloud-services", - "grant_type": "refresh_token", - "refresh_token": offline_token, - } - sso_url = os.environ.get("SSO_URL", "https://sso.redhat.com/auth/realms/redhat-external/protocol/openid-connect/token") - response = requests.post(sso_url, data=params) - response.raise_for_status() - return response.json()["access_token"] - - def _get_pull_secret(self, access_token: str) -> str: + def _get_pull_secret(self) -> str: url = os.environ.get("PULL_SECRET_URL", "https://api.openshift.com/api/accounts_mgmt/v1/access_token") - headers = {"Authorization": f"Bearer {access_token}"} + headers = {"Authorization": f"Bearer {self.access_token}"} response = requests.post(url, headers=headers) response.raise_for_status() return response.text