Skip to content
4 changes: 2 additions & 2 deletions crates/storage/provider/src/either_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ pub type RawRocksDBBatch = rocksdb::WriteBatchWithTransaction<true>;
///
/// The `Option` allows callers to skip `RocksDB` access when it isn't needed
/// (e.g., on legacy MDBX-only nodes).
pub type RocksDBRefArg<'a> = Option<crate::providers::rocksdb::RocksReadSnapshot<'a>>;
pub type RocksDBRefArg<'a> = Option<&'a crate::providers::rocksdb::RocksReadSnapshot>;

/// Represents a destination for writing data, either to database, static files, or `RocksDB`.
#[derive(Debug, Display)]
Expand Down Expand Up @@ -673,7 +673,7 @@ pub enum EitherReader<'a, CURSOR, N> {
/// Read from static file
StaticFile(StaticFileProvider<N>, PhantomData<&'a ()>),
/// Read from `RocksDB` snapshot (works in both read-only and read-write modes)
RocksDB(crate::providers::rocksdb::RocksReadSnapshot<'a>),
RocksDB(&'a crate::providers::rocksdb::RocksReadSnapshot),
}

impl<'a> EitherReader<'a, (), ()> {
Expand Down
86 changes: 78 additions & 8 deletions crates/storage/provider/src/providers/database/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::{
providers::{
state::latest::LatestStateProvider, NodeTypesForProvider, RocksDBProvider,
StaticFileProvider, StaticFileProviderRWRefMut,
RocksReadSnapshot, StaticFileProvider, StaticFileProviderRWRefMut,
},
to_range,
traits::{BlockSource, ReceiptProvider},
Expand Down Expand Up @@ -239,6 +239,35 @@ impl<N: ProviderNodeTypes<DB = DatabaseEnv>> ProviderFactory<N> {
}

impl<N: ProviderNodeTypes> ProviderFactory<N> {
/// Opens a read-only MDBX transaction and, for `storage_v2`, pins a matching `RocksDB`
/// snapshot.
///
/// The helper retries if an MDBX write commit lands between opening the read transaction and
/// taking the `RocksDB` snapshot. This keeps both stores aligned for historical reads that
/// span MDBX state and `RocksDB` history indices.
Comment on lines +245 to +247
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to note that this:

  1. allows rocksdb to be ahead of mdbx, ie mdbx at block 7 and rocksdb up to block 10, and
  2. we need docs that outline what behavior is safe, for example the engine is safe because it creates txs sequentially, and we only ever perform SaveBlocks after we have closed a tx and added a block to the tree. I am not sure a single historical provider should outlive both a remove_blocks mdbx commit and a subsequent save_blocks

fn read_tx_and_rocksdb_snapshot(
&self,
) -> ProviderResult<(<N::DB as Database>::TX, Option<RocksReadSnapshot>)> {
if !self.cached_storage_settings().storage_v2 {
return Ok((self.db.tx()?, None));
}

loop {
let txnid_before = self.db.last_txnid();
let tx = self.db.tx()?;
let snapshot = self.rocksdb_provider.snapshot();

if self.db.last_txnid() == txnid_before {
return Ok((tx, Some(snapshot)));
}

tracing::debug!(
target: "providers::db",
"MDBX write commit detected while pinning RocksDB snapshot, retrying"
);
}
}

/// Returns a provider with a created `DbTx` inside, which allows fetching data from the
/// database using different types of providers. Example: [`HeaderProvider`]
/// [`BlockHashReader`]. This may fail if the inner read database transaction fails to open.
Expand All @@ -247,8 +276,10 @@ impl<N: ProviderNodeTypes> ProviderFactory<N> {
/// data.
#[track_caller]
pub fn provider(&self) -> ProviderResult<DatabaseProviderRO<N::DB, N>> {
Ok(DatabaseProvider::new(
self.db.tx()?,
let (tx, pinned_rocksdb_snapshot) = self.read_tx_and_rocksdb_snapshot()?;

let provider = DatabaseProvider::new(
tx,
self.chain_spec.clone(),
self.static_file_provider.clone(),
self.prune_modes.clone(),
Expand All @@ -259,7 +290,13 @@ impl<N: ProviderNodeTypes> ProviderFactory<N> {
self.runtime.clone(),
self.db.path(),
)
.with_minimum_pruning_distance(self.minimum_pruning_distance))
.with_minimum_pruning_distance(self.minimum_pruning_distance);

Ok(if let Some(snapshot) = pinned_rocksdb_snapshot {
provider.with_pinned_rocksdb_snapshot(snapshot)
} else {
provider
})
}

/// Returns a provider with a created `DbTxMut` inside, which allows fetching and updating
Expand Down Expand Up @@ -857,21 +894,22 @@ impl<N: NodeTypesWithDB> Clone for ProviderFactory<N> {
mod tests {
use super::*;
use crate::{
providers::{StaticFileProvider, StaticFileWriter},
providers::{HistoryInfo, StaticFileProvider, StaticFileWriter},
test_utils::{blocks::TEST_BLOCK, create_test_provider_factory, MockNodeTypesWithDB},
BlockHashReader, BlockNumReader, BlockWriter, DBProvider, HeaderSyncGapProvider,
TransactionsProvider,
RocksDBProviderFactory, TransactionsProvider,
};
use alloy_primitives::{TxNumber, B256};
use alloy_primitives::{Address, TxNumber, B256};
use assert_matches::assert_matches;
use reth_chainspec::ChainSpecBuilder;
use reth_db::{
mdbx::DatabaseArguments,
test_utils::{create_test_rocksdb_dir, create_test_static_files_dir, ERROR_TEMPDIR},
};
use reth_db_api::tables;
use reth_db_api::{models::StorageSettings, tables};
use reth_primitives_traits::SignerRecoverable;
use reth_prune_types::{PruneMode, PruneModes};
use reth_storage_api::StorageSettingsCache;
use reth_storage_errors::provider::ProviderError;
use reth_testing_utils::generators::{self, random_block, random_header, BlockParams};
use std::{ops::RangeInclusive, sync::Arc};
Expand Down Expand Up @@ -902,6 +940,38 @@ mod tests {
provider.block_hash(0).unwrap();
}

#[test]
fn provider_pins_rocksdb_snapshot_for_storage_v2() {
let factory = create_test_provider_factory();
factory.set_storage_settings_cache(StorageSettings::v2());

let address = Address::from([0x42; 20]);
let rocksdb = factory.rocksdb_provider();

let mut batch = rocksdb.batch();
batch.append_account_history_shard(address, vec![100]).unwrap();
batch.commit().unwrap();

let provider = factory.provider().unwrap();

let mut batch = rocksdb.batch();
batch.append_account_history_shard(address, vec![200]).unwrap();
batch.commit().unwrap();

assert_eq!(
provider
.pinned_rocksdb_snapshot()
.expect("storage_v2 providers pin a RocksDB snapshot")
.account_history_info(address, 150, None)
.unwrap(),
HistoryInfo::InPlainState
);
assert_eq!(
rocksdb.snapshot().account_history_info(address, 150, None).unwrap(),
HistoryInfo::InChangeset(200)
);
}

#[test]
fn provider_factory_with_database_path() {
let chain_spec = ChainSpecBuilder::mainnet().build();
Expand Down
16 changes: 15 additions & 1 deletion crates/storage/provider/src/providers/database/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::{
changesets_utils::StorageRevertsIter,
providers::{
database::{chain::ChainStorage, metrics},
rocksdb::{PendingRocksDBBatches, RocksDBProvider, RocksDBWriteCtx},
rocksdb::{PendingRocksDBBatches, RocksDBProvider, RocksDBWriteCtx, RocksReadSnapshot},
static_file::{StaticFileWriteCtx, StaticFileWriter},
NodeTypesForProvider, StaticFileProvider,
},
Expand Down Expand Up @@ -212,6 +212,8 @@ pub struct DatabaseProvider<TX, N: NodeTypes> {
minimum_pruning_distance: u64,
/// Database provider metrics
metrics: metrics::DatabaseProviderMetrics,
/// Pinned `RocksDB` snapshot taken alongside the MDBX read transaction.
pinned_rocksdb_snapshot: Option<RocksReadSnapshot>,
}

impl<TX: Debug, N: NodeTypes> Debug for DatabaseProvider<TX, N> {
Expand All @@ -229,6 +231,7 @@ impl<TX: Debug, N: NodeTypes> Debug for DatabaseProvider<TX, N> {
.field("pending_rocksdb_batches", &"<pending batches>")
.field("commit_order", &self.commit_order)
.field("minimum_pruning_distance", &self.minimum_pruning_distance)
.field("pinned_rocksdb_snapshot", &self.pinned_rocksdb_snapshot)
.finish()
}
}
Expand All @@ -244,6 +247,11 @@ impl<TX, N: NodeTypes> DatabaseProvider<TX, N> {
self.minimum_pruning_distance = distance;
self
}

pub(crate) fn with_pinned_rocksdb_snapshot(mut self, snapshot: RocksReadSnapshot) -> Self {
self.pinned_rocksdb_snapshot = Some(snapshot);
self
}
}

impl<TX: DbTx + 'static, N: NodeTypes> DatabaseProvider<TX, N> {
Expand Down Expand Up @@ -328,6 +336,10 @@ impl<TX, N: NodeTypes> RocksDBProviderFactory for DatabaseProvider<TX, N> {
self.rocksdb_provider.clone()
}

fn pinned_rocksdb_snapshot(&self) -> Option<&RocksReadSnapshot> {
self.pinned_rocksdb_snapshot.as_ref()
}

fn set_pending_rocksdb_batch(&self, batch: rocksdb::WriteBatchWithTransaction<true>) {
self.pending_rocksdb_batches.lock().push(batch);
}
Expand Down Expand Up @@ -382,6 +394,7 @@ impl<TX: DbTxMut, N: NodeTypes> DatabaseProvider<TX, N> {
commit_order,
minimum_pruning_distance: MINIMUM_UNWIND_SAFE_DISTANCE,
metrics: metrics::DatabaseProviderMetrics::default(),
pinned_rocksdb_snapshot: None,
}
}

Expand Down Expand Up @@ -1007,6 +1020,7 @@ impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
commit_order: CommitOrder::Normal,
minimum_pruning_distance: MINIMUM_UNWIND_SAFE_DISTANCE,
metrics: metrics::DatabaseProviderMetrics::default(),
pinned_rocksdb_snapshot: None,
}
}

Expand Down
54 changes: 46 additions & 8 deletions crates/storage/provider/src/providers/rocksdb/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -704,8 +704,17 @@ impl RocksDBProvider {
/// Returns a read-only, point-in-time snapshot of the database.
///
/// Lighter weight than [`RocksTx`] — no write-conflict tracking, and `Send + Sync`.
pub fn snapshot(&self) -> RocksReadSnapshot<'_> {
RocksReadSnapshot { inner: self.0.snapshot(), provider: self }
pub fn snapshot(&self) -> RocksReadSnapshot {
// SAFETY: the snapshot borrows the underlying RocksDB handle stored inside the provider's
// `Arc`. Cloning the provider keeps that allocation alive for the lifetime of the snapshot,
// and `RocksReadSnapshot` drops `inner` before `provider`.
let inner = unsafe {
std::mem::transmute::<RocksReadSnapshotInner<'_>, RocksReadSnapshotInner<'static>>(
self.0.snapshot(),
)
};

RocksReadSnapshot { inner, provider: self.clone() }
}

/// Creates a new transaction with MDBX-like semantics (read-your-writes, rollback).
Expand Down Expand Up @@ -1378,9 +1387,11 @@ impl RocksDBProvider {
/// used by [`EitherReader::RocksDB`](crate::either_writer::EitherReader) for history lookups.
///
/// Lighter weight than [`RocksTx`] — no transaction overhead, no write support.
pub struct RocksReadSnapshot<'db> {
inner: RocksReadSnapshotInner<'db>,
provider: &'db RocksDBProvider,
pub struct RocksReadSnapshot {
// `inner` must drop before `provider` so the borrowed RocksDB snapshot is released before the
// cloned provider keeping the DB alive goes away.
inner: RocksReadSnapshotInner<'static>,
provider: RocksDBProvider,
}

/// Inner enum to hold the snapshot for either read-write or read-only mode.
Expand All @@ -1401,17 +1412,17 @@ impl<'db> RocksReadSnapshotInner<'db> {
}
}

impl fmt::Debug for RocksReadSnapshot<'_> {
impl fmt::Debug for RocksReadSnapshot {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("RocksReadSnapshot")
.field("provider", &self.provider)
.finish_non_exhaustive()
}
}

impl<'db> RocksReadSnapshot<'db> {
impl RocksReadSnapshot {
/// Gets the column family handle for a table.
fn cf_handle<T: Table>(&self) -> Result<&'db rocksdb::ColumnFamily, DatabaseError> {
fn cf_handle<T: Table>(&self) -> Result<&rocksdb::ColumnFamily, DatabaseError> {
self.provider.get_cf_handle::<T>()
}

Expand Down Expand Up @@ -3036,6 +3047,33 @@ mod tests {
assert_eq!(result, HistoryInfo::InPlainState);
}

#[test]
fn test_snapshot_stays_pinned_after_later_writes() {
let temp_dir = TempDir::new().unwrap();
let provider = RocksDBBuilder::new(temp_dir.path()).with_default_tables().build().unwrap();

let address = Address::from([0x42; 20]);

let mut batch = provider.batch();
batch.append_account_history_shard(address, vec![100]).unwrap();
batch.commit().unwrap();

let pinned_snapshot = provider.snapshot();

let mut batch = provider.batch();
batch.append_account_history_shard(address, vec![200]).unwrap();
batch.commit().unwrap();

assert_eq!(
pinned_snapshot.account_history_info(address, 150, None).unwrap(),
HistoryInfo::InPlainState
);
assert_eq!(
provider.snapshot().account_history_info(address, 150, None).unwrap(),
HistoryInfo::InChangeset(200)
);
}

#[test]
fn test_account_history_shard_split_at_boundary() {
let temp_dir = TempDir::new().unwrap();
Expand Down
Loading
Loading