Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 10 additions & 11 deletions x-pack/plugins/licensing/common/license_update.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ describe('licensing update', () => {
expect(first.type).toBe('basic');

trigger$.next();
// waiting on a promise gives the exhaustMap time to complete and not de-dupe these calls
await Promise.resolve();
trigger$.next();

const [, second] = await license$.pipe(take(2), toArray()).toPromise();
Expand All @@ -89,18 +91,15 @@ describe('licensing update', () => {
expect(fetcher).toHaveBeenCalledTimes(1);
});

it('handles fetcher race condition', async () => {
it('ignores trigger if license fetching is delayed ', async () => {
const delayMs = 100;
let firstCall = true;
const fetcher = jest.fn().mockImplementation(
const fetcher = jest.fn().mockImplementationOnce(
() =>
new Promise((resolve) => {
if (firstCall) {
firstCall = false;
setTimeout(() => resolve(licenseMock.createLicense()), delayMs);
} else {
resolve(licenseMock.createLicense({ license: { type: 'gold' } }));
}
setTimeout(
() => resolve(licenseMock.createLicense({ license: { type: 'gold' } })),
delayMs
);
})
);
const trigger$ = new Subject();
Expand All @@ -113,7 +112,7 @@ describe('licensing update', () => {

await delay(delayMs * 2);

await expect(fetcher).toHaveBeenCalledTimes(2);
await expect(fetcher).toHaveBeenCalledTimes(1);
await expect(values).toHaveLength(1);
await expect(values[0].type).toBe('gold');
});
Expand Down Expand Up @@ -144,7 +143,7 @@ describe('licensing update', () => {
expect(fetcher).toHaveBeenCalledTimes(0);
});

it('refreshManually guarantees license fetching', async () => {
it(`refreshManually multiple times gets new license`, async () => {
const trigger$ = new Subject();
const firstLicense = licenseMock.createLicense({ license: { uid: 'first', type: 'basic' } });
const secondLicense = licenseMock.createLicense({ license: { uid: 'second', type: 'gold' } });
Expand Down
33 changes: 21 additions & 12 deletions x-pack/plugins/licensing/common/license_update.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,43 +5,52 @@
*/
import { ConnectableObservable, Observable, Subject, from, merge } from 'rxjs';

import { filter, map, pairwise, switchMap, publishReplay, takeUntil } from 'rxjs/operators';
import {
filter,
map,
pairwise,
exhaustMap,
publishReplay,
share,
take,
takeUntil,
} from 'rxjs/operators';
import { hasLicenseInfoChanged } from './has_license_info_changed';
import { ILicense } from './types';

export function createLicenseUpdate(
trigger$: Observable<unknown>,
triggerRefresh$: Observable<unknown>,
stop$: Observable<unknown>,
fetcher: () => Promise<ILicense>,
initialValues?: ILicense
) {
const triggerRefresh$ = trigger$.pipe(switchMap(fetcher));
const manuallyFetched$ = new Subject<ILicense>();
const manuallyRefresh$ = new Subject();
const fetched$ = merge(triggerRefresh$, manuallyRefresh$).pipe(exhaustMap(fetcher), share());

const fetched$ = merge(triggerRefresh$, manuallyFetched$).pipe(
const cached$ = fetched$.pipe(
takeUntil(stop$),
publishReplay(1)
// have to cast manually as pipe operator cannot return ConnectableObservable
// https://github.com/ReactiveX/rxjs/issues/2972
) as ConnectableObservable<ILicense>;

const fetchSubscription = fetched$.connect();
stop$.subscribe({ complete: () => fetchSubscription.unsubscribe() });
const cachedSubscription = cached$.connect();
stop$.subscribe({ complete: () => cachedSubscription.unsubscribe() });

const initialValues$ = initialValues ? from([undefined, initialValues]) : from([undefined]);

const license$: Observable<ILicense> = merge(initialValues$, fetched$).pipe(
const license$: Observable<ILicense> = merge(initialValues$, cached$).pipe(
pairwise(),
filter(([previous, next]) => hasLicenseInfoChanged(previous, next!)),
map(([, next]) => next!)
);

return {
license$,
async refreshManually() {
const license = await fetcher();
manuallyFetched$.next(license);
return license;
refreshManually() {
const licensePromise = fetched$.pipe(take(1)).toPromise();
manuallyRefresh$.next();
return licensePromise;
},
};
}
4 changes: 3 additions & 1 deletion x-pack/plugins/licensing/public/plugin.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,9 @@ describe('licensing plugin', () => {
refresh();
} else if (i === 2) {
expect(value.type).toBe('gold');
refresh();
// since this is a synchronous subscription, we need to give the exhaustMap a chance
// to mark the subscription as complete before emitting another value on the Subject
process.nextTick(() => refresh());
} else if (i === 3) {
expect(value.type).toBe('platinum');
done();
Expand Down
18 changes: 17 additions & 1 deletion x-pack/plugins/licensing/server/on_pre_response_handler.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,23 @@ describe('createOnPreResponseHandler', () => {
},
});
});
it('sets license.signature header after refresh for non-error responses', async () => {
it('sets license.signature header immediately for 429 error responses', async () => {
const refresh = jest.fn();
const license$ = new BehaviorSubject(licenseMock.createLicense({ signature: 'foo' }));
const toolkit = httpServiceMock.createOnPreResponseToolkit();

const interceptor = createOnPreResponseHandler(refresh, license$);
await interceptor(httpServerMock.createKibanaRequest(), { statusCode: 429 }, toolkit);

expect(refresh).toHaveBeenCalledTimes(0);
expect(toolkit.next).toHaveBeenCalledTimes(1);
expect(toolkit.next).toHaveBeenCalledWith({
headers: {
'kbn-license-sig': 'foo',
},
});
});
it('sets license.signature header after refresh for other error responses', async () => {
const updatedLicense = licenseMock.createLicense({ signature: 'bar' });
const license$ = new BehaviorSubject(licenseMock.createLicense({ signature: 'foo' }));
const refresh = jest.fn().mockImplementation(
Expand Down
8 changes: 5 additions & 3 deletions x-pack/plugins/licensing/server/on_pre_response_handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@ export function createOnPreResponseHandler(
return async (req, res, t) => {
// If we're returning an error response, refresh license info from
// Elasticsearch in case the error is due to a change in license information
// in Elasticsearch.
// https://github.com/elastic/x-pack-kibana/pull/2876
if (res.statusCode >= 400) {
// in Elasticsearch. https://github.com/elastic/x-pack-kibana/pull/2876
// We're explicit ignoring a 429 "Too Many Requests". This is being used to communicate
// that back-pressure should be applied, and we don't need to refresh the license in these
// situations.
if (res.statusCode >= 400 && res.statusCode !== 429) {
await refresh();
}
const license = await license$.pipe(take(1)).toPromise();
Expand Down