diff --git a/packages/server/src/__tests__/integration/watchers/automation-contract.test.ts b/packages/server/src/__tests__/integration/watchers/automation-contract.test.ts index 49ab4251a..8896c1df8 100644 --- a/packages/server/src/__tests__/integration/watchers/automation-contract.test.ts +++ b/packages/server/src/__tests__/integration/watchers/automation-contract.test.ts @@ -18,10 +18,40 @@ import { dispatchPendingWatcherRuns, materializeDueWatcherRuns, } from '../../../watchers/automation'; +import { generateSecureToken, hashToken } from '../../../auth/oauth/utils'; import { cleanupTestDatabase, getTestDb } from '../../setup/test-db'; import { createTestAgent, createTestEntity, createTestEvent } from '../../setup/test-fixtures'; +import { post } from '../../setup/test-helpers'; import { TestApiClient, TestWorkspace } from '../../setup/test-mcp-client'; +/** + * Mint a PAT bound to a specific device worker_id and `device_worker:run` + * scope. Mirrors PersonalAccessTokenService.create but inlined so the test + * can pre-set the binding without going through the route. + */ +async function createWorkerBoundPat( + userId: string, + organizationId: string, + workerId: string, + scope = 'device_worker:run' +): Promise<{ token: string }> { + const sql = getTestDb(); + const token = `owl_pat_${generateSecureToken(24)}`; + const tokenHash = hashToken(token); + const tokenPrefix = token.substring(0, 12); + await sql` + INSERT INTO personal_access_tokens ( + token_hash, token_prefix, user_id, organization_id, name, scope, worker_id, + created_at, updated_at + ) VALUES ( + ${tokenHash}, ${tokenPrefix}, ${userId}, ${organizationId}, + ${`Test worker PAT (${workerId})`}, ${scope}, ${workerId}, + NOW(), NOW() + ) + `; + return { token }; +} + async function createAutomatedWatcher() { const sql = getTestDb(); const dbClient = sql as unknown as DbClient; @@ -366,4 +396,651 @@ describe('watcher automation contract', () => { expect(Number(window.content_analyzed)).toBe(1); expect(links.map((row) => Number(row.event_id))).toEqual([event.id]); }); + + // #798 — device-pinned watcher execution end-to-end: + // + // watcher.device_worker_id set + // → materializeDueWatcherRuns persists the pin into approved_input + // → server-side dispatcher refuses to claim (#802 covers this; checked + // above by the "skips watcher runs pinned to a device worker" test) + // → device posts to /api/workers/me/runs/:id/complete-watcher + // which writes the watcher_windows row + advances last_fired_at. + describe('device-pinned execution (#798)', () => { + it('persists watchers.device_worker_id and agent_kind into approved_input on materialization', async () => { + const { sql, watcherId } = await createAutomatedWatcher(); + + // Register a device worker to anchor the foreign key. + const [device] = await sql` + INSERT INTO device_workers (user_id, worker_id, platform, capabilities, label) + VALUES ('user-watcher-pin', 'device-pin-1', 'macos', ${sql.json({})}, 'My Mac') + RETURNING id + `; + const deviceWorkerId = String((device as { id: unknown }).id); + + await sql` + UPDATE watchers + SET device_worker_id = ${deviceWorkerId}::uuid, + agent_kind = 'claude-code' + WHERE id = ${watcherId} + `; + + const result = await materializeDueWatcherRuns({} as Env); + expect(result.runsCreated).toBe(1); + + const [run] = await sql` + SELECT approved_input + FROM runs + WHERE watcher_id = ${watcherId} + AND run_type = 'watcher' + `; + const payload = run.approved_input as Record; + expect(payload.device_worker_id).toBe(deviceWorkerId); + expect(payload.agent_kind).toBe('claude-code'); + }); + + it('complete-watcher endpoint records a completed run + window + last_fired_at', async () => { + const { sql, dbClient, workspace, watcherId, agent } = await createAutomatedWatcher(); + const granularity = inferWatcherGranularityFromSchedule('0 9 * * *'); + const { windowStart, windowEnd } = await computePendingWindow( + dbClient, + watcherId, + granularity + ); + + const queued = await createWatcherRun({ + organizationId: workspace.org.id, + watcherId, + agentId: agent.agentId, + windowStart: windowStart.toISOString(), + windowEnd: windowEnd.toISOString(), + dispatchSource: 'scheduled', + deviceWorkerId: '11111111-1111-1111-1111-111111111111', + agentKind: 'claude-code', + }); + + // Move the run into `running` claimed by a specific worker — the device + // path normally claims via /api/workers/poll; we shortcut here. + const workerId = 'mac-device-cli-test'; + await sql` + UPDATE runs + SET status = 'running', + claimed_at = NOW(), + claimed_by = ${workerId} + WHERE id = ${queued.runId} + `; + + const response = await post( + `/api/workers/me/runs/${queued.runId}/complete-watcher`, + { + body: { + worker_id: workerId, + output: 'CLI says: looked at 5 events, no anomalies.', + duration_ms: 1234, + }, + } + ); + expect(response.status).toBe(200); + const json = (await response.json()) as { ok: boolean; status: string }; + expect(json.ok).toBe(true); + expect(json.status).toBe('completed'); + + const [run] = await sql` + SELECT status, completed_at, window_id, exit_reason + FROM runs + WHERE id = ${queued.runId} + `; + expect(String(run.status)).toBe('completed'); + expect(run.completed_at).not.toBeNull(); + expect(Number(run.window_id)).toBeGreaterThan(0); + + const [window] = await sql` + SELECT extracted_data, run_metadata, execution_time_ms, model_used, granularity + FROM watcher_windows + WHERE id = ${run.window_id} + `; + const extracted = window.extracted_data as Record; + expect(extracted.kind).toBe('device_cli_output'); + expect(extracted.output).toBe('CLI says: looked at 5 events, no anomalies.'); + expect(extracted.agent_kind).toBe('claude-code'); + expect(Number(window.execution_time_ms)).toBe(1234); + expect(String(window.model_used)).toBe('device-cli'); + expect(String(window.granularity)).toBe('ad_hoc'); + const metadata = window.run_metadata as Record; + expect(metadata.source).toBe('device_worker'); + expect(metadata.watcher_run_id).toBe(queued.runId); + + const [watcher] = await sql` + SELECT last_fired_at + FROM watchers + WHERE id = ${watcherId} + `; + expect(watcher.last_fired_at).not.toBeNull(); + }); + + it('complete-watcher endpoint marks the run failed when error is supplied', async () => { + const { sql, dbClient, workspace, watcherId, agent } = await createAutomatedWatcher(); + const granularity = inferWatcherGranularityFromSchedule('0 9 * * *'); + const { windowStart, windowEnd } = await computePendingWindow( + dbClient, + watcherId, + granularity + ); + + const queued = await createWatcherRun({ + organizationId: workspace.org.id, + watcherId, + agentId: agent.agentId, + windowStart: windowStart.toISOString(), + windowEnd: windowEnd.toISOString(), + dispatchSource: 'scheduled', + deviceWorkerId: '11111111-1111-1111-1111-111111111111', + agentKind: 'claude-code', + }); + + const workerId = 'mac-device-cli-fail'; + await sql` + UPDATE runs + SET status = 'running', + claimed_at = NOW(), + claimed_by = ${workerId} + WHERE id = ${queued.runId} + `; + + const response = await post( + `/api/workers/me/runs/${queued.runId}/complete-watcher`, + { + body: { + worker_id: workerId, + error: 'claude binary not found', + duration_ms: 12, + exit_reason: 'crash', + exit_code: 127, + }, + } + ); + expect(response.status).toBe(200); + const json = (await response.json()) as { ok: boolean; status: string }; + expect(json.status).toBe('failed'); + + const [run] = await sql` + SELECT status, error_message, window_id, exit_code, exit_reason + FROM runs + WHERE id = ${queued.runId} + `; + expect(String(run.status)).toBe('failed'); + expect(String(run.error_message)).toBe('claude binary not found'); + // No watcher_windows row on failure. + expect(run.window_id).toBeNull(); + expect(Number(run.exit_code)).toBe(127); + expect(String(run.exit_reason)).toBe('crash'); + + const windows = await sql` + SELECT id FROM watcher_windows WHERE run_id = ${queued.runId} + `; + expect(windows).toHaveLength(0); + }); + + it('complete-watcher endpoint refuses non-watcher run types', async () => { + const sql = getTestDb(); + const { workspace } = await createAutomatedWatcher(); + + const [authRun] = await sql` + INSERT INTO runs (organization_id, run_type, approval_status, status, created_at) + VALUES (${workspace.org.id}, 'sync', 'auto', 'running', current_timestamp) + RETURNING id + `; + const runId = Number((authRun as { id: unknown }).id); + + const response = await post( + `/api/workers/me/runs/${runId}/complete-watcher`, + { + body: { worker_id: 'any', output: '', duration_ms: 1 }, + } + ); + expect(response.status).toBe(409); + const body = (await response.json()) as { error: string }; + expect(body.error).toMatch(/watcher/i); + }); + + it('complete-watcher endpoint returns 404 for an unknown run id', async () => { + const response = await post( + '/api/workers/me/runs/999999999/complete-watcher', + { + body: { worker_id: 'any', output: '', duration_ms: 1 }, + } + ); + expect(response.status).toBe(404); + }); + + // Pi review #1: schedule must advance on completion or the scheduler + // re-fires the watcher every tick forever. + it('advances watchers.next_run_at on successful completion', async () => { + const { sql, dbClient, workspace, watcherId, agent } = await createAutomatedWatcher(); + const granularity = inferWatcherGranularityFromSchedule('0 9 * * *'); + const { windowStart, windowEnd } = await computePendingWindow( + dbClient, + watcherId, + granularity + ); + + const [before] = await sql` + SELECT next_run_at FROM watchers WHERE id = ${watcherId} + `; + const beforeNextRun = before.next_run_at as Date | string | null; + + const queued = await createWatcherRun({ + organizationId: workspace.org.id, + watcherId, + agentId: agent.agentId, + windowStart: windowStart.toISOString(), + windowEnd: windowEnd.toISOString(), + dispatchSource: 'scheduled', + deviceWorkerId: '22222222-2222-2222-2222-222222222222', + agentKind: 'claude-code', + }); + + const workerId = 'mac-device-advance-test'; + await sql` + UPDATE runs + SET status = 'running', claimed_at = NOW(), claimed_by = ${workerId} + WHERE id = ${queued.runId} + `; + + const response = await post( + `/api/workers/me/runs/${queued.runId}/complete-watcher`, + { + body: { worker_id: workerId, output: 'ok', duration_ms: 5 }, + } + ); + expect(response.status).toBe(200); + + const [after] = await sql` + SELECT next_run_at FROM watchers WHERE id = ${watcherId} + `; + const afterNextRun = after.next_run_at as Date | string | null; + expect(afterNextRun).not.toBeNull(); + // The cron is `0 9 * * *` (daily 9am); the new tick must be strictly in + // the future relative to the pre-completion value (which was forced + // 10min in the past by createAutomatedWatcher). + const beforeMs = beforeNextRun ? new Date(beforeNextRun).getTime() : 0; + const afterMs = new Date(afterNextRun as string | Date).getTime(); + expect(afterMs).toBeGreaterThan(beforeMs); + // And strictly in the future relative to "now". + expect(afterMs).toBeGreaterThan(Date.now() - 1000); + }); + + // Pi review #3: a second concurrent completion must be idempotent — no + // duplicate watcher_windows row, no 500, status reflects the winner. + it('treats a double completion as idempotent (no duplicate window, no 500)', async () => { + const { sql, dbClient, workspace, watcherId, agent } = await createAutomatedWatcher(); + const granularity = inferWatcherGranularityFromSchedule('0 9 * * *'); + const { windowStart, windowEnd } = await computePendingWindow( + dbClient, + watcherId, + granularity + ); + + const queued = await createWatcherRun({ + organizationId: workspace.org.id, + watcherId, + agentId: agent.agentId, + windowStart: windowStart.toISOString(), + windowEnd: windowEnd.toISOString(), + dispatchSource: 'scheduled', + deviceWorkerId: '33333333-3333-3333-3333-333333333333', + agentKind: 'claude-code', + }); + + const workerId = 'mac-device-idem-test'; + await sql` + UPDATE runs + SET status = 'running', claimed_at = NOW(), claimed_by = ${workerId} + WHERE id = ${queued.runId} + `; + + // First completion lands. + const first = await post(`/api/workers/me/runs/${queued.runId}/complete-watcher`, { + body: { worker_id: workerId, output: 'first', duration_ms: 11 }, + }); + expect(first.status).toBe(200); + + // Second completion against the now-terminal row must NOT 500. + // `authorizeRunForWorker` will return 409 first (status no longer + // 'running'); the in-tx FOR UPDATE / idempotent branch is exercised by + // the truly-concurrent case (post-claim, pre-commit), which a single + // serialized test runner can't easily reproduce — but we DO assert + // that no extra watcher_windows row was created either way. + const second = await post(`/api/workers/me/runs/${queued.runId}/complete-watcher`, { + body: { worker_id: workerId, output: 'second', duration_ms: 12 }, + }); + expect([200, 409]).toContain(second.status); + + const windowsForRun = await sql` + SELECT id FROM watcher_windows WHERE run_id = ${queued.runId} + `; + expect(windowsForRun).toHaveLength(1); + + const [run] = await sql` + SELECT status FROM runs WHERE id = ${queued.runId} + `; + expect(String(run.status)).toBe('completed'); + }); + + // Pi review #5: malformed window bounds in approved_input must fail the + // run and advance the schedule, not leave it stuck in 'running'. + it('marks run failed (not stuck running) on malformed approved_input', async () => { + const { sql, dbClient, workspace, watcherId, agent } = await createAutomatedWatcher(); + const granularity = inferWatcherGranularityFromSchedule('0 9 * * *'); + const { windowStart, windowEnd } = await computePendingWindow( + dbClient, + watcherId, + granularity + ); + const queued = await createWatcherRun({ + organizationId: workspace.org.id, + watcherId, + agentId: agent.agentId, + windowStart: windowStart.toISOString(), + windowEnd: windowEnd.toISOString(), + dispatchSource: 'scheduled', + deviceWorkerId: '44444444-4444-4444-4444-444444444444', + agentKind: 'claude-code', + }); + const workerId = 'mac-device-malformed-test'; + // Corrupt the approved_input window_start so completion validation + // rejects it. The run is still claimed/running. + await sql` + UPDATE runs + SET status = 'running', + claimed_at = NOW(), + claimed_by = ${workerId}, + approved_input = approved_input || ${sql.json({ window_start: 'not-a-date' })} + WHERE id = ${queued.runId} + `; + + const [before] = await sql`SELECT next_run_at FROM watchers WHERE id = ${watcherId}`; + const beforeNextRun = before.next_run_at as Date | string | null; + + const response = await post( + `/api/workers/me/runs/${queued.runId}/complete-watcher`, + { + body: { worker_id: workerId, output: 'whatever', duration_ms: 1 }, + } + ); + expect(response.status).toBe(400); + + const [run] = await sql` + SELECT status, error_message FROM runs WHERE id = ${queued.runId} + `; + expect(String(run.status)).toBe('failed'); + expect(String(run.error_message)).toMatch(/window_start/); + + const [after] = await sql`SELECT next_run_at FROM watchers WHERE id = ${watcherId}`; + const afterNextRun = after.next_run_at as Date | string | null; + const beforeMs = beforeNextRun ? new Date(beforeNextRun).getTime() : 0; + const afterMs = afterNextRun ? new Date(afterNextRun).getTime() : 0; + expect(afterMs).toBeGreaterThan(beforeMs); + }); + + // Pi review round-2 #A: device spoof — a same-user token bound to worker + // A cannot complete a run pinned to worker B by lying in body.worker_id. + // Previously the binding check was `(user_id, body.worker_id)`, which a + // same-user attacker could satisfy by registering worker B and POSTing + // worker B's id. The fix anchors on the OAuth-token-bound workerId. + it('rejects device spoof — token bound to worker A cannot complete worker B run', async () => { + const { sql, dbClient, workspace, watcherId, agent } = await createAutomatedWatcher(); + + // Two registered device workers under the SAME user. + const ownerUserId = workspace.users.owner.id; + const [deviceA] = await sql` + INSERT INTO device_workers (user_id, worker_id, platform, capabilities, label) + VALUES (${ownerUserId}, 'worker-A', 'macos', ${sql.json({})}, 'Mac A') + RETURNING id + `; + const [deviceB] = await sql` + INSERT INTO device_workers (user_id, worker_id, platform, capabilities, label) + VALUES (${ownerUserId}, 'worker-B', 'macos', ${sql.json({})}, 'Mac B') + RETURNING id + `; + const deviceBId = String((deviceB as { id: unknown }).id); + // deviceA.id is referenced via the bound PAT — no further use here. + void deviceA; + + // Token bound to worker A. + const { token: patForA } = await createWorkerBoundPat( + ownerUserId, + workspace.org.id, + 'worker-A' + ); + + // Watcher run pinned to worker B (via approved_input.device_worker_id). + const granularity = inferWatcherGranularityFromSchedule('0 9 * * *'); + const { windowStart, windowEnd } = await computePendingWindow( + dbClient, + watcherId, + granularity + ); + const queued = await createWatcherRun({ + organizationId: workspace.org.id, + watcherId, + agentId: agent.agentId, + windowStart: windowStart.toISOString(), + windowEnd: windowEnd.toISOString(), + dispatchSource: 'scheduled', + deviceWorkerId: deviceBId, + agentKind: 'claude-code', + }); + // Claim the run as worker B so `authorizeRunForWorker` passes its + // claimed_by check when the body posts worker_id=worker-B. The new + // bound-workerId check (Fix A) is what should fire instead. + await sql` + UPDATE runs + SET status = 'running', claimed_at = NOW(), claimed_by = 'worker-B' + WHERE id = ${queued.runId} + `; + + const response = await post( + `/api/workers/me/runs/${queued.runId}/complete-watcher`, + { + token: patForA, + body: { worker_id: 'worker-B', output: 'spoofed', duration_ms: 1 }, + } + ); + expect(response.status).toBe(403); + const body = (await response.json()) as { error: string }; + expect(body.error).toMatch(/worker_id_mismatch|Forbidden/); + + // Run must still be 'running' — nothing was completed. + const [run] = await sql` + SELECT status, window_id FROM runs WHERE id = ${queued.runId} + `; + expect(String(run.status)).toBe('running'); + expect(run.window_id).toBeNull(); + // No watcher_windows row was created. + const windows = await sql` + SELECT id FROM watcher_windows WHERE run_id = ${queued.runId} + `; + expect(windows).toHaveLength(0); + }); + + // Pi review round-2 #B: concurrent allocation race — two completions on + // DIFFERENT watcher runs must both succeed with distinct window ids. + // Pre-fix, both could compute the same MAX(id)+1 and the second INSERT + // would 500 on the watcher_windows PK conflict. The advisory lock inside + // getNextNumericId serializes them. + it('serializes concurrent watcher_windows allocations across different runs', async () => { + const { sql, dbClient, workspace, watcherId, agent } = await createAutomatedWatcher(); + + // A second watcher in the same org so the two runs touch different + // `runs.id` rows AND different `watchers.id` (the SELECT … FOR UPDATE + // in the completeWatcherRun tx is per-row, so cross-watcher concurrent + // completions only collide on the watcher_windows allocator). + const secondEntity = await createTestEntity({ + name: 'Second Entity', + organization_id: workspace.org.id, + created_by: workspace.users.owner.id, + }); + const secondWatcher = (await workspace.owner.watchers.create({ + entity_id: secondEntity.id, + slug: 'automation-watcher-2', + name: 'Automation Watcher 2', + prompt: 'Summarize content for {{entities}}.', + extraction_schema: { + type: 'object', + properties: { summary: { type: 'string' } }, + required: ['summary'], + }, + schedule: '0 9 * * *', + agent_id: agent.agentId, + })) as { watcher_id: string }; + const watcherId2 = Number(secondWatcher.watcher_id); + + const granularity = inferWatcherGranularityFromSchedule('0 9 * * *'); + const { windowStart, windowEnd } = await computePendingWindow( + dbClient, + watcherId, + granularity + ); + const queuedA = await createWatcherRun({ + organizationId: workspace.org.id, + watcherId, + agentId: agent.agentId, + windowStart: windowStart.toISOString(), + windowEnd: windowEnd.toISOString(), + dispatchSource: 'scheduled', + deviceWorkerId: '55555555-5555-5555-5555-555555555555', + agentKind: 'claude-code', + }); + const queuedB = await createWatcherRun({ + organizationId: workspace.org.id, + watcherId: watcherId2, + agentId: agent.agentId, + windowStart: windowStart.toISOString(), + windowEnd: windowEnd.toISOString(), + dispatchSource: 'scheduled', + deviceWorkerId: '66666666-6666-6666-6666-666666666666', + agentKind: 'claude-code', + }); + + const workerIdA = 'mac-device-concurrent-A'; + const workerIdB = 'mac-device-concurrent-B'; + await sql` + UPDATE runs + SET status = 'running', claimed_at = NOW(), claimed_by = ${workerIdA} + WHERE id = ${queuedA.runId} + `; + await sql` + UPDATE runs + SET status = 'running', claimed_at = NOW(), claimed_by = ${workerIdB} + WHERE id = ${queuedB.runId} + `; + + // Fire both completions concurrently. With the per-table advisory lock + // they serialize on the SELECT MAX(id) and INSERT, so both succeed. + const [respA, respB] = await Promise.all([ + post(`/api/workers/me/runs/${queuedA.runId}/complete-watcher`, { + body: { worker_id: workerIdA, output: 'A', duration_ms: 1 }, + }), + post(`/api/workers/me/runs/${queuedB.runId}/complete-watcher`, { + body: { worker_id: workerIdB, output: 'B', duration_ms: 1 }, + }), + ]); + expect(respA.status).toBe(200); + expect(respB.status).toBe(200); + + // Both runs completed. + const runs = await sql` + SELECT id, status, window_id + FROM runs + WHERE id IN (${queuedA.runId}, ${queuedB.runId}) + ORDER BY id + `; + expect(runs).toHaveLength(2); + for (const row of runs) { + expect(String(row.status)).toBe('completed'); + expect(Number(row.window_id)).toBeGreaterThan(0); + } + + // Window ids are distinct (no PK conflict, no two rows under the same id). + const windows = await sql` + SELECT id, run_id + FROM watcher_windows + WHERE run_id IN (${queuedA.runId}, ${queuedB.runId}) + ORDER BY id + `; + expect(windows).toHaveLength(2); + const ids = windows.map((w) => Number((w as { id: unknown }).id)); + expect(new Set(ids).size).toBe(2); + }); + + // Pi review round-2 #C: malformed-completion double-advance — two + // concurrent malformed POSTs against the same run must only advance the + // schedule once. Without the RETURNING-gated advance, the second POST's + // UPDATE matches zero rows (status already 'failed') but still ticked + // next_run_at forward. + it('does not double-advance next_run_at on concurrent malformed completions', async () => { + const { sql, dbClient, workspace, watcherId, agent } = await createAutomatedWatcher(); + const granularity = inferWatcherGranularityFromSchedule('0 9 * * *'); + const { windowStart, windowEnd } = await computePendingWindow( + dbClient, + watcherId, + granularity + ); + const queued = await createWatcherRun({ + organizationId: workspace.org.id, + watcherId, + agentId: agent.agentId, + windowStart: windowStart.toISOString(), + windowEnd: windowEnd.toISOString(), + dispatchSource: 'scheduled', + deviceWorkerId: '77777777-7777-7777-7777-777777777777', + agentKind: 'claude-code', + }); + const workerId = 'mac-device-double-advance'; + // Poison window_start and claim the run. + await sql` + UPDATE runs + SET status = 'running', + claimed_at = NOW(), + claimed_by = ${workerId}, + approved_input = approved_input || ${sql.json({ window_start: 'not-a-date' })} + WHERE id = ${queued.runId} + `; + + // Capture next_run_at after the FIRST malformed completion lands. + const first = await post( + `/api/workers/me/runs/${queued.runId}/complete-watcher`, + { + body: { worker_id: workerId, output: 'x', duration_ms: 1 }, + } + ); + expect(first.status).toBe(400); + const [afterFirst] = await sql` + SELECT next_run_at FROM watchers WHERE id = ${watcherId} + `; + const firstNextRunMs = new Date( + afterFirst.next_run_at as string | Date + ).getTime(); + + // Second malformed completion against the now-failed row. The validation + // error still fires (approved_input.window_start is still garbage), but + // the UPDATE … WHERE status='running' matches zero rows, so the schedule + // must NOT advance again. + const second = await post( + `/api/workers/me/runs/${queued.runId}/complete-watcher`, + { + body: { worker_id: workerId, output: 'x2', duration_ms: 1 }, + } + ); + expect(second.status).toBe(400); + const [afterSecond] = await sql` + SELECT next_run_at FROM watchers WHERE id = ${watcherId} + `; + const secondNextRunMs = new Date( + afterSecond.next_run_at as string | Date + ).getTime(); + + expect(secondNextRunMs).toBe(firstNextRunMs); + }); + }); }); diff --git a/packages/server/src/index.ts b/packages/server/src/index.ts index 3a2bbda36..de9654a75 100644 --- a/packages/server/src/index.ts +++ b/packages/server/src/index.ts @@ -616,6 +616,7 @@ import { completeActionRun, completeAuthRun, completeEmbeddings, + completeWatcherRun, completeWorkerJob, createMyDeviceAuthProfile, createMyDeviceFeed, @@ -684,10 +685,17 @@ app.use('/api/workers/*', async (c, next) => { const requestPath = new URL(c.req.url).pathname; const isAuthProfileSubpath = requestPath.startsWith('/api/workers/me/auth-profiles'); const isFeedSubpath = requestPath.startsWith('/api/workers/me/feeds'); + // /api/workers/me/runs//complete-watcher — device-side watcher + // completion endpoint added in #798. The handler does its own + // `authorizeRunForWorker` claim-ownership check, so an org-scope + // gate here would just block legitimate posts from the bound device. + const isWatcherCompleteSubpath = + /^\/api\/workers\/me\/runs\/\d+\/complete-watcher$/.test(requestPath); if ( !allowedPathsForUserWorker.has(requestPath) && !isAuthProfileSubpath && - !isFeedSubpath + !isFeedSubpath && + !isWatcherCompleteSubpath ) { return c.json({ error: 'Endpoint not available to user-scoped workers' }, 403); } @@ -735,6 +743,7 @@ app.post('/api/workers/stream', streamContent); app.post('/api/workers/complete', completeWorkerJob); app.post('/api/workers/complete-action', completeActionRun); app.post('/api/workers/complete-embeddings', completeEmbeddings); +app.post('/api/workers/me/runs/:runId/complete-watcher', completeWatcherRun); app.post('/api/workers/fetch-events', fetchEventsForEmbedding); app.post('/api/workers/emit-auth-artifact', emitAuthArtifact); app.post('/api/workers/poll-auth-signal', pollAuthSignal); diff --git a/packages/server/src/tools/admin/helpers/db-helpers.ts b/packages/server/src/tools/admin/helpers/db-helpers.ts index 281ab9a91..b4d2223f7 100644 --- a/packages/server/src/tools/admin/helpers/db-helpers.ts +++ b/packages/server/src/tools/admin/helpers/db-helpers.ts @@ -58,12 +58,29 @@ export type NumericIdTable = (typeof NUMERIC_ID_TABLES)[number]; /** * Allocate the next numeric id for a whitelisted table (`COALESCE(MAX(id), 0) + 1`). - * Callers that need this to be race-free must hold an appropriate lock. + * + * Race-free across concurrent transactions via a per-table Postgres advisory + * lock keyed on `hashtext('_id_alloc')`. The lock is acquired with + * `pg_advisory_xact_lock`, so it's released automatically when the calling + * transaction commits or rolls back. To get serialization across the + * subsequent INSERT, the caller MUST invoke this from within a transaction + * (otherwise each statement is its own implicit tx and the lock releases + * before the INSERT executes — same race as without the lock). + * + * Without the advisory lock, two concurrent completions on DIFFERENT rows + * (e.g. two device workers completing two different watcher runs) can both + * compute the same `MAX(id)+1` and one will fail on the watcher_windows PK + * conflict. With the lock + caller-side tx, the second caller blocks until + * the first commits (and thus sees the first INSERT in its `MAX(id)`). */ export async function getNextNumericId(sql: DbClient, table: NumericIdTable): Promise { if (!NUMERIC_ID_TABLES.includes(table)) { throw new Error(`Invalid table name: ${table}`); } + // Per-table key: `hashtext` returns a stable int4 derived from the string. + // Same table → same key → serialized allocation; different tables → distinct + // keys → no false sharing. + await sql.unsafe(`SELECT pg_advisory_xact_lock(hashtext($1))`, [`${table}_id_alloc`]); const rows = await sql.unsafe<{ next_id: number }>( `SELECT COALESCE(MAX(id), 0) + 1 AS next_id FROM ${table}` ); diff --git a/packages/server/src/tools/admin/manage_watchers.ts b/packages/server/src/tools/admin/manage_watchers.ts index f105561b9..217e510e2 100644 --- a/packages/server/src/tools/admin/manage_watchers.ts +++ b/packages/server/src/tools/admin/manage_watchers.ts @@ -55,6 +55,7 @@ import { processWatcherClassifications, stripFields, } from '../../watchers/classifier-extraction'; +import { advanceWatcherSchedule } from '../../watchers/automation'; import { compileReactionScript, executeReaction } from '../../watchers/reaction-executor'; import { validateTemplate } from '../../watchers/renderer'; import { validateClassifierSourcePaths, validateExtractionSchema } from '../../watchers/validator'; @@ -1950,25 +1951,7 @@ async function handleCompleteWindow( // replays (no window created, no run transitioned) must not push // next_run_at forward, or each retry would shift the schedule. if (windowCreated || runMarkedCompleted) { - const watcherScheduleRows = await tx` - SELECT schedule, next_run_at - FROM watchers - WHERE id = ${watcherId} - LIMIT 1 - `; - const watcherSchedule = (watcherScheduleRows[0]?.schedule as string | null) ?? null; - const currentNextRunAt = (watcherScheduleRows[0]?.next_run_at as string | null) ?? null; - if (watcherSchedule) { - const nextRunBase = currentNextRunAt - ? new Date(Math.max(Date.now(), new Date(currentNextRunAt).getTime())) - : new Date(); - await tx` - UPDATE watchers - SET next_run_at = ${nextRunAt(watcherSchedule, nextRunBase)}::timestamptz, - updated_at = NOW() - WHERE id = ${watcherId} - `; - } + await advanceWatcherSchedule(tx, watcherId); } logger.info( diff --git a/packages/server/src/utils/queue-helpers.ts b/packages/server/src/utils/queue-helpers.ts index 6fd930df0..7c30c0400 100644 --- a/packages/server/src/utils/queue-helpers.ts +++ b/packages/server/src/utils/queue-helpers.ts @@ -30,6 +30,22 @@ export interface WatcherRunPayload { * against v2's schema. */ version_id: number | null; + /** + * When non-null, the watcher is pinned to a user-owned device worker. + * The server-side dispatcher (`packages/server/src/watchers/automation.ts`) + * MUST refuse to claim such rows — they are claimed exclusively by the + * matching device worker via `/api/workers/poll`. Mirrors the + * `connections.device_worker_id` lane that the worker poll already + * supports for sync runs. + */ + device_worker_id?: string | null; + /** + * Hint to the device-side dispatcher (Owletto Mac app, etc.) for which + * local CLI executor to spawn (e.g. `claude-code`, `codex`, `gemini`). + * Free-form string. Empty / null means "use the device's configured + * default agent". + */ + agent_kind?: string | null; } // ============================================ @@ -216,6 +232,8 @@ async function createWatcherRunWithClient( windowStart: string; windowEnd: string; dispatchSource: WatcherDispatchSource; + deviceWorkerId?: string | null; + agentKind?: string | null; } ): Promise<{ runId: number; status: string; created: boolean }> { const existing = await findActiveWatcherRun(sql, params.watcherId); @@ -236,6 +254,20 @@ async function createWatcherRunWithClient( ? Number(versionRows[0].current_version_id) : null; + // device_worker_id + agent_kind get persisted into approved_input so the + // server-side dispatcher (#802) can skip device-pinned rows from the SQL + // side, and so /api/workers/poll can claim them with a parallel CTE + // branch without a runs-schema migration. Empty strings are normalized to + // null so the dispatcher's `OR '' = ''` guard treats them as un-pinned. + const normalizedDeviceWorkerId = + typeof params.deviceWorkerId === 'string' && params.deviceWorkerId.trim() !== '' + ? params.deviceWorkerId.trim() + : null; + const normalizedAgentKind = + typeof params.agentKind === 'string' && params.agentKind.trim() !== '' + ? params.agentKind.trim() + : null; + const payload: WatcherRunPayload = { watcher_id: params.watcherId, agent_id: params.agentId, @@ -243,6 +275,8 @@ async function createWatcherRunWithClient( window_end: params.windowEnd, dispatch_source: params.dispatchSource, version_id: snapshotVersionId, + device_worker_id: normalizedDeviceWorkerId, + agent_kind: normalizedAgentKind, }; const inserted = await sql` @@ -284,6 +318,8 @@ export async function createWatcherRun( windowStart: string; windowEnd: string; dispatchSource: WatcherDispatchSource; + deviceWorkerId?: string | null; + agentKind?: string | null; }, db?: DbClient ): Promise<{ runId: number; status: string; created: boolean }> { diff --git a/packages/server/src/watchers/automation.ts b/packages/server/src/watchers/automation.ts index 9fbfc2f41..ec13a851d 100644 --- a/packages/server/src/watchers/automation.ts +++ b/packages/server/src/watchers/automation.ts @@ -31,6 +31,10 @@ interface DueWatcherRow { agent_id: string; schedule: string | null; status?: string; + /** Watcher is pinned to a user-owned device worker (e.g. Lobu Mac app). */ + device_worker_id?: string | null; + /** Preferred local agent kind on the pinned device (e.g. 'claude-code'). */ + agent_kind?: string | null; } interface ClaimedWatcherRunRow { @@ -117,6 +121,17 @@ export function parseWatcherRunPayload(value: unknown): WatcherRunPayload | null ? Number(rawVersionId) : null; + const rawDeviceWorkerId = payload.device_worker_id; + const deviceWorkerId = + typeof rawDeviceWorkerId === 'string' && rawDeviceWorkerId.trim() !== '' + ? rawDeviceWorkerId.trim() + : null; + const rawAgentKind = payload.agent_kind; + const agentKind = + typeof rawAgentKind === 'string' && rawAgentKind.trim() !== '' + ? rawAgentKind.trim() + : null; + return { watcher_id: watcherId, agent_id: agentId, @@ -124,6 +139,8 @@ export function parseWatcherRunPayload(value: unknown): WatcherRunPayload | null window_end: windowEnd, dispatch_source: dispatchSource, version_id: Number.isFinite(versionId as number) ? (versionId as number) : null, + device_worker_id: deviceWorkerId, + agent_kind: agentKind, }; } @@ -144,7 +161,8 @@ async function loadWatcherForAutomation( watcherId: number ): Promise { const rows = await sql` - SELECT id, organization_id, agent_id, schedule, status + SELECT id, organization_id, agent_id, schedule, status, + device_worker_id::text AS device_worker_id, agent_kind FROM watchers WHERE id = ${watcherId} LIMIT 1 @@ -177,6 +195,8 @@ async function enqueueWatcherRunForRecord( windowStart: windowStart.toISOString(), windowEnd: windowEnd.toISOString(), dispatchSource, + deviceWorkerId: watcher.device_worker_id ?? null, + agentKind: watcher.agent_kind ?? null, }, sql ); @@ -233,10 +253,21 @@ async function markWatcherRunFailedIdempotent( // broken watcher re-materializes + re-dispatches a fresh agent run every // single minute forever (token/worker burn). Mirrors the feeds model: a // failed run still moves the schedule forward by its normal cadence. - await advanceWatcherScheduleAfterTerminalFailure(sql, failedRows[0]?.watcher_id as number | undefined); + await advanceWatcherSchedule(sql, failedRows[0]?.watcher_id as number | undefined); } -async function advanceWatcherScheduleAfterTerminalFailure( +/** + * Move a watcher's `next_run_at` forward by one cron tick. Reused by: + * - terminal-failure paths in this module (broken watcher shouldn't re-fire each minute) + * - manage_watchers(action="complete_window") on successful completion + * - the device-side `/api/workers/me/runs/:id/complete-watcher` endpoint + * + * Pass either the singleton `sql` client or a transaction handle from + * `sql.begin(...)` to advance inside the caller's transaction. Schedule-less + * watchers (manual-only) are no-ops. Read failures are logged and swallowed — + * a missed schedule tick is preferable to failing the surrounding write. + */ +export async function advanceWatcherSchedule( sql: DbClient, watcherId: number | undefined ): Promise { @@ -261,7 +292,7 @@ async function advanceWatcherScheduleAfterTerminalFailure( WHERE id = ${watcherId} `; } catch (err) { - logger.warn(`[watchers] failed to advance next_run_at after terminal failure: ${err}`); + logger.warn(`[watchers] failed to advance next_run_at: ${err}`); } } @@ -463,7 +494,8 @@ export async function materializeDueWatcherRuns( const sql = db ?? getDb(); const dueWatchers = await sql` - SELECT w.id, w.organization_id, w.agent_id, w.schedule + SELECT w.id, w.organization_id, w.agent_id, w.schedule, + w.device_worker_id::text AS device_worker_id, w.agent_kind FROM watchers w WHERE w.status = 'active' AND w.agent_id IS NOT NULL @@ -500,7 +532,7 @@ export async function materializeDueWatcherRuns( ); // Don't leave next_run_at in the past — that would re-select this watcher // on every 60s tick. Push it forward per the watcher's cron schedule. - await advanceWatcherScheduleAfterTerminalFailure(sql, watcher.id); + await advanceWatcherSchedule(sql, watcher.id); } } diff --git a/packages/server/src/worker-api.ts b/packages/server/src/worker-api.ts index 6b8b3aa08..ba7e9e92a 100644 --- a/packages/server/src/worker-api.ts +++ b/packages/server/src/worker-api.ts @@ -28,6 +28,8 @@ import { import { captureServerError } from './sentry'; import { autoLinkEvent } from './utils/auto-linker'; import { nextRunAt as nextRunAtFromCron } from './utils/cron'; +import { advanceWatcherSchedule } from './watchers/automation'; +import { getNextNumericId } from './tools/admin/helpers/db-helpers'; import { reconcileDeviceCapabilities } from './worker-api/device-reconcile'; import { findBundledConnectorFile } from './utils/connector-catalog'; import { resolveConnectorCode } from './utils/ensure-connector-installed'; @@ -341,48 +343,65 @@ export async function pollWorkerJob(c: Context<{ Bindings: Env }>) { LIMIT 1 ) cd ON true WHERE r.status = 'pending' - -- Connector worker only ever claims its own lanes. The lobu-queue - -- run types (chat_message, schedule, agent_run, internal) are - -- claimed in-process by the gateway's RunsQueue; an explicit - -- allow-list keeps the lanes separated. - AND r.run_type IN ('sync', 'action', 'embed_backfill', 'auth') AND (r.approval_status = 'auto' OR r.approval_status = 'approved') - AND (${hasBrowser} OR COALESCE(cd.api_type, 'api') = 'api') AND ( - -- (A) trusted/anonymous fleet worker: the no-capability cloud - -- connectors plus any capability it happens to advertise, in - -- any org — but NEVER a connection pinned to a device. + -- (1) Connector-worker lanes: sync / action / embed_backfill / auth. + -- Browser-only connectors require the browser capability. ( - ${!isUserScopedWorker} - AND COALESCE(cd.required_capability, '') = ANY(${pgTextArray(capabilityMatchSet)}::text[]) - AND con.device_worker_id IS NULL - ) - -- (B) user-scoped device worker: an unpinned capability-matched - -- device connector in an org this worker can see. Capability - -- match goes through the authorized set — a chrome-extension - -- claiming os.shell is dropped server-side (see - -- @lobu/core/capabilities), and that dropped string MUST NOT - -- match a connectors required_capability here either. - OR ( - ${isUserScopedWorker} - AND cd.required_capability IS NOT NULL - AND cd.required_capability = ANY(${pgTextArray(authorizedCapabilities)}::text[]) - AND con.device_worker_id IS NULL - AND r.organization_id = ANY(${pgTextArray(orgScopeIds)}::text[]) + r.run_type IN ('sync', 'action', 'embed_backfill', 'auth') + AND (${hasBrowser} OR COALESCE(cd.api_type, 'api') = 'api') + AND ( + -- (1A) trusted/anonymous fleet worker: the no-capability cloud + -- connectors plus any capability it happens to advertise, + -- in any org — but NEVER a connection pinned to a device. + ( + ${!isUserScopedWorker} + AND COALESCE(cd.required_capability, '') = ANY(${pgTextArray(capabilityMatchSet)}::text[]) + AND con.device_worker_id IS NULL + ) + -- (1B) user-scoped device worker: an unpinned capability-matched + -- device connector in an org this worker can see. Capability + -- match goes through the authorized set — a chrome-extension + -- claiming os.shell is dropped server-side (see + -- @lobu/core/capabilities), and that dropped string MUST NOT + -- match a connectors required_capability here either. + OR ( + ${isUserScopedWorker} + AND cd.required_capability IS NOT NULL + AND cd.required_capability = ANY(${pgTextArray(authorizedCapabilities)}::text[]) + AND con.device_worker_id IS NULL + AND r.organization_id = ANY(${pgTextArray(orgScopeIds)}::text[]) + ) + -- ... or any connection explicitly pinned to THIS device (this is + -- "run the Reddit connector on my Mac"). Still: a device-only + -- connector needs the capability currently advertised, and the + -- pin only counts in an org this worker can see (which includes + -- the org the device is attached to). + OR ( + ${isUserScopedWorker} + AND ${deviceWorkerId}::uuid IS NOT NULL + AND con.device_worker_id = ${deviceWorkerId}::uuid + AND ( + cd.required_capability IS NULL + OR cd.required_capability = ANY(${pgTextArray(authorizedCapabilities)}::text[]) + ) + AND r.organization_id = ANY(${pgTextArray(orgScopeIds)}::text[]) + ) + ) ) - -- ... or any connection explicitly pinned to THIS device (this is - -- "run the Reddit connector on my Mac"). Still: a device-only - -- connector needs the capability currently advertised, and the - -- pin only counts in an org this worker can see (which includes - -- the org the device is attached to). + -- (2) Watcher lane: a watcher run with approved_input.device_worker_id + -- matching this device. Watchers don't carry a connection_id and + -- don't gate on capabilities — the matching device's local CLI + -- executor handles the work (Owletto's WatcherDispatcher routes + -- by approved_input.agent_kind). The server-side dispatcher + -- (#802) already refuses to claim rows with this pin set, so + -- this branch is the only legal claim path for them. OR ( ${isUserScopedWorker} + AND r.run_type = 'watcher' AND ${deviceWorkerId}::uuid IS NOT NULL - AND con.device_worker_id = ${deviceWorkerId}::uuid - AND ( - cd.required_capability IS NULL - OR cd.required_capability = ANY(${pgTextArray(authorizedCapabilities)}::text[]) - ) + AND r.approved_input ? 'device_worker_id' + AND r.approved_input->>'device_worker_id' = ${deviceWorkerId}::text AND r.organization_id = ANY(${pgTextArray(orgScopeIds)}::text[]) ) ) @@ -421,6 +440,7 @@ export async function pollWorkerJob(c: Context<{ Bindings: Env }>) { r.watcher_id, r.window_id, r.organization_id, + r.created_at AS run_created_at, r.auth_profile_id AS run_auth_profile_id, f.feed_key, f.config AS feed_config, @@ -431,13 +451,19 @@ export async function pollWorkerJob(c: Context<{ Bindings: Env }>) { conn.config AS connection_config, conn.device_worker_id AS connection_device_worker_id, cv.compiled_code, - ap.auth_data AS auth_profile_auth_data + ap.auth_data AS auth_profile_auth_data, + w.name AS watcher_name, + w.slug AS watcher_slug, + w.agent_kind AS watcher_agent_kind, + w.notification_channel AS watcher_notification_channel, + w.notification_priority AS watcher_notification_priority FROM runs r LEFT JOIN feeds f ON f.id = r.feed_id LEFT JOIN connections conn ON conn.id = r.connection_id LEFT JOIN connector_versions cv ON cv.connector_key = r.connector_key AND cv.version = r.connector_version LEFT JOIN auth_profiles ap ON ap.id = r.auth_profile_id + LEFT JOIN watchers w ON w.id = r.watcher_id WHERE r.id = ${runId} LIMIT 1 `; @@ -481,6 +507,7 @@ export async function pollWorkerJob(c: Context<{ Bindings: Env }>) { connector_version: string | null; action_key: string | null; action_input: Record | null; + approved_input: Record | null; feed_key: string | null; feed_config: Record | null; checkpoint: Record | null; @@ -490,15 +517,71 @@ export async function pollWorkerJob(c: Context<{ Bindings: Env }>) { connection_config: Record | null; connection_device_worker_id: string | null; compiled_code: string | null; + run_created_at: string | Date | null; // Watcher run fields (populated via LEFT JOINs) watcher_id: number | null; window_id: number | null; organization_id: string; + watcher_name: string | null; + watcher_slug: string | null; + watcher_agent_kind: string | null; + watcher_notification_channel: string | null; + watcher_notification_priority: string | null; // Auth run fields run_auth_profile_id: number | null; auth_profile_auth_data: Record | null; }; + // Watcher run: device worker is going to spawn a local CLI executor and + // return the result via /api/workers/me/runs/:runId/complete-watcher. No + // connector code, no connection credentials, no compiled_code lookup — + // just the payload envelope the dispatcher needs to build a prompt. The + // server-side claim filter (#802 + this PR) already guarantees only the + // matching device can land on this row. + if (row.run_type === 'watcher') { + const approved = (row.approved_input ?? {}) as Record; + const firedAtRaw = row.run_created_at; + const firedAt = + firedAtRaw instanceof Date + ? firedAtRaw.toISOString() + : typeof firedAtRaw === 'string' && firedAtRaw.trim() + ? firedAtRaw + : new Date().toISOString(); + const watcherIdStr = row.watcher_id != null ? String(row.watcher_id) : ''; + const agentKindFromPayload = + typeof approved['agent_kind'] === 'string' && (approved['agent_kind'] as string).trim() + ? (approved['agent_kind'] as string).trim() + : null; + return c.json({ + run_id: row.run_id, + run_type: row.run_type, + organization_id: row.organization_id, + payload: { + watcher: { + id: watcherIdStr, + name: row.watcher_name ?? null, + slug: row.watcher_slug ?? null, + agent_kind: agentKindFromPayload ?? row.watcher_agent_kind ?? null, + notification_channel: row.watcher_notification_channel ?? 'canvas', + notification_priority: row.watcher_notification_priority ?? 'normal', + }, + event: { + trigger_event_id: null, + fired_at: firedAt, + payload: approved, + }, + context: { + device: { + worker_id: deviceWorkerId, + }, + user: { + user_id: workerUserId ?? null, + }, + }, + }, + }); + } + // Connector code delivery: // - Fleet workers (server pods, embedded mode) ship the same bundled // connector .ts sources in their image. The gateway omits @@ -1044,6 +1127,400 @@ export async function completeWorkerJob(c: Context<{ Bindings: Env }>) { } } +/** + * POST /api/workers/me/runs/:runId/complete-watcher + * + * Device-side completion path for a watcher run that was executed by a local + * CLI agent (Claude Code, Codex, etc.) on the user's machine. The Owletto + * Mac app's `WatcherDispatcher` posts here once the subprocess exits. + * + * Unlike the MCP-resident `manage_watchers(action="complete_window")` path, + * the device flow has no JWT window token, no extraction schema validation, + * and no entity-link resolution — the CLI output is free-form text. We + * still write a `watcher_windows` row so the dashboard surfaces the run + * the same way as a server-side watcher completion. + * + * Authorization: the caller must own the claim — same gate as + * /api/workers/complete (status='running' AND claimed_by === worker_id). + */ +export async function completeWatcherRun(c: Context<{ Bindings: Env }>) { + const runIdParam = c.req.param('runId'); + if (!runIdParam) { + return c.json({ error: 'runId is required' }, 400); + } + const runId = Number(runIdParam); + if (!Number.isFinite(runId) || runId <= 0) { + return c.json({ error: 'Invalid runId' }, 400); + } + + let body: { + worker_id: string; + output?: string; + error?: string; + duration_ms?: number; + exit_code?: number | null; + exit_signal?: string | null; + exit_reason?: 'ok' | 'error_message' | 'timeout' | 'oom' | 'crash'; + }; + try { + body = await c.req.json(); + } catch { + return c.json({ error: 'Invalid or missing JSON body' }, 400); + } + + const denied = await authorizeRunForWorker(c, runId, body.worker_id); + if (denied) return denied; + + const sql = getDb(); + // Reload the row now that authorization has cleared. We need the watcher_id + // + organization_id + approved_input to write the completion side-effects. + // The transaction below will re-lock and re-check status under SELECT ... + // FOR UPDATE; this read just gates the cheap rejection paths. + const runRows = (await sql` + SELECT id, organization_id, watcher_id, approved_input, run_type, claimed_at, status + FROM runs + WHERE id = ${runId} + LIMIT 1 + `) as unknown as Array<{ + id: number; + organization_id: string; + watcher_id: number | null; + approved_input: Record | null; + run_type: string; + claimed_at: string | Date | null; + status: string; + }>; + const run = runRows[0]; + if (!run) return c.json({ error: 'Run not found' }, 404); + if (run.run_type !== 'watcher') { + return c.json({ error: 'Not a watcher run' }, 409); + } + if (run.watcher_id == null) { + return c.json({ error: 'Watcher run missing watcher_id' }, 500); + } + + const watcherId = Number(run.watcher_id); + const approved = (run.approved_input ?? {}) as Record; + + // Fix 2 (pi round-2): device-identity binding pinned to the OAuth token, not + // the request body. + // + // The previous version looked up `(workerUserId, body.worker_id)` in + // `device_workers`, but `body.worker_id` is client-supplied. A same-user + // token could complete as a different registered worker by posting that + // worker's id. The fix is the same trick `pollWorkerJob` already uses: if + // the token was minted with a `workerId` binding (`device_worker:run` + // PATs/OAuth tokens always are), require `body.worker_id === boundWorkerId` + // AND, if the run is pinned to a device, the bound worker's + // `device_workers.id` matches `approved_input.device_worker_id`. + // + // For legacy/admin tokens with no `workerId` binding we fall through to the + // old user_id+worker_id lookup, but emit a warning so the audit trail can + // catch this path if it ever fires in production (Lobu for Mac always + // mints worker-bound tokens via /api/me/devices/mint-child-token). + if (c.var.workerAuthMode === 'user') { + const workerUserId = c.var.workerUserId; + const boundWorkerId = c.var.mcpAuthInfo?.workerId ?? null; + const pinnedDeviceWorkerId = + typeof approved.device_worker_id === 'string' ? approved.device_worker_id : null; + + if (boundWorkerId) { + if (boundWorkerId !== body.worker_id) { + logger.warn( + { run_id: runId, body_worker_id: body.worker_id, bound_worker_id: boundWorkerId }, + '[completeWatcherRun] body.worker_id != token-bound worker_id — rejecting' + ); + return c.json( + { + error: 'worker_id_mismatch', + error_description: `this token is bound to worker_id '${boundWorkerId}'`, + }, + 403 + ); + } + if (pinnedDeviceWorkerId && workerUserId) { + const deviceRows = (await sql` + SELECT id + FROM device_workers + WHERE user_id = ${workerUserId} + AND worker_id = ${boundWorkerId} + LIMIT 1 + `) as unknown as Array<{ id: string }>; + const callerDeviceWorkerId = deviceRows[0]?.id ?? null; + if (!callerDeviceWorkerId || callerDeviceWorkerId !== pinnedDeviceWorkerId) { + logger.warn( + { + run_id: runId, + bound_worker_id: boundWorkerId, + caller_device: callerDeviceWorkerId, + pinned_device: pinnedDeviceWorkerId, + }, + '[completeWatcherRun] device_worker_id mismatch — rejecting' + ); + return c.json({ error: 'Forbidden: device worker mismatch' }, 403); + } + } + } else if (workerUserId && pinnedDeviceWorkerId) { + // Legacy/admin path: no worker-bound token. Fall back to the + // (user_id, body.worker_id) lookup; this is weaker than the bound path + // but still gates on user ownership. Emit a warning so prod telemetry + // can flag if any non-Mac caller hits this branch. + logger.warn( + { run_id: runId, worker_user_id: workerUserId, body_worker_id: body.worker_id }, + '[completeWatcherRun] no token-bound workerId — falling back to user_id+worker_id check' + ); + const deviceRows = (await sql` + SELECT id + FROM device_workers + WHERE user_id = ${workerUserId} + AND worker_id = ${body.worker_id} + LIMIT 1 + `) as unknown as Array<{ id: string }>; + const callerDeviceWorkerId = deviceRows[0]?.id ?? null; + if (!callerDeviceWorkerId || callerDeviceWorkerId !== pinnedDeviceWorkerId) { + logger.warn( + { + run_id: runId, + body_worker_id: body.worker_id, + caller_device: callerDeviceWorkerId, + pinned_device: pinnedDeviceWorkerId, + }, + '[completeWatcherRun] device_worker_id mismatch (legacy path) — rejecting' + ); + return c.json({ error: 'Forbidden: device worker mismatch' }, 403); + } + } + } + + // Fix 5: validate the window bounds BEFORE opening any transaction. The + // legacy code defaulted silently to `new Date().toISOString()` — that hid + // garbage payloads behind a fresh timestamp. If approved_input contains a + // bound, it must be a parseable ISO string; otherwise the run is + // unrecoverably malformed and we mark it failed up front (so it can't get + // stuck in `running` waiting for a stale-run sweep that may not exist). + const validateIsoBound = ( + key: 'window_start' | 'window_end', + fallback: string + ): { value: string } | { error: string } => { + const raw = approved[key]; + if (raw === undefined || raw === null) return { value: fallback }; + if (typeof raw !== 'string') { + return { error: `approved_input.${key} must be an ISO timestamp string` }; + } + const parsed = Date.parse(raw); + if (!Number.isFinite(parsed)) { + return { error: `approved_input.${key} is not a valid ISO timestamp` }; + } + return { value: raw }; + }; + + const nowIso = new Date().toISOString(); + const startResult = validateIsoBound('window_start', nowIso); + const endResult = validateIsoBound('window_end', nowIso); + if ('error' in startResult || 'error' in endResult) { + const reason = + 'error' in startResult ? startResult.error : (endResult as { error: string }).error; + // Mark the run failed so the watcher's `next_run_at` advances and the + // schedule doesn't loop on this poisoned payload forever. + // + // Pi round-2 #C: only advance the schedule when the UPDATE actually + // changed a row. Without `RETURNING id`, two concurrent malformed + // completions would BOTH advance the schedule — the second one's UPDATE + // matches zero rows (status already 'failed') but we'd still tick + // `next_run_at` forward, potentially skipping a window. + try { + const failedRows = (await sql` + UPDATE runs + SET status = 'failed', + completed_at = current_timestamp, + error_message = ${`Invalid completion payload: ${reason}`}, + exit_reason = 'error_message' + WHERE id = ${runId} + AND status = 'running' + RETURNING id + `) as unknown as Array<{ id: number }>; + if (failedRows.length > 0) { + await advanceWatcherSchedule(sql, watcherId); + } + } catch (err) { + logger.error( + { run_id: runId, err: errorMessage(err) }, + '[completeWatcherRun] failed to mark run failed after validation error' + ); + } + return c.json({ error: reason }, 400); + } + const windowStart = startResult.value; + const windowEnd = endResult.value; + // Granularity isn't stored on watcher runs — infer once for the window + // row. A blank string fails the NOT NULL constraint; default to "ad_hoc" + // for device-driven runs (the dashboard's rollup logic treats this as a + // leaf window with no parent). + const granularity = 'ad_hoc'; + + const hasError = typeof body.error === 'string' && body.error.trim() !== ''; + const output = typeof body.output === 'string' ? body.output : ''; + const durationMs = + typeof body.duration_ms === 'number' && Number.isFinite(body.duration_ms) + ? Math.max(0, Math.floor(body.duration_ms)) + : null; + + // Track whether the work was already done by a concurrent completion. Used + // after the transaction to return an idempotent 200 instead of failing the + // duplicate-INSERT path that pi-#3 flagged. + let alreadyCompleted = false; + + try { + await sql.begin(async (tx) => { + // Fix 3: lock the run row inside the transaction. Without this, two + // concurrent POSTs can both pass `authorizeRunForWorker` (which reads + // without a lock), both enter the tx, both INSERT a watcher_windows + // row, and the second one's run-UPDATE fails the `status='running'` + // filter — leaving a duplicate window row and a 500. + const lockedRows = (await tx` + SELECT status + FROM runs + WHERE id = ${runId} + FOR UPDATE + `) as unknown as Array<{ status: string }>; + const currentStatus = lockedRows[0]?.status ?? null; + if (!currentStatus) { + // Disappeared between the pre-tx read and the lock — treat as 404 by + // throwing; outer catch surfaces as 500, callers will retry. + throw new Error('Run vanished while acquiring lock'); + } + if (currentStatus !== 'running') { + // A concurrent caller already terminated this run. Idempotent path: + // do nothing here and let the outer code return 200 with the existing + // terminal status. This is safe because the duplicate write would + // either violate the watcher_windows PK or insert a phantom row. + alreadyCompleted = true; + return; + } + + if (hasError) { + await tx` + UPDATE runs + SET status = 'failed', + completed_at = current_timestamp, + error_message = ${body.error ?? null}, + exit_code = ${body.exit_code ?? null}, + exit_signal = ${body.exit_signal ?? null}, + exit_reason = ${body.exit_reason ?? 'error_message'} + WHERE id = ${runId} + AND status = 'running' + `; + } else { + // Fix 4 (pi round-2 #B): allocate the window id via the shared + // helper, which now takes a per-table `pg_advisory_xact_lock` keyed + // on `hashtext('watcher_windows_id_alloc')`. Because this runs inside + // `sql.begin`, the lock is held until tx commit — bracketing the + // SELECT MAX + INSERT, so two concurrent completions on DIFFERENT + // watcher runs serialize on allocation and never collide on the + // watcher_windows PK. (Same-watcher concurrent completions are + // already serialized by the SELECT … FOR UPDATE on runs.id above.) + const windowId = await getNextNumericId(tx, 'watcher_windows'); + + const extractedData = { + kind: 'device_cli_output', + output, + agent_kind: + typeof approved.agent_kind === 'string' && (approved.agent_kind as string).trim() + ? (approved.agent_kind as string).trim() + : null, + } as Record; + const runMetadata = { + source: 'device_worker', + device_worker_id: + typeof approved.device_worker_id === 'string' + ? approved.device_worker_id + : null, + watcher_run_id: runId, + } as Record; + + await tx` + INSERT INTO watcher_windows ( + id, watcher_id, version_id, window_start, window_end, granularity, + extracted_data, content_analyzed, model_used, client_id, run_metadata, + is_rollup, depth, source_window_ids, run_id, execution_time_ms, created_at + ) VALUES ( + ${windowId}, ${watcherId}, NULL, ${windowStart}, ${windowEnd}, ${granularity}, + ${tx.json(extractedData)}, 0, 'device-cli', NULL, ${tx.json(runMetadata)}, + false, 0, NULL, ${runId}, ${durationMs}, current_timestamp + ) + `; + + await tx` + UPDATE runs + SET status = 'completed', + completed_at = current_timestamp, + window_id = ${windowId}, + error_message = NULL, + exit_code = ${body.exit_code ?? null}, + exit_signal = ${body.exit_signal ?? null}, + exit_reason = ${body.exit_reason ?? 'ok'} + WHERE id = ${runId} + AND status = 'running' + `; + } + + await tx` + UPDATE watchers + SET last_fired_at = NOW(), + updated_at = NOW() + WHERE id = ${watcherId} + `; + + // Fix 1: advance `next_run_at` in the SAME transaction that recorded + // the completion. Without this the scheduled-jobs tick sees the + // watcher as still due (last_fired_at moved, next_run_at didn't) and + // re-materializes immediately — looping forever on every minute tick. + // The helper is shared with `manage_watchers(action="complete_window")` + // and the terminal-failure path in `automation.ts`. + await advanceWatcherSchedule(tx, watcherId); + }); + } catch (err) { + logger.error( + { error: errorMessage(err), run_id: runId, watcher_id: watcherId }, + '[completeWatcherRun] Failed to record completion' + ); + return c.json({ error: errorMessage(err) }, 500); + } + + if (alreadyCompleted) { + // Re-read the terminal status so we echo back what actually landed (not + // what this request would have written). Don't fire the lifecycle event + // again — the winning concurrent caller already did. + const finalRows = (await sql` + SELECT status FROM runs WHERE id = ${runId} LIMIT 1 + `) as unknown as Array<{ status: string }>; + const finalStatus = finalRows[0]?.status ?? (hasError ? 'failed' : 'completed'); + return c.json({ ok: true, status: finalStatus, idempotent: true }); + } + + // Fire-and-forget: a "change" event so the dashboard's metric_series picks + // up the device-CLI completion the same way it picks up server-side ones. + // LifecycleOp is restricted to created/updated/deleted — we use 'updated' + // and put the actual ran/errored detail under `extra`. + recordLifecycleEvent({ + organizationId: run.organization_id, + entityType: 'watcher', + op: 'updated', + entityId: String(watcherId), + summary: hasError + ? `Watcher run ${runId} failed on device CLI: ${body.error ?? 'unknown error'}` + : `Watcher run ${runId} completed via device CLI`, + extra: { + run_id: runId, + source: 'device_worker', + outcome: hasError ? 'failed' : 'completed', + duration_ms: durationMs, + }, + }); + + return c.json({ ok: true, status: hasError ? 'failed' : 'completed' }); +} + /** * POST /api/workers/fetch-events *