diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java index 95882cfb21b3c8..40efa6390a78a5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java @@ -99,7 +99,10 @@ import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.stats.Metrics; +import org.apache.pulsar.common.util.Backoff; +import org.apache.pulsar.common.util.BackoffBuilder; import org.apache.pulsar.common.util.FutureUtil; +import org.apache.pulsar.metadata.api.MetadataStoreException; import org.apache.pulsar.metadata.api.coordination.LeaderElectionState; import org.slf4j.Logger; @@ -122,6 +125,10 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager, BrokerS public static final long COMPACTION_THRESHOLD = 5 * 1024 * 1024; + public static final int STARTUP_TIMEOUT_SECONDS = 30; + + public static final int MAX_RETRY = 5; + private static final String ELECTION_ROOT = "/loadbalance/extension/leader"; public static final Set INTERNAL_TOPICS = @@ -401,10 +408,43 @@ public void start() throws PulsarServerException { this.serviceUnitStateChannel.listen(splitManager); this.leaderElectionService.start(); pulsar.runWhenReadyForIncomingRequests(() -> { - try { - this.serviceUnitStateChannel.start(); - } catch (Exception e) { - failStarting(e); + Backoff backoff = new BackoffBuilder() + .setInitialTime(100, TimeUnit.MILLISECONDS) + .setMax(STARTUP_TIMEOUT_SECONDS, TimeUnit.SECONDS) + .create(); + int retry = 0; + while (!Thread.currentThread().isInterrupted()) { + try { + brokerRegistry.register(); + this.serviceUnitStateChannel.start(); + break; + } catch (Exception e) { + log.warn("The broker:{} failed to start service unit state channel. Retrying {} th ...", + pulsar.getBrokerId(), ++retry, e); + try { + Thread.sleep(backoff.next()); + } catch (InterruptedException ex) { + log.warn("Interrupted while sleeping."); + // preserve thread's interrupt status + Thread.currentThread().interrupt(); + try { + pulsar.close(); + } catch (PulsarServerException exc) { + log.error("Failed to close pulsar service.", exc); + } + return; + } + failStarting(e); + if (retry >= MAX_RETRY) { + log.error("Failed to start the service unit state channel after retry {} th. " + + "Closing pulsar service.", retry, e); + try { + pulsar.close(); + } catch (PulsarServerException ex) { + log.error("Failed to close pulsar service.", ex); + } + } + } } }); this.antiAffinityGroupPolicyHelper = @@ -498,8 +538,15 @@ private void failStarting(Exception ex) { this.brokerRegistry, ex); if (this.brokerRegistry != null) { try { - brokerRegistry.close(); - } catch (PulsarServerException e) { + brokerRegistry.unregister(); + } catch (MetadataStoreException e) { + // ignore + } + } + if (this.serviceUnitStateChannel != null) { + try { + serviceUnitStateChannel.close(); + } catch (IOException e) { // ignore } }