Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -479,31 +479,4 @@ describe('getOAuthAuthorizationCodeAccessToken', () => {
);
});
});

describe('concurrency lock', () => {
it('queues concurrent calls for the same connector so only one refresh runs', async () => {
const lockedConnectorId = 'connector-lock-test';
// First call inside the lock sees an expired token and refreshes it.
// Second call (queued behind the first) re-fetches and sees the valid token.
connectorTokenClient.get
.mockResolvedValueOnce({
hasErrors: false,
connectorToken: { ...expiredToken, connectorId: lockedConnectorId },
})
.mockResolvedValueOnce({
hasErrors: false,
connectorToken: { ...validToken, connectorId: lockedConnectorId },
});
(requestOAuthRefreshToken as jest.Mock).mockResolvedValueOnce(refreshResponse);

const [result1, result2] = await Promise.all([
getOAuthAuthorizationCodeAccessToken({ ...baseOpts, connectorId: lockedConnectorId }),
getOAuthAuthorizationCodeAccessToken({ ...baseOpts, connectorId: lockedConnectorId }),
]);

expect(requestOAuthRefreshToken).toHaveBeenCalledTimes(1);
expect(result1).toBe('Bearer new-access-token');
expect(result2).toBe('stored-access-token');
});
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,27 @@ const expiredToken = {
expiresAt: new Date('2024-01-15T11:00:00.000Z').toISOString(),
};

// Per-user token: access/refresh stored under credentials.accessToken / credentials.refreshToken
const validPerUserToken = {
id: 'token-1',
profileUid: 'profile-1',
connectorId: 'connector-1',
credentialType: 'oauth',
credentials: {
accessToken: 'stored-per-user-access-token',
refreshToken: 'stored-per-user-refresh-token',
},
createdAt: new Date('2024-01-15T10:00:00.000Z').toISOString(),
updatedAt: new Date('2024-01-15T10:00:00.000Z').toISOString(),
expiresAt: new Date('2024-01-15T13:00:00.000Z').toISOString(),
refreshTokenExpiresAt: new Date('2024-01-22T12:00:00.000Z').toISOString(),
};

const expiredPerUserToken = {
...validPerUserToken,
expiresAt: new Date('2024-01-15T11:00:00.000Z').toISOString(),
};

const refreshResponse = {
tokenType: 'Bearer',
accessToken: 'new-access-token',
Expand Down Expand Up @@ -252,4 +273,101 @@ describe('getStoredTokenWithRefresh', () => {
);
});
});

describe('concurrency lock', () => {
it('queues concurrent calls for the same connector so only one refresh runs', async () => {
const lockedConnectorId = 'connector-lock-shared';
// First call inside the lock sees an expired token and refreshes it.
// Second call (queued behind the first) re-fetches and sees the valid token.
connectorTokenClient.get
.mockResolvedValueOnce({
hasErrors: false,
connectorToken: { ...expiredToken, connectorId: lockedConnectorId },
})
.mockResolvedValueOnce({
hasErrors: false,
connectorToken: { ...validToken, connectorId: lockedConnectorId },
});
refreshFn.mockResolvedValueOnce(refreshResponse);

const [result1, result2] = await Promise.all([
getStoredTokenWithRefresh({ ...baseOpts, connectorId: lockedConnectorId }),
getStoredTokenWithRefresh({ ...baseOpts, connectorId: lockedConnectorId }),
]);

expect(refreshFn).toHaveBeenCalledTimes(1);
expect(result1).toBe('Bearer new-access-token');
expect(result2).toBe('stored-access-token');
});

it('queues concurrent per-user calls for the same connector+user so only one refresh runs', async () => {
const lockedConnectorId = 'connector-lock-per-user-same';
// Same profileUid => same lock key => serialized.
// First call refreshes, second call re-fetches and sees the valid token.
connectorTokenClient.get
.mockResolvedValueOnce({
hasErrors: false,
connectorToken: { ...expiredPerUserToken, connectorId: lockedConnectorId },
})
.mockResolvedValueOnce({
hasErrors: false,
connectorToken: { ...validPerUserToken, connectorId: lockedConnectorId },
});
refreshFn.mockResolvedValueOnce(refreshResponse);

const opts = {
...baseOpts,
connectorId: lockedConnectorId,
isPerUser: true as const,
profileUid: 'profile-1',
};
const [result1, result2] = await Promise.all([
getStoredTokenWithRefresh(opts),
getStoredTokenWithRefresh(opts),
]);

expect(refreshFn).toHaveBeenCalledTimes(1);
expect(result1).toBe('Bearer new-access-token');
expect(result2).toBe('stored-per-user-access-token');
});

it('allows concurrent per-user calls for different users on the same connector to refresh independently', async () => {
const lockedConnectorId = 'connector-lock-per-user-diff';
// Different profileUids => different lock keys => independent execution.
// Both users see an expired token and each triggers its own refresh.
const expiredUser1Token = { ...expiredPerUserToken, connectorId: lockedConnectorId };
const expiredUser2Token = {
...expiredPerUserToken,
id: 'token-2',
profileUid: 'profile-2',
connectorId: lockedConnectorId,
};
connectorTokenClient.get
.mockResolvedValueOnce({ hasErrors: false, connectorToken: expiredUser1Token })
.mockResolvedValueOnce({ hasErrors: false, connectorToken: expiredUser2Token });
refreshFn
.mockResolvedValueOnce(refreshResponse)
.mockResolvedValueOnce({ ...refreshResponse, accessToken: 'new-access-token-user2' });

const [result1, result2] = await Promise.all([
getStoredTokenWithRefresh({
...baseOpts,
connectorId: lockedConnectorId,
isPerUser: true,
profileUid: 'profile-1',
}),
getStoredTokenWithRefresh({
...baseOpts,
connectorId: lockedConnectorId,
isPerUser: true,
profileUid: 'profile-2',
}),
]);

// Each user refreshed independently — no sharing of the lock
expect(refreshFn).toHaveBeenCalledTimes(2);
expect(result1).toBe('Bearer new-access-token');
expect(result2).toBe('Bearer new-access-token-user2');
});
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@ import type { Logger } from '@kbn/core/server';
import type { OAuthTokenResponse } from './request_oauth_token';
import type { ConnectorToken, ConnectorTokenClientContract, UserConnectorToken } from '../types';

// Per-connector locks to prevent concurrent token refreshes for the same connector
// Per-connector (or per-connector-per-user) locks to prevent concurrent token refreshes
const tokenRefreshLocks = new Map<string, ReturnType<typeof pLimit>>();

function getOrCreateLock(connectorId: string): ReturnType<typeof pLimit> {
if (!tokenRefreshLocks.has(connectorId)) {
tokenRefreshLocks.set(connectorId, pLimit(1));
function getOrCreateLock(lockKey: string): ReturnType<typeof pLimit> {
if (!tokenRefreshLocks.has(lockKey)) {
tokenRefreshLocks.set(lockKey, pLimit(1));
}
return tokenRefreshLocks.get(connectorId)!;
return tokenRefreshLocks.get(lockKey)!;
}

export interface GetStoredTokenWithRefreshOpts {
Expand Down Expand Up @@ -94,8 +94,10 @@ export const getStoredTokenWithRefresh = async ({
authMode,
refreshFn,
}: GetStoredTokenWithRefreshOpts): Promise<string | null> => {
// Acquire lock for this connector to prevent concurrent token refreshes
const lock = getOrCreateLock(connectorId);
// Acquire lock scoped to the connector (shared mode) or to the connector + user (per-user mode),
// so concurrent requests for different users don't block each other unnecessarily.
const lockKey = isPerUser ? `${connectorId}:${profileUid}` : connectorId;
const lock = getOrCreateLock(lockKey);

const result = await lock(async () => {
// Re-fetch token inside lock - another request may have already refreshed it
Expand Down
Loading