diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java index d705a199d4a27..b67c7f5fd4d5c 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java @@ -439,10 +439,18 @@ protected void nodeOperation(AllocatedPersistentTask task, OpenJobAction.JobPara JobTask jobTask = (JobTask) task; jobTask.autodetectProcessManager = autodetectProcessManager; JobTaskState jobTaskState = (JobTaskState) state; + JobState jobState = jobTaskState == null ? null : jobTaskState.getState(); + // If the job is closing, simply stop and return + if (JobState.CLOSING.equals(jobState)) { + // Mark as completed instead of using `stop` as stop assumes native processes have started + logger.info("[{}] job got reassigned while stopping. Marking as completed", params.getJobId()); + jobTask.markAsCompleted(); + return; + } // If the job is failed then the Persistent Task Service will // try to restart it on a node restart. Exiting here leaves the // job in the failed state and it must be force closed. - if (jobTaskState != null && jobTaskState.getState().isAnyOf(JobState.FAILED, JobState.CLOSING)) { + if (JobState.FAILED.equals(jobState)) { return; } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java index c92e0e39a2940..4ce08e998545a 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java @@ -645,11 +645,16 @@ protected void nodeOperation(AllocatedPersistentTask task, StartDataFrameAnalyti PersistentTaskState state) { logger.info("[{}] Starting data frame analytics", params.getId()); DataFrameAnalyticsTaskState analyticsTaskState = (DataFrameAnalyticsTaskState) state; + DataFrameAnalyticsState analyticsState = analyticsTaskState == null ? null : analyticsTaskState.getState(); - // If we are "stopping" there is nothing to do + // If we are "stopping" there is nothing to do and we should stop + if (DataFrameAnalyticsState.STOPPING.equals(analyticsState)) { + logger.info("[{}] data frame analytics got reassigned while stopping. Marking as completed", params.getId()); + task.markAsCompleted(); + return; + } // If we are "failed" then we should leave the task as is; for recovery it must be force stopped. - if (analyticsTaskState != null && analyticsTaskState.getState().isAnyOf( - DataFrameAnalyticsState.STOPPING, DataFrameAnalyticsState.FAILED)) { + if (DataFrameAnalyticsState.FAILED.equals(analyticsState)) { return; } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java index f4a1333279124..1747f2258a0d0 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java @@ -398,6 +398,14 @@ protected void nodeOperation(final AllocatedPersistentTask allocatedPersistentTa final StartDatafeedAction.DatafeedParams params, final PersistentTaskState state) { DatafeedTask datafeedTask = (DatafeedTask) allocatedPersistentTask; + DatafeedState datafeedState = (DatafeedState) state; + + // If we are "stopping" there is nothing to do + if (DatafeedState.STOPPING.equals(datafeedState)) { + logger.info("[{}] datafeed got reassigned while stopping. Marking as completed", params.getDatafeedId()); + datafeedTask.markAsCompleted(); + return; + } datafeedTask.datafeedManager = datafeedManager; datafeedManager.run(datafeedTask, (error) -> {