Skip to content

Commit

Permalink
Remove unwanted lazy semantics when parallel==1 (#462)
Browse files Browse the repository at this point in the history
  • Loading branch information
pivovarit authored Feb 27, 2020
1 parent 21f73b8 commit d623578
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,13 @@ static void requireValidParallelism(int parallelism) {
}

static <T, R, RR> Collector<T, ?, CompletableFuture<RR>> asyncCollector(Function<T, R> mapper, Executor executor, Function<Stream<R>, RR> finisher) {
return collectingAndThen(toList(), list -> supplyAsync(() -> finisher.apply(list.stream().map(mapper)), executor));
return collectingAndThen(toList(), list -> supplyAsync(() -> {
List<R> acc = new ArrayList<>(list.size());
for (T t : list) {
acc.add(mapper.apply(t));
}
return finisher.apply(acc.stream());
}, executor));
}

static final class BatchingCollectors {
Expand Down
12 changes: 12 additions & 0 deletions src/main/java/com/pivovarit/collectors/ParallelCollectors.java
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,8 @@ private ParallelCollectors() {
* A convenience {@link Collector} used for executing parallel computations on a custom {@link Executor}
* and returning a {@link Stream} instance returning results in completion order
*
* For the parallelism of 1, the stream is executed by the calling thread.
*
* <br><br>
* The max parallelism level defaults to {@code Runtime.availableProcessors() - 1}
*
Expand Down Expand Up @@ -169,6 +171,8 @@ private ParallelCollectors() {
* A convenience {@link Collector} used for executing parallel computations on a custom {@link Executor}
* and returning a {@link Stream} instance returning results as they arrive.
*
* For the parallelism of 1, the stream is executed by the calling thread.
*
* <br>
* Example:
* <pre>{@code
Expand All @@ -195,6 +199,8 @@ private ParallelCollectors() {
* A convenience {@link Collector} used for executing parallel computations on a custom {@link Executor}
* and returning a {@link Stream} instance returning results as they arrive while maintaining the initial order.
*
* For the parallelism of 1, the stream is executed by the calling thread.
*
* <br><br>
* The max parallelism level defaults to {@code Runtime.availableProcessors() - 1}
*
Expand Down Expand Up @@ -225,6 +231,8 @@ private ParallelCollectors() {
* A convenience {@link Collector} used for executing parallel computations on a custom {@link Executor}
* and returning a {@link Stream} instance returning results as they arrive while maintaining the initial order.
*
* For the parallelism of 1, the stream is executed by the calling thread.
*
* <br>
* Example:
* <pre>{@code
Expand Down Expand Up @@ -350,6 +358,8 @@ private Batching() {
* A convenience {@link Collector} used for executing parallel computations on a custom {@link Executor}
* and returning a {@link Stream} instance returning results as they arrive.
*
* For the parallelism of 1, the stream is executed by the calling thread.
*
* <br>
* Example:
* <pre>{@code
Expand All @@ -376,6 +386,8 @@ private Batching() {
* A convenience {@link Collector} used for executing parallel computations on a custom {@link Executor}
* and returning a {@link Stream} instance returning results as they arrive while maintaining the initial order.
*
* For the parallelism of 1, the stream is executed by the calling thread.
*
* <br>
* Example:
* <pre>{@code
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package com.pivovarit.collectors.blackbox;

import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

class CountingExecutor implements Executor {

private final AtomicInteger counter = new AtomicInteger();

private final Executor delegate;

public CountingExecutor(Executor delegate) {
this.delegate = delegate;
}

@Override
public void execute(Runnable command) {
counter.incrementAndGet();
delegate.execute(command);
}

public Integer getInvocations() {
return counter.get();
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.pivovarit.collectors.test;
package com.pivovarit.collectors.blackbox;

import com.pivovarit.collectors.ParallelCollectors.Batching;
import org.junit.jupiter.api.DynamicTest;
Expand All @@ -7,6 +7,7 @@

import java.time.Duration;
import java.time.LocalTime;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedList;
Expand Down Expand Up @@ -98,6 +99,29 @@ void shouldCollectInCompletionOrder() {
assertThat(result).isSorted();
}

@Test
void shouldExecuteEagerlyOnProvidedThreadPool() {
ExecutorService executor = Executors.newFixedThreadPool(2);
CountingExecutor countingExecutor = new CountingExecutor(executor);
AtomicInteger executions = new AtomicInteger();
try {
List<String> list = Arrays.asList("A", "B");

Stream<String> stream = list.stream()
.collect(parallel(s -> {
System.out.println("Running on " + Thread.currentThread().getName());
executions.incrementAndGet();
return s;
}, countingExecutor, 1))
.join();
} finally {
executor.shutdown();
}

assertThat(countingExecutor.getInvocations()).isEqualTo(1);
assertThat(executions.get()).isEqualTo(2);
}

private static <R extends Collection<Integer>> Stream<DynamicTest> tests(CollectorSupplier<Function<Integer, Integer>, Executor, Integer, Collector<Integer, ?, CompletableFuture<R>>> collector, String name, boolean maintainsOrder) {
return of(
shouldCollect(collector, name, 1),
Expand Down

0 comments on commit d623578

Please sign in to comment.