From 2d5eaadb75cbfa77d3e2e84d037b8133ba62f46d Mon Sep 17 00:00:00 2001 From: Grzegorz Piwowarek Date: Mon, 16 Sep 2024 09:11:11 +0200 Subject: [PATCH] Rewrite max parallelism tests (#941) --- .../pivovarit/collectors/FunctionalTest.java | 20 ----------------- .../collectors/test/BasicParallelismTest.java | 22 ++++++++++++++++++- 2 files changed, 21 insertions(+), 21 deletions(-) diff --git a/src/test/java/com/pivovarit/collectors/FunctionalTest.java b/src/test/java/com/pivovarit/collectors/FunctionalTest.java index a9ab4474..dee31364 100644 --- a/src/test/java/com/pivovarit/collectors/FunctionalTest.java +++ b/src/test/java/com/pivovarit/collectors/FunctionalTest.java @@ -174,7 +174,6 @@ private static > Stream tests(Collect shouldInterruptOnException(collector, name) ); - tests = limitedParallelism ? of(shouldRespectParallelism(collector, name)) : tests; tests = limitedParallelism ? of(shouldRejectInvalidParallelism(collector, name)) : tests; return tests; @@ -190,7 +189,6 @@ private static > Stream virtualThread private static > Stream streamingTests(CollectorSupplier, Executor, Integer, Collector>> collector, String name) { return of( shouldStartConsumingImmediately(collector, name), - shouldRespectParallelism(collector, name), shouldPushElementsToStreamAsSoonAsPossible(collector, name), shouldShortCircuitOnException(collector, name), shouldRejectInvalidParallelism(collector, name) @@ -209,24 +207,6 @@ private static > Stream batchStreamin of(shouldProcessOnNThreadsETParallelism(collector, name))); } - private static > DynamicTest shouldRespectParallelism(CollectorSupplier, Executor, Integer, Collector>> collector, String name) { - return dynamicTest(format("%s: should respect parallelism", name), () -> { - int parallelism = 2; - int delayMillis = 50; - withExecutor(e -> { - LocalTime before = LocalTime.now(); - Stream.generate(() -> 42) - .limit(4) - .collect(collector.apply(i -> returnWithDelay(i, ofMillis(delayMillis)), e, parallelism)) - .join(); - - LocalTime after = LocalTime.now(); - assertThat(Duration.between(before, after)) - .isGreaterThanOrEqualTo(ofMillis(delayMillis * parallelism)); - }); - }); - } - private static > DynamicTest shouldPushElementsToStreamAsSoonAsPossible(CollectorSupplier, Executor, Integer, Collector>> collector, String name) { return dynamicTest(format("%s: should push elements as soon as possible ", name), () -> { int parallelism = 2; diff --git a/src/test/java/com/pivovarit/collectors/test/BasicParallelismTest.java b/src/test/java/com/pivovarit/collectors/test/BasicParallelismTest.java index c8ec9dc4..a3ef1d0d 100644 --- a/src/test/java/com/pivovarit/collectors/test/BasicParallelismTest.java +++ b/src/test/java/com/pivovarit/collectors/test/BasicParallelismTest.java @@ -1,14 +1,18 @@ package com.pivovarit.collectors.test; import com.pivovarit.collectors.ParallelCollectors; +import com.pivovarit.collectors.TestUtils; +import org.assertj.core.api.Assertions; import org.junit.jupiter.api.DynamicTest; import org.junit.jupiter.api.TestFactory; +import java.time.Duration; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.function.Function; +import java.util.function.Supplier; import java.util.stream.Collector; import java.util.stream.IntStream; import java.util.stream.Stream; @@ -22,7 +26,8 @@ class BasicParallelismTest { private static Stream> allBounded() { return Stream.of( - collector("parallel(e, p)", (f, p) -> collectingAndThen(ParallelCollectors.parallel(f, e(), p), c -> c.join().toList())), + collector("parallel(e, p)", (f, p) -> collectingAndThen(ParallelCollectors.parallel(f, e(), p), c -> c.join() + .toList())), collector("parallel(toList(), e, p)", (f, p) -> collectingAndThen(ParallelCollectors.parallel(f, toList(), e(), p), CompletableFuture::join)), collector("parallel(toList(), e, p) [batching]", (f, p) -> collectingAndThen(ParallelCollectors.Batching.parallel(f, toList(), e(), p), CompletableFuture::join)), collector("parallelToStream(e, p)", (f, p) -> collectingAndThen(ParallelCollectors.parallelToStream(f, e(), p), Stream::toList)), @@ -52,6 +57,15 @@ Stream shouldProcessAllElementsWithMaxParallelism() { }))); } + @TestFactory + Stream shouldRespectMaxParallelism() { + return allBounded() + .map(c -> DynamicTest.dynamicTest(c.name(), () -> { + var duration = timed(() -> IntStream.range(0, 10).boxed().collect(c.factory().collector(i -> TestUtils.returnWithDelay(i, Duration.ofMillis(100)), 2))); + assertThat(duration).isCloseTo(Duration.ofMillis(500), Duration.ofMillis(100)); + })); + } + protected record CollectorDefinition(String name, CollectorFactory factory) { static CollectorDefinition collector(String name, CollectorFactory collector) { return new CollectorDefinition<>(name, collector); @@ -66,4 +80,10 @@ private interface CollectorFactory { private static Executor e() { return Executors.newCachedThreadPool(); } + + private static Duration timed(Supplier action) { + long start = System.currentTimeMillis(); + var result = action.get(); + return Duration.ofMillis(System.currentTimeMillis() - start); + } }