Skip to content

Commit 55c158a

Browse files
Verify task hierarchy is cancelled
1 parent b445b6d commit 55c158a

File tree

1 file changed

+43
-15
lines changed

1 file changed

+43
-15
lines changed

x-pack/plugin/profiler/src/internalClusterTest/java/org/elasticsearch/xpack/profiler/GetProfilingActionIT.java

Lines changed: 43 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,10 @@
4343
import java.util.Collection;
4444
import java.util.Collections;
4545
import java.util.HashMap;
46+
import java.util.HashSet;
4647
import java.util.List;
4748
import java.util.Map;
49+
import java.util.Set;
4850
import java.util.concurrent.CancellationException;
4951
import java.util.concurrent.TimeUnit;
5052
import java.util.concurrent.atomic.AtomicBoolean;
@@ -185,8 +187,9 @@ void verifyCancellation(String action, Request restRequest) throws Exception {
185187
Cancellable cancellable = getRestClient().performRequestAsync(restRequest, wrapAsRestResponseListener(future));
186188

187189
awaitForBlock(plugins);
190+
Collection<TaskId> profilingTasks = collectProfilingRelatedTasks(action);
188191
cancellable.cancel();
189-
ensureTaskIsCancelled(action, nodeIdToName::get);
192+
ensureTasksAreCancelled(profilingTasks, nodeIdToName::get);
190193

191194
disableBlocks(plugins);
192195
expectThrows(CancellationException.class, future::actionGet);
@@ -202,22 +205,35 @@ private static Map<String, String> readNodesInfo() {
202205
return nodeIdToName;
203206
}
204207

205-
private static void ensureTaskIsCancelled(String transportAction, Function<String, String> nodeIdToName) throws Exception {
206-
SetOnce<TaskInfo> searchTask = new SetOnce<>();
208+
private static Collection<TaskId> collectProfilingRelatedTasks(String transportAction) {
209+
SetOnce<TaskInfo> profilingTask = new SetOnce<>();
210+
Map<TaskId, Set<TaskId>> taskToParent = new HashMap<>();
207211
ListTasksResponse listTasksResponse = client().admin().cluster().prepareListTasks().get();
208212
for (TaskInfo task : listTasksResponse.getTasks()) {
213+
TaskId parentTaskId = task.parentTaskId();
214+
if (parentTaskId != null) {
215+
if (taskToParent.containsKey(parentTaskId) == false) {
216+
taskToParent.put(parentTaskId, new HashSet<>());
217+
}
218+
taskToParent.get(parentTaskId).add(task.taskId());
219+
}
209220
if (task.action().equals(transportAction)) {
210-
searchTask.set(task);
221+
profilingTask.set(task);
211222
}
212223
}
213-
assertNotNull(searchTask.get());
214-
TaskId taskId = searchTask.get().taskId();
215-
String nodeName = nodeIdToName.apply(taskId.getNodeId());
224+
assertNotNull(profilingTask.get());
225+
return taskToParent.get(profilingTask.get().taskId());
226+
}
227+
228+
private static void ensureTasksAreCancelled(Collection<TaskId> taskIds, Function<String, String> nodeIdToName) throws Exception {
216229
assertBusy(() -> {
217-
TaskManager taskManager = internalCluster().getInstance(TransportService.class, nodeName).getTaskManager();
218-
Task task = taskManager.getTask(taskId.getId());
219-
assertThat(task, instanceOf(CancellableTask.class));
220-
assertTrue(((CancellableTask) task).isCancelled());
230+
for (TaskId taskId : taskIds) {
231+
String nodeName = nodeIdToName.apply(taskId.getNodeId());
232+
TaskManager taskManager = internalCluster().getInstance(TransportService.class, nodeName).getTaskManager();
233+
Task task = taskManager.getTask(taskId.getId());
234+
assertThat(task, instanceOf(CancellableTask.class));
235+
assertTrue(((CancellableTask) task).isCancelled());
236+
}
221237
});
222238
}
223239

@@ -229,6 +245,10 @@ private static List<ScriptedBlockPlugin> initBlockFactory() {
229245
for (ScriptedBlockPlugin plugin : plugins) {
230246
plugin.reset();
231247
plugin.enableBlock();
248+
// Allow to execute one search and only block starting with the second one. This
249+
// is done so we have at least one child action and can check that all active children
250+
// are cancelled with the parent action.
251+
plugin.setSlack(1);
232252
}
233253
return plugins;
234254
}
@@ -255,6 +275,8 @@ public static class ScriptedBlockPlugin extends MockScriptPlugin {
255275

256276
private final AtomicInteger hits = new AtomicInteger();
257277

278+
private final AtomicInteger slack = new AtomicInteger(0);
279+
258280
private final AtomicBoolean shouldBlock = new AtomicBoolean(true);
259281

260282
void reset() {
@@ -269,16 +291,22 @@ void enableBlock() {
269291
shouldBlock.set(true);
270292
}
271293

294+
void setSlack(int slack) {
295+
this.slack.set(slack);
296+
}
297+
272298
@Override
273299
public Map<String, Function<Map<String, Object>, Object>> pluginScripts() {
274300
return Collections.singletonMap(SCRIPT_NAME, params -> {
275301
LeafStoredFieldsLookup fieldsLookup = (LeafStoredFieldsLookup) params.get("_fields");
276302
LogManager.getLogger(GetProfilingActionIT.class).info("Blocking on the document {}", fieldsLookup.get("_id"));
277303
hits.incrementAndGet();
278-
try {
279-
waitUntil(() -> shouldBlock.get() == false);
280-
} catch (Exception e) {
281-
throw new RuntimeException(e);
304+
if (slack.decrementAndGet() < 0) {
305+
try {
306+
waitUntil(() -> shouldBlock.get() == false);
307+
} catch (Exception e) {
308+
throw new RuntimeException(e);
309+
}
282310
}
283311
return true;
284312
});

0 commit comments

Comments
 (0)