Skip to content
2 changes: 1 addition & 1 deletion assisted_service_mcp/src/mcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def _register_mcp_tools(self) -> None:
self.mcp.tool()(self._wrap_tool(cluster_tools.install_cluster))
self.mcp.tool()(self._wrap_tool(cluster_tools.set_cluster_ssh_key))
if settings.ENABLE_TROUBLESHOOTING_TOOLS:
self.mcp.tool()(self._wrap_tool(cluster_tools.analyze_cluster_logs))
self.mcp.tool()(self._wrap_tool(cluster_tools.troubleshoot_cluster))

# Register event monitoring tools
self.mcp.tool()(self._wrap_tool(event_tools.cluster_events))
Expand Down
10 changes: 5 additions & 5 deletions assisted_service_mcp/src/tools/cluster_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -365,19 +365,19 @@ async def set_cluster_ssh_key(


@track_tool_usage()
async def analyze_cluster_logs(
async def troubleshoot_cluster(
get_access_token_func: Callable[[], str],
cluster_id: Annotated[str, Field(description="The ID of the cluster")],
) -> str:
"""Analyze Assisted Installer logs for a cluster and summarize findings.
"""Troubleshoot an Assisted Installer cluster and its logs (if available) and summarize findings.

Runs a set of built‑in log analysis signatures against the cluster’s collected
logs (controller logs, bootstrap/control‑plane logs, and must‑gather content
Runs a set of built‑in analysis signatures against the cluster’s data or
the collected logs (controller logs, bootstrap/control‑plane logs, and must‑gather content
when available). The results highlight common misconfigurations and known
error patterns to speed up triage of failed or degraded installations.

Prerequisites:
- Logs are available for the target cluster (downloadable via the API)
- Cluster is available (downloadable via the API)

Returns:
str: Human‑readable report of signature results. Returns an empty
Expand Down
3 changes: 2 additions & 1 deletion assisted_service_mcp/src/utils/log_analyzer/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@
A standalone tool for analyzing OpenShift Assisted Installer logs.
"""

from .log_analyzer import LogAnalyzer
from .log_analyzer import ClusterAnalyzer, LogAnalyzer
from .signatures import ALL_SIGNATURES, SignatureResult

__version__ = "1.0.0"
__all__ = [
"ClusterAnalyzer",
"LogAnalyzer",
"ALL_SIGNATURES",
"SignatureResult",
Expand Down
141 changes: 87 additions & 54 deletions assisted_service_mcp/src/utils/log_analyzer/log_analyzer.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""
Core log analyzer for OpenShift Assisted Installer logs.
Core analyzer for OpenShift Assisted Installer clusters and their logs.
"""

import json
Expand All @@ -19,44 +19,45 @@
)


class LogAnalyzer:
"""Analyzer for OpenShift Assisted Installer logs."""

_metadata: dict[str, Any] | None

def __init__(self, logs_archive: nestedarchive.RemoteNestedArchive):
"""
Initialize the log analyzer.
class ClusterAnalyzer:
"""Analyzer for OpenShift Assisted Installer clusters."""

Args:
logs_archive: RemoteNestedArchive containing the cluster logs
"""
self.logs_archive = logs_archive
def __init__(self):
self._metadata = None
self._cluster_events = None

def set_cluster_metadata(self, metadata: Dict[str, Any]):
"""Set cluster metadata for analyzer."""
if not metadata.get("cluster"):
# Wrap metadata in a "cluster" key to match the expected structure
metadata = {"cluster": metadata}
self._metadata = self._clean_metadata_json(metadata)

def set_cluster_events(self, events: List[Dict[str, Any]]):
"""Set cluster events for analyzer."""
self._cluster_events = events

@property
def metadata(self) -> Dict[str, Any] | None:
"""Get cluster metadata."""
if self._metadata is None:
try:
metadata_content = self.logs_archive.get("cluster_metadata.json")
raw_metadata = json.loads(cast(str | bytes, metadata_content))

# The metadata file contains cluster information at the root level
# Wrap it in a "cluster" key to match the expected structure
wrapped_metadata = {"cluster": raw_metadata}
self._metadata = self._clean_metadata_json(wrapped_metadata)
except Exception as e:
logger.error("Failed to load metadata: %s", e)
raise
return self._metadata

@property
def cluster_events(self) -> List[Dict[str, Any]] | None:
"""Get cluster events."""
return self._cluster_events

def get_all_cluster_events(self) -> List[Dict[str, Any]]:
"""Get all the cluster installation events."""
if self._cluster_events is None:
return []
return self._cluster_events

@staticmethod
def _clean_metadata_json(md: Dict[str, Any]) -> Dict[str, Any]:
"""Clean metadata JSON by separating deleted hosts."""
installation_start_time = dateutil.parser.isoparse(
md["cluster"]["install_started_at"]
str(md["cluster"]["install_started_at"])
)

Comment thread
coderabbitai[bot] marked this conversation as resolved.
def host_deleted_before_installation_started(host):
Expand Down Expand Up @@ -87,21 +88,6 @@ def get_last_install_cluster_events(self) -> List[Dict[str, Any]]:

return events

def get_all_cluster_events(self) -> List[Dict[str, Any]]:
"""Get all the cluster installation events."""
if self._cluster_events is None:
try:
events_content = self.logs_archive.get("cluster_events.json")
all_events = json.loads(cast(str | bytes, events_content))

# Get the last partition (latest installation attempt)
self._cluster_events = self.partition_cluster_events(all_events)[-1]
except Exception as e:
logger.error("Failed to load cluster events: %s", e)
self._cluster_events = []

return self._cluster_events

@staticmethod
def partition_cluster_events(
events: List[Dict[str, Any]],
Expand Down Expand Up @@ -131,6 +117,66 @@ def get_events_by_host(self) -> Dict[str, List[Dict[str, Any]]]:
events_by_host[event["host_id"]].append(event)
return events_by_host

@staticmethod
def get_hostname(host: Dict[str, Any]) -> str:
"""Extract hostname from host metadata."""
hostname = host.get("requested_hostname")
if hostname:
return hostname

try:
inventory = json.loads(host["inventory"])
return inventory["hostname"]
except (KeyError, json.JSONDecodeError):
return host.get("id", "unknown")


class LogAnalyzer(ClusterAnalyzer):
"""Analyzer for OpenShift Assisted Installer logs."""

_metadata: dict[str, Any] | None

def __init__(self, logs_archive: nestedarchive.RemoteNestedArchive):
"""
Initialize the log analyzer.

Args:
logs_archive: RemoteNestedArchive containing the cluster logs
"""
super().__init__()
self.logs_archive = logs_archive

@property
def metadata(self) -> Dict[str, Any] | None:
"""Get cluster metadata."""
if self._metadata is None:
try:
metadata_content = self.logs_archive.get("cluster_metadata.json")
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a bit strange to me that we now sometimes get the metadata from the API and other times get it from the log bundle.

Did you consider making this consistent (I guess just always getting it from the API)?

I could see a situation where if the logs are available we're actually acting on old data where if they are not then we get the data from the live cluster.

Also it's a bit odd that the user of the ClusterAnalyzer is responsible for fetching the metadata and events from outside the object and setting them where LogAnalyzer handles that for the caller. We should probably be more consistent there as well.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then a follow on from this would be that maybe we don't need separate classes at all.

If the behavior is the same for getting the data (cluster and events) the Analyzer could detect if the logs are available and make that information public. Then the caller could filter the signatures depending on if the Analyzer detected that logs are available or not.

Or even better the signature could check analyzer.logs_available if/when it needs them and just no-op if the logs are required. Then we don't need to do any filtering at all.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you consider making this consistent (I guess just always getting it from the API)?
I could see a situation where if the logs are available we're actually acting on old data where if they are not then we get the data from the live cluster.

Yes that's exactly it, the data from the live cluster might differ from what's in the log bundle.

I've considered making it toggle-able as in either choose to use the api data or the log bundle data if it's available for the cluster metadata/events. However, this would then need to be something that the model decides when calling the tool, and I'm not sure how to do that...


If the behavior is the same for getting the data (cluster and events) the Analyzer could detect if the logs are available and make that information public. Then the caller could filter the signatures depending on if the Analyzer detected that logs are available or not.

Sure we could do it this way instead too, I was just hesitant to pass in the api client to the analyzer since the client is used outside of it.

Or even better the signature could check analyzer.logs_available if/when it needs them and just no-op if the logs are required. Then we don't need to do any filtering at all.

Yeah that works too, but it just requires more changes where we'd need to update all of the signatures that do require logs to do this check.

raw_metadata = json.loads(cast(str | bytes, metadata_content))

# The metadata file contains cluster information at the root level
# Wrap it in a "cluster" key to match the expected structure
wrapped_metadata = {"cluster": raw_metadata}
self._metadata = self._clean_metadata_json(wrapped_metadata)
except Exception as e:
logger.error("Failed to load metadata: %s", e)
raise
return self._metadata

def get_all_cluster_events(self) -> List[Dict[str, Any]]:
"""Get all the cluster installation events."""
if self._cluster_events is None:
try:
events_content = self.logs_archive.get("cluster_events.json")
all_events = json.loads(cast(str | bytes, events_content))

self._cluster_events = all_events
except Exception as e:
logger.error("Failed to load cluster events: %s", e)
self._cluster_events = []

return self._cluster_events

def get_host_log_file(self, host_id: str, filename: str) -> str:
"""
Get a specific log file for a host.
Expand Down Expand Up @@ -213,16 +259,3 @@ def get_must_gather(self) -> bytes:
"controller_logs.tar.gz/must-gather.tar.gz", mode="rb"
),
)

@staticmethod
def get_hostname(host: Dict[str, Any]) -> str:
"""Extract hostname from host metadata."""
hostname = host.get("requested_hostname")
if hostname:
return hostname

try:
inventory = json.loads(host["inventory"])
return inventory["hostname"]
except (KeyError, json.JSONDecodeError):
return host.get("id", "unknown")
82 changes: 63 additions & 19 deletions assisted_service_mcp/src/utils/log_analyzer/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,19 @@
Main entry point for the OpenShift Assisted Installer Log Analyzer.
"""
import logging
import json
from typing import List, Optional

from assisted_service_mcp.src.service_client.assisted_service_api import InventoryClient

from .log_analyzer import LogAnalyzer
from .signatures import ALL_SIGNATURES, SignatureResult
from assisted_service_mcp.src.utils.log_analyzer.log_analyzer import (
ClusterAnalyzer,
LogAnalyzer,
)
from assisted_service_mcp.src.utils.log_analyzer.signatures import (
ALL_SIGNATURES,
SignatureResult,
Signature,
)


async def analyze_cluster(
Expand All @@ -17,11 +24,11 @@ async def analyze_cluster(
specific_signatures: Optional[List[str]] = None,
) -> List[SignatureResult]:
"""
Analyze a cluster's logs.
Analyze a cluster by using a signature analysis tool to detect common issues and errors.

Args:
cluster_id: UUID of the cluster to analyze
api_client: Client to fetch log files with
api_client: Client to fetch the cluster and its log files with
specific_signatures: List of specific signature names to run (None for all)

Returns:
Expand All @@ -33,30 +40,52 @@ async def analyze_cluster(
logger.info("Analyzing cluster: %s", cluster_id)

try:
# Download logs
logs_archive = await api_client.get_cluster_logs(cluster_id)
# first call the api to get the cluster and check if logs are available
cluster = await api_client.get_cluster(cluster_id)

if cluster.logs_info != "completed":
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's change this to actually check if we can download the logs.

I say this because when you reset a cluster the logs_info gets reset, but the actual log bundle is still present and we should be able to use it.

logger.info(
"Logs are not available for cluster: %s\nDefaulting to signatures that don't require logs",
cluster_id,
)

analyzer = ClusterAnalyzer()

# Call events API to get the events and set the events in the analyzer
events = await api_client.get_events(cluster_id)
analyzer.set_cluster_events(json.loads(events))

# Set the cluster metadata in the analyzer
analyzer.set_cluster_metadata(cluster.to_dict())

# Select signatures that don't require logs
signatures_to_run = [
sig for sig in ALL_SIGNATURES if sig.logs_required is False
]

# Initialize log analyzer
log_analyzer = LogAnalyzer(logs_archive)
else:
# Download logs
logs_archive = await api_client.get_cluster_logs(cluster_id)

# Determine which signatures to run
signatures_to_run = ALL_SIGNATURES
# Initialize log analyzer
analyzer = LogAnalyzer(logs_archive)

# Add all signatures to the list to run
signatures_to_run = ALL_SIGNATURES

# If specific signatures are provided, filter the signatures to run
if specific_signatures:
signature_classes = {sig.__name__: sig for sig in ALL_SIGNATURES}
signatures_to_run = []
for sig_name in specific_signatures:
if sig_name in signature_classes:
signatures_to_run.append(signature_classes[sig_name])
else:
logger.warning("Unknown signature: %s", sig_name)
signatures_to_run = filter_signatures(
signatures_to_run, specific_signatures
)
Comment on lines +43 to +80
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Add warning when user-requested signatures are skipped due to log unavailability.

When logs are unavailable, the code filters to signatures where logs_required=False (lines 62-64), then applies user-requested specific_signatures (lines 77-80). If a user requests a signature that requires logs when logs aren't available, it will be silently skipped without any feedback.

Consider adding a warning after line 80:

         # If specific signatures are provided, filter the signatures to run
         if specific_signatures:
+            # Track which signatures were filtered out
+            before_filter = set(sig.__name__ for sig in signatures_to_run)
             signatures_to_run = filter_signatures(
                 signatures_to_run, specific_signatures
             )
+            after_filter = set(sig.__name__ for sig in signatures_to_run)
+            skipped = set(specific_signatures) - after_filter
+            if skipped and cluster.logs_info != "completed":
+                logger.warning(
+                    "Skipped signatures requiring logs (logs unavailable): %s",
+                    ", ".join(sorted(skipped))
+                )
🤖 Prompt for AI Agents
In assisted_service_mcp/src/utils/log_analyzer/main.py around lines 43 to 80,
when logs are unavailable we restrict to signatures with logs_required=False and
then apply user-requested specific_signatures, which can silently skip requested
signatures that require logs; after applying the specific_signatures filter
(i.e., after line ~80) detect the case where cluster.logs_info != "completed"
and specific_signatures was provided, compute which requested signatures were
removed (requested minus resulting), and emit a logger.warning listing those
skipped signatures and that they were skipped because logs are unavailable;
ensure signatures_to_run is a concrete list before computing the difference so
the comparison works.


# Run signatures
results = []
for signature_class in signatures_to_run:
logger.debug("Running signature: %s", signature_class.__name__)
try:
signature = signature_class()
result = signature.analyze(log_analyzer)
result = signature.analyze(analyzer)
if result:
results.append(result)
except Exception as e:
Expand All @@ -71,6 +100,21 @@ async def analyze_cluster(
raise


def filter_signatures(
signatures: List[type[Signature]], specific_signatures: List[str]
) -> List[type[Signature]]:
"""Filter signatures to run based on specific signatures."""
logger = logging.getLogger(__name__)
signature_classes = {sig.__name__: sig for sig in signatures}
filtered_signatures = []
for sig_name in specific_signatures:
if sig_name in signature_classes:
filtered_signatures.append(signature_classes[sig_name])
else:
logger.warning("Unknown signature: %s", sig_name)
return filtered_signatures


def print_results(results: List[SignatureResult]) -> None:
"""Print analysis results to stdout."""
if not results:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ def filter_operators(
class EventsInstallationAttempts(Signature):
"""Inspects events file to check for multiple installation attempts."""

logs_required = False

def analyze(self, log_analyzer) -> Optional[SignatureResult]:
"""Analyze multiple installation attempts."""
try:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ def __str__(self) -> str:
class Signature(abc.ABC):
"""Base class for signature analysis."""

logs_required = True

def __init__(self):
"""Initialize the signature."""
self.name = self.__class__.__name__
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
class ComponentsVersionSignature(Signature):
"""Analyzes component versions."""

logs_required = False

def analyze(self, log_analyzer) -> Optional[SignatureResult]:
"""Analyze component versions."""
try:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ def _search_patterns_in_string(string, patterns):
class SNOHostnameHasEtcd(ErrorSignature):
"""Looks for etcd in SNO hostname (OCPBUGS-15852)."""

logs_required = False

def analyze(self, log_analyzer) -> Optional[SignatureResult]:
"""Analyze SNO hostname for etcd."""
try:
Expand Down
Loading