diff --git a/db/migrations/20260514000000_scheduled_jobs.sql b/db/migrations/20260514000000_scheduled_jobs.sql new file mode 100644 index 000000000..1132bc932 --- /dev/null +++ b/db/migrations/20260514000000_scheduled_jobs.sql @@ -0,0 +1,97 @@ +-- migrate:up + +-- User-driven scheduled jobs. +-- +-- Why a separate table: +-- * `runs` already holds *fired* / *pending-to-fire* rows via +-- scheduler.spawn(). Each scheduled_jobs row is the *definition* of a +-- recurring (or one-shot) schedule — its source of truth. +-- * The ticker (`scheduled-jobs-tick`) scans this table on cron, spawns +-- a runs row per firing via TaskScheduler.spawn, and advances +-- next_run_at from `cron`. If the tick or a firing fails, the next +-- tick re-reads the same row (next_run_at didn't move forward) and +-- retries. Self-healing. +-- * Attribution lives here: who scheduled it (user or agent), what run +-- was the trigger, what event was the trigger. Lets "why did the +-- system act?" become a single JOIN. +-- * Cascade-on-delete: when an agent is deleted, all its schedules +-- evaporate via the FK — no orphan wake-ups firing into the void. + +CREATE TABLE public.scheduled_jobs ( + id uuid PRIMARY KEY DEFAULT gen_random_uuid(), + organization_id text NOT NULL REFERENCES public.organization(id) ON DELETE CASCADE, + + -- What fires + action_type text NOT NULL, -- 'send_notification' | 'wake_agent' | ... + action_args jsonb NOT NULL, -- handler payload + cron text, -- null = one-shot; cron string = recurring + next_run_at timestamp with time zone NOT NULL, + last_fired_at timestamp with time zone, + last_fired_run_id bigint, -- the runs.id from the most recent firing + paused boolean NOT NULL DEFAULT false, + + description text NOT NULL, -- human summary for the UI / audit + + -- Attribution + created_by_user text, -- user that scheduled it (null when agent did) + created_by_agent text, -- agent that scheduled it (null when user did) + source_run_id bigint, -- runs.id that originated the scheduling, if any + source_event_id bigint, -- events.id that originated, if any + source_thread_id text, -- chat-thread context, if any + + created_at timestamp with time zone NOT NULL DEFAULT now(), + updated_at timestamp with time zone NOT NULL DEFAULT now(), + + CONSTRAINT scheduled_jobs_attribution_check CHECK ( + created_by_user IS NOT NULL OR created_by_agent IS NOT NULL + ) +); + +-- Cascade: dropping an agent kills its scheduled jobs (so an agent's +-- wake-ups don't outlive the agent itself). Conditional so the migration +-- works on installs where the agents table doesn't exist yet (very +-- old) — every row already has organization_id which is the harder constraint. +DO $$ +BEGIN + IF EXISTS (SELECT 1 FROM pg_class WHERE relname = 'agents' AND relkind = 'r') THEN + ALTER TABLE public.scheduled_jobs + ADD CONSTRAINT scheduled_jobs_agent_fkey + FOREIGN KEY (created_by_agent) REFERENCES public.agents(id) ON DELETE CASCADE; + END IF; +END$$; + +DO $$ +BEGIN + IF EXISTS (SELECT 1 FROM pg_class WHERE relname = 'runs' AND relkind = 'r') THEN + ALTER TABLE public.scheduled_jobs + ADD CONSTRAINT scheduled_jobs_source_run_fkey + FOREIGN KEY (source_run_id) REFERENCES public.runs(id) ON DELETE SET NULL; + END IF; +END$$; + +DO $$ +BEGIN + IF EXISTS (SELECT 1 FROM pg_class WHERE relname = 'events' AND relkind = 'r') THEN + ALTER TABLE public.scheduled_jobs + ADD CONSTRAINT scheduled_jobs_source_event_fkey + FOREIGN KEY (source_event_id) REFERENCES public.events(id) ON DELETE SET NULL; + END IF; +END$$; + +-- Index: the ticker's hot read. +CREATE INDEX idx_scheduled_jobs_due + ON public.scheduled_jobs (next_run_at) + WHERE NOT paused; + +-- Index: list per-agent / per-user. +CREATE INDEX idx_scheduled_jobs_org_agent + ON public.scheduled_jobs (organization_id, created_by_agent) + WHERE created_by_agent IS NOT NULL; + +CREATE INDEX idx_scheduled_jobs_org_user + ON public.scheduled_jobs (organization_id, created_by_user) + WHERE created_by_user IS NOT NULL; + +-- migrate:down + +DROP TABLE IF EXISTS public.scheduled_jobs; diff --git a/db/schema.sql b/db/schema.sql index ddbc1a3c1..56ff6daed 100644 --- a/db/schema.sql +++ b/db/schema.sql @@ -1530,6 +1530,31 @@ CREATE SEQUENCE public.runs_id_seq ALTER SEQUENCE public.runs_id_seq OWNED BY public.runs.id; +-- +-- Name: scheduled_jobs; Type: TABLE; Schema: public; Owner: - +-- + +CREATE TABLE public.scheduled_jobs ( + id uuid DEFAULT gen_random_uuid() NOT NULL, + organization_id text NOT NULL, + action_type text NOT NULL, + action_args jsonb NOT NULL, + cron text, + next_run_at timestamp with time zone NOT NULL, + last_fired_at timestamp with time zone, + last_fired_run_id bigint, + paused boolean DEFAULT false NOT NULL, + description text NOT NULL, + created_by_user text, + created_by_agent text, + source_run_id bigint, + source_event_id bigint, + source_thread_id text, + created_at timestamp with time zone DEFAULT now() NOT NULL, + updated_at timestamp with time zone DEFAULT now() NOT NULL, + CONSTRAINT scheduled_jobs_attribution_check CHECK (((created_by_user IS NOT NULL) OR (created_by_agent IS NOT NULL))) +); + -- -- Name: schema_migrations; Type: TABLE; Schema: public; Owner: - -- @@ -2609,6 +2634,13 @@ ALTER TABLE ONLY public.rate_limits ALTER TABLE ONLY public.runs ADD CONSTRAINT runs_pkey PRIMARY KEY (id); +-- +-- Name: scheduled_jobs scheduled_jobs_pkey; Type: CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.scheduled_jobs + ADD CONSTRAINT scheduled_jobs_pkey PRIMARY KEY (id); + -- -- Name: schema_migrations schema_migrations_pkey; Type: CONSTRAINT; Schema: public; Owner: - -- @@ -3614,6 +3646,24 @@ CREATE INDEX idx_runs_type ON public.runs USING btree (run_type); CREATE INDEX idx_runs_watcher_id ON public.runs USING btree (watcher_id) WHERE (watcher_id IS NOT NULL); +-- +-- Name: idx_scheduled_jobs_due; Type: INDEX; Schema: public; Owner: - +-- + +CREATE INDEX idx_scheduled_jobs_due ON public.scheduled_jobs USING btree (next_run_at) WHERE (NOT paused); + +-- +-- Name: idx_scheduled_jobs_org_agent; Type: INDEX; Schema: public; Owner: - +-- + +CREATE INDEX idx_scheduled_jobs_org_agent ON public.scheduled_jobs USING btree (organization_id, created_by_agent) WHERE (created_by_agent IS NOT NULL); + +-- +-- Name: idx_scheduled_jobs_org_user; Type: INDEX; Schema: public; Owner: - +-- + +CREATE INDEX idx_scheduled_jobs_org_user ON public.scheduled_jobs USING btree (organization_id, created_by_user) WHERE (created_by_user IS NOT NULL); + -- -- Name: idx_view_template_versions_resource; Type: INDEX; Schema: public; Owner: - -- @@ -4724,6 +4774,34 @@ ALTER TABLE ONLY public.runs ALTER TABLE ONLY public.runs ADD CONSTRAINT runs_window_id_fkey FOREIGN KEY (window_id) REFERENCES public.watcher_windows(id) ON DELETE SET NULL; +-- +-- Name: scheduled_jobs scheduled_jobs_agent_fkey; Type: FK CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.scheduled_jobs + ADD CONSTRAINT scheduled_jobs_agent_fkey FOREIGN KEY (created_by_agent) REFERENCES public.agents(id) ON DELETE CASCADE; + +-- +-- Name: scheduled_jobs scheduled_jobs_organization_id_fkey; Type: FK CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.scheduled_jobs + ADD CONSTRAINT scheduled_jobs_organization_id_fkey FOREIGN KEY (organization_id) REFERENCES public.organization(id) ON DELETE CASCADE; + +-- +-- Name: scheduled_jobs scheduled_jobs_source_event_fkey; Type: FK CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.scheduled_jobs + ADD CONSTRAINT scheduled_jobs_source_event_fkey FOREIGN KEY (source_event_id) REFERENCES public.events(id) ON DELETE SET NULL; + +-- +-- Name: scheduled_jobs scheduled_jobs_source_run_fkey; Type: FK CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.scheduled_jobs + ADD CONSTRAINT scheduled_jobs_source_run_fkey FOREIGN KEY (source_run_id) REFERENCES public.runs(id) ON DELETE SET NULL; + -- -- Name: session session_activeOrganizationId_fkey; Type: FK CONSTRAINT; Schema: public; Owner: - -- @@ -4869,4 +4947,5 @@ INSERT INTO public.schema_migrations (version) VALUES ('20260513000000'), ('20260513120000'), ('20260513150000'), - ('20260513200000'); + ('20260513200000'), + ('20260514000000'); diff --git a/packages/server/src/db/embedded-schema-patches.ts b/packages/server/src/db/embedded-schema-patches.ts index 0f978411e..e432f5bf5 100644 --- a/packages/server/src/db/embedded-schema-patches.ts +++ b/packages/server/src/db/embedded-schema-patches.ts @@ -436,4 +436,59 @@ export const EMBEDDED_SCHEMA_PATCHES: EmbeddedSchemaPatch[] = [ } }, }, + { + // Mirrors db/migrations/20260514000000_scheduled_jobs.sql. + id: 'scheduled-jobs', + apply: async (sql) => { + await sql.unsafe(` + CREATE TABLE IF NOT EXISTS public.scheduled_jobs ( + id uuid PRIMARY KEY DEFAULT gen_random_uuid(), + organization_id text NOT NULL REFERENCES public.organization(id) ON DELETE CASCADE, + action_type text NOT NULL, + action_args jsonb NOT NULL, + cron text, + next_run_at timestamp with time zone NOT NULL, + last_fired_at timestamp with time zone, + last_fired_run_id bigint, + paused boolean NOT NULL DEFAULT false, + description text NOT NULL, + created_by_user text, + created_by_agent text, + source_run_id bigint, + source_event_id bigint, + source_thread_id text, + created_at timestamp with time zone NOT NULL DEFAULT now(), + updated_at timestamp with time zone NOT NULL DEFAULT now(), + CONSTRAINT scheduled_jobs_attribution_check CHECK ( + created_by_user IS NOT NULL OR created_by_agent IS NOT NULL + ) + ) + `); + await sql.unsafe(` + DO $$ + BEGIN + IF EXISTS (SELECT 1 FROM pg_class WHERE relname = 'agents' AND relkind = 'r') + AND NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'scheduled_jobs_agent_fkey') THEN + ALTER TABLE public.scheduled_jobs + ADD CONSTRAINT scheduled_jobs_agent_fkey + FOREIGN KEY (created_by_agent) REFERENCES public.agents(id) ON DELETE CASCADE; + END IF; + END$$; + `); + await sql.unsafe(` + CREATE INDEX IF NOT EXISTS idx_scheduled_jobs_due + ON public.scheduled_jobs (next_run_at) WHERE NOT paused + `); + await sql.unsafe(` + CREATE INDEX IF NOT EXISTS idx_scheduled_jobs_org_agent + ON public.scheduled_jobs (organization_id, created_by_agent) + WHERE created_by_agent IS NOT NULL + `); + await sql.unsafe(` + CREATE INDEX IF NOT EXISTS idx_scheduled_jobs_org_user + ON public.scheduled_jobs (organization_id, created_by_user) + WHERE created_by_user IS NOT NULL + `); + }, + }, ]; diff --git a/packages/server/src/scheduled/jobs.ts b/packages/server/src/scheduled/jobs.ts index bc0894a43..e6868f917 100644 --- a/packages/server/src/scheduled/jobs.ts +++ b/packages/server/src/scheduled/jobs.ts @@ -18,8 +18,15 @@ import { } from '../watchers/automation'; import { checkStalledExecutions } from './check-stalled-executions'; import { runClassificationReconciliation } from './classification-reconciliation'; +import { registerScheduledJobsTicker } from './scheduled-jobs-service'; import { TaskScheduler } from './task-scheduler'; import { triggerEmbedBackfill } from './trigger-embed-backfill'; +import { getDb, pgTextArray } from '../db/client'; +import { createNotificationForUsers } from '../notifications/service'; +import { + createThreadForAgent, + enqueueAgentMessage, +} from '../gateway/services/agent-threads'; /** * Construct the TaskScheduler, register every periodic task, start dispatch, @@ -165,4 +172,131 @@ function registerMaintenanceTasks( }, { cron: '* * * * *' }, ); + + // scheduled_jobs ticker: scans the table every minute, spawns due rows + // as task runs via this same scheduler. The actual firing handlers are + // registered below so spawn() can find them. + registerScheduledJobsTicker(scheduler); + + // Handler: send_notification. Payload mirrors the notify-tool shape; + // resolves recipients to user_ids and inserts events + notification_targets. + scheduler.register('send_notification', async (ctx) => { + const sql = getDb(); + const p = ctx.payload as { + __organization_id?: string; + organization_id?: string; + recipients?: string[] | 'admins' | 'all'; + type?: string; + title?: string; + body?: string | null; + resource_url?: string | null; + }; + const orgId = p.__organization_id ?? p.organization_id; + const title = p.title; + if (!orgId || !title) { + logger.warn({ payload: ctx.payload }, '[task] send_notification missing org or title'); + return; + } + const recipients = p.recipients ?? 'admins'; + let userIds: string[]; + if (Array.isArray(recipients)) { + const rows = await sql<{ userId: string }>` + SELECT "userId" FROM "member" + WHERE "organizationId" = ${orgId} + AND "userId" = ANY(${pgTextArray(recipients)}::text[]) + `; + userIds = rows.map((r) => r.userId); + } else if (recipients === 'all') { + const rows = await sql<{ userId: string }>` + SELECT "userId" FROM "member" + WHERE "organizationId" = ${orgId} + `; + userIds = rows.map((r) => r.userId); + } else { + const rows = await sql<{ userId: string }>` + SELECT "userId" FROM "member" + WHERE "organizationId" = ${orgId} AND role IN ('admin', 'owner') + `; + userIds = rows.map((r) => r.userId); + } + if (userIds.length === 0) return; + await createNotificationForUsers(userIds, { + organizationId: orgId, + type: (p.type as 'agent_message') ?? 'agent_message', + title, + body: p.body ?? null, + resourceUrl: p.resource_url ?? null, + }); + }); + + // Handler: wake_agent. Creates a thread for the agent (or reuses one + // supplied by the caller) and enqueues the prompt as a user message. + // Lets an agent schedule its own follow-up wake-ups via manage_schedules. + scheduler.register('wake_agent', async (ctx) => { + const sql = getDb(); + const p = ctx.payload as { + __organization_id?: string; + __created_by_user?: string | null; + __created_by_agent?: string | null; + __scheduled_job_id?: string; + organization_id?: string; + agent_id?: string; + prompt?: string; + thread_id?: string | null; + reason?: string | null; + }; + const orgId = p.__organization_id ?? p.organization_id; + if (!orgId || !p.agent_id || !p.prompt) { + logger.warn({ payload: ctx.payload }, '[task] wake_agent missing org/agent/prompt'); + return; + } + // Target-agent existence check. The cascade FK on scheduled_jobs only + // covers `created_by_agent` (the *scheduler*'s identity), not the + // *target* of a wake_agent action. If a user scheduled a wake for + // agent X and X was deleted, we'd silently enqueue a message for a + // ghost — so verify the target exists and auto-pause the schedule + // when it doesn't. + const agentRows = (await sql` + SELECT id FROM agents WHERE id = ${p.agent_id} LIMIT 1 + `) as unknown as Array<{ id: string }>; + if (agentRows.length === 0) { + logger.warn( + { scheduled_job_id: p.__scheduled_job_id, agent_id: p.agent_id }, + '[task] wake_agent target agent no longer exists; pausing schedule' + ); + if (p.__scheduled_job_id) { + await sql`UPDATE scheduled_jobs SET paused = true, updated_at = now() WHERE id = ${p.__scheduled_job_id}`; + } + return; + } + const sessionManager = coreServices.getSessionManager(); + const queueProducer = coreServices.getQueueProducer(); + let threadId = p.thread_id ?? null; + if (!threadId) { + const result = await createThreadForAgent( + { sessionManager }, + { + agentId: p.agent_id, + organizationId: orgId, + // The ticker injects the scheduling user under the `__` prefix + // so handler payloads can mix scheduler-controlled metadata with + // user-supplied action_args without collision. Reading from + // p.__created_by_user keeps the wake-up's thread / message + // attribution pointing at whoever scheduled it (not the agent + // itself, which would obscure the audit trail). + createdByUserId: p.__created_by_user ?? undefined, + reason: p.reason ?? 'scheduled-wake', + } + ); + threadId = result.threadId; + } + await enqueueAgentMessage( + { sessionManager, queueProducer }, + { + threadId, + messageText: p.prompt, + source: 'scheduled-job', + } + ); + }); } diff --git a/packages/server/src/scheduled/scheduled-jobs-service.ts b/packages/server/src/scheduled/scheduled-jobs-service.ts new file mode 100644 index 000000000..a3498515d --- /dev/null +++ b/packages/server/src/scheduled/scheduled-jobs-service.ts @@ -0,0 +1,212 @@ +/** + * scheduled_jobs CRUD + ticker. + * + * Model: a `scheduled_jobs` row is the *definition* of a recurring (or + * one-shot) job. The ticker — registered as a TaskScheduler cron at + * `* * * * *` — scans due rows and `scheduler.spawn`s a task per firing. + * The actual handler execution rides on the existing runs-queue, with + * claim/retry/idempotency/observability inherited. + * + * Firing flow: + * 1. Tick claims rows WHERE next_run_at <= now AND NOT paused. + * 2. For each row, spawn(action_type, action_args, { idempotencyKey, runAt: now }). + * 3. Advance last_fired_at + next_run_at (or pause if one-shot completed). + * If the tick crashes between step 2 and 3, the next tick re-reads the + * same row (next_run_at not advanced) and re-spawns — idempotency dedup + * stops duplicates. Self-healing. + */ + +import { getDb } from '../db/client'; +import { nextRunAt as nextCronTickAt } from '../utils/cron'; +import logger from '../utils/logger'; +import type { TaskScheduler } from './task-scheduler'; +import { errorMessage } from '../utils/errors'; + +export interface ScheduledJobRow { + id: string; + organization_id: string; + action_type: string; + action_args: Record; + cron: string | null; + next_run_at: string; + last_fired_at: string | null; + last_fired_run_id: number | null; + paused: boolean; + description: string; + created_by_user: string | null; + created_by_agent: string | null; + source_run_id: number | null; + source_event_id: number | null; + source_thread_id: string | null; + created_at: string; + updated_at: string; +} + +export interface CreateScheduledJobParams { + organizationId: string; + actionType: string; + actionArgs: Record; + description: string; + cron?: string | null; + runAt: Date; + createdByUser?: string | null; + createdByAgent?: string | null; + sourceRunId?: number | null; + sourceEventId?: number | null; + sourceThreadId?: string | null; +} + +export async function createScheduledJob( + params: CreateScheduledJobParams +): Promise { + if (!params.createdByUser && !params.createdByAgent) { + throw new Error('scheduled_jobs requires created_by_user or created_by_agent'); + } + const sql = getDb(); + const rows = (await sql` + INSERT INTO scheduled_jobs ( + organization_id, action_type, action_args, cron, next_run_at, + description, + created_by_user, created_by_agent, + source_run_id, source_event_id, source_thread_id + ) VALUES ( + ${params.organizationId}, ${params.actionType}, + ${sql.json(params.actionArgs)}, ${params.cron ?? null}, ${params.runAt}, + ${params.description}, + ${params.createdByUser ?? null}, ${params.createdByAgent ?? null}, + ${params.sourceRunId ?? null}, ${params.sourceEventId ?? null}, + ${params.sourceThreadId ?? null} + ) + RETURNING * + `) as unknown as ScheduledJobRow[]; + return rows[0]; +} + +export async function listScheduledJobs(opts: { + organizationId: string; + createdByAgent?: string | null; + createdByUser?: string | null; + actionType?: string | null; + includePaused?: boolean; +}): Promise { + const sql = getDb(); + const includePaused = opts.includePaused ?? true; + return (await sql` + SELECT * FROM scheduled_jobs + WHERE organization_id = ${opts.organizationId} + AND (${opts.createdByAgent ?? null}::text IS NULL OR created_by_agent = ${opts.createdByAgent ?? null}) + AND (${opts.createdByUser ?? null}::text IS NULL OR created_by_user = ${opts.createdByUser ?? null}) + AND (${opts.actionType ?? null}::text IS NULL OR action_type = ${opts.actionType ?? null}) + AND (${includePaused} OR NOT paused) + ORDER BY next_run_at ASC + `) as unknown as ScheduledJobRow[]; +} + +export async function getScheduledJob( + organizationId: string, + id: string +): Promise { + const sql = getDb(); + const rows = (await sql` + SELECT * FROM scheduled_jobs + WHERE organization_id = ${organizationId} AND id = ${id} + LIMIT 1 + `) as unknown as ScheduledJobRow[]; + return rows[0] ?? null; +} + +export async function pauseScheduledJob( + organizationId: string, + id: string, + paused: boolean +): Promise { + const sql = getDb(); + const rows = (await sql` + UPDATE scheduled_jobs + SET paused = ${paused}, updated_at = now() + WHERE organization_id = ${organizationId} AND id = ${id} + RETURNING id + `) as unknown as Array<{ id: string }>; + return rows.length > 0; +} + +export async function deleteScheduledJob( + organizationId: string, + id: string +): Promise { + const sql = getDb(); + const rows = (await sql` + DELETE FROM scheduled_jobs + WHERE organization_id = ${organizationId} AND id = ${id} + RETURNING id + `) as unknown as Array<{ id: string }>; + return rows.length > 0; +} + +/** + * Register the per-minute tick. Call once during bootTaskScheduler. + * + * The handler claims due rows transactionally (FOR UPDATE SKIP LOCKED so + * concurrent pods coordinate without an advisory lock), spawns one task + * per row, and advances next_run_at. A handler crash leaves rows un- + * advanced — next minute's tick retries them. Per-row idempotency key + * `scheduled_job::` deduplicates if the same row is read + * twice across pods. + */ +export function registerScheduledJobsTicker(scheduler: TaskScheduler): void { + scheduler.register( + 'scheduled-jobs-tick', + async () => { + const sql = getDb(); + const claimed = await sql.begin(async (tx) => { + return (await tx` + SELECT * + FROM scheduled_jobs + WHERE next_run_at <= now() AND NOT paused + ORDER BY next_run_at ASC + FOR UPDATE SKIP LOCKED + LIMIT 200 + `) as unknown as ScheduledJobRow[]; + }); + if (claimed.length === 0) return; + + for (const row of claimed) { + const tickIso = row.next_run_at; + const idempotencyKey = `scheduled_job:${row.id}:${tickIso}`; + try { + await scheduler.spawn(row.action_type, { + ...row.action_args, + __scheduled_job_id: row.id, + __scheduled_job_tick: tickIso, + __organization_id: row.organization_id, + __created_by_user: row.created_by_user, + __created_by_agent: row.created_by_agent, + }, { idempotencyKey }); + } catch (err) { + logger.warn( + { scheduled_job_id: row.id, err: errorMessage(err) }, + '[scheduled-jobs-tick] spawn failed; leaving next_run_at unchanged for retry' + ); + continue; + } + // Advance OR pause-when-done depending on whether this is recurring. + const nextAt = row.cron ? nextCronTickAt(row.cron) : null; + if (nextAt) { + await sql` + UPDATE scheduled_jobs + SET last_fired_at = now(), next_run_at = ${nextAt}, updated_at = now() + WHERE id = ${row.id} + `; + } else { + // One-shot: mark as fired + paused so the index ignores it. + await sql` + UPDATE scheduled_jobs + SET last_fired_at = now(), paused = true, updated_at = now() + WHERE id = ${row.id} + `; + } + } + }, + { cron: '* * * * *' } + ); +} diff --git a/packages/server/src/tools/admin/index.ts b/packages/server/src/tools/admin/index.ts index 584dbcce1..2eb489caf 100644 --- a/packages/server/src/tools/admin/index.ts +++ b/packages/server/src/tools/admin/index.ts @@ -23,6 +23,7 @@ import { ManageEntitySchemaSchema, manageEntitySchema } from './manage_entity_sc import { ManageFeedsSchema, manageFeeds } from './manage_feeds'; import { NotifySchema, notify } from './notify'; import { ManageOperationsSchema, manageOperations } from './manage_operations'; +import { ManageSchedulesSchema, manageSchedules } from './manage_schedules'; import { ManageViewTemplatesSchema, manageViewTemplates } from './manage_view_templates'; import { ListWatchersSchema, @@ -93,6 +94,14 @@ const ENTRIES: InternalToolEntry[] = [ handler: notify, annotations: { destructiveHint: false }, }, + { + name: 'manage_schedules', + description: + 'Create / list / pause / cancel recurring or one-shot scheduled jobs. Supports send_notification and wake_agent action types. Per-row attribution lets you trace what scheduled it and from where.', + schema: ManageSchedulesSchema, + handler: manageSchedules, + annotations: { destructiveHint: false }, + }, { name: 'manage_watchers', description: 'Watcher management. SDK alternative: client.watchers.', diff --git a/packages/server/src/tools/admin/manage_schedules.ts b/packages/server/src/tools/admin/manage_schedules.ts new file mode 100644 index 000000000..3a81ef7c9 --- /dev/null +++ b/packages/server/src/tools/admin/manage_schedules.ts @@ -0,0 +1,245 @@ +/** + * Tool: manage_schedules + * + * User-facing CRUD over `scheduled_jobs`. Recurring or one-shot. Two + * built-in action types out of the box: + * + * - `send_notification` — at the scheduled time, run the notify-tool's + * server-side path: resolve recipients, insert event + targets. + * Same shape as the immediate `notify` tool, minus the synchronous + * in-line fire. + * - `wake_agent` — at the scheduled time, post a synthetic user message + * to an agent's thread. Lets an agent schedule its own follow-up + * ("wake me in an hour and check X") and survives crashes / deploys. + * + * Attribution is captured from the calling ToolContext: a user-driven + * create stores `created_by_user`; an agent-driven create (via the + * gateway's agent loop) gets `created_by_agent` set. The agent can later + * list / pause its own schedules without holding extra state. + */ + +import { type Static, Type } from '@sinclair/typebox'; +import { TypeCompiler } from '@sinclair/typebox/compiler'; +import type { Env } from '../../index'; +import { routeAction } from './action-router'; +import { + createScheduledJob, + deleteScheduledJob, + getScheduledJob, + listScheduledJobs, + pauseScheduledJob, + type ScheduledJobRow, +} from '../../scheduled/scheduled-jobs-service'; +import type { ToolContext } from '../registry'; +import logger from '../../utils/logger'; +import { nextRunAt as nextCronTickAt } from '../../utils/cron'; + +// ============================================ +// Schema +// ============================================ + +const SendNotificationArgs = Type.Object({ + type: Type.Literal('send_notification'), + title: Type.String({ minLength: 1, maxLength: 200 }), + body: Type.Optional(Type.String({ maxLength: 1000 })), + recipients: Type.Optional( + Type.Union([ + Type.Literal('admins'), + Type.Literal('all'), + Type.Array(Type.String()), + ]) + ), + resource_url: Type.Optional(Type.String()), +}); + +const WakeAgentArgs = Type.Object({ + type: Type.Literal('wake_agent'), + agent_id: Type.String({ minLength: 1 }), + prompt: Type.String({ minLength: 1, maxLength: 4000 }), + thread_id: Type.Optional(Type.String()), + reason: Type.Optional(Type.String({ maxLength: 200 })), +}); + +const ActionUnion = Type.Union([SendNotificationArgs, WakeAgentArgs]); + +const CreateAction = Type.Object({ + action: Type.Literal('create'), + description: Type.String({ minLength: 1, maxLength: 200 }), + /** + * RFC3339 timestamp for the first (or only) firing. Required. + * For one-shot schedules this is the only firing. + */ + run_at: Type.String({ + description: "ISO timestamp for the first / only firing (e.g. '2026-05-15T09:00:00Z').", + }), + /** + * Cron expression. When set, the job re-fires on this cron after + * each firing. When omitted, the job is one-shot. + */ + cron: Type.Optional(Type.String()), + /** Handler-specific payload. The `type` field selects the handler. */ + payload: ActionUnion, + /** + * Optional source attribution — pass when the schedule was triggered + * by another run / event / chat thread so the audit trail captures it. + */ + source_run_id: Type.Optional(Type.Number()), + source_event_id: Type.Optional(Type.Number()), + source_thread_id: Type.Optional(Type.String()), +}); + +const ListAction = Type.Object({ + action: Type.Literal('list'), + agent_id: Type.Optional(Type.String()), + user_id: Type.Optional(Type.String()), + action_type: Type.Optional(Type.String()), + include_paused: Type.Optional(Type.Boolean()), +}); + +const PauseAction = Type.Object({ + action: Type.Literal('pause'), + id: Type.String({ format: 'uuid' }), + paused: Type.Optional(Type.Boolean({ default: true })), +}); + +const CancelAction = Type.Object({ + action: Type.Literal('cancel'), + id: Type.String({ format: 'uuid' }), +}); + +export const ManageSchedulesSchema = Type.Union([ + CreateAction, + ListAction, + PauseAction, + CancelAction, +]); +type ManageSchedulesArgs = Static; + +const createValidator = TypeCompiler.Compile(CreateAction); + +// ============================================ +// Handler +// ============================================ + +interface ToolResult { + schedule?: ReturnType; + schedules?: Array>; + ok?: boolean; + error?: string; +} + +export async function manageSchedules( + args: ManageSchedulesArgs, + _env: Env, + ctx: ToolContext +): Promise { + return routeAction('manage_schedules', args.action, ctx, { + create: () => handleCreate(args as Extract, ctx), + list: () => handleList(args as Extract, ctx), + pause: () => handlePause(args as Extract, ctx), + cancel: () => handleCancel(args as Extract, ctx), + }); +} + +async function handleCreate( + args: Extract, + ctx: ToolContext +): Promise { + if (!createValidator.Check(args)) { + const errs = [...createValidator.Errors(args)]; + return { error: `Invalid args: ${errs.map((e) => `${e.path} ${e.message}`).join('; ')}` }; + } + const runAtDate = new Date(args.run_at); + if (Number.isNaN(runAtDate.getTime())) { + return { error: `run_at is not a valid ISO timestamp: ${args.run_at}` }; + } + // If cron is set, sanity-check it by computing the next tick from now. + if (args.cron) { + try { + nextCronTickAt(args.cron); + } catch (err) { + return { + error: `cron expression rejected: ${err instanceof Error ? err.message : String(err)}`, + }; + } + } + // action_type comes from the payload's discriminant `type`. + const actionType = args.payload.type; + // Strip the discriminant before persisting — handlers know their own type. + const { type: _omit, ...actionArgs } = args.payload as Record & { + type: string; + }; + + const job = await createScheduledJob({ + organizationId: ctx.organizationId, + actionType, + actionArgs, + description: args.description, + cron: args.cron ?? null, + runAt: runAtDate, + createdByUser: ctx.userId ?? null, + // ToolContext doesn't carry an agent attribution today; populated by + // the gateway agent path when it lands (TODO once that wiring exists). + createdByAgent: null, + sourceRunId: args.source_run_id ?? null, + sourceEventId: args.source_event_id ?? null, + sourceThreadId: args.source_thread_id ?? null, + }); + return { schedule: serializeSchedule(job) }; +} + +async function handleList( + args: Extract, + ctx: ToolContext +): Promise { + const rows = await listScheduledJobs({ + organizationId: ctx.organizationId, + createdByAgent: args.agent_id ?? null, + createdByUser: args.user_id ?? null, + actionType: args.action_type ?? null, + includePaused: args.include_paused ?? true, + }); + return { schedules: rows.map(serializeSchedule) }; +} + +async function handlePause( + args: Extract, + ctx: ToolContext +): Promise { + const ok = await pauseScheduledJob(ctx.organizationId, args.id, args.paused ?? true); + if (!ok) return { error: `Schedule '${args.id}' not found in this organization.` }; + const job = await getScheduledJob(ctx.organizationId, args.id); + return { schedule: job ? serializeSchedule(job) : undefined, ok: true }; +} + +async function handleCancel( + args: Extract, + ctx: ToolContext +): Promise { + const ok = await deleteScheduledJob(ctx.organizationId, args.id); + if (!ok) return { error: `Schedule '${args.id}' not found in this organization.` }; + logger.info({ schedule_id: args.id, org: ctx.organizationId }, '[manage_schedules] cancelled'); + return { ok: true }; +} + +function serializeSchedule(row: ScheduledJobRow) { + return { + id: row.id, + organization_id: row.organization_id, + action_type: row.action_type, + action_args: row.action_args, + cron: row.cron, + next_run_at: row.next_run_at, + last_fired_at: row.last_fired_at, + last_fired_run_id: row.last_fired_run_id, + paused: row.paused, + description: row.description, + created_by_user: row.created_by_user, + created_by_agent: row.created_by_agent, + source_run_id: row.source_run_id, + source_event_id: row.source_event_id, + source_thread_id: row.source_thread_id, + created_at: row.created_at, + updated_at: row.updated_at, + }; +} diff --git a/packages/web b/packages/web index 965ec489c..e222de861 160000 --- a/packages/web +++ b/packages/web @@ -1 +1 @@ -Subproject commit 965ec489c53300230733b9f69afc4dc2298a8bda +Subproject commit e222de861a0c16887a9167c47f0ecfca5c5f2df6