diff --git a/src/test/java/com/pivovarit/collectors/FunctionalTest.java b/src/test/java/com/pivovarit/collectors/FunctionalTest.java index 7f84818e..ced46886 100644 --- a/src/test/java/com/pivovarit/collectors/FunctionalTest.java +++ b/src/test/java/com/pivovarit/collectors/FunctionalTest.java @@ -161,7 +161,6 @@ void shouldExecuteEagerlyOnProvidedThreadPool() { private static > Stream virtualThreadsTests(CollectorSupplier, Executor, Integer, Collector>> collector, String name) { return of( - shouldStartConsumingImmediately(collector, name), shouldShortCircuitOnException(collector, name), shouldInterruptOnException(collector, name) ); @@ -169,7 +168,6 @@ private static > Stream virtualThread private static > Stream tests(CollectorSupplier, Executor, Integer, Collector>> collector, String name) { return of( - shouldStartConsumingImmediately(collector, name), shouldShortCircuitOnException(collector, name), shouldInterruptOnException(collector, name) ); @@ -177,14 +175,12 @@ private static > Stream tests(Collect private static > Stream virtualThreadsStreamingTests(CollectorSupplier, Executor, Integer, Collector>> collector, String name) { return of( - shouldStartConsumingImmediately(collector, name), shouldShortCircuitOnException(collector, name) ); } private static > Stream streamingTests(CollectorSupplier, Executor, Integer, Collector>> collector, String name) { return of( - shouldStartConsumingImmediately(collector, name), shouldPushElementsToStreamAsSoonAsPossible(collector, name), shouldShortCircuitOnException(collector, name) ); @@ -256,23 +252,6 @@ private static > DynamicTest shouldShortCircuitOnE }); } - private static > DynamicTest shouldStartConsumingImmediately(CollectorSupplier, Executor, Integer, Collector>> collector, String name) { - return dynamicTest(format("%s: should start consuming immediately", name), () -> { - try (var e = Executors.newCachedThreadPool()) { - var counter = new AtomicInteger(); - - Stream.iterate(0, i -> returnWithDelay(i + 1, ofMillis(100))) - .limit(2) - .collect(collector.apply(i -> counter.incrementAndGet(), e, PARALLELISM)); - - await() - .pollInterval(ofMillis(10)) - .atMost(50, MILLISECONDS) - .until(() -> counter.get() > 0); - } - }); - } - private static > DynamicTest shouldInterruptOnException(CollectorSupplier, Executor, Integer, Collector>> collector, String name) { return dynamicTest(format("%s: should interrupt on exception", name), () -> { AtomicLong counter = new AtomicLong(); diff --git a/src/test/java/com/pivovarit/collectors/test/BasicParallelismTest.java b/src/test/java/com/pivovarit/collectors/test/BasicParallelismTest.java index 2b0ac70b..0a6b24c4 100644 --- a/src/test/java/com/pivovarit/collectors/test/BasicParallelismTest.java +++ b/src/test/java/com/pivovarit/collectors/test/BasicParallelismTest.java @@ -26,8 +26,7 @@ class BasicParallelismTest { private static Stream> allBounded() { return Stream.of( - collector("parallel(e, p)", (f, p) -> collectingAndThen(ParallelCollectors.parallel(f, e(), p), c -> c.join() - .toList())), + collector("parallel(e, p)", (f, p) -> collectingAndThen(ParallelCollectors.parallel(f, e(), p), c -> c.join().toList())), collector("parallel(toList(), e, p)", (f, p) -> collectingAndThen(ParallelCollectors.parallel(f, toList(), e(), p), CompletableFuture::join)), collector("parallel(toList(), e, p) [batching]", (f, p) -> collectingAndThen(ParallelCollectors.Batching.parallel(f, toList(), e(), p), CompletableFuture::join)), collector("parallelToStream(e, p)", (f, p) -> collectingAndThen(ParallelCollectors.parallelToStream(f, e(), p), Stream::toList)), diff --git a/src/test/java/com/pivovarit/collectors/test/BasicProcessingTest.java b/src/test/java/com/pivovarit/collectors/test/BasicProcessingTest.java index 38d54a7b..55d5d813 100644 --- a/src/test/java/com/pivovarit/collectors/test/BasicProcessingTest.java +++ b/src/test/java/com/pivovarit/collectors/test/BasicProcessingTest.java @@ -8,15 +8,20 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import java.util.stream.Collector; import java.util.stream.IntStream; import java.util.stream.Stream; +import static com.pivovarit.collectors.TestUtils.returnWithDelay; import static com.pivovarit.collectors.test.BasicProcessingTest.CollectorDefinition.collector; +import static java.time.Duration.ofSeconds; +import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.stream.Collectors.collectingAndThen; import static java.util.stream.Collectors.toList; import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat; +import static org.awaitility.Awaitility.await; class BasicProcessingTest { @@ -84,6 +89,25 @@ Stream shouldProcessAllElementsInOrder() { })); } + @TestFactory + Stream shouldStartProcessingImmediately() { + return all() + .map(c -> DynamicTest.dynamicTest(c.name(), () -> { + var counter = new AtomicInteger(); + + Thread.startVirtualThread(() -> { + Stream.iterate(0, i -> i + 1) + .limit(100) + .collect(c.collector().apply(i -> returnWithDelay(counter.incrementAndGet(), ofSeconds(1)))); + }); + + await() + .pollInterval(1, MILLISECONDS) + .atMost(500, MILLISECONDS) + .until(() -> counter.get() > 0); + })); + } + record CollectorDefinition(String name, Function, Collector>> collector) { static CollectorDefinition collector(String name, Function, Collector>> collector) { return new CollectorDefinition<>(name, collector);