diff --git a/src/test/java/com/pivovarit/collectors/FunctionalTest.java b/src/test/java/com/pivovarit/collectors/FunctionalTest.java index ced46886..e7e576ec 100644 --- a/src/test/java/com/pivovarit/collectors/FunctionalTest.java +++ b/src/test/java/com/pivovarit/collectors/FunctionalTest.java @@ -14,14 +14,12 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.ConcurrentSkipListSet; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; import java.util.stream.Collector; import java.util.stream.IntStream; @@ -160,23 +158,15 @@ void shouldExecuteEagerlyOnProvidedThreadPool() { } private static > Stream virtualThreadsTests(CollectorSupplier, Executor, Integer, Collector>> collector, String name) { - return of( - shouldShortCircuitOnException(collector, name), - shouldInterruptOnException(collector, name) - ); + return of(shouldShortCircuitOnException(collector, name)); } private static > Stream tests(CollectorSupplier, Executor, Integer, Collector>> collector, String name) { - return of( - shouldShortCircuitOnException(collector, name), - shouldInterruptOnException(collector, name) - ); + return of(shouldShortCircuitOnException(collector, name)); } private static > Stream virtualThreadsStreamingTests(CollectorSupplier, Executor, Integer, Collector>> collector, String name) { - return of( - shouldShortCircuitOnException(collector, name) - ); + return of(shouldShortCircuitOnException(collector, name)); } private static > Stream streamingTests(CollectorSupplier, Executor, Integer, Collector>> collector, String name) { @@ -252,32 +242,6 @@ private static > DynamicTest shouldShortCircuitOnE }); } - private static > DynamicTest shouldInterruptOnException(CollectorSupplier, Executor, Integer, Collector>> collector, String name) { - return dynamicTest(format("%s: should interrupt on exception", name), () -> { - AtomicLong counter = new AtomicLong(); - int size = 10; - CountDownLatch countDownLatch = new CountDownLatch(size); - - runWithExecutor(e -> { - assertThatThrownBy(IntStream.range(0, size).boxed() - .collect(collector.apply(i -> { - try { - countDownLatch.countDown(); - countDownLatch.await(); - if (i == size - 1) throw new NullPointerException(); - Thread.sleep(Integer.MAX_VALUE); - } catch (InterruptedException ex) { - counter.incrementAndGet(); - } - return i; - }, e, PARALLELISM))::join) - .hasCauseExactlyInstanceOf(NullPointerException.class); - - await().atMost(1, SECONDS).until(() -> counter.get() == size - 1); - }, size); - }); - } - private static Collector>> adapt(Collector>> input) { return collectingAndThen(input, stream -> stream.thenApply(Stream::toList)); } diff --git a/src/test/java/com/pivovarit/collectors/test/BasicProcessingTest.java b/src/test/java/com/pivovarit/collectors/test/BasicProcessingTest.java index 55d5d813..f9fb08f7 100644 --- a/src/test/java/com/pivovarit/collectors/test/BasicProcessingTest.java +++ b/src/test/java/com/pivovarit/collectors/test/BasicProcessingTest.java @@ -6,9 +6,11 @@ import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; import java.util.stream.Collector; import java.util.stream.IntStream; @@ -18,8 +20,10 @@ import static com.pivovarit.collectors.test.BasicProcessingTest.CollectorDefinition.collector; import static java.time.Duration.ofSeconds; import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.SECONDS; import static java.util.stream.Collectors.collectingAndThen; import static java.util.stream.Collectors.toList; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat; import static org.awaitility.Awaitility.await; @@ -49,7 +53,8 @@ public static Stream> allOrdered() { return Stream.of( collector("parallel()", f -> collectingAndThen(ParallelCollectors.parallel(f), c -> c.join().toList())), collector("parallel(e)", f -> collectingAndThen(ParallelCollectors.parallel(f, e()), c -> c.join().toList())), - collector("parallel(e, p)", f -> collectingAndThen(ParallelCollectors.parallel(f, e(), p()), c -> c.join().toList())), + collector("parallel(e, p)", f -> collectingAndThen(ParallelCollectors.parallel(f, e(), p()), c -> c.join() + .toList())), collector("parallel(toList())", f -> collectingAndThen(ParallelCollectors.parallel(f, toList()), CompletableFuture::join)), collector("parallel(toList(), e)", f -> collectingAndThen(ParallelCollectors.parallel(f, toList(), e()), CompletableFuture::join)), collector("parallel(toList(), e, p)", f -> collectingAndThen(ParallelCollectors.parallel(f, toList(), e(), p()), CompletableFuture::join)), @@ -108,6 +113,34 @@ Stream shouldStartProcessingImmediately() { })); } + @TestFactory + Stream shouldInterruptOnException() { + return all() + .map(c -> DynamicTest.dynamicTest(c.name(), () -> { + var counter = new AtomicLong(); + int size = 4; + var latch = new CountDownLatch(size); + + assertThatThrownBy(() -> IntStream.range(0, size).boxed() + .collect(c.collector().apply(i -> { + try { + latch.countDown(); + latch.await(); + if (i == 0) { + throw new NullPointerException(); + } + Thread.sleep(Integer.MAX_VALUE); + } catch (InterruptedException ex) { + counter.incrementAndGet(); + } + return i; + }))) + .hasCauseExactlyInstanceOf(NullPointerException.class); + + await().atMost(1, SECONDS).until(() -> counter.get() == size - 1); + })); + } + record CollectorDefinition(String name, Function, Collector>> collector) { static CollectorDefinition collector(String name, Function, Collector>> collector) { return new CollectorDefinition<>(name, collector);