diff --git a/src/core/packages/elasticsearch/server-internal/src/version_check/ensure_es_version.test.ts b/src/core/packages/elasticsearch/server-internal/src/version_check/ensure_es_version.test.ts index 4f7101ca9b61a..75e4cf5a3c337 100644 --- a/src/core/packages/elasticsearch/server-internal/src/version_check/ensure_es_version.test.ts +++ b/src/core/packages/elasticsearch/server-internal/src/version_check/ensure_es_version.test.ts @@ -11,7 +11,7 @@ import type { NodesInfo } from './ensure_es_version'; import { mapNodesVersionCompatibility, pollEsNodesVersion } from './ensure_es_version'; import { loggingSystemMock } from '@kbn/core-logging-server-mocks'; import { elasticsearchClientMock } from '@kbn/core-elasticsearch-client-server-mocks'; -import { take, of, delay } from 'rxjs'; +import { take, of, delay, takeWhile } from 'rxjs'; import { TestScheduler } from 'rxjs/testing'; const mockLoggerFactory = loggingSystemMock.create(); @@ -145,7 +145,7 @@ describe('pollEsNodesVersion', () => { }; it('returns isCompatible=false and keeps polling when nodes.info requests fail', (done) => { - expect.assertions(3); + expect.assertions(4); const expectedCompatibilityResults = [false, false, true]; jest.clearAllMocks(); @@ -153,6 +153,7 @@ describe('pollEsNodesVersion', () => { nodeInfosSuccessOnce(createNodes('5.1.0', '5.2.0', '5.0.0')); // emit not compatible // poll cycle 2 nodeInfosErrorOnce('mock request error'); // error + nodeInfosErrorOnce('mock request error'); // retry error nodeInfosErrorOnce('mock request error'); // retry error, emit error // poll cycle 3 nodeInfosSuccessOnce(createNodes('5.1.0', '5.2.0', '5.1.1-Beta1')); // emit compatible @@ -163,28 +164,35 @@ describe('pollEsNodesVersion', () => { ignoreVersionMismatch: false, kibanaVersion: KIBANA_VERSION, log: mockLogger, - healthCheckRetry: 1, + healthCheckRetry: 2, }) .pipe(take(3)) .subscribe({ next: (result) => { expect(result.isCompatible).toBe(expectedCompatibilityResults.shift()); }, - complete: done, + complete: () => { + expect(internalClient.nodes.info).toHaveBeenCalledTimes(5); + done(); + }, error: done, }); }); it('returns the error from a failed nodes.info poll attempt when all the retries are exhausted', (done) => { - expect.assertions(2); + expect.assertions(3); const expectedCompatibilityResults = [false]; const expectedMessageResults = [ 'Unable to retrieve version information from Elasticsearch nodes. mock request error', ]; jest.clearAllMocks(); - nodeInfosErrorOnce('mock request error'); // initial - nodeInfosErrorOnce('mock request error'); // retry emit + nodeInfosErrorOnce('mock request error'); // first failure + + for (let i = 0; i < 10; i++) { + // 10 retries + nodeInfosErrorOnce('mock request error'); + } pollEsNodesVersion({ internalClient, @@ -192,7 +200,7 @@ describe('pollEsNodesVersion', () => { ignoreVersionMismatch: false, kibanaVersion: KIBANA_VERSION, log: mockLogger, - healthCheckRetry: 1, + healthCheckRetry: 10, }) .pipe(take(1)) .subscribe({ @@ -200,13 +208,16 @@ describe('pollEsNodesVersion', () => { expect(result.isCompatible).toBe(expectedCompatibilityResults.shift()); expect(result.message).toBe(expectedMessageResults.shift()); }, - complete: done, + complete: () => { + expect(internalClient.nodes.info).toHaveBeenCalledTimes(11); + done(); + }, error: done, }); }); it('only emits if the error from a failed nodes.info call changed from the previous poll', (done) => { - expect.assertions(4); + expect.assertions(5); const expectedCompatibilityResults = [false, false]; const expectedMessageResults = [ 'Unable to retrieve version information from Elasticsearch nodes. mock request error', @@ -237,18 +248,21 @@ describe('pollEsNodesVersion', () => { expect(result.message).toBe(expectedMessageResults.shift()); expect(result.isCompatible).toBe(expectedCompatibilityResults.shift()); }, - complete: done, + complete: () => { + expect(internalClient.nodes.info).toHaveBeenCalledTimes(6); + done(); + }, error: done, }); }); it('returns isCompatible=false and keeps polling when requests fail, only emitting again if the error message has changed', (done) => { - expect.assertions(8); + expect.assertions(9); const expectedCompatibilityResults = [false, false, true, false]; const expectedMessageResults = [ 'This version of Kibana (v5.1.0) is incompatible with the following Elasticsearch nodes in your cluster: v5.0.0 @ http_address (ip)', 'Unable to retrieve version information from Elasticsearch nodes. mock request error', - "You're running Kibana 5.1.0 with some different versions of Elasticsearch. Update Kibana or Elasticsearch to the same version to prevent compatibility issues: v5.2.0 @ http_address (ip), v5.1.1-Beta1 @ http_address (ip)", + "You're running Kibana 5.1.0 with some different versions of Elasticsearch. Update Kibana or Elasticsearch to the same version to prevent compatibility issues: v5.1.1-Beta1 @ http_address (ip), v5.2.0 @ http_address (ip)", 'Unable to retrieve version information from Elasticsearch nodes. mock request error', ]; jest.clearAllMocks(); @@ -256,14 +270,17 @@ describe('pollEsNodesVersion', () => { nodeInfosSuccessOnce(createNodes('5.1.0', '5.2.0', '5.0.0')); // poll 1 emit nodeInfosErrorOnce('mock request error'); // poll 2 + nodeInfosErrorOnce('mock request error'); // retry attempt nodeInfosErrorOnce('mock request error'); // retry attempt, emit nodeInfosErrorOnce('mock request error'); // poll 3 + nodeInfosErrorOnce('mock request error'); // retry attempt nodeInfosErrorOnce('mock request error'); // retry doesn't emit same error as cycle 1 nodeInfosSuccessOnce(createNodes('5.1.0', '5.2.0', '5.1.1-Beta1')); // poll 4 emit nodeInfosErrorOnce('mock request error'); // poll 5 + nodeInfosErrorOnce('mock request error'); // retry attempt nodeInfosErrorOnce('mock request error'); // retry emit pollEsNodesVersion({ @@ -272,7 +289,7 @@ describe('pollEsNodesVersion', () => { ignoreVersionMismatch: false, kibanaVersion: KIBANA_VERSION, log: mockLogger, - healthCheckRetry: 1, + healthCheckRetry: 2, }) .pipe(take(4)) .subscribe({ @@ -280,13 +297,16 @@ describe('pollEsNodesVersion', () => { expect(result.isCompatible).toBe(expectedCompatibilityResults.shift()); expect(result.message).toBe(expectedMessageResults.shift()); }, - complete: done, + complete: () => { + expect(internalClient.nodes.info).toHaveBeenCalledTimes(11); + done(); + }, error: done, }); }); it('returns compatibility results', (done) => { - expect.assertions(1); + expect.assertions(2); const nodes = createNodes('5.1.0', '5.2.0', '5.0.0'); nodeInfosSuccessOnce(nodes); @@ -304,19 +324,24 @@ describe('pollEsNodesVersion', () => { next: (result) => { expect(result).toEqual(mapNodesVersionCompatibility(nodes, KIBANA_VERSION, false)); }, - complete: done, + complete: () => { + expect(internalClient.nodes.info).toHaveBeenCalledTimes(1); + done(); + }, error: done, }); }); it('only emits when node versions changed since the previous poll', (done) => { - expect.assertions(4); + // Test will cause 7 version polls before completing, but only 5 emissions + expect.assertions(5); nodeInfosSuccessOnce(createNodes('5.1.0', '5.2.0', '5.0.0')); // emit nodeInfosSuccessOnce(createNodes('5.0.0', '5.1.0', '5.2.0')); // ignore, same versions, different ordering nodeInfosSuccessOnce(createNodes('5.1.1', '5.2.0', '5.0.0')); // emit nodeInfosSuccessOnce(createNodes('5.1.1', '5.1.2', '5.1.3')); // emit nodeInfosSuccessOnce(createNodes('5.1.1', '5.1.2', '5.1.3')); // ignore nodeInfosSuccessOnce(createNodes('5.0.0', '5.1.0', '5.2.0')); // emit, different from previous version + nodeInfosSuccessOnce(createNodes('5.1.0', '5.1.0', '5.1.0')); // emit, no warning nodes, used to detect end of test pollEsNodesVersion({ internalClient, @@ -326,10 +351,14 @@ describe('pollEsNodesVersion', () => { log: mockLogger, healthCheckRetry: 1, }) - .pipe(take(4)) + .pipe(takeWhile((result) => !(result.warningNodes.length === 0), true)) .subscribe({ - next: (result) => expect(result).toBeDefined(), - complete: done, + next: (result) => { + expect(result.isCompatible).toBeDefined(); + }, + complete: () => { + done(); + }, error: done, }); }); diff --git a/src/core/packages/elasticsearch/server-internal/src/version_check/ensure_es_version.ts b/src/core/packages/elasticsearch/server-internal/src/version_check/ensure_es_version.ts index dc76f7b766f26..0af7b5c251a8e 100644 --- a/src/core/packages/elasticsearch/server-internal/src/version_check/ensure_es_version.ts +++ b/src/core/packages/elasticsearch/server-internal/src/version_check/ensure_es_version.ts @@ -16,7 +16,6 @@ import type { Observable } from 'rxjs'; import { interval, of, - from, BehaviorSubject, map, distinctUntilChanged, @@ -28,6 +27,7 @@ import { shareReplay, retry, timer, + defer, } from 'rxjs'; import type { Logger } from '@kbn/logging'; import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server'; @@ -101,10 +101,17 @@ export function mapNodesVersionCompatibility( nodesInfoRequestError: nodesInfoResponse.nodesInfoRequestError, }; } + + // Sort by version first, then by IP for stable ordering + const sortNodes = (a: NodeInfo, b: NodeInfo) => { + const versionCompare = a.version.localeCompare(b.version); + return versionCompare !== 0 ? versionCompare : a.ip.localeCompare(b.ip); + }; + const nodes = Object.keys(nodesInfoResponse.nodes) - .sort() // Sorting ensures a stable node ordering for comparison .map((key) => nodesInfoResponse.nodes[key]) - .map((node) => Object.assign({}, node, { name: getHumanizedNodeName(node) })); + .map((node) => Object.assign({}, node, { name: getHumanizedNodeName(node) })) + .sort(sortNodes); // Sorting ensures stable ordering for comparison // Aggregate incompatible ES nodes. const incompatibleNodes = nodes.filter( @@ -155,6 +162,7 @@ function compareNodesInfoErrorMessages( // Returns true if two NodesVersionCompatibility entries match function compareNodes(prev: NodesVersionCompatibility, curr: NodesVersionCompatibility) { const nodesEqual = (n: NodeInfo, m: NodeInfo) => n.ip === m.ip && n.version === m.version; + return ( curr.isCompatible === prev.isCompatible && curr.incompatibleNodes.length === prev.incompatibleNodes.length && @@ -195,16 +203,16 @@ export const pollEsNodesVersion = ({ switchMap((checkInterval) => interval(checkInterval)), startWith(0), exhaustMap(() => { - return from( - internalClient.nodes.info( + return defer(() => { + return internalClient.nodes.info( { node_id: '_all', metric: '_none', filter_path: ['nodes.*.version', 'nodes.*.http.publish_address', 'nodes.*.ip'], }, { requestTimeout: HEALTH_CHECK_REQUEST_TIMEOUT } - ) - ).pipe( + ); + }).pipe( retry({ count: healthCheckRetry, delay: (e) => {