feat(watchers): device-pinned watcher runs end-to-end (#798 PR-1)#814
Conversation
The schema work in #811 added watchers.device_worker_id + agent_kind + notification + cooldown columns, and #808 made the server-side dispatcher skip runs already pinned to a device. This PR wires the pinning + claim + completion sides so a user's Lobu Mac app can actually execute the run via a local CLI agent (Claude Code, etc.). Server-side: 1. `materializeDueWatcherRuns` now reads `watchers.device_worker_id` and `watchers.agent_kind` and persists both into the run's `approved_input` JSONB. The existing #802 dispatcher exclusion keys off `approved_input->>'device_worker_id'`, so device-pinned rows stay `pending` on the server side. 2. `/api/workers/poll` gains a parallel CTE branch for `run_type='watcher'` AND `approved_input->>'device_worker_id' = <this device's id>`. The poll response is short-circuited for watchers — no connector code, no credentials, no compiled_code lookup. It returns a watcher payload envelope `{ watcher, event, context }` that the device-side dispatcher uses to build a CLI prompt. 3. New endpoint `POST /api/workers/me/runs/:runId/complete-watcher`: authorizes via the existing claim-ownership gate (`authorizeRunForWorker`), writes a `watcher_windows` row with `model_used='device-cli'` and `extracted_data.kind='device_cli_output'`, updates `runs.status` to completed/failed, advances `watchers.last_fired_at`, and emits a `watcher:updated` lifecycle event so dashboard metric_series picks up the run. 4. The new path is whitelisted in the user-scoped worker route gate (was previously 403 for `/api/workers/me/...` outside auth-profiles/feeds). Tests: - materializeDueWatcherRuns persists device_worker_id + agent_kind. - End-to-end complete-watcher → runs.completed, watcher_windows row with the CLI output, last_fired_at advanced. - Failure path: error supplied → runs.failed, no window row. - 409 for non-watcher run types, 404 for unknown run ids. PR-2 (Owletto Mac app dispatcher + ClaudeCodeExecutor) consumes this contract.
|
Caution Review failedThe pull request is closed. ℹ️ Recent review info⚙️ Run configurationConfiguration used: defaults Review profile: CHILL Plan: Pro Plus Run ID: 📒 Files selected for processing (3)
📝 WalkthroughWalkthroughAdds device-pinned watcher execution: persist ChangesDevice-Pinned Watcher Automation and Completion
Sequence DiagramsequenceDiagram
participant DeviceWorker
participant pollWorkerJob
participant completeWatcherRun
participant Database
participant Events
DeviceWorker->>pollWorkerJob: poll() with device_worker_id
pollWorkerJob->>Database: claim watcher run where approved_input.device_worker_id matches
Database-->>pollWorkerJob: watcher run + metadata
pollWorkerJob->>pollWorkerJob: construct event.fired_at, agent_kind, context
pollWorkerJob-->>DeviceWorker: { event, watcher, context }
DeviceWorker->>completeWatcherRun: POST /runs/:id/complete-watcher { error?, output? }
completeWatcherRun->>Database: verify watcher_id, authorize, lock run
alt error provided
completeWatcherRun->>Database: UPDATE runs status = failed
completeWatcherRun->>Database: advanceWatcherSchedule(tx, watcherId)
else success
completeWatcherRun->>Database: UPDATE runs status = completed
completeWatcherRun->>Database: call getNextNumericId('watcher_windows')
completeWatcherRun->>Database: INSERT watcher_windows with device CLI metadata
completeWatcherRun->>Database: UPDATE watchers.last_fired_at
completeWatcherRun->>Database: advanceWatcherSchedule(tx, watcherId)
end
completeWatcherRun->>Events: emit watcher lifecycle event
completeWatcherRun-->>DeviceWorker: { ok, status, idempotent? }
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Possibly related issues
Possibly related PRs
Suggested labels
Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Warning There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure. 🔧 ESLint
ESLint skipped: no ESLint configuration detected in root package.json. To enable, add Comment |
|
Codecov Report✅ All modified and coverable lines are covered by tests. 📢 Thoughts on this report? Let us know! |
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 2492fd309c
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
| SET last_fired_at = NOW(), | ||
| updated_at = NOW() |
There was a problem hiding this comment.
Advance watcher schedule after device completion
When a scheduled device-pinned watcher completes or fails through this endpoint, this update only writes last_fired_at and leaves next_run_at at the already-due timestamp that caused the run to be materialized. After the run is marked terminal there is no active run left, so materializeDueWatcherRuns will select the same watcher again on the next automation tick (next_run_at <= current_timestamp) and enqueue back-to-back duplicate runs. The normal complete_window/terminal-failure paths advance next_run_at; this device completion path should do the same based on the watcher's schedule.
Useful? React with 👍 / 👎.
There was a problem hiding this comment.
Actionable comments posted: 4
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@packages/server/src/worker-api.ts`:
- Around line 1225-1297: The completion path must first lock and verify the run
row inside the same transaction to make the write atomic: in the tx callback,
perform an UPDATE runs ... WHERE id = ${runId} AND status = 'running' RETURNING
* (or SELECT ... FOR UPDATE then UPDATE) to claim the run and abort the
transaction if no row is returned/updated; only after that successful claim
proceed to compute windowId, INSERT INTO watcher_windows and then update runs to
set status/completed_at/window_id (or use the RETURNING row to drive the
subsequent UPDATE to completed), and finally update watchers.last_fired_at — use
the existing tx handle and runId/watchers symbols to locate and change the logic
so the run-state check+claim happens before any watcher_windows INSERT or
watchers update and aborts when the claim affects 0 rows.
- Around line 1243-1246: The current computation of windowId using SELECT
COALESCE(MAX(id), 0) + 1 on watcher_windows is racy; update the code that sets
windowId (the windowIdRows/windowId block inside the tx) to obtain an id
atomically from the column's sequence or the table default instead of scanning
MAX(id). Replace the SELECT/MAX logic with either calling the sequence (nextval
on the watcher_windows id sequence) or performing an INSERT that relies on the
table default and returns the generated id (e.g., INSERT ... RETURNING id)
within the same tx so windowId is assigned atomically and avoids duplicate-key
races.
- Around line 1171-1172: The current call to authorizeRunForWorker(c, runId,
body.worker_id) is insufficient for watcher completions because watcher runs
lack a connection_id; instead extract the pinned device id from the run's
approved_input (approved_input->>'device_worker_id'), resolve the device_owner
by querying the device_workers (or equivalent) join using that device_worker_id,
and perform the authorization check against that resolved device_owner (either
by updating authorizeRunForWorker to accept a device_worker_id/device_owner
param or by adding a small helper that resolves device_owner then calls
authorizeRunForWorker). Ensure you reference the run's approved_input JSON, the
device_workers lookup, and the authorizeRunForWorker call so the authorization
is based on the pinned device owner not just org scope.
- Around line 571-578: The payload is putting device_workers.id into
context.device.worker_id which conflicts with the install-scoped poll-body field
used by claimed_by and /complete-watcher; change the response to either return
the install-scoped worker_id expected by callers or rename the field to
context.device.device_worker_id so the dispatcher won't round-trip the wrong id.
Locate the code building the payload (the object containing
context.device.worker_id and the variable deviceWorkerId) and replace the
assignment with the correct identifier (install-scoped worker_id) or rename the
property to device_worker_id, and update any downstream uses that consume
context.device.worker_id accordingly (e.g., claimed_by and /complete-watcher
consumers) to prevent the 403 on completion.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro Plus
Run ID: e4b885bc-1076-43f7-91e4-d2a10d1a1c17
📒 Files selected for processing (5)
packages/server/src/__tests__/integration/watchers/automation-contract.test.tspackages/server/src/index.tspackages/server/src/utils/queue-helpers.tspackages/server/src/watchers/automation.tspackages/server/src/worker-api.ts
5 issues from pi's review of #814's WatcherDispatcher / completeWatcherRun that all materialise on real device traffic: 1. (BLOCKER) Schedule advancement mismatch — completing a device watcher moved `last_fired_at` forward but never bumped `next_run_at`, so the scheduler tick re-materialised the same watcher every minute forever. Extract `advanceWatcherSchedule(sql|tx, watcherId)` from automation.ts (was `advanceWatcherScheduleAfterTerminalFailure`) and call it from both manage_watchers(action="complete_window") and the device complete-watcher endpoint after the in-transaction completion writes. 2. Device-identity binding — `claimed_by === body.worker_id` alone is spoofable across devices that share a user OAuth token: any other device with the token could claim a run under any worker_id, then a third caller with the same token could complete it. For user-scoped workers we now resolve the caller's `device_workers.id` from `(workerUserId, body.worker_id)` and require it to equal `approved_input.device_worker_id` (which materializeDueWatcherRuns already snapshotted from the watcher pin). Mismatch → 403. 3. Completion race — two concurrent POSTs both passed the unlocked `authorizeRunForWorker` status read, both opened a tx, both INSERTed a watcher_windows row; the loser's run-UPDATE saw 0 rows and the tx committed a phantom window. Now lock `SELECT ... FOR UPDATE` on the run row at the top of the tx; if status is no longer 'running', return 200 idempotently with `{idempotent: true}`. 4. Window-id allocation — drop the inline `COALESCE(MAX(id), 0) + 1` in favour of the codebase's shared `getNextNumericId(tx, 'watcher_windows')` helper (whitelisted, same pattern used by manage_watchers). 5. Malformed payload no longer stranded the run — validation that throws inside the tx rolls back to `running`, and there's no stale-run sweep today. Validate `approved_input.window_start` / `window_end` BEFORE the transaction; on bad input mark the run `failed` (so next_run_at advances normally) and return 400. Tests: three integration tests covering #1, #3, and #5. The user-scoped spoof test (#2) needs OAuth device-flow setup that isn't wired up in the current test fixtures — leaving it for a follow-up that lands the helper alongside other user-scoped worker tests.
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In
`@packages/server/src/__tests__/integration/watchers/automation-contract.test.ts`:
- Around line 554-574: The test "complete-watcher endpoint refuses non-watcher
run types" inserts a run but never marks it claimed, so the request can fail
authorization before the non-watcher check; update the INSERT in this test (the
SQL that produces runId) to include claimed_by = 'any' and claimed_at =
current_timestamp (or a concrete timestamp) so the run is claimed by the same
worker_id used in the post request, ensuring the 409 comes from the non-watcher
guard; locate the INSERT that returns id in this test (and the runId variable)
and add the claimed_by/claimed_at fields to the VALUES clause.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro Plus
Run ID: cfb527d5-3bf4-4bc7-9f39-312f1be5cae7
📒 Files selected for processing (4)
packages/server/src/__tests__/integration/watchers/automation-contract.test.tspackages/server/src/tools/admin/manage_watchers.tspackages/server/src/watchers/automation.tspackages/server/src/worker-api.ts
🚧 Files skipped from review as they are similar to previous changes (1)
- packages/server/src/watchers/automation.ts
…oc, no double-advance
A) Bound-worker check now reads `c.var.mcpAuthInfo?.workerId` (PAT/OAuth
token binding), not body.worker_id. Same-user attacker can't complete
as another registered worker by lying in the payload.
B) `getNextNumericId` takes a per-table `pg_advisory_xact_lock`
(`hashtext('<table>_id_alloc')`). Inside `completeWatcherRun`'s tx the
lock is held until commit, so two concurrent completions on different
watcher runs serialize on allocation instead of racing on MAX(id)+1.
C) Validation-failure path only advances the schedule when the
`UPDATE … WHERE status='running'` actually matched a row (RETURNING-gated).
Two concurrent malformed POSTs no longer double-tick `next_run_at`.
Tests:
- device spoof (Fix A): token bound to worker A, run pinned to worker B,
body posts worker-B — asserts 403 and run stays running.
- concurrent allocation (Fix B): two completions on different watchers
fired in parallel — both 200, distinct watcher_windows.id.
- double-advance (Fix C): two malformed POSTs against the same run —
next_run_at advances only once.
#823) Goals (added in #813) had no behavior of their own — just a nullable FK from watchers.goal_id into a parallel `goals` table plus a parallel CRUD surface. Agents already encapsulate the watcher-grouping use case via watchers.agent_id; goals were the redundant layer. The Mac app stopped using the primitive in lobu-ai/owletto#151, and the primitive never shipped in a release (v7.0.0 doesn't include it; v7.1.0 is still open). Removed: - db/migrations/20260517160000_drop_goals_primitive.sql (drops the watchers.goal_id column and the goals table; reversible) - packages/server/src/db/embedded-schema-patches.ts: drop the `goals-primitive` patch entry - packages/server/src/tools/admin/manage_goals.ts and its registration - packages/server/src/sandbox/namespaces/goals.ts (client.goals SDK) and its method-metadata entries - goal_id from manage_watchers.ts (create/update/list), get_watchers.ts, and WatcherMetadata - manage_goals from auth/tool-access scope tables - goals-crud integration test Untouched: the dispatcher (#814), the watcher schema additions from #811, and packages/owletto (separate submodule).
Summary
Wires the server side of #798: when a watcher is pinned to a user-owned device worker, the matching device's Lobu Mac app claims the run via
/api/workers/polland posts the CLI's output back through a newcomplete-watcherendpoint. The server-side dispatcher (#808) already refuses these rows; the schema (#811) already has the columns.PR-2 (Owletto Mac app
WatcherDispatcher+ClaudeCodeExecutor) consumes this contract.Changes
utils/queue-helpers.ts— extendWatcherRunPayloadandcreateWatcherRun(params)to accept and persistdevice_worker_id+agent_kindintoapproved_inputJSONB.watchers/automation.ts—materializeDueWatcherRunsreads the new watchers columns and forwards them throughenqueueWatcherRunForRecord.parseWatcherRunPayloadround-trips the fields too.worker-api.ts(pollWorkerJob) — adds a parallel CTE branch claimingrun_type='watcher' AND approved_input->>'device_worker_id' = <this device's id>. Returns a watcher payload envelope{ watcher, event, context }for those rows (no connector code, no credentials, no compiled_code lookup).worker-api.ts(newcompleteWatcherRun) —POST /api/workers/me/runs/:runId/complete-watcher:authorizeRunForWorker(status='running' + claimed_by === worker_id)runs.statusto completed/failed with exit code/signal/reasonwatcher_windowsrow withmodel_used='device-cli'andextracted_data.kind='device_cli_output'watchers.last_fired_atwatcher:updatedlifecycle event for dashboard metric_seriesindex.ts— wire the route + whitelist the new path in the user-scoped worker allowlist (was previously 403'd for everything under/api/workers/me/...outside auth-profiles/feeds).Tests
device_worker_id+agent_kindtoapproved_input/complete-watcher→runs.status='completed',watcher_windowsrow with output +execution_time_ms,last_fired_atadvancederrorsupplied →runs.status='failed', no window row,exit_coderecordedTest plan
make typecheck(already passes locally)make build-packages(already passes locally)automation-contractsuite covers the new behavior (DB-required, skipped locally)device_worker_idset against a local Mac running Owletto withclaudein PATHOut of scope
ClaudeCodeExecutor— that's PR-2 inlobu-ai/owlettodevice_worker_id(separate web PR)min_cooldown_secondsonlyagent_kindis a hint, Owletto's dispatcher choosesSummary by CodeRabbit
New Features
Bug Fixes
Tests
Chores