diff --git a/db/migrations/20260518000000_pending_interactions.sql b/db/migrations/20260518000000_pending_interactions.sql new file mode 100644 index 000000000..b51ecae10 --- /dev/null +++ b/db/migrations/20260518000000_pending_interactions.sql @@ -0,0 +1,49 @@ +-- migrate:up + +-- Per-question state for the chat-interaction bridge — moved out of the +-- gateway's in-process Map so a button click that lands on pod B can claim +-- a question registered on pod A. The bridge keeps a small per-pod cache for +-- the platform `SentMessage` (used to edit the original card on click) since +-- that's a non-serializable SDK handle; everything that matters for routing +-- the click back into the worker (PostedQuestion + connection context) lives +-- here. +-- +-- The claim path scopes by `(id, organization_id, connection_id, +-- expected_user_id)` — keying by `id` alone would let a click from one +-- connection or one user consume a question registered for another. The +-- columns are NOT NULL so the SQL claim is a single index hit with no +-- branching for NULL semantics. + +CREATE TABLE public.pending_interactions ( + id text PRIMARY KEY, + organization_id text NOT NULL REFERENCES public.organization(id) ON DELETE CASCADE, + connection_id text NOT NULL, + expected_user_id text NOT NULL, + entry_payload jsonb NOT NULL, + created_at timestamp with time zone NOT NULL DEFAULT now(), + claimed_at timestamp with time zone +); + +-- Claim path is +-- UPDATE pending_interactions +-- SET claimed_at = now() +-- WHERE id = $1 +-- AND organization_id = $2 +-- AND connection_id = $3 +-- AND expected_user_id = $4 +-- AND claimed_at IS NULL +-- RETURNING entry_payload +-- — a partial index on the unclaimed predicate keeps the lookup index-only. +CREATE INDEX idx_pending_interactions_unclaimed + ON public.pending_interactions (id, organization_id, connection_id, expected_user_id) + WHERE claimed_at IS NULL; + +-- Background sweeper drops rows older than 24h; index keeps that scan cheap. +CREATE INDEX idx_pending_interactions_created_at + ON public.pending_interactions (created_at); + +-- migrate:down + +DROP INDEX IF EXISTS public.idx_pending_interactions_created_at; +DROP INDEX IF EXISTS public.idx_pending_interactions_unclaimed; +DROP TABLE IF EXISTS public.pending_interactions; diff --git a/db/schema.sql b/db/schema.sql index a5945e7ce..b803be343 100644 --- a/db/schema.sql +++ b/db/schema.sql @@ -1419,6 +1419,20 @@ CREATE TABLE public.organization_lobu_links ( updated_at timestamp with time zone DEFAULT now() NOT NULL ); +-- +-- Name: pending_interactions; Type: TABLE; Schema: public; Owner: - +-- + +CREATE TABLE public.pending_interactions ( + id text NOT NULL, + organization_id text NOT NULL, + connection_id text NOT NULL, + expected_user_id text NOT NULL, + entry_payload jsonb NOT NULL, + created_at timestamp with time zone DEFAULT now() NOT NULL, + claimed_at timestamp with time zone +); + -- -- Name: personal_access_tokens; Type: TABLE; Schema: public; Owner: - -- @@ -2633,6 +2647,13 @@ ALTER TABLE ONLY public.organization ALTER TABLE ONLY public.organization ADD CONSTRAINT organization_slug_key UNIQUE (slug); +-- +-- Name: pending_interactions pending_interactions_pkey; Type: CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.pending_interactions + ADD CONSTRAINT pending_interactions_pkey PRIMARY KEY (id); + -- -- Name: personal_access_tokens personal_access_tokens_pkey; Type: CONSTRAINT; Schema: public; Owner: - -- @@ -3613,6 +3634,18 @@ CREATE INDEX idx_notification_targets_user_all ON public.notification_targets US CREATE INDEX idx_notification_targets_user_unread ON public.notification_targets USING btree (user_id, delivered_at DESC) WHERE (read_at IS NULL); +-- +-- Name: idx_pending_interactions_created_at; Type: INDEX; Schema: public; Owner: - +-- + +CREATE INDEX idx_pending_interactions_created_at ON public.pending_interactions USING btree (created_at); + +-- +-- Name: idx_pending_interactions_unclaimed; Type: INDEX; Schema: public; Owner: - +-- + +CREATE INDEX idx_pending_interactions_unclaimed ON public.pending_interactions USING btree (id, organization_id, connection_id, expected_user_id) WHERE (claimed_at IS NULL); + -- -- Name: idx_personal_access_tokens_worker_id; Type: INDEX; Schema: public; Owner: - -- @@ -4768,6 +4801,13 @@ ALTER TABLE ONLY public.organization_lobu_links ALTER TABLE ONLY public.organization_lobu_links ADD CONSTRAINT organization_lobu_links_organization_id_fkey FOREIGN KEY (organization_id) REFERENCES public.organization(id) ON DELETE CASCADE; +-- +-- Name: pending_interactions pending_interactions_organization_id_fkey; Type: FK CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.pending_interactions + ADD CONSTRAINT pending_interactions_organization_id_fkey FOREIGN KEY (organization_id) REFERENCES public.organization(id) ON DELETE CASCADE; + -- -- Name: personal_access_tokens personal_access_tokens_organization_id_fkey; Type: FK CONSTRAINT; Schema: public; Owner: - -- @@ -5030,4 +5070,5 @@ INSERT INTO public.schema_migrations (version) VALUES ('20260517050000'), ('20260517060000'), ('20260517150000'), - ('20260517160000'); + ('20260517160000'), + ('20260518000000'); diff --git a/packages/cli/src/commands/_lib/apply/__tests__/diff.test.ts b/packages/cli/src/commands/_lib/apply/__tests__/diff.test.ts index 549d50d10..e8d79393c 100644 --- a/packages/cli/src/commands/_lib/apply/__tests__/diff.test.ts +++ b/packages/cli/src/commands/_lib/apply/__tests__/diff.test.ts @@ -10,983 +10,983 @@ import { renderPlan, renderSummary } from "../render.js"; chalk.level = 0; function buildDesiredAgent( - agentId: string, - overrides: Partial = {}, + agentId: string, + overrides: Partial = {} ): DesiredAgent { - return { - metadata: { agentId, name: agentId, description: undefined }, - settings: {}, - platforms: [], - ...overrides, - }; + return { + metadata: { agentId, name: agentId, description: undefined }, + settings: {}, + platforms: [], + ...overrides, + }; } function buildState( - agents: DesiredAgent[], - overrides: Partial = {}, + agents: DesiredAgent[], + overrides: Partial = {} ): DesiredState { - return { - agents, - memorySchema: { entityTypes: [], relationshipTypes: [] }, - watchers: [], - connectors: { definitions: [], authProfiles: [], connections: [] }, - requiredSecrets: [], - ...overrides, - }; + return { + agents, + memorySchema: { entityTypes: [], relationshipTypes: [] }, + watchers: [], + connectors: { definitions: [], authProfiles: [], connections: [] }, + requiredSecrets: [], + ...overrides, + }; } function emptyRemote(): RemoteSnapshot { - return { - agents: [], - agentSettings: new Map(), - platformsByAgent: new Map(), - entityTypes: [], - relationshipTypes: [], - watchers: [], - connectorDefinitions: [], - authProfiles: [], - connections: [], - feedsByConnectionId: new Map(), - }; + return { + agents: [], + agentSettings: new Map(), + platformsByAgent: new Map(), + entityTypes: [], + relationshipTypes: [], + watchers: [], + connectorDefinitions: [], + authProfiles: [], + connections: [], + feedsByConnectionId: new Map(), + }; } describe("apply diff — agents", () => { - test("create from empty remote", () => { - const desired = buildState([ - buildDesiredAgent("triage", { - metadata: { - agentId: "triage", - name: "Triage", - description: "Triage bot", - }, - }), - ]); - const plan = computeDiff(desired, emptyRemote()); - - expect(plan.counts).toEqual({ create: 2, update: 0, noop: 0, drift: 0 }); - expect(renderPlan(plan)).toMatchSnapshot(); - }); - - test("noop when remote matches desired", () => { - const desired = buildState([ - buildDesiredAgent("triage", { - metadata: { agentId: "triage", name: "Triage" }, - }), - ]); - const remote: RemoteSnapshot = { - ...emptyRemote(), - agents: [{ agentId: "triage", name: "Triage" }], - agentSettings: new Map([["triage", null]]), - platformsByAgent: new Map([["triage", []]]), - }; - const plan = computeDiff(desired, remote); - expect(plan.counts.noop).toBeGreaterThan(0); - expect(plan.counts.create).toBe(0); - expect(plan.counts.update).toBe(0); - expect(renderPlan(plan)).toMatchSnapshot(); - }); - - test("update when name differs", () => { - const desired = buildState([ - buildDesiredAgent("triage", { - metadata: { agentId: "triage", name: "Renamed" }, - }), - ]); - const remote: RemoteSnapshot = { - ...emptyRemote(), - agents: [{ agentId: "triage", name: "Original" }], - agentSettings: new Map([["triage", null]]), - platformsByAgent: new Map([["triage", []]]), - }; - const plan = computeDiff(desired, remote); - expect(plan.counts.update).toBeGreaterThan(0); - expect(renderPlan(plan)).toMatchSnapshot(); - }); - - test("drift when remote has agent not in desired", () => { - const desired = buildState([]); - const remote: RemoteSnapshot = { - ...emptyRemote(), - agents: [{ agentId: "stale", name: "Stale Agent" }], - }; - const plan = computeDiff(desired, remote); - expect(plan.counts.drift).toBe(1); - expect(renderPlan(plan)).toMatchSnapshot(); - }); + test("create from empty remote", () => { + const desired = buildState([ + buildDesiredAgent("triage", { + metadata: { + agentId: "triage", + name: "Triage", + description: "Triage bot", + }, + }), + ]); + const plan = computeDiff(desired, emptyRemote()); + + expect(plan.counts).toEqual({ create: 2, update: 0, noop: 0, drift: 0 }); + expect(renderPlan(plan)).toMatchSnapshot(); + }); + + test("noop when remote matches desired", () => { + const desired = buildState([ + buildDesiredAgent("triage", { + metadata: { agentId: "triage", name: "Triage" }, + }), + ]); + const remote: RemoteSnapshot = { + ...emptyRemote(), + agents: [{ agentId: "triage", name: "Triage" }], + agentSettings: new Map([["triage", null]]), + platformsByAgent: new Map([["triage", []]]), + }; + const plan = computeDiff(desired, remote); + expect(plan.counts.noop).toBeGreaterThan(0); + expect(plan.counts.create).toBe(0); + expect(plan.counts.update).toBe(0); + expect(renderPlan(plan)).toMatchSnapshot(); + }); + + test("update when name differs", () => { + const desired = buildState([ + buildDesiredAgent("triage", { + metadata: { agentId: "triage", name: "Renamed" }, + }), + ]); + const remote: RemoteSnapshot = { + ...emptyRemote(), + agents: [{ agentId: "triage", name: "Original" }], + agentSettings: new Map([["triage", null]]), + platformsByAgent: new Map([["triage", []]]), + }; + const plan = computeDiff(desired, remote); + expect(plan.counts.update).toBeGreaterThan(0); + expect(renderPlan(plan)).toMatchSnapshot(); + }); + + test("drift when remote has agent not in desired", () => { + const desired = buildState([]); + const remote: RemoteSnapshot = { + ...emptyRemote(), + agents: [{ agentId: "stale", name: "Stale Agent" }], + }; + const plan = computeDiff(desired, remote); + expect(plan.counts.drift).toBe(1); + expect(renderPlan(plan)).toMatchSnapshot(); + }); }); describe("apply diff — settings", () => { - test("update on networkConfig change", () => { - const desired = buildState([ - buildDesiredAgent("triage", { - metadata: { agentId: "triage", name: "Triage" }, - settings: { - networkConfig: { allowedDomains: ["github.com"] }, - }, - }), - ]); - const remote: RemoteSnapshot = { - ...emptyRemote(), - agents: [{ agentId: "triage", name: "Triage" }], - agentSettings: new Map([ - [ - "triage", - { - networkConfig: { allowedDomains: ["pypi.org"] }, - updatedAt: 0, - }, - ], - ]), - platformsByAgent: new Map([["triage", []]]), - }; - const plan = computeDiff(desired, remote); - const settingsRow = plan.rows.find((r) => r.kind === "settings"); - expect(settingsRow?.verb).toBe("update"); - if (settingsRow?.kind === "settings") { - expect(settingsRow.changedFields).toContain("networkConfig"); - } - expect(renderPlan(plan)).toMatchSnapshot(); - }); - - test("updates when provider declarations change but ignores installedAt churn", () => { - const desired = buildState([ - buildDesiredAgent("triage", { - metadata: { agentId: "triage", name: "Triage" }, - settings: { - installedProviders: [ - { providerId: "anthropic", installedAt: 200 }, - { providerId: "openai", installedAt: 200 }, - ], - }, - }), - ]); - const remote: RemoteSnapshot = { - ...emptyRemote(), - agents: [{ agentId: "triage", name: "Triage" }], - agentSettings: new Map([ - [ - "triage", - { - installedProviders: [{ providerId: "anthropic", installedAt: 100 }], - updatedAt: 0, - }, - ], - ]), - platformsByAgent: new Map([["triage", []]]), - }; - const plan = computeDiff(desired, remote); - const settingsRow = plan.rows.find((r) => r.kind === "settings"); - expect(settingsRow?.verb).toBe("update"); - if (settingsRow?.kind === "settings") { - expect(settingsRow.changedFields).toContain("installedProviders"); - } - - const unchanged = computeDiff(desired, { - ...remote, - agentSettings: new Map([ - [ - "triage", - { - installedProviders: [ - { providerId: "anthropic", installedAt: 1 }, - { providerId: "openai", installedAt: 2 }, - ], - updatedAt: 0, - }, - ], - ]), - }); - const unchangedSettingsRow = unchanged.rows.find( - (r) => r.kind === "settings", - ); - expect(unchangedSettingsRow?.verb).toBe("noop"); - }); + test("update on networkConfig change", () => { + const desired = buildState([ + buildDesiredAgent("triage", { + metadata: { agentId: "triage", name: "Triage" }, + settings: { + networkConfig: { allowedDomains: ["github.com"] }, + }, + }), + ]); + const remote: RemoteSnapshot = { + ...emptyRemote(), + agents: [{ agentId: "triage", name: "Triage" }], + agentSettings: new Map([ + [ + "triage", + { + networkConfig: { allowedDomains: ["pypi.org"] }, + updatedAt: 0, + }, + ], + ]), + platformsByAgent: new Map([["triage", []]]), + }; + const plan = computeDiff(desired, remote); + const settingsRow = plan.rows.find((r) => r.kind === "settings"); + expect(settingsRow?.verb).toBe("update"); + if (settingsRow?.kind === "settings") { + expect(settingsRow.changedFields).toContain("networkConfig"); + } + expect(renderPlan(plan)).toMatchSnapshot(); + }); + + test("updates when provider declarations change but ignores installedAt churn", () => { + const desired = buildState([ + buildDesiredAgent("triage", { + metadata: { agentId: "triage", name: "Triage" }, + settings: { + installedProviders: [ + { providerId: "anthropic", installedAt: 200 }, + { providerId: "openai", installedAt: 200 }, + ], + }, + }), + ]); + const remote: RemoteSnapshot = { + ...emptyRemote(), + agents: [{ agentId: "triage", name: "Triage" }], + agentSettings: new Map([ + [ + "triage", + { + installedProviders: [{ providerId: "anthropic", installedAt: 100 }], + updatedAt: 0, + }, + ], + ]), + platformsByAgent: new Map([["triage", []]]), + }; + const plan = computeDiff(desired, remote); + const settingsRow = plan.rows.find((r) => r.kind === "settings"); + expect(settingsRow?.verb).toBe("update"); + if (settingsRow?.kind === "settings") { + expect(settingsRow.changedFields).toContain("installedProviders"); + } + + const unchanged = computeDiff(desired, { + ...remote, + agentSettings: new Map([ + [ + "triage", + { + installedProviders: [ + { providerId: "anthropic", installedAt: 1 }, + { providerId: "openai", installedAt: 2 }, + ], + updatedAt: 0, + }, + ], + ]), + }); + const unchangedSettingsRow = unchanged.rows.find( + (r) => r.kind === "settings" + ); + expect(unchangedSettingsRow?.verb).toBe("noop"); + }); }); describe("apply diff — platforms", () => { - test("create on empty remote", () => { - const desired = buildState([ - buildDesiredAgent("triage", { - metadata: { agentId: "triage", name: "Triage" }, - platforms: [ - { - stableId: "triage-telegram", - type: "telegram", - config: { botToken: "abc" }, - }, - ], - }), - ]); - const plan = computeDiff(desired, emptyRemote()); - const platformRow = plan.rows.find((r) => r.kind === "platform"); - expect(platformRow?.verb).toBe("create"); - expect(renderPlan(plan)).toMatchSnapshot(); - }); - - test("update with willRestart when config changes", () => { - const desired = buildState([ - buildDesiredAgent("triage", { - metadata: { agentId: "triage", name: "Triage" }, - platforms: [ - { - stableId: "triage-telegram", - type: "telegram", - config: { botToken: "new" }, - }, - ], - }), - ]); - const remote: RemoteSnapshot = { - ...emptyRemote(), - agents: [{ agentId: "triage", name: "Triage" }], - agentSettings: new Map([["triage", null]]), - platformsByAgent: new Map([ - [ - "triage", - [ - { - id: "triage-telegram", - platform: "telegram", - config: { botToken: "old" }, - }, - ], - ], - ]), - }; - const plan = computeDiff(desired, remote); - const platformRow = plan.rows.find((r) => r.kind === "platform"); - expect(platformRow?.verb).toBe("update"); - if (platformRow?.kind === "platform") { - expect(platformRow.willRestart).toBe(true); - } - expect(renderPlan(plan)).toMatchSnapshot(); - }); + test("create on empty remote", () => { + const desired = buildState([ + buildDesiredAgent("triage", { + metadata: { agentId: "triage", name: "Triage" }, + platforms: [ + { + stableId: "triage-telegram", + type: "telegram", + config: { botToken: "abc" }, + }, + ], + }), + ]); + const plan = computeDiff(desired, emptyRemote()); + const platformRow = plan.rows.find((r) => r.kind === "platform"); + expect(platformRow?.verb).toBe("create"); + expect(renderPlan(plan)).toMatchSnapshot(); + }); + + test("update with willRestart when config changes", () => { + const desired = buildState([ + buildDesiredAgent("triage", { + metadata: { agentId: "triage", name: "Triage" }, + platforms: [ + { + stableId: "triage-telegram", + type: "telegram", + config: { botToken: "new" }, + }, + ], + }), + ]); + const remote: RemoteSnapshot = { + ...emptyRemote(), + agents: [{ agentId: "triage", name: "Triage" }], + agentSettings: new Map([["triage", null]]), + platformsByAgent: new Map([ + [ + "triage", + [ + { + id: "triage-telegram", + platform: "telegram", + config: { botToken: "old" }, + }, + ], + ], + ]), + }; + const plan = computeDiff(desired, remote); + const platformRow = plan.rows.find((r) => r.kind === "platform"); + expect(platformRow?.verb).toBe("update"); + if (platformRow?.kind === "platform") { + expect(platformRow.willRestart).toBe(true); + } + expect(renderPlan(plan)).toMatchSnapshot(); + }); }); describe("apply diff — memory schema", () => { - test("creates entity + relationship types", () => { - const desired: DesiredState = { - agents: [], - memorySchema: { - entityTypes: [{ slug: "company", name: "Company", required: ["name"] }], - relationshipTypes: [ - { - slug: "works_at", - name: "Works At", - rules: [{ source: "person", target: "company" }], - }, - ], - }, - watchers: [], - requiredSecrets: [], - }; - const plan = computeDiff(desired, emptyRemote()); - expect(plan.counts.create).toBe(2); - expect(renderPlan(plan)).toMatchSnapshot(); - }); - - test("noop when remote matches", () => { - const desired: DesiredState = { - agents: [], - memorySchema: { - entityTypes: [{ slug: "company", name: "Company" }], - relationshipTypes: [], - }, - watchers: [], - requiredSecrets: [], - }; - const remote: RemoteSnapshot = { - ...emptyRemote(), - entityTypes: [{ slug: "company", name: "Company" }], - }; - const plan = computeDiff(desired, remote); - expect(plan.counts.noop).toBe(1); - expect(plan.counts.update).toBe(0); - }); + test("creates entity + relationship types", () => { + const desired: DesiredState = { + agents: [], + memorySchema: { + entityTypes: [{ slug: "company", name: "Company", required: ["name"] }], + relationshipTypes: [ + { + slug: "works_at", + name: "Works At", + rules: [{ source: "person", target: "company" }], + }, + ], + }, + watchers: [], + requiredSecrets: [], + }; + const plan = computeDiff(desired, emptyRemote()); + expect(plan.counts.create).toBe(2); + expect(renderPlan(plan)).toMatchSnapshot(); + }); + + test("noop when remote matches", () => { + const desired: DesiredState = { + agents: [], + memorySchema: { + entityTypes: [{ slug: "company", name: "Company" }], + relationshipTypes: [], + }, + watchers: [], + requiredSecrets: [], + }; + const remote: RemoteSnapshot = { + ...emptyRemote(), + entityTypes: [{ slug: "company", name: "Company" }], + }; + const plan = computeDiff(desired, remote); + expect(plan.counts.noop).toBe(1); + expect(plan.counts.update).toBe(0); + }); }); describe("apply diff — empty container preservation", () => { - // Bug fix: previously canonical() collapsed [] and {} to null, which - // meant clearing a remote allowlist by setting it to [] silently - // round-tripped as a noop instead of an update. - test("clearing networkConfig.allowedDomains from non-empty to [] is an update", () => { - const desired = buildState([ - buildDesiredAgent("triage", { - metadata: { agentId: "triage", name: "Triage" }, - settings: { - networkConfig: { allowedDomains: [] }, - }, - }), - ]); - const remote: RemoteSnapshot = { - ...emptyRemote(), - agents: [{ agentId: "triage", name: "Triage" }], - agentSettings: new Map([ - [ - "triage", - { - networkConfig: { allowedDomains: ["foo.com"] }, - updatedAt: 0, - }, - ], - ]), - platformsByAgent: new Map([["triage", []]]), - }; - const plan = computeDiff(desired, remote); - const settingsRow = plan.rows.find((r) => r.kind === "settings"); - expect(settingsRow?.verb).toBe("update"); - if (settingsRow?.kind === "settings") { - expect(settingsRow.changedFields).toContain("networkConfig"); - } - }); - - test("[] is not equal to null (preserved as distinct values)", () => { - // When desired sets allowedDomains: [] and remote has the field - // missing entirely, the diff should still treat them as equivalent - // for the case where remote literally doesn't have the field — but - // [] vs the explicit array ["foo"] must differ. - const desiredEmpty = buildState([ - buildDesiredAgent("triage", { - metadata: { agentId: "triage", name: "Triage" }, - settings: { - networkConfig: { allowedDomains: [] }, - }, - }), - ]); - const remoteWithItems: RemoteSnapshot = { - ...emptyRemote(), - agents: [{ agentId: "triage", name: "Triage" }], - agentSettings: new Map([ - [ - "triage", - { - networkConfig: { allowedDomains: ["x.com"] }, - updatedAt: 0, - }, - ], - ]), - platformsByAgent: new Map([["triage", []]]), - }; - const plan = computeDiff(desiredEmpty, remoteWithItems); - expect(plan.counts.update).toBeGreaterThan(0); - }); - - test("{} is not equal to populated object", () => { - // empty config object vs populated config object must show as drift/update - const desired = buildState([ - buildDesiredAgent("triage", { - metadata: { agentId: "triage", name: "Triage" }, - platforms: [ - { - stableId: "triage-telegram", - type: "telegram", - config: {}, - }, - ], - }), - ]); - const remote: RemoteSnapshot = { - ...emptyRemote(), - agents: [{ agentId: "triage", name: "Triage" }], - agentSettings: new Map([["triage", null]]), - platformsByAgent: new Map([ - [ - "triage", - [ - { - id: "triage-telegram", - platform: "telegram", - config: { botToken: "abc" }, - }, - ], - ], - ]), - }; - const plan = computeDiff(desired, remote); - const platformRow = plan.rows.find((r) => r.kind === "platform"); - expect(platformRow?.verb).toBe("update"); - }); + // Bug fix: previously canonical() collapsed [] and {} to null, which + // meant clearing a remote allowlist by setting it to [] silently + // round-tripped as a noop instead of an update. + test("clearing networkConfig.allowedDomains from non-empty to [] is an update", () => { + const desired = buildState([ + buildDesiredAgent("triage", { + metadata: { agentId: "triage", name: "Triage" }, + settings: { + networkConfig: { allowedDomains: [] }, + }, + }), + ]); + const remote: RemoteSnapshot = { + ...emptyRemote(), + agents: [{ agentId: "triage", name: "Triage" }], + agentSettings: new Map([ + [ + "triage", + { + networkConfig: { allowedDomains: ["foo.com"] }, + updatedAt: 0, + }, + ], + ]), + platformsByAgent: new Map([["triage", []]]), + }; + const plan = computeDiff(desired, remote); + const settingsRow = plan.rows.find((r) => r.kind === "settings"); + expect(settingsRow?.verb).toBe("update"); + if (settingsRow?.kind === "settings") { + expect(settingsRow.changedFields).toContain("networkConfig"); + } + }); + + test("[] is not equal to null (preserved as distinct values)", () => { + // When desired sets allowedDomains: [] and remote has the field + // missing entirely, the diff should still treat them as equivalent + // for the case where remote literally doesn't have the field — but + // [] vs the explicit array ["foo"] must differ. + const desiredEmpty = buildState([ + buildDesiredAgent("triage", { + metadata: { agentId: "triage", name: "Triage" }, + settings: { + networkConfig: { allowedDomains: [] }, + }, + }), + ]); + const remoteWithItems: RemoteSnapshot = { + ...emptyRemote(), + agents: [{ agentId: "triage", name: "Triage" }], + agentSettings: new Map([ + [ + "triage", + { + networkConfig: { allowedDomains: ["x.com"] }, + updatedAt: 0, + }, + ], + ]), + platformsByAgent: new Map([["triage", []]]), + }; + const plan = computeDiff(desiredEmpty, remoteWithItems); + expect(plan.counts.update).toBeGreaterThan(0); + }); + + test("{} is not equal to populated object", () => { + // empty config object vs populated config object must show as drift/update + const desired = buildState([ + buildDesiredAgent("triage", { + metadata: { agentId: "triage", name: "Triage" }, + platforms: [ + { + stableId: "triage-telegram", + type: "telegram", + config: {}, + }, + ], + }), + ]); + const remote: RemoteSnapshot = { + ...emptyRemote(), + agents: [{ agentId: "triage", name: "Triage" }], + agentSettings: new Map([["triage", null]]), + platformsByAgent: new Map([ + [ + "triage", + [ + { + id: "triage-telegram", + platform: "telegram", + config: { botToken: "abc" }, + }, + ], + ], + ]), + }; + const plan = computeDiff(desired, remote); + const platformRow = plan.rows.find((r) => r.kind === "platform"); + expect(platformRow?.verb).toBe("update"); + }); }); describe("apply diff — watchers", () => { - const desiredWatcher = { - slug: "weekly-digest", - agent: "triage", - name: "Weekly digest", - prompt: "Produce a digest.", - extractionSchema: { type: "object" as const }, - schedule: "0 9 * * 1", - }; - - test("create when watcher missing remotely", () => { - const desired = buildState([], { watchers: [desiredWatcher] }); - const plan = computeDiff(desired, emptyRemote()); - const row = plan.rows.find((r) => r.kind === "watcher"); - expect(row?.verb).toBe("create"); - expect(row?.id).toBe("weekly-digest"); - }); - - test("noop when remote matches every field the diff covers", () => { - const desired = buildState([], { watchers: [desiredWatcher] }); - const remote: RemoteSnapshot = { - ...emptyRemote(), - watchers: [ - { - slug: "weekly-digest", - name: "Weekly digest", - agent_id: "triage", - prompt: "Produce a digest.", - extraction_schema: { type: "object" }, - schedule: "0 9 * * 1", - }, - ], - }; - const plan = computeDiff(desired, remote); - const row = plan.rows.find((r) => r.kind === "watcher"); - expect(row?.verb).toBe("noop"); - expect(plan.counts.create).toBe(0); - }); - - test("update with scalar drift when schedule changes remotely", () => { - const desired = buildState([], { watchers: [desiredWatcher] }); - const remote: RemoteSnapshot = { - ...emptyRemote(), - watchers: [ - { - slug: "weekly-digest", - name: "Weekly digest", - agent_id: "triage", - prompt: "Produce a digest.", - extraction_schema: { type: "object" }, - schedule: "0 10 * * 1", - }, - ], - }; - const plan = computeDiff(desired, remote); - const row = plan.rows.find((r) => r.kind === "watcher"); - expect(row?.verb).toBe("update"); - expect(row?.changedFields).toContain("schedule"); - expect( - (row as { versionBoundFields?: string[] }).versionBoundFields, - ).toBeUndefined(); - }); - - test("update with version-bound drift when prompt changes remotely", () => { - const desired = buildState([], { watchers: [desiredWatcher] }); - const remote: RemoteSnapshot = { - ...emptyRemote(), - watchers: [ - { - slug: "weekly-digest", - name: "Weekly digest", - agent_id: "triage", - prompt: "Old prompt", - extraction_schema: { type: "object" }, - schedule: "0 9 * * 1", - }, - ], - }; - const plan = computeDiff(desired, remote); - const row = plan.rows.find((r) => r.kind === "watcher"); - expect(row?.verb).toBe("update"); - expect( - (row as { versionBoundFields?: string[] }).versionBoundFields, - ).toEqual(["prompt"]); - }); - - test("reaction_script declared → always re-pushed (idempotent)", () => { - const desired = buildState([], { - watchers: [ - { - ...desiredWatcher, - reactionScript: { - sourcePath: "/abs/path/r.ts", - sourceCode: "export default async () => {};", - }, - }, - ], - }); - const remote: RemoteSnapshot = { - ...emptyRemote(), - watchers: [ - { - slug: "weekly-digest", - name: "Weekly digest", - agent_id: "triage", - prompt: "Produce a digest.", - extraction_schema: { type: "object" }, - schedule: "0 9 * * 1", - }, - ], - }; - const plan = computeDiff(desired, remote); - const row = plan.rows.find((r) => r.kind === "watcher"); - expect(row?.verb).toBe("update"); - expect(row?.changedFields).toEqual(["reaction_script"]); - expect( - (row as { reactionScriptDeclared?: boolean }).reactionScriptDeclared, - ).toBe(true); - }); - - test("drift when remote watcher not declared in models", () => { - const desired = buildState([], { watchers: [] }); - const remote: RemoteSnapshot = { - ...emptyRemote(), - watchers: [{ slug: "orphan-watcher" }], - }; - const plan = computeDiff(desired, remote); - const row = plan.rows.find((r) => r.kind === "watcher"); - expect(row?.verb).toBe("drift"); - expect(plan.counts.drift).toBe(1); - }); + const desiredWatcher = { + slug: "weekly-digest", + agent: "triage", + name: "Weekly digest", + prompt: "Produce a digest.", + extractionSchema: { type: "object" as const }, + schedule: "0 9 * * 1", + }; + + test("create when watcher missing remotely", () => { + const desired = buildState([], { watchers: [desiredWatcher] }); + const plan = computeDiff(desired, emptyRemote()); + const row = plan.rows.find((r) => r.kind === "watcher"); + expect(row?.verb).toBe("create"); + expect(row?.id).toBe("weekly-digest"); + }); + + test("noop when remote matches every field the diff covers", () => { + const desired = buildState([], { watchers: [desiredWatcher] }); + const remote: RemoteSnapshot = { + ...emptyRemote(), + watchers: [ + { + slug: "weekly-digest", + name: "Weekly digest", + agent_id: "triage", + prompt: "Produce a digest.", + extraction_schema: { type: "object" }, + schedule: "0 9 * * 1", + }, + ], + }; + const plan = computeDiff(desired, remote); + const row = plan.rows.find((r) => r.kind === "watcher"); + expect(row?.verb).toBe("noop"); + expect(plan.counts.create).toBe(0); + }); + + test("update with scalar drift when schedule changes remotely", () => { + const desired = buildState([], { watchers: [desiredWatcher] }); + const remote: RemoteSnapshot = { + ...emptyRemote(), + watchers: [ + { + slug: "weekly-digest", + name: "Weekly digest", + agent_id: "triage", + prompt: "Produce a digest.", + extraction_schema: { type: "object" }, + schedule: "0 10 * * 1", + }, + ], + }; + const plan = computeDiff(desired, remote); + const row = plan.rows.find((r) => r.kind === "watcher"); + expect(row?.verb).toBe("update"); + expect(row?.changedFields).toContain("schedule"); + expect( + (row as { versionBoundFields?: string[] }).versionBoundFields + ).toBeUndefined(); + }); + + test("update with version-bound drift when prompt changes remotely", () => { + const desired = buildState([], { watchers: [desiredWatcher] }); + const remote: RemoteSnapshot = { + ...emptyRemote(), + watchers: [ + { + slug: "weekly-digest", + name: "Weekly digest", + agent_id: "triage", + prompt: "Old prompt", + extraction_schema: { type: "object" }, + schedule: "0 9 * * 1", + }, + ], + }; + const plan = computeDiff(desired, remote); + const row = plan.rows.find((r) => r.kind === "watcher"); + expect(row?.verb).toBe("update"); + expect( + (row as { versionBoundFields?: string[] }).versionBoundFields + ).toEqual(["prompt"]); + }); + + test("reaction_script declared → always re-pushed (idempotent)", () => { + const desired = buildState([], { + watchers: [ + { + ...desiredWatcher, + reactionScript: { + sourcePath: "/abs/path/r.ts", + sourceCode: "export default async () => {};", + }, + }, + ], + }); + const remote: RemoteSnapshot = { + ...emptyRemote(), + watchers: [ + { + slug: "weekly-digest", + name: "Weekly digest", + agent_id: "triage", + prompt: "Produce a digest.", + extraction_schema: { type: "object" }, + schedule: "0 9 * * 1", + }, + ], + }; + const plan = computeDiff(desired, remote); + const row = plan.rows.find((r) => r.kind === "watcher"); + expect(row?.verb).toBe("update"); + expect(row?.changedFields).toEqual(["reaction_script"]); + expect( + (row as { reactionScriptDeclared?: boolean }).reactionScriptDeclared + ).toBe(true); + }); + + test("drift when remote watcher not declared in models", () => { + const desired = buildState([], { watchers: [] }); + const remote: RemoteSnapshot = { + ...emptyRemote(), + watchers: [{ slug: "orphan-watcher" }], + }; + const plan = computeDiff(desired, remote); + const row = plan.rows.find((r) => r.kind === "watcher"); + expect(row?.verb).toBe("drift"); + expect(plan.counts.drift).toBe(1); + }); }); describe("renderSummary", () => { - test("renders zero-row plan", () => { - const desired = buildState([]); - const plan = computeDiff(desired, emptyRemote()); - expect(renderSummary(plan)).toMatchSnapshot(); - }); + test("renders zero-row plan", () => { + const desired = buildState([]); + const plan = computeDiff(desired, emptyRemote()); + expect(renderSummary(plan)).toMatchSnapshot(); + }); }); describe("apply diff — connectors", () => { - const builtinConnectorDef = { - key: "hackernews", - name: "Hacker News", - installed: false, - installable: true, - }; - - function connectorState() { - return buildState([], { - connectors: { - definitions: [ - { - key: "acme", - sourcePath: "/proj/connectors/acme.connector.ts", - sourceCode: "export default class {}", - sourceFile: "connectors/acme.connector.ts", - }, - ], - authProfiles: [ - { - slug: "hn-token", - connector: "hackernews", - kind: "env" as const, - name: "HN token", - credentials: { HN_TOKEN: "$HN_TOKEN" }, - sourceFile: "connectors/hackernews.yaml", - }, - { - slug: "x-account", - connector: "x", - kind: "oauth_account" as const, - sourceFile: "connectors/x.yaml", - }, - ], - connections: [ - { - slug: "hn-frontpage", - connector: "hackernews", - name: "HN front page", - authProfileSlug: "hn-token", - feeds: [{ feedKey: "stories", schedule: "0 * * * *" }], - sourceFile: "connectors/hackernews.yaml", - }, - ], - }, - }); - } - - test("create verbs for new connector def, auth profile, connection, feed", () => { - const plan = computeDiff(connectorState(), { - ...emptyRemote(), - connectorDefinitions: [builtinConnectorDef], - }); - const def = plan.rows.find((r) => r.kind === "connector-definition"); - expect(def?.verb).toBe("create"); - const authEnv = plan.rows.find( - (r) => r.kind === "auth-profile" && r.id === "hn-token", - ); - expect(authEnv?.verb).toBe("create"); - const authOauth = plan.rows.find( - (r) => r.kind === "auth-profile" && r.id === "x-account", - ); - expect(authOauth?.verb).toBe("create"); - expect( - authOauth && "needsAuth" in authOauth ? authOauth.needsAuth : undefined, - ).toBe(true); - const conn = plan.rows.find((r) => r.kind === "connection"); - expect(conn?.verb).toBe("create"); - const feed = plan.rows.find((r) => r.kind === "feed"); - expect(feed?.verb).toBe("create"); - expect(feed?.id).toBe("hn-frontpage/stories"); - }); - - test("noop when connection + feed already match remotely", () => { - const remote: RemoteSnapshot = { - ...emptyRemote(), - connectorDefinitions: [builtinConnectorDef], - authProfiles: [ - { - slug: "hn-token", - display_name: "HN token", - connector_key: "hackernews", - profile_kind: "env", - status: "active", - }, - { - slug: "x-account", - connector_key: "x", - profile_kind: "oauth_account", - status: "active", - }, - ], - connections: [ - { - id: 7, - slug: "hn-frontpage", - connector_key: "hackernews", - display_name: "HN front page", - status: "active", - auth_profile_slug: "hn-token", - app_auth_profile_slug: null, - config: {}, - }, - ], - feedsByConnectionId: new Map([ - [ - 7, - [ - { - id: 11, - connection_id: 7, - feed_key: "stories", - status: "active", - schedule: "0 * * * *", - config: {}, - }, - ], - ], - ]), - }; - const plan = computeDiff(connectorState(), remote); - expect(plan.rows.find((r) => r.kind === "connection")?.verb).toBe("noop"); - expect(plan.rows.find((r) => r.kind === "feed")?.verb).toBe("noop"); - expect( - plan.rows.find((r) => r.kind === "auth-profile" && r.id === "x-account") - ?.verb, - ).toBe("noop"); - }); - - test("update when feed schedule changes; needs-auth when oauth profile inactive", () => { - const remote: RemoteSnapshot = { - ...emptyRemote(), - connectorDefinitions: [builtinConnectorDef], - authProfiles: [ - { - slug: "hn-token", - display_name: "HN token", - connector_key: "hackernews", - profile_kind: "env", - status: "active", - }, - { - slug: "x-account", - connector_key: "x", - profile_kind: "oauth_account", - status: "pending_auth", - }, - ], - connections: [ - { - id: 7, - slug: "hn-frontpage", - connector_key: "hackernews", - display_name: "HN front page", - status: "active", - auth_profile_slug: "hn-token", - app_auth_profile_slug: null, - config: {}, - }, - ], - feedsByConnectionId: new Map([ - [ - 7, - [ - { - id: 11, - connection_id: 7, - feed_key: "stories", - status: "active", - schedule: "0 0 * * *", - config: {}, - }, - ], - ], - ]), - }; - const plan = computeDiff(connectorState(), remote); - const feed = plan.rows.find((r) => r.kind === "feed"); - expect(feed?.verb).toBe("update"); - expect(feed && "changedFields" in feed ? feed.changedFields : []).toEqual([ - "schedule", - ]); - const authOauth = plan.rows.find( - (r) => r.kind === "auth-profile" && r.id === "x-account", - ); - expect( - authOauth && "needsAuth" in authOauth ? authOauth.needsAuth : undefined, - ).toBe(true); - }); - - test("undeclared remote connector becomes an informational note (no uninstall)", () => { - const remote: RemoteSnapshot = { - ...emptyRemote(), - connectorDefinitions: [ - builtinConnectorDef, - { - key: "legacy", - name: "Legacy", - installed: true, - installable: false, - }, - ], - }; - const plan = computeDiff(connectorState(), remote); - expect(plan.notes.some((n) => n.includes('"legacy"'))).toBe(true); - expect( - plan.rows.some( - (r) => r.kind === "connector-definition" && r.id === "legacy", - ), - ).toBe(false); - }); - - test("connectors are skipped when --only is set", () => { - const plan = computeDiff(connectorState(), emptyRemote(), { - only: "agents", - }); - expect(plan.rows.some((r) => r.kind === "connection")).toBe(false); - expect(plan.rows.some((r) => r.kind === "connector-definition")).toBe( - false, - ); - }); - - test("render includes the connectors sections", () => { - const plan = computeDiff(connectorState(), { - ...emptyRemote(), - connectorDefinitions: [builtinConnectorDef], - }); - expect(renderPlan(plan)).toMatchSnapshot(); - }); - - // ── round-2 ────────────────────────────────────────────────────────────── - - test("connection slug bound to a different connector remotely is a hard error", () => { - expect(() => - computeDiff(connectorState(), { - ...emptyRemote(), - connectorDefinitions: [builtinConnectorDef], - connections: [ - { - id: 9, - slug: "hn-frontpage", - connector_key: "rss", - status: "active", - auth_profile_slug: null, - app_auth_profile_slug: null, - config: {}, - }, - ], - }), - ).toThrow(/bound to connector "rss" remotely.*declares "hackernews"/); - }); - - test("auth-profile slug bound to a different kind remotely is a hard error", () => { - expect(() => - computeDiff(connectorState(), { - ...emptyRemote(), - connectorDefinitions: [builtinConnectorDef], - authProfiles: [ - { - slug: "hn-token", - connector_key: "hackernews", - profile_kind: "oauth_app", - status: "active", - }, - ], - }), - ).toThrow(/auth_profile "hn-token" is bound to hackernews\/oauth_app/); - }); - - test("credential rotation re-pushes: env profile shows update (credentials)", () => { - const plan = computeDiff(connectorState(), { - ...emptyRemote(), - connectorDefinitions: [builtinConnectorDef], - authProfiles: [ - { - slug: "hn-token", - display_name: "HN token", - connector_key: "hackernews", - profile_kind: "env", - status: "active", - }, - ], - }); - const row = plan.rows.find( - (r) => r.kind === "auth-profile" && r.id === "hn-token", - ); - expect(row?.verb).toBe("update"); - expect(row && "changedFields" in row ? row.changedFields : []).toContain( - "credentials", - ); - }); - - test("a fully-converged remote state produces no connector create/update (except idempotent connector-def re-push)", () => { - // Build a remote snapshot that exactly mirrors connectorState(): the env - // auth profile has no declared-credential drift suppression, so it would - // re-push (update credentials). The acme connector def is installed, so it - // shows as a (no-op-on-server) "update". Everything else is noop. - const remote: RemoteSnapshot = { - ...emptyRemote(), - connectorDefinitions: [ - { key: "hackernews", installed: false, installable: true }, - { key: "x", installed: false, installable: true }, - { key: "acme", installed: true, installable: false }, - ], - authProfiles: [ - { - slug: "hn-token", - display_name: "HN token", - connector_key: "hackernews", - profile_kind: "env", - status: "active", - }, - { - slug: "x-account", - connector_key: "x", - profile_kind: "oauth_account", - status: "active", - }, - ], - connections: [ - { - id: 7, - slug: "hn-frontpage", - connector_key: "hackernews", - display_name: "HN front page", - status: "active", - auth_profile_slug: "hn-token", - app_auth_profile_slug: null, - config: {}, - }, - ], - feedsByConnectionId: new Map([ - [ - 7, - [ - { - id: 11, - connection_id: 7, - feed_key: "stories", - status: "active", - schedule: "0 * * * *", - config: {}, - }, - ], - ], - ]), - }; - const plan = computeDiff(connectorState(), remote); - // Only "update" rows allowed: the connector-def re-push and the - // env-credential re-push — both idempotent on the server. - const nonIdempotentChurn = plan.rows.filter( - (r) => - (r.verb === "create" || r.verb === "update") && - !(r.kind === "connector-definition") && - !(r.kind === "auth-profile" && r.id === "hn-token"), - ); - expect(nonIdempotentChurn).toEqual([]); - expect(plan.notes).toEqual([]); - }); - - test("connector-definition with an already-installed key renders as update, not create", () => { - const installedAcme = { key: "acme", installed: true, installable: false }; - const plan = computeDiff(connectorState(), { - ...emptyRemote(), - connectorDefinitions: [builtinConnectorDef, installedAcme], - }); - // connectorState()'s acme def has key:"acme"; it is installed remotely. - const row = plan.rows.find( - (r) => r.kind === "connector-definition" && r.id?.startsWith("acme"), - ); - expect(row?.verb).toBe("update"); - }); - - // ── round-4 ────────────────────────────────────────────────────────────── - - test("referenced-but-not-installed bundled connector becomes a connector-definition create row", () => { - const plan = computeDiff(connectorState(), { - ...emptyRemote(), - connectorDefinitions: [ - // hackernews: installable + has a server-side source_uri, not installed - { - key: "hackernews", - installed: false, - installable: true, - source_uri: "file:///app/connectors/hackernews.ts", - }, - // x: same - { - key: "x", - installed: false, - installable: true, - source_uri: "file:///app/connectors/x.ts", - }, - ], - }); - const hn = plan.rows.find( - (r) => r.kind === "connector-definition" && r.id === "hackernews", - ); - expect(hn?.verb).toBe("create"); - const x = plan.rows.find( - (r) => r.kind === "connector-definition" && r.id === "x", - ); - expect(x?.verb).toBe("create"); - // acme is locally declared (sourcePath) — it still gets its own row. - expect( - plan.rows.some( - (r) => r.kind === "connector-definition" && r.id?.startsWith("acme"), - ), - ).toBe(true); - }); - - test("a locally-supplied connector key is NOT also a bundled-install row (no double mutation)", () => { - // Pretend "acme" is *also* in the bundled catalog with a source_uri; the - // local .connector.ts should win — no bundled row for "acme". - const state = connectorState(); - // Make a connection reference "acme" so it's in referencedConnectorKeys. - state.connectors.connections.push({ - slug: "acme-conn", - connector: "acme", - feeds: [], - sourceFile: "connectors/acme.yaml", - }); - const plan = computeDiff(state, { - ...emptyRemote(), - connectorDefinitions: [ - { - key: "acme", - installed: false, - installable: true, - source_uri: "file:///app/connectors/acme.ts", - }, - ], - }); - const acmeRows = plan.rows.filter( - (r) => r.kind === "connector-definition" && r.id?.startsWith("acme"), - ); - // Exactly one row — the locally-declared def — never a bundled duplicate. - expect(acmeRows).toHaveLength(1); - }); + const builtinConnectorDef = { + key: "hackernews", + name: "Hacker News", + installed: false, + installable: true, + }; + + function connectorState() { + return buildState([], { + connectors: { + definitions: [ + { + key: "acme", + sourcePath: "/proj/connectors/acme.connector.ts", + sourceCode: "export default class {}", + sourceFile: "connectors/acme.connector.ts", + }, + ], + authProfiles: [ + { + slug: "hn-token", + connector: "hackernews", + kind: "env" as const, + name: "HN token", + credentials: { HN_TOKEN: "$HN_TOKEN" }, + sourceFile: "connectors/hackernews.yaml", + }, + { + slug: "x-account", + connector: "x", + kind: "oauth_account" as const, + sourceFile: "connectors/x.yaml", + }, + ], + connections: [ + { + slug: "hn-frontpage", + connector: "hackernews", + name: "HN front page", + authProfileSlug: "hn-token", + feeds: [{ feedKey: "stories", schedule: "0 * * * *" }], + sourceFile: "connectors/hackernews.yaml", + }, + ], + }, + }); + } + + test("create verbs for new connector def, auth profile, connection, feed", () => { + const plan = computeDiff(connectorState(), { + ...emptyRemote(), + connectorDefinitions: [builtinConnectorDef], + }); + const def = plan.rows.find((r) => r.kind === "connector-definition"); + expect(def?.verb).toBe("create"); + const authEnv = plan.rows.find( + (r) => r.kind === "auth-profile" && r.id === "hn-token" + ); + expect(authEnv?.verb).toBe("create"); + const authOauth = plan.rows.find( + (r) => r.kind === "auth-profile" && r.id === "x-account" + ); + expect(authOauth?.verb).toBe("create"); + expect( + authOauth && "needsAuth" in authOauth ? authOauth.needsAuth : undefined + ).toBe(true); + const conn = plan.rows.find((r) => r.kind === "connection"); + expect(conn?.verb).toBe("create"); + const feed = plan.rows.find((r) => r.kind === "feed"); + expect(feed?.verb).toBe("create"); + expect(feed?.id).toBe("hn-frontpage/stories"); + }); + + test("noop when connection + feed already match remotely", () => { + const remote: RemoteSnapshot = { + ...emptyRemote(), + connectorDefinitions: [builtinConnectorDef], + authProfiles: [ + { + slug: "hn-token", + display_name: "HN token", + connector_key: "hackernews", + profile_kind: "env", + status: "active", + }, + { + slug: "x-account", + connector_key: "x", + profile_kind: "oauth_account", + status: "active", + }, + ], + connections: [ + { + id: 7, + slug: "hn-frontpage", + connector_key: "hackernews", + display_name: "HN front page", + status: "active", + auth_profile_slug: "hn-token", + app_auth_profile_slug: null, + config: {}, + }, + ], + feedsByConnectionId: new Map([ + [ + 7, + [ + { + id: 11, + connection_id: 7, + feed_key: "stories", + status: "active", + schedule: "0 * * * *", + config: {}, + }, + ], + ], + ]), + }; + const plan = computeDiff(connectorState(), remote); + expect(plan.rows.find((r) => r.kind === "connection")?.verb).toBe("noop"); + expect(plan.rows.find((r) => r.kind === "feed")?.verb).toBe("noop"); + expect( + plan.rows.find((r) => r.kind === "auth-profile" && r.id === "x-account") + ?.verb + ).toBe("noop"); + }); + + test("update when feed schedule changes; needs-auth when oauth profile inactive", () => { + const remote: RemoteSnapshot = { + ...emptyRemote(), + connectorDefinitions: [builtinConnectorDef], + authProfiles: [ + { + slug: "hn-token", + display_name: "HN token", + connector_key: "hackernews", + profile_kind: "env", + status: "active", + }, + { + slug: "x-account", + connector_key: "x", + profile_kind: "oauth_account", + status: "pending_auth", + }, + ], + connections: [ + { + id: 7, + slug: "hn-frontpage", + connector_key: "hackernews", + display_name: "HN front page", + status: "active", + auth_profile_slug: "hn-token", + app_auth_profile_slug: null, + config: {}, + }, + ], + feedsByConnectionId: new Map([ + [ + 7, + [ + { + id: 11, + connection_id: 7, + feed_key: "stories", + status: "active", + schedule: "0 0 * * *", + config: {}, + }, + ], + ], + ]), + }; + const plan = computeDiff(connectorState(), remote); + const feed = plan.rows.find((r) => r.kind === "feed"); + expect(feed?.verb).toBe("update"); + expect(feed && "changedFields" in feed ? feed.changedFields : []).toEqual([ + "schedule", + ]); + const authOauth = plan.rows.find( + (r) => r.kind === "auth-profile" && r.id === "x-account" + ); + expect( + authOauth && "needsAuth" in authOauth ? authOauth.needsAuth : undefined + ).toBe(true); + }); + + test("undeclared remote connector becomes an informational note (no uninstall)", () => { + const remote: RemoteSnapshot = { + ...emptyRemote(), + connectorDefinitions: [ + builtinConnectorDef, + { + key: "legacy", + name: "Legacy", + installed: true, + installable: false, + }, + ], + }; + const plan = computeDiff(connectorState(), remote); + expect(plan.notes.some((n) => n.includes('"legacy"'))).toBe(true); + expect( + plan.rows.some( + (r) => r.kind === "connector-definition" && r.id === "legacy" + ) + ).toBe(false); + }); + + test("connectors are skipped when --only is set", () => { + const plan = computeDiff(connectorState(), emptyRemote(), { + only: "agents", + }); + expect(plan.rows.some((r) => r.kind === "connection")).toBe(false); + expect(plan.rows.some((r) => r.kind === "connector-definition")).toBe( + false + ); + }); + + test("render includes the connectors sections", () => { + const plan = computeDiff(connectorState(), { + ...emptyRemote(), + connectorDefinitions: [builtinConnectorDef], + }); + expect(renderPlan(plan)).toMatchSnapshot(); + }); + + // ── round-2 ────────────────────────────────────────────────────────────── + + test("connection slug bound to a different connector remotely is a hard error", () => { + expect(() => + computeDiff(connectorState(), { + ...emptyRemote(), + connectorDefinitions: [builtinConnectorDef], + connections: [ + { + id: 9, + slug: "hn-frontpage", + connector_key: "rss", + status: "active", + auth_profile_slug: null, + app_auth_profile_slug: null, + config: {}, + }, + ], + }) + ).toThrow(/bound to connector "rss" remotely.*declares "hackernews"/); + }); + + test("auth-profile slug bound to a different kind remotely is a hard error", () => { + expect(() => + computeDiff(connectorState(), { + ...emptyRemote(), + connectorDefinitions: [builtinConnectorDef], + authProfiles: [ + { + slug: "hn-token", + connector_key: "hackernews", + profile_kind: "oauth_app", + status: "active", + }, + ], + }) + ).toThrow(/auth_profile "hn-token" is bound to hackernews\/oauth_app/); + }); + + test("credential rotation re-pushes: env profile shows update (credentials)", () => { + const plan = computeDiff(connectorState(), { + ...emptyRemote(), + connectorDefinitions: [builtinConnectorDef], + authProfiles: [ + { + slug: "hn-token", + display_name: "HN token", + connector_key: "hackernews", + profile_kind: "env", + status: "active", + }, + ], + }); + const row = plan.rows.find( + (r) => r.kind === "auth-profile" && r.id === "hn-token" + ); + expect(row?.verb).toBe("update"); + expect(row && "changedFields" in row ? row.changedFields : []).toContain( + "credentials" + ); + }); + + test("a fully-converged remote state produces no connector create/update (except idempotent connector-def re-push)", () => { + // Build a remote snapshot that exactly mirrors connectorState(): the env + // auth profile has no declared-credential drift suppression, so it would + // re-push (update credentials). The acme connector def is installed, so it + // shows as a (no-op-on-server) "update". Everything else is noop. + const remote: RemoteSnapshot = { + ...emptyRemote(), + connectorDefinitions: [ + { key: "hackernews", installed: false, installable: true }, + { key: "x", installed: false, installable: true }, + { key: "acme", installed: true, installable: false }, + ], + authProfiles: [ + { + slug: "hn-token", + display_name: "HN token", + connector_key: "hackernews", + profile_kind: "env", + status: "active", + }, + { + slug: "x-account", + connector_key: "x", + profile_kind: "oauth_account", + status: "active", + }, + ], + connections: [ + { + id: 7, + slug: "hn-frontpage", + connector_key: "hackernews", + display_name: "HN front page", + status: "active", + auth_profile_slug: "hn-token", + app_auth_profile_slug: null, + config: {}, + }, + ], + feedsByConnectionId: new Map([ + [ + 7, + [ + { + id: 11, + connection_id: 7, + feed_key: "stories", + status: "active", + schedule: "0 * * * *", + config: {}, + }, + ], + ], + ]), + }; + const plan = computeDiff(connectorState(), remote); + // Only "update" rows allowed: the connector-def re-push and the + // env-credential re-push — both idempotent on the server. + const nonIdempotentChurn = plan.rows.filter( + (r) => + (r.verb === "create" || r.verb === "update") && + !(r.kind === "connector-definition") && + !(r.kind === "auth-profile" && r.id === "hn-token") + ); + expect(nonIdempotentChurn).toEqual([]); + expect(plan.notes).toEqual([]); + }); + + test("connector-definition with an already-installed key renders as update, not create", () => { + const installedAcme = { key: "acme", installed: true, installable: false }; + const plan = computeDiff(connectorState(), { + ...emptyRemote(), + connectorDefinitions: [builtinConnectorDef, installedAcme], + }); + // connectorState()'s acme def has key:"acme"; it is installed remotely. + const row = plan.rows.find( + (r) => r.kind === "connector-definition" && r.id?.startsWith("acme") + ); + expect(row?.verb).toBe("update"); + }); + + // ── round-4 ────────────────────────────────────────────────────────────── + + test("referenced-but-not-installed bundled connector becomes a connector-definition create row", () => { + const plan = computeDiff(connectorState(), { + ...emptyRemote(), + connectorDefinitions: [ + // hackernews: installable + has a server-side source_uri, not installed + { + key: "hackernews", + installed: false, + installable: true, + source_uri: "file:///app/connectors/hackernews.ts", + }, + // x: same + { + key: "x", + installed: false, + installable: true, + source_uri: "file:///app/connectors/x.ts", + }, + ], + }); + const hn = plan.rows.find( + (r) => r.kind === "connector-definition" && r.id === "hackernews" + ); + expect(hn?.verb).toBe("create"); + const x = plan.rows.find( + (r) => r.kind === "connector-definition" && r.id === "x" + ); + expect(x?.verb).toBe("create"); + // acme is locally declared (sourcePath) — it still gets its own row. + expect( + plan.rows.some( + (r) => r.kind === "connector-definition" && r.id?.startsWith("acme") + ) + ).toBe(true); + }); + + test("a locally-supplied connector key is NOT also a bundled-install row (no double mutation)", () => { + // Pretend "acme" is *also* in the bundled catalog with a source_uri; the + // local .connector.ts should win — no bundled row for "acme". + const state = connectorState(); + // Make a connection reference "acme" so it's in referencedConnectorKeys. + state.connectors.connections.push({ + slug: "acme-conn", + connector: "acme", + feeds: [], + sourceFile: "connectors/acme.yaml", + }); + const plan = computeDiff(state, { + ...emptyRemote(), + connectorDefinitions: [ + { + key: "acme", + installed: false, + installable: true, + source_uri: "file:///app/connectors/acme.ts", + }, + ], + }); + const acmeRows = plan.rows.filter( + (r) => r.kind === "connector-definition" && r.id?.startsWith("acme") + ); + // Exactly one row — the locally-declared def — never a bundled duplicate. + expect(acmeRows).toHaveLength(1); + }); }); diff --git a/packages/server/src/gateway/__tests__/connections-platform-isolation.test.ts b/packages/server/src/gateway/__tests__/connections-platform-isolation.test.ts index f1bbe9158..3d573a895 100644 --- a/packages/server/src/gateway/__tests__/connections-platform-isolation.test.ts +++ b/packages/server/src/gateway/__tests__/connections-platform-isolation.test.ts @@ -47,6 +47,10 @@ function makeConnection( ): PlatformConnection { return { id: connectionId, + // Required by the bridge's per-tenant `pending_interactions` write — + // a connection without an org would be dropped before `resolveThread` + // (where `instanceChat.channel()` is asserted). + organizationId: "test-org", platform, config: { platform } as PlatformAdapterConfig, settings: {}, diff --git a/packages/server/src/gateway/__tests__/pending-interaction-store.test.ts b/packages/server/src/gateway/__tests__/pending-interaction-store.test.ts new file mode 100644 index 000000000..6a01550a5 --- /dev/null +++ b/packages/server/src/gateway/__tests__/pending-interaction-store.test.ts @@ -0,0 +1,158 @@ +/** + * Tier B: claim atomicity, scoping, and sweep behavior for the PG-backed + * pending-interaction store that backs the chat interaction bridge. + * + * The store backs the bridge's `Map` + * with `public.pending_interactions`. Three properties matter: + * 1. `claimPendingQuestion` is single-winner — two concurrent claims + * with matching scope return the payload exactly once. + * 2. The scope tuple `(id, organization_id, connection_id, + * expected_user_id)` is enforced inside the SQL claim — mismatched + * org / connection / user clicks return null and DO NOT consume the + * row. These are the red→green checks that gate findings #1 (cross- + * tenant claim hole) and #3 (claim-then-auth race). + * 3. `sweepStalePendingInteractions` only deletes rows older than the + * given max-age cutoff and returns the deleted ids so the bridge + * can sync its local SentMessage cache. + */ +import { beforeAll, beforeEach, describe, expect, test } from "bun:test"; +import { + ensurePgliteForGatewayTests, + resetTestDatabase, +} from "./helpers/db-setup.js"; +import { getDb } from "../../db/client.js"; +import { + claimPendingQuestion, + storePendingQuestion, + sweepStalePendingInteractions, +} from "../connections/pending-interaction-store.js"; +import type { PostedQuestion } from "../interactions.js"; + +const ORG_A = "org-a"; +const ORG_B = "org-b"; +const CONN_A = "conn-a"; +const CONN_B = "conn-b"; +const USER_A = "U_A"; +const USER_B = "U_B"; + +async function seedOrg(id: string): Promise { + const sql = getDb(); + await sql` + INSERT INTO organization (id, name, slug) + VALUES (${id}, ${id}, ${id}) + ON CONFLICT (id) DO NOTHING + `; +} + +function buildQuestion(id: string, userId = USER_A): PostedQuestion { + return { + id, + teamId: undefined, + channelId: "C1", + conversationId: "C1", + userId, + platform: "slack", + question: "go?", + options: ["yes", "no"], + } as PostedQuestion; +} + +describe("pending-interaction-store", () => { + beforeAll(async () => { + await ensurePgliteForGatewayTests(); + }); + beforeEach(async () => { + await resetTestDatabase(); + await seedOrg(ORG_A); + await seedOrg(ORG_B); + }); + + test("matching-scope claim returns the stored payload exactly once", async () => { + const q = buildQuestion("q-1"); + await storePendingQuestion(q.id, ORG_A, CONN_A, USER_A, { question: q }); + + const first = await claimPendingQuestion(q.id, ORG_A, CONN_A, USER_A); + expect(first?.question.id).toBe("q-1"); + + const second = await claimPendingQuestion(q.id, ORG_A, CONN_A, USER_A); + expect(second).toBeNull(); + }); + + // Finding #1 — cross-tenant claim hole. + // + // Red on the previous commit: `claimPendingQuestion(id)` was keyed by + // `id` only, so org B could consume org A's row by replaying the id. + // Green here: scope by `organization_id` blocks the claim, AND the + // row stays untouched so the rightful org can still claim it. + test("cross-tenant claim is rejected and does NOT consume the row", async () => { + const q = buildQuestion("q-cross-tenant"); + await storePendingQuestion(q.id, ORG_A, CONN_A, USER_A, { question: q }); + + const wrongOrg = await claimPendingQuestion(q.id, ORG_B, CONN_A, USER_A); + expect(wrongOrg).toBeNull(); + + const rightful = await claimPendingQuestion(q.id, ORG_A, CONN_A, USER_A); + expect(rightful?.question.id).toBe("q-cross-tenant"); + }); + + test("cross-connection claim is rejected and does NOT consume the row", async () => { + const q = buildQuestion("q-cross-conn"); + await storePendingQuestion(q.id, ORG_A, CONN_A, USER_A, { question: q }); + + const wrongConn = await claimPendingQuestion(q.id, ORG_A, CONN_B, USER_A); + expect(wrongConn).toBeNull(); + + const rightful = await claimPendingQuestion(q.id, ORG_A, CONN_A, USER_A); + expect(rightful?.question.id).toBe("q-cross-conn"); + }); + + // Finding #3 — claim-then-auth race. + // + // Red on the previous commit: the handler did `claimQuestion(id)` + // first, then compared `author.userId` against the row's userId, then + // async-restashed on mismatch. A crash between claim and restash + // permanently consumed the row until the 24h sweep. + // Green here: `expected_user_id` is part of the SQL claim, so a wrong- + // user click returns null without ever setting `claimed_at`. No + // restash is needed because no claim happens. + test("wrong-user click is rejected and does NOT consume the row", async () => { + const q = buildQuestion("q-wrong-user", USER_A); + await storePendingQuestion(q.id, ORG_A, CONN_A, USER_A, { question: q }); + + const wrongUser = await claimPendingQuestion(q.id, ORG_A, CONN_A, USER_B); + expect(wrongUser).toBeNull(); + + const rightful = await claimPendingQuestion(q.id, ORG_A, CONN_A, USER_A); + expect(rightful?.question.id).toBe("q-wrong-user"); + }); + + test("sweep deletes only rows older than the cutoff and returns their ids", async () => { + const sql = getDb(); + const fresh = buildQuestion("q-fresh"); + const stale = buildQuestion("q-stale"); + await storePendingQuestion(fresh.id, ORG_A, CONN_A, USER_A, { + question: fresh, + }); + await storePendingQuestion(stale.id, ORG_A, CONN_A, USER_A, { + question: stale, + }); + + // Backdate one row past the 24h cutoff. + await sql` + UPDATE pending_interactions + SET created_at = now() - interval '48 hours' + WHERE id = ${stale.id} + `; + + const deletedIds = await sweepStalePendingInteractions(); + expect(deletedIds).toEqual(["q-stale"]); + + // Fresh row is still claimable; stale row is gone. + expect( + (await claimPendingQuestion(fresh.id, ORG_A, CONN_A, USER_A))?.question.id + ).toBe("q-fresh"); + expect( + await claimPendingQuestion(stale.id, ORG_A, CONN_A, USER_A) + ).toBeNull(); + }); +}); diff --git a/packages/server/src/gateway/connections/interaction-bridge.ts b/packages/server/src/gateway/connections/interaction-bridge.ts index c2b9f1514..d79ee130a 100644 --- a/packages/server/src/gateway/connections/interaction-bridge.ts +++ b/packages/server/src/gateway/connections/interaction-bridge.ts @@ -12,6 +12,11 @@ import type { } from "../interactions.js"; import type { GrantStore } from "../permissions/grant-store.js"; import type { ChatInstanceManager } from "./chat-instance-manager.js"; +import { + claimPendingQuestion, + storePendingQuestion, + sweepStalePendingInteractions, +} from "./pending-interaction-store.js"; import type { PlatformConnection } from "./types.js"; const logger = createLogger("chat-interaction-bridge"); @@ -190,43 +195,93 @@ export function registerInteractionBridge( return sent; } - // Tracks posted question cards + their original routing context so a click - // can (a) strip the buttons via SentMessage.edit and (b) feed the clicked - // value back through the inbound-enqueue pipeline. - const pendingQuestions = new Map(); - const pendingQuestionTimers = new Map(); - function trackQuestion(entry: PendingQuestionEntry): void { - pendingQuestions.set(entry.question.id, entry); - const timer = setTimeout(() => { - pendingQuestions.delete(entry.question.id); - pendingQuestionTimers.delete(entry.question.id); - }, 300_000); - pendingQuestionTimers.set(entry.question.id, timer); + // Pending questions are persisted in `public.pending_interactions` so a + // click landing on a different pod can still claim the entry. The local + // `pendingSentMessages` map holds the non-serializable platform + // `SentMessage` (used to strip card buttons on click) — losing it + // cross-pod is best-effort UX, not correctness. + // + // Each entry remembers `registeredAt` so the periodic sweep can evict + // stale handles that match the 24h DB-row TTL. Without this the Map would + // grow unbounded for questions that are never clicked. The sweep also + // removes the local handle for any row the DB sweeper actually deleted, + // so the two stay in sync. + const PENDING_SENT_TTL_MS = 24 * 60 * 60 * 1000; + const PENDING_SENT_SWEEP_INTERVAL_MS = 60 * 60 * 1000; + interface CachedSent { + sent: SentMessage; + registeredAt: number; } - function claimQuestion(questionId: string): PendingQuestionEntry | undefined { - const entry = pendingQuestions.get(questionId); - pendingQuestions.delete(questionId); - const timer = pendingQuestionTimers.get(questionId); - if (timer) { - clearTimeout(timer); - pendingQuestionTimers.delete(questionId); + const pendingSentMessages = new Map(); + const pendingSentSweepTimer = setInterval(() => { + sweepPendingSent().catch((error) => { + logger.warn( + { connectionId, error: String(error) }, + "pendingSentMessages sweep failed" + ); + }); + }, PENDING_SENT_SWEEP_INTERVAL_MS); + pendingSentSweepTimer.unref?.(); + async function sweepPendingSent(): Promise { + const ttlCutoff = Date.now() - PENDING_SENT_TTL_MS; + for (const [id, entry] of pendingSentMessages) { + if (entry.registeredAt <= ttlCutoff) { + pendingSentMessages.delete(id); + } + } + // Also drop local handles for any DB rows the scheduled sweeper just + // deleted — keeps the local cache from outliving its DB row. + let deletedIds: string[] = []; + try { + deletedIds = await sweepStalePendingInteractions(); + } catch (error) { + // The store logs its own DB errors; treat as best-effort here. + logger.debug( + { connectionId, error: String(error) }, + "sweepStalePendingInteractions failed during local sweep" + ); + } + for (const id of deletedIds) { + pendingSentMessages.delete(id); } - return entry; } /** - * Put a previously-claimed entry back. Used when a click is rejected - * (e.g. wrong user) so the rightful owner can still answer later. + * Persist a pending question row, then cache its SentMessage handle so a + * click on this pod can edit the card. The persist happens first — see + * `onQuestionCreated` for the post-then-persist policy that wraps the + * card-post; this function is invoked only after the row is durable. */ - function restashQuestion( + function rememberSentMessage( questionId: string, - entry: PendingQuestionEntry + sent: SentMessage | undefined ): void { - if (pendingQuestions.has(questionId)) return; - trackQuestion(entry); - if (entry.question.id !== questionId) { - pendingQuestions.delete(entry.question.id); - pendingQuestions.set(questionId, entry); - } + if (!sent) return; + pendingSentMessages.set(questionId, { + sent, + registeredAt: Date.now(), + }); + } + async function claimQuestion( + questionId: string, + organizationId: string, + expectedUserId: string + ): Promise { + const stored = await claimPendingQuestion( + questionId, + organizationId, + connectionId, + expectedUserId + ).catch((error) => { + logger.error( + { connectionId, questionId, error: String(error) }, + "Failed to claim pending question" + ); + return null; + }); + if (!stored) return undefined; + const cached = pendingSentMessages.get(questionId); + pendingSentMessages.delete(questionId); + return { question: stored.question, sent: cached?.sent }; } const onQuestionCreated = async (event: PostedQuestion) => { try { @@ -234,6 +289,25 @@ export function registerInteractionBridge( if (handledEvents.has(event.id)) return; markHandled(event.id); + // Cross-tenant scoping: every pending row must carry the bridge's + // org. Without a known org we can't safely persist or claim, so + // drop the event rather than write an un-scoped row. + const organizationId = connection.organizationId; + if (!organizationId) { + logger.warn( + { connectionId, questionId: event.id }, + "Skipping question:created — connection has no organizationId" + ); + return; + } + if (!event.userId) { + logger.warn( + { connectionId, questionId: event.id }, + "Skipping question:created — event has no userId" + ); + return; + } + const thread = await resolveThread( manager, connectionId, @@ -242,6 +316,26 @@ export function registerInteractionBridge( ); if (!thread) return; + // Persist the pending row BEFORE posting the card. If the persist + // fails we never show buttons that would no-op on click. If the row + // is written but the post fails, we delete it on the way out so a + // stale row doesn't sit waiting for a click that will never arrive. + try { + await storePendingQuestion( + event.id, + organizationId, + connectionId, + event.userId, + { question: event } + ); + } catch (error) { + logger.error( + { connectionId, questionId: event.id, error: String(error) }, + "Failed to persist pending question — not posting card" + ); + return; + } + const { Card, CardText, Actions, Button } = await import("chat"); const buttons = event.options.map((option, i) => Button({ @@ -260,7 +354,27 @@ export function registerInteractionBridge( connectionId, "question interaction" ); - trackQuestion({ question: event, sent: sent ?? undefined }); + if (!sent) { + // Post failed entirely. The row exists but no card was rendered, + // so a click can never come — drop the row to keep the table + // clean. The DB sweep would catch it eventually; doing it now is + // cheaper and avoids a 24h-stale row. + try { + await claimPendingQuestion( + event.id, + organizationId, + connectionId, + event.userId + ); + } catch (error) { + logger.debug( + { connectionId, questionId: event.id, error: String(error) }, + "Failed to drop pending row after post failure" + ); + } + return; + } + rememberSentMessage(event.id, sent); } catch (error) { logger.error( { connectionId, error: String(error) }, @@ -428,13 +542,40 @@ export function registerInteractionBridge( claimApprovalCard, async (questionId, value, thread, author) => { // Fast path — Slack's block_actions webhook requires a <3s response. - // Claim synchronously (Map.delete), then fire-and-forget the slow - // platform API calls (post receipt, edit card, enqueue worker turn). - const entry = claimQuestion(questionId); - if (!entry) { + // The claim is a single `UPDATE … RETURNING` on a PK and stays well + // under the budget; the slow platform API calls (post receipt, edit + // card, enqueue worker turn) still fire-and-forget below. + // + // Authorisation lives INSIDE the SQL claim: the row only matches when + // `(organization_id, connection_id, expected_user_id)` line up with + // the clicker's context. Wrong-user / cross-connection / cross-tenant + // clicks return null without consuming the row — no claim-then-auth + // race, no restash needed. + const organizationId = connection.organizationId; + if (!organizationId) { + logger.warn( + { connectionId, questionId }, + "Question click on connection with no organizationId — ignoring" + ); + return; + } + if (!author?.userId) { logger.debug( { connectionId, questionId }, - "Question click with no pending entry — ignoring" + "Question click without author.userId — ignoring" + ); + return; + } + + const entry = await claimQuestion( + questionId, + organizationId, + author.userId + ); + if (!entry) { + logger.debug( + { connectionId, questionId, clickerUserId: author.userId }, + "Question click did not match any pending row — ignoring" ); return; } @@ -449,28 +590,6 @@ export function registerInteractionBridge( } const { question } = entry; - - // Only the user who was originally asked may answer. Without this, - // anyone in a Slack/Telegram channel could click another user's - // approval/question buttons and silently impersonate them. Re-stash - // the entry so the rightful owner can still click later. - if ( - author?.userId && - question.userId && - author.userId !== question.userId - ) { - logger.warn( - { - connectionId, - questionId, - clickerUserId: author.userId, - originalUserId: question.userId, - }, - "Question click ignored: clicker is not the original requester" - ); - restashQuestion(questionId, entry); - return; - } const receiptText = value ? `*You submitted:* ${value}` : "*You submitted a response.*"; @@ -548,11 +667,8 @@ export function registerInteractionBridge( } pendingApprovalTimers.clear(); pendingApprovalCards.clear(); - for (const timer of pendingQuestionTimers.values()) { - clearTimeout(timer); - } - pendingQuestionTimers.clear(); - pendingQuestions.clear(); + clearInterval(pendingSentSweepTimer); + pendingSentMessages.clear(); logger.info({ connectionId, platform }, "Interaction bridge unregistered"); }; } diff --git a/packages/server/src/gateway/connections/pending-interaction-store.ts b/packages/server/src/gateway/connections/pending-interaction-store.ts new file mode 100644 index 000000000..f77057c00 --- /dev/null +++ b/packages/server/src/gateway/connections/pending-interaction-store.ts @@ -0,0 +1,114 @@ +/** + * Postgres-backed store for chat-interaction-bridge pending questions. + * + * Replaces the in-process `Map` so a + * button click that lands on pod B can claim a question registered on + * pod A. Backed by `public.pending_interactions`. + * + * `claimPendingQuestion` is a single atomic `UPDATE … RETURNING` scoped + * by `(id, organization_id, connection_id, expected_user_id)`: + * - cross-tenant: a leaked/forged id in another org cannot match. + * - cross-connection: a click on connection X cannot consume a row + * registered for connection Y in the same org. + * - wrong-user: a click from someone other than the original requester + * never sets `claimed_at`, so process death after the SQL check leaves + * the row claimable by the rightful owner — no restash needed. + * + * Only the serializable parts of `PendingQuestionEntry` (the + * `PostedQuestion`) live here. The non-serializable platform `SentMessage` + * handle stays in a small per-pod cache inside the bridge — losing it + * only degrades the card-edit-on-click UX (the answer routes correctly + * either way). + */ + +import { getDb } from "../../db/client.js"; +import type { PostedQuestion } from "../interactions.js"; + +export interface StoredPendingQuestion { + question: PostedQuestion; +} + +export async function storePendingQuestion( + questionId: string, + organizationId: string, + connectionId: string, + expectedUserId: string, + entry: StoredPendingQuestion, +): Promise { + const sql = getDb(); + await sql` + INSERT INTO pending_interactions ( + id, + organization_id, + connection_id, + expected_user_id, + entry_payload + ) + VALUES ( + ${questionId}, + ${organizationId}, + ${connectionId}, + ${expectedUserId}, + ${sql.json(entry as object)} + ) + ON CONFLICT (id) DO UPDATE SET + organization_id = EXCLUDED.organization_id, + connection_id = EXCLUDED.connection_id, + expected_user_id = EXCLUDED.expected_user_id, + entry_payload = EXCLUDED.entry_payload, + created_at = now(), + claimed_at = NULL + `; +} + +/** + * Atomically mark a pending question as claimed and return its payload. + * + * Scoped by `(id, organization_id, connection_id, expected_user_id)` — a + * click that doesn't match all four leaves the row untouched and returns + * null. This fixes three classes of bug that a key-by-id-only claim + * permitted: cross-tenant claim hijacking, cross-connection takeover, and + * the claim-then-auth race where a wrong-user click would consume the + * row and rely on an async restash to put it back. + */ +export async function claimPendingQuestion( + questionId: string, + organizationId: string, + connectionId: string, + expectedUserId: string, +): Promise { + const sql = getDb(); + const rows = await sql` + UPDATE pending_interactions + SET claimed_at = now() + WHERE id = ${questionId} + AND organization_id = ${organizationId} + AND connection_id = ${connectionId} + AND expected_user_id = ${expectedUserId} + AND claimed_at IS NULL + RETURNING entry_payload + `; + if (rows.length === 0) return null; + return ( + (rows[0] as { entry_payload: StoredPendingQuestion }).entry_payload ?? null + ); +} + +/** + * Delete pending_interactions rows older than `maxAgeMs` and return their + * ids. The bridge calls this from the scheduled sweep so it can also evict + * the corresponding per-pod `SentMessage` cache entries — otherwise that + * Map would grow unbounded for questions that are never clicked. + */ +export async function sweepStalePendingInteractions( + maxAgeMs = 24 * 60 * 60 * 1000, +): Promise { + const sql = getDb(); + const cutoff = new Date(Date.now() - maxAgeMs); + const rows = await sql<{ id: string }>` + DELETE FROM pending_interactions + WHERE created_at < ${cutoff} + RETURNING id + `; + return rows.map((r) => r.id); +} diff --git a/packages/server/src/gateway/services/core-services.ts b/packages/server/src/gateway/services/core-services.ts index a0c4f627c..2bf031ec2 100644 --- a/packages/server/src/gateway/services/core-services.ts +++ b/packages/server/src/gateway/services/core-services.ts @@ -29,6 +29,7 @@ import { import { sweepExpiredRateLimits } from "../utils/rate-limiter.js"; import { sweepExpiredGrants } from "../permissions/grant-store.js"; import { sweepCompletedRuns } from "../infrastructure/queue/runs-queue.js"; +import { sweepStalePendingInteractions } from "../connections/pending-interaction-store.js"; import { ProviderCatalogService } from "../auth/provider-catalog.js"; import { AgentSettingsStore } from "../auth/settings/agent-settings-store.js"; import { AuthProfilesManager } from "../auth/settings/auth-profiles-manager.js"; @@ -266,15 +267,21 @@ export class CoreServices { * make this a hygiene task — running ~5 minutes apart is plenty. */ async sweepEphemeralTables(): Promise { try { - const [oauthStates, rate, grants, completedRuns] = await Promise.all([ - sweepExpiredOAuthStates(), - sweepExpiredRateLimits(), - sweepExpiredGrants(), - sweepCompletedRuns(), - ]); - if (oauthStates + rate + grants + completedRuns > 0) { + const [oauthStates, rate, grants, completedRuns, pendingIds] = + await Promise.all([ + sweepExpiredOAuthStates(), + sweepExpiredRateLimits(), + sweepExpiredGrants(), + sweepCompletedRuns(), + sweepStalePendingInteractions(), + ]); + const pendingInteractions = pendingIds.length; + if ( + oauthStates + rate + grants + completedRuns + pendingInteractions > + 0 + ) { logger.debug( - { oauthStates, rate, grants, completedRuns }, + { oauthStates, rate, grants, completedRuns, pendingInteractions }, "Ephemeral table sweeper deleted expired rows" ); }