Merged
Changes from 1 commit
160 changes: 82 additions & 78 deletions lightning-background-processor/src/lib.rs
@@ -803,6 +803,15 @@ where
} else {
false
};
match check_sleeper(&mut last_freshness_call) {
Collaborator:
Since we're splitting between async and sync anyway, it would be nice to do the new-sleeper init in check_sleeper - if check_sleeper polls Ready, we aren't allowed to poll that future again (per the Future API contract), so we need to be careful to ensure we always create a new sleeper. It would be easier to just do it in check_sleeper rather than being diligent in review here.

Contributor Author:
I tried it with

fn check_sleeper<SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin>(
	fut: &mut SleepFuture, new_sleeper: impl Fn() -> SleepFuture,
) -> Option<bool> {
	let mut waker = dummy_waker();
	let mut ctx = task::Context::from_waker(&mut waker);
	match core::pin::Pin::new(&mut *fut).poll(&mut ctx) {
		task::Poll::Ready(exit) => {
			*fut = new_sleeper();
			Some(exit)
		},
		task::Poll::Pending => None,
	}
}

But that ran into a complication with the network graph. There the new interval is based on whether a prune has already happened. Can probably be refactored to make it work, but perhaps it is also ok to leave this pre-existing weakness for now?
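For reference, the poll-after-Ready hazard and the in-place re-arming fix can be shown with a self-contained toy (std only; the `Ready` future and the no-op waker are stand-ins for LDK's sleep futures and dummy_waker, not the crate's actual types):

```rust
use std::future::Future;
use std::pin::Pin;
use std::ptr;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

// Stand-in for a sleep future: immediately Ready with a fixed exit flag.
struct Ready(bool);
impl Future for Ready {
	type Output = bool;
	fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<bool> {
		Poll::Ready(self.0)
	}
}

// Stand-in for LDK's dummy_waker(): a waker that does nothing on wake.
fn noop_waker() -> Waker {
	fn clone(_: *const ()) -> RawWaker {
		RawWaker::new(ptr::null(), &VTABLE)
	}
	fn noop(_: *const ()) {}
	static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
	unsafe { Waker::from_raw(RawWaker::new(ptr::null(), &VTABLE)) }
}

// The proposed helper: when the future completes, replace it in place so a
// finished future is never polled again (the Future contract forbids that).
fn check_sleeper<F: Future<Output = bool> + Unpin>(
	fut: &mut F, new_sleeper: impl Fn() -> F,
) -> Option<bool> {
	let waker = noop_waker();
	let mut cx = Context::from_waker(&waker);
	match Pin::new(&mut *fut).poll(&mut cx) {
		Poll::Ready(exit) => {
			*fut = new_sleeper();
			Some(exit)
		},
		Poll::Pending => None,
	}
}

fn main() {
	let mut sleep = Ready(false);
	assert_eq!(check_sleeper(&mut sleep, || Ready(false)), Some(false));
	// Polling again is safe only because the helper re-armed the future.
	assert_eq!(check_sleeper(&mut sleep, || Ready(false)), Some(false));
	println!("ok");
}
```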

Collaborator:
Still seems worth fixing. We can just pass NETWORK_PRUNE_TIMER for the network graph prune and in the case where we need to prune but prunable_network_graph returns None we can override it with FIRST_NETWORK_PRUNE_TIMER. That's mostly for the "RGS sync but RGS hasn't finished yet" case anyway.
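A minimal model of that suggestion (constant values are assumed for illustration, and `next_prune_interval` is a hypothetical helper name, not the crate's API):

```rust
use std::time::Duration;

// Assumed values for illustration: regular hourly prunes, short first prune.
const NETWORK_PRUNE_TIMER: u64 = 60 * 60;
const FIRST_NETWORK_PRUNE_TIMER: u64 = 60;

// Sketch of the suggested override: re-arm the sleeper with the regular
// prune interval, but fall back to the shorter first-prune interval when
// prunable_network_graph() returned None (the "RGS sync but RGS hasn't
// finished yet" case).
fn next_prune_interval(graph_was_prunable: bool) -> Duration {
	if graph_was_prunable {
		Duration::from_secs(NETWORK_PRUNE_TIMER)
	} else {
		Duration::from_secs(FIRST_NETWORK_PRUNE_TIMER)
	}
}

fn main() {
	assert_eq!(next_prune_interval(true), Duration::from_secs(3600));
	assert_eq!(next_prune_interval(false), Duration::from_secs(60));
}
```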

Contributor Author:
Will do this in a direct follow-up to minimize the rebase pain for the PR with the macro expansion and code move.

Some(false) => {
log_trace!(logger, "Calling ChannelManager's timer_tick_occurred");
channel_manager.get_cm().timer_tick_occurred();
last_freshness_call = sleeper(FRESHNESS_TIMER);
},
Some(true) => break,
None => {},
}
if channel_manager.get_cm().get_and_clear_needs_persistence() {
log_trace!(logger, "Persisting ChannelManager...");
kv_store
@@ -815,53 +824,6 @@ where
.await?;
log_trace!(logger, "Done persisting ChannelManager.");
}
match check_sleeper(&mut last_freshness_call) {
Some(false) => {
log_trace!(logger, "Calling ChannelManager's timer_tick_occurred");
channel_manager.get_cm().timer_tick_occurred();
last_freshness_call = sleeper(FRESHNESS_TIMER);
},
Some(true) => break,
None => {},
}
match check_sleeper(&mut last_onion_message_handler_call) {
Some(false) => {
if let Some(om) = &onion_messenger {
log_trace!(logger, "Calling OnionMessageHandler's timer_tick_occurred");
om.get_om().timer_tick_occurred();
}
last_onion_message_handler_call = sleeper(ONION_MESSAGE_HANDLER_TIMER);
},
Some(true) => break,
None => {},
}
if await_slow {
// On various platforms, we may be starved of CPU cycles for several reasons.
// E.g. on iOS, if we've been in the background, we will be entirely paused.
// Similarly, if we're on a desktop platform and the device has been asleep, we
// may not get any cycles.
// We detect this by checking if our max-100ms-sleep, above, ran longer than a
// full second, at which point we assume sockets may have been killed (they
// appear to be at least on some platforms, even if it has only been a second).
// Note that we have to take care to not get here just because user event
// processing was slow at the top of the loop. For example, the sample client
// may call Bitcoin Core RPCs during event handling, which very often takes
// more than a handful of seconds to complete, and shouldn't disconnect all our
// peers.
log_trace!(logger, "100ms sleep took more than a second, disconnecting peers.");
peer_manager.as_ref().disconnect_all_peers();
last_ping_call = sleeper(PING_TIMER);
} else {
match check_sleeper(&mut last_ping_call) {
Some(false) => {
log_trace!(logger, "Calling PeerManager's timer_tick_occurred");
peer_manager.as_ref().timer_tick_occurred();
last_ping_call = sleeper(PING_TIMER);
},
Some(true) => break,
_ => {},
}
}

// Note that we want to run a graph prune once not long after startup before
Collaborator:
This comment is now in the wrong spot. It was on the assignment of the timer's duration, and now it's on the sleep itself.

Contributor Author:
Hm yes, moved. For the async macro invocation, it was always at the wrong location, because async sets the timer further down. It just wasn't so easy to spot.

Contributor Author:
And moved back after the check_sleeper extension...

// falling back to our usual hourly prunes. This avoids short-lived clients never
@@ -948,15 +910,6 @@ where
Some(true) => break,
None => {},
}
match check_sleeper(&mut last_rebroadcast_call) {
Some(false) => {
log_trace!(logger, "Rebroadcasting monitor's pending claims");
chain_monitor.rebroadcast_pending_claims();
last_rebroadcast_call = sleeper(REBROADCAST_TIMER);
},
Some(true) => break,
None => {},
}
match check_sleeper(&mut last_sweeper_call) {
Some(false) => {
log_trace!(logger, "Regenerating sweeper spends if necessary");
@@ -968,6 +921,57 @@ where
Some(true) => break,
None => {},
}
match check_sleeper(&mut last_onion_message_handler_call) {
Some(false) => {
if let Some(om) = &onion_messenger {
log_trace!(logger, "Calling OnionMessageHandler's timer_tick_occurred");
om.get_om().timer_tick_occurred();
}
last_onion_message_handler_call = sleeper(ONION_MESSAGE_HANDLER_TIMER);
},
Some(true) => break,
None => {},
}

// Peer manager timer tick. If we were interrupted on a mobile platform, we disconnect all peers.
if await_slow {
// On various platforms, we may be starved of CPU cycles for several reasons.
// E.g. on iOS, if we've been in the background, we will be entirely paused.
// Similarly, if we're on a desktop platform and the device has been asleep, we
// may not get any cycles.
// We detect this by checking if our max-100ms-sleep, above, ran longer than a
// full second, at which point we assume sockets may have been killed (they
// appear to be at least on some platforms, even if it has only been a second).
// Note that we have to take care to not get here just because user event
// processing was slow at the top of the loop. For example, the sample client
// may call Bitcoin Core RPCs during event handling, which very often takes
// more than a handful of seconds to complete, and shouldn't disconnect all our
// peers.
log_trace!(logger, "100ms sleep took more than a second, disconnecting peers.");
peer_manager.as_ref().disconnect_all_peers();
last_ping_call = sleeper(PING_TIMER);
} else {
match check_sleeper(&mut last_ping_call) {
Some(false) => {
log_trace!(logger, "Calling PeerManager's timer_tick_occurred");
peer_manager.as_ref().timer_tick_occurred();
last_ping_call = sleeper(PING_TIMER);
},
Some(true) => break,
_ => {},
}
}

// Rebroadcast pending claims.
match check_sleeper(&mut last_rebroadcast_call) {
Some(false) => {
log_trace!(logger, "Rebroadcasting monitor's pending claims");
chain_monitor.rebroadcast_pending_claims();
last_rebroadcast_call = sleeper(REBROADCAST_TIMER);
},
Some(true) => break,
None => {},
}
}
log_trace!(logger, "Terminating background processor.");

@@ -1292,6 +1296,11 @@ impl BackgroundProcessor {
log_trace!(logger, "Terminating background processor.");
break;
}
if last_freshness_call.elapsed() > FRESHNESS_TIMER {
log_trace!(logger, "Calling ChannelManager's timer_tick_occurred");
channel_manager.get_cm().timer_tick_occurred();
last_freshness_call = Instant::now();
}
if channel_manager.get_cm().get_and_clear_needs_persistence() {
log_trace!(logger, "Persisting ChannelManager...");
(kv_store.write(
@@ -1302,23 +1311,6 @@ impl BackgroundProcessor {
))?;
log_trace!(logger, "Done persisting ChannelManager.");
}
if last_freshness_call.elapsed() > FRESHNESS_TIMER {
log_trace!(logger, "Calling ChannelManager's timer_tick_occurred");
channel_manager.get_cm().timer_tick_occurred();
last_freshness_call = Instant::now();
}
if last_onion_message_handler_call.elapsed() > ONION_MESSAGE_HANDLER_TIMER {
if let Some(om) = &onion_messenger {
log_trace!(logger, "Calling OnionMessageHandler's timer_tick_occurred");
om.get_om().timer_tick_occurred();
}
last_onion_message_handler_call = Instant::now();
}
Contributor:
was this reordering in the sync one needed or done to match the async one?

Contributor Author:
Done to match async (#3968 (comment))

if last_ping_call.elapsed() > PING_TIMER {
log_trace!(logger, "Calling PeerManager's timer_tick_occurred");
peer_manager.as_ref().timer_tick_occurred();
last_ping_call = Instant::now();
}

// Note that we want to run a graph prune once not long after startup before
// falling back to our usual hourly prunes. This avoids short-lived clients never
@@ -1386,18 +1378,30 @@ impl BackgroundProcessor {
}
last_scorer_persist_call = Instant::now();
}
if last_rebroadcast_call.elapsed() > REBROADCAST_TIMER {
log_trace!(logger, "Rebroadcasting monitor's pending claims");
chain_monitor.rebroadcast_pending_claims();
last_rebroadcast_call = Instant::now();
}
if last_sweeper_call.elapsed() > SWEEPER_TIMER {
log_trace!(logger, "Regenerating sweeper spends if necessary");
if let Some(ref sweeper) = sweeper {
let _ = sweeper.regenerate_and_broadcast_spend_if_necessary();
}
last_sweeper_call = Instant::now();
}
if last_onion_message_handler_call.elapsed() > ONION_MESSAGE_HANDLER_TIMER {
if let Some(om) = &onion_messenger {
log_trace!(logger, "Calling OnionMessageHandler's timer_tick_occurred");
om.get_om().timer_tick_occurred();
}
last_onion_message_handler_call = Instant::now();
}
if last_ping_call.elapsed() > PING_TIMER {
log_trace!(logger, "Calling PeerManager's timer_tick_occurred");
peer_manager.as_ref().timer_tick_occurred();
last_ping_call = Instant::now();
}
if last_rebroadcast_call.elapsed() > REBROADCAST_TIMER {
log_trace!(logger, "Rebroadcasting monitor's pending claims");
chain_monitor.rebroadcast_pending_claims();
last_rebroadcast_call = Instant::now();
}
}

// After we exit, ensure we persist the ChannelManager one final time - this avoids