Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 22 additions & 45 deletions crates/engine/tree/src/tree/payload_processor/prewarm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ use alloy_consensus::transaction::TxHashRef;
use alloy_eip7928::BlockAccessList;
use alloy_evm::Database;
use alloy_primitives::{keccak256, map::B256Set, B256};
use crossbeam_channel::Sender as CrossbeamSender;
use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
use metrics::{Counter, Gauge, Histogram};
use reth_evm::{execute::ExecutableTxFor, ConfigureEvm, Evm, EvmFor, RecoveredTx, SpecFor};
use reth_metrics::Metrics;
use reth_primitives_traits::{NodePrimitives, SignedTransaction};
use reth_primitives_traits::NodePrimitives;
use reth_provider::{
AccountReader, BlockExecutionOutput, BlockReader, StateProvider, StateProviderFactory,
StateReader,
Expand Down Expand Up @@ -163,8 +163,8 @@ where
transaction_count_hint.min(max_concurrency)
};

// Initialize worker handles container
let handles = ctx.clone().spawn_workers(workers_needed, &executor, actions_tx.clone(), done_tx.clone());
// Spawn workers
let tx_sender = ctx.clone().spawn_workers(workers_needed, &executor, actions_tx.clone(), done_tx.clone());

// Distribute transactions to workers
let mut tx_index = 0usize;
Expand All @@ -179,37 +179,18 @@ where
}

let indexed_tx = IndexedTransaction { index: tx_index, tx };
let is_system_tx = indexed_tx.tx.tx().is_system_tx();

// System transactions (type > 4) in the first position set critical metadata
// that affects all subsequent transactions (e.g., L1 block info on L2s).
// Broadcast the first system transaction to all workers to ensure they have
// the critical state. This is particularly important for L2s like Optimism
// where the first deposit transaction (type 126) contains essential block metadata.
if tx_index == 0 && is_system_tx {
for handle in &handles {
// Ignore send errors: workers listen to terminate_execution and may
// exit early when signaled. Sending to a disconnected worker is
// possible and harmless and should happen at most once due to
// the terminate_execution check above.
let _ = handle.send(indexed_tx.clone());
}
} else {
// Round-robin distribution for all other transactions
let worker_idx = tx_index % workers_needed;
// Ignore send errors: workers listen to terminate_execution and may
// exit early when signaled. Sending to a disconnected worker is
// possible and harmless and should happen at most once due to
// the terminate_execution check above.
let _ = handles[worker_idx].send(indexed_tx);
}

// Send transaction to the workers
// Ignore send errors: workers listen to terminate_execution and may
// exit early when signaled.
let _ = tx_sender.send(indexed_tx);

tx_index += 1;
}

// drop handle and wait for all tasks to finish and drop theirs
// drop sender and wait for all tasks to finish
drop(done_tx);
drop(handles);
drop(tx_sender);
while done_rx.recv().is_ok() {}

let _ = actions_tx
Expand Down Expand Up @@ -548,7 +529,7 @@ where
Some((evm, metrics, terminate_execution, v2_proofs_enabled))
}

/// Accepts an [`mpsc::Receiver`] of transactions and a handle to prewarm task. Executes
/// Accepts a [`CrossbeamReceiver`] of transactions and a handle to prewarm task. Executes
/// transactions and streams [`PrewarmTaskEvent::Outcome`] messages for each transaction.
///
/// This function processes transactions sequentially from the receiver and emits outcome events
Expand All @@ -560,7 +541,7 @@ where
#[instrument(level = "debug", target = "engine::tree::payload_processor::prewarm", skip_all)]
fn transact_batch<Tx>(
self,
txs: mpsc::Receiver<IndexedTransaction<Tx>>,
txs: CrossbeamReceiver<IndexedTransaction<Tx>>,
sender: Sender<PrewarmTaskEvent<N::Receipt>>,
done_tx: Sender<()>,
) where
Expand Down Expand Up @@ -647,35 +628,31 @@ where
let _ = done_tx.send(());
}

/// Spawns a worker task for transaction execution and returns its sender channel.
/// Spawns worker tasks that pull transactions from a shared channel.
///
/// Returns the sender for distributing transactions to workers.
fn spawn_workers<Tx>(
self,
workers_needed: usize,
task_executor: &WorkloadExecutor,
actions_tx: Sender<PrewarmTaskEvent<N::Receipt>>,
done_tx: Sender<()>,
) -> Vec<mpsc::Sender<IndexedTransaction<Tx>>>
) -> CrossbeamSender<IndexedTransaction<Tx>>
where
Tx: ExecutableTxFor<Evm> + Send + 'static,
{
let mut handles = Vec::with_capacity(workers_needed);
let mut receivers = Vec::with_capacity(workers_needed);

for _ in 0..workers_needed {
let (tx, rx) = mpsc::channel();
handles.push(tx);
receivers.push(rx);
}
let (tx_sender, tx_receiver) = crossbeam_channel::unbounded();

// Spawn a separate task spawning workers in parallel.
// Spawn workers that all pull from the shared receiver
let executor = task_executor.clone();
let span = Span::current();
task_executor.spawn_blocking(move || {
let _enter = span.entered();
for (idx, rx) in receivers.into_iter().enumerate() {
for idx in 0..workers_needed {
let ctx = self.clone();
let actions_tx = actions_tx.clone();
let done_tx = done_tx.clone();
let rx = tx_receiver.clone();
let span = debug_span!(target: "engine::tree::payload_processor::prewarm", "prewarm worker", idx);
executor.spawn_blocking(move || {
let _enter = span.entered();
Expand All @@ -684,7 +661,7 @@ where
}
});

handles
tx_sender
}

/// Spawns a worker task for BAL slot prefetching.
Expand Down
Loading