Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions db/migrations/20260518020000_runs_heartbeat_inflight_narrow.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
-- migrate:up

-- Narrow the partial index `idx_runs_heartbeat_inflight` to the lanes that
-- actually emit `client.heartbeat()` from the connector-worker executor.
-- The previous index (20260518010000) covered all four connector lanes
-- (sync, action, embed_backfill, auth), but only `sync` and `auth` runs call
-- heartbeat() in packages/connector-worker/src/daemon/executor.ts. Action
-- runs (executeActionRun) and embed_backfill runs (executeEmbedBackfillRun)
-- never heartbeat, so the reaper's heartbeat-based WHERE clause would mark
-- any action/embed_backfill run lasting longer than the stale threshold
-- (default 120s) as `timeout` while it is still executing.
--
-- Drop and recreate to also align the index with the reaper query in
-- packages/server/src/scheduled/check-stalled-executions.ts, which is
-- updated to filter on the narrower lane set in the same change.
--
-- Follow-ups (tracked as separate issues):
-- 1. Wire `client.heartbeat()` into executeActionRun + executeEmbedBackfillRun
-- so those lanes can be safely reaped.
-- 2. Wire heartbeat into the Chrome/Owletto browser-worker run path.

-- The DROP is required, not cosmetic: CREATE INDEX IF NOT EXISTS only checks
-- the index name, so without it an existing wide index would be kept as-is
-- and the narrower predicate would never take effect.
DROP INDEX IF EXISTS public.idx_runs_heartbeat_inflight;

-- The index name is intentionally unqualified: PostgreSQL always creates an
-- index in the schema of its parent table (here `public`) and does not accept
-- a schema-qualified name in CREATE INDEX.
CREATE INDEX IF NOT EXISTS idx_runs_heartbeat_inflight
ON public.runs (last_heartbeat_at)
WHERE status IN ('claimed', 'running')
AND run_type IN ('sync', 'auth');

-- migrate:down

-- Restore the wide predicate introduced by 20260518010000, covering all four
-- connector lanes (sync, action, embed_backfill, auth). Same drop-then-create
-- shape as the up step so the predicate is always replaced.
DROP INDEX IF EXISTS public.idx_runs_heartbeat_inflight;

CREATE INDEX IF NOT EXISTS idx_runs_heartbeat_inflight
ON public.runs (last_heartbeat_at)
WHERE status IN ('claimed', 'running')
AND run_type IN ('sync', 'action', 'embed_backfill', 'auth');
5 changes: 3 additions & 2 deletions db/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -3704,7 +3704,7 @@ CREATE INDEX idx_runs_feed ON public.runs USING btree (feed_id);
-- Name: idx_runs_heartbeat_inflight; Type: INDEX; Schema: public; Owner: -
--

CREATE INDEX idx_runs_heartbeat_inflight ON public.runs USING btree (last_heartbeat_at) WHERE ((status = ANY (ARRAY['claimed'::text, 'running'::text])) AND (run_type = ANY (ARRAY['sync'::text, 'action'::text, 'embed_backfill'::text, 'auth'::text])));
CREATE INDEX idx_runs_heartbeat_inflight ON public.runs USING btree (last_heartbeat_at) WHERE ((status = ANY (ARRAY['claimed'::text, 'running'::text])) AND (run_type = ANY (ARRAY['sync'::text, 'auth'::text])));

--
-- Name: idx_runs_org; Type: INDEX; Schema: public; Owner: -
Expand Down Expand Up @@ -5078,4 +5078,5 @@ INSERT INTO public.schema_migrations (version) VALUES
('20260517150000'),
('20260517160000'),
('20260518000000'),
('20260518010000');
('20260518010000'),
('20260518020000');
18 changes: 18 additions & 0 deletions packages/server/src/db/embedded-schema-patches.ts
Original file line number Diff line number Diff line change
Expand Up @@ -862,4 +862,22 @@ export const EMBEDDED_SCHEMA_PATCHES: EmbeddedSchemaPatch[] = [
`);
},
},
{
id: 'runs-heartbeat-inflight-narrow',
apply: async (sql) => {
// Mirrors db/migrations/20260518020000_runs_heartbeat_inflight_narrow.sql.
// Rebuilds the partial index so it covers only the lanes that actually
// heartbeat today (sync + auth), matching the `run_type IN ('sync', 'auth')`
// filter used by the reaper in
// packages/server/src/scheduled/check-stalled-executions.ts. This patch
// touches the index only; the reaper WHERE clause is changed in that file.
//
// With the previous wide set (sync, action, embed_backfill, auth),
// in-flight action and embed_backfill runs were marked `timeout` once they
// outlived the stale threshold (default 120s), because their executors
// never call client.heartbeat().
//
// DROP first: CREATE INDEX IF NOT EXISTS matches on the index name only and
// would otherwise leave an already-present wide index untouched.
await sql.unsafe(`DROP INDEX IF EXISTS public.idx_runs_heartbeat_inflight`);
await sql.unsafe(`
CREATE INDEX IF NOT EXISTS idx_runs_heartbeat_inflight
ON public.runs (last_heartbeat_at)
WHERE status IN ('claimed', 'running')
AND run_type IN ('sync', 'auth')
`);
},
},
];
35 changes: 26 additions & 9 deletions packages/server/src/scheduled/__tests__/stale-run-reaper.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -166,26 +166,43 @@ describe('reapStaleRuns — connector lanes', () => {
expect(await statusOf(staleId)).toBe('timeout');
});

test('action and auth lanes are reaped (parity with sync/embed_backfill)', async () => {
// These lanes were missing from the legacy checkStalledExecutions sweep
// — only `sync` + `embed_backfill` were covered. The new reaper covers
// all four connector lanes uniformly.
test('auth lane is reaped (parity with sync)', async () => {
// `auth` heartbeats from executeAuthRun in the connector-worker daemon, so
// staleness on `last_heartbeat_at` is a real failure signal there too.
const authId = await seedRun({
status: 'running',
lastHeartbeatAgoSeconds: STALE_THRESHOLD_SECONDS * 3,
claimedAtAgoSeconds: STALE_THRESHOLD_SECONDS * 3,
runType: 'auth',
});

const result = await reapStaleRuns();
expect(result.reaped).toBe(1);
expect(await statusOf(authId)).toBe('timeout');
});

test('action and embed_backfill lanes are NOT reaped (they do not heartbeat today)', async () => {
// executeActionRun and executeEmbedBackfillRun in
// packages/connector-worker/src/daemon/executor.ts never call
// client.heartbeat(), so reaping them on `last_heartbeat_at` would kill
// in-flight runs after the stale threshold elapses. Until those lanes
// emit heartbeats, the reaper must leave them alone.
const actionId = await seedRun({
status: 'running',
lastHeartbeatAgoSeconds: STALE_THRESHOLD_SECONDS * 3,
claimedAtAgoSeconds: STALE_THRESHOLD_SECONDS * 3,
runType: 'action',
});
const authId = await seedRun({
const embedId = await seedRun({
status: 'running',
lastHeartbeatAgoSeconds: STALE_THRESHOLD_SECONDS * 3,
claimedAtAgoSeconds: STALE_THRESHOLD_SECONDS * 3,
runType: 'auth',
runType: 'embed_backfill',
});

const result = await reapStaleRuns();
expect(result.reaped).toBe(2);
expect(await statusOf(actionId)).toBe('timeout');
expect(await statusOf(authId)).toBe('timeout');
expect(result.reaped).toBe(0);
expect(await statusOf(actionId)).toBe('running');
expect(await statusOf(embedId)).toBe('running');
});
});
13 changes: 10 additions & 3 deletions packages/server/src/scheduled/check-stalled-executions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,15 @@
* reaper those rows sit "running" forever and the feed never gets a retry.
*
* Scope:
* - `sync`, `action`, `embed_backfill`, `auth` — driven by the out-of-process
* connector-worker daemon. These are reaped here.
* - `sync`, `auth` — driven by the out-of-process connector-worker daemon
* and emit `client.heartbeat()` from their executors. These are the only
* lanes safe to reap on a heartbeat-staleness basis today.
* - `action`, `embed_backfill` — also connector-worker lanes, but their
* executors (`executeActionRun`, `executeEmbedBackfillRun` in
* packages/connector-worker/src/daemon/executor.ts) do NOT heartbeat.
* Reaping them on `last_heartbeat_at` would kill in-flight runs the
* moment they exceed the stale threshold. Tracked as a follow-up — once
* those lanes heartbeat, fold them back into the WHERE clause + index.
* - `watcher` — driven in-process by the embedded gateway. Lifecycle is
* handled by WatcherRunTracker + the dedicated `sweepStaleWatcherRuns` /
* `resetOrphanedWatcherRuns` helpers in watchers/automation.ts.
Expand Down Expand Up @@ -99,7 +106,7 @@ export async function reapStaleRuns(): Promise<ReapStaleRunsResult> {
SET status = 'timeout',
completed_at = current_timestamp,
error_message = ${errorMessage}
WHERE run_type IN ('sync', 'action', 'embed_backfill', 'auth')
WHERE run_type IN ('sync', 'auth')
AND status IN ('claimed', 'running')
AND (
(last_heartbeat_at IS NULL
Expand Down
Loading