Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/engine/tree/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ where
/// Internal function used to advance the chain.
///
/// Polls the `ChainOrchestrator` for the next event.
#[tracing::instrument(name = "ChainOrchestrator::poll", skip(self, cx))]
#[tracing::instrument(level = "debug", name = "ChainOrchestrator::poll", skip(self, cx))]
fn poll_next_event(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<ChainEvent<T::Event>> {
let this = self.get_mut();

Expand Down
18 changes: 2 additions & 16 deletions crates/engine/tree/src/tree/cached_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use reth_trie::{
MultiProofTargets, StorageMultiProof, StorageProof, TrieInput,
};
use std::{sync::Arc, time::Duration};
use tracing::{debug_span, instrument, trace};
use tracing::trace;

pub(crate) type Cache<K, V> =
mini_moka::sync::Cache<K, V, alloy_primitives::map::DefaultHashBuilder>;
Expand Down Expand Up @@ -354,7 +354,6 @@ impl ExecutionCache {
}

/// Invalidates the storage for all addresses in the set
#[instrument(level = "debug", target = "engine::tree", skip_all, fields(accounts = addresses.len()))]
pub(crate) fn invalidate_storages(&self, addresses: HashSet<&Address>) {
// NOTE: this must collect because the invalidate function should not be called while we
// hold an iter for it
Expand Down Expand Up @@ -386,25 +385,12 @@ impl ExecutionCache {
/// ## Error Handling
///
/// Returns an error if the state updates are inconsistent and should be discarded.
#[instrument(level = "debug", target = "engine::tree", skip_all)]
pub(crate) fn insert_state(&self, state_updates: &BundleState) -> Result<(), ()> {
let _enter =
debug_span!(target: "engine::tree", "contracts", len = state_updates.contracts.len())
.entered();
// Insert bytecodes
for (code_hash, bytecode) in &state_updates.contracts {
self.code_cache.insert(*code_hash, Some(Bytecode(bytecode.clone())));
}
drop(_enter);

let _enter = debug_span!(
target: "engine::tree",
"accounts",
accounts = state_updates.state.len(),
storages =
state_updates.state.values().map(|account| account.storage.len()).sum::<usize>()
)
.entered();

let mut invalidated_accounts = HashSet::default();
for (addr, account) in &state_updates.state {
// If the account was not modified, as in not changed and not destroyed, then we have
Expand Down
2 changes: 1 addition & 1 deletion crates/engine/tree/src/tree/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ impl EngineApiMetrics {
for tx in transactions {
let tx = tx?;
let span =
debug_span!(target: "engine::tree", "execute tx", tx_hash=?tx.tx().tx_hash());
debug_span!(target: "engine::tree", "execute_tx", tx_hash=?tx.tx().tx_hash());
let _enter = span.enter();
trace!(target: "engine::tree", "Executing transaction");
executor.execute_transaction(tx)?;
Expand Down
15 changes: 4 additions & 11 deletions crates/engine/tree/src/tree/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -496,12 +496,7 @@ where
///
/// This returns a [`PayloadStatus`] that represents the outcome of a processed new payload and
/// returns an error if an internal error occurred.
#[instrument(
level = "debug",
target = "engine::tree",
skip_all,
fields(block_hash = %payload.block_hash(), block_num = %payload.block_number()),
)]
#[instrument(level = "trace", skip_all, fields(block_hash = %payload.block_hash(), block_num = %payload.block_number(),), target = "engine::tree")]
fn on_new_payload(
&mut self,
payload: T::ExecutionData,
Expand Down Expand Up @@ -582,7 +577,6 @@ where
/// - `Valid`: Payload successfully validated and inserted
/// - `Syncing`: Parent missing, payload buffered for later
/// - Error status: Payload is invalid
#[instrument(level = "debug", target = "engine::tree", skip_all)]
fn try_insert_payload(
&mut self,
payload: T::ExecutionData,
Expand Down Expand Up @@ -976,7 +970,7 @@ where
/// `engine_forkchoiceUpdated`](https://github.com/ethereum/execution-apis/blob/main/src/engine/paris.md#specification-1).
///
/// Returns an error if an internal error occurred like a database error.
#[instrument(level = "debug", target = "engine::tree", skip_all, fields(head = % state.head_block_hash, safe = % state.safe_block_hash,finalized = % state.finalized_block_hash))]
#[instrument(level = "trace", skip_all, fields(head = % state.head_block_hash, safe = % state.safe_block_hash,finalized = % state.finalized_block_hash), target = "engine::tree")]
fn on_forkchoice_updated(
&mut self,
state: ForkchoiceState,
Expand Down Expand Up @@ -1978,7 +1972,7 @@ where
}

/// Attempts to connect any buffered blocks that are connected to the given parent hash.
#[instrument(level = "debug", target = "engine::tree", skip(self))]
#[instrument(level = "trace", skip(self), target = "engine::tree")]
fn try_connect_buffered_blocks(
&mut self,
parent: BlockNumHash,
Expand Down Expand Up @@ -2287,7 +2281,7 @@ where
/// Returns an event with the appropriate action to take, such as:
/// - download more missing blocks
/// - try to canonicalize the target if the `block` is the tracked target (head) block.
#[instrument(level = "debug", target = "engine::tree", skip_all, fields(block_hash = %block.hash(), block_num = %block.number()))]
#[instrument(level = "trace", skip_all, fields(block_hash = %block.hash(), block_num = %block.number(),), target = "engine::tree")]
fn on_downloaded_block(
&mut self,
block: RecoveredBlock<N::Block>,
Expand Down Expand Up @@ -2393,7 +2387,6 @@ where
/// Returns `InsertPayloadOk::Inserted(BlockStatus::Valid)` on successful execution,
/// `InsertPayloadOk::AlreadySeen` if the block already exists, or
/// `InsertPayloadOk::Inserted(BlockStatus::Disconnected)` if parent state is missing.
#[instrument(level = "debug", target = "engine::tree", skip_all, fields(block_id))]
fn insert_block_or_payload<Input, Err>(
&mut self,
block_id: BlockWithParent,
Expand Down
26 changes: 4 additions & 22 deletions crates/engine/tree/src/tree/payload_processor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ use std::sync::{
mpsc::{self, channel, Sender},
Arc,
};
use tracing::{debug, debug_span, instrument, warn};
use tracing::{debug, instrument, warn};

mod configured_sparse_trie;
pub mod executor;
Expand Down Expand Up @@ -167,12 +167,6 @@ where
/// This returns a handle to await the final state root and to interact with the tasks (e.g.
/// canceling)
#[allow(clippy::type_complexity)]
#[instrument(
level = "debug",
target = "engine::tree::payload_processor",
name = "payload processor",
skip_all
)]
pub fn spawn<P, I: ExecutableTxIterator<Evm>>(
&mut self,
env: ExecutionEnv<Evm>,
Expand Down Expand Up @@ -242,9 +236,7 @@ where
);

// spawn multi-proof task
let span = tracing::Span::current();
self.executor.spawn_blocking(move || {
let _enter = span.entered();
multi_proof_task.run();
});

Expand All @@ -265,7 +257,6 @@ where
/// Spawns a task that exclusively handles cache prewarming for transaction execution.
///
/// Returns a [`PayloadHandle`] to communicate with the task.
#[instrument(level = "debug", target = "engine::tree::payload_processor", skip_all)]
pub(super) fn spawn_cache_exclusive<P, I: ExecutableTxIterator<Evm>>(
&self,
env: ExecutionEnv<Evm>,
Expand Down Expand Up @@ -362,9 +353,7 @@ where
// spawn pre-warm task
{
let to_prewarm_task = to_prewarm_task.clone();
let span = debug_span!(target: "engine::tree::payload_processor", "prewarm task");
self.executor.spawn_blocking(move || {
let _enter = span.entered();
prewarm_task.run(transactions, to_prewarm_task);
});
}
Expand All @@ -381,7 +370,7 @@ where
///
/// If the given hash is different then what is recently cached, then this will create a new
/// instance.
#[instrument(level = "debug", target = "engine::caching", skip(self))]
#[instrument(target = "engine::caching", skip(self))]
fn cache_for(&self, parent_hash: B256) -> SavedCache {
if let Some(cache) = self.execution_cache.get_cache_for(parent_hash) {
debug!("reusing execution cache");
Expand All @@ -394,7 +383,6 @@ where
}

/// Spawns the [`SparseTrieTask`] for this payload processor.
#[instrument(level = "debug", target = "engine::tree::payload_processor", skip_all)]
fn spawn_sparse_trie_task<BPF>(
&self,
sparse_trie_rx: mpsc::Receiver<SparseTrieUpdate>,
Expand Down Expand Up @@ -433,18 +421,13 @@ where
sparse_state_trie,
);

let span = tracing::Span::current();
self.executor.spawn_blocking(move || {
let _enter = span.entered();

let (result, trie) = task.run();
// Send state root computation result
let _ = state_root_tx.send(result);

// Clear the SparseStateTrie and replace it back into the mutex _after_ sending
// results to the next step, so that time spent clearing doesn't block the step after
// this one.
let _enter = debug_span!(target: "engine::tree::payload_processor", "clear").entered();
// Clear the SparseStateTrie and replace it back into the mutex _after_ sending results
// to the next step, so that time spent clearing doesn't block the step after this one.
cleared_sparse_trie.lock().replace(ClearedSparseStateTrie::from_state_trie(trie));
});
}
Expand All @@ -469,7 +452,6 @@ impl<Tx, Err> PayloadHandle<Tx, Err> {
/// # Panics
///
/// If payload processing was started without background tasks.
#[instrument(level = "debug", target = "engine::tree::payload_processor", skip_all)]
pub fn state_root(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
self.state_root
.take()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use std::{
},
time::{Duration, Instant},
};
use tracing::{debug, error, instrument, trace};
use tracing::{debug, error, trace};

/// A trie update that can be applied to sparse trie alongside the proofs for touched parts of the
/// state.
Expand Down Expand Up @@ -718,7 +718,6 @@ impl MultiProofTask {
/// Handles request for proof prefetch.
///
/// Returns a number of proofs that were spawned.
#[instrument(level = "debug", target = "engine::tree::payload_processor::multiproof", skip_all, fields(accounts = targets.len()))]
fn on_prefetch_proof(&mut self, targets: MultiProofTargets) -> u64 {
let proof_targets = self.get_prefetch_proof_targets(targets);
self.fetched_proof_targets.extend_ref(&proof_targets);
Expand Down Expand Up @@ -845,7 +844,6 @@ impl MultiProofTask {
/// Handles state updates.
///
/// Returns a number of proofs that were spawned.
#[instrument(level = "debug", target = "engine::tree::payload_processor::multiproof", skip(self, update), fields(accounts = update.len()))]
fn on_state_update(&mut self, source: StateChangeSource, update: EvmState) -> u64 {
let hashed_state_update = evm_state_to_hashed_post_state(update);

Expand Down Expand Up @@ -975,7 +973,6 @@ impl MultiProofTask {
/// currently being calculated, or if there are any pending proofs in the proof sequencer
/// left to be revealed by checking the pending tasks.
/// 6. This task exits after all pending proofs are processed.
#[instrument(level = "debug", target = "engine::tree::payload_processor::multiproof", skip_all)]
pub(crate) fn run(mut self) {
// TODO convert those into fields
let mut prefetch_proofs_requested = 0;
Expand Down
40 changes: 5 additions & 35 deletions crates/engine/tree/src/tree/payload_processor/prewarm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use std::{
},
time::Instant,
};
use tracing::{debug, debug_span, instrument, trace, warn};
use tracing::{debug, trace, warn};

/// A wrapper for transactions that includes their index in the block.
#[derive(Clone)]
Expand Down Expand Up @@ -139,11 +139,8 @@ where
let ctx = self.ctx.clone();
let max_concurrency = self.max_concurrency;
let transaction_count_hint = self.transaction_count_hint;
let span = tracing::Span::current();

self.executor.spawn_blocking(move || {
let _enter = debug_span!(target: "engine::tree::payload_processor::prewarm", parent: span, "spawn_all").entered();

let (done_tx, done_rx) = mpsc::channel();
let mut executing = 0usize;

Expand All @@ -160,8 +157,8 @@ where
};

// Only spawn initial workers as needed
for i in 0..workers_needed {
handles.push(ctx.spawn_worker(i, &executor, actions_tx.clone(), done_tx.clone()));
for _ in 0..workers_needed {
handles.push(ctx.spawn_worker(&executor, actions_tx.clone(), done_tx.clone()));
}

let mut tx_index = 0usize;
Expand Down Expand Up @@ -251,7 +248,6 @@ where
/// the new, warmed cache to be inserted.
///
/// This method is called from `run()` only after all execution tasks are complete.
#[instrument(level = "debug", target = "engine::tree::payload_processor::prewarm", skip_all)]
fn save_cache(self, state: BundleState) {
let start = Instant::now();

Expand Down Expand Up @@ -288,12 +284,6 @@ where
///
/// This will execute the transactions until all transactions have been processed or the task
/// was cancelled.
#[instrument(
level = "debug",
target = "engine::tree::payload_processor::prewarm",
name = "prewarm",
skip_all
)]
pub(super) fn run(
self,
pending: mpsc::Receiver<impl ExecutableTxFor<Evm> + Clone + Send + 'static>,
Expand Down Expand Up @@ -374,7 +364,6 @@ where
{
/// Splits this context into an evm, an evm config, metrics, and the atomic bool for terminating
/// execution.
#[instrument(level = "debug", target = "engine::tree::payload_processor::prewarm", skip_all)]
fn evm_for_ctx(self) -> Option<(EvmFor<Evm, impl Database>, PrewarmMetrics, Arc<AtomicBool>)> {
let Self {
env,
Expand All @@ -391,7 +380,7 @@ where
Ok(provider) => provider,
Err(err) => {
trace!(
target: "engine::tree::payload_processor::prewarm",
target: "engine::tree",
%err,
"Failed to build state provider in prewarm thread"
);
Expand Down Expand Up @@ -440,7 +429,6 @@ where
///
/// Note: There are no ordering guarantees; this does not reflect the state produced by
/// sequential execution.
#[instrument(level = "debug", target = "engine::tree::payload_processor::prewarm", skip_all)]
fn transact_batch<Tx>(
self,
txs: mpsc::Receiver<IndexedTransaction<Tx>>,
Expand All @@ -451,15 +439,7 @@ where
{
let Some((mut evm, metrics, terminate_execution)) = self.evm_for_ctx() else { return };

while let Ok(IndexedTransaction { index, tx }) = {
let _enter = debug_span!(target: "engine::tree::payload_processor::prewarm", "recv tx")
.entered();
txs.recv()
} {
let _enter =
debug_span!(target: "engine::tree::payload_processor::prewarm", "prewarm tx", index, tx_hash=%tx.tx().tx_hash())
.entered();

while let Ok(IndexedTransaction { index, tx }) = txs.recv() {
// If the task was cancelled, stop execution, send an empty result to notify the task,
// and exit.
if terminate_execution.load(Ordering::Relaxed) {
Expand Down Expand Up @@ -487,18 +467,12 @@ where
};
metrics.execution_duration.record(start.elapsed());

drop(_enter);

// Only send outcome for transactions after the first txn
// as the main execution will be just as fast
if index > 0 {
let _enter =
debug_span!(target: "engine::tree::payload_processor::prewarm", "prewarm outcome", index, tx_hash=%tx.tx().tx_hash())
.entered();
let (targets, storage_targets) = multiproof_targets_from_state(res.state);
metrics.prefetch_storage_targets.record(storage_targets as f64);
let _ = sender.send(PrewarmTaskEvent::Outcome { proof_targets: Some(targets) });
drop(_enter);
}

metrics.total_runtime.record(start.elapsed());
Expand All @@ -511,7 +485,6 @@ where
/// Spawns a worker task for transaction execution and returns its sender channel.
fn spawn_worker<Tx>(
&self,
idx: usize,
executor: &WorkloadExecutor,
actions_tx: Sender<PrewarmTaskEvent>,
done_tx: Sender<()>,
Expand All @@ -521,11 +494,8 @@ where
{
let (tx, rx) = mpsc::channel();
let ctx = self.clone();
let span =
debug_span!(target: "engine::tree::payload_processor::prewarm", "prewarm worker", idx);

executor.spawn_blocking(move || {
let _enter = span.entered();
ctx.transact_batch(rx, actions_tx, done_tx);
});

Expand Down
Loading
Loading