diff --git a/packages/owletto b/packages/owletto index 6e7243ffa..887e8626a 160000 --- a/packages/owletto +++ b/packages/owletto @@ -1 +1 @@ -Subproject commit 6e7243ffa1262876430755f3405423995d937eb5 +Subproject commit 887e8626a194da652bde66772c3cf6dfeb704820 diff --git a/packages/server/src/__tests__/integration/watchers/automation-contract.test.ts b/packages/server/src/__tests__/integration/watchers/automation-contract.test.ts index 8896c1df8..9cc8a68b8 100644 --- a/packages/server/src/__tests__/integration/watchers/automation-contract.test.ts +++ b/packages/server/src/__tests__/integration/watchers/automation-contract.test.ts @@ -17,6 +17,7 @@ import { computePendingWindow } from '../../../utils/window-utils'; import { dispatchPendingWatcherRuns, materializeDueWatcherRuns, + sweepStaleWatcherRuns, } from '../../../watchers/automation'; import { generateSecureToken, hashToken } from '../../../auth/oauth/utils'; import { cleanupTestDatabase, getTestDb } from '../../setup/test-db'; @@ -1043,4 +1044,98 @@ describe('watcher automation contract', () => { expect(secondNextRunMs).toBe(firstNextRunMs); }); }); + + describe('sweepStaleWatcherRuns liveness reaping', () => { + // Seed a `running` watcher run with controlled claim/heartbeat ages. + // Omitting `heartbeatAgo` mirrors a client that never heartbeats — the + // claim sets last_heartbeat_at == claimed_at, so the row must fall to the + // coarse 2h path, never the fast heartbeat path. + async function seedRunningWatcherRun(opts: { + claimedAgo: string; + heartbeatAgo?: string; + }) { + const { sql, dbClient, workspace, watcherId, agent } = await createAutomatedWatcher(); + const granularity = inferWatcherGranularityFromSchedule('0 9 * * *'); + const { windowStart, windowEnd } = await computePendingWindow( + dbClient, + watcherId, + granularity + ); + const queued = await createWatcherRun({ + organizationId: workspace.org.id, + watcherId, + agentId: agent.agentId, + windowStart: windowStart.toISOString(), + windowEnd: windowEnd.toISOString(), + dispatchSource: 'scheduled', + deviceWorkerId: '11111111-1111-1111-1111-111111111111', + agentKind: 'claude-code', + }); + const heartbeatAgo = opts.heartbeatAgo ?? opts.claimedAgo; + await sql` + UPDATE runs + SET status = 'running', + claimed_by = 'mac-sweep-test', + claimed_at = NOW() - ${opts.claimedAgo}::interval, + last_heartbeat_at = NOW() - ${heartbeatAgo}::interval + WHERE id = ${queued.runId} + `; + return { sql, runId: queued.runId }; + } + + it('reaps a heartbeating run that went silent past the heartbeat window', async () => { + // Beat once after claim (10m ago > claim 1h ago), then silent >3min. + const { sql, runId } = await seedRunningWatcherRun({ + claimedAgo: '1 hour', + heartbeatAgo: '10 minutes', + }); + const { timedOut } = await sweepStaleWatcherRuns(sql); + expect(timedOut).toBeGreaterThanOrEqual(1); + const [row] = await sql`SELECT status, error_message FROM runs WHERE id = ${runId}`; + expect(String(row.status)).toBe('timeout'); + expect(String(row.error_message ?? '')).toMatch(/heartbeat went silent/i); + }); + + it('leaves a run with a fresh heartbeat running', async () => { + const { sql, runId } = await seedRunningWatcherRun({ + claimedAgo: '1 hour', + heartbeatAgo: '30 seconds', + }); + await sweepStaleWatcherRuns(sql); + const [row] = await sql`SELECT status FROM runs WHERE id = ${runId}`; + expect(String(row.status)).toBe('running'); + }); + + it('does not coarse-reap a live heartbeating run older than the 2h TTL', async () => { + // Claimed 3h ago but heartbeating fresh (30s) → still alive. The coarse + // 2h backstop must NOT touch it; only the (un-lapsed) fast path governs + // a heartbeating run. Guards against killing a legitimately long turn. + const { sql, runId } = await seedRunningWatcherRun({ + claimedAgo: '3 hours', + heartbeatAgo: '30 seconds', + }); + await sweepStaleWatcherRuns(sql); + const [row] = await sql`SELECT status FROM runs WHERE id = ${runId}`; + expect(String(row.status)).toBe('running'); + }); + + it('does not fast-reap a recent run that never heartbeats', async () => { + // last_heartbeat_at == claimed_at (no beat) + only 30m old → the fast + // path must NOT fire; it stays running until the 2h coarse backstop. + // Backward-compat guard for clients that do not heartbeat. + const { sql, runId } = await seedRunningWatcherRun({ claimedAgo: '30 minutes' }); + await sweepStaleWatcherRuns(sql); + const [row] = await sql`SELECT status FROM runs WHERE id = ${runId}`; + expect(String(row.status)).toBe('running'); + }); + + it('reaps a non-heartbeating run via the coarse 2h backstop', async () => { + const { sql, runId } = await seedRunningWatcherRun({ claimedAgo: '3 hours' }); + const { timedOut } = await sweepStaleWatcherRuns(sql); + expect(timedOut).toBeGreaterThanOrEqual(1); + const [row] = await sql`SELECT status, error_message FROM runs WHERE id = ${runId}`; + expect(String(row.status)).toBe('timeout'); + expect(String(row.error_message ?? '')).toMatch(/2 hours/i); + }); + }); }); diff --git a/packages/server/src/watchers/automation.ts b/packages/server/src/watchers/automation.ts index df759547d..5b357e496 100644 --- a/packages/server/src/watchers/automation.ts +++ b/packages/server/src/watchers/automation.ts @@ -406,34 +406,76 @@ export async function reconcileWatcherRuns(db?: DbClient): Promise claimed_at` (i.e. it beat at least once after being + * claimed) so this NEVER fires for a client that doesn't heartbeat: the + * claim sets `last_heartbeat_at == claimed_at`, so a non-heartbeating run + * stays equal and falls through to the coarse path. Fully backward + * compatible with older Mac apps. + * 2. Coarse TTL (generous, 2h): the legacy backstop for runs that never + * heartbeat — measured from the claim/creation. Kept so a long but live + * non-heartbeating turn isn't killed prematurely. */ const WATCHER_RUN_STALE_INTERVAL = '2 hours'; +/** ~4 missed 30s device heartbeats. A heartbeating executor that goes silent + * this long is crashed/abandoned; a live one (beats every ~30s) never lapses. */ +const WATCHER_RUN_HEARTBEAT_STALE_INTERVAL = '3 minutes'; + export async function sweepStaleWatcherRuns( db?: DbClient ): Promise<{ timedOut: number }> { const sql = db ?? getDb(); - const errorMessage = `Watcher run exceeded ${WATCHER_RUN_STALE_INTERVAL} without reaching terminal state`; + const heartbeatError = `Watcher run heartbeat went silent for over ${WATCHER_RUN_HEARTBEAT_STALE_INTERVAL} — the executor crashed or was abandoned`; + const coarseError = `Watcher run exceeded ${WATCHER_RUN_STALE_INTERVAL} without reaching terminal state`; const result = await sql` UPDATE runs SET status = 'timeout', completed_at = current_timestamp, - error_message = ${errorMessage} + error_message = CASE + WHEN last_heartbeat_at IS NOT NULL + AND claimed_at IS NOT NULL + AND last_heartbeat_at > claimed_at + THEN ${heartbeatError} + ELSE ${coarseError} + END WHERE run_type = 'watcher' AND status IN ('running', 'claimed') - AND COALESCE(claimed_at, created_at) - < current_timestamp - ${WATCHER_RUN_STALE_INTERVAL}::interval + AND ( + -- Fast path: the executor was heartbeating, then went silent. + (last_heartbeat_at IS NOT NULL + AND claimed_at IS NOT NULL + AND last_heartbeat_at > claimed_at + AND last_heartbeat_at + < current_timestamp - ${WATCHER_RUN_HEARTBEAT_STALE_INTERVAL}::interval) + OR + -- Coarse backstop: ONLY for runs that never heartbeated. A heartbeating + -- run is governed solely by the fast path above — so a live one that + -- legitimately runs past 2h (fresh heartbeat) is never killed here. + ((last_heartbeat_at IS NULL + OR claimed_at IS NULL + OR last_heartbeat_at <= claimed_at) + AND COALESCE(claimed_at, created_at) + < current_timestamp - ${WATCHER_RUN_STALE_INTERVAL}::interval) + ) `; const timedOut = Number(result.count ?? 0); if (timedOut > 0) { - logger.warn({ timedOut }, '[watchers] Swept stale watcher runs past coarse TTL'); + logger.warn({ timedOut }, '[watchers] Swept stale watcher runs'); } return { timedOut }; }