Skip to content

Commit 3e44388

Browse files
committed
Add handy Collectors for executing jobs on Virtual Threads but with limited parallelism
1 parent 894384c commit 3e44388

File tree

8 files changed

+201
-49
lines changed

8 files changed

+201
-49
lines changed

pom.xml

+1-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414

1515
<groupId>com.pivovarit</groupId>
1616
<artifactId>parallel-collectors</artifactId>
17-
<version>3.1.1-SNAPSHOT</version>
17+
<version>3.2.0-SNAPSHOT</version>
1818
<url>https://github.com/pivovarit/parallel-collectors</url>
1919

2020
<packaging>jar</packaging>

src/main/java/com/pivovarit/collectors/AsyncParallelCollector.java

+18
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import java.util.Set;
88
import java.util.concurrent.CompletableFuture;
99
import java.util.concurrent.Executor;
10+
import java.util.concurrent.Executors;
1011
import java.util.function.BiConsumer;
1112
import java.util.function.BinaryOperator;
1213
import java.util.function.Function;
@@ -98,6 +99,13 @@ private static <T> CompletableFuture<Stream<T>> combine(List<CompletableFuture<T
9899
return new AsyncParallelCollector<>(mapper, Dispatcher.virtual(), Function.identity());
99100
}
100101

102+
static <T, R> Collector<T, ?, CompletableFuture<Stream<R>>> collectingToStream(Function<T, R> mapper, int parallelism) {
103+
requireNonNull(mapper, "mapper can't be null");
104+
requireValidParallelism(parallelism);
105+
106+
return new AsyncParallelCollector<>(mapper, Dispatcher.virtual(parallelism), Function.identity());
107+
}
108+
101109
static <T, R> Collector<T, ?, CompletableFuture<Stream<R>>> collectingToStream(Function<T, R> mapper, Executor executor, int parallelism) {
102110
requireNonNull(executor, "executor can't be null");
103111
requireNonNull(mapper, "mapper can't be null");
@@ -115,6 +123,16 @@ private static <T> CompletableFuture<Stream<T>> combine(List<CompletableFuture<T
115123
return new AsyncParallelCollector<>(mapper, Dispatcher.virtual(), s -> s.collect(collector));
116124
}
117125

126+
static <T, R, RR> Collector<T, ?, CompletableFuture<RR>> collectingWithCollector(Collector<R, ?, RR> collector, Function<T, R> mapper, int parallelism) {
127+
requireNonNull(collector, "collector can't be null");
128+
requireNonNull(mapper, "mapper can't be null");
129+
requireValidParallelism(parallelism);
130+
131+
return parallelism == 1
132+
? asyncCollector(mapper, Executors.newVirtualThreadPerTaskExecutor(), s -> s.collect(collector))
133+
: new AsyncParallelCollector<>(mapper, Dispatcher.virtual(parallelism), s -> s.collect(collector));
134+
}
135+
118136
static <T, R, RR> Collector<T, ?, CompletableFuture<RR>> collectingWithCollector(Collector<R, ?, RR> collector, Function<T, R> mapper, Executor executor, int parallelism) {
119137
requireNonNull(collector, "collector can't be null");
120138
requireNonNull(executor, "executor can't be null");

src/main/java/com/pivovarit/collectors/Dispatcher.java

+9
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,11 @@ private Dispatcher(Executor executor, int permits) {
4242
this.limiter = new Semaphore(permits);
4343
}
4444

45+
private Dispatcher(int permits) {
46+
this.executor = defaultExecutorService();
47+
this.limiter = new Semaphore(permits);
48+
}
49+
4550
static <T> Dispatcher<T> from(Executor executor, int permits) {
4651
return new Dispatcher<>(executor, permits);
4752
}
@@ -50,6 +55,10 @@ static <T> Dispatcher<T> virtual() {
5055
return new Dispatcher<>();
5156
}
5257

58+
static <T> Dispatcher<T> virtual(int permits) {
59+
return new Dispatcher<>(permits);
60+
}
61+
5362
void start() {
5463
if (!started.getAndSet(true)) {
5564
Thread.ofVirtual().start(() -> {

src/main/java/com/pivovarit/collectors/ParallelCollectors.java

+106-1
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ public final class ParallelCollectors {
1717
private ParallelCollectors() {
1818
}
1919

20-
2120
/**
2221
* A convenience {@link Collector} used for executing parallel computations using Virtual Threads
2322
* and returning them as a {@link CompletableFuture} containing a result of the application of the user-provided {@link Collector}.
@@ -43,6 +42,32 @@ private ParallelCollectors() {
4342
return AsyncParallelCollector.collectingWithCollector(collector, mapper);
4443
}
4544

45+
/**
46+
* A convenience {@link Collector} used for executing parallel computations using Virtual Threads
47+
* and returning them as a {@link CompletableFuture} containing a result of the application of the user-provided {@link Collector}.
48+
*
49+
* <br>
50+
* Example:
51+
* <pre>{@code
52+
* CompletableFuture<List<String>> result = Stream.of(1, 2, 3)
53+
* .collect(parallel(i -> foo(i), toList(), executor, 2));
54+
* }</pre>
55+
*
56+
* @param mapper a transformation to be performed in parallel
57+
* @param collector the {@code Collector} describing the reduction
58+
* @param <T> the type of the collected elements
59+
* @param <R> the result returned by {@code mapper}
60+
* @param <RR> the reduction result {@code collector}
61+
* @param parallelism the max parallelism level
62+
*
63+
* @return a {@code Collector} which collects all processed elements into a user-provided mutable {@code Collection} in parallel
64+
*
65+
* @since 3.2.0
66+
*/
67+
public static <T, R, RR> Collector<T, ?, CompletableFuture<RR>> parallel(Function<T, R> mapper, Collector<R, ?, RR> collector, int parallelism) {
68+
return AsyncParallelCollector.collectingWithCollector(collector, mapper, parallelism);
69+
}
70+
4671
/**
4772
* A convenience {@link Collector} used for executing parallel computations on a custom {@link Executor}
4873
* and returning them as a {@link CompletableFuture} containing a result of the application of the user-provided {@link Collector}.
@@ -96,6 +121,32 @@ private ParallelCollectors() {
96121
return AsyncParallelCollector.collectingToStream(mapper);
97122
}
98123

124+
/**
125+
* A convenience {@link Collector} used for executing parallel computations using Virtual Threads
126+
* and returning them as {@link CompletableFuture} containing a {@link Stream} of these elements.
127+
*
128+
* <br><br>
129+
* The collector maintains the order of processed {@link Stream}. Instances should not be reused.
130+
*
131+
* <br>
132+
* Example:
133+
* <pre>{@code
134+
* CompletableFuture<Stream<String>> result = Stream.of(1, 2, 3)
135+
* .collect(parallel(i -> foo()));
136+
* }</pre>
137+
*
138+
* @param mapper a transformation to be performed in parallel
139+
* @param <T> the type of the collected elements
140+
* @param <R> the result returned by {@code mapper}
141+
*
142+
* @return a {@code Collector} which collects all processed elements into a {@code Stream} in parallel
143+
*
144+
* @since 3.2.0
145+
*/
146+
public static <T, R> Collector<T, ?, CompletableFuture<Stream<R>>> parallel(Function<T, R> mapper, int parallelism) {
147+
return AsyncParallelCollector.collectingToStream(mapper, parallelism);
148+
}
149+
99150
/**
100151
* A convenience {@link Collector} used for executing parallel computations on a custom {@link Executor}
101152
* and returning them as {@link CompletableFuture} containing a {@link Stream} of these elements.
@@ -150,6 +201,33 @@ private ParallelCollectors() {
150201
return ParallelStreamCollector.streaming(mapper);
151202
}
152203

204+
/**
205+
* A convenience {@link Collector} used for executing parallel computations using Virtual Threads
206+
* and returning a {@link Stream} instance returning results as they arrive.
207+
* <p>
208+
* For the parallelism of 1, the stream is executed by the calling thread.
209+
*
210+
* <br>
211+
* Example:
212+
* <pre>{@code
213+
* Stream.of(1, 2, 3)
214+
* .collect(parallelToStream(i -> foo(), executor, 2))
215+
* .forEach(System.out::println);
216+
* }</pre>
217+
*
218+
* @param mapper a transformation to be performed in parallel
219+
* @param parallelism the max parallelism level
220+
* @param <T> the type of the collected elements
221+
* @param <R> the result returned by {@code mapper}
222+
*
223+
* @return a {@code Collector} which collects all processed elements into a {@code Stream} in parallel
224+
*
225+
* @since 3.2.0
226+
*/
227+
public static <T, R> Collector<T, ?, Stream<R>> parallelToStream(Function<T, R> mapper, int parallelism) {
228+
return ParallelStreamCollector.streaming(mapper, parallelism);
229+
}
230+
153231
/**
154232
* A convenience {@link Collector} used for executing parallel computations on a custom {@link Executor}
155233
* and returning a {@link Stream} instance returning results as they arrive.
@@ -204,6 +282,33 @@ private ParallelCollectors() {
204282
return ParallelStreamCollector.streamingOrdered(mapper);
205283
}
206284

285+
/**
286+
* A convenience {@link Collector} used for executing parallel computations using Virtual Threads
287+
* and returning a {@link Stream} instance returning results as they arrive while maintaining the initial order.
288+
* <p>
289+
* For the parallelism of 1, the stream is executed by the calling thread.
290+
*
291+
* <br>
292+
* Example:
293+
* <pre>{@code
294+
* Stream.of(1, 2, 3)
295+
* .collect(parallelToOrderedStream(i -> foo(), executor, 2))
296+
* .forEach(System.out::println);
297+
* }</pre>
298+
*
299+
* @param mapper a transformation to be performed in parallel
300+
* @param parallelism the max parallelism level
301+
* @param <T> the type of the collected elements
302+
* @param <R> the result returned by {@code mapper}
303+
*
304+
* @return a {@code Collector} which collects all processed elements into a {@code Stream} in parallel
305+
*
306+
* @since 3.2.0
307+
*/
308+
public static <T, R> Collector<T, ?, Stream<R>> parallelToOrderedStream(Function<T, R> mapper, int parallelism) {
309+
return ParallelStreamCollector.streamingOrdered(mapper, parallelism);
310+
}
311+
207312
/**
208313
* A convenience {@link Collector} used for executing parallel computations on a custom {@link Executor}
209314
* and returning a {@link Stream} instance returning results as they arrive while maintaining the initial order.

src/main/java/com/pivovarit/collectors/ParallelStreamCollector.java

+48-37
Original file line numberDiff line numberDiff line change
@@ -40,10 +40,10 @@ class ParallelStreamCollector<T, R> implements Collector<T, List<CompletableFutu
4040
private final Dispatcher<R> dispatcher;
4141

4242
private ParallelStreamCollector(
43-
Function<T, R> function,
44-
CompletionStrategy<R> completionStrategy,
45-
Set<Characteristics> characteristics,
46-
Dispatcher<R> dispatcher) {
43+
Function<T, R> function,
44+
CompletionStrategy<R> completionStrategy,
45+
Set<Characteristics> characteristics,
46+
Dispatcher<R> dispatcher) {
4747
this.completionStrategy = completionStrategy;
4848
this.characteristics = characteristics;
4949
this.dispatcher = dispatcher;
@@ -67,7 +67,7 @@ public BiConsumer<List<CompletableFuture<R>>, T> accumulator() {
6767
public BinaryOperator<List<CompletableFuture<R>>> combiner() {
6868
return (left, right) -> {
6969
throw new UnsupportedOperationException(
70-
"Using parallel stream with parallel collectors is a bad idea");
70+
"Using parallel stream with parallel collectors is a bad idea");
7171
};
7272
}
7373

@@ -90,6 +90,13 @@ public Set<Characteristics> characteristics() {
9090
return new ParallelStreamCollector<>(mapper, unordered(), UNORDERED, Dispatcher.virtual());
9191
}
9292

93+
static <T, R> Collector<T, ?, Stream<R>> streaming(Function<T, R> mapper, int parallelism) {
94+
requireNonNull(mapper, "mapper can't be null");
95+
requireValidParallelism(parallelism);
96+
97+
return new ParallelStreamCollector<>(mapper, unordered(), UNORDERED, Dispatcher.virtual(parallelism));
98+
}
99+
93100
static <T, R> Collector<T, ?, Stream<R>> streaming(Function<T, R> mapper, Executor executor, int parallelism) {
94101
requireNonNull(executor, "executor can't be null");
95102
requireNonNull(mapper, "mapper can't be null");
@@ -104,8 +111,15 @@ public Set<Characteristics> characteristics() {
104111
return new ParallelStreamCollector<>(mapper, ordered(), emptySet(), Dispatcher.virtual());
105112
}
106113

114+
static <T, R> Collector<T, ?, Stream<R>> streamingOrdered(Function<T, R> mapper, int parallelism) {
115+
requireNonNull(mapper, "mapper can't be null");
116+
requireValidParallelism(parallelism);
117+
118+
return new ParallelStreamCollector<>(mapper, ordered(), emptySet(), Dispatcher.virtual(parallelism));
119+
}
120+
107121
static <T, R> Collector<T, ?, Stream<R>> streamingOrdered(Function<T, R> mapper, Executor executor,
108-
int parallelism) {
122+
int parallelism) {
109123
requireNonNull(executor, "executor can't be null");
110124
requireNonNull(mapper, "mapper can't be null");
111125
requireValidParallelism(parallelism);
@@ -119,60 +133,57 @@ private BatchingCollectors() {
119133
}
120134

121135
static <T, R> Collector<T, ?, Stream<R>> streaming(Function<T, R> mapper, Executor executor,
122-
int parallelism) {
136+
int parallelism) {
123137
requireNonNull(executor, "executor can't be null");
124138
requireNonNull(mapper, "mapper can't be null");
125139
requireValidParallelism(parallelism);
126140

127141
return parallelism == 1
128-
? syncCollector(mapper)
129-
: batchingCollector(mapper, executor, parallelism);
142+
? syncCollector(mapper)
143+
: batchingCollector(mapper, executor, parallelism);
130144
}
131145

132146
static <T, R> Collector<T, ?, Stream<R>> streamingOrdered(Function<T, R> mapper, Executor executor,
133-
int parallelism) {
147+
int parallelism) {
134148
requireNonNull(executor, "executor can't be null");
135149
requireNonNull(mapper, "mapper can't be null");
136150
requireValidParallelism(parallelism);
137151

138152
return parallelism == 1
139-
? syncCollector(mapper)
140-
: batchingCollector(mapper, executor, parallelism);
153+
? syncCollector(mapper)
154+
: batchingCollector(mapper, executor, parallelism);
141155
}
142156

143157
private static <T, R> Collector<T, ?, Stream<R>> batchingCollector(Function<T, R> mapper,
144-
Executor executor, int parallelism) {
158+
Executor executor, int parallelism) {
145159
return collectingAndThen(
146-
toList(),
147-
list -> {
148-
// no sense to repack into batches of size 1
149-
if (list.size() == parallelism) {
150-
return list.stream()
151-
.collect(new ParallelStreamCollector<>(
152-
mapper,
153-
ordered(),
154-
emptySet(),
155-
Dispatcher.from(executor, parallelism)));
156-
}
157-
else {
158-
return partitioned(list, parallelism)
159-
.collect(collectingAndThen(new ParallelStreamCollector<>(
160-
batching(mapper),
161-
ordered(),
162-
emptySet(),
163-
Dispatcher.from(executor, parallelism)),
164-
s -> s.flatMap(Collection::stream)));
165-
}
166-
});
160+
toList(),
161+
list -> {
162+
// no sense to repack into batches of size 1
163+
if (list.size() == parallelism) {
164+
return list.stream()
165+
.collect(new ParallelStreamCollector<>(
166+
mapper,
167+
ordered(),
168+
emptySet(),
169+
Dispatcher.from(executor, parallelism)));
170+
} else {
171+
return partitioned(list, parallelism)
172+
.collect(collectingAndThen(new ParallelStreamCollector<>(
173+
batching(mapper),
174+
ordered(),
175+
emptySet(),
176+
Dispatcher.from(executor, parallelism)),
177+
s -> s.flatMap(Collection::stream)));
178+
}
179+
});
167180
}
168181

169182
private static <T, R> Collector<T, Stream.Builder<R>, Stream<R>> syncCollector(Function<T, R> mapper) {
170183
return Collector.of(Stream::builder, (rs, t) -> rs.add(mapper.apply(t)), (rs, rs2) -> {
171184
throw new UnsupportedOperationException(
172-
"Using parallel stream with parallel collectors is a bad idea");
185+
"Using parallel stream with parallel collectors is a bad idea");
173186
}, Stream.Builder::build);
174187
}
175-
176188
}
177-
178189
}

src/test/java/com/pivovarit/collectors/FunctionalTest.java

+4
Original file line numberDiff line numberDiff line change
@@ -64,9 +64,13 @@ Stream<DynamicTest> collectors() {
6464
return of(
6565
// virtual threads
6666
virtualThreadsTests((m, e, p) -> parallel(m, toList()), "ParallelCollectors.parallel(toList()) [virtual]", true),
67+
virtualThreadsTests((m, e, p) -> parallel(m, toList(), p), "ParallelCollectors.parallel(toList()) [virtual]", true),
6768
virtualThreadsTests((m, e, p) -> parallel(m, toSet()), "ParallelCollectors.parallel(toSet()) [virtual]", false),
69+
virtualThreadsTests((m, e, p) -> parallel(m, toSet(), p), "ParallelCollectors.parallel(toSet()) [virtual]", false),
6870
virtualThreadsTests((m, e, p) -> parallel(m, toCollection(LinkedList::new)), "ParallelCollectors.parallel(toCollection()) [virtual]", true),
71+
virtualThreadsTests((m, e, p) -> parallel(m, toCollection(LinkedList::new), p), "ParallelCollectors.parallel(toCollection()) [virtual]", true),
6972
virtualThreadsTests((m, e, p) -> adapt(parallel(m)), "ParallelCollectors.parallel() [virtual]", true),
73+
virtualThreadsTests((m, e, p) -> adapt(parallel(m, p)), "ParallelCollectors.parallel() [virtual]", true),
7074
// platform threads
7175
tests((m, e, p) -> parallel(m, toList(), e, p), format("ParallelCollectors.parallel(toList(), p=%d)", PARALLELISM), true),
7276
tests((m, e, p) -> parallel(m, toSet(), e, p), format("ParallelCollectors.parallel(toSet(), p=%d)", PARALLELISM), false),

0 commit comments

Comments
 (0)