Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions db/migrations/20260518010000_runs_heartbeat_reaper_index.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
-- migrate:up

-- Partial index supporting the connector-lane stale-run reaper. The sweeper
-- query in reapStaleRuns() (packages/server/src/scheduled/check-stalled-executions.ts)
-- filters runs in the in-progress states (`claimed`, `running`) whose
-- `last_heartbeat_at` is older than the configured threshold. Without this
-- index every reaper tick does a full scan of `runs`.
--
-- Restricted to the connector lanes (sync, action, embed_backfill, auth). The
-- lobu-queue lanes (chat_message, schedule, agent_run, internal, task) have
-- their own per-claim sweep inside RunsQueue keyed on `claimed_at`, not
-- `last_heartbeat_at`. The `watcher` lane has a dedicated 2h-TTL sweep in
-- watchers/automation.ts.

CREATE INDEX IF NOT EXISTS idx_runs_heartbeat_inflight
ON public.runs (last_heartbeat_at)
WHERE status IN ('claimed', 'running')
AND run_type IN ('sync', 'action', 'embed_backfill', 'auth');

-- migrate:down

DROP INDEX IF EXISTS public.idx_runs_heartbeat_inflight;
9 changes: 8 additions & 1 deletion db/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -3700,6 +3700,12 @@ CREATE UNIQUE INDEX idx_runs_dispatched_message_id ON public.runs USING btree (d

CREATE INDEX idx_runs_feed ON public.runs USING btree (feed_id);

--
-- Name: idx_runs_heartbeat_inflight; Type: INDEX; Schema: public; Owner: -
--

CREATE INDEX idx_runs_heartbeat_inflight ON public.runs USING btree (last_heartbeat_at) WHERE ((status = ANY (ARRAY['claimed'::text, 'running'::text])) AND (run_type = ANY (ARRAY['sync'::text, 'action'::text, 'embed_backfill'::text, 'auth'::text])));

--
-- Name: idx_runs_org; Type: INDEX; Schema: public; Owner: -
--
Expand Down Expand Up @@ -5071,4 +5077,5 @@ INSERT INTO public.schema_migrations (version) VALUES
('20260517060000'),
('20260517150000'),
('20260517160000'),
('20260518000000');
('20260518000000'),
('20260518010000');
16 changes: 16 additions & 0 deletions packages/server/src/db/embedded-schema-patches.ts
Original file line number Diff line number Diff line change
Expand Up @@ -846,4 +846,20 @@ export const EMBEDDED_SCHEMA_PATCHES: EmbeddedSchemaPatch[] = [
`);
},
},
{
id: 'runs-heartbeat-reaper-index',
apply: async (sql) => {
// Mirrors db/migrations/20260518010000_runs_heartbeat_reaper_index.sql.
// Supports the connector-lane stale-run reaper in
// scheduled/check-stalled-executions.ts. Restricted to the connector
// lanes; the lobu-queue lanes have their own sweep in RunsQueue and
// the watcher lane is handled by watchers/automation.ts.
await sql.unsafe(`
CREATE INDEX IF NOT EXISTS idx_runs_heartbeat_inflight
ON public.runs (last_heartbeat_at)
WHERE status IN ('claimed', 'running')
AND run_type IN ('sync', 'action', 'embed_backfill', 'auth')
`);
},
},
];
191 changes: 191 additions & 0 deletions packages/server/src/scheduled/__tests__/stale-run-reaper.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
/**
* Integration test for the connector-lane stale-run reaper. Seeds three
* connector runs into PGlite and asserts the reaper only fails the one that
* is in-progress with a stale `last_heartbeat_at`. Also exercises the
* advisory-lock contention path: a second concurrent caller while the lock
* is held no-ops instead of double-failing the row.
*/

import { afterAll, beforeAll, beforeEach, describe, expect, test } from 'bun:test';
import { getDb } from '../../db/client';
import {
ensurePgliteForGatewayTests,
resetTestDatabase,
} from '../../gateway/__tests__/helpers/db-setup';
import { reapStaleRuns } from '../check-stalled-executions';

const ORG_ID = 'reaper-org';
const STALE_THRESHOLD_SECONDS = 60;

beforeAll(async () => {
await ensurePgliteForGatewayTests();
process.env.RUNS_REAPER_STALE_AFTER_SECONDS = String(STALE_THRESHOLD_SECONDS);
});

afterAll(() => {
delete process.env.RUNS_REAPER_STALE_AFTER_SECONDS;
});

beforeEach(async () => {
await resetTestDatabase();
const sql = getDb();
await sql`
INSERT INTO organization (id, name, slug)
VALUES (${ORG_ID}, ${ORG_ID}, ${ORG_ID})
ON CONFLICT (id) DO NOTHING
`;
});

interface SeedRunOpts {
status: 'pending' | 'claimed' | 'running' | 'completed';
lastHeartbeatAgoSeconds: number | null;
claimedAtAgoSeconds?: number | null;
runType?: 'sync' | 'action' | 'embed_backfill' | 'auth' | 'watcher';
feedId?: number | null;
}

async function seedRun(opts: SeedRunOpts): Promise<number> {
const sql = getDb();
const runType = opts.runType ?? 'sync';
const hbInterval =
opts.lastHeartbeatAgoSeconds !== null
? `current_timestamp - interval '${opts.lastHeartbeatAgoSeconds} seconds'`
: 'NULL';
const claimInterval =
opts.claimedAtAgoSeconds !== null && opts.claimedAtAgoSeconds !== undefined
? `current_timestamp - interval '${opts.claimedAtAgoSeconds} seconds'`
: 'NULL';
const rows = (await sql.unsafe(
`INSERT INTO runs (
organization_id, run_type, feed_id, status, approval_status,
claimed_at, last_heartbeat_at, claimed_by, created_at
) VALUES (
$1, $2, $3, $4, 'auto',
${claimInterval}, ${hbInterval}, 'test-worker', current_timestamp
)
RETURNING id`,
[ORG_ID, runType, opts.feedId ?? null, opts.status],
)) as unknown as Array<{ id: number | string }>;
return Number(rows[0].id);
}

async function statusOf(runId: number): Promise<string> {
const sql = getDb();
const rows = (await sql`SELECT status FROM runs WHERE id = ${runId}`) as unknown as Array<{
status: string;
}>;
return rows[0]?.status ?? 'missing';
}

describe('reapStaleRuns — connector lanes', () => {
test('only the stale in-progress connector run is timed out', async () => {
// 1. Fresh heartbeat — should be left alone.
const freshId = await seedRun({
status: 'running',
lastHeartbeatAgoSeconds: 5,
claimedAtAgoSeconds: 120,
});
// 2. Stale heartbeat — should be reaped.
const staleId = await seedRun({
status: 'running',
lastHeartbeatAgoSeconds: STALE_THRESHOLD_SECONDS * 3,
claimedAtAgoSeconds: STALE_THRESHOLD_SECONDS * 3,
});
// 3. Terminal state (completed) — must never be touched even if it had a
// stale heartbeat at the moment it completed.
const terminalId = await seedRun({
status: 'completed',
lastHeartbeatAgoSeconds: STALE_THRESHOLD_SECONDS * 10,
claimedAtAgoSeconds: STALE_THRESHOLD_SECONDS * 10,
});

const result = await reapStaleRuns();

expect(result.acquired).toBe(true);
expect(result.reaped).toBe(1);

expect(await statusOf(freshId)).toBe('running');
expect(await statusOf(staleId)).toBe('timeout');
expect(await statusOf(terminalId)).toBe('completed');

const sql = getDb();
const reaped = (await sql`
SELECT error_message FROM runs WHERE id = ${staleId}
`) as unknown as Array<{ error_message: string | null }>;
expect(reaped[0].error_message).toBe('worker_heartbeat_lost');
});

test('claimed rows that never sent any heartbeat are reaped via claimed_at', async () => {
const id = await seedRun({
status: 'claimed',
lastHeartbeatAgoSeconds: null,
claimedAtAgoSeconds: STALE_THRESHOLD_SECONDS * 3,
});
const result = await reapStaleRuns();
expect(result.reaped).toBe(1);
expect(await statusOf(id)).toBe('timeout');
});

test('watcher lane is excluded from this reaper', async () => {
// Watcher runs have their own dedicated 2h sweep in watchers/automation.ts.
const watcherId = await seedRun({
status: 'running',
lastHeartbeatAgoSeconds: STALE_THRESHOLD_SECONDS * 10,
claimedAtAgoSeconds: STALE_THRESHOLD_SECONDS * 10,
runType: 'watcher',
});
const result = await reapStaleRuns();
expect(result.reaped).toBe(0);
expect(await statusOf(watcherId)).toBe('running');
});

test('back-to-back calls do not double-fail the same row', async () => {
// The advisory-lock guards cross-pod contention. Under PGlite the
// single-connection pool serializes everything, so we can't simulate
// two pods literally racing the SELECT-then-UPDATE. What we CAN prove
// here is the function-level invariant the lock enforces: a row that's
// already been reaped doesn't get reaped a second time even if the
// sweeper fires again.
const staleId = await seedRun({
status: 'running',
lastHeartbeatAgoSeconds: STALE_THRESHOLD_SECONDS * 3,
claimedAtAgoSeconds: STALE_THRESHOLD_SECONDS * 3,
});

const first = await reapStaleRuns();
expect(first.acquired).toBe(true);
expect(first.reaped).toBe(1);
expect(await statusOf(staleId)).toBe('timeout');

// Second pass — same lock acquired, but the row is now `timeout` so the
// WHERE clause excludes it. No double-fail, no parallel retry inserted.
const second = await reapStaleRuns();
expect(second.acquired).toBe(true);
expect(second.reaped).toBe(0);
expect(second.retriesCreated).toBe(0);
expect(await statusOf(staleId)).toBe('timeout');
});

test('action and auth lanes are reaped (parity with sync/embed_backfill)', async () => {
// These lanes were missing from the legacy checkStalledExecutions sweep
// — only `sync` + `embed_backfill` were covered. The new reaper covers
// all four connector lanes uniformly.
const actionId = await seedRun({
status: 'running',
lastHeartbeatAgoSeconds: STALE_THRESHOLD_SECONDS * 3,
claimedAtAgoSeconds: STALE_THRESHOLD_SECONDS * 3,
runType: 'action',
});
const authId = await seedRun({
status: 'running',
lastHeartbeatAgoSeconds: STALE_THRESHOLD_SECONDS * 3,
claimedAtAgoSeconds: STALE_THRESHOLD_SECONDS * 3,
runType: 'auth',
});

const result = await reapStaleRuns();
expect(result.reaped).toBe(2);
expect(await statusOf(actionId)).toBe('timeout');
expect(await statusOf(authId)).toBe('timeout');
});
});
Loading
Loading