diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index e340109d35c..f53023a0635 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -331,247 +331,6 @@ fn update_scorer<'a, S: 'static + Deref + Send + Sync, SC: 'a + Wri true } -macro_rules! maybe_await { - (true, $e:expr) => { - $e.await - }; - (false, $e:expr) => { - $e - }; -} - -macro_rules! define_run_body { - ( - $kv_store: ident, - $chain_monitor: ident, $process_chain_monitor_events: expr, - $channel_manager: ident, $process_channel_manager_events: expr, - $onion_messenger: ident, $process_onion_message_handler_events: expr, - $peer_manager: ident, $gossip_sync: ident, - $process_sweeper: expr, - $logger: ident, $scorer: ident, $loop_exit_check: expr, $await: expr, $get_timer: expr, - $timer_elapsed: expr, $check_slow_await: expr, $time_fetch: expr, $batch_delay: expr, $async_persist: tt, - ) => { { - log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup"); - $channel_manager.get_cm().timer_tick_occurred(); - log_trace!($logger, "Rebroadcasting monitor's pending claims on startup"); - $chain_monitor.rebroadcast_pending_claims(); - - let mut last_freshness_call = $get_timer(FRESHNESS_TIMER); - let mut last_onion_message_handler_call = $get_timer(ONION_MESSAGE_HANDLER_TIMER); - let mut last_ping_call = $get_timer(PING_TIMER); - let mut last_prune_call = $get_timer(FIRST_NETWORK_PRUNE_TIMER); - let mut last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER); - let mut last_rebroadcast_call = $get_timer(REBROADCAST_TIMER); - let mut last_sweeper_call = $get_timer(SWEEPER_TIMER); - let mut have_pruned = false; - let mut have_decayed_scorer = false; - - let mut cur_batch_delay = $batch_delay.get(); - let mut last_forwards_processing_call = $get_timer(cur_batch_delay); - - loop { - $process_channel_manager_events; - $process_chain_monitor_events; - $process_onion_message_handler_events; - - // Note that the PeerManager::process_events may block on ChannelManager's locks, - // hence it comes last here. When the ChannelManager finishes whatever it's doing, - // we want to ensure we get into `persist_manager` as quickly as we can, especially - // without running the normal event processing above and handing events to users. - // - // Specifically, on an *extremely* slow machine, we may see ChannelManager start - // processing a message effectively at any point during this loop. In order to - // minimize the time between such processing completing and persisting the updated - // ChannelManager, we want to minimize methods blocking on a ChannelManager - // generally, and as a fallback place such blocking only immediately before - // persistence. - $peer_manager.as_ref().process_events(); - - if $timer_elapsed(&mut last_forwards_processing_call, cur_batch_delay) { - $channel_manager.get_cm().process_pending_htlc_forwards(); - cur_batch_delay = $batch_delay.next(); - last_forwards_processing_call = $get_timer(cur_batch_delay); - } - - // Exit the loop if the background processor was requested to stop. - if $loop_exit_check { - log_trace!($logger, "Terminating background processor."); - break; - } - - // We wait up to 100ms, but track how long it takes to detect being put to sleep, - // see `await_start`'s use below. - let mut await_start = None; - if $check_slow_await { await_start = Some($get_timer(Duration::from_secs(1))); } - $await; - let await_slow = if $check_slow_await { $timer_elapsed(&mut await_start.unwrap(), Duration::from_secs(1)) } else { false }; - - // Exit the loop if the background processor was requested to stop. - if $loop_exit_check { - log_trace!($logger, "Terminating background processor."); - break; - } - - if $channel_manager.get_cm().get_and_clear_needs_persistence() { - log_trace!($logger, "Persisting ChannelManager..."); - maybe_await!($async_persist, $kv_store.write( - CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, - CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, - CHANNEL_MANAGER_PERSISTENCE_KEY, - &$channel_manager.get_cm().encode(), - ))?; - log_trace!($logger, "Done persisting ChannelManager."); - } - if $timer_elapsed(&mut last_freshness_call, FRESHNESS_TIMER) { - log_trace!($logger, "Calling ChannelManager's timer_tick_occurred"); - $channel_manager.get_cm().timer_tick_occurred(); - last_freshness_call = $get_timer(FRESHNESS_TIMER); - } - if $timer_elapsed(&mut last_onion_message_handler_call, ONION_MESSAGE_HANDLER_TIMER) { - if let Some(om) = &$onion_messenger { - log_trace!($logger, "Calling OnionMessageHandler's timer_tick_occurred"); - om.get_om().timer_tick_occurred(); - } - last_onion_message_handler_call = $get_timer(ONION_MESSAGE_HANDLER_TIMER); - } - if await_slow { - // On various platforms, we may be starved of CPU cycles for several reasons. - // E.g. on iOS, if we've been in the background, we will be entirely paused. - // Similarly, if we're on a desktop platform and the device has been asleep, we - // may not get any cycles. - // We detect this by checking if our max-100ms-sleep, above, ran longer than a - // full second, at which point we assume sockets may have been killed (they - // appear to be at least on some platforms, even if it has only been a second). - // Note that we have to take care to not get here just because user event - // processing was slow at the top of the loop. For example, the sample client - // may call Bitcoin Core RPCs during event handling, which very often takes - // more than a handful of seconds to complete, and shouldn't disconnect all our - // peers. - log_trace!($logger, "100ms sleep took more than a second, disconnecting peers."); - $peer_manager.as_ref().disconnect_all_peers(); - last_ping_call = $get_timer(PING_TIMER); - } else if $timer_elapsed(&mut last_ping_call, PING_TIMER) { - log_trace!($logger, "Calling PeerManager's timer_tick_occurred"); - $peer_manager.as_ref().timer_tick_occurred(); - last_ping_call = $get_timer(PING_TIMER); - } - - // Note that we want to run a graph prune once not long after startup before - // falling back to our usual hourly prunes. This avoids short-lived clients never - // pruning their network graph. We run once 60 seconds after startup before - // continuing our normal cadence. For RGS, since 60 seconds is likely too long, - // we prune after an initial sync completes. - let prune_timer = if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER }; - let prune_timer_elapsed = $timer_elapsed(&mut last_prune_call, prune_timer); - let should_prune = match $gossip_sync { - GossipSync::Rapid(_) => !have_pruned || prune_timer_elapsed, - _ => prune_timer_elapsed, - }; - if should_prune { - // The network graph must not be pruned while rapid sync completion is pending - if let Some(network_graph) = $gossip_sync.prunable_network_graph() { - if let Some(duration_since_epoch) = $time_fetch() { - log_trace!($logger, "Pruning and persisting network graph."); - network_graph.remove_stale_channels_and_tracking_with_time(duration_since_epoch.as_secs()); - } else { - log_warn!($logger, - "Not pruning network graph, consider implementing the fetch_time argument or calling remove_stale_channels_and_tracking_with_time manually." - ); - log_trace!($logger, "Persisting network graph."); - } - - if let Err(e) = maybe_await!($async_persist, $kv_store.write( - NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, - NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, - NETWORK_GRAPH_PERSISTENCE_KEY, - &network_graph.encode(), - )) { - log_error!($logger, "Error: Failed to persist network graph, check your disk and permissions {}", e) - } - - have_pruned = true; - } - let prune_timer = if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER }; - last_prune_call = $get_timer(prune_timer); - } - - if !have_decayed_scorer { - if let Some(ref scorer) = $scorer { - if let Some(duration_since_epoch) = $time_fetch() { - log_trace!($logger, "Calling time_passed on scorer at startup"); - scorer.write_lock().time_passed(duration_since_epoch); - } - } - have_decayed_scorer = true; - } - - if $timer_elapsed(&mut last_scorer_persist_call, SCORER_PERSIST_TIMER) { - if let Some(ref scorer) = $scorer { - if let Some(duration_since_epoch) = $time_fetch() { - log_trace!($logger, "Calling time_passed and persisting scorer"); - scorer.write_lock().time_passed(duration_since_epoch); - } else { - log_trace!($logger, "Persisting scorer"); - } - if let Err(e) = maybe_await!($async_persist, $kv_store.write( - SCORER_PERSISTENCE_PRIMARY_NAMESPACE, - SCORER_PERSISTENCE_SECONDARY_NAMESPACE, - SCORER_PERSISTENCE_KEY, - &scorer.encode(), - )) { - log_error!($logger, "Error: Failed to persist scorer, check your disk and permissions {}", e) - } - } - last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER); - } - - if $timer_elapsed(&mut last_rebroadcast_call, REBROADCAST_TIMER) { - log_trace!($logger, "Rebroadcasting monitor's pending claims"); - $chain_monitor.rebroadcast_pending_claims(); - last_rebroadcast_call = $get_timer(REBROADCAST_TIMER); - } - - if $timer_elapsed(&mut last_sweeper_call, SWEEPER_TIMER) { - log_trace!($logger, "Regenerating sweeper spends if necessary"); - $process_sweeper; - last_sweeper_call = $get_timer(SWEEPER_TIMER); - } - } - - // After we exit, ensure we persist the ChannelManager one final time - this avoids - // some races where users quit while channel updates were in-flight, with - // ChannelMonitor update(s) persisted without a corresponding ChannelManager update. - maybe_await!($async_persist, $kv_store.write( - CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, - CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, - CHANNEL_MANAGER_PERSISTENCE_KEY, - &$channel_manager.get_cm().encode(), - ))?; - - // Persist Scorer on exit - if let Some(ref scorer) = $scorer { - maybe_await!($async_persist, $kv_store.write( - SCORER_PERSISTENCE_PRIMARY_NAMESPACE, - SCORER_PERSISTENCE_SECONDARY_NAMESPACE, - SCORER_PERSISTENCE_KEY, - &scorer.encode(), - ))?; - } - - // Persist NetworkGraph on exit - if let Some(network_graph) = $gossip_sync.network_graph() { - maybe_await!($async_persist, $kv_store.write( - NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, - NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, - NETWORK_GRAPH_PERSISTENCE_KEY, - &network_graph.encode(), - ))?; - } - - Ok(()) - } } -} - pub(crate) mod futures_util { use core::future::Future; use core::marker::Unpin; @@ -684,9 +443,117 @@ pub(crate) mod futures_util { pub(crate) fn dummy_waker() -> Waker { unsafe { Waker::from_raw(RawWaker::new(core::ptr::null(), &DUMMY_WAKER_VTABLE)) } } + + enum JoinerResult> + Unpin> { + Pending(Option), + Ready(Result<(), E>), + } + + pub(crate) struct Joiner< + E, + A: Future> + Unpin, + B: Future> + Unpin, + C: Future> + Unpin, + D: Future> + Unpin, + > { + a: JoinerResult, + b: JoinerResult, + c: JoinerResult, + d: JoinerResult, + } + + impl< + E, + A: Future> + Unpin, + B: Future> + Unpin, + C: Future> + Unpin, + D: Future> + Unpin, + > Joiner + { + pub(crate) fn new() -> Self { + Self { + a: JoinerResult::Pending(None), + b: JoinerResult::Pending(None), + c: JoinerResult::Pending(None), + d: JoinerResult::Pending(None), + } + } + + pub(crate) fn set_a(&mut self, fut: A) { + self.a = JoinerResult::Pending(Some(fut)); + } + pub(crate) fn set_b(&mut self, fut: B) { + self.b = JoinerResult::Pending(Some(fut)); + } + pub(crate) fn set_c(&mut self, fut: C) { + self.c = JoinerResult::Pending(Some(fut)); + } + pub(crate) fn set_d(&mut self, fut: D) { + self.d = JoinerResult::Pending(Some(fut)); + } + } + + impl< + E, + A: Future> + Unpin, + B: Future> + Unpin, + C: Future> + Unpin, + D: Future> + Unpin, + > Future for Joiner + where + Joiner: Unpin, + { + type Output = [Result<(), E>; 4]; + fn poll(mut self: Pin<&mut Self>, ctx: &mut core::task::Context<'_>) -> Poll { + let mut all_complete = true; + macro_rules! handle { + ($val: ident) => { + match &mut (self.$val) { + JoinerResult::Pending(None) => { + self.$val = JoinerResult::Ready(Ok(())); + }, + JoinerResult::::Pending(Some(ref mut val)) => { + match Pin::new(val).poll(ctx) { + Poll::Ready(res) => { + self.$val = JoinerResult::Ready(res); + }, + Poll::Pending => { + all_complete = false; + }, + } + }, + JoinerResult::Ready(_) => {}, + } + }; + } + handle!(a); + handle!(b); + handle!(c); + handle!(d); + + if all_complete { + let mut res = [Ok(()), Ok(()), Ok(()), Ok(())]; + if let JoinerResult::Ready(ref mut val) = &mut self.a { + core::mem::swap(&mut res[0], val); + } + if let JoinerResult::Ready(ref mut val) = &mut self.b { + core::mem::swap(&mut res[1], val); + } + if let JoinerResult::Ready(ref mut val) = &mut self.c { + core::mem::swap(&mut res[2], val); + } + if let JoinerResult::Ready(ref mut val) = &mut self.d { + core::mem::swap(&mut res[3], val); + } + Poll::Ready(res) + } else { + Poll::Pending + } + } + } } use core::task; -use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput}; +use futures_util::{dummy_waker, Joiner, OptionalSelector, Selector, SelectorOutput}; /// Processes background events in a future. /// @@ -908,7 +775,6 @@ where D::Target: 'static + ChangeDestinationSource, K::Target: 'static + KVStore, { - let mut should_break = false; let async_event_handler = |event| { let network_graph = gossip_sync.network_graph(); let event_handler = &event_handler; @@ -947,79 +813,356 @@ where }) }; let mut batch_delay = BatchDelay::new(); - define_run_body!( - kv_store, - chain_monitor, - chain_monitor.process_pending_events_async(async_event_handler).await, - channel_manager, - channel_manager.get_cm().process_pending_events_async(async_event_handler).await, - onion_messenger, + + log_trace!(logger, "Calling ChannelManager's timer_tick_occurred on startup"); + channel_manager.get_cm().timer_tick_occurred(); + log_trace!(logger, "Rebroadcasting monitor's pending claims on startup"); + chain_monitor.rebroadcast_pending_claims(); + + let mut last_freshness_call = sleeper(FRESHNESS_TIMER); + let mut last_onion_message_handler_call = sleeper(ONION_MESSAGE_HANDLER_TIMER); + let mut last_ping_call = sleeper(PING_TIMER); + let mut last_prune_call = sleeper(FIRST_NETWORK_PRUNE_TIMER); + let mut last_scorer_persist_call = sleeper(SCORER_PERSIST_TIMER); + let mut last_rebroadcast_call = sleeper(REBROADCAST_TIMER); + let mut last_sweeper_call = sleeper(SWEEPER_TIMER); + let mut have_pruned = false; + let mut have_decayed_scorer = false; + + let mut cur_batch_delay = batch_delay.get(); + let mut last_forwards_processing_call = sleeper(cur_batch_delay); + + loop { + channel_manager.get_cm().process_pending_events_async(async_event_handler).await; + chain_monitor.process_pending_events_async(async_event_handler).await; if let Some(om) = &onion_messenger { om.get_om().process_pending_events_async(async_event_handler).await - }, - peer_manager, - gossip_sync, - { - if let Some(ref sweeper) = sweeper { - let _ = sweeper.regenerate_and_broadcast_spend_if_necessary().await; + } + + // Note that the PeerManager::process_events may block on ChannelManager's locks, + // hence it comes last here. When the ChannelManager finishes whatever it's doing, + // we want to ensure we get into `persist_manager` as quickly as we can, especially + // without running the normal event processing above and handing events to users. + // + // Specifically, on an *extremely* slow machine, we may see ChannelManager start + // processing a message effectively at any point during this loop. In order to + // minimize the time between such processing completing and persisting the updated + // ChannelManager, we want to minimize methods blocking on a ChannelManager + // generally, and as a fallback place such blocking only immediately before + // persistence. + peer_manager.as_ref().process_events(); + match check_sleeper(&mut last_forwards_processing_call) { + Some(false) => { + channel_manager.get_cm().process_pending_htlc_forwards(); + cur_batch_delay = batch_delay.next(); + last_forwards_processing_call = sleeper(cur_batch_delay); + }, + Some(true) => break, + None => {}, + } + + // We wait up to 100ms, but track how long it takes to detect being put to sleep, + // see `await_start`'s use below. + let mut await_start = None; + if mobile_interruptable_platform { + await_start = Some(sleeper(Duration::from_secs(1))); + } + let om_fut = if let Some(om) = onion_messenger.as_ref() { + let fut = om.get_om().get_update_future(); + OptionalSelector { optional_future: Some(fut) } + } else { + OptionalSelector { optional_future: None } + }; + let lm_fut = if let Some(lm) = liquidity_manager.as_ref() { + let fut = lm.get_lm().get_pending_msgs_future(); + OptionalSelector { optional_future: Some(fut) } + } else { + OptionalSelector { optional_future: None } + }; + let needs_processing = channel_manager.get_cm().needs_pending_htlc_processing(); + let sleep_delay = match (needs_processing, mobile_interruptable_platform) { + (true, true) => batch_delay.get().min(Duration::from_millis(100)), + (true, false) => batch_delay.get().min(FASTEST_TIMER), + (false, true) => Duration::from_millis(100), + (false, false) => FASTEST_TIMER, + }; + let fut = Selector { + a: channel_manager.get_cm().get_event_or_persistence_needed_future(), + b: chain_monitor.get_update_future(), + c: om_fut, + d: lm_fut, + e: sleeper(sleep_delay), + }; + match fut.await { + SelectorOutput::A | SelectorOutput::B | SelectorOutput::C | SelectorOutput::D => {}, + SelectorOutput::E(exit) => { + if exit { + break; + } + }, + } + + let await_slow = if mobile_interruptable_platform { + match check_sleeper(&mut await_start.unwrap()) { + Some(true) => break, + Some(false) => true, + None => false, } - }, - logger, - scorer, - should_break, - { - let om_fut = if let Some(om) = onion_messenger.as_ref() { - let fut = om.get_om().get_update_future(); - OptionalSelector { optional_future: Some(fut) } - } else { - OptionalSelector { optional_future: None } - }; - let lm_fut = if let Some(lm) = liquidity_manager.as_ref() { - let fut = lm.get_lm().get_pending_msgs_future(); - OptionalSelector { optional_future: Some(fut) } - } else { - OptionalSelector { optional_future: None } - }; + } else { + false + }; + match check_sleeper(&mut last_freshness_call) { + Some(false) => { + log_trace!(logger, "Calling ChannelManager's timer_tick_occurred"); + channel_manager.get_cm().timer_tick_occurred(); + last_freshness_call = sleeper(FRESHNESS_TIMER); + }, + Some(true) => break, + None => {}, + } - let needs_processing = channel_manager.get_cm().needs_pending_htlc_processing(); - let sleep_delay = match (needs_processing, mobile_interruptable_platform) { - (true, true) => batch_delay.get().min(Duration::from_millis(100)), - (true, false) => batch_delay.get().min(FASTEST_TIMER), - (false, true) => Duration::from_millis(100), - (false, false) => FASTEST_TIMER, - }; + let mut futures = Joiner::new(); - let fut = Selector { - a: channel_manager.get_cm().get_event_or_persistence_needed_future(), - b: chain_monitor.get_update_future(), - c: om_fut, - d: lm_fut, - e: sleeper(sleep_delay), + if channel_manager.get_cm().get_and_clear_needs_persistence() { + log_trace!(logger, "Persisting ChannelManager..."); + + let fut = async { + kv_store + .write( + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + &channel_manager.get_cm().encode(), + ) + .await }; - match fut.await { - SelectorOutput::A | SelectorOutput::B | SelectorOutput::C | SelectorOutput::D => {}, - SelectorOutput::E(exit) => { - should_break = exit; - }, + // TODO: Once our MSRV is 1.68 we should be able to drop the Box + futures.set_a(Box::pin(fut)); + + log_trace!(logger, "Done persisting ChannelManager."); + } + + // Note that we want to run a graph prune once not long after startup before + // falling back to our usual hourly prunes. This avoids short-lived clients never + // pruning their network graph. We run once 60 seconds after startup before + // continuing our normal cadence. For RGS, since 60 seconds is likely too long, + // we prune after an initial sync completes. + let prune_timer_elapsed = { + match check_sleeper(&mut last_prune_call) { + Some(false) => true, + Some(true) => break, + None => false, } - }, - |t| sleeper(t), - |fut: &mut SleepFuture, _| { - let mut waker = dummy_waker(); - let mut ctx = task::Context::from_waker(&mut waker); - match core::pin::Pin::new(fut).poll(&mut ctx) { - task::Poll::Ready(exit) => { - should_break = exit; - true + }; + + let should_prune = match gossip_sync { + GossipSync::Rapid(_) => !have_pruned || prune_timer_elapsed, + _ => prune_timer_elapsed, + }; + if should_prune { + // The network graph must not be pruned while rapid sync completion is pending + if let Some(network_graph) = gossip_sync.prunable_network_graph() { + if let Some(duration_since_epoch) = fetch_time() { + log_trace!(logger, "Pruning and persisting network graph."); + network_graph.remove_stale_channels_and_tracking_with_time( + duration_since_epoch.as_secs(), + ); + } else { + log_warn!(logger, "Not pruning network graph, consider implementing the fetch_time argument or calling remove_stale_channels_and_tracking_with_time manually."); + log_trace!(logger, "Persisting network graph."); + } + let fut = async { + if let Err(e) = kv_store + .write( + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_KEY, + &network_graph.encode(), + ) + .await + { + log_error!(logger, "Error: Failed to persist network graph, check your disk and permissions {}",e); + } + + Ok(()) + }; + + // TODO: Once our MSRV is 1.68 we should be able to drop the Box + futures.set_b(Box::pin(fut)); + + have_pruned = true; + } + let prune_timer = + if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER }; + last_prune_call = sleeper(prune_timer); + } + if !have_decayed_scorer { + if let Some(ref scorer) = scorer { + if let Some(duration_since_epoch) = fetch_time() { + log_trace!(logger, "Calling time_passed on scorer at startup"); + scorer.write_lock().time_passed(duration_since_epoch); + } + } + have_decayed_scorer = true; + } + match check_sleeper(&mut last_scorer_persist_call) { + Some(false) => { + if let Some(ref scorer) = scorer { + if let Some(duration_since_epoch) = fetch_time() { + log_trace!(logger, "Calling time_passed and persisting scorer"); + scorer.write_lock().time_passed(duration_since_epoch); + } else { + log_trace!(logger, "Persisting scorer"); + } + let fut = async { + if let Err(e) = kv_store + .write( + SCORER_PERSISTENCE_PRIMARY_NAMESPACE, + SCORER_PERSISTENCE_SECONDARY_NAMESPACE, + SCORER_PERSISTENCE_KEY, + &scorer.encode(), + ) + .await + { + log_error!( + logger, + "Error: Failed to persist scorer, check your disk and permissions {}", + e + ); + } + + Ok(()) + }; + + // TODO: Once our MSRV is 1.68 we should be able to drop the Box + futures.set_c(Box::pin(fut)); + } + last_scorer_persist_call = sleeper(SCORER_PERSIST_TIMER); + }, + Some(true) => break, + None => {}, + } + match check_sleeper(&mut last_sweeper_call) { + Some(false) => { + log_trace!(logger, "Regenerating sweeper spends if necessary"); + if let Some(ref sweeper) = sweeper { + let fut = async { + let _ = sweeper.regenerate_and_broadcast_spend_if_necessary().await; + + Ok(()) + }; + + // TODO: Once our MSRV is 1.68 we should be able to drop the Box + futures.set_d(Box::pin(fut)); + } + last_sweeper_call = sleeper(SWEEPER_TIMER); + }, + Some(true) => break, + None => {}, + } + + // Run persistence tasks in parallel and exit if any of them returns an error. + for res in futures.await { + res?; + } + + match check_sleeper(&mut last_onion_message_handler_call) { + Some(false) => { + if let Some(om) = &onion_messenger { + log_trace!(logger, "Calling OnionMessageHandler's timer_tick_occurred"); + om.get_om().timer_tick_occurred(); + } + last_onion_message_handler_call = sleeper(ONION_MESSAGE_HANDLER_TIMER); + }, + Some(true) => break, + None => {}, + } + + // Peer manager timer tick. If we were interrupted on a mobile platform, we disconnect all peers. + if await_slow { + // On various platforms, we may be starved of CPU cycles for several reasons. + // E.g. on iOS, if we've been in the background, we will be entirely paused. + // Similarly, if we're on a desktop platform and the device has been asleep, we + // may not get any cycles. + // We detect this by checking if our max-100ms-sleep, above, ran longer than a + // full second, at which point we assume sockets may have been killed (they + // appear to be at least on some platforms, even if it has only been a second). + // Note that we have to take care to not get here just because user event + // processing was slow at the top of the loop. For example, the sample client + // may call Bitcoin Core RPCs during event handling, which very often takes + // more than a handful of seconds to complete, and shouldn't disconnect all our + // peers. + log_trace!(logger, "100ms sleep took more than a second, disconnecting peers."); + peer_manager.as_ref().disconnect_all_peers(); + last_ping_call = sleeper(PING_TIMER); + } else { + match check_sleeper(&mut last_ping_call) { + Some(false) => { + log_trace!(logger, "Calling PeerManager's timer_tick_occurred"); + peer_manager.as_ref().timer_tick_occurred(); + last_ping_call = sleeper(PING_TIMER); }, - task::Poll::Pending => false, + Some(true) => break, + _ => {}, } - }, - mobile_interruptable_platform, - fetch_time, - batch_delay, - true, - ) + } + + // Rebroadcast pending claims. + match check_sleeper(&mut last_rebroadcast_call) { + Some(false) => { + log_trace!(logger, "Rebroadcasting monitor's pending claims"); + chain_monitor.rebroadcast_pending_claims(); + last_rebroadcast_call = sleeper(REBROADCAST_TIMER); + }, + Some(true) => break, + None => {}, + } + } + log_trace!(logger, "Terminating background processor."); + + // After we exit, ensure we persist the ChannelManager one final time - this avoids + // some races where users quit while channel updates were in-flight, with + // ChannelMonitor update(s) persisted without a corresponding ChannelManager update. + kv_store + .write( + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + &channel_manager.get_cm().encode(), + ) + .await?; + if let Some(ref scorer) = scorer { + kv_store + .write( + SCORER_PERSISTENCE_PRIMARY_NAMESPACE, + SCORER_PERSISTENCE_SECONDARY_NAMESPACE, + SCORER_PERSISTENCE_KEY, + &scorer.encode(), + ) + .await?; + } + if let Some(network_graph) = gossip_sync.network_graph() { + kv_store + .write( + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_KEY, + &network_graph.encode(), + ) + .await?; + } + Ok(()) +} + +fn check_sleeper + core::marker::Unpin>( + fut: &mut SleepFuture, +) -> Option { + let mut waker = dummy_waker(); + let mut ctx = task::Context::from_waker(&mut waker); + match core::pin::Pin::new(fut).poll(&mut ctx) { + task::Poll::Ready(exit) => Some(exit), + task::Poll::Pending => None, + } } /// Async events processor that is based on [`process_events_async`] but allows for [`KVStoreSync`] to be used for @@ -1218,71 +1361,220 @@ impl BackgroundProcessor { event_handler.handle_event(event) }; let mut batch_delay = BatchDelay::new(); - define_run_body!( - kv_store, - chain_monitor, - chain_monitor.process_pending_events(&event_handler), - channel_manager, - channel_manager.get_cm().process_pending_events(&event_handler), - onion_messenger, + + log_trace!(logger, "Calling ChannelManager's timer_tick_occurred on startup"); + channel_manager.get_cm().timer_tick_occurred(); + log_trace!(logger, "Rebroadcasting monitor's pending claims on startup"); + chain_monitor.rebroadcast_pending_claims(); + + let mut last_freshness_call = Instant::now(); + let mut last_onion_message_handler_call = Instant::now(); + let mut last_ping_call = Instant::now(); + let mut last_prune_call = Instant::now(); + let mut last_scorer_persist_call = Instant::now(); + let mut last_rebroadcast_call = Instant::now(); + let mut last_sweeper_call = Instant::now(); + let mut have_pruned = false; + let mut have_decayed_scorer = false; + + let mut cur_batch_delay = batch_delay.get(); + let mut last_forwards_processing_call = Instant::now(); + + loop { + channel_manager.get_cm().process_pending_events(&event_handler); + chain_monitor.process_pending_events(&event_handler); if let Some(om) = &onion_messenger { om.get_om().process_pending_events(&event_handler) - }, - peer_manager, - gossip_sync, - { + }; + + // Note that the PeerManager::process_events may block on ChannelManager's locks, + // hence it comes last here. When the ChannelManager finishes whatever it's doing, + // we want to ensure we get into `persist_manager` as quickly as we can, especially + // without running the normal event processing above and handing events to users. + // + // Specifically, on an *extremely* slow machine, we may see ChannelManager start + // processing a message effectively at any point during this loop. In order to + // minimize the time between such processing completing and persisting the updated + // ChannelManager, we want to minimize methods blocking on a ChannelManager + // generally, and as a fallback place such blocking only immediately before + // persistence. + peer_manager.as_ref().process_events(); + if last_forwards_processing_call.elapsed() > cur_batch_delay { + channel_manager.get_cm().process_pending_htlc_forwards(); + cur_batch_delay = batch_delay.next(); + last_forwards_processing_call = Instant::now(); + } + if stop_thread.load(Ordering::Acquire) { + log_trace!(logger, "Terminating background processor."); + break; + } + let sleeper = match (onion_messenger.as_ref(), liquidity_manager.as_ref()) { + (Some(om), Some(lm)) => Sleeper::from_four_futures( + &channel_manager.get_cm().get_event_or_persistence_needed_future(), + &chain_monitor.get_update_future(), + &om.get_om().get_update_future(), + &lm.get_lm().get_pending_msgs_future(), + ), + (Some(om), None) => Sleeper::from_three_futures( + &channel_manager.get_cm().get_event_or_persistence_needed_future(), + &chain_monitor.get_update_future(), + &om.get_om().get_update_future(), + ), + (None, Some(lm)) => Sleeper::from_three_futures( + &channel_manager.get_cm().get_event_or_persistence_needed_future(), + &chain_monitor.get_update_future(), + &lm.get_lm().get_pending_msgs_future(), + ), + (None, None) => Sleeper::from_two_futures( + &channel_manager.get_cm().get_event_or_persistence_needed_future(), + &chain_monitor.get_update_future(), + ), + }; + let batch_delay = if channel_manager.get_cm().needs_pending_htlc_processing() { + batch_delay.get() + } else { + Duration::MAX + }; + let fastest_timeout = batch_delay.min(Duration::from_millis(100)); + sleeper.wait_timeout(fastest_timeout); + if stop_thread.load(Ordering::Acquire) { + log_trace!(logger, "Terminating background processor."); + break; + } + if last_freshness_call.elapsed() > FRESHNESS_TIMER { + log_trace!(logger, "Calling ChannelManager's timer_tick_occurred"); + channel_manager.get_cm().timer_tick_occurred(); + last_freshness_call = Instant::now(); + } + if channel_manager.get_cm().get_and_clear_needs_persistence() { + log_trace!(logger, "Persisting ChannelManager..."); + (kv_store.write( + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + &channel_manager.get_cm().encode(), + ))?; + log_trace!(logger, "Done persisting ChannelManager."); + } + + // Note that we want to run a graph prune once not long after startup before + // falling back to our usual hourly prunes. This avoids short-lived clients never + // pruning their network graph. We run once 60 seconds after startup before + // continuing our normal cadence. For RGS, since 60 seconds is likely too long, + // we prune after an initial sync completes. + let prune_timer = + if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER }; + let prune_timer_elapsed = last_prune_call.elapsed() > prune_timer; + let should_prune = match gossip_sync { + GossipSync::Rapid(_) => !have_pruned || prune_timer_elapsed, + _ => prune_timer_elapsed, + }; + if should_prune { + // The network graph must not be pruned while rapid sync completion is pending + if let Some(network_graph) = gossip_sync.prunable_network_graph() { + let duration_since_epoch = std::time::SystemTime::now() + .duration_since(std::time::SystemTime::UNIX_EPOCH) + .expect("Time should be sometime after 1970"); + + log_trace!(logger, "Pruning and persisting network graph."); + network_graph.remove_stale_channels_and_tracking_with_time( + duration_since_epoch.as_secs(), + ); + if let Err(e) = kv_store.write( + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_KEY, + &network_graph.encode(), + ) { + log_error!(logger, "Error: Failed to persist network graph, check your disk and permissions {}",e); + } + have_pruned = true; + } + last_prune_call = Instant::now(); + } + if !have_decayed_scorer { + if let Some(ref scorer) = scorer { + let duration_since_epoch = std::time::SystemTime::now() + .duration_since(std::time::SystemTime::UNIX_EPOCH) + .expect("Time should be sometime after 1970"); + log_trace!(logger, "Calling time_passed on scorer at startup"); + scorer.write_lock().time_passed(duration_since_epoch); + } + have_decayed_scorer = true; + } + if last_scorer_persist_call.elapsed() > SCORER_PERSIST_TIMER { + if let Some(ref scorer) = scorer { + let duration_since_epoch = std::time::SystemTime::now() + .duration_since(std::time::SystemTime::UNIX_EPOCH) + .expect("Time should be sometime after 1970"); + log_trace!(logger, "Calling time_passed and persisting scorer"); + scorer.write_lock().time_passed(duration_since_epoch); + if let Err(e) = kv_store.write( + SCORER_PERSISTENCE_PRIMARY_NAMESPACE, + SCORER_PERSISTENCE_SECONDARY_NAMESPACE, + SCORER_PERSISTENCE_KEY, + &scorer.encode(), + ) { + log_error!(logger, + "Error: Failed to persist scorer, check your disk and permissions {}", + e, + ); + } + } + last_scorer_persist_call = Instant::now(); + } + if last_sweeper_call.elapsed() > SWEEPER_TIMER { + log_trace!(logger, "Regenerating sweeper spends if necessary"); if let Some(ref sweeper) = sweeper { let _ = sweeper.regenerate_and_broadcast_spend_if_necessary(); } - }, - logger, - scorer, - stop_thread.load(Ordering::Acquire), - { - let sleeper = match (onion_messenger.as_ref(), liquidity_manager.as_ref()) { - (Some(om), Some(lm)) => Sleeper::from_four_futures( - &channel_manager.get_cm().get_event_or_persistence_needed_future(), - &chain_monitor.get_update_future(), - &om.get_om().get_update_future(), - &lm.get_lm().get_pending_msgs_future(), - ), - (Some(om), None) => Sleeper::from_three_futures( - &channel_manager.get_cm().get_event_or_persistence_needed_future(), - &chain_monitor.get_update_future(), - &om.get_om().get_update_future(), - ), - (None, Some(lm)) => Sleeper::from_three_futures( - &channel_manager.get_cm().get_event_or_persistence_needed_future(), - &chain_monitor.get_update_future(), - &lm.get_lm().get_pending_msgs_future(), - ), - (None, None) => Sleeper::from_two_futures( - &channel_manager.get_cm().get_event_or_persistence_needed_future(), - &chain_monitor.get_update_future(), - ), - }; - let batch_delay = if channel_manager.get_cm().needs_pending_htlc_processing() { - batch_delay.get() - } else { - Duration::MAX - }; - let fastest_timeout = batch_delay.min(Duration::from_millis(100)); - sleeper.wait_timeout(fastest_timeout); - }, - |_| Instant::now(), - |time: &Instant, dur| time.elapsed() > dur, - false, - || { - use std::time::SystemTime; - Some( - SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .expect("Time should be sometime after 1970"), - ) - }, - batch_delay, - false, - ) + last_sweeper_call = Instant::now(); + } + if last_onion_message_handler_call.elapsed() > ONION_MESSAGE_HANDLER_TIMER { + if let Some(om) = &onion_messenger { + log_trace!(logger, "Calling OnionMessageHandler's timer_tick_occurred"); + om.get_om().timer_tick_occurred(); + } + last_onion_message_handler_call = Instant::now(); + } + if last_ping_call.elapsed() > PING_TIMER { + log_trace!(logger, "Calling PeerManager's timer_tick_occurred"); + peer_manager.as_ref().timer_tick_occurred(); + last_ping_call = Instant::now(); + } + if last_rebroadcast_call.elapsed() > REBROADCAST_TIMER { + log_trace!(logger, "Rebroadcasting monitor's pending claims"); + chain_monitor.rebroadcast_pending_claims(); + last_rebroadcast_call = Instant::now(); + } + } + + // After we exit, ensure we persist the ChannelManager one final time - this avoids + // some races where users quit while channel updates were in-flight, with + // ChannelMonitor update(s) persisted without a corresponding ChannelManager update. + kv_store.write( + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + &channel_manager.get_cm().encode(), + )?; + if let Some(ref scorer) = scorer { + kv_store.write( + SCORER_PERSISTENCE_PRIMARY_NAMESPACE, + SCORER_PERSISTENCE_SECONDARY_NAMESPACE, + SCORER_PERSISTENCE_KEY, + &scorer.encode(), + )?; + } + if let Some(network_graph) = gossip_sync.network_graph() { + kv_store.write( + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_KEY, + &network_graph.encode(), + )?; + } + Ok(()) }); Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) } } diff --git a/lightning/src/util/async_poll.rs b/lightning/src/util/async_poll.rs index 93be60adfd0..024d433cf41 100644 --- a/lightning/src/util/async_poll.rs +++ b/lightning/src/util/async_poll.rs @@ -15,27 +15,22 @@ use core::marker::Unpin; use core::pin::Pin; use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker}; -pub(crate) enum ResultFuture>, E: Copy + Unpin> { +pub(crate) enum ResultFuture>, E: Unpin> { Pending(F), Ready(Result<(), E>), } -pub(crate) struct MultiResultFuturePoller< - F: Future> + Unpin, - E: Copy + Unpin, -> { +pub(crate) struct MultiResultFuturePoller> + Unpin, E: Unpin> { futures_state: Vec>, } -impl> + Unpin, E: Copy + Unpin> MultiResultFuturePoller { +impl> + Unpin, E: Unpin> MultiResultFuturePoller { pub fn new(futures_state: Vec>) -> Self { Self { futures_state } } } -impl> + Unpin, E: Copy + Unpin> Future - for MultiResultFuturePoller -{ +impl> + Unpin, E: Unpin> Future for MultiResultFuturePoller { type Output = Vec>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>> { let mut have_pending_futures = false;