From b6a029a9ccbf03b546d98259fddad50d4d380455 Mon Sep 17 00:00:00 2001 From: Grzegorz Piwowarek Date: Mon, 9 Oct 2023 13:34:58 +0200 Subject: [PATCH] Limit parallelism at thread-level instead of dispatcher-level (#784) --- .../collectors/AsyncParallelCollector.java | 8 ++--- .../collectors/BatchingSpliterator.java | 1 - .../com/pivovarit/collectors/Dispatcher.java | 33 ++++++++++--------- .../collectors/ParallelStreamCollector.java | 8 ++--- 4 files changed, 26 insertions(+), 24 deletions(-) diff --git a/src/main/java/com/pivovarit/collectors/AsyncParallelCollector.java b/src/main/java/com/pivovarit/collectors/AsyncParallelCollector.java index 1937f73e..7411ab6d 100644 --- a/src/main/java/com/pivovarit/collectors/AsyncParallelCollector.java +++ b/src/main/java/com/pivovarit/collectors/AsyncParallelCollector.java @@ -99,7 +99,7 @@ private static CompletableFuture> combine(List i) - : new AsyncParallelCollector<>(mapper, Dispatcher.of(executor, parallelism), t -> t); + : new AsyncParallelCollector<>(mapper, Dispatcher.from(executor, parallelism), Function.identity()); } static Collector> collectingWithCollector(Collector collector, Function mapper, Executor executor, int parallelism) { @@ -110,7 +110,7 @@ private static CompletableFuture> combine(List s.collect(collector)) - : new AsyncParallelCollector<>(mapper, Dispatcher.of(executor, parallelism), s -> s.collect(collector)); + : new AsyncParallelCollector<>(mapper, Dispatcher.from(executor, parallelism), s -> s.collect(collector)); } static void requireValidParallelism(int parallelism) { @@ -166,13 +166,13 @@ private BatchingCollectors() { return list.stream() .collect(new AsyncParallelCollector<>( mapper, - Dispatcher.of(executor, parallelism), + Dispatcher.from(executor, parallelism), finisher)); } else { return partitioned(list, parallelism) .collect(new AsyncParallelCollector<>( batching(mapper), - Dispatcher.of(executor, parallelism), + Dispatcher.from(executor, parallelism), listStream -> finisher.apply(listStream.flatMap(Collection::stream)))); } }); diff --git a/src/main/java/com/pivovarit/collectors/BatchingSpliterator.java b/src/main/java/com/pivovarit/collectors/BatchingSpliterator.java index 7239f613..8910795c 100644 --- a/src/main/java/com/pivovarit/collectors/BatchingSpliterator.java +++ b/src/main/java/com/pivovarit/collectors/BatchingSpliterator.java @@ -1,7 +1,6 @@ package com.pivovarit.collectors; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.Spliterator; import java.util.function.Consumer; diff --git a/src/main/java/com/pivovarit/collectors/Dispatcher.java b/src/main/java/com/pivovarit/collectors/Dispatcher.java index 206d9163..6aaa0e43 100644 --- a/src/main/java/com/pivovarit/collectors/Dispatcher.java +++ b/src/main/java/com/pivovarit/collectors/Dispatcher.java @@ -44,10 +44,14 @@ private Dispatcher(Executor executor, int permits) { this.limiter = new Semaphore(permits); } - static Dispatcher of(Executor executor, int permits) { + static Dispatcher from(Executor executor, int permits) { return new Dispatcher<>(executor, permits); } + static Dispatcher virtual(int permits) { + return new Dispatcher<>(permits); + } + void start() { if (!started.getAndSet(true)) { dispatcher.execute(() -> { @@ -55,8 +59,17 @@ void start() { while (true) { Runnable task; if ((task = workingQueue.take()) != POISON_PILL) { - limiter.acquire(); - executor.execute(withFinally(task, limiter::release)); + executor.execute(() -> { + try { + limiter.acquire(); + task.run(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } finally { + limiter.release(); + } + }); } else { break; } @@ -117,20 +130,10 @@ private static Function shortcircuit(InterruptibleCompletableFu }; } - private static Runnable withFinally(Runnable task, Runnable finisher) { - return () -> { - try { - task.run(); - } finally { - finisher.run(); - } - }; - } - private static ThreadPoolExecutor newLazySingleThreadExecutor() { - return new ThreadPoolExecutor(0, 1, + return new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, - new LinkedBlockingQueue<>(), + new SynchronousQueue<>(), // dispatcher always executes a single task Thread.ofPlatform() .name("parallel-collectors-dispatcher-", 0) .daemon(false) diff --git a/src/main/java/com/pivovarit/collectors/ParallelStreamCollector.java b/src/main/java/com/pivovarit/collectors/ParallelStreamCollector.java index e0400713..24d99cab 100644 --- a/src/main/java/com/pivovarit/collectors/ParallelStreamCollector.java +++ b/src/main/java/com/pivovarit/collectors/ParallelStreamCollector.java @@ -89,7 +89,7 @@ public Set characteristics() { requireNonNull(mapper, "mapper can't be null"); requireValidParallelism(parallelism); - return new ParallelStreamCollector<>(mapper, unordered(), UNORDERED, Dispatcher.of(executor, parallelism)); + return new ParallelStreamCollector<>(mapper, unordered(), UNORDERED, Dispatcher.from(executor, parallelism)); } static Collector> streamingOrdered(Function mapper, Executor executor, @@ -98,7 +98,7 @@ public Set characteristics() { requireNonNull(mapper, "mapper can't be null"); requireValidParallelism(parallelism); - return new ParallelStreamCollector<>(mapper, ordered(), emptySet(), Dispatcher.of(executor, parallelism)); + return new ParallelStreamCollector<>(mapper, ordered(), emptySet(), Dispatcher.from(executor, parallelism)); } static final class BatchingCollectors { @@ -140,7 +140,7 @@ private BatchingCollectors() { mapper, ordered(), emptySet(), - Dispatcher.of(executor, parallelism))); + Dispatcher.from(executor, parallelism))); } else { return partitioned(list, parallelism) @@ -148,7 +148,7 @@ private BatchingCollectors() { batching(mapper), ordered(), emptySet(), - Dispatcher.of(executor, parallelism)), + Dispatcher.from(executor, parallelism)), s -> s.flatMap(Collection::stream))); } });