diff --git a/x-pack/plugins/upgrade_assistant/server/routes/ml_snapshots.test.ts b/x-pack/plugins/upgrade_assistant/server/routes/ml_snapshots.test.ts index 995e3a46cef0e..603a18f2274b1 100644 --- a/x-pack/plugins/upgrade_assistant/server/routes/ml_snapshots.test.ts +++ b/x-pack/plugins/upgrade_assistant/server/routes/ml_snapshots.test.ts @@ -7,6 +7,7 @@ import { kibanaResponseFactory, RequestHandler } from 'src/core/server'; +import { errors as esErrors } from '@elastic/elasticsearch'; import { handleEsError } from '../shared_imports'; import { createMockRouter, MockRouter, routeHandlerContextMock } from './__mocks__/routes.mock'; import { createRequestMock } from './__mocks__/request.mock'; @@ -283,18 +284,15 @@ describe('ML snapshots APIs', () => { }); ( - routeHandlerContextMock.core.elasticsearch.client.asCurrentUser.tasks.list as jest.Mock + routeHandlerContextMock.core.elasticsearch.client.asCurrentUser.transport + .request as jest.Mock ).mockResolvedValue({ body: { - nodes: { - [NODE_ID]: { - tasks: { - [`${NODE_ID}:12345`]: { - description: `job-snapshot-upgrade-${JOB_ID}-${SNAPSHOT_ID}`, - }, - }, + model_snapshot_upgrades: [ + { + state: 'loading_old_state', }, - }, + ], }, }); @@ -321,7 +319,7 @@ describe('ML snapshots APIs', () => { }); }); - it('returns "complete" status if snapshot upgrade has completed', async () => { + it('fails when snapshot upgrade status returns has status="failed"', async () => { ( routeHandlerContextMock.core.elasticsearch.client.asCurrentUser.ml .getModelSnapshots as jest.Mock @@ -359,17 +357,77 @@ describe('ML snapshots APIs', () => { }); ( - routeHandlerContextMock.core.elasticsearch.client.asCurrentUser.tasks.list as jest.Mock + routeHandlerContextMock.core.elasticsearch.client.asCurrentUser.transport + .request as jest.Mock ).mockResolvedValue({ body: { - nodes: { - [NODE_ID]: { - tasks: {}, + model_snapshot_upgrades: [ + { + state: 'failed', }, + ], + }, + }); + + const resp = await routeDependencies.router.getHandler({ + method: 'get', + pathPattern: '/api/upgrade_assistant/ml_snapshots/{jobId}/{snapshotId}', + })( + routeHandlerContextMock, + createRequestMock({ + params: { + snapshotId: SNAPSHOT_ID, + jobId: JOB_ID, }, + }), + kibanaResponseFactory + ); + + expect(resp.status).toEqual(500); + }); + + it('returns "complete" status if snapshot upgrade has completed', async () => { + ( + routeHandlerContextMock.core.elasticsearch.client.asCurrentUser.ml + .getModelSnapshots as jest.Mock + ).mockResolvedValue({ + body: { + count: 1, + model_snapshots: [ + { + job_id: JOB_ID, + min_version: '6.4.0', + timestamp: 1575402237000, + description: 'State persisted due to job close at 2019-12-03T19:43:57+0000', + snapshot_id: SNAPSHOT_ID, + snapshot_doc_count: 1, + model_size_stats: {}, + latest_record_time_stamp: 1576971072000, + latest_result_time_stamp: 1576965600000, + retain: false, + }, + ], }, }); + (routeHandlerContextMock.core.savedObjects.client.find as jest.Mock).mockResolvedValue({ + total: 1, + saved_objects: [ + { + attributes: { + nodeId: NODE_ID, + jobId: JOB_ID, + snapshotId: SNAPSHOT_ID, + }, + }, + ], + }); + + ( + routeHandlerContextMock.core.elasticsearch.client.asCurrentUser.transport + .request as jest.Mock + ).mockRejectedValue(new esErrors.ResponseError({ statusCode: 404 } as any)); + ( routeHandlerContextMock.core.elasticsearch.client.asCurrentUser.migration .deprecations as jest.Mock diff --git a/x-pack/plugins/upgrade_assistant/server/routes/ml_snapshots.ts b/x-pack/plugins/upgrade_assistant/server/routes/ml_snapshots.ts index 1d30817ff1b75..036ff81179434 100644 --- a/x-pack/plugins/upgrade_assistant/server/routes/ml_snapshots.ts +++ b/x-pack/plugins/upgrade_assistant/server/routes/ml_snapshots.ts @@ -5,9 +5,12 @@ * 2.0. */ +import { i18n } from '@kbn/i18n'; import { ResponseError } from '@elastic/elasticsearch/lib/errors'; +import { ApiResponse } from '@elastic/elasticsearch'; import { schema } from '@kbn/config-schema'; import { IScopedClusterClient, SavedObjectsClientContract } from 'kibana/server'; + import { API_BASE_PATH } from '../../common/constants'; import { MlOperation, ML_UPGRADE_OP_TYPE } from '../../common/types'; import { versionCheckHandlerWrapper } from '../lib/es_version_precheck'; @@ -98,6 +101,35 @@ const verifySnapshotUpgrade = async ( } }; +interface ModelSnapshotUpgradeStatus { + model_snapshot_upgrades: Array<{ + state: 'saving_new_state' | 'loading_old_state' | 'stopped' | 'failed'; + }>; +} + +const getModelSnapshotUpgradeStatus = async ( + esClient: IScopedClusterClient, + jobId: string, + snapshotId: string +) => { + try { + const { body } = (await esClient.asCurrentUser.transport.request({ + method: 'GET', + path: `/_ml/anomaly_detectors/${jobId}/model_snapshots/${snapshotId}/_upgrade/_stats`, + })) as ApiResponse; + + return body && body.model_snapshot_upgrades[0]; + } catch (err) { + // If the api returns a 404 then it means that the model snapshot upgrade that was started + // doesn't exist. Since the start migration call returned success, this means the upgrade must have + // completed, so the upgrade assistant can continue to use its current logic. Otherwise we re-throw + // the exception so that it can be caught at route level. + if (err.statusCode !== 404) { + throw err; + } + } +}; + export function registerMlSnapshotRoutes({ router, lib: { handleEsError } }: RouteDependencies) { // Upgrade ML model snapshot router.post( @@ -198,43 +230,37 @@ export function registerMlSnapshotRoutes({ router, lib: { handleEsError } }: Rou }); } + const upgradeStatus = await getModelSnapshotUpgradeStatus(esClient, jobId, snapshotId); + // Create snapshotInfo payload to send back in the response const snapshotOp = foundSnapshots.saved_objects[0]; - const { nodeId } = snapshotOp.attributes; - - // Now that we have the node ID, check the upgrade snapshot task progress - const { body: taskResponse } = await esClient.asCurrentUser.tasks.list({ - nodes: [nodeId], - actions: 'xpack/ml/job/snapshot/upgrade', - detailed: true, // necessary in order to filter if there are more than 1 snapshot upgrades in progress - }); - - const nodeTaskInfo = taskResponse?.nodes && taskResponse!.nodes[nodeId]; const snapshotInfo: MlOperation = { ...snapshotOp.attributes, }; - if (nodeTaskInfo) { - // Find the correct snapshot task ID based on the task description - const snapshotTaskId = Object.keys(nodeTaskInfo.tasks).find((task) => { - // The description is in the format of "job-snapshot-upgrade--" - const taskDescription = nodeTaskInfo.tasks[task].description; - const taskSnapshotAndJobIds = taskDescription!.replace('job-snapshot-upgrade-', ''); - const taskSnapshotAndJobIdParts = taskSnapshotAndJobIds.split('-'); - const taskSnapshotId = - taskSnapshotAndJobIdParts[taskSnapshotAndJobIdParts.length - 1]; - const taskJobId = taskSnapshotAndJobIdParts.slice(0, 1).join('-'); - - return taskSnapshotId === snapshotId && taskJobId === jobId; - }); - - // If the snapshot task exists, assume the upgrade is in progress - if (snapshotTaskId && nodeTaskInfo.tasks[snapshotTaskId]) { + if (upgradeStatus) { + if ( + upgradeStatus.state === 'loading_old_state' || + upgradeStatus.state === 'saving_new_state' + ) { return response.ok({ body: { ...snapshotInfo, status: 'in_progress', }, }); + } else if (upgradeStatus.state === 'failed') { + return response.customError({ + statusCode: 500, + body: { + message: i18n.translate( + 'xpack.upgradeAssistant.ml_snapshots.modelSnapshotUpgradeFailed', + { + defaultMessage: + "The upgrade that was started for this model snapshot doesn't exist anymore.", + } + ), + }, + }); } else { // The task ID was not found; verify the deprecation was resolved const { isSuccessful: isSnapshotDeprecationResolved, error: upgradeSnapshotError } =