Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
c988f2a
fix: possible race condition where multiple organizations could be cr…
wilsonrivera Aug 22, 2025
8db60b6
chore: remove `console.log`
wilsonrivera Aug 22, 2025
52a85e9
chore: create organization in two steps without lock
wilsonrivera Aug 22, 2025
4f41451
chore: fix incorrect variable
wilsonrivera Aug 22, 2025
cc38fe3
chore: remove fake delay
wilsonrivera Aug 22, 2025
c393048
chore: use transaction for org creation
wilsonrivera Aug 22, 2025
24a33fd
Merge branch 'main' into wilson/eng-7670-organization-created-twice
wilsonrivera Aug 25, 2025
af7ebe1
chore: use transaction lock
wilsonrivera Aug 25, 2025
169bc29
chore: introduce lighter method to check if user is already member of…
wilsonrivera Aug 25, 2025
1e29919
Merge branch 'main' into wilson/eng-7670-organization-created-twice
wilsonrivera Aug 25, 2025
a113f8b
Merge branch 'main' into wilson/eng-7670-organization-created-twice
wilsonrivera Sep 3, 2025
ce68d56
Merge branch 'main' into wilson/eng-7670-organization-created-twice
wilsonrivera Sep 3, 2025
d33dc79
Merge branch 'main' into wilson/eng-7670-organization-created-twice
wilsonrivera Sep 10, 2025
120dc60
Merge branch 'main' into wilson/eng-7670-organization-created-twice
wilsonrivera Sep 30, 2025
d0561df
chore: fast exit when the lock already exists
wilsonrivera Sep 30, 2025
e5971f7
chore: liting
wilsonrivera Sep 30, 2025
6be46ba
Merge branch 'main' into wilson/eng-7670-organization-created-twice
wilsonrivera Oct 1, 2025
e07d6e1
Merge branch 'main' into wilson/eng-7670-organization-created-twice
wilsonrivera Oct 1, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 13 additions & 6 deletions controlplane/src/core/controllers/auth.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import { AuthenticationError } from '../errors/errors.js';
import { OrganizationInvitationRepository } from '../repositories/OrganizationInvitationRepository.js';
import { DefaultNamespace, NamespaceRepository } from '../repositories/NamespaceRepository.js';
import { OrganizationGroupRepository } from '../repositories/OrganizationGroupRepository.js';
import { runLocking } from '../util.js';
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated

export type AuthControllerOptions = {
db: PostgresJsDatabase<typeof schema>;
Expand Down Expand Up @@ -255,11 +256,15 @@ const plugin: FastifyPluginCallback<AuthControllerOptions> = function Auth(fasti
return insertedSessions[0];
});

const orgs = await opts.organizationRepository.memberships({
userId,
});
const userOrganizations = await runLocking(userId, async () => {
const orgs = await opts.organizationRepository.memberships({
userId,
});

if (orgs.length > 0) {
return orgs;
}

if (orgs.length === 0) {
await opts.keycloakClient.authenticateClient();

const organizationSlug = uid(8);
Expand Down Expand Up @@ -319,7 +324,9 @@ const plugin: FastifyPluginCallback<AuthControllerOptions> = function Auth(fasti
user_first_name: firstName,
user_last_name: lastName,
});
}

return [];
});

// Create a JWT token containing the session id and user id.
const jwt = await encrypt<UserSession>({
Expand All @@ -343,7 +350,7 @@ const plugin: FastifyPluginCallback<AuthControllerOptions> = function Auth(fasti
} else {
res.redirect(opts.webBaseUrl);
}
} else if (orgs.length === 0) {
} else if (userOrganizations.length === 0) {
res.redirect(opts.webBaseUrl + '?migrate=true');
} else {
res.redirect(opts.webBaseUrl);
Expand Down
22 changes: 22 additions & 0 deletions controlplane/src/core/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ const schemaTagRegex = /^(?![/-])[\d/A-Za-z-]+(?<![/-])$/;
const graphNameRegex = /^[\dA-Za-z]+(?:[./@_-][\dA-Za-z]+)*$/;
const pluginVersionRegex = /^v\d+$/;

const lockedPromises = new Map<string, Promise<unknown>>();

/**
* Wraps a function with a try/catch block and logs any errors that occur.
* If the error is a public error, it is returned as a response message.
Expand Down Expand Up @@ -645,3 +647,23 @@ export function newCompositionOptions(disableResolvabilityValidation?: boolean):
disableResolvabilityValidation,
};
}

export function runLocking<TResult>(key: string, action: () => Promise<TResult>): Promise<TResult> {
const existingPromise = lockedPromises.get(key);
if (existingPromise) {
return existingPromise as Promise<TResult>;
}

const promise = (async () => await action())();
lockedPromises.set(key, promise);

promise.finally(() => {
const other = lockedPromises.get(key);
if (other === promise) {
// Remove only if we're still pointing at the same promise (avoid race).
lockedPromises.delete(key);
}
});

return promise;
}
14 changes: 14 additions & 0 deletions controlplane/test/test-util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -975,3 +975,17 @@ export const resolvabilitySDLTwo = `
name: String!
}
`;

export function deferred<T = void>() {
let resolve!: (v: T | PromiseLike<T>) => void;
let reject!: (e?: unknown) => void;

const promise = new Promise<T>((_resolve, _reject) => { resolve = _resolve; reject = _reject; });
return { promise, resolve, reject };
}

export async function expectPending(p: Promise<unknown>) {
const timeout = new Promise((resolve) => setTimeout(() => resolve("timeout"), 20));
const race = await Promise.race([p.then(() => "resolved" as const), timeout]);
expect(race).toBe("timeout");
}
104 changes: 102 additions & 2 deletions controlplane/test/utils.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
import { describe, expect, test } from 'vitest';
import { isValidLabelMatchers, mergeUrls, normalizeLabelMatchers, isGoogleCloudStorageUrl } from '../src/core/util.js';
import { afterEach, beforeEach, describe, expect, test, vi } from 'vitest';
import {
isValidLabelMatchers,
mergeUrls,
normalizeLabelMatchers,
isGoogleCloudStorageUrl,
runLocking
} from '../src/core/util.js';
import { deferred, expectPending } from "./test-util.js";

describe('Utils', () => {
test('isValidLabelMatchers', () => {
Expand Down Expand Up @@ -49,4 +56,97 @@ describe('Utils', () => {
expect(isGoogleCloudStorageUrl('https://storage.googleapis.com.evil.com')).toBe(false);
});
});

describe('runLocking', () => {
beforeEach(() => vi.useFakeTimers());
afterEach(() => vi.useRealTimers());

test('that single call returns without locking', async () => {
const result = await runLocking('test', () => Promise.resolve('hello world'));
expect(result).toBe('hello world');
});

test('coalesces concurrent calls for the SAME key', async () => {
const gate = deferred<void>();

const worker = vi.fn(async () => {
await gate.promise; // hold until we release the gate
return "OK";
});

// Start two concurrent calls with the same key
const p1 = runLocking("user:42", worker);
const p2 = runLocking("user:42", worker);

// Both callers must share the exact same Promise
expect(p1).toBe(p2);

// Underlying work should have started only once
expect(worker).toHaveBeenCalledTimes(1);

// Let it finish and verify both callers get the same result
gate.resolve();

const [r1, r2] = await Promise.all([p1, p2]);
expect(r1).toBe("OK");
expect(r2).toBe("OK");
});

test('does NOT lock across DIFFERENT keys (independent execution)', async () => {
const gateA = deferred<void>();
const gateB = deferred<void>();

const worker = vi.fn(async (val: string, gate: ReturnType<typeof deferred<void>>) => {
await gate.promise;
return val;
});

const pA = runLocking("user:A", () => worker("A", gateA));
const pB = runLocking("user:B", () => worker("B", gateB));

// Different keys should produce different Promises and start separate work
expect(pA).not.toBe(pB);
expect(worker).toHaveBeenCalledTimes(2);

// Resolve B first; A should still be pending
gateB.resolve();

const rB = await pB;
expect(rB).toBe("B");

const p = expectPending(pA); // A hasn't been released yet
await vi.advanceTimersByTimeAsync(100);

await p;

// Now resolve A
gateA.resolve();
const rA = await pA;
expect(rA).toBe("A");
})
});

test('starts fresh after a call completes (entry cleanup)', async () => {
// One worker whose gate we can swap between runs
let gate = deferred<void>();
const worker = vi.fn(async () => {
await gate.promise;
return "done";
});

// First run
const p1 = runLocking("user:99", worker);
gate.resolve();
await p1;

// Swap to a new gate and call again — should start a NEW underlying run
gate = deferred<void>();

const p2 = runLocking("user:99", worker);
expect(p2).not.toBe(p1); // new promise after cleanup
expect(worker).toHaveBeenCalledTimes(2);

gate.resolve();
await p2;
})
});