Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import type { NodesInfo } from './ensure_es_version';
import { mapNodesVersionCompatibility, pollEsNodesVersion } from './ensure_es_version';
import { loggingSystemMock } from '@kbn/core-logging-server-mocks';
import { elasticsearchClientMock } from '@kbn/core-elasticsearch-client-server-mocks';
import { take, of, delay } from 'rxjs';
import { take, of, delay, takeWhile } from 'rxjs';
import { TestScheduler } from 'rxjs/testing';

const mockLoggerFactory = loggingSystemMock.create();
Expand Down Expand Up @@ -145,14 +145,15 @@ describe('pollEsNodesVersion', () => {
};

it('returns isCompatible=false and keeps polling when nodes.info requests fail', (done) => {
expect.assertions(3);
expect.assertions(4);
const expectedCompatibilityResults = [false, false, true];
jest.clearAllMocks();

// poll cycle 1
nodeInfosSuccessOnce(createNodes('5.1.0', '5.2.0', '5.0.0')); // emit not compatible
// poll cycle 2
nodeInfosErrorOnce('mock request error'); // error
nodeInfosErrorOnce('mock request error'); // retry error
nodeInfosErrorOnce('mock request error'); // retry error, emit error
// poll cycle 3
nodeInfosSuccessOnce(createNodes('5.1.0', '5.2.0', '5.1.1-Beta1')); // emit compatible
Expand All @@ -163,50 +164,60 @@ describe('pollEsNodesVersion', () => {
ignoreVersionMismatch: false,
kibanaVersion: KIBANA_VERSION,
log: mockLogger,
healthCheckRetry: 1,
healthCheckRetry: 2,
})
.pipe(take(3))
.subscribe({
next: (result) => {
expect(result.isCompatible).toBe(expectedCompatibilityResults.shift());
},
complete: done,
complete: () => {
expect(internalClient.nodes.info).toHaveBeenCalledTimes(5);
done();
},
error: done,
});
});

it('returns the error from a failed nodes.info poll attempt when all the retries are exhausted', (done) => {
expect.assertions(2);
expect.assertions(3);
const expectedCompatibilityResults = [false];
const expectedMessageResults = [
'Unable to retrieve version information from Elasticsearch nodes. mock request error',
];
jest.clearAllMocks();

nodeInfosErrorOnce('mock request error'); // initial
nodeInfosErrorOnce('mock request error'); // retry emit
nodeInfosErrorOnce('mock request error'); // first failure

for (let i = 0; i < 10; i++) {
// 10 retries
nodeInfosErrorOnce('mock request error');
}

pollEsNodesVersion({
internalClient,
healthCheckInterval: 1,
ignoreVersionMismatch: false,
kibanaVersion: KIBANA_VERSION,
log: mockLogger,
healthCheckRetry: 1,
healthCheckRetry: 10,
})
.pipe(take(1))
.subscribe({
next: (result) => {
expect(result.isCompatible).toBe(expectedCompatibilityResults.shift());
expect(result.message).toBe(expectedMessageResults.shift());
},
complete: done,
complete: () => {
expect(internalClient.nodes.info).toHaveBeenCalledTimes(11);
done();
},
error: done,
});
});

it('only emits if the error from a failed nodes.info call changed from the previous poll', (done) => {
expect.assertions(4);
expect.assertions(5);
const expectedCompatibilityResults = [false, false];
const expectedMessageResults = [
'Unable to retrieve version information from Elasticsearch nodes. mock request error',
Expand Down Expand Up @@ -237,33 +248,39 @@ describe('pollEsNodesVersion', () => {
expect(result.message).toBe(expectedMessageResults.shift());
expect(result.isCompatible).toBe(expectedCompatibilityResults.shift());
},
complete: done,
complete: () => {
expect(internalClient.nodes.info).toHaveBeenCalledTimes(6);
done();
},
error: done,
});
});

it('returns isCompatible=false and keeps polling when requests fail, only emitting again if the error message has changed', (done) => {
expect.assertions(8);
expect.assertions(9);
const expectedCompatibilityResults = [false, false, true, false];
const expectedMessageResults = [
'This version of Kibana (v5.1.0) is incompatible with the following Elasticsearch nodes in your cluster: v5.0.0 @ http_address (ip)',
'Unable to retrieve version information from Elasticsearch nodes. mock request error',
"You're running Kibana 5.1.0 with some different versions of Elasticsearch. Update Kibana or Elasticsearch to the same version to prevent compatibility issues: v5.2.0 @ http_address (ip), v5.1.1-Beta1 @ http_address (ip)",
"You're running Kibana 5.1.0 with some different versions of Elasticsearch. Update Kibana or Elasticsearch to the same version to prevent compatibility issues: v5.1.1-Beta1 @ http_address (ip), v5.2.0 @ http_address (ip)",
'Unable to retrieve version information from Elasticsearch nodes. mock request error',
];
jest.clearAllMocks();

nodeInfosSuccessOnce(createNodes('5.1.0', '5.2.0', '5.0.0')); // poll 1 emit

nodeInfosErrorOnce('mock request error'); // poll 2
nodeInfosErrorOnce('mock request error'); // retry attempt
nodeInfosErrorOnce('mock request error'); // retry attempt, emit

nodeInfosErrorOnce('mock request error'); // poll 3
nodeInfosErrorOnce('mock request error'); // retry attempt
nodeInfosErrorOnce('mock request error'); // retry doesn't emit same error as cycle 1

nodeInfosSuccessOnce(createNodes('5.1.0', '5.2.0', '5.1.1-Beta1')); // poll 4 emit

nodeInfosErrorOnce('mock request error'); // poll 5
nodeInfosErrorOnce('mock request error'); // retry attempt
nodeInfosErrorOnce('mock request error'); // retry emit

pollEsNodesVersion({
Expand All @@ -272,21 +289,24 @@ describe('pollEsNodesVersion', () => {
ignoreVersionMismatch: false,
kibanaVersion: KIBANA_VERSION,
log: mockLogger,
healthCheckRetry: 1,
healthCheckRetry: 2,
})
.pipe(take(4))
.subscribe({
next: (result) => {
expect(result.isCompatible).toBe(expectedCompatibilityResults.shift());
expect(result.message).toBe(expectedMessageResults.shift());
},
complete: done,
complete: () => {
expect(internalClient.nodes.info).toHaveBeenCalledTimes(11);
done();
},
error: done,
});
});

it('returns compatibility results', (done) => {
expect.assertions(1);
expect.assertions(2);
const nodes = createNodes('5.1.0', '5.2.0', '5.0.0');

nodeInfosSuccessOnce(nodes);
Expand All @@ -304,19 +324,24 @@ describe('pollEsNodesVersion', () => {
next: (result) => {
expect(result).toEqual(mapNodesVersionCompatibility(nodes, KIBANA_VERSION, false));
},
complete: done,
complete: () => {
expect(internalClient.nodes.info).toHaveBeenCalledTimes(1);
done();
},
error: done,
});
});

it('only emits when node versions changed since the previous poll', (done) => {
expect.assertions(4);
// Test will cause 7 version polls before completing, but only 5 emissions
expect.assertions(5);
nodeInfosSuccessOnce(createNodes('5.1.0', '5.2.0', '5.0.0')); // emit
nodeInfosSuccessOnce(createNodes('5.0.0', '5.1.0', '5.2.0')); // ignore, same versions, different ordering
nodeInfosSuccessOnce(createNodes('5.1.1', '5.2.0', '5.0.0')); // emit
nodeInfosSuccessOnce(createNodes('5.1.1', '5.1.2', '5.1.3')); // emit
nodeInfosSuccessOnce(createNodes('5.1.1', '5.1.2', '5.1.3')); // ignore
nodeInfosSuccessOnce(createNodes('5.0.0', '5.1.0', '5.2.0')); // emit, different from previous version
nodeInfosSuccessOnce(createNodes('5.1.0', '5.1.0', '5.1.0')); // emit, no warning nodes, used to detect end of test

pollEsNodesVersion({
internalClient,
Expand All @@ -326,10 +351,14 @@ describe('pollEsNodesVersion', () => {
log: mockLogger,
healthCheckRetry: 1,
})
.pipe(take(4))
.pipe(takeWhile((result) => !(result.warningNodes.length === 0), true))
.subscribe({
next: (result) => expect(result).toBeDefined(),
complete: done,
next: (result) => {
expect(result.isCompatible).toBeDefined();
},
complete: () => {
done();
},
error: done,
});
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import type { Observable } from 'rxjs';
import {
interval,
of,
from,
BehaviorSubject,
map,
distinctUntilChanged,
Expand All @@ -28,6 +27,7 @@ import {
shareReplay,
retry,
timer,
defer,
} from 'rxjs';
import type { Logger } from '@kbn/logging';
import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
Expand Down Expand Up @@ -101,10 +101,17 @@ export function mapNodesVersionCompatibility(
nodesInfoRequestError: nodesInfoResponse.nodesInfoRequestError,
};
}

// Sort by version first, then by IP for stable ordering
const sortNodes = (a: NodeInfo, b: NodeInfo) => {
const versionCompare = a.version.localeCompare(b.version);
return versionCompare !== 0 ? versionCompare : a.ip.localeCompare(b.ip);
};

const nodes = Object.keys(nodesInfoResponse.nodes)
.sort() // Sorting ensures a stable node ordering for comparison
.map((key) => nodesInfoResponse.nodes[key])
.map((node) => Object.assign({}, node, { name: getHumanizedNodeName(node) }));
.map((node) => Object.assign({}, node, { name: getHumanizedNodeName(node) }))
.sort(sortNodes); // Sorting ensures stable ordering for comparison

// Aggregate incompatible ES nodes.
const incompatibleNodes = nodes.filter(
Expand Down Expand Up @@ -155,6 +162,7 @@ function compareNodesInfoErrorMessages(
// Returns true if two NodesVersionCompatibility entries match
function compareNodes(prev: NodesVersionCompatibility, curr: NodesVersionCompatibility) {
const nodesEqual = (n: NodeInfo, m: NodeInfo) => n.ip === m.ip && n.version === m.version;

return (
curr.isCompatible === prev.isCompatible &&
curr.incompatibleNodes.length === prev.incompatibleNodes.length &&
Expand Down Expand Up @@ -195,16 +203,16 @@ export const pollEsNodesVersion = ({
switchMap((checkInterval) => interval(checkInterval)),
startWith(0),
exhaustMap(() => {
return from(
internalClient.nodes.info(
return defer(() => {
return internalClient.nodes.info(
{
node_id: '_all',
metric: '_none',
filter_path: ['nodes.*.version', 'nodes.*.http.publish_address', 'nodes.*.ip'],
},
{ requestTimeout: HEALTH_CHECK_REQUEST_TIMEOUT }
)
).pipe(
);
}).pipe(
retry({
count: healthCheckRetry,
delay: (e) => {
Expand Down