diff --git a/crates/engine/tree/src/tree/payload_processor/mod.rs b/crates/engine/tree/src/tree/payload_processor/mod.rs index 5798e245768..a02092973d9 100644 --- a/crates/engine/tree/src/tree/payload_processor/mod.rs +++ b/crates/engine/tree/src/tree/payload_processor/mod.rs @@ -43,7 +43,6 @@ use reth_trie_sparse::{ ParallelSparseTrie, ParallelismThresholds, RevealableSparseTrie, SparseStateTrie, }; use std::{ - collections::BTreeMap, ops::Not, sync::{ atomic::AtomicBool, @@ -363,6 +362,10 @@ where } } + /// Below this threshold, transactions are converted via rayon's order-preserving `collect()` + /// in a single task, eliminating the out-of-order channel and `BTreeMap` reorder task. + const PARALLEL_REORDER_TX_THRESHOLD: usize = 30; + /// Spawns a task advancing transaction env iterator and streaming updates through a channel. #[expect(clippy::type_complexity)] fn spawn_tx_iterator>( @@ -372,48 +375,72 @@ where mpsc::Receiver, I::Recovered>>, mpsc::Receiver, I::Recovered>, I::Error>>, ) { - let (ooo_tx, ooo_rx) = mpsc::channel(); + let (transactions, convert) = transactions.into(); + let transactions = transactions.into_par_iter(); + let tx_count = transactions.len(); + let (prewarm_tx, prewarm_rx) = mpsc::channel(); let (execute_tx, execute_rx) = mpsc::channel(); - // Spawn a task that `convert`s all transactions in parallel and sends them out-of-order. - rayon::spawn(move || { - let (transactions, convert) = transactions.into(); - transactions.into_par_iter().enumerate().for_each_with(ooo_tx, |ooo_tx, (idx, tx)| { - let tx = convert(tx); - let tx = tx.map(|tx| { - let (tx_env, tx) = tx.into_parts(); - WithTxEnv { tx_env, tx: Arc::new(tx) } - }); - // Only send Ok(_) variants to prewarming task. - if let Ok(tx) = &tx { - let _ = prewarm_tx.send(tx.clone()); - } - let _ = ooo_tx.send((idx, tx)); - }); - }); + if tx_count < Self::PARALLEL_REORDER_TX_THRESHOLD { + rayon::spawn(move || { + let results: Vec<_> = transactions + .map(|tx| { + let tx = convert(tx); + tx.map(|tx| { + let (tx_env, tx) = tx.into_parts(); + WithTxEnv { tx_env, tx: Arc::new(tx) } + }) + }) + .collect(); - // Spawn a task that processes out-of-order transactions from the task above and sends them - // to the execution task in order. - self.executor.spawn_blocking(move || { - let mut next_for_execution = 0; - let mut queue = BTreeMap::new(); - while let Ok((idx, tx)) = ooo_rx.recv() { - if next_for_execution == idx { + for tx in results { + if let Ok(ref ok) = tx { + let _ = prewarm_tx.send(ok.clone()); + } let _ = execute_tx.send(tx); - next_for_execution += 1; + } + }); + } else { + let (ooo_tx, ooo_rx) = mpsc::channel(); + + rayon::spawn(move || { + transactions.enumerate().for_each_with(ooo_tx, |ooo_tx, (idx, tx)| { + let tx = convert(tx); + let tx = tx.map(|tx| { + let (tx_env, tx) = tx.into_parts(); + WithTxEnv { tx_env, tx: Arc::new(tx) } + }); + if let Ok(tx) = &tx { + let _ = prewarm_tx.send(tx.clone()); + } + let _ = ooo_tx.send((idx, tx)); + }); + }); - while let Some(entry) = queue.first_entry() && - *entry.key() == next_for_execution - { - let _ = execute_tx.send(entry.remove()); - next_for_execution += 1; + self.executor.spawn_blocking(move || { + let mut next = 0usize; + let mut buf: Vec>> = (0..tx_count).map(|_| None).collect(); + while let Ok((idx, tx)) = ooo_rx.recv() { + if idx == next { + let _ = execute_tx.send(tx); + next += 1; + + while next < buf.len() { + match buf[next].take() { + Some(queued) => { + let _ = execute_tx.send(queued); + next += 1; + } + None => break, + } + } + } else if idx < buf.len() { + buf[idx] = Some(tx); } - } else { - queue.insert(idx, tx); } - } - }); + }); + } (prewarm_rx, execute_rx) }