Skip to content

Commit a4d0330

Browse files
authored
Ensure that Dispatcher can't be started more than one time (#632)
Just in case someone tries to call `.parallel()` on a _Stream_ instance returned by parallel collectors.
1 parent e4e348a commit a4d0330

File tree

1 file changed

+19
-16
lines changed

1 file changed

+19
-16
lines changed

src/main/java/com/pivovarit/collectors/Dispatcher.java

+19-16
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import java.util.concurrent.SynchronousQueue;
1212
import java.util.concurrent.ThreadPoolExecutor;
1313
import java.util.concurrent.TimeUnit;
14+
import java.util.concurrent.atomic.AtomicBoolean;
1415
import java.util.function.Function;
1516
import java.util.function.Supplier;
1617

@@ -31,7 +32,8 @@ final class Dispatcher<T> {
3132
private final Executor executor;
3233
private final Semaphore limiter;
3334

34-
private volatile boolean started = false;
35+
private final AtomicBoolean started = new AtomicBoolean(false);
36+
3537
private volatile boolean shortCircuited = false;
3638

3739
private Dispatcher(Executor executor, int permits) {
@@ -44,22 +46,23 @@ static <T> Dispatcher<T> of(Executor executor, int permits) {
4446
}
4547

4648
void start() {
47-
started = true;
48-
dispatcher.execute(() -> {
49-
try {
50-
while (true) {
51-
Runnable task;
52-
if ((task = workingQueue.take()) != POISON_PILL) {
53-
limiter.acquire();
54-
executor.execute(withFinally(task, limiter::release));
55-
} else {
56-
break;
49+
if (!started.getAndSet(true)) {
50+
dispatcher.execute(() -> {
51+
try {
52+
while (true) {
53+
Runnable task;
54+
if ((task = workingQueue.take()) != POISON_PILL) {
55+
limiter.acquire();
56+
executor.execute(withFinally(task, limiter::release));
57+
} else {
58+
break;
59+
}
5760
}
61+
} catch (Throwable e) {
62+
handle(e);
5863
}
59-
} catch (Throwable e) {
60-
handle(e);
61-
}
62-
});
64+
});
65+
}
6366
}
6467

6568
void stop() {
@@ -73,7 +76,7 @@ void stop() {
7376
}
7477

7578
boolean isRunning() {
76-
return started;
79+
return started.get();
7780
}
7881

7982
CompletableFuture<T> enqueue(Supplier<T> supplier) {

0 commit comments

Comments
 (0)