From d6235782214f6485056fabc9a7e4b4ac85f13ff3 Mon Sep 17 00:00:00 2001 From: Grzegorz Piwowarek Date: Thu, 27 Feb 2020 21:47:59 +0100 Subject: [PATCH] Remove unwanted lazy semantics when parallel==1 (#462) --- .../collectors/AsyncParallelCollector.java | 8 +++++- .../collectors/ParallelCollectors.java | 12 +++++++++ .../collectors/blackbox/CountingExecutor.java | 25 ++++++++++++++++++ .../collectors/blackbox/FunctionalTest.java | 26 ++++++++++++++++++- 4 files changed, 69 insertions(+), 2 deletions(-) create mode 100644 src/test/java/com/pivovarit/collectors/blackbox/CountingExecutor.java diff --git a/src/main/java/com/pivovarit/collectors/AsyncParallelCollector.java b/src/main/java/com/pivovarit/collectors/AsyncParallelCollector.java index 08fc2ef9..794137d4 100644 --- a/src/main/java/com/pivovarit/collectors/AsyncParallelCollector.java +++ b/src/main/java/com/pivovarit/collectors/AsyncParallelCollector.java @@ -135,7 +135,13 @@ static void requireValidParallelism(int parallelism) { } static Collector> asyncCollector(Function mapper, Executor executor, Function, RR> finisher) { - return collectingAndThen(toList(), list -> supplyAsync(() -> finisher.apply(list.stream().map(mapper)), executor)); + return collectingAndThen(toList(), list -> supplyAsync(() -> { + List acc = new ArrayList<>(list.size()); + for (T t : list) { + acc.add(mapper.apply(t)); + } + return finisher.apply(acc.stream()); + }, executor)); } static final class BatchingCollectors { diff --git a/src/main/java/com/pivovarit/collectors/ParallelCollectors.java b/src/main/java/com/pivovarit/collectors/ParallelCollectors.java index 052b0a53..52968bdd 100644 --- a/src/main/java/com/pivovarit/collectors/ParallelCollectors.java +++ b/src/main/java/com/pivovarit/collectors/ParallelCollectors.java @@ -138,6 +138,8 @@ private ParallelCollectors() { * A convenience {@link Collector} used for executing parallel computations on a custom {@link Executor} * and returning a {@link Stream} instance returning results in completion order * + * For the parallelism of 1, the stream is executed by the calling thread. + * *

* The max parallelism level defaults to {@code Runtime.availableProcessors() - 1} * @@ -169,6 +171,8 @@ private ParallelCollectors() { * A convenience {@link Collector} used for executing parallel computations on a custom {@link Executor} * and returning a {@link Stream} instance returning results as they arrive. * + * For the parallelism of 1, the stream is executed by the calling thread. + * *
* Example: *
{@code
@@ -195,6 +199,8 @@ private ParallelCollectors() {
      * A convenience {@link Collector} used for executing parallel computations on a custom {@link Executor}
      * and returning a {@link Stream} instance returning results as they arrive while maintaining the initial order.
      *
+     * For the parallelism of 1, the stream is executed by the calling thread.
+     *
      * 

* The max parallelism level defaults to {@code Runtime.availableProcessors() - 1} * @@ -225,6 +231,8 @@ private ParallelCollectors() { * A convenience {@link Collector} used for executing parallel computations on a custom {@link Executor} * and returning a {@link Stream} instance returning results as they arrive while maintaining the initial order. * + * For the parallelism of 1, the stream is executed by the calling thread. + * *
* Example: *
{@code
@@ -350,6 +358,8 @@ private Batching() {
          * A convenience {@link Collector} used for executing parallel computations on a custom {@link Executor}
          * and returning a {@link Stream} instance returning results as they arrive.
          *
+         * For the parallelism of 1, the stream is executed by the calling thread.
+         *
          * 
* Example: *
{@code
@@ -376,6 +386,8 @@ private Batching() {
          * A convenience {@link Collector} used for executing parallel computations on a custom {@link Executor}
          * and returning a {@link Stream} instance returning results as they arrive while maintaining the initial order.
          *
+         * For the parallelism of 1, the stream is executed by the calling thread.
+         *
          * 
* Example: *
{@code
diff --git a/src/test/java/com/pivovarit/collectors/blackbox/CountingExecutor.java b/src/test/java/com/pivovarit/collectors/blackbox/CountingExecutor.java
new file mode 100644
index 00000000..7e872924
--- /dev/null
+++ b/src/test/java/com/pivovarit/collectors/blackbox/CountingExecutor.java
@@ -0,0 +1,25 @@
+package com.pivovarit.collectors.blackbox;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicInteger;
+
+class CountingExecutor implements Executor {
+
+    private final AtomicInteger counter = new AtomicInteger();
+
+    private final Executor delegate;
+
+    public CountingExecutor(Executor delegate) {
+        this.delegate = delegate;
+    }
+
+    @Override
+    public void execute(Runnable command) {
+        counter.incrementAndGet();
+        delegate.execute(command);
+    }
+
+    public Integer getInvocations() {
+        return counter.get();
+    }
+}
diff --git a/src/test/java/com/pivovarit/collectors/blackbox/FunctionalTest.java b/src/test/java/com/pivovarit/collectors/blackbox/FunctionalTest.java
index 9903075f..53f14fb8 100644
--- a/src/test/java/com/pivovarit/collectors/blackbox/FunctionalTest.java
+++ b/src/test/java/com/pivovarit/collectors/blackbox/FunctionalTest.java
@@ -1,4 +1,4 @@
-package com.pivovarit.collectors.test;
+package com.pivovarit.collectors.blackbox;
 
 import com.pivovarit.collectors.ParallelCollectors.Batching;
 import org.junit.jupiter.api.DynamicTest;
@@ -7,6 +7,7 @@
 
 import java.time.Duration;
 import java.time.LocalTime;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.LinkedList;
@@ -98,6 +99,29 @@ void shouldCollectInCompletionOrder() {
         assertThat(result).isSorted();
     }
 
+    @Test
+    void shouldExecuteEagerlyOnProvidedThreadPool() {
+        ExecutorService executor = Executors.newFixedThreadPool(2);
+        CountingExecutor countingExecutor = new CountingExecutor(executor);
+        AtomicInteger executions = new AtomicInteger();
+        try {
+            List list = Arrays.asList("A", "B");
+
+            Stream stream = list.stream()
+              .collect(parallel(s -> {
+                  System.out.println("Running on " + Thread.currentThread().getName());
+                  executions.incrementAndGet();
+                  return s;
+              }, countingExecutor, 1))
+              .join();
+        } finally {
+            executor.shutdown();
+        }
+
+        assertThat(countingExecutor.getInvocations()).isEqualTo(1);
+        assertThat(executions.get()).isEqualTo(2);
+    }
+
     private static > Stream tests(CollectorSupplier, Executor, Integer, Collector>> collector, String name, boolean maintainsOrder) {
         return of(
           shouldCollect(collector, name, 1),