Skip to content

Commit b068814

Browse files
committed
Fix hanging cancelling task with no children
Cancelling tasks with no cancellable children can cause the cancellation operation to hang. This commit fixes this issue.
1 parent ea7077f commit b068814

File tree

2 files changed

+64
-8
lines changed

2 files changed

+64
-8
lines changed

core/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/cancel/TransportCancelTasksAction.java

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -114,14 +114,15 @@ protected void processTasks(CancelTasksRequest request, Consumer<CancellableTask
114114
@Override
115115
protected synchronized void taskOperation(CancelTasksRequest request, CancellableTask cancellableTask,
116116
ActionListener<TaskInfo> listener) {
117-
DiscoveryNodes childNodes = clusterService.state().nodes();
118-
final BanLock banLock = new BanLock(childNodes.getSize(), () -> removeBanOnNodes(cancellableTask, childNodes));
119-
boolean canceled = taskManager.cancel(cancellableTask, request.getReason(), banLock::onTaskFinished);
120-
if (canceled) {
121-
if (cancellableTask.shouldCancelChildrenOnCancellation()) {
117+
String nodeId = clusterService.localNode().getId();
118+
final boolean canceled;
119+
if (cancellableTask.shouldCancelChildrenOnCancellation()) {
120+
DiscoveryNodes childNodes = clusterService.state().nodes();
121+
final BanLock banLock = new BanLock(childNodes.getSize(), () -> removeBanOnNodes(cancellableTask, childNodes));
122+
canceled = taskManager.cancel(cancellableTask, request.getReason(), banLock::onTaskFinished);
123+
if (canceled) {
122124
// /In case the task has some child tasks, we need to wait for until ban is set on all nodes
123125
logger.trace("cancelling task {} on child nodes", cancellableTask.getId());
124-
String nodeId = clusterService.localNode().getId();
125126
AtomicInteger responses = new AtomicInteger(childNodes.getSize());
126127
List<Exception> failures = new ArrayList<>();
127128
setBanOnNodes(request.getReason(), cancellableTask, childNodes, new ActionListener<Void>() {
@@ -152,15 +153,21 @@ private void processResponse() {
152153
}
153154
}
154155
});
155-
} else {
156+
}
157+
} else {
158+
canceled = taskManager.cancel(cancellableTask, request.getReason(),
159+
() -> listener.onResponse(cancellableTask.taskInfo(nodeId, false)));
160+
if (canceled) {
156161
logger.trace("task {} doesn't have any children that should be cancelled", cancellableTask.getId());
157162
}
158-
} else {
163+
}
164+
if (canceled == false) {
159165
logger.trace("task {} is already cancelled", cancellableTask.getId());
160166
throw new IllegalStateException("task with id " + cancellableTask.getId() + " is already cancelled");
161167
}
162168
}
163169

170+
164171
@Override
165172
protected boolean accumulateExceptions() {
166173
return true;

core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/CancellableTasksTests.java

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import java.util.concurrent.atomic.AtomicReference;
5252

5353
import static org.elasticsearch.test.ClusterServiceUtils.setState;
54+
import static org.hamcrest.Matchers.equalTo;
5455
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
5556
import static org.hamcrest.Matchers.lessThanOrEqualTo;
5657

@@ -299,6 +300,54 @@ public void onFailure(Exception e) {
299300
});
300301
}
301302

303+
public void testChildTasksCancellation() throws Exception {
304+
setupTestNodes(Settings.EMPTY);
305+
connectNodes(testNodes);
306+
CountDownLatch responseLatch = new CountDownLatch(1);
307+
final AtomicReference<NodesResponse> responseReference = new AtomicReference<>();
308+
final AtomicReference<Throwable> throwableReference = new AtomicReference<>();
309+
Task mainTask = startCancellableTestNodesAction(true, nodesCount, new ActionListener<NodesResponse>() {
310+
@Override
311+
public void onResponse(NodesResponse listTasksResponse) {
312+
responseReference.set(listTasksResponse);
313+
responseLatch.countDown();
314+
}
315+
316+
@Override
317+
public void onFailure(Exception e) {
318+
throwableReference.set(e);
319+
responseLatch.countDown();
320+
}
321+
});
322+
323+
// Cancel all child tasks without cancelling the main task, which should quit on its own
324+
CancelTasksRequest request = new CancelTasksRequest();
325+
request.setReason("Testing Cancellation");
326+
request.setParentTaskId(new TaskId(testNodes[0].discoveryNode.getId(), mainTask.getId()));
327+
// And send the cancellation request to a random node
328+
CancelTasksResponse response = testNodes[randomIntBetween(1, testNodes.length - 1)].transportCancelTasksAction.execute(request)
329+
.get();
330+
331+
// Awaiting for the main task to finish
332+
responseLatch.await();
333+
334+
// Should have cancelled tasks on all nodes
335+
assertThat(response.getTasks().size(), equalTo(testNodes.length));
336+
337+
assertBusy(() -> {
338+
try {
339+
// Make sure that main task is no longer running
340+
ListTasksResponse listTasksResponse = testNodes[randomIntBetween(0, testNodes.length - 1)]
341+
.transportListTasksAction.execute(new ListTasksRequest().setTaskId(
342+
new TaskId(testNodes[0].discoveryNode.getId(), mainTask.getId()))).get();
343+
assertEquals(0, listTasksResponse.getTasks().size());
344+
345+
} catch (ExecutionException | InterruptedException ex) {
346+
throw new RuntimeException(ex);
347+
}
348+
});
349+
}
350+
302351
public void testTaskCancellationOnCoordinatingNodeLeavingTheCluster() throws Exception {
303352
setupTestNodes(Settings.EMPTY);
304353
connectNodes(testNodes);

0 commit comments

Comments
 (0)