Skip to content

Commit

Permalink
Align behaviour of async colllector with other implementations (#936)
Browse files Browse the repository at this point in the history
When it comes to handling `RejectedExecutionException`
  • Loading branch information
pivovarit authored Sep 15, 2024
1 parent 9fe098a commit ad8fff7
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 24 deletions.
19 changes: 13 additions & 6 deletions src/main/java/com/pivovarit/collectors/AsyncParallelCollector.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.function.BiConsumer;
Expand Down Expand Up @@ -166,13 +167,19 @@ private static <T> CompletableFuture<Stream<T>> combine(List<CompletableFuture<T
}

static <T, R, RR> Collector<T, ?, CompletableFuture<RR>> asyncCollector(Function<T, R> mapper, Executor executor, Function<Stream<R>, RR> finisher) {
return collectingAndThen(toList(), list -> supplyAsync(() -> {
Stream.Builder<R> acc = Stream.builder();
for (T t : list) {
acc.add(mapper.apply(t));
return collectingAndThen(toList(), list -> {
try {
return supplyAsync(() -> {
Stream.Builder<R> acc = Stream.builder();
for (T t : list) {
acc.add(mapper.apply(t));
}
return finisher.apply(acc.build());
}, executor);
} catch (Exception e) {
throw new CompletionException(e);
}
return finisher.apply(acc.build());
}, executor));
});
}

static final class BatchingCollectors {
Expand Down
20 changes: 2 additions & 18 deletions src/test/java/com/pivovarit/collectors/FunctionalTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively;
import static org.junit.jupiter.api.DynamicTest.dynamicTest;

/**
Expand Down Expand Up @@ -180,8 +179,7 @@ private static <R extends Collection<Integer>> Stream<DynamicTest> tests(Collect
shouldShortCircuitOnException(collector, name),
shouldInterruptOnException(collector, name),
shouldHandleRejectedExecutionException(collector, name),
shouldRemainConsistent(collector, name),
shouldHandleExecutorRejection(collector, name)
shouldRemainConsistent(collector, name)
);

tests = limitedParallelism ? of(shouldRespectParallelism(collector, name)) : tests;
Expand All @@ -208,8 +206,7 @@ private static <R extends Collection<Integer>> Stream<DynamicTest> streamingTest
shouldShortCircuitOnException(collector, name),
shouldHandleRejectedExecutionException(collector, name),
shouldRemainConsistent(collector, name),
shouldRejectInvalidParallelism(collector, name),
shouldHandleExecutorRejection(collector, name)
shouldRejectInvalidParallelism(collector, name)
);
}

Expand Down Expand Up @@ -376,19 +373,6 @@ private static <R extends Collection<Integer>> DynamicTest shouldRejectInvalidPa
});
}

private static <R extends Collection<Integer>> DynamicTest shouldHandleExecutorRejection(CollectorSupplier<Function<Integer, Integer>, Executor, Integer, Collector<Integer, ?, CompletableFuture<R>>> collector, String name) {
return dynamicTest(format("%s: should handle rejected execution", name), () -> {
assertThatThrownBy(() -> {
try (var e = new ThreadPoolExecutor(2, 2, 0L, MILLISECONDS,
new LinkedBlockingQueue<>(1), new ThreadPoolExecutor.AbortPolicy())) {
assertTimeoutPreemptively(ofMillis(100), () -> of(1, 2, 3, 4)
.collect(collector.apply(i -> TestUtils.sleepAndReturn(1_000, i), e, Integer.MAX_VALUE))
.join());
}
}).isExactlyInstanceOf(CompletionException.class);
});
}

private static <R extends Collection<Integer>> DynamicTest shouldStartConsumingImmediately(CollectorSupplier<Function<Integer, Integer>, Executor, Integer, Collector<Integer, ?, CompletableFuture<R>>> collector, String name) {
return dynamicTest(format("%s: should start consuming immediately", name), () -> {
try (var e = Executors.newCachedThreadPool()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package com.pivovarit.collectors.test;

import com.pivovarit.collectors.ParallelCollectors;
import com.pivovarit.collectors.TestUtils;
import org.junit.jupiter.api.DynamicTest;
import org.junit.jupiter.api.TestFactory;

import java.util.List;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.function.Function;
import java.util.stream.Collector;
import java.util.stream.Stream;

import static com.pivovarit.collectors.test.RejectedExecutionHandlingTest.CollectorDefinition.collector;
import static java.time.Duration.ofMillis;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.stream.Collectors.collectingAndThen;
import static java.util.stream.Stream.of;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively;

class RejectedExecutionHandlingTest {

private static Stream<CollectorDefinition<Integer, Integer>> allWithCustomExecutors() {
return Stream.of(
collector("parallel(e)", (f, e) -> collectingAndThen(ParallelCollectors.parallel(f, e), c -> c.thenApply(Stream::toList)
.join())),
collector("parallel(e, p=4)", (f, e) -> collectingAndThen(ParallelCollectors.parallel(f, e, 4), c -> c.thenApply(Stream::toList)
.join())),
collector("parallel(e, p=4) [batching]", (f, e) -> collectingAndThen(ParallelCollectors.Batching.parallel(f, e, 4), c -> c.thenApply(Stream::toList)
.join())),
collector("parallelToStream(e)", (f, e) -> collectingAndThen(ParallelCollectors.parallelToStream(f, e), Stream::toList)),
collector("parallelToStream(e, p=4)", (f, e) -> collectingAndThen(ParallelCollectors.parallelToStream(f, e, 4), Stream::toList)),
collector("parallelToStream(e, p=4) [batching]", (f, e) -> collectingAndThen(ParallelCollectors.Batching.parallelToStream(f, e, 4), Stream::toList)),
collector("parallelToOrderedStream(e, p=4)", (f, e) -> collectingAndThen(ParallelCollectors.parallelToOrderedStream(f, e, 4), Stream::toList)),
collector("parallelToOrderedStream(e, p=4) [batching]", (f, e) -> collectingAndThen(ParallelCollectors.Batching.parallelToOrderedStream(f, e, 4), Stream::toList))
);
}

@TestFactory
Stream<DynamicTest> shouldRejectInvalidRejectedExecutionHandlerFactory() {
return allWithCustomExecutors()
.map(c -> DynamicTest.dynamicTest(c.name(), () -> {
assertThatThrownBy(() -> {
try (var e = new ThreadPoolExecutor(2, 2, 0L, MILLISECONDS,
new LinkedBlockingQueue<>(1), new ThreadPoolExecutor.AbortPolicy())) {
assertTimeoutPreemptively(ofMillis(100), () -> of(1, 2, 3, 4)
.collect(c.factory().collector(i -> TestUtils.sleepAndReturn(1_000, i), e)));
}
}).isExactlyInstanceOf(CompletionException.class);
}));
}

private static Stream<CollectorDefinition<Integer, Integer>> allWithCustomExecutorsParallelismOne() {
return Stream.of(
collector("parallel(e, p=1)", (f, e) -> collectingAndThen(ParallelCollectors.parallel(f, e, 1), c -> c.thenApply(Stream::toList).join())),
collector("parallel(e, p=1) [batching]", (f, e) -> collectingAndThen(ParallelCollectors.Batching.parallel(f, e, 1), c -> c.thenApply(Stream::toList).join())),
collector("parallelToStream(e, p=1)", (f, e) -> collectingAndThen(ParallelCollectors.parallelToStream(f, e, 1), Stream::toList)),
collector("parallelToOrderedStream(e, p=1)", (f, e) -> collectingAndThen(ParallelCollectors.parallelToOrderedStream(f, e, 1), Stream::toList))
);
}

@TestFactory
Stream<DynamicTest> shouldRejectInvalidRejectedExecutionHandlerWhenParallelismOneFactory() {
return allWithCustomExecutorsParallelismOne()
.map(c -> DynamicTest.dynamicTest(c.name(), () -> {
var e = new ThreadPoolExecutor(1, 1, 0L, SECONDS, new LinkedBlockingQueue<>(1));
e.submit(() -> TestUtils.sleepAndReturn(10_000, 42));
e.submit(() -> TestUtils.sleepAndReturn(10_000, 42));
assertThatThrownBy(() -> {
assertTimeoutPreemptively(ofMillis(100), () -> of(1, 2, 3, 4)
.collect(c.factory().collector(i -> TestUtils.sleepAndReturn(1_000, i), e)));
}).isExactlyInstanceOf(CompletionException.class);
}));
}

protected record CollectorDefinition<T, R>(String name, CollectorFactory<T, R> factory) {
static <T, R> CollectorDefinition<T, R> collector(String name, CollectorFactory<T, R> factory) {
return new CollectorDefinition<>(name, factory);
}
}

@FunctionalInterface
private interface CollectorFactory<T, R> {
Collector<T, ?, List<R>> collector(Function<T, R> f, Executor executor);
}
}

0 comments on commit ad8fff7

Please sign in to comment.