Skip to content

Commit 65e0084

Browse files
authored
[ML] do not start stopping tasks on reassignment (#55315) (#55388)
When anomaly jobs, datafeeds, and analytics tasks are stopped, they enter an ephemeral state called `STOPPING` (`CLOSING` in the case of anomaly jobs). If the node executing the task fails while this is occurring, the task could be stuck in that limbo state. It is best to mark such tasks as completed if they get reassigned to a node.
1 parent 290361c commit 65e0084

File tree

3 files changed

+25
-4
lines changed

3 files changed

+25
-4
lines changed

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -437,10 +437,18 @@ protected void nodeOperation(AllocatedPersistentTask task, OpenJobAction.JobPara
437437
JobTask jobTask = (JobTask) task;
438438
jobTask.autodetectProcessManager = autodetectProcessManager;
439439
JobTaskState jobTaskState = (JobTaskState) state;
440+
JobState jobState = jobTaskState == null ? null : jobTaskState.getState();
441+
// If the job is closing, simply stop and return
442+
if (JobState.CLOSING.equals(jobState)) {
443+
// Mark as completed instead of using `stop` as stop assumes native processes have started
444+
logger.info("[{}] job got reassigned while stopping. Marking as completed", params.getJobId());
445+
jobTask.markAsCompleted();
446+
return;
447+
}
440448
// If the job is failed then the Persistent Task Service will
441449
// try to restart it on a node restart. Exiting here leaves the
442450
// job in the failed state and it must be force closed.
443-
if (jobTaskState != null && jobTaskState.getState().isAnyOf(JobState.FAILED, JobState.CLOSING)) {
451+
if (JobState.FAILED.equals(jobState)) {
444452
return;
445453
}
446454

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -642,11 +642,16 @@ protected void nodeOperation(AllocatedPersistentTask task, StartDataFrameAnalyti
642642
PersistentTaskState state) {
643643
logger.info("[{}] Starting data frame analytics", params.getId());
644644
DataFrameAnalyticsTaskState analyticsTaskState = (DataFrameAnalyticsTaskState) state;
645+
DataFrameAnalyticsState analyticsState = analyticsTaskState == null ? null : analyticsTaskState.getState();
645646

646-
// If we are "stopping" there is nothing to do
647+
// If we are "stopping" there is nothing to do and we should stop
648+
if (DataFrameAnalyticsState.STOPPING.equals(analyticsState)) {
649+
logger.info("[{}] data frame analytics got reassigned while stopping. Marking as completed", params.getId());
650+
task.markAsCompleted();
651+
return;
652+
}
647653
// If we are "failed" then we should leave the task as is; for recovery it must be force stopped.
648-
if (analyticsTaskState != null && analyticsTaskState.getState().isAnyOf(
649-
DataFrameAnalyticsState.STOPPING, DataFrameAnalyticsState.FAILED)) {
654+
if (DataFrameAnalyticsState.FAILED.equals(analyticsState)) {
650655
return;
651656
}
652657

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -397,6 +397,14 @@ protected void nodeOperation(final AllocatedPersistentTask allocatedPersistentTa
397397
final StartDatafeedAction.DatafeedParams params,
398398
final PersistentTaskState state) {
399399
DatafeedTask datafeedTask = (DatafeedTask) allocatedPersistentTask;
400+
DatafeedState datafeedState = (DatafeedState) state;
401+
402+
// If we are "stopping" there is nothing to do
403+
if (DatafeedState.STOPPING.equals(datafeedState)) {
404+
logger.info("[{}] datafeed got reassigned while stopping. Marking as completed", params.getDatafeedId());
405+
datafeedTask.markAsCompleted();
406+
return;
407+
}
400408
datafeedTask.datafeedManager = datafeedManager;
401409
datafeedManager.run(datafeedTask,
402410
(error) -> {

0 commit comments

Comments
 (0)