From 8f5ae026d2dd0fb4933610db9e527f68de1e3542 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s?= Date: Wed, 8 Feb 2023 15:31:26 -0300 Subject: [PATCH 01/34] Refactor insert_* to remove duplicate code Co-authored-by: lambdaclass-user --- crates/stages/src/test_utils/test_db.rs | 53 +++++++++---------------- 1 file changed, 18 insertions(+), 35 deletions(-) diff --git a/crates/stages/src/test_utils/test_db.rs b/crates/stages/src/test_utils/test_db.rs index 17734161719..a4e72c39b95 100644 --- a/crates/stages/src/test_utils/test_db.rs +++ b/crates/stages/src/test_utils/test_db.rs @@ -176,25 +176,22 @@ impl TestTransaction { }) } + /// Inserts a single [SealedHeader] into the corresponding tables of the headers stage. + fn insert_header(tx: &mut Tx<'_, RW, WriteMap>, header: &SealedHeader) -> Result<(), DbError> { + let key: BlockNumHash = header.num_hash().into(); + + tx.put::(header.number, header.hash())?; + tx.put::(header.hash(), header.number)?; + tx.put::(key, header.clone().unseal()) + } + /// Insert ordered collection of [SealedHeader] into the corresponding tables /// that are supposed to be populated by the headers stage. pub fn insert_headers<'a, I>(&self, headers: I) -> Result<(), DbError> where I: Iterator, { - self.commit(|tx| { - let headers = headers.collect::>(); - - for header in headers { - let key: BlockNumHash = header.num_hash().into(); - - tx.put::(header.number, header.hash())?; - tx.put::(header.hash(), header.number)?; - tx.put::(key, header.clone().unseal())?; - } - - Ok(()) - }) + self.commit(|tx| headers.into_iter().try_for_each(|header| Self::insert_header(tx, header))) } /// Inserts total difficulty of headers into the corresponding tables. @@ -205,20 +202,12 @@ impl TestTransaction { I: Iterator, { self.commit(|tx| { - let headers = headers.collect::>(); - let mut td = U256::ZERO; - for header in headers { - let key: BlockNumHash = header.num_hash().into(); - + headers.into_iter().try_for_each(|header| { + Self::insert_header(tx, header)?; td += header.difficulty; - tx.put::(header.num_hash().into(), td.into())?; - tx.put::(header.number, header.hash())?; - tx.put::(header.hash(), header.number)?; - tx.put::(key, header.clone().unseal())?; - } - - Ok(()) + tx.put::(header.num_hash().into(), td.into()) + }) }) } @@ -231,14 +220,9 @@ impl TestTransaction { self.commit(|tx| { let mut current_tx_id = tx_offset.unwrap_or_default(); - for block in blocks { + blocks.into_iter().try_for_each(|block| { + Self::insert_header(tx, &block.header)?; let key: BlockNumHash = block.num_hash().into(); - - // Insert into header tables. - tx.put::(block.number, block.hash())?; - tx.put::(block.hash(), block.number)?; - tx.put::(key, block.header.clone().unseal())?; - // Insert into body tables. tx.put::( key, @@ -251,9 +235,8 @@ impl TestTransaction { tx.put::(current_tx_id, body_tx)?; current_tx_id += 1; } - } - - Ok(()) + Ok(()) + }) }) } } From f35fbdc39b6d5a20c9dedf8acfc2631f53fcc6f8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s?= Date: Wed, 8 Feb 2023 19:00:21 -0300 Subject: [PATCH 02/34] Add benchmark for MerkleStage Co-authored-by: lambdaclass-user --- crates/stages/benches/criterion.rs | 124 +++++++++++++++++++++++++---- 1 file changed, 110 insertions(+), 14 deletions(-) diff --git a/crates/stages/benches/criterion.rs b/crates/stages/benches/criterion.rs index 4556f18e5ce..151df7b6ab5 100644 --- a/crates/stages/benches/criterion.rs +++ b/crates/stages/benches/criterion.rs @@ -2,10 +2,14 @@ use criterion::{ async_executor::FuturesExecutor, criterion_group, criterion_main, measurement::WallTime, BenchmarkGroup, Criterion, }; -use reth_db::mdbx::{Env, WriteMap}; -use reth_primitives::H256; +use itertools::concat; +use reth_db::{ + mdbx::{Env, WriteMap}, + transaction::DbTx, +}; +use reth_primitives::{StorageEntry, H256}; use reth_stages::{ - stages::{SenderRecoveryStage, TotalDifficultyStage, TransactionLookupStage}, + stages::{MerkleStage, SenderRecoveryStage, TotalDifficultyStage, TransactionLookupStage}, test_utils::TestTransaction, ExecInput, Stage, StageId, UnwindInput, }; @@ -16,28 +20,37 @@ criterion_main!(benches); fn senders(c: &mut Criterion) { let mut group = c.benchmark_group("Stages"); - + group.measurement_time(std::time::Duration::from_millis(2000)); + group.warm_up_time(std::time::Duration::from_millis(2000)); // don't need to run each stage for that many times group.sample_size(10); for batch in [1000usize, 10_000, 100_000, 250_000] { let num_blocks = 10_000; let mut stage = SenderRecoveryStage::default(); + stage.batch_size = batch; stage.commit_threshold = num_blocks; - let label = format!("SendersRecovery-batch-{batch}"); - measure_stage(&mut group, stage, num_blocks, label); + let label = format!("SendersRecovery-batch-{}", batch); + + let path = txs_testdata(num_blocks as usize); + + measure_stage(&mut group, stage, path, label); } } fn tx_lookup(c: &mut Criterion) { let mut group = c.benchmark_group("Stages"); - + group.measurement_time(std::time::Duration::from_millis(2000)); + group.warm_up_time(std::time::Duration::from_millis(2000)); // don't need to run each stage for that many times group.sample_size(10); let num_blocks = 10_000; let stage = TransactionLookupStage::new(num_blocks); - measure_stage(&mut group, stage, num_blocks, "TransactionLookup".to_string()); + + let path = txs_testdata(num_blocks as usize); + + measure_stage(&mut group, stage, path, "TransactionLookup".to_string()); } fn total_difficulty(c: &mut Criterion) { @@ -49,16 +62,38 @@ fn total_difficulty(c: &mut Criterion) { let num_blocks = 10_000; let stage = TotalDifficultyStage::default(); - measure_stage(&mut group, stage, num_blocks, "TotalDifficulty".to_string()); + + let path = txs_testdata(num_blocks); + + measure_stage(&mut group, stage, path, "TotalDifficulty".to_string()); +} + +fn merkle(c: &mut Criterion) { + let mut group = c.benchmark_group("Stages"); + group.measurement_time(std::time::Duration::from_secs(5)); + group.warm_up_time(std::time::Duration::from_secs(5)); + // don't need to run each stage for that many times + group.sample_size(10); + + let num_blocks = 10_000; + + let path = accs_testdata(num_blocks); + + let num_blocks = 10_000; + + let stage = MerkleStage::Both { clean_threshold: 10_001 }; + measure_stage(&mut group, stage, path, "MerkleStage-incremental".to_string()); + + let stage = MerkleStage::Both { clean_threshold: 0 }; + measure_stage(&mut group, stage, path, "MerkleStage-fullhash".to_string()); } fn measure_stage>>( group: &mut BenchmarkGroup, stage: S, - num_blocks: u64, + path: PathBuf, label: String, ) { - let path = txs_testdata(num_blocks as usize); let tx = TestTransaction::new(&path); let mut input = ExecInput::default(); @@ -81,14 +116,16 @@ fn measure_stage>>( |_| async { let mut stage = stage.clone(); let mut db_tx = tx.inner(); - stage.execute(&mut db_tx, input).await.unwrap(); + stage.execute(&mut db_tx, input.clone()).await.unwrap(); db_tx.commit().unwrap(); }, ) }); } -use reth_interfaces::test_utils::generators::random_block_range; +use reth_interfaces::test_utils::generators::{ + random_block_range, random_eoa_account_range, random_transition, +}; // Helper for generating testdata for the sender recovery stage and tx lookup stages (512MB). // Returns the path to the database file and the number of blocks written. @@ -115,7 +152,8 @@ fn txs_testdata(num_blocks: usize) -> PathBuf { transaction::{DbTx, DbTxMut}, }; tx.commit(|tx| { - let (head, _) = tx.cursor_read::()?.first()?.unwrap_or_default(); + let (head, _) = + tx.cursor_read::()?.first()?.unwrap_or_default().into(); tx.put::(head, reth_primitives::U256::from(0).into()) }) .unwrap(); @@ -123,3 +161,61 @@ fn txs_testdata(num_blocks: usize) -> PathBuf { path } + +fn accs_testdata(num_blocks: usize) -> PathBuf { + let path = Path::new(env!("CARGO_MANIFEST_DIR")).join("testdata").join("accs-bench"); + let txs_range = 100..150; + + let key_range = 0..300; + + let n_eoa = 131; + let n_contract = 31; + + if !path.exists() { + // create the dirs + std::fs::create_dir_all(&path).unwrap(); + println!("Transactions testdata not found, generating to {:?}", path.display()); + let tx = TestTransaction::new(&path); + + let accounts: BTreeMap<_, _> = concat([ + random_eoa_account_range(&mut (0..n_eoa)), + random_contract_account_range(&mut (0..n_contract)), + ]); + let addresses = accounts.iter().map(|(k, _)| *k).collect(); + + // insert starting accounts + tx.insert_accounts_and_storages( + accounts.iter().map(|(addr, acc)| (addr, (acc, std::iter::empty()))), + ); + + let blocks = random_block_range(0..num_blocks as u64 + 1, H256::zero(), txs_range); + + // insert all blocks + tx.insert_blocks(blocks.iter(), None).unwrap(); + + // TODO: insert account and storage changes + tx.commit(|tx| { + let storage_cursor = tx.cursor_dup_read::()?; + blocks.into_iter().try_for_each(|block| { + let (addr, new_entry) = random_transition(addresses, key_range); + let old_entry = storage_cursor + .seek_by_key_subkey(addr, new_entry.key)? + .and_then(|entry| { + if entry.key != new_entry.key { + None + } else { + storage_cursor.delete_current()?; + Some(entry) + } + }) + .unwrap_or(StorageEntry { value: U256::ZERO, ..new_entry }); + + Ok(()) + }) + }) + + // TODO: insert hashed accounts and storage + } + + path +} From 1f07aac3c3ef7f499ffa55d15081c4d736a64682 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s?= Date: Thu, 9 Feb 2023 18:20:23 -0300 Subject: [PATCH 03/34] Add insert_accounts_and_storages fn Co-authored-by: lambdaclass-user --- .../interfaces/src/test_utils/generators.rs | 26 ++++++- crates/stages/src/stages/merkle.rs | 72 +++++-------------- crates/stages/src/test_utils/test_db.rs | 31 +++++++- 3 files changed, 70 insertions(+), 59 deletions(-) diff --git a/crates/interfaces/src/test_utils/generators.rs b/crates/interfaces/src/test_utils/generators.rs index 0fc42312a19..f774a64440f 100644 --- a/crates/interfaces/src/test_utils/generators.rs +++ b/crates/interfaces/src/test_utils/generators.rs @@ -1,7 +1,7 @@ -use rand::{distributions::uniform::SampleRange, thread_rng, Rng}; +use rand::{distributions::uniform::SampleRange, seq::SliceRandom, thread_rng, Rng}; use reth_primitives::{ - proofs, Account, Address, Bytes, Header, SealedBlock, SealedHeader, Signature, Transaction, - TransactionKind, TransactionSigned, TxLegacy, H160, H256, U256, + proofs, Account, Address, Bytes, Header, SealedBlock, SealedHeader, Signature, StorageEntry, + Transaction, TransactionKind, TransactionSigned, TxLegacy, H160, H256, U256, }; use secp256k1::{KeyPair, Message as SecpMessage, Secp256k1, SecretKey}; @@ -164,6 +164,26 @@ pub fn random_block_range( blocks } +/// Generate a range of random blocks. +/// +/// The parent hash of the first block +/// in the result will be equal to `head`. +/// +/// See [random_block] for considerations when validating the generated blocks. +pub fn random_transition( + valid_addresses: &Vec
, + key_range: std::ops::Range, + value_range: std::ops::Range, +) -> (Address, U256, StorageEntry) { + let mut rng = rand::thread_rng(); + let address = + valid_addresses.choose(&mut rng).map_or_else(|| H160::random_using(&mut rng), |v| *v); + let transfer = U256::from(rng.gen::()); + let key = H256::from_low_u64_be(key_range.sample_single(&mut rng)); + let value = U256::from(value_range.sample_single(&mut rng)); + (address, transfer, StorageEntry { key, value }) +} + /// Generate random Externaly Owned Account (EOA account without contract). pub fn random_eoa_account() -> (Address, Account) { let nonce: u64 = rand::random(); diff --git a/crates/stages/src/stages/merkle.rs b/crates/stages/src/stages/merkle.rs index 0302d5edd3c..3d92e1782f1 100644 --- a/crates/stages/src/stages/merkle.rs +++ b/crates/stages/src/stages/merkle.rs @@ -172,6 +172,7 @@ mod tests { TestTransaction, UnwindStageTestRunner, PREV_STAGE_ID, }; use assert_matches::assert_matches; + use itertools::Itertools; use reth_db::{ cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW}, models::{AccountBeforeTx, BlockNumHash, StoredBlockBody}, @@ -346,8 +347,20 @@ mod tests { })?; } - self.insert_accounts(&accounts)?; - self.insert_storages(&storages)?; + self.tx.insert_accounts_and_storages(accounts.iter().map(|(addr, acc)| { + ( + *addr, + ( + *acc, + storages + .entry(*addr) + .or_default() + .iter() + .map(|(&key, &value)| StorageEntry { key, value }) + .collect_vec(), + ), + ) + }))?; let last_numhash = self.tx.inner().get_block_numhash(end - 1).unwrap(); let root = self.state_root()?; @@ -474,7 +487,9 @@ mod tests { &self, accounts: &[(Address, Account)], ) -> Result { - self.insert_accounts(accounts)?; + self.tx.insert_accounts_and_storages( + accounts.iter().map(|(addr, acc)| (*addr, (*acc, std::iter::empty()))), + )?; let loader = DBTrieLoader::default(); @@ -486,57 +501,6 @@ mod tests { Ok(root) } - pub(crate) fn insert_accounts( - &self, - accounts: &[(Address, Account)], - ) -> Result<(), TestRunnerError> { - for (addr, acc) in accounts.iter() { - self.tx.commit(|tx| { - tx.put::(*addr, *acc)?; - tx.put::(keccak256(addr), *acc)?; - Ok(()) - })?; - } - - Ok(()) - } - - fn insert_storages( - &self, - storages: &BTreeMap>, - ) -> Result<(), TestRunnerError> { - self.tx - .commit(|tx| { - storages.iter().try_for_each(|(&addr, storage)| { - storage.iter().try_for_each(|(&key, &value)| { - let entry = StorageEntry { key, value }; - tx.put::(addr, entry) - }) - })?; - storages - .iter() - .map(|(addr, storage)| { - ( - keccak256(addr), - storage - .iter() - .filter(|(_, &value)| value != U256::ZERO) - .map(|(key, value)| (keccak256(key), value)), - ) - }) - .collect::>() - .into_iter() - .try_for_each(|(addr, storage)| { - storage.into_iter().try_for_each(|(key, &value)| { - let entry = StorageEntry { key, value }; - tx.put::(addr, entry) - }) - })?; - Ok(()) - }) - .map_err(|e| e.into()) - } - fn check_root(&self, previous_stage_progress: u64) -> Result<(), TestRunnerError> { if previous_stage_progress != 0 { let block_root = diff --git a/crates/stages/src/test_utils/test_db.rs b/crates/stages/src/test_utils/test_db.rs index a4e72c39b95..981ceaf1f4a 100644 --- a/crates/stages/src/test_utils/test_db.rs +++ b/crates/stages/src/test_utils/test_db.rs @@ -11,8 +11,10 @@ use reth_db::{ transaction::{DbTx, DbTxMut}, Error as DbError, }; -use reth_primitives::{BlockNumber, SealedBlock, SealedHeader, U256}; -use std::{borrow::Borrow, path::Path, sync::Arc}; +use reth_primitives::{ + keccak256, Account, Address, BlockNumber, SealedBlock, SealedHeader, StorageEntry, H256, U256, +}; +use std::{borrow::Borrow, collections::BTreeMap, path::Path, sync::Arc}; use crate::db::Transaction; @@ -239,4 +241,29 @@ impl TestTransaction { }) }) } + + /// Insert collection of ([Address], [Account]) into corresponding tables. + pub fn insert_accounts_and_storages(&self, accounts: I) -> Result<(), DbError> + where + I: IntoIterator, + S: IntoIterator, + { + self.commit(|tx| { + accounts.into_iter().try_for_each(|(address, (account, storage))| { + let hashed_address = keccak256(address); + + // Insert into account tables. + tx.put::(address, account)?; + tx.put::(hashed_address, account)?; + + // Insert into storage tables. + storage.into_iter().filter(|e| e.value != U256::ZERO).try_for_each(|entry| { + let hashed_entry = StorageEntry { key: keccak256(entry.key), ..entry }; + + tx.put::(address, entry)?; + tx.put::(hashed_address, hashed_entry) + }) + }) + }) + } } From 7d42600ebf2dfe5824d219522e3fafb0dd3e2494 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s?= Date: Thu, 9 Feb 2023 18:34:13 -0300 Subject: [PATCH 04/34] Add random_transition to merkle_stage Co-authored-by: lambdaclass-user --- .../interfaces/src/test_utils/generators.rs | 4 +-- crates/stages/src/stages/merkle.rs | 31 +++++++++---------- 2 files changed, 17 insertions(+), 18 deletions(-) diff --git a/crates/interfaces/src/test_utils/generators.rs b/crates/interfaces/src/test_utils/generators.rs index f774a64440f..9eb07cd6b42 100644 --- a/crates/interfaces/src/test_utils/generators.rs +++ b/crates/interfaces/src/test_utils/generators.rs @@ -174,14 +174,14 @@ pub fn random_transition( valid_addresses: &Vec
, key_range: std::ops::Range, value_range: std::ops::Range, -) -> (Address, U256, StorageEntry) { +) -> (Address, StorageEntry) { let mut rng = rand::thread_rng(); let address = valid_addresses.choose(&mut rng).map_or_else(|| H160::random_using(&mut rng), |v| *v); let transfer = U256::from(rng.gen::()); let key = H256::from_low_u64_be(key_range.sample_single(&mut rng)); let value = U256::from(value_range.sample_single(&mut rng)); - (address, transfer, StorageEntry { key, value }) + (address, StorageEntry { key, value }) } /// Generate random Externaly Owned Account (EOA account without contract). diff --git a/crates/stages/src/stages/merkle.rs b/crates/stages/src/stages/merkle.rs index 3d92e1782f1..8f4782d7a70 100644 --- a/crates/stages/src/stages/merkle.rs +++ b/crates/stages/src/stages/merkle.rs @@ -180,7 +180,7 @@ mod tests { transaction::{DbTx, DbTxMut}, }; use reth_interfaces::test_utils::generators::{ - random_block, random_block_range, random_contract_account_range, + random_block, random_block_range, random_contract_account_range, random_transition, }; use reth_primitives::{keccak256, Account, Address, SealedBlock, StorageEntry, H256, U256}; use std::collections::BTreeMap; @@ -276,12 +276,16 @@ mod tests { let end = input.previous_stage_progress() + 1; let n_accounts = 31; - let mut accounts = random_contract_account_range(&mut (0..n_accounts)); + let mut accounts = random_contract_account_range(&mut (0..n_accounts)) + .into_iter() + .collect::>(); + let addresses = accounts.iter().map(|(addr, _)| *addr).collect_vec(); let SealedBlock { header, body, ommers } = random_block(stage_progress, None, Some(0), None); let mut header = header.unseal(); - header.state_root = self.generate_initial_trie(&accounts)?; + header.state_root = + self.generate_initial_trie(accounts.iter().map(|(k, v)| (*k, *v)))?; let sealed_head = SealedBlock { header: header.seal(), body, ommers }; let head_hash = sealed_head.hash(); @@ -310,27 +314,22 @@ mod tests { tx.put::(tx_id, transaction.clone())?; tx.put::(tx_id, transition_id)?; + let (address, new_entry) = random_transition(&addresses, 0..125, 1..125); + // seed account changeset - let (addr, prev_acc) = accounts - .get_mut(rand::random::() % n_accounts as usize) - .unwrap(); - let acc_before_tx = - AccountBeforeTx { address: *addr, info: Some(*prev_acc) }; + let prev_acc = accounts.get_mut(&address).unwrap(); + let acc_before_tx = AccountBeforeTx { address, info: Some(*prev_acc) }; tx.put::(transition_id, acc_before_tx)?; prev_acc.nonce += 1; prev_acc.balance = prev_acc.balance.wrapping_add(U256::from(1)); - let new_entry = StorageEntry { - key: keccak256([rand::random::()]), - value: U256::from(rand::random::() % 30 + 1), - }; - let storage = storages.entry(*addr).or_default(); + let storage = storages.entry(address).or_default(); let old_value = storage.entry(new_entry.key).or_default(); tx.put::( - (transition_id, *addr).into(), + (transition_id, address).into(), StorageEntry { key: new_entry.key, value: *old_value }, )?; @@ -485,10 +484,10 @@ mod tests { pub(crate) fn generate_initial_trie( &self, - accounts: &[(Address, Account)], + accounts: impl IntoIterator, ) -> Result { self.tx.insert_accounts_and_storages( - accounts.iter().map(|(addr, acc)| (*addr, (*acc, std::iter::empty()))), + accounts.into_iter().map(|(addr, acc)| (addr, (acc, std::iter::empty()))), )?; let loader = DBTrieLoader::default(); From 301eaedde73c04c27c9ecf611a733454a8d01fa2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s?= Date: Thu, 9 Feb 2023 19:25:56 -0300 Subject: [PATCH 05/34] Add accs_testdata Co-authored-by: lambdaclass-user --- .../interfaces/src/test_utils/generators.rs | 13 ++++-------- crates/stages/benches/criterion.rs | 21 +++++++++++-------- crates/stages/src/stages/merkle.rs | 12 ++++++----- 3 files changed, 23 insertions(+), 23 deletions(-) diff --git a/crates/interfaces/src/test_utils/generators.rs b/crates/interfaces/src/test_utils/generators.rs index 9eb07cd6b42..57d92c438fc 100644 --- a/crates/interfaces/src/test_utils/generators.rs +++ b/crates/interfaces/src/test_utils/generators.rs @@ -164,23 +164,18 @@ pub fn random_block_range( blocks } -/// Generate a range of random blocks. -/// -/// The parent hash of the first block -/// in the result will be equal to `head`. -/// -/// See [random_block] for considerations when validating the generated blocks. +/// Generate a random storage change. pub fn random_transition( valid_addresses: &Vec
, key_range: std::ops::Range, - value_range: std::ops::Range, ) -> (Address, StorageEntry) { let mut rng = rand::thread_rng(); let address = valid_addresses.choose(&mut rng).map_or_else(|| H160::random_using(&mut rng), |v| *v); - let transfer = U256::from(rng.gen::()); + let key = H256::from_low_u64_be(key_range.sample_single(&mut rng)); - let value = U256::from(value_range.sample_single(&mut rng)); + let value = U256::from(rng.gen::()); + (address, StorageEntry { key, value }) } diff --git a/crates/stages/benches/criterion.rs b/crates/stages/benches/criterion.rs index 151df7b6ab5..ece642bf350 100644 --- a/crates/stages/benches/criterion.rs +++ b/crates/stages/benches/criterion.rs @@ -4,7 +4,10 @@ use criterion::{ }; use itertools::concat; use reth_db::{ + cursor::DbCursorRO, mdbx::{Env, WriteMap}, + models::BlockNumHash, + tables, transaction::DbTx, }; use reth_primitives::{StorageEntry, H256}; @@ -20,17 +23,14 @@ criterion_main!(benches); fn senders(c: &mut Criterion) { let mut group = c.benchmark_group("Stages"); - group.measurement_time(std::time::Duration::from_millis(2000)); - group.warm_up_time(std::time::Duration::from_millis(2000)); // don't need to run each stage for that many times group.sample_size(10); for batch in [1000usize, 10_000, 100_000, 250_000] { let num_blocks = 10_000; let mut stage = SenderRecoveryStage::default(); - stage.batch_size = batch; stage.commit_threshold = num_blocks; - let label = format!("SendersRecovery-batch-{}", batch); + let label = format!("SendersRecovery-batch-{batch}"); let path = txs_testdata(num_blocks as usize); @@ -40,8 +40,6 @@ fn senders(c: &mut Criterion) { fn tx_lookup(c: &mut Criterion) { let mut group = c.benchmark_group("Stages"); - group.measurement_time(std::time::Duration::from_millis(2000)); - group.warm_up_time(std::time::Duration::from_millis(2000)); // don't need to run each stage for that many times group.sample_size(10); @@ -70,8 +68,6 @@ fn total_difficulty(c: &mut Criterion) { fn merkle(c: &mut Criterion) { let mut group = c.benchmark_group("Stages"); - group.measurement_time(std::time::Duration::from_secs(5)); - group.warm_up_time(std::time::Duration::from_secs(5)); // don't need to run each stage for that many times group.sample_size(10); @@ -88,7 +84,7 @@ fn merkle(c: &mut Criterion) { measure_stage(&mut group, stage, path, "MerkleStage-fullhash".to_string()); } -fn measure_stage>>( +fn measure_stage>>( group: &mut BenchmarkGroup, stage: S, path: PathBuf, @@ -97,6 +93,13 @@ fn measure_stage>>( let tx = TestTransaction::new(&path); let mut input = ExecInput::default(); + let (BlockNumHash((num_blocks, _)), _) = tx + .inner() + .cursor_read::() + .unwrap() + .last() + .unwrap() + .expect("Headers table should not be empty"); input.previous_stage = Some((StageId("Another"), num_blocks)); group.bench_function(label, move |b| { diff --git a/crates/stages/src/stages/merkle.rs b/crates/stages/src/stages/merkle.rs index 8f4782d7a70..c911f90ed97 100644 --- a/crates/stages/src/stages/merkle.rs +++ b/crates/stages/src/stages/merkle.rs @@ -34,7 +34,7 @@ pub const MERKLE_UNWIND: StageId = StageId("MerkleUnwind"); /// - [`AccountHashingStage`][crate::stages::AccountHashingStage] /// - [`StorageHashingStage`][crate::stages::StorageHashingStage] /// - [`MerkleStage::Execution`] -#[derive(Debug)] +#[derive(Debug, Clone)] pub enum MerkleStage { /// The execution portion of the merkle stage. Execution { @@ -45,7 +45,9 @@ pub enum MerkleStage { /// The unwind portion of the merkle stage. Unwind, - #[cfg(test)] + /// Able to execute and unwind. Used for tests + #[cfg(feature = "test-utils")] + #[allow(missing_docs)] Both { clean_threshold: u64 }, } @@ -68,7 +70,7 @@ impl Stage for MerkleStage { match self { MerkleStage::Execution { .. } => MERKLE_EXECUTION, MerkleStage::Unwind => MERKLE_UNWIND, - #[cfg(test)] + #[cfg(feature = "test-utils")] MerkleStage::Both { .. } => unreachable!(), } } @@ -88,7 +90,7 @@ impl Stage for MerkleStage { }) } MerkleStage::Execution { clean_threshold } => *clean_threshold, - #[cfg(test)] + #[cfg(feature = "test-utils")] MerkleStage::Both { clean_threshold } => *clean_threshold, }; @@ -314,7 +316,7 @@ mod tests { tx.put::(tx_id, transaction.clone())?; tx.put::(tx_id, transition_id)?; - let (address, new_entry) = random_transition(&addresses, 0..125, 1..125); + let (address, new_entry) = random_transition(&addresses, 0..125); // seed account changeset let prev_acc = accounts.get_mut(&address).unwrap(); From 62d92d081a72cae77725f170bfa527b6075f7dc2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s?= Date: Fri, 10 Feb 2023 13:08:23 -0300 Subject: [PATCH 06/34] Add transition generation helper function Co-authored-by: lambdaclass-user --- crates/interfaces/Cargo.toml | 16 ++- .../interfaces/src/test_utils/generators.rs | 98 +++++++++++++++++-- crates/primitives/src/storage.rs | 6 ++ crates/stages/benches/criterion.rs | 22 ++++- crates/stages/src/stages/merkle.rs | 10 +- 5 files changed, 134 insertions(+), 18 deletions(-) diff --git a/crates/interfaces/Cargo.toml b/crates/interfaces/Cargo.toml index 6ca15c44e09..d9255f5f4cd 100644 --- a/crates/interfaces/Cargo.toml +++ b/crates/interfaces/Cargo.toml @@ -10,7 +10,7 @@ readme = "README.md" reth-codecs = { path = "../storage/codecs" } reth-primitives = { path = "../primitives" } reth-rpc-types = { path = "../rpc/rpc-types" } -reth-network-api = { path = "../net/network-api"} +reth-network-api = { path = "../net/network-api" } revm-primitives = "1.0" async-trait = "0.1.57" thiserror = "1.0.37" @@ -27,16 +27,24 @@ futures = "0.3.25" tokio-stream = "0.1.11" rand = "0.8.5" arbitrary = { version = "1.1.7", features = ["derive"], optional = true } -secp256k1 = { version = "0.24.2", default-features = false, features = ["alloc", "recovery", "rand"], optional = true } +secp256k1 = { version = "0.24.2", default-features = false, features = [ + "alloc", + "recovery", + "rand", +], optional = true } modular-bitfield = "0.11.2" [dev-dependencies] reth-db = { path = "../storage/db", features = ["test-utils"] } tokio = { version = "1.21.2", features = ["full"] } tokio-stream = { version = "0.1.11", features = ["sync"] } -arbitrary = { version = "1.1.7", features = ["derive"]} +arbitrary = { version = "1.1.7", features = ["derive"] } hex-literal = "0.3" -secp256k1 = { version = "0.24.2", default-features = false, features = ["alloc", "recovery", "rand"] } +secp256k1 = { version = "0.24.2", default-features = false, features = [ + "alloc", + "recovery", + "rand", +] } [features] bench = [] diff --git a/crates/interfaces/src/test_utils/generators.rs b/crates/interfaces/src/test_utils/generators.rs index 57d92c438fc..a26af270341 100644 --- a/crates/interfaces/src/test_utils/generators.rs +++ b/crates/interfaces/src/test_utils/generators.rs @@ -1,3 +1,5 @@ +use std::{collections::BTreeMap, ops::Sub}; + use rand::{distributions::uniform::SampleRange, seq::SliceRandom, thread_rng, Rng}; use reth_primitives::{ proofs, Account, Address, Bytes, Header, SealedBlock, SealedHeader, Signature, StorageEntry, @@ -164,19 +166,103 @@ pub fn random_block_range( blocks } -/// Generate a random storage change. -pub fn random_transition( +/// Generate a range of transitions for given blocks and accounts. +/// Assumes all accounts start with an empty storage. +/// +/// Returns a Vec of account and storage changes for each transition, +/// along with the final state of all accounts and storages. +pub fn random_transition_range( + blocks: IBlk, + accounts: IAcc, + n_changes: std::ops::Range, + key_range: std::ops::Range, +) -> ( + Vec<(Vec<(Address, Account, Vec)>)>, + BTreeMap)>, +) +where + IBlk: IntoIterator, + IAcc: IntoIterator, +{ + let mut rng = rand::thread_rng(); + let mut state: BTreeMap<_, _> = + accounts.into_iter().map(|(addr, acc)| (addr, (acc, BTreeMap::new()))).collect(); + + let valid_addresses = state.iter().map(|(addr, _)| *addr).collect(); + + let num_transitions: usize = blocks.into_iter().map(|block| block.body.len()).sum(); + let mut transitions = Vec::with_capacity(num_transitions); + + (0..num_transitions).for_each(|i| { + let mut transition = Vec::new(); + let (from, to, mut transfer, new_entries) = + random_account_change(&valid_addresses, n_changes.clone(), key_range.clone()); + + // extract from sending account + let (prev_from, _) = state.get_mut(&from).unwrap(); + transition.push((from, *prev_from, Vec::new())); + + transfer = transfer.min(prev_from.balance).max(U256::from(1)); + prev_from.balance = prev_from.balance.wrapping_sub(transfer); + + // deposit in receiving account and update storage + let (prev_to, storage): &mut (Account, BTreeMap) = state.get_mut(&to).unwrap(); + + let old_entries = new_entries + .into_iter() + .map(|entry| { + let old = storage.insert(entry.key, entry.value); + StorageEntry { value: old.unwrap_or(U256::from(0)), ..entry } + }) + .collect(); + + transition.push((from, *prev_to, old_entries)); + + prev_to.balance = prev_to.balance.wrapping_add(transfer); + + transitions.push(transition); + }); + + let final_state = state + .into_iter() + .map(|(addr, (acc, storage))| { + (addr, (acc, storage.into_iter().map(|v| v.into()).collect())) + }) + .collect(); + (transitions, final_state) +} + +/// Generate a random account change. +/// +/// Returns two addresses, a balance_change, and a Vec of new storage entries. +pub fn random_account_change( valid_addresses: &Vec
, + n_changes: std::ops::Range, key_range: std::ops::Range, -) -> (Address, StorageEntry) { +) -> (Address, Address, U256, Vec) { + let mut rng = rand::thread_rng(); + let mut addresses = valid_addresses.choose_multiple(&mut rng, 2).into_iter().cloned(); + + let addr_from = addresses.next().unwrap_or_else(|| Address::random()); + let addr_to = addresses.next().unwrap_or_else(|| Address::random()); + + let balance_change = U256::from(rng.gen::()); + + let storage_changes = (0..n_changes.sample_single(&mut rng)) + .map(|_| random_storage_entry(key_range.clone())) + .collect(); + + (addr_from, addr_to, balance_change, storage_changes) +} + +/// Generate a random storage change. +pub fn random_storage_entry(key_range: std::ops::Range) -> StorageEntry { let mut rng = rand::thread_rng(); - let address = - valid_addresses.choose(&mut rng).map_or_else(|| H160::random_using(&mut rng), |v| *v); let key = H256::from_low_u64_be(key_range.sample_single(&mut rng)); let value = U256::from(rng.gen::()); - (address, StorageEntry { key, value }) + StorageEntry { key, value } } /// Generate random Externaly Owned Account (EOA account without contract). diff --git a/crates/primitives/src/storage.rs b/crates/primitives/src/storage.rs index f64554c88c4..67e027c34dc 100644 --- a/crates/primitives/src/storage.rs +++ b/crates/primitives/src/storage.rs @@ -12,6 +12,12 @@ pub struct StorageEntry { pub value: U256, } +impl From<(H256, U256)> for StorageEntry { + fn from((key, value): (H256, U256)) -> Self { + StorageEntry { key, value } + } +} + // NOTE: Removing main_codec and manually encode subkey // and compress second part of the value. If we have compression // over whole value (Even SubKey) that would mess up fetching of values with seek_by_key_subkey diff --git a/crates/stages/benches/criterion.rs b/crates/stages/benches/criterion.rs index ece642bf350..48762790e24 100644 --- a/crates/stages/benches/criterion.rs +++ b/crates/stages/benches/criterion.rs @@ -127,7 +127,7 @@ fn measure_stage>>( } use reth_interfaces::test_utils::generators::{ - random_block_range, random_eoa_account_range, random_transition, + random_block_range, random_eoa_account_range, random_storage_entry, }; // Helper for generating testdata for the sender recovery stage and tx lookup stages (512MB). @@ -180,7 +180,7 @@ fn accs_testdata(num_blocks: usize) -> PathBuf { println!("Transactions testdata not found, generating to {:?}", path.display()); let tx = TestTransaction::new(&path); - let accounts: BTreeMap<_, _> = concat([ + let accounts: BTreeMap = concat([ random_eoa_account_range(&mut (0..n_eoa)), random_contract_account_range(&mut (0..n_contract)), ]); @@ -196,11 +196,26 @@ fn accs_testdata(num_blocks: usize) -> PathBuf { // insert all blocks tx.insert_blocks(blocks.iter(), None).unwrap(); + let transitions = random_transition_range(blocks.iter(), accounts); + + transition_id = 0; // TODO: insert account and storage changes tx.commit(|tx| { let storage_cursor = tx.cursor_dup_read::()?; blocks.into_iter().try_for_each(|block| { - let (addr, new_entry) = random_transition(addresses, key_range); + tx.put::(tx_id, transition_id)?; + + let (addr, new_entry) = random_storage_entry(addresses, key_range); + + // modify account + let prev_acc = accounts.get_mut(&address).unwrap(); + let acc_before_tx = AccountBeforeTx { address, info: Some(*prev_acc) }; + + prev_acc.nonce += 1; + prev_acc.balance = prev_acc.balance.wrapping_add(U256::from(1)); + + tx.put::(transition_id, acc_before_tx)?; + let old_entry = storage_cursor .seek_by_key_subkey(addr, new_entry.key)? .and_then(|entry| { @@ -213,6 +228,7 @@ fn accs_testdata(num_blocks: usize) -> PathBuf { }) .unwrap_or(StorageEntry { value: U256::ZERO, ..new_entry }); + transition_id += 1; Ok(()) }) }) diff --git a/crates/stages/src/stages/merkle.rs b/crates/stages/src/stages/merkle.rs index c911f90ed97..473fff5ddb2 100644 --- a/crates/stages/src/stages/merkle.rs +++ b/crates/stages/src/stages/merkle.rs @@ -46,7 +46,7 @@ pub enum MerkleStage { Unwind, /// Able to execute and unwind. Used for tests - #[cfg(feature = "test-utils")] + #[cfg(any(test, feature = "test-utils"))] #[allow(missing_docs)] Both { clean_threshold: u64 }, } @@ -70,7 +70,7 @@ impl Stage for MerkleStage { match self { MerkleStage::Execution { .. } => MERKLE_EXECUTION, MerkleStage::Unwind => MERKLE_UNWIND, - #[cfg(feature = "test-utils")] + #[cfg(any(test, feature = "test-utils"))] MerkleStage::Both { .. } => unreachable!(), } } @@ -90,7 +90,7 @@ impl Stage for MerkleStage { }) } MerkleStage::Execution { clean_threshold } => *clean_threshold, - #[cfg(feature = "test-utils")] + #[cfg(any(test, feature = "test-utils"))] MerkleStage::Both { clean_threshold } => *clean_threshold, }; @@ -182,7 +182,7 @@ mod tests { transaction::{DbTx, DbTxMut}, }; use reth_interfaces::test_utils::generators::{ - random_block, random_block_range, random_contract_account_range, random_transition, + random_block, random_block_range, random_contract_account_range, random_storage_entry, }; use reth_primitives::{keccak256, Account, Address, SealedBlock, StorageEntry, H256, U256}; use std::collections::BTreeMap; @@ -316,7 +316,7 @@ mod tests { tx.put::(tx_id, transaction.clone())?; tx.put::(tx_id, transition_id)?; - let (address, new_entry) = random_transition(&addresses, 0..125); + let (address, new_entry) = random_storage_entry(&addresses, 0..125); // seed account changeset let prev_acc = accounts.get_mut(&address).unwrap(); From dbe7ea71b739f9aad937782d21e4a93c69eebccb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s?= Date: Fri, 10 Feb 2023 14:58:55 -0300 Subject: [PATCH 07/34] Add insert_transitions helper function Co-authored-by: lambdaclass-user --- .../interfaces/src/test_utils/generators.rs | 39 +++++---- crates/stages/benches/criterion.rs | 84 ++++++------------- crates/stages/src/stages/merkle.rs | 78 ++--------------- crates/stages/src/test_utils/test_db.rs | 40 +++++++-- 4 files changed, 91 insertions(+), 150 deletions(-) diff --git a/crates/interfaces/src/test_utils/generators.rs b/crates/interfaces/src/test_utils/generators.rs index a26af270341..7cbbe576302 100644 --- a/crates/interfaces/src/test_utils/generators.rs +++ b/crates/interfaces/src/test_utils/generators.rs @@ -1,11 +1,10 @@ -use std::{collections::BTreeMap, ops::Sub}; - use rand::{distributions::uniform::SampleRange, seq::SliceRandom, thread_rng, Rng}; use reth_primitives::{ proofs, Account, Address, Bytes, Header, SealedBlock, SealedHeader, Signature, StorageEntry, Transaction, TransactionKind, TransactionSigned, TxLegacy, H160, H256, U256, }; use secp256k1::{KeyPair, Message as SecpMessage, Secp256k1, SecretKey}; +use std::{collections::BTreeMap, ops::Sub}; // TODO(onbjerg): Maybe we should split this off to its own crate, or move the helpers to the // relevant crates? @@ -166,29 +165,29 @@ pub fn random_block_range( blocks } +type Transition = Vec<(Address, Account, Vec)>; +type AccountState = (Account, Vec); + /// Generate a range of transitions for given blocks and accounts. /// Assumes all accounts start with an empty storage. /// /// Returns a Vec of account and storage changes for each transition, /// along with the final state of all accounts and storages. -pub fn random_transition_range( +pub fn random_transition_range<'a, IBlk, IAcc>( blocks: IBlk, accounts: IAcc, n_changes: std::ops::Range, key_range: std::ops::Range, -) -> ( - Vec<(Vec<(Address, Account, Vec)>)>, - BTreeMap)>, -) +) -> (Vec, BTreeMap) where - IBlk: IntoIterator, + IBlk: IntoIterator, IAcc: IntoIterator, { let mut rng = rand::thread_rng(); let mut state: BTreeMap<_, _> = accounts.into_iter().map(|(addr, acc)| (addr, (acc, BTreeMap::new()))).collect(); - let valid_addresses = state.iter().map(|(addr, _)| *addr).collect(); + let valid_addresses = state.keys().copied().collect(); let num_transitions: usize = blocks.into_iter().map(|block| block.body.len()).sum(); let mut transitions = Vec::with_capacity(num_transitions); @@ -210,13 +209,21 @@ where let old_entries = new_entries .into_iter() - .map(|entry| { - let old = storage.insert(entry.key, entry.value); - StorageEntry { value: old.unwrap_or(U256::from(0)), ..entry } + .filter_map(|entry| { + let old = if entry.value != U256::ZERO { + storage.insert(entry.key, entry.value) + } else { + let old = storage.remove(&entry.key); + if matches!(old, Some(U256::ZERO)) { + return None + } + old + }; + Some(StorageEntry { value: old.unwrap_or(U256::from(0)), ..entry }) }) .collect(); - transition.push((from, *prev_to, old_entries)); + transition.push((to, *prev_to, old_entries)); prev_to.balance = prev_to.balance.wrapping_add(transfer); @@ -241,10 +248,10 @@ pub fn random_account_change( key_range: std::ops::Range, ) -> (Address, Address, U256, Vec) { let mut rng = rand::thread_rng(); - let mut addresses = valid_addresses.choose_multiple(&mut rng, 2).into_iter().cloned(); + let mut addresses = valid_addresses.choose_multiple(&mut rng, 2).cloned(); - let addr_from = addresses.next().unwrap_or_else(|| Address::random()); - let addr_to = addresses.next().unwrap_or_else(|| Address::random()); + let addr_from = addresses.next().unwrap_or_else(Address::random); + let addr_to = addresses.next().unwrap_or_else(Address::random); let balance_change = U256::from(rng.gen::()); diff --git a/crates/stages/benches/criterion.rs b/crates/stages/benches/criterion.rs index 48762790e24..323eed8b688 100644 --- a/crates/stages/benches/criterion.rs +++ b/crates/stages/benches/criterion.rs @@ -8,17 +8,24 @@ use reth_db::{ mdbx::{Env, WriteMap}, models::BlockNumHash, tables, - transaction::DbTx, + transaction::{DbTx, DbTxMut}, }; -use reth_primitives::{StorageEntry, H256}; +use reth_interfaces::test_utils::generators::{ + random_block_range, random_contract_account_range, random_eoa_account_range, + random_transition_range, +}; +use reth_primitives::{Account, Address, H256}; use reth_stages::{ stages::{MerkleStage, SenderRecoveryStage, TotalDifficultyStage, TransactionLookupStage}, test_utils::TestTransaction, ExecInput, Stage, StageId, UnwindInput, }; -use std::path::{Path, PathBuf}; +use std::{ + collections::BTreeMap, + path::{Path, PathBuf}, +}; -criterion_group!(benches, tx_lookup, senders, total_difficulty); +criterion_group!(benches, tx_lookup, senders, total_difficulty, merkle); criterion_main!(benches); fn senders(c: &mut Criterion) { @@ -75,10 +82,8 @@ fn merkle(c: &mut Criterion) { let path = accs_testdata(num_blocks); - let num_blocks = 10_000; - let stage = MerkleStage::Both { clean_threshold: 10_001 }; - measure_stage(&mut group, stage, path, "MerkleStage-incremental".to_string()); + measure_stage(&mut group, stage, path.clone(), "MerkleStage-incremental".to_string()); let stage = MerkleStage::Both { clean_threshold: 0 }; measure_stage(&mut group, stage, path, "MerkleStage-fullhash".to_string()); @@ -126,10 +131,6 @@ fn measure_stage>>( }); } -use reth_interfaces::test_utils::generators::{ - random_block_range, random_eoa_account_range, random_storage_entry, -}; - // Helper for generating testdata for the sender recovery stage and tx lookup stages (512MB). // Returns the path to the database file and the number of blocks written. fn txs_testdata(num_blocks: usize) -> PathBuf { @@ -149,11 +150,6 @@ fn txs_testdata(num_blocks: usize) -> PathBuf { tx.insert_blocks(blocks.iter(), None).unwrap(); // // initialize TD - use reth_db::{ - cursor::DbCursorRO, - tables, - transaction::{DbTx, DbTxMut}, - }; tx.commit(|tx| { let (head, _) = tx.cursor_read::()?.first()?.unwrap_or_default().into(); @@ -169,8 +165,13 @@ fn accs_testdata(num_blocks: usize) -> PathBuf { let path = Path::new(env!("CARGO_MANIFEST_DIR")).join("testdata").join("accs-bench"); let txs_range = 100..150; + // number of storage changes per transition + let n_changes = 0..3; + + // range of possible values for a storage key let key_range = 0..300; + // number of accounts let n_eoa = 131; let n_contract = 31; @@ -183,57 +184,20 @@ fn accs_testdata(num_blocks: usize) -> PathBuf { let accounts: BTreeMap = concat([ random_eoa_account_range(&mut (0..n_eoa)), random_contract_account_range(&mut (0..n_contract)), - ]); - let addresses = accounts.iter().map(|(k, _)| *k).collect(); - - // insert starting accounts - tx.insert_accounts_and_storages( - accounts.iter().map(|(addr, acc)| (addr, (acc, std::iter::empty()))), - ); + ]) + .into_iter() + .collect(); let blocks = random_block_range(0..num_blocks as u64 + 1, H256::zero(), txs_range); - // insert all blocks tx.insert_blocks(blocks.iter(), None).unwrap(); - let transitions = random_transition_range(blocks.iter(), accounts); + let (transitions, final_state) = + random_transition_range(blocks.iter(), accounts, n_changes.clone(), key_range.clone()); - transition_id = 0; - // TODO: insert account and storage changes - tx.commit(|tx| { - let storage_cursor = tx.cursor_dup_read::()?; - blocks.into_iter().try_for_each(|block| { - tx.put::(tx_id, transition_id)?; - - let (addr, new_entry) = random_storage_entry(addresses, key_range); - - // modify account - let prev_acc = accounts.get_mut(&address).unwrap(); - let acc_before_tx = AccountBeforeTx { address, info: Some(*prev_acc) }; - - prev_acc.nonce += 1; - prev_acc.balance = prev_acc.balance.wrapping_add(U256::from(1)); - - tx.put::(transition_id, acc_before_tx)?; - - let old_entry = storage_cursor - .seek_by_key_subkey(addr, new_entry.key)? - .and_then(|entry| { - if entry.key != new_entry.key { - None - } else { - storage_cursor.delete_current()?; - Some(entry) - } - }) - .unwrap_or(StorageEntry { value: U256::ZERO, ..new_entry }); - - transition_id += 1; - Ok(()) - }) - }) + tx.insert_transitions(transitions).unwrap(); - // TODO: insert hashed accounts and storage + tx.insert_accounts_and_storages(final_state).unwrap(); } path diff --git a/crates/stages/src/stages/merkle.rs b/crates/stages/src/stages/merkle.rs index 473fff5ddb2..9d2af777ccf 100644 --- a/crates/stages/src/stages/merkle.rs +++ b/crates/stages/src/stages/merkle.rs @@ -174,15 +174,13 @@ mod tests { TestTransaction, UnwindStageTestRunner, PREV_STAGE_ID, }; use assert_matches::assert_matches; - use itertools::Itertools; use reth_db::{ cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW}, - models::{AccountBeforeTx, BlockNumHash, StoredBlockBody}, tables, transaction::{DbTx, DbTxMut}, }; use reth_interfaces::test_utils::generators::{ - random_block, random_block_range, random_contract_account_range, random_storage_entry, + random_block, random_block_range, random_contract_account_range, random_transition_range, }; use reth_primitives::{keccak256, Account, Address, SealedBlock, StorageEntry, H256, U256}; use std::collections::BTreeMap; @@ -278,16 +276,17 @@ mod tests { let end = input.previous_stage_progress() + 1; let n_accounts = 31; - let mut accounts = random_contract_account_range(&mut (0..n_accounts)) + let accounts = random_contract_account_range(&mut (0..n_accounts)) .into_iter() .collect::>(); - let addresses = accounts.iter().map(|(addr, _)| *addr).collect_vec(); let SealedBlock { header, body, ommers } = random_block(stage_progress, None, Some(0), None); let mut header = header.unseal(); + header.state_root = self.generate_initial_trie(accounts.iter().map(|(k, v)| (*k, *v)))?; + let sealed_head = SealedBlock { header: header.seal(), body, ommers }; let head_hash = sealed_head.hash(); @@ -295,73 +294,14 @@ mod tests { blocks.extend(random_block_range((stage_progress + 1)..end, head_hash, 0..3)); - self.tx.insert_headers(blocks.iter().map(|block| &block.header))?; - - let (mut transition_id, mut tx_id) = (0, 0); - - let mut storages: BTreeMap> = BTreeMap::new(); - - for progress in blocks.iter() { - // Insert last progress data - self.tx.commit(|tx| { - let key: BlockNumHash = (progress.number, progress.hash()).into(); - - let body = StoredBlockBody { - start_tx_id: tx_id, - tx_count: progress.body.len() as u64, - }; - - progress.body.iter().try_for_each(|transaction| { - tx.put::(transaction.hash(), tx_id)?; - tx.put::(tx_id, transaction.clone())?; - tx.put::(tx_id, transition_id)?; - - let (address, new_entry) = random_storage_entry(&addresses, 0..125); - - // seed account changeset - let prev_acc = accounts.get_mut(&address).unwrap(); - let acc_before_tx = AccountBeforeTx { address, info: Some(*prev_acc) }; + self.tx.insert_blocks(blocks.iter(), None)?; - tx.put::(transition_id, acc_before_tx)?; + let (transitions, final_state) = + random_transition_range(blocks.iter(), accounts, 0..3, 0..256); - prev_acc.nonce += 1; - prev_acc.balance = prev_acc.balance.wrapping_add(U256::from(1)); + self.tx.insert_transitions(transitions)?; - let storage = storages.entry(address).or_default(); - let old_value = storage.entry(new_entry.key).or_default(); - - tx.put::( - (transition_id, address).into(), - StorageEntry { key: new_entry.key, value: *old_value }, - )?; - - *old_value = new_entry.value; - - tx_id += 1; - transition_id += 1; - - Ok(()) - })?; - - tx.put::(key.number(), transition_id)?; - tx.put::(key, body) - })?; - } - - self.tx.insert_accounts_and_storages(accounts.iter().map(|(addr, acc)| { - ( - *addr, - ( - *acc, - storages - .entry(*addr) - .or_default() - .iter() - .map(|(&key, &value)| StorageEntry { key, value }) - .collect_vec(), - ), - ) - }))?; + self.tx.insert_accounts_and_storages(final_state)?; let last_numhash = self.tx.inner().get_block_numhash(end - 1).unwrap(); let root = self.state_root()?; diff --git a/crates/stages/src/test_utils/test_db.rs b/crates/stages/src/test_utils/test_db.rs index 981ceaf1f4a..a06c21e476e 100644 --- a/crates/stages/src/test_utils/test_db.rs +++ b/crates/stages/src/test_utils/test_db.rs @@ -5,7 +5,7 @@ use reth_db::{ tx::Tx, Env, EnvKind, WriteMap, RW, }, - models::{BlockNumHash, StoredBlockBody}, + models::{AccountBeforeTx, BlockNumHash, StoredBlockBody}, table::Table, tables, transaction::{DbTx, DbTxMut}, @@ -215,6 +215,8 @@ impl TestTransaction { /// Insert ordered collection of [SealedBlock] into corresponding tables. /// Superset functionality of [TestTransaction::insert_headers]. + /// + /// Assumes that there's a single transition for each transaction (i.e. no block rewards). pub fn insert_blocks<'a, I>(&self, blocks: I, tx_offset: Option) -> Result<(), DbError> where I: Iterator, @@ -233,11 +235,13 @@ impl TestTransaction { tx_count: block.body.len() as u64, }, )?; - for body_tx in block.body.clone() { - tx.put::(current_tx_id, body_tx)?; + block.body.iter().try_for_each(|body_tx| { + tx.put::(current_tx_id, current_tx_id)?; + tx.put::(current_tx_id, body_tx.clone())?; current_tx_id += 1; - } - Ok(()) + Ok(()) + })?; + tx.put::(key.number(), current_tx_id) }) }) } @@ -266,4 +270,30 @@ impl TestTransaction { }) }) } + + /// Insert collection of Vec<([Address], [Account], Vec<[StorageEntry]>)> into + /// corresponding tables. + pub fn insert_transitions(&self, transitions: I) -> Result<(), DbError> + where + I: IntoIterator)>>, + { + self.commit(|tx| { + transitions.into_iter().enumerate().try_for_each(|(transition_id, changes)| { + changes.into_iter().try_for_each(|(address, old_account, old_storage)| { + // Insert into account changeset. + tx.put::( + transition_id as u64, + AccountBeforeTx { address, info: Some(old_account) }, + )?; + + let tid_address = (transition_id as u64, address).into(); + + // Insert into storage changeset. + old_storage.into_iter().try_for_each(|entry| { + tx.put::(tid_address, entry) + }) + }) + }) + }) + } } From ab90047311392eb53e2faad9cf70f1f51c9f13d4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s?= Date: Fri, 10 Feb 2023 18:17:44 -0300 Subject: [PATCH 08/34] Unify *_testdata functions Co-authored-by: lambdaclass-user --- crates/stages/benches/criterion.rs | 42 ++++++++---------------------- 1 file changed, 11 insertions(+), 31 deletions(-) diff --git a/crates/stages/benches/criterion.rs b/crates/stages/benches/criterion.rs index 323eed8b688..2aba4834515 100644 --- a/crates/stages/benches/criterion.rs +++ b/crates/stages/benches/criterion.rs @@ -80,7 +80,7 @@ fn merkle(c: &mut Criterion) { let num_blocks = 10_000; - let path = accs_testdata(num_blocks); + let path = txs_testdata(num_blocks); let stage = MerkleStage::Both { clean_threshold: 10_001 }; measure_stage(&mut group, stage, path.clone(), "MerkleStage-incremental".to_string()); @@ -131,37 +131,9 @@ fn measure_stage>>( }); } -// Helper for generating testdata for the sender recovery stage and tx lookup stages (512MB). -// Returns the path to the database file and the number of blocks written. +// Helper for generating testdata for the benchmarks. +// Returns the path to the database file. fn txs_testdata(num_blocks: usize) -> PathBuf { - let path = Path::new(env!("CARGO_MANIFEST_DIR")).join("testdata").join("txs-bench"); - let txs_range = 100..150; - - if !path.exists() { - // create the dirs - std::fs::create_dir_all(&path).unwrap(); - println!("Transactions testdata not found, generating to {:?}", path.display()); - let tx = TestTransaction::new(&path); - - // This takes a while because it does sig recovery internally - let blocks = random_block_range(0..num_blocks as u64 + 1, H256::zero(), txs_range); - - // insert all blocks - tx.insert_blocks(blocks.iter(), None).unwrap(); - - // // initialize TD - tx.commit(|tx| { - let (head, _) = - tx.cursor_read::()?.first()?.unwrap_or_default().into(); - tx.put::(head, reth_primitives::U256::from(0).into()) - }) - .unwrap(); - } - - path -} - -fn accs_testdata(num_blocks: usize) -> PathBuf { let path = Path::new(env!("CARGO_MANIFEST_DIR")).join("testdata").join("accs-bench"); let txs_range = 100..150; @@ -198,6 +170,14 @@ fn accs_testdata(num_blocks: usize) -> PathBuf { tx.insert_transitions(transitions).unwrap(); tx.insert_accounts_and_storages(final_state).unwrap(); + + // initialize TD + tx.commit(|tx| { + let (head, _) = + tx.cursor_read::()?.first()?.unwrap_or_default().into(); + tx.put::(head, reth_primitives::U256::from(0).into()) + }) + .unwrap(); } path From 5260e62b7b717790ebc3659514271e5409a471f0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s?= Date: Fri, 10 Feb 2023 18:48:23 -0300 Subject: [PATCH 09/34] Add custom setup for merkle stage Co-authored-by: lambdaclass-user --- crates/stages/benches/criterion.rs | 71 +++++++++++++++++++++++------- 1 file changed, 54 insertions(+), 17 deletions(-) diff --git a/crates/stages/benches/criterion.rs b/crates/stages/benches/criterion.rs index 2aba4834515..cc956fda1ef 100644 --- a/crates/stages/benches/criterion.rs +++ b/crates/stages/benches/criterion.rs @@ -16,7 +16,10 @@ use reth_interfaces::test_utils::generators::{ }; use reth_primitives::{Account, Address, H256}; use reth_stages::{ - stages::{MerkleStage, SenderRecoveryStage, TotalDifficultyStage, TransactionLookupStage}, + stages::{ + AccountHashingStage, MerkleStage, SenderRecoveryStage, StorageHashingStage, + TotalDifficultyStage, TransactionLookupStage, + }, test_utils::TestTransaction, ExecInput, Stage, StageId, UnwindInput, }; @@ -41,7 +44,7 @@ fn senders(c: &mut Criterion) { let path = txs_testdata(num_blocks as usize); - measure_stage(&mut group, stage, path, label); + measure_stage(&mut group, stage_unwind, stage, path, label); } } @@ -55,7 +58,7 @@ fn tx_lookup(c: &mut Criterion) { let path = txs_testdata(num_blocks as usize); - measure_stage(&mut group, stage, path, "TransactionLookup".to_string()); + measure_stage(&mut group, stage_unwind, stage, path, "TransactionLookup".to_string()); } fn total_difficulty(c: &mut Criterion) { @@ -70,7 +73,7 @@ fn total_difficulty(c: &mut Criterion) { let path = txs_testdata(num_blocks); - measure_stage(&mut group, stage, path, "TotalDifficulty".to_string()); + measure_stage(&mut group, stage_unwind, stage, path, "TotalDifficulty".to_string()); } fn merkle(c: &mut Criterion) { @@ -83,18 +86,60 @@ fn merkle(c: &mut Criterion) { let path = txs_testdata(num_blocks); let stage = MerkleStage::Both { clean_threshold: 10_001 }; - measure_stage(&mut group, stage, path.clone(), "MerkleStage-incremental".to_string()); + measure_stage(&mut group, unwind_hashes, stage, path.clone(), "Merkle-incremental".to_string()); let stage = MerkleStage::Both { clean_threshold: 0 }; - measure_stage(&mut group, stage, path, "MerkleStage-fullhash".to_string()); + measure_stage(&mut group, unwind_hashes, stage, path, "Merkle-fullhash".to_string()); } -fn measure_stage>>( +fn stage_unwind>>( + stage: S, + tx: &TestTransaction, + _exec_input: ExecInput, +) { + tokio::runtime::Runtime::new().unwrap().block_on(async { + let mut stage = stage.clone(); + let mut db_tx = tx.inner(); + + // Clear previous run + stage.unwind(&mut db_tx, UnwindInput::default()).await.unwrap(); + + db_tx.commit().unwrap(); + }); +} + +fn unwind_hashes>>( + stage: S, + tx: &TestTransaction, + exec_input: ExecInput, +) { + tokio::runtime::Runtime::new().unwrap().block_on(async { + let mut stage = stage.clone(); + let mut db_tx = tx.inner(); + + StorageHashingStage::default().unwind(&mut db_tx, UnwindInput::default()).await.unwrap(); + AccountHashingStage::default().unwind(&mut db_tx, UnwindInput::default()).await.unwrap(); + + // Clear previous run + stage.unwind(&mut db_tx, UnwindInput::default()).await.unwrap(); + + AccountHashingStage::default().execute(&mut db_tx, exec_input).await.unwrap(); + StorageHashingStage::default().execute(&mut db_tx, exec_input).await.unwrap(); + + db_tx.commit().unwrap(); + }); +} + +fn measure_stage( group: &mut BenchmarkGroup, + setup: F, stage: S, path: PathBuf, label: String, -) { +) where + S: Clone + Stage>, + F: Fn(S, &TestTransaction, ExecInput), +{ let tx = TestTransaction::new(&path); let mut input = ExecInput::default(); @@ -111,15 +156,7 @@ fn measure_stage>>( b.to_async(FuturesExecutor).iter_with_setup( || { // criterion setup does not support async, so we have to use our own runtime - tokio::runtime::Runtime::new().unwrap().block_on(async { - let mut stage = stage.clone(); - let mut db_tx = tx.inner(); - - // Clear previous run - stage.unwind(&mut db_tx, UnwindInput::default()).await.unwrap(); - - db_tx.commit().unwrap(); - }); + setup(stage.clone(), &tx, input.clone()) }, |_| async { let mut stage = stage.clone(); From e13469104050ae13a070b69b53f6c241c839cc68 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s?= Date: Mon, 13 Feb 2023 15:49:27 -0300 Subject: [PATCH 10/34] Generate valid state roots in testdata Co-authored-by: lambdaclass-user --- .../interfaces/src/test_utils/generators.rs | 8 +- crates/stages/benches/criterion.rs | 105 ++++++++++++++---- crates/stages/src/lib.rs | 2 + crates/stages/src/stages/merkle.rs | 12 +- crates/stages/src/test_utils/test_db.rs | 12 +- crates/stages/src/trie/mod.rs | 10 +- 6 files changed, 111 insertions(+), 38 deletions(-) diff --git a/crates/interfaces/src/test_utils/generators.rs b/crates/interfaces/src/test_utils/generators.rs index 7cbbe576302..95abe4066be 100644 --- a/crates/interfaces/src/test_utils/generators.rs +++ b/crates/interfaces/src/test_utils/generators.rs @@ -181,11 +181,13 @@ pub fn random_transition_range<'a, IBlk, IAcc>( ) -> (Vec, BTreeMap) where IBlk: IntoIterator, - IAcc: IntoIterator, + IAcc: IntoIterator))>, { let mut rng = rand::thread_rng(); - let mut state: BTreeMap<_, _> = - accounts.into_iter().map(|(addr, acc)| (addr, (acc, BTreeMap::new()))).collect(); + let mut state: BTreeMap<_, _> = accounts + .into_iter() + .map(|(addr, (acc, st))| (addr, (acc, st.into_iter().map(|e| (e.key, e.value)).collect()))) + .collect(); let valid_addresses = state.keys().copied().collect(); diff --git a/crates/stages/benches/criterion.rs b/crates/stages/benches/criterion.rs index f193a158bf6..3b78db5de29 100644 --- a/crates/stages/benches/criterion.rs +++ b/crates/stages/benches/criterion.rs @@ -13,14 +13,14 @@ use reth_interfaces::test_utils::generators::{ random_block_range, random_contract_account_range, random_eoa_account_range, random_transition_range, }; -use reth_primitives::{Account, Address, H256}; +use reth_primitives::{Account, Address, SealedBlock, H256}; use reth_stages::{ stages::{ AccountHashingStage, MerkleStage, SenderRecoveryStage, StorageHashingStage, TotalDifficultyStage, TransactionLookupStage, }, test_utils::TestTransaction, - ExecInput, Stage, StageId, UnwindInput, + DBTrieLoader, ExecInput, Stage, StageId, UnwindInput, }; use std::{ collections::BTreeMap, @@ -41,9 +41,9 @@ fn senders(c: &mut Criterion) { stage.commit_threshold = num_blocks; let label = format!("SendersRecovery-batch-{batch}"); - let path = txs_testdata(num_blocks as usize); + let path = txs_testdata(num_blocks); - measure_stage(&mut group, stage_unwind, stage, path, label); + measure_stage(&mut group, 0..num_blocks, stage_unwind, stage, path, label); } } @@ -55,9 +55,16 @@ fn tx_lookup(c: &mut Criterion) { let num_blocks = 10_000; let stage = TransactionLookupStage::new(num_blocks); - let path = txs_testdata(num_blocks as usize); + let path = txs_testdata(num_blocks); - measure_stage(&mut group, stage_unwind, stage, path, "TransactionLookup".to_string()); + measure_stage( + &mut group, + 0..num_blocks, + stage_unwind, + stage, + path, + "TransactionLookup".to_string(), + ); } fn total_difficulty(c: &mut Criterion) { @@ -72,7 +79,14 @@ fn total_difficulty(c: &mut Criterion) { let path = txs_testdata(num_blocks); - measure_stage(&mut group, stage_unwind, stage, path, "TotalDifficulty".to_string()); + measure_stage( + &mut group, + 0..num_blocks, + stage_unwind, + stage, + path, + "TotalDifficulty".to_string(), + ); } fn merkle(c: &mut Criterion) { @@ -84,11 +98,25 @@ fn merkle(c: &mut Criterion) { let path = txs_testdata(num_blocks); - let stage = MerkleStage::Both { clean_threshold: 10_001 }; - measure_stage(&mut group, unwind_hashes, stage, path.clone(), "Merkle-incremental".to_string()); + let stage = MerkleStage::Both { clean_threshold: num_blocks + 1 }; + measure_stage( + &mut group, + 1..num_blocks, + unwind_hashes, + stage, + path.clone(), + "Merkle-incremental".to_string(), + ); let stage = MerkleStage::Both { clean_threshold: 0 }; - measure_stage(&mut group, unwind_hashes, stage, path, "Merkle-fullhash".to_string()); + measure_stage( + &mut group, + 1..num_blocks, + unwind_hashes, + stage, + path, + "Merkle-fullhash".to_string(), + ); } fn stage_unwind>>( @@ -131,6 +159,7 @@ fn unwind_hashes>>( fn measure_stage( group: &mut BenchmarkGroup, + block_interval: std::ops::Range, setup: F, stage: S, path: PathBuf, @@ -142,14 +171,9 @@ fn measure_stage( let tx = TestTransaction::new(&path); let mut input = ExecInput::default(); - let (num_blocks, _) = tx - .inner() - .cursor_read::() - .unwrap() - .last() - .unwrap() - .expect("Headers table should not be empty"); - input.previous_stage = Some((StageId("Another"), num_blocks)); + + input.previous_stage = Some((StageId("Another"), block_interval.end - 1)); + input.stage_progress = Some(block_interval.start); group.bench_function(label, move |b| { b.to_async(FuturesExecutor).iter_with_setup( @@ -169,7 +193,7 @@ fn measure_stage( // Helper for generating testdata for the benchmarks. // Returns the path to the database file. -fn txs_testdata(num_blocks: usize) -> PathBuf { +fn txs_testdata(num_blocks: u64) -> PathBuf { let path = Path::new(env!("CARGO_MANIFEST_DIR")).join("testdata").join("accs-bench"); let txs_range = 100..150; @@ -196,17 +220,50 @@ fn txs_testdata(num_blocks: usize) -> PathBuf { .into_iter() .collect(); - let blocks = random_block_range(0..num_blocks as u64 + 1, H256::zero(), txs_range); + let mut blocks = random_block_range(0..num_blocks as u64 + 1, H256::zero(), txs_range); - tx.insert_blocks(blocks.iter(), None).unwrap(); + let (transitions, start_state) = random_transition_range( + blocks.iter().take(2), + accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))), + n_changes.clone(), + key_range.clone(), + ); + + tx.insert_accounts_and_storages(start_state.clone()).unwrap(); + + // make first block after genesis have valid state root + let root = DBTrieLoader::default().calculate_root(&tx.inner()).unwrap(); + let second_block = blocks.get_mut(1).unwrap(); + let cloned_second = second_block.clone(); + let mut updated_header = cloned_second.header.unseal(); + updated_header.state_root = root; + *second_block = SealedBlock { header: updated_header.seal(), ..cloned_second }; - let (transitions, final_state) = - random_transition_range(blocks.iter(), accounts, n_changes.clone(), key_range.clone()); + let offset = transitions.len() as u64; - tx.insert_transitions(transitions).unwrap(); + tx.insert_transitions(transitions, None).unwrap(); + + let (transitions, final_state) = random_transition_range( + blocks.iter().skip(2), + start_state, + n_changes.clone(), + key_range.clone(), + ); + + tx.insert_transitions(transitions, Some(offset)).unwrap(); tx.insert_accounts_and_storages(final_state).unwrap(); + tx.insert_blocks(blocks.iter(), None).unwrap(); + + // make last block have valid state root + let root = DBTrieLoader::default().calculate_root(&tx.inner()).unwrap(); + let last_block = blocks.last_mut().unwrap(); + let cloned_last = last_block.clone(); + let mut updated_header = cloned_last.header.unseal(); + updated_header.state_root = root; + *last_block = SealedBlock { header: updated_header.seal(), ..cloned_last }; + // initialize TD tx.commit(|tx| { let (head, _) = diff --git a/crates/stages/src/lib.rs b/crates/stages/src/lib.rs index df4fefb125f..2755c422367 100644 --- a/crates/stages/src/lib.rs +++ b/crates/stages/src/lib.rs @@ -60,6 +60,8 @@ mod util; #[cfg(any(test, feature = "test-utils"))] pub mod test_utils; +pub use trie::DBTrieLoader; + /// A re-export of common structs and traits. pub mod prelude; diff --git a/crates/stages/src/stages/merkle.rs b/crates/stages/src/stages/merkle.rs index e74d58a80a1..2f4c8ad5874 100644 --- a/crates/stages/src/stages/merkle.rs +++ b/crates/stages/src/stages/merkle.rs @@ -297,10 +297,14 @@ mod tests { self.tx.insert_blocks(blocks.iter(), None)?; - let (transitions, final_state) = - random_transition_range(blocks.iter(), accounts, 0..3, 0..256); - - self.tx.insert_transitions(transitions)?; + let (transitions, final_state) = random_transition_range( + blocks.iter(), + accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))), + 0..3, + 0..256, + ); + + self.tx.insert_transitions(transitions, None)?; self.tx.insert_accounts_and_storages(final_state)?; diff --git a/crates/stages/src/test_utils/test_db.rs b/crates/stages/src/test_utils/test_db.rs index c104ea977c3..b32dec25e3f 100644 --- a/crates/stages/src/test_utils/test_db.rs +++ b/crates/stages/src/test_utils/test_db.rs @@ -271,20 +271,26 @@ impl TestTransaction { /// Insert collection of Vec<([Address], [Account], Vec<[StorageEntry]>)> into /// corresponding tables. - pub fn insert_transitions(&self, transitions: I) -> Result<(), DbError> + pub fn insert_transitions( + &self, + transitions: I, + transition_offset: Option, + ) -> Result<(), DbError> where I: IntoIterator)>>, { + let offset = transition_offset.unwrap_or_default(); self.commit(|tx| { transitions.into_iter().enumerate().try_for_each(|(transition_id, changes)| { changes.into_iter().try_for_each(|(address, old_account, old_storage)| { + let tid = offset + transition_id as u64; // Insert into account changeset. tx.put::( - transition_id as u64, + tid, AccountBeforeTx { address, info: Some(old_account) }, )?; - let tid_address = (transition_id as u64, address).into(); + let tid_address = (tid, address).into(); // Insert into storage changeset. old_storage.into_iter().try_for_each(|entry| { diff --git a/crates/stages/src/trie/mod.rs b/crates/stages/src/trie/mod.rs index 1530fb04d50..79d61af3ee2 100644 --- a/crates/stages/src/trie/mod.rs +++ b/crates/stages/src/trie/mod.rs @@ -24,7 +24,7 @@ use std::{ use tracing::*; #[derive(Debug, thiserror::Error)] -pub(crate) enum TrieError { +pub enum TrieError { #[error("Some error occurred: {0}")] InternalError(#[from] cita_trie::TrieError), #[error("The root node wasn't found in the DB")] @@ -190,12 +190,14 @@ impl EthAccount { } } +/// Struct for calculating the root of a merkle patricia tree, +/// while populating the database with intermediate hashes. #[derive(Debug, Default)] -pub(crate) struct DBTrieLoader; +pub struct DBTrieLoader; impl DBTrieLoader { /// Calculates the root of the state trie, saving intermediate hashes in the database. - pub(crate) fn calculate_root( + pub fn calculate_root( &self, tx: &Transaction<'_, DB>, ) -> Result { @@ -255,7 +257,7 @@ impl DBTrieLoader { } /// Calculates the root of the state trie by updating an existing trie. - pub(crate) fn update_root( + pub fn update_root( &self, tx: &Transaction<'_, DB>, root: H256, From b655cd68ec5058306d4034486dee73ae21f872ed Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s?= Date: Mon, 13 Feb 2023 18:44:44 -0300 Subject: [PATCH 11/34] Fix compile warnings Co-authored-by: lambdaclass-user --- crates/stages/benches/criterion.rs | 26 +++++++------------------- crates/storage/db/benches/utils.rs | 5 +++++ 2 files changed, 12 insertions(+), 19 deletions(-) diff --git a/crates/stages/benches/criterion.rs b/crates/stages/benches/criterion.rs index 3b78db5de29..a7bf6fe4dd7 100644 --- a/crates/stages/benches/criterion.rs +++ b/crates/stages/benches/criterion.rs @@ -41,9 +41,7 @@ fn senders(c: &mut Criterion) { stage.commit_threshold = num_blocks; let label = format!("SendersRecovery-batch-{batch}"); - let path = txs_testdata(num_blocks); - - measure_stage(&mut group, 0..num_blocks, stage_unwind, stage, path, label); + measure_stage(&mut group, 0..num_blocks + 1, stage_unwind, stage, label); } } @@ -55,14 +53,11 @@ fn tx_lookup(c: &mut Criterion) { let num_blocks = 10_000; let stage = TransactionLookupStage::new(num_blocks); - let path = txs_testdata(num_blocks); - measure_stage( &mut group, - 0..num_blocks, + 0..num_blocks + 1, stage_unwind, stage, - path, "TransactionLookup".to_string(), ); } @@ -77,14 +72,11 @@ fn total_difficulty(c: &mut Criterion) { let num_blocks = 10_000; let stage = TotalDifficultyStage::default(); - let path = txs_testdata(num_blocks); - measure_stage( &mut group, - 0..num_blocks, + 0..num_blocks + 1, stage_unwind, stage, - path, "TotalDifficulty".to_string(), ); } @@ -96,25 +88,21 @@ fn merkle(c: &mut Criterion) { let num_blocks = 10_000; - let path = txs_testdata(num_blocks); - let stage = MerkleStage::Both { clean_threshold: num_blocks + 1 }; measure_stage( &mut group, - 1..num_blocks, + 1..num_blocks + 1, unwind_hashes, stage, - path.clone(), "Merkle-incremental".to_string(), ); let stage = MerkleStage::Both { clean_threshold: 0 }; measure_stage( &mut group, - 1..num_blocks, + 1..num_blocks + 1, unwind_hashes, stage, - path, "Merkle-fullhash".to_string(), ); } @@ -162,12 +150,12 @@ fn measure_stage( block_interval: std::ops::Range, setup: F, stage: S, - path: PathBuf, label: String, ) where S: Clone + Stage>, F: Fn(S, &TestTransaction, ExecInput), { + let path = txs_testdata(block_interval.end - 1); let tx = TestTransaction::new(&path); let mut input = ExecInput::default(); @@ -220,7 +208,7 @@ fn txs_testdata(num_blocks: u64) -> PathBuf { .into_iter() .collect(); - let mut blocks = random_block_range(0..num_blocks as u64 + 1, H256::zero(), txs_range); + let mut blocks = random_block_range(0..num_blocks + 1, H256::zero(), txs_range); let (transitions, start_state) = random_transition_range( blocks.iter().take(2), diff --git a/crates/storage/db/benches/utils.rs b/crates/storage/db/benches/utils.rs index d82eb55ae7e..54b6877ed71 100644 --- a/crates/storage/db/benches/utils.rs +++ b/crates/storage/db/benches/utils.rs @@ -1,3 +1,4 @@ +#[allow(unused_imports)] use reth_db::{ cursor::{DbCursorRO, DbCursorRW}, database::Database, @@ -9,12 +10,15 @@ use reth_db::{ use std::path::Path; /// Path where the DB is initialized for benchmarks. +#[allow(unused)] const BENCH_DB_PATH: &str = "/tmp/reth-benches"; /// Used for RandomRead and RandomWrite benchmarks. +#[allow(unused)] const RANDOM_INDEXES: [usize; 10] = [23, 2, 42, 5, 3, 99, 54, 0, 33, 64]; /// Returns bench vectors in the format: `Vec<(Key, EncodedKey, Value, CompressedValue)>`. +#[allow(unused)] fn load_vectors() -> Vec<(T::Key, bytes::Bytes, T::Value, bytes::Bytes)> where T: Default, @@ -44,6 +48,7 @@ where } /// Sets up a clear database at `bench_db_path`. +#[allow(unused)] fn set_up_db( bench_db_path: &Path, pair: &Vec<(::Key, bytes::Bytes, ::Value, bytes::Bytes)>, From 1d768e02347173a9e4cc5a76b9ffd5763da8611b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s?= Date: Mon, 13 Feb 2023 18:51:29 -0300 Subject: [PATCH 12/34] Tidy up some clones Co-authored-by: lambdaclass-user --- crates/stages/benches/criterion.rs | 23 +++++++++-------------- 1 file changed, 9 insertions(+), 14 deletions(-) diff --git a/crates/stages/benches/criterion.rs b/crates/stages/benches/criterion.rs index a7bf6fe4dd7..b4f9e1861cc 100644 --- a/crates/stages/benches/criterion.rs +++ b/crates/stages/benches/criterion.rs @@ -158,21 +158,21 @@ fn measure_stage( let path = txs_testdata(block_interval.end - 1); let tx = TestTransaction::new(&path); - let mut input = ExecInput::default(); - - input.previous_stage = Some((StageId("Another"), block_interval.end - 1)); - input.stage_progress = Some(block_interval.start); + let input = ExecInput { + previous_stage: Some((StageId("Another"), block_interval.end - 1)), + stage_progress: Some(block_interval.start), + }; group.bench_function(label, move |b| { b.to_async(FuturesExecutor).iter_with_setup( || { // criterion setup does not support async, so we have to use our own runtime - setup(stage.clone(), &tx, input.clone()) + setup(stage.clone(), &tx, input) }, |_| async { let mut stage = stage.clone(); let mut db_tx = tx.inner(); - stage.execute(&mut db_tx, input.clone()).await.unwrap(); + stage.execute(&mut db_tx, input).await.unwrap(); db_tx.commit().unwrap(); }, ) @@ -231,12 +231,8 @@ fn txs_testdata(num_blocks: u64) -> PathBuf { tx.insert_transitions(transitions, None).unwrap(); - let (transitions, final_state) = random_transition_range( - blocks.iter().skip(2), - start_state, - n_changes.clone(), - key_range.clone(), - ); + let (transitions, final_state) = + random_transition_range(blocks.iter().skip(2), start_state, n_changes, key_range); tx.insert_transitions(transitions, Some(offset)).unwrap(); @@ -254,8 +250,7 @@ fn txs_testdata(num_blocks: u64) -> PathBuf { // initialize TD tx.commit(|tx| { - let (head, _) = - tx.cursor_read::()?.first()?.unwrap_or_default().into(); + let (head, _) = tx.cursor_read::()?.first()?.unwrap_or_default(); tx.put::(head, reth_primitives::U256::from(0).into()) }) .unwrap(); From de91a79ff7cb5f5cd2ebe0cd104691aa4420b67b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s?= Date: Tue, 14 Feb 2023 12:19:42 -0300 Subject: [PATCH 13/34] Fix: update blocks before insertion Co-authored-by: lambdaclass-user --- crates/stages/benches/criterion.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/stages/benches/criterion.rs b/crates/stages/benches/criterion.rs index b4f9e1861cc..8cc4bbf064e 100644 --- a/crates/stages/benches/criterion.rs +++ b/crates/stages/benches/criterion.rs @@ -182,7 +182,7 @@ fn measure_stage( // Helper for generating testdata for the benchmarks. // Returns the path to the database file. fn txs_testdata(num_blocks: u64) -> PathBuf { - let path = Path::new(env!("CARGO_MANIFEST_DIR")).join("testdata").join("accs-bench"); + let path = Path::new(env!("CARGO_MANIFEST_DIR")).join("testdata").join("txs-bench"); let txs_range = 100..150; // number of storage changes per transition @@ -238,8 +238,6 @@ fn txs_testdata(num_blocks: u64) -> PathBuf { tx.insert_accounts_and_storages(final_state).unwrap(); - tx.insert_blocks(blocks.iter(), None).unwrap(); - // make last block have valid state root let root = DBTrieLoader::default().calculate_root(&tx.inner()).unwrap(); let last_block = blocks.last_mut().unwrap(); @@ -248,6 +246,8 @@ fn txs_testdata(num_blocks: u64) -> PathBuf { updated_header.state_root = root; *last_block = SealedBlock { header: updated_header.seal(), ..cloned_last }; + tx.insert_blocks(blocks, None).unwrap(); + // initialize TD tx.commit(|tx| { let (head, _) = tx.cursor_read::()?.first()?.unwrap_or_default(); From 07c631535cdc2fa7e7924aad0075ea66dab7416e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s?= Date: Tue, 14 Feb 2023 12:22:21 -0300 Subject: [PATCH 14/34] Fix: add iter() Co-authored-by: lambdaclass-user --- crates/stages/benches/criterion.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/stages/benches/criterion.rs b/crates/stages/benches/criterion.rs index 8cc4bbf064e..46fb2cbdcfc 100644 --- a/crates/stages/benches/criterion.rs +++ b/crates/stages/benches/criterion.rs @@ -246,7 +246,7 @@ fn txs_testdata(num_blocks: u64) -> PathBuf { updated_header.state_root = root; *last_block = SealedBlock { header: updated_header.seal(), ..cloned_last }; - tx.insert_blocks(blocks, None).unwrap(); + tx.insert_blocks(blocks.iter(), None).unwrap(); // initialize TD tx.commit(|tx| { From 8eb7e6c30b18bb77ccd3a46268f0d4a1a16fd9af Mon Sep 17 00:00:00 2001 From: joshieDo Date: Tue, 21 Feb 2023 02:45:14 +0000 Subject: [PATCH 15/34] fix removal on trie --- crates/stages/src/trie/mod.rs | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/crates/stages/src/trie/mod.rs b/crates/stages/src/trie/mod.rs index e692cbece87..045deab20ff 100644 --- a/crates/stages/src/trie/mod.rs +++ b/crates/stages/src/trie/mod.rs @@ -122,9 +122,17 @@ where fn remove(&self, key: &[u8]) -> Result<(), Self::Error> { let mut cursor = self.tx.cursor_dup_write::()?; + let subkey = H256::from_slice(key); + cursor - .seek_by_key_subkey(self.key, H256::from_slice(key))? - .map(|_| cursor.delete_current()) + .seek_by_key_subkey(self.key, subkey)? + .map(|v| { + if v.hash == subkey { + cursor.delete_current().map_err(|e| TrieError::DatabaseError(e)) + } else { + Err(TrieError::MissingRoot(self.key)) + } + }) .transpose()?; Ok(()) } From 29d9caaf55928f03ec0d6023eaae048062abc153 Mon Sep 17 00:00:00 2001 From: joshieDo Date: Wed, 22 Feb 2023 07:00:50 +0000 Subject: [PATCH 16/34] throw error if unwinded calculated root does not match --- crates/stages/src/stages/merkle.rs | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/crates/stages/src/stages/merkle.rs b/crates/stages/src/stages/merkle.rs index 4c942cc99d6..9ecfccfa102 100644 --- a/crates/stages/src/stages/merkle.rs +++ b/crates/stages/src/stages/merkle.rs @@ -158,10 +158,17 @@ impl Stage for MerkleStage { let from_transition = tx.get_block_transition(input.unwind_to)?; let to_transition = tx.get_block_transition(input.stage_progress)?; - loader + let root = loader .update_root(tx, current_root, from_transition..to_transition) .map_err(|e| StageError::Fatal(Box::new(e)))?; + if root != target_root { + return Err(StageError::Fatal( + format!("Expected root [{target_root}] does not match calculated one [{root}]") + .into(), + )) + } + info!(target: "sync::stages::merkle::unwind", "Stage finished"); Ok(UnwindOutput { stage_progress: input.unwind_to }) } From e6cd629773c8c8bd9f02174b78e39e3a0bbbd791 Mon Sep 17 00:00:00 2001 From: joshieDo Date: Wed, 22 Feb 2023 07:02:17 +0000 Subject: [PATCH 17/34] fix insertion on test_db for accounts and storages --- crates/stages/src/test_utils/test_db.rs | 23 ++++++++++++++++++++--- 1 file changed, 20 insertions(+), 3 deletions(-) diff --git a/crates/stages/src/test_utils/test_db.rs b/crates/stages/src/test_utils/test_db.rs index 9a2aa19a5db..bf1a0043f28 100644 --- a/crates/stages/src/test_utils/test_db.rs +++ b/crates/stages/src/test_utils/test_db.rs @@ -1,5 +1,5 @@ use reth_db::{ - cursor::{DbCursorRO, DbCursorRW}, + cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO}, mdbx::{ test_utils::{create_test_db, create_test_db_with_path}, tx::Tx, @@ -271,8 +271,25 @@ impl TestTransaction { storage.into_iter().filter(|e| e.value != U256::ZERO).try_for_each(|entry| { let hashed_entry = StorageEntry { key: keccak256(entry.key), ..entry }; - tx.put::(address, entry)?; - tx.put::(hashed_address, hashed_entry) + let mut cursor = tx.cursor_dup_write::()?; + if let Some(e) = cursor + .seek_by_key_subkey(address, entry.key)? + .filter(|e| e.key == entry.key) + { + cursor.delete_current()?; + } + cursor.upsert(address, entry)?; + + let mut cursor = tx.cursor_dup_write::()?; + if let Some(e) = cursor + .seek_by_key_subkey(hashed_address, hashed_entry.key)? + .filter(|e| e.key == hashed_entry.key) + { + cursor.delete_current()?; + } + cursor.upsert(hashed_address, hashed_entry)?; + + Ok(()) }) }) }) From aa608eee6034ded7c8d7336fe06df8863619f4a6 Mon Sep 17 00:00:00 2001 From: joshieDo Date: Wed, 22 Feb 2023 07:03:55 +0000 Subject: [PATCH 18/34] make sure root is committed --- crates/stages/benches/setup/mod.rs | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/crates/stages/benches/setup/mod.rs b/crates/stages/benches/setup/mod.rs index 4ee467bbd70..7527a3557f1 100644 --- a/crates/stages/benches/setup/mod.rs +++ b/crates/stages/benches/setup/mod.rs @@ -146,7 +146,19 @@ pub(crate) fn txs_testdata(num_blocks: u64) -> PathBuf { tx.insert_accounts_and_storages(final_state).unwrap(); // make last block have valid state root - let root = DBTrieLoader::default().calculate_root(&tx.inner()).unwrap(); + let root = { + let mut tx_mut = tx.inner(); + let root = DBTrieLoader::default().calculate_root(&tx_mut).unwrap(); + tx_mut.commit().unwrap(); + root + }; + + tx.query(|tx| { + assert!(tx.get::(root)?.is_some()); + Ok(()) + }) + .unwrap(); + let last_block = blocks.last_mut().unwrap(); let cloned_last = last_block.clone(); let mut updated_header = cloned_last.header.unseal(); From b46da0a0edd2341fd0d59891cdc1b9979b859a3f Mon Sep 17 00:00:00 2001 From: joshieDo Date: Wed, 22 Feb 2023 07:04:30 +0000 Subject: [PATCH 19/34] delete target root for repeated unwinds --- crates/stages/benches/setup/mod.rs | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/crates/stages/benches/setup/mod.rs b/crates/stages/benches/setup/mod.rs index 7527a3557f1..006f4d79161 100644 --- a/crates/stages/benches/setup/mod.rs +++ b/crates/stages/benches/setup/mod.rs @@ -1,9 +1,4 @@ -use criterion::{ - async_executor::FuturesExecutor, criterion_group, criterion_main, measurement::WallTime, - BenchmarkGroup, Criterion, -}; use itertools::concat; -use pprof::criterion::{Output, PProfProfiler}; use reth_db::{ cursor::DbCursorRO, mdbx::{Env, WriteMap}, @@ -16,12 +11,9 @@ use reth_interfaces::test_utils::generators::{ }; use reth_primitives::{Account, Address, SealedBlock, H256}; use reth_stages::{ - stages::{ - AccountHashingStage, MerkleStage, SenderRecoveryStage, StorageHashingStage, - TotalDifficultyStage, TransactionLookupStage, - }, + stages::{AccountHashingStage, MerkleStage, StorageHashingStage}, test_utils::TestTransaction, - DBTrieLoader, ExecInput, Stage, StageId, UnwindInput, + DBTrieLoader, ExecInput, Stage, UnwindInput, }; use std::{ collections::BTreeMap, @@ -76,6 +68,9 @@ pub(crate) fn unwind_hashes>>( StorageHashingStage::default().unwind(&mut db_tx, unwind).await.unwrap(); AccountHashingStage::default().unwind(&mut db_tx, unwind).await.unwrap(); + let target_root = db_tx.get_header(unwind.unwind_to).unwrap().state_root; + let _ = db_tx.delete::(target_root, None); + // Clear previous run stage.unwind(&mut db_tx, unwind).await.unwrap(); From a33659e57aa1978280dee76d40d6072b08b2afd2 Mon Sep 17 00:00:00 2001 From: joshieDo Date: Wed, 22 Feb 2023 07:07:11 +0000 Subject: [PATCH 20/34] fix and speedup trie db methods --- crates/stages/src/trie/mod.rs | 79 ++++++++++++++++++++++++----------- 1 file changed, 55 insertions(+), 24 deletions(-) diff --git a/crates/stages/src/trie/mod.rs b/crates/stages/src/trie/mod.rs index 045deab20ff..0b4b693f756 100644 --- a/crates/stages/src/trie/mod.rs +++ b/crates/stages/src/trie/mod.rs @@ -1,7 +1,7 @@ use cita_trie::{PatriciaTrie, Trie}; use hasher::HasherKeccak; use reth_db::{ - cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO}, + cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW}, database::Database, models::{AccountBeforeTx, TransitionIdAddress}, tables, @@ -55,13 +55,31 @@ where } fn insert(&self, key: Vec, value: Vec) -> Result<(), Self::Error> { - // Caching and bulk inserting shouldn't be needed, as the data is ordered - self.tx.put::(H256::from_slice(key.as_slice()), value)?; + assert!(false, "Use batch instead."); + Ok(()) + } + + // Insert a batch of data into the cache. + fn insert_batch(&self, keys: Vec>, values: Vec>) -> Result<(), Self::Error> { + let mut cursor = self.tx.cursor_write::()?; + for i in 0..keys.len() { + cursor.upsert(H256::from_slice(keys[i].as_slice()), values[i].clone())?; + } + Ok(()) + } + + fn remove_batch(&self, keys: &[Vec]) -> Result<(), Self::Error> { + let mut cursor = self.tx.cursor_write::()?; + for i in 0..keys.len() { + if cursor.seek_exact(H256::from_slice(keys[i].as_slice()))?.is_some() { + cursor.delete_current()?; + } + } Ok(()) } fn remove(&self, key: &[u8]) -> Result<(), Self::Error> { - self.tx.delete::(H256::from_slice(key), None)?; + assert!(false, "Use batch instead."); Ok(()) } @@ -104,7 +122,11 @@ where fn get(&self, key: &[u8]) -> Result>, Self::Error> { let mut cursor = self.tx.cursor_dup_read::()?; - Ok(cursor.seek_by_key_subkey(self.key, H256::from_slice(key))?.map(|entry| entry.node)) + let subkey = H256::from_slice(key); + Ok(cursor + .seek_by_key_subkey(self.key, subkey)? + .filter(|entry| entry.hash == subkey) + .map(|entry| entry.node)) } fn contains(&self, key: &[u8]) -> Result { @@ -112,28 +134,36 @@ where } fn insert(&self, key: Vec, value: Vec) -> Result<(), Self::Error> { - // Caching and bulk inserting shouldn't be needed, as the data is ordered - self.tx.put::( - self.key, - StorageTrieEntry { hash: H256::from_slice(key.as_slice()), node: value }, - )?; + assert!(false, "Use batch instead."); Ok(()) } - fn remove(&self, key: &[u8]) -> Result<(), Self::Error> { + /// Insert a batch of data into the cache. + fn insert_batch(&self, keys: Vec>, values: Vec>) -> Result<(), Self::Error> { let mut cursor = self.tx.cursor_dup_write::()?; - let subkey = H256::from_slice(key); + for i in 0..keys.len() { + let hash = H256::from_slice(keys[i].as_slice()); + if cursor.seek_by_key_subkey(self.key, hash)?.filter(|e| e.hash == hash).is_some() { + cursor.delete_current()?; + } + cursor.upsert(self.key, StorageTrieEntry { hash, node: values[i].clone() })?; + } + Ok(()) + } - cursor - .seek_by_key_subkey(self.key, subkey)? - .map(|v| { - if v.hash == subkey { - cursor.delete_current().map_err(|e| TrieError::DatabaseError(e)) - } else { - Err(TrieError::MissingRoot(self.key)) - } - }) - .transpose()?; + fn remove_batch(&self, keys: &[Vec]) -> Result<(), Self::Error> { + let mut cursor = self.tx.cursor_dup_write::()?; + for i in 0..keys.len() { + let hash = H256::from_slice(keys[i].as_slice()); + if cursor.seek_by_key_subkey(self.key, hash)?.filter(|e| e.hash == hash).is_some() { + cursor.delete_current()?; + } + } + Ok(()) + } + + fn remove(&self, key: &[u8]) -> Result<(), Self::Error> { + assert!(false, "Use batch instead."); Ok(()) } @@ -147,7 +177,7 @@ impl<'tx, 'itx, DB: Database> DupHashDatabase<'tx, 'itx, DB> { fn new(tx: &'tx Transaction<'itx, DB>, key: H256) -> Result { let root = EMPTY_ROOT; let mut cursor = tx.cursor_dup_write::()?; - if cursor.seek_by_key_subkey(key, root)?.is_none() { + if cursor.seek_by_key_subkey(key, root)?.filter(|entry| entry.hash == root).is_none() { tx.put::( key, StorageTrieEntry { hash: root, node: [EMPTY_STRING_CODE].to_vec() }, @@ -163,6 +193,7 @@ impl<'tx, 'itx, DB: Database> DupHashDatabase<'tx, 'itx, DB> { } tx.cursor_dup_read::()? .seek_by_key_subkey(key, root)? + .filter(|entry| entry.hash == root) .ok_or(TrieError::MissingRoot(root))?; Ok(Self { tx, key }) } @@ -320,7 +351,7 @@ impl DBTrieLoader { for key in changed_storages { if let Some(StorageEntry { value, .. }) = - storage_cursor.seek_by_key_subkey(address, key)? + storage_cursor.seek_by_key_subkey(address, key)?.filter(|e| e.key == key) { let out = encode_fixed_size(&value).to_vec(); trie.insert(key.as_bytes().to_vec(), out)?; From 121d5b7aa963acd4158ae2c26c7d9fe35acdb96d Mon Sep 17 00:00:00 2001 From: joshieDo Date: Wed, 22 Feb 2023 07:17:21 +0000 Subject: [PATCH 21/34] set DEFAULT_NUM_BLOCKS for criterion --- crates/stages/benches/criterion.rs | 40 ++++++++++++------------------ 1 file changed, 16 insertions(+), 24 deletions(-) diff --git a/crates/stages/benches/criterion.rs b/crates/stages/benches/criterion.rs index f9575d54e03..0a34cbb4360 100644 --- a/crates/stages/benches/criterion.rs +++ b/crates/stages/benches/criterion.rs @@ -5,10 +5,7 @@ use criterion::{ use pprof::criterion::{Output, PProfProfiler}; use reth_db::mdbx::{Env, WriteMap}; use reth_stages::{ - stages::{ - AccountHashingStage, MerkleStage, SenderRecoveryStage, StorageHashingStage, - TotalDifficultyStage, TransactionLookupStage, - }, + stages::{MerkleStage, SenderRecoveryStage, TotalDifficultyStage, TransactionLookupStage}, test_utils::TestTransaction, ExecInput, Stage, StageId, UnwindInput, }; @@ -19,11 +16,13 @@ use setup::StageRange; criterion_group! { name = benches; - config = Criterion::default().with_profiler(PProfProfiler::new(100, Output::Flamegraph(None))); + config = Criterion::default().with_profiler(PProfProfiler::new(1000, Output::Flamegraph(None))); targets = transaction_lookup, account_hashing, senders, total_difficulty, merkle } criterion_main!(benches); +const DEFAULT_NUM_BLOCKS: u64 = 10_000; + fn account_hashing(c: &mut Criterion) { let mut group = c.benchmark_group("Stages"); @@ -49,11 +48,10 @@ fn senders(c: &mut Criterion) { group.sample_size(10); for batch in [1000usize, 10_000, 100_000, 250_000] { - let num_blocks = 10_000; - let stage = SenderRecoveryStage { commit_threshold: num_blocks }; + let stage = SenderRecoveryStage { commit_threshold: DEFAULT_NUM_BLOCKS }; let label = format!("SendersRecovery-batch-{batch}"); - measure_stage(&mut group, setup::stage_unwind, stage, 0..num_blocks + 1, label); + measure_stage(&mut group, setup::stage_unwind, stage, 0..DEFAULT_NUM_BLOCKS, label); } } @@ -61,15 +59,13 @@ fn transaction_lookup(c: &mut Criterion) { let mut group = c.benchmark_group("Stages"); // don't need to run each stage for that many times group.sample_size(10); - - let num_blocks = 10_000; - let stage = TransactionLookupStage::new(num_blocks); + let stage = TransactionLookupStage::new(DEFAULT_NUM_BLOCKS); measure_stage( &mut group, setup::stage_unwind, stage, - 0..num_blocks + 1, + 0..DEFAULT_NUM_BLOCKS, "TransactionLookup".to_string(), ); } @@ -80,15 +76,13 @@ fn total_difficulty(c: &mut Criterion) { group.warm_up_time(std::time::Duration::from_millis(2000)); // don't need to run each stage for that many times group.sample_size(10); - - let num_blocks = 10_000; let stage = TotalDifficultyStage::default(); measure_stage( &mut group, setup::stage_unwind, stage, - 0..num_blocks + 1, + 0..DEFAULT_NUM_BLOCKS, "TotalDifficulty".to_string(), ); } @@ -98,14 +92,12 @@ fn merkle(c: &mut Criterion) { // don't need to run each stage for that many times group.sample_size(10); - let num_blocks = 10_000; - - let stage = MerkleStage::Both { clean_threshold: num_blocks + 1 }; + let stage = MerkleStage::Both { clean_threshold: u64::MAX }; measure_stage( &mut group, setup::unwind_hashes, stage, - 1..num_blocks + 1, + 1..DEFAULT_NUM_BLOCKS + 1, "Merkle-incremental".to_string(), ); @@ -114,7 +106,7 @@ fn merkle(c: &mut Criterion) { &mut group, setup::unwind_hashes, stage, - 1..num_blocks + 1, + 1..DEFAULT_NUM_BLOCKS + 1, "Merkle-fullhash".to_string(), ); } @@ -159,7 +151,7 @@ fn measure_stage( S: Clone + Stage>, F: Fn(S, &TestTransaction, StageRange), { - let path = setup::txs_testdata(block_interval.end - 1); + let path = setup::txs_testdata(block_interval.end); measure_stage_with_path( path, @@ -168,12 +160,12 @@ fn measure_stage( stage, ( ExecInput { - previous_stage: Some((StageId("Another"), block_interval.end - 1)), + previous_stage: Some((StageId("Another"), block_interval.end)), stage_progress: Some(block_interval.start), }, UnwindInput { - stage_progress: Some(block_interval.end - 1), - unwind_to: Some(block_interval.start), + stage_progress: block_interval.end, + unwind_to: block_interval.start, bad_block: None, }, ), From 14feb947d88aceab70da6dfd475bb6be9e625752 Mon Sep 17 00:00:00 2001 From: joshieDo Date: Wed, 22 Feb 2023 07:17:50 +0000 Subject: [PATCH 22/34] clippy --- crates/stages/benches/setup/mod.rs | 2 +- crates/stages/src/trie/mod.rs | 10 +++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/crates/stages/benches/setup/mod.rs b/crates/stages/benches/setup/mod.rs index 006f4d79161..34876d031ad 100644 --- a/crates/stages/benches/setup/mod.rs +++ b/crates/stages/benches/setup/mod.rs @@ -11,7 +11,7 @@ use reth_interfaces::test_utils::generators::{ }; use reth_primitives::{Account, Address, SealedBlock, H256}; use reth_stages::{ - stages::{AccountHashingStage, MerkleStage, StorageHashingStage}, + stages::{AccountHashingStage, StorageHashingStage}, test_utils::TestTransaction, DBTrieLoader, ExecInput, Stage, UnwindInput, }; diff --git a/crates/stages/src/trie/mod.rs b/crates/stages/src/trie/mod.rs index 0b4b693f756..75ed102c44f 100644 --- a/crates/stages/src/trie/mod.rs +++ b/crates/stages/src/trie/mod.rs @@ -1,7 +1,7 @@ use cita_trie::{PatriciaTrie, Trie}; use hasher::HasherKeccak; use reth_db::{ - cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW}, + cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO}, database::Database, models::{AccountBeforeTx, TransitionIdAddress}, tables, @@ -54,7 +54,7 @@ where Ok(::get(self, key)?.is_some()) } - fn insert(&self, key: Vec, value: Vec) -> Result<(), Self::Error> { + fn insert(&self, _key: Vec, _value: Vec) -> Result<(), Self::Error> { assert!(false, "Use batch instead."); Ok(()) } @@ -78,7 +78,7 @@ where Ok(()) } - fn remove(&self, key: &[u8]) -> Result<(), Self::Error> { + fn remove(&self, _key: &[u8]) -> Result<(), Self::Error> { assert!(false, "Use batch instead."); Ok(()) } @@ -133,7 +133,7 @@ where Ok(::get(self, key)?.is_some()) } - fn insert(&self, key: Vec, value: Vec) -> Result<(), Self::Error> { + fn insert(&self, _key: Vec, _value: Vec) -> Result<(), Self::Error> { assert!(false, "Use batch instead."); Ok(()) } @@ -162,7 +162,7 @@ where Ok(()) } - fn remove(&self, key: &[u8]) -> Result<(), Self::Error> { + fn remove(&self, _key: &[u8]) -> Result<(), Self::Error> { assert!(false, "Use batch instead."); Ok(()) } From 1a203311b57d6abff0371c897eb0a9c4a60936b7 Mon Sep 17 00:00:00 2001 From: joshieDo Date: Thu, 23 Feb 2023 02:14:19 +0000 Subject: [PATCH 23/34] remove assert use --- crates/stages/src/trie/mod.rs | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/crates/stages/src/trie/mod.rs b/crates/stages/src/trie/mod.rs index 75ed102c44f..552124d0394 100644 --- a/crates/stages/src/trie/mod.rs +++ b/crates/stages/src/trie/mod.rs @@ -55,8 +55,7 @@ where } fn insert(&self, _key: Vec, _value: Vec) -> Result<(), Self::Error> { - assert!(false, "Use batch instead."); - Ok(()) + unreachable!("Use batch instead."); } // Insert a batch of data into the cache. @@ -79,8 +78,7 @@ where } fn remove(&self, _key: &[u8]) -> Result<(), Self::Error> { - assert!(false, "Use batch instead."); - Ok(()) + unreachable!("Use batch instead."); } fn flush(&self) -> Result<(), Self::Error> { @@ -134,8 +132,7 @@ where } fn insert(&self, _key: Vec, _value: Vec) -> Result<(), Self::Error> { - assert!(false, "Use batch instead."); - Ok(()) + unreachable!("Use batch instead."); } /// Insert a batch of data into the cache. @@ -163,8 +160,7 @@ where } fn remove(&self, _key: &[u8]) -> Result<(), Self::Error> { - assert!(false, "Use batch instead."); - Ok(()) + unreachable!("Use batch instead."); } fn flush(&self) -> Result<(), Self::Error> { From 60f1fa88bc5a28e21f6ebe507195a8b3773e510f Mon Sep 17 00:00:00 2001 From: joshieDo Date: Thu, 23 Feb 2023 04:35:00 +0000 Subject: [PATCH 24/34] fix update trie root --- crates/stages/src/lib.rs | 3 +-- crates/stages/src/trie/mod.rs | 23 ++++++++++++----------- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/crates/stages/src/lib.rs b/crates/stages/src/lib.rs index 5fdff2fa35c..6b6d72dee0e 100644 --- a/crates/stages/src/lib.rs +++ b/crates/stages/src/lib.rs @@ -60,8 +60,6 @@ mod util; #[cfg(any(test, feature = "test-utils"))] pub mod test_utils; -pub use trie::DBTrieLoader; - /// A re-export of common structs and traits. pub mod prelude; @@ -74,6 +72,7 @@ pub use error::*; pub use id::*; pub use pipeline::*; pub use stage::*; +pub use trie::DBTrieLoader; // NOTE: Needed so the link in the module-level rustdoc works. #[allow(unused_extern_crates)] diff --git a/crates/stages/src/trie/mod.rs b/crates/stages/src/trie/mod.rs index 552124d0394..d050f361c0b 100644 --- a/crates/stages/src/trie/mod.rs +++ b/crates/stages/src/trie/mod.rs @@ -309,20 +309,21 @@ impl DBTrieLoader { let mut trie = PatriciaTrie::from(Arc::clone(&db), Arc::clone(&hasher), root.as_bytes())?; for (address, changed_storages) in changed_accounts { - if let Some(account) = trie.get(address.as_slice())? { - let storage_root = EthAccount::decode(&mut account.as_slice())?.storage_root; + let storage_root = if let Some(account) = trie.get(address.as_slice())? { trie.remove(address.as_bytes())?; - if let Some((_, account)) = accounts_cursor.seek_exact(address)? { - let value = EthAccount::from_with_root( - account, - self.update_storage_root(tx, storage_root, address, changed_storages)?, - ); + let storage_root = EthAccount::decode(&mut account.as_slice())?.storage_root; + self.update_storage_root(tx, storage_root, address, changed_storages)? + } else { + self.calculate_storage_root(tx, address)? + }; - let mut out = Vec::new(); - Encodable::encode(&value, &mut out); - trie.insert(address.as_bytes().to_vec(), out)?; - } + if let Some((_, account)) = accounts_cursor.seek_exact(address)? { + let value = EthAccount::from_with_root(account, storage_root); + + let mut out = Vec::new(); + Encodable::encode(&value, &mut out); + trie.insert(address.as_bytes().to_vec(), out)?; } } From f578774a888afa0c2a0c875facd904d2a17c7d0b Mon Sep 17 00:00:00 2001 From: joshieDo Date: Thu, 23 Feb 2023 04:38:10 +0000 Subject: [PATCH 25/34] add dump-stage merkle --- bin/reth/src/dump_stage/merkle.rs | 122 ++++++++++++++++++++++++++++++ bin/reth/src/dump_stage/mod.rs | 8 ++ 2 files changed, 130 insertions(+) create mode 100644 bin/reth/src/dump_stage/merkle.rs diff --git a/bin/reth/src/dump_stage/merkle.rs b/bin/reth/src/dump_stage/merkle.rs new file mode 100644 index 00000000000..e9b28ff3d66 --- /dev/null +++ b/bin/reth/src/dump_stage/merkle.rs @@ -0,0 +1,122 @@ +use crate::{ + db::DbTool, + dirs::{DbPath, PlatformPath}, + dump_stage::setup, +}; +use eyre::Result; +use reth_db::{database::Database, table::TableImporter, tables, transaction::DbTx}; +use reth_provider::Transaction; +use reth_stages::{ + stages::{AccountHashingStage, MerkleStage, StorageHashingStage}, + Stage, StageId, UnwindInput, +}; +use std::ops::DerefMut; +use tracing::info; + +pub(crate) async fn dump_merkle_stage( + db_tool: &mut DbTool<'_, DB>, + from: u64, + to: u64, + output_db: &PlatformPath, + should_run: bool, +) -> Result<()> { + let (output_db, tip_block_number) = setup::(from, to, output_db, db_tool)?; + + output_db.update(|tx| { + tx.import_table_with_range::(&db_tool.db.tx()?, Some(from), to) + })??; + + let tx = db_tool.db.tx()?; + let from_transition_rev = + tx.get::(from)?.expect("there should be at least one."); + let to_transition_rev = + tx.get::(to)?.expect("there should be at least one."); + + output_db.update(|tx| { + tx.import_table_with_range::( + &db_tool.db.tx()?, + Some(from_transition_rev), + to_transition_rev, + ) + })??; + + unwind_and_copy::(db_tool, (from, to), tip_block_number, &output_db).await?; + + if should_run { + println!( + "\n# Merkle stage does not support dry run, so it will actually be committing changes." + ); + run(output_db, to, from).await?; + } + + Ok(()) +} + +/// Dry-run an unwind to FROM block and copy the necessary table data to the new database. +async fn unwind_and_copy( + db_tool: &mut DbTool<'_, DB>, + range: (u64, u64), + tip_block_number: u64, + output_db: &reth_db::mdbx::Env, +) -> eyre::Result<()> { + let (from, to) = range; + let mut unwind_tx = Transaction::new(db_tool.db)?; + let unwind = UnwindInput { unwind_to: from, stage_progress: tip_block_number, bad_block: None }; + let execute_input = reth_stages::ExecInput { + previous_stage: Some((StageId("Another"), to)), + stage_progress: Some(from), + }; + + // Unwind hashes all the way to FROM + StorageHashingStage::default().unwind(&mut unwind_tx, unwind.clone()).await.unwrap(); + AccountHashingStage::default().unwind(&mut unwind_tx, unwind.clone()).await.unwrap(); + + MerkleStage::default_unwind().unwind(&mut unwind_tx, unwind).await?; + + // Bring hashes to TO + AccountHashingStage::default().execute(&mut unwind_tx, execute_input).await.unwrap(); + StorageHashingStage::default().execute(&mut unwind_tx, execute_input).await.unwrap(); + + let unwind_inner_tx = unwind_tx.deref_mut(); + + // TODO optimize we can actually just get the entries we need for both these tables + output_db.update(|tx| tx.import_table::(unwind_inner_tx))??; + output_db.update(|tx| tx.import_dupsort::(unwind_inner_tx))??; + output_db.update(|tx| tx.import_dupsort::(unwind_inner_tx))??; + + output_db.update(|tx| tx.import_table::(unwind_inner_tx))??; + output_db.update(|tx| tx.import_dupsort::(unwind_inner_tx))??; + output_db.update(|tx| tx.import_table::(unwind_inner_tx))??; + output_db.update(|tx| tx.import_dupsort::(unwind_inner_tx))??; + + unwind_tx.drop()?; + + Ok(()) +} + +/// Try to re-execute the stage straightaway +async fn run( + output_db: reth_db::mdbx::Env, + to: u64, + from: u64, +) -> eyre::Result<()> { + info!(target: "reth::cli", "Executing stage."); + + let mut tx = Transaction::new(&output_db)?; + + MerkleStage::Execution { + clean_threshold: u64::MAX, // Forces updating the root instead of calculating from scratch + } + .execute( + &mut tx, + reth_stages::ExecInput { + previous_stage: Some((StageId("Another"), to)), + stage_progress: Some(from), + }, + ) + .await?; + + info!(target: "reth::cli", "Success."); + + Ok(()) +} diff --git a/bin/reth/src/dump_stage/mod.rs b/bin/reth/src/dump_stage/mod.rs index 0bc3863a342..4eddb3817da 100644 --- a/bin/reth/src/dump_stage/mod.rs +++ b/bin/reth/src/dump_stage/mod.rs @@ -8,6 +8,9 @@ use hashing_account::dump_hashing_account_stage; mod execution; use execution::dump_execution_stage; +mod merkle; +use merkle::dump_merkle_stage; + use crate::{ db::DbTool, dirs::{DbPath, PlatformPath}, @@ -45,6 +48,8 @@ pub enum Stages { StorageHashing(StageCommand), /// AccountHashing stage. AccountHashing(StageCommand), + /// Merkle stage. + Merkle(StageCommand), } /// Stage command that takes a range @@ -94,6 +99,9 @@ impl Command { Stages::AccountHashing(StageCommand { output_db, from, to, dry_run, .. }) => { dump_hashing_account_stage(&mut tool, *from, *to, output_db, *dry_run).await? } + Stages::Merkle(StageCommand { output_db, from, to, dry_run, .. }) => { + dump_merkle_stage(&mut tool, *from, *to, output_db, *dry_run).await? + } } Ok(()) From c20dc5843425672206bac6da3c9e6aba3f38b22c Mon Sep 17 00:00:00 2001 From: joshieDo Date: Thu, 23 Feb 2023 05:32:00 +0000 Subject: [PATCH 26/34] zip keys and values --- crates/stages/src/trie/mod.rs | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/crates/stages/src/trie/mod.rs b/crates/stages/src/trie/mod.rs index d050f361c0b..199df9f8417 100644 --- a/crates/stages/src/trie/mod.rs +++ b/crates/stages/src/trie/mod.rs @@ -61,16 +61,16 @@ where // Insert a batch of data into the cache. fn insert_batch(&self, keys: Vec>, values: Vec>) -> Result<(), Self::Error> { let mut cursor = self.tx.cursor_write::()?; - for i in 0..keys.len() { - cursor.upsert(H256::from_slice(keys[i].as_slice()), values[i].clone())?; + for (key, value) in keys.into_iter().zip(values.into_iter()) { + cursor.upsert(H256::from_slice(key.as_slice()), value)?; } Ok(()) } fn remove_batch(&self, keys: &[Vec]) -> Result<(), Self::Error> { let mut cursor = self.tx.cursor_write::()?; - for i in 0..keys.len() { - if cursor.seek_exact(H256::from_slice(keys[i].as_slice()))?.is_some() { + for key in keys { + if cursor.seek_exact(H256::from_slice(key.as_slice()))?.is_some() { cursor.delete_current()?; } } @@ -138,20 +138,20 @@ where /// Insert a batch of data into the cache. fn insert_batch(&self, keys: Vec>, values: Vec>) -> Result<(), Self::Error> { let mut cursor = self.tx.cursor_dup_write::()?; - for i in 0..keys.len() { - let hash = H256::from_slice(keys[i].as_slice()); + for (key, node) in keys.into_iter().zip(values.into_iter()) { + let hash = H256::from_slice(key.as_slice()); if cursor.seek_by_key_subkey(self.key, hash)?.filter(|e| e.hash == hash).is_some() { cursor.delete_current()?; } - cursor.upsert(self.key, StorageTrieEntry { hash, node: values[i].clone() })?; + cursor.upsert(self.key, StorageTrieEntry { hash, node })?; } Ok(()) } fn remove_batch(&self, keys: &[Vec]) -> Result<(), Self::Error> { let mut cursor = self.tx.cursor_dup_write::()?; - for i in 0..keys.len() { - let hash = H256::from_slice(keys[i].as_slice()); + for key in keys { + let hash = H256::from_slice(key.as_slice()); if cursor.seek_by_key_subkey(self.key, hash)?.filter(|e| e.hash == hash).is_some() { cursor.delete_current()?; } From a50d7f80b245f688c1db53816ba45d973a375c55 Mon Sep 17 00:00:00 2001 From: joshieDo Date: Thu, 23 Feb 2023 05:40:48 +0000 Subject: [PATCH 27/34] clippy --- bin/reth/src/dump_stage/merkle.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/bin/reth/src/dump_stage/merkle.rs b/bin/reth/src/dump_stage/merkle.rs index e9b28ff3d66..51791b3d31f 100644 --- a/bin/reth/src/dump_stage/merkle.rs +++ b/bin/reth/src/dump_stage/merkle.rs @@ -68,8 +68,8 @@ async fn unwind_and_copy( }; // Unwind hashes all the way to FROM - StorageHashingStage::default().unwind(&mut unwind_tx, unwind.clone()).await.unwrap(); - AccountHashingStage::default().unwind(&mut unwind_tx, unwind.clone()).await.unwrap(); + StorageHashingStage::default().unwind(&mut unwind_tx, unwind).await.unwrap(); + AccountHashingStage::default().unwind(&mut unwind_tx, unwind).await.unwrap(); MerkleStage::default_unwind().unwind(&mut unwind_tx, unwind).await?; From d25a3df9e921eef6d22d87e48556cd6029b6d07d Mon Sep 17 00:00:00 2001 From: joshieDo Date: Mon, 27 Feb 2023 03:48:08 +0000 Subject: [PATCH 28/34] unwind execution stage too for merkle dump --- bin/reth/src/dump_stage/merkle.rs | 24 ++++++++++++++++++------ 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/bin/reth/src/dump_stage/merkle.rs b/bin/reth/src/dump_stage/merkle.rs index 51791b3d31f..4ae5120af6a 100644 --- a/bin/reth/src/dump_stage/merkle.rs +++ b/bin/reth/src/dump_stage/merkle.rs @@ -7,7 +7,7 @@ use eyre::Result; use reth_db::{database::Database, table::TableImporter, tables, transaction::DbTx}; use reth_provider::Transaction; use reth_stages::{ - stages::{AccountHashingStage, MerkleStage, StorageHashingStage}, + stages::{AccountHashingStage, ExecutionStage, MerkleStage, StorageHashingStage}, Stage, StageId, UnwindInput, }; use std::ops::DerefMut; @@ -73,15 +73,27 @@ async fn unwind_and_copy( MerkleStage::default_unwind().unwind(&mut unwind_tx, unwind).await?; + // Bring Plainstate to TO (hashing stage execution requires it) + ExecutionStage { commit_threshold: u64::MAX, ..Default::default() } + .unwind( + &mut unwind_tx, + UnwindInput { unwind_to: to, stage_progress: tip_block_number, bad_block: None }, + ) + .await?; + // Bring hashes to TO - AccountHashingStage::default().execute(&mut unwind_tx, execute_input).await.unwrap(); - StorageHashingStage::default().execute(&mut unwind_tx, execute_input).await.unwrap(); + AccountHashingStage { clean_threshold: u64::MAX, commit_threshold: u64::MAX } + .execute(&mut unwind_tx, execute_input) + .await + .unwrap(); + StorageHashingStage { clean_threshold: u64::MAX, commit_threshold: u64::MAX } + .execute(&mut unwind_tx, execute_input) + .await + .unwrap(); let unwind_inner_tx = unwind_tx.deref_mut(); - // TODO optimize we can actually just get the entries we need for both these tables - output_db.update(|tx| tx.import_table::(unwind_inner_tx))??; - output_db.update(|tx| tx.import_dupsort::(unwind_inner_tx))??; + // TODO optimize we can actually just get the entries we need output_db.update(|tx| tx.import_dupsort::(unwind_inner_tx))??; output_db.update(|tx| tx.import_table::(unwind_inner_tx))??; From 1c10b657d27e2dae6f9c65f8dfab0fd85f3c9d5d Mon Sep 17 00:00:00 2001 From: joshieDo Date: Mon, 27 Feb 2023 06:24:39 +0000 Subject: [PATCH 29/34] fix hashing storage incremental --- crates/stages/src/stages/hashing_storage.rs | 38 +++++++++++---------- 1 file changed, 20 insertions(+), 18 deletions(-) diff --git a/crates/stages/src/stages/hashing_storage.rs b/crates/stages/src/stages/hashing_storage.rs index 2427d90a50f..a187ddb2ee1 100644 --- a/crates/stages/src/stages/hashing_storage.rs +++ b/crates/stages/src/stages/hashing_storage.rs @@ -160,33 +160,35 @@ impl Stage for StorageHashingStage { // Assumption we are okay with is that plain state represent // `previous_stage_progress` state. .map(|(address, storage)| { - storage - .into_iter() - .map(|key| { - plain_storage - .seek_by_key_subkey(address, key) - .map(|ret| (keccak256(key), ret.map(|e| e.value))) - }) - .collect::, _>>() - .map(|storage| (keccak256(address), storage)) + ( + keccak256(address), + storage + .into_iter() + .filter_map(|key| { + plain_storage + .seek_by_key_subkey(address, key) + .ok()? + .filter(|v| v.key == key) + .map(|ret| (keccak256(key), ret.value)) + }) + .collect::>(), + ) }) - .collect::, _>>()? + .collect::>() .into_iter() // Hash the address and key and apply them to HashedStorage (if Storage is None // just remove it); - .try_for_each(|(address, storage)| { + .try_for_each(|(hashed_address, storage)| { storage.into_iter().try_for_each(|(key, val)| -> Result<(), StageError> { if hashed_storage - .seek_by_key_subkey(address, key)? + .seek_by_key_subkey(hashed_address, key)? .filter(|entry| entry.key == key) .is_some() { hashed_storage.delete_current()?; } - if let Some(value) = val { - hashed_storage.upsert(address, StorageEntry { key, value })?; - } + hashed_storage.upsert(hashed_address, StorageEntry { key, value: val })?; Ok(()) }) })?; @@ -232,9 +234,9 @@ impl Stage for StorageHashingStage { .collect::>() .into_iter() // Apply values to HashedStorage (if Value is zero just remove it); - .try_for_each(|((address, key), value)| -> Result<(), StageError> { + .try_for_each(|((hashed_address, key), value)| -> Result<(), StageError> { if hashed_storage - .seek_by_key_subkey(address, key)? + .seek_by_key_subkey(hashed_address, key)? .filter(|entry| entry.key == key) .is_some() { @@ -242,7 +244,7 @@ impl Stage for StorageHashingStage { } if value != U256::ZERO { - hashed_storage.upsert(address, StorageEntry { key, value })?; + hashed_storage.upsert(hashed_address, StorageEntry { key, value })?; } Ok(()) })?; From e4c0a0e741ee69001671ec3e504bd8a0dfa19b12 Mon Sep 17 00:00:00 2001 From: joshieDo Date: Mon, 27 Feb 2023 06:39:42 +0000 Subject: [PATCH 30/34] propagate error --- crates/stages/src/stages/hashing_storage.rs | 23 +++++++++++++-------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/crates/stages/src/stages/hashing_storage.rs b/crates/stages/src/stages/hashing_storage.rs index a187ddb2ee1..5e154d93aa2 100644 --- a/crates/stages/src/stages/hashing_storage.rs +++ b/crates/stages/src/stages/hashing_storage.rs @@ -160,21 +160,26 @@ impl Stage for StorageHashingStage { // Assumption we are okay with is that plain state represent // `previous_stage_progress` state. .map(|(address, storage)| { - ( + let res = ( keccak256(address), storage .into_iter() - .filter_map(|key| { - plain_storage - .seek_by_key_subkey(address, key) - .ok()? - .filter(|v| v.key == key) - .map(|ret| (keccak256(key), ret.value)) + .map(|key| { + Ok::, reth_db::Error>( + plain_storage + .seek_by_key_subkey(address, key)? + .filter(|v| v.key == key) + .map(|ret| (keccak256(key), ret.value)), + ) }) + .collect::>, _>>()? + .into_iter() + .filter_map(|v| v) .collect::>(), - ) + ); + Ok::<_, reth_db::Error>(res) }) - .collect::>() + .collect::, _>>()? .into_iter() // Hash the address and key and apply them to HashedStorage (if Storage is None // just remove it); From 472bfeb4714a052a487ae2a8782790e6661bee3f Mon Sep 17 00:00:00 2001 From: joshieDo Date: Mon, 27 Feb 2023 07:04:32 +0000 Subject: [PATCH 31/34] clippy --- crates/stages/src/stages/hashing_storage.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/stages/src/stages/hashing_storage.rs b/crates/stages/src/stages/hashing_storage.rs index 5e154d93aa2..e7b5d3c49a3 100644 --- a/crates/stages/src/stages/hashing_storage.rs +++ b/crates/stages/src/stages/hashing_storage.rs @@ -174,7 +174,7 @@ impl Stage for StorageHashingStage { }) .collect::>, _>>()? .into_iter() - .filter_map(|v| v) + .flatten() .collect::>(), ); Ok::<_, reth_db::Error>(res) From 54710c12fcd77e78203c423711e5434bb42de5c1 Mon Sep 17 00:00:00 2001 From: joshieDo Date: Tue, 28 Feb 2023 02:45:43 +0000 Subject: [PATCH 32/34] use existing consensus error for diff root --- crates/stages/src/stages/merkle.rs | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/crates/stages/src/stages/merkle.rs b/crates/stages/src/stages/merkle.rs index 9ecfccfa102..61a6e57dc7d 100644 --- a/crates/stages/src/stages/merkle.rs +++ b/crates/stages/src/stages/merkle.rs @@ -158,15 +158,20 @@ impl Stage for MerkleStage { let from_transition = tx.get_block_transition(input.unwind_to)?; let to_transition = tx.get_block_transition(input.stage_progress)?; - let root = loader + let block_root = loader .update_root(tx, current_root, from_transition..to_transition) .map_err(|e| StageError::Fatal(Box::new(e)))?; - if root != target_root { - return Err(StageError::Fatal( - format!("Expected root [{target_root}] does not match calculated one [{root}]") - .into(), - )) + if block_root != target_root { + let unwind_to = input.unwind_to; + warn!(target: "sync::stages::merkle::unwind", ?unwind_to, got = ?block_root, expected = ?target_root, "Block's root state failed verification"); + return Err(StageError::Validation { + block: unwind_to, + error: consensus::Error::BodyStateRootDiff { + got: block_root, + expected: target_root, + }, + }) } info!(target: "sync::stages::merkle::unwind", "Stage finished"); From 1f768e3c7fbba2482f19bd5df8b90ee3dbfdad71 Mon Sep 17 00:00:00 2001 From: joshieDo Date: Tue, 28 Feb 2023 02:46:31 +0000 Subject: [PATCH 33/34] add stage id MERKLE_BOTH --- crates/stages/src/stages/merkle.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/crates/stages/src/stages/merkle.rs b/crates/stages/src/stages/merkle.rs index 61a6e57dc7d..884c137a86d 100644 --- a/crates/stages/src/stages/merkle.rs +++ b/crates/stages/src/stages/merkle.rs @@ -14,6 +14,9 @@ pub const MERKLE_EXECUTION: StageId = StageId("MerkleExecute"); /// The [`StageId`] of the merkle hashing unwind stage. pub const MERKLE_UNWIND: StageId = StageId("MerkleUnwind"); +/// The [`StageId`] of the merkle hashing unwind and execution stage. +pub const MERKLE_BOTH: StageId = StageId("MerkleBoth"); + /// The merkle hashing stage uses input from /// [`AccountHashingStage`][crate::stages::AccountHashingStage] and /// [`StorageHashingStage`][crate::stages::AccountHashingStage] to calculate intermediate hashes @@ -72,7 +75,7 @@ impl Stage for MerkleStage { MerkleStage::Execution { .. } => MERKLE_EXECUTION, MerkleStage::Unwind => MERKLE_UNWIND, #[cfg(any(test, feature = "test-utils"))] - MerkleStage::Both { .. } => unreachable!(), + MerkleStage::Both { .. } => MERKLE_BOTH, } } From 283292feeeab74cec27f13f446265a82bec89f00 Mon Sep 17 00:00:00 2001 From: joshieDo Date: Wed, 1 Mar 2023 02:23:27 +0000 Subject: [PATCH 34/34] add missing Executor to stage --- bin/reth/src/dump_stage/merkle.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/bin/reth/src/dump_stage/merkle.rs b/bin/reth/src/dump_stage/merkle.rs index 4ae5120af6a..8e4b2b56115 100644 --- a/bin/reth/src/dump_stage/merkle.rs +++ b/bin/reth/src/dump_stage/merkle.rs @@ -5,10 +5,11 @@ use crate::{ }; use eyre::Result; use reth_db::{database::Database, table::TableImporter, tables, transaction::DbTx}; +use reth_primitives::MAINNET; use reth_provider::Transaction; use reth_stages::{ stages::{AccountHashingStage, ExecutionStage, MerkleStage, StorageHashingStage}, - Stage, StageId, UnwindInput, + DefaultDB, Stage, StageId, UnwindInput, }; use std::ops::DerefMut; use tracing::info; @@ -74,7 +75,9 @@ async fn unwind_and_copy( MerkleStage::default_unwind().unwind(&mut unwind_tx, unwind).await?; // Bring Plainstate to TO (hashing stage execution requires it) - ExecutionStage { commit_threshold: u64::MAX, ..Default::default() } + let mut exec_stage: ExecutionStage<'_, DefaultDB<'_>> = ExecutionStage::from(MAINNET.clone()); + exec_stage.commit_threshold = u64::MAX; + exec_stage .unwind( &mut unwind_tx, UnwindInput { unwind_to: to, stage_progress: tip_block_number, bad_block: None },