From d749ccdaeba74efbb72d0c1b49cf2dd8ecce33e9 Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Mon, 28 Jul 2025 10:14:13 +0200 Subject: [PATCH 1/7] Expand background processor define_run_body! macro Only expansion, clean ups in the next commit to facilitate review. --- lightning-background-processor/src/lib.rs | 1108 +++++++++++++++------ 1 file changed, 805 insertions(+), 303 deletions(-) diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index e340109d35c..36bb98da761 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -331,247 +331,6 @@ fn update_scorer<'a, S: 'static + Deref + Send + Sync, SC: 'a + Wri true } -macro_rules! maybe_await { - (true, $e:expr) => { - $e.await - }; - (false, $e:expr) => { - $e - }; -} - -macro_rules! define_run_body { - ( - $kv_store: ident, - $chain_monitor: ident, $process_chain_monitor_events: expr, - $channel_manager: ident, $process_channel_manager_events: expr, - $onion_messenger: ident, $process_onion_message_handler_events: expr, - $peer_manager: ident, $gossip_sync: ident, - $process_sweeper: expr, - $logger: ident, $scorer: ident, $loop_exit_check: expr, $await: expr, $get_timer: expr, - $timer_elapsed: expr, $check_slow_await: expr, $time_fetch: expr, $batch_delay: expr, $async_persist: tt, - ) => { { - log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup"); - $channel_manager.get_cm().timer_tick_occurred(); - log_trace!($logger, "Rebroadcasting monitor's pending claims on startup"); - $chain_monitor.rebroadcast_pending_claims(); - - let mut last_freshness_call = $get_timer(FRESHNESS_TIMER); - let mut last_onion_message_handler_call = $get_timer(ONION_MESSAGE_HANDLER_TIMER); - let mut last_ping_call = $get_timer(PING_TIMER); - let mut last_prune_call = $get_timer(FIRST_NETWORK_PRUNE_TIMER); - let mut last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER); - let mut last_rebroadcast_call = $get_timer(REBROADCAST_TIMER); - let mut last_sweeper_call = $get_timer(SWEEPER_TIMER); - let mut have_pruned = false; - let mut have_decayed_scorer = false; - - let mut cur_batch_delay = $batch_delay.get(); - let mut last_forwards_processing_call = $get_timer(cur_batch_delay); - - loop { - $process_channel_manager_events; - $process_chain_monitor_events; - $process_onion_message_handler_events; - - // Note that the PeerManager::process_events may block on ChannelManager's locks, - // hence it comes last here. When the ChannelManager finishes whatever it's doing, - // we want to ensure we get into `persist_manager` as quickly as we can, especially - // without running the normal event processing above and handing events to users. - // - // Specifically, on an *extremely* slow machine, we may see ChannelManager start - // processing a message effectively at any point during this loop. In order to - // minimize the time between such processing completing and persisting the updated - // ChannelManager, we want to minimize methods blocking on a ChannelManager - // generally, and as a fallback place such blocking only immediately before - // persistence. - $peer_manager.as_ref().process_events(); - - if $timer_elapsed(&mut last_forwards_processing_call, cur_batch_delay) { - $channel_manager.get_cm().process_pending_htlc_forwards(); - cur_batch_delay = $batch_delay.next(); - last_forwards_processing_call = $get_timer(cur_batch_delay); - } - - // Exit the loop if the background processor was requested to stop. - if $loop_exit_check { - log_trace!($logger, "Terminating background processor."); - break; - } - - // We wait up to 100ms, but track how long it takes to detect being put to sleep, - // see `await_start`'s use below. - let mut await_start = None; - if $check_slow_await { await_start = Some($get_timer(Duration::from_secs(1))); } - $await; - let await_slow = if $check_slow_await { $timer_elapsed(&mut await_start.unwrap(), Duration::from_secs(1)) } else { false }; - - // Exit the loop if the background processor was requested to stop. - if $loop_exit_check { - log_trace!($logger, "Terminating background processor."); - break; - } - - if $channel_manager.get_cm().get_and_clear_needs_persistence() { - log_trace!($logger, "Persisting ChannelManager..."); - maybe_await!($async_persist, $kv_store.write( - CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, - CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, - CHANNEL_MANAGER_PERSISTENCE_KEY, - &$channel_manager.get_cm().encode(), - ))?; - log_trace!($logger, "Done persisting ChannelManager."); - } - if $timer_elapsed(&mut last_freshness_call, FRESHNESS_TIMER) { - log_trace!($logger, "Calling ChannelManager's timer_tick_occurred"); - $channel_manager.get_cm().timer_tick_occurred(); - last_freshness_call = $get_timer(FRESHNESS_TIMER); - } - if $timer_elapsed(&mut last_onion_message_handler_call, ONION_MESSAGE_HANDLER_TIMER) { - if let Some(om) = &$onion_messenger { - log_trace!($logger, "Calling OnionMessageHandler's timer_tick_occurred"); - om.get_om().timer_tick_occurred(); - } - last_onion_message_handler_call = $get_timer(ONION_MESSAGE_HANDLER_TIMER); - } - if await_slow { - // On various platforms, we may be starved of CPU cycles for several reasons. - // E.g. on iOS, if we've been in the background, we will be entirely paused. - // Similarly, if we're on a desktop platform and the device has been asleep, we - // may not get any cycles. - // We detect this by checking if our max-100ms-sleep, above, ran longer than a - // full second, at which point we assume sockets may have been killed (they - // appear to be at least on some platforms, even if it has only been a second). - // Note that we have to take care to not get here just because user event - // processing was slow at the top of the loop. For example, the sample client - // may call Bitcoin Core RPCs during event handling, which very often takes - // more than a handful of seconds to complete, and shouldn't disconnect all our - // peers. - log_trace!($logger, "100ms sleep took more than a second, disconnecting peers."); - $peer_manager.as_ref().disconnect_all_peers(); - last_ping_call = $get_timer(PING_TIMER); - } else if $timer_elapsed(&mut last_ping_call, PING_TIMER) { - log_trace!($logger, "Calling PeerManager's timer_tick_occurred"); - $peer_manager.as_ref().timer_tick_occurred(); - last_ping_call = $get_timer(PING_TIMER); - } - - // Note that we want to run a graph prune once not long after startup before - // falling back to our usual hourly prunes. This avoids short-lived clients never - // pruning their network graph. We run once 60 seconds after startup before - // continuing our normal cadence. For RGS, since 60 seconds is likely too long, - // we prune after an initial sync completes. - let prune_timer = if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER }; - let prune_timer_elapsed = $timer_elapsed(&mut last_prune_call, prune_timer); - let should_prune = match $gossip_sync { - GossipSync::Rapid(_) => !have_pruned || prune_timer_elapsed, - _ => prune_timer_elapsed, - }; - if should_prune { - // The network graph must not be pruned while rapid sync completion is pending - if let Some(network_graph) = $gossip_sync.prunable_network_graph() { - if let Some(duration_since_epoch) = $time_fetch() { - log_trace!($logger, "Pruning and persisting network graph."); - network_graph.remove_stale_channels_and_tracking_with_time(duration_since_epoch.as_secs()); - } else { - log_warn!($logger, - "Not pruning network graph, consider implementing the fetch_time argument or calling remove_stale_channels_and_tracking_with_time manually." - ); - log_trace!($logger, "Persisting network graph."); - } - - if let Err(e) = maybe_await!($async_persist, $kv_store.write( - NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, - NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, - NETWORK_GRAPH_PERSISTENCE_KEY, - &network_graph.encode(), - )) { - log_error!($logger, "Error: Failed to persist network graph, check your disk and permissions {}", e) - } - - have_pruned = true; - } - let prune_timer = if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER }; - last_prune_call = $get_timer(prune_timer); - } - - if !have_decayed_scorer { - if let Some(ref scorer) = $scorer { - if let Some(duration_since_epoch) = $time_fetch() { - log_trace!($logger, "Calling time_passed on scorer at startup"); - scorer.write_lock().time_passed(duration_since_epoch); - } - } - have_decayed_scorer = true; - } - - if $timer_elapsed(&mut last_scorer_persist_call, SCORER_PERSIST_TIMER) { - if let Some(ref scorer) = $scorer { - if let Some(duration_since_epoch) = $time_fetch() { - log_trace!($logger, "Calling time_passed and persisting scorer"); - scorer.write_lock().time_passed(duration_since_epoch); - } else { - log_trace!($logger, "Persisting scorer"); - } - if let Err(e) = maybe_await!($async_persist, $kv_store.write( - SCORER_PERSISTENCE_PRIMARY_NAMESPACE, - SCORER_PERSISTENCE_SECONDARY_NAMESPACE, - SCORER_PERSISTENCE_KEY, - &scorer.encode(), - )) { - log_error!($logger, "Error: Failed to persist scorer, check your disk and permissions {}", e) - } - } - last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER); - } - - if $timer_elapsed(&mut last_rebroadcast_call, REBROADCAST_TIMER) { - log_trace!($logger, "Rebroadcasting monitor's pending claims"); - $chain_monitor.rebroadcast_pending_claims(); - last_rebroadcast_call = $get_timer(REBROADCAST_TIMER); - } - - if $timer_elapsed(&mut last_sweeper_call, SWEEPER_TIMER) { - log_trace!($logger, "Regenerating sweeper spends if necessary"); - $process_sweeper; - last_sweeper_call = $get_timer(SWEEPER_TIMER); - } - } - - // After we exit, ensure we persist the ChannelManager one final time - this avoids - // some races where users quit while channel updates were in-flight, with - // ChannelMonitor update(s) persisted without a corresponding ChannelManager update. - maybe_await!($async_persist, $kv_store.write( - CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, - CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, - CHANNEL_MANAGER_PERSISTENCE_KEY, - &$channel_manager.get_cm().encode(), - ))?; - - // Persist Scorer on exit - if let Some(ref scorer) = $scorer { - maybe_await!($async_persist, $kv_store.write( - SCORER_PERSISTENCE_PRIMARY_NAMESPACE, - SCORER_PERSISTENCE_SECONDARY_NAMESPACE, - SCORER_PERSISTENCE_KEY, - &scorer.encode(), - ))?; - } - - // Persist NetworkGraph on exit - if let Some(network_graph) = $gossip_sync.network_graph() { - maybe_await!($async_persist, $kv_store.write( - NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, - NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, - NETWORK_GRAPH_PERSISTENCE_KEY, - &network_graph.encode(), - ))?; - } - - Ok(()) - } } -} - pub(crate) mod futures_util { use core::future::Future; use core::marker::Unpin; @@ -947,26 +706,79 @@ where }) }; let mut batch_delay = BatchDelay::new(); - define_run_body!( - kv_store, - chain_monitor, - chain_monitor.process_pending_events_async(async_event_handler).await, - channel_manager, - channel_manager.get_cm().process_pending_events_async(async_event_handler).await, - onion_messenger, - if let Some(om) = &onion_messenger { + logger.log(lightning::util::logger::Record::new( + (lightning::util::logger::Level::Trace), + None, + None, + format_args!("Calling ChannelManager's timer_tick_occurred on startup"), + module_path!(), + file!(), + 0u32, + None, + )); + channel_manager.get_cm().timer_tick_occurred(); + logger.log(lightning::util::logger::Record::new( + (lightning::util::logger::Level::Trace), + None, + None, + format_args!("Rebroadcasting monitor's pending claims on startup"), + module_path!(), + file!(), + 0u32, + None, + )); + chain_monitor.rebroadcast_pending_claims(); + let mut last_freshness_call = (|t| sleeper(t))(FRESHNESS_TIMER); + let mut last_onion_message_handler_call = (|t| sleeper(t))(ONION_MESSAGE_HANDLER_TIMER); + let mut last_ping_call = (|t| sleeper(t))(PING_TIMER); + let mut last_prune_call = (|t| sleeper(t))(FIRST_NETWORK_PRUNE_TIMER); + let mut last_scorer_persist_call = (|t| sleeper(t))(SCORER_PERSIST_TIMER); + let mut last_rebroadcast_call = (|t| sleeper(t))(REBROADCAST_TIMER); + let mut last_sweeper_call = (|t| sleeper(t))(SWEEPER_TIMER); + let mut have_pruned = false; + let mut have_decayed_scorer = false; + let mut cur_batch_delay = batch_delay.get(); + let mut last_forwards_processing_call = (|t| sleeper(t))(cur_batch_delay); + loop { + (channel_manager.get_cm().process_pending_events_async(async_event_handler).await); + (chain_monitor.process_pending_events_async(async_event_handler).await); + (if let Some(om) = &onion_messenger { om.get_om().process_pending_events_async(async_event_handler).await - }, - peer_manager, - gossip_sync, - { - if let Some(ref sweeper) = sweeper { - let _ = sweeper.regenerate_and_broadcast_spend_if_necessary().await; + }); + peer_manager.as_ref().process_events(); + if (|fut: &mut SleepFuture, _| { + let mut waker = dummy_waker(); + let mut ctx = task::Context::from_waker(&mut waker); + match core::pin::Pin::new(fut).poll(&mut ctx) { + task::Poll::Ready(exit) => { + should_break = exit; + true + }, + task::Poll::Pending => false, } - }, - logger, - scorer, - should_break, + })(&mut last_forwards_processing_call, cur_batch_delay) + { + channel_manager.get_cm().process_pending_htlc_forwards(); + cur_batch_delay = batch_delay.next(); + last_forwards_processing_call = (|t| sleeper(t))(cur_batch_delay); + } + if should_break { + logger.log(lightning::util::logger::Record::new( + (lightning::util::logger::Level::Trace), + None, + None, + format_args!("Terminating background processor."), + module_path!(), + file!(), + 0u32, + None, + )); + break; + } + let mut await_start = None; + if mobile_interruptable_platform { + await_start = Some((|t| sleeper(t))(Duration::from_secs(1))); + } { let om_fut = if let Some(om) = onion_messenger.as_ref() { let fut = om.get_om().get_update_future(); @@ -980,7 +792,6 @@ where } else { OptionalSelector { optional_future: None } }; - let needs_processing = channel_manager.get_cm().needs_pending_htlc_processing(); let sleep_delay = match (needs_processing, mobile_interruptable_platform) { (true, true) => batch_delay.get().min(Duration::from_millis(100)), @@ -988,7 +799,6 @@ where (false, true) => Duration::from_millis(100), (false, false) => FASTEST_TIMER, }; - let fut = Selector { a: channel_manager.get_cm().get_event_or_persistence_needed_future(), b: chain_monitor.get_update_future(), @@ -1002,9 +812,65 @@ where should_break = exit; }, } - }, - |t| sleeper(t), - |fut: &mut SleepFuture, _| { + }; + let await_slow = if mobile_interruptable_platform { + (|fut: &mut SleepFuture, _| { + let mut waker = dummy_waker(); + let mut ctx = task::Context::from_waker(&mut waker); + match core::pin::Pin::new(fut).poll(&mut ctx) { + task::Poll::Ready(exit) => { + should_break = exit; + true + }, + task::Poll::Pending => false, + } + })(&mut await_start.unwrap(), Duration::from_secs(1)) + } else { + false + }; + if should_break { + logger.log(lightning::util::logger::Record::new( + (lightning::util::logger::Level::Trace), + None, + None, + format_args!("Terminating background processor."), + module_path!(), + file!(), + 0u32, + None, + )); + break; + } + if channel_manager.get_cm().get_and_clear_needs_persistence() { + logger.log(lightning::util::logger::Record::new( + (lightning::util::logger::Level::Trace), + None, + None, + format_args!("Persisting ChannelManager..."), + module_path!(), + file!(), + 0u32, + None, + )); + (kv_store.write( + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + &channel_manager.get_cm().encode(), + )) + .await?; + logger.log(lightning::util::logger::Record::new( + (lightning::util::logger::Level::Trace), + None, + None, + format_args!("Done persisting ChannelManager."), + module_path!(), + file!(), + 0u32, + None, + )); + } + if (|fut: &mut SleepFuture, _| { let mut waker = dummy_waker(); let mut ctx = task::Context::from_waker(&mut waker); match core::pin::Pin::new(fut).poll(&mut ctx) { @@ -1014,12 +880,308 @@ where }, task::Poll::Pending => false, } - }, - mobile_interruptable_platform, - fetch_time, - batch_delay, - true, - ) + })(&mut last_freshness_call, FRESHNESS_TIMER) + { + logger.log(lightning::util::logger::Record::new( + (lightning::util::logger::Level::Trace), + None, + None, + format_args!("Calling ChannelManager's timer_tick_occurred"), + module_path!(), + file!(), + 0u32, + None, + )); + channel_manager.get_cm().timer_tick_occurred(); + last_freshness_call = (|t| sleeper(t))(FRESHNESS_TIMER); + } + if (|fut: &mut SleepFuture, _| { + let mut waker = dummy_waker(); + let mut ctx = task::Context::from_waker(&mut waker); + match core::pin::Pin::new(fut).poll(&mut ctx) { + task::Poll::Ready(exit) => { + should_break = exit; + true + }, + task::Poll::Pending => false, + } + })(&mut last_onion_message_handler_call, ONION_MESSAGE_HANDLER_TIMER) + { + if let Some(om) = &onion_messenger { + logger.log(lightning::util::logger::Record::new( + (lightning::util::logger::Level::Trace), + None, + None, + format_args!("Calling OnionMessageHandler's timer_tick_occurred"), + module_path!(), + file!(), + 0u32, + None, + )); + om.get_om().timer_tick_occurred(); + } + last_onion_message_handler_call = (|t| sleeper(t))(ONION_MESSAGE_HANDLER_TIMER); + } + if await_slow { + logger.log(lightning::util::logger::Record::new( + (lightning::util::logger::Level::Trace), + None, + None, + format_args!("100ms sleep took more than a second, disconnecting peers."), + module_path!(), + file!(), + 0u32, + None, + )); + peer_manager.as_ref().disconnect_all_peers(); + last_ping_call = (|t| sleeper(t))(PING_TIMER); + } else if (|fut: &mut SleepFuture, _| { + let mut waker = dummy_waker(); + let mut ctx = task::Context::from_waker(&mut waker); + match core::pin::Pin::new(fut).poll(&mut ctx) { + task::Poll::Ready(exit) => { + should_break = exit; + true + }, + task::Poll::Pending => false, + } + })(&mut last_ping_call, PING_TIMER) + { + logger.log(lightning::util::logger::Record::new( + (lightning::util::logger::Level::Trace), + None, + None, + format_args!("Calling PeerManager's timer_tick_occurred"), + module_path!(), + file!(), + 0u32, + None, + )); + peer_manager.as_ref().timer_tick_occurred(); + last_ping_call = (|t| sleeper(t))(PING_TIMER); + } + let prune_timer = if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER }; + let prune_timer_elapsed = (|fut: &mut SleepFuture, _| { + let mut waker = dummy_waker(); + let mut ctx = task::Context::from_waker(&mut waker); + match core::pin::Pin::new(fut).poll(&mut ctx) { + task::Poll::Ready(exit) => { + should_break = exit; + true + }, + task::Poll::Pending => false, + } + })(&mut last_prune_call, prune_timer); + let should_prune = match gossip_sync { + GossipSync::Rapid(_) => !have_pruned || prune_timer_elapsed, + _ => prune_timer_elapsed, + }; + if should_prune { + if let Some(network_graph) = gossip_sync.prunable_network_graph() { + if let Some(duration_since_epoch) = fetch_time() { + logger.log(lightning::util::logger::Record::new( + (lightning::util::logger::Level::Trace), + None, + None, + format_args!("Pruning and persisting network graph."), + module_path!(), + file!(), + 0u32, + None, + )); + network_graph.remove_stale_channels_and_tracking_with_time( + duration_since_epoch.as_secs(), + ); + } else { + logger.log(lightning::util::logger::Record::new((lightning::util::logger::Level::Warn),None,None,format_args!("Not pruning network graph, consider implementing the fetch_time argument or calling remove_stale_channels_and_tracking_with_time manually."),module_path!(),file!(),0u32,None)); + logger.log(lightning::util::logger::Record::new( + (lightning::util::logger::Level::Trace), + None, + None, + format_args!("Persisting network graph."), + module_path!(), + file!(), + 0u32, + None, + )); + } + if let Err(e) = (kv_store.write( + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_KEY, + &network_graph.encode(), + )) + .await + { + logger.log(lightning::util::logger::Record::new((lightning::util::logger::Level::Error),None,None,format_args!("Error: Failed to persist network graph, check your disk and permissions {}",e),module_path!(),file!(),0u32,None)); + } + have_pruned = true; + } + let prune_timer = + if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER }; + last_prune_call = (|t| sleeper(t))(prune_timer); + } + if !have_decayed_scorer { + if let Some(ref scorer) = scorer { + if let Some(duration_since_epoch) = fetch_time() { + logger.log(lightning::util::logger::Record::new( + (lightning::util::logger::Level::Trace), + None, + None, + format_args!("Calling time_passed on scorer at startup"), + module_path!(), + file!(), + 0u32, + None, + )); + scorer.write_lock().time_passed(duration_since_epoch); + } + } + have_decayed_scorer = true; + } + if (|fut: &mut SleepFuture, _| { + let mut waker = dummy_waker(); + let mut ctx = task::Context::from_waker(&mut waker); + match core::pin::Pin::new(fut).poll(&mut ctx) { + task::Poll::Ready(exit) => { + should_break = exit; + true + }, + task::Poll::Pending => false, + } + })(&mut last_scorer_persist_call, SCORER_PERSIST_TIMER) + { + if let Some(ref scorer) = scorer { + if let Some(duration_since_epoch) = fetch_time() { + logger.log(lightning::util::logger::Record::new( + (lightning::util::logger::Level::Trace), + None, + None, + format_args!("Calling time_passed and persisting scorer"), + module_path!(), + file!(), + 0u32, + None, + )); + scorer.write_lock().time_passed(duration_since_epoch); + } else { + logger.log(lightning::util::logger::Record::new( + (lightning::util::logger::Level::Trace), + None, + None, + format_args!("Persisting scorer"), + module_path!(), + file!(), + 0u32, + None, + )); + } + if let Err(e) = (kv_store.write( + SCORER_PERSISTENCE_PRIMARY_NAMESPACE, + SCORER_PERSISTENCE_SECONDARY_NAMESPACE, + SCORER_PERSISTENCE_KEY, + &scorer.encode(), + )) + .await + { + logger.log(lightning::util::logger::Record::new( + (lightning::util::logger::Level::Error), + None, + None, + format_args!( + "Error: Failed to persist scorer, check your disk and permissions {}", + e + ), + module_path!(), + file!(), + 0u32, + None, + )); + } + } + last_scorer_persist_call = (|t| sleeper(t))(SCORER_PERSIST_TIMER); + } + if (|fut: &mut SleepFuture, _| { + let mut waker = dummy_waker(); + let mut ctx = task::Context::from_waker(&mut waker); + match core::pin::Pin::new(fut).poll(&mut ctx) { + task::Poll::Ready(exit) => { + should_break = exit; + true + }, + task::Poll::Pending => false, + } + })(&mut last_rebroadcast_call, REBROADCAST_TIMER) + { + logger.log(lightning::util::logger::Record::new( + (lightning::util::logger::Level::Trace), + None, + None, + format_args!("Rebroadcasting monitor's pending claims"), + module_path!(), + file!(), + 0u32, + None, + )); + chain_monitor.rebroadcast_pending_claims(); + last_rebroadcast_call = (|t| sleeper(t))(REBROADCAST_TIMER); + } + if (|fut: &mut SleepFuture, _| { + let mut waker = dummy_waker(); + let mut ctx = task::Context::from_waker(&mut waker); + match core::pin::Pin::new(fut).poll(&mut ctx) { + task::Poll::Ready(exit) => { + should_break = exit; + true + }, + task::Poll::Pending => false, + } + })(&mut last_sweeper_call, SWEEPER_TIMER) + { + logger.log(lightning::util::logger::Record::new( + (lightning::util::logger::Level::Trace), + None, + None, + format_args!("Regenerating sweeper spends if necessary"), + module_path!(), + file!(), + 0u32, + None, + )); + { + if let Some(ref sweeper) = sweeper { + let _ = sweeper.regenerate_and_broadcast_spend_if_necessary().await; + } + }; + last_sweeper_call = (|t| sleeper(t))(SWEEPER_TIMER); + } + } + (kv_store.write( + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + &channel_manager.get_cm().encode(), + )) + .await?; + if let Some(ref scorer) = scorer { + (kv_store.write( + SCORER_PERSISTENCE_PRIMARY_NAMESPACE, + SCORER_PERSISTENCE_SECONDARY_NAMESPACE, + SCORER_PERSISTENCE_KEY, + &scorer.encode(), + )) + .await?; + } + if let Some(network_graph) = gossip_sync.network_graph() { + (kv_store.write( + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_KEY, + &network_graph.encode(), + )) + .await?; + } + Ok(()) } /// Async events processor that is based on [`process_events_async`] but allows for [`KVStoreSync`] to be used for @@ -1218,26 +1380,72 @@ impl BackgroundProcessor { event_handler.handle_event(event) }; let mut batch_delay = BatchDelay::new(); - define_run_body!( - kv_store, - chain_monitor, - chain_monitor.process_pending_events(&event_handler), - channel_manager, - channel_manager.get_cm().process_pending_events(&event_handler), - onion_messenger, - if let Some(om) = &onion_messenger { + logger.log(lightning::util::logger::Record::new( + (lightning::util::logger::Level::Trace), + None, + None, + format_args!("Calling ChannelManager's timer_tick_occurred on startup"), + module_path!(), + file!(), + 0u32, + None, + )); + channel_manager.get_cm().timer_tick_occurred(); + logger.log(lightning::util::logger::Record::new( + (lightning::util::logger::Level::Trace), + None, + None, + format_args!("Rebroadcasting monitor's pending claims on startup"), + module_path!(), + file!(), + 0u32, + None, + )); + chain_monitor.rebroadcast_pending_claims(); + let mut last_freshness_call = (|_| Instant::now())(FRESHNESS_TIMER); + let mut last_onion_message_handler_call = + (|_| Instant::now())(ONION_MESSAGE_HANDLER_TIMER); + let mut last_ping_call = (|_| Instant::now())(PING_TIMER); + let mut last_prune_call = (|_| Instant::now())(FIRST_NETWORK_PRUNE_TIMER); + let mut last_scorer_persist_call = (|_| Instant::now())(SCORER_PERSIST_TIMER); + let mut last_rebroadcast_call = (|_| Instant::now())(REBROADCAST_TIMER); + let mut last_sweeper_call = (|_| Instant::now())(SWEEPER_TIMER); + let mut have_pruned = false; + let mut have_decayed_scorer = false; + let mut cur_batch_delay = batch_delay.get(); + let mut last_forwards_processing_call = (|_| Instant::now())(cur_batch_delay); + loop { + (channel_manager.get_cm().process_pending_events(&event_handler)); + (chain_monitor.process_pending_events(&event_handler)); + (if let Some(om) = &onion_messenger { om.get_om().process_pending_events(&event_handler) - }, - peer_manager, - gossip_sync, - { - if let Some(ref sweeper) = sweeper { - let _ = sweeper.regenerate_and_broadcast_spend_if_necessary(); - } - }, - logger, - scorer, - stop_thread.load(Ordering::Acquire), + }); + peer_manager.as_ref().process_events(); + if (|time: &Instant, dur| time.elapsed() > dur)( + &mut last_forwards_processing_call, + cur_batch_delay, + ) { + channel_manager.get_cm().process_pending_htlc_forwards(); + cur_batch_delay = batch_delay.next(); + last_forwards_processing_call = (|_| Instant::now())(cur_batch_delay); + } + if (stop_thread.load(Ordering::Acquire)) { + logger.log(lightning::util::logger::Record::new( + (lightning::util::logger::Level::Trace), + None, + None, + format_args!("Terminating background processor."), + module_path!(), + file!(), + 0u32, + None, + )); + break; + } + let mut await_start = None; + if false { + await_start = Some((|_| Instant::now())(Duration::from_secs(1))); + } { let sleeper = match (onion_messenger.as_ref(), liquidity_manager.as_ref()) { (Some(om), Some(lm)) => Sleeper::from_four_futures( @@ -1268,21 +1476,315 @@ impl BackgroundProcessor { }; let fastest_timeout = batch_delay.min(Duration::from_millis(100)); sleeper.wait_timeout(fastest_timeout); - }, - |_| Instant::now(), - |time: &Instant, dur| time.elapsed() > dur, - false, - || { - use std::time::SystemTime; - Some( - SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .expect("Time should be sometime after 1970"), + }; + let await_slow = if false { + (|time: &Instant, dur| time.elapsed() > dur)( + &mut await_start.unwrap(), + Duration::from_secs(1), ) - }, - batch_delay, - false, - ) + } else { + false + }; + if (stop_thread.load(Ordering::Acquire)) { + logger.log(lightning::util::logger::Record::new( + (lightning::util::logger::Level::Trace), + None, + None, + format_args!("Terminating background processor."), + module_path!(), + file!(), + 0u32, + None, + )); + break; + } + if channel_manager.get_cm().get_and_clear_needs_persistence() { + logger.log(lightning::util::logger::Record::new( + (lightning::util::logger::Level::Trace), + None, + None, + format_args!("Persisting ChannelManager..."), + module_path!(), + file!(), + 0u32, + None, + )); + (kv_store.write( + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + &channel_manager.get_cm().encode(), + ))?; + logger.log(lightning::util::logger::Record::new( + (lightning::util::logger::Level::Trace), + None, + None, + format_args!("Done persisting ChannelManager."), + module_path!(), + file!(), + 0u32, + None, + )); + } + if (|time: &Instant, dur| time.elapsed() > dur)( + &mut last_freshness_call, + FRESHNESS_TIMER, + ) { + logger.log(lightning::util::logger::Record::new( + (lightning::util::logger::Level::Trace), + None, + None, + format_args!("Calling ChannelManager's timer_tick_occurred"), + module_path!(), + file!(), + 0u32, + None, + )); + channel_manager.get_cm().timer_tick_occurred(); + last_freshness_call = (|_| Instant::now())(FRESHNESS_TIMER); + } + if (|time: &Instant, dur| time.elapsed() > dur)( + &mut last_onion_message_handler_call, + ONION_MESSAGE_HANDLER_TIMER, + ) { + if let Some(om) = &onion_messenger { + logger.log(lightning::util::logger::Record::new( + (lightning::util::logger::Level::Trace), + None, + None, + format_args!("Calling OnionMessageHandler's timer_tick_occurred"), + module_path!(), + file!(), + 0u32, + None, + )); + om.get_om().timer_tick_occurred(); + } + last_onion_message_handler_call = + (|_| Instant::now())(ONION_MESSAGE_HANDLER_TIMER); + } + if await_slow { + logger.log(lightning::util::logger::Record::new( + (lightning::util::logger::Level::Trace), + None, + None, + format_args!("100ms sleep took more than a second, disconnecting peers."), + module_path!(), + file!(), + 0u32, + None, + )); + peer_manager.as_ref().disconnect_all_peers(); + last_ping_call = (|_| Instant::now())(PING_TIMER); + } else if (|time: &Instant, dur| time.elapsed() > dur)( + &mut last_ping_call, + PING_TIMER, + ) { + logger.log(lightning::util::logger::Record::new( + (lightning::util::logger::Level::Trace), + None, + None, + format_args!("Calling PeerManager's timer_tick_occurred"), + module_path!(), + file!(), + 0u32, + None, + )); + peer_manager.as_ref().timer_tick_occurred(); + last_ping_call = (|_| Instant::now())(PING_TIMER); + } + let prune_timer = + if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER }; + let prune_timer_elapsed = + (|time: &Instant, dur| time.elapsed() > dur)(&mut last_prune_call, prune_timer); + let should_prune = match gossip_sync { + GossipSync::Rapid(_) => !have_pruned || prune_timer_elapsed, + _ => prune_timer_elapsed, + }; + if should_prune { + if let Some(network_graph) = gossip_sync.prunable_network_graph() { + if let Some(duration_since_epoch) = (|| { + use std::time::SystemTime; + Some( + SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .expect("Time should be sometime after 1970"), + ) + })() { + logger.log(lightning::util::logger::Record::new( + (lightning::util::logger::Level::Trace), + None, + None, + format_args!("Pruning and persisting network graph."), + module_path!(), + file!(), + 0u32, + None, + )); + network_graph.remove_stale_channels_and_tracking_with_time( + duration_since_epoch.as_secs(), + ); + } else { + logger.log(lightning::util::logger::Record::new((lightning::util::logger::Level::Warn),None,None,format_args!("Not pruning network graph, consider implementing the fetch_time argument or calling remove_stale_channels_and_tracking_with_time manually."),module_path!(),file!(),0u32,None)); + logger.log(lightning::util::logger::Record::new( + (lightning::util::logger::Level::Trace), + None, + None, + format_args!("Persisting network graph."), + module_path!(), + file!(), + 0u32, + None, + )); + } + if let Err(e) = (kv_store.write( + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_KEY, + &network_graph.encode(), + )) { + logger.log(lightning::util::logger::Record::new((lightning::util::logger::Level::Error),None,None,format_args!("Error: Failed to persist network graph, check your disk and permissions {}",e),module_path!(),file!(),0u32,None)); + } + have_pruned = true; + } + let prune_timer = + if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER }; + last_prune_call = (|_| Instant::now())(prune_timer); + } + if !have_decayed_scorer { + if let Some(ref scorer) = scorer { + if let Some(duration_since_epoch) = (|| { + use std::time::SystemTime; + Some( + SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .expect("Time should be sometime after 1970"), + ) + })() { + logger.log(lightning::util::logger::Record::new( + (lightning::util::logger::Level::Trace), + None, + None, + format_args!("Calling time_passed on scorer at startup"), + module_path!(), + file!(), + 0u32, + None, + )); + scorer.write_lock().time_passed(duration_since_epoch); + } + } + have_decayed_scorer = true; + } + if (|time: &Instant, dur| time.elapsed() > dur)( + &mut last_scorer_persist_call, + SCORER_PERSIST_TIMER, + ) { + if let Some(ref scorer) = scorer { + if let Some(duration_since_epoch) = (|| { + use std::time::SystemTime; + Some( + SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .expect("Time should be sometime after 1970"), + ) + })() { + logger.log(lightning::util::logger::Record::new( + (lightning::util::logger::Level::Trace), + None, + None, + format_args!("Calling time_passed and persisting scorer"), + module_path!(), + file!(), + 0u32, + None, + )); + scorer.write_lock().time_passed(duration_since_epoch); + } else { + logger.log(lightning::util::logger::Record::new( + (lightning::util::logger::Level::Trace), + None, + None, + format_args!("Persisting scorer"), + module_path!(), + file!(), + 0u32, + None, + )); + } + if let Err(e) = (kv_store.write( + SCORER_PERSISTENCE_PRIMARY_NAMESPACE, + SCORER_PERSISTENCE_SECONDARY_NAMESPACE, + SCORER_PERSISTENCE_KEY, + &scorer.encode(), + )) { + logger.log(lightning::util::logger::Record::new((lightning::util::logger::Level::Error),None,None,format_args!("Error: Failed to persist scorer, check your disk and permissions {}",e),module_path!(),file!(),0u32,None)); + } + } + last_scorer_persist_call = (|_| Instant::now())(SCORER_PERSIST_TIMER); + } + if (|time: &Instant, dur| time.elapsed() > dur)( + &mut last_rebroadcast_call, + REBROADCAST_TIMER, + ) { + logger.log(lightning::util::logger::Record::new( + (lightning::util::logger::Level::Trace), + None, + None, + format_args!("Rebroadcasting monitor's pending claims"), + module_path!(), + file!(), + 0u32, + None, + )); + chain_monitor.rebroadcast_pending_claims(); + last_rebroadcast_call = (|_| Instant::now())(REBROADCAST_TIMER); + } + if (|time: &Instant, dur| time.elapsed() > dur)( + &mut last_sweeper_call, + SWEEPER_TIMER, + ) { + logger.log(lightning::util::logger::Record::new( + (lightning::util::logger::Level::Trace), + None, + None, + format_args!("Regenerating sweeper spends if necessary"), + module_path!(), + file!(), + 0u32, + None, + )); + { + if let Some(ref sweeper) = sweeper { + let _ = sweeper.regenerate_and_broadcast_spend_if_necessary(); + } + }; + last_sweeper_call = (|_| Instant::now())(SWEEPER_TIMER); + } + } + (kv_store.write( + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + &channel_manager.get_cm().encode(), + ))?; + if let Some(ref scorer) = scorer { + (kv_store.write( + SCORER_PERSISTENCE_PRIMARY_NAMESPACE, + SCORER_PERSISTENCE_SECONDARY_NAMESPACE, + SCORER_PERSISTENCE_KEY, + &scorer.encode(), + ))?; + } + if let Some(network_graph) = gossip_sync.network_graph() { + (kv_store.write( + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_KEY, + &network_graph.encode(), + ))?; + } + Ok(()) }); Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) } } From ff01967242f344a39cfc08d47e592d139861a30f Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Mon, 28 Jul 2025 10:17:31 +0200 Subject: [PATCH 2/7] Clean up macro expansion Macros can only be expanded recursively, so the log macros needed to be brought back. Also some unnecessary parenthesis, curly braces and unused arguments/code removed. --- lightning-background-processor/src/lib.rs | 945 ++++++++-------------- 1 file changed, 318 insertions(+), 627 deletions(-) diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index 36bb98da761..5221e800c2b 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -706,47 +706,45 @@ where }) }; let mut batch_delay = BatchDelay::new(); - logger.log(lightning::util::logger::Record::new( - (lightning::util::logger::Level::Trace), - None, - None, - format_args!("Calling ChannelManager's timer_tick_occurred on startup"), - module_path!(), - file!(), - 0u32, - None, - )); + + log_trace!(logger, "Calling ChannelManager's timer_tick_occurred on startup"); channel_manager.get_cm().timer_tick_occurred(); - logger.log(lightning::util::logger::Record::new( - (lightning::util::logger::Level::Trace), - None, - None, - format_args!("Rebroadcasting monitor's pending claims on startup"), - module_path!(), - file!(), - 0u32, - None, - )); + log_trace!(logger, "Rebroadcasting monitor's pending claims on startup"); chain_monitor.rebroadcast_pending_claims(); - let mut last_freshness_call = (|t| sleeper(t))(FRESHNESS_TIMER); - let mut last_onion_message_handler_call = (|t| sleeper(t))(ONION_MESSAGE_HANDLER_TIMER); - let mut last_ping_call = (|t| sleeper(t))(PING_TIMER); - let mut last_prune_call = (|t| sleeper(t))(FIRST_NETWORK_PRUNE_TIMER); - let mut last_scorer_persist_call = (|t| sleeper(t))(SCORER_PERSIST_TIMER); - let mut last_rebroadcast_call = (|t| sleeper(t))(REBROADCAST_TIMER); - let mut last_sweeper_call = (|t| sleeper(t))(SWEEPER_TIMER); + + let mut last_freshness_call = sleeper(FRESHNESS_TIMER); + let mut last_onion_message_handler_call = sleeper(ONION_MESSAGE_HANDLER_TIMER); + let mut last_ping_call = sleeper(PING_TIMER); + let mut last_prune_call = sleeper(FIRST_NETWORK_PRUNE_TIMER); + let mut last_scorer_persist_call = sleeper(SCORER_PERSIST_TIMER); + let mut last_rebroadcast_call = sleeper(REBROADCAST_TIMER); + let mut last_sweeper_call = sleeper(SWEEPER_TIMER); let mut have_pruned = false; let mut have_decayed_scorer = false; + let mut cur_batch_delay = batch_delay.get(); - let mut last_forwards_processing_call = (|t| sleeper(t))(cur_batch_delay); + let mut last_forwards_processing_call = sleeper(cur_batch_delay); + loop { - (channel_manager.get_cm().process_pending_events_async(async_event_handler).await); - (chain_monitor.process_pending_events_async(async_event_handler).await); - (if let Some(om) = &onion_messenger { + channel_manager.get_cm().process_pending_events_async(async_event_handler).await; + chain_monitor.process_pending_events_async(async_event_handler).await; + if let Some(om) = &onion_messenger { om.get_om().process_pending_events_async(async_event_handler).await - }); + } + + // Note that the PeerManager::process_events may block on ChannelManager's locks, + // hence it comes last here. When the ChannelManager finishes whatever it's doing, + // we want to ensure we get into `persist_manager` as quickly as we can, especially + // without running the normal event processing above and handing events to users. + // + // Specifically, on an *extremely* slow machine, we may see ChannelManager start + // processing a message effectively at any point during this loop. In order to + // minimize the time between such processing completing and persisting the updated + // ChannelManager, we want to minimize methods blocking on a ChannelManager + // generally, and as a fallback place such blocking only immediately before + // persistence. peer_manager.as_ref().process_events(); - if (|fut: &mut SleepFuture, _| { + if (|fut: &mut SleepFuture| { let mut waker = dummy_waker(); let mut ctx = task::Context::from_waker(&mut waker); match core::pin::Pin::new(fut).poll(&mut ctx) { @@ -756,65 +754,57 @@ where }, task::Poll::Pending => false, } - })(&mut last_forwards_processing_call, cur_batch_delay) + })(&mut last_forwards_processing_call) { channel_manager.get_cm().process_pending_htlc_forwards(); cur_batch_delay = batch_delay.next(); - last_forwards_processing_call = (|t| sleeper(t))(cur_batch_delay); + last_forwards_processing_call = sleeper(cur_batch_delay); } if should_break { - logger.log(lightning::util::logger::Record::new( - (lightning::util::logger::Level::Trace), - None, - None, - format_args!("Terminating background processor."), - module_path!(), - file!(), - 0u32, - None, - )); + log_trace!(logger, "Terminating background processor."); break; } + + // We wait up to 100ms, but track how long it takes to detect being put to sleep, + // see `await_start`'s use below. let mut await_start = None; if mobile_interruptable_platform { - await_start = Some((|t| sleeper(t))(Duration::from_secs(1))); + await_start = Some(sleeper(Duration::from_secs(1))); } - { - let om_fut = if let Some(om) = onion_messenger.as_ref() { - let fut = om.get_om().get_update_future(); - OptionalSelector { optional_future: Some(fut) } - } else { - OptionalSelector { optional_future: None } - }; - let lm_fut = if let Some(lm) = liquidity_manager.as_ref() { - let fut = lm.get_lm().get_pending_msgs_future(); - OptionalSelector { optional_future: Some(fut) } - } else { - OptionalSelector { optional_future: None } - }; - let needs_processing = channel_manager.get_cm().needs_pending_htlc_processing(); - let sleep_delay = match (needs_processing, mobile_interruptable_platform) { - (true, true) => batch_delay.get().min(Duration::from_millis(100)), - (true, false) => batch_delay.get().min(FASTEST_TIMER), - (false, true) => Duration::from_millis(100), - (false, false) => FASTEST_TIMER, - }; - let fut = Selector { - a: channel_manager.get_cm().get_event_or_persistence_needed_future(), - b: chain_monitor.get_update_future(), - c: om_fut, - d: lm_fut, - e: sleeper(sleep_delay), - }; - match fut.await { - SelectorOutput::A | SelectorOutput::B | SelectorOutput::C | SelectorOutput::D => {}, - SelectorOutput::E(exit) => { - should_break = exit; - }, - } + let om_fut = if let Some(om) = onion_messenger.as_ref() { + let fut = om.get_om().get_update_future(); + OptionalSelector { optional_future: Some(fut) } + } else { + OptionalSelector { optional_future: None } + }; + let lm_fut = if let Some(lm) = liquidity_manager.as_ref() { + let fut = lm.get_lm().get_pending_msgs_future(); + OptionalSelector { optional_future: Some(fut) } + } else { + OptionalSelector { optional_future: None } + }; + let needs_processing = channel_manager.get_cm().needs_pending_htlc_processing(); + let sleep_delay = match (needs_processing, mobile_interruptable_platform) { + (true, true) => batch_delay.get().min(Duration::from_millis(100)), + (true, false) => batch_delay.get().min(FASTEST_TIMER), + (false, true) => Duration::from_millis(100), + (false, false) => FASTEST_TIMER, }; + let fut = Selector { + a: channel_manager.get_cm().get_event_or_persistence_needed_future(), + b: chain_monitor.get_update_future(), + c: om_fut, + d: lm_fut, + e: sleeper(sleep_delay), + }; + match fut.await { + SelectorOutput::A | SelectorOutput::B | SelectorOutput::C | SelectorOutput::D => {}, + SelectorOutput::E(exit) => { + should_break = exit; + }, + } let await_slow = if mobile_interruptable_platform { - (|fut: &mut SleepFuture, _| { + (|fut: &mut SleepFuture| { let mut waker = dummy_waker(); let mut ctx = task::Context::from_waker(&mut waker); match core::pin::Pin::new(fut).poll(&mut ctx) { @@ -824,53 +814,27 @@ where }, task::Poll::Pending => false, } - })(&mut await_start.unwrap(), Duration::from_secs(1)) + })(&mut await_start.unwrap()) } else { false }; if should_break { - logger.log(lightning::util::logger::Record::new( - (lightning::util::logger::Level::Trace), - None, - None, - format_args!("Terminating background processor."), - module_path!(), - file!(), - 0u32, - None, - )); + log_trace!(logger, "Terminating background processor."); break; } if channel_manager.get_cm().get_and_clear_needs_persistence() { - logger.log(lightning::util::logger::Record::new( - (lightning::util::logger::Level::Trace), - None, - None, - format_args!("Persisting ChannelManager..."), - module_path!(), - file!(), - 0u32, - None, - )); - (kv_store.write( - CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, - CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, - CHANNEL_MANAGER_PERSISTENCE_KEY, - &channel_manager.get_cm().encode(), - )) - .await?; - logger.log(lightning::util::logger::Record::new( - (lightning::util::logger::Level::Trace), - None, - None, - format_args!("Done persisting ChannelManager."), - module_path!(), - file!(), - 0u32, - None, - )); + log_trace!(logger, "Persisting ChannelManager..."); + kv_store + .write( + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + &channel_manager.get_cm().encode(), + ) + .await?; + log_trace!(logger, "Done persisting ChannelManager."); } - if (|fut: &mut SleepFuture, _| { + if (|fut: &mut SleepFuture| { let mut waker = dummy_waker(); let mut ctx = task::Context::from_waker(&mut waker); match core::pin::Pin::new(fut).poll(&mut ctx) { @@ -880,22 +844,13 @@ where }, task::Poll::Pending => false, } - })(&mut last_freshness_call, FRESHNESS_TIMER) + })(&mut last_freshness_call) { - logger.log(lightning::util::logger::Record::new( - (lightning::util::logger::Level::Trace), - None, - None, - format_args!("Calling ChannelManager's timer_tick_occurred"), - module_path!(), - file!(), - 0u32, - None, - )); + log_trace!(logger, "Calling ChannelManager's timer_tick_occurred"); channel_manager.get_cm().timer_tick_occurred(); - last_freshness_call = (|t| sleeper(t))(FRESHNESS_TIMER); + last_freshness_call = sleeper(FRESHNESS_TIMER); } - if (|fut: &mut SleepFuture, _| { + if (|fut: &mut SleepFuture| { let mut waker = dummy_waker(); let mut ctx = task::Context::from_waker(&mut waker); match core::pin::Pin::new(fut).poll(&mut ctx) { @@ -905,37 +860,31 @@ where }, task::Poll::Pending => false, } - })(&mut last_onion_message_handler_call, ONION_MESSAGE_HANDLER_TIMER) + })(&mut last_onion_message_handler_call) { if let Some(om) = &onion_messenger { - logger.log(lightning::util::logger::Record::new( - (lightning::util::logger::Level::Trace), - None, - None, - format_args!("Calling OnionMessageHandler's timer_tick_occurred"), - module_path!(), - file!(), - 0u32, - None, - )); + log_trace!(logger, "Calling OnionMessageHandler's timer_tick_occurred"); om.get_om().timer_tick_occurred(); } - last_onion_message_handler_call = (|t| sleeper(t))(ONION_MESSAGE_HANDLER_TIMER); + last_onion_message_handler_call = sleeper(ONION_MESSAGE_HANDLER_TIMER); } if await_slow { - logger.log(lightning::util::logger::Record::new( - (lightning::util::logger::Level::Trace), - None, - None, - format_args!("100ms sleep took more than a second, disconnecting peers."), - module_path!(), - file!(), - 0u32, - None, - )); + // On various platforms, we may be starved of CPU cycles for several reasons. + // E.g. on iOS, if we've been in the background, we will be entirely paused. + // Similarly, if we're on a desktop platform and the device has been asleep, we + // may not get any cycles. + // We detect this by checking if our max-100ms-sleep, above, ran longer than a + // full second, at which point we assume sockets may have been killed (they + // appear to be at least on some platforms, even if it has only been a second). + // Note that we have to take care to not get here just because user event + // processing was slow at the top of the loop. For example, the sample client + // may call Bitcoin Core RPCs during event handling, which very often takes + // more than a handful of seconds to complete, and shouldn't disconnect all our + // peers. + log_trace!(logger, "100ms sleep took more than a second, disconnecting peers."); peer_manager.as_ref().disconnect_all_peers(); - last_ping_call = (|t| sleeper(t))(PING_TIMER); - } else if (|fut: &mut SleepFuture, _| { + last_ping_call = sleeper(PING_TIMER); + } else if (|fut: &mut SleepFuture| { let mut waker = dummy_waker(); let mut ctx = task::Context::from_waker(&mut waker); match core::pin::Pin::new(fut).poll(&mut ctx) { @@ -945,23 +894,19 @@ where }, task::Poll::Pending => false, } - })(&mut last_ping_call, PING_TIMER) + })(&mut last_ping_call) { - logger.log(lightning::util::logger::Record::new( - (lightning::util::logger::Level::Trace), - None, - None, - format_args!("Calling PeerManager's timer_tick_occurred"), - module_path!(), - file!(), - 0u32, - None, - )); + log_trace!(logger, "Calling PeerManager's timer_tick_occurred"); peer_manager.as_ref().timer_tick_occurred(); - last_ping_call = (|t| sleeper(t))(PING_TIMER); + last_ping_call = sleeper(PING_TIMER); } - let prune_timer = if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER }; - let prune_timer_elapsed = (|fut: &mut SleepFuture, _| { + + // Note that we want to run a graph prune once not long after startup before + // falling back to our usual hourly prunes. This avoids short-lived clients never + // pruning their network graph. We run once 60 seconds after startup before + // continuing our normal cadence. For RGS, since 60 seconds is likely too long, + // we prune after an initial sync completes. + let prune_timer_elapsed = (|fut: &mut SleepFuture| { let mut waker = dummy_waker(); let mut ctx = task::Context::from_waker(&mut waker); match core::pin::Pin::new(fut).poll(&mut ctx) { @@ -971,75 +916,50 @@ where }, task::Poll::Pending => false, } - })(&mut last_prune_call, prune_timer); + })(&mut last_prune_call); let should_prune = match gossip_sync { GossipSync::Rapid(_) => !have_pruned || prune_timer_elapsed, _ => prune_timer_elapsed, }; if should_prune { + // The network graph must not be pruned while rapid sync completion is pending if let Some(network_graph) = gossip_sync.prunable_network_graph() { if let Some(duration_since_epoch) = fetch_time() { - logger.log(lightning::util::logger::Record::new( - (lightning::util::logger::Level::Trace), - None, - None, - format_args!("Pruning and persisting network graph."), - module_path!(), - file!(), - 0u32, - None, - )); + log_trace!(logger, "Pruning and persisting network graph."); network_graph.remove_stale_channels_and_tracking_with_time( duration_since_epoch.as_secs(), ); } else { - logger.log(lightning::util::logger::Record::new((lightning::util::logger::Level::Warn),None,None,format_args!("Not pruning network graph, consider implementing the fetch_time argument or calling remove_stale_channels_and_tracking_with_time manually."),module_path!(),file!(),0u32,None)); - logger.log(lightning::util::logger::Record::new( - (lightning::util::logger::Level::Trace), - None, - None, - format_args!("Persisting network graph."), - module_path!(), - file!(), - 0u32, - None, - )); + log_warn!(logger, "Not pruning network graph, consider implementing the fetch_time argument or calling remove_stale_channels_and_tracking_with_time manually."); + log_trace!(logger, "Persisting network graph."); } - if let Err(e) = (kv_store.write( - NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, - NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, - NETWORK_GRAPH_PERSISTENCE_KEY, - &network_graph.encode(), - )) - .await + if let Err(e) = kv_store + .write( + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_KEY, + &network_graph.encode(), + ) + .await { - logger.log(lightning::util::logger::Record::new((lightning::util::logger::Level::Error),None,None,format_args!("Error: Failed to persist network graph, check your disk and permissions {}",e),module_path!(),file!(),0u32,None)); + log_error!(logger, "Error: Failed to persist network graph, check your disk and permissions {}",e); } have_pruned = true; } let prune_timer = if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER }; - last_prune_call = (|t| sleeper(t))(prune_timer); + last_prune_call = sleeper(prune_timer); } if !have_decayed_scorer { if let Some(ref scorer) = scorer { if let Some(duration_since_epoch) = fetch_time() { - logger.log(lightning::util::logger::Record::new( - (lightning::util::logger::Level::Trace), - None, - None, - format_args!("Calling time_passed on scorer at startup"), - module_path!(), - file!(), - 0u32, - None, - )); + log_trace!(logger, "Calling time_passed on scorer at startup"); scorer.write_lock().time_passed(duration_since_epoch); } } have_decayed_scorer = true; } - if (|fut: &mut SleepFuture, _| { + if (|fut: &mut SleepFuture| { let mut waker = dummy_waker(); let mut ctx = task::Context::from_waker(&mut waker); match core::pin::Pin::new(fut).poll(&mut ctx) { @@ -1049,59 +969,34 @@ where }, task::Poll::Pending => false, } - })(&mut last_scorer_persist_call, SCORER_PERSIST_TIMER) + })(&mut last_scorer_persist_call) { if let Some(ref scorer) = scorer { if let Some(duration_since_epoch) = fetch_time() { - logger.log(lightning::util::logger::Record::new( - (lightning::util::logger::Level::Trace), - None, - None, - format_args!("Calling time_passed and persisting scorer"), - module_path!(), - file!(), - 0u32, - None, - )); + log_trace!(logger, "Calling time_passed and persisting scorer"); scorer.write_lock().time_passed(duration_since_epoch); } else { - logger.log(lightning::util::logger::Record::new( - (lightning::util::logger::Level::Trace), - None, - None, - format_args!("Persisting scorer"), - module_path!(), - file!(), - 0u32, - None, - )); + log_trace!(logger, "Persisting scorer"); } - if let Err(e) = (kv_store.write( - SCORER_PERSISTENCE_PRIMARY_NAMESPACE, - SCORER_PERSISTENCE_SECONDARY_NAMESPACE, - SCORER_PERSISTENCE_KEY, - &scorer.encode(), - )) - .await + if let Err(e) = kv_store + .write( + SCORER_PERSISTENCE_PRIMARY_NAMESPACE, + SCORER_PERSISTENCE_SECONDARY_NAMESPACE, + SCORER_PERSISTENCE_KEY, + &scorer.encode(), + ) + .await { - logger.log(lightning::util::logger::Record::new( - (lightning::util::logger::Level::Error), - None, - None, - format_args!( - "Error: Failed to persist scorer, check your disk and permissions {}", - e - ), - module_path!(), - file!(), - 0u32, - None, - )); + log_error!( + logger, + "Error: Failed to persist scorer, check your disk and permissions {}", + e + ); } } - last_scorer_persist_call = (|t| sleeper(t))(SCORER_PERSIST_TIMER); + last_scorer_persist_call = sleeper(SCORER_PERSIST_TIMER); } - if (|fut: &mut SleepFuture, _| { + if (|fut: &mut SleepFuture| { let mut waker = dummy_waker(); let mut ctx = task::Context::from_waker(&mut waker); match core::pin::Pin::new(fut).poll(&mut ctx) { @@ -1111,22 +1006,13 @@ where }, task::Poll::Pending => false, } - })(&mut last_rebroadcast_call, REBROADCAST_TIMER) + })(&mut last_rebroadcast_call) { - logger.log(lightning::util::logger::Record::new( - (lightning::util::logger::Level::Trace), - None, - None, - format_args!("Rebroadcasting monitor's pending claims"), - module_path!(), - file!(), - 0u32, - None, - )); + log_trace!(logger, "Rebroadcasting monitor's pending claims"); chain_monitor.rebroadcast_pending_claims(); - last_rebroadcast_call = (|t| sleeper(t))(REBROADCAST_TIMER); + last_rebroadcast_call = sleeper(REBROADCAST_TIMER); } - if (|fut: &mut SleepFuture, _| { + if (|fut: &mut SleepFuture| { let mut waker = dummy_waker(); let mut ctx = task::Context::from_waker(&mut waker); match core::pin::Pin::new(fut).poll(&mut ctx) { @@ -1136,50 +1022,46 @@ where }, task::Poll::Pending => false, } - })(&mut last_sweeper_call, SWEEPER_TIMER) + })(&mut last_sweeper_call) { - logger.log(lightning::util::logger::Record::new( - (lightning::util::logger::Level::Trace), - None, - None, - format_args!("Regenerating sweeper spends if necessary"), - module_path!(), - file!(), - 0u32, - None, - )); - { - if let Some(ref sweeper) = sweeper { - let _ = sweeper.regenerate_and_broadcast_spend_if_necessary().await; - } - }; - last_sweeper_call = (|t| sleeper(t))(SWEEPER_TIMER); + log_trace!(logger, "Regenerating sweeper spends if necessary"); + if let Some(ref sweeper) = sweeper { + let _ = sweeper.regenerate_and_broadcast_spend_if_necessary().await; + } + last_sweeper_call = sleeper(SWEEPER_TIMER); } } - (kv_store.write( - CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, - CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, - CHANNEL_MANAGER_PERSISTENCE_KEY, - &channel_manager.get_cm().encode(), - )) - .await?; - if let Some(ref scorer) = scorer { - (kv_store.write( - SCORER_PERSISTENCE_PRIMARY_NAMESPACE, - SCORER_PERSISTENCE_SECONDARY_NAMESPACE, - SCORER_PERSISTENCE_KEY, - &scorer.encode(), - )) + + // After we exit, ensure we persist the ChannelManager one final time - this avoids + // some races where users quit while channel updates were in-flight, with + // ChannelMonitor update(s) persisted without a corresponding ChannelManager update. + kv_store + .write( + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + &channel_manager.get_cm().encode(), + ) .await?; + if let Some(ref scorer) = scorer { + kv_store + .write( + SCORER_PERSISTENCE_PRIMARY_NAMESPACE, + SCORER_PERSISTENCE_SECONDARY_NAMESPACE, + SCORER_PERSISTENCE_KEY, + &scorer.encode(), + ) + .await?; } if let Some(network_graph) = gossip_sync.network_graph() { - (kv_store.write( - NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, - NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, - NETWORK_GRAPH_PERSISTENCE_KEY, - &network_graph.encode(), - )) - .await?; + kv_store + .write( + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_KEY, + &network_graph.encode(), + ) + .await?; } Ok(()) } @@ -1380,409 +1262,218 @@ impl BackgroundProcessor { event_handler.handle_event(event) }; let mut batch_delay = BatchDelay::new(); - logger.log(lightning::util::logger::Record::new( - (lightning::util::logger::Level::Trace), - None, - None, - format_args!("Calling ChannelManager's timer_tick_occurred on startup"), - module_path!(), - file!(), - 0u32, - None, - )); + + log_trace!(logger, "Calling ChannelManager's timer_tick_occurred on startup"); channel_manager.get_cm().timer_tick_occurred(); - logger.log(lightning::util::logger::Record::new( - (lightning::util::logger::Level::Trace), - None, - None, - format_args!("Rebroadcasting monitor's pending claims on startup"), - module_path!(), - file!(), - 0u32, - None, - )); + log_trace!(logger, "Rebroadcasting monitor's pending claims on startup"); chain_monitor.rebroadcast_pending_claims(); - let mut last_freshness_call = (|_| Instant::now())(FRESHNESS_TIMER); - let mut last_onion_message_handler_call = - (|_| Instant::now())(ONION_MESSAGE_HANDLER_TIMER); - let mut last_ping_call = (|_| Instant::now())(PING_TIMER); - let mut last_prune_call = (|_| Instant::now())(FIRST_NETWORK_PRUNE_TIMER); - let mut last_scorer_persist_call = (|_| Instant::now())(SCORER_PERSIST_TIMER); - let mut last_rebroadcast_call = (|_| Instant::now())(REBROADCAST_TIMER); - let mut last_sweeper_call = (|_| Instant::now())(SWEEPER_TIMER); + + let mut last_freshness_call = Instant::now(); + let mut last_onion_message_handler_call = Instant::now(); + let mut last_ping_call = Instant::now(); + let mut last_prune_call = Instant::now(); + let mut last_scorer_persist_call = Instant::now(); + let mut last_rebroadcast_call = Instant::now(); + let mut last_sweeper_call = Instant::now(); let mut have_pruned = false; let mut have_decayed_scorer = false; + let mut cur_batch_delay = batch_delay.get(); - let mut last_forwards_processing_call = (|_| Instant::now())(cur_batch_delay); + let mut last_forwards_processing_call = Instant::now(); + loop { - (channel_manager.get_cm().process_pending_events(&event_handler)); - (chain_monitor.process_pending_events(&event_handler)); - (if let Some(om) = &onion_messenger { + channel_manager.get_cm().process_pending_events(&event_handler); + chain_monitor.process_pending_events(&event_handler); + if let Some(om) = &onion_messenger { om.get_om().process_pending_events(&event_handler) - }); + }; + + // Note that the PeerManager::process_events may block on ChannelManager's locks, + // hence it comes last here. When the ChannelManager finishes whatever it's doing, + // we want to ensure we get into `persist_manager` as quickly as we can, especially + // without running the normal event processing above and handing events to users. + // + // Specifically, on an *extremely* slow machine, we may see ChannelManager start + // processing a message effectively at any point during this loop. In order to + // minimize the time between such processing completing and persisting the updated + // ChannelManager, we want to minimize methods blocking on a ChannelManager + // generally, and as a fallback place such blocking only immediately before + // persistence. peer_manager.as_ref().process_events(); - if (|time: &Instant, dur| time.elapsed() > dur)( - &mut last_forwards_processing_call, - cur_batch_delay, - ) { + if last_forwards_processing_call.elapsed() > cur_batch_delay { channel_manager.get_cm().process_pending_htlc_forwards(); cur_batch_delay = batch_delay.next(); - last_forwards_processing_call = (|_| Instant::now())(cur_batch_delay); + last_forwards_processing_call = Instant::now(); } - if (stop_thread.load(Ordering::Acquire)) { - logger.log(lightning::util::logger::Record::new( - (lightning::util::logger::Level::Trace), - None, - None, - format_args!("Terminating background processor."), - module_path!(), - file!(), - 0u32, - None, - )); + if stop_thread.load(Ordering::Acquire) { + log_trace!(logger, "Terminating background processor."); break; } - let mut await_start = None; - if false { - await_start = Some((|_| Instant::now())(Duration::from_secs(1))); - } - { - let sleeper = match (onion_messenger.as_ref(), liquidity_manager.as_ref()) { - (Some(om), Some(lm)) => Sleeper::from_four_futures( - &channel_manager.get_cm().get_event_or_persistence_needed_future(), - &chain_monitor.get_update_future(), - &om.get_om().get_update_future(), - &lm.get_lm().get_pending_msgs_future(), - ), - (Some(om), None) => Sleeper::from_three_futures( - &channel_manager.get_cm().get_event_or_persistence_needed_future(), - &chain_monitor.get_update_future(), - &om.get_om().get_update_future(), - ), - (None, Some(lm)) => Sleeper::from_three_futures( - &channel_manager.get_cm().get_event_or_persistence_needed_future(), - &chain_monitor.get_update_future(), - &lm.get_lm().get_pending_msgs_future(), - ), - (None, None) => Sleeper::from_two_futures( - &channel_manager.get_cm().get_event_or_persistence_needed_future(), - &chain_monitor.get_update_future(), - ), - }; - let batch_delay = if channel_manager.get_cm().needs_pending_htlc_processing() { - batch_delay.get() - } else { - Duration::MAX - }; - let fastest_timeout = batch_delay.min(Duration::from_millis(100)); - sleeper.wait_timeout(fastest_timeout); + let sleeper = match (onion_messenger.as_ref(), liquidity_manager.as_ref()) { + (Some(om), Some(lm)) => Sleeper::from_four_futures( + &channel_manager.get_cm().get_event_or_persistence_needed_future(), + &chain_monitor.get_update_future(), + &om.get_om().get_update_future(), + &lm.get_lm().get_pending_msgs_future(), + ), + (Some(om), None) => Sleeper::from_three_futures( + &channel_manager.get_cm().get_event_or_persistence_needed_future(), + &chain_monitor.get_update_future(), + &om.get_om().get_update_future(), + ), + (None, Some(lm)) => Sleeper::from_three_futures( + &channel_manager.get_cm().get_event_or_persistence_needed_future(), + &chain_monitor.get_update_future(), + &lm.get_lm().get_pending_msgs_future(), + ), + (None, None) => Sleeper::from_two_futures( + &channel_manager.get_cm().get_event_or_persistence_needed_future(), + &chain_monitor.get_update_future(), + ), }; - let await_slow = if false { - (|time: &Instant, dur| time.elapsed() > dur)( - &mut await_start.unwrap(), - Duration::from_secs(1), - ) + let batch_delay = if channel_manager.get_cm().needs_pending_htlc_processing() { + batch_delay.get() } else { - false + Duration::MAX }; - if (stop_thread.load(Ordering::Acquire)) { - logger.log(lightning::util::logger::Record::new( - (lightning::util::logger::Level::Trace), - None, - None, - format_args!("Terminating background processor."), - module_path!(), - file!(), - 0u32, - None, - )); + let fastest_timeout = batch_delay.min(Duration::from_millis(100)); + sleeper.wait_timeout(fastest_timeout); + if stop_thread.load(Ordering::Acquire) { + log_trace!(logger, "Terminating background processor."); break; } if channel_manager.get_cm().get_and_clear_needs_persistence() { - logger.log(lightning::util::logger::Record::new( - (lightning::util::logger::Level::Trace), - None, - None, - format_args!("Persisting ChannelManager..."), - module_path!(), - file!(), - 0u32, - None, - )); + log_trace!(logger, "Persisting ChannelManager..."); (kv_store.write( CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_KEY, &channel_manager.get_cm().encode(), ))?; - logger.log(lightning::util::logger::Record::new( - (lightning::util::logger::Level::Trace), - None, - None, - format_args!("Done persisting ChannelManager."), - module_path!(), - file!(), - 0u32, - None, - )); + log_trace!(logger, "Done persisting ChannelManager."); } - if (|time: &Instant, dur| time.elapsed() > dur)( - &mut last_freshness_call, - FRESHNESS_TIMER, - ) { - logger.log(lightning::util::logger::Record::new( - (lightning::util::logger::Level::Trace), - None, - None, - format_args!("Calling ChannelManager's timer_tick_occurred"), - module_path!(), - file!(), - 0u32, - None, - )); + if last_freshness_call.elapsed() > FRESHNESS_TIMER { + log_trace!(logger, "Calling ChannelManager's timer_tick_occurred"); channel_manager.get_cm().timer_tick_occurred(); - last_freshness_call = (|_| Instant::now())(FRESHNESS_TIMER); + last_freshness_call = Instant::now(); } - if (|time: &Instant, dur| time.elapsed() > dur)( - &mut last_onion_message_handler_call, - ONION_MESSAGE_HANDLER_TIMER, - ) { + if last_onion_message_handler_call.elapsed() > ONION_MESSAGE_HANDLER_TIMER { if let Some(om) = &onion_messenger { - logger.log(lightning::util::logger::Record::new( - (lightning::util::logger::Level::Trace), - None, - None, - format_args!("Calling OnionMessageHandler's timer_tick_occurred"), - module_path!(), - file!(), - 0u32, - None, - )); + log_trace!(logger, "Calling OnionMessageHandler's timer_tick_occurred"); om.get_om().timer_tick_occurred(); } - last_onion_message_handler_call = - (|_| Instant::now())(ONION_MESSAGE_HANDLER_TIMER); + last_onion_message_handler_call = Instant::now(); } - if await_slow { - logger.log(lightning::util::logger::Record::new( - (lightning::util::logger::Level::Trace), - None, - None, - format_args!("100ms sleep took more than a second, disconnecting peers."), - module_path!(), - file!(), - 0u32, - None, - )); - peer_manager.as_ref().disconnect_all_peers(); - last_ping_call = (|_| Instant::now())(PING_TIMER); - } else if (|time: &Instant, dur| time.elapsed() > dur)( - &mut last_ping_call, - PING_TIMER, - ) { - logger.log(lightning::util::logger::Record::new( - (lightning::util::logger::Level::Trace), - None, - None, - format_args!("Calling PeerManager's timer_tick_occurred"), - module_path!(), - file!(), - 0u32, - None, - )); + if last_ping_call.elapsed() > PING_TIMER { + log_trace!(logger, "Calling PeerManager's timer_tick_occurred"); peer_manager.as_ref().timer_tick_occurred(); - last_ping_call = (|_| Instant::now())(PING_TIMER); + last_ping_call = Instant::now(); } + + // Note that we want to run a graph prune once not long after startup before + // falling back to our usual hourly prunes. This avoids short-lived clients never + // pruning their network graph. We run once 60 seconds after startup before + // continuing our normal cadence. For RGS, since 60 seconds is likely too long, + // we prune after an initial sync completes. let prune_timer = if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER }; - let prune_timer_elapsed = - (|time: &Instant, dur| time.elapsed() > dur)(&mut last_prune_call, prune_timer); + let prune_timer_elapsed = last_prune_call.elapsed() > prune_timer; let should_prune = match gossip_sync { GossipSync::Rapid(_) => !have_pruned || prune_timer_elapsed, _ => prune_timer_elapsed, }; if should_prune { + // The network graph must not be pruned while rapid sync completion is pending if let Some(network_graph) = gossip_sync.prunable_network_graph() { - if let Some(duration_since_epoch) = (|| { - use std::time::SystemTime; - Some( - SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .expect("Time should be sometime after 1970"), - ) - })() { - logger.log(lightning::util::logger::Record::new( - (lightning::util::logger::Level::Trace), - None, - None, - format_args!("Pruning and persisting network graph."), - module_path!(), - file!(), - 0u32, - None, - )); - network_graph.remove_stale_channels_and_tracking_with_time( - duration_since_epoch.as_secs(), - ); - } else { - logger.log(lightning::util::logger::Record::new((lightning::util::logger::Level::Warn),None,None,format_args!("Not pruning network graph, consider implementing the fetch_time argument or calling remove_stale_channels_and_tracking_with_time manually."),module_path!(),file!(),0u32,None)); - logger.log(lightning::util::logger::Record::new( - (lightning::util::logger::Level::Trace), - None, - None, - format_args!("Persisting network graph."), - module_path!(), - file!(), - 0u32, - None, - )); - } - if let Err(e) = (kv_store.write( + let duration_since_epoch = std::time::SystemTime::now() + .duration_since(std::time::SystemTime::UNIX_EPOCH) + .expect("Time should be sometime after 1970"); + + log_trace!(logger, "Pruning and persisting network graph."); + network_graph.remove_stale_channels_and_tracking_with_time( + duration_since_epoch.as_secs(), + ); + if let Err(e) = kv_store.write( NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_KEY, &network_graph.encode(), - )) { - logger.log(lightning::util::logger::Record::new((lightning::util::logger::Level::Error),None,None,format_args!("Error: Failed to persist network graph, check your disk and permissions {}",e),module_path!(),file!(),0u32,None)); + ) { + log_error!(logger, "Error: Failed to persist network graph, check your disk and permissions {}",e); } have_pruned = true; } - let prune_timer = - if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER }; - last_prune_call = (|_| Instant::now())(prune_timer); + last_prune_call = Instant::now(); } if !have_decayed_scorer { if let Some(ref scorer) = scorer { - if let Some(duration_since_epoch) = (|| { - use std::time::SystemTime; - Some( - SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .expect("Time should be sometime after 1970"), - ) - })() { - logger.log(lightning::util::logger::Record::new( - (lightning::util::logger::Level::Trace), - None, - None, - format_args!("Calling time_passed on scorer at startup"), - module_path!(), - file!(), - 0u32, - None, - )); - scorer.write_lock().time_passed(duration_since_epoch); - } + let duration_since_epoch = std::time::SystemTime::now() + .duration_since(std::time::SystemTime::UNIX_EPOCH) + .expect("Time should be sometime after 1970"); + log_trace!(logger, "Calling time_passed on scorer at startup"); + scorer.write_lock().time_passed(duration_since_epoch); } have_decayed_scorer = true; } - if (|time: &Instant, dur| time.elapsed() > dur)( - &mut last_scorer_persist_call, - SCORER_PERSIST_TIMER, - ) { + if last_scorer_persist_call.elapsed() > SCORER_PERSIST_TIMER { if let Some(ref scorer) = scorer { - if let Some(duration_since_epoch) = (|| { - use std::time::SystemTime; - Some( - SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .expect("Time should be sometime after 1970"), - ) - })() { - logger.log(lightning::util::logger::Record::new( - (lightning::util::logger::Level::Trace), - None, - None, - format_args!("Calling time_passed and persisting scorer"), - module_path!(), - file!(), - 0u32, - None, - )); - scorer.write_lock().time_passed(duration_since_epoch); - } else { - logger.log(lightning::util::logger::Record::new( - (lightning::util::logger::Level::Trace), - None, - None, - format_args!("Persisting scorer"), - module_path!(), - file!(), - 0u32, - None, - )); - } - if let Err(e) = (kv_store.write( + let duration_since_epoch = std::time::SystemTime::now() + .duration_since(std::time::SystemTime::UNIX_EPOCH) + .expect("Time should be sometime after 1970"); + log_trace!(logger, "Calling time_passed and persisting scorer"); + scorer.write_lock().time_passed(duration_since_epoch); + if let Err(e) = kv_store.write( SCORER_PERSISTENCE_PRIMARY_NAMESPACE, SCORER_PERSISTENCE_SECONDARY_NAMESPACE, SCORER_PERSISTENCE_KEY, &scorer.encode(), - )) { - logger.log(lightning::util::logger::Record::new((lightning::util::logger::Level::Error),None,None,format_args!("Error: Failed to persist scorer, check your disk and permissions {}",e),module_path!(),file!(),0u32,None)); + ) { + log_error!(logger, + "Error: Failed to persist scorer, check your disk and permissions {}", + e, + ); } } - last_scorer_persist_call = (|_| Instant::now())(SCORER_PERSIST_TIMER); + last_scorer_persist_call = Instant::now(); } - if (|time: &Instant, dur| time.elapsed() > dur)( - &mut last_rebroadcast_call, - REBROADCAST_TIMER, - ) { - logger.log(lightning::util::logger::Record::new( - (lightning::util::logger::Level::Trace), - None, - None, - format_args!("Rebroadcasting monitor's pending claims"), - module_path!(), - file!(), - 0u32, - None, - )); + if last_rebroadcast_call.elapsed() > REBROADCAST_TIMER { + log_trace!(logger, "Rebroadcasting monitor's pending claims"); chain_monitor.rebroadcast_pending_claims(); - last_rebroadcast_call = (|_| Instant::now())(REBROADCAST_TIMER); + last_rebroadcast_call = Instant::now(); } - if (|time: &Instant, dur| time.elapsed() > dur)( - &mut last_sweeper_call, - SWEEPER_TIMER, - ) { - logger.log(lightning::util::logger::Record::new( - (lightning::util::logger::Level::Trace), - None, - None, - format_args!("Regenerating sweeper spends if necessary"), - module_path!(), - file!(), - 0u32, - None, - )); - { - if let Some(ref sweeper) = sweeper { - let _ = sweeper.regenerate_and_broadcast_spend_if_necessary(); - } - }; - last_sweeper_call = (|_| Instant::now())(SWEEPER_TIMER); + if last_sweeper_call.elapsed() > SWEEPER_TIMER { + log_trace!(logger, "Regenerating sweeper spends if necessary"); + if let Some(ref sweeper) = sweeper { + let _ = sweeper.regenerate_and_broadcast_spend_if_necessary(); + } + last_sweeper_call = Instant::now(); } } - (kv_store.write( + + // After we exit, ensure we persist the ChannelManager one final time - this avoids + // some races where users quit while channel updates were in-flight, with + // ChannelMonitor update(s) persisted without a corresponding ChannelManager update. + kv_store.write( CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_KEY, &channel_manager.get_cm().encode(), - ))?; + )?; if let Some(ref scorer) = scorer { - (kv_store.write( + kv_store.write( SCORER_PERSISTENCE_PRIMARY_NAMESPACE, SCORER_PERSISTENCE_SECONDARY_NAMESPACE, SCORER_PERSISTENCE_KEY, &scorer.encode(), - ))?; + )?; } if let Some(network_graph) = gossip_sync.network_graph() { - (kv_store.write( + kv_store.write( NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_KEY, &network_graph.encode(), - ))?; + )?; } Ok(()) }); From e15f6a3fe4ba525176b9b5d381289a81935c14c7 Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Mon, 28 Jul 2025 12:35:39 +0200 Subject: [PATCH 3/7] Dedup terminating log --- lightning-background-processor/src/lib.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index 5221e800c2b..be6735c20b4 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -761,7 +761,6 @@ where last_forwards_processing_call = sleeper(cur_batch_delay); } if should_break { - log_trace!(logger, "Terminating background processor."); break; } @@ -819,7 +818,6 @@ where false }; if should_break { - log_trace!(logger, "Terminating background processor."); break; } if channel_manager.get_cm().get_and_clear_needs_persistence() { @@ -1031,6 +1029,7 @@ where last_sweeper_call = sleeper(SWEEPER_TIMER); } } + log_trace!(logger, "Terminating background processor."); // After we exit, ensure we persist the ChannelManager one final time - this avoids // some races where users quit while channel updates were in-flight, with From fa026021dddfebcb045ecbf47a386ba934ec3f4d Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Mon, 28 Jul 2025 14:32:29 +0200 Subject: [PATCH 4/7] Introduce check_sleeper helper --- lightning-background-processor/src/lib.rs | 267 +++++++++------------- 1 file changed, 109 insertions(+), 158 deletions(-) diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index be6735c20b4..c04320e745e 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -667,7 +667,6 @@ where D::Target: 'static + ChangeDestinationSource, K::Target: 'static + KVStore, { - let mut should_break = false; let async_event_handler = |event| { let network_graph = gossip_sync.network_graph(); let event_handler = &event_handler; @@ -744,24 +743,14 @@ where // generally, and as a fallback place such blocking only immediately before // persistence. peer_manager.as_ref().process_events(); - if (|fut: &mut SleepFuture| { - let mut waker = dummy_waker(); - let mut ctx = task::Context::from_waker(&mut waker); - match core::pin::Pin::new(fut).poll(&mut ctx) { - task::Poll::Ready(exit) => { - should_break = exit; - true - }, - task::Poll::Pending => false, - } - })(&mut last_forwards_processing_call) - { - channel_manager.get_cm().process_pending_htlc_forwards(); - cur_batch_delay = batch_delay.next(); - last_forwards_processing_call = sleeper(cur_batch_delay); - } - if should_break { - break; + match check_sleeper(&mut last_forwards_processing_call) { + Some(false) => { + channel_manager.get_cm().process_pending_htlc_forwards(); + cur_batch_delay = batch_delay.next(); + last_forwards_processing_call = sleeper(cur_batch_delay); + }, + Some(true) => break, + None => {}, } // We wait up to 100ms, but track how long it takes to detect being put to sleep, @@ -799,27 +788,21 @@ where match fut.await { SelectorOutput::A | SelectorOutput::B | SelectorOutput::C | SelectorOutput::D => {}, SelectorOutput::E(exit) => { - should_break = exit; + if exit { + break; + } }, } + let await_slow = if mobile_interruptable_platform { - (|fut: &mut SleepFuture| { - let mut waker = dummy_waker(); - let mut ctx = task::Context::from_waker(&mut waker); - match core::pin::Pin::new(fut).poll(&mut ctx) { - task::Poll::Ready(exit) => { - should_break = exit; - true - }, - task::Poll::Pending => false, - } - })(&mut await_start.unwrap()) + match check_sleeper(&mut await_start.unwrap()) { + Some(true) => break, + Some(false) => true, + None => false, + } } else { false }; - if should_break { - break; - } if channel_manager.get_cm().get_and_clear_needs_persistence() { log_trace!(logger, "Persisting ChannelManager..."); kv_store @@ -832,39 +815,25 @@ where .await?; log_trace!(logger, "Done persisting ChannelManager."); } - if (|fut: &mut SleepFuture| { - let mut waker = dummy_waker(); - let mut ctx = task::Context::from_waker(&mut waker); - match core::pin::Pin::new(fut).poll(&mut ctx) { - task::Poll::Ready(exit) => { - should_break = exit; - true - }, - task::Poll::Pending => false, - } - })(&mut last_freshness_call) - { - log_trace!(logger, "Calling ChannelManager's timer_tick_occurred"); - channel_manager.get_cm().timer_tick_occurred(); - last_freshness_call = sleeper(FRESHNESS_TIMER); - } - if (|fut: &mut SleepFuture| { - let mut waker = dummy_waker(); - let mut ctx = task::Context::from_waker(&mut waker); - match core::pin::Pin::new(fut).poll(&mut ctx) { - task::Poll::Ready(exit) => { - should_break = exit; - true - }, - task::Poll::Pending => false, - } - })(&mut last_onion_message_handler_call) - { - if let Some(om) = &onion_messenger { - log_trace!(logger, "Calling OnionMessageHandler's timer_tick_occurred"); - om.get_om().timer_tick_occurred(); - } - last_onion_message_handler_call = sleeper(ONION_MESSAGE_HANDLER_TIMER); + match check_sleeper(&mut last_freshness_call) { + Some(false) => { + log_trace!(logger, "Calling ChannelManager's timer_tick_occurred"); + channel_manager.get_cm().timer_tick_occurred(); + last_freshness_call = sleeper(FRESHNESS_TIMER); + }, + Some(true) => break, + None => {}, + } + match check_sleeper(&mut last_onion_message_handler_call) { + Some(false) => { + if let Some(om) = &onion_messenger { + log_trace!(logger, "Calling OnionMessageHandler's timer_tick_occurred"); + om.get_om().timer_tick_occurred(); + } + last_onion_message_handler_call = sleeper(ONION_MESSAGE_HANDLER_TIMER); + }, + Some(true) => break, + None => {}, } if await_slow { // On various platforms, we may be starved of CPU cycles for several reasons. @@ -882,21 +851,16 @@ where log_trace!(logger, "100ms sleep took more than a second, disconnecting peers."); peer_manager.as_ref().disconnect_all_peers(); last_ping_call = sleeper(PING_TIMER); - } else if (|fut: &mut SleepFuture| { - let mut waker = dummy_waker(); - let mut ctx = task::Context::from_waker(&mut waker); - match core::pin::Pin::new(fut).poll(&mut ctx) { - task::Poll::Ready(exit) => { - should_break = exit; - true + } else { + match check_sleeper(&mut last_ping_call) { + Some(false) => { + log_trace!(logger, "Calling PeerManager's timer_tick_occurred"); + peer_manager.as_ref().timer_tick_occurred(); + last_ping_call = sleeper(PING_TIMER); }, - task::Poll::Pending => false, + Some(true) => break, + _ => {}, } - })(&mut last_ping_call) - { - log_trace!(logger, "Calling PeerManager's timer_tick_occurred"); - peer_manager.as_ref().timer_tick_occurred(); - last_ping_call = sleeper(PING_TIMER); } // Note that we want to run a graph prune once not long after startup before @@ -904,17 +868,14 @@ where // pruning their network graph. We run once 60 seconds after startup before // continuing our normal cadence. For RGS, since 60 seconds is likely too long, // we prune after an initial sync completes. - let prune_timer_elapsed = (|fut: &mut SleepFuture| { - let mut waker = dummy_waker(); - let mut ctx = task::Context::from_waker(&mut waker); - match core::pin::Pin::new(fut).poll(&mut ctx) { - task::Poll::Ready(exit) => { - should_break = exit; - true - }, - task::Poll::Pending => false, + let prune_timer_elapsed = { + match check_sleeper(&mut last_prune_call) { + Some(false) => true, + Some(true) => break, + None => false, } - })(&mut last_prune_call); + }; + let should_prune = match gossip_sync { GossipSync::Rapid(_) => !have_pruned || prune_timer_elapsed, _ => prune_timer_elapsed, @@ -957,76 +918,55 @@ where } have_decayed_scorer = true; } - if (|fut: &mut SleepFuture| { - let mut waker = dummy_waker(); - let mut ctx = task::Context::from_waker(&mut waker); - match core::pin::Pin::new(fut).poll(&mut ctx) { - task::Poll::Ready(exit) => { - should_break = exit; - true - }, - task::Poll::Pending => false, - } - })(&mut last_scorer_persist_call) - { - if let Some(ref scorer) = scorer { - if let Some(duration_since_epoch) = fetch_time() { - log_trace!(logger, "Calling time_passed and persisting scorer"); - scorer.write_lock().time_passed(duration_since_epoch); - } else { - log_trace!(logger, "Persisting scorer"); + match check_sleeper(&mut last_scorer_persist_call) { + Some(false) => { + if let Some(ref scorer) = scorer { + if let Some(duration_since_epoch) = fetch_time() { + log_trace!(logger, "Calling time_passed and persisting scorer"); + scorer.write_lock().time_passed(duration_since_epoch); + } else { + log_trace!(logger, "Persisting scorer"); + } + if let Err(e) = kv_store + .write( + SCORER_PERSISTENCE_PRIMARY_NAMESPACE, + SCORER_PERSISTENCE_SECONDARY_NAMESPACE, + SCORER_PERSISTENCE_KEY, + &scorer.encode(), + ) + .await + { + log_error!( + logger, + "Error: Failed to persist scorer, check your disk and permissions {}", + e + ); + } } - if let Err(e) = kv_store - .write( - SCORER_PERSISTENCE_PRIMARY_NAMESPACE, - SCORER_PERSISTENCE_SECONDARY_NAMESPACE, - SCORER_PERSISTENCE_KEY, - &scorer.encode(), - ) - .await - { - log_error!( - logger, - "Error: Failed to persist scorer, check your disk and permissions {}", - e - ); + last_scorer_persist_call = sleeper(SCORER_PERSIST_TIMER); + }, + Some(true) => break, + None => {}, + } + match check_sleeper(&mut last_rebroadcast_call) { + Some(false) => { + log_trace!(logger, "Rebroadcasting monitor's pending claims"); + chain_monitor.rebroadcast_pending_claims(); + last_rebroadcast_call = sleeper(REBROADCAST_TIMER); + }, + Some(true) => break, + None => {}, + } + match check_sleeper(&mut last_sweeper_call) { + Some(false) => { + log_trace!(logger, "Regenerating sweeper spends if necessary"); + if let Some(ref sweeper) = sweeper { + let _ = sweeper.regenerate_and_broadcast_spend_if_necessary().await; } - } - last_scorer_persist_call = sleeper(SCORER_PERSIST_TIMER); - } - if (|fut: &mut SleepFuture| { - let mut waker = dummy_waker(); - let mut ctx = task::Context::from_waker(&mut waker); - match core::pin::Pin::new(fut).poll(&mut ctx) { - task::Poll::Ready(exit) => { - should_break = exit; - true - }, - task::Poll::Pending => false, - } - })(&mut last_rebroadcast_call) - { - log_trace!(logger, "Rebroadcasting monitor's pending claims"); - chain_monitor.rebroadcast_pending_claims(); - last_rebroadcast_call = sleeper(REBROADCAST_TIMER); - } - if (|fut: &mut SleepFuture| { - let mut waker = dummy_waker(); - let mut ctx = task::Context::from_waker(&mut waker); - match core::pin::Pin::new(fut).poll(&mut ctx) { - task::Poll::Ready(exit) => { - should_break = exit; - true - }, - task::Poll::Pending => false, - } - })(&mut last_sweeper_call) - { - log_trace!(logger, "Regenerating sweeper spends if necessary"); - if let Some(ref sweeper) = sweeper { - let _ = sweeper.regenerate_and_broadcast_spend_if_necessary().await; - } - last_sweeper_call = sleeper(SWEEPER_TIMER); + last_sweeper_call = sleeper(SWEEPER_TIMER); + }, + Some(true) => break, + None => {}, } } log_trace!(logger, "Terminating background processor."); @@ -1065,6 +1005,17 @@ where Ok(()) } +fn check_sleeper + core::marker::Unpin>( + fut: &mut SleepFuture, +) -> Option { + let mut waker = dummy_waker(); + let mut ctx = task::Context::from_waker(&mut waker); + match core::pin::Pin::new(fut).poll(&mut ctx) { + task::Poll::Ready(exit) => Some(exit), + task::Poll::Pending => None, + } +} + /// Async events processor that is based on [`process_events_async`] but allows for [`KVStoreSync`] to be used for /// synchronous background persistence. pub async fn process_events_async_with_kv_store_sync< From ec8e1ca5bc3d1046043196431dfcff4756d33c01 Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Tue, 29 Jul 2025 09:45:24 +0200 Subject: [PATCH 5/7] Re-order async background processor tasks Prepare for parallelization. --- lightning-background-processor/src/lib.rs | 160 +++++++++++----------- 1 file changed, 82 insertions(+), 78 deletions(-) diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index c04320e745e..af122843350 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -803,6 +803,15 @@ where } else { false }; + match check_sleeper(&mut last_freshness_call) { + Some(false) => { + log_trace!(logger, "Calling ChannelManager's timer_tick_occurred"); + channel_manager.get_cm().timer_tick_occurred(); + last_freshness_call = sleeper(FRESHNESS_TIMER); + }, + Some(true) => break, + None => {}, + } if channel_manager.get_cm().get_and_clear_needs_persistence() { log_trace!(logger, "Persisting ChannelManager..."); kv_store @@ -815,53 +824,6 @@ where .await?; log_trace!(logger, "Done persisting ChannelManager."); } - match check_sleeper(&mut last_freshness_call) { - Some(false) => { - log_trace!(logger, "Calling ChannelManager's timer_tick_occurred"); - channel_manager.get_cm().timer_tick_occurred(); - last_freshness_call = sleeper(FRESHNESS_TIMER); - }, - Some(true) => break, - None => {}, - } - match check_sleeper(&mut last_onion_message_handler_call) { - Some(false) => { - if let Some(om) = &onion_messenger { - log_trace!(logger, "Calling OnionMessageHandler's timer_tick_occurred"); - om.get_om().timer_tick_occurred(); - } - last_onion_message_handler_call = sleeper(ONION_MESSAGE_HANDLER_TIMER); - }, - Some(true) => break, - None => {}, - } - if await_slow { - // On various platforms, we may be starved of CPU cycles for several reasons. - // E.g. on iOS, if we've been in the background, we will be entirely paused. - // Similarly, if we're on a desktop platform and the device has been asleep, we - // may not get any cycles. - // We detect this by checking if our max-100ms-sleep, above, ran longer than a - // full second, at which point we assume sockets may have been killed (they - // appear to be at least on some platforms, even if it has only been a second). - // Note that we have to take care to not get here just because user event - // processing was slow at the top of the loop. For example, the sample client - // may call Bitcoin Core RPCs during event handling, which very often takes - // more than a handful of seconds to complete, and shouldn't disconnect all our - // peers. - log_trace!(logger, "100ms sleep took more than a second, disconnecting peers."); - peer_manager.as_ref().disconnect_all_peers(); - last_ping_call = sleeper(PING_TIMER); - } else { - match check_sleeper(&mut last_ping_call) { - Some(false) => { - log_trace!(logger, "Calling PeerManager's timer_tick_occurred"); - peer_manager.as_ref().timer_tick_occurred(); - last_ping_call = sleeper(PING_TIMER); - }, - Some(true) => break, - _ => {}, - } - } // Note that we want to run a graph prune once not long after startup before // falling back to our usual hourly prunes. This avoids short-lived clients never @@ -948,15 +910,6 @@ where Some(true) => break, None => {}, } - match check_sleeper(&mut last_rebroadcast_call) { - Some(false) => { - log_trace!(logger, "Rebroadcasting monitor's pending claims"); - chain_monitor.rebroadcast_pending_claims(); - last_rebroadcast_call = sleeper(REBROADCAST_TIMER); - }, - Some(true) => break, - None => {}, - } match check_sleeper(&mut last_sweeper_call) { Some(false) => { log_trace!(logger, "Regenerating sweeper spends if necessary"); @@ -968,6 +921,57 @@ where Some(true) => break, None => {}, } + match check_sleeper(&mut last_onion_message_handler_call) { + Some(false) => { + if let Some(om) = &onion_messenger { + log_trace!(logger, "Calling OnionMessageHandler's timer_tick_occurred"); + om.get_om().timer_tick_occurred(); + } + last_onion_message_handler_call = sleeper(ONION_MESSAGE_HANDLER_TIMER); + }, + Some(true) => break, + None => {}, + } + + // Peer manager timer tick. If we were interrupted on a mobile platform, we disconnect all peers. + if await_slow { + // On various platforms, we may be starved of CPU cycles for several reasons. + // E.g. on iOS, if we've been in the background, we will be entirely paused. + // Similarly, if we're on a desktop platform and the device has been asleep, we + // may not get any cycles. + // We detect this by checking if our max-100ms-sleep, above, ran longer than a + // full second, at which point we assume sockets may have been killed (they + // appear to be at least on some platforms, even if it has only been a second). + // Note that we have to take care to not get here just because user event + // processing was slow at the top of the loop. For example, the sample client + // may call Bitcoin Core RPCs during event handling, which very often takes + // more than a handful of seconds to complete, and shouldn't disconnect all our + // peers. + log_trace!(logger, "100ms sleep took more than a second, disconnecting peers."); + peer_manager.as_ref().disconnect_all_peers(); + last_ping_call = sleeper(PING_TIMER); + } else { + match check_sleeper(&mut last_ping_call) { + Some(false) => { + log_trace!(logger, "Calling PeerManager's timer_tick_occurred"); + peer_manager.as_ref().timer_tick_occurred(); + last_ping_call = sleeper(PING_TIMER); + }, + Some(true) => break, + _ => {}, + } + } + + // Rebroadcast pending claims. + match check_sleeper(&mut last_rebroadcast_call) { + Some(false) => { + log_trace!(logger, "Rebroadcasting monitor's pending claims"); + chain_monitor.rebroadcast_pending_claims(); + last_rebroadcast_call = sleeper(REBROADCAST_TIMER); + }, + Some(true) => break, + None => {}, + } } log_trace!(logger, "Terminating background processor."); @@ -1292,6 +1296,11 @@ impl BackgroundProcessor { log_trace!(logger, "Terminating background processor."); break; } + if last_freshness_call.elapsed() > FRESHNESS_TIMER { + log_trace!(logger, "Calling ChannelManager's timer_tick_occurred"); + channel_manager.get_cm().timer_tick_occurred(); + last_freshness_call = Instant::now(); + } if channel_manager.get_cm().get_and_clear_needs_persistence() { log_trace!(logger, "Persisting ChannelManager..."); (kv_store.write( @@ -1302,23 +1311,6 @@ impl BackgroundProcessor { ))?; log_trace!(logger, "Done persisting ChannelManager."); } - if last_freshness_call.elapsed() > FRESHNESS_TIMER { - log_trace!(logger, "Calling ChannelManager's timer_tick_occurred"); - channel_manager.get_cm().timer_tick_occurred(); - last_freshness_call = Instant::now(); - } - if last_onion_message_handler_call.elapsed() > ONION_MESSAGE_HANDLER_TIMER { - if let Some(om) = &onion_messenger { - log_trace!(logger, "Calling OnionMessageHandler's timer_tick_occurred"); - om.get_om().timer_tick_occurred(); - } - last_onion_message_handler_call = Instant::now(); - } - if last_ping_call.elapsed() > PING_TIMER { - log_trace!(logger, "Calling PeerManager's timer_tick_occurred"); - peer_manager.as_ref().timer_tick_occurred(); - last_ping_call = Instant::now(); - } // Note that we want to run a graph prune once not long after startup before // falling back to our usual hourly prunes. This avoids short-lived clients never @@ -1386,11 +1378,6 @@ impl BackgroundProcessor { } last_scorer_persist_call = Instant::now(); } - if last_rebroadcast_call.elapsed() > REBROADCAST_TIMER { - log_trace!(logger, "Rebroadcasting monitor's pending claims"); - chain_monitor.rebroadcast_pending_claims(); - last_rebroadcast_call = Instant::now(); - } if last_sweeper_call.elapsed() > SWEEPER_TIMER { log_trace!(logger, "Regenerating sweeper spends if necessary"); if let Some(ref sweeper) = sweeper { @@ -1398,6 +1385,23 @@ impl BackgroundProcessor { } last_sweeper_call = Instant::now(); } + if last_onion_message_handler_call.elapsed() > ONION_MESSAGE_HANDLER_TIMER { + if let Some(om) = &onion_messenger { + log_trace!(logger, "Calling OnionMessageHandler's timer_tick_occurred"); + om.get_om().timer_tick_occurred(); + } + last_onion_message_handler_call = Instant::now(); + } + if last_ping_call.elapsed() > PING_TIMER { + log_trace!(logger, "Calling PeerManager's timer_tick_occurred"); + peer_manager.as_ref().timer_tick_occurred(); + last_ping_call = Instant::now(); + } + if last_rebroadcast_call.elapsed() > REBROADCAST_TIMER { + log_trace!(logger, "Rebroadcasting monitor's pending claims"); + chain_monitor.rebroadcast_pending_claims(); + last_rebroadcast_call = Instant::now(); + } } // After we exit, ensure we persist the ChannelManager one final time - this avoids From 269cff77eae1f3d6a2c1303f0bf62cdf51aec262 Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Mon, 28 Jul 2025 20:15:42 +0200 Subject: [PATCH 6/7] Remove unnecessary Copy bound from MultiResultFuturePoller --- lightning/src/util/async_poll.rs | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/lightning/src/util/async_poll.rs b/lightning/src/util/async_poll.rs index 93be60adfd0..024d433cf41 100644 --- a/lightning/src/util/async_poll.rs +++ b/lightning/src/util/async_poll.rs @@ -15,27 +15,22 @@ use core::marker::Unpin; use core::pin::Pin; use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker}; -pub(crate) enum ResultFuture>, E: Copy + Unpin> { +pub(crate) enum ResultFuture>, E: Unpin> { Pending(F), Ready(Result<(), E>), } -pub(crate) struct MultiResultFuturePoller< - F: Future> + Unpin, - E: Copy + Unpin, -> { +pub(crate) struct MultiResultFuturePoller> + Unpin, E: Unpin> { futures_state: Vec>, } -impl> + Unpin, E: Copy + Unpin> MultiResultFuturePoller { +impl> + Unpin, E: Unpin> MultiResultFuturePoller { pub fn new(futures_state: Vec>) -> Self { Self { futures_state } } } -impl> + Unpin, E: Copy + Unpin> Future - for MultiResultFuturePoller -{ +impl> + Unpin, E: Unpin> Future for MultiResultFuturePoller { type Output = Vec>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>> { let mut have_pending_futures = false; From d9a66412f863bb5bed89e96748d8794e55a41971 Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Mon, 28 Jul 2025 16:56:16 +0200 Subject: [PATCH 7/7] Parallelize persistence in the async bg processor Co-authored-by: Matt Corallo --- lightning-background-processor/src/lib.rs | 209 ++++++++++++++++++---- 1 file changed, 177 insertions(+), 32 deletions(-) diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index af122843350..f53023a0635 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -443,9 +443,117 @@ pub(crate) mod futures_util { pub(crate) fn dummy_waker() -> Waker { unsafe { Waker::from_raw(RawWaker::new(core::ptr::null(), &DUMMY_WAKER_VTABLE)) } } + + enum JoinerResult> + Unpin> { + Pending(Option), + Ready(Result<(), E>), + } + + pub(crate) struct Joiner< + E, + A: Future> + Unpin, + B: Future> + Unpin, + C: Future> + Unpin, + D: Future> + Unpin, + > { + a: JoinerResult, + b: JoinerResult, + c: JoinerResult, + d: JoinerResult, + } + + impl< + E, + A: Future> + Unpin, + B: Future> + Unpin, + C: Future> + Unpin, + D: Future> + Unpin, + > Joiner + { + pub(crate) fn new() -> Self { + Self { + a: JoinerResult::Pending(None), + b: JoinerResult::Pending(None), + c: JoinerResult::Pending(None), + d: JoinerResult::Pending(None), + } + } + + pub(crate) fn set_a(&mut self, fut: A) { + self.a = JoinerResult::Pending(Some(fut)); + } + pub(crate) fn set_b(&mut self, fut: B) { + self.b = JoinerResult::Pending(Some(fut)); + } + pub(crate) fn set_c(&mut self, fut: C) { + self.c = JoinerResult::Pending(Some(fut)); + } + pub(crate) fn set_d(&mut self, fut: D) { + self.d = JoinerResult::Pending(Some(fut)); + } + } + + impl< + E, + A: Future> + Unpin, + B: Future> + Unpin, + C: Future> + Unpin, + D: Future> + Unpin, + > Future for Joiner + where + Joiner: Unpin, + { + type Output = [Result<(), E>; 4]; + fn poll(mut self: Pin<&mut Self>, ctx: &mut core::task::Context<'_>) -> Poll { + let mut all_complete = true; + macro_rules! handle { + ($val: ident) => { + match &mut (self.$val) { + JoinerResult::Pending(None) => { + self.$val = JoinerResult::Ready(Ok(())); + }, + JoinerResult::::Pending(Some(ref mut val)) => { + match Pin::new(val).poll(ctx) { + Poll::Ready(res) => { + self.$val = JoinerResult::Ready(res); + }, + Poll::Pending => { + all_complete = false; + }, + } + }, + JoinerResult::Ready(_) => {}, + } + }; + } + handle!(a); + handle!(b); + handle!(c); + handle!(d); + + if all_complete { + let mut res = [Ok(()), Ok(()), Ok(()), Ok(())]; + if let JoinerResult::Ready(ref mut val) = &mut self.a { + core::mem::swap(&mut res[0], val); + } + if let JoinerResult::Ready(ref mut val) = &mut self.b { + core::mem::swap(&mut res[1], val); + } + if let JoinerResult::Ready(ref mut val) = &mut self.c { + core::mem::swap(&mut res[2], val); + } + if let JoinerResult::Ready(ref mut val) = &mut self.d { + core::mem::swap(&mut res[3], val); + } + Poll::Ready(res) + } else { + Poll::Pending + } + } + } } use core::task; -use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput}; +use futures_util::{dummy_waker, Joiner, OptionalSelector, Selector, SelectorOutput}; /// Processes background events in a future. /// @@ -812,16 +920,25 @@ where Some(true) => break, None => {}, } + + let mut futures = Joiner::new(); + if channel_manager.get_cm().get_and_clear_needs_persistence() { log_trace!(logger, "Persisting ChannelManager..."); - kv_store - .write( - CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, - CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, - CHANNEL_MANAGER_PERSISTENCE_KEY, - &channel_manager.get_cm().encode(), - ) - .await?; + + let fut = async { + kv_store + .write( + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + &channel_manager.get_cm().encode(), + ) + .await + }; + // TODO: Once our MSRV is 1.68 we should be able to drop the Box + futures.set_a(Box::pin(fut)); + log_trace!(logger, "Done persisting ChannelManager."); } @@ -854,17 +971,25 @@ where log_warn!(logger, "Not pruning network graph, consider implementing the fetch_time argument or calling remove_stale_channels_and_tracking_with_time manually."); log_trace!(logger, "Persisting network graph."); } - if let Err(e) = kv_store - .write( - NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, - NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, - NETWORK_GRAPH_PERSISTENCE_KEY, - &network_graph.encode(), - ) - .await - { - log_error!(logger, "Error: Failed to persist network graph, check your disk and permissions {}",e); - } + let fut = async { + if let Err(e) = kv_store + .write( + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_KEY, + &network_graph.encode(), + ) + .await + { + log_error!(logger, "Error: Failed to persist network graph, check your disk and permissions {}",e); + } + + Ok(()) + }; + + // TODO: Once our MSRV is 1.68 we should be able to drop the Box + futures.set_b(Box::pin(fut)); + have_pruned = true; } let prune_timer = @@ -889,21 +1014,28 @@ where } else { log_trace!(logger, "Persisting scorer"); } - if let Err(e) = kv_store - .write( - SCORER_PERSISTENCE_PRIMARY_NAMESPACE, - SCORER_PERSISTENCE_SECONDARY_NAMESPACE, - SCORER_PERSISTENCE_KEY, - &scorer.encode(), - ) - .await - { - log_error!( + let fut = async { + if let Err(e) = kv_store + .write( + SCORER_PERSISTENCE_PRIMARY_NAMESPACE, + SCORER_PERSISTENCE_SECONDARY_NAMESPACE, + SCORER_PERSISTENCE_KEY, + &scorer.encode(), + ) + .await + { + log_error!( logger, "Error: Failed to persist scorer, check your disk and permissions {}", e ); - } + } + + Ok(()) + }; + + // TODO: Once our MSRV is 1.68 we should be able to drop the Box + futures.set_c(Box::pin(fut)); } last_scorer_persist_call = sleeper(SCORER_PERSIST_TIMER); }, @@ -914,13 +1046,26 @@ where Some(false) => { log_trace!(logger, "Regenerating sweeper spends if necessary"); if let Some(ref sweeper) = sweeper { - let _ = sweeper.regenerate_and_broadcast_spend_if_necessary().await; + let fut = async { + let _ = sweeper.regenerate_and_broadcast_spend_if_necessary().await; + + Ok(()) + }; + + // TODO: Once our MSRV is 1.68 we should be able to drop the Box + futures.set_d(Box::pin(fut)); } last_sweeper_call = sleeper(SWEEPER_TIMER); }, Some(true) => break, None => {}, } + + // Run persistence tasks in parallel and exit if any of them returns an error. + for res in futures.await { + res?; + } + match check_sleeper(&mut last_onion_message_handler_call) { Some(false) => { if let Some(om) = &onion_messenger {