21
21
import java .util .EnumSet ;
22
22
import java .util .Optional ;
23
23
import java .util .concurrent .CompletableFuture ;
24
+ import java .util .concurrent .ScheduledExecutorService ;
25
+ import java .util .concurrent .ScheduledFuture ;
26
+ import java .util .concurrent .TimeUnit ;
24
27
import lombok .extern .slf4j .Slf4j ;
25
28
import org .apache .bookkeeper .common .concurrent .FutureUtils ;
29
+ import org .apache .pulsar .common .util .Backoff ;
30
+ import org .apache .pulsar .common .util .BackoffBuilder ;
26
31
import org .apache .pulsar .common .util .FutureUtil ;
27
32
import org .apache .pulsar .metadata .api .GetResult ;
28
33
import org .apache .pulsar .metadata .api .MetadataSerde ;
@@ -44,7 +49,10 @@ public class ResourceLockImpl<T> implements ResourceLock<T> {
44
49
private long version ;
45
50
private final CompletableFuture <Void > expiredFuture ;
46
51
private boolean revalidateAfterReconnection = false ;
52
+ private final Backoff backoff ;
47
53
private final FutureUtil .Sequencer <Void > sequencer ;
54
+ private final ScheduledExecutorService executor ;
55
+ private ScheduledFuture <?> revalidateTask ;
48
56
49
57
private enum State {
50
58
Init ,
@@ -55,14 +63,20 @@ private enum State {
55
63
56
64
private State state ;
57
65
58
- public ResourceLockImpl (MetadataStoreExtended store , MetadataSerde <T > serde , String path ) {
66
+ ResourceLockImpl (MetadataStoreExtended store , MetadataSerde <T > serde , String path ,
67
+ ScheduledExecutorService executor ) {
59
68
this .store = store ;
60
69
this .serde = serde ;
61
70
this .path = path ;
62
71
this .version = -1 ;
63
72
this .expiredFuture = new CompletableFuture <>();
64
73
this .sequencer = FutureUtil .Sequencer .create ();
65
74
this .state = State .Init ;
75
+ this .executor = executor ;
76
+ this .backoff = new BackoffBuilder ()
77
+ .setInitialTime (100 , TimeUnit .MILLISECONDS )
78
+ .setMax (60 , TimeUnit .SECONDS )
79
+ .create ();
66
80
}
67
81
68
82
@ Override
@@ -93,6 +107,10 @@ public synchronized CompletableFuture<Void> release() {
93
107
}
94
108
95
109
state = State .Releasing ;
110
+ if (revalidateTask != null ) {
111
+ revalidateTask .cancel (true );
112
+ }
113
+
96
114
CompletableFuture <Void > result = new CompletableFuture <>();
97
115
98
116
store .delete (path , Optional .of (version ))
@@ -210,8 +228,15 @@ synchronized CompletableFuture<Void> revalidateIfNeededAfterReconnection() {
210
228
* This method is thread-safe and it will perform multiple re-validation operations in turn.
211
229
*/
212
230
synchronized CompletableFuture <Void > silentRevalidateOnce () {
231
+ if (state != State .Valid ) {
232
+ return CompletableFuture .completedFuture (null );
233
+ }
234
+
213
235
return sequencer .sequential (() -> revalidate (value ))
214
- .thenRun (() -> log .info ("Successfully revalidated the lock on {}" , path ))
236
+ .thenRun (() -> {
237
+ log .info ("Successfully revalidated the lock on {}" , path );
238
+ backoff .reset ();
239
+ })
215
240
.exceptionally (ex -> {
216
241
synchronized (ResourceLockImpl .this ) {
217
242
Throwable realCause = FutureUtil .unwrapCompletionException (ex );
@@ -225,8 +250,12 @@ synchronized CompletableFuture<Void> silentRevalidateOnce() {
225
250
// Continue assuming we hold the lock, until we can revalidate it, either
226
251
// on Reconnected or SessionReestablished events.
227
252
revalidateAfterReconnection = true ;
228
- log .warn ("Failed to revalidate the lock at {}. Retrying later on reconnection {}" , path ,
229
- realCause .getMessage ());
253
+
254
+ long delayMillis = backoff .next ();
255
+ log .warn ("Failed to revalidate the lock at {}: {} - Retrying in {} seconds" , path ,
256
+ realCause .getMessage (), delayMillis / 1000.0 );
257
+ revalidateTask =
258
+ executor .schedule (this ::silentRevalidateOnce , delayMillis , TimeUnit .MILLISECONDS );
230
259
}
231
260
}
232
261
return null ;
0 commit comments