From 6c7f5d7da2b756a95ad1252a7d92635a130a85a6 Mon Sep 17 00:00:00 2001 From: Jean-Louis Leysens Date: Wed, 26 Feb 2020 15:48:07 +0100 Subject: [PATCH 01/17] Added server side logic for handling batch reindex --- .../server/lib/es_version_precheck.ts | 4 +- .../server/routes/reindex_indices/index.ts | 7 + .../routes/reindex_indices/reindex_handler.ts | 63 ++++++++ .../reindex_indices.test.ts | 103 ++++++++++++- .../{ => reindex_indices}/reindex_indices.ts | 140 ++++++++++++------ 5 files changed, 265 insertions(+), 52 deletions(-) create mode 100644 x-pack/plugins/upgrade_assistant/server/routes/reindex_indices/index.ts create mode 100644 x-pack/plugins/upgrade_assistant/server/routes/reindex_indices/reindex_handler.ts rename x-pack/plugins/upgrade_assistant/server/routes/{ => reindex_indices}/reindex_indices.test.ts (73%) rename x-pack/plugins/upgrade_assistant/server/routes/{ => reindex_indices}/reindex_indices.ts (59%) diff --git a/x-pack/plugins/upgrade_assistant/server/lib/es_version_precheck.ts b/x-pack/plugins/upgrade_assistant/server/lib/es_version_precheck.ts index e7636eea66479..6182a82f6f1bd 100644 --- a/x-pack/plugins/upgrade_assistant/server/lib/es_version_precheck.ts +++ b/x-pack/plugins/upgrade_assistant/server/lib/es_version_precheck.ts @@ -90,9 +90,9 @@ export const esVersionCheck = async ( } }; -export const versionCheckHandlerWrapper = (handler: RequestHandler) => async ( +export const versionCheckHandlerWrapper = (handler: RequestHandler) => async ( ctx: RequestHandlerContext, - request: KibanaRequest, + request: KibanaRequest, response: KibanaResponseFactory ) => { const errorResponse = await esVersionCheck(ctx, response); diff --git a/x-pack/plugins/upgrade_assistant/server/routes/reindex_indices/index.ts b/x-pack/plugins/upgrade_assistant/server/routes/reindex_indices/index.ts new file mode 100644 index 0000000000000..9f1d3e4021c3f --- /dev/null +++ b/x-pack/plugins/upgrade_assistant/server/routes/reindex_indices/index.ts @@ -0,0 +1,7 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +export { createReindexWorker, registerReindexIndicesRoutes } from './reindex_indices'; diff --git a/x-pack/plugins/upgrade_assistant/server/routes/reindex_indices/reindex_handler.ts b/x-pack/plugins/upgrade_assistant/server/routes/reindex_indices/reindex_handler.ts new file mode 100644 index 0000000000000..6452358fd0df9 --- /dev/null +++ b/x-pack/plugins/upgrade_assistant/server/routes/reindex_indices/reindex_handler.ts @@ -0,0 +1,63 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import { IScopedClusterClient, Logger, SavedObjectsClientContract } from 'kibana/server'; + +import { LicensingPluginSetup } from '../../../../licensing/server'; + +import { ReindexStatus } from '../../../common/types'; + +import { reindexActionsFactory } from '../../lib/reindexing/reindex_actions'; +import { reindexServiceFactory, ReindexWorker } from '../../lib/reindexing'; +import { CredentialStore } from '../../lib/reindexing/credential_store'; + +export const SYMBOL_FORBIDDEN = Symbol('Forbidden'); + +interface ReindexHandlerArgs { + savedObjects: SavedObjectsClientContract; + dataClient: IScopedClusterClient; + indexName: string; + log: Logger; + licensing: LicensingPluginSetup; + headers: Record; + credentialStore: CredentialStore; + getWorker: () => ReindexWorker; +} + +export const reindexHandler = async ({ + credentialStore, + dataClient, + headers, + indexName, + licensing, + log, + savedObjects, + getWorker, +}: ReindexHandlerArgs) => { + const callAsCurrentUser = dataClient.callAsCurrentUser.bind(dataClient); + const reindexActions = reindexActionsFactory(savedObjects, callAsCurrentUser); + const reindexService = reindexServiceFactory(callAsCurrentUser, reindexActions, log, licensing); + + if (!(await reindexService.hasRequiredPrivileges(indexName))) { + throw SYMBOL_FORBIDDEN; + } + + const existingOp = await reindexService.findReindexOperation(indexName); + + // If the reindexOp already exists and it's paused, resume it. Otherwise create a new one. + const reindexOp = + existingOp && existingOp.attributes.status === ReindexStatus.paused + ? await reindexService.resumeReindexOperation(indexName) + : await reindexService.createReindexOperation(indexName); + + // Add users credentials for the worker to use + credentialStore.set(reindexOp, headers); + + // Kick the worker on this node to immediately pickup the new reindex operation. + getWorker().forceRefresh(); + + return reindexOp.attributes; +}; diff --git a/x-pack/plugins/upgrade_assistant/server/routes/reindex_indices.test.ts b/x-pack/plugins/upgrade_assistant/server/routes/reindex_indices/reindex_indices.test.ts similarity index 73% rename from x-pack/plugins/upgrade_assistant/server/routes/reindex_indices.test.ts rename to x-pack/plugins/upgrade_assistant/server/routes/reindex_indices/reindex_indices.test.ts index 695bb6304cfdf..64132e3bb11a1 100644 --- a/x-pack/plugins/upgrade_assistant/server/routes/reindex_indices.test.ts +++ b/x-pack/plugins/upgrade_assistant/server/routes/reindex_indices/reindex_indices.test.ts @@ -5,9 +5,9 @@ */ import { kibanaResponseFactory } from 'src/core/server'; -import { licensingMock } from '../../../licensing/server/mocks'; -import { createMockRouter, MockRouter, routeHandlerContextMock } from './__mocks__/routes.mock'; -import { createRequestMock } from './__mocks__/request.mock'; +import { licensingMock } from '../../../../licensing/server/mocks'; +import { createMockRouter, MockRouter, routeHandlerContextMock } from '../__mocks__/routes.mock'; +import { createRequestMock } from '../__mocks__/request.mock'; const mockReindexService = { hasRequiredPrivileges: jest.fn(), @@ -21,18 +21,23 @@ const mockReindexService = { cancelReindexing: jest.fn(), }; -jest.mock('../lib/es_version_precheck', () => ({ +jest.mock('../../lib/es_version_precheck', () => ({ versionCheckHandlerWrapper: (a: any) => a, })); -jest.mock('../lib/reindexing', () => { +jest.mock('../../lib/reindexing', () => { return { reindexServiceFactory: () => mockReindexService, }; }); -import { IndexGroup, ReindexSavedObject, ReindexStatus, ReindexWarning } from '../../common/types'; -import { credentialStoreFactory } from '../lib/reindexing/credential_store'; +import { + IndexGroup, + ReindexSavedObject, + ReindexStatus, + ReindexWarning, +} from '../../../common/types'; +import { credentialStoreFactory } from '../../lib/reindexing/credential_store'; import { registerReindexIndicesRoutes } from './reindex_indices'; /** @@ -76,7 +81,7 @@ describe('reindex API', () => { }); afterEach(() => { - jest.resetAllMocks(); + jest.clearAllMocks(); }); describe('GET /api/upgrade_assistant/reindex/{indexName}', () => { @@ -255,6 +260,88 @@ describe('reindex API', () => { }); }); + describe('POST /api/upgrade_assistant/reindex/batch', () => { + it('creates a collection of index operations', async () => { + mockReindexService.createReindexOperation + .mockResolvedValueOnce({ + attributes: { indexName: 'theIndex1' }, + }) + .mockResolvedValueOnce({ + attributes: { indexName: 'theIndex2' }, + }) + .mockResolvedValueOnce({ + attributes: { indexName: 'theIndex3' }, + }); + + const resp = await routeDependencies.router.getHandler({ + method: 'post', + pathPattern: '/api/upgrade_assistant/reindex/batch', + })( + routeHandlerContextMock, + createRequestMock({ body: { indexNames: ['theIndex1', 'theIndex2', 'theIndex3'] } }), + kibanaResponseFactory + ); + + // It called create correctly + expect(mockReindexService.createReindexOperation).toHaveBeenNthCalledWith(1, 'theIndex1'); + expect(mockReindexService.createReindexOperation).toHaveBeenNthCalledWith(2, 'theIndex2'); + expect(mockReindexService.createReindexOperation).toHaveBeenNthCalledWith(3, 'theIndex3'); + + // It returned the right results + expect(resp.status).toEqual(200); + const data = resp.payload; + expect(data).toEqual({ + errors: [], + successes: [ + { indexName: 'theIndex1' }, + { indexName: 'theIndex2' }, + { indexName: 'theIndex3' }, + ], + }); + }); + + it('gracefully handles partial successes', async () => { + mockReindexService.createReindexOperation + .mockResolvedValueOnce({ + attributes: { indexName: 'theIndex1' }, + }) + .mockRejectedValueOnce(new Error('oops!')); + + mockReindexService.hasRequiredPrivileges + .mockResolvedValueOnce(true) + .mockResolvedValueOnce(false) + .mockResolvedValueOnce(true); + + const resp = await routeDependencies.router.getHandler({ + method: 'post', + pathPattern: '/api/upgrade_assistant/reindex/batch', + })( + routeHandlerContextMock, + createRequestMock({ body: { indexNames: ['theIndex1', 'theIndex2', 'theIndex3'] } }), + kibanaResponseFactory + ); + + // It called create correctly + expect(mockReindexService.createReindexOperation).toHaveBeenCalledTimes(2); + expect(mockReindexService.createReindexOperation).toHaveBeenNthCalledWith(1, 'theIndex1'); + expect(mockReindexService.createReindexOperation).toHaveBeenNthCalledWith(2, 'theIndex3'); + + // It returned the right results + expect(resp.status).toEqual(200); + const data = resp.payload; + expect(data).toEqual({ + errors: [ + { + indexName: 'theIndex2', + message: 'You do not have adequate privileges to reindex "theIndex2".', + }, + { indexName: 'theIndex3', message: 'oops!' }, + ], + successes: [{ indexName: 'theIndex1' }], + }); + }); + }); + describe('POST /api/upgrade_assistant/reindex/{indexName}/cancel', () => { it('returns a 501', async () => { mockReindexService.cancelReindexing.mockResolvedValueOnce({}); diff --git a/x-pack/plugins/upgrade_assistant/server/routes/reindex_indices.ts b/x-pack/plugins/upgrade_assistant/server/routes/reindex_indices/reindex_indices.ts similarity index 59% rename from x-pack/plugins/upgrade_assistant/server/routes/reindex_indices.ts rename to x-pack/plugins/upgrade_assistant/server/routes/reindex_indices/reindex_indices.ts index a910145474061..286d893464005 100644 --- a/x-pack/plugins/upgrade_assistant/server/routes/reindex_indices.ts +++ b/x-pack/plugins/upgrade_assistant/server/routes/reindex_indices/reindex_indices.ts @@ -3,16 +3,21 @@ * or more contributor license agreements. Licensed under the Elastic License; * you may not use this file except in compliance with the Elastic License. */ - +import { i18n } from '@kbn/i18n'; import { schema } from '@kbn/config-schema'; -import { Logger, ElasticsearchServiceSetup, SavedObjectsClient } from 'src/core/server'; -import { ReindexStatus } from '../../common/types'; -import { versionCheckHandlerWrapper } from '../lib/es_version_precheck'; -import { reindexServiceFactory, ReindexWorker } from '../lib/reindexing'; -import { CredentialStore } from '../lib/reindexing/credential_store'; -import { reindexActionsFactory } from '../lib/reindexing/reindex_actions'; -import { RouteDependencies } from '../types'; -import { LicensingPluginSetup } from '../../../licensing/server'; +import { Logger, ElasticsearchServiceSetup, SavedObjectsClient } from 'kibana/server'; + +import { LicensingPluginSetup } from '../../../../licensing/server'; + +import { ReindexOperation } from '../../../common/types'; + +import { versionCheckHandlerWrapper } from '../../lib/es_version_precheck'; +import { reindexServiceFactory, ReindexWorker } from '../../lib/reindexing'; +import { CredentialStore } from '../../lib/reindexing/credential_store'; +import { reindexActionsFactory } from '../../lib/reindexing/reindex_actions'; +import { RouteDependencies } from '../../types'; + +import { reindexHandler, SYMBOL_FORBIDDEN } from './reindex_handler'; interface CreateReindexWorker { logger: Logger; @@ -53,49 +58,100 @@ export function registerReindexIndicesRoutes( async ( { core: { - savedObjects, + savedObjects: { client: savedObjectsClient }, elasticsearch: { dataClient }, }, }, request, response ) => { - const { indexName } = request.params as any; - const { client } = savedObjects; - const callAsCurrentUser = dataClient.callAsCurrentUser.bind(dataClient); - const reindexActions = reindexActionsFactory(client, callAsCurrentUser); - const reindexService = reindexServiceFactory( - callAsCurrentUser, - reindexActions, - log, - licensing - ); - + const { indexName } = request.params; try { - if (!(await reindexService.hasRequiredPrivileges(indexName))) { + return response.ok({ + body: await reindexHandler({ + savedObjects: savedObjectsClient, + dataClient, + indexName, + log, + licensing, + headers: request.headers, + credentialStore, + getWorker, + }), + }); + } catch (e) { + if (e === SYMBOL_FORBIDDEN) { return response.forbidden({ - body: `You do not have adequate privileges to reindex this index.`, + body: i18n.translate('xpack.upgradeAssistant.reindex.reindexPrivilegesErrorSingle', { + defaultMessage: `You do not have adequate privileges to reindex this index.`, + }), }); } + return response.internalError(e); + } + } + ) + ); - const existingOp = await reindexService.findReindexOperation(indexName); - - // If the reindexOp already exists and it's paused, resume it. Otherwise create a new one. - const reindexOp = - existingOp && existingOp.attributes.status === ReindexStatus.paused - ? await reindexService.resumeReindexOperation(indexName) - : await reindexService.createReindexOperation(indexName); - - // Add users credentials for the worker to use - credentialStore.set(reindexOp, request.headers); - - // Kick the worker on this node to immediately pickup the new reindex operation. - getWorker().forceRefresh(); - - return response.ok({ body: reindexOp.attributes }); - } catch (e) { - return response.internalError({ body: e }); + router.post( + { + path: `${BASE_PATH}/batch`, + validate: { + body: schema.object({ + indexNames: schema.arrayOf(schema.string()), + }), + }, + }, + versionCheckHandlerWrapper( + async ( + { + core: { + savedObjects: { client: savedObjectsClient }, + elasticsearch: { dataClient }, + }, + }, + request, + response + ) => { + const { indexNames } = request.body; + const results = { + successes: [] as ReindexOperation[], + errors: [] as Array<{ indexName: string; message: string }>, + }; + for (const indexName of indexNames) { + try { + const result = await reindexHandler({ + savedObjects: savedObjectsClient, + dataClient, + indexName, + log, + licensing, + headers: request.headers, + credentialStore, + getWorker, + }); + results.successes.push(result); + } catch (e) { + if (e === SYMBOL_FORBIDDEN) { + results.errors.push({ + message: i18n.translate( + 'xpack.upgradeAssistant.reindex.reindexPrivilegesErrorBatch', + { + defaultMessage: `You do not have adequate privileges to reindex "${indexName}".`, + } + ), + indexName, + }); + } else { + results.errors.push({ + indexName, + message: e.message, + }); + } + } } + + return response.ok({ body: results }); } ) ); @@ -122,7 +178,7 @@ export function registerReindexIndicesRoutes( response ) => { const { client } = savedObjects; - const { indexName } = request.params as any; + const { indexName } = request.params; const callAsCurrentUser = dataClient.callAsCurrentUser.bind(dataClient); const reindexActions = reindexActionsFactory(client, callAsCurrentUser); const reindexService = reindexServiceFactory( @@ -185,7 +241,7 @@ export function registerReindexIndicesRoutes( request, response ) => { - const { indexName } = request.params as any; + const { indexName } = request.params; const { client } = savedObjects; const callAsCurrentUser = dataClient.callAsCurrentUser.bind(dataClient); const reindexActions = reindexActionsFactory(client, callAsCurrentUser); From cb0f786041e9ffa9e1107e411b984b7d9f77cb01 Mon Sep 17 00:00:00 2001 From: Jean-Louis Leysens Date: Wed, 26 Feb 2020 16:45:48 +0100 Subject: [PATCH 02/17] Remove literal string interpolation from translation --- .../server/routes/reindex_indices/reindex_indices.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/x-pack/plugins/upgrade_assistant/server/routes/reindex_indices/reindex_indices.ts b/x-pack/plugins/upgrade_assistant/server/routes/reindex_indices/reindex_indices.ts index 286d893464005..09f801e8b0aac 100644 --- a/x-pack/plugins/upgrade_assistant/server/routes/reindex_indices/reindex_indices.ts +++ b/x-pack/plugins/upgrade_assistant/server/routes/reindex_indices/reindex_indices.ts @@ -137,7 +137,8 @@ export function registerReindexIndicesRoutes( message: i18n.translate( 'xpack.upgradeAssistant.reindex.reindexPrivilegesErrorBatch', { - defaultMessage: `You do not have adequate privileges to reindex "${indexName}".`, + defaultMessage: `You do not have adequate privileges to reindex "{indexName}".`, + values: { indexName }, } ), indexName, From 6407b6a5ef5ca03554839c9f1e5cd8231ced9d22 Mon Sep 17 00:00:00 2001 From: Jean-Louis Leysens Date: Mon, 2 Mar 2020 12:22:11 +0100 Subject: [PATCH 03/17] Refactor return value of batch endpoint "sucesses" does not communicate accurately what has happened. "started" more closely reflects what has happened. --- .../server/routes/reindex_indices/reindex_indices.test.ts | 4 ++-- .../server/routes/reindex_indices/reindex_indices.ts | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/x-pack/plugins/upgrade_assistant/server/routes/reindex_indices/reindex_indices.test.ts b/x-pack/plugins/upgrade_assistant/server/routes/reindex_indices/reindex_indices.test.ts index 64132e3bb11a1..adf5b130b108e 100644 --- a/x-pack/plugins/upgrade_assistant/server/routes/reindex_indices/reindex_indices.test.ts +++ b/x-pack/plugins/upgrade_assistant/server/routes/reindex_indices/reindex_indices.test.ts @@ -292,7 +292,7 @@ describe('reindex API', () => { const data = resp.payload; expect(data).toEqual({ errors: [], - successes: [ + started: [ { indexName: 'theIndex1' }, { indexName: 'theIndex2' }, { indexName: 'theIndex3' }, @@ -337,7 +337,7 @@ describe('reindex API', () => { }, { indexName: 'theIndex3', message: 'oops!' }, ], - successes: [{ indexName: 'theIndex1' }], + started: [{ indexName: 'theIndex1' }], }); }); }); diff --git a/x-pack/plugins/upgrade_assistant/server/routes/reindex_indices/reindex_indices.ts b/x-pack/plugins/upgrade_assistant/server/routes/reindex_indices/reindex_indices.ts index 874e4a474692f..32d404168fc98 100644 --- a/x-pack/plugins/upgrade_assistant/server/routes/reindex_indices/reindex_indices.ts +++ b/x-pack/plugins/upgrade_assistant/server/routes/reindex_indices/reindex_indices.ts @@ -145,7 +145,7 @@ export function registerReindexIndicesRoutes( ) => { const { indexNames } = request.body; const results = { - successes: [] as ReindexOperation[], + started: [] as ReindexOperation[], errors: [] as Array<{ indexName: string; message: string }>, }; for (const indexName of indexNames) { @@ -160,7 +160,7 @@ export function registerReindexIndicesRoutes( credentialStore, getWorker, }); - results.successes.push(result); + results.started.push(result); } catch (e) { results.errors.push({ indexName, From b4263fd894b0f59a3eb50870bb4fb2dbceeab009 Mon Sep 17 00:00:00 2001 From: Jean-Louis Leysens Date: Tue, 3 Mar 2020 11:03:22 +0100 Subject: [PATCH 04/17] First iteration of batch queues --- .../__snapshots__/batch_queues.test.ts.snap | 3 + .../lib/reindexing/batch_queues.test.ts | 66 ++++++++++++++++ .../server/lib/reindexing/batch_queues.ts | 77 +++++++++++++++++++ 3 files changed, 146 insertions(+) create mode 100644 x-pack/plugins/upgrade_assistant/server/lib/reindexing/__snapshots__/batch_queues.test.ts.snap create mode 100644 x-pack/plugins/upgrade_assistant/server/lib/reindexing/batch_queues.test.ts create mode 100644 x-pack/plugins/upgrade_assistant/server/lib/reindexing/batch_queues.ts diff --git a/x-pack/plugins/upgrade_assistant/server/lib/reindexing/__snapshots__/batch_queues.test.ts.snap b/x-pack/plugins/upgrade_assistant/server/lib/reindexing/__snapshots__/batch_queues.test.ts.snap new file mode 100644 index 0000000000000..242994232e9f9 --- /dev/null +++ b/x-pack/plugins/upgrade_assistant/server/lib/reindexing/__snapshots__/batch_queues.test.ts.snap @@ -0,0 +1,3 @@ +// Jest Snapshot v1, https://goo.gl/fbAQLP + +exports[`BatchQueues prevents adding items that have already been queued 1`] = `"The following item(s) are already enqueued: [s, t]"`; diff --git a/x-pack/plugins/upgrade_assistant/server/lib/reindexing/batch_queues.test.ts b/x-pack/plugins/upgrade_assistant/server/lib/reindexing/batch_queues.test.ts new file mode 100644 index 0000000000000..de61ccc39cde7 --- /dev/null +++ b/x-pack/plugins/upgrade_assistant/server/lib/reindexing/batch_queues.test.ts @@ -0,0 +1,66 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +import { BatchQueues } from './batch_queues'; + +describe('BatchQueues', () => { + test('addQueue', () => { + const myQueue = ['a', 'b']; + const batchQueues = new BatchQueues(); + batchQueues.addQueue(myQueue); + const queues = batchQueues.getQueues(); + expect(queues[0].items).toEqual(myQueue); + }); + + test('readNextQueueItem and shiftQueue', () => { + const myQueue = ['a', 'b']; + + const batchQueues = new BatchQueues(); + expect(batchQueues.readNextQueueItem('nada')).toBeUndefined(); + const id = batchQueues.addQueue(myQueue); + + expect(batchQueues.readNextQueueItem(id)).toBe('a'); + // Second read yields same item + expect(batchQueues.readNextQueueItem(id)).toBe('a'); + + expect(batchQueues.shiftQueue(id)).toBe('a'); + expect(batchQueues.shiftQueue(id)).toBe('b'); + expect(batchQueues.shiftQueue(id)).toBe(undefined); + + expect(() => batchQueues.shiftQueue('nada')).toThrow('No queue found'); + }); + + test('deleteQueue', () => { + const myQueue = ['a', 'b']; + const batchQueues = new BatchQueues(); + expect(batchQueues.deleteQueue('nothing')).toBe(false); + const id = batchQueues.addQueue(myQueue); + expect(batchQueues.deleteQueue(id)).toBe(true); + }); + + test('findItemQueue', () => { + const myQueueA = ['a', 'b']; + const myQueueB = ['s', 't']; + const batchQueues = new BatchQueues(); + batchQueues.addQueue(myQueueA); + batchQueues.addQueue(myQueueB); + + expect(batchQueues.findItemQueue('s')!.items).toEqual(myQueueB); + }); + + it('prevents adding items that have already been queued', () => { + const myQueueA = ['s', 't']; + const myQueueB = ['s', 't', 'u']; + const batchQueues = new BatchQueues(); + batchQueues.addQueue(myQueueA); + try { + batchQueues.addQueue(myQueueB); + } catch (e) { + expect(e.message).toMatchSnapshot(); + return; + } + fail('Adding duplicate items to batch queues did not throw!'); + }); +}); diff --git a/x-pack/plugins/upgrade_assistant/server/lib/reindexing/batch_queues.ts b/x-pack/plugins/upgrade_assistant/server/lib/reindexing/batch_queues.ts new file mode 100644 index 0000000000000..ffd29eee85047 --- /dev/null +++ b/x-pack/plugins/upgrade_assistant/server/lib/reindexing/batch_queues.ts @@ -0,0 +1,77 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import { v4 as uuid } from 'uuid'; + +interface Queue { + id: string; + items: string[]; +} + +export class BatchQueues { + private queues: Map = new Map(); + + private requireQueue(id: string): Queue { + if (this.queues.has(id)) { + return this.queues.get(id)!; + } + throw new Error(`No queue found with id ${id}`); + } + + private runChecks(queue: string[]): void { + const errorItems: string[] = []; + queue.forEach(item => { + const maybeQueue = this.findItemQueue(item); + if (maybeQueue) { + errorItems.push(item); + } + }); + + if (errorItems.length) { + throw new Error(`The following item(s) are already enqueued: [${[errorItems.join(', ')]}]`); + } + } + + public getQueue(id: string): Queue | undefined { + return this.queues.get(id); + } + + public addQueue(queue: string[]): string { + this.runChecks(queue); + const id = uuid(); + this.queues.set(id, { + id, + items: queue, + }); + + return id; + } + + public getQueues(): Queue[] { + return Array.from(this.queues.values()); + } + + public readNextQueueItem(id: string): string | undefined { + const queue = this.getQueue(id); + if (queue) { + return queue.items[0]; + } + } + + public shiftQueue(id: string): string | undefined { + return this.requireQueue(id).items.shift(); + } + + public deleteQueue(id: string): boolean { + return this.queues.delete(id); + } + + public findItemQueue(item: string): Queue | undefined { + return this.getQueues().find(queue => { + return queue.items.find(i => i === item); + }); + } +} From 8214a6962c8dcb43253bef73e73e21f11fe3a068 Mon Sep 17 00:00:00 2001 From: Jean-Louis Leysens Date: Tue, 3 Mar 2020 13:02:22 +0100 Subject: [PATCH 05/17] Single queue Changed the batchqueues implementation to only using a single queue - since there is only one ES that it is interacting with. Before continuing with this work, just making sure that these pre- cautions are necessary! --- ....test.ts.snap => batch_queue.test.ts.snap} | 0 .../server/lib/reindexing/batch_queue.test.ts | 46 +++++++++++ .../server/lib/reindexing/batch_queue.ts | 49 ++++++++++++ .../lib/reindexing/batch_queues.test.ts | 66 ---------------- .../server/lib/reindexing/batch_queues.ts | 77 ------------------- .../server/lib/reindexing/worker.ts | 7 ++ .../upgrade_assistant/server/plugin.ts | 11 ++- 7 files changed, 109 insertions(+), 147 deletions(-) rename x-pack/plugins/upgrade_assistant/server/lib/reindexing/__snapshots__/{batch_queues.test.ts.snap => batch_queue.test.ts.snap} (100%) create mode 100644 x-pack/plugins/upgrade_assistant/server/lib/reindexing/batch_queue.test.ts create mode 100644 x-pack/plugins/upgrade_assistant/server/lib/reindexing/batch_queue.ts delete mode 100644 x-pack/plugins/upgrade_assistant/server/lib/reindexing/batch_queues.test.ts delete mode 100644 x-pack/plugins/upgrade_assistant/server/lib/reindexing/batch_queues.ts diff --git a/x-pack/plugins/upgrade_assistant/server/lib/reindexing/__snapshots__/batch_queues.test.ts.snap b/x-pack/plugins/upgrade_assistant/server/lib/reindexing/__snapshots__/batch_queue.test.ts.snap similarity index 100% rename from x-pack/plugins/upgrade_assistant/server/lib/reindexing/__snapshots__/batch_queues.test.ts.snap rename to x-pack/plugins/upgrade_assistant/server/lib/reindexing/__snapshots__/batch_queue.test.ts.snap diff --git a/x-pack/plugins/upgrade_assistant/server/lib/reindexing/batch_queue.test.ts b/x-pack/plugins/upgrade_assistant/server/lib/reindexing/batch_queue.test.ts new file mode 100644 index 0000000000000..7bc86d4a1fac5 --- /dev/null +++ b/x-pack/plugins/upgrade_assistant/server/lib/reindexing/batch_queue.test.ts @@ -0,0 +1,46 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +import { BatchQueue } from './batch_queue'; + +describe('BatchQueues', () => { + test('addItems', () => { + const myItems = ['a', 'b']; + const batchQueue = new BatchQueue(); + batchQueue.addItems(myItems); + const queue = batchQueue.getQueue(); + expect(queue).toEqual(myItems); + }); + + test('readNextItem and shiftQueue', () => { + const myItems = ['a', 'b']; + + const batchQueue = new BatchQueue(); + expect(batchQueue.readNextItem()).toBeUndefined(); + batchQueue.addItems(myItems); + + expect(batchQueue.readNextItem()).toBe('a'); + // Second read yields same item + expect(batchQueue.readNextItem()).toBe('a'); + + expect(batchQueue.shiftQueue()).toBe('a'); + expect(batchQueue.shiftQueue()).toBe('b'); + expect(batchQueue.shiftQueue()).toBe(undefined); + }); + + it('prevents adding items that have already been queued', () => { + const myItemsA = ['s', 't']; + const myItemsB = ['s', 't', 'u']; + const batchQueues = new BatchQueue(); + batchQueues.addItems(myItemsA); + try { + batchQueues.addItems(myItemsB); + } catch (e) { + expect(e.message).toMatchSnapshot(); + return; + } + fail('Adding duplicate items to batch queues did not throw!'); + }); +}); diff --git a/x-pack/plugins/upgrade_assistant/server/lib/reindexing/batch_queue.ts b/x-pack/plugins/upgrade_assistant/server/lib/reindexing/batch_queue.ts new file mode 100644 index 0000000000000..ed915041fddfb --- /dev/null +++ b/x-pack/plugins/upgrade_assistant/server/lib/reindexing/batch_queue.ts @@ -0,0 +1,49 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +export class BatchQueue { + private queue: string[] = []; + + private runChecks(items: string[]): void { + const errors: string[] = []; + items.forEach(item => { + if (this.queue.includes(item)) { + errors.push(item); + } + }); + + if (errors.length) { + throw new Error(`The following item(s) are already enqueued: [${[errors.join(', ')]}]`); + } + } + + public addItems(items: string[]): void { + this.runChecks(items); + items.forEach(item => { + this.queue.push(item); + }); + } + + public readNextItem(): string | undefined { + return this.queue[0]; + } + + public getQueue(): string[] { + return this.queue.slice(); + } + + public shiftQueue(): string | undefined { + return this.queue.shift(); + } + + public has(item: string): boolean { + return this.queue.includes(item); + } + + public size() { + return this.queue.length; + } +} diff --git a/x-pack/plugins/upgrade_assistant/server/lib/reindexing/batch_queues.test.ts b/x-pack/plugins/upgrade_assistant/server/lib/reindexing/batch_queues.test.ts deleted file mode 100644 index de61ccc39cde7..0000000000000 --- a/x-pack/plugins/upgrade_assistant/server/lib/reindexing/batch_queues.test.ts +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -import { BatchQueues } from './batch_queues'; - -describe('BatchQueues', () => { - test('addQueue', () => { - const myQueue = ['a', 'b']; - const batchQueues = new BatchQueues(); - batchQueues.addQueue(myQueue); - const queues = batchQueues.getQueues(); - expect(queues[0].items).toEqual(myQueue); - }); - - test('readNextQueueItem and shiftQueue', () => { - const myQueue = ['a', 'b']; - - const batchQueues = new BatchQueues(); - expect(batchQueues.readNextQueueItem('nada')).toBeUndefined(); - const id = batchQueues.addQueue(myQueue); - - expect(batchQueues.readNextQueueItem(id)).toBe('a'); - // Second read yields same item - expect(batchQueues.readNextQueueItem(id)).toBe('a'); - - expect(batchQueues.shiftQueue(id)).toBe('a'); - expect(batchQueues.shiftQueue(id)).toBe('b'); - expect(batchQueues.shiftQueue(id)).toBe(undefined); - - expect(() => batchQueues.shiftQueue('nada')).toThrow('No queue found'); - }); - - test('deleteQueue', () => { - const myQueue = ['a', 'b']; - const batchQueues = new BatchQueues(); - expect(batchQueues.deleteQueue('nothing')).toBe(false); - const id = batchQueues.addQueue(myQueue); - expect(batchQueues.deleteQueue(id)).toBe(true); - }); - - test('findItemQueue', () => { - const myQueueA = ['a', 'b']; - const myQueueB = ['s', 't']; - const batchQueues = new BatchQueues(); - batchQueues.addQueue(myQueueA); - batchQueues.addQueue(myQueueB); - - expect(batchQueues.findItemQueue('s')!.items).toEqual(myQueueB); - }); - - it('prevents adding items that have already been queued', () => { - const myQueueA = ['s', 't']; - const myQueueB = ['s', 't', 'u']; - const batchQueues = new BatchQueues(); - batchQueues.addQueue(myQueueA); - try { - batchQueues.addQueue(myQueueB); - } catch (e) { - expect(e.message).toMatchSnapshot(); - return; - } - fail('Adding duplicate items to batch queues did not throw!'); - }); -}); diff --git a/x-pack/plugins/upgrade_assistant/server/lib/reindexing/batch_queues.ts b/x-pack/plugins/upgrade_assistant/server/lib/reindexing/batch_queues.ts deleted file mode 100644 index ffd29eee85047..0000000000000 --- a/x-pack/plugins/upgrade_assistant/server/lib/reindexing/batch_queues.ts +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ - -import { v4 as uuid } from 'uuid'; - -interface Queue { - id: string; - items: string[]; -} - -export class BatchQueues { - private queues: Map = new Map(); - - private requireQueue(id: string): Queue { - if (this.queues.has(id)) { - return this.queues.get(id)!; - } - throw new Error(`No queue found with id ${id}`); - } - - private runChecks(queue: string[]): void { - const errorItems: string[] = []; - queue.forEach(item => { - const maybeQueue = this.findItemQueue(item); - if (maybeQueue) { - errorItems.push(item); - } - }); - - if (errorItems.length) { - throw new Error(`The following item(s) are already enqueued: [${[errorItems.join(', ')]}]`); - } - } - - public getQueue(id: string): Queue | undefined { - return this.queues.get(id); - } - - public addQueue(queue: string[]): string { - this.runChecks(queue); - const id = uuid(); - this.queues.set(id, { - id, - items: queue, - }); - - return id; - } - - public getQueues(): Queue[] { - return Array.from(this.queues.values()); - } - - public readNextQueueItem(id: string): string | undefined { - const queue = this.getQueue(id); - if (queue) { - return queue.items[0]; - } - } - - public shiftQueue(id: string): string | undefined { - return this.requireQueue(id).items.shift(); - } - - public deleteQueue(id: string): boolean { - return this.queues.delete(id); - } - - public findItemQueue(item: string): Queue | undefined { - return this.getQueues().find(queue => { - return queue.items.find(i => i === item); - }); - } -} diff --git a/x-pack/plugins/upgrade_assistant/server/lib/reindexing/worker.ts b/x-pack/plugins/upgrade_assistant/server/lib/reindexing/worker.ts index bad6db62efe41..fab6026b89052 100644 --- a/x-pack/plugins/upgrade_assistant/server/lib/reindexing/worker.ts +++ b/x-pack/plugins/upgrade_assistant/server/lib/reindexing/worker.ts @@ -11,6 +11,8 @@ import { CredentialStore } from './credential_store'; import { reindexActionsFactory } from './reindex_actions'; import { ReindexService, reindexServiceFactory } from './reindex_service'; import { LicensingPluginSetup } from '../../../../licensing/server'; +import { BatchQueues } from './batch_queues'; +import { BatchQueue } from './batch_queue'; const POLL_INTERVAL = 30000; // If no nodes have been able to update this index in 2 minutes (due to missing credentials), set to paused. @@ -42,6 +44,7 @@ export class ReindexWorker { constructor( private client: SavedObjectsClientContract, private credentialStore: CredentialStore, + private batchQueue: BatchQueue, private clusterClient: IClusterClient, log: Logger, private licensing: LicensingPluginSetup @@ -134,6 +137,10 @@ export class ReindexWorker { this.inProgressOps = []; } + if (this.batchQueue.size()) { + const nextItem = this.batchQueue.readNextItem(); + } + // If there are operations in progress and we're not already updating operations, kick off the update loop if (!this.updateOperationLoopRunning) { this.startUpdateOperationLoop(); diff --git a/x-pack/plugins/upgrade_assistant/server/plugin.ts b/x-pack/plugins/upgrade_assistant/server/plugin.ts index 6ccd073a9e020..c7d3701082bfb 100644 --- a/x-pack/plugins/upgrade_assistant/server/plugin.ts +++ b/x-pack/plugins/upgrade_assistant/server/plugin.ts @@ -19,7 +19,9 @@ import { import { CloudSetup } from '../../cloud/server'; import { LicensingPluginSetup } from '../../licensing/server'; -import { CredentialStore, credentialStoreFactory } from './lib/reindexing/credential_store'; +import { credentialStoreFactory } from './lib/reindexing/credential_store'; +import { BatchQueues } from './lib/reindexing/batch_queues'; + import { ReindexWorker } from './lib/reindexing'; import { registerUpgradeAssistantUsageCollector } from './lib/telemetry'; import { registerClusterCheckupRoutes } from './routes/cluster_checkup'; @@ -36,7 +38,9 @@ interface PluginsSetup { export class UpgradeAssistantServerPlugin implements Plugin { private readonly logger: Logger; - private readonly credentialStore: CredentialStore; + + private readonly credentialStore = credentialStoreFactory(); + private readonly batchQueues = new BatchQueues(); // Properties set at setup private licensing?: LicensingPluginSetup; @@ -48,7 +52,6 @@ export class UpgradeAssistantServerPlugin implements Plugin { constructor({ logger }: PluginInitializerContext) { this.logger = logger.get(); - this.credentialStore = credentialStoreFactory(); } private getWorker() { @@ -114,7 +117,7 @@ export class UpgradeAssistantServerPlugin implements Plugin { ), }); - this.worker.start(); + this.worker!.start(); } stop(): void { From d805855b2e529821eb8bb00faf7b5fe800708dbf Mon Sep 17 00:00:00 2001 From: Jean-Louis Leysens Date: Tue, 3 Mar 2020 15:01:12 +0100 Subject: [PATCH 06/17] Clean up old batch queue implementation --- .../upgrade_assistant/server/lib/reindexing/worker.ts | 7 ------- 1 file changed, 7 deletions(-) diff --git a/x-pack/plugins/upgrade_assistant/server/lib/reindexing/worker.ts b/x-pack/plugins/upgrade_assistant/server/lib/reindexing/worker.ts index fab6026b89052..bad6db62efe41 100644 --- a/x-pack/plugins/upgrade_assistant/server/lib/reindexing/worker.ts +++ b/x-pack/plugins/upgrade_assistant/server/lib/reindexing/worker.ts @@ -11,8 +11,6 @@ import { CredentialStore } from './credential_store'; import { reindexActionsFactory } from './reindex_actions'; import { ReindexService, reindexServiceFactory } from './reindex_service'; import { LicensingPluginSetup } from '../../../../licensing/server'; -import { BatchQueues } from './batch_queues'; -import { BatchQueue } from './batch_queue'; const POLL_INTERVAL = 30000; // If no nodes have been able to update this index in 2 minutes (due to missing credentials), set to paused. @@ -44,7 +42,6 @@ export class ReindexWorker { constructor( private client: SavedObjectsClientContract, private credentialStore: CredentialStore, - private batchQueue: BatchQueue, private clusterClient: IClusterClient, log: Logger, private licensing: LicensingPluginSetup @@ -137,10 +134,6 @@ export class ReindexWorker { this.inProgressOps = []; } - if (this.batchQueue.size()) { - const nextItem = this.batchQueue.readNextItem(); - } - // If there are operations in progress and we're not already updating operations, kick off the update loop if (!this.updateOperationLoopRunning) { this.startUpdateOperationLoop(); From c4cd0480512baaa4e4a6328c984334e3231966b5 Mon Sep 17 00:00:00 2001 From: Jean-Louis Leysens Date: Tue, 3 Mar 2020 16:51:23 +0100 Subject: [PATCH 07/17] Slight refactor --- .../server/lib/reindexing/batch_queue.test.ts | 23 +++++++------ .../server/lib/reindexing/batch_queue.ts | 33 +++++++++++++------ 2 files changed, 36 insertions(+), 20 deletions(-) diff --git a/x-pack/plugins/upgrade_assistant/server/lib/reindexing/batch_queue.test.ts b/x-pack/plugins/upgrade_assistant/server/lib/reindexing/batch_queue.test.ts index 7bc86d4a1fac5..50f519a2abac3 100644 --- a/x-pack/plugins/upgrade_assistant/server/lib/reindexing/batch_queue.test.ts +++ b/x-pack/plugins/upgrade_assistant/server/lib/reindexing/batch_queue.test.ts @@ -3,40 +3,43 @@ * or more contributor license agreements. Licensed under the Elastic License; * you may not use this file except in compliance with the Elastic License. */ -import { BatchQueue } from './batch_queue'; +import { createBatchQueue } from './batch_queue'; describe('BatchQueues', () => { test('addItems', () => { const myItems = ['a', 'b']; - const batchQueue = new BatchQueue(); - batchQueue.addItems(myItems); - const queue = batchQueue.getQueue(); + const batchQueue = createBatchQueue(myItems); + const queue = batchQueue.toArray(); expect(queue).toEqual(myItems); }); test('readNextItem and shiftQueue', () => { - const myItems = ['a', 'b']; + const myItems = ['a', 'b', 'a', 'c', 'd', 'e', 'e', 'e']; - const batchQueue = new BatchQueue(); + const batchQueue = createBatchQueue([]); expect(batchQueue.readNextItem()).toBeUndefined(); - batchQueue.addItems(myItems); + batchQueue.enqueue(myItems); expect(batchQueue.readNextItem()).toBe('a'); // Second read yields same item expect(batchQueue.readNextItem()).toBe('a'); + // Test order items, also items are deduped expect(batchQueue.shiftQueue()).toBe('a'); expect(batchQueue.shiftQueue()).toBe('b'); + expect(batchQueue.shiftQueue()).toBe('c'); + expect(batchQueue.shiftQueue()).toBe('d'); + expect(batchQueue.shiftQueue()).toBe('e'); expect(batchQueue.shiftQueue()).toBe(undefined); }); it('prevents adding items that have already been queued', () => { const myItemsA = ['s', 't']; const myItemsB = ['s', 't', 'u']; - const batchQueues = new BatchQueue(); - batchQueues.addItems(myItemsA); + const batchQueues = createBatchQueue([]); + batchQueues.enqueue(myItemsA); try { - batchQueues.addItems(myItemsB); + batchQueues.enqueue(myItemsB); } catch (e) { expect(e.message).toMatchSnapshot(); return; diff --git a/x-pack/plugins/upgrade_assistant/server/lib/reindexing/batch_queue.ts b/x-pack/plugins/upgrade_assistant/server/lib/reindexing/batch_queue.ts index ed915041fddfb..7574bba95cc42 100644 --- a/x-pack/plugins/upgrade_assistant/server/lib/reindexing/batch_queue.ts +++ b/x-pack/plugins/upgrade_assistant/server/lib/reindexing/batch_queue.ts @@ -5,12 +5,16 @@ */ export class BatchQueue { - private queue: string[] = []; + private queue: Set = new Set(); + + constructor(items: string[]) { + this.enqueue(items); + } private runChecks(items: string[]): void { const errors: string[] = []; items.forEach(item => { - if (this.queue.includes(item)) { + if (this.queue.has(item)) { errors.push(item); } }); @@ -20,30 +24,39 @@ export class BatchQueue { } } - public addItems(items: string[]): void { + public enqueue(items: string[]): void { this.runChecks(items); items.forEach(item => { - this.queue.push(item); + this.queue.add(item); }); } public readNextItem(): string | undefined { - return this.queue[0]; + return this.toArray()[0]; } - public getQueue(): string[] { - return this.queue.slice(); + public toArray(): string[] { + return Array.from(this.queue.values()); } public shiftQueue(): string | undefined { - return this.queue.shift(); + const nextItem = this.readNextItem(); + if (nextItem) { + this.queue.delete(nextItem); + return nextItem; + } + return; } public has(item: string): boolean { - return this.queue.includes(item); + return this.queue.has(item); } public size() { - return this.queue.length; + return this.queue.size; } } + +export const createBatchQueue = (items: string[]): BatchQueue => { + return new BatchQueue(items); +}; From 0a64e0df0a4d17c65819a0959795ea7c683c36a7 Mon Sep 17 00:00:00 2001 From: Jean-Louis Leysens Date: Wed, 4 Mar 2020 09:16:20 +0100 Subject: [PATCH 08/17] Revert batch queues implementation --- .../__snapshots__/batch_queue.test.ts.snap | 3 - .../server/lib/reindexing/batch_queue.test.ts | 49 --------------- .../server/lib/reindexing/batch_queue.ts | 62 ------------------- .../upgrade_assistant/server/plugin.ts | 11 ++-- 4 files changed, 4 insertions(+), 121 deletions(-) delete mode 100644 x-pack/plugins/upgrade_assistant/server/lib/reindexing/__snapshots__/batch_queue.test.ts.snap delete mode 100644 x-pack/plugins/upgrade_assistant/server/lib/reindexing/batch_queue.test.ts delete mode 100644 x-pack/plugins/upgrade_assistant/server/lib/reindexing/batch_queue.ts diff --git a/x-pack/plugins/upgrade_assistant/server/lib/reindexing/__snapshots__/batch_queue.test.ts.snap b/x-pack/plugins/upgrade_assistant/server/lib/reindexing/__snapshots__/batch_queue.test.ts.snap deleted file mode 100644 index 242994232e9f9..0000000000000 --- a/x-pack/plugins/upgrade_assistant/server/lib/reindexing/__snapshots__/batch_queue.test.ts.snap +++ /dev/null @@ -1,3 +0,0 @@ -// Jest Snapshot v1, https://goo.gl/fbAQLP - -exports[`BatchQueues prevents adding items that have already been queued 1`] = `"The following item(s) are already enqueued: [s, t]"`; diff --git a/x-pack/plugins/upgrade_assistant/server/lib/reindexing/batch_queue.test.ts b/x-pack/plugins/upgrade_assistant/server/lib/reindexing/batch_queue.test.ts deleted file mode 100644 index 50f519a2abac3..0000000000000 --- a/x-pack/plugins/upgrade_assistant/server/lib/reindexing/batch_queue.test.ts +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -import { createBatchQueue } from './batch_queue'; - -describe('BatchQueues', () => { - test('addItems', () => { - const myItems = ['a', 'b']; - const batchQueue = createBatchQueue(myItems); - const queue = batchQueue.toArray(); - expect(queue).toEqual(myItems); - }); - - test('readNextItem and shiftQueue', () => { - const myItems = ['a', 'b', 'a', 'c', 'd', 'e', 'e', 'e']; - - const batchQueue = createBatchQueue([]); - expect(batchQueue.readNextItem()).toBeUndefined(); - batchQueue.enqueue(myItems); - - expect(batchQueue.readNextItem()).toBe('a'); - // Second read yields same item - expect(batchQueue.readNextItem()).toBe('a'); - - // Test order items, also items are deduped - expect(batchQueue.shiftQueue()).toBe('a'); - expect(batchQueue.shiftQueue()).toBe('b'); - expect(batchQueue.shiftQueue()).toBe('c'); - expect(batchQueue.shiftQueue()).toBe('d'); - expect(batchQueue.shiftQueue()).toBe('e'); - expect(batchQueue.shiftQueue()).toBe(undefined); - }); - - it('prevents adding items that have already been queued', () => { - const myItemsA = ['s', 't']; - const myItemsB = ['s', 't', 'u']; - const batchQueues = createBatchQueue([]); - batchQueues.enqueue(myItemsA); - try { - batchQueues.enqueue(myItemsB); - } catch (e) { - expect(e.message).toMatchSnapshot(); - return; - } - fail('Adding duplicate items to batch queues did not throw!'); - }); -}); diff --git a/x-pack/plugins/upgrade_assistant/server/lib/reindexing/batch_queue.ts b/x-pack/plugins/upgrade_assistant/server/lib/reindexing/batch_queue.ts deleted file mode 100644 index 7574bba95cc42..0000000000000 --- a/x-pack/plugins/upgrade_assistant/server/lib/reindexing/batch_queue.ts +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ - -export class BatchQueue { - private queue: Set = new Set(); - - constructor(items: string[]) { - this.enqueue(items); - } - - private runChecks(items: string[]): void { - const errors: string[] = []; - items.forEach(item => { - if (this.queue.has(item)) { - errors.push(item); - } - }); - - if (errors.length) { - throw new Error(`The following item(s) are already enqueued: [${[errors.join(', ')]}]`); - } - } - - public enqueue(items: string[]): void { - this.runChecks(items); - items.forEach(item => { - this.queue.add(item); - }); - } - - public readNextItem(): string | undefined { - return this.toArray()[0]; - } - - public toArray(): string[] { - return Array.from(this.queue.values()); - } - - public shiftQueue(): string | undefined { - const nextItem = this.readNextItem(); - if (nextItem) { - this.queue.delete(nextItem); - return nextItem; - } - return; - } - - public has(item: string): boolean { - return this.queue.has(item); - } - - public size() { - return this.queue.size; - } -} - -export const createBatchQueue = (items: string[]): BatchQueue => { - return new BatchQueue(items); -}; diff --git a/x-pack/plugins/upgrade_assistant/server/plugin.ts b/x-pack/plugins/upgrade_assistant/server/plugin.ts index c7d3701082bfb..6ccd073a9e020 100644 --- a/x-pack/plugins/upgrade_assistant/server/plugin.ts +++ b/x-pack/plugins/upgrade_assistant/server/plugin.ts @@ -19,9 +19,7 @@ import { import { CloudSetup } from '../../cloud/server'; import { LicensingPluginSetup } from '../../licensing/server'; -import { credentialStoreFactory } from './lib/reindexing/credential_store'; -import { BatchQueues } from './lib/reindexing/batch_queues'; - +import { CredentialStore, credentialStoreFactory } from './lib/reindexing/credential_store'; import { ReindexWorker } from './lib/reindexing'; import { registerUpgradeAssistantUsageCollector } from './lib/telemetry'; import { registerClusterCheckupRoutes } from './routes/cluster_checkup'; @@ -38,9 +36,7 @@ interface PluginsSetup { export class UpgradeAssistantServerPlugin implements Plugin { private readonly logger: Logger; - - private readonly credentialStore = credentialStoreFactory(); - private readonly batchQueues = new BatchQueues(); + private readonly credentialStore: CredentialStore; // Properties set at setup private licensing?: LicensingPluginSetup; @@ -52,6 +48,7 @@ export class UpgradeAssistantServerPlugin implements Plugin { constructor({ logger }: PluginInitializerContext) { this.logger = logger.get(); + this.credentialStore = credentialStoreFactory(); } private getWorker() { @@ -117,7 +114,7 @@ export class UpgradeAssistantServerPlugin implements Plugin { ), }); - this.worker!.start(); + this.worker.start(); } stop(): void { From b9c93a60f63aa365a712cc770fc07cbe5ef76fbf Mon Sep 17 00:00:00 2001 From: Jean-Louis Leysens Date: Wed, 4 Mar 2020 09:54:42 +0100 Subject: [PATCH 09/17] Introduction of QueueSettings Queue settings can be set on a reindex operation and set a timemstamp value on the reindex operation for the scheduler to use down the line for ordering operations and running them in series --- .../plugins/upgrade_assistant/common/types.ts | 22 +++++++++++++ .../server/lib/reindexing/reindex_actions.ts | 7 +++-- .../server/lib/reindexing/reindex_service.ts | 8 +++-- .../routes/reindex_indices/reindex_handler.ts | 17 +++++----- .../routes/reindex_indices/reindex_indices.ts | 31 ++++++++++++------- 5 files changed, 61 insertions(+), 24 deletions(-) diff --git a/x-pack/plugins/upgrade_assistant/common/types.ts b/x-pack/plugins/upgrade_assistant/common/types.ts index a0c12154988a1..ceb3a6dd60166 100644 --- a/x-pack/plugins/upgrade_assistant/common/types.ts +++ b/x-pack/plugins/upgrade_assistant/common/types.ts @@ -28,6 +28,19 @@ export enum ReindexStatus { } export const REINDEX_OP_TYPE = 'upgrade-assistant-reindex-operation'; + +export interface QueueSettings extends SavedObjectAttributes { + queuedAt: number; +} + +export interface ReindexOptions extends SavedObjectAttributes { + /** + * Set this key to configure a reindex operation as part of a + * batch to be run in series. + */ + queueSettings?: QueueSettings; +} + export interface ReindexOperation extends SavedObjectAttributes { indexName: string; newIndexName: string; @@ -40,6 +53,15 @@ export interface ReindexOperation extends SavedObjectAttributes { // This field is only used for the singleton IndexConsumerType documents. runningReindexCount: number | null; + + /** + * Options for the reindexing strategy. + * + * @remark + * Marked as optional for backwards compatibility. We should still + * be able to handle older ReindexOperation objects. + */ + reindexOptions?: ReindexOptions; } export type ReindexSavedObject = SavedObject; diff --git a/x-pack/plugins/upgrade_assistant/server/lib/reindexing/reindex_actions.ts b/x-pack/plugins/upgrade_assistant/server/lib/reindexing/reindex_actions.ts index 2ae340f12d80c..422e78c2f12ad 100644 --- a/x-pack/plugins/upgrade_assistant/server/lib/reindexing/reindex_actions.ts +++ b/x-pack/plugins/upgrade_assistant/server/lib/reindexing/reindex_actions.ts @@ -11,6 +11,7 @@ import { IndexGroup, REINDEX_OP_TYPE, ReindexOperation, + ReindexOptions, ReindexSavedObject, ReindexStatus, ReindexStep, @@ -34,8 +35,9 @@ export interface ReindexActions { /** * Creates a new reindexOp, does not perform any pre-flight checks. * @param indexName + * @param opts Options for the reindex operation */ - createReindexOp(indexName: string): Promise; + createReindexOp(indexName: string, opts?: ReindexOptions): Promise; /** * Deletes a reindexOp. @@ -150,7 +152,7 @@ export const reindexActionsFactory = ( // ----- Public interface return { - async createReindexOp(indexName: string) { + async createReindexOp(indexName: string, opts?: ReindexOptions) { return client.create(REINDEX_OP_TYPE, { indexName, newIndexName: generateNewIndexName(indexName), @@ -161,6 +163,7 @@ export const reindexActionsFactory = ( reindexTaskPercComplete: null, errorMessage: null, runningReindexCount: null, + reindexOptions: opts, }); }, diff --git a/x-pack/plugins/upgrade_assistant/server/lib/reindexing/reindex_service.ts b/x-pack/plugins/upgrade_assistant/server/lib/reindexing/reindex_service.ts index b274743bdf279..8997c9c2008c5 100644 --- a/x-pack/plugins/upgrade_assistant/server/lib/reindexing/reindex_service.ts +++ b/x-pack/plugins/upgrade_assistant/server/lib/reindexing/reindex_service.ts @@ -8,6 +8,7 @@ import { first } from 'rxjs/operators'; import { IndexGroup, + ReindexOptions, ReindexSavedObject, ReindexStatus, ReindexStep, @@ -51,8 +52,9 @@ export interface ReindexService { /** * Creates a new reindex operation for a given index. * @param indexName + * @param opts */ - createReindexOperation(indexName: string): Promise; + createReindexOperation(indexName: string, opts?: ReindexOptions): Promise; /** * Retrieves all reindex operations that have the given status. @@ -517,7 +519,7 @@ export const reindexServiceFactory = ( } }, - async createReindexOperation(indexName: string) { + async createReindexOperation(indexName: string, opts?: ReindexOptions) { const indexExists = await callAsUser('indices.exists', { index: indexName }); if (!indexExists) { throw error.indexNotFound(`Index ${indexName} does not exist in this cluster.`); @@ -539,7 +541,7 @@ export const reindexServiceFactory = ( } } - return actions.createReindexOp(indexName); + return actions.createReindexOp(indexName, opts); }, async findReindexOperation(indexName: string) { diff --git a/x-pack/plugins/upgrade_assistant/server/routes/reindex_indices/reindex_handler.ts b/x-pack/plugins/upgrade_assistant/server/routes/reindex_indices/reindex_handler.ts index 58723803868ae..f54d0d3b06526 100644 --- a/x-pack/plugins/upgrade_assistant/server/routes/reindex_indices/reindex_handler.ts +++ b/x-pack/plugins/upgrade_assistant/server/routes/reindex_indices/reindex_handler.ts @@ -8,10 +8,10 @@ import { IScopedClusterClient, Logger, SavedObjectsClientContract } from 'kibana import { LicensingPluginSetup } from '../../../../licensing/server'; -import { ReindexStatus } from '../../../common/types'; +import { ReindexOptions, ReindexStatus } from '../../../common/types'; import { reindexActionsFactory } from '../../lib/reindexing/reindex_actions'; -import { reindexServiceFactory, ReindexWorker } from '../../lib/reindexing'; +import { reindexServiceFactory } from '../../lib/reindexing'; import { CredentialStore } from '../../lib/reindexing/credential_store'; import { error } from '../../lib/reindexing/error'; @@ -23,7 +23,7 @@ interface ReindexHandlerArgs { licensing: LicensingPluginSetup; headers: Record; credentialStore: CredentialStore; - getWorker: () => ReindexWorker; + enqueue?: boolean; } export const reindexHandler = async ({ @@ -34,7 +34,7 @@ export const reindexHandler = async ({ licensing, log, savedObjects, - getWorker, + enqueue, }: ReindexHandlerArgs) => { const callAsCurrentUser = dataClient.callAsCurrentUser.bind(dataClient); const reindexActions = reindexActionsFactory(savedObjects, callAsCurrentUser); @@ -51,17 +51,18 @@ export const reindexHandler = async ({ const existingOp = await reindexService.findReindexOperation(indexName); + const opts: ReindexOptions | undefined = enqueue + ? { queueSettings: { queuedAt: Date.now() } } + : undefined; + // If the reindexOp already exists and it's paused, resume it. Otherwise create a new one. const reindexOp = existingOp && existingOp.attributes.status === ReindexStatus.paused ? await reindexService.resumeReindexOperation(indexName) - : await reindexService.createReindexOperation(indexName); + : await reindexService.createReindexOperation(indexName, opts); // Add users credentials for the worker to use credentialStore.set(reindexOp, headers); - // Kick the worker on this node to immediately pickup the new reindex operation. - getWorker().forceRefresh(); - return reindexOp.attributes; }; diff --git a/x-pack/plugins/upgrade_assistant/server/routes/reindex_indices/reindex_indices.ts b/x-pack/plugins/upgrade_assistant/server/routes/reindex_indices/reindex_indices.ts index 32d404168fc98..21034355c1144 100644 --- a/x-pack/plugins/upgrade_assistant/server/routes/reindex_indices/reindex_indices.ts +++ b/x-pack/plugins/upgrade_assistant/server/routes/reindex_indices/reindex_indices.ts @@ -104,17 +104,21 @@ export function registerReindexIndicesRoutes( ) => { const { indexName } = request.params; try { + const result = await reindexHandler({ + savedObjects: savedObjectsClient, + dataClient, + indexName, + log, + licensing, + headers: request.headers, + credentialStore, + }); + + // Kick the worker on this node to immediately pickup the new reindex operation. + getWorker().forceRefresh(); + return response.ok({ - body: await reindexHandler({ - savedObjects: savedObjectsClient, - dataClient, - indexName, - log, - licensing, - headers: request.headers, - credentialStore, - getWorker, - }), + body: result, }); } catch (e) { return mapAnyErrorToKibanaHttpResponse(e); @@ -158,7 +162,7 @@ export function registerReindexIndicesRoutes( licensing, headers: request.headers, credentialStore, - getWorker, + enqueue: true, }); results.started.push(result); } catch (e) { @@ -169,6 +173,11 @@ export function registerReindexIndicesRoutes( } } + if (results.errors.length < indexNames.length) { + // Kick the worker on this node to immediately pickup the batch. + getWorker().forceRefresh(); + } + return response.ok({ body: results }); } ) From 23ab266232344e763c0ca80fcfa609b2c50679b1 Mon Sep 17 00:00:00 2001 From: Jean-Louis Leysens Date: Wed, 4 Mar 2020 11:34:52 +0100 Subject: [PATCH 10/17] Updated worker logic to handle items in queue in series --- .../server/lib/reindexing/worker.ts | 51 +++++++++++++++---- 1 file changed, 40 insertions(+), 11 deletions(-) diff --git a/x-pack/plugins/upgrade_assistant/server/lib/reindexing/worker.ts b/x-pack/plugins/upgrade_assistant/server/lib/reindexing/worker.ts index bad6db62efe41..3837991254122 100644 --- a/x-pack/plugins/upgrade_assistant/server/lib/reindexing/worker.ts +++ b/x-pack/plugins/upgrade_assistant/server/lib/reindexing/worker.ts @@ -5,7 +5,6 @@ */ import { IClusterClient, Logger, SavedObjectsClientContract, FakeRequest } from 'src/core/server'; import moment from 'moment'; - import { ReindexSavedObject, ReindexStatus } from '../../../common/types'; import { CredentialStore } from './credential_store'; import { reindexActionsFactory } from './reindex_actions'; @@ -105,15 +104,17 @@ export class ReindexWorker { private startUpdateOperationLoop = async () => { this.updateOperationLoopRunning = true; - while (this.inProgressOps.length > 0) { - this.log.debug(`Updating ${this.inProgressOps.length} reindex operations`); + try { + while (this.inProgressOps.length > 0) { + this.log.debug(`Updating ${this.inProgressOps.length} reindex operations`); - // Push each operation through the state machine and refresh. - await Promise.all(this.inProgressOps.map(this.processNextStep)); - await this.refresh(); + // Push each operation through the state machine and refresh. + await Promise.all(this.inProgressOps.map(this.processNextStep)); + await this.refresh(); + } + } finally { + this.updateOperationLoopRunning = false; } - - this.updateOperationLoopRunning = false; }; private pollForOperations = async () => { @@ -126,14 +127,42 @@ export class ReindexWorker { } }; - private refresh = async () => { + private updateInProgressOps = async () => { try { - this.inProgressOps = await this.reindexService.findAllByStatus(ReindexStatus.inProgress); + const inProgressOps = await this.reindexService.findAllByStatus(ReindexStatus.inProgress); + const parallelOps: ReindexSavedObject[] = []; + const queueOps: ReindexSavedObject[] = []; + for (const inProgressOp of inProgressOps) { + if (inProgressOp.attributes.reindexOptions?.queueSettings) { + queueOps.push(inProgressOp); + } else { + parallelOps.push(inProgressOp); + } + } + + if (queueOps.length) { + const [firstInQueueOp] = queueOps.sort( + (a, b) => + a.attributes.reindexOptions!.queueSettings!.queuedAt - + b.attributes.reindexOptions!.queueSettings!.queuedAt + ); + + this.log.debug( + `Queue detected; current length ${queueOps.length}, current item ReindexOperation(id: ${firstInQueueOp.id}, indexName: ${firstInQueueOp.attributes.indexName})` + ); + + this.inProgressOps = parallelOps.concat(firstInQueueOp); + } else { + this.inProgressOps = parallelOps; + } } catch (e) { - this.log.debug(`Could not fetch reindex operations from Elasticsearch`); + this.log.debug(`Could not fetch reindex operations from Elasticsearch, ${e.message}`); this.inProgressOps = []; } + }; + private refresh = async () => { + await this.updateInProgressOps(); // If there are operations in progress and we're not already updating operations, kick off the update loop if (!this.updateOperationLoopRunning) { this.startUpdateOperationLoop(); From 6cf9acc950723da7ce35fb38c8ab1830ef3fd1ae Mon Sep 17 00:00:00 2001 From: Jean-Louis Leysens Date: Wed, 4 Mar 2020 11:46:19 +0100 Subject: [PATCH 11/17] Refactor /batch endpoint response to "enqueued" not "started" --- .../routes/reindex_indices/reindex_indices.ts | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/x-pack/plugins/upgrade_assistant/server/routes/reindex_indices/reindex_indices.ts b/x-pack/plugins/upgrade_assistant/server/routes/reindex_indices/reindex_indices.ts index 21034355c1144..fe013b8ed514f 100644 --- a/x-pack/plugins/upgrade_assistant/server/routes/reindex_indices/reindex_indices.ts +++ b/x-pack/plugins/upgrade_assistant/server/routes/reindex_indices/reindex_indices.ts @@ -41,6 +41,11 @@ interface CreateReindexWorker { licensing: LicensingPluginSetup; } +interface BatchResponse { + enqueued: ReindexOperation[]; + errors: Array<{ indexName: string; message: string }>; +} + export function createReindexWorker({ logger, elasticsearchService, @@ -148,9 +153,9 @@ export function registerReindexIndicesRoutes( response ) => { const { indexNames } = request.body; - const results = { - started: [] as ReindexOperation[], - errors: [] as Array<{ indexName: string; message: string }>, + const results: BatchResponse = { + enqueued: [], + errors: [], }; for (const indexName of indexNames) { try { @@ -164,7 +169,7 @@ export function registerReindexIndicesRoutes( credentialStore, enqueue: true, }); - results.started.push(result); + results.enqueued.push(result); } catch (e) { results.errors.push({ indexName, From 02ae1b260e6c745227fdc73b0349c441ed61a37d Mon Sep 17 00:00:00 2001 From: Jean-Louis Leysens Date: Wed, 4 Mar 2020 12:05:26 +0100 Subject: [PATCH 12/17] Fixed jest tests --- .../lib/reindexing/reindex_service.test.ts | 2 +- .../reindex_indices/reindex_indices.test.ts | 39 +++++++++++++++---- 2 files changed, 32 insertions(+), 9 deletions(-) diff --git a/x-pack/plugins/upgrade_assistant/server/lib/reindexing/reindex_service.test.ts b/x-pack/plugins/upgrade_assistant/server/lib/reindexing/reindex_service.test.ts index 6c3b2c869dc7f..886ea6761e3b7 100644 --- a/x-pack/plugins/upgrade_assistant/server/lib/reindexing/reindex_service.test.ts +++ b/x-pack/plugins/upgrade_assistant/server/lib/reindexing/reindex_service.test.ts @@ -215,7 +215,7 @@ describe('reindexService', () => { await service.createReindexOperation('myIndex'); - expect(actions.createReindexOp).toHaveBeenCalledWith('myIndex'); + expect(actions.createReindexOp).toHaveBeenCalledWith('myIndex', undefined); }); it('fails if index does not exist', async () => { diff --git a/x-pack/plugins/upgrade_assistant/server/routes/reindex_indices/reindex_indices.test.ts b/x-pack/plugins/upgrade_assistant/server/routes/reindex_indices/reindex_indices.test.ts index adf5b130b108e..fb4fab739a650 100644 --- a/x-pack/plugins/upgrade_assistant/server/routes/reindex_indices/reindex_indices.test.ts +++ b/x-pack/plugins/upgrade_assistant/server/routes/reindex_indices/reindex_indices.test.ts @@ -166,7 +166,7 @@ describe('reindex API', () => { ); // It called create correctly - expect(mockReindexService.createReindexOperation).toHaveBeenCalledWith('theIndex'); + expect(mockReindexService.createReindexOperation).toHaveBeenCalledWith('theIndex', undefined); // It returned the right results expect(resp.status).toEqual(200); @@ -261,6 +261,9 @@ describe('reindex API', () => { }); describe('POST /api/upgrade_assistant/reindex/batch', () => { + const queueSettingsArg = { + queueSettings: { queuedAt: expect.any(Number) }, + }; it('creates a collection of index operations', async () => { mockReindexService.createReindexOperation .mockResolvedValueOnce({ @@ -283,16 +286,28 @@ describe('reindex API', () => { ); // It called create correctly - expect(mockReindexService.createReindexOperation).toHaveBeenNthCalledWith(1, 'theIndex1'); - expect(mockReindexService.createReindexOperation).toHaveBeenNthCalledWith(2, 'theIndex2'); - expect(mockReindexService.createReindexOperation).toHaveBeenNthCalledWith(3, 'theIndex3'); + expect(mockReindexService.createReindexOperation).toHaveBeenNthCalledWith( + 1, + 'theIndex1', + queueSettingsArg + ); + expect(mockReindexService.createReindexOperation).toHaveBeenNthCalledWith( + 2, + 'theIndex2', + queueSettingsArg + ); + expect(mockReindexService.createReindexOperation).toHaveBeenNthCalledWith( + 3, + 'theIndex3', + queueSettingsArg + ); // It returned the right results expect(resp.status).toEqual(200); const data = resp.payload; expect(data).toEqual({ errors: [], - started: [ + enqueued: [ { indexName: 'theIndex1' }, { indexName: 'theIndex2' }, { indexName: 'theIndex3' }, @@ -323,8 +338,16 @@ describe('reindex API', () => { // It called create correctly expect(mockReindexService.createReindexOperation).toHaveBeenCalledTimes(2); - expect(mockReindexService.createReindexOperation).toHaveBeenNthCalledWith(1, 'theIndex1'); - expect(mockReindexService.createReindexOperation).toHaveBeenNthCalledWith(2, 'theIndex3'); + expect(mockReindexService.createReindexOperation).toHaveBeenNthCalledWith( + 1, + 'theIndex1', + queueSettingsArg + ); + expect(mockReindexService.createReindexOperation).toHaveBeenNthCalledWith( + 2, + 'theIndex3', + queueSettingsArg + ); // It returned the right results expect(resp.status).toEqual(200); @@ -337,7 +360,7 @@ describe('reindex API', () => { }, { indexName: 'theIndex3', message: 'oops!' }, ], - started: [{ indexName: 'theIndex1' }], + enqueued: [{ indexName: 'theIndex1' }], }); }); }); From 4d57be9241e182bf493ba0cfdb3a25e2ebc81daa Mon Sep 17 00:00:00 2001 From: Jean-Louis Leysens Date: Thu, 5 Mar 2020 10:23:06 +0100 Subject: [PATCH 13/17] Refactor worker refresh operations for readability Created a new file op_utils where logic repsonsible for sorting and ordering reindex operation saved objects is. --- .../server/lib/reindexing/op_utils.ts | 56 +++++++++++++++++++ .../server/lib/reindexing/worker.ts | 27 +++------ 2 files changed, 63 insertions(+), 20 deletions(-) create mode 100644 x-pack/plugins/upgrade_assistant/server/lib/reindexing/op_utils.ts diff --git a/x-pack/plugins/upgrade_assistant/server/lib/reindexing/op_utils.ts b/x-pack/plugins/upgrade_assistant/server/lib/reindexing/op_utils.ts new file mode 100644 index 0000000000000..dbed7de13f010 --- /dev/null +++ b/x-pack/plugins/upgrade_assistant/server/lib/reindexing/op_utils.ts @@ -0,0 +1,56 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +import { flow } from 'fp-ts/lib/function'; +import { ReindexSavedObject } from '../../../common/types'; + +export interface SortedReindexSavedObjects { + /** + * Reindex objects sorted into this array represent Elasticsearch reindex tasks that + * have no inherent order and are considered to be processed in parallel. + */ + parallel: ReindexSavedObject[]; + + /** + * Reindex objects sorted into this array represent Elasticsearch reindex tasks that + * are consistently ordered (see {@link orderQueuedReindexOperations}) and should be + * processed in order. + */ + queue: ReindexSavedObject[]; +} + +const sortReindexOperations = (ops: ReindexSavedObject[]): SortedReindexSavedObjects => { + const parallel: ReindexSavedObject[] = []; + const queue: ReindexSavedObject[] = []; + for (const op of ops) { + if (op.attributes.reindexOptions?.queueSettings) { + queue.push(op); + } else { + parallel.push(op); + } + } + + return { + parallel, + queue, + }; +}; +const orderQueuedReindexOperations = ({ + parallel, + queue, +}: SortedReindexSavedObjects): SortedReindexSavedObjects => ({ + parallel, + // Sort asc + queue: queue.sort( + (a, b) => + a.attributes.reindexOptions!.queueSettings!.queuedAt - + b.attributes.reindexOptions!.queueSettings!.queuedAt + ), +}); + +export const sortAndOrderReindexOperations = flow( + sortReindexOperations, + orderQueuedReindexOperations +); diff --git a/x-pack/plugins/upgrade_assistant/server/lib/reindexing/worker.ts b/x-pack/plugins/upgrade_assistant/server/lib/reindexing/worker.ts index 3837991254122..482b9f280ad7e 100644 --- a/x-pack/plugins/upgrade_assistant/server/lib/reindexing/worker.ts +++ b/x-pack/plugins/upgrade_assistant/server/lib/reindexing/worker.ts @@ -10,6 +10,7 @@ import { CredentialStore } from './credential_store'; import { reindexActionsFactory } from './reindex_actions'; import { ReindexService, reindexServiceFactory } from './reindex_service'; import { LicensingPluginSetup } from '../../../../licensing/server'; +import { sortAndOrderReindexOperations } from './op_utils'; const POLL_INTERVAL = 30000; // If no nodes have been able to update this index in 2 minutes (due to missing credentials), set to paused. @@ -130,31 +131,17 @@ export class ReindexWorker { private updateInProgressOps = async () => { try { const inProgressOps = await this.reindexService.findAllByStatus(ReindexStatus.inProgress); - const parallelOps: ReindexSavedObject[] = []; - const queueOps: ReindexSavedObject[] = []; - for (const inProgressOp of inProgressOps) { - if (inProgressOp.attributes.reindexOptions?.queueSettings) { - queueOps.push(inProgressOp); - } else { - parallelOps.push(inProgressOp); - } - } + const { parallel, queue } = sortAndOrderReindexOperations(inProgressOps); - if (queueOps.length) { - const [firstInQueueOp] = queueOps.sort( - (a, b) => - a.attributes.reindexOptions!.queueSettings!.queuedAt - - b.attributes.reindexOptions!.queueSettings!.queuedAt - ); + const [firstOpInQueue] = queue; + if (firstOpInQueue) { this.log.debug( - `Queue detected; current length ${queueOps.length}, current item ReindexOperation(id: ${firstInQueueOp.id}, indexName: ${firstInQueueOp.attributes.indexName})` + `Queue detected; current length ${queue.length}, current item ReindexOperation(id: ${firstOpInQueue.id}, indexName: ${firstOpInQueue.attributes.indexName})` ); - - this.inProgressOps = parallelOps.concat(firstInQueueOp); - } else { - this.inProgressOps = parallelOps; } + + this.inProgressOps = parallel.concat(firstOpInQueue ? [firstOpInQueue] : []); } catch (e) { this.log.debug(`Could not fetch reindex operations from Elasticsearch, ${e.message}`); this.inProgressOps = []; From 0fd95c2cc466112f126133dacbc52d66d4a4e4a0 Mon Sep 17 00:00:00 2001 From: Jean-Louis Leysens Date: Thu, 5 Mar 2020 10:52:49 +0100 Subject: [PATCH 14/17] Add batch API integration test Also assert that reindexing is happening in the expected order --- .../upgrade_assistant/reindexing.js | 59 +++++++++++++++++++ 1 file changed, 59 insertions(+) diff --git a/x-pack/test/upgrade_assistant_integration/upgrade_assistant/reindexing.js b/x-pack/test/upgrade_assistant_integration/upgrade_assistant/reindexing.js index 38fc1f0c6356f..95b89f9cf37cc 100644 --- a/x-pack/test/upgrade_assistant_integration/upgrade_assistant/reindexing.js +++ b/x-pack/test/upgrade_assistant_integration/upgrade_assistant/reindexing.js @@ -7,6 +7,7 @@ import expect from '@kbn/expect'; import { ReindexStatus, REINDEX_OP_TYPE } from '../../../plugins/upgrade_assistant/common/types'; +import { generateNewIndexName } from '../../../plugins/upgrade_assistant/server/lib/reindexing/index_settings'; export default function({ getService }) { const supertest = getService('supertest'); @@ -31,6 +32,18 @@ export default function({ getService }) { return lastState; }; + const assertInProgress = async indexName => { + const lastState = ( + await supertest.get(`/api/upgrade_assistant/reindex/${indexName}`).expect(200) + ).body.reindexOp; + + if (lastState.status !== ReindexStatus.inProgress) { + throw new Error( + `${indexName} status ${lastState.status}, expected ${ReindexStatus.inProgress}` + ); + } + }; + describe('reindexing', () => { afterEach(() => { // Cleanup saved objects @@ -134,5 +147,51 @@ export default function({ getService }) { expect(lastState.errorMessage).to.equal(null); expect(lastState.status).to.equal(ReindexStatus.completed); }); + + it('should reindex a batch in order', async () => { + const test1 = 'batch-reindex-test1'; + const test2 = 'batch-reindex-test2'; + const test3 = 'batch-reindex-test3'; + + const cleanupReindex = async indexName => { + try { + await es.indices.delete({ index: generateNewIndexName(indexName) }); + } catch (e) { + try { + await es.indices.delete({ index: indexName }); + } catch (e) { + // Ignore + } + } + }; + + try { + await es.indices.create({ index: test1 }); + await es.indices.create({ index: test2 }); + await es.indices.create({ index: test3 }); + + const result = await supertest + .post(`/api/upgrade_assistant/reindex/batch`) + .set('kbn-xsrf', 'xxx') + .send({ indexNames: [test1, test2, test3] }) + .expect(200); + + expect(result.body.enqueued.length).to.equal(3); + expect(result.body.errors.length).to.equal(0); + + await assertInProgress(test1); + await waitForReindexToComplete(test1); + + await assertInProgress(test2); + await waitForReindexToComplete(test2); + + await assertInProgress(test3); + await waitForReindexToComplete(test3); + } finally { + await cleanupReindex(test1); + await cleanupReindex(test2); + await cleanupReindex(test3); + } + }); }); } From 2f294041a94b34271ab66a5fffcef7d9a0e6cfcd Mon Sep 17 00:00:00 2001 From: Jean-Louis Leysens Date: Thu, 5 Mar 2020 11:49:56 +0100 Subject: [PATCH 15/17] Added a new endpoint: GET batch/queue This allows users of the API to see what the current queue state is for visibility. Using the queue endpoint int he API integration tests for batch too. --- .../routes/reindex_indices/reindex_handler.ts | 4 +- .../routes/reindex_indices/reindex_indices.ts | 54 +++++++++++++++---- .../server/routes/reindex_indices/types.ts | 19 +++++++ .../upgrade_assistant/reindexing.js | 42 +++++++++------ 4 files changed, 90 insertions(+), 29 deletions(-) create mode 100644 x-pack/plugins/upgrade_assistant/server/routes/reindex_indices/types.ts diff --git a/x-pack/plugins/upgrade_assistant/server/routes/reindex_indices/reindex_handler.ts b/x-pack/plugins/upgrade_assistant/server/routes/reindex_indices/reindex_handler.ts index f54d0d3b06526..cd46ada097e57 100644 --- a/x-pack/plugins/upgrade_assistant/server/routes/reindex_indices/reindex_handler.ts +++ b/x-pack/plugins/upgrade_assistant/server/routes/reindex_indices/reindex_handler.ts @@ -8,7 +8,7 @@ import { IScopedClusterClient, Logger, SavedObjectsClientContract } from 'kibana import { LicensingPluginSetup } from '../../../../licensing/server'; -import { ReindexOptions, ReindexStatus } from '../../../common/types'; +import { ReindexOperation, ReindexOptions, ReindexStatus } from '../../../common/types'; import { reindexActionsFactory } from '../../lib/reindexing/reindex_actions'; import { reindexServiceFactory } from '../../lib/reindexing'; @@ -35,7 +35,7 @@ export const reindexHandler = async ({ log, savedObjects, enqueue, -}: ReindexHandlerArgs) => { +}: ReindexHandlerArgs): ReindexOperation => { const callAsCurrentUser = dataClient.callAsCurrentUser.bind(dataClient); const reindexActions = reindexActionsFactory(savedObjects, callAsCurrentUser); const reindexService = reindexServiceFactory(callAsCurrentUser, reindexActions, log, licensing); diff --git a/x-pack/plugins/upgrade_assistant/server/routes/reindex_indices/reindex_indices.ts b/x-pack/plugins/upgrade_assistant/server/routes/reindex_indices/reindex_indices.ts index fe013b8ed514f..9bd7a24e204d0 100644 --- a/x-pack/plugins/upgrade_assistant/server/routes/reindex_indices/reindex_indices.ts +++ b/x-pack/plugins/upgrade_assistant/server/routes/reindex_indices/reindex_indices.ts @@ -5,33 +5,35 @@ */ import { schema } from '@kbn/config-schema'; import { - Logger, ElasticsearchServiceSetup, - SavedObjectsClient, kibanaResponseFactory, + Logger, + SavedObjectsClient, } from '../../../../../../src/core/server'; import { LicensingPluginSetup } from '../../../../licensing/server'; -import { ReindexOperation } from '../../../common/types'; +import { ReindexStatus } from '../../../common/types'; import { versionCheckHandlerWrapper } from '../../lib/es_version_precheck'; import { reindexServiceFactory, ReindexWorker } from '../../lib/reindexing'; import { CredentialStore } from '../../lib/reindexing/credential_store'; import { reindexActionsFactory } from '../../lib/reindexing/reindex_actions'; +import { sortAndOrderReindexOperations } from '../../lib/reindexing/op_utils'; import { ReindexError } from '../../lib/reindexing/error'; import { RouteDependencies } from '../../types'; import { AccessForbidden, - IndexNotFound, CannotCreateIndex, + IndexNotFound, + MultipleReindexJobsFound, ReindexAlreadyInProgress, ReindexTaskCannotBeDeleted, ReindexTaskFailed, - MultipleReindexJobsFound, } from '../../lib/reindexing/error_symbols'; import { reindexHandler } from './reindex_handler'; +import { GetBatchQueueResponse, PostBatchResponse } from './types'; interface CreateReindexWorker { logger: Logger; @@ -41,11 +43,6 @@ interface CreateReindexWorker { licensing: LicensingPluginSetup; } -interface BatchResponse { - enqueued: ReindexOperation[]; - errors: Array<{ indexName: string; message: string }>; -} - export function createReindexWorker({ logger, elasticsearchService, @@ -132,6 +129,41 @@ export function registerReindexIndicesRoutes( ) ); + // Get the current batch queue + router.get( + { + path: `${BASE_PATH}/batch/queue`, + validate: {}, + }, + async ( + { + core: { + elasticsearch: { dataClient }, + savedObjects, + }, + }, + request, + response + ) => { + const { client } = savedObjects; + const callAsCurrentUser = dataClient.callAsCurrentUser.bind(dataClient); + const reindexActions = reindexActionsFactory(client, callAsCurrentUser); + try { + const inProgressOps = await reindexActions.findAllByStatus(ReindexStatus.inProgress); + const { queue } = sortAndOrderReindexOperations(inProgressOps); + const result: GetBatchQueueResponse = { + queue: queue.map(savedObject => savedObject.attributes), + }; + return response.ok({ + body: result, + }); + } catch (e) { + return mapAnyErrorToKibanaHttpResponse(e); + } + } + ); + + // Add indices for reindexing to the worker's batch router.post( { path: `${BASE_PATH}/batch`, @@ -153,7 +185,7 @@ export function registerReindexIndicesRoutes( response ) => { const { indexNames } = request.body; - const results: BatchResponse = { + const results: PostBatchResponse = { enqueued: [], errors: [], }; diff --git a/x-pack/plugins/upgrade_assistant/server/routes/reindex_indices/types.ts b/x-pack/plugins/upgrade_assistant/server/routes/reindex_indices/types.ts new file mode 100644 index 0000000000000..251450a9e37f2 --- /dev/null +++ b/x-pack/plugins/upgrade_assistant/server/routes/reindex_indices/types.ts @@ -0,0 +1,19 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import { ReindexOperation } from '../../../common/types'; + +// These types represent contracts from the reindex RESTful API endpoints and +// should be changed in a way that respects backwards compatibility. + +export interface PostBatchResponse { + enqueued: ReindexOperation[]; + errors: Array<{ indexName: string; message: string }>; +} + +export interface GetBatchQueueResponse { + queue: ReindexOperation[]; +} diff --git a/x-pack/test/upgrade_assistant_integration/upgrade_assistant/reindexing.js b/x-pack/test/upgrade_assistant_integration/upgrade_assistant/reindexing.js index 95b89f9cf37cc..a99c02ffef23e 100644 --- a/x-pack/test/upgrade_assistant_integration/upgrade_assistant/reindexing.js +++ b/x-pack/test/upgrade_assistant_integration/upgrade_assistant/reindexing.js @@ -32,18 +32,6 @@ export default function({ getService }) { return lastState; }; - const assertInProgress = async indexName => { - const lastState = ( - await supertest.get(`/api/upgrade_assistant/reindex/${indexName}`).expect(200) - ).body.reindexOp; - - if (lastState.status !== ReindexStatus.inProgress) { - throw new Error( - `${indexName} status ${lastState.status}, expected ${ReindexStatus.inProgress}` - ); - } - }; - describe('reindexing', () => { afterEach(() => { // Cleanup saved objects @@ -148,7 +136,26 @@ export default function({ getService }) { expect(lastState.status).to.equal(ReindexStatus.completed); }); - it('should reindex a batch in order', async () => { + it('should reindex a batch in order and report queue state', async () => { + const assertQueueState = async (firstInQueueIndexName, queueLength) => { + const response = await supertest + .get(`/api/upgrade_assistant/reindex/batch/queue`) + .set('kbn-xsrf', 'xxx') + .expect(200); + + const { queue } = response.body; + + const [firstInQueue] = queue; + + if (!firstInQueueIndexName) { + expect(firstInQueueIndexName).to.be(undefined); + } else { + expect(firstInQueue.indexName).to.be(firstInQueueIndexName); + } + + expect(queue.length).to.be(queueLength); + }; + const test1 = 'batch-reindex-test1'; const test2 = 'batch-reindex-test2'; const test3 = 'batch-reindex-test3'; @@ -166,6 +173,7 @@ export default function({ getService }) { }; try { + // Set up indices for the batch await es.indices.create({ index: test1 }); await es.indices.create({ index: test2 }); await es.indices.create({ index: test3 }); @@ -179,14 +187,16 @@ export default function({ getService }) { expect(result.body.enqueued.length).to.equal(3); expect(result.body.errors.length).to.equal(0); - await assertInProgress(test1); + await assertQueueState(test1, 3); await waitForReindexToComplete(test1); - await assertInProgress(test2); + await assertQueueState(test2, 2); await waitForReindexToComplete(test2); - await assertInProgress(test3); + await assertQueueState(test3, 1); await waitForReindexToComplete(test3); + + await assertQueueState(undefined, 0); } finally { await cleanupReindex(test1); await cleanupReindex(test2); From e0f3a0e773cd14444abba4b6f65955a65ee4c812 Mon Sep 17 00:00:00 2001 From: Jean-Louis Leysens Date: Thu, 5 Mar 2020 12:18:29 +0100 Subject: [PATCH 16/17] Reset the queuedAt timestamp on resume If a reindexOperation is being resumed and put in a queue we also need to reset the queuedAt timestamp to respect the new batch queue ordering. --- .../server/lib/reindexing/error.ts | 2 ++ .../server/lib/reindexing/error_symbols.ts | 1 + .../server/lib/reindexing/reindex_service.ts | 20 ++++++++++++------- .../routes/reindex_indices/reindex_handler.ts | 4 ++-- .../routes/reindex_indices/reindex_indices.ts | 2 ++ 5 files changed, 20 insertions(+), 9 deletions(-) diff --git a/x-pack/plugins/upgrade_assistant/server/lib/reindexing/error.ts b/x-pack/plugins/upgrade_assistant/server/lib/reindexing/error.ts index b7bc197fbd162..59922abd3e635 100644 --- a/x-pack/plugins/upgrade_assistant/server/lib/reindexing/error.ts +++ b/x-pack/plugins/upgrade_assistant/server/lib/reindexing/error.ts @@ -12,6 +12,7 @@ import { ReindexTaskFailed, ReindexAlreadyInProgress, MultipleReindexJobsFound, + ReindexCannotBeCancelled, } from './error_symbols'; export class ReindexError extends Error { @@ -32,4 +33,5 @@ export const error = { reindexTaskCannotBeDeleted: createErrorFactory(ReindexTaskCannotBeDeleted), reindexAlreadyInProgress: createErrorFactory(ReindexAlreadyInProgress), multipleReindexJobsFound: createErrorFactory(MultipleReindexJobsFound), + reindexCannotBeCancelled: createErrorFactory(ReindexCannotBeCancelled), }; diff --git a/x-pack/plugins/upgrade_assistant/server/lib/reindexing/error_symbols.ts b/x-pack/plugins/upgrade_assistant/server/lib/reindexing/error_symbols.ts index 9e49d280d1be2..d5e8d643f4595 100644 --- a/x-pack/plugins/upgrade_assistant/server/lib/reindexing/error_symbols.ts +++ b/x-pack/plugins/upgrade_assistant/server/lib/reindexing/error_symbols.ts @@ -11,5 +11,6 @@ export const CannotCreateIndex = Symbol('CannotCreateIndex'); export const ReindexTaskFailed = Symbol('ReindexTaskFailed'); export const ReindexTaskCannotBeDeleted = Symbol('ReindexTaskCannotBeDeleted'); export const ReindexAlreadyInProgress = Symbol('ReindexAlreadyInProgress'); +export const ReindexCannotBeCancelled = Symbol('ReindexCannotBeCancelled'); export const MultipleReindexJobsFound = Symbol('MultipleReindexJobsFound'); diff --git a/x-pack/plugins/upgrade_assistant/server/lib/reindexing/reindex_service.ts b/x-pack/plugins/upgrade_assistant/server/lib/reindexing/reindex_service.ts index 8997c9c2008c5..aa91b925b744b 100644 --- a/x-pack/plugins/upgrade_assistant/server/lib/reindexing/reindex_service.ts +++ b/x-pack/plugins/upgrade_assistant/server/lib/reindexing/reindex_service.ts @@ -85,8 +85,9 @@ export interface ReindexService { /** * Resumes the paused reindex operation for a given index. * @param indexName + * @param opts As with {@link createReindexOperation} we support this setting. */ - resumeReindexOperation(indexName: string): Promise; + resumeReindexOperation(indexName: string, opts?: ReindexOptions): Promise; /** * Cancel an in-progress reindex operation for a given index. Only allowed when the @@ -629,7 +630,7 @@ export const reindexServiceFactory = ( }); }, - async resumeReindexOperation(indexName: string) { + async resumeReindexOperation(indexName: string, opts?: ReindexOptions) { const reindexOp = await this.findReindexOperation(indexName); if (!reindexOp) { @@ -644,7 +645,10 @@ export const reindexServiceFactory = ( throw new Error(`Reindex operation must be paused in order to be resumed.`); } - return actions.updateReindexOp(op, { status: ReindexStatus.inProgress }); + return actions.updateReindexOp(op, { + status: ReindexStatus.inProgress, + reindexOptions: opts, + }); }); }, @@ -652,11 +656,13 @@ export const reindexServiceFactory = ( const reindexOp = await this.findReindexOperation(indexName); if (!reindexOp) { - throw new Error(`No reindex operation found for index ${indexName}`); + throw error.indexNotFound(`No reindex operation found for index ${indexName}`); } else if (reindexOp.attributes.status !== ReindexStatus.inProgress) { - throw new Error(`Reindex operation is not in progress`); + throw error.reindexCannotBeCancelled(`Reindex operation is not in progress`); } else if (reindexOp.attributes.lastCompletedStep !== ReindexStep.reindexStarted) { - throw new Error(`Reindex operation is not current waiting for reindex task to complete`); + throw error.reindexCannotBeCancelled( + `Reindex operation is not currently waiting for reindex task to complete` + ); } const resp = await callAsUser('tasks.cancel', { @@ -664,7 +670,7 @@ export const reindexServiceFactory = ( }); if (resp.node_failures && resp.node_failures.length > 0) { - throw new Error(`Could not cancel reindex.`); + throw error.reindexCannotBeCancelled(`Could not cancel reindex.`); } return reindexOp; diff --git a/x-pack/plugins/upgrade_assistant/server/routes/reindex_indices/reindex_handler.ts b/x-pack/plugins/upgrade_assistant/server/routes/reindex_indices/reindex_handler.ts index cd46ada097e57..944b4a225d442 100644 --- a/x-pack/plugins/upgrade_assistant/server/routes/reindex_indices/reindex_handler.ts +++ b/x-pack/plugins/upgrade_assistant/server/routes/reindex_indices/reindex_handler.ts @@ -35,7 +35,7 @@ export const reindexHandler = async ({ log, savedObjects, enqueue, -}: ReindexHandlerArgs): ReindexOperation => { +}: ReindexHandlerArgs): Promise => { const callAsCurrentUser = dataClient.callAsCurrentUser.bind(dataClient); const reindexActions = reindexActionsFactory(savedObjects, callAsCurrentUser); const reindexService = reindexServiceFactory(callAsCurrentUser, reindexActions, log, licensing); @@ -58,7 +58,7 @@ export const reindexHandler = async ({ // If the reindexOp already exists and it's paused, resume it. Otherwise create a new one. const reindexOp = existingOp && existingOp.attributes.status === ReindexStatus.paused - ? await reindexService.resumeReindexOperation(indexName) + ? await reindexService.resumeReindexOperation(indexName, opts) : await reindexService.createReindexOperation(indexName, opts); // Add users credentials for the worker to use diff --git a/x-pack/plugins/upgrade_assistant/server/routes/reindex_indices/reindex_indices.ts b/x-pack/plugins/upgrade_assistant/server/routes/reindex_indices/reindex_indices.ts index 9bd7a24e204d0..697b73d8e10f6 100644 --- a/x-pack/plugins/upgrade_assistant/server/routes/reindex_indices/reindex_indices.ts +++ b/x-pack/plugins/upgrade_assistant/server/routes/reindex_indices/reindex_indices.ts @@ -28,6 +28,7 @@ import { IndexNotFound, MultipleReindexJobsFound, ReindexAlreadyInProgress, + ReindexCannotBeCancelled, ReindexTaskCannotBeDeleted, ReindexTaskFailed, } from '../../lib/reindexing/error_symbols'; @@ -69,6 +70,7 @@ const mapAnyErrorToKibanaHttpResponse = (e: any) => { return kibanaResponseFactory.customError({ body: e.message, statusCode: 422 }); case ReindexAlreadyInProgress: case MultipleReindexJobsFound: + case ReindexCannotBeCancelled: return kibanaResponseFactory.badRequest({ body: e.message }); default: // nothing matched From 63f66597a69a86b4865b8b6461fd1146147bcb00 Mon Sep 17 00:00:00 2001 From: Jean-Louis Leysens Date: Thu, 5 Mar 2020 14:56:54 +0100 Subject: [PATCH 17/17] Fix jest test Added 'undefined' as the second optional param to resumeIndexOperation call. --- .../server/routes/reindex_indices/reindex_indices.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugins/upgrade_assistant/server/routes/reindex_indices/reindex_indices.test.ts b/x-pack/plugins/upgrade_assistant/server/routes/reindex_indices/reindex_indices.test.ts index fb4fab739a650..af4f7f436ec81 100644 --- a/x-pack/plugins/upgrade_assistant/server/routes/reindex_indices/reindex_indices.test.ts +++ b/x-pack/plugins/upgrade_assistant/server/routes/reindex_indices/reindex_indices.test.ts @@ -233,7 +233,7 @@ describe('reindex API', () => { kibanaResponseFactory ); // It called resume correctly - expect(mockReindexService.resumeReindexOperation).toHaveBeenCalledWith('theIndex'); + expect(mockReindexService.resumeReindexOperation).toHaveBeenCalledWith('theIndex', undefined); expect(mockReindexService.createReindexOperation).not.toHaveBeenCalled(); // It returned the right results