Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 30 additions & 16 deletions crates/stages/src/stages/sender_recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::{
StageError, StageId, UnwindInput, UnwindOutput,
};
use futures_util::StreamExt;
use itertools::Itertools;
use reth_db::{
cursor::{DbCursorRO, DbCursorRW},
database::Database,
Expand Down Expand Up @@ -75,36 +76,49 @@ impl<DB: Database> Stage<DB> for SenderRecoveryStage {
// Acquire the cursor over the transactions
let mut tx_cursor = tx.cursor_read::<tables::Transactions>()?;
// Walk the transactions from start to end index (inclusive)
let entries = tx_cursor.walk_range(start_tx_index..=end_tx_index)?;
let tx_walker = tx_cursor.walk_range(start_tx_index..=end_tx_index)?;

// Iterate over transactions in chunks
info!(target: "sync::stages::sender_recovery", start_tx_index, end_tx_index, "Recovering senders");

// an _unordered_ channel to receive results from a rayon job
// An _unordered_ channel to receive results from a rayon job
let (tx, rx) = mpsc::unbounded_channel();

// spawn recovery jobs onto the default rayon threadpool and send the result through the
// channel
for entry in entries {
let (tx_id, transaction) = entry?;
// Spawn recovery jobs onto the default rayon threadpool and send the result through the
// channel.
//
// We try to evenly divide the transactions to recover across all threads in the threadpool.
// Chunks are submitted instead of individual transactions to reduce the overhead of work
// stealing in the threadpool workers.
for chunk in
&tx_walker.chunks(self.commit_threshold as usize / rayon::current_num_threads())
Comment on lines +93 to +94
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

in hindsight this is kinda obvious

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah...

{
let tx = tx.clone();
// Note: Unfortunate side-effect of how chunk is designed in itertools (it is not Send)
let mut chunk: Vec<_> = chunk.collect();

// Spawn the sender recovery task onto the global rayon pool
// This task will send the result through the channel after it recovered the sender.
// This task will send the results through the channel after it recovered the senders.
rayon::spawn(move || {
let res = if let Some(signer) = transaction.recover_signer() {
Ok((tx_id, signer))
} else {
Err(StageError::from(SenderRecoveryStageError::SenderRecovery { tx: tx_id }))
};
// send the result back
let _ = tx.send(res);
chunk
.drain(..)
.map(|entry| {
let (tx_id, transaction) = entry?;
let sender = transaction.recover_signer().ok_or(StageError::from(
SenderRecoveryStageError::SenderRecovery { tx: tx_id },
))?;

Ok((tx_id, sender))
})
.for_each(|result: Result<_, StageError>| {
let _ = tx.send(result);
});
Comment on lines +113 to +115
Copy link
Collaborator

@mattsse mattsse Mar 18, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sending them one by one is totally fine

});
}
drop(tx);

// we need sorted results so we wrap the _unordered_ receiver stream into the sequential
// stream which yields the next result (increasing transaction id)
// We need sorted results, so we wrap the _unordered_ receiver stream into a sequential
// stream, which yields the results by ascending transaction ID.
let mut recovered_senders =
SequentialPairStream::new(start_tx_index, UnboundedReceiverStream::new(rx));

Expand Down