Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions crates/primitives/src/prune/target.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,12 @@ use serde::{Deserialize, Serialize};
#[serde(default)]
pub struct PruneModes {
/// Sender Recovery pruning configuration.
#[serde(skip_serializing_if = "Option::is_none")]
// TODO(alexey): removing min blocks restriction is possible if we start calculating the senders
// dynamically on blockchain tree unwind.
#[serde(
skip_serializing_if = "Option::is_none",
deserialize_with = "deserialize_opt_prune_mode_with_min_blocks::<64, _>"
)]
pub sender_recovery: Option<PruneMode>,
/// Transaction Lookup pruning configuration.
#[serde(skip_serializing_if = "Option::is_none")]
Expand Down Expand Up @@ -126,7 +131,7 @@ impl PruneModes {
}

impl_prune_parts!(
(sender_recovery, "SenderRecovery", None),
(sender_recovery, "SenderRecovery", Some(64)),
(transaction_lookup, "TransactionLookup", None),
(receipts, "Receipts", Some(64)),
(account_history, "AccountHistory", Some(64)),
Expand Down
139 changes: 130 additions & 9 deletions crates/prune/src/pruner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,12 @@ pub type PrunerWithResult<DB> = (Pruner<DB>, PrunerResult);
pub struct BatchSizes {
// NOTE(review): presumably the batch size used when pruning receipts; its usage is not
// visible in this hunk, confirm against the receipts pruning method.
receipts: usize,
// NOTE(review): presumably the batch size used when pruning transaction lookup entries;
// its usage is not visible in this hunk, confirm against `prune_transaction_lookup`.
transaction_lookup: usize,
/// Batch size passed to `prune_table_in_batches` when pruning the `TxSenders` table.
transaction_senders: usize,
}

impl Default for BatchSizes {
fn default() -> Self {
Self { receipts: 10000, transaction_lookup: 10000 }
Self { receipts: 10000, transaction_lookup: 10000, transaction_senders: 10000 }
}
}

Expand Down Expand Up @@ -83,6 +84,12 @@ impl<DB: Database> Pruner<DB> {
self.prune_transaction_lookup(&provider, to_block, prune_mode)?;
}

if let Some((to_block, prune_mode)) =
self.modes.prune_target_block_sender_recovery(tip_block_number)?
{
self.prune_transaction_senders(&provider, to_block, prune_mode)?;
}

provider.commit()?;

self.last_pruned_block_number = Some(tip_block_number);
Expand Down Expand Up @@ -124,13 +131,16 @@ impl<DB: Database> Pruner<DB> {
prune_part: PrunePart,
to_block: BlockNumber,
) -> reth_interfaces::Result<Option<RangeInclusive<TxNumber>>> {
let from_tx_num = provider
.get_prune_checkpoint(prune_part)?
.map(|checkpoint| provider.block_body_indices(checkpoint.block_number + 1))
.transpose()?
.flatten()
.map(|body| body.first_tx_num)
.unwrap_or_default();
let checkpoint = provider.get_prune_checkpoint(prune_part)?.unwrap_or(PruneCheckpoint {
block_number: 0, // No checkpoint, fresh pruning
prune_mode: PruneMode::Full, // Doesn't matter in this case, can be anything
});
// Get first transaction of the next block after the highest pruned one
let from_tx_num =
provider.block_body_indices(checkpoint.block_number + 1)?.map(|body| body.first_tx_num);
// If no block body index is found, the DB is either corrupted or we've already pruned up to
// the latest block, so there's nothing to prune now.
let Some(from_tx_num) = from_tx_num else { return Ok(None) };

let to_tx_num = match provider.block_body_indices(to_block)? {
Some(body) => body,
Expand Down Expand Up @@ -200,7 +210,7 @@ impl<DB: Database> Pruner<DB> {
)? {
Some(range) => range,
None => {
trace!(target: "pruner", "No receipts to prune");
trace!(target: "pruner", "No transaction lookup entries to prune");
return Ok(())
}
};
Expand Down Expand Up @@ -248,6 +258,50 @@ impl<DB: Database> Pruner<DB> {

Ok(())
}

/// Prunes `TxSenders` table entries for all transactions up to and including `to_block`.
///
/// The range of transaction numbers to delete is resolved via
/// `get_next_tx_num_range_from_checkpoint` for [`PrunePart::SenderRecovery`]. If that yields no
/// range, the method returns early without saving a checkpoint. Otherwise the entries are
/// deleted in batches of `batch_sizes.transaction_senders` and a checkpoint for `to_block` with
/// the given `prune_mode` is saved afterwards.
#[instrument(level = "trace", skip(self, provider), target = "pruner")]
fn prune_transaction_senders(
    &self,
    provider: &DatabaseProviderRW<'_, DB>,
    to_block: BlockNumber,
    prune_mode: PruneMode,
) -> PrunerResult {
    let Some(tx_range) = self.get_next_tx_num_range_from_checkpoint(
        provider,
        PrunePart::SenderRecovery,
        to_block,
    )?
    else {
        trace!(target: "pruner", "No transaction senders to prune");
        return Ok(())
    };

    // Only needed to report progress in the trace events emitted per batch.
    let total_entries = tx_range.clone().count();
    let mut pruned_so_far = 0;

    let batch_size = self.batch_sizes.transaction_senders;
    provider.prune_table_in_batches::<tables::TxSenders, _>(tx_range, batch_size, |entries| {
        pruned_so_far += entries;
        trace!(
            target: "pruner",
            %entries,
            progress = format!("{:.1}%", 100.0 * pruned_so_far as f64 / total_entries as f64),
            "Pruned transaction senders"
        );
    })?;

    // Remember how far we got, so the next run continues from the block after `to_block`.
    let checkpoint = PruneCheckpoint { block_number: to_block, prune_mode };
    provider.save_prune_checkpoint(PrunePart::SenderRecovery, checkpoint)?;

    Ok(())
}
}

#[cfg(test)]
Expand Down Expand Up @@ -409,4 +463,71 @@ mod tests {
// ended last time
test_prune(20);
}

#[test]
fn prune_transaction_senders() {
    let tx = TestTransaction::default();
    let mut rng = generators::rng();

    let blocks = random_block_range(&mut rng, 0..=100, H256::zero(), 0..10);
    tx.insert_blocks(blocks.iter(), None).expect("insert blocks");

    // Recover the sender of every transaction, numbering transactions sequentially across
    // all blocks starting from zero.
    let transaction_senders: Vec<_> = blocks
        .iter()
        .flat_map(|block| block.body.iter())
        .enumerate()
        .map(|(tx_num, transaction)| {
            (tx_num as u64, transaction.recover_signer().expect("recover signer"))
        })
        .collect();
    tx.insert_transaction_senders(transaction_senders).expect("insert transaction senders");

    // Sanity checks: every generated transaction is stored and has exactly one sender entry.
    let total_transactions: usize = blocks.iter().map(|block| block.body.len()).sum();
    assert_eq!(tx.table::<tables::Transactions>().unwrap().len(), total_transactions);
    assert_eq!(
        tx.table::<tables::Transactions>().unwrap().len(),
        tx.table::<tables::TxSenders>().unwrap().len()
    );

    let test_prune = |to_block: BlockNumber| {
        let prune_mode = PruneMode::Before(to_block);
        let modes = PruneModes { sender_recovery: Some(prune_mode), ..Default::default() };
        // Deliberately small batch size, so that pruning is expected to take more than one
        // batch and the batching logic gets exercised.
        let batch_sizes = BatchSizes { transaction_senders: 10, ..Default::default() };
        let pruner = Pruner::new(tx.inner_raw(), MAINNET.clone(), 5, 0, modes, batch_sizes);

        let provider = tx.inner_rw();
        assert_matches!(
            pruner.prune_transaction_senders(&provider, to_block, prune_mode),
            Ok(())
        );
        provider.commit().expect("commit");

        // Only the senders of transactions in blocks after `to_block` must remain.
        let remaining_senders: usize =
            blocks[to_block as usize + 1..].iter().map(|block| block.body.len()).sum();
        assert_eq!(tx.table::<tables::TxSenders>().unwrap().len(), remaining_senders);

        // The checkpoint must point at the block we've just pruned up to.
        assert_eq!(
            tx.inner().get_prune_checkpoint(PrunePart::SenderRecovery).unwrap(),
            Some(PruneCheckpoint { block_number: to_block, prune_mode })
        );
    };

    // First run: no checkpoint has been saved yet.
    test_prune(10);
    // Second run: a checkpoint exists, so pruning should resume right after the block where
    // the previous run stopped.
    test_prune(20);
}
}
78 changes: 72 additions & 6 deletions crates/stages/src/stages/sender_recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@ use reth_interfaces::consensus;
use reth_primitives::{
keccak256,
stage::{EntitiesCheckpoint, StageCheckpoint, StageId},
TransactionSignedNoHash, TxNumber, H160,
PrunePart, TransactionSignedNoHash, TxNumber, H160,
};
use reth_provider::{
BlockReader, DatabaseProviderRW, HeaderProvider, ProviderError, PruneCheckpointReader,
};
use reth_provider::{BlockReader, DatabaseProviderRW, HeaderProvider, ProviderError};
use std::fmt::Debug;
use thiserror::Error;
use tokio::sync::mpsc;
Expand Down Expand Up @@ -207,9 +209,20 @@ fn recover_sender(

fn stage_checkpoint<DB: Database>(
provider: &DatabaseProviderRW<'_, &DB>,
) -> Result<EntitiesCheckpoint, DatabaseError> {
) -> Result<EntitiesCheckpoint, StageError> {
let pruned_entries = provider
.get_prune_checkpoint(PrunePart::SenderRecovery)?
.map(|checkpoint| provider.block_body_indices(checkpoint.block_number))
.transpose()?
.flatten()
// +1 is needed because TxNumber is 0-indexed
.map(|body| body.last_tx_num() + 1)
.unwrap_or_default();
Ok(EntitiesCheckpoint {
processed: provider.tx_ref().entries::<tables::TxSenders>()? as u64,
// If `TxSenders` table was pruned, we will have a number of entries in it not matching
// the actual number of processed transactions. To fix that, we add the number of pruned
// `TxSenders` entries.
processed: provider.tx_ref().entries::<tables::TxSenders>()? as u64 + pruned_entries,
total: provider.tx_ref().entries::<tables::Transactions>()? as u64,
})
}
Expand Down Expand Up @@ -239,9 +252,10 @@ mod tests {
generators::{random_block, random_block_range},
};
use reth_primitives::{
stage::StageUnitCheckpoint, BlockNumber, SealedBlock, TransactionSigned, H256,
stage::StageUnitCheckpoint, BlockNumber, PruneCheckpoint, PruneMode, SealedBlock,
TransactionSigned, H256, MAINNET,
};
use reth_provider::TransactionsProvider;
use reth_provider::{ProviderFactory, PruneCheckpointWriter, TransactionsProvider};

use super::*;
use crate::test_utils::{
Expand Down Expand Up @@ -366,6 +380,58 @@ mod tests {
assert!(runner.validate_execution(first_input, result.ok()).is_ok(), "validation failed");
}

/// Checks that `stage_checkpoint` adds the pruned `TxSenders` entries to the `processed`
/// counter, so the checkpoint reflects all processed transactions and not just the entries
/// still present in the table.
#[test]
fn stage_checkpoint_pruned() {
    let tx = TestTransaction::default();
    let mut rng = generators::rng();

    let blocks = random_block_range(&mut rng, 0..=100, H256::zero(), 0..10);
    tx.insert_blocks(blocks.iter(), None).expect("insert blocks");

    let max_pruned_block = 30;
    let max_processed_block = 70;

    // Simulate a pruned table: transaction numbers keep counting through the pruned blocks,
    // but senders are only stored for blocks after `max_pruned_block`.
    let mut tx_senders = Vec::new();
    let mut tx_number = 0;
    for block in &blocks[..=max_processed_block] {
        for transaction in &block.body {
            if block.number > max_pruned_block {
                tx_senders
                    .push((tx_number, transaction.recover_signer().expect("recover signer")));
            }
            tx_number += 1;
        }
    }
    tx.insert_transaction_senders(tx_senders).expect("insert transaction senders");

    // Record that sender recovery data was pruned up to and including `max_pruned_block`.
    let provider = tx.inner_rw();
    provider
        .save_prune_checkpoint(
            PrunePart::SenderRecovery,
            PruneCheckpoint {
                block_number: max_pruned_block as BlockNumber,
                prune_mode: PruneMode::Full,
            },
        )
        .expect("save prune checkpoint");
    provider.commit().expect("commit");

    let db = tx.inner_raw();
    let factory = ProviderFactory::new(db.as_ref(), MAINNET.clone());
    let provider = factory.provider_rw().expect("provider rw");

    // `processed` must cover every transaction up to `max_processed_block`, including the
    // pruned ones; `total` covers every inserted transaction.
    assert_eq!(
        stage_checkpoint(&provider).expect("stage checkpoint"),
        EntitiesCheckpoint {
            processed: blocks[..=max_processed_block]
                .iter()
                .map(|block| block.body.len() as u64)
                .sum::<u64>(),
            total: blocks.iter().map(|block| block.body.len() as u64).sum::<u64>()
        }
    );
}

struct SenderRecoveryTestRunner {
tx: TestTransaction,
threshold: u64,
Expand Down
12 changes: 12 additions & 0 deletions crates/stages/src/test_utils/test_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,18 @@ impl TestTransaction {
})
}

/// Insert collection of ([TxNumber], [Address]) into the [tables::TxSenders] table.
///
/// All entries are written inside a single closure passed to `commit`. The first `put` that
/// fails stops the iteration and its error is returned from that closure.
pub fn insert_transaction_senders<I>(&self, transaction_senders: I) -> Result<(), DbError>
where
    I: IntoIterator<Item = (TxNumber, Address)>,
{
    self.commit(|tx| {
        transaction_senders.into_iter().try_for_each(|(tx_num, sender)| {
            // Insert into transaction senders table.
            tx.put::<tables::TxSenders>(tx_num, sender)
        })
    })
}

/// Insert collection of ([Address], [Account]) into corresponding tables.
pub fn insert_accounts_and_storages<I, S>(&self, accounts: I) -> Result<(), DbError>
where
Expand Down