Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions x-pack/plugins/upgrade_assistant/common/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,19 @@ export enum ReindexStatus {
}

export const REINDEX_OP_TYPE = 'upgrade-assistant-reindex-operation';

export interface QueueSettings extends SavedObjectAttributes {
queuedAt: number;
}

export interface ReindexOptions extends SavedObjectAttributes {
/**
* Set this key to configure a reindex operation as part of a
* batch to be run in series.
*/
queueSettings?: QueueSettings;
}

export interface ReindexOperation extends SavedObjectAttributes {
indexName: string;
newIndexName: string;
Expand All @@ -40,6 +53,15 @@ export interface ReindexOperation extends SavedObjectAttributes {

// This field is only used for the singleton IndexConsumerType documents.
runningReindexCount: number | null;

/**
* Options for the reindexing strategy.
*
* @remark
* Marked as optional for backwards compatibility. We should still
* be able to handle older ReindexOperation objects.
*/
reindexOptions?: ReindexOptions;
}

export type ReindexSavedObject = SavedObject<ReindexOperation>;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,9 @@ export const esVersionCheck = async (
}
};

export const versionCheckHandlerWrapper = (handler: RequestHandler<any, any, any>) => async (
export const versionCheckHandlerWrapper = <P, Q, B>(handler: RequestHandler<P, Q, B>) => async (
ctx: RequestHandlerContext,
request: KibanaRequest,
request: KibanaRequest<P, Q, B>,
response: KibanaResponseFactory
) => {
const errorResponse = await esVersionCheck(ctx, response);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import {
ReindexAlreadyInProgress,
MultipleReindexJobsFound,
CannotReindexSystemIndexInCurrent,
ReindexCannotBeCancelled,
} from './error_symbols';

export class ReindexError extends Error {
Expand All @@ -34,4 +35,5 @@ export const error = {
reindexAlreadyInProgress: createErrorFactory(ReindexAlreadyInProgress),
reindexSystemIndex: createErrorFactory(CannotReindexSystemIndexInCurrent),
multipleReindexJobsFound: createErrorFactory(MultipleReindexJobsFound),
reindexCannotBeCancelled: createErrorFactory(ReindexCannotBeCancelled),
};
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,6 @@ export const ReindexTaskFailed = Symbol('ReindexTaskFailed');
export const ReindexTaskCannotBeDeleted = Symbol('ReindexTaskCannotBeDeleted');
export const ReindexAlreadyInProgress = Symbol('ReindexAlreadyInProgress');
export const CannotReindexSystemIndexInCurrent = Symbol('CannotReindexSystemIndexInCurrent');
export const ReindexCannotBeCancelled = Symbol('ReindexCannotBeCancelled');

export const MultipleReindexJobsFound = Symbol('MultipleReindexJobsFound');
56 changes: 56 additions & 0 deletions x-pack/plugins/upgrade_assistant/server/lib/reindexing/op_utils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { flow } from 'fp-ts/lib/function';
import { ReindexSavedObject } from '../../../common/types';

export interface SortedReindexSavedObjects {
/**
* Reindex objects sorted into this array represent Elasticsearch reindex tasks that
* have no inherent order and are considered to be processed in parallel.
*/
parallel: ReindexSavedObject[];

/**
* Reindex objects sorted into this array represent Elasticsearch reindex tasks that
* are consistently ordered (see {@link orderQueuedReindexOperations}) and should be
* processed in order.
*/
queue: ReindexSavedObject[];
}

const sortReindexOperations = (ops: ReindexSavedObject[]): SortedReindexSavedObjects => {
const parallel: ReindexSavedObject[] = [];
const queue: ReindexSavedObject[] = [];
for (const op of ops) {
if (op.attributes.reindexOptions?.queueSettings) {
queue.push(op);
} else {
parallel.push(op);
}
}

return {
parallel,
queue,
};
};
const orderQueuedReindexOperations = ({
parallel,
queue,
}: SortedReindexSavedObjects): SortedReindexSavedObjects => ({
parallel,
// Sort asc
queue: queue.sort(
(a, b) =>
a.attributes.reindexOptions!.queueSettings!.queuedAt -
b.attributes.reindexOptions!.queueSettings!.queuedAt
),
});

export const sortAndOrderReindexOperations = flow(
sortReindexOperations,
orderQueuedReindexOperations
);
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import {
IndexGroup,
REINDEX_OP_TYPE,
ReindexOperation,
ReindexOptions,
ReindexSavedObject,
ReindexStatus,
ReindexStep,
Expand All @@ -34,8 +35,9 @@ export interface ReindexActions {
/**
* Creates a new reindexOp, does not perform any pre-flight checks.
* @param indexName
* @param opts Options for the reindex operation
*/
createReindexOp(indexName: string): Promise<ReindexSavedObject>;
createReindexOp(indexName: string, opts?: ReindexOptions): Promise<ReindexSavedObject>;

/**
* Deletes a reindexOp.
Expand Down Expand Up @@ -156,7 +158,7 @@ export const reindexActionsFactory = (

// ----- Public interface
return {
async createReindexOp(indexName: string) {
async createReindexOp(indexName: string, opts?: ReindexOptions) {
return client.create<ReindexOperation>(REINDEX_OP_TYPE, {
indexName,
newIndexName: generateNewIndexName(indexName),
Expand All @@ -167,6 +169,7 @@ export const reindexActionsFactory = (
reindexTaskPercComplete: null,
errorMessage: null,
runningReindexCount: null,
reindexOptions: opts,
});
},

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ describe('reindexService', () => {

await service.createReindexOperation('myIndex');

expect(actions.createReindexOp).toHaveBeenCalledWith('myIndex');
expect(actions.createReindexOp).toHaveBeenCalledWith('myIndex', undefined);
});

it('fails if index does not exist', async () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { first } from 'rxjs/operators';

import {
IndexGroup,
ReindexOptions,
ReindexSavedObject,
ReindexStatus,
ReindexStep,
Expand Down Expand Up @@ -54,8 +55,9 @@ export interface ReindexService {
/**
* Creates a new reindex operation for a given index.
* @param indexName
* @param opts
*/
createReindexOperation(indexName: string): Promise<ReindexSavedObject>;
createReindexOperation(indexName: string, opts?: ReindexOptions): Promise<ReindexSavedObject>;

/**
* Retrieves all reindex operations that have the given status.
Expand Down Expand Up @@ -86,8 +88,9 @@ export interface ReindexService {
/**
* Resumes the paused reindex operation for a given index.
* @param indexName
* @param opts As with {@link createReindexOperation} we support this setting.
*/
resumeReindexOperation(indexName: string): Promise<ReindexSavedObject>;
resumeReindexOperation(indexName: string, opts?: ReindexOptions): Promise<ReindexSavedObject>;

/**
* Cancel an in-progress reindex operation for a given index. Only allowed when the
Expand Down Expand Up @@ -537,7 +540,7 @@ export const reindexServiceFactory = (
}
},

async createReindexOperation(indexName: string) {
async createReindexOperation(indexName: string, opts?: ReindexOptions) {
if (isSystemIndex(indexName)) {
throw error.reindexSystemIndex(
`Reindexing system indices are not yet supported within this major version. Upgrade to the latest ${CURRENT_MAJOR_VERSION}.x minor version.`
Expand Down Expand Up @@ -565,7 +568,7 @@ export const reindexServiceFactory = (
}
}

return actions.createReindexOp(indexName);
return actions.createReindexOp(indexName, opts);
},

async findReindexOperation(indexName: string) {
Expand Down Expand Up @@ -653,7 +656,7 @@ export const reindexServiceFactory = (
});
},

async resumeReindexOperation(indexName: string) {
async resumeReindexOperation(indexName: string, opts?: ReindexOptions) {
const reindexOp = await this.findReindexOperation(indexName);

if (!reindexOp) {
Expand All @@ -668,27 +671,32 @@ export const reindexServiceFactory = (
throw new Error(`Reindex operation must be paused in order to be resumed.`);
}

return actions.updateReindexOp(op, { status: ReindexStatus.inProgress });
return actions.updateReindexOp(op, {
status: ReindexStatus.inProgress,
reindexOptions: opts,
});
});
},

async cancelReindexing(indexName: string) {
const reindexOp = await this.findReindexOperation(indexName);

if (!reindexOp) {
throw new Error(`No reindex operation found for index ${indexName}`);
throw error.indexNotFound(`No reindex operation found for index ${indexName}`);
} else if (reindexOp.attributes.status !== ReindexStatus.inProgress) {
throw new Error(`Reindex operation is not in progress`);
throw error.reindexCannotBeCancelled(`Reindex operation is not in progress`);
} else if (reindexOp.attributes.lastCompletedStep !== ReindexStep.reindexStarted) {
throw new Error(`Reindex operation is not current waiting for reindex task to complete`);
throw error.reindexCannotBeCancelled(
`Reindex operation is not currently waiting for reindex task to complete`
);
}

const resp = await callAsUser('tasks.cancel', {
taskId: reindexOp.attributes.reindexTaskId,
});

if (resp.node_failures && resp.node_failures.length > 0) {
throw new Error(`Could not cancel reindex.`);
throw error.reindexCannotBeCancelled(`Could not cancel reindex.`);
}

return reindexOp;
Expand Down
38 changes: 27 additions & 11 deletions x-pack/plugins/upgrade_assistant/server/lib/reindexing/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@
*/
import { IClusterClient, Logger, SavedObjectsClientContract, FakeRequest } from 'src/core/server';
import moment from 'moment';

import { ReindexSavedObject, ReindexStatus } from '../../../common/types';
import { CredentialStore } from './credential_store';
import { reindexActionsFactory } from './reindex_actions';
import { ReindexService, reindexServiceFactory } from './reindex_service';
import { LicensingPluginSetup } from '../../../../licensing/server';
import { sortAndOrderReindexOperations } from './op_utils';

const POLL_INTERVAL = 30000;
// If no nodes have been able to update this index in 2 minutes (due to missing credentials), set to paused.
Expand Down Expand Up @@ -107,15 +107,17 @@ export class ReindexWorker {
private startUpdateOperationLoop = async () => {
this.updateOperationLoopRunning = true;

while (this.inProgressOps.length > 0) {
this.log.debug(`Updating ${this.inProgressOps.length} reindex operations`);
try {
while (this.inProgressOps.length > 0) {
this.log.debug(`Updating ${this.inProgressOps.length} reindex operations`);

// Push each operation through the state machine and refresh.
await Promise.all(this.inProgressOps.map(this.processNextStep));
await this.refresh();
// Push each operation through the state machine and refresh.
await Promise.all(this.inProgressOps.map(this.processNextStep));
await this.refresh();
}
} finally {
this.updateOperationLoopRunning = false;
}

this.updateOperationLoopRunning = false;
};

private pollForOperations = async () => {
Expand All @@ -128,14 +130,28 @@ export class ReindexWorker {
}
};

private refresh = async () => {
private updateInProgressOps = async () => {
try {
this.inProgressOps = await this.reindexService.findAllByStatus(ReindexStatus.inProgress);
const inProgressOps = await this.reindexService.findAllByStatus(ReindexStatus.inProgress);
const { parallel, queue } = sortAndOrderReindexOperations(inProgressOps);

const [firstOpInQueue] = queue;

if (firstOpInQueue) {
this.log.debug(
`Queue detected; current length ${queue.length}, current item ReindexOperation(id: ${firstOpInQueue.id}, indexName: ${firstOpInQueue.attributes.indexName})`
);
}

this.inProgressOps = parallel.concat(firstOpInQueue ? [firstOpInQueue] : []);
} catch (e) {
this.log.debug(`Could not fetch reindex operations from Elasticsearch`);
this.log.debug(`Could not fetch reindex operations from Elasticsearch, ${e.message}`);
this.inProgressOps = [];
}
};

private refresh = async () => {
await this.updateInProgressOps();
// If there are operations in progress and we're not already updating operations, kick off the update loop
if (!this.updateOperationLoopRunning) {
this.startUpdateOperationLoop();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

export { createReindexWorker, registerReindexIndicesRoutes } from './reindex_indices';
Loading