From ec1f1ae1d2c2c388e2062c5d3bf7d182a54afa80 Mon Sep 17 00:00:00 2001 From: Grzegorz Piwowarek Date: Mon, 16 Sep 2024 12:50:20 +0200 Subject: [PATCH] Rewrite completion-order streaming tests (#947) --- .../pivovarit/collectors/FunctionalTest.java | 93 ------------------- .../collectors/test/StreamingTest.java | 45 +++++++++ 2 files changed, 45 insertions(+), 93 deletions(-) diff --git a/src/test/java/com/pivovarit/collectors/FunctionalTest.java b/src/test/java/com/pivovarit/collectors/FunctionalTest.java index 2e5658a9..e3cabfe3 100644 --- a/src/test/java/com/pivovarit/collectors/FunctionalTest.java +++ b/src/test/java/com/pivovarit/collectors/FunctionalTest.java @@ -2,40 +2,27 @@ import com.pivovarit.collectors.ParallelCollectors.Batching; import org.junit.jupiter.api.DynamicTest; -import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestFactory; import java.util.Collection; import java.util.LinkedList; -import java.util.List; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.Executor; import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import java.util.stream.Collector; import java.util.stream.Stream; -import static com.pivovarit.collectors.ParallelCollectors.parallel; -import static com.pivovarit.collectors.ParallelCollectors.parallelToStream; -import static com.pivovarit.collectors.TestUtils.returnWithDelay; import static com.pivovarit.collectors.TestUtils.withExecutor; import static java.lang.String.format; -import static java.time.Duration.ofMillis; -import static java.util.concurrent.TimeUnit.MILLISECONDS; -import static java.util.concurrent.TimeUnit.SECONDS; import static java.util.stream.Collectors.collectingAndThen; import static java.util.stream.Collectors.toCollection; import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toSet; import static java.util.stream.Stream.of; import static org.assertj.core.api.Assertions.assertThat; -import static org.awaitility.Awaitility.await; import static org.junit.jupiter.api.DynamicTest.dynamicTest; /** @@ -68,59 +55,6 @@ Stream streaming_batching_collectors() { ).flatMap(i -> i); } - @Test - void shouldCollectInCompletionOrder() { - // given - try (var executor = threadPoolExecutor(4)) { - List result = of(350, 200, 0, 400) - .collect(parallelToStream(i -> returnWithDelay(i, ofMillis(i)), executor, 4)) - .limit(2) - .toList(); - - assertThat(result).isSorted(); - } - } - - @Test - void shouldCollectEagerlyInCompletionOrder() { - // given - var executor = threadPoolExecutor(4); - AtomicBoolean result = new AtomicBoolean(false); - CompletableFuture.runAsync(() -> { - of(1, 10000, 1, 0) - .collect(parallelToStream(i -> returnWithDelay(i, ofMillis(i)), executor, 2)) - .forEach(i -> { - if (i == 1) { - result.set(true); - } - }); - }); - - await() - .atMost(1, SECONDS) - .until(result::get); - } - - @Test - void shouldExecuteEagerlyOnProvidedThreadPool() { - try (var executor = Executors.newFixedThreadPool(2)) { - var countingExecutor = new CountingExecutor(executor); - var executions = new AtomicInteger(); - var list = List.of("A", "B"); - - list.stream() - .collect(parallel(s -> { - executions.incrementAndGet(); - return s; - }, countingExecutor, 1)) - .join() - .forEach(__ -> {}); - - assertThat(countingExecutor.getInvocations()).isEqualTo(1); - assertThat(executions.get()).isEqualTo(2); - } - } - private static > Stream batchTests(CollectorSupplier, Executor, Integer, Collector>> collector, String name) { return of(shouldProcessOnNThreadsETParallelism(collector, name)); } @@ -158,35 +92,8 @@ private static > DynamicTest shouldProcessOnNThrea .supplyAsync(stream::toList, Executors.newSingleThreadExecutor())); } - private static ThreadPoolExecutor threadPoolExecutor(int unitsOfWork) { - return new ThreadPoolExecutor(unitsOfWork, unitsOfWork, - 0L, MILLISECONDS, - new LinkedBlockingQueue<>()); - } - @FunctionalInterface interface CollectorSupplier { R apply(T1 t1, T2 t2, T3 t3); } - - private static class CountingExecutor implements Executor { - - private final AtomicInteger counter = new AtomicInteger(); - - private final Executor delegate; - - public CountingExecutor(Executor delegate) { - this.delegate = delegate; - } - - @Override - public void execute(Runnable command) { - counter.incrementAndGet(); - delegate.execute(command); - } - - public Integer getInvocations() { - return counter.get(); - } - } } diff --git a/src/test/java/com/pivovarit/collectors/test/StreamingTest.java b/src/test/java/com/pivovarit/collectors/test/StreamingTest.java index 9f016da1..c6e97614 100644 --- a/src/test/java/com/pivovarit/collectors/test/StreamingTest.java +++ b/src/test/java/com/pivovarit/collectors/test/StreamingTest.java @@ -5,6 +5,7 @@ import org.junit.jupiter.api.TestFactory; import java.time.Duration; +import java.util.List; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; @@ -15,7 +16,10 @@ import static com.pivovarit.collectors.TestUtils.returnWithDelay; import static com.pivovarit.collectors.test.StreamingTest.CollectorDefinition.collector; +import static java.time.Duration.ofMillis; import static java.time.Duration.ofSeconds; +import static java.util.stream.Stream.of; +import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; class StreamingTest { @@ -31,6 +35,22 @@ private static Stream> allStreaming() { ); } + private static Stream> allCompletionOrderStreaming() { + return Stream.of( + collector("parallelToStream()", (f) -> ParallelCollectors.parallelToStream(f)), + collector("parallelToStream(e)", (f) -> ParallelCollectors.parallelToStream(f, e())), + collector("parallelToStream(e, p)", (f) -> ParallelCollectors.parallelToStream(f, e(), p())) + ); + } + + private static Stream> allOrderedStreaming() { + return Stream.of( + collector("parallelToOrderedStream()", (f) -> ParallelCollectors.parallelToOrderedStream(f)), + collector("parallelToOrderedStream(e)", (f) -> ParallelCollectors.parallelToOrderedStream(f, e())), + collector("parallelToOrderedStream(e, p)", (f) -> ParallelCollectors.parallelToOrderedStream(f, e(), p())) + ); + } + @TestFactory Stream shouldPushElementsAsSoonAsTheyAreReady() { return allStreaming() @@ -49,6 +69,31 @@ Stream shouldPushElementsAsSoonAsTheyAreReady() { })); } + @TestFactory + Stream shouldCollectInCompletionOrder() { + return allCompletionOrderStreaming() + .map(c -> DynamicTest.dynamicTest(c.name(), () -> { + var result = of(300, 200, 0, 400) + .collect(c.factory().collector(i -> returnWithDelay(i, ofMillis(i)))) + .toList(); + + assertThat(result).isSorted(); + })); + } + + @TestFactory + Stream shouldCollectInOriginalOrder() { + return allOrderedStreaming() + .map(c -> DynamicTest.dynamicTest(c.name(), () -> { + var source = List.of(300, 200, 0, 400); + var result = source.stream() + .collect(c.factory().collector(i -> returnWithDelay(i, ofMillis(i)))) + .toList(); + + assertThat(result).containsExactlyElementsOf(source); + })); + } + protected record CollectorDefinition(String name, CollectorFactory factory) { static CollectorDefinition collector(String name, CollectorFactory collector) { return new CollectorDefinition<>(name, collector);