diff --git a/db/migrations/20260516120000_agents_per_org_pk_swap.sql b/db/migrations/20260516120000_agents_per_org_pk_swap.sql new file mode 100644 index 000000000..4936e26af --- /dev/null +++ b/db/migrations/20260516120000_agents_per_org_pk_swap.sql @@ -0,0 +1,125 @@ +-- migrate:up +-- Phase C: swap `agents` PK from globally-unique `id` to per-org composite +-- `(organization_id, id)`. The application has always treated agents as +-- org-scoped (every read/list filters by organization_id) but the global PK +-- silently blocked two orgs from sharing an agent ID — for example, a stale +-- `food-ordering` in one org would prevent `food-ordering` in another. +-- +-- Phase A (20260515120000) added the org column + composite indexes on the +-- 5 FK-holding child tables and backfilled values from agents. Phase B (the +-- application-code refactor in this PR) plumbs `organization_id` through every +-- INSERT/UPDATE/DELETE/SELECT touching these tables. This migration is the +-- final structural swap: it drops the single-column PK + FKs, adds the +-- composite PK + FKs, and widens the per-(agent,kind,pattern) uniques on +-- agent_users / agent_grants / grants with organization_id. + +-- ── 0. Set NOT NULL on the columns Phase A backfilled. +-- Backfill defensively in case any rows snuck in NULL (e.g. embedded PGlite +-- installs that bypassed the dbmate runner during a partial state). +UPDATE public.agent_grants c SET organization_id = a.organization_id FROM public.agents a WHERE c.organization_id IS NULL AND c.agent_id = a.id; +UPDATE public.agent_connections c SET organization_id = a.organization_id FROM public.agents a WHERE c.organization_id IS NULL AND c.agent_id = a.id; +UPDATE public.agent_users c SET organization_id = a.organization_id FROM public.agents a WHERE c.organization_id IS NULL AND c.agent_id = a.id; +UPDATE public.agent_channel_bindings c SET organization_id = a.organization_id FROM public.agents a WHERE c.organization_id IS NULL AND c.agent_id = a.id; +UPDATE public.grants c SET organization_id = a.organization_id FROM public.agents a WHERE c.organization_id IS NULL AND c.agent_id = a.id; + +-- Drop any orphan rows (agent_id with no matching agents row). Backfill +-- can't recover these. +DELETE FROM public.agent_grants WHERE organization_id IS NULL; +DELETE FROM public.agent_connections WHERE organization_id IS NULL; +DELETE FROM public.agent_users WHERE organization_id IS NULL; +DELETE FROM public.agent_channel_bindings WHERE organization_id IS NULL; +DELETE FROM public.grants WHERE organization_id IS NULL; + +ALTER TABLE public.agent_grants ALTER COLUMN organization_id SET NOT NULL; +ALTER TABLE public.agent_connections ALTER COLUMN organization_id SET NOT NULL; +ALTER TABLE public.agent_users ALTER COLUMN organization_id SET NOT NULL; +ALTER TABLE public.agent_channel_bindings ALTER COLUMN organization_id SET NOT NULL; +ALTER TABLE public.grants ALTER COLUMN organization_id SET NOT NULL; + +-- ── 1. Drop the 6 single-column FKs into agents(id). +ALTER TABLE public.agent_grants DROP CONSTRAINT IF EXISTS agent_grants_agent_id_fkey; +ALTER TABLE public.agent_connections DROP CONSTRAINT IF EXISTS agent_connections_agent_id_fkey; +ALTER TABLE public.agent_users DROP CONSTRAINT IF EXISTS agent_users_agent_id_fkey; +ALTER TABLE public.agent_channel_bindings DROP CONSTRAINT IF EXISTS agent_channel_bindings_agent_id_fkey; +ALTER TABLE public.grants DROP CONSTRAINT IF EXISTS grants_agent_id_fkey; +ALTER TABLE public.scheduled_jobs DROP CONSTRAINT IF EXISTS scheduled_jobs_agent_fkey; + +-- ── 2. Drop the unique/PK constraints on child tables that scope to bare agent_id. +ALTER TABLE public.agent_grants DROP CONSTRAINT IF EXISTS agent_grants_agent_id_pattern_key; +ALTER TABLE public.agent_users DROP CONSTRAINT IF EXISTS agent_users_pkey; +ALTER TABLE public.grants DROP CONSTRAINT IF EXISTS grants_pkey; + +-- ── 3. Swap the PK on agents from (id) to (organization_id, id). +ALTER TABLE public.agents DROP CONSTRAINT IF EXISTS agents_pkey; +ALTER TABLE public.agents ADD CONSTRAINT agents_pkey PRIMARY KEY (organization_id, id); + +-- ── 4. Re-add per-org-scoped uniques on the child tables. +ALTER TABLE public.agent_grants + ADD CONSTRAINT agent_grants_org_agent_pattern_key UNIQUE (organization_id, agent_id, pattern); +ALTER TABLE public.agent_users + ADD CONSTRAINT agent_users_pkey PRIMARY KEY (organization_id, agent_id, platform, user_id); +ALTER TABLE public.grants + ADD CONSTRAINT grants_pkey PRIMARY KEY (organization_id, agent_id, kind, pattern); + +-- ── 5. Re-add composite FKs (organization_id, agent_id) → agents(organization_id, id). +ALTER TABLE public.agent_grants + ADD CONSTRAINT agent_grants_org_agent_fkey + FOREIGN KEY (organization_id, agent_id) REFERENCES public.agents(organization_id, id) ON DELETE CASCADE; +ALTER TABLE public.agent_connections + ADD CONSTRAINT agent_connections_org_agent_fkey + FOREIGN KEY (organization_id, agent_id) REFERENCES public.agents(organization_id, id) ON DELETE CASCADE; +ALTER TABLE public.agent_users + ADD CONSTRAINT agent_users_org_agent_fkey + FOREIGN KEY (organization_id, agent_id) REFERENCES public.agents(organization_id, id) ON DELETE CASCADE; +ALTER TABLE public.agent_channel_bindings + ADD CONSTRAINT agent_channel_bindings_org_agent_fkey + FOREIGN KEY (organization_id, agent_id) REFERENCES public.agents(organization_id, id) ON DELETE CASCADE; +ALTER TABLE public.grants + ADD CONSTRAINT grants_org_agent_fkey + FOREIGN KEY (organization_id, agent_id) REFERENCES public.agents(organization_id, id) ON DELETE CASCADE; +ALTER TABLE public.scheduled_jobs + ADD CONSTRAINT scheduled_jobs_org_agent_fkey + FOREIGN KEY (organization_id, created_by_agent) REFERENCES public.agents(organization_id, id) ON DELETE CASCADE; + +-- migrate:down +-- Reverse the swap. NOTE: this WILL FAIL if two orgs ended up sharing an +-- agent ID after this migration shipped (the previous PK on (id) requires +-- global uniqueness). That's by design — this migration's whole purpose is +-- to allow per-org agent IDs that the old PK forbids. + +ALTER TABLE public.scheduled_jobs DROP CONSTRAINT IF EXISTS scheduled_jobs_org_agent_fkey; +ALTER TABLE public.grants DROP CONSTRAINT IF EXISTS grants_org_agent_fkey; +ALTER TABLE public.agent_channel_bindings DROP CONSTRAINT IF EXISTS agent_channel_bindings_org_agent_fkey; +ALTER TABLE public.agent_users DROP CONSTRAINT IF EXISTS agent_users_org_agent_fkey; +ALTER TABLE public.agent_connections DROP CONSTRAINT IF EXISTS agent_connections_org_agent_fkey; +ALTER TABLE public.agent_grants DROP CONSTRAINT IF EXISTS agent_grants_org_agent_fkey; + +ALTER TABLE public.grants DROP CONSTRAINT IF EXISTS grants_pkey; +ALTER TABLE public.agent_users DROP CONSTRAINT IF EXISTS agent_users_pkey; +ALTER TABLE public.agent_grants DROP CONSTRAINT IF EXISTS agent_grants_org_agent_pattern_key; + +ALTER TABLE public.agents DROP CONSTRAINT IF EXISTS agents_pkey; +ALTER TABLE public.agents ADD CONSTRAINT agents_pkey PRIMARY KEY (id); + +ALTER TABLE public.agent_grants ADD CONSTRAINT agent_grants_agent_id_pattern_key UNIQUE (agent_id, pattern); +ALTER TABLE public.agent_users ADD CONSTRAINT agent_users_pkey PRIMARY KEY (agent_id, platform, user_id); +ALTER TABLE public.grants ADD CONSTRAINT grants_pkey PRIMARY KEY (agent_id, kind, pattern); + +ALTER TABLE public.agent_grants + ADD CONSTRAINT agent_grants_agent_id_fkey FOREIGN KEY (agent_id) REFERENCES public.agents(id) ON DELETE CASCADE; +ALTER TABLE public.agent_connections + ADD CONSTRAINT agent_connections_agent_id_fkey FOREIGN KEY (agent_id) REFERENCES public.agents(id) ON DELETE CASCADE; +ALTER TABLE public.agent_users + ADD CONSTRAINT agent_users_agent_id_fkey FOREIGN KEY (agent_id) REFERENCES public.agents(id) ON DELETE CASCADE; +ALTER TABLE public.agent_channel_bindings + ADD CONSTRAINT agent_channel_bindings_agent_id_fkey FOREIGN KEY (agent_id) REFERENCES public.agents(id) ON DELETE CASCADE; +ALTER TABLE public.grants + ADD CONSTRAINT grants_agent_id_fkey FOREIGN KEY (agent_id) REFERENCES public.agents(id) ON DELETE CASCADE; +ALTER TABLE public.scheduled_jobs + ADD CONSTRAINT scheduled_jobs_agent_fkey FOREIGN KEY (created_by_agent) REFERENCES public.agents(id) ON DELETE CASCADE; + +ALTER TABLE public.grants ALTER COLUMN organization_id DROP NOT NULL; +ALTER TABLE public.agent_channel_bindings ALTER COLUMN organization_id DROP NOT NULL; +ALTER TABLE public.agent_users ALTER COLUMN organization_id DROP NOT NULL; +ALTER TABLE public.agent_connections ALTER COLUMN organization_id DROP NOT NULL; +ALTER TABLE public.agent_grants ALTER COLUMN organization_id DROP NOT NULL; diff --git a/db/schema.sql b/db/schema.sql index 5da36e5a6..447bbba1e 100644 --- a/db/schema.sql +++ b/db/schema.sql @@ -114,7 +114,7 @@ CREATE TABLE public.agent_channel_bindings ( channel_id text NOT NULL, team_id text, created_at timestamp with time zone DEFAULT now() NOT NULL, - organization_id text + organization_id text NOT NULL ); -- @@ -132,7 +132,7 @@ CREATE TABLE public.agent_connections ( error_message text, created_at timestamp with time zone DEFAULT now() NOT NULL, updated_at timestamp with time zone DEFAULT now() NOT NULL, - organization_id text, + organization_id text NOT NULL, CONSTRAINT agent_connections_status_check CHECK ((status = ANY (ARRAY['active'::text, 'stopped'::text, 'error'::text]))) ); @@ -147,7 +147,7 @@ CREATE TABLE public.agent_grants ( expires_at timestamp with time zone, granted_at timestamp with time zone DEFAULT now() NOT NULL, denied boolean DEFAULT false, - organization_id text + organization_id text NOT NULL ); -- @@ -191,7 +191,7 @@ CREATE TABLE public.agent_users ( platform text NOT NULL, user_id text NOT NULL, created_at timestamp with time zone DEFAULT now() NOT NULL, - organization_id text + organization_id text NOT NULL ); -- @@ -1109,7 +1109,7 @@ CREATE TABLE public.grants ( expires_at timestamp with time zone, granted_at timestamp with time zone DEFAULT now() NOT NULL, denied boolean DEFAULT false NOT NULL, - organization_id text + organization_id text NOT NULL ); -- @@ -2214,11 +2214,11 @@ ALTER TABLE ONLY public.agent_connections ADD CONSTRAINT agent_connections_pkey PRIMARY KEY (id); -- --- Name: agent_grants agent_grants_agent_id_pattern_key; Type: CONSTRAINT; Schema: public; Owner: - +-- Name: agent_grants agent_grants_org_agent_pattern_key; Type: CONSTRAINT; Schema: public; Owner: - -- ALTER TABLE ONLY public.agent_grants - ADD CONSTRAINT agent_grants_agent_id_pattern_key UNIQUE (agent_id, pattern); + ADD CONSTRAINT agent_grants_org_agent_pattern_key UNIQUE (organization_id, agent_id, pattern); -- -- Name: agent_grants agent_grants_pkey; Type: CONSTRAINT; Schema: public; Owner: - @@ -2239,14 +2239,14 @@ ALTER TABLE ONLY public.agent_secrets -- ALTER TABLE ONLY public.agent_users - ADD CONSTRAINT agent_users_pkey PRIMARY KEY (agent_id, platform, user_id); + ADD CONSTRAINT agent_users_pkey PRIMARY KEY (organization_id, agent_id, platform, user_id); -- -- Name: agents agents_pkey; Type: CONSTRAINT; Schema: public; Owner: - -- ALTER TABLE ONLY public.agents - ADD CONSTRAINT agents_pkey PRIMARY KEY (id); + ADD CONSTRAINT agents_pkey PRIMARY KEY (organization_id, id); -- -- Name: auth_profiles auth_profiles_org_id_unique; Type: CONSTRAINT; Schema: public; Owner: - @@ -2421,7 +2421,7 @@ ALTER TABLE ONLY public.feeds -- ALTER TABLE ONLY public.grants - ADD CONSTRAINT grants_pkey PRIMARY KEY (agent_id, kind, pattern); + ADD CONSTRAINT grants_pkey PRIMARY KEY (organization_id, agent_id, kind, pattern); -- -- Name: watcher_versions insight_template_versions_pkey; Type: CONSTRAINT; Schema: public; Owner: - @@ -4104,32 +4104,32 @@ ALTER TABLE ONLY public.account ADD CONSTRAINT "account_userId_fkey" FOREIGN KEY ("userId") REFERENCES public."user"(id) ON DELETE CASCADE; -- --- Name: agent_channel_bindings agent_channel_bindings_agent_id_fkey; Type: FK CONSTRAINT; Schema: public; Owner: - +-- Name: agent_channel_bindings agent_channel_bindings_org_agent_fkey; Type: FK CONSTRAINT; Schema: public; Owner: - -- ALTER TABLE ONLY public.agent_channel_bindings - ADD CONSTRAINT agent_channel_bindings_agent_id_fkey FOREIGN KEY (agent_id) REFERENCES public.agents(id) ON DELETE CASCADE; + ADD CONSTRAINT agent_channel_bindings_org_agent_fkey FOREIGN KEY (organization_id, agent_id) REFERENCES public.agents(organization_id, id) ON DELETE CASCADE; -- --- Name: agent_connections agent_connections_agent_id_fkey; Type: FK CONSTRAINT; Schema: public; Owner: - +-- Name: agent_connections agent_connections_org_agent_fkey; Type: FK CONSTRAINT; Schema: public; Owner: - -- ALTER TABLE ONLY public.agent_connections - ADD CONSTRAINT agent_connections_agent_id_fkey FOREIGN KEY (agent_id) REFERENCES public.agents(id) ON DELETE CASCADE; + ADD CONSTRAINT agent_connections_org_agent_fkey FOREIGN KEY (organization_id, agent_id) REFERENCES public.agents(organization_id, id) ON DELETE CASCADE; -- --- Name: agent_grants agent_grants_agent_id_fkey; Type: FK CONSTRAINT; Schema: public; Owner: - +-- Name: agent_grants agent_grants_org_agent_fkey; Type: FK CONSTRAINT; Schema: public; Owner: - -- ALTER TABLE ONLY public.agent_grants - ADD CONSTRAINT agent_grants_agent_id_fkey FOREIGN KEY (agent_id) REFERENCES public.agents(id) ON DELETE CASCADE; + ADD CONSTRAINT agent_grants_org_agent_fkey FOREIGN KEY (organization_id, agent_id) REFERENCES public.agents(organization_id, id) ON DELETE CASCADE; -- --- Name: agent_users agent_users_agent_id_fkey; Type: FK CONSTRAINT; Schema: public; Owner: - +-- Name: agent_users agent_users_org_agent_fkey; Type: FK CONSTRAINT; Schema: public; Owner: - -- ALTER TABLE ONLY public.agent_users - ADD CONSTRAINT agent_users_agent_id_fkey FOREIGN KEY (agent_id) REFERENCES public.agents(id) ON DELETE CASCADE; + ADD CONSTRAINT agent_users_org_agent_fkey FOREIGN KEY (organization_id, agent_id) REFERENCES public.agents(organization_id, id) ON DELETE CASCADE; -- -- Name: agents agents_organization_id_fkey; Type: FK CONSTRAINT; Schema: public; Owner: - @@ -4545,11 +4545,11 @@ ALTER TABLE ONLY public.event_classifiers ADD CONSTRAINT fk_event_classifiers_insight FOREIGN KEY (watcher_id) REFERENCES public.watchers(id) ON DELETE SET NULL; -- --- Name: grants grants_agent_id_fkey; Type: FK CONSTRAINT; Schema: public; Owner: - +-- Name: grants grants_org_agent_fkey; Type: FK CONSTRAINT; Schema: public; Owner: - -- ALTER TABLE ONLY public.grants - ADD CONSTRAINT grants_agent_id_fkey FOREIGN KEY (agent_id) REFERENCES public.agents(id) ON DELETE CASCADE; + ADD CONSTRAINT grants_org_agent_fkey FOREIGN KEY (organization_id, agent_id) REFERENCES public.agents(organization_id, id) ON DELETE CASCADE; -- -- Name: watcher_versions insight_template_versions_created_by_fkey; Type: FK CONSTRAINT; Schema: public; Owner: - @@ -4811,11 +4811,11 @@ ALTER TABLE ONLY public.runs ADD CONSTRAINT runs_window_id_fkey FOREIGN KEY (window_id) REFERENCES public.watcher_windows(id) ON DELETE SET NULL; -- --- Name: scheduled_jobs scheduled_jobs_agent_fkey; Type: FK CONSTRAINT; Schema: public; Owner: - +-- Name: scheduled_jobs scheduled_jobs_org_agent_fkey; Type: FK CONSTRAINT; Schema: public; Owner: - -- ALTER TABLE ONLY public.scheduled_jobs - ADD CONSTRAINT scheduled_jobs_agent_fkey FOREIGN KEY (created_by_agent) REFERENCES public.agents(id) ON DELETE CASCADE; + ADD CONSTRAINT scheduled_jobs_org_agent_fkey FOREIGN KEY (organization_id, created_by_agent) REFERENCES public.agents(organization_id, id) ON DELETE CASCADE; -- -- Name: scheduled_jobs scheduled_jobs_organization_id_fkey; Type: FK CONSTRAINT; Schema: public; Owner: - @@ -4990,4 +4990,5 @@ INSERT INTO public.schema_migrations (version) VALUES ('20260514160000'), ('20260515120000'), ('20260515150000'), - ('20260515160000'); + ('20260515160000'), + ('20260516120000'); diff --git a/examples/office-bot/lobu.toml b/examples/office-bot/lobu.toml index eae2d83ac..e7943aeac 100644 --- a/examples/office-bot/lobu.toml +++ b/examples/office-bot/lobu.toml @@ -67,6 +67,7 @@ fail closed and deny with a reason. """ [memory] +organization_id = "UdNAH1bb3csC842vhOgxAHVcfX4tYU5A" enabled = true org = "lobu-team" name = "Lobu Team" diff --git a/packages/core/src/agent-store.ts b/packages/core/src/agent-store.ts index 158e577f9..10b52520c 100644 --- a/packages/core/src/agent-store.ts +++ b/packages/core/src/agent-store.ts @@ -107,6 +107,12 @@ export interface StoredConnection { id: string; platform: string; agentId?: string; + /** + * Organization id this connection belongs to. Optional in the type for + * back-compat with in-memory tests, but required at the storage layer + * (`agent_connections.organization_id` is NOT NULL post-Phase-C). + */ + organizationId?: string; config: Record; settings: ConnectionSettings; metadata: Record; diff --git a/packages/server/src/db/embedded-schema-patches.ts b/packages/server/src/db/embedded-schema-patches.ts index 480a60d7f..02a49ec28 100644 --- a/packages/server/src/db/embedded-schema-patches.ts +++ b/packages/server/src/db/embedded-schema-patches.ts @@ -568,4 +568,133 @@ export const EMBEDDED_SCHEMA_PATCHES: EmbeddedSchemaPatch[] = [ ); }, }, + { + // Mirrors db/migrations/20260516120000_agents_per_org_pk_swap.sql. + // Detects whether the swap has already happened by reading the current + // PK definition on `agents`; skips silently when the composite PK is + // already in place. + id: 'agents-per-org-pk-phase-c', + apply: async (sql) => { + const pkDef = (await sql.unsafe(` + SELECT pg_get_constraintdef(c.oid) AS def + FROM pg_constraint c + JOIN pg_class t ON t.oid = c.conrelid + JOIN pg_namespace n ON n.oid = t.relnamespace + WHERE n.nspname = 'public' + AND t.relname = 'agents' + AND c.contype = 'p' + LIMIT 1 + `)) as Array<{ def: string }>; + const def = pkDef[0]?.def ?? ''; + if (def.includes('organization_id') && def.includes('id')) { + // Composite PK already in place — nothing to do. + return; + } + + // Backfill any stragglers and drop orphans. + for (const t of [ + 'agent_grants', + 'agent_connections', + 'agent_users', + 'agent_channel_bindings', + 'grants', + ]) { + await sql.unsafe(` + UPDATE public.${t} c + SET organization_id = a.organization_id + FROM public.agents a + WHERE c.organization_id IS NULL AND c.agent_id = a.id + `); + await sql.unsafe(` + DELETE FROM public.${t} WHERE organization_id IS NULL + `); + await sql.unsafe(` + ALTER TABLE public.${t} ALTER COLUMN organization_id SET NOT NULL + `); + } + + // Drop legacy single-column FKs. + await sql.unsafe( + `ALTER TABLE public.agent_grants DROP CONSTRAINT IF EXISTS agent_grants_agent_id_fkey` + ); + await sql.unsafe( + `ALTER TABLE public.agent_connections DROP CONSTRAINT IF EXISTS agent_connections_agent_id_fkey` + ); + await sql.unsafe( + `ALTER TABLE public.agent_users DROP CONSTRAINT IF EXISTS agent_users_agent_id_fkey` + ); + await sql.unsafe( + `ALTER TABLE public.agent_channel_bindings DROP CONSTRAINT IF EXISTS agent_channel_bindings_agent_id_fkey` + ); + await sql.unsafe( + `ALTER TABLE public.grants DROP CONSTRAINT IF EXISTS grants_agent_id_fkey` + ); + await sql.unsafe( + `ALTER TABLE public.scheduled_jobs DROP CONSTRAINT IF EXISTS scheduled_jobs_agent_fkey` + ); + + // Drop legacy uniques/PKs scoped to bare agent_id. + await sql.unsafe( + `ALTER TABLE public.agent_grants DROP CONSTRAINT IF EXISTS agent_grants_agent_id_pattern_key` + ); + await sql.unsafe( + `ALTER TABLE public.agent_users DROP CONSTRAINT IF EXISTS agent_users_pkey` + ); + await sql.unsafe( + `ALTER TABLE public.grants DROP CONSTRAINT IF EXISTS grants_pkey` + ); + + // Swap PK on agents. + await sql.unsafe(`ALTER TABLE public.agents DROP CONSTRAINT IF EXISTS agents_pkey`); + await sql.unsafe( + `ALTER TABLE public.agents ADD CONSTRAINT agents_pkey PRIMARY KEY (organization_id, id)` + ); + + // Re-add per-org-scoped uniques. + await sql.unsafe(` + ALTER TABLE public.agent_grants + ADD CONSTRAINT agent_grants_org_agent_pattern_key UNIQUE (organization_id, agent_id, pattern) + `); + await sql.unsafe(` + ALTER TABLE public.agent_users + ADD CONSTRAINT agent_users_pkey PRIMARY KEY (organization_id, agent_id, platform, user_id) + `); + await sql.unsafe(` + ALTER TABLE public.grants + ADD CONSTRAINT grants_pkey PRIMARY KEY (organization_id, agent_id, kind, pattern) + `); + + // Re-add composite FKs into agents(organization_id, id). + await sql.unsafe(` + ALTER TABLE public.agent_grants + ADD CONSTRAINT agent_grants_org_agent_fkey + FOREIGN KEY (organization_id, agent_id) REFERENCES public.agents(organization_id, id) ON DELETE CASCADE + `); + await sql.unsafe(` + ALTER TABLE public.agent_connections + ADD CONSTRAINT agent_connections_org_agent_fkey + FOREIGN KEY (organization_id, agent_id) REFERENCES public.agents(organization_id, id) ON DELETE CASCADE + `); + await sql.unsafe(` + ALTER TABLE public.agent_users + ADD CONSTRAINT agent_users_org_agent_fkey + FOREIGN KEY (organization_id, agent_id) REFERENCES public.agents(organization_id, id) ON DELETE CASCADE + `); + await sql.unsafe(` + ALTER TABLE public.agent_channel_bindings + ADD CONSTRAINT agent_channel_bindings_org_agent_fkey + FOREIGN KEY (organization_id, agent_id) REFERENCES public.agents(organization_id, id) ON DELETE CASCADE + `); + await sql.unsafe(` + ALTER TABLE public.grants + ADD CONSTRAINT grants_org_agent_fkey + FOREIGN KEY (organization_id, agent_id) REFERENCES public.agents(organization_id, id) ON DELETE CASCADE + `); + await sql.unsafe(` + ALTER TABLE public.scheduled_jobs + ADD CONSTRAINT scheduled_jobs_org_agent_fkey + FOREIGN KEY (organization_id, created_by_agent) REFERENCES public.agents(organization_id, id) ON DELETE CASCADE + `); + }, + }, ]; diff --git a/packages/server/src/gateway/__tests__/base-deployment-grants.test.ts b/packages/server/src/gateway/__tests__/base-deployment-grants.test.ts index 9ddd379c1..e8969a701 100644 --- a/packages/server/src/gateway/__tests__/base-deployment-grants.test.ts +++ b/packages/server/src/gateway/__tests__/base-deployment-grants.test.ts @@ -60,6 +60,7 @@ function buildPayload(overrides: Partial): MessagePayload { channelId: "ch", teamId: "t", agentId: "agent-1", + organizationId: "test-org", botId: "b", platform: "slack", messageText: "hi", diff --git a/packages/server/src/gateway/__tests__/grant-store.test.ts b/packages/server/src/gateway/__tests__/grant-store.test.ts index 68391d561..0f7dbb68b 100644 --- a/packages/server/src/gateway/__tests__/grant-store.test.ts +++ b/packages/server/src/gateway/__tests__/grant-store.test.ts @@ -1,4 +1,5 @@ import { beforeAll, beforeEach, describe, expect, test } from "bun:test"; +import { orgContext } from "../../lobu/stores/org-context.js"; import { GrantStore } from "../permissions/grant-store.js"; import { ensurePgliteForGatewayTests, @@ -6,6 +7,18 @@ import { seedAgentRow, } from "./helpers/db-setup.js"; +const ORG_ID = "test-org"; + +/** + * Wrap a test body in `orgContext.run` so the GrantStore (which scopes by + * `tryGetOrgId()` post-Phase-C) can resolve the active org for the seeded + * agent row. Every test in this file uses the default org_id from + * `seedAgentRow` ("test-org"). + */ +function withOrg(fn: () => Promise): Promise { + return orgContext.run({ organizationId: ORG_ID }, fn); +} + describe("GrantStore (PG-backed)", () => { let store: GrantStore; @@ -23,153 +36,201 @@ describe("GrantStore (PG-backed)", () => { describe("grant", () => { test("stores grant without expiry when expiresAt is null", async () => { - await store.grant("agent-1", "api.openai.com", null); - const grants = await store.listGrants("agent-1"); - expect(grants).toHaveLength(1); - expect(grants[0]?.pattern).toBe("api.openai.com"); - expect(grants[0]?.expiresAt).toBeNull(); - expect(grants[0]?.grantedAt).toBeGreaterThan(0); + await withOrg(async () => { + await store.grant("agent-1", "api.openai.com", null); + const grants = await store.listGrants("agent-1"); + expect(grants).toHaveLength(1); + expect(grants[0]?.pattern).toBe("api.openai.com"); + expect(grants[0]?.expiresAt).toBeNull(); + expect(grants[0]?.grantedAt).toBeGreaterThan(0); + }); }); test("stores grant with expiry when expiresAt is set", async () => { - const future = Date.now() + 60_000; - await store.grant("agent-1", "api.openai.com", future); - const grants = await store.listGrants("agent-1"); - expect(grants).toHaveLength(1); - expect(grants[0]?.expiresAt).not.toBeNull(); + await withOrg(async () => { + const future = Date.now() + 60_000; + await store.grant("agent-1", "api.openai.com", future); + const grants = await store.listGrants("agent-1"); + expect(grants).toHaveLength(1); + expect(grants[0]?.expiresAt).not.toBeNull(); + }); }); test("stores denied grant", async () => { - await store.grant("agent-1", "evil.com", null, true); - const grants = await store.listGrants("agent-1"); - expect(grants[0]?.denied).toBe(true); + await withOrg(async () => { + await store.grant("agent-1", "evil.com", null, true); + const grants = await store.listGrants("agent-1"); + expect(grants[0]?.denied).toBe(true); + }); }); test("preserves MCP path casing when storing grants", async () => { - await store.grant("agent-1", "/mcp/Gmail/tools/SendEmail", null); - expect( - await store.hasGrant("agent-1", "/mcp/Gmail/tools/SendEmail") - ).toBe(true); + await withOrg(async () => { + await store.grant("agent-1", "/mcp/Gmail/tools/SendEmail", null); + expect( + await store.hasGrant("agent-1", "/mcp/Gmail/tools/SendEmail") + ).toBe(true); + }); }); }); describe("hasGrant", () => { test("returns true for existing grant", async () => { - await store.grant("agent-1", "api.openai.com", null); - expect(await store.hasGrant("agent-1", "api.openai.com")).toBe(true); + await withOrg(async () => { + await store.grant("agent-1", "api.openai.com", null); + expect(await store.hasGrant("agent-1", "api.openai.com")).toBe(true); + }); }); test("returns false for missing grant", async () => { - expect(await store.hasGrant("agent-1", "unknown.com")).toBe(false); + await withOrg(async () => { + expect(await store.hasGrant("agent-1", "unknown.com")).toBe(false); + }); }); test("returns false for denied grant", async () => { - await store.grant("agent-1", "evil.com", null, true); - expect(await store.hasGrant("agent-1", "evil.com")).toBe(false); + await withOrg(async () => { + await store.grant("agent-1", "evil.com", null, true); + expect(await store.hasGrant("agent-1", "evil.com")).toBe(false); + }); }); test("matches MCP wildcard pattern", async () => { - await store.grant("agent-1", "/mcp/gmail/tools/*", null); - expect( - await store.hasGrant("agent-1", "/mcp/gmail/tools/send_email") - ).toBe(true); + await withOrg(async () => { + await store.grant("agent-1", "/mcp/gmail/tools/*", null); + expect( + await store.hasGrant("agent-1", "/mcp/gmail/tools/send_email") + ).toBe(true); + }); }); test("matches exact MCP path with original casing", async () => { - await store.grant("agent-1", "/mcp/Gmail/tools/SendEmail", null); - expect( - await store.hasGrant("agent-1", "/mcp/Gmail/tools/SendEmail") - ).toBe(true); + await withOrg(async () => { + await store.grant("agent-1", "/mcp/Gmail/tools/SendEmail", null); + expect( + await store.hasGrant("agent-1", "/mcp/Gmail/tools/SendEmail") + ).toBe(true); + }); }); test("MCP wildcard denied blocks access", async () => { - await store.grant("agent-1", "/mcp/gmail/tools/*", null, true); - expect( - await store.hasGrant("agent-1", "/mcp/gmail/tools/send_email") - ).toBe(false); + await withOrg(async () => { + await store.grant("agent-1", "/mcp/gmail/tools/*", null, true); + expect( + await store.hasGrant("agent-1", "/mcp/gmail/tools/send_email") + ).toBe(false); + }); }); test("matches domain wildcard pattern", async () => { - await store.grant("agent-1", "*.example.com", null); - expect(await store.hasGrant("agent-1", "api.example.com")).toBe(true); + await withOrg(async () => { + await store.grant("agent-1", "*.example.com", null); + expect(await store.hasGrant("agent-1", "api.example.com")).toBe(true); + }); }); test("matches leading-dot domain wildcard pattern", async () => { - await store.grant("agent-1", ".example.com", null); - expect(await store.hasGrant("agent-1", "api.example.com")).toBe(true); + await withOrg(async () => { + await store.grant("agent-1", ".example.com", null); + expect(await store.hasGrant("agent-1", "api.example.com")).toBe(true); + }); }); test("domain wildcard does not match two-part domains", async () => { - await store.grant("agent-1", "*.example.com", null); - expect(await store.hasGrant("agent-1", "example.com")).toBe(false); + await withOrg(async () => { + await store.grant("agent-1", "*.example.com", null); + expect(await store.hasGrant("agent-1", "example.com")).toBe(false); + }); }); test("domain wildcard denied blocks access", async () => { - await store.grant("agent-1", "*.evil.com", null, true); - expect(await store.hasGrant("agent-1", "sub.evil.com")).toBe(false); + await withOrg(async () => { + await store.grant("agent-1", "*.evil.com", null, true); + expect(await store.hasGrant("agent-1", "sub.evil.com")).toBe(false); + }); }); test("exact match takes precedence over wildcards", async () => { - await store.grant("agent-1", "api.example.com", null); - expect(await store.hasGrant("agent-1", "api.example.com")).toBe(true); + await withOrg(async () => { + await store.grant("agent-1", "api.example.com", null); + expect(await store.hasGrant("agent-1", "api.example.com")).toBe(true); + }); }); test("non-MCP non-domain path returns false", async () => { - expect(await store.hasGrant("agent-1", "/some/other/path")).toBe(false); + await withOrg(async () => { + expect(await store.hasGrant("agent-1", "/some/other/path")).toBe(false); + }); }); test("expired grant is filtered out", async () => { - const past = Date.now() - 1000; - await store.grant("agent-1", "stale.com", past); - expect(await store.hasGrant("agent-1", "stale.com")).toBe(false); + await withOrg(async () => { + const past = Date.now() - 1000; + await store.grant("agent-1", "stale.com", past); + expect(await store.hasGrant("agent-1", "stale.com")).toBe(false); + }); }); }); describe("isDenied", () => { test("returns true for denied grant", async () => { - await store.grant("agent-1", "evil.com", null, true); - expect(await store.isDenied("agent-1", "evil.com")).toBe(true); + await withOrg(async () => { + await store.grant("agent-1", "evil.com", null, true); + expect(await store.isDenied("agent-1", "evil.com")).toBe(true); + }); }); test("returns false for allowed grant", async () => { - await store.grant("agent-1", "good.com", null); - expect(await store.isDenied("agent-1", "good.com")).toBe(false); + await withOrg(async () => { + await store.grant("agent-1", "good.com", null); + expect(await store.isDenied("agent-1", "good.com")).toBe(false); + }); }); test("returns false for missing grant", async () => { - expect(await store.isDenied("agent-1", "unknown.com")).toBe(false); + await withOrg(async () => { + expect(await store.isDenied("agent-1", "unknown.com")).toBe(false); + }); }); }); describe("revoke", () => { test("removes grant", async () => { - await store.grant("agent-1", "api.openai.com", null); - expect(await store.hasGrant("agent-1", "api.openai.com")).toBe(true); - await store.revoke("agent-1", "api.openai.com"); - expect(await store.hasGrant("agent-1", "api.openai.com")).toBe(false); + await withOrg(async () => { + await store.grant("agent-1", "api.openai.com", null); + expect(await store.hasGrant("agent-1", "api.openai.com")).toBe(true); + await store.revoke("agent-1", "api.openai.com"); + expect(await store.hasGrant("agent-1", "api.openai.com")).toBe(false); + }); }); test("removes normalized wildcard grant variants", async () => { - await store.grant("agent-1", "*.github.com", null); - expect(await store.hasGrant("agent-1", "api.github.com")).toBe(true); - await store.revoke("agent-1", ".github.com"); - expect(await store.hasGrant("agent-1", "api.github.com")).toBe(false); + await withOrg(async () => { + await store.grant("agent-1", "*.github.com", null); + expect(await store.hasGrant("agent-1", "api.github.com")).toBe(true); + await store.revoke("agent-1", ".github.com"); + expect(await store.hasGrant("agent-1", "api.github.com")).toBe(false); + }); }); }); describe("listGrants", () => { test("returns empty array when no grants", async () => { - const grants = await store.listGrants("agent-1"); - expect(grants).toEqual([]); + await withOrg(async () => { + const grants = await store.listGrants("agent-1"); + expect(grants).toEqual([]); + }); }); test("lists every active grant for the agent", async () => { - await store.grant("agent-1", "api.openai.com", null); - await store.grant("agent-1", "*.github.com", null); - const grants = await store.listGrants("agent-1"); - expect(grants).toHaveLength(2); - const patterns = grants.map((g) => g.pattern).sort(); - expect(patterns).toEqual([".github.com", "api.openai.com"]); + await withOrg(async () => { + await store.grant("agent-1", "api.openai.com", null); + await store.grant("agent-1", "*.github.com", null); + const grants = await store.listGrants("agent-1"); + expect(grants).toHaveLength(2); + const patterns = grants.map((g) => g.pattern).sort(); + expect(patterns).toEqual([".github.com", "api.openai.com"]); + }); }); }); }); diff --git a/packages/server/src/gateway/__tests__/helpers/db-setup.ts b/packages/server/src/gateway/__tests__/helpers/db-setup.ts index 9a40b095c..703529e2b 100644 --- a/packages/server/src/gateway/__tests__/helpers/db-setup.ts +++ b/packages/server/src/gateway/__tests__/helpers/db-setup.ts @@ -105,7 +105,7 @@ export async function seedAgentRow( ${agentId}, ${orgId}, ${options.name ?? agentId}, ${options.ownerPlatform ?? null}, ${options.ownerUserId ?? null} ) - ON CONFLICT (id) DO NOTHING + ON CONFLICT (organization_id, id) DO NOTHING `; return orgId; } diff --git a/packages/server/src/gateway/__tests__/mcp-proxy-edge-cases.test.ts b/packages/server/src/gateway/__tests__/mcp-proxy-edge-cases.test.ts index 42be7d609..bb8c2620a 100644 --- a/packages/server/src/gateway/__tests__/mcp-proxy-edge-cases.test.ts +++ b/packages/server/src/gateway/__tests__/mcp-proxy-edge-cases.test.ts @@ -451,7 +451,13 @@ describe("cross-agent JWT isolation", () => { await toolCache.set("shared-mcp", [{ name: "delete_everything" }], "agent2"); // Grant only to agent1 - await grantStore.grant("agent1", "/mcp/shared-mcp/tools/delete_everything", null); + await grantStore.grant( + "agent1", + "/mcp/shared-mcp/tools/delete_everything", + null, + undefined, + "test-org" + ); successFetch({ jsonrpc: "2.0", @@ -618,7 +624,13 @@ describe("tool approval — onToolBlocked and wildcard grants", () => { ); // Wildcard grant for the whole server - await grantStore.grant("agent1", "/mcp/gh-mcp/tools/*", null); + await grantStore.grant( + "agent1", + "/mcp/gh-mcp/tools/*", + null, + undefined, + "test-org" + ); const configSource = createConfigSource({ "gh-mcp": { id: "gh-mcp", upstreamUrl: "http://gh.example.com/mcp" }, diff --git a/packages/server/src/gateway/__tests__/mcp-proxy.test.ts b/packages/server/src/gateway/__tests__/mcp-proxy.test.ts index d2557ba2e..8ed39da5b 100644 --- a/packages/server/src/gateway/__tests__/mcp-proxy.test.ts +++ b/packages/server/src/gateway/__tests__/mcp-proxy.test.ts @@ -540,7 +540,9 @@ describe("McpProxy", () => { await grantStore.grant( "agent1", "/mcp/test-mcp/tools/dangerous_tool", - null + null, + undefined, + "test-org" ); mockUpstreamFetch({ diff --git a/packages/server/src/gateway/auth/base-provider-module.ts b/packages/server/src/gateway/auth/base-provider-module.ts index 97b835c97..383406956 100644 --- a/packages/server/src/gateway/auth/base-provider-module.ts +++ b/packages/server/src/gateway/auth/base-provider-module.ts @@ -21,27 +21,30 @@ const logger = createLogger("base-provider-module"); * decrypt — every miss path is silent so the caller can keep walking the * resolution chain. * - * Org id is derived from the agent row (joined through `agents`) rather than - * `tryGetOrgId()`. The worker-spawn code path that calls `buildEnvVars` runs - * outside the org-routing middleware's AsyncLocalStorage scope, so the ALS - * helper returns null there. Joining via `agents.id` is correct as long as - * agent IDs are globally unique — which they are today (the per-org PK swap - * ships in the Phase B/C refactor). + * The lookup is keyed strictly on `(organization_id, name)` against + * `agent_secrets`. The orgId comes from one of two sources, in order: + * 1. `context.organizationId` — set by the worker-spawn code path that + * threads the org id through `ProviderCredentialContext`. + * 2. `tryGetOrgId()` — the AsyncLocalStorage-backed org context set by + * request middleware. + * + * Returns null when neither is set. Earlier revisions joined through + * `agents WHERE id = agentId` to derive the org, which became ambiguous + * once agent ids were per-org-unique. */ async function readOrgSharedProviderKey( - agentId: string, - providerId: string + providerId: string, + context?: ProviderCredentialContext ): Promise { + const orgId = context?.organizationId ?? tryGetOrgId(); + if (!orgId) return null; const sql = getDb(); - const orgFromContext = tryGetOrgId(); const rows = (await sql` - SELECT s.ciphertext - FROM agent_secrets s - JOIN agents a ON a.organization_id = s.organization_id - WHERE a.id = ${agentId} - AND s.name = ${providerOrgSecretName(providerId)} - AND (s.expires_at IS NULL OR s.expires_at > now()) - AND (${orgFromContext}::text IS NULL OR s.organization_id = ${orgFromContext}) + SELECT ciphertext + FROM agent_secrets + WHERE organization_id = ${orgId} + AND name = ${providerOrgSecretName(providerId)} + AND (expires_at IS NULL OR expires_at > now()) LIMIT 1 `) as Array<{ ciphertext: string }>; const ciphertext = rows[0]?.ciphertext; @@ -253,7 +256,7 @@ export abstract class BaseProviderModule } else { // No per-user auth profile — fall back to the org-shared API key // declared via `lobu apply` (`provider::apiKey` in agent_secrets). - const orgKey = await readOrgSharedProviderKey(agentId, this.providerId); + const orgKey = await readOrgSharedProviderKey(this.providerId, context); if (orgKey) { logger.info( `Injecting ${credVar} for agent ${agentId} (${this.providerId}) from org-shared secret` @@ -288,7 +291,7 @@ export abstract class BaseProviderModule if (credential) { return credential; } - const orgKey = await readOrgSharedProviderKey(agentId, this.providerId); + const orgKey = await readOrgSharedProviderKey(this.providerId, context); if (orgKey) return orgKey; const sysVar = this.providerConfig.systemEnvVarName || diff --git a/packages/server/src/gateway/auth/user-agents-store.ts b/packages/server/src/gateway/auth/user-agents-store.ts index e4a4bd7cb..932d1e43e 100644 --- a/packages/server/src/gateway/auth/user-agents-store.ts +++ b/packages/server/src/gateway/auth/user-agents-store.ts @@ -1,23 +1,37 @@ import { createLogger } from "@lobu/core"; import { getDb } from "../../db/client.js"; +import { tryGetOrgId } from "../../lobu/stores/org-context.js"; const logger = createLogger("user-agents-store"); /** * Track which agents belong to which users. Read-through to * `public.agent_users`. + * + * Methods accept an optional `organizationId` for callers outside the + * AsyncLocalStorage org-context scope (worker spawn, OAuth callbacks). + * Inside a request handler the ALS-backed `tryGetOrgId()` is used. + * `addAgent` requires an explicit `organizationId` because the row is + * an INSERT and the table requires a non-null org column. */ export class UserAgentsStore { async addAgent( platform: string, userId: string, - agentId: string + agentId: string, + organizationId?: string ): Promise { + const orgId = organizationId ?? tryGetOrgId(); + if (!orgId) { + throw new Error( + "UserAgentsStore.addAgent requires organizationId (explicit or via orgContext)" + ); + } const sql = getDb(); await sql` - INSERT INTO agent_users (agent_id, platform, user_id, created_at) - VALUES (${agentId}, ${platform}, ${userId}, now()) - ON CONFLICT (agent_id, platform, user_id) DO NOTHING + INSERT INTO agent_users (organization_id, agent_id, platform, user_id, created_at) + VALUES (${orgId}, ${agentId}, ${platform}, ${userId}, now()) + ON CONFLICT (organization_id, agent_id, platform, user_id) DO NOTHING `; logger.info(`Added agent ${agentId} to user ${platform}/${userId}`); } @@ -25,32 +39,55 @@ export class UserAgentsStore { async removeAgent( platform: string, userId: string, - agentId: string + agentId: string, + organizationId?: string ): Promise { const sql = getDb(); - await sql` - DELETE FROM agent_users - WHERE agent_id = ${agentId} AND platform = ${platform} AND user_id = ${userId} - `; + const orgId = organizationId ?? tryGetOrgId(); + if (orgId) { + await sql` + DELETE FROM agent_users + WHERE organization_id = ${orgId} + AND agent_id = ${agentId} AND platform = ${platform} AND user_id = ${userId} + `; + } else { + await sql` + DELETE FROM agent_users + WHERE agent_id = ${agentId} AND platform = ${platform} AND user_id = ${userId} + `; + } logger.info(`Removed agent ${agentId} from user ${platform}/${userId}`); } - async listAgents(platform: string, userId: string): Promise { + async listAgents( + platform: string, + userId: string, + organizationId?: string + ): Promise { const sql = getDb(); - const rows = await sql` - SELECT agent_id - FROM agent_users - WHERE platform = ${platform} AND user_id = ${userId} - `; + const orgId = organizationId ?? tryGetOrgId(); + const rows = orgId + ? await sql` + SELECT agent_id + FROM agent_users + WHERE organization_id = ${orgId} + AND platform = ${platform} AND user_id = ${userId} + ` + : await sql` + SELECT agent_id + FROM agent_users + WHERE platform = ${platform} AND user_id = ${userId} + `; return rows.map((r: any) => r.agent_id as string); } async ownsAgent( platform: string, userId: string, - agentId: string + agentId: string, + organizationId?: string ): Promise { - const agents = await this.listAgents(platform, userId); + const agents = await this.listAgents(platform, userId, organizationId); return agents.includes(agentId); } } diff --git a/packages/server/src/gateway/channels/binding-service.ts b/packages/server/src/gateway/channels/binding-service.ts index 8c263a3c2..888159e0b 100644 --- a/packages/server/src/gateway/channels/binding-service.ts +++ b/packages/server/src/gateway/channels/binding-service.ts @@ -1,5 +1,6 @@ import { createLogger } from "@lobu/core"; import { getDb } from "../../db/client.js"; +import { tryGetOrgId } from "../../lobu/stores/org-context.js"; const logger = createLogger("channel-binding-service"); @@ -39,22 +40,40 @@ export class ChannelBindingService { async getBinding( platform: string, channelId: string, - teamId?: string + teamId?: string, + organizationId?: string ): Promise { const sql = getDb(); + const orgId = organizationId ?? tryGetOrgId(); const rows = teamId - ? await sql` - SELECT * FROM agent_channel_bindings - WHERE platform = ${platform} - AND channel_id = ${channelId} - AND team_id = ${teamId} - ` - : await sql` - SELECT * FROM agent_channel_bindings - WHERE platform = ${platform} - AND channel_id = ${channelId} - AND team_id IS NULL - `; + ? orgId + ? await sql` + SELECT * FROM agent_channel_bindings + WHERE organization_id = ${orgId} + AND platform = ${platform} + AND channel_id = ${channelId} + AND team_id = ${teamId} + ` + : await sql` + SELECT * FROM agent_channel_bindings + WHERE platform = ${platform} + AND channel_id = ${channelId} + AND team_id = ${teamId} + ` + : orgId + ? await sql` + SELECT * FROM agent_channel_bindings + WHERE organization_id = ${orgId} + AND platform = ${platform} + AND channel_id = ${channelId} + AND team_id IS NULL + ` + : await sql` + SELECT * FROM agent_channel_bindings + WHERE platform = ${platform} + AND channel_id = ${channelId} + AND team_id IS NULL + `; if (rows.length === 0) return null; return rowToBinding(rows[0]); } @@ -64,27 +83,35 @@ export class ChannelBindingService { platform: string, channelId: string, teamId?: string, - _options?: { configuredBy?: string; wasAdmin?: boolean } + options?: { configuredBy?: string; wasAdmin?: boolean; organizationId?: string } ): Promise { const sql = getDb(); + const orgId = options?.organizationId ?? tryGetOrgId(); + if (!orgId) { + throw new Error( + "ChannelBindingService.createBinding requires organizationId (explicit or via orgContext)" + ); + } if (teamId) { // The (platform, channel_id, team_id) UNIQUE covers the team-id-set case. await sql` - INSERT INTO agent_channel_bindings (agent_id, platform, channel_id, team_id, created_at) - VALUES (${agentId}, ${platform}, ${channelId}, ${teamId}, now()) + INSERT INTO agent_channel_bindings (organization_id, agent_id, platform, channel_id, team_id, created_at) + VALUES (${orgId}, ${agentId}, ${platform}, ${channelId}, ${teamId}, now()) ON CONFLICT (platform, channel_id, team_id) DO UPDATE SET - agent_id = EXCLUDED.agent_id + agent_id = EXCLUDED.agent_id, + organization_id = EXCLUDED.organization_id `; } else { // For team_id IS NULL the unique constraint above doesn't fire (PG // treats NULL as distinct). The companion partial unique index // (agent_channel_bindings_no_team_unique) is what we conflict on. await sql` - INSERT INTO agent_channel_bindings (agent_id, platform, channel_id, team_id, created_at) - VALUES (${agentId}, ${platform}, ${channelId}, NULL, now()) + INSERT INTO agent_channel_bindings (organization_id, agent_id, platform, channel_id, team_id, created_at) + VALUES (${orgId}, ${agentId}, ${platform}, ${channelId}, NULL, now()) ON CONFLICT (platform, channel_id) WHERE team_id IS NULL - DO UPDATE SET agent_id = EXCLUDED.agent_id + DO UPDATE SET agent_id = EXCLUDED.agent_id, + organization_id = EXCLUDED.organization_id `; } logger.info(`Created binding: ${platform}/${channelId} → ${agentId}`); @@ -94,10 +121,12 @@ export class ChannelBindingService { agentId: string, platform: string, channelId: string, - teamId?: string + teamId?: string, + organizationId?: string ): Promise { const sql = getDb(); - const existing = await this.getBinding(platform, channelId, teamId); + const orgId = organizationId ?? tryGetOrgId(); + const existing = await this.getBinding(platform, channelId, teamId, orgId ?? undefined); if (!existing) { logger.warn(`No binding found for ${platform}/${channelId}`); return false; @@ -110,35 +139,70 @@ export class ChannelBindingService { } if (teamId) { - await sql` - DELETE FROM agent_channel_bindings - WHERE platform = ${platform} AND channel_id = ${channelId} AND team_id = ${teamId} - `; + if (orgId) { + await sql` + DELETE FROM agent_channel_bindings + WHERE organization_id = ${orgId} + AND platform = ${platform} AND channel_id = ${channelId} AND team_id = ${teamId} + `; + } else { + await sql` + DELETE FROM agent_channel_bindings + WHERE platform = ${platform} AND channel_id = ${channelId} AND team_id = ${teamId} + `; + } } else { - await sql` - DELETE FROM agent_channel_bindings - WHERE platform = ${platform} AND channel_id = ${channelId} AND team_id IS NULL - `; + if (orgId) { + await sql` + DELETE FROM agent_channel_bindings + WHERE organization_id = ${orgId} + AND platform = ${platform} AND channel_id = ${channelId} AND team_id IS NULL + `; + } else { + await sql` + DELETE FROM agent_channel_bindings + WHERE platform = ${platform} AND channel_id = ${channelId} AND team_id IS NULL + `; + } } logger.info(`Deleted binding: ${platform}/${channelId} from ${agentId}`); return true; } - async listBindings(agentId: string): Promise { + async listBindings( + agentId: string, + organizationId?: string + ): Promise { const sql = getDb(); - const rows = await sql` - SELECT * FROM agent_channel_bindings WHERE agent_id = ${agentId} - `; + const orgId = organizationId ?? tryGetOrgId(); + const rows = orgId + ? await sql` + SELECT * FROM agent_channel_bindings + WHERE agent_id = ${agentId} AND organization_id = ${orgId} + ` + : await sql` + SELECT * FROM agent_channel_bindings WHERE agent_id = ${agentId} + `; return rows.map(rowToBinding); } - async deleteAllBindings(agentId: string): Promise { + async deleteAllBindings( + agentId: string, + organizationId?: string + ): Promise { const sql = getDb(); - const rows = await sql` - DELETE FROM agent_channel_bindings - WHERE agent_id = ${agentId} - RETURNING platform, channel_id, team_id - `; + const orgId = organizationId ?? tryGetOrgId(); + const rows = orgId + ? await sql` + DELETE FROM agent_channel_bindings + WHERE agent_id = ${agentId} AND organization_id = ${orgId} + RETURNING platform, channel_id, team_id + ` + : await sql` + DELETE FROM agent_channel_bindings + WHERE agent_id = ${agentId} + RETURNING platform, channel_id, team_id + `; logger.info(`Deleted ${rows.length} bindings for agent ${agentId}`); return rows.length; } diff --git a/packages/server/src/gateway/connections/chat-instance-manager.ts b/packages/server/src/gateway/connections/chat-instance-manager.ts index 3e60926e9..068a927dc 100644 --- a/packages/server/src/gateway/connections/chat-instance-manager.ts +++ b/packages/server/src/gateway/connections/chat-instance-manager.ts @@ -30,7 +30,6 @@ import { } from "../secrets/index.js"; import { resolveAgentOptions } from "../services/platform-helpers.js"; import { orgContext, tryGetOrgId } from "../../lobu/stores/org-context.js"; -import { getAgentOrganizationId } from "../../lobu/stores/postgres-stores.js"; import { ConversationStateStore, type HistoryEntry, @@ -215,11 +214,13 @@ export class ChatInstanceManager { const id = stableId ?? randomUUID().replace(/-/g, "").slice(0, 16); const now = Date.now(); + const organizationId = tryGetOrgId() ?? undefined; const connection: PlatformConnection = { id, platform, ...(agentId ? { agentId } : {}), + ...(organizationId ? { organizationId } : {}), config, settings: settings ?? { allowGroups: true }, metadata, @@ -547,15 +548,14 @@ export class ChatInstanceManager { // owning org keeps per-org secret refs resolvable from every entry // point — no caller has to remember. const callerOrgId = tryGetOrgId(); - if (!callerOrgId && connection.agentId) { - const organizationId = await getAgentOrganizationId( - connection.agentId + if (!callerOrgId && connection.organizationId) { + // Connection rows now carry their owning org id directly — use it + // to set up the AsyncLocalStorage scope before resolving any + // org-scoped secret refs. + return orgContext.run( + { organizationId: connection.organizationId }, + () => this.startInstanceUnscoped(connection) ); - if (organizationId) { - return orgContext.run({ organizationId }, () => - this.startInstanceUnscoped(connection) - ); - } } return this.startInstanceUnscoped(connection); } @@ -1554,6 +1554,7 @@ function storedToPlatform(stored: StoredConnection): PlatformConnection { updatedAt: stored.updatedAt, }; if (stored.agentId) out.agentId = stored.agentId; + if (stored.organizationId) out.organizationId = stored.organizationId; if (stored.errorMessage) out.errorMessage = stored.errorMessage; return out; } diff --git a/packages/server/src/gateway/connections/interaction-bridge.ts b/packages/server/src/gateway/connections/interaction-bridge.ts index f71a3d246..c2b9f1514 100644 --- a/packages/server/src/gateway/connections/interaction-bridge.ts +++ b/packages/server/src/gateway/connections/interaction-bridge.ts @@ -679,7 +679,13 @@ export function registerActionHandlers( if (decision === "deny") { if (grantStore) { await grantStore - .grant(pending.agentId, pattern, null, true) + .grant( + pending.agentId, + pattern, + null, + true, + connection.organizationId + ) .catch(() => undefined); } try { @@ -697,7 +703,13 @@ export function registerActionHandlers( if (grantStore) { try { - await grantStore.grant(pending.agentId, pattern, expiresAt); + await grantStore.grant( + pending.agentId, + pattern, + expiresAt, + undefined, + connection.organizationId + ); logger.info( { requestId, diff --git a/packages/server/src/gateway/connections/message-handler-bridge.ts b/packages/server/src/gateway/connections/message-handler-bridge.ts index 736774f68..4541e573f 100644 --- a/packages/server/src/gateway/connections/message-handler-bridge.ts +++ b/packages/server/src/gateway/connections/message-handler-bridge.ts @@ -331,9 +331,14 @@ export class MessageHandlerBridge { // admin API. Idempotent — agent_users has a (agent_id, platform, user_id) // unique constraint. const userAgentsStore = this.services.getUserAgentsStore(); - if (userAgentsStore) { + if (userAgentsStore && this.connection.organizationId) { try { - await userAgentsStore.addAgent(platform, userId, agentId); + await userAgentsStore.addAgent( + platform, + userId, + agentId, + this.connection.organizationId + ); } catch (error) { logger.warn( { agentId, userId, error: String(error) }, @@ -528,6 +533,7 @@ export class MessageHandlerBridge { conversationId, teamId: isGroup ? channelId : platform, agentId, + organizationId: this.connection.organizationId, messageId, messageText, channelId, diff --git a/packages/server/src/gateway/connections/types.ts b/packages/server/src/gateway/connections/types.ts index 5d3b0b695..a6afb5bc5 100644 --- a/packages/server/src/gateway/connections/types.ts +++ b/packages/server/src/gateway/connections/types.ts @@ -56,6 +56,12 @@ export interface PlatformConnection { id: string; platform: string; agentId?: string; + /** + * Organization id this connection belongs to. Mirrors + * `agent_connections.organization_id`. Optional in the type for + * back-compat with in-memory tests; required at the storage layer. + */ + organizationId?: string; config: PlatformAdapterConfig; settings: ConnectionSettings; metadata: Record; diff --git a/packages/server/src/gateway/embedded.ts b/packages/server/src/gateway/embedded.ts index 0da878b19..6d7030d00 100644 --- a/packages/server/src/gateway/embedded.ts +++ b/packages/server/src/gateway/embedded.ts @@ -10,6 +10,14 @@ export interface ProviderCredentialContext { deploymentName?: string; platform?: string; connectionId?: string; + /** + * Organization id of the agent whose credentials are being looked up. + * Plumbed from the worker spawn path so that org-scoped resources + * (`agent_secrets`, `auth_profiles`) can be located without joining + * through `agents` (agent ids are per-org-unique, so the join is + * ambiguous). + */ + organizationId?: string; /** * The worker's JWT, when the caller is a worker. Providers whose gateway * routes authenticate via worker auth (e.g. Bedrock) use this as the diff --git a/packages/server/src/gateway/infrastructure/queue/queue-producer.ts b/packages/server/src/gateway/infrastructure/queue/queue-producer.ts index 70381677e..59c639f9a 100644 --- a/packages/server/src/gateway/infrastructure/queue/queue-producer.ts +++ b/packages/server/src/gateway/infrastructure/queue/queue-producer.ts @@ -30,6 +30,10 @@ export interface MessagePayload { channelId: string; // Platform channel ID teamId: string; // Team/workspace ID (required for all platforms) agentId: string; // Agent/session ID for isolation (universal identifier) + // Organization id of the agent. Plumbed through so child queries (grants, + // user-agents, channel-bindings, secrets) can scope by org — agent ids + // are per-org-unique, so `agent_id = ?` alone is ambiguous. + organizationId?: string; // Bot & platform info (passed through to worker) botId: string; // Bot identifier diff --git a/packages/server/src/gateway/orchestration/base-deployment-manager.ts b/packages/server/src/gateway/orchestration/base-deployment-manager.ts index eabe06879..e2dbf6d5e 100644 --- a/packages/server/src/gateway/orchestration/base-deployment-manager.ts +++ b/packages/server/src/gateway/orchestration/base-deployment-manager.ts @@ -453,6 +453,7 @@ export abstract class BaseDeploymentManager { messageData: MessagePayload ): Promise { const agentId = messageData.agentId; + const orgId = messageData.organizationId; // Sync networkConfig.allowedDomains to grant store if ( @@ -461,7 +462,7 @@ export abstract class BaseDeploymentManager { messageData.networkConfig?.allowedDomains?.length ) { for (const domain of messageData.networkConfig.allowedDomains) { - await this.grantStore.grant(agentId, domain, null); + await this.grantStore.grant(agentId, domain, null, undefined, orgId); } logger.info( `Synced network config domains as grants for ${deploymentName}: ${messageData.networkConfig.allowedDomains.join(", ")}` @@ -471,7 +472,7 @@ export abstract class BaseDeploymentManager { // Sync operator-pre-approved MCP tool patterns to grant store if (this.grantStore && agentId && messageData.preApprovedTools?.length) { for (const pattern of messageData.preApprovedTools) { - await this.grantStore.grant(agentId, pattern, null); + await this.grantStore.grant(agentId, pattern, null, undefined, orgId); } logger.info( `Synced pre-approved tool patterns as grants for ${deploymentName}: ${messageData.preApprovedTools.join(", ")}` @@ -493,7 +494,7 @@ export abstract class BaseDeploymentManager { "releases.nixos.org", ]; for (const domain of NIX_DOMAINS) { - await this.grantStore.grant(agentId, domain, null); + await this.grantStore.grant(agentId, domain, null, undefined, orgId); } logger.info( `Added Nix cache domains as grants for ${deploymentName}: ${NIX_DOMAINS.join(", ")}` @@ -541,11 +542,13 @@ export abstract class BaseDeploymentManager { return; } + const orgId = messageData.organizationId; + // Revoke patterns that were previously granted but are no longer // present in the current config. for (const pattern of previous) { if (!nextPatterns.has(pattern)) { - await this.grantStore.revoke(agentId, pattern); + await this.grantStore.revoke(agentId, pattern, orgId); } } @@ -553,7 +556,7 @@ export abstract class BaseDeploymentManager { // idempotent, but skipping them saves writes. for (const pattern of nextPatterns) { if (!previous.has(pattern)) { - await this.grantStore.grant(agentId, pattern, null); + await this.grantStore.grant(agentId, pattern, null, undefined, orgId); } } @@ -804,6 +807,7 @@ export abstract class BaseDeploymentManager { typeof platformMetadata?.connectionId === "string" ? platformMetadata.connectionId : undefined, + organizationId: validated.organizationId, }; const workerToken = generateWorkerToken( @@ -963,8 +967,9 @@ export abstract class BaseDeploymentManager { ); if (hasCliBackendProviders && this.grantStore && agentId) { const NPM_DOMAINS = ["registry.npmjs.org", "registry.npmmirror.com"]; + const orgId = validated.organizationId; for (const domain of NPM_DOMAINS) { - await this.grantStore.grant(agentId, domain, null); + await this.grantStore.grant(agentId, domain, null, undefined, orgId); } logger.info( `Added npm registry domains as grants for ${deploymentName}: ${NPM_DOMAINS.join(", ")}` diff --git a/packages/server/src/gateway/permissions/grant-store.ts b/packages/server/src/gateway/permissions/grant-store.ts index dc9ebdb07..9e44df8ec 100644 --- a/packages/server/src/gateway/permissions/grant-store.ts +++ b/packages/server/src/gateway/permissions/grant-store.ts @@ -6,6 +6,7 @@ import { type GrantKind, } from "@lobu/core"; import { getDb, pgTextArray } from "../../db/client.js"; +import { tryGetOrgId } from "../../lobu/stores/org-context.js"; const logger = createLogger("grant-store"); @@ -53,17 +54,24 @@ export class GrantStore { agentId: string, pattern: string, expiresAt: number | null, - denied?: boolean + denied?: boolean, + organizationId?: string ): Promise { pattern = normalizeDomainPattern(pattern); const kind = inferGrantKind(pattern); const expiresAtTs = expiresAt === null ? null : new Date(expiresAt); + const orgId = organizationId ?? tryGetOrgId(); + if (!orgId) { + throw new Error( + "GrantStore.grant requires organizationId (explicit or via orgContext)" + ); + } const sql = getDb(); await sql` - INSERT INTO grants (agent_id, kind, pattern, expires_at, granted_at, denied) - VALUES (${agentId}, ${kind}, ${pattern}, ${expiresAtTs}, now(), ${denied ?? false}) - ON CONFLICT (agent_id, kind, pattern) DO UPDATE SET + INSERT INTO grants (organization_id, agent_id, kind, pattern, expires_at, granted_at, denied) + VALUES (${orgId}, ${agentId}, ${kind}, ${pattern}, ${expiresAtTs}, now(), ${denied ?? false}) + ON CONFLICT (organization_id, agent_id, kind, pattern) DO UPDATE SET expires_at = EXCLUDED.expires_at, granted_at = EXCLUDED.granted_at, denied = EXCLUDED.denied @@ -76,9 +84,14 @@ export class GrantStore { * Checks exact match first, then wildcard parents. * Returns false if the matched grant has `denied: true`. */ - async hasGrant(agentId: string, pattern: string): Promise { + async hasGrant( + agentId: string, + pattern: string, + organizationId?: string + ): Promise { pattern = normalizeDomainPattern(pattern); const kind = inferGrantKind(pattern); + const orgId = organizationId ?? tryGetOrgId(); // Build the candidate pattern set (exact + wildcards) and look them // up in a single query. @@ -106,14 +119,24 @@ export class GrantStore { const sql = getDb(); try { - const rows = await sql` - SELECT pattern, granted_at, expires_at, denied - FROM grants - WHERE agent_id = ${agentId} - AND kind = ${kind} - AND pattern = ANY(${pgTextArray(candidates)}::text[]) - AND (expires_at IS NULL OR expires_at > now()) - `; + const rows = orgId + ? await sql` + SELECT pattern, granted_at, expires_at, denied + FROM grants + WHERE organization_id = ${orgId} + AND agent_id = ${agentId} + AND kind = ${kind} + AND pattern = ANY(${pgTextArray(candidates)}::text[]) + AND (expires_at IS NULL OR expires_at > now()) + ` + : await sql` + SELECT pattern, granted_at, expires_at, denied + FROM grants + WHERE agent_id = ${agentId} + AND kind = ${kind} + AND pattern = ANY(${pgTextArray(candidates)}::text[]) + AND (expires_at IS NULL OR expires_at > now()) + `; if (rows.length === 0) return false; @@ -137,23 +160,40 @@ export class GrantStore { /** * Check if a pattern is explicitly denied for an agent. */ - async isDenied(agentId: string, pattern: string): Promise { + async isDenied( + agentId: string, + pattern: string, + organizationId?: string + ): Promise { pattern = normalizeDomainPattern(pattern); const kind = inferGrantKind(pattern); const candidates = getDomainGrantCandidates(pattern); + const orgId = organizationId ?? tryGetOrgId(); const sql = getDb(); try { - const rows = await sql<{ denied: boolean }>` - SELECT denied - FROM grants - WHERE agent_id = ${agentId} - AND kind = ${kind} - AND pattern = ANY(${pgTextArray(candidates)}::text[]) - AND (expires_at IS NULL OR expires_at > now()) - AND denied = true - LIMIT 1 - `; + const rows = orgId + ? await sql<{ denied: boolean }>` + SELECT denied + FROM grants + WHERE organization_id = ${orgId} + AND agent_id = ${agentId} + AND kind = ${kind} + AND pattern = ANY(${pgTextArray(candidates)}::text[]) + AND (expires_at IS NULL OR expires_at > now()) + AND denied = true + LIMIT 1 + ` + : await sql<{ denied: boolean }>` + SELECT denied + FROM grants + WHERE agent_id = ${agentId} + AND kind = ${kind} + AND pattern = ANY(${pgTextArray(candidates)}::text[]) + AND (expires_at IS NULL OR expires_at > now()) + AND denied = true + LIMIT 1 + `; return rows.length > 0; } catch (error) { logger.error("Failed to check denied grant", { @@ -168,16 +208,26 @@ export class GrantStore { /** * List all active grants for an agent. */ - async listGrants(agentId: string): Promise { + async listGrants(agentId: string, organizationId?: string): Promise { const sql = getDb(); + const orgId = organizationId ?? tryGetOrgId(); try { - const rows = await sql` - SELECT pattern, kind, granted_at, expires_at, denied - FROM grants - WHERE agent_id = ${agentId} - AND (expires_at IS NULL OR expires_at > now()) - ORDER BY granted_at DESC - `; + const rows = orgId + ? await sql` + SELECT pattern, kind, granted_at, expires_at, denied + FROM grants + WHERE organization_id = ${orgId} + AND agent_id = ${agentId} + AND (expires_at IS NULL OR expires_at > now()) + ORDER BY granted_at DESC + ` + : await sql` + SELECT pattern, kind, granted_at, expires_at, denied + FROM grants + WHERE agent_id = ${agentId} + AND (expires_at IS NULL OR expires_at > now()) + ORDER BY granted_at DESC + `; return rows.map((row) => ({ pattern: row.pattern, @@ -195,16 +245,31 @@ export class GrantStore { /** * Revoke a grant for an agent. */ - async revoke(agentId: string, pattern: string): Promise { + async revoke( + agentId: string, + pattern: string, + organizationId?: string + ): Promise { const candidates = getDomainGrantCandidates(pattern); const kind = inferGrantKind(pattern); + const orgId = organizationId ?? tryGetOrgId(); const sql = getDb(); - await sql` - DELETE FROM grants - WHERE agent_id = ${agentId} - AND kind = ${kind} - AND pattern = ANY(${pgTextArray(candidates)}::text[]) - `; + if (orgId) { + await sql` + DELETE FROM grants + WHERE organization_id = ${orgId} + AND agent_id = ${agentId} + AND kind = ${kind} + AND pattern = ANY(${pgTextArray(candidates)}::text[]) + `; + } else { + await sql` + DELETE FROM grants + WHERE agent_id = ${agentId} + AND kind = ${kind} + AND pattern = ANY(${pgTextArray(candidates)}::text[]) + `; + } logger.info("Revoked grant", { agentId, pattern }); } } diff --git a/packages/server/src/gateway/services/platform-helpers.ts b/packages/server/src/gateway/services/platform-helpers.ts index b88991a67..e34ae4350 100644 --- a/packages/server/src/gateway/services/platform-helpers.ts +++ b/packages/server/src/gateway/services/platform-helpers.ts @@ -194,6 +194,7 @@ export function buildMessagePayload(params: { conversationId: string; teamId: string; agentId: string; + organizationId?: string; messageId: string; messageText: string; channelId: string; @@ -216,6 +217,7 @@ export function buildMessagePayload(params: { conversationId: params.conversationId, teamId: params.teamId, agentId: params.agentId, + organizationId: params.organizationId, messageId: params.messageId, messageText: params.messageText, channelId: params.channelId, diff --git a/packages/server/src/lobu/__tests__/agent-routes-apply.test.ts b/packages/server/src/lobu/__tests__/agent-routes-apply.test.ts index fa1ba6161..a951c8694 100644 --- a/packages/server/src/lobu/__tests__/agent-routes-apply.test.ts +++ b/packages/server/src/lobu/__tests__/agent-routes-apply.test.ts @@ -169,7 +169,7 @@ async function seedAgent(orgId: string, agentId: string): Promise { await sql` INSERT INTO agents (id, organization_id, name) VALUES (${agentId}, ${orgId}, ${agentId}) - ON CONFLICT (id) DO NOTHING + ON CONFLICT (organization_id, id) DO NOTHING `; } @@ -264,21 +264,36 @@ describe('POST /agents — idempotent same-org create', () => { }); }); - test('cross-org collision still returns 409', async () => { + test('cross-org create succeeds — agent ids are per-org-unique', async () => { + // Post-Phase-C the agents PK is (organization_id, id), so two orgs can + // own an agent with the same id without colliding. Pre-seed an agent in + // org-b, then create the same id under org-a — it should succeed + // independently. Closes the multi-month-old footgun where a stale + // `food-ordering` in one org silently blocked another org from using + // the same name. const app = await importAgentRoutes(); - // Pre-seed an agent in org-b so the org-a POST collides cross-org. await seedAgent(ORG_B, 'shared-id'); authStash.organizationId = ORG_A; const response = await app.request('/', { method: 'POST', headers: { 'content-type': 'application/json' }, - body: JSON.stringify({ agentId: 'shared-id', name: 'Should Fail' }), + body: JSON.stringify({ agentId: 'shared-id', name: 'Org A copy' }), }); - expect(response.status).toBe(409); + expect(response.status).toBe(201); const body = (await response.json()) as any; - expect(body.error).toContain('another organization'); + expect(body.agentId).toBe('shared-id'); + expect(body.name).toBe('Org A copy'); + + // Both orgs now have their own row — no leak from one to the other. + const { getDb } = await import('../../db/client.js'); + const sql = getDb(); + const rows = await sql` + SELECT organization_id, name FROM agents WHERE id = 'shared-id' + ORDER BY organization_id + `; + expect(rows).toHaveLength(2); }); }); @@ -538,7 +553,7 @@ describe('concurrent-apply race fixes', () => { }); // Both requests fire before either response — exercises the - // ON CONFLICT (id) DO NOTHING claim path. + // ON CONFLICT (organization_id, id) DO NOTHING claim path. const [r1, r2] = await Promise.all([ app.request('/', { method: 'POST', diff --git a/packages/server/src/lobu/agent-routes.ts b/packages/server/src/lobu/agent-routes.ts index 1c6b0b3c7..13f96b72d 100644 --- a/packages/server/src/lobu/agent-routes.ts +++ b/packages/server/src/lobu/agent-routes.ts @@ -23,7 +23,6 @@ import { AGENT_ID_PATTERN, createPostgresAgentConfigStore, createPostgresAgentConnectionStore, - getAgentOrganizationId, } from './stores/postgres-stores'; import { orgContext } from './stores/org-context'; @@ -531,29 +530,26 @@ routes.post('/', async (c) => { 'lobu', ${user.id}, ${sql.json(ownerMcpServers)}, ${sql.json(ownerPreApprovedTools)}, ${now}, ${now} ) - ON CONFLICT (id) DO NOTHING + ON CONFLICT (organization_id, id) DO NOTHING RETURNING id `; if (inserted.length === 0) { - // Another writer (or a previous apply cycle) already owns this id. - // If they're in this org → idempotent 200. If another org → 409. - const existingOrgId = await getAgentOrganizationId(agentId); - if (existingOrgId === orgId) { - const existing = await configStore.getMetadata(agentId); - if (!existing) { - return c.json({ error: 'Agent metadata missing' }, 500); - } - return c.json( - { - agentId, - name: existing.name, - description: existing.description, - }, - 200 - ); + // Another writer (or a previous apply cycle) already owns this id in + // *this* org. Return idempotent 200 with the existing row's metadata. + // Cross-org collisions are no longer possible — the PK is per-org now. + const existing = await configStore.getMetadata(agentId); + if (!existing) { + return c.json({ error: 'Agent metadata missing' }, 500); } - return c.json({ error: 'Agent ID already exists in another organization' }, 409); + return c.json( + { + agentId, + name: existing.name, + description: existing.description, + }, + 200 + ); } return c.json({ agentId, name, description }, 201); @@ -574,7 +570,7 @@ routes.get('/:agentId', async (c) => { count(*)::int as connection_count, count(*) FILTER (WHERE status = 'active')::int as active_connection_count FROM agent_connections - WHERE agent_id = ${agentId} + WHERE agent_id = ${agentId} AND organization_id = ${organizationId} `; const clientIds = new Set(); const runtimeClientCounts = await countRuntimeMessagingClientsByAgent(organizationId); @@ -859,9 +855,9 @@ routes.put('/:agentId/providers/:providerId/api-key', async (c) => { const ciphertext = encrypt(value); const name = providerOrgSecretName(providerId); - const orgId = await getAgentOrganizationId(agentId); + const orgId = (c.get('organizationId') as string | undefined) ?? null; if (!orgId) { - return c.json({ error: 'Agent has no organization' }, 500); + return c.json({ error: 'Organization context not available' }, 500); } const sql = getDb(); @@ -1199,12 +1195,13 @@ routes.put('/:agentId/platforms/by-stable-id/:stableId', async (c) => { // the manager call). const sql = getDb(); const claimNow = new Date(); + const claimOrgId = c.get('organizationId') as string; const claimed = await sql` INSERT INTO agent_connections ( - id, agent_id, platform, config, settings, metadata, status, created_at, updated_at + id, organization_id, agent_id, platform, config, settings, metadata, status, created_at, updated_at ) VALUES ( - ${stableId}, ${agentId}, ${platform}, + ${stableId}, ${claimOrgId}, ${agentId}, ${platform}, ${sql.json({})}, ${sql.json({})}, ${sql.json({})}, 'stopped', ${claimNow}, ${claimNow} ) diff --git a/packages/server/src/lobu/client-routes.ts b/packages/server/src/lobu/client-routes.ts index b40c4aea9..8b68f3539 100644 --- a/packages/server/src/lobu/client-routes.ts +++ b/packages/server/src/lobu/client-routes.ts @@ -301,8 +301,9 @@ async function listMessagingClients(options: { au.created_at, a.name AS agent_name FROM agent_users au - JOIN agents a ON a.id = au.agent_id - WHERE a.organization_id = ${options.organizationId} + JOIN agents a + ON a.organization_id = au.organization_id AND a.id = au.agent_id + WHERE au.organization_id = ${options.organizationId} ${options.agentId ? sql`AND au.agent_id = ${options.agentId}` : sql``} ORDER BY au.created_at DESC `) as Array<{ diff --git a/packages/server/src/lobu/stores/__tests__/agents-per-org-pk.test.ts b/packages/server/src/lobu/stores/__tests__/agents-per-org-pk.test.ts new file mode 100644 index 000000000..ffe2d8905 --- /dev/null +++ b/packages/server/src/lobu/stores/__tests__/agents-per-org-pk.test.ts @@ -0,0 +1,176 @@ +/** + * Per-org agent-id PK regression test. + * + * Pre-Phase-C, the `agents` table had a single-column PK on `id`, so two orgs + * could not share an agent id (a stale `food-ordering` in one org silently + * blocked another org from using the same name). Phase C swapped the PK to + * `(organization_id, id)` and widened every per-(agent, …) UNIQUE on the FK + * children with `organization_id`. + * + * This test pins that contract end-to-end through the storage interfaces: + * two agents with the same id but different orgs coexist; their grants and + * user-agent associations are independent; and writes scope by org. + */ + +import { afterEach, beforeEach, describe, expect, it } from 'vitest'; +import { + cleanupTestDatabase, + getTestDb, +} from '../../../__tests__/setup/test-db'; +import { createTestOrganization } from '../../../__tests__/setup/test-fixtures'; +import { orgContext } from '../org-context'; +import { + createPostgresAgentAccessStore, + createPostgresAgentConfigStore, +} from '../postgres-stores'; + +describe('agents per-org PK — same agent id in two orgs', () => { + let orgA: string; + let orgB: string; + const sharedAgentId = 'shared-id'; + + beforeEach(async () => { + await cleanupTestDatabase(); + const a = await createTestOrganization({ name: 'Org A' }); + const b = await createTestOrganization({ name: 'Org B' }); + orgA = a.id; + orgB = b.id; + }); + + afterEach(async () => { + const db = getTestDb(); + await db`TRUNCATE agents CASCADE`; + }); + + it('lets two orgs each own an agent named "shared-id" without colliding', async () => { + const config = createPostgresAgentConfigStore(); + + await orgContext.run({ organizationId: orgA }, async () => { + await config.saveMetadata(sharedAgentId, { + agentId: sharedAgentId, + name: 'Agent in A', + owner: { platform: 'lobu', userId: 'u-a' }, + createdAt: Date.now(), + }); + }); + + // Same agent id in a different org must succeed (pre-fix: throws + // "already exists in another organization"). + await orgContext.run({ organizationId: orgB }, async () => { + await config.saveMetadata(sharedAgentId, { + agentId: sharedAgentId, + name: 'Agent in B', + owner: { platform: 'lobu', userId: 'u-b' }, + createdAt: Date.now(), + }); + }); + + // Each org sees only its own row. + const fromA = await orgContext.run( + { organizationId: orgA }, + () => config.getMetadata(sharedAgentId) + ); + const fromB = await orgContext.run( + { organizationId: orgB }, + () => config.getMetadata(sharedAgentId) + ); + + expect(fromA?.name).toBe('Agent in A'); + expect(fromB?.name).toBe('Agent in B'); + + // listAgents is org-scoped — neither org sees the other's row. + const listA = await orgContext.run( + { organizationId: orgA }, + () => config.listAgents() + ); + const listB = await orgContext.run( + { organizationId: orgB }, + () => config.listAgents() + ); + expect(listA.map((a) => a.agentId)).toEqual([sharedAgentId]); + expect(listB.map((a) => a.agentId)).toEqual([sharedAgentId]); + + // Deleting in A leaves B's row intact. + await orgContext.run({ organizationId: orgA }, () => + config.deleteMetadata(sharedAgentId) + ); + const stillInB = await orgContext.run( + { organizationId: orgB }, + () => config.getMetadata(sharedAgentId) + ); + const goneFromA = await orgContext.run( + { organizationId: orgA }, + () => config.getMetadata(sharedAgentId) + ); + expect(stillInB?.name).toBe('Agent in B'); + expect(goneFromA).toBeNull(); + }); + + it('keeps agent_grants and agent_users isolated per org for the same agent id', async () => { + const config = createPostgresAgentConfigStore(); + const access = createPostgresAgentAccessStore(); + + // Seed both orgs with the same agent id. + for (const orgId of [orgA, orgB]) { + await orgContext.run({ organizationId: orgId }, async () => { + await config.saveMetadata(sharedAgentId, { + agentId: sharedAgentId, + name: `Agent in ${orgId}`, + owner: { platform: 'lobu', userId: 'u' }, + createdAt: Date.now(), + }); + }); + } + + // Org A grants `*.example.com`; Org B grants `*.other.com`. + await orgContext.run({ organizationId: orgA }, () => + access.grant(sharedAgentId, '*.example.com', null) + ); + await orgContext.run({ organizationId: orgB }, () => + access.grant(sharedAgentId, '*.other.com', null) + ); + + const grantsA = await orgContext.run( + { organizationId: orgA }, + () => access.listGrants(sharedAgentId) + ); + const grantsB = await orgContext.run( + { organizationId: orgB }, + () => access.listGrants(sharedAgentId) + ); + expect(grantsA.map((g) => g.pattern)).toEqual(['*.example.com']); + expect(grantsB.map((g) => g.pattern)).toEqual(['*.other.com']); + + // hasGrant scopes by the active org context. + const aSeesExampleGrant = await orgContext.run( + { organizationId: orgA }, + () => access.hasGrant(sharedAgentId, '*.example.com') + ); + const bSeesExampleGrant = await orgContext.run( + { organizationId: orgB }, + () => access.hasGrant(sharedAgentId, '*.example.com') + ); + expect(aSeesExampleGrant).toBe(true); + expect(bSeesExampleGrant).toBe(false); + + // user-agent associations: same (platform, user, agent) triple in two + // orgs is allowed because the PK is now (organization_id, agent_id, + // platform, user_id). + await orgContext.run({ organizationId: orgA }, () => + access.addUserAgent('telegram', 'tg-user', sharedAgentId) + ); + await orgContext.run({ organizationId: orgB }, () => + access.addUserAgent('telegram', 'tg-user', sharedAgentId) + ); + const ownsInA = await orgContext.run( + { organizationId: orgA }, + () => access.ownsAgent('telegram', 'tg-user', sharedAgentId) + ); + const ownsInB = await orgContext.run( + { organizationId: orgB }, + () => access.ownsAgent('telegram', 'tg-user', sharedAgentId) + ); + expect(ownsInA).toBe(true); + expect(ownsInB).toBe(true); + }); +}); diff --git a/packages/server/src/lobu/stores/postgres-stores.ts b/packages/server/src/lobu/stores/postgres-stores.ts index 2534ceffe..a68491d73 100644 --- a/packages/server/src/lobu/stores/postgres-stores.ts +++ b/packages/server/src/lobu/stores/postgres-stores.ts @@ -34,17 +34,6 @@ export async function agentExistsInOrganization( return rows.length > 0; } -export async function getAgentOrganizationId(agentId: string): Promise { - const sql = getDb(); - const rows = await sql` - SELECT organization_id - FROM agents - WHERE id = ${agentId} - LIMIT 1 - `; - return rows.length > 0 ? (rows[0].organization_id as string) : null; -} - export async function touchAgentLastUsed(organizationId: string, agentId: string): Promise { const sql = getDb(); await sql` @@ -152,6 +141,7 @@ function rowToConnection(row: Record): StoredConnection { id: row.id, platform: row.platform, agentId: row.agent_id ?? undefined, + organizationId: row.organization_id ?? undefined, config: decryptLegacyEncryptedConfig(row.config ?? {}), settings: row.settings ?? {}, metadata: row.metadata ?? {}, @@ -295,7 +285,10 @@ export function createPostgresAgentConfigStore(): AgentConfigStore { const sql = getDb(); const orgId = getOrgId(); const now = new Date(); - const rows = await sql` + // The PK is (organization_id, id) — UPSERT on the composite key. Two + // orgs can independently own an agent with the same id; the conflict + // path here only triggers for re-saves within the *same* org. + await sql` INSERT INTO agents (id, organization_id, name, description, owner_platform, owner_user_id, is_workspace_agent, workspace_id, created_at) VALUES ( @@ -304,7 +297,7 @@ export function createPostgresAgentConfigStore(): AgentConfigStore { ${metadata.isWorkspaceAgent ?? false}, ${metadata.workspaceId ?? null}, ${metadata.createdAt ? new Date(metadata.createdAt) : now} ) - ON CONFLICT (id) DO UPDATE SET + ON CONFLICT (organization_id, id) DO UPDATE SET name = EXCLUDED.name, description = EXCLUDED.description, owner_platform = EXCLUDED.owner_platform, @@ -313,12 +306,7 @@ export function createPostgresAgentConfigStore(): AgentConfigStore { workspace_id = EXCLUDED.workspace_id, last_used_at = ${metadata.lastUsedAt ? new Date(metadata.lastUsedAt) : null}, updated_at = ${now} - WHERE agents.organization_id = EXCLUDED.organization_id - RETURNING organization_id `; - if (rows.length === 0) { - throw new Error(`Agent '${agentId}' already exists in another organization.`); - } }, async updateMetadata(agentId, updates) { const existing = await store.getMetadata(agentId); @@ -362,13 +350,12 @@ export function createPostgresAgentConnectionStore(): AgentConnectionStore { const orgId = tryGetOrgId(); const rows = orgId ? await sql` - SELECT c.* FROM agent_connections c - JOIN agents a ON a.id = c.agent_id - WHERE c.id = ${connectionId} AND a.organization_id = ${orgId} + SELECT * FROM agent_connections + WHERE id = ${connectionId} AND organization_id = ${orgId} ` : await sql` - SELECT c.* FROM agent_connections c - WHERE c.id = ${connectionId} + SELECT * FROM agent_connections + WHERE id = ${connectionId} `; if (rows.length === 0) return null; return rowToConnection(rows[0]); @@ -380,72 +367,69 @@ export function createPostgresAgentConnectionStore(): AgentConnectionStore { if (filter?.agentId && filter?.platform) { const rows = orgId ? await sql` - SELECT c.* FROM agent_connections c - JOIN agents a ON a.id = c.agent_id - WHERE a.organization_id = ${orgId} - AND c.agent_id = ${filter.agentId} - AND c.platform = ${filter.platform} - ORDER BY c.created_at DESC + SELECT * FROM agent_connections + WHERE organization_id = ${orgId} + AND agent_id = ${filter.agentId} + AND platform = ${filter.platform} + ORDER BY created_at DESC ` : await sql` - SELECT c.* FROM agent_connections c - WHERE c.agent_id = ${filter.agentId} - AND c.platform = ${filter.platform} - ORDER BY c.created_at DESC + SELECT * FROM agent_connections + WHERE agent_id = ${filter.agentId} + AND platform = ${filter.platform} + ORDER BY created_at DESC `; return rows.map(rowToConnection); } if (filter?.agentId) { const rows = orgId ? await sql` - SELECT c.* FROM agent_connections c - JOIN agents a ON a.id = c.agent_id - WHERE a.organization_id = ${orgId} AND c.agent_id = ${filter.agentId} - ORDER BY c.created_at DESC + SELECT * FROM agent_connections + WHERE organization_id = ${orgId} AND agent_id = ${filter.agentId} + ORDER BY created_at DESC ` : await sql` - SELECT c.* FROM agent_connections c - WHERE c.agent_id = ${filter.agentId} - ORDER BY c.created_at DESC + SELECT * FROM agent_connections + WHERE agent_id = ${filter.agentId} + ORDER BY created_at DESC `; return rows.map(rowToConnection); } if (filter?.platform) { const rows = orgId ? await sql` - SELECT c.* FROM agent_connections c - JOIN agents a ON a.id = c.agent_id - WHERE a.organization_id = ${orgId} AND c.platform = ${filter.platform} - ORDER BY c.created_at DESC + SELECT * FROM agent_connections + WHERE organization_id = ${orgId} AND platform = ${filter.platform} + ORDER BY created_at DESC ` : await sql` - SELECT c.* FROM agent_connections c - WHERE c.platform = ${filter.platform} - ORDER BY c.created_at DESC + SELECT * FROM agent_connections + WHERE platform = ${filter.platform} + ORDER BY created_at DESC `; return rows.map(rowToConnection); } const rows = orgId ? await sql` - SELECT c.* FROM agent_connections c - JOIN agents a ON a.id = c.agent_id - WHERE a.organization_id = ${orgId} - ORDER BY c.created_at DESC + SELECT * FROM agent_connections + WHERE organization_id = ${orgId} + ORDER BY created_at DESC ` : await sql` - SELECT c.* FROM agent_connections c - ORDER BY c.created_at DESC + SELECT * FROM agent_connections + ORDER BY created_at DESC `; return rows.map(rowToConnection); }, async saveConnection(connection) { const sql = getDb(); + const orgId = getOrgId(); const configToPersist = { ...connection.config }; const existingRows = await sql` SELECT config FROM agent_connections - WHERE id = ${connection.id} + WHERE id = ${connection.id} AND organization_id = ${orgId} LIMIT 1 `; const existingConfig = @@ -471,9 +455,9 @@ export function createPostgresAgentConnectionStore(): AgentConnectionStore { const now = new Date(); await sql` - INSERT INTO agent_connections (id, agent_id, platform, config, settings, metadata, status, error_message, created_at, updated_at) + INSERT INTO agent_connections (id, organization_id, agent_id, platform, config, settings, metadata, status, error_message, created_at, updated_at) VALUES ( - ${connection.id}, ${connection.agentId ?? null}, ${connection.platform}, + ${connection.id}, ${orgId}, ${connection.agentId ?? null}, ${connection.platform}, ${sql.json(configToPersist)}, ${sql.json(connection.settings)}, ${sql.json(connection.metadata)}, ${connection.status}, ${connection.errorMessage ?? null}, ${now}, ${now} ) @@ -499,10 +483,7 @@ export function createPostgresAgentConnectionStore(): AgentConnectionStore { if (orgId) { await sql` DELETE FROM agent_connections - USING agents a - WHERE agent_connections.agent_id = a.id - AND agent_connections.id = ${connectionId} - AND a.organization_id = ${orgId} + WHERE id = ${connectionId} AND organization_id = ${orgId} `; } else { await sql`DELETE FROM agent_connections WHERE id = ${connectionId}`; @@ -510,63 +491,95 @@ export function createPostgresAgentConnectionStore(): AgentConnectionStore { }, async getChannelBinding(platform, channelId, teamId) { const sql = getDb(); + const orgId = tryGetOrgId(); const rows = teamId - ? await sql` - SELECT * FROM agent_channel_bindings - WHERE platform = ${platform} AND channel_id = ${channelId} AND team_id = ${teamId} - ` - : await sql` - SELECT * FROM agent_channel_bindings - WHERE platform = ${platform} AND channel_id = ${channelId} AND team_id IS NULL - `; + ? orgId + ? await sql` + SELECT * FROM agent_channel_bindings + WHERE organization_id = ${orgId} + AND platform = ${platform} AND channel_id = ${channelId} AND team_id = ${teamId} + ` + : await sql` + SELECT * FROM agent_channel_bindings + WHERE platform = ${platform} AND channel_id = ${channelId} AND team_id = ${teamId} + ` + : orgId + ? await sql` + SELECT * FROM agent_channel_bindings + WHERE organization_id = ${orgId} + AND platform = ${platform} AND channel_id = ${channelId} AND team_id IS NULL + ` + : await sql` + SELECT * FROM agent_channel_bindings + WHERE platform = ${platform} AND channel_id = ${channelId} AND team_id IS NULL + `; if (rows.length === 0) return null; return rowToChannelBinding(rows[0]); }, async createChannelBinding(binding) { const sql = getDb(); + const orgId = getOrgId(); if (binding.teamId) { await sql` - INSERT INTO agent_channel_bindings (agent_id, platform, channel_id, team_id, created_at) - VALUES (${binding.agentId}, ${binding.platform}, ${binding.channelId}, ${binding.teamId}, now()) + INSERT INTO agent_channel_bindings (organization_id, agent_id, platform, channel_id, team_id, created_at) + VALUES (${orgId}, ${binding.agentId}, ${binding.platform}, ${binding.channelId}, ${binding.teamId}, now()) ON CONFLICT (platform, channel_id, team_id) DO UPDATE SET - agent_id = EXCLUDED.agent_id + agent_id = EXCLUDED.agent_id, + organization_id = EXCLUDED.organization_id `; } else { // PG treats NULL as distinct under the (platform, channel_id, team_id) // UNIQUE; the team_id IS NULL branch upserts via the partial unique // index agent_channel_bindings_no_team_unique. await sql` - INSERT INTO agent_channel_bindings (agent_id, platform, channel_id, team_id, created_at) - VALUES (${binding.agentId}, ${binding.platform}, ${binding.channelId}, NULL, now()) + INSERT INTO agent_channel_bindings (organization_id, agent_id, platform, channel_id, team_id, created_at) + VALUES (${orgId}, ${binding.agentId}, ${binding.platform}, ${binding.channelId}, NULL, now()) ON CONFLICT (platform, channel_id) WHERE team_id IS NULL - DO UPDATE SET agent_id = EXCLUDED.agent_id + DO UPDATE SET agent_id = EXCLUDED.agent_id, + organization_id = EXCLUDED.organization_id `; } }, async deleteChannelBinding(platform, channelId, teamId) { const sql = getDb(); + const orgId = tryGetOrgId(); if (teamId) { + if (orgId) { + await sql` + DELETE FROM agent_channel_bindings + WHERE organization_id = ${orgId} + AND platform = ${platform} AND channel_id = ${channelId} AND team_id = ${teamId} + `; + } else { + await sql` + DELETE FROM agent_channel_bindings + WHERE platform = ${platform} AND channel_id = ${channelId} AND team_id = ${teamId} + `; + } + return; + } + + if (orgId) { await sql` DELETE FROM agent_channel_bindings - WHERE platform = ${platform} AND channel_id = ${channelId} AND team_id = ${teamId} + WHERE organization_id = ${orgId} + AND platform = ${platform} AND channel_id = ${channelId} AND team_id IS NULL + `; + } else { + await sql` + DELETE FROM agent_channel_bindings + WHERE platform = ${platform} AND channel_id = ${channelId} AND team_id IS NULL `; - return; } - - await sql` - DELETE FROM agent_channel_bindings - WHERE platform = ${platform} AND channel_id = ${channelId} AND team_id IS NULL - `; }, async listChannelBindings(agentId) { const sql = getDb(); const orgId = tryGetOrgId(); const rows = orgId ? await sql` - SELECT b.* FROM agent_channel_bindings b - JOIN agents a ON a.id = b.agent_id - WHERE b.agent_id = ${agentId} AND a.organization_id = ${orgId} + SELECT * FROM agent_channel_bindings + WHERE agent_id = ${agentId} AND organization_id = ${orgId} ` : await sql` SELECT * FROM agent_channel_bindings WHERE agent_id = ${agentId} @@ -578,11 +591,8 @@ export function createPostgresAgentConnectionStore(): AgentConnectionStore { const orgId = tryGetOrgId(); const rows = orgId ? await sql` - DELETE FROM agent_channel_bindings b - USING agents a - WHERE b.agent_id = a.id - AND b.agent_id = ${agentId} - AND a.organization_id = ${orgId} + DELETE FROM agent_channel_bindings + WHERE agent_id = ${agentId} AND organization_id = ${orgId} RETURNING 1 ` : await sql` @@ -597,11 +607,12 @@ export function createPostgresAgentAccessStore(): AgentAccessStore { return { async grant(agentId, pattern, expiresAt, denied) { const sql = getDb(); + const orgId = getOrgId(); const exp = expiresAt ? new Date(expiresAt) : null; await sql` - INSERT INTO agent_grants (agent_id, pattern, expires_at, granted_at, denied) - VALUES (${agentId}, ${pattern}, ${exp}, now(), ${denied ?? false}) - ON CONFLICT (agent_id, pattern) DO UPDATE SET + INSERT INTO agent_grants (organization_id, agent_id, pattern, expires_at, granted_at, denied) + VALUES (${orgId}, ${agentId}, ${pattern}, ${exp}, now(), ${denied ?? false}) + ON CONFLICT (organization_id, agent_id, pattern) DO UPDATE SET expires_at = EXCLUDED.expires_at, granted_at = EXCLUDED.granted_at, denied = EXCLUDED.denied @@ -609,78 +620,148 @@ export function createPostgresAgentAccessStore(): AgentAccessStore { }, async hasGrant(agentId, pattern) { const sql = getDb(); - const exact = await sql` - SELECT 1 FROM agent_grants - WHERE agent_id = ${agentId} AND pattern = ${pattern} - AND denied IS NOT TRUE - AND (expires_at IS NULL OR expires_at > now()) - LIMIT 1 - `; + const orgId = tryGetOrgId(); + const exact = orgId + ? await sql` + SELECT 1 FROM agent_grants + WHERE organization_id = ${orgId} + AND agent_id = ${agentId} AND pattern = ${pattern} + AND denied IS NOT TRUE + AND (expires_at IS NULL OR expires_at > now()) + LIMIT 1 + ` + : await sql` + SELECT 1 FROM agent_grants + WHERE agent_id = ${agentId} AND pattern = ${pattern} + AND denied IS NOT TRUE + AND (expires_at IS NULL OR expires_at > now()) + LIMIT 1 + `; if (exact.length > 0) return true; - const grants = await sql` - SELECT pattern FROM agent_grants - WHERE agent_id = ${agentId} - AND denied IS NOT TRUE - AND (expires_at IS NULL OR expires_at > now()) - `; + const grants = orgId + ? await sql` + SELECT pattern FROM agent_grants + WHERE organization_id = ${orgId} + AND agent_id = ${agentId} + AND denied IS NOT TRUE + AND (expires_at IS NULL OR expires_at > now()) + ` + : await sql` + SELECT pattern FROM agent_grants + WHERE agent_id = ${agentId} + AND denied IS NOT TRUE + AND (expires_at IS NULL OR expires_at > now()) + `; return grants.some((g: any) => wildcardMatch(g.pattern, pattern)); }, async isDenied(agentId, pattern) { const sql = getDb(); - const rows = await sql` - SELECT 1 FROM agent_grants - WHERE agent_id = ${agentId} AND pattern = ${pattern} - AND denied = TRUE - AND (expires_at IS NULL OR expires_at > now()) - LIMIT 1 - `; + const orgId = tryGetOrgId(); + const rows = orgId + ? await sql` + SELECT 1 FROM agent_grants + WHERE organization_id = ${orgId} + AND agent_id = ${agentId} AND pattern = ${pattern} + AND denied = TRUE + AND (expires_at IS NULL OR expires_at > now()) + LIMIT 1 + ` + : await sql` + SELECT 1 FROM agent_grants + WHERE agent_id = ${agentId} AND pattern = ${pattern} + AND denied = TRUE + AND (expires_at IS NULL OR expires_at > now()) + LIMIT 1 + `; return rows.length > 0; }, async listGrants(agentId) { const sql = getDb(); - const rows = await sql` - SELECT pattern, expires_at, granted_at, denied - FROM agent_grants - WHERE agent_id = ${agentId} - ORDER BY granted_at DESC - `; + const orgId = tryGetOrgId(); + const rows = orgId + ? await sql` + SELECT pattern, expires_at, granted_at, denied + FROM agent_grants + WHERE organization_id = ${orgId} AND agent_id = ${agentId} + ORDER BY granted_at DESC + ` + : await sql` + SELECT pattern, expires_at, granted_at, denied + FROM agent_grants + WHERE agent_id = ${agentId} + ORDER BY granted_at DESC + `; return rows.map(rowToGrant); }, async revokeGrant(agentId, pattern) { const sql = getDb(); - await sql` - DELETE FROM agent_grants WHERE agent_id = ${agentId} AND pattern = ${pattern} - `; + const orgId = tryGetOrgId(); + if (orgId) { + await sql` + DELETE FROM agent_grants + WHERE organization_id = ${orgId} + AND agent_id = ${agentId} AND pattern = ${pattern} + `; + } else { + await sql` + DELETE FROM agent_grants WHERE agent_id = ${agentId} AND pattern = ${pattern} + `; + } }, async addUserAgent(platform, userId, agentId) { const sql = getDb(); + const orgId = getOrgId(); await sql` - INSERT INTO agent_users (agent_id, platform, user_id, created_at) - VALUES (${agentId}, ${platform}, ${userId}, now()) - ON CONFLICT (agent_id, platform, user_id) DO NOTHING + INSERT INTO agent_users (organization_id, agent_id, platform, user_id, created_at) + VALUES (${orgId}, ${agentId}, ${platform}, ${userId}, now()) + ON CONFLICT (organization_id, agent_id, platform, user_id) DO NOTHING `; }, async removeUserAgent(platform, userId, agentId) { const sql = getDb(); - await sql` - DELETE FROM agent_users WHERE agent_id = ${agentId} AND platform = ${platform} AND user_id = ${userId} - `; + const orgId = tryGetOrgId(); + if (orgId) { + await sql` + DELETE FROM agent_users + WHERE organization_id = ${orgId} + AND agent_id = ${agentId} AND platform = ${platform} AND user_id = ${userId} + `; + } else { + await sql` + DELETE FROM agent_users WHERE agent_id = ${agentId} AND platform = ${platform} AND user_id = ${userId} + `; + } }, async listUserAgents(platform, userId) { const sql = getDb(); - const rows = await sql` - SELECT agent_id FROM agent_users WHERE platform = ${platform} AND user_id = ${userId} - `; + const orgId = tryGetOrgId(); + const rows = orgId + ? await sql` + SELECT agent_id FROM agent_users + WHERE organization_id = ${orgId} + AND platform = ${platform} AND user_id = ${userId} + ` + : await sql` + SELECT agent_id FROM agent_users WHERE platform = ${platform} AND user_id = ${userId} + `; return rows.map((r: any) => r.agent_id); }, async ownsAgent(platform, userId, agentId) { const sql = getDb(); - const rows = await sql` - SELECT 1 FROM agent_users - WHERE agent_id = ${agentId} AND platform = ${platform} AND user_id = ${userId} - LIMIT 1 - `; + const orgId = tryGetOrgId(); + const rows = orgId + ? await sql` + SELECT 1 FROM agent_users + WHERE organization_id = ${orgId} + AND agent_id = ${agentId} AND platform = ${platform} AND user_id = ${userId} + LIMIT 1 + ` + : await sql` + SELECT 1 FROM agent_users + WHERE agent_id = ${agentId} AND platform = ${platform} AND user_id = ${userId} + LIMIT 1 + `; return rows.length > 0; }, }; diff --git a/packages/server/src/preview/__tests__/slack-preview.test.ts b/packages/server/src/preview/__tests__/slack-preview.test.ts index 51a2080fd..be74ed0d4 100644 --- a/packages/server/src/preview/__tests__/slack-preview.test.ts +++ b/packages/server/src/preview/__tests__/slack-preview.test.ts @@ -356,8 +356,8 @@ describe("Public preview — /lobu try a demo agent", () => { const sql = getDb(); await sql` - INSERT INTO agent_connections (id, agent_id, platform, config, settings, metadata, status, created_at, updated_at) - VALUES (${PREVIEW_CONN}, ${CONCIERGE}, 'slack', ${sql.json({ platform: "slack" })}, + INSERT INTO agent_connections (id, organization_id, agent_id, platform, config, settings, metadata, status, created_at, updated_at) + VALUES (${PREVIEW_CONN}, ${PREVIEW_ORG}, ${CONCIERGE}, 'slack', ${sql.json({ platform: "slack" })}, ${sql.json({ previewMode: true })}, ${sql.json({})}, 'active', now(), now()) `; }); diff --git a/packages/server/src/preview/slack.ts b/packages/server/src/preview/slack.ts index e4f973df2..6ec003b4f 100644 --- a/packages/server/src/preview/slack.ts +++ b/packages/server/src/preview/slack.ts @@ -239,13 +239,16 @@ async function upsertBinding( platform: string, channelId: string, teamId: string | undefined, - agentId: string + agentId: string, + organizationId: string ): Promise { if (teamId) { await tx` - INSERT INTO agent_channel_bindings (agent_id, platform, channel_id, team_id, created_at) - VALUES (${agentId}, ${platform}, ${channelId}, ${teamId}, now()) - ON CONFLICT (platform, channel_id, team_id) DO UPDATE SET agent_id = EXCLUDED.agent_id + INSERT INTO agent_channel_bindings (organization_id, agent_id, platform, channel_id, team_id, created_at) + VALUES (${organizationId}, ${agentId}, ${platform}, ${channelId}, ${teamId}, now()) + ON CONFLICT (platform, channel_id, team_id) DO UPDATE SET + agent_id = EXCLUDED.agent_id, + organization_id = EXCLUDED.organization_id `; } else { await tx` @@ -253,8 +256,8 @@ async function upsertBinding( WHERE platform = ${platform} AND channel_id = ${channelId} AND team_id IS NULL `; await tx` - INSERT INTO agent_channel_bindings (agent_id, platform, channel_id, team_id, created_at) - VALUES (${agentId}, ${platform}, ${channelId}, NULL, now()) + INSERT INTO agent_channel_bindings (organization_id, agent_id, platform, channel_id, team_id, created_at) + VALUES (${organizationId}, ${agentId}, ${platform}, ${channelId}, NULL, now()) `; } } @@ -299,7 +302,7 @@ export async function consumePreviewClaim(args: { return { status: 'surface_not_allowed' as const, surfaceType }; } - await upsertBinding(tx, platform, channelId, teamId, claim.agentId); + await upsertBinding(tx, platform, channelId, teamId, claim.agentId, claim.organizationId); // The code was minted by an authenticated `lobu run`, so the sender is the // same Lobu user. Record the chat-platform → Lobu-user link so they can @@ -346,10 +349,9 @@ async function resolvePreviewConnectionOrg( ): Promise<{ organizationId: string; owningAgentId: string } | null> { const sql = getDb(); const rows = (await sql` - SELECT a.organization_id, c.agent_id - FROM agent_connections c - JOIN agents a ON a.id = c.agent_id - WHERE c.id = ${connectionId} + SELECT organization_id, agent_id + FROM agent_connections + WHERE id = ${connectionId} LIMIT 1 `) as Array<{ organization_id: string | null; agent_id: string | null }>; const row = rows[0]; @@ -423,9 +425,11 @@ export async function bindChatToPreviewAgent(args: { const { platform, teamId, channelId } = args; if (teamId) { await sql` - INSERT INTO agent_channel_bindings (agent_id, platform, channel_id, team_id, created_at) - VALUES (${target.id}, ${platform}, ${channelId}, ${teamId}, now()) - ON CONFLICT (platform, channel_id, team_id) DO UPDATE SET agent_id = EXCLUDED.agent_id + INSERT INTO agent_channel_bindings (organization_id, agent_id, platform, channel_id, team_id, created_at) + VALUES (${org.organizationId}, ${target.id}, ${platform}, ${channelId}, ${teamId}, now()) + ON CONFLICT (platform, channel_id, team_id) DO UPDATE SET + agent_id = EXCLUDED.agent_id, + organization_id = EXCLUDED.organization_id `; } else { await sql` @@ -433,8 +437,8 @@ export async function bindChatToPreviewAgent(args: { WHERE platform = ${platform} AND channel_id = ${channelId} AND team_id IS NULL `; await sql` - INSERT INTO agent_channel_bindings (agent_id, platform, channel_id, team_id, created_at) - VALUES (${target.id}, ${platform}, ${channelId}, NULL, now()) + INSERT INTO agent_channel_bindings (organization_id, agent_id, platform, channel_id, team_id, created_at) + VALUES (${org.organizationId}, ${target.id}, ${platform}, ${channelId}, NULL, now()) `; } return { status: 'bound', agentId: target.id }; @@ -513,14 +517,17 @@ export async function bindChatToAgentForOwner(args: { }): Promise { const { platform, teamId, channelId, agentId, lobuUserId } = args; const sql = getDb(); - const owned = await sql<{ one: number }>` - SELECT 1 AS one + const owned = await sql<{ organization_id: string }>` + SELECT a.organization_id FROM agents a JOIN "member" m ON m."organizationId" = a.organization_id WHERE a.id = ${agentId} AND m."userId" = ${lobuUserId} LIMIT 1 `; if (owned.length === 0) return { status: 'forbidden' }; - await sql.begin((tx) => upsertBinding(tx, platform, channelId, teamId, agentId)); + const organizationId = owned[0].organization_id; + await sql.begin((tx) => + upsertBinding(tx, platform, channelId, teamId, agentId, organizationId) + ); return { status: 'bound' }; }