Skip to content

Commit 3d67eaa

Browse files
License fetching concurrency (#77560)
* Switching license polling from a switchMap to an exhaustMap When it's ES is slow to respond with a license, or Kibana is overloaded and is slow making the request or handling the response, we were trying to fetch the license again. This will just skip that refresh interval, and catch it the next time around. * Explicitly ignoring a 429 from triggering a license refresh * The existing unit tests pass! * Only refreshing the license once at a time * Adding test for the onPreResponse licensing handler * Removing errant newline * Removing errant 'foo' * Now with better comments! * Fixing oddity with the exhaustMap * Just a bit of tidying up * Use process.nextTick() instead of the confusing Promise.resolve Co-authored-by: Elastic Machine <[email protected]>
1 parent 8727dc7 commit 3d67eaa

File tree

5 files changed

+56
-28
lines changed

5 files changed

+56
-28
lines changed

x-pack/plugins/licensing/common/license_update.test.ts

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,8 @@ describe('licensing update', () => {
6666
expect(first.type).toBe('basic');
6767

6868
trigger$.next();
69+
// waiting on a promise gives the exhaustMap time to complete and not de-dupe these calls
70+
await Promise.resolve();
6971
trigger$.next();
7072

7173
const [, second] = await license$.pipe(take(2), toArray()).toPromise();
@@ -89,18 +91,15 @@ describe('licensing update', () => {
8991
expect(fetcher).toHaveBeenCalledTimes(1);
9092
});
9193

92-
it('handles fetcher race condition', async () => {
94+
it('ignores trigger if license fetching is delayed ', async () => {
9395
const delayMs = 100;
94-
let firstCall = true;
95-
const fetcher = jest.fn().mockImplementation(
96+
const fetcher = jest.fn().mockImplementationOnce(
9697
() =>
9798
new Promise((resolve) => {
98-
if (firstCall) {
99-
firstCall = false;
100-
setTimeout(() => resolve(licenseMock.createLicense()), delayMs);
101-
} else {
102-
resolve(licenseMock.createLicense({ license: { type: 'gold' } }));
103-
}
99+
setTimeout(
100+
() => resolve(licenseMock.createLicense({ license: { type: 'gold' } })),
101+
delayMs
102+
);
104103
})
105104
);
106105
const trigger$ = new Subject();
@@ -113,7 +112,7 @@ describe('licensing update', () => {
113112

114113
await delay(delayMs * 2);
115114

116-
await expect(fetcher).toHaveBeenCalledTimes(2);
115+
await expect(fetcher).toHaveBeenCalledTimes(1);
117116
await expect(values).toHaveLength(1);
118117
await expect(values[0].type).toBe('gold');
119118
});
@@ -144,7 +143,7 @@ describe('licensing update', () => {
144143
expect(fetcher).toHaveBeenCalledTimes(0);
145144
});
146145

147-
it('refreshManually guarantees license fetching', async () => {
146+
it(`refreshManually multiple times gets new license`, async () => {
148147
const trigger$ = new Subject();
149148
const firstLicense = licenseMock.createLicense({ license: { uid: 'first', type: 'basic' } });
150149
const secondLicense = licenseMock.createLicense({ license: { uid: 'second', type: 'gold' } });

x-pack/plugins/licensing/common/license_update.ts

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,43 +5,52 @@
55
*/
66
import { ConnectableObservable, Observable, Subject, from, merge } from 'rxjs';
77

8-
import { filter, map, pairwise, switchMap, publishReplay, takeUntil } from 'rxjs/operators';
8+
import {
9+
filter,
10+
map,
11+
pairwise,
12+
exhaustMap,
13+
publishReplay,
14+
share,
15+
take,
16+
takeUntil,
17+
} from 'rxjs/operators';
918
import { hasLicenseInfoChanged } from './has_license_info_changed';
1019
import { ILicense } from './types';
1120

1221
export function createLicenseUpdate(
13-
trigger$: Observable<unknown>,
22+
triggerRefresh$: Observable<unknown>,
1423
stop$: Observable<unknown>,
1524
fetcher: () => Promise<ILicense>,
1625
initialValues?: ILicense
1726
) {
18-
const triggerRefresh$ = trigger$.pipe(switchMap(fetcher));
19-
const manuallyFetched$ = new Subject<ILicense>();
27+
const manuallyRefresh$ = new Subject();
28+
const fetched$ = merge(triggerRefresh$, manuallyRefresh$).pipe(exhaustMap(fetcher), share());
2029

21-
const fetched$ = merge(triggerRefresh$, manuallyFetched$).pipe(
30+
const cached$ = fetched$.pipe(
2231
takeUntil(stop$),
2332
publishReplay(1)
2433
// have to cast manually as pipe operator cannot return ConnectableObservable
2534
// https://github.com/ReactiveX/rxjs/issues/2972
2635
) as ConnectableObservable<ILicense>;
2736

28-
const fetchSubscription = fetched$.connect();
29-
stop$.subscribe({ complete: () => fetchSubscription.unsubscribe() });
37+
const cachedSubscription = cached$.connect();
38+
stop$.subscribe({ complete: () => cachedSubscription.unsubscribe() });
3039

3140
const initialValues$ = initialValues ? from([undefined, initialValues]) : from([undefined]);
3241

33-
const license$: Observable<ILicense> = merge(initialValues$, fetched$).pipe(
42+
const license$: Observable<ILicense> = merge(initialValues$, cached$).pipe(
3443
pairwise(),
3544
filter(([previous, next]) => hasLicenseInfoChanged(previous, next!)),
3645
map(([, next]) => next!)
3746
);
3847

3948
return {
4049
license$,
41-
async refreshManually() {
42-
const license = await fetcher();
43-
manuallyFetched$.next(license);
44-
return license;
50+
refreshManually() {
51+
const licensePromise = fetched$.pipe(take(1)).toPromise();
52+
manuallyRefresh$.next();
53+
return licensePromise;
4554
},
4655
};
4756
}

x-pack/plugins/licensing/public/plugin.test.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,9 @@ describe('licensing plugin', () => {
115115
refresh();
116116
} else if (i === 2) {
117117
expect(value.type).toBe('gold');
118-
refresh();
118+
// since this is a synchronous subscription, we need to give the exhaustMap a chance
119+
// to mark the subscription as complete before emitting another value on the Subject
120+
process.nextTick(() => refresh());
119121
} else if (i === 3) {
120122
expect(value.type).toBe('platinum');
121123
done();

x-pack/plugins/licensing/server/on_pre_response_handler.test.ts

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,23 @@ describe('createOnPreResponseHandler', () => {
2525
},
2626
});
2727
});
28-
it('sets license.signature header after refresh for non-error responses', async () => {
28+
it('sets license.signature header immediately for 429 error responses', async () => {
29+
const refresh = jest.fn();
30+
const license$ = new BehaviorSubject(licenseMock.createLicense({ signature: 'foo' }));
31+
const toolkit = httpServiceMock.createOnPreResponseToolkit();
32+
33+
const interceptor = createOnPreResponseHandler(refresh, license$);
34+
await interceptor(httpServerMock.createKibanaRequest(), { statusCode: 429 }, toolkit);
35+
36+
expect(refresh).toHaveBeenCalledTimes(0);
37+
expect(toolkit.next).toHaveBeenCalledTimes(1);
38+
expect(toolkit.next).toHaveBeenCalledWith({
39+
headers: {
40+
'kbn-license-sig': 'foo',
41+
},
42+
});
43+
});
44+
it('sets license.signature header after refresh for other error responses', async () => {
2945
const updatedLicense = licenseMock.createLicense({ signature: 'bar' });
3046
const license$ = new BehaviorSubject(licenseMock.createLicense({ signature: 'foo' }));
3147
const refresh = jest.fn().mockImplementation(

x-pack/plugins/licensing/server/on_pre_response_handler.ts

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,11 @@ export function createOnPreResponseHandler(
1515
return async (req, res, t) => {
1616
// If we're returning an error response, refresh license info from
1717
// Elasticsearch in case the error is due to a change in license information
18-
// in Elasticsearch.
19-
// https://github.com/elastic/x-pack-kibana/pull/2876
20-
if (res.statusCode >= 400) {
18+
// in Elasticsearch. https://github.com/elastic/x-pack-kibana/pull/2876
19+
// We're explicit ignoring a 429 "Too Many Requests". This is being used to communicate
20+
// that back-pressure should be applied, and we don't need to refresh the license in these
21+
// situations.
22+
if (res.statusCode >= 400 && res.statusCode !== 429) {
2123
await refresh();
2224
}
2325
const license = await license$.pipe(take(1)).toPromise();

0 commit comments

Comments
 (0)