diff --git a/Dockerfile b/Dockerfile index dcad5fe..37027a7 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM registry.redhat.io/ubi9/python-311:9.6 +FROM registry.access.redhat.com/ubi9/python-311:9.6 ENV APP_HOME=/opt/app-root/src WORKDIR ${APP_HOME} @@ -18,6 +18,9 @@ RUN chown -R 1001:0 ${APP_HOME} USER 1001 +# Disable file logging in containers - only log to stderr +ENV LOG_TO_FILE=false + EXPOSE 8000 CMD ["uv", "--cache-dir", "/tmp/uv-cache", "run", "server.py"] diff --git a/Makefile b/Makefile index 7f2759b..a8a9908 100644 --- a/Makefile +++ b/Makefile @@ -11,7 +11,7 @@ push: .PHONY: run run: - podman run --rm -p 8000:8000 $(IMAGE_NAME):$(TAG) + podman run --rm -p 127.0.0.1:8000:8000 $(IMAGE_NAME):$(TAG) .PHONY: run-local run-local: diff --git a/server.py b/server.py index 0508074..efccd48 100644 --- a/server.py +++ b/server.py @@ -12,6 +12,7 @@ from mcp.server.fastmcp import FastMCP from service_client import InventoryClient +from service_client.logger import log mcp = FastMCP("AssistedService", host="0.0.0.0") @@ -32,16 +33,20 @@ def get_offline_token() -> str: RuntimeError: If no offline token is found in either environment variables or request headers. """ + log.debug("Attempting to retrieve offline token") token = os.environ.get("OFFLINE_TOKEN") if token: + log.debug("Found offline token in environment variables") return token request = mcp.get_context().request_context.request if request is not None: token = request.headers.get("OCM-Offline-Token") if token: + log.debug("Found offline token in request headers") return token + log.error("No offline token found in environment or request headers") raise RuntimeError("No offline token found in environment or request headers") @@ -59,6 +64,7 @@ def get_access_token() -> str: Raises: RuntimeError: If it isn't possible to obtain or generate the access token. """ + log.debug("Attempting to retrieve access token") # First try to get the token from the authorization header: request = mcp.get_context().request_context.request if request is not None: @@ -66,9 +72,11 @@ def get_access_token() -> str: if header is not None: parts = header.split() if len(parts) == 2 and parts[0].lower() == "bearer": + log.debug("Found access token in authorization header") return parts[1] # Now try to get the offline token, and generate a new access token from it: + log.debug("Generating new access token from offline token") params = { "client_id": "cloud-services", "grant_type": "refresh_token", @@ -80,6 +88,7 @@ def get_access_token() -> str: ) response = requests.post(sso_url, data=params, timeout=30) response.raise_for_status() + log.debug("Successfully generated new access token") return response.json()["access_token"] @@ -102,8 +111,10 @@ async def cluster_info(cluster_id: str) -> str: - Network configuration (VIPs, subnets) - Host information and roles """ + log.info("Retrieving cluster information for cluster_id: %s", cluster_id) client = InventoryClient(get_access_token()) result = await client.get_cluster(cluster_id=cluster_id) + log.info("Successfully retrieved cluster information for %s", cluster_id) return result.to_str() @@ -124,6 +135,7 @@ async def list_clusters() -> str: - openshift_version (str): The OpenShift version being installed - status (str): Current cluster status (e.g., 'ready', 'installing', 'error') """ + log.info("Retrieving list of all clusters") client = InventoryClient(get_access_token()) clusters = await client.list_clusters() resp = [ @@ -135,6 +147,7 @@ async def list_clusters() -> str: } for cluster in clusters ] + log.info("Successfully retrieved %s clusters", len(resp)) return json.dumps(resp) @@ -154,8 +167,11 @@ async def cluster_events(cluster_id: str) -> str: str: A JSON-formatted string containing cluster events with timestamps, event types, and descriptive messages about cluster activities. """ + log.info("Retrieving events for cluster_id: %s", cluster_id) client = InventoryClient(get_access_token()) - return await client.get_events(cluster_id=cluster_id) + result = await client.get_events(cluster_id=cluster_id) + log.info("Successfully retrieved events for cluster %s", cluster_id) + return result @mcp.tool() @@ -174,8 +190,13 @@ async def host_events(cluster_id: str, host_id: str) -> str: str: A JSON-formatted string containing host-specific events including hardware validation results, installation steps, and error messages. """ + log.info("Retrieving events for host %s in cluster %s", host_id, cluster_id) client = InventoryClient(get_access_token()) - return await client.get_events(cluster_id=cluster_id, host_id=host_id) + result = await client.get_events(cluster_id=cluster_id, host_id=host_id) + log.info( + "Successfully retrieved events for host %s in cluster %s", host_id, cluster_id + ) + return result @mcp.tool() @@ -198,8 +219,10 @@ async def infraenv_info(infraenv_id: str) -> str: - Associated cluster information - Static network configuration if applicable """ + log.info("Retrieving InfraEnv information for infraenv_id: %s", infraenv_id) client = InventoryClient(get_access_token()) result = await client.get_infra_env(infraenv_id) + log.info("Successfully retrieved InfraEnv information for %s", infraenv_id) return result.to_str() @@ -228,13 +251,26 @@ async def create_cluster( - cluster_id (str): The unique identifier of the created cluster - infraenv_id (str): The unique identifier of the created InfraEnv """ + log.info( + "Creating cluster: name=%s, version=%s, base_domain=%s, single_node=%s", + name, + version, + base_domain, + single_node, + ) client = InventoryClient(get_access_token()) cluster = await client.create_cluster( name, version, single_node, base_dns_domain=base_domain, tags="chatbot" ) + log.info("Successfully created cluster %s with ID: %s", name, cluster.id) infraenv = await client.create_infra_env( name, cluster_id=cluster.id, openshift_version=cluster.openshift_version ) + log.info( + "Successfully created InfraEnv for cluster %s with ID: %s", + cluster.id, + infraenv.id, + ) return json.dumps({"cluster_id": cluster.id, "infraenv_id": infraenv.id}) @@ -258,10 +294,17 @@ async def set_cluster_vips(cluster_id: str, api_vip: str, ingress_vip: str) -> s str: A formatted string containing the updated cluster configuration showing the newly set VIP addresses. """ + log.info( + "Setting VIPs for cluster %s: api_vip=%s, ingress_vip=%s", + cluster_id, + api_vip, + ingress_vip, + ) client = InventoryClient(get_access_token()) result = await client.update_cluster( cluster_id, api_vip=api_vip, ingress_vip=ingress_vip ) + log.info("Successfully set VIPs for cluster %s", cluster_id) return result.to_str() @@ -287,8 +330,10 @@ async def install_cluster(cluster_id: str) -> str: - Network configuration is complete (VIPs set if required) - All cluster validations pass """ + log.info("Initiating installation for cluster_id: %s", cluster_id) client = InventoryClient(get_access_token()) result = await client.install_cluster(cluster_id) + log.info("Successfully triggered installation for cluster %s", cluster_id) return result.to_str() @@ -305,8 +350,10 @@ async def list_versions() -> str: str: A JSON string containing available OpenShift versions with metadata including version numbers, release dates, and support status. """ + log.info("Retrieving available OpenShift versions") client = InventoryClient(get_access_token()) result = await client.get_openshift_versions(True) + log.info("Successfully retrieved OpenShift versions") return json.dumps(result) @@ -323,8 +370,10 @@ async def list_operator_bundles() -> str: str: A JSON string containing available operator bundles with metadata including bundle names, descriptions, and operator details. """ + log.info("Retrieving available operator bundles") client = InventoryClient(get_access_token()) result = await client.get_operator_bundles() + log.info("Successfully retrieved %s operator bundles", len(result)) return json.dumps(result) @@ -346,8 +395,12 @@ async def add_operator_bundle_to_cluster(cluster_id: str, bundle_name: str) -> s str: A formatted string containing the updated cluster configuration showing the newly added operator bundle. """ + log.info("Adding operator bundle '%s' to cluster %s", bundle_name, cluster_id) client = InventoryClient(get_access_token()) result = await client.add_operator_bundle_to_cluster(cluster_id, bundle_name) + log.info( + "Successfully added operator bundle '%s' to cluster %s", bundle_name, cluster_id + ) return result.to_str() @@ -371,8 +424,10 @@ async def set_host_role(host_id: str, infraenv_id: str, role: str) -> str: str: A formatted string containing the updated host configuration showing the newly assigned role. """ + log.info("Setting role '%s' for host %s in InfraEnv %s", role, host_id, infraenv_id) client = InventoryClient(get_access_token()) result = await client.update_host(host_id, infraenv_id, host_role=role) + log.info("Successfully set role '%s' for host %s", role, host_id) return result.to_str() diff --git a/service_client/assisted_service_api.py b/service_client/assisted_service_api.py index 21d7cf2..6cf682e 100644 --- a/service_client/assisted_service_api.py +++ b/service_client/assisted_service_api.py @@ -12,7 +12,9 @@ from urllib.parse import urlparse import requests +from requests.exceptions import RequestException from assisted_service_client import ApiClient, Configuration, api, models +from assisted_service_client.rest import ApiException from service_client.logger import log @@ -44,9 +46,16 @@ def _get_pull_secret(self) -> str: "https://api.openshift.com/api/accounts_mgmt/v1/access_token", ) headers = {"Authorization": f"Bearer {self.access_token}"} - response = requests.post(url, headers=headers, timeout=30) - response.raise_for_status() - return response.text + + try: + log.info("Fetching pull secret from %s", url) + response = requests.post(url, headers=headers, timeout=30) + response.raise_for_status() + log.info("Successfully fetched pull secret") + return response.text + except RequestException as e: + log.error("Error while fetching pull secret from %s: %s", url, str(e)) + raise def _get_client(self) -> ApiClient: configs = Configuration() @@ -92,14 +101,33 @@ async def get_cluster( Returns: models.Cluster: The cluster object containing cluster information. """ - return cast( - models.Cluster, - await asyncio.to_thread( + try: + log.info( + "Getting cluster %s (unregistered: %s)", + cluster_id, + get_unregistered_clusters, + ) + result = await asyncio.to_thread( self._installer_api().v2_get_cluster, cluster_id=cluster_id, get_unregistered_clusters=get_unregistered_clusters, - ), - ) + ) + log.info("Successfully retrieved cluster %s", cluster_id) + return cast(models.Cluster, result) + except ApiException as e: + log.error( + "API error while getting cluster %s: Status: %s, Reason: %s, Body: %s", + cluster_id, + e.status, + e.reason, + e.body, + ) + raise + except Exception as e: + log.error( + "Unexpected error while getting cluster %s: %s", cluster_id, str(e) + ) + raise async def list_clusters(self) -> list: """ @@ -108,9 +136,22 @@ async def list_clusters(self) -> list: Returns: list: A list of cluster objects. """ - return cast( - list, await asyncio.to_thread(self._installer_api().v2_list_clusters) - ) + try: + log.info("Listing all clusters") + result = await asyncio.to_thread(self._installer_api().v2_list_clusters) + log.info("Successfully listed clusters") + return cast(list, result) + except ApiException as e: + log.error( + "API error while listing clusters: Status: %s, Reason: %s, Body: %s", + e.status, + e.reason, + e.body, + ) + raise + except Exception as e: + log.error("Unexpected error while listing clusters: %s", str(e)) + raise async def get_events( self, @@ -135,23 +176,46 @@ async def get_events( """ if categories is None: categories = ["user"] - log.info( - "Downloading events for cluster %s, host %s, infraenv %s, categories %s", - cluster_id, - host_id, - infra_env_id, - categories, - ) - response = await asyncio.to_thread( - self._events_api().v2_list_events, - cluster_id=cluster_id, - host_id=host_id, - infra_env_id=infra_env_id, - categories=categories, - _preload_content=False, - **kwargs, - ) - return cast(Any, response).data + + try: + log.info( + "Getting events for cluster %s, host %s, infra_env %s, categories %s", + cluster_id, + host_id, + infra_env_id, + categories, + ) + response = await asyncio.to_thread( + self._events_api().v2_list_events, + cluster_id=cluster_id, + host_id=host_id, + infra_env_id=infra_env_id, + categories=categories, + _preload_content=False, + **kwargs, + ) + log.info("Successfully retrieved events") + return cast(Any, response).data + except ApiException as e: + log.error( + "API error while getting events (cluster: %s, host: %s, infra_env: %s): Status: %s, Reason: %s, Body: %s", + cluster_id, + host_id, + infra_env_id, + e.status, + e.reason, + e.body, + ) + raise + except Exception as e: + log.error( + "Unexpected error while getting events (cluster: %s, host: %s, infra_env: %s): %s", + cluster_id, + host_id, + infra_env_id, + str(e), + ) + raise async def get_infra_env(self, infra_env_id: str) -> models.InfraEnv: """ @@ -163,12 +227,31 @@ async def get_infra_env(self, infra_env_id: str) -> models.InfraEnv: Returns: models.InfraEnv: The infrastructure environment object. """ - return cast( - models.InfraEnv, - await asyncio.to_thread( + try: + log.info("Getting infrastructure environment %s", infra_env_id) + result = await asyncio.to_thread( self._installer_api().get_infra_env, infra_env_id=infra_env_id - ), - ) + ) + log.info( + "Successfully retrieved infrastructure environment %s", infra_env_id + ) + return cast(models.InfraEnv, result) + except ApiException as e: + log.error( + "API error while getting infrastructure environment %s: Status: %s, Reason: %s, Body: %s", + infra_env_id, + e.status, + e.reason, + e.body, + ) + raise + except Exception as e: + log.error( + "Unexpected error while getting infrastructure environment %s: %s", + infra_env_id, + str(e), + ) + raise async def create_cluster( self, name: str, version: str, single_node: bool, **cluster_params: Any @@ -185,22 +268,41 @@ async def create_cluster( Returns: models.Cluster: The created cluster object. """ - if single_node: - cluster_params["control_plane_count"] = 1 - cluster_params["high_availability_mode"] = "None" - cluster_params["user_managed_networking"] = True - - params = models.ClusterCreateParams( - name=name, - openshift_version=version, - pull_secret=self.pull_secret, - **cluster_params, - ) - log.info("Creating cluster with params %s", params.__dict__) - result = await asyncio.to_thread( - self._installer_api().v2_register_cluster, new_cluster_params=params - ) - return cast(models.Cluster, result) + try: + if single_node: + cluster_params["control_plane_count"] = 1 + cluster_params["high_availability_mode"] = "None" + cluster_params["user_managed_networking"] = True + + params = models.ClusterCreateParams( + name=name, + openshift_version=version, + pull_secret=self.pull_secret, + **cluster_params, + ) + log.info( + "Creating cluster '%s' with version %s (single_node: %s)", + name, + version, + single_node, + ) + result = await asyncio.to_thread( + self._installer_api().v2_register_cluster, new_cluster_params=params + ) + log.info("Successfully created cluster '%s'", name) + return cast(models.Cluster, result) + except ApiException as e: + log.error( + "API error while creating cluster '%s': Status: %s, Reason: %s, Body: %s", + name, + e.status, + e.reason, + e.body, + ) + raise + except Exception as e: + log.error("Unexpected error while creating cluster '%s': %s", name, str(e)) + raise async def create_infra_env( self, name: str, **infra_env_params: Any @@ -215,14 +317,33 @@ async def create_infra_env( Returns: models.InfraEnv: The created infrastructure environment object. """ - infra_env = models.InfraEnvCreateParams( - name=name, pull_secret=self.pull_secret, **infra_env_params - ) - log.info("Creating infra-env with params %s", infra_env.__dict__) - result = await asyncio.to_thread( - self._installer_api().register_infra_env, infraenv_create_params=infra_env - ) - return cast(models.InfraEnv, result) + try: + infra_env = models.InfraEnvCreateParams( + name=name, pull_secret=self.pull_secret, **infra_env_params + ) + log.info("Creating infrastructure environment '%s'", name) + result = await asyncio.to_thread( + self._installer_api().register_infra_env, + infraenv_create_params=infra_env, + ) + log.info("Successfully created infrastructure environment '%s'", name) + return cast(models.InfraEnv, result) + except ApiException as e: + log.error( + "API error while creating infrastructure environment '%s': Status: %s, Reason: %s, Body: %s", + name, + e.status, + e.reason, + e.body, + ) + raise + except Exception as e: + log.error( + "Unexpected error while creating infrastructure environment '%s': %s", + name, + str(e), + ) + raise async def update_cluster( self, @@ -243,23 +364,37 @@ async def update_cluster( Returns: models.Cluster: The updated cluster object. """ - params = models.V2ClusterUpdateParams(**update_params) - if api_vip != "": - params.api_vips = [models.ApiVip(cluster_id=cluster_id, ip=api_vip)] - if ingress_vip != "": - params.ingress_vips = [ - models.IngressVip(cluster_id=cluster_id, ip=ingress_vip) - ] - - log.info("Updating cluster %s with params %s", cluster_id, params) - return cast( - models.Cluster, - await asyncio.to_thread( + try: + params = models.V2ClusterUpdateParams(**update_params) + if api_vip != "": + params.api_vips = [models.ApiVip(cluster_id=cluster_id, ip=api_vip)] + if ingress_vip != "": + params.ingress_vips = [ + models.IngressVip(cluster_id=cluster_id, ip=ingress_vip) + ] + + log.info("Updating cluster %s", cluster_id) + result = await asyncio.to_thread( self._installer_api().v2_update_cluster, cluster_id=cluster_id, cluster_update_params=params, - ), - ) + ) + log.info("Successfully updated cluster %s", cluster_id) + return cast(models.Cluster, result) + except ApiException as e: + log.error( + "API error while updating cluster %s: Status: %s, Reason: %s, Body: %s", + cluster_id, + e.status, + e.reason, + e.body, + ) + raise + except Exception as e: + log.error( + "Unexpected error while updating cluster %s: %s", cluster_id, str(e) + ) + raise async def install_cluster(self, cluster_id: str) -> models.Cluster: """ @@ -271,13 +406,27 @@ async def install_cluster(self, cluster_id: str) -> models.Cluster: Returns: models.Cluster: The cluster object with updated installation status. """ - log.info("Installing cluster %s", cluster_id) - return cast( - models.Cluster, - await asyncio.to_thread( + try: + log.info("Starting installation for cluster %s", cluster_id) + result = await asyncio.to_thread( self._installer_api().v2_install_cluster, cluster_id=cluster_id - ), - ) + ) + log.info("Successfully started installation for cluster %s", cluster_id) + return cast(models.Cluster, result) + except ApiException as e: + log.error( + "API error while installing cluster %s: Status: %s, Reason: %s, Body: %s", + cluster_id, + e.status, + e.reason, + e.body, + ) + raise + except Exception as e: + log.error( + "Unexpected error while installing cluster %s: %s", cluster_id, str(e) + ) + raise async def get_openshift_versions( self, only_latest: bool @@ -291,13 +440,25 @@ async def get_openshift_versions( Returns: models.OpenshiftVersions: Object containing available OpenShift versions. """ - return cast( - models.OpenshiftVersions, - await asyncio.to_thread( + try: + log.info("Getting OpenShift versions (only_latest: %s)", only_latest) + result = await asyncio.to_thread( self._versions_api().v2_list_supported_openshift_versions, only_latest=only_latest, - ), - ) + ) + log.info("Successfully retrieved OpenShift versions") + return cast(models.OpenshiftVersions, result) + except ApiException as e: + log.error( + "API error while getting OpenShift versions: Status: %s, Reason: %s, Body: %s", + e.status, + e.reason, + e.body, + ) + raise + except Exception as e: + log.error("Unexpected error while getting OpenShift versions: %s", str(e)) + raise async def get_operator_bundles(self) -> list[dict[str, Any]]: """ @@ -306,10 +467,22 @@ async def get_operator_bundles(self) -> list[dict[str, Any]]: Returns: list: A list of operator bundle dictionaries. """ - bundles = cast( - list, await asyncio.to_thread(self._operators_api().v2_list_bundles) - ) - return [bundle.to_dict() for bundle in bundles] + try: + log.info("Getting operator bundles") + bundles = await asyncio.to_thread(self._operators_api().v2_list_bundles) + log.info("Successfully retrieved operator bundles") + return [bundle.to_dict() for bundle in cast(list, bundles)] + except ApiException as e: + log.error( + "API error while getting operator bundles: Status: %s, Reason: %s, Body: %s", + e.status, + e.reason, + e.body, + ) + raise + except Exception as e: + log.error("Unexpected error while getting operator bundles: %s", str(e)) + raise async def add_operator_bundle_to_cluster( self, cluster_id: str, bundle_name: str @@ -324,16 +497,44 @@ async def add_operator_bundle_to_cluster( Returns: models.Cluster: The updated cluster object with the new operator. """ - bundle = cast( - Any, - await asyncio.to_thread(self._operators_api().v2_get_bundle, bundle_name), - ) - olm_operators = [ - models.OperatorCreateParams(name=op_name) for op_name in bundle.operators - ] - return await self.update_cluster( - cluster_id=cluster_id, olm_operators=olm_operators - ) + try: + log.info( + "Adding operator bundle '%s' to cluster %s", bundle_name, cluster_id + ) + bundle = await asyncio.to_thread( + self._operators_api().v2_get_bundle, bundle_name + ) + olm_operators = [ + models.OperatorCreateParams(name=op_name) + for op_name in getattr(bundle, "operators", []) + ] + result = await self.update_cluster( + cluster_id=cluster_id, olm_operators=olm_operators + ) + log.info( + "Successfully added operator bundle '%s' to cluster %s", + bundle_name, + cluster_id, + ) + return result + except ApiException as e: + log.error( + "API error while adding operator bundle '%s' to cluster %s: Status: %s, Reason: %s, Body: %s", + bundle_name, + cluster_id, + e.status, + e.reason, + e.body, + ) + raise + except Exception as e: + log.error( + "Unexpected error while adding operator bundle '%s' to cluster %s: %s", + bundle_name, + cluster_id, + str(e), + ) + raise async def update_host( self, host_id: str, infra_env_id: str, **update_params: Any @@ -349,10 +550,37 @@ async def update_host( Returns: models.Host: The updated host object. """ - params = models.HostUpdateParams(**update_params) - return cast( - models.Host, - await asyncio.to_thread( + try: + params = models.HostUpdateParams(**update_params) + log.info( + "Updating host %s in infrastructure environment %s", + host_id, + infra_env_id, + ) + result = await asyncio.to_thread( self._installer_api().v2_update_host, infra_env_id, host_id, params - ), - ) + ) + log.info( + "Successfully updated host %s in infrastructure environment %s", + host_id, + infra_env_id, + ) + return cast(models.Host, result) + except ApiException as e: + log.error( + "API error while updating host %s in infrastructure environment %s: Status: %s, Reason: %s, Body: %s", + host_id, + infra_env_id, + e.status, + e.reason, + e.body, + ) + raise + except Exception as e: + log.error( + "Unexpected error while updating host %s in infrastructure environment %s: %s", + host_id, + infra_env_id, + str(e), + ) + raise diff --git a/service_client/logger.py b/service_client/logger.py index d7da957..39f8b5d 100644 --- a/service_client/logger.py +++ b/service_client/logger.py @@ -16,6 +16,15 @@ class SensitiveFormatter(logging.Formatter): """Formatter that removes sensitive info.""" + # Default log format used by this formatter + DEFAULT_FORMAT = "%(asctime)s - %(name)s - %(levelname)-8s - %(thread)d:%(process)d - %(message)s - (%(pathname)s:%(lineno)d)->%(funcName)s" + + def __init__(self, fmt: str | None = None) -> None: + """Initialize with default format if none provided.""" + if fmt is None: + fmt = self.DEFAULT_FORMAT + super().__init__(fmt) + @staticmethod def _filter(s: str) -> str: # Dict filter @@ -89,11 +98,8 @@ def add_log_file_handler(logger: logging.Logger, filename: str) -> logging.FileH Returns: logging.FileHandler: The created file handler. """ - fmt = SensitiveFormatter( - "%(asctime)s - %(name)s - %(levelname)s - %(thread)d:%(process)d - %(message)s" - ) fh = logging.FileHandler(filename) - fh.setFormatter(fmt) + fh.setFormatter(SensitiveFormatter()) logger.addHandler(fh) return fh @@ -105,12 +111,8 @@ def add_stream_handler(logger: logging.Logger) -> None: Args: logger: The logger instance to add the handler to. """ - fmt = SensitiveFormatter( - "%(asctime)s %(name)s %(levelname)-10s - %(thread)d - %(message)s \t" - "(%(pathname)s:%(lineno)d)->%(funcName)s" - ) ch = logging.StreamHandler(sys.stderr) - ch.setFormatter(fmt) + ch.setFormatter(SensitiveFormatter()) logger.addHandler(ch) @@ -124,7 +126,12 @@ def add_stream_handler(logger: logging.Logger) -> None: log = logging.getLogger(logger_name) log.setLevel(get_logging_level()) -add_log_file_handler(log, "assisted-service-mcp.log") -add_log_file_handler(urllib3_logger, "assisted-service-mcp.log") +# Check if we should log to file (default: True, set to False in containers) +log_to_file = os.environ.get("LOG_TO_FILE", "true").lower() == "true" + +if log_to_file: + add_log_file_handler(log, "assisted-service-mcp.log") + add_log_file_handler(urllib3_logger, "assisted-service-mcp.log") + add_stream_handler(log) add_stream_handler(urllib3_logger)