diff --git a/examples/agent-community/models/reactions/opportunity-matcher.reaction.ts b/examples/agent-community/models/reactions/opportunity-matcher.reaction.ts new file mode 100644 index 000000000..6066c723b --- /dev/null +++ b/examples/agent-community/models/reactions/opportunity-matcher.reaction.ts @@ -0,0 +1,38 @@ +/** + * Reaction for the `opportunity-matcher` watcher. + * + * Runs every 12h after the LLM scans member activity and produces a list of + * suggested matches. Persists each match as a `community_match` event so + * downstream consumers (intro-drafting agents, weekly digest, audit log) can + * iterate over a single source of truth instead of re-running the matcher. + */ +import type { ReactionContext } from "@lobu/connector-sdk"; + +interface MatchData { + signals?: Array<{ + member_a: string; + member_b: string; + reason: string; + confidence?: number; + }>; +} + +export default async (ctx: ReactionContext, client: any): Promise => { + const data = ctx.extracted_data as MatchData; + const signals = data.signals ?? []; + if (signals.length === 0) return; + + for (const s of signals) { + await client.knowledge.save({ + entity_ids: ctx.entities.map((e) => e.id), + content: `Match: ${s.member_a} ↔ ${s.member_b} — ${s.reason}`, + semantic_type: "community_match", + metadata: { + member_a: s.member_a, + member_b: s.member_b, + confidence: s.confidence ?? null, + window_id: ctx.window.id, + }, + }); + } +}; diff --git a/examples/agent-community/models/schema.yaml b/examples/agent-community/models/schema.yaml index 3266ebbee..ffac7763a 100644 --- a/examples/agent-community/models/schema.yaml +++ b/examples/agent-community/models/schema.yaml @@ -89,6 +89,10 @@ watchers: agent: agent-community name: Opportunity matcher schedule: 0 */12 * * * + notification_priority: normal + tags: [community, matching] + min_cooldown_seconds: 300 + reaction_script: ./reactions/opportunity-matcher.reaction.ts prompt: | Monitor connected profiles, newsletters, websites, and member updates for new launches, posts, hiring signals, funding news, and project changes. Identify which members are likely to care, explain why, and queue approved intro or outreach drafts. extraction_schema: diff --git a/examples/atlas/models/reactions/catalog-staleness-checker.reaction.ts b/examples/atlas/models/reactions/catalog-staleness-checker.reaction.ts new file mode 100644 index 000000000..d098b793b --- /dev/null +++ b/examples/atlas/models/reactions/catalog-staleness-checker.reaction.ts @@ -0,0 +1,38 @@ +/** + * Reaction for atlas's `catalog-staleness-checker` watcher. + * + * Writes a `catalog_stale` event per stale entry the LLM identified. Atlas is + * a long-lived reference catalog — entries that haven't been re-verified in + * 90+ days are flagged so a curator can decide whether to refresh, retire, or + * leave them. + */ +import type { ReactionContext } from "@lobu/connector-sdk"; + +interface StaleData { + stale_entries?: Array<{ + entity_type: string; + slug: string; + last_updated: string; + suggested_action: string; + }>; +} + +export default async (ctx: ReactionContext, client: any): Promise => { + const data = ctx.extracted_data as StaleData; + const stale = data.stale_entries ?? []; + if (stale.length === 0) return; + + for (const s of stale) { + await client.knowledge.save({ + entity_ids: ctx.entities.map((e) => e.id), + content: `Stale ${s.entity_type}/${s.slug} — last updated ${s.last_updated}\n→ ${s.suggested_action}`, + semantic_type: "catalog_stale", + metadata: { + entity_type: s.entity_type, + slug: s.slug, + last_updated: s.last_updated, + window_id: ctx.window.id, + }, + }); + } +}; diff --git a/examples/atlas/models/schema.yaml b/examples/atlas/models/schema.yaml index 2e71613f0..a800eafa3 100644 --- a/examples/atlas/models/schema.yaml +++ b/examples/atlas/models/schema.yaml @@ -153,3 +153,43 @@ entities: homepage_url: type: string format: uri + +watchers: + # Demo watcher exercising the file-first apply surface: tags, notification + # routing, cooldowns, and a sibling reaction script. Atlas is a reference + # catalog (cities, countries, industries, universities, …) — once a week + # we sweep for entries that haven't been touched in 90+ days and flag them + # for re-verification. + - slug: catalog-staleness-checker + agent: atlas-curator + name: Catalog staleness checker + schedule: 0 4 * * 1 + notification_priority: low + tags: [atlas, reference, weekly] + min_cooldown_seconds: 3600 + reaction_script: ./reactions/catalog-staleness-checker.reaction.ts + prompt: | + Sweep the atlas reference catalog for entries that haven't been + updated in 90+ days. List the stalest 10 across cities, countries, + industries, technologies, and universities. Suggest a re-verification + action for each (e.g. "country/PL: confirm population from latest census"). + extraction_schema: + type: object + required: + - stale_entries + properties: + stale_entries: + type: array + items: + type: object + properties: + entity_type: + type: string + slug: + type: string + last_updated: + type: string + suggested_action: + type: string + total_stale_count: + type: integer diff --git a/examples/delivery/models/schema.yaml b/examples/delivery/models/schema.yaml index 8ed1b030a..b6e6c3c5c 100644 --- a/examples/delivery/models/schema.yaml +++ b/examples/delivery/models/schema.yaml @@ -110,6 +110,10 @@ watchers: agent: delivery name: Phoenix rollout tracker schedule: 0 9 * * 1 + notification_priority: high + notification_channel: both + tags: [delivery, weekly, rollout] + min_cooldown_seconds: 3600 prompt: | Check project blockers, milestone progress, and generate the weekly risk summary for leadership. extraction_schema: diff --git a/examples/ecommerce/models/schema.yaml b/examples/ecommerce/models/schema.yaml index b469850a6..dd565a26f 100644 --- a/examples/ecommerce/models/schema.yaml +++ b/examples/ecommerce/models/schema.yaml @@ -111,6 +111,9 @@ watchers: agent: ecommerce-ops name: Customer activity tracker schedule: 0 */6 * * * + notification_priority: normal + tags: [ecommerce, customer-ops] + min_cooldown_seconds: 300 prompt: | Monitor customers for new orders, subscription changes, delivery requests, and support interactions. extraction_schema: diff --git a/examples/finance/models/reactions/reconciliation-monitor.reaction.ts b/examples/finance/models/reactions/reconciliation-monitor.reaction.ts new file mode 100644 index 000000000..528e44d3f --- /dev/null +++ b/examples/finance/models/reactions/reconciliation-monitor.reaction.ts @@ -0,0 +1,40 @@ +/** + * Reaction for the `reconciliation-monitor` watcher. + * + * Persists any variance flagged during the daily 6am sweep as a durable + * `variance_flag` event tied to the affected account. Downstream agents + * (close-of-month rollup, audit prep) consume these events instead of + * re-extracting variances from the raw transaction stream. + */ +import type { ReactionContext } from "@lobu/connector-sdk"; + +interface ReconciliationData { + variances?: Array<{ + account: string; + amount: number; + direction: "over" | "under"; + reason: string; + }>; + unreconciled_count?: number; +} + +export default async (ctx: ReactionContext, client: any): Promise => { + const data = ctx.extracted_data as ReconciliationData; + const variances = data.variances ?? []; + if (variances.length === 0) return; + + for (const v of variances) { + await client.knowledge.save({ + entity_ids: ctx.entities.map((e) => e.id), + content: `Variance ${v.direction} on ${v.account}: ${v.amount} — ${v.reason}`, + semantic_type: "variance_flag", + metadata: { + account: v.account, + amount: v.amount, + direction: v.direction, + window_id: ctx.window.id, + unreconciled_count: data.unreconciled_count ?? null, + }, + }); + } +}; diff --git a/examples/finance/models/schema.yaml b/examples/finance/models/schema.yaml index fdce28f49..d2ce8fce0 100644 --- a/examples/finance/models/schema.yaml +++ b/examples/finance/models/schema.yaml @@ -111,6 +111,11 @@ watchers: agent: finance name: Reconciliation monitor schedule: 0 6 * * 1-5 + notification_priority: high + notification_channel: both + tags: [finance, reconciliation, daily] + min_cooldown_seconds: 3600 + reaction_script: ./reactions/reconciliation-monitor.reaction.ts prompt: | Check accounts for unreconciled transactions, new variances, and approaching reporting deadlines. Lead with exceptions that need review. extraction_schema: diff --git a/examples/leadership/models/schema.yaml b/examples/leadership/models/schema.yaml index 822c5bc5c..a05c03890 100644 --- a/examples/leadership/models/schema.yaml +++ b/examples/leadership/models/schema.yaml @@ -134,6 +134,10 @@ watchers: agent: leadership name: Board action tracker schedule: 0 8 * * * + notification_priority: high + notification_channel: both + tags: [leadership, daily, board] + agent_kind: notifier prompt: | Track board action items: check task delivery status, blocker resolution progress, and approaching deadlines for the next board packet. extraction_schema: diff --git a/examples/legal/models/schema.yaml b/examples/legal/models/schema.yaml index aa3cdcff8..5250aa627 100644 --- a/examples/legal/models/schema.yaml +++ b/examples/legal/models/schema.yaml @@ -114,6 +114,13 @@ watchers: agent: legal-review name: Contract review tracker schedule: 0 8 * * 1-5 + notification_priority: high + tags: [legal, contract, daily] + min_cooldown_seconds: 1800 + reactions_guidance: | + For any contract with `status: needs_counsel`, route an entity-scoped event + to the assigned reviewer. For contracts >90 days unsigned, escalate to the + counterparty owner; never auto-resolve risk items. prompt: | Review active contracts for approaching deadlines, unsigned agreements, and unresolved risk items. Flag any clauses that still need counsel approval. extraction_schema: diff --git a/examples/lobu-crm/models/reactions/funnel-digest.reaction.ts b/examples/lobu-crm/models/reactions/funnel-digest.reaction.ts new file mode 100644 index 000000000..0b902ec4f --- /dev/null +++ b/examples/lobu-crm/models/reactions/funnel-digest.reaction.ts @@ -0,0 +1,51 @@ +/** + * Reaction for the `funnel-digest` watcher. + * + * Runs after the weekly Monday-9am window completes. `ctx.extracted_data` is + * whatever the watcher's `extraction_schema` produced — funnel snapshot, top + * action, stale leads, etc. We persist the digest as a `funnel_digest` event + * linked to every lead the watcher knows about so the next digest can compare + * stage_counts week-over-week without re-running classification. + * + * Pair with `notification_priority: high` on the watcher — the OS notification + * fires regardless of whether this script succeeds; this just produces durable + * knowledge. + */ +import type { ReactionContext } from "@lobu/connector-sdk"; + +interface DigestData { + top_action?: string; + stage_counts?: Record; + conversations_this_week?: number; + gap?: string; +} + +export default async (ctx: ReactionContext, client: any): Promise => { + const data = ctx.extracted_data as DigestData; + const stageSummary = Object.entries(data.stage_counts ?? {}) + .map(([stage, n]) => `${stage}: ${n}`) + .join(", "); + const content = [ + `Weekly funnel digest — ${ctx.window.window_end.slice(0, 10)}`, + `Top action: ${data.top_action ?? "(none)"}`, + `Stages: ${stageSummary || "(empty)"}`, + `Conversations this week: ${data.conversations_this_week ?? 0}`, + data.gap ? `Gap: ${data.gap}` : null, + ] + .filter(Boolean) + .join("\n"); + + await client.knowledge.save({ + // Attaching to the whole watcher's entity set keeps the digest scoped to + // CRM data and discoverable from any lead the watcher already touches. + entity_ids: ctx.entities.map((e) => e.id), + content, + semantic_type: "funnel_digest", + metadata: { + window_id: ctx.window.id, + watcher_slug: ctx.watcher.slug, + stage_counts: data.stage_counts ?? {}, + top_action: data.top_action ?? null, + }, + }); +}; diff --git a/examples/lobu-crm/models/reactions/inbound-triage.reaction.ts b/examples/lobu-crm/models/reactions/inbound-triage.reaction.ts new file mode 100644 index 000000000..2a8a98ca6 --- /dev/null +++ b/examples/lobu-crm/models/reactions/inbound-triage.reaction.ts @@ -0,0 +1,44 @@ +/** + * Reaction for the `inbound-triage` watcher. + * + * Fires every 2h after the watcher LLM extracts new and enriched leads from + * GitHub/X/HN signals. The script writes a `lead_interaction` event per + * recommended action so the next digest can count them — the watcher itself + * already creates the `lead` rows, so we don't duplicate that here. + */ +import type { ReactionContext } from "@lobu/connector-sdk"; + +interface TriageData { + new_leads?: Array<{ + name: string; + source: string; + stage: string; + why?: string; + }>; + recommended_actions?: string[]; + notable?: boolean; +} + +export default async (ctx: ReactionContext, client: any): Promise => { + const data = ctx.extracted_data as TriageData; + // Nothing notable → nothing to persist. The watcher's prompt is explicit + // about not manufacturing noise; we mirror that here. + if (!data.notable) return; + + const actions = data.recommended_actions ?? []; + if (actions.length === 0) return; + + await client.knowledge.save({ + entity_ids: ctx.entities.map((e) => e.id), + content: [ + `Triage run ${ctx.window.window_end.slice(0, 16)} — ${actions.length} action(s)`, + ...actions.map((a, i) => `${i + 1}. ${a}`), + ].join("\n"), + semantic_type: "lead_interaction", + metadata: { + window_id: ctx.window.id, + new_lead_count: data.new_leads?.length ?? 0, + action_count: actions.length, + }, + }); +}; diff --git a/examples/lobu-crm/models/schema.yaml b/examples/lobu-crm/models/schema.yaml index ae14a35a6..ac5890dec 100644 --- a/examples/lobu-crm/models/schema.yaml +++ b/examples/lobu-crm/models/schema.yaml @@ -111,6 +111,11 @@ watchers: agent: crm name: Weekly funnel digest schedule: 0 9 * * 1 + notification_priority: high + notification_channel: both + tags: [crm, weekly] + min_cooldown_seconds: 3600 + reaction_script: ./reactions/funnel-digest.reaction.ts prompt: | Produce the weekly funnel digest and post it to Slack. Keep it short. @@ -173,6 +178,10 @@ watchers: agent: crm name: Inbound triage schedule: 0 8-22/2 * * * + notification_priority: normal + tags: [crm, triage] + min_cooldown_seconds: 300 + reaction_script: ./reactions/inbound-triage.reaction.ts prompt: | Look for new top-of-funnel signals since the last run, across the connectors in this org: diff --git a/examples/market/models/reactions/founder-activity-tracker.reaction.ts b/examples/market/models/reactions/founder-activity-tracker.reaction.ts new file mode 100644 index 000000000..c61c5fe2a --- /dev/null +++ b/examples/market/models/reactions/founder-activity-tracker.reaction.ts @@ -0,0 +1,39 @@ +/** + * Reaction for the `founder-activity-tracker` watcher. + * + * Records notable public activity (tweets, blog posts, hiring posts, fundraise + * rumors) as `founder_activity` events. The opportunity-matcher watcher reads + * these events to suggest cross-portfolio introductions. + */ +import type { ReactionContext } from "@lobu/connector-sdk"; + +interface FounderActivityData { + signals?: Array<{ + founder: string; + activity_type: string; + summary: string; + importance?: "low" | "medium" | "high"; + }>; +} + +export default async (ctx: ReactionContext, client: any): Promise => { + const data = ctx.extracted_data as FounderActivityData; + const signals = data.signals ?? []; + // High-importance only — low-noise channel for the intel feed. + const notable = signals.filter((s) => s.importance === "high"); + if (notable.length === 0) return; + + for (const s of notable) { + await client.knowledge.save({ + entity_ids: ctx.entities.map((e) => e.id), + content: `${s.founder} — ${s.activity_type}: ${s.summary}`, + semantic_type: "founder_activity", + metadata: { + founder: s.founder, + activity_type: s.activity_type, + importance: s.importance, + window_id: ctx.window.id, + }, + }); + } +}; diff --git a/examples/market/models/schema.yaml b/examples/market/models/schema.yaml index 43880daeb..2ed59782a 100644 --- a/examples/market/models/schema.yaml +++ b/examples/market/models/schema.yaml @@ -534,6 +534,10 @@ watchers: agent: vc-tracking name: Founder Activity Tracker schedule: 0 10 * * * + notification_priority: normal + tags: [vc, founders, daily] + min_cooldown_seconds: 600 + reaction_script: ./reactions/founder-activity-tracker.reaction.ts prompt: | You are a venture capital analyst tracking the public activity of startup founders in your portfolio. @@ -647,6 +651,9 @@ watchers: agent: vc-tracking name: Opportunity Matcher schedule: 0 */12 * * * + notification_priority: normal + tags: [vc, matching] + min_cooldown_seconds: 600 prompt: | You are a community intelligence agent for a private founder community managed by a venture capital fund. Your job is to monitor founder activity and identify high-quality introduction opportunities between portfolio founders. diff --git a/examples/office-bot/models/lunch.yaml b/examples/office-bot/models/lunch.yaml index 9cbba830b..c344c3137 100644 --- a/examples/office-bot/models/lunch.yaml +++ b/examples/office-bot/models/lunch.yaml @@ -60,6 +60,10 @@ watchers: name: Open the lunch run # Workdays 11:00 Europe/London — post the lunch call and open the thread. schedule: "0 11 * * 1-5" + notification_priority: high + notification_channel: both + tags: [lunch, daily] + min_cooldown_seconds: 600 prompt: | Open today's office lunch run (step 1 in your instructions): @@ -95,6 +99,13 @@ watchers: # Workdays 11:35 Europe/London — read the thread, post options/collect orders, # build the Deliveroo basket, post the summary, hand off to a human. schedule: "35 11 * * 1-5" + notification_priority: high + tags: [lunch, daily] + min_cooldown_seconds: 600 + reactions_guidance: | + When the run ends in `placed` or `manual`, store the basket link + per-head cost + back into a `lunch:placed` event on the lunch-run entity so the next day's + lunch-open can read the most-recent restaurant. prompt: | Finalize today's office lunch run (step 2 in your instructions): diff --git a/examples/personal-finance/models/schema.yaml b/examples/personal-finance/models/schema.yaml index c92ab4e45..bb377691d 100644 --- a/examples/personal-finance/models/schema.yaml +++ b/examples/personal-finance/models/schema.yaml @@ -1162,6 +1162,9 @@ watchers: agent: personal-finance name: Gmail financial-event extractor schedule: '*/30 * * * *' + notification_priority: low + tags: [personal-finance, gmail, ingestion] + min_cooldown_seconds: 300 prompt: | You are a private financial accountant scanning the user's forwarded Gmail messages for events that matter to a UK Self Assessment return. diff --git a/examples/sales/models/reactions/account-health-monitor.reaction.ts b/examples/sales/models/reactions/account-health-monitor.reaction.ts new file mode 100644 index 000000000..007587f9d --- /dev/null +++ b/examples/sales/models/reactions/account-health-monitor.reaction.ts @@ -0,0 +1,44 @@ +/** + * Reaction for the `account-health-monitor` watcher. + * + * When the watcher detects a material risk-level change on a tracked account, + * persist a `health_change` event so the renewal-risk view + weekly digest + * have a stable record without re-extracting from the CRM stream. + */ +import type { ReactionContext } from "@lobu/connector-sdk"; + +interface HealthData { + account_changes?: Array<{ + account: string; + previous_risk: "low" | "medium" | "high"; + current_risk: "low" | "medium" | "high"; + signals: string[]; + }>; +} + +const RISK_ORDER = { low: 0, medium: 1, high: 2 } as const; + +export default async (ctx: ReactionContext, client: any): Promise => { + const data = ctx.extracted_data as HealthData; + const changes = data.account_changes ?? []; + // Only persist *worsening* transitions — improvements are visible in the + // CRM stream and don't need a durable flag. + const escalations = changes.filter( + (c) => RISK_ORDER[c.current_risk] > RISK_ORDER[c.previous_risk] + ); + if (escalations.length === 0) return; + + for (const c of escalations) { + await client.knowledge.save({ + entity_ids: ctx.entities.map((e) => e.id), + content: `Account ${c.account}: risk ${c.previous_risk} → ${c.current_risk}\nSignals: ${c.signals.join("; ")}`, + semantic_type: "health_change", + metadata: { + account: c.account, + from: c.previous_risk, + to: c.current_risk, + window_id: ctx.window.id, + }, + }); + } +}; diff --git a/examples/sales/models/schema.yaml b/examples/sales/models/schema.yaml index a07231c52..a2b287a01 100644 --- a/examples/sales/models/schema.yaml +++ b/examples/sales/models/schema.yaml @@ -134,6 +134,11 @@ watchers: agent: sales name: Account health monitor schedule: 0 */12 * * * + notification_priority: high + notification_channel: both + tags: [sales, health, renewals] + min_cooldown_seconds: 1800 + reaction_script: ./reactions/account-health-monitor.reaction.ts prompt: | Poll CRM data for tracked accounts. Track expansion progress, risk level changes, and renewal timeline. extraction_schema: diff --git a/packages/cli/src/commands/_lib/apply/__tests__/client.test.ts b/packages/cli/src/commands/_lib/apply/__tests__/client.test.ts index 89180acf6..0aff6ed5d 100644 --- a/packages/cli/src/commands/_lib/apply/__tests__/client.test.ts +++ b/packages/cli/src/commands/_lib/apply/__tests__/client.test.ts @@ -40,7 +40,11 @@ describe("ApplyClient", () => { ); const watchers = await client.listWatchers(); - expect(calls[0]?.url).toBe("https://example.test/api/acme/watchers"); + // `include_details=true` so the apply diff can see prompt / + // extraction_schema / reactions_guidance / etc. for drift detection. + expect(calls[0]?.url).toBe( + "https://example.test/api/acme/watchers?include_details=true" + ); expect(calls[0]?.init?.method).toBe("GET"); expect(watchers).toEqual([{ slug: "digest", name: "Digest" }]); }); diff --git a/packages/cli/src/commands/_lib/apply/__tests__/desired-state-extra.test.ts b/packages/cli/src/commands/_lib/apply/__tests__/desired-state-extra.test.ts index a4c4e28a2..432b89ade 100644 --- a/packages/cli/src/commands/_lib/apply/__tests__/desired-state-extra.test.ts +++ b/packages/cli/src/commands/_lib/apply/__tests__/desired-state-extra.test.ts @@ -344,3 +344,243 @@ credentials: expect(state.connectors.authProfiles).toHaveLength(0); }); }); + +// ── Watcher admin-only fields ──────────────────────────────────────────────── + +describe("loadDesiredState — watcher admin-only fields", () => { + test("watcher with reaction_script reads sibling .ts and resolves relative to YAML", async () => { + const dir = mkProjectWithModels({ + "w.yaml": `version: 2 +watchers: + - slug: with-reaction + name: With Reaction + agent: triage + prompt: "Do work." + schedule: "0 9 * * 1" + reaction_script: ./funnel.reaction.ts +`, + "funnel.reaction.ts": "export default async (ctx, client) => {};\n", + }); + const { state } = await loadDesiredState({ cwd: dir }); + expect(state.watchers).toHaveLength(1); + const w = state.watchers[0]!; + expect(w.reactionScript?.sourceCode).toContain("export default async"); + expect(w.reactionScript?.sourcePath).toContain("funnel.reaction.ts"); + }); + + test("watcher with reaction_script pointing at missing file → ValidationError", async () => { + const dir = mkProjectWithModels({ + "w.yaml": `version: 2 +watchers: + - slug: bad-reaction + name: Bad + agent: triage + prompt: "Do work." + schedule: "0 9 * * 1" + reaction_script: ./nonexistent.ts +`, + }); + await expect(loadDesiredState({ cwd: dir })).rejects.toThrow( + /reaction_script.*does not exist/ + ); + }); + + test("inline reaction_script string (not a path) → ValidationError", async () => { + // The hint here is that inline scripts are rejected — they'd skip IDE + // type-checking and the path-vs-source ambiguity is the whole reason we + // require a sibling file. + const dir = mkProjectWithModels({ + // smol-toml + yaml libraries can parse multiline literals fine; we just + // need a string that doesn't look like a path AND fails the file read. + "w.yaml": `version: 2 +watchers: + - slug: inline-reaction + name: Inline + agent: triage + prompt: "Do work." + schedule: "0 9 * * 1" + reaction_script: "export default async () => {};" +`, + }); + // Falls through to the "does not exist" branch since the source string + // is interpreted as a path. + await expect(loadDesiredState({ cwd: dir })).rejects.toThrow( + /reaction_script/ + ); + }); + + test("watcher with all admin-only scalar fields parses correctly", async () => { + const dir = mkProjectWithModels({ + "w.yaml": `version: 2 +watchers: + - slug: full + name: Full watcher + agent: triage + prompt: "Do work." + schedule: "0 9 * * 1" + reactions_guidance: "Be terse." + device_worker_id: 550e8400-e29b-41d4-a716-446655440000 + scheduler_client_id: mcp-client-1 + notification_channel: both + notification_priority: high + min_cooldown_seconds: 60 + tags: [crm, weekly] + agent_kind: notifier +`, + }); + const { state } = await loadDesiredState({ cwd: dir }); + const w = state.watchers[0]!; + expect(w.reactionsGuidance).toBe("Be terse."); + expect(w.deviceWorkerId).toBe("550e8400-e29b-41d4-a716-446655440000"); + expect(w.schedulerClientId).toBe("mcp-client-1"); + expect(w.notificationChannel).toBe("both"); + expect(w.notificationPriority).toBe("high"); + expect(w.minCooldownSeconds).toBe(60); + expect(w.tags).toEqual(["crm", "weekly"]); + expect(w.agentKind).toBe("notifier"); + }); + + test("watcher with non-UUID device_worker_id → ValidationError", async () => { + const dir = mkProjectWithModels({ + "w.yaml": `version: 2 +watchers: + - slug: bad-device + name: Bad device + agent: triage + prompt: "Do work." + schedule: "0 9 * * 1" + device_worker_id: not-a-uuid +`, + }); + await expect(loadDesiredState({ cwd: dir })).rejects.toThrow( + /device_worker_id.*UUID/ + ); + }); + + test("watcher with invalid notification_priority → ValidationError", async () => { + const dir = mkProjectWithModels({ + "w.yaml": `version: 2 +watchers: + - slug: bad-priority + name: Bad priority + agent: triage + prompt: "Do work." + schedule: "0 9 * * 1" + notification_priority: critical +`, + }); + await expect(loadDesiredState({ cwd: dir })).rejects.toThrow( + /notification_priority/ + ); + }); + + test("watcher with too-frequent cron → ValidationError", async () => { + const dir = mkProjectWithModels({ + "w.yaml": `version: 2 +watchers: + - slug: too-frequent + name: Too frequent + agent: triage + prompt: "Do work." + schedule: "*/30 * * * * *" +`, + }); + // The 6-field cron is rejected as an invalid expression at validate time; + // the diagnostic should mention the schedule. + await expect(loadDesiredState({ cwd: dir })).rejects.toThrow(/schedule/i); + }); +}); + +// ── Connection device_worker_id ────────────────────────────────────────────── + +describe("loadDesiredState — connection device_worker_id", () => { + test("connection with device_worker_id UUID parses correctly", async () => { + const dir = mkProject(BASE_TOML); + mkdirSync(join(dir, "connectors"), { recursive: true }); + writeFileSync( + join(dir, "connectors", "c.yaml"), + `version: 1 +type: connection +slug: my-conn +connector: example +device_worker_id: 550e8400-e29b-41d4-a716-446655440000 +` + ); + const { state } = await loadDesiredState({ cwd: dir }); + expect(state.connectors.connections[0]?.deviceWorkerId).toBe( + "550e8400-e29b-41d4-a716-446655440000" + ); + }); + + test("connection with non-UUID device_worker_id → ValidationError", async () => { + const dir = mkProject(BASE_TOML); + mkdirSync(join(dir, "connectors"), { recursive: true }); + writeFileSync( + join(dir, "connectors", "c.yaml"), + `version: 1 +type: connection +slug: my-conn +connector: example +device_worker_id: not-a-uuid +` + ); + await expect(loadDesiredState({ cwd: dir })).rejects.toThrow( + /device_worker_id.*UUID/ + ); + }); +}); + +// ── Reaction script path traversal / size guards ──────────────────────────── + +describe("loadDesiredState — reaction_script path validation", () => { + test("absolute path is rejected", async () => { + const dir = mkProjectWithModels({ + "w.yaml": `version: 2 +watchers: + - slug: bad-reaction + name: Bad + agent: triage + prompt: "Do work." + schedule: "0 9 * * 1" + reaction_script: /etc/passwd +`, + }); + await expect(loadDesiredState({ cwd: dir })).rejects.toThrow( + /relative POSIX path/ + ); + }); + + test(".. segment is rejected", async () => { + const dir = mkProjectWithModels({ + "w.yaml": `version: 2 +watchers: + - slug: bad-reaction + name: Bad + agent: triage + prompt: "Do work." + schedule: "0 9 * * 1" + reaction_script: ../../../etc/passwd +`, + }); + await expect(loadDesiredState({ cwd: dir })).rejects.toThrow( + /must not contain `\.\.`/ + ); + }); + + test("non-.ts extension is rejected", async () => { + const dir = mkProjectWithModels({ + "w.yaml": `version: 2 +watchers: + - slug: bad-reaction + name: Bad + agent: triage + prompt: "Do work." + schedule: "0 9 * * 1" + reaction_script: ./reactions/funnel.js +`, + }); + await expect(loadDesiredState({ cwd: dir })).rejects.toThrow( + /must end in `\.ts`/ + ); + }); +}); diff --git a/packages/cli/src/commands/_lib/apply/__tests__/diff.test.ts b/packages/cli/src/commands/_lib/apply/__tests__/diff.test.ts index 87a34632a..549d50d10 100644 --- a/packages/cli/src/commands/_lib/apply/__tests__/diff.test.ts +++ b/packages/cli/src/commands/_lib/apply/__tests__/diff.test.ts @@ -1,8 +1,8 @@ import { describe, expect, test } from "bun:test"; -import chalk from "chalk"; import type { AgentSettings } from "@lobu/core"; -import { computeDiff, type RemoteSnapshot } from "../diff.js"; +import chalk from "chalk"; import type { DesiredAgent, DesiredState } from "../desired-state.js"; +import { computeDiff, type RemoteSnapshot } from "../diff.js"; import { renderPlan, renderSummary } from "../render.js"; // Force chalk to render plain text in snapshots regardless of TTY detection. @@ -10,892 +10,983 @@ import { renderPlan, renderSummary } from "../render.js"; chalk.level = 0; function buildDesiredAgent( - agentId: string, - overrides: Partial = {} + agentId: string, + overrides: Partial = {}, ): DesiredAgent { - return { - metadata: { agentId, name: agentId, description: undefined }, - settings: {}, - platforms: [], - ...overrides, - }; + return { + metadata: { agentId, name: agentId, description: undefined }, + settings: {}, + platforms: [], + ...overrides, + }; } function buildState( - agents: DesiredAgent[], - overrides: Partial = {} + agents: DesiredAgent[], + overrides: Partial = {}, ): DesiredState { - return { - agents, - memorySchema: { entityTypes: [], relationshipTypes: [] }, - watchers: [], - connectors: { definitions: [], authProfiles: [], connections: [] }, - requiredSecrets: [], - ...overrides, - }; + return { + agents, + memorySchema: { entityTypes: [], relationshipTypes: [] }, + watchers: [], + connectors: { definitions: [], authProfiles: [], connections: [] }, + requiredSecrets: [], + ...overrides, + }; } function emptyRemote(): RemoteSnapshot { - return { - agents: [], - agentSettings: new Map(), - platformsByAgent: new Map(), - entityTypes: [], - relationshipTypes: [], - watchers: [], - connectorDefinitions: [], - authProfiles: [], - connections: [], - feedsByConnectionId: new Map(), - }; + return { + agents: [], + agentSettings: new Map(), + platformsByAgent: new Map(), + entityTypes: [], + relationshipTypes: [], + watchers: [], + connectorDefinitions: [], + authProfiles: [], + connections: [], + feedsByConnectionId: new Map(), + }; } describe("apply diff — agents", () => { - test("create from empty remote", () => { - const desired = buildState([ - buildDesiredAgent("triage", { - metadata: { - agentId: "triage", - name: "Triage", - description: "Triage bot", - }, - }), - ]); - const plan = computeDiff(desired, emptyRemote()); - - expect(plan.counts).toEqual({ create: 2, update: 0, noop: 0, drift: 0 }); - expect(renderPlan(plan)).toMatchSnapshot(); - }); - - test("noop when remote matches desired", () => { - const desired = buildState([ - buildDesiredAgent("triage", { - metadata: { agentId: "triage", name: "Triage" }, - }), - ]); - const remote: RemoteSnapshot = { - ...emptyRemote(), - agents: [{ agentId: "triage", name: "Triage" }], - agentSettings: new Map([["triage", null]]), - platformsByAgent: new Map([["triage", []]]), - }; - const plan = computeDiff(desired, remote); - expect(plan.counts.noop).toBeGreaterThan(0); - expect(plan.counts.create).toBe(0); - expect(plan.counts.update).toBe(0); - expect(renderPlan(plan)).toMatchSnapshot(); - }); - - test("update when name differs", () => { - const desired = buildState([ - buildDesiredAgent("triage", { - metadata: { agentId: "triage", name: "Renamed" }, - }), - ]); - const remote: RemoteSnapshot = { - ...emptyRemote(), - agents: [{ agentId: "triage", name: "Original" }], - agentSettings: new Map([["triage", null]]), - platformsByAgent: new Map([["triage", []]]), - }; - const plan = computeDiff(desired, remote); - expect(plan.counts.update).toBeGreaterThan(0); - expect(renderPlan(plan)).toMatchSnapshot(); - }); - - test("drift when remote has agent not in desired", () => { - const desired = buildState([]); - const remote: RemoteSnapshot = { - ...emptyRemote(), - agents: [{ agentId: "stale", name: "Stale Agent" }], - }; - const plan = computeDiff(desired, remote); - expect(plan.counts.drift).toBe(1); - expect(renderPlan(plan)).toMatchSnapshot(); - }); + test("create from empty remote", () => { + const desired = buildState([ + buildDesiredAgent("triage", { + metadata: { + agentId: "triage", + name: "Triage", + description: "Triage bot", + }, + }), + ]); + const plan = computeDiff(desired, emptyRemote()); + + expect(plan.counts).toEqual({ create: 2, update: 0, noop: 0, drift: 0 }); + expect(renderPlan(plan)).toMatchSnapshot(); + }); + + test("noop when remote matches desired", () => { + const desired = buildState([ + buildDesiredAgent("triage", { + metadata: { agentId: "triage", name: "Triage" }, + }), + ]); + const remote: RemoteSnapshot = { + ...emptyRemote(), + agents: [{ agentId: "triage", name: "Triage" }], + agentSettings: new Map([["triage", null]]), + platformsByAgent: new Map([["triage", []]]), + }; + const plan = computeDiff(desired, remote); + expect(plan.counts.noop).toBeGreaterThan(0); + expect(plan.counts.create).toBe(0); + expect(plan.counts.update).toBe(0); + expect(renderPlan(plan)).toMatchSnapshot(); + }); + + test("update when name differs", () => { + const desired = buildState([ + buildDesiredAgent("triage", { + metadata: { agentId: "triage", name: "Renamed" }, + }), + ]); + const remote: RemoteSnapshot = { + ...emptyRemote(), + agents: [{ agentId: "triage", name: "Original" }], + agentSettings: new Map([["triage", null]]), + platformsByAgent: new Map([["triage", []]]), + }; + const plan = computeDiff(desired, remote); + expect(plan.counts.update).toBeGreaterThan(0); + expect(renderPlan(plan)).toMatchSnapshot(); + }); + + test("drift when remote has agent not in desired", () => { + const desired = buildState([]); + const remote: RemoteSnapshot = { + ...emptyRemote(), + agents: [{ agentId: "stale", name: "Stale Agent" }], + }; + const plan = computeDiff(desired, remote); + expect(plan.counts.drift).toBe(1); + expect(renderPlan(plan)).toMatchSnapshot(); + }); }); describe("apply diff — settings", () => { - test("update on networkConfig change", () => { - const desired = buildState([ - buildDesiredAgent("triage", { - metadata: { agentId: "triage", name: "Triage" }, - settings: { - networkConfig: { allowedDomains: ["github.com"] }, - }, - }), - ]); - const remote: RemoteSnapshot = { - ...emptyRemote(), - agents: [{ agentId: "triage", name: "Triage" }], - agentSettings: new Map([ - [ - "triage", - { - networkConfig: { allowedDomains: ["pypi.org"] }, - updatedAt: 0, - }, - ], - ]), - platformsByAgent: new Map([["triage", []]]), - }; - const plan = computeDiff(desired, remote); - const settingsRow = plan.rows.find((r) => r.kind === "settings"); - expect(settingsRow?.verb).toBe("update"); - if (settingsRow?.kind === "settings") { - expect(settingsRow.changedFields).toContain("networkConfig"); - } - expect(renderPlan(plan)).toMatchSnapshot(); - }); - - test("updates when provider declarations change but ignores installedAt churn", () => { - const desired = buildState([ - buildDesiredAgent("triage", { - metadata: { agentId: "triage", name: "Triage" }, - settings: { - installedProviders: [ - { providerId: "anthropic", installedAt: 200 }, - { providerId: "openai", installedAt: 200 }, - ], - }, - }), - ]); - const remote: RemoteSnapshot = { - ...emptyRemote(), - agents: [{ agentId: "triage", name: "Triage" }], - agentSettings: new Map([ - [ - "triage", - { - installedProviders: [{ providerId: "anthropic", installedAt: 100 }], - updatedAt: 0, - }, - ], - ]), - platformsByAgent: new Map([["triage", []]]), - }; - const plan = computeDiff(desired, remote); - const settingsRow = plan.rows.find((r) => r.kind === "settings"); - expect(settingsRow?.verb).toBe("update"); - if (settingsRow?.kind === "settings") { - expect(settingsRow.changedFields).toContain("installedProviders"); - } - - const unchanged = computeDiff(desired, { - ...remote, - agentSettings: new Map([ - [ - "triage", - { - installedProviders: [ - { providerId: "anthropic", installedAt: 1 }, - { providerId: "openai", installedAt: 2 }, - ], - updatedAt: 0, - }, - ], - ]), - }); - const unchangedSettingsRow = unchanged.rows.find( - (r) => r.kind === "settings" - ); - expect(unchangedSettingsRow?.verb).toBe("noop"); - }); + test("update on networkConfig change", () => { + const desired = buildState([ + buildDesiredAgent("triage", { + metadata: { agentId: "triage", name: "Triage" }, + settings: { + networkConfig: { allowedDomains: ["github.com"] }, + }, + }), + ]); + const remote: RemoteSnapshot = { + ...emptyRemote(), + agents: [{ agentId: "triage", name: "Triage" }], + agentSettings: new Map([ + [ + "triage", + { + networkConfig: { allowedDomains: ["pypi.org"] }, + updatedAt: 0, + }, + ], + ]), + platformsByAgent: new Map([["triage", []]]), + }; + const plan = computeDiff(desired, remote); + const settingsRow = plan.rows.find((r) => r.kind === "settings"); + expect(settingsRow?.verb).toBe("update"); + if (settingsRow?.kind === "settings") { + expect(settingsRow.changedFields).toContain("networkConfig"); + } + expect(renderPlan(plan)).toMatchSnapshot(); + }); + + test("updates when provider declarations change but ignores installedAt churn", () => { + const desired = buildState([ + buildDesiredAgent("triage", { + metadata: { agentId: "triage", name: "Triage" }, + settings: { + installedProviders: [ + { providerId: "anthropic", installedAt: 200 }, + { providerId: "openai", installedAt: 200 }, + ], + }, + }), + ]); + const remote: RemoteSnapshot = { + ...emptyRemote(), + agents: [{ agentId: "triage", name: "Triage" }], + agentSettings: new Map([ + [ + "triage", + { + installedProviders: [{ providerId: "anthropic", installedAt: 100 }], + updatedAt: 0, + }, + ], + ]), + platformsByAgent: new Map([["triage", []]]), + }; + const plan = computeDiff(desired, remote); + const settingsRow = plan.rows.find((r) => r.kind === "settings"); + expect(settingsRow?.verb).toBe("update"); + if (settingsRow?.kind === "settings") { + expect(settingsRow.changedFields).toContain("installedProviders"); + } + + const unchanged = computeDiff(desired, { + ...remote, + agentSettings: new Map([ + [ + "triage", + { + installedProviders: [ + { providerId: "anthropic", installedAt: 1 }, + { providerId: "openai", installedAt: 2 }, + ], + updatedAt: 0, + }, + ], + ]), + }); + const unchangedSettingsRow = unchanged.rows.find( + (r) => r.kind === "settings", + ); + expect(unchangedSettingsRow?.verb).toBe("noop"); + }); }); describe("apply diff — platforms", () => { - test("create on empty remote", () => { - const desired = buildState([ - buildDesiredAgent("triage", { - metadata: { agentId: "triage", name: "Triage" }, - platforms: [ - { - stableId: "triage-telegram", - type: "telegram", - config: { botToken: "abc" }, - }, - ], - }), - ]); - const plan = computeDiff(desired, emptyRemote()); - const platformRow = plan.rows.find((r) => r.kind === "platform"); - expect(platformRow?.verb).toBe("create"); - expect(renderPlan(plan)).toMatchSnapshot(); - }); - - test("update with willRestart when config changes", () => { - const desired = buildState([ - buildDesiredAgent("triage", { - metadata: { agentId: "triage", name: "Triage" }, - platforms: [ - { - stableId: "triage-telegram", - type: "telegram", - config: { botToken: "new" }, - }, - ], - }), - ]); - const remote: RemoteSnapshot = { - ...emptyRemote(), - agents: [{ agentId: "triage", name: "Triage" }], - agentSettings: new Map([["triage", null]]), - platformsByAgent: new Map([ - [ - "triage", - [ - { - id: "triage-telegram", - platform: "telegram", - config: { botToken: "old" }, - }, - ], - ], - ]), - }; - const plan = computeDiff(desired, remote); - const platformRow = plan.rows.find((r) => r.kind === "platform"); - expect(platformRow?.verb).toBe("update"); - if (platformRow?.kind === "platform") { - expect(platformRow.willRestart).toBe(true); - } - expect(renderPlan(plan)).toMatchSnapshot(); - }); + test("create on empty remote", () => { + const desired = buildState([ + buildDesiredAgent("triage", { + metadata: { agentId: "triage", name: "Triage" }, + platforms: [ + { + stableId: "triage-telegram", + type: "telegram", + config: { botToken: "abc" }, + }, + ], + }), + ]); + const plan = computeDiff(desired, emptyRemote()); + const platformRow = plan.rows.find((r) => r.kind === "platform"); + expect(platformRow?.verb).toBe("create"); + expect(renderPlan(plan)).toMatchSnapshot(); + }); + + test("update with willRestart when config changes", () => { + const desired = buildState([ + buildDesiredAgent("triage", { + metadata: { agentId: "triage", name: "Triage" }, + platforms: [ + { + stableId: "triage-telegram", + type: "telegram", + config: { botToken: "new" }, + }, + ], + }), + ]); + const remote: RemoteSnapshot = { + ...emptyRemote(), + agents: [{ agentId: "triage", name: "Triage" }], + agentSettings: new Map([["triage", null]]), + platformsByAgent: new Map([ + [ + "triage", + [ + { + id: "triage-telegram", + platform: "telegram", + config: { botToken: "old" }, + }, + ], + ], + ]), + }; + const plan = computeDiff(desired, remote); + const platformRow = plan.rows.find((r) => r.kind === "platform"); + expect(platformRow?.verb).toBe("update"); + if (platformRow?.kind === "platform") { + expect(platformRow.willRestart).toBe(true); + } + expect(renderPlan(plan)).toMatchSnapshot(); + }); }); describe("apply diff — memory schema", () => { - test("creates entity + relationship types", () => { - const desired: DesiredState = { - agents: [], - memorySchema: { - entityTypes: [{ slug: "company", name: "Company", required: ["name"] }], - relationshipTypes: [ - { - slug: "works_at", - name: "Works At", - rules: [{ source: "person", target: "company" }], - }, - ], - }, - watchers: [], - requiredSecrets: [], - }; - const plan = computeDiff(desired, emptyRemote()); - expect(plan.counts.create).toBe(2); - expect(renderPlan(plan)).toMatchSnapshot(); - }); - - test("noop when remote matches", () => { - const desired: DesiredState = { - agents: [], - memorySchema: { - entityTypes: [{ slug: "company", name: "Company" }], - relationshipTypes: [], - }, - watchers: [], - requiredSecrets: [], - }; - const remote: RemoteSnapshot = { - ...emptyRemote(), - entityTypes: [{ slug: "company", name: "Company" }], - }; - const plan = computeDiff(desired, remote); - expect(plan.counts.noop).toBe(1); - expect(plan.counts.update).toBe(0); - }); + test("creates entity + relationship types", () => { + const desired: DesiredState = { + agents: [], + memorySchema: { + entityTypes: [{ slug: "company", name: "Company", required: ["name"] }], + relationshipTypes: [ + { + slug: "works_at", + name: "Works At", + rules: [{ source: "person", target: "company" }], + }, + ], + }, + watchers: [], + requiredSecrets: [], + }; + const plan = computeDiff(desired, emptyRemote()); + expect(plan.counts.create).toBe(2); + expect(renderPlan(plan)).toMatchSnapshot(); + }); + + test("noop when remote matches", () => { + const desired: DesiredState = { + agents: [], + memorySchema: { + entityTypes: [{ slug: "company", name: "Company" }], + relationshipTypes: [], + }, + watchers: [], + requiredSecrets: [], + }; + const remote: RemoteSnapshot = { + ...emptyRemote(), + entityTypes: [{ slug: "company", name: "Company" }], + }; + const plan = computeDiff(desired, remote); + expect(plan.counts.noop).toBe(1); + expect(plan.counts.update).toBe(0); + }); }); describe("apply diff — empty container preservation", () => { - // Bug fix: previously canonical() collapsed [] and {} to null, which - // meant clearing a remote allowlist by setting it to [] silently - // round-tripped as a noop instead of an update. - test("clearing networkConfig.allowedDomains from non-empty to [] is an update", () => { - const desired = buildState([ - buildDesiredAgent("triage", { - metadata: { agentId: "triage", name: "Triage" }, - settings: { - networkConfig: { allowedDomains: [] }, - }, - }), - ]); - const remote: RemoteSnapshot = { - ...emptyRemote(), - agents: [{ agentId: "triage", name: "Triage" }], - agentSettings: new Map([ - [ - "triage", - { - networkConfig: { allowedDomains: ["foo.com"] }, - updatedAt: 0, - }, - ], - ]), - platformsByAgent: new Map([["triage", []]]), - }; - const plan = computeDiff(desired, remote); - const settingsRow = plan.rows.find((r) => r.kind === "settings"); - expect(settingsRow?.verb).toBe("update"); - if (settingsRow?.kind === "settings") { - expect(settingsRow.changedFields).toContain("networkConfig"); - } - }); - - test("[] is not equal to null (preserved as distinct values)", () => { - // When desired sets allowedDomains: [] and remote has the field - // missing entirely, the diff should still treat them as equivalent - // for the case where remote literally doesn't have the field — but - // [] vs the explicit array ["foo"] must differ. - const desiredEmpty = buildState([ - buildDesiredAgent("triage", { - metadata: { agentId: "triage", name: "Triage" }, - settings: { - networkConfig: { allowedDomains: [] }, - }, - }), - ]); - const remoteWithItems: RemoteSnapshot = { - ...emptyRemote(), - agents: [{ agentId: "triage", name: "Triage" }], - agentSettings: new Map([ - [ - "triage", - { - networkConfig: { allowedDomains: ["x.com"] }, - updatedAt: 0, - }, - ], - ]), - platformsByAgent: new Map([["triage", []]]), - }; - const plan = computeDiff(desiredEmpty, remoteWithItems); - expect(plan.counts.update).toBeGreaterThan(0); - }); - - test("{} is not equal to populated object", () => { - // empty config object vs populated config object must show as drift/update - const desired = buildState([ - buildDesiredAgent("triage", { - metadata: { agentId: "triage", name: "Triage" }, - platforms: [ - { - stableId: "triage-telegram", - type: "telegram", - config: {}, - }, - ], - }), - ]); - const remote: RemoteSnapshot = { - ...emptyRemote(), - agents: [{ agentId: "triage", name: "Triage" }], - agentSettings: new Map([["triage", null]]), - platformsByAgent: new Map([ - [ - "triage", - [ - { - id: "triage-telegram", - platform: "telegram", - config: { botToken: "abc" }, - }, - ], - ], - ]), - }; - const plan = computeDiff(desired, remote); - const platformRow = plan.rows.find((r) => r.kind === "platform"); - expect(platformRow?.verb).toBe("update"); - }); + // Bug fix: previously canonical() collapsed [] and {} to null, which + // meant clearing a remote allowlist by setting it to [] silently + // round-tripped as a noop instead of an update. + test("clearing networkConfig.allowedDomains from non-empty to [] is an update", () => { + const desired = buildState([ + buildDesiredAgent("triage", { + metadata: { agentId: "triage", name: "Triage" }, + settings: { + networkConfig: { allowedDomains: [] }, + }, + }), + ]); + const remote: RemoteSnapshot = { + ...emptyRemote(), + agents: [{ agentId: "triage", name: "Triage" }], + agentSettings: new Map([ + [ + "triage", + { + networkConfig: { allowedDomains: ["foo.com"] }, + updatedAt: 0, + }, + ], + ]), + platformsByAgent: new Map([["triage", []]]), + }; + const plan = computeDiff(desired, remote); + const settingsRow = plan.rows.find((r) => r.kind === "settings"); + expect(settingsRow?.verb).toBe("update"); + if (settingsRow?.kind === "settings") { + expect(settingsRow.changedFields).toContain("networkConfig"); + } + }); + + test("[] is not equal to null (preserved as distinct values)", () => { + // When desired sets allowedDomains: [] and remote has the field + // missing entirely, the diff should still treat them as equivalent + // for the case where remote literally doesn't have the field — but + // [] vs the explicit array ["foo"] must differ. + const desiredEmpty = buildState([ + buildDesiredAgent("triage", { + metadata: { agentId: "triage", name: "Triage" }, + settings: { + networkConfig: { allowedDomains: [] }, + }, + }), + ]); + const remoteWithItems: RemoteSnapshot = { + ...emptyRemote(), + agents: [{ agentId: "triage", name: "Triage" }], + agentSettings: new Map([ + [ + "triage", + { + networkConfig: { allowedDomains: ["x.com"] }, + updatedAt: 0, + }, + ], + ]), + platformsByAgent: new Map([["triage", []]]), + }; + const plan = computeDiff(desiredEmpty, remoteWithItems); + expect(plan.counts.update).toBeGreaterThan(0); + }); + + test("{} is not equal to populated object", () => { + // empty config object vs populated config object must show as drift/update + const desired = buildState([ + buildDesiredAgent("triage", { + metadata: { agentId: "triage", name: "Triage" }, + platforms: [ + { + stableId: "triage-telegram", + type: "telegram", + config: {}, + }, + ], + }), + ]); + const remote: RemoteSnapshot = { + ...emptyRemote(), + agents: [{ agentId: "triage", name: "Triage" }], + agentSettings: new Map([["triage", null]]), + platformsByAgent: new Map([ + [ + "triage", + [ + { + id: "triage-telegram", + platform: "telegram", + config: { botToken: "abc" }, + }, + ], + ], + ]), + }; + const plan = computeDiff(desired, remote); + const platformRow = plan.rows.find((r) => r.kind === "platform"); + expect(platformRow?.verb).toBe("update"); + }); }); describe("apply diff — watchers", () => { - const desiredWatcher = { - slug: "weekly-digest", - name: "Weekly digest", - prompt: "Produce a digest.", - extractionSchema: { type: "object" as const }, - schedule: "0 9 * * 1", - }; - - test("create when watcher missing remotely", () => { - const desired = buildState([], { watchers: [desiredWatcher] }); - const plan = computeDiff(desired, emptyRemote()); - const row = plan.rows.find((r) => r.kind === "watcher"); - expect(row?.verb).toBe("create"); - expect(row?.id).toBe("weekly-digest"); - }); - - test("noop when watcher already exists remotely", () => { - const desired = buildState([], { watchers: [desiredWatcher] }); - const remote: RemoteSnapshot = { - ...emptyRemote(), - watchers: [{ slug: "weekly-digest", name: "Weekly digest" }], - }; - const plan = computeDiff(desired, remote); - const row = plan.rows.find((r) => r.kind === "watcher"); - expect(row?.verb).toBe("noop"); - expect(plan.counts.create).toBe(0); - }); - - test("drift when remote watcher not declared in models", () => { - const desired = buildState([], { watchers: [] }); - const remote: RemoteSnapshot = { - ...emptyRemote(), - watchers: [{ slug: "orphan-watcher" }], - }; - const plan = computeDiff(desired, remote); - const row = plan.rows.find((r) => r.kind === "watcher"); - expect(row?.verb).toBe("drift"); - expect(plan.counts.drift).toBe(1); - }); + const desiredWatcher = { + slug: "weekly-digest", + agent: "triage", + name: "Weekly digest", + prompt: "Produce a digest.", + extractionSchema: { type: "object" as const }, + schedule: "0 9 * * 1", + }; + + test("create when watcher missing remotely", () => { + const desired = buildState([], { watchers: [desiredWatcher] }); + const plan = computeDiff(desired, emptyRemote()); + const row = plan.rows.find((r) => r.kind === "watcher"); + expect(row?.verb).toBe("create"); + expect(row?.id).toBe("weekly-digest"); + }); + + test("noop when remote matches every field the diff covers", () => { + const desired = buildState([], { watchers: [desiredWatcher] }); + const remote: RemoteSnapshot = { + ...emptyRemote(), + watchers: [ + { + slug: "weekly-digest", + name: "Weekly digest", + agent_id: "triage", + prompt: "Produce a digest.", + extraction_schema: { type: "object" }, + schedule: "0 9 * * 1", + }, + ], + }; + const plan = computeDiff(desired, remote); + const row = plan.rows.find((r) => r.kind === "watcher"); + expect(row?.verb).toBe("noop"); + expect(plan.counts.create).toBe(0); + }); + + test("update with scalar drift when schedule changes remotely", () => { + const desired = buildState([], { watchers: [desiredWatcher] }); + const remote: RemoteSnapshot = { + ...emptyRemote(), + watchers: [ + { + slug: "weekly-digest", + name: "Weekly digest", + agent_id: "triage", + prompt: "Produce a digest.", + extraction_schema: { type: "object" }, + schedule: "0 10 * * 1", + }, + ], + }; + const plan = computeDiff(desired, remote); + const row = plan.rows.find((r) => r.kind === "watcher"); + expect(row?.verb).toBe("update"); + expect(row?.changedFields).toContain("schedule"); + expect( + (row as { versionBoundFields?: string[] }).versionBoundFields, + ).toBeUndefined(); + }); + + test("update with version-bound drift when prompt changes remotely", () => { + const desired = buildState([], { watchers: [desiredWatcher] }); + const remote: RemoteSnapshot = { + ...emptyRemote(), + watchers: [ + { + slug: "weekly-digest", + name: "Weekly digest", + agent_id: "triage", + prompt: "Old prompt", + extraction_schema: { type: "object" }, + schedule: "0 9 * * 1", + }, + ], + }; + const plan = computeDiff(desired, remote); + const row = plan.rows.find((r) => r.kind === "watcher"); + expect(row?.verb).toBe("update"); + expect( + (row as { versionBoundFields?: string[] }).versionBoundFields, + ).toEqual(["prompt"]); + }); + + test("reaction_script declared → always re-pushed (idempotent)", () => { + const desired = buildState([], { + watchers: [ + { + ...desiredWatcher, + reactionScript: { + sourcePath: "/abs/path/r.ts", + sourceCode: "export default async () => {};", + }, + }, + ], + }); + const remote: RemoteSnapshot = { + ...emptyRemote(), + watchers: [ + { + slug: "weekly-digest", + name: "Weekly digest", + agent_id: "triage", + prompt: "Produce a digest.", + extraction_schema: { type: "object" }, + schedule: "0 9 * * 1", + }, + ], + }; + const plan = computeDiff(desired, remote); + const row = plan.rows.find((r) => r.kind === "watcher"); + expect(row?.verb).toBe("update"); + expect(row?.changedFields).toEqual(["reaction_script"]); + expect( + (row as { reactionScriptDeclared?: boolean }).reactionScriptDeclared, + ).toBe(true); + }); + + test("drift when remote watcher not declared in models", () => { + const desired = buildState([], { watchers: [] }); + const remote: RemoteSnapshot = { + ...emptyRemote(), + watchers: [{ slug: "orphan-watcher" }], + }; + const plan = computeDiff(desired, remote); + const row = plan.rows.find((r) => r.kind === "watcher"); + expect(row?.verb).toBe("drift"); + expect(plan.counts.drift).toBe(1); + }); }); describe("renderSummary", () => { - test("renders zero-row plan", () => { - const desired = buildState([]); - const plan = computeDiff(desired, emptyRemote()); - expect(renderSummary(plan)).toMatchSnapshot(); - }); + test("renders zero-row plan", () => { + const desired = buildState([]); + const plan = computeDiff(desired, emptyRemote()); + expect(renderSummary(plan)).toMatchSnapshot(); + }); }); describe("apply diff — connectors", () => { - const builtinConnectorDef = { - key: "hackernews", - name: "Hacker News", - installed: false, - installable: true, - }; - - function connectorState() { - return buildState([], { - connectors: { - definitions: [ - { - key: "acme", - sourcePath: "/proj/connectors/acme.connector.ts", - sourceCode: "export default class {}", - sourceFile: "connectors/acme.connector.ts", - }, - ], - authProfiles: [ - { - slug: "hn-token", - connector: "hackernews", - kind: "env" as const, - name: "HN token", - credentials: { HN_TOKEN: "$HN_TOKEN" }, - sourceFile: "connectors/hackernews.yaml", - }, - { - slug: "x-account", - connector: "x", - kind: "oauth_account" as const, - sourceFile: "connectors/x.yaml", - }, - ], - connections: [ - { - slug: "hn-frontpage", - connector: "hackernews", - name: "HN front page", - authProfileSlug: "hn-token", - feeds: [{ feedKey: "stories", schedule: "0 * * * *" }], - sourceFile: "connectors/hackernews.yaml", - }, - ], - }, - }); - } - - test("create verbs for new connector def, auth profile, connection, feed", () => { - const plan = computeDiff(connectorState(), { - ...emptyRemote(), - connectorDefinitions: [builtinConnectorDef], - }); - const def = plan.rows.find((r) => r.kind === "connector-definition"); - expect(def?.verb).toBe("create"); - const authEnv = plan.rows.find( - (r) => r.kind === "auth-profile" && r.id === "hn-token" - ); - expect(authEnv?.verb).toBe("create"); - const authOauth = plan.rows.find( - (r) => r.kind === "auth-profile" && r.id === "x-account" - ); - expect(authOauth?.verb).toBe("create"); - expect( - authOauth && "needsAuth" in authOauth ? authOauth.needsAuth : undefined - ).toBe(true); - const conn = plan.rows.find((r) => r.kind === "connection"); - expect(conn?.verb).toBe("create"); - const feed = plan.rows.find((r) => r.kind === "feed"); - expect(feed?.verb).toBe("create"); - expect(feed?.id).toBe("hn-frontpage/stories"); - }); - - test("noop when connection + feed already match remotely", () => { - const remote: RemoteSnapshot = { - ...emptyRemote(), - connectorDefinitions: [builtinConnectorDef], - authProfiles: [ - { - slug: "hn-token", - display_name: "HN token", - connector_key: "hackernews", - profile_kind: "env", - status: "active", - }, - { - slug: "x-account", - connector_key: "x", - profile_kind: "oauth_account", - status: "active", - }, - ], - connections: [ - { - id: 7, - slug: "hn-frontpage", - connector_key: "hackernews", - display_name: "HN front page", - status: "active", - auth_profile_slug: "hn-token", - app_auth_profile_slug: null, - config: {}, - }, - ], - feedsByConnectionId: new Map([ - [ - 7, - [ - { - id: 11, - connection_id: 7, - feed_key: "stories", - status: "active", - schedule: "0 * * * *", - config: {}, - }, - ], - ], - ]), - }; - const plan = computeDiff(connectorState(), remote); - expect(plan.rows.find((r) => r.kind === "connection")?.verb).toBe("noop"); - expect(plan.rows.find((r) => r.kind === "feed")?.verb).toBe("noop"); - expect( - plan.rows.find((r) => r.kind === "auth-profile" && r.id === "x-account") - ?.verb - ).toBe("noop"); - }); - - test("update when feed schedule changes; needs-auth when oauth profile inactive", () => { - const remote: RemoteSnapshot = { - ...emptyRemote(), - connectorDefinitions: [builtinConnectorDef], - authProfiles: [ - { - slug: "hn-token", - display_name: "HN token", - connector_key: "hackernews", - profile_kind: "env", - status: "active", - }, - { - slug: "x-account", - connector_key: "x", - profile_kind: "oauth_account", - status: "pending_auth", - }, - ], - connections: [ - { - id: 7, - slug: "hn-frontpage", - connector_key: "hackernews", - display_name: "HN front page", - status: "active", - auth_profile_slug: "hn-token", - app_auth_profile_slug: null, - config: {}, - }, - ], - feedsByConnectionId: new Map([ - [ - 7, - [ - { - id: 11, - connection_id: 7, - feed_key: "stories", - status: "active", - schedule: "0 0 * * *", - config: {}, - }, - ], - ], - ]), - }; - const plan = computeDiff(connectorState(), remote); - const feed = plan.rows.find((r) => r.kind === "feed"); - expect(feed?.verb).toBe("update"); - expect(feed && "changedFields" in feed ? feed.changedFields : []).toEqual([ - "schedule", - ]); - const authOauth = plan.rows.find( - (r) => r.kind === "auth-profile" && r.id === "x-account" - ); - expect( - authOauth && "needsAuth" in authOauth ? authOauth.needsAuth : undefined - ).toBe(true); - }); - - test("undeclared remote connector becomes an informational note (no uninstall)", () => { - const remote: RemoteSnapshot = { - ...emptyRemote(), - connectorDefinitions: [ - builtinConnectorDef, - { - key: "legacy", - name: "Legacy", - installed: true, - installable: false, - }, - ], - }; - const plan = computeDiff(connectorState(), remote); - expect(plan.notes.some((n) => n.includes('"legacy"'))).toBe(true); - expect( - plan.rows.some( - (r) => r.kind === "connector-definition" && r.id === "legacy" - ) - ).toBe(false); - }); - - test("connectors are skipped when --only is set", () => { - const plan = computeDiff(connectorState(), emptyRemote(), { - only: "agents", - }); - expect(plan.rows.some((r) => r.kind === "connection")).toBe(false); - expect(plan.rows.some((r) => r.kind === "connector-definition")).toBe( - false - ); - }); - - test("render includes the connectors sections", () => { - const plan = computeDiff(connectorState(), { - ...emptyRemote(), - connectorDefinitions: [builtinConnectorDef], - }); - expect(renderPlan(plan)).toMatchSnapshot(); - }); - - // ── round-2 ────────────────────────────────────────────────────────────── - - test("connection slug bound to a different connector remotely is a hard error", () => { - expect(() => - computeDiff(connectorState(), { - ...emptyRemote(), - connectorDefinitions: [builtinConnectorDef], - connections: [ - { - id: 9, - slug: "hn-frontpage", - connector_key: "rss", - status: "active", - auth_profile_slug: null, - app_auth_profile_slug: null, - config: {}, - }, - ], - }) - ).toThrow(/bound to connector "rss" remotely.*declares "hackernews"/); - }); - - test("auth-profile slug bound to a different kind remotely is a hard error", () => { - expect(() => - computeDiff(connectorState(), { - ...emptyRemote(), - connectorDefinitions: [builtinConnectorDef], - authProfiles: [ - { - slug: "hn-token", - connector_key: "hackernews", - profile_kind: "oauth_app", - status: "active", - }, - ], - }) - ).toThrow(/auth_profile "hn-token" is bound to hackernews\/oauth_app/); - }); - - test("credential rotation re-pushes: env profile shows update (credentials)", () => { - const plan = computeDiff(connectorState(), { - ...emptyRemote(), - connectorDefinitions: [builtinConnectorDef], - authProfiles: [ - { - slug: "hn-token", - display_name: "HN token", - connector_key: "hackernews", - profile_kind: "env", - status: "active", - }, - ], - }); - const row = plan.rows.find( - (r) => r.kind === "auth-profile" && r.id === "hn-token" - ); - expect(row?.verb).toBe("update"); - expect(row && "changedFields" in row ? row.changedFields : []).toContain( - "credentials" - ); - }); - - test("a fully-converged remote state produces no connector create/update (except idempotent connector-def re-push)", () => { - // Build a remote snapshot that exactly mirrors connectorState(): the env - // auth profile has no declared-credential drift suppression, so it would - // re-push (update credentials). The acme connector def is installed, so it - // shows as a (no-op-on-server) "update". Everything else is noop. - const remote: RemoteSnapshot = { - ...emptyRemote(), - connectorDefinitions: [ - { key: "hackernews", installed: false, installable: true }, - { key: "x", installed: false, installable: true }, - { key: "acme", installed: true, installable: false }, - ], - authProfiles: [ - { - slug: "hn-token", - display_name: "HN token", - connector_key: "hackernews", - profile_kind: "env", - status: "active", - }, - { - slug: "x-account", - connector_key: "x", - profile_kind: "oauth_account", - status: "active", - }, - ], - connections: [ - { - id: 7, - slug: "hn-frontpage", - connector_key: "hackernews", - display_name: "HN front page", - status: "active", - auth_profile_slug: "hn-token", - app_auth_profile_slug: null, - config: {}, - }, - ], - feedsByConnectionId: new Map([ - [ - 7, - [ - { - id: 11, - connection_id: 7, - feed_key: "stories", - status: "active", - schedule: "0 * * * *", - config: {}, - }, - ], - ], - ]), - }; - const plan = computeDiff(connectorState(), remote); - // Only "update" rows allowed: the connector-def re-push and the - // env-credential re-push — both idempotent on the server. - const nonIdempotentChurn = plan.rows.filter( - (r) => - (r.verb === "create" || r.verb === "update") && - !(r.kind === "connector-definition") && - !(r.kind === "auth-profile" && r.id === "hn-token") - ); - expect(nonIdempotentChurn).toEqual([]); - expect(plan.notes).toEqual([]); - }); - - test("connector-definition with an already-installed key renders as update, not create", () => { - const installedAcme = { key: "acme", installed: true, installable: false }; - const plan = computeDiff(connectorState(), { - ...emptyRemote(), - connectorDefinitions: [builtinConnectorDef, installedAcme], - }); - // connectorState()'s acme def has key:"acme"; it is installed remotely. - const row = plan.rows.find( - (r) => r.kind === "connector-definition" && r.id?.startsWith("acme") - ); - expect(row?.verb).toBe("update"); - }); - - // ── round-4 ────────────────────────────────────────────────────────────── - - test("referenced-but-not-installed bundled connector becomes a connector-definition create row", () => { - const plan = computeDiff(connectorState(), { - ...emptyRemote(), - connectorDefinitions: [ - // hackernews: installable + has a server-side source_uri, not installed - { - key: "hackernews", - installed: false, - installable: true, - source_uri: "file:///app/connectors/hackernews.ts", - }, - // x: same - { - key: "x", - installed: false, - installable: true, - source_uri: "file:///app/connectors/x.ts", - }, - ], - }); - const hn = plan.rows.find( - (r) => r.kind === "connector-definition" && r.id === "hackernews" - ); - expect(hn?.verb).toBe("create"); - const x = plan.rows.find( - (r) => r.kind === "connector-definition" && r.id === "x" - ); - expect(x?.verb).toBe("create"); - // acme is locally declared (sourcePath) — it still gets its own row. - expect( - plan.rows.some( - (r) => r.kind === "connector-definition" && r.id?.startsWith("acme") - ) - ).toBe(true); - }); - - test("a locally-supplied connector key is NOT also a bundled-install row (no double mutation)", () => { - // Pretend "acme" is *also* in the bundled catalog with a source_uri; the - // local .connector.ts should win — no bundled row for "acme". - const state = connectorState(); - // Make a connection reference "acme" so it's in referencedConnectorKeys. - state.connectors.connections.push({ - slug: "acme-conn", - connector: "acme", - feeds: [], - sourceFile: "connectors/acme.yaml", - }); - const plan = computeDiff(state, { - ...emptyRemote(), - connectorDefinitions: [ - { - key: "acme", - installed: false, - installable: true, - source_uri: "file:///app/connectors/acme.ts", - }, - ], - }); - const acmeRows = plan.rows.filter( - (r) => r.kind === "connector-definition" && r.id?.startsWith("acme") - ); - // Exactly one row — the locally-declared def — never a bundled duplicate. - expect(acmeRows).toHaveLength(1); - }); + const builtinConnectorDef = { + key: "hackernews", + name: "Hacker News", + installed: false, + installable: true, + }; + + function connectorState() { + return buildState([], { + connectors: { + definitions: [ + { + key: "acme", + sourcePath: "/proj/connectors/acme.connector.ts", + sourceCode: "export default class {}", + sourceFile: "connectors/acme.connector.ts", + }, + ], + authProfiles: [ + { + slug: "hn-token", + connector: "hackernews", + kind: "env" as const, + name: "HN token", + credentials: { HN_TOKEN: "$HN_TOKEN" }, + sourceFile: "connectors/hackernews.yaml", + }, + { + slug: "x-account", + connector: "x", + kind: "oauth_account" as const, + sourceFile: "connectors/x.yaml", + }, + ], + connections: [ + { + slug: "hn-frontpage", + connector: "hackernews", + name: "HN front page", + authProfileSlug: "hn-token", + feeds: [{ feedKey: "stories", schedule: "0 * * * *" }], + sourceFile: "connectors/hackernews.yaml", + }, + ], + }, + }); + } + + test("create verbs for new connector def, auth profile, connection, feed", () => { + const plan = computeDiff(connectorState(), { + ...emptyRemote(), + connectorDefinitions: [builtinConnectorDef], + }); + const def = plan.rows.find((r) => r.kind === "connector-definition"); + expect(def?.verb).toBe("create"); + const authEnv = plan.rows.find( + (r) => r.kind === "auth-profile" && r.id === "hn-token", + ); + expect(authEnv?.verb).toBe("create"); + const authOauth = plan.rows.find( + (r) => r.kind === "auth-profile" && r.id === "x-account", + ); + expect(authOauth?.verb).toBe("create"); + expect( + authOauth && "needsAuth" in authOauth ? authOauth.needsAuth : undefined, + ).toBe(true); + const conn = plan.rows.find((r) => r.kind === "connection"); + expect(conn?.verb).toBe("create"); + const feed = plan.rows.find((r) => r.kind === "feed"); + expect(feed?.verb).toBe("create"); + expect(feed?.id).toBe("hn-frontpage/stories"); + }); + + test("noop when connection + feed already match remotely", () => { + const remote: RemoteSnapshot = { + ...emptyRemote(), + connectorDefinitions: [builtinConnectorDef], + authProfiles: [ + { + slug: "hn-token", + display_name: "HN token", + connector_key: "hackernews", + profile_kind: "env", + status: "active", + }, + { + slug: "x-account", + connector_key: "x", + profile_kind: "oauth_account", + status: "active", + }, + ], + connections: [ + { + id: 7, + slug: "hn-frontpage", + connector_key: "hackernews", + display_name: "HN front page", + status: "active", + auth_profile_slug: "hn-token", + app_auth_profile_slug: null, + config: {}, + }, + ], + feedsByConnectionId: new Map([ + [ + 7, + [ + { + id: 11, + connection_id: 7, + feed_key: "stories", + status: "active", + schedule: "0 * * * *", + config: {}, + }, + ], + ], + ]), + }; + const plan = computeDiff(connectorState(), remote); + expect(plan.rows.find((r) => r.kind === "connection")?.verb).toBe("noop"); + expect(plan.rows.find((r) => r.kind === "feed")?.verb).toBe("noop"); + expect( + plan.rows.find((r) => r.kind === "auth-profile" && r.id === "x-account") + ?.verb, + ).toBe("noop"); + }); + + test("update when feed schedule changes; needs-auth when oauth profile inactive", () => { + const remote: RemoteSnapshot = { + ...emptyRemote(), + connectorDefinitions: [builtinConnectorDef], + authProfiles: [ + { + slug: "hn-token", + display_name: "HN token", + connector_key: "hackernews", + profile_kind: "env", + status: "active", + }, + { + slug: "x-account", + connector_key: "x", + profile_kind: "oauth_account", + status: "pending_auth", + }, + ], + connections: [ + { + id: 7, + slug: "hn-frontpage", + connector_key: "hackernews", + display_name: "HN front page", + status: "active", + auth_profile_slug: "hn-token", + app_auth_profile_slug: null, + config: {}, + }, + ], + feedsByConnectionId: new Map([ + [ + 7, + [ + { + id: 11, + connection_id: 7, + feed_key: "stories", + status: "active", + schedule: "0 0 * * *", + config: {}, + }, + ], + ], + ]), + }; + const plan = computeDiff(connectorState(), remote); + const feed = plan.rows.find((r) => r.kind === "feed"); + expect(feed?.verb).toBe("update"); + expect(feed && "changedFields" in feed ? feed.changedFields : []).toEqual([ + "schedule", + ]); + const authOauth = plan.rows.find( + (r) => r.kind === "auth-profile" && r.id === "x-account", + ); + expect( + authOauth && "needsAuth" in authOauth ? authOauth.needsAuth : undefined, + ).toBe(true); + }); + + test("undeclared remote connector becomes an informational note (no uninstall)", () => { + const remote: RemoteSnapshot = { + ...emptyRemote(), + connectorDefinitions: [ + builtinConnectorDef, + { + key: "legacy", + name: "Legacy", + installed: true, + installable: false, + }, + ], + }; + const plan = computeDiff(connectorState(), remote); + expect(plan.notes.some((n) => n.includes('"legacy"'))).toBe(true); + expect( + plan.rows.some( + (r) => r.kind === "connector-definition" && r.id === "legacy", + ), + ).toBe(false); + }); + + test("connectors are skipped when --only is set", () => { + const plan = computeDiff(connectorState(), emptyRemote(), { + only: "agents", + }); + expect(plan.rows.some((r) => r.kind === "connection")).toBe(false); + expect(plan.rows.some((r) => r.kind === "connector-definition")).toBe( + false, + ); + }); + + test("render includes the connectors sections", () => { + const plan = computeDiff(connectorState(), { + ...emptyRemote(), + connectorDefinitions: [builtinConnectorDef], + }); + expect(renderPlan(plan)).toMatchSnapshot(); + }); + + // ── round-2 ────────────────────────────────────────────────────────────── + + test("connection slug bound to a different connector remotely is a hard error", () => { + expect(() => + computeDiff(connectorState(), { + ...emptyRemote(), + connectorDefinitions: [builtinConnectorDef], + connections: [ + { + id: 9, + slug: "hn-frontpage", + connector_key: "rss", + status: "active", + auth_profile_slug: null, + app_auth_profile_slug: null, + config: {}, + }, + ], + }), + ).toThrow(/bound to connector "rss" remotely.*declares "hackernews"/); + }); + + test("auth-profile slug bound to a different kind remotely is a hard error", () => { + expect(() => + computeDiff(connectorState(), { + ...emptyRemote(), + connectorDefinitions: [builtinConnectorDef], + authProfiles: [ + { + slug: "hn-token", + connector_key: "hackernews", + profile_kind: "oauth_app", + status: "active", + }, + ], + }), + ).toThrow(/auth_profile "hn-token" is bound to hackernews\/oauth_app/); + }); + + test("credential rotation re-pushes: env profile shows update (credentials)", () => { + const plan = computeDiff(connectorState(), { + ...emptyRemote(), + connectorDefinitions: [builtinConnectorDef], + authProfiles: [ + { + slug: "hn-token", + display_name: "HN token", + connector_key: "hackernews", + profile_kind: "env", + status: "active", + }, + ], + }); + const row = plan.rows.find( + (r) => r.kind === "auth-profile" && r.id === "hn-token", + ); + expect(row?.verb).toBe("update"); + expect(row && "changedFields" in row ? row.changedFields : []).toContain( + "credentials", + ); + }); + + test("a fully-converged remote state produces no connector create/update (except idempotent connector-def re-push)", () => { + // Build a remote snapshot that exactly mirrors connectorState(): the env + // auth profile has no declared-credential drift suppression, so it would + // re-push (update credentials). The acme connector def is installed, so it + // shows as a (no-op-on-server) "update". Everything else is noop. + const remote: RemoteSnapshot = { + ...emptyRemote(), + connectorDefinitions: [ + { key: "hackernews", installed: false, installable: true }, + { key: "x", installed: false, installable: true }, + { key: "acme", installed: true, installable: false }, + ], + authProfiles: [ + { + slug: "hn-token", + display_name: "HN token", + connector_key: "hackernews", + profile_kind: "env", + status: "active", + }, + { + slug: "x-account", + connector_key: "x", + profile_kind: "oauth_account", + status: "active", + }, + ], + connections: [ + { + id: 7, + slug: "hn-frontpage", + connector_key: "hackernews", + display_name: "HN front page", + status: "active", + auth_profile_slug: "hn-token", + app_auth_profile_slug: null, + config: {}, + }, + ], + feedsByConnectionId: new Map([ + [ + 7, + [ + { + id: 11, + connection_id: 7, + feed_key: "stories", + status: "active", + schedule: "0 * * * *", + config: {}, + }, + ], + ], + ]), + }; + const plan = computeDiff(connectorState(), remote); + // Only "update" rows allowed: the connector-def re-push and the + // env-credential re-push — both idempotent on the server. + const nonIdempotentChurn = plan.rows.filter( + (r) => + (r.verb === "create" || r.verb === "update") && + !(r.kind === "connector-definition") && + !(r.kind === "auth-profile" && r.id === "hn-token"), + ); + expect(nonIdempotentChurn).toEqual([]); + expect(plan.notes).toEqual([]); + }); + + test("connector-definition with an already-installed key renders as update, not create", () => { + const installedAcme = { key: "acme", installed: true, installable: false }; + const plan = computeDiff(connectorState(), { + ...emptyRemote(), + connectorDefinitions: [builtinConnectorDef, installedAcme], + }); + // connectorState()'s acme def has key:"acme"; it is installed remotely. + const row = plan.rows.find( + (r) => r.kind === "connector-definition" && r.id?.startsWith("acme"), + ); + expect(row?.verb).toBe("update"); + }); + + // ── round-4 ────────────────────────────────────────────────────────────── + + test("referenced-but-not-installed bundled connector becomes a connector-definition create row", () => { + const plan = computeDiff(connectorState(), { + ...emptyRemote(), + connectorDefinitions: [ + // hackernews: installable + has a server-side source_uri, not installed + { + key: "hackernews", + installed: false, + installable: true, + source_uri: "file:///app/connectors/hackernews.ts", + }, + // x: same + { + key: "x", + installed: false, + installable: true, + source_uri: "file:///app/connectors/x.ts", + }, + ], + }); + const hn = plan.rows.find( + (r) => r.kind === "connector-definition" && r.id === "hackernews", + ); + expect(hn?.verb).toBe("create"); + const x = plan.rows.find( + (r) => r.kind === "connector-definition" && r.id === "x", + ); + expect(x?.verb).toBe("create"); + // acme is locally declared (sourcePath) — it still gets its own row. + expect( + plan.rows.some( + (r) => r.kind === "connector-definition" && r.id?.startsWith("acme"), + ), + ).toBe(true); + }); + + test("a locally-supplied connector key is NOT also a bundled-install row (no double mutation)", () => { + // Pretend "acme" is *also* in the bundled catalog with a source_uri; the + // local .connector.ts should win — no bundled row for "acme". + const state = connectorState(); + // Make a connection reference "acme" so it's in referencedConnectorKeys. + state.connectors.connections.push({ + slug: "acme-conn", + connector: "acme", + feeds: [], + sourceFile: "connectors/acme.yaml", + }); + const plan = computeDiff(state, { + ...emptyRemote(), + connectorDefinitions: [ + { + key: "acme", + installed: false, + installable: true, + source_uri: "file:///app/connectors/acme.ts", + }, + ], + }); + const acmeRows = plan.rows.filter( + (r) => r.kind === "connector-definition" && r.id?.startsWith("acme"), + ); + // Exactly one row — the locally-declared def — never a bundled duplicate. + expect(acmeRows).toHaveLength(1); + }); }); diff --git a/packages/cli/src/commands/_lib/apply/apply-cmd.ts b/packages/cli/src/commands/_lib/apply/apply-cmd.ts index 3bd950b6a..4b4e6ce9f 100644 --- a/packages/cli/src/commands/_lib/apply/apply-cmd.ts +++ b/packages/cli/src/commands/_lib/apply/apply-cmd.ts @@ -670,22 +670,141 @@ async function executePlan( printText(renderProgress(row.verb, "relationship-type", row.id)); } - // 6) Watchers (create-only; drift ignored) + // 6) Watchers — create (full payload + reaction script) or update (scalar + // row fields via `update`, version-bound fields via `create_version`, + // reaction script via `set_reaction_script`). Drift detection lives in + // `diffWatcher`; this loop just routes to the right admin action. + const remoteWatcherBySlug = new Map( + ctx.remote.watchers.map((w) => [w.slug, w]) + ); for (const row of rowsByKind("watcher")) { if (row.kind !== "watcher") continue; if (!row.desired) continue; const w = row.desired; - await ctx.client.createWatcher({ - slug: w.slug, - agentId: w.agent, - name: w.name, - description: w.description, - prompt: w.prompt, - extraction_schema: w.extractionSchema, - schedule: w.schedule, - sources: w.sources, - }); - printText(renderProgress(row.verb, "watcher", row.id)); + let watcherId: string | undefined; + if (row.verb === "create") { + const created = await ctx.client.createWatcher({ + slug: w.slug, + agentId: w.agent, + name: w.name, + description: w.description, + prompt: w.prompt, + extraction_schema: w.extractionSchema, + schedule: w.schedule, + sources: w.sources, + reactions_guidance: w.reactionsGuidance, + device_worker_id: w.deviceWorkerId, + scheduler_client_id: w.schedulerClientId, + notification_channel: w.notificationChannel, + notification_priority: w.notificationPriority, + min_cooldown_seconds: w.minCooldownSeconds, + tags: w.tags, + agent_kind: w.agentKind, + json_template: w.jsonTemplate, + keying_config: w.keyingConfig, + classifiers: w.classifiers, + condensation_prompt: w.condensationPrompt, + condensation_window_count: w.condensationWindowCount, + }); + watcherId = created.watcher_id; + } else if (row.verb === "update") { + const remote = remoteWatcherBySlug.get(w.slug); + watcherId = remote?.watcher_id; + if (!watcherId) { + throw new ApiError( + `update watcher "${w.slug}" failed: remote row is missing watcher_id (refetch may be stale)` + ); + } + const versionBound = new Set(row.versionBoundFields ?? []); + const changed = new Set(row.changedFields ?? []); + const scalarChanges = [...changed].filter( + (f) => !versionBound.has(f) && f !== "reaction_script" + ); + // a) Scalar fields → manage_watchers update + if (scalarChanges.length > 0) { + await ctx.client.updateWatcher({ + watcher_id: watcherId, + ...(scalarChanges.includes("schedule") + ? { schedule: w.schedule ?? null } + : {}), + ...(scalarChanges.includes("agent_id") ? { agent_id: w.agent } : {}), + ...(scalarChanges.includes("device_worker_id") + ? { device_worker_id: w.deviceWorkerId ?? null } + : {}), + ...(scalarChanges.includes("scheduler_client_id") + ? { scheduler_client_id: w.schedulerClientId ?? null } + : {}), + ...(scalarChanges.includes("notification_channel") && + w.notificationChannel + ? { notification_channel: w.notificationChannel } + : {}), + ...(scalarChanges.includes("notification_priority") && + w.notificationPriority + ? { notification_priority: w.notificationPriority } + : {}), + ...(scalarChanges.includes("min_cooldown_seconds") && + w.minCooldownSeconds !== undefined + ? { min_cooldown_seconds: w.minCooldownSeconds } + : {}), + ...(scalarChanges.includes("tags") && w.tags ? { tags: w.tags } : {}), + ...(scalarChanges.includes("agent_kind") + ? { agent_kind: w.agentKind ?? null } + : {}), + }); + } + // b) Version-bound fields → manage_watchers create_version (server + // inherits unset fields from the previous version row, but we always + // send the desired-side values for the changed keys). + if (row.versionBoundFields && row.versionBoundFields.length > 0) { + await ctx.client.createWatcherVersion({ + watcher_id: watcherId, + ...(versionBound.has("prompt") ? { prompt: w.prompt } : {}), + ...(versionBound.has("extraction_schema") + ? { extraction_schema: w.extractionSchema } + : {}), + ...(versionBound.has("sources") && w.sources !== undefined + ? { sources: w.sources } + : {}), + ...(versionBound.has("reactions_guidance") && + w.reactionsGuidance !== undefined + ? { reactions_guidance: w.reactionsGuidance } + : {}), + ...(versionBound.has("json_template") && w.jsonTemplate !== undefined + ? { json_template: w.jsonTemplate } + : {}), + ...(versionBound.has("keying_config") && w.keyingConfig !== undefined + ? { keying_config: w.keyingConfig } + : {}), + ...(versionBound.has("classifiers") && w.classifiers !== undefined + ? { classifiers: w.classifiers } + : {}), + ...(versionBound.has("condensation_prompt") && + w.condensationPrompt !== undefined + ? { condensation_prompt: w.condensationPrompt } + : {}), + ...(versionBound.has("condensation_window_count") && + w.condensationWindowCount !== undefined + ? { condensation_window_count: w.condensationWindowCount } + : {}), + }); + } + } + // c) Reaction script — push when declared (idempotent server-side, no + // drift signal available because it's not returned by list_watchers). + if (w.reactionScript && watcherId) { + await ctx.client.setReactionScript( + watcherId, + w.reactionScript.sourceCode + ); + } + printText( + renderProgress( + row.verb, + "watcher", + row.id, + row.changedFields ? `(${row.changedFields.join(", ")})` : undefined + ) + ); } // Auth profiles (create / update; interactive kinds → punch-list) @@ -743,6 +862,9 @@ async function executePlan( authProfileSlug: desired.authProfileSlug ?? null, appAuthProfileSlug: desired.appAuthProfileSlug ?? null, config: desired.config ?? {}, + // Always pass — server treats undefined as "leave alone", null as + // "unpin to server", and a uuid as "move to that device". + deviceWorkerId: desired.deviceWorkerId ?? null, }); connectionIdBySlug.set(desired.slug, updated.id); } else { @@ -753,6 +875,9 @@ async function executePlan( authProfileSlug: desired.authProfileSlug, appAuthProfileSlug: desired.appAuthProfileSlug, config: desired.config, + ...(desired.deviceWorkerId + ? { deviceWorkerId: desired.deviceWorkerId } + : {}), }); connectionIdBySlug.set(desired.slug, created.id); } diff --git a/packages/cli/src/commands/_lib/apply/client.ts b/packages/cli/src/commands/_lib/apply/client.ts index c73126984..76e801f5c 100644 --- a/packages/cli/src/commands/_lib/apply/client.ts +++ b/packages/cli/src/commands/_lib/apply/client.ts @@ -47,6 +47,28 @@ export interface RemoteWatcher { slug: string; name?: string; watcher_id?: string; + agent_id?: string | null; + schedule?: string | null; + device_worker_id?: string | null; + goal_id?: number | null; + scheduler_client_id?: string | null; + agent_kind?: string | null; + notification_channel?: string | null; + notification_priority?: string | null; + min_cooldown_seconds?: number | null; + tags?: string[] | null; + sources?: Array<{ name: string; query: string }> | null; + // include_details=true → version-bound fields + description?: string | null; + prompt?: string | null; + extraction_schema?: Record | null; + classifiers?: unknown[] | null; + json_template?: unknown; + keying_config?: Record | null; + condensation_prompt?: string | null; + condensation_window_count?: number | null; + reactions_guidance?: string | null; + // NB: reaction_script is NOT in list_watchers — push always (idempotent). } export interface UpsertPlatformResult { @@ -99,6 +121,7 @@ export interface RemoteConnection { auth_profile_slug?: string | null; app_auth_profile_slug?: string | null; config?: Record | null; + device_worker_id?: string | null; } export interface RemoteFeed { @@ -528,10 +551,40 @@ export class ApplyClient { // ── Watchers ────────────────────────────────────────────────────────────── + /** + * Fetch a single watcher's full payload — `getWatcher` server-side, which + * returns reaction_script (not in the list response). Used by `lobu export` + * to round-trip reaction scripts back to sibling `.ts` files. + */ + async getWatcherDetail(watcherId: string): Promise<{ + reaction_script?: string | null; + description?: string | null; + } | null> { + try { + const { body } = await this.request<{ + watcher?: { + reaction_script?: string | null; + description?: string | null; + }; + }>( + "GET", + `/api/${this.orgSlug}/watchers?watcher_id=${encodeURIComponent(watcherId)}` + ); + return body.watcher ?? null; + } catch (err) { + if (err instanceof ApiError && err.status === 404) return null; + throw err; + } + } + async listWatchers(): Promise { + // `include_details=true` pulls the version-bound fields (prompt, + // extraction_schema, classifiers, json_template, keying_config, + // condensation_*, reactions_guidance) too. Apply diffs against these to + // detect drift on the prompt / schema / sources / etc. const { body } = await this.request<{ watchers?: RemoteWatcher[] }>( "GET", - `/api/${this.orgSlug}/watchers` + `/api/${this.orgSlug}/watchers?include_details=true` ); return body.watchers ?? []; } @@ -551,17 +604,197 @@ export class ApplyClient { extraction_schema: Record; schedule?: string; sources?: Array<{ name: string; query: string }>; + reactions_guidance?: string; + device_worker_id?: string; + scheduler_client_id?: string; + notification_channel?: "canvas" | "notification" | "both"; + notification_priority?: "low" | "normal" | "high"; + min_cooldown_seconds?: number; + tags?: string[]; + agent_kind?: string; + json_template?: unknown; + keying_config?: Record; + classifiers?: unknown[]; + condensation_prompt?: string; + condensation_window_count?: number; + }): Promise<{ watcher_id?: string }> { + const { body } = await this.request<{ watcher_id?: string }>( + "POST", + `/api/${this.orgSlug}/manage_watchers`, + { + action: "create", + slug: payload.slug, + agent_id: payload.agentId, + ...(payload.name ? { name: payload.name } : {}), + ...(payload.description ? { description: payload.description } : {}), + prompt: payload.prompt, + extraction_schema: payload.extraction_schema, + ...(payload.schedule ? { schedule: payload.schedule } : {}), + ...(payload.sources?.length ? { sources: payload.sources } : {}), + ...(payload.reactions_guidance !== undefined + ? { reactions_guidance: payload.reactions_guidance } + : {}), + ...(payload.device_worker_id !== undefined + ? { device_worker_id: payload.device_worker_id } + : {}), + ...(payload.scheduler_client_id !== undefined + ? { scheduler_client_id: payload.scheduler_client_id } + : {}), + ...(payload.notification_channel !== undefined + ? { notification_channel: payload.notification_channel } + : {}), + ...(payload.notification_priority !== undefined + ? { notification_priority: payload.notification_priority } + : {}), + ...(payload.min_cooldown_seconds !== undefined + ? { min_cooldown_seconds: payload.min_cooldown_seconds } + : {}), + ...(payload.tags?.length ? { tags: payload.tags } : {}), + ...(payload.agent_kind !== undefined + ? { agent_kind: payload.agent_kind } + : {}), + ...(payload.json_template !== undefined + ? { json_template: payload.json_template } + : {}), + ...(payload.keying_config !== undefined + ? { keying_config: payload.keying_config } + : {}), + ...(payload.classifiers !== undefined + ? { classifiers: payload.classifiers } + : {}), + ...(payload.condensation_prompt !== undefined + ? { condensation_prompt: payload.condensation_prompt } + : {}), + ...(payload.condensation_window_count !== undefined + ? { condensation_window_count: payload.condensation_window_count } + : {}), + } + ); + return { ...(body.watcher_id ? { watcher_id: body.watcher_id } : {}) }; + } + + /** + * Update the **scalar** fields on the `watchers` row — these don't require + * a new version. Version-bound fields (prompt / extraction_schema / sources + * / reactions_guidance / json_template / keying_config / classifiers / + * condensation_*) require `createWatcherVersion` instead. + * + * `null` clears nullable fields (device_worker_id, scheduler_client_id, + * goal_id, agent_kind) per the server contract. + */ + async updateWatcher(payload: { + watcher_id: string; + schedule?: string | null; + agent_id?: string; + device_worker_id?: string | null; + scheduler_client_id?: string | null; + notification_channel?: "canvas" | "notification" | "both"; + notification_priority?: "low" | "normal" | "high"; + min_cooldown_seconds?: number; + tags?: string[]; + agent_kind?: string | null; + goal_id?: number | null; + model_config?: Record; }): Promise { await this.request("POST", `/api/${this.orgSlug}/manage_watchers`, { - action: "create", - slug: payload.slug, - agent_id: payload.agentId, - ...(payload.name ? { name: payload.name } : {}), - ...(payload.description ? { description: payload.description } : {}), - prompt: payload.prompt, - extraction_schema: payload.extraction_schema, - ...(payload.schedule ? { schedule: payload.schedule } : {}), - ...(payload.sources?.length ? { sources: payload.sources } : {}), + action: "update", + watcher_id: payload.watcher_id, + ...(payload.schedule !== undefined ? { schedule: payload.schedule } : {}), + ...(payload.agent_id !== undefined ? { agent_id: payload.agent_id } : {}), + ...(payload.device_worker_id !== undefined + ? { device_worker_id: payload.device_worker_id } + : {}), + ...(payload.scheduler_client_id !== undefined + ? { scheduler_client_id: payload.scheduler_client_id } + : {}), + ...(payload.notification_channel !== undefined + ? { notification_channel: payload.notification_channel } + : {}), + ...(payload.notification_priority !== undefined + ? { notification_priority: payload.notification_priority } + : {}), + ...(payload.min_cooldown_seconds !== undefined + ? { min_cooldown_seconds: payload.min_cooldown_seconds } + : {}), + ...(payload.tags !== undefined ? { tags: payload.tags } : {}), + ...(payload.agent_kind !== undefined + ? { agent_kind: payload.agent_kind } + : {}), + ...(payload.goal_id !== undefined ? { goal_id: payload.goal_id } : {}), + ...(payload.model_config !== undefined + ? { model_config: payload.model_config } + : {}), + }); + } + + /** + * Create a new watcher_versions row carrying the version-bound fields, then + * upgrade the watcher's `current_version_id` to that new version. Server + * inherits unset fields from the previous version row. + */ + async createWatcherVersion(payload: { + watcher_id: string; + prompt?: string; + extraction_schema?: Record; + sources?: Array<{ name: string; query: string }>; + json_template?: unknown; + keying_config?: Record; + classifiers?: unknown[]; + reactions_guidance?: string; + condensation_prompt?: string; + condensation_window_count?: number; + change_notes?: string; + }): Promise<{ version?: number }> { + const { body } = await this.request<{ version?: number }>( + "POST", + `/api/${this.orgSlug}/manage_watchers`, + { + action: "create_version", + watcher_id: payload.watcher_id, + set_as_current: true, + ...(payload.prompt !== undefined ? { prompt: payload.prompt } : {}), + ...(payload.extraction_schema !== undefined + ? { extraction_schema: payload.extraction_schema } + : {}), + ...(payload.sources !== undefined ? { sources: payload.sources } : {}), + ...(payload.json_template !== undefined + ? { json_template: payload.json_template } + : {}), + ...(payload.keying_config !== undefined + ? { keying_config: payload.keying_config } + : {}), + ...(payload.classifiers !== undefined + ? { classifiers: payload.classifiers } + : {}), + ...(payload.reactions_guidance !== undefined + ? { reactions_guidance: payload.reactions_guidance } + : {}), + ...(payload.condensation_prompt !== undefined + ? { condensation_prompt: payload.condensation_prompt } + : {}), + ...(payload.condensation_window_count !== undefined + ? { condensation_window_count: payload.condensation_window_count } + : {}), + ...(payload.change_notes + ? { change_notes: payload.change_notes } + : { change_notes: "lobu apply" }), + } + ); + return body.version !== undefined ? { version: body.version } : {}; + } + + /** + * Attach (or clear) a reaction script. Pass an empty string to remove it — + * matches the admin tool contract. + */ + async setReactionScript( + watcherId: string, + reactionScript: string + ): Promise { + await this.request("POST", `/api/${this.orgSlug}/manage_watchers`, { + action: "set_reaction_script", + watcher_id: watcherId, + reaction_script: reactionScript, }); } @@ -756,6 +989,7 @@ export class ApplyClient { authProfileSlug?: string; appAuthProfileSlug?: string; config?: Record; + deviceWorkerId?: string; }): Promise { const body = await this.connectionsTool<{ connection?: RemoteConnection }>({ action: "create", @@ -769,6 +1003,9 @@ export class ApplyClient { ? { app_auth_profile_slug: payload.appAuthProfileSlug } : {}), ...(payload.config ? { config: payload.config } : {}), + ...(payload.deviceWorkerId + ? { device_worker_id: payload.deviceWorkerId } + : {}), }); if (!body.connection) { throw new ApiError( @@ -785,6 +1022,7 @@ export class ApplyClient { authProfileSlug?: string | null; appAuthProfileSlug?: string | null; config?: Record; + deviceWorkerId?: string | null; } ): Promise { const body = await this.connectionsTool<{ connection?: RemoteConnection }>({ @@ -802,6 +1040,9 @@ export class ApplyClient { ...(payload.config !== undefined ? { config: payload.config, replace_config: true } : {}), + ...(payload.deviceWorkerId !== undefined + ? { device_worker_id: payload.deviceWorkerId } + : {}), }); if (!body.connection) { throw new ApiError( diff --git a/packages/cli/src/commands/_lib/apply/desired-state.ts b/packages/cli/src/commands/_lib/apply/desired-state.ts index 0d0baed0a..9de42b06f 100644 --- a/packages/cli/src/commands/_lib/apply/desired-state.ts +++ b/packages/cli/src/commands/_lib/apply/desired-state.ts @@ -117,6 +117,40 @@ export interface DesiredWatcher { extractionSchema: Record; /** Optional SQL data sources; server applies a default when omitted. */ sources?: Array<{ name: string; query: string }>; + /** + * Reaction script — TypeScript source compiled + executed in an isolate at + * watcher-firing time. Authored as a sibling `.ts` file (`reaction_script: + * ./funnel-digest.ts` in the YAML), the CLI reads it and pushes raw source + * via `set_reaction_script`. Inline strings are rejected so the IDE can + * type-check the script. + */ + reactionScript?: { sourcePath: string; sourceCode: string }; + /** LLM guidance for the watcher's downstream reaction agent. */ + reactionsGuidance?: string; + /** UUID of a device worker to pin this watcher's runs to (see `device_workers.id`). */ + deviceWorkerId?: string; + /** MCP client id that should auto-run this watcher. */ + schedulerClientId?: string; + /** Where firings surface — defaults to canvas server-side. */ + notificationChannel?: "canvas" | "notification" | "both"; + /** Priority class used by the dispatcher interrupt budget. */ + notificationPriority?: "low" | "normal" | "high"; + /** Minimum seconds between two firings of this watcher (0 = no cooldown). */ + minCooldownSeconds?: number; + /** Free-form tags for filtering. */ + tags?: string[]; + /** Optional agent-kind override (e.g. "background", "notifier"). */ + agentKind?: string; + /** Optional JSON template for renderer. */ + jsonTemplate?: unknown; + /** Stable key generation across windows. */ + keyingConfig?: Record; + /** Classifier definitions for extraction (server-side feature). */ + classifiers?: unknown[]; + /** Handlebars prompt for condensing windows into a rollup. */ + condensationPrompt?: string; + /** How many leaf windows to condense into one rollup (default 4 server-side). */ + condensationWindowCount?: number; } export interface DesiredFeed { @@ -138,6 +172,12 @@ export interface DesiredConnection { /** Slug of the OAuth-app auth profile (`app_auth:` in the manifest). */ appAuthProfileSlug?: string; config?: Record; + /** + * Optional UUID pinning the connection's syncs/actions to a specific device + * worker (`device_workers.id`). Required for connectors that declare a + * `required_capability`; omit it for serverless-on-Lobu runs. + */ + deviceWorkerId?: string; feeds: DesiredFeed[]; /** Relative path of the YAML file the doc came from (for error messages). */ sourceFile: string; @@ -753,7 +793,12 @@ function parseEntityType(raw: unknown): DesiredEntityType { return out; } -function parseWatcher(raw: unknown): DesiredWatcher { +const NOTIFICATION_CHANNELS = new Set(["canvas", "notification", "both"]); +const NOTIFICATION_PRIORITIES = new Set(["low", "normal", "high"]); +const UUID_PATTERN = + /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i; + +function parseWatcher(raw: unknown, modelFileAbsPath: string): DesiredWatcher { if (!isRecord(raw) || typeof raw.slug !== "string") { throw new ValidationError( `watcher model files must be objects with a "slug" string field; got ${JSON.stringify(raw)}` @@ -780,7 +825,13 @@ function parseWatcher(raw: unknown): DesiredWatcher { }; if (typeof raw.name === "string") out.name = raw.name; if (typeof raw.description === "string") out.description = raw.description; - if (typeof raw.schedule === "string") out.schedule = raw.schedule; + if (typeof raw.schedule === "string") { + const err = cronError(raw.schedule); + if (err) { + throw new ValidationError(`watcher "${raw.slug}" ${err}`); + } + out.schedule = raw.schedule; + } if (Array.isArray(raw.sources)) { out.sources = raw.sources .filter(isRecord) @@ -790,6 +841,202 @@ function parseWatcher(raw: unknown): DesiredWatcher { ) .map((s) => ({ name: s.name, query: s.query })); } + + // Reaction script — sibling `.ts` file, resolved relative to the YAML. + // Path constraints: must be a relative POSIX-style path that stays under + // the YAML's directory tree (no leading `/`, no `..` segments), must end + // in `.ts`, and the file must be ≤ 256 KiB. The server compiles and + // executes the source in an isolate, so the trust boundary is the + // operator's file system — this validation prevents a hostile YAML + // (e.g. a PR that touches an unrelated model file) from sucking in a + // sensitive file like `/etc/passwd` or `../../.ssh/id_rsa`. + if (raw.reaction_script !== undefined) { + if ( + typeof raw.reaction_script !== "string" || + !raw.reaction_script.trim() + ) { + throw new ValidationError( + `watcher "${raw.slug}" \`reaction_script\` must be a path to a sibling .ts file (e.g. \`reaction_script: ./funnel-digest.ts\`). Inline scripts are not supported — keep the TypeScript in its own file so your IDE can type-check it.` + ); + } + const rel = raw.reaction_script.trim(); + if (rel.startsWith("/") || rel.includes("\\")) { + throw new ValidationError( + `watcher "${raw.slug}" \`reaction_script\` must be a relative POSIX path (./foo.ts) — absolute paths and backslashes are not allowed` + ); + } + if (rel.split("/").some((seg) => seg === "..")) { + throw new ValidationError( + `watcher "${raw.slug}" \`reaction_script\` must not contain \`..\` segments — keep the script under the model file's directory tree` + ); + } + if (!rel.endsWith(".ts")) { + throw new ValidationError( + `watcher "${raw.slug}" \`reaction_script\` must end in \`.ts\` (got ${JSON.stringify(rel)})` + ); + } + const baseDir = resolve(modelFileAbsPath, ".."); + const abs = resolve(baseDir, rel); + // Belt-and-braces — symlinks or unusual relative-path forms shouldn't + // escape the baseDir even if the above checks let one through. + if (!abs.startsWith(`${baseDir}/`) && abs !== baseDir) { + throw new ValidationError( + `watcher "${raw.slug}" \`reaction_script\` resolves outside the model directory (${abs})` + ); + } + let sourceCode: string; + try { + sourceCode = readFileSync(abs, "utf-8"); + } catch { + throw new ValidationError( + `watcher "${raw.slug}" \`reaction_script\` ${rel} does not exist (resolved to ${abs})` + ); + } + const REACTION_SCRIPT_MAX_BYTES = 256 * 1024; + if (Buffer.byteLength(sourceCode, "utf8") > REACTION_SCRIPT_MAX_BYTES) { + throw new ValidationError( + `watcher "${raw.slug}" \`reaction_script\` exceeds the ${REACTION_SCRIPT_MAX_BYTES}-byte cap — reaction scripts should be a few hundred lines, not a vendored library` + ); + } + out.reactionScript = { sourcePath: abs, sourceCode }; + } + + if (raw.reactions_guidance !== undefined) { + if (typeof raw.reactions_guidance !== "string") { + throw new ValidationError( + `watcher "${raw.slug}" \`reactions_guidance\` must be a string` + ); + } + out.reactionsGuidance = raw.reactions_guidance; + } + + if (raw.device_worker_id !== undefined) { + if ( + typeof raw.device_worker_id !== "string" || + !UUID_PATTERN.test(raw.device_worker_id.trim()) + ) { + throw new ValidationError( + `watcher "${raw.slug}" \`device_worker_id\` must be a UUID string (the device_workers.id of the device this watcher should run on)` + ); + } + out.deviceWorkerId = raw.device_worker_id.trim(); + } + + if (raw.scheduler_client_id !== undefined) { + if ( + typeof raw.scheduler_client_id !== "string" || + !raw.scheduler_client_id.trim() + ) { + throw new ValidationError( + `watcher "${raw.slug}" \`scheduler_client_id\` must be a non-empty string` + ); + } + out.schedulerClientId = raw.scheduler_client_id.trim(); + } + + if (raw.notification_channel !== undefined) { + if ( + typeof raw.notification_channel !== "string" || + !NOTIFICATION_CHANNELS.has(raw.notification_channel) + ) { + throw new ValidationError( + `watcher "${raw.slug}" \`notification_channel\` must be one of: canvas, notification, both` + ); + } + out.notificationChannel = + raw.notification_channel as DesiredWatcher["notificationChannel"]; + } + + if (raw.notification_priority !== undefined) { + if ( + typeof raw.notification_priority !== "string" || + !NOTIFICATION_PRIORITIES.has(raw.notification_priority) + ) { + throw new ValidationError( + `watcher "${raw.slug}" \`notification_priority\` must be one of: low, normal, high` + ); + } + out.notificationPriority = + raw.notification_priority as DesiredWatcher["notificationPriority"]; + } + + if (raw.min_cooldown_seconds !== undefined) { + if ( + typeof raw.min_cooldown_seconds !== "number" || + !Number.isFinite(raw.min_cooldown_seconds) || + raw.min_cooldown_seconds < 0 + ) { + throw new ValidationError( + `watcher "${raw.slug}" \`min_cooldown_seconds\` must be a non-negative number` + ); + } + out.minCooldownSeconds = raw.min_cooldown_seconds; + } + + if (raw.tags !== undefined) { + if ( + !Array.isArray(raw.tags) || + !raw.tags.every((t) => typeof t === "string") + ) { + throw new ValidationError( + `watcher "${raw.slug}" \`tags\` must be an array of strings` + ); + } + out.tags = raw.tags as string[]; + } + + if (raw.agent_kind !== undefined) { + if (typeof raw.agent_kind !== "string" || !raw.agent_kind.trim()) { + throw new ValidationError( + `watcher "${raw.slug}" \`agent_kind\` must be a non-empty string` + ); + } + out.agentKind = raw.agent_kind.trim(); + } + + if (raw.json_template !== undefined) { + out.jsonTemplate = raw.json_template; + } + + if (raw.keying_config !== undefined) { + if (!isRecord(raw.keying_config)) { + throw new ValidationError( + `watcher "${raw.slug}" \`keying_config\` must be an object` + ); + } + out.keyingConfig = raw.keying_config; + } + + if (raw.classifiers !== undefined) { + if (!Array.isArray(raw.classifiers)) { + throw new ValidationError( + `watcher "${raw.slug}" \`classifiers\` must be an array` + ); + } + out.classifiers = raw.classifiers; + } + + if (raw.condensation_prompt !== undefined) { + if (typeof raw.condensation_prompt !== "string") { + throw new ValidationError( + `watcher "${raw.slug}" \`condensation_prompt\` must be a string` + ); + } + out.condensationPrompt = raw.condensation_prompt; + } + + if (raw.condensation_window_count !== undefined) { + if ( + typeof raw.condensation_window_count !== "number" || + raw.condensation_window_count < 2 + ) { + throw new ValidationError( + `watcher "${raw.slug}" \`condensation_window_count\` must be a number ≥ 2` + ); + } + out.condensationWindowCount = raw.condensation_window_count; + } + return out; } @@ -890,7 +1137,13 @@ async function loadMemoryModels( } else if (model.modelType === "relationship") { relationshipTypes.push(parseRelationshipType(model.data)); } else if (model.modelType === "watcher") { - watchers.push(parseWatcher(model.data)); + // `model.file` is like `schema.yaml:watchers[0]` (optionally with + // `#docIdx` for multi-doc streams) — strip the synthetic suffix + // before resolving on disk, then map through `modelsPath` to the + // absolute YAML path. `parseWatcher` reads `reaction_script:` + // sibling .ts files relative to that. + const yamlRel = model.file.replace(/[:#].*$/, ""); + watchers.push(parseWatcher(model.data, join(modelsPath, yamlRel))); } } } @@ -986,6 +1239,17 @@ function parseConnectionDoc( if (auth) out.authProfileSlug = auth; const appAuth = asString(raw.app_auth); if (appAuth) out.appAuthProfileSlug = appAuth; + if (raw.device_worker_id !== undefined) { + if ( + typeof raw.device_worker_id !== "string" || + !UUID_PATTERN.test(raw.device_worker_id.trim()) + ) { + throw new ValidationError( + `${file}: connection "${slug}" \`device_worker_id\` must be a UUID (the device_workers.id of the device this connection runs on)` + ); + } + out.deviceWorkerId = raw.device_worker_id.trim(); + } if (raw.config !== undefined) { if (!isRecord(raw.config)) { throw new ValidationError( diff --git a/packages/cli/src/commands/_lib/apply/diff.ts b/packages/cli/src/commands/_lib/apply/diff.ts index bcb0bd9f1..cae95808d 100644 --- a/packages/cli/src/commands/_lib/apply/diff.ts +++ b/packages/cli/src/commands/_lib/apply/diff.ts @@ -75,8 +75,19 @@ export interface WatcherDiffRow extends BaseRow { kind: "watcher"; desired?: DesiredWatcher; remote?: RemoteWatcher; - /** Always absent — watchers are create-or-noop, never updated by apply. */ + /** Per-field changes when verb === "update". */ changedFields?: string[]; + /** + * Field names that require a `create_version` + `upgrade` (vs a plain + * `update`). Apply uses this to route writes to the right admin action. + */ + versionBoundFields?: string[]; + /** + * True when the desired watcher declares a `reaction_script` — server stores + * it write-only, so the diff can't tell whether it changed; apply always + * re-pushes (idempotent). Matches the auth-profile credentials pattern. + */ + reactionScriptDeclared?: boolean; } export interface ConnectorDefinitionDiffRow extends BaseRow { @@ -422,25 +433,152 @@ function diffRelationshipType( } /** - * Watchers are sync-on-create only: a model file declares a watcher, and the - * first apply creates it. Subsequent applies see the slug remotely and noop. - * Remote watchers without a desired model are reported as drift, never - * deleted — and we deliberately don't diff prompt/schema/schedule changes in - * v1, so editing a watcher in the UI won't be clobbered by apply. + * Watcher drift fields split into two routing categories: + * - **scalar** lives on the `watchers` row → `manage_watchers update`. + * - **version-bound** lives on the `watcher_versions` row → must go through + * `create_version` + `upgrade` (server-side bumps `current_version_id`). + * The diff returns both lists; apply-cmd routes accordingly. + * + * Reaction scripts aren't returned by `list_watchers` (write-only on the row), + * so we can't compare them — apply always re-pushes when declared (idempotent). + * Remote watchers without a desired model are reported as drift, never deleted. */ function diffWatcher( desired: DesiredWatcher, remote: RemoteWatcher | undefined ): WatcherDiffRow { + const reactionScriptDeclared = desired.reactionScript !== undefined; if (!remote) { - return { kind: "watcher", verb: "create", id: desired.slug, desired }; + return { + kind: "watcher", + verb: "create", + id: desired.slug, + desired, + ...(reactionScriptDeclared ? { reactionScriptDeclared: true } : {}), + }; + } + + const scalar: string[] = []; + if ((desired.schedule ?? null) !== (remote.schedule ?? null)) { + scalar.push("schedule"); + } + if (desired.agent !== (remote.agent_id ?? "")) { + scalar.push("agent_id"); + } + if ( + desired.deviceWorkerId !== undefined && + desired.deviceWorkerId !== (remote.device_worker_id ?? undefined) + ) { + scalar.push("device_worker_id"); + } + if ( + desired.schedulerClientId !== undefined && + desired.schedulerClientId !== (remote.scheduler_client_id ?? undefined) + ) { + scalar.push("scheduler_client_id"); + } + if ( + desired.notificationChannel !== undefined && + desired.notificationChannel !== (remote.notification_channel ?? undefined) + ) { + scalar.push("notification_channel"); + } + if ( + desired.notificationPriority !== undefined && + desired.notificationPriority !== (remote.notification_priority ?? undefined) + ) { + scalar.push("notification_priority"); + } + if ( + desired.minCooldownSeconds !== undefined && + desired.minCooldownSeconds !== (remote.min_cooldown_seconds ?? undefined) + ) { + scalar.push("min_cooldown_seconds"); + } + if ( + desired.tags !== undefined && + !deepEqual(desired.tags, remote.tags ?? []) + ) { + scalar.push("tags"); + } + if ( + desired.agentKind !== undefined && + desired.agentKind !== (remote.agent_kind ?? undefined) + ) { + scalar.push("agent_kind"); + } + + const versionBound: string[] = []; + if (desired.prompt !== (remote.prompt ?? "")) { + versionBound.push("prompt"); + } + if ( + !deepEqual(desired.extractionSchema ?? {}, remote.extraction_schema ?? {}) + ) { + versionBound.push("extraction_schema"); + } + // Sources live on the watchers row but are written as part of create_version + // when changed (server copies them to the version's per-assignment scope). + // Diff against `remote.sources` (also from the row) and route through + // create_version so the version chain stays consistent. + if ( + desired.sources !== undefined && + !deepEqual(desired.sources, remote.sources ?? []) + ) { + versionBound.push("sources"); + } + if ( + desired.reactionsGuidance !== undefined && + desired.reactionsGuidance !== (remote.reactions_guidance ?? "") + ) { + versionBound.push("reactions_guidance"); + } + if ( + desired.jsonTemplate !== undefined && + !deepEqual(desired.jsonTemplate, remote.json_template) + ) { + versionBound.push("json_template"); + } + if ( + desired.keyingConfig !== undefined && + !deepEqual(desired.keyingConfig, remote.keying_config ?? {}) + ) { + versionBound.push("keying_config"); + } + if ( + desired.classifiers !== undefined && + !deepEqual(desired.classifiers, remote.classifiers ?? []) + ) { + versionBound.push("classifiers"); + } + if ( + desired.condensationPrompt !== undefined && + desired.condensationPrompt !== (remote.condensation_prompt ?? "") + ) { + versionBound.push("condensation_prompt"); + } + if ( + desired.condensationWindowCount !== undefined && + desired.condensationWindowCount !== + (remote.condensation_window_count ?? undefined) + ) { + versionBound.push("condensation_window_count"); + } + + const changed = [...scalar, ...versionBound]; + if (reactionScriptDeclared) changed.push("reaction_script"); + if (changed.length === 0) { + return { kind: "watcher", verb: "noop", id: desired.slug, desired, remote }; } return { kind: "watcher", - verb: "noop", + verb: "update", id: desired.slug, desired, remote, + changedFields: changed, + ...(versionBound.length > 0 ? { versionBoundFields: versionBound } : {}), + ...(reactionScriptDeclared ? { reactionScriptDeclared: true } : {}), }; } @@ -581,6 +719,11 @@ function diffConnection( name: "config", changed: (d, r) => !deepEqual(d.config ?? {}, r.config ?? {}), }, + { + name: "device_worker_id", + changed: (d, r) => + (d.deviceWorkerId ?? null) !== (r.device_worker_id ?? null), + }, ], }) as ConnectionDiffRow; } diff --git a/packages/cli/src/commands/_lib/export/__tests__/export-cmd.test.ts b/packages/cli/src/commands/_lib/export/__tests__/export-cmd.test.ts new file mode 100644 index 000000000..bb5bafa6f --- /dev/null +++ b/packages/cli/src/commands/_lib/export/__tests__/export-cmd.test.ts @@ -0,0 +1,334 @@ +/** + * Tests for `lobu export`. + * + * Covers the canonical round-trip: server state → YAML + sibling reaction- + * script `.ts` files that `lobu apply` can read back. Network is stubbed + * through a fetch impl that returns the canned responses listWatchers / + * listEntityTypes / etc. would normally produce. The CLI's auth resolution + * still runs, so we set the right env vars to avoid hitting the keyring. + */ +import { afterEach, beforeEach, describe, expect, test } from "bun:test"; +import { mkdtempSync, readFileSync, rmSync, writeFileSync } from "node:fs"; +import { mkdir } from "node:fs/promises"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; +import { parse as parseYaml, parseAllDocuments } from "yaml"; +import { exportCommand } from "../export-cmd.js"; + +const tempDirs: string[] = []; + +afterEach(() => { + while (tempDirs.length > 0) { + const d = tempDirs.pop(); + if (d) rmSync(d, { recursive: true, force: true }); + } +}); + +function mkTempDir(): string { + const dir = mkdtempSync(join(tmpdir(), "lobu-export-")); + tempDirs.push(dir); + return dir; +} + +function buildFetch(routes: Record unknown>): typeof fetch { + return (async (input: RequestInfo | URL, _init?: RequestInit) => { + const url = String(input); + for (const [pattern, handler] of Object.entries(routes)) { + if (url.includes(pattern)) { + return new Response(JSON.stringify(handler()), { status: 200 }); + } + } + throw new Error(`unexpected fetch: ${url}`); + }) as typeof fetch; +} + +const ORIG_ENV: Record = {}; + +beforeEach(() => { + for (const key of [ + "LOBU_API_URL", + "LOBU_TOKEN", + "LOBU_ORG", + "LOBU_CONTEXT_DIR", + ]) { + ORIG_ENV[key] = process.env[key]; + } + process.env.LOBU_API_URL = "https://example.test"; + process.env.LOBU_TOKEN = "test-token"; + process.env.LOBU_ORG = "acme"; +}); + +afterEach(() => { + for (const [key, val] of Object.entries(ORIG_ENV)) { + if (val === undefined) delete process.env[key]; + else process.env[key] = val; + } +}); + +describe("lobu export", () => { + test("writes models bundle with entity / relationship / watcher docs", async () => { + const out = mkTempDir(); + const fetchImpl = buildFetch({ + manage_entity_schema: () => ({ + entity_types: [ + { + slug: "lead", + name: "Lead", + description: "A sales lead", + required: ["stage"], + properties: { stage: { type: "string" } }, + }, + ], + relationship_types: [ + { + slug: "converted-to", + name: "Converted To", + description: "Lead → Pilot", + rules: [{ source: "lead", target: "pilot" }], + }, + ], + }), + "watchers?watcher_id": () => ({ + watcher: { reaction_script: null, description: null }, + }), + "watchers?include_details": () => ({ + watchers: [ + { + slug: "weekly-digest", + watcher_id: "1", + name: "Weekly digest", + agent_id: "triage", + prompt: "Produce a digest.", + extraction_schema: { type: "object" }, + schedule: "0 9 * * 1", + sources: [{ name: "content", query: "SELECT * FROM events" }], + tags: ["crm"], + device_worker_id: null, + notification_priority: "normal", + notification_channel: "canvas", + min_cooldown_seconds: 0, + }, + ], + }), + auth_profiles: () => ({ auth_profiles: [] }), + manage_connections: () => ({ connections: [] }), + }); + + await exportCommand({ + cwd: out, + out, + fetchImpl, + only: "models", + }); + + const bundleRaw = readFileSync( + join(out, "models", "exported.yaml"), + "utf-8" + ); + const bundle = parseYaml(bundleRaw) as { + version: number; + entities?: unknown[]; + relationships?: unknown[]; + watchers?: Array>; + }; + expect(bundle.version).toBe(2); + expect(bundle.entities?.length).toBe(1); + expect(bundle.relationships?.length).toBe(1); + expect(bundle.watchers?.length).toBe(1); + const watcher = bundle.watchers?.[0]!; + expect(watcher.slug).toBe("weekly-digest"); + expect(watcher.agent).toBe("triage"); + expect(watcher.prompt).toBe("Produce a digest."); + expect(watcher.tags).toEqual(["crm"]); + // Default scalar values are omitted from the exported YAML so the file + // stays minimal — assert they don't appear unless overridden. + expect(watcher.notification_priority).toBeUndefined(); + expect(watcher.notification_channel).toBeUndefined(); + expect(watcher.min_cooldown_seconds).toBeUndefined(); + }); + + test("watcher with reaction_script → writes sibling .ts and references it", async () => { + const out = mkTempDir(); + const fetchImpl = buildFetch({ + manage_entity_schema: () => ({ + entity_types: [], + relationship_types: [], + }), + "watchers?watcher_id": () => ({ + watcher: { + reaction_script: "export default async (ctx, client) => {};\n", + description: null, + }, + }), + "watchers?include_details": () => ({ + watchers: [ + { + slug: "with-reaction", + watcher_id: "42", + agent_id: "triage", + prompt: "Work.", + extraction_schema: { type: "object" }, + }, + ], + }), + }); + + await exportCommand({ + cwd: out, + out, + fetchImpl, + only: "models", + }); + + const reactionBody = readFileSync( + join(out, "models", "reactions", "with-reaction.reaction.ts"), + "utf-8" + ); + expect(reactionBody).toContain("export default async"); + + const bundle = parseYaml( + readFileSync(join(out, "models", "exported.yaml"), "utf-8") + ) as { watchers: Array> }; + expect(bundle.watchers[0]?.reaction_script).toBe( + "./reactions/with-reaction.reaction.ts" + ); + }); + + test("connections export writes type:connection + type:auth_profile docs (creds redacted)", async () => { + const out = mkTempDir(); + const fetchImpl = buildFetch({ + manage_entity_schema: () => ({ + entity_types: [], + relationship_types: [], + }), + "watchers?include_details": () => ({ watchers: [] }), + auth_profiles: () => ({ + auth_profiles: [ + { + slug: "gh-token", + display_name: "GitHub Token", + connector_key: "github", + profile_kind: "env", + status: "active", + }, + ], + }), + manage_connections: () => ({ + connections: [ + { + id: 7, + slug: "gh-main", + connector_key: "github", + display_name: "GitHub main", + status: "active", + auth_profile_slug: "gh-token", + config: { repo: "lobu-ai/lobu" }, + device_worker_id: null, + }, + ], + }), + manage_feeds: () => ({ feeds: [] }), + }); + + await exportCommand({ cwd: out, out, fetchImpl }); + + const raw = readFileSync(join(out, "connectors", "exported.yaml"), "utf-8"); + const docs = parseAllDocuments(raw) + .map((d) => d.toJSON()) + .filter((d) => d !== null); + expect(docs.length).toBe(2); + const profileDoc = docs.find((d) => d.type === "auth_profile"); + const connDoc = docs.find((d) => d.type === "connection"); + expect(profileDoc?.slug).toBe("gh-token"); + expect(profileDoc?.kind).toBe("env"); + // Credentials must never appear in the exported doc — the server doesn't + // expose them, and even if it did the CLI should never emit them. + expect("credentials" in (profileDoc ?? {})).toBe(false); + expect(connDoc?.slug).toBe("gh-main"); + expect(connDoc?.connector).toBe("github"); + expect(connDoc?.auth).toBe("gh-token"); + expect(connDoc?.config).toEqual({ repo: "lobu-ai/lobu" }); + }); + + test("skips reaction file when it already exists AND omits the YAML reference", async () => { + // Regression: previously, export would skip overwriting an existing + // local reaction file but still emit `reaction_script: ./reactions/...` + // in the YAML — re-applying would then upload whatever stale code was + // on disk instead of the server's actual script. Now the reference is + // dropped when we don't overwrite, and a warning is printed. + const out = mkTempDir(); + await mkdir(join(out, "models", "reactions"), { recursive: true }); + const localScript = "// stale local version\nexport default async () => {};\n"; + writeFileSync( + join(out, "models", "reactions", "with-reaction.reaction.ts"), + localScript, + ); + + const fetchImpl = buildFetch({ + manage_entity_schema: () => ({ + entity_types: [], + relationship_types: [], + }), + "watchers?watcher_id": () => ({ + watcher: { + reaction_script: "export default async () => 'NEW SERVER VERSION';\n", + description: null, + }, + }), + "watchers?include_details": () => ({ + watchers: [ + { + slug: "with-reaction", + watcher_id: "42", + agent_id: "triage", + prompt: "Work.", + extraction_schema: { type: "object" }, + }, + ], + }), + }); + + await exportCommand({ cwd: out, out, fetchImpl, only: "models" }); + + // Local script is untouched. + expect( + readFileSync( + join(out, "models", "reactions", "with-reaction.reaction.ts"), + "utf-8", + ), + ).toBe(localScript); + + // YAML does NOT reference reaction_script. + const bundle = parseYaml( + readFileSync(join(out, "models", "exported.yaml"), "utf-8"), + ) as { watchers: Array> }; + expect(bundle.watchers[0]?.reaction_script).toBeUndefined(); + }); + + test("does not clobber existing files unless --force", async () => { + const out = mkTempDir(); + await mkdir(join(out, "models"), { recursive: true }); + const bundlePath = join(out, "models", "exported.yaml"); + writeFileSync(bundlePath, "pre-existing\n"); + + const fetchImpl = buildFetch({ + manage_entity_schema: () => ({ + entity_types: [], + relationship_types: [], + }), + "watchers?include_details": () => ({ watchers: [] }), + }); + + await exportCommand({ cwd: out, out, fetchImpl, only: "models" }); + expect(readFileSync(bundlePath, "utf-8")).toBe("pre-existing\n"); + + await exportCommand({ + cwd: out, + out, + fetchImpl, + only: "models", + force: true, + }); + expect(readFileSync(bundlePath, "utf-8")).not.toBe("pre-existing\n"); + }); +}); diff --git a/packages/cli/src/commands/_lib/export/export-cmd.ts b/packages/cli/src/commands/_lib/export/export-cmd.ts new file mode 100644 index 000000000..6adad3997 --- /dev/null +++ b/packages/cli/src/commands/_lib/export/export-cmd.ts @@ -0,0 +1,430 @@ +/** + * `lobu export` — pull server state into apply-compatible files. + * + * Round-trips with `lobu apply`: every file written here is a valid input for + * a future apply on the same (or another) org. Scope is intentionally narrow: + * memory models (entity types, relationship types, watchers including + * reaction scripts as sibling `.ts` files) and connectors (connections + + * auth_profile placeholders). Agent config / lobu.toml / SOUL.md aren't + * exported — those are author-time files that don't get edited in the UI. + * + * The default destination is `/models/exported.yaml` + + * `/connectors/exported.yaml`. Both write paths are skipped when the + * file exists unless `--force` is passed. + */ + +import { mkdir, readFile, writeFile } from "node:fs/promises"; +import { existsSync } from "node:fs"; +import { join, resolve } from "node:path"; +import chalk from "chalk"; +import { stringify as stringifyYaml } from "yaml"; +import { resolveApplyClient } from "../apply/client.js"; +import type { + ApplyClient, + RemoteAuthProfile, + RemoteConnection, + RemoteEntityType, + RemoteFeed, + RemoteRelationshipType, + RemoteWatcher, +} from "../apply/client.js"; +import { printText } from "../../memory/_lib/output.js"; + +export interface ExportOptions { + cwd?: string; + /** Override the destination directory (defaults to `cwd`). */ + out?: string; + /** Allow overwriting existing exported files. */ + force?: boolean; + /** Org slug (defaults to active session). */ + org?: string; + /** Server URL override. */ + url?: string; + /** Restrict to one resource family. */ + only?: "models" | "connectors"; + /** Test seam — inject fetch. */ + fetchImpl?: typeof fetch; +} + +interface ExportedFile { + /** Relative path under `out`. */ + path: string; + /** Body to write. */ + body: string; + /** Did we skip (existing file, no --force)? */ + skipped?: boolean; +} + +// ── Helpers ──────────────────────────────────────────────────────────────── + +function entityTypeDoc(e: RemoteEntityType): Record { + const out: Record = { + slug: e.slug, + ...(e.name ? { name: e.name } : {}), + ...(e.description ? { description: e.description } : {}), + }; + if ( + (e.required && e.required.length > 0) || + (e.properties && Object.keys(e.properties).length > 0) + ) { + const metadata: Record = {}; + if (e.required?.length) metadata.required = e.required; + if (e.properties && Object.keys(e.properties).length > 0) { + metadata.properties = e.properties; + } + out.metadata_schema = metadata; + } + return out; +} + +function relationshipTypeDoc( + r: RemoteRelationshipType +): Record { + const out: Record = { + slug: r.slug, + ...(r.name ? { name: r.name } : {}), + ...(r.description ? { description: r.description } : {}), + }; + if (r.rules?.length) out.rules = r.rules; + return out; +} + +function watcherDoc( + w: RemoteWatcher, + reactionScriptRelPath: string | undefined +): Record { + const out: Record = { + slug: w.slug, + ...(w.name ? { name: w.name } : {}), + ...(w.agent_id ? { agent: w.agent_id } : {}), + ...(w.description ? { description: w.description } : {}), + }; + if (w.schedule) out.schedule = w.schedule; + if (w.prompt) out.prompt = w.prompt; + if (w.extraction_schema && Object.keys(w.extraction_schema).length > 0) { + out.extraction_schema = w.extraction_schema; + } + if (w.sources?.length) out.sources = w.sources; + if (w.reactions_guidance) out.reactions_guidance = w.reactions_guidance; + if (reactionScriptRelPath) out.reaction_script = reactionScriptRelPath; + if (w.device_worker_id) out.device_worker_id = w.device_worker_id; + if (w.scheduler_client_id) out.scheduler_client_id = w.scheduler_client_id; + if (w.notification_channel && w.notification_channel !== "canvas") { + out.notification_channel = w.notification_channel; + } + if (w.notification_priority && w.notification_priority !== "normal") { + out.notification_priority = w.notification_priority; + } + if ( + w.min_cooldown_seconds !== undefined && + w.min_cooldown_seconds !== null && + w.min_cooldown_seconds !== 0 + ) { + out.min_cooldown_seconds = w.min_cooldown_seconds; + } + if (w.tags?.length) out.tags = w.tags; + if (w.agent_kind) out.agent_kind = w.agent_kind; + if (w.json_template) out.json_template = w.json_template; + if (w.keying_config && Object.keys(w.keying_config).length > 0) { + out.keying_config = w.keying_config; + } + if (w.classifiers?.length) out.classifiers = w.classifiers; + if (w.condensation_prompt) out.condensation_prompt = w.condensation_prompt; + if (w.condensation_window_count) { + out.condensation_window_count = w.condensation_window_count; + } + return out; +} + +function connectionDoc( + c: RemoteConnection, + feeds: RemoteFeed[] +): Record { + const out: Record = { + version: 1, + type: "connection", + slug: c.slug, + connector: c.connector_key, + ...(c.display_name ? { name: c.display_name } : {}), + ...(c.auth_profile_slug ? { auth: c.auth_profile_slug } : {}), + ...(c.app_auth_profile_slug ? { app_auth: c.app_auth_profile_slug } : {}), + }; + if (c.config && Object.keys(c.config).length > 0) out.config = c.config; + if (c.device_worker_id) out.device_worker_id = c.device_worker_id; + if (feeds.length > 0) { + out.feeds = feeds.map((f) => { + const feed: Record = { feed: f.feed_key }; + if (f.display_name) feed.name = f.display_name; + if (f.schedule) feed.schedule = f.schedule; + if (f.config && Object.keys(f.config).length > 0) feed.config = f.config; + return feed; + }); + } + return out; +} + +function authProfileDoc(p: RemoteAuthProfile): Record { + // We never export credentials — they're write-only on the server, and we + // mustn't emit literal secrets to disk. Operators fill credentials back in + // (typically via `$ENV` refs) before re-applying. + return { + version: 1, + type: "auth_profile", + slug: p.slug, + connector: p.connector_key, + kind: profileKindForExport(p.profile_kind), + ...(p.display_name ? { name: p.display_name } : {}), + }; +} + +function profileKindForExport(kind: string): string { + // Server returns its canonical kind; CLI consumes the same names. No mapping + // needed today — this stub exists to keep one place to centralize any + // future divergence. + return kind; +} + +async function loadFeedsByConnection( + client: ApplyClient, + connections: RemoteConnection[] +): Promise> { + const out = new Map(); + for (const conn of connections) { + out.set(conn.id, await client.listFeeds(conn.id)); + } + return out; +} + +/** Write a file if it doesn't exist, or overwrite when `force`. */ +async function writeIfFreeOrForced( + absPath: string, + body: string, + force: boolean +): Promise<{ skipped: boolean }> { + if (existsSync(absPath) && !force) { + return { skipped: true }; + } + await mkdir(resolve(absPath, ".."), { recursive: true }); + await writeFile(absPath, body, "utf-8"); + return { skipped: false }; +} + +// ── Multi-doc YAML helpers ───────────────────────────────────────────────── + +function modelBundleYaml( + entityTypes: RemoteEntityType[], + relationshipTypes: RemoteRelationshipType[], + watchers: Array<{ + watcher: RemoteWatcher; + reactionScriptRelPath: string | undefined; + }> +): string { + // Single dbt-style bundle so apply's loader handles it. Empty sections are + // omitted to keep the file tidy. + const bundle: Record = { version: 2 }; + if (entityTypes.length > 0) { + bundle.entities = entityTypes.map(entityTypeDoc); + } + if (relationshipTypes.length > 0) { + bundle.relationships = relationshipTypes.map(relationshipTypeDoc); + } + if (watchers.length > 0) { + bundle.watchers = watchers.map(({ watcher, reactionScriptRelPath }) => + watcherDoc(watcher, reactionScriptRelPath) + ); + } + return stringifyYaml(bundle, { lineWidth: 0, blockQuote: "literal" }); +} + +function connectorBundleYaml( + connections: RemoteConnection[], + feedsByConnection: Map, + authProfiles: RemoteAuthProfile[] +): string { + // Multi-document YAML stream — one doc per connection / auth_profile, the + // shape `loadConnectors` already understands. + const docs: Record[] = []; + for (const p of authProfiles) docs.push(authProfileDoc(p)); + for (const c of connections) { + docs.push(connectionDoc(c, feedsByConnection.get(c.id) ?? [])); + } + return docs + .map((doc) => stringifyYaml(doc, { lineWidth: 0, blockQuote: "literal" })) + .join("---\n"); +} + +// ── Top-level ────────────────────────────────────────────────────────────── + +export async function exportCommand(opts: ExportOptions = {}): Promise { + const cwd = opts.cwd ?? process.cwd(); + const out = resolve(cwd, opts.out ?? "."); + const force = !!opts.force; + + const { client, orgSlug } = await resolveApplyClient({ + url: opts.url, + org: opts.org, + fetchImpl: opts.fetchImpl, + }); + printText(chalk.dim(`Exporting from org: ${orgSlug}`)); + printText(chalk.dim(`Destination: ${out}`)); + + const wantModels = !opts.only || opts.only === "models"; + const wantConnectors = !opts.only || opts.only === "connectors"; + + const written: ExportedFile[] = []; + + if (wantModels) { + const [entityTypes, relationshipTypes, watchers] = await Promise.all([ + client.listEntityTypes(), + client.listRelationshipTypes(), + client.listWatchers(), + ]); + + // Reaction scripts aren't on the list response — fetch each watcher's + // detail to pick up `reaction_script`. Sequential to keep load on the + // server bounded; a per-watcher GET is cheap. + const withReactions: Array<{ + watcher: RemoteWatcher; + reactionScriptRelPath: string | undefined; + }> = []; + for (const w of watchers) { + let reactionScriptRelPath: string | undefined; + if (w.watcher_id) { + const detail = await client.getWatcherDetail(w.watcher_id); + const script = detail?.reaction_script ?? null; + if (script) { + // Defensive slug sanitization — the watcher slug is used as a + // filesystem path component below. The server's slug constraint + // should already keep this safe, but a stale or corrupted row could + // contain `..` or `/`. Reject anything that isn't a tight basename. + const safeSlug = String(w.slug); + if (!/^[a-z0-9][a-z0-9-]{0,62}$/.test(safeSlug)) { + printText( + chalk.yellow( + ` ⚠ skipping reaction script for watcher slug "${safeSlug}" — slug is not a safe filename basename; export the watcher manually if needed.` + ) + ); + } else { + const rel = `reactions/${safeSlug}.reaction.ts`; + const abs = join(out, "models", rel); + const res = await writeIfFreeOrForced(abs, script, force); + // When the local file exists and --force isn't set, don't emit a + // `reaction_script:` reference — re-applying would otherwise + // upload whatever stale code happens to be on disk, masking the + // server's actual script. Loudly warn so the operator notices. + if (res.skipped) { + printText( + chalk.yellow( + ` ⚠ keeping existing ${rel}; YAML will NOT reference the server script (re-run with --force to overwrite and re-link).` + ) + ); + written.push({ + path: join("models", rel), + body: script, + skipped: true, + }); + } else { + written.push({ + path: join("models", rel), + body: script, + }); + reactionScriptRelPath = `./${rel}`; + } + } + } + if (detail?.description && !w.description) { + w.description = detail.description; + } + } + withReactions.push({ watcher: w, reactionScriptRelPath }); + } + + const bundleBody = modelBundleYaml( + entityTypes, + relationshipTypes, + withReactions + ); + const bundlePath = join(out, "models", "exported.yaml"); + const res = await writeIfFreeOrForced(bundlePath, bundleBody, force); + written.push({ + path: join("models", "exported.yaml"), + body: bundleBody, + ...(res.skipped ? { skipped: true } : {}), + }); + } + + if (wantConnectors) { + const [authProfiles, connections] = await Promise.all([ + client.listAuthProfiles(), + client.listConnections(), + ]); + if (authProfiles.length > 0 || connections.length > 0) { + const feedsByConnection = await loadFeedsByConnection( + client, + connections + ); + const body = connectorBundleYaml( + connections, + feedsByConnection, + authProfiles + ); + const path = join(out, "connectors", "exported.yaml"); + const res = await writeIfFreeOrForced(path, body, force); + written.push({ + path: join("connectors", "exported.yaml"), + body, + ...(res.skipped ? { skipped: true } : {}), + }); + } + } + + // ── Report ──────────────────────────────────────────────────────────────── + if (written.length === 0) { + printText( + chalk.dim( + "\nNothing to export — server has no models or connectors for this org." + ) + ); + return; + } + const skipped = written.filter((w) => w.skipped); + const wrote = written.filter((w) => !w.skipped); + if (wrote.length > 0) { + printText(chalk.bold("\nWrote:")); + for (const w of wrote) printText(` ${chalk.green("+")} ${w.path}`); + } + if (skipped.length > 0) { + printText( + chalk.bold("\nSkipped (file exists — use --force to overwrite):") + ); + for (const w of skipped) printText(` ${chalk.yellow("·")} ${w.path}`); + } + // Auth profiles are kind-only — flag any oauth_app/env profile in the export + // so the operator knows they need to re-add `credentials:` (or `$ENV` refs) + // before the next apply. Quick re-read of the file to find their slugs is + // overkill; instead print the list directly from what we exported. + if (wantConnectors) { + const authProfiles = await client.listAuthProfiles(); + const credentialed = authProfiles.filter( + (p) => p.profile_kind === "env" || p.profile_kind === "oauth_app" + ); + if (credentialed.length > 0) { + printText( + chalk.bold( + "\nNote — credentials are write-only on the server, not exported:" + ) + ); + for (const p of credentialed) { + printText( + ` ${chalk.yellow("·")} auth_profile "${p.slug}" (${p.profile_kind}) — re-add \`credentials:\` (typically with \`$ENV\` refs) before applying.` + ); + } + } + } +} + +// Stub kept for the future if we want to validate the export against the +// loaded desired-state. Today exported files round-trip through +// `loadDesiredState` directly because they share the same schema, so we don't +// re-validate here. Imports kept to satisfy the bundler if this lib grows. +export const __exportInternals = { readFile }; diff --git a/packages/cli/src/index.ts b/packages/cli/src/index.ts index dca6d18c6..0a9556385 100644 --- a/packages/cli/src/index.ts +++ b/packages/cli/src/index.ts @@ -343,6 +343,55 @@ Memory: } ); + // ─── export ───────────────────────────────────────────────────────── + program + .command("export") + .description( + "Pull memory schema + connectors from the org into apply-compatible files" + ) + .option( + "--out ", + "Destination directory (defaults to cwd; creates models/, connectors/)" + ) + .option("--force", "Overwrite existing models/connectors files") + .option("--org ", "Org slug override (defaults to active session)") + .option("--url ", "Server URL override") + .option( + "--only ", + "Restrict to one resource family: 'models' | 'connectors'" + ) + .action( + async (options: { + out?: string; + force?: boolean; + org?: string; + url?: string; + only?: string; + }) => { + if ( + options.only !== undefined && + options.only !== "models" && + options.only !== "connectors" + ) { + console.error( + chalk.red("\n Error:"), + `--only must be 'models' or 'connectors' (got: ${options.only})` + ); + process.exit(2); + } + const { exportCommand } = await import( + "./commands/_lib/export/export-cmd.js" + ); + await exportCommand({ + out: options.out, + force: options.force, + org: options.org, + url: options.url, + only: options.only as "models" | "connectors" | undefined, + }); + } + ); + // ─── run / dev / start ────────────────────────────────────────────── program .command("run") diff --git a/packages/server/package.json b/packages/server/package.json index 55c7deb75..7978c37ef 100644 --- a/packages/server/package.json +++ b/packages/server/package.json @@ -9,8 +9,8 @@ "node": ">=22 <25" }, "scripts": { - "dev": "tsx watch --ignore=../web/** --ignore=../../node_modules/** src/server.ts", - "dev:local": "tsx watch --ignore=../web/** --ignore=../../node_modules/** src/start-local.ts", + "dev": "tsx watch --ignore=../web/** --ignore=../owletto/** --ignore=../../node_modules/** src/server.ts", + "dev:local": "tsx watch --ignore=../web/** --ignore=../owletto/** --ignore=../../node_modules/** src/start-local.ts", "start": "tsx src/server.ts", "build:server": "node ./scripts/build-server-bundle.mjs", "test": "vitest", diff --git a/packages/server/src/tools/admin/manage_watchers.ts b/packages/server/src/tools/admin/manage_watchers.ts index 8c3269d66..332312e19 100644 --- a/packages/server/src/tools/admin/manage_watchers.ts +++ b/packages/server/src/tools/admin/manage_watchers.ts @@ -2297,7 +2297,13 @@ async function handleList( i.scheduler_client_id, i.model_config, i.sources, - i.tags, + -- text[] is returned as the Postgres array literal "{a,b}" by PGlite's + -- TCP socket; wrap in to_jsonb so clients get a real JSON array. + to_jsonb(i.tags) AS tags, + i.notification_channel, + i.notification_priority, + i.min_cooldown_seconds, + i.agent_kind, i.watcher_group_id, i.source_watcher_id, wr.id as watcher_run_id, diff --git a/packages/server/src/utils/table-schema.ts b/packages/server/src/utils/table-schema.ts index f139380a6..de6853ed4 100644 --- a/packages/server/src/utils/table-schema.ts +++ b/packages/server/src/utils/table-schema.ts @@ -141,7 +141,16 @@ export const QUERYABLE_SCHEMA = { 'reaction_script_compiled', 'connection_id', 'source_watcher_id', - 'watcher_group_id' + 'watcher_group_id', + // Scalar columns added in earlier features (device pinning, notification + // routing, run rate-limiting) that were missing from this list — drift + // test caught it. + 'device_worker_id', + 'agent_kind', + 'notification_channel', + 'notification_priority', + 'min_cooldown_seconds', + 'last_fired_at' ), }, // event_classifications