diff --git a/crates/stages/stages/src/stages/tx_lookup.rs b/crates/stages/stages/src/stages/tx_lookup.rs
index 087a040f795..7a9455ecbb2 100644
--- a/crates/stages/stages/src/stages/tx_lookup.rs
+++ b/crates/stages/stages/src/stages/tx_lookup.rs
@@ -10,9 +10,10 @@ use reth_db_api::{
 use reth_etl::Collector;
 use reth_primitives_traits::{NodePrimitives, SignedTransaction};
 use reth_provider::{
-    BlockReader, DBProvider, EitherWriter, PruneCheckpointReader, PruneCheckpointWriter,
-    RocksDBProviderFactory, StaticFileProviderFactory, StatsReader, StorageSettingsCache,
-    TransactionsProvider, TransactionsProviderExt,
+    make_rocksdb_batch_arg, make_rocksdb_provider, register_rocksdb_batch, BlockReader, DBProvider,
+    EitherWriter, PruneCheckpointReader, PruneCheckpointWriter, RocksDBProviderFactory,
+    StaticFileProviderFactory, StatsReader, StorageSettingsCache, TransactionsProvider,
+    TransactionsProviderExt,
 };
 use reth_prune_types::{PruneCheckpoint, PruneMode, PrunePurpose, PruneSegment};
 use reth_stages_api::{
@@ -158,15 +159,11 @@ where
         let append_only = provider.count_entries::<tables::TransactionHashNumbers>()?.is_zero();
 
-        // Create RocksDB batch if feature is enabled
-        #[cfg(all(unix, feature = "rocksdb"))]
-        let rocksdb = provider.rocksdb_provider();
-        #[cfg(all(unix, feature = "rocksdb"))]
-        let rocksdb_batch = rocksdb.batch();
-        #[cfg(not(all(unix, feature = "rocksdb")))]
-        let rocksdb_batch = ();
 
-        // Create writer that routes to either MDBX or RocksDB based on settings
+        #[allow(clippy::let_unit_value)]
+        let rocksdb = make_rocksdb_provider(provider);
+        #[allow(clippy::let_unit_value)]
+        let rocksdb_batch = make_rocksdb_batch_arg(&rocksdb);
         let mut writer = EitherWriter::new_transaction_hash_numbers(provider, rocksdb_batch)?;
 
@@ -187,11 +184,8 @@ where
             writer.put_transaction_hash_number(hash, tx_num, append_only)?;
         }
 
-        // Extract and register RocksDB batch for commit at provider level
-        #[cfg(all(unix, feature = "rocksdb"))]
-        if let Some(batch) = writer.into_raw_rocksdb_batch() {
-            provider.set_pending_rocksdb_batch(batch);
-        }
+        // Register RocksDB batch for commit at provider level
+        register_rocksdb_batch(provider, writer);
 
         trace!(target: "sync::stages::transaction_lookup",
             total_hashes,
@@ -217,15 +211,11 @@ where
     ) -> Result<UnwindOutput, StageError> {
         let (range, unwind_to, _) = input.unwind_block_range_with_threshold(self.chunk_size);
 
-        // Create RocksDB batch if feature is enabled
-        #[cfg(all(unix, feature = "rocksdb"))]
-        let rocksdb = provider.rocksdb_provider();
-        #[cfg(all(unix, feature = "rocksdb"))]
-        let rocksdb_batch = rocksdb.batch();
-        #[cfg(not(all(unix, feature = "rocksdb")))]
-        let rocksdb_batch = ();
 
-        // Create writer that routes to either MDBX or RocksDB based on settings
+        #[allow(clippy::let_unit_value)]
+        let rocksdb = make_rocksdb_provider(provider);
+        #[allow(clippy::let_unit_value)]
+        let rocksdb_batch = make_rocksdb_batch_arg(&rocksdb);
         let mut writer = EitherWriter::new_transaction_hash_numbers(provider, rocksdb_batch)?;
 
         let static_file_provider = provider.static_file_provider();
@@ -248,11 +238,8 @@ where
             }
         }
 
-        // Extract and register RocksDB batch for commit at provider level
-        #[cfg(all(unix, feature = "rocksdb"))]
-        if let Some(batch) = writer.into_raw_rocksdb_batch() {
-            provider.set_pending_rocksdb_batch(batch);
-        }
+        // Register RocksDB batch for commit at provider level
+        register_rocksdb_batch(provider, writer);
 
         Ok(UnwindOutput {
             checkpoint: StageCheckpoint::new(unwind_to)
diff --git a/crates/storage/db-common/src/init.rs b/crates/storage/db-common/src/init.rs
index c82025970b7..63d500ac6fb 100644
--- a/crates/storage/db-common/src/init.rs
+++ b/crates/storage/db-common/src/init.rs
@@ -765,13 +765,9 @@ mod tests {
     };
    use alloy_genesis::Genesis;
     use reth_chainspec::{Chain, ChainSpec, HOLESKY, MAINNET, SEPOLIA};
-    use reth_db::DatabaseEnv;
     use reth_db_api::{
-        cursor::DbCursorRO,
         models::{storage_sharded_key::StorageShardedKey, IntegerList, ShardedKey},
-        table::{Table, TableRow},
-        transaction::DbTx,
-        Database,
+        tables,
     };
     use reth_provider::{
         test_utils::{create_test_provider_factory_with_chain_spec, MockNodeTypesWithDB},
@@ -779,6 +775,17 @@ mod tests {
     };
     use std::{collections::BTreeMap, sync::Arc};
 
+    #[cfg(not(feature = "edge"))]
+    use reth_db::DatabaseEnv;
+    #[cfg(not(feature = "edge"))]
+    use reth_db_api::{
+        cursor::DbCursorRO,
+        table::{Table, TableRow},
+        transaction::DbTx,
+        Database,
+    };
+
+    #[cfg(not(feature = "edge"))]
     fn collect_table_entries<DB, T>(
         tx: &<DB as Database>::TX,
     ) -> Result<Vec<TableRow<T>>, InitStorageError>
@@ -875,26 +882,74 @@ mod tests {
         let factory = create_test_provider_factory_with_chain_spec(chain_spec);
         init_genesis(&factory).unwrap();
 
-        let provider = factory.provider().unwrap();
+        // In edge mode, history indices are written to RocksDB instead of MDBX
+        #[cfg(feature = "edge")]
+        {
+            let rocksdb = factory.rocksdb_provider();
 
-        let tx = provider.tx_ref();
+            let account_history: Vec<_> = rocksdb
+                .iter::<tables::AccountsHistory>()
+                .expect("failed to iterate")
+                .collect::<Result<Vec<_>, _>>()
+                .expect("failed to collect");
 
-        assert_eq!(
-            collect_table_entries::<Arc<DatabaseEnv>, tables::AccountsHistory>(tx)
-                .expect("failed to collect"),
-            vec![
-                (ShardedKey::new(address_with_balance, u64::MAX), IntegerList::new([0]).unwrap()),
-                (ShardedKey::new(address_with_storage, u64::MAX), IntegerList::new([0]).unwrap())
-            ],
-        );
+            assert_eq!(
+                account_history,
+                vec![
+                    (
+                        ShardedKey::new(address_with_balance, u64::MAX),
+                        IntegerList::new([0]).unwrap()
+                    ),
+                    (
+                        ShardedKey::new(address_with_storage, u64::MAX),
+                        IntegerList::new([0]).unwrap()
+                    )
+                ],
+            );
 
-        assert_eq!(
-            collect_table_entries::<Arc<DatabaseEnv>, tables::StoragesHistory>(tx)
-                .expect("failed to collect"),
-            vec![(
-                StorageShardedKey::new(address_with_storage, storage_key, u64::MAX),
-                IntegerList::new([0]).unwrap()
-            )],
-        );
+            let storage_history: Vec<_> = rocksdb
+                .iter::<tables::StoragesHistory>()
+                .expect("failed to iterate")
+                .collect::<Result<Vec<_>, _>>()
+                .expect("failed to collect");
+
+            assert_eq!(
+                storage_history,
+                vec![(
+                    StorageShardedKey::new(address_with_storage, storage_key, u64::MAX),
+                    IntegerList::new([0]).unwrap()
+                )],
+            );
+        }
+
+        #[cfg(not(feature = "edge"))]
+        {
+            let provider = factory.provider().unwrap();
+            let tx = provider.tx_ref();
+
+            assert_eq!(
+                collect_table_entries::<Arc<DatabaseEnv>, tables::AccountsHistory>(tx)
+                    .expect("failed to collect"),
+                vec![
+                    (
+                        ShardedKey::new(address_with_balance, u64::MAX),
+                        IntegerList::new([0]).unwrap()
+                    ),
+                    (
+                        ShardedKey::new(address_with_storage, u64::MAX),
+                        IntegerList::new([0]).unwrap()
+                    )
+                ],
+            );
+
+            assert_eq!(
+                collect_table_entries::<Arc<DatabaseEnv>, tables::StoragesHistory>(tx)
+                    .expect("failed to collect"),
+                vec![(
+                    StorageShardedKey::new(address_with_storage, storage_key, u64::MAX),
+                    IntegerList::new([0]).unwrap()
+                )],
+            );
+        }
     }
 }
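The bodies of `make_rocksdb_provider`, `make_rocksdb_batch_arg`, and `register_rocksdb_batch` live in `reth_provider` and are not shown in this patch. As a rough illustration of the pattern the call sites above rely on, here is a minimal, self-contained sketch: toy types and hypothetical bodies, not the actual reth implementation, and it assumes a Cargo feature named `rocksdb` is declared for the crate. Each helper has a feature-gated variant and a unit-returning fallback, so the stage code keeps a single unconditional code path.

```rust
// Sketch only: Provider, Writer, RocksDb, and RocksBatch are stand-ins, and the
// helper bodies are assumptions inferred from the call sites in the diff above.

struct Provider;
struct Writer;
#[cfg(feature = "rocksdb")]
struct RocksDb;
#[cfg(feature = "rocksdb")]
struct RocksBatch;

#[cfg(feature = "rocksdb")]
impl Provider {
    fn rocksdb_provider(&self) -> RocksDb {
        RocksDb
    }
    fn set_pending_rocksdb_batch(&self, _batch: RocksBatch) {}
}

#[cfg(feature = "rocksdb")]
impl RocksDb {
    fn batch(&self) -> RocksBatch {
        RocksBatch
    }
}

#[cfg(feature = "rocksdb")]
impl Writer {
    fn into_raw_rocksdb_batch(self) -> Option<RocksBatch> {
        Some(RocksBatch)
    }
}

// With the feature on, hand out the RocksDB provider and a write batch; with it
// off, return unit values so call sites compile unchanged (hence the
// `#[allow(clippy::let_unit_value)]` at the call sites).
#[cfg(feature = "rocksdb")]
fn make_rocksdb_provider(provider: &Provider) -> RocksDb {
    provider.rocksdb_provider()
}
#[cfg(not(feature = "rocksdb"))]
fn make_rocksdb_provider(_provider: &Provider) {}

#[cfg(feature = "rocksdb")]
fn make_rocksdb_batch_arg(rocksdb: &RocksDb) -> Option<RocksBatch> {
    Some(rocksdb.batch())
}
#[cfg(not(feature = "rocksdb"))]
fn make_rocksdb_batch_arg(_rocksdb: &()) {}

// Pull the finished batch out of the writer and park it on the provider so it
// can be committed together with the MDBX transaction; a no-op without the feature.
#[cfg(feature = "rocksdb")]
fn register_rocksdb_batch(provider: &Provider, writer: Writer) {
    if let Some(batch) = writer.into_raw_rocksdb_batch() {
        provider.set_pending_rocksdb_batch(batch);
    }
}
#[cfg(not(feature = "rocksdb"))]
fn register_rocksdb_batch(_provider: &Provider, _writer: Writer) {}

fn main() {
    // The stage-side sequence is identical regardless of the feature flag.
    let provider = Provider;
    #[allow(clippy::let_unit_value)]
    let rocksdb = make_rocksdb_provider(&provider);
    #[allow(clippy::let_unit_value)]
    let _rocksdb_batch = make_rocksdb_batch_arg(&rocksdb);
    register_rocksdb_batch(&provider, Writer);
}
```

Keeping the provider handle and the batch as two separate bindings mirrors the old cfg-gated code, where the batch was created from a separately bound `rocksdb` handle; that reading is inferred from the call sites, not from the helper implementations themselves.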