Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 60 additions & 19 deletions crates/storage/provider/src/providers/state/overlay.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
use alloy_primitives::{BlockNumber, B256};
use reth_db_api::DatabaseError;
use reth_errors::ProviderError;
use reth_prune_types::PruneSegment;
use reth_stages_types::StageId;
use reth_storage_api::{DBProvider, DatabaseProviderFactory, StageCheckpointReader, TrieReader};
use reth_storage_api::{
DBProvider, DatabaseProviderFactory, PruneCheckpointReader, StageCheckpointReader, TrieReader,
};
use reth_trie::{
hashed_cursor::{HashedCursorFactory, HashedPostStateCursorFactory},
trie_cursor::{InMemoryTrieCursorFactory, TrieCursorFactory},
Expand All @@ -13,6 +16,7 @@ use reth_trie_db::{
DatabaseHashedCursorFactory, DatabaseHashedPostState, DatabaseTrieCursorFactory,
};
use std::sync::Arc;
use tracing::debug;

/// Factory for creating overlay state providers with optional reverts and overlays.
///
Expand All @@ -33,26 +37,31 @@ pub struct OverlayStateProviderFactory<F> {
impl<F> OverlayStateProviderFactory<F>
where
F: DatabaseProviderFactory,
F::Provider: Clone + TrieReader + StageCheckpointReader,
F::Provider: Clone + TrieReader + StageCheckpointReader + PruneCheckpointReader,
{
/// Creates a new overlay state provider factory with no reverts and no overlays
/// configured.
pub const fn new(factory: F) -> Self {
    Self {
        factory,
        block_number: None,
        trie_overlay: None,
        hashed_state_overlay: None,
    }
}

/// Sets the block number used for collecting reverts.
///
/// When present, all state is rewound to the point immediately _after_ this block
/// has been processed.
pub const fn with_block_number(mut self, block_number: Option<BlockNumber>) -> Self {
    self.block_number = block_number;
    self
}

/// Set the trie overlay
/// Set the trie overlay.
///
/// This overlay will be applied on top of any reverts applied via `with_block_number`.
pub fn with_trie_overlay(mut self, trie_overlay: Option<Arc<TrieUpdatesSorted>>) -> Self {
self.trie_overlay = trie_overlay;
self
}

/// Set the hashed state overlay
///
/// This overlay will be applied on top of any reverts applied via `with_block_number`.
pub fn with_hashed_state_overlay(
mut self,
hashed_state_overlay: Option<Arc<HashedPostStateSorted>>,
Expand All @@ -64,25 +73,47 @@ where
/// Validates that there are sufficient changesets to revert to the requested block number.
///
/// Returns an error if the `MerkleChangeSets` checkpoint doesn't cover the requested block.
/// Takes into account both the stage checkpoint and the prune checkpoint to determine the
/// available data range.
fn validate_changesets_availability(
&self,
provider: &F::Provider,
requested_block: BlockNumber,
) -> Result<(), ProviderError> {
// Get the MerkleChangeSets stage checkpoint - let errors propagate as-is
let checkpoint = provider.get_stage_checkpoint(StageId::MerkleChangeSets)?;

// If there's no checkpoint at all or block range details are missing, we can't revert
let available_range = checkpoint
.and_then(|chk| {
chk.merkle_changesets_stage_checkpoint()
.map(|stage_chk| stage_chk.block_range.from..=chk.block_number)
})
.ok_or_else(|| ProviderError::InsufficientChangesets {
requested: requested_block,
available: 0..=0,
// Get the MerkleChangeSets stage and prune checkpoints.
let stage_checkpoint = provider.get_stage_checkpoint(StageId::MerkleChangeSets)?;
let prune_checkpoint = provider.get_prune_checkpoint(PruneSegment::MerkleChangeSets)?;

// Get the upper bound from stage checkpoint
let upper_bound =
stage_checkpoint.as_ref().map(|chk| chk.block_number).ok_or_else(|| {
ProviderError::InsufficientChangesets {
requested: requested_block,
available: 0..=0,
}
})?;

// Extract a possible lower bound from stage checkpoint if available
let stage_lower_bound = stage_checkpoint.as_ref().and_then(|chk| {
chk.merkle_changesets_stage_checkpoint().map(|stage_chk| stage_chk.block_range.from)
});

// Extract a possible lower bound from prune checkpoint if available
// The prune checkpoint's block_number is the highest pruned block, so data is available
// starting from the next block
let prune_lower_bound =
prune_checkpoint.and_then(|chk| chk.block_number.map(|block| block + 1));

// Use the higher of the two lower bounds (or error if neither is available)
let Some(lower_bound) = stage_lower_bound.max(prune_lower_bound) else {
return Err(ProviderError::InsufficientChangesets {
requested: requested_block,
available: 0..=upper_bound,
})
};
Comment on lines +107 to +113
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see


let available_range = lower_bound..=upper_bound;

// Check if the requested block is within the available range
if !available_range.contains(&requested_block) {
return Err(ProviderError::InsufficientChangesets {
Expand All @@ -105,11 +136,13 @@ where
self.validate_changesets_availability(&provider, from_block)?;

// Collect trie reverts
let mut trie_updates_mut = provider.trie_reverts(from_block)?;
let mut trie_updates_mut = provider.trie_reverts(from_block + 1)?;

// Collect state reverts using HashedPostState::from_reverts
let reverted_state =
HashedPostState::from_reverts::<KeccakKeyHasher>(provider.tx_ref(), from_block..)?;
let reverted_state = HashedPostState::from_reverts::<KeccakKeyHasher>(
provider.tx_ref(),
from_block + 1..,
)?;
let mut hashed_state_mut = reverted_state.into_sorted();

// Extend with overlays if provided
Expand All @@ -121,6 +154,14 @@ where
hashed_state_mut.extend_ref(hashed_state_overlay);
}

debug!(
target: "providers::state::overlay",
?from_block,
num_trie_updates = ?trie_updates_mut.total_len(),
num_state_updates = ?hashed_state_mut.total_len(),
"Reverted to target block",
);

(Arc::new(trie_updates_mut), Arc::new(hashed_state_mut))
} else {
// If no block_number, use overlays directly or defaults
Expand Down
16 changes: 10 additions & 6 deletions crates/storage/provider/src/traits/full.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

use crate::{
AccountReader, BlockReader, BlockReaderIdExt, ChainSpecProvider, ChangeSetReader,
DatabaseProviderFactory, HashedPostStateProvider, StageCheckpointReader, StateProviderFactory,
StateReader, StaticFileProviderFactory, TrieReader,
DatabaseProviderFactory, HashedPostStateProvider, PruneCheckpointReader, StageCheckpointReader,
StateProviderFactory, StateReader, StaticFileProviderFactory, TrieReader,
};
use reth_chain_state::{CanonStateSubscriptions, ForkChoiceSubscriptions};
use reth_node_types::{BlockTy, HeaderTy, NodeTypesWithDB, ReceiptTy, TxTy};
Expand All @@ -12,8 +12,10 @@ use std::fmt::Debug;

/// Helper trait to unify all provider traits for simplicity.
pub trait FullProvider<N: NodeTypesWithDB>:
DatabaseProviderFactory<DB = N::DB, Provider: BlockReader + TrieReader>
+ NodePrimitivesProvider<Primitives = N::Primitives>
DatabaseProviderFactory<
DB = N::DB,
Provider: BlockReader + TrieReader + StageCheckpointReader + PruneCheckpointReader,
> + NodePrimitivesProvider<Primitives = N::Primitives>
+ StaticFileProviderFactory<Primitives = N::Primitives>
+ BlockReaderIdExt<
Transaction = TxTy<N>,
Expand All @@ -37,8 +39,10 @@ pub trait FullProvider<N: NodeTypesWithDB>:
}

impl<T, N: NodeTypesWithDB> FullProvider<N> for T where
T: DatabaseProviderFactory<DB = N::DB, Provider: BlockReader + TrieReader>
+ NodePrimitivesProvider<Primitives = N::Primitives>
T: DatabaseProviderFactory<
DB = N::DB,
Provider: BlockReader + TrieReader + StageCheckpointReader + PruneCheckpointReader,
> + NodePrimitivesProvider<Primitives = N::Primitives>
+ StaticFileProviderFactory<Primitives = N::Primitives>
+ BlockReaderIdExt<
Transaction = TxTy<N>,
Expand Down
17 changes: 17 additions & 0 deletions crates/trie/common/src/hashed_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,13 @@ impl HashedPostStateSorted {
&self.storages
}

/// Returns the total number of updates: changed accounts, destroyed accounts, and
/// every storage slot update across all storages.
pub fn total_len(&self) -> usize {
    // Account-level entries (updates plus destructions).
    let account_total = self.accounts.accounts.len() + self.accounts.destroyed_accounts.len();
    // Slot-level entries summed over every account's storage.
    let storage_total: usize = self.storages.values().map(|storage| storage.len()).sum();
    account_total + storage_total
}

/// Extends this state with contents of another sorted state.
/// Entries in `other` take precedence for duplicate keys.
pub fn extend_ref(&mut self, other: &Self) {
Expand Down Expand Up @@ -568,6 +575,16 @@ impl HashedStorageSorted {
.sorted_by_key(|entry| *entry.0)
}

/// Returns the total number of storage slot updates, counting both zero-valued and
/// non-zero-valued slots.
pub fn len(&self) -> usize {
    let zeroed = self.zero_valued_slots.len();
    let non_zeroed = self.non_zero_valued_slots.len();
    zeroed + non_zeroed
}

/// Returns `true` if there are no storage slot updates of either kind.
pub fn is_empty(&self) -> bool {
    // `len` is the sum of both slot collections, so a zero length means both are empty.
    self.len() == 0
}

/// Extends this storage with contents of another sorted storage.
/// Entries in `other` take precedence for duplicate keys.
pub fn extend_ref(&mut self, other: &Self) {
Expand Down
16 changes: 16 additions & 0 deletions crates/trie/common/src/updates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,12 @@ impl TrieUpdatesSorted {
&self.storage_tries
}

/// Returns the total number of updates: account trie nodes plus every storage trie
/// node update.
pub fn total_len(&self) -> usize {
    // Start from the account node count and accumulate each storage trie's updates.
    self.storage_tries
        .values()
        .fold(self.account_nodes.len(), |total, storage| total + storage.len())
}

/// Extends the trie updates with another set of sorted updates.
///
/// This merges the account nodes and storage tries from `other` into `self`.
Expand Down Expand Up @@ -535,6 +541,16 @@ impl StorageTrieUpdatesSorted {
&self.storage_nodes
}

/// Returns the total number of storage node updates recorded in this set.
///
/// NOTE(review): this counts `storage_nodes` entries only; a "deleted" marker on the
/// storage trie (see `extend_ref` below) is not reflected in the count — confirm
/// callers don't rely on it being included.
pub const fn len(&self) -> usize {
    self.storage_nodes.len()
}

/// Returns `true` if this set records no storage node updates.
pub const fn is_empty(&self) -> bool {
    // Delegates to the `const` `len` accessor; zero length means no node updates.
    self.len() == 0
}

/// Extends the storage trie updates with another set of sorted updates.
///
/// If `other` is marked as deleted, this will be marked as deleted and all nodes cleared.
Expand Down
Loading