Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,7 @@ alloy-sol-types = { version = "1.5.6", default-features = false }

alloy-chains = { version = "0.2.33", default-features = false }
alloy-eip2124 = { version = "0.2.0", default-features = false }
alloy-eip7928 = { version = "0.3.0", default-features = false }
alloy-eip7928 = { version = "0.3.4", default-features = false }
alloy-evm = { version = "0.33.0", default-features = false }
alloy-rlp = { version = "0.3.13", default-features = false, features = ["core-net"] }
alloy-trie = { version = "0.9.4", default-features = false }
Expand Down
26 changes: 13 additions & 13 deletions crates/engine/tree/src/tree/payload_processor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::tree::{
CacheWaitDurations, CachedStateMetrics, CachedStateMetricsSource, ExecutionCache,
PayloadExecutionCache, SavedCache, StateProviderBuilder, TreeConfig, WaitForCaches,
};
use alloy_eip7928::BlockAccessList;
use alloy_eip7928::bal::DecodedBal;
use alloy_eips::{eip1898::BlockWithParent, eip4895::Withdrawal};
use alloy_primitives::B256;
use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
Expand Down Expand Up @@ -250,7 +250,6 @@ where
provider_builder: StateProviderBuilder<N, P>,
multiproof_provider_factory: F,
config: &TreeConfig,
bal: Option<Arc<BlockAccessList>>,
) -> IteratorPayloadHandle<Evm, I, N>
where
P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
Expand All @@ -273,13 +272,12 @@ where
halve_workers,
config,
);
let install_state_hook = bal.is_none();
let install_state_hook = env.decoded_bal.is_none();
let prewarm_handle = self.spawn_caching_with(
env,
prewarm_rx,
provider_builder,
Some(state_root_handle.updates_tx().clone()),
bal,
);

PayloadHandle {
Expand All @@ -300,14 +298,13 @@ where
env: ExecutionEnv<Evm>,
transactions: I,
provider_builder: StateProviderBuilder<N, P>,
bal: Option<Arc<BlockAccessList>>,
) -> IteratorPayloadHandle<Evm, I, N>
where
P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
{
let (prewarm_rx, execution_rx) =
self.spawn_tx_iterator(transactions, env.transaction_count);
let prewarm_handle = self.spawn_caching_with(env, prewarm_rx, provider_builder, None, bal);
let prewarm_handle = self.spawn_caching_with(env, prewarm_rx, provider_builder, None);
PayloadHandle {
state_root_handle: None,
install_state_hook: false,
Expand Down Expand Up @@ -465,15 +462,14 @@ where
level = "debug",
target = "engine::tree::payload_processor",
skip_all,
fields(bal=%bal.is_some())
fields(bal=%env.decoded_bal.is_some())
)]
fn spawn_caching_with<P>(
&self,
env: ExecutionEnv<Evm>,
transactions: mpsc::Receiver<(usize, impl ExecutableTxFor<Evm> + Clone + Send + 'static)>,
provider_builder: StateProviderBuilder<N, P>,
to_sparse_trie_task: Option<CrossbeamSender<StateRootMessage>>,
bal: Option<Arc<BlockAccessList>>,
) -> CacheTaskHandle<N::Receipt>
where
P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
Expand All @@ -484,7 +480,7 @@ where
let saved_cache = self.disable_state_cache.not().then(|| self.cache_for(env.parent_hash));

let executed_tx_index = Arc::new(AtomicUsize::new(0));

let maybe_decoded_bal = env.decoded_bal.clone();
// configure prewarming
let prewarm_ctx = PrewarmContext {
env,
Expand All @@ -507,15 +503,16 @@ where
prewarm_ctx,
to_sparse_trie_task,
);

{
let to_prewarm_task = to_prewarm_task.clone();
let disable_bal_parallel_execution = self.disable_bal_parallel_execution;
self.executor.spawn_blocking_named("prewarm", move || {
let mode = if skip_prewarm {
PrewarmMode::Skipped
} else if let Some(bal) = bal.filter(|_| !disable_bal_parallel_execution) {
PrewarmMode::BlockAccessList(bal)
} else if let Some(decoded_bal) =
maybe_decoded_bal.filter(|_| !disable_bal_parallel_execution)
{
PrewarmMode::BlockAccessList(decoded_bal)
} else {
PrewarmMode::Transactions(transactions)
};
Expand Down Expand Up @@ -936,6 +933,9 @@ pub struct ExecutionEnv<Evm: ConfigureEvm> {
/// Withdrawals included in the block.
/// Used to generate prefetch targets for withdrawal addresses.
pub withdrawals: Option<Vec<Withdrawal>>,
/// Optional decoded BAL for the block.
/// Used to validate and optimize execution.
pub decoded_bal: Option<Arc<DecodedBal>>,
}

impl<Evm: ConfigureEvm> ExecutionEnv<Evm>
Expand All @@ -953,6 +953,7 @@ where
transaction_count: 0,
gas_used: 0,
withdrawals: None,
decoded_bal: None,
}
}
}
Expand Down Expand Up @@ -1251,7 +1252,6 @@ mod tests {
StateProviderBuilder::new(provider_factory.clone(), genesis_hash, None),
OverlayStateProviderFactory::new(provider_factory, ChangesetCache::new()),
&TreeConfig::default(),
None, // No BAL for test
);

let mut state_hook = handle.state_hook().expect("state hook is None");
Expand Down
19 changes: 10 additions & 9 deletions crates/engine/tree/src/tree/payload_processor/prewarm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use crate::tree::{
StateProviderBuilder,
};
use alloy_consensus::transaction::TxHashRef;
use alloy_eip7928::BlockAccessList;
use alloy_eip7928::bal::DecodedBal;
use alloy_eips::eip4895::Withdrawal;
use alloy_primitives::{keccak256, StorageKey, B256};
use crossbeam_channel::Sender as CrossbeamSender;
Expand Down Expand Up @@ -48,7 +48,7 @@ pub enum PrewarmMode<Tx> {
/// Prewarm by executing transactions from a stream, each paired with its block index.
Transactions(Receiver<(usize, Tx)>),
/// Prewarm by prefetching slots from a Block Access List.
BlockAccessList(Arc<BlockAccessList>),
BlockAccessList(Arc<DecodedBal>),
/// Transaction prewarming is skipped (e.g. small blocks where the overhead exceeds the
/// benefit). No workers are spawned.
Skipped,
Expand Down Expand Up @@ -331,9 +331,10 @@ where
#[instrument(level = "debug", target = "engine::tree::payload_processor::prewarm", skip_all)]
fn run_bal_prewarm(
&self,
bal: Arc<BlockAccessList>,
decoded_bal: Arc<DecodedBal>,
actions_tx: Sender<PrewarmTaskEvent<N::Receipt>>,
) {
let bal = decoded_bal.as_bal();
if bal.is_empty() {
if let Some(to_sparse_trie_task) = self.to_sparse_trie_task.as_ref() {
let _ = to_sparse_trie_task.send(StateRootMessage::FinishedStateUpdates);
Expand All @@ -355,8 +356,8 @@ where
let parent_span = Span::current();
let prefetch_parent_span = parent_span.clone();
let stream_parent_span = parent_span;
let prefetch_bal = Arc::clone(&bal);
let stream_bal = Arc::clone(&bal);
let prefetch_bal = Arc::clone(&decoded_bal);
let stream_bal = Arc::clone(&decoded_bal);
let (prefetch_tx, prefetch_rx) = oneshot::channel();
let (stream_tx, stream_rx) = oneshot::channel();

Expand All @@ -367,12 +368,12 @@ where
target: "engine::tree::payload_processor::prewarm",
parent: &prefetch_parent_span,
"bal_prefetch_storage",
bal_accounts = prefetch_bal.len(),
bal_accounts = prefetch_bal.as_bal().len(),
);
let provider_parent_span = branch_span.clone();
let _span = branch_span.entered();

prefetch_bal.par_iter().for_each_init(
prefetch_bal.as_bal().par_iter().for_each_init(
|| {
(
prefetch_ctx.clone(),
Expand Down Expand Up @@ -400,12 +401,12 @@ where
target: "engine::tree::payload_processor::prewarm",
parent: &stream_parent_span,
"bal_hashed_state_stream",
bal_accounts = stream_bal.len(),
bal_accounts = stream_bal.as_bal().len(),
);
let provider_parent_span = branch_span.clone();
let _span = branch_span.entered();

stream_bal.par_iter().for_each_init(
stream_bal.as_bal().par_iter().for_each_init(
|| (ctx.clone(), None::<Box<dyn AccountReader>>, provider_parent_span.clone()),
|(ctx, provider, parent_span), account_changes| {
ctx.send_bal_hashed_state(
Expand Down
42 changes: 24 additions & 18 deletions crates/engine/tree/src/tree/payload_validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,10 @@ use crate::tree::{
PayloadHandle, StateProviderBuilder, StateProviderDatabase, TreeConfig, WaitForCaches,
};
use alloy_consensus::transaction::{Either, TxHashRef};
use alloy_eip7928::{bal::Bal, BlockAccessList};
use alloy_eip7928::{
bal::{Bal, DecodedBal},
BlockAccessList,
};
use alloy_eips::{eip1898::BlockWithParent, eip4895::Withdrawal, NumHash};
use alloy_evm::Evm;
use alloy_primitives::{map::B256Set, B256};
Expand Down Expand Up @@ -487,6 +490,12 @@ where
.in_scope(|| self.evm_env_for(&input))
.map_err(NewPayloadError::other)?;

// Extract the decoded BAL, if valid and available.
let decoded_bal = ensure_ok!(input
.try_decoded_access_list()
.map_err(|err| { Box::<dyn std::error::Error + Send + Sync>::from(err) }))
.map(Arc::new);

let env = ExecutionEnv {
evm_env,
hash: input.hash(),
Expand All @@ -495,6 +504,7 @@ where
transaction_count: input.transaction_count(),
gas_used: input.gas_used(),
withdrawals: input.withdrawals().map(|w| w.to_vec()),
decoded_bal,
};

// Plan the strategy used for state root computation.
Expand All @@ -509,14 +519,6 @@ where
// Get an iterator over the transactions in the payload
let txs = self.tx_iterator_for(&input)?;

// Extract the BAL, if valid and available
let block_access_list = ensure_ok!(input
.block_access_list()
.transpose()
// Eventually gets converted to a `InsertBlockErrorKind::Other`
.map_err(Box::<dyn std::error::Error + Send + Sync>::from))
.map(Arc::new);

// Create lazy overlay from ancestors - this doesn't block, allowing execution to start
// before the trie data is ready. The overlay will be computed on first access.
let (lazy_overlay, anchor_hash) = Self::get_parent_lazy_overlay(parent_hash, ctx.state());
Expand All @@ -535,7 +537,6 @@ where
provider_builder,
overlay_factory.clone(),
strategy,
block_access_list,
));

// Create optional cache stats for detailed block logging
Expand Down Expand Up @@ -1439,7 +1440,6 @@ where
provider_builder: StateProviderBuilder<N, P>,
overlay_factory: OverlayStateProviderFactory<P>,
strategy: StateRootStrategy,
block_access_list: Option<Arc<BlockAccessList>>,
) -> Result<
PayloadHandle<
impl ExecutableTxFor<Evm> + use<N, P, Evm, V, T>,
Expand All @@ -1459,7 +1459,6 @@ where
provider_builder,
overlay_factory,
&self.config,
block_access_list,
);

// record prewarming initialization duration
Expand All @@ -1472,12 +1471,8 @@ where
}
StateRootStrategy::Parallel | StateRootStrategy::Synchronous => {
let start = Instant::now();
let handle = self.payload_processor.spawn_cache_exclusive(
env,
txs,
provider_builder,
block_access_list,
);
let handle =
self.payload_processor.spawn_cache_exclusive(env, txs, provider_builder);

// Record prewarming initialization duration
self.metrics
Expand Down Expand Up @@ -2110,6 +2105,17 @@ impl<T: PayloadTypes> BlockOrPayload<T> {
}
}

/// Returns the decoded block access list, if present and successfully decoded.
pub fn try_decoded_access_list(&self) -> Result<Option<DecodedBal>, alloy_rlp::Error> {
Comment thread
mattsse marked this conversation as resolved.
match self {
Self::Payload(payload) => payload
.block_access_list()
.map(|block_access_list| DecodedBal::from_rlp_bytes(block_access_list.clone()))
.transpose(),
Self::Block(_) => Ok(None),
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for context, next we can add Option Decoded bal to the Block variant, sicne we can also download the over p2p

fyi @0xKarl98

}
}

/// Returns the number of transactions in the payload or block.
pub fn transaction_count(&self) -> usize
where
Expand Down
Loading