Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
156 changes: 140 additions & 16 deletions crates/storage/provider/src/either_writer.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
//! Generic reader and writer abstractions for interacting with either database tables or static
//! files.

use std::ops::Range;
use std::{marker::PhantomData, ops::Range};

#[cfg(all(unix, feature = "rocksdb"))]
use crate::providers::rocksdb::RocksTx;
use crate::{
providers::{StaticFileProvider, StaticFileProviderRWRefMut},
StaticFileProviderFactory,
Expand All @@ -24,8 +26,8 @@ use reth_storage_errors::provider::ProviderResult;
use strum::{Display, EnumIs};

/// Type alias for [`EitherReader`] constructors.
type EitherReaderTy<P, T> =
EitherReader<CursorTy<<P as DBProvider>::Tx, T>, <P as NodePrimitivesProvider>::Primitives>;
type EitherReaderTy<'a, P, T> =
EitherReader<'a, CursorTy<<P as DBProvider>::Tx, T>, <P as NodePrimitivesProvider>::Primitives>;

/// Type alias for [`EitherWriter`] constructors.
type EitherWriterTy<'a, P, T> = EitherWriter<
Expand All @@ -34,13 +36,27 @@ type EitherWriterTy<'a, P, T> = EitherWriter<
<P as NodePrimitivesProvider>::Primitives,
>;

/// Represents a destination for writing data, either to database or static files.
// Helper type so constructors stay exported even when RocksDB feature is off.
#[cfg(all(unix, feature = "rocksdb"))]
type RocksTxArg<'a> = crate::providers::rocksdb::RocksTx<'a>;
#[cfg(not(all(unix, feature = "rocksdb")))]
type RocksTxArg<'a> = ();

#[cfg(all(unix, feature = "rocksdb"))]
type RocksTxRefArg<'a> = &'a crate::providers::rocksdb::RocksTx<'a>;
#[cfg(not(all(unix, feature = "rocksdb")))]
type RocksTxRefArg<'a> = ();

/// Represents a destination for writing data, either to database, static files, or `RocksDB`.
#[derive(Debug, Display)]
pub enum EitherWriter<'a, CURSOR, N> {
/// Write to database table via cursor
Database(CURSOR),
/// Write to static file
StaticFile(StaticFileProviderRWRefMut<'a, N>),
/// Write to `RocksDB` transaction
#[cfg(all(unix, feature = "rocksdb"))]
RocksDB(RocksTx<'a>),
}

impl<'a> EitherWriter<'a, (), ()> {
Expand Down Expand Up @@ -109,6 +125,42 @@ impl<'a> EitherWriter<'a, (), ()> {
))
}
}

/// Creates a new [`EitherWriter`] for storages history based on storage settings.
pub fn new_storages_history<P>(
provider: &P,
_rocksdb_tx: RocksTxArg<'a>,
) -> ProviderResult<EitherWriterTy<'a, P, tables::StoragesHistory>>
where
P: DBProvider + NodePrimitivesProvider + StorageSettingsCache,
P::Tx: DbTxMut,
{
#[cfg(all(unix, feature = "rocksdb"))]
if provider.cached_storage_settings().storages_history_in_rocksdb {
return Ok(EitherWriter::RocksDB(_rocksdb_tx));
}

Ok(EitherWriter::Database(provider.tx_ref().cursor_write::<tables::StoragesHistory>()?))
}

/// Creates a new [`EitherWriter`] for transaction hash numbers based on storage settings.
pub fn new_transaction_hash_numbers<P>(
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this what you mean by constructor? @Rjected

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep. I think we need to make sure this fn is not gated by cfg though

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done! Introduced type aliases (RocksTxArg, RocksTxRefArg) that resolve to RocksTx<'a> or &'a RocksTx<'a> when rocksdb is enabled, and () when disabled. The constructors are now always available - internal cfg blocks handle the routing.

provider: &P,
_rocksdb_tx: RocksTxArg<'a>,
) -> ProviderResult<EitherWriterTy<'a, P, tables::TransactionHashNumbers>>
where
P: DBProvider + NodePrimitivesProvider + StorageSettingsCache,
P::Tx: DbTxMut,
{
#[cfg(all(unix, feature = "rocksdb"))]
if provider.cached_storage_settings().transaction_hash_numbers_in_rocksdb {
return Ok(EitherWriter::RocksDB(_rocksdb_tx));
}

Ok(EitherWriter::Database(
provider.tx_ref().cursor_write::<tables::TransactionHashNumbers>()?,
))
}
}

impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N> {
Expand All @@ -119,6 +171,8 @@ impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N> {
match self {
Self::Database(_) => Ok(()),
Self::StaticFile(writer) => writer.increment_block(expected_block_number),
#[cfg(all(unix, feature = "rocksdb"))]
Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider),
}
}

Expand All @@ -132,6 +186,20 @@ impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N> {
match self {
Self::Database(_) => Ok(()),
Self::StaticFile(writer) => writer.ensure_at_block(block_number),
#[cfg(all(unix, feature = "rocksdb"))]
Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider),
}
}

/// Commits the `RocksDB` transaction if this is a `RocksDB` writer.
///
/// For [`Self::Database`] and [`Self::StaticFile`], this is a no-op as they use
/// different commit patterns (MDBX transaction commit, static file sync).
#[cfg(all(unix, feature = "rocksdb"))]
pub fn commit(self) -> ProviderResult<()> {
match self {
Self::Database(_) | Self::StaticFile(_) => Ok(()),
Self::RocksDB(tx) => tx.commit(),
}
}
}
Expand All @@ -146,6 +214,8 @@ where
match self {
Self::Database(cursor) => Ok(cursor.append(tx_num, receipt)?),
Self::StaticFile(writer) => writer.append_receipt(tx_num, receipt),
#[cfg(all(unix, feature = "rocksdb"))]
Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider),
}
}
}
Expand All @@ -159,6 +229,8 @@ where
match self {
Self::Database(cursor) => Ok(cursor.append(tx_num, sender)?),
Self::StaticFile(writer) => writer.append_transaction_sender(tx_num, sender),
#[cfg(all(unix, feature = "rocksdb"))]
Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider),
}
}

Expand All @@ -175,6 +247,8 @@ where
Ok(())
}
Self::StaticFile(writer) => writer.append_transaction_senders(senders),
#[cfg(all(unix, feature = "rocksdb"))]
Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider),
}
}

Expand Down Expand Up @@ -206,41 +280,87 @@ where

writer.prune_transaction_senders(to_delete, block)?;
}
#[cfg(all(unix, feature = "rocksdb"))]
Self::RocksDB(_) => return Err(ProviderError::UnsupportedProvider),
}

Ok(())
}
}

/// Represents a source for reading data, either from database or static files.
/// Represents a source for reading data, either from database, static files, or `RocksDB`.
#[derive(Debug, Display)]
pub enum EitherReader<CURSOR, N> {
pub enum EitherReader<'a, CURSOR, N> {
/// Read from database table via cursor
Database(CURSOR),
Database(CURSOR, PhantomData<&'a ()>),
/// Read from static file
StaticFile(StaticFileProvider<N>),
StaticFile(StaticFileProvider<N>, PhantomData<&'a ()>),
/// Read from `RocksDB` transaction
#[cfg(all(unix, feature = "rocksdb"))]
RocksDB(&'a crate::providers::rocksdb::RocksTx<'a>),
}

impl EitherReader<(), ()> {
impl<'a> EitherReader<'a, (), ()> {
/// Creates a new [`EitherReader`] for senders based on storage settings.
pub fn new_senders<P>(
provider: &P,
) -> ProviderResult<EitherReaderTy<P, tables::TransactionSenders>>
) -> ProviderResult<EitherReaderTy<'a, P, tables::TransactionSenders>>
where
P: DBProvider + NodePrimitivesProvider + StorageSettingsCache + StaticFileProviderFactory,
P::Tx: DbTx,
{
if EitherWriterDestination::senders(provider).is_static_file() {
Ok(EitherReader::StaticFile(provider.static_file_provider()))
Ok(EitherReader::StaticFile(provider.static_file_provider(), PhantomData))
} else {
Ok(EitherReader::Database(
provider.tx_ref().cursor_read::<tables::TransactionSenders>()?,
PhantomData,
))
}
}

/// Creates a new [`EitherReader`] for storages history based on storage settings.
pub fn new_storages_history<P>(
provider: &P,
_rocksdb_tx: RocksTxRefArg<'a>,
) -> ProviderResult<EitherReaderTy<'a, P, tables::StoragesHistory>>
where
P: DBProvider + NodePrimitivesProvider + StorageSettingsCache,
P::Tx: DbTx,
{
#[cfg(all(unix, feature = "rocksdb"))]
if provider.cached_storage_settings().storages_history_in_rocksdb {
return Ok(EitherReader::RocksDB(_rocksdb_tx));
}

Ok(EitherReader::Database(
provider.tx_ref().cursor_read::<tables::StoragesHistory>()?,
PhantomData,
))
}

/// Creates a new [`EitherReader`] for transaction hash numbers based on storage settings.
pub fn new_transaction_hash_numbers<P>(
provider: &P,
_rocksdb_tx: RocksTxRefArg<'a>,
) -> ProviderResult<EitherReaderTy<'a, P, tables::TransactionHashNumbers>>
where
P: DBProvider + NodePrimitivesProvider + StorageSettingsCache,
P::Tx: DbTx,
{
#[cfg(all(unix, feature = "rocksdb"))]
if provider.cached_storage_settings().transaction_hash_numbers_in_rocksdb {
return Ok(EitherReader::RocksDB(_rocksdb_tx));
}

Ok(EitherReader::Database(
provider.tx_ref().cursor_read::<tables::TransactionHashNumbers>()?,
PhantomData,
))
}
}

impl<CURSOR, N: NodePrimitives> EitherReader<CURSOR, N>
impl<CURSOR, N: NodePrimitives> EitherReader<'_, CURSOR, N>
where
CURSOR: DbCursorRO<tables::TransactionSenders>,
{
Expand All @@ -250,11 +370,11 @@ where
range: Range<TxNumber>,
) -> ProviderResult<HashMap<TxNumber, Address>> {
match self {
Self::Database(cursor) => cursor
Self::Database(cursor, _) => cursor
.walk_range(range)?
.map(|result| result.map_err(ProviderError::from))
.collect::<ProviderResult<HashMap<_, _>>>(),
Self::StaticFile(provider) => range
Self::StaticFile(provider, _) => range
.clone()
.zip(provider.fetch_range_iter(
StaticFileSegment::TransactionSenders,
Expand All @@ -266,6 +386,8 @@ where
Some(result.map(|sender| (tx_num, sender)))
})
.collect::<ProviderResult<HashMap<_, _>>>(),
#[cfg(all(unix, feature = "rocksdb"))]
Self::RocksDB(_) => Err(ProviderError::UnsupportedProvider),
}
}
}
Expand All @@ -277,6 +399,8 @@ pub enum EitherWriterDestination {
Database,
/// Write to static file
StaticFile,
/// Write to `RocksDB`
RocksDB,
}

impl EitherWriterDestination {
Expand Down Expand Up @@ -336,9 +460,9 @@ mod tests {
let provider = factory.database_provider_ro().unwrap();
let mut reader = EitherReader::new_senders(&provider).unwrap();
if transaction_senders_in_static_files {
assert!(matches!(reader, EitherReader::StaticFile(_)));
assert!(matches!(reader, EitherReader::StaticFile(_, _)));
} else {
assert!(matches!(reader, EitherReader::Database(_)));
assert!(matches!(reader, EitherReader::Database(_, _)));
}

assert_eq!(
Expand Down
4 changes: 2 additions & 2 deletions crates/storage/provider/src/providers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ pub use consistent::ConsistentProvider;
// RocksDB currently only supported on Unix platforms
// Windows support is planned for future releases
#[cfg(all(unix, feature = "rocksdb"))]
mod rocksdb;
pub(crate) mod rocksdb;
#[cfg(all(unix, feature = "rocksdb"))]
pub use rocksdb::{RocksDBBuilder, RocksDBProvider};
pub use rocksdb::{RocksDBBuilder, RocksDBProvider, RocksTx};

/// Helper trait to bound [`NodeTypes`] so that combined with database they satisfy
/// [`ProviderNodeTypes`].
Expand Down
2 changes: 1 addition & 1 deletion crates/storage/provider/src/providers/rocksdb/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@

mod metrics;
mod provider;
pub use provider::{RocksDBBuilder, RocksDBProvider};
pub use provider::{RocksDBBuilder, RocksDBProvider, RocksTx};
Loading
Loading