Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions crates/optimism/cli/src/commands/prune_proofs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,14 @@ pub struct PruneOpProofsCommand<C: ChainSpecParser> {
value_name = "PROOFS_HISTORY_WINDOW"
)]
pub proofs_history_window: u64,

/// The batch size for pruning operations.
#[arg(
long = "proofs-history.prune-batch-size",
default_value_t = 1000,
value_name = "PROOFS_HISTORY_PRUNE_BATCH_SIZE"
)]
pub proofs_history_prune_batch_size: u64,
}

impl<C: ChainSpecParser<ChainSpec = OpChainSpec>> PruneOpProofsCommand<C> {
Expand Down Expand Up @@ -63,8 +71,12 @@ impl<C: ChainSpecParser<ChainSpec = OpChainSpec>> PruneOpProofsCommand<C> {
"Current proofs storage block range"
);

let pruner =
OpProofStoragePruner::new(storage, provider_factory, self.proofs_history_window);
let pruner = OpProofStoragePruner::new(
storage,
provider_factory,
self.proofs_history_window,
self.proofs_history_prune_batch_size,
);
pruner.run().await;
Ok(())
}
Expand Down
18 changes: 18 additions & 0 deletions crates/optimism/trie/src/prune/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,24 @@ impl Display for PrunerOutput {
}
}

impl PrunerOutput {
    /// Merges another [`PrunerOutput`] into this one.
    ///
    /// Durations and write counts are accumulated; the covered block range is
    /// widened so it spans both outputs (earliest `start_block`, latest
    /// `end_block`).
    pub fn extend_ref(&mut self, other: Self) {
        self.duration += other.duration;
        self.fetch_duration += other.fetch_duration;
        self.prune_duration += other.prune_duration;
        // Widen the range: keep the earliest start and the latest end.
        self.start_block = self.start_block.min(other.start_block);
        self.end_block = self.end_block.max(other.end_block);
        self.write_counts += other.write_counts;
    }
}

/// Error returned by the pruner.
#[derive(Debug, Error, Display)]
pub enum PrunerError {
Expand Down
108 changes: 75 additions & 33 deletions crates/optimism/trie/src/prune/pruner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::{
};
use alloy_eips::{eip1898::BlockWithParent, BlockNumHash};
use reth_provider::BlockHashReader;
use std::cmp;
use tokio::time::Instant;
use tracing::{error, info, trace};

Expand All @@ -18,6 +19,8 @@ pub struct OpProofStoragePruner<P, H> {
block_hash_reader: H,
/// Keep at least these many recent blocks
min_block_interval: u64,
/// Maximum number of blocks to prune in one database transaction
prune_batch_size: u64,
// TODO: add timeout - Maximum time for one pruner run. If `None`, no timeout.
#[doc(hidden)]
#[cfg(feature = "metrics")]
Expand All @@ -30,11 +33,13 @@ impl<P, H> OpProofStoragePruner<P, H> {
provider: OpProofsStorage<P>,
block_hash_reader: H,
min_block_interval: u64,
prune_batch_size: u64,
) -> Self {
Self {
provider,
block_hash_reader,
min_block_interval,
prune_batch_size,
#[cfg(feature = "metrics")]
metrics: Metrics::default(),
}
Expand All @@ -47,9 +52,6 @@ where
H: BlockHashReader,
{
async fn run_inner(&self) -> OpProofStoragePrunerResult {
let t = Instant::now();
// TODO: handle timeout

let latest_block_opt = self.provider.get_latest_block_number().await?;
if latest_block_opt.is_none() {
trace!(target: "trie::pruner", "No latest blocks in the proof storage");
Expand All @@ -72,20 +74,50 @@ where
}

// at this point `latest_block` is always greater than `min_block_interval`
let new_earliest_block = latest_block - self.min_block_interval;
let target_earliest_block = latest_block - self.min_block_interval;

info!(
target: "trie::pruner",
from_block = earliest_block,
to_block = new_earliest_block,
to_block = target_earliest_block,
"Starting pruning proof storage",
);

let mut final_diff = BlockStateDiff::default();
// Fetch all diffs from (earliest_block + 1) to new_earliest_block (inclusive)
// initial proof data contains the state at `earliest_block`, so we start from
// earliest_block + 1
for i in (earliest_block + 1)..=new_earliest_block {
let mut current_earliest_block = earliest_block;
let mut prune_output = PrunerOutput {
start_block: earliest_block,
end_block: target_earliest_block,
..Default::default()
};

// Prune in batches
while current_earliest_block < target_earliest_block {
// Calculate the end of this batch
let batch_end_block =
cmp::min(current_earliest_block + self.prune_batch_size, target_earliest_block);

let batch_output = self.prune_batch(current_earliest_block, batch_end_block).await?;

prune_output.extend_ref(batch_output);

// Update loop state
current_earliest_block = batch_end_block;
}

Ok(prune_output)
}

/// Prunes a single batch of blocks.
async fn prune_batch(
&self,
start_block: u64,
end_block: u64,
) -> Result<PrunerOutput, PrunerError> {
let batch_start_time = Instant::now();
let mut batch_diff = BlockStateDiff::default();

// Fetch all diffs from (start_block + 1) to end_block (inclusive)
for i in (start_block + 1)..=end_block {
let diff = self.provider.fetch_trie_updates(i).await.inspect_err(|err| {
error!(
target: "trie::pruner",
Expand All @@ -94,24 +126,25 @@ where
"Failed to fetch trie updates for block during pruning"
)
})?;
final_diff.extend_ref(&diff);
batch_diff.extend_ref(&diff);
}
let stat_diff_fetch_duration = t.elapsed();
let fetch_duration = batch_start_time.elapsed();

// Fetch block hashes for the new earliest block of this batch
let new_earliest_block_hash = self
.block_hash_reader
.block_hash(new_earliest_block)
.block_hash(end_block)
.inspect_err(|err| {
error!(
target: "trie::pruner",
block = new_earliest_block,
block = end_block,
?err,
"Failed to fetch block hash for new earliest block during pruning"
)
})?
.ok_or(PrunerError::BlockNotFound(new_earliest_block))?;
.ok_or(PrunerError::BlockNotFound(end_block))?;

let parent_block_num = new_earliest_block - 1;
let parent_block_num = end_block - 1;
let parent_block_hash = self
.block_hash_reader
.block_hash(parent_block_num)
Expand All @@ -127,24 +160,33 @@ where

let block_with_parent = BlockWithParent {
parent: parent_block_hash,
block: BlockNumHash { number: new_earliest_block, hash: new_earliest_block_hash },
block: BlockNumHash { number: end_block, hash: new_earliest_block_hash },
};

let write_count = self.provider.prune_earliest_state(block_with_parent, final_diff).await?;

let total_duration = t.elapsed();
let prune_output = PrunerOutput {
duration: total_duration,
fetch_duration: stat_diff_fetch_duration,
prune_duration: total_duration.saturating_sub(stat_diff_fetch_duration),
start_block: earliest_block,
end_block: new_earliest_block,
write_counts: write_count,
// Commit this batch
let write_counts =
self.provider.prune_earliest_state(block_with_parent, batch_diff).await?;

let duration = batch_start_time.elapsed();
let batch_output = PrunerOutput {
duration,
fetch_duration,
prune_duration: duration.saturating_sub(fetch_duration),
start_block,
end_block,
write_counts,
};

// Record metrics for this batch
#[cfg(feature = "metrics")]
self.metrics.record_prune_result(prune_output.clone());
self.metrics.record_prune_result(batch_output.clone());

Ok(prune_output)
info!(
target: "trie::pruner",
?batch_output,
"Finished pruning batch of proof storage",
);
Ok(batch_output)
}

/// Run the pruner
Expand Down Expand Up @@ -409,7 +451,7 @@ mod tests {
.withf(move |block_num| *block_num == 3)
.returning(move |_| Ok(Some(b256(3))));

let pruner = OpProofStoragePruner::new(store.clone(), block_hash_reader, 1);
let pruner = OpProofStoragePruner::new(store.clone(), block_hash_reader, 1, 1000);
let out = pruner.run_inner().await.expect("pruner ok");
assert_eq!(out.start_block, 0);
assert_eq!(out.end_block, 4, "pruned up to 4 (inclusive); new earliest is 4");
Expand Down Expand Up @@ -508,7 +550,7 @@ mod tests {
assert!(latest.is_none());

let block_hash_reader = MockBlockHashReader::new();
let pruner = OpProofStoragePruner::new(store, block_hash_reader, 10);
let pruner = OpProofStoragePruner::new(store, block_hash_reader, 10, 1000);
let out = pruner.run_inner().await.expect("ok");
assert_eq!(out, PrunerOutput::default(), "should early-return default output");
}
Expand All @@ -534,7 +576,7 @@ mod tests {
assert_eq!(latest.unwrap().0, 3);

let block_hash_reader = MockBlockHashReader::new();
let pruner = OpProofStoragePruner::new(store, block_hash_reader, 1);
let pruner = OpProofStoragePruner::new(store, block_hash_reader, 1, 1000);
let out = pruner.run_inner().await.expect("ok");
assert_eq!(out, PrunerOutput::default(), "should early-return default output");
}
Expand Down Expand Up @@ -565,7 +607,7 @@ mod tests {

// Require min_block_interval=2 (or greater) so interval < min
let block_hash_reader = MockBlockHashReader::new();
let pruner = OpProofStoragePruner::new(store, block_hash_reader, 2);
let pruner = OpProofStoragePruner::new(store, block_hash_reader, 2, 1000);
let out = pruner.run_inner().await.expect("ok");
assert_eq!(out, PrunerOutput::default(), "no pruning should occur");
}
Expand Down
5 changes: 4 additions & 1 deletion crates/optimism/trie/src/prune/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ use tokio::{
};
use tracing::info;

const PRUNE_BATCH_SIZE: u64 = 200;

/// Periodic pruner task: constructs the pruner and runs it every interval.
#[derive(Debug)]
pub struct OpProofStoragePrunerTask<P, H> {
Expand All @@ -27,7 +29,8 @@ where
min_block_interval: u64,
task_run_interval: Duration,
) -> Self {
let pruner = OpProofStoragePruner::new(provider, hash_reader, min_block_interval);
let pruner =
OpProofStoragePruner::new(provider, hash_reader, min_block_interval, PRUNE_BATCH_SIZE);
Self { pruner, min_block_interval, task_run_interval }
}

Expand Down
5 changes: 5 additions & 0 deletions docs/vocs/docs/pages/cli/op-reth/prune-op-proofs.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,11 @@ Static Files:

[default: 1296000]

--proofs-history.prune-batch-size <PROOFS_HISTORY_PRUNE_BATCH_SIZE>
The batch size for pruning operations

[default: 1000]

Logging:
--log.stdout.format <FORMAT>
The format to use for logs written to stdout
Expand Down
Loading