feat(scheduled-jobs): user-driven cron / one-shot via TaskScheduler + scheduled_jobs table #710
… new scheduled_jobs table
Background
==========
TaskScheduler already gives us a pg-backed scheduler (claim / retry /
idempotency / observability via the `runs` queue), but `register()` is
code-side only and `spawn()` alone can't track recurring definitions.
This PR adds the missing piece — a user-driven *registry* — without
forking the scheduler.
Schema
------
* `scheduled_jobs` (id, organization_id, action_type, action_args,
cron, next_run_at, last_fired_at, paused, description, created_by_user,
created_by_agent, source_run_id, source_event_id, source_thread_id).
* FK ON DELETE CASCADE from `created_by_agent` to `agents` (added only
when that table is present) so deleting an agent also drops the
schedules it created, with no orphan jobs firing into the void.
Conditional FKs on runs/events for traceability.
* Indexes on (next_run_at WHERE NOT paused), (org, agent), (org, user).
* Migration 20260514000000 + embedded patch (idempotent).
Wiring
------
* `registerScheduledJobsTicker(scheduler)` registers a `* * * * *` cron.
Each tick: SELECT due rows FOR UPDATE SKIP LOCKED → spawn(action_type,
payload, { idempotencyKey: 'scheduled_job:<id>:<tick-iso>' }) → advance
next_run_at (or paused=true for one-shot). Failure between spawn and
advance is fine — next minute's tick re-claims the same row, and the
idempotency key dedups the re-spawn.
* Two task handlers in scheduled/jobs.ts:
- `send_notification` — resolves recipients (admins / all / list)
and calls createNotificationForUsers (which now writes to events +
notification_targets per PR #707).
- `wake_agent` — creates a thread (or reuses one) and enqueues a
synthetic user message via the existing agent-threads API. Lets
an agent schedule its own follow-up wake-up.
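The per-row part of a tick described above can be sketched as two pure functions. This is a minimal illustration, not the code in this PR: the `JobRow` shape and the `nextOccurrence` callback (a stand-in for a real cron parser) are hypothetical, and the sketch assumes `<tick-iso>` is derived from the row's due time. That assumption matters, because a retry on the next minute only dedups if both attempts produce the same key.

```typescript
// Hypothetical row shape; only the fields this sketch needs.
interface JobRow {
  id: string;
  cron: string | null; // null means one-shot
  nextRunAt: Date;
  lastFiredAt: Date | null;
  paused: boolean;
}

// Key format from the PR description: 'scheduled_job:<id>:<tick-iso>'.
// Assumption: <tick-iso> is the row's due time, so a re-claimed row yields
// the same key and the scheduler can drop the duplicate spawn.
function idempotencyKey(row: JobRow): string {
  return `scheduled_job:${row.id}:${row.nextRunAt.toISOString()}`;
}

// Returns the row state after a successful spawn.
// One-shot rows are paused; recurring rows move to their next occurrence.
function advance(
  row: JobRow,
  firedAt: Date,
  nextOccurrence: (cron: string, after: Date) => Date,
): JobRow {
  if (row.cron === null) {
    return { ...row, lastFiredAt: firedAt, paused: true };
  }
  return {
    ...row,
    lastFiredAt: firedAt,
    nextRunAt: nextOccurrence(row.cron, firedAt),
  };
}
```

If the advance step fails after the spawn, the row is still due and unpaused, so the next tick computes the same key from the unchanged `nextRunAt`.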
Tool
----
* `manage_schedules` MCP tool: create / list / pause / cancel.
Create captures attribution from ToolContext (created_by_user; agent
attribution wires up when the gateway agent path lands).
* Per-action payload validation (typebox discriminated union):
send_notification needs title; wake_agent needs agent_id + prompt.
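The per-action rule above can be illustrated with a dependency-free discriminated union. This is a sketch only: the PR uses typebox, whereas the hand-written guards below are a stand-in, and the non-empty-string requirement and the `validateAction` name are assumptions made for the example.

```typescript
type SendNotification = { action_type: "send_notification"; title: string };
type WakeAgent = { action_type: "wake_agent"; agent_id: string; prompt: string };
type ActionPayload = SendNotification | WakeAgent;

function isNonEmptyString(v: unknown): v is string {
  return typeof v === "string" && v.length > 0;
}

// Validates action_args against the rules for the given action_type and
// returns a typed payload, or throws with a message naming the missing field.
function validateAction(
  actionType: string,
  args: Record<string, unknown>,
): ActionPayload {
  switch (actionType) {
    case "send_notification": {
      const title = args["title"];
      if (!isNonEmptyString(title)) {
        throw new Error("send_notification needs title");
      }
      return { action_type: "send_notification", title };
    }
    case "wake_agent": {
      const agentId = args["agent_id"];
      const prompt = args["prompt"];
      if (!isNonEmptyString(agentId) || !isNonEmptyString(prompt)) {
        throw new Error("wake_agent needs agent_id and prompt");
      }
      return { action_type: "wake_agent", agent_id: agentId, prompt };
    }
    default:
      throw new Error(`unknown action_type: ${actionType}`);
  }
}
```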
Smoke test (verified)
---------------------
Scheduled a notification 60s out → ticker fired at next minute boundary
→ row marked paused (one-shot done) with last_fired_at set →
notification appeared via the events/notification_targets path. The full
chain (tool → table → cron tick → spawn → handler → notification) works
end-to-end on a fresh PGlite dev DB.
Out of scope (follow-ups)
-------------------------
* Web UI to list/pause/cancel schedules (manage_schedules is REST-
reachable today, just no dedicated page).
* Agent attribution on ToolContext so a wake-up scheduled by an agent
populates `created_by_agent`. Currently always created_by_user.
* `last_fired_run_id` is wired as a column but not yet populated — the
spawn() return path doesn't surface the runs.id back to the ticker.
Trivial follow-up.
Walkthrough
Adds a database-backed scheduled-jobs system: migration and embedded schema patch, a service with CRUD and a per-minute claiming ticker, scheduler task handlers for notifications and agent wakeups, and an admin manage_schedules tool plus a web subproject pointer bump.
Sequence Diagram(s)
sequenceDiagram
participant Scheduler as TaskScheduler
participant Ticker as registerScheduledJobsTicker
participant DB as Database
participant TaskQueue as TaskScheduler
participant Notif as send_notification
participant Agent as wake_agent
participant NotifDB as createNotificationForUsers
participant Thread as createThreadForAgent
participant Queue as enqueueAgentMessage
Scheduler->>Ticker: minute tick
Ticker->>DB: Claim due unpaused rows (FOR UPDATE SKIP LOCKED)
DB-->>Ticker: job rows
Ticker->>TaskQueue: spawn task per row (deterministic id)
TaskQueue->>Notif: execute send_notification(payload)
Notif->>NotifDB: createNotificationForUsers(userIds, attrs)
TaskQueue->>Agent: execute wake_agent(payload)
Agent->>Thread: createThreadForAgent(if missing)
Agent->>Queue: enqueueAgentMessage(thread, prompt, source)
Agent->>DB: pause job if agent missing
Estimated code review effort: 4 (Complex), ~60 minutes
Codecov Report
✅ All modified and coverable lines are covered by tests.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: d9f08d296b
{
  agentId: p.agent_id,
  organizationId: orgId,
  createdByUserId: p.created_by_user ?? undefined,
Preserve the scheduling user when waking agents
For user-created wake_agent schedules that do not pass an existing thread_id, the ticker injects the creator as __created_by_user, but this handler reads created_by_user instead. In that scenario createdByUserId is always undefined, so createThreadForAgent falls back to userId = agentId, attributing the synthetic thread/message to the agent rather than the user who scheduled it; any user-scoped session behavior or audit trail for the wake-up is then wrong. Read the injected __created_by_user field here (or pass created_by_user from the ticker).
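The attribution lookup this comment asks for can be sketched as a small helper. The `WakeAgentPayload` interface and the `resolveCreatedByUserId` name are hypothetical; the fallback order (injected `__created_by_user` first, then `created_by_user`) follows the suggestion made later in this review rather than confirmed handler code.

```typescript
// Hypothetical payload shape for the wake_agent handler.
interface WakeAgentPayload {
  agent_id: string;
  prompt: string;
  thread_id?: string;
  // Injected by the ticker; the __ prefix avoids clashing with user args.
  __created_by_user?: string | null;
  // Possibly present in user-supplied action_args.
  created_by_user?: string | null;
}

// Prefer the ticker-injected field, fall back to the plain one, and map
// null to undefined so callers can treat "no user" uniformly.
function resolveCreatedByUserId(p: WakeAgentPayload): string | undefined {
  return p.__created_by_user ?? p.created_by_user ?? undefined;
}
```

With this in place, `createdByUserId: resolveCreatedByUserId(p)` would only be undefined when neither field carries a user.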
ALTER TABLE public.scheduled_jobs
  ADD CONSTRAINT scheduled_jobs_agent_fkey
  FOREIGN KEY (created_by_agent) REFERENCES public.agents(id) ON DELETE CASCADE;
Cascade wake-ups from the target agent
This FK only covers created_by_agent, not the target agent stored in a wake_agent payload. When a user creates a wake-up for an agent, created_by_agent is null, so deleting that target agent leaves the schedule behind and the ticker can later enqueue a synthetic message for an agent that no longer exists. The cascade needs to reference the wake target (or the handler/ticker must drop rows whose target agent is gone), not just the scheduler identity.
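The handler-side alternative mentioned above (dropping or pausing rows whose target agent is gone) can be sketched with injected dependencies. Everything named here (`WakeDeps`, `wakeAgent`, the outcome strings) is hypothetical; the real handler talks to the database and the agent-threads API directly.

```typescript
type WakeOutcome = "enqueued" | "paused_missing_agent";

// Hypothetical seams so the control flow can be shown without a database.
interface WakeDeps {
  agentExists(agentId: string): Promise<boolean>;
  pauseJob(jobId: string): Promise<void>;
  enqueue(agentId: string, prompt: string): Promise<void>;
}

interface WakeJob {
  jobId: string;
  agentId: string;
  prompt: string;
}

// Checks the wake target before enqueuing. The target lives in action_args
// and is not FK-protected, so a missing agent pauses the schedule instead
// of enqueuing a message nobody will read.
async function wakeAgent(deps: WakeDeps, job: WakeJob): Promise<WakeOutcome> {
  if (!(await deps.agentExists(job.agentId))) {
    await deps.pauseJob(job.jobId);
    return "paused_missing_agent";
  }
  await deps.enqueue(job.agentId, job.prompt);
  return "enqueued";
}
```

Pausing rather than deleting keeps the row visible in list output, which is a design choice of this sketch.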
* typecheck: drop unused pgTextArray import from scheduled-jobs-service.
* wake_agent: read __created_by_user (not created_by_user). The ticker injects the scheduling user under that __ prefix to avoid colliding with user-supplied action_args. Without this fix, user-scheduled wake-ups attributed their thread to the agent instead of the user.
* wake_agent: existence-check the target agent before enqueuing. The scheduled_jobs.created_by_agent cascade only covers the *scheduler*'s identity; the *target* of a wake_agent payload lives in action_args and isn't FK-protected. Auto-pause the schedule when the target agent is gone so we don't silently enqueue messages for ghosts.
* Bump web submodule to owletto-web/main (e222de8); required for the CI check-drift gate.
Actionable comments posted: 5
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@packages/server/src/db/embedded-schema-patches.ts`:
- Around line 439-493: The embedded patch for id 'scheduled-jobs' creates the
scheduled_jobs table but omits the foreign key constraints for source_run_id and
source_event_id, allowing dangling references; update the apply handler for the
'scheduled-jobs' patch to add the same FK constraints used in the primary
migration/schema (foreign keys on scheduled_jobs.source_run_id and
scheduled_jobs.source_event_id referencing their parent tables with ON DELETE
SET NULL and appropriate constraint names, e.g.
scheduled_jobs_source_run_id_fkey and scheduled_jobs_source_event_id_fkey), and
add those ALTER TABLE / ADD CONSTRAINT statements (guarded by the same
pg_class/pg_constraint existence checks pattern used for
scheduled_jobs_agent_fkey) so PGlite installs mirror the main schema behavior.
In `@packages/server/src/scheduled/jobs.ts`:
- Around line 236-260: The handler reads created_by_user from the payload but
the ticker injects __created_by_user, so update the payload extraction logic
(variable p and usage when calling createThreadForAgent) to prefer
p.__created_by_user if present, e.g. set createdByUserId to p.__created_by_user
?? p.created_by_user ?? undefined before calling createThreadForAgent, ensuring
thread creation uses the injected scheduler attribution and preserves
audit/ownership semantics.
In `@packages/server/src/scheduled/scheduled-jobs-service.ts`:
- Around line 161-170: The SELECT ... FOR UPDATE is released when sql.begin's
callback returns, so rows can be reselected before this worker calls spawn() or
the follow-up UPDATE; to fix, keep the transaction open while you mark/advance
the rows or spawn tasks: perform the SELECT ... FOR UPDATE inside sql.begin and
then, still inside that same transaction callback, either (a) update each
ScheduledJobRow (e.g., set a claimed flag, claim_owner/claim_id or advance
next_run_at) to persist the claim, or (b) call spawn(...) (or a helper that
inserts the task and advances the job) for each row before returning; reference
the existing sql.begin, the claimed variable, and the spawn() / UPDATE follow-up
logic and move those operations into the transaction callback so locks are held
until the claim is durable.
In `@packages/server/src/tools/admin/manage_schedules.ts`:
- Around line 86-87: The schema currently uses Type.Number() for source_run_id
and source_event_id which allows non-integer values; update the schema in
manage_schedules.ts to validate these as integers (e.g., replace Type.Number()
with Type.Integer() or add a refinement/validator) so inputs like 1.5 are
rejected before DB insert; ensure the change targets the fields named
source_run_id and source_event_id in the same Type.Optional(...) declarations
and adjust any related parsing/tests that assume numeric-only validation.
- Around line 173-187: The handler calls createScheduledJob with createdByAgent:
null which causes a 500 when ctx.userId is null; update the handler to ensure at
least one attribution is provided before calling createScheduledJob: pass
createdByUser: ctx.userId ?? null and createdByAgent: ctx.agentId ?? null (or
the correct agent field on ctx), and add a pre-check/guard that if both
ctx.userId and the agent field are null you return a controlled error (HTTP/Trpc
bad request or similar) instead of invoking createScheduledJob; ensure the guard
references createScheduledJob, createdByUser, createdByAgent and
ctx.userId/ctx.agentId so the intent is clear.
Files selected for processing (7)
- db/migrations/20260514000000_scheduled_jobs.sql
- db/schema.sql
- packages/server/src/db/embedded-schema-patches.ts
- packages/server/src/scheduled/jobs.ts
- packages/server/src/scheduled/scheduled-jobs-service.ts
- packages/server/src/tools/admin/index.ts
- packages/server/src/tools/admin/manage_schedules.ts
{
  // Mirrors db/migrations/20260514000000_scheduled_jobs.sql.
  id: 'scheduled-jobs',
  apply: async (sql) => {
    await sql.unsafe(`
      CREATE TABLE IF NOT EXISTS public.scheduled_jobs (
        id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
        organization_id text NOT NULL REFERENCES public.organization(id) ON DELETE CASCADE,
        action_type text NOT NULL,
        action_args jsonb NOT NULL,
        cron text,
        next_run_at timestamp with time zone NOT NULL,
        last_fired_at timestamp with time zone,
        last_fired_run_id bigint,
        paused boolean NOT NULL DEFAULT false,
        description text NOT NULL,
        created_by_user text,
        created_by_agent text,
        source_run_id bigint,
        source_event_id bigint,
        source_thread_id text,
        created_at timestamp with time zone NOT NULL DEFAULT now(),
        updated_at timestamp with time zone NOT NULL DEFAULT now(),
        CONSTRAINT scheduled_jobs_attribution_check CHECK (
          created_by_user IS NOT NULL OR created_by_agent IS NOT NULL
        )
      )
    `);
    await sql.unsafe(`
      DO $$
      BEGIN
        IF EXISTS (SELECT 1 FROM pg_class WHERE relname = 'agents' AND relkind = 'r')
           AND NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'scheduled_jobs_agent_fkey') THEN
          ALTER TABLE public.scheduled_jobs
            ADD CONSTRAINT scheduled_jobs_agent_fkey
            FOREIGN KEY (created_by_agent) REFERENCES public.agents(id) ON DELETE CASCADE;
        END IF;
      END$$;
    `);
    await sql.unsafe(`
      CREATE INDEX IF NOT EXISTS idx_scheduled_jobs_due
        ON public.scheduled_jobs (next_run_at) WHERE NOT paused
    `);
    await sql.unsafe(`
      CREATE INDEX IF NOT EXISTS idx_scheduled_jobs_org_agent
        ON public.scheduled_jobs (organization_id, created_by_agent)
        WHERE created_by_agent IS NOT NULL
    `);
    await sql.unsafe(`
      CREATE INDEX IF NOT EXISTS idx_scheduled_jobs_org_user
        ON public.scheduled_jobs (organization_id, created_by_user)
        WHERE created_by_user IS NOT NULL
    `);
  },
},
Mirror the source FKs in the embedded patch.
The primary migration and db/schema.sql null out source_run_id/source_event_id on parent deletes, but the embedded patch never adds those constraints. PGlite installs can therefore keep dangling source references and drift from the main schema.
🔧 Proposed fix
await sql.unsafe(`
DO $$
BEGIN
IF EXISTS (SELECT 1 FROM pg_class WHERE relname = 'agents' AND relkind = 'r')
AND NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'scheduled_jobs_agent_fkey') THEN
ALTER TABLE public.scheduled_jobs
ADD CONSTRAINT scheduled_jobs_agent_fkey
FOREIGN KEY (created_by_agent) REFERENCES public.agents(id) ON DELETE CASCADE;
END IF;
END$$;
`);
+ await sql.unsafe(`
+ DO $$
+ BEGIN
+ IF EXISTS (SELECT 1 FROM pg_class WHERE relname = 'runs' AND relkind = 'r')
+ AND NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'scheduled_jobs_source_run_fkey') THEN
+ ALTER TABLE public.scheduled_jobs
+ ADD CONSTRAINT scheduled_jobs_source_run_fkey
+ FOREIGN KEY (source_run_id) REFERENCES public.runs(id) ON DELETE SET NULL;
+ END IF;
+ END$$;
+ `);
+ await sql.unsafe(`
+ DO $$
+ BEGIN
+ IF EXISTS (SELECT 1 FROM pg_class WHERE relname = 'events' AND relkind = 'r')
+ AND NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'scheduled_jobs_source_event_fkey') THEN
+ ALTER TABLE public.scheduled_jobs
+ ADD CONSTRAINT scheduled_jobs_source_event_fkey
+ FOREIGN KEY (source_event_id) REFERENCES public.events(id) ON DELETE SET NULL;
+ END IF;
+ END$$;
+ `);
await sql.unsafe(`
CREATE INDEX IF NOT EXISTS idx_scheduled_jobs_due
ON public.scheduled_jobs (next_run_at) WHERE NOT paused
`);
const claimed = await sql.begin(async (tx) => {
  return (await tx`
    SELECT *
    FROM scheduled_jobs
    WHERE next_run_at <= now() AND NOT paused
    ORDER BY next_run_at ASC
    FOR UPDATE SKIP LOCKED
    LIMIT 200
  `) as unknown as ScheduledJobRow[];
});
The lock is gone before the row is spawned or advanced.
This transaction only wraps SELECT ... FOR UPDATE SKIP LOCKED. Once the callback returns, every claimed row is unlocked while still due, so another ticker can select the same rows before this worker reaches spawn() or the follow-up UPDATE. The idempotency key masks duplicate task inserts, but the concurrent-claim guarantee here does not actually hold.
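One way to keep the claim meaningful is to do the spawn and the advance inside the same transaction callback, because row locks taken by `SELECT ... FOR UPDATE` are held until the transaction ends. The sketch below shows only that ordering. The `Tx` interface, its method names, and the `tick` function are hypothetical seams, not the postgres.js API or the code in this PR.

```typescript
interface DueRow {
  id: string;
  nextRunAt: Date;
  cron: string | null;
}

// Hypothetical transaction handle; each method would run on the same
// connection as the SELECT ... FOR UPDATE SKIP LOCKED that claimed the rows.
interface Tx {
  selectDueForUpdateSkipLocked(limit: number): Promise<DueRow[]>;
  spawn(row: DueRow): Promise<void>;
  advance(row: DueRow): Promise<void>;
}

type Begin = <T>(fn: (tx: Tx) => Promise<T>) => Promise<T>;

// Claims due rows and makes the claim durable before the callback returns,
// so the locks are released only after every claimed row has been advanced.
async function tick(begin: Begin, limit = 200): Promise<number> {
  return begin(async (tx) => {
    const rows = await tx.selectDueForUpdateSkipLocked(limit);
    for (const row of rows) {
      await tx.spawn(row);
      await tx.advance(row);
    }
    return rows.length;
  });
}
```

The trade-off is that locks are held for the whole batch. If `spawn` writes through a different connection it is not atomic with `advance`, so the idempotency key would still be needed as a backstop.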
source_run_id: Type.Optional(Type.Number()),
source_event_id: Type.Optional(Type.Number()),
Validate source IDs as integers.
source_run_id and source_event_id back bigint columns, but Type.Number() accepts fractional values. Inputs like 1.5 will pass schema validation and then fail at insert time.
🔧 Proposed fix
- source_run_id: Type.Optional(Type.Number()),
- source_event_id: Type.Optional(Type.Number()),
+ source_run_id: Type.Optional(Type.Integer()),
+ source_event_id: Type.Optional(Type.Integer()),
const job = await createScheduledJob({
  organizationId: ctx.organizationId,
  actionType,
  actionArgs,
  description: args.description,
  cron: args.cron ?? null,
  runAt: runAtDate,
  createdByUser: ctx.userId ?? null,
  // ToolContext doesn't carry an agent attribution today; populated by
  // the gateway agent path when it lands (TODO once that wiring exists).
  createdByAgent: null,
  sourceRunId: args.source_run_id ?? null,
  sourceEventId: args.source_event_id ?? null,
  sourceThreadId: args.source_thread_id ?? null,
});
Don't let non-user creates fall through to a 500.
createScheduledJob throws unless one of createdByUser or createdByAgent is set, but this handler always passes createdByAgent: null. Any create path with ctx.userId == null currently explodes instead of returning a controlled error or persisting agent attribution.
🔧 Minimal mitigation
+ if (!ctx.userId) {
+ return {
+ error:
+ 'manage_schedules.create needs caller attribution; plumb agent attribution into ToolContext before allowing non-user creates.',
+ };
+ }
+
const job = await createScheduledJob({
organizationId: ctx.organizationId,
actionType,
actionArgs,
description: args.description,
cron: args.cron ?? null,
runAt: runAtDate,
- createdByUser: ctx.userId ?? null,
+ createdByUser: ctx.userId,
// ToolContext doesn't carry an agent attribution today; populated by
// the gateway agent path when it lands (TODO once that wiring exists).
createdByAgent: null,
Summary
Adds the missing piece between TaskScheduler (pg-backed scheduler we already had) and user/agent-driven schedules: a small `scheduled_jobs` registry table + a 1-minute ticker that scans due rows and spawns them as task runs via the existing scheduler.
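Two built-in actions out of the box:
* `send_notification`: resolves recipients (admins / all / list) and calls createNotificationForUsers.
* `wake_agent`: creates a thread (or reuses one) and enqueues a synthetic user message for the agent.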
Why a separate table
`scheduler.spawn` alone handles one-shot fine — but recurring needs the cron rule to outlive any single firing, and we want addressable schedules (list, pause, cancel, audit) without parsing payload jsonb. A small table is the source of truth; the firing still rides the runs queue. See the migration comment for the full rationale.
The alternative — self-perpetuating spawn chains — has a silent-failure mode (handler runs the work then crashes before re-spawn = chain dies forever) and no way to address a schedule for cancellation without scanning runs.
Reliability
Smoke test (verified locally)
```
00:12:49Z curl manage_schedules action=create run_at=60s → row inserted
00:14:00Z scheduled-jobs-tick fires (top of minute) → spawn(send_notification)
00:14:00Z row advanced: last_fired_at set, paused=true → (one-shot done)
00:14:00Z notification appears via events + notification_targets
```
Full chain — tool → table → cron tick → spawn → handler → notification — works end-to-end on a fresh PGlite dev DB.
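The timing in the log above follows from the ticker running on a `* * * * *` cron: a row due mid-minute fires at the next minute boundary. A minimal worked example is below. The `nextMinuteBoundary` helper is hypothetical, and the calendar date is arbitrary because the log shows only times.

```typescript
const MINUTE_MS = 60 * 1000;

// Rounds a due time up to the next whole minute (UTC epoch based).
// A time already on a boundary is returned unchanged.
function nextMinuteBoundary(dueAt: Date): Date {
  return new Date(Math.ceil(dueAt.getTime() / MINUTE_MS) * MINUTE_MS);
}

// Created at 00:12:49Z with run_at 60s out, so the row is due at 00:13:49Z.
const createdAt = new Date("2026-05-14T00:12:49Z");
const dueAt = new Date(createdAt.getTime() + 60 * 1000);
console.log(nextMinuteBoundary(dueAt).toISOString());
// prints "2026-05-14T00:14:00.000Z"
```

So a one-shot can fire up to just under a minute after its requested time, which matches the 00:14:00Z tick in the smoke test.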
Test plan
Follow-ups (separate PRs)