diff --git a/crates/engine/tree/src/tree/payload_processor/mod.rs b/crates/engine/tree/src/tree/payload_processor/mod.rs index 24af9148731..f3ed205b30c 100644 --- a/crates/engine/tree/src/tree/payload_processor/mod.rs +++ b/crates/engine/tree/src/tree/payload_processor/mod.rs @@ -367,7 +367,7 @@ where &self, transactions: I, ) -> ( - mpsc::Receiver, I::Recovered>>, + mpsc::Receiver<(usize, WithTxEnv, I::Recovered>)>, mpsc::Receiver, I::Recovered>, I::Error>>, usize, ) { @@ -389,7 +389,7 @@ where }); // Only send Ok(_) variants to prewarming task. if let Ok(tx) = &tx { - let _ = prewarm_tx.send(tx.clone()); + let _ = prewarm_tx.send((idx, tx.clone())); } let _ = ooo_tx.send((idx, tx)); }); @@ -425,7 +425,10 @@ where fn spawn_caching_with

( &self, env: ExecutionEnv, - mut transactions: mpsc::Receiver + Clone + Send + 'static>, + mut transactions: mpsc::Receiver<( + usize, + impl ExecutableTxFor + Clone + Send + 'static, + )>, transaction_count_hint: usize, provider_builder: StateProviderBuilder, to_multi_proof: Option>, diff --git a/crates/engine/tree/src/tree/payload_processor/prewarm.rs b/crates/engine/tree/src/tree/payload_processor/prewarm.rs index e68342112a3..a2427f7fed8 100644 --- a/crates/engine/tree/src/tree/payload_processor/prewarm.rs +++ b/crates/engine/tree/src/tree/payload_processor/prewarm.rs @@ -52,7 +52,7 @@ use tracing::{debug, debug_span, instrument, trace, warn, Span}; /// Determines the prewarming mode: transaction-based or BAL-based. pub(super) enum PrewarmMode { /// Prewarm by executing transactions from a stream. - Transactions(Receiver), + Transactions(Receiver<(usize, Tx)>), /// Prewarm by prefetching slots from a Block Access List. BlockAccessList(Arc), } @@ -152,7 +152,7 @@ where /// subsequent transactions in the block. fn spawn_all( &self, - pending: mpsc::Receiver, + pending: mpsc::Receiver<(usize, Tx)>, actions_tx: Sender>, ) where Tx: ExecutableTxFor + Clone + Send + 'static, @@ -181,8 +181,8 @@ where let handles = ctx.clone().spawn_workers(workers_needed, &executor, actions_tx.clone(), done_tx.clone()); // Distribute transactions to workers - let mut tx_index = 0usize; - while let Ok(tx) = pending.recv() { + let mut total_tx_executed = 0usize; + while let Ok((tx_index, tx)) = pending.recv() { // Stop distributing if termination was requested if ctx.terminate_execution.load(Ordering::Relaxed) { trace!( @@ -218,7 +218,7 @@ where let _ = handles[worker_idx].send(indexed_tx); } - tx_index += 1; + total_tx_executed += 1; } // drop handle and wait for all tasks to finish and drop theirs @@ -227,7 +227,7 @@ where while done_rx.recv().is_ok() {} let _ = actions_tx - .send(PrewarmTaskEvent::FinishedTxExecution { executed_transactions: tx_index }); + .send(PrewarmTaskEvent::FinishedTxExecution { executed_transactions: total_tx_executed }); }); }