Skip to content

Commit b0542b3

Browse files
committed
[improve] Retry re-validating ResourceLock with backoff after errors (#22617)
1 parent 9242f33 commit b0542b3

File tree

28 files changed

+106
-25
lines changed

28 files changed

+106
-25
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,10 @@
3636
import org.apache.pulsar.client.api.Producer;
3737
import org.apache.pulsar.client.api.ProducerBuilder;
3838
import org.apache.pulsar.client.api.Schema;
39-
import org.apache.pulsar.client.impl.Backoff;
4039
import org.apache.pulsar.client.impl.ProducerImpl;
4140
import org.apache.pulsar.client.impl.PulsarClientImpl;
4241
import org.apache.pulsar.common.naming.TopicName;
42+
import org.apache.pulsar.common.util.Backoff;
4343
import org.apache.pulsar.common.util.FutureUtil;
4444
import org.slf4j.Logger;
4545
import org.slf4j.LoggerFactory;

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarMetadataEventSynchronizer.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,8 @@
3333
import org.apache.pulsar.client.api.Producer;
3434
import org.apache.pulsar.client.api.Schema;
3535
import org.apache.pulsar.client.api.SubscriptionType;
36-
import org.apache.pulsar.client.impl.Backoff;
3736
import org.apache.pulsar.client.impl.PulsarClientImpl;
37+
import org.apache.pulsar.common.util.Backoff;
3838
import org.apache.pulsar.common.util.FutureUtil;
3939
import org.apache.pulsar.metadata.api.MetadataEvent;
4040
import org.apache.pulsar.metadata.api.MetadataEventSynchronizer;

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,13 @@
2424
import java.util.concurrent.TimeUnit;
2525
import javax.annotation.Nonnull;
2626
import org.apache.pulsar.broker.service.BrokerServiceException.TopicPoliciesCacheNotInitException;
27-
import org.apache.pulsar.client.impl.Backoff;
28-
import org.apache.pulsar.client.impl.BackoffBuilder;
2927
import org.apache.pulsar.client.util.RetryUtil;
3028
import org.apache.pulsar.common.classification.InterfaceStability;
3129
import org.apache.pulsar.common.naming.NamespaceBundle;
3230
import org.apache.pulsar.common.naming.TopicName;
3331
import org.apache.pulsar.common.policies.data.TopicPolicies;
32+
import org.apache.pulsar.common.util.Backoff;
33+
import org.apache.pulsar.common.util.BackoffBuilder;
3434
import org.apache.pulsar.common.util.FutureUtil;
3535

3636
/**

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -68,11 +68,11 @@
6868
import org.apache.pulsar.broker.service.Subscription;
6969
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.Type;
7070
import org.apache.pulsar.broker.transaction.exception.buffer.TransactionBufferException;
71-
import org.apache.pulsar.client.impl.Backoff;
7271
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
7372
import org.apache.pulsar.common.api.proto.MessageMetadata;
7473
import org.apache.pulsar.common.policies.data.stats.TopicMetricBean;
7574
import org.apache.pulsar.common.protocol.Commands;
75+
import org.apache.pulsar.common.util.Backoff;
7676
import org.apache.pulsar.common.util.Codec;
7777
import org.apache.pulsar.common.util.FutureUtil;
7878
import org.slf4j.Logger;

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,8 @@
5050
import org.apache.pulsar.broker.service.Subscription;
5151
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.Type;
5252
import org.apache.pulsar.broker.transaction.exception.buffer.TransactionBufferException;
53-
import org.apache.pulsar.client.impl.Backoff;
5453
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
54+
import org.apache.pulsar.common.util.Backoff;
5555
import org.apache.pulsar.common.util.Codec;
5656
import org.slf4j.Logger;
5757
import org.slf4j.LoggerFactory;

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,6 @@
5555
import org.apache.pulsar.client.api.MessageId;
5656
import org.apache.pulsar.client.api.Producer;
5757
import org.apache.pulsar.client.api.PulsarClientException;
58-
import org.apache.pulsar.client.impl.Backoff;
5958
import org.apache.pulsar.client.impl.MessageImpl;
6059
import org.apache.pulsar.client.impl.ProducerImpl;
6160
import org.apache.pulsar.client.impl.PulsarClientImpl;
@@ -64,6 +63,7 @@
6463
import org.apache.pulsar.common.policies.data.stats.ReplicatorStatsImpl;
6564
import org.apache.pulsar.common.schema.SchemaInfo;
6665
import org.apache.pulsar.common.stats.Rate;
66+
import org.apache.pulsar.common.util.Backoff;
6767
import org.apache.pulsar.common.util.Codec;
6868
import org.slf4j.Logger;
6969
import org.slf4j.LoggerFactory;

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingEntryReader.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
import org.apache.bookkeeper.mledger.impl.PositionImpl;
3838
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
3939
import org.apache.pulsar.broker.transaction.exception.buffer.TransactionBufferException;
40-
import org.apache.pulsar.client.impl.Backoff;
40+
import org.apache.pulsar.common.util.Backoff;
4141

4242
/**
4343
* Entry reader that fulfill read request by streamline the read instead of reading with micro batch.

pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -59,11 +59,11 @@
5959
import org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider;
6060
import org.apache.pulsar.client.api.PulsarClientException;
6161
import org.apache.pulsar.client.api.transaction.TxnID;
62-
import org.apache.pulsar.client.impl.Backoff;
6362
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
6463
import org.apache.pulsar.common.policies.data.TransactionInPendingAckStats;
6564
import org.apache.pulsar.common.policies.data.TransactionPendingAckStats;
6665
import org.apache.pulsar.common.stats.PositionInPendingAckStats;
66+
import org.apache.pulsar.common.util.Backoff;
6767
import org.apache.pulsar.common.util.FutureUtil;
6868
import org.apache.pulsar.common.util.RecoverTimeRecord;
6969
import org.apache.pulsar.common.util.collections.BitSetRecyclable;

pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -50,10 +50,10 @@
5050
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper;
5151
import org.apache.pulsar.broker.resources.LocalPoliciesResources;
5252
import org.apache.pulsar.broker.resources.PulsarResources;
53-
import org.apache.pulsar.client.impl.Backoff;
5453
import org.apache.pulsar.common.policies.data.BundlesData;
5554
import org.apache.pulsar.common.policies.data.LocalPolicies;
5655
import org.apache.pulsar.common.policies.data.Policies;
56+
import org.apache.pulsar.common.util.Backoff;
5757
import org.apache.pulsar.metadata.api.Notification;
5858
import org.apache.pulsar.policies.data.loadbalancer.BundleData;
5959
import org.apache.pulsar.stats.CacheMetricsCollector;

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,8 @@
4545
import org.apache.pulsar.broker.systopic.SystemTopicClient;
4646
import org.apache.pulsar.client.admin.PulsarAdminException;
4747
import org.apache.pulsar.client.api.Schema;
48-
import org.apache.pulsar.client.impl.Backoff;
49-
import org.apache.pulsar.client.impl.BackoffBuilder;
48+
import org.apache.pulsar.common.util.Backoff;
49+
import org.apache.pulsar.common.util.BackoffBuilder;
5050
import org.apache.pulsar.common.events.PulsarEvent;
5151
import org.apache.pulsar.common.naming.NamespaceName;
5252
import org.apache.pulsar.common.naming.TopicName;

pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionHandlerTest.java

+2
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@
3232
import lombok.extern.slf4j.Slf4j;
3333
import org.apache.commons.lang3.reflect.FieldUtils;
3434
import org.apache.pulsar.client.api.ProducerConsumerBase;
35+
import org.apache.pulsar.common.util.Backoff;
36+
import org.apache.pulsar.common.util.BackoffBuilder;
3537
import org.apache.pulsar.common.util.FutureUtil;
3638
import org.awaitility.Awaitility;
3739
import org.awaitility.core.ConditionTimeoutException;

pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RetryUtilTest.java

+2
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
package org.apache.pulsar.client.impl;
2020

2121
import org.apache.pulsar.client.util.RetryUtil;
22+
import org.apache.pulsar.common.util.Backoff;
23+
import org.apache.pulsar.common.util.BackoffBuilder;
2224
import org.apache.pulsar.common.util.FutureUtil;
2325
import org.testng.annotations.Test;
2426

pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java

+2
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@
4545
import org.apache.pulsar.common.protocol.Commands;
4646
import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
4747
import org.apache.pulsar.common.schema.SchemaInfo;
48+
import org.apache.pulsar.common.util.Backoff;
49+
import org.apache.pulsar.common.util.BackoffBuilder;
4850
import org.apache.pulsar.common.util.FutureUtil;
4951
import org.slf4j.Logger;
5052
import org.slf4j.LoggerFactory;

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java

+1
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.apache.pulsar.client.api.PulsarClientException;
3030
import org.apache.pulsar.client.impl.HandlerState.State;
3131
import org.apache.pulsar.common.protocol.Commands;
32+
import org.apache.pulsar.common.util.Backoff;
3233
import org.slf4j.Logger;
3334
import org.slf4j.LoggerFactory;
3435

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java

+2
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,8 @@
117117
import org.apache.pulsar.common.protocol.Commands;
118118
import org.apache.pulsar.common.schema.SchemaInfo;
119119
import org.apache.pulsar.common.schema.SchemaType;
120+
import org.apache.pulsar.common.util.Backoff;
121+
import org.apache.pulsar.common.util.BackoffBuilder;
120122
import org.apache.pulsar.common.util.CompletableFutureCancellationHandler;
121123
import org.apache.pulsar.common.util.ExceptionHandler;
122124
import org.apache.pulsar.common.util.FutureUtil;

pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java

+2
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@
4343
import org.apache.pulsar.common.naming.NamespaceName;
4444
import org.apache.pulsar.common.naming.TopicName;
4545
import org.apache.pulsar.common.topics.TopicList;
46+
import org.apache.pulsar.common.util.Backoff;
47+
import org.apache.pulsar.common.util.BackoffBuilder;
4648
import org.apache.pulsar.common.util.FutureUtil;
4749
import org.slf4j.Logger;
4850
import org.slf4j.LoggerFactory;

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java

+1
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@
9191
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
9292
import org.apache.pulsar.common.schema.SchemaInfo;
9393
import org.apache.pulsar.common.schema.SchemaType;
94+
import org.apache.pulsar.common.util.BackoffBuilder;
9495
import org.apache.pulsar.common.util.DateFormatter;
9596
import org.apache.pulsar.common.util.FutureUtil;
9697
import org.apache.pulsar.common.util.RelativeTimeUtil;

pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java

+2
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,8 @@
8484
import org.apache.pulsar.common.schema.SchemaInfo;
8585
import org.apache.pulsar.common.schema.SchemaType;
8686
import org.apache.pulsar.common.topics.TopicList;
87+
import org.apache.pulsar.common.util.Backoff;
88+
import org.apache.pulsar.common.util.BackoffBuilder;
8789
import org.apache.pulsar.common.util.FutureUtil;
8890
import org.apache.pulsar.common.util.netty.EventLoopUtil;
8991
import org.slf4j.Logger;

pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java

+1
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.apache.pulsar.common.api.proto.CommandWatchTopicUpdate;
3232
import org.apache.pulsar.common.naming.NamespaceName;
3333
import org.apache.pulsar.common.protocol.Commands;
34+
import org.apache.pulsar.common.util.BackoffBuilder;
3435
import org.slf4j.Logger;
3536
import org.slf4j.LoggerFactory;
3637

pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java

+2
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@
4646
import org.apache.pulsar.common.api.proto.Subscription;
4747
import org.apache.pulsar.common.api.proto.TxnAction;
4848
import org.apache.pulsar.common.protocol.Commands;
49+
import org.apache.pulsar.common.util.Backoff;
50+
import org.apache.pulsar.common.util.BackoffBuilder;
4951
import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
5052
import org.slf4j.Logger;
5153
import org.slf4j.LoggerFactory;

pulsar-client/src/main/java/org/apache/pulsar/client/util/RetryUtil.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
import java.util.concurrent.ScheduledExecutorService;
2323
import java.util.concurrent.TimeUnit;
2424
import java.util.function.Supplier;
25-
import org.apache.pulsar.client.impl.Backoff;
25+
import org.apache.pulsar.common.util.Backoff;
2626
import org.slf4j.Logger;
2727
import org.slf4j.LoggerFactory;
2828

pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java

+1
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import org.apache.pulsar.client.impl.conf.TopicConsumerConfigurationData;
5050
import org.apache.pulsar.client.util.ExecutorProvider;
5151
import org.apache.pulsar.client.util.ScheduledExecutorProvider;
52+
import org.apache.pulsar.common.util.Backoff;
5253
import org.awaitility.Awaitility;
5354
import org.testng.Assert;
5455
import org.testng.annotations.AfterMethod;

pulsar-client/src/main/java/org/apache/pulsar/client/impl/Backoff.java renamed to pulsar-common/src/main/java/org/apache/pulsar/common/util/Backoff.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
* specific language governing permissions and limitations
1717
* under the License.
1818
*/
19-
package org.apache.pulsar.client.impl;
19+
package org.apache.pulsar.common.util;
2020

2121
import com.google.common.annotations.VisibleForTesting;
2222
import java.time.Clock;

pulsar-client/src/main/java/org/apache/pulsar/client/impl/BackoffBuilder.java renamed to pulsar-common/src/main/java/org/apache/pulsar/common/util/BackoffBuilder.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
* specific language governing permissions and limitations
1717
* under the License.
1818
*/
19-
package org.apache.pulsar.client.impl;
19+
package org.apache.pulsar.common.util;
2020

2121
import java.time.Clock;
2222
import java.util.concurrent.TimeUnit;
@@ -32,8 +32,11 @@ public class BackoffBuilder {
3232

3333
public BackoffBuilder() {
3434
this.initial = 0;
35+
this.unitInitial = TimeUnit.MILLISECONDS;
3536
this.max = 0;
37+
this.unitMax = TimeUnit.MILLISECONDS;
3638
this.mandatoryStop = 0;
39+
this.unitMandatoryStop = TimeUnit.MILLISECONDS;
3740
this.clock = Clock.systemDefaultZone();
3841
}
3942

pulsar-client/src/test/java/org/apache/pulsar/client/impl/BackoffTest.java renamed to pulsar-common/src/test/java/org/apache/pulsar/common/util/BackoffTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
* specific language governing permissions and limitations
1717
* under the License.
1818
*/
19-
package org.apache.pulsar.client.impl;
19+
package org.apache.pulsar.common.util;
2020

2121
import static org.testng.Assert.assertEquals;
2222
import static org.testng.Assert.assertFalse;

pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.java

+5-5
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
import java.util.concurrent.CompletableFuture;
2828
import java.util.concurrent.CompletionException;
2929
import java.util.concurrent.ConcurrentHashMap;
30-
import java.util.concurrent.ExecutorService;
30+
import java.util.concurrent.ScheduledExecutorService;
3131
import java.util.stream.Collectors;
3232
import lombok.extern.slf4j.Slf4j;
3333
import org.apache.pulsar.common.util.FutureUtil;
@@ -52,21 +52,21 @@ class LockManagerImpl<T> implements LockManager<T> {
5252
private final MetadataCache<T> cache;
5353
private final MetadataSerde<T> serde;
5454
private final FutureUtil.Sequencer<Void> sequencer;
55-
private final ExecutorService executor;
55+
private final ScheduledExecutorService executor;
5656

5757
private enum State {
5858
Ready, Closed
5959
}
6060

6161
private State state = State.Ready;
6262

63-
LockManagerImpl(MetadataStoreExtended store, Class<T> clazz, ExecutorService executor) {
63+
LockManagerImpl(MetadataStoreExtended store, Class<T> clazz, ScheduledExecutorService executor) {
6464
this(store, new JSONMetadataSerdeSimpleType<>(
6565
TypeFactory.defaultInstance().constructSimpleType(clazz, null)),
6666
executor);
6767
}
6868

69-
LockManagerImpl(MetadataStoreExtended store, MetadataSerde<T> serde, ExecutorService executor) {
69+
LockManagerImpl(MetadataStoreExtended store, MetadataSerde<T> serde, ScheduledExecutorService executor) {
7070
this.store = store;
7171
this.cache = store.getMetadataCache(serde);
7272
this.serde = serde;
@@ -83,7 +83,7 @@ public CompletableFuture<Optional<T>> readLock(String path) {
8383

8484
@Override
8585
public CompletableFuture<ResourceLock<T>> acquireLock(String path, T value) {
86-
ResourceLockImpl<T> lock = new ResourceLockImpl<>(store, serde, path);
86+
ResourceLockImpl<T> lock = new ResourceLockImpl<>(store, serde, path, executor);
8787

8888
CompletableFuture<ResourceLock<T>> result = new CompletableFuture<>();
8989
lock.acquire(value).thenRun(() -> {

pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java

+33-4
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,13 @@
2121
import java.util.EnumSet;
2222
import java.util.Optional;
2323
import java.util.concurrent.CompletableFuture;
24+
import java.util.concurrent.ScheduledExecutorService;
25+
import java.util.concurrent.ScheduledFuture;
26+
import java.util.concurrent.TimeUnit;
2427
import lombok.extern.slf4j.Slf4j;
2528
import org.apache.bookkeeper.common.concurrent.FutureUtils;
29+
import org.apache.pulsar.common.util.Backoff;
30+
import org.apache.pulsar.common.util.BackoffBuilder;
2631
import org.apache.pulsar.common.util.FutureUtil;
2732
import org.apache.pulsar.metadata.api.GetResult;
2833
import org.apache.pulsar.metadata.api.MetadataSerde;
@@ -44,7 +49,10 @@ public class ResourceLockImpl<T> implements ResourceLock<T> {
4449
private long version;
4550
private final CompletableFuture<Void> expiredFuture;
4651
private boolean revalidateAfterReconnection = false;
52+
private final Backoff backoff;
4753
private final FutureUtil.Sequencer<Void> sequencer;
54+
private final ScheduledExecutorService executor;
55+
private ScheduledFuture<?> revalidateTask;
4856

4957
private enum State {
5058
Init,
@@ -55,14 +63,20 @@ private enum State {
5563

5664
private State state;
5765

58-
public ResourceLockImpl(MetadataStoreExtended store, MetadataSerde<T> serde, String path) {
66+
ResourceLockImpl(MetadataStoreExtended store, MetadataSerde<T> serde, String path,
67+
ScheduledExecutorService executor) {
5968
this.store = store;
6069
this.serde = serde;
6170
this.path = path;
6271
this.version = -1;
6372
this.expiredFuture = new CompletableFuture<>();
6473
this.sequencer = FutureUtil.Sequencer.create();
6574
this.state = State.Init;
75+
this.executor = executor;
76+
this.backoff = new BackoffBuilder()
77+
.setInitialTime(100, TimeUnit.MILLISECONDS)
78+
.setMax(60, TimeUnit.SECONDS)
79+
.create();
6680
}
6781

6882
@Override
@@ -93,6 +107,10 @@ public synchronized CompletableFuture<Void> release() {
93107
}
94108

95109
state = State.Releasing;
110+
if (revalidateTask != null) {
111+
revalidateTask.cancel(true);
112+
}
113+
96114
CompletableFuture<Void> result = new CompletableFuture<>();
97115

98116
store.delete(path, Optional.of(version))
@@ -210,8 +228,15 @@ synchronized CompletableFuture<Void> revalidateIfNeededAfterReconnection() {
210228
* This method is thread-safe and it will perform multiple re-validation operations in turn.
211229
*/
212230
synchronized CompletableFuture<Void> silentRevalidateOnce() {
231+
if (state != State.Valid) {
232+
return CompletableFuture.completedFuture(null);
233+
}
234+
213235
return sequencer.sequential(() -> revalidate(value))
214-
.thenRun(() -> log.info("Successfully revalidated the lock on {}", path))
236+
.thenRun(() -> {
237+
log.info("Successfully revalidated the lock on {}", path);
238+
backoff.reset();
239+
})
215240
.exceptionally(ex -> {
216241
synchronized (ResourceLockImpl.this) {
217242
Throwable realCause = FutureUtil.unwrapCompletionException(ex);
@@ -225,8 +250,12 @@ synchronized CompletableFuture<Void> silentRevalidateOnce() {
225250
// Continue assuming we hold the lock, until we can revalidate it, either
226251
// on Reconnected or SessionReestablished events.
227252
revalidateAfterReconnection = true;
228-
log.warn("Failed to revalidate the lock at {}. Retrying later on reconnection {}", path,
229-
realCause.getMessage());
253+
254+
long delayMillis = backoff.next();
255+
log.warn("Failed to revalidate the lock at {}: {} - Retrying in {} seconds", path,
256+
realCause.getMessage(), delayMillis / 1000.0);
257+
revalidateTask =
258+
executor.schedule(this::silentRevalidateOnce, delayMillis, TimeUnit.MILLISECONDS);
230259
}
231260
}
232261
return null;

0 commit comments

Comments
 (0)