From c6a5cab6fb7139be3fffe227e5e2891e3f8cf661 Mon Sep 17 00:00:00 2001 From: joshieDo <93316087+joshieDo@users.noreply.github.com> Date: Tue, 31 Mar 2026 13:08:28 +0100 Subject: [PATCH 1/9] fix: pin RocksDB snapshot at provider creation for cross-store consistency MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit HistoricalStateProvider takes its MDBX read tx at creation time (MVCC snapshot) but previously took a fresh RocksDB snapshot on every account_history_lookup/storage_history_lookup call. During aggressive reorgs, RemoveBlocksAbove commits MDBX then RocksDB sequentially — a reader with an older MDBX tx could see unwound RocksDB indices, causing InPlainState fallback to return state at the wrong tip. Fix: capture both MDBX read tx and RocksDB snapshot together in ProviderFactory::provider() with a retry loop that verifies no write commit landed between the two (via last_txnid check). The pinned snapshot is threaded through DatabaseProvider → HistoricalStateProvider and used for all history lookups instead of per-query snapshots. This only affects storage_v2 where history indices live in RocksDB. With storage_v1, history indices are in MDBX and share the read tx's MVCC snapshot automatically. Co-authored-by: Amp Amp-Thread-ID: https://ampcode.com/threads/T-019d4393-7938-71ab-a3fd-0132444583bd --- .../provider/src/providers/database/mod.rs | 36 +++- .../src/providers/database/provider.rs | 31 +++- .../provider/src/providers/rocksdb/mod.rs | 7 +- .../src/providers/rocksdb/provider.rs | 172 ++++++++++++++++++ .../src/providers/state/historical.rs | 59 +++++- 5 files changed, 288 insertions(+), 17 deletions(-) diff --git a/crates/storage/provider/src/providers/database/mod.rs b/crates/storage/provider/src/providers/database/mod.rs index c2ca29c36ba..ac096e7499d 100644 --- a/crates/storage/provider/src/providers/database/mod.rs +++ b/crates/storage/provider/src/providers/database/mod.rs @@ -247,8 +247,32 @@ impl ProviderFactory { /// data. #[track_caller] pub fn provider(&self) -> ProviderResult> { - Ok(DatabaseProvider::new( - self.db.tx()?, + let storage_v2 = self.storage_settings.read().storage_v2; + + // For storage_v2, we need consistent MDBX + RocksDB snapshots. + // Take both and verify no write commit landed between them. + let (tx, pinned_snapshot) = if storage_v2 { + loop { + let txnid_before = self.db.last_txnid(); + let tx = self.db.tx()?; + let snapshot = Arc::new(self.rocksdb_provider.owned_snapshot()); + let txnid_after = self.db.last_txnid(); + if txnid_before == txnid_after { + break (tx, Some(snapshot)); + } + // A write commit landed between the two snapshots — retry. + // This is extremely rare (nanosecond window). + tracing::debug!( + target: "providers::db", + "MDBX write commit detected between read tx and RocksDB snapshot, retrying" + ); + } + } else { + (self.db.tx()?, None) + }; + + let mut provider = DatabaseProvider::new( + tx, self.chain_spec.clone(), self.static_file_provider.clone(), self.prune_modes.clone(), @@ -259,7 +283,13 @@ impl ProviderFactory { self.runtime.clone(), self.db.path(), ) - .with_minimum_pruning_distance(self.minimum_pruning_distance)) + .with_minimum_pruning_distance(self.minimum_pruning_distance); + + if let Some(snapshot) = pinned_snapshot { + provider = provider.with_pinned_rocksdb_snapshot(snapshot); + } + + Ok(provider) } /// Returns a provider with a created `DbTxMut` inside, which allows fetching and updating diff --git a/crates/storage/provider/src/providers/database/provider.rs b/crates/storage/provider/src/providers/database/provider.rs index 1ed65f86432..97e1cf8467d 100644 --- a/crates/storage/provider/src/providers/database/provider.rs +++ b/crates/storage/provider/src/providers/database/provider.rs @@ -2,7 +2,9 @@ use crate::{ changesets_utils::StorageRevertsIter, providers::{ database::{chain::ChainStorage, metrics}, - rocksdb::{PendingRocksDBBatches, RocksDBProvider, RocksDBWriteCtx}, + rocksdb::{ + OwnedRocksReadSnapshot, PendingRocksDBBatches, RocksDBProvider, RocksDBWriteCtx, + }, static_file::{StaticFileWriteCtx, StaticFileWriter}, NodeTypesForProvider, StaticFileProvider, }, @@ -212,6 +214,10 @@ pub struct DatabaseProvider { minimum_pruning_distance: u64, /// Database provider metrics metrics: metrics::DatabaseProviderMetrics, + /// Pinned RocksDB snapshot for cross-store read consistency. + /// When set, `HistoricalStateProvider` uses this instead of creating per-query snapshots. + /// This ensures the RocksDB view is consistent with the MDBX read transaction. + pinned_rocksdb_snapshot: Option>, } impl Debug for DatabaseProvider { @@ -229,6 +235,7 @@ impl Debug for DatabaseProvider { .field("pending_rocksdb_batches", &"") .field("commit_order", &self.commit_order) .field("minimum_pruning_distance", &self.minimum_pruning_distance) + .field("pinned_rocksdb_snapshot", &self.pinned_rocksdb_snapshot) .finish() } } @@ -244,6 +251,17 @@ impl DatabaseProvider { self.minimum_pruning_distance = distance; self } + + /// Sets the pinned RocksDB snapshot for cross-store read consistency. + pub fn with_pinned_rocksdb_snapshot(mut self, snapshot: Arc) -> Self { + self.pinned_rocksdb_snapshot = Some(snapshot); + self + } + + /// Returns the pinned RocksDB snapshot, if set. + pub fn pinned_rocksdb_snapshot(&self) -> Option<&Arc> { + self.pinned_rocksdb_snapshot.as_ref() + } } impl DatabaseProvider { @@ -276,6 +294,10 @@ impl DatabaseProvider { let mut state_provider = HistoricalStateProviderRef::new(self, block_number); + if let Some(snapshot) = &self.pinned_rocksdb_snapshot { + state_provider.pinned_rocksdb_snapshot = Some(snapshot); + } + // If we pruned account or storage history, we can't return state on every historical block. // Instead, we should cap it at the latest prune checkpoint for corresponding prune segment. if let Some(prune_checkpoint_block_number) = @@ -382,6 +404,7 @@ impl DatabaseProvider { commit_order, minimum_pruning_distance: MINIMUM_UNWIND_SAFE_DISTANCE, metrics: metrics::DatabaseProviderMetrics::default(), + pinned_rocksdb_snapshot: None, } } @@ -890,9 +913,14 @@ impl TryIntoHistoricalStateProvider for Databa self.get_prune_checkpoint(PruneSegment::AccountHistory)?; let storage_history_prune_checkpoint = self.get_prune_checkpoint(PruneSegment::StorageHistory)?; + let pinned_snapshot = self.pinned_rocksdb_snapshot.clone(); let mut state_provider = HistoricalStateProvider::new(self, block_number); + if let Some(snapshot) = pinned_snapshot { + state_provider = state_provider.with_pinned_rocksdb_snapshot(snapshot); + } + // If we pruned account or storage history, we can't return state on every historical block. // Instead, we should cap it at the latest prune checkpoint for corresponding prune segment. if let Some(prune_checkpoint_block_number) = @@ -1007,6 +1035,7 @@ impl DatabaseProvider { commit_order: CommitOrder::Normal, minimum_pruning_distance: MINIMUM_UNWIND_SAFE_DISTANCE, metrics: metrics::DatabaseProviderMetrics::default(), + pinned_rocksdb_snapshot: None, } } diff --git a/crates/storage/provider/src/providers/rocksdb/mod.rs b/crates/storage/provider/src/providers/rocksdb/mod.rs index 46bb9724a5c..7a5c4f93ea8 100644 --- a/crates/storage/provider/src/providers/rocksdb/mod.rs +++ b/crates/storage/provider/src/providers/rocksdb/mod.rs @@ -4,8 +4,9 @@ mod invariants; mod metrics; mod provider; -pub(crate) use provider::{PendingRocksDBBatches, RocksDBWriteCtx}; pub use provider::{ - PruneShardOutcome, PrunedIndices, RocksDBBatch, RocksDBBuilder, RocksDBIter, RocksDBProvider, - RocksDBRawIter, RocksDBStats, RocksDBTableStats, RocksReadSnapshot, RocksTx, + OwnedRocksReadSnapshot, PruneShardOutcome, PrunedIndices, RocksDBBatch, RocksDBBuilder, + RocksDBIter, RocksDBProvider, RocksDBRawIter, RocksDBStats, RocksDBTableStats, + RocksReadSnapshot, RocksTx, }; +pub(crate) use provider::{PendingRocksDBBatches, RocksDBWriteCtx}; diff --git a/crates/storage/provider/src/providers/rocksdb/provider.rs b/crates/storage/provider/src/providers/rocksdb/provider.rs index a938a0c4424..63838d1f840 100644 --- a/crates/storage/provider/src/providers/rocksdb/provider.rs +++ b/crates/storage/provider/src/providers/rocksdb/provider.rs @@ -1550,6 +1550,178 @@ impl<'db> RocksReadSnapshot<'db> { } } +/// An owned RocksDB read snapshot that keeps the provider alive. +/// +/// This allows pinning a RocksDB snapshot at creation time and reusing it +/// for all subsequent reads, ensuring consistency with an MDBX read transaction +/// opened at the same point in time. +/// +/// # Safety +/// +/// `RocksDBProvider` wraps `Arc`, so the inner DB is +/// heap-allocated and stable. The snapshot borrows from the DB, which won't +/// move or be dropped while the Arc is held. Rust drops fields in declaration +/// order, so `inner` (the snapshot) is dropped before `_provider` (the Arc). +pub struct OwnedRocksReadSnapshot { + // IMPORTANT: `inner` must be declared before `_provider` so it drops first. + inner: RocksReadSnapshotInner<'static>, + _provider: RocksDBProvider, +} + +impl fmt::Debug for OwnedRocksReadSnapshot { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("OwnedRocksReadSnapshot").finish_non_exhaustive() + } +} + +// SAFETY: RocksDB snapshots are thread-safe — the underlying C++ SnapshotImpl is +// reference-counted and immutable. RocksDBProvider is Arc-based and already Send+Sync. +unsafe impl Send for OwnedRocksReadSnapshot {} +unsafe impl Sync for OwnedRocksReadSnapshot {} + +impl RocksDBProvider { + /// Creates an owned, pinned read snapshot. + /// + /// The returned snapshot keeps the provider alive and can be stored alongside + /// an MDBX read transaction to ensure cross-store consistency. + pub fn owned_snapshot(&self) -> OwnedRocksReadSnapshot { + // SAFETY: The snapshot borrows from the DB inside Arc. + // We clone the Arc (cheap) to keep it alive. The snapshot's actual lifetime + // is tied to the Arc, which we guarantee by storing both in the struct + // (with snapshot dropped first due to field order). + let snapshot = self.0.snapshot(); + let inner = unsafe { + std::mem::transmute::, RocksReadSnapshotInner<'static>>( + snapshot, + ) + }; + OwnedRocksReadSnapshot { inner, _provider: self.clone() } + } +} + +impl OwnedRocksReadSnapshot { + /// Gets the column family handle for a table. + fn cf_handle(&self) -> Result<&rocksdb::ColumnFamily, DatabaseError> { + self._provider.get_cf_handle::() + } + + /// Lookup account history and return [`HistoryInfo`] directly. + pub fn account_history_info( + &self, + address: Address, + block_number: BlockNumber, + lowest_available_block_number: Option, + ) -> ProviderResult { + let key = ShardedKey::new(address, block_number); + self.history_info::( + key.encode().as_ref(), + block_number, + lowest_available_block_number, + |key_bytes| Ok( as Decode>::decode(key_bytes)?.key == address), + |prev_bytes| { + as Decode>::decode(prev_bytes) + .map(|k| k.key == address) + .unwrap_or(false) + }, + ) + } + + /// Lookup storage history and return [`HistoryInfo`] directly. + pub fn storage_history_info( + &self, + address: Address, + storage_key: B256, + block_number: BlockNumber, + lowest_available_block_number: Option, + ) -> ProviderResult { + let key = StorageShardedKey::new(address, storage_key, block_number); + self.history_info::( + key.encode().as_ref(), + block_number, + lowest_available_block_number, + |key_bytes| { + let k = ::decode(key_bytes)?; + Ok(k.address == address && k.sharded_key.key == storage_key) + }, + |prev_bytes| { + ::decode(prev_bytes) + .map(|k| k.address == address && k.sharded_key.key == storage_key) + .unwrap_or(false) + }, + ) + } + + /// Generic history lookup using the snapshot's raw iterator. + fn history_info( + &self, + encoded_key: &[u8], + block_number: BlockNumber, + lowest_available_block_number: Option, + key_matches: impl FnOnce(&[u8]) -> Result, + prev_key_matches: impl Fn(&[u8]) -> bool, + ) -> ProviderResult + where + T: Table, + { + let is_maybe_pruned = lowest_available_block_number.is_some(); + let fallback = || { + Ok(if is_maybe_pruned { + HistoryInfo::MaybeInPlainState + } else { + HistoryInfo::NotYetWritten + }) + }; + + let cf = self.cf_handle::()?; + let mut iter = self.inner.raw_iterator_cf(cf); + + iter.seek(encoded_key); + iter.status().map_err(|e| { + ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo { + message: e.to_string().into(), + code: -1, + })) + })?; + + if !iter.valid() { + return fallback(); + } + + let Some(key_bytes) = iter.key() else { + return fallback(); + }; + if !key_matches(key_bytes)? { + return fallback(); + } + + let Some(value_bytes) = iter.value() else { + return fallback(); + }; + let chunk = BlockNumberList::decompress(value_bytes)?; + let (rank, found_block) = compute_history_rank(&chunk, block_number); + + let is_before_first_write = if needs_prev_shard_check(rank, found_block, block_number) { + iter.prev(); + iter.status().map_err(|e| { + ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo { + message: e.to_string().into(), + code: -1, + })) + })?; + let has_prev = iter.valid() && iter.key().is_some_and(&prev_key_matches); + !has_prev + } else { + false + }; + + Ok(HistoryInfo::from_lookup( + found_block, + is_before_first_write, + lowest_available_block_number, + )) + } +} + /// Outcome of pruning a history shard in `RocksDB`. #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum PruneShardOutcome { diff --git a/crates/storage/provider/src/providers/state/historical.rs b/crates/storage/provider/src/providers/state/historical.rs index ee2a85fae60..30e53614d53 100644 --- a/crates/storage/provider/src/providers/state/historical.rs +++ b/crates/storage/provider/src/providers/state/historical.rs @@ -32,7 +32,8 @@ use reth_trie_db::{ DatabaseStorageProof, DatabaseStorageRoot, }; -use std::fmt::Debug; +use crate::providers::rocksdb::OwnedRocksReadSnapshot; +use std::{fmt::Debug, sync::Arc}; type DbStateRoot<'a, TX, A> = StateRoot< reth_trie_db::DatabaseTrieCursorFactory<&'a TX, A>, @@ -127,6 +128,8 @@ pub struct HistoricalStateProviderRef<'b, Provider> { block_number: BlockNumber, /// Lowest blocks at which different parts of the state are available. lowest_available_blocks: LowestAvailableBlocks, + /// Pinned RocksDB snapshot for cross-store read consistency. + pub(crate) pinned_rocksdb_snapshot: Option<&'b Arc>, } impl<'b, Provider: DBProvider + ChangeSetReader + StorageChangeSetReader + BlockNumReader> @@ -134,7 +137,12 @@ impl<'b, Provider: DBProvider + ChangeSetReader + StorageChangeSetReader + Block { /// Create new `StateProvider` for historical block number pub fn new(provider: &'b Provider, block_number: BlockNumber) -> Self { - Self { provider, block_number, lowest_available_blocks: Default::default() } + Self { + provider, + block_number, + lowest_available_blocks: Default::default(), + pinned_rocksdb_snapshot: None, + } } /// Create new `StateProvider` for historical block number and lowest block numbers at which @@ -144,7 +152,7 @@ impl<'b, Provider: DBProvider + ChangeSetReader + StorageChangeSetReader + Block block_number: BlockNumber, lowest_available_blocks: LowestAvailableBlocks, ) -> Self { - Self { provider, block_number, lowest_available_blocks } + Self { provider, block_number, lowest_available_blocks, pinned_rocksdb_snapshot: None } } /// Lookup an account in the `AccountsHistory` table using `EitherReader`. @@ -156,6 +164,14 @@ impl<'b, Provider: DBProvider + ChangeSetReader + StorageChangeSetReader + Block return Err(ProviderError::StateAtBlockPruned(self.block_number)) } + if let Some(snapshot) = &self.pinned_rocksdb_snapshot { + return snapshot.account_history_info( + address, + self.block_number, + self.lowest_available_blocks.account_history_block_number, + ); + } + self.provider.with_rocksdb_snapshot(|rocksdb_ref| { let mut reader = EitherReader::new_accounts_history(self.provider, rocksdb_ref)?; reader.account_history_info( @@ -181,6 +197,15 @@ impl<'b, Provider: DBProvider + ChangeSetReader + StorageChangeSetReader + Block return Err(ProviderError::StateAtBlockPruned(self.block_number)) } + if let Some(snapshot) = &self.pinned_rocksdb_snapshot { + return snapshot.storage_history_info( + address, + lookup_key, + self.block_number, + self.lowest_available_blocks.storage_history_block_number, + ); + } + self.provider.with_rocksdb_snapshot(|rocksdb_ref| { let mut reader = EitherReader::new_storages_history(self.provider, rocksdb_ref)?; reader.storage_history_info( @@ -589,6 +614,8 @@ pub struct HistoricalStateProvider { block_number: BlockNumber, /// Lowest blocks at which different parts of the state are available. lowest_available_blocks: LowestAvailableBlocks, + /// Pinned RocksDB snapshot for cross-store read consistency. + pinned_rocksdb_snapshot: Option>, } impl @@ -596,7 +623,18 @@ impl Self { - Self { provider, block_number, lowest_available_blocks: Default::default() } + Self { + provider, + block_number, + lowest_available_blocks: Default::default(), + pinned_rocksdb_snapshot: None, + } + } + + /// Sets the pinned RocksDB snapshot for cross-store read consistency. + pub fn with_pinned_rocksdb_snapshot(mut self, snapshot: Arc) -> Self { + self.pinned_rocksdb_snapshot = Some(snapshot); + self } /// Set the lowest block number at which the account history is available. @@ -619,12 +657,13 @@ impl HistoricalStateProviderRef<'_, Provider> { - HistoricalStateProviderRef::new_with_lowest_available_blocks( - &self.provider, - self.block_number, - self.lowest_available_blocks, - ) + fn as_ref(&self) -> HistoricalStateProviderRef<'_, Provider> { + HistoricalStateProviderRef { + provider: &self.provider, + block_number: self.block_number, + lowest_available_blocks: self.lowest_available_blocks, + pinned_rocksdb_snapshot: self.pinned_rocksdb_snapshot.as_ref(), + } } } From 9777d8c88dd1fcade3bf24a430bdd07e087625bf Mon Sep 17 00:00:00 2001 From: joshieDo <93316087+joshieDo@users.noreply.github.com> Date: Tue, 31 Mar 2026 15:22:23 +0100 Subject: [PATCH 2/9] refactor(storage): reuse pinned RocksDB snapshot path Amp-Thread-ID: https://ampcode.com/threads/T-019d4435-3bfe-7739-b47f-fbcf6473a16a Co-authored-by: Amp --- crates/storage/provider/src/either_writer.rs | 10 +- .../provider/src/providers/database/mod.rs | 92 ++++--- .../src/providers/database/provider.rs | 40 ++-- .../provider/src/providers/rocksdb/mod.rs | 7 +- .../src/providers/rocksdb/provider.rs | 226 ++++-------------- .../src/providers/state/historical.rs | 47 ++-- .../provider/src/traits/rocksdb_provider.rs | 2 +- 7 files changed, 165 insertions(+), 259 deletions(-) diff --git a/crates/storage/provider/src/either_writer.rs b/crates/storage/provider/src/either_writer.rs index 7e5c9765def..bb5d871d417 100644 --- a/crates/storage/provider/src/either_writer.rs +++ b/crates/storage/provider/src/either_writer.rs @@ -72,7 +72,7 @@ pub type RawRocksDBBatch = rocksdb::WriteBatchWithTransaction; /// /// The `Option` allows callers to skip `RocksDB` access when it isn't needed /// (e.g., on legacy MDBX-only nodes). -pub type RocksDBRefArg<'a> = Option>; +pub type RocksDBRefArg = Option; /// Represents a destination for writing data, either to database, static files, or `RocksDB`. #[derive(Debug, Display)] @@ -673,7 +673,7 @@ pub enum EitherReader<'a, CURSOR, N> { /// Read from static file StaticFile(StaticFileProvider, PhantomData<&'a ()>), /// Read from `RocksDB` snapshot (works in both read-only and read-write modes) - RocksDB(crate::providers::rocksdb::RocksReadSnapshot<'a>), + RocksDB(crate::providers::rocksdb::RocksReadSnapshot), } impl<'a> EitherReader<'a, (), ()> { @@ -698,7 +698,7 @@ impl<'a> EitherReader<'a, (), ()> { /// Creates a new [`EitherReader`] for storages history based on storage settings. pub fn new_storages_history

( provider: &P, - rocksdb: RocksDBRefArg<'a>, + rocksdb: RocksDBRefArg, ) -> ProviderResult> where P: DBProvider + NodePrimitivesProvider + StorageSettingsCache, @@ -719,7 +719,7 @@ impl<'a> EitherReader<'a, (), ()> { /// Creates a new [`EitherReader`] for transaction hash numbers based on storage settings. pub fn new_transaction_hash_numbers

( provider: &P, - rocksdb: RocksDBRefArg<'a>, + rocksdb: RocksDBRefArg, ) -> ProviderResult> where P: DBProvider + NodePrimitivesProvider + StorageSettingsCache, @@ -740,7 +740,7 @@ impl<'a> EitherReader<'a, (), ()> { /// Creates a new [`EitherReader`] for account history based on storage settings. pub fn new_accounts_history

( provider: &P, - rocksdb: RocksDBRefArg<'a>, + rocksdb: RocksDBRefArg, ) -> ProviderResult> where P: DBProvider + NodePrimitivesProvider + StorageSettingsCache, diff --git a/crates/storage/provider/src/providers/database/mod.rs b/crates/storage/provider/src/providers/database/mod.rs index ac096e7499d..3275f8c201f 100644 --- a/crates/storage/provider/src/providers/database/mod.rs +++ b/crates/storage/provider/src/providers/database/mod.rs @@ -1,7 +1,7 @@ use crate::{ providers::{ state::latest::LatestStateProvider, NodeTypesForProvider, RocksDBProvider, - StaticFileProvider, StaticFileProviderRWRefMut, + RocksReadSnapshot, StaticFileProvider, StaticFileProviderRWRefMut, }, to_range, traits::{BlockSource, ReceiptProvider}, @@ -239,6 +239,29 @@ impl> ProviderFactory { } impl ProviderFactory { + fn provider_read_view( + &self, + ) -> ProviderResult<(::TX, Option)> { + if !self.cached_storage_settings().storage_v2 { + return Ok((self.db.tx()?, None)); + } + + loop { + let txnid_before = self.db.last_txnid(); + let tx = self.db.tx()?; + let snapshot = self.rocksdb_provider.snapshot(); + + if self.db.last_txnid() == txnid_before { + return Ok((tx, Some(snapshot))); + } + + tracing::debug!( + target: "providers::db", + "MDBX write commit detected while pinning RocksDB snapshot, retrying" + ); + } + } + /// Returns a provider with a created `DbTx` inside, which allows fetching data from the /// database using different types of providers. Example: [`HeaderProvider`] /// [`BlockHashReader`]. This may fail if the inner read database transaction fails to open. @@ -247,29 +270,7 @@ impl ProviderFactory { /// data. #[track_caller] pub fn provider(&self) -> ProviderResult> { - let storage_v2 = self.storage_settings.read().storage_v2; - - // For storage_v2, we need consistent MDBX + RocksDB snapshots. - // Take both and verify no write commit landed between them. - let (tx, pinned_snapshot) = if storage_v2 { - loop { - let txnid_before = self.db.last_txnid(); - let tx = self.db.tx()?; - let snapshot = Arc::new(self.rocksdb_provider.owned_snapshot()); - let txnid_after = self.db.last_txnid(); - if txnid_before == txnid_after { - break (tx, Some(snapshot)); - } - // A write commit landed between the two snapshots — retry. - // This is extremely rare (nanosecond window). - tracing::debug!( - target: "providers::db", - "MDBX write commit detected between read tx and RocksDB snapshot, retrying" - ); - } - } else { - (self.db.tx()?, None) - }; + let (tx, pinned_rocksdb_snapshot) = self.provider_read_view()?; let mut provider = DatabaseProvider::new( tx, @@ -285,7 +286,7 @@ impl ProviderFactory { ) .with_minimum_pruning_distance(self.minimum_pruning_distance); - if let Some(snapshot) = pinned_snapshot { + if let Some(snapshot) = pinned_rocksdb_snapshot { provider = provider.with_pinned_rocksdb_snapshot(snapshot); } @@ -887,21 +888,22 @@ impl Clone for ProviderFactory { mod tests { use super::*; use crate::{ - providers::{StaticFileProvider, StaticFileWriter}, + providers::{HistoryInfo, StaticFileProvider, StaticFileWriter}, test_utils::{blocks::TEST_BLOCK, create_test_provider_factory, MockNodeTypesWithDB}, BlockHashReader, BlockNumReader, BlockWriter, DBProvider, HeaderSyncGapProvider, - TransactionsProvider, + RocksDBProviderFactory, TransactionsProvider, }; - use alloy_primitives::{TxNumber, B256}; + use alloy_primitives::{Address, TxNumber, B256}; use assert_matches::assert_matches; use reth_chainspec::ChainSpecBuilder; use reth_db::{ mdbx::DatabaseArguments, test_utils::{create_test_rocksdb_dir, create_test_static_files_dir, ERROR_TEMPDIR}, }; - use reth_db_api::tables; + use reth_db_api::{models::StorageSettings, tables}; use reth_primitives_traits::SignerRecoverable; use reth_prune_types::{PruneMode, PruneModes}; + use reth_storage_api::StorageSettingsCache; use reth_storage_errors::provider::ProviderError; use reth_testing_utils::generators::{self, random_block, random_header, BlockParams}; use std::{ops::RangeInclusive, sync::Arc}; @@ -932,6 +934,38 @@ mod tests { provider.block_hash(0).unwrap(); } + #[test] + fn provider_pins_rocksdb_snapshot_for_storage_v2() { + let factory = create_test_provider_factory(); + factory.set_storage_settings_cache(StorageSettings::v2()); + + let address = Address::from([0x42; 20]); + let rocksdb = factory.rocksdb_provider(); + + let mut batch = rocksdb.batch(); + batch.append_account_history_shard(address, vec![100]).unwrap(); + batch.commit().unwrap(); + + let provider = factory.provider().unwrap(); + + let mut batch = rocksdb.batch(); + batch.append_account_history_shard(address, vec![200]).unwrap(); + batch.commit().unwrap(); + + assert_eq!( + provider + .pinned_rocksdb_snapshot() + .expect("storage_v2 providers pin a RocksDB snapshot") + .account_history_info(address, 150, None) + .unwrap(), + HistoryInfo::InPlainState + ); + assert_eq!( + rocksdb.snapshot().account_history_info(address, 150, None).unwrap(), + HistoryInfo::InChangeset(200) + ); + } + #[test] fn provider_factory_with_database_path() { let chain_spec = ChainSpecBuilder::mainnet().build(); diff --git a/crates/storage/provider/src/providers/database/provider.rs b/crates/storage/provider/src/providers/database/provider.rs index 97e1cf8467d..ffdb49e059f 100644 --- a/crates/storage/provider/src/providers/database/provider.rs +++ b/crates/storage/provider/src/providers/database/provider.rs @@ -2,9 +2,7 @@ use crate::{ changesets_utils::StorageRevertsIter, providers::{ database::{chain::ChainStorage, metrics}, - rocksdb::{ - OwnedRocksReadSnapshot, PendingRocksDBBatches, RocksDBProvider, RocksDBWriteCtx, - }, + rocksdb::{PendingRocksDBBatches, RocksDBProvider, RocksDBWriteCtx, RocksReadSnapshot}, static_file::{StaticFileWriteCtx, StaticFileWriter}, NodeTypesForProvider, StaticFileProvider, }, @@ -214,10 +212,8 @@ pub struct DatabaseProvider { minimum_pruning_distance: u64, /// Database provider metrics metrics: metrics::DatabaseProviderMetrics, - /// Pinned RocksDB snapshot for cross-store read consistency. - /// When set, `HistoricalStateProvider` uses this instead of creating per-query snapshots. - /// This ensures the RocksDB view is consistent with the MDBX read transaction. - pinned_rocksdb_snapshot: Option>, + /// Pinned RocksDB snapshot taken alongside the MDBX read transaction. + pinned_rocksdb_snapshot: Option, } impl Debug for DatabaseProvider { @@ -252,16 +248,18 @@ impl DatabaseProvider { self } - /// Sets the pinned RocksDB snapshot for cross-store read consistency. - pub fn with_pinned_rocksdb_snapshot(mut self, snapshot: Arc) -> Self { + pub(crate) fn with_pinned_rocksdb_snapshot(mut self, snapshot: RocksReadSnapshot) -> Self { self.pinned_rocksdb_snapshot = Some(snapshot); self } - /// Returns the pinned RocksDB snapshot, if set. - pub fn pinned_rocksdb_snapshot(&self) -> Option<&Arc> { + pub(crate) fn pinned_rocksdb_snapshot(&self) -> Option<&RocksReadSnapshot> { self.pinned_rocksdb_snapshot.as_ref() } + + fn take_pinned_rocksdb_snapshot(&mut self) -> Option { + self.pinned_rocksdb_snapshot.take() + } } impl DatabaseProvider { @@ -293,9 +291,8 @@ impl DatabaseProvider { self.get_prune_checkpoint(PruneSegment::StorageHistory)?; let mut state_provider = HistoricalStateProviderRef::new(self, block_number); - - if let Some(snapshot) = &self.pinned_rocksdb_snapshot { - state_provider.pinned_rocksdb_snapshot = Some(snapshot); + if let Some(snapshot) = self.pinned_rocksdb_snapshot() { + state_provider = state_provider.with_pinned_rocksdb_snapshot(snapshot); } // If we pruned account or storage history, we can't return state on every historical block. @@ -891,7 +888,8 @@ impl TryIntoHistoricalStateProvider for Databa self, mut block_number: BlockNumber, ) -> ProviderResult { - let best_block = self.best_block_number().unwrap_or_default(); + let mut provider = self; + let best_block = provider.best_block_number().unwrap_or_default(); // Reject requests for blocks beyond the best block if block_number > best_block { @@ -903,21 +901,21 @@ impl TryIntoHistoricalStateProvider for Databa // If requesting state at the best block, use the latest state provider if block_number == best_block { - return Ok(Box::new(LatestStateProvider::new(self))); + return Ok(Box::new(LatestStateProvider::new(provider))); } // +1 as the changeset that we want is the one that was applied after this block. block_number += 1; let account_history_prune_checkpoint = - self.get_prune_checkpoint(PruneSegment::AccountHistory)?; + provider.get_prune_checkpoint(PruneSegment::AccountHistory)?; let storage_history_prune_checkpoint = - self.get_prune_checkpoint(PruneSegment::StorageHistory)?; - let pinned_snapshot = self.pinned_rocksdb_snapshot.clone(); + provider.get_prune_checkpoint(PruneSegment::StorageHistory)?; - let mut state_provider = HistoricalStateProvider::new(self, block_number); + let pinned_rocksdb_snapshot = provider.take_pinned_rocksdb_snapshot(); - if let Some(snapshot) = pinned_snapshot { + let mut state_provider = HistoricalStateProvider::new(provider, block_number); + if let Some(snapshot) = pinned_rocksdb_snapshot { state_provider = state_provider.with_pinned_rocksdb_snapshot(snapshot); } diff --git a/crates/storage/provider/src/providers/rocksdb/mod.rs b/crates/storage/provider/src/providers/rocksdb/mod.rs index 7a5c4f93ea8..46bb9724a5c 100644 --- a/crates/storage/provider/src/providers/rocksdb/mod.rs +++ b/crates/storage/provider/src/providers/rocksdb/mod.rs @@ -4,9 +4,8 @@ mod invariants; mod metrics; mod provider; +pub(crate) use provider::{PendingRocksDBBatches, RocksDBWriteCtx}; pub use provider::{ - OwnedRocksReadSnapshot, PruneShardOutcome, PrunedIndices, RocksDBBatch, RocksDBBuilder, - RocksDBIter, RocksDBProvider, RocksDBRawIter, RocksDBStats, RocksDBTableStats, - RocksReadSnapshot, RocksTx, + PruneShardOutcome, PrunedIndices, RocksDBBatch, RocksDBBuilder, RocksDBIter, RocksDBProvider, + RocksDBRawIter, RocksDBStats, RocksDBTableStats, RocksReadSnapshot, RocksTx, }; -pub(crate) use provider::{PendingRocksDBBatches, RocksDBWriteCtx}; diff --git a/crates/storage/provider/src/providers/rocksdb/provider.rs b/crates/storage/provider/src/providers/rocksdb/provider.rs index 63838d1f840..c8836d4cc9f 100644 --- a/crates/storage/provider/src/providers/rocksdb/provider.rs +++ b/crates/storage/provider/src/providers/rocksdb/provider.rs @@ -704,8 +704,17 @@ impl RocksDBProvider { /// Returns a read-only, point-in-time snapshot of the database. /// /// Lighter weight than [`RocksTx`] — no write-conflict tracking, and `Send + Sync`. - pub fn snapshot(&self) -> RocksReadSnapshot<'_> { - RocksReadSnapshot { inner: self.0.snapshot(), provider: self } + pub fn snapshot(&self) -> RocksReadSnapshot { + // SAFETY: the snapshot borrows the underlying RocksDB handle stored inside the provider's + // `Arc`. Cloning the provider keeps that allocation alive for the lifetime of the snapshot, + // and `RocksReadSnapshot` drops `inner` before `provider`. + let inner = unsafe { + std::mem::transmute::, RocksReadSnapshotInner<'static>>( + self.0.snapshot(), + ) + }; + + RocksReadSnapshot { inner, provider: self.clone() } } /// Creates a new transaction with MDBX-like semantics (read-your-writes, rollback). @@ -1378,9 +1387,11 @@ impl RocksDBProvider { /// used by [`EitherReader::RocksDB`](crate::either_writer::EitherReader) for history lookups. /// /// Lighter weight than [`RocksTx`] — no transaction overhead, no write support. -pub struct RocksReadSnapshot<'db> { - inner: RocksReadSnapshotInner<'db>, - provider: &'db RocksDBProvider, +pub struct RocksReadSnapshot { + // `inner` must drop before `provider` so the borrowed RocksDB snapshot is released before the + // cloned provider keeping the DB alive goes away. + inner: RocksReadSnapshotInner<'static>, + provider: RocksDBProvider, } /// Inner enum to hold the snapshot for either read-write or read-only mode. @@ -1401,7 +1412,7 @@ impl<'db> RocksReadSnapshotInner<'db> { } } -impl fmt::Debug for RocksReadSnapshot<'_> { +impl fmt::Debug for RocksReadSnapshot { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("RocksReadSnapshot") .field("provider", &self.provider) @@ -1409,9 +1420,9 @@ impl fmt::Debug for RocksReadSnapshot<'_> { } } -impl<'db> RocksReadSnapshot<'db> { +impl RocksReadSnapshot { /// Gets the column family handle for a table. - fn cf_handle(&self) -> Result<&'db rocksdb::ColumnFamily, DatabaseError> { + fn cf_handle(&self) -> Result<&rocksdb::ColumnFamily, DatabaseError> { self.provider.get_cf_handle::() } @@ -1550,178 +1561,6 @@ impl<'db> RocksReadSnapshot<'db> { } } -/// An owned RocksDB read snapshot that keeps the provider alive. -/// -/// This allows pinning a RocksDB snapshot at creation time and reusing it -/// for all subsequent reads, ensuring consistency with an MDBX read transaction -/// opened at the same point in time. -/// -/// # Safety -/// -/// `RocksDBProvider` wraps `Arc`, so the inner DB is -/// heap-allocated and stable. The snapshot borrows from the DB, which won't -/// move or be dropped while the Arc is held. Rust drops fields in declaration -/// order, so `inner` (the snapshot) is dropped before `_provider` (the Arc). -pub struct OwnedRocksReadSnapshot { - // IMPORTANT: `inner` must be declared before `_provider` so it drops first. - inner: RocksReadSnapshotInner<'static>, - _provider: RocksDBProvider, -} - -impl fmt::Debug for OwnedRocksReadSnapshot { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("OwnedRocksReadSnapshot").finish_non_exhaustive() - } -} - -// SAFETY: RocksDB snapshots are thread-safe — the underlying C++ SnapshotImpl is -// reference-counted and immutable. RocksDBProvider is Arc-based and already Send+Sync. -unsafe impl Send for OwnedRocksReadSnapshot {} -unsafe impl Sync for OwnedRocksReadSnapshot {} - -impl RocksDBProvider { - /// Creates an owned, pinned read snapshot. - /// - /// The returned snapshot keeps the provider alive and can be stored alongside - /// an MDBX read transaction to ensure cross-store consistency. - pub fn owned_snapshot(&self) -> OwnedRocksReadSnapshot { - // SAFETY: The snapshot borrows from the DB inside Arc. - // We clone the Arc (cheap) to keep it alive. The snapshot's actual lifetime - // is tied to the Arc, which we guarantee by storing both in the struct - // (with snapshot dropped first due to field order). - let snapshot = self.0.snapshot(); - let inner = unsafe { - std::mem::transmute::, RocksReadSnapshotInner<'static>>( - snapshot, - ) - }; - OwnedRocksReadSnapshot { inner, _provider: self.clone() } - } -} - -impl OwnedRocksReadSnapshot { - /// Gets the column family handle for a table. - fn cf_handle(&self) -> Result<&rocksdb::ColumnFamily, DatabaseError> { - self._provider.get_cf_handle::() - } - - /// Lookup account history and return [`HistoryInfo`] directly. - pub fn account_history_info( - &self, - address: Address, - block_number: BlockNumber, - lowest_available_block_number: Option, - ) -> ProviderResult { - let key = ShardedKey::new(address, block_number); - self.history_info::( - key.encode().as_ref(), - block_number, - lowest_available_block_number, - |key_bytes| Ok( as Decode>::decode(key_bytes)?.key == address), - |prev_bytes| { - as Decode>::decode(prev_bytes) - .map(|k| k.key == address) - .unwrap_or(false) - }, - ) - } - - /// Lookup storage history and return [`HistoryInfo`] directly. - pub fn storage_history_info( - &self, - address: Address, - storage_key: B256, - block_number: BlockNumber, - lowest_available_block_number: Option, - ) -> ProviderResult { - let key = StorageShardedKey::new(address, storage_key, block_number); - self.history_info::( - key.encode().as_ref(), - block_number, - lowest_available_block_number, - |key_bytes| { - let k = ::decode(key_bytes)?; - Ok(k.address == address && k.sharded_key.key == storage_key) - }, - |prev_bytes| { - ::decode(prev_bytes) - .map(|k| k.address == address && k.sharded_key.key == storage_key) - .unwrap_or(false) - }, - ) - } - - /// Generic history lookup using the snapshot's raw iterator. - fn history_info( - &self, - encoded_key: &[u8], - block_number: BlockNumber, - lowest_available_block_number: Option, - key_matches: impl FnOnce(&[u8]) -> Result, - prev_key_matches: impl Fn(&[u8]) -> bool, - ) -> ProviderResult - where - T: Table, - { - let is_maybe_pruned = lowest_available_block_number.is_some(); - let fallback = || { - Ok(if is_maybe_pruned { - HistoryInfo::MaybeInPlainState - } else { - HistoryInfo::NotYetWritten - }) - }; - - let cf = self.cf_handle::()?; - let mut iter = self.inner.raw_iterator_cf(cf); - - iter.seek(encoded_key); - iter.status().map_err(|e| { - ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo { - message: e.to_string().into(), - code: -1, - })) - })?; - - if !iter.valid() { - return fallback(); - } - - let Some(key_bytes) = iter.key() else { - return fallback(); - }; - if !key_matches(key_bytes)? { - return fallback(); - } - - let Some(value_bytes) = iter.value() else { - return fallback(); - }; - let chunk = BlockNumberList::decompress(value_bytes)?; - let (rank, found_block) = compute_history_rank(&chunk, block_number); - - let is_before_first_write = if needs_prev_shard_check(rank, found_block, block_number) { - iter.prev(); - iter.status().map_err(|e| { - ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo { - message: e.to_string().into(), - code: -1, - })) - })?; - let has_prev = iter.valid() && iter.key().is_some_and(&prev_key_matches); - !has_prev - } else { - false - }; - - Ok(HistoryInfo::from_lookup( - found_block, - is_before_first_write, - lowest_available_block_number, - )) - } -} - /// Outcome of pruning a history shard in `RocksDB`. #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum PruneShardOutcome { @@ -3208,6 +3047,33 @@ mod tests { assert_eq!(result, HistoryInfo::InPlainState); } + #[test] + fn test_snapshot_stays_pinned_after_later_writes() { + let temp_dir = TempDir::new().unwrap(); + let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap(); + + let address = Address::from([0x42; 20]); + + let mut batch = provider.batch(); + batch.append_account_history_shard(address, vec![100]).unwrap(); + batch.commit().unwrap(); + + let pinned_snapshot = provider.snapshot(); + + let mut batch = provider.batch(); + batch.append_account_history_shard(address, vec![200]).unwrap(); + batch.commit().unwrap(); + + assert_eq!( + pinned_snapshot.account_history_info(address, 150, None).unwrap(), + HistoryInfo::InPlainState + ); + assert_eq!( + provider.snapshot().account_history_info(address, 150, None).unwrap(), + HistoryInfo::InChangeset(200) + ); + } + #[test] fn test_account_history_shard_split_at_boundary() { let temp_dir = TempDir::new().unwrap(); diff --git a/crates/storage/provider/src/providers/state/historical.rs b/crates/storage/provider/src/providers/state/historical.rs index 30e53614d53..602a08a5c22 100644 --- a/crates/storage/provider/src/providers/state/historical.rs +++ b/crates/storage/provider/src/providers/state/historical.rs @@ -1,6 +1,7 @@ use crate::{ - AccountReader, BlockHashReader, ChangeSetReader, EitherReader, HashedPostStateProvider, - ProviderError, RocksDBProviderFactory, StateProvider, StateRootProvider, + providers::RocksReadSnapshot, AccountReader, BlockHashReader, ChangeSetReader, EitherReader, + HashedPostStateProvider, ProviderError, RocksDBProviderFactory, StateProvider, + StateRootProvider, }; use alloy_eips::merge::EPOCH_SLOTS; use alloy_primitives::{Address, BlockNumber, Bytes, StorageKey, StorageValue, B256}; @@ -32,8 +33,7 @@ use reth_trie_db::{ DatabaseStorageProof, DatabaseStorageRoot, }; -use crate::providers::rocksdb::OwnedRocksReadSnapshot; -use std::{fmt::Debug, sync::Arc}; +use std::fmt::Debug; type DbStateRoot<'a, TX, A> = StateRoot< reth_trie_db::DatabaseTrieCursorFactory<&'a TX, A>, @@ -128,8 +128,8 @@ pub struct HistoricalStateProviderRef<'b, Provider> { block_number: BlockNumber, /// Lowest blocks at which different parts of the state are available. lowest_available_blocks: LowestAvailableBlocks, - /// Pinned RocksDB snapshot for cross-store read consistency. - pub(crate) pinned_rocksdb_snapshot: Option<&'b Arc>, + /// Pinned RocksDB snapshot taken alongside the MDBX read transaction. + pinned_rocksdb_snapshot: Option<&'b RocksReadSnapshot>, } impl<'b, Provider: DBProvider + ChangeSetReader + StorageChangeSetReader + BlockNumReader> @@ -155,6 +155,11 @@ impl<'b, Provider: DBProvider + ChangeSetReader + StorageChangeSetReader + Block Self { provider, block_number, lowest_available_blocks, pinned_rocksdb_snapshot: None } } + pub(crate) fn with_pinned_rocksdb_snapshot(mut self, snapshot: &'b RocksReadSnapshot) -> Self { + self.pinned_rocksdb_snapshot = Some(snapshot); + self + } + /// Lookup an account in the `AccountsHistory` table using `EitherReader`. pub fn account_history_lookup(&self, address: Address) -> ProviderResult where @@ -164,12 +169,12 @@ impl<'b, Provider: DBProvider + ChangeSetReader + StorageChangeSetReader + Block return Err(ProviderError::StateAtBlockPruned(self.block_number)) } - if let Some(snapshot) = &self.pinned_rocksdb_snapshot { + if let Some(snapshot) = self.pinned_rocksdb_snapshot { return snapshot.account_history_info( address, self.block_number, self.lowest_available_blocks.account_history_block_number, - ); + ) } self.provider.with_rocksdb_snapshot(|rocksdb_ref| { @@ -197,13 +202,13 @@ impl<'b, Provider: DBProvider + ChangeSetReader + StorageChangeSetReader + Block return Err(ProviderError::StateAtBlockPruned(self.block_number)) } - if let Some(snapshot) = &self.pinned_rocksdb_snapshot { + if let Some(snapshot) = self.pinned_rocksdb_snapshot { return snapshot.storage_history_info( address, lookup_key, self.block_number, self.lowest_available_blocks.storage_history_block_number, - ); + ) } self.provider.with_rocksdb_snapshot(|rocksdb_ref| { @@ -614,8 +619,8 @@ pub struct HistoricalStateProvider { block_number: BlockNumber, /// Lowest blocks at which different parts of the state are available. lowest_available_blocks: LowestAvailableBlocks, - /// Pinned RocksDB snapshot for cross-store read consistency. - pinned_rocksdb_snapshot: Option>, + /// Pinned RocksDB snapshot taken alongside the MDBX read transaction. + pinned_rocksdb_snapshot: Option, } impl @@ -631,8 +636,7 @@ impl) -> Self { + pub(crate) fn with_pinned_rocksdb_snapshot(mut self, snapshot: RocksReadSnapshot) -> Self { self.pinned_rocksdb_snapshot = Some(snapshot); self } @@ -658,11 +662,16 @@ impl HistoricalStateProviderRef<'_, Provider> { - HistoricalStateProviderRef { - provider: &self.provider, - block_number: self.block_number, - lowest_available_blocks: self.lowest_available_blocks, - pinned_rocksdb_snapshot: self.pinned_rocksdb_snapshot.as_ref(), + let provider = HistoricalStateProviderRef::new_with_lowest_available_blocks( + &self.provider, + self.block_number, + self.lowest_available_blocks, + ); + + if let Some(snapshot) = self.pinned_rocksdb_snapshot.as_ref() { + provider.with_pinned_rocksdb_snapshot(snapshot) + } else { + provider } } } diff --git a/crates/storage/provider/src/traits/rocksdb_provider.rs b/crates/storage/provider/src/traits/rocksdb_provider.rs index 6ddbbc2bfe3..2b208dd2f64 100644 --- a/crates/storage/provider/src/traits/rocksdb_provider.rs +++ b/crates/storage/provider/src/traits/rocksdb_provider.rs @@ -37,7 +37,7 @@ pub trait RocksDBProviderFactory { fn with_rocksdb_snapshot(&self, f: F) -> ProviderResult where Self: StorageSettingsCache, - F: FnOnce(RocksDBRefArg<'_>) -> ProviderResult, + F: FnOnce(RocksDBRefArg) -> ProviderResult, { if self.cached_storage_settings().storage_v2 { let rocksdb = self.rocksdb_provider(); From 3bf8272d1b0e682038686d312ae00a39be0acf00 Mon Sep 17 00:00:00 2001 From: joshieDo <93316087+joshieDo@users.noreply.github.com> Date: Tue, 31 Mar 2026 15:29:04 +0100 Subject: [PATCH 3/9] refactor(storage): dedupe historical lookup dispatch Amp-Thread-ID: https://ampcode.com/threads/T-019d4435-3bfe-7739-b47f-fbcf6473a16a Co-authored-by: Amp --- .../src/providers/state/historical.rs | 141 +++++++++++++----- 1 file changed, 107 insertions(+), 34 deletions(-) diff --git a/crates/storage/provider/src/providers/state/historical.rs b/crates/storage/provider/src/providers/state/historical.rs index 602a08a5c22..ba9c888ef8f 100644 --- a/crates/storage/provider/src/providers/state/historical.rs +++ b/crates/storage/provider/src/providers/state/historical.rs @@ -9,9 +9,10 @@ use reth_db_api::{ cursor::{DbCursorRO, DbDupCursorRO}, table::Table, tables, - transaction::DbTx, + transaction::{CursorTy, DbTx}, BlockNumberList, }; +use reth_node_types::NodePrimitives; use reth_primitives_traits::{Account, Bytecode}; use reth_storage_api::{ BlockNumReader, BytecodeReader, DBProvider, NodePrimitivesProvider, StateProofProvider, @@ -109,6 +110,71 @@ impl HistoryInfo { } } +type AccountHistoryReader<'a, Provider> = HistoryLookupReader< + 'a, + CursorTy<::Tx, tables::AccountsHistory>, + ::Primitives, +>; +type StorageHistoryReader<'a, Provider> = HistoryLookupReader< + 'a, + CursorTy<::Tx, tables::StoragesHistory>, + ::Primitives, +>; + +enum HistoryLookupReader<'a, CURSOR, N> { + Snapshot(&'a RocksReadSnapshot), + Either(EitherReader<'a, CURSOR, N>), +} + +impl HistoryLookupReader<'_, CURSOR, N> +where + CURSOR: DbCursorRO, +{ + fn account_history_info( + &mut self, + address: Address, + block_number: BlockNumber, + lowest_available_block_number: Option, + ) -> ProviderResult { + match self { + Self::Snapshot(snapshot) => { + snapshot.account_history_info(address, block_number, lowest_available_block_number) + } + Self::Either(reader) => { + reader.account_history_info(address, block_number, lowest_available_block_number) + } + } + } +} + +impl HistoryLookupReader<'_, CURSOR, N> +where + CURSOR: DbCursorRO, +{ + fn storage_history_info( + &mut self, + address: Address, + storage_key: B256, + block_number: BlockNumber, + lowest_available_block_number: Option, + ) -> ProviderResult { + match self { + Self::Snapshot(snapshot) => snapshot.storage_history_info( + address, + storage_key, + block_number, + lowest_available_block_number, + ), + Self::Either(reader) => reader.storage_history_info( + address, + storage_key, + block_number, + lowest_available_block_number, + ), + } + } +} + /// State provider for a given block number which takes a tx reference. /// /// Historical state provider accesses the state at the start of the provided block number. @@ -160,33 +226,51 @@ impl<'b, Provider: DBProvider + ChangeSetReader + StorageChangeSetReader + Block self } - /// Lookup an account in the `AccountsHistory` table using `EitherReader`. - pub fn account_history_lookup(&self, address: Address) -> ProviderResult + fn account_history_reader(&self) -> ProviderResult> where Provider: StorageSettingsCache + RocksDBProviderFactory + NodePrimitivesProvider, { - if !self.lowest_available_blocks.is_account_history_available(self.block_number) { - return Err(ProviderError::StateAtBlockPruned(self.block_number)) + if let Some(snapshot) = self.pinned_rocksdb_snapshot { + return Ok(HistoryLookupReader::Snapshot(snapshot)) } + self.provider.with_rocksdb_snapshot(|rocksdb_ref| { + EitherReader::new_accounts_history(self.provider, rocksdb_ref) + .map(HistoryLookupReader::Either) + }) + } + + fn storage_history_reader(&self) -> ProviderResult> + where + Provider: StorageSettingsCache + RocksDBProviderFactory + NodePrimitivesProvider, + { if let Some(snapshot) = self.pinned_rocksdb_snapshot { - return snapshot.account_history_info( - address, - self.block_number, - self.lowest_available_blocks.account_history_block_number, - ) + return Ok(HistoryLookupReader::Snapshot(snapshot)) } self.provider.with_rocksdb_snapshot(|rocksdb_ref| { - let mut reader = EitherReader::new_accounts_history(self.provider, rocksdb_ref)?; - reader.account_history_info( - address, - self.block_number, - self.lowest_available_blocks.account_history_block_number, - ) + EitherReader::new_storages_history(self.provider, rocksdb_ref) + .map(HistoryLookupReader::Either) }) } + /// Lookup an account in the `AccountsHistory` table using `EitherReader`. + pub fn account_history_lookup(&self, address: Address) -> ProviderResult + where + Provider: StorageSettingsCache + RocksDBProviderFactory + NodePrimitivesProvider, + { + if !self.lowest_available_blocks.is_account_history_available(self.block_number) { + return Err(ProviderError::StateAtBlockPruned(self.block_number)) + } + + let mut reader = self.account_history_reader()?; + reader.account_history_info( + address, + self.block_number, + self.lowest_available_blocks.account_history_block_number, + ) + } + /// Lookup a storage key in the `StoragesHistory` table using `EitherReader`. /// /// `lookup_key` is always a plain (unhashed) storage key. @@ -202,24 +286,13 @@ impl<'b, Provider: DBProvider + ChangeSetReader + StorageChangeSetReader + Block return Err(ProviderError::StateAtBlockPruned(self.block_number)) } - if let Some(snapshot) = self.pinned_rocksdb_snapshot { - return snapshot.storage_history_info( - address, - lookup_key, - self.block_number, - self.lowest_available_blocks.storage_history_block_number, - ) - } - - self.provider.with_rocksdb_snapshot(|rocksdb_ref| { - let mut reader = EitherReader::new_storages_history(self.provider, rocksdb_ref)?; - reader.storage_history_info( - address, - lookup_key, - self.block_number, - self.lowest_available_blocks.storage_history_block_number, - ) - }) + let mut reader = self.storage_history_reader()?; + reader.storage_history_info( + address, + lookup_key, + self.block_number, + self.lowest_available_blocks.storage_history_block_number, + ) } /// Resolves a storage value by looking up the given key in history, changesets, or From f797f1481fa3e1f31db026c6d30ff937c9c2a3ab Mon Sep 17 00:00:00 2001 From: joshieDo <93316087+joshieDo@users.noreply.github.com> Date: Tue, 31 Mar 2026 16:10:35 +0100 Subject: [PATCH 4/9] refactor(storage): reuse existing RocksDB snapshots for history lookups Amp-Thread-ID: https://ampcode.com/threads/T-019d4435-3bfe-7739-b47f-fbcf6473a16a Co-authored-by: Amp --- crates/storage/provider/src/either_writer.rs | 10 +- .../provider/src/providers/database/mod.rs | 10 +- .../src/providers/state/historical.rs | 128 +++--------------- .../provider/src/traits/rocksdb_provider.rs | 23 +++- 4 files changed, 55 insertions(+), 116 deletions(-) diff --git a/crates/storage/provider/src/either_writer.rs b/crates/storage/provider/src/either_writer.rs index bb5d871d417..5d5f4c8bda1 100644 --- a/crates/storage/provider/src/either_writer.rs +++ b/crates/storage/provider/src/either_writer.rs @@ -72,7 +72,7 @@ pub type RawRocksDBBatch = rocksdb::WriteBatchWithTransaction; /// /// The `Option` allows callers to skip `RocksDB` access when it isn't needed /// (e.g., on legacy MDBX-only nodes). -pub type RocksDBRefArg = Option; +pub type RocksDBRefArg<'a> = Option<&'a crate::providers::rocksdb::RocksReadSnapshot>; /// Represents a destination for writing data, either to database, static files, or `RocksDB`. #[derive(Debug, Display)] @@ -673,7 +673,7 @@ pub enum EitherReader<'a, CURSOR, N> { /// Read from static file StaticFile(StaticFileProvider, PhantomData<&'a ()>), /// Read from `RocksDB` snapshot (works in both read-only and read-write modes) - RocksDB(crate::providers::rocksdb::RocksReadSnapshot), + RocksDB(&'a crate::providers::rocksdb::RocksReadSnapshot), } impl<'a> EitherReader<'a, (), ()> { @@ -698,7 +698,7 @@ impl<'a> EitherReader<'a, (), ()> { /// Creates a new [`EitherReader`] for storages history based on storage settings. pub fn new_storages_history

( provider: &P, - rocksdb: RocksDBRefArg, + rocksdb: RocksDBRefArg<'a>, ) -> ProviderResult> where P: DBProvider + NodePrimitivesProvider + StorageSettingsCache, @@ -719,7 +719,7 @@ impl<'a> EitherReader<'a, (), ()> { /// Creates a new [`EitherReader`] for transaction hash numbers based on storage settings. pub fn new_transaction_hash_numbers

( provider: &P, - rocksdb: RocksDBRefArg, + rocksdb: RocksDBRefArg<'a>, ) -> ProviderResult> where P: DBProvider + NodePrimitivesProvider + StorageSettingsCache, @@ -740,7 +740,7 @@ impl<'a> EitherReader<'a, (), ()> { /// Creates a new [`EitherReader`] for account history based on storage settings. pub fn new_accounts_history

( provider: &P, - rocksdb: RocksDBRefArg, + rocksdb: RocksDBRefArg<'a>, ) -> ProviderResult> where P: DBProvider + NodePrimitivesProvider + StorageSettingsCache, diff --git a/crates/storage/provider/src/providers/database/mod.rs b/crates/storage/provider/src/providers/database/mod.rs index 3275f8c201f..73a7a356f12 100644 --- a/crates/storage/provider/src/providers/database/mod.rs +++ b/crates/storage/provider/src/providers/database/mod.rs @@ -239,7 +239,13 @@ impl> ProviderFactory { } impl ProviderFactory { - fn provider_read_view( + /// Opens a read-only MDBX transaction and, for `storage_v2`, pins a matching RocksDB + /// snapshot. + /// + /// The helper retries if an MDBX write commit lands between opening the read transaction and + /// taking the RocksDB snapshot. This keeps both stores aligned for historical reads that span + /// MDBX state and RocksDB history indices. + fn read_tx_and_rocksdb_snapshot( &self, ) -> ProviderResult<(::TX, Option)> { if !self.cached_storage_settings().storage_v2 { @@ -270,7 +276,7 @@ impl ProviderFactory { /// data. #[track_caller] pub fn provider(&self) -> ProviderResult> { - let (tx, pinned_rocksdb_snapshot) = self.provider_read_view()?; + let (tx, pinned_rocksdb_snapshot) = self.read_tx_and_rocksdb_snapshot()?; let mut provider = DatabaseProvider::new( tx, diff --git a/crates/storage/provider/src/providers/state/historical.rs b/crates/storage/provider/src/providers/state/historical.rs index ba9c888ef8f..980f5ea5dc4 100644 --- a/crates/storage/provider/src/providers/state/historical.rs +++ b/crates/storage/provider/src/providers/state/historical.rs @@ -9,10 +9,9 @@ use reth_db_api::{ cursor::{DbCursorRO, DbDupCursorRO}, table::Table, tables, - transaction::{CursorTy, DbTx}, + transaction::DbTx, BlockNumberList, }; -use reth_node_types::NodePrimitives; use reth_primitives_traits::{Account, Bytecode}; use reth_storage_api::{ BlockNumReader, BytecodeReader, DBProvider, NodePrimitivesProvider, StateProofProvider, @@ -110,71 +109,6 @@ impl HistoryInfo { } } -type AccountHistoryReader<'a, Provider> = HistoryLookupReader< - 'a, - CursorTy<::Tx, tables::AccountsHistory>, - ::Primitives, ->; -type StorageHistoryReader<'a, Provider> = HistoryLookupReader< - 'a, - CursorTy<::Tx, tables::StoragesHistory>, - ::Primitives, ->; - -enum HistoryLookupReader<'a, CURSOR, N> { - Snapshot(&'a RocksReadSnapshot), - Either(EitherReader<'a, CURSOR, N>), -} - -impl HistoryLookupReader<'_, CURSOR, N> -where - CURSOR: DbCursorRO, -{ - fn account_history_info( - &mut self, - address: Address, - block_number: BlockNumber, - lowest_available_block_number: Option, - ) -> ProviderResult { - match self { - Self::Snapshot(snapshot) => { - snapshot.account_history_info(address, block_number, lowest_available_block_number) - } - Self::Either(reader) => { - reader.account_history_info(address, block_number, lowest_available_block_number) - } - } - } -} - -impl HistoryLookupReader<'_, CURSOR, N> -where - CURSOR: DbCursorRO, -{ - fn storage_history_info( - &mut self, - address: Address, - storage_key: B256, - block_number: BlockNumber, - lowest_available_block_number: Option, - ) -> ProviderResult { - match self { - Self::Snapshot(snapshot) => snapshot.storage_history_info( - address, - storage_key, - block_number, - lowest_available_block_number, - ), - Self::Either(reader) => reader.storage_history_info( - address, - storage_key, - block_number, - lowest_available_block_number, - ), - } - } -} - /// State provider for a given block number which takes a tx reference. /// /// Historical state provider accesses the state at the start of the provided block number. @@ -226,34 +160,6 @@ impl<'b, Provider: DBProvider + ChangeSetReader + StorageChangeSetReader + Block self } - fn account_history_reader(&self) -> ProviderResult> - where - Provider: StorageSettingsCache + RocksDBProviderFactory + NodePrimitivesProvider, - { - if let Some(snapshot) = self.pinned_rocksdb_snapshot { - return Ok(HistoryLookupReader::Snapshot(snapshot)) - } - - self.provider.with_rocksdb_snapshot(|rocksdb_ref| { - EitherReader::new_accounts_history(self.provider, rocksdb_ref) - .map(HistoryLookupReader::Either) - }) - } - - fn storage_history_reader(&self) -> ProviderResult> - where - Provider: StorageSettingsCache + RocksDBProviderFactory + NodePrimitivesProvider, - { - if let Some(snapshot) = self.pinned_rocksdb_snapshot { - return Ok(HistoryLookupReader::Snapshot(snapshot)) - } - - self.provider.with_rocksdb_snapshot(|rocksdb_ref| { - EitherReader::new_storages_history(self.provider, rocksdb_ref) - .map(HistoryLookupReader::Either) - }) - } - /// Lookup an account in the `AccountsHistory` table using `EitherReader`. pub fn account_history_lookup(&self, address: Address) -> ProviderResult where @@ -263,11 +169,16 @@ impl<'b, Provider: DBProvider + ChangeSetReader + StorageChangeSetReader + Block return Err(ProviderError::StateAtBlockPruned(self.block_number)) } - let mut reader = self.account_history_reader()?; - reader.account_history_info( - address, - self.block_number, - self.lowest_available_blocks.account_history_block_number, + self.provider.with_existing_or_new_rocksdb_snapshot( + self.pinned_rocksdb_snapshot, + |rocksdb_ref| { + let mut reader = EitherReader::new_accounts_history(self.provider, rocksdb_ref)?; + reader.account_history_info( + address, + self.block_number, + self.lowest_available_blocks.account_history_block_number, + ) + }, ) } @@ -286,12 +197,17 @@ impl<'b, Provider: DBProvider + ChangeSetReader + StorageChangeSetReader + Block return Err(ProviderError::StateAtBlockPruned(self.block_number)) } - let mut reader = self.storage_history_reader()?; - reader.storage_history_info( - address, - lookup_key, - self.block_number, - self.lowest_available_blocks.storage_history_block_number, + self.provider.with_existing_or_new_rocksdb_snapshot( + self.pinned_rocksdb_snapshot, + |rocksdb_ref| { + let mut reader = EitherReader::new_storages_history(self.provider, rocksdb_ref)?; + reader.storage_history_info( + address, + lookup_key, + self.block_number, + self.lowest_available_blocks.storage_history_block_number, + ) + }, ) } diff --git a/crates/storage/provider/src/traits/rocksdb_provider.rs b/crates/storage/provider/src/traits/rocksdb_provider.rs index 2b208dd2f64..511d5cb0829 100644 --- a/crates/storage/provider/src/traits/rocksdb_provider.rs +++ b/crates/storage/provider/src/traits/rocksdb_provider.rs @@ -34,19 +34,36 @@ pub trait RocksDBProviderFactory { /// Unlike a transaction-based approach, this works in both read-only and read-write /// modes since the snapshot provides a consistent view of the data at the time it /// was created. - fn with_rocksdb_snapshot(&self, f: F) -> ProviderResult + fn with_existing_or_new_rocksdb_snapshot( + &self, + snapshot: RocksDBRefArg<'_>, + f: F, + ) -> ProviderResult where Self: StorageSettingsCache, - F: FnOnce(RocksDBRefArg) -> ProviderResult, + F: for<'a> FnOnce(RocksDBRefArg<'a>) -> ProviderResult, { if self.cached_storage_settings().storage_v2 { + if let Some(snapshot) = snapshot { + return f(Some(snapshot)); + } + let rocksdb = self.rocksdb_provider(); let snapshot = rocksdb.snapshot(); - return f(Some(snapshot)); + return f(Some(&snapshot)); } f(None) } + /// Executes a closure with a `RocksDB` point-in-time snapshot for consistent reads. + fn with_rocksdb_snapshot(&self, f: F) -> ProviderResult + where + Self: StorageSettingsCache, + F: for<'a> FnOnce(RocksDBRefArg<'a>) -> ProviderResult, + { + self.with_existing_or_new_rocksdb_snapshot(None, f) + } + /// Executes a closure with a `RocksDB` batch, automatically registering it for commit. /// /// This helper encapsulates all the cfg-gated `RocksDB` batch handling. From b5b03bbaefdbc31c96cb0eb5538e46554d7349f0 Mon Sep 17 00:00:00 2001 From: joshieDo <93316087+joshieDo@users.noreply.github.com> Date: Tue, 31 Mar 2026 16:16:08 +0100 Subject: [PATCH 5/9] docs(storage): clarify RocksDB snapshot reuse helper Amp-Thread-ID: https://ampcode.com/threads/T-019d4435-3bfe-7739-b47f-fbcf6473a16a Co-authored-by: Amp --- .../storage/provider/src/traits/rocksdb_provider.rs | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/crates/storage/provider/src/traits/rocksdb_provider.rs b/crates/storage/provider/src/traits/rocksdb_provider.rs index 511d5cb0829..ed571a00948 100644 --- a/crates/storage/provider/src/traits/rocksdb_provider.rs +++ b/crates/storage/provider/src/traits/rocksdb_provider.rs @@ -25,15 +25,14 @@ pub trait RocksDBProviderFactory { /// full commit path. fn commit_pending_rocksdb_batches(&self) -> ProviderResult<()>; - /// Executes a closure with a `RocksDB` point-in-time snapshot for consistent reads. + /// Executes a closure with either an existing `RocksDB` snapshot or a newly created one. /// - /// This helper encapsulates `RocksDB` access for read operations. - /// On legacy MDBX-only nodes (where `storage_v2` is false), this skips creating - /// the `RocksDB` snapshot entirely, avoiding unnecessary overhead. + /// On `storage_v2` nodes this reuses the provided snapshot when one is available, otherwise it + /// creates a fresh point-in-time snapshot for the duration of the closure. On legacy MDBX-only + /// nodes (where `storage_v2` is false), it skips `RocksDB` access entirely and passes `None`. /// - /// Unlike a transaction-based approach, this works in both read-only and read-write - /// modes since the snapshot provides a consistent view of the data at the time it - /// was created. + /// Unlike a transaction-based approach, snapshots work in both read-only and read-write modes + /// because they provide a consistent view of the database at the moment they were created. fn with_existing_or_new_rocksdb_snapshot( &self, snapshot: RocksDBRefArg<'_>, From 2ddbfc7e239cd0cf9e98219b69c6abe31e3254de Mon Sep 17 00:00:00 2001 From: joshieDo <93316087+joshieDo@users.noreply.github.com> Date: Tue, 31 Mar 2026 16:32:42 +0100 Subject: [PATCH 6/9] fix(storage): satisfy clippy for pinned snapshots Amp-Thread-ID: https://ampcode.com/threads/T-019d4435-3bfe-7739-b47f-fbcf6473a16a Co-authored-by: Amp --- crates/storage/provider/src/providers/database/mod.rs | 6 +++--- .../provider/src/providers/database/provider.rs | 6 +++--- .../provider/src/providers/state/historical.rs | 11 +++++++---- 3 files changed, 13 insertions(+), 10 deletions(-) diff --git a/crates/storage/provider/src/providers/database/mod.rs b/crates/storage/provider/src/providers/database/mod.rs index 73a7a356f12..c96cd45a6a4 100644 --- a/crates/storage/provider/src/providers/database/mod.rs +++ b/crates/storage/provider/src/providers/database/mod.rs @@ -239,12 +239,12 @@ impl> ProviderFactory { } impl ProviderFactory { - /// Opens a read-only MDBX transaction and, for `storage_v2`, pins a matching RocksDB + /// Opens a read-only MDBX transaction and, for `storage_v2`, pins a matching `RocksDB` /// snapshot. /// /// The helper retries if an MDBX write commit lands between opening the read transaction and - /// taking the RocksDB snapshot. This keeps both stores aligned for historical reads that span - /// MDBX state and RocksDB history indices. + /// taking the `RocksDB` snapshot. This keeps both stores aligned for historical reads that + /// span MDBX state and `RocksDB` history indices. fn read_tx_and_rocksdb_snapshot( &self, ) -> ProviderResult<(::TX, Option)> { diff --git a/crates/storage/provider/src/providers/database/provider.rs b/crates/storage/provider/src/providers/database/provider.rs index ffdb49e059f..3cb1ea7e6f8 100644 --- a/crates/storage/provider/src/providers/database/provider.rs +++ b/crates/storage/provider/src/providers/database/provider.rs @@ -212,7 +212,7 @@ pub struct DatabaseProvider { minimum_pruning_distance: u64, /// Database provider metrics metrics: metrics::DatabaseProviderMetrics, - /// Pinned RocksDB snapshot taken alongside the MDBX read transaction. + /// Pinned `RocksDB` snapshot taken alongside the MDBX read transaction. pinned_rocksdb_snapshot: Option, } @@ -253,11 +253,11 @@ impl DatabaseProvider { self } - pub(crate) fn pinned_rocksdb_snapshot(&self) -> Option<&RocksReadSnapshot> { + pub(crate) const fn pinned_rocksdb_snapshot(&self) -> Option<&RocksReadSnapshot> { self.pinned_rocksdb_snapshot.as_ref() } - fn take_pinned_rocksdb_snapshot(&mut self) -> Option { + const fn take_pinned_rocksdb_snapshot(&mut self) -> Option { self.pinned_rocksdb_snapshot.take() } } diff --git a/crates/storage/provider/src/providers/state/historical.rs b/crates/storage/provider/src/providers/state/historical.rs index 980f5ea5dc4..14855755f0f 100644 --- a/crates/storage/provider/src/providers/state/historical.rs +++ b/crates/storage/provider/src/providers/state/historical.rs @@ -128,7 +128,7 @@ pub struct HistoricalStateProviderRef<'b, Provider> { block_number: BlockNumber, /// Lowest blocks at which different parts of the state are available. lowest_available_blocks: LowestAvailableBlocks, - /// Pinned RocksDB snapshot taken alongside the MDBX read transaction. + /// Pinned `RocksDB` snapshot taken alongside the MDBX read transaction. pinned_rocksdb_snapshot: Option<&'b RocksReadSnapshot>, } @@ -155,7 +155,10 @@ impl<'b, Provider: DBProvider + ChangeSetReader + StorageChangeSetReader + Block Self { provider, block_number, lowest_available_blocks, pinned_rocksdb_snapshot: None } } - pub(crate) fn with_pinned_rocksdb_snapshot(mut self, snapshot: &'b RocksReadSnapshot) -> Self { + pub(crate) const fn with_pinned_rocksdb_snapshot( + mut self, + snapshot: &'b RocksReadSnapshot, + ) -> Self { self.pinned_rocksdb_snapshot = Some(snapshot); self } @@ -608,7 +611,7 @@ pub struct HistoricalStateProvider { block_number: BlockNumber, /// Lowest blocks at which different parts of the state are available. lowest_available_blocks: LowestAvailableBlocks, - /// Pinned RocksDB snapshot taken alongside the MDBX read transaction. + /// Pinned `RocksDB` snapshot taken alongside the MDBX read transaction. pinned_rocksdb_snapshot: Option, } @@ -650,7 +653,7 @@ impl HistoricalStateProviderRef<'_, Provider> { + const fn as_ref(&self) -> HistoricalStateProviderRef<'_, Provider> { let provider = HistoricalStateProviderRef::new_with_lowest_available_blocks( &self.provider, self.block_number, From f40af47318627ee2d4f0b42596ec6f3ddc236fcb Mon Sep 17 00:00:00 2001 From: joshieDo <93316087+joshieDo@users.noreply.github.com> Date: Tue, 31 Mar 2026 17:13:01 +0100 Subject: [PATCH 7/9] refactor(storage): centralize pinned RocksDB snapshot reuse Amp-Thread-ID: https://ampcode.com/threads/T-019d4435-3bfe-7739-b47f-fbcf6473a16a Co-authored-by: Amp --- .../src/providers/database/provider.rs | 22 ++--- .../src/providers/state/historical.rs | 88 +++++-------------- .../provider/src/traits/rocksdb_provider.rs | 71 ++++++++++----- 3 files changed, 80 insertions(+), 101 deletions(-) diff --git a/crates/storage/provider/src/providers/database/provider.rs b/crates/storage/provider/src/providers/database/provider.rs index 3cb1ea7e6f8..ab1328c4f9d 100644 --- a/crates/storage/provider/src/providers/database/provider.rs +++ b/crates/storage/provider/src/providers/database/provider.rs @@ -252,14 +252,6 @@ impl DatabaseProvider { self.pinned_rocksdb_snapshot = Some(snapshot); self } - - pub(crate) const fn pinned_rocksdb_snapshot(&self) -> Option<&RocksReadSnapshot> { - self.pinned_rocksdb_snapshot.as_ref() - } - - const fn take_pinned_rocksdb_snapshot(&mut self) -> Option { - self.pinned_rocksdb_snapshot.take() - } } impl DatabaseProvider { @@ -291,9 +283,6 @@ impl DatabaseProvider { self.get_prune_checkpoint(PruneSegment::StorageHistory)?; let mut state_provider = HistoricalStateProviderRef::new(self, block_number); - if let Some(snapshot) = self.pinned_rocksdb_snapshot() { - state_provider = state_provider.with_pinned_rocksdb_snapshot(snapshot); - } // If we pruned account or storage history, we can't return state on every historical block. // Instead, we should cap it at the latest prune checkpoint for corresponding prune segment. @@ -347,6 +336,10 @@ impl RocksDBProviderFactory for DatabaseProvider { self.rocksdb_provider.clone() } + fn pinned_rocksdb_snapshot(&self) -> Option<&RocksReadSnapshot> { + self.pinned_rocksdb_snapshot.as_ref() + } + fn set_pending_rocksdb_batch(&self, batch: rocksdb::WriteBatchWithTransaction) { self.pending_rocksdb_batches.lock().push(batch); } @@ -888,7 +881,7 @@ impl TryIntoHistoricalStateProvider for Databa self, mut block_number: BlockNumber, ) -> ProviderResult { - let mut provider = self; + let provider = self; let best_block = provider.best_block_number().unwrap_or_default(); // Reject requests for blocks beyond the best block @@ -912,12 +905,7 @@ impl TryIntoHistoricalStateProvider for Databa let storage_history_prune_checkpoint = provider.get_prune_checkpoint(PruneSegment::StorageHistory)?; - let pinned_rocksdb_snapshot = provider.take_pinned_rocksdb_snapshot(); - let mut state_provider = HistoricalStateProvider::new(provider, block_number); - if let Some(snapshot) = pinned_rocksdb_snapshot { - state_provider = state_provider.with_pinned_rocksdb_snapshot(snapshot); - } // If we pruned account or storage history, we can't return state on every historical block. // Instead, we should cap it at the latest prune checkpoint for corresponding prune segment. diff --git a/crates/storage/provider/src/providers/state/historical.rs b/crates/storage/provider/src/providers/state/historical.rs index 14855755f0f..ee2a85fae60 100644 --- a/crates/storage/provider/src/providers/state/historical.rs +++ b/crates/storage/provider/src/providers/state/historical.rs @@ -1,7 +1,6 @@ use crate::{ - providers::RocksReadSnapshot, AccountReader, BlockHashReader, ChangeSetReader, EitherReader, - HashedPostStateProvider, ProviderError, RocksDBProviderFactory, StateProvider, - StateRootProvider, + AccountReader, BlockHashReader, ChangeSetReader, EitherReader, HashedPostStateProvider, + ProviderError, RocksDBProviderFactory, StateProvider, StateRootProvider, }; use alloy_eips::merge::EPOCH_SLOTS; use alloy_primitives::{Address, BlockNumber, Bytes, StorageKey, StorageValue, B256}; @@ -128,8 +127,6 @@ pub struct HistoricalStateProviderRef<'b, Provider> { block_number: BlockNumber, /// Lowest blocks at which different parts of the state are available. lowest_available_blocks: LowestAvailableBlocks, - /// Pinned `RocksDB` snapshot taken alongside the MDBX read transaction. - pinned_rocksdb_snapshot: Option<&'b RocksReadSnapshot>, } impl<'b, Provider: DBProvider + ChangeSetReader + StorageChangeSetReader + BlockNumReader> @@ -137,12 +134,7 @@ impl<'b, Provider: DBProvider + ChangeSetReader + StorageChangeSetReader + Block { /// Create new `StateProvider` for historical block number pub fn new(provider: &'b Provider, block_number: BlockNumber) -> Self { - Self { - provider, - block_number, - lowest_available_blocks: Default::default(), - pinned_rocksdb_snapshot: None, - } + Self { provider, block_number, lowest_available_blocks: Default::default() } } /// Create new `StateProvider` for historical block number and lowest block numbers at which @@ -152,15 +144,7 @@ impl<'b, Provider: DBProvider + ChangeSetReader + StorageChangeSetReader + Block block_number: BlockNumber, lowest_available_blocks: LowestAvailableBlocks, ) -> Self { - Self { provider, block_number, lowest_available_blocks, pinned_rocksdb_snapshot: None } - } - - pub(crate) const fn with_pinned_rocksdb_snapshot( - mut self, - snapshot: &'b RocksReadSnapshot, - ) -> Self { - self.pinned_rocksdb_snapshot = Some(snapshot); - self + Self { provider, block_number, lowest_available_blocks } } /// Lookup an account in the `AccountsHistory` table using `EitherReader`. @@ -172,17 +156,14 @@ impl<'b, Provider: DBProvider + ChangeSetReader + StorageChangeSetReader + Block return Err(ProviderError::StateAtBlockPruned(self.block_number)) } - self.provider.with_existing_or_new_rocksdb_snapshot( - self.pinned_rocksdb_snapshot, - |rocksdb_ref| { - let mut reader = EitherReader::new_accounts_history(self.provider, rocksdb_ref)?; - reader.account_history_info( - address, - self.block_number, - self.lowest_available_blocks.account_history_block_number, - ) - }, - ) + self.provider.with_rocksdb_snapshot(|rocksdb_ref| { + let mut reader = EitherReader::new_accounts_history(self.provider, rocksdb_ref)?; + reader.account_history_info( + address, + self.block_number, + self.lowest_available_blocks.account_history_block_number, + ) + }) } /// Lookup a storage key in the `StoragesHistory` table using `EitherReader`. @@ -200,18 +181,15 @@ impl<'b, Provider: DBProvider + ChangeSetReader + StorageChangeSetReader + Block return Err(ProviderError::StateAtBlockPruned(self.block_number)) } - self.provider.with_existing_or_new_rocksdb_snapshot( - self.pinned_rocksdb_snapshot, - |rocksdb_ref| { - let mut reader = EitherReader::new_storages_history(self.provider, rocksdb_ref)?; - reader.storage_history_info( - address, - lookup_key, - self.block_number, - self.lowest_available_blocks.storage_history_block_number, - ) - }, - ) + self.provider.with_rocksdb_snapshot(|rocksdb_ref| { + let mut reader = EitherReader::new_storages_history(self.provider, rocksdb_ref)?; + reader.storage_history_info( + address, + lookup_key, + self.block_number, + self.lowest_available_blocks.storage_history_block_number, + ) + }) } /// Resolves a storage value by looking up the given key in history, changesets, or @@ -611,8 +589,6 @@ pub struct HistoricalStateProvider { block_number: BlockNumber, /// Lowest blocks at which different parts of the state are available. lowest_available_blocks: LowestAvailableBlocks, - /// Pinned `RocksDB` snapshot taken alongside the MDBX read transaction. - pinned_rocksdb_snapshot: Option, } impl @@ -620,17 +596,7 @@ impl Self { - Self { - provider, - block_number, - lowest_available_blocks: Default::default(), - pinned_rocksdb_snapshot: None, - } - } - - pub(crate) fn with_pinned_rocksdb_snapshot(mut self, snapshot: RocksReadSnapshot) -> Self { - self.pinned_rocksdb_snapshot = Some(snapshot); - self + Self { provider, block_number, lowest_available_blocks: Default::default() } } /// Set the lowest block number at which the account history is available. @@ -654,17 +620,11 @@ impl HistoricalStateProviderRef<'_, Provider> { - let provider = HistoricalStateProviderRef::new_with_lowest_available_blocks( + HistoricalStateProviderRef::new_with_lowest_available_blocks( &self.provider, self.block_number, self.lowest_available_blocks, - ); - - if let Some(snapshot) = self.pinned_rocksdb_snapshot.as_ref() { - provider.with_pinned_rocksdb_snapshot(snapshot) - } else { - provider - } + ) } } diff --git a/crates/storage/provider/src/traits/rocksdb_provider.rs b/crates/storage/provider/src/traits/rocksdb_provider.rs index ed571a00948..aff46ebf03f 100644 --- a/crates/storage/provider/src/traits/rocksdb_provider.rs +++ b/crates/storage/provider/src/traits/rocksdb_provider.rs @@ -1,6 +1,6 @@ use crate::{ either_writer::{RawRocksDBBatch, RocksBatchArg, RocksDBRefArg}, - providers::RocksDBProvider, + providers::{RocksDBProvider, RocksReadSnapshot}, }; use reth_storage_api::StorageSettingsCache; use reth_storage_errors::provider::ProviderResult; @@ -12,6 +12,14 @@ pub trait RocksDBProviderFactory { /// Returns the `RocksDB` provider. fn rocksdb_provider(&self) -> RocksDBProvider; + /// Returns a pinned `RocksDB` snapshot, if this provider already carries one. + /// + /// Read-only `storage_v2` providers use this to reuse the snapshot captured alongside the + /// MDBX read transaction. Other providers fall back to a fresh snapshot per read. + fn pinned_rocksdb_snapshot(&self) -> Option<&RocksReadSnapshot> { + None + } + /// Adds a pending `RocksDB` batch to be committed when this provider is committed. /// /// This allows deferring `RocksDB` commits to happen at the same time as MDBX and static file @@ -25,25 +33,22 @@ pub trait RocksDBProviderFactory { /// full commit path. fn commit_pending_rocksdb_batches(&self) -> ProviderResult<()>; - /// Executes a closure with either an existing `RocksDB` snapshot or a newly created one. + /// Executes a closure with a `RocksDB` point-in-time snapshot for consistent reads. /// - /// On `storage_v2` nodes this reuses the provided snapshot when one is available, otherwise it - /// creates a fresh point-in-time snapshot for the duration of the closure. On legacy MDBX-only - /// nodes (where `storage_v2` is false), it skips `RocksDB` access entirely and passes `None`. + /// On `storage_v2` nodes this reuses the provider's pinned snapshot when one is available, + /// otherwise it creates a fresh point-in-time snapshot for the duration of the closure. On + /// legacy MDBX-only nodes (where `storage_v2` is false), it skips `RocksDB` access entirely + /// and passes `None`. /// /// Unlike a transaction-based approach, snapshots work in both read-only and read-write modes /// because they provide a consistent view of the database at the moment they were created. - fn with_existing_or_new_rocksdb_snapshot( - &self, - snapshot: RocksDBRefArg<'_>, - f: F, - ) -> ProviderResult + fn with_rocksdb_snapshot(&self, f: F) -> ProviderResult where Self: StorageSettingsCache, F: for<'a> FnOnce(RocksDBRefArg<'a>) -> ProviderResult, { if self.cached_storage_settings().storage_v2 { - if let Some(snapshot) = snapshot { + if let Some(snapshot) = self.pinned_rocksdb_snapshot() { return f(Some(snapshot)); } @@ -54,15 +59,6 @@ pub trait RocksDBProviderFactory { f(None) } - /// Executes a closure with a `RocksDB` point-in-time snapshot for consistent reads. - fn with_rocksdb_snapshot(&self, f: F) -> ProviderResult - where - Self: StorageSettingsCache, - F: for<'a> FnOnce(RocksDBRefArg<'a>) -> ProviderResult, - { - self.with_existing_or_new_rocksdb_snapshot(None, f) - } - /// Executes a closure with a `RocksDB` batch, automatically registering it for commit. /// /// This helper encapsulates all the cfg-gated `RocksDB` batch handling. @@ -127,6 +123,7 @@ mod tests { struct TestProvider { settings: StorageSettings, mock_rocksdb: MockRocksDBProvider, + pinned_snapshot: Option, temp_dir: tempfile::TempDir, } @@ -135,10 +132,23 @@ mod tests { Self { settings, mock_rocksdb: MockRocksDBProvider::new(), + pinned_snapshot: None, temp_dir: tempfile::TempDir::new().unwrap(), } } + fn with_pinned_snapshot(settings: StorageSettings) -> Self { + let temp_dir = tempfile::TempDir::new().unwrap(); + let rocksdb = RocksDBProvider::new(temp_dir.path()).unwrap(); + + Self { + settings, + mock_rocksdb: MockRocksDBProvider::new(), + pinned_snapshot: Some(rocksdb.snapshot()), + temp_dir, + } + } + fn tx_call_count(&self) -> usize { self.mock_rocksdb.tx_call_count() } @@ -158,6 +168,10 @@ mod tests { RocksDBProvider::new(self.temp_dir.path()).unwrap() } + fn pinned_rocksdb_snapshot(&self) -> Option<&RocksReadSnapshot> { + self.pinned_snapshot.as_ref() + } + fn set_pending_rocksdb_batch(&self, _batch: rocksdb::WriteBatchWithTransaction) {} fn commit_pending_rocksdb_batches(&self) -> ProviderResult<()> { @@ -199,4 +213,21 @@ mod tests { "should create RocksDB provider when storage_v2 is true" ); } + + #[test] + fn test_rocksdb_settings_reuse_pinned_snapshot() { + let provider = TestProvider::with_pinned_snapshot(StorageSettings::v2()); + + let result = provider.with_rocksdb_snapshot(|rocksdb| { + assert!(rocksdb.is_some(), "rocksdb settings should pass Some snapshot"); + Ok(42) + }); + + assert_eq!(result.unwrap(), 42); + assert_eq!( + provider.tx_call_count(), + 0, + "should reuse the pinned RocksDB snapshot instead of creating a new provider" + ); + } } From ac99cbde47faa7e409c12a905edde8ad692c1c30 Mon Sep 17 00:00:00 2001 From: joshieDo <93316087+joshieDo@users.noreply.github.com> Date: Tue, 31 Mar 2026 17:16:31 +0100 Subject: [PATCH 8/9] refactor(storage): simplify pinned snapshot setup Amp-Thread-ID: https://ampcode.com/threads/T-019d4435-3bfe-7739-b47f-fbcf6473a16a Co-authored-by: Amp --- .../storage/provider/src/providers/database/mod.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/crates/storage/provider/src/providers/database/mod.rs b/crates/storage/provider/src/providers/database/mod.rs index c96cd45a6a4..f9635380e81 100644 --- a/crates/storage/provider/src/providers/database/mod.rs +++ b/crates/storage/provider/src/providers/database/mod.rs @@ -278,7 +278,7 @@ impl ProviderFactory { pub fn provider(&self) -> ProviderResult> { let (tx, pinned_rocksdb_snapshot) = self.read_tx_and_rocksdb_snapshot()?; - let mut provider = DatabaseProvider::new( + let provider = DatabaseProvider::new( tx, self.chain_spec.clone(), self.static_file_provider.clone(), @@ -292,11 +292,11 @@ impl ProviderFactory { ) .with_minimum_pruning_distance(self.minimum_pruning_distance); - if let Some(snapshot) = pinned_rocksdb_snapshot { - provider = provider.with_pinned_rocksdb_snapshot(snapshot); - } - - Ok(provider) + Ok(if let Some(snapshot) = pinned_rocksdb_snapshot { + provider.with_pinned_rocksdb_snapshot(snapshot) + } else { + provider + }) } /// Returns a provider with a created `DbTxMut` inside, which allows fetching and updating From c9d1fdf79708ec6737f0124b46cd71547fdaab0c Mon Sep 17 00:00:00 2001 From: joshieDo <93316087+joshieDo@users.noreply.github.com> Date: Tue, 31 Mar 2026 17:24:18 +0100 Subject: [PATCH 9/9] refactor(storage): trim redundant provider rebinding Amp-Thread-ID: https://ampcode.com/threads/T-019d4435-3bfe-7739-b47f-fbcf6473a16a Co-authored-by: Amp --- .../provider/src/providers/database/provider.rs | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/crates/storage/provider/src/providers/database/provider.rs b/crates/storage/provider/src/providers/database/provider.rs index ab1328c4f9d..0d42fc27f3e 100644 --- a/crates/storage/provider/src/providers/database/provider.rs +++ b/crates/storage/provider/src/providers/database/provider.rs @@ -881,8 +881,7 @@ impl TryIntoHistoricalStateProvider for Databa self, mut block_number: BlockNumber, ) -> ProviderResult { - let provider = self; - let best_block = provider.best_block_number().unwrap_or_default(); + let best_block = self.best_block_number().unwrap_or_default(); // Reject requests for blocks beyond the best block if block_number > best_block { @@ -894,18 +893,18 @@ impl TryIntoHistoricalStateProvider for Databa // If requesting state at the best block, use the latest state provider if block_number == best_block { - return Ok(Box::new(LatestStateProvider::new(provider))); + return Ok(Box::new(LatestStateProvider::new(self))); } // +1 as the changeset that we want is the one that was applied after this block. block_number += 1; let account_history_prune_checkpoint = - provider.get_prune_checkpoint(PruneSegment::AccountHistory)?; + self.get_prune_checkpoint(PruneSegment::AccountHistory)?; let storage_history_prune_checkpoint = - provider.get_prune_checkpoint(PruneSegment::StorageHistory)?; + self.get_prune_checkpoint(PruneSegment::StorageHistory)?; - let mut state_provider = HistoricalStateProvider::new(provider, block_number); + let mut state_provider = HistoricalStateProvider::new(self, block_number); // If we pruned account or storage history, we can't return state on every historical block. // Instead, we should cap it at the latest prune checkpoint for corresponding prune segment.