diff --git a/.github/workflows/mypy.yaml b/.github/workflows/mypy.yaml index 9d93815..825fd4c 100644 --- a/.github/workflows/mypy.yaml +++ b/.github/workflows/mypy.yaml @@ -19,4 +19,4 @@ jobs: - name: Install dependencies run: uv sync - name: Python linter - run: uv run mypy --explicit-package-bases --disallow-untyped-calls --disallow-untyped-defs --disallow-incomplete-defs --ignore-missing-imports --disable-error-code attr-defined . + run: uv run mypy --config-file pyproject.toml . diff --git a/.github/workflows/pydocstyle.yaml b/.github/workflows/pydocstyle.yaml deleted file mode 100644 index 0b6e971..0000000 --- a/.github/workflows/pydocstyle.yaml +++ /dev/null @@ -1,20 +0,0 @@ -name: Pydocstyle - -on: - - push - - pull_request - -jobs: - pydocstyle: - runs-on: ubuntu-latest - permissions: - contents: read - pull-requests: read - steps: - - uses: actions/checkout@v4 - - name: Install uv - uses: astral-sh/setup-uv@v5 - with: - python-version: '3.13' - - name: Python linter - run: uv tool run pydocstyle -v . diff --git a/Dockerfile b/Dockerfile index 37a505d..5d436cd 100644 --- a/Dockerfile +++ b/Dockerfile @@ -14,6 +14,7 @@ RUN uv sync COPY server.py . COPY service_client ./service_client/ COPY static_net ./static_net/ +COPY log_analyzer ./log_analyzer/ COPY metrics ./metrics/ RUN chown -R 1001:0 ${APP_HOME} diff --git a/Makefile b/Makefile index 00f324c..a05afb6 100644 --- a/Makefile +++ b/Makefile @@ -55,9 +55,9 @@ ruff: uv run ruff check . check-types: - uv run mypy --explicit-package-bases --disallow-untyped-calls --disallow-untyped-defs --disallow-incomplete-defs --ignore-missing-imports --disable-error-code attr-defined . + uv run mypy --config-file pyproject.toml . -verify: black pylint pyright docstyle ruff check-types test +verify: black pylint pyright ruff check-types test format: uv run black . diff --git a/log_analyzer/__init__.py b/log_analyzer/__init__.py new file mode 100644 index 0000000..ef5f8b4 --- /dev/null +++ b/log_analyzer/__init__.py @@ -0,0 +1,15 @@ +""" +OpenShift Assisted Installer Log Analyzer. + +A standalone tool for analyzing OpenShift Assisted Installer logs. +""" + +from .log_analyzer import LogAnalyzer +from .signatures import ALL_SIGNATURES, SignatureResult + +__version__ = "1.0.0" +__all__ = [ + "LogAnalyzer", + "ALL_SIGNATURES", + "SignatureResult", +] diff --git a/log_analyzer/log_analyzer.py b/log_analyzer/log_analyzer.py new file mode 100644 index 0000000..e35b6a8 --- /dev/null +++ b/log_analyzer/log_analyzer.py @@ -0,0 +1,228 @@ +""" +Core log analyzer for OpenShift Assisted Installer logs. +""" + +import json +import logging +from collections import defaultdict +from typing import Dict, List, Any, cast + +import dateutil.parser +import nestedarchive + +logger = logging.getLogger(__name__) + +# Archive path constants for different log bundle formats +NEW_LOG_BUNDLE_PATH = "*_bootstrap_*.tar/*_bootstrap_*.tar.gz/logs_host_*/log-bundle-*.tar.gz/log-bundle-*" +OLD_LOG_BUNDLE_PATH = ( + "*_bootstrap_*.tar.gz/logs_host_*/log-bundle-*.tar.gz/log-bundle-*" +) + + +class LogAnalyzer: + """Analyzer for OpenShift Assisted Installer logs.""" + + _metadata: dict[str, Any] | None + + def __init__(self, logs_archive: nestedarchive.RemoteNestedArchive): + """ + Initialize the log analyzer. + + Args: + logs_archive: RemoteNestedArchive containing the cluster logs + """ + self.logs_archive = logs_archive + self._metadata = None + self._cluster_events = None + + @property + def metadata(self) -> Dict[str, Any] | None: + """Get cluster metadata.""" + if self._metadata is None: + try: + metadata_content = self.logs_archive.get("cluster_metadata.json") + raw_metadata = json.loads(cast(str | bytes, metadata_content)) + + # The metadata file contains cluster information at the root level + # Wrap it in a "cluster" key to match the expected structure + wrapped_metadata = {"cluster": raw_metadata} + self._metadata = self._clean_metadata_json(wrapped_metadata) + except Exception as e: + logger.error("Failed to load metadata: %s", e) + raise + return self._metadata + + @staticmethod + def _clean_metadata_json(md: Dict[str, Any]) -> Dict[str, Any]: + """Clean metadata JSON by separating deleted hosts.""" + installation_start_time = dateutil.parser.isoparse( + md["cluster"]["install_started_at"] + ) + + def host_deleted_before_installation_started(host): + if deleted_at := host.get("deleted_at"): + return dateutil.parser.isoparse(deleted_at) < installation_start_time + return False + + all_hosts = md["cluster"]["hosts"] + md["cluster"]["deleted_hosts"] = [ + h for h in all_hosts if host_deleted_before_installation_started(h) + ] + md["cluster"]["hosts"] = [ + h for h in all_hosts if not host_deleted_before_installation_started(h) + ] + + return md + + def get_last_install_cluster_events(self) -> List[Dict[str, Any]]: + """Get the cluster installation events for the most recent attempt.""" + try: + all_events = self.get_all_cluster_events() + + # Get the last partition (latest installation attempt) + events = self.partition_cluster_events(all_events)[-1] + except Exception as e: + logger.error("Failed to load cluster events: %s", e) + return [] + + return events + + def get_all_cluster_events(self) -> List[Dict[str, Any]]: + """Get all the cluster installation events.""" + if self._cluster_events is None: + try: + events_content = self.logs_archive.get("cluster_events.json") + all_events = json.loads(cast(str | bytes, events_content)) + + # Get the last partition (latest installation attempt) + self._cluster_events = self.partition_cluster_events(all_events)[-1] + except Exception as e: + logger.error("Failed to load cluster events: %s", e) + self._cluster_events = [] + + return self._cluster_events + + @staticmethod + def partition_cluster_events( + events: List[Dict[str, Any]], + ) -> List[List[Dict[str, Any]]]: + """Partition events by reset events to separate installation attempts.""" + partitions = [] + current_partition = [] + + for event in events: + if event["name"] == "cluster_installation_reset": + if current_partition: + partitions.append(current_partition) + current_partition = [] + else: + current_partition.append(event) + + if current_partition: + partitions.append(current_partition) + + return partitions or [[]] + + def get_events_by_host(self) -> Dict[str, List[Dict[str, Any]]]: + """Get events grouped by host ID.""" + events_by_host = defaultdict(list) + for event in self.get_last_install_cluster_events(): + if "host_id" in event: + events_by_host[event["host_id"]].append(event) + return events_by_host + + def get_host_log_file(self, host_id: str, filename: str) -> str: + """ + Get a specific log file for a host. + + Args: + host_id: Host UUID + filename: Name of the log file (e.g., 'agent.logs', 'journal.logs') + + Returns: + Content of the log file + + Raises: + FileNotFoundError: If the log file cannot be found + """ + hostname = "*" # Use wildcard since hostname is not always known + + # Try new log path format first + new_logs_path = ( + f"{hostname}.tar/{hostname}.tar.gz/logs_host_{host_id}/{filename}" + ) + try: + content = self.logs_archive.get(new_logs_path) + logger.debug("Found logs under new location: %s", new_logs_path) + return cast(str, content) + except FileNotFoundError: + pass + + # Fall back to old log path format + old_logs_path = f"{hostname}.tar.gz/logs_host_{host_id}/{filename}" + content = self.logs_archive.get(old_logs_path) + logger.debug("Found logs under old location: %s", old_logs_path) + return cast(str, content) + + def get_journal_log(self, host_ip: str, journal_file: str, **kwargs) -> str: + """ + Get journal logs for a specific host. + + Args: + host_ip: IP address of the host + journal_file: Name of the journal file + **kwargs: Additional arguments for the archive get method + + Returns: + Content of the journal file + + Raises: + FileNotFoundError: If the journal file cannot be found + """ + new_logs_path = ( + f"{NEW_LOG_BUNDLE_PATH}/control-plane/{host_ip}/journals/{journal_file}" + ) + try: + content = self.logs_archive.get(new_logs_path, **kwargs) + logger.debug("Found journal under new location: %s", new_logs_path) + return cast(str, content) + except FileNotFoundError: + pass + + old_logs_path = ( + f"{OLD_LOG_BUNDLE_PATH}/control-plane/{host_ip}/journals/{journal_file}" + ) + content = self.logs_archive.get(old_logs_path, **kwargs) + logger.debug("Found journal under old location: %s", old_logs_path) + return cast(str, content) + + def get_controller_logs(self) -> str: + """Get assisted installer controller logs.""" + return cast( + str, + self.logs_archive.get( + "controller_logs.tar.gz/assisted-installer-controller*.logs" + ), + ) + + def get_must_gather(self) -> bytes: + """Get must-gather logs.""" + return cast( + bytes, + self.logs_archive.get( + "controller_logs.tar.gz/must-gather.tar.gz", mode="rb" + ), + ) + + @staticmethod + def get_hostname(host: Dict[str, Any]) -> str: + """Extract hostname from host metadata.""" + hostname = host.get("requested_hostname") + if hostname: + return hostname + + try: + inventory = json.loads(host["inventory"]) + return inventory["hostname"] + except (KeyError, json.JSONDecodeError): + return host.get("id", "unknown") diff --git a/log_analyzer/main.py b/log_analyzer/main.py new file mode 100644 index 0000000..1947323 --- /dev/null +++ b/log_analyzer/main.py @@ -0,0 +1,85 @@ +#!/usr/bin/env python3 +""" +Main entry point for the OpenShift Assisted Installer Log Analyzer. +""" +import logging +from typing import List, Optional + +from service_client.assisted_service_api import InventoryClient + +from .log_analyzer import LogAnalyzer +from .signatures import ALL_SIGNATURES, SignatureResult + + +async def analyze_cluster( + cluster_id: str, + api_client: InventoryClient, + specific_signatures: Optional[List[str]] = None, +) -> List[SignatureResult]: + """ + Analyze a cluster's logs. + + Args: + cluster_id: UUID of the cluster to analyze + api_client: Client to fetch log files with + specific_signatures: List of specific signature names to run (None for all) + + Returns: + List of SignatureResult objects + """ + logger = logging.getLogger(__name__) + + # Initialize API client + logger.info("Analyzing cluster: %s", cluster_id) + + try: + # Download logs + logs_archive = await api_client.get_cluster_logs(cluster_id) + + # Initialize log analyzer + log_analyzer = LogAnalyzer(logs_archive) + + # Determine which signatures to run + signatures_to_run = ALL_SIGNATURES + if specific_signatures: + signature_classes = {sig.__name__: sig for sig in ALL_SIGNATURES} + signatures_to_run = [] + for sig_name in specific_signatures: + if sig_name in signature_classes: + signatures_to_run.append(signature_classes[sig_name]) + else: + logger.warning("Unknown signature: %s", sig_name) + + # Run signatures + results = [] + for signature_class in signatures_to_run: + logger.debug("Running signature: %s", signature_class.__name__) + try: + signature = signature_class() + result = signature.analyze(log_analyzer) + if result: + results.append(result) + except Exception as e: + logger.error( + "Error running signature %s: %s", signature_class.__name__, e + ) + + return results + + except Exception as e: + logger.error("Error analyzing cluster %s: %s", cluster_id, e) + raise + + +def print_results(results: List[SignatureResult]) -> None: + """Print analysis results to stdout.""" + if not results: + print("No issues found in the cluster logs.") + return + + print("OpenShift Assisted Installer Log Analysis") + print("=" * 50) + print() + + for result in results: + print(result) diff --git a/log_analyzer/signatures/__init__.py b/log_analyzer/signatures/__init__.py new file mode 100644 index 0000000..a1b809d --- /dev/null +++ b/log_analyzer/signatures/__init__.py @@ -0,0 +1,33 @@ +""" +Signature analysis modules for OpenShift Assisted Installer logs. +""" + +import sys +import inspect + +from .base import Signature, ErrorSignature, SignatureResult +from .basic_info import * # noqa +from .error_detection import * # noqa +from .performance import * # noqa +from .networking import * # noqa +from .advanced_analysis import * # noqa +from .platform_specific import * # noqa + +# Collect all signatures from all modules +ALL_SIGNATURES = [] + +current_module = sys.modules[__name__] +for name, obj in inspect.getmembers(current_module): + if ( + inspect.isclass(obj) + and issubclass(obj, Signature) + and obj is not Signature + and obj is not ErrorSignature + and obj is not SignatureResult + ): + ALL_SIGNATURES.append(obj) + +# Sort by name for consistent ordering +ALL_SIGNATURES.sort(key=lambda x: x.__name__) + +__all__ = ["Signature", "ErrorSignature", "SignatureResult", "ALL_SIGNATURES"] diff --git a/log_analyzer/signatures/advanced_analysis.py b/log_analyzer/signatures/advanced_analysis.py new file mode 100644 index 0000000..438a896 --- /dev/null +++ b/log_analyzer/signatures/advanced_analysis.py @@ -0,0 +1,474 @@ +""" +Advanced analysis signatures for OpenShift Assisted Installer logs. +These signatures perform complex analysis across multiple log sources. +""" + +import json +import logging +import re +from collections import Counter, OrderedDict +from typing import Any, Generator, Optional, Callable, cast + +from log_analyzer.log_analyzer import NEW_LOG_BUNDLE_PATH, OLD_LOG_BUNDLE_PATH + +from .base import Signature, SignatureResult + + +def operator_statuses_from_controller_logs( + controller_log: str, include_empty: bool = False +): + operator_regex = re.compile(r"Operator ([a-z\-]+), statuses: \[(.*)\].*") + conditions_regex = re.compile(r"\{(.+?)\}") + condition_regex = re.compile( + r"([A-Za-z]+) (False|True) ([0-9a-zA-Z\-]+ [0-9a-zA-Z\:]+ [0-9a-zA-Z\-\+]+ [A-Z]+) (.*)" + ) + operator_statuses = {} + + for operator_name, operator_status in operator_regex.findall(controller_log): + if include_empty: + operator_statuses[operator_name] = {} + operator_conditions = operator_statuses.setdefault(operator_name, {}) + for operator_conditions_raw in conditions_regex.findall(operator_status): + for ( + condition_name, + condition_result, + condition_timestamp, + condition_reason, + ) in condition_regex.findall(operator_conditions_raw): + operator_conditions[condition_name] = { + "result": condition_result == "True", + "timestamp": condition_timestamp, + "reason": condition_reason, + } + + return operator_statuses + + +def condition_has_result( + operator_conditions, expected_condition_name: str, expected_condition_result: bool +) -> bool: + return any( + condition_values["result"] == expected_condition_result + for condition_name, condition_values in operator_conditions.items() + if condition_name == expected_condition_name + ) + + +def filter_operators( + operator_statuses, + required_conditions, + aggregation_function: Callable[[Generator[Any, None, None]], bool], +): + return { + operator_name: operator_conditions + for operator_name, operator_conditions in operator_statuses.items() + if aggregation_function( + condition_has_result( + operator_conditions, required_condition_name, expected_condition_result + ) + for required_condition_name, expected_condition_result in required_conditions + ) + } + + +logger = logging.getLogger(__name__) + + +class EventsInstallationAttempts(Signature): + """Inspects events file to check for multiple installation attempts.""" + + def analyze(self, log_analyzer) -> Optional[SignatureResult]: + """Analyze multiple installation attempts.""" + try: + # Get all cluster events and partition them by reset events + all_events = log_analyzer.get_all_cluster_events() + partitions = log_analyzer.partition_cluster_events(all_events) + installation_attempts = len(partitions) + + if installation_attempts != 1: + current_events = log_analyzer.get_last_install_cluster_events() + if current_events: + last_attempt_first_event = current_events[0] + content = ( + f"The events file for this cluster contains events from {installation_attempts} installation attempts.\n" + f"When reading the events for this ticket, make sure you look only at the events for the last installation attempt,\n" + f"the first event in that attempt happened around {last_attempt_first_event['event_time']}." + ) + + return SignatureResult( + signature_name=self.name, + title="Multiple Installation Attempts in Events File", + content=content, + severity="warning", + ) + + except Exception as e: + logger.error("Error in EventsInstallationAttempts: %s", e) + + return None + + +class MissingMustGatherLogs(Signature): + """Checks if must-gather logs are missing when they should be collected.""" + + def analyze(self, log_analyzer) -> Optional[SignatureResult]: + """Analyze for missing must-gather logs.""" + try: + metadata = log_analyzer.metadata + cluster_hosts = metadata["cluster"]["hosts"] + bootstrap_node = [ + host for host in cluster_hosts if host.get("bootstrap", False) + ] + + if not bootstrap_node: + return None + + bootstrap_node = bootstrap_node[0] + + eligible_bootstrap_stages = ["Rebooting", "Configuring", "Joined", "Done"] + if len(cluster_hosts) <= 1: + # In SNO when the bootstrap node goes to reboot the controller is still not running + eligible_bootstrap_stages.remove("Rebooting") + + if ( + bootstrap_node["progress"]["current_stage"] + not in eligible_bootstrap_stages + ): + return None + + cluster = metadata["cluster"] + if cluster["logs_info"] in ("timeout", "completed"): + try: + log_analyzer.get_must_gather() + return None # Must-gather exists, no issue + except FileNotFoundError: + content = "This cluster's collected logs are missing must-gather logs although it should be collected, why is it missing?" + + return SignatureResult( + signature_name=self.name, + title="Missing Must-Gather Logs", + content=content, + severity="error", + ) + + except Exception as e: + logger.error("Error in MissingMustGatherLogs: %s", e) + + return None + + +class FlappingValidations(Signature): + """Analyzes flapping validation states.""" + + validation_name_regexp = re.compile(r"Host .+: validation '(.+)'.+") + succeed_to_failing_regexp = re.compile( + r"Host .+: validation '.+' that used to succeed is now failing" + ) + now_fixed_regexp = re.compile(r"Host .+: validation '.+' is now fixed") + + def analyze(self, log_analyzer) -> Optional[SignatureResult]: + """Analyze flapping validations.""" + try: + events_by_host = log_analyzer.get_events_by_host() + + host_tables = {} + for host_id, events in events_by_host.items(): + succeed_to_failing_counter = Counter( + cast( + re.Match[str], + self.validation_name_regexp.match(event["message"]), + ).groups()[0] + for event in events + if self.succeed_to_failing_regexp.match(event["message"]) + ) + + now_fixed = Counter( + cast( + re.Match[str], + self.validation_name_regexp.match(event["message"]), + ).groups()[0] + for event in events + if self.now_fixed_regexp.match(event["message"]) + ) + + table = [ + OrderedDict[str, Any]( + validation=validation_name, + failed=f"This went from succeeding to failing {succeed_to_failing_occurrences} times", + fixed=f"This validation was fixed {now_fixed.get(validation_name, 0)} times", + ) + for validation_name, succeed_to_failing_occurrences in succeed_to_failing_counter.items() + ] + + if table: + host_tables[host_id] = self.generate_table(table) + + if host_tables: + content = "\n".join( + f"Host ID {host_id}:\n{table}" + for host_id, table in host_tables.items() + ) + + return SignatureResult( + signature_name=self.name, + title="Flapping Validations", + content=content, + severity="warning", + ) + + except Exception as e: + logger.error("Error in FlappingValidations: %s", e) + + return None + + +class NodeStatus(Signature): + """Dump node statuses from installer gather nodes.json.""" + + def analyze(self, log_analyzer) -> Optional[SignatureResult]: + for base in (NEW_LOG_BUNDLE_PATH, OLD_LOG_BUNDLE_PATH): + path = f"{base}/resources/nodes.json" + try: + nodes_json = log_analyzer.logs_archive.get(path) + except FileNotFoundError: + continue + try: + nodes = json.loads(nodes_json) + except json.JSONDecodeError: + continue + nodes_table = [] + for node in nodes.get("items", []): + + def get_by_type(t, node): + conds = node.get("status", {}).get("conditions", []) + + c = next((c for c in conds if c.get("type") == t), None) + if not c: + return "(Condition not found)" + return f"Status {c['status']} with reason {c['reason']}, message {c['message']}" + + nodes_table.append( + OrderedDict( + name=node.get("metadata", {}).get("name"), + MemoryPressure=get_by_type("MemoryPressure", node), + DiskPressure=get_by_type("DiskPressure", node), + PIDPressure=get_by_type("PIDPressure", node), + Ready=get_by_type("Ready", node), + ) + ) + if nodes_table: + return SignatureResult( + signature_name=self.name, + title="Collected nodes.json from installer gather", + content=self.generate_table(nodes_table), + severity="info", + ) + return SignatureResult( + signature_name=self.name, + title="Collected nodes.json from installer gather", + content=( + "The nodes.json file doesn't have any node resources in it. You should probably check the kubelet logs for the 2 non-bootstrap control-plane hosts" + ), + severity="warning", + ) + return None + + +class ControllerWarnings(Signature): + """Search for warnings in controller logs.""" + + def analyze(self, log_analyzer) -> Optional[SignatureResult]: + try: + controller_logs = log_analyzer.get_controller_logs() + except FileNotFoundError: + return None + warnings = re.findall(r'time=".*" level=warning msg=".*', controller_logs) + if warnings: + shown = warnings[:10] + content = "\n".join(shown) + if len(warnings) > 10: + content += ( + f"\nThere are {len(warnings) - 10} additional warnings not shown" + ) + return SignatureResult( + signature_name=self.name, + title="Controller warning logs", + content=content, + severity="warning", + ) + return None + + +class UserHasLoggedIntoCluster(Signature): + """Detect user login to cluster nodes during installation.""" + + USER_LOGIN_PATTERN = re.compile( + r"pam_unix\((sshd|login):session\): session opened for user .+ by" + ) + + def analyze(self, log_analyzer) -> Optional[SignatureResult]: + cluster = log_analyzer.metadata.get("cluster", {}) + msgs = [] + for host in cluster.get("hosts", []): + host_id = host["id"] + try: + journal_logs = log_analyzer.get_host_log_file(host_id, "journal.logs") + except FileNotFoundError: + continue + if self.USER_LOGIN_PATTERN.findall(journal_logs): + msgs.append( + f"Host {host_id}: found evidence of a user login during installation. This might indicate that some settings have been changed manually; if incorrect they could contribute to failure." + ) + if msgs: + return SignatureResult( + signature_name=self.name, + title="User has logged into cluster nodes during installation", + content="\n".join(msgs), + severity="warning", + ) + return None + + +class FailedRequestTriggersHostTimeout(Signature): + """Look for failed requests that could have caused host timeout.""" + + LOG_PATTERN = re.compile( + r'time="(?P