Skip to content

Commit

Permalink
Rewrite non-blocking tests (#930)
Browse files Browse the repository at this point in the history
  • Loading branch information
pivovarit authored Sep 15, 2024
1 parent 5bdffa2 commit 1dae395
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 16 deletions.
15 changes: 0 additions & 15 deletions src/test/java/com/pivovarit/collectors/FunctionalTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,6 @@ void shouldExecuteEagerlyOnProvidedThreadPool() {
private static <R extends Collection<Integer>> Stream<DynamicTest> virtualThreadsTests(CollectorSupplier<Function<Integer, Integer>, Executor, Integer, Collector<Integer, ?, CompletableFuture<R>>> collector, String name, boolean maintainsOrder) {
var tests = of(
shouldStartConsumingImmediately(collector, name),
shouldNotBlockTheCallingThread(collector, name),
shouldHandleThrowable(collector, name),
shouldShortCircuitOnException(collector, name),
shouldInterruptOnException(collector, name),
Expand All @@ -181,7 +180,6 @@ private static <R extends Collection<Integer>> Stream<DynamicTest> virtualThread
private static <R extends Collection<Integer>> Stream<DynamicTest> tests(CollectorSupplier<Function<Integer, Integer>, Executor, Integer, Collector<Integer, ?, CompletableFuture<R>>> collector, String name, boolean maintainsOrder, boolean limitedParallelism) {
var tests = of(
shouldStartConsumingImmediately(collector, name),
shouldNotBlockTheCallingThread(collector, name),
shouldHandleThrowable(collector, name),
shouldShortCircuitOnException(collector, name),
shouldInterruptOnException(collector, name),
Expand All @@ -200,7 +198,6 @@ private static <R extends Collection<Integer>> Stream<DynamicTest> tests(Collect
private static <R extends Collection<Integer>> Stream<DynamicTest> virtualThreadsStreamingTests(CollectorSupplier<Function<Integer, Integer>, Executor, Integer, Collector<Integer, ?, CompletableFuture<R>>> collector, String name, boolean maintainsOrder) {
var tests = of(
shouldStartConsumingImmediately(collector, name),
shouldNotBlockTheCallingThread(collector, name),
shouldHandleThrowable(collector, name),
shouldShortCircuitOnException(collector, name),
shouldRemainConsistent(collector, name)
Expand All @@ -214,7 +211,6 @@ private static <R extends Collection<Integer>> Stream<DynamicTest> virtualThread
private static <R extends Collection<Integer>> Stream<DynamicTest> streamingTests(CollectorSupplier<Function<Integer, Integer>, Executor, Integer, Collector<Integer, ?, CompletableFuture<R>>> collector, String name, boolean maintainsOrder) {
var tests = of(
shouldStartConsumingImmediately(collector, name),
shouldNotBlockTheCallingThread(collector, name),
shouldRespectParallelism(collector, name),
shouldPushElementsToStreamAsSoonAsPossible(collector, name),
shouldHandleThrowable(collector, name),
Expand Down Expand Up @@ -242,16 +238,6 @@ private static <R extends Collection<Integer>> Stream<DynamicTest> batchStreamin
of(shouldProcessOnNThreadsETParallelism(collector, name)));
}

private static <R extends Collection<Integer>> DynamicTest shouldNotBlockTheCallingThread(CollectorSupplier<Function<Integer, Integer>, Executor, Integer, Collector<Integer, ?, CompletableFuture<R>>> c, String name) {
return dynamicTest(format("%s: should not block when returning future", name), () -> {
withExecutor(e -> {
assertTimeoutPreemptively(ofMillis(100), () ->
Stream.<Integer>empty().collect(c
.apply(i -> returnWithDelay(42, ofMillis(Integer.MAX_VALUE)), e, 1)), "returned blocking future");
});
});
}

private static <R extends Collection<Integer>> DynamicTest shouldRespectParallelism(CollectorSupplier<Function<Integer, Integer>, Executor, Integer, Collector<Integer, ?, CompletableFuture<R>>> collector, String name) {
return dynamicTest(format("%s: should respect parallelism", name), () -> {
int parallelism = 2;
Expand All @@ -274,7 +260,6 @@ private static <R extends Collection<Integer>> DynamicTest shouldPushElementsToS
return dynamicTest(format("%s: should push elements as soon as possible ", name), () -> {
int parallelism = 2;
int delayMillis = 50;
var counter = new AtomicInteger();
withExecutor(e -> {
LocalTime before = LocalTime.now();
Stream.generate(() -> 42)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ static <T, R> CollectorDefinition<T, R> collector(String name, CollectorFactory<
}

@FunctionalInterface
interface CollectorFactory<T, R> {
private interface CollectorFactory<T, R> {
Collector<T, ?, List<R>> collector(Function<T, R> f, Integer p);
}

Expand Down
61 changes: 61 additions & 0 deletions src/test/java/com/pivovarit/collectors/test/NonBlockingTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package com.pivovarit.collectors.test;

import com.pivovarit.collectors.ParallelCollectors;
import org.junit.jupiter.api.DynamicTest;
import org.junit.jupiter.api.TestFactory;

import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.stream.Collector;
import java.util.stream.Stream;

import static com.pivovarit.collectors.TestUtils.returnWithDelay;
import static com.pivovarit.collectors.test.NonBlockingTest.CollectorDefinition.collector;
import static java.time.Duration.ofDays;
import static java.util.stream.Collectors.collectingAndThen;
import static java.util.stream.Collectors.toList;
import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively;

class NonBlockingTest {

private static Stream<CollectorDefinition<Integer, Integer>> allAsync() {
return Stream.of(
collector("parallel()", f -> collectingAndThen(ParallelCollectors.parallel(f), c -> c.thenApply(Stream::toList))),
collector("parallel(toList())", f -> ParallelCollectors.parallel(f, toList())),
collector("parallel(toList(), e)", f -> ParallelCollectors.parallel(f, toList(), e())),
collector("parallel(toList(), e, p=1)", f -> ParallelCollectors.parallel(f, toList(), e(), 1)),
collector("parallel(toList(), e, p=2)", f -> ParallelCollectors.parallel(f, toList(), e(), 2)),
collector("parallel(toList(), e, p=1) [batching]", f -> ParallelCollectors.Batching.parallel(f, toList(), e(), 1)),
collector("parallel(toList(), e, p=2) [batching]", f -> ParallelCollectors.Batching.parallel(f, toList(), e(), 2))
);
}

@TestFactory
Stream<DynamicTest> shouldNotBlockTheCallingThread() {
return allAsync()
.map(c -> DynamicTest.dynamicTest(c.name(), () -> {
assertTimeoutPreemptively(Duration.ofMillis(100), () -> {
var __ = Stream.of(1, 2, 3, 4).collect(c.factory().collector(i -> returnWithDelay(i, ofDays(1))));
});
}));
}

protected record CollectorDefinition<T, R>(String name, CollectorFactory<T, R> factory) {
static <T, R> CollectorDefinition<T, R> collector(String name, CollectorFactory<T, R> collector) {
return new CollectorDefinition<>(name, collector);
}
}

@FunctionalInterface
private interface CollectorFactory<T, R> {
Collector<T, ?, CompletableFuture<List<R>>> collector(Function<T, R> f);
}

private static Executor e() {
return Executors.newCachedThreadPool();
}
}

0 comments on commit 1dae395

Please sign in to comment.