diff --git a/x-pack/plugins/licensing/common/license_update.test.ts b/x-pack/plugins/licensing/common/license_update.test.ts index b0209e0717da2..4c922e75f8f6f 100644 --- a/x-pack/plugins/licensing/common/license_update.test.ts +++ b/x-pack/plugins/licensing/common/license_update.test.ts @@ -66,6 +66,8 @@ describe('licensing update', () => { expect(first.type).toBe('basic'); trigger$.next(); + // waiting on a promise gives the exhaustMap time to complete and not de-dupe these calls + await Promise.resolve(); trigger$.next(); const [, second] = await license$.pipe(take(2), toArray()).toPromise(); @@ -89,18 +91,15 @@ describe('licensing update', () => { expect(fetcher).toHaveBeenCalledTimes(1); }); - it('handles fetcher race condition', async () => { + it('ignores trigger if license fetching is delayed ', async () => { const delayMs = 100; - let firstCall = true; - const fetcher = jest.fn().mockImplementation( + const fetcher = jest.fn().mockImplementationOnce( () => new Promise((resolve) => { - if (firstCall) { - firstCall = false; - setTimeout(() => resolve(licenseMock.createLicense()), delayMs); - } else { - resolve(licenseMock.createLicense({ license: { type: 'gold' } })); - } + setTimeout( + () => resolve(licenseMock.createLicense({ license: { type: 'gold' } })), + delayMs + ); }) ); const trigger$ = new Subject(); @@ -113,7 +112,7 @@ describe('licensing update', () => { await delay(delayMs * 2); - await expect(fetcher).toHaveBeenCalledTimes(2); + await expect(fetcher).toHaveBeenCalledTimes(1); await expect(values).toHaveLength(1); await expect(values[0].type).toBe('gold'); }); @@ -144,7 +143,7 @@ describe('licensing update', () => { expect(fetcher).toHaveBeenCalledTimes(0); }); - it('refreshManually guarantees license fetching', async () => { + it(`refreshManually multiple times gets new license`, async () => { const trigger$ = new Subject(); const firstLicense = licenseMock.createLicense({ license: { uid: 'first', type: 'basic' } }); const secondLicense = licenseMock.createLicense({ license: { uid: 'second', type: 'gold' } }); diff --git a/x-pack/plugins/licensing/common/license_update.ts b/x-pack/plugins/licensing/common/license_update.ts index 0197ca5396ad1..cd5052b0b49a3 100644 --- a/x-pack/plugins/licensing/common/license_update.ts +++ b/x-pack/plugins/licensing/common/license_update.ts @@ -5,32 +5,41 @@ */ import { ConnectableObservable, Observable, Subject, from, merge } from 'rxjs'; -import { filter, map, pairwise, switchMap, publishReplay, takeUntil } from 'rxjs/operators'; +import { + filter, + map, + pairwise, + exhaustMap, + publishReplay, + share, + take, + takeUntil, +} from 'rxjs/operators'; import { hasLicenseInfoChanged } from './has_license_info_changed'; import { ILicense } from './types'; export function createLicenseUpdate( - trigger$: Observable, + triggerRefresh$: Observable, stop$: Observable, fetcher: () => Promise, initialValues?: ILicense ) { - const triggerRefresh$ = trigger$.pipe(switchMap(fetcher)); - const manuallyFetched$ = new Subject(); + const manuallyRefresh$ = new Subject(); + const fetched$ = merge(triggerRefresh$, manuallyRefresh$).pipe(exhaustMap(fetcher), share()); - const fetched$ = merge(triggerRefresh$, manuallyFetched$).pipe( + const cached$ = fetched$.pipe( takeUntil(stop$), publishReplay(1) // have to cast manually as pipe operator cannot return ConnectableObservable // https://github.com/ReactiveX/rxjs/issues/2972 ) as ConnectableObservable; - const fetchSubscription = fetched$.connect(); - stop$.subscribe({ complete: () => fetchSubscription.unsubscribe() }); + const cachedSubscription = cached$.connect(); + stop$.subscribe({ complete: () => cachedSubscription.unsubscribe() }); const initialValues$ = initialValues ? from([undefined, initialValues]) : from([undefined]); - const license$: Observable = merge(initialValues$, fetched$).pipe( + const license$: Observable = merge(initialValues$, cached$).pipe( pairwise(), filter(([previous, next]) => hasLicenseInfoChanged(previous, next!)), map(([, next]) => next!) @@ -38,10 +47,10 @@ export function createLicenseUpdate( return { license$, - async refreshManually() { - const license = await fetcher(); - manuallyFetched$.next(license); - return license; + refreshManually() { + const licensePromise = fetched$.pipe(take(1)).toPromise(); + manuallyRefresh$.next(); + return licensePromise; }, }; } diff --git a/x-pack/plugins/licensing/public/plugin.test.ts b/x-pack/plugins/licensing/public/plugin.test.ts index 960fe3699e210..c20563dd15913 100644 --- a/x-pack/plugins/licensing/public/plugin.test.ts +++ b/x-pack/plugins/licensing/public/plugin.test.ts @@ -115,7 +115,9 @@ describe('licensing plugin', () => { refresh(); } else if (i === 2) { expect(value.type).toBe('gold'); - refresh(); + // since this is a synchronous subscription, we need to give the exhaustMap a chance + // to mark the subscription as complete before emitting another value on the Subject + process.nextTick(() => refresh()); } else if (i === 3) { expect(value.type).toBe('platinum'); done(); diff --git a/x-pack/plugins/licensing/server/on_pre_response_handler.test.ts b/x-pack/plugins/licensing/server/on_pre_response_handler.test.ts index 5c768a00783a8..af3ec42ab4ec5 100644 --- a/x-pack/plugins/licensing/server/on_pre_response_handler.test.ts +++ b/x-pack/plugins/licensing/server/on_pre_response_handler.test.ts @@ -25,7 +25,23 @@ describe('createOnPreResponseHandler', () => { }, }); }); - it('sets license.signature header after refresh for non-error responses', async () => { + it('sets license.signature header immediately for 429 error responses', async () => { + const refresh = jest.fn(); + const license$ = new BehaviorSubject(licenseMock.createLicense({ signature: 'foo' })); + const toolkit = httpServiceMock.createOnPreResponseToolkit(); + + const interceptor = createOnPreResponseHandler(refresh, license$); + await interceptor(httpServerMock.createKibanaRequest(), { statusCode: 429 }, toolkit); + + expect(refresh).toHaveBeenCalledTimes(0); + expect(toolkit.next).toHaveBeenCalledTimes(1); + expect(toolkit.next).toHaveBeenCalledWith({ + headers: { + 'kbn-license-sig': 'foo', + }, + }); + }); + it('sets license.signature header after refresh for other error responses', async () => { const updatedLicense = licenseMock.createLicense({ signature: 'bar' }); const license$ = new BehaviorSubject(licenseMock.createLicense({ signature: 'foo' })); const refresh = jest.fn().mockImplementation( diff --git a/x-pack/plugins/licensing/server/on_pre_response_handler.ts b/x-pack/plugins/licensing/server/on_pre_response_handler.ts index c8befceb4fe32..6428e41b18058 100644 --- a/x-pack/plugins/licensing/server/on_pre_response_handler.ts +++ b/x-pack/plugins/licensing/server/on_pre_response_handler.ts @@ -15,9 +15,11 @@ export function createOnPreResponseHandler( return async (req, res, t) => { // If we're returning an error response, refresh license info from // Elasticsearch in case the error is due to a change in license information - // in Elasticsearch. - // https://github.com/elastic/x-pack-kibana/pull/2876 - if (res.statusCode >= 400) { + // in Elasticsearch. https://github.com/elastic/x-pack-kibana/pull/2876 + // We're explicit ignoring a 429 "Too Many Requests". This is being used to communicate + // that back-pressure should be applied, and we don't need to refresh the license in these + // situations. + if (res.statusCode >= 400 && res.statusCode !== 429) { await refresh(); } const license = await license$.pipe(take(1)).toPromise();