Skip to content

Commit

Permalink
Rewrite exception propagation test (#939)
Browse files Browse the repository at this point in the history
  • Loading branch information
pivovarit authored Sep 16, 2024
1 parent 8a851c0 commit b57986f
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 23 deletions.
23 changes: 0 additions & 23 deletions src/test/java/com/pivovarit/collectors/FunctionalTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,6 @@ void shouldExecuteEagerlyOnProvidedThreadPool() {
private static <R extends Collection<Integer>> Stream<DynamicTest> virtualThreadsTests(CollectorSupplier<Function<Integer, Integer>, Executor, Integer, Collector<Integer, ?, CompletableFuture<R>>> collector, String name) {
return of(
shouldStartConsumingImmediately(collector, name),
shouldHandleThrowable(collector, name),
shouldShortCircuitOnException(collector, name),
shouldInterruptOnException(collector, name),
shouldRemainConsistent(collector, name)
Expand All @@ -174,7 +173,6 @@ private static <R extends Collection<Integer>> Stream<DynamicTest> virtualThread
private static <R extends Collection<Integer>> Stream<DynamicTest> tests(CollectorSupplier<Function<Integer, Integer>, Executor, Integer, Collector<Integer, ?, CompletableFuture<R>>> collector, String name, boolean limitedParallelism) {
var tests = of(
shouldStartConsumingImmediately(collector, name),
shouldHandleThrowable(collector, name),
shouldShortCircuitOnException(collector, name),
shouldInterruptOnException(collector, name),
shouldRemainConsistent(collector, name)
Expand All @@ -189,7 +187,6 @@ private static <R extends Collection<Integer>> Stream<DynamicTest> tests(Collect
private static <R extends Collection<Integer>> Stream<DynamicTest> virtualThreadsStreamingTests(CollectorSupplier<Function<Integer, Integer>, Executor, Integer, Collector<Integer, ?, CompletableFuture<R>>> collector, String name) {
return of(
shouldStartConsumingImmediately(collector, name),
shouldHandleThrowable(collector, name),
shouldShortCircuitOnException(collector, name),
shouldRemainConsistent(collector, name)
);
Expand All @@ -200,7 +197,6 @@ private static <R extends Collection<Integer>> Stream<DynamicTest> streamingTest
shouldStartConsumingImmediately(collector, name),
shouldRespectParallelism(collector, name),
shouldPushElementsToStreamAsSoonAsPossible(collector, name),
shouldHandleThrowable(collector, name),
shouldShortCircuitOnException(collector, name),
shouldRemainConsistent(collector, name),
shouldRejectInvalidParallelism(collector, name)
Expand Down Expand Up @@ -293,25 +289,6 @@ private static <R extends Collection<Integer>> DynamicTest shouldShortCircuitOnE
});
}

private static <R extends Collection<Integer>> DynamicTest shouldHandleThrowable(CollectorSupplier<Function<Integer, Integer>, Executor, Integer, Collector<Integer, ?, CompletableFuture<R>>> collector, String name) {
return dynamicTest(format("%s: should not swallow exception", name), () -> {
List<Integer> elements = IntStream.range(0, 10).boxed().toList();

runWithExecutor(e -> {
assertThatThrownBy(elements.stream()
.collect(collector.apply(i -> {
if (i == 7) {
throw new IllegalArgumentException();
} else {
return i;
}
}, e, PARALLELISM))::join)
.isInstanceOf(CompletionException.class)
.hasCauseExactlyInstanceOf(IllegalArgumentException.class);
}, 10);
});
}

private static <R extends Collection<Integer>> DynamicTest shouldRemainConsistent(CollectorSupplier<Function<Integer, Integer>, Executor, Integer, Collector<Integer, ?, CompletableFuture<R>>> collector, String name) {
return dynamicTest(format("%s: should remain consistent", name), () -> {
int parallelism = 100;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package com.pivovarit.collectors.test;

import com.pivovarit.collectors.ParallelCollectors;
import org.junit.jupiter.api.DynamicTest;
import org.junit.jupiter.api.TestFactory;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.stream.Collector;
import java.util.stream.IntStream;
import java.util.stream.Stream;

import static com.pivovarit.collectors.test.ExceptionPropagationTest.CollectorDefinition.collector;
import static java.util.stream.Collectors.collectingAndThen;
import static java.util.stream.Collectors.toList;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

class ExceptionPropagationTest {

private static Stream<CollectorDefinition<Integer, Integer>> all() {
return Stream.of(
collector("parallel()", f -> collectingAndThen(ParallelCollectors.parallel(f), c -> c.join().toList())),
collector("parallel(e)", f -> collectingAndThen(ParallelCollectors.parallel(f, e()), c -> c.join().toList())),
collector("parallel(e, p)", f -> collectingAndThen(ParallelCollectors.parallel(f, e(), p()), c -> c.join().toList())),
collector("parallel(toList())", f -> collectingAndThen(ParallelCollectors.parallel(f, toList()), CompletableFuture::join)),
collector("parallel(toList(), e)", f -> collectingAndThen(ParallelCollectors.parallel(f, toList(), e()), CompletableFuture::join)),
collector("parallel(toList(), e, p)", f -> collectingAndThen(ParallelCollectors.parallel(f, toList(), e(), p()), CompletableFuture::join)),
collector("parallel(toList(), e, p) [batching]", f -> collectingAndThen(ParallelCollectors.Batching.parallel(f, toList(), e(), p()), CompletableFuture::join)),
collector("parallelToStream()", f -> collectingAndThen(ParallelCollectors.parallelToStream(f), Stream::toList)),
collector("parallelToStream(e)", f -> collectingAndThen(ParallelCollectors.parallelToStream(f, e()), Stream::toList)),
collector("parallelToStream(e, p)", f -> collectingAndThen(ParallelCollectors.parallelToStream(f, e(), p()), Stream::toList)),
collector("parallelToStream(e, p) [batching]", f -> collectingAndThen(ParallelCollectors.Batching.parallelToStream(f, e(), p()), Stream::toList)),
collector("parallelToOrderedStream()", f -> collectingAndThen(ParallelCollectors.parallelToOrderedStream(f), Stream::toList)),
collector("parallelToOrderedStream(e)", f -> collectingAndThen(ParallelCollectors.parallelToOrderedStream(f, e()), Stream::toList)),
collector("parallelToOrderedStream(e, p)", f -> collectingAndThen(ParallelCollectors.parallelToOrderedStream(f, e(), p()), Stream::toList)),
collector("parallelToOrderedStream(e, p) [batching]", f -> collectingAndThen(ParallelCollectors.Batching.parallelToOrderedStream(f, e(), p()), Stream::toList))
);
}

@TestFactory
Stream<DynamicTest> shouldPropagateExceptionFactory() {
return all()
.map(c -> DynamicTest.dynamicTest(c.name(), () -> {
assertThatThrownBy(() -> IntStream.range(0, 10)
.boxed()
.collect(c.collector().apply(i -> {
if (i == 7) {
throw new IllegalArgumentException();
} else {
return i;
}
})))
.isInstanceOf(CompletionException.class)
.hasCauseExactlyInstanceOf(IllegalArgumentException.class);
}));
}

record CollectorDefinition<T, R>(String name, Function<Function<T, R>, Collector<T, ?, List<R>>> collector) {
static <T, R> CollectorDefinition<T, R> collector(String name, Function<Function<T, R>, Collector<T, ?, List<R>>> collector) {
return new CollectorDefinition<>(name, collector);
}
}

private static Executor e() {
return Executors.newCachedThreadPool();
}

private static int p() {
return 4;
}
}

0 comments on commit b57986f

Please sign in to comment.