Skip to content

Commit

Permalink
Rewrite interruption tests (#944)
Browse files Browse the repository at this point in the history
  • Loading branch information
pivovarit authored Sep 16, 2024
1 parent fd91254 commit 39af22d
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 40 deletions.
42 changes: 3 additions & 39 deletions src/test/java/com/pivovarit/collectors/FunctionalTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,12 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collector;
import java.util.stream.IntStream;
Expand Down Expand Up @@ -160,23 +158,15 @@ void shouldExecuteEagerlyOnProvidedThreadPool() {
}

private static <R extends Collection<Integer>> Stream<DynamicTest> virtualThreadsTests(CollectorSupplier<Function<Integer, Integer>, Executor, Integer, Collector<Integer, ?, CompletableFuture<R>>> collector, String name) {
return of(
shouldShortCircuitOnException(collector, name),
shouldInterruptOnException(collector, name)
);
return of(shouldShortCircuitOnException(collector, name));
}

private static <R extends Collection<Integer>> Stream<DynamicTest> tests(CollectorSupplier<Function<Integer, Integer>, Executor, Integer, Collector<Integer, ?, CompletableFuture<R>>> collector, String name) {
return of(
shouldShortCircuitOnException(collector, name),
shouldInterruptOnException(collector, name)
);
return of(shouldShortCircuitOnException(collector, name));
}

private static <R extends Collection<Integer>> Stream<DynamicTest> virtualThreadsStreamingTests(CollectorSupplier<Function<Integer, Integer>, Executor, Integer, Collector<Integer, ?, CompletableFuture<R>>> collector, String name) {
return of(
shouldShortCircuitOnException(collector, name)
);
return of(shouldShortCircuitOnException(collector, name));
}

private static <R extends Collection<Integer>> Stream<DynamicTest> streamingTests(CollectorSupplier<Function<Integer, Integer>, Executor, Integer, Collector<Integer, ?, CompletableFuture<R>>> collector, String name) {
Expand Down Expand Up @@ -252,32 +242,6 @@ private static <R extends Collection<Integer>> DynamicTest shouldShortCircuitOnE
});
}

private static <R extends Collection<Integer>> DynamicTest shouldInterruptOnException(CollectorSupplier<Function<Integer, Integer>, Executor, Integer, Collector<Integer, ?, CompletableFuture<R>>> collector, String name) {
return dynamicTest(format("%s: should interrupt on exception", name), () -> {
AtomicLong counter = new AtomicLong();
int size = 10;
CountDownLatch countDownLatch = new CountDownLatch(size);

runWithExecutor(e -> {
assertThatThrownBy(IntStream.range(0, size).boxed()
.collect(collector.apply(i -> {
try {
countDownLatch.countDown();
countDownLatch.await();
if (i == size - 1) throw new NullPointerException();
Thread.sleep(Integer.MAX_VALUE);
} catch (InterruptedException ex) {
counter.incrementAndGet();
}
return i;
}, e, PARALLELISM))::join)
.hasCauseExactlyInstanceOf(NullPointerException.class);

await().atMost(1, SECONDS).until(() -> counter.get() == size - 1);
}, size);
});
}

private static Collector<Integer, ?, CompletableFuture<Collection<Integer>>> adapt(Collector<Integer, ?, CompletableFuture<Stream<Integer>>> input) {
return collectingAndThen(input, stream -> stream.thenApply(Stream::toList));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collector;
import java.util.stream.IntStream;
Expand All @@ -18,8 +20,10 @@
import static com.pivovarit.collectors.test.BasicProcessingTest.CollectorDefinition.collector;
import static java.time.Duration.ofSeconds;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.stream.Collectors.collectingAndThen;
import static java.util.stream.Collectors.toList;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
import static org.awaitility.Awaitility.await;

Expand Down Expand Up @@ -49,7 +53,8 @@ public static Stream<CollectorDefinition<Integer, Integer>> allOrdered() {
return Stream.of(
collector("parallel()", f -> collectingAndThen(ParallelCollectors.parallel(f), c -> c.join().toList())),
collector("parallel(e)", f -> collectingAndThen(ParallelCollectors.parallel(f, e()), c -> c.join().toList())),
collector("parallel(e, p)", f -> collectingAndThen(ParallelCollectors.parallel(f, e(), p()), c -> c.join().toList())),
collector("parallel(e, p)", f -> collectingAndThen(ParallelCollectors.parallel(f, e(), p()), c -> c.join()
.toList())),
collector("parallel(toList())", f -> collectingAndThen(ParallelCollectors.parallel(f, toList()), CompletableFuture::join)),
collector("parallel(toList(), e)", f -> collectingAndThen(ParallelCollectors.parallel(f, toList(), e()), CompletableFuture::join)),
collector("parallel(toList(), e, p)", f -> collectingAndThen(ParallelCollectors.parallel(f, toList(), e(), p()), CompletableFuture::join)),
Expand Down Expand Up @@ -108,6 +113,34 @@ Stream<DynamicTest> shouldStartProcessingImmediately() {
}));
}

@TestFactory
Stream<DynamicTest> shouldInterruptOnException() {
return all()
.map(c -> DynamicTest.dynamicTest(c.name(), () -> {
var counter = new AtomicLong();
int size = 4;
var latch = new CountDownLatch(size);

assertThatThrownBy(() -> IntStream.range(0, size).boxed()
.collect(c.collector().apply(i -> {
try {
latch.countDown();
latch.await();
if (i == 0) {
throw new NullPointerException();
}
Thread.sleep(Integer.MAX_VALUE);
} catch (InterruptedException ex) {
counter.incrementAndGet();
}
return i;
})))
.hasCauseExactlyInstanceOf(NullPointerException.class);

await().atMost(1, SECONDS).until(() -> counter.get() == size - 1);
}));
}

record CollectorDefinition<T, R>(String name, Function<Function<T, R>, Collector<T, ?, List<R>>> collector) {
static <T, R> CollectorDefinition<T, R> collector(String name, Function<Function<T, R>, Collector<T, ?, List<R>>> collector) {
return new CollectorDefinition<>(name, collector);
Expand Down

0 comments on commit 39af22d

Please sign in to comment.