diff --git a/db/migrations/20260528120000_auth_profiles_pending_per_user.sql b/db/migrations/20260528120000_auth_profiles_pending_per_user.sql new file mode 100644 index 000000000..634fc124e --- /dev/null +++ b/db/migrations/20260528120000_auth_profiles_pending_per_user.sql @@ -0,0 +1,35 @@ +-- migrate:up + +-- The old `auth_profiles_pending_unique` partial index keyed pending-row +-- uniqueness on (organization_id, connector_key, profile_kind, provider) with +-- no caller dimension. For `oauth_account` (which is user-personal per the +-- comment in handleCreateAuthProfile and the per-user authData semantics), +-- that blocked two users in the same org from running parallel OAuth flows +-- for the same connector — User B's INSERT collided with User A's in-flight +-- pending row. The semantically correct constraint for oauth_account is +-- per-(user, connector, provider). +-- +-- Other profile_kinds today are written with status='active' from creation +-- (oauth_app, env via upsertConnectorAuthProfiles; browser_session via +-- worker-api), so the old "one pending per kind/provider" bound was unused +-- in practice for them. Drop the constraint there rather than reproduce it +-- under a different shape; createAuthProfile still raises a typed error if +-- a future pending INSERT ever collides on the new index. +DROP INDEX IF EXISTS public.auth_profiles_pending_unique; + +CREATE UNIQUE INDEX auth_profiles_pending_oauth_account_unique + ON public.auth_profiles + USING btree (organization_id, connector_key, provider, created_by) + WHERE (status = 'pending_auth'::text AND profile_kind = 'oauth_account'::text); + +COMMENT ON INDEX public.auth_profiles_pending_oauth_account_unique IS + 'At most one in-flight oauth_account pending-auth profile per (org, connector, provider, user). Lets distinct members start parallel OAuth flows for the same connector; blocks the same user from duplicating their own flow.'; + +-- migrate:down + +DROP INDEX IF EXISTS public.auth_profiles_pending_oauth_account_unique; + +CREATE UNIQUE INDEX auth_profiles_pending_unique + ON public.auth_profiles + USING btree (organization_id, connector_key, profile_kind, provider) + WHERE (status = 'pending_auth'::text); diff --git a/packages/server/src/__tests__/integration/connectors/pending-auth-conflict.test.ts b/packages/server/src/__tests__/integration/connectors/pending-auth-conflict.test.ts new file mode 100644 index 000000000..42f8de13d --- /dev/null +++ b/packages/server/src/__tests__/integration/connectors/pending-auth-conflict.test.ts @@ -0,0 +1,283 @@ +/** + * Regression: the partial unique index `auth_profiles_pending_unique` allows + * exactly one pending oauth_account profile per (org, connector_key, + * profile_kind, provider). Repeat "Create OAuth account" clicks from the web + * UI omit `slug`, so prior to this guard the second insert collided with the + * index and leaked a raw PG `duplicate key value violates unique constraint` + * message into a toast. Cover: + * 1. No-slug repeat call → reuses the existing pending row (idempotent). + * 2. Explicit different slug while a pending row exists → + * PendingAuthConflictError with a friendly message that does NOT + * include "duplicate key" or the constraint name. + * 3. Direct `createAuthProfile` collision → same friendly error class. + */ + +import { beforeAll, beforeEach, describe, expect, it } from 'vitest'; +import type { Env } from '../../../index'; +import { manageAuthProfiles } from '../../../tools/admin/manage_auth_profiles'; +import type { ToolContext } from '../../../tools/registry'; +import { + PendingAuthConflictError, + createAuthProfile, +} from '../../../utils/auth-profiles'; +import { initWorkspaceProvider } from '../../../workspace'; +import { + addUserToOrganization, + createTestConnectorDefinition, + createTestOrganization, + createTestUser, +} from '../../setup/test-fixtures'; +import { cleanupTestDatabase, getTestDb } from '../../setup/test-db'; + +const TEST_ENV = {} as Env; + +function ctxFor(organizationId: string, userId: string): ToolContext { + return { + organizationId, + userId, + memberRole: 'owner', + agentId: null, + isAuthenticated: true, + clientId: null, + scopes: ['mcp:read', 'mcp:write', 'mcp:admin'], + tokenType: 'oauth', + scopedToOrg: true, + allowCrossOrg: false, + } as ToolContext; +} + +async function setupOAuthConnector(orgId: string) { + await createTestConnectorDefinition({ + key: 'demo.oauth', + name: 'Demo OAuth', + organization_id: orgId, + auth_schema: { + methods: [ + { + type: 'oauth', + provider: 'demo', + requiredScopes: ['read'], + clientIdKey: 'DEMO_CLIENT_ID', + clientSecretKey: 'DEMO_CLIENT_SECRET', + }, + ], + }, + feeds_schema: { items: {} }, + }); +} + +describe('auth profiles — pending-auth conflict handling', () => { + beforeAll(async () => { + await initWorkspaceProvider(); + }); + + beforeEach(async () => { + await cleanupTestDatabase(); + }); + + it('repeat create_auth_profile without slug reuses the pending row', async () => { + const org = await createTestOrganization({ name: 'Pending Conflict Org' }); + const user = await createTestUser({ name: 'Pending Conflict User' }); + await addUserToOrganization(user.id, org.id, 'owner'); + const ctx = ctxFor(org.id, user.id); + await setupOAuthConnector(org.id); + + const first = await manageAuthProfiles( + { + action: 'create_auth_profile', + connector_key: 'demo.oauth', + profile_kind: 'oauth_account', + display_name: 'Demo Account', + }, + TEST_ENV, + ctx + ); + expect('auth_profile' in first).toBe(true); + if (!('auth_profile' in first)) throw new Error('first call missing auth_profile'); + const firstProfile = first.auth_profile; + expect(firstProfile.status).toBe('pending_auth'); + + const second = await manageAuthProfiles( + { + action: 'create_auth_profile', + connector_key: 'demo.oauth', + profile_kind: 'oauth_account', + display_name: 'Demo Account', + }, + TEST_ENV, + ctx + ); + expect('auth_profile' in second).toBe(true); + if (!('auth_profile' in second)) throw new Error('second call missing auth_profile'); + expect(second.auth_profile.id).toBe(firstProfile.id); + expect(second.auth_profile.slug).toBe(firstProfile.slug); + expect('connect_token' in second && second.connect_token).toBeTruthy(); + + // Exactly one auth_profile row should exist. + const sql = getTestDb(); + const rows = await sql` + SELECT id FROM auth_profiles + WHERE organization_id = ${org.id} + AND connector_key = 'demo.oauth' + AND profile_kind = 'oauth_account' + `; + expect(rows).toHaveLength(1); + }); + + it('create_auth_profile with a fresh slug while a pending row exists returns a friendly error', async () => { + const org = await createTestOrganization({ name: 'Pending Conflict Org 2' }); + const user = await createTestUser({ name: 'Pending Conflict User 2' }); + await addUserToOrganization(user.id, org.id, 'owner'); + const ctx = ctxFor(org.id, user.id); + await setupOAuthConnector(org.id); + + await manageAuthProfiles( + { + action: 'create_auth_profile', + connector_key: 'demo.oauth', + profile_kind: 'oauth_account', + display_name: 'Demo Account', + slug: 'demo-account-a', + }, + TEST_ENV, + ctx + ); + + const collide = await manageAuthProfiles( + { + action: 'create_auth_profile', + connector_key: 'demo.oauth', + profile_kind: 'oauth_account', + display_name: 'Demo Account B', + slug: 'demo-account-b', + }, + TEST_ENV, + ctx + ); + + expect('error' in collide).toBe(true); + if (!('error' in collide)) throw new Error('expected error response'); + expect(collide.error).toMatch(/already pending authorization/i); + expect(collide.error).not.toMatch(/duplicate key/i); + expect(collide.error).not.toMatch(/auth_profiles_pending_unique/i); + expect(collide.error).toContain('demo-account-a'); + }); + + it('two users in the same org can run parallel pending oauth_account flows', async () => { + const org = await createTestOrganization({ name: 'Parallel Flows Org' }); + const userA = await createTestUser({ name: 'User A' }); + const userB = await createTestUser({ name: 'User B' }); + await addUserToOrganization(userA.id, org.id, 'owner'); + await addUserToOrganization(userB.id, org.id, 'admin'); + await setupOAuthConnector(org.id); + + const resA = await manageAuthProfiles( + { + action: 'create_auth_profile', + connector_key: 'demo.oauth', + profile_kind: 'oauth_account', + display_name: 'A Account', + }, + TEST_ENV, + ctxFor(org.id, userA.id) + ); + expect('auth_profile' in resA).toBe(true); + if (!('auth_profile' in resA)) throw new Error('A missing auth_profile'); + + const resB = await manageAuthProfiles( + { + action: 'create_auth_profile', + connector_key: 'demo.oauth', + profile_kind: 'oauth_account', + display_name: 'B Account', + }, + TEST_ENV, + ctxFor(org.id, userB.id) + ); + expect('auth_profile' in resB).toBe(true); + if (!('auth_profile' in resB)) throw new Error('B missing auth_profile'); + + expect(resB.auth_profile.id).not.toBe(resA.auth_profile.id); + expect(resA.auth_profile.created_by).toBe(userA.id); + expect(resB.auth_profile.created_by).toBe(userB.id); + + // Both pending rows must coexist — the new index is per-user. + const sql = getTestDb(); + const rows = await sql` + SELECT id, created_by FROM auth_profiles + WHERE organization_id = ${org.id} + AND connector_key = 'demo.oauth' + AND profile_kind = 'oauth_account' + AND status = 'pending_auth' + ORDER BY id + `; + expect(rows).toHaveLength(2); + + // User B repeating without a slug still reuses *their own* pending row, + // not A's — per-user dedup. + const resBAgain = await manageAuthProfiles( + { + action: 'create_auth_profile', + connector_key: 'demo.oauth', + profile_kind: 'oauth_account', + display_name: 'B Account again', + }, + TEST_ENV, + ctxFor(org.id, userB.id) + ); + if (!('auth_profile' in resBAgain)) throw new Error('B repeat missing auth_profile'); + expect(resBAgain.auth_profile.id).toBe(resB.auth_profile.id); + }); + + it('createAuthProfile throws PendingAuthConflictError carrying the existing row', async () => { + const org = await createTestOrganization({ name: 'Pending Conflict Org 3' }); + const user = await createTestUser({ name: 'Pending Conflict User 3' }); + await addUserToOrganization(user.id, org.id, 'owner'); + await setupOAuthConnector(org.id); + + const first = await createAuthProfile({ + organizationId: org.id, + connectorKey: 'demo.oauth', + displayName: 'Demo Account', + slug: 'demo-account-first', + profileKind: 'oauth_account', + provider: 'demo', + status: 'pending_auth', + createdBy: user.id, + }); + + await expect( + createAuthProfile({ + organizationId: org.id, + connectorKey: 'demo.oauth', + displayName: 'Demo Account Again', + slug: 'demo-account-second', + profileKind: 'oauth_account', + provider: 'demo', + status: 'pending_auth', + createdBy: user.id, + }) + ).rejects.toBeInstanceOf(PendingAuthConflictError); + + try { + await createAuthProfile({ + organizationId: org.id, + connectorKey: 'demo.oauth', + displayName: 'Demo Account Again', + slug: 'demo-account-third', + profileKind: 'oauth_account', + provider: 'demo', + status: 'pending_auth', + createdBy: user.id, + }); + throw new Error('expected PendingAuthConflictError'); + } catch (err) { + expect(err).toBeInstanceOf(PendingAuthConflictError); + const conflict = err as PendingAuthConflictError; + expect(conflict.existing.id).toBe(first.id); + expect(conflict.existing.slug).toBe('demo-account-first'); + expect(conflict.httpStatus).toBe(409); + expect(conflict.message).not.toMatch(/duplicate key/i); + } + }); +}); diff --git a/packages/server/src/rest-api.ts b/packages/server/src/rest-api.ts index 2a11abd5a..996c6356c 100644 --- a/packages/server/src/rest-api.ts +++ b/packages/server/src/rest-api.ts @@ -238,7 +238,7 @@ export async function restToolProxy( return c.json(toJsonSafe(result)); } catch (error) { if (error instanceof ToolUserError) { - return c.json({ error: error.message }, error.httpStatus as 400 | 404); + return c.json({ error: error.message }, error.httpStatus as 400 | 403 | 404 | 409 | 422); } if (error instanceof ToolNotRegisteredError) { // Registry/frontend drift — surface to Sentry so the next "Tool not diff --git a/packages/server/src/tools/admin/manage_auth_profiles.ts b/packages/server/src/tools/admin/manage_auth_profiles.ts index b1fec0144..bac501731 100644 --- a/packages/server/src/tools/admin/manage_auth_profiles.ts +++ b/packages/server/src/tools/admin/manage_auth_profiles.ts @@ -20,11 +20,13 @@ import { type AuthProfileStatus, createAuthProfile, deleteAuthProfile, + findPendingAuthProfile, getAuthProfileBySlug, getBrowserSessionReadiness, listAuthProfiles, normalizeAuthProfileSlug, normalizeAuthValues, + PendingAuthConflictError, revokeOAuthAppProfileAtomic, setDefaultAuthProfileForConnector, summarizeBrowserSessionAuthData, @@ -503,9 +505,25 @@ async function handleCreateAuthProfile( // fresh connect token if it still needs auth). This lets a connection // reference `auth_profile_slug = ` in the *same* `lobu apply` — // previously this branch never persisted a row, so the slug didn't resolve. - const existing = args.slug + let existing = args.slug ? await getAuthProfileBySlug(ctx.organizationId, normalizeAuthProfileSlug(args.slug, displayName)) : null; + // When no slug was provided, repeat clicks from the UI ("create OAuth + // account") would otherwise hit the partial unique index + // `auth_profiles_pending_oauth_account_unique` and leak a raw PG error + // to the user. Reuse any existing pending row this caller already owns + // for the (connector, provider) tuple instead — same behavior as the + // slug-keyed branch. The index is keyed per-user, so two members can + // still run parallel OAuth flows for the same connector. + if (!existing && !args.slug) { + existing = await findPendingAuthProfile({ + organizationId: ctx.organizationId, + connectorKey, + profileKind: 'oauth_account', + provider: oauthMethod.provider, + createdBy: ctx.userId ?? null, + }); + } if (existing) { if (existing.profile_kind !== 'oauth_account' || existing.connector_key !== connectorKey) { return { @@ -551,17 +569,29 @@ async function handleCreateAuthProfile( // Create a real `pending_auth` row up front so downstream // `auth_profile_slug` lookups (connection create, etc.) resolve. The OAuth // callback flips it to `active` once the user finishes authorization. - const authProfile = await createAuthProfile({ - organizationId: ctx.organizationId, - connectorKey, - displayName, - slug: args.slug, - profileKind: 'oauth_account', - authData: {}, - provider: oauthMethod.provider.toLowerCase(), - status: 'pending_auth', - createdBy: ctx.userId ?? 'api', - }); + // The `findPendingAuthProfile` lookup above is the happy-path dedup; the + // try/catch here closes the race window between that SELECT and this + // INSERT (and surfaces an explicit slug collision with a friendly error + // instead of the raw PG constraint name). + let authProfile: Awaited>; + try { + authProfile = await createAuthProfile({ + organizationId: ctx.organizationId, + connectorKey, + displayName, + slug: args.slug, + profileKind: 'oauth_account', + authData: {}, + provider: oauthMethod.provider.toLowerCase(), + status: 'pending_auth', + createdBy: ctx.userId ?? 'api', + }); + } catch (err) { + if (err instanceof PendingAuthConflictError) { + return { error: err.message }; + } + throw err; + } const requestedScopes = resolveRequestedOAuthScopes(oauthMethod, args.requested_scopes); let connectToken: Awaited>; diff --git a/packages/server/src/utils/auth-profiles.ts b/packages/server/src/utils/auth-profiles.ts index a6e01f252..7f7b018d4 100644 --- a/packages/server/src/utils/auth-profiles.ts +++ b/packages/server/src/utils/auth-profiles.ts @@ -1,5 +1,27 @@ import { getDb } from '../db/client'; import { generateSlug } from './entity-management'; +import { ToolUserError } from './errors'; +import { isUniqueViolation } from './pg-errors'; + +/** + * Thrown when an INSERT into auth_profiles with status='pending_auth' collides + * with the partial unique index `auth_profiles_pending_oauth_account_unique` + * (one pending oauth_account profile per org+connector+provider+user). + * Carries the existing row so callers can choose to recover (return the + * existing profile) instead of surfacing the error. + */ +export class PendingAuthConflictError extends ToolUserError { + readonly existing: AuthProfileRow; + + constructor(existing: AuthProfileRow) { + super( + `An ${existing.profile_kind} auth profile for connector '${existing.connector_key ?? ''}'${existing.provider ? ` (${existing.provider})` : ''} is already pending authorization (slug: '${existing.slug}'). Finish authorizing it or delete it before creating a new one.`, + 409 + ); + this.name = 'PendingAuthConflictError'; + this.existing = existing; + } +} export type AuthProfileKind = | 'env' @@ -332,44 +354,100 @@ export async function createAuthProfile(params: { slug: normalizeAuthProfileSlug(params.slug, params.displayName), }); - const rows = await sql` - INSERT INTO auth_profiles ( - organization_id, - slug, - display_name, - connector_key, - profile_kind, - status, - auth_data, - account_id, - provider, - created_by, - device_worker_id, - browser_kind, - user_data_dir, - cdp_url - ) VALUES ( - ${params.organizationId}, - ${slug}, - ${params.displayName}, - ${params.connectorKey ?? null}, - ${params.profileKind}, - ${params.status ?? 'active'}, - ${sql.json(normalizeAuthData(params.profileKind, params.authData ?? {}))}, - ${params.accountId ?? null}, - ${params.provider ? params.provider.toLowerCase() : null}, - ${params.createdBy ?? null}, - ${params.deviceWorkerId ?? null}, - ${params.browserKind ?? null}, - ${params.userDataDir ?? null}, - ${params.cdpUrl ?? null} - ) - RETURNING ${sql.unsafe(AUTH_PROFILE_COLUMNS)} - `; + const normalizedProvider = params.provider ? params.provider.toLowerCase() : null; + + let rows: unknown[]; + try { + rows = await sql` + INSERT INTO auth_profiles ( + organization_id, + slug, + display_name, + connector_key, + profile_kind, + status, + auth_data, + account_id, + provider, + created_by, + device_worker_id, + browser_kind, + user_data_dir, + cdp_url + ) VALUES ( + ${params.organizationId}, + ${slug}, + ${params.displayName}, + ${params.connectorKey ?? null}, + ${params.profileKind}, + ${params.status ?? 'active'}, + ${sql.json(normalizeAuthData(params.profileKind, params.authData ?? {}))}, + ${params.accountId ?? null}, + ${normalizedProvider}, + ${params.createdBy ?? null}, + ${params.deviceWorkerId ?? null}, + ${params.browserKind ?? null}, + ${params.userDataDir ?? null}, + ${params.cdpUrl ?? null} + ) + RETURNING ${sql.unsafe(AUTH_PROFILE_COLUMNS)} + `; + } catch (err) { + // Partial unique index `auth_profiles_pending_oauth_account_unique` + // enforces one pending oauth_account row per (org, connector_key, + // provider, created_by). Translate a raw 23505 into a structured error + // carrying the existing row so callers can recover (idempotent reuse) + // or surface a clean message instead of leaking the constraint name. + if (isUniqueViolation(err, 'auth_profiles_pending_oauth_account_unique')) { + if ( + params.profileKind === 'oauth_account' && + params.connectorKey !== null && + normalizedProvider !== null + ) { + const existing = await findPendingAuthProfile({ + organizationId: params.organizationId, + connectorKey: params.connectorKey, + profileKind: 'oauth_account', + provider: normalizedProvider, + createdBy: params.createdBy ?? null, + }); + if (existing) throw new PendingAuthConflictError(existing); + } + } + throw err; + } return rows[0] as AuthProfileRow; } +/** + * Find the existing pending_auth profile that the partial unique index would + * collide with. For `oauth_account`, that's keyed per-user; pass `createdBy` + * so the lookup matches the index. Returns null when no pending row exists. + */ +export async function findPendingAuthProfile(params: { + organizationId: string; + connectorKey: string; + profileKind: AuthProfileKind; + provider: string; + createdBy?: string | null; +}): Promise { + const sql = getDb(); + const restrictByUser = params.profileKind === 'oauth_account'; + const rows = await sql` + SELECT ${sql.unsafe(AUTH_PROFILE_COLUMNS)} + FROM auth_profiles + WHERE organization_id = ${params.organizationId} + AND connector_key = ${params.connectorKey} + AND profile_kind = ${params.profileKind} + AND provider = ${params.provider.toLowerCase()} + AND status = 'pending_auth' + ${restrictByUser ? sql`AND created_by IS NOT DISTINCT FROM ${params.createdBy ?? null}` : sql``} + LIMIT 1 + `; + return rows.length > 0 ? (rows[0] as AuthProfileRow) : null; +} + export async function updateAuthProfile(params: { organizationId: string; slug: string;