Commit

Add new API methods supporting custom Executors with unbounded parallelism (#919)

Add new API methods that accept a custom `Executor` but allow unlimited parallelism without extra overhead:

```
Collector<...> parallel(Function<T, R> mapper, Collector<R, ?, RR> collector, Executor executor)
Collector<...> parallel(Function<T, R> mapper, Executor executor) 
Collector<...> parallelToStream(Function<T, R> mapper, Executor executor)
Collector<...> parallelToOrderedStream(Function<T, R> mapper, Executor executor)
```
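For illustration only (not part of the commit): a minimal usage sketch of the new overloads, assuming a caller-managed `ExecutorService` and a hypothetical blocking `fetchUser` mapper.

```
import static com.pivovarit.collectors.ParallelCollectors.parallel;
import static com.pivovarit.collectors.ParallelCollectors.parallelToStream;
import static java.util.stream.Collectors.toList;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Stream;

class UnboundedParallelismExample {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(8);
        try {
            // No parallelism argument: every element is handed to the executor right away.
            CompletableFuture<List<String>> users = Stream.of(1, 2, 3)
                .collect(parallel(id -> fetchUser(id), toList(), executor));

            // Streaming variant: results are consumed as they complete.
            Stream.of(4, 5, 6)
                .collect(parallelToStream(id -> fetchUser(id), executor))
                .forEach(System.out::println);

            System.out.println(users.join());
        } finally {
            executor.shutdown();
        }
    }

    // Hypothetical blocking mapper standing in for real I/O.
    private static String fetchUser(int id) {
        return "user-" + id;
    }
}
```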
pivovarit authored Sep 10, 2024
1 parent 89142fc commit 3395142
Showing 5 changed files with 163 additions and 16 deletions.
15 changes: 15 additions & 0 deletions src/main/java/com/pivovarit/collectors/AsyncParallelCollector.java
@@ -106,6 +106,13 @@ private static <T> CompletableFuture<Stream<T>> combine(List<CompletableFuture<T
return new AsyncParallelCollector<>(mapper, Dispatcher.virtual(parallelism), Function.identity());
}

static <T, R> Collector<T, ?, CompletableFuture<Stream<R>>> collectingToStream(Function<T, R> mapper, Executor executor) {
requireNonNull(executor, "executor can't be null");
requireNonNull(mapper, "mapper can't be null");

return new AsyncParallelCollector<>(mapper, Dispatcher.from(executor), Function.identity());
}

static <T, R> Collector<T, ?, CompletableFuture<Stream<R>>> collectingToStream(Function<T, R> mapper, Executor executor, int parallelism) {
requireNonNull(executor, "executor can't be null");
requireNonNull(mapper, "mapper can't be null");
@@ -133,6 +140,14 @@ private static <T> CompletableFuture<Stream<T>> combine(List<CompletableFuture<T
: new AsyncParallelCollector<>(mapper, Dispatcher.virtual(parallelism), s -> s.collect(collector));
}

static <T, R, RR> Collector<T, ?, CompletableFuture<RR>> collectingWithCollector(Collector<R, ?, RR> collector, Function<T, R> mapper, Executor executor) {
requireNonNull(collector, "collector can't be null");
requireNonNull(executor, "executor can't be null");
requireNonNull(mapper, "mapper can't be null");

return new AsyncParallelCollector<>(mapper, Dispatcher.from(executor), s -> s.collect(collector));
}

static <T, R, RR> Collector<T, ?, CompletableFuture<RR>> collectingWithCollector(Collector<R, ?, RR> collector, Function<T, R> mapper, Executor executor, int parallelism) {
requireNonNull(collector, "collector can't be null");
requireNonNull(executor, "executor can't be null");
9 changes: 9 additions & 0 deletions src/main/java/com/pivovarit/collectors/Dispatcher.java
@@ -47,6 +47,15 @@ private Dispatcher(int permits) {
this.limiter = new Semaphore(permits);
}

private Dispatcher(Executor executor) {
this.executor = executor;
this.limiter = null;
}

static <T> Dispatcher<T> from(Executor executor) {
return new Dispatcher<>(executor);
}

static <T> Dispatcher<T> from(Executor executor, int permits) {
return new Dispatcher<>(executor, permits);
}
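The `Dispatcher.from(executor)` factory above leaves the `limiter` semaphore unset, so dispatching is no longer gated by permits. A standalone sketch of that idea under simplified assumptions — not the library's actual dispatch mechanics — contrasting permit-bounded and unbounded submission:

```
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

final class DispatchSketch {

    // Bounded: the submitting thread blocks on a permit, so at most 'permits'
    // tasks are in flight; the permit is released when the task completes.
    static <T> CompletableFuture<T> bounded(Supplier<T> task, Executor executor, Semaphore permits)
            throws InterruptedException {
        permits.acquire();
        return CompletableFuture.supplyAsync(task, executor)
            .whenComplete((result, error) -> permits.release());
    }

    // Unbounded: no permit bookkeeping; the executor alone decides how many tasks run concurrently.
    static <T> CompletableFuture<T> unbounded(Supplier<T> task, Executor executor) {
        return CompletableFuture.supplyAsync(task, executor);
    }
}
```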
107 changes: 107 additions & 0 deletions src/main/java/com/pivovarit/collectors/ParallelCollectors.java
@@ -95,6 +95,32 @@ private ParallelCollectors() {
return AsyncParallelCollector.collectingWithCollector(collector, mapper, executor, parallelism);
}

/**
* A convenience {@link Collector} used for executing parallel computations on a custom {@link Executor} with unlimited parallelism
* and returning them as a {@link CompletableFuture} containing a result of the application of the user-provided {@link Collector}.
*
* <br>
* Example:
* <pre>{@code
* CompletableFuture<List<String>> result = Stream.of(1, 2, 3)
* .collect(parallel(i -> foo(i), toList(), executor));
* }</pre>
*
* @param mapper a transformation to be performed in parallel
* @param collector the {@code Collector} describing the reduction
* @param executor the {@code Executor} to use for asynchronous execution
* @param <T> the type of the collected elements
* @param <R> the result returned by {@code mapper}
* @param <RR> the reduction result of {@code collector}
*
* @return a {@code Collector} which collects all processed elements into a user-provided mutable {@code Collection} in parallel
*
* @since 3.3.0
*/
public static <T, R, RR> Collector<T, ?, CompletableFuture<RR>> parallel(Function<T, R> mapper, Collector<R, ?, RR> collector, Executor executor) {
return AsyncParallelCollector.collectingWithCollector(collector, mapper, executor);
}
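A possible use of this overload with a downstream collector other than `toList()`; a sketch assuming a blocking `join()` is acceptable at the call site:

```
import com.pivovarit.collectors.ParallelCollectors;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class JoiningExample {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            // Any downstream Collector works; here the mapped values are joined into one String.
            CompletableFuture<String> combined = Stream.of("ant", "bee", "wasp")
                .collect(ParallelCollectors.parallel(s -> s.toUpperCase(), Collectors.joining(", "), executor));

            System.out.println(combined.join()); // blocking join, for demonstration only
        } finally {
            executor.shutdown();
        }
    }
}
```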

/**
* A convenience {@link Collector} used for executing parallel computations using Virtual Threads
* and returning them as {@link CompletableFuture} containing a {@link Stream} of these elements.
@@ -175,6 +201,33 @@ private ParallelCollectors() {
return AsyncParallelCollector.collectingToStream(mapper, executor, parallelism);
}

/**
* A convenience {@link Collector} used for executing parallel computations on a custom {@link Executor} with unlimited parallelism
* and returning them as {@link CompletableFuture} containing a {@link Stream} of these elements.
*
* <br><br>
 * The collector maintains the encounter order of the processed {@link Stream}. Instances should not be reused.
*
* <br>
* Example:
* <pre>{@code
* CompletableFuture<Stream<String>> result = Stream.of(1, 2, 3)
 *   .collect(parallel(i -> foo(i), executor));
* }</pre>
*
* @param mapper a transformation to be performed in parallel
* @param executor the {@code Executor} to use for asynchronous execution
* @param <T> the type of the collected elements
* @param <R> the result returned by {@code mapper}
*
* @return a {@code Collector} which collects all processed elements into a {@code Stream} in parallel
*
* @since 3.3.0
*/
public static <T, R> Collector<T, ?, CompletableFuture<Stream<R>>> parallel(Function<T, R> mapper, Executor executor) {
return AsyncParallelCollector.collectingToStream(mapper, executor);
}
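Because this overload returns a `CompletableFuture`, the result can also be composed without blocking; a small sketch (the pool size and the doubling mapper are arbitrary choices):

```
import static com.pivovarit.collectors.ParallelCollectors.parallel;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Stream;

class NonBlockingComposition {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(4);

        // Compose the returned future instead of blocking on it.
        CompletableFuture<Long> count = Stream.of(1, 2, 3)
            .collect(parallel(i -> i * 2, executor))
            .thenApply(Stream::count);

        count.thenAccept(System.out::println)
            .whenComplete((result, error) -> executor.shutdown());
    }
}
```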

/**
* A convenience {@link Collector} used for executing parallel computations using Virtual Threads
* and returning a {@link Stream} instance returning results as they arrive.
@@ -228,6 +281,33 @@ private ParallelCollectors() {
return ParallelStreamCollector.streaming(mapper, parallelism);
}

/**
* A convenience {@link Collector} used for executing parallel computations on a custom {@link Executor} with unlimited parallelism
* and returning a {@link Stream} instance returning results as they arrive.
*
* <br>
* Example:
* <pre>{@code
* Stream.of(1, 2, 3)
 *   .collect(parallelToStream(i -> foo(i), executor))
* .forEach(System.out::println);
* }</pre>
*
* @param mapper a transformation to be performed in parallel
* @param executor the {@code Executor} to use for asynchronous execution
* @param <T> the type of the collected elements
* @param <R> the result returned by {@code mapper}
*
* @return a {@code Collector} which collects all processed elements into a {@code Stream} in parallel
*
* @since 3.3.0
*/
public static <T, R> Collector<T, ?, Stream<R>> parallelToStream(Function<T, R> mapper, Executor executor) {
return ParallelStreamCollector.streaming(mapper, executor);
}

/**
* A convenience {@link Collector} used for executing parallel computations on a custom {@link Executor}
* and returning a {@link Stream} instance returning results as they arrive.
@@ -309,6 +389,33 @@ private ParallelCollectors() {
return ParallelStreamCollector.streamingOrdered(mapper, parallelism);
}

/**
* A convenience {@link Collector} used for executing parallel computations on a custom {@link Executor} with unlimited parallelism
* and returning a {@link Stream} instance returning results as they arrive while maintaining the initial order.
*
* <br>
* Example:
* <pre>{@code
* Stream.of(1, 2, 3)
 *   .collect(parallelToOrderedStream(i -> foo(i), executor))
* .forEach(System.out::println);
* }</pre>
*
* @param mapper a transformation to be performed in parallel
* @param executor the {@code Executor} to use for asynchronous execution
* @param <T> the type of the collected elements
* @param <R> the result returned by {@code mapper}
*
* @return a {@code Collector} which collects all processed elements into a {@code Stream} in parallel
*
* @since 3.3.0
*/
public static <T, R> Collector<T, ?, Stream<R>> parallelToOrderedStream(Function<T, R> mapper, Executor executor) {
return ParallelStreamCollector.streamingOrdered(mapper, executor);
}

/**
* A convenience {@link Collector} used for executing parallel computations on a custom {@link Executor}
* and returning a {@link Stream} instance returning results as they arrive while maintaining the initial order.
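Taken together, the two streaming overloads added above differ only in ordering guarantees: `parallelToStream` emits results as tasks complete, while `parallelToOrderedStream` preserves encounter order. An illustrative comparison, assuming a hypothetical `slowSquare` mapper that simulates uneven work:

```
import static com.pivovarit.collectors.ParallelCollectors.parallelToOrderedStream;
import static com.pivovarit.collectors.ParallelCollectors.parallelToStream;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Stream;

class StreamingComparison {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            // Unordered: results are emitted as each task completes, so 16, 9, 4, 1 is possible.
            Stream.of(1, 2, 3, 4)
                .collect(parallelToStream(i -> slowSquare(i), executor))
                .forEach(System.out::println);

            // Ordered: results follow the encounter order 1, 4, 9, 16 even if later tasks finish first.
            Stream.of(1, 2, 3, 4)
                .collect(parallelToOrderedStream(i -> slowSquare(i), executor))
                .forEach(System.out::println);
        } finally {
            executor.shutdown();
        }
    }

    // Hypothetical mapper: earlier elements take longer, so completion order reverses.
    private static int slowSquare(int i) {
        try {
            Thread.sleep((5 - i) * 10L);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return i * i;
    }
}
```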
14 changes: 14 additions & 0 deletions src/main/java/com/pivovarit/collectors/ParallelStreamCollector.java
@@ -97,6 +97,13 @@ public Set<Characteristics> characteristics() {
return new ParallelStreamCollector<>(mapper, unordered(), UNORDERED, Dispatcher.virtual(parallelism));
}

static <T, R> Collector<T, ?, Stream<R>> streaming(Function<T, R> mapper, Executor executor) {
requireNonNull(executor, "executor can't be null");
requireNonNull(mapper, "mapper can't be null");

return new ParallelStreamCollector<>(mapper, unordered(), UNORDERED, Dispatcher.from(executor));
}

static <T, R> Collector<T, ?, Stream<R>> streaming(Function<T, R> mapper, Executor executor, int parallelism) {
requireNonNull(executor, "executor can't be null");
requireNonNull(mapper, "mapper can't be null");
@@ -118,6 +125,13 @@ public Set<Characteristics> characteristics() {
return new ParallelStreamCollector<>(mapper, ordered(), emptySet(), Dispatcher.virtual(parallelism));
}

static <T, R> Collector<T, ?, Stream<R>> streamingOrdered(Function<T, R> mapper, Executor executor) {
requireNonNull(executor, "executor can't be null");
requireNonNull(mapper, "mapper can't be null");

return new ParallelStreamCollector<>(mapper, ordered(), emptySet(), Dispatcher.from(executor));
}

static <T, R> Collector<T, ?, Stream<R>> streamingOrdered(Function<T, R> mapper, Executor executor,
int parallelism) {
requireNonNull(executor, "executor can't be null");
34 changes: 18 additions & 16 deletions src/test/java/com/pivovarit/collectors/FunctionalTest.java
@@ -72,20 +72,22 @@ Stream<DynamicTest> collectors() {
virtualThreadsTests((m, e, p) -> adapt(parallel(m)), "ParallelCollectors.parallel() [virtual]", true),
virtualThreadsTests((m, e, p) -> adapt(parallel(m, p)), "ParallelCollectors.parallel() [virtual]", true),
// platform threads
tests((m, e, p) -> parallel(m, toList(), e, p), format("ParallelCollectors.parallel(toList(), p=%d)", PARALLELISM), true),
tests((m, e, p) -> parallel(m, toSet(), e, p), format("ParallelCollectors.parallel(toSet(), p=%d)", PARALLELISM), false),
tests((m, e, p) -> parallel(m, toCollection(LinkedList::new), e, p), format("ParallelCollectors.parallel(toCollection(), p=%d)", PARALLELISM), true),
tests((m, e, p) -> adapt(parallel(m, e, p)), format("ParallelCollectors.parallel(p=%d)", PARALLELISM), true)
tests((m, e, p) -> parallel(m, toList(), e, p), format("ParallelCollectors.parallel(toList(), p=%d)", PARALLELISM), true, true),
tests((m, e, p) -> parallel(m, toSet(), e, p), format("ParallelCollectors.parallel(toSet(), p=%d)", PARALLELISM), false, true),
tests((m, e, p) -> parallel(m, toList(), e), "ParallelCollectors.parallel(toList(), p=inf)", true, false),
tests((m, e, p) -> parallel(m, toSet(), e), "ParallelCollectors.parallel(toSet(), p=inf)", false, false),
tests((m, e, p) -> parallel(m, toCollection(LinkedList::new), e, p), format("ParallelCollectors.parallel(toCollection(), p=%d)", PARALLELISM), true, true),
tests((m, e, p) -> adapt(parallel(m, e, p)), format("ParallelCollectors.parallel(p=%d)", PARALLELISM), true, true)
).flatMap(i -> i);
}

@TestFactory
Stream<DynamicTest> batching_collectors() {
return of(
batchTests((m, e, p) -> Batching.parallel(m, toList(), e, p), format("ParallelCollectors.Batching.parallel(toList(), p=%d)", PARALLELISM), true),
batchTests((m, e, p) -> Batching.parallel(m, toSet(), e, p), format("ParallelCollectors.Batching.parallel(toSet(), p=%d)", PARALLELISM), false),
batchTests((m, e, p) -> Batching.parallel(m, toCollection(LinkedList::new), e, p), format("ParallelCollectors.Batching.parallel(toCollection(), p=%d)", PARALLELISM), true),
batchTests((m, e, p) -> adapt(Batching.parallel(m, e, p)), format("ParallelCollectors.Batching.parallel(p=%d)", PARALLELISM), true)
batchTests((m, e, p) -> Batching.parallel(m, toList(), e, p), format("ParallelCollectors.Batching.parallel(toList(), p=%d)", PARALLELISM), true, true),
batchTests((m, e, p) -> Batching.parallel(m, toSet(), e, p), format("ParallelCollectors.Batching.parallel(toSet(), p=%d)", PARALLELISM), false, true),
batchTests((m, e, p) -> Batching.parallel(m, toCollection(LinkedList::new), e, p), format("ParallelCollectors.Batching.parallel(toCollection(), p=%d)", PARALLELISM), true, true),
batchTests((m, e, p) -> adapt(Batching.parallel(m, e, p)), format("ParallelCollectors.Batching.parallel(p=%d)", PARALLELISM), true, true)
).flatMap(i -> i);
}

@@ -186,7 +188,7 @@ private static <R extends Collection<Integer>> Stream<DynamicTest> virtualThread
: tests;
}

private static <R extends Collection<Integer>> Stream<DynamicTest> tests(CollectorSupplier<Function<Integer, Integer>, Executor, Integer, Collector<Integer, ?, CompletableFuture<R>>> collector, String name, boolean maintainsOrder) {
private static <R extends Collection<Integer>> Stream<DynamicTest> tests(CollectorSupplier<Function<Integer, Integer>, Executor, Integer, Collector<Integer, ?, CompletableFuture<R>>> collector, String name, boolean maintainsOrder, boolean limitedParallelism) {
var tests = of(
shouldCollect(collector, name, 1),
shouldCollect(collector, name, PARALLELISM),
@@ -195,19 +197,19 @@ private static <R extends Collection<Integer>> Stream<DynamicTest> tests(Collect
shouldCollectToEmpty(collector, name),
shouldStartConsumingImmediately(collector, name),
shouldNotBlockTheCallingThread(collector, name),
shouldRespectParallelism(collector, name),
shouldHandleThrowable(collector, name),
shouldShortCircuitOnException(collector, name),
shouldInterruptOnException(collector, name),
shouldHandleRejectedExecutionException(collector, name),
shouldRemainConsistent(collector, name),
shouldRejectInvalidParallelism(collector, name),
shouldHandleExecutorRejection(collector, name)
);

return maintainsOrder
? Stream.concat(tests, of(shouldMaintainOrder(collector, name)))
: tests;
tests = maintainsOrder ? Stream.concat(tests, of(shouldMaintainOrder(collector, name))) : tests;
tests = limitedParallelism ? Stream.concat(tests, of(shouldRespectParallelism(collector, name))) : tests;
tests = limitedParallelism ? Stream.concat(tests, of(shouldRejectInvalidParallelism(collector, name))) : tests;

return tests;
}

private static <R extends Collection<Integer>> Stream<DynamicTest> virtualThreadsStreamingTests(CollectorSupplier<Function<Integer, Integer>, Executor, Integer, Collector<Integer, ?, CompletableFuture<R>>> collector, String name, boolean maintainsOrder) {
@@ -249,9 +251,9 @@ private static <R extends Collection<Integer>> Stream<DynamicTest> streamingTest
: tests;
}

private static <R extends Collection<Integer>> Stream<DynamicTest> batchTests(CollectorSupplier<Function<Integer, Integer>, Executor, Integer, Collector<Integer, ?, CompletableFuture<R>>> collector, String name, boolean maintainsOrder) {
private static <R extends Collection<Integer>> Stream<DynamicTest> batchTests(CollectorSupplier<Function<Integer, Integer>, Executor, Integer, Collector<Integer, ?, CompletableFuture<R>>> collector, String name, boolean maintainsOrder, boolean limitedParallelism) {
return Stream.concat(
tests(collector, name, maintainsOrder),
tests(collector, name, maintainsOrder, limitedParallelism),
of(shouldProcessOnNThreadsETParallelism(collector, name)));
}

