Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

35 changes: 19 additions & 16 deletions bin/reth/src/dump_stage/merkle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,7 @@ pub(crate) async fn dump_merkle_stage<DB: Database>(
unwind_and_copy::<DB>(db_tool, (from, to), tip_block_number, &output_db).await?;

if should_run {
println!(
"\n# Merkle stage does not support dry run, so it will actually be committing changes."
);
run(output_db, to, from).await?;
dry_run(output_db, to, from).await?;
}

Ok(())
Expand Down Expand Up @@ -113,26 +110,32 @@ async fn unwind_and_copy<DB: Database>(
}

/// Try to re-execute the stage straightaway
async fn run(
async fn dry_run(
output_db: reth_db::mdbx::Env<reth_db::mdbx::WriteMap>,
to: u64,
from: u64,
) -> eyre::Result<()> {
info!(target: "reth::cli", "Executing stage.");

let mut tx = Transaction::new(&output_db)?;

MerkleStage::Execution {
clean_threshold: u64::MAX, // Forces updating the root instead of calculating from scratch
let mut exec_output = false;
while !exec_output {
exec_output = MerkleStage::Execution {
clean_threshold: u64::MAX, /* Forces updating the root instead of calculating from
* scratch */
}
.execute(
&mut tx,
reth_stages::ExecInput {
previous_stage: Some((StageId("Another"), to)),
stage_progress: Some(from),
},
)
.await?
.done;
}
.execute(
&mut tx,
reth_stages::ExecInput {
previous_stage: Some((StageId("Another"), to)),
stage_progress: Some(from),
},
)
.await?;

tx.drop()?;

info!(target: "reth::cli", "Success.");

Expand Down
1 change: 1 addition & 0 deletions crates/primitives/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ mod withdrawal;

/// Helper function for calculating Merkle proofs and hashes
pub mod proofs;
pub use proofs::ProofCheckpoint;

pub use account::{Account, Bytecode};
pub use bits::H512;
Expand Down
15 changes: 15 additions & 0 deletions crates/primitives/src/proofs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use bytes::BytesMut;
use hash_db::Hasher;
use hex_literal::hex;
use plain_hasher::PlainHasher;
use reth_codecs::{main_codec, Compact};
use reth_rlp::Encodable;
use triehash::{ordered_trie_root, sec_trie_root};

Expand All @@ -34,6 +35,20 @@ impl Hasher for KeccakHasher {
}
}

/// Saves the progress of the `MerkleStage` so trie construction can resume
/// after an interruption instead of restarting from scratch.
///
/// All fields are `Option`s: `None` means there is no saved progress for that
/// part of the trie walk, so a `Default` (all-`None`) checkpoint represents a
/// fresh start.
#[main_codec]
#[derive(Default, Debug, Copy, Clone, PartialEq)]
pub struct ProofCheckpoint {
/// The next hashed account to insert into the trie.
pub hashed_address: Option<H256>,
/// The next storage entry to insert into the trie.
pub storage_key: Option<H256>,
/// Current intermediate root for `AccountsTrie`.
pub account_root: Option<H256>,
/// Current intermediate storage root from an account.
pub storage_root: Option<H256>,
}

/// Calculate a transaction root.
///
/// Iterates over the given transactions and computes the merkle trie root of
Expand Down
6 changes: 4 additions & 2 deletions crates/stages/benches/setup/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,8 @@ pub(crate) fn txs_testdata(num_blocks: u64) -> PathBuf {
tx.insert_accounts_and_storages(start_state.clone()).unwrap();

// make first block after genesis have valid state root
let root = DBTrieLoader::default().calculate_root(&tx.inner()).unwrap();
let root =
DBTrieLoader::default().calculate_root(&tx.inner()).and_then(|e| e.root()).unwrap();
let second_block = blocks.get_mut(1).unwrap();
let cloned_second = second_block.clone();
let mut updated_header = cloned_second.header.unseal();
Expand All @@ -144,7 +145,8 @@ pub(crate) fn txs_testdata(num_blocks: u64) -> PathBuf {
// make last block have valid state root
let root = {
let mut tx_mut = tx.inner();
let root = DBTrieLoader::default().calculate_root(&tx_mut).unwrap();
let root =
DBTrieLoader::default().calculate_root(&tx_mut).and_then(|e| e.root()).unwrap();
tx_mut.commit().unwrap();
root
};
Expand Down
79 changes: 57 additions & 22 deletions crates/stages/src/stages/merkle.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use crate::{ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, UnwindOutput};
use reth_db::{database::Database, tables, transaction::DbTx};
use reth_interfaces::consensus;
use reth_provider::{trie::DBTrieLoader, Transaction};
use reth_provider::{
trie::{DBTrieLoader, TrieProgress},
Transaction,
};
use std::{fmt::Debug, ops::DerefMut};
use tracing::*;

Expand Down Expand Up @@ -105,23 +108,32 @@ impl<DB: Database> Stage<DB> for MerkleStage {

let trie_root = if from_transition == to_transition {
block_root
} else if to_transition - from_transition > threshold || stage_progress == 0 {
debug!(target: "sync::stages::merkle::exec", current = ?stage_progress, target = ?previous_stage_progress, "Rebuilding trie");
// if there are more blocks than threshold it is faster to rebuild the trie
DBTrieLoader::new(tx.deref_mut())
.calculate_root()
.map_err(|e| StageError::Fatal(Box::new(e)))?
} else {
debug!(target: "sync::stages::merkle::exec", current = ?stage_progress, target = ?previous_stage_progress, "Updating trie");
// Iterate over changeset (similar to Hashing stages) and take new values
let current_root = tx.get_header(stage_progress)?.state_root;
DBTrieLoader::new(tx.deref_mut())
.update_root(current_root, from_transition..to_transition)
.map_err(|e| StageError::Fatal(Box::new(e)))?
let res = if to_transition - from_transition > threshold || stage_progress == 0 {
debug!(target: "sync::stages::merkle::exec", current = ?stage_progress, target = ?previous_stage_progress, "Rebuilding trie");
// if there are more blocks than threshold it is faster to rebuild the trie
let mut loader = DBTrieLoader::new(tx.deref_mut());
loader.calculate_root().map_err(|e| StageError::Fatal(Box::new(e)))?
Comment on lines +114 to +116
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note this new method of instantiation of the DBTrieLoader since #1515

} else {
debug!(target: "sync::stages::merkle::exec", current = ?stage_progress, target = ?previous_stage_progress, "Updating trie");
// Iterate over changeset (similar to Hashing stages) and take new values
let current_root = tx.get_header(stage_progress)?.state_root;
let mut loader = DBTrieLoader::new(tx.deref_mut());
loader
.update_root(current_root, from_transition..to_transition)
.map_err(|e| StageError::Fatal(Box::new(e)))?
};

match res {
TrieProgress::Complete(root) => root,
TrieProgress::InProgress(_) => {
return Ok(ExecOutput { stage_progress, done: false })
}
}
};

if block_root != trie_root {
warn!(target: "sync::stages::merkle::exec", ?previous_stage_progress, got = ?block_root, expected = ?trie_root, "Block's root state failed verification");
warn!(target: "sync::stages::merkle::exec", ?previous_stage_progress, got = ?trie_root, expected = ?block_root, "Block's root state failed verification");
return Err(StageError::Validation {
block: previous_stage_progress,
error: consensus::ConsensusError::BodyStateRootDiff {
Expand Down Expand Up @@ -156,13 +168,28 @@ impl<DB: Database> Stage<DB> for MerkleStage {
}

let current_root = tx.get_header(input.stage_progress)?.state_root;

let from_transition = tx.get_block_transition(input.unwind_to)?;
let to_transition = tx.get_block_transition(input.stage_progress)?;

let block_root = DBTrieLoader::new(tx.deref_mut())
.update_root(current_root, from_transition..to_transition)
.map_err(|e| StageError::Fatal(Box::new(e)))?;
let mut loader = DBTrieLoader::new(tx.deref_mut());
let block_root = loop {
match loader
.update_root(current_root, from_transition..to_transition)
.map_err(|e| StageError::Fatal(Box::new(e)))?
{
TrieProgress::Complete(root) => break root,
TrieProgress::InProgress(_) => {
// Save the loader's progress & drop it to allow committing to the database,
// otherwise we're hitting the borrow checker
let progress = loader.current;
let _ = loader;
tx.commit()?;
// Reinstantiate the loader from where it was left off.
loader = DBTrieLoader::new(tx.deref_mut());
loader.current = progress;
}
}
};

if block_root != target_root {
let unwind_to = input.unwind_to;
Expand Down Expand Up @@ -447,7 +474,10 @@ mod tests {

impl MerkleTestRunner {
fn state_root(&self) -> Result<H256, TestRunnerError> {
Ok(create_trie_loader(&self.tx.inner()).calculate_root().unwrap())
Ok(create_trie_loader(&self.tx.inner())
.calculate_root()
.and_then(|e| e.root())
.unwrap())
}

pub(crate) fn generate_initial_trie(
Expand All @@ -459,8 +489,10 @@ mod tests {
)?;

let mut tx = self.tx.inner();
let root =
create_trie_loader(&tx).calculate_root().expect("couldn't create initial trie");
let root = create_trie_loader(&tx)
.calculate_root()
.and_then(|e| e.root())
.expect("couldn't create initial trie");

tx.commit()?;

Expand All @@ -471,7 +503,10 @@ mod tests {
if previous_stage_progress != 0 {
let block_root =
self.tx.inner().get_header(previous_stage_progress).unwrap().state_root;
let root = create_trie_loader(&self.tx.inner()).calculate_root().unwrap();
let root = create_trie_loader(&self.tx().inner())
.calculate_root()
.and_then(|e| e.root())
.unwrap();
assert_eq!(block_root, root);
}
Ok(())
Expand Down
3 changes: 2 additions & 1 deletion crates/storage/db/src/tables/codecs/compact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ impl_compression_for_compact!(
StoredBlockBody,
StoredBlockOmmers,
StoredBlockWithdrawals,
Bytecode
Bytecode,
ProofCheckpoint
);
impl_compression_for_compact!(AccountBeforeTx, TransactionSigned);
impl_compression_for_compact!(CompactU256);
Expand Down
8 changes: 7 additions & 1 deletion crates/storage/db/src/tables/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pub enum TableType {
}

/// Default tables that should be present inside database.
pub const TABLES: [(TableType, &str); 26] = [
pub const TABLES: [(TableType, &str); 27] = [
(TableType::Table, CanonicalHeaders::const_name()),
(TableType::Table, HeaderTD::const_name()),
(TableType::Table, HeaderNumbers::const_name()),
Expand All @@ -59,6 +59,7 @@ pub const TABLES: [(TableType, &str); 26] = [
(TableType::DupSort, StoragesTrie::const_name()),
(TableType::Table, TxSenders::const_name()),
(TableType::Table, SyncStage::const_name()),
(TableType::Table, SyncStageProgress::const_name()),
];

#[macro_export]
Expand Down Expand Up @@ -293,6 +294,11 @@ table!(
( SyncStage ) StageId | BlockNumber
);

table!(
/// Stores arbitrary data to keep track of a stage's first-sync progress.
( SyncStageProgress ) StageId | Vec<u8>
);

///
/// Alias Types

Expand Down
5 changes: 3 additions & 2 deletions crates/storage/provider/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ reth-primitives = { path = "../../primitives" }
reth-interfaces = { path = "../../interfaces" }
reth-revm-primitives = { path = "../../revm/revm-primitives" }
reth-db = { path = "../db" }
reth-tracing = { path = "../../tracing" }
reth-rlp = { path = "../../rlp" }
reth-codecs = { path = "../codecs" }
reth-tracing = {path = "../../tracing"}
reth-rlp = {path = "../../rlp"}

revm-primitives = "1.0.0"

Expand Down
4 changes: 2 additions & 2 deletions crates/storage/provider/src/providers/state/latest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ impl<'a, 'b, TX: DbTx<'a>> StateProvider for LatestStateProviderRef<'a, 'b, TX>
.state_root;

let (account_proof, storage_root) = loader
.generate_acount_proof(self.db, root, hashed_address)
.generate_acount_proof(root, hashed_address)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

self.db is no longer needed because we pass it inside the loader

.map_err(|_| ProviderError::StateTree)?;
let account_proof = account_proof.into_iter().map(Bytes::from).collect();

Expand All @@ -86,7 +86,7 @@ impl<'a, 'b, TX: DbTx<'a>> StateProvider for LatestStateProviderRef<'a, 'b, TX>
} else {
let hashed_keys: Vec<H256> = keys.iter().map(keccak256).collect();
loader
.generate_storage_proofs(self.db, storage_root, hashed_address, &hashed_keys)
.generate_storage_proofs(storage_root, hashed_address, &hashed_keys)
.map_err(|_| ProviderError::StateTree)?
.into_iter()
.map(|v| v.into_iter().map(Bytes::from).collect())
Expand Down
4 changes: 2 additions & 2 deletions crates/storage/provider/src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,8 +308,8 @@ where
// merkle tree
{
let current_root = self.get_header(parent_block_number)?.state_root;
let loader = DBTrieLoader::new(self.deref_mut());
let root = loader.update_root(current_root, from..to)?;
let mut loader = DBTrieLoader::new(self.deref_mut());
let root = loader.update_root(current_root, from..to).and_then(|e| e.root())?;
if root != block.state_root {
return Err(TransactionError::StateTrieRootMismatch {
got: root,
Expand Down
Loading