Skip to content

Commit

Permalink
[fix][broker] Execute the pending callbacks in order before ready for…
Browse files Browse the repository at this point in the history
… incoming requests (apache#23266)
  • Loading branch information
BewareMyPower authored and michalcukierman committed Sep 11, 2024
1 parent 6b8cdcd commit f874c9c
Showing 1 changed file with 23 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
private final DefaultMonotonicSnapshotClock monotonicSnapshotClock;
private String brokerId;
private final CompletableFuture<Void> readyForIncomingRequestsFuture = new CompletableFuture<>();
private final List<Runnable> pendingTasksBeforeReadyForIncomingRequests = new ArrayList<>();

public enum State {
Init, Started, Closing, Closed
Expand Down Expand Up @@ -1023,7 +1024,13 @@ public void start() throws PulsarServerException {
this.metricsGenerator = new MetricsGenerator(this);

// the broker is ready to accept incoming requests by Pulsar binary protocol and http/https
readyForIncomingRequestsFuture.complete(null);
final List<Runnable> runnables;
synchronized (pendingTasksBeforeReadyForIncomingRequests) {
runnables = new ArrayList<>(pendingTasksBeforeReadyForIncomingRequests);
pendingTasksBeforeReadyForIncomingRequests.clear();
readyForIncomingRequestsFuture.complete(null);
}
runnables.forEach(Runnable::run);

// Initialize the message protocol handlers.
// start the protocol handlers only after the broker is ready,
Expand Down Expand Up @@ -1082,7 +1089,21 @@ public void start() throws PulsarServerException {
}

public void runWhenReadyForIncomingRequests(Runnable runnable) {
readyForIncomingRequestsFuture.thenRun(runnable);
// Here we don't call the thenRun() methods because CompletableFuture maintains a stack for pending callbacks,
// not a queue. Once the future is complete, the pending callbacks will be executed in reverse order of
// when they were added.
final boolean addedToPendingTasks;
synchronized (pendingTasksBeforeReadyForIncomingRequests) {
if (readyForIncomingRequestsFuture.isDone()) {
addedToPendingTasks = false;
} else {
pendingTasksBeforeReadyForIncomingRequests.add(runnable);
addedToPendingTasks = true;
}
}
if (!addedToPendingTasks) {
runnable.run();
}
}

public void waitUntilReadyForIncomingRequests() throws ExecutionException, InterruptedException {
Expand Down

0 comments on commit f874c9c

Please sign in to comment.