diff --git a/db/migrations/20260517060000_watcher_schema_additions.sql b/db/migrations/20260517060000_watcher_schema_additions.sql new file mode 100644 index 000000000..1ca536903 --- /dev/null +++ b/db/migrations/20260517060000_watcher_schema_additions.sql @@ -0,0 +1,78 @@ +-- migrate:up + +-- Adds the columns needed by the device-aware watcher dispatcher (see +-- lobu-ai/lobu#798 / #799): +-- +-- * device_worker_id — pin a watcher to a specific device worker +-- (when its inputs live on that device). +-- * agent_kind — overrides the owning agent's default kind for +-- this watcher (e.g. "background", "notifier"). +-- * notification_channel — where firings surface: canvas, OS notification, +-- or both. +-- * notification_priority — priority class used by the dispatcher's +-- interrupt budget. +-- * min_cooldown_seconds — minimum seconds between two firings of the +-- same watcher (0 = no cooldown). +-- * last_fired_at — last time this watcher actually dispatched +-- a notification/canvas item. +-- +-- Also adds device_workers.notification_budget_per_day for the per-device +-- global interrupt budget. 10/day is a placeholder default; tune later. + +ALTER TABLE public.watchers + ADD COLUMN device_worker_id uuid REFERENCES public.device_workers(id), + ADD COLUMN agent_kind text, + ADD COLUMN notification_channel text NOT NULL DEFAULT 'canvas', + ADD COLUMN notification_priority text NOT NULL DEFAULT 'normal', + ADD COLUMN min_cooldown_seconds integer NOT NULL DEFAULT 0, + ADD COLUMN last_fired_at timestamp with time zone; + +ALTER TABLE public.watchers + ADD CONSTRAINT watchers_notification_channel_check + CHECK (notification_channel IN ('canvas', 'notification', 'both')); + +ALTER TABLE public.watchers + ADD CONSTRAINT watchers_notification_priority_check + CHECK (notification_priority IN ('low', 'normal', 'high')); + +ALTER TABLE public.watchers + ADD CONSTRAINT watchers_min_cooldown_seconds_nonneg + CHECK (min_cooldown_seconds >= 0); + +CREATE INDEX IF NOT EXISTS idx_watchers_device_worker_id + ON public.watchers (device_worker_id) + WHERE device_worker_id IS NOT NULL; + +ALTER TABLE public.device_workers + ADD COLUMN notification_budget_per_day integer NOT NULL DEFAULT 10; + +ALTER TABLE public.device_workers + ADD CONSTRAINT device_workers_notification_budget_per_day_nonneg + CHECK (notification_budget_per_day >= 0); + +-- migrate:down + +ALTER TABLE public.device_workers + DROP CONSTRAINT IF EXISTS device_workers_notification_budget_per_day_nonneg; + +ALTER TABLE public.device_workers + DROP COLUMN IF EXISTS notification_budget_per_day; + +DROP INDEX IF EXISTS public.idx_watchers_device_worker_id; + +ALTER TABLE public.watchers + DROP CONSTRAINT IF EXISTS watchers_min_cooldown_seconds_nonneg; + +ALTER TABLE public.watchers + DROP CONSTRAINT IF EXISTS watchers_notification_priority_check; + +ALTER TABLE public.watchers + DROP CONSTRAINT IF EXISTS watchers_notification_channel_check; + +ALTER TABLE public.watchers + DROP COLUMN IF EXISTS last_fired_at, + DROP COLUMN IF EXISTS min_cooldown_seconds, + DROP COLUMN IF EXISTS notification_priority, + DROP COLUMN IF EXISTS notification_channel, + DROP COLUMN IF EXISTS agent_kind, + DROP COLUMN IF EXISTS device_worker_id; diff --git a/db/schema.sql b/db/schema.sql index 7bea93d5e..88fe03247 100644 --- a/db/schema.sql +++ b/db/schema.sql @@ -626,7 +626,9 @@ CREATE TABLE public.device_workers ( first_seen_at timestamp with time zone DEFAULT now() NOT NULL, last_seen_at timestamp with time zone DEFAULT now() NOT NULL, id uuid DEFAULT gen_random_uuid() NOT NULL, - organization_id text + organization_id text, + notification_budget_per_day integer DEFAULT 10 NOT NULL, + CONSTRAINT device_workers_notification_budget_per_day_nonneg CHECK ((notification_budget_per_day >= 0)) ); -- @@ -2002,7 +2004,16 @@ CREATE TABLE public.watchers ( scheduler_client_id text, source_watcher_id integer, watcher_group_id integer NOT NULL, - CONSTRAINT insights_status_check CHECK ((status = ANY (ARRAY['active'::text, 'archived'::text]))) + device_worker_id uuid, + agent_kind text, + notification_channel text DEFAULT 'canvas'::text NOT NULL, + notification_priority text DEFAULT 'normal'::text NOT NULL, + min_cooldown_seconds integer DEFAULT 0 NOT NULL, + last_fired_at timestamp with time zone, + CONSTRAINT insights_status_check CHECK ((status = ANY (ARRAY['active'::text, 'archived'::text]))), + CONSTRAINT watchers_min_cooldown_seconds_nonneg CHECK ((min_cooldown_seconds >= 0)), + CONSTRAINT watchers_notification_channel_check CHECK ((notification_channel = ANY (ARRAY['canvas'::text, 'notification'::text, 'both'::text]))), + CONSTRAINT watchers_notification_priority_check CHECK ((notification_priority = ANY (ARRAY['low'::text, 'normal'::text, 'high'::text]))) ); -- @@ -3800,6 +3811,12 @@ CREATE INDEX idx_watchers_connection_id ON public.watchers USING btree (connecti CREATE INDEX idx_watchers_created_by ON public.watchers USING btree (created_by); +-- +-- Name: idx_watchers_device_worker_id; Type: INDEX; Schema: public; Owner: - +-- + +CREATE INDEX idx_watchers_device_worker_id ON public.watchers USING btree (device_worker_id) WHERE (device_worker_id IS NOT NULL); + -- -- Name: idx_watchers_entity_ids; Type: INDEX; Schema: public; Owner: - -- @@ -4919,6 +4936,13 @@ ALTER TABLE ONLY public.watcher_windows ALTER TABLE ONLY public.watchers ADD CONSTRAINT watchers_current_version_id_fkey FOREIGN KEY (current_version_id) REFERENCES public.watcher_versions(id) ON DELETE SET NULL; +-- +-- Name: watchers watchers_device_worker_id_fkey; Type: FK CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.watchers + ADD CONSTRAINT watchers_device_worker_id_fkey FOREIGN KEY (device_worker_id) REFERENCES public.device_workers(id); + -- -- Name: watchers watchers_organization_id_fkey; Type: FK CONSTRAINT; Schema: public; Owner: - -- @@ -5003,4 +5027,5 @@ INSERT INTO public.schema_migrations (version) VALUES ('20260517020000'), ('20260517030000'), ('20260517040000'), - ('20260517050000'); + ('20260517050000'), + ('20260517060000'); diff --git a/packages/server/src/db/embedded-schema-patches.ts b/packages/server/src/db/embedded-schema-patches.ts index 02a49ec28..2658eea4a 100644 --- a/packages/server/src/db/embedded-schema-patches.ts +++ b/packages/server/src/db/embedded-schema-patches.ts @@ -697,4 +697,122 @@ export const EMBEDDED_SCHEMA_PATCHES: EmbeddedSchemaPatch[] = [ `); }, }, + { + // Mirrors db/migrations/20260517060000_watcher_schema_additions.sql. + // Adds dispatcher-related columns to watchers (device_worker_id, + // agent_kind, notification_channel, notification_priority, + // min_cooldown_seconds, last_fired_at) plus the per-device daily + // notification budget on device_workers. Idempotent for replay on + // already-initialised embedded/PGlite databases — each ADD COLUMN is + // wrapped in a duplicate_column-tolerant DO block; constraint and index + // creation is gated on pg_constraint / IF NOT EXISTS. + id: 'watcher-schema-additions', + apply: async (sql) => { + const watcherColumns: Array<{ name: string; ddl: string }> = [ + { + name: 'device_worker_id', + ddl: `ALTER TABLE public.watchers + ADD COLUMN device_worker_id uuid REFERENCES public.device_workers(id)`, + }, + { + name: 'agent_kind', + ddl: `ALTER TABLE public.watchers ADD COLUMN agent_kind text`, + }, + { + name: 'notification_channel', + ddl: `ALTER TABLE public.watchers + ADD COLUMN notification_channel text NOT NULL DEFAULT 'canvas'`, + }, + { + name: 'notification_priority', + ddl: `ALTER TABLE public.watchers + ADD COLUMN notification_priority text NOT NULL DEFAULT 'normal'`, + }, + { + name: 'min_cooldown_seconds', + ddl: `ALTER TABLE public.watchers + ADD COLUMN min_cooldown_seconds integer NOT NULL DEFAULT 0`, + }, + { + name: 'last_fired_at', + ddl: `ALTER TABLE public.watchers + ADD COLUMN last_fired_at timestamp with time zone`, + }, + ]; + for (const col of watcherColumns) { + await sql.unsafe(` + DO $$ + BEGIN + ${col.ddl}; + EXCEPTION WHEN duplicate_column THEN NULL; + END $$; + `); + } + + await sql.unsafe(` + DO $$ + BEGIN + IF NOT EXISTS ( + SELECT 1 FROM pg_constraint WHERE conname = 'watchers_notification_channel_check' + ) THEN + ALTER TABLE public.watchers + ADD CONSTRAINT watchers_notification_channel_check + CHECK (notification_channel IN ('canvas', 'notification', 'both')); + END IF; + END $$; + `); + await sql.unsafe(` + DO $$ + BEGIN + IF NOT EXISTS ( + SELECT 1 FROM pg_constraint WHERE conname = 'watchers_notification_priority_check' + ) THEN + ALTER TABLE public.watchers + ADD CONSTRAINT watchers_notification_priority_check + CHECK (notification_priority IN ('low', 'normal', 'high')); + END IF; + END $$; + `); + await sql.unsafe(` + DO $$ + BEGIN + IF NOT EXISTS ( + SELECT 1 FROM pg_constraint WHERE conname = 'watchers_min_cooldown_seconds_nonneg' + ) THEN + ALTER TABLE public.watchers + ADD CONSTRAINT watchers_min_cooldown_seconds_nonneg + CHECK (min_cooldown_seconds >= 0); + END IF; + END $$; + `); + + await sql.unsafe(` + CREATE INDEX IF NOT EXISTS idx_watchers_device_worker_id + ON public.watchers (device_worker_id) + WHERE device_worker_id IS NOT NULL + `); + + await sql.unsafe(` + DO $$ + BEGIN + ALTER TABLE public.device_workers + ADD COLUMN notification_budget_per_day integer NOT NULL DEFAULT 10; + EXCEPTION WHEN duplicate_column THEN NULL; + END $$; + `); + await sql.unsafe(` + DO $$ + BEGIN + IF NOT EXISTS ( + SELECT 1 FROM pg_constraint + WHERE conname = 'device_workers_notification_budget_per_day_nonneg' + ) THEN + ALTER TABLE public.device_workers + ADD CONSTRAINT device_workers_notification_budget_per_day_nonneg + CHECK (notification_budget_per_day >= 0); + END IF; + END $$; + `); + }, + }, ]; diff --git a/packages/server/src/tools/admin/manage_watchers.ts b/packages/server/src/tools/admin/manage_watchers.ts index 2b1d0ccbd..f105561b9 100644 --- a/packages/server/src/tools/admin/manage_watchers.ts +++ b/packages/server/src/tools/admin/manage_watchers.ts @@ -363,6 +363,47 @@ export const ManageWatchersSchema = Type.Object({ '[create/update/create_version] Optional MCP client ID that should auto-run this watcher.', }) ), + device_worker_id: Type.Optional( + Type.Union([Type.String(), Type.Null()], { + description: + '[create/update] Optional device worker UUID to pin this watcher to (when its inputs live on that device). Null clears the pin.', + }) + ), + agent_kind: Type.Optional( + Type.Union([Type.String(), Type.Null()], { + description: + '[create/update] Optional agent kind override for this watcher (e.g. "background", "notifier"). Null clears the override.', + }) + ), + notification_channel: Type.Optional( + Type.Union( + [ + Type.Literal('canvas'), + Type.Literal('notification'), + Type.Literal('both'), + ], + { + description: + '[create/update] Where firings surface: "canvas" (default), "notification" (OS notification), or "both".', + } + ) + ), + notification_priority: Type.Optional( + Type.Union( + [Type.Literal('low'), Type.Literal('normal'), Type.Literal('high')], + { + description: + '[create/update] Priority class used by the dispatcher interrupt budget. Default "normal".', + } + ) + ), + min_cooldown_seconds: Type.Optional( + Type.Number({ + description: + '[create/update] Minimum seconds between two firings of this watcher (0 = no cooldown).', + minimum: 0, + }) + ), model_config: Type.Optional(Type.Any({ description: '[create/update] AI model configuration' })), tags: Type.Optional(Type.Array(Type.String(), { description: '[create] Tags for filtering' })), @@ -1012,7 +1053,9 @@ async function handleCreate( id, name, slug, organization_id, entity_ids, schedule, next_run_at, agent_id, scheduler_client_id, model_config, sources, version, current_version_id, tags, status, created_by, created_at, updated_at, - watcher_group_id + watcher_group_id, + device_worker_id, agent_kind, + notification_channel, notification_priority, min_cooldown_seconds ) VALUES ( ${watcherId}, ${args.name ?? args.slug}, ${args.slug}, ${organizationId}, ${`{${entityIdsArray.join(',')}}`}::bigint[], @@ -1021,7 +1064,11 @@ async function handleCreate( ${sql.json(args.model_config || {})}, ${sql.json(sources)}, 1, NULL, ${toTextArrayParam(args.tags || [])}::text[], 'active', ${createdBy}, NOW(), NOW(), - ${watcherId} + ${watcherId}, + ${args.device_worker_id ?? null}, ${args.agent_kind ?? null}, + ${args.notification_channel ?? 'canvas'}, + ${args.notification_priority ?? 'normal'}, + ${args.min_cooldown_seconds ?? 0} ) `; @@ -1262,6 +1309,11 @@ async function handleUpdate( if (args.agent_id !== undefined) updatedFields.push('agent_id'); if (args.scheduler_client_id !== undefined) updatedFields.push('scheduler_client_id'); if (args.tags !== undefined) updatedFields.push('tags'); + if (args.device_worker_id !== undefined) updatedFields.push('device_worker_id'); + if (args.agent_kind !== undefined) updatedFields.push('agent_kind'); + if (args.notification_channel !== undefined) updatedFields.push('notification_channel'); + if (args.notification_priority !== undefined) updatedFields.push('notification_priority'); + if (args.min_cooldown_seconds !== undefined) updatedFields.push('min_cooldown_seconds'); if (updatedFields.length === 0) { return { @@ -1282,7 +1334,12 @@ async function handleUpdate( next_run_at = CASE WHEN ${args.schedule !== undefined} THEN ${nextRunAtVal}::timestamptz ELSE next_run_at END, agent_id = CASE WHEN ${args.agent_id !== undefined} THEN ${args.agent_id ?? null} ELSE agent_id END, scheduler_client_id = CASE WHEN ${args.scheduler_client_id !== undefined} THEN ${args.scheduler_client_id ?? null} ELSE scheduler_client_id END, - tags = CASE WHEN ${args.tags !== undefined} THEN ${toTextArrayParam(args.tags || [])}::text[] ELSE tags END + tags = CASE WHEN ${args.tags !== undefined} THEN ${toTextArrayParam(args.tags || [])}::text[] ELSE tags END, + device_worker_id = CASE WHEN ${args.device_worker_id !== undefined} THEN ${args.device_worker_id ?? null}::uuid ELSE device_worker_id END, + agent_kind = CASE WHEN ${args.agent_kind !== undefined} THEN ${args.agent_kind ?? null} ELSE agent_kind END, + notification_channel = CASE WHEN ${args.notification_channel !== undefined} THEN ${args.notification_channel ?? 'canvas'} ELSE notification_channel END, + notification_priority = CASE WHEN ${args.notification_priority !== undefined} THEN ${args.notification_priority ?? 'normal'} ELSE notification_priority END, + min_cooldown_seconds = CASE WHEN ${args.min_cooldown_seconds !== undefined} THEN ${args.min_cooldown_seconds ?? 0} ELSE min_cooldown_seconds END WHERE id = ${args.watcher_id} `;