Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
av-store: Move prune on a separate thread
Browse files Browse the repository at this point in the history
There are situations where pruning the data can take more than a few
seconds, which might make the whole subsystem unresponsive. To avoid this,
move the pruning process to a separate thread.

See: #7237, for more details.

Signed-off-by: Alexandru Gheorghe <[email protected]>
  • Loading branch information
alexggh committed May 23, 2023
1 parent e05b786 commit f34c93f
Showing 1 changed file with 63 additions and 9 deletions.
72 changes: 63 additions & 9 deletions node/core/av-store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,13 @@ use std::{
time::{Duration, SystemTime, SystemTimeError, UNIX_EPOCH},
};

use futures::{channel::oneshot, future, select, FutureExt};
use futures::{
channel::{
mpsc::{channel, Receiver as MpscReceiver, Sender as MpscSender},
oneshot,
},
future, select, FutureExt, SinkExt, StreamExt,
};
use futures_timer::Delay;
use parity_scale_codec::{Decode, Encode, Error as CodecError, Input};
use polkadot_node_subsystem_util::database::{DBTransaction, Database};
Expand Down Expand Up @@ -540,9 +546,17 @@ impl<Context> AvailabilityStoreSubsystem {
#[overseer::contextbounds(AvailabilityStore, prefix = self::overseer)]
async fn run<Context>(mut subsystem: AvailabilityStoreSubsystem, mut ctx: Context) {
let mut next_pruning = Delay::new(subsystem.pruning_config.pruning_interval).fuse();

// Pruning interval is in the order of minutes so we shouldn't have more than one task running
// at one moment in time, so 10 should be more than enough.
let (mut pruning_result_tx, mut pruning_result_rx) = channel(10);
loop {
let res = run_iteration(&mut ctx, &mut subsystem, &mut next_pruning).await;
let res = run_iteration(
&mut ctx,
&mut subsystem,
&mut next_pruning,
(&mut pruning_result_tx, &mut pruning_result_rx),
)
.await;
match res {
Err(e) => {
e.trace();
Expand All @@ -564,6 +578,10 @@ async fn run_iteration<Context>(
ctx: &mut Context,
subsystem: &mut AvailabilityStoreSubsystem,
mut next_pruning: &mut future::Fuse<Delay>,
(pruning_result_tx, pruning_result_rx): (
&mut MpscSender<Result<(), Error>>,
&mut MpscReceiver<Result<(), Error>>,
),
) -> Result<bool, Error> {
select! {
incoming = ctx.recv().fuse() => {
Expand Down Expand Up @@ -612,15 +630,52 @@ async fn run_iteration<Context>(
// It's important to set the delay before calling `prune_all` because an error in `prune_all`
// could lead to the delay not being set again. Then we would never prune anything anymore.
*next_pruning = Delay::new(subsystem.pruning_config.pruning_interval).fuse();

let _timer = subsystem.metrics.time_pruning();
prune_all(&subsystem.db, &subsystem.config, &*subsystem.clock)?;
}
start_prune_all(ctx, subsystem, pruning_result_tx.clone()).await?;
},
// Received the prune result; propagate any error so that, in case of a fatal error,
// the main loop of the subsystem can exit gracefully.
result = pruning_result_rx.next() => {
if let Some(result) = result {
result?;
}
},
}

Ok(false)
}

/// Kicks off `prune_all` through `ctx.spawn_blocking`, so that a pruning pass which
/// takes longer than expected does not keep the whole subsystem blocked.
///
/// The outcome of `prune_all` is sent over `pruning_result_tx`; if that send fails,
/// the failure is only logged.
///
/// Returns an error if reading the clock or spawning the task fails.
///
/// See: https://github.com/paritytech/polkadot/issues/7237 for more details.
#[overseer::contextbounds(AvailabilityStore, prefix = self::overseer)]
async fn start_prune_all<Context>(
	ctx: &mut Context,
	subsystem: &mut AvailabilityStoreSubsystem,
	mut pruning_result_tx: MpscSender<Result<(), Error>>,
) -> Result<(), Error> {
	// Clone what the task needs up front: the `async move` block below takes ownership.
	let pruning_metrics = subsystem.metrics.clone();
	let database = subsystem.db.clone();
	let store_config = subsystem.config.clone();
	// The timestamp is read here, before the task is spawned, and handed to `prune_all`.
	let now = subsystem.clock.now()?;

	let pruning_task = async move {
		// Bound for the whole task body, so it is dropped only after the result was sent
		// (presumably a timer guard for the pruning metric - confirm).
		let _timer = pruning_metrics.time_pruning();
		let outcome = prune_all(&database, &store_config, now);

		if let Err(err) = pruning_result_tx.send(outcome).await {
			gum::error!(
				target: LOG_TARGET,
				?err,
				"Failed to send prune_all result",
			);
		}
	};

	ctx.spawn_blocking("av-store-prunning", Box::pin(pruning_task))?;
	Ok(())
}

#[overseer::contextbounds(AvailabilityStore, prefix = self::overseer)]
async fn process_block_activated<Context>(
ctx: &mut Context,
Expand Down Expand Up @@ -1250,8 +1305,7 @@ fn store_available_data(
Ok(())
}

fn prune_all(db: &Arc<dyn Database>, config: &Config, clock: &dyn Clock) -> Result<(), Error> {
let now = clock.now()?;
fn prune_all(db: &Arc<dyn Database>, config: &Config, now: Duration) -> Result<(), Error> {
let (range_start, range_end) = pruning_range(now);

let mut tx = DBTransaction::new();
Expand Down

0 comments on commit f34c93f

Please sign in to comment.