From a520c4405fc3af7371a4930751a4b2167a317f85 Mon Sep 17 00:00:00 2001 From: Grzegorz Piwowarek Date: Mon, 16 Sep 2024 12:09:33 +0200 Subject: [PATCH] Rewrite shortcircuiting tests (#945) --- .../pivovarit/collectors/FunctionalTest.java | 61 +------------------ ...onTest.java => ExceptionHandlingTest.java} | 23 ++++++- 2 files changed, 23 insertions(+), 61 deletions(-) rename src/test/java/com/pivovarit/collectors/test/{ExceptionPropagationTest.java => ExceptionHandlingTest.java} (80%) diff --git a/src/test/java/com/pivovarit/collectors/FunctionalTest.java b/src/test/java/com/pivovarit/collectors/FunctionalTest.java index e7e576ec..4b22acca 100644 --- a/src/test/java/com/pivovarit/collectors/FunctionalTest.java +++ b/src/test/java/com/pivovarit/collectors/FunctionalTest.java @@ -12,7 +12,6 @@ import java.util.List; import java.util.Set; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.Executor; import java.util.concurrent.Executors; @@ -22,15 +21,12 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import java.util.stream.Collector; -import java.util.stream.IntStream; import java.util.stream.Stream; import static com.pivovarit.collectors.ParallelCollectors.parallel; import static com.pivovarit.collectors.ParallelCollectors.parallelToOrderedStream; import static com.pivovarit.collectors.ParallelCollectors.parallelToStream; -import static com.pivovarit.collectors.TestUtils.incrementAndThrow; import static com.pivovarit.collectors.TestUtils.returnWithDelay; -import static com.pivovarit.collectors.TestUtils.runWithExecutor; import static com.pivovarit.collectors.TestUtils.withExecutor; import static java.lang.String.format; import static java.time.Duration.ofMillis; @@ -42,7 +38,6 @@ import static java.util.stream.Collectors.toSet; import static java.util.stream.Stream.of; import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.awaitility.Awaitility.await; import static org.junit.jupiter.api.DynamicTest.dynamicTest; @@ -56,22 +51,6 @@ class FunctionalTest { @TestFactory Stream collectors() { return of( - // virtual threads - virtualThreadsTests((m, e, p) -> parallel(m, toList()), "ParallelCollectors.parallel(toList()) [virtual]"), - virtualThreadsTests((m, e, p) -> parallel(m, toList(), p), "ParallelCollectors.parallel(toList()) [virtual]"), - virtualThreadsTests((m, e, p) -> parallel(m, toSet()), "ParallelCollectors.parallel(toSet()) [virtual]"), - virtualThreadsTests((m, e, p) -> parallel(m, toSet(), p), "ParallelCollectors.parallel(toSet()) [virtual]"), - virtualThreadsTests((m, e, p) -> parallel(m, toCollection(LinkedList::new)), "ParallelCollectors.parallel(toCollection()) [virtual]"), - virtualThreadsTests((m, e, p) -> parallel(m, toCollection(LinkedList::new), p), "ParallelCollectors.parallel(toCollection()) [virtual]"), - virtualThreadsTests((m, e, p) -> adapt(parallel(m)), "ParallelCollectors.parallel() [virtual]"), - virtualThreadsTests((m, e, p) -> adapt(parallel(m, p)), "ParallelCollectors.parallel() [virtual]"), - // platform threads - tests((m, e, p) -> parallel(m, toList(), e, p), format("ParallelCollectors.parallel(toList(), p=%d)", PARALLELISM)), - tests((m, e, p) -> parallel(m, toSet(), e, p), format("ParallelCollectors.parallel(toSet(), p=%d)", PARALLELISM)), - tests((m, e, p) -> parallel(m, toList(), e), "ParallelCollectors.parallel(toList(), p=inf)"), - tests((m, e, p) -> parallel(m, toSet(), e), "ParallelCollectors.parallel(toSet(), p=inf)"), - tests((m, e, p) -> parallel(m, toCollection(LinkedList::new), e, p), format("ParallelCollectors.parallel(toCollection(), p=%d)", PARALLELISM)), - tests((m, e, p) -> adapt(parallel(m, e, p)), format("ParallelCollectors.parallel(p=%d)", PARALLELISM)), // platform threads, with batching batchTests((m, e, p) -> Batching.parallel(m, toList(), e, p), format("ParallelCollectors.Batching.parallel(toList(), p=%d)", PARALLELISM)), batchTests((m, e, p) -> Batching.parallel(m, toSet(), e, p), format("ParallelCollectors.Batching.parallel(toSet(), p=%d)", PARALLELISM)), @@ -83,9 +62,6 @@ Stream collectors() { @TestFactory Stream streaming_collectors() { return of( - // virtual threads - virtualThreadsStreamingTests((m, e, p) -> adaptAsync(parallelToStream(m)), "ParallelCollectors.parallelToStream() [virtual]"), - virtualThreadsStreamingTests((m, e, p) -> adaptAsync(parallelToOrderedStream(m)), "ParallelCollectors.parallelToOrderedStream() [virtual]"), // platform threads streamingTests((m, e, p) -> adaptAsync(parallelToStream(m, e, p)), format("ParallelCollectors.parallelToStream(p=%d)", PARALLELISM)), streamingTests((m, e, p) -> adaptAsync(parallelToOrderedStream(m, e, p)), format("ParallelCollectors.parallelToOrderedStream(p=%d)", PARALLELISM)) @@ -157,27 +133,12 @@ void shouldExecuteEagerlyOnProvidedThreadPool() { } } - private static > Stream virtualThreadsTests(CollectorSupplier, Executor, Integer, Collector>> collector, String name) { - return of(shouldShortCircuitOnException(collector, name)); - } - - private static > Stream tests(CollectorSupplier, Executor, Integer, Collector>> collector, String name) { - return of(shouldShortCircuitOnException(collector, name)); - } - - private static > Stream virtualThreadsStreamingTests(CollectorSupplier, Executor, Integer, Collector>> collector, String name) { - return of(shouldShortCircuitOnException(collector, name)); - } - private static > Stream streamingTests(CollectorSupplier, Executor, Integer, Collector>> collector, String name) { - return of( - shouldPushElementsToStreamAsSoonAsPossible(collector, name), - shouldShortCircuitOnException(collector, name) - ); + return of(shouldPushElementsToStreamAsSoonAsPossible(collector, name)); } private static > Stream batchTests(CollectorSupplier, Executor, Integer, Collector>> collector, String name) { - return Stream.concat(tests(collector, name), of(shouldProcessOnNThreadsETParallelism(collector, name))); + return of(shouldProcessOnNThreadsETParallelism(collector, name)); } private static > Stream batchStreamingTests(CollectorSupplier, Executor, Integer, Collector>> collector, String name) { @@ -224,24 +185,6 @@ private static > DynamicTest shouldProcessOnNThrea }); } - private static > DynamicTest shouldShortCircuitOnException(CollectorSupplier, Executor, Integer, Collector>> collector, String name) { - return dynamicTest(format("%s: should short circuit on exception", name), () -> { - List elements = IntStream.range(0, 100).boxed().toList(); - int size = 4; - - runWithExecutor(e -> { - AtomicInteger counter = new AtomicInteger(); - - assertThatThrownBy(elements.stream() - .collect(collector.apply(i -> incrementAndThrow(counter), e, PARALLELISM))::join) - .isInstanceOf(CompletionException.class) - .hasCauseExactlyInstanceOf(IllegalArgumentException.class); - - assertThat(counter.longValue()).isLessThan(elements.size()); - }, size); - }); - } - private static Collector>> adapt(Collector>> input) { return collectingAndThen(input, stream -> stream.thenApply(Stream::toList)); } diff --git a/src/test/java/com/pivovarit/collectors/test/ExceptionPropagationTest.java b/src/test/java/com/pivovarit/collectors/test/ExceptionHandlingTest.java similarity index 80% rename from src/test/java/com/pivovarit/collectors/test/ExceptionPropagationTest.java rename to src/test/java/com/pivovarit/collectors/test/ExceptionHandlingTest.java index 7ca795f0..a44a858d 100644 --- a/src/test/java/com/pivovarit/collectors/test/ExceptionPropagationTest.java +++ b/src/test/java/com/pivovarit/collectors/test/ExceptionHandlingTest.java @@ -9,17 +9,20 @@ import java.util.concurrent.CompletionException; import java.util.concurrent.Executor; import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import java.util.stream.Collector; import java.util.stream.IntStream; import java.util.stream.Stream; -import static com.pivovarit.collectors.test.ExceptionPropagationTest.CollectorDefinition.collector; +import static com.pivovarit.collectors.TestUtils.incrementAndThrow; +import static com.pivovarit.collectors.test.ExceptionHandlingTest.CollectorDefinition.collector; import static java.util.stream.Collectors.collectingAndThen; import static java.util.stream.Collectors.toList; +import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; -class ExceptionPropagationTest { +class ExceptionHandlingTest { private static Stream> all() { return Stream.of( @@ -59,6 +62,22 @@ Stream shouldPropagateExceptionFactory() { })); } + @TestFactory + Stream shouldShortcircuitOnException() { + return all() + .map(c -> DynamicTest.dynamicTest(c.name(), () -> { + List elements = IntStream.range(0, 100).boxed().toList(); + AtomicInteger counter = new AtomicInteger(); + + assertThatThrownBy(() -> elements.stream() + .collect(c.collector().apply(i -> incrementAndThrow(counter)))) + .isInstanceOf(CompletionException.class) + .hasCauseExactlyInstanceOf(IllegalArgumentException.class); + + assertThat(counter.longValue()).isLessThan(elements.size()); + })); + } + record CollectorDefinition(String name, Function, Collector>> collector) { static CollectorDefinition collector(String name, Function, Collector>> collector) { return new CollectorDefinition<>(name, collector);