Skip to content

Commit

Permalink
Remove redundant consistency tests (#940)
Browse files Browse the repository at this point in the history
  • Loading branch information
pivovarit authored Sep 16, 2024
1 parent b57986f commit 3defaa4
Showing 1 changed file with 3 additions and 41 deletions.
44 changes: 3 additions & 41 deletions src/test/java/com/pivovarit/collectors/FunctionalTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import java.time.Duration;
import java.time.LocalTime;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
Expand All @@ -17,7 +16,6 @@
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
Expand Down Expand Up @@ -165,17 +163,15 @@ private static <R extends Collection<Integer>> Stream<DynamicTest> virtualThread
return of(
shouldStartConsumingImmediately(collector, name),
shouldShortCircuitOnException(collector, name),
shouldInterruptOnException(collector, name),
shouldRemainConsistent(collector, name)
shouldInterruptOnException(collector, name)
);
}

private static <R extends Collection<Integer>> Stream<DynamicTest> tests(CollectorSupplier<Function<Integer, Integer>, Executor, Integer, Collector<Integer, ?, CompletableFuture<R>>> collector, String name, boolean limitedParallelism) {
var tests = of(
shouldStartConsumingImmediately(collector, name),
shouldShortCircuitOnException(collector, name),
shouldInterruptOnException(collector, name),
shouldRemainConsistent(collector, name)
shouldInterruptOnException(collector, name)
);

tests = limitedParallelism ? of(shouldRespectParallelism(collector, name)) : tests;
Expand All @@ -187,8 +183,7 @@ private static <R extends Collection<Integer>> Stream<DynamicTest> tests(Collect
private static <R extends Collection<Integer>> Stream<DynamicTest> virtualThreadsStreamingTests(CollectorSupplier<Function<Integer, Integer>, Executor, Integer, Collector<Integer, ?, CompletableFuture<R>>> collector, String name) {
return of(
shouldStartConsumingImmediately(collector, name),
shouldShortCircuitOnException(collector, name),
shouldRemainConsistent(collector, name)
shouldShortCircuitOnException(collector, name)
);
}

Expand All @@ -198,7 +193,6 @@ private static <R extends Collection<Integer>> Stream<DynamicTest> streamingTest
shouldRespectParallelism(collector, name),
shouldPushElementsToStreamAsSoonAsPossible(collector, name),
shouldShortCircuitOnException(collector, name),
shouldRemainConsistent(collector, name),
shouldRejectInvalidParallelism(collector, name)
);
}
Expand Down Expand Up @@ -289,38 +283,6 @@ private static <R extends Collection<Integer>> DynamicTest shouldShortCircuitOnE
});
}

private static <R extends Collection<Integer>> DynamicTest shouldRemainConsistent(CollectorSupplier<Function<Integer, Integer>, Executor, Integer, Collector<Integer, ?, CompletableFuture<R>>> collector, String name) {
return dynamicTest(format("%s: should remain consistent", name), () -> {
int parallelism = 100;

ExecutorService executor = Executors.newFixedThreadPool(parallelism);

try {
List<Integer> elements = IntStream.range(0, parallelism).boxed().toList();

CountDownLatch countDownLatch = new CountDownLatch(parallelism);

R result = elements.stream()
.collect(collector.apply(i -> {
countDownLatch.countDown();
try {
countDownLatch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return i;
}, executor, PARALLELISM))
.join();

assertThat(new HashSet<>(result))
.hasSameSizeAs(elements)
.containsAll(elements);
} finally {
executor.shutdownNow();
}
});
}

private static <R extends Collection<Integer>> DynamicTest shouldRejectInvalidParallelism(CollectorSupplier<Function<Integer, Integer>, Executor, Integer, Collector<Integer, ?, CompletableFuture<R>>> collector, String name) {
return dynamicTest(format("%s: should reject invalid parallelism", name), () -> {
withExecutor(e -> {
Expand Down

0 comments on commit 3defaa4

Please sign in to comment.