|
5 | 5 | */ |
6 | 6 | package org.elasticsearch.xpack.ml.action; |
7 | 7 |
|
| 8 | +import org.elasticsearch.ElasticsearchException; |
8 | 9 | import org.elasticsearch.ElasticsearchStatusException; |
9 | 10 | import org.elasticsearch.ElasticsearchTimeoutException; |
| 11 | +import org.elasticsearch.ResourceNotFoundException; |
10 | 12 | import org.elasticsearch.action.ActionListener; |
11 | 13 | import org.elasticsearch.action.support.ActionFilters; |
12 | 14 | import org.elasticsearch.action.support.master.AcknowledgedResponse; |
|
34 | 36 | import org.elasticsearch.xpack.core.ml.action.SetUpgradeModeAction; |
35 | 37 | import org.elasticsearch.xpack.ml.utils.TypedChainTaskExecutor; |
36 | 38 |
|
| 39 | +import java.util.Comparator; |
37 | 40 | import java.util.List; |
38 | 41 | import java.util.Set; |
39 | 42 | import java.util.concurrent.atomic.AtomicBoolean; |
40 | 43 | import java.util.stream.Collectors; |
41 | 44 |
|
| 45 | +import static org.elasticsearch.ExceptionsHelper.rethrowAndSuppress; |
42 | 46 | import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; |
43 | 47 | import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin; |
44 | 48 | import static org.elasticsearch.xpack.core.ml.MlTasks.AWAITING_UPGRADE; |
@@ -120,9 +124,20 @@ protected void masterOperation(SetUpgradeModeAction.Request request, ClusterStat |
120 | 124 | .cluster() |
121 | 125 | .prepareListTasks() |
122 | 126 | .setActions(DATAFEED_TASK_NAME + "[c]", JOB_TASK_NAME + "[c]") |
| 127 | + // There is a chance that we failed un-allocating a task due to allocation_id being changed |
| 128 | + // This call will timeout in that case and return an error |
123 | 129 | .setWaitForCompletion(true) |
124 | 130 | .setTimeout(request.timeout()).execute(ActionListener.wrap( |
125 | | - r -> wrappedListener.onResponse(new AcknowledgedResponse(true)), |
| 131 | + r -> { |
| 132 | + try { |
| 133 | + // Handle potential node timeouts, |
| 134 | + // these should be considered failures as tasks as still potentially executing |
| 135 | + rethrowAndSuppress(r.getNodeFailures()); |
| 136 | + wrappedListener.onResponse(new AcknowledgedResponse(true)); |
| 137 | + } catch (ElasticsearchException ex) { |
| 138 | + wrappedListener.onFailure(ex); |
| 139 | + } |
| 140 | + }, |
126 | 141 | wrappedListener::onFailure)); |
127 | 142 | }, |
128 | 143 | wrappedListener::onFailure |
@@ -244,10 +259,19 @@ private void unassignPersistentTasks(PersistentTasksCustomMetaData tasksCustomMe |
244 | 259 | .stream() |
245 | 260 | .filter(persistentTask -> (persistentTask.getTaskName().equals(MlTasks.JOB_TASK_NAME) || |
246 | 261 | persistentTask.getTaskName().equals(MlTasks.DATAFEED_TASK_NAME))) |
| 262 | + // We want to always have the same ordering of which tasks we un-allocate first. |
| 263 | + // However, the order in which the distributed tasks handle the un-allocation event is not guaranteed. |
| 264 | + .sorted(Comparator.comparing(PersistentTask::getTaskName)) |
247 | 265 | .collect(Collectors.toList()); |
248 | 266 |
|
249 | 267 | TypedChainTaskExecutor<PersistentTask<?>> chainTaskExecutor = |
250 | | - new TypedChainTaskExecutor<>(client.threadPool().executor(executor()), r -> true, ex -> true); |
| 268 | + new TypedChainTaskExecutor<>(client.threadPool().executor(executor()), |
| 269 | + r -> true, |
| 270 | + // Another process could modify tasks and thus we cannot find them via the allocation_id and name |
| 271 | + // If the task was removed from the node, all is well |
| 272 | + // We handle the case of allocation_id changing later in this transport class by timing out waiting for task completion |
| 273 | + // Consequently, if the exception is ResourceNotFoundException, continue execution; circuit break otherwise. |
| 274 | + ex -> ex instanceof ResourceNotFoundException == false); |
251 | 275 |
|
252 | 276 | for (PersistentTask<?> task : datafeedAndJobTasks) { |
253 | 277 | chainTaskExecutor.add( |
|
0 commit comments