From eb933ab88f3b54a6241d768c1cafb2d29ccb2382 Mon Sep 17 00:00:00 2001 From: Grzegorz Piwowarek Date: Mon, 29 Apr 2024 09:08:33 +0200 Subject: [PATCH] Add streaming tests for immediate processing start (#871) --- .../pivovarit/collectors/FunctionalTest.java | 20 +++++ .../pivovarit/collectors/ToStreamTest.java | 74 +++++++++++++++++++ 2 files changed, 94 insertions(+) create mode 100644 src/test/java/com/pivovarit/collectors/ToStreamTest.java diff --git a/src/test/java/com/pivovarit/collectors/FunctionalTest.java b/src/test/java/com/pivovarit/collectors/FunctionalTest.java index f3a318bd..dd26d676 100644 --- a/src/test/java/com/pivovarit/collectors/FunctionalTest.java +++ b/src/test/java/com/pivovarit/collectors/FunctionalTest.java @@ -232,6 +232,7 @@ private static > Stream streamingTest shouldStartConsumingImmediately(collector, name), shouldNotBlockTheCallingThread(collector, name), shouldRespectParallelism(collector, name), + shouldPushElementsToStreamAsSoonAsPossible(collector, name), shouldHandleThrowable(collector, name), shouldShortCircuitOnException(collector, name), shouldHandleRejectedExecutionException(collector, name), @@ -293,6 +294,25 @@ private static > DynamicTest shouldRespectParallel }); } + private static > DynamicTest shouldPushElementsToStreamAsSoonAsPossible(CollectorSupplier, Executor, Integer, Collector>> collector, String name) { + return dynamicTest(format("%s: should push elements as soon as possible ", name), () -> { + int parallelism = 2; + int delayMillis = 50; + var counter = new AtomicInteger(); + withExecutor(e -> { + LocalTime before = LocalTime.now(); + Stream.generate(() -> 42) + .limit(100) + .collect(collector.apply(i -> returnWithDelay(i, ofMillis(delayMillis)), e, parallelism)) + .join(); + + LocalTime after = LocalTime.now(); + assertThat(Duration.between(before, after)) + .isGreaterThanOrEqualTo(ofMillis(delayMillis * parallelism)); + }); + }); + } + private static > DynamicTest shouldProcessOnNThreadsETParallelism(CollectorSupplier, Executor, Integer, Collector>> collector, String name) { return dynamicTest(format("%s: should batch", name), () -> { int parallelism = 2; diff --git a/src/test/java/com/pivovarit/collectors/ToStreamTest.java b/src/test/java/com/pivovarit/collectors/ToStreamTest.java new file mode 100644 index 00000000..2a221feb --- /dev/null +++ b/src/test/java/com/pivovarit/collectors/ToStreamTest.java @@ -0,0 +1,74 @@ +package com.pivovarit.collectors; + +import org.junit.jupiter.api.DynamicTest; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestFactory; + +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; +import java.util.stream.Collector; +import java.util.stream.Stream; + +import static com.pivovarit.collectors.ParallelCollectors.parallelToStream; +import static java.time.Duration.ofSeconds; +import static java.util.stream.Stream.of; +import static org.awaitility.Awaitility.await; + +class ToStreamTest { + + @TestFactory + Stream shouldStartProcessingElementsTests() { + return of( + shouldStartProcessingElements(f -> ParallelCollectors.parallelToStream(f, Executors.newCachedThreadPool(), 2), "parallelToStream, parallelism: 2, os threads"), + shouldStartProcessingElements(f -> ParallelCollectors.parallelToOrderedStream(f, Executors.newVirtualThreadPerTaskExecutor(), 2), "parallelToStream, parallelism: 2, vthreads"), + shouldStartProcessingElements(f -> ParallelCollectors.parallelToOrderedStream(f, Executors.newCachedThreadPool(), 2), "parallelToOrderedStream, parallelism: 2, os threads"), + shouldStartProcessingElements(f -> ParallelCollectors.parallelToOrderedStream(f, Executors.newVirtualThreadPerTaskExecutor(), 2), "parallelToOrderedStream, parallelism: 2, vthreads") + ); + } + + private static DynamicTest shouldStartProcessingElements(Function, Collector>> collector, String name) { + return DynamicTest.dynamicTest(name, () -> { + var counter = new AtomicInteger(); + Thread.ofPlatform().start(() -> { + Stream.iterate(0, i -> i + 1) + .limit(100) + .collect(collector.apply(i -> { + try { + Thread.sleep(100); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } + return i; + })) + .forEach(c -> counter.incrementAndGet()); + }); + await() + .atMost(ofSeconds(1)) + .until(() -> counter.get() > 0); + }); + } + + @Test + void shouldStartProcessingElementsAsSoonAsTheyAreReady() { + var e = Executors.newCachedThreadPool(); + var counter = new AtomicInteger(); + Thread.ofPlatform().start(() -> { + Collector> collector = parallelToStream(i -> { + try { + Thread.sleep(100); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } + return i; + }, e, 2); + Stream.iterate(0, i -> i + 1) + .limit(100) + .collect(collector) + .forEach(c -> counter.incrementAndGet()); + }); + await() + .atMost(ofSeconds(1)) + .until(() -> counter.get() > 0); + } +}