Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/owletto
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import { computePendingWindow } from '../../../utils/window-utils';
import {
dispatchPendingWatcherRuns,
materializeDueWatcherRuns,
sweepStaleWatcherRuns,
} from '../../../watchers/automation';
import { generateSecureToken, hashToken } from '../../../auth/oauth/utils';
import { cleanupTestDatabase, getTestDb } from '../../setup/test-db';
Expand Down Expand Up @@ -1043,4 +1044,98 @@ describe('watcher automation contract', () => {
expect(secondNextRunMs).toBe(firstNextRunMs);
});
});

describe('sweepStaleWatcherRuns liveness reaping', () => {
  /** Ages (as Postgres interval literals) applied to the seeded run. */
  type SeedOptions = { claimedAgo: string; heartbeatAgo?: string };

  /**
   * Creates a watcher run through the normal fixtures, then rewrites it into
   * a `running` row whose `claimed_at` / `last_heartbeat_at` lie the requested
   * distance in the past.
   *
   * When `heartbeatAgo` is left out, the heartbeat timestamp is set to the
   * claim timestamp. That models a client that never heartbeats, so such a
   * row is only eligible for the coarse 2h path and never the fast heartbeat
   * path.
   */
  const seedRunningWatcherRun = async ({ claimedAgo, heartbeatAgo }: SeedOptions) => {
    const fixture = await createAutomatedWatcher();
    const { sql, watcherId } = fixture;
    const pendingWindow = await computePendingWindow(
      fixture.dbClient,
      watcherId,
      inferWatcherGranularityFromSchedule('0 9 * * *')
    );
    const { runId } = await createWatcherRun({
      organizationId: fixture.workspace.org.id,
      watcherId,
      agentId: fixture.agent.agentId,
      windowStart: pendingWindow.windowStart.toISOString(),
      windowEnd: pendingWindow.windowEnd.toISOString(),
      dispatchSource: 'scheduled',
      deviceWorkerId: '11111111-1111-1111-1111-111111111111',
      agentKind: 'claude-code',
    });
    // No explicit heartbeat age => heartbeat coincides with the claim.
    const lastBeatAgo = heartbeatAgo ?? claimedAgo;
    await sql`
      UPDATE runs
      SET status = 'running',
          claimed_by = 'mac-sweep-test',
          claimed_at = NOW() - ${claimedAgo}::interval,
          last_heartbeat_at = NOW() - ${lastBeatAgo}::interval
      WHERE id = ${runId}
    `;
    return { sql, runId };
  };

  it('reaps a heartbeating run that went silent past the heartbeat window', async () => {
    // Claimed 1h ago, last beat 10m ago: it did heartbeat after the claim,
    // and has since been silent for longer than the 3 minute window.
    const seeded = await seedRunningWatcherRun({
      claimedAgo: '1 hour',
      heartbeatAgo: '10 minutes',
    });
    const sweep = await sweepStaleWatcherRuns(seeded.sql);
    expect(sweep.timedOut).toBeGreaterThanOrEqual(1);
    const rows = await seeded.sql`SELECT status, error_message FROM runs WHERE id = ${seeded.runId}`;
    const run = rows[0];
    expect(String(run.status)).toBe('timeout');
    expect(String(run.error_message ?? '')).toMatch(/heartbeat went silent/i);
  });

  it('leaves a run with a fresh heartbeat running', async () => {
    const seeded = await seedRunningWatcherRun({
      claimedAgo: '1 hour',
      heartbeatAgo: '30 seconds',
    });
    await sweepStaleWatcherRuns(seeded.sql);
    const rows = await seeded.sql`SELECT status FROM runs WHERE id = ${seeded.runId}`;
    const run = rows[0];
    expect(String(run.status)).toBe('running');
  });

  it('does not coarse-reap a live heartbeating run older than the 2h TTL', async () => {
    // The claim is 3h old, but the last beat was 30s ago, so the run is
    // alive. A heartbeating run is judged by the fast path only, which has
    // not lapsed; the 2h backstop must leave this legitimately long turn be.
    const seeded = await seedRunningWatcherRun({
      claimedAgo: '3 hours',
      heartbeatAgo: '30 seconds',
    });
    await sweepStaleWatcherRuns(seeded.sql);
    const rows = await seeded.sql`SELECT status FROM runs WHERE id = ${seeded.runId}`;
    const run = rows[0];
    expect(String(run.status)).toBe('running');
  });

  it('does not fast-reap a recent run that never heartbeats', async () => {
    // With no beat, last_heartbeat_at equals claimed_at, and the claim is
    // just 30m old: the fast path must not fire and the coarse 2h backstop
    // is not due yet. Protects clients that do not send heartbeats.
    const seeded = await seedRunningWatcherRun({ claimedAgo: '30 minutes' });
    await sweepStaleWatcherRuns(seeded.sql);
    const rows = await seeded.sql`SELECT status FROM runs WHERE id = ${seeded.runId}`;
    const run = rows[0];
    expect(String(run.status)).toBe('running');
  });

  it('reaps a non-heartbeating run via the coarse 2h backstop', async () => {
    const seeded = await seedRunningWatcherRun({ claimedAgo: '3 hours' });
    const sweep = await sweepStaleWatcherRuns(seeded.sql);
    expect(sweep.timedOut).toBeGreaterThanOrEqual(1);
    const rows = await seeded.sql`SELECT status, error_message FROM runs WHERE id = ${seeded.runId}`;
    const run = rows[0];
    expect(String(run.status)).toBe('timeout');
    expect(String(run.error_message ?? '')).toMatch(/2 hours/i);
  });
});
});
62 changes: 52 additions & 10 deletions packages/server/src/watchers/automation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -406,34 +406,76 @@ export async function reconcileWatcherRuns(db?: DbClient): Promise<ReconcileWatc
}

/**
 * Backstop for watcher runs that never reached terminal state.
 *
 * The primary lifecycle is driven by WatcherRunTracker (in-process completion
 * events) plus startup reconciliation on gateway boot. This sweeper catches
 * stuck runs — the tracker entry was lost without a clean completion (graceful
 * shutdown mid-turn, queue message silently dropped, the device executor
 * crashing or its process being abandoned, etc).
 *
 * Two reap paths, both keyed on the run's OWN liveness so they're correct
 * under N replicas (a run actively executing anywhere keeps its heartbeat
 * fresh, so no replica's sweep touches it):
 *
 *  1. Heartbeat-stale (fast, ~minutes): a run whose executor heartbeats —
 *     the device WatcherDispatcher beats every {@link WATCHER_HEARTBEAT_MS}ms
 *     during the turn — and has gone silent past the window. We require
 *     `last_heartbeat_at > claimed_at` (i.e. it beat at least once after being
 *     claimed) so this NEVER fires for a client that doesn't heartbeat: the
 *     claim sets `last_heartbeat_at == claimed_at`, so a non-heartbeating run
 *     stays equal and falls through to the coarse path. Fully backward
 *     compatible with older Mac apps.
 *  2. Coarse TTL (generous, 2h): the legacy backstop for runs that never
 *     heartbeat — measured from the claim/creation. Kept so a long but live
 *     non-heartbeating turn isn't killed prematurely.
 *
 * The constant below is the coarse TTL of path 2, written as a Postgres
 * interval literal.
 */
const WATCHER_RUN_STALE_INTERVAL = '2 hours';

/**
 * Silence threshold for the fast (heartbeat-stale) reap path, written as a
 * Postgres interval literal: roughly six missed device heartbeats at a ~30s
 * cadence (3 min / 30 s). A heartbeating executor that goes silent this long
 * is crashed/abandoned; a live one (beats every ~30s) never lapses.
 *
 * NOTE(review): the ~30s cadence is assumed from the wording here — confirm
 * against WATCHER_HEARTBEAT_MS.
 */
const WATCHER_RUN_HEARTBEAT_STALE_INTERVAL = '3 minutes';

/**
 * Marks stuck watcher runs as `timeout` and reports how many rows were reaped.
 *
 * One UPDATE covers `runs` rows with `run_type = 'watcher'` whose status is
 * `running` or `claimed` and that match either reap path:
 *
 *  - Heartbeat-stale: `last_heartbeat_at` is later than `claimed_at` (the
 *    executor beat at least once after the claim) but older than
 *    `WATCHER_RUN_HEARTBEAT_STALE_INTERVAL`.
 *  - Coarse TTL: the run never heartbeated (`last_heartbeat_at` is NULL,
 *    `claimed_at` is NULL, or `last_heartbeat_at <= claimed_at`) and
 *    `COALESCE(claimed_at, created_at)` is older than
 *    `WATCHER_RUN_STALE_INTERVAL`.
 *
 * The two paths are mutually exclusive, so a run with a fresh heartbeat is
 * never reaped by the coarse TTL no matter how long ago it was claimed.
 * `error_message` records which path fired.
 *
 * @param db Optional database client; falls back to `getDb()` when omitted.
 * @returns `{ timedOut }` — the number of rows moved to `timeout`.
 */
export async function sweepStaleWatcherRuns(
  db?: DbClient
): Promise<{ timedOut: number }> {
  const sql = db ?? getDb();
  const heartbeatError = `Watcher run heartbeat went silent for over ${WATCHER_RUN_HEARTBEAT_STALE_INTERVAL} — the executor crashed or was abandoned`;
  const coarseError = `Watcher run exceeded ${WATCHER_RUN_STALE_INTERVAL} without reaching terminal state`;
  // The CASE condition mirrors the "has heartbeated" test of the fast path in
  // the WHERE clause, so each reaped row gets the message of the path that
  // matched it.
  const result = await sql`
    UPDATE runs
    SET status = 'timeout',
        completed_at = current_timestamp,
        error_message = CASE
          WHEN last_heartbeat_at IS NOT NULL
           AND claimed_at IS NOT NULL
           AND last_heartbeat_at > claimed_at
          THEN ${heartbeatError}
          ELSE ${coarseError}
        END
    WHERE run_type = 'watcher'
      AND status IN ('running', 'claimed')
      AND (
        -- Fast path: the executor was heartbeating, then went silent.
        (last_heartbeat_at IS NOT NULL
         AND claimed_at IS NOT NULL
         AND last_heartbeat_at > claimed_at
         AND last_heartbeat_at
             < current_timestamp - ${WATCHER_RUN_HEARTBEAT_STALE_INTERVAL}::interval)
        OR
        -- Coarse backstop: ONLY for runs that never heartbeated. A heartbeating
        -- run is governed solely by the fast path above — so a live one that
        -- legitimately runs past 2h (fresh heartbeat) is never killed here.
        ((last_heartbeat_at IS NULL
          OR claimed_at IS NULL
          OR last_heartbeat_at <= claimed_at)
         AND COALESCE(claimed_at, created_at)
             < current_timestamp - ${WATCHER_RUN_STALE_INTERVAL}::interval)
      )
  `;
  const timedOut = Number(result.count ?? 0);
  // Stay quiet on the common no-op sweep; only log when something was reaped.
  if (timedOut > 0) {
    logger.warn({ timedOut }, '[watchers] Swept stale watcher runs');
  }
  return { timedOut };
}
Expand Down
Loading