Skip to content

Commit

Permalink
print reason why parent task was cancelled
Browse files Browse the repository at this point in the history
Signed-off-by: kkewwei <[email protected]>
  • Loading branch information
kkewwei committed Jul 3, 2024
1 parent c71fd4a commit 8201ba5
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,10 @@ public void testFailedToStartChildTaskAfterCancelled() throws Exception {
mainAction.startSubTask(taskId, subRequest, future);
TransportException te = expectThrows(TransportException.class, future::actionGet);
assertThat(te.getCause(), instanceOf(TaskCancelledException.class));
assertThat(te.getCause().getMessage(), equalTo("The parent task was cancelled, shouldn't start any child tasks"));
assertThat(
te.getCause().getMessage(),
equalTo("The parent task was cancelled, shouldn't start any child tasks, reason:by user request")
);
allowEntireRequest(rootRequest);
waitForRootTask(rootTaskFuture);
ensureAllBansRemoved();
Expand Down Expand Up @@ -386,7 +389,7 @@ static void waitForRootTask(ActionFuture<TestResponse> rootTask) {
assertThat(
cause.getMessage(),
anyOf(
equalTo("The parent task was cancelled, shouldn't start any child tasks"),
equalTo("The parent task was cancelled, shouldn't start any child tasks, reason:by user request"),
containsString("Task cancelled before it started:"),
equalTo("Task was cancelled while executing")
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ void cancelTaskAndDescendants(CancellableTask task, String reason, boolean waitF
Collection<DiscoveryNode> childrenNodes = taskManager.startBanOnChildrenNodes(task.getId(), () -> {
logger.trace("child tasks of parent [{}] are completed", taskId);
groupedListener.onResponse(null);
});
}, reason);
taskManager.cancel(task, reason, () -> {
logger.trace("task [{}] is cancelled", taskId);
groupedListener.onResponse(null);
Expand Down
11 changes: 7 additions & 4 deletions server/src/main/java/org/opensearch/tasks/TaskManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -517,10 +517,10 @@ public Set<TaskId> getBannedTaskIds() {
* @param onChildTasksCompleted called when all child tasks are completed or failed
* @return the set of current nodes that have outstanding child tasks
*/
public Collection<DiscoveryNode> startBanOnChildrenNodes(long taskId, Runnable onChildTasksCompleted) {
public Collection<DiscoveryNode> startBanOnChildrenNodes(long taskId, Runnable onChildTasksCompleted, String reason) {
final CancellableTaskHolder holder = cancellableTasks.get(taskId);
if (holder != null) {
return holder.startBan(onChildTasksCompleted);
return holder.startBan(onChildTasksCompleted, reason);
} else {
onChildTasksCompleted.run();
return Collections.emptySet();
Expand Down Expand Up @@ -585,6 +585,7 @@ private static class CancellableTaskHolder {
private List<Runnable> cancellationListeners = null;
private Map<DiscoveryNode, Integer> childTasksPerNode = null;
private boolean banChildren = false;
private String banReason;
private List<Runnable> childTaskCompletedListeners = null;

CancellableTaskHolder(CancellableTask task) {
Expand Down Expand Up @@ -662,7 +663,7 @@ public CancellableTask getTask() {

synchronized void registerChildNode(DiscoveryNode node) {
if (banChildren) {
throw new TaskCancelledException("The parent task was cancelled, shouldn't start any child tasks");
throw new TaskCancelledException("The parent task was cancelled, shouldn't start any child tasks, " + banReason);
}
if (childTasksPerNode == null) {
childTasksPerNode = new HashMap<>();
Expand All @@ -686,11 +687,13 @@ void unregisterChildNode(DiscoveryNode node) {
notifyListeners(listeners);
}

Set<DiscoveryNode> startBan(Runnable onChildTasksCompleted) {
Set<DiscoveryNode> startBan(Runnable onChildTasksCompleted, String reason) {
final Set<DiscoveryNode> pendingChildNodes;
final Runnable toRun;
synchronized (this) {
banChildren = true;
assert reason != null;
banReason = reason;
if (childTasksPerNode == null) {
pendingChildNodes = Collections.emptySet();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,7 @@ public void testRegisterAndExecuteChildTaskWhileParentTaskIsBeingCanceled() thro
);
assertThat(cancelledException.getMessage(), startsWith("Task cancelled before it started:"));
CountDownLatch latch = new CountDownLatch(1);
taskManager.startBanOnChildrenNodes(parentTaskId.getId(), latch::countDown);
taskManager.startBanOnChildrenNodes(parentTaskId.getId(), latch::countDown, cancelledException.getMessage());
assertTrue("onChildTasksCompleted() is not invoked", latch.await(1, TimeUnit.SECONDS));
}

Expand Down

0 comments on commit 8201ba5

Please sign in to comment.