-
Notifications
You must be signed in to change notification settings - Fork 20
feat(scheduled-jobs): user-driven cron / one-shot via TaskScheduler + scheduled_jobs table #710
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,97 @@ | ||
| -- migrate:up | ||
|
|
||
| -- User-driven scheduled jobs. | ||
| -- | ||
| -- Why a separate table: | ||
| -- * `runs` already holds *fired* / *pending-to-fire* rows via | ||
| -- scheduler.spawn(). Each scheduled_jobs row is the *definition* of a | ||
| -- recurring (or one-shot) schedule — its source of truth. | ||
| -- * The ticker (`scheduled-jobs-tick`) scans this table on cron, spawns | ||
| -- a runs row per firing via TaskScheduler.spawn, and advances | ||
| -- next_run_at from `cron`. If the tick or a firing fails, the next | ||
| -- tick re-reads the same row (next_run_at didn't move forward) and | ||
| -- retries. Self-healing. | ||
| -- * Attribution lives here: who scheduled it (user or agent), what run | ||
| -- was the trigger, what event was the trigger. Lets "why did the | ||
| -- system act?" become a single JOIN. | ||
| -- * Cascade-on-delete: when an agent is deleted, all its schedules | ||
| -- evaporate via the FK — no orphan wake-ups firing into the void. | ||
|
|
||
| CREATE TABLE public.scheduled_jobs ( | ||
| id uuid PRIMARY KEY DEFAULT gen_random_uuid(), | ||
| organization_id text NOT NULL REFERENCES public.organization(id) ON DELETE CASCADE, | ||
|
|
||
| -- What fires | ||
| action_type text NOT NULL, -- 'send_notification' | 'wake_agent' | ... | ||
| action_args jsonb NOT NULL, -- handler payload | ||
| cron text, -- null = one-shot; cron string = recurring | ||
| next_run_at timestamp with time zone NOT NULL, | ||
| last_fired_at timestamp with time zone, | ||
| last_fired_run_id bigint, -- the runs.id from the most recent firing | ||
| paused boolean NOT NULL DEFAULT false, | ||
|
|
||
| description text NOT NULL, -- human summary for the UI / audit | ||
|
|
||
| -- Attribution | ||
| created_by_user text, -- user that scheduled it (null when agent did) | ||
| created_by_agent text, -- agent that scheduled it (null when user did) | ||
| source_run_id bigint, -- runs.id that originated the scheduling, if any | ||
| source_event_id bigint, -- events.id that originated, if any | ||
| source_thread_id text, -- chat-thread context, if any | ||
|
|
||
| created_at timestamp with time zone NOT NULL DEFAULT now(), | ||
| updated_at timestamp with time zone NOT NULL DEFAULT now(), | ||
|
|
||
| CONSTRAINT scheduled_jobs_attribution_check CHECK ( | ||
| created_by_user IS NOT NULL OR created_by_agent IS NOT NULL | ||
| ) | ||
| ); | ||
|
|
||
| -- Cascade: dropping an agent kills its scheduled jobs (so an agent's | ||
| -- wake-ups don't outlive the agent itself). Conditional so the migration | ||
| -- works on installs where the agents table doesn't exist yet (very | ||
| -- old) — every row already has organization_id which is the harder constraint. | ||
| DO $$ | ||
| BEGIN | ||
| IF EXISTS (SELECT 1 FROM pg_class WHERE relname = 'agents' AND relkind = 'r') THEN | ||
| ALTER TABLE public.scheduled_jobs | ||
| ADD CONSTRAINT scheduled_jobs_agent_fkey | ||
| FOREIGN KEY (created_by_agent) REFERENCES public.agents(id) ON DELETE CASCADE; | ||
| END IF; | ||
| END$$; | ||
|
|
||
| DO $$ | ||
| BEGIN | ||
| IF EXISTS (SELECT 1 FROM pg_class WHERE relname = 'runs' AND relkind = 'r') THEN | ||
| ALTER TABLE public.scheduled_jobs | ||
| ADD CONSTRAINT scheduled_jobs_source_run_fkey | ||
| FOREIGN KEY (source_run_id) REFERENCES public.runs(id) ON DELETE SET NULL; | ||
| END IF; | ||
| END$$; | ||
|
|
||
| DO $$ | ||
| BEGIN | ||
| IF EXISTS (SELECT 1 FROM pg_class WHERE relname = 'events' AND relkind = 'r') THEN | ||
| ALTER TABLE public.scheduled_jobs | ||
| ADD CONSTRAINT scheduled_jobs_source_event_fkey | ||
| FOREIGN KEY (source_event_id) REFERENCES public.events(id) ON DELETE SET NULL; | ||
| END IF; | ||
| END$$; | ||
|
|
||
| -- Index: the ticker's hot read. | ||
| CREATE INDEX idx_scheduled_jobs_due | ||
| ON public.scheduled_jobs (next_run_at) | ||
| WHERE NOT paused; | ||
|
|
||
| -- Index: list per-agent / per-user. | ||
| CREATE INDEX idx_scheduled_jobs_org_agent | ||
| ON public.scheduled_jobs (organization_id, created_by_agent) | ||
| WHERE created_by_agent IS NOT NULL; | ||
|
|
||
| CREATE INDEX idx_scheduled_jobs_org_user | ||
| ON public.scheduled_jobs (organization_id, created_by_user) | ||
| WHERE created_by_user IS NOT NULL; | ||
|
|
||
| -- migrate:down | ||
|
|
||
| DROP TABLE IF EXISTS public.scheduled_jobs; | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -436,4 +436,59 @@ export const EMBEDDED_SCHEMA_PATCHES: EmbeddedSchemaPatch[] = [ | |
| } | ||
| }, | ||
| }, | ||
| { | ||
| // Mirrors db/migrations/20260514000000_scheduled_jobs.sql. | ||
| id: 'scheduled-jobs', | ||
| apply: async (sql) => { | ||
| await sql.unsafe(` | ||
| CREATE TABLE IF NOT EXISTS public.scheduled_jobs ( | ||
| id uuid PRIMARY KEY DEFAULT gen_random_uuid(), | ||
| organization_id text NOT NULL REFERENCES public.organization(id) ON DELETE CASCADE, | ||
| action_type text NOT NULL, | ||
| action_args jsonb NOT NULL, | ||
| cron text, | ||
| next_run_at timestamp with time zone NOT NULL, | ||
| last_fired_at timestamp with time zone, | ||
| last_fired_run_id bigint, | ||
| paused boolean NOT NULL DEFAULT false, | ||
| description text NOT NULL, | ||
| created_by_user text, | ||
| created_by_agent text, | ||
| source_run_id bigint, | ||
| source_event_id bigint, | ||
| source_thread_id text, | ||
| created_at timestamp with time zone NOT NULL DEFAULT now(), | ||
| updated_at timestamp with time zone NOT NULL DEFAULT now(), | ||
| CONSTRAINT scheduled_jobs_attribution_check CHECK ( | ||
| created_by_user IS NOT NULL OR created_by_agent IS NOT NULL | ||
| ) | ||
| ) | ||
| `); | ||
| await sql.unsafe(` | ||
| DO $$ | ||
| BEGIN | ||
| IF EXISTS (SELECT 1 FROM pg_class WHERE relname = 'agents' AND relkind = 'r') | ||
| AND NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'scheduled_jobs_agent_fkey') THEN | ||
| ALTER TABLE public.scheduled_jobs | ||
| ADD CONSTRAINT scheduled_jobs_agent_fkey | ||
| FOREIGN KEY (created_by_agent) REFERENCES public.agents(id) ON DELETE CASCADE; | ||
| END IF; | ||
| END$$; | ||
| `); | ||
| await sql.unsafe(` | ||
| CREATE INDEX IF NOT EXISTS idx_scheduled_jobs_due | ||
| ON public.scheduled_jobs (next_run_at) WHERE NOT paused | ||
| `); | ||
| await sql.unsafe(` | ||
| CREATE INDEX IF NOT EXISTS idx_scheduled_jobs_org_agent | ||
| ON public.scheduled_jobs (organization_id, created_by_agent) | ||
| WHERE created_by_agent IS NOT NULL | ||
| `); | ||
| await sql.unsafe(` | ||
| CREATE INDEX IF NOT EXISTS idx_scheduled_jobs_org_user | ||
| ON public.scheduled_jobs (organization_id, created_by_user) | ||
| WHERE created_by_user IS NOT NULL | ||
| `); | ||
| }, | ||
| }, | ||
|
Comment on lines
+439
to
+493
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Mirror the source FKs in the embedded patch. The primary migration and 🔧 Proposed fix await sql.unsafe(`
DO $$
BEGIN
IF EXISTS (SELECT 1 FROM pg_class WHERE relname = 'agents' AND relkind = 'r')
AND NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'scheduled_jobs_agent_fkey') THEN
ALTER TABLE public.scheduled_jobs
ADD CONSTRAINT scheduled_jobs_agent_fkey
FOREIGN KEY (created_by_agent) REFERENCES public.agents(id) ON DELETE CASCADE;
END IF;
END$$;
`);
+ await sql.unsafe(`
+ DO $$
+ BEGIN
+ IF EXISTS (SELECT 1 FROM pg_class WHERE relname = 'runs' AND relkind = 'r')
+ AND NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'scheduled_jobs_source_run_fkey') THEN
+ ALTER TABLE public.scheduled_jobs
+ ADD CONSTRAINT scheduled_jobs_source_run_fkey
+ FOREIGN KEY (source_run_id) REFERENCES public.runs(id) ON DELETE SET NULL;
+ END IF;
+ END$$;
+ `);
+ await sql.unsafe(`
+ DO $$
+ BEGIN
+ IF EXISTS (SELECT 1 FROM pg_class WHERE relname = 'events' AND relkind = 'r')
+ AND NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'scheduled_jobs_source_event_fkey') THEN
+ ALTER TABLE public.scheduled_jobs
+ ADD CONSTRAINT scheduled_jobs_source_event_fkey
+ FOREIGN KEY (source_event_id) REFERENCES public.events(id) ON DELETE SET NULL;
+ END IF;
+ END$$;
+ `);
await sql.unsafe(`
CREATE INDEX IF NOT EXISTS idx_scheduled_jobs_due
ON public.scheduled_jobs (next_run_at) WHERE NOT paused
`);🤖 Prompt for AI Agents |
||
| ]; | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This FK only covers
created_by_agent, not the target agent stored in awake_agentpayload. When a user creates a wake-up for an agent,created_by_agentis null, so deleting that target agent leaves the schedule behind and the ticker can later enqueue a synthetic message for an agent that no longer exists. The cascade needs to reference the wake target (or the handler/ticker must drop rows whose target agent is gone), not just the scheduler identity.Useful? React with 👍 / 👎.