Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ export interface ReindexOperation {
errorMessage: string | null;
// This field is only used for the singleton IndexConsumerType documents.
runningReindexCount: number | null;
rollupJob?: string;

/**
* The original index settings to set after reindex is completed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
*/

import { SavedObjectsErrorHelpers } from '@kbn/core/server';
import { elasticsearchServiceMock } from '@kbn/core/server/mocks';
import { elasticsearchServiceMock, loggingSystemMock } from '@kbn/core/server/mocks';
import type { ScopedClusterClientMock } from '@kbn/core-elasticsearch-client-server-mocks';
import moment from 'moment';

Expand All @@ -22,10 +22,15 @@ import { getMockVersionInfo } from '../__fixtures__/version';

const { currentMajor, prevMajor } = getMockVersionInfo();

jest.mock('../rollup_job', () => ({
getRollupJobByIndexName: jest.fn(),
}));

describe('ReindexActions', () => {
let client: jest.Mocked<any>;
let clusterClient: ScopedClusterClientMock;
let actions: ReindexActions;
const log = loggingSystemMock.createLogger();

const unimplemented = (name: string) => () =>
Promise.reject(`Mock function ${name} was not implemented!`);
Expand All @@ -45,7 +50,7 @@ describe('ReindexActions', () => {
) as any,
};
clusterClient = elasticsearchServiceMock.createScopedClusterClient();
actions = reindexActionsFactory(client, clusterClient.asCurrentUser);
actions = reindexActionsFactory(client, clusterClient.asCurrentUser, log);
});

describe('createReindexOp', () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import {
SavedObjectsFindResponse,
SavedObjectsClientContract,
ElasticsearchClient,
Logger,
} from '@kbn/core/server';
import {
REINDEX_OP_TYPE,
Expand All @@ -22,6 +23,7 @@ import {
} from '../../../common/types';
import { generateNewIndexName } from './index_settings';
import { FlatSettings } from './types';
import { getRollupJobByIndexName } from '../rollup_job';

// TODO: base on elasticsearch.requestTimeout?
export const LOCK_WINDOW = moment.duration(90, 'seconds');
Expand Down Expand Up @@ -84,7 +86,8 @@ export interface ReindexActions {

export const reindexActionsFactory = (
client: SavedObjectsClientContract,
esClient: ElasticsearchClient
esClient: ElasticsearchClient,
log: Logger
): ReindexActions => {
// ----- Internal functions
const isLocked = (reindexOp: ReindexSavedObject) => {
Expand Down Expand Up @@ -125,6 +128,9 @@ export const reindexActionsFactory = (
// ----- Public interface
return {
async createReindexOp(indexName: string, opts?: ReindexOptions) {
// gets rollup job if it exists and needs stopping, otherwise returns undefined
const rollupJob = await getRollupJobByIndexName(esClient, log, indexName);

return client.create<ReindexOperation>(REINDEX_OP_TYPE, {
indexName,
newIndexName: generateNewIndexName(indexName),
Expand All @@ -136,6 +142,7 @@ export const reindexActionsFactory = (
errorMessage: null,
runningReindexCount: null,
reindexOptions: opts,
rollupJob,
});
},

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,12 @@ export const reindexServiceFactory = (
* @param reindexOp
*/
const setReadonly = async (reindexOp: ReindexSavedObject) => {
const { indexName } = reindexOp.attributes;
const { indexName, rollupJob } = reindexOp.attributes;

if (rollupJob) {
await esClient.rollup.stopJob({ id: rollupJob, wait_for_completion: true });
}

const putReadonly = await esClient.indices.putSettings({
index: indexName,
body: { blocks: { write: true } },
Expand Down Expand Up @@ -428,6 +433,11 @@ export const reindexServiceFactory = (
await esClient.indices.close({ index: indexName });
}

if (reindexOp.attributes.rollupJob) {
// start the rollup job. rollupJob is undefined if the rollup job is stopped
await esClient.rollup.startJob({ id: reindexOp.attributes.rollupJob });
}

return actions.updateReindexOp(reindexOp, {
lastCompletedStep: ReindexStep.aliasCreated,
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ export class ReindexWorker {

this.reindexService = reindexServiceFactory(
callAsInternalUser,
reindexActionsFactory(this.client, callAsInternalUser),
reindexActionsFactory(this.client, callAsInternalUser, this.log),
log,
this.licensing
);
Expand Down Expand Up @@ -173,7 +173,7 @@ export class ReindexWorker {
const fakeRequest: FakeRequest = { headers: credential };
const scopedClusterClient = this.clusterClient.asScoped(fakeRequest);
const callAsCurrentUser = scopedClusterClient.asCurrentUser;
const actions = reindexActionsFactory(this.client, callAsCurrentUser);
const actions = reindexActionsFactory(this.client, callAsCurrentUser, this.log);
return reindexServiceFactory(callAsCurrentUser, actions, this.log, this.licensing);
};

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import type { ElasticsearchClient, Logger } from '@kbn/core/server';
import {
RollupGetRollupIndexCapsResponse,
RollupGetJobsResponse,
} from '@elastic/elasticsearch/lib/api/types';

export async function getRollupJobByIndexName(
esClient: ElasticsearchClient,
log: Logger,
index: string
) {
let rollupCaps: RollupGetRollupIndexCapsResponse;

try {
rollupCaps = await esClient.rollup.getRollupIndexCaps({ index }, { ignore: [404] });
// may catch if not found in some circumstances, such as a closed index, etc
// would be nice to handle the error better but little info is provided
} catch (e) {
log.warn(`Get rollup index capabilities failed: ${e}`);
return;
}

const rollupIndices = Object.keys(rollupCaps);
let rollupJob: string | undefined;

// there should only be one job
if (rollupIndices.length === 1) {
rollupJob = rollupCaps[rollupIndices[0]].rollup_jobs[0].job_id;
let jobs: RollupGetJobsResponse;

try {
jobs = await esClient.rollup.getJobs({ id: rollupJob }, { ignore: [404] });
// may catch if not found in some circumstances, such as a closed index, etc
// would be nice to handle the error better but little info is provided
} catch (e) {
log.warn(`Get rollup job failed: ${e}`);
return;
}

// there can only be one job. If its stopped then we don't need rollup handling
if (
// zero jobs shouldn't happen but we can handle it gracefully
jobs.jobs.length === 0 ||
// rollup job is stopped so we can treat it like a regular index
(jobs.jobs.length === 1 && jobs.jobs[0].status.job_state === 'stopped')
) {
rollupJob = undefined;
// this shouldn't be possible but just in case
} else if (jobs.jobs.length > 1) {
throw new Error(`Multiple jobs returned for a single rollup job id: + ${rollupJob}`);
}
// this shouldn't be possible but just in case
} else if (rollupIndices.length > 1) {
throw new Error(`Multiple indices returned for a single index name: + ${index}`);
}

return rollupJob;
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@
* 2.0.
*/

import type { ElasticsearchClient } from '@kbn/core/server';
import type { ElasticsearchClient, Logger } from '@kbn/core/server';
import type { UpdateIndexOperation } from '../../../common/update_index';
import { getRollupJobByIndexName } from '../rollup_job';

export interface UpdateIndexParams {
esClient: ElasticsearchClient;
index: string;
operations: UpdateIndexOperation[];
log: Logger;
}

/**
Expand All @@ -20,12 +22,18 @@ export interface UpdateIndexParams {
* @param index The index to update
* @param operations The operations to perform on the specified index
*/
export async function updateIndex({ esClient, index, operations }: UpdateIndexParams) {
export async function updateIndex({ esClient, index, operations, log }: UpdateIndexParams) {
for (const operation of operations) {
let res;

switch (operation) {
case 'blockWrite': {
// stop related rollup job if it exists
const rollupJob = await getRollupJobByIndexName(esClient, log, index);
if (rollupJob) {
await esClient.rollup.stopJob({ id: rollupJob, wait_for_completion: true });
}

res = await esClient.indices.addBlock({ index, block: 'write' });
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ export function registerESDeprecationRoutes({
dataSourceExclusions,
});
const asCurrentUser = client.asCurrentUser;
const reindexActions = reindexActionsFactory(savedObjectsClient, asCurrentUser);
const reindexActions = reindexActionsFactory(savedObjectsClient, asCurrentUser, log);
const reindexService = reindexServiceFactory(asCurrentUser, reindexActions, log, licensing);
const indexNames = [...status.migrationsDeprecations, ...status.enrichedHealthIndicators]
.filter(({ index }) => typeof index !== 'undefined')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ export function registerBatchReindexIndicesRoutes(
const callAsCurrentUser = esClient.asCurrentUser;
const reindexActions = reindexActionsFactory(
getClient({ includedHiddenTypes: [REINDEX_OP_TYPE] }),
callAsCurrentUser
callAsCurrentUser,
log
);
try {
const inProgressOps = await reindexActions.findAllByStatus(ReindexStatus.inProgress);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ export const reindexHandler = async ({
security,
}: ReindexHandlerArgs): Promise<ReindexOperation> => {
const callAsCurrentUser = dataClient.asCurrentUser;
const reindexActions = reindexActionsFactory(savedObjects, callAsCurrentUser);
const reindexActions = reindexActionsFactory(savedObjects, callAsCurrentUser, log);
const reindexService = reindexServiceFactory(callAsCurrentUser, reindexActions, log, licensing);

if (!(await reindexService.hasRequiredPrivileges(indexName))) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,8 @@ export function registerReindexIndicesRoutes(
const asCurrentUser = esClient.asCurrentUser;
const reindexActions = reindexActionsFactory(
getClient({ includedHiddenTypes: [REINDEX_OP_TYPE] }),
asCurrentUser
asCurrentUser,
log
);
const reindexService = reindexServiceFactory(asCurrentUser, reindexActions, log, licensing);

Expand Down Expand Up @@ -184,7 +185,8 @@ export function registerReindexIndicesRoutes(
const callAsCurrentUser = esClient.asCurrentUser;
const reindexActions = reindexActionsFactory(
getClient({ includedHiddenTypes: [REINDEX_OP_TYPE] }),
callAsCurrentUser
callAsCurrentUser,
log
);
const reindexService = reindexServiceFactory(
callAsCurrentUser,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,11 @@ import { versionCheckHandlerWrapper } from '../lib/es_version_precheck';
import type { RouteDependencies } from '../types';
import { updateIndex } from '../lib/update_index';

export function registerUpdateIndexRoute({ router, lib: { handleEsError } }: RouteDependencies) {
export function registerUpdateIndexRoute({
router,
log,
lib: { handleEsError },
}: RouteDependencies) {
const BASE_PATH = `${API_BASE_PATH}/update_index`;
router.post(
{
Expand Down Expand Up @@ -46,7 +50,7 @@ export function registerUpdateIndexRoute({ router, lib: { handleEsError } }: Rou
const { index } = request.params;
const { operations } = request.body;
try {
await updateIndex({ esClient: client.asCurrentUser, index, operations });
await updateIndex({ esClient: client.asCurrentUser, index, operations, log });
return response.ok();
} catch (err) {
if (err instanceof errors.ResponseError) {
Expand Down