Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/prune/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ reth-interfaces = { workspace = true }
tracing = { workspace = true }
thiserror = { workspace = true }
itertools = "0.10"
rayon = "1.6.0"

[dev-dependencies]
# reth
Expand Down
3 changes: 3 additions & 0 deletions crates/prune/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ use thiserror::Error;

#[derive(Error, Debug)]
pub enum PrunerError {
#[error("Inconsistent data: {0}")]
InconsistentData(&'static str),

#[error("An interface error occurred.")]
Interface(#[from] reth_interfaces::Error),

Expand Down
248 changes: 216 additions & 32 deletions crates/prune/src/pruner.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
//! Support for pruning.

use crate::PrunerError;
use rayon::prelude::*;
use reth_db::{database::Database, tables};
use reth_primitives::{BlockNumber, ChainSpec, PruneCheckpoint, PruneMode, PruneModes, PrunePart};
use reth_provider::{BlockReader, DatabaseProviderRW, ProviderFactory, PruneCheckpointWriter};
use std::sync::Arc;
use reth_primitives::{
BlockNumber, ChainSpec, PruneCheckpoint, PruneMode, PruneModes, PrunePart, TxNumber,
};
use reth_provider::{
BlockReader, DatabaseProviderRW, ProviderFactory, PruneCheckpointReader, PruneCheckpointWriter,
TransactionsProvider,
};
use std::{ops::RangeInclusive, sync::Arc};
use tracing::{debug, instrument, trace};

/// Result of [Pruner::run] execution
Expand All @@ -15,11 +21,12 @@ pub type PrunerWithResult<DB> = (Pruner<DB>, PrunerResult);

pub struct BatchSizes {
receipts: usize,
transaction_lookup: usize,
}

impl Default for BatchSizes {
fn default() -> Self {
Self { receipts: 10000 }
Self { receipts: 10000, transaction_lookup: 10000 }
}
}

Expand Down Expand Up @@ -70,6 +77,12 @@ impl<DB: Database> Pruner<DB> {
self.prune_receipts(&provider, to_block, prune_mode)?;
}

if let Some((to_block, prune_mode)) =
self.modes.prune_target_block_transaction_lookup(tip_block_number)
{
self.prune_transaction_lookup(&provider, to_block, prune_mode)?;
}

provider.commit()?;

self.last_pruned_block_number = Some(tip_block_number);
Expand Down Expand Up @@ -97,6 +110,37 @@ impl<DB: Database> Pruner<DB> {
}
}

/// Get next inclusive tx number range to prune according to the checkpoint and `to_block` block
/// number.
///
/// To get the range start:
/// 1. If checkpoint exists, get next block body and return its first tx number.
/// 2. If checkpoint doesn't exist, return 0.
///
/// To get the range end: get last tx number for the provided `to_block`.
fn get_next_tx_num_range_from_checkpoint(
&self,
provider: &DatabaseProviderRW<'_, DB>,
prune_part: PrunePart,
to_block: BlockNumber,
) -> reth_interfaces::Result<Option<RangeInclusive<TxNumber>>> {
let from_tx_num = provider
.get_prune_checkpoint(prune_part)?
.map(|checkpoint| provider.block_body_indices(checkpoint.block_number + 1))
.transpose()?
.flatten()
.map(|body| body.first_tx_num)
.unwrap_or_default();

let to_tx_num = match provider.block_body_indices(to_block)? {
Some(body) => body,
None => return Ok(None),
}
.last_tx_num();

Ok(Some(from_tx_num..=to_tx_num))
}

/// Prune receipts up to the provided block, inclusive.
#[instrument(level = "trace", skip(self, provider), target = "pruner")]
fn prune_receipts(
Expand All @@ -105,16 +149,20 @@ impl<DB: Database> Pruner<DB> {
to_block: BlockNumber,
prune_mode: PruneMode,
) -> PrunerResult {
let to_block_body = match provider.block_body_indices(to_block)? {
Some(body) => body,
let range = match self.get_next_tx_num_range_from_checkpoint(
provider,
PrunePart::Receipts,
to_block,
)? {
Some(range) => range,
None => {
trace!(target: "pruner", "No receipts to prune");
return Ok(())
}
};

provider.prune_table_in_batches::<tables::Receipts, _, _>(
..=to_block_body.last_tx_num(),
provider.prune_table_in_batches::<tables::Receipts, _>(
range,
self.batch_sizes.receipts,
|receipts| {
trace!(
Expand All @@ -132,6 +180,71 @@ impl<DB: Database> Pruner<DB> {

Ok(())
}

/// Prune transaction lookup entries up to the provided block, inclusive.
#[instrument(level = "trace", skip(self, provider), target = "pruner")]
fn prune_transaction_lookup(
&self,
provider: &DatabaseProviderRW<'_, DB>,
to_block: BlockNumber,
prune_mode: PruneMode,
) -> PrunerResult {
let range = match self.get_next_tx_num_range_from_checkpoint(
provider,
PrunePart::TransactionLookup,
to_block,
)? {
Some(range) => range,
None => {
trace!(target: "pruner", "No receipts to prune");
return Ok(())
}
};
let last_tx_num = *range.end();

for i in range.step_by(self.batch_sizes.transaction_lookup) {
// The `min` ensures that the transaction range doesn't exceed the last transaction
// number. `last_tx_num + 1` is used to include the last transaction in the range.
let tx_range = i..(i + self.batch_sizes.transaction_lookup as u64).min(last_tx_num + 1);

// Retrieve transactions in the range and calculate their hashes in parallel
let mut hashes = provider
.transactions_by_tx_range(tx_range.clone())?
.into_par_iter()
.map(|transaction| transaction.hash())
.collect::<Vec<_>>();

// Number of transactions retrieved from the database should match the tx range count
let tx_count = tx_range.clone().count();
if hashes.len() != tx_count {
return Err(PrunerError::InconsistentData(
"Unexpected number of transaction hashes retrieved by transaction number range",
))
}

// Pre-sort hashes to prune them in order
hashes.sort();

provider.prune_table_in_batches::<tables::TxHashNumber, _>(
hashes,
self.batch_sizes.transaction_lookup,
|entries| {
trace!(
target: "pruner",
%entries,
"Pruned transaction lookup"
);
},
)?;
}

provider.save_prune_checkpoint(
PrunePart::TransactionLookup,
PruneCheckpoint { block_number: to_block, prune_mode },
)?;

Ok(())
}
}

#[cfg(test)]
Expand All @@ -143,7 +256,9 @@ mod tests {
generators,
generators::{random_block_range, random_receipt},
};
use reth_primitives::{PruneCheckpoint, PruneMode, PruneModes, PrunePart, H256, MAINNET};
use reth_primitives::{
BlockNumber, PruneCheckpoint, PruneMode, PruneModes, PrunePart, H256, MAINNET,
};
use reth_provider::PruneCheckpointReader;
use reth_stages::test_utils::TestTransaction;

Expand Down Expand Up @@ -192,34 +307,103 @@ mod tests {
tx.table::<tables::Receipts>().unwrap().len()
);

let prune_to_block = 10;
let prune_mode = PruneMode::Before(prune_to_block);
let pruner = Pruner::new(
tx.inner_raw(),
MAINNET.clone(),
5,
0,
PruneModes { receipts: Some(prune_mode), ..Default::default() },
BatchSizes {
// Less than total amount of blocks to prune to test the batching logic
receipts: 10,
},
);
let test_prune = |to_block: BlockNumber| {
let prune_mode = PruneMode::Before(to_block);
let pruner = Pruner::new(
tx.inner_raw(),
MAINNET.clone(),
5,
0,
PruneModes { receipts: Some(prune_mode), ..Default::default() },
BatchSizes {
// Less than total amount of blocks to prune to test the batching logic
receipts: 10,
..Default::default()
},
);

let provider = tx.inner_rw();
assert_matches!(pruner.prune_receipts(&provider, prune_to_block, prune_mode), Ok(()));
provider.commit().expect("commit");
let provider = tx.inner_rw();
assert_matches!(pruner.prune_receipts(&provider, to_block, prune_mode), Ok(()));
provider.commit().expect("commit");

assert_eq!(
tx.table::<tables::Receipts>().unwrap().len(),
blocks[to_block as usize + 1..].iter().map(|block| block.body.len()).sum::<usize>()
);
assert_eq!(
tx.inner().get_prune_checkpoint(PrunePart::Receipts).unwrap(),
Some(PruneCheckpoint { block_number: to_block, prune_mode })
);
};

// Pruning first time ever, no previous checkpoint is present
test_prune(10);
// Prune second time, previous checkpoint is present, should continue pruning from where
// ended last time
test_prune(20);
}

#[test]
fn prune_transaction_lookup() {
let tx = TestTransaction::default();
let mut rng = generators::rng();

let blocks = random_block_range(&mut rng, 0..=100, H256::zero(), 0..10);
tx.insert_blocks(blocks.iter(), None).expect("insert blocks");

let mut tx_hash_numbers = Vec::new();
for block in &blocks {
for transaction in &block.body {
tx_hash_numbers.push((transaction.hash, tx_hash_numbers.len() as u64));
}
}
tx.insert_tx_hash_numbers(tx_hash_numbers).expect("insert tx hash numbers");

assert_eq!(
tx.table::<tables::Receipts>().unwrap().len(),
blocks[prune_to_block as usize + 1..]
.iter()
.map(|block| block.body.len())
.sum::<usize>()
tx.table::<tables::Transactions>().unwrap().len(),
blocks.iter().map(|block| block.body.len()).sum::<usize>()
);
assert_eq!(
tx.inner().get_prune_checkpoint(PrunePart::Receipts).unwrap(),
Some(PruneCheckpoint { block_number: prune_to_block, prune_mode })
tx.table::<tables::Transactions>().unwrap().len(),
tx.table::<tables::TxHashNumber>().unwrap().len()
);

let test_prune = |to_block: BlockNumber| {
let prune_mode = PruneMode::Before(to_block);
let pruner = Pruner::new(
tx.inner_raw(),
MAINNET.clone(),
5,
0,
PruneModes { transaction_lookup: Some(prune_mode), ..Default::default() },
BatchSizes {
// Less than total amount of blocks to prune to test the batching logic
transaction_lookup: 10,
..Default::default()
},
);

let provider = tx.inner_rw();
assert_matches!(
pruner.prune_transaction_lookup(&provider, to_block, prune_mode),
Ok(())
);
provider.commit().expect("commit");

assert_eq!(
tx.table::<tables::TxHashNumber>().unwrap().len(),
blocks[to_block as usize + 1..].iter().map(|block| block.body.len()).sum::<usize>()
);
assert_eq!(
tx.inner().get_prune_checkpoint(PrunePart::TransactionLookup).unwrap(),
Some(PruneCheckpoint { block_number: to_block, prune_mode })
);
};

// Pruning first time ever, no previous checkpoint is present
test_prune(10);
// Prune second time, previous checkpoint is present, should continue pruning from where
// ended last time
test_prune(20);
}
}
14 changes: 13 additions & 1 deletion crates/stages/src/test_utils/test_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use reth_db::{
};
use reth_primitives::{
keccak256, Account, Address, BlockNumber, Receipt, SealedBlock, SealedHeader, StorageEntry,
TxNumber, H256, MAINNET, U256,
TxHash, TxNumber, H256, MAINNET, U256,
};
use reth_provider::{DatabaseProviderRO, DatabaseProviderRW, ProviderFactory};
use std::{
Expand Down Expand Up @@ -268,6 +268,18 @@ impl TestTransaction {
})
}

pub fn insert_tx_hash_numbers<I>(&self, tx_hash_numbers: I) -> Result<(), DbError>
where
I: IntoIterator<Item = (TxHash, TxNumber)>,
{
self.commit(|tx| {
tx_hash_numbers.into_iter().try_for_each(|(tx_hash, tx_num)| {
// Insert into tx hash numbers table.
tx.put::<tables::TxHashNumber>(tx_hash, tx_num)
})
})
}

/// Insert collection of ([TxNumber], [Receipt]) into the corresponding table.
pub fn insert_receipts<I>(&self, receipts: I) -> Result<(), DbError>
where
Expand Down
Loading