Skip to content

Commit 47338f4

Browse files
authored
Add heartbeat handling to key iteration (#203)
* Add heartbeat detection to listing keys and history. In cases where the stream is purged as the client is making progress it could stall it as client is relying on numpending to signal out of the processing. By detecting a heartbeat, that means the server didn't have anything to send in the last 5s, providing a hint that we can use to signal that the operation is done. Introduce a heartbeat case to handle key iteration status updates. This ensures proper key fetching and stops the iteration appropriately upon receiving a heartbeat. * history for kv has the same issue - if values are purged in flight, the iteration may hang. Signed-off-by: Alberto Ricart <[email protected]> --------- Signed-off-by: Alberto Ricart <[email protected]>
1 parent 3d448c9 commit 47338f4

File tree

1 file changed

+26
-0
lines changed

1 file changed

+26
-0
lines changed

Diff for: kv/src/kv.ts

+26
Original file line numberDiff line numberDiff line change
@@ -797,6 +797,19 @@ export class Bucket implements KV {
797797
qi.push(fn);
798798
});
799799

800+
(async () => {
801+
for await (const s of iter.status()) {
802+
switch (s.type) {
803+
// if we get a heartbeat we got all the keys
804+
case "heartbeat":
805+
qi.push(() => {
806+
qi.stop();
807+
});
808+
break;
809+
}
810+
}
811+
})().then();
812+
800813
// if they break from the iterator stop the consumer
801814
qi.iterClosed.then(() => {
802815
iter.stop();
@@ -899,6 +912,19 @@ export class Bucket implements KV {
899912
},
900913
});
901914

915+
(async () => {
916+
for await (const s of iter.status()) {
917+
switch (s.type) {
918+
// if we get a heartbeat we got all the keys
919+
case "heartbeat":
920+
keys.push(() => {
921+
keys.stop();
922+
});
923+
break;
924+
}
925+
}
926+
})().then();
927+
902928
iter.closed().then(() => {
903929
keys.push(() => {
904930
keys.stop();

0 commit comments

Comments
 (0)