@@ -21,6 +21,10 @@ extern crate core;
2121
2222#[ cfg( not( feature = "std" ) ) ]
2323extern crate alloc;
24+ #[ cfg( not( feature = "std" ) ) ]
25+ use alloc:: format;
26+ #[ cfg( not( feature = "std" ) ) ]
27+ use alloc:: vec:: Vec ;
2428
2529#[ macro_use]
2630extern crate lightning;
@@ -70,9 +74,13 @@ use lightning_rapid_gossip_sync::RapidGossipSync;
7074
7175use lightning_liquidity:: ALiquidityManager ;
7276
77+ use core:: future:: Future ;
7378use core:: ops:: Deref ;
79+ use core:: pin:: Pin ;
7480use core:: time:: Duration ;
7581
82+ use lightning:: util:: async_poll:: { MultiResultFuturePoller , ResultFuture } ;
83+
7684#[ cfg( feature = "std" ) ]
7785use core:: sync:: atomic:: { AtomicBool , Ordering } ;
7886#[ cfg( feature = "std" ) ]
@@ -627,11 +635,11 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
627635pub async fn process_events_async <
628636 ' a ,
629637 UL : ' static + Deref ,
630- CF : ' static + Deref ,
631- T : ' static + Deref ,
632- F : ' static + Deref ,
638+ CF : ' static + Deref + Sync ,
639+ T : ' static + Deref + Sync ,
640+ F : ' static + Deref + Sync ,
633641 G : ' static + Deref < Target = NetworkGraph < L > > ,
634- L : ' static + Deref ,
642+ L : ' static + Deref + Sync ,
635643 P : ' static + Deref ,
636644 EventHandlerFuture : core:: future:: Future < Output = Result < ( ) , ReplayEvent > > ,
637645 EventHandler : Fn ( Event ) -> EventHandlerFuture ,
@@ -646,10 +654,10 @@ pub async fn process_events_async<
646654 RGS : ' static + Deref < Target = RapidGossipSync < G , L > > ,
647655 PM : ' static + Deref ,
648656 LM : ' static + Deref ,
649- D : ' static + Deref ,
650- O : ' static + Deref ,
651- K : ' static + Deref ,
652- OS : ' static + Deref < Target = OutputSweeper < T , D , F , CF , K , L , O > > ,
657+ D : ' static + Deref + Sync ,
658+ O : ' static + Deref + Sync ,
659+ K : ' static + Deref + Sync ,
660+ OS : ' static + Deref < Target = OutputSweeper < T , D , F , CF , K , L , O > > + Clone + Send ,
653661 S : ' static + Deref < Target = SC > + Send + Sync ,
654662 SC : for < ' b > WriteableScore < ' b > ,
655663 SleepFuture : core:: future:: Future < Output = bool > + core:: marker:: Unpin ,
@@ -826,17 +834,24 @@ where
826834 None => { } ,
827835 }
828836
837+ let mut futures = Vec :: new ( ) ;
838+
829839 // Persist channel manager.
830840 if channel_manager. get_cm ( ) . get_and_clear_needs_persistence ( ) {
831841 log_trace ! ( logger, "Persisting ChannelManager..." ) ;
832- kv_store
833- . write (
834- CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE ,
835- CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE ,
836- CHANNEL_MANAGER_PERSISTENCE_KEY ,
837- & channel_manager. get_cm ( ) . encode ( ) ,
838- )
839- . await ?;
842+ let res = kv_store. write (
843+ CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE ,
844+ CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE ,
845+ CHANNEL_MANAGER_PERSISTENCE_KEY ,
846+ & channel_manager. get_cm ( ) . encode ( ) ,
847+ ) ;
848+
849+ let fut: Pin <
850+ Box < dyn Future < Output = Result < ( ) , ( lightning:: io:: Error , bool ) > > + Send + ' static > ,
851+ > = Box :: pin ( async move { res. await . map_err ( |e| ( e, true ) ) } ) ;
852+
853+ futures. push ( ResultFuture :: Pending ( fut) ) ;
854+
840855 log_trace ! ( logger, "Done persisting ChannelManager." ) ;
841856 }
842857
@@ -864,17 +879,29 @@ where
864879 log_warn ! ( logger, "Not pruning network graph, consider implementing the fetch_time argument or calling remove_stale_channels_and_tracking_with_time manually." ) ;
865880 log_trace ! ( logger, "Persisting network graph." ) ;
866881 }
867- if let Err ( e) = kv_store
868- . write (
869- NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE ,
870- NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE ,
871- NETWORK_GRAPH_PERSISTENCE_KEY ,
872- & network_graph. encode ( ) ,
873- )
874- . await
875- {
876- log_error ! ( logger, "Error: Failed to persist network graph, check your disk and permissions {}" , e) ;
877- }
882+ let res = kv_store. write (
883+ NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE ,
884+ NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE ,
885+ NETWORK_GRAPH_PERSISTENCE_KEY ,
886+ & network_graph. encode ( ) ,
887+ ) ;
888+ let fut: Pin <
889+ Box <
890+ dyn Future < Output = Result < ( ) , ( lightning:: io:: Error , bool ) > >
891+ + Send
892+ + ' static ,
893+ > ,
894+ > = Box :: pin ( async move {
895+ res. await . map_err ( |e| {
896+ ( lightning:: io:: Error :: new (
897+ lightning:: io:: ErrorKind :: Other ,
898+ format ! ( "failed to persist network graph, check your disk and permissions {}" , e) ) ,
899+ false )
900+ } )
901+ } ) ;
902+
903+ futures. push ( ResultFuture :: Pending ( fut) ) ;
904+
878905 have_pruned = true ;
879906 }
880907 let prune_timer =
@@ -901,21 +928,28 @@ where
901928 } else {
902929 log_trace ! ( logger, "Persisting scorer" ) ;
903930 }
904- if let Err ( e) = kv_store
905- . write (
906- SCORER_PERSISTENCE_PRIMARY_NAMESPACE ,
907- SCORER_PERSISTENCE_SECONDARY_NAMESPACE ,
908- SCORER_PERSISTENCE_KEY ,
909- & scorer. encode ( ) ,
910- )
911- . await
912- {
913- log_error ! (
914- logger,
915- "Error: Failed to persist scorer, check your disk and permissions {}" ,
916- e
917- ) ;
918- }
931+ let res = kv_store. write (
932+ SCORER_PERSISTENCE_PRIMARY_NAMESPACE ,
933+ SCORER_PERSISTENCE_SECONDARY_NAMESPACE ,
934+ SCORER_PERSISTENCE_KEY ,
935+ & scorer. encode ( ) ,
936+ ) ;
937+ let fut: Pin <
938+ Box <
939+ dyn Future < Output = Result < ( ) , ( lightning:: io:: Error , bool ) > >
940+ + Send
941+ + ' static ,
942+ > ,
943+ > = Box :: pin ( async move {
944+ res. await . map_err ( |e| {
945+ ( lightning:: io:: Error :: new (
946+ lightning:: io:: ErrorKind :: Other ,
947+ format ! ( "failed to persist scorer, check your disk and permissions {}" , e) ) ,
948+ false )
949+ } )
950+ } ) ;
951+
952+ futures. push ( ResultFuture :: Pending ( fut) ) ;
919953 }
920954 last_scorer_persist_call = sleeper ( SCORER_PERSIST_TIMER ) ;
921955 } ,
@@ -928,14 +962,46 @@ where
928962 Some ( false ) => {
929963 log_trace ! ( logger, "Regenerating sweeper spends if necessary" ) ;
930964 if let Some ( ref sweeper) = sweeper {
931- let _ = sweeper. regenerate_and_broadcast_spend_if_necessary ( ) . await ;
965+ let sweeper = sweeper. clone ( ) ;
966+ let fut: Pin <
967+ Box <
968+ dyn Future < Output = Result < ( ) , ( lightning:: io:: Error , bool ) > >
969+ + Send
970+ + ' static ,
971+ > ,
972+ > = Box :: pin ( async move {
973+ sweeper. regenerate_and_broadcast_spend_if_necessary ( ) . await . map_err ( |_| {
974+ (
975+ lightning:: io:: Error :: new (
976+ lightning:: io:: ErrorKind :: Other ,
977+ "failed to persist sweeper, check your disk and permissions" ,
978+ ) ,
979+ false ,
980+ )
981+ } )
982+ } ) ;
983+
984+ futures. push ( ResultFuture :: Pending ( fut) ) ;
932985 }
933986 last_sweeper_call = sleeper ( SWEEPER_TIMER ) ;
934987 } ,
935988 Some ( true ) => break ,
936989 None => { } ,
937990 }
938991
992+ // Run persistence tasks in parallel.
993+ let multi_res = MultiResultFuturePoller :: new ( futures) . await ;
994+ for res in multi_res {
995+ if let Err ( ( e, exit) ) = res {
996+ log_error ! ( logger, "Error: {}" , e) ;
997+
998+ if exit {
999+ log_error ! ( logger, "Exiting background processor" ) ;
1000+ return Err ( e) ;
1001+ }
1002+ }
1003+ }
1004+
9391005 // Onion messenger timer tick.
9401006 match check_sleeper ( & mut last_onion_message_handler_call) {
9411007 Some ( false ) => {
@@ -1025,9 +1091,9 @@ fn check_sleeper<SleepFuture: core::future::Future<Output = bool> + core::marker
10251091/// synchronous background persistence.
10261092pub async fn process_events_async_with_kv_store_sync <
10271093 UL : ' static + Deref ,
1028- CF : ' static + Deref ,
1029- T : ' static + Deref ,
1030- F : ' static + Deref ,
1094+ CF : ' static + Deref + Sync ,
1095+ T : ' static + Deref + Sync ,
1096+ F : ' static + Deref + Sync ,
10311097 G : ' static + Deref < Target = NetworkGraph < L > > ,
10321098 L : ' static + Deref + Send + Sync ,
10331099 P : ' static + Deref ,
@@ -1044,10 +1110,13 @@ pub async fn process_events_async_with_kv_store_sync<
10441110 RGS : ' static + Deref < Target = RapidGossipSync < G , L > > ,
10451111 PM : ' static + Deref ,
10461112 LM : ' static + Deref ,
1047- D : ' static + Deref ,
1048- O : ' static + Deref ,
1049- K : ' static + Deref ,
1050- OS : ' static + Deref < Target = OutputSweeper < T , D , F , CF , KVStoreSyncWrapper < K > , L , O > > ,
1113+ D : ' static + Deref + Sync ,
1114+ O : ' static + Deref + Sync ,
1115+ K : ' static + Deref + Sync ,
1116+ OS : ' static
1117+ + Deref < Target = OutputSweeper < T , D , F , CF , KVStoreSyncWrapper < K > , L , O > >
1118+ + Clone
1119+ + Send ,
10511120 S : ' static + Deref < Target = SC > + Send + Sync ,
10521121 SC : for < ' b > WriteableScore < ' b > ,
10531122 SleepFuture : core:: future:: Future < Output = bool > + core:: marker:: Unpin ,
0 commit comments