Skip to content

Commit 3951f2a

Browse files
committed
Seal CompletionStrategy
1 parent 7cf5087 commit 3951f2a

File tree

5 files changed

+28
-17
lines changed

5 files changed

+28
-17
lines changed

src/main/java/com/pivovarit/collectors/AsyncParallelCollector.java

-1
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
import java.util.Set;
88
import java.util.concurrent.CompletableFuture;
99
import java.util.concurrent.Executor;
10-
import java.util.concurrent.Executors;
1110
import java.util.function.BiConsumer;
1211
import java.util.function.BinaryOperator;
1312
import java.util.function.Function;

src/main/java/com/pivovarit/collectors/CompletionStrategy.java

+20-3
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,30 @@
66
import java.util.stream.Stream;
77
import java.util.stream.StreamSupport;
88

9-
interface CompletionStrategy<T> extends Function<List<CompletableFuture<T>>, Stream<T>> {
9+
sealed interface CompletionStrategy<T> extends Function<List<CompletableFuture<T>>, Stream<T>> permits CompletionStrategy.Unordered, CompletionStrategy.Ordered {
10+
11+
Unordered<?> UNORDERED = new Unordered<>();
12+
Ordered<?> ORDERED = new Ordered<>();
1013

1114
static <R> CompletionStrategy<R> unordered() {
12-
return futures -> StreamSupport.stream(new CompletionOrderSpliterator<>(futures), false);
15+
return (CompletionStrategy<R>) UNORDERED;
1316
}
1417

1518
static <R> CompletionStrategy<R> ordered() {
16-
return futures -> futures.stream().map(CompletableFuture::join);
19+
return (CompletionStrategy<R>) ORDERED;
20+
}
21+
22+
final class Unordered<T> implements CompletionStrategy<T> {
23+
@Override
24+
public Stream<T> apply(List<CompletableFuture<T>> futures) {
25+
return StreamSupport.stream(new CompletionOrderSpliterator<>(futures), false);
26+
}
27+
}
28+
29+
final class Ordered<T> implements CompletionStrategy<T> {
30+
@Override
31+
public Stream<T> apply(List<CompletableFuture<T>> futures) {
32+
return futures.stream().map(CompletableFuture::join);
33+
}
1734
}
1835
}

src/main/java/com/pivovarit/collectors/Dispatcher.java

+6-10
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,12 @@ private FutureTask<T> completionTask(Supplier<T> supplier, InterruptibleCompleta
5454
if (limiter == null) {
5555
future.complete(supplier.get());
5656
} else {
57-
withLimiter(supplier, future);
57+
try {
58+
limiter.acquire();
59+
future.complete(supplier.get());
60+
} finally {
61+
limiter.release();
62+
}
5863
}
5964
} catch (Throwable e) {
6065
completionSignaller.completeExceptionally(e);
@@ -65,15 +70,6 @@ private FutureTask<T> completionTask(Supplier<T> supplier, InterruptibleCompleta
6570
return task;
6671
}
6772

68-
private void withLimiter(Supplier<T> supplier, InterruptibleCompletableFuture<T> future) throws InterruptedException {
69-
try {
70-
limiter.acquire();
71-
future.complete(supplier.get());
72-
} finally {
73-
limiter.release();
74-
}
75-
}
76-
7773
private static <T> BiConsumer<T, Throwable> shortcircuit(InterruptibleCompletableFuture<?> future) {
7874
return (__, throwable) -> {
7975
if (throwable != null) {

src/main/java/com/pivovarit/collectors/FutureCollectors.java

+1-3
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,11 @@
1313
final class FutureCollectors {
1414
static <T, R> Collector<CompletableFuture<T>, ?, CompletableFuture<R>> toFuture(Collector<T, ?, R> collector) {
1515
return Collectors.collectingAndThen(toList(), list -> {
16-
CompletableFuture<R> future = CompletableFuture
17-
.allOf(list.toArray(new CompletableFuture[0]))
16+
var future = CompletableFuture.allOf(list.toArray(CompletableFuture[]::new))
1817
.thenApply(__ -> list.stream()
1918
.map(CompletableFuture::join)
2019
.collect(collector));
2120

22-
// CompletableFuture#allOf doesn't shortcircuit on exception so that requires manual handling
2321
for (CompletableFuture<T> f : list) {
2422
f.whenComplete((t, throwable) -> {
2523
if (throwable != null) {

src/test/java/com/pivovarit/collectors/ArchitectureTest.java

+1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ class ArchitectureTest {
1616
@ArchTest
1717
static final ArchRule shouldHaveSingleFacade = classes()
1818
.that().arePublic()
19+
.and().areNotNestedClasses()
1920
.should().haveSimpleName("ParallelCollectors").orShould().haveSimpleName("Batching")
2021
.andShould().haveOnlyFinalFields()
2122
.andShould().haveOnlyPrivateConstructors()

0 commit comments

Comments
 (0)