diff --git a/Cargo.lock b/Cargo.lock index 6417ea64c8b..f3130d2b86f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4851,6 +4851,7 @@ dependencies = [ "itertools 0.10.5", "parking_lot 0.12.1", "proptest", + "reth-codecs", "reth-db", "reth-interfaces", "reth-primitives", diff --git a/bin/reth/src/dump_stage/merkle.rs b/bin/reth/src/dump_stage/merkle.rs index a9e8a8f6204..6e6ce370ff3 100644 --- a/bin/reth/src/dump_stage/merkle.rs +++ b/bin/reth/src/dump_stage/merkle.rs @@ -44,10 +44,7 @@ pub(crate) async fn dump_merkle_stage( unwind_and_copy::(db_tool, (from, to), tip_block_number, &output_db).await?; if should_run { - println!( - "\n# Merkle stage does not support dry run, so it will actually be committing changes." - ); - run(output_db, to, from).await?; + dry_run(output_db, to, from).await?; } Ok(()) @@ -113,7 +110,7 @@ async fn unwind_and_copy( } /// Try to re-execute the stage straightaway -async fn run( +async fn dry_run( output_db: reth_db::mdbx::Env, to: u64, from: u64, @@ -121,18 +118,24 @@ async fn run( info!(target: "reth::cli", "Executing stage."); let mut tx = Transaction::new(&output_db)?; - - MerkleStage::Execution { - clean_threshold: u64::MAX, // Forces updating the root instead of calculating from scratch + let mut exec_output = false; + while !exec_output { + exec_output = MerkleStage::Execution { + clean_threshold: u64::MAX, /* Forces updating the root instead of calculating from + * scratch */ + } + .execute( + &mut tx, + reth_stages::ExecInput { + previous_stage: Some((StageId("Another"), to)), + stage_progress: Some(from), + }, + ) + .await? + .done; } - .execute( - &mut tx, - reth_stages::ExecInput { - previous_stage: Some((StageId("Another"), to)), - stage_progress: Some(from), - }, - ) - .await?; + + tx.drop()?; info!(target: "reth::cli", "Success."); diff --git a/crates/primitives/src/lib.rs b/crates/primitives/src/lib.rs index c4b8a8a1c4d..13d68dbfc03 100644 --- a/crates/primitives/src/lib.rs +++ b/crates/primitives/src/lib.rs @@ -34,6 +34,7 @@ mod withdrawal; /// Helper function for calculating Merkle proofs and hashes pub mod proofs; +pub use proofs::ProofCheckpoint; pub use account::{Account, Bytecode}; pub use bits::H512; diff --git a/crates/primitives/src/proofs.rs b/crates/primitives/src/proofs.rs index bfba3e50507..c94a68aad33 100644 --- a/crates/primitives/src/proofs.rs +++ b/crates/primitives/src/proofs.rs @@ -8,6 +8,7 @@ use bytes::BytesMut; use hash_db::Hasher; use hex_literal::hex; use plain_hasher::PlainHasher; +use reth_codecs::{main_codec, Compact}; use reth_rlp::Encodable; use triehash::{ordered_trie_root, sec_trie_root}; @@ -34,6 +35,20 @@ impl Hasher for KeccakHasher { } } +/// Saves the progress of MerkleStage +#[main_codec] +#[derive(Default, Debug, Copy, Clone, PartialEq)] +pub struct ProofCheckpoint { + /// The next hashed account to insert into the trie. + pub hashed_address: Option, + /// The next storage entry to insert into the trie. + pub storage_key: Option, + /// Current intermediate root for `AccountsTrie`. + pub account_root: Option, + /// Current intermediate storage root from an account. + pub storage_root: Option, +} + /// Calculate a transaction root. /// /// Iterates over the given transactions and the merkle merkle trie root of diff --git a/crates/stages/benches/setup/mod.rs b/crates/stages/benches/setup/mod.rs index aaed854bd82..08b30de0905 100644 --- a/crates/stages/benches/setup/mod.rs +++ b/crates/stages/benches/setup/mod.rs @@ -123,7 +123,8 @@ pub(crate) fn txs_testdata(num_blocks: u64) -> PathBuf { tx.insert_accounts_and_storages(start_state.clone()).unwrap(); // make first block after genesis have valid state root - let root = DBTrieLoader::default().calculate_root(&tx.inner()).unwrap(); + let root = + DBTrieLoader::default().calculate_root(&tx.inner()).and_then(|e| e.root()).unwrap(); let second_block = blocks.get_mut(1).unwrap(); let cloned_second = second_block.clone(); let mut updated_header = cloned_second.header.unseal(); @@ -144,7 +145,8 @@ pub(crate) fn txs_testdata(num_blocks: u64) -> PathBuf { // make last block have valid state root let root = { let mut tx_mut = tx.inner(); - let root = DBTrieLoader::default().calculate_root(&tx_mut).unwrap(); + let root = + DBTrieLoader::default().calculate_root(&tx_mut).and_then(|e| e.root()).unwrap(); tx_mut.commit().unwrap(); root }; diff --git a/crates/stages/src/stages/merkle.rs b/crates/stages/src/stages/merkle.rs index 3078742fd11..d2b8df05b2d 100644 --- a/crates/stages/src/stages/merkle.rs +++ b/crates/stages/src/stages/merkle.rs @@ -1,7 +1,10 @@ use crate::{ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, UnwindOutput}; use reth_db::{database::Database, tables, transaction::DbTx}; use reth_interfaces::consensus; -use reth_provider::{trie::DBTrieLoader, Transaction}; +use reth_provider::{ + trie::{DBTrieLoader, TrieProgress}, + Transaction, +}; use std::{fmt::Debug, ops::DerefMut}; use tracing::*; @@ -105,23 +108,32 @@ impl Stage for MerkleStage { let trie_root = if from_transition == to_transition { block_root - } else if to_transition - from_transition > threshold || stage_progress == 0 { - debug!(target: "sync::stages::merkle::exec", current = ?stage_progress, target = ?previous_stage_progress, "Rebuilding trie"); - // if there are more blocks than threshold it is faster to rebuild the trie - DBTrieLoader::new(tx.deref_mut()) - .calculate_root() - .map_err(|e| StageError::Fatal(Box::new(e)))? } else { - debug!(target: "sync::stages::merkle::exec", current = ?stage_progress, target = ?previous_stage_progress, "Updating trie"); - // Iterate over changeset (similar to Hashing stages) and take new values - let current_root = tx.get_header(stage_progress)?.state_root; - DBTrieLoader::new(tx.deref_mut()) - .update_root(current_root, from_transition..to_transition) - .map_err(|e| StageError::Fatal(Box::new(e)))? + let res = if to_transition - from_transition > threshold || stage_progress == 0 { + debug!(target: "sync::stages::merkle::exec", current = ?stage_progress, target = ?previous_stage_progress, "Rebuilding trie"); + // if there are more blocks than threshold it is faster to rebuild the trie + let mut loader = DBTrieLoader::new(tx.deref_mut()); + loader.calculate_root().map_err(|e| StageError::Fatal(Box::new(e)))? + } else { + debug!(target: "sync::stages::merkle::exec", current = ?stage_progress, target = ?previous_stage_progress, "Updating trie"); + // Iterate over changeset (similar to Hashing stages) and take new values + let current_root = tx.get_header(stage_progress)?.state_root; + let mut loader = DBTrieLoader::new(tx.deref_mut()); + loader + .update_root(current_root, from_transition..to_transition) + .map_err(|e| StageError::Fatal(Box::new(e)))? + }; + + match res { + TrieProgress::Complete(root) => root, + TrieProgress::InProgress(_) => { + return Ok(ExecOutput { stage_progress, done: false }) + } + } }; if block_root != trie_root { - warn!(target: "sync::stages::merkle::exec", ?previous_stage_progress, got = ?block_root, expected = ?trie_root, "Block's root state failed verification"); + warn!(target: "sync::stages::merkle::exec", ?previous_stage_progress, got = ?trie_root, expected = ?block_root, "Block's root state failed verification"); return Err(StageError::Validation { block: previous_stage_progress, error: consensus::ConsensusError::BodyStateRootDiff { @@ -156,13 +168,28 @@ impl Stage for MerkleStage { } let current_root = tx.get_header(input.stage_progress)?.state_root; - let from_transition = tx.get_block_transition(input.unwind_to)?; let to_transition = tx.get_block_transition(input.stage_progress)?; - let block_root = DBTrieLoader::new(tx.deref_mut()) - .update_root(current_root, from_transition..to_transition) - .map_err(|e| StageError::Fatal(Box::new(e)))?; + let mut loader = DBTrieLoader::new(tx.deref_mut()); + let block_root = loop { + match loader + .update_root(current_root, from_transition..to_transition) + .map_err(|e| StageError::Fatal(Box::new(e)))? + { + TrieProgress::Complete(root) => break root, + TrieProgress::InProgress(_) => { + // Save the loader's progress & drop it to allow committing to the database, + // otherwise we're hitting the borrow checker + let progress = loader.current; + let _ = loader; + tx.commit()?; + // Reinstantiate the loader from where it was left off. + loader = DBTrieLoader::new(tx.deref_mut()); + loader.current = progress; + } + } + }; if block_root != target_root { let unwind_to = input.unwind_to; @@ -447,7 +474,10 @@ mod tests { impl MerkleTestRunner { fn state_root(&self) -> Result { - Ok(create_trie_loader(&self.tx.inner()).calculate_root().unwrap()) + Ok(create_trie_loader(&self.tx.inner()) + .calculate_root() + .and_then(|e| e.root()) + .unwrap()) } pub(crate) fn generate_initial_trie( @@ -459,8 +489,10 @@ mod tests { )?; let mut tx = self.tx.inner(); - let root = - create_trie_loader(&tx).calculate_root().expect("couldn't create initial trie"); + let root = create_trie_loader(&tx) + .calculate_root() + .and_then(|e| e.root()) + .expect("couldn't create initial trie"); tx.commit()?; @@ -471,7 +503,10 @@ mod tests { if previous_stage_progress != 0 { let block_root = self.tx.inner().get_header(previous_stage_progress).unwrap().state_root; - let root = create_trie_loader(&self.tx.inner()).calculate_root().unwrap(); + let root = create_trie_loader(&self.tx().inner()) + .calculate_root() + .and_then(|e| e.root()) + .unwrap(); assert_eq!(block_root, root); } Ok(()) diff --git a/crates/storage/db/src/tables/codecs/compact.rs b/crates/storage/db/src/tables/codecs/compact.rs index 4c0a4136416..c1689a4d8b8 100644 --- a/crates/storage/db/src/tables/codecs/compact.rs +++ b/crates/storage/db/src/tables/codecs/compact.rs @@ -44,7 +44,8 @@ impl_compression_for_compact!( StoredBlockBody, StoredBlockOmmers, StoredBlockWithdrawals, - Bytecode + Bytecode, + ProofCheckpoint ); impl_compression_for_compact!(AccountBeforeTx, TransactionSigned); impl_compression_for_compact!(CompactU256); diff --git a/crates/storage/db/src/tables/mod.rs b/crates/storage/db/src/tables/mod.rs index 16fb6ac49b4..bb45b8bf4e0 100644 --- a/crates/storage/db/src/tables/mod.rs +++ b/crates/storage/db/src/tables/mod.rs @@ -32,7 +32,7 @@ pub enum TableType { } /// Default tables that should be present inside database. -pub const TABLES: [(TableType, &str); 26] = [ +pub const TABLES: [(TableType, &str); 27] = [ (TableType::Table, CanonicalHeaders::const_name()), (TableType::Table, HeaderTD::const_name()), (TableType::Table, HeaderNumbers::const_name()), @@ -59,6 +59,7 @@ pub const TABLES: [(TableType, &str); 26] = [ (TableType::DupSort, StoragesTrie::const_name()), (TableType::Table, TxSenders::const_name()), (TableType::Table, SyncStage::const_name()), + (TableType::Table, SyncStageProgress::const_name()), ]; #[macro_export] @@ -293,6 +294,11 @@ table!( ( SyncStage ) StageId | BlockNumber ); +table!( + /// Stores arbitrary data to keep track of a stage first-sync progress. + ( SyncStageProgress ) StageId | Vec +); + /// /// Alias Types diff --git a/crates/storage/provider/Cargo.toml b/crates/storage/provider/Cargo.toml index c0e26f28791..7cfe4eaffbf 100644 --- a/crates/storage/provider/Cargo.toml +++ b/crates/storage/provider/Cargo.toml @@ -13,8 +13,9 @@ reth-primitives = { path = "../../primitives" } reth-interfaces = { path = "../../interfaces" } reth-revm-primitives = { path = "../../revm/revm-primitives" } reth-db = { path = "../db" } -reth-tracing = { path = "../../tracing" } -reth-rlp = { path = "../../rlp" } +reth-codecs = { path = "../codecs" } +reth-tracing = {path = "../../tracing"} +reth-rlp = {path = "../../rlp"} revm-primitives = "1.0.0" diff --git a/crates/storage/provider/src/providers/state/latest.rs b/crates/storage/provider/src/providers/state/latest.rs index e600eacb294..7d277427771 100644 --- a/crates/storage/provider/src/providers/state/latest.rs +++ b/crates/storage/provider/src/providers/state/latest.rs @@ -76,7 +76,7 @@ impl<'a, 'b, TX: DbTx<'a>> StateProvider for LatestStateProviderRef<'a, 'b, TX> .state_root; let (account_proof, storage_root) = loader - .generate_acount_proof(self.db, root, hashed_address) + .generate_acount_proof(root, hashed_address) .map_err(|_| ProviderError::StateTree)?; let account_proof = account_proof.into_iter().map(Bytes::from).collect(); @@ -86,7 +86,7 @@ impl<'a, 'b, TX: DbTx<'a>> StateProvider for LatestStateProviderRef<'a, 'b, TX> } else { let hashed_keys: Vec = keys.iter().map(keccak256).collect(); loader - .generate_storage_proofs(self.db, storage_root, hashed_address, &hashed_keys) + .generate_storage_proofs(storage_root, hashed_address, &hashed_keys) .map_err(|_| ProviderError::StateTree)? .into_iter() .map(|v| v.into_iter().map(Bytes::from).collect()) diff --git a/crates/storage/provider/src/transaction.rs b/crates/storage/provider/src/transaction.rs index 0c1d39b362b..90a3848c96e 100644 --- a/crates/storage/provider/src/transaction.rs +++ b/crates/storage/provider/src/transaction.rs @@ -308,8 +308,8 @@ where // merkle tree { let current_root = self.get_header(parent_block_number)?.state_root; - let loader = DBTrieLoader::new(self.deref_mut()); - let root = loader.update_root(current_root, from..to)?; + let mut loader = DBTrieLoader::new(self.deref_mut()); + let root = loader.update_root(current_root, from..to).and_then(|e| e.root())?; if root != block.state_root { return Err(TransactionError::StateTrieRootMismatch { got: root, diff --git a/crates/storage/provider/src/trie/mod.rs b/crates/storage/provider/src/trie/mod.rs index ffb9661d189..953fccd0d14 100644 --- a/crates/storage/provider/src/trie/mod.rs +++ b/crates/storage/provider/src/trie/mod.rs @@ -1,5 +1,6 @@ use cita_trie::{PatriciaTrie, Trie}; use hasher::HasherKeccak; +use reth_codecs::Compact; use reth_db::{ cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO}, models::{AccountBeforeTx, TransitionIdAddress}, @@ -7,8 +8,8 @@ use reth_db::{ transaction::{DbTx, DbTxMut}, }; use reth_primitives::{ - keccak256, proofs::EMPTY_ROOT, Account, Address, StorageEntry, StorageTrieEntry, TransitionId, - H256, KECCAK_EMPTY, U256, + keccak256, proofs::EMPTY_ROOT, Account, Address, ProofCheckpoint, StorageEntry, + StorageTrieEntry, TransitionId, H256, KECCAK_EMPTY, U256, }; use reth_rlp::{ encode_fixed_size, Decodable, DecodeError, Encodable, RlpDecodable, RlpEncodable, @@ -38,6 +39,8 @@ pub enum TrieError { /// Error when encoding/decoding a value. #[error("{0:?}")] DecodeError(#[from] DecodeError), + #[error("Trie requires committing a checkpoint.")] + UnexpectedCheckpoint, } /// Database wrapper implementing HashDB trait, with a read-write transaction. @@ -207,11 +210,18 @@ where } /// Database wrapper implementing HashDB trait, with a read-only transaction. -struct HashDatabase<'tx, 'itx, TX: DbTx<'itx>> { +pub struct HashDatabase<'tx, 'itx, TX: DbTx<'itx>> { tx: &'tx TX, _p: PhantomData<&'itx ()>, // to suppress "unused" lifetime 'itx } +impl<'tx, 'itx, TX: DbTx<'itx>> HashDatabase<'tx, 'itx, TX> { + /// Creates a new Hash database with the given transaction + pub fn new(tx: &'tx TX) -> Self { + Self { tx, _p: Default::default() } + } +} + impl<'tx, 'itx, TX> cita_trie::DB for HashDatabase<'tx, 'itx, TX> where TX: DbTx<'itx>, @@ -250,12 +260,19 @@ impl<'tx, 'itx, TX: DbTx<'itx>> HashDatabase<'tx, 'itx, TX> { } /// Database wrapper implementing HashDB trait, with a read-only transaction. -struct DupHashDatabase<'tx, 'itx, TX: DbTx<'itx>> { +pub struct DupHashDatabase<'tx, 'itx, TX: DbTx<'itx>> { tx: &'tx TX, key: H256, _p: PhantomData<&'itx ()>, // to suppress "unused" lifetime 'itx } +impl<'tx, 'itx, TX: DbTx<'itx>> DupHashDatabase<'tx, 'itx, TX> { + /// Creates a new DupHash database with the given transaction and key. + pub fn new(tx: &'tx TX, key: H256) -> Self { + Self { tx, key, _p: Default::default() } + } +} + impl<'tx, 'itx, TX> cita_trie::DB for DupHashDatabase<'tx, 'itx, TX> where TX: DbTx<'itx>, @@ -338,14 +355,41 @@ pub type MerkleProof = Vec>; /// Struct for calculating the root of a merkle patricia tree, /// while populating the database with intermediate hashes. +#[derive(Debug)] pub struct DBTrieLoader<'tx, TX> { - tx: &'tx TX, + /// The maximum number of keys to insert before committing. Both from `AccountsTrie` and + /// `StoragesTrie`. + pub commit_threshold: u64, + /// The current number of inserted keys from both `AccountsTrie` and `StoragesTrie`. + pub current: u64, + /// The transaction to use for inserting the trie nodes. + pub tx: &'tx TX, +} + +/// Status of the trie calculation. +#[derive(Debug, PartialEq, Copy, Clone)] +pub enum TrieProgress { + /// Trie has finished with the passed root. + Complete(H256), + /// Trie has hit its commit threshold. + InProgress(ProofCheckpoint), +} + +impl TrieProgress { + /// Consumes the root from its `Complete` variant. If that's not possible, throw + /// `TrieError::UnexpectedCheckpoint`. + pub fn root(self) -> Result { + match self { + Self::Complete(root) => Ok(root), + _ => Err(TrieError::UnexpectedCheckpoint), + } + } } impl<'tx, TX> DBTrieLoader<'tx, TX> { /// Create new instance of trie loader. pub fn new(tx: &'tx TX) -> Self { - Self { tx } + Self { tx, commit_threshold: 500_000, current: 0 } } } @@ -355,151 +399,243 @@ where TX: DbTxMut<'db> + DbTx<'db> + Send + Sync, { /// Calculates the root of the state trie, saving intermediate hashes in the database. - pub fn calculate_root(&self) -> Result { - self.tx.clear::()?; - self.tx.clear::()?; - - let mut accounts_cursor = self.tx.cursor_read::()?; - let mut walker = accounts_cursor.walk(None)?; + pub fn calculate_root(&mut self) -> Result { + let mut checkpoint = self.get_checkpoint()?; - let db = Arc::new(HashDatabaseMut::new(self.tx)?); + if checkpoint.hashed_address.is_none() { + self.tx.clear::()?; + self.tx.clear::()?; + } + let previous_root = checkpoint.account_root.unwrap_or(EMPTY_ROOT); let hasher = Arc::new(HasherKeccak::new()); + let mut trie = if let Some(root) = checkpoint.account_root { + PatriciaTrie::from( + Arc::new(HashDatabaseMut::from_root(self.tx, root)?), + hasher, + root.as_bytes(), + )? + } else { + PatriciaTrie::new(Arc::new(HashDatabaseMut::new(self.tx)?), hasher) + }; - let mut trie = PatriciaTrie::new(Arc::clone(&db), Arc::clone(&hasher)); + let mut accounts_cursor = self.tx.cursor_read::()?; + let mut walker = accounts_cursor.walk(checkpoint.hashed_address.take())?; while let Some((hashed_address, account)) = walker.next().transpose()? { - let value = EthAccount::from(account) - .with_storage_root(self.calculate_storage_root(hashed_address)?); - - let mut out = Vec::new(); - Encodable::encode(&value, &mut out); - trie.insert(hashed_address.as_bytes().to_vec(), out)?; + match self.calculate_storage_root( + hashed_address, + checkpoint.storage_key.take(), + checkpoint.storage_root.take(), + )? { + TrieProgress::Complete(root) => { + let value = EthAccount::from(account).with_storage_root(root); + + let mut out = Vec::new(); + Encodable::encode(&value, &mut out); + trie.insert(hashed_address.as_bytes().to_vec(), out)?; + + if self.has_hit_threshold() { + return self.save_account_checkpoint( + ProofCheckpoint::default(), + self.replace_account_root(&mut trie, previous_root)?, + hashed_address, + ) + } + } + TrieProgress::InProgress(checkpoint) => { + return self.save_account_checkpoint( + checkpoint, + self.replace_account_root(&mut trie, previous_root)?, + hashed_address, + ) + } + } } - let root = H256::from_slice(trie.root()?.as_slice()); - Ok(root) + // Reset inner stage progress + self.save_checkpoint(ProofCheckpoint::default())?; + + Ok(TrieProgress::Complete(self.replace_account_root(&mut trie, previous_root)?)) } - /// Calculate the accounts storage root. - pub fn calculate_storage_root(&self, address: H256) -> Result { - let db = Arc::new(DupHashDatabaseMut::new(self.tx, address)?); + fn calculate_storage_root( + &mut self, + address: H256, + next_storage: Option, + previous_root: Option, + ) -> Result { + let mut storage_cursor = self.tx.cursor_dup_read::()?; let hasher = Arc::new(HasherKeccak::new()); + let (mut current_entry, mut trie) = if let Some(entry) = next_storage { + ( + storage_cursor.seek_by_key_subkey(address, entry)?.filter(|e| e.key == entry), + PatriciaTrie::from( + Arc::new(DupHashDatabaseMut::from_root( + self.tx, + address, + previous_root.expect("is some"), + )?), + hasher, + previous_root.expect("is some").as_bytes(), + )?, + ) + } else { + ( + storage_cursor.seek_by_key_subkey(address, H256::zero())?, + PatriciaTrie::new(Arc::new(DupHashDatabaseMut::new(self.tx, address)?), hasher), + ) + }; - let mut trie = PatriciaTrie::new(Arc::clone(&db), Arc::clone(&hasher)); - - let mut storage_cursor = self.tx.cursor_dup_read::()?; - - // Should be able to use walk_dup, but any call to next() causes an assert fail in mdbx.c - // let mut walker = storage_cursor.walk_dup(address, H256::zero())?; - let mut current = storage_cursor.seek_by_key_subkey(address, H256::zero())?; + let previous_root = previous_root.unwrap_or(EMPTY_ROOT); - while let Some(StorageEntry { key: storage_key, value }) = current { + while let Some(StorageEntry { key: storage_key, value }) = current_entry { let out = encode_fixed_size(&value).to_vec(); trie.insert(storage_key.to_vec(), out)?; - current = storage_cursor.next_dup()?.map(|(_, v)| v); - } - - let root = H256::from_slice(trie.root()?.as_slice()); - - // if root is empty remove it from db - if root == EMPTY_ROOT { - self.tx.delete::(address, None)?; + // Should be able to use walk_dup, but any call to next() causes an assert fail in + // mdbx.c + current_entry = storage_cursor.next_dup()?.map(|(_, v)| v); + let threshold = self.has_hit_threshold(); + if let Some(current_entry) = current_entry { + if threshold { + return Ok(TrieProgress::InProgress(ProofCheckpoint { + storage_root: Some(self.replace_storage_root( + trie, + address, + previous_root, + )?), + storage_key: Some(current_entry.key), + ..Default::default() + })) + } + } } - Ok(root) + Ok(TrieProgress::Complete(self.replace_storage_root(trie, address, previous_root)?)) } /// Calculates the root of the state trie by updating an existing trie. pub fn update_root( - &self, - root: H256, + &mut self, + mut previous_root: H256, tid_range: Range, - ) -> Result { - let mut accounts_cursor = self.tx.cursor_read::()?; + ) -> Result { + let mut checkpoint = self.get_checkpoint()?; - let changed_accounts = self.gather_changes(tid_range)?; + if let Some(account_root) = checkpoint.account_root.take() { + previous_root = account_root; + } - let db = Arc::new(HashDatabaseMut::from_root(self.tx, root)?); + let next_acc = checkpoint.hashed_address.take(); + let changed_accounts = self + .gather_changes(tid_range)? + .into_iter() + .skip_while(|(addr, _)| next_acc.is_some() && next_acc.expect("is some") != *addr); - let hasher = Arc::new(HasherKeccak::new()); + let mut trie = PatriciaTrie::from( + Arc::new(HashDatabaseMut::from_root(self.tx, previous_root)?), + Arc::new(HasherKeccak::new()), + previous_root.as_bytes(), + )?; - let mut trie = PatriciaTrie::from(Arc::clone(&db), Arc::clone(&hasher), root.as_bytes())?; + let mut accounts_cursor = self.tx.cursor_read::()?; - for (address, changed_storages) in changed_accounts { - let storage_root = if let Some(account) = trie.get(address.as_slice())? { - trie.remove(address.as_bytes())?; + for (hashed_address, changed_storages) in changed_accounts { + let res = if let Some(account) = trie.get(hashed_address.as_slice())? { + trie.remove(hashed_address.as_bytes())?; let storage_root = EthAccount::decode(&mut account.as_slice())?.storage_root; - self.update_storage_root(storage_root, address, changed_storages)? + self.update_storage_root( + checkpoint.storage_root.take().unwrap_or(storage_root), + hashed_address, + changed_storages, + checkpoint.storage_key.take(), + )? } else { - self.calculate_storage_root(address)? + self.calculate_storage_root( + hashed_address, + checkpoint.storage_key.take(), + checkpoint.storage_root.take(), + )? }; - if let Some((_, account)) = accounts_cursor.seek_exact(address)? { + let storage_root = match res { + TrieProgress::Complete(root) => root, + TrieProgress::InProgress(checkpoint) => { + return self.save_account_checkpoint( + checkpoint, + self.replace_account_root(&mut trie, previous_root)?, + hashed_address, + ) + } + }; + + if let Some((_, account)) = accounts_cursor.seek_exact(hashed_address)? { let value = EthAccount::from(account).with_storage_root(storage_root); let mut out = Vec::new(); Encodable::encode(&value, &mut out); - trie.insert(address.as_bytes().to_vec(), out)?; - } - } - let new_root = H256::from_slice(trie.root()?.as_slice()); - if new_root != root { - let mut cursor = self.tx.cursor_write::()?; - if cursor.seek_exact(root)?.is_some() { - cursor.delete_current()?; + trie.insert(hashed_address.as_bytes().to_vec(), out)?; + + if self.has_hit_threshold() { + return self.save_account_checkpoint( + ProofCheckpoint::default(), + self.replace_account_root(&mut trie, previous_root)?, + hashed_address, + ) + } } } - Ok(new_root) + // Reset inner stage progress + self.save_checkpoint(ProofCheckpoint::default())?; + + Ok(TrieProgress::Complete(self.replace_account_root(&mut trie, previous_root)?)) } /// Update the account's storage root - pub fn update_storage_root( - &self, - root: H256, + fn update_storage_root( + &mut self, + previous_root: H256, address: H256, changed_storages: BTreeSet, - ) -> Result { - let db = Arc::new(DupHashDatabaseMut::from_root(self.tx, address, root)?); - - let hasher = Arc::new(HasherKeccak::new()); + next_storage: Option, + ) -> Result { + let mut hashed_storage_cursor = self.tx.cursor_dup_read::()?; + let mut trie = PatriciaTrie::new( + Arc::new(DupHashDatabaseMut::from_root(self.tx, address, previous_root)?), + Arc::new(HasherKeccak::new()), + ); - let mut trie = PatriciaTrie::from(Arc::clone(&db), Arc::clone(&hasher), root.as_bytes())?; - let mut storage_cursor = self.tx.cursor_dup_read::()?; + let changed_storages = changed_storages + .into_iter() + .skip_while(|k| next_storage.is_some() && *k == next_storage.expect("is some")); for key in changed_storages { if let Some(StorageEntry { value, .. }) = - storage_cursor.seek_by_key_subkey(address, key)?.filter(|e| e.key == key) + hashed_storage_cursor.seek_by_key_subkey(address, key)?.filter(|e| e.key == key) { let out = encode_fixed_size(&value).to_vec(); trie.insert(key.as_bytes().to_vec(), out)?; + if self.has_hit_threshold() { + return Ok(TrieProgress::InProgress(ProofCheckpoint { + storage_root: Some(self.replace_storage_root( + trie, + address, + previous_root, + )?), + storage_key: Some(key), + ..Default::default() + })) + } } else { trie.remove(key.as_bytes())?; } } - let new_root = H256::from_slice(trie.root()?.as_slice()); - if new_root != root { - let mut cursor = self.tx.cursor_dup_write::()?; - if cursor - .seek_by_key_subkey(address, root)? - .filter(|entry| entry.hash == root) - .is_some() - { - cursor.delete_current()?; - } - } - - // if root is empty remove it from db - if new_root == EMPTY_ROOT { - self.tx.delete::(address, None)?; - } - - Ok(new_root) + Ok(TrieProgress::Complete(self.replace_storage_root(trie, address, previous_root)?)) } fn gather_changes( @@ -537,6 +673,102 @@ where Ok(hashed_changes) } + + fn save_account_checkpoint( + &mut self, + mut checkpoint: ProofCheckpoint, + root: H256, + hashed_address: H256, + ) -> Result { + checkpoint.account_root = Some(root); + checkpoint.hashed_address = Some(hashed_address); + + debug!(target: "sync::stages::merkle::exec", account = ?hashed_address, storage = ?checkpoint.storage_key, "Saving inner trie checkpoint"); + + self.save_checkpoint(checkpoint)?; + + Ok(TrieProgress::InProgress(checkpoint)) + } + + fn has_hit_threshold(&mut self) -> bool { + self.current += 1; + self.current >= self.commit_threshold + } + + /// Saves the trie progress + pub fn save_checkpoint(&mut self, checkpoint: ProofCheckpoint) -> Result<(), TrieError> { + let mut buf = vec![]; + checkpoint.to_compact(&mut buf); + + // It allows unwind (which commits), to reuse this instance. + self.current = 0; + + Ok(self.tx.put::("TrieLoader".into(), buf)?) + } + + /// Gets the trie progress + pub fn get_checkpoint(&self) -> Result { + let buf = + self.tx.get::("TrieLoader".into())?.unwrap_or_default(); + + if buf.is_empty() { + return Ok(ProofCheckpoint::default()) + } + + let (checkpoint, _) = ProofCheckpoint::from_compact(&buf, buf.len()); + + if checkpoint.account_root.is_some() { + debug!(target: "sync::stages::merkle::exec", checkpoint = ?checkpoint, "Continuing inner trie checkpoint"); + } + + Ok(checkpoint) + } + + /// Finds the most recent account trie root and removes the previous one if applicable. + fn replace_account_root( + &self, + trie: &mut PatriciaTrie, HasherKeccak>, + previous_root: H256, + ) -> Result { + let new_root = H256::from_slice(trie.root()?.as_slice()); + + if new_root != previous_root { + let mut cursor = self.tx.cursor_write::()?; + if cursor.seek_exact(previous_root)?.is_some() { + cursor.delete_current()?; + } + } + + Ok(new_root) + } + + /// Finds the most recent storage trie root and removes the previous one if applicable. + fn replace_storage_root( + &self, + mut trie: PatriciaTrie, HasherKeccak>, + address: H256, + previous_root: H256, + ) -> Result { + let new_root = H256::from_slice(trie.root()?.as_slice()); + + if new_root != previous_root { + let mut trie_cursor = self.tx.cursor_dup_write::()?; + + if trie_cursor + .seek_by_key_subkey(address, previous_root)? + .filter(|entry| entry.hash == previous_root) + .is_some() + { + trie_cursor.delete_current()?; + } + } + + if new_root == EMPTY_ROOT { + self.tx.delete::(address, None)?; + } + + Ok(new_root) + } } // Read-only impls @@ -545,13 +777,12 @@ where TX: DbTx<'db> + Send + Sync, { /// Returns a Merkle proof of the given account, plus its storage root hash. - pub fn generate_acount_proof<'itx>( + pub fn generate_acount_proof( &self, - tx: &'tx impl DbTx<'itx>, root: H256, address: H256, ) -> Result<(MerkleProof, H256), TrieError> { - let db = Arc::new(HashDatabase::from_root(tx, root)?); + let db = Arc::new(HashDatabase::from_root(self.tx, root)?); let hasher = Arc::new(HasherKeccak::new()); let trie = PatriciaTrie::from(Arc::clone(&db), Arc::clone(&hasher), root.as_bytes())?; @@ -565,14 +796,13 @@ where } /// Returns a Merkle proof of the given storage keys, starting at the given root hash. - pub fn generate_storage_proofs<'itx>( + pub fn generate_storage_proofs( &self, - tx: &'tx impl DbTx<'itx>, storage_root: H256, address: H256, keys: &[H256], ) -> Result, TrieError> { - let db = Arc::new(DupHashDatabase::from_root(tx, address, storage_root)?); + let db = Arc::new(DupHashDatabase::from_root(self.tx, address, storage_root)?); let hasher = Arc::new(HasherKeccak::new()); let trie = @@ -588,6 +818,7 @@ where #[cfg(test)] mod tests { use crate::Transaction; + use std::ops::DerefMut; use super::*; use assert_matches::assert_matches; @@ -648,7 +879,7 @@ mod tests { let tx = Transaction::new(db.as_ref()).unwrap(); assert_matches!( create_test_loader(&tx).calculate_root(), - Ok(got) if got == EMPTY_ROOT + Ok(got) if got.root().unwrap() == EMPTY_ROOT ); } @@ -664,14 +895,15 @@ mod tests { let expected = H256(sec_trie_root::([(address, encoded_account)]).0); assert_matches!( create_test_loader(&tx).calculate_root(), - Ok(got) if got == expected + Ok(got) if got.root().unwrap() == expected ); } #[test] fn two_accounts_trie() { let db = create_test_rw_db(); - let tx = Transaction::new(db.as_ref()).unwrap(); + let mut tx = Transaction::new(db.as_ref()).unwrap(); + let mut trie = DBTrieLoader::new(tx.deref_mut()); let accounts = [ ( @@ -684,7 +916,7 @@ mod tests { ), ]; for (address, account) in accounts { - tx.put::(keccak256(address), account).unwrap(); + trie.tx.put::(keccak256(address), account).unwrap(); } let encoded_accounts = accounts.iter().map(|(k, v)| { let mut out = Vec::new(); @@ -693,26 +925,28 @@ mod tests { }); let expected = H256(sec_trie_root::(encoded_accounts).0); assert_matches!( - create_test_loader(&tx).calculate_root(), - Ok(got) if got == expected + trie.calculate_root(), + Ok(got) if got.root().unwrap() == expected ); } #[test] fn single_storage_trie() { let db = create_test_rw_db(); - let tx = Transaction::new(db.as_ref()).unwrap(); + let mut tx = Transaction::new(db.as_ref()).unwrap(); + let mut trie = DBTrieLoader::new(tx.deref_mut()); let address = Address::from_str("9fe4abd71ad081f091bd06dd1c16f7e92927561e").unwrap(); let hashed_address = keccak256(address); let storage = Vec::from([(H256::from_low_u64_be(2), U256::from(1))]); for (k, v) in storage.clone() { - tx.put::( - hashed_address, - StorageEntry { key: keccak256(k), value: v }, - ) - .unwrap(); + trie.tx + .put::( + hashed_address, + StorageEntry { key: keccak256(k), value: v }, + ) + .unwrap(); } let encoded_storage = storage.iter().map(|(k, v)| { let out = encode_fixed_size(v).to_vec(); @@ -720,8 +954,8 @@ mod tests { }); let expected = H256(sec_trie_root::(encoded_storage).0); assert_matches!( - create_test_loader(&tx).calculate_storage_root(hashed_address), - Ok(got) if got == expected + trie.calculate_storage_root(hashed_address, None, None), + Ok(got) if got.root().unwrap() == expected ); } @@ -766,7 +1000,7 @@ mod tests { let expected = H256(sec_trie_root::([(address, out)]).0); assert_matches!( create_test_loader(&tx).calculate_root(), - Ok(got) if got == expected + Ok(got) if got.root().unwrap() == expected ); } @@ -781,7 +1015,7 @@ mod tests { assert_matches!( create_test_loader(&tx).calculate_root(), - Ok(got) if got == state_root + Ok(got) if got.root().unwrap() == state_root ); } @@ -863,7 +1097,7 @@ mod tests { let expected = H256(sec_trie_root::(encoded_accounts).0); assert_matches!( create_test_loader(&tx).calculate_root(), - Ok(got) if got == expected + Ok(got) if got.root().unwrap() == expected , "where expected is {expected:?}"); } @@ -882,8 +1116,8 @@ mod tests { load_mainnet_genesis_root(&mut tx); let root = { - let trie = create_test_loader(&tx); - trie.calculate_root().expect("should be able to load trie") + let mut trie = create_test_loader(&tx); + trie.calculate_root().expect("should be able to load trie").root().unwrap() }; tx.commit().unwrap(); @@ -891,9 +1125,8 @@ mod tests { let address = Address::from(hex!("000d836201318ec6899a67540690382780743280")); let trie = create_test_loader(&tx); - let (proof, storage_root) = trie - .generate_acount_proof(&tx.inner().tx().unwrap(), root, keccak256(address)) - .expect("failed to generate proof"); + let (proof, storage_root) = + trie.generate_acount_proof(root, keccak256(address)).expect("failed to generate proof"); // values extracted from geth via rpc: // { @@ -947,16 +1180,15 @@ mod tests { } let root = { - let trie = create_test_loader(&tx); - trie.calculate_root().expect("should be able to load trie") + let mut trie = create_test_loader(&tx); + trie.calculate_root().expect("should be able to load trie").root().unwrap() }; tx.commit().unwrap(); let trie = create_test_loader(&tx); - let (account_proof, storage_root) = trie - .generate_acount_proof(&tx.inner().tx().unwrap(), root, hashed_address) - .expect("failed to generate proof"); + let (account_proof, storage_root) = + trie.generate_acount_proof(root, hashed_address).expect("failed to generate proof"); // values extracted from geth via rpc: let expected_account = hex!("f86fa1205126413e7857595763591580306b3f228f999498c4c5dfa74f633364936e7651b84bf849819b8418b0d164a029ff6f4d518044318d75b118cf439d8d3d7249c8afcba06ba9ecdf8959410571a02ce1a85814ad94a94ed2a1abaf7c57e9b64326622c1b8c21b4ba4d0e7df61392").as_slice(); @@ -980,7 +1212,6 @@ mod tests { let storage_proofs = trie .generate_storage_proofs( - &tx.inner().tx().unwrap(), storage_root, hashed_address, &[keccak256(H256::from_low_u64_be(2)), keccak256(H256::zero())],