Skip to content

Commit

Permalink
Use List<> instead of Stream.Builder<> as accumulators
Browse files Browse the repository at this point in the history
  • Loading branch information
pivovarit committed Sep 18, 2021
1 parent 83215b0 commit 0340bbc
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 37 deletions.
20 changes: 11 additions & 9 deletions src/main/java/com/pivovarit/collectors/AsyncParallelCollector.java
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package com.pivovarit.collectors;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
Expand All @@ -26,7 +28,7 @@
* @author Grzegorz Piwowarek
*/
final class AsyncParallelCollector<T, R, C>
implements Collector<T, Stream.Builder<CompletableFuture<R>>, CompletableFuture<C>> {
implements Collector<T, List<CompletableFuture<R>>, CompletableFuture<C>> {

private final Dispatcher<R> dispatcher;
private final Function<T, R> mapper;
Expand All @@ -42,19 +44,19 @@ private AsyncParallelCollector(
}

@Override
public Supplier<Stream.Builder<CompletableFuture<R>>> supplier() {
return Stream::builder;
public Supplier<List<CompletableFuture<R>>> supplier() {
return ArrayList::new;
}

@Override
public BinaryOperator<Stream.Builder<CompletableFuture<R>>> combiner() {
public BinaryOperator<List<CompletableFuture<R>>> combiner() {
return (left, right) -> {
throw new UnsupportedOperationException("Using parallel stream with parallel collectors is a bad idea");
};
}

@Override
public BiConsumer<Stream.Builder<CompletableFuture<R>>, T> accumulator() {
public BiConsumer<List<CompletableFuture<R>>, T> accumulator() {
return (acc, e) -> {
if (!dispatcher.isRunning()) {
dispatcher.start();
Expand All @@ -64,11 +66,11 @@ public BiConsumer<Stream.Builder<CompletableFuture<R>>, T> accumulator() {
}

@Override
public Function<Stream.Builder<CompletableFuture<R>>, CompletableFuture<C>> finisher() {
public Function<List<CompletableFuture<R>>, CompletableFuture<C>> finisher() {
return futures -> {
dispatcher.stop();

return combine(futures.build()).thenApply(processor);
return combine(futures).thenApply(processor);
};
}

Expand All @@ -77,8 +79,8 @@ public Set<Characteristics> characteristics() {
return Collections.emptySet();
}

private static <T> CompletableFuture<Stream<T>> combine(Stream<CompletableFuture<T>> futures) {
CompletableFuture<T>[] futuresArray = (CompletableFuture<T>[]) futures.toArray(CompletableFuture[]::new);
private static <T> CompletableFuture<Stream<T>> combine(List<CompletableFuture<T>> futures) {
CompletableFuture<T>[] futuresArray = (CompletableFuture<T>[]) futures.toArray(new CompletableFuture[0]);
CompletableFuture<Stream<T>> combined = allOf(futuresArray)
.thenApply(__ -> Arrays.stream(futuresArray).map(CompletableFuture::join));

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.pivovarit.collectors;

import java.util.List;
import java.util.Spliterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
Expand All @@ -17,15 +18,10 @@ final class CompletionOrderSpliterator<T> implements Spliterator<T> {
private final BlockingQueue<CompletableFuture<T>> completed = new LinkedBlockingQueue<>();
private int remaining;

CompletionOrderSpliterator(Stream<CompletableFuture<T>> futures) {
AtomicInteger size = new AtomicInteger();
futures.forEach(f -> {
f.whenComplete((__, ___) -> completed.add(f));
size.incrementAndGet();
});

this.initialSize = size.get();
CompletionOrderSpliterator(List<CompletableFuture<T>> futures) {
this.initialSize = futures.size();
this.remaining = initialSize;
futures.forEach(f -> f.whenComplete((__, ___) -> completed.add(f)));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
package com.pivovarit.collectors;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

interface CompletionStrategy<T> extends Function<Stream<CompletableFuture<T>>, Stream<T>> {
interface CompletionStrategy<T> extends Function<List<CompletableFuture<T>>, Stream<T>> {

static <R> CompletionStrategy<R> unordered() {
return futures -> StreamSupport.stream(new CompletionOrderSpliterator<>(futures), false);
}

static <R> CompletionStrategy<R> ordered() {
return futures -> futures.map(CompletableFuture::join);
return futures -> futures.stream().map(CompletableFuture::join);
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package com.pivovarit.collectors;

import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
Expand All @@ -26,7 +28,7 @@
/**
* @author Grzegorz Piwowarek
*/
class ParallelStreamCollector<T, R> implements Collector<T, Stream.Builder<CompletableFuture<R>>, Stream<R>> {
class ParallelStreamCollector<T, R> implements Collector<T, List<CompletableFuture<R>>, Stream<R>> {

private static final EnumSet<Characteristics> UNORDERED = EnumSet.of(Characteristics.UNORDERED);

Expand All @@ -53,30 +55,30 @@ private void startConsuming() {
}

@Override
public Supplier<Stream.Builder<CompletableFuture<R>>> supplier() {
return Stream::builder;
public Supplier<List<CompletableFuture<R>>> supplier() {
return ArrayList::new;
}

@Override
public BiConsumer<Stream.Builder<CompletableFuture<R>>, T> accumulator() {
public BiConsumer<List<CompletableFuture<R>>, T> accumulator() {
return (acc, e) -> {
startConsuming();
acc.add(dispatcher.enqueue(() -> function.apply(e)));
};
}

@Override
public BinaryOperator<Stream.Builder<CompletableFuture<R>>> combiner() {
public BinaryOperator<List<CompletableFuture<R>>> combiner() {
return (left, right) -> {
throw new UnsupportedOperationException("Using parallel stream with parallel collectors is a bad idea");
};
}

@Override
public Function<Stream.Builder<CompletableFuture<R>>, Stream<R>> finisher() {
public Function<List<CompletableFuture<R>>, Stream<R>> finisher() {
return acc -> {
dispatcher.stop();
return completionStrategy.apply(acc.build());
return completionStrategy.apply(acc);
};
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,22 +1,16 @@
package com.pivovarit.collectors;

import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Spliterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import static java.time.Duration.ofMillis;
Expand All @@ -42,7 +36,7 @@ void shouldTraverseInCompletionOrder() {
f2.complete(1);
});
List<Integer> results = StreamSupport.stream(
new CompletionOrderSpliterator<>(futures.stream()), false)
new CompletionOrderSpliterator<>(futures), false)
.collect(Collectors.toList());

assertThat(results).containsExactly(3, 2, 1);
Expand All @@ -63,7 +57,7 @@ void shouldPropagateException() {
f2.complete(1);
});
assertThatThrownBy(() -> StreamSupport.stream(
new CompletionOrderSpliterator<>(futures.stream()), false)
new CompletionOrderSpliterator<>(futures), false)
.collect(Collectors.toList()))
.isInstanceOf(CompletionException.class)
.hasCauseExactlyInstanceOf(RuntimeException.class);
Expand All @@ -83,7 +77,7 @@ void shouldStreamInCompletionOrder() {
List<CompletableFuture<Integer>> futures = asList(new CompletableFuture<>(), CompletableFuture
.completedFuture(value));

Optional<Integer> result = StreamSupport.stream(new CompletionOrderSpliterator<>(futures.stream()), false).findAny();
Optional<Integer> result = StreamSupport.stream(new CompletionOrderSpliterator<>(futures), false).findAny();

assertThat(result).contains(value);
}
Expand All @@ -92,7 +86,7 @@ void shouldStreamInCompletionOrder() {
void shouldNotConsumeOnEmpty() {
List<CompletableFuture<Integer>> futures = Collections.emptyList();

Spliterator<Integer> spliterator = new CompletionOrderSpliterator<>(futures.stream());
Spliterator<Integer> spliterator = new CompletionOrderSpliterator<>(futures);

ResultHolder<Integer> result = new ResultHolder<>();
boolean consumed = spliterator.tryAdvance(result);
Expand All @@ -104,7 +98,7 @@ void shouldNotConsumeOnEmpty() {
@Test
void shouldRestoreInterrupt() throws InterruptedException {
Thread executorThread = new Thread(() -> {
Spliterator<Integer> spliterator = new CompletionOrderSpliterator<>(Stream.of(new CompletableFuture<>()));
Spliterator<Integer> spliterator = new CompletionOrderSpliterator<>(Arrays.asList(new CompletableFuture<>()));
try {
spliterator.tryAdvance(i -> {});
} catch (Exception e) {
Expand Down

0 comments on commit 0340bbc

Please sign in to comment.