Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@
*/

import Boom from '@hapi/boom';
import { Logger } from '@kbn/core/server';
import { ElasticsearchClient, Logger } from '@kbn/core/server';
import { snakeCase } from 'lodash';
import moment from 'moment';
import uuid from 'uuid/v4';
import { ProcessorEvent } from '@kbn/observability-plugin/common';
import { waitForIndexStatus } from '@kbn/core-saved-objects-migration-server-internal';
import { ML_ERRORS } from '../../../common/anomaly_detection';
import { METRICSET_NAME, PROCESSOR_EVENT } from '../../../common/es_fields/apm';
import { Environment } from '../../../common/environment_rt';
Expand All @@ -21,13 +22,17 @@ import { APM_ML_JOB_GROUP, ML_MODULE_ID_APM_TRANSACTION } from './constants';
import { getAnomalyDetectionJobs } from './get_anomaly_detection_jobs';
import { ApmIndicesConfig } from '../../routes/settings/apm_indices/get_apm_indices';

const DEFAULT_TIMEOUT = '60s';

export async function createAnomalyDetectionJobs({
mlClient,
esClient,
indices,
environments,
logger,
}: {
mlClient?: MlClient;
esClient: ElasticsearchClient;
indices: ApmIndicesConfig;
environments: Environment[];
logger: Logger;
Expand All @@ -51,19 +56,35 @@ export async function createAnomalyDetectionJobs({
);

const apmMetricIndex = indices.metric;
const responses = await Promise.all(
uniqueMlJobEnvs.map((environment) =>
createAnomalyDetectionJob({ mlClient, environment, apmMetricIndex })
)
);
const responses = [];
const failedJobs = [];
// Avoid the creation of multiple ml jobs in parallel
// https://github.com/elastic/elasticsearch/issues/36271
for (const environment of uniqueMlJobEnvs) {
try {
responses.push(
await createAnomalyDetectionJob({
mlClient,
esClient,
environment,
apmMetricIndex,
})
);
} catch (e) {
if (!e.id || !e.error) {
throw e;
}
failedJobs.push({ id: e.id, error: e.error });
}
}

const jobResponses = responses.flatMap((response) => response.jobs);
const failedJobs = jobResponses.filter(({ success }) => !success);

if (failedJobs.length > 0) {
const errors = failedJobs.map(({ id, error }) => ({ id, error }));
throw new Error(
`An error occurred while creating ML jobs: ${JSON.stringify(errors)}`
`An error occurred while creating ML jobs: ${JSON.stringify(
failedJobs
)}`
);
}

Expand All @@ -73,17 +94,19 @@ export async function createAnomalyDetectionJobs({

async function createAnomalyDetectionJob({
mlClient,
esClient,
environment,
apmMetricIndex,
}: {
mlClient: Required<MlClient>;
esClient: ElasticsearchClient;
environment: string;
apmMetricIndex: string;
}) {
return withApmSpan('create_anomaly_detection_job', async () => {
const randomToken = uuid().substr(-4);

return mlClient.modules.setup({
const anomalyDetectionJob = mlClient.modules.setup({
moduleId: ML_MODULE_ID_APM_TRANSACTION,
prefix: `${APM_ML_JOB_GROUP}-${snakeCase(environment)}-${randomToken}-`,
groups: [APM_ML_JOB_GROUP],
Expand Down Expand Up @@ -112,6 +135,15 @@ async function createAnomalyDetectionJob({
},
],
});

waitForIndexStatus({
client: esClient,
index: '.ml-*',
timeout: DEFAULT_TIMEOUT,
status: 'yellow',
});

return anomalyDetectionJob;
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ const createAnomalyDetectionJobsRoute = createApmServerRoute({
const { environments } = params.body;
const licensingContext = await context.licensing;
const coreContext = await context.core;
const esClient = (await context.core).elasticsearch.client;

const [mlClient, indices] = await Promise.all([
getMlClient(resources),
Expand All @@ -88,6 +89,7 @@ const createAnomalyDetectionJobsRoute = createApmServerRoute({

await createAnomalyDetectionJobs({
mlClient,
esClient: esClient.asCurrentUser,
indices,
environments,
logger,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,13 @@ export async function updateToV3({
);
}

await createAnomalyDetectionJobs({ mlClient, indices, environments, logger });
await createAnomalyDetectionJobs({
mlClient,
esClient,
indices,
environments,
logger,
});

return true;
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,5 +63,5 @@ async function createAndRunApmMlJob({
}
);

await es.cluster.health({ index: '.ml-state-*', wait_for_status: 'yellow' });
await es.cluster.health({ index: '.ml-*', wait_for_status: 'yellow' });
}