Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/mypy.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,4 @@ jobs:
- name: Install dependencies
run: uv sync
- name: Python linter
run: uv run mypy --explicit-package-bases --disallow-untyped-calls --disallow-untyped-defs --disallow-incomplete-defs --ignore-missing-imports --disable-error-code attr-defined .
run: uv run mypy --config-file pyproject.toml .
20 changes: 0 additions & 20 deletions .github/workflows/pydocstyle.yaml

This file was deleted.

1 change: 1 addition & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ RUN uv sync
COPY server.py .
COPY service_client ./service_client/
COPY static_net ./static_net/
COPY log_analyzer ./log_analyzer/
COPY metrics ./metrics/

RUN chown -R 1001:0 ${APP_HOME}
Expand Down
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@ ruff:
uv run ruff check .

check-types:
uv run mypy --explicit-package-bases --disallow-untyped-calls --disallow-untyped-defs --disallow-incomplete-defs --ignore-missing-imports --disable-error-code attr-defined .
uv run mypy --config-file pyproject.toml .

verify: black pylint pyright docstyle ruff check-types test
verify: black pylint pyright ruff check-types test

format:
uv run black .
Expand Down
15 changes: 15 additions & 0 deletions log_analyzer/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
"""
OpenShift Assisted Installer Log Analyzer.

A standalone tool for analyzing OpenShift Assisted Installer logs.
"""

from .log_analyzer import LogAnalyzer
from .signatures import ALL_SIGNATURES, SignatureResult

# Version string of the log_analyzer package.
__version__ = "1.0.0"
# Public names of the package: what `from log_analyzer import *` exposes.
__all__ = [
    "LogAnalyzer",
    "ALL_SIGNATURES",
    "SignatureResult",
]
228 changes: 228 additions & 0 deletions log_analyzer/log_analyzer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,228 @@
"""
Core log analyzer for OpenShift Assisted Installer logs.
"""

import json
import logging
from collections import defaultdict
from typing import Dict, List, Any, cast

import dateutil.parser
import nestedarchive

logger = logging.getLogger(__name__)

# Glob patterns that locate the log bundle inside the logs archive.
# The "new" layout is the "old" layout nested one level deeper, under an
# additional outer `*_bootstrap_*.tar` member.
OLD_LOG_BUNDLE_PATH = "*_bootstrap_*.tar.gz/logs_host_*/log-bundle-*.tar.gz/log-bundle-*"
NEW_LOG_BUNDLE_PATH = (
    "*_bootstrap_*.tar/*_bootstrap_*.tar.gz/logs_host_*/log-bundle-*.tar.gz/log-bundle-*"
)


class LogAnalyzer:
"""Analyzer for OpenShift Assisted Installer logs."""

_metadata: dict[str, Any] | None

def __init__(self, logs_archive: nestedarchive.RemoteNestedArchive):
"""
Initialize the log analyzer.

Args:
logs_archive: RemoteNestedArchive containing the cluster logs
"""
self.logs_archive = logs_archive
self._metadata = None
self._cluster_events = None

Comment thread
keitwb marked this conversation as resolved.
@property
def metadata(self) -> Dict[str, Any] | None:
"""Get cluster metadata."""
if self._metadata is None:
try:
metadata_content = self.logs_archive.get("cluster_metadata.json")
raw_metadata = json.loads(cast(str | bytes, metadata_content))

# The metadata file contains cluster information at the root level
# Wrap it in a "cluster" key to match the expected structure
wrapped_metadata = {"cluster": raw_metadata}
self._metadata = self._clean_metadata_json(wrapped_metadata)
except Exception as e:
logger.error("Failed to load metadata: %s", e)
raise
return self._metadata

@staticmethod
def _clean_metadata_json(md: Dict[str, Any]) -> Dict[str, Any]:
"""Clean metadata JSON by separating deleted hosts."""
installation_start_time = dateutil.parser.isoparse(
md["cluster"]["install_started_at"]
)

def host_deleted_before_installation_started(host):
if deleted_at := host.get("deleted_at"):
return dateutil.parser.isoparse(deleted_at) < installation_start_time
return False

all_hosts = md["cluster"]["hosts"]
md["cluster"]["deleted_hosts"] = [
h for h in all_hosts if host_deleted_before_installation_started(h)
]
md["cluster"]["hosts"] = [
h for h in all_hosts if not host_deleted_before_installation_started(h)
]

return md

def get_last_install_cluster_events(self) -> List[Dict[str, Any]]:
"""Get the cluster installation events for the most recent attempt."""
try:
all_events = self.get_all_cluster_events()

# Get the last partition (latest installation attempt)
events = self.partition_cluster_events(all_events)[-1]
except Exception as e:
logger.error("Failed to load cluster events: %s", e)
return []

return events

def get_all_cluster_events(self) -> List[Dict[str, Any]]:
"""Get all the cluster installation events."""
if self._cluster_events is None:
try:
events_content = self.logs_archive.get("cluster_events.json")
all_events = json.loads(cast(str | bytes, events_content))

# Get the last partition (latest installation attempt)
self._cluster_events = self.partition_cluster_events(all_events)[-1]
except Exception as e:
logger.error("Failed to load cluster events: %s", e)
self._cluster_events = []

return self._cluster_events

@staticmethod
def partition_cluster_events(
events: List[Dict[str, Any]],
) -> List[List[Dict[str, Any]]]:
"""Partition events by reset events to separate installation attempts."""
partitions = []
current_partition = []

for event in events:
if event["name"] == "cluster_installation_reset":
if current_partition:
partitions.append(current_partition)
current_partition = []
else:
current_partition.append(event)

if current_partition:
partitions.append(current_partition)

return partitions or [[]]

def get_events_by_host(self) -> Dict[str, List[Dict[str, Any]]]:
"""Get events grouped by host ID."""
events_by_host = defaultdict(list)
for event in self.get_last_install_cluster_events():
if "host_id" in event:
events_by_host[event["host_id"]].append(event)
return events_by_host

def get_host_log_file(self, host_id: str, filename: str) -> str:
"""
Get a specific log file for a host.

Args:
host_id: Host UUID
filename: Name of the log file (e.g., 'agent.logs', 'journal.logs')

Returns:
Content of the log file

Raises:
FileNotFoundError: If the log file cannot be found
"""
hostname = "*" # Use wildcard since hostname is not always known

# Try new log path format first
new_logs_path = (
f"{hostname}.tar/{hostname}.tar.gz/logs_host_{host_id}/{filename}"
)
try:
content = self.logs_archive.get(new_logs_path)
logger.debug("Found logs under new location: %s", new_logs_path)
return cast(str, content)
except FileNotFoundError:
pass

# Fall back to old log path format
old_logs_path = f"{hostname}.tar.gz/logs_host_{host_id}/{filename}"
content = self.logs_archive.get(old_logs_path)
logger.debug("Found logs under old location: %s", old_logs_path)
return cast(str, content)

def get_journal_log(self, host_ip: str, journal_file: str, **kwargs) -> str:
"""
Get journal logs for a specific host.

Args:
host_ip: IP address of the host
journal_file: Name of the journal file
**kwargs: Additional arguments for the archive get method

Returns:
Content of the journal file

Raises:
FileNotFoundError: If the journal file cannot be found
"""
new_logs_path = (
f"{NEW_LOG_BUNDLE_PATH}/control-plane/{host_ip}/journals/{journal_file}"
)
try:
content = self.logs_archive.get(new_logs_path, **kwargs)
logger.debug("Found journal under new location: %s", new_logs_path)
return cast(str, content)
except FileNotFoundError:
pass

old_logs_path = (
f"{OLD_LOG_BUNDLE_PATH}/control-plane/{host_ip}/journals/{journal_file}"
)
content = self.logs_archive.get(old_logs_path, **kwargs)
logger.debug("Found journal under old location: %s", old_logs_path)
return cast(str, content)

def get_controller_logs(self) -> str:
"""Get assisted installer controller logs."""
return cast(
str,
self.logs_archive.get(
"controller_logs.tar.gz/assisted-installer-controller*.logs"
),
)

def get_must_gather(self) -> bytes:
"""Get must-gather logs."""
return cast(
bytes,
self.logs_archive.get(
"controller_logs.tar.gz/must-gather.tar.gz", mode="rb"
),
)

@staticmethod
def get_hostname(host: Dict[str, Any]) -> str:
"""Extract hostname from host metadata."""
hostname = host.get("requested_hostname")
if hostname:
return hostname

try:
inventory = json.loads(host["inventory"])
return inventory["hostname"]
except (KeyError, json.JSONDecodeError):
return host.get("id", "unknown")
85 changes: 85 additions & 0 deletions log_analyzer/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
#!/usr/bin/env python3
"""
Main entry point for the OpenShift Assisted Installer Log Analyzer.
"""
import logging
from typing import List, Optional

from service_client.assisted_service_api import InventoryClient

from .log_analyzer import LogAnalyzer
from .signatures import ALL_SIGNATURES, SignatureResult


async def analyze_cluster(
    cluster_id: str,
    api_client: InventoryClient,
    specific_signatures: Optional[List[str]] = None,
) -> List[SignatureResult]:
    """
    Analyze a cluster's logs.

    Downloads the cluster logs through `api_client`, then runs each selected
    signature against them. A signature that raises is logged with its
    traceback and skipped, so one failing signature does not abort the rest.

    Args:
        cluster_id: UUID of the cluster to analyze
        api_client: Client to fetch log files with
        specific_signatures: List of specific signature names to run
            (None or empty for all). Unknown names are logged and ignored.

    Returns:
        List of SignatureResult objects, one per signature that produced a
        truthy result

    Raises:
        Exception: Any error outside an individual signature run (e.g. while
            downloading the logs) is logged and re-raised.
    """
    logger = logging.getLogger(__name__)

    logger.info("Analyzing cluster: %s", cluster_id)

    try:
        # Download logs
        logs_archive = await api_client.get_cluster_logs(cluster_id)

        # Initialize log analyzer
        log_analyzer = LogAnalyzer(logs_archive)

        # Determine which signatures to run
        signatures_to_run = ALL_SIGNATURES
        if specific_signatures:
            signature_classes = {sig.__name__: sig for sig in ALL_SIGNATURES}
            signatures_to_run = []
            for sig_name in specific_signatures:
                if sig_name in signature_classes:
                    signatures_to_run.append(signature_classes[sig_name])
                else:
                    logger.warning("Unknown signature: %s", sig_name)

        # Run signatures
        results: List[SignatureResult] = []
        for signature_class in signatures_to_run:
            logger.debug("Running signature: %s", signature_class.__name__)
            try:
                signature = signature_class()
                result = signature.analyze(log_analyzer)
                if result:
                    results.append(result)
            except Exception as e:
                # The error is swallowed here, so keep the traceback in the log;
                # otherwise the failure location is lost.
                logger.exception(
                    "Error running signature %s: %s", signature_class.__name__, e
                )

        return results

    except Exception as e:
        logger.error("Error analyzing cluster %s: %s", cluster_id, e)
        raise


def print_results(results: List[SignatureResult]) -> None:
    """Write the analysis report to stdout.

    With no results a single "no issues" line is printed; otherwise a title,
    a 50-character rule, a blank line, and then each result on its own line.
    """
    if results:
        print("OpenShift Assisted Installer Log Analysis")
        print("=" * 50)
        print()
        for entry in results:
            print(entry)
    else:
        print("No issues found in the cluster logs.")
Loading