@@ -15,6 +15,7 @@ use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
1515use std:: sync:: { Arc , Mutex } ;
1616use std:: time:: Duration ;
1717
18+ use bdk_chain:: Merge ;
1819use bitcoin:: hashes:: { sha256, Hash , HashEngine , Hmac , HmacEngine } ;
1920use lightning:: io:: { self , Error , ErrorKind } ;
2021use lightning:: util:: persist:: { KVStore , KVStoreSync } ;
@@ -181,7 +182,7 @@ impl KVStoreSync for VssStore {
181182 }
182183
183184 fn remove (
184- & self , primary_namespace : & str , secondary_namespace : & str , key : & str , _lazy : bool ,
185+ & self , primary_namespace : & str , secondary_namespace : & str , key : & str , lazy : bool ,
185186 ) -> io:: Result < ( ) > {
186187 let internal_runtime = self . internal_runtime . as_ref ( ) . ok_or_else ( || {
187188 debug_assert ! ( false , "Failed to access internal runtime" ) ;
@@ -203,6 +204,7 @@ impl KVStoreSync for VssStore {
203204 primary_namespace,
204205 secondary_namespace,
205206 key,
207+ lazy,
206208 )
207209 . await
208210 } ;
@@ -275,7 +277,7 @@ impl KVStore for VssStore {
275277 } )
276278 }
277279 fn remove (
278- & self , primary_namespace : & str , secondary_namespace : & str , key : & str , _lazy : bool ,
280+ & self , primary_namespace : & str , secondary_namespace : & str , key : & str , lazy : bool ,
279281 ) -> Pin < Box < dyn Future < Output = Result < ( ) , io:: Error > > + Send > > {
280282 let locking_key = self . build_locking_key ( primary_namespace, secondary_namespace, key) ;
281283 let ( inner_lock_ref, version) = self . get_new_version_and_lock_ref ( locking_key. clone ( ) ) ;
@@ -292,6 +294,7 @@ impl KVStore for VssStore {
292294 primary_namespace,
293295 secondary_namespace,
294296 key,
297+ lazy,
295298 )
296299 . await
297300 } )
@@ -321,6 +324,7 @@ struct VssStoreInner {
321324 // Per-key locks that ensures that we don't have concurrent writes to the same namespace/key.
322325 // The lock also encapsulates the latest written version per key.
323326 locks : Mutex < HashMap < String , Arc < tokio:: sync:: Mutex < u64 > > > > ,
327+ pending_lazy_deletes : Mutex < Vec < KeyValue > > ,
324328}
325329
326330impl VssStoreInner {
@@ -347,7 +351,8 @@ impl VssStoreInner {
347351
348352 let client = VssClient :: new_with_headers ( base_url, retry_policy, header_provider) ;
349353 let locks = Mutex :: new ( HashMap :: new ( ) ) ;
350- Self { client, store_id, storable_builder, key_obfuscator, locks }
354+ let pending_lazy_deletes = Mutex :: new ( Vec :: new ( ) ) ;
355+ Self { client, store_id, storable_builder, key_obfuscator, locks, pending_lazy_deletes }
351356 }
352357
353358 fn get_inner_lock_ref ( & self , locking_key : String ) -> Arc < tokio:: sync:: Mutex < u64 > > {
@@ -451,6 +456,12 @@ impl VssStoreInner {
451456 "write" ,
452457 ) ?;
453458
459+ let delete_items = self
460+ . pending_lazy_deletes
461+ . try_lock ( )
462+ . ok ( )
463+ . and_then ( |mut guard| guard. take ( ) )
464+ . unwrap_or_default ( ) ;
454465 self . execute_locked_write ( inner_lock_ref, locking_key, version, async move || {
455466 let obfuscated_key =
456467 self . build_obfuscated_key ( & primary_namespace, & secondary_namespace, & key) ;
@@ -464,7 +475,7 @@ impl VssStoreInner {
464475 version: vss_version,
465476 value: storable. encode_to_vec( ) ,
466477 } ] ,
467- delete_items : vec ! [ ] ,
478+ delete_items,
468479 } ;
469480
470481 self . client . put_object ( & request) . await . map_err ( |e| {
@@ -482,7 +493,7 @@ impl VssStoreInner {
482493
483494 async fn remove_internal (
484495 & self , inner_lock_ref : Arc < tokio:: sync:: Mutex < u64 > > , locking_key : String , version : u64 ,
485- primary_namespace : String , secondary_namespace : String , key : String ,
496+ primary_namespace : String , secondary_namespace : String , key : String , lazy : bool ,
486497 ) -> io:: Result < ( ) > {
487498 check_namespace_key_validity (
488499 & primary_namespace,
@@ -491,13 +502,19 @@ impl VssStoreInner {
491502 "remove" ,
492503 ) ?;
493504
505+ let obfuscated_key =
506+ self . build_obfuscated_key ( & primary_namespace, & secondary_namespace, & key) ;
507+
508+ let key_value = KeyValue { key : obfuscated_key, version : -1 , value : vec ! [ ] } ;
509+ if lazy {
510+ let mut pending_lazy_deletes = self . pending_lazy_deletes . lock ( ) . unwrap ( ) ;
511+ pending_lazy_deletes. push ( key_value) ;
512+ return Ok ( ( ) ) ;
513+ }
514+
494515 self . execute_locked_write ( inner_lock_ref, locking_key, version, async move || {
495- let obfuscated_key =
496- self . build_obfuscated_key ( & primary_namespace, & secondary_namespace, & key) ;
497- let request = DeleteObjectRequest {
498- store_id : self . store_id . clone ( ) ,
499- key_value : Some ( KeyValue { key : obfuscated_key, version : -1 , value : vec ! [ ] } ) ,
500- } ;
516+ let request =
517+ DeleteObjectRequest { store_id : self . store_id . clone ( ) , key_value : Some ( key_value) } ;
501518
502519 self . client . delete_object ( & request) . await . map_err ( |e| {
503520 let msg = format ! (
0 commit comments