Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
8f5ae02
Refactor insert_* to remove duplicate code
MegaRedHand Feb 8, 2023
f35fbdc
Add benchmark for MerkleStage
MegaRedHand Feb 8, 2023
1f07aac
Add insert_accounts_and_storages fn
MegaRedHand Feb 9, 2023
7d42600
Add random_transition to merkle_stage
MegaRedHand Feb 9, 2023
301eaed
Add accs_testdata
MegaRedHand Feb 9, 2023
62d92d0
Add transition generation helper function
MegaRedHand Feb 10, 2023
dbe7ea7
Add insert_transitions helper function
MegaRedHand Feb 10, 2023
ab90047
Unify *_testdata functions
MegaRedHand Feb 10, 2023
5260e62
Add custom setup for merkle stage
MegaRedHand Feb 10, 2023
5409340
Merge branch 'main' into bench-merkle
MegaRedHand Feb 13, 2023
e134691
Generate valid state roots in testdata
MegaRedHand Feb 13, 2023
b655cd6
Fix compile warnings
MegaRedHand Feb 13, 2023
1d768e0
Tidy up some clones
MegaRedHand Feb 13, 2023
de91a79
Fix: update blocks before insertion
MegaRedHand Feb 14, 2023
07c6315
Fix: add iter()
MegaRedHand Feb 14, 2023
fa58c3c
Merge branch 'main' into bench-merkle
MegaRedHand Feb 16, 2023
6460cc4
Merge branch 'main' into bench-merkle
MegaRedHand Feb 16, 2023
35d47ee
Merge remote-tracking branch 'origin/main' into bench-merkle
joshieDo Feb 20, 2023
8eb7e6c
fix removal on trie
joshieDo Feb 21, 2023
29d9caa
throw error if unwound calculated root does not match
joshieDo Feb 22, 2023
e6cd629
fix insertion on test_db for accounts and storages
joshieDo Feb 22, 2023
aa608ee
make sure root is committed
joshieDo Feb 22, 2023
b46da0a
delete target root for repeated unwinds
joshieDo Feb 22, 2023
a33659e
fix and speedup trie db methods
joshieDo Feb 22, 2023
121d5b7
set DEFAULT_NUM_BLOCKS for criterion
joshieDo Feb 22, 2023
14feb94
clippy
joshieDo Feb 22, 2023
1a20331
remove assert use
joshieDo Feb 23, 2023
60f1fa8
fix update trie root
joshieDo Feb 23, 2023
f578774
add dump-stage merkle
joshieDo Feb 23, 2023
c20dc58
zip keys and values
joshieDo Feb 23, 2023
a50d7f8
clippy
joshieDo Feb 23, 2023
d25a3df
unwind execution stage too for merkle dump
joshieDo Feb 27, 2023
19f004e
Merge remote-tracking branch 'origin/main' into perf/bench-merkle
joshieDo Feb 27, 2023
1c10b65
fix hashing storage incremental
joshieDo Feb 27, 2023
e4c0a0e
propagate error
joshieDo Feb 27, 2023
472bfeb
clippy
joshieDo Feb 27, 2023
54710c1
use existing consensus error for diff root
joshieDo Feb 28, 2023
1f768e3
add stage id MERKLE_BOTH
joshieDo Feb 28, 2023
426ae37
Merge branch 'main' into perf/bench-merkle
gakonst Mar 1, 2023
283292f
add missing Executor to stage
joshieDo Mar 1, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 137 additions & 0 deletions bin/reth/src/dump_stage/merkle.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
use crate::{
db::DbTool,
dirs::{DbPath, PlatformPath},
dump_stage::setup,
};
use eyre::Result;
use reth_db::{database::Database, table::TableImporter, tables, transaction::DbTx};
use reth_primitives::MAINNET;
use reth_provider::Transaction;
use reth_stages::{
stages::{AccountHashingStage, ExecutionStage, MerkleStage, StorageHashingStage},
DefaultDB, Stage, StageId, UnwindInput,
};
use std::ops::DerefMut;
use tracing::info;

/// Dumps the data needed to run the Merkle stage in isolation into a fresh database at
/// `output_db`, optionally re-executing the stage on the copy afterwards.
///
/// Copies the `Headers` for the `from..=to` block range and the `AccountChangeSet` entries for
/// the matching transition-id range, then dry-run-unwinds the downstream stages to bring the
/// remaining tables into the shape the Merkle stage expects (see `unwind_and_copy`).
pub(crate) async fn dump_merkle_stage<DB: Database>(
    db_tool: &mut DbTool<'_, DB>,
    from: u64,
    to: u64,
    output_db: &PlatformPath<DbPath>,
    should_run: bool,
) -> Result<()> {
    let (output_db, tip_block_number) = setup::<DB>(from, to, output_db, db_tool)?;

    output_db.update(|tx| {
        tx.import_table_with_range::<tables::Headers, _>(&db_tool.db.tx()?, Some(from), to)
    })??;

    // Translate the block range into a transition-id range so the account change sets can be
    // copied over the matching keys.
    let tx = db_tool.db.tx()?;
    let from_transition_rev =
        tx.get::<tables::BlockTransitionIndex>(from)?.expect("there should be at least one.");
    let to_transition_rev =
        tx.get::<tables::BlockTransitionIndex>(to)?.expect("there should be at least one.");

    output_db.update(|tx| {
        tx.import_table_with_range::<tables::AccountChangeSet, _>(
            &db_tool.db.tx()?,
            Some(from_transition_rev),
            to_transition_rev,
        )
    })??;

    unwind_and_copy::<DB>(db_tool, (from, to), tip_block_number, &output_db).await?;

    if should_run {
        // NOTE: unlike other dump-stage commands there is no true dry-run here — executing the
        // Merkle stage commits its changes to `output_db`.
        println!(
            "\n# Merkle stage does not support dry run, so it will actually be committing changes."
        );
        run(output_db, to, from).await?;
    }

    Ok(())
}

/// Dry-run an unwind to FROM block and copy the necessary table data to the new database.
///
/// Unwinds the hashing and Merkle tables back to `from`, then re-advances plain state and the
/// hashed tables to `to`, so the Merkle stage can later be executed in isolation over the
/// copied data. The transaction on the source database is dropped at the end, so the source
/// database itself is left untouched.
async fn unwind_and_copy<DB: Database>(
    db_tool: &mut DbTool<'_, DB>,
    range: (u64, u64),
    tip_block_number: u64,
    output_db: &reth_db::mdbx::Env<reth_db::mdbx::WriteMap>,
) -> eyre::Result<()> {
    let (from, to) = range;
    let mut unwind_tx = Transaction::new(db_tool.db)?;
    let unwind = UnwindInput { unwind_to: from, stage_progress: tip_block_number, bad_block: None };
    let execute_input = reth_stages::ExecInput {
        previous_stage: Some((StageId("Another"), to)),
        stage_progress: Some(from),
    };

    // Unwind hashes all the way to FROM.
    // NOTE: these previously used `.unwrap()`; errors are now propagated with `?`,
    // consistent with the other stage calls in this function.
    StorageHashingStage::default().unwind(&mut unwind_tx, unwind).await?;
    AccountHashingStage::default().unwind(&mut unwind_tx, unwind).await?;

    MerkleStage::default_unwind().unwind(&mut unwind_tx, unwind).await?;

    // Bring Plainstate to TO (hashing stage execution requires it)
    let mut exec_stage: ExecutionStage<'_, DefaultDB<'_>> = ExecutionStage::from(MAINNET.clone());
    // Never commit mid-way: process the whole range in a single pass.
    exec_stage.commit_threshold = u64::MAX;
    exec_stage
        .unwind(
            &mut unwind_tx,
            UnwindInput { unwind_to: to, stage_progress: tip_block_number, bad_block: None },
        )
        .await?;

    // Bring hashes to TO
    AccountHashingStage { clean_threshold: u64::MAX, commit_threshold: u64::MAX }
        .execute(&mut unwind_tx, execute_input)
        .await?;
    StorageHashingStage { clean_threshold: u64::MAX, commit_threshold: u64::MAX }
        .execute(&mut unwind_tx, execute_input)
        .await?;

    let unwind_inner_tx = unwind_tx.deref_mut();

    // TODO optimize we can actually just get the entries we need
    output_db.update(|tx| tx.import_dupsort::<tables::StorageChangeSet, _>(unwind_inner_tx))??;

    output_db.update(|tx| tx.import_table::<tables::HashedAccount, _>(unwind_inner_tx))??;
    output_db.update(|tx| tx.import_dupsort::<tables::HashedStorage, _>(unwind_inner_tx))??;
    output_db.update(|tx| tx.import_table::<tables::AccountsTrie, _>(unwind_inner_tx))??;
    output_db.update(|tx| tx.import_dupsort::<tables::StoragesTrie, _>(unwind_inner_tx))??;

    // Drop the transaction without committing, so the dry-run unwind does not alter the
    // source database.
    unwind_tx.drop()?;

    Ok(())
}

/// Re-execute the Merkle stage on `output_db` straightaway, committing its changes.
async fn run(
    output_db: reth_db::mdbx::Env<reth_db::mdbx::WriteMap>,
    to: u64,
    from: u64,
) -> eyre::Result<()> {
    info!(target: "reth::cli", "Executing stage.");

    let mut stage_tx = Transaction::new(&output_db)?;

    // `u64::MAX` forces updating the root instead of calculating from scratch.
    let mut stage = MerkleStage::Execution { clean_threshold: u64::MAX };
    let input = reth_stages::ExecInput {
        previous_stage: Some((StageId("Another"), to)),
        stage_progress: Some(from),
    };
    stage.execute(&mut stage_tx, input).await?;

    info!(target: "reth::cli", "Success.");

    Ok(())
}
8 changes: 8 additions & 0 deletions bin/reth/src/dump_stage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ use hashing_account::dump_hashing_account_stage;
mod execution;
use execution::dump_execution_stage;

mod merkle;
use merkle::dump_merkle_stage;

use crate::{
db::DbTool,
dirs::{DbPath, PlatformPath},
Expand Down Expand Up @@ -45,6 +48,8 @@ pub enum Stages {
StorageHashing(StageCommand),
/// AccountHashing stage.
AccountHashing(StageCommand),
/// Merkle stage.
Merkle(StageCommand),
}

/// Stage command that takes a range
Expand Down Expand Up @@ -94,6 +99,9 @@ impl Command {
Stages::AccountHashing(StageCommand { output_db, from, to, dry_run, .. }) => {
dump_hashing_account_stage(&mut tool, *from, *to, output_db, *dry_run).await?
}
Stages::Merkle(StageCommand { output_db, from, to, dry_run, .. }) => {
dump_merkle_stage(&mut tool, *from, *to, output_db, *dry_run).await?
}
}

Ok(())
Expand Down
16 changes: 12 additions & 4 deletions crates/interfaces/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ readme = "README.md"
reth-codecs = { path = "../storage/codecs" }
reth-primitives = { path = "../primitives" }
reth-rpc-types = { path = "../rpc/rpc-types" }
reth-network-api = { path = "../net/network-api"}
reth-network-api = { path = "../net/network-api" }
revm-primitives = "1.0"
async-trait = "0.1.57"
thiserror = "1.0.37"
Expand All @@ -26,16 +26,24 @@ futures = "0.3"
tokio-stream = "0.1.11"
rand = "0.8.5"
arbitrary = { version = "1.1.7", features = ["derive"], optional = true }
secp256k1 = { version = "0.24.2", default-features = false, features = ["alloc", "recovery", "rand"], optional = true }
secp256k1 = { version = "0.24.2", default-features = false, features = [
"alloc",
"recovery",
"rand",
], optional = true }
modular-bitfield = "0.11.2"

[dev-dependencies]
reth-db = { path = "../storage/db", features = ["test-utils"] }
tokio = { version = "1.21.2", features = ["full"] }
tokio-stream = { version = "0.1.11", features = ["sync"] }
arbitrary = { version = "1.1.7", features = ["derive"]}
arbitrary = { version = "1.1.7", features = ["derive"] }
hex-literal = "0.3"
secp256k1 = { version = "0.24.2", default-features = false, features = ["alloc", "recovery", "rand"] }
secp256k1 = { version = "0.24.2", default-features = false, features = [
"alloc",
"recovery",
"rand",
] }

[features]
bench = []
Expand Down
116 changes: 113 additions & 3 deletions crates/interfaces/src/test_utils/generators.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use rand::{distributions::uniform::SampleRange, thread_rng, Rng};
use rand::{distributions::uniform::SampleRange, seq::SliceRandom, thread_rng, Rng};
use reth_primitives::{
proofs, Account, Address, Bytes, Header, SealedBlock, SealedHeader, Signature, Transaction,
TransactionKind, TransactionSigned, TxLegacy, H160, H256, U256,
proofs, Account, Address, Bytes, Header, SealedBlock, SealedHeader, Signature, StorageEntry,
Transaction, TransactionKind, TransactionSigned, TxLegacy, H160, H256, U256,
};
use secp256k1::{KeyPair, Message as SecpMessage, Secp256k1, SecretKey};
use std::{collections::BTreeMap, ops::Sub};

// TODO(onbjerg): Maybe we should split this off to its own crate, or move the helpers to the
// relevant crates?
Expand Down Expand Up @@ -165,6 +166,115 @@ pub fn random_block_range(
blocks
}

/// Changes recorded for a single transition: one
/// `(address, account-before-change, changed storage entries)` tuple per touched account.
type Transition = Vec<(Address, Account, Vec<StorageEntry>)>;
/// An account paired with its storage entries.
type AccountState = (Account, Vec<StorageEntry>);

/// Generate a range of transitions for given blocks and accounts.
/// Assumes all accounts start with an empty storage.
///
/// Every transaction in `blocks` produces one transition, in which a random "sender" account
/// transfers part of its balance to a random "receiver" account, and the receiver additionally
/// gets `n_changes` random storage changes with keys drawn from `key_range`.
///
/// Returns a Vec of account and storage changes for each transition (recording the
/// *pre-change* account and storage values, changeset-style), along with the final state of
/// all accounts and storages.
pub fn random_transition_range<'a, IBlk, IAcc>(
    blocks: IBlk,
    accounts: IAcc,
    n_changes: std::ops::Range<u64>,
    key_range: std::ops::Range<u64>,
) -> (Vec<Transition>, BTreeMap<Address, AccountState>)
where
    IBlk: IntoIterator<Item = &'a SealedBlock>,
    IAcc: IntoIterator<Item = (Address, (Account, Vec<StorageEntry>))>,
{
    // Working copy of the state; storage is kept as a map so inserts/removals are cheap.
    let mut state: BTreeMap<_, _> = accounts
        .into_iter()
        .map(|(addr, (acc, st))| (addr, (acc, st.into_iter().map(|e| (e.key, e.value)).collect())))
        .collect();

    let valid_addresses = state.keys().copied().collect();

    // One transition per transaction in the given blocks.
    let num_transitions: usize = blocks.into_iter().map(|block| block.body.len()).sum();
    let mut transitions = Vec::with_capacity(num_transitions);

    // Was `(0..num_transitions).for_each(|i| ...)` with an unused `i`; the unused binding (and
    // an unused local `rng`) triggered `unused_variables` warnings.
    for _ in 0..num_transitions {
        let mut transition = Vec::new();
        // NOTE(review): with fewer than two accounts, `random_account_change` falls back to
        // fresh random addresses absent from `state`, and the `unwrap`s below would panic —
        // confirm callers always pass at least two accounts.
        let (from, to, mut transfer, new_entries) =
            random_account_change(&valid_addresses, n_changes.clone(), key_range.clone());

        // extract from sending account — record the account as it was before the change
        let (prev_from, _) = state.get_mut(&from).unwrap();
        transition.push((from, *prev_from, Vec::new()));

        // Clamp the transfer to the sender's balance, but always move at least 1.
        transfer = transfer.min(prev_from.balance).max(U256::from(1));
        prev_from.balance = prev_from.balance.wrapping_sub(transfer);

        // deposit in receiving account and update storage
        let (prev_to, storage): &mut (Account, BTreeMap<H256, U256>) = state.get_mut(&to).unwrap();

        let old_entries = new_entries
            .into_iter()
            .filter_map(|entry| {
                // A zero value deletes the slot; skip no-op deletions of slots holding zero.
                let old = if entry.value != U256::ZERO {
                    storage.insert(entry.key, entry.value)
                } else {
                    let old = storage.remove(&entry.key);
                    if matches!(old, Some(U256::ZERO)) {
                        return None
                    }
                    old
                };
                // Record the previous value, changeset-style.
                Some(StorageEntry { value: old.unwrap_or(U256::ZERO), ..entry })
            })
            .collect();

        transition.push((to, *prev_to, old_entries));

        prev_to.balance = prev_to.balance.wrapping_add(transfer);

        transitions.push(transition);
    }

    // Convert the storage maps back into `StorageEntry` lists for the final snapshot.
    let final_state = state
        .into_iter()
        .map(|(addr, (acc, storage))| {
            (addr, (acc, storage.into_iter().map(|v| v.into()).collect()))
        })
        .collect();
    (transitions, final_state)
}

/// Generate a random account change.
///
/// Returns two addresses, a balance_change, and a Vec of new storage entries.
pub fn random_account_change(
    valid_addresses: &Vec<Address>,
    n_changes: std::ops::Range<u64>,
    key_range: std::ops::Range<u64>,
) -> (Address, Address, U256, Vec<StorageEntry>) {
    let mut rng = rand::thread_rng();

    // Pick two distinct known addresses; fall back to fresh random ones if
    // fewer than two are available.
    let mut picked = valid_addresses.choose_multiple(&mut rng, 2).cloned();
    let sender = picked.next().unwrap_or_else(Address::random);
    let receiver = picked.next().unwrap_or_else(Address::random);

    let balance_change = U256::from(rng.gen::<u64>());

    let entry_count = n_changes.sample_single(&mut rng);
    let storage_changes: Vec<StorageEntry> =
        (0..entry_count).map(|_| random_storage_entry(key_range.clone())).collect();

    (sender, receiver, balance_change, storage_changes)
}

/// Generate a random storage change with a key drawn from `key_range`.
pub fn random_storage_entry(key_range: std::ops::Range<u64>) -> StorageEntry {
    let mut rng = rand::thread_rng();
    StorageEntry {
        key: H256::from_low_u64_be(key_range.sample_single(&mut rng)),
        value: U256::from(rng.gen::<u64>()),
    }
}

/// Generate a random Externally Owned Account (EOA — an account without contract code).
pub fn random_eoa_account() -> (Address, Account) {
let nonce: u64 = rand::random();
Expand Down
6 changes: 6 additions & 0 deletions crates/primitives/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@ pub struct StorageEntry {
pub value: U256,
}

impl From<(H256, U256)> for StorageEntry {
fn from((key, value): (H256, U256)) -> Self {
StorageEntry { key, value }
}
}

// NOTE: Removing main_codec and manually encode subkey
// and compress second part of the value. If we have compression
// over whole value (Even SubKey) that would mess up fetching of values with seek_by_key_subkey
Expand Down
Loading