diff --git a/ci3/ci-metrics/app.py b/ci3/ci-metrics/app.py
index c62875e7d19a..2e8cfcf21c7a 100644
--- a/ci3/ci-metrics/app.py
+++ b/ci3/ci-metrics/app.py
@@ -37,7 +37,7 @@ def verify_password(username, password):
def _init():
- """Initialize SQLite and start background threads."""
+ """Initialize SQLite, warm caches, and start background threads."""
try:
db.get_db()
metrics.start_test_listener(r)
@@ -45,6 +45,18 @@ def _init():
print("[ci-metrics] Background threads started")
except Exception as e:
print(f"[ci-metrics] Warning: startup failed: {e}")
+ # Warm billing caches so first request isn't slow
+ try:
+ from billing.gcp import _ensure_cached as _warm_gcp
+ _warm_gcp()
+ print("[ci-metrics] GCP billing cache warmed")
+ except Exception as e:
+ print(f"[ci-metrics] GCP billing warmup failed: {e}")
+ try:
+ billing_aws.get_costs_overview()
+ print("[ci-metrics] AWS costs cache warmed")
+ except Exception as e:
+ print(f"[ci-metrics] AWS costs warmup failed: {e}")
threading.Thread(target=_init, daemon=True, name='metrics-init').start()
@@ -166,7 +178,7 @@ def api_ci_runs():
ts_from = int(datetime.strptime(date_from, '%Y-%m-%d').timestamp() * 1000) if date_from else None
ts_to = int((datetime.strptime(date_to, '%Y-%m-%d') + timedelta(days=1)).timestamp() * 1000) if date_to else None
- runs = metrics.get_ci_runs(r, ts_from, ts_to)
+ runs = metrics.get_ci_runs(ts_from, ts_to)
if status_filter:
runs = [run for run in runs if run.get('status') == status_filter]
@@ -185,7 +197,7 @@ def api_ci_runs():
@auth.login_required
def api_ci_stats():
ts_from = int((datetime.now() - timedelta(days=7)).timestamp() * 1000)
- runs = metrics.get_ci_runs(r, ts_from)
+ runs = metrics.get_ci_runs(ts_from)
total = len(runs)
passed = sum(1 for run in runs if run.get('status') == 'PASSED')
@@ -233,6 +245,7 @@ def api_costs_overview():
buckets[key]['aws_total'] += entry.get('aws_total', 0)
buckets[key]['gcp_total'] += entry.get('gcp_total', 0)
result['by_date'] = sorted(buckets.values(), key=lambda x: x['date'])
+ result['period'] = {'from': date_from, 'to': date_to}
return _json(result)
@@ -287,7 +300,7 @@ def api_costs_attribution():
ts_from = int(datetime.strptime(date_from, '%Y-%m-%d').timestamp() * 1000)
ts_to = int((datetime.strptime(date_to, '%Y-%m-%d') + timedelta(days=1)).timestamp() * 1000)
- runs = metrics.get_ci_runs(r, ts_from, ts_to)
+ runs = metrics.get_ci_runs(ts_from, ts_to)
runs_with_cost = [run for run in runs if run.get('cost_usd') is not None]
# Enrich merge queue runs with PR author from GitHub
@@ -405,6 +418,7 @@ def api_costs_attribution():
'by_date': by_date_list,
'run_types': all_types,
'instances': instances[:500],
+ 'period': {'from': date_from, 'to': date_to},
'totals': {'aws': round(total_aws, 2), 'gcp': round(gcp_total, 2),
'gcp_unattributed': round(gcp_total, 2),
'combined': round(total_aws + gcp_total, 2)},
@@ -421,7 +435,7 @@ def api_costs_runners():
ts_from = int(datetime.strptime(date_from, '%Y-%m-%d').timestamp() * 1000)
ts_to = int((datetime.strptime(date_to, '%Y-%m-%d') + timedelta(days=1)).timestamp() * 1000)
- runs = metrics.get_ci_runs(r, ts_from, ts_to)
+ runs = metrics.get_ci_runs(ts_from, ts_to)
runs_with_cost = [run for run in runs if run.get('cost_usd') is not None]
if dashboard:
runs_with_cost = [run for run in runs_with_cost if run.get('dashboard') == dashboard]
@@ -475,6 +489,7 @@ def api_costs_runners():
'by_date': by_date,
'by_instance_type': by_instance,
'by_dashboard': by_dashboard,
+ 'period': {'from': date_from, 'to': date_to},
'summary': {
'total_cost': round(total_cost, 2),
'spot_pct': round(100.0 * spot_cost / max(total_cost, 0.01), 1),
@@ -496,7 +511,7 @@ def api_ci_performance():
ts_from = int(datetime.strptime(date_from, '%Y-%m-%d').timestamp() * 1000)
ts_to = int((datetime.strptime(date_to, '%Y-%m-%d') + timedelta(days=1)).timestamp() * 1000)
- runs = metrics.get_ci_runs(r, ts_from, ts_to)
+ runs = metrics.get_ci_runs(ts_from, ts_to)
runs = [run for run in runs if run.get('status') in ('PASSED', 'FAILED')]
if dashboard:
runs = [run for run in runs if run.get('dashboard') == dashboard]
@@ -529,46 +544,34 @@ def api_ci_performance():
'avg_duration_mins': round(sum(d['durations']) / len(d['durations']), 1) if d['durations'] else None,
})
+ # Merge test outcome counts from test_daily_stats before aggregation
+ ds_conditions = ['date >= ?', 'date <= ?']
+ ds_params = [date_from, date_to]
+ if dashboard:
+ ds_conditions.append('dashboard = ?')
+ ds_params.append(dashboard)
+ ds_where = 'WHERE ' + ' AND '.join(ds_conditions)
+
+ daily_test_counts = db.query(f'''
+ SELECT date, SUM(passed) as passed, SUM(failed) as failed, SUM(flaked) as flaked
+ FROM test_daily_stats {ds_where}
+ GROUP BY date
+ ''', ds_params)
+ daily_test_map = {r['date']: r for r in daily_test_counts}
+ for d in by_date:
+ tc = daily_test_map.get(d['date'], {})
+ d['flake_count'] = tc.get('flaked', 0) or 0
+ d['test_failure_count'] = tc.get('failed', 0) or 0
+ d['test_success_count'] = tc.get('passed', 0) or 0
+
by_date = _aggregate_dates(by_date, granularity,
- sum_fields=['total', 'passed', 'failed'],
+ sum_fields=['total', 'passed', 'failed',
+ 'flake_count', 'test_failure_count', 'test_success_count'],
avg_fields=['avg_duration_mins'])
for d in by_date:
d['pass_rate'] = round(100.0 * d['passed'] / max(d['total'], 1), 1)
d['failure_rate'] = round(100.0 * d['failed'] / max(d['total'], 1), 1)
- # Daily flake/failure counts from test_events
- if dashboard:
- flake_daily = db.query('''
- SELECT substr(timestamp, 1, 10) as date, COUNT(*) as count
- FROM test_events WHERE status = 'flaked' AND dashboard = ?
- AND timestamp >= ? AND timestamp < ?
- GROUP BY substr(timestamp, 1, 10)
- ''', (dashboard, date_from, date_to + 'T23:59:59'))
- fail_test_daily = db.query('''
- SELECT substr(timestamp, 1, 10) as date, COUNT(*) as count
- FROM test_events WHERE status = 'failed' AND dashboard = ?
- AND timestamp >= ? AND timestamp < ?
- GROUP BY substr(timestamp, 1, 10)
- ''', (dashboard, date_from, date_to + 'T23:59:59'))
- else:
- flake_daily = db.query('''
- SELECT substr(timestamp, 1, 10) as date, COUNT(*) as count
- FROM test_events WHERE status = 'flaked'
- AND timestamp >= ? AND timestamp < ?
- GROUP BY substr(timestamp, 1, 10)
- ''', (date_from, date_to + 'T23:59:59'))
- fail_test_daily = db.query('''
- SELECT substr(timestamp, 1, 10) as date, COUNT(*) as count
- FROM test_events WHERE status = 'failed'
- AND timestamp >= ? AND timestamp < ?
- GROUP BY substr(timestamp, 1, 10)
- ''', (date_from, date_to + 'T23:59:59'))
- flake_daily_map = {r['date']: r['count'] for r in flake_daily}
- fail_test_daily_map = {r['date']: r['count'] for r in fail_test_daily}
- for d in by_date:
- d['flake_count'] = flake_daily_map.get(d['date'], 0)
- d['test_failure_count'] = fail_test_daily_map.get(d['date'], 0)
-
# Top flakes/failures
if dashboard:
top_flakes = db.query('''
@@ -606,38 +609,22 @@ def api_ci_performance():
if complete and ts:
durations.append((complete - ts) / 60000.0)
- if dashboard:
- flake_count = db.query('''
- SELECT COUNT(*) as c FROM test_events WHERE status='flaked' AND dashboard = ?
- AND timestamp >= ? AND timestamp <= ?
- ''', (dashboard, date_from, date_to + 'T23:59:59'))
- total_tests = db.query('''
- SELECT COUNT(*) as c FROM test_events WHERE status IN ('failed','flaked') AND dashboard = ?
- AND timestamp >= ? AND timestamp <= ?
- ''', (dashboard, date_from, date_to + 'T23:59:59'))
- total_failures_count = db.query('''
- SELECT COUNT(*) as c FROM test_events WHERE status='failed' AND dashboard = ?
- AND timestamp >= ? AND timestamp <= ?
- ''', (dashboard, date_from, date_to + 'T23:59:59'))
- else:
- flake_count = db.query('''
- SELECT COUNT(*) as c FROM test_events WHERE status='flaked' AND timestamp >= ? AND timestamp <= ?
- ''', (date_from, date_to + 'T23:59:59'))
- total_tests = db.query('''
- SELECT COUNT(*) as c FROM test_events WHERE status IN ('failed','flaked') AND timestamp >= ? AND timestamp <= ?
- ''', (date_from, date_to + 'T23:59:59'))
- total_failures_count = db.query('''
- SELECT COUNT(*) as c FROM test_events WHERE status='failed' AND timestamp >= ? AND timestamp <= ?
- ''', (date_from, date_to + 'T23:59:59'))
-
- fc = flake_count[0]['c'] if flake_count else 0
- tc = total_tests[0]['c'] if total_tests else 0
- tfc = total_failures_count[0]['c'] if total_failures_count else 0
+ # Test outcome summary from test_daily_stats
+ ds_summary = db.query(f'''
+ SELECT SUM(passed) as passed, SUM(failed) as failed, SUM(flaked) as flaked
+ FROM test_daily_stats {ds_where}
+ ''', ds_params)
+ ds_s = ds_summary[0] if ds_summary else {}
+ fc = ds_s.get('flaked', 0) or 0
+ tfc = ds_s.get('failed', 0) or 0
+ tpc = ds_s.get('passed', 0) or 0
+ tc = fc + tfc + tpc
return _json({
'by_date': by_date,
'top_flakes': top_flakes,
'top_failures': top_failures,
+ 'period': {'from': date_from, 'to': date_to},
'summary': {
'total_runs': total,
'pass_rate': round(100.0 * passed / max(total, 1), 1),
@@ -646,6 +633,7 @@ def api_ci_performance():
'flake_rate': round(100.0 * fc / max(tc, 1), 1) if tc else 0,
'total_flakes': fc,
'total_test_failures': tfc,
+ 'total_test_successes': tpc,
},
})
@@ -684,7 +672,7 @@ def api_pr_metrics():
author = request.args.get('author', '')
ts_from = int(datetime.strptime(date_from, '%Y-%m-%d').timestamp() * 1000)
ts_to = int((datetime.strptime(date_to, '%Y-%m-%d') + timedelta(days=1)).timestamp() * 1000)
- ci_runs = metrics.get_ci_runs(r, ts_from, ts_to)
+ ci_runs = metrics.get_ci_runs(ts_from, ts_to)
return _json(github_data.get_pr_metrics(date_from, date_to, author, ci_runs))
@@ -734,55 +722,110 @@ def api_test_timings():
where = 'WHERE ' + ' AND '.join(conditions)
- # Per-test stats
+ # Per-test timing from test_events (failed/flaked only — passed not persisted)
by_test = db.query(f'''
SELECT test_cmd,
- COUNT(*) as count,
+ COUNT(*) as event_count,
ROUND(AVG(duration_secs), 1) as avg_secs,
ROUND(MIN(duration_secs), 1) as min_secs,
ROUND(MAX(duration_secs), 1) as max_secs,
- SUM(CASE WHEN status = 'passed' THEN 1 ELSE 0 END) as passed,
- SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failed,
- SUM(CASE WHEN status = 'flaked' THEN 1 ELSE 0 END) as flaked,
dashboard
FROM test_events {where}
GROUP BY test_cmd
- ORDER BY count DESC
+ ORDER BY event_count DESC
LIMIT 200
''', params)
- # Add pass rate
- for row in by_test:
- total = row['passed'] + row['failed'] + row['flaked']
- row['pass_rate'] = round(100.0 * row['passed'] / max(total, 1), 1)
- row['total_time_secs'] = round(row['avg_secs'] * row['count'], 0)
+ # Per-test counts from daily stats (includes passed)
+ ds_conditions = ['date >= ?', 'date <= ?']
+ ds_params = [date_from, date_to]
+ if dashboard:
+ ds_conditions.append('dashboard = ?')
+ ds_params.append(dashboard)
+ if test_cmd:
+ ds_conditions.append('test_cmd = ?')
+ ds_params.append(test_cmd)
+ ds_where = 'WHERE ' + ' AND '.join(ds_conditions)
+
+ daily_stats_by_test = {r['test_cmd']: r for r in db.query(f'''
+ SELECT test_cmd,
+ SUM(passed) as passed, SUM(failed) as failed, SUM(flaked) as flaked
+ FROM test_daily_stats {ds_where}
+ GROUP BY test_cmd
+ ''', ds_params)}
- # Daily time series (aggregate across all tests or filtered test)
+ # Merge counts into timing data
+ for row in by_test:
+ ds = daily_stats_by_test.get(row['test_cmd'], {})
+ row['passed'] = ds.get('passed', 0) or 0
+ row['failed'] = ds.get('failed', 0) or row['event_count']
+ row['flaked'] = ds.get('flaked', 0) or 0
+ row['count'] = row['passed'] + row['failed'] + row['flaked']
+ total = max(row['count'], 1)
+ row['pass_rate'] = round(100.0 * row['passed'] / total, 1)
+ row['total_time_secs'] = round(row['avg_secs'] * row['event_count'], 0)
+ del row['event_count']
+
+ # Also add tests that only have daily stats (all passed, no individual events)
+ existing_cmds = {r['test_cmd'] for r in by_test}
+ for cmd, ds in daily_stats_by_test.items():
+ if cmd not in existing_cmds and not status:
+ passed = ds.get('passed', 0) or 0
+ failed = ds.get('failed', 0) or 0
+ flaked = ds.get('flaked', 0) or 0
+ total = passed + failed + flaked
+ if total > 0:
+ by_test.append({
+ 'test_cmd': cmd, 'count': total,
+ 'avg_secs': None, 'min_secs': None, 'max_secs': None,
+ 'passed': passed, 'failed': failed, 'flaked': flaked,
+ 'pass_rate': round(100.0 * passed / total, 1),
+ 'total_time_secs': 0, 'dashboard': '',
+ })
+ by_test.sort(key=lambda r: r['count'], reverse=True)
+
+ # Daily time series from daily stats
by_date = db.query(f'''
+ SELECT date,
+ SUM(passed) as passed, SUM(failed) as failed, SUM(flaked) as flaked,
+ SUM(passed) + SUM(failed) + SUM(flaked) as count
+ FROM test_daily_stats {ds_where}
+ GROUP BY date
+ ORDER BY date
+ ''', ds_params)
+
+ # Enrich with timing from test_events
+ timing_by_date = {r['date']: r for r in db.query(f'''
SELECT substr(timestamp, 1, 10) as date,
- COUNT(*) as count,
ROUND(AVG(duration_secs), 1) as avg_secs,
- ROUND(MAX(duration_secs), 1) as max_secs,
- SUM(CASE WHEN status = 'passed' THEN 1 ELSE 0 END) as passed,
- SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failed,
- SUM(CASE WHEN status = 'flaked' THEN 1 ELSE 0 END) as flaked
+ ROUND(MAX(duration_secs), 1) as max_secs
FROM test_events {where}
GROUP BY substr(timestamp, 1, 10)
- ORDER BY date
- ''', params)
-
- # Summary
- summary_rows = db.query(f'''
- SELECT COUNT(*) as count,
- ROUND(AVG(duration_secs), 1) as avg_secs,
+ ''', params)}
+ for d in by_date:
+ t = timing_by_date.get(d['date'], {})
+ d['avg_secs'] = t.get('avg_secs')
+ d['max_secs'] = t.get('max_secs')
+
+ # Summary from daily stats
+ ds_summary = db.query(f'''
+ SELECT SUM(passed) as passed, SUM(failed) as failed, SUM(flaked) as flaked
+ FROM test_daily_stats {ds_where}
+ ''', ds_params)
+ ds_s = ds_summary[0] if ds_summary else {}
+
+ # Timing summary from test_events
+ timing_summary = db.query(f'''
+ SELECT ROUND(AVG(duration_secs), 1) as avg_secs,
ROUND(MAX(duration_secs), 1) as max_secs,
- SUM(duration_secs) as total_secs,
- SUM(CASE WHEN status = 'passed' THEN 1 ELSE 0 END) as passed,
- SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failed,
- SUM(CASE WHEN status = 'flaked' THEN 1 ELSE 0 END) as flaked
+ SUM(duration_secs) as total_secs
FROM test_events {where}
''', params)
- s = summary_rows[0] if summary_rows else {}
+ ts = timing_summary[0] if timing_summary else {}
+
+ passed = ds_s.get('passed', 0) or 0
+ failed = ds_s.get('failed', 0) or 0
+ flaked = ds_s.get('flaked', 0) or 0
# Slowest individual test runs
slowest = db.query(f'''
@@ -797,14 +840,15 @@ def api_test_timings():
'by_test': by_test,
'by_date': by_date,
'slowest': slowest,
+ 'period': {'from': date_from, 'to': date_to},
'summary': {
- 'total_runs': s.get('count', 0),
- 'avg_duration_secs': s.get('avg_secs'),
- 'max_duration_secs': s.get('max_secs'),
- 'total_compute_secs': round(s.get('total_secs', 0) or 0, 0),
- 'passed': s.get('passed', 0),
- 'failed': s.get('failed', 0),
- 'flaked': s.get('flaked', 0),
+ 'total_runs': passed + failed + flaked,
+ 'avg_duration_secs': ts.get('avg_secs'),
+ 'max_duration_secs': ts.get('max_secs'),
+ 'total_compute_secs': round(ts.get('total_secs', 0) or 0, 0),
+ 'passed': passed,
+ 'failed': failed,
+ 'flaked': flaked,
},
})
diff --git a/ci3/ci-metrics/billing/billing-dashboard.html b/ci3/ci-metrics/billing/billing-dashboard.html
index 87193ffae207..47d4832ca43d 100644
--- a/ci3/ci-metrics/billing/billing-dashboard.html
+++ b/ci3/ci-metrics/billing/billing-dashboard.html
@@ -58,6 +58,7 @@
cost overview
namespace billing
ci insights
+ test timings
namespace billing
diff --git a/ci3/ci-metrics/db.py b/ci3/ci-metrics/db.py
index 93e970fe3a56..c83a372b29b6 100644
--- a/ci3/ci-metrics/db.py
+++ b/ci3/ci-metrics/db.py
@@ -7,7 +7,8 @@
import sqlite3
import threading
-_DB_PATH = os.path.join(os.getenv('LOGS_DISK_PATH', '/logs-disk'), 'metrics.db')
+_DB_PATH = os.getenv('METRICS_DB_PATH',
+ os.path.join(os.getenv('LOGS_DISK_PATH', '/logs-disk'), 'metrics.db'))
_local = threading.local()
SCHEMA = """
@@ -64,6 +65,18 @@
CREATE INDEX IF NOT EXISTS idx_ci_runs_ts ON ci_runs(timestamp_ms);
CREATE INDEX IF NOT EXISTS idx_ci_runs_name ON ci_runs(name);
CREATE INDEX IF NOT EXISTS idx_ci_runs_dashboard ON ci_runs(dashboard);
+
+CREATE TABLE IF NOT EXISTS test_daily_stats (
+ date TEXT NOT NULL,
+ test_cmd TEXT NOT NULL,
+ dashboard TEXT NOT NULL DEFAULT '',
+ passed INTEGER NOT NULL DEFAULT 0,
+ failed INTEGER NOT NULL DEFAULT 0,
+ flaked INTEGER NOT NULL DEFAULT 0,
+ PRIMARY KEY (date, test_cmd, dashboard)
+);
+CREATE INDEX IF NOT EXISTS idx_tds_date ON test_daily_stats(date);
+CREATE INDEX IF NOT EXISTS idx_tds_dashboard ON test_daily_stats(dashboard);
"""
diff --git a/ci3/ci-metrics/ec2_pricing.py b/ci3/ci-metrics/ec2_pricing.py
index ace55ea4f40a..96e0561d0d70 100644
--- a/ci3/ci-metrics/ec2_pricing.py
+++ b/ci3/ci-metrics/ec2_pricing.py
@@ -16,12 +16,20 @@
# ---- Hardcoded fallback rates (us-east-2, USD/hr) ----
_HARDCODED_RATES = {
- ('m6a.48xlarge', True): 8.31, # spot
- ('m6a.48xlarge', False): 16.56, # on-demand
- ('m6a.32xlarge', True): 5.54,
- ('m6a.32xlarge', False): 11.04,
+ ('m6a.xlarge', True): 0.07, # spot
+ ('m6a.xlarge', False): 0.1728, # on-demand
+ ('m6a.4xlarge', True): 0.28,
+ ('m6a.4xlarge', False): 0.6912,
+ ('m6a.8xlarge', True): 0.55,
+ ('m6a.8xlarge', False): 1.3824,
('m6a.16xlarge', True): 2.77,
('m6a.16xlarge', False): 5.52,
+ ('m6a.24xlarge', True): 1.66,
+ ('m6a.24xlarge', False): 4.1472,
+ ('m6a.32xlarge', True): 5.54,
+ ('m6a.32xlarge', False): 11.04,
+ ('m6a.48xlarge', True): 8.31,
+ ('m6a.48xlarge', False): 16.56,
('m7a.48xlarge', True): 8.31,
('m7a.48xlarge', False): 16.56,
('m7a.16xlarge', True): 2.77,
@@ -145,8 +153,19 @@ def _fetch_all_spot(instance_types: list[str]) -> dict[str, float]:
# ---- Cache refresh ----
def _get_known_instance_types() -> list[str]:
- """Return the set of instance types we need pricing for."""
- return sorted({itype for itype, _ in _HARDCODED_RATES})
+ """Return the set of instance types we need pricing for (hardcoded + from DB)."""
+ types = {itype for itype, _ in _HARDCODED_RATES}
+ try:
+ import db
+ conn = db.get_db()
+ rows = conn.execute(
+ "SELECT DISTINCT instance_type FROM ci_runs "
+ "WHERE instance_type IS NOT NULL AND instance_type != '' AND instance_type != 'unknown'"
+ ).fetchall()
+ types.update(r['instance_type'] for r in rows)
+ except Exception:
+ pass
+ return sorted(types)
def _refresh_cache():
diff --git a/ci3/ci-metrics/metrics.py b/ci3/ci-metrics/metrics.py
index 5c0d1610e06b..481205a2b405 100644
--- a/ci3/ci-metrics/metrics.py
+++ b/ci3/ci-metrics/metrics.py
@@ -1,8 +1,9 @@
-"""CI metrics: direct Redis reads + test event listener.
+"""CI metrics: SQLite source of truth + Redis ingestion + test event listener.
-Reads CI run data directly from Redis sorted sets on each request.
+CI runs are ingested from Redis (written by log_ci_run on CI instances) and
+stored in SQLite. All reads go through SQLite so enriched fields (instance_type
+from CloudTrail, recalculated costs) are preserved.
Test events stored in SQLite since they only arrive via pub/sub.
-CI runs periodically synced from Redis to SQLite for flake correlation.
"""
import json
import re
@@ -31,7 +32,9 @@ def compute_run_cost(data: dict) -> float | None:
is_spot = bool(data.get('spot'))
rate = ec2_pricing.get_instance_rate(instance_type, is_spot)
if not rate:
- vcpus = data.get('instance_vcpus', 192)
+ vcpus = data.get('instance_vcpus')
+ if not vcpus:
+ return None # unknown instance type and no vCPU data
rate = vcpus * ec2_pricing.get_fallback_vcpu_rate(is_spot)
return round(hours * rate, 4)
@@ -116,31 +119,14 @@ def _get_ci_runs_from_sqlite(date_from_ms=None, date_to_ms=None):
return runs
-def get_ci_runs(redis_conn, date_from_ms=None, date_to_ms=None):
- """Read CI runs from Redis, backfilled with SQLite for data that Redis has flushed."""
- redis_runs = _get_ci_runs_from_redis(redis_conn, date_from_ms, date_to_ms)
+def get_ci_runs(date_from_ms=None, date_to_ms=None):
+ """Read CI runs from SQLite (the source of truth).
- # Find the earliest timestamp in Redis to know what SQLite needs to fill
- redis_keys = set()
- redis_min_ts = float('inf')
- for run in redis_runs:
- ts = run.get('timestamp', 0)
- redis_keys.add((run.get('dashboard', ''), ts, run.get('name', '')))
- if ts < redis_min_ts:
- redis_min_ts = ts
-
- # If requesting data older than what Redis has, backfill from SQLite
- sqlite_runs = []
- need_sqlite = (date_from_ms is not None and date_from_ms < redis_min_ts) or not redis_runs
- if need_sqlite:
- sqlite_to = int(redis_min_ts) if redis_runs else date_to_ms
- sqlite_runs = _get_ci_runs_from_sqlite(date_from_ms, sqlite_to)
- # Deduplicate: only include SQLite runs not already in Redis
- sqlite_runs = [r for r in sqlite_runs
- if (r.get('dashboard', ''), r.get('timestamp', 0), r.get('name', ''))
- not in redis_keys]
-
- return sqlite_runs + redis_runs
+ Redis is only an ingestion pipe — sync_ci_runs_to_sqlite() copies data in.
+ All reads go through SQLite so enriched fields (instance_type from CloudTrail,
+ recalculated costs) are always reflected.
+ """
+ return _get_ci_runs_from_sqlite(date_from_ms, date_to_ms)
def _ts_to_date(ts_ms):
@@ -149,6 +135,19 @@ def _ts_to_date(ts_ms):
# ---- Test event handling (only thing needing SQLite) ----
+def _upsert_daily_stats(status: str, test_cmd: str, dashboard: str, timestamp: str):
+ """Increment the daily counter for a test status."""
+ date = timestamp[:10] # 'YYYY-MM-DD'
+ col = status if status in ('passed', 'failed', 'flaked') else None
+ if not col:
+ return
+ db.execute(f'''
+ INSERT INTO test_daily_stats (date, test_cmd, dashboard, {col})
+ VALUES (?, ?, ?, 1)
+ ON CONFLICT(date, test_cmd, dashboard) DO UPDATE SET {col} = {col} + 1
+ ''', (date, test_cmd, dashboard))
+
+
def _handle_test_event(channel: str, data: dict):
status = channel.split(':')[-1]
# Handle field name mismatches: run_test_cmd publishes 'cmd' for failed/flaked
@@ -157,6 +156,17 @@ def _handle_test_event(channel: str, data: dict):
log_url = data.get('log_url') or data.get('log_key')
if log_url and not log_url.startswith('http'):
log_url = f'http://ci.aztec-labs.com/{log_url}'
+ dashboard = data.get('dashboard', '')
+ timestamp = data.get('timestamp', datetime.now(timezone.utc).isoformat())
+
+ # Always update daily stats (lightweight aggregate)
+ _upsert_daily_stats(status, test_cmd, dashboard, timestamp)
+
+ # Only persist individual rows for failed/flaked (for drill-down / log URLs).
+ # Passed events are tracked via daily stats only.
+ if status == 'passed':
+ return
+
db.execute('''
INSERT INTO test_events
(status, test_cmd, log_url, ref_name, commit_hash, commit_author,
@@ -176,8 +186,8 @@ def _handle_test_event(channel: str, data: dict):
1 if data.get('is_scenario_test') else 0,
json.dumps(data['owners']) if data.get('owners') else None,
data.get('flake_group_id'),
- data.get('dashboard', ''),
- data.get('timestamp', datetime.now(timezone.utc).isoformat()),
+ dashboard,
+ timestamp,
))
@@ -472,14 +482,19 @@ def _load_seed_data():
def sync_ci_runs_to_sqlite(redis_conn):
- """Sync all CI runs from Redis into SQLite for persistence."""
+ """Ingest CI runs from Redis into SQLite.
+
+ Redis is the ingestion pipe (log_ci_run writes there from CI instances).
+ SQLite is the source of truth. Fields enriched post-ingestion (instance_type,
+ cost_usd from CloudTrail resolution) are preserved — only overwritten if
+ Redis has a non-empty value.
+ """
global _ci_sync_ts
now = time.time()
if now - _ci_sync_ts < _CI_SYNC_TTL:
return
_ci_sync_ts = now
- # Sync everything Redis has (not just 30 days)
runs = _get_ci_runs_from_redis(redis_conn)
now_iso = datetime.now(timezone.utc).isoformat()
@@ -488,11 +503,32 @@ def sync_ci_runs_to_sqlite(redis_conn):
for run in runs:
try:
conn.execute('''
- INSERT OR REPLACE INTO ci_runs
+ INSERT INTO ci_runs
(dashboard, name, timestamp_ms, complete_ms, status, author,
pr_number, instance_type, instance_vcpus, spot, cost_usd,
job_id, arch, synced_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
+ ON CONFLICT(dashboard, timestamp_ms, name) DO UPDATE SET
+ complete_ms = excluded.complete_ms,
+ status = excluded.status,
+ author = excluded.author,
+ pr_number = excluded.pr_number,
+ instance_vcpus = excluded.instance_vcpus,
+ spot = excluded.spot,
+ job_id = excluded.job_id,
+ arch = excluded.arch,
+ synced_at = excluded.synced_at,
+ -- Preserve enriched fields: only overwrite if Redis has real data
+ instance_type = CASE
+ WHEN excluded.instance_type IS NOT NULL AND excluded.instance_type != ''
+ THEN excluded.instance_type
+ ELSE ci_runs.instance_type
+ END,
+ cost_usd = CASE
+ WHEN excluded.instance_type IS NOT NULL AND excluded.instance_type != ''
+ THEN excluded.cost_usd
+ ELSE ci_runs.cost_usd
+ END
''', (
run.get('dashboard', ''),
run.get('name', ''),
@@ -516,15 +552,280 @@ def sync_ci_runs_to_sqlite(redis_conn):
print(f"[rk_metrics] Synced {count} CI runs to SQLite")
+def _backfill_daily_stats():
+ """Populate test_daily_stats from existing test_events rows (one-time)."""
+ conn = db.get_db()
+ # Check if daily stats are already populated
+ row = conn.execute("SELECT COUNT(*) as c FROM test_daily_stats").fetchone()
+ if row['c'] > 0:
+ return
+ conn.execute('''
+ INSERT OR IGNORE INTO test_daily_stats (date, test_cmd, dashboard, passed, failed, flaked)
+ SELECT substr(timestamp, 1, 10) as date, test_cmd, dashboard,
+ SUM(CASE WHEN status = 'passed' THEN 1 ELSE 0 END),
+ SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END),
+ SUM(CASE WHEN status = 'flaked' THEN 1 ELSE 0 END)
+ FROM test_events
+ GROUP BY substr(timestamp, 1, 10), test_cmd, dashboard
+ ''')
+ conn.commit()
+ count = conn.execute("SELECT COUNT(*) as c FROM test_daily_stats").fetchone()['c']
+ if count:
+ print(f"[rk_metrics] Backfilled {count} daily stat rows from test_events")
+
+
+# ---- CloudTrail instance type resolution ----
+
+_ct_resolve_ts = 0
+_CT_RESOLVE_TTL = 6 * 3600 # 6 hours
+
+
+def _fetch_cloudtrail_daily(ct, event_name, start_time, end_time, max_per_day=10000):
+    """Fetch CloudTrail events in daily chunks; max_per_day caps each single day's pull."""
+    events = []
+    day = start_time.replace(hour=0, minute=0, second=0, microsecond=0)
+    while day < end_time:
+        day_end = min(day + timedelta(days=1), end_time)
+        kwargs = {
+            'LookupAttributes': [
+                {'AttributeKey': 'EventName', 'AttributeValue': event_name},
+            ],
+            'StartTime': day,
+            'EndTime': day_end,
+            'MaxResults': 50,
+        }
+        day_events = []  # cap must apply per day, not to the cumulative total
+        while True:
+            resp = ct.lookup_events(**kwargs)
+            day_events.extend(resp.get('Events', []))
+            if not (token := resp.get('NextToken')) or len(day_events) >= max_per_day:
+                break
+            kwargs['NextToken'] = token
+        events.extend(day_events); day += timedelta(days=1)
+    return events
+
+
+# Name tag format: _[_]
+_NAME_TAG_RE = re.compile(r'^(.+)_(amd64|arm64)(?:_.*)?$')
+
+
+def _normalize_branch_name(name):
+ """Normalize a branch name the same way bootstrap_ec2 does for the EC2 Name tag."""
+ m = re.match(r'^gh-readonly-queue/[^/]+/pr-(\d+)', name)
+ if m:
+ return f'pr-{m.group(1)}'
+ name = re.sub(r'\s*\(queue\)$', '', name)
+ return re.sub(r'[^a-zA-Z0-9-]', '_', name[:50])
+
+
+def resolve_unknown_instance_types():
+ """Query CloudTrail for RunInstances + CreateTags events to resolve unknown instance types.
+
+ Strategy:
+ 1. Fetch RunInstances events (daily chunks) → instance_id → instance_type + launch_time
+ 2. Fetch CreateTags events (daily chunks) → instance_id → {Name, Group, Dashboard, ...}
+ Tags are accumulated across multiple events then filtered to Group=build-instance.
+ 3. Join by instance_id, then match to ci_runs by normalized branch name + arch + time window.
+ """
+ global _ct_resolve_ts
+ now = time.time()
+ if now - _ct_resolve_ts < _CT_RESOLVE_TTL:
+ return
+ _ct_resolve_ts = now
+
+ conn = db.get_db()
+ unknown_runs = conn.execute('''
+ SELECT dashboard, name, timestamp_ms, complete_ms, instance_vcpus, spot,
+ cost_usd, arch, pr_number
+ FROM ci_runs
+ WHERE (instance_type IS NULL OR instance_type = '' OR instance_type = 'unknown')
+ AND timestamp_ms > ?
+ ''', (int((time.time() - 90 * 86400) * 1000),)).fetchall()
+
+ if not unknown_runs:
+ return
+
+ try:
+ import boto3
+ except ImportError:
+ return
+
+ try:
+ ct = boto3.client('cloudtrail', region_name='us-east-2')
+ start_time = datetime.fromtimestamp(
+ min(r['timestamp_ms'] for r in unknown_runs) / 1000 - 300, tz=timezone.utc)
+ end_time = datetime.now(timezone.utc)
+
+ # Step 1: Fetch RunInstances events in daily chunks → instance_id → type + launch time
+ run_events = _fetch_cloudtrail_daily(ct, 'RunInstances', start_time, end_time)
+ instance_types = {}
+ instance_launch_times = {}
+ for event in run_events:
+ try:
+ detail = json.loads(event.get('CloudTrailEvent', '{}'))
+ itype = detail.get('requestParameters', {}).get('instanceType', '')
+ items = (detail.get('responseElements') or {}).get('instancesSet', {}).get('items', [])
+ for item in items:
+ iid = item.get('instanceId', '')
+ item_type = item.get('instanceType', '') or itype
+ if iid and item_type:
+ instance_types[iid] = item_type
+ instance_launch_times[iid] = int(event['EventTime'].timestamp() * 1000)
+ except Exception:
+ continue
+
+ if not instance_types:
+ print("[rk_metrics] CloudTrail: no RunInstances events found")
+ return
+
+ # Step 2: Fetch CreateTags events in daily chunks.
+ # Accumulate ALL tags per instance first, then filter to build instances.
+ # This handles the case where Name, Group, and Dashboard are set in separate
+ # create-tags API calls (aws_request_instance_type lines 97, 126, 127).
+ tag_events = _fetch_cloudtrail_daily(ct, 'CreateTags', start_time, end_time)
+ all_instance_tags = {}
+ for event in tag_events:
+ try:
+ detail = json.loads(event.get('CloudTrailEvent', '{}'))
+ req = detail.get('requestParameters', {})
+ resources = req.get('resourcesSet', {}).get('items', [])
+ tags = req.get('tagSet', {}).get('items', [])
+ tag_dict = {t.get('key', ''): t.get('value', '') for t in tags}
+ for res in resources:
+ rid = res.get('resourceId', '')
+ if rid.startswith('i-'):
+ if rid not in all_instance_tags:
+ all_instance_tags[rid] = {}
+ all_instance_tags[rid].update(tag_dict)
+ except Exception:
+ continue
+
+ # Filter to build instances
+ instance_tags = {
+ iid: tags for iid, tags in all_instance_tags.items()
+ if tags.get('Group') == 'build-instance'
+ }
+
+ # Step 3: Join RunInstances + CreateTags by instance_id
+ instances = []
+ for iid, itype in instance_types.items():
+ tags = instance_tags.get(iid, {})
+ if not tags.get('Name'):
+ continue
+ instances.append({
+ 'instance_type': itype,
+ 'launch_ms': instance_launch_times.get(iid, 0),
+ 'dashboard': tags.get('Dashboard', ''),
+ 'name_tag': tags.get('Name', ''),
+ })
+
+ # Build index: normalized branch name → [instances]
+ tag_index = {}
+ for inst in instances:
+ m = _NAME_TAG_RE.match(inst['name_tag'])
+ if m:
+ tag_index.setdefault(m.group(1), []).append(inst)
+ else:
+ tag_index.setdefault(inst['name_tag'], []).append(inst)
+
+ # Step 4: Match unknown runs to instances
+ updated = 0
+ for run in unknown_runs:
+ run_name = run['name']
+ run_arch = run['arch'] or ''
+ run_ts = run['timestamp_ms']
+ run_dashboard = run['dashboard']
+
+ expected_name = _normalize_branch_name(run_name)
+ candidates = tag_index.get(expected_name, [])
+
+ best = None
+ for inst in candidates:
+ # Verify arch matches
+ if run_arch:
+ m = _NAME_TAG_RE.match(inst['name_tag'])
+ if m and m.group(2) != run_arch:
+ continue
+
+ # Verify dashboard matches (if tag present)
+ if inst['dashboard'] and inst['dashboard'] != run_dashboard:
+ continue
+
+ # CI run starts after instance launch; allow up to 90 min (instance lifetime)
+ delta = run_ts - inst['launch_ms']
+ if delta < -60_000 or delta > 5400_000:
+ continue
+
+ # Prefer most recently launched instance before the run
+ if delta >= 0 and (best is None or inst['launch_ms'] > best['launch_ms']):
+ best = inst
+ elif best is None and abs(delta) < 60_000:
+ best = inst
+
+ if best:
+ itype = best['instance_type']
+ is_spot = bool(run['spot'])
+ rate = ec2_pricing.get_instance_rate(itype, is_spot)
+ new_cost = run['cost_usd']
+ if rate and run['complete_ms'] and run['timestamp_ms']:
+ hours = (run['complete_ms'] - run['timestamp_ms']) / 3_600_000
+ new_cost = round(hours * rate, 4)
+ conn.execute('''
+ UPDATE ci_runs SET instance_type = ?, cost_usd = ?
+ WHERE dashboard = ? AND timestamp_ms = ? AND name = ?
+ ''', (itype, new_cost, run['dashboard'], run['timestamp_ms'], run['name']))
+ updated += 1
+
+ conn.commit()
+ if updated:
+ print(f"[rk_metrics] CloudTrail: resolved {updated}/{len(unknown_runs)} unknown instance types")
+ else:
+ print(f"[rk_metrics] CloudTrail: {len(instances)} instances, "
+ f"0/{len(unknown_runs)} matched")
+ except Exception as e:
+ print(f"[rk_metrics] CloudTrail resolution failed: {e}")
+
+
+def recalculate_all_costs():
+ """Recalculate cost_usd for all ci_runs based on current instance_type and pricing."""
+ conn = db.get_db()
+ runs = conn.execute('''
+ SELECT dashboard, name, timestamp_ms, complete_ms, instance_type,
+ instance_vcpus, spot, cost_usd
+ FROM ci_runs
+ WHERE complete_ms IS NOT NULL AND complete_ms > 0
+ ''').fetchall()
+ updated = 0
+ for run in runs:
+ cost = compute_run_cost({
+ 'complete': run['complete_ms'],
+ 'timestamp': run['timestamp_ms'],
+ 'instance_type': run['instance_type'] or 'unknown',
+ 'spot': run['spot'],
+ 'instance_vcpus': run['instance_vcpus'],
+ })
+ if cost is not None and cost != run['cost_usd']:
+ conn.execute('''
+ UPDATE ci_runs SET cost_usd = ?
+ WHERE dashboard = ? AND timestamp_ms = ? AND name = ?
+ ''', (cost, run['dashboard'], run['timestamp_ms'], run['name']))
+ updated += 1
+ conn.commit()
+ print(f"[rk_metrics] Recalculated costs: {updated}/{len(runs)} changed")
+ return updated
+
+
def start_ci_run_sync(redis_conn):
"""Start periodic CI run + test event sync thread."""
_load_seed_data()
+ _backfill_daily_stats()
def loop():
while True:
try:
sync_ci_runs_to_sqlite(redis_conn)
sync_failed_tests_to_sqlite(redis_conn)
+ resolve_unknown_instance_types()
except Exception as e:
print(f"[rk_metrics] sync error: {e}")
time.sleep(600) # check every 10 min (TTL gates actual work)
diff --git a/ci3/ci-metrics/views/ci-insights.html b/ci3/ci-metrics/views/ci-insights.html
index 533b6bfb62cd..3d55695d1cd9 100644
--- a/ci3/ci-metrics/views/ci-insights.html
+++ b/ci3/ci-metrics/views/ci-insights.html
@@ -62,9 +62,10 @@
cost overview
namespace billing
ci insights
+ test timings
- ci insights
+ ci insights
@@ -107,7 +108,7 @@
merge queue: daily outcomes + success rate
-
flakes + test failures per day
+
test outcomes per day
@@ -231,6 +232,12 @@ flakes + test failures per day
Object.values(charts).forEach(c => c.destroy());
charts = {};
+ // Show period
+ const period = perfData?.period || costData?.period;
+ if (period) {
+ document.getElementById('period-label').textContent = `(${period.from} to ${period.to})`;
+ }
+
renderKPIs();
renderCostChart();
renderMQChart();
@@ -399,7 +406,7 @@ flakes + test failures per day
});
}
- // ---- Flake / test failure chart ----
+ // ---- Test outcomes chart (successes + flakes + failures) ----
function renderFlakeChart() {
const byDate = perfData?.by_date || [];
const dates = byDate.map(d => d.date);
@@ -408,8 +415,9 @@ flakes + test failures per day
data: {
labels: dates.map(fmtDate),
datasets: [
- { type: 'bar', label: 'Flakes', data: byDate.map(d => d.flake_count || 0), backgroundColor: '#f0883e', borderWidth: 0, order: 2 },
- { type: 'bar', label: 'Test Failures', data: byDate.map(d => d.test_failure_count || 0), backgroundColor: '#f85149', borderWidth: 0, order: 2 },
+ { type: 'bar', label: 'Successes', data: byDate.map(d => d.test_success_count || 0), backgroundColor: '#3fb950', borderWidth: 0, order: 3 },
+ { type: 'bar', label: 'Flakes', data: byDate.map(d => d.flake_count || 0), backgroundColor: '#f0883e', borderWidth: 0, order: 3 },
+ { type: 'bar', label: 'Test Failures', data: byDate.map(d => d.test_failure_count || 0), backgroundColor: '#f85149', borderWidth: 0, order: 3 },
{ type: 'line', label: 'CI Failure Rate %', data: byDate.map(d => d.failure_rate || 0), borderColor: '#ff7b72', borderWidth: 2, pointRadius: 0, tension: 0.3, yAxisID: 'y1', order: 1 },
]
},
@@ -417,8 +425,8 @@ flakes + test failures per day
responsive: true, maintainAspectRatio: false,
interaction: { mode: 'index', intersect: false },
scales: {
- x: { ticks: { color: '#555', maxRotation: 45 }, grid: { color: '#181818' } },
- y: { position: 'left', ticks: { color: '#555' }, grid: { color: '#181818' } },
+ x: { stacked: true, ticks: { color: '#555', maxRotation: 45 }, grid: { color: '#181818' } },
+ y: { stacked: true, position: 'left', ticks: { color: '#555' }, grid: { color: '#181818' } },
y1: { position: 'right', min: 0, ticks: { color: '#ff7b72', callback: v => v + '%' }, grid: { display: false } }
},
plugins: {
diff --git a/ci3/ci-metrics/views/cost-overview.html b/ci3/ci-metrics/views/cost-overview.html
index 53424a2d2d70..6c8f9718429d 100644
--- a/ci3/ci-metrics/views/cost-overview.html
+++ b/ci3/ci-metrics/views/cost-overview.html
@@ -74,9 +74,10 @@
cost overview
namespace billing
ci insights
+ test timings
- cost overview
+ cost overview
@@ -248,6 +249,10 @@
c.destroy());
charts = {};
+ if (apiData.period) {
+ document.getElementById('period-label').textContent = `(${apiData.period.from} to ${apiData.period.to})`;
+ }
+
const byDate = apiData.by_date;
const totals = apiData.totals;
const days = byDate.length || 1;
diff --git a/ci3/ci-metrics/views/test-timings.html b/ci3/ci-metrics/views/test-timings.html
index 0bf6c7213bd6..40cd5199ab73 100644
--- a/ci3/ci-metrics/views/test-timings.html
+++ b/ci3/ci-metrics/views/test-timings.html
@@ -54,7 +54,7 @@
test timings
- test timings
+ test timings
@@ -193,6 +193,9 @@
/dev/null || echo "unknown")
+ instance_type=${EC2_INSTANCE_TYPE:-$(aws_get_meta_data instance-type 2>/dev/null)}
+ instance_type=${instance_type:-unknown}
instance_vcpus=$(nproc 2>/dev/null || echo 0)
# Extract PR number from branch name or merge queue ref