From 52d03e3b9c38075ecb24d249c710a0b3b071e440 Mon Sep 17 00:00:00 2001 From: Rudolf Meijering Date: Wed, 5 Feb 2020 21:50:27 +0100 Subject: [PATCH 1/5] Don't start pollEsNodesVersion unless someone subscribes By not polling until subscribed to, we prevent verbose error logs when the optimizer is run (which automatically skips migrations). --- .../server/elasticsearch/elasticsearch_service.ts | 15 ++------------- .../server/saved_objects/saved_objects_service.ts | 8 ++++++++ 2 files changed, 10 insertions(+), 13 deletions(-) diff --git a/src/core/server/elasticsearch/elasticsearch_service.ts b/src/core/server/elasticsearch/elasticsearch_service.ts index 9eaf125cc006f..2a7f59be4cb5e 100644 --- a/src/core/server/elasticsearch/elasticsearch_service.ts +++ b/src/core/server/elasticsearch/elasticsearch_service.ts @@ -18,7 +18,7 @@ */ import { ConnectableObservable, Observable, Subscription } from 'rxjs'; -import { filter, first, map, publishReplay, switchMap, take } from 'rxjs/operators'; +import { filter, first, map, publishReplay, switchMap, take, shareReplay } from 'rxjs/operators'; import { CoreService } from '../../types'; import { merge } from '../../utils'; @@ -164,18 +164,7 @@ export class ElasticsearchService implements CoreService).connect(); - - // TODO: Move to Status Service https://github.com/elastic/kibana/issues/41983 - esNodesCompatibility$.subscribe(({ isCompatible, message }) => { - if (!isCompatible && message) { - this.log.error(message); - } - }); + }).pipe(shareReplay({ refCount: true, bufferSize: 1 })); return { legacy: { config$: clients$.pipe(map(clients => clients.config)) }, diff --git a/src/core/server/saved_objects/saved_objects_service.ts b/src/core/server/saved_objects/saved_objects_service.ts index 2f07dbe51a09e..cd4b7bd895753 100644 --- a/src/core/server/saved_objects/saved_objects_service.ts +++ b/src/core/server/saved_objects/saved_objects_service.ts @@ -299,6 +299,14 @@ export class SavedObjectsService this.logger.info( 'Waiting until all Elasticsearch nodes are compatible with Kibana before starting saved objects migrations...' ); + + // TODO: Move to Status Service https://github.com/elastic/kibana/issues/41983 + this.setupDeps!.elasticsearch.esNodesCompatibility$.subscribe(({ isCompatible, message }) => { + if (!isCompatible && message) { + this.logger.error(message); + } + }); + await this.setupDeps!.elasticsearch.esNodesCompatibility$.pipe( filter(nodes => nodes.isCompatible), take(1) From 67293d4e38dae15f2148533c752ef1a8dd16b3dd Mon Sep 17 00:00:00 2001 From: Rudolf Meijering Date: Fri, 21 Feb 2020 21:03:52 +0100 Subject: [PATCH 2/5] Test pollEsNodeVersions behaviour --- .../elasticsearch_service.test.ts | 138 +++++++++++++----- .../elasticsearch/elasticsearch_service.ts | 38 ++--- 2 files changed, 123 insertions(+), 53 deletions(-) diff --git a/src/core/server/elasticsearch/elasticsearch_service.test.ts b/src/core/server/elasticsearch/elasticsearch_service.test.ts index 022a03e01d37d..1696e84e46582 100644 --- a/src/core/server/elasticsearch/elasticsearch_service.test.ts +++ b/src/core/server/elasticsearch/elasticsearch_service.test.ts @@ -21,7 +21,7 @@ import { first } from 'rxjs/operators'; import { MockClusterClient } from './elasticsearch_service.test.mocks'; -import { BehaviorSubject } from 'rxjs'; +import { BehaviorSubject, Subject } from 'rxjs'; import { Env } from '../config'; import { getEnvOptions } from '../config/__mocks__/env'; import { CoreContext } from '../core_context'; @@ -32,6 +32,13 @@ import { ElasticsearchConfig } from './elasticsearch_config'; import { ElasticsearchService } from './elasticsearch_service'; import { elasticsearchServiceMock } from './elasticsearch_service.mock'; import { duration } from 'moment'; +import { TestScheduler } from 'rxjs/testing'; +import { pollEsNodesVersion } from './version_check/ensure_es_version'; + +const getTestScheduler = () => + new TestScheduler((actual, expected) => { + expect(actual).toEqual(expected); + }); let elasticsearchService: ElasticsearchService; const configService = configServiceMock.create(); @@ -42,7 +49,7 @@ configService.atPath.mockReturnValue( new BehaviorSubject({ hosts: ['http://1.2.3.4'], healthCheck: { - delay: duration(2000), + delay: duration(10), }, ssl: { verificationMode: 'none', @@ -125,21 +132,21 @@ describe('#setup', () => { const config = MockClusterClient.mock.calls[0][0]; expect(config).toMatchInlineSnapshot(` -Object { - "healthCheckDelay": "PT2S", - "hosts": Array [ - "http://8.8.8.8", - ], - "logQueries": true, - "requestHeadersWhitelist": Array [ - undefined, - ], - "ssl": Object { - "certificate": "certificate-value", - "verificationMode": "none", - }, -} -`); + Object { + "healthCheckDelay": "PT0.01S", + "hosts": Array [ + "http://8.8.8.8", + ], + "logQueries": true, + "requestHeadersWhitelist": Array [ + undefined, + ], + "ssl": Object { + "certificate": "certificate-value", + "verificationMode": "none", + }, + } + `); }); it('falls back to elasticsearch config if custom config not passed', async () => { const setupContract = await elasticsearchService.setup(deps); @@ -150,24 +157,24 @@ Object { const config = MockClusterClient.mock.calls[0][0]; expect(config).toMatchInlineSnapshot(` -Object { - "healthCheckDelay": "PT2S", - "hosts": Array [ - "http://1.2.3.4", - ], - "requestHeadersWhitelist": Array [ - undefined, - ], - "ssl": Object { - "alwaysPresentCertificate": undefined, - "certificate": undefined, - "certificateAuthorities": undefined, - "key": undefined, - "keyPassphrase": undefined, - "verificationMode": "none", - }, -} -`); + Object { + "healthCheckDelay": "PT0.01S", + "hosts": Array [ + "http://1.2.3.4", + ], + "requestHeadersWhitelist": Array [ + undefined, + ], + "ssl": Object { + "alwaysPresentCertificate": undefined, + "certificate": undefined, + "certificateAuthorities": undefined, + "key": undefined, + "keyPassphrase": undefined, + "verificationMode": "none", + }, + } + `); }); it('does not merge elasticsearch hosts if custom config overrides', async () => { @@ -213,6 +220,45 @@ Object { `); }); }); + + it('esNodeVersionCompatibility$ only starts polling when subscribed to', async done => { + const mockAdminClusterClientInstance = elasticsearchServiceMock.createClusterClient(); + const mockDataClusterClientInstance = elasticsearchServiceMock.createClusterClient(); + MockClusterClient.mockImplementationOnce( + () => mockAdminClusterClientInstance + ).mockImplementationOnce(() => mockDataClusterClientInstance); + + mockAdminClusterClientInstance.callAsInternalUser.mockRejectedValue(new Error()); + + const setupContract = await elasticsearchService.setup(deps); + await new Promise(resolve => setTimeout(resolve, 10)); + + expect(mockAdminClusterClientInstance.callAsInternalUser).toHaveBeenCalledTimes(0); + setupContract.esNodesCompatibility$.subscribe(() => { + expect(mockAdminClusterClientInstance.callAsInternalUser).toHaveBeenCalledTimes(1); + done(); + }); + }); + + it('esNodeVersionCompatibility$ stops polling when unsubscribed from', async done => { + const mockAdminClusterClientInstance = elasticsearchServiceMock.createClusterClient(); + const mockDataClusterClientInstance = elasticsearchServiceMock.createClusterClient(); + MockClusterClient.mockImplementationOnce( + () => mockAdminClusterClientInstance + ).mockImplementationOnce(() => mockDataClusterClientInstance); + + mockAdminClusterClientInstance.callAsInternalUser.mockRejectedValue(new Error()); + + const setupContract = await elasticsearchService.setup(deps); + + expect(mockAdminClusterClientInstance.callAsInternalUser).toHaveBeenCalledTimes(0); + const sub = setupContract.esNodesCompatibility$.subscribe(async () => { + sub.unsubscribe(); + await new Promise(resolve => setTimeout(resolve, 100)); + expect(mockAdminClusterClientInstance.callAsInternalUser).toHaveBeenCalledTimes(1); + done(); + }); + }); }); describe('#stop', () => { @@ -229,4 +275,26 @@ describe('#stop', () => { expect(mockAdminClusterClientInstance.close).toHaveBeenCalledTimes(1); expect(mockDataClusterClientInstance.close).toHaveBeenCalledTimes(1); }); + + it('stops pollEsNodeVersions even if there are active subscriptions', async () => { + const mockAdminClusterClientInstance = elasticsearchServiceMock.createClusterClient(); + const mockDataClusterClientInstance = elasticsearchServiceMock.createClusterClient(); + // @ts-ignore + mockDataClusterClientInstance.close = mockAdminClusterClientInstance.close = jest.fn(); + MockClusterClient.mockImplementationOnce( + () => mockAdminClusterClientInstance + ).mockImplementationOnce(() => mockDataClusterClientInstance); + + mockAdminClusterClientInstance.callAsInternalUser.mockRejectedValue(new Error()); + + const setupContract = await elasticsearchService.setup(deps); + + setupContract.esNodesCompatibility$.subscribe(() => {}); + await new Promise(resolve => setTimeout(resolve, 10)); + expect(mockAdminClusterClientInstance.callAsInternalUser).toHaveBeenCalledTimes(1); + + await elasticsearchService.stop(); + await new Promise(resolve => setTimeout(resolve, 100)); + expect(mockAdminClusterClientInstance.callAsInternalUser).toHaveBeenCalledTimes(1); + }); }); diff --git a/src/core/server/elasticsearch/elasticsearch_service.ts b/src/core/server/elasticsearch/elasticsearch_service.ts index 2a7f59be4cb5e..6616b42f136c0 100644 --- a/src/core/server/elasticsearch/elasticsearch_service.ts +++ b/src/core/server/elasticsearch/elasticsearch_service.ts @@ -17,8 +17,17 @@ * under the License. */ -import { ConnectableObservable, Observable, Subscription } from 'rxjs'; -import { filter, first, map, publishReplay, switchMap, take, shareReplay } from 'rxjs/operators'; +import { ConnectableObservable, Observable, Subscription, Subject } from 'rxjs'; +import { + filter, + first, + map, + publishReplay, + switchMap, + take, + shareReplay, + takeUntil, +} from 'rxjs/operators'; import { CoreService } from '../../types'; import { merge } from '../../utils'; @@ -47,13 +56,8 @@ interface SetupDeps { export class ElasticsearchService implements CoreService { private readonly log: Logger; private readonly config$: Observable; - private subscriptions: { - client?: Subscription; - esNodesCompatibility?: Subscription; - } = { - client: undefined, - esNodesCompatibility: undefined, - }; + private subscription: Subscription | undefined; + private stop$ = new Subject(); private kibanaVersion: string; constructor(private readonly coreContext: CoreContext) { @@ -69,7 +73,7 @@ export class ElasticsearchService implements CoreService { - if (this.subscriptions.client !== undefined) { + if (this.subscription !== undefined) { this.log.error('Clients cannot be changed after they are created'); return false; } @@ -100,7 +104,7 @@ export class ElasticsearchService implements CoreService; - this.subscriptions.client = clients$.connect(); + this.subscription = clients$.connect(); const config = await this.config$.pipe(first()).toPromise(); @@ -164,7 +168,7 @@ export class ElasticsearchService implements CoreService clients.config)) }, @@ -184,12 +188,10 @@ export class ElasticsearchService implements CoreService Date: Fri, 21 Feb 2020 21:22:27 +0100 Subject: [PATCH 3/5] Cleanup unused code --- .../server/elasticsearch/elasticsearch_service.test.ts | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/src/core/server/elasticsearch/elasticsearch_service.test.ts b/src/core/server/elasticsearch/elasticsearch_service.test.ts index 1696e84e46582..4a5d7272b9707 100644 --- a/src/core/server/elasticsearch/elasticsearch_service.test.ts +++ b/src/core/server/elasticsearch/elasticsearch_service.test.ts @@ -21,7 +21,7 @@ import { first } from 'rxjs/operators'; import { MockClusterClient } from './elasticsearch_service.test.mocks'; -import { BehaviorSubject, Subject } from 'rxjs'; +import { BehaviorSubject } from 'rxjs'; import { Env } from '../config'; import { getEnvOptions } from '../config/__mocks__/env'; import { CoreContext } from '../core_context'; @@ -32,13 +32,6 @@ import { ElasticsearchConfig } from './elasticsearch_config'; import { ElasticsearchService } from './elasticsearch_service'; import { elasticsearchServiceMock } from './elasticsearch_service.mock'; import { duration } from 'moment'; -import { TestScheduler } from 'rxjs/testing'; -import { pollEsNodesVersion } from './version_check/ensure_es_version'; - -const getTestScheduler = () => - new TestScheduler((actual, expected) => { - expect(actual).toEqual(expected); - }); let elasticsearchService: ElasticsearchService; const configService = configServiceMock.create(); From f1f4868029e8dff083789da5426a750c530f3eb0 Mon Sep 17 00:00:00 2001 From: Rudolf Meijering Date: Tue, 25 Feb 2020 14:36:22 +0100 Subject: [PATCH 4/5] PR Feedback --- .../elasticsearch_service.test.ts | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/src/core/server/elasticsearch/elasticsearch_service.test.ts b/src/core/server/elasticsearch/elasticsearch_service.test.ts index 4a5d7272b9707..37f92561b1642 100644 --- a/src/core/server/elasticsearch/elasticsearch_service.test.ts +++ b/src/core/server/elasticsearch/elasticsearch_service.test.ts @@ -33,6 +33,9 @@ import { ElasticsearchService } from './elasticsearch_service'; import { elasticsearchServiceMock } from './elasticsearch_service.mock'; import { duration } from 'moment'; +const delay = async (durationMs: number) => + await new Promise(resolve => setTimeout(resolve, durationMs)); + let elasticsearchService: ElasticsearchService; const configService = configServiceMock.create(); const deps = { @@ -224,7 +227,7 @@ describe('#setup', () => { mockAdminClusterClientInstance.callAsInternalUser.mockRejectedValue(new Error()); const setupContract = await elasticsearchService.setup(deps); - await new Promise(resolve => setTimeout(resolve, 10)); + await delay(10); expect(mockAdminClusterClientInstance.callAsInternalUser).toHaveBeenCalledTimes(0); setupContract.esNodesCompatibility$.subscribe(() => { @@ -247,7 +250,7 @@ describe('#setup', () => { expect(mockAdminClusterClientInstance.callAsInternalUser).toHaveBeenCalledTimes(0); const sub = setupContract.esNodesCompatibility$.subscribe(async () => { sub.unsubscribe(); - await new Promise(resolve => setTimeout(resolve, 100)); + await delay(100); expect(mockAdminClusterClientInstance.callAsInternalUser).toHaveBeenCalledTimes(1); done(); }); @@ -270,10 +273,9 @@ describe('#stop', () => { }); it('stops pollEsNodeVersions even if there are active subscriptions', async () => { - const mockAdminClusterClientInstance = elasticsearchServiceMock.createClusterClient(); - const mockDataClusterClientInstance = elasticsearchServiceMock.createClusterClient(); - // @ts-ignore - mockDataClusterClientInstance.close = mockAdminClusterClientInstance.close = jest.fn(); + const mockAdminClusterClientInstance = elasticsearchServiceMock.createCustomClusterClient(); + const mockDataClusterClientInstance = elasticsearchServiceMock.createCustomClusterClient(); + MockClusterClient.mockImplementationOnce( () => mockAdminClusterClientInstance ).mockImplementationOnce(() => mockDataClusterClientInstance); @@ -283,11 +285,11 @@ describe('#stop', () => { const setupContract = await elasticsearchService.setup(deps); setupContract.esNodesCompatibility$.subscribe(() => {}); - await new Promise(resolve => setTimeout(resolve, 10)); + await delay(10); expect(mockAdminClusterClientInstance.callAsInternalUser).toHaveBeenCalledTimes(1); await elasticsearchService.stop(); - await new Promise(resolve => setTimeout(resolve, 100)); + await delay(100); expect(mockAdminClusterClientInstance.callAsInternalUser).toHaveBeenCalledTimes(1); }); }); From 5daa81060d13cc56a4979dde4225f22854c8c141 Mon Sep 17 00:00:00 2001 From: Rudolf Meijering Date: Thu, 27 Feb 2020 15:45:01 +0100 Subject: [PATCH 5/5] Make test more stable --- .../elasticsearch/elasticsearch_service.test.ts | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/src/core/server/elasticsearch/elasticsearch_service.test.ts b/src/core/server/elasticsearch/elasticsearch_service.test.ts index 37f92561b1642..2667859f303d4 100644 --- a/src/core/server/elasticsearch/elasticsearch_service.test.ts +++ b/src/core/server/elasticsearch/elasticsearch_service.test.ts @@ -272,7 +272,8 @@ describe('#stop', () => { expect(mockDataClusterClientInstance.close).toHaveBeenCalledTimes(1); }); - it('stops pollEsNodeVersions even if there are active subscriptions', async () => { + it('stops pollEsNodeVersions even if there are active subscriptions', async done => { + expect.assertions(2); const mockAdminClusterClientInstance = elasticsearchServiceMock.createCustomClusterClient(); const mockDataClusterClientInstance = elasticsearchServiceMock.createCustomClusterClient(); @@ -284,12 +285,13 @@ describe('#stop', () => { const setupContract = await elasticsearchService.setup(deps); - setupContract.esNodesCompatibility$.subscribe(() => {}); - await delay(10); - expect(mockAdminClusterClientInstance.callAsInternalUser).toHaveBeenCalledTimes(1); + setupContract.esNodesCompatibility$.subscribe(async () => { + expect(mockAdminClusterClientInstance.callAsInternalUser).toHaveBeenCalledTimes(1); - await elasticsearchService.stop(); - await delay(100); - expect(mockAdminClusterClientInstance.callAsInternalUser).toHaveBeenCalledTimes(1); + await elasticsearchService.stop(); + await delay(100); + expect(mockAdminClusterClientInstance.callAsInternalUser).toHaveBeenCalledTimes(1); + done(); + }); }); });