From 1e0dc81d3603866be2c22778cd4fb72b57966371 Mon Sep 17 00:00:00 2001 From: Demogorgon314 Date: Tue, 27 Aug 2024 19:53:58 +0800 Subject: [PATCH 1/7] Add retry for start service unit state channel --- .../extensions/ExtensibleLoadManagerImpl.java | 42 ++++++++++++++++--- .../impl/ModularLoadManagerImpl.java | 1 + 2 files changed, 38 insertions(+), 5 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java index 95882cfb21b3c..8abdfe8cf1c34 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java @@ -122,6 +122,8 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager, BrokerS public static final long COMPACTION_THRESHOLD = 5 * 1024 * 1024; + public static final int MAX_RETRY = 5; + private static final String ELECTION_ROOT = "/loadbalance/extension/leader"; public static final Set INTERNAL_TOPICS = @@ -394,17 +396,40 @@ public void start() throws PulsarServerException { }); }); this.serviceUnitStateChannel = new ServiceUnitStateChannelImpl(pulsar); - this.brokerRegistry.start(); + this.splitManager = new SplitManager(splitCounter); this.unloadManager = new UnloadManager(unloadCounter, pulsar.getBrokerId()); this.serviceUnitStateChannel.listen(unloadManager); this.serviceUnitStateChannel.listen(splitManager); this.leaderElectionService.start(); pulsar.runWhenReadyForIncomingRequests(() -> { - try { - this.serviceUnitStateChannel.start(); - } catch (Exception e) { - failStarting(e); + int retry = 0; + while (!Thread.currentThread().isInterrupted()) { + try { + this.brokerRegistry.start(); + this.serviceUnitStateChannel.start(); + break; + } catch (Exception e) { + log.warn("The broker:{} failed to start service unit state channel. Retrying {} th ...", + pulsar.getBrokerId(), ++retry, e); + try { + TimeUnit.SECONDS.sleep(Math.min(retry * 10L, MAX_ROLE_CHANGE_RETRY_DELAY_IN_MILLIS)); + } catch (InterruptedException ex) { + log.warn("Interrupted while sleeping."); + // preserve thread's interrupt status + Thread.currentThread().interrupt(); + } + failStarting(e); + if (retry >= MAX_RETRY) { + log.error("Failed to start the service unit state channel after retry {} th. " + + "Closing pulsar service.", retry, e); + try { + pulsar.close(); + } catch (PulsarServerException ex) { + log.error("Failed to close pulsar service.", ex); + } + } + } } }); this.antiAffinityGroupPolicyHelper = @@ -503,6 +528,13 @@ private void failStarting(Exception ex) { // ignore } } + if (this.serviceUnitStateChannel != null) { + try { + serviceUnitStateChannel.close(); + } catch (IOException e) { + // ignore + } + } initWaiter.completeExceptionally(ex); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java index 141e020d7ca45..9c93e636df676 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java @@ -879,6 +879,7 @@ private void preallocateBundle(String bundle, String broker) { k -> ConcurrentOpenHashMap.>newBuilder() .build()); + Set myConcurrentSet = ConcurrentHashMap.newKeySet(); synchronized (namespaceToBundleRange) { namespaceToBundleRange.computeIfAbsent(namespaceName, k -> ConcurrentOpenHashSet.newBuilder().build()) From ebad246f51a516b67a5071fa598c17cef2b3d5dc Mon Sep 17 00:00:00 2001 From: Demogorgon314 Date: Tue, 27 Aug 2024 20:03:31 +0800 Subject: [PATCH 2/7] Fix test --- .../extensions/ExtensibleLoadManagerImpl.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java index 8abdfe8cf1c34..de78458b95eb4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java @@ -100,6 +100,7 @@ import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.stats.Metrics; import org.apache.pulsar.common.util.FutureUtil; +import org.apache.pulsar.metadata.api.MetadataStoreException; import org.apache.pulsar.metadata.api.coordination.LeaderElectionState; import org.slf4j.Logger; @@ -396,7 +397,7 @@ public void start() throws PulsarServerException { }); }); this.serviceUnitStateChannel = new ServiceUnitStateChannelImpl(pulsar); - + this.brokerRegistry.start(); this.splitManager = new SplitManager(splitCounter); this.unloadManager = new UnloadManager(unloadCounter, pulsar.getBrokerId()); this.serviceUnitStateChannel.listen(unloadManager); @@ -406,7 +407,7 @@ public void start() throws PulsarServerException { int retry = 0; while (!Thread.currentThread().isInterrupted()) { try { - this.brokerRegistry.start(); + brokerRegistry.register(); this.serviceUnitStateChannel.start(); break; } catch (Exception e) { @@ -523,8 +524,8 @@ private void failStarting(Exception ex) { this.brokerRegistry, ex); if (this.brokerRegistry != null) { try { - brokerRegistry.close(); - } catch (PulsarServerException e) { + brokerRegistry.unregister(); + } catch (MetadataStoreException e) { // ignore } } From 5989cce58876fcaaeccf034e1c0803666cc48308 Mon Sep 17 00:00:00 2001 From: Demogorgon314 Date: Wed, 28 Aug 2024 01:25:59 +0800 Subject: [PATCH 3/7] Remove unused change --- .../pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java | 1 - 1 file changed, 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java index 9c93e636df676..141e020d7ca45 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java @@ -879,7 +879,6 @@ private void preallocateBundle(String bundle, String broker) { k -> ConcurrentOpenHashMap.>newBuilder() .build()); - Set myConcurrentSet = ConcurrentHashMap.newKeySet(); synchronized (namespaceToBundleRange) { namespaceToBundleRange.computeIfAbsent(namespaceName, k -> ConcurrentOpenHashSet.newBuilder().build()) From 9d16db27f4cc2d4a05aca118af16a86df5afaa58 Mon Sep 17 00:00:00 2001 From: Demogorgon314 Date: Fri, 30 Aug 2024 16:55:21 +0800 Subject: [PATCH 4/7] Use Backoff for delay calculation --- .../loadbalance/extensions/ExtensibleLoadManagerImpl.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java index de78458b95eb4..1d3a26ca7d357 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java @@ -99,6 +99,8 @@ import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.stats.Metrics; +import org.apache.pulsar.common.util.Backoff; +import org.apache.pulsar.common.util.BackoffBuilder; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.metadata.api.MetadataStoreException; import org.apache.pulsar.metadata.api.coordination.LeaderElectionState; @@ -404,6 +406,10 @@ public void start() throws PulsarServerException { this.serviceUnitStateChannel.listen(splitManager); this.leaderElectionService.start(); pulsar.runWhenReadyForIncomingRequests(() -> { + Backoff backoff = new BackoffBuilder() + .setInitialTime(100, TimeUnit.MILLISECONDS) + .setMax(conf.getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS) + .create(); int retry = 0; while (!Thread.currentThread().isInterrupted()) { try { @@ -414,7 +420,7 @@ public void start() throws PulsarServerException { log.warn("The broker:{} failed to start service unit state channel. Retrying {} th ...", pulsar.getBrokerId(), ++retry, e); try { - TimeUnit.SECONDS.sleep(Math.min(retry * 10L, MAX_ROLE_CHANGE_RETRY_DELAY_IN_MILLIS)); + TimeUnit.SECONDS.sleep(backoff.next()); } catch (InterruptedException ex) { log.warn("Interrupted while sleeping."); // preserve thread's interrupt status From 2d4f7602a0ad01ffdba313cac64146052292893b Mon Sep 17 00:00:00 2001 From: Demogorgon314 Date: Fri, 30 Aug 2024 16:57:03 +0800 Subject: [PATCH 5/7] Use Thread.sleep --- .../loadbalance/extensions/ExtensibleLoadManagerImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java index 1d3a26ca7d357..400863447f378 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java @@ -420,7 +420,7 @@ public void start() throws PulsarServerException { log.warn("The broker:{} failed to start service unit state channel. Retrying {} th ...", pulsar.getBrokerId(), ++retry, e); try { - TimeUnit.SECONDS.sleep(backoff.next()); + Thread.sleep(backoff.next()); } catch (InterruptedException ex) { log.warn("Interrupted while sleeping."); // preserve thread's interrupt status From 16f9e0de2fdace02fb5daf93999fa4693e93ec34 Mon Sep 17 00:00:00 2001 From: Demogorgon314 Date: Mon, 2 Sep 2024 20:52:56 +0800 Subject: [PATCH 6/7] Add a new field STARTUP_TIMEOUT_SECONDS as the timeout --- .../loadbalance/extensions/ExtensibleLoadManagerImpl.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java index 400863447f378..34f82da569128 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java @@ -125,6 +125,8 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager, BrokerS public static final long COMPACTION_THRESHOLD = 5 * 1024 * 1024; + public static final int STARTUP_TIMEOUT_SECONDS = 30; + public static final int MAX_RETRY = 5; private static final String ELECTION_ROOT = "/loadbalance/extension/leader"; @@ -408,7 +410,7 @@ public void start() throws PulsarServerException { pulsar.runWhenReadyForIncomingRequests(() -> { Backoff backoff = new BackoffBuilder() .setInitialTime(100, TimeUnit.MILLISECONDS) - .setMax(conf.getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS) + .setMax(STARTUP_TIMEOUT_SECONDS, TimeUnit.SECONDS) .create(); int retry = 0; while (!Thread.currentThread().isInterrupted()) { From 1b0f2e6f926d507e2da218a49e83fb7fcaba2e0b Mon Sep 17 00:00:00 2001 From: Demogorgon314 Date: Tue, 3 Sep 2024 15:31:44 +0800 Subject: [PATCH 7/7] Close the pulsar service after thread been interrupted --- .../loadbalance/extensions/ExtensibleLoadManagerImpl.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java index 34f82da569128..40efa6390a78a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java @@ -427,6 +427,12 @@ public void start() throws PulsarServerException { log.warn("Interrupted while sleeping."); // preserve thread's interrupt status Thread.currentThread().interrupt(); + try { + pulsar.close(); + } catch (PulsarServerException exc) { + log.error("Failed to close pulsar service.", exc); + } + return; } failStarting(e); if (retry >= MAX_RETRY) {