From 9fbf2cd294702072b6e9e430e87366011d8d2139 Mon Sep 17 00:00:00 2001 From: Grzegorz Piwowarek Date: Mon, 16 Sep 2024 14:11:46 +0200 Subject: [PATCH] Unify Collector factories used in tests (#949) --- .../collectors/test/BasicParallelismTest.java | 9 +---- .../collectors/test/BasicProcessingTest.java | 14 +++---- .../collectors/test/BatchingTest.java | 9 +---- .../test/ExecutorPollutionTest.java | 2 +- .../test/ExecutorValidationTest.java | 13 ++---- .../{Collectors.java => test/Factory.java} | 40 ++++++++++++++++--- .../collectors/test/NonBlockingTest.java | 9 +---- .../test/RejectedExecutionHandlingTest.java | 18 +++------ .../collectors/test/StreamingTest.java | 9 +---- 9 files changed, 59 insertions(+), 64 deletions(-) rename src/test/java/com/pivovarit/collectors/{Collectors.java => test/Factory.java} (54%) diff --git a/src/test/java/com/pivovarit/collectors/test/BasicParallelismTest.java b/src/test/java/com/pivovarit/collectors/test/BasicParallelismTest.java index 0a6b24c4..9f9f6a7e 100644 --- a/src/test/java/com/pivovarit/collectors/test/BasicParallelismTest.java +++ b/src/test/java/com/pivovarit/collectors/test/BasicParallelismTest.java @@ -76,17 +76,12 @@ Stream shouldRejectInvalidParallelism() { }))); } - protected record CollectorDefinition(String name, CollectorFactory factory) { - static CollectorDefinition collector(String name, CollectorFactory collector) { + protected record CollectorDefinition(String name, Factory.CollectorFactoryWithParallelism factory) { + static CollectorDefinition collector(String name, Factory.CollectorFactoryWithParallelism collector) { return new CollectorDefinition<>(name, collector); } } - @FunctionalInterface - private interface CollectorFactory { - Collector> collector(Function f, Integer p); - } - private static Executor e() { return Executors.newCachedThreadPool(); } diff --git a/src/test/java/com/pivovarit/collectors/test/BasicProcessingTest.java b/src/test/java/com/pivovarit/collectors/test/BasicProcessingTest.java index 97d9adf1..4bd1dfb8 100644 --- a/src/test/java/com/pivovarit/collectors/test/BasicProcessingTest.java +++ b/src/test/java/com/pivovarit/collectors/test/BasicProcessingTest.java @@ -69,7 +69,7 @@ public static Stream> allOrdered() { Stream shouldProcessEmpty() { return all() .map(c -> DynamicTest.dynamicTest(c.name(), () -> { - assertThat(Stream.empty().collect(c.collector().apply(i -> i))).isEmpty(); + assertThat(Stream.empty().collect(c.collector().collector(i -> i))).isEmpty(); })); } @@ -78,7 +78,7 @@ Stream shouldProcessAllElements() { return all() .map(c -> DynamicTest.dynamicTest(c.name(), () -> { var list = IntStream.range(0, 100).boxed().toList(); - List result = list.stream().collect(c.collector().apply(i -> i)); + List result = list.stream().collect(c.collector().collector(i -> i)); assertThat(result).containsExactlyInAnyOrderElementsOf(list); })); } @@ -88,7 +88,7 @@ Stream shouldProcessAllElementsInOrder() { return allOrdered() .map(c -> DynamicTest.dynamicTest(c.name(), () -> { var list = IntStream.range(0, 100).boxed().toList(); - List result = list.stream().collect(c.collector().apply(i -> i)); + List result = list.stream().collect(c.collector().collector(i -> i)); assertThat(result).containsAnyElementsOf(list); })); } @@ -102,7 +102,7 @@ Stream shouldStartProcessingImmediately() { Thread.startVirtualThread(() -> { Stream.iterate(0, i -> i + 1) .limit(100) - .collect(c.collector().apply(i -> returnWithDelay(counter.incrementAndGet(), ofSeconds(1)))); + .collect(c.collector().collector(i -> returnWithDelay(counter.incrementAndGet(), ofSeconds(1)))); }); await() @@ -121,7 +121,7 @@ Stream shouldInterruptOnException() { var latch = new CountDownLatch(size); assertThatThrownBy(() -> IntStream.range(0, size).boxed() - .collect(c.collector().apply(i -> { + .collect(c.collector().collector(i -> { try { latch.countDown(); latch.await(); @@ -140,8 +140,8 @@ Stream shouldInterruptOnException() { })); } - record CollectorDefinition(String name, Function, Collector>> collector) { - static CollectorDefinition collector(String name, Function, Collector>> collector) { + record CollectorDefinition(String name, Factory.CollectorFactory collector) { + static CollectorDefinition collector(String name, Factory.CollectorFactory collector) { return new CollectorDefinition<>(name, collector); } } diff --git a/src/test/java/com/pivovarit/collectors/test/BatchingTest.java b/src/test/java/com/pivovarit/collectors/test/BatchingTest.java index 5b41393a..23be552d 100644 --- a/src/test/java/com/pivovarit/collectors/test/BatchingTest.java +++ b/src/test/java/com/pivovarit/collectors/test/BatchingTest.java @@ -46,13 +46,8 @@ Stream shouldProcessOnExactlyNThreads() { })); } - @FunctionalInterface - private interface CollectorFactory { - Collector> collector(Function f, Integer p); - } - - record CollectorDefinition(String name, CollectorFactory collector) { - static CollectorDefinition collector(String name, CollectorFactory collector) { + record CollectorDefinition(String name, Factory.CollectorFactoryWithParallelism collector) { + static CollectorDefinition collector(String name, Factory.CollectorFactoryWithParallelism collector) { return new CollectorDefinition<>(name, collector); } } diff --git a/src/test/java/com/pivovarit/collectors/test/ExecutorPollutionTest.java b/src/test/java/com/pivovarit/collectors/test/ExecutorPollutionTest.java index f423dcae..75d0d83f 100644 --- a/src/test/java/com/pivovarit/collectors/test/ExecutorPollutionTest.java +++ b/src/test/java/com/pivovarit/collectors/test/ExecutorPollutionTest.java @@ -9,7 +9,7 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Stream; -import static com.pivovarit.collectors.Collectors.boundedCollectors; +import static com.pivovarit.collectors.test.Factory.boundedCollectors; class ExecutorPollutionTest { diff --git a/src/test/java/com/pivovarit/collectors/test/ExecutorValidationTest.java b/src/test/java/com/pivovarit/collectors/test/ExecutorValidationTest.java index cf9ce031..68b4d992 100644 --- a/src/test/java/com/pivovarit/collectors/test/ExecutorValidationTest.java +++ b/src/test/java/com/pivovarit/collectors/test/ExecutorValidationTest.java @@ -4,13 +4,9 @@ import org.junit.jupiter.api.DynamicTest; import org.junit.jupiter.api.TestFactory; -import java.util.List; -import java.util.concurrent.Executor; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.function.Function; -import java.util.stream.Collector; import java.util.stream.Stream; import static com.pivovarit.collectors.test.ExecutorValidationTest.CollectorDefinition.collector; @@ -49,14 +45,11 @@ Stream shouldRejectInvalidRejectedExecutionHandlerFactory() { }))); } - protected record CollectorDefinition(String name, CollectorFactory factory) { - static CollectorDefinition collector(String name, CollectorFactory factory) { + protected record CollectorDefinition(String name, Factory.CollectorFactoryWithExecutor factory) { + static CollectorDefinition collector(String name, Factory.CollectorFactoryWithExecutor factory) { return new CollectorDefinition<>(name, factory); } } - @FunctionalInterface - private interface CollectorFactory { - Collector> collector(Function f, Executor executor); - } + } diff --git a/src/test/java/com/pivovarit/collectors/Collectors.java b/src/test/java/com/pivovarit/collectors/test/Factory.java similarity index 54% rename from src/test/java/com/pivovarit/collectors/Collectors.java rename to src/test/java/com/pivovarit/collectors/test/Factory.java index 7191d728..4f40bc2a 100644 --- a/src/test/java/com/pivovarit/collectors/Collectors.java +++ b/src/test/java/com/pivovarit/collectors/test/Factory.java @@ -1,6 +1,10 @@ -package com.pivovarit.collectors; +package com.pivovarit.collectors.test; +import com.pivovarit.collectors.ParallelCollectors; + +import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.function.Function; import java.util.stream.Collector; @@ -9,13 +13,13 @@ import static com.pivovarit.collectors.ParallelCollectors.Batching.parallel; import static java.util.stream.Collectors.toList; -public final class Collectors { +public final class Factory { - private Collectors() { + private Factory() { throw new UnsupportedOperationException("This is a utility class and cannot be instantiated"); } - public static Stream>> boundedCollectors() { + public static Stream>> boundedCollectors() { return Stream.of( Map.entry("parallel()", (f, e, p) -> ParallelCollectors.parallel(f, e, p)), Map.entry("parallel(toList())", (f, e, p) -> ParallelCollectors.parallel(f, toList(), e, p)), @@ -27,7 +31,33 @@ public static Stream>> bounde Map.entry("parallelToOrderedStream() (batching)", (f, e, p) -> ParallelCollectors.Batching.parallelToOrderedStream(f, e, p))); } - public interface BoundedCollectorFactory { + @FunctionalInterface + interface CollectorFactoryWithParallelismAndExecutor { Collector apply(Function function, Executor executorService, int parallelism); } + + @FunctionalInterface + interface CollectorFactoryWithExecutor { + Collector> collector(Function f, Executor executor); + } + + @FunctionalInterface + interface CollectorFactoryWithParallelism { + Collector> collector(Function f, Integer p); + } + + @FunctionalInterface + interface CollectorFactory { + Collector> collector(Function f); + } + + @FunctionalInterface + interface StreamingCollectorFactory { + Collector> collector(Function f); + } + + @FunctionalInterface + interface AsyncCollectorFactory { + Collector>> collector(Function f); + } } diff --git a/src/test/java/com/pivovarit/collectors/test/NonBlockingTest.java b/src/test/java/com/pivovarit/collectors/test/NonBlockingTest.java index 5f1757f5..3ababe50 100644 --- a/src/test/java/com/pivovarit/collectors/test/NonBlockingTest.java +++ b/src/test/java/com/pivovarit/collectors/test/NonBlockingTest.java @@ -44,17 +44,12 @@ Stream shouldNotBlockTheCallingThread() { })); } - protected record CollectorDefinition(String name, CollectorFactory factory) { - static CollectorDefinition collector(String name, CollectorFactory collector) { + protected record CollectorDefinition(String name, Factory.AsyncCollectorFactory factory) { + static CollectorDefinition collector(String name, Factory.AsyncCollectorFactory collector) { return new CollectorDefinition<>(name, collector); } } - @FunctionalInterface - private interface CollectorFactory { - Collector>> collector(Function f); - } - private static Executor e() { return Executors.newCachedThreadPool(); } diff --git a/src/test/java/com/pivovarit/collectors/test/RejectedExecutionHandlingTest.java b/src/test/java/com/pivovarit/collectors/test/RejectedExecutionHandlingTest.java index 88690557..95daf76e 100644 --- a/src/test/java/com/pivovarit/collectors/test/RejectedExecutionHandlingTest.java +++ b/src/test/java/com/pivovarit/collectors/test/RejectedExecutionHandlingTest.java @@ -27,12 +27,9 @@ class RejectedExecutionHandlingTest { private static Stream> allWithCustomExecutors() { return Stream.of( - collector("parallel(e)", (f, e) -> collectingAndThen(ParallelCollectors.parallel(f, e), c -> c.thenApply(Stream::toList) - .join())), - collector("parallel(e, p=4)", (f, e) -> collectingAndThen(ParallelCollectors.parallel(f, e, 4), c -> c.thenApply(Stream::toList) - .join())), - collector("parallel(e, p=4) [batching]", (f, e) -> collectingAndThen(ParallelCollectors.Batching.parallel(f, e, 4), c -> c.thenApply(Stream::toList) - .join())), + collector("parallel(e)", (f, e) -> collectingAndThen(ParallelCollectors.parallel(f, e), c -> c.thenApply(Stream::toList).join())), + collector("parallel(e, p=4)", (f, e) -> collectingAndThen(ParallelCollectors.parallel(f, e, 4), c -> c.thenApply(Stream::toList).join())), + collector("parallel(e, p=4) [batching]", (f, e) -> collectingAndThen(ParallelCollectors.Batching.parallel(f, e, 4), c -> c.thenApply(Stream::toList).join())), collector("parallelToStream(e)", (f, e) -> collectingAndThen(ParallelCollectors.parallelToStream(f, e), Stream::toList)), collector("parallelToStream(e, p=4)", (f, e) -> collectingAndThen(ParallelCollectors.parallelToStream(f, e, 4), Stream::toList)), collector("parallelToStream(e, p=4) [batching]", (f, e) -> collectingAndThen(ParallelCollectors.Batching.parallelToStream(f, e, 4), Stream::toList)), @@ -78,14 +75,9 @@ Stream shouldRejectInvalidRejectedExecutionHandlerWhenParallelismOn })); } - protected record CollectorDefinition(String name, CollectorFactory factory) { - static CollectorDefinition collector(String name, CollectorFactory factory) { + protected record CollectorDefinition(String name, Factory.CollectorFactoryWithExecutor factory) { + static CollectorDefinition collector(String name, Factory.CollectorFactoryWithExecutor factory) { return new CollectorDefinition<>(name, factory); } } - - @FunctionalInterface - private interface CollectorFactory { - Collector> collector(Function f, Executor executor); - } } diff --git a/src/test/java/com/pivovarit/collectors/test/StreamingTest.java b/src/test/java/com/pivovarit/collectors/test/StreamingTest.java index c6e97614..a08912b5 100644 --- a/src/test/java/com/pivovarit/collectors/test/StreamingTest.java +++ b/src/test/java/com/pivovarit/collectors/test/StreamingTest.java @@ -94,17 +94,12 @@ Stream shouldCollectInOriginalOrder() { })); } - protected record CollectorDefinition(String name, CollectorFactory factory) { - static CollectorDefinition collector(String name, CollectorFactory collector) { + protected record CollectorDefinition(String name, Factory.StreamingCollectorFactory factory) { + static CollectorDefinition collector(String name, Factory.StreamingCollectorFactory collector) { return new CollectorDefinition<>(name, collector); } } - @FunctionalInterface - private interface CollectorFactory { - Collector> collector(Function f); - } - private static Executor e() { return Executors.newCachedThreadPool(); }