From 903805fcd6b1538a19e9590f390c489c79b0a662 Mon Sep 17 00:00:00 2001 From: "M.P. Korstanje" Date: Sun, 19 Oct 2025 20:42:56 +0200 Subject: [PATCH 01/10] Improve naming --- ...urrentHierarchicalTestExecutorService.java | 27 +++++++++---------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/junit-platform-engine/src/main/java/org/junit/platform/engine/support/hierarchical/ConcurrentHierarchicalTestExecutorService.java b/junit-platform-engine/src/main/java/org/junit/platform/engine/support/hierarchical/ConcurrentHierarchicalTestExecutorService.java index d5e85e1464c1..62d91b1f327c 100644 --- a/junit-platform-engine/src/main/java/org/junit/platform/engine/support/hierarchical/ConcurrentHierarchicalTestExecutorService.java +++ b/junit-platform-engine/src/main/java/org/junit/platform/engine/support/hierarchical/ConcurrentHierarchicalTestExecutorService.java @@ -265,11 +265,11 @@ void invokeAll(List testTasks) { List isolatedTasks = new ArrayList<>(testTasks.size()); List sameThreadTasks = new ArrayList<>(testTasks.size()); - var forkedChildren = forkConcurrentChildren(testTasks, isolatedTasks::add, sameThreadTasks); + var reverseQueueEntries = forkConcurrentChildren(testTasks, isolatedTasks::add, sameThreadTasks); executeAll(sameThreadTasks); - var queueEntriesByResult = tryToStealWorkWithoutBlocking(forkedChildren); - tryToStealWorkWithBlocking(queueEntriesByResult); - waitFor(queueEntriesByResult); + var reverseQueueEntriesByResult = tryToStealWorkWithoutBlocking(reverseQueueEntries); + tryToStealWorkWithBlocking(reverseQueueEntriesByResult); + waitFor(reverseQueueEntriesByResult); executeAll(isolatedTasks); } @@ -299,32 +299,31 @@ else if (child.getExecutionMode() == SAME_THREAD) { } forkAll(queueEntries); } - + queueEntries.sort(reverseOrder()); return queueEntries; } private Map> tryToStealWorkWithoutBlocking( - List forkedChildren) { + List queueEntries) { Map> queueEntriesByResult = new EnumMap<>(WorkStealResult.class); - if (!forkedChildren.isEmpty()) { - forkedChildren.sort(reverseOrder()); - tryToStealWork(forkedChildren, BlockingMode.NON_BLOCKING, queueEntriesByResult); + if (!queueEntries.isEmpty()) { + tryToStealWork(queueEntries, BlockingMode.NON_BLOCKING, queueEntriesByResult); } return queueEntriesByResult; } private void tryToStealWorkWithBlocking(Map> queueEntriesByResult) { - var childrenRequiringResourceLocks = queueEntriesByResult.remove(WorkStealResult.RESOURCE_LOCK_UNAVAILABLE); - if (childrenRequiringResourceLocks == null) { + var entriesRequiringResourceLocks = queueEntriesByResult.remove(WorkStealResult.RESOURCE_LOCK_UNAVAILABLE); + if (entriesRequiringResourceLocks == null) { return; } - tryToStealWork(childrenRequiringResourceLocks, BlockingMode.BLOCKING, queueEntriesByResult); + tryToStealWork(entriesRequiringResourceLocks, BlockingMode.BLOCKING, queueEntriesByResult); } - private void tryToStealWork(List children, BlockingMode blocking, + private void tryToStealWork(List entries, BlockingMode blocking, Map> queueEntriesByResult) { - for (var entry : children) { + for (var entry : entries) { var state = tryToStealWork(entry, blocking); queueEntriesByResult.computeIfAbsent(state, __ -> new ArrayList<>()).add(entry); } From 185692f9bfc003153cec28b2328c5831ce64c28a Mon Sep 17 00:00:00 2001 From: "M.P. Korstanje" Date: Mon, 20 Oct 2025 00:04:29 +0200 Subject: [PATCH 02/10] Skip over unavailable resources --- ...urrentHierarchicalTestExecutorService.java | 33 +++++++-- ...tHierarchicalTestExecutorServiceTests.java | 74 +++++++++++++++++++ 2 files changed, 99 insertions(+), 8 deletions(-) diff --git a/junit-platform-engine/src/main/java/org/junit/platform/engine/support/hierarchical/ConcurrentHierarchicalTestExecutorService.java b/junit-platform-engine/src/main/java/org/junit/platform/engine/support/hierarchical/ConcurrentHierarchicalTestExecutorService.java index 62d91b1f327c..91a79307a22e 100644 --- a/junit-platform-engine/src/main/java/org/junit/platform/engine/support/hierarchical/ConcurrentHierarchicalTestExecutorService.java +++ b/junit-platform-engine/src/main/java/org/junit/platform/engine/support/hierarchical/ConcurrentHierarchicalTestExecutorService.java @@ -226,13 +226,26 @@ void processQueueEntries(WorkerLease workerLease, BooleanSupplier doneCondition) LOGGER.trace(() -> "yielding resource lock"); break; } - var entry = workQueue.poll(); - if (entry == null) { - LOGGER.trace(() -> "no queue entry available"); + var queueEntries = workQueue.peekAll(); + if (queueEntries.isEmpty()) { + LOGGER.trace(() -> "no queue entries available"); break; } - LOGGER.trace(() -> "processing: " + entry.task); - execute(entry); + var queueEntriesByResult = tryToStealWorkWithoutBlocking(queueEntries); + maybeTryToStealWorkWithBlocking(queueEntriesByResult); + } + } + + private void maybeTryToStealWorkWithBlocking(Map> queueEntriesByResult) { + if (queueEntriesByResult.containsKey(WorkStealResult.EXECUTED_BY_THIS_WORKER) || // + queueEntriesByResult.containsKey(WorkStealResult.EXECUTED_BY_DIFFERENT_WORKER)) { + // Queue changed. Try to see if there is work that does not need locking + return; + } + // All resources locked, start blocking + var entriesRequiringResourceLocks = queueEntriesByResult.remove(WorkStealResult.RESOURCE_LOCK_UNAVAILABLE); + if (entriesRequiringResourceLocks != null) { + tryToStealWork(entriesRequiringResourceLocks.get(0), BlockingMode.BLOCKING); } } @@ -556,9 +569,13 @@ private Entry doAdd(Entry entry) { return entry; } - @Nullable - Entry poll() { - return queue.poll(); + private List peekAll() { + List entries = new ArrayList<>(queue); + // Iteration order isn't the same as queue order. + // TODO: This makes the queue kinda pointless + // TODO: This also makes retries pointless + entries.sort(naturalOrder()); + return entries; } boolean remove(Entry entry) { diff --git a/platform-tests/src/test/java/org/junit/platform/engine/support/hierarchical/ConcurrentHierarchicalTestExecutorServiceTests.java b/platform-tests/src/test/java/org/junit/platform/engine/support/hierarchical/ConcurrentHierarchicalTestExecutorServiceTests.java index 8ee150354e6f..17db098e1e45 100644 --- a/platform-tests/src/test/java/org/junit/platform/engine/support/hierarchical/ConcurrentHierarchicalTestExecutorServiceTests.java +++ b/platform-tests/src/test/java/org/junit/platform/engine/support/hierarchical/ConcurrentHierarchicalTestExecutorServiceTests.java @@ -226,6 +226,80 @@ void runsTasksWithoutConflictingLocksConcurrently() throws Exception { assertThat(leaves).allSatisfy(TestTaskStub::assertExecutedSuccessfully); } + @Test + void processingQueueEntriesSkipsOverUnavailableResources() throws Exception { + service = new ConcurrentHierarchicalTestExecutorService(configuration(2)); + + var resourceLock = new SingleLock(exclusiveResource(LockMode.READ_WRITE), new ReentrantLock()); + + var lockFreeChildrenStarted = new CountDownLatch(2); + var child1Started = new CountDownLatch(1); + + Executable child1Behaviour = () -> { + child1Started.countDown(); + lockFreeChildrenStarted.await(); + }; + Executable child4Behaviour = () -> { + lockFreeChildrenStarted.countDown(); + child1Started.await(); + }; + + var child1 = new TestTaskStub(ExecutionMode.CONCURRENT, child1Behaviour) // + .withResourceLock(resourceLock) // + .withName("child1"); + var child2 = new TestTaskStub(ExecutionMode.CONCURRENT, lockFreeChildrenStarted::countDown).withName("child2"); // + var child3 = new TestTaskStub(ExecutionMode.CONCURRENT).withResourceLock(resourceLock) // + .withName("child3"); + var child4 = new TestTaskStub(ExecutionMode.CONCURRENT, child4Behaviour).withName("child4"); + var children = List.of(child1, child2, child3, child4); + var root = new TestTaskStub(ExecutionMode.CONCURRENT, () -> requiredService().invokeAll(children)) // + .withName("root"); + + service.submit(root).get(); + + root.assertExecutedSuccessfully(); + assertThat(children).allSatisfy(TestTaskStub::assertExecutedSuccessfully); + assertThat(child4.executionThread).isEqualTo(child2.executionThread); + assertThat(child3.startTime).isAfterOrEqualTo(child2.startTime); + } + + @Test + void invokeAllQueueEntriesSkipsOverUnavailableResources() throws Exception { + service = new ConcurrentHierarchicalTestExecutorService(configuration(2)); + + var resourceLock = new SingleLock(exclusiveResource(LockMode.READ_WRITE), new ReentrantLock()); + + var lockFreeChildrenStarted = new CountDownLatch(2); + var child4Started = new CountDownLatch(1); + + Executable child1Behaviour = () -> { + lockFreeChildrenStarted.countDown(); + child4Started.await(); + }; + Executable child4Behaviour = () -> { + child4Started.countDown(); + lockFreeChildrenStarted.await(); + }; + + var child1 = new TestTaskStub(ExecutionMode.CONCURRENT, child1Behaviour) // + .withName("child1"); + var child2 = new TestTaskStub(ExecutionMode.CONCURRENT).withResourceLock(resourceLock) // + .withName("child2"); // + var child3 = new TestTaskStub(ExecutionMode.CONCURRENT, lockFreeChildrenStarted::countDown).withName("child3"); + var child4 = new TestTaskStub(ExecutionMode.CONCURRENT, child4Behaviour).withResourceLock(resourceLock) // + .withName("child4"); + var children = List.of(child1, child2, child3, child4); + var root = new TestTaskStub(ExecutionMode.CONCURRENT, () -> requiredService().invokeAll(children)) // + .withName("root"); + + service.submit(root).get(); + + root.assertExecutedSuccessfully(); + assertThat(children).allSatisfy(TestTaskStub::assertExecutedSuccessfully); + assertThat(child1.executionThread).isEqualTo(child3.executionThread); + assertThat(child2.startTime).isAfterOrEqualTo(child3.startTime); + } + @Test void prioritizesChildrenOfStartedContainers() throws Exception { service = new ConcurrentHierarchicalTestExecutorService(configuration(2)); From dcb27a252712e18693561205bd030c82400b8a9a Mon Sep 17 00:00:00 2001 From: "M.P. Korstanje" Date: Mon, 20 Oct 2025 15:20:46 +0200 Subject: [PATCH 03/10] Remove unused entry ordering --- ...urrentHierarchicalTestExecutorService.java | 46 ------------------- 1 file changed, 46 deletions(-) diff --git a/junit-platform-engine/src/main/java/org/junit/platform/engine/support/hierarchical/ConcurrentHierarchicalTestExecutorService.java b/junit-platform-engine/src/main/java/org/junit/platform/engine/support/hierarchical/ConcurrentHierarchicalTestExecutorService.java index 91a79307a22e..1c194ca9626a 100644 --- a/junit-platform-engine/src/main/java/org/junit/platform/engine/support/hierarchical/ConcurrentHierarchicalTestExecutorService.java +++ b/junit-platform-engine/src/main/java/org/junit/platform/engine/support/hierarchical/ConcurrentHierarchicalTestExecutorService.java @@ -22,15 +22,12 @@ import java.util.ArrayList; import java.util.Collection; -import java.util.Comparator; import java.util.EnumMap; import java.util.List; import java.util.Map; import java.util.Queue; import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.PriorityBlockingQueue; @@ -51,7 +48,6 @@ import org.junit.platform.commons.util.ClassLoaderUtils; import org.junit.platform.commons.util.Preconditions; import org.junit.platform.engine.ConfigurationParameters; -import org.junit.platform.engine.UniqueId; /** * @since 6.1 @@ -542,7 +538,6 @@ private enum BlockingMode { private static class WorkQueue { - private final EntryOrdering ordering = new EntryOrdering(); private final Queue queue = new PriorityBlockingQueue<>(); Entry add(TestTask task) { @@ -557,7 +552,6 @@ void addAll(Collection entries) { void reAdd(Entry entry) { LOGGER.trace(() -> "re-enqueuing: " + entry.task); - ordering.incrementAttempts(entry); doAdd(entry); } @@ -572,8 +566,6 @@ private Entry doAdd(Entry entry) { private List peekAll() { List entries = new ArrayList<>(queue); // Iteration order isn't the same as queue order. - // TODO: This makes the queue kinda pointless - // TODO: This also makes retries pointless entries.sort(naturalOrder()); return entries; } @@ -628,44 +620,6 @@ private boolean isContainer() { } } - - static class EntryOrdering implements Comparator { - - private final ConcurrentMap attempts = new ConcurrentHashMap<>(); - - @Override - public int compare(Entry a, Entry b) { - var result = a.compareTo(b); - if (result == 0) { - result = Integer.compare(attempts(b), attempts(a)); - } - return result; - } - - void incrementAttempts(Entry entry) { - attempts.compute(key(entry), (key, n) -> { - if (n == null) { - registerForKeyRemoval(entry, key); - return 1; - } - return n + 1; - }); - } - - private int attempts(Entry entry) { - return attempts.getOrDefault(key(entry), 0); - } - - @SuppressWarnings("FutureReturnValueIgnored") - private void registerForKeyRemoval(Entry entry, UniqueId key) { - entry.future.whenComplete((__, ___) -> attempts.remove(key)); - } - - private static UniqueId key(Entry entry) { - return entry.task.getTestDescriptor().getUniqueId(); - } - } - } static class WorkerLeaseManager { From 72c2e9e0507bd5e0b56b4455eabad79171bcfb09 Mon Sep 17 00:00:00 2001 From: "M.P. Korstanje" Date: Mon, 20 Oct 2025 15:45:34 +0200 Subject: [PATCH 04/10] Polishing --- ...oncurrentHierarchicalTestExecutorService.java | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/junit-platform-engine/src/main/java/org/junit/platform/engine/support/hierarchical/ConcurrentHierarchicalTestExecutorService.java b/junit-platform-engine/src/main/java/org/junit/platform/engine/support/hierarchical/ConcurrentHierarchicalTestExecutorService.java index 1c194ca9626a..6ce20702d61a 100644 --- a/junit-platform-engine/src/main/java/org/junit/platform/engine/support/hierarchical/ConcurrentHierarchicalTestExecutorService.java +++ b/junit-platform-engine/src/main/java/org/junit/platform/engine/support/hierarchical/ConcurrentHierarchicalTestExecutorService.java @@ -227,20 +227,20 @@ void processQueueEntries(WorkerLease workerLease, BooleanSupplier doneCondition) LOGGER.trace(() -> "no queue entries available"); break; } - var queueEntriesByResult = tryToStealWorkWithoutBlocking(queueEntries); - maybeTryToStealWorkWithBlocking(queueEntriesByResult); + processQueueEntries(queueEntries); } } - private void maybeTryToStealWorkWithBlocking(Map> queueEntriesByResult) { - if (queueEntriesByResult.containsKey(WorkStealResult.EXECUTED_BY_THIS_WORKER) || // - queueEntriesByResult.containsKey(WorkStealResult.EXECUTED_BY_DIFFERENT_WORKER)) { - // Queue changed. Try to see if there is work that does not need locking + private void processQueueEntries(List queueEntries) { + var queueEntriesByResult = tryToStealWorkWithoutBlocking(queueEntries); + var queueModified = queueEntriesByResult.containsKey(WorkStealResult.EXECUTED_BY_THIS_WORKER) // + || queueEntriesByResult.containsKey(WorkStealResult.EXECUTED_BY_DIFFERENT_WORKER); + if (queueModified) { return; } - // All resources locked, start blocking - var entriesRequiringResourceLocks = queueEntriesByResult.remove(WorkStealResult.RESOURCE_LOCK_UNAVAILABLE); + var entriesRequiringResourceLocks = queueEntriesByResult.get(WorkStealResult.RESOURCE_LOCK_UNAVAILABLE); if (entriesRequiringResourceLocks != null) { + // One entry at a time to avoid over comitting tryToStealWork(entriesRequiringResourceLocks.get(0), BlockingMode.BLOCKING); } } From 9d39274d7628efb48337ac4840ee5e2a3816918e Mon Sep 17 00:00:00 2001 From: "M.P. Korstanje" Date: Mon, 20 Oct 2025 15:49:57 +0200 Subject: [PATCH 05/10] Polishing with spotless --- .../hierarchical/ConcurrentHierarchicalTestExecutorService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/junit-platform-engine/src/main/java/org/junit/platform/engine/support/hierarchical/ConcurrentHierarchicalTestExecutorService.java b/junit-platform-engine/src/main/java/org/junit/platform/engine/support/hierarchical/ConcurrentHierarchicalTestExecutorService.java index 6ce20702d61a..8effe036558b 100644 --- a/junit-platform-engine/src/main/java/org/junit/platform/engine/support/hierarchical/ConcurrentHierarchicalTestExecutorService.java +++ b/junit-platform-engine/src/main/java/org/junit/platform/engine/support/hierarchical/ConcurrentHierarchicalTestExecutorService.java @@ -234,7 +234,7 @@ void processQueueEntries(WorkerLease workerLease, BooleanSupplier doneCondition) private void processQueueEntries(List queueEntries) { var queueEntriesByResult = tryToStealWorkWithoutBlocking(queueEntries); var queueModified = queueEntriesByResult.containsKey(WorkStealResult.EXECUTED_BY_THIS_WORKER) // - || queueEntriesByResult.containsKey(WorkStealResult.EXECUTED_BY_DIFFERENT_WORKER); + || queueEntriesByResult.containsKey(WorkStealResult.EXECUTED_BY_DIFFERENT_WORKER); if (queueModified) { return; } From e46a3322bbcb6b643c2923597a6cbc8af81185d6 Mon Sep 17 00:00:00 2001 From: "M.P. Korstanje" Date: Mon, 20 Oct 2025 16:07:44 +0200 Subject: [PATCH 06/10] Polishing --- .../hierarchical/ConcurrentHierarchicalTestExecutorService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/junit-platform-engine/src/main/java/org/junit/platform/engine/support/hierarchical/ConcurrentHierarchicalTestExecutorService.java b/junit-platform-engine/src/main/java/org/junit/platform/engine/support/hierarchical/ConcurrentHierarchicalTestExecutorService.java index 8effe036558b..5f645f589109 100644 --- a/junit-platform-engine/src/main/java/org/junit/platform/engine/support/hierarchical/ConcurrentHierarchicalTestExecutorService.java +++ b/junit-platform-engine/src/main/java/org/junit/platform/engine/support/hierarchical/ConcurrentHierarchicalTestExecutorService.java @@ -240,7 +240,7 @@ private void processQueueEntries(List queueEntries) { } var entriesRequiringResourceLocks = queueEntriesByResult.get(WorkStealResult.RESOURCE_LOCK_UNAVAILABLE); if (entriesRequiringResourceLocks != null) { - // One entry at a time to avoid over comitting + // One entry at a time to avoid blocking too much tryToStealWork(entriesRequiringResourceLocks.get(0), BlockingMode.BLOCKING); } } From 57fffab8efde392ed2cd0c20785dd5f72b624050 Mon Sep 17 00:00:00 2001 From: "M.P. Korstanje" Date: Mon, 20 Oct 2025 16:33:35 +0200 Subject: [PATCH 07/10] Extract RejectedExecutionHandler to avoid control flow by exception --- ...urrentHierarchicalTestExecutorService.java | 66 +++++++++++++------ 1 file changed, 45 insertions(+), 21 deletions(-) diff --git a/junit-platform-engine/src/main/java/org/junit/platform/engine/support/hierarchical/ConcurrentHierarchicalTestExecutorService.java b/junit-platform-engine/src/main/java/org/junit/platform/engine/support/hierarchical/ConcurrentHierarchicalTestExecutorService.java index 5f645f589109..3d634a0df9e1 100644 --- a/junit-platform-engine/src/main/java/org/junit/platform/engine/support/hierarchical/ConcurrentHierarchicalTestExecutorService.java +++ b/junit-platform-engine/src/main/java/org/junit/platform/engine/support/hierarchical/ConcurrentHierarchicalTestExecutorService.java @@ -32,6 +32,7 @@ import java.util.concurrent.Future; import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.Semaphore; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadFactory; @@ -47,6 +48,7 @@ import org.junit.platform.commons.logging.LoggerFactory; import org.junit.platform.commons.util.ClassLoaderUtils; import org.junit.platform.commons.util.Preconditions; +import org.junit.platform.commons.util.ToStringBuilder; import org.junit.platform.engine.ConfigurationParameters; /** @@ -72,10 +74,12 @@ public ConcurrentHierarchicalTestExecutorService(ParallelExecutionConfiguration ConcurrentHierarchicalTestExecutorService(ParallelExecutionConfiguration configuration, ClassLoader classLoader) { ThreadFactory threadFactory = new WorkerThreadFactory(classLoader); - threadPool = new ThreadPoolExecutor(configuration.getCorePoolSize(), configuration.getMaxPoolSize(), - configuration.getKeepAliveSeconds(), SECONDS, new SynchronousQueue<>(), threadFactory); parallelism = configuration.getParallelism(); workerLeaseManager = new WorkerLeaseManager(parallelism, this::maybeStartWorker); + var rejectedExecutionHandler = new LeaseAwareRejectedExecutionHandler(workerLeaseManager); + threadPool = new ThreadPoolExecutor(configuration.getCorePoolSize(), configuration.getMaxPoolSize(), + configuration.getKeepAliveSeconds(), SECONDS, new SynchronousQueue<>(), threadFactory, + rejectedExecutionHandler); LOGGER.trace(() -> "initialized thread pool for parallelism of " + configuration.getParallelism()); } @@ -142,26 +146,25 @@ private void maybeStartWorker(BooleanSupplier doneCondition) { if (workerLease == null) { return; } - try { - threadPool.execute(() -> { - LOGGER.trace(() -> "starting worker"); - try { - WorkerThread.getOrThrow().processQueueEntries(workerLease, doneCondition); - } - finally { - workerLease.release(false); - LOGGER.trace(() -> "stopping worker"); - } - maybeStartWorker(doneCondition); - }); - } - catch (RejectedExecutionException e) { - workerLease.release(false); - if (threadPool.isShutdown() || workerLeaseManager.isAtLeastOneLeaseTaken()) { - return; + threadPool.execute(new RunLeaseAwareWorker(workerLease, + () -> WorkerThread.getOrThrow().processQueueEntries(workerLease, doneCondition), + () -> this.maybeStartWorker(doneCondition))); + } + + private record RunLeaseAwareWorker(WorkerLease workerLease, Runnable worker, Runnable onWorkerFinished) + implements Runnable { + + @Override + public void run() { + LOGGER.trace(() -> "starting worker"); + try { + worker.run(); } - LOGGER.error(e, () -> "failed to submit worker to thread pool"); - throw e; + finally { + workerLease.release(false); + LOGGER.trace(() -> "stopping worker"); + } + onWorkerFinished.run(); } } @@ -669,6 +672,12 @@ void reacquire() throws InterruptedException { LOGGER.trace(() -> "reacquired worker lease (available: %d)".formatted(semaphore.availablePermits())); } } + + @Override + public String toString() { + return new ToStringBuilder(this).append("parallelism", parallelism).append("semaphore", + semaphore).toString(); + } } static class WorkerLease implements AutoCloseable { @@ -705,4 +714,19 @@ void reacquire() throws InterruptedException { reacquisitionToken = null; } } + + private record LeaseAwareRejectedExecutionHandler(WorkerLeaseManager workerLeaseManager) + implements RejectedExecutionHandler { + @Override + public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { + if (!(r instanceof RunLeaseAwareWorker worker)) { + return; + } + worker.workerLease.release(false); + if (executor.isShutdown() || workerLeaseManager.isAtLeastOneLeaseTaken()) { + return; + } + throw new RejectedExecutionException("Task with " + workerLeaseManager + " rejected from " + executor); + } + } } From 1ec304bd56b448e5652c57c27446de92dfbbe02c Mon Sep 17 00:00:00 2001 From: "M.P. Korstanje" Date: Tue, 21 Oct 2025 03:47:10 +0200 Subject: [PATCH 08/10] Use ConcurrentSkipListSet with absolute ordering to back work queue --- ...urrentHierarchicalTestExecutorService.java | 59 ++++++++----------- 1 file changed, 25 insertions(+), 34 deletions(-) diff --git a/junit-platform-engine/src/main/java/org/junit/platform/engine/support/hierarchical/ConcurrentHierarchicalTestExecutorService.java b/junit-platform-engine/src/main/java/org/junit/platform/engine/support/hierarchical/ConcurrentHierarchicalTestExecutorService.java index 3d634a0df9e1..05d1f93a1e6b 100644 --- a/junit-platform-engine/src/main/java/org/junit/platform/engine/support/hierarchical/ConcurrentHierarchicalTestExecutorService.java +++ b/junit-platform-engine/src/main/java/org/junit/platform/engine/support/hierarchical/ConcurrentHierarchicalTestExecutorService.java @@ -23,14 +23,15 @@ import java.util.ArrayList; import java.util.Collection; import java.util.EnumMap; +import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Queue; +import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; -import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.Semaphore; @@ -225,17 +226,16 @@ void processQueueEntries(WorkerLease workerLease, BooleanSupplier doneCondition) LOGGER.trace(() -> "yielding resource lock"); break; } - var queueEntries = workQueue.peekAll(); - if (queueEntries.isEmpty()) { + if (workQueue.isEmpty()) { LOGGER.trace(() -> "no queue entries available"); break; } - processQueueEntries(queueEntries); + processQueueEntries(); } } - private void processQueueEntries(List queueEntries) { - var queueEntriesByResult = tryToStealWorkWithoutBlocking(queueEntries); + private void processQueueEntries() { + var queueEntriesByResult = tryToStealWorkWithoutBlocking(workQueue); var queueModified = queueEntriesByResult.containsKey(WorkStealResult.EXECUTED_BY_THIS_WORKER) // || queueEntriesByResult.containsKey(WorkStealResult.EXECUTED_BY_DIFFERENT_WORKER); if (queueModified) { @@ -288,7 +288,6 @@ void invokeAll(List testTasks) { private List forkConcurrentChildren(List children, Consumer isolatedTaskCollector, List sameThreadTasks) { - int index = 0; List queueEntries = new ArrayList<>(children.size()); for (TestTask child : children) { if (requiresGlobalReadWriteLock(child)) { @@ -298,7 +297,7 @@ else if (child.getExecutionMode() == SAME_THREAD) { sameThreadTasks.add(child); } else { - queueEntries.add(WorkQueue.Entry.createWithIndex(child, index++)); + queueEntries.add(workQueue.createEntry(child)); } } @@ -316,12 +315,10 @@ else if (child.getExecutionMode() == SAME_THREAD) { } private Map> tryToStealWorkWithoutBlocking( - List queueEntries) { + Iterable queueEntries) { Map> queueEntriesByResult = new EnumMap<>(WorkStealResult.class); - if (!queueEntries.isEmpty()) { - tryToStealWork(queueEntries, BlockingMode.NON_BLOCKING, queueEntriesByResult); - } + tryToStealWork(queueEntries, BlockingMode.NON_BLOCKING, queueEntriesByResult); return queueEntriesByResult; } @@ -333,7 +330,7 @@ private void tryToStealWorkWithBlocking(Map entries, BlockingMode blocking, + private void tryToStealWork(Iterable entries, BlockingMode blocking, Map> queueEntriesByResult) { for (var entry : entries) { var state = tryToStealWork(entry, blocking); @@ -539,16 +536,21 @@ private enum BlockingMode { NON_BLOCKING, BLOCKING } - private static class WorkQueue { - - private final Queue queue = new PriorityBlockingQueue<>(); + private static class WorkQueue implements Iterable { + private final AtomicInteger index = new AtomicInteger(); + private final Set queue = new ConcurrentSkipListSet<>(); Entry add(TestTask task) { - Entry entry = Entry.create(task); + Entry entry = createEntry(task); LOGGER.trace(() -> "forking: " + entry.task); return doAdd(entry); } + Entry createEntry(TestTask task) { + int level = task.getTestDescriptor().getUniqueId().getSegments().size(); + return new Entry(task, new CompletableFuture<>(), level, index.getAndIncrement()); + } + void addAll(Collection entries) { entries.forEach(this::doAdd); } @@ -566,13 +568,6 @@ private Entry doAdd(Entry entry) { return entry; } - private List peekAll() { - List entries = new ArrayList<>(queue); - // Iteration order isn't the same as queue order. - entries.sort(naturalOrder()); - return entries; - } - boolean remove(Entry entry) { return queue.remove(entry); } @@ -581,18 +576,14 @@ boolean isEmpty() { return queue.isEmpty(); } + @Override + public Iterator iterator() { + return queue.iterator(); + } + private record Entry(TestTask task, CompletableFuture<@Nullable Void> future, int level, int index) implements Comparable { - static Entry create(TestTask task) { - return createWithIndex(task, 0); - } - - static Entry createWithIndex(TestTask task, int index) { - int level = task.getTestDescriptor().getUniqueId().getSegments().size(); - return new Entry(task, new CompletableFuture<>(), level, index); - } - @SuppressWarnings("FutureReturnValueIgnored") Entry { future.whenComplete((__, t) -> { From 63ace0977f6dc786e4ce185545021cf42d54b1b3 Mon Sep 17 00:00:00 2001 From: "M.P. Korstanje" Date: Tue, 21 Oct 2025 03:57:33 +0200 Subject: [PATCH 09/10] Use long index, because containers are int size --- .../ConcurrentHierarchicalTestExecutorService.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/junit-platform-engine/src/main/java/org/junit/platform/engine/support/hierarchical/ConcurrentHierarchicalTestExecutorService.java b/junit-platform-engine/src/main/java/org/junit/platform/engine/support/hierarchical/ConcurrentHierarchicalTestExecutorService.java index 05d1f93a1e6b..c7cc3442fa0f 100644 --- a/junit-platform-engine/src/main/java/org/junit/platform/engine/support/hierarchical/ConcurrentHierarchicalTestExecutorService.java +++ b/junit-platform-engine/src/main/java/org/junit/platform/engine/support/hierarchical/ConcurrentHierarchicalTestExecutorService.java @@ -39,6 +39,7 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiFunction; import java.util.function.BooleanSupplier; import java.util.function.Consumer; @@ -537,7 +538,7 @@ private enum BlockingMode { } private static class WorkQueue implements Iterable { - private final AtomicInteger index = new AtomicInteger(); + private final AtomicLong index = new AtomicLong(); private final Set queue = new ConcurrentSkipListSet<>(); Entry add(TestTask task) { @@ -581,7 +582,7 @@ public Iterator iterator() { return queue.iterator(); } - private record Entry(TestTask task, CompletableFuture<@Nullable Void> future, int level, int index) + private record Entry(TestTask task, CompletableFuture<@Nullable Void> future, int level, long index) implements Comparable { @SuppressWarnings("FutureReturnValueIgnored") @@ -606,7 +607,7 @@ public int compareTo(Entry that) { if (result != 0) { return result; } - return Integer.compare(that.index, this.index); + return Long.compare(that.index, this.index); } private boolean isContainer() { From bb118122bf26febf3190f1a396220c97eb98b930 Mon Sep 17 00:00:00 2001 From: Marc Philipp Date: Fri, 24 Oct 2025 17:02:53 +0200 Subject: [PATCH 10/10] Polishing --- .../ConcurrentHierarchicalTestExecutorService.java | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/junit-platform-engine/src/main/java/org/junit/platform/engine/support/hierarchical/ConcurrentHierarchicalTestExecutorService.java b/junit-platform-engine/src/main/java/org/junit/platform/engine/support/hierarchical/ConcurrentHierarchicalTestExecutorService.java index c7cc3442fa0f..cfb340758061 100644 --- a/junit-platform-engine/src/main/java/org/junit/platform/engine/support/hierarchical/ConcurrentHierarchicalTestExecutorService.java +++ b/junit-platform-engine/src/main/java/org/junit/platform/engine/support/hierarchical/ConcurrentHierarchicalTestExecutorService.java @@ -153,14 +153,14 @@ private void maybeStartWorker(BooleanSupplier doneCondition) { () -> this.maybeStartWorker(doneCondition))); } - private record RunLeaseAwareWorker(WorkerLease workerLease, Runnable worker, Runnable onWorkerFinished) + private record RunLeaseAwareWorker(WorkerLease workerLease, Runnable work, Runnable onWorkerFinished) implements Runnable { @Override public void run() { LOGGER.trace(() -> "starting worker"); try { - worker.run(); + work.run(); } finally { workerLease.release(false); @@ -667,8 +667,10 @@ void reacquire() throws InterruptedException { @Override public String toString() { - return new ToStringBuilder(this).append("parallelism", parallelism).append("semaphore", - semaphore).toString(); + return new ToStringBuilder(this) // + .append("parallelism", parallelism) // + .append("semaphore", semaphore) // + .toString(); } }