diff --git a/crates/prune/prune/Cargo.toml b/crates/prune/prune/Cargo.toml index d86a35eaf14..75be443ae34 100644 --- a/crates/prune/prune/Cargo.toml +++ b/crates/prune/prune/Cargo.toml @@ -51,3 +51,7 @@ reth-testing-utils.workspace = true reth-tracing.workspace = true assert_matches.workspace = true + +[features] +default = [] +rocksdb = ["reth-provider/rocksdb"] diff --git a/crates/prune/prune/src/builder.rs b/crates/prune/prune/src/builder.rs index 52b175c66a1..8edb9b55e15 100644 --- a/crates/prune/prune/src/builder.rs +++ b/crates/prune/prune/src/builder.rs @@ -4,6 +4,8 @@ use reth_config::PruneConfig; use reth_db_api::{table::Value, transaction::DbTxMut}; use reth_exex_types::FinishedExExHeight; use reth_primitives_traits::NodePrimitives; +#[cfg(all(unix, feature = "rocksdb"))] +use reth_provider::RocksDBProviderFactory; use reth_provider::{ providers::StaticFileProvider, BlockReader, ChainStateBlockReader, DBProvider, DatabaseProviderFactory, NodePrimitivesProvider, PruneCheckpointReader, PruneCheckpointWriter, @@ -74,6 +76,7 @@ impl PrunerBuilder { } /// Builds a [Pruner] from the current configuration with the given provider factory. + #[cfg(all(unix, feature = "rocksdb"))] pub fn build_with_provider_factory(self, provider_factory: PF) -> Pruner where PF: DatabaseProviderFactory< @@ -85,6 +88,7 @@ impl PrunerBuilder { + StageCheckpointReader + ChangeSetReader + StorageChangeSetReader + + RocksDBProviderFactory + StaticFileProviderFactory< Primitives: NodePrimitives, >, @@ -105,7 +109,72 @@ impl PrunerBuilder { ) } + /// Builds a [Pruner] from the current configuration with the given provider factory. + #[cfg(not(all(unix, feature = "rocksdb")))] + pub fn build_with_provider_factory(self, provider_factory: PF) -> Pruner + where + PF: DatabaseProviderFactory< + ProviderRW: PruneCheckpointWriter + + PruneCheckpointReader + + BlockReader + + ChainStateBlockReader + + StorageSettingsCache + + StageCheckpointReader + + ChangeSetReader + + StorageChangeSetReader + + StaticFileProviderFactory< + Primitives: NodePrimitives, + >, + > + StaticFileProviderFactory< + Primitives = ::Primitives, + >, + { + let segments = + SegmentSet::from_components(provider_factory.static_file_provider(), self.segments); + + Pruner::new_with_factory( + provider_factory, + segments.into_vec(), + self.block_interval, + self.delete_limit, + self.timeout, + self.finished_exex_height, + ) + } + + /// Builds a [Pruner] from the current configuration with the given static file provider. + #[cfg(all(unix, feature = "rocksdb"))] + pub fn build( + self, + static_file_provider: StaticFileProvider, + ) -> Pruner + where + Provider: StaticFileProviderFactory< + Primitives: NodePrimitives, + > + DBProvider + + BlockReader + + ChainStateBlockReader + + PruneCheckpointWriter + + PruneCheckpointReader + + StorageSettingsCache + + StageCheckpointReader + + ChangeSetReader + + StorageChangeSetReader + + RocksDBProviderFactory, + { + let segments = SegmentSet::::from_components(static_file_provider, self.segments); + + Pruner::new( + segments.into_vec(), + self.block_interval, + self.delete_limit, + self.timeout, + self.finished_exex_height, + ) + } + /// Builds a [Pruner] from the current configuration with the given static file provider. + #[cfg(not(all(unix, feature = "rocksdb")))] pub fn build( self, static_file_provider: StaticFileProvider, diff --git a/crates/prune/prune/src/segments/set.rs b/crates/prune/prune/src/segments/set.rs index 3e56664f26d..62a5ec9c8b6 100644 --- a/crates/prune/prune/src/segments/set.rs +++ b/crates/prune/prune/src/segments/set.rs @@ -5,6 +5,8 @@ use crate::segments::{ use alloy_eips::eip2718::Encodable2718; use reth_db_api::{table::Value, transaction::DbTxMut}; use reth_primitives_traits::NodePrimitives; +#[cfg(all(unix, feature = "rocksdb"))] +use reth_provider::RocksDBProviderFactory; use reth_provider::{ providers::StaticFileProvider, BlockReader, ChainStateBlockReader, DBProvider, PruneCheckpointReader, PruneCheckpointWriter, StaticFileProviderFactory, StorageSettingsCache, @@ -44,6 +46,59 @@ impl SegmentSet { } } +#[cfg(all(unix, feature = "rocksdb"))] +impl SegmentSet +where + Provider: StaticFileProviderFactory< + Primitives: NodePrimitives, + > + DBProvider + + PruneCheckpointWriter + + PruneCheckpointReader + + BlockReader + + ChainStateBlockReader + + StorageSettingsCache + + ChangeSetReader + + StorageChangeSetReader + + RocksDBProviderFactory, +{ + /// Creates a [`SegmentSet`] from an existing components, such as [`StaticFileProvider`] and + /// [`PruneModes`]. + pub fn from_components( + _static_file_provider: StaticFileProvider, + prune_modes: PruneModes, + ) -> Self { + let PruneModes { + sender_recovery, + transaction_lookup, + receipts, + account_history, + storage_history, + bodies_history, + receipts_log_filter, + } = prune_modes; + + Self::default() + // Bodies - run first since file deletion is fast + .segment_opt(bodies_history.map(Bodies::new)) + // Account history + .segment_opt(account_history.map(AccountHistory::new)) + // Storage history + .segment_opt(storage_history.map(StorageHistory::new)) + // User receipts + .segment_opt(receipts.map(UserReceipts::new)) + // Receipts by logs + .segment_opt( + (!receipts_log_filter.is_empty()) + .then(|| ReceiptsByLogs::new(receipts_log_filter.clone())), + ) + // Transaction lookup + .segment_opt(transaction_lookup.map(TransactionLookup::new)) + // Sender recovery + .segment_opt(sender_recovery.map(SenderRecovery::new)) + } +} + +#[cfg(not(all(unix, feature = "rocksdb")))] impl SegmentSet where Provider: StaticFileProviderFactory< diff --git a/crates/prune/prune/src/segments/user/account_history.rs b/crates/prune/prune/src/segments/user/account_history.rs index 9bdd26d1114..01c8dbe6a3b 100644 --- a/crates/prune/prune/src/segments/user/account_history.rs +++ b/crates/prune/prune/src/segments/user/account_history.rs @@ -1,3 +1,5 @@ +#[cfg(all(unix, feature = "rocksdb"))] +use crate::segments::user::history::finalize_account_history_prune_rocksdb; use crate::{ db_ext::DbTxPruneExt, segments::{ @@ -8,6 +10,8 @@ use crate::{ }; use alloy_primitives::BlockNumber; use reth_db_api::{models::ShardedKey, tables, transaction::DbTxMut}; +#[cfg(all(unix, feature = "rocksdb"))] +use reth_provider::RocksDBProviderFactory; use reth_provider::{ changeset_walker::StaticFileAccountChangesetWalker, DBProvider, EitherWriter, StaticFileProviderFactory, StorageSettingsCache, @@ -37,6 +41,48 @@ impl AccountHistory { } } +#[cfg(all(unix, feature = "rocksdb"))] +impl Segment for AccountHistory +where + Provider: DBProvider + + StaticFileProviderFactory + + StorageSettingsCache + + ChangeSetReader + + RocksDBProviderFactory, +{ + fn segment(&self) -> PruneSegment { + PruneSegment::AccountHistory + } + + fn mode(&self) -> Option { + Some(self.mode) + } + + fn purpose(&self) -> PrunePurpose { + PrunePurpose::User + } + + #[instrument(target = "pruner", skip(self, provider), ret(level = "trace"))] + fn prune(&self, provider: &Provider, input: PruneInput) -> Result { + let range = match input.get_next_block_range() { + Some(range) => range, + None => { + trace!(target: "pruner", "No account history to prune"); + return Ok(SegmentOutput::done()) + } + }; + let range_end = *range.end(); + + // Check where account changesets are stored + if EitherWriter::account_changesets_destination(provider).is_static_file() { + self.prune_static_files(provider, input, range, range_end) + } else { + self.prune_database(provider, input, range, range_end) + } + } +} + +#[cfg(not(all(unix, feature = "rocksdb")))] impl Segment for AccountHistory where Provider: DBProvider @@ -78,6 +124,7 @@ where impl AccountHistory { /// Prunes account history when changesets are stored in static files. + #[cfg(all(unix, feature = "rocksdb"))] fn prune_static_files( &self, provider: &Provider, @@ -86,7 +133,11 @@ impl AccountHistory { range_end: BlockNumber, ) -> Result where - Provider: DBProvider + StaticFileProviderFactory + ChangeSetReader, + Provider: DBProvider + + StaticFileProviderFactory + + ChangeSetReader + + StorageSettingsCache + + RocksDBProviderFactory, { let mut limiter = if let Some(limit) = input.limiter.deleted_entries_limit() { input.limiter.set_deleted_entries_limit(limit / ACCOUNT_HISTORY_TABLES_TO_PRUNE) @@ -132,6 +183,81 @@ impl AccountHistory { } trace!(target: "pruner", pruned = %pruned_changesets, %done, "Pruned account history (changesets from static files)"); + let result = HistoryPruneResult { + highest_deleted: highest_deleted_accounts, + last_pruned_block: last_changeset_pruned_block, + pruned_count: pruned_changesets, + done, + }; + + // Check if account history is stored in RocksDB + if provider.cached_storage_settings().account_history_in_rocksdb { + finalize_account_history_prune_rocksdb(provider, result, range_end, &limiter) + .map_err(Into::into) + } else { + finalize_history_prune::<_, tables::AccountsHistory, _, _>( + provider, + result, + range_end, + &limiter, + ShardedKey::new, + |a, b| a.key == b.key, + ) + .map_err(Into::into) + } + } + + /// Prunes account history when changesets are stored in static files. + #[cfg(not(all(unix, feature = "rocksdb")))] + fn prune_static_files( + &self, + provider: &Provider, + input: PruneInput, + range: std::ops::RangeInclusive, + range_end: BlockNumber, + ) -> Result + where + Provider: DBProvider + StaticFileProviderFactory + ChangeSetReader, + { + let mut limiter = if let Some(limit) = input.limiter.deleted_entries_limit() { + input.limiter.set_deleted_entries_limit(limit / ACCOUNT_HISTORY_TABLES_TO_PRUNE) + } else { + input.limiter + }; + + if limiter.is_limit_reached() { + return Ok(SegmentOutput::not_done( + limiter.interrupt_reason(), + input.previous_checkpoint.map(SegmentOutputCheckpoint::from_prune_checkpoint), + )) + } + + let mut highest_deleted_accounts = FxHashMap::default(); + let mut last_changeset_pruned_block = None; + let mut pruned_changesets = 0; + let mut done = true; + + let walker = StaticFileAccountChangesetWalker::new(provider, range); + for result in walker { + if limiter.is_limit_reached() { + done = false; + break; + } + let (block_number, changeset) = result?; + highest_deleted_accounts.insert(changeset.address, block_number); + last_changeset_pruned_block = Some(block_number); + pruned_changesets += 1; + limiter.increment_deleted_entries_count(); + } + + // Delete static file jars below the pruned block + if let Some(last_block) = last_changeset_pruned_block { + provider + .static_file_provider() + .delete_segment_below_block(StaticFileSegment::AccountChangeSets, last_block + 1)?; + } + trace!(target: "pruner", pruned = %pruned_changesets, %done, "Pruned account history (changesets from static files)"); + let result = HistoryPruneResult { highest_deleted: highest_deleted_accounts, last_pruned_block: last_changeset_pruned_block, @@ -149,6 +275,7 @@ impl AccountHistory { .map_err(Into::into) } + #[cfg(all(unix, feature = "rocksdb"))] fn prune_database( &self, provider: &Provider, @@ -157,7 +284,7 @@ impl AccountHistory { range_end: BlockNumber, ) -> Result where - Provider: DBProvider, + Provider: DBProvider + StorageSettingsCache + RocksDBProviderFactory, { let mut limiter = if let Some(limit) = input.limiter.deleted_entries_limit() { input.limiter.set_deleted_entries_limit(limit / ACCOUNT_HISTORY_TABLES_TO_PRUNE) @@ -194,6 +321,68 @@ impl AccountHistory { )?; trace!(target: "pruner", pruned = %pruned_changesets, %done, "Pruned account history (changesets from database)"); + let result = HistoryPruneResult { + highest_deleted: highest_deleted_accounts, + last_pruned_block: last_changeset_pruned_block, + pruned_count: pruned_changesets, + done, + }; + + // Check if account history is stored in RocksDB + if provider.cached_storage_settings().account_history_in_rocksdb { + finalize_account_history_prune_rocksdb(provider, result, range_end, &limiter) + .map_err(Into::into) + } else { + finalize_history_prune::<_, tables::AccountsHistory, _, _>( + provider, + result, + range_end, + &limiter, + ShardedKey::new, + |a, b| a.key == b.key, + ) + .map_err(Into::into) + } + } + + #[cfg(not(all(unix, feature = "rocksdb")))] + fn prune_database( + &self, + provider: &Provider, + input: PruneInput, + range: std::ops::RangeInclusive, + range_end: BlockNumber, + ) -> Result + where + Provider: DBProvider, + { + let mut limiter = if let Some(limit) = input.limiter.deleted_entries_limit() { + input.limiter.set_deleted_entries_limit(limit / ACCOUNT_HISTORY_TABLES_TO_PRUNE) + } else { + input.limiter + }; + + if limiter.is_limit_reached() { + return Ok(SegmentOutput::not_done( + limiter.interrupt_reason(), + input.previous_checkpoint.map(SegmentOutputCheckpoint::from_prune_checkpoint), + )) + } + + let mut last_changeset_pruned_block = None; + let mut highest_deleted_accounts = FxHashMap::default(); + let (pruned_changesets, done) = + provider.tx_ref().prune_table_with_range::( + range, + &mut limiter, + |_| false, + |(block_number, account)| { + highest_deleted_accounts.insert(account.address, block_number); + last_changeset_pruned_block = Some(block_number); + }, + )?; + trace!(target: "pruner", pruned = %pruned_changesets, %done, "Pruned account history (changesets from database)"); + let result = HistoryPruneResult { highest_deleted: highest_deleted_accounts, last_pruned_block: last_changeset_pruned_block, diff --git a/crates/prune/prune/src/segments/user/history.rs b/crates/prune/prune/src/segments/user/history.rs index d4e6ddcf78d..73e85d7ef96 100644 --- a/crates/prune/prune/src/segments/user/history.rs +++ b/crates/prune/prune/src/segments/user/history.rs @@ -1,5 +1,7 @@ use crate::PruneLimiter; use alloy_primitives::BlockNumber; +#[cfg(all(unix, feature = "rocksdb"))] +use alloy_primitives::{Address, B256}; use itertools::Itertools; use reth_db_api::{ cursor::{DbCursorRO, DbCursorRW}, @@ -9,6 +11,8 @@ use reth_db_api::{ BlockNumberList, DatabaseError, RawKey, RawTable, RawValue, }; use reth_provider::DBProvider; +#[cfg(all(unix, feature = "rocksdb"))] +use reth_provider::RocksDBProviderFactory; use reth_prune_types::{SegmentOutput, SegmentOutputCheckpoint}; use rustc_hash::FxHashMap; @@ -219,3 +223,87 @@ where } } } + +/// Finalizes account history pruning using `RocksDB` backend. +#[cfg(all(unix, feature = "rocksdb"))] +pub(crate) fn finalize_account_history_prune_rocksdb( + provider: &Provider, + result: HistoryPruneResult
, + range_end: BlockNumber, + limiter: &PruneLimiter, +) -> Result +where + Provider: DBProvider + RocksDBProviderFactory, +{ + let HistoryPruneResult { highest_deleted, last_pruned_block, pruned_count, done } = result; + + let last_changeset_pruned_block = last_pruned_block + .map(|block_number| if done { block_number } else { block_number.saturating_sub(1) }) + .unwrap_or(range_end); + + let rocksdb = provider.rocksdb_provider(); + let mut batch = rocksdb.batch(); + let mut deleted = 0usize; + + for (address, block_number) in highest_deleted { + let to_block = block_number.min(last_changeset_pruned_block); + deleted += batch + .prune_account_history_up_to(address, to_block) + .map_err(|e| DatabaseError::Other(e.to_string()))?; + } + + batch.commit().map_err(|e| DatabaseError::Other(e.to_string()))?; + + let progress = limiter.progress(done); + + Ok(SegmentOutput { + progress, + pruned: pruned_count + deleted, + checkpoint: Some(SegmentOutputCheckpoint { + block_number: Some(last_changeset_pruned_block), + tx_number: None, + }), + }) +} + +/// Finalizes storage history pruning using `RocksDB` backend. +#[cfg(all(unix, feature = "rocksdb"))] +pub(crate) fn finalize_storage_history_prune_rocksdb( + provider: &Provider, + result: HistoryPruneResult<(Address, B256)>, + range_end: BlockNumber, + limiter: &PruneLimiter, +) -> Result +where + Provider: DBProvider + RocksDBProviderFactory, +{ + let HistoryPruneResult { highest_deleted, last_pruned_block, pruned_count, done } = result; + + let last_changeset_pruned_block = last_pruned_block + .map(|block_number| if done { block_number } else { block_number.saturating_sub(1) }) + .unwrap_or(range_end); + + let rocksdb = provider.rocksdb_provider(); + let mut batch = rocksdb.batch(); + let mut deleted = 0usize; + + for ((address, storage_key), block_number) in highest_deleted { + let to_block = block_number.min(last_changeset_pruned_block); + deleted += batch + .prune_storage_history_up_to(address, storage_key, to_block) + .map_err(|e| DatabaseError::Other(e.to_string()))?; + } + + batch.commit().map_err(|e| DatabaseError::Other(e.to_string()))?; + + let progress = limiter.progress(done); + + Ok(SegmentOutput { + progress, + pruned: pruned_count + deleted, + checkpoint: Some(SegmentOutputCheckpoint { + block_number: Some(last_changeset_pruned_block), + tx_number: None, + }), + }) +} diff --git a/crates/prune/prune/src/segments/user/storage_history.rs b/crates/prune/prune/src/segments/user/storage_history.rs index 7abe709e11e..b25b19d2fac 100644 --- a/crates/prune/prune/src/segments/user/storage_history.rs +++ b/crates/prune/prune/src/segments/user/storage_history.rs @@ -1,3 +1,5 @@ +#[cfg(all(unix, feature = "rocksdb"))] +use crate::segments::user::history::finalize_storage_history_prune_rocksdb; use crate::{ db_ext::DbTxPruneExt, segments::{ @@ -12,6 +14,8 @@ use reth_db_api::{ tables, transaction::DbTxMut, }; +#[cfg(all(unix, feature = "rocksdb"))] +use reth_provider::RocksDBProviderFactory; use reth_provider::{DBProvider, EitherWriter, StaticFileProviderFactory}; use reth_prune_types::{ PruneMode, PrunePurpose, PruneSegment, SegmentOutput, SegmentOutputCheckpoint, @@ -38,6 +42,47 @@ impl StorageHistory { } } +#[cfg(all(unix, feature = "rocksdb"))] +impl Segment for StorageHistory +where + Provider: DBProvider + + StaticFileProviderFactory + + StorageChangeSetReader + + StorageSettingsCache + + RocksDBProviderFactory, +{ + fn segment(&self) -> PruneSegment { + PruneSegment::StorageHistory + } + + fn mode(&self) -> Option { + Some(self.mode) + } + + fn purpose(&self) -> PrunePurpose { + PrunePurpose::User + } + + #[instrument(target = "pruner", skip(self, provider), ret(level = "trace"))] + fn prune(&self, provider: &Provider, input: PruneInput) -> Result { + let range = match input.get_next_block_range() { + Some(range) => range, + None => { + trace!(target: "pruner", "No storage history to prune"); + return Ok(SegmentOutput::done()) + } + }; + let range_end = *range.end(); + + if EitherWriter::storage_changesets_destination(provider).is_static_file() { + self.prune_static_files(provider, input, range, range_end) + } else { + self.prune_database(provider, input, range, range_end) + } + } +} + +#[cfg(not(all(unix, feature = "rocksdb")))] impl Segment for StorageHistory where Provider: DBProvider @@ -78,6 +123,7 @@ where impl StorageHistory { /// Prunes storage history when changesets are stored in static files. + #[cfg(all(unix, feature = "rocksdb"))] fn prune_static_files( &self, provider: &Provider, @@ -86,7 +132,10 @@ impl StorageHistory { range_end: BlockNumber, ) -> Result where - Provider: DBProvider + StaticFileProviderFactory, + Provider: DBProvider + + StaticFileProviderFactory + + StorageSettingsCache + + RocksDBProviderFactory, { let mut limiter = if let Some(limit) = input.limiter.deleted_entries_limit() { input.limiter.set_deleted_entries_limit(limit / STORAGE_HISTORY_TABLES_TO_PRUNE) @@ -134,6 +183,85 @@ impl StorageHistory { } trace!(target: "pruner", pruned = %pruned_changesets, %done, "Pruned storage history (changesets from static files)"); + let result = HistoryPruneResult { + highest_deleted: highest_deleted_storages, + last_pruned_block: last_changeset_pruned_block, + pruned_count: pruned_changesets, + done, + }; + + // Check if storage history is stored in RocksDB + if provider.cached_storage_settings().storages_history_in_rocksdb { + finalize_storage_history_prune_rocksdb(provider, result, range_end, &limiter) + .map_err(Into::into) + } else { + finalize_history_prune::<_, tables::StoragesHistory, (Address, B256), _>( + provider, + result, + range_end, + &limiter, + |(address, storage_key), block_number| { + StorageShardedKey::new(address, storage_key, block_number) + }, + |a, b| a.address == b.address && a.sharded_key.key == b.sharded_key.key, + ) + .map_err(Into::into) + } + } + + /// Prunes storage history when changesets are stored in static files. + #[cfg(not(all(unix, feature = "rocksdb")))] + fn prune_static_files( + &self, + provider: &Provider, + input: PruneInput, + range: std::ops::RangeInclusive, + range_end: BlockNumber, + ) -> Result + where + Provider: DBProvider + StaticFileProviderFactory, + { + let mut limiter = if let Some(limit) = input.limiter.deleted_entries_limit() { + input.limiter.set_deleted_entries_limit(limit / STORAGE_HISTORY_TABLES_TO_PRUNE) + } else { + input.limiter + }; + + if limiter.is_limit_reached() { + return Ok(SegmentOutput::not_done( + limiter.interrupt_reason(), + input.previous_checkpoint.map(SegmentOutputCheckpoint::from_prune_checkpoint), + )) + } + + let mut highest_deleted_storages = FxHashMap::default(); + let mut last_changeset_pruned_block = None; + let mut pruned_changesets = 0; + let mut done = true; + + let walker = provider.static_file_provider().walk_storage_changeset_range(range); + for result in walker { + if limiter.is_limit_reached() { + done = false; + break; + } + let (block_address, entry) = result?; + let block_number = block_address.block_number(); + let address = block_address.address(); + highest_deleted_storages.insert((address, entry.key), block_number); + last_changeset_pruned_block = Some(block_number); + pruned_changesets += 1; + limiter.increment_deleted_entries_count(); + } + + // Delete static file jars below the pruned block + if let Some(last_block) = last_changeset_pruned_block { + provider + .static_file_provider() + .delete_segment_below_block(StaticFileSegment::StorageChangeSets, last_block + 1)?; + } + trace!(target: "pruner", pruned = %pruned_changesets, %done, "Pruned storage history (changesets from static files)"); + let result = HistoryPruneResult { highest_deleted: highest_deleted_storages, last_pruned_block: last_changeset_pruned_block, @@ -153,6 +281,7 @@ impl StorageHistory { .map_err(Into::into) } + #[cfg(all(unix, feature = "rocksdb"))] fn prune_database( &self, provider: &Provider, @@ -161,7 +290,7 @@ impl StorageHistory { range_end: BlockNumber, ) -> Result where - Provider: DBProvider, + Provider: DBProvider + StorageSettingsCache + RocksDBProviderFactory, { let mut limiter = if let Some(limit) = input.limiter.deleted_entries_limit() { input.limiter.set_deleted_entries_limit(limit / STORAGE_HISTORY_TABLES_TO_PRUNE) @@ -198,6 +327,70 @@ impl StorageHistory { )?; trace!(target: "pruner", deleted = %pruned_changesets, %done, "Pruned storage history (changesets)"); + let result = HistoryPruneResult { + highest_deleted: highest_deleted_storages, + last_pruned_block: last_changeset_pruned_block, + pruned_count: pruned_changesets, + done, + }; + + // Check if storage history is stored in RocksDB + if provider.cached_storage_settings().storages_history_in_rocksdb { + finalize_storage_history_prune_rocksdb(provider, result, range_end, &limiter) + .map_err(Into::into) + } else { + finalize_history_prune::<_, tables::StoragesHistory, (Address, B256), _>( + provider, + result, + range_end, + &limiter, + |(address, storage_key), block_number| { + StorageShardedKey::new(address, storage_key, block_number) + }, + |a, b| a.address == b.address && a.sharded_key.key == b.sharded_key.key, + ) + .map_err(Into::into) + } + } + + #[cfg(not(all(unix, feature = "rocksdb")))] + fn prune_database( + &self, + provider: &Provider, + input: PruneInput, + range: std::ops::RangeInclusive, + range_end: BlockNumber, + ) -> Result + where + Provider: DBProvider, + { + let mut limiter = if let Some(limit) = input.limiter.deleted_entries_limit() { + input.limiter.set_deleted_entries_limit(limit / STORAGE_HISTORY_TABLES_TO_PRUNE) + } else { + input.limiter + }; + + if limiter.is_limit_reached() { + return Ok(SegmentOutput::not_done( + limiter.interrupt_reason(), + input.previous_checkpoint.map(SegmentOutputCheckpoint::from_prune_checkpoint), + )) + } + + let mut last_changeset_pruned_block = None; + let mut highest_deleted_storages = FxHashMap::default(); + let (pruned_changesets, done) = + provider.tx_ref().prune_table_with_range::( + BlockNumberAddress::range(range), + &mut limiter, + |_| false, + |(BlockNumberAddress((block_number, address)), entry)| { + highest_deleted_storages.insert((address, entry.key), block_number); + last_changeset_pruned_block = Some(block_number); + }, + )?; + trace!(target: "pruner", deleted = %pruned_changesets, %done, "Pruned storage history (changesets)"); + let result = HistoryPruneResult { highest_deleted: highest_deleted_storages, last_pruned_block: last_changeset_pruned_block, diff --git a/crates/stages/stages/src/stages/prune.rs b/crates/stages/stages/src/stages/prune.rs index 98c9a578c64..769705733f8 100644 --- a/crates/stages/stages/src/stages/prune.rs +++ b/crates/stages/stages/src/stages/prune.rs @@ -1,5 +1,7 @@ use reth_db_api::{table::Value, transaction::DbTxMut}; use reth_primitives_traits::NodePrimitives; +#[cfg(all(unix, feature = "rocksdb"))] +use reth_provider::RocksDBProviderFactory; use reth_provider::{ BlockReader, ChainStateBlockReader, DBProvider, PruneCheckpointReader, PruneCheckpointWriter, StageCheckpointReader, StaticFileProviderFactory, StorageSettingsCache, @@ -37,6 +39,7 @@ impl PruneStage { } } +#[cfg(all(unix, feature = "rocksdb"))] impl Stage for PruneStage where Provider: DBProvider @@ -49,7 +52,8 @@ where Primitives: NodePrimitives, > + StorageSettingsCache + ChangeSetReader - + StorageChangeSetReader, + + StorageChangeSetReader + + RocksDBProviderFactory, { fn id(&self) -> StageId { StageId::Prune @@ -124,6 +128,89 @@ where } } +#[cfg(not(all(unix, feature = "rocksdb")))] +impl Stage for PruneStage +where + Provider: DBProvider + + PruneCheckpointReader + + PruneCheckpointWriter + + BlockReader + + ChainStateBlockReader + + StageCheckpointReader + + StaticFileProviderFactory< + Primitives: NodePrimitives, + > + StorageSettingsCache + + ChangeSetReader + + StorageChangeSetReader, +{ + fn id(&self) -> StageId { + StageId::Prune + } + + fn execute(&mut self, provider: &Provider, input: ExecInput) -> Result { + let mut pruner = PrunerBuilder::default() + .segments(self.prune_modes.clone()) + .delete_limit(self.commit_threshold) + .build::(provider.static_file_provider()); + + let result = pruner.run_with_provider(provider, input.target())?; + if result.progress.is_finished() { + Ok(ExecOutput { checkpoint: StageCheckpoint::new(input.target()), done: true }) + } else { + if let Some((last_segment, last_segment_output)) = result.segments.last() { + match last_segment_output { + SegmentOutput { + progress, + pruned, + checkpoint: + checkpoint @ Some(SegmentOutputCheckpoint { block_number: Some(_), .. }), + } => { + info!( + target: "sync::stages::prune::exec", + ?last_segment, + ?progress, + ?pruned, + ?checkpoint, + "Last segment has more data to prune" + ) + } + SegmentOutput { progress, pruned, checkpoint: _ } => { + info!( + target: "sync::stages::prune::exec", + ?last_segment, + ?progress, + ?pruned, + "Last segment has more data to prune" + ) + } + } + } + Ok(ExecOutput { checkpoint: input.checkpoint(), done: false }) + } + } + + fn unwind( + &mut self, + provider: &Provider, + input: UnwindInput, + ) -> Result { + let prune_checkpoints = provider.get_prune_checkpoints()?; + let unwind_to_last_tx = + provider.block_body_indices(input.unwind_to)?.map(|i| i.last_tx_num()); + + for (segment, mut checkpoint) in prune_checkpoints { + if let Some(block) = checkpoint.block_number && + input.unwind_to < block + { + checkpoint.block_number = Some(input.unwind_to); + checkpoint.tx_number = unwind_to_last_tx; + provider.save_prune_checkpoint(segment, checkpoint)?; + } + } + Ok(UnwindOutput { checkpoint: StageCheckpoint::new(input.unwind_to) }) + } +} + /// The prune sender recovery stage that runs the pruner with the provided `PruneMode` for the /// `SenderRecovery` segment. /// @@ -144,6 +231,50 @@ impl PruneSenderRecoveryStage { } } +#[cfg(all(unix, feature = "rocksdb"))] +impl Stage for PruneSenderRecoveryStage +where + Provider: DBProvider + + PruneCheckpointReader + + PruneCheckpointWriter + + BlockReader + + ChainStateBlockReader + + StageCheckpointReader + + StaticFileProviderFactory< + Primitives: NodePrimitives, + > + StorageSettingsCache + + ChangeSetReader + + StorageChangeSetReader + + RocksDBProviderFactory, +{ + fn id(&self) -> StageId { + StageId::PruneSenderRecovery + } + + fn execute(&mut self, provider: &Provider, input: ExecInput) -> Result { + let mut result = self.0.execute(provider, input)?; + + if !result.done { + let checkpoint = provider + .get_prune_checkpoint(PruneSegment::SenderRecovery)? + .ok_or(StageError::MissingPruneCheckpoint(PruneSegment::SenderRecovery))?; + + result.checkpoint = StageCheckpoint::new(checkpoint.block_number.unwrap_or_default()); + } + + Ok(result) + } + + fn unwind( + &mut self, + provider: &Provider, + input: UnwindInput, + ) -> Result { + self.0.unwind(provider, input) + } +} + +#[cfg(not(all(unix, feature = "rocksdb")))] impl Stage for PruneSenderRecoveryStage where Provider: DBProvider @@ -165,14 +296,11 @@ where fn execute(&mut self, provider: &Provider, input: ExecInput) -> Result { let mut result = self.0.execute(provider, input)?; - // Adjust the checkpoint to the highest pruned block number of the Sender Recovery segment if !result.done { let checkpoint = provider .get_prune_checkpoint(PruneSegment::SenderRecovery)? .ok_or(StageError::MissingPruneCheckpoint(PruneSegment::SenderRecovery))?; - // `unwrap_or_default` is safe because we know that genesis block doesn't have any - // transactions and senders result.checkpoint = StageCheckpoint::new(checkpoint.block_number.unwrap_or_default()); } diff --git a/crates/storage/provider/src/providers/rocksdb/provider.rs b/crates/storage/provider/src/providers/rocksdb/provider.rs index a0e10cf47f4..a4577cf69be 100644 --- a/crates/storage/provider/src/providers/rocksdb/provider.rs +++ b/crates/storage/provider/src/providers/rocksdb/provider.rs @@ -1723,6 +1723,114 @@ impl<'a> RocksDBBatch<'a> { } Ok(()) } + + /// Prunes account history for the given address, removing all blocks `<= to_block`. + /// + /// Returns the number of shards deleted. + pub fn prune_account_history_up_to( + &mut self, + address: Address, + to_block: BlockNumber, + ) -> ProviderResult { + let shards = self.provider.account_history_shards(address)?; + if shards.is_empty() { + return Ok(0); + } + + let mut deleted = 0; + + // Find the first shard that might contain blocks > to_block + let boundary_idx = shards.iter().position(|(key, list)| { + key.highest_block_number > to_block || + key.highest_block_number == u64::MAX || + list.iter().any(|b| b > to_block) + }); + + // No boundary found means all shards are entirely <= to_block + let Some(boundary_idx) = boundary_idx else { + for (key, _) in &shards { + self.delete::(key.clone())?; + deleted += 1; + } + return Ok(deleted); + }; + + // Delete all shards before the boundary + for (key, _) in shards.iter().take(boundary_idx) { + self.delete::(key.clone())?; + deleted += 1; + } + + // Process the boundary shard + let (boundary_key, boundary_list) = &shards[boundary_idx]; + let remaining_blocks: Vec<_> = boundary_list.iter().filter(|&b| b > to_block).collect(); + + if remaining_blocks.is_empty() { + self.delete::(boundary_key.clone())?; + deleted += 1; + } else if remaining_blocks.len() != boundary_list.len() as usize { + self.delete::(boundary_key.clone())?; + let new_list = BlockNumberList::new_pre_sorted(remaining_blocks); + self.put::(boundary_key.clone(), &new_list)?; + } + + Ok(deleted) + } + + /// Prunes storage history for the given `(address, storage_key)` pair, + /// removing all blocks `<= to_block`. + /// + /// Returns the number of shards deleted. + pub fn prune_storage_history_up_to( + &mut self, + address: Address, + storage_key: B256, + to_block: BlockNumber, + ) -> ProviderResult { + let shards = self.provider.storage_history_shards(address, storage_key)?; + if shards.is_empty() { + return Ok(0); + } + + let mut deleted = 0; + + // Find the first shard that might contain blocks > to_block + let boundary_idx = shards.iter().position(|(key, list)| { + key.sharded_key.highest_block_number > to_block || + key.sharded_key.highest_block_number == u64::MAX || + list.iter().any(|b| b > to_block) + }); + + // No boundary found means all shards are entirely <= to_block + let Some(boundary_idx) = boundary_idx else { + for (key, _) in &shards { + self.delete::(key.clone())?; + deleted += 1; + } + return Ok(deleted); + }; + + // Delete all shards before the boundary + for (key, _) in shards.iter().take(boundary_idx) { + self.delete::(key.clone())?; + deleted += 1; + } + + // Process the boundary shard + let (boundary_key, boundary_list) = &shards[boundary_idx]; + let remaining_blocks: Vec<_> = boundary_list.iter().filter(|&b| b > to_block).collect(); + + if remaining_blocks.is_empty() { + self.delete::(boundary_key.clone())?; + deleted += 1; + } else if remaining_blocks.len() != boundary_list.len() as usize { + self.delete::(boundary_key.clone())?; + let new_list = BlockNumberList::new_pre_sorted(remaining_blocks); + self.put::(boundary_key.clone(), &new_list)?; + } + + Ok(deleted) + } } /// `RocksDB` transaction wrapper providing MDBX-like semantics.