From 0f599162b33630c1af69bf9e78935226e4cdece7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Burak=20Emre=20Kabakc=C4=B1?= Date: Mon, 18 May 2026 01:01:45 +0100 Subject: [PATCH 1/4] refactor(server): move pending interactions to Postgres MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replaces the in-process `Map` in `interaction-bridge.ts` with a `public.pending_interactions` table so a button click landing on pod B can claim a question registered on pod A. `claimPendingQuestion` is an atomic `UPDATE … SET claimed_at = now() WHERE id = $1 AND claimed_at IS NULL RETURNING entry_payload` — Slack webhook retries and racing clicks serialize on the row, the winner gets the payload, losers see null and no-op. The non-serializable platform `SentMessage` handle (used to edit the original card after a click) stays in a small per-pod cache. Losing it cross-pod only degrades card-edit UX; the answer still routes correctly. A 24h stale-row sweep piggybacks on the existing scheduled `sweep-ephemeral-tables` task via `sweepEphemeralTables` in `core-services.ts`. Migration is additive (one new table, two indexes). Single-pod behavior is unchanged. SSE fan-out (sse-manager.ts) is still per-pod and left for a follow-up PR. --- .../20260518000000_pending_interactions.sql | 34 ++++++ db/schema.sql | 50 +++++++- .../pending-interaction-store.test.ts | 90 +++++++++++++++ .../gateway/connections/interaction-bridge.ts | 105 +++++++++++------ .../connections/pending-interaction-store.ts | 107 ++++++++++++++++++ .../src/gateway/services/core-services.ts | 22 ++-- 6 files changed, 362 insertions(+), 46 deletions(-) create mode 100644 db/migrations/20260518000000_pending_interactions.sql create mode 100644 packages/server/src/gateway/__tests__/pending-interaction-store.test.ts create mode 100644 packages/server/src/gateway/connections/pending-interaction-store.ts diff --git a/db/migrations/20260518000000_pending_interactions.sql b/db/migrations/20260518000000_pending_interactions.sql new file mode 100644 index 000000000..d0ccf3537 --- /dev/null +++ b/db/migrations/20260518000000_pending_interactions.sql @@ -0,0 +1,34 @@ +-- migrate:up + +-- Per-question state for the chat-interaction bridge — moved out of the +-- gateway's in-process Map so a button click that lands on pod B can claim +-- a question registered on pod A. The bridge keeps a small per-pod cache for +-- the platform `SentMessage` (used to edit the original card on click) since +-- that's a non-serializable SDK handle; everything that matters for routing +-- the click back into the worker (PostedQuestion + connection context) lives +-- here. + +CREATE TABLE public.pending_interactions ( + id text PRIMARY KEY, + organization_id text REFERENCES public.organization(id) ON DELETE CASCADE, + entry_payload jsonb NOT NULL, + created_at timestamp with time zone NOT NULL DEFAULT now(), + claimed_at timestamp with time zone +); + +-- Claim path is `UPDATE … SET claimed_at = now() WHERE id = $1 AND claimed_at +-- IS NULL RETURNING entry_payload`; a partial index on the unclaimed predicate +-- keeps that lookup index-only. +CREATE INDEX idx_pending_interactions_unclaimed + ON public.pending_interactions (id) + WHERE claimed_at IS NULL; + +-- Background sweeper drops rows older than 24h; index keeps that scan cheap. +CREATE INDEX idx_pending_interactions_created_at + ON public.pending_interactions (created_at); + +-- migrate:down + +DROP INDEX IF EXISTS public.idx_pending_interactions_created_at; +DROP INDEX IF EXISTS public.idx_pending_interactions_unclaimed; +DROP TABLE IF EXISTS public.pending_interactions; diff --git a/db/schema.sql b/db/schema.sql index a5945e7ce..11aa66821 100644 --- a/db/schema.sql +++ b/db/schema.sql @@ -1419,6 +1419,20 @@ CREATE TABLE public.organization_lobu_links ( updated_at timestamp with time zone DEFAULT now() NOT NULL ); + +-- +-- Name: pending_interactions; Type: TABLE; Schema: public; Owner: - +-- + +CREATE TABLE public.pending_interactions ( + id text NOT NULL, + organization_id text, + entry_payload jsonb NOT NULL, + created_at timestamp with time zone DEFAULT now() NOT NULL, + claimed_at timestamp with time zone +); + + -- -- Name: personal_access_tokens; Type: TABLE; Schema: public; Owner: - -- @@ -2633,6 +2647,15 @@ ALTER TABLE ONLY public.organization ALTER TABLE ONLY public.organization ADD CONSTRAINT organization_slug_key UNIQUE (slug); + +-- +-- Name: pending_interactions pending_interactions_pkey; Type: CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.pending_interactions + ADD CONSTRAINT pending_interactions_pkey PRIMARY KEY (id); + + -- -- Name: personal_access_tokens personal_access_tokens_pkey; Type: CONSTRAINT; Schema: public; Owner: - -- @@ -3613,6 +3636,21 @@ CREATE INDEX idx_notification_targets_user_all ON public.notification_targets US CREATE INDEX idx_notification_targets_user_unread ON public.notification_targets USING btree (user_id, delivered_at DESC) WHERE (read_at IS NULL); + +-- +-- Name: idx_pending_interactions_created_at; Type: INDEX; Schema: public; Owner: - +-- + +CREATE INDEX idx_pending_interactions_created_at ON public.pending_interactions USING btree (created_at); + + +-- +-- Name: idx_pending_interactions_unclaimed; Type: INDEX; Schema: public; Owner: - +-- + +CREATE INDEX idx_pending_interactions_unclaimed ON public.pending_interactions USING btree (id) WHERE (claimed_at IS NULL); + + -- -- Name: idx_personal_access_tokens_worker_id; Type: INDEX; Schema: public; Owner: - -- @@ -4768,6 +4806,15 @@ ALTER TABLE ONLY public.organization_lobu_links ALTER TABLE ONLY public.organization_lobu_links ADD CONSTRAINT organization_lobu_links_organization_id_fkey FOREIGN KEY (organization_id) REFERENCES public.organization(id) ON DELETE CASCADE; + +-- +-- Name: pending_interactions pending_interactions_organization_id_fkey; Type: FK CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.pending_interactions + ADD CONSTRAINT pending_interactions_organization_id_fkey FOREIGN KEY (organization_id) REFERENCES public.organization(id) ON DELETE CASCADE; + + -- -- Name: personal_access_tokens personal_access_tokens_organization_id_fkey; Type: FK CONSTRAINT; Schema: public; Owner: - -- @@ -5030,4 +5077,5 @@ INSERT INTO public.schema_migrations (version) VALUES ('20260517050000'), ('20260517060000'), ('20260517150000'), - ('20260517160000'); + ('20260517160000'), + ('20260518000000'); diff --git a/packages/server/src/gateway/__tests__/pending-interaction-store.test.ts b/packages/server/src/gateway/__tests__/pending-interaction-store.test.ts new file mode 100644 index 000000000..e9b8acbc9 --- /dev/null +++ b/packages/server/src/gateway/__tests__/pending-interaction-store.test.ts @@ -0,0 +1,90 @@ +/** + * Tier B: claim atomicity and sweep behavior for the PG-backed + * pending-interaction store that backs the chat interaction bridge. + * + * The bridge moved its `Map` into + * `public.pending_interactions` so a button click landing on pod B can + * claim a question registered on pod A. Two things must hold: + * 1. `claimPendingQuestion` is single-winner — two concurrent claims + * return the payload exactly once. + * 2. `sweepStalePendingInteractions` only deletes rows older than the + * given max-age cutoff. + */ +import { beforeAll, beforeEach, describe, expect, test } from "bun:test"; +import { + ensurePgliteForGatewayTests, + resetTestDatabase, +} from "./helpers/db-setup.js"; +import { getDb } from "../../db/client.js"; +import { + claimPendingQuestion, + restashPendingQuestion, + storePendingQuestion, + sweepStalePendingInteractions, +} from "../connections/pending-interaction-store.js"; +import type { PostedQuestion } from "../interactions.js"; + +function buildQuestion(id: string): PostedQuestion { + return { + id, + teamId: undefined, + channelId: "C1", + conversationId: "C1", + userId: "U1", + platform: "slack", + question: "go?", + options: ["yes", "no"], + } as PostedQuestion; +} + +describe("pending-interaction-store", () => { + beforeAll(async () => { + await ensurePgliteForGatewayTests(); + }); + beforeEach(async () => { + await resetTestDatabase(); + }); + + test("claim returns the stored payload exactly once", async () => { + const q = buildQuestion("q-1"); + await storePendingQuestion(q.id, undefined, { question: q }); + + const first = await claimPendingQuestion(q.id); + expect(first?.question.id).toBe("q-1"); + + const second = await claimPendingQuestion(q.id); + expect(second).toBeNull(); + }); + + test("restash makes a claimed question claimable again", async () => { + const q = buildQuestion("q-restash"); + await storePendingQuestion(q.id, undefined, { question: q }); + await claimPendingQuestion(q.id); + + await restashPendingQuestion(q.id, undefined, { question: q }); + const reclaimed = await claimPendingQuestion(q.id); + expect(reclaimed?.question.id).toBe("q-restash"); + }); + + test("sweep deletes only rows older than the cutoff", async () => { + const sql = getDb(); + const fresh = buildQuestion("q-fresh"); + const stale = buildQuestion("q-stale"); + await storePendingQuestion(fresh.id, undefined, { question: fresh }); + await storePendingQuestion(stale.id, undefined, { question: stale }); + + // Backdate one row past the 24h cutoff. + await sql` + UPDATE pending_interactions + SET created_at = now() - interval '48 hours' + WHERE id = ${stale.id} + `; + + const deleted = await sweepStalePendingInteractions(); + expect(deleted).toBe(1); + + // Fresh row is still claimable; stale row is gone. + expect((await claimPendingQuestion(fresh.id))?.question.id).toBe("q-fresh"); + expect(await claimPendingQuestion(stale.id)).toBeNull(); + }); +}); diff --git a/packages/server/src/gateway/connections/interaction-bridge.ts b/packages/server/src/gateway/connections/interaction-bridge.ts index c2b9f1514..5a3deaa67 100644 --- a/packages/server/src/gateway/connections/interaction-bridge.ts +++ b/packages/server/src/gateway/connections/interaction-bridge.ts @@ -12,6 +12,11 @@ import type { } from "../interactions.js"; import type { GrantStore } from "../permissions/grant-store.js"; import type { ChatInstanceManager } from "./chat-instance-manager.js"; +import { + claimPendingQuestion, + restashPendingQuestion, + storePendingQuestion, +} from "./pending-interaction-store.js"; import type { PlatformConnection } from "./types.js"; const logger = createLogger("chat-interaction-bridge"); @@ -190,42 +195,71 @@ export function registerInteractionBridge( return sent; } - // Tracks posted question cards + their original routing context so a click - // can (a) strip the buttons via SentMessage.edit and (b) feed the clicked - // value back through the inbound-enqueue pipeline. - const pendingQuestions = new Map(); - const pendingQuestionTimers = new Map(); - function trackQuestion(entry: PendingQuestionEntry): void { - pendingQuestions.set(entry.question.id, entry); - const timer = setTimeout(() => { - pendingQuestions.delete(entry.question.id); - pendingQuestionTimers.delete(entry.question.id); - }, 300_000); - pendingQuestionTimers.set(entry.question.id, timer); + // Pending questions are persisted in `public.pending_interactions` so a + // click landing on a different pod can still claim the entry. The local + // `pendingSentMessages` map holds the non-serializable platform + // `SentMessage` (used to strip card buttons on click) — losing it + // cross-pod is best-effort UX, not correctness. + const pendingSentMessages = new Map(); + async function trackQuestion(entry: PendingQuestionEntry): Promise { + if (entry.sent) { + pendingSentMessages.set(entry.question.id, entry.sent); + } + try { + await storePendingQuestion( + entry.question.id, + connection.organizationId, + { question: entry.question } + ); + } catch (error) { + pendingSentMessages.delete(entry.question.id); + logger.error( + { connectionId, questionId: entry.question.id, error: String(error) }, + "Failed to persist pending question" + ); + } } - function claimQuestion(questionId: string): PendingQuestionEntry | undefined { - const entry = pendingQuestions.get(questionId); - pendingQuestions.delete(questionId); - const timer = pendingQuestionTimers.get(questionId); - if (timer) { - clearTimeout(timer); - pendingQuestionTimers.delete(questionId); + async function claimQuestion( + questionId: string + ): Promise { + const stored = await claimPendingQuestion(questionId).catch((error) => { + logger.error( + { connectionId, questionId, error: String(error) }, + "Failed to claim pending question" + ); + return null; + }); + if (!stored) { + // Whoever won the race already took the SentMessage entry on their pod. + pendingSentMessages.delete(questionId); + return undefined; } - return entry; + const sent = pendingSentMessages.get(questionId); + pendingSentMessages.delete(questionId); + return { question: stored.question, sent }; } /** * Put a previously-claimed entry back. Used when a click is rejected * (e.g. wrong user) so the rightful owner can still answer later. */ - function restashQuestion( + async function restashQuestion( questionId: string, entry: PendingQuestionEntry - ): void { - if (pendingQuestions.has(questionId)) return; - trackQuestion(entry); - if (entry.question.id !== questionId) { - pendingQuestions.delete(entry.question.id); - pendingQuestions.set(questionId, entry); + ): Promise { + if (entry.sent) { + pendingSentMessages.set(questionId, entry.sent); + } + try { + await restashPendingQuestion( + questionId, + connection.organizationId, + { question: entry.question } + ); + } catch (error) { + logger.error( + { connectionId, questionId, error: String(error) }, + "Failed to restash pending question" + ); } } const onQuestionCreated = async (event: PostedQuestion) => { @@ -260,7 +294,7 @@ export function registerInteractionBridge( connectionId, "question interaction" ); - trackQuestion({ question: event, sent: sent ?? undefined }); + await trackQuestion({ question: event, sent: sent ?? undefined }); } catch (error) { logger.error( { connectionId, error: String(error) }, @@ -428,9 +462,10 @@ export function registerInteractionBridge( claimApprovalCard, async (questionId, value, thread, author) => { // Fast path — Slack's block_actions webhook requires a <3s response. - // Claim synchronously (Map.delete), then fire-and-forget the slow - // platform API calls (post receipt, edit card, enqueue worker turn). - const entry = claimQuestion(questionId); + // The claim is a single `UPDATE … RETURNING` on a PK and stays well + // under the budget; the slow platform API calls (post receipt, edit + // card, enqueue worker turn) still fire-and-forget below. + const entry = await claimQuestion(questionId); if (!entry) { logger.debug( { connectionId, questionId }, @@ -468,7 +503,7 @@ export function registerInteractionBridge( }, "Question click ignored: clicker is not the original requester" ); - restashQuestion(questionId, entry); + await restashQuestion(questionId, entry); return; } const receiptText = value @@ -548,11 +583,7 @@ export function registerInteractionBridge( } pendingApprovalTimers.clear(); pendingApprovalCards.clear(); - for (const timer of pendingQuestionTimers.values()) { - clearTimeout(timer); - } - pendingQuestionTimers.clear(); - pendingQuestions.clear(); + pendingSentMessages.clear(); logger.info({ connectionId, platform }, "Interaction bridge unregistered"); }; } diff --git a/packages/server/src/gateway/connections/pending-interaction-store.ts b/packages/server/src/gateway/connections/pending-interaction-store.ts new file mode 100644 index 000000000..aa6657a4b --- /dev/null +++ b/packages/server/src/gateway/connections/pending-interaction-store.ts @@ -0,0 +1,107 @@ +/** + * Postgres-backed store for chat-interaction-bridge pending questions. + * + * Replaces the in-process `Map` so a + * button click that lands on pod B can claim a question that was registered + * on pod A. Backed by `public.pending_interactions`; `claimPendingQuestion` + * is an atomic `UPDATE … RETURNING` so concurrent clicks (Slack webhook + * retries, two users racing) serialize on the row. + * + * Only the serializable parts of `PendingQuestionEntry` (the `PostedQuestion`) + * live here. The non-serializable platform `SentMessage` handle stays in a + * small per-pod cache inside the bridge — losing it only degrades the + * card-edit-on-click UX (the answer routes correctly either way). + */ + +import { getDb } from "../../db/client.js"; +import type { PostedQuestion } from "../interactions.js"; + +export interface StoredPendingQuestion { + question: PostedQuestion; +} + +export async function storePendingQuestion( + questionId: string, + organizationId: string | undefined, + entry: StoredPendingQuestion, +): Promise { + const sql = getDb(); + await sql` + INSERT INTO pending_interactions (id, organization_id, entry_payload) + VALUES ( + ${questionId}, + ${organizationId ?? null}, + ${sql.json(entry as object)} + ) + ON CONFLICT (id) DO UPDATE SET + organization_id = EXCLUDED.organization_id, + entry_payload = EXCLUDED.entry_payload, + created_at = now(), + claimed_at = NULL + `; +} + +/** + * Atomically mark a pending question as claimed and return its payload. + * Returns null if the question doesn't exist or was already claimed — + * concurrent retries see null and no-op. + */ +export async function claimPendingQuestion( + questionId: string, +): Promise { + const sql = getDb(); + const rows = await sql` + UPDATE pending_interactions + SET claimed_at = now() + WHERE id = ${questionId} + AND claimed_at IS NULL + RETURNING entry_payload + `; + if (rows.length === 0) return null; + return (rows[0] as { entry_payload: StoredPendingQuestion }).entry_payload ?? null; +} + +/** + * Restore a previously-claimed entry — used when the click is rejected + * (wrong user) so the rightful owner can still answer later. Best-effort: + * if the row was already swept we re-INSERT it. + */ +export async function restashPendingQuestion( + questionId: string, + organizationId: string | undefined, + entry: StoredPendingQuestion, +): Promise { + const sql = getDb(); + await sql` + INSERT INTO pending_interactions (id, organization_id, entry_payload) + VALUES ( + ${questionId}, + ${organizationId ?? null}, + ${sql.json(entry as object)} + ) + ON CONFLICT (id) DO UPDATE SET + claimed_at = NULL, + entry_payload = EXCLUDED.entry_payload + `; +} + +/** + * Delete pending_interactions rows older than `maxAgeMs`. Called from the + * scheduled cleanup task so claimed and abandoned rows don't accumulate. + * Returns the deleted row count. + */ +export async function sweepStalePendingInteractions( + maxAgeMs = 24 * 60 * 60 * 1000, +): Promise { + const sql = getDb(); + const cutoff = new Date(Date.now() - maxAgeMs); + const rows = await sql` + WITH deleted AS ( + DELETE FROM pending_interactions + WHERE created_at < ${cutoff} + RETURNING id + ) + SELECT count(*)::int AS count FROM deleted + `; + return Number((rows[0] as { count?: number } | undefined)?.count ?? 0); +} diff --git a/packages/server/src/gateway/services/core-services.ts b/packages/server/src/gateway/services/core-services.ts index a0c4f627c..7fa9131c8 100644 --- a/packages/server/src/gateway/services/core-services.ts +++ b/packages/server/src/gateway/services/core-services.ts @@ -29,6 +29,7 @@ import { import { sweepExpiredRateLimits } from "../utils/rate-limiter.js"; import { sweepExpiredGrants } from "../permissions/grant-store.js"; import { sweepCompletedRuns } from "../infrastructure/queue/runs-queue.js"; +import { sweepStalePendingInteractions } from "../connections/pending-interaction-store.js"; import { ProviderCatalogService } from "../auth/provider-catalog.js"; import { AgentSettingsStore } from "../auth/settings/agent-settings-store.js"; import { AuthProfilesManager } from "../auth/settings/auth-profiles-manager.js"; @@ -266,15 +267,20 @@ export class CoreServices { * make this a hygiene task — running ~5 minutes apart is plenty. */ async sweepEphemeralTables(): Promise { try { - const [oauthStates, rate, grants, completedRuns] = await Promise.all([ - sweepExpiredOAuthStates(), - sweepExpiredRateLimits(), - sweepExpiredGrants(), - sweepCompletedRuns(), - ]); - if (oauthStates + rate + grants + completedRuns > 0) { + const [oauthStates, rate, grants, completedRuns, pendingInteractions] = + await Promise.all([ + sweepExpiredOAuthStates(), + sweepExpiredRateLimits(), + sweepExpiredGrants(), + sweepCompletedRuns(), + sweepStalePendingInteractions(), + ]); + if ( + oauthStates + rate + grants + completedRuns + pendingInteractions > + 0 + ) { logger.debug( - { oauthStates, rate, grants, completedRuns }, + { oauthStates, rate, grants, completedRuns, pendingInteractions }, "Ephemeral table sweeper deleted expired rows" ); } From 096b377b6617327415997919ea5fea810a5c92c5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Burak=20Emre=20Kabakc=C4=B1?= Date: Mon, 18 May 2026 01:40:04 +0100 Subject: [PATCH 2/4] fix(server): scope pending-interaction claims by org+conn+user, add SentMessage TTL, persist-then-post MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Addresses the four pi-review findings on PR #834. [HIGH #1] Cross-tenant claim hole — keying by `id` alone let a leaked or forged question id be consumed by any connection in any org. The claim SQL now scopes by `(id, organization_id, connection_id, expected_user_id)` and the migration adds those columns as NOT NULL. Cross-tenant and cross-connection clicks return null without consuming the row. [MED #2] Unbounded pendingSentMessages — the old 5-minute timer was removed by mistake. A per-bridge `setInterval` now sweeps SentMessage handles older than 24h (matching the DB row TTL) and also drops handles for any row the scheduled DB sweep deleted. The sweep is unref'd and cleared on bridge unregister. [MED #3] Claim-then-auth race — folded into the SQL claim. The `expected_user_id = $4` predicate means wrong-user clicks never set `claimed_at`, so process death after the claim can no longer leave the row in a half-consumed state. The old "claim → check author → async restash" three-step (and `restashPendingQuestion` itself) is gone. [LOW #4] Post-then-persist ordering — `onQuestionCreated` now persists the row first, then posts the card. If the persist fails the card is never shown; if the post fails after a successful persist the row is dropped immediately rather than waiting for the 24h sweep. Red→green coverage: the new test file asserts the scoped claim rejects cross-tenant, cross-connection, and wrong-user clicks AND that the row remains claimable by the rightful owner. These same assertions fail on the pre-fix commit (verified locally against `claimPendingQuestion(id)`). --- .../20260518000000_pending_interactions.sql | 25 +- db/schema.sql | 6 +- .../pending-interaction-store.test.ts | 124 +++++++--- .../gateway/connections/interaction-bridge.ts | 231 ++++++++++++------ .../connections/pending-interaction-store.ts | 117 ++++----- .../src/gateway/services/core-services.ts | 3 +- 6 files changed, 342 insertions(+), 164 deletions(-) diff --git a/db/migrations/20260518000000_pending_interactions.sql b/db/migrations/20260518000000_pending_interactions.sql index d0ccf3537..b51ecae10 100644 --- a/db/migrations/20260518000000_pending_interactions.sql +++ b/db/migrations/20260518000000_pending_interactions.sql @@ -7,20 +7,35 @@ -- that's a non-serializable SDK handle; everything that matters for routing -- the click back into the worker (PostedQuestion + connection context) lives -- here. +-- +-- The claim path scopes by `(id, organization_id, connection_id, +-- expected_user_id)` — keying by `id` alone would let a click from one +-- connection or one user consume a question registered for another. The +-- columns are NOT NULL so the SQL claim is a single index hit with no +-- branching for NULL semantics. CREATE TABLE public.pending_interactions ( id text PRIMARY KEY, - organization_id text REFERENCES public.organization(id) ON DELETE CASCADE, + organization_id text NOT NULL REFERENCES public.organization(id) ON DELETE CASCADE, + connection_id text NOT NULL, + expected_user_id text NOT NULL, entry_payload jsonb NOT NULL, created_at timestamp with time zone NOT NULL DEFAULT now(), claimed_at timestamp with time zone ); --- Claim path is `UPDATE … SET claimed_at = now() WHERE id = $1 AND claimed_at --- IS NULL RETURNING entry_payload`; a partial index on the unclaimed predicate --- keeps that lookup index-only. +-- Claim path is +-- UPDATE pending_interactions +-- SET claimed_at = now() +-- WHERE id = $1 +-- AND organization_id = $2 +-- AND connection_id = $3 +-- AND expected_user_id = $4 +-- AND claimed_at IS NULL +-- RETURNING entry_payload +-- — a partial index on the unclaimed predicate keeps the lookup index-only. CREATE INDEX idx_pending_interactions_unclaimed - ON public.pending_interactions (id) + ON public.pending_interactions (id, organization_id, connection_id, expected_user_id) WHERE claimed_at IS NULL; -- Background sweeper drops rows older than 24h; index keeps that scan cheap. diff --git a/db/schema.sql b/db/schema.sql index 11aa66821..d749657d5 100644 --- a/db/schema.sql +++ b/db/schema.sql @@ -1426,7 +1426,9 @@ CREATE TABLE public.organization_lobu_links ( CREATE TABLE public.pending_interactions ( id text NOT NULL, - organization_id text, + organization_id text NOT NULL, + connection_id text NOT NULL, + expected_user_id text NOT NULL, entry_payload jsonb NOT NULL, created_at timestamp with time zone DEFAULT now() NOT NULL, claimed_at timestamp with time zone @@ -3648,7 +3650,7 @@ CREATE INDEX idx_pending_interactions_created_at ON public.pending_interactions -- Name: idx_pending_interactions_unclaimed; Type: INDEX; Schema: public; Owner: - -- -CREATE INDEX idx_pending_interactions_unclaimed ON public.pending_interactions USING btree (id) WHERE (claimed_at IS NULL); +CREATE INDEX idx_pending_interactions_unclaimed ON public.pending_interactions USING btree (id, organization_id, connection_id, expected_user_id) WHERE (claimed_at IS NULL); -- diff --git a/packages/server/src/gateway/__tests__/pending-interaction-store.test.ts b/packages/server/src/gateway/__tests__/pending-interaction-store.test.ts index e9b8acbc9..6a01550a5 100644 --- a/packages/server/src/gateway/__tests__/pending-interaction-store.test.ts +++ b/packages/server/src/gateway/__tests__/pending-interaction-store.test.ts @@ -1,14 +1,19 @@ /** - * Tier B: claim atomicity and sweep behavior for the PG-backed + * Tier B: claim atomicity, scoping, and sweep behavior for the PG-backed * pending-interaction store that backs the chat interaction bridge. * - * The bridge moved its `Map` into - * `public.pending_interactions` so a button click landing on pod B can - * claim a question registered on pod A. Two things must hold: + * The store backs the bridge's `Map` + * with `public.pending_interactions`. Three properties matter: * 1. `claimPendingQuestion` is single-winner — two concurrent claims - * return the payload exactly once. - * 2. `sweepStalePendingInteractions` only deletes rows older than the - * given max-age cutoff. + * with matching scope return the payload exactly once. + * 2. The scope tuple `(id, organization_id, connection_id, + * expected_user_id)` is enforced inside the SQL claim — mismatched + * org / connection / user clicks return null and DO NOT consume the + * row. These are the red→green checks that gate findings #1 (cross- + * tenant claim hole) and #3 (claim-then-auth race). + * 3. `sweepStalePendingInteractions` only deletes rows older than the + * given max-age cutoff and returns the deleted ids so the bridge + * can sync its local SentMessage cache. */ import { beforeAll, beforeEach, describe, expect, test } from "bun:test"; import { @@ -18,19 +23,34 @@ import { import { getDb } from "../../db/client.js"; import { claimPendingQuestion, - restashPendingQuestion, storePendingQuestion, sweepStalePendingInteractions, } from "../connections/pending-interaction-store.js"; import type { PostedQuestion } from "../interactions.js"; -function buildQuestion(id: string): PostedQuestion { +const ORG_A = "org-a"; +const ORG_B = "org-b"; +const CONN_A = "conn-a"; +const CONN_B = "conn-b"; +const USER_A = "U_A"; +const USER_B = "U_B"; + +async function seedOrg(id: string): Promise { + const sql = getDb(); + await sql` + INSERT INTO organization (id, name, slug) + VALUES (${id}, ${id}, ${id}) + ON CONFLICT (id) DO NOTHING + `; +} + +function buildQuestion(id: string, userId = USER_A): PostedQuestion { return { id, teamId: undefined, channelId: "C1", conversationId: "C1", - userId: "U1", + userId, platform: "slack", question: "go?", options: ["yes", "no"], @@ -43,35 +63,79 @@ describe("pending-interaction-store", () => { }); beforeEach(async () => { await resetTestDatabase(); + await seedOrg(ORG_A); + await seedOrg(ORG_B); }); - test("claim returns the stored payload exactly once", async () => { + test("matching-scope claim returns the stored payload exactly once", async () => { const q = buildQuestion("q-1"); - await storePendingQuestion(q.id, undefined, { question: q }); + await storePendingQuestion(q.id, ORG_A, CONN_A, USER_A, { question: q }); - const first = await claimPendingQuestion(q.id); + const first = await claimPendingQuestion(q.id, ORG_A, CONN_A, USER_A); expect(first?.question.id).toBe("q-1"); - const second = await claimPendingQuestion(q.id); + const second = await claimPendingQuestion(q.id, ORG_A, CONN_A, USER_A); expect(second).toBeNull(); }); - test("restash makes a claimed question claimable again", async () => { - const q = buildQuestion("q-restash"); - await storePendingQuestion(q.id, undefined, { question: q }); - await claimPendingQuestion(q.id); + // Finding #1 — cross-tenant claim hole. + // + // Red on the previous commit: `claimPendingQuestion(id)` was keyed by + // `id` only, so org B could consume org A's row by replaying the id. + // Green here: scope by `organization_id` blocks the claim, AND the + // row stays untouched so the rightful org can still claim it. + test("cross-tenant claim is rejected and does NOT consume the row", async () => { + const q = buildQuestion("q-cross-tenant"); + await storePendingQuestion(q.id, ORG_A, CONN_A, USER_A, { question: q }); + + const wrongOrg = await claimPendingQuestion(q.id, ORG_B, CONN_A, USER_A); + expect(wrongOrg).toBeNull(); + + const rightful = await claimPendingQuestion(q.id, ORG_A, CONN_A, USER_A); + expect(rightful?.question.id).toBe("q-cross-tenant"); + }); + + test("cross-connection claim is rejected and does NOT consume the row", async () => { + const q = buildQuestion("q-cross-conn"); + await storePendingQuestion(q.id, ORG_A, CONN_A, USER_A, { question: q }); + + const wrongConn = await claimPendingQuestion(q.id, ORG_A, CONN_B, USER_A); + expect(wrongConn).toBeNull(); + + const rightful = await claimPendingQuestion(q.id, ORG_A, CONN_A, USER_A); + expect(rightful?.question.id).toBe("q-cross-conn"); + }); + + // Finding #3 — claim-then-auth race. + // + // Red on the previous commit: the handler did `claimQuestion(id)` + // first, then compared `author.userId` against the row's userId, then + // async-restashed on mismatch. A crash between claim and restash + // permanently consumed the row until the 24h sweep. + // Green here: `expected_user_id` is part of the SQL claim, so a wrong- + // user click returns null without ever setting `claimed_at`. No + // restash is needed because no claim happens. + test("wrong-user click is rejected and does NOT consume the row", async () => { + const q = buildQuestion("q-wrong-user", USER_A); + await storePendingQuestion(q.id, ORG_A, CONN_A, USER_A, { question: q }); + + const wrongUser = await claimPendingQuestion(q.id, ORG_A, CONN_A, USER_B); + expect(wrongUser).toBeNull(); - await restashPendingQuestion(q.id, undefined, { question: q }); - const reclaimed = await claimPendingQuestion(q.id); - expect(reclaimed?.question.id).toBe("q-restash"); + const rightful = await claimPendingQuestion(q.id, ORG_A, CONN_A, USER_A); + expect(rightful?.question.id).toBe("q-wrong-user"); }); - test("sweep deletes only rows older than the cutoff", async () => { + test("sweep deletes only rows older than the cutoff and returns their ids", async () => { const sql = getDb(); const fresh = buildQuestion("q-fresh"); const stale = buildQuestion("q-stale"); - await storePendingQuestion(fresh.id, undefined, { question: fresh }); - await storePendingQuestion(stale.id, undefined, { question: stale }); + await storePendingQuestion(fresh.id, ORG_A, CONN_A, USER_A, { + question: fresh, + }); + await storePendingQuestion(stale.id, ORG_A, CONN_A, USER_A, { + question: stale, + }); // Backdate one row past the 24h cutoff. await sql` @@ -80,11 +144,15 @@ describe("pending-interaction-store", () => { WHERE id = ${stale.id} `; - const deleted = await sweepStalePendingInteractions(); - expect(deleted).toBe(1); + const deletedIds = await sweepStalePendingInteractions(); + expect(deletedIds).toEqual(["q-stale"]); // Fresh row is still claimable; stale row is gone. - expect((await claimPendingQuestion(fresh.id))?.question.id).toBe("q-fresh"); - expect(await claimPendingQuestion(stale.id)).toBeNull(); + expect( + (await claimPendingQuestion(fresh.id, ORG_A, CONN_A, USER_A))?.question.id + ).toBe("q-fresh"); + expect( + await claimPendingQuestion(stale.id, ORG_A, CONN_A, USER_A) + ).toBeNull(); }); }); diff --git a/packages/server/src/gateway/connections/interaction-bridge.ts b/packages/server/src/gateway/connections/interaction-bridge.ts index 5a3deaa67..d79ee130a 100644 --- a/packages/server/src/gateway/connections/interaction-bridge.ts +++ b/packages/server/src/gateway/connections/interaction-bridge.ts @@ -14,8 +14,8 @@ import type { GrantStore } from "../permissions/grant-store.js"; import type { ChatInstanceManager } from "./chat-instance-manager.js"; import { claimPendingQuestion, - restashPendingQuestion, storePendingQuestion, + sweepStalePendingInteractions, } from "./pending-interaction-store.js"; import type { PlatformConnection } from "./types.js"; @@ -200,67 +200,88 @@ export function registerInteractionBridge( // `pendingSentMessages` map holds the non-serializable platform // `SentMessage` (used to strip card buttons on click) — losing it // cross-pod is best-effort UX, not correctness. - const pendingSentMessages = new Map(); - async function trackQuestion(entry: PendingQuestionEntry): Promise { - if (entry.sent) { - pendingSentMessages.set(entry.question.id, entry.sent); + // + // Each entry remembers `registeredAt` so the periodic sweep can evict + // stale handles that match the 24h DB-row TTL. Without this the Map would + // grow unbounded for questions that are never clicked. The sweep also + // removes the local handle for any row the DB sweeper actually deleted, + // so the two stay in sync. + const PENDING_SENT_TTL_MS = 24 * 60 * 60 * 1000; + const PENDING_SENT_SWEEP_INTERVAL_MS = 60 * 60 * 1000; + interface CachedSent { + sent: SentMessage; + registeredAt: number; + } + const pendingSentMessages = new Map(); + const pendingSentSweepTimer = setInterval(() => { + sweepPendingSent().catch((error) => { + logger.warn( + { connectionId, error: String(error) }, + "pendingSentMessages sweep failed" + ); + }); + }, PENDING_SENT_SWEEP_INTERVAL_MS); + pendingSentSweepTimer.unref?.(); + async function sweepPendingSent(): Promise { + const ttlCutoff = Date.now() - PENDING_SENT_TTL_MS; + for (const [id, entry] of pendingSentMessages) { + if (entry.registeredAt <= ttlCutoff) { + pendingSentMessages.delete(id); + } } + // Also drop local handles for any DB rows the scheduled sweeper just + // deleted — keeps the local cache from outliving its DB row. + let deletedIds: string[] = []; try { - await storePendingQuestion( - entry.question.id, - connection.organizationId, - { question: entry.question } - ); + deletedIds = await sweepStalePendingInteractions(); } catch (error) { - pendingSentMessages.delete(entry.question.id); - logger.error( - { connectionId, questionId: entry.question.id, error: String(error) }, - "Failed to persist pending question" + // The store logs its own DB errors; treat as best-effort here. + logger.debug( + { connectionId, error: String(error) }, + "sweepStalePendingInteractions failed during local sweep" ); } + for (const id of deletedIds) { + pendingSentMessages.delete(id); + } + } + /** + * Persist a pending question row, then cache its SentMessage handle so a + * click on this pod can edit the card. The persist happens first — see + * `onQuestionCreated` for the post-then-persist policy that wraps the + * card-post; this function is invoked only after the row is durable. + */ + function rememberSentMessage( + questionId: string, + sent: SentMessage | undefined + ): void { + if (!sent) return; + pendingSentMessages.set(questionId, { + sent, + registeredAt: Date.now(), + }); } async function claimQuestion( - questionId: string + questionId: string, + organizationId: string, + expectedUserId: string ): Promise { - const stored = await claimPendingQuestion(questionId).catch((error) => { + const stored = await claimPendingQuestion( + questionId, + organizationId, + connectionId, + expectedUserId + ).catch((error) => { logger.error( { connectionId, questionId, error: String(error) }, "Failed to claim pending question" ); return null; }); - if (!stored) { - // Whoever won the race already took the SentMessage entry on their pod. - pendingSentMessages.delete(questionId); - return undefined; - } - const sent = pendingSentMessages.get(questionId); + if (!stored) return undefined; + const cached = pendingSentMessages.get(questionId); pendingSentMessages.delete(questionId); - return { question: stored.question, sent }; - } - /** - * Put a previously-claimed entry back. Used when a click is rejected - * (e.g. wrong user) so the rightful owner can still answer later. - */ - async function restashQuestion( - questionId: string, - entry: PendingQuestionEntry - ): Promise { - if (entry.sent) { - pendingSentMessages.set(questionId, entry.sent); - } - try { - await restashPendingQuestion( - questionId, - connection.organizationId, - { question: entry.question } - ); - } catch (error) { - logger.error( - { connectionId, questionId, error: String(error) }, - "Failed to restash pending question" - ); - } + return { question: stored.question, sent: cached?.sent }; } const onQuestionCreated = async (event: PostedQuestion) => { try { @@ -268,6 +289,25 @@ export function registerInteractionBridge( if (handledEvents.has(event.id)) return; markHandled(event.id); + // Cross-tenant scoping: every pending row must carry the bridge's + // org. Without a known org we can't safely persist or claim, so + // drop the event rather than write an un-scoped row. + const organizationId = connection.organizationId; + if (!organizationId) { + logger.warn( + { connectionId, questionId: event.id }, + "Skipping question:created — connection has no organizationId" + ); + return; + } + if (!event.userId) { + logger.warn( + { connectionId, questionId: event.id }, + "Skipping question:created — event has no userId" + ); + return; + } + const thread = await resolveThread( manager, connectionId, @@ -276,6 +316,26 @@ export function registerInteractionBridge( ); if (!thread) return; + // Persist the pending row BEFORE posting the card. If the persist + // fails we never show buttons that would no-op on click. If the row + // is written but the post fails, we delete it on the way out so a + // stale row doesn't sit waiting for a click that will never arrive. + try { + await storePendingQuestion( + event.id, + organizationId, + connectionId, + event.userId, + { question: event } + ); + } catch (error) { + logger.error( + { connectionId, questionId: event.id, error: String(error) }, + "Failed to persist pending question — not posting card" + ); + return; + } + const { Card, CardText, Actions, Button } = await import("chat"); const buttons = event.options.map((option, i) => Button({ @@ -294,7 +354,27 @@ export function registerInteractionBridge( connectionId, "question interaction" ); - await trackQuestion({ question: event, sent: sent ?? undefined }); + if (!sent) { + // Post failed entirely. The row exists but no card was rendered, + // so a click can never come — drop the row to keep the table + // clean. The DB sweep would catch it eventually; doing it now is + // cheaper and avoids a 24h-stale row. + try { + await claimPendingQuestion( + event.id, + organizationId, + connectionId, + event.userId + ); + } catch (error) { + logger.debug( + { connectionId, questionId: event.id, error: String(error) }, + "Failed to drop pending row after post failure" + ); + } + return; + } + rememberSentMessage(event.id, sent); } catch (error) { logger.error( { connectionId, error: String(error) }, @@ -465,11 +545,37 @@ export function registerInteractionBridge( // The claim is a single `UPDATE … RETURNING` on a PK and stays well // under the budget; the slow platform API calls (post receipt, edit // card, enqueue worker turn) still fire-and-forget below. - const entry = await claimQuestion(questionId); - if (!entry) { + // + // Authorisation lives INSIDE the SQL claim: the row only matches when + // `(organization_id, connection_id, expected_user_id)` line up with + // the clicker's context. Wrong-user / cross-connection / cross-tenant + // clicks return null without consuming the row — no claim-then-auth + // race, no restash needed. + const organizationId = connection.organizationId; + if (!organizationId) { + logger.warn( + { connectionId, questionId }, + "Question click on connection with no organizationId — ignoring" + ); + return; + } + if (!author?.userId) { logger.debug( { connectionId, questionId }, - "Question click with no pending entry — ignoring" + "Question click without author.userId — ignoring" + ); + return; + } + + const entry = await claimQuestion( + questionId, + organizationId, + author.userId + ); + if (!entry) { + logger.debug( + { connectionId, questionId, clickerUserId: author.userId }, + "Question click did not match any pending row — ignoring" ); return; } @@ -484,28 +590,6 @@ export function registerInteractionBridge( } const { question } = entry; - - // Only the user who was originally asked may answer. Without this, - // anyone in a Slack/Telegram channel could click another user's - // approval/question buttons and silently impersonate them. Re-stash - // the entry so the rightful owner can still click later. - if ( - author?.userId && - question.userId && - author.userId !== question.userId - ) { - logger.warn( - { - connectionId, - questionId, - clickerUserId: author.userId, - originalUserId: question.userId, - }, - "Question click ignored: clicker is not the original requester" - ); - await restashQuestion(questionId, entry); - return; - } const receiptText = value ? `*You submitted:* ${value}` : "*You submitted a response.*"; @@ -583,6 +667,7 @@ export function registerInteractionBridge( } pendingApprovalTimers.clear(); pendingApprovalCards.clear(); + clearInterval(pendingSentSweepTimer); pendingSentMessages.clear(); logger.info({ connectionId, platform }, "Interaction bridge unregistered"); }; diff --git a/packages/server/src/gateway/connections/pending-interaction-store.ts b/packages/server/src/gateway/connections/pending-interaction-store.ts index aa6657a4b..f77057c00 100644 --- a/packages/server/src/gateway/connections/pending-interaction-store.ts +++ b/packages/server/src/gateway/connections/pending-interaction-store.ts @@ -2,15 +2,23 @@ * Postgres-backed store for chat-interaction-bridge pending questions. * * Replaces the in-process `Map` so a - * button click that lands on pod B can claim a question that was registered - * on pod A. Backed by `public.pending_interactions`; `claimPendingQuestion` - * is an atomic `UPDATE … RETURNING` so concurrent clicks (Slack webhook - * retries, two users racing) serialize on the row. + * button click that lands on pod B can claim a question registered on + * pod A. Backed by `public.pending_interactions`. * - * Only the serializable parts of `PendingQuestionEntry` (the `PostedQuestion`) - * live here. The non-serializable platform `SentMessage` handle stays in a - * small per-pod cache inside the bridge — losing it only degrades the - * card-edit-on-click UX (the answer routes correctly either way). + * `claimPendingQuestion` is a single atomic `UPDATE … RETURNING` scoped + * by `(id, organization_id, connection_id, expected_user_id)`: + * - cross-tenant: a leaked/forged id in another org cannot match. + * - cross-connection: a click on connection X cannot consume a row + * registered for connection Y in the same org. + * - wrong-user: a click from someone other than the original requester + * never sets `claimed_at`, so process death after the SQL check leaves + * the row claimable by the rightful owner — no restash needed. + * + * Only the serializable parts of `PendingQuestionEntry` (the + * `PostedQuestion`) live here. The non-serializable platform `SentMessage` + * handle stays in a small per-pod cache inside the bridge — losing it + * only degrades the card-edit-on-click UX (the answer routes correctly + * either way). */ import { getDb } from "../../db/client.js"; @@ -22,86 +30,85 @@ export interface StoredPendingQuestion { export async function storePendingQuestion( questionId: string, - organizationId: string | undefined, + organizationId: string, + connectionId: string, + expectedUserId: string, entry: StoredPendingQuestion, ): Promise { const sql = getDb(); await sql` - INSERT INTO pending_interactions (id, organization_id, entry_payload) + INSERT INTO pending_interactions ( + id, + organization_id, + connection_id, + expected_user_id, + entry_payload + ) VALUES ( ${questionId}, - ${organizationId ?? null}, + ${organizationId}, + ${connectionId}, + ${expectedUserId}, ${sql.json(entry as object)} ) ON CONFLICT (id) DO UPDATE SET - organization_id = EXCLUDED.organization_id, - entry_payload = EXCLUDED.entry_payload, - created_at = now(), - claimed_at = NULL + organization_id = EXCLUDED.organization_id, + connection_id = EXCLUDED.connection_id, + expected_user_id = EXCLUDED.expected_user_id, + entry_payload = EXCLUDED.entry_payload, + created_at = now(), + claimed_at = NULL `; } /** * Atomically mark a pending question as claimed and return its payload. - * Returns null if the question doesn't exist or was already claimed — - * concurrent retries see null and no-op. + * + * Scoped by `(id, organization_id, connection_id, expected_user_id)` — a + * click that doesn't match all four leaves the row untouched and returns + * null. This fixes three classes of bug that a key-by-id-only claim + * permitted: cross-tenant claim hijacking, cross-connection takeover, and + * the claim-then-auth race where a wrong-user click would consume the + * row and rely on an async restash to put it back. */ export async function claimPendingQuestion( questionId: string, + organizationId: string, + connectionId: string, + expectedUserId: string, ): Promise { const sql = getDb(); const rows = await sql` UPDATE pending_interactions SET claimed_at = now() - WHERE id = ${questionId} + WHERE id = ${questionId} + AND organization_id = ${organizationId} + AND connection_id = ${connectionId} + AND expected_user_id = ${expectedUserId} AND claimed_at IS NULL RETURNING entry_payload `; if (rows.length === 0) return null; - return (rows[0] as { entry_payload: StoredPendingQuestion }).entry_payload ?? null; -} - -/** - * Restore a previously-claimed entry — used when the click is rejected - * (wrong user) so the rightful owner can still answer later. Best-effort: - * if the row was already swept we re-INSERT it. - */ -export async function restashPendingQuestion( - questionId: string, - organizationId: string | undefined, - entry: StoredPendingQuestion, -): Promise { - const sql = getDb(); - await sql` - INSERT INTO pending_interactions (id, organization_id, entry_payload) - VALUES ( - ${questionId}, - ${organizationId ?? null}, - ${sql.json(entry as object)} - ) - ON CONFLICT (id) DO UPDATE SET - claimed_at = NULL, - entry_payload = EXCLUDED.entry_payload - `; + return ( + (rows[0] as { entry_payload: StoredPendingQuestion }).entry_payload ?? null + ); } /** - * Delete pending_interactions rows older than `maxAgeMs`. Called from the - * scheduled cleanup task so claimed and abandoned rows don't accumulate. - * Returns the deleted row count. + * Delete pending_interactions rows older than `maxAgeMs` and return their + * ids. The bridge calls this from the scheduled sweep so it can also evict + * the corresponding per-pod `SentMessage` cache entries — otherwise that + * Map would grow unbounded for questions that are never clicked. */ export async function sweepStalePendingInteractions( maxAgeMs = 24 * 60 * 60 * 1000, -): Promise { +): Promise { const sql = getDb(); const cutoff = new Date(Date.now() - maxAgeMs); - const rows = await sql` - WITH deleted AS ( - DELETE FROM pending_interactions - WHERE created_at < ${cutoff} - RETURNING id - ) - SELECT count(*)::int AS count FROM deleted + const rows = await sql<{ id: string }>` + DELETE FROM pending_interactions + WHERE created_at < ${cutoff} + RETURNING id `; - return Number((rows[0] as { count?: number } | undefined)?.count ?? 0); + return rows.map((r) => r.id); } diff --git a/packages/server/src/gateway/services/core-services.ts b/packages/server/src/gateway/services/core-services.ts index 7fa9131c8..2bf031ec2 100644 --- a/packages/server/src/gateway/services/core-services.ts +++ b/packages/server/src/gateway/services/core-services.ts @@ -267,7 +267,7 @@ export class CoreServices { * make this a hygiene task — running ~5 minutes apart is plenty. */ async sweepEphemeralTables(): Promise { try { - const [oauthStates, rate, grants, completedRuns, pendingInteractions] = + const [oauthStates, rate, grants, completedRuns, pendingIds] = await Promise.all([ sweepExpiredOAuthStates(), sweepExpiredRateLimits(), @@ -275,6 +275,7 @@ export class CoreServices { sweepCompletedRuns(), sweepStalePendingInteractions(), ]); + const pendingInteractions = pendingIds.length; if ( oauthStates + rate + grants + completedRuns + pendingInteractions > 0 From be52b90bf01e9b953bff5ebc7722208c1069a84f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Burak=20Emre=20Kabakc=C4=B1?= Date: Mon, 18 May 2026 02:53:06 +0100 Subject: [PATCH 3/4] chore(schema): collapse double-blank-lines around pending_interactions to match dbmate dump --- db/schema.sql | 9 --------- 1 file changed, 9 deletions(-) diff --git a/db/schema.sql b/db/schema.sql index d749657d5..b803be343 100644 --- a/db/schema.sql +++ b/db/schema.sql @@ -1419,7 +1419,6 @@ CREATE TABLE public.organization_lobu_links ( updated_at timestamp with time zone DEFAULT now() NOT NULL ); - -- -- Name: pending_interactions; Type: TABLE; Schema: public; Owner: - -- @@ -1434,7 +1433,6 @@ CREATE TABLE public.pending_interactions ( claimed_at timestamp with time zone ); - -- -- Name: personal_access_tokens; Type: TABLE; Schema: public; Owner: - -- @@ -2649,7 +2647,6 @@ ALTER TABLE ONLY public.organization ALTER TABLE ONLY public.organization ADD CONSTRAINT organization_slug_key UNIQUE (slug); - -- -- Name: pending_interactions pending_interactions_pkey; Type: CONSTRAINT; Schema: public; Owner: - -- @@ -2657,7 +2654,6 @@ ALTER TABLE ONLY public.organization ALTER TABLE ONLY public.pending_interactions ADD CONSTRAINT pending_interactions_pkey PRIMARY KEY (id); - -- -- Name: personal_access_tokens personal_access_tokens_pkey; Type: CONSTRAINT; Schema: public; Owner: - -- @@ -3638,21 +3634,18 @@ CREATE INDEX idx_notification_targets_user_all ON public.notification_targets US CREATE INDEX idx_notification_targets_user_unread ON public.notification_targets USING btree (user_id, delivered_at DESC) WHERE (read_at IS NULL); - -- -- Name: idx_pending_interactions_created_at; Type: INDEX; Schema: public; Owner: - -- CREATE INDEX idx_pending_interactions_created_at ON public.pending_interactions USING btree (created_at); - -- -- Name: idx_pending_interactions_unclaimed; Type: INDEX; Schema: public; Owner: - -- CREATE INDEX idx_pending_interactions_unclaimed ON public.pending_interactions USING btree (id, organization_id, connection_id, expected_user_id) WHERE (claimed_at IS NULL); - -- -- Name: idx_personal_access_tokens_worker_id; Type: INDEX; Schema: public; Owner: - -- @@ -4808,7 +4801,6 @@ ALTER TABLE ONLY public.organization_lobu_links ALTER TABLE ONLY public.organization_lobu_links ADD CONSTRAINT organization_lobu_links_organization_id_fkey FOREIGN KEY (organization_id) REFERENCES public.organization(id) ON DELETE CASCADE; - -- -- Name: pending_interactions pending_interactions_organization_id_fkey; Type: FK CONSTRAINT; Schema: public; Owner: - -- @@ -4816,7 +4808,6 @@ ALTER TABLE ONLY public.organization_lobu_links ALTER TABLE ONLY public.pending_interactions ADD CONSTRAINT pending_interactions_organization_id_fkey FOREIGN KEY (organization_id) REFERENCES public.organization(id) ON DELETE CASCADE; - -- -- Name: personal_access_tokens personal_access_tokens_organization_id_fkey; Type: FK CONSTRAINT; Schema: public; Owner: - -- From 3c1f16f1bbd18d4832d46e83c8564595fa50f20b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Burak=20Emre=20Kabakc=C4=B1?= Date: Mon, 18 May 2026 03:06:48 +0100 Subject: [PATCH 4/4] fix(test): seed organizationId on makeConnection in connections-platform-isolation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The bridge's new per-tenant guard returns before resolveThread when connection.organizationId is missing — three tests asserting instanceChat.channel() got called fell to 0 calls. Fixture now sets organizationId: "test-org" so the path reaches resolveThread; the subsequent persist still fails on FK in CI (no org row), but is caught by the bridge's persist-then-post catch block — which is what the bridge does in production when persistence drops. --- .../_lib/apply/__tests__/diff.test.ts | 1894 ++++++++--------- .../connections-platform-isolation.test.ts | 4 + 2 files changed, 951 insertions(+), 947 deletions(-) diff --git a/packages/cli/src/commands/_lib/apply/__tests__/diff.test.ts b/packages/cli/src/commands/_lib/apply/__tests__/diff.test.ts index 549d50d10..e8d79393c 100644 --- a/packages/cli/src/commands/_lib/apply/__tests__/diff.test.ts +++ b/packages/cli/src/commands/_lib/apply/__tests__/diff.test.ts @@ -10,983 +10,983 @@ import { renderPlan, renderSummary } from "../render.js"; chalk.level = 0; function buildDesiredAgent( - agentId: string, - overrides: Partial = {}, + agentId: string, + overrides: Partial = {} ): DesiredAgent { - return { - metadata: { agentId, name: agentId, description: undefined }, - settings: {}, - platforms: [], - ...overrides, - }; + return { + metadata: { agentId, name: agentId, description: undefined }, + settings: {}, + platforms: [], + ...overrides, + }; } function buildState( - agents: DesiredAgent[], - overrides: Partial = {}, + agents: DesiredAgent[], + overrides: Partial = {} ): DesiredState { - return { - agents, - memorySchema: { entityTypes: [], relationshipTypes: [] }, - watchers: [], - connectors: { definitions: [], authProfiles: [], connections: [] }, - requiredSecrets: [], - ...overrides, - }; + return { + agents, + memorySchema: { entityTypes: [], relationshipTypes: [] }, + watchers: [], + connectors: { definitions: [], authProfiles: [], connections: [] }, + requiredSecrets: [], + ...overrides, + }; } function emptyRemote(): RemoteSnapshot { - return { - agents: [], - agentSettings: new Map(), - platformsByAgent: new Map(), - entityTypes: [], - relationshipTypes: [], - watchers: [], - connectorDefinitions: [], - authProfiles: [], - connections: [], - feedsByConnectionId: new Map(), - }; + return { + agents: [], + agentSettings: new Map(), + platformsByAgent: new Map(), + entityTypes: [], + relationshipTypes: [], + watchers: [], + connectorDefinitions: [], + authProfiles: [], + connections: [], + feedsByConnectionId: new Map(), + }; } describe("apply diff — agents", () => { - test("create from empty remote", () => { - const desired = buildState([ - buildDesiredAgent("triage", { - metadata: { - agentId: "triage", - name: "Triage", - description: "Triage bot", - }, - }), - ]); - const plan = computeDiff(desired, emptyRemote()); - - expect(plan.counts).toEqual({ create: 2, update: 0, noop: 0, drift: 0 }); - expect(renderPlan(plan)).toMatchSnapshot(); - }); - - test("noop when remote matches desired", () => { - const desired = buildState([ - buildDesiredAgent("triage", { - metadata: { agentId: "triage", name: "Triage" }, - }), - ]); - const remote: RemoteSnapshot = { - ...emptyRemote(), - agents: [{ agentId: "triage", name: "Triage" }], - agentSettings: new Map([["triage", null]]), - platformsByAgent: new Map([["triage", []]]), - }; - const plan = computeDiff(desired, remote); - expect(plan.counts.noop).toBeGreaterThan(0); - expect(plan.counts.create).toBe(0); - expect(plan.counts.update).toBe(0); - expect(renderPlan(plan)).toMatchSnapshot(); - }); - - test("update when name differs", () => { - const desired = buildState([ - buildDesiredAgent("triage", { - metadata: { agentId: "triage", name: "Renamed" }, - }), - ]); - const remote: RemoteSnapshot = { - ...emptyRemote(), - agents: [{ agentId: "triage", name: "Original" }], - agentSettings: new Map([["triage", null]]), - platformsByAgent: new Map([["triage", []]]), - }; - const plan = computeDiff(desired, remote); - expect(plan.counts.update).toBeGreaterThan(0); - expect(renderPlan(plan)).toMatchSnapshot(); - }); - - test("drift when remote has agent not in desired", () => { - const desired = buildState([]); - const remote: RemoteSnapshot = { - ...emptyRemote(), - agents: [{ agentId: "stale", name: "Stale Agent" }], - }; - const plan = computeDiff(desired, remote); - expect(plan.counts.drift).toBe(1); - expect(renderPlan(plan)).toMatchSnapshot(); - }); + test("create from empty remote", () => { + const desired = buildState([ + buildDesiredAgent("triage", { + metadata: { + agentId: "triage", + name: "Triage", + description: "Triage bot", + }, + }), + ]); + const plan = computeDiff(desired, emptyRemote()); + + expect(plan.counts).toEqual({ create: 2, update: 0, noop: 0, drift: 0 }); + expect(renderPlan(plan)).toMatchSnapshot(); + }); + + test("noop when remote matches desired", () => { + const desired = buildState([ + buildDesiredAgent("triage", { + metadata: { agentId: "triage", name: "Triage" }, + }), + ]); + const remote: RemoteSnapshot = { + ...emptyRemote(), + agents: [{ agentId: "triage", name: "Triage" }], + agentSettings: new Map([["triage", null]]), + platformsByAgent: new Map([["triage", []]]), + }; + const plan = computeDiff(desired, remote); + expect(plan.counts.noop).toBeGreaterThan(0); + expect(plan.counts.create).toBe(0); + expect(plan.counts.update).toBe(0); + expect(renderPlan(plan)).toMatchSnapshot(); + }); + + test("update when name differs", () => { + const desired = buildState([ + buildDesiredAgent("triage", { + metadata: { agentId: "triage", name: "Renamed" }, + }), + ]); + const remote: RemoteSnapshot = { + ...emptyRemote(), + agents: [{ agentId: "triage", name: "Original" }], + agentSettings: new Map([["triage", null]]), + platformsByAgent: new Map([["triage", []]]), + }; + const plan = computeDiff(desired, remote); + expect(plan.counts.update).toBeGreaterThan(0); + expect(renderPlan(plan)).toMatchSnapshot(); + }); + + test("drift when remote has agent not in desired", () => { + const desired = buildState([]); + const remote: RemoteSnapshot = { + ...emptyRemote(), + agents: [{ agentId: "stale", name: "Stale Agent" }], + }; + const plan = computeDiff(desired, remote); + expect(plan.counts.drift).toBe(1); + expect(renderPlan(plan)).toMatchSnapshot(); + }); }); describe("apply diff — settings", () => { - test("update on networkConfig change", () => { - const desired = buildState([ - buildDesiredAgent("triage", { - metadata: { agentId: "triage", name: "Triage" }, - settings: { - networkConfig: { allowedDomains: ["github.com"] }, - }, - }), - ]); - const remote: RemoteSnapshot = { - ...emptyRemote(), - agents: [{ agentId: "triage", name: "Triage" }], - agentSettings: new Map([ - [ - "triage", - { - networkConfig: { allowedDomains: ["pypi.org"] }, - updatedAt: 0, - }, - ], - ]), - platformsByAgent: new Map([["triage", []]]), - }; - const plan = computeDiff(desired, remote); - const settingsRow = plan.rows.find((r) => r.kind === "settings"); - expect(settingsRow?.verb).toBe("update"); - if (settingsRow?.kind === "settings") { - expect(settingsRow.changedFields).toContain("networkConfig"); - } - expect(renderPlan(plan)).toMatchSnapshot(); - }); - - test("updates when provider declarations change but ignores installedAt churn", () => { - const desired = buildState([ - buildDesiredAgent("triage", { - metadata: { agentId: "triage", name: "Triage" }, - settings: { - installedProviders: [ - { providerId: "anthropic", installedAt: 200 }, - { providerId: "openai", installedAt: 200 }, - ], - }, - }), - ]); - const remote: RemoteSnapshot = { - ...emptyRemote(), - agents: [{ agentId: "triage", name: "Triage" }], - agentSettings: new Map([ - [ - "triage", - { - installedProviders: [{ providerId: "anthropic", installedAt: 100 }], - updatedAt: 0, - }, - ], - ]), - platformsByAgent: new Map([["triage", []]]), - }; - const plan = computeDiff(desired, remote); - const settingsRow = plan.rows.find((r) => r.kind === "settings"); - expect(settingsRow?.verb).toBe("update"); - if (settingsRow?.kind === "settings") { - expect(settingsRow.changedFields).toContain("installedProviders"); - } - - const unchanged = computeDiff(desired, { - ...remote, - agentSettings: new Map([ - [ - "triage", - { - installedProviders: [ - { providerId: "anthropic", installedAt: 1 }, - { providerId: "openai", installedAt: 2 }, - ], - updatedAt: 0, - }, - ], - ]), - }); - const unchangedSettingsRow = unchanged.rows.find( - (r) => r.kind === "settings", - ); - expect(unchangedSettingsRow?.verb).toBe("noop"); - }); + test("update on networkConfig change", () => { + const desired = buildState([ + buildDesiredAgent("triage", { + metadata: { agentId: "triage", name: "Triage" }, + settings: { + networkConfig: { allowedDomains: ["github.com"] }, + }, + }), + ]); + const remote: RemoteSnapshot = { + ...emptyRemote(), + agents: [{ agentId: "triage", name: "Triage" }], + agentSettings: new Map([ + [ + "triage", + { + networkConfig: { allowedDomains: ["pypi.org"] }, + updatedAt: 0, + }, + ], + ]), + platformsByAgent: new Map([["triage", []]]), + }; + const plan = computeDiff(desired, remote); + const settingsRow = plan.rows.find((r) => r.kind === "settings"); + expect(settingsRow?.verb).toBe("update"); + if (settingsRow?.kind === "settings") { + expect(settingsRow.changedFields).toContain("networkConfig"); + } + expect(renderPlan(plan)).toMatchSnapshot(); + }); + + test("updates when provider declarations change but ignores installedAt churn", () => { + const desired = buildState([ + buildDesiredAgent("triage", { + metadata: { agentId: "triage", name: "Triage" }, + settings: { + installedProviders: [ + { providerId: "anthropic", installedAt: 200 }, + { providerId: "openai", installedAt: 200 }, + ], + }, + }), + ]); + const remote: RemoteSnapshot = { + ...emptyRemote(), + agents: [{ agentId: "triage", name: "Triage" }], + agentSettings: new Map([ + [ + "triage", + { + installedProviders: [{ providerId: "anthropic", installedAt: 100 }], + updatedAt: 0, + }, + ], + ]), + platformsByAgent: new Map([["triage", []]]), + }; + const plan = computeDiff(desired, remote); + const settingsRow = plan.rows.find((r) => r.kind === "settings"); + expect(settingsRow?.verb).toBe("update"); + if (settingsRow?.kind === "settings") { + expect(settingsRow.changedFields).toContain("installedProviders"); + } + + const unchanged = computeDiff(desired, { + ...remote, + agentSettings: new Map([ + [ + "triage", + { + installedProviders: [ + { providerId: "anthropic", installedAt: 1 }, + { providerId: "openai", installedAt: 2 }, + ], + updatedAt: 0, + }, + ], + ]), + }); + const unchangedSettingsRow = unchanged.rows.find( + (r) => r.kind === "settings" + ); + expect(unchangedSettingsRow?.verb).toBe("noop"); + }); }); describe("apply diff — platforms", () => { - test("create on empty remote", () => { - const desired = buildState([ - buildDesiredAgent("triage", { - metadata: { agentId: "triage", name: "Triage" }, - platforms: [ - { - stableId: "triage-telegram", - type: "telegram", - config: { botToken: "abc" }, - }, - ], - }), - ]); - const plan = computeDiff(desired, emptyRemote()); - const platformRow = plan.rows.find((r) => r.kind === "platform"); - expect(platformRow?.verb).toBe("create"); - expect(renderPlan(plan)).toMatchSnapshot(); - }); - - test("update with willRestart when config changes", () => { - const desired = buildState([ - buildDesiredAgent("triage", { - metadata: { agentId: "triage", name: "Triage" }, - platforms: [ - { - stableId: "triage-telegram", - type: "telegram", - config: { botToken: "new" }, - }, - ], - }), - ]); - const remote: RemoteSnapshot = { - ...emptyRemote(), - agents: [{ agentId: "triage", name: "Triage" }], - agentSettings: new Map([["triage", null]]), - platformsByAgent: new Map([ - [ - "triage", - [ - { - id: "triage-telegram", - platform: "telegram", - config: { botToken: "old" }, - }, - ], - ], - ]), - }; - const plan = computeDiff(desired, remote); - const platformRow = plan.rows.find((r) => r.kind === "platform"); - expect(platformRow?.verb).toBe("update"); - if (platformRow?.kind === "platform") { - expect(platformRow.willRestart).toBe(true); - } - expect(renderPlan(plan)).toMatchSnapshot(); - }); + test("create on empty remote", () => { + const desired = buildState([ + buildDesiredAgent("triage", { + metadata: { agentId: "triage", name: "Triage" }, + platforms: [ + { + stableId: "triage-telegram", + type: "telegram", + config: { botToken: "abc" }, + }, + ], + }), + ]); + const plan = computeDiff(desired, emptyRemote()); + const platformRow = plan.rows.find((r) => r.kind === "platform"); + expect(platformRow?.verb).toBe("create"); + expect(renderPlan(plan)).toMatchSnapshot(); + }); + + test("update with willRestart when config changes", () => { + const desired = buildState([ + buildDesiredAgent("triage", { + metadata: { agentId: "triage", name: "Triage" }, + platforms: [ + { + stableId: "triage-telegram", + type: "telegram", + config: { botToken: "new" }, + }, + ], + }), + ]); + const remote: RemoteSnapshot = { + ...emptyRemote(), + agents: [{ agentId: "triage", name: "Triage" }], + agentSettings: new Map([["triage", null]]), + platformsByAgent: new Map([ + [ + "triage", + [ + { + id: "triage-telegram", + platform: "telegram", + config: { botToken: "old" }, + }, + ], + ], + ]), + }; + const plan = computeDiff(desired, remote); + const platformRow = plan.rows.find((r) => r.kind === "platform"); + expect(platformRow?.verb).toBe("update"); + if (platformRow?.kind === "platform") { + expect(platformRow.willRestart).toBe(true); + } + expect(renderPlan(plan)).toMatchSnapshot(); + }); }); describe("apply diff — memory schema", () => { - test("creates entity + relationship types", () => { - const desired: DesiredState = { - agents: [], - memorySchema: { - entityTypes: [{ slug: "company", name: "Company", required: ["name"] }], - relationshipTypes: [ - { - slug: "works_at", - name: "Works At", - rules: [{ source: "person", target: "company" }], - }, - ], - }, - watchers: [], - requiredSecrets: [], - }; - const plan = computeDiff(desired, emptyRemote()); - expect(plan.counts.create).toBe(2); - expect(renderPlan(plan)).toMatchSnapshot(); - }); - - test("noop when remote matches", () => { - const desired: DesiredState = { - agents: [], - memorySchema: { - entityTypes: [{ slug: "company", name: "Company" }], - relationshipTypes: [], - }, - watchers: [], - requiredSecrets: [], - }; - const remote: RemoteSnapshot = { - ...emptyRemote(), - entityTypes: [{ slug: "company", name: "Company" }], - }; - const plan = computeDiff(desired, remote); - expect(plan.counts.noop).toBe(1); - expect(plan.counts.update).toBe(0); - }); + test("creates entity + relationship types", () => { + const desired: DesiredState = { + agents: [], + memorySchema: { + entityTypes: [{ slug: "company", name: "Company", required: ["name"] }], + relationshipTypes: [ + { + slug: "works_at", + name: "Works At", + rules: [{ source: "person", target: "company" }], + }, + ], + }, + watchers: [], + requiredSecrets: [], + }; + const plan = computeDiff(desired, emptyRemote()); + expect(plan.counts.create).toBe(2); + expect(renderPlan(plan)).toMatchSnapshot(); + }); + + test("noop when remote matches", () => { + const desired: DesiredState = { + agents: [], + memorySchema: { + entityTypes: [{ slug: "company", name: "Company" }], + relationshipTypes: [], + }, + watchers: [], + requiredSecrets: [], + }; + const remote: RemoteSnapshot = { + ...emptyRemote(), + entityTypes: [{ slug: "company", name: "Company" }], + }; + const plan = computeDiff(desired, remote); + expect(plan.counts.noop).toBe(1); + expect(plan.counts.update).toBe(0); + }); }); describe("apply diff — empty container preservation", () => { - // Bug fix: previously canonical() collapsed [] and {} to null, which - // meant clearing a remote allowlist by setting it to [] silently - // round-tripped as a noop instead of an update. - test("clearing networkConfig.allowedDomains from non-empty to [] is an update", () => { - const desired = buildState([ - buildDesiredAgent("triage", { - metadata: { agentId: "triage", name: "Triage" }, - settings: { - networkConfig: { allowedDomains: [] }, - }, - }), - ]); - const remote: RemoteSnapshot = { - ...emptyRemote(), - agents: [{ agentId: "triage", name: "Triage" }], - agentSettings: new Map([ - [ - "triage", - { - networkConfig: { allowedDomains: ["foo.com"] }, - updatedAt: 0, - }, - ], - ]), - platformsByAgent: new Map([["triage", []]]), - }; - const plan = computeDiff(desired, remote); - const settingsRow = plan.rows.find((r) => r.kind === "settings"); - expect(settingsRow?.verb).toBe("update"); - if (settingsRow?.kind === "settings") { - expect(settingsRow.changedFields).toContain("networkConfig"); - } - }); - - test("[] is not equal to null (preserved as distinct values)", () => { - // When desired sets allowedDomains: [] and remote has the field - // missing entirely, the diff should still treat them as equivalent - // for the case where remote literally doesn't have the field — but - // [] vs the explicit array ["foo"] must differ. - const desiredEmpty = buildState([ - buildDesiredAgent("triage", { - metadata: { agentId: "triage", name: "Triage" }, - settings: { - networkConfig: { allowedDomains: [] }, - }, - }), - ]); - const remoteWithItems: RemoteSnapshot = { - ...emptyRemote(), - agents: [{ agentId: "triage", name: "Triage" }], - agentSettings: new Map([ - [ - "triage", - { - networkConfig: { allowedDomains: ["x.com"] }, - updatedAt: 0, - }, - ], - ]), - platformsByAgent: new Map([["triage", []]]), - }; - const plan = computeDiff(desiredEmpty, remoteWithItems); - expect(plan.counts.update).toBeGreaterThan(0); - }); - - test("{} is not equal to populated object", () => { - // empty config object vs populated config object must show as drift/update - const desired = buildState([ - buildDesiredAgent("triage", { - metadata: { agentId: "triage", name: "Triage" }, - platforms: [ - { - stableId: "triage-telegram", - type: "telegram", - config: {}, - }, - ], - }), - ]); - const remote: RemoteSnapshot = { - ...emptyRemote(), - agents: [{ agentId: "triage", name: "Triage" }], - agentSettings: new Map([["triage", null]]), - platformsByAgent: new Map([ - [ - "triage", - [ - { - id: "triage-telegram", - platform: "telegram", - config: { botToken: "abc" }, - }, - ], - ], - ]), - }; - const plan = computeDiff(desired, remote); - const platformRow = plan.rows.find((r) => r.kind === "platform"); - expect(platformRow?.verb).toBe("update"); - }); + // Bug fix: previously canonical() collapsed [] and {} to null, which + // meant clearing a remote allowlist by setting it to [] silently + // round-tripped as a noop instead of an update. + test("clearing networkConfig.allowedDomains from non-empty to [] is an update", () => { + const desired = buildState([ + buildDesiredAgent("triage", { + metadata: { agentId: "triage", name: "Triage" }, + settings: { + networkConfig: { allowedDomains: [] }, + }, + }), + ]); + const remote: RemoteSnapshot = { + ...emptyRemote(), + agents: [{ agentId: "triage", name: "Triage" }], + agentSettings: new Map([ + [ + "triage", + { + networkConfig: { allowedDomains: ["foo.com"] }, + updatedAt: 0, + }, + ], + ]), + platformsByAgent: new Map([["triage", []]]), + }; + const plan = computeDiff(desired, remote); + const settingsRow = plan.rows.find((r) => r.kind === "settings"); + expect(settingsRow?.verb).toBe("update"); + if (settingsRow?.kind === "settings") { + expect(settingsRow.changedFields).toContain("networkConfig"); + } + }); + + test("[] is not equal to null (preserved as distinct values)", () => { + // When desired sets allowedDomains: [] and remote has the field + // missing entirely, the diff should still treat them as equivalent + // for the case where remote literally doesn't have the field — but + // [] vs the explicit array ["foo"] must differ. + const desiredEmpty = buildState([ + buildDesiredAgent("triage", { + metadata: { agentId: "triage", name: "Triage" }, + settings: { + networkConfig: { allowedDomains: [] }, + }, + }), + ]); + const remoteWithItems: RemoteSnapshot = { + ...emptyRemote(), + agents: [{ agentId: "triage", name: "Triage" }], + agentSettings: new Map([ + [ + "triage", + { + networkConfig: { allowedDomains: ["x.com"] }, + updatedAt: 0, + }, + ], + ]), + platformsByAgent: new Map([["triage", []]]), + }; + const plan = computeDiff(desiredEmpty, remoteWithItems); + expect(plan.counts.update).toBeGreaterThan(0); + }); + + test("{} is not equal to populated object", () => { + // empty config object vs populated config object must show as drift/update + const desired = buildState([ + buildDesiredAgent("triage", { + metadata: { agentId: "triage", name: "Triage" }, + platforms: [ + { + stableId: "triage-telegram", + type: "telegram", + config: {}, + }, + ], + }), + ]); + const remote: RemoteSnapshot = { + ...emptyRemote(), + agents: [{ agentId: "triage", name: "Triage" }], + agentSettings: new Map([["triage", null]]), + platformsByAgent: new Map([ + [ + "triage", + [ + { + id: "triage-telegram", + platform: "telegram", + config: { botToken: "abc" }, + }, + ], + ], + ]), + }; + const plan = computeDiff(desired, remote); + const platformRow = plan.rows.find((r) => r.kind === "platform"); + expect(platformRow?.verb).toBe("update"); + }); }); describe("apply diff — watchers", () => { - const desiredWatcher = { - slug: "weekly-digest", - agent: "triage", - name: "Weekly digest", - prompt: "Produce a digest.", - extractionSchema: { type: "object" as const }, - schedule: "0 9 * * 1", - }; - - test("create when watcher missing remotely", () => { - const desired = buildState([], { watchers: [desiredWatcher] }); - const plan = computeDiff(desired, emptyRemote()); - const row = plan.rows.find((r) => r.kind === "watcher"); - expect(row?.verb).toBe("create"); - expect(row?.id).toBe("weekly-digest"); - }); - - test("noop when remote matches every field the diff covers", () => { - const desired = buildState([], { watchers: [desiredWatcher] }); - const remote: RemoteSnapshot = { - ...emptyRemote(), - watchers: [ - { - slug: "weekly-digest", - name: "Weekly digest", - agent_id: "triage", - prompt: "Produce a digest.", - extraction_schema: { type: "object" }, - schedule: "0 9 * * 1", - }, - ], - }; - const plan = computeDiff(desired, remote); - const row = plan.rows.find((r) => r.kind === "watcher"); - expect(row?.verb).toBe("noop"); - expect(plan.counts.create).toBe(0); - }); - - test("update with scalar drift when schedule changes remotely", () => { - const desired = buildState([], { watchers: [desiredWatcher] }); - const remote: RemoteSnapshot = { - ...emptyRemote(), - watchers: [ - { - slug: "weekly-digest", - name: "Weekly digest", - agent_id: "triage", - prompt: "Produce a digest.", - extraction_schema: { type: "object" }, - schedule: "0 10 * * 1", - }, - ], - }; - const plan = computeDiff(desired, remote); - const row = plan.rows.find((r) => r.kind === "watcher"); - expect(row?.verb).toBe("update"); - expect(row?.changedFields).toContain("schedule"); - expect( - (row as { versionBoundFields?: string[] }).versionBoundFields, - ).toBeUndefined(); - }); - - test("update with version-bound drift when prompt changes remotely", () => { - const desired = buildState([], { watchers: [desiredWatcher] }); - const remote: RemoteSnapshot = { - ...emptyRemote(), - watchers: [ - { - slug: "weekly-digest", - name: "Weekly digest", - agent_id: "triage", - prompt: "Old prompt", - extraction_schema: { type: "object" }, - schedule: "0 9 * * 1", - }, - ], - }; - const plan = computeDiff(desired, remote); - const row = plan.rows.find((r) => r.kind === "watcher"); - expect(row?.verb).toBe("update"); - expect( - (row as { versionBoundFields?: string[] }).versionBoundFields, - ).toEqual(["prompt"]); - }); - - test("reaction_script declared → always re-pushed (idempotent)", () => { - const desired = buildState([], { - watchers: [ - { - ...desiredWatcher, - reactionScript: { - sourcePath: "/abs/path/r.ts", - sourceCode: "export default async () => {};", - }, - }, - ], - }); - const remote: RemoteSnapshot = { - ...emptyRemote(), - watchers: [ - { - slug: "weekly-digest", - name: "Weekly digest", - agent_id: "triage", - prompt: "Produce a digest.", - extraction_schema: { type: "object" }, - schedule: "0 9 * * 1", - }, - ], - }; - const plan = computeDiff(desired, remote); - const row = plan.rows.find((r) => r.kind === "watcher"); - expect(row?.verb).toBe("update"); - expect(row?.changedFields).toEqual(["reaction_script"]); - expect( - (row as { reactionScriptDeclared?: boolean }).reactionScriptDeclared, - ).toBe(true); - }); - - test("drift when remote watcher not declared in models", () => { - const desired = buildState([], { watchers: [] }); - const remote: RemoteSnapshot = { - ...emptyRemote(), - watchers: [{ slug: "orphan-watcher" }], - }; - const plan = computeDiff(desired, remote); - const row = plan.rows.find((r) => r.kind === "watcher"); - expect(row?.verb).toBe("drift"); - expect(plan.counts.drift).toBe(1); - }); + const desiredWatcher = { + slug: "weekly-digest", + agent: "triage", + name: "Weekly digest", + prompt: "Produce a digest.", + extractionSchema: { type: "object" as const }, + schedule: "0 9 * * 1", + }; + + test("create when watcher missing remotely", () => { + const desired = buildState([], { watchers: [desiredWatcher] }); + const plan = computeDiff(desired, emptyRemote()); + const row = plan.rows.find((r) => r.kind === "watcher"); + expect(row?.verb).toBe("create"); + expect(row?.id).toBe("weekly-digest"); + }); + + test("noop when remote matches every field the diff covers", () => { + const desired = buildState([], { watchers: [desiredWatcher] }); + const remote: RemoteSnapshot = { + ...emptyRemote(), + watchers: [ + { + slug: "weekly-digest", + name: "Weekly digest", + agent_id: "triage", + prompt: "Produce a digest.", + extraction_schema: { type: "object" }, + schedule: "0 9 * * 1", + }, + ], + }; + const plan = computeDiff(desired, remote); + const row = plan.rows.find((r) => r.kind === "watcher"); + expect(row?.verb).toBe("noop"); + expect(plan.counts.create).toBe(0); + }); + + test("update with scalar drift when schedule changes remotely", () => { + const desired = buildState([], { watchers: [desiredWatcher] }); + const remote: RemoteSnapshot = { + ...emptyRemote(), + watchers: [ + { + slug: "weekly-digest", + name: "Weekly digest", + agent_id: "triage", + prompt: "Produce a digest.", + extraction_schema: { type: "object" }, + schedule: "0 10 * * 1", + }, + ], + }; + const plan = computeDiff(desired, remote); + const row = plan.rows.find((r) => r.kind === "watcher"); + expect(row?.verb).toBe("update"); + expect(row?.changedFields).toContain("schedule"); + expect( + (row as { versionBoundFields?: string[] }).versionBoundFields + ).toBeUndefined(); + }); + + test("update with version-bound drift when prompt changes remotely", () => { + const desired = buildState([], { watchers: [desiredWatcher] }); + const remote: RemoteSnapshot = { + ...emptyRemote(), + watchers: [ + { + slug: "weekly-digest", + name: "Weekly digest", + agent_id: "triage", + prompt: "Old prompt", + extraction_schema: { type: "object" }, + schedule: "0 9 * * 1", + }, + ], + }; + const plan = computeDiff(desired, remote); + const row = plan.rows.find((r) => r.kind === "watcher"); + expect(row?.verb).toBe("update"); + expect( + (row as { versionBoundFields?: string[] }).versionBoundFields + ).toEqual(["prompt"]); + }); + + test("reaction_script declared → always re-pushed (idempotent)", () => { + const desired = buildState([], { + watchers: [ + { + ...desiredWatcher, + reactionScript: { + sourcePath: "/abs/path/r.ts", + sourceCode: "export default async () => {};", + }, + }, + ], + }); + const remote: RemoteSnapshot = { + ...emptyRemote(), + watchers: [ + { + slug: "weekly-digest", + name: "Weekly digest", + agent_id: "triage", + prompt: "Produce a digest.", + extraction_schema: { type: "object" }, + schedule: "0 9 * * 1", + }, + ], + }; + const plan = computeDiff(desired, remote); + const row = plan.rows.find((r) => r.kind === "watcher"); + expect(row?.verb).toBe("update"); + expect(row?.changedFields).toEqual(["reaction_script"]); + expect( + (row as { reactionScriptDeclared?: boolean }).reactionScriptDeclared + ).toBe(true); + }); + + test("drift when remote watcher not declared in models", () => { + const desired = buildState([], { watchers: [] }); + const remote: RemoteSnapshot = { + ...emptyRemote(), + watchers: [{ slug: "orphan-watcher" }], + }; + const plan = computeDiff(desired, remote); + const row = plan.rows.find((r) => r.kind === "watcher"); + expect(row?.verb).toBe("drift"); + expect(plan.counts.drift).toBe(1); + }); }); describe("renderSummary", () => { - test("renders zero-row plan", () => { - const desired = buildState([]); - const plan = computeDiff(desired, emptyRemote()); - expect(renderSummary(plan)).toMatchSnapshot(); - }); + test("renders zero-row plan", () => { + const desired = buildState([]); + const plan = computeDiff(desired, emptyRemote()); + expect(renderSummary(plan)).toMatchSnapshot(); + }); }); describe("apply diff — connectors", () => { - const builtinConnectorDef = { - key: "hackernews", - name: "Hacker News", - installed: false, - installable: true, - }; - - function connectorState() { - return buildState([], { - connectors: { - definitions: [ - { - key: "acme", - sourcePath: "/proj/connectors/acme.connector.ts", - sourceCode: "export default class {}", - sourceFile: "connectors/acme.connector.ts", - }, - ], - authProfiles: [ - { - slug: "hn-token", - connector: "hackernews", - kind: "env" as const, - name: "HN token", - credentials: { HN_TOKEN: "$HN_TOKEN" }, - sourceFile: "connectors/hackernews.yaml", - }, - { - slug: "x-account", - connector: "x", - kind: "oauth_account" as const, - sourceFile: "connectors/x.yaml", - }, - ], - connections: [ - { - slug: "hn-frontpage", - connector: "hackernews", - name: "HN front page", - authProfileSlug: "hn-token", - feeds: [{ feedKey: "stories", schedule: "0 * * * *" }], - sourceFile: "connectors/hackernews.yaml", - }, - ], - }, - }); - } - - test("create verbs for new connector def, auth profile, connection, feed", () => { - const plan = computeDiff(connectorState(), { - ...emptyRemote(), - connectorDefinitions: [builtinConnectorDef], - }); - const def = plan.rows.find((r) => r.kind === "connector-definition"); - expect(def?.verb).toBe("create"); - const authEnv = plan.rows.find( - (r) => r.kind === "auth-profile" && r.id === "hn-token", - ); - expect(authEnv?.verb).toBe("create"); - const authOauth = plan.rows.find( - (r) => r.kind === "auth-profile" && r.id === "x-account", - ); - expect(authOauth?.verb).toBe("create"); - expect( - authOauth && "needsAuth" in authOauth ? authOauth.needsAuth : undefined, - ).toBe(true); - const conn = plan.rows.find((r) => r.kind === "connection"); - expect(conn?.verb).toBe("create"); - const feed = plan.rows.find((r) => r.kind === "feed"); - expect(feed?.verb).toBe("create"); - expect(feed?.id).toBe("hn-frontpage/stories"); - }); - - test("noop when connection + feed already match remotely", () => { - const remote: RemoteSnapshot = { - ...emptyRemote(), - connectorDefinitions: [builtinConnectorDef], - authProfiles: [ - { - slug: "hn-token", - display_name: "HN token", - connector_key: "hackernews", - profile_kind: "env", - status: "active", - }, - { - slug: "x-account", - connector_key: "x", - profile_kind: "oauth_account", - status: "active", - }, - ], - connections: [ - { - id: 7, - slug: "hn-frontpage", - connector_key: "hackernews", - display_name: "HN front page", - status: "active", - auth_profile_slug: "hn-token", - app_auth_profile_slug: null, - config: {}, - }, - ], - feedsByConnectionId: new Map([ - [ - 7, - [ - { - id: 11, - connection_id: 7, - feed_key: "stories", - status: "active", - schedule: "0 * * * *", - config: {}, - }, - ], - ], - ]), - }; - const plan = computeDiff(connectorState(), remote); - expect(plan.rows.find((r) => r.kind === "connection")?.verb).toBe("noop"); - expect(plan.rows.find((r) => r.kind === "feed")?.verb).toBe("noop"); - expect( - plan.rows.find((r) => r.kind === "auth-profile" && r.id === "x-account") - ?.verb, - ).toBe("noop"); - }); - - test("update when feed schedule changes; needs-auth when oauth profile inactive", () => { - const remote: RemoteSnapshot = { - ...emptyRemote(), - connectorDefinitions: [builtinConnectorDef], - authProfiles: [ - { - slug: "hn-token", - display_name: "HN token", - connector_key: "hackernews", - profile_kind: "env", - status: "active", - }, - { - slug: "x-account", - connector_key: "x", - profile_kind: "oauth_account", - status: "pending_auth", - }, - ], - connections: [ - { - id: 7, - slug: "hn-frontpage", - connector_key: "hackernews", - display_name: "HN front page", - status: "active", - auth_profile_slug: "hn-token", - app_auth_profile_slug: null, - config: {}, - }, - ], - feedsByConnectionId: new Map([ - [ - 7, - [ - { - id: 11, - connection_id: 7, - feed_key: "stories", - status: "active", - schedule: "0 0 * * *", - config: {}, - }, - ], - ], - ]), - }; - const plan = computeDiff(connectorState(), remote); - const feed = plan.rows.find((r) => r.kind === "feed"); - expect(feed?.verb).toBe("update"); - expect(feed && "changedFields" in feed ? feed.changedFields : []).toEqual([ - "schedule", - ]); - const authOauth = plan.rows.find( - (r) => r.kind === "auth-profile" && r.id === "x-account", - ); - expect( - authOauth && "needsAuth" in authOauth ? authOauth.needsAuth : undefined, - ).toBe(true); - }); - - test("undeclared remote connector becomes an informational note (no uninstall)", () => { - const remote: RemoteSnapshot = { - ...emptyRemote(), - connectorDefinitions: [ - builtinConnectorDef, - { - key: "legacy", - name: "Legacy", - installed: true, - installable: false, - }, - ], - }; - const plan = computeDiff(connectorState(), remote); - expect(plan.notes.some((n) => n.includes('"legacy"'))).toBe(true); - expect( - plan.rows.some( - (r) => r.kind === "connector-definition" && r.id === "legacy", - ), - ).toBe(false); - }); - - test("connectors are skipped when --only is set", () => { - const plan = computeDiff(connectorState(), emptyRemote(), { - only: "agents", - }); - expect(plan.rows.some((r) => r.kind === "connection")).toBe(false); - expect(plan.rows.some((r) => r.kind === "connector-definition")).toBe( - false, - ); - }); - - test("render includes the connectors sections", () => { - const plan = computeDiff(connectorState(), { - ...emptyRemote(), - connectorDefinitions: [builtinConnectorDef], - }); - expect(renderPlan(plan)).toMatchSnapshot(); - }); - - // ── round-2 ────────────────────────────────────────────────────────────── - - test("connection slug bound to a different connector remotely is a hard error", () => { - expect(() => - computeDiff(connectorState(), { - ...emptyRemote(), - connectorDefinitions: [builtinConnectorDef], - connections: [ - { - id: 9, - slug: "hn-frontpage", - connector_key: "rss", - status: "active", - auth_profile_slug: null, - app_auth_profile_slug: null, - config: {}, - }, - ], - }), - ).toThrow(/bound to connector "rss" remotely.*declares "hackernews"/); - }); - - test("auth-profile slug bound to a different kind remotely is a hard error", () => { - expect(() => - computeDiff(connectorState(), { - ...emptyRemote(), - connectorDefinitions: [builtinConnectorDef], - authProfiles: [ - { - slug: "hn-token", - connector_key: "hackernews", - profile_kind: "oauth_app", - status: "active", - }, - ], - }), - ).toThrow(/auth_profile "hn-token" is bound to hackernews\/oauth_app/); - }); - - test("credential rotation re-pushes: env profile shows update (credentials)", () => { - const plan = computeDiff(connectorState(), { - ...emptyRemote(), - connectorDefinitions: [builtinConnectorDef], - authProfiles: [ - { - slug: "hn-token", - display_name: "HN token", - connector_key: "hackernews", - profile_kind: "env", - status: "active", - }, - ], - }); - const row = plan.rows.find( - (r) => r.kind === "auth-profile" && r.id === "hn-token", - ); - expect(row?.verb).toBe("update"); - expect(row && "changedFields" in row ? row.changedFields : []).toContain( - "credentials", - ); - }); - - test("a fully-converged remote state produces no connector create/update (except idempotent connector-def re-push)", () => { - // Build a remote snapshot that exactly mirrors connectorState(): the env - // auth profile has no declared-credential drift suppression, so it would - // re-push (update credentials). The acme connector def is installed, so it - // shows as a (no-op-on-server) "update". Everything else is noop. - const remote: RemoteSnapshot = { - ...emptyRemote(), - connectorDefinitions: [ - { key: "hackernews", installed: false, installable: true }, - { key: "x", installed: false, installable: true }, - { key: "acme", installed: true, installable: false }, - ], - authProfiles: [ - { - slug: "hn-token", - display_name: "HN token", - connector_key: "hackernews", - profile_kind: "env", - status: "active", - }, - { - slug: "x-account", - connector_key: "x", - profile_kind: "oauth_account", - status: "active", - }, - ], - connections: [ - { - id: 7, - slug: "hn-frontpage", - connector_key: "hackernews", - display_name: "HN front page", - status: "active", - auth_profile_slug: "hn-token", - app_auth_profile_slug: null, - config: {}, - }, - ], - feedsByConnectionId: new Map([ - [ - 7, - [ - { - id: 11, - connection_id: 7, - feed_key: "stories", - status: "active", - schedule: "0 * * * *", - config: {}, - }, - ], - ], - ]), - }; - const plan = computeDiff(connectorState(), remote); - // Only "update" rows allowed: the connector-def re-push and the - // env-credential re-push — both idempotent on the server. - const nonIdempotentChurn = plan.rows.filter( - (r) => - (r.verb === "create" || r.verb === "update") && - !(r.kind === "connector-definition") && - !(r.kind === "auth-profile" && r.id === "hn-token"), - ); - expect(nonIdempotentChurn).toEqual([]); - expect(plan.notes).toEqual([]); - }); - - test("connector-definition with an already-installed key renders as update, not create", () => { - const installedAcme = { key: "acme", installed: true, installable: false }; - const plan = computeDiff(connectorState(), { - ...emptyRemote(), - connectorDefinitions: [builtinConnectorDef, installedAcme], - }); - // connectorState()'s acme def has key:"acme"; it is installed remotely. - const row = plan.rows.find( - (r) => r.kind === "connector-definition" && r.id?.startsWith("acme"), - ); - expect(row?.verb).toBe("update"); - }); - - // ── round-4 ────────────────────────────────────────────────────────────── - - test("referenced-but-not-installed bundled connector becomes a connector-definition create row", () => { - const plan = computeDiff(connectorState(), { - ...emptyRemote(), - connectorDefinitions: [ - // hackernews: installable + has a server-side source_uri, not installed - { - key: "hackernews", - installed: false, - installable: true, - source_uri: "file:///app/connectors/hackernews.ts", - }, - // x: same - { - key: "x", - installed: false, - installable: true, - source_uri: "file:///app/connectors/x.ts", - }, - ], - }); - const hn = plan.rows.find( - (r) => r.kind === "connector-definition" && r.id === "hackernews", - ); - expect(hn?.verb).toBe("create"); - const x = plan.rows.find( - (r) => r.kind === "connector-definition" && r.id === "x", - ); - expect(x?.verb).toBe("create"); - // acme is locally declared (sourcePath) — it still gets its own row. - expect( - plan.rows.some( - (r) => r.kind === "connector-definition" && r.id?.startsWith("acme"), - ), - ).toBe(true); - }); - - test("a locally-supplied connector key is NOT also a bundled-install row (no double mutation)", () => { - // Pretend "acme" is *also* in the bundled catalog with a source_uri; the - // local .connector.ts should win — no bundled row for "acme". - const state = connectorState(); - // Make a connection reference "acme" so it's in referencedConnectorKeys. - state.connectors.connections.push({ - slug: "acme-conn", - connector: "acme", - feeds: [], - sourceFile: "connectors/acme.yaml", - }); - const plan = computeDiff(state, { - ...emptyRemote(), - connectorDefinitions: [ - { - key: "acme", - installed: false, - installable: true, - source_uri: "file:///app/connectors/acme.ts", - }, - ], - }); - const acmeRows = plan.rows.filter( - (r) => r.kind === "connector-definition" && r.id?.startsWith("acme"), - ); - // Exactly one row — the locally-declared def — never a bundled duplicate. - expect(acmeRows).toHaveLength(1); - }); + const builtinConnectorDef = { + key: "hackernews", + name: "Hacker News", + installed: false, + installable: true, + }; + + function connectorState() { + return buildState([], { + connectors: { + definitions: [ + { + key: "acme", + sourcePath: "/proj/connectors/acme.connector.ts", + sourceCode: "export default class {}", + sourceFile: "connectors/acme.connector.ts", + }, + ], + authProfiles: [ + { + slug: "hn-token", + connector: "hackernews", + kind: "env" as const, + name: "HN token", + credentials: { HN_TOKEN: "$HN_TOKEN" }, + sourceFile: "connectors/hackernews.yaml", + }, + { + slug: "x-account", + connector: "x", + kind: "oauth_account" as const, + sourceFile: "connectors/x.yaml", + }, + ], + connections: [ + { + slug: "hn-frontpage", + connector: "hackernews", + name: "HN front page", + authProfileSlug: "hn-token", + feeds: [{ feedKey: "stories", schedule: "0 * * * *" }], + sourceFile: "connectors/hackernews.yaml", + }, + ], + }, + }); + } + + test("create verbs for new connector def, auth profile, connection, feed", () => { + const plan = computeDiff(connectorState(), { + ...emptyRemote(), + connectorDefinitions: [builtinConnectorDef], + }); + const def = plan.rows.find((r) => r.kind === "connector-definition"); + expect(def?.verb).toBe("create"); + const authEnv = plan.rows.find( + (r) => r.kind === "auth-profile" && r.id === "hn-token" + ); + expect(authEnv?.verb).toBe("create"); + const authOauth = plan.rows.find( + (r) => r.kind === "auth-profile" && r.id === "x-account" + ); + expect(authOauth?.verb).toBe("create"); + expect( + authOauth && "needsAuth" in authOauth ? authOauth.needsAuth : undefined + ).toBe(true); + const conn = plan.rows.find((r) => r.kind === "connection"); + expect(conn?.verb).toBe("create"); + const feed = plan.rows.find((r) => r.kind === "feed"); + expect(feed?.verb).toBe("create"); + expect(feed?.id).toBe("hn-frontpage/stories"); + }); + + test("noop when connection + feed already match remotely", () => { + const remote: RemoteSnapshot = { + ...emptyRemote(), + connectorDefinitions: [builtinConnectorDef], + authProfiles: [ + { + slug: "hn-token", + display_name: "HN token", + connector_key: "hackernews", + profile_kind: "env", + status: "active", + }, + { + slug: "x-account", + connector_key: "x", + profile_kind: "oauth_account", + status: "active", + }, + ], + connections: [ + { + id: 7, + slug: "hn-frontpage", + connector_key: "hackernews", + display_name: "HN front page", + status: "active", + auth_profile_slug: "hn-token", + app_auth_profile_slug: null, + config: {}, + }, + ], + feedsByConnectionId: new Map([ + [ + 7, + [ + { + id: 11, + connection_id: 7, + feed_key: "stories", + status: "active", + schedule: "0 * * * *", + config: {}, + }, + ], + ], + ]), + }; + const plan = computeDiff(connectorState(), remote); + expect(plan.rows.find((r) => r.kind === "connection")?.verb).toBe("noop"); + expect(plan.rows.find((r) => r.kind === "feed")?.verb).toBe("noop"); + expect( + plan.rows.find((r) => r.kind === "auth-profile" && r.id === "x-account") + ?.verb + ).toBe("noop"); + }); + + test("update when feed schedule changes; needs-auth when oauth profile inactive", () => { + const remote: RemoteSnapshot = { + ...emptyRemote(), + connectorDefinitions: [builtinConnectorDef], + authProfiles: [ + { + slug: "hn-token", + display_name: "HN token", + connector_key: "hackernews", + profile_kind: "env", + status: "active", + }, + { + slug: "x-account", + connector_key: "x", + profile_kind: "oauth_account", + status: "pending_auth", + }, + ], + connections: [ + { + id: 7, + slug: "hn-frontpage", + connector_key: "hackernews", + display_name: "HN front page", + status: "active", + auth_profile_slug: "hn-token", + app_auth_profile_slug: null, + config: {}, + }, + ], + feedsByConnectionId: new Map([ + [ + 7, + [ + { + id: 11, + connection_id: 7, + feed_key: "stories", + status: "active", + schedule: "0 0 * * *", + config: {}, + }, + ], + ], + ]), + }; + const plan = computeDiff(connectorState(), remote); + const feed = plan.rows.find((r) => r.kind === "feed"); + expect(feed?.verb).toBe("update"); + expect(feed && "changedFields" in feed ? feed.changedFields : []).toEqual([ + "schedule", + ]); + const authOauth = plan.rows.find( + (r) => r.kind === "auth-profile" && r.id === "x-account" + ); + expect( + authOauth && "needsAuth" in authOauth ? authOauth.needsAuth : undefined + ).toBe(true); + }); + + test("undeclared remote connector becomes an informational note (no uninstall)", () => { + const remote: RemoteSnapshot = { + ...emptyRemote(), + connectorDefinitions: [ + builtinConnectorDef, + { + key: "legacy", + name: "Legacy", + installed: true, + installable: false, + }, + ], + }; + const plan = computeDiff(connectorState(), remote); + expect(plan.notes.some((n) => n.includes('"legacy"'))).toBe(true); + expect( + plan.rows.some( + (r) => r.kind === "connector-definition" && r.id === "legacy" + ) + ).toBe(false); + }); + + test("connectors are skipped when --only is set", () => { + const plan = computeDiff(connectorState(), emptyRemote(), { + only: "agents", + }); + expect(plan.rows.some((r) => r.kind === "connection")).toBe(false); + expect(plan.rows.some((r) => r.kind === "connector-definition")).toBe( + false + ); + }); + + test("render includes the connectors sections", () => { + const plan = computeDiff(connectorState(), { + ...emptyRemote(), + connectorDefinitions: [builtinConnectorDef], + }); + expect(renderPlan(plan)).toMatchSnapshot(); + }); + + // ── round-2 ────────────────────────────────────────────────────────────── + + test("connection slug bound to a different connector remotely is a hard error", () => { + expect(() => + computeDiff(connectorState(), { + ...emptyRemote(), + connectorDefinitions: [builtinConnectorDef], + connections: [ + { + id: 9, + slug: "hn-frontpage", + connector_key: "rss", + status: "active", + auth_profile_slug: null, + app_auth_profile_slug: null, + config: {}, + }, + ], + }) + ).toThrow(/bound to connector "rss" remotely.*declares "hackernews"/); + }); + + test("auth-profile slug bound to a different kind remotely is a hard error", () => { + expect(() => + computeDiff(connectorState(), { + ...emptyRemote(), + connectorDefinitions: [builtinConnectorDef], + authProfiles: [ + { + slug: "hn-token", + connector_key: "hackernews", + profile_kind: "oauth_app", + status: "active", + }, + ], + }) + ).toThrow(/auth_profile "hn-token" is bound to hackernews\/oauth_app/); + }); + + test("credential rotation re-pushes: env profile shows update (credentials)", () => { + const plan = computeDiff(connectorState(), { + ...emptyRemote(), + connectorDefinitions: [builtinConnectorDef], + authProfiles: [ + { + slug: "hn-token", + display_name: "HN token", + connector_key: "hackernews", + profile_kind: "env", + status: "active", + }, + ], + }); + const row = plan.rows.find( + (r) => r.kind === "auth-profile" && r.id === "hn-token" + ); + expect(row?.verb).toBe("update"); + expect(row && "changedFields" in row ? row.changedFields : []).toContain( + "credentials" + ); + }); + + test("a fully-converged remote state produces no connector create/update (except idempotent connector-def re-push)", () => { + // Build a remote snapshot that exactly mirrors connectorState(): the env + // auth profile has no declared-credential drift suppression, so it would + // re-push (update credentials). The acme connector def is installed, so it + // shows as a (no-op-on-server) "update". Everything else is noop. + const remote: RemoteSnapshot = { + ...emptyRemote(), + connectorDefinitions: [ + { key: "hackernews", installed: false, installable: true }, + { key: "x", installed: false, installable: true }, + { key: "acme", installed: true, installable: false }, + ], + authProfiles: [ + { + slug: "hn-token", + display_name: "HN token", + connector_key: "hackernews", + profile_kind: "env", + status: "active", + }, + { + slug: "x-account", + connector_key: "x", + profile_kind: "oauth_account", + status: "active", + }, + ], + connections: [ + { + id: 7, + slug: "hn-frontpage", + connector_key: "hackernews", + display_name: "HN front page", + status: "active", + auth_profile_slug: "hn-token", + app_auth_profile_slug: null, + config: {}, + }, + ], + feedsByConnectionId: new Map([ + [ + 7, + [ + { + id: 11, + connection_id: 7, + feed_key: "stories", + status: "active", + schedule: "0 * * * *", + config: {}, + }, + ], + ], + ]), + }; + const plan = computeDiff(connectorState(), remote); + // Only "update" rows allowed: the connector-def re-push and the + // env-credential re-push — both idempotent on the server. + const nonIdempotentChurn = plan.rows.filter( + (r) => + (r.verb === "create" || r.verb === "update") && + !(r.kind === "connector-definition") && + !(r.kind === "auth-profile" && r.id === "hn-token") + ); + expect(nonIdempotentChurn).toEqual([]); + expect(plan.notes).toEqual([]); + }); + + test("connector-definition with an already-installed key renders as update, not create", () => { + const installedAcme = { key: "acme", installed: true, installable: false }; + const plan = computeDiff(connectorState(), { + ...emptyRemote(), + connectorDefinitions: [builtinConnectorDef, installedAcme], + }); + // connectorState()'s acme def has key:"acme"; it is installed remotely. + const row = plan.rows.find( + (r) => r.kind === "connector-definition" && r.id?.startsWith("acme") + ); + expect(row?.verb).toBe("update"); + }); + + // ── round-4 ────────────────────────────────────────────────────────────── + + test("referenced-but-not-installed bundled connector becomes a connector-definition create row", () => { + const plan = computeDiff(connectorState(), { + ...emptyRemote(), + connectorDefinitions: [ + // hackernews: installable + has a server-side source_uri, not installed + { + key: "hackernews", + installed: false, + installable: true, + source_uri: "file:///app/connectors/hackernews.ts", + }, + // x: same + { + key: "x", + installed: false, + installable: true, + source_uri: "file:///app/connectors/x.ts", + }, + ], + }); + const hn = plan.rows.find( + (r) => r.kind === "connector-definition" && r.id === "hackernews" + ); + expect(hn?.verb).toBe("create"); + const x = plan.rows.find( + (r) => r.kind === "connector-definition" && r.id === "x" + ); + expect(x?.verb).toBe("create"); + // acme is locally declared (sourcePath) — it still gets its own row. + expect( + plan.rows.some( + (r) => r.kind === "connector-definition" && r.id?.startsWith("acme") + ) + ).toBe(true); + }); + + test("a locally-supplied connector key is NOT also a bundled-install row (no double mutation)", () => { + // Pretend "acme" is *also* in the bundled catalog with a source_uri; the + // local .connector.ts should win — no bundled row for "acme". + const state = connectorState(); + // Make a connection reference "acme" so it's in referencedConnectorKeys. + state.connectors.connections.push({ + slug: "acme-conn", + connector: "acme", + feeds: [], + sourceFile: "connectors/acme.yaml", + }); + const plan = computeDiff(state, { + ...emptyRemote(), + connectorDefinitions: [ + { + key: "acme", + installed: false, + installable: true, + source_uri: "file:///app/connectors/acme.ts", + }, + ], + }); + const acmeRows = plan.rows.filter( + (r) => r.kind === "connector-definition" && r.id?.startsWith("acme") + ); + // Exactly one row — the locally-declared def — never a bundled duplicate. + expect(acmeRows).toHaveLength(1); + }); }); diff --git a/packages/server/src/gateway/__tests__/connections-platform-isolation.test.ts b/packages/server/src/gateway/__tests__/connections-platform-isolation.test.ts index f1bbe9158..3d573a895 100644 --- a/packages/server/src/gateway/__tests__/connections-platform-isolation.test.ts +++ b/packages/server/src/gateway/__tests__/connections-platform-isolation.test.ts @@ -47,6 +47,10 @@ function makeConnection( ): PlatformConnection { return { id: connectionId, + // Required by the bridge's per-tenant `pending_interactions` write — + // a connection without an org would be dropped before `resolveThread` + // (where `instanceChat.channel()` is asserted). + organizationId: "test-org", platform, config: { platform } as PlatformAdapterConfig, settings: {},