diff --git a/src/test/java/com/pivovarit/collectors/FunctionalTest.java b/src/test/java/com/pivovarit/collectors/FunctionalTest.java index 4b22acca..2e5658a9 100644 --- a/src/test/java/com/pivovarit/collectors/FunctionalTest.java +++ b/src/test/java/com/pivovarit/collectors/FunctionalTest.java @@ -5,8 +5,6 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestFactory; -import java.time.Duration; -import java.time.LocalTime; import java.util.Collection; import java.util.LinkedList; import java.util.List; @@ -24,7 +22,6 @@ import java.util.stream.Stream; import static com.pivovarit.collectors.ParallelCollectors.parallel; -import static com.pivovarit.collectors.ParallelCollectors.parallelToOrderedStream; import static com.pivovarit.collectors.ParallelCollectors.parallelToStream; import static com.pivovarit.collectors.TestUtils.returnWithDelay; import static com.pivovarit.collectors.TestUtils.withExecutor; @@ -59,15 +56,6 @@ Stream collectors() { ).flatMap(i -> i); } - @TestFactory - Stream streaming_collectors() { - return of( - // platform threads - streamingTests((m, e, p) -> adaptAsync(parallelToStream(m, e, p)), format("ParallelCollectors.parallelToStream(p=%d)", PARALLELISM)), - streamingTests((m, e, p) -> adaptAsync(parallelToOrderedStream(m, e, p)), format("ParallelCollectors.parallelToOrderedStream(p=%d)", PARALLELISM)) - ).flatMap(i -> i); - } - @TestFactory Stream streaming_batching_collectors() { return of( @@ -133,36 +121,12 @@ void shouldExecuteEagerlyOnProvidedThreadPool() { } } - private static > Stream streamingTests(CollectorSupplier, Executor, Integer, Collector>> collector, String name) { - return of(shouldPushElementsToStreamAsSoonAsPossible(collector, name)); - } - private static > Stream batchTests(CollectorSupplier, Executor, Integer, Collector>> collector, String name) { return of(shouldProcessOnNThreadsETParallelism(collector, name)); } private static > Stream batchStreamingTests(CollectorSupplier, Executor, Integer, Collector>> collector, String name) { - return Stream.concat( - streamingTests(collector, name), - of(shouldProcessOnNThreadsETParallelism(collector, name))); - } - - private static > DynamicTest shouldPushElementsToStreamAsSoonAsPossible(CollectorSupplier, Executor, Integer, Collector>> collector, String name) { - return dynamicTest(format("%s: should push elements as soon as possible ", name), () -> { - int parallelism = 2; - int delayMillis = 50; - withExecutor(e -> { - LocalTime before = LocalTime.now(); - Stream.generate(() -> 42) - .limit(100) - .collect(collector.apply(i -> returnWithDelay(i, ofMillis(delayMillis)), e, parallelism)) - .join(); - - LocalTime after = LocalTime.now(); - assertThat(Duration.between(before, after)) - .isGreaterThanOrEqualTo(ofMillis(delayMillis * parallelism)); - }); - }); + return of(shouldProcessOnNThreadsETParallelism(collector, name)); } private static > DynamicTest shouldProcessOnNThreadsETParallelism(CollectorSupplier, Executor, Integer, Collector>> collector, String name) { diff --git a/src/test/java/com/pivovarit/collectors/test/StreamingTest.java b/src/test/java/com/pivovarit/collectors/test/StreamingTest.java new file mode 100644 index 00000000..9f016da1 --- /dev/null +++ b/src/test/java/com/pivovarit/collectors/test/StreamingTest.java @@ -0,0 +1,70 @@ +package com.pivovarit.collectors.test; + +import com.pivovarit.collectors.ParallelCollectors; +import org.junit.jupiter.api.DynamicTest; +import org.junit.jupiter.api.TestFactory; + +import java.time.Duration; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; +import java.util.stream.Collector; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +import static com.pivovarit.collectors.TestUtils.returnWithDelay; +import static com.pivovarit.collectors.test.StreamingTest.CollectorDefinition.collector; +import static java.time.Duration.ofSeconds; +import static org.awaitility.Awaitility.await; + +class StreamingTest { + + private static Stream> allStreaming() { + return Stream.of( + collector("parallelToStream()", (f) -> ParallelCollectors.parallelToStream(f)), + collector("parallelToStream(e)", (f) -> ParallelCollectors.parallelToStream(f, e())), + collector("parallelToStream(e, p)", (f) -> ParallelCollectors.parallelToStream(f, e(), p())), + collector("parallelToOrderedStream()", (f) -> ParallelCollectors.parallelToOrderedStream(f)), + collector("parallelToOrderedStream(e)", (f) -> ParallelCollectors.parallelToOrderedStream(f, e())), + collector("parallelToOrderedStream(e, p)", (f) -> ParallelCollectors.parallelToOrderedStream(f, e(), p())) + ); + } + + @TestFactory + Stream shouldPushElementsAsSoonAsTheyAreReady() { + return allStreaming() + .map(c -> DynamicTest.dynamicTest(c.name(), () -> { + var counter = new AtomicInteger(); + Thread.startVirtualThread(() -> { + Stream.concat(Stream.of(0), IntStream.range(1, 10).boxed()) + .collect(c.factory().collector(i -> returnWithDelay(i, ofSeconds(i)))) + .forEach(__ -> counter.incrementAndGet()); + }); + + await() + .pollInterval(Duration.ofMillis(10)) + .atMost(Duration.ofMillis(100)) + .until(() -> counter.get() > 0); + })); + } + + protected record CollectorDefinition(String name, CollectorFactory factory) { + static CollectorDefinition collector(String name, CollectorFactory collector) { + return new CollectorDefinition<>(name, collector); + } + } + + @FunctionalInterface + private interface CollectorFactory { + Collector> collector(Function f); + } + + private static Executor e() { + return Executors.newCachedThreadPool(); + } + + private static int p() { + return 4; + } +}