Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 78 additions & 0 deletions db/migrations/20260517060000_watcher_schema_additions.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
-- migrate:up

-- Adds the columns needed by the device-aware watcher dispatcher (see
-- lobu-ai/lobu#798 / #799):
--
-- * device_worker_id — pin a watcher to a specific device worker
-- (when its inputs live on that device).
-- * agent_kind — overrides the owning agent's default kind for
-- this watcher (e.g. "background", "notifier").
-- * notification_channel — where firings surface: canvas, OS notification,
-- or both.
-- * notification_priority — priority class used by the dispatcher's
-- interrupt budget.
-- * min_cooldown_seconds — minimum seconds between two firings of the
-- same watcher (0 = no cooldown).
-- * last_fired_at — last time this watcher actually dispatched
-- a notification/canvas item.
--
-- Also adds device_workers.notification_budget_per_day for the per-device
-- global interrupt budget. 10/day is a placeholder default; tune later.

ALTER TABLE public.watchers
ADD COLUMN device_worker_id uuid REFERENCES public.device_workers(id),
ADD COLUMN agent_kind text,
ADD COLUMN notification_channel text NOT NULL DEFAULT 'canvas',
ADD COLUMN notification_priority text NOT NULL DEFAULT 'normal',
ADD COLUMN min_cooldown_seconds integer NOT NULL DEFAULT 0,
ADD COLUMN last_fired_at timestamp with time zone;

ALTER TABLE public.watchers
ADD CONSTRAINT watchers_notification_channel_check
CHECK (notification_channel IN ('canvas', 'notification', 'both'));

ALTER TABLE public.watchers
ADD CONSTRAINT watchers_notification_priority_check
CHECK (notification_priority IN ('low', 'normal', 'high'));

ALTER TABLE public.watchers
ADD CONSTRAINT watchers_min_cooldown_seconds_nonneg
CHECK (min_cooldown_seconds >= 0);

CREATE INDEX IF NOT EXISTS idx_watchers_device_worker_id
ON public.watchers (device_worker_id)
WHERE device_worker_id IS NOT NULL;

ALTER TABLE public.device_workers
ADD COLUMN notification_budget_per_day integer NOT NULL DEFAULT 10;

ALTER TABLE public.device_workers
ADD CONSTRAINT device_workers_notification_budget_per_day_nonneg
CHECK (notification_budget_per_day >= 0);

-- migrate:down

ALTER TABLE public.device_workers
DROP CONSTRAINT IF EXISTS device_workers_notification_budget_per_day_nonneg;

ALTER TABLE public.device_workers
DROP COLUMN IF EXISTS notification_budget_per_day;

DROP INDEX IF EXISTS public.idx_watchers_device_worker_id;

ALTER TABLE public.watchers
DROP CONSTRAINT IF EXISTS watchers_min_cooldown_seconds_nonneg;

ALTER TABLE public.watchers
DROP CONSTRAINT IF EXISTS watchers_notification_priority_check;

ALTER TABLE public.watchers
DROP CONSTRAINT IF EXISTS watchers_notification_channel_check;

ALTER TABLE public.watchers
DROP COLUMN IF EXISTS last_fired_at,
DROP COLUMN IF EXISTS min_cooldown_seconds,
DROP COLUMN IF EXISTS notification_priority,
DROP COLUMN IF EXISTS notification_channel,
DROP COLUMN IF EXISTS agent_kind,
DROP COLUMN IF EXISTS device_worker_id;
31 changes: 28 additions & 3 deletions db/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -626,7 +626,9 @@ CREATE TABLE public.device_workers (
first_seen_at timestamp with time zone DEFAULT now() NOT NULL,
last_seen_at timestamp with time zone DEFAULT now() NOT NULL,
id uuid DEFAULT gen_random_uuid() NOT NULL,
organization_id text
organization_id text,
notification_budget_per_day integer DEFAULT 10 NOT NULL,
CONSTRAINT device_workers_notification_budget_per_day_nonneg CHECK ((notification_budget_per_day >= 0))
);

--
Expand Down Expand Up @@ -2002,7 +2004,16 @@ CREATE TABLE public.watchers (
scheduler_client_id text,
source_watcher_id integer,
watcher_group_id integer NOT NULL,
CONSTRAINT insights_status_check CHECK ((status = ANY (ARRAY['active'::text, 'archived'::text])))
device_worker_id uuid,
agent_kind text,
notification_channel text DEFAULT 'canvas'::text NOT NULL,
notification_priority text DEFAULT 'normal'::text NOT NULL,
min_cooldown_seconds integer DEFAULT 0 NOT NULL,
last_fired_at timestamp with time zone,
CONSTRAINT insights_status_check CHECK ((status = ANY (ARRAY['active'::text, 'archived'::text]))),
CONSTRAINT watchers_min_cooldown_seconds_nonneg CHECK ((min_cooldown_seconds >= 0)),
CONSTRAINT watchers_notification_channel_check CHECK ((notification_channel = ANY (ARRAY['canvas'::text, 'notification'::text, 'both'::text]))),
CONSTRAINT watchers_notification_priority_check CHECK ((notification_priority = ANY (ARRAY['low'::text, 'normal'::text, 'high'::text])))
);

--
Expand Down Expand Up @@ -3800,6 +3811,12 @@ CREATE INDEX idx_watchers_connection_id ON public.watchers USING btree (connecti

CREATE INDEX idx_watchers_created_by ON public.watchers USING btree (created_by);

--
-- Name: idx_watchers_device_worker_id; Type: INDEX; Schema: public; Owner: -
--

CREATE INDEX idx_watchers_device_worker_id ON public.watchers USING btree (device_worker_id) WHERE (device_worker_id IS NOT NULL);

--
-- Name: idx_watchers_entity_ids; Type: INDEX; Schema: public; Owner: -
--
Expand Down Expand Up @@ -4919,6 +4936,13 @@ ALTER TABLE ONLY public.watcher_windows
ALTER TABLE ONLY public.watchers
ADD CONSTRAINT watchers_current_version_id_fkey FOREIGN KEY (current_version_id) REFERENCES public.watcher_versions(id) ON DELETE SET NULL;

--
-- Name: watchers watchers_device_worker_id_fkey; Type: FK CONSTRAINT; Schema: public; Owner: -
--

ALTER TABLE ONLY public.watchers
ADD CONSTRAINT watchers_device_worker_id_fkey FOREIGN KEY (device_worker_id) REFERENCES public.device_workers(id);

--
-- Name: watchers watchers_organization_id_fkey; Type: FK CONSTRAINT; Schema: public; Owner: -
--
Expand Down Expand Up @@ -5003,4 +5027,5 @@ INSERT INTO public.schema_migrations (version) VALUES
('20260517020000'),
('20260517030000'),
('20260517040000'),
('20260517050000');
('20260517050000'),
('20260517060000');
118 changes: 118 additions & 0 deletions packages/server/src/db/embedded-schema-patches.ts
Original file line number Diff line number Diff line change
Expand Up @@ -697,4 +697,122 @@ export const EMBEDDED_SCHEMA_PATCHES: EmbeddedSchemaPatch[] = [
`);
},
},
{
// Mirrors db/migrations/20260517060000_watcher_schema_additions.sql.
// Adds dispatcher-related columns to watchers (device_worker_id,
// agent_kind, notification_channel, notification_priority,
// min_cooldown_seconds, last_fired_at) plus the per-device daily
// notification budget on device_workers. Idempotent for replay on
// already-initialised embedded/PGlite databases — each ADD COLUMN is
// wrapped in a duplicate_column-tolerant DO block; constraint and index
// creation is gated on pg_constraint / IF NOT EXISTS.
id: 'watcher-schema-additions',
apply: async (sql) => {
const watcherColumns: Array<{ name: string; ddl: string }> = [
{
name: 'device_worker_id',
ddl: `ALTER TABLE public.watchers
ADD COLUMN device_worker_id uuid REFERENCES public.device_workers(id)`,
},
{
name: 'agent_kind',
ddl: `ALTER TABLE public.watchers ADD COLUMN agent_kind text`,
},
{
name: 'notification_channel',
ddl: `ALTER TABLE public.watchers
ADD COLUMN notification_channel text NOT NULL DEFAULT 'canvas'`,
},
{
name: 'notification_priority',
ddl: `ALTER TABLE public.watchers
ADD COLUMN notification_priority text NOT NULL DEFAULT 'normal'`,
},
{
name: 'min_cooldown_seconds',
ddl: `ALTER TABLE public.watchers
ADD COLUMN min_cooldown_seconds integer NOT NULL DEFAULT 0`,
},
{
name: 'last_fired_at',
ddl: `ALTER TABLE public.watchers
ADD COLUMN last_fired_at timestamp with time zone`,
},
];
for (const col of watcherColumns) {
await sql.unsafe(`
DO $$
BEGIN
${col.ddl};
EXCEPTION WHEN duplicate_column THEN NULL;
END $$;
`);
}

await sql.unsafe(`
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM pg_constraint WHERE conname = 'watchers_notification_channel_check'
) THEN
ALTER TABLE public.watchers
ADD CONSTRAINT watchers_notification_channel_check
CHECK (notification_channel IN ('canvas', 'notification', 'both'));
END IF;
END $$;
`);
await sql.unsafe(`
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM pg_constraint WHERE conname = 'watchers_notification_priority_check'
) THEN
ALTER TABLE public.watchers
ADD CONSTRAINT watchers_notification_priority_check
CHECK (notification_priority IN ('low', 'normal', 'high'));
END IF;
END $$;
`);
await sql.unsafe(`
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM pg_constraint WHERE conname = 'watchers_min_cooldown_seconds_nonneg'
) THEN
ALTER TABLE public.watchers
ADD CONSTRAINT watchers_min_cooldown_seconds_nonneg
CHECK (min_cooldown_seconds >= 0);
END IF;
END $$;
`);

await sql.unsafe(`
CREATE INDEX IF NOT EXISTS idx_watchers_device_worker_id
ON public.watchers (device_worker_id)
WHERE device_worker_id IS NOT NULL
`);

await sql.unsafe(`
DO $$
BEGIN
ALTER TABLE public.device_workers
ADD COLUMN notification_budget_per_day integer NOT NULL DEFAULT 10;
EXCEPTION WHEN duplicate_column THEN NULL;
END $$;
`);
await sql.unsafe(`
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM pg_constraint
WHERE conname = 'device_workers_notification_budget_per_day_nonneg'
) THEN
ALTER TABLE public.device_workers
ADD CONSTRAINT device_workers_notification_budget_per_day_nonneg
CHECK (notification_budget_per_day >= 0);
END IF;
END $$;
`);
},
},
];
63 changes: 60 additions & 3 deletions packages/server/src/tools/admin/manage_watchers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,47 @@ export const ManageWatchersSchema = Type.Object({
'[create/update/create_version] Optional MCP client ID that should auto-run this watcher.',
})
),
device_worker_id: Type.Optional(
Type.Union([Type.String(), Type.Null()], {
description:
'[create/update] Optional device worker UUID to pin this watcher to (when its inputs live on that device). Null clears the pin.',
Comment on lines +366 to +369
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Propagate watcher device pins into queued runs

When callers set device_worker_id on a watcher, the pin is only stored on watchers; createWatcherRun() still builds runs.approved_input without that field, while the server dispatcher only skips device-bound watcher runs by checking approved_input->>'device_worker_id'. As a result, scheduled or manually triggered runs for a pinned watcher are still claimed and executed by the server-side dispatcher instead of being left for the user's device worker.

Useful? React with 👍 / 👎.

})
),
agent_kind: Type.Optional(
Type.Union([Type.String(), Type.Null()], {
description:
'[create/update] Optional agent kind override for this watcher (e.g. "background", "notifier"). Null clears the override.',
})
),
notification_channel: Type.Optional(
Type.Union(
[
Type.Literal('canvas'),
Type.Literal('notification'),
Type.Literal('both'),
],
{
description:
'[create/update] Where firings surface: "canvas" (default), "notification" (OS notification), or "both".',
}
)
),
notification_priority: Type.Optional(
Type.Union(
[Type.Literal('low'), Type.Literal('normal'), Type.Literal('high')],
{
description:
'[create/update] Priority class used by the dispatcher interrupt budget. Default "normal".',
}
)
),
min_cooldown_seconds: Type.Optional(
Type.Number({
description:
'[create/update] Minimum seconds between two firings of this watcher (0 = no cooldown).',
minimum: 0,
})
),
model_config: Type.Optional(Type.Any({ description: '[create/update] AI model configuration' })),
tags: Type.Optional(Type.Array(Type.String(), { description: '[create] Tags for filtering' })),

Expand Down Expand Up @@ -1012,7 +1053,9 @@ async function handleCreate(
id, name, slug, organization_id, entity_ids,
schedule, next_run_at, agent_id, scheduler_client_id, model_config, sources, version,
current_version_id, tags, status, created_by, created_at, updated_at,
watcher_group_id
watcher_group_id,
device_worker_id, agent_kind,
notification_channel, notification_priority, min_cooldown_seconds
) VALUES (
${watcherId}, ${args.name ?? args.slug}, ${args.slug}, ${organizationId},
${`{${entityIdsArray.join(',')}}`}::bigint[],
Expand All @@ -1021,7 +1064,11 @@ async function handleCreate(
${sql.json(args.model_config || {})}, ${sql.json(sources)},
1, NULL, ${toTextArrayParam(args.tags || [])}::text[],
'active', ${createdBy}, NOW(), NOW(),
${watcherId}
${watcherId},
${args.device_worker_id ?? null}, ${args.agent_kind ?? null},
${args.notification_channel ?? 'canvas'},
${args.notification_priority ?? 'normal'},
${args.min_cooldown_seconds ?? 0}
)
`;

Expand Down Expand Up @@ -1262,6 +1309,11 @@ async function handleUpdate(
if (args.agent_id !== undefined) updatedFields.push('agent_id');
if (args.scheduler_client_id !== undefined) updatedFields.push('scheduler_client_id');
if (args.tags !== undefined) updatedFields.push('tags');
if (args.device_worker_id !== undefined) updatedFields.push('device_worker_id');
if (args.agent_kind !== undefined) updatedFields.push('agent_kind');
if (args.notification_channel !== undefined) updatedFields.push('notification_channel');
if (args.notification_priority !== undefined) updatedFields.push('notification_priority');
if (args.min_cooldown_seconds !== undefined) updatedFields.push('min_cooldown_seconds');

if (updatedFields.length === 0) {
return {
Expand All @@ -1282,7 +1334,12 @@ async function handleUpdate(
next_run_at = CASE WHEN ${args.schedule !== undefined} THEN ${nextRunAtVal}::timestamptz ELSE next_run_at END,
agent_id = CASE WHEN ${args.agent_id !== undefined} THEN ${args.agent_id ?? null} ELSE agent_id END,
scheduler_client_id = CASE WHEN ${args.scheduler_client_id !== undefined} THEN ${args.scheduler_client_id ?? null} ELSE scheduler_client_id END,
tags = CASE WHEN ${args.tags !== undefined} THEN ${toTextArrayParam(args.tags || [])}::text[] ELSE tags END
tags = CASE WHEN ${args.tags !== undefined} THEN ${toTextArrayParam(args.tags || [])}::text[] ELSE tags END,
device_worker_id = CASE WHEN ${args.device_worker_id !== undefined} THEN ${args.device_worker_id ?? null}::uuid ELSE device_worker_id END,
agent_kind = CASE WHEN ${args.agent_kind !== undefined} THEN ${args.agent_kind ?? null} ELSE agent_kind END,
notification_channel = CASE WHEN ${args.notification_channel !== undefined} THEN ${args.notification_channel ?? 'canvas'} ELSE notification_channel END,
notification_priority = CASE WHEN ${args.notification_priority !== undefined} THEN ${args.notification_priority ?? 'normal'} ELSE notification_priority END,
min_cooldown_seconds = CASE WHEN ${args.min_cooldown_seconds !== undefined} THEN ${args.min_cooldown_seconds ?? 0} ELSE min_cooldown_seconds END
WHERE id = ${args.watcher_id}
`;

Expand Down
Loading