diff --git a/packages/server/src/__tests__/integration/auth/default-provisioning.test.ts b/packages/server/src/__tests__/integration/auth/default-provisioning.test.ts new file mode 100644 index 000000000..739c2cf0c --- /dev/null +++ b/packages/server/src/__tests__/integration/auth/default-provisioning.test.ts @@ -0,0 +1,303 @@ +/** + * Integration tests for `auth/default-provisioning.ts`. + * + * Pins the sentinel behavior pi flagged: deletion stickiness (a removed + * agent / watcher is NOT auto-recreated on the next run), provisioning + * timing (watcher creation requires a device row), and idempotency. + */ + +import { beforeEach, describe, expect, it } from 'vitest'; +import { generateSecureToken } from '../../../auth/oauth/utils'; +import { + DEFAULT_AGENT_ID, + DEFAULT_AGENT_SENTINEL, + DEFAULT_WATCHER_SENTINEL, + DEFAULT_WATCHER_SLUG, + ensureDefaultAgent, + ensureDefaultWatcher, + hasOrgSentinel, +} from '../../../auth/default-provisioning'; +import { cleanupTestDatabase, getTestDb } from '../../setup/test-db'; + +async function seedOrg(orgId: string): Promise { + const sql = getTestDb(); + const slug = orgId.replace(/[^a-z0-9]/gi, '-').toLowerCase(); + await sql` + INSERT INTO "organization" (id, name, slug, visibility, "createdAt") + VALUES (${orgId}, ${orgId}, ${slug}, 'private', NOW()) + ON CONFLICT (id) DO NOTHING + `; +} + +async function readMetadata(orgId: string): Promise> { + const sql = getTestDb(); + const rows = await sql` + SELECT metadata FROM "organization" WHERE id = ${orgId} LIMIT 1 + `; + const raw = rows[0]?.metadata as string | null | undefined; + if (!raw) return {}; + return JSON.parse(raw); +} + +describe('ensureDefaultAgent', () => { + beforeEach(async () => { + await cleanupTestDatabase(); + }); + + it('creates the default agent and writes the sentinel', async () => { + const orgId = `org-provision-${generateSecureToken(4)}`; + await seedOrg(orgId); + + const result = await ensureDefaultAgent(orgId); + expect(result.created).toBe(true); + expect(result.reason).toBe('inserted'); + + const sql = getTestDb(); + const agents = await sql` + SELECT id, name FROM agents WHERE organization_id = ${orgId} + `; + expect(agents).toHaveLength(1); + expect(String(agents[0].id)).toBe(DEFAULT_AGENT_ID); + expect(String(agents[0].name)).toBe('Owletto Personal'); + + const metadata = await readMetadata(orgId); + expect(metadata[DEFAULT_AGENT_SENTINEL]).toBeDefined(); + }); + + it('is idempotent — second call is a no-op', async () => { + const orgId = `org-provision-${generateSecureToken(4)}`; + await seedOrg(orgId); + + const first = await ensureDefaultAgent(orgId); + expect(first.created).toBe(true); + + const second = await ensureDefaultAgent(orgId); + expect(second.created).toBe(false); + expect(second.reason).toBe('sentinel'); + }); + + it('is sticky against deletion — recreate refused after sentinel set', async () => { + const orgId = `org-provision-${generateSecureToken(4)}`; + await seedOrg(orgId); + + await ensureDefaultAgent(orgId); + + // User deletes the agent via the web UI. + const sql = getTestDb(); + await sql` + DELETE FROM agents WHERE organization_id = ${orgId} AND id = ${DEFAULT_AGENT_ID} + `; + + const again = await ensureDefaultAgent(orgId); + expect(again.created).toBe(false); + expect(again.reason).toBe('sentinel'); + + const agents = await sql` + SELECT id FROM agents WHERE organization_id = ${orgId} + `; + expect(agents).toHaveLength(0); + }); + + it('skips creation (but stamps sentinel) when other agents already exist', async () => { + const orgId = `org-provision-${generateSecureToken(4)}`; + await seedOrg(orgId); + + // The user already curated their own agent before Owletto ever provisioned. + const sql = getTestDb(); + await sql` + INSERT INTO agents (id, organization_id, name) + VALUES ('user-curated', ${orgId}, 'User-Curated Agent') + `; + + const result = await ensureDefaultAgent(orgId); + expect(result.created).toBe(false); + expect(result.reason).toBe('has_agents'); + + const agents = await sql` + SELECT id FROM agents WHERE organization_id = ${orgId} + `; + expect(agents).toHaveLength(1); + expect(String(agents[0].id)).toBe('user-curated'); + + // Sentinel still set so the next boot doesn't keep re-checking. + expect(await hasOrgSentinel(orgId, DEFAULT_AGENT_SENTINEL)).toBe(true); + }); +}); + +describe('ensureDefaultWatcher', () => { + async function setupOrgWithDeviceAndAgent(): Promise<{ + orgId: string; + deviceWorkerId: string; + userId: string; + }> { + const orgId = `org-watcher-${generateSecureToken(4)}`; + await seedOrg(orgId); + const sql = getTestDb(); + + const userId = `user_${generateSecureToken(4)}`; + await sql` + INSERT INTO "user" (id, name, email, "emailVerified", "createdAt", "updatedAt") + VALUES (${userId}, 'Watcher User', ${`${userId}@test.local`}, true, NOW(), NOW()) + ON CONFLICT (id) DO NOTHING + `; + // Add the user as the org owner so `watchers.created_by` has a valid FK target. + await sql` + INSERT INTO "member" (id, "userId", "organizationId", role, "createdAt") + VALUES (${`member_${generateSecureToken(4)}`}, ${userId}, ${orgId}, 'owner', NOW()) + `; + const inserted = (await sql` + INSERT INTO device_workers (user_id, worker_id, platform, capabilities, label, organization_id) + VALUES (${userId}, ${`worker-${userId}`}, 'macos', ${sql.json({})}, 'Mac', ${orgId}) + RETURNING id + `) as unknown as Array<{ id: string }>; + const deviceWorkerId = String(inserted[0].id); + + // Pre-provision the default agent (the order ensureDefaultAgent enforces). + await ensureDefaultAgent(orgId); + + return { orgId, deviceWorkerId, userId }; + } + + beforeEach(async () => { + await cleanupTestDatabase(); + }); + + it('creates the daily-checkin watcher pinned to the device', async () => { + const { orgId, deviceWorkerId } = await setupOrgWithDeviceAndAgent(); + + const result = await ensureDefaultWatcher({ + organizationId: orgId, + agentId: DEFAULT_AGENT_ID, + deviceWorkerId, + }); + expect(result.created).toBe(true); + expect(result.reason).toBe('inserted'); + + const sql = getTestDb(); + const watchers = await sql` + SELECT id, slug, agent_id, device_worker_id::text AS device_worker_id, schedule, status + FROM watchers + WHERE organization_id = ${orgId} + `; + expect(watchers).toHaveLength(1); + const w = watchers[0]; + expect(String(w.slug)).toBe(DEFAULT_WATCHER_SLUG); + expect(String(w.agent_id)).toBe(DEFAULT_AGENT_ID); + expect(String(w.device_worker_id)).toBe(deviceWorkerId); + expect(String(w.schedule)).toBe('0 9 * * *'); + expect(String(w.status)).toBe('active'); + + const versions = await sql` + SELECT prompt FROM watcher_versions WHERE watcher_id = ${w.id} + `; + expect(versions).toHaveLength(1); + expect(String(versions[0].prompt)).toMatch(/yesterday/i); + + expect(await hasOrgSentinel(orgId, DEFAULT_WATCHER_SENTINEL)).toBe(true); + }); + + it('is idempotent — second call is a no-op', async () => { + const { orgId, deviceWorkerId } = await setupOrgWithDeviceAndAgent(); + + const first = await ensureDefaultWatcher({ + organizationId: orgId, + agentId: DEFAULT_AGENT_ID, + deviceWorkerId, + }); + expect(first.created).toBe(true); + + const second = await ensureDefaultWatcher({ + organizationId: orgId, + agentId: DEFAULT_AGENT_ID, + deviceWorkerId, + }); + expect(second.created).toBe(false); + expect(second.reason).toBe('sentinel'); + + const sql = getTestDb(); + const watchers = await sql`SELECT id FROM watchers WHERE organization_id = ${orgId}`; + expect(watchers).toHaveLength(1); + }); + + it('is sticky against deletion — recreate refused after sentinel set', async () => { + const { orgId, deviceWorkerId } = await setupOrgWithDeviceAndAgent(); + + await ensureDefaultWatcher({ + organizationId: orgId, + agentId: DEFAULT_AGENT_ID, + deviceWorkerId, + }); + + const sql = getTestDb(); + await sql`DELETE FROM watchers WHERE organization_id = ${orgId} AND slug = ${DEFAULT_WATCHER_SLUG}`; + + const again = await ensureDefaultWatcher({ + organizationId: orgId, + agentId: DEFAULT_AGENT_ID, + deviceWorkerId, + }); + expect(again.created).toBe(false); + expect(again.reason).toBe('sentinel'); + + const watchers = await sql`SELECT id FROM watchers WHERE organization_id = ${orgId}`; + expect(watchers).toHaveLength(0); + }); + + it('falls back to another agent when the default has been deleted', async () => { + const { orgId, deviceWorkerId } = await setupOrgWithDeviceAndAgent(); + const sql = getTestDb(); + + // User deleted the default agent before the device first registered. + await sql`DELETE FROM agents WHERE organization_id = ${orgId} AND id = ${DEFAULT_AGENT_ID}`; + await sql` + INSERT INTO agents (id, organization_id, name) + VALUES ('fallback-agent', ${orgId}, 'Fallback') + `; + + const result = await ensureDefaultWatcher({ + organizationId: orgId, + agentId: DEFAULT_AGENT_ID, + deviceWorkerId, + }); + expect(result.created).toBe(true); + + const watchers = await sql` + SELECT agent_id FROM watchers WHERE organization_id = ${orgId} + `; + expect(watchers).toHaveLength(1); + expect(String(watchers[0].agent_id)).toBe('fallback-agent'); + }); + + it('skips silently when the org has no agents at all', async () => { + const orgId = `org-watcher-noagent-${generateSecureToken(4)}`; + await seedOrg(orgId); + const sql = getTestDb(); + const userId = `user_${generateSecureToken(4)}`; + await sql` + INSERT INTO "user" (id, name, email, "emailVerified", "createdAt", "updatedAt") + VALUES (${userId}, 'No Agent User', ${`${userId}@test.local`}, true, NOW(), NOW()) + ON CONFLICT (id) DO NOTHING + `; + await sql` + INSERT INTO "member" (id, "userId", "organizationId", role, "createdAt") + VALUES (${`member_${generateSecureToken(4)}`}, ${userId}, ${orgId}, 'owner', NOW()) + `; + const inserted = (await sql` + INSERT INTO device_workers (user_id, worker_id, platform, capabilities, label, organization_id) + VALUES (${userId}, ${`worker-${userId}`}, 'macos', ${sql.json({})}, 'Mac', ${orgId}) + RETURNING id + `) as unknown as Array<{ id: string }>; + const deviceWorkerId = String(inserted[0].id); + + const result = await ensureDefaultWatcher({ + organizationId: orgId, + agentId: DEFAULT_AGENT_ID, + deviceWorkerId, + }); + expect(result.created).toBe(false); + expect(result.reason).toBe('no_agent'); + + // Sentinel still set so we don't keep retrying on every poll. + expect(await hasOrgSentinel(orgId, DEFAULT_WATCHER_SENTINEL)).toBe(true); + }); +}); diff --git a/packages/server/src/__tests__/integration/watchers/manual-trigger.test.ts b/packages/server/src/__tests__/integration/watchers/manual-trigger.test.ts new file mode 100644 index 000000000..44181cee9 --- /dev/null +++ b/packages/server/src/__tests__/integration/watchers/manual-trigger.test.ts @@ -0,0 +1,310 @@ +/** + * Integration test for the manual-trigger endpoint: + * POST /api/workers/me/watchers/:watcher_id/trigger + * + * Verifies: + * - Correctly-bound device → 200, pending run row created with manual + * dispatch_source. + * - Wrong device → 403, no run row created. + * - Re-trigger while a run is active → 200 `already_queued: true`, no + * second run row created. + * - Trigger does NOT advance `watchers.next_run_at`. + */ + +import { beforeEach, describe, expect, it } from 'vitest'; +import type { DbClient } from '../../../db/client'; +import { generateSecureToken, hashToken } from '../../../auth/oauth/utils'; +import { cleanupTestDatabase, getTestDb } from '../../setup/test-db'; +import { createTestAgent, createTestEntity } from '../../setup/test-fixtures'; +import { post } from '../../setup/test-helpers'; +import { TestWorkspace } from '../../setup/test-mcp-client'; + +/** + * Mint a PAT bound to a specific device worker_id with `device_worker:run` + * scope — same shape as createWorkerBoundPat in the sibling automation + * contract test. + */ +async function createWorkerBoundPat( + userId: string, + organizationId: string, + workerId: string, + scope = 'device_worker:run' +): Promise<{ token: string }> { + const sql = getTestDb(); + const token = `owl_pat_${generateSecureToken(24)}`; + const tokenHash = hashToken(token); + const tokenPrefix = token.substring(0, 12); + await sql` + INSERT INTO personal_access_tokens ( + token_hash, token_prefix, user_id, organization_id, name, scope, worker_id, + created_at, updated_at + ) VALUES ( + ${tokenHash}, ${tokenPrefix}, ${userId}, ${organizationId}, + ${`Test worker PAT (${workerId})`}, ${scope}, ${workerId}, + NOW(), NOW() + ) + `; + return { token }; +} + +/** + * Set up a device-pinned watcher owned by the workspace owner. Returns + * everything the tests need to mint the right PAT + assert on the run row. + */ +async function setupDevicePinnedWatcher(opts: { + workerId: string; +}): Promise<{ + sql: ReturnType; + dbClient: DbClient; + workspace: Awaited>; + watcherId: number; + deviceWorkerId: string; + agentId: string; +}> { + const sql = getTestDb(); + const dbClient = sql as unknown as DbClient; + const workspace = await TestWorkspace.create({ name: 'Manual Trigger Org' }); + const ownerUserId = workspace.users.owner.id; + + // Pre-register a device worker so the trigger can resolve the bound id. + const inserted = (await sql` + INSERT INTO device_workers (user_id, worker_id, platform, capabilities, label, organization_id) + VALUES (${ownerUserId}, ${opts.workerId}, 'macos', ${sql.json({})}, 'Mac Test', ${workspace.org.id}) + RETURNING id + `) as unknown as Array<{ id: string }>; + const deviceWorkerId = String(inserted[0].id); + + const entity = await createTestEntity({ + name: 'Trigger Entity', + organization_id: workspace.org.id, + created_by: ownerUserId, + }); + const agent = await createTestAgent({ + organizationId: workspace.org.id, + ownerUserId, + agentId: 'trigger-agent', + name: 'Trigger Agent', + }); + const watcher = (await workspace.owner.watchers.create({ + entity_id: entity.id, + slug: 'trigger-watcher', + name: 'Trigger Watcher', + prompt: 'Summarize {{entities}}.', + extraction_schema: { + type: 'object', + properties: { summary: { type: 'string' } }, + required: ['summary'], + }, + schedule: '0 9 * * *', + agent_id: agent.agentId, + })) as { watcher_id: string }; + const watcherId = Number(watcher.watcher_id); + + // Pin the watcher to the device. `WatcherCreateInput` doesn't expose + // device_worker_id / agent_kind, so set them directly — matches how + // automation-contract.test.ts pins watchers for the #802 dispatcher tests. + await sql` + UPDATE watchers + SET device_worker_id = ${deviceWorkerId}::uuid, + agent_kind = 'claude-code' + WHERE id = ${watcherId} + `; + + return { sql, dbClient, workspace, watcherId, deviceWorkerId, agentId: agent.agentId }; +} + +describe('POST /api/workers/me/watchers/:watcher_id/trigger', () => { + beforeEach(async () => { + await cleanupTestDatabase(); + }); + + it('correctly-bound device → 200 + manual run row', async () => { + const ctx = await setupDevicePinnedWatcher({ workerId: 'mac-trigger-ok' }); + const { token } = await createWorkerBoundPat( + ctx.workspace.users.owner.id, + ctx.workspace.org.id, + 'mac-trigger-ok' + ); + + const response = await post( + `/api/workers/me/watchers/${ctx.watcherId}/trigger`, + { token } + ); + expect(response.status).toBe(200); + const json = (await response.json()) as { + run_id: number; + status: string; + already_queued: boolean; + }; + expect(json.run_id).toBeGreaterThan(0); + expect(json.status).toBe('pending'); + expect(json.already_queued).toBe(false); + + const runs = await ctx.sql` + SELECT id, status, watcher_id, run_type, approved_input + FROM runs + WHERE watcher_id = ${ctx.watcherId} + `; + expect(runs).toHaveLength(1); + expect(String(runs[0].run_type)).toBe('watcher'); + expect(String(runs[0].status)).toBe('pending'); + const approved = runs[0].approved_input as Record; + expect(approved.dispatch_source).toBe('manual'); + expect(approved.device_worker_id).toBe(ctx.deviceWorkerId); + expect(approved.agent_kind).toBe('claude-code'); + }); + + it('wrong device → 403, no run created', async () => { + const ctx = await setupDevicePinnedWatcher({ workerId: 'mac-pinned' }); + const ownerUserId = ctx.workspace.users.owner.id; + + // Same user registers a second, unrelated device. Their token is bound + // to that second worker_id and must NOT be able to trigger watcher A. + await ctx.sql` + INSERT INTO device_workers (user_id, worker_id, platform, capabilities, label, organization_id) + VALUES (${ownerUserId}, 'mac-other', 'macos', ${ctx.sql.json({})}, 'Other Mac', ${ctx.workspace.org.id}) + `; + const { token } = await createWorkerBoundPat( + ownerUserId, + ctx.workspace.org.id, + 'mac-other' + ); + + const response = await post( + `/api/workers/me/watchers/${ctx.watcherId}/trigger`, + { token } + ); + expect(response.status).toBe(403); + const body = (await response.json()) as { error: string }; + expect(body.error).toMatch(/not pinned to this device/i); + + const runs = await ctx.sql` + SELECT id FROM runs WHERE watcher_id = ${ctx.watcherId} + `; + expect(runs).toHaveLength(0); + }); + + it('re-trigger while a run is pending → 200 already_queued, no duplicate run', async () => { + const ctx = await setupDevicePinnedWatcher({ workerId: 'mac-trigger-idem' }); + const { token } = await createWorkerBoundPat( + ctx.workspace.users.owner.id, + ctx.workspace.org.id, + 'mac-trigger-idem' + ); + + const first = await post( + `/api/workers/me/watchers/${ctx.watcherId}/trigger`, + { token } + ); + expect(first.status).toBe(200); + const firstJson = (await first.json()) as { run_id: number; already_queued: boolean }; + expect(firstJson.already_queued).toBe(false); + + const second = await post( + `/api/workers/me/watchers/${ctx.watcherId}/trigger`, + { token } + ); + expect(second.status).toBe(200); + const secondJson = (await second.json()) as { + run_id: number; + status: string; + already_queued: boolean; + }; + expect(secondJson.already_queued).toBe(true); + expect(secondJson.run_id).toBe(firstJson.run_id); + + const runs = await ctx.sql` + SELECT id FROM runs WHERE watcher_id = ${ctx.watcherId} + `; + expect(runs).toHaveLength(1); + }); + + it('also returns already_queued for claimed/running existing runs', async () => { + const ctx = await setupDevicePinnedWatcher({ workerId: 'mac-trigger-claimed' }); + const { token } = await createWorkerBoundPat( + ctx.workspace.users.owner.id, + ctx.workspace.org.id, + 'mac-trigger-claimed' + ); + + const first = await post( + `/api/workers/me/watchers/${ctx.watcherId}/trigger`, + { token } + ); + expect(first.status).toBe(200); + const firstJson = (await first.json()) as { run_id: number }; + + // Advance the run from pending → running (post-claim state). The trigger + // helper should still see it as active and refuse to start a second run. + await ctx.sql` + UPDATE runs SET status = 'running', claimed_at = NOW(), claimed_by = 'mac-trigger-claimed' + WHERE id = ${firstJson.run_id} + `; + + const second = await post( + `/api/workers/me/watchers/${ctx.watcherId}/trigger`, + { token } + ); + expect(second.status).toBe(200); + const secondJson = (await second.json()) as { + run_id: number; + already_queued: boolean; + }; + expect(secondJson.already_queued).toBe(true); + expect(secondJson.run_id).toBe(firstJson.run_id); + + const runs = await ctx.sql` + SELECT id, status FROM runs WHERE watcher_id = ${ctx.watcherId} + `; + expect(runs).toHaveLength(1); + expect(String(runs[0].status)).toBe('running'); + }); + + it('does NOT advance watchers.next_run_at', async () => { + const ctx = await setupDevicePinnedWatcher({ workerId: 'mac-trigger-nra' }); + const { token } = await createWorkerBoundPat( + ctx.workspace.users.owner.id, + ctx.workspace.org.id, + 'mac-trigger-nra' + ); + + const [before] = await ctx.sql` + SELECT next_run_at FROM watchers WHERE id = ${ctx.watcherId} + `; + const beforeNextRun = before.next_run_at as Date | string | null; + + const response = await post( + `/api/workers/me/watchers/${ctx.watcherId}/trigger`, + { token } + ); + expect(response.status).toBe(200); + + const [after] = await ctx.sql` + SELECT next_run_at FROM watchers WHERE id = ${ctx.watcherId} + `; + const afterNextRun = after.next_run_at as Date | string | null; + // Either both null (no schedule) or identical timestamps. Manual trigger + // must NOT shift the cron schedule forward. + if (beforeNextRun === null) { + expect(afterNextRun).toBeNull(); + } else { + const beforeMs = new Date(beforeNextRun).getTime(); + const afterMs = afterNextRun ? new Date(afterNextRun).getTime() : 0; + expect(afterMs).toBe(beforeMs); + } + }); + + it('returns 404 for an unknown watcher id', async () => { + const ctx = await setupDevicePinnedWatcher({ workerId: 'mac-404' }); + const { token } = await createWorkerBoundPat( + ctx.workspace.users.owner.id, + ctx.workspace.org.id, + 'mac-404' + ); + + const response = await post('/api/workers/me/watchers/999999999/trigger', { + token, + }); + expect(response.status).toBe(404); + }); +}); diff --git a/packages/server/src/auth/default-provisioning.ts b/packages/server/src/auth/default-provisioning.ts new file mode 100644 index 000000000..a94cca4f4 --- /dev/null +++ b/packages/server/src/auth/default-provisioning.ts @@ -0,0 +1,386 @@ +/** + * Default agent + watcher auto-provisioning for the Mac-app bootstrap org. + * + * The Owletto Mac app's onboarding expects a usable agent + a daily watcher + * already wired up the first time the device polls. Without this, the user + * lands on an empty dashboard and has no clear next step. + * + * Sticky against deletion: a sentinel timestamp is written to + * `organization.metadata` (JSON-as-text) per provisioning step. If the user + * later deletes the agent or watcher via the web UI, the sentinel stays — + * we do NOT auto-recreate. The sentinels live alongside the existing + * `personal_org_for_user_id` marker so we keep one source of truth for + * org-scoped lifecycle flags. + * + * Provisioning timing: + * - **Agent** is provisioned at server boot, immediately after + * `ensureBootstrapPat` lands the bootstrap user/org/member. + * - **Watcher** is provisioned the first time the user's Mac device + * polls `/api/workers/poll` (when the device_workers row is freshly + * INSERTed). Deferring it is what lets us pin the watcher to that + * exact device via `device_worker_id`. + */ + +import { getDb } from '../db/client'; +import type { DbClient } from '../db/client'; +import { getNextNumericId } from '../tools/admin/helpers/db-helpers'; +import { nextRunAt } from '../utils/cron'; +import logger from '../utils/logger'; + +export const DEFAULT_AGENT_SENTINEL = 'default_agent_provisioned'; +export const DEFAULT_WATCHER_SENTINEL = 'default_watcher_provisioned'; + +export const DEFAULT_AGENT_ID = 'owletto-default'; +export const DEFAULT_AGENT_NAME = 'Owletto Personal'; +export const DEFAULT_AGENT_IDENTITY = + "You are the user's personal assistant on their Mac. " + + 'You help them stay productive by surfacing relevant context ' + + 'and useful summaries. ' + + "If you don't have access to recent history or context, say so " + + 'clearly and suggest what the user could connect or track next.'; + +export const DEFAULT_WATCHER_SLUG = 'daily-checkin'; +export const DEFAULT_WATCHER_NAME = 'Daily check-in'; +export const DEFAULT_WATCHER_SCHEDULE = '0 9 * * *'; +export const DEFAULT_WATCHER_PROMPT = + 'Summarize what the user worked on yesterday in 1-2 sentences. ' + + 'Suggest 1-3 concrete priorities for today. ' + + "If you don't have recent history or context for this user, " + + 'say that clearly and suggest what the user could connect ' + + 'or track next (calendar, browser activity, etc.).'; + +export const DEFAULT_WATCHER_EXTRACTION_SCHEMA = { + type: 'object', + properties: { summary: { type: 'string' } }, +}; + +/** + * Read the JSON-decoded `organization.metadata` for the given org. + * Returns an empty object if the row is missing or the JSON is invalid. + */ +async function readOrgMetadata( + sql: DbClient, + organizationId: string +): Promise> { + const rows = (await sql` + SELECT metadata FROM "organization" WHERE id = ${organizationId} LIMIT 1 + `) as unknown as Array<{ metadata: string | null }>; + const raw = rows[0]?.metadata; + if (!raw) return {}; + try { + const parsed = JSON.parse(raw); + return typeof parsed === 'object' && parsed !== null + ? (parsed as Record) + : {}; + } catch { + // Defensive: legacy rows might hold non-JSON text. Treat as empty so the + // sentinel-write below produces a valid JSON object going forward. + logger.warn({ organizationId }, '[default-provisioning] organization.metadata is not valid JSON; resetting'); + return {}; + } +} + +/** + * Merge a sentinel key into `organization.metadata` and write it back as + * JSON-text. Idempotent: if the sentinel is already present, it's a no-op + * relative to the value (we overwrite with the current timestamp, but the + * key presence is what we read for the existence check). + */ +async function writeOrgSentinel( + sql: DbClient, + organizationId: string, + key: string, + value: string +): Promise { + const current = await readOrgMetadata(sql, organizationId); + current[key] = value; + const serialized = JSON.stringify(current); + await sql` + UPDATE "organization" SET metadata = ${serialized} WHERE id = ${organizationId} + `; +} + +/** + * True when the sentinel key is present in `organization.metadata` — meaning + * we previously ran this provisioning step for this org and must NOT re-run + * it even if the row it created has since been deleted. + */ +export async function hasOrgSentinel( + organizationId: string, + key: string, + sql?: DbClient +): Promise { + const client = sql ?? getDb(); + const metadata = await readOrgMetadata(client, organizationId); + return key in metadata; +} + +/** + * Provision the default Owletto agent for the given org, exactly once. + * + * Three guards stack: + * 1. Sentinel in `organization.metadata` (deletion stickiness). + * 2. No existing agents in the org (don't graft Owletto's defaults onto an + * org that's already curated agents by hand). + * 3. ON CONFLICT (organization_id, id) DO NOTHING on the agents PK. + * + * Best-effort: a thrown error is logged and swallowed. A failure here must + * not break the boot path that called us. + */ +export async function ensureDefaultAgent( + organizationId: string, + sql?: DbClient +): Promise<{ created: boolean; reason: 'sentinel' | 'has_agents' | 'inserted' }> { + const client = sql ?? getDb(); + try { + const provisioned = await hasOrgSentinel(organizationId, DEFAULT_AGENT_SENTINEL, client); + if (provisioned) { + return { created: false, reason: 'sentinel' }; + } + + const existingAgents = (await client` + SELECT 1 FROM agents WHERE organization_id = ${organizationId} LIMIT 1 + `) as unknown as Array; + if (existingAgents.length > 0) { + // Still write the sentinel so we don't re-check on every boot. + await writeOrgSentinel( + client, + organizationId, + DEFAULT_AGENT_SENTINEL, + new Date().toISOString() + ); + return { created: false, reason: 'has_agents' }; + } + + // Insert the default agent. The PK is (organization_id, id) so we can + // ON CONFLICT DO NOTHING to guard against a parallel boot. + await client` + INSERT INTO agents ( + id, organization_id, name, identity_md, + owner_platform, owner_user_id, is_workspace_agent, + created_at, updated_at + ) VALUES ( + ${DEFAULT_AGENT_ID}, ${organizationId}, ${DEFAULT_AGENT_NAME}, ${DEFAULT_AGENT_IDENTITY}, + 'lobu', NULL, false, + NOW(), NOW() + ) + ON CONFLICT (organization_id, id) DO NOTHING + `; + + await writeOrgSentinel( + client, + organizationId, + DEFAULT_AGENT_SENTINEL, + new Date().toISOString() + ); + + logger.info( + { organizationId, agentId: DEFAULT_AGENT_ID }, + '[default-provisioning] Provisioned default agent' + ); + return { created: true, reason: 'inserted' }; + } catch (err) { + logger.warn( + { organizationId, err: err instanceof Error ? err.message : String(err) }, + '[default-provisioning] Default-agent provisioning failed (non-fatal)' + ); + return { created: false, reason: 'sentinel' }; + } +} + +/** + * Provision the default daily-check-in watcher for the bootstrap org, pinned + * to the given device worker, exactly once. + * + * Deferred to the first `/api/workers/poll` from the user's first Mac so the + * `device_worker_id` lane is set correctly — the dispatcher then skips this + * watcher and only the matching device claims it via poll. + * + * Same three guards as `ensureDefaultAgent`: org sentinel, fall-back slug + * uniqueness check, and the watchers (organization_id, slug) constraint + * (enforced manually via SELECT + INSERT in a transaction). + * + * Best-effort: errors are logged and swallowed so a partial provisioning + * failure doesn't break the poll response. + */ +export async function ensureDefaultWatcher(params: { + organizationId: string; + agentId: string; + deviceWorkerId: string; + sql?: DbClient; +}): Promise<{ created: boolean; reason: 'sentinel' | 'slug_taken' | 'inserted' | 'no_agent' }> { + const sql = params.sql ?? getDb(); + try { + const provisioned = await hasOrgSentinel( + params.organizationId, + DEFAULT_WATCHER_SENTINEL, + sql + ); + if (provisioned) { + return { created: false, reason: 'sentinel' }; + } + + // The agent we pin to must actually exist. If the user deleted the + // default agent before the device first polled, fall back to ANY agent + // in the org so the watcher still has a valid foreign key. If there's + // no agent at all (zombied org), set the sentinel and skip — there's + // nothing useful we can wire up. + const agentRows = (await sql` + SELECT id FROM agents + WHERE organization_id = ${params.organizationId} + AND id = ${params.agentId} + LIMIT 1 + `) as unknown as Array<{ id: string }>; + let resolvedAgentId = agentRows[0]?.id ?? null; + if (!resolvedAgentId) { + const fallback = (await sql` + SELECT id FROM agents + WHERE organization_id = ${params.organizationId} + ORDER BY created_at ASC + LIMIT 1 + `) as unknown as Array<{ id: string }>; + resolvedAgentId = fallback[0]?.id ?? null; + } + if (!resolvedAgentId) { + await writeOrgSentinel( + sql, + params.organizationId, + DEFAULT_WATCHER_SENTINEL, + new Date().toISOString() + ); + return { created: false, reason: 'no_agent' }; + } + + // Slug-uniqueness guard (matches the implicit uniqueness handleCreate enforces). + const slugClash = (await sql` + SELECT 1 FROM watchers + WHERE organization_id = ${params.organizationId} + AND slug = ${DEFAULT_WATCHER_SLUG} + LIMIT 1 + `) as unknown as Array; + if (slugClash.length > 0) { + await writeOrgSentinel( + sql, + params.organizationId, + DEFAULT_WATCHER_SENTINEL, + new Date().toISOString() + ); + return { created: false, reason: 'slug_taken' }; + } + + // The `watchers.created_by` FK references `user(id)` ON DELETE RESTRICT. + // Pick the org owner (any member with role='owner', falling back to any + // member) so the row stays attributable. The `system` user fallback is + // for tests and local dev where the bootstrap path may not have run yet. + const createdByRows = (await sql` + SELECT "userId" FROM "member" + WHERE "organizationId" = ${params.organizationId} + ORDER BY CASE WHEN role = 'owner' THEN 0 ELSE 1 END ASC, "createdAt" ASC + LIMIT 1 + `) as unknown as Array<{ userId: string }>; + const ownerUserId = createdByRows[0]?.userId ?? null; + let createdBy: string | null = ownerUserId; + if (!createdBy) { + const systemRows = (await sql` + SELECT id FROM "user" WHERE id = 'system' LIMIT 1 + `) as unknown as Array<{ id: string }>; + createdBy = systemRows[0]?.id ?? null; + } + if (!createdBy) { + logger.warn( + { organizationId: params.organizationId }, + '[default-provisioning] No user available to attribute watcher creation — skipping' + ); + await writeOrgSentinel( + sql, + params.organizationId, + DEFAULT_WATCHER_SENTINEL, + new Date().toISOString() + ); + return { created: false, reason: 'no_agent' }; + } + + const extractionSchema = DEFAULT_WATCHER_EXTRACTION_SCHEMA; + const sources = [ + { name: 'content', query: 'SELECT * FROM events ORDER BY occurred_at DESC' }, + ]; + + await sql.begin(async (tx) => { + const watcherId = await getNextNumericId(tx, 'watchers'); + const versionId = await getNextNumericId(tx, 'watcher_versions'); + const scheduledNextRun = nextRunAt(DEFAULT_WATCHER_SCHEDULE); + + await tx` + INSERT INTO watchers ( + id, name, slug, organization_id, entity_ids, + schedule, next_run_at, agent_id, scheduler_client_id, model_config, sources, version, + current_version_id, tags, status, created_by, created_at, updated_at, + watcher_group_id, + device_worker_id, agent_kind, + notification_channel, notification_priority, min_cooldown_seconds + ) VALUES ( + ${watcherId}, ${DEFAULT_WATCHER_NAME}, ${DEFAULT_WATCHER_SLUG}, + ${params.organizationId}, ${'{}'}::bigint[], + ${DEFAULT_WATCHER_SCHEDULE}, ${scheduledNextRun}, + ${resolvedAgentId}, NULL, + ${tx.json({})}, ${tx.json(sources)}, + 1, NULL, ${'{}'}::text[], + 'active', ${createdBy}, NOW(), NOW(), + ${watcherId}, + ${params.deviceWorkerId}::uuid, NULL, + 'canvas', 'normal', 3600 + ) + `; + + await tx` + INSERT INTO watcher_versions ( + id, watcher_id, version, name, description, + prompt, extraction_schema, version_sources, + json_template, keying_config, classifiers, + condensation_prompt, condensation_window_count, + reactions_guidance, change_notes, created_by, created_at + ) VALUES ( + ${versionId}, ${watcherId}, 1, ${DEFAULT_WATCHER_NAME}, NULL, + ${DEFAULT_WATCHER_PROMPT}, ${tx.json(extractionSchema)}, ${tx.json(sources)}, + NULL, NULL, NULL, + NULL, NULL, + NULL, 'Initial version', ${createdBy}, NOW() + ) + `; + + await tx` + UPDATE watchers + SET current_version_id = ${versionId} + WHERE id = ${watcherId} + `; + }); + + await writeOrgSentinel( + sql, + params.organizationId, + DEFAULT_WATCHER_SENTINEL, + new Date().toISOString() + ); + + logger.info( + { + organizationId: params.organizationId, + agentId: resolvedAgentId, + deviceWorkerId: params.deviceWorkerId, + slug: DEFAULT_WATCHER_SLUG, + }, + '[default-provisioning] Provisioned default watcher pinned to device' + ); + return { created: true, reason: 'inserted' }; + } catch (err) { + logger.warn( + { + organizationId: params.organizationId, + deviceWorkerId: params.deviceWorkerId, + err: err instanceof Error ? err.message : String(err), + }, + '[default-provisioning] Default-watcher provisioning failed (non-fatal)' + ); + return { created: false, reason: 'sentinel' }; + } +} diff --git a/packages/server/src/index.ts b/packages/server/src/index.ts index db6b4453b..12f584f18 100644 --- a/packages/server/src/index.ts +++ b/packages/server/src/index.ts @@ -637,6 +637,7 @@ import { pollWorkerJob, postAuthSignal, streamContent, + triggerWatcherForDevice, } from './worker-api'; // Worker API authentication. @@ -691,11 +692,18 @@ app.use('/api/workers/*', async (c, next) => { // gate here would just block legitimate posts from the bound device. const isWatcherCompleteSubpath = /^\/api\/workers\/me\/runs\/\d+\/complete-watcher$/.test(requestPath); + // /api/workers/me/watchers//trigger — device-side manual + // re-run endpoint. The handler does its own bound-workerId → + // device_worker_id match, so the org-scope gate here would block + // legitimate triggers from the pinned device. + const isWatcherTriggerSubpath = + /^\/api\/workers\/me\/watchers\/\d+\/trigger$/.test(requestPath); if ( !allowedPathsForUserWorker.has(requestPath) && !isAuthProfileSubpath && !isFeedSubpath && - !isWatcherCompleteSubpath + !isWatcherCompleteSubpath && + !isWatcherTriggerSubpath ) { return c.json({ error: 'Endpoint not available to user-scoped workers' }, 403); } @@ -744,6 +752,7 @@ app.post('/api/workers/complete', completeWorkerJob); app.post('/api/workers/complete-action', completeActionRun); app.post('/api/workers/complete-embeddings', completeEmbeddings); app.post('/api/workers/me/runs/:runId/complete-watcher', completeWatcherRun); +app.post('/api/workers/me/watchers/:watcher_id/trigger', triggerWatcherForDevice); app.post('/api/workers/fetch-events', fetchEventsForEmbedding); app.post('/api/workers/emit-auth-artifact', emitAuthArtifact); app.post('/api/workers/poll-auth-signal', pollAuthSignal); diff --git a/packages/server/src/start-local.ts b/packages/server/src/start-local.ts index 5b42ad80e..e07107be5 100644 --- a/packages/server/src/start-local.ts +++ b/packages/server/src/start-local.ts @@ -27,6 +27,7 @@ import dotenv from 'dotenv'; dotenv.config(); +import { ensureDefaultAgent } from './auth/default-provisioning'; import { generatePAT, getPATPrefix, hashToken } from './auth/oauth/utils'; import { PGlite } from '@electric-sql/pglite'; @@ -204,6 +205,20 @@ async function main() { logger.warn({ err }, 'Bootstrap PAT setup failed'); } + // ─── Default agent (Mac-app onboarding) ────────────────────── + // Auto-provision the Owletto Personal agent for the bootstrap org + // the first time the deployment boots. Sticky against deletion via a + // sentinel in `organization.metadata` — if the user removes the agent + // through the web UI we do NOT recreate it on the next boot. + // + // Best-effort: failure here does not block boot. The Mac app degrades to + // an empty-agents state instead of failing to start the server. + try { + await ensureDefaultAgent(BOOTSTRAP_ORG_ID); + } catch (err) { + logger.warn({ err }, 'Default-agent provisioning failed'); + } + // ─── Listen ────────────────────────────────────────────────── // No-auth mode is loopback-only by design. Refuse to listen on anything diff --git a/packages/server/src/tools/admin/manage_watchers.ts b/packages/server/src/tools/admin/manage_watchers.ts index 7f4189986..8c3269d66 100644 --- a/packages/server/src/tools/admin/manage_watchers.ts +++ b/packages/server/src/tools/admin/manage_watchers.ts @@ -2293,6 +2293,7 @@ async function handleList( i.next_run_at, i.agent_id, i.device_worker_id, + i.last_fired_at, i.scheduler_client_id, i.model_config, i.sources, diff --git a/packages/server/src/watchers/automation.ts b/packages/server/src/watchers/automation.ts index ec13a851d..df759547d 100644 --- a/packages/server/src/watchers/automation.ts +++ b/packages/server/src/watchers/automation.ts @@ -204,7 +204,7 @@ async function enqueueWatcherRunForRecord( return queued; } -async function enqueueWatcherRunForWatcher( +export async function enqueueWatcherRunForWatcher( watcherId: number, dispatchSource: WatcherRunPayload['dispatch_source'], db?: DbClient diff --git a/packages/server/src/worker-api.ts b/packages/server/src/worker-api.ts index ba7e9e92a..884de78c8 100644 --- a/packages/server/src/worker-api.ts +++ b/packages/server/src/worker-api.ts @@ -28,7 +28,13 @@ import { import { captureServerError } from './sentry'; import { autoLinkEvent } from './utils/auto-linker'; import { nextRunAt as nextRunAtFromCron } from './utils/cron'; -import { advanceWatcherSchedule } from './watchers/automation'; +import { advanceWatcherSchedule, enqueueWatcherRunForWatcher } from './watchers/automation'; +import { + DEFAULT_AGENT_ID, + ensureDefaultWatcher, + hasOrgSentinel, + DEFAULT_AGENT_SENTINEL, +} from './auth/default-provisioning'; import { getNextNumericId } from './tools/admin/helpers/db-helpers'; import { reconcileDeviceCapabilities } from './worker-api/device-reconcile'; import { findBundledConnectorFile } from './utils/connector-catalog'; @@ -293,6 +299,37 @@ export async function pollWorkerJob(c: Context<{ Bindings: Env }>) { summary: `Device "${label ?? worker_id}" registered`, extra: { platform, worker_id, app_version }, }); + + // Mac-app onboarding: when a device registers for the first time in an + // org that's a candidate for default provisioning (agent sentinel set + // → `ensureDefaultAgent` ran for this org at boot), provision a daily + // check-in watcher pinned to THIS device. The sentinel on + // `organization.metadata` makes this exactly-once even across multiple + // first-poll attempts. Deletion stickiness: if the user later removes + // the watcher via the web UI, the sentinel stays and we do NOT + // recreate. + const provisioningOrgId = upserted[0].organization_id; + const provisioningDeviceId = upserted[0].id; + try { + const isCandidateOrg = await hasOrgSentinel( + provisioningOrgId, + DEFAULT_AGENT_SENTINEL, + sql + ); + if (isCandidateOrg) { + await ensureDefaultWatcher({ + organizationId: provisioningOrgId, + agentId: DEFAULT_AGENT_ID, + deviceWorkerId: provisioningDeviceId, + sql, + }); + } + } catch (err) { + logger.warn( + { err: errorMessage(err), organizationId: provisioningOrgId }, + '[pollWorkerJob] default-watcher provisioning failed (non-fatal)' + ); + } } // Reconcile this user's device connectors against the capabilities their @@ -2906,3 +2943,141 @@ export async function deleteMyDeviceFeed(c: Context<{ Bindings: Env }>) { } } +/** + * POST /api/workers/me/watchers/:watcher_id/trigger + * + * Manually fire a watcher run from the device that owns it. The Mac app's + * "Run now" action posts here. Unlike the scheduled path, this: + * - does NOT advance `watchers.next_run_at` (manual fires shouldn't shift + * the cron schedule); + * - is idempotent against active runs — re-trigger while a previous run is + * pending/claimed/running returns the existing `run_id` with + * `already_queued: true`; + * - requires the calling token's bound `device_workers.id` to match + * `watchers.device_worker_id`. No cross-device triggering. + * + * Auth: same `/api/workers/*` middleware. `device_worker:run` scope (granted + * to Mac-app PATs minted via the device-link flow). + */ +export async function triggerWatcherForDevice(c: Context<{ Bindings: Env }>) { + const watcherIdParam = c.req.param('watcher_id'); + if (!watcherIdParam) { + return c.json({ error: 'watcher_id is required' }, 400); + } + const watcherId = Number(watcherIdParam); + if (!Number.isFinite(watcherId) || watcherId <= 0) { + return c.json({ error: 'Invalid watcher_id' }, 400); + } + + // The middleware already verified the token has `device_worker:run` (or + // mcp:write/admin). The trigger surface is user-scoped only — trusted + // server workers shouldn't be triggering device-pinned watchers, that's + // what the scheduled path is for. + if (c.var.workerAuthMode !== 'user') { + return c.json({ error: 'Endpoint is user-scoped only' }, 403); + } + const workerUserId = c.var.workerUserId; + if (!workerUserId) { + return c.json({ error: 'Unauthorized' }, 401); + } + const scopes = c.var.mcpAuthInfo?.scopes ?? []; + if ( + !scopes.includes('device_worker:run') && + !scopes.includes('mcp:write') && + !scopes.includes('mcp:admin') + ) { + return c.json({ error: 'Worker token missing device_worker:run scope' }, 403); + } + + // Resolve the caller's bound device worker. mcpAuth populates + // `mcpAuthInfo.workerId` from the PAT row. Without a bound workerId there's + // no way to authorize the trigger — manual fires must come from a known + // physical device. + const boundWorkerId = c.var.mcpAuthInfo?.workerId ?? null; + if (!boundWorkerId) { + return c.json({ error: 'Token is not bound to a device worker' }, 403); + } + + const sql = getDb(); + let resolvedDeviceWorkerId: string; + try { + const deviceRows = (await sql` + SELECT id, organization_id + FROM device_workers + WHERE user_id = ${workerUserId} AND worker_id = ${boundWorkerId} + LIMIT 1 + `) as unknown as Array<{ id: string; organization_id: string | null }>; + const device = deviceRows[0]; + if (!device) { + return c.json({ error: 'Device not registered yet — poll first' }, 404); + } + resolvedDeviceWorkerId = device.id; + } catch (err) { + logger.error({ error: errorMessage(err) }, '[triggerWatcherForDevice] device lookup failed'); + return c.json({ error: 'Internal error' }, 500); + } + + // Load the watcher and enforce two checks: + // (1) the watcher is in the caller's org scope (auth middleware computed + // `workerOrgIds` from the token-bound org + the user's personal org); + // (2) `watchers.device_worker_id` matches the caller's device. Even if + // the user owns both devices, A cannot trigger a watcher pinned to B + // — that's a different pairing in the UI. + const watcherRows = (await sql` + SELECT id, organization_id, agent_id, status, device_worker_id::text AS device_worker_id + FROM watchers + WHERE id = ${watcherId} + LIMIT 1 + `) as unknown as Array<{ + id: number; + organization_id: string; + agent_id: string | null; + status: string; + device_worker_id: string | null; + }>; + const watcher = watcherRows[0]; + if (!watcher) { + return c.json({ error: 'Watcher not found' }, 404); + } + + const orgIds = c.var.workerOrgIds ?? []; + if (!orgIds.includes(watcher.organization_id)) { + return c.json({ error: 'Forbidden' }, 403); + } + if (!watcher.device_worker_id || watcher.device_worker_id !== resolvedDeviceWorkerId) { + return c.json({ error: 'Watcher is not pinned to this device' }, 403); + } + if ((watcher.status ?? 'active') !== 'active') { + return c.json({ error: 'Watcher is not active' }, 409); + } + if (!watcher.agent_id) { + return c.json({ error: 'Watcher has no agent assigned' }, 409); + } + + // Enqueue (or re-use) the run. `enqueueWatcherRunForWatcher` delegates to + // `createWatcherRun`, which checks for an active run in the same watcher_id + // lane and reuses it (returns `created: false`). That gives us broad + // idempotency across pending/claimed/running — re-trigger never starts a + // second run while the first is still in flight. We intentionally do NOT + // advance `watchers.next_run_at` here so a manual fire doesn't shift the + // cron schedule. + try { + const result = await enqueueWatcherRunForWatcher(watcherId, 'manual'); + return c.json( + { + run_id: result.runId, + status: result.status, + already_queued: !result.created, + queued_at: new Date().toISOString(), + }, + 200 + ); + } catch (err) { + logger.error( + { error: errorMessage(err), watcherId }, + '[triggerWatcherForDevice] enqueue failed' + ); + return c.json({ error: errorMessage(err) }, 500); + } +} +