Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 16 additions & 29 deletions crates/stages/stages/src/stages/tx_lookup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@ use reth_db_api::{
use reth_etl::Collector;
use reth_primitives_traits::{NodePrimitives, SignedTransaction};
use reth_provider::{
BlockReader, DBProvider, EitherWriter, PruneCheckpointReader, PruneCheckpointWriter,
RocksDBProviderFactory, StaticFileProviderFactory, StatsReader, StorageSettingsCache,
TransactionsProvider, TransactionsProviderExt,
make_rocksdb_batch_arg, make_rocksdb_provider, register_rocksdb_batch, BlockReader, DBProvider,
EitherWriter, PruneCheckpointReader, PruneCheckpointWriter, RocksDBProviderFactory,
StaticFileProviderFactory, StatsReader, StorageSettingsCache, TransactionsProvider,
TransactionsProviderExt,
};
use reth_prune_types::{PruneCheckpoint, PruneMode, PrunePurpose, PruneSegment};
use reth_stages_api::{
Expand Down Expand Up @@ -158,15 +159,11 @@ where
let append_only =
provider.count_entries::<tables::TransactionHashNumbers>()?.is_zero();

// Create RocksDB batch if feature is enabled
#[cfg(all(unix, feature = "rocksdb"))]
let rocksdb = provider.rocksdb_provider();
#[cfg(all(unix, feature = "rocksdb"))]
let rocksdb_batch = rocksdb.batch();
#[cfg(not(all(unix, feature = "rocksdb")))]
let rocksdb_batch = ();

// Create writer that routes to either MDBX or RocksDB based on settings
#[allow(clippy::let_unit_value)]
let rocksdb = make_rocksdb_provider(provider);
#[allow(clippy::let_unit_value)]
let rocksdb_batch = make_rocksdb_batch_arg(&rocksdb);
let mut writer =
EitherWriter::new_transaction_hash_numbers(provider, rocksdb_batch)?;

Expand All @@ -187,11 +184,8 @@ where
writer.put_transaction_hash_number(hash, tx_num, append_only)?;
}

// Extract and register RocksDB batch for commit at provider level
#[cfg(all(unix, feature = "rocksdb"))]
if let Some(batch) = writer.into_raw_rocksdb_batch() {
provider.set_pending_rocksdb_batch(batch);
}
// Register RocksDB batch for commit at provider level
register_rocksdb_batch(provider, writer);

trace!(target: "sync::stages::transaction_lookup",
total_hashes,
Expand All @@ -217,15 +211,11 @@ where
) -> Result<UnwindOutput, StageError> {
let (range, unwind_to, _) = input.unwind_block_range_with_threshold(self.chunk_size);

// Create RocksDB batch if feature is enabled
#[cfg(all(unix, feature = "rocksdb"))]
let rocksdb = provider.rocksdb_provider();
#[cfg(all(unix, feature = "rocksdb"))]
let rocksdb_batch = rocksdb.batch();
#[cfg(not(all(unix, feature = "rocksdb")))]
let rocksdb_batch = ();

// Create writer that routes to either MDBX or RocksDB based on settings
#[allow(clippy::let_unit_value)]
let rocksdb = make_rocksdb_provider(provider);
#[allow(clippy::let_unit_value)]
let rocksdb_batch = make_rocksdb_batch_arg(&rocksdb);
let mut writer = EitherWriter::new_transaction_hash_numbers(provider, rocksdb_batch)?;

let static_file_provider = provider.static_file_provider();
Expand All @@ -248,11 +238,8 @@ where
}
}

// Extract and register RocksDB batch for commit at provider level
#[cfg(all(unix, feature = "rocksdb"))]
if let Some(batch) = writer.into_raw_rocksdb_batch() {
provider.set_pending_rocksdb_batch(batch);
}
// Register RocksDB batch for commit at provider level
register_rocksdb_batch(provider, writer);

Ok(UnwindOutput {
checkpoint: StageCheckpoint::new(unwind_to)
Expand Down
101 changes: 78 additions & 23 deletions crates/storage/db-common/src/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -765,20 +765,27 @@ mod tests {
};
use alloy_genesis::Genesis;
use reth_chainspec::{Chain, ChainSpec, HOLESKY, MAINNET, SEPOLIA};
use reth_db::DatabaseEnv;
use reth_db_api::{
cursor::DbCursorRO,
models::{storage_sharded_key::StorageShardedKey, IntegerList, ShardedKey},
table::{Table, TableRow},
transaction::DbTx,
Database,
tables,
};
use reth_provider::{
test_utils::{create_test_provider_factory_with_chain_spec, MockNodeTypesWithDB},
ProviderFactory, RocksDBProviderFactory,
};
use std::{collections::BTreeMap, sync::Arc};

#[cfg(not(feature = "edge"))]
use reth_db::DatabaseEnv;
#[cfg(not(feature = "edge"))]
use reth_db_api::{
cursor::DbCursorRO,
table::{Table, TableRow},
transaction::DbTx,
Database,
};

#[cfg(not(feature = "edge"))]
fn collect_table_entries<DB, T>(
tx: &<DB as Database>::TX,
) -> Result<Vec<TableRow<T>>, InitStorageError>
Expand Down Expand Up @@ -875,26 +882,74 @@ mod tests {
let factory = create_test_provider_factory_with_chain_spec(chain_spec);
init_genesis(&factory).unwrap();

let provider = factory.provider().unwrap();
// In edge mode, history indices are written to RocksDB instead of MDBX
#[cfg(feature = "edge")]
{
let rocksdb = factory.rocksdb_provider();

let tx = provider.tx_ref();
let account_history: Vec<_> = rocksdb
.iter::<tables::AccountsHistory>()
.expect("failed to iterate")
.collect::<Result<Vec<_>, _>>()
.expect("failed to collect");

assert_eq!(
collect_table_entries::<Arc<DatabaseEnv>, tables::AccountsHistory>(tx)
.expect("failed to collect"),
vec![
(ShardedKey::new(address_with_balance, u64::MAX), IntegerList::new([0]).unwrap()),
(ShardedKey::new(address_with_storage, u64::MAX), IntegerList::new([0]).unwrap())
],
);
assert_eq!(
account_history,
vec![
(
ShardedKey::new(address_with_balance, u64::MAX),
IntegerList::new([0]).unwrap()
),
(
ShardedKey::new(address_with_storage, u64::MAX),
IntegerList::new([0]).unwrap()
)
],
);

assert_eq!(
collect_table_entries::<Arc<DatabaseEnv>, tables::StoragesHistory>(tx)
.expect("failed to collect"),
vec![(
StorageShardedKey::new(address_with_storage, storage_key, u64::MAX),
IntegerList::new([0]).unwrap()
)],
);
let storage_history: Vec<_> = rocksdb
.iter::<tables::StoragesHistory>()
.expect("failed to iterate")
.collect::<Result<Vec<_>, _>>()
.expect("failed to collect");

assert_eq!(
storage_history,
vec![(
StorageShardedKey::new(address_with_storage, storage_key, u64::MAX),
IntegerList::new([0]).unwrap()
)],
);
}

#[cfg(not(feature = "edge"))]
{
let provider = factory.provider().unwrap();
let tx = provider.tx_ref();

assert_eq!(
collect_table_entries::<Arc<DatabaseEnv>, tables::AccountsHistory>(tx)
.expect("failed to collect"),
vec![
(
ShardedKey::new(address_with_balance, u64::MAX),
IntegerList::new([0]).unwrap()
),
(
ShardedKey::new(address_with_storage, u64::MAX),
IntegerList::new([0]).unwrap()
)
],
);

assert_eq!(
collect_table_entries::<Arc<DatabaseEnv>, tables::StoragesHistory>(tx)
.expect("failed to collect"),
vec![(
StorageShardedKey::new(address_with_storage, storage_key, u64::MAX),
IntegerList::new([0]).unwrap()
)],
);
}
}
}
Loading