Skip to content

Commit

Permalink
[improve][broker] Add retry for start service unit state channel (Ext…
Browse files Browse the repository at this point in the history
…ensibleLoadManagerImpl only) (#23230)
  • Loading branch information
Demogorgon314 authored Sep 4, 2024
1 parent a678e97 commit 8bb30a1
Showing 1 changed file with 53 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,10 @@
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.util.Backoff;
import org.apache.pulsar.common.util.BackoffBuilder;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.coordination.LeaderElectionState;
import org.slf4j.Logger;

Expand All @@ -122,6 +125,10 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager, BrokerS

public static final long COMPACTION_THRESHOLD = 5 * 1024 * 1024;

public static final int STARTUP_TIMEOUT_SECONDS = 30;

public static final int MAX_RETRY = 5;

private static final String ELECTION_ROOT = "/loadbalance/extension/leader";

public static final Set<String> INTERNAL_TOPICS =
Expand Down Expand Up @@ -401,10 +408,43 @@ public void start() throws PulsarServerException {
this.serviceUnitStateChannel.listen(splitManager);
this.leaderElectionService.start();
pulsar.runWhenReadyForIncomingRequests(() -> {
try {
this.serviceUnitStateChannel.start();
} catch (Exception e) {
failStarting(e);
Backoff backoff = new BackoffBuilder()
.setInitialTime(100, TimeUnit.MILLISECONDS)
.setMax(STARTUP_TIMEOUT_SECONDS, TimeUnit.SECONDS)
.create();
int retry = 0;
while (!Thread.currentThread().isInterrupted()) {
try {
brokerRegistry.register();
this.serviceUnitStateChannel.start();
break;
} catch (Exception e) {
log.warn("The broker:{} failed to start service unit state channel. Retrying {} th ...",
pulsar.getBrokerId(), ++retry, e);
try {
Thread.sleep(backoff.next());
} catch (InterruptedException ex) {
log.warn("Interrupted while sleeping.");
// preserve thread's interrupt status
Thread.currentThread().interrupt();
try {
pulsar.close();
} catch (PulsarServerException exc) {
log.error("Failed to close pulsar service.", exc);
}
return;
}
failStarting(e);
if (retry >= MAX_RETRY) {
log.error("Failed to start the service unit state channel after retry {} th. "
+ "Closing pulsar service.", retry, e);
try {
pulsar.close();
} catch (PulsarServerException ex) {
log.error("Failed to close pulsar service.", ex);
}
}
}
}
});
this.antiAffinityGroupPolicyHelper =
Expand Down Expand Up @@ -498,8 +538,15 @@ private void failStarting(Exception ex) {
this.brokerRegistry, ex);
if (this.brokerRegistry != null) {
try {
brokerRegistry.close();
} catch (PulsarServerException e) {
brokerRegistry.unregister();
} catch (MetadataStoreException e) {
// ignore
}
}
if (this.serviceUnitStateChannel != null) {
try {
serviceUnitStateChannel.close();
} catch (IOException e) {
// ignore
}
}
Expand Down

0 comments on commit 8bb30a1

Please sign in to comment.