Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,7 @@ void invokeAll(List<? extends TestTask> testTasks) {
private List<WorkQueue.Entry> forkConcurrentChildren(List<? extends TestTask> children,
Consumer<TestTask> isolatedTaskCollector, List<TestTask> sameThreadTasks) {

int index = 0;
List<WorkQueue.Entry> queueEntries = new ArrayList<>(children.size());
for (TestTask child : children) {
if (requiresGlobalReadWriteLock(child)) {
Expand All @@ -285,7 +286,7 @@ else if (child.getExecutionMode() == SAME_THREAD) {
sameThreadTasks.add(child);
}
else {
queueEntries.add(WorkQueue.Entry.create(child));
queueEntries.add(WorkQueue.Entry.createWithIndex(child, index++));
}
}

Expand Down Expand Up @@ -570,12 +571,17 @@ boolean isEmpty() {
return queue.isEmpty();
}

private record Entry(TestTask task, CompletableFuture<@Nullable Void> future, int level)
private record Entry(TestTask task, CompletableFuture<@Nullable Void> future, int level, int index)
implements Comparable<Entry> {

static Entry create(TestTask task) {
int level = task.getTestDescriptor().getUniqueId().getSegments().size();
return new Entry(task, new CompletableFuture<>(), level);
return new Entry(task, new CompletableFuture<>(), level, 0);
}

static Entry createWithIndex(TestTask task, int index) {
int level = task.getTestDescriptor().getUniqueId().getSegments().size();
return new Entry(task, new CompletableFuture<>(), level, index);
}

@SuppressWarnings("FutureReturnValueIgnored")
Expand All @@ -593,10 +599,14 @@ static Entry create(TestTask task) {
@Override
public int compareTo(Entry that) {
var result = Integer.compare(that.level, this.level);
if (result == 0) {
result = Boolean.compare(this.isContainer(), that.isContainer());
if (result != 0) {
return result;
}
return result;
result = Boolean.compare(this.isContainer(), that.isContainer());
if (result != 0) {
return result;
}
return Integer.compare(that.index, this.index);
}

private boolean isContainer() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import static java.util.concurrent.Future.State.SUCCESS;
import static java.util.function.Predicate.isEqual;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertAll;
import static org.junit.jupiter.api.Assertions.fail;
import static org.junit.platform.commons.test.PreconditionAssertions.assertPreconditionViolationFor;
import static org.junit.platform.commons.util.ExceptionUtils.throwAsUncheckedException;
Expand All @@ -32,6 +33,7 @@
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Stream;
Expand Down Expand Up @@ -230,13 +232,14 @@ void prioritizesChildrenOfStartedContainers() throws Exception {
service = new ConcurrentHierarchicalTestExecutorService(configuration(2));

var leavesStarted = new CountDownLatch(2);
var leaf = new TestTaskStub(ExecutionMode.CONCURRENT, leavesStarted::countDown) //
.withName("leaf").withLevel(3);
var child1 = new TestTaskStub(ExecutionMode.CONCURRENT, () -> requiredService().submit(leaf).get()) //

var child1 = new TestTaskStub(ExecutionMode.CONCURRENT, leavesStarted::await) //
.withName("child1").withLevel(2);
var child2 = new TestTaskStub(ExecutionMode.CONCURRENT, leavesStarted::countDown) //
.withName("child2").withLevel(2);
var child3 = new TestTaskStub(ExecutionMode.SAME_THREAD, leavesStarted::await) //
var leaf = new TestTaskStub(ExecutionMode.CONCURRENT, leavesStarted::countDown) //
.withName("leaf").withLevel(3);
var child3 = new TestTaskStub(ExecutionMode.CONCURRENT, () -> requiredService().submit(leaf).get()) //
.withName("child3").withLevel(2);

var root = new TestTaskStub(ExecutionMode.SAME_THREAD,
Expand All @@ -246,11 +249,11 @@ void prioritizesChildrenOfStartedContainers() throws Exception {
service.submit(root).get();

root.assertExecutedSuccessfully();
assertThat(List.of(child1, child2, child3)).allSatisfy(TestTaskStub::assertExecutedSuccessfully);
assertThat(List.of(child1, child2, leaf, child3)).allSatisfy(TestTaskStub::assertExecutedSuccessfully);
leaf.assertExecutedSuccessfully();

assertThat(leaf.startTime).isBeforeOrEqualTo(child2.startTime);
assertThat(leaf.executionThread).isSameAs(child1.executionThread);
assertThat(leaf.executionThread).isSameAs(child3.executionThread);
}

@Test
Expand Down Expand Up @@ -371,6 +374,84 @@ public void release() {
.containsOnly(child2.executionThread);
}

@Test
void executesChildrenInOrder() throws Exception {
service = new ConcurrentHierarchicalTestExecutorService(configuration(1, 1));

Executable behavior = () -> {

};
var leaf1a = new TestTaskStub(ExecutionMode.CONCURRENT, behavior) //
.withName("leaf1a").withLevel(2);
var leaf1b = new TestTaskStub(ExecutionMode.CONCURRENT, behavior) //
.withName("leaf1b").withLevel(2);
var leaf1c = new TestTaskStub(ExecutionMode.CONCURRENT, behavior) //
.withName("leaf1c").withLevel(2);
var leaf1d = new TestTaskStub(ExecutionMode.CONCURRENT, behavior) //
.withName("leaf1d").withLevel(2);

var root = new TestTaskStub(ExecutionMode.SAME_THREAD,
() -> requiredService().invokeAll(List.of(leaf1a, leaf1b, leaf1c, leaf1d))) //
.withName("root").withLevel(1);

service.submit(root).get();

assertThat(List.of(root, leaf1a, leaf1b, leaf1c, leaf1d)) //
.allSatisfy(TestTaskStub::assertExecutedSuccessfully);

assertAll(() -> assertThat(leaf1a.startTime).isBeforeOrEqualTo(leaf1b.startTime),
() -> assertThat(leaf1b.startTime).isBeforeOrEqualTo(leaf1c.startTime),
() -> assertThat(leaf1c.startTime).isBeforeOrEqualTo(leaf1d.startTime));
}

@Test
void workIsStolenInReverseOrder() throws Exception {
service = new ConcurrentHierarchicalTestExecutorService(configuration(2, 2));

// Execute tasks pairwise
CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea to use that here! 👍

Executable behavior = cyclicBarrier::await;

// With half of the leaves to be executed normally
var leaf1a = new TestTaskStub(ExecutionMode.CONCURRENT, behavior) //
.withName("leaf1a").withLevel(2);
var leaf1b = new TestTaskStub(ExecutionMode.CONCURRENT, behavior) //
.withName("leaf1b").withLevel(2);
var leaf1c = new TestTaskStub(ExecutionMode.CONCURRENT, behavior) //
.withName("leaf1c").withLevel(2);

// And half of the leaves to be stolen
var leaf2a = new TestTaskStub(ExecutionMode.CONCURRENT, behavior) //
.withName("leaf2a").withLevel(2);
var leaf2b = new TestTaskStub(ExecutionMode.CONCURRENT, behavior) //
.withName("leaf2b").withLevel(2);
var leaf2c = new TestTaskStub(ExecutionMode.CONCURRENT, behavior) //
.withName("leaf2c").withLevel(2);

var root = new TestTaskStub(ExecutionMode.SAME_THREAD,
() -> requiredService().invokeAll(List.of(leaf1a, leaf1b, leaf1c, leaf2a, leaf2b, leaf2c))) //
.withName("root").withLevel(1);

service.submit(root).get();

assertThat(List.of(root, leaf1a, leaf1b, leaf1c, leaf2a, leaf2b, leaf2c)) //
.allSatisfy(TestTaskStub::assertExecutedSuccessfully);

// If the last node was stolen.
assertThat(leaf1a.executionThread).isNotEqualTo(leaf2c.executionThread);
// Then it must follow that the last half of the nodes were stolen
assertThat(Stream.of(leaf1a, leaf1b, leaf1c, leaf2a, leaf2b, leaf2c)).extracting(
TestTaskStub::executionThread).containsExactly( //
leaf1a.executionThread, leaf1a.executionThread, leaf1a.executionThread, //
leaf2c.executionThread, leaf2c.executionThread, leaf2c.executionThread //
);
assertAll( //
() -> assertThat(leaf1a.startTime).isBeforeOrEqualTo(leaf1b.startTime),
() -> assertThat(leaf1b.startTime).isBeforeOrEqualTo(leaf1c.startTime),
() -> assertThat(leaf2a.startTime).isAfterOrEqualTo(leaf2b.startTime),
() -> assertThat(leaf2b.startTime).isAfterOrEqualTo(leaf2c.startTime));
}

private static ExclusiveResource exclusiveResource(LockMode lockMode) {
return new ExclusiveResource("key", lockMode);
}
Expand Down