Skip to content
9 changes: 6 additions & 3 deletions bin/reth/src/chain/import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use reth_staged_sync::{
use reth_stages::{
prelude::*,
stages::{ExecutionStage, SenderRecoveryStage, TotalDifficultyStage},
DefaultDB,
};
use std::sync::Arc;
use tracing::{debug, info};
Expand Down Expand Up @@ -156,9 +157,11 @@ impl ImportCommand {
.set(SenderRecoveryStage {
commit_threshold: config.stages.sender_recovery.commit_threshold,
})
.set(ExecutionStage {
chain_spec: self.chain.clone(),
commit_threshold: config.stages.execution.commit_threshold,
.set({
let mut stage: ExecutionStage<'_, DefaultDB<'_>> =
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This DefaultDB thing is the only thing I don't like. It's necessary because when you instantiate the stage it still doesn't know the type of the database we'll use. It'd be nice if we can avoid it.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be possible to avoid this if we get a SubState type that expires part of the cache (in a correct way) since we would be able to instantiate that once as well. For now this has to do.

ExecutionStage::from(self.chain.clone());
stage.commit_threshold = config.stages.execution.commit_threshold;
stage
}),
)
.with_max_block(0)
Expand Down
7 changes: 4 additions & 3 deletions bin/reth/src/dump_stage/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ use eyre::Result;
use reth_db::{
cursor::DbCursorRO, database::Database, table::TableImporter, tables, transaction::DbTx,
};
use reth_primitives::MAINNET;
use reth_provider::Transaction;
use reth_stages::{stages::ExecutionStage, Stage, StageId, UnwindInput};
use reth_stages::{stages::ExecutionStage, DefaultDB, Stage, StageId, UnwindInput};
use std::ops::DerefMut;
use tracing::info;

Expand Down Expand Up @@ -96,7 +97,7 @@ async fn unwind_and_copy<DB: Database>(
output_db: &reth_db::mdbx::Env<reth_db::mdbx::WriteMap>,
) -> eyre::Result<()> {
let mut unwind_tx = Transaction::new(db_tool.db)?;
let mut exec_stage = ExecutionStage::default();
let mut exec_stage: ExecutionStage<'_, DefaultDB<'_>> = ExecutionStage::from(MAINNET.clone());

exec_stage
.unwind(
Expand Down Expand Up @@ -125,7 +126,7 @@ async fn dry_run(
info!(target: "reth::cli", "Executing stage. [dry-run]");

let mut tx = Transaction::new(&output_db)?;
let mut exec_stage = ExecutionStage::default();
let mut exec_stage: ExecutionStage<'_, DefaultDB<'_>> = ExecutionStage::from(MAINNET.clone());

exec_stage
.execute(
Expand Down
9 changes: 6 additions & 3 deletions bin/reth/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ use reth_staged_sync::{
use reth_stages::{
prelude::*,
stages::{ExecutionStage, SenderRecoveryStage, TotalDifficultyStage, FINISH},
DefaultDB,
};
use reth_tasks::TaskExecutor;
use std::{net::SocketAddr, path::PathBuf, sync::Arc};
Expand Down Expand Up @@ -448,9 +449,11 @@ impl Command {
.set(SenderRecoveryStage {
commit_threshold: stage_conf.sender_recovery.commit_threshold,
})
.set(ExecutionStage {
chain_spec: self.chain.clone(),
commit_threshold: stage_conf.execution.commit_threshold,
.set({
let mut stage: ExecutionStage<'_, DefaultDB<'_>> =
ExecutionStage::from(self.chain.clone());
stage.commit_threshold = stage_conf.execution.commit_threshold;
stage
}),
)
.build();
Expand Down
6 changes: 3 additions & 3 deletions bin/reth/src/stage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use reth_staged_sync::{
};
use reth_stages::{
stages::{BodyStage, ExecutionStage, SenderRecoveryStage},
ExecInput, Stage, StageId, UnwindInput,
DefaultDB, ExecInput, Stage, StageId, UnwindInput,
};
use std::{net::SocketAddr, sync::Arc};
use tracing::*;
Expand Down Expand Up @@ -171,8 +171,8 @@ impl Command {
stage.execute(&mut tx, input).await?;
}
StageEnum::Execution => {
let mut stage =
ExecutionStage { chain_spec: self.chain.clone(), commit_threshold: num_blocks };
let mut stage = ExecutionStage::<DefaultDB<'_>>::from(self.chain.clone());
stage.commit_threshold = num_blocks;
if !self.skip_unwind {
stage.unwind(&mut tx, unwind).await?;
}
Expand Down
4 changes: 2 additions & 2 deletions bin/reth/src/test_eth_chain/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use reth_primitives::{
};
use reth_provider::Transaction;
use reth_rlp::Decodable;
use reth_stages::{stages::ExecutionStage, ExecInput, Stage, StageId};
use reth_stages::{stages::ExecutionStage, DefaultDB, ExecInput, Stage, StageId};
use std::{
collections::HashMap,
ffi::OsStr,
Expand Down Expand Up @@ -193,7 +193,7 @@ pub async fn run_test(path: PathBuf) -> eyre::Result<TestOutcome> {

// Initialize the execution stage
// Hardcode the chain_id to Ethereum 1.
let mut stage = ExecutionStage::new(chain_spec, 1000);
let mut stage = ExecutionStage::<DefaultDB<'_>>::from(chain_spec);

// Call execution stage
let input = ExecInput {
Expand Down
155 changes: 86 additions & 69 deletions crates/executor/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,26 +21,47 @@ use revm::{
primitives::{Account as RevmAccount, AccountInfo, Bytecode, ResultAndState},
EVM,
};
use std::collections::{BTreeMap, HashMap};
use std::{
collections::{BTreeMap, HashMap},
sync::Arc,
};

/// Main block executor
pub struct Executor<'a, DB>
where
DB: StateProvider,
{
chain_spec: &'a ChainSpec,
/// The configured chain-spec
pub chain_spec: Arc<ChainSpec>,
evm: EVM<&'a mut SubState<DB>>,
stack: InspectorStack,
}

impl<'a, DB> From<ChainSpec> for Executor<'a, DB>
where
DB: StateProvider,
{
/// Instantiates a new executor from the chainspec. Must call
/// `with_db` to set the database before executing.
fn from(chain_spec: ChainSpec) -> Self {
let evm = EVM::new();
Executor {
chain_spec: Arc::new(chain_spec),
evm,
stack: InspectorStack::new(InspectorStackConfig::default()),
}
}
}

impl<'a, DB> Executor<'a, DB>
where
DB: StateProvider,
{
/// Creates a new executor from the given chain spec and database.
pub fn new(chain_spec: &'a ChainSpec, db: &'a mut SubState<DB>) -> Self {
pub fn new(chain_spec: Arc<ChainSpec>, db: &'a mut SubState<DB>) -> Self {
let mut evm = EVM::new();
evm.database(db);

Executor { chain_spec, evm, stack: InspectorStack::new(InspectorStackConfig::default()) }
}

Expand All @@ -50,10 +71,21 @@ where
self
}

fn db(&mut self) -> &mut SubState<DB> {
/// Gives a reference to the database
pub fn db(&mut self) -> &mut SubState<DB> {
self.evm.db().expect("db to not be moved")
}

/// Overrides the database
pub fn with_db<OtherDB: StateProvider>(
&self,
db: &'a mut SubState<OtherDB>,
) -> Executor<'a, OtherDB> {
let mut evm = EVM::new();
evm.database(db);
Executor { chain_spec: self.chain_spec.clone(), evm, stack: self.stack.clone() }
}

fn recover_senders(
&self,
body: &[TransactionSigned],
Expand All @@ -75,7 +107,7 @@ where
fill_cfg_and_block_env(
&mut self.evm.env.cfg,
&mut self.evm.env.block,
self.chain_spec,
&self.chain_spec,
header,
total_difficulty,
);
Expand Down Expand Up @@ -341,6 +373,30 @@ where
}
}

/// Execute and verify block
pub fn execute_and_verify_receipt(
&mut self,
block: &Block,
total_difficulty: U256,
senders: Option<Vec<Address>>,
) -> Result<ExecutionResult, Error> {
let execution_result = self.execute(block, total_difficulty, senders)?;

let receipts_iter =
execution_result.tx_changesets.iter().map(|changeset| &changeset.receipt);

if self.chain_spec.fork(Hardfork::Byzantium).active_at_block(block.header.number) {
verify_receipt(block.header.receipts_root, block.header.logs_bloom, receipts_iter)?;
}

// TODO Before Byzantium, receipts contained state root that would mean that expensive
// operation as hashing that is needed for state root got calculated in every
// transaction This was replaced with is_success flag.
// See more about EIP here: https://eips.ethereum.org/EIPS/eip-658

Ok(execution_result)
}

/// Runs a single transaction in the configured environment and proceeds
/// to return the result and state diff (without applying it).
///
Expand Down Expand Up @@ -464,30 +520,6 @@ where
}
}

/// Execute and verify block
pub fn execute_and_verify_receipt<DB: StateProvider>(
block: &Block,
total_difficulty: U256,
senders: Option<Vec<Address>>,
chain_spec: &ChainSpec,
db: &mut SubState<DB>,
) -> Result<ExecutionResult, Error> {
let execution_result = execute(block, total_difficulty, senders, chain_spec, db)?;

let receipts_iter = execution_result.tx_changesets.iter().map(|changeset| &changeset.receipt);

if chain_spec.fork(Hardfork::Byzantium).active_at_block(block.header.number) {
verify_receipt(block.header.receipts_root, block.header.logs_bloom, receipts_iter)?;
}

// TODO Before Byzantium, receipts contained state root that would mean that expensive operation
// as hashing that is needed for state root got calculated in every transaction
// This was replaced with is_success flag.
// See more about EIP here: https://eips.ethereum.org/EIPS/eip-658

Ok(execution_result)
}

/// Verify receipts
pub fn verify_receipt<'a>(
expected_receipts_root: H256,
Expand All @@ -511,22 +543,6 @@ pub fn verify_receipt<'a>(
Ok(())
}

/// Verify block. Execute all transaction and compare results.
/// Returns ChangeSet on transaction granularity.
/// NOTE: If block reward is still active (Before Paris/Merge) we would return
/// additional TransactionStatechangeset for account that receives the reward.
pub fn execute<DB: StateProvider>(
block: &Block,
total_difficulty: U256,
senders: Option<Vec<Address>>,
chain_spec: &ChainSpec,
db: &mut SubState<DB>,
) -> Result<ExecutionResult, Error> {
let mut executor = Executor::new(chain_spec, db)
.with_stack(InspectorStack::new(InspectorStackConfig::default()));
executor.execute(block, total_difficulty, senders)
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -640,13 +656,13 @@ mod tests {
);

// spec at berlin fork
let chain_spec = ChainSpecBuilder::mainnet().berlin_activated().build();
let chain_spec = Arc::new(ChainSpecBuilder::mainnet().berlin_activated().build());

let mut db = SubState::new(State::new(db));

// execute chain and verify receipts
let out =
execute_and_verify_receipt(&block, U256::ZERO, None, &chain_spec, &mut db).unwrap();
let mut executor = Executor::new(chain_spec, &mut db);
let out = executor.execute_and_verify_receipt(&block, U256::ZERO, None).unwrap();

assert_eq!(out.tx_changesets.len(), 1, "Should executed one transaction");

Expand Down Expand Up @@ -765,21 +781,23 @@ mod tests {
beneficiary_balance += i;
}

let chain_spec = ChainSpecBuilder::from(&*MAINNET)
.homestead_activated()
.with_fork(Hardfork::Dao, ForkCondition::Block(1))
.build();
let chain_spec = Arc::new(
ChainSpecBuilder::from(&*MAINNET)
.homestead_activated()
.with_fork(Hardfork::Dao, ForkCondition::Block(1))
.build(),
);

let mut db = SubState::new(State::new(db));
// execute chain and verify receipts
let out = execute_and_verify_receipt(
&Block { header, body: vec![], ommers: vec![], withdrawals: None },
U256::ZERO,
None,
&chain_spec,
&mut db,
)
.unwrap();
let mut executor = Executor::new(chain_spec, &mut db);
let out = executor
.execute_and_verify_receipt(
&Block { header, body: vec![], ommers: vec![], withdrawals: None },
U256::ZERO,
None,
)
.unwrap();
assert_eq!(out.tx_changesets.len(), 0, "No tx");

// Check if cache is set
Expand Down Expand Up @@ -858,13 +876,13 @@ mod tests {
);

// spec at berlin fork
let chain_spec = ChainSpecBuilder::mainnet().berlin_activated().build();
let chain_spec = Arc::new(ChainSpecBuilder::mainnet().berlin_activated().build());

let mut db = SubState::new(State::new(db));

// execute chain and verify receipts
let out =
execute_and_verify_receipt(&block, U256::ZERO, None, &chain_spec, &mut db).unwrap();
let mut executor = Executor::new(chain_spec, &mut db);
let out = executor.execute_and_verify_receipt(&block, U256::ZERO, None).unwrap();

assert_eq!(out.tx_changesets.len(), 1, "Should executed one transaction");

Expand Down Expand Up @@ -907,17 +925,17 @@ mod tests {
Address::from_str("c94f5374fce5edbc8e2a8697c15331677e6ebf0b").unwrap();

// spec at shanghai fork
let chain_spec = ChainSpecBuilder::mainnet().shanghai_activated().build();
let chain_spec = Arc::new(ChainSpecBuilder::mainnet().shanghai_activated().build());

let mut db = SubState::new(State::new(StateProviderTest::default()));

// execute chain and verify receipts
let out =
execute_and_verify_receipt(&block, U256::ZERO, None, &chain_spec, &mut db).unwrap();
let mut executor = Executor::new(chain_spec, &mut db);
let out = executor.execute_and_verify_receipt(&block, U256::ZERO, None).unwrap();
assert_eq!(out.tx_changesets.len(), 0, "No tx");

let withdrawal_sum = withdrawals.iter().fold(U256::ZERO, |sum, w| sum + w.amount_wei());
let beneficiary_account = db.accounts.get(&withdrawal_beneficiary).unwrap();
let beneficiary_account = executor.db().accounts.get(&withdrawal_beneficiary).unwrap();
assert_eq!(beneficiary_account.info.balance, withdrawal_sum);
assert_eq!(beneficiary_account.info.nonce, 0);
assert_eq!(beneficiary_account.account_state, AccountState::StorageCleared);
Expand All @@ -931,8 +949,7 @@ mod tests {
);

// Execute same block again
let out =
execute_and_verify_receipt(&block, U256::ZERO, None, &chain_spec, &mut db).unwrap();
let out = executor.execute_and_verify_receipt(&block, U256::ZERO, None).unwrap();
assert_eq!(out.tx_changesets.len(), 0, "No tx");

assert_eq!(out.block_changesets.len(), 1);
Expand Down
Loading