Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 16 additions & 17 deletions server.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
from assisted_service_client import models
from mcp.server.fastmcp import FastMCP


from service_client import InventoryClient, metrics, track_tool_usage, initiate_metrics
from service_client.logger import log

Expand Down Expand Up @@ -89,7 +88,7 @@ def get_offline_token() -> str:
raise RuntimeError("No offline token found in environment or request headers")


def get_access_token() -> str:
async def get_access_token() -> str:
"""
Retrieve the access token.

Expand Down Expand Up @@ -125,7 +124,7 @@ def get_access_token() -> str:
"SSO_URL",
"https://sso.redhat.com/auth/realms/redhat-external/protocol/openid-connect/token",
)
response = requests.post(sso_url, data=params, timeout=30)
response = await asyncio.to_thread(requests.post, sso_url, data=params, timeout=30)
response.raise_for_status()
log.debug("Successfully generated new access token")
return response.json()["access_token"]
Expand All @@ -152,7 +151,7 @@ async def cluster_info(cluster_id: str) -> str:
- Host information and roles
"""
log.info("Retrieving cluster information for cluster_id: %s", cluster_id)
client = InventoryClient(get_access_token())
client = InventoryClient(await get_access_token())
result = await client.get_cluster(cluster_id=cluster_id)
log.info("Successfully retrieved cluster information for %s", cluster_id)
return result.to_str()
Expand All @@ -177,7 +176,7 @@ async def list_clusters() -> str:
- status (str): Current cluster status (e.g., 'ready', 'installing', 'error')
"""
log.info("Retrieving list of all clusters")
client = InventoryClient(get_access_token())
client = InventoryClient(await get_access_token())
clusters = await client.list_clusters()
resp = [
{
Expand Down Expand Up @@ -210,7 +209,7 @@ async def cluster_events(cluster_id: str) -> str:
event types, and descriptive messages about cluster activities.
"""
log.info("Retrieving events for cluster_id: %s", cluster_id)
client = InventoryClient(get_access_token())
client = InventoryClient(await get_access_token())
result = await client.get_events(cluster_id=cluster_id)
log.info("Successfully retrieved events for cluster %s", cluster_id)
return result
Expand All @@ -234,7 +233,7 @@ async def host_events(cluster_id: str, host_id: str) -> str:
hardware validation results, installation steps, and error messages.
"""
log.info("Retrieving events for host %s in cluster %s", host_id, cluster_id)
client = InventoryClient(get_access_token())
client = InventoryClient(await get_access_token())
result = await client.get_events(cluster_id=cluster_id, host_id=host_id)
log.info(
"Successfully retrieved events for host %s in cluster %s", host_id, cluster_id
Expand All @@ -260,7 +259,7 @@ async def cluster_iso_download_url(cluster_id: str) -> str:
}]
"""
log.info("Retrieving InfraEnv ISO URLs for cluster_id: %s", cluster_id)
client = InventoryClient(get_access_token())
client = InventoryClient(await get_access_token())
infra_envs = await client.list_infra_envs(cluster_id)

if not infra_envs:
Expand Down Expand Up @@ -354,7 +353,7 @@ async def create_cluster( # pylint: disable=too-many-arguments,too-many-positio
cpu_architecture,
ssh_public_key is not None,
)
client = InventoryClient(get_access_token())
client = InventoryClient(await get_access_token())

# Prepare cluster parameters
cluster_params = {
Expand Down Expand Up @@ -417,7 +416,7 @@ async def set_cluster_vips(cluster_id: str, api_vip: str, ingress_vip: str) -> s
api_vip,
ingress_vip,
)
client = InventoryClient(get_access_token())
client = InventoryClient(await get_access_token())
result = await client.update_cluster(
cluster_id, api_vip=api_vip, ingress_vip=ingress_vip
)
Expand Down Expand Up @@ -449,7 +448,7 @@ async def install_cluster(cluster_id: str) -> str:
- All cluster validations pass
"""
log.info("Initiating installation for cluster_id: %s", cluster_id)
client = InventoryClient(get_access_token())
client = InventoryClient(await get_access_token())
result = await client.install_cluster(cluster_id)
log.info("Successfully triggered installation for cluster %s", cluster_id)
return result.to_str()
Expand All @@ -470,7 +469,7 @@ async def list_versions() -> str:
including version numbers, release dates, and support status.
"""
log.info("Retrieving available OpenShift versions")
client = InventoryClient(get_access_token())
client = InventoryClient(await get_access_token())
result = await client.get_openshift_versions(True)
log.info("Successfully retrieved OpenShift versions")
return json.dumps(result)
Expand All @@ -490,7 +489,7 @@ async def list_operator_bundles() -> str:
including bundle names, descriptions, and operator details.
"""
log.info("Retrieving available operator bundles")
client = InventoryClient(get_access_token())
client = InventoryClient(await get_access_token())
result = await client.get_operator_bundles()
log.info("Successfully retrieved %s operator bundles", len(result))
return json.dumps(result)
Expand All @@ -516,7 +515,7 @@ async def add_operator_bundle_to_cluster(cluster_id: str, bundle_name: str) -> s
showing the newly added operator bundle.
"""
log.info("Adding operator bundle '%s' to cluster %s", bundle_name, cluster_id)
client = InventoryClient(get_access_token())
client = InventoryClient(await get_access_token())
result = await client.add_operator_bundle_to_cluster(cluster_id, bundle_name)
log.info(
"Successfully added operator bundle '%s' to cluster %s", bundle_name, cluster_id
Expand Down Expand Up @@ -558,7 +557,7 @@ async def cluster_credentials_download_url(cluster_id: str, file_name: str) -> s
cluster_id,
file_name,
)
client = InventoryClient(get_access_token())
client = InventoryClient(await get_access_token())
result = await client.get_presigned_for_cluster_credentials(cluster_id, file_name)
log.info(
"Successfully retrieved presigned URL for cluster %s credentials file %s - %s",
Expand Down Expand Up @@ -592,7 +591,7 @@ async def set_host_role(host_id: str, infraenv_id: str, role: str) -> str:
showing the newly assigned role.
"""
log.info("Setting role '%s' for host %s in InfraEnv %s", role, host_id, infraenv_id)
client = InventoryClient(get_access_token())
client = InventoryClient(await get_access_token())
result = await client.update_host(host_id, infraenv_id, host_role=role)
log.info("Successfully set role '%s' for host %s", role, host_id)
return result.to_str()
Expand All @@ -618,7 +617,7 @@ async def set_cluster_ssh_key(cluster_id: str, ssh_public_key: str) -> str:
str: A formatted string containing the updated cluster configuration.
"""
log.info("Setting SSH public key for cluster %s", cluster_id)
client = InventoryClient(get_access_token())
client = InventoryClient(await get_access_token())

# Update the cluster with the new SSH public key
result = await client.update_cluster(cluster_id, ssh_public_key=ssh_public_key)
Expand Down
20 changes: 10 additions & 10 deletions service_client/assisted_service_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,11 @@ def __init__(self, access_token: str):
)
self.client_debug = os.environ.get("CLIENT_DEBUG", "False").lower() == "true"

@property
def pull_secret(self) -> str:
async def get_pull_secret(self) -> str:
"""Lazy-load the pull secret when first accessed."""
if self._pull_secret is None:
self._pull_secret = self._get_pull_secret()
return self._pull_secret
if self._pull_secret is not None:
return self._pull_secret

def _get_pull_secret(self) -> str:
url = os.environ.get(
"PULL_SECRET_URL",
"https://api.openshift.com/api/accounts_mgmt/v1/access_token",
Expand All @@ -56,10 +53,13 @@ def _get_pull_secret(self) -> str:

try:
log.info("Fetching pull secret from %s", url)
response = requests.post(url, headers=headers, timeout=30)
response = await asyncio.to_thread(
requests.post, url, headers=headers, timeout=30
)
response.raise_for_status()
log.info("Successfully fetched pull secret")
return response.text
self._pull_secret = response.text
return self._pull_secret
except RequestException as e:
log.error("Error while fetching pull secret from %s: %s", url, str(e))
raise
Expand Down Expand Up @@ -242,7 +242,7 @@ async def create_cluster(
params = models.ClusterCreateParams(
name=name,
openshift_version=version,
pull_secret=self.pull_secret,
pull_secret=await self.get_pull_secret(),
**cluster_params,
)
log.info(
Expand Down Expand Up @@ -272,7 +272,7 @@ async def create_infra_env(
models.InfraEnv: The created infrastructure environment object.
"""
infra_env = models.InfraEnvCreateParams(
name=name, pull_secret=self.pull_secret, **infra_env_params
name=name, pull_secret=await self.get_pull_secret(), **infra_env_params
)
log.info("Creating infrastructure environment '%s'", name)
result = await asyncio.to_thread(
Expand Down
2 changes: 1 addition & 1 deletion service_client/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ async def wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
)
raise AssistedServiceAPIError(f"API error: Status {e.status}") from e
except Exception as e:
log.error("Unexpected error during %s: %s", operation_name, str(e))
log.exception("Unexpected error during %s: %s", operation_name, str(e))
raise AssistedServiceAPIError("An internal error occurred") from e

return wrapper
63 changes: 39 additions & 24 deletions service_client/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
import os
import re
import sys
import atexit
import queue
from logging.handlers import QueueHandler, QueueListener


class SensitiveFormatter(logging.Formatter):
Expand Down Expand Up @@ -87,33 +90,18 @@ def get_logging_level() -> int:
logging.getLogger("asyncio").setLevel(logging.ERROR)


def add_log_file_handler(logger: logging.Logger, filename: str) -> logging.FileHandler:
"""
Add a file handler to the logger with sensitive information filtering.

Args:
logger: The logger instance to add the handler to.
filename: The path to the log file.

Returns:
logging.FileHandler: The created file handler.
"""
def _create_file_handler(filename: str) -> logging.FileHandler:
"""Create a file handler with sensitive formatting."""
fh = logging.FileHandler(filename)
fh.setFormatter(SensitiveFormatter())
logger.addHandler(fh)
return fh


def add_stream_handler(logger: logging.Logger) -> None:
"""
Add a stream handler to the logger with sensitive information filtering.

Args:
logger: The logger instance to add the handler to.
"""
def _create_stream_handler() -> logging.StreamHandler:
"""Create a stream handler to stderr with sensitive formatting."""
ch = logging.StreamHandler(sys.stderr)
ch.setFormatter(SensitiveFormatter())
logger.addHandler(ch)
return ch


logger_name = os.environ.get("LOGGER_NAME", "")
Expand All @@ -129,9 +117,36 @@ def add_stream_handler(logger: logging.Logger) -> None:
# Check if we should log to file (default: True, set to False in containers)
log_to_file = os.environ.get("LOG_TO_FILE", "true").lower() == "true"

# Configure non-blocking logging via a Queue
_log_queue: queue.Queue[logging.LogRecord] = queue.Queue()

_handlers: list[logging.Handler] = []
if log_to_file:
add_log_file_handler(log, "assisted-service-mcp.log")
add_log_file_handler(urllib3_logger, "assisted-service-mcp.log")
_handlers.append(_create_file_handler("assisted-service-mcp.log"))
_handlers.append(_create_stream_handler())

# Start a single listener that will process records on a background thread
_queue_listener = QueueListener(_log_queue, *_handlers, respect_handler_level=True)
_queue_listener.start()

# Attach QueueHandler to our loggers
_queue_handler = QueueHandler(_log_queue)

# Avoid duplicate propagation if root logger is used
log.handlers = [_queue_handler] if _queue_handler not in log.handlers else []
log.propagate = False

urllib3_logger.handlers = (
[_queue_handler] if _queue_handler not in urllib3_logger.handlers else []
)
urllib3_logger.propagate = False


def _stop_queue_listener() -> None:
try:
_queue_listener.stop()
except Exception: # noqa: BLE001 - best effort stop at exit
pass


add_stream_handler(log)
add_stream_handler(urllib3_logger)
atexit.register(_stop_queue_listener)
Loading