From 93baa8a6a35d25d396b69ac43e698f24743be8f1 Mon Sep 17 00:00:00 2001 From: Grzegorz Piwowarek Date: Thu, 27 Feb 2020 13:51:03 +0100 Subject: [PATCH 1/8] Reproduce the issue --- .../collectors/AsyncParallelCollector.java | 2 +- .../collectors/blackbox/CountingExecutor.java | 25 +++++++++++++++++ .../collectors/blackbox/FunctionalTest.java | 27 ++++++++++++++++++- 3 files changed, 52 insertions(+), 2 deletions(-) create mode 100644 src/test/java/com/pivovarit/collectors/blackbox/CountingExecutor.java diff --git a/src/main/java/com/pivovarit/collectors/AsyncParallelCollector.java b/src/main/java/com/pivovarit/collectors/AsyncParallelCollector.java index 08fc2ef9..65d37c04 100644 --- a/src/main/java/com/pivovarit/collectors/AsyncParallelCollector.java +++ b/src/main/java/com/pivovarit/collectors/AsyncParallelCollector.java @@ -135,7 +135,7 @@ static void requireValidParallelism(int parallelism) { } static Collector> asyncCollector(Function mapper, Executor executor, Function, RR> finisher) { - return collectingAndThen(toList(), list -> supplyAsync(() -> finisher.apply(list.stream().map(mapper)), executor)); + return collectingAndThen(toList(), list -> supplyAsync(() -> finisher.apply(list.stream().map(mapper).collect(toList()).stream()), executor)); } static final class BatchingCollectors { diff --git a/src/test/java/com/pivovarit/collectors/blackbox/CountingExecutor.java b/src/test/java/com/pivovarit/collectors/blackbox/CountingExecutor.java new file mode 100644 index 00000000..7e872924 --- /dev/null +++ b/src/test/java/com/pivovarit/collectors/blackbox/CountingExecutor.java @@ -0,0 +1,25 @@ +package com.pivovarit.collectors.blackbox; + +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicInteger; + +class CountingExecutor implements Executor { + + private final AtomicInteger counter = new AtomicInteger(); + + private final Executor delegate; + + public CountingExecutor(Executor delegate) { + this.delegate = delegate; + } + + @Override + public void execute(Runnable command) { + counter.incrementAndGet(); + delegate.execute(command); + } + + public Integer getInvocations() { + return counter.get(); + } +} diff --git a/src/test/java/com/pivovarit/collectors/blackbox/FunctionalTest.java b/src/test/java/com/pivovarit/collectors/blackbox/FunctionalTest.java index 9903075f..565aff01 100644 --- a/src/test/java/com/pivovarit/collectors/blackbox/FunctionalTest.java +++ b/src/test/java/com/pivovarit/collectors/blackbox/FunctionalTest.java @@ -1,12 +1,14 @@ -package com.pivovarit.collectors.test; +package com.pivovarit.collectors.blackbox; import com.pivovarit.collectors.ParallelCollectors.Batching; +import org.assertj.core.api.Assertions; import org.junit.jupiter.api.DynamicTest; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestFactory; import java.time.Duration; import java.time.LocalTime; +import java.util.Arrays; import java.util.Collection; import java.util.HashSet; import java.util.LinkedList; @@ -47,6 +49,7 @@ import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toSet; import static java.util.stream.Stream.of; +import static org.assertj.core.api.Assertions.*; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.awaitility.Awaitility.await; @@ -98,6 +101,28 @@ void shouldCollectInCompletionOrder() { assertThat(result).isSorted(); } + @Test + void shouldExecuteEagerlyOnProvidedThreadPool() { + ExecutorService executor = Executors.newFixedThreadPool(2); + CountingExecutor countingExecutor = new CountingExecutor(executor); + AtomicInteger executions = new AtomicInteger(); + try { + List list = Arrays.asList("A", "B"); + + list.stream() + .collect(parallel(s -> { + executions.incrementAndGet(); + return s; + }, countingExecutor, 1)) + .join(); + } finally { + executor.shutdown(); + } + + assertThat(countingExecutor.getInvocations()).isEqualTo(2); + assertThat(executions.get()).isEqualTo(2); + } + private static > Stream tests(CollectorSupplier, Executor, Integer, Collector>> collector, String name, boolean maintainsOrder) { return of( shouldCollect(collector, name, 1), From 85708cc0101eaa94692ac6c9e3c907fd89ed3ba9 Mon Sep 17 00:00:00 2001 From: Grzegorz Piwowarek Date: Thu, 27 Feb 2020 14:15:40 +0100 Subject: [PATCH 2/8] Fix issues --- .../collectors/AsyncParallelCollector.java | 2 ++ .../collectors/blackbox/FunctionalTest.java | 25 +++++++++++-------- 2 files changed, 17 insertions(+), 10 deletions(-) diff --git a/src/main/java/com/pivovarit/collectors/AsyncParallelCollector.java b/src/main/java/com/pivovarit/collectors/AsyncParallelCollector.java index 65d37c04..e2fadf5c 100644 --- a/src/main/java/com/pivovarit/collectors/AsyncParallelCollector.java +++ b/src/main/java/com/pivovarit/collectors/AsyncParallelCollector.java @@ -12,6 +12,7 @@ import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collector; +import java.util.stream.Collectors; import java.util.stream.Stream; import static com.pivovarit.collectors.BatchingStream.batching; @@ -21,6 +22,7 @@ import static java.util.Objects.requireNonNull; import static java.util.concurrent.CompletableFuture.allOf; import static java.util.concurrent.CompletableFuture.supplyAsync; +import static java.util.stream.Collectors.*; import static java.util.stream.Collectors.collectingAndThen; import static java.util.stream.Collectors.toList; diff --git a/src/test/java/com/pivovarit/collectors/blackbox/FunctionalTest.java b/src/test/java/com/pivovarit/collectors/blackbox/FunctionalTest.java index 565aff01..5dfdfaf2 100644 --- a/src/test/java/com/pivovarit/collectors/blackbox/FunctionalTest.java +++ b/src/test/java/com/pivovarit/collectors/blackbox/FunctionalTest.java @@ -1,7 +1,6 @@ package com.pivovarit.collectors.blackbox; import com.pivovarit.collectors.ParallelCollectors.Batching; -import org.assertj.core.api.Assertions; import org.junit.jupiter.api.DynamicTest; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestFactory; @@ -49,7 +48,6 @@ import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toSet; import static java.util.stream.Stream.of; -import static org.assertj.core.api.Assertions.*; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.awaitility.Awaitility.await; @@ -79,12 +77,18 @@ Stream collectors() { @TestFactory Stream batching_collectors() { return of( - batchTests((m, e, p) -> Batching.parallel(m, toList(), e, p), format("ParallelCollectors.Batching.parallel(toList(), p=%d)", PARALLELISM), true), - batchTests((m, e, p) -> Batching.parallel(m, toSet(), e, p), format("ParallelCollectors.Batching.parallel(toSet(), p=%d)", PARALLELISM), false), - batchTests((m, e, p) -> Batching.parallel(m, toCollection(LinkedList::new), e, p), format("ParallelCollectors.Batching.parallel(toCollection(), p=%d)", PARALLELISM), true), - batchTests((m, e, p) -> adapt(Batching.parallel(m, e, p)), format("ParallelCollectors.Batching.parallel(p=%d)", PARALLELISM), true), - batchTests((m, e, p) -> adaptAsync(Batching.parallelToStream(m, e, p)), format("ParallelCollectors.Batching.parallelToStream(p=%d)", PARALLELISM), false), - batchTests((m, e, p) -> adaptAsync(Batching.parallelToOrderedStream(m, e, p)), format("ParallelCollectors.Batching.parallelToOrderedStream(p=%d)", PARALLELISM), true) + batchTests((m, e, p) -> Batching + .parallel(m, toList(), e, p), format("ParallelCollectors.Batching.parallel(toList(), p=%d)", PARALLELISM), true), + batchTests((m, e, p) -> Batching + .parallel(m, toSet(), e, p), format("ParallelCollectors.Batching.parallel(toSet(), p=%d)", PARALLELISM), false), + batchTests((m, e, p) -> Batching + .parallel(m, toCollection(LinkedList::new), e, p), format("ParallelCollectors.Batching.parallel(toCollection(), p=%d)", PARALLELISM), true), + batchTests((m, e, p) -> adapt(Batching + .parallel(m, e, p)), format("ParallelCollectors.Batching.parallel(p=%d)", PARALLELISM), true), + batchTests((m, e, p) -> adaptAsync(Batching + .parallelToStream(m, e, p)), format("ParallelCollectors.Batching.parallelToStream(p=%d)", PARALLELISM), false), + batchTests((m, e, p) -> adaptAsync(Batching + .parallelToOrderedStream(m, e, p)), format("ParallelCollectors.Batching.parallelToOrderedStream(p=%d)", PARALLELISM), true) ).flatMap(identity()); } @@ -109,8 +113,9 @@ void shouldExecuteEagerlyOnProvidedThreadPool() { try { List list = Arrays.asList("A", "B"); - list.stream() + Stream stream = list.stream() .collect(parallel(s -> { + System.out.println("Running on " + Thread.currentThread().getName()); executions.incrementAndGet(); return s; }, countingExecutor, 1)) @@ -119,7 +124,7 @@ void shouldExecuteEagerlyOnProvidedThreadPool() { executor.shutdown(); } - assertThat(countingExecutor.getInvocations()).isEqualTo(2); + assertThat(countingExecutor.getInvocations()).isEqualTo(1); assertThat(executions.get()).isEqualTo(2); } From 7f0bfaea93e2db327d906369252787d6260723bb Mon Sep 17 00:00:00 2001 From: Grzegorz Piwowarek Date: Thu, 27 Feb 2020 15:09:25 +0100 Subject: [PATCH 3/8] Fix imports --- .../java/com/pivovarit/collectors/AsyncParallelCollector.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/main/java/com/pivovarit/collectors/AsyncParallelCollector.java b/src/main/java/com/pivovarit/collectors/AsyncParallelCollector.java index e2fadf5c..65d37c04 100644 --- a/src/main/java/com/pivovarit/collectors/AsyncParallelCollector.java +++ b/src/main/java/com/pivovarit/collectors/AsyncParallelCollector.java @@ -12,7 +12,6 @@ import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collector; -import java.util.stream.Collectors; import java.util.stream.Stream; import static com.pivovarit.collectors.BatchingStream.batching; @@ -22,7 +21,6 @@ import static java.util.Objects.requireNonNull; import static java.util.concurrent.CompletableFuture.allOf; import static java.util.concurrent.CompletableFuture.supplyAsync; -import static java.util.stream.Collectors.*; import static java.util.stream.Collectors.collectingAndThen; import static java.util.stream.Collectors.toList; From 909c5bf6b45c5142c4cbceb7880d59281e684cf1 Mon Sep 17 00:00:00 2001 From: Grzegorz Piwowarek Date: Thu, 27 Feb 2020 21:37:25 +0100 Subject: [PATCH 4/8] Add javadoc notes --- .../com/pivovarit/collectors/ParallelCollectors.java | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/src/main/java/com/pivovarit/collectors/ParallelCollectors.java b/src/main/java/com/pivovarit/collectors/ParallelCollectors.java index 052b0a53..1ad037c0 100644 --- a/src/main/java/com/pivovarit/collectors/ParallelCollectors.java +++ b/src/main/java/com/pivovarit/collectors/ParallelCollectors.java @@ -138,6 +138,8 @@ private ParallelCollectors() { * A convenience {@link Collector} used for executing parallel computations on a custom {@link Executor} * and returning a {@link Stream} instance returning results in completion order * + * For the parallelism of 1, it gets executed by the calling thread. + * *

* The max parallelism level defaults to {@code Runtime.availableProcessors() - 1} * @@ -169,6 +171,8 @@ private ParallelCollectors() { * A convenience {@link Collector} used for executing parallel computations on a custom {@link Executor} * and returning a {@link Stream} instance returning results as they arrive. * + * For the parallelism of 1, it gets executed by the calling thread. + * *
* Example: *
{@code
@@ -195,6 +199,8 @@ private ParallelCollectors() {
      * A convenience {@link Collector} used for executing parallel computations on a custom {@link Executor}
      * and returning a {@link Stream} instance returning results as they arrive while maintaining the initial order.
      *
+     * For the parallelism of 1, it gets executed by the calling thread.
+     *
      * 

* The max parallelism level defaults to {@code Runtime.availableProcessors() - 1} * @@ -225,6 +231,8 @@ private ParallelCollectors() { * A convenience {@link Collector} used for executing parallel computations on a custom {@link Executor} * and returning a {@link Stream} instance returning results as they arrive while maintaining the initial order. * + * For the parallelism of 1, it gets executed by the calling thread. + * *
* Example: *
{@code
@@ -350,6 +358,8 @@ private Batching() {
          * A convenience {@link Collector} used for executing parallel computations on a custom {@link Executor}
          * and returning a {@link Stream} instance returning results as they arrive.
          *
+         * For the parallelism of 1, it gets executed by the calling thread.
+         *
          * 
* Example: *
{@code
@@ -376,6 +386,8 @@ private Batching() {
          * A convenience {@link Collector} used for executing parallel computations on a custom {@link Executor}
          * and returning a {@link Stream} instance returning results as they arrive while maintaining the initial order.
          *
+         * For the parallelism of 1, it gets executed by the calling thread.
+         *
          * 
* Example: *
{@code

From 25465826dda86914039d03e590eefe11b54e6ad7 Mon Sep 17 00:00:00 2001
From: Grzegorz Piwowarek 
Date: Thu, 27 Feb 2020 21:38:10 +0100
Subject: [PATCH 5/8] Fix formatting

---
 .../collectors/blackbox/FunctionalTest.java    | 18 ++++++------------
 1 file changed, 6 insertions(+), 12 deletions(-)

diff --git a/src/test/java/com/pivovarit/collectors/blackbox/FunctionalTest.java b/src/test/java/com/pivovarit/collectors/blackbox/FunctionalTest.java
index 5dfdfaf2..53f14fb8 100644
--- a/src/test/java/com/pivovarit/collectors/blackbox/FunctionalTest.java
+++ b/src/test/java/com/pivovarit/collectors/blackbox/FunctionalTest.java
@@ -77,18 +77,12 @@ Stream collectors() {
     @TestFactory
     Stream batching_collectors() {
         return of(
-          batchTests((m, e, p) -> Batching
-            .parallel(m, toList(), e, p), format("ParallelCollectors.Batching.parallel(toList(), p=%d)", PARALLELISM), true),
-          batchTests((m, e, p) -> Batching
-            .parallel(m, toSet(), e, p), format("ParallelCollectors.Batching.parallel(toSet(), p=%d)", PARALLELISM), false),
-          batchTests((m, e, p) -> Batching
-            .parallel(m, toCollection(LinkedList::new), e, p), format("ParallelCollectors.Batching.parallel(toCollection(), p=%d)", PARALLELISM), true),
-          batchTests((m, e, p) -> adapt(Batching
-            .parallel(m, e, p)), format("ParallelCollectors.Batching.parallel(p=%d)", PARALLELISM), true),
-          batchTests((m, e, p) -> adaptAsync(Batching
-            .parallelToStream(m, e, p)), format("ParallelCollectors.Batching.parallelToStream(p=%d)", PARALLELISM), false),
-          batchTests((m, e, p) -> adaptAsync(Batching
-            .parallelToOrderedStream(m, e, p)), format("ParallelCollectors.Batching.parallelToOrderedStream(p=%d)", PARALLELISM), true)
+          batchTests((m, e, p) -> Batching.parallel(m, toList(), e, p), format("ParallelCollectors.Batching.parallel(toList(), p=%d)", PARALLELISM), true),
+          batchTests((m, e, p) -> Batching.parallel(m, toSet(), e, p), format("ParallelCollectors.Batching.parallel(toSet(), p=%d)", PARALLELISM), false),
+          batchTests((m, e, p) -> Batching.parallel(m, toCollection(LinkedList::new), e, p), format("ParallelCollectors.Batching.parallel(toCollection(), p=%d)", PARALLELISM), true),
+          batchTests((m, e, p) -> adapt(Batching.parallel(m, e, p)), format("ParallelCollectors.Batching.parallel(p=%d)", PARALLELISM), true),
+          batchTests((m, e, p) -> adaptAsync(Batching.parallelToStream(m, e, p)), format("ParallelCollectors.Batching.parallelToStream(p=%d)", PARALLELISM), false),
+          batchTests((m, e, p) -> adaptAsync(Batching.parallelToOrderedStream(m, e, p)), format("ParallelCollectors.Batching.parallelToOrderedStream(p=%d)", PARALLELISM), true)
         ).flatMap(identity());
     }
 

From 7577248c28b918aa9ab9954cbaf335f3ee3ba419 Mon Sep 17 00:00:00 2001
From: Grzegorz Piwowarek 
Date: Thu, 27 Feb 2020 21:41:53 +0100
Subject: [PATCH 6/8] Use a simple loop

---
 .../com/pivovarit/collectors/AsyncParallelCollector.java | 9 ++++++++-
 1 file changed, 8 insertions(+), 1 deletion(-)

diff --git a/src/main/java/com/pivovarit/collectors/AsyncParallelCollector.java b/src/main/java/com/pivovarit/collectors/AsyncParallelCollector.java
index 65d37c04..f53035e2 100644
--- a/src/main/java/com/pivovarit/collectors/AsyncParallelCollector.java
+++ b/src/main/java/com/pivovarit/collectors/AsyncParallelCollector.java
@@ -135,7 +135,14 @@ static void requireValidParallelism(int parallelism) {
     }
 
     static  Collector> asyncCollector(Function mapper, Executor executor, Function, RR> finisher) {
-        return collectingAndThen(toList(), list -> supplyAsync(() -> finisher.apply(list.stream().map(mapper).collect(toList()).stream()), executor));
+        return collectingAndThen(toList(), list -> supplyAsync(() -> {
+            List acc = new ArrayList<>(list.size());
+            for (T t : list) {
+                R r = mapper.apply(t);
+                acc.add(r);
+            }
+            return finisher.apply(acc.stream());
+        }, executor));
     }
 
     static final class BatchingCollectors {

From ff57825b05383410ff509341534a0e0c7a8b029a Mon Sep 17 00:00:00 2001
From: Grzegorz Piwowarek 
Date: Thu, 27 Feb 2020 21:44:40 +0100
Subject: [PATCH 7/8] Inline redundant var

---
 .../java/com/pivovarit/collectors/AsyncParallelCollector.java  | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/src/main/java/com/pivovarit/collectors/AsyncParallelCollector.java b/src/main/java/com/pivovarit/collectors/AsyncParallelCollector.java
index f53035e2..794137d4 100644
--- a/src/main/java/com/pivovarit/collectors/AsyncParallelCollector.java
+++ b/src/main/java/com/pivovarit/collectors/AsyncParallelCollector.java
@@ -138,8 +138,7 @@ static void requireValidParallelism(int parallelism) {
         return collectingAndThen(toList(), list -> supplyAsync(() -> {
             List acc = new ArrayList<>(list.size());
             for (T t : list) {
-                R r = mapper.apply(t);
-                acc.add(r);
+                acc.add(mapper.apply(t));
             }
             return finisher.apply(acc.stream());
         }, executor));

From f1d8e8a9a6aabf628f7a7327a831b476cada72ca Mon Sep 17 00:00:00 2001
From: Grzegorz Piwowarek 
Date: Thu, 27 Feb 2020 21:46:11 +0100
Subject: [PATCH 8/8] Javadoc fixes

---
 .../com/pivovarit/collectors/ParallelCollectors.java | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/src/main/java/com/pivovarit/collectors/ParallelCollectors.java b/src/main/java/com/pivovarit/collectors/ParallelCollectors.java
index 1ad037c0..52968bdd 100644
--- a/src/main/java/com/pivovarit/collectors/ParallelCollectors.java
+++ b/src/main/java/com/pivovarit/collectors/ParallelCollectors.java
@@ -138,7 +138,7 @@ private ParallelCollectors() {
      * A convenience {@link Collector} used for executing parallel computations on a custom {@link Executor}
      * and returning a {@link Stream} instance returning results in completion order
      *
-     * For the parallelism of 1, it gets executed by the calling thread.
+     * For the parallelism of 1, the stream is executed by the calling thread.
      *
      * 

* The max parallelism level defaults to {@code Runtime.availableProcessors() - 1} @@ -171,7 +171,7 @@ private ParallelCollectors() { * A convenience {@link Collector} used for executing parallel computations on a custom {@link Executor} * and returning a {@link Stream} instance returning results as they arrive. * - * For the parallelism of 1, it gets executed by the calling thread. + * For the parallelism of 1, the stream is executed by the calling thread. * *
* Example: @@ -199,7 +199,7 @@ private ParallelCollectors() { * A convenience {@link Collector} used for executing parallel computations on a custom {@link Executor} * and returning a {@link Stream} instance returning results as they arrive while maintaining the initial order. * - * For the parallelism of 1, it gets executed by the calling thread. + * For the parallelism of 1, the stream is executed by the calling thread. * *

* The max parallelism level defaults to {@code Runtime.availableProcessors() - 1} @@ -231,7 +231,7 @@ private ParallelCollectors() { * A convenience {@link Collector} used for executing parallel computations on a custom {@link Executor} * and returning a {@link Stream} instance returning results as they arrive while maintaining the initial order. * - * For the parallelism of 1, it gets executed by the calling thread. + * For the parallelism of 1, the stream is executed by the calling thread. * *
* Example: @@ -358,7 +358,7 @@ private Batching() { * A convenience {@link Collector} used for executing parallel computations on a custom {@link Executor} * and returning a {@link Stream} instance returning results as they arrive. * - * For the parallelism of 1, it gets executed by the calling thread. + * For the parallelism of 1, the stream is executed by the calling thread. * *
* Example: @@ -386,7 +386,7 @@ private Batching() { * A convenience {@link Collector} used for executing parallel computations on a custom {@link Executor} * and returning a {@link Stream} instance returning results as they arrive while maintaining the initial order. * - * For the parallelism of 1, it gets executed by the calling thread. + * For the parallelism of 1, the stream is executed by the calling thread. * *
* Example: