Skip to content

Commit

Permalink
Rewrite immediate consumption tests (#943)
Browse files Browse the repository at this point in the history
  • Loading branch information
pivovarit authored Sep 16, 2024
1 parent e8aeec7 commit fd91254
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 23 deletions.
21 changes: 0 additions & 21 deletions src/test/java/com/pivovarit/collectors/FunctionalTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -161,30 +161,26 @@ void shouldExecuteEagerlyOnProvidedThreadPool() {

private static <R extends Collection<Integer>> Stream<DynamicTest> virtualThreadsTests(CollectorSupplier<Function<Integer, Integer>, Executor, Integer, Collector<Integer, ?, CompletableFuture<R>>> collector, String name) {
return of(
shouldStartConsumingImmediately(collector, name),
shouldShortCircuitOnException(collector, name),
shouldInterruptOnException(collector, name)
);
}

private static <R extends Collection<Integer>> Stream<DynamicTest> tests(CollectorSupplier<Function<Integer, Integer>, Executor, Integer, Collector<Integer, ?, CompletableFuture<R>>> collector, String name) {
return of(
shouldStartConsumingImmediately(collector, name),
shouldShortCircuitOnException(collector, name),
shouldInterruptOnException(collector, name)
);
}

private static <R extends Collection<Integer>> Stream<DynamicTest> virtualThreadsStreamingTests(CollectorSupplier<Function<Integer, Integer>, Executor, Integer, Collector<Integer, ?, CompletableFuture<R>>> collector, String name) {
return of(
shouldStartConsumingImmediately(collector, name),
shouldShortCircuitOnException(collector, name)
);
}

private static <R extends Collection<Integer>> Stream<DynamicTest> streamingTests(CollectorSupplier<Function<Integer, Integer>, Executor, Integer, Collector<Integer, ?, CompletableFuture<R>>> collector, String name) {
return of(
shouldStartConsumingImmediately(collector, name),
shouldPushElementsToStreamAsSoonAsPossible(collector, name),
shouldShortCircuitOnException(collector, name)
);
Expand Down Expand Up @@ -256,23 +252,6 @@ private static <R extends Collection<Integer>> DynamicTest shouldShortCircuitOnE
});
}

private static <R extends Collection<Integer>> DynamicTest shouldStartConsumingImmediately(CollectorSupplier<Function<Integer, Integer>, Executor, Integer, Collector<Integer, ?, CompletableFuture<R>>> collector, String name) {
return dynamicTest(format("%s: should start consuming immediately", name), () -> {
try (var e = Executors.newCachedThreadPool()) {
var counter = new AtomicInteger();

Stream.iterate(0, i -> returnWithDelay(i + 1, ofMillis(100)))
.limit(2)
.collect(collector.apply(i -> counter.incrementAndGet(), e, PARALLELISM));

await()
.pollInterval(ofMillis(10))
.atMost(50, MILLISECONDS)
.until(() -> counter.get() > 0);
}
});
}

private static <R extends Collection<Integer>> DynamicTest shouldInterruptOnException(CollectorSupplier<Function<Integer, Integer>, Executor, Integer, Collector<Integer, ?, CompletableFuture<R>>> collector, String name) {
return dynamicTest(format("%s: should interrupt on exception", name), () -> {
AtomicLong counter = new AtomicLong();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ class BasicParallelismTest {

private static Stream<CollectorDefinition<Integer, Integer>> allBounded() {
return Stream.of(
collector("parallel(e, p)", (f, p) -> collectingAndThen(ParallelCollectors.parallel(f, e(), p), c -> c.join()
.toList())),
collector("parallel(e, p)", (f, p) -> collectingAndThen(ParallelCollectors.parallel(f, e(), p), c -> c.join().toList())),
collector("parallel(toList(), e, p)", (f, p) -> collectingAndThen(ParallelCollectors.parallel(f, toList(), e(), p), CompletableFuture::join)),
collector("parallel(toList(), e, p) [batching]", (f, p) -> collectingAndThen(ParallelCollectors.Batching.parallel(f, toList(), e(), p), CompletableFuture::join)),
collector("parallelToStream(e, p)", (f, p) -> collectingAndThen(ParallelCollectors.parallelToStream(f, e(), p), Stream::toList)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,20 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collector;
import java.util.stream.IntStream;
import java.util.stream.Stream;

import static com.pivovarit.collectors.TestUtils.returnWithDelay;
import static com.pivovarit.collectors.test.BasicProcessingTest.CollectorDefinition.collector;
import static java.time.Duration.ofSeconds;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.stream.Collectors.collectingAndThen;
import static java.util.stream.Collectors.toList;
import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
import static org.awaitility.Awaitility.await;

class BasicProcessingTest {

Expand Down Expand Up @@ -84,6 +89,25 @@ Stream<DynamicTest> shouldProcessAllElementsInOrder() {
}));
}

@TestFactory
Stream<DynamicTest> shouldStartProcessingImmediately() {
return all()
.map(c -> DynamicTest.dynamicTest(c.name(), () -> {
var counter = new AtomicInteger();

Thread.startVirtualThread(() -> {
Stream.iterate(0, i -> i + 1)
.limit(100)
.collect(c.collector().apply(i -> returnWithDelay(counter.incrementAndGet(), ofSeconds(1))));
});

await()
.pollInterval(1, MILLISECONDS)
.atMost(500, MILLISECONDS)
.until(() -> counter.get() > 0);
}));
}

record CollectorDefinition<T, R>(String name, Function<Function<T, R>, Collector<T, ?, List<R>>> collector) {
static <T, R> CollectorDefinition<T, R> collector(String name, Function<Function<T, R>, Collector<T, ?, List<R>>> collector) {
return new CollectorDefinition<>(name, collector);
Expand Down

0 comments on commit fd91254

Please sign in to comment.