diff --git a/junit-platform-engine/src/main/java/org/junit/platform/engine/support/hierarchical/ConcurrentHierarchicalTestExecutorService.java b/junit-platform-engine/src/main/java/org/junit/platform/engine/support/hierarchical/ConcurrentHierarchicalTestExecutorService.java index d5e85e1464c1..2d5ffaa76d98 100644 --- a/junit-platform-engine/src/main/java/org/junit/platform/engine/support/hierarchical/ConcurrentHierarchicalTestExecutorService.java +++ b/junit-platform-engine/src/main/java/org/junit/platform/engine/support/hierarchical/ConcurrentHierarchicalTestExecutorService.java @@ -20,26 +20,28 @@ import static org.junit.platform.engine.support.hierarchical.ExclusiveResource.GLOBAL_READ_WRITE; import static org.junit.platform.engine.support.hierarchical.Node.ExecutionMode.SAME_THREAD; +import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collection; -import java.util.Comparator; +import java.util.Deque; import java.util.EnumMap; +import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Queue; +import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; -import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.Semaphore; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiFunction; import java.util.function.BooleanSupplier; import java.util.function.Consumer; @@ -50,8 +52,8 @@ import org.junit.platform.commons.logging.LoggerFactory; import org.junit.platform.commons.util.ClassLoaderUtils; import org.junit.platform.commons.util.Preconditions; +import org.junit.platform.commons.util.ToStringBuilder; import org.junit.platform.engine.ConfigurationParameters; -import org.junit.platform.engine.UniqueId; /** * @since 6.1 @@ -76,10 +78,12 @@ public ConcurrentHierarchicalTestExecutorService(ParallelExecutionConfiguration ConcurrentHierarchicalTestExecutorService(ParallelExecutionConfiguration configuration, ClassLoader classLoader) { ThreadFactory threadFactory = new WorkerThreadFactory(classLoader); - threadPool = new ThreadPoolExecutor(configuration.getCorePoolSize(), configuration.getMaxPoolSize(), - configuration.getKeepAliveSeconds(), SECONDS, new SynchronousQueue<>(), threadFactory); parallelism = configuration.getParallelism(); workerLeaseManager = new WorkerLeaseManager(parallelism, this::maybeStartWorker); + var rejectedExecutionHandler = new LeaseAwareRejectedExecutionHandler(workerLeaseManager); + threadPool = new ThreadPoolExecutor(configuration.getCorePoolSize(), configuration.getMaxPoolSize(), + configuration.getKeepAliveSeconds(), SECONDS, new SynchronousQueue<>(), threadFactory, + rejectedExecutionHandler); LOGGER.trace(() -> "initialized thread pool for parallelism of " + configuration.getParallelism()); } @@ -103,7 +107,9 @@ public void close() { return completedFuture(null); } - return new WorkStealingFuture(enqueue(testTask)); + var entry = enqueue(testTask); + workerThread.trackSubmittedChild(entry); + return new WorkStealingFuture(entry); } @Override @@ -146,26 +152,25 @@ private void maybeStartWorker(BooleanSupplier doneCondition) { if (workerLease == null) { return; } - try { - threadPool.execute(() -> { - LOGGER.trace(() -> "starting worker"); - try { - WorkerThread.getOrThrow().processQueueEntries(workerLease, doneCondition); - } - finally { - workerLease.release(false); - LOGGER.trace(() -> "stopping worker"); - } - maybeStartWorker(doneCondition); - }); - } - catch (RejectedExecutionException e) { - workerLease.release(false); - if (threadPool.isShutdown() || workerLeaseManager.isAtLeastOneLeaseTaken()) { - return; + threadPool.execute(new RunLeaseAwareWorker(workerLease, + () -> WorkerThread.getOrThrow().processQueueEntries(workerLease, doneCondition), + () -> this.maybeStartWorker(doneCondition))); + } + + private record RunLeaseAwareWorker(WorkerLease workerLease, Runnable work, Runnable onWorkerFinished) + implements Runnable { + + @Override + public void run() { + LOGGER.trace(() -> "starting worker"); + try { + work.run(); + } + finally { + workerLease.release(false); + LOGGER.trace(() -> "stopping worker"); } - LOGGER.error(e, () -> "failed to submit worker to thread pool"); - throw e; + onWorkerFinished.run(); } } @@ -196,6 +201,8 @@ private class WorkerThread extends Thread { @Nullable WorkerLease workerLease; + private final Deque stateStack = new ArrayDeque<>(); + WorkerThread(Runnable runnable, String name) { super(runnable, name); } @@ -226,13 +233,25 @@ void processQueueEntries(WorkerLease workerLease, BooleanSupplier doneCondition) LOGGER.trace(() -> "yielding resource lock"); break; } - var entry = workQueue.poll(); - if (entry == null) { - LOGGER.trace(() -> "no queue entry available"); + if (workQueue.isEmpty()) { + LOGGER.trace(() -> "no queue entries available"); break; } - LOGGER.trace(() -> "processing: " + entry.task); - execute(entry); + processQueueEntries(); + } + } + + private void processQueueEntries() { + var queueEntriesByResult = tryToStealWorkWithoutBlocking(workQueue); + var queueModified = queueEntriesByResult.containsKey(WorkStealResult.EXECUTED_BY_THIS_WORKER) // + || queueEntriesByResult.containsKey(WorkStealResult.EXECUTED_BY_DIFFERENT_WORKER); + if (queueModified) { + return; + } + var entriesRequiringResourceLocks = queueEntriesByResult.get(WorkStealResult.RESOURCE_LOCK_UNAVAILABLE); + if (entriesRequiringResourceLocks != null) { + // One entry at a time to avoid blocking too much + tryToStealWork(entriesRequiringResourceLocks.get(0), BlockingMode.BLOCKING); } } @@ -265,18 +284,17 @@ void invokeAll(List testTasks) { List isolatedTasks = new ArrayList<>(testTasks.size()); List sameThreadTasks = new ArrayList<>(testTasks.size()); - var forkedChildren = forkConcurrentChildren(testTasks, isolatedTasks::add, sameThreadTasks); + var reverseQueueEntries = forkConcurrentChildren(testTasks, isolatedTasks::add, sameThreadTasks); executeAll(sameThreadTasks); - var queueEntriesByResult = tryToStealWorkWithoutBlocking(forkedChildren); - tryToStealWorkWithBlocking(queueEntriesByResult); - waitFor(queueEntriesByResult); + var reverseQueueEntriesByResult = tryToStealWorkWithoutBlocking(reverseQueueEntries); + tryToStealWorkWithBlocking(reverseQueueEntriesByResult); + waitFor(reverseQueueEntriesByResult); executeAll(isolatedTasks); } private List forkConcurrentChildren(List children, Consumer isolatedTaskCollector, List sameThreadTasks) { - int index = 0; List queueEntries = new ArrayList<>(children.size()); for (TestTask child : children) { if (requiresGlobalReadWriteLock(child)) { @@ -286,7 +304,7 @@ else if (child.getExecutionMode() == SAME_THREAD) { sameThreadTasks.add(child); } else { - queueEntries.add(WorkQueue.Entry.createWithIndex(child, index++)); + queueEntries.add(workQueue.createEntry(child)); } } @@ -299,32 +317,29 @@ else if (child.getExecutionMode() == SAME_THREAD) { } forkAll(queueEntries); } - + queueEntries.sort(reverseOrder()); return queueEntries; } private Map> tryToStealWorkWithoutBlocking( - List forkedChildren) { + Iterable queueEntries) { Map> queueEntriesByResult = new EnumMap<>(WorkStealResult.class); - if (!forkedChildren.isEmpty()) { - forkedChildren.sort(reverseOrder()); - tryToStealWork(forkedChildren, BlockingMode.NON_BLOCKING, queueEntriesByResult); - } + tryToStealWork(queueEntries, BlockingMode.NON_BLOCKING, queueEntriesByResult); return queueEntriesByResult; } private void tryToStealWorkWithBlocking(Map> queueEntriesByResult) { - var childrenRequiringResourceLocks = queueEntriesByResult.remove(WorkStealResult.RESOURCE_LOCK_UNAVAILABLE); - if (childrenRequiringResourceLocks == null) { + var entriesRequiringResourceLocks = queueEntriesByResult.remove(WorkStealResult.RESOURCE_LOCK_UNAVAILABLE); + if (entriesRequiringResourceLocks == null) { return; } - tryToStealWork(childrenRequiringResourceLocks, BlockingMode.BLOCKING, queueEntriesByResult); + tryToStealWork(entriesRequiringResourceLocks, BlockingMode.BLOCKING, queueEntriesByResult); } - private void tryToStealWork(List children, BlockingMode blocking, + private void tryToStealWork(Iterable entries, BlockingMode blocking, Map> queueEntriesByResult) { - for (var entry : children) { + for (var entry : entries) { var state = tryToStealWork(entry, blocking); queueEntriesByResult.computeIfAbsent(state, __ -> new ArrayList<>()).add(entry); } @@ -336,7 +351,7 @@ private WorkStealResult tryToStealWork(WorkQueue.Entry entry, BlockingMode block } var claimed = workQueue.remove(entry); if (claimed) { - LOGGER.trace(() -> "stole work: " + entry); + LOGGER.trace(() -> "stole work: " + entry.task); var executed = executeStolenWork(entry, blockingMode); if (executed) { return WorkStealResult.EXECUTED_BY_THIS_WORKER; @@ -465,10 +480,12 @@ private boolean tryExecuteTask(TestTask testTask) { private void doExecute(TestTask testTask) { LOGGER.trace(() -> "executing: " + testTask); + stateStack.push(new State()); try { testTask.execute(); } finally { + stateStack.pop(); LOGGER.trace(() -> "finished executing: " + testTask); } } @@ -481,8 +498,52 @@ private static CompletableFuture toCombinedFuture(List entri return CompletableFuture.allOf(futures); } + private void trackSubmittedChild(WorkQueue.Entry entry) { + stateStack.element().trackSubmittedChild(entry); + } + + private void tryToStealWorkFromSubmittedChildren() { + var currentState = stateStack.element(); + var currentSubmittedChildren = currentState.submittedChildren; + if (currentSubmittedChildren == null || currentSubmittedChildren.isEmpty()) { + return; + } + var iterator = currentSubmittedChildren.listIterator(currentSubmittedChildren.size()); + while (iterator.hasPrevious()) { + WorkQueue.Entry entry = iterator.previous(); + var result = tryToStealWork(entry, BlockingMode.NON_BLOCKING); + if (result.isExecuted()) { + iterator.remove(); + } + } + currentState.clearIfEmpty(); + } + + private static class State { + + @Nullable + private List submittedChildren; + + private void trackSubmittedChild(WorkQueue.Entry entry) { + if (submittedChildren == null) { + submittedChildren = new ArrayList<>(); + } + submittedChildren.add(entry); + } + + private void clearIfEmpty() { + if (submittedChildren != null && submittedChildren.isEmpty()) { + submittedChildren = null; + } + } + } + private enum WorkStealResult { - EXECUTED_BY_DIFFERENT_WORKER, RESOURCE_LOCK_UNAVAILABLE, EXECUTED_BY_THIS_WORKER + EXECUTED_BY_DIFFERENT_WORKER, RESOURCE_LOCK_UNAVAILABLE, EXECUTED_BY_THIS_WORKER; + + private boolean isExecuted() { + return this != RESOURCE_LOCK_UNAVAILABLE; + } } private interface BlockingAction { @@ -510,8 +571,11 @@ private static class WorkStealingFuture extends BlockingAwareFuture<@Nullable Vo if (entry.future.isDone()) { return callable.call(); } - // TODO steal other dynamic children until future is done and check again before blocking - LOGGER.trace(() -> "blocking for child task"); + workerThread.tryToStealWorkFromSubmittedChildren(); + if (entry.future.isDone()) { + return callable.call(); + } + LOGGER.trace(() -> "blocking for child task: " + entry.task); return workerThread.runBlocking(entry.future::isDone, () -> { try { return callable.call(); @@ -528,24 +592,27 @@ private enum BlockingMode { NON_BLOCKING, BLOCKING } - private static class WorkQueue { - - private final EntryOrdering ordering = new EntryOrdering(); - private final Queue queue = new PriorityBlockingQueue<>(); + private static class WorkQueue implements Iterable { + private final AtomicLong index = new AtomicLong(); + private final Set queue = new ConcurrentSkipListSet<>(); Entry add(TestTask task) { - Entry entry = Entry.create(task); + Entry entry = createEntry(task); LOGGER.trace(() -> "forking: " + entry.task); return doAdd(entry); } + Entry createEntry(TestTask task) { + int level = task.getTestDescriptor().getUniqueId().getSegments().size(); + return new Entry(task, new CompletableFuture<>(), level, index.getAndIncrement()); + } + void addAll(Collection entries) { entries.forEach(this::doAdd); } void reAdd(Entry entry) { LOGGER.trace(() -> "re-enqueuing: " + entry.task); - ordering.incrementAttempts(entry); doAdd(entry); } @@ -557,11 +624,6 @@ private Entry doAdd(Entry entry) { return entry; } - @Nullable - Entry poll() { - return queue.poll(); - } - boolean remove(Entry entry) { return queue.remove(entry); } @@ -570,17 +632,13 @@ boolean isEmpty() { return queue.isEmpty(); } - private record Entry(TestTask task, CompletableFuture<@Nullable Void> future, int level, int index) - implements Comparable { - - static Entry create(TestTask task) { - return createWithIndex(task, 0); - } + @Override + public Iterator iterator() { + return queue.iterator(); + } - static Entry createWithIndex(TestTask task, int index) { - int level = task.getTestDescriptor().getUniqueId().getSegments().size(); - return new Entry(task, new CompletableFuture<>(), level, index); - } + private record Entry(TestTask task, CompletableFuture<@Nullable Void> future, int level, long index) + implements Comparable { @SuppressWarnings("FutureReturnValueIgnored") Entry { @@ -604,7 +662,7 @@ public int compareTo(Entry that) { if (result != 0) { return result; } - return Integer.compare(that.index, this.index); + return Long.compare(that.index, this.index); } private boolean isContainer() { @@ -612,44 +670,6 @@ private boolean isContainer() { } } - - static class EntryOrdering implements Comparator { - - private final ConcurrentMap attempts = new ConcurrentHashMap<>(); - - @Override - public int compare(Entry a, Entry b) { - var result = a.compareTo(b); - if (result == 0) { - result = Integer.compare(attempts(b), attempts(a)); - } - return result; - } - - void incrementAttempts(Entry entry) { - attempts.compute(key(entry), (key, n) -> { - if (n == null) { - registerForKeyRemoval(entry, key); - return 1; - } - return n + 1; - }); - } - - private int attempts(Entry entry) { - return attempts.getOrDefault(key(entry), 0); - } - - @SuppressWarnings("FutureReturnValueIgnored") - private void registerForKeyRemoval(Entry entry, UniqueId key) { - entry.future.whenComplete((__, ___) -> attempts.remove(key)); - } - - private static UniqueId key(Entry entry) { - return entry.task.getTestDescriptor().getUniqueId(); - } - } - } static class WorkerLeaseManager { @@ -699,6 +719,14 @@ void reacquire() throws InterruptedException { LOGGER.trace(() -> "reacquired worker lease (available: %d)".formatted(semaphore.availablePermits())); } } + + @Override + public String toString() { + return new ToStringBuilder(this) // + .append("parallelism", parallelism) // + .append("semaphore", semaphore) // + .toString(); + } } static class WorkerLease implements AutoCloseable { @@ -735,4 +763,19 @@ void reacquire() throws InterruptedException { reacquisitionToken = null; } } + + private record LeaseAwareRejectedExecutionHandler(WorkerLeaseManager workerLeaseManager) + implements RejectedExecutionHandler { + @Override + public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { + if (!(r instanceof RunLeaseAwareWorker worker)) { + return; + } + worker.workerLease.release(false); + if (executor.isShutdown() || workerLeaseManager.isAtLeastOneLeaseTaken()) { + return; + } + throw new RejectedExecutionException("Task with " + workerLeaseManager + " rejected from " + executor); + } + } } diff --git a/platform-tests/src/test/java/org/junit/platform/engine/support/hierarchical/ConcurrentHierarchicalTestExecutorServiceTests.java b/platform-tests/src/test/java/org/junit/platform/engine/support/hierarchical/ConcurrentHierarchicalTestExecutorServiceTests.java index 8ee150354e6f..1491fbb2f4b1 100644 --- a/platform-tests/src/test/java/org/junit/platform/engine/support/hierarchical/ConcurrentHierarchicalTestExecutorServiceTests.java +++ b/platform-tests/src/test/java/org/junit/platform/engine/support/hierarchical/ConcurrentHierarchicalTestExecutorServiceTests.java @@ -226,6 +226,80 @@ void runsTasksWithoutConflictingLocksConcurrently() throws Exception { assertThat(leaves).allSatisfy(TestTaskStub::assertExecutedSuccessfully); } + @Test + void processingQueueEntriesSkipsOverUnavailableResources() throws Exception { + service = new ConcurrentHierarchicalTestExecutorService(configuration(2)); + + var resourceLock = new SingleLock(exclusiveResource(LockMode.READ_WRITE), new ReentrantLock()); + + var lockFreeChildrenStarted = new CountDownLatch(2); + var child1Started = new CountDownLatch(1); + + Executable child1Behaviour = () -> { + child1Started.countDown(); + lockFreeChildrenStarted.await(); + }; + Executable child4Behaviour = () -> { + lockFreeChildrenStarted.countDown(); + child1Started.await(); + }; + + var child1 = new TestTaskStub(ExecutionMode.CONCURRENT, child1Behaviour) // + .withResourceLock(resourceLock) // + .withName("child1"); + var child2 = new TestTaskStub(ExecutionMode.CONCURRENT, lockFreeChildrenStarted::countDown).withName("child2"); // + var child3 = new TestTaskStub(ExecutionMode.CONCURRENT).withResourceLock(resourceLock) // + .withName("child3"); + var child4 = new TestTaskStub(ExecutionMode.CONCURRENT, child4Behaviour).withName("child4"); + var children = List.of(child1, child2, child3, child4); + var root = new TestTaskStub(ExecutionMode.CONCURRENT, () -> requiredService().invokeAll(children)) // + .withName("root"); + + service.submit(root).get(); + + root.assertExecutedSuccessfully(); + assertThat(children).allSatisfy(TestTaskStub::assertExecutedSuccessfully); + assertThat(child4.executionThread).isEqualTo(child2.executionThread); + assertThat(child3.startTime).isAfterOrEqualTo(child2.startTime); + } + + @Test + void invokeAllQueueEntriesSkipsOverUnavailableResources() throws Exception { + service = new ConcurrentHierarchicalTestExecutorService(configuration(2)); + + var resourceLock = new SingleLock(exclusiveResource(LockMode.READ_WRITE), new ReentrantLock()); + + var lockFreeChildrenStarted = new CountDownLatch(2); + var child4Started = new CountDownLatch(1); + + Executable child1Behaviour = () -> { + lockFreeChildrenStarted.countDown(); + child4Started.await(); + }; + Executable child4Behaviour = () -> { + child4Started.countDown(); + lockFreeChildrenStarted.await(); + }; + + var child1 = new TestTaskStub(ExecutionMode.CONCURRENT, child1Behaviour) // + .withName("child1"); + var child2 = new TestTaskStub(ExecutionMode.CONCURRENT).withResourceLock(resourceLock) // + .withName("child2"); // + var child3 = new TestTaskStub(ExecutionMode.CONCURRENT, lockFreeChildrenStarted::countDown).withName("child3"); + var child4 = new TestTaskStub(ExecutionMode.CONCURRENT, child4Behaviour).withResourceLock(resourceLock) // + .withName("child4"); + var children = List.of(child1, child2, child3, child4); + var root = new TestTaskStub(ExecutionMode.CONCURRENT, () -> requiredService().invokeAll(children)) // + .withName("root"); + + service.submit(root).get(); + + root.assertExecutedSuccessfully(); + assertThat(children).allSatisfy(TestTaskStub::assertExecutedSuccessfully); + assertThat(child1.executionThread).isEqualTo(child3.executionThread); + assertThat(child2.startTime).isAfterOrEqualTo(child3.startTime); + } + @Test void prioritizesChildrenOfStartedContainers() throws Exception { service = new ConcurrentHierarchicalTestExecutorService(configuration(2)); @@ -451,6 +525,144 @@ void workIsStolenInReverseOrder() throws Exception { .isSorted(); } + @RepeatedTest(value = 100, failureThreshold = 1) + void stealsDynamicChildren() throws Exception { + service = new ConcurrentHierarchicalTestExecutorService(configuration(2, 2)); + + var child1Started = new CountDownLatch(1); + var child2Finished = new CountDownLatch(1); + var child1 = new TestTaskStub(ExecutionMode.CONCURRENT, () -> { + child1Started.countDown(); + child2Finished.await(); + }) // + .withName("child1").withLevel(2); + var child2 = new TestTaskStub(ExecutionMode.CONCURRENT, child2Finished::countDown) // + .withName("child2").withLevel(2); + + var root = new TestTaskStub(ExecutionMode.SAME_THREAD, () -> { + var future1 = requiredService().submit(child1); + child1Started.await(); + var future2 = requiredService().submit(child2); + future1.get(); + future2.get(); + }) // + .withName("root").withLevel(1); + + service.submit(root).get(); + + assertThat(Stream.of(root, child1, child2)) // + .allSatisfy(TestTaskStub::assertExecutedSuccessfully); + assertThat(child2.executionThread).isEqualTo(root.executionThread).isNotEqualTo(child1.executionThread); + } + + @RepeatedTest(value = 100, failureThreshold = 1) + void stealsNestedDynamicChildren() throws Exception { + service = new ConcurrentHierarchicalTestExecutorService(configuration(2, 2)); + + var barrier = new CyclicBarrier(2); + + var leaf1a = new TestTaskStub(ExecutionMode.CONCURRENT) // + .withName("leaf1a").withLevel(3); + var leaf1b = new TestTaskStub(ExecutionMode.CONCURRENT) // + .withName("leaf1b").withLevel(3); + + var child1 = new TestTaskStub(ExecutionMode.CONCURRENT, () -> { + barrier.await(); + var futureA = requiredService().submit(leaf1a); + barrier.await(); + var futureB = requiredService().submit(leaf1b); + futureA.get(); + futureB.get(); + barrier.await(); + }) // + .withName("child1").withLevel(2); + + var leaf2a = new TestTaskStub(ExecutionMode.CONCURRENT) // + .withName("leaf2a").withLevel(3); + var leaf2b = new TestTaskStub(ExecutionMode.CONCURRENT) // + .withName("leaf2b").withLevel(3); + + var child2 = new TestTaskStub(ExecutionMode.CONCURRENT, () -> { + barrier.await(); + var futureA = requiredService().submit(leaf2a); + barrier.await(); + var futureB = requiredService().submit(leaf2b); + futureB.get(); + futureA.get(); + barrier.await(); + }) // + .withName("child2").withLevel(2); + + var root = new TestTaskStub(ExecutionMode.SAME_THREAD, () -> { + var future1 = requiredService().submit(child1); + var future2 = requiredService().submit(child2); + future1.get(); + future2.get(); + }) // + .withName("root").withLevel(1); + + service.submit(root).get(); + + assertThat(Stream.of(root, child1, child2, leaf1a, leaf1b, leaf2a, leaf2b)) // + .allSatisfy(TestTaskStub::assertExecutedSuccessfully); + assertThat(child2.executionThread).isNotEqualTo(child1.executionThread); + assertThat(child1.executionThread).isEqualTo(leaf1a.executionThread).isEqualTo(leaf1b.executionThread); + assertThat(child2.executionThread).isEqualTo(leaf2a.executionThread).isEqualTo(leaf2b.executionThread); + } + + @RepeatedTest(value = 100, failureThreshold = 1) + void stealsSiblingDynamicChildrenOnly() throws Exception { + service = new ConcurrentHierarchicalTestExecutorService(configuration(2, 3)); + + var child1Started = new CountDownLatch(1); + var child2Started = new CountDownLatch(1); + var leaf1ASubmitted = new CountDownLatch(1); + var leaf1AStarted = new CountDownLatch(1); + + var leaf1a = new TestTaskStub(ExecutionMode.CONCURRENT, () -> { + leaf1AStarted.countDown(); + child2Started.await(); + }) // + .withName("leaf1a").withLevel(3); + + var child1 = new TestTaskStub(ExecutionMode.CONCURRENT, () -> { + child1Started.countDown(); + leaf1ASubmitted.await(); + }) // + .withName("child1").withLevel(2); + + var child2 = new TestTaskStub(ExecutionMode.CONCURRENT, child2Started::countDown) // + .withName("child2").withLevel(2); + + var child3 = new TestTaskStub(ExecutionMode.CONCURRENT, () -> { + var futureA = requiredService().submit(leaf1a); + leaf1ASubmitted.countDown(); + leaf1AStarted.await(); + futureA.get(); + }) // + .withName("child3").withLevel(2); + + var root = new TestTaskStub(ExecutionMode.SAME_THREAD, () -> { + var future1 = requiredService().submit(child1); + child1Started.await(); + var future2 = requiredService().submit(child2); + var future3 = requiredService().submit(child3); + future1.get(); + future2.get(); + future3.get(); + }) // + .withName("root").withLevel(1); + + service.submit(root).get(); + + assertThat(Stream.of(root, child1, child2, child3, leaf1a)) // + .allSatisfy(TestTaskStub::assertExecutedSuccessfully); + + assertThat(child3.executionThread).isNotEqualTo(child1.executionThread).isNotEqualTo(child2.executionThread); + assertThat(child1.executionThread).isNotEqualTo(child2.executionThread); + assertThat(child1.executionThread).isEqualTo(leaf1a.executionThread); + } + private static ExclusiveResource exclusiveResource(LockMode lockMode) { return new ExclusiveResource("key", lockMode); }