Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/prune/types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,5 @@ pub use pruner::{
PruneInterruptReason, PruneProgress, PrunedSegmentInfo, PrunerOutput, SegmentOutput,
SegmentOutputCheckpoint,
};
pub use segment::{PrunePurpose, PruneSegment, PruneSegmentError};
pub use segment::{PrunePurpose, PruneSegment, PruneSegmentError, PRUNE_SEGMENTS};
pub use target::{PruneModes, UnwindTargetPrunedError, MINIMUM_PRUNING_DISTANCE};
12 changes: 12 additions & 0 deletions crates/prune/types/src/segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,18 @@ pub enum PruneSegment {
Bodies,
}

/// Array of [`PruneSegment`]s actively in use.
pub const PRUNE_SEGMENTS: [PruneSegment; 8] = [
PruneSegment::SenderRecovery,
PruneSegment::TransactionLookup,
PruneSegment::Receipts,
PruneSegment::ContractLogs,
PruneSegment::AccountHistory,
PruneSegment::StorageHistory,
PruneSegment::MerkleChangeSets,
PruneSegment::Bodies,
];

#[cfg(test)]
#[allow(clippy::derivable_impls)]
impl Default for PruneSegment {
Expand Down
83 changes: 49 additions & 34 deletions crates/stages/stages/src/stages/merkle_changesets.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@ use alloy_primitives::BlockNumber;
use reth_consensus::ConsensusError;
use reth_primitives_traits::{GotExpected, SealedHeader};
use reth_provider::{
ChainStateBlockReader, DBProvider, HeaderProvider, ProviderError, StageCheckpointReader,
TrieWriter,
ChainStateBlockReader, DBProvider, HeaderProvider, ProviderError, PruneCheckpointReader,
PruneCheckpointWriter, StageCheckpointReader, TrieWriter,
};
use reth_prune_types::{PruneCheckpoint, PruneMode, PruneSegment};
use reth_stages_api::{
BlockErrorKind, CheckpointBlockRange, ExecInput, ExecOutput, MerkleChangeSetsCheckpoint, Stage,
StageCheckpoint, StageError, StageId, UnwindInput, UnwindOutput,
BlockErrorKind, ExecInput, ExecOutput, Stage, StageCheckpoint, StageError, StageId,
UnwindInput, UnwindOutput,
};
use reth_trie::{updates::TrieUpdates, HashedPostState, KeccakKeyHasher, StateRoot, TrieInput};
use reth_trie_db::{DatabaseHashedPostState, DatabaseStateRoot};
Expand Down Expand Up @@ -39,14 +40,28 @@ impl MerkleChangeSets {

/// Returns the range of blocks which are already computed. Will return an empty range if none
/// have been computed.
fn computed_range(checkpoint: Option<StageCheckpoint>) -> Range<BlockNumber> {
fn computed_range<Provider>(
provider: &Provider,
checkpoint: Option<StageCheckpoint>,
) -> Result<Range<BlockNumber>, StageError>
where
Provider: PruneCheckpointReader,
{
let to = checkpoint.map(|chk| chk.block_number).unwrap_or_default();
let from = checkpoint
.map(|chk| chk.merkle_changesets_stage_checkpoint().unwrap_or_default())
.unwrap_or_default()
.block_range
.to;
from..to + 1

// Get the prune checkpoint for MerkleChangeSets to use as the lower bound. If there's no
// prune checkpoint or if the pruned block number is None, return empty range
let Some(from) = provider
.get_prune_checkpoint(PruneSegment::MerkleChangeSets)?
.and_then(|chk| chk.block_number)
// prune checkpoint indicates the last block pruned, so the block after is the start of
// the computed data
.map(|block_number| block_number + 1)
else {
return Ok(0..0)
};

Ok(from..to + 1)
}

/// Determines the target range for changeset computation based on the checkpoint and provider
Expand Down Expand Up @@ -269,8 +284,13 @@ impl Default for MerkleChangeSets {

impl<Provider> Stage<Provider> for MerkleChangeSets
where
Provider:
StageCheckpointReader + TrieWriter + DBProvider + HeaderProvider + ChainStateBlockReader,
Provider: StageCheckpointReader
+ TrieWriter
+ DBProvider
+ HeaderProvider
+ ChainStateBlockReader
+ PruneCheckpointReader
+ PruneCheckpointWriter,
{
fn id(&self) -> StageId {
StageId::MerkleChangeSets
Expand All @@ -291,7 +311,7 @@ where

// Get the previously computed range. This will be updated to reflect the populating of the
// target range.
let mut computed_range = Self::computed_range(input.checkpoint);
let mut computed_range = Self::computed_range(provider, input.checkpoint)?;

// We want the target range to not include any data already computed previously, if
// possible, so we start the target range from the end of the computed range if that is
Expand Down Expand Up @@ -336,16 +356,19 @@ where
// Populate the target range with changesets
Self::populate_range(provider, target_range)?;

let checkpoint_block_range = CheckpointBlockRange {
from: computed_range.start,
// CheckpointBlockRange is inclusive
to: computed_range.end.saturating_sub(1),
};
// Update the prune checkpoint to reflect that all data before `computed_range.start`
// is not available.
provider.save_prune_checkpoint(
PruneSegment::MerkleChangeSets,
PruneCheckpoint {
block_number: Some(computed_range.start.saturating_sub(1)),
tx_number: None,
prune_mode: PruneMode::Before(computed_range.start),
},
)?;

let checkpoint = StageCheckpoint::new(checkpoint_block_range.to)
.with_merkle_changesets_stage_checkpoint(MerkleChangeSetsCheckpoint {
block_range: checkpoint_block_range,
});
// `computed_range.end` is exclusive.
let checkpoint = StageCheckpoint::new(computed_range.end.saturating_sub(1));

Ok(ExecOutput::done(checkpoint))
}
Expand All @@ -358,22 +381,14 @@ where
// Unwinding is trivial; just clear everything after the target block.
provider.clear_trie_changesets_from(input.unwind_to + 1)?;

let mut computed_range = Self::computed_range(Some(input.checkpoint));
let mut computed_range = Self::computed_range(provider, Some(input.checkpoint))?;
computed_range.end = input.unwind_to + 1;
if computed_range.start > computed_range.end {
computed_range.start = computed_range.end;
}

let checkpoint_block_range = CheckpointBlockRange {
from: computed_range.start,
// computed_range.end is exclusive
to: computed_range.end.saturating_sub(1),
};

let checkpoint = StageCheckpoint::new(input.unwind_to)
.with_merkle_changesets_stage_checkpoint(MerkleChangeSetsCheckpoint {
block_range: checkpoint_block_range,
});
// `computed_range.end` is exclusive
let checkpoint = StageCheckpoint::new(computed_range.end.saturating_sub(1));

Ok(UnwindOutput { checkpoint })
}
Expand Down
13 changes: 11 additions & 2 deletions crates/stages/stages/src/stages/prune.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,18 @@ where
// We cannot recover the data that was pruned in `execute`, so we just update the
// checkpoints.
let prune_checkpoints = provider.get_prune_checkpoints()?;
let unwind_to_last_tx =
provider.block_body_indices(input.unwind_to)?.map(|i| i.last_tx_num());

for (segment, mut checkpoint) in prune_checkpoints {
checkpoint.block_number = Some(input.unwind_to);
provider.save_prune_checkpoint(segment, checkpoint)?;
// Only update the checkpoint if unwind_to is lower than the existing checkpoint.
if let Some(block) = checkpoint.block_number &&
input.unwind_to < block
{
checkpoint.block_number = Some(input.unwind_to);
checkpoint.tx_number = unwind_to_last_tx;
provider.save_prune_checkpoint(segment, checkpoint)?;
}
}
Ok(UnwindOutput { checkpoint: StageCheckpoint::new(input.unwind_to) })
}
Expand Down
14 changes: 9 additions & 5 deletions crates/storage/provider/src/providers/database/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ use reth_primitives_traits::{
Account, Block as _, BlockBody as _, Bytecode, RecoveredBlock, SealedHeader, StorageEntry,
};
use reth_prune_types::{
PruneCheckpoint, PruneMode, PruneModes, PruneSegment, MINIMUM_PRUNING_DISTANCE,
PruneCheckpoint, PruneMode, PruneModes, PruneSegment, MINIMUM_PRUNING_DISTANCE, PRUNE_SEGMENTS,
};
use reth_stages_types::{StageCheckpoint, StageId};
use reth_static_file_types::StaticFileSegment;
Expand Down Expand Up @@ -3024,10 +3024,14 @@ impl<TX: DbTx + 'static, N: NodeTypes> PruneCheckpointReader for DatabaseProvide
}

fn get_prune_checkpoints(&self) -> ProviderResult<Vec<(PruneSegment, PruneCheckpoint)>> {
Ok(self
.tx
.cursor_read::<tables::PruneCheckpoints>()?
.walk(None)?
Ok(PRUNE_SEGMENTS
.iter()
.filter_map(|segment| {
self.tx
.get::<tables::PruneCheckpoints>(*segment)
.transpose()
.map(|chk| chk.map(|chk| (*segment, chk)))
})
.collect::<Result<_, _>>()?)
}
}
Expand Down
23 changes: 13 additions & 10 deletions crates/storage/provider/src/providers/state/overlay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,19 +96,22 @@ where
}
})?;

// Extract a possible lower bound from stage checkpoint if available
let stage_lower_bound = stage_checkpoint.as_ref().and_then(|chk| {
chk.merkle_changesets_stage_checkpoint().map(|stage_chk| stage_chk.block_range.from)
});
// If the requested block is the DB tip (determined by the MerkleChangeSets stage
// checkpoint) then there won't be any reverts necessary, and we can simply return Ok.
if upper_bound == requested_block {
return Ok(())
}

// Extract a possible lower bound from prune checkpoint if available
// Extract the lower bound from prune checkpoint if available
// The prune checkpoint's block_number is the highest pruned block, so data is available
// starting from the next block
let prune_lower_bound =
prune_checkpoint.and_then(|chk| chk.block_number.map(|block| block + 1));

// Use the higher of the two lower bounds. If neither is available assume unbounded.
let lower_bound = stage_lower_bound.max(prune_lower_bound).unwrap_or(0);
let lower_bound = prune_checkpoint
.and_then(|chk| chk.block_number)
.map(|block_number| block_number + 1)
.ok_or_else(|| ProviderError::InsufficientChangesets {
requested: requested_block,
available: 0..=upper_bound,
})?;

let available_range = lower_bound..=upper_bound;

Expand Down