From 775789bf13aaaa82c14982a3569cabf96dc80953 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Wed, 11 Feb 2026 16:30:19 +0000 Subject: [PATCH 1/3] perf: add collect-based fast path for small block tx iterator For blocks with < 128 transactions, use rayon's order-preserving collect() instead of the out-of-order channel + BTreeMap reorder pipeline. This eliminates a spawn_blocking task and BTreeMap allocations for the common small block case. For larger blocks, replace BTreeMap reorder with pre-allocated Vec> buffer for O(1) insert and lookup. Amp-Thread-ID: https://ampcode.com/threads/T-019c4d83-74c0-739b-84aa-6437db8d213a --- .../tree/src/tree/payload_processor/mod.rs | 99 ++++++++++++------- 1 file changed, 64 insertions(+), 35 deletions(-) diff --git a/crates/engine/tree/src/tree/payload_processor/mod.rs b/crates/engine/tree/src/tree/payload_processor/mod.rs index 5798e245768..5a7acd7d419 100644 --- a/crates/engine/tree/src/tree/payload_processor/mod.rs +++ b/crates/engine/tree/src/tree/payload_processor/mod.rs @@ -43,7 +43,7 @@ use reth_trie_sparse::{ ParallelSparseTrie, ParallelismThresholds, RevealableSparseTrie, SparseStateTrie, }; use std::{ - collections::BTreeMap, + ops::Not, sync::{ atomic::AtomicBool, @@ -363,6 +363,10 @@ where } } + /// Below this threshold, transactions are converted via rayon's order-preserving `collect()` + /// in a single task, eliminating the out-of-order channel and BTreeMap reorder task. + const PARALLEL_REORDER_TX_THRESHOLD: usize = 128; + /// Spawns a task advancing transaction env iterator and streaming updates through a channel. #[expect(clippy::type_complexity)] fn spawn_tx_iterator>( @@ -372,48 +376,73 @@ where mpsc::Receiver, I::Recovered>>, mpsc::Receiver, I::Recovered>, I::Error>>, ) { - let (ooo_tx, ooo_rx) = mpsc::channel(); + let (transactions, convert) = transactions.into(); + let transactions = transactions.into_par_iter(); + let tx_count = transactions.len(); + let (prewarm_tx, prewarm_rx) = mpsc::channel(); let (execute_tx, execute_rx) = mpsc::channel(); - // Spawn a task that `convert`s all transactions in parallel and sends them out-of-order. - rayon::spawn(move || { - let (transactions, convert) = transactions.into(); - transactions.into_par_iter().enumerate().for_each_with(ooo_tx, |ooo_tx, (idx, tx)| { - let tx = convert(tx); - let tx = tx.map(|tx| { - let (tx_env, tx) = tx.into_parts(); - WithTxEnv { tx_env, tx: Arc::new(tx) } - }); - // Only send Ok(_) variants to prewarming task. - if let Ok(tx) = &tx { - let _ = prewarm_tx.send(tx.clone()); - } - let _ = ooo_tx.send((idx, tx)); - }); - }); + if tx_count < Self::PARALLEL_REORDER_TX_THRESHOLD { + rayon::spawn(move || { + let results: Vec<_> = transactions + .map(|tx| { + let tx = convert(tx); + tx.map(|tx| { + let (tx_env, tx) = tx.into_parts(); + WithTxEnv { tx_env, tx: Arc::new(tx) } + }) + }) + .collect(); - // Spawn a task that processes out-of-order transactions from the task above and sends them - // to the execution task in order. - self.executor.spawn_blocking(move || { - let mut next_for_execution = 0; - let mut queue = BTreeMap::new(); - while let Ok((idx, tx)) = ooo_rx.recv() { - if next_for_execution == idx { + for tx in results { + if let Ok(ref ok) = tx { + let _ = prewarm_tx.send(ok.clone()); + } let _ = execute_tx.send(tx); - next_for_execution += 1; + } + }); + } else { + let (ooo_tx, ooo_rx) = mpsc::channel(); + + rayon::spawn(move || { + transactions.enumerate().for_each_with(ooo_tx, |ooo_tx, (idx, tx)| { + let tx = convert(tx); + let tx = tx.map(|tx| { + let (tx_env, tx) = tx.into_parts(); + WithTxEnv { tx_env, tx: Arc::new(tx) } + }); + if let Ok(tx) = &tx { + let _ = prewarm_tx.send(tx.clone()); + } + let _ = ooo_tx.send((idx, tx)); + }); + }); - while let Some(entry) = queue.first_entry() && - *entry.key() == next_for_execution - { - let _ = execute_tx.send(entry.remove()); - next_for_execution += 1; + self.executor.spawn_blocking(move || { + let mut next = 0usize; + let mut buf: Vec>> = + (0..tx_count).map(|_| None).collect(); + while let Ok((idx, tx)) = ooo_rx.recv() { + if idx == next { + let _ = execute_tx.send(tx); + next += 1; + + while next < buf.len() { + match buf[next].take() { + Some(queued) => { + let _ = execute_tx.send(queued); + next += 1; + } + None => break, + } + } + } else if idx < buf.len() { + buf[idx] = Some(tx); } - } else { - queue.insert(idx, tx); } - } - }); + }); + } (prewarm_rx, execute_rx) } From 14e32acef3dc21272064fcd1b60c3901a8c26b5e Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Wed, 11 Feb 2026 20:12:57 +0000 Subject: [PATCH 2/3] chore: lower tx iterator fast-path threshold to 30 Amp-Thread-ID: https://ampcode.com/threads/T-019c4e1a-91e9-75dd-88e7-003fbbf31c37 --- crates/engine/tree/src/tree/payload_processor/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/engine/tree/src/tree/payload_processor/mod.rs b/crates/engine/tree/src/tree/payload_processor/mod.rs index 5a7acd7d419..0317e091f97 100644 --- a/crates/engine/tree/src/tree/payload_processor/mod.rs +++ b/crates/engine/tree/src/tree/payload_processor/mod.rs @@ -365,7 +365,7 @@ where /// Below this threshold, transactions are converted via rayon's order-preserving `collect()` /// in a single task, eliminating the out-of-order channel and BTreeMap reorder task. - const PARALLEL_REORDER_TX_THRESHOLD: usize = 128; + const PARALLEL_REORDER_TX_THRESHOLD: usize = 30; /// Spawns a task advancing transaction env iterator and streaming updates through a channel. #[expect(clippy::type_complexity)] From e4b93a2d923dbcd619a64783913f8810771de4ab Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Wed, 11 Feb 2026 20:58:23 +0000 Subject: [PATCH 3/3] fix: resolve fmt and clippy CI failures - Remove blank line in std use block (fmt) - Collapse single-line vec init (fmt) - Add backticks around BTreeMap in doc comment (clippy) Amp-Thread-ID: https://ampcode.com/threads/T-019c4e7e-4c60-7228-a6e8-e24c9bdb0aa4 --- crates/engine/tree/src/tree/payload_processor/mod.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/crates/engine/tree/src/tree/payload_processor/mod.rs b/crates/engine/tree/src/tree/payload_processor/mod.rs index 0317e091f97..a02092973d9 100644 --- a/crates/engine/tree/src/tree/payload_processor/mod.rs +++ b/crates/engine/tree/src/tree/payload_processor/mod.rs @@ -43,7 +43,6 @@ use reth_trie_sparse::{ ParallelSparseTrie, ParallelismThresholds, RevealableSparseTrie, SparseStateTrie, }; use std::{ - ops::Not, sync::{ atomic::AtomicBool, @@ -364,7 +363,7 @@ where } /// Below this threshold, transactions are converted via rayon's order-preserving `collect()` - /// in a single task, eliminating the out-of-order channel and BTreeMap reorder task. + /// in a single task, eliminating the out-of-order channel and `BTreeMap` reorder task. const PARALLEL_REORDER_TX_THRESHOLD: usize = 30; /// Spawns a task advancing transaction env iterator and streaming updates through a channel. @@ -421,8 +420,7 @@ where self.executor.spawn_blocking(move || { let mut next = 0usize; - let mut buf: Vec>> = - (0..tx_count).map(|_| None).collect(); + let mut buf: Vec>> = (0..tx_count).map(|_| None).collect(); while let Ok((idx, tx)) = ooo_rx.recv() { if idx == next { let _ = execute_tx.send(tx);