From a7cb78d6bd7d99cd91819b9a2ae3dacacd3da4fe Mon Sep 17 00:00:00 2001 From: dougyster Date: Mon, 17 Nov 2025 18:39:47 -0500 Subject: [PATCH 1/2] update ci failure monitor --- .github/workflows/ci-failure-monitor.yml | 8 +- scripts/ci_monitor/README.md | 31 ++ scripts/ci_monitor/ci_failures_analysis.py | 613 ++++++++++++++++++--- 3 files changed, 557 insertions(+), 95 deletions(-) diff --git a/.github/workflows/ci-failure-monitor.yml b/.github/workflows/ci-failure-monitor.yml index 2173222ccce0..665ef4757ad5 100644 --- a/.github/workflows/ci-failure-monitor.yml +++ b/.github/workflows/ci-failure-monitor.yml @@ -6,9 +6,9 @@ on: workflow_dispatch: inputs: limit: - description: 'Number of workflow runs to analyze' + description: 'Number of workflow runs to analyze (across all workflows)' required: false - default: '300' + default: '800' type: string threshold: description: 'Alert threshold for consecutive failures' @@ -51,8 +51,8 @@ jobs: cd scripts/ci_monitor python ci_failures_analysis.py \ --token $GITHUB_TOKEN \ - --limit ${{ inputs.limit || '300' }} \ - --threshold ${{ inputs.threshold || '2' }} \ + --limit ${{ inputs.limit || '800' }} \ + --threshold ${{ inputs.threshold || '4' }} \ --output ci_failure_analysis_$(date +%Y%m%d_%H%M%S).json - name: Upload Analysis Results diff --git a/scripts/ci_monitor/README.md b/scripts/ci_monitor/README.md index 42b3c9be9f7d..4c0f953ddd04 100644 --- a/scripts/ci_monitor/README.md +++ b/scripts/ci_monitor/README.md @@ -40,8 +40,11 @@ A comprehensive toolkit to analyze CI failures and performance trends for the SG ### Failures Analyzer (`ci_failures_analysis.py`) - **Consecutive Failure Tracking**: Identify jobs currently failing - **Runner Health Monitoring**: Track runner failure rates and identify problematic infrastructure +- **Multi-Workflow Support**: Monitors PR Test (Nvidia), PR Test (AMD), and PR Test (Xeon) workflows +- **Queue Time Tracking**: Monitor average and P90 queue times per runner type - **Alert System**: Automatic alerts for consecutive failures and runner problems - **Instance Tracking**: Monitor specific runner instances for targeted remediation +- **Slack Notifications**: Send condensed alerts to Slack (top 3 jobs/runners by consecutive failures and failure rates) - **GitHub Integration**: Generate comprehensive summaries with actionable recommendations - **JSON Export**: Export detailed analysis data for further processing @@ -160,6 +163,33 @@ python ci_failures_analysis.py --token $GITHUB_TOKEN --limit 300 --threshold 2 python ci_failures_analysis.py --token $GITHUB_TOKEN --limit 500 --threshold 3 ``` +#### Monitored Workflows + +The Failures Analyzer monitors the following workflows: + +- **PR Test** - Nvidia GPU tests (self-hosted runners: 1-gpu-runner, 4-gpu-h100-runner, etc.) +- **PR Test (AMD)** - AMD GPU tests (AMD-specific runners) +- **PR Test (Xeon)** - Intel Xeon CPU tests (Xeon-specific runners) + +All three workflows are analyzed together, with runner statistics tracked separately by runner type. + +#### Slack Notifications + +The Failures Analyzer can send condensed alerts to Slack. See [SLACK_SETUP.md](SLACK_SETUP.md) for complete setup instructions. 
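+Under the hood, the notifier only needs to POST a JSON payload to the configured incoming-webhook URL. A minimal sketch of that call (illustrative only; the actual `slack_notifier.py` is not part of this patch and may differ):
+
+```python
+import json
+import os
+import urllib.request
+
+
+def post_to_slack(text: str) -> None:
+    """POST a plain-text message to the webhook URL in SLACK_WEBHOOK_URL."""
+    payload = json.dumps({"text": text}).encode("utf-8")
+    request = urllib.request.Request(
+        os.environ["SLACK_WEBHOOK_URL"],
+        data=payload,
+        headers={"Content-Type": "application/json"},
+    )
+    with urllib.request.urlopen(request) as response:
+        response.read()  # Slack answers with "ok" on success
+
+
+post_to_slack(":rotating_light: 3 jobs have 4+ consecutive CI failures")
+```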
+ +**What gets sent:** +- Top 3 jobs with consecutive failures +- Top 3 runners with consecutive failures +- Top 3 jobs with highest total failure rate +- Top 3 runners with highest total failure rate +- Queue time summary + +```bash +# Send Slack notification from analysis JSON +export SLACK_WEBHOOK_URL="https://hooks.slack.com/services/YOUR/WEBHOOK/URL" +python slack_notifier.py --json ci_failure_analysis.json +``` + #### Understanding the Output The script generates a **2-section report**: @@ -170,6 +200,7 @@ The script generates a **2-section report**: **Section 2: Runner Health Analysis** - Shows which runners have high failure rates +- Includes queue time metrics (average and P90) - Helps identify infrastructure vs code issues #### Alert Types diff --git a/scripts/ci_monitor/ci_failures_analysis.py b/scripts/ci_monitor/ci_failures_analysis.py index 407bb869ef42..1ffb54d6bc39 100644 --- a/scripts/ci_monitor/ci_failures_analysis.py +++ b/scripts/ci_monitor/ci_failures_analysis.py @@ -44,12 +44,17 @@ def __init__(self, token: str, alert_threshold: int = 3): self.session.headers.update(self.headers) # Target workflows to monitor - self.target_workflows = ["PR Test"] + self.target_workflows = [ + "PR Test", # Nvidia GPU tests + "PR Test (AMD)", # AMD GPU tests + "PR Test (Xeon)", # Intel Xeon CPU tests + ] # Jobs to EXCLUDE from analysis (administrative/setup jobs, not actual tests) self.excluded_jobs = [ "check-changes", - "pr-test-finish", + "pr-test-finish", # Nvidia workflow teardown + "pr-test-amd-finish", # AMD workflow teardown ] def get_recent_runs(self, limit: int = 500) -> List[Dict]: @@ -111,21 +116,23 @@ def get_jobs_for_run(self, run_id: int) -> List[Dict]: def analyze_runner_health( self, runs: List[Dict] - ) -> Tuple[Dict[str, Dict], Dict[str, Dict]]: + ) -> Tuple[Dict[str, Dict], Dict[str, Dict], Dict[str, Dict], Dict[str, Dict]]: """ - Analyze runner health by tracking failures per runner. + Analyze runner health by tracking failures per runner and consecutive failure streaks. Returns: - Tuple of (runner_stats, runner_job_failures) + Tuple of (runner_stats, runner_instance_data, runner_streak_data, runner_instance_streak_data) - runner_stats: Overall stats per runner (failure rate, total jobs, etc.) 
- - runner_job_failures: Per-runner breakdown of which jobs failed + - runner_instance_data: Per-instance breakdown of failures + - runner_streak_data: Consecutive failure streaks per runner label + - runner_instance_streak_data: Consecutive failure streaks per runner instance """ - print("\nAnalyzing runner health...") + print("\nAnalyzing runner health and consecutive failures...") # Sort runs by created_at (oldest first) sorted_runs = sorted(runs, key=lambda x: x.get("created_at", "")) - # Track runner statistics + # Track runner statistics (overall) runner_total_jobs: Dict[str, int] = defaultdict(int) runner_failed_jobs: Dict[str, int] = defaultdict(int) runner_job_failures: Dict[str, Dict[str, int]] = defaultdict( @@ -135,11 +142,26 @@ def analyze_runner_health( lambda: defaultdict(int) ) + # Track queue times per runner + runner_queue_times: Dict[str, List[float]] = defaultdict(list) + # Track individual runner instances (runner_name + runner_id) runner_instance_stats: Dict[str, Dict] = defaultdict( lambda: {"total_jobs": 0, "failed_jobs": 0, "jobs_failed": defaultdict(int)} ) + # Track consecutive failures per runner (by labels) + runner_current_streak: Dict[str, int] = defaultdict(int) + runner_max_streak: Dict[str, int] = defaultdict(int) + runner_first_failure_in_streak: Dict[str, Optional[Dict]] = {} + runner_recovery_info: Dict[str, Optional[Dict]] = {} + + # Track consecutive failures per runner instance + runner_instance_current_streak: Dict[str, int] = defaultdict(int) + runner_instance_max_streak: Dict[str, int] = defaultdict(int) + runner_instance_first_failure: Dict[str, Optional[Dict]] = {} + runner_instance_recovery: Dict[str, Optional[Dict]] = {} + total_runs_processed = len(sorted_runs) for i, run in enumerate(sorted_runs, 1): if i % 50 == 0 or i == total_runs_processed: @@ -147,9 +169,30 @@ def analyze_runner_health( f"Processing run {i}/{total_runs_processed} for runner analysis: #{run.get('run_number')}" ) + run_info = { + "run_number": run.get("run_number"), + "run_id": run.get("id"), + "created_at": run.get("created_at"), + "head_sha": run.get("head_sha", "")[:8], + "author": run.get("head_commit", {}) + .get("author", {}) + .get("name", "Unknown"), + "url": f"https://github.com/{self.repo}/actions/runs/{run.get('id')}", + } + + pull_requests = run.get("pull_requests", []) + if pull_requests: + run_info["pr_number"] = pull_requests[0].get("number") + # Get jobs for this run jobs = self.get_jobs_for_run(run.get("id")) + # Track whether each runner had at least one failure in this run + runner_had_failure: Dict[str, bool] = defaultdict(bool) + runner_had_success: Dict[str, bool] = defaultdict(bool) + runner_instance_had_failure: Dict[str, bool] = defaultdict(bool) + runner_instance_had_success: Dict[str, bool] = defaultdict(bool) + for job in jobs: job_name = job.get("name", "") @@ -184,6 +227,20 @@ def analyze_runner_health( runner_total_jobs[runner_key] += 1 runner_job_totals[runner_key][job_name] += 1 + # Calculate queue time (time from created to started) + created_at = job.get("created_at") + started_at = job.get("started_at") + if created_at and started_at: + try: + from datetime import datetime + created_time = datetime.fromisoformat(created_at.replace('Z', '+00:00')) + started_time = datetime.fromisoformat(started_at.replace('Z', '+00:00')) + queue_time_seconds = (started_time - created_time).total_seconds() + if queue_time_seconds >= 0: # Sanity check + runner_queue_times[runner_key].append(queue_time_seconds) + except (ValueError, AttributeError): + pass 
# Skip if timestamp parsing fails + # Track by specific runner instance if runner_id: runner_instance_key = f"{runner_labels_str}_{runner_id}" @@ -199,12 +256,83 @@ def analyze_runner_health( # Failure detected runner_failed_jobs[runner_key] += 1 runner_job_failures[runner_key][job_name] += 1 + runner_had_failure[runner_key] = True if runner_id: runner_instance_stats[runner_instance_key]["failed_jobs"] += 1 runner_instance_stats[runner_instance_key]["jobs_failed"][ job_name ] += 1 + runner_instance_had_failure[runner_instance_key] = True + + elif conclusion == "success": + runner_had_success[runner_key] = True + if runner_id: + runner_instance_had_success[runner_instance_key] = True + + # Update consecutive failure streaks based on run-level results + # A runner is considered "failing" if it had at least one failure in the run + for runner_key in set(list(runner_had_failure.keys()) + list(runner_had_success.keys())): + if runner_had_failure[runner_key]: + runner_current_streak[runner_key] += 1 + + # Track if this is the first failure in a new streak + if runner_current_streak[runner_key] == 1: + runner_first_failure_in_streak[runner_key] = { + **run_info, + "runner_key": runner_key, + } + + # Update max streak + if runner_current_streak[runner_key] > runner_max_streak[runner_key]: + runner_max_streak[runner_key] = runner_current_streak[runner_key] + + elif runner_had_success[runner_key]: + # Success - streak broken + if runner_current_streak[runner_key] > 0: + runner_recovery_info[runner_key] = { + **run_info, + "runner_key": runner_key, + "streak_length": runner_current_streak[runner_key], + } + + runner_current_streak[runner_key] = 0 + runner_first_failure_in_streak[runner_key] = None + + # Update instance streaks + for runner_instance_key in set( + list(runner_instance_had_failure.keys()) + + list(runner_instance_had_success.keys()) + ): + if runner_instance_had_failure[runner_instance_key]: + runner_instance_current_streak[runner_instance_key] += 1 + + if runner_instance_current_streak[runner_instance_key] == 1: + runner_instance_first_failure[runner_instance_key] = { + **run_info, + "runner_instance": runner_instance_key, + } + + if ( + runner_instance_current_streak[runner_instance_key] + > runner_instance_max_streak[runner_instance_key] + ): + runner_instance_max_streak[runner_instance_key] = ( + runner_instance_current_streak[runner_instance_key] + ) + + elif runner_instance_had_success[runner_instance_key]: + if runner_instance_current_streak[runner_instance_key] > 0: + runner_instance_recovery[runner_instance_key] = { + **run_info, + "runner_instance": runner_instance_key, + "streak_length": runner_instance_current_streak[ + runner_instance_key + ], + } + + runner_instance_current_streak[runner_instance_key] = 0 + runner_instance_first_failure[runner_instance_key] = None time.sleep(0.05) @@ -215,6 +343,15 @@ def analyze_runner_health( failed = runner_failed_jobs[runner_key] failure_rate = (failed / total * 100) if total > 0 else 0 + # Calculate queue time statistics + queue_times = runner_queue_times[runner_key] + avg_queue_time = sum(queue_times) / len(queue_times) if queue_times else 0 + p90_queue_time = 0 + if queue_times: + sorted_queue_times = sorted(queue_times) + p90_index = int(len(sorted_queue_times) * 0.9) + p90_queue_time = sorted_queue_times[p90_index] if p90_index < len(sorted_queue_times) else sorted_queue_times[-1] + runner_stats[runner_key] = { "total_jobs": total, "failed_jobs": failed, @@ -222,6 +359,9 @@ def analyze_runner_health( "unique_jobs_with_failures": 
len(runner_job_failures[runner_key]), "jobs_failed": dict(runner_job_failures[runner_key]), "jobs_total": dict(runner_job_totals[runner_key]), + "avg_queue_time_seconds": avg_queue_time, + "p90_queue_time_seconds": p90_queue_time, + "queue_time_samples": len(queue_times), } # Convert runner instance stats to regular dicts @@ -239,7 +379,46 @@ def analyze_runner_health( "runner_name": stats.get("runner_name", "unknown"), } - return runner_stats, runner_instance_data + # Build runner streak data + runner_streak_data = {} + for runner_key in runner_total_jobs.keys(): + runner_streak_data[runner_key] = { + "current_streak": runner_current_streak[runner_key], + "max_streak": runner_max_streak[runner_key], + "total_failures": runner_failed_jobs[runner_key], + "total_jobs": runner_total_jobs[runner_key], + "failure_rate": ( + runner_failed_jobs[runner_key] / runner_total_jobs[runner_key] * 100 + if runner_total_jobs[runner_key] > 0 + else 0 + ), + "jobs_failed": dict(runner_job_failures[runner_key]), + "first_failure_in_streak": runner_first_failure_in_streak.get(runner_key), + "recovery_info": runner_recovery_info.get(runner_key), + } + + # Build runner instance streak data + runner_instance_streak_data = {} + for instance_key in runner_instance_stats.keys(): + runner_instance_streak_data[instance_key] = { + "current_streak": runner_instance_current_streak[instance_key], + "max_streak": runner_instance_max_streak[instance_key], + "total_failures": runner_instance_stats[instance_key]["failed_jobs"], + "total_jobs": runner_instance_stats[instance_key]["total_jobs"], + "failure_rate": ( + runner_instance_stats[instance_key]["failed_jobs"] + / runner_instance_stats[instance_key]["total_jobs"] + * 100 + if runner_instance_stats[instance_key]["total_jobs"] > 0 + else 0 + ), + "runner_name": runner_instance_stats[instance_key].get("runner_name", "unknown"), + "jobs_failed": dict(runner_instance_stats[instance_key]["jobs_failed"]), + "first_failure_in_streak": runner_instance_first_failure.get(instance_key), + "recovery_info": runner_instance_recovery.get(instance_key), + } + + return runner_stats, runner_instance_data, runner_streak_data, runner_instance_streak_data def analyze_consecutive_failures( self, runs: List[Dict] @@ -256,7 +435,6 @@ def analyze_consecutive_failures( sorted_runs = sorted(runs, key=lambda x: x.get("created_at", "")) # Track current streak for each job - job_streaks: Dict[str, List[Dict]] = defaultdict(list) job_current_streak: Dict[str, int] = defaultdict(int) job_max_streak: Dict[str, int] = defaultdict(int) job_total_failures: Dict[str, int] = defaultdict(int) @@ -434,6 +612,8 @@ def detect_alerts( job_current_streaks: Dict[str, int], runner_stats: Optional[Dict[str, Dict]] = None, runner_instance_data: Optional[Dict[str, Dict]] = None, + runner_streak_data: Optional[Dict[str, Dict]] = None, + runner_instance_streak_data: Optional[Dict[str, Dict]] = None, ) -> Tuple[List[Dict], List[Dict]]: """ Detect jobs and runners that need alerts based on thresholds. 
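The hunk below extends `detect_alerts` with runner-level and instance-level consecutive-failure alerts. For orientation, here is a condensed, self-contained sketch of the rule the new code applies: a runner's streak grows while every analyzed run contains at least one failure on that runner, resets on a clean run, and an alert fires once the streak reaches the configured threshold. This is simplified from the patch, not a drop-in replacement.

```python
from typing import Dict, List

ALERT_THRESHOLD = 4  # the workflow's new default threshold in this patch


def update_streak(current: int, had_failure: bool, had_success: bool) -> int:
    """Advance a runner's streak for one workflow run (a failure takes precedence)."""
    if had_failure:
        return current + 1
    if had_success:
        return 0
    return current  # runner ran no jobs in this run: streak unchanged


def streak_alerts(current_streaks: Dict[str, int]) -> List[Dict]:
    """Emit one alert per runner label whose current streak meets the threshold."""
    return [
        {
            "runner_labels": labels,
            "current_streak": streak,
            "alert_type": "runner_consecutive_failures",
            "severity": "high" if streak >= 5 else "medium",
        }
        for labels, streak in current_streaks.items()
        if streak >= ALERT_THRESHOLD
    ]
```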
@@ -462,6 +642,50 @@ def detect_alerts( # Detect runner alerts runner_alerts = [] + + # Alert for runners with consecutive failures + if runner_streak_data: + for runner_labels, streak_data in runner_streak_data.items(): + if streak_data["current_streak"] >= self.alert_threshold: + runner_alerts.append( + { + "runner_labels": runner_labels, + "current_streak": streak_data["current_streak"], + "max_streak": streak_data["max_streak"], + "failure_rate": streak_data["failure_rate"], + "total_failures": streak_data["total_failures"], + "total_jobs": streak_data["total_jobs"], + "jobs_failed": streak_data.get("jobs_failed", {}), + "first_failure": streak_data["first_failure_in_streak"], + "alert_type": "runner_consecutive_failures", + "severity": ( + "high" if streak_data["current_streak"] >= 5 else "medium" + ), + } + ) + + # Alert for runner instances with consecutive failures + if runner_instance_streak_data: + for instance_key, streak_data in runner_instance_streak_data.items(): + if streak_data["current_streak"] >= self.alert_threshold: + runner_alerts.append( + { + "runner_instance": instance_key, + "runner_name": streak_data.get("runner_name", "unknown"), + "current_streak": streak_data["current_streak"], + "max_streak": streak_data["max_streak"], + "failure_rate": streak_data["failure_rate"], + "total_failures": streak_data["total_failures"], + "total_jobs": streak_data["total_jobs"], + "jobs_failed": streak_data.get("jobs_failed", {}), + "first_failure": streak_data["first_failure_in_streak"], + "alert_type": "runner_instance_consecutive_failures", + "severity": ( + "high" if streak_data["current_streak"] >= 5 else "medium" + ), + } + ) + if runner_stats: # Alert if runner has high failure rate (>30%) and multiple jobs failing for runner_labels, stats in runner_stats.items(): @@ -513,6 +737,8 @@ def generate_failure_report( runner_stats: Optional[Dict[str, Dict]] = None, runner_instance_data: Optional[Dict[str, Dict]] = None, runner_alerts: Optional[List[Dict]] = None, + runner_streak_data: Optional[Dict[str, Dict]] = None, + runner_instance_streak_data: Optional[Dict[str, Dict]] = None, output_file: Optional[str] = None, ): """Generate detailed failure analysis report.""" @@ -543,6 +769,21 @@ def generate_failure_report( f"Runner alerts triggered: {len(runner_alerts) if runner_alerts else 0}" ) + # Calculate overall queue time statistics + all_avg_queue_times = [] + all_p90_queue_times = [] + for stats in runner_stats.values(): + if stats["queue_time_samples"] > 0: + all_avg_queue_times.append(stats["avg_queue_time_seconds"]) + all_p90_queue_times.append(stats["p90_queue_time_seconds"]) + + if all_avg_queue_times: + overall_avg = sum(all_avg_queue_times) / len(all_avg_queue_times) + overall_p90 = sum(all_p90_queue_times) / len(all_p90_queue_times) + print(f"\n--- Queue Time Summary ---") + print(f"Average queue time across all runners: {overall_avg / 60:.1f} minutes ({overall_avg:.0f}s)") + print(f"P90 queue time across all runners: {overall_p90 / 60:.1f} minutes ({overall_p90:.0f}s)") + # Section 1: Currently Broken Jobs (Consecutive Failures) - URGENT print("\n" + "=" * 100) print("SECTION 1: Currently Broken Jobs (Active Consecutive Failures)") @@ -587,68 +828,167 @@ def generate_failure_report( ) print(f" Link: {first['url']}") - # Section 3: Runner Health Analysis - if runner_stats: + # Section 2: Runner Health Analysis + if runner_stats and runner_streak_data: print("\n" + "=" * 100) print("SECTION 2: Runner Health Analysis") print("=" * 100) - # Sort runners by failure rate + # 
Combine stats with streak data and sort by consecutive failures first + combined_data = [] + for runner_labels, stats in runner_stats.items(): + streak_data = runner_streak_data.get(runner_labels, {}) + combined_data.append({ + "runner_labels": runner_labels, + "current_streak": streak_data.get("current_streak", 0), + "max_streak": streak_data.get("max_streak", 0), + "failure_rate": stats["failure_rate"], + "total_jobs": stats["total_jobs"], + "unique_jobs": stats["unique_jobs_with_failures"], + "avg_queue": stats["avg_queue_time_seconds"], + "p90_queue": stats["p90_queue_time_seconds"], + "queue_samples": stats["queue_time_samples"], + }) + + # Sort by current streak (descending), then max streak, then failure rate sorted_runners = sorted( - runner_stats.items(), - key=lambda x: (x[1]["failure_rate"], x[1]["failed_jobs"]), + combined_data, + key=lambda x: (x["current_streak"], x["max_streak"], x["failure_rate"]), reverse=True, ) - print(f"\nTop 15 Runners by Failure Rate:") - print("-" * 100) + print(f"\nTop 15 Runners by Consecutive Failures:") + print(" (High failure + Low unique jobs = Same job failing repeatedly → Likely job/test issue)") + print(" (High failure + High unique jobs = Many different jobs failing → Likely runner/infrastructure issue)") + print("-" * 160) print( - f"{'Rank':<4} {'Runner Labels':<45} {'Fail Rate':<12} {'Failed':<10} {'Total':<10} {'Unique Jobs':<12}" + f"{'Rank':<4} {'Runner Labels':<35} {'Streak':<8} {'Max':<6} {'Fail Rate':<10} {'Total':<7} {'Unique Jobs':<13} {'Avg Queue':<11} {'P90 Queue':<11}" ) - print("-" * 100) + print("-" * 160) - for i, (runner_labels, stats) in enumerate(sorted_runners[:15], 1): + for i, runner_data in enumerate(sorted_runners[:15], 1): # Truncate labels if too long for display display_labels = ( - runner_labels - if len(runner_labels) <= 43 - else runner_labels[:40] + "..." + runner_data["runner_labels"] + if len(runner_data["runner_labels"]) <= 33 + else runner_data["runner_labels"][:30] + "..." ) + + # Format streak + current_streak = runner_data["current_streak"] + streak_str = f"{current_streak}" if current_streak > 0 else "-" + + # Format max streak + max_streak = runner_data["max_streak"] + max_str = f"{max_streak}" if max_streak > 0 else "-" + + # Format queue times + avg_queue_str = f"{runner_data['avg_queue'] / 60:.1f}m" if runner_data['queue_samples'] > 0 else "N/A" + p90_queue_str = f"{runner_data['p90_queue'] / 60:.1f}m" if runner_data['queue_samples'] > 0 else "N/A" + print( - f"{i:<4} {display_labels:<45} {stats['failure_rate']:>10.1f}% " - f"{stats['failed_jobs']:<10} {stats['total_jobs']:<10} {stats['unique_jobs_with_failures']:<12}" + f"{i:<4} {display_labels:<35} {streak_str:<8} {max_str:<6} {runner_data['failure_rate']:>8.1f}% " + f"{runner_data['total_jobs']:<7} {runner_data['unique_jobs']:<13} " + f"{avg_queue_str:<11} {p90_queue_str:<11}" ) # Print runner alerts if runner_alerts: print("\n" + "!" * 40) - print("ALERTS: Runners with High Failure Rates") + print("ALERTS: Runners with Issues") print("!" 
* 40) + # Only show consecutive failure alerts + consecutive_alerts = [a for a in runner_alerts + if a["alert_type"] in ["runner_consecutive_failures", "runner_instance_consecutive_failures"]] + + if consecutive_alerts: + print("\n--- CONSECUTIVE FAILURE ALERTS ---") + print("(Runners that have failed in multiple consecutive workflow runs)") + print() + for alert in sorted( - runner_alerts, key=lambda x: x.get("failure_rate", 0), reverse=True + consecutive_alerts, + key=lambda x: (x.get("current_streak", 0), x.get("failure_rate", 0)), + reverse=True, ): - if alert["alert_type"] == "runner_health": + if alert["alert_type"] == "runner_consecutive_failures": print(f"\n Runner Labels: {alert['runner_labels']}") - print(f" Failure Rate: {alert['failure_rate']:.1f}%") print( - f" Failed Jobs: {alert['failed_jobs']} / {alert['total_jobs']}" + f" Current Streak: {alert['current_streak']} consecutive runs with failures" ) + print(f" Max Streak: {alert['max_streak']}") + print(f" Failure Rate: {alert['failure_rate']:.1f}%") print( - f" Unique Jobs with Failures: {alert['unique_jobs_with_failures']}" + f" Total Failures: {alert['total_failures']} / {alert['total_jobs']}" ) + + # Show jobs that failed on this runner type + jobs_failed = alert.get('jobs_failed', {}) + if jobs_failed: + print(f" Jobs That Failed:") + for job_name, count in sorted(jobs_failed.items(), key=lambda x: x[1], reverse=True): + print(f" - {job_name}: {count} failure(s)") + print(f" Severity: {alert['severity'].upper()}") - elif alert["alert_type"] == "runner_instance_health": - print(f"\n Runner Instance: {alert['runner_instance']}") - print(f" Runner Name: {alert['runner_name']}") + if alert.get("first_failure"): + first = alert["first_failure"] + print( + f" First Failure in Streak: Run #{first['run_number']} ({first['created_at']})" + ) + print(f" Link: {first['url']}") + elif alert["alert_type"] == "runner_instance_consecutive_failures": + # Extract runner labels from instance key (format: "labels_id") + instance_key = alert['runner_instance'] + runner_labels = instance_key.rsplit('_', 1)[0] if '_' in instance_key else instance_key + runner_id = instance_key.rsplit('_', 1)[1] if '_' in instance_key else 'unknown' + + print(f"\n Runner Type: {runner_labels}") + print(f" Specific Instance ID: {runner_id}") + print(f" Machine Name: {alert['runner_name']}") + print( + f" Current Streak: {alert['current_streak']} consecutive runs with failures" + ) + print(f" Max Streak: {alert['max_streak']}") print(f" Failure Rate: {alert['failure_rate']:.1f}%") print( - f" Failed Jobs: {alert['failed_jobs']} / {alert['total_jobs']}" + f" Total Failures: {alert['total_failures']} / {alert['total_jobs']}" ) - print(f" Jobs Failed: {list(alert['jobs_failed'].keys())}") + + # Show jobs that failed on this runner instance + jobs_failed = alert.get('jobs_failed', {}) + if jobs_failed: + print(f" Jobs That Failed:") + for job_name, count in sorted(jobs_failed.items(), key=lambda x: x[1], reverse=True): + print(f" - {job_name}: {count} failure(s)") + print(f" Severity: {alert['severity'].upper()}") + if alert.get("first_failure"): + first = alert["first_failure"] + print( + f" First Failure in Streak: Run #{first['run_number']} ({first['created_at']})" + ) + print(f" Link: {first['url']}") # Build report data (always needed for GitHub summary) + # Calculate overall queue time for summary + overall_avg_queue = 0 + overall_p90_queue = 0 + if runner_stats: + all_avg_queue_times = [ + stats["avg_queue_time_seconds"] + for stats in runner_stats.values() + 
if stats["queue_time_samples"] > 0 + ] + all_p90_queue_times = [ + stats["p90_queue_time_seconds"] + for stats in runner_stats.values() + if stats["queue_time_samples"] > 0 + ] + if all_avg_queue_times: + overall_avg_queue = sum(all_avg_queue_times) / len(all_avg_queue_times) + overall_p90_queue = sum(all_p90_queue_times) / len(all_p90_queue_times) + report_data = { "summary": { "total_jobs": len(sorted_jobs), @@ -660,6 +1000,8 @@ def generate_failure_report( "total_runners": len(runner_stats) if runner_stats else 0, "alert_threshold": self.alert_threshold, "analysis_timestamp": datetime.now().isoformat(), + "avg_queue_time_seconds": overall_avg_queue, + "p90_queue_time_seconds": overall_p90_queue, }, "job_streak_data": { job_name: { @@ -675,6 +1017,10 @@ def generate_failure_report( "runner_instance_data": ( runner_instance_data if runner_instance_data else {} ), + "runner_streak_data": runner_streak_data if runner_streak_data else {}, + "runner_instance_streak_data": ( + runner_instance_streak_data if runner_instance_streak_data else {} + ), "runner_alerts": runner_alerts if runner_alerts else [], } @@ -731,6 +1077,22 @@ def generate_github_summary(self, report_data: Dict): ) summary_lines.append("") + # Queue Time Summary + if report_data.get("summary", {}).get("avg_queue_time_seconds") is not None: + summary_lines.append("## Queue Time Summary") + summary_lines.append("") + summary_lines.append("| Metric | Value |") + summary_lines.append("|--------|-------|") + avg_queue = report_data['summary']['avg_queue_time_seconds'] + p90_queue = report_data['summary']['p90_queue_time_seconds'] + summary_lines.append( + f"| Average Queue Time (across all runners) | {avg_queue / 60:.1f} minutes ({avg_queue:.0f}s) |" + ) + summary_lines.append( + f"| P90 Queue Time (across all runners) | {p90_queue / 60:.1f} minutes ({p90_queue:.0f}s) |" + ) + summary_lines.append("") + # Job Alerts section if report_data.get("job_alerts"): summary_lines.append("## ALERTS: Critical Consecutive Job Failures") @@ -768,42 +1130,75 @@ def generate_github_summary(self, report_data: Dict): # Runner Alerts section if report_data.get("runner_alerts"): - summary_lines.append("## ALERTS: Runners with High Failure Rates") + summary_lines.append("## ALERTS: Runners with Issues") summary_lines.append("") - summary_lines.append( - "| Runner Labels | Failure Rate | Failed Jobs | Total Jobs | Unique Jobs Failed | Severity |" - ) - summary_lines.append( - "|---------------|--------------|-------------|------------|-------------------|----------|" - ) - for alert in sorted( - report_data["runner_alerts"], - key=lambda x: x.get("failure_rate", 0), - reverse=True, - ): - if alert["alert_type"] == "runner_health": - runner_labels = alert["runner_labels"] - if len(runner_labels) > 35: - runner_labels = runner_labels[:32] + "..." - - summary_lines.append( - f"| `{runner_labels}` | {alert['failure_rate']:.1f}% | {alert['failed_jobs']} | " - f"{alert['total_jobs']} | {alert['unique_jobs_with_failures']} | {alert['severity'].upper()} |" - ) - elif alert["alert_type"] == "runner_instance_health": - instance = alert["runner_instance"] - runner_name = alert["runner_name"] - if len(instance) > 35: - instance = instance[:32] + "..." 
- - summary_lines.append( - f"| `{instance}` | {alert['failure_rate']:.1f}% | {alert['failed_jobs']} | " - f"{alert['total_jobs']} | {len(alert['jobs_failed'])} | {alert['severity'].upper()} |" - ) - summary_lines.append(f"| (Runner: {runner_name}) | | | | | |") + # Only show consecutive failure alerts + consecutive_alerts = [a for a in report_data["runner_alerts"] + if a["alert_type"] in ["runner_consecutive_failures", "runner_instance_consecutive_failures"]] - summary_lines.append("") + if consecutive_alerts: + summary_lines.append("### Runner Consecutive Failures") + summary_lines.append("") + summary_lines.append( + "| Runner | Current Streak | Max Streak | Failure Rate | Jobs Failed | First Failure | Link |" + ) + summary_lines.append( + "|--------|----------------|------------|--------------|-------------|---------------|------|" + ) + + for alert in sorted( + consecutive_alerts, + key=lambda x: x.get("current_streak", 0), + reverse=True, + ): + if alert["alert_type"] == "runner_consecutive_failures": + runner_labels = alert["runner_labels"] + if len(runner_labels) > 35: + runner_labels = runner_labels[:32] + "..." + + # Get top 3 failed jobs + jobs_failed = alert.get("jobs_failed", {}) + top_jobs = sorted(jobs_failed.items(), key=lambda x: x[1], reverse=True)[:3] + jobs_str = ", ".join([f"{job} ({count})" for job, count in top_jobs]) if top_jobs else "N/A" + + first_failure = alert.get("first_failure") + first_failure_str = ( + f"Run #{first_failure['run_number']}" + if first_failure + else "N/A" + ) + first_failure_link = first_failure["url"] if first_failure else "" + + summary_lines.append( + f"| `{runner_labels}` | {alert['current_streak']} | {alert['max_streak']} | " + f"{alert['failure_rate']:.1f}% | {jobs_str} | {first_failure_str} | [View]({first_failure_link}) |" + ) + elif alert["alert_type"] == "runner_instance_consecutive_failures": + instance = alert["runner_instance"] + if len(instance) > 35: + instance = instance[:32] + "..." 
+ + # Get top 3 failed jobs + jobs_failed = alert.get("jobs_failed", {}) + top_jobs = sorted(jobs_failed.items(), key=lambda x: x[1], reverse=True)[:3] + jobs_str = ", ".join([f"{job} ({count})" for job, count in top_jobs]) if top_jobs else "N/A" + + first_failure = alert.get("first_failure") + first_failure_str = ( + f"Run #{first_failure['run_number']}" + if first_failure + else "N/A" + ) + first_failure_link = first_failure["url"] if first_failure else "" + + summary_lines.append( + f"| `{instance}` (instance) | {alert['current_streak']} | {alert['max_streak']} | " + f"{alert['failure_rate']:.1f}% | {jobs_str} | {first_failure_str} | [View]({first_failure_link}) |" + ) + + summary_lines.append("") + summary_lines.append("") # Section 1: Currently Broken Jobs summary_lines.append( @@ -841,35 +1236,61 @@ def generate_github_summary(self, report_data: Dict): summary_lines.append("") # Section 2: Runner Health Analysis - if report_data.get("runner_stats"): + if report_data.get("runner_stats") and report_data.get("runner_streak_data"): summary_lines.append("## Section 2: Runner Health Analysis") summary_lines.append("") - # Sort runners by failure rate + # Combine stats with streak data and sort by consecutive failures first + combined_data = [] + for runner_labels, stats in report_data["runner_stats"].items(): + streak_data = report_data["runner_streak_data"].get(runner_labels, {}) + combined_data.append({ + "runner_labels": runner_labels, + "current_streak": streak_data.get("current_streak", 0), + "max_streak": streak_data.get("max_streak", 0), + "failure_rate": stats["failure_rate"], + "total_jobs": stats["total_jobs"], + "unique_jobs": stats["unique_jobs_with_failures"], + "avg_queue": stats["avg_queue_time_seconds"], + "p90_queue": stats["p90_queue_time_seconds"], + "queue_samples": stats.get("queue_time_samples", 0), + }) + + # Sort by current streak (descending), then max streak, then failure rate sorted_runners = sorted( - report_data["runner_stats"].items(), - key=lambda x: (x[1]["failure_rate"], x[1]["failed_jobs"]), + combined_data, + key=lambda x: (x["current_streak"], x["max_streak"], x["failure_rate"]), reverse=True, ) - summary_lines.append("### Top 15 Runners by Failure Rate") + summary_lines.append("### Top 15 Runners by Consecutive Failures") summary_lines.append("") summary_lines.append( - "| Rank | Runner Labels | Failure Rate | Failed Jobs | Total Jobs | Unique Jobs Failed |" + "| Rank | Runner Labels | Streak | Max | Fail Rate | Total | Unique Jobs | Avg Queue | P90 Queue |" ) summary_lines.append( - "|------|---------------|--------------|-------------|------------|--------------------|" + "|------|---------------|--------|-----|-----------|-------|-------------|-----------|-----------|" ) - for i, (runner_labels, stats) in enumerate(sorted_runners[:15], 1): + for i, runner_data in enumerate(sorted_runners[:15], 1): display_labels = ( - runner_labels - if len(runner_labels) <= 35 - else runner_labels[:32] + "..." + runner_data["runner_labels"] + if len(runner_data["runner_labels"]) <= 30 + else runner_data["runner_labels"][:27] + "..." 
) + + # Format streaks + streak_str = str(runner_data["current_streak"]) if runner_data["current_streak"] > 0 else "-" + max_str = str(runner_data["max_streak"]) if runner_data["max_streak"] > 0 else "-" + + # Format queue times + avg_queue_str = f"{runner_data['avg_queue'] / 60:.1f}m" if runner_data['queue_samples'] > 0 else "N/A" + p90_queue_str = f"{runner_data['p90_queue'] / 60:.1f}m" if runner_data['queue_samples'] > 0 else "N/A" + summary_lines.append( - f"| {i} | `{display_labels}` | {stats['failure_rate']:.1f}% | " - f"{stats['failed_jobs']} | {stats['total_jobs']} | {stats['unique_jobs_with_failures']} |" + f"| {i} | `{display_labels}` | {streak_str} | {max_str} | {runner_data['failure_rate']:.1f}% | " + f"{runner_data['total_jobs']} | {runner_data['unique_jobs']} | " + f"{avg_queue_str} | {p90_queue_str} |" ) summary_lines.append("") @@ -893,8 +1314,8 @@ def main(): parser.add_argument( "--limit", type=int, - default=500, - help="Number of workflow runs to analyze (default: 500)", + default=1000, + help="Number of workflow runs to analyze across all monitored workflows (default: 1000)", ) parser.add_argument( "--threshold", @@ -929,17 +1350,25 @@ def main(): print("No job data found") return - # Aggregate matrix jobs (e.g., "job (0)", "job (1)" -> "job") - print("\nAggregating matrix jobs...") - job_streak_data = analyzer.aggregate_matrix_jobs(job_streak_data) - print(f"After aggregation: {len(job_streak_data)} unique jobs") + # Skip aggregation to show individual job shards + print(f"\nTotal jobs (including shards): {len(job_streak_data)}") - # Analyze runner health - runner_stats, runner_instance_data = analyzer.analyze_runner_health(runs) + # Analyze runner health and consecutive failures + ( + runner_stats, + runner_instance_data, + runner_streak_data, + runner_instance_streak_data, + ) = analyzer.analyze_runner_health(runs) # Detect alerts job_alerts, runner_alerts = analyzer.detect_alerts( - job_streak_data, job_current_streaks, runner_stats, runner_instance_data + job_streak_data, + job_current_streaks, + runner_stats, + runner_instance_data, + runner_streak_data, + runner_instance_streak_data, ) # Generate report @@ -949,6 +1378,8 @@ def main(): runner_stats, runner_instance_data, runner_alerts, + runner_streak_data, + runner_instance_streak_data, args.output, ) From ddaffc302b0d39c2dea2a87cf2fa70aad8dfbf5c Mon Sep 17 00:00:00 2001 From: dougyster Date: Mon, 17 Nov 2025 18:56:06 -0500 Subject: [PATCH 2/2] linting and remove dead code --- scripts/ci_monitor/ci_failures_analysis.py | 352 ++++++++++++--------- 1 file changed, 202 insertions(+), 150 deletions(-) diff --git a/scripts/ci_monitor/ci_failures_analysis.py b/scripts/ci_monitor/ci_failures_analysis.py index 1ffb54d6bc39..c6844bcb72c9 100644 --- a/scripts/ci_monitor/ci_failures_analysis.py +++ b/scripts/ci_monitor/ci_failures_analysis.py @@ -45,16 +45,16 @@ def __init__(self, token: str, alert_threshold: int = 3): # Target workflows to monitor self.target_workflows = [ - "PR Test", # Nvidia GPU tests - "PR Test (AMD)", # AMD GPU tests - "PR Test (Xeon)", # Intel Xeon CPU tests + "PR Test", # Nvidia GPU tests + "PR Test (AMD)", # AMD GPU tests + "PR Test (Xeon)", # Intel Xeon CPU tests ] # Jobs to EXCLUDE from analysis (administrative/setup jobs, not actual tests) self.excluded_jobs = [ "check-changes", - "pr-test-finish", # Nvidia workflow teardown - "pr-test-amd-finish", # AMD workflow teardown + "pr-test-finish", # Nvidia workflow teardown + "pr-test-amd-finish", # AMD workflow teardown ] def get_recent_runs(self, 
limit: int = 500) -> List[Dict]: @@ -233,9 +233,16 @@ def analyze_runner_health( if created_at and started_at: try: from datetime import datetime - created_time = datetime.fromisoformat(created_at.replace('Z', '+00:00')) - started_time = datetime.fromisoformat(started_at.replace('Z', '+00:00')) - queue_time_seconds = (started_time - created_time).total_seconds() + + created_time = datetime.fromisoformat( + created_at.replace("Z", "+00:00") + ) + started_time = datetime.fromisoformat( + started_at.replace("Z", "+00:00") + ) + queue_time_seconds = ( + started_time - created_time + ).total_seconds() if queue_time_seconds >= 0: # Sanity check runner_queue_times[runner_key].append(queue_time_seconds) except (ValueError, AttributeError): @@ -272,7 +279,9 @@ def analyze_runner_health( # Update consecutive failure streaks based on run-level results # A runner is considered "failing" if it had at least one failure in the run - for runner_key in set(list(runner_had_failure.keys()) + list(runner_had_success.keys())): + for runner_key in set( + list(runner_had_failure.keys()) + list(runner_had_success.keys()) + ): if runner_had_failure[runner_key]: runner_current_streak[runner_key] += 1 @@ -284,8 +293,13 @@ def analyze_runner_health( } # Update max streak - if runner_current_streak[runner_key] > runner_max_streak[runner_key]: - runner_max_streak[runner_key] = runner_current_streak[runner_key] + if ( + runner_current_streak[runner_key] + > runner_max_streak[runner_key] + ): + runner_max_streak[runner_key] = runner_current_streak[ + runner_key + ] elif runner_had_success[runner_key]: # Success - streak broken @@ -350,7 +364,11 @@ def analyze_runner_health( if queue_times: sorted_queue_times = sorted(queue_times) p90_index = int(len(sorted_queue_times) * 0.9) - p90_queue_time = sorted_queue_times[p90_index] if p90_index < len(sorted_queue_times) else sorted_queue_times[-1] + p90_queue_time = ( + sorted_queue_times[p90_index] + if p90_index < len(sorted_queue_times) + else sorted_queue_times[-1] + ) runner_stats[runner_key] = { "total_jobs": total, @@ -393,7 +411,9 @@ def analyze_runner_health( else 0 ), "jobs_failed": dict(runner_job_failures[runner_key]), - "first_failure_in_streak": runner_first_failure_in_streak.get(runner_key), + "first_failure_in_streak": runner_first_failure_in_streak.get( + runner_key + ), "recovery_info": runner_recovery_info.get(runner_key), } @@ -412,13 +432,22 @@ def analyze_runner_health( if runner_instance_stats[instance_key]["total_jobs"] > 0 else 0 ), - "runner_name": runner_instance_stats[instance_key].get("runner_name", "unknown"), + "runner_name": runner_instance_stats[instance_key].get( + "runner_name", "unknown" + ), "jobs_failed": dict(runner_instance_stats[instance_key]["jobs_failed"]), - "first_failure_in_streak": runner_instance_first_failure.get(instance_key), + "first_failure_in_streak": runner_instance_first_failure.get( + instance_key + ), "recovery_info": runner_instance_recovery.get(instance_key), } - return runner_stats, runner_instance_data, runner_streak_data, runner_instance_streak_data + return ( + runner_stats, + runner_instance_data, + runner_streak_data, + runner_instance_streak_data, + ) def analyze_consecutive_failures( self, runs: List[Dict] @@ -530,82 +559,6 @@ def analyze_consecutive_failures( return job_streak_data, job_current_streak - def aggregate_matrix_jobs( - self, job_streak_data: Dict[str, Dict] - ) -> Dict[str, Dict]: - """ - Aggregate matrix jobs (e.g., 'job-name (0)', 'job-name (1)') into a single entry. 
- - Returns: - Dictionary with aggregated job data - """ - import re - - # Identify base job names (strip matrix suffix like " (0)", " (1)") - base_jobs: Dict[str, List[Tuple[str, Dict]]] = defaultdict(list) - - for job_name, data in job_streak_data.items(): - # Match pattern like "job-name (0)" or "job-name (1)" - match = re.match(r"^(.+?)\s*\((\d+)\)$", job_name) - if match: - base_name = match.group(1) - base_jobs[base_name].append((job_name, data)) - else: - # Not a matrix job, keep as-is - base_jobs[job_name].append((job_name, data)) - - # Aggregate stats for matrix jobs - aggregated_data = {} - - for base_name, job_list in base_jobs.items(): - if len(job_list) == 1: - # Single job, no aggregation needed - job_name, data = job_list[0] - aggregated_data[job_name] = data - else: - # Multiple matrix jobs - aggregate them - total_runs = sum(data["total_runs"] for _, data in job_list) - total_failures = sum(data["total_failures"] for _, data in job_list) - - # Current streak: take the max across all matrix jobs - # (if any partition is broken, the whole job is considered broken) - current_streak = max(data["current_streak"] for _, data in job_list) - max_streak = max(data["max_streak"] for _, data in job_list) - - # Get the first failure from the job with the longest current streak - first_failure_in_streak = None - for _, data in job_list: - if ( - data["current_streak"] == current_streak - and data["first_failure_in_streak"] - ): - first_failure_in_streak = data["first_failure_in_streak"] - break - - # Recovery info from most recent recovery - recovery_info = None - for _, data in job_list: - if data["recovery_info"]: - recovery_info = data["recovery_info"] - break - - aggregated_data[base_name] = { - "current_streak": current_streak, - "max_streak": max_streak, - "total_failures": total_failures, - "total_runs": total_runs, - "failure_rate": ( - (total_failures / total_runs * 100) if total_runs > 0 else 0 - ), - "first_failure_in_streak": first_failure_in_streak, - "recovery_info": recovery_info, - "is_aggregated": True, - "partition_count": len(job_list), - "partitions": [job_name for job_name, _ in job_list], - } - - return aggregated_data - def detect_alerts( self, job_streak_data: Dict[str, Dict], @@ -659,7 +612,9 @@ def detect_alerts( "first_failure": streak_data["first_failure_in_streak"], "alert_type": "runner_consecutive_failures", "severity": ( - "high" if streak_data["current_streak"] >= 5 else "medium" + "high" + if streak_data["current_streak"] >= 5 + else "medium" ), } ) @@ -681,7 +636,9 @@ def detect_alerts( "first_failure": streak_data["first_failure_in_streak"], "alert_type": "runner_instance_consecutive_failures", "severity": ( - "high" if streak_data["current_streak"] >= 5 else "medium" + "high" + if streak_data["current_streak"] >= 5 + else "medium" ), } ) @@ -781,8 +738,12 @@ def generate_failure_report( overall_avg = sum(all_avg_queue_times) / len(all_avg_queue_times) overall_p90 = sum(all_p90_queue_times) / len(all_p90_queue_times) print(f"\n--- Queue Time Summary ---") - print(f"Average queue time across all runners: {overall_avg / 60:.1f} minutes ({overall_avg:.0f}s)") - print(f"P90 queue time across all runners: {overall_p90 / 60:.1f} minutes ({overall_p90:.0f}s)") + print( + f"Average queue time across all runners: {overall_avg / 60:.1f} minutes ({overall_avg:.0f}s)" + ) + print( + f"P90 queue time across all runners: {overall_p90 / 60:.1f} minutes ({overall_p90:.0f}s)" + ) # Section 1: Currently Broken Jobs (Consecutive Failures) - URGENT print("\n" + "=" * 
100) @@ -838,17 +799,19 @@ def generate_failure_report( combined_data = [] for runner_labels, stats in runner_stats.items(): streak_data = runner_streak_data.get(runner_labels, {}) - combined_data.append({ - "runner_labels": runner_labels, - "current_streak": streak_data.get("current_streak", 0), - "max_streak": streak_data.get("max_streak", 0), - "failure_rate": stats["failure_rate"], - "total_jobs": stats["total_jobs"], - "unique_jobs": stats["unique_jobs_with_failures"], - "avg_queue": stats["avg_queue_time_seconds"], - "p90_queue": stats["p90_queue_time_seconds"], - "queue_samples": stats["queue_time_samples"], - }) + combined_data.append( + { + "runner_labels": runner_labels, + "current_streak": streak_data.get("current_streak", 0), + "max_streak": streak_data.get("max_streak", 0), + "failure_rate": stats["failure_rate"], + "total_jobs": stats["total_jobs"], + "unique_jobs": stats["unique_jobs_with_failures"], + "avg_queue": stats["avg_queue_time_seconds"], + "p90_queue": stats["p90_queue_time_seconds"], + "queue_samples": stats["queue_time_samples"], + } + ) # Sort by current streak (descending), then max streak, then failure rate sorted_runners = sorted( @@ -858,8 +821,12 @@ def generate_failure_report( ) print(f"\nTop 15 Runners by Consecutive Failures:") - print(" (High failure + Low unique jobs = Same job failing repeatedly → Likely job/test issue)") - print(" (High failure + High unique jobs = Many different jobs failing → Likely runner/infrastructure issue)") + print( + " (High failure + Low unique jobs = Same job failing repeatedly → Likely job/test issue)" + ) + print( + " (High failure + High unique jobs = Many different jobs failing → Likely runner/infrastructure issue)" + ) print("-" * 160) print( f"{'Rank':<4} {'Runner Labels':<35} {'Streak':<8} {'Max':<6} {'Fail Rate':<10} {'Total':<7} {'Unique Jobs':<13} {'Avg Queue':<11} {'P90 Queue':<11}" @@ -883,8 +850,16 @@ def generate_failure_report( max_str = f"{max_streak}" if max_streak > 0 else "-" # Format queue times - avg_queue_str = f"{runner_data['avg_queue'] / 60:.1f}m" if runner_data['queue_samples'] > 0 else "N/A" - p90_queue_str = f"{runner_data['p90_queue'] / 60:.1f}m" if runner_data['queue_samples'] > 0 else "N/A" + avg_queue_str = ( + f"{runner_data['avg_queue'] / 60:.1f}m" + if runner_data["queue_samples"] > 0 + else "N/A" + ) + p90_queue_str = ( + f"{runner_data['p90_queue'] / 60:.1f}m" + if runner_data["queue_samples"] > 0 + else "N/A" + ) print( f"{i:<4} {display_labels:<35} {streak_str:<8} {max_str:<6} {runner_data['failure_rate']:>8.1f}% " @@ -899,12 +874,21 @@ def generate_failure_report( print("!" 
* 40) # Only show consecutive failure alerts - consecutive_alerts = [a for a in runner_alerts - if a["alert_type"] in ["runner_consecutive_failures", "runner_instance_consecutive_failures"]] + consecutive_alerts = [ + a + for a in runner_alerts + if a["alert_type"] + in [ + "runner_consecutive_failures", + "runner_instance_consecutive_failures", + ] + ] if consecutive_alerts: print("\n--- CONSECUTIVE FAILURE ALERTS ---") - print("(Runners that have failed in multiple consecutive workflow runs)") + print( + "(Runners that have failed in multiple consecutive workflow runs)" + ) print() for alert in sorted( @@ -924,10 +908,12 @@ def generate_failure_report( ) # Show jobs that failed on this runner type - jobs_failed = alert.get('jobs_failed', {}) + jobs_failed = alert.get("jobs_failed", {}) if jobs_failed: print(f" Jobs That Failed:") - for job_name, count in sorted(jobs_failed.items(), key=lambda x: x[1], reverse=True): + for job_name, count in sorted( + jobs_failed.items(), key=lambda x: x[1], reverse=True + ): print(f" - {job_name}: {count} failure(s)") print(f" Severity: {alert['severity'].upper()}") @@ -939,9 +925,17 @@ def generate_failure_report( print(f" Link: {first['url']}") elif alert["alert_type"] == "runner_instance_consecutive_failures": # Extract runner labels from instance key (format: "labels_id") - instance_key = alert['runner_instance'] - runner_labels = instance_key.rsplit('_', 1)[0] if '_' in instance_key else instance_key - runner_id = instance_key.rsplit('_', 1)[1] if '_' in instance_key else 'unknown' + instance_key = alert["runner_instance"] + runner_labels = ( + instance_key.rsplit("_", 1)[0] + if "_" in instance_key + else instance_key + ) + runner_id = ( + instance_key.rsplit("_", 1)[1] + if "_" in instance_key + else "unknown" + ) print(f"\n Runner Type: {runner_labels}") print(f" Specific Instance ID: {runner_id}") @@ -956,10 +950,12 @@ def generate_failure_report( ) # Show jobs that failed on this runner instance - jobs_failed = alert.get('jobs_failed', {}) + jobs_failed = alert.get("jobs_failed", {}) if jobs_failed: print(f" Jobs That Failed:") - for job_name, count in sorted(jobs_failed.items(), key=lambda x: x[1], reverse=True): + for job_name, count in sorted( + jobs_failed.items(), key=lambda x: x[1], reverse=True + ): print(f" - {job_name}: {count} failure(s)") print(f" Severity: {alert['severity'].upper()}") @@ -1083,8 +1079,8 @@ def generate_github_summary(self, report_data: Dict): summary_lines.append("") summary_lines.append("| Metric | Value |") summary_lines.append("|--------|-------|") - avg_queue = report_data['summary']['avg_queue_time_seconds'] - p90_queue = report_data['summary']['p90_queue_time_seconds'] + avg_queue = report_data["summary"]["avg_queue_time_seconds"] + p90_queue = report_data["summary"]["p90_queue_time_seconds"] summary_lines.append( f"| Average Queue Time (across all runners) | {avg_queue / 60:.1f} minutes ({avg_queue:.0f}s) |" ) @@ -1134,8 +1130,15 @@ def generate_github_summary(self, report_data: Dict): summary_lines.append("") # Only show consecutive failure alerts - consecutive_alerts = [a for a in report_data["runner_alerts"] - if a["alert_type"] in ["runner_consecutive_failures", "runner_instance_consecutive_failures"]] + consecutive_alerts = [ + a + for a in report_data["runner_alerts"] + if a["alert_type"] + in [ + "runner_consecutive_failures", + "runner_instance_consecutive_failures", + ] + ] if consecutive_alerts: summary_lines.append("### Runner Consecutive Failures") @@ -1159,8 +1162,16 @@ def 
generate_github_summary(self, report_data: Dict): # Get top 3 failed jobs jobs_failed = alert.get("jobs_failed", {}) - top_jobs = sorted(jobs_failed.items(), key=lambda x: x[1], reverse=True)[:3] - jobs_str = ", ".join([f"{job} ({count})" for job, count in top_jobs]) if top_jobs else "N/A" + top_jobs = sorted( + jobs_failed.items(), key=lambda x: x[1], reverse=True + )[:3] + jobs_str = ( + ", ".join( + [f"{job} ({count})" for job, count in top_jobs] + ) + if top_jobs + else "N/A" + ) first_failure = alert.get("first_failure") first_failure_str = ( @@ -1168,21 +1179,34 @@ def generate_github_summary(self, report_data: Dict): if first_failure else "N/A" ) - first_failure_link = first_failure["url"] if first_failure else "" + first_failure_link = ( + first_failure["url"] if first_failure else "" + ) summary_lines.append( f"| `{runner_labels}` | {alert['current_streak']} | {alert['max_streak']} | " f"{alert['failure_rate']:.1f}% | {jobs_str} | {first_failure_str} | [View]({first_failure_link}) |" ) - elif alert["alert_type"] == "runner_instance_consecutive_failures": + elif ( + alert["alert_type"] + == "runner_instance_consecutive_failures" + ): instance = alert["runner_instance"] if len(instance) > 35: instance = instance[:32] + "..." # Get top 3 failed jobs jobs_failed = alert.get("jobs_failed", {}) - top_jobs = sorted(jobs_failed.items(), key=lambda x: x[1], reverse=True)[:3] - jobs_str = ", ".join([f"{job} ({count})" for job, count in top_jobs]) if top_jobs else "N/A" + top_jobs = sorted( + jobs_failed.items(), key=lambda x: x[1], reverse=True + )[:3] + jobs_str = ( + ", ".join( + [f"{job} ({count})" for job, count in top_jobs] + ) + if top_jobs + else "N/A" + ) first_failure = alert.get("first_failure") first_failure_str = ( @@ -1190,7 +1214,9 @@ def generate_github_summary(self, report_data: Dict): if first_failure else "N/A" ) - first_failure_link = first_failure["url"] if first_failure else "" + first_failure_link = ( + first_failure["url"] if first_failure else "" + ) summary_lines.append( f"| `{instance}` (instance) | {alert['current_streak']} | {alert['max_streak']} | " @@ -1236,30 +1262,40 @@ def generate_github_summary(self, report_data: Dict): summary_lines.append("") # Section 2: Runner Health Analysis - if report_data.get("runner_stats") and report_data.get("runner_streak_data"): + if report_data.get("runner_stats") and report_data.get( + "runner_streak_data" + ): summary_lines.append("## Section 2: Runner Health Analysis") summary_lines.append("") # Combine stats with streak data and sort by consecutive failures first combined_data = [] for runner_labels, stats in report_data["runner_stats"].items(): - streak_data = report_data["runner_streak_data"].get(runner_labels, {}) - combined_data.append({ - "runner_labels": runner_labels, - "current_streak": streak_data.get("current_streak", 0), - "max_streak": streak_data.get("max_streak", 0), - "failure_rate": stats["failure_rate"], - "total_jobs": stats["total_jobs"], - "unique_jobs": stats["unique_jobs_with_failures"], - "avg_queue": stats["avg_queue_time_seconds"], - "p90_queue": stats["p90_queue_time_seconds"], - "queue_samples": stats.get("queue_time_samples", 0), - }) + streak_data = report_data["runner_streak_data"].get( + runner_labels, {} + ) + combined_data.append( + { + "runner_labels": runner_labels, + "current_streak": streak_data.get("current_streak", 0), + "max_streak": streak_data.get("max_streak", 0), + "failure_rate": stats["failure_rate"], + "total_jobs": stats["total_jobs"], + "unique_jobs": 
stats["unique_jobs_with_failures"], + "avg_queue": stats["avg_queue_time_seconds"], + "p90_queue": stats["p90_queue_time_seconds"], + "queue_samples": stats.get("queue_time_samples", 0), + } + ) # Sort by current streak (descending), then max streak, then failure rate sorted_runners = sorted( combined_data, - key=lambda x: (x["current_streak"], x["max_streak"], x["failure_rate"]), + key=lambda x: ( + x["current_streak"], + x["max_streak"], + x["failure_rate"], + ), reverse=True, ) @@ -1280,12 +1316,28 @@ def generate_github_summary(self, report_data: Dict): ) # Format streaks - streak_str = str(runner_data["current_streak"]) if runner_data["current_streak"] > 0 else "-" - max_str = str(runner_data["max_streak"]) if runner_data["max_streak"] > 0 else "-" + streak_str = ( + str(runner_data["current_streak"]) + if runner_data["current_streak"] > 0 + else "-" + ) + max_str = ( + str(runner_data["max_streak"]) + if runner_data["max_streak"] > 0 + else "-" + ) # Format queue times - avg_queue_str = f"{runner_data['avg_queue'] / 60:.1f}m" if runner_data['queue_samples'] > 0 else "N/A" - p90_queue_str = f"{runner_data['p90_queue'] / 60:.1f}m" if runner_data['queue_samples'] > 0 else "N/A" + avg_queue_str = ( + f"{runner_data['avg_queue'] / 60:.1f}m" + if runner_data["queue_samples"] > 0 + else "N/A" + ) + p90_queue_str = ( + f"{runner_data['p90_queue'] / 60:.1f}m" + if runner_data["queue_samples"] > 0 + else "N/A" + ) summary_lines.append( f"| {i} | `{display_labels}` | {streak_str} | {max_str} | {runner_data['failure_rate']:.1f}% | "