14
14
package com .baidu .bifromq .inbox .store ;
15
15
16
16
import static com .baidu .bifromq .inbox .util .KeyUtil .bufferMsgKey ;
17
+ import static com .baidu .bifromq .inbox .util .KeyUtil .hasInboxKeyPrefix ;
17
18
import static com .baidu .bifromq .inbox .util .KeyUtil .inboxKeyPrefix ;
18
19
import static com .baidu .bifromq .inbox .util .KeyUtil .inboxKeyUpperBound ;
19
20
import static com .baidu .bifromq .inbox .util .KeyUtil .isBufferMessageKey ;
20
21
import static com .baidu .bifromq .inbox .util .KeyUtil .isMetadataKey ;
21
22
import static com .baidu .bifromq .inbox .util .KeyUtil .isQoS0MessageKey ;
23
+ import static com .baidu .bifromq .inbox .util .KeyUtil .parseInboxKeyPrefix ;
22
24
import static com .baidu .bifromq .inbox .util .KeyUtil .parseSeq ;
23
25
import static com .baidu .bifromq .inbox .util .KeyUtil .parseTenantId ;
24
26
import static com .baidu .bifromq .inbox .util .KeyUtil .qos0InboxMsgKey ;
@@ -109,6 +111,7 @@ final class InboxStoreCoProc implements IKVRangeCoProc {
109
111
// make it configurable?
110
112
private static final int MAX_GC_BATCH_SIZE = 10000 ;
111
113
private static long initTime ;
114
+ private final KVRangeId id ;
112
115
private final ISettingProvider settingProvider ;
113
116
private final IEventCollector eventCollector ;
114
117
private final TenantsState tenantStates ;
@@ -120,12 +123,14 @@ final class InboxStoreCoProc implements IKVRangeCoProc {
120
123
ISettingProvider settingProvider ,
121
124
IEventCollector eventCollector ,
122
125
Supplier <IKVReader > rangeReaderProvider ) {
126
+ this .id = id ;
123
127
initTime = HLC .INST .getPhysical ();
124
128
this .settingProvider = settingProvider ;
125
129
this .eventCollector = eventCollector ;
126
130
this .rangeReaderProvider = rangeReaderProvider ;
127
131
this .tenantStates = new TenantsState (rangeReaderProvider .get (),
128
132
"clusterId" , clusterId , "storeId" , storeId , "rangeId" , KVRangeIdUtil .toString (id ));
133
+ log .debug ("Loading tenant states: rangeId={}" , KVRangeIdUtil .toString (id ));
129
134
load ();
130
135
}
131
136
@@ -212,6 +217,7 @@ public Supplier<RWCoProcOutput> mutate(RWCoProcInput input, IKVReader reader, IK
212
217
@ Override
213
218
public void reset (Boundary boundary ) {
214
219
tenantStates .reset ();
220
+ log .debug ("Reloading tenant states: rangeId={}" , KVRangeIdUtil .toString (id ));
215
221
load ();
216
222
}
217
223
@@ -987,17 +993,33 @@ private void commitToInbox(ByteString scopedInboxId,
987
993
private void load () {
988
994
IKVReader reader = rangeReaderProvider .get ();
989
995
IKVIterator itr = reader .iterator ();
996
+ int probe = 0 ;
990
997
for (itr .seekToFirst (); itr .isValid (); ) {
991
998
if (isMetadataKey (itr .key ())) {
999
+ probe = 0 ;
992
1000
try {
993
1001
tenantStates .upsert (parseTenantId (itr .key ()), InboxMetadata .parseFrom (itr .value ()));
994
1002
} catch (InvalidProtocolBufferException e ) {
995
1003
log .error ("Unexpected error" , e );
996
1004
} finally {
997
- itr .seek (inboxKeyUpperBound (itr .key ()));
1005
+ itr .next ();
1006
+ probe ++;
1007
+ }
1008
+ } else {
1009
+ if (probe < 20 ) {
1010
+ itr .next ();
1011
+ probe ++;
1012
+ } else {
1013
+ if (hasInboxKeyPrefix (itr .key ())) {
1014
+ itr .seek (inboxKeyUpperBound (parseInboxKeyPrefix (itr .key ())));
1015
+ } else {
1016
+ itr .next ();
1017
+ probe ++;
1018
+ }
998
1019
}
999
1020
}
1000
1021
}
1022
+ log .debug ("Tenant states loaded: rangeId={}" , KVRangeIdUtil .toString (id ));
1001
1023
}
1002
1024
1003
1025
private boolean hasExpired (InboxMetadata metadata , long nowTS ) {
0 commit comments