From c988f2a399c5f8f452a44458cd46c102c02695ec Mon Sep 17 00:00:00 2001 From: Wilson Rivera Date: Fri, 22 Aug 2025 13:20:01 -0400 Subject: [PATCH 01/10] fix: possible race condition where multiple organizations could be created --- controlplane/src/core/controllers/auth.ts | 19 ++-- controlplane/src/core/util.ts | 24 +++++ controlplane/test/test-util.ts | 14 +++ controlplane/test/utils.test.ts | 104 +++++++++++++++++++++- 4 files changed, 153 insertions(+), 8 deletions(-) diff --git a/controlplane/src/core/controllers/auth.ts b/controlplane/src/core/controllers/auth.ts index c49688d4fe..975d300960 100644 --- a/controlplane/src/core/controllers/auth.ts +++ b/controlplane/src/core/controllers/auth.ts @@ -18,6 +18,7 @@ import { AuthenticationError } from '../errors/errors.js'; import { OrganizationInvitationRepository } from '../repositories/OrganizationInvitationRepository.js'; import { DefaultNamespace, NamespaceRepository } from '../repositories/NamespaceRepository.js'; import { OrganizationGroupRepository } from '../repositories/OrganizationGroupRepository.js'; +import { runLocking } from '../util.js'; export type AuthControllerOptions = { db: PostgresJsDatabase; @@ -255,11 +256,15 @@ const plugin: FastifyPluginCallback = function Auth(fasti return insertedSessions[0]; }); - const orgs = await opts.organizationRepository.memberships({ - userId, - }); + const userOrganizations = await runLocking(userId, async () => { + const orgs = await opts.organizationRepository.memberships({ + userId, + }); + + if (orgs.length > 0) { + return orgs; + } - if (orgs.length === 0) { await opts.keycloakClient.authenticateClient(); const organizationSlug = uid(8); @@ -319,7 +324,9 @@ const plugin: FastifyPluginCallback = function Auth(fasti user_first_name: firstName, user_last_name: lastName, }); - } + + return []; + }); // Create a JWT token containing the session id and user id. const jwt = await encrypt({ @@ -343,7 +350,7 @@ const plugin: FastifyPluginCallback = function Auth(fasti } else { res.redirect(opts.webBaseUrl); } - } else if (orgs.length === 0) { + } else if (userOrganizations.length === 0) { res.redirect(opts.webBaseUrl + '?migrate=true'); } else { res.redirect(opts.webBaseUrl); diff --git a/controlplane/src/core/util.ts b/controlplane/src/core/util.ts index 9b6c17a3fd..31862c35b5 100644 --- a/controlplane/src/core/util.ts +++ b/controlplane/src/core/util.ts @@ -42,6 +42,8 @@ const schemaTagRegex = /^(?![/-])[\d/A-Za-z-]+(?>(); + /** * Wraps a function with a try/catch block and logs any errors that occur. * If the error is a public error, it is returned as a response message. @@ -645,3 +647,25 @@ export function newCompositionOptions(disableResolvabilityValidation?: boolean): disableResolvabilityValidation, }; } + +export function runLocking(key: string, action: () => Promise): Promise { + const existingPromise = lockedPromises.get(key); + if (existingPromise) { + return existingPromise as Promise; + } + + const promise = (async () => await action())(); + lockedPromises.set(key, promise); + + promise.finally(() => { + const other = lockedPromises.get(key); + if (other === promise) { + // Remove only if we're still pointing at the same promise (avoid race). + lockedPromises.delete(key); + } + + console.log(lockedPromises); + }); + + return promise; +} diff --git a/controlplane/test/test-util.ts b/controlplane/test/test-util.ts index 66144353a6..e7e2484dcf 100644 --- a/controlplane/test/test-util.ts +++ b/controlplane/test/test-util.ts @@ -975,3 +975,17 @@ export const resolvabilitySDLTwo = ` name: String! } `; + +export function deferred() { + let resolve!: (v: T | PromiseLike) => void; + let reject!: (e?: unknown) => void; + + const promise = new Promise((_resolve, _reject) => { resolve = _resolve; reject = _reject; }); + return { promise, resolve, reject }; +} + +export async function expectPending(p: Promise) { + const timeout = new Promise((resolve) => setTimeout(() => resolve("timeout"), 20)); + const race = await Promise.race([p.then(() => "resolved" as const), timeout]); + expect(race).toBe("timeout"); +} diff --git a/controlplane/test/utils.test.ts b/controlplane/test/utils.test.ts index aa6f5a6a19..840fe5e312 100644 --- a/controlplane/test/utils.test.ts +++ b/controlplane/test/utils.test.ts @@ -1,5 +1,12 @@ -import { describe, expect, test } from 'vitest'; -import { isValidLabelMatchers, mergeUrls, normalizeLabelMatchers, isGoogleCloudStorageUrl } from '../src/core/util.js'; +import { afterEach, beforeEach, describe, expect, test, vi } from 'vitest'; +import { + isValidLabelMatchers, + mergeUrls, + normalizeLabelMatchers, + isGoogleCloudStorageUrl, + runLocking +} from '../src/core/util.js'; +import { deferred, expectPending } from "./test-util.js"; describe('Utils', () => { test('isValidLabelMatchers', () => { @@ -49,4 +56,97 @@ describe('Utils', () => { expect(isGoogleCloudStorageUrl('https://storage.googleapis.com.evil.com')).toBe(false); }); }); + + describe('runLocking', () => { + beforeEach(() => vi.useFakeTimers()); + afterEach(() => vi.useRealTimers()); + + test('that single call returns without locking', async () => { + const result = await runLocking('test', () => Promise.resolve('hello world')); + expect(result).toBe('hello world'); + }); + + test('coalesces concurrent calls for the SAME key', async () => { + const gate = deferred(); + + const worker = vi.fn(async () => { + await gate.promise; // hold until we release the gate + return "OK"; + }); + + // Start two concurrent calls with the same key + const p1 = runLocking("user:42", worker); + const p2 = runLocking("user:42", worker); + + // Both callers must share the exact same Promise + expect(p1).toBe(p2); + + // Underlying work should have started only once + expect(worker).toHaveBeenCalledTimes(1); + + // Let it finish and verify both callers get the same result + gate.resolve(); + + const [r1, r2] = await Promise.all([p1, p2]); + expect(r1).toBe("OK"); + expect(r2).toBe("OK"); + }); + + test('does NOT lock across DIFFERENT keys (independent execution)', async () => { + const gateA = deferred(); + const gateB = deferred(); + + const worker = vi.fn(async (val: string, gate: ReturnType>) => { + await gate.promise; + return val; + }); + + const pA = runLocking("user:A", () => worker("A", gateA)); + const pB = runLocking("user:B", () => worker("B", gateB)); + + // Different keys should produce different Promises and start separate work + expect(pA).not.toBe(pB); + expect(worker).toHaveBeenCalledTimes(2); + + // Resolve B first; A should still be pending + gateB.resolve(); + + const rB = await pB; + expect(rB).toBe("B"); + + const p = expectPending(pA); // A hasn't been released yet + await vi.advanceTimersByTimeAsync(100); + + await p; + + // Now resolve A + gateA.resolve(); + const rA = await pA; + expect(rA).toBe("A"); + }) + }); + + test('starts fresh after a call completes (entry cleanup)', async () => { + // One worker whose gate we can swap between runs + let gate = deferred(); + const worker = vi.fn(async () => { + await gate.promise; + return "done"; + }); + + // First run + const p1 = runLocking("user:99", worker); + gate.resolve(); + await p1; + + // Swap to a new gate and call again — should start a NEW underlying run + gate = deferred(); + + const p2 = runLocking("user:99", worker); + expect(p2).not.toBe(p1); // new promise after cleanup + expect(worker).toHaveBeenCalledTimes(2); + + gate.resolve(); + await p2; + }) }); From 8db60b688167c04f5a508f094a0759d69acdff5d Mon Sep 17 00:00:00 2001 From: Wilson Rivera Date: Fri, 22 Aug 2025 13:29:43 -0400 Subject: [PATCH 02/10] chore: remove `console.log` --- controlplane/src/core/util.ts | 2 -- 1 file changed, 2 deletions(-) diff --git a/controlplane/src/core/util.ts b/controlplane/src/core/util.ts index 31862c35b5..3a97686332 100644 --- a/controlplane/src/core/util.ts +++ b/controlplane/src/core/util.ts @@ -663,8 +663,6 @@ export function runLocking(key: string, action: () => Promise) // Remove only if we're still pointing at the same promise (avoid race). lockedPromises.delete(key); } - - console.log(lockedPromises); }); return promise; From 52a85e905f0337e0858802a9b0613f7514a42f6a Mon Sep 17 00:00:00 2001 From: Wilson Rivera Date: Fri, 22 Aug 2025 16:47:44 -0400 Subject: [PATCH 03/10] chore: create organization in two steps without lock --- controlplane/src/core/controllers/auth.ts | 49 +++++----- .../repositories/OrganizationRepository.ts | 3 +- controlplane/src/core/util.ts | 22 ----- controlplane/test/test-util.ts | 14 --- controlplane/test/utils.test.ts | 97 +------------------ 5 files changed, 30 insertions(+), 155 deletions(-) diff --git a/controlplane/src/core/controllers/auth.ts b/controlplane/src/core/controllers/auth.ts index 975d300960..85dca97291 100644 --- a/controlplane/src/core/controllers/auth.ts +++ b/controlplane/src/core/controllers/auth.ts @@ -256,19 +256,33 @@ const plugin: FastifyPluginCallback = function Auth(fasti return insertedSessions[0]; }); - const userOrganizations = await runLocking(userId, async () => { - const orgs = await opts.organizationRepository.memberships({ - userId, - }); + const orgs = await opts.organizationRepository.memberships({ userId }); + if (orgs.length === 0) { + const organizationSlug = uid(8); - if (orgs.length > 0) { - return orgs; - } + // First, we need to create the organization and add the user as an organization member + const [insertedOrg, orgMember] = await opts.db.transaction(async (tx) => { + const orgRepo = new OrganizationRepository(req.log, opts.db, opts.defaultBillingPlanId); - await opts.keycloakClient.authenticateClient(); + // Create the organization... + const inserted = await orgRepo.createOrganization({ + organizationName: userEmail.split('@')[0], + organizationSlug, + ownerID: userId, + }); - const organizationSlug = uid(8); + // ...and add the user as an organization member. + const orgMember = await orgRepo.addOrganizationMember({ + organizationID: inserted.id, + userID: userId, + }); + return [inserted, orgMember]; + }); + + // Finalize the organization setup by seeding the Keycloak group structure + await opts.keycloakClient.authenticateClient(); + await new Promise((resolve) => setTimeout(resolve, 10_000)); const [kcRootGroupId, kcCreatedGroups] = await opts.keycloakClient.seedGroup({ userID: userId, organizationSlug, @@ -279,18 +293,11 @@ const plugin: FastifyPluginCallback = function Auth(fasti const orgRepo = new OrganizationRepository(req.log, tx, opts.defaultBillingPlanId); const orgGroupRepo = new OrganizationGroupRepository(tx); - const insertedOrg = await orgRepo.createOrganization({ - organizationName: userEmail.split('@')[0], - organizationSlug, - ownerID: userId, + await orgRepo.updateOrganization({ + id: insertedSession.id, kcGroupId: kcRootGroupId, }); - const orgMember = await orgRepo.addOrganizationMember({ - organizationID: insertedOrg.id, - userID: userId, - }); - await orgGroupRepo.importKeycloakGroups({ organizationId: insertedOrg.id, kcGroups: kcCreatedGroups, @@ -324,9 +331,7 @@ const plugin: FastifyPluginCallback = function Auth(fasti user_first_name: firstName, user_last_name: lastName, }); - - return []; - }); + } // Create a JWT token containing the session id and user id. const jwt = await encrypt({ @@ -350,7 +355,7 @@ const plugin: FastifyPluginCallback = function Auth(fasti } else { res.redirect(opts.webBaseUrl); } - } else if (userOrganizations.length === 0) { + } else if (orgs.length === 0) { res.redirect(opts.webBaseUrl + '?migrate=true'); } else { res.redirect(opts.webBaseUrl); diff --git a/controlplane/src/core/repositories/OrganizationRepository.ts b/controlplane/src/core/repositories/OrganizationRepository.ts index 014ce4d045..0327f194c3 100644 --- a/controlplane/src/core/repositories/OrganizationRepository.ts +++ b/controlplane/src/core/repositories/OrganizationRepository.ts @@ -95,12 +95,13 @@ export class OrganizationRepository { return org; } - public async updateOrganization(input: { id: string; slug?: string; name?: string }) { + public async updateOrganization(input: { id: string; slug?: string; name?: string; kcGroupId?: string }) { await this.db .update(organizations) .set({ name: input.name, slug: input.slug, + kcGroupId: input.kcGroupId, }) .where(eq(organizations.id, input.id)) .execute(); diff --git a/controlplane/src/core/util.ts b/controlplane/src/core/util.ts index 3a97686332..9b6c17a3fd 100644 --- a/controlplane/src/core/util.ts +++ b/controlplane/src/core/util.ts @@ -42,8 +42,6 @@ const schemaTagRegex = /^(?![/-])[\d/A-Za-z-]+(?>(); - /** * Wraps a function with a try/catch block and logs any errors that occur. * If the error is a public error, it is returned as a response message. @@ -647,23 +645,3 @@ export function newCompositionOptions(disableResolvabilityValidation?: boolean): disableResolvabilityValidation, }; } - -export function runLocking(key: string, action: () => Promise): Promise { - const existingPromise = lockedPromises.get(key); - if (existingPromise) { - return existingPromise as Promise; - } - - const promise = (async () => await action())(); - lockedPromises.set(key, promise); - - promise.finally(() => { - const other = lockedPromises.get(key); - if (other === promise) { - // Remove only if we're still pointing at the same promise (avoid race). - lockedPromises.delete(key); - } - }); - - return promise; -} diff --git a/controlplane/test/test-util.ts b/controlplane/test/test-util.ts index e7e2484dcf..66144353a6 100644 --- a/controlplane/test/test-util.ts +++ b/controlplane/test/test-util.ts @@ -975,17 +975,3 @@ export const resolvabilitySDLTwo = ` name: String! } `; - -export function deferred() { - let resolve!: (v: T | PromiseLike) => void; - let reject!: (e?: unknown) => void; - - const promise = new Promise((_resolve, _reject) => { resolve = _resolve; reject = _reject; }); - return { promise, resolve, reject }; -} - -export async function expectPending(p: Promise) { - const timeout = new Promise((resolve) => setTimeout(() => resolve("timeout"), 20)); - const race = await Promise.race([p.then(() => "resolved" as const), timeout]); - expect(race).toBe("timeout"); -} diff --git a/controlplane/test/utils.test.ts b/controlplane/test/utils.test.ts index 840fe5e312..a9cb2cb584 100644 --- a/controlplane/test/utils.test.ts +++ b/controlplane/test/utils.test.ts @@ -1,12 +1,10 @@ -import { afterEach, beforeEach, describe, expect, test, vi } from 'vitest'; +import { describe, expect, test } from 'vitest'; import { isValidLabelMatchers, mergeUrls, normalizeLabelMatchers, isGoogleCloudStorageUrl, - runLocking } from '../src/core/util.js'; -import { deferred, expectPending } from "./test-util.js"; describe('Utils', () => { test('isValidLabelMatchers', () => { @@ -56,97 +54,4 @@ describe('Utils', () => { expect(isGoogleCloudStorageUrl('https://storage.googleapis.com.evil.com')).toBe(false); }); }); - - describe('runLocking', () => { - beforeEach(() => vi.useFakeTimers()); - afterEach(() => vi.useRealTimers()); - - test('that single call returns without locking', async () => { - const result = await runLocking('test', () => Promise.resolve('hello world')); - expect(result).toBe('hello world'); - }); - - test('coalesces concurrent calls for the SAME key', async () => { - const gate = deferred(); - - const worker = vi.fn(async () => { - await gate.promise; // hold until we release the gate - return "OK"; - }); - - // Start two concurrent calls with the same key - const p1 = runLocking("user:42", worker); - const p2 = runLocking("user:42", worker); - - // Both callers must share the exact same Promise - expect(p1).toBe(p2); - - // Underlying work should have started only once - expect(worker).toHaveBeenCalledTimes(1); - - // Let it finish and verify both callers get the same result - gate.resolve(); - - const [r1, r2] = await Promise.all([p1, p2]); - expect(r1).toBe("OK"); - expect(r2).toBe("OK"); - }); - - test('does NOT lock across DIFFERENT keys (independent execution)', async () => { - const gateA = deferred(); - const gateB = deferred(); - - const worker = vi.fn(async (val: string, gate: ReturnType>) => { - await gate.promise; - return val; - }); - - const pA = runLocking("user:A", () => worker("A", gateA)); - const pB = runLocking("user:B", () => worker("B", gateB)); - - // Different keys should produce different Promises and start separate work - expect(pA).not.toBe(pB); - expect(worker).toHaveBeenCalledTimes(2); - - // Resolve B first; A should still be pending - gateB.resolve(); - - const rB = await pB; - expect(rB).toBe("B"); - - const p = expectPending(pA); // A hasn't been released yet - await vi.advanceTimersByTimeAsync(100); - - await p; - - // Now resolve A - gateA.resolve(); - const rA = await pA; - expect(rA).toBe("A"); - }) - }); - - test('starts fresh after a call completes (entry cleanup)', async () => { - // One worker whose gate we can swap between runs - let gate = deferred(); - const worker = vi.fn(async () => { - await gate.promise; - return "done"; - }); - - // First run - const p1 = runLocking("user:99", worker); - gate.resolve(); - await p1; - - // Swap to a new gate and call again — should start a NEW underlying run - gate = deferred(); - - const p2 = runLocking("user:99", worker); - expect(p2).not.toBe(p1); // new promise after cleanup - expect(worker).toHaveBeenCalledTimes(2); - - gate.resolve(); - await p2; - }) }); From 4f414516a55183d3c43b6384f189db392cfa6745 Mon Sep 17 00:00:00 2001 From: Wilson Rivera Date: Fri, 22 Aug 2025 16:50:33 -0400 Subject: [PATCH 04/10] chore: fix incorrect variable --- controlplane/src/core/controllers/auth.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/controlplane/src/core/controllers/auth.ts b/controlplane/src/core/controllers/auth.ts index 85dca97291..5f8a26137e 100644 --- a/controlplane/src/core/controllers/auth.ts +++ b/controlplane/src/core/controllers/auth.ts @@ -294,7 +294,7 @@ const plugin: FastifyPluginCallback = function Auth(fasti const orgGroupRepo = new OrganizationGroupRepository(tx); await orgRepo.updateOrganization({ - id: insertedSession.id, + id: insertedOrg.id, kcGroupId: kcRootGroupId, }); From cc38fe367e1a7f20941a19421b89ae19783a832c Mon Sep 17 00:00:00 2001 From: Wilson Rivera Date: Fri, 22 Aug 2025 17:02:12 -0400 Subject: [PATCH 05/10] chore: remove fake delay --- controlplane/src/core/controllers/auth.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/controlplane/src/core/controllers/auth.ts b/controlplane/src/core/controllers/auth.ts index 5f8a26137e..e289cf7309 100644 --- a/controlplane/src/core/controllers/auth.ts +++ b/controlplane/src/core/controllers/auth.ts @@ -282,7 +282,6 @@ const plugin: FastifyPluginCallback = function Auth(fasti // Finalize the organization setup by seeding the Keycloak group structure await opts.keycloakClient.authenticateClient(); - await new Promise((resolve) => setTimeout(resolve, 10_000)); const [kcRootGroupId, kcCreatedGroups] = await opts.keycloakClient.seedGroup({ userID: userId, organizationSlug, From c393048276a3a2aef39c8c837cf9771a84579ece Mon Sep 17 00:00:00 2001 From: Wilson Rivera Date: Fri, 22 Aug 2025 17:09:01 -0400 Subject: [PATCH 06/10] chore: use transaction for org creation --- controlplane/src/core/controllers/auth.ts | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/controlplane/src/core/controllers/auth.ts b/controlplane/src/core/controllers/auth.ts index e289cf7309..e76d14284a 100644 --- a/controlplane/src/core/controllers/auth.ts +++ b/controlplane/src/core/controllers/auth.ts @@ -18,7 +18,6 @@ import { AuthenticationError } from '../errors/errors.js'; import { OrganizationInvitationRepository } from '../repositories/OrganizationInvitationRepository.js'; import { DefaultNamespace, NamespaceRepository } from '../repositories/NamespaceRepository.js'; import { OrganizationGroupRepository } from '../repositories/OrganizationGroupRepository.js'; -import { runLocking } from '../util.js'; export type AuthControllerOptions = { db: PostgresJsDatabase; @@ -262,7 +261,7 @@ const plugin: FastifyPluginCallback = function Auth(fasti // First, we need to create the organization and add the user as an organization member const [insertedOrg, orgMember] = await opts.db.transaction(async (tx) => { - const orgRepo = new OrganizationRepository(req.log, opts.db, opts.defaultBillingPlanId); + const orgRepo = new OrganizationRepository(req.log, tx, opts.defaultBillingPlanId); // Create the organization... const inserted = await orgRepo.createOrganization({ From af7ebe198ed2b55f422e4f00ea85affa1c6f75c2 Mon Sep 17 00:00:00 2001 From: Wilson Rivera Date: Mon, 25 Aug 2025 16:17:56 -0400 Subject: [PATCH 07/10] chore: use transaction lock --- controlplane/src/core/controllers/auth.ts | 107 +++++++++++----------- 1 file changed, 56 insertions(+), 51 deletions(-) diff --git a/controlplane/src/core/controllers/auth.ts b/controlplane/src/core/controllers/auth.ts index e76d14284a..0c8d909ec5 100644 --- a/controlplane/src/core/controllers/auth.ts +++ b/controlplane/src/core/controllers/auth.ts @@ -1,7 +1,7 @@ import { FastifyPluginCallback } from 'fastify'; import fp from 'fastify-plugin'; import { PostgresJsDatabase } from 'drizzle-orm/postgres-js'; -import { eq } from 'drizzle-orm'; +import { eq, sql } from 'drizzle-orm'; import { lru } from 'tiny-lru'; import { uid } from 'uid'; import { PlatformEventName } from '@wundergraph/cosmo-connect/dist/notifications/events_pb'; @@ -255,74 +255,79 @@ const plugin: FastifyPluginCallback = function Auth(fasti return insertedSessions[0]; }); - const orgs = await opts.organizationRepository.memberships({ userId }); - if (orgs.length === 0) { - const organizationSlug = uid(8); + const orgs = await opts.db.transaction(async (tx) => { + await tx.execute(sql`select pg_advisory_xact_lock(hashtext(${userId}))`); - // First, we need to create the organization and add the user as an organization member - const [insertedOrg, orgMember] = await opts.db.transaction(async (tx) => { - const orgRepo = new OrganizationRepository(req.log, tx, opts.defaultBillingPlanId); - - // Create the organization... - const inserted = await orgRepo.createOrganization({ - organizationName: userEmail.split('@')[0], - organizationSlug, - ownerID: userId, - }); + const orgRepo = new OrganizationRepository(req.log, tx, opts.defaultBillingPlanId); - // ...and add the user as an organization member. - const orgMember = await orgRepo.addOrganizationMember({ - organizationID: inserted.id, - userID: userId, - }); - - return [inserted, orgMember]; - }); + // Retrieve all the organizations the user is a member of + const memberships = await orgRepo.memberships({ userId }); + if (memberships.length > 0) { + // The user is already part of at least one organization + return memberships; + } - // Finalize the organization setup by seeding the Keycloak group structure + // Authenticate on Keycloak and create the organization group await opts.keycloakClient.authenticateClient(); + + const organizationSlug = uid(8); const [kcRootGroupId, kcCreatedGroups] = await opts.keycloakClient.seedGroup({ userID: userId, organizationSlug, realm: opts.keycloakRealm, }); - await opts.db.transaction(async (tx) => { - const orgRepo = new OrganizationRepository(req.log, tx, opts.defaultBillingPlanId); - const orgGroupRepo = new OrganizationGroupRepository(tx); + // Create the new organization and add the user as a member of the organization + const insertedOrg = await orgRepo.createOrganization({ + organizationName: userEmail.split('@')[0], + organizationSlug, + ownerID: userId, + kcGroupId: kcRootGroupId, + }); - await orgRepo.updateOrganization({ - id: insertedOrg.id, - kcGroupId: kcRootGroupId, - }); + const orgMember = await orgRepo.addOrganizationMember({ + organizationID: insertedOrg.id, + userID: userId, + }); - await orgGroupRepo.importKeycloakGroups({ - organizationId: insertedOrg.id, - kcGroups: kcCreatedGroups, - }); + // Create the organization groups + const orgGroupRepo = new OrganizationGroupRepository(tx); - const orgAdminGroup = await orgGroupRepo.byName({ - organizationId: insertedOrg.id, - name: 'admin', - }); + await orgGroupRepo.importKeycloakGroups({ + organizationId: insertedOrg.id, + kcGroups: kcCreatedGroups, + }); - if (orgAdminGroup) { - await orgGroupRepo.addUserToGroup({ - organizationMemberId: orgMember.id, - groupId: orgAdminGroup.groupId, - }); - } + const orgAdminGroup = await orgGroupRepo.byName({ + organizationId: insertedOrg.id, + name: 'admin', + }); - const namespaceRepo = new NamespaceRepository(tx, insertedOrg.id); - const ns = await namespaceRepo.create({ - name: DefaultNamespace, - createdBy: userId, + if (orgAdminGroup) { + await orgGroupRepo.addUserToGroup({ + organizationMemberId: orgMember.id, + groupId: orgAdminGroup.groupId, }); - if (!ns) { - throw new Error(`Could not create ${DefaultNamespace} namespace`); - } + } + + // Create the default namespace for the organization + const namespaceRepo = new NamespaceRepository(tx, insertedOrg.id); + const ns = await namespaceRepo.create({ + name: DefaultNamespace, + createdBy: userId, }); + if (!ns) { + throw new Error(`Could not create ${DefaultNamespace} namespace`); + } + + // We return an empty even when we just created the organization, that way we can still send the + // user registered webhook and prompt the user to migrate + return []; + }); + + if (orgs.length === 0) { + // Send a notification to the platform that a new user has been created opts.platformWebhooks.send(PlatformEventName.USER_REGISTER_SUCCESS, { user_id: userId, user_email: userEmail, From 169bc299b62cd23195d1814eee3e7acc32af662f Mon Sep 17 00:00:00 2001 From: Wilson Rivera Date: Mon, 25 Aug 2025 16:46:56 -0400 Subject: [PATCH 08/10] chore: introduce lighter method to check if user is already member of one organization --- controlplane/src/core/controllers/auth.ts | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/controlplane/src/core/controllers/auth.ts b/controlplane/src/core/controllers/auth.ts index 0c8d909ec5..9a70ee6ee6 100644 --- a/controlplane/src/core/controllers/auth.ts +++ b/controlplane/src/core/controllers/auth.ts @@ -260,11 +260,16 @@ const plugin: FastifyPluginCallback = function Auth(fasti const orgRepo = new OrganizationRepository(req.log, tx, opts.defaultBillingPlanId); - // Retrieve all the organizations the user is a member of - const memberships = await orgRepo.memberships({ userId }); - if (memberships.length > 0) { - // The user is already part of at least one organization - return memberships; + // Check if the user is already a member of at least one organization + const existingMemberships = await tx + .select({ one: sql`1`.as('one') }) + .from(organizationsMembers) + .where(eq(organizationsMembers.userId, userId)) + .limit(1) + .execute(); + + if (existingMemberships.length > 0) { + return existingMemberships.length; } // Authenticate on Keycloak and create the organization group @@ -323,10 +328,10 @@ const plugin: FastifyPluginCallback = function Auth(fasti // We return an empty even when we just created the organization, that way we can still send the // user registered webhook and prompt the user to migrate - return []; + return 0; }); - if (orgs.length === 0) { + if (orgs === 0) { // Send a notification to the platform that a new user has been created opts.platformWebhooks.send(PlatformEventName.USER_REGISTER_SUCCESS, { user_id: userId, @@ -358,7 +363,7 @@ const plugin: FastifyPluginCallback = function Auth(fasti } else { res.redirect(opts.webBaseUrl); } - } else if (orgs.length === 0) { + } else if (orgs === 0) { res.redirect(opts.webBaseUrl + '?migrate=true'); } else { res.redirect(opts.webBaseUrl); From d0561df45e58097eb6c2ff0f3d8fff3248ea9416 Mon Sep 17 00:00:00 2001 From: Wilson Rivera Date: Tue, 30 Sep 2025 10:07:19 -0400 Subject: [PATCH 09/10] chore: fast exit when the lock already exists --- controlplane/src/core/controllers/auth.ts | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/controlplane/src/core/controllers/auth.ts b/controlplane/src/core/controllers/auth.ts index 9a70ee6ee6..9690390d1f 100644 --- a/controlplane/src/core/controllers/auth.ts +++ b/controlplane/src/core/controllers/auth.ts @@ -256,7 +256,14 @@ const plugin: FastifyPluginCallback = function Auth(fasti }); const orgs = await opts.db.transaction(async (tx) => { - await tx.execute(sql`select pg_advisory_xact_lock(hashtext(${userId}))`); + const advisoryLockRows = await tx.execute( + sql`select pg_try_advisory_xact_lock(hashtext(${userId})) as acquired` + ); + + if (!advisoryLockRows?.[0]?.acquired) { + // We need to identify when we failed to acquire the lock because another request already acquired it + return -1; + } const orgRepo = new OrganizationRepository(req.log, tx, opts.defaultBillingPlanId); @@ -331,6 +338,12 @@ const plugin: FastifyPluginCallback = function Auth(fasti return 0; }); + if (orgs === -1) { + // We failed to acquire the lock, so we need to retry the request + await res.code(429).send('Slow down'); + return; + } + if (orgs === 0) { // Send a notification to the platform that a new user has been created opts.platformWebhooks.send(PlatformEventName.USER_REGISTER_SUCCESS, { From e5971f7493468fb580bf51d42f5558b1401daac4 Mon Sep 17 00:00:00 2001 From: Wilson Rivera Date: Tue, 30 Sep 2025 10:10:45 -0400 Subject: [PATCH 10/10] chore: liting --- controlplane/src/core/controllers/auth.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/controlplane/src/core/controllers/auth.ts b/controlplane/src/core/controllers/auth.ts index 9690390d1f..448e482410 100644 --- a/controlplane/src/core/controllers/auth.ts +++ b/controlplane/src/core/controllers/auth.ts @@ -257,7 +257,7 @@ const plugin: FastifyPluginCallback = function Auth(fasti const orgs = await opts.db.transaction(async (tx) => { const advisoryLockRows = await tx.execute( - sql`select pg_try_advisory_xact_lock(hashtext(${userId})) as acquired` + sql`select pg_try_advisory_xact_lock(hashtext(${userId})) as acquired`, ); if (!advisoryLockRows?.[0]?.acquired) {