Skip to content

Commit

Permalink
Limit parallelism at thread-level instead of dispatcher-level (#784)
Browse files Browse the repository at this point in the history
  • Loading branch information
pivovarit authored Oct 9, 2023
1 parent 04ce124 commit b6a029a
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ private static <T> CompletableFuture<Stream<T>> combine(List<CompletableFuture<T

return parallelism == 1
? asyncCollector(mapper, executor, i -> i)
: new AsyncParallelCollector<>(mapper, Dispatcher.of(executor, parallelism), t -> t);
: new AsyncParallelCollector<>(mapper, Dispatcher.from(executor, parallelism), Function.identity());
}

static <T, R, RR> Collector<T, ?, CompletableFuture<RR>> collectingWithCollector(Collector<R, ?, RR> collector, Function<T, R> mapper, Executor executor, int parallelism) {
Expand All @@ -110,7 +110,7 @@ private static <T> CompletableFuture<Stream<T>> combine(List<CompletableFuture<T

return parallelism == 1
? asyncCollector(mapper, executor, s -> s.collect(collector))
: new AsyncParallelCollector<>(mapper, Dispatcher.of(executor, parallelism), s -> s.collect(collector));
: new AsyncParallelCollector<>(mapper, Dispatcher.from(executor, parallelism), s -> s.collect(collector));
}

static void requireValidParallelism(int parallelism) {
Expand Down Expand Up @@ -166,13 +166,13 @@ private BatchingCollectors() {
return list.stream()
.collect(new AsyncParallelCollector<>(
mapper,
Dispatcher.of(executor, parallelism),
Dispatcher.from(executor, parallelism),
finisher));
} else {
return partitioned(list, parallelism)
.collect(new AsyncParallelCollector<>(
batching(mapper),
Dispatcher.of(executor, parallelism),
Dispatcher.from(executor, parallelism),
listStream -> finisher.apply(listStream.flatMap(Collection::stream))));
}
});
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.pivovarit.collectors;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Spliterator;
import java.util.function.Consumer;
Expand Down
33 changes: 18 additions & 15 deletions src/main/java/com/pivovarit/collectors/Dispatcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,19 +44,32 @@ private Dispatcher(Executor executor, int permits) {
this.limiter = new Semaphore(permits);
}

static <T> Dispatcher<T> of(Executor executor, int permits) {
static <T> Dispatcher<T> from(Executor executor, int permits) {
return new Dispatcher<>(executor, permits);
}

static <T> Dispatcher<T> virtual(int permits) {
return new Dispatcher<>(permits);
}

void start() {
if (!started.getAndSet(true)) {
dispatcher.execute(() -> {
try {
while (true) {
Runnable task;
if ((task = workingQueue.take()) != POISON_PILL) {
limiter.acquire();
executor.execute(withFinally(task, limiter::release));
executor.execute(() -> {
try {
limiter.acquire();
task.run();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
} finally {
limiter.release();
}
});
} else {
break;
}
Expand Down Expand Up @@ -117,20 +130,10 @@ private static Function<Throwable, Void> shortcircuit(InterruptibleCompletableFu
};
}

private static Runnable withFinally(Runnable task, Runnable finisher) {
return () -> {
try {
task.run();
} finally {
finisher.run();
}
};
}

private static ThreadPoolExecutor newLazySingleThreadExecutor() {
return new ThreadPoolExecutor(0, 1,
return new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(),
new SynchronousQueue<>(), // dispatcher always executes a single task
Thread.ofPlatform()
.name("parallel-collectors-dispatcher-", 0)
.daemon(false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public Set<Characteristics> characteristics() {
requireNonNull(mapper, "mapper can't be null");
requireValidParallelism(parallelism);

return new ParallelStreamCollector<>(mapper, unordered(), UNORDERED, Dispatcher.of(executor, parallelism));
return new ParallelStreamCollector<>(mapper, unordered(), UNORDERED, Dispatcher.from(executor, parallelism));
}

static <T, R> Collector<T, ?, Stream<R>> streamingOrdered(Function<T, R> mapper, Executor executor,
Expand All @@ -98,7 +98,7 @@ public Set<Characteristics> characteristics() {
requireNonNull(mapper, "mapper can't be null");
requireValidParallelism(parallelism);

return new ParallelStreamCollector<>(mapper, ordered(), emptySet(), Dispatcher.of(executor, parallelism));
return new ParallelStreamCollector<>(mapper, ordered(), emptySet(), Dispatcher.from(executor, parallelism));
}

static final class BatchingCollectors {
Expand Down Expand Up @@ -140,15 +140,15 @@ private BatchingCollectors() {
mapper,
ordered(),
emptySet(),
Dispatcher.of(executor, parallelism)));
Dispatcher.from(executor, parallelism)));
}
else {
return partitioned(list, parallelism)
.collect(collectingAndThen(new ParallelStreamCollector<>(
batching(mapper),
ordered(),
emptySet(),
Dispatcher.of(executor, parallelism)),
Dispatcher.from(executor, parallelism)),
s -> s.flatMap(Collection::stream)));
}
});
Expand Down

0 comments on commit b6a029a

Please sign in to comment.