Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 62 additions & 35 deletions crates/engine/tree/src/tree/payload_processor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ use reth_trie_sparse::{
ParallelSparseTrie, ParallelismThresholds, RevealableSparseTrie, SparseStateTrie,
};
use std::{
collections::BTreeMap,
ops::Not,
sync::{
atomic::AtomicBool,
Expand Down Expand Up @@ -363,6 +362,10 @@ where
}
}

/// Below this threshold, transactions are converted via rayon's order-preserving `collect()`
/// in a single task, eliminating the out-of-order channel and `BTreeMap` reorder task.
const PARALLEL_REORDER_TX_THRESHOLD: usize = 30;

/// Spawns a task advancing transaction env iterator and streaming updates through a channel.
#[expect(clippy::type_complexity)]
fn spawn_tx_iterator<I: ExecutableTxIterator<Evm>>(
Expand All @@ -372,48 +375,72 @@ where
mpsc::Receiver<WithTxEnv<TxEnvFor<Evm>, I::Recovered>>,
mpsc::Receiver<Result<WithTxEnv<TxEnvFor<Evm>, I::Recovered>, I::Error>>,
) {
let (ooo_tx, ooo_rx) = mpsc::channel();
let (transactions, convert) = transactions.into();
let transactions = transactions.into_par_iter();
let tx_count = transactions.len();

let (prewarm_tx, prewarm_rx) = mpsc::channel();
let (execute_tx, execute_rx) = mpsc::channel();

// Spawn a task that `convert`s all transactions in parallel and sends them out-of-order.
rayon::spawn(move || {
let (transactions, convert) = transactions.into();
transactions.into_par_iter().enumerate().for_each_with(ooo_tx, |ooo_tx, (idx, tx)| {
let tx = convert(tx);
let tx = tx.map(|tx| {
let (tx_env, tx) = tx.into_parts();
WithTxEnv { tx_env, tx: Arc::new(tx) }
});
// Only send Ok(_) variants to prewarming task.
if let Ok(tx) = &tx {
let _ = prewarm_tx.send(tx.clone());
}
let _ = ooo_tx.send((idx, tx));
});
});
if tx_count < Self::PARALLEL_REORDER_TX_THRESHOLD {
rayon::spawn(move || {
let results: Vec<_> = transactions
.map(|tx| {
let tx = convert(tx);
tx.map(|tx| {
let (tx_env, tx) = tx.into_parts();
WithTxEnv { tx_env, tx: Arc::new(tx) }
})
})
.collect();

// Spawn a task that processes out-of-order transactions from the task above and sends them
// to the execution task in order.
self.executor.spawn_blocking(move || {
let mut next_for_execution = 0;
let mut queue = BTreeMap::new();
while let Ok((idx, tx)) = ooo_rx.recv() {
if next_for_execution == idx {
for tx in results {
if let Ok(ref ok) = tx {
let _ = prewarm_tx.send(ok.clone());
}
let _ = execute_tx.send(tx);
next_for_execution += 1;
}
});
} else {
let (ooo_tx, ooo_rx) = mpsc::channel();

rayon::spawn(move || {
transactions.enumerate().for_each_with(ooo_tx, |ooo_tx, (idx, tx)| {
let tx = convert(tx);
let tx = tx.map(|tx| {
let (tx_env, tx) = tx.into_parts();
WithTxEnv { tx_env, tx: Arc::new(tx) }
});
if let Ok(tx) = &tx {
let _ = prewarm_tx.send(tx.clone());
}
let _ = ooo_tx.send((idx, tx));
});
});

while let Some(entry) = queue.first_entry() &&
*entry.key() == next_for_execution
{
let _ = execute_tx.send(entry.remove());
next_for_execution += 1;
self.executor.spawn_blocking(move || {
let mut next = 0usize;
let mut buf: Vec<Option<Result<_, _>>> = (0..tx_count).map(|_| None).collect();
while let Ok((idx, tx)) = ooo_rx.recv() {
if idx == next {
let _ = execute_tx.send(tx);
next += 1;

while next < buf.len() {
match buf[next].take() {
Some(queued) => {
let _ = execute_tx.send(queued);
next += 1;
}
None => break,
}
}
} else if idx < buf.len() {
buf[idx] = Some(tx);
}
} else {
queue.insert(idx, tx);
}
}
});
});
}

(prewarm_rx, execute_rx)
}
Expand Down
Loading