diff --git a/assisted_service_mcp/src/mcp.py b/assisted_service_mcp/src/mcp.py index 53ac9ec..24ae569 100644 --- a/assisted_service_mcp/src/mcp.py +++ b/assisted_service_mcp/src/mcp.py @@ -75,7 +75,7 @@ def _register_mcp_tools(self) -> None: self.mcp.tool()(self._wrap_tool(cluster_tools.install_cluster)) self.mcp.tool()(self._wrap_tool(cluster_tools.set_cluster_ssh_key)) if settings.ENABLE_TROUBLESHOOTING_TOOLS: - self.mcp.tool()(self._wrap_tool(cluster_tools.analyze_cluster_logs)) + self.mcp.tool()(self._wrap_tool(cluster_tools.troubleshoot_cluster)) # Register event monitoring tools self.mcp.tool()(self._wrap_tool(event_tools.cluster_events)) diff --git a/assisted_service_mcp/src/tools/cluster_tools.py b/assisted_service_mcp/src/tools/cluster_tools.py index c249c45..1539d95 100644 --- a/assisted_service_mcp/src/tools/cluster_tools.py +++ b/assisted_service_mcp/src/tools/cluster_tools.py @@ -365,19 +365,19 @@ async def set_cluster_ssh_key( @track_tool_usage() -async def analyze_cluster_logs( +async def troubleshoot_cluster( get_access_token_func: Callable[[], str], cluster_id: Annotated[str, Field(description="The ID of the cluster")], ) -> str: - """Analyze Assisted Installer logs for a cluster and summarize findings. + """Troubleshoot an Assisted Installer cluster and its logs (if available) and summarize findings. - Runs a set of built‑in log analysis signatures against the cluster’s collected - logs (controller logs, bootstrap/control‑plane logs, and must‑gather content + Runs a set of built‑in analysis signatures against the cluster’s data or + the collected logs (controller logs, bootstrap/control‑plane logs, and must‑gather content when available). The results highlight common misconfigurations and known error patterns to speed up triage of failed or degraded installations. Prerequisites: - - Logs are available for the target cluster (downloadable via the API) + - Cluster is available (downloadable via the API) Returns: str: Human‑readable report of signature results. Returns an empty diff --git a/assisted_service_mcp/src/utils/log_analyzer/__init__.py b/assisted_service_mcp/src/utils/log_analyzer/__init__.py index ef5f8b4..b80d736 100644 --- a/assisted_service_mcp/src/utils/log_analyzer/__init__.py +++ b/assisted_service_mcp/src/utils/log_analyzer/__init__.py @@ -4,11 +4,12 @@ A standalone tool for analyzing OpenShift Assisted Installer logs. """ -from .log_analyzer import LogAnalyzer +from .log_analyzer import ClusterAnalyzer, LogAnalyzer from .signatures import ALL_SIGNATURES, SignatureResult __version__ = "1.0.0" __all__ = [ + "ClusterAnalyzer", "LogAnalyzer", "ALL_SIGNATURES", "SignatureResult", diff --git a/assisted_service_mcp/src/utils/log_analyzer/log_analyzer.py b/assisted_service_mcp/src/utils/log_analyzer/log_analyzer.py index e35b6a8..0bcafc5 100644 --- a/assisted_service_mcp/src/utils/log_analyzer/log_analyzer.py +++ b/assisted_service_mcp/src/utils/log_analyzer/log_analyzer.py @@ -1,5 +1,5 @@ """ -Core log analyzer for OpenShift Assisted Installer logs. +Core analyzer for OpenShift Assisted Installer clusters and their logs. """ import json @@ -19,44 +19,45 @@ ) -class LogAnalyzer: - """Analyzer for OpenShift Assisted Installer logs.""" - - _metadata: dict[str, Any] | None - - def __init__(self, logs_archive: nestedarchive.RemoteNestedArchive): - """ - Initialize the log analyzer. +class ClusterAnalyzer: + """Analyzer for OpenShift Assisted Installer clusters.""" - Args: - logs_archive: RemoteNestedArchive containing the cluster logs - """ - self.logs_archive = logs_archive + def __init__(self): self._metadata = None self._cluster_events = None + def set_cluster_metadata(self, metadata: Dict[str, Any]): + """Set cluster metadata for analyzer.""" + if not metadata.get("cluster"): + # Wrap metadata in a "cluster" key to match the expected structure + metadata = {"cluster": metadata} + self._metadata = self._clean_metadata_json(metadata) + + def set_cluster_events(self, events: List[Dict[str, Any]]): + """Set cluster events for analyzer.""" + self._cluster_events = events + @property def metadata(self) -> Dict[str, Any] | None: """Get cluster metadata.""" - if self._metadata is None: - try: - metadata_content = self.logs_archive.get("cluster_metadata.json") - raw_metadata = json.loads(cast(str | bytes, metadata_content)) - - # The metadata file contains cluster information at the root level - # Wrap it in a "cluster" key to match the expected structure - wrapped_metadata = {"cluster": raw_metadata} - self._metadata = self._clean_metadata_json(wrapped_metadata) - except Exception as e: - logger.error("Failed to load metadata: %s", e) - raise return self._metadata + @property + def cluster_events(self) -> List[Dict[str, Any]] | None: + """Get cluster events.""" + return self._cluster_events + + def get_all_cluster_events(self) -> List[Dict[str, Any]]: + """Get all the cluster installation events.""" + if self._cluster_events is None: + return [] + return self._cluster_events + @staticmethod def _clean_metadata_json(md: Dict[str, Any]) -> Dict[str, Any]: """Clean metadata JSON by separating deleted hosts.""" installation_start_time = dateutil.parser.isoparse( - md["cluster"]["install_started_at"] + str(md["cluster"]["install_started_at"]) ) def host_deleted_before_installation_started(host): @@ -87,21 +88,6 @@ def get_last_install_cluster_events(self) -> List[Dict[str, Any]]: return events - def get_all_cluster_events(self) -> List[Dict[str, Any]]: - """Get all the cluster installation events.""" - if self._cluster_events is None: - try: - events_content = self.logs_archive.get("cluster_events.json") - all_events = json.loads(cast(str | bytes, events_content)) - - # Get the last partition (latest installation attempt) - self._cluster_events = self.partition_cluster_events(all_events)[-1] - except Exception as e: - logger.error("Failed to load cluster events: %s", e) - self._cluster_events = [] - - return self._cluster_events - @staticmethod def partition_cluster_events( events: List[Dict[str, Any]], @@ -131,6 +117,66 @@ def get_events_by_host(self) -> Dict[str, List[Dict[str, Any]]]: events_by_host[event["host_id"]].append(event) return events_by_host + @staticmethod + def get_hostname(host: Dict[str, Any]) -> str: + """Extract hostname from host metadata.""" + hostname = host.get("requested_hostname") + if hostname: + return hostname + + try: + inventory = json.loads(host["inventory"]) + return inventory["hostname"] + except (KeyError, json.JSONDecodeError): + return host.get("id", "unknown") + + +class LogAnalyzer(ClusterAnalyzer): + """Analyzer for OpenShift Assisted Installer logs.""" + + _metadata: dict[str, Any] | None + + def __init__(self, logs_archive: nestedarchive.RemoteNestedArchive): + """ + Initialize the log analyzer. + + Args: + logs_archive: RemoteNestedArchive containing the cluster logs + """ + super().__init__() + self.logs_archive = logs_archive + + @property + def metadata(self) -> Dict[str, Any] | None: + """Get cluster metadata.""" + if self._metadata is None: + try: + metadata_content = self.logs_archive.get("cluster_metadata.json") + raw_metadata = json.loads(cast(str | bytes, metadata_content)) + + # The metadata file contains cluster information at the root level + # Wrap it in a "cluster" key to match the expected structure + wrapped_metadata = {"cluster": raw_metadata} + self._metadata = self._clean_metadata_json(wrapped_metadata) + except Exception as e: + logger.error("Failed to load metadata: %s", e) + raise + return self._metadata + + def get_all_cluster_events(self) -> List[Dict[str, Any]]: + """Get all the cluster installation events.""" + if self._cluster_events is None: + try: + events_content = self.logs_archive.get("cluster_events.json") + all_events = json.loads(cast(str | bytes, events_content)) + + self._cluster_events = all_events + except Exception as e: + logger.error("Failed to load cluster events: %s", e) + self._cluster_events = [] + + return self._cluster_events + def get_host_log_file(self, host_id: str, filename: str) -> str: """ Get a specific log file for a host. @@ -213,16 +259,3 @@ def get_must_gather(self) -> bytes: "controller_logs.tar.gz/must-gather.tar.gz", mode="rb" ), ) - - @staticmethod - def get_hostname(host: Dict[str, Any]) -> str: - """Extract hostname from host metadata.""" - hostname = host.get("requested_hostname") - if hostname: - return hostname - - try: - inventory = json.loads(host["inventory"]) - return inventory["hostname"] - except (KeyError, json.JSONDecodeError): - return host.get("id", "unknown") diff --git a/assisted_service_mcp/src/utils/log_analyzer/main.py b/assisted_service_mcp/src/utils/log_analyzer/main.py index 580eafb..0ccfa57 100644 --- a/assisted_service_mcp/src/utils/log_analyzer/main.py +++ b/assisted_service_mcp/src/utils/log_analyzer/main.py @@ -3,12 +3,19 @@ Main entry point for the OpenShift Assisted Installer Log Analyzer. """ import logging +import json from typing import List, Optional from assisted_service_mcp.src.service_client.assisted_service_api import InventoryClient - -from .log_analyzer import LogAnalyzer -from .signatures import ALL_SIGNATURES, SignatureResult +from assisted_service_mcp.src.utils.log_analyzer.log_analyzer import ( + ClusterAnalyzer, + LogAnalyzer, +) +from assisted_service_mcp.src.utils.log_analyzer.signatures import ( + ALL_SIGNATURES, + SignatureResult, + Signature, +) async def analyze_cluster( @@ -17,11 +24,11 @@ async def analyze_cluster( specific_signatures: Optional[List[str]] = None, ) -> List[SignatureResult]: """ - Analyze a cluster's logs. + Analyze a cluster by using a signature analysis tool to detect common issues and errors. Args: cluster_id: UUID of the cluster to analyze - api_client: Client to fetch log files with + api_client: Client to fetch the cluster and its log files with specific_signatures: List of specific signature names to run (None for all) Returns: @@ -33,22 +40,44 @@ async def analyze_cluster( logger.info("Analyzing cluster: %s", cluster_id) try: - # Download logs - logs_archive = await api_client.get_cluster_logs(cluster_id) + # first call the api to get the cluster and check if logs are available + cluster = await api_client.get_cluster(cluster_id) + + if cluster.logs_info != "completed": + logger.info( + "Logs are not available for cluster: %s\nDefaulting to signatures that don't require logs", + cluster_id, + ) + + analyzer = ClusterAnalyzer() + + # Call events API to get the events and set the events in the analyzer + events = await api_client.get_events(cluster_id) + analyzer.set_cluster_events(json.loads(events)) + + # Set the cluster metadata in the analyzer + analyzer.set_cluster_metadata(cluster.to_dict()) + + # Select signatures that don't require logs + signatures_to_run = [ + sig for sig in ALL_SIGNATURES if sig.logs_required is False + ] - # Initialize log analyzer - log_analyzer = LogAnalyzer(logs_archive) + else: + # Download logs + logs_archive = await api_client.get_cluster_logs(cluster_id) - # Determine which signatures to run - signatures_to_run = ALL_SIGNATURES + # Initialize log analyzer + analyzer = LogAnalyzer(logs_archive) + + # Add all signatures to the list to run + signatures_to_run = ALL_SIGNATURES + + # If specific signatures are provided, filter the signatures to run if specific_signatures: - signature_classes = {sig.__name__: sig for sig in ALL_SIGNATURES} - signatures_to_run = [] - for sig_name in specific_signatures: - if sig_name in signature_classes: - signatures_to_run.append(signature_classes[sig_name]) - else: - logger.warning("Unknown signature: %s", sig_name) + signatures_to_run = filter_signatures( + signatures_to_run, specific_signatures + ) # Run signatures results = [] @@ -56,7 +85,7 @@ async def analyze_cluster( logger.debug("Running signature: %s", signature_class.__name__) try: signature = signature_class() - result = signature.analyze(log_analyzer) + result = signature.analyze(analyzer) if result: results.append(result) except Exception as e: @@ -71,6 +100,21 @@ async def analyze_cluster( raise +def filter_signatures( + signatures: List[type[Signature]], specific_signatures: List[str] +) -> List[type[Signature]]: + """Filter signatures to run based on specific signatures.""" + logger = logging.getLogger(__name__) + signature_classes = {sig.__name__: sig for sig in signatures} + filtered_signatures = [] + for sig_name in specific_signatures: + if sig_name in signature_classes: + filtered_signatures.append(signature_classes[sig_name]) + else: + logger.warning("Unknown signature: %s", sig_name) + return filtered_signatures + + def print_results(results: List[SignatureResult]) -> None: """Print analysis results to stdout.""" if not results: diff --git a/assisted_service_mcp/src/utils/log_analyzer/signatures/advanced_analysis.py b/assisted_service_mcp/src/utils/log_analyzer/signatures/advanced_analysis.py index 0be4f39..3a8b7bc 100644 --- a/assisted_service_mcp/src/utils/log_analyzer/signatures/advanced_analysis.py +++ b/assisted_service_mcp/src/utils/log_analyzer/signatures/advanced_analysis.py @@ -79,6 +79,8 @@ def filter_operators( class EventsInstallationAttempts(Signature): """Inspects events file to check for multiple installation attempts.""" + logs_required = False + def analyze(self, log_analyzer) -> Optional[SignatureResult]: """Analyze multiple installation attempts.""" try: diff --git a/assisted_service_mcp/src/utils/log_analyzer/signatures/base.py b/assisted_service_mcp/src/utils/log_analyzer/signatures/base.py index e52c9a3..f73ee04 100644 --- a/assisted_service_mcp/src/utils/log_analyzer/signatures/base.py +++ b/assisted_service_mcp/src/utils/log_analyzer/signatures/base.py @@ -51,6 +51,8 @@ def __str__(self) -> str: class Signature(abc.ABC): """Base class for signature analysis.""" + logs_required = True + def __init__(self): """Initialize the signature.""" self.name = self.__class__.__name__ diff --git a/assisted_service_mcp/src/utils/log_analyzer/signatures/basic_info.py b/assisted_service_mcp/src/utils/log_analyzer/signatures/basic_info.py index 9faa959..67f24eb 100644 --- a/assisted_service_mcp/src/utils/log_analyzer/signatures/basic_info.py +++ b/assisted_service_mcp/src/utils/log_analyzer/signatures/basic_info.py @@ -14,6 +14,8 @@ class ComponentsVersionSignature(Signature): """Analyzes component versions.""" + logs_required = False + def analyze(self, log_analyzer) -> Optional[SignatureResult]: """Analyze component versions.""" try: diff --git a/assisted_service_mcp/src/utils/log_analyzer/signatures/error_detection.py b/assisted_service_mcp/src/utils/log_analyzer/signatures/error_detection.py index b2d8277..2a571db 100644 --- a/assisted_service_mcp/src/utils/log_analyzer/signatures/error_detection.py +++ b/assisted_service_mcp/src/utils/log_analyzer/signatures/error_detection.py @@ -36,6 +36,8 @@ def _search_patterns_in_string(string, patterns): class SNOHostnameHasEtcd(ErrorSignature): """Looks for etcd in SNO hostname (OCPBUGS-15852).""" + logs_required = False + def analyze(self, log_analyzer) -> Optional[SignatureResult]: """Analyze SNO hostname for etcd.""" try: diff --git a/assisted_service_mcp/src/utils/log_analyzer/signatures/networking.py b/assisted_service_mcp/src/utils/log_analyzer/signatures/networking.py index b6e096f..5b8c45f 100644 --- a/assisted_service_mcp/src/utils/log_analyzer/signatures/networking.py +++ b/assisted_service_mcp/src/utils/log_analyzer/signatures/networking.py @@ -27,6 +27,8 @@ class SNOMachineCidrSignature(Signature): """Validates machine CIDR configuration for SNO clusters.""" + logs_required = False + def analyze(self, log_analyzer) -> Optional[SignatureResult]: """Analyze SNO machine CIDR configuration.""" try: diff --git a/assisted_service_mcp/src/utils/log_analyzer/signatures/performance.py b/assisted_service_mcp/src/utils/log_analyzer/signatures/performance.py index ce3b987..f2c90a8 100644 --- a/assisted_service_mcp/src/utils/log_analyzer/signatures/performance.py +++ b/assisted_service_mcp/src/utils/log_analyzer/signatures/performance.py @@ -16,6 +16,7 @@ class SlowImageDownloadSignature(ErrorSignature): """Analyzes slow image download rates.""" + logs_required = False image_download_regex = re.compile( r"Host (?P.+?): New image status (?P.+?). result:.+?; download rate: (?P.+?) MBps" ) diff --git a/assisted_service_mcp/src/utils/log_analyzer/signatures/platform_specific.py b/assisted_service_mcp/src/utils/log_analyzer/signatures/platform_specific.py index f4498cb..aad171d 100644 --- a/assisted_service_mcp/src/utils/log_analyzer/signatures/platform_specific.py +++ b/assisted_service_mcp/src/utils/log_analyzer/signatures/platform_specific.py @@ -19,6 +19,8 @@ class LibvirtRebootFlagSignature(ErrorSignature): """Detect potential libvirt _on_reboot_ flag issue (MGMT-2840).""" + logs_required = False + def analyze(self, log_analyzer) -> Optional[SignatureResult]: md = log_analyzer.metadata cluster = md["cluster"] diff --git a/tests/test_log_analyzer.py b/tests/test_log_analyzer.py index 1ef1033..17eb743 100644 --- a/tests/test_log_analyzer.py +++ b/tests/test_log_analyzer.py @@ -1,6 +1,6 @@ import asyncio from typing import Any, Mapping -from unittest.mock import MagicMock +from unittest.mock import AsyncMock, MagicMock def make_archive(get_map: Mapping[str, object]) -> Any: @@ -17,6 +17,97 @@ def get( return _A() +def test_cluster_analyzer_metadata_and_events_partitioning() -> None: + """Test that the cluster analyzer correctly handles cluster metadata and events partitioning.""" + from assisted_service_mcp.src.utils.log_analyzer.log_analyzer import ClusterAnalyzer + + ca = ClusterAnalyzer() + ca.set_cluster_metadata( + { + "cluster": { + "install_started_at": "2025-01-01T11:11:00Z", + "hosts": [ + {"id": "h1", "requested_hostname": "h1-hostname"}, + { + "id": "h2", + "requested_hostname": "h2", + "deleted_at": "2025-01-01T10:11:00Z", + }, + ], + } + } + ) + ca.set_cluster_events( + [ + { + "name": "example_event", + "event_time": "2025-01-01T11:11:00Z", + "host_id": "h1", + }, + { + "name": "cluster_installation_reset", + "message": "Cluster installation reset 1", + "host_id": "h1", + }, + { + "name": "example_recent_event", + "event_time": "2025-01-01T11:11:01Z", + "message": "Most recent event", + "host_id": "h1", + }, + ] + ) + assert ca.metadata is not None + assert ca.metadata["cluster"]["install_started_at"] == "2025-01-01T11:11:00Z" + # deleted host should be removed from hosts list + assert ca.metadata["cluster"]["hosts"] == [ + {"id": "h1", "requested_hostname": "h1-hostname"} + ] + assert ca.cluster_events is not None + assert ca.get_all_cluster_events() == [ + { + "name": "example_event", + "event_time": "2025-01-01T11:11:00Z", + "host_id": "h1", + }, + { + "name": "cluster_installation_reset", + "message": "Cluster installation reset 1", + "host_id": "h1", + }, + { + "name": "example_recent_event", + "event_time": "2025-01-01T11:11:01Z", + "message": "Most recent event", + "host_id": "h1", + }, + ] + # last install cluster events should be the most recent event after the reset event + assert ca.get_last_install_cluster_events() == [ + { + "name": "example_recent_event", + "event_time": "2025-01-01T11:11:01Z", + "message": "Most recent event", + "host_id": "h1", + } + ] + assert ca.get_events_by_host() == { + "h1": [ + { + "name": "example_recent_event", + "event_time": "2025-01-01T11:11:01Z", + "message": "Most recent event", + "host_id": "h1", + } + ] + } + # get hostname should return the requested hostname of the host + assert ( + ca.get_hostname({"id": "h1", "requested_hostname": "h1-hostname"}) + == "h1-hostname" + ) + + def test_log_analyzer_metadata_and_events_partitioning() -> None: from assisted_service_mcp.src.utils.log_analyzer.log_analyzer import LogAnalyzer @@ -93,14 +184,22 @@ def test_main_analyze_cluster_runs_signatures() -> None: from assisted_service_mcp.src.utils.log_analyzer import main as main_mod fake_archive = MagicMock() - fake_archive.get.return_value = "{}" # minimal content to keep signatures no-op - - fake_client = MagicMock() + fake_archive.get.return_value = ( + '{"cluster": {"install_started_at": "2025-01-01T00:00:00Z", "hosts": []}}' + ) - async def _get_logs(cid: str) -> Any: # pylint: disable=unused-argument - return fake_archive + # Mock the cluster object returned by get_cluster + fake_cluster = MagicMock() + fake_cluster.logs_info = "completed" # Trigger the logs path + fake_cluster.to_dict.return_value = { + "install_started_at": "2025-01-01T00:00:00Z", + "hosts": [], + } - fake_client.get_cluster_logs = _get_logs # type: ignore[attr-defined] + fake_client = AsyncMock() + fake_client.get_cluster.return_value = fake_cluster + fake_client.get_cluster_logs.return_value = fake_archive + fake_client.get_events.return_value = "[]" # JSON string # Run with an empty signature list to ensure happy path async def run() -> None: @@ -112,6 +211,142 @@ async def run() -> None: asyncio.run(run()) +def test_main_analyze_cluster_runs_signatures_no_logs() -> None: + """Test that the main function runs signatures when no logs are available.""" + from assisted_service_mcp.src.utils.log_analyzer import main as main_mod + + # Mock the cluster object returned by get_cluster + fake_cluster = MagicMock() + fake_cluster.to_dict.return_value = { + "cluster": { + "install_started_at": "2025-01-01T00:00:00Z", + "hosts": [ + { + "id": "h1", + "requested_hostname": "etcd-h1", # should trigger SNOHostnameHasEtcd signature + }, + ], + "high_availability_mode": "None", + } + } + fake_cluster.logs_info = "not_completed" + + fake_client = AsyncMock() + fake_client.get_cluster.return_value = fake_cluster + # should trigger SlowImageDownloadSignature signature + fake_client.get_events.return_value = '[{"name": "slow_image_download", "event_time": "2025-01-01T00:00:00Z", "message": "Host h1: New image status quay.io/openshift-release-dev/ocp-release:4.19.12-x86_64. result: downloaded; download rate: 8.0 MBps"}]' # JSON string + + # Run with an empty signature list should run all signatures that don't require logs + results = asyncio.run( + main_mod.analyze_cluster("cid", fake_client, specific_signatures=[]) + ) + assert isinstance(results, list) + assert len(results) == 2 + for result in results: + assert result.title in ["No etcd in SNO hostname", "Slow Image Download"] + if result.title in "Slow Image Download": + assert "Detected slow image download rate (MBps):" in result.content + + +def test_does_run_signature_if_logs_are_available() -> None: + """Test that a signature that requires logs runs if logs are available.""" + from assisted_service_mcp.src.utils.log_analyzer import main as main_mod + + fake_archive = MagicMock() + # These controller logs should trigger ApiInvalidCertificateSignature + fake_archive.get.return_value = 'time="2025-01-01T00:00:00Z" level=error msg="x509: certificate is valid for 127.0.0.1, not 192.168.1.1"' + + # Mock the cluster object returned by get_cluster + fake_cluster = MagicMock() + fake_cluster.to_dict.return_value = { + "install_started_at": "2025-01-01T00:00:00Z", + "hosts": [], + } + fake_cluster.logs_info = "completed" + + fake_client = AsyncMock() + fake_client.get_cluster.return_value = fake_cluster + # This event should trigger SlowImageDownloadSignature signature, which should not run + fake_client.get_events.return_value = '[{"name": "slow_image_download", "event_time": "2025-01-01T00:00:00Z", "message": "Host h1: New image status quay.io/openshift-release-dev/ocp-release:4.19.12-x86_64. result: downloaded; download rate: 8.0 MBps"}]' # JSON string + # This should not be called + fake_client.get_cluster_logs.return_value = fake_archive + + # Run with APIInvalidCertificateSignature, which should run if logs are available + results = asyncio.run( + main_mod.analyze_cluster( + "cid", fake_client, specific_signatures=["ApiInvalidCertificateSignature"] + ) + ) + assert isinstance(results, list) + assert len(results) == 1 + assert results[0].title == "Invalid SAN values on certificate for AI API" + assert results[0].severity == "error" + assert "x509: certificate is valid for" in results[0].content + + +def test_does_not_run_signature_if_logs_are_not_available() -> None: + """Test that a signature that requires logs does not run if logs are not available.""" + from assisted_service_mcp.src.utils.log_analyzer import main as main_mod + + fake_archive = MagicMock() + # These controller logs should trigger ApiInvalidCertificateSignature, which should not run because logs are not available + fake_archive.get.return_value = 'time="2025-01-01T00:00:00Z" level=error msg="x509: certificate is valid for 127.0.0.1, not 192.168.1.1"' + + # Mock the cluster object returned by get_cluster + fake_cluster = MagicMock() + fake_cluster.to_dict.return_value = { + "install_started_at": "2025-01-01T00:00:00Z", + "hosts": [], + } + fake_cluster.logs_info = "not_completed" + + fake_client = AsyncMock() + fake_client.get_cluster.return_value = fake_cluster + # This event should trigger SlowImageDownloadSignature signature, which should not run + fake_client.get_events.return_value = '[{"name": "slow_image_download", "event_time": "2025-01-01T00:00:00Z", "message": "Host h1: New image status quay.io/openshift-release-dev/ocp-release:4.19.12-x86_64. result: downloaded; download rate: 8.0 MBps"}]' # JSON string + # This should not be called + fake_client.get_cluster_logs.return_value = fake_archive + + # Run with ApiInvalidCertificateSignature, which shouldn't run if logs are not available + results = asyncio.run( + main_mod.analyze_cluster( + "cid", fake_client, specific_signatures=["ApiInvalidCertificateSignature"] + ) + ) + assert isinstance(results, list) + assert len(results) == 0 + + +def test_slow_image_download_signature_runs_no_logs() -> None: + """Test that the slow image download signature runs when logs are not available.""" + from assisted_service_mcp.src.utils.log_analyzer import main as main_mod + + # Mock the cluster object returned by get_cluster + fake_cluster = MagicMock() + fake_cluster.to_dict.return_value = { + "install_started_at": "2025-01-01T00:00:00Z", + "hosts": [], + } + fake_cluster.logs_info = "not_completed" + + fake_client = AsyncMock() + fake_client.get_cluster.return_value = fake_cluster + fake_client.get_events.return_value = '[{"name": "image_download", "event_time": "2025-01-01T00:00:00Z", "message": "Host h1: New image status quay.io/openshift-release-dev/ocp-release:4.19.12-x86_64. result: downloaded; download rate: 8.0 MBps"}]' # JSON string + + # Run with slow image download signature + async def run() -> None: + results = await main_mod.analyze_cluster( + "cid", fake_client, specific_signatures=["SlowImageDownloadSignature"] + ) + assert isinstance(results, list) + assert len(results) == 1 + assert results[0].title == "Slow Image Download" + assert results[0].severity == "warning" + assert "Detected slow image download rate (MBps):" in results[0].content + + asyncio.run(run()) + + def test_basic_info_signature_runs() -> None: from assisted_service_mcp.src.utils.log_analyzer.log_analyzer import LogAnalyzer from assisted_service_mcp.src.utils.log_analyzer.signatures.basic_info import (