Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
156 changes: 139 additions & 17 deletions server.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@
import json
import os
import asyncio
from typing import Optional, Any
from typing import Any, Annotated

import requests
import uvicorn
from pydantic import Field
Comment on lines +11 to +15
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

🧩 Analysis chain

Validate runtime supports Annotated/| unions and Pydantic annotations

This file now relies on Python 3.10+ (str | None) and Pydantic support for Annotated[...] fields. Please confirm CI/runtime versions to avoid runtime/type-model mismatches.

Run:


🏁 Script executed:

#!/bin/bash
# Verify Python and Pydantic versions declared in repo
fd -a 'pyproject.toml|setup.cfg|setup.py|requirements*.txt|Pipfile|poetry.lock|.tool-versions|.python-version|Dockerfile*' -HI \
| sed 's/^/--- /'
echo
rg -n -C2 -e 'python_requires|requires-python|tool.poetry.dependencies.*python|FROM .*python|pydantic\s*[<=>]' \
  pyproject.toml setup.cfg setup.py requirements*.txt Pipfile poetry.lock .tool-versions .python-version Dockerfile* 2>/dev/null || true

Length of output: 485


Align Python runtime and dependency versions

  • Mismatch: pyproject.toml declares requires-python = ">=3.13" (pyproject.toml:6) but Dockerfile uses a Python 3.11 base image (Dockerfile:1) — update the Dockerfile or lower the declared Python requirement so they match.
  • server.py uses typing.Annotated and PEP‑604 unions (str | None) — ensure CI/runtime >=3.10 and consistent with the declared version.
  • No explicit Pydantic pin found in scanned manifests — add/pin a Pydantic version that supports your Annotated usage and verify CI installs that version.
  • Verify .python-version (repo root) and any CI images/runner configs also match the chosen target.

from assisted_service_client import models
from mcp.server.fastmcp import FastMCP

Expand Down Expand Up @@ -134,7 +135,14 @@ def get_access_token() -> str:

@mcp.tool()
@track_tool_usage()
async def cluster_info(cluster_id: str) -> str:
async def cluster_info(
cluster_id: Annotated[
str,
Field(
description="The unique identifier of the cluster to retrieve information for. This is typically a UUID string."
),
],
) -> str:
"""
Get comprehensive information about a specific assisted installer cluster.

Expand Down Expand Up @@ -195,7 +203,12 @@ async def list_clusters() -> str:

@mcp.tool()
@track_tool_usage()
async def cluster_events(cluster_id: str) -> str:
async def cluster_events(
cluster_id: Annotated[
str,
Field(description="The unique identifier of the cluster to get events for."),
],
) -> str:
"""
Get the events related to a cluster with the given cluster id.

Expand All @@ -219,7 +232,18 @@ async def cluster_events(cluster_id: str) -> str:

@mcp.tool()
@track_tool_usage()
async def host_events(cluster_id: str, host_id: str) -> str:
async def host_events(
cluster_id: Annotated[
str,
Field(description="The unique identifier of the cluster containing the host."),
],
host_id: Annotated[
str,
Field(
description="The unique identifier of the specific host to get events for."
),
],
) -> str:
"""
Get events specific to a particular host within a cluster.

Expand All @@ -245,7 +269,14 @@ async def host_events(cluster_id: str, host_id: str) -> str:

@mcp.tool()
@track_tool_usage()
async def cluster_iso_download_url(cluster_id: str) -> str:
async def cluster_iso_download_url(
cluster_id: Annotated[
str,
Field(
description="The unique identifier of the cluster, whose ISO image URL has to be retrieved."
),
],
) -> str:
"""
Get ISO download URL(s) for a cluster.

Expand Down Expand Up @@ -304,12 +335,36 @@ async def cluster_iso_download_url(cluster_id: str) -> str:
@mcp.tool()
@track_tool_usage()
async def create_cluster( # pylint: disable=too-many-arguments,too-many-positional-arguments
name: str,
version: str,
base_domain: str,
single_node: bool,
ssh_public_key: Optional[str] = None,
cpu_architecture: Optional[str] = "x86_64",
name: Annotated[str, Field(description="The name of the new cluster.")],
version: Annotated[
str,
Field(
description="The OpenShift version to install (e.g., '4.18.2', '4.17.1')."
),
],
base_domain: Annotated[
str,
Field(
description="The base DNS domain for the cluster (e.g., 'example.com'). The cluster will be accessible at api.<name>.<base_domain>."
),
],
single_node: Annotated[
bool,
Field(
description="Whether to create a single-node cluster.Set to True for edge deployments or resource-constrained environments. Set to False for production high-availability clusters with multiple control plane nodes."
),
],
ssh_public_key: Annotated[
str | None,
Field(default=None, description="SSH public key for accessing cluster nodes."),
] = None,
cpu_architecture: Annotated[
str,
Field(
default="x86_64",
description="The CPU architecture for the cluster. Defaults to 'x86_64' if not specified. Valid options are: x86_64, aarch64, arm64, ppc64le, s390x.",
),
] = "x86_64",
) -> str:
"""
Create a new OpenShift cluster.
Expand Down Expand Up @@ -388,7 +443,23 @@ async def create_cluster( # pylint: disable=too-many-arguments,too-many-positio

@mcp.tool()
@track_tool_usage()
async def set_cluster_vips(cluster_id: str, api_vip: str, ingress_vip: str) -> str:
async def set_cluster_vips(
cluster_id: Annotated[
str, Field(description="The unique identifier of the cluster to configure.")
],
api_vip: Annotated[
str,
Field(
description="The IP address for the cluster API endpoint. This is where kubectl and other management tools will connect."
),
],
ingress_vip: Annotated[
str,
Field(
description="The IP address for ingress traffic to applications running in the cluster."
),
],
) -> str:
"""
Configure the virtual IP addresses (VIPs) for cluster API and ingress traffic.

Expand Down Expand Up @@ -423,7 +494,11 @@ async def set_cluster_vips(cluster_id: str, api_vip: str, ingress_vip: str) -> s

@mcp.tool()
@track_tool_usage()
async def install_cluster(cluster_id: str) -> str:
async def install_cluster(
cluster_id: Annotated[
str, Field(description="The unique identifier of the cluster to install.")
],
) -> str:
"""
Trigger the installation process for a prepared cluster.

Expand Down Expand Up @@ -494,7 +569,17 @@ async def list_operator_bundles() -> str:

@mcp.tool()
@track_tool_usage()
async def add_operator_bundle_to_cluster(cluster_id: str, bundle_name: str) -> str:
async def add_operator_bundle_to_cluster(
cluster_id: Annotated[
str, Field(description="The unique identifier of the cluster to configure.")
],
bundle_name: Annotated[
str,
Field(
description="The name of the operator bundle to add. The available operator bundle names are 'virtualization' and 'openshift-ai'"
),
],
) -> str:
"""
Add an operator bundle to be installed with the cluster.

Expand Down Expand Up @@ -522,7 +607,20 @@ async def add_operator_bundle_to_cluster(cluster_id: str, bundle_name: str) -> s

@mcp.tool()
@track_tool_usage()
async def cluster_credentials_download_url(cluster_id: str, file_name: str) -> str:
async def cluster_credentials_download_url(
cluster_id: Annotated[
str,
Field(
description="The unique identifier of the cluster to get credentials for."
),
],
file_name: Annotated[
str,
Field(
description="The type of credential file to download. Valid options are: kubeconfig (Standard kubeconfig file for cluster access), kubeconfig-noingress (Kubeconfig without ingress configuration), kubeadmin-password (The kubeadmin user password file)."
),
],
) -> str:
Comment on lines +610 to +623
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Do not log presigned URLs; restrict file_name to an enum

  • Logging result leaks a time‑limited secret URL into logs. Remove it.
  • Constrain file_name to a Literal[...] so tools/LLMs choose valid options.
- from typing import Any, Annotated
+ from typing import Any, Annotated, Literal
@@
-    file_name: Annotated[
-        str,
+    file_name: Annotated[
+        Literal["kubeconfig", "kubeconfig-noingress", "kubeadmin-password"],
         Field(
             description="The type of credential file to download. Valid options are: kubeconfig (Standard kubeconfig file for cluster access), kubeconfig-noingress (Kubeconfig without ingress configuration), kubeadmin-password (The kubeadmin user password file)."
         ),
     ],
@@
-    log.info(
-        "Successfully retrieved presigned URL for cluster %s credentials file %s - %s",
-        cluster_id,
-        file_name,
-        result,
-    )
+    log.info(
+        "Successfully retrieved presigned URL for cluster %s credentials file %s",
+        cluster_id,
+        file_name,
+    )

Also applies to: 657-662

🤖 Prompt for AI Agents
In server.py around lines 610-623 (and similarly around 657-662), the handler
currently accepts file_name as a free string and logs the presigned URL result;
replace the file_name Annotated[str, Field(...)] with an
Annotated[Literal["kubeconfig","kubeconfig-noingress","kubeadmin-password"],
Field(...)] to constrain allowed values, and remove any logging statements that
record or print the generated presigned URL (result) so the time-limited secret
is not written to logs; ensure validation/error handling still returns safe
messages without exposing the URL.

"""
Get presigned download URL for cluster credential files.

Expand Down Expand Up @@ -605,7 +703,21 @@ async def _get_cluster_infra_env_id(client: InventoryClient, cluster_id: str) ->

@mcp.tool()
@track_tool_usage()
async def set_host_role(host_id: str, cluster_id: str, role: str) -> str:
async def set_host_role(
host_id: Annotated[
str, Field(description="The unique identifier of the host to configure.")
],
cluster_id: Annotated[
str,
Field(description="The unique identifier of the cluster containing the host."),
],
role: Annotated[
str,
Field(
description="The role to assign to the host. Valid options are: auto-assign (Let the installer automatically determine the role), master (Control plane node - API server, etcd, scheduler), worker (Compute node for running application workloads)."
),
],
) -> str:
"""
Assign a specific role to a discovered host in the cluster.

Expand Down Expand Up @@ -643,7 +755,17 @@ async def set_host_role(host_id: str, cluster_id: str, role: str) -> str:

@mcp.tool()
@track_tool_usage()
async def set_cluster_ssh_key(cluster_id: str, ssh_public_key: str) -> str:
async def set_cluster_ssh_key(
cluster_id: Annotated[
str, Field(description="The unique identifier of the cluster to update.")
],
ssh_public_key: Annotated[
str,
Field(
description="The SSH public key to set for the cluster. This should be a valid SSH public key in OpenSSH format."
),
],
) -> str:
"""
Set or update the SSH public key for a cluster.

Expand Down