diff --git a/crates/interfaces/Cargo.toml b/crates/interfaces/Cargo.toml index aeb1b909c5f..a6d3c6b5b22 100644 --- a/crates/interfaces/Cargo.toml +++ b/crates/interfaces/Cargo.toml @@ -10,7 +10,7 @@ readme = "README.md" reth-codecs = { path = "../storage/codecs" } reth-primitives = { path = "../primitives" } reth-rpc-types = { path = "../rpc/rpc-types" } -reth-network-api = { path = "../net/network-api"} +reth-network-api = { path = "../net/network-api" } revm-primitives = "1.0" async-trait = "0.1.57" thiserror = "1.0.37" @@ -26,16 +26,24 @@ futures = "0.3.25" tokio-stream = "0.1.11" rand = "0.8.5" arbitrary = { version = "1.1.7", features = ["derive"], optional = true } -secp256k1 = { version = "0.24.2", default-features = false, features = ["alloc", "recovery", "rand"], optional = true } +secp256k1 = { version = "0.24.2", default-features = false, features = [ + "alloc", + "recovery", + "rand", +], optional = true } modular-bitfield = "0.11.2" [dev-dependencies] reth-db = { path = "../storage/db", features = ["test-utils"] } tokio = { version = "1.21.2", features = ["full"] } tokio-stream = { version = "0.1.11", features = ["sync"] } -arbitrary = { version = "1.1.7", features = ["derive"]} +arbitrary = { version = "1.1.7", features = ["derive"] } hex-literal = "0.3" -secp256k1 = { version = "0.24.2", default-features = false, features = ["alloc", "recovery", "rand"] } +secp256k1 = { version = "0.24.2", default-features = false, features = [ + "alloc", + "recovery", + "rand", +] } [features] bench = [] diff --git a/crates/interfaces/src/test_utils/generators.rs b/crates/interfaces/src/test_utils/generators.rs index 5e756e668da..7d5fd582a4b 100644 --- a/crates/interfaces/src/test_utils/generators.rs +++ b/crates/interfaces/src/test_utils/generators.rs @@ -1,9 +1,10 @@ -use rand::{distributions::uniform::SampleRange, thread_rng, Rng}; +use rand::{distributions::uniform::SampleRange, seq::SliceRandom, thread_rng, Rng}; use reth_primitives::{ - proofs, Account, Address, Bytes, Header, SealedBlock, SealedHeader, Signature, Transaction, - TransactionKind, TransactionSigned, TxLegacy, H160, H256, U256, + proofs, Account, Address, Bytes, Header, SealedBlock, SealedHeader, Signature, StorageEntry, + Transaction, TransactionKind, TransactionSigned, TxLegacy, H160, H256, U256, }; use secp256k1::{KeyPair, Message as SecpMessage, Secp256k1, SecretKey}; +use std::{collections::BTreeMap, ops::Sub}; // TODO(onbjerg): Maybe we should split this off to its own crate, or move the helpers to the // relevant crates? @@ -165,6 +166,115 @@ pub fn random_block_range( blocks } +type Transition = Vec<(Address, Account, Vec)>; +type AccountState = (Account, Vec); + +/// Generate a range of transitions for given blocks and accounts. +/// Assumes all accounts start with an empty storage. +/// +/// Returns a Vec of account and storage changes for each transition, +/// along with the final state of all accounts and storages. +pub fn random_transition_range<'a, IBlk, IAcc>( + blocks: IBlk, + accounts: IAcc, + n_changes: std::ops::Range, + key_range: std::ops::Range, +) -> (Vec, BTreeMap) +where + IBlk: IntoIterator, + IAcc: IntoIterator))>, +{ + let mut rng = rand::thread_rng(); + let mut state: BTreeMap<_, _> = accounts + .into_iter() + .map(|(addr, (acc, st))| (addr, (acc, st.into_iter().map(|e| (e.key, e.value)).collect()))) + .collect(); + + let valid_addresses = state.keys().copied().collect(); + + let num_transitions: usize = blocks.into_iter().map(|block| block.body.len()).sum(); + let mut transitions = Vec::with_capacity(num_transitions); + + (0..num_transitions).for_each(|i| { + let mut transition = Vec::new(); + let (from, to, mut transfer, new_entries) = + random_account_change(&valid_addresses, n_changes.clone(), key_range.clone()); + + // extract from sending account + let (prev_from, _) = state.get_mut(&from).unwrap(); + transition.push((from, *prev_from, Vec::new())); + + transfer = transfer.min(prev_from.balance).max(U256::from(1)); + prev_from.balance = prev_from.balance.wrapping_sub(transfer); + + // deposit in receiving account and update storage + let (prev_to, storage): &mut (Account, BTreeMap) = state.get_mut(&to).unwrap(); + + let old_entries = new_entries + .into_iter() + .filter_map(|entry| { + let old = if entry.value != U256::ZERO { + storage.insert(entry.key, entry.value) + } else { + let old = storage.remove(&entry.key); + if matches!(old, Some(U256::ZERO)) { + return None + } + old + }; + Some(StorageEntry { value: old.unwrap_or(U256::from(0)), ..entry }) + }) + .collect(); + + transition.push((to, *prev_to, old_entries)); + + prev_to.balance = prev_to.balance.wrapping_add(transfer); + + transitions.push(transition); + }); + + let final_state = state + .into_iter() + .map(|(addr, (acc, storage))| { + (addr, (acc, storage.into_iter().map(|v| v.into()).collect())) + }) + .collect(); + (transitions, final_state) +} + +/// Generate a random account change. +/// +/// Returns two addresses, a balance_change, and a Vec of new storage entries. +pub fn random_account_change( + valid_addresses: &Vec
, + n_changes: std::ops::Range, + key_range: std::ops::Range, +) -> (Address, Address, U256, Vec) { + let mut rng = rand::thread_rng(); + let mut addresses = valid_addresses.choose_multiple(&mut rng, 2).cloned(); + + let addr_from = addresses.next().unwrap_or_else(Address::random); + let addr_to = addresses.next().unwrap_or_else(Address::random); + + let balance_change = U256::from(rng.gen::()); + + let storage_changes = (0..n_changes.sample_single(&mut rng)) + .map(|_| random_storage_entry(key_range.clone())) + .collect(); + + (addr_from, addr_to, balance_change, storage_changes) +} + +/// Generate a random storage change. +pub fn random_storage_entry(key_range: std::ops::Range) -> StorageEntry { + let mut rng = rand::thread_rng(); + + let key = H256::from_low_u64_be(key_range.sample_single(&mut rng)); + let value = U256::from(rng.gen::()); + + StorageEntry { key, value } +} + /// Generate random Externaly Owned Account (EOA account without contract). pub fn random_eoa_account() -> (Address, Account) { let nonce: u64 = rand::random(); diff --git a/crates/primitives/src/storage.rs b/crates/primitives/src/storage.rs index f64554c88c4..67e027c34dc 100644 --- a/crates/primitives/src/storage.rs +++ b/crates/primitives/src/storage.rs @@ -12,6 +12,12 @@ pub struct StorageEntry { pub value: U256, } +impl From<(H256, U256)> for StorageEntry { + fn from((key, value): (H256, U256)) -> Self { + StorageEntry { key, value } + } +} + // NOTE: Removing main_codec and manually encode subkey // and compress second part of the value. If we have compression // over whole value (Even SubKey) that would mess up fetching of values with seek_by_key_subkey diff --git a/crates/stages/benches/criterion.rs b/crates/stages/benches/criterion.rs index 4556f18e5ce..46fb2cbdcfc 100644 --- a/crates/stages/benches/criterion.rs +++ b/crates/stages/benches/criterion.rs @@ -2,21 +2,36 @@ use criterion::{ async_executor::FuturesExecutor, criterion_group, criterion_main, measurement::WallTime, BenchmarkGroup, Criterion, }; -use reth_db::mdbx::{Env, WriteMap}; -use reth_primitives::H256; +use itertools::concat; +use reth_db::{ + cursor::DbCursorRO, + mdbx::{Env, WriteMap}, + tables, + transaction::{DbTx, DbTxMut}, +}; +use reth_interfaces::test_utils::generators::{ + random_block_range, random_contract_account_range, random_eoa_account_range, + random_transition_range, +}; +use reth_primitives::{Account, Address, SealedBlock, H256}; use reth_stages::{ - stages::{SenderRecoveryStage, TotalDifficultyStage, TransactionLookupStage}, + stages::{ + AccountHashingStage, MerkleStage, SenderRecoveryStage, StorageHashingStage, + TotalDifficultyStage, TransactionLookupStage, + }, test_utils::TestTransaction, - ExecInput, Stage, StageId, UnwindInput, + DBTrieLoader, ExecInput, Stage, StageId, UnwindInput, +}; +use std::{ + collections::BTreeMap, + path::{Path, PathBuf}, }; -use std::path::{Path, PathBuf}; -criterion_group!(benches, tx_lookup, senders, total_difficulty); +criterion_group!(benches, tx_lookup, senders, total_difficulty, merkle); criterion_main!(benches); fn senders(c: &mut Criterion) { let mut group = c.benchmark_group("Stages"); - // don't need to run each stage for that many times group.sample_size(10); @@ -25,19 +40,26 @@ fn senders(c: &mut Criterion) { let mut stage = SenderRecoveryStage::default(); stage.commit_threshold = num_blocks; let label = format!("SendersRecovery-batch-{batch}"); - measure_stage(&mut group, stage, num_blocks, label); + + measure_stage(&mut group, 0..num_blocks + 1, stage_unwind, stage, label); } } fn tx_lookup(c: &mut Criterion) { let mut group = c.benchmark_group("Stages"); - // don't need to run each stage for that many times group.sample_size(10); let num_blocks = 10_000; let stage = TransactionLookupStage::new(num_blocks); - measure_stage(&mut group, stage, num_blocks, "TransactionLookup".to_string()); + + measure_stage( + &mut group, + 0..num_blocks + 1, + stage_unwind, + stage, + "TransactionLookup".to_string(), + ); } fn total_difficulty(c: &mut Criterion) { @@ -49,34 +71,103 @@ fn total_difficulty(c: &mut Criterion) { let num_blocks = 10_000; let stage = TotalDifficultyStage::default(); - measure_stage(&mut group, stage, num_blocks, "TotalDifficulty".to_string()); + + measure_stage( + &mut group, + 0..num_blocks + 1, + stage_unwind, + stage, + "TotalDifficulty".to_string(), + ); +} + +fn merkle(c: &mut Criterion) { + let mut group = c.benchmark_group("Stages"); + // don't need to run each stage for that many times + group.sample_size(10); + + let num_blocks = 10_000; + + let stage = MerkleStage::Both { clean_threshold: num_blocks + 1 }; + measure_stage( + &mut group, + 1..num_blocks + 1, + unwind_hashes, + stage, + "Merkle-incremental".to_string(), + ); + + let stage = MerkleStage::Both { clean_threshold: 0 }; + measure_stage( + &mut group, + 1..num_blocks + 1, + unwind_hashes, + stage, + "Merkle-fullhash".to_string(), + ); +} + +fn stage_unwind>>( + stage: S, + tx: &TestTransaction, + _exec_input: ExecInput, +) { + tokio::runtime::Runtime::new().unwrap().block_on(async { + let mut stage = stage.clone(); + let mut db_tx = tx.inner(); + + // Clear previous run + stage.unwind(&mut db_tx, UnwindInput::default()).await.unwrap(); + + db_tx.commit().unwrap(); + }); +} + +fn unwind_hashes>>( + stage: S, + tx: &TestTransaction, + exec_input: ExecInput, +) { + tokio::runtime::Runtime::new().unwrap().block_on(async { + let mut stage = stage.clone(); + let mut db_tx = tx.inner(); + + StorageHashingStage::default().unwind(&mut db_tx, UnwindInput::default()).await.unwrap(); + AccountHashingStage::default().unwind(&mut db_tx, UnwindInput::default()).await.unwrap(); + + // Clear previous run + stage.unwind(&mut db_tx, UnwindInput::default()).await.unwrap(); + + AccountHashingStage::default().execute(&mut db_tx, exec_input).await.unwrap(); + StorageHashingStage::default().execute(&mut db_tx, exec_input).await.unwrap(); + + db_tx.commit().unwrap(); + }); } -fn measure_stage>>( +fn measure_stage( group: &mut BenchmarkGroup, + block_interval: std::ops::Range, + setup: F, stage: S, - num_blocks: u64, label: String, -) { - let path = txs_testdata(num_blocks as usize); +) where + S: Clone + Stage>, + F: Fn(S, &TestTransaction, ExecInput), +{ + let path = txs_testdata(block_interval.end - 1); let tx = TestTransaction::new(&path); - let mut input = ExecInput::default(); - input.previous_stage = Some((StageId("Another"), num_blocks)); + let input = ExecInput { + previous_stage: Some((StageId("Another"), block_interval.end - 1)), + stage_progress: Some(block_interval.start), + }; group.bench_function(label, move |b| { b.to_async(FuturesExecutor).iter_with_setup( || { // criterion setup does not support async, so we have to use our own runtime - tokio::runtime::Runtime::new().unwrap().block_on(async { - let mut stage = stage.clone(); - let mut db_tx = tx.inner(); - - // Clear previous run - stage.unwind(&mut db_tx, UnwindInput::default()).await.unwrap(); - - db_tx.commit().unwrap(); - }); + setup(stage.clone(), &tx, input) }, |_| async { let mut stage = stage.clone(); @@ -88,32 +179,76 @@ fn measure_stage>>( }); } -use reth_interfaces::test_utils::generators::random_block_range; - -// Helper for generating testdata for the sender recovery stage and tx lookup stages (512MB). -// Returns the path to the database file and the number of blocks written. -fn txs_testdata(num_blocks: usize) -> PathBuf { +// Helper for generating testdata for the benchmarks. +// Returns the path to the database file. +fn txs_testdata(num_blocks: u64) -> PathBuf { let path = Path::new(env!("CARGO_MANIFEST_DIR")).join("testdata").join("txs-bench"); let txs_range = 100..150; + // number of storage changes per transition + let n_changes = 0..3; + + // range of possible values for a storage key + let key_range = 0..300; + + // number of accounts + let n_eoa = 131; + let n_contract = 31; + if !path.exists() { // create the dirs std::fs::create_dir_all(&path).unwrap(); println!("Transactions testdata not found, generating to {:?}", path.display()); let tx = TestTransaction::new(&path); - // This takes a while because it does sig recovery internally - let blocks = random_block_range(0..num_blocks as u64 + 1, H256::zero(), txs_range); + let accounts: BTreeMap = concat([ + random_eoa_account_range(&mut (0..n_eoa)), + random_contract_account_range(&mut (0..n_contract)), + ]) + .into_iter() + .collect(); + + let mut blocks = random_block_range(0..num_blocks + 1, H256::zero(), txs_range); + + let (transitions, start_state) = random_transition_range( + blocks.iter().take(2), + accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))), + n_changes.clone(), + key_range.clone(), + ); + + tx.insert_accounts_and_storages(start_state.clone()).unwrap(); + + // make first block after genesis have valid state root + let root = DBTrieLoader::default().calculate_root(&tx.inner()).unwrap(); + let second_block = blocks.get_mut(1).unwrap(); + let cloned_second = second_block.clone(); + let mut updated_header = cloned_second.header.unseal(); + updated_header.state_root = root; + *second_block = SealedBlock { header: updated_header.seal(), ..cloned_second }; + + let offset = transitions.len() as u64; + + tx.insert_transitions(transitions, None).unwrap(); + + let (transitions, final_state) = + random_transition_range(blocks.iter().skip(2), start_state, n_changes, key_range); + + tx.insert_transitions(transitions, Some(offset)).unwrap(); + + tx.insert_accounts_and_storages(final_state).unwrap(); + + // make last block have valid state root + let root = DBTrieLoader::default().calculate_root(&tx.inner()).unwrap(); + let last_block = blocks.last_mut().unwrap(); + let cloned_last = last_block.clone(); + let mut updated_header = cloned_last.header.unseal(); + updated_header.state_root = root; + *last_block = SealedBlock { header: updated_header.seal(), ..cloned_last }; - // insert all blocks tx.insert_blocks(blocks.iter(), None).unwrap(); - // // initialize TD - use reth_db::{ - cursor::DbCursorRO, - tables, - transaction::{DbTx, DbTxMut}, - }; + // initialize TD tx.commit(|tx| { let (head, _) = tx.cursor_read::()?.first()?.unwrap_or_default(); tx.put::(head, reth_primitives::U256::from(0).into()) diff --git a/crates/stages/src/lib.rs b/crates/stages/src/lib.rs index 139bc314d1c..5fdff2fa35c 100644 --- a/crates/stages/src/lib.rs +++ b/crates/stages/src/lib.rs @@ -60,6 +60,8 @@ mod util; #[cfg(any(test, feature = "test-utils"))] pub mod test_utils; +pub use trie::DBTrieLoader; + /// A re-export of common structs and traits. pub mod prelude; diff --git a/crates/stages/src/stages/merkle.rs b/crates/stages/src/stages/merkle.rs index d86470bff89..4c942cc99d6 100644 --- a/crates/stages/src/stages/merkle.rs +++ b/crates/stages/src/stages/merkle.rs @@ -35,7 +35,7 @@ pub const MERKLE_UNWIND: StageId = StageId("MerkleUnwind"); /// - [`AccountHashingStage`][crate::stages::AccountHashingStage] /// - [`StorageHashingStage`][crate::stages::StorageHashingStage] /// - [`MerkleStage::Execution`] -#[derive(Debug)] +#[derive(Debug, Clone)] pub enum MerkleStage { /// The execution portion of the merkle stage. Execution { @@ -46,7 +46,9 @@ pub enum MerkleStage { /// The unwind portion of the merkle stage. Unwind, - #[cfg(test)] + /// Able to execute and unwind. Used for tests + #[cfg(any(test, feature = "test-utils"))] + #[allow(missing_docs)] Both { clean_threshold: u64 }, } @@ -69,7 +71,7 @@ impl Stage for MerkleStage { match self { MerkleStage::Execution { .. } => MERKLE_EXECUTION, MerkleStage::Unwind => MERKLE_UNWIND, - #[cfg(test)] + #[cfg(any(test, feature = "test-utils"))] MerkleStage::Both { .. } => unreachable!(), } } @@ -89,7 +91,7 @@ impl Stage for MerkleStage { }) } MerkleStage::Execution { clean_threshold } => *clean_threshold, - #[cfg(test)] + #[cfg(any(test, feature = "test-utils"))] MerkleStage::Both { clean_threshold } => *clean_threshold, }; @@ -175,12 +177,11 @@ mod tests { use assert_matches::assert_matches; use reth_db::{ cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW}, - models::{AccountBeforeTx, StoredBlockBody}, tables, transaction::{DbTx, DbTxMut}, }; use reth_interfaces::test_utils::generators::{ - random_block, random_block_range, random_contract_account_range, + random_block, random_block_range, random_contract_account_range, random_transition_range, }; use reth_primitives::{keccak256, Account, Address, SealedBlock, StorageEntry, H256, U256}; use std::collections::BTreeMap; @@ -276,12 +277,16 @@ mod tests { let end = input.previous_stage_progress() + 1; let n_accounts = 31; - let mut accounts = random_contract_account_range(&mut (0..n_accounts)); + let accounts = random_contract_account_range(&mut (0..n_accounts)) + .into_iter() + .collect::>(); let SealedBlock { header, body, ommers, withdrawals } = random_block(stage_progress, None, Some(0), None); let mut header = header.unseal(); - header.state_root = self.generate_initial_trie(&accounts)?; + + header.state_root = + self.generate_initial_trie(accounts.iter().map(|(k, v)| (*k, *v)))?; let sealed_head = SealedBlock { header: header.seal_slow(), body, ommers, withdrawals }; let head_hash = sealed_head.hash(); @@ -289,64 +294,18 @@ mod tests { blocks.extend(random_block_range((stage_progress + 1)..end, head_hash, 0..3)); - self.tx.insert_headers(blocks.iter().map(|block| &block.header))?; - - let (mut transition_id, mut tx_id) = (0, 0); - - let mut storages: BTreeMap> = BTreeMap::new(); - - for progress in blocks.iter() { - // Insert last progress data - self.tx.commit(|tx| { - let body = StoredBlockBody { - start_tx_id: tx_id, - tx_count: progress.body.len() as u64, - }; - - progress.body.iter().try_for_each(|transaction| { - tx.put::(transaction.hash(), tx_id)?; - tx.put::(tx_id, transaction.clone())?; - tx.put::(tx_id, transition_id)?; - - // seed account changeset - let (addr, prev_acc) = accounts - .get_mut(rand::random::() % n_accounts as usize) - .unwrap(); - let acc_before_tx = - AccountBeforeTx { address: *addr, info: Some(*prev_acc) }; - - tx.put::(transition_id, acc_before_tx)?; - - prev_acc.nonce += 1; - prev_acc.balance = prev_acc.balance.wrapping_add(U256::from(1)); + self.tx.insert_blocks(blocks.iter(), None)?; - let new_entry = StorageEntry { - key: keccak256([rand::random::()]), - value: U256::from(rand::random::() % 30 + 1), - }; - let storage = storages.entry(*addr).or_default(); - let old_value = storage.entry(new_entry.key).or_default(); + let (transitions, final_state) = random_transition_range( + blocks.iter(), + accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))), + 0..3, + 0..256, + ); - tx.put::( - (transition_id, *addr).into(), - StorageEntry { key: new_entry.key, value: *old_value }, - )?; + self.tx.insert_transitions(transitions, None)?; - *old_value = new_entry.value; - - tx_id += 1; - transition_id += 1; - - Ok(()) - })?; - - tx.put::(progress.number, transition_id)?; - tx.put::(progress.number, body) - })?; - } - - self.insert_accounts(&accounts)?; - self.insert_storages(&storages)?; + self.tx.insert_accounts_and_storages(final_state)?; let last_block_number = end - 1; let root = self.state_root()?; @@ -471,9 +430,11 @@ mod tests { pub(crate) fn generate_initial_trie( &self, - accounts: &[(Address, Account)], + accounts: impl IntoIterator, ) -> Result { - self.insert_accounts(accounts)?; + self.tx.insert_accounts_and_storages( + accounts.into_iter().map(|(addr, acc)| (addr, (acc, std::iter::empty()))), + )?; let loader = DBTrieLoader::default(); @@ -485,57 +446,6 @@ mod tests { Ok(root) } - pub(crate) fn insert_accounts( - &self, - accounts: &[(Address, Account)], - ) -> Result<(), TestRunnerError> { - for (addr, acc) in accounts.iter() { - self.tx.commit(|tx| { - tx.put::(*addr, *acc)?; - tx.put::(keccak256(addr), *acc)?; - Ok(()) - })?; - } - - Ok(()) - } - - fn insert_storages( - &self, - storages: &BTreeMap>, - ) -> Result<(), TestRunnerError> { - self.tx - .commit(|tx| { - storages.iter().try_for_each(|(&addr, storage)| { - storage.iter().try_for_each(|(&key, &value)| { - let entry = StorageEntry { key, value }; - tx.put::(addr, entry) - }) - })?; - storages - .iter() - .map(|(addr, storage)| { - ( - keccak256(addr), - storage - .iter() - .filter(|(_, &value)| value != U256::ZERO) - .map(|(key, value)| (keccak256(key), value)), - ) - }) - .collect::>() - .into_iter() - .try_for_each(|(addr, storage)| { - storage.into_iter().try_for_each(|(key, &value)| { - let entry = StorageEntry { key, value }; - tx.put::(addr, entry) - }) - })?; - Ok(()) - }) - .map_err(|e| e.into()) - } - fn check_root(&self, previous_stage_progress: u64) -> Result<(), TestRunnerError> { if previous_stage_progress != 0 { let block_root = diff --git a/crates/stages/src/test_utils/test_db.rs b/crates/stages/src/test_utils/test_db.rs index a040c651386..b32dec25e3f 100644 --- a/crates/stages/src/test_utils/test_db.rs +++ b/crates/stages/src/test_utils/test_db.rs @@ -5,15 +5,17 @@ use reth_db::{ tx::Tx, Env, EnvKind, WriteMap, RW, }, - models::StoredBlockBody, + models::{AccountBeforeTx, BlockNumHash, StoredBlockBody}, table::Table, tables, transaction::{DbTx, DbTxMut}, Error as DbError, }; -use reth_primitives::{BlockNumber, SealedBlock, SealedHeader, U256}; +use reth_primitives::{ + keccak256, Account, Address, BlockNumber, SealedBlock, SealedHeader, StorageEntry, H256, U256, +}; use reth_provider::Transaction; -use std::{borrow::Borrow, path::Path, sync::Arc}; +use std::{borrow::Borrow, collections::BTreeMap, path::Path, sync::Arc}; /// The [TestTransaction] is used as an internal /// database for testing stage implementation. @@ -177,23 +179,20 @@ impl TestTransaction { }) } + /// Inserts a single [SealedHeader] into the corresponding tables of the headers stage. + fn insert_header(tx: &mut Tx<'_, RW, WriteMap>, header: &SealedHeader) -> Result<(), DbError> { + tx.put::(header.number, header.hash())?; + tx.put::(header.hash(), header.number)?; + tx.put::(header.number, header.clone().unseal()) + } + /// Insert ordered collection of [SealedHeader] into the corresponding tables /// that are supposed to be populated by the headers stage. pub fn insert_headers<'a, I>(&self, headers: I) -> Result<(), DbError> where I: Iterator, { - self.commit(|tx| { - let headers = headers.collect::>(); - - for header in headers { - tx.put::(header.number, header.hash())?; - tx.put::(header.hash(), header.number)?; - tx.put::(header.number, header.clone().unseal())?; - } - - Ok(()) - }) + self.commit(|tx| headers.into_iter().try_for_each(|header| Self::insert_header(tx, header))) } /// Inserts total difficulty of headers into the corresponding tables. @@ -204,23 +203,19 @@ impl TestTransaction { I: Iterator, { self.commit(|tx| { - let headers = headers.collect::>(); - let mut td = U256::ZERO; - for header in headers { + headers.into_iter().try_for_each(|header| { + Self::insert_header(tx, header)?; td += header.difficulty; - tx.put::(header.number, td.into())?; - tx.put::(header.number, header.hash())?; - tx.put::(header.hash(), header.number)?; - tx.put::(header.number, header.clone().unseal())?; - } - - Ok(()) + tx.put::(header.number, td.into()) + }) }) } /// Insert ordered collection of [SealedBlock] into corresponding tables. /// Superset functionality of [TestTransaction::insert_headers]. + /// + /// Assumes that there's a single transition for each transaction (i.e. no block rewards). pub fn insert_blocks<'a, I>(&self, blocks: I, tx_offset: Option) -> Result<(), DbError> where I: Iterator, @@ -228,12 +223,8 @@ impl TestTransaction { self.commit(|tx| { let mut current_tx_id = tx_offset.unwrap_or_default(); - for block in blocks { - // Insert into header tables. - tx.put::(block.number, block.hash())?; - tx.put::(block.hash(), block.number)?; - tx.put::(block.number, block.header.clone().unseal())?; - + blocks.into_iter().try_for_each(|block| { + Self::insert_header(tx, &block.header)?; // Insert into body tables. tx.put::( block.number, @@ -242,13 +233,71 @@ impl TestTransaction { tx_count: block.body.len() as u64, }, )?; - for body_tx in block.body.clone() { - tx.put::(current_tx_id, body_tx)?; + block.body.iter().try_for_each(|body_tx| { + tx.put::(current_tx_id, current_tx_id)?; + tx.put::(current_tx_id, body_tx.clone())?; current_tx_id += 1; - } - } + Ok(()) + })?; + tx.put::(block.number, current_tx_id) + }) + }) + } - Ok(()) + /// Insert collection of ([Address], [Account]) into corresponding tables. + pub fn insert_accounts_and_storages(&self, accounts: I) -> Result<(), DbError> + where + I: IntoIterator, + S: IntoIterator, + { + self.commit(|tx| { + accounts.into_iter().try_for_each(|(address, (account, storage))| { + let hashed_address = keccak256(address); + + // Insert into account tables. + tx.put::(address, account)?; + tx.put::(hashed_address, account)?; + + // Insert into storage tables. + storage.into_iter().filter(|e| e.value != U256::ZERO).try_for_each(|entry| { + let hashed_entry = StorageEntry { key: keccak256(entry.key), ..entry }; + + tx.put::(address, entry)?; + tx.put::(hashed_address, hashed_entry) + }) + }) + }) + } + + /// Insert collection of Vec<([Address], [Account], Vec<[StorageEntry]>)> into + /// corresponding tables. + pub fn insert_transitions( + &self, + transitions: I, + transition_offset: Option, + ) -> Result<(), DbError> + where + I: IntoIterator)>>, + { + let offset = transition_offset.unwrap_or_default(); + self.commit(|tx| { + transitions.into_iter().enumerate().try_for_each(|(transition_id, changes)| { + changes.into_iter().try_for_each(|(address, old_account, old_storage)| { + let tid = offset + transition_id as u64; + // Insert into account changeset. + tx.put::( + tid, + AccountBeforeTx { address, info: Some(old_account) }, + )?; + + let tid_address = (tid, address).into(); + + // Insert into storage changeset. + old_storage.into_iter().try_for_each(|entry| { + tx.put::(tid_address, entry) + }) + }) + }) }) } } diff --git a/crates/stages/src/trie/mod.rs b/crates/stages/src/trie/mod.rs index d5f4cb94b80..e692cbece87 100644 --- a/crates/stages/src/trie/mod.rs +++ b/crates/stages/src/trie/mod.rs @@ -24,7 +24,7 @@ use std::{ use tracing::*; #[derive(Debug, thiserror::Error)] -pub(crate) enum TrieError { +pub enum TrieError { #[error("Some error occurred: {0}")] InternalError(#[from] cita_trie::TrieError), #[error("The root node wasn't found in the DB")] @@ -190,12 +190,14 @@ impl EthAccount { } } +/// Struct for calculating the root of a merkle patricia tree, +/// while populating the database with intermediate hashes. #[derive(Debug, Default)] -pub(crate) struct DBTrieLoader; +pub struct DBTrieLoader; impl DBTrieLoader { /// Calculates the root of the state trie, saving intermediate hashes in the database. - pub(crate) fn calculate_root( + pub fn calculate_root( &self, tx: &Transaction<'_, DB>, ) -> Result { @@ -255,7 +257,7 @@ impl DBTrieLoader { } /// Calculates the root of the state trie by updating an existing trie. - pub(crate) fn update_root( + pub fn update_root( &self, tx: &Transaction<'_, DB>, root: H256, diff --git a/crates/storage/db/benches/utils.rs b/crates/storage/db/benches/utils.rs index d82eb55ae7e..54b6877ed71 100644 --- a/crates/storage/db/benches/utils.rs +++ b/crates/storage/db/benches/utils.rs @@ -1,3 +1,4 @@ +#[allow(unused_imports)] use reth_db::{ cursor::{DbCursorRO, DbCursorRW}, database::Database, @@ -9,12 +10,15 @@ use reth_db::{ use std::path::Path; /// Path where the DB is initialized for benchmarks. +#[allow(unused)] const BENCH_DB_PATH: &str = "/tmp/reth-benches"; /// Used for RandomRead and RandomWrite benchmarks. +#[allow(unused)] const RANDOM_INDEXES: [usize; 10] = [23, 2, 42, 5, 3, 99, 54, 0, 33, 64]; /// Returns bench vectors in the format: `Vec<(Key, EncodedKey, Value, CompressedValue)>`. +#[allow(unused)] fn load_vectors() -> Vec<(T::Key, bytes::Bytes, T::Value, bytes::Bytes)> where T: Default, @@ -44,6 +48,7 @@ where } /// Sets up a clear database at `bench_db_path`. +#[allow(unused)] fn set_up_db( bench_db_path: &Path, pair: &Vec<(::Key, bytes::Bytes, ::Value, bytes::Bytes)>,