diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/util/ExpandedIdsMatcher.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/util/ExpandedIdsMatcher.java
index 6b0e6e1d824c0..2e0a799ea99a5 100644
--- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/util/ExpandedIdsMatcher.java
+++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/util/ExpandedIdsMatcher.java
@@ -202,6 +202,14 @@ public SimpleIdsMatcher(String[] tokens) {
                 .collect(Collectors.toList());
         }
 
+        /**
+         * Builds the matcher from a raw ID expression by passing it through
+         * {@code tokenizeExpression} and delegating to the token array constructor.
+         */
+        public SimpleIdsMatcher(String expression) {
+            this(tokenizeExpression(expression));
+        }
+
         /**
          * Do any of the matchers match {@code id}?
          *
diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/CancelJobModelSnapshotUpgradeAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/CancelJobModelSnapshotUpgradeAction.java
new file mode 100644
index 0000000000000..18d9168fc962d
--- /dev/null
+++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/CancelJobModelSnapshotUpgradeAction.java
@@ -0,0 +1,205 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.core.ml.action;
+
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.ActionRequestValidationException;
+import org.elasticsearch.action.ActionResponse;
+import org.elasticsearch.action.ActionType;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.xcontent.ObjectParser;
+import org.elasticsearch.xcontent.ParseField;
+import org.elasticsearch.xcontent.ToXContentObject;
+import org.elasticsearch.xcontent.XContentBuilder;
+import org.elasticsearch.xpack.core.ml.job.config.Job;
+import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
+
+import java.io.IOException;
+import java.util.Objects;
+
+/**
+ * Action type used to cancel model snapshot upgrades of ML jobs.
+ */
+public class CancelJobModelSnapshotUpgradeAction extends ActionType<CancelJobModelSnapshotUpgradeAction.Response> {
+
+    public static final CancelJobModelSnapshotUpgradeAction INSTANCE = new CancelJobModelSnapshotUpgradeAction();
+
+    // Even though at the time of writing this action doesn't have a REST endpoint the action name is
+    // still "admin" rather than "internal". This is because there's no conceptual reason why this
+    // action couldn't have a REST endpoint in the future, and it's painful to change these action
+    // names after release. The only difference is that in 7.17 the last remaining transport client
+    // users will be able to call this endpoint. In 8.x there is no transport client, so in 8.x there
+    // is no difference between having "admin" and "internal" here in the period before a REST endpoint
+    // exists. Using "admin" just makes life easier if we ever decide to add a REST endpoint in the
+    // future.
+    public static final String NAME = "cluster:admin/xpack/ml/job/model_snapshots/upgrade/cancel";
+
+    private CancelJobModelSnapshotUpgradeAction() {
+        super(NAME, Response::new);
+    }
+
+    /**
+     * Selects the upgrades to cancel. The job ID and snapshot ID both default
+     * to {@link #ALL}, and {@code allow_no_match} defaults to {@code true}.
+     */
+    public static class Request extends ActionRequest implements ToXContentObject {
+
+        public static final String ALL = "_all";
+
+        public static final ParseField SNAPSHOT_ID = new ParseField("snapshot_id");
+        public static final ParseField ALLOW_NO_MATCH = new ParseField("allow_no_match");
+
+        static final ObjectParser<Request, Void> PARSER = new ObjectParser<>(NAME, Request::new);
+
+        static {
+            PARSER.declareString(Request::setJobId, Job.ID);
+            PARSER.declareString(Request::setSnapshotId, SNAPSHOT_ID);
+            PARSER.declareBoolean(Request::setAllowNoMatch, ALLOW_NO_MATCH);
+        }
+
+        private String jobId = ALL;
+        private String snapshotId = ALL;
+        private boolean allowNoMatch = true;
+
+        public Request() {}
+
+        public Request(String jobId, String snapshotId) {
+            setJobId(jobId);
+            setSnapshotId(snapshotId);
+        }
+
+        public Request(StreamInput in) throws IOException {
+            super(in);
+            jobId = in.readString();
+            snapshotId = in.readString();
+            allowNoMatch = in.readBoolean();
+        }
+
+        public final Request setJobId(String jobId) {
+            this.jobId = ExceptionsHelper.requireNonNull(jobId, Job.ID);
+            return this;
+        }
+
+        public String getJobId() {
+            return jobId;
+        }
+
+        public final Request setSnapshotId(String snapshotId) {
+            // Validate against snapshot_id (not job_id) so a missing value is reported for the right field
+            this.snapshotId = ExceptionsHelper.requireNonNull(snapshotId, SNAPSHOT_ID);
+            return this;
+        }
+
+        public String getSnapshotId() {
+            return snapshotId;
+        }
+
+        public boolean allowNoMatch() {
+            return allowNoMatch;
+        }
+
+        public Request setAllowNoMatch(boolean allowNoMatch) {
+            this.allowNoMatch = allowNoMatch;
+            return this;
+        }
+
+        @Override
+        public ActionRequestValidationException validate() {
+            // Nothing to check here: the setters already reject null IDs
+            return null;
+        }
+
+        @Override
+        public void writeTo(StreamOutput out) throws IOException {
+            super.writeTo(out);
+            out.writeString(jobId);
+            out.writeString(snapshotId);
+            out.writeBoolean(allowNoMatch);
+        }
+
+        @Override
+        public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+            return builder.startObject()
+                .field(Job.ID.getPreferredName(), jobId)
+                .field(SNAPSHOT_ID.getPreferredName(), snapshotId)
+                .field(ALLOW_NO_MATCH.getPreferredName(), allowNoMatch)
+                .endObject();
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(jobId, snapshotId, allowNoMatch);
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) {
+                return true;
+            }
+            if (obj == null || obj.getClass() != getClass()) {
+                return false;
+            }
+            Request other = (Request) obj;
+            return Objects.equals(jobId, other.jobId) && Objects.equals(snapshotId, other.snapshotId) && allowNoMatch == other.allowNoMatch;
+        }
+
+        @Override
+        public String toString() {
+            return Strings.toString(this);
+        }
+    }
+
+    /**
+     * Response carrying a single {@code cancelled} flag.
+     */
+    public static class Response extends ActionResponse implements Writeable, ToXContentObject {
+
+        private final boolean cancelled;
+
+        public Response(boolean cancelled) {
+            this.cancelled = cancelled;
+        }
+
+        public Response(StreamInput in) throws IOException {
+            cancelled = in.readBoolean();
+        }
+
+        @Override
+        public void writeTo(StreamOutput out) throws IOException {
+            out.writeBoolean(cancelled);
+        }
+
+        public boolean isCancelled() {
+            return cancelled;
+        }
+
+        @Override
+        public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+            builder.startObject();
+            builder.field("cancelled", cancelled);
+            builder.endObject();
+            return builder;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            Response response = (Response) o;
+            return cancelled == response.cancelled;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(cancelled);
+        }
+    }
+}
diff --git
a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/CancelJobModelSnapshotUpgradeActionRequestTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/CancelJobModelSnapshotUpgradeActionRequestTests.java
new file mode 100644
index 0000000000000..bb8b3430212f7
--- /dev/null
+++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/CancelJobModelSnapshotUpgradeActionRequestTests.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.core.ml.action;
+
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.test.AbstractSerializingTestCase;
+import org.elasticsearch.xcontent.XContentParser;
+import org.elasticsearch.xpack.core.ml.action.CancelJobModelSnapshotUpgradeAction.Request;
+
+/**
+ * Serialization tests for {@link Request}, using random job and snapshot IDs.
+ */
+public class CancelJobModelSnapshotUpgradeActionRequestTests extends AbstractSerializingTestCase<Request> {
+
+    @Override
+    protected Request createTestInstance() {
+        Request request = new Request(randomAlphaOfLengthBetween(5, 20), randomAlphaOfLengthBetween(5, 20));
+        if (randomBoolean()) {
+            request.setAllowNoMatch(randomBoolean());
+        }
+        return request;
+    }
+
+    @Override
+    protected boolean supportsUnknownFields() {
+        return false;
+    }
+
+    @Override
+    protected Writeable.Reader<Request> instanceReader() {
+        return Request::new;
+    }
+
+    @Override
+    protected Request doParseInstance(XContentParser parser) {
+        return Request.PARSER.apply(parser, null);
+    }
+}
diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/CancelJobModelSnapshotUpgradeActionResponseTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/CancelJobModelSnapshotUpgradeActionResponseTests.java
new file mode 100644
index 0000000000000..d87ea69ff89af
--- /dev/null
+++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/CancelJobModelSnapshotUpgradeActionResponseTests.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.core.ml.action;
+
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.test.AbstractWireSerializingTestCase;
+import org.elasticsearch.xpack.core.ml.action.CancelJobModelSnapshotUpgradeAction.Response;
+
+/**
+ * Wire serialization tests for {@link Response}, using a random {@code cancelled} flag.
+ */
+public class CancelJobModelSnapshotUpgradeActionResponseTests extends AbstractWireSerializingTestCase<Response> {
+
+    @Override
+    protected Response createTestInstance() {
+        return new Response(randomBoolean());
+    }
+
+    @Override
+    protected Writeable.Reader<Response> instanceReader() {
+        return Response::new;
+    }
+}
diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java
index c3b0b2b19e6b3..1fdc55b061c87 100644
--- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java
@@ -89,6 +89,7 @@
 import org.elasticsearch.xpack.core.ml.MlMetadata;
 import org.elasticsearch.xpack.core.ml.MlStatsIndex;
 import org.elasticsearch.xpack.core.ml.MlTasks;
+import org.elasticsearch.xpack.core.ml.action.CancelJobModelSnapshotUpgradeAction;
 import org.elasticsearch.xpack.core.ml.action.CloseJobAction;
 import org.elasticsearch.xpack.core.ml.action.CreateTrainedModelAllocationAction;
 import org.elasticsearch.xpack.core.ml.action.DeleteCalendarAction;
@@ -183,6 +184,7 @@
 import
org.elasticsearch.xpack.core.ml.job.snapshot.upgrade.SnapshotUpgradeTaskState; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.core.template.TemplateUtils; +import org.elasticsearch.xpack.ml.action.TransportCancelJobModelSnapshotUpgradeAction; import org.elasticsearch.xpack.ml.action.TransportCloseJobAction; import org.elasticsearch.xpack.ml.action.TransportCreateTrainedModelAllocationAction; import org.elasticsearch.xpack.ml.action.TransportDeleteCalendarAction; @@ -1281,6 +1283,7 @@ public List getRestHandlers( new ActionHandler<>(GetTrainedModelsStatsAction.INSTANCE, TransportGetTrainedModelsStatsAction.class), new ActionHandler<>(PutTrainedModelAction.INSTANCE, TransportPutTrainedModelAction.class), new ActionHandler<>(UpgradeJobModelSnapshotAction.INSTANCE, TransportUpgradeJobModelSnapshotAction.class), + new ActionHandler<>(CancelJobModelSnapshotUpgradeAction.INSTANCE, TransportCancelJobModelSnapshotUpgradeAction.class), new ActionHandler<>(GetJobModelSnapshotsUpgradeStatsAction.INSTANCE, TransportGetJobModelSnapshotsUpgradeStatsAction.class), new ActionHandler<>(PutTrainedModelAliasAction.INSTANCE, TransportPutTrainedModelAliasAction.class), new ActionHandler<>(DeleteTrainedModelAliasAction.INSTANCE, TransportDeleteTrainedModelAliasAction.class), @@ -1763,16 +1766,28 @@ public void cleanUpFeature( }, unsetResetModeListener::onFailure); // Stop data feeds + ActionListener cancelSnapshotUpgradesListener = ActionListener.wrap( + cancelUpgradesResponse -> { + StopDatafeedAction.Request stopDatafeedsReq = new StopDatafeedAction.Request("_all").setAllowNoMatch(true); + client.execute( + StopDatafeedAction.INSTANCE, + stopDatafeedsReq, + ActionListener.wrap(afterDataFeedsStopped::onResponse, failure -> { + logger.warn("failed stopping datafeeds for machine learning feature reset. 
Attempting with force=true", failure);
+                        client.execute(StopDatafeedAction.INSTANCE, stopDatafeedsReq.setForce(true), afterDataFeedsStopped);
+                    })
+                );
+            },
+            unsetResetModeListener::onFailure
+        );
+
+        // Cancel model snapshot upgrades
         ActionListener stopDeploymentsListener = ActionListener.wrap(acknowledgedResponse -> {
-            StopDatafeedAction.Request stopDatafeedsReq = new StopDatafeedAction.Request("_all").setAllowNoMatch(true);
-            client.execute(
-                StopDatafeedAction.INSTANCE,
-                stopDatafeedsReq,
-                ActionListener.wrap(afterDataFeedsStopped::onResponse, failure -> {
-                    logger.warn("failed stopping datafeeds for machine learning feature reset. Attempting with force=true", failure);
-                    client.execute(StopDatafeedAction.INSTANCE, stopDatafeedsReq.setForce(true), afterDataFeedsStopped);
-                })
+            CancelJobModelSnapshotUpgradeAction.Request cancelSnapshotUpgradesReq = new CancelJobModelSnapshotUpgradeAction.Request(
+                "_all",
+                "_all"
             );
+            client.execute(CancelJobModelSnapshotUpgradeAction.INSTANCE, cancelSnapshotUpgradesReq, cancelSnapshotUpgradesListener);
         }, unsetResetModeListener::onFailure);
 
         // Stop all model deployments
diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCancelJobModelSnapshotUpgradeAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCancelJobModelSnapshotUpgradeAction.java
new file mode 100644
index 0000000000000..cefdff1870f87
--- /dev/null
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCancelJobModelSnapshotUpgradeAction.java
@@ -0,0 +1,154 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.ml.action;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.ResourceNotFoundException;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.support.ActionFilters;
+import org.elasticsearch.action.support.HandledTransportAction;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.util.concurrent.AtomicArray;
+import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
+import org.elasticsearch.persistent.PersistentTasksService;
+import org.elasticsearch.tasks.Task;
+import org.elasticsearch.transport.TransportService;
+import org.elasticsearch.xpack.core.action.util.ExpandedIdsMatcher.SimpleIdsMatcher;
+import org.elasticsearch.xpack.core.ml.MlTasks;
+import org.elasticsearch.xpack.core.ml.action.CancelJobModelSnapshotUpgradeAction;
+import org.elasticsearch.xpack.core.ml.action.CancelJobModelSnapshotUpgradeAction.Request;
+import org.elasticsearch.xpack.core.ml.action.CancelJobModelSnapshotUpgradeAction.Response;
+import org.elasticsearch.xpack.core.ml.job.config.Job;
+import org.elasticsearch.xpack.core.ml.job.snapshot.upgrade.SnapshotUpgradeTaskParams;
+import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
+import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;
+
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+/**
+ * Cancels model snapshot upgrades by removing the persistent tasks of the upgrades
+ * that match the requested job ID and snapshot ID expressions.
+ */
+public class TransportCancelJobModelSnapshotUpgradeAction extends HandledTransportAction<Request, Response> {
+
+    private static final Logger logger = LogManager.getLogger(TransportCancelJobModelSnapshotUpgradeAction.class);
+
+    private final JobConfigProvider jobConfigProvider;
+    private final ClusterService clusterService;
+    private final PersistentTasksService persistentTasksService;
+
+    @Inject
+    public TransportCancelJobModelSnapshotUpgradeAction(
+        TransportService transportService,
+        ActionFilters actionFilters,
+        JobConfigProvider jobConfigProvider,
+        ClusterService clusterService,
+        PersistentTasksService persistentTasksService
+    ) {
+        super(CancelJobModelSnapshotUpgradeAction.NAME, transportService, actionFilters, Request::new);
+        this.jobConfigProvider = jobConfigProvider;
+        this.clusterService = clusterService;
+        this.persistentTasksService = persistentTasksService;
+    }
+
+    @Override
+    public void doExecute(Task task, Request request, ActionListener<Response> listener) {
+
+        logger.debug("[{}] cancel model snapshot [{}] upgrades", request.getJobId(), request.getSnapshotId());
+
+        // 2. Now that we have the job IDs, find the relevant model snapshot upgrade tasks
+        ActionListener<List<Job.Builder>> expandIdsListener = ActionListener.wrap(jobs -> {
+            SimpleIdsMatcher matcher = new SimpleIdsMatcher(request.getSnapshotId());
+            Set<String> jobIds = jobs.stream().map(Job.Builder::getId).collect(Collectors.toSet());
+            PersistentTasksCustomMetadata tasksInProgress = clusterService.state().metadata().custom(PersistentTasksCustomMetadata.TYPE);
+            // allow_no_match plays no part here. The reason is that we have a principle that stopping
+            // a stopped entity is a no-op, and upgrades that have already completed won't have a task.
+            // This is a bit different to jobs and datafeeds, where the entity continues to exist even
+            // after it's stopped. Upgrades cease to exist after they're stopped so the match validation
+            // cannot be as thorough.
+            List<PersistentTasksCustomMetadata.PersistentTask<?>> upgradeTasksToCancel = MlTasks.snapshotUpgradeTasks(tasksInProgress)
+                .stream()
+                .filter(t -> jobIds.contains(((SnapshotUpgradeTaskParams) t.getParams()).getJobId()))
+                .filter(t -> matcher.idMatches(((SnapshotUpgradeTaskParams) t.getParams()).getSnapshotId()))
+                .collect(Collectors.toList());
+            removePersistentTasks(request, upgradeTasksToCancel, listener);
+        }, listener::onFailure);
+
+        // 1. Expand jobs - this will throw if a required job ID match isn't made. Jobs being deleted are included here.
+        jobConfigProvider.expandJobs(request.getJobId(), request.allowNoMatch(), false, expandIdsListener);
+    }
+
+    /**
+     * Sends a remove request for every given task and completes {@code listener} once all of
+     * them have been answered. Failures other than "not found" are collected and reported together.
+     */
+    private void removePersistentTasks(
+        Request request,
+        List<PersistentTasksCustomMetadata.PersistentTask<?>> upgradeTasksToCancel,
+        ActionListener<Response> listener
+    ) {
+        final int numberOfTasks = upgradeTasksToCancel.size();
+        if (numberOfTasks == 0) {
+            listener.onResponse(new Response(true));
+            return;
+        }
+
+        final AtomicInteger counter = new AtomicInteger();
+        final AtomicArray<Exception> failures = new AtomicArray<>(numberOfTasks);
+
+        for (PersistentTasksCustomMetadata.PersistentTask<?> task : upgradeTasksToCancel) {
+            persistentTasksService.sendRemoveRequest(task.getId(), new ActionListener<>() {
+                @Override
+                public void onResponse(PersistentTasksCustomMetadata.PersistentTask<?> task) {
+                    if (counter.incrementAndGet() == numberOfTasks) {
+                        sendResponseOrFailure(listener, failures);
+                    }
+                }
+
+                @Override
+                public void onFailure(Exception e) {
+                    final int slot = counter.incrementAndGet();
+                    // Not found is not an error - it just means the upgrade completed before we could cancel it.
+                    if (ExceptionsHelper.unwrapCause(e) instanceof ResourceNotFoundException == false) {
+                        failures.set(slot - 1, e);
+                    }
+                    if (slot == numberOfTasks) {
+                        sendResponseOrFailure(listener, failures);
+                    }
+                }
+
+                private void sendResponseOrFailure(ActionListener<Response> listener, AtomicArray<Exception> failures) {
+                    List<Exception> caughtExceptions = failures.asList();
+                    if (caughtExceptions.isEmpty()) {
+                        listener.onResponse(new Response(true));
+                        return;
+                    }
+
+                    String msg = "Failed to cancel model snapshot upgrade for ["
+                        + request.getSnapshotId()
+                        + "] on job ["
+                        + request.getJobId()
+                        + "]. Total failures ["
+                        + caughtExceptions.size()
+                        + "], rethrowing first, all Exceptions: ["
+                        + caughtExceptions.stream().map(Exception::getMessage).collect(Collectors.joining(", "))
+                        + "]";
+
+                    ElasticsearchException e = new ElasticsearchException(msg, caughtExceptions.get(0));
+                    listener.onFailure(e);
+                }
+            });
+        }
+    }
+}
diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java
index 78a0f543d9ed3..28287c0d95185 100644
--- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java
@@ -33,6 +33,7 @@
 import org.elasticsearch.xpack.core.action.util.QueryPage;
 import org.elasticsearch.xpack.core.ml.MlConfigIndex;
 import org.elasticsearch.xpack.core.ml.MlTasks;
+import org.elasticsearch.xpack.core.ml.action.CancelJobModelSnapshotUpgradeAction;
 import org.elasticsearch.xpack.core.ml.action.DeleteJobAction;
 import org.elasticsearch.xpack.core.ml.action.PutJobAction;
 import org.elasticsearch.xpack.core.ml.action.RevertModelSnapshotAction;
@@ -376,9 +377,9 @@ public void deleteJob(
     ) {
         final String jobId = request.getJobId();
 
-        // Step 4. When the job has been removed from the cluster state, return a response
+        // Step 5. When the job has been removed from the config index, return a response
         // -------
-        CheckedConsumer apiResponseHandler = jobDeleted -> {
+        CheckedConsumer configResponseHandler = jobDeleted -> {
             if (jobDeleted) {
                 logger.info("Job [" + jobId + "] deleted");
                 auditor.info(jobId, Messages.getMessage(Messages.JOB_AUDIT_DELETED));
@@ -388,29 +389,39 @@ public void deleteJob(
             }
         };
 
-        // Step 3. When the physical storage has been deleted, delete the job config document
+        // Step 4.
When the physical storage has been deleted, delete the job config document // ------- // Don't report an error if the document has already been deleted - CheckedConsumer deleteJobStateHandler = response -> jobConfigProvider.deleteJob( + CheckedConsumer removeFromCalendarsHandler = response -> jobConfigProvider.deleteJob( jobId, false, - ActionListener.wrap(deleteResponse -> apiResponseHandler.accept(Boolean.TRUE), listener::onFailure) + ActionListener.wrap(deleteResponse -> configResponseHandler.accept(Boolean.TRUE), listener::onFailure) ); - // Step 2. Remove the job from any calendars - CheckedConsumer removeFromCalendarsHandler = response -> jobResultsProvider.removeJobFromCalendars( + // Step 3. Remove the job from any calendars + CheckedConsumer deleteJobStateHandler = response -> jobResultsProvider.removeJobFromCalendars( jobId, - ActionListener.wrap(deleteJobStateHandler, listener::onFailure) + ActionListener.wrap(removeFromCalendarsHandler, listener::onFailure) ); - // Step 1. Delete the physical storage - new JobDataDeleter(clientToUse, jobId).deleteJobDocuments( - jobConfigProvider, - indexNameExpressionResolver, - state, - removeFromCalendarsHandler, + // Step 2. Delete the physical storage + ActionListener cancelUpgradesListener = ActionListener.wrap( + r -> new JobDataDeleter(clientToUse, jobId).deleteJobDocuments( + jobConfigProvider, + indexNameExpressionResolver, + state, + deleteJobStateHandler, + listener::onFailure + ), listener::onFailure ); + + // Step 1. 
Cancel any model snapshot upgrades that might be in progress
+        clientToUse.execute(
+            CancelJobModelSnapshotUpgradeAction.INSTANCE,
+            new CancelJobModelSnapshotUpgradeAction.Request(jobId, "_all"),
+            cancelUpgradesListener
+        );
     }
 
     private void postJobUpdate(UpdateJobAction.Request request, Job updatedJob, ActionListener actionListener) {
diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/JobModelSnapshotUpgrader.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/JobModelSnapshotUpgrader.java
index 230d798d1d521..3ec162fd0c479 100644
--- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/JobModelSnapshotUpgrader.java
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/JobModelSnapshotUpgrader.java
@@ -71,6 +71,9 @@ public final class JobModelSnapshotUpgrader {
     private final AutodetectProcessFactory autodetectProcessFactory;
     private final JobResultsPersister jobResultsPersister;
     private final NativeStorageProvider nativeStorageProvider;
+    // Not volatile as only used in synchronized methods
+    private AutodetectProcess process;
+    private JobSnapshotUpgraderResultProcessor processor;
 
     JobModelSnapshotUpgrader(
         SnapshotUpgradeTask task,
@@ -98,11 +101,13 @@ public final class JobModelSnapshotUpgrader {
         this.snapshotId = task.getSnapshotId();
     }
 
-    void start() {
+    synchronized void start() {
+        task.setJobModelSnapshotUpgrader(this);
+
         // A TP with no queue, so that we fail immediately if there are no threads available
         ExecutorService autodetectExecutorService = threadPool.executor(MachineLearning.JOB_COMMS_THREAD_POOL_NAME);
 
-        AutodetectProcess process = autodetectProcessFactory.createAutodetectProcess(
+        process = autodetectProcessFactory.createAutodetectProcess(
             jobId + "-" + snapshotId,
             job,
             params,
@@ -119,12 +124,7 @@ void start() {
                 }
             }
         );
-        JobSnapshotUpgraderResultProcessor processor = new JobSnapshotUpgraderResultProcessor(
-            jobId,
-            snapshotId,
-            jobResultsPersister,
-            process
-        );
+        processor = new JobSnapshotUpgraderResultProcessor(jobId, snapshotId, jobResultsPersister, process);
         ProcessWorkerExecutorService autodetectWorkerExecutor;
         try (ThreadContext.StoredContext ignore = threadPool.getThreadContext().stashContext()) {
             autodetectWorkerExecutor = new AutodetectWorkerExecutorService(threadPool.getThreadContext());
@@ -135,6 +135,8 @@ void start() {
             // the process too, so that other submitted operations to threadpool are stopped.
             try {
                 IOUtils.close(process);
+                process = null;
+                processor = null;
             } catch (IOException ioe) {
                 logger.error("Can't close autodetect", ioe);
             }
@@ -159,6 +161,30 @@ void setTaskToFailed(String reason, ActionListener> listener)
         }));
     }
 
+    /**
+     * Kills the upgrade process if one is currently held. After a successful kill the references to
+     * the process and its result processor are cleared; a failed kill is logged and they are kept.
+     *
+     * @param reason why the process is being killed; only used for logging
+     */
+    public synchronized void killProcess(String reason) {
+        if (process == null) {
+            logger.warn("[{}] attempt to kill upgrade process for model snapshot [{}] when no such process exists", jobId, snapshotId);
+            return;
+        }
+        try {
+            logger.debug("[{}] killing upgrade process for model snapshot [{}]: reason [{}]", jobId, snapshotId, reason);
+            if (processor != null) {
+                processor.setProcessKilled();
+            }
+            process.kill(true);
+            process = null;
+            processor = null;
+        } catch (IOException ioe) {
+            logger.error(new ParameterizedMessage("[{}] failed to kill upgrade process for model snapshot [{}]", jobId, snapshotId), ioe);
+        }
+    }
+
     private class Executor {
 
         private final StateStreamer stateStreamer;
diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/snapshot/upgrader/SnapshotUpgradeTask.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/snapshot/upgrader/SnapshotUpgradeTask.java
index 816803bc4ee03..35348d9a05929 100644
--- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/snapshot/upgrader/SnapshotUpgradeTask.java
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/snapshot/upgrader/SnapshotUpgradeTask.java
@@ -7,18 +7,25 @@
 
package org.elasticsearch.xpack.ml.job.snapshot.upgrader; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.elasticsearch.license.LicensedAllocatedPersistentTask; import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.ml.MachineLearning; +import org.elasticsearch.xpack.ml.job.process.autodetect.JobModelSnapshotUpgrader; import java.util.Map; public class SnapshotUpgradeTask extends LicensedAllocatedPersistentTask { + private static final Logger logger = LogManager.getLogger(SnapshotUpgradeTask.class); + private final String jobId; private final String snapshotId; + // Not volatile as only used in synchronized methods + private JobModelSnapshotUpgrader jobModelSnapshotUpgrader; public SnapshotUpgradeTask( String jobId, @@ -52,4 +59,18 @@ public String getJobId() { public String getSnapshotId() { return snapshotId; } + + @Override + protected synchronized void onCancelled() { + if (jobModelSnapshotUpgrader != null) { + String reason = getReasonCancelled(); + logger.trace("[{}] Cancelling snapshot upgrade [{}] task because: {}", jobId, snapshotId, reason); + jobModelSnapshotUpgrader.killProcess(reason); + jobModelSnapshotUpgrader = null; + } + } + + public synchronized void setJobModelSnapshotUpgrader(JobModelSnapshotUpgrader jobModelSnapshotUpgrader) { + this.jobModelSnapshotUpgrader = jobModelSnapshotUpgrader; + } } diff --git a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java index b2adac43ece11..ca88afc36d2aa 100644 --- a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java +++ 
b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java @@ -152,6 +152,7 @@ public class Constants { "cluster:admin/xpack/ml/job/model_snapshots/revert", "cluster:admin/xpack/ml/job/model_snapshots/update", "cluster:admin/xpack/ml/job/model_snapshots/upgrade", + "cluster:admin/xpack/ml/job/model_snapshots/upgrade/cancel", "cluster:admin/xpack/ml/job/open", "cluster:admin/xpack/ml/job/persist", "cluster:admin/xpack/ml/job/put", diff --git a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/ml/upgrade_job_snapshot.yml b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/ml/upgrade_job_snapshot.yml index e0961f134a524..e0281880f0f95 100644 --- a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/ml/upgrade_job_snapshot.yml +++ b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/ml/upgrade_job_snapshot.yml @@ -14,6 +14,9 @@ setup: "data_description" : { "format":"xcontent", "time_field":"time" + }, + "analysis_limits" : { + "model_memory_limit":"20mb" } } @@ -93,10 +96,6 @@ setup: --- "Test existing but corrupt snapshot": - - skip: - version: all - reason: "@AwaitsFix https://github.com/elastic/elasticsearch/issues/81578" - - do: ml.upgrade_job_snapshot: job_id: "upgrade-model-snapshot" @@ -110,3 +109,4 @@ setup: - match: { count: 1 } - match: { model_snapshot_upgrades.0.job_id: "upgrade-model-snapshot" } - match: { model_snapshot_upgrades.0.snapshot_id: "1234567890" } + - match: { model_snapshot_upgrades.0.state: /failed|loading_old_state/ }