diff --git a/x-pack/platform/plugins/private/upgrade_assistant/common/types.ts b/x-pack/platform/plugins/private/upgrade_assistant/common/types.ts index 62ea8cc8376a9..5f3a9ce77a7a1 100644 --- a/x-pack/platform/plugins/private/upgrade_assistant/common/types.ts +++ b/x-pack/platform/plugins/private/upgrade_assistant/common/types.ts @@ -115,6 +115,7 @@ export interface ReindexOperation { errorMessage: string | null; // This field is only used for the singleton IndexConsumerType documents. runningReindexCount: number | null; + rollupJob?: string; /** * The original index settings to set after reindex is completed. diff --git a/x-pack/platform/plugins/private/upgrade_assistant/server/lib/reindexing/reindex_actions.test.ts b/x-pack/platform/plugins/private/upgrade_assistant/server/lib/reindexing/reindex_actions.test.ts index a02f3a2ca0b9d..ff1ab17f59441 100644 --- a/x-pack/platform/plugins/private/upgrade_assistant/server/lib/reindexing/reindex_actions.test.ts +++ b/x-pack/platform/plugins/private/upgrade_assistant/server/lib/reindexing/reindex_actions.test.ts @@ -6,7 +6,7 @@ */ import { SavedObjectsErrorHelpers } from '@kbn/core/server'; -import { elasticsearchServiceMock } from '@kbn/core/server/mocks'; +import { elasticsearchServiceMock, loggingSystemMock } from '@kbn/core/server/mocks'; import type { ScopedClusterClientMock } from '@kbn/core-elasticsearch-client-server-mocks'; import moment from 'moment'; @@ -22,10 +22,15 @@ import { getMockVersionInfo } from '../__fixtures__/version'; const { currentMajor, prevMajor } = getMockVersionInfo(); +jest.mock('../rollup_job', () => ({ + getRollupJobByIndexName: jest.fn(), +})); + describe('ReindexActions', () => { let client: jest.Mocked; let clusterClient: ScopedClusterClientMock; let actions: ReindexActions; + const log = loggingSystemMock.createLogger(); const unimplemented = (name: string) => () => Promise.reject(`Mock function ${name} was not implemented!`); @@ -45,7 +50,7 @@ describe('ReindexActions', () => { ) as any, }; clusterClient = elasticsearchServiceMock.createScopedClusterClient(); - actions = reindexActionsFactory(client, clusterClient.asCurrentUser); + actions = reindexActionsFactory(client, clusterClient.asCurrentUser, log); }); describe('createReindexOp', () => { diff --git a/x-pack/platform/plugins/private/upgrade_assistant/server/lib/reindexing/reindex_actions.ts b/x-pack/platform/plugins/private/upgrade_assistant/server/lib/reindexing/reindex_actions.ts index ce588e8bdb47a..04c0615e24dea 100644 --- a/x-pack/platform/plugins/private/upgrade_assistant/server/lib/reindexing/reindex_actions.ts +++ b/x-pack/platform/plugins/private/upgrade_assistant/server/lib/reindexing/reindex_actions.ts @@ -11,6 +11,7 @@ import { SavedObjectsFindResponse, SavedObjectsClientContract, ElasticsearchClient, + Logger, } from '@kbn/core/server'; import { REINDEX_OP_TYPE, @@ -22,6 +23,7 @@ import { } from '../../../common/types'; import { generateNewIndexName } from './index_settings'; import { FlatSettings } from './types'; +import { getRollupJobByIndexName } from '../rollup_job'; // TODO: base on elasticsearch.requestTimeout? export const LOCK_WINDOW = moment.duration(90, 'seconds'); @@ -84,7 +86,8 @@ export interface ReindexActions { export const reindexActionsFactory = ( client: SavedObjectsClientContract, - esClient: ElasticsearchClient + esClient: ElasticsearchClient, + log: Logger ): ReindexActions => { // ----- Internal functions const isLocked = (reindexOp: ReindexSavedObject) => { @@ -125,6 +128,9 @@ export const reindexActionsFactory = ( // ----- Public interface return { async createReindexOp(indexName: string, opts?: ReindexOptions) { + // gets rollup job if it exists and needs stopping, otherwise returns undefined + const rollupJob = await getRollupJobByIndexName(esClient, log, indexName); + return client.create(REINDEX_OP_TYPE, { indexName, newIndexName: generateNewIndexName(indexName), @@ -136,6 +142,7 @@ export const reindexActionsFactory = ( errorMessage: null, runningReindexCount: null, reindexOptions: opts, + rollupJob, }); }, diff --git a/x-pack/platform/plugins/private/upgrade_assistant/server/lib/reindexing/reindex_service.ts b/x-pack/platform/plugins/private/upgrade_assistant/server/lib/reindexing/reindex_service.ts index e05e0323b3ee8..2f3c3103e1fae 100644 --- a/x-pack/platform/plugins/private/upgrade_assistant/server/lib/reindexing/reindex_service.ts +++ b/x-pack/platform/plugins/private/upgrade_assistant/server/lib/reindexing/reindex_service.ts @@ -169,7 +169,12 @@ export const reindexServiceFactory = ( * @param reindexOp */ const setReadonly = async (reindexOp: ReindexSavedObject) => { - const { indexName } = reindexOp.attributes; + const { indexName, rollupJob } = reindexOp.attributes; + + if (rollupJob) { + await esClient.rollup.stopJob({ id: rollupJob, wait_for_completion: true }); + } + const putReadonly = await esClient.indices.putSettings({ index: indexName, body: { blocks: { write: true } }, @@ -428,6 +433,11 @@ export const reindexServiceFactory = ( await esClient.indices.close({ index: indexName }); } + if (reindexOp.attributes.rollupJob) { + // start the rollup job. rollupJob is undefined if the rollup job is stopped + await esClient.rollup.startJob({ id: reindexOp.attributes.rollupJob }); + } + return actions.updateReindexOp(reindexOp, { lastCompletedStep: ReindexStep.aliasCreated, }); diff --git a/x-pack/platform/plugins/private/upgrade_assistant/server/lib/reindexing/worker.ts b/x-pack/platform/plugins/private/upgrade_assistant/server/lib/reindexing/worker.ts index 902c7e83c59b4..c8040d4a0c017 100644 --- a/x-pack/platform/plugins/private/upgrade_assistant/server/lib/reindexing/worker.ts +++ b/x-pack/platform/plugins/private/upgrade_assistant/server/lib/reindexing/worker.ts @@ -88,7 +88,7 @@ export class ReindexWorker { this.reindexService = reindexServiceFactory( callAsInternalUser, - reindexActionsFactory(this.client, callAsInternalUser), + reindexActionsFactory(this.client, callAsInternalUser, this.log), log, this.licensing ); @@ -173,7 +173,7 @@ export class ReindexWorker { const fakeRequest: FakeRequest = { headers: credential }; const scopedClusterClient = this.clusterClient.asScoped(fakeRequest); const callAsCurrentUser = scopedClusterClient.asCurrentUser; - const actions = reindexActionsFactory(this.client, callAsCurrentUser); + const actions = reindexActionsFactory(this.client, callAsCurrentUser, this.log); return reindexServiceFactory(callAsCurrentUser, actions, this.log, this.licensing); }; diff --git a/x-pack/platform/plugins/private/upgrade_assistant/server/lib/rollup_job.ts b/x-pack/platform/plugins/private/upgrade_assistant/server/lib/rollup_job.ts new file mode 100644 index 0000000000000..e331b28091a33 --- /dev/null +++ b/x-pack/platform/plugins/private/upgrade_assistant/server/lib/rollup_job.ts @@ -0,0 +1,65 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { ElasticsearchClient, Logger } from '@kbn/core/server'; +import { + RollupGetRollupIndexCapsResponse, + RollupGetJobsResponse, +} from '@elastic/elasticsearch/lib/api/types'; + +export async function getRollupJobByIndexName( + esClient: ElasticsearchClient, + log: Logger, + index: string +) { + let rollupCaps: RollupGetRollupIndexCapsResponse; + + try { + rollupCaps = await esClient.rollup.getRollupIndexCaps({ index }, { ignore: [404] }); + // may catch if not found in some circumstances, such as a closed index, etc + // would be nice to handle the error better but little info is provided + } catch (e) { + log.warn(`Get rollup index capabilities failed: ${e}`); + return; + } + + const rollupIndices = Object.keys(rollupCaps); + let rollupJob: string | undefined; + + // there should only be one job + if (rollupIndices.length === 1) { + rollupJob = rollupCaps[rollupIndices[0]].rollup_jobs[0].job_id; + let jobs: RollupGetJobsResponse; + + try { + jobs = await esClient.rollup.getJobs({ id: rollupJob }, { ignore: [404] }); + // may catch if not found in some circumstances, such as a closed index, etc + // would be nice to handle the error better but little info is provided + } catch (e) { + log.warn(`Get rollup job failed: ${e}`); + return; + } + + // there can only be one job. If its stopped then we don't need rollup handling + if ( + // zero jobs shouldn't happen but we can handle it gracefully + jobs.jobs.length === 0 || + // rollup job is stopped so we can treat it like a regular index + (jobs.jobs.length === 1 && jobs.jobs[0].status.job_state === 'stopped') + ) { + rollupJob = undefined; + // this shouldn't be possible but just in case + } else if (jobs.jobs.length > 1) { + throw new Error(`Multiple jobs returned for a single rollup job id: + ${rollupJob}`); + } + // this shouldn't be possible but just in case + } else if (rollupIndices.length > 1) { + throw new Error(`Multiple indices returned for a single index name: + ${index}`); + } + + return rollupJob; +} diff --git a/x-pack/platform/plugins/private/upgrade_assistant/server/lib/update_index/index.ts b/x-pack/platform/plugins/private/upgrade_assistant/server/lib/update_index/index.ts index ec466dc544237..860185bc1aa9c 100644 --- a/x-pack/platform/plugins/private/upgrade_assistant/server/lib/update_index/index.ts +++ b/x-pack/platform/plugins/private/upgrade_assistant/server/lib/update_index/index.ts @@ -5,13 +5,15 @@ * 2.0. */ -import type { ElasticsearchClient } from '@kbn/core/server'; +import type { ElasticsearchClient, Logger } from '@kbn/core/server'; import type { UpdateIndexOperation } from '../../../common/update_index'; +import { getRollupJobByIndexName } from '../rollup_job'; export interface UpdateIndexParams { esClient: ElasticsearchClient; index: string; operations: UpdateIndexOperation[]; + log: Logger; } /** @@ -20,12 +22,18 @@ export interface UpdateIndexParams { * @param index The index to update * @param operations The operations to perform on the specified index */ -export async function updateIndex({ esClient, index, operations }: UpdateIndexParams) { +export async function updateIndex({ esClient, index, operations, log }: UpdateIndexParams) { for (const operation of operations) { let res; switch (operation) { case 'blockWrite': { + // stop related rollup job if it exists + const rollupJob = await getRollupJobByIndexName(esClient, log, index); + if (rollupJob) { + await esClient.rollup.stopJob({ id: rollupJob, wait_for_completion: true }); + } + res = await esClient.indices.addBlock({ index, block: 'write' }); break; } diff --git a/x-pack/platform/plugins/private/upgrade_assistant/server/routes/es_deprecations.ts b/x-pack/platform/plugins/private/upgrade_assistant/server/routes/es_deprecations.ts index 132d1329faa25..b42ea170d2234 100644 --- a/x-pack/platform/plugins/private/upgrade_assistant/server/routes/es_deprecations.ts +++ b/x-pack/platform/plugins/private/upgrade_assistant/server/routes/es_deprecations.ts @@ -41,7 +41,7 @@ export function registerESDeprecationRoutes({ dataSourceExclusions, }); const asCurrentUser = client.asCurrentUser; - const reindexActions = reindexActionsFactory(savedObjectsClient, asCurrentUser); + const reindexActions = reindexActionsFactory(savedObjectsClient, asCurrentUser, log); const reindexService = reindexServiceFactory(asCurrentUser, reindexActions, log, licensing); const indexNames = [...status.migrationsDeprecations, ...status.enrichedHealthIndicators] .filter(({ index }) => typeof index !== 'undefined') diff --git a/x-pack/platform/plugins/private/upgrade_assistant/server/routes/reindex_indices/batch_reindex_indices.ts b/x-pack/platform/plugins/private/upgrade_assistant/server/routes/reindex_indices/batch_reindex_indices.ts index 31857ed979c07..6abc8d4e40ee7 100644 --- a/x-pack/platform/plugins/private/upgrade_assistant/server/routes/reindex_indices/batch_reindex_indices.ts +++ b/x-pack/platform/plugins/private/upgrade_assistant/server/routes/reindex_indices/batch_reindex_indices.ts @@ -57,7 +57,8 @@ export function registerBatchReindexIndicesRoutes( const callAsCurrentUser = esClient.asCurrentUser; const reindexActions = reindexActionsFactory( getClient({ includedHiddenTypes: [REINDEX_OP_TYPE] }), - callAsCurrentUser + callAsCurrentUser, + log ); try { const inProgressOps = await reindexActions.findAllByStatus(ReindexStatus.inProgress); diff --git a/x-pack/platform/plugins/private/upgrade_assistant/server/routes/reindex_indices/reindex_handler.ts b/x-pack/platform/plugins/private/upgrade_assistant/server/routes/reindex_indices/reindex_handler.ts index 970a11b3eb048..7808a0885c0ae 100644 --- a/x-pack/platform/plugins/private/upgrade_assistant/server/routes/reindex_indices/reindex_handler.ts +++ b/x-pack/platform/plugins/private/upgrade_assistant/server/routes/reindex_indices/reindex_handler.ts @@ -49,7 +49,7 @@ export const reindexHandler = async ({ security, }: ReindexHandlerArgs): Promise => { const callAsCurrentUser = dataClient.asCurrentUser; - const reindexActions = reindexActionsFactory(savedObjects, callAsCurrentUser); + const reindexActions = reindexActionsFactory(savedObjects, callAsCurrentUser, log); const reindexService = reindexServiceFactory(callAsCurrentUser, reindexActions, log, licensing); if (!(await reindexService.hasRequiredPrivileges(indexName))) { diff --git a/x-pack/platform/plugins/private/upgrade_assistant/server/routes/reindex_indices/reindex_indices.ts b/x-pack/platform/plugins/private/upgrade_assistant/server/routes/reindex_indices/reindex_indices.ts index ae3d66a84ec2c..2b2c9cdabb2b0 100644 --- a/x-pack/platform/plugins/private/upgrade_assistant/server/routes/reindex_indices/reindex_indices.ts +++ b/x-pack/platform/plugins/private/upgrade_assistant/server/routes/reindex_indices/reindex_indices.ts @@ -113,7 +113,8 @@ export function registerReindexIndicesRoutes( const asCurrentUser = esClient.asCurrentUser; const reindexActions = reindexActionsFactory( getClient({ includedHiddenTypes: [REINDEX_OP_TYPE] }), - asCurrentUser + asCurrentUser, + log ); const reindexService = reindexServiceFactory(asCurrentUser, reindexActions, log, licensing); @@ -184,7 +185,8 @@ export function registerReindexIndicesRoutes( const callAsCurrentUser = esClient.asCurrentUser; const reindexActions = reindexActionsFactory( getClient({ includedHiddenTypes: [REINDEX_OP_TYPE] }), - callAsCurrentUser + callAsCurrentUser, + log ); const reindexService = reindexServiceFactory( callAsCurrentUser, diff --git a/x-pack/platform/plugins/private/upgrade_assistant/server/routes/update_index.ts b/x-pack/platform/plugins/private/upgrade_assistant/server/routes/update_index.ts index 46f30ead65dfb..9df36e9bf74e7 100644 --- a/x-pack/platform/plugins/private/upgrade_assistant/server/routes/update_index.ts +++ b/x-pack/platform/plugins/private/upgrade_assistant/server/routes/update_index.ts @@ -13,7 +13,11 @@ import { versionCheckHandlerWrapper } from '../lib/es_version_precheck'; import type { RouteDependencies } from '../types'; import { updateIndex } from '../lib/update_index'; -export function registerUpdateIndexRoute({ router, lib: { handleEsError } }: RouteDependencies) { +export function registerUpdateIndexRoute({ + router, + log, + lib: { handleEsError }, +}: RouteDependencies) { const BASE_PATH = `${API_BASE_PATH}/update_index`; router.post( { @@ -46,7 +50,7 @@ export function registerUpdateIndexRoute({ router, lib: { handleEsError } }: Rou const { index } = request.params; const { operations } = request.body; try { - await updateIndex({ esClient: client.asCurrentUser, index, operations }); + await updateIndex({ esClient: client.asCurrentUser, index, operations, log }); return response.ok(); } catch (err) { if (err instanceof errors.ResponseError) {