diff --git a/packages/owletto b/packages/owletto index 8a3c9bbe1..dcb2172f4 160000 --- a/packages/owletto +++ b/packages/owletto @@ -1 +1 @@ -Subproject commit 8a3c9bbe1fbee757419048095db45309c938fa03 +Subproject commit dcb2172f44408ff234a0fc9d53278828abd6c7ea diff --git a/packages/server/src/gateway/__tests__/multi-tenant-isolation-reproducers.test.ts b/packages/server/src/gateway/__tests__/multi-tenant-isolation-reproducers.test.ts index c99cd37d3..f162f77e6 100644 --- a/packages/server/src/gateway/__tests__/multi-tenant-isolation-reproducers.test.ts +++ b/packages/server/src/gateway/__tests__/multi-tenant-isolation-reproducers.test.ts @@ -135,6 +135,103 @@ describe("[finding 1] lookupPlaceholderMapping enforces caller's expected org", expect(mapping?.organizationId).toBe("org-a"); }); + // Fix #2 — legacy mapping bypass. + // + // Pre-fix: the org check was gated on `mapping.organizationId` being set, + // so a legacy mapping minted before the org-id pivot (no organizationId) + // sailed through under any expected org. A worker from org B could + // resolve a legacy mapping owned by org A under org B's request URL. + // Post-fix: presence-on-either-side forces a match — a legacy mapping + // can no longer be resolved under any expected-org context. + test("legacy mapping (no organizationId) is rejected when expected org is set", () => { + // Mint a "legacy" placeholder — no organizationId option. Pre-pivot + // shape that may still be in flight from older mint sites. + const placeholder = generatePlaceholder( + "agent-legacy", + "OPENAI_API_KEY", + createBuiltinSecretRef("deployments/agent-legacy/OPENAI_API_KEY"), + "deploy-legacy" + // no { organizationId } — this is the legacy shape. + ); + // A caller from org B presents this legacy placeholder. Pre-fix the + // check skipped entirely because `mapping.organizationId` was + // undefined. Post-fix: the check fires and the lookup rejects. + expect(lookupPlaceholderMapping(placeholder, "org-b")).toBeNull(); + }); + + // Legacy-mapping access with no expected org still resolves (so existing + // call sites that don't yet thread expectedOrganizationId aren't broken). + // The WARN log is the deprecation signal — we don't assert on it here, + // but it's emitted on every such call. + test("legacy mapping with no expected org still resolves (warn path)", () => { + const placeholder = generatePlaceholder( + "agent-legacy-2", + "OPENAI_API_KEY", + createBuiltinSecretRef("deployments/agent-legacy-2/OPENAI_API_KEY"), + "deploy-legacy-2" + ); + const mapping = lookupPlaceholderMapping(placeholder); + expect(mapping?.agentId).toBe("agent-legacy-2"); + expect(mapping?.organizationId).toBeUndefined(); + }); + + // Fix #1 — fail-closed on agentOrgResolver DB error. + // + // Pre-fix the catch block logged a warning and fell through with + // `expectedOrganizationId = undefined`. A worker from any org could + // present a placeholder during a transient DB error window and the + // downstream binding step would never get its org-anchor → potential + // cross-tenant access. Post-fix: 503 on resolver error. + test("SecretProxy.forward rejects with 503 when agentOrgResolver throws", async () => { + const placeholder = generatePlaceholder( + "agent-x", + "OPENAI_API_KEY", + createBuiltinSecretRef("deployments/agent-x/OPENAI_API_KEY"), + "deploy-x", + { organizationId: "org-a" } + ); + + const stubStore: SecretStore = { get: async () => "real-secret-x" }; + const proxy = new SecretProxy( + { defaultUpstreamUrl: "https://upstream.example.com" }, + stubStore + ); + proxy.registerUpstream( + { slug: "openai", upstreamBaseUrl: "https://api.openai.example.com" }, + "openai" + ); + // The resolver throws — simulates a transient DB error in + // `agentOrgResolver`. Pre-fix: warning + fall through, request + // forwarded with no org expectation. Post-fix: 503. + proxy.setAgentOrgResolver(async () => { + throw new Error("simulated DB hiccup"); + }); + + let upstreamCalled = false; + const originalFetch = globalThis.fetch; + globalThis.fetch = async () => { + upstreamCalled = true; + return new Response("{}", { status: 200 }); + }; + + try { + const res = await proxy + .getApp() + .request("/api/proxy/openai/a/agent-x/v1/chat/completions", { + method: "POST", + headers: { + "content-type": "application/json", + authorization: `Bearer ${placeholder}`, + }, + body: JSON.stringify({ prompt: "test" }), + }); + expect(res.status).toBe(503); + expect(upstreamCalled).toBe(false); + } finally { + globalThis.fetch = originalFetch; + } + }); + test("SecretProxy.forward rejects an org-A placeholder used on an org-B agent's URL", async () => { // Mint a placeholder for org A's `agent-A1`. Pre-fix, no production call // site supplied `expectedOrganizationId`, so `lookupPlaceholderMapping` diff --git a/packages/server/src/gateway/__tests__/pending-interaction-cleanup.test.ts b/packages/server/src/gateway/__tests__/pending-interaction-cleanup.test.ts new file mode 100644 index 000000000..831e4c5e6 --- /dev/null +++ b/packages/server/src/gateway/__tests__/pending-interaction-cleanup.test.ts @@ -0,0 +1,312 @@ +/** + * Cleanup-path tests for `pending_interactions`: + * + * - Retry-on-conflict preserves `created_at`. Pre-fix, `ON CONFLICT + * … created_at = now()` reset the 24h TTL clock on every webhook retry, + * so a misbehaving retry loop could keep the same row alive + * indefinitely. Post-fix: `created_at` survives retries. + * - `sweepStalePendingInteractions` honours its LIMIT. Pre-fix the DELETE + * was unbounded, so a multi-million-row backlog could lock the table. + * Post-fix: default 1000 / configurable cap, remaining rows drain + * across subsequent cycles. + * - `deletePendingQuestion` hard-deletes the row instead of just setting + * `claimed_at`. Pre-fix, the post-failure drop path called + * `claimPendingQuestion` (UPDATE), leaving a stale row sitting there + * until the 24h sweep. Post-fix: row count goes to 0 immediately. + */ +import { beforeAll, beforeEach, describe, expect, test } from "bun:test"; +import { getDb } from "../../db/client.js"; +import { registerInteractionBridge } from "../connections/interaction-bridge.js"; +import { + claimPendingQuestion, + deletePendingQuestion, + storePendingQuestion, + sweepStalePendingInteractions, +} from "../connections/pending-interaction-store.js"; +import { InteractionService, type PostedQuestion } from "../interactions.js"; +import { + ensurePgliteForGatewayTests, + resetTestDatabase, +} from "./helpers/db-setup.js"; + +const ORG_A = "org-a"; +const CONN_A = "conn-a"; +const USER_A = "U_A"; + +async function seedOrg(id: string): Promise { + const sql = getDb(); + await sql` + INSERT INTO organization (id, name, slug) + VALUES (${id}, ${id}, ${id}) + ON CONFLICT (id) DO NOTHING + `; +} + +function buildQuestion(id: string, userId = USER_A): PostedQuestion { + return { + id, + teamId: undefined, + channelId: "C1", + conversationId: "C1", + userId, + platform: "slack", + question: "go?", + options: ["yes", "no"], + } as PostedQuestion; +} + +describe("pending-interaction-store cleanup paths", () => { + beforeAll(async () => { + await ensurePgliteForGatewayTests(); + }); + beforeEach(async () => { + await resetTestDatabase(); + await seedOrg(ORG_A); + }); + + // Fix #3 — ON CONFLICT no longer resets created_at. + test("retry storePendingQuestion preserves the original created_at", async () => { + const sql = getDb(); + const q = buildQuestion("q-retry"); + + // First persist. + await storePendingQuestion(q.id, ORG_A, CONN_A, USER_A, { question: q }); + const initial = await sql<{ created_at: Date }>` + SELECT created_at FROM pending_interactions WHERE id = ${q.id} + `; + const originalCreatedAt = new Date(initial[0]!.created_at).getTime(); + + // Backdate the row so a "retry preserves created_at" assertion is + // distinguishable from "row was just inserted now()". Without this the + // pre/post-fix versions could both end up at now() within the same ms. + await sql` + UPDATE pending_interactions + SET created_at = now() - interval '1 hour' + WHERE id = ${q.id} + `; + const backdated = await sql<{ created_at: Date }>` + SELECT created_at FROM pending_interactions WHERE id = ${q.id} + `; + const backdatedTs = new Date(backdated[0]!.created_at).getTime(); + expect(backdatedTs).toBeLessThan(originalCreatedAt); + + // Webhook retry — same id, same scope, slightly different payload. + const retry = { ...q, question: "go? (retry)" }; + await storePendingQuestion(retry.id, ORG_A, CONN_A, USER_A, { + question: retry, + }); + + const after = await sql<{ created_at: Date; claimed_at: Date | null }>` + SELECT created_at, claimed_at + FROM pending_interactions WHERE id = ${q.id} + `; + const afterTs = new Date(after[0]!.created_at).getTime(); + + // Pre-fix: this assertion fails — the ON CONFLICT clause moved + // created_at to now() and `afterTs` ≈ now() ≫ `backdatedTs`. + // Post-fix: created_at is unchanged across retries. + expect(afterTs).toBe(backdatedTs); + // claimed_at is still reset on conflict so a legitimate retry can + // be claimed. + expect(after[0]!.claimed_at).toBeNull(); + }); + + // Fix #6 — bounded sweep. + test("sweepStalePendingInteractions honours the LIMIT and remainder drains next cycle", async () => { + const sql = getDb(); + const total = 25; + for (let i = 0; i < total; i++) { + const q = buildQuestion(`q-${i}`); + await storePendingQuestion(q.id, ORG_A, CONN_A, USER_A, { question: q }); + } + // Backdate all rows past the cutoff. + await sql` + UPDATE pending_interactions + SET created_at = now() - interval '48 hours' + `; + + // First sweep caps at LIMIT=10. + const first = await sweepStalePendingInteractions( + 24 * 60 * 60 * 1000, + 10 + ); + expect(first).toHaveLength(10); + + const remainingAfterFirst = await sql<{ c: number }>` + SELECT COUNT(*)::int AS c FROM pending_interactions + `; + expect(remainingAfterFirst[0]!.c).toBe(total - 10); + + // Second sweep drains the rest (LIMIT > remaining). + const second = await sweepStalePendingInteractions( + 24 * 60 * 60 * 1000, + 100 + ); + expect(second).toHaveLength(total - 10); + + const remainingAfterSecond = await sql<{ c: number }>` + SELECT COUNT(*)::int AS c FROM pending_interactions + `; + expect(remainingAfterSecond[0]!.c).toBe(0); + }); + + // Fix #7 — drop-on-post-failure uses DELETE not claim. + test("deletePendingQuestion hard-deletes the row, not claim-then-leave", async () => { + const sql = getDb(); + const q = buildQuestion("q-drop"); + await storePendingQuestion(q.id, ORG_A, CONN_A, USER_A, { question: q }); + + const beforeDrop = await sql<{ c: number }>` + SELECT COUNT(*)::int AS c FROM pending_interactions WHERE id = ${q.id} + `; + expect(beforeDrop[0]!.c).toBe(1); + + const deleted = await deletePendingQuestion(q.id, ORG_A, CONN_A, USER_A); + expect(deleted).toBe(true); + + // The whole row is gone. A subsequent claim sees no row. + const afterDrop = await sql<{ c: number }>` + SELECT COUNT(*)::int AS c FROM pending_interactions WHERE id = ${q.id} + `; + expect(afterDrop[0]!.c).toBe(0); + expect(await claimPendingQuestion(q.id, ORG_A, CONN_A, USER_A)).toBeNull(); + }); + + // Production-call-site test for Fix #7. + // + // The unit tests above prove `deletePendingQuestion` is correct in + // isolation, but they would still pass if `registerInteractionBridge` + // kept calling `claimPendingQuestion` on post failure. Drive the + // bridge end-to-end here: emit `question:created`, force the thread + // post to fail, then assert the row is gone (not just claimed_at-set). + test("interaction-bridge: post-failure path DELETEs the pending row (not claim-only)", async () => { + const sql = getDb(); + const connectionId = CONN_A; + const questionId = "q-bridge-postfail"; + + // Mocked ChatInstanceManager: `has` says yes, `getInstance` returns a + // stub whose chat.channel() yields a thread whose .post() rejects — + // forces the bridge into the post-failure branch in + // `interaction-bridge.ts:onQuestionCreated`. + const postSpy = { + calls: 0, + lastError: undefined as unknown, + }; + const mockThread = { + post: async () => { + postSpy.calls += 1; + const err = new Error("simulated post failure"); + postSpy.lastError = err; + throw err; + }, + }; + const mockChat = { + channel: () => mockThread, + // registerActionHandlers wires `chat.onAction(...)`. We don't care + // about action dispatch for this test (we drive question:created + // directly); just make it a no-op so registration succeeds. + onAction: () => undefined, + }; + const manager = { + has: (id: string) => id === connectionId, + getInstance: (id: string) => + id === connectionId + ? { + chat: mockChat, + connection: { + id: connectionId, + platform: "slack", + organizationId: ORG_A, + }, + } + : undefined, + } as any; + const connection = { + id: connectionId, + platform: "slack", + organizationId: ORG_A, + } as any; + + const interactionService = new InteractionService(); + const unregister = registerInteractionBridge( + interactionService, + manager, + connection, + mockChat as any + ); + + try { + // Drive a question through the bridge. Use the bare emit (rather + // than postQuestion) so the test doesn't depend on the public + // factory's id-generation; we want a known id we can SELECT for. + const event: PostedQuestion = { + id: questionId, + teamId: undefined, + channelId: "C1", + conversationId: "C1", + userId: USER_A, + connectionId, + platform: "slack", + question: "go?", + options: ["yes", "no"], + } as PostedQuestion; + interactionService.emit("question:created", event); + + // The handler is async (store → dynamic-import → post → fail → + // delete). Wait for `thread.post` to be called first, then poll + // for the row to disappear. + const start = Date.now(); + while (postSpy.calls === 0 && Date.now() - start < 5000) { + await new Promise((r) => setTimeout(r, 25)); + } + let remaining = 1; + while (Date.now() - start < 5000) { + const rows = await sql<{ c: number }>` + SELECT COUNT(*)::int AS c + FROM pending_interactions WHERE id = ${questionId} + `; + remaining = rows[0]!.c; + if (remaining === 0) break; + await new Promise((r) => setTimeout(r, 25)); + } + + expect(postSpy.calls).toBeGreaterThan(0); + // Row must be GONE — pre-fix, this assertion fails because + // claimPendingQuestion only sets claimed_at and leaves the row. + expect(remaining).toBe(0); + } finally { + unregister(); + } + }); + + // Scoping invariant of deletePendingQuestion — same safety as claim. + test("deletePendingQuestion is scoped by (org, connection, user) — leaked id alone cannot delete", async () => { + const sql = getDb(); + await seedOrg("org-b"); + const q = buildQuestion("q-scope"); + await storePendingQuestion(q.id, ORG_A, CONN_A, USER_A, { question: q }); + + // Wrong org — no row deleted. + expect( + await deletePendingQuestion(q.id, "org-b", CONN_A, USER_A) + ).toBe(false); + // Wrong connection — no row deleted. + expect( + await deletePendingQuestion(q.id, ORG_A, "conn-other", USER_A) + ).toBe(false); + // Wrong user — no row deleted. + expect( + await deletePendingQuestion(q.id, ORG_A, CONN_A, "U_B") + ).toBe(false); + + const stillThere = await sql<{ c: number }>` + SELECT COUNT(*)::int AS c FROM pending_interactions WHERE id = ${q.id} + `; + expect(stillThere[0]!.c).toBe(1); + + // Correct scope — deletes. + expect( + await deletePendingQuestion(q.id, ORG_A, CONN_A, USER_A) + ).toBe(true); + }); +}); diff --git a/packages/server/src/gateway/__tests__/secret-proxy.test.ts b/packages/server/src/gateway/__tests__/secret-proxy.test.ts index 0da8af361..8310e23b1 100644 --- a/packages/server/src/gateway/__tests__/secret-proxy.test.ts +++ b/packages/server/src/gateway/__tests__/secret-proxy.test.ts @@ -135,16 +135,35 @@ describe("lookupPlaceholderMapping org scoping", () => { expect(mapping).toBeNull(); }); - test("falls through when mapping has no org tag (legacy)", () => { + test("rejects when mapping has no org tag (legacy) and caller has an expected org", () => { const placeholder = generatePlaceholder( "agent-1", "API_KEY", createBuiltinSecretRef("deployments/agent-1/API_KEY"), "deploy-1" ); - // Mapping has no org → caller's expectation isn't enforceable. - const mapping = lookupPlaceholderMapping(placeholder, "org-a"); + // Previously this fell through ("legacy mapping isn't enforceable") — + // that was the bypass that let a caller from any org resolve a + // legacy unscoped mapping. With an expected org supplied, the check + // now fires and rejects regardless of whether the mapping has its + // own org tag. + expect(lookupPlaceholderMapping(placeholder, "org-a")).toBeNull(); + }); + + test("legacy mapping resolves when no expected org is supplied (warn path)", () => { + const placeholder = generatePlaceholder( + "agent-1", + "API_KEY", + createBuiltinSecretRef("deployments/agent-1/API_KEY"), + "deploy-1" + ); + // A caller that doesn't pass `expectedOrganizationId` still resolves + // the legacy mapping (so existing un-org-scoped call sites aren't + // broken). The WARN log emitted on every such call is the + // deprecation signal. + const mapping = lookupPlaceholderMapping(placeholder); expect(mapping?.agentId).toBe("agent-1"); + expect(mapping?.organizationId).toBeUndefined(); }); }); diff --git a/packages/server/src/gateway/connections/interaction-bridge.ts b/packages/server/src/gateway/connections/interaction-bridge.ts index d79ee130a..57484ae2c 100644 --- a/packages/server/src/gateway/connections/interaction-bridge.ts +++ b/packages/server/src/gateway/connections/interaction-bridge.ts @@ -14,8 +14,8 @@ import type { GrantStore } from "../permissions/grant-store.js"; import type { ChatInstanceManager } from "./chat-instance-manager.js"; import { claimPendingQuestion, + deletePendingQuestion, storePendingQuestion, - sweepStalePendingInteractions, } from "./pending-interaction-store.js"; import type { PlatformConnection } from "./types.js"; @@ -201,11 +201,12 @@ export function registerInteractionBridge( // `SentMessage` (used to strip card buttons on click) — losing it // cross-pod is best-effort UX, not correctness. // - // Each entry remembers `registeredAt` so the periodic sweep can evict - // stale handles that match the 24h DB-row TTL. Without this the Map would - // grow unbounded for questions that are never clicked. The sweep also - // removes the local handle for any row the DB sweeper actually deleted, - // so the two stay in sync. + // DB-row sweeping is owned globally by `coreServices.sweepEphemeralTables` + // (scheduled every 5 minutes in `packages/server/src/scheduled/jobs.ts`). + // We do NOT call `sweepStalePendingInteractions` per-bridge — N bridges + // hitting the same table N times is wasted work. The local sweep below + // is in-memory only: it evicts cache entries past their TTL so the Map + // doesn't grow unbounded for questions that are never clicked. const PENDING_SENT_TTL_MS = 24 * 60 * 60 * 1000; const PENDING_SENT_SWEEP_INTERVAL_MS = 60 * 60 * 1000; interface CachedSent { @@ -214,37 +215,14 @@ export function registerInteractionBridge( } const pendingSentMessages = new Map(); const pendingSentSweepTimer = setInterval(() => { - sweepPendingSent().catch((error) => { - logger.warn( - { connectionId, error: String(error) }, - "pendingSentMessages sweep failed" - ); - }); - }, PENDING_SENT_SWEEP_INTERVAL_MS); - pendingSentSweepTimer.unref?.(); - async function sweepPendingSent(): Promise { const ttlCutoff = Date.now() - PENDING_SENT_TTL_MS; for (const [id, entry] of pendingSentMessages) { if (entry.registeredAt <= ttlCutoff) { pendingSentMessages.delete(id); } } - // Also drop local handles for any DB rows the scheduled sweeper just - // deleted — keeps the local cache from outliving its DB row. - let deletedIds: string[] = []; - try { - deletedIds = await sweepStalePendingInteractions(); - } catch (error) { - // The store logs its own DB errors; treat as best-effort here. - logger.debug( - { connectionId, error: String(error) }, - "sweepStalePendingInteractions failed during local sweep" - ); - } - for (const id of deletedIds) { - pendingSentMessages.delete(id); - } - } + }, PENDING_SENT_SWEEP_INTERVAL_MS); + pendingSentSweepTimer.unref?.(); /** * Persist a pending question row, then cache its SentMessage handle so a * click on this pod can edit the card. The persist happens first — see @@ -356,11 +334,15 @@ export function registerInteractionBridge( ); if (!sent) { // Post failed entirely. The row exists but no card was rendered, - // so a click can never come — drop the row to keep the table - // clean. The DB sweep would catch it eventually; doing it now is - // cheaper and avoids a 24h-stale row. + // so a click can never come — DELETE the row to keep the table + // clean. Pre-fix used `claimPendingQuestion` (UPDATE setting + // claimed_at), which leaves a phantom row sitting around with + // claimed_at set until the 24h sweep. Hard-delete is the + // semantically correct end state, and the four-field scoping + // matches the claim path's safety invariant: a leaked id alone + // cannot delete another tenant's row. try { - await claimPendingQuestion( + await deletePendingQuestion( event.id, organizationId, connectionId, diff --git a/packages/server/src/gateway/connections/pending-interaction-store.ts b/packages/server/src/gateway/connections/pending-interaction-store.ts index f77057c00..c175178e8 100644 --- a/packages/server/src/gateway/connections/pending-interaction-store.ts +++ b/packages/server/src/gateway/connections/pending-interaction-store.ts @@ -56,9 +56,13 @@ export async function storePendingQuestion( connection_id = EXCLUDED.connection_id, expected_user_id = EXCLUDED.expected_user_id, entry_payload = EXCLUDED.entry_payload, - created_at = now(), claimed_at = NULL `; + // Note: `created_at` intentionally NOT touched on conflict — webhook + // retries that re-stash the same id must NOT reset the 24h TTL clock, + // or a misbehaving retry loop could keep the row alive indefinitely. + // `claimed_at = NULL` is still reset so an unclaimed retry can still + // be claimed by a legitimate click. } /** @@ -94,21 +98,63 @@ export async function claimPendingQuestion( ); } +/** + * Default cap on rows deleted per sweep call. The sweep runs every ~5 + * minutes via the scheduled `sweepEphemeralTables` task — a hard cap + * keeps a stale-row backlog (mass-retry storm, paused sweep, etc.) from + * locking the table with a multi-million-row delete. Remaining rows wait + * for the next cycle. + */ +const DEFAULT_SWEEP_LIMIT = 1000; + /** * Delete pending_interactions rows older than `maxAgeMs` and return their * ids. The bridge calls this from the scheduled sweep so it can also evict * the corresponding per-pod `SentMessage` cache entries — otherwise that * Map would grow unbounded for questions that are never clicked. + * + * Bounded by `limit` (default 1000) per call to avoid a long lock under + * a backlog. The next scheduled cycle picks up where this one left off. */ export async function sweepStalePendingInteractions( maxAgeMs = 24 * 60 * 60 * 1000, + limit: number = DEFAULT_SWEEP_LIMIT, ): Promise { const sql = getDb(); const cutoff = new Date(Date.now() - maxAgeMs); const rows = await sql<{ id: string }>` DELETE FROM pending_interactions - WHERE created_at < ${cutoff} + WHERE id IN ( + SELECT id FROM pending_interactions + WHERE created_at < ${cutoff} + LIMIT ${limit} + ) RETURNING id `; return rows.map((r) => r.id); } + +/** + * Hard-delete a pending row. Used by the bridge's drop-on-post-failure + * path so a stale row doesn't sit around with `claimed_at` set, waiting + * 24h for the sweep. Scoped by the same four-field tuple as + * `claimPendingQuestion` so the safety invariant is identical: a leaked + * id alone cannot delete another tenant's row. + */ +export async function deletePendingQuestion( + questionId: string, + organizationId: string, + connectionId: string, + expectedUserId: string, +): Promise { + const sql = getDb(); + const rows = await sql<{ id: string }>` + DELETE FROM pending_interactions + WHERE id = ${questionId} + AND organization_id = ${organizationId} + AND connection_id = ${connectionId} + AND expected_user_id = ${expectedUserId} + RETURNING id + `; + return rows.length > 0; +} diff --git a/packages/server/src/gateway/proxy/secret-proxy.ts b/packages/server/src/gateway/proxy/secret-proxy.ts index 82551a139..ef529c623 100644 --- a/packages/server/src/gateway/proxy/secret-proxy.ts +++ b/packages/server/src/gateway/proxy/secret-proxy.ts @@ -4,6 +4,7 @@ import { Hono } from "hono"; import type { AuthProfilesManager } from "../auth/settings/auth-profiles-manager.js"; import type { ProviderCredentialContext } from "../embedded.js"; import type { ProviderUpstreamConfig } from "../modules/module-system.js"; +import { orgContext } from "../../lobu/stores/org-context.js"; import type { SecretStore } from "../secrets/index.js"; import { getClientIp } from "../utils/rate-limiter.js"; @@ -193,10 +194,16 @@ export function lookupPlaceholderMapping( const mapping = placeholderCache.get(uuid); if (!mapping) return null; if ( - expectedOrganizationId && - mapping.organizationId && + expectedOrganizationId !== undefined && mapping.organizationId !== expectedOrganizationId ) { + // Force the check whenever the caller supplied an expected org. + // Pre-fix this also gated on `mapping.organizationId` being set, + // which let a legacy mapping (minted before the org-id pivot) sail + // through whenever the caller's URL named any org — a worker from + // org B could resolve a legacy unscoped mapping owned by org A under + // org B's request. Now: if the caller has an org expectation, the + // mapping must match it, including refusing to match `undefined`. logger.warn( { mappingAgentId: mapping.agentId, @@ -207,6 +214,18 @@ export function lookupPlaceholderMapping( ); return null; } + // Surface every legacy unscoped access so the deprecation can be + // planned. A mapping with no `organizationId` is from before the pivot + // and should disappear once all in-flight placeholders rotate. + if (!mapping.organizationId) { + logger.warn( + { + mappingAgentId: mapping.agentId, + expectedOrg: expectedOrganizationId, + }, + "Placeholder mapping accessed without organizationId — legacy row, schedule rotation" + ); + } return mapping; } @@ -543,9 +562,20 @@ export class SecretProxy { const orgId = await this.agentOrgResolver(urlAgentId); if (orgId) expectedOrganizationId = orgId; } catch (err) { - logger.warn( + // Fail closed. Falling through with `expectedOrganizationId = + // undefined` on a transient DB error downgrades the placeholder / + // secret-lookup org checks for the entire request — a window where + // a worker bound to org A could resolve a placeholder pointed at + // org B's URL because the binding step lost its expected-org + // anchor. The isolation invariant matters more than the brief + // 503 window during a DB hiccup. + logger.error( { urlAgentId, err: String(err) }, - "agentOrgResolver failed — falling through without org expectation" + "agentOrgResolver failed — rejecting request to preserve org isolation" + ); + return c.json( + { error: "Service Unavailable: failed to resolve agent organization" }, + 503 ); } } @@ -617,18 +647,36 @@ export class SecretProxy { if (urlAgentId && resolvedSlug && this.authProfilesManager) { const providerId = this.slugToProviderId.get(resolvedSlug); if (providerId) { - const profile = await this.authProfilesManager.getBestProfile( - urlAgentId, - providerId, - undefined, - providerContext + // Run the credential lookup under the caller's expected org context + // when we have one. Without this wrapper, `AuthProfilesManager` + // calls its OWN `agentOrgResolver` to derive the org — and on a + // transient DB error that resolver logs a warning and returns + // undefined, then falls through to unscoped credential reads + // (`auth-profiles-manager.ts:251-275`). Wrapping here makes the + // org explicit so the resolver short-circuits and a DB hiccup + // cannot downgrade scoping for a request whose org we already + // know from the worker token / URL. + const runWithOrg = (fn: () => Promise): Promise => + expectedOrganizationId + ? orgContext.run({ organizationId: expectedOrganizationId }, fn) + : fn(); + const authProfilesManager = this.authProfilesManager; + const profile = await runWithOrg(() => + authProfilesManager.getBestProfile( + urlAgentId, + providerId, + undefined, + providerContext + ) ); const userIdForRefresh = providerContext?.userId; const credential = profile && userIdForRefresh - ? await this.authProfilesManager.ensureFreshCredential(profile, { - userId: userIdForRefresh, - agentId: urlAgentId, - }) + ? await runWithOrg(() => + authProfilesManager.ensureFreshCredential(profile, { + userId: userIdForRefresh, + agentId: urlAgentId, + }) + ) : profile?.credential; if (credential) { headers.authorization = `Bearer ${credential}`;