Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 47 additions & 13 deletions server.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
from mcp.server.fastmcp import FastMCP
import json
import os
import requests

from service_client import InventoryClient

mcp = FastMCP("AssistedService", host="0.0.0.0")

def get_offline_token():
def get_offline_token() -> str:
"""Retrieve the offline token from environment variables or request headers.

This function attempts to get the Red Hat OpenShift Cluster Manager (OCM) offline token
Expand All @@ -31,6 +32,39 @@ def get_offline_token():

raise RuntimeError("No offline token found in environment or request headers")

def get_access_token() -> str:
"""Retrieve the access token.

This function tries to get the Red Hat OpenShift Cluster Manager (OCM) access token. First
it tries to extract it from the authorization header, and if it isn't there then it tries
to generate a new one using the offline token.

Returns:
str: The access token.

Raises:
RuntimeError: If it isn't possible to obtain or generate the access token.
"""
# First try to get the token from the authorization header:
request = mcp.get_context().request_context.request
if request is not None:
header = request.headers.get("Authorization")
if header is not None:
parts = header.split()
if len(parts) == 2 and parts[0].lower() == "bearer":
return parts[1]

# Now try to get the offline token, and generate a new access token from it:
params = {
"client_id": "cloud-services",
"grant_type": "refresh_token",
"refresh_token": get_offline_token(),
}
sso_url = os.environ.get("SSO_URL", "https://sso.redhat.com/auth/realms/redhat-external/protocol/openid-connect/token")
response = requests.post(sso_url, data=params)
response.raise_for_status()
return response.json()["access_token"]

@mcp.tool()
def cluster_info(cluster_id: str) -> str:
"""Get comprehensive information about a specific assisted installer cluster.
Expand All @@ -49,7 +83,7 @@ def cluster_info(cluster_id: str) -> str:
- Network configuration (VIPs, subnets)
- Host information and roles
"""
return InventoryClient(get_offline_token()).get_cluster(cluster_id=cluster_id).to_str()
return InventoryClient(get_access_token()).get_cluster(cluster_id=cluster_id).to_str()

@mcp.tool()
def list_clusters() -> str:
Expand All @@ -67,7 +101,7 @@ def list_clusters() -> str:
- openshift_version (str): The OpenShift version being installed
- status (str): Current cluster status (e.g., 'ready', 'installing', 'error')
"""
clusters = InventoryClient(get_offline_token()).list_clusters()
clusters = InventoryClient(get_access_token()).list_clusters()
resp = [{"name": cluster["name"], "id": cluster["id"], "openshift_version": cluster["openshift_version"], "status": cluster["status"]} for cluster in clusters]
return json.dumps(resp)

Expand All @@ -86,7 +120,7 @@ def cluster_events(cluster_id: str) -> str:
str: A JSON-formatted string containing cluster events with timestamps,
event types, and descriptive messages about cluster activities.
"""
return InventoryClient(get_offline_token()).get_events(cluster_id=cluster_id)
return InventoryClient(get_access_token()).get_events(cluster_id=cluster_id)

@mcp.tool()
def host_events(cluster_id: str, host_id: str) -> str:
Expand All @@ -103,7 +137,7 @@ def host_events(cluster_id: str, host_id: str) -> str:
str: A JSON-formatted string containing host-specific events including
hardware validation results, installation steps, and error messages.
"""
return InventoryClient(get_offline_token()).get_events(cluster_id=cluster_id, host_id=host_id)
return InventoryClient(get_access_token()).get_events(cluster_id=cluster_id, host_id=host_id)

@mcp.tool()
def infraenv_info(infraenv_id: str) -> str:
Expand All @@ -124,7 +158,7 @@ def infraenv_info(infraenv_id: str) -> str:
- Associated cluster information
- Static network configuration if applicable
"""
return InventoryClient(get_offline_token()).get_infra_env(infraenv_id).to_str()
return InventoryClient(get_access_token()).get_infra_env(infraenv_id).to_str()

@mcp.tool()
def create_cluster(name: str, version: str, base_domain: str, single_node: bool) -> str:
Expand All @@ -148,7 +182,7 @@ def create_cluster(name: str, version: str, base_domain: str, single_node: bool)
- cluster_id (str): The unique identifier of the created cluster
- infraenv_id (str): The unique identifier of the created InfraEnv
"""
client = InventoryClient(get_offline_token())
client = InventoryClient(get_access_token())
cluster = client.create_cluster(name, version, single_node, base_dns_domain=base_domain)
infraenv = client.create_infra_env(name, cluster_id=cluster.id, openshift_version=cluster.openshift_version)
return json.dumps({'cluster_id': cluster.id, 'infraenv_id': infraenv.id})
Expand All @@ -172,7 +206,7 @@ def set_cluster_vips(cluster_id: str, api_vip: str, ingress_vip: str) -> str:
str: A formatted string containing the updated cluster configuration
showing the newly set VIP addresses.
"""
return InventoryClient(get_offline_token()).update_cluster(cluster_id, api_vip=api_vip, ingress_vip=ingress_vip).to_str()
return InventoryClient(get_access_token()).update_cluster(cluster_id, api_vip=api_vip, ingress_vip=ingress_vip).to_str()

@mcp.tool()
def install_cluster(cluster_id: str) -> str:
Expand All @@ -195,7 +229,7 @@ def install_cluster(cluster_id: str) -> str:
- Network configuration is complete (VIPs set if required)
- All cluster validations pass
"""
return InventoryClient(get_offline_token()).install_cluster(cluster_id).to_str()
return InventoryClient(get_access_token()).install_cluster(cluster_id).to_str()

@mcp.tool()
def list_versions() -> str:
Expand All @@ -209,7 +243,7 @@ def list_versions() -> str:
str: A JSON string containing available OpenShift versions with metadata
including version numbers, release dates, and support status.
"""
return json.dumps(InventoryClient(get_offline_token()).get_openshift_versions(True))
return json.dumps(InventoryClient(get_access_token()).get_openshift_versions(True))

@mcp.tool()
def list_operator_bundles() -> str:
Expand All @@ -223,7 +257,7 @@ def list_operator_bundles() -> str:
str: A JSON string containing available operator bundles with metadata
including bundle names, descriptions, and operator details.
"""
return json.dumps(InventoryClient(get_offline_token()).get_operator_bundles())
return json.dumps(InventoryClient(get_access_token()).get_operator_bundles())

@mcp.tool()
def add_operator_bundle_to_cluster(cluster_id: str, bundle_name: str) -> str:
Expand All @@ -242,7 +276,7 @@ def add_operator_bundle_to_cluster(cluster_id: str, bundle_name: str) -> str:
str: A formatted string containing the updated cluster configuration
showing the newly added operator bundle.
"""
return InventoryClient(get_offline_token()).add_operator_bundle_to_cluster(cluster_id, bundle_name).to_str()
return InventoryClient(get_access_token()).add_operator_bundle_to_cluster(cluster_id, bundle_name).to_str()

@mcp.tool()
def set_host_role(host_id: str, infraenv_id: str, role: str) -> str:
Expand All @@ -263,7 +297,7 @@ def set_host_role(host_id: str, infraenv_id: str, role: str) -> str:
str: A formatted string containing the updated host configuration
showing the newly assigned role.
"""
return InventoryClient(get_offline_token()).update_host(host_id, infraenv_id, host_role=role).to_str()
return InventoryClient(get_access_token()).update_host(host_id, infraenv_id, host_role=role).to_str()

if __name__ == "__main__":
mcp.run(transport="sse")
22 changes: 5 additions & 17 deletions service_client/assisted_service_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,27 +9,15 @@
from service_client.logger import log

class InventoryClient(object):
def __init__(self, offline_token: str):
def __init__(self, access_token: str):
self.access_token = access_token
self.pull_secret = self._get_pull_secret()
self.inventory_url = os.environ.get("INVENTORY_URL", "https://api.openshift.com/api/assisted-install/v2")
self.offline_token = offline_token
self.access_token = self._get_access_token(self.offline_token)
self.pull_secret = self._get_pull_secret(self.access_token)
self.client_debug = os.environ.get("CLIENT_DEBUG", "False").lower() == "true"

def _get_access_token(self, offline_token: str) -> str:
params = {
"client_id": "cloud-services",
"grant_type": "refresh_token",
"refresh_token": offline_token,
}
sso_url = os.environ.get("SSO_URL", "https://sso.redhat.com/auth/realms/redhat-external/protocol/openid-connect/token")
response = requests.post(sso_url, data=params)
response.raise_for_status()
return response.json()["access_token"]

def _get_pull_secret(self, access_token: str) -> str:
def _get_pull_secret(self) -> str:
url = os.environ.get("PULL_SECRET_URL", "https://api.openshift.com/api/accounts_mgmt/v1/access_token")
headers = {"Authorization": f"Bearer {access_token}"}
headers = {"Authorization": f"Bearer {self.access_token}"}
response = requests.post(url, headers=headers)
response.raise_for_status()
return response.text
Expand Down