diff --git a/crates/engine/tree/src/tree/payload_processor/prewarm.rs b/crates/engine/tree/src/tree/payload_processor/prewarm.rs index 81e29eea3fa..4ecb6fd1656 100644 --- a/crates/engine/tree/src/tree/payload_processor/prewarm.rs +++ b/crates/engine/tree/src/tree/payload_processor/prewarm.rs @@ -26,11 +26,11 @@ use alloy_consensus::transaction::TxHashRef; use alloy_eip7928::BlockAccessList; use alloy_evm::Database; use alloy_primitives::{keccak256, map::B256Set, B256}; -use crossbeam_channel::Sender as CrossbeamSender; +use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender}; use metrics::{Counter, Gauge, Histogram}; use reth_evm::{execute::ExecutableTxFor, ConfigureEvm, Evm, EvmFor, RecoveredTx, SpecFor}; use reth_metrics::Metrics; -use reth_primitives_traits::{NodePrimitives, SignedTransaction}; +use reth_primitives_traits::NodePrimitives; use reth_provider::{ AccountReader, BlockExecutionOutput, BlockReader, StateProvider, StateProviderFactory, StateReader, @@ -163,8 +163,8 @@ where transaction_count_hint.min(max_concurrency) }; - // Initialize worker handles container - let handles = ctx.clone().spawn_workers(workers_needed, &executor, actions_tx.clone(), done_tx.clone()); + // Spawn workers + let tx_sender = ctx.clone().spawn_workers(workers_needed, &executor, actions_tx.clone(), done_tx.clone()); // Distribute transactions to workers let mut tx_index = 0usize; @@ -179,37 +179,18 @@ where } let indexed_tx = IndexedTransaction { index: tx_index, tx }; - let is_system_tx = indexed_tx.tx.tx().is_system_tx(); - - // System transactions (type > 4) in the first position set critical metadata - // that affects all subsequent transactions (e.g., L1 block info on L2s). - // Broadcast the first system transaction to all workers to ensure they have - // the critical state. This is particularly important for L2s like Optimism - // where the first deposit transaction (type 126) contains essential block metadata. - if tx_index == 0 && is_system_tx { - for handle in &handles { - // Ignore send errors: workers listen to terminate_execution and may - // exit early when signaled. Sending to a disconnected worker is - // possible and harmless and should happen at most once due to - // the terminate_execution check above. - let _ = handle.send(indexed_tx.clone()); - } - } else { - // Round-robin distribution for all other transactions - let worker_idx = tx_index % workers_needed; - // Ignore send errors: workers listen to terminate_execution and may - // exit early when signaled. Sending to a disconnected worker is - // possible and harmless and should happen at most once due to - // the terminate_execution check above. - let _ = handles[worker_idx].send(indexed_tx); - } + + // Send transaction to the workers + // Ignore send errors: workers listen to terminate_execution and may + // exit early when signaled. + let _ = tx_sender.send(indexed_tx); tx_index += 1; } - // drop handle and wait for all tasks to finish and drop theirs + // drop sender and wait for all tasks to finish drop(done_tx); - drop(handles); + drop(tx_sender); while done_rx.recv().is_ok() {} let _ = actions_tx @@ -548,7 +529,7 @@ where Some((evm, metrics, terminate_execution, v2_proofs_enabled)) } - /// Accepts an [`mpsc::Receiver`] of transactions and a handle to prewarm task. Executes + /// Accepts a [`CrossbeamReceiver`] of transactions and a handle to prewarm task. Executes /// transactions and streams [`PrewarmTaskEvent::Outcome`] messages for each transaction. /// /// This function processes transactions sequentially from the receiver and emits outcome events @@ -560,7 +541,7 @@ where #[instrument(level = "debug", target = "engine::tree::payload_processor::prewarm", skip_all)] fn transact_batch( self, - txs: mpsc::Receiver>, + txs: CrossbeamReceiver>, sender: Sender>, done_tx: Sender<()>, ) where @@ -647,35 +628,31 @@ where let _ = done_tx.send(()); } - /// Spawns a worker task for transaction execution and returns its sender channel. + /// Spawns worker tasks that pull transactions from a shared channel. + /// + /// Returns the sender for distributing transactions to workers. fn spawn_workers( self, workers_needed: usize, task_executor: &WorkloadExecutor, actions_tx: Sender>, done_tx: Sender<()>, - ) -> Vec>> + ) -> CrossbeamSender> where Tx: ExecutableTxFor + Send + 'static, { - let mut handles = Vec::with_capacity(workers_needed); - let mut receivers = Vec::with_capacity(workers_needed); - - for _ in 0..workers_needed { - let (tx, rx) = mpsc::channel(); - handles.push(tx); - receivers.push(rx); - } + let (tx_sender, tx_receiver) = crossbeam_channel::unbounded(); - // Spawn a separate task spawning workers in parallel. + // Spawn workers that all pull from the shared receiver let executor = task_executor.clone(); let span = Span::current(); task_executor.spawn_blocking(move || { let _enter = span.entered(); - for (idx, rx) in receivers.into_iter().enumerate() { + for idx in 0..workers_needed { let ctx = self.clone(); let actions_tx = actions_tx.clone(); let done_tx = done_tx.clone(); + let rx = tx_receiver.clone(); let span = debug_span!(target: "engine::tree::payload_processor::prewarm", "prewarm worker", idx); executor.spawn_blocking(move || { let _enter = span.entered(); @@ -684,7 +661,7 @@ where } }); - handles + tx_sender } /// Spawns a worker task for BAL slot prefetching.