Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 73 additions & 6 deletions crates/stages/src/stages/tx_lookup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ use reth_db::{
use reth_primitives::{
keccak256,
stage::{EntitiesCheckpoint, StageCheckpoint, StageId},
TransactionSignedNoHash, TxNumber, H256,
PrunePart, TransactionSignedNoHash, TxNumber, H256,
};
use reth_provider::DatabaseProviderRW;
use reth_provider::{BlockReader, DatabaseProviderRW, PruneCheckpointReader};
use tokio::sync::mpsc;
use tracing::*;

Expand Down Expand Up @@ -183,9 +183,20 @@ fn calculate_hash(

fn stage_checkpoint<DB: Database>(
provider: &DatabaseProviderRW<'_, &DB>,
) -> Result<EntitiesCheckpoint, DatabaseError> {
) -> Result<EntitiesCheckpoint, StageError> {
let pruned_entries = provider
.get_prune_checkpoint(PrunePart::TransactionLookup)?
.map(|checkpoint| provider.block_body_indices(checkpoint.block_number))
.transpose()?
.flatten()
// +1 is needed because TxNumber is 0-indexed
.map(|body| body.last_tx_num() + 1)
.unwrap_or_default();
Ok(EntitiesCheckpoint {
processed: provider.tx_ref().entries::<tables::TxHashNumber>()? as u64,
// If `TxHashNumber` table was pruned, we will have a number of entries in it not matching
// the actual number of processed transactions. To fix that, we add the number of pruned
// `TxHashNumber` entries.
processed: provider.tx_ref().entries::<tables::TxHashNumber>()? as u64 + pruned_entries,
total: provider.tx_ref().entries::<tables::Transactions>()? as u64,
})
}
Expand All @@ -202,8 +213,13 @@ mod tests {
generators,
generators::{random_block, random_block_range},
};
use reth_primitives::{stage::StageUnitCheckpoint, BlockNumber, SealedBlock, H256};
use reth_provider::{BlockReader, ProviderError, TransactionsProvider};
use reth_primitives::{
stage::StageUnitCheckpoint, BlockNumber, PruneCheckpoint, PruneMode, SealedBlock, H256,
MAINNET,
};
use reth_provider::{
BlockReader, ProviderError, ProviderFactory, PruneCheckpointWriter, TransactionsProvider,
};

// Implement stage test suite.
stage_test_suite_ext!(TransactionLookupTestRunner, transaction_lookup);
Expand Down Expand Up @@ -321,6 +337,57 @@ mod tests {
assert!(runner.validate_execution(first_input, result.ok()).is_ok(), "validation failed");
}

#[test]
fn stage_checkpoint_pruned() {
let tx = TestTransaction::default();
let mut rng = generators::rng();

let blocks = random_block_range(&mut rng, 0..=100, H256::zero(), 0..10);
tx.insert_blocks(blocks.iter(), None).expect("insert blocks");

let max_pruned_block = 30;
let max_processed_block = 70;

let mut tx_hash_numbers = Vec::new();
let mut tx_hash_number = 0;
for block in &blocks[..=max_processed_block] {
for transaction in &block.body {
if block.number > max_pruned_block {
tx_hash_numbers.push((transaction.hash, tx_hash_number));
}
tx_hash_number += 1;
}
}
tx.insert_tx_hash_numbers(tx_hash_numbers).expect("insert tx hash numbers");

let provider = tx.inner_rw();
provider
.save_prune_checkpoint(
PrunePart::TransactionLookup,
PruneCheckpoint {
block_number: max_pruned_block as BlockNumber,
prune_mode: PruneMode::Full,
},
)
.expect("save stage checkpoint");
provider.commit().expect("commit");

let db = tx.inner_raw();
let factory = ProviderFactory::new(db.as_ref(), MAINNET.clone());
let provider = factory.provider_rw().expect("provider rw");

assert_eq!(
stage_checkpoint(&provider).expect("stage checkpoint"),
EntitiesCheckpoint {
processed: blocks[..=max_processed_block]
.iter()
.map(|block| block.body.len() as u64)
.sum::<u64>(),
total: blocks.iter().map(|block| block.body.len() as u64).sum::<u64>()
}
);
}

struct TransactionLookupTestRunner {
tx: TestTransaction,
threshold: u64,
Expand Down