Skip to content

Commit

Permalink
Simplify fold-left-futures (#208)
Browse files Browse the repository at this point in the history
* Simplify fold-left-futures

* Simplify error handling
  • Loading branch information
pivovarit authored Mar 25, 2019
1 parent 75d5396 commit c6f2d59
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;
import java.util.stream.Collectors;

import static java.util.Collections.synchronizedList;
import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.stream.Collectors.toCollection;

Expand Down Expand Up @@ -42,24 +44,9 @@ static <T1> T1 supplyWithResources(Supplier<T1> supplier, Runnable action) {
}

static <R, C extends Collection<R>> Function<List<CompletableFuture<R>>, CompletableFuture<C>> foldLeftFutures(Supplier<C> collectionFactory) {
return futures -> futures.stream()
.reduce(completedFuture(synchronizedList(new ArrayList<>())),
accumulatingResults(),
mergingPartialResults())
.thenApply(list -> list.stream().collect(toCollection(collectionFactory)));
}

static <T1, R1 extends Collection<T1>> BinaryOperator<CompletableFuture<R1>> mergingPartialResults() {
return (f1, f2) -> f1.thenCombine(f2, (left, right) -> {
left.addAll(right);
return left;
});
}

static <T1, R1 extends Collection<T1>> BiFunction<CompletableFuture<R1>, CompletableFuture<T1>, CompletableFuture<R1>> accumulatingResults() {
return (list, object) -> list.thenCombine(object, (left, right) -> {
left.add(right);
return left;
});
return futures -> allOf(futures.toArray(new CompletableFuture<?>[0]))
.thenApply(__ -> futures.stream()
.map(CompletableFuture::join)
.collect(toCollection(collectionFactory)));
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.pivovarit.collectors;

import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
Expand All @@ -14,11 +13,11 @@
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;
import java.util.stream.Collectors;

import static java.util.Collections.emptySet;
import static java.util.Collections.synchronizedList;
import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.stream.Collectors.toCollection;

/**
* @author Grzegorz Piwowarek
Expand Down Expand Up @@ -84,13 +83,11 @@ public void close() {
}

private static <R, C extends Collection<R>> Function<List<CompletableFuture<Entry<Integer, R>>>, CompletableFuture<C>> foldLeftFuturesOrdered(Supplier<C> collectionFactory) {
return futures -> futures.stream()
.reduce(completedFuture(synchronizedList(new ArrayList<>())),
accumulatingResults(),
mergingPartialResults())
.thenApply(list -> list.stream()
return futures -> allOf(futures.toArray(new CompletableFuture<?>[0]))
.thenApply(__ -> futures.stream()
.map(CompletableFuture::join)
.sorted(Comparator.comparing(Entry::getKey))
.map(Entry::getValue)
.collect(Collectors.toCollection(collectionFactory)));
.collect(toCollection(collectionFactory)));
}
}
9 changes: 4 additions & 5 deletions src/main/java/com/pivovarit/collectors/Dispatcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,19 +64,18 @@ CompletableFuture<T> enqueue(Supplier<T> supplier) {
future.complete(supplier.get());
pending.remove(future);
} catch (Exception e) {
handle(future, e);
handle(e);
} catch (Throwable e) {
handle(future, e);
handle(e);
throw e;
}
});
return future;
}

private void handle(CompletableFuture<T> future, Throwable e) {
private void handle(Throwable e) {
failed = true;
future.completeExceptionally(e);
pending.forEach(f -> f.obtrudeException(e));
pending.forEach(f -> f.completeExceptionally(e));
}

void run(Runnable task) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ private static <R extends Collection<Integer>> DynamicTest shouldShortCircuitOnE
.isInstanceOf(CompletionException.class)
.hasCauseExactlyInstanceOf(IllegalArgumentException.class);

assertThat(counter.longValue()).isLessThanOrEqualTo(size);
assertThat(counter.longValue()).isLessThanOrEqualTo(size + 1);
}, size);
});
}
Expand Down

0 comments on commit c6f2d59

Please sign in to comment.