Skip to content

Commit

Permalink
Don't batch when batch size is equal to one
Browse files Browse the repository at this point in the history
  • Loading branch information
pivovarit committed Dec 6, 2020
1 parent e753dc8 commit d57e621
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 12 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

<groupId>com.pivovarit</groupId>
<artifactId>parallel-collectors</artifactId>
<version>2.4.2-SNAPSHOT</version>
<version>2.5.0-SNAPSHOT</version>

<packaging>jar</packaging>

Expand Down
21 changes: 16 additions & 5 deletions src/main/java/com/pivovarit/collectors/AsyncParallelCollector.java
Original file line number Diff line number Diff line change
Expand Up @@ -168,11 +168,22 @@ private BatchingCollectors() {
private static <T, R, RR> Collector<T, ?, CompletableFuture<RR>> batchingCollector(Function<T, R> mapper, Executor executor, int parallelism, Function<Stream<R>, RR> finisher) {
return collectingAndThen(
toList(),
list -> partitioned(list, parallelism)
.collect(new AsyncParallelCollector<>(
batching(mapper),
Dispatcher.of(executor, parallelism),
listStream -> finisher.apply(listStream.flatMap(Collection::stream)))));
list -> {
// no sense to repack into batches of size 1
if (list.size() == parallelism) {
return list.stream()
.collect(new AsyncParallelCollector<>(
mapper,
Dispatcher.of(executor, parallelism),
finisher));
} else {
return partitioned(list, parallelism)
.collect(new AsyncParallelCollector<>(
batching(mapper),
Dispatcher.of(executor, parallelism),
listStream -> finisher.apply(listStream.flatMap(Collection::stream))));
}
});
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import java.util.Collection;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
Expand Down Expand Up @@ -130,7 +129,7 @@ private BatchingCollectors() {

return parallelism == 1
? syncCollector(mapper)
: batched(new ParallelStreamCollector<>(batching(mapper), unordered(), UNORDERED, executor, parallelism), parallelism);
: batchingCollector(mapper, executor, parallelism);
}

static <T, R> Collector<T, ?, Stream<R>> streamingOrdered(Function<T, R> mapper, Executor executor, int parallelism) {
Expand All @@ -140,14 +139,33 @@ private BatchingCollectors() {

return parallelism == 1
? syncCollector(mapper)
: batched(new ParallelStreamCollector<>(batching(mapper), ordered(), emptySet(), executor, parallelism), parallelism);
: batchingCollector(mapper, executor, parallelism);
}

private static <T, R> Collector<T, ?, Stream<R>> batched(ParallelStreamCollector<List<T>, List<R>> downstream, int parallelism) {
private static <T, R> Collector<T, ?, Stream<R>> batchingCollector(Function<T, R> mapper, Executor executor, int parallelism) {
return collectingAndThen(
toList(),
list -> partitioned(list, parallelism)
.collect(collectingAndThen(downstream, s -> s.flatMap(Collection::stream))));
list -> {
// no sense to repack into batches of size 1
if (list.size() == parallelism) {
return list.stream()
.collect(new ParallelStreamCollector<>(
mapper,
ordered(),
emptySet(),
executor,
parallelism));
} else {
return partitioned(list, parallelism)
.collect(collectingAndThen(new ParallelStreamCollector<>(
batching(mapper),
ordered(),
emptySet(),
executor,
parallelism),
s -> s.flatMap(Collection::stream)));
}
});
}

private static <T, R> Collector<T, Stream.Builder<R>, Stream<R>> syncCollector(Function<T, R> mapper) {
Expand Down
14 changes: 14 additions & 0 deletions src/test/java/com/pivovarit/collectors/FunctionalTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ private static <R extends Collection<Integer>> Stream<DynamicTest> tests(Collect
Stream<DynamicTest> tests = of(
shouldCollect(collector, name, 1),
shouldCollect(collector, name, PARALLELISM),
shouldCollectNElementsWithNParallelism(collector, name, 1),
shouldCollectNElementsWithNParallelism(collector, name, PARALLELISM),
shouldCollectToEmpty(collector, name),
shouldStartConsumingImmediately(collector, name),
shouldTerminateAfterConsumingAllElements(collector, name),
Expand Down Expand Up @@ -251,6 +253,18 @@ private static <R extends Collection<Integer>> DynamicTest shouldCollect(Collect
});
}

// Exercises the "one element per worker" case, i.e. the path where the collector
// should skip repacking elements into batches of size 1.
private static <R extends Collection<Integer>> DynamicTest shouldCollectNElementsWithNParallelism(CollectorSupplier<Function<Integer, Integer>, Executor, Integer, Collector<Integer, ?, CompletableFuture<R>>> factory, String name, int parallelism) {
    return dynamicTest(format("%s: should collect %s elements with parallelism %s", name, parallelism, parallelism), () -> {

        List<Integer> source = IntStream.range(0, parallelism).boxed().collect(toList());

        Collection<Integer> collected = source.stream()
          .collect(factory.apply(i -> i, executor, parallelism))
          .join();

        assertThat(collected).hasSameElementsAs(source);
    });
}

private static <R extends Collection<Integer>> DynamicTest shouldTerminateAfterConsumingAllElements(CollectorSupplier<Function<Integer, Integer>, Executor, Integer, Collector<Integer, ?, CompletableFuture<R>>> factory, String name) {
return dynamicTest(format("%s: should terminate after consuming all elements", name), () -> {
List<Integer> elements = IntStream.range(0, 10).boxed().collect(toList());
Expand Down

0 comments on commit d57e621

Please sign in to comment.