Commit 7a05492

Avoid batching when collection size is equal to parallelism (#561)
There's no point in batching when each batch would carry only a single element.
1 parent 37b409f commit 7a05492
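
For context, a minimal usage sketch of the case this commit optimizes, assuming the batching variant is exposed as ParallelCollectors.Batching.parallel and using a made-up process() mapper (neither appears in this diff). With four elements and parallelism 4, the previous code first repacked the list into four single-element batches and flattened them again afterwards; after this change the elements are submitted directly.

import com.pivovarit.collectors.ParallelCollectors;

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static java.util.stream.Collectors.toList;

class SingleElementBatchesSketch {

    // hypothetical mapper standing in for real per-element work
    static int process(int i) {
        return i * 2;
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        List<Integer> elements = List.of(1, 2, 3, 4);

        // collection size == parallelism (4): exactly the case where batching
        // would only wrap each element into a one-element list
        // (assumes the ParallelCollectors.Batching facade from the public API)
        List<Integer> results = elements.stream()
          .collect(ParallelCollectors.Batching.parallel(SingleElementBatchesSketch::process, toList(), executor, 4))
          .join();

        System.out.println(results); // [2, 4, 6, 8]
        executor.shutdown();
    }
}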

4 files changed: +55 −12

pom.xml (+1 −1)

@@ -19,7 +19,7 @@
 
     <groupId>com.pivovarit</groupId>
     <artifactId>parallel-collectors</artifactId>
-    <version>2.4.2-SNAPSHOT</version>
+    <version>2.5.0-SNAPSHOT</version>
 
     <packaging>jar</packaging>

src/main/java/com/pivovarit/collectors/AsyncParallelCollector.java (+16 −5)

@@ -168,11 +168,22 @@ private BatchingCollectors() {
         private static <T, R, RR> Collector<T, ?, CompletableFuture<RR>> batchingCollector(Function<T, R> mapper, Executor executor, int parallelism, Function<Stream<R>, RR> finisher) {
             return collectingAndThen(
               toList(),
-              list -> partitioned(list, parallelism)
-                .collect(new AsyncParallelCollector<>(
-                  batching(mapper),
-                  Dispatcher.of(executor, parallelism),
-                  listStream -> finisher.apply(listStream.flatMap(Collection::stream)))));
+              list -> {
+                  // no sense to repack into batches of size 1
+                  if (list.size() == parallelism) {
+                      return list.stream()
+                        .collect(new AsyncParallelCollector<>(
+                          mapper,
+                          Dispatcher.of(executor, parallelism),
+                          finisher));
+                  } else {
+                      return partitioned(list, parallelism)
+                        .collect(new AsyncParallelCollector<>(
+                          batching(mapper),
+                          Dispatcher.of(executor, parallelism),
+                          listStream -> finisher.apply(listStream.flatMap(Collection::stream))));
+                  }
+              });
         }
     }
 }
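
To see why the old path was wasteful in this case, the sketch below uses a hypothetical stand-in for the internal partitioned(list, parallelism) helper (its real implementation is not part of this diff). When the list size equals the parallelism, every batch carries exactly one element, so wrapping the elements in lists and flatMap-ping them back out adds work without reducing the number of submitted tasks.

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

class PartitioningSketch {

    // hypothetical stand-in for the internal partitioned(list, parallelism) helper:
    // splits the list into at most `parallelism` contiguous chunks
    static <T> Stream<List<T>> partitioned(List<T> list, int parallelism) {
        int size = list.size();
        int chunk = (size + parallelism - 1) / parallelism; // ceiling division
        return IntStream.range(0, parallelism)
          .mapToObj(i -> list.subList(Math.min(i * chunk, size), Math.min((i + 1) * chunk, size)))
          .filter(batch -> !batch.isEmpty());
    }

    public static void main(String[] args) {
        // size == parallelism: each "batch" is a single-element list
        System.out.println(partitioned(List.of(1, 2, 3, 4), 4).collect(Collectors.toList()));
        // prints: [[1], [2], [3], [4]]

        // size > parallelism: batching actually groups work per worker
        System.out.println(partitioned(List.of(1, 2, 3, 4, 5, 6, 7, 8), 4).collect(Collectors.toList()));
        // prints: [[1, 2], [3, 4], [5, 6], [7, 8]]
    }
}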

src/main/java/com/pivovarit/collectors/ParallelStreamCollector.java (+24 −6)

@@ -2,7 +2,6 @@
 
 import java.util.Collection;
 import java.util.EnumSet;
-import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
@@ -130,7 +129,7 @@ private BatchingCollectors() {
 
             return parallelism == 1
               ? syncCollector(mapper)
-              : batched(new ParallelStreamCollector<>(batching(mapper), unordered(), UNORDERED, executor, parallelism), parallelism);
+              : batchingCollector(mapper, executor, parallelism);
         }
 
         static <T, R> Collector<T, ?, Stream<R>> streamingOrdered(Function<T, R> mapper, Executor executor, int parallelism) {
@@ -140,14 +139,33 @@ private BatchingCollectors() {
 
             return parallelism == 1
               ? syncCollector(mapper)
-              : batched(new ParallelStreamCollector<>(batching(mapper), ordered(), emptySet(), executor, parallelism), parallelism);
+              : batchingCollector(mapper, executor, parallelism);
         }
 
-        private static <T, R> Collector<T, ?, Stream<R>> batched(ParallelStreamCollector<List<T>, List<R>> downstream, int parallelism) {
+        private static <T, R> Collector<T, ?, Stream<R>> batchingCollector(Function<T, R> mapper, Executor executor, int parallelism) {
             return collectingAndThen(
               toList(),
-              list -> partitioned(list, parallelism)
-                .collect(collectingAndThen(downstream, s -> s.flatMap(Collection::stream))));
+              list -> {
+                  // no sense to repack into batches of size 1
+                  if (list.size() == parallelism) {
+                      return list.stream()
+                        .collect(new ParallelStreamCollector<>(
+                          mapper,
+                          ordered(),
+                          emptySet(),
+                          executor,
+                          parallelism));
+                  } else {
+                      return partitioned(list, parallelism)
+                        .collect(collectingAndThen(new ParallelStreamCollector<>(
+                          batching(mapper),
+                          ordered(),
+                          emptySet(),
+                          executor,
+                          parallelism),
+                          s -> s.flatMap(Collection::stream)));
+                  }
+              });
         }
 
         private static <T, R> Collector<T, Stream.Builder<R>, Stream<R>> syncCollector(Function<T, R> mapper) {
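
Design note on the refactoring above: the removed batched(...) helper received a fully built downstream collector, so the decision to batch was fixed before the element count was known; the new batchingCollector builds the ParallelStreamCollector only after collecting the input into a list, which is what lets it skip batching entirely when the list size equals the parallelism. A minimal usage sketch of the streaming path, assuming the batching variant is exposed as ParallelCollectors.Batching.parallelToOrderedStream (the public entry point is not part of this diff):

import com.pivovarit.collectors.ParallelCollectors;

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static java.util.stream.Collectors.toList;

class StreamingSketch {

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(4);

        // four elements, parallelism 4: after this change the streaming collector
        // submits the elements as-is instead of repacking them into one-element batches
        // (assumes the ParallelCollectors.Batching facade from the public API)
        List<String> results = List.of(1, 2, 3, 4).stream()
          .collect(ParallelCollectors.Batching.parallelToOrderedStream(i -> "task-" + i, executor, 4))
          .collect(toList());

        System.out.println(results); // [task-1, task-2, task-3, task-4]
        executor.shutdown();
    }
}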

src/test/java/com/pivovarit/collectors/FunctionalTest.java (+14 −0)

@@ -137,6 +137,8 @@ private static <R extends Collection<Integer>> Stream<DynamicTest> tests(Collect
         Stream<DynamicTest> tests = of(
           shouldCollect(collector, name, 1),
           shouldCollect(collector, name, PARALLELISM),
+          shouldCollectNElementsWithNParallelism(collector, name, 1),
+          shouldCollectNElementsWithNParallelism(collector, name, PARALLELISM),
           shouldCollectToEmpty(collector, name),
           shouldStartConsumingImmediately(collector, name),
           shouldTerminateAfterConsumingAllElements(collector, name),
@@ -251,6 +253,18 @@ private static <R extends Collection<Integer>> DynamicTest shouldCollect(Collect
         });
     }
 
+    private static <R extends Collection<Integer>> DynamicTest shouldCollectNElementsWithNParallelism(CollectorSupplier<Function<Integer, Integer>, Executor, Integer, Collector<Integer, ?, CompletableFuture<R>>> factory, String name, int parallelism) {
+        return dynamicTest(format("%s: should collect %s elements with parallelism %s", name, parallelism, parallelism), () -> {
+
+            List<Integer> elements = IntStream.iterate(0, i -> i + 1).limit(parallelism).boxed().collect(toList());
+            Collector<Integer, ?, CompletableFuture<R>> ctor = factory.apply(i -> i, executor, parallelism);
+            Collection<Integer> result = elements.stream().collect(ctor)
+              .join();
+
+            assertThat(result).hasSameElementsAs(elements);
+        });
+    }
+
     private static <R extends Collection<Integer>> DynamicTest shouldTerminateAfterConsumingAllElements(CollectorSupplier<Function<Integer, Integer>, Executor, Integer, Collector<Integer, ?, CompletableFuture<R>>> factory, String name) {
         return dynamicTest(format("%s: should terminate after consuming all elements", name), () -> {
             List<Integer> elements = IntStream.range(0, 10).boxed().collect(toList());