Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions crates/stages/src/stages/sender_recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,15 +80,18 @@ impl<DB: Database> Stage<DB> for SenderRecoveryStage {
// Iterate over transactions in chunks
info!(target: "sync::stages::sender_recovery", start_tx_index, end_tx_index, "Recovering senders");

// a channel to receive results from a rayon job
// an _unordered_ channel to receive results from a rayon job
let (tx, rx) = mpsc::unbounded_channel();

// spawn recovery jobs onto the default rayon threadpool and send the result through the
// channel
for entry in entries {
let (tx_id, transaction) = entry?;
let tx = tx.clone();
rayon::spawn_fifo(move || {

// Spawn the sender recovery task onto the global rayon pool
// This task will send the result through the channel after it recovered the sender.
rayon::spawn(move || {
let res = if let Some(signer) = transaction.recover_signer() {
Ok((tx_id, signer))
} else {
Expand All @@ -100,6 +103,8 @@ impl<DB: Database> Stage<DB> for SenderRecoveryStage {
}
drop(tx);

// we need sorted results so we wrap the _unordered_ receiver stream into the sequential
// stream which yields the next result (increasing transaction id)
let mut recovered_senders =
SequentialPairStream::new(start_tx_index, UnboundedReceiverStream::new(rx));

Expand Down