This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

av-store: Move prune on a separate thread #7263

Merged
71 changes: 62 additions & 9 deletions node/core/av-store/src/lib.rs
@@ -26,7 +26,13 @@ use std::{
time::{Duration, SystemTime, SystemTimeError, UNIX_EPOCH},
};

use futures::{channel::oneshot, future, select, FutureExt};
use futures::{
channel::{
mpsc::{channel, Receiver as MpscReceiver, Sender as MpscSender},
oneshot,
},
future, select, FutureExt, SinkExt, StreamExt,
};
use futures_timer::Delay;
use parity_scale_codec::{Decode, Encode, Error as CodecError, Input};
use polkadot_node_subsystem_util::database::{DBTransaction, Database};
@@ -540,9 +546,17 @@ impl<Context> AvailabilityStoreSubsystem {
#[overseer::contextbounds(AvailabilityStore, prefix = self::overseer)]
async fn run<Context>(mut subsystem: AvailabilityStoreSubsystem, mut ctx: Context) {
let mut next_pruning = Delay::new(subsystem.pruning_config.pruning_interval).fuse();

// The pruning interval is on the order of minutes, so we shouldn't have more than one task
// running at any one time; a capacity of 10 should be more than enough.
let (mut pruning_result_tx, mut pruning_result_rx) = channel(10);
loop {
let res = run_iteration(&mut ctx, &mut subsystem, &mut next_pruning).await;
let res = run_iteration(
&mut ctx,
&mut subsystem,
&mut next_pruning,
(&mut pruning_result_tx, &mut pruning_result_rx),
)
.await;
match res {
Err(e) => {
e.trace();
@@ -564,6 +578,10 @@ async fn run_iteration<Context>(
ctx: &mut Context,
subsystem: &mut AvailabilityStoreSubsystem,
mut next_pruning: &mut future::Fuse<Delay>,
(pruning_result_tx, pruning_result_rx): (
&mut MpscSender<Result<(), Error>>,
&mut MpscReceiver<Result<(), Error>>,
),
Comment on lines +581 to +584

Contributor:
nit: It would be simpler for this not to be a tuple, and it would save two lines. 😛

Contributor Author:
You mean just having two separate params, pruning_result_tx and pruning_result_rx?

Contributor:
Yep. Just a style nit, feel free to ignore if you want.

Contributor Author:
I will keep it as it is then :), since I think it makes sense to try to keep closely related params in the same data structure.

) -> Result<bool, Error> {
select! {
incoming = ctx.recv().fuse() => {
@@ -612,15 +630,51 @@ async fn run_iteration<Context>(
// It's important to set the delay before calling `prune_all` because an error in `prune_all`
// could lead to the delay not being set again. Then we would never prune anything anymore.
*next_pruning = Delay::new(subsystem.pruning_config.pruning_interval).fuse();

let _timer = subsystem.metrics.time_pruning();
prune_all(&subsystem.db, &subsystem.config, &*subsystem.clock)?;
}
start_prune_all(ctx, subsystem, pruning_result_tx.clone()).await?;
},
// Receive the prune result and propagate errors, so that in case of a fatal error
// the main loop of the subsystem can exit gracefully.
result = pruning_result_rx.next() => {
if let Some(result) = result {
result?;
}
},
}

Ok(false)
}

// Start prune-all on a separate thread, so that in the case when the operation takes
// longer than expected we don't keep the whole subsystem blocked.
// See: https://github.com/paritytech/polkadot/issues/7237 for more details.
#[overseer::contextbounds(AvailabilityStore, prefix = self::overseer)]
async fn start_prune_all<Context>(
ctx: &mut Context,
subsystem: &mut AvailabilityStoreSubsystem,
mut pruning_result_tx: MpscSender<Result<(), Error>>,
) -> Result<(), Error> {
let metrics = subsystem.metrics.clone();
let db = subsystem.db.clone();
let config = subsystem.config;
let time_now = subsystem.clock.now()?;

ctx.spawn_blocking(
"av-store-prunning",
Box::pin(async move {
let _timer = metrics.time_pruning();
Contributor:
nit: I'd also add a debug! log here to know when pruning starts and ends. It would be nice to have when debugging.

Contributor Author (@alexggh, May 23, 2023):
Yep, will do.
Done

gum::debug!(target: LOG_TARGET, "Prunning started");
let result = prune_all(&db, &config, time_now);

if let Err(err) = pruning_result_tx.send(result).await {
// This usually means that the node is closing down, log it just in case
gum::debug!(target: LOG_TARGET, ?err, "Failed to send prune_all result",);
}
}),
)?;
Ok(())
}

#[overseer::contextbounds(AvailabilityStore, prefix = self::overseer)]
async fn process_block_activated<Context>(
ctx: &mut Context,
@@ -1250,8 +1304,7 @@ fn store_available_data(
Ok(())
}

fn prune_all(db: &Arc<dyn Database>, config: &Config, clock: &dyn Clock) -> Result<(), Error> {
let now = clock.now()?;
fn prune_all(db: &Arc<dyn Database>, config: &Config, now: Duration) -> Result<(), Error> {
let (range_start, range_end) = pruning_range(now);

let mut tx = DBTransaction::new();
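For reference, here is a minimal, stand-alone sketch of the pattern this change introduces: run a potentially slow prune on a separate thread and hand its result back to the main loop over a bounded mpsc channel, so a fatal pruning error can still shut the loop down. This is only an illustration under stated assumptions, not the subsystem code: a plain std::thread stands in for the overseer's ctx.spawn_blocking, futures::executor::block_on stands in for the subsystem's runtime, and prune_all is a hypothetical stub.

use futures::{channel::mpsc, executor::block_on, SinkExt, StreamExt};
use std::{thread, time::Duration};

// Hypothetical stand-in for the real `prune_all`: pretend to walk the DB and
// delete everything that has expired as of `now`.
fn prune_all(now: Duration) -> Result<(), String> {
    thread::sleep(Duration::from_millis(200)); // simulate a slow prune
    println!("pruned everything expired as of {:?}", now);
    Ok(())
}

// Counterpart of `start_prune_all` in the diff, with a plain thread instead of
// `ctx.spawn_blocking`: the prune runs off the main loop and only its result
// travels back through the channel.
fn start_prune_all(mut result_tx: mpsc::Sender<Result<(), String>>, now: Duration) {
    let _ = thread::spawn(move || {
        let result = prune_all(now);
        // The channel is bounded, so `send` is async; block this worker thread on it.
        if block_on(result_tx.send(result)).is_err() {
            // Usually means the main loop has already shut down.
            eprintln!("failed to send prune result");
        }
    });
}

fn main() {
    // Capacity 10 mirrors the diff: prunes happen minutes apart, so at most
    // one result should ever be in flight.
    let (tx, mut rx) = mpsc::channel::<Result<(), String>>(10);

    start_prune_all(tx, Duration::from_secs(3_600));

    // The main loop stays free for other work and only reacts when a prune
    // result arrives; a fatal error would make it exit, as in `run_iteration`.
    block_on(async {
        match rx.next().await {
            Some(Ok(())) => println!("prune finished"),
            Some(Err(e)) => eprintln!("prune failed, shutting down: {e}"),
            None => println!("pruning task dropped its sender"),
        }
    });
}

In the subsystem itself the same result is consumed by the pruning_result_rx arm of the select! in run_iteration, where the ? on the received value lets a fatal error terminate the main loop.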