Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
00e650e
stage progress table wip
joshieDo Mar 6, 2023
762cdde
add root() to TrieProgress
joshieDo Mar 7, 2023
15233f3
rename to checkpoint
joshieDo Mar 7, 2023
d6f3078
remove custom error
joshieDo Mar 7, 2023
d280573
rename fields
joshieDo Mar 7, 2023
cd1f393
update Cargo.lock
joshieDo Mar 7, 2023
f1301c8
use PatriciaTrie::from
joshieDo Mar 7, 2023
5182fc0
fix merkle dump_stage
joshieDo Mar 7, 2023
34a31e8
Merge remote-tracking branch 'origin/main' into fix/merkle-oom-v2
joshieDo Mar 8, 2023
09e835f
add table checkpoint to AccountHashing instead
joshieDo Mar 8, 2023
a06b528
dry run hashing acc dump stage
joshieDo Mar 8, 2023
c0b9127
add table checkpoint to StorageHashing instead
joshieDo Mar 8, 2023
bba73f2
Merge branch 'main' into fix/merkle-oom-v2
joshieDo Mar 9, 2023
c136f4b
Merge remote-tracking branch 'origin/main' into fix/merkle-oom-v2
joshieDo Mar 10, 2023
114d085
Merge branch 'main' into fix/merkle-oom-v2
gakonst Mar 14, 2023
72b39fd
fix(trie): use mutable database where needed
gakonst Mar 14, 2023
5a59a20
fix(merkle): work around borrow checker
gakonst Mar 14, 2023
1b7524c
chore: fix lint
gakonst Mar 14, 2023
07225e2
chore: fix clippy nightly lint
gakonst Mar 14, 2023
dc368b5
Merge branch 'fix/merkle-oom-v2' into joshie/hashing-checkpoint
joshieDo Mar 14, 2023
4bf5d0b
Merge remote-tracking branch 'origin/main' into joshie/hashing-checkp…
joshieDo Mar 14, 2023
6fcedb9
remove repeated struct
joshieDo Mar 14, 2023
eafe665
add a better explanation for invalid checkpoints
joshieDo Mar 14, 2023
360f7e1
continue from checkpoint on test
joshieDo Mar 14, 2023
1b79f0f
Merge branch 'main' into joshie/hashing-checkpoint
gakonst Mar 14, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

29 changes: 17 additions & 12 deletions bin/reth/src/dump_stage/hashing_account.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ pub(crate) async fn dump_hashing_account_stage<DB: Database>(
unwind_and_copy::<DB>(db_tool, from, tip_block_number, &output_db).await?;

if should_run {
println!("\n# AccountHashing stage does not support dry run, so it will actually be committing changes.");
run(output_db, to, from).await?;
dry_run(output_db, to, from).await?;
}

Ok(())
Expand Down Expand Up @@ -69,7 +68,7 @@ async fn unwind_and_copy<DB: Database>(
}

/// Try to re-execute the stage straightaway
async fn run(
async fn dry_run(
output_db: reth_db::mdbx::Env<reth_db::mdbx::WriteMap>,
to: u64,
from: u64,
Expand All @@ -82,15 +81,21 @@ async fn run(
..Default::default()
};

exec_stage
.execute(
&mut tx,
reth_stages::ExecInput {
previous_stage: Some((StageId("Another"), to)),
stage_progress: Some(from),
},
)
.await?;
let mut exec_output = false;
while !exec_output {
exec_output = exec_stage
.execute(
&mut tx,
reth_stages::ExecInput {
previous_stage: Some((StageId("Another"), to)),
stage_progress: Some(from),
},
)
.await?
.done;
}

tx.drop()?;

info!(target: "reth::cli", "Success.");

Expand Down
46 changes: 41 additions & 5 deletions bin/reth/src/dump_stage/hashing_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ use crate::{
use eyre::Result;
use reth_db::{database::Database, table::TableImporter, tables};
use reth_provider::Transaction;
use reth_stages::{stages::StorageHashingStage, Stage, UnwindInput};
use reth_stages::{stages::StorageHashingStage, Stage, StageId, UnwindInput};
use std::ops::DerefMut;
use tracing::info;

pub(crate) async fn dump_hashing_storage_stage<DB: Database>(
db_tool: &mut DbTool<'_, DB>,
Expand All @@ -16,14 +17,14 @@ pub(crate) async fn dump_hashing_storage_stage<DB: Database>(
output_db: &PlatformPath<DbPath>,
should_run: bool,
) -> Result<()> {
if should_run {
eyre::bail!("StorageHashing stage does not support dry run.")
}

let (output_db, tip_block_number) = setup::<DB>(from, to, output_db, db_tool)?;

unwind_and_copy::<DB>(db_tool, from, tip_block_number, &output_db).await?;

if should_run {
dry_run(output_db, to, from).await?;
}

Ok(())
}

Expand Down Expand Up @@ -53,3 +54,38 @@ async fn unwind_and_copy<DB: Database>(

Ok(())
}

/// Try to re-execute the stage straightaway
async fn dry_run(
    output_db: reth_db::mdbx::Env<reth_db::mdbx::WriteMap>,
    to: u64,
    from: u64,
) -> eyre::Result<()> {
    info!(target: "reth::cli", "Executing stage.");

    let mut tx = Transaction::new(&output_db)?;
    let mut exec_stage = StorageHashingStage {
        clean_threshold: 1, // Forces hashing from scratch
        ..Default::default()
    };

    // Drive the stage to completion: each `execute` call processes one batch,
    // so keep re-invoking it until the stage reports it is done.
    loop {
        let output = exec_stage
            .execute(
                &mut tx,
                reth_stages::ExecInput {
                    previous_stage: Some((StageId("Another"), to)),
                    stage_progress: Some(from),
                },
            )
            .await?;
        if output.done {
            break
        }
    }

    tx.drop()?;

    info!(target: "reth::cli", "Success.");

    Ok(())
}
42 changes: 42 additions & 0 deletions crates/primitives/src/checkpoints.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
use crate::{Address, H256};
use reth_codecs::{main_codec, Compact};

/// Saves the progress of MerkleStage
///
/// A `None` field means no intermediate value was recorded for it.
/// NOTE(review): presumably an all-`None` checkpoint makes the stage rebuild the
/// trie from scratch — confirm against `MerkleStage`'s execute logic.
#[main_codec]
#[derive(Default, Debug, Copy, Clone, PartialEq)]
pub struct ProofCheckpoint {
    /// The next hashed account to insert into the trie.
    pub hashed_address: Option<H256>,
    /// The next storage entry to insert into the trie.
    pub storage_key: Option<H256>,
    /// Current intermediate root for `AccountsTrie`.
    pub account_root: Option<H256>,
    /// Current intermediate storage root from an account.
    pub storage_root: Option<H256>,
}

/// Saves the progress of AccountHashing
///
/// `AccountHashingStage` discards this checkpoint and rehashes from scratch when
/// the stored `from`/`to` transition range no longer matches the current stage
/// input, since accounts hashed earlier may have changed under the new range.
#[main_codec]
#[derive(Default, Debug, Copy, Clone, PartialEq)]
pub struct AccountHashingCheckpoint {
    /// The next account to start hashing from
    pub address: Option<Address>,
    /// Start transition id
    pub from: u64,
    /// Last transition id
    pub to: u64,
}

/// Saves the progress of StorageHashing
///
/// NOTE(review): presumably validated the same way as `AccountHashingCheckpoint`
/// (stale `from`/`to` range invalidates it) — confirm in `StorageHashingStage`.
#[main_codec]
#[derive(Default, Debug, Copy, Clone, PartialEq)]
pub struct StorageHashingCheckpoint {
    /// The next account to start hashing from
    pub address: Option<Address>,
    /// The next storage slot to start hashing from
    pub storage: Option<H256>,
    /// Start transition id
    pub from: u64,
    /// Last transition id
    pub to: u64,
}
3 changes: 2 additions & 1 deletion crates/primitives/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ mod bits;
mod block;
pub mod bloom;
mod chain;
mod checkpoints;
pub mod constants;
pub mod contract;
mod error;
Expand All @@ -34,7 +35,6 @@ mod withdrawal;

/// Helper function for calculating Merkle proofs and hashes
pub mod proofs;
pub use proofs::ProofCheckpoint;

pub use account::{Account, Bytecode};
pub use bits::H512;
Expand All @@ -46,6 +46,7 @@ pub use chain::{
AllGenesisFormats, Chain, ChainInfo, ChainSpec, ChainSpecBuilder, ForkCondition, GOERLI,
MAINNET, SEPOLIA,
};
pub use checkpoints::{AccountHashingCheckpoint, ProofCheckpoint, StorageHashingCheckpoint};
pub use constants::{
EMPTY_OMMER_ROOT, GOERLI_GENESIS, KECCAK_EMPTY, MAINNET_GENESIS, SEPOLIA_GENESIS,
};
Expand Down
19 changes: 1 addition & 18 deletions crates/primitives/src/proofs.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use std::collections::HashMap;

use crate::{
keccak256, Address, Bytes, GenesisAccount, Header, Log, Receipt, TransactionSigned, Withdrawal,
H256,
Expand All @@ -8,8 +6,8 @@ use bytes::BytesMut;
use hash_db::Hasher;
use hex_literal::hex;
use plain_hasher::PlainHasher;
use reth_codecs::{main_codec, Compact};
use reth_rlp::Encodable;
use std::collections::HashMap;
use triehash::{ordered_trie_root, sec_trie_root};

/// Keccak-256 hash of the RLP of an empty list, KEC("\xc0").
Expand All @@ -35,23 +33,8 @@ impl Hasher for KeccakHasher {
}
}

/// Saves the progress of MerkleStage
#[main_codec]
#[derive(Default, Debug, Copy, Clone, PartialEq)]
pub struct ProofCheckpoint {
/// The next hashed account to insert into the trie.
pub hashed_address: Option<H256>,
/// The next storage entry to insert into the trie.
pub storage_key: Option<H256>,
/// Current intermediate root for `AccountsTrie`.
pub account_root: Option<H256>,
/// Current intermediate storage root from an account.
pub storage_root: Option<H256>,
}

/// Calculate a transaction root.
///
/// Iterates over the given transactions and the merkle trie root of
/// `(rlp(index), encoded(tx))` pairs.
pub fn calculate_transaction_root<'a>(
transactions: impl IntoIterator<Item = &'a TransactionSigned>,
Expand Down
1 change: 1 addition & 0 deletions crates/stages/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ normal = [
reth-primitives = { path = "../primitives" }
reth-interfaces = { path = "../interfaces" }
reth-db = { path = "../storage/db" }
reth-codecs = { path = "../storage/codecs" }
reth-provider = { path = "../storage/provider" }
reth-metrics-derive = { path = "../metrics/metrics-derive" }

Expand Down
125 changes: 89 additions & 36 deletions crates/stages/src/stages/hashing_account.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use crate::{ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, UnwindOutput};
use reth_codecs::Compact;
use reth_db::{
cursor::{DbCursorRO, DbCursorRW},
database::Database,
tables,
transaction::{DbTx, DbTxMut},
};
use reth_primitives::keccak256;
use reth_primitives::{keccak256, AccountHashingCheckpoint};
use reth_provider::Transaction;
use std::{collections::BTreeMap, fmt::Debug, ops::Range};
use tracing::*;
Expand All @@ -30,6 +31,43 @@ impl Default for AccountHashingStage {
}
}

impl AccountHashingStage {
    /// Saves the hashing progress
    ///
    /// Serializes `checkpoint` with its `Compact` encoding and stores it in the
    /// `SyncStageProgress` table under the stage id key.
    pub fn save_checkpoint<DB: Database>(
        &mut self,
        tx: &Transaction<'_, DB>,
        checkpoint: AccountHashingCheckpoint,
    ) -> Result<(), StageError> {
        debug!(target: "sync::stages::account_hashing::exec", checkpoint = ?checkpoint, "Saving inner account hashing checkpoint");

        let mut encoded = Vec::new();
        checkpoint.to_compact(&mut encoded);
        tx.put::<tables::SyncStageProgress>(ACCOUNT_HASHING.0.into(), encoded)?;
        Ok(())
    }

    /// Gets the hashing progress
    ///
    /// Returns the default (empty) checkpoint when nothing has been stored yet.
    pub fn get_checkpoint<DB: Database>(
        &self,
        tx: &Transaction<'_, DB>,
    ) -> Result<AccountHashingCheckpoint, StageError> {
        let encoded =
            tx.get::<tables::SyncStageProgress>(ACCOUNT_HASHING.0.into())?.unwrap_or_default();

        match encoded.len() {
            // No checkpoint persisted: start fresh.
            0 => Ok(AccountHashingCheckpoint::default()),
            len => {
                let (checkpoint, _) = AccountHashingCheckpoint::from_compact(&encoded, len);
                if checkpoint.address.is_some() {
                    debug!(target: "sync::stages::account_hashing::exec", checkpoint = ?checkpoint, "Continuing inner account hashing checkpoint");
                }
                Ok(checkpoint)
            }
        }
    }
}

#[derive(Clone, Debug)]
/// `SeedOpts` provides configuration parameters for calling `AccountHashingStage::seed`
/// in unit tests or benchmarks to generate an initial database state for running the
Expand Down Expand Up @@ -137,43 +175,58 @@ impl<DB: Database> Stage<DB> for AccountHashingStage {
// AccountHashing table. Also, if we start from genesis, we need to hash from scratch, as
// genesis accounts are not in changeset.
if to_transition - from_transition > self.clean_threshold || stage_progress == 0 {
// clear table, load all accounts and hash it
tx.clear::<tables::HashedAccount>()?;
tx.commit()?;

let mut first_key = None;
loop {
let next_key = {
let mut accounts = tx.cursor_read::<tables::PlainAccountState>()?;

let hashed_batch = accounts
.walk(first_key)?
.take(self.commit_threshold as usize)
.map(|res| res.map(|(address, account)| (keccak256(address), account)))
.collect::<Result<BTreeMap<_, _>, _>>()?;

let mut hashed_account_cursor = tx.cursor_write::<tables::HashedAccount>()?;

// iterate and put presorted hashed accounts
if first_key.is_none() {
hashed_batch
.into_iter()
.try_for_each(|(k, v)| hashed_account_cursor.append(k, v))?;
} else {
hashed_batch
.into_iter()
.try_for_each(|(k, v)| hashed_account_cursor.insert(k, v))?;
}
let mut checkpoint = self.get_checkpoint(tx)?;

if checkpoint.address.is_none() ||
// Checkpoint is no longer valid if the range of transitions changed.
// An already hashed account may have been changed with the new range, and therefore should be hashed again.
checkpoint.to != to_transition ||
checkpoint.from != from_transition
{
// clear table, load all accounts and hash it
tx.clear::<tables::HashedAccount>()?;

checkpoint = AccountHashingCheckpoint::default();
self.save_checkpoint(tx, checkpoint)?;
}

// next key of iterator
accounts.next()?
};
tx.commit()?;
if let Some((next_key, _)) = next_key {
first_key = Some(next_key);
continue
let start_address = checkpoint.address.take();
let next_address = {
let mut accounts = tx.cursor_read::<tables::PlainAccountState>()?;

let hashed_batch = accounts
.walk(start_address)?
.take(self.commit_threshold as usize)
.map(|res| res.map(|(address, account)| (keccak256(address), account)))
.collect::<Result<BTreeMap<_, _>, _>>()?;

let mut hashed_account_cursor = tx.cursor_write::<tables::HashedAccount>()?;

// iterate and put presorted hashed accounts
if start_address.is_none() {
hashed_batch
.into_iter()
.try_for_each(|(k, v)| hashed_account_cursor.append(k, v))?;
} else {
hashed_batch
.into_iter()
.try_for_each(|(k, v)| hashed_account_cursor.insert(k, v))?;
}
break

// next key of iterator
accounts.next()?
};

if let Some((next_address, _)) = &next_address {
checkpoint.address = Some(*next_address);
checkpoint.from = from_transition;
checkpoint.to = to_transition;
}

self.save_checkpoint(tx, checkpoint)?;

if next_address.is_some() {
return Ok(ExecOutput { stage_progress, done: false })
}
} else {
// Aggregate all transition changesets and make list of account that have been
Expand Down
Loading