Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/engine/tree/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ where
/// Internal function used to advance the chain.
///
/// Polls the `ChainOrchestrator` for the next event.
#[tracing::instrument(level = "debug", name = "ChainOrchestrator::poll", skip(self, cx))]
#[tracing::instrument(level = "debug", target = "engine::tree::chain_orchestrator", skip_all)]
fn poll_next_event(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<ChainEvent<T::Event>> {
let this = self.get_mut();

Expand Down
18 changes: 16 additions & 2 deletions crates/engine/tree/src/tree/cached_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use reth_trie::{
MultiProofTargets, StorageMultiProof, StorageProof, TrieInput,
};
use std::{sync::Arc, time::Duration};
use tracing::trace;
use tracing::{debug_span, instrument, trace};

pub(crate) type Cache<K, V> =
mini_moka::sync::Cache<K, V, alloy_primitives::map::DefaultHashBuilder>;
Expand Down Expand Up @@ -354,6 +354,7 @@ impl ExecutionCache {
}

/// Invalidates the storage for all addresses in the set
#[instrument(level = "debug", target = "engine::caching", skip_all, fields(accounts = addresses.len()))]
pub(crate) fn invalidate_storages(&self, addresses: HashSet<&Address>) {
// NOTE: this must collect because the invalidate function should not be called while we
// hold an iter for it
Expand Down Expand Up @@ -385,12 +386,25 @@ impl ExecutionCache {
/// ## Error Handling
///
/// Returns an error if the state updates are inconsistent and should be discarded.
#[instrument(level = "debug", target = "engine::caching", skip_all)]
pub(crate) fn insert_state(&self, state_updates: &BundleState) -> Result<(), ()> {
let _enter =
debug_span!(target: "engine::tree", "contracts", len = state_updates.contracts.len())
.entered();
// Insert bytecodes
for (code_hash, bytecode) in &state_updates.contracts {
self.code_cache.insert(*code_hash, Some(Bytecode(bytecode.clone())));
}

drop(_enter);

let _enter = debug_span!(
target: "engine::tree",
"accounts",
accounts = state_updates.state.len(),
storages =
state_updates.state.values().map(|account| account.storage.len()).sum::<usize>()
)
.entered();
let mut invalidated_accounts = HashSet::default();
for (addr, account) in &state_updates.state {
// If the account was not modified, as in not changed and not destroyed, then we have
Expand Down
2 changes: 1 addition & 1 deletion crates/engine/tree/src/tree/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ impl EngineApiMetrics {
for tx in transactions {
let tx = tx?;
let span =
debug_span!(target: "engine::tree", "execute_tx", tx_hash=?tx.tx().tx_hash());
debug_span!(target: "engine::tree", "execute tx", tx_hash=?tx.tx().tx_hash());
let _enter = span.enter();
trace!(target: "engine::tree", "Executing transaction");
executor.execute_transaction(tx)?;
Expand Down
15 changes: 11 additions & 4 deletions crates/engine/tree/src/tree/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -496,7 +496,12 @@ where
///
/// This returns a [`PayloadStatus`] that represents the outcome of a processed new payload and
/// returns an error if an internal error occurred.
#[instrument(level = "trace", skip_all, fields(block_hash = %payload.block_hash(), block_num = %payload.block_number(),), target = "engine::tree")]
#[instrument(
level = "debug",
target = "engine::tree",
skip_all,
fields(block_hash = %payload.block_hash(), block_num = %payload.block_number()),
)]
fn on_new_payload(
&mut self,
payload: T::ExecutionData,
Expand Down Expand Up @@ -577,6 +582,7 @@ where
/// - `Valid`: Payload successfully validated and inserted
/// - `Syncing`: Parent missing, payload buffered for later
/// - Error status: Payload is invalid
#[instrument(level = "debug", target = "engine::tree", skip_all)]
fn try_insert_payload(
&mut self,
payload: T::ExecutionData,
Expand Down Expand Up @@ -970,7 +976,7 @@ where
/// `engine_forkchoiceUpdated`](https://github.com/ethereum/execution-apis/blob/main/src/engine/paris.md#specification-1).
///
/// Returns an error if an internal error occurred like a database error.
#[instrument(level = "trace", skip_all, fields(head = % state.head_block_hash, safe = % state.safe_block_hash,finalized = % state.finalized_block_hash), target = "engine::tree")]
#[instrument(level = "debug", target = "engine::tree", skip_all, fields(head = % state.head_block_hash, safe = % state.safe_block_hash,finalized = % state.finalized_block_hash))]
fn on_forkchoice_updated(
&mut self,
state: ForkchoiceState,
Expand Down Expand Up @@ -1972,7 +1978,7 @@ where
}

/// Attempts to connect any buffered blocks that are connected to the given parent hash.
#[instrument(level = "trace", skip(self), target = "engine::tree")]
#[instrument(level = "debug", target = "engine::tree", skip(self))]
fn try_connect_buffered_blocks(
&mut self,
parent: BlockNumHash,
Expand Down Expand Up @@ -2281,7 +2287,7 @@ where
/// Returns an event with the appropriate action to take, such as:
/// - download more missing blocks
/// - try to canonicalize the target if the `block` is the tracked target (head) block.
#[instrument(level = "trace", skip_all, fields(block_hash = %block.hash(), block_num = %block.number(),), target = "engine::tree")]
#[instrument(level = "debug", target = "engine::tree", skip_all, fields(block_hash = %block.hash(), block_num = %block.number()))]
fn on_downloaded_block(
&mut self,
block: RecoveredBlock<N::Block>,
Expand Down Expand Up @@ -2387,6 +2393,7 @@ where
/// Returns `InsertPayloadOk::Inserted(BlockStatus::Valid)` on successful execution,
/// `InsertPayloadOk::AlreadySeen` if the block already exists, or
/// `InsertPayloadOk::Inserted(BlockStatus::Disconnected)` if parent state is missing.
#[instrument(level = "debug", target = "engine::tree", skip_all, fields(block_id))]
fn insert_block_or_payload<Input, Err>(
&mut self,
block_id: BlockWithParent,
Expand Down
26 changes: 22 additions & 4 deletions crates/engine/tree/src/tree/payload_processor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ use std::sync::{
mpsc::{self, channel, Sender},
Arc,
};
use tracing::{debug, instrument, warn};
use tracing::{debug, debug_span, instrument, warn};

mod configured_sparse_trie;
pub mod executor;
Expand Down Expand Up @@ -167,6 +167,12 @@ where
/// This returns a handle to await the final state root and to interact with the tasks (e.g.
/// canceling)
#[allow(clippy::type_complexity)]
#[instrument(
level = "debug",
target = "engine::tree::payload_processor",
name = "payload processor",
skip_all
)]
pub fn spawn<P, I: ExecutableTxIterator<Evm>>(
&mut self,
env: ExecutionEnv<Evm>,
Expand All @@ -187,6 +193,7 @@ where
+ Clone
+ 'static,
{
let span = tracing::Span::current();
let (to_sparse_trie, sparse_trie_rx) = channel();
// spawn multiproof task, save the trie input
let (trie_input, state_root_config) = MultiProofConfig::from_input(trie_input);
Expand Down Expand Up @@ -237,6 +244,7 @@ where

// spawn multi-proof task
self.executor.spawn_blocking(move || {
let _enter = span.entered();
multi_proof_task.run();
});

Expand All @@ -257,6 +265,7 @@ where
/// Spawns a task that exclusively handles cache prewarming for transaction execution.
///
/// Returns a [`PayloadHandle`] to communicate with the task.
#[instrument(level = "debug", target = "engine::tree::payload_processor", skip_all)]
pub(super) fn spawn_cache_exclusive<P, I: ExecutableTxIterator<Evm>>(
&self,
env: ExecutionEnv<Evm>,
Expand Down Expand Up @@ -353,7 +362,9 @@ where
// spawn pre-warm task
{
let to_prewarm_task = to_prewarm_task.clone();
let span = debug_span!(target: "engine::tree::payload_processor", "prewarm task");
self.executor.spawn_blocking(move || {
let _enter = span.entered();
prewarm_task.run(transactions, to_prewarm_task);
});
}
Expand All @@ -370,7 +381,7 @@ where
///
/// If the given hash is different then what is recently cached, then this will create a new
/// instance.
#[instrument(target = "engine::caching", skip(self))]
#[instrument(level = "debug", target = "engine::caching", skip(self))]
fn cache_for(&self, parent_hash: B256) -> SavedCache {
if let Some(cache) = self.execution_cache.get_cache_for(parent_hash) {
debug!("reusing execution cache");
Expand All @@ -383,6 +394,7 @@ where
}

/// Spawns the [`SparseTrieTask`] for this payload processor.
#[instrument(level = "debug", target = "engine::tree::payload_processor", skip_all)]
fn spawn_sparse_trie_task<BPF>(
&self,
sparse_trie_rx: mpsc::Receiver<SparseTrieUpdate>,
Expand Down Expand Up @@ -421,13 +433,18 @@ where
sparse_state_trie,
);

let span = tracing::Span::current();
self.executor.spawn_blocking(move || {
let _enter = span.entered();

let (result, trie) = task.run();
// Send state root computation result
let _ = state_root_tx.send(result);

// Clear the SparseStateTrie and replace it back into the mutex _after_ sending results
// to the next step, so that time spent clearing doesn't block the step after this one.
// Clear the SparseStateTrie and replace it back into the mutex _after_ sending
// results to the next step, so that time spent clearing doesn't block the step after
// this one.
let _enter = debug_span!(target: "engine::tree::payload_processor", "clear").entered();
cleared_sparse_trie.lock().replace(ClearedSparseStateTrie::from_state_trie(trie));
});
}
Expand All @@ -452,6 +469,7 @@ impl<Tx, Err> PayloadHandle<Tx, Err> {
/// # Panics
///
/// If payload processing was started without background tasks.
#[instrument(level = "debug", target = "engine::tree::payload_processor", skip_all)]
pub fn state_root(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
self.state_root
.take()
Expand Down
18 changes: 13 additions & 5 deletions crates/engine/tree/src/tree/payload_processor/multiproof.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use std::{
},
time::{Duration, Instant},
};
use tracing::{debug, error, trace};
use tracing::{debug, error, instrument, trace};

/// A trie update that can be applied to sparse trie alongside the proofs for touched parts of the
/// state.
Expand Down Expand Up @@ -718,6 +718,7 @@ impl MultiProofTask {
/// Handles request for proof prefetch.
///
/// Returns a number of proofs that were spawned.
#[instrument(level = "debug", target = "engine::tree::payload_processor::multiproof", skip_all, fields(accounts = targets.len()))]
fn on_prefetch_proof(&mut self, targets: MultiProofTargets) -> u64 {
let proof_targets = self.get_prefetch_proof_targets(targets);
self.fetched_proof_targets.extend_ref(&proof_targets);
Expand Down Expand Up @@ -779,7 +780,7 @@ impl MultiProofTask {
let all_proofs_processed =
proofs_processed >= state_update_proofs_requested + prefetch_proofs_requested;
let no_pending = !self.proof_sequencer.has_pending();
debug!(
trace!(
target: "engine::root",
proofs_processed,
state_update_proofs_requested,
Expand Down Expand Up @@ -844,6 +845,7 @@ impl MultiProofTask {
/// Handles state updates.
///
/// Returns a number of proofs that were spawned.
#[instrument(level = "debug", target = "engine::tree::payload_processor::multiproof", skip(self, update), fields(accounts = update.len()))]
fn on_state_update(&mut self, source: StateChangeSource, update: EvmState) -> u64 {
let hashed_state_update = evm_state_to_hashed_post_state(update);

Expand Down Expand Up @@ -973,6 +975,12 @@ impl MultiProofTask {
/// currently being calculated, or if there are any pending proofs in the proof sequencer
/// left to be revealed by checking the pending tasks.
/// 6. This task exits after all pending proofs are processed.
#[instrument(
level = "debug",
name = "MultiProofTask::run",
target = "engine::tree::payload_processor::multiproof",
skip_all
)]
pub(crate) fn run(mut self) {
// TODO convert those into fields
let mut prefetch_proofs_requested = 0;
Expand Down Expand Up @@ -1008,7 +1016,7 @@ impl MultiProofTask {
let storage_targets =
targets.values().map(|slots| slots.len()).sum::<usize>();
prefetch_proofs_requested += self.on_prefetch_proof(targets);
debug!(
trace!(
target: "engine::root",
account_targets,
storage_targets,
Expand All @@ -1029,7 +1037,7 @@ impl MultiProofTask {

let len = update.len();
state_update_proofs_requested += self.on_state_update(source, update);
debug!(
trace!(
target: "engine::root",
?source,
len,
Expand Down Expand Up @@ -1091,7 +1099,7 @@ impl MultiProofTask {
.proof_calculation_duration_histogram
.record(proof_calculated.elapsed);

debug!(
trace!(
target: "engine::root",
sequence = proof_calculated.sequence_number,
total_proofs = proofs_processed,
Expand Down
Loading
Loading