Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import { kibanaResponseFactory, RequestHandler } from 'src/core/server';

import { errors as esErrors } from '@elastic/elasticsearch';
import { handleEsError } from '../shared_imports';
import { createMockRouter, MockRouter, routeHandlerContextMock } from './__mocks__/routes.mock';
import { createRequestMock } from './__mocks__/request.mock';
Expand Down Expand Up @@ -283,18 +284,15 @@ describe('ML snapshots APIs', () => {
});

(
routeHandlerContextMock.core.elasticsearch.client.asCurrentUser.tasks.list as jest.Mock
routeHandlerContextMock.core.elasticsearch.client.asCurrentUser.transport
.request as jest.Mock
).mockResolvedValue({
body: {
nodes: {
[NODE_ID]: {
tasks: {
[`${NODE_ID}:12345`]: {
description: `job-snapshot-upgrade-${JOB_ID}-${SNAPSHOT_ID}`,
},
},
model_snapshot_upgrades: [
{
state: 'loading_old_state',
},
},
],
},
});

Expand All @@ -321,7 +319,7 @@ describe('ML snapshots APIs', () => {
});
});

it('returns "complete" status if snapshot upgrade has completed', async () => {
it('fails when snapshot upgrade status returns has status="failed"', async () => {
(
routeHandlerContextMock.core.elasticsearch.client.asCurrentUser.ml
.getModelSnapshots as jest.Mock
Expand Down Expand Up @@ -359,17 +357,77 @@ describe('ML snapshots APIs', () => {
});

(
routeHandlerContextMock.core.elasticsearch.client.asCurrentUser.tasks.list as jest.Mock
routeHandlerContextMock.core.elasticsearch.client.asCurrentUser.transport
.request as jest.Mock
).mockResolvedValue({
body: {
nodes: {
[NODE_ID]: {
tasks: {},
model_snapshot_upgrades: [
{
state: 'failed',
},
],
},
});

const resp = await routeDependencies.router.getHandler({
method: 'get',
pathPattern: '/api/upgrade_assistant/ml_snapshots/{jobId}/{snapshotId}',
})(
routeHandlerContextMock,
createRequestMock({
params: {
snapshotId: SNAPSHOT_ID,
jobId: JOB_ID,
},
}),
kibanaResponseFactory
);

expect(resp.status).toEqual(500);
});

it('returns "complete" status if snapshot upgrade has completed', async () => {
(
routeHandlerContextMock.core.elasticsearch.client.asCurrentUser.ml
.getModelSnapshots as jest.Mock
).mockResolvedValue({
body: {
count: 1,
model_snapshots: [
{
job_id: JOB_ID,
min_version: '6.4.0',
timestamp: 1575402237000,
description: 'State persisted due to job close at 2019-12-03T19:43:57+0000',
snapshot_id: SNAPSHOT_ID,
snapshot_doc_count: 1,
model_size_stats: {},
latest_record_time_stamp: 1576971072000,
latest_result_time_stamp: 1576965600000,
retain: false,
},
],
},
});

(routeHandlerContextMock.core.savedObjects.client.find as jest.Mock).mockResolvedValue({
total: 1,
saved_objects: [
{
attributes: {
nodeId: NODE_ID,
jobId: JOB_ID,
snapshotId: SNAPSHOT_ID,
},
},
],
});

(
routeHandlerContextMock.core.elasticsearch.client.asCurrentUser.transport
.request as jest.Mock
).mockRejectedValue(new esErrors.ResponseError({ statusCode: 404 } as any));

(
routeHandlerContextMock.core.elasticsearch.client.asCurrentUser.migration
.deprecations as jest.Mock
Expand Down
78 changes: 52 additions & 26 deletions x-pack/plugins/upgrade_assistant/server/routes/ml_snapshots.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@
* 2.0.
*/

import { i18n } from '@kbn/i18n';
import { ResponseError } from '@elastic/elasticsearch/lib/errors';
import { ApiResponse } from '@elastic/elasticsearch';
import { schema } from '@kbn/config-schema';
import { IScopedClusterClient, SavedObjectsClientContract } from 'kibana/server';

import { API_BASE_PATH } from '../../common/constants';
import { MlOperation, ML_UPGRADE_OP_TYPE } from '../../common/types';
import { versionCheckHandlerWrapper } from '../lib/es_version_precheck';
Expand Down Expand Up @@ -98,6 +101,35 @@ const verifySnapshotUpgrade = async (
}
};

/**
 * A single entry of the `model_snapshot_upgrades` array returned by the
 * `_upgrade/_stats` request issued in this file.
 *
 * The route handler in this file treats `loading_old_state` and
 * `saving_new_state` as "upgrade in progress" and `failed` as an error.
 */
interface ModelSnapshotUpgradeEntry {
  state: 'saving_new_state' | 'loading_old_state' | 'stopped' | 'failed';
}

/**
 * Response body shape expected from
 * `GET /_ml/anomaly_detectors/{jobId}/model_snapshots/{snapshotId}/_upgrade/_stats`.
 * Only the fields read by this file are declared.
 */
interface ModelSnapshotUpgradeStatus {
  model_snapshot_upgrades: ModelSnapshotUpgradeEntry[];
}

/**
 * Looks up the current state of a model snapshot upgrade via the ML
 * `_upgrade/_stats` endpoint.
 *
 * @param esClient   Scoped cluster client; the request is made as the current user.
 * @param jobId      ID of the anomaly detection job that owns the snapshot.
 * @param snapshotId ID of the model snapshot being upgraded.
 * @returns The first entry of `model_snapshot_upgrades`, or `undefined` when
 *          Elasticsearch answers 404 or the response carries no entries.
 * @throws Re-throws any request error whose `statusCode` is not 404 so that it
 *         can be handled at route level.
 */
const getModelSnapshotUpgradeStatus = async (
  esClient: IScopedClusterClient,
  jobId: string,
  snapshotId: string
) => {
  // Both IDs originate from request params and become URL path segments; encode
  // them so characters such as "/", "?" or "#" cannot alter the endpoint that is hit.
  const path =
    `/_ml/anomaly_detectors/${encodeURIComponent(jobId)}` +
    `/model_snapshots/${encodeURIComponent(snapshotId)}/_upgrade/_stats`;

  try {
    const { body } = (await esClient.asCurrentUser.transport.request({
      method: 'GET',
      path,
    })) as ApiResponse<ModelSnapshotUpgradeStatus>;

    // Tolerate an empty body or a missing/empty array instead of throwing a TypeError.
    return body?.model_snapshot_upgrades?.[0];
  } catch (err) {
    // If the api returns a 404 then it means that the model snapshot upgrade that was started
    // doesn't exist. Since the start migration call returned success, this means the upgrade must
    // have completed, so the upgrade assistant can continue to use its current logic. Otherwise we
    // re-throw the exception so that it can be caught at route level.
    const statusCode = (err as { statusCode?: number } | null | undefined)?.statusCode;
    if (statusCode !== 404) {
      throw err;
    }
    return undefined;
  }
};

export function registerMlSnapshotRoutes({ router, lib: { handleEsError } }: RouteDependencies) {
// Upgrade ML model snapshot
router.post(
Expand Down Expand Up @@ -198,43 +230,37 @@ export function registerMlSnapshotRoutes({ router, lib: { handleEsError } }: Rou
});
}

const upgradeStatus = await getModelSnapshotUpgradeStatus(esClient, jobId, snapshotId);
// Create snapshotInfo payload to send back in the response
const snapshotOp = foundSnapshots.saved_objects[0];
const { nodeId } = snapshotOp.attributes;

// Now that we have the node ID, check the upgrade snapshot task progress
const { body: taskResponse } = await esClient.asCurrentUser.tasks.list({
nodes: [nodeId],
actions: 'xpack/ml/job/snapshot/upgrade',
detailed: true, // necessary in order to filter if there are more than 1 snapshot upgrades in progress
});

const nodeTaskInfo = taskResponse?.nodes && taskResponse!.nodes[nodeId];
const snapshotInfo: MlOperation = {
...snapshotOp.attributes,
};

if (nodeTaskInfo) {
// Find the correct snapshot task ID based on the task description
const snapshotTaskId = Object.keys(nodeTaskInfo.tasks).find((task) => {
// The description is in the format of "job-snapshot-upgrade-<job_id>-<snapshot_id>"
const taskDescription = nodeTaskInfo.tasks[task].description;
const taskSnapshotAndJobIds = taskDescription!.replace('job-snapshot-upgrade-', '');
const taskSnapshotAndJobIdParts = taskSnapshotAndJobIds.split('-');
const taskSnapshotId =
taskSnapshotAndJobIdParts[taskSnapshotAndJobIdParts.length - 1];
const taskJobId = taskSnapshotAndJobIdParts.slice(0, 1).join('-');

return taskSnapshotId === snapshotId && taskJobId === jobId;
});

// If the snapshot task exists, assume the upgrade is in progress
if (snapshotTaskId && nodeTaskInfo.tasks[snapshotTaskId]) {
if (upgradeStatus) {
if (
upgradeStatus.state === 'loading_old_state' ||
upgradeStatus.state === 'saving_new_state'
) {
return response.ok({
body: {
...snapshotInfo,
status: 'in_progress',
},
});
} else if (upgradeStatus.state === 'failed') {
return response.customError({
statusCode: 500,
body: {
message: i18n.translate(
'xpack.upgradeAssistant.ml_snapshots.modelSnapshotUpgradeFailed',
{
defaultMessage:
"The upgrade that was started for this model snapshot doesn't exist anymore.",
}
),
},
});
} else {
// The task ID was not found; verify the deprecation was resolved
const { isSuccessful: isSnapshotDeprecationResolved, error: upgradeSnapshotError } =
Expand Down