diff --git a/crates/engine/tree/src/tree/payload_processor/mod.rs b/crates/engine/tree/src/tree/payload_processor/mod.rs index 64ce3c4cf09..d93f0b8f4d8 100644 --- a/crates/engine/tree/src/tree/payload_processor/mod.rs +++ b/crates/engine/tree/src/tree/payload_processor/mod.rs @@ -366,7 +366,7 @@ where transactions: I, transaction_count: usize, ) -> ( - mpsc::Receiver, I::Recovered>>, + mpsc::Receiver<(usize, WithTxEnv, I::Recovered>)>, mpsc::Receiver, I::Recovered>, I::Error>>, ) { let (prewarm_tx, prewarm_rx) = mpsc::sync_channel(transaction_count); @@ -384,14 +384,14 @@ where ); self.executor.spawn_blocking(move || { let (transactions, convert) = transactions.into_parts(); - for tx in transactions { + for (idx, tx) in transactions.into_iter().enumerate() { let tx = convert.convert(tx); let tx = tx.map(|tx| { let (tx_env, tx) = tx.into_parts(); WithTxEnv { tx_env, tx: Arc::new(tx) } }); if let Ok(tx) = &tx { - let _ = prewarm_tx.send(tx.clone()); + let _ = prewarm_tx.send((idx, tx.clone())); } let _ = execute_tx.send(tx); } @@ -403,13 +403,14 @@ where let (transactions, convert) = transactions.into_parts(); transactions .into_par_iter() - .map(|tx| { + .enumerate() + .map(|(idx, tx)| { let tx = convert.convert(tx); tx.map(|tx| { let (tx_env, tx) = tx.into_parts(); let tx = WithTxEnv { tx_env, tx: Arc::new(tx) }; - // Send to prewarming out of order — order doesn't matter there. - let _ = prewarm_tx.send(tx.clone()); + // Send to prewarming out of order with the original index. + let _ = prewarm_tx.send((idx, tx.clone())); tx }) }) @@ -432,7 +433,7 @@ where fn spawn_caching_with

( &self, env: ExecutionEnv, - transactions: mpsc::Receiver + Clone + Send + 'static>, + transactions: mpsc::Receiver<(usize, impl ExecutableTxFor + Clone + Send + 'static)>, provider_builder: StateProviderBuilder, to_multi_proof: Option>, bal: Option>, diff --git a/crates/engine/tree/src/tree/payload_processor/prewarm.rs b/crates/engine/tree/src/tree/payload_processor/prewarm.rs index 8881ef97426..a61120374ef 100644 --- a/crates/engine/tree/src/tree/payload_processor/prewarm.rs +++ b/crates/engine/tree/src/tree/payload_processor/prewarm.rs @@ -52,8 +52,8 @@ use tracing::{debug, debug_span, instrument, trace, warn, Span}; /// Determines the prewarming mode: transaction-based, BAL-based, or skipped. #[derive(Debug)] pub enum PrewarmMode { - /// Prewarm by executing transactions from a stream. - Transactions(Receiver), + /// Prewarm by executing transactions from a stream, each paired with its block index. + Transactions(Receiver<(usize, Tx)>), /// Prewarm by prefetching slots from a Block Access List. BlockAccessList(Arc), /// Transaction prewarming is skipped (e.g. small blocks where the overhead exceeds the @@ -136,7 +136,7 @@ where /// subsequent transactions in the block. fn spawn_all( &self, - pending: mpsc::Receiver, + pending: mpsc::Receiver<(usize, Tx)>, actions_tx: Sender>, to_multi_proof: Option>, ) where @@ -164,8 +164,8 @@ where let tx_sender = ctx.clone().spawn_workers(workers_needed, &executor, to_multi_proof.clone(), done_tx.clone()); // Distribute transactions to workers - let mut tx_index = 0usize; - while let Ok(tx) = pending.recv() { + let mut tx_count = 0usize; + while let Ok((tx_index, tx)) = pending.recv() { // Stop distributing if termination was requested if ctx.terminate_execution.load(Ordering::Relaxed) { trace!( @@ -182,7 +182,7 @@ where // exit early when signaled. let _ = tx_sender.send(indexed_tx); - tx_index += 1; + tx_count += 1; } // Send withdrawal prefetch targets after all transactions have been distributed @@ -201,7 +201,7 @@ where while done_rx.recv().is_ok() {} let _ = actions_tx - .send(PrewarmTaskEvent::FinishedTxExecution { executed_transactions: tx_index }); + .send(PrewarmTaskEvent::FinishedTxExecution { executed_transactions: tx_count }); }); }