Skip to content

Commit

Permalink
Replace Dispatcher's internal ExecutorService with an inline virtual …
Browse files Browse the repository at this point in the history
…thread (#877)
  • Loading branch information
pivovarit authored May 2, 2024
1 parent 58b17c3 commit 9676de1
Show file tree
Hide file tree
Showing 2 changed files with 2 additions and 7 deletions.
7 changes: 1 addition & 6 deletions src/main/java/com/pivovarit/collectors/Dispatcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,8 @@ final class Dispatcher<T> {
private static final Runnable POISON_PILL = () -> System.out.println("Why so serious?");

private final CompletableFuture<Void> completionSignaller = new CompletableFuture<>();

private final BlockingQueue<Runnable> workingQueue = new LinkedBlockingQueue<>();

private final ExecutorService dispatcher = Executors.newVirtualThreadPerTaskExecutor();
private final Executor executor;
private final Semaphore limiter;

Expand Down Expand Up @@ -53,7 +51,7 @@ static <T> Dispatcher<T> virtual() {

void start() {
if (!started.getAndSet(true)) {
dispatcher.execute(() -> {
Thread.ofVirtual().start(() -> {
try {
while (true) {
try {
Expand Down Expand Up @@ -90,8 +88,6 @@ void stop() {
workingQueue.put(POISON_PILL);
} catch (InterruptedException e) {
completionSignaller.completeExceptionally(e);
} finally {
dispatcher.shutdown();
}
}

Expand Down Expand Up @@ -123,7 +119,6 @@ private FutureTask<Void> completionTask(Supplier<T> supplier, InterruptibleCompl
private void handle(Throwable e) {
shortCircuited = true;
completionSignaller.completeExceptionally(e);
dispatcher.shutdownNow();
}

private static Function<Throwable, Void> shortcircuit(InterruptibleCompletableFuture<?> future) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ final class FutureCollectors {
.collect(collector));

for (var f : list) {
f.whenComplete((t, throwable) -> {
f.whenComplete((__, throwable) -> {
if (throwable != null) {
future.completeExceptionally(throwable);
}
Expand Down

0 comments on commit 9676de1

Please sign in to comment.