diff --git a/db/migrations/20260518010000_runs_heartbeat_reaper_index.sql b/db/migrations/20260518010000_runs_heartbeat_reaper_index.sql new file mode 100644 index 000000000..05c964002 --- /dev/null +++ b/db/migrations/20260518010000_runs_heartbeat_reaper_index.sql @@ -0,0 +1,22 @@ +-- migrate:up + +-- Partial index supporting the connector-lane stale-run reaper. The sweeper +-- query in reapStaleRuns() (packages/server/src/scheduled/check-stalled-executions.ts) +-- filters runs in the in-progress states (`claimed`, `running`) whose +-- `last_heartbeat_at` is older than the configured threshold. Without this +-- index every reaper tick does a full scan of `runs`. +-- +-- Restricted to the connector lanes (sync, action, embed_backfill, auth). The +-- lobu-queue lanes (chat_message, schedule, agent_run, internal, task) have +-- their own per-claim sweep inside RunsQueue keyed on `claimed_at`, not +-- `last_heartbeat_at`. The `watcher` lane has a dedicated 2h-TTL sweep in +-- watchers/automation.ts. 
+ +CREATE INDEX IF NOT EXISTS idx_runs_heartbeat_inflight + ON public.runs (last_heartbeat_at) + WHERE status IN ('claimed', 'running') + AND run_type IN ('sync', 'action', 'embed_backfill', 'auth'); + +-- migrate:down + +DROP INDEX IF EXISTS public.idx_runs_heartbeat_inflight; diff --git a/db/schema.sql b/db/schema.sql index b803be343..c448e999f 100644 --- a/db/schema.sql +++ b/db/schema.sql @@ -3700,6 +3700,12 @@ CREATE UNIQUE INDEX idx_runs_dispatched_message_id ON public.runs USING btree (d CREATE INDEX idx_runs_feed ON public.runs USING btree (feed_id); +-- +-- Name: idx_runs_heartbeat_inflight; Type: INDEX; Schema: public; Owner: - +-- + +CREATE INDEX idx_runs_heartbeat_inflight ON public.runs USING btree (last_heartbeat_at) WHERE ((status = ANY (ARRAY['claimed'::text, 'running'::text])) AND (run_type = ANY (ARRAY['sync'::text, 'action'::text, 'embed_backfill'::text, 'auth'::text]))); + -- -- Name: idx_runs_org; Type: INDEX; Schema: public; Owner: - -- @@ -5071,4 +5077,5 @@ INSERT INTO public.schema_migrations (version) VALUES ('20260517060000'), ('20260517150000'), ('20260517160000'), - ('20260518000000'); + ('20260518000000'), + ('20260518010000'); diff --git a/packages/server/src/db/embedded-schema-patches.ts b/packages/server/src/db/embedded-schema-patches.ts index 3c1bb4822..71ddc4376 100644 --- a/packages/server/src/db/embedded-schema-patches.ts +++ b/packages/server/src/db/embedded-schema-patches.ts @@ -846,4 +846,20 @@ export const EMBEDDED_SCHEMA_PATCHES: EmbeddedSchemaPatch[] = [ `); }, }, + { + id: 'runs-heartbeat-reaper-index', + apply: async (sql) => { + // Mirrors db/migrations/20260518010000_runs_heartbeat_reaper_index.sql. + // Supports the connector-lane stale-run reaper in + // scheduled/check-stalled-executions.ts. Restricted to the connector + // lanes; the lobu-queue lanes have their own sweep in RunsQueue and + // the watcher lane is handled by watchers/automation.ts. 
+ await sql.unsafe(` + CREATE INDEX IF NOT EXISTS idx_runs_heartbeat_inflight + ON public.runs (last_heartbeat_at) + WHERE status IN ('claimed', 'running') + AND run_type IN ('sync', 'action', 'embed_backfill', 'auth') + `); + }, + }, ]; diff --git a/packages/server/src/scheduled/__tests__/stale-run-reaper.test.ts b/packages/server/src/scheduled/__tests__/stale-run-reaper.test.ts new file mode 100644 index 000000000..953c3cb50 --- /dev/null +++ b/packages/server/src/scheduled/__tests__/stale-run-reaper.test.ts @@ -0,0 +1,191 @@ +/** + * Integration test for the connector-lane stale-run reaper. Seeds three + * connector runs into PGlite and asserts the reaper only fails the one that + * is in-progress with a stale `last_heartbeat_at`. Also checks that a + * back-to-back second sweep reaps nothing (no double-fail); real cross-pod + * lock contention cannot be simulated on PGlite's single connection. + */ + +import { afterAll, beforeAll, beforeEach, describe, expect, test } from 'bun:test'; +import { getDb } from '../../db/client'; +import { + ensurePgliteForGatewayTests, + resetTestDatabase, +} from '../../gateway/__tests__/helpers/db-setup'; +import { reapStaleRuns } from '../check-stalled-executions'; + +const ORG_ID = 'reaper-org'; +const STALE_THRESHOLD_SECONDS = 60; + +beforeAll(async () => { + await ensurePgliteForGatewayTests(); + process.env.RUNS_REAPER_STALE_AFTER_SECONDS = String(STALE_THRESHOLD_SECONDS); +}); + +afterAll(() => { + delete process.env.RUNS_REAPER_STALE_AFTER_SECONDS; +}); + +beforeEach(async () => { + await resetTestDatabase(); + const sql = getDb(); + await sql` + INSERT INTO organization (id, name, slug) + VALUES (${ORG_ID}, ${ORG_ID}, ${ORG_ID}) + ON CONFLICT (id) DO NOTHING + `; +}); + +interface SeedRunOpts { + status: 'pending' | 'claimed' | 'running' | 'completed'; + lastHeartbeatAgoSeconds: number | null; + claimedAtAgoSeconds?: number | null; + runType?: 'sync' | 'action' | 'embed_backfill' | 'auth' | 'watcher'; + feedId?: number | null; +} + +async 
function seedRun(opts: SeedRunOpts): Promise<number> { + const sql = getDb(); + const runType = opts.runType ?? 'sync'; + const hbInterval = + opts.lastHeartbeatAgoSeconds !== null + ? `current_timestamp - interval '${opts.lastHeartbeatAgoSeconds} seconds'` + : 'NULL'; + const claimInterval = + opts.claimedAtAgoSeconds !== null && opts.claimedAtAgoSeconds !== undefined + ? `current_timestamp - interval '${opts.claimedAtAgoSeconds} seconds'` + : 'NULL'; + const rows = (await sql.unsafe( + `INSERT INTO runs ( + organization_id, run_type, feed_id, status, approval_status, + claimed_at, last_heartbeat_at, claimed_by, created_at + ) VALUES ( + $1, $2, $3, $4, 'auto', + ${claimInterval}, ${hbInterval}, 'test-worker', current_timestamp + ) + RETURNING id`, + [ORG_ID, runType, opts.feedId ?? null, opts.status], + )) as unknown as Array<{ id: number | string }>; + return Number(rows[0].id); +} + +async function statusOf(runId: number): Promise<string> { + const sql = getDb(); + const rows = (await sql`SELECT status FROM runs WHERE id = ${runId}`) as unknown as Array<{ + status: string; + }>; + return rows[0]?.status ?? 'missing'; +} + +describe('reapStaleRuns — connector lanes', () => { + test('only the stale in-progress connector run is timed out', async () => { + // 1. Fresh heartbeat — should be left alone. + const freshId = await seedRun({ + status: 'running', + lastHeartbeatAgoSeconds: 5, + claimedAtAgoSeconds: 120, + }); + // 2. Stale heartbeat — should be reaped. + const staleId = await seedRun({ + status: 'running', + lastHeartbeatAgoSeconds: STALE_THRESHOLD_SECONDS * 3, + claimedAtAgoSeconds: STALE_THRESHOLD_SECONDS * 3, + }); + // 3. Terminal state (completed) — must never be touched even if it had a + // stale heartbeat at the moment it completed. 
+ const terminalId = await seedRun({ + status: 'completed', + lastHeartbeatAgoSeconds: STALE_THRESHOLD_SECONDS * 10, + claimedAtAgoSeconds: STALE_THRESHOLD_SECONDS * 10, + }); + + const result = await reapStaleRuns(); + + expect(result.acquired).toBe(true); + expect(result.reaped).toBe(1); + + expect(await statusOf(freshId)).toBe('running'); + expect(await statusOf(staleId)).toBe('timeout'); + expect(await statusOf(terminalId)).toBe('completed'); + + const sql = getDb(); + const reaped = (await sql` + SELECT error_message FROM runs WHERE id = ${staleId} + `) as unknown as Array<{ error_message: string | null }>; + expect(reaped[0].error_message).toBe('worker_heartbeat_lost'); + }); + + test('claimed rows that never sent any heartbeat are reaped via claimed_at', async () => { + const id = await seedRun({ + status: 'claimed', + lastHeartbeatAgoSeconds: null, + claimedAtAgoSeconds: STALE_THRESHOLD_SECONDS * 3, + }); + const result = await reapStaleRuns(); + expect(result.reaped).toBe(1); + expect(await statusOf(id)).toBe('timeout'); + }); + + test('watcher lane is excluded from this reaper', async () => { + // Watcher runs have their own dedicated 2h sweep in watchers/automation.ts. + const watcherId = await seedRun({ + status: 'running', + lastHeartbeatAgoSeconds: STALE_THRESHOLD_SECONDS * 10, + claimedAtAgoSeconds: STALE_THRESHOLD_SECONDS * 10, + runType: 'watcher', + }); + const result = await reapStaleRuns(); + expect(result.reaped).toBe(0); + expect(await statusOf(watcherId)).toBe('running'); + }); + + test('back-to-back calls do not double-fail the same row', async () => { + // The advisory-lock guards cross-pod contention. Under PGlite the + // single-connection pool serializes everything, so we can't simulate + // two pods literally racing the SELECT-then-UPDATE. What we CAN prove + // here is the function-level invariant the lock enforces: a row that's + // already been reaped doesn't get reaped a second time even if the + // sweeper fires again. 
+ const staleId = await seedRun({ + status: 'running', + lastHeartbeatAgoSeconds: STALE_THRESHOLD_SECONDS * 3, + claimedAtAgoSeconds: STALE_THRESHOLD_SECONDS * 3, + }); + + const first = await reapStaleRuns(); + expect(first.acquired).toBe(true); + expect(first.reaped).toBe(1); + expect(await statusOf(staleId)).toBe('timeout'); + + // Second pass — same lock acquired, but the row is now `timeout` so the + // WHERE clause excludes it. No double-fail, no parallel retry inserted. + const second = await reapStaleRuns(); + expect(second.acquired).toBe(true); + expect(second.reaped).toBe(0); + expect(second.retriesCreated).toBe(0); + expect(await statusOf(staleId)).toBe('timeout'); + }); + + test('action and auth lanes are reaped (parity with sync/embed_backfill)', async () => { + // These lanes were missing from the legacy checkStalledExecutions sweep + // — only `sync` + `embed_backfill` were covered. The new reaper covers + // all four connector lanes uniformly. + const actionId = await seedRun({ + status: 'running', + lastHeartbeatAgoSeconds: STALE_THRESHOLD_SECONDS * 3, + claimedAtAgoSeconds: STALE_THRESHOLD_SECONDS * 3, + runType: 'action', + }); + const authId = await seedRun({ + status: 'running', + lastHeartbeatAgoSeconds: STALE_THRESHOLD_SECONDS * 3, + claimedAtAgoSeconds: STALE_THRESHOLD_SECONDS * 3, + runType: 'auth', + }); + + const result = await reapStaleRuns(); + expect(result.reaped).toBe(2); + expect(await statusOf(actionId)).toBe('timeout'); + expect(await statusOf(authId)).toBe('timeout'); + }); +}); diff --git a/packages/server/src/scheduled/check-stalled-executions.ts b/packages/server/src/scheduled/check-stalled-executions.ts index 2a1713024..1873aeed4 100644 --- a/packages/server/src/scheduled/check-stalled-executions.ts +++ b/packages/server/src/scheduled/check-stalled-executions.ts @@ -1,135 +1,252 @@ /** - * Scheduled Job: Check Stalled Runs + * Stale-run reaper for the connector lanes. 
* - * Runs every 5 minutes to detect stalled sync/embed-backfill runs driven by the - * out-of-process connector-worker daemon. Watcher runs are driven in-process by - * the embedded Lobu gateway; their lifecycle is handled by WatcherRunTracker - * and startup reconciliation (see automation.ts and lobu/gateway.ts), with a - * coarse backstop in sweepStaleWatcherRuns below. + * `reapStaleRuns()` marks runs as `timeout` (error `worker_heartbeat_lost`) when they + * are stuck in an in-progress state (`claimed`/`running`) with a + * `last_heartbeat_at` older than the configured threshold. Connector workers + * heartbeat every 30s via `/api/workers/heartbeat`; a missed heartbeat means + * the worker crashed, was OOM-killed, or was scaled down mid-run. Without the + * reaper those rows sit "running" forever and the feed never gets a retry. + * + * Scope: + * - `sync`, `action`, `embed_backfill`, `auth` — driven by the out-of-process + * connector-worker daemon. These are reaped here. + * - `watcher` — driven in-process by the embedded gateway. Lifecycle is + * handled by WatcherRunTracker + the dedicated `sweepStaleWatcherRuns` / + * `resetOrphanedWatcherRuns` helpers in watchers/automation.ts. + * - lobu-queue lanes (`chat_message`, `schedule`, `agent_run`, `internal`, + * `task`) — claimed by RunsQueue with its own per-claim heartbeat on + * `claimed_at` and own 5-min stale sweep. Not touched here. + * + * Multi-pod safety: wrapped in `pg_try_advisory_lock`. A second gateway pod + * (or the legacy `check-stalled-executions` cron tick) trying to reap + * concurrently no-ops instead of double-failing rows. + * + * The legacy `checkStalledExecutions(env)` entry point is preserved and now + * delegates to `reapStaleRuns()` so the existing 5-minute TaskScheduler cron + * still works — the 30s `setInterval` registered in the gateway boot path is + * the primary cadence. 
*/ +import type { ReservedSql } from 'postgres'; import { getDb } from '../db/client'; import type { Env } from '../index'; import { expireStaleConnectTokens } from '../utils/connect-tokens'; import logger from '../utils/logger'; import { isUniqueViolation } from '../utils/pg-errors'; -import { - EXECUTING_RUN_STATUSES, - isExecutingRunStatus, - runStatusLiteral, -} from '../utils/run-statuses'; import { reconcileWatcherRuns, sweepStaleWatcherRuns } from '../watchers/automation'; -export async function checkStalledExecutions(_env: Env) { +/** Advisory-lock key for cross-pod coordination of the stale-run reaper. + * ASCII 'rnsr' = 1919841138, which is below 2^31; intended not to collide + * with the queue-NOTIFY channel ids and the due-feeds lock. */ +const REAPER_ADVISORY_LOCK_KEY = 0x726e7372; // 'rnsr' — runs-reaper + +/** Default stale threshold in seconds; override via RUNS_REAPER_STALE_AFTER_SECONDS. + * 120s leaves room for the 30s worker heartbeat to miss ~3 ticks before + * the reaper writes the row off — a real worker stutter (GC pause, network + * blip) gets a grace window, but a crashed worker frees the feed within + * a couple of minutes instead of five. */ +const DEFAULT_STALE_AFTER_SECONDS = 120; + +function staleAfterSeconds(): number { + const raw = Number(process.env.RUNS_REAPER_STALE_AFTER_SECONDS); + return Number.isFinite(raw) && raw > 0 ? raw : DEFAULT_STALE_AFTER_SECONDS; +} + +export interface ReapStaleRunsResult { + /** Whether the advisory lock was acquired. False means another pod is + * already running the sweep; the caller should treat this as a no-op. */ + acquired: boolean; + /** Rows transitioned to the terminal `timeout` state this tick. */ + reaped: number; + /** Retry rows inserted for stalled `sync` runs (one per stalled feed). */ + retriesCreated: number; +} + +/** + * One pass of the stale-run reaper. Idempotent + cheap (single advisory-lock + * SELECT plus one indexed UPDATE), safe to call on a 30s setInterval. 
+ */ +export async function reapStaleRuns(): Promise<ReapStaleRunsResult> { const sql = getDb(); + const thresholdSeconds = staleAfterSeconds(); - await reconcileWatcherRuns(sql); - await sweepStaleWatcherRuns(sql); + // pg_try_advisory_lock is session-scoped — the connection holds the lock + // until we explicitly release. With postgres.js any random pool connection + // could serve the lock SELECT and the unlock; we wrap in a single + // .reserve() so both run on the same physical connection. DbClient doesn't + // type `reserve()` (it's only on the raw postgres.js surface), so we cast + // through `unknown` to the postgres.js ReservedSql shape. + const reserved = (await ( + sql as unknown as { reserve: () => Promise<ReservedSql> } + ).reserve()) as ReservedSql; + try { + const lockRows = (await reserved` + SELECT pg_try_advisory_lock(${REAPER_ADVISORY_LOCK_KEY}) AS acquired + `) as unknown as Array<{ acquired: boolean }>; + const acquired = !!lockRows[0]?.acquired; + if (!acquired) { + return { acquired: false, reaped: 0, retriesCreated: 0 }; + } - // Find stalled sync/embed_backfill runs (driven by out-of-process workers). - // Watcher runs are excluded — they run in-process and use tracker-based lifecycle. - const timedOut = await sql` - SELECT id, feed_id, connection_id, run_type, claimed_by, last_heartbeat_at, claimed_at, - organization_id, connector_key, connector_version, watcher_id, approved_input - FROM runs - WHERE run_type IN ('sync', 'embed_backfill') - AND status = ANY(${runStatusLiteral(EXECUTING_RUN_STATUSES)}::text[]) - AND ( - (last_heartbeat_at IS NULL AND COALESCE(claimed_at, created_at) < current_timestamp - INTERVAL '5 minutes') - OR - (last_heartbeat_at < current_timestamp - INTERVAL '5 minutes') - ) - `; - - if (timedOut.length > 0) { - logger.warn(`[StalledRuns] Detected ${timedOut.length} stalled runs`); - - for (const run of timedOut) { - const errorMessage = - run.last_heartbeat_at == null - ? 
'Worker claimed run but never sent heartbeat (5+ minutes)' - : `Worker heartbeat stopped (last: ${String(run.last_heartbeat_at)})`; - - logger.warn( - `[StalledRuns] Run ${run.id} (feed ${run.feed_id}, worker ${run.claimed_by}): ${errorMessage}` - ); + try { + const errorMessage = 'worker_heartbeat_lost'; + // Reap in a single UPDATE so concurrent SELECT-then-UPDATE races inside + // the same pod cannot double-fail a row. The advisory lock makes the + // cross-pod race impossible too, but belt-and-braces. + const reaped = (await reserved` + UPDATE public.runs + SET status = 'timeout', + completed_at = current_timestamp, + error_message = ${errorMessage} + WHERE run_type IN ('sync', 'action', 'embed_backfill', 'auth') + AND status IN ('claimed', 'running') + AND ( + (last_heartbeat_at IS NULL + AND COALESCE(claimed_at, created_at) + < current_timestamp - (${thresholdSeconds}::int * interval '1 second')) + OR + (last_heartbeat_at IS NOT NULL + AND last_heartbeat_at + < current_timestamp - (${thresholdSeconds}::int * interval '1 second')) + ) + RETURNING id, run_type, feed_id, connection_id, connector_key, connector_version, organization_id + `) as unknown as Array<{ + id: number | string; + run_type: string; + feed_id: number | null; + connection_id: number | null; + connector_key: string | null; + connector_version: string | null; + organization_id: string | null; + }>; - try { - // Wrap timeout + retry in a transaction so a crash between them - // cannot leave the run in 'timeout' without a retry being created. 
- await sql.begin(async (tx) => { - // Re-check status inside the transaction to guard against concurrent updates - const current = await tx` - SELECT status FROM runs WHERE id = ${run.id} FOR UPDATE - `; - if (current.length === 0 || !isExecutingRunStatus(current[0].status)) { - return; // Already handled by another process - } - - await tx` - UPDATE runs - SET status = 'timeout', - completed_at = current_timestamp, - error_message = ${errorMessage} - WHERE id = ${run.id} - `; - - // Create retry run for sync runs - if (run.run_type === 'sync' && run.feed_id) { - try { - await tx` - INSERT INTO runs ( - organization_id, run_type, feed_id, connection_id, - connector_key, connector_version, status, approval_status, created_at - ) VALUES ( - ${run.organization_id}, 'sync', ${run.feed_id}, ${run.connection_id}, - ${run.connector_key}, ${run.connector_version}, 'pending', 'auto', current_timestamp - ) - `; - logger.info(`[StalledRuns] Created retry run for feed ${run.feed_id}`); - } catch (retryError) { - if (isUniqueViolation(retryError, 'idx_runs_active_sync_per_feed')) { - logger.info( - `[StalledRuns] Skipped retry for feed ${run.feed_id} - another active sync run exists` - ); - } else { - throw retryError; - } - } - } - }); - } catch (txError) { - logger.error({ error: txError, runId: run.id }, '[StalledRuns] Failed to timeout run'); - } + if (reaped.length === 0) { + return { acquired: true, reaped: 0, retriesCreated: 0 }; + } - // Retry embed_backfill runs (scheduler will pick up remaining events next tick) - if (run.run_type === 'embed_backfill') { - logger.info(`[StalledRuns] Embed backfill run ${run.id} timed out, scheduler will retry`); + logger.warn( + { reaped: reaped.length, thresholdSeconds }, + '[reaper] Marked stale connector runs as timeout (worker_heartbeat_lost)' + ); + + // Re-queue stalled `sync` runs so the feed picks itself back up on the + // next worker poll. Same retry semantics as the legacy + // checkStalledExecutions path. 
Unique-violation on the partial + // `idx_runs_active_sync_per_feed` index is benign — it means another + // active sync row already exists for this feed. + let retriesCreated = 0; + for (const row of reaped) { + if (row.run_type !== 'sync' || !row.feed_id) continue; + try { + await reserved` + INSERT INTO runs ( + organization_id, run_type, feed_id, connection_id, + connector_key, connector_version, status, approval_status, created_at + ) VALUES ( + ${row.organization_id}, 'sync', ${row.feed_id}, ${row.connection_id}, + ${row.connector_key}, ${row.connector_version}, 'pending', 'auto', current_timestamp + ) + `; + retriesCreated += 1; + } catch (err) { + if (isUniqueViolation(err, 'idx_runs_active_sync_per_feed')) { + logger.info( + { feedId: row.feed_id }, + '[reaper] Skipped sync retry — another active sync run exists' + ); + } else { + logger.error({ err, runId: row.id }, '[reaper] Failed to insert sync retry'); + } } } + + return { acquired: true, reaped: reaped.length, retriesCreated }; + } finally { + await reserved`SELECT pg_advisory_unlock(${REAPER_ADVISORY_LOCK_KEY})`; } + } finally { + reserved.release(); + } +} - // Expire stale connect tokens and revoke associated pending_auth connections +/** How often the gateway-boot setInterval calls `reapStaleRuns`. */ +const REAP_INTERVAL_MS = 30_000; + +/** + * Start the 30s reaper interval. Returns a teardown function — call it from + * the gateway's shutdown path so the interval doesn't keep the process alive. + * Repeat invocations are a no-op; one interval per process. 
+ */ +let activeInterval: ReturnType<typeof setInterval> | null = null; + +export function startStaleRunReaper(): () => void { + if (activeInterval) { + return () => stopStaleRunReaper(); + } + const tick = async () => { try { - const expiredCount = await expireStaleConnectTokens(); - if (expiredCount > 0) { - logger.info(`[StalledRuns] Expired ${expiredCount} stale connect tokens`); - } - } catch (connectTokenError) { - logger.error({ error: connectTokenError }, '[StalledRuns] Error expiring connect tokens'); + await reapStaleRuns(); + } catch (err) { + logger.warn({ err }, '[reaper] tick failed'); } + }; + // Fire once on boot so a crash-recovered gateway clears the queue without + // waiting a full interval. + void tick(); + activeInterval = setInterval(tick, REAP_INTERVAL_MS); + if (typeof activeInterval.unref === 'function') { + activeInterval.unref(); + } + return stopStaleRunReaper; +} + +export function stopStaleRunReaper(): void { + if (activeInterval) { + clearInterval(activeInterval); + activeInterval = null; + } +} + +/** + * Legacy entry point used by the 5-minute `check-stalled-executions` + * TaskScheduler cron. Delegates to `reapStaleRuns` and keeps the surrounding + * housekeeping (watcher reconcile + stale watcher sweep + connect-token + * expiry + 30-day retention) that the cron has owned all along. The 30s + * setInterval handles the hot path; the cron is the periodic backstop for + * the housekeeping that doesn't justify a separate interval. + * + * Returns the legacy "stalled count" only so existing log lines / metrics + * downstream of the cron keep their shape. 
+ */ +export async function checkStalledExecutions(_env: Env): Promise { + const sql = getDb(); + + await reconcileWatcherRuns(sql); + await sweepStaleWatcherRuns(sql); + + await reapStaleRuns(); + + try { + const expiredCount = await expireStaleConnectTokens(); + if (expiredCount > 0) { + logger.info(`[StalledRuns] Expired ${expiredCount} stale connect tokens`); + } + } catch (connectTokenError) { + logger.error({ error: connectTokenError }, '[StalledRuns] Error expiring connect tokens'); + } - // Clean up old completed runs (keep last 30 days). - // Delete in bounded batches to avoid long-held locks. - const deleted = await sql` - DELETE FROM runs - WHERE id IN ( - SELECT id FROM runs - WHERE status IN ('completed', 'failed', 'timeout', 'cancelled') - AND completed_at < current_timestamp - INTERVAL '30 days' - LIMIT 1000 - ) - `; + // Clean up old completed runs (keep last 30 days). Delete in bounded + // batches to avoid long-held locks. + const deleted = await sql` + DELETE FROM runs + WHERE id IN ( + SELECT id FROM runs + WHERE status IN ('completed', 'failed', 'timeout', 'cancelled') + AND completed_at < current_timestamp - INTERVAL '30 days' + LIMIT 1000 + ) + `; if (deleted.count > 0) { logger.info(`[StalledRuns] Cleaned up ${deleted.count} old runs (> 30 days)`); } diff --git a/packages/server/src/server.ts b/packages/server/src/server.ts index f86bfe626..afaf9af23 100644 --- a/packages/server/src/server.ts +++ b/packages/server/src/server.ts @@ -38,6 +38,7 @@ import { initLobuGateway, stopLobuGateway, } from './lobu/gateway'; +import { startStaleRunReaper } from './scheduled/check-stalled-executions'; import { bootTaskScheduler } from './scheduled/jobs'; import * as Sentry from '@sentry/node'; import { assertExternalDepsResolvable } from '../../connector-worker/src/runtime-deps'; @@ -209,6 +210,12 @@ async function main() { // Cross-pod coordination is the runs-queue claim path. 
const taskScheduler = await bootTaskScheduler(getLobuCoreServices(), env); + // 30s interval that reaps connector runs whose worker missed heartbeat past + // RUNS_REAPER_STALE_AFTER_SECONDS. Cross-pod coordinated via advisory lock. + // The TaskScheduler cron also calls reapStaleRuns() every 5min as a + // backstop — the lock keeps the two cadences from double-failing rows. + const stopReaper = startStaleRunReaper(); + const port = parseInt(process.env.PORT || '8787', 10); const host = process.env.HOST?.trim() || '0.0.0.0'; @@ -232,6 +239,7 @@ async function main() { const shutdown = async (signal: string) => { logger.info({ signal }, 'Received shutdown signal, stopping gracefully...'); await vite?.close(); + stopReaper(); taskScheduler.stop(); await stopLobuGateway(); await closeDbSingleton(); diff --git a/packages/server/src/start-local.ts b/packages/server/src/start-local.ts index 0045709f4..70d704038 100644 --- a/packages/server/src/start-local.ts +++ b/packages/server/src/start-local.ts @@ -168,6 +168,12 @@ async function main() { const taskScheduler = await bootTaskScheduler(getLobuCoreServices(), env); const stopScheduler = () => taskScheduler.stop(); + // 30s connector-run heartbeat-lost reaper (see check-stalled-executions.ts). + // Same module used by the production server entrypoint; advisory lock makes + // it safe to also have the 5min TaskScheduler cron firing the same sweep. 
+ const { startStaleRunReaper } = await import('./scheduled/check-stalled-executions'); + const stopReaper = startStaleRunReaper(); + const wrapper = new Hono<{ Bindings: Env }>(); wrapper.use('*', async (c, next) => { // Stash the peer TCP remote-address so handlers that need to enforce @@ -202,6 +208,7 @@ async function main() { const shutdown = async (signal: string) => { logger.info({ signal }, 'Shutting down'); + stopReaper(); stopScheduler(); await vite?.close(); httpServer.close(); diff --git a/packages/server/src/worker-api.ts b/packages/server/src/worker-api.ts index 884de78c8..9a52147e8 100644 --- a/packages/server/src/worker-api.ts +++ b/packages/server/src/worker-api.ts @@ -451,6 +451,7 @@ export async function pollWorkerJob(c: Context<{ Bindings: Env }>) { UPDATE runs r SET status = 'running', claimed_at = current_timestamp, + last_heartbeat_at = current_timestamp, claimed_by = ${worker_id} FROM next_run nr WHERE r.id = nr.id