diff --git a/x-pack/plugins/upgrade_assistant/common/types.ts b/x-pack/plugins/upgrade_assistant/common/types.ts index a0c12154988a1..ceb3a6dd60166 100644 --- a/x-pack/plugins/upgrade_assistant/common/types.ts +++ b/x-pack/plugins/upgrade_assistant/common/types.ts @@ -28,6 +28,19 @@ export enum ReindexStatus { } export const REINDEX_OP_TYPE = 'upgrade-assistant-reindex-operation'; + +export interface QueueSettings extends SavedObjectAttributes { + queuedAt: number; +} + +export interface ReindexOptions extends SavedObjectAttributes { + /** + * Set this key to configure a reindex operation as part of a + * batch to be run in series. + */ + queueSettings?: QueueSettings; +} + export interface ReindexOperation extends SavedObjectAttributes { indexName: string; newIndexName: string; @@ -40,6 +53,15 @@ export interface ReindexOperation extends SavedObjectAttributes { // This field is only used for the singleton IndexConsumerType documents. runningReindexCount: number | null; + + /** + * Options for the reindexing strategy. + * + * @remark + * Marked as optional for backwards compatibility. We should still + * be able to handle older ReindexOperation objects. + */ + reindexOptions?: ReindexOptions; } export type ReindexSavedObject = SavedObject; diff --git a/x-pack/plugins/upgrade_assistant/server/lib/es_version_precheck.ts b/x-pack/plugins/upgrade_assistant/server/lib/es_version_precheck.ts index e7636eea66479..6182a82f6f1bd 100644 --- a/x-pack/plugins/upgrade_assistant/server/lib/es_version_precheck.ts +++ b/x-pack/plugins/upgrade_assistant/server/lib/es_version_precheck.ts @@ -90,9 +90,9 @@ export const esVersionCheck = async ( } }; -export const versionCheckHandlerWrapper = (handler: RequestHandler) => async ( +export const versionCheckHandlerWrapper = (handler: RequestHandler) => async ( ctx: RequestHandlerContext, - request: KibanaRequest, + request: KibanaRequest, response: KibanaResponseFactory ) => { const errorResponse = await esVersionCheck(ctx, response); diff --git a/x-pack/plugins/upgrade_assistant/server/lib/reindexing/error.ts b/x-pack/plugins/upgrade_assistant/server/lib/reindexing/error.ts index b7bc197fbd162..59922abd3e635 100644 --- a/x-pack/plugins/upgrade_assistant/server/lib/reindexing/error.ts +++ b/x-pack/plugins/upgrade_assistant/server/lib/reindexing/error.ts @@ -12,6 +12,7 @@ import { ReindexTaskFailed, ReindexAlreadyInProgress, MultipleReindexJobsFound, + ReindexCannotBeCancelled, } from './error_symbols'; export class ReindexError extends Error { @@ -32,4 +33,5 @@ export const error = { reindexTaskCannotBeDeleted: createErrorFactory(ReindexTaskCannotBeDeleted), reindexAlreadyInProgress: createErrorFactory(ReindexAlreadyInProgress), multipleReindexJobsFound: createErrorFactory(MultipleReindexJobsFound), + reindexCannotBeCancelled: createErrorFactory(ReindexCannotBeCancelled), }; diff --git a/x-pack/plugins/upgrade_assistant/server/lib/reindexing/error_symbols.ts b/x-pack/plugins/upgrade_assistant/server/lib/reindexing/error_symbols.ts index 9e49d280d1be2..d5e8d643f4595 100644 --- a/x-pack/plugins/upgrade_assistant/server/lib/reindexing/error_symbols.ts +++ b/x-pack/plugins/upgrade_assistant/server/lib/reindexing/error_symbols.ts @@ -11,5 +11,6 @@ export const CannotCreateIndex = Symbol('CannotCreateIndex'); export const ReindexTaskFailed = Symbol('ReindexTaskFailed'); export const ReindexTaskCannotBeDeleted = Symbol('ReindexTaskCannotBeDeleted'); export const ReindexAlreadyInProgress = Symbol('ReindexAlreadyInProgress'); +export const ReindexCannotBeCancelled = Symbol('ReindexCannotBeCancelled'); export const MultipleReindexJobsFound = Symbol('MultipleReindexJobsFound'); diff --git a/x-pack/plugins/upgrade_assistant/server/lib/reindexing/op_utils.ts b/x-pack/plugins/upgrade_assistant/server/lib/reindexing/op_utils.ts new file mode 100644 index 0000000000000..dbed7de13f010 --- /dev/null +++ b/x-pack/plugins/upgrade_assistant/server/lib/reindexing/op_utils.ts @@ -0,0 +1,56 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +import { flow } from 'fp-ts/lib/function'; +import { ReindexSavedObject } from '../../../common/types'; + +export interface SortedReindexSavedObjects { + /** + * Reindex objects sorted into this array represent Elasticsearch reindex tasks that + * have no inherent order and are considered to be processed in parallel. + */ + parallel: ReindexSavedObject[]; + + /** + * Reindex objects sorted into this array represent Elasticsearch reindex tasks that + * are consistently ordered (see {@link orderQueuedReindexOperations}) and should be + * processed in order. + */ + queue: ReindexSavedObject[]; +} + +const sortReindexOperations = (ops: ReindexSavedObject[]): SortedReindexSavedObjects => { + const parallel: ReindexSavedObject[] = []; + const queue: ReindexSavedObject[] = []; + for (const op of ops) { + if (op.attributes.reindexOptions?.queueSettings) { + queue.push(op); + } else { + parallel.push(op); + } + } + + return { + parallel, + queue, + }; +}; +const orderQueuedReindexOperations = ({ + parallel, + queue, +}: SortedReindexSavedObjects): SortedReindexSavedObjects => ({ + parallel, + // Sort asc + queue: queue.sort( + (a, b) => + a.attributes.reindexOptions!.queueSettings!.queuedAt - + b.attributes.reindexOptions!.queueSettings!.queuedAt + ), +}); + +export const sortAndOrderReindexOperations = flow( + sortReindexOperations, + orderQueuedReindexOperations +); diff --git a/x-pack/plugins/upgrade_assistant/server/lib/reindexing/reindex_actions.ts b/x-pack/plugins/upgrade_assistant/server/lib/reindexing/reindex_actions.ts index 2ae340f12d80c..422e78c2f12ad 100644 --- a/x-pack/plugins/upgrade_assistant/server/lib/reindexing/reindex_actions.ts +++ b/x-pack/plugins/upgrade_assistant/server/lib/reindexing/reindex_actions.ts @@ -11,6 +11,7 @@ import { IndexGroup, REINDEX_OP_TYPE, ReindexOperation, + ReindexOptions, ReindexSavedObject, ReindexStatus, ReindexStep, @@ -34,8 +35,9 @@ export interface ReindexActions { /** * Creates a new reindexOp, does not perform any pre-flight checks. * @param indexName + * @param opts Options for the reindex operation */ - createReindexOp(indexName: string): Promise; + createReindexOp(indexName: string, opts?: ReindexOptions): Promise; /** * Deletes a reindexOp. @@ -150,7 +152,7 @@ export const reindexActionsFactory = ( // ----- Public interface return { - async createReindexOp(indexName: string) { + async createReindexOp(indexName: string, opts?: ReindexOptions) { return client.create(REINDEX_OP_TYPE, { indexName, newIndexName: generateNewIndexName(indexName), @@ -161,6 +163,7 @@ export const reindexActionsFactory = ( reindexTaskPercComplete: null, errorMessage: null, runningReindexCount: null, + reindexOptions: opts, }); }, diff --git a/x-pack/plugins/upgrade_assistant/server/lib/reindexing/reindex_service.test.ts b/x-pack/plugins/upgrade_assistant/server/lib/reindexing/reindex_service.test.ts index 6c3b2c869dc7f..886ea6761e3b7 100644 --- a/x-pack/plugins/upgrade_assistant/server/lib/reindexing/reindex_service.test.ts +++ b/x-pack/plugins/upgrade_assistant/server/lib/reindexing/reindex_service.test.ts @@ -215,7 +215,7 @@ describe('reindexService', () => { await service.createReindexOperation('myIndex'); - expect(actions.createReindexOp).toHaveBeenCalledWith('myIndex'); + expect(actions.createReindexOp).toHaveBeenCalledWith('myIndex', undefined); }); it('fails if index does not exist', async () => { diff --git a/x-pack/plugins/upgrade_assistant/server/lib/reindexing/reindex_service.ts b/x-pack/plugins/upgrade_assistant/server/lib/reindexing/reindex_service.ts index b274743bdf279..aa91b925b744b 100644 --- a/x-pack/plugins/upgrade_assistant/server/lib/reindexing/reindex_service.ts +++ b/x-pack/plugins/upgrade_assistant/server/lib/reindexing/reindex_service.ts @@ -8,6 +8,7 @@ import { first } from 'rxjs/operators'; import { IndexGroup, + ReindexOptions, ReindexSavedObject, ReindexStatus, ReindexStep, @@ -51,8 +52,9 @@ export interface ReindexService { /** * Creates a new reindex operation for a given index. * @param indexName + * @param opts */ - createReindexOperation(indexName: string): Promise; + createReindexOperation(indexName: string, opts?: ReindexOptions): Promise; /** * Retrieves all reindex operations that have the given status. @@ -83,8 +85,9 @@ export interface ReindexService { /** * Resumes the paused reindex operation for a given index. * @param indexName + * @param opts As with {@link createReindexOperation} we support this setting. */ - resumeReindexOperation(indexName: string): Promise; + resumeReindexOperation(indexName: string, opts?: ReindexOptions): Promise; /** * Cancel an in-progress reindex operation for a given index. Only allowed when the @@ -517,7 +520,7 @@ export const reindexServiceFactory = ( } }, - async createReindexOperation(indexName: string) { + async createReindexOperation(indexName: string, opts?: ReindexOptions) { const indexExists = await callAsUser('indices.exists', { index: indexName }); if (!indexExists) { throw error.indexNotFound(`Index ${indexName} does not exist in this cluster.`); @@ -539,7 +542,7 @@ export const reindexServiceFactory = ( } } - return actions.createReindexOp(indexName); + return actions.createReindexOp(indexName, opts); }, async findReindexOperation(indexName: string) { @@ -627,7 +630,7 @@ export const reindexServiceFactory = ( }); }, - async resumeReindexOperation(indexName: string) { + async resumeReindexOperation(indexName: string, opts?: ReindexOptions) { const reindexOp = await this.findReindexOperation(indexName); if (!reindexOp) { @@ -642,7 +645,10 @@ export const reindexServiceFactory = ( throw new Error(`Reindex operation must be paused in order to be resumed.`); } - return actions.updateReindexOp(op, { status: ReindexStatus.inProgress }); + return actions.updateReindexOp(op, { + status: ReindexStatus.inProgress, + reindexOptions: opts, + }); }); }, @@ -650,11 +656,13 @@ export const reindexServiceFactory = ( const reindexOp = await this.findReindexOperation(indexName); if (!reindexOp) { - throw new Error(`No reindex operation found for index ${indexName}`); + throw error.indexNotFound(`No reindex operation found for index ${indexName}`); } else if (reindexOp.attributes.status !== ReindexStatus.inProgress) { - throw new Error(`Reindex operation is not in progress`); + throw error.reindexCannotBeCancelled(`Reindex operation is not in progress`); } else if (reindexOp.attributes.lastCompletedStep !== ReindexStep.reindexStarted) { - throw new Error(`Reindex operation is not current waiting for reindex task to complete`); + throw error.reindexCannotBeCancelled( + `Reindex operation is not currently waiting for reindex task to complete` + ); } const resp = await callAsUser('tasks.cancel', { @@ -662,7 +670,7 @@ export const reindexServiceFactory = ( }); if (resp.node_failures && resp.node_failures.length > 0) { - throw new Error(`Could not cancel reindex.`); + throw error.reindexCannotBeCancelled(`Could not cancel reindex.`); } return reindexOp; diff --git a/x-pack/plugins/upgrade_assistant/server/lib/reindexing/worker.ts b/x-pack/plugins/upgrade_assistant/server/lib/reindexing/worker.ts index bad6db62efe41..482b9f280ad7e 100644 --- a/x-pack/plugins/upgrade_assistant/server/lib/reindexing/worker.ts +++ b/x-pack/plugins/upgrade_assistant/server/lib/reindexing/worker.ts @@ -5,12 +5,12 @@ */ import { IClusterClient, Logger, SavedObjectsClientContract, FakeRequest } from 'src/core/server'; import moment from 'moment'; - import { ReindexSavedObject, ReindexStatus } from '../../../common/types'; import { CredentialStore } from './credential_store'; import { reindexActionsFactory } from './reindex_actions'; import { ReindexService, reindexServiceFactory } from './reindex_service'; import { LicensingPluginSetup } from '../../../../licensing/server'; +import { sortAndOrderReindexOperations } from './op_utils'; const POLL_INTERVAL = 30000; // If no nodes have been able to update this index in 2 minutes (due to missing credentials), set to paused. @@ -105,15 +105,17 @@ export class ReindexWorker { private startUpdateOperationLoop = async () => { this.updateOperationLoopRunning = true; - while (this.inProgressOps.length > 0) { - this.log.debug(`Updating ${this.inProgressOps.length} reindex operations`); + try { + while (this.inProgressOps.length > 0) { + this.log.debug(`Updating ${this.inProgressOps.length} reindex operations`); - // Push each operation through the state machine and refresh. - await Promise.all(this.inProgressOps.map(this.processNextStep)); - await this.refresh(); + // Push each operation through the state machine and refresh. + await Promise.all(this.inProgressOps.map(this.processNextStep)); + await this.refresh(); + } + } finally { + this.updateOperationLoopRunning = false; } - - this.updateOperationLoopRunning = false; }; private pollForOperations = async () => { @@ -126,14 +128,28 @@ export class ReindexWorker { } }; - private refresh = async () => { + private updateInProgressOps = async () => { try { - this.inProgressOps = await this.reindexService.findAllByStatus(ReindexStatus.inProgress); + const inProgressOps = await this.reindexService.findAllByStatus(ReindexStatus.inProgress); + const { parallel, queue } = sortAndOrderReindexOperations(inProgressOps); + + const [firstOpInQueue] = queue; + + if (firstOpInQueue) { + this.log.debug( + `Queue detected; current length ${queue.length}, current item ReindexOperation(id: ${firstOpInQueue.id}, indexName: ${firstOpInQueue.attributes.indexName})` + ); + } + + this.inProgressOps = parallel.concat(firstOpInQueue ? [firstOpInQueue] : []); } catch (e) { - this.log.debug(`Could not fetch reindex operations from Elasticsearch`); + this.log.debug(`Could not fetch reindex operations from Elasticsearch, ${e.message}`); this.inProgressOps = []; } + }; + private refresh = async () => { + await this.updateInProgressOps(); // If there are operations in progress and we're not already updating operations, kick off the update loop if (!this.updateOperationLoopRunning) { this.startUpdateOperationLoop(); diff --git a/x-pack/plugins/upgrade_assistant/server/routes/reindex_indices/index.ts b/x-pack/plugins/upgrade_assistant/server/routes/reindex_indices/index.ts new file mode 100644 index 0000000000000..9f1d3e4021c3f --- /dev/null +++ b/x-pack/plugins/upgrade_assistant/server/routes/reindex_indices/index.ts @@ -0,0 +1,7 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +export { createReindexWorker, registerReindexIndicesRoutes } from './reindex_indices'; diff --git a/x-pack/plugins/upgrade_assistant/server/routes/reindex_indices/reindex_handler.ts b/x-pack/plugins/upgrade_assistant/server/routes/reindex_indices/reindex_handler.ts new file mode 100644 index 0000000000000..944b4a225d442 --- /dev/null +++ b/x-pack/plugins/upgrade_assistant/server/routes/reindex_indices/reindex_handler.ts @@ -0,0 +1,68 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +import { i18n } from '@kbn/i18n'; +import { IScopedClusterClient, Logger, SavedObjectsClientContract } from 'kibana/server'; + +import { LicensingPluginSetup } from '../../../../licensing/server'; + +import { ReindexOperation, ReindexOptions, ReindexStatus } from '../../../common/types'; + +import { reindexActionsFactory } from '../../lib/reindexing/reindex_actions'; +import { reindexServiceFactory } from '../../lib/reindexing'; +import { CredentialStore } from '../../lib/reindexing/credential_store'; +import { error } from '../../lib/reindexing/error'; + +interface ReindexHandlerArgs { + savedObjects: SavedObjectsClientContract; + dataClient: IScopedClusterClient; + indexName: string; + log: Logger; + licensing: LicensingPluginSetup; + headers: Record; + credentialStore: CredentialStore; + enqueue?: boolean; +} + +export const reindexHandler = async ({ + credentialStore, + dataClient, + headers, + indexName, + licensing, + log, + savedObjects, + enqueue, +}: ReindexHandlerArgs): Promise => { + const callAsCurrentUser = dataClient.callAsCurrentUser.bind(dataClient); + const reindexActions = reindexActionsFactory(savedObjects, callAsCurrentUser); + const reindexService = reindexServiceFactory(callAsCurrentUser, reindexActions, log, licensing); + + if (!(await reindexService.hasRequiredPrivileges(indexName))) { + throw error.accessForbidden( + i18n.translate('xpack.upgradeAssistant.reindex.reindexPrivilegesErrorBatch', { + defaultMessage: `You do not have adequate privileges to reindex "{indexName}".`, + values: { indexName }, + }) + ); + } + + const existingOp = await reindexService.findReindexOperation(indexName); + + const opts: ReindexOptions | undefined = enqueue + ? { queueSettings: { queuedAt: Date.now() } } + : undefined; + + // If the reindexOp already exists and it's paused, resume it. Otherwise create a new one. + const reindexOp = + existingOp && existingOp.attributes.status === ReindexStatus.paused + ? await reindexService.resumeReindexOperation(indexName, opts) + : await reindexService.createReindexOperation(indexName, opts); + + // Add users credentials for the worker to use + credentialStore.set(reindexOp, headers); + + return reindexOp.attributes; +}; diff --git a/x-pack/plugins/upgrade_assistant/server/routes/reindex_indices.test.ts b/x-pack/plugins/upgrade_assistant/server/routes/reindex_indices/reindex_indices.test.ts similarity index 70% rename from x-pack/plugins/upgrade_assistant/server/routes/reindex_indices.test.ts rename to x-pack/plugins/upgrade_assistant/server/routes/reindex_indices/reindex_indices.test.ts index 695bb6304cfdf..af4f7f436ec81 100644 --- a/x-pack/plugins/upgrade_assistant/server/routes/reindex_indices.test.ts +++ b/x-pack/plugins/upgrade_assistant/server/routes/reindex_indices/reindex_indices.test.ts @@ -5,9 +5,9 @@ */ import { kibanaResponseFactory } from 'src/core/server'; -import { licensingMock } from '../../../licensing/server/mocks'; -import { createMockRouter, MockRouter, routeHandlerContextMock } from './__mocks__/routes.mock'; -import { createRequestMock } from './__mocks__/request.mock'; +import { licensingMock } from '../../../../licensing/server/mocks'; +import { createMockRouter, MockRouter, routeHandlerContextMock } from '../__mocks__/routes.mock'; +import { createRequestMock } from '../__mocks__/request.mock'; const mockReindexService = { hasRequiredPrivileges: jest.fn(), @@ -21,18 +21,23 @@ const mockReindexService = { cancelReindexing: jest.fn(), }; -jest.mock('../lib/es_version_precheck', () => ({ +jest.mock('../../lib/es_version_precheck', () => ({ versionCheckHandlerWrapper: (a: any) => a, })); -jest.mock('../lib/reindexing', () => { +jest.mock('../../lib/reindexing', () => { return { reindexServiceFactory: () => mockReindexService, }; }); -import { IndexGroup, ReindexSavedObject, ReindexStatus, ReindexWarning } from '../../common/types'; -import { credentialStoreFactory } from '../lib/reindexing/credential_store'; +import { + IndexGroup, + ReindexSavedObject, + ReindexStatus, + ReindexWarning, +} from '../../../common/types'; +import { credentialStoreFactory } from '../../lib/reindexing/credential_store'; import { registerReindexIndicesRoutes } from './reindex_indices'; /** @@ -76,7 +81,7 @@ describe('reindex API', () => { }); afterEach(() => { - jest.resetAllMocks(); + jest.clearAllMocks(); }); describe('GET /api/upgrade_assistant/reindex/{indexName}', () => { @@ -161,7 +166,7 @@ describe('reindex API', () => { ); // It called create correctly - expect(mockReindexService.createReindexOperation).toHaveBeenCalledWith('theIndex'); + expect(mockReindexService.createReindexOperation).toHaveBeenCalledWith('theIndex', undefined); // It returned the right results expect(resp.status).toEqual(200); @@ -228,7 +233,7 @@ describe('reindex API', () => { kibanaResponseFactory ); // It called resume correctly - expect(mockReindexService.resumeReindexOperation).toHaveBeenCalledWith('theIndex'); + expect(mockReindexService.resumeReindexOperation).toHaveBeenCalledWith('theIndex', undefined); expect(mockReindexService.createReindexOperation).not.toHaveBeenCalled(); // It returned the right results @@ -255,6 +260,111 @@ describe('reindex API', () => { }); }); + describe('POST /api/upgrade_assistant/reindex/batch', () => { + const queueSettingsArg = { + queueSettings: { queuedAt: expect.any(Number) }, + }; + it('creates a collection of index operations', async () => { + mockReindexService.createReindexOperation + .mockResolvedValueOnce({ + attributes: { indexName: 'theIndex1' }, + }) + .mockResolvedValueOnce({ + attributes: { indexName: 'theIndex2' }, + }) + .mockResolvedValueOnce({ + attributes: { indexName: 'theIndex3' }, + }); + + const resp = await routeDependencies.router.getHandler({ + method: 'post', + pathPattern: '/api/upgrade_assistant/reindex/batch', + })( + routeHandlerContextMock, + createRequestMock({ body: { indexNames: ['theIndex1', 'theIndex2', 'theIndex3'] } }), + kibanaResponseFactory + ); + + // It called create correctly + expect(mockReindexService.createReindexOperation).toHaveBeenNthCalledWith( + 1, + 'theIndex1', + queueSettingsArg + ); + expect(mockReindexService.createReindexOperation).toHaveBeenNthCalledWith( + 2, + 'theIndex2', + queueSettingsArg + ); + expect(mockReindexService.createReindexOperation).toHaveBeenNthCalledWith( + 3, + 'theIndex3', + queueSettingsArg + ); + + // It returned the right results + expect(resp.status).toEqual(200); + const data = resp.payload; + expect(data).toEqual({ + errors: [], + enqueued: [ + { indexName: 'theIndex1' }, + { indexName: 'theIndex2' }, + { indexName: 'theIndex3' }, + ], + }); + }); + + it('gracefully handles partial successes', async () => { + mockReindexService.createReindexOperation + .mockResolvedValueOnce({ + attributes: { indexName: 'theIndex1' }, + }) + .mockRejectedValueOnce(new Error('oops!')); + + mockReindexService.hasRequiredPrivileges + .mockResolvedValueOnce(true) + .mockResolvedValueOnce(false) + .mockResolvedValueOnce(true); + + const resp = await routeDependencies.router.getHandler({ + method: 'post', + pathPattern: '/api/upgrade_assistant/reindex/batch', + })( + routeHandlerContextMock, + createRequestMock({ body: { indexNames: ['theIndex1', 'theIndex2', 'theIndex3'] } }), + kibanaResponseFactory + ); + + // It called create correctly + expect(mockReindexService.createReindexOperation).toHaveBeenCalledTimes(2); + expect(mockReindexService.createReindexOperation).toHaveBeenNthCalledWith( + 1, + 'theIndex1', + queueSettingsArg + ); + expect(mockReindexService.createReindexOperation).toHaveBeenNthCalledWith( + 2, + 'theIndex3', + queueSettingsArg + ); + + // It returned the right results + expect(resp.status).toEqual(200); + const data = resp.payload; + expect(data).toEqual({ + errors: [ + { + indexName: 'theIndex2', + message: 'You do not have adequate privileges to reindex "theIndex2".', + }, + { indexName: 'theIndex3', message: 'oops!' }, + ], + enqueued: [{ indexName: 'theIndex1' }], + }); + }); + }); + describe('POST /api/upgrade_assistant/reindex/{indexName}/cancel', () => { it('returns a 501', async () => { mockReindexService.cancelReindexing.mockResolvedValueOnce({}); diff --git a/x-pack/plugins/upgrade_assistant/server/routes/reindex_indices.ts b/x-pack/plugins/upgrade_assistant/server/routes/reindex_indices/reindex_indices.ts similarity index 58% rename from x-pack/plugins/upgrade_assistant/server/routes/reindex_indices.ts rename to x-pack/plugins/upgrade_assistant/server/routes/reindex_indices/reindex_indices.ts index 72c2f2c29b72e..697b73d8e10f6 100644 --- a/x-pack/plugins/upgrade_assistant/server/routes/reindex_indices.ts +++ b/x-pack/plugins/upgrade_assistant/server/routes/reindex_indices/reindex_indices.ts @@ -3,31 +3,38 @@ * or more contributor license agreements. Licensed under the Elastic License; * you may not use this file except in compliance with the Elastic License. */ - import { schema } from '@kbn/config-schema'; import { - Logger, ElasticsearchServiceSetup, - SavedObjectsClient, kibanaResponseFactory, -} from '../../../../../src/core/server'; -import { ReindexStatus } from '../../common/types'; -import { versionCheckHandlerWrapper } from '../lib/es_version_precheck'; -import { reindexServiceFactory, ReindexWorker } from '../lib/reindexing'; -import { CredentialStore } from '../lib/reindexing/credential_store'; -import { reindexActionsFactory } from '../lib/reindexing/reindex_actions'; -import { RouteDependencies } from '../types'; -import { LicensingPluginSetup } from '../../../licensing/server'; -import { ReindexError } from '../lib/reindexing/error'; + Logger, + SavedObjectsClient, +} from '../../../../../../src/core/server'; + +import { LicensingPluginSetup } from '../../../../licensing/server'; + +import { ReindexStatus } from '../../../common/types'; + +import { versionCheckHandlerWrapper } from '../../lib/es_version_precheck'; +import { reindexServiceFactory, ReindexWorker } from '../../lib/reindexing'; +import { CredentialStore } from '../../lib/reindexing/credential_store'; +import { reindexActionsFactory } from '../../lib/reindexing/reindex_actions'; +import { sortAndOrderReindexOperations } from '../../lib/reindexing/op_utils'; +import { ReindexError } from '../../lib/reindexing/error'; +import { RouteDependencies } from '../../types'; import { AccessForbidden, - IndexNotFound, CannotCreateIndex, + IndexNotFound, + MultipleReindexJobsFound, ReindexAlreadyInProgress, + ReindexCannotBeCancelled, ReindexTaskCannotBeDeleted, ReindexTaskFailed, - MultipleReindexJobsFound, -} from '../lib/reindexing/error_symbols'; +} from '../../lib/reindexing/error_symbols'; + +import { reindexHandler } from './reindex_handler'; +import { GetBatchQueueResponse, PostBatchResponse } from './types'; interface CreateReindexWorker { logger: Logger; @@ -63,6 +70,7 @@ const mapAnyErrorToKibanaHttpResponse = (e: any) => { return kibanaResponseFactory.customError({ body: e.message, statusCode: 422 }); case ReindexAlreadyInProgress: case MultipleReindexJobsFound: + case ReindexCannotBeCancelled: return kibanaResponseFactory.badRequest({ body: e.message }); default: // nothing matched @@ -91,46 +99,31 @@ export function registerReindexIndicesRoutes( async ( { core: { - savedObjects, + savedObjects: { client: savedObjectsClient }, elasticsearch: { dataClient }, }, }, request, response ) => { - const { indexName } = request.params as any; - const { client } = savedObjects; - const callAsCurrentUser = dataClient.callAsCurrentUser.bind(dataClient); - const reindexActions = reindexActionsFactory(client, callAsCurrentUser); - const reindexService = reindexServiceFactory( - callAsCurrentUser, - reindexActions, - log, - licensing - ); - + const { indexName } = request.params; try { - if (!(await reindexService.hasRequiredPrivileges(indexName))) { - return response.forbidden({ - body: `You do not have adequate privileges to reindex this index.`, - }); - } - - const existingOp = await reindexService.findReindexOperation(indexName); - - // If the reindexOp already exists and it's paused, resume it. Otherwise create a new one. - const reindexOp = - existingOp && existingOp.attributes.status === ReindexStatus.paused - ? await reindexService.resumeReindexOperation(indexName) - : await reindexService.createReindexOperation(indexName); - - // Add users credentials for the worker to use - credentialStore.set(reindexOp, request.headers); + const result = await reindexHandler({ + savedObjects: savedObjectsClient, + dataClient, + indexName, + log, + licensing, + headers: request.headers, + credentialStore, + }); // Kick the worker on this node to immediately pickup the new reindex operation. getWorker().forceRefresh(); - return response.ok({ body: reindexOp.attributes }); + return response.ok({ + body: result, + }); } catch (e) { return mapAnyErrorToKibanaHttpResponse(e); } @@ -138,6 +131,97 @@ export function registerReindexIndicesRoutes( ) ); + // Get the current batch queue + router.get( + { + path: `${BASE_PATH}/batch/queue`, + validate: {}, + }, + async ( + { + core: { + elasticsearch: { dataClient }, + savedObjects, + }, + }, + request, + response + ) => { + const { client } = savedObjects; + const callAsCurrentUser = dataClient.callAsCurrentUser.bind(dataClient); + const reindexActions = reindexActionsFactory(client, callAsCurrentUser); + try { + const inProgressOps = await reindexActions.findAllByStatus(ReindexStatus.inProgress); + const { queue } = sortAndOrderReindexOperations(inProgressOps); + const result: GetBatchQueueResponse = { + queue: queue.map(savedObject => savedObject.attributes), + }; + return response.ok({ + body: result, + }); + } catch (e) { + return mapAnyErrorToKibanaHttpResponse(e); + } + } + ); + + // Add indices for reindexing to the worker's batch + router.post( + { + path: `${BASE_PATH}/batch`, + validate: { + body: schema.object({ + indexNames: schema.arrayOf(schema.string()), + }), + }, + }, + versionCheckHandlerWrapper( + async ( + { + core: { + savedObjects: { client: savedObjectsClient }, + elasticsearch: { dataClient }, + }, + }, + request, + response + ) => { + const { indexNames } = request.body; + const results: PostBatchResponse = { + enqueued: [], + errors: [], + }; + for (const indexName of indexNames) { + try { + const result = await reindexHandler({ + savedObjects: savedObjectsClient, + dataClient, + indexName, + log, + licensing, + headers: request.headers, + credentialStore, + enqueue: true, + }); + results.enqueued.push(result); + } catch (e) { + results.errors.push({ + indexName, + message: e.message, + }); + } + } + + if (results.errors.length < indexNames.length) { + // Kick the worker on this node to immediately pickup the batch. + getWorker().forceRefresh(); + } + + return response.ok({ body: results }); + } + ) + ); + // Get status router.get( { @@ -160,7 +244,7 @@ export function registerReindexIndicesRoutes( response ) => { const { client } = savedObjects; - const { indexName } = request.params as any; + const { indexName } = request.params; const callAsCurrentUser = dataClient.callAsCurrentUser.bind(dataClient); const reindexActions = reindexActionsFactory(client, callAsCurrentUser); const reindexService = reindexServiceFactory( @@ -215,7 +299,7 @@ export function registerReindexIndicesRoutes( request, response ) => { - const { indexName } = request.params as any; + const { indexName } = request.params; const { client } = savedObjects; const callAsCurrentUser = dataClient.callAsCurrentUser.bind(dataClient); const reindexActions = reindexActionsFactory(client, callAsCurrentUser); diff --git a/x-pack/plugins/upgrade_assistant/server/routes/reindex_indices/types.ts b/x-pack/plugins/upgrade_assistant/server/routes/reindex_indices/types.ts new file mode 100644 index 0000000000000..251450a9e37f2 --- /dev/null +++ b/x-pack/plugins/upgrade_assistant/server/routes/reindex_indices/types.ts @@ -0,0 +1,19 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import { ReindexOperation } from '../../../common/types'; + +// These types represent contracts from the reindex RESTful API endpoints and +// should be changed in a way that respects backwards compatibility. + +export interface PostBatchResponse { + enqueued: ReindexOperation[]; + errors: Array<{ indexName: string; message: string }>; +} + +export interface GetBatchQueueResponse { + queue: ReindexOperation[]; +} diff --git a/x-pack/test/upgrade_assistant_integration/upgrade_assistant/reindexing.js b/x-pack/test/upgrade_assistant_integration/upgrade_assistant/reindexing.js index 38fc1f0c6356f..a99c02ffef23e 100644 --- a/x-pack/test/upgrade_assistant_integration/upgrade_assistant/reindexing.js +++ b/x-pack/test/upgrade_assistant_integration/upgrade_assistant/reindexing.js @@ -7,6 +7,7 @@ import expect from '@kbn/expect'; import { ReindexStatus, REINDEX_OP_TYPE } from '../../../plugins/upgrade_assistant/common/types'; +import { generateNewIndexName } from '../../../plugins/upgrade_assistant/server/lib/reindexing/index_settings'; export default function({ getService }) { const supertest = getService('supertest'); @@ -134,5 +135,73 @@ export default function({ getService }) { expect(lastState.errorMessage).to.equal(null); expect(lastState.status).to.equal(ReindexStatus.completed); }); + + it('should reindex a batch in order and report queue state', async () => { + const assertQueueState = async (firstInQueueIndexName, queueLength) => { + const response = await supertest + .get(`/api/upgrade_assistant/reindex/batch/queue`) + .set('kbn-xsrf', 'xxx') + .expect(200); + + const { queue } = response.body; + + const [firstInQueue] = queue; + + if (!firstInQueueIndexName) { + expect(firstInQueueIndexName).to.be(undefined); + } else { + expect(firstInQueue.indexName).to.be(firstInQueueIndexName); + } + + expect(queue.length).to.be(queueLength); + }; + + const test1 = 'batch-reindex-test1'; + const test2 = 'batch-reindex-test2'; + const test3 = 'batch-reindex-test3'; + + const cleanupReindex = async indexName => { + try { + await es.indices.delete({ index: generateNewIndexName(indexName) }); + } catch (e) { + try { + await es.indices.delete({ index: indexName }); + } catch (e) { + // Ignore + } + } + }; + + try { + // Set up indices for the batch + await es.indices.create({ index: test1 }); + await es.indices.create({ index: test2 }); + await es.indices.create({ index: test3 }); + + const result = await supertest + .post(`/api/upgrade_assistant/reindex/batch`) + .set('kbn-xsrf', 'xxx') + .send({ indexNames: [test1, test2, test3] }) + .expect(200); + + expect(result.body.enqueued.length).to.equal(3); + expect(result.body.errors.length).to.equal(0); + + await assertQueueState(test1, 3); + await waitForReindexToComplete(test1); + + await assertQueueState(test2, 2); + await waitForReindexToComplete(test2); + + await assertQueueState(test3, 1); + await waitForReindexToComplete(test3); + + await assertQueueState(undefined, 0); + } finally { + await cleanupReindex(test1); + await cleanupReindex(test2); + await cleanupReindex(test3); + } + }); }); }