feat(schema): watchers — device_worker_id, agent_kind, notification, cooldown columns#811

Merged
buremba merged 1 commit into main from feat/watcher-schema-additions on May 17, 2026
Conversation

@buremba
Member

@buremba buremba commented May 17, 2026

Summary

Adds the columns needed for the device-aware watcher dispatcher work in #798, per the schema spec in #799.

watchers additions

| Column | Type | Default | Notes |
| --- | --- | --- | --- |
| device_worker_id | uuid → device_workers(id) | NULL | Pin the watcher to a specific device worker (when its inputs live on that device). FK is ON DELETE NO ACTION, so the dispatcher must decide whether to clear or reroute orphaned pins. |
| agent_kind | text | NULL | Optional override of the owning agent's default kind (background, notifier, …). |
| notification_channel | text NOT NULL | 'canvas' | Where firings surface. CHECK IN ('canvas','notification','both'). |
| notification_priority | text NOT NULL | 'normal' | Priority class used by the interrupt budget. CHECK IN ('low','normal','high'). |
| min_cooldown_seconds | integer NOT NULL | 0 | Minimum seconds between two firings. CHECK >= 0. |
| last_fired_at | timestamptz | NULL | Last time the watcher actually dispatched a notification/canvas item. |

Plus CREATE INDEX idx_watchers_device_worker_id ON watchers (device_worker_id) WHERE device_worker_id IS NOT NULL so the dispatcher's "watchers pinned to device X" query stays cheap.
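
As a reading aid, the constraints listed in the table above can be mirrored in TypeScript. This is a minimal sketch and not code from the PR: `WatcherNotificationFields`, `defaultWatcherFields`, `validateWatcherFields`, and `isCooldownElapsed` are hypothetical names, and the cooldown comparison is only one assumed way a future dispatcher might combine `min_cooldown_seconds` with `last_fired_at`.

```typescript
// Hypothetical mirror of the new watchers columns; field names follow the table above.
type NotificationChannel = "canvas" | "notification" | "both";
type NotificationPriority = "low" | "normal" | "high";

interface WatcherNotificationFields {
  device_worker_id: string | null;
  agent_kind: string | null;
  notification_channel: NotificationChannel;
  notification_priority: NotificationPriority;
  min_cooldown_seconds: number;
  last_fired_at: Date | null;
}

const CHANNELS: readonly string[] = ["canvas", "notification", "both"];
const PRIORITIES: readonly string[] = ["low", "normal", "high"];

// Column defaults as listed in the PR description.
function defaultWatcherFields(): WatcherNotificationFields {
  return {
    device_worker_id: null,
    agent_kind: null,
    notification_channel: "canvas",
    notification_priority: "normal",
    min_cooldown_seconds: 0,
    last_fired_at: null,
  };
}

// Returns the names of columns whose CHECK constraint would be violated
// (an empty array means the values would pass all three CHECKs).
function validateWatcherFields(row: {
  notification_channel: string;
  notification_priority: string;
  min_cooldown_seconds: number;
}): string[] {
  const violations: string[] = [];
  if (!CHANNELS.includes(row.notification_channel)) violations.push("notification_channel");
  if (!PRIORITIES.includes(row.notification_priority)) violations.push("notification_priority");
  if (!Number.isInteger(row.min_cooldown_seconds) || row.min_cooldown_seconds < 0) {
    violations.push("min_cooldown_seconds");
  }
  return violations;
}

// Assumed dispatcher rule (not specified in this PR): a watcher may fire if it
// has never fired, or if at least min_cooldown_seconds have passed since last_fired_at.
function isCooldownElapsed(
  row: Pick<WatcherNotificationFields, "min_cooldown_seconds" | "last_fired_at">,
  now: Date,
): boolean {
  if (row.last_fired_at === null) return true;
  const elapsedMs = now.getTime() - row.last_fired_at.getTime();
  return elapsedMs >= row.min_cooldown_seconds * 1000;
}
```

With the default `min_cooldown_seconds` of 0, `isCooldownElapsed` allows every firing, which matches the description's goal that existing watchers keep their current behavior.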

device_workers additions

  • notification_budget_per_day integer NOT NULL DEFAULT 10 with CHECK >= 0 — the per-device daily global interrupt budget. 10/day is a placeholder default; tune once the dispatcher exists.

Where it ships

  • db/migrations/20260517060000_watcher_schema_additions.sql — forward + down for prod / existing dev databases.
  • EMBEDDED_SCHEMA_PATCHES['watcher-schema-additions'] — idempotent mirror for already-initialised PGlite installs (duplicate_column-tolerant + IF NOT EXISTS everywhere).
  • db/schema.sql regenerated via dbmate dump + scripts/normalize-schema.sh; schema_migrations.version length restored to varchar(128) (local pg_dump strips it).
  • manage_watchers accepts the new fields with TypeBox enum / non-negative validation on the create + update paths, and the INSERT/UPDATE SQL persists them. create_from_version inherits DB defaults — fine for the spec's "existing CRUD continues to work when new columns are at defaults".
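
The "duplicate_column-tolerant + IF NOT EXISTS everywhere" approach of the embedded patch can be sketched as a pair of SQL generators. This is an illustration under assumptions, not the contents of `embedded-schema-patches.ts`: the helper names `addColumnIdempotent` and `addCheckIdempotent` and the constraint name used in the usage example are hypothetical.

```typescript
// Hypothetical helper: wraps ADD COLUMN in a DO block that swallows
// duplicate_column, so replaying the patch on an already-patched database is a no-op.
// Inputs are assumed to be static constants, never user-supplied strings.
function addColumnIdempotent(table: string, columnDef: string): string {
  return [
    "DO $$ BEGIN",
    `  ALTER TABLE ${table} ADD COLUMN ${columnDef};`,
    "EXCEPTION WHEN duplicate_column THEN NULL;",
    "END $$;",
  ].join("\n");
}

// Hypothetical helper: adds a CHECK constraint only when pg_constraint
// has no constraint with that name yet.
function addCheckIdempotent(table: string, constraintName: string, expr: string): string {
  return [
    "DO $$ BEGIN",
    `  IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = '${constraintName}') THEN`,
    `    ALTER TABLE ${table} ADD CONSTRAINT ${constraintName} CHECK (${expr});`,
    "  END IF;",
    "END $$;",
  ].join("\n");
}

// The partial index from the PR description, with an IF NOT EXISTS guard.
const createPinnedIndex =
  "CREATE INDEX IF NOT EXISTS idx_watchers_device_worker_id " +
  "ON watchers (device_worker_id) WHERE device_worker_id IS NOT NULL;";

// Example statements for one column; the constraint name is made up for illustration.
const examplePatch: string[] = [
  addColumnIdempotent("watchers", "min_cooldown_seconds integer NOT NULL DEFAULT 0"),
  addCheckIdempotent("watchers", "watchers_min_cooldown_seconds_check", "min_cooldown_seconds >= 0"),
  createPinnedIndex,
];
```

Because every statement tolerates the "already applied" case, running `examplePatch` several times in a row should leave the schema unchanged after the first pass, which is the property the Verification section checks by applying the real patch three times.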

Verification

  • make build-packages + bun run typecheck clean.
  • Fresh dbmate apply: dbmate up ran clean on a new local Postgres; dbmate rollback then dbmate up round-trips cleanly.
  • Fresh PGlite boot (start-local.bundle.mjs with LOBU_ALLOW_EPHEMERAL_ENCRYPTION_KEY=1, isolated LOBU_DATA_DIR): comes up clean, columns + constraints + index present (verified by querying information_schema.columns and pg_constraint).
  • Patch idempotency: applied 3× back-to-back over a minimal watchers + device_workers skeleton — no errors, final shape stable.
  • A second boot against the same data dir trips a pre-existing, unrelated bug in the scheduled-jobs embedded patch on main (it re-adds a single-column FK after the per-org PK swap dropped the unique constraint that backed it). Reproduces on main@a88d8401 without these changes, so it's out of scope for this PR.

Follow-ups

Closes #799

Summary by CodeRabbit

  • New Features
    • Watchers now support configurable notification channels and priority levels.
    • Added cooldown period settings to control notification frequency.
    • Device workers now have daily notification budget limits.
    • Enhanced watcher management with device worker assignment capabilities.

…cooldown columns

Adds the columns needed for the device-aware watcher dispatcher (#798):

* watchers.device_worker_id (uuid → device_workers.id)
* watchers.agent_kind (text, nullable)
* watchers.notification_channel (text NOT NULL DEFAULT 'canvas',
  CHECK in (canvas, notification, both))
* watchers.notification_priority (text NOT NULL DEFAULT 'normal',
  CHECK in (low, normal, high))
* watchers.min_cooldown_seconds (integer NOT NULL DEFAULT 0, CHECK >= 0)
* watchers.last_fired_at (timestamptz, nullable)
* device_workers.notification_budget_per_day (integer NOT NULL DEFAULT 10,
  CHECK >= 0) for the global per-device interrupt budget
* idx_watchers_device_worker_id partial index

Ships as both a dbmate migration (db/migrations/20260517060000_*) and the
matching idempotent embedded-schema patch (watcher-schema-additions) so
fresh PGlite installs and already-initialised embedded databases converge
to the same shape. manage_watchers accepts and persists the new fields,
with enum/non-negative validation enforced via TypeBox at the tool layer.

Closes #799
@coderabbitai

coderabbitai Bot commented May 17, 2026

Caution

Review failed

The pull request is closed.

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro Plus

Run ID: 08341a96-404c-497e-b6c7-38295ac024bb

📥 Commits

Reviewing files that changed from the base of the PR and between fdbfc27 and 98378bd.

📒 Files selected for processing (4)
  • db/migrations/20260517060000_watcher_schema_additions.sql
  • db/schema.sql
  • packages/server/src/db/embedded-schema-patches.ts
  • packages/server/src/tools/admin/manage_watchers.ts

📝 Walkthrough

This PR extends the watchers table with device-worker routing and notification configuration. It adds a device_worker_id foreign key, agent_kind field, notification channel and priority enums, per-watcher cooldown timing, and daily budget tracking on device workers. Changes span database migrations, schema definition, embedded-database patches, and admin tooling.

Changes

Watcher notification and device-worker routing

| Layer / File(s) | Summary |
| --- | --- |
| Database schema migration and definition (db/migrations/20260517060000_watcher_schema_additions.sql, db/schema.sql) | Migration adds/removes device routing and notification columns on watchers; schema extends both watchers (device_worker_id FK, agent_kind, notification_channel, notification_priority, min_cooldown_seconds, last_fired_at) and device_workers (notification_budget_per_day). Includes constraints on notification enums and non-negative cooldown/budget values, a partial index on device_worker_id, and migration version seed. |
| Embedded database schema patching (packages/server/src/db/embedded-schema-patches.ts) | Idempotent watcher-schema-additions patch for PGlite databases. Safely replays column additions using exception-guarded DO blocks, adds check constraints guarded by pg_constraint existence checks, creates index with IF NOT EXISTS guard, and extends device_workers with budget column and constraint. |
| Admin tool schema and persistence integration (packages/server/src/tools/admin/manage_watchers.ts) | ManageWatchersSchema expanded with optional device_worker_id, agent_kind, and notification configuration fields. Create action inserts new columns with type-safe defaults. Update action adds fields to updatedFields list and issues CASE expressions for column updates including notification/cooldown defaulting. |

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Possibly related PRs

  • lobu-ai/lobu#808: Both PRs coordinate device-worker pinning for watcher dispatch; this PR adds the watchers.device_worker_id schema column and admin plumbing, while that PR updates the dispatcher to skip runs when approved_input.device_worker_id is set.

Poem

🐰 ✨ The watchers now know where to go,
To devices and users, a guided flow,
With cooldowns and budgets, they'll never collide,
Notifications arrive at the perfect stride,
Device-aware routing—our watcher guide! 🎯

Warning

There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure.

🔧 ESLint

If the error stems from missing dependencies, add them to the package.json file. For unrecoverable errors (e.g., due to private dependencies), disable the tool in the CodeRabbit configuration.

ESLint skipped: no ESLint configuration detected in root package.json. To enable, add eslint to devDependencies.


@codecov-commenter
Codecov Report

✅ All modified and coverable lines are covered by tests.

@buremba buremba merged commit 76aaf2d into main May 17, 2026
17 of 20 checks passed
@buremba buremba deleted the feat/watcher-schema-additions branch May 17, 2026 04:20

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 98378bddd4

Comment on lines +366 to +369
device_worker_id: Type.Optional(
  Type.Union([Type.String(), Type.Null()], {
    description:
      '[create/update] Optional device worker UUID to pin this watcher to (when its inputs live on that device). Null clears the pin.',
P1 Badge Propagate watcher device pins into queued runs

When callers set device_worker_id on a watcher, the pin is only stored on watchers; createWatcherRun() still builds runs.approved_input without that field, while the server dispatcher only skips device-bound watcher runs by checking approved_input->>'device_worker_id'. As a result, scheduled or manually triggered runs for a pinned watcher are still claimed and executed by the server-side dispatcher instead of being left for the user's device worker.
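
The gap described in this comment can be made concrete with a small sketch. It assumes a simplified shape for `runs.approved_input`; `PinnedWatcher`, `ApprovedInput`, `buildApprovedInput`, `isDeviceBound`, and the `watcher_id` field are hypothetical names for illustration, not the signatures of `createWatcherRun()` or the dispatcher.

```typescript
// Hypothetical, simplified view of a watcher row with the new pin columns.
interface PinnedWatcher {
  id: number;
  device_worker_id: string | null;
  agent_kind: string | null;
}

// Hypothetical, simplified view of runs.approved_input (stored as JSONB).
interface ApprovedInput {
  watcher_id: number;
  device_worker_id?: string;
  agent_kind?: string;
}

// Copies the pin (and optional agent kind) from the watcher into the run's
// approved_input, so the run carries the routing decision with it.
function buildApprovedInput(watcher: PinnedWatcher): ApprovedInput {
  const input: ApprovedInput = { watcher_id: watcher.id };
  if (watcher.device_worker_id !== null) input.device_worker_id = watcher.device_worker_id;
  if (watcher.agent_kind !== null) input.agent_kind = watcher.agent_kind;
  return input;
}

// Mirrors the server-side check on approved_input->>'device_worker_id':
// a run with that key set is left for the device worker to claim.
function isDeviceBound(input: ApprovedInput): boolean {
  return typeof input.device_worker_id === "string";
}
```

If the run builder omits `device_worker_id`, `isDeviceBound` returns false even for a pinned watcher, which is the failure mode this review comment describes.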

buremba added a commit that referenced this pull request May 17, 2026
* feat(watchers): device-pinned watcher runs end-to-end (#798 PR-1)

The schema work in #811 added watchers.device_worker_id + agent_kind +
notification + cooldown columns, and #808 made the server-side
dispatcher skip runs already pinned to a device. This PR wires the
pinning + claim + completion sides so a user's Lobu Mac app can
actually execute the run via a local CLI agent (Claude Code, etc.).

Server-side:

  1. `materializeDueWatcherRuns` now reads `watchers.device_worker_id`
     and `watchers.agent_kind` and persists both into the run's
     `approved_input` JSONB. The existing #802 dispatcher exclusion
     keys off `approved_input->>'device_worker_id'`, so device-pinned
     rows stay `pending` on the server side.

  2. `/api/workers/poll` gains a parallel CTE branch for
     `run_type='watcher'` AND
     `approved_input->>'device_worker_id' = <this device's id>`. The
     poll response is short-circuited for watchers — no connector
     code, no credentials, no compiled_code lookup. It returns a
     watcher payload envelope `{ watcher, event, context }` that the
     device-side dispatcher uses to build a CLI prompt.

  3. New endpoint `POST /api/workers/me/runs/:runId/complete-watcher`:
     authorizes via the existing claim-ownership gate
     (`authorizeRunForWorker`), writes a `watcher_windows` row with
     `model_used='device-cli'` and `extracted_data.kind='device_cli_output'`,
     updates `runs.status` to completed/failed, advances
     `watchers.last_fired_at`, and emits a `watcher:updated` lifecycle
     event so dashboard metric_series picks up the run.

  4. The new path is whitelisted in the user-scoped worker route gate
     (was previously 403 for `/api/workers/me/...` outside
     auth-profiles/feeds).

Tests:

  - materializeDueWatcherRuns persists device_worker_id + agent_kind.
  - End-to-end complete-watcher → runs.completed, watcher_windows
    row with the CLI output, last_fired_at advanced.
  - Failure path: error supplied → runs.failed, no window row.
  - 409 for non-watcher run types, 404 for unknown run ids.

PR-2 (Owletto Mac app dispatcher + ClaudeCodeExecutor) consumes this
contract.

* fix(watchers): close pi review on device-side complete-watcher (#814)

5 issues from pi's review of #814's WatcherDispatcher / completeWatcherRun
that all materialise on real device traffic:

1. (BLOCKER) Schedule advancement mismatch — completing a device watcher
   moved `last_fired_at` forward but never bumped `next_run_at`, so the
   scheduler tick re-materialised the same watcher every minute forever.
   Extract `advanceWatcherSchedule(sql|tx, watcherId)` from automation.ts
   (was `advanceWatcherScheduleAfterTerminalFailure`) and call it from
   both manage_watchers(action="complete_window") and the device
   complete-watcher endpoint after the in-transaction completion writes.

2. Device-identity binding — `claimed_by === body.worker_id` alone is
   spoofable across devices that share a user OAuth token: any other
   device with the token could claim a run under any worker_id, then a
   third caller with the same token could complete it. For user-scoped
   workers we now resolve the caller's `device_workers.id` from
   `(workerUserId, body.worker_id)` and require it to equal
   `approved_input.device_worker_id` (which materializeDueWatcherRuns
   already snapshotted from the watcher pin). Mismatch → 403.

3. Completion race — two concurrent POSTs both passed the unlocked
   `authorizeRunForWorker` status read, both opened a tx, both INSERTed
   a watcher_windows row; the loser's run-UPDATE saw 0 rows and the tx
   committed a phantom window. Now lock `SELECT ... FOR UPDATE` on the
   run row at the top of the tx; if status is no longer 'running',
   return 200 idempotently with `{idempotent: true}`.

4. Window-id allocation — drop the inline `COALESCE(MAX(id), 0) + 1`
   in favour of the codebase's shared `getNextNumericId(tx, 'watcher_windows')`
   helper (whitelisted, same pattern used by manage_watchers).

5. Malformed payload no longer stranded the run — validation that
   throws inside the tx rolls back to `running`, and there's no
   stale-run sweep today. Validate `approved_input.window_start` /
   `window_end` BEFORE the transaction; on bad input mark the run
   `failed` (so next_run_at advances normally) and return 400.
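
The pre-transaction validation in item 5 might look roughly like the sketch below. It is written under assumptions: `parseWindowBounds` and `WindowBounds` are hypothetical names, the bounds are assumed to arrive as timestamp strings, and the ordering check (end not before start) is an added assumption that the commit message does not state.

```typescript
// Result of validating approved_input.window_start / window_end before any
// transaction is opened. On { ok: false } the caller would mark the run
// failed and return 400, as described in item 5.
type WindowBounds =
  | { ok: true; start: Date; end: Date }
  | { ok: false; reason: string };

function parseWindowBounds(approvedInput: Record<string, unknown>): WindowBounds {
  const rawStart = approvedInput["window_start"];
  const rawEnd = approvedInput["window_end"];
  if (typeof rawStart !== "string" || typeof rawEnd !== "string") {
    return { ok: false, reason: "window_start and window_end must be strings" };
  }
  const start = new Date(rawStart);
  const end = new Date(rawEnd);
  // An unparseable string yields an Invalid Date whose getTime() is NaN.
  if (Number.isNaN(start.getTime()) || Number.isNaN(end.getTime())) {
    return { ok: false, reason: "window bounds are not parseable timestamps" };
  }
  // Assumption for this sketch: a window must not end before it starts.
  if (end.getTime() < start.getTime()) {
    return { ok: false, reason: "window_end precedes window_start" };
  }
  return { ok: true, start, end };
}
```

Keeping this check outside the transaction means a bad payload never causes a rollback, so the run can be moved to `failed` in a separate write instead of staying in `running`.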

Tests: three integration tests covering #1, #3, and #5. The user-scoped
spoof test (#2) needs OAuth device-flow setup that isn't wired up in
the current test fixtures — leaving it for a follow-up that lands the
helper alongside other user-scoped worker tests.

* fix(watchers): pi round-2 — bind workerId from auth, race-free id alloc, no double-advance

A) Bound-worker check now reads `c.var.mcpAuthInfo?.workerId` (PAT/OAuth
   token binding), not body.worker_id. Same-user attacker can't complete
   as another registered worker by lying in the payload.

B) `getNextNumericId` takes a per-table `pg_advisory_xact_lock`
   (`hashtext('<table>_id_alloc')`). Inside `completeWatcherRun`'s tx the
   lock is held until commit, so two concurrent completions on different
   watcher runs serialize on allocation instead of racing on MAX(id)+1.

C) Validation-failure path only advances the schedule when the
   `UPDATE … WHERE status='running'` actually matched a row (RETURNING-gated).
   Two concurrent malformed POSTs no longer double-tick `next_run_at`.
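
Fix B's allocation scheme can be sketched as the two SQL statements a helper would run inside the caller's transaction. This is an illustration only: `nextIdStatements` and `ID_ALLOC_TABLES` are hypothetical names, and the real `getNextNumericId` may differ in its whitelist and query text.

```typescript
// Hypothetical whitelist of tables that use MAX(id)+1 allocation.
const ID_ALLOC_TABLES = ["watcher_windows"] as const;
type IdAllocTable = (typeof ID_ALLOC_TABLES)[number];

function isWhitelisted(table: string): table is IdAllocTable {
  return (ID_ALLOC_TABLES as readonly string[]).includes(table);
}

// Returns the statements to run, in order, inside one transaction:
// first take the per-table advisory lock (held until commit or rollback),
// then compute the next id. Concurrent allocators wait on the lock instead
// of both reading the same MAX(id).
function nextIdStatements(table: string): { lock: string; allocate: string } {
  // The table name is interpolated into SQL, so reject anything not whitelisted.
  if (!isWhitelisted(table)) {
    throw new Error(`table not whitelisted for id allocation: ${table}`);
  }
  return {
    lock: `SELECT pg_advisory_xact_lock(hashtext('${table}_id_alloc'))`,
    allocate: `SELECT COALESCE(MAX(id), 0) + 1 AS next_id FROM ${table}`,
  };
}
```

Because `pg_advisory_xact_lock` is released only when the transaction ends, the second allocator computes `MAX(id)` after the first one's row is committed, which is why the two completions in the Fix B test end up with distinct ids.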

Tests:
- device spoof (Fix A): token bound to worker A, run pinned to worker B,
  body posts worker-B — asserts 403 and run stays running.
- concurrent allocation (Fix B): two completions on different watchers
  fired in parallel — both 200, distinct watcher_windows.id.
- double-advance (Fix C): two malformed POSTs against the same run —
  next_run_at advances only once.
buremba added a commit that referenced this pull request May 17, 2026
#823)

Goals (added in #813) had no behavior of their own — just a nullable FK
from watchers.goal_id into a parallel `goals` table plus a parallel CRUD
surface. Agents already encapsulate the watcher-grouping use case via
watchers.agent_id; goals were the redundant layer. The Mac app stopped
using the primitive in lobu-ai/owletto#151, and the primitive never
shipped in a release (v7.0.0 doesn't include it; v7.1.0 is still open).

Removed:
- db/migrations/20260517160000_drop_goals_primitive.sql (drops the
  watchers.goal_id column and the goals table; reversible)
- packages/server/src/db/embedded-schema-patches.ts: drop the
  `goals-primitive` patch entry
- packages/server/src/tools/admin/manage_goals.ts and its registration
- packages/server/src/sandbox/namespaces/goals.ts (client.goals SDK)
  and its method-metadata entries
- goal_id from manage_watchers.ts (create/update/list), get_watchers.ts,
  and WatcherMetadata
- manage_goals from auth/tool-access scope tables
- goals-crud integration test

Untouched: the dispatcher (#814), the watcher schema additions from
#811, and packages/owletto (separate submodule).
Development

Successfully merging this pull request may close these issues.

Watcher schema: add device_worker_id, agent_kind, notification opt-in, cooldown columns

2 participants