Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -439,10 +439,18 @@ protected void nodeOperation(AllocatedPersistentTask task, OpenJobAction.JobPara
JobTask jobTask = (JobTask) task;
jobTask.autodetectProcessManager = autodetectProcessManager;
JobTaskState jobTaskState = (JobTaskState) state;
JobState jobState = jobTaskState == null ? null : jobTaskState.getState();
// If the job is closing, simply stop and return
if (JobState.CLOSING.equals(jobState)) {
// Mark as completed instead of using `stop` as stop assumes native processes have started
logger.info("[{}] job got reassigned while stopping. Marking as completed", params.getJobId());
jobTask.markAsCompleted();
return;
}
// If the job is failed then the Persistent Task Service will
// try to restart it on a node restart. Exiting here leaves the
// job in the failed state and it must be force closed.
if (jobTaskState != null && jobTaskState.getState().isAnyOf(JobState.FAILED, JobState.CLOSING)) {
if (JobState.FAILED.equals(jobState)) {
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -645,11 +645,16 @@ protected void nodeOperation(AllocatedPersistentTask task, StartDataFrameAnalyti
PersistentTaskState state) {
logger.info("[{}] Starting data frame analytics", params.getId());
DataFrameAnalyticsTaskState analyticsTaskState = (DataFrameAnalyticsTaskState) state;
DataFrameAnalyticsState analyticsState = analyticsTaskState == null ? null : analyticsTaskState.getState();

// If we are "stopping" there is nothing to do
// If we are "stopping" there is nothing to do and we should stop
if (DataFrameAnalyticsState.STOPPING.equals(analyticsState)) {
logger.info("[{}] data frame analytics got reassigned while stopping. Marking as completed", params.getId());
task.markAsCompleted();
return;
}
// If we are "failed" then we should leave the task as is; for recovery it must be force stopped.
if (analyticsTaskState != null && analyticsTaskState.getState().isAnyOf(
DataFrameAnalyticsState.STOPPING, DataFrameAnalyticsState.FAILED)) {
if (DataFrameAnalyticsState.FAILED.equals(analyticsState)) {
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,14 @@ protected void nodeOperation(final AllocatedPersistentTask allocatedPersistentTa
final StartDatafeedAction.DatafeedParams params,
final PersistentTaskState state) {
DatafeedTask datafeedTask = (DatafeedTask) allocatedPersistentTask;
DatafeedState datafeedState = (DatafeedState) state;

// If we are "stopping" there is nothing to do
if (DatafeedState.STOPPING.equals(datafeedState)) {
logger.info("[{}] datafeed got reassigned while stopping. Marking as completed", params.getDatafeedId());
datafeedTask.markAsCompleted();
return;
}
datafeedTask.datafeedManager = datafeedManager;
datafeedManager.run(datafeedTask,
(error) -> {
Expand Down