Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 16 additions & 17 deletions crates/stages/stages/src/stages/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,33 +134,32 @@ where

// Convert range bounds to concrete range
let range = to_range(range);
let start_block = range.start;

// Use the new walker for lazy iteration over static file changesets
let static_file_provider = provider.static_file_provider();

// Get total count for progress reporting
let total_changesets = static_file_provider.account_changeset_count()?;
let interval = (total_changesets / 1000).max(1);

let walker = static_file_provider.walk_account_changeset_range(range);

let mut flush_counter = 0;
let mut current_block_number = u64::MAX;

for (idx, changeset_result) in walker.enumerate() {
for changeset_result in walker {
let (block_number, AccountBeforeTx { address, .. }) = changeset_result?;
cache.entry(address).or_default().push(block_number);

if idx > 0 && idx % interval == 0 && total_changesets > 1000 {
info!(target: "sync::stages::index_history", progress = %format!("{:.4}%", (idx as f64 / total_changesets as f64) * 100.0), "Collecting indices");
}

if block_number != current_block_number {
current_block_number = block_number;
flush_counter += 1;
}

if flush_counter > DEFAULT_CACHE_THRESHOLD {
info!(
target: "sync::stages::index_history",
processed_blocks = current_block_number.saturating_sub(start_block) + 1,
current_block = current_block_number,
"Collecting indices"
);
collect_indices(cache.drain(), &mut insert_fn)?;
flush_counter = 0;
}
Expand Down Expand Up @@ -192,30 +191,30 @@ where
};

let range = to_range(range);
let start_block = range.start;
let static_file_provider = provider.static_file_provider();

let total_changesets = static_file_provider.storage_changeset_count()?;
let interval = (total_changesets / 1000).max(1);

let walker = static_file_provider.walk_storage_changeset_range(range);

let mut flush_counter = 0;
let mut current_block_number = u64::MAX;

for (idx, changeset_result) in walker.enumerate() {
for changeset_result in walker {
let (BlockNumberAddress((block_number, address)), storage) = changeset_result?;
cache.entry(AddressStorageKey((address, storage.key))).or_default().push(block_number);

if idx > 0 && idx % interval == 0 && total_changesets > 1000 {
info!(target: "sync::stages::index_history", progress = %format!("{:.4}%", (idx as f64 / total_changesets as f64) * 100.0), "Collecting indices");
}

if block_number != current_block_number {
current_block_number = block_number;
flush_counter += 1;
}

if flush_counter > DEFAULT_CACHE_THRESHOLD {
info!(
target: "sync::stages::index_history",
processed_blocks = current_block_number.saturating_sub(start_block) + 1,
current_block = current_block_number,
"Collecting indices"
);
collect_indices(cache.drain(), &mut insert_fn)?;
flush_counter = 0;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -729,10 +729,6 @@ impl<N: ProviderNodeTypes> StorageChangeSetReader for BlockchainProvider<N> {
) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
self.consistent_provider()?.storage_changesets_range(range)
}

fn storage_changeset_count(&self) -> ProviderResult<usize> {
self.consistent_provider()?.storage_changeset_count()
}
}

impl<N: ProviderNodeTypes> ChangeSetReader for BlockchainProvider<N> {
Expand All @@ -757,10 +753,6 @@ impl<N: ProviderNodeTypes> ChangeSetReader for BlockchainProvider<N> {
) -> ProviderResult<Vec<(BlockNumber, AccountBeforeTx)>> {
self.consistent_provider()?.account_changesets_range(range)
}

fn account_changeset_count(&self) -> ProviderResult<usize> {
self.consistent_provider()?.account_changeset_count()
}
}

impl<N: ProviderNodeTypes> AccountReader for BlockchainProvider<N> {
Expand Down
36 changes: 0 additions & 36 deletions crates/storage/provider/src/providers/consistent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1462,27 +1462,6 @@ impl<N: ProviderNodeTypes> StorageChangeSetReader for ConsistentProvider<N> {

Ok(changesets)
}

fn storage_changeset_count(&self) -> ProviderResult<usize> {
let mut count = 0;
if let Some(head_block) = &self.head_block {
for state in head_block.chain() {
count += state
.block_ref()
.execution_output
.state
.reverts
.iter()
.flatten()
.map(|(_, revert)| revert.storage.len())
.sum::<usize>();
}
}

count += self.storage_provider.storage_changeset_count()?;

Ok(count)
}
}

impl<N: ProviderNodeTypes> ChangeSetReader for ConsistentProvider<N> {
Expand Down Expand Up @@ -1631,21 +1610,6 @@ impl<N: ProviderNodeTypes> ChangeSetReader for ConsistentProvider<N> {

Ok(changesets)
}

fn account_changeset_count(&self) -> ProviderResult<usize> {
// Count changesets from in-memory state
let mut count = 0;
if let Some(head_block) = &self.head_block {
for state in head_block.chain() {
count += state.block_ref().execution_output.state.reverts.len();
}
}

// Add changesets from storage provider
count += self.storage_provider.account_changeset_count()?;

Ok(count)
}
}

impl<N: ProviderNodeTypes> AccountReader for ConsistentProvider<N> {
Expand Down
18 changes: 0 additions & 18 deletions crates/storage/provider/src/providers/database/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1558,14 +1558,6 @@ impl<TX: DbTx, N: NodeTypes> StorageChangeSetReader for DatabaseProvider<TX, N>
.collect()
}
}

fn storage_changeset_count(&self) -> ProviderResult<usize> {
if self.cached_storage_settings().storage_v2 {
self.static_file_provider.storage_changeset_count()
} else {
Ok(self.tx.entries::<tables::StorageChangeSets>()?)
}
}
}

impl<TX: DbTx, N: NodeTypes> ChangeSetReader for DatabaseProvider<TX, N> {
Expand Down Expand Up @@ -1621,16 +1613,6 @@ impl<TX: DbTx, N: NodeTypes> ChangeSetReader for DatabaseProvider<TX, N> {
.collect()
}
}

fn account_changeset_count(&self) -> ProviderResult<usize> {
// check if account changesets are in static files, otherwise just count the changeset
// entries in the DB
if self.cached_storage_settings().storage_v2 {
self.static_file_provider.account_changeset_count()
} else {
Ok(self.tx.entries::<tables::AccountChangeSets>()?)
}
}
}

impl<TX: DbTx + 'static, N: NodeTypesForProvider> HeaderSyncGapProvider
Expand Down
54 changes: 2 additions & 52 deletions crates/storage/provider/src/providers/static_file/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ use reth_primitives_traits::{
use reth_prune_types::PruneSegment;
use reth_stages_types::PipelineTarget;
use reth_static_file_types::{
find_fixed_range, ChangesetOffsetReader, HighestStaticFiles, SegmentHeader,
SegmentRangeInclusive, StaticFileMap, StaticFileSegment, DEFAULT_BLOCKS_PER_STATIC_FILE,
find_fixed_range, HighestStaticFiles, SegmentHeader, SegmentRangeInclusive, StaticFileMap,
StaticFileSegment, DEFAULT_BLOCKS_PER_STATIC_FILE,
};
use reth_storage_api::{
BlockBodyIndicesProvider, ChangeSetReader, DBProvider, PruneCheckpointReader,
Expand Down Expand Up @@ -2499,31 +2499,6 @@ impl<N: NodePrimitives> ChangeSetReader for StaticFileProvider<N> {
let range = self.bound_range(range, StaticFileSegment::AccountChangeSets);
self.walk_account_changeset_range(range).collect()
}

fn account_changeset_count(&self) -> ProviderResult<usize> {
let mut count = 0;

let static_files = iter_static_files(&self.path).map_err(ProviderError::other)?;
if let Some(changeset_segments) = static_files.get(StaticFileSegment::AccountChangeSets) {
for (block_range, header) in changeset_segments {
let csoff_path = self
.path
.join(StaticFileSegment::AccountChangeSets.filename(block_range))
.with_extension("csoff");
if csoff_path.exists() {
let len = header.changeset_offsets_len();
let mut reader = ChangesetOffsetReader::new(&csoff_path, len)
.map_err(ProviderError::other)?;
let offsets = reader.get_range(0, len).map_err(ProviderError::other)?;
for offset in offsets {
count += offset.num_changes() as usize;
}
}
}
}

Ok(count)
}
}

impl<N: NodePrimitives> StorageChangeSetReader for StaticFileProvider<N> {
Expand Down Expand Up @@ -2625,31 +2600,6 @@ impl<N: NodePrimitives> StorageChangeSetReader for StaticFileProvider<N> {
let range = self.bound_range(range, StaticFileSegment::StorageChangeSets);
self.walk_storage_changeset_range(range).collect()
}

fn storage_changeset_count(&self) -> ProviderResult<usize> {
let mut count = 0;

let static_files = iter_static_files(&self.path).map_err(ProviderError::other)?;
if let Some(changeset_segments) = static_files.get(StaticFileSegment::StorageChangeSets) {
for (block_range, header) in changeset_segments {
let csoff_path = self
.path
.join(StaticFileSegment::StorageChangeSets.filename(block_range))
.with_extension("csoff");
if csoff_path.exists() {
let len = header.changeset_offsets_len();
let mut reader = ChangesetOffsetReader::new(&csoff_path, len)
.map_err(ProviderError::other)?;
let offsets = reader.get_range(0, len).map_err(ProviderError::other)?;
for offset in offsets {
count += offset.num_changes() as usize;
}
}
}
}

Ok(count)
}
}

impl<N: NodePrimitives> StaticFileProvider<N> {
Expand Down
8 changes: 0 additions & 8 deletions crates/storage/provider/src/test_utils/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1009,10 +1009,6 @@ impl<T: NodePrimitives, ChainSpec: Send + Sync> ChangeSetReader for MockEthProvi
) -> ProviderResult<Vec<(BlockNumber, AccountBeforeTx)>> {
Ok(Vec::default())
}

fn account_changeset_count(&self) -> ProviderResult<usize> {
Ok(0)
}
}

impl<T: NodePrimitives, ChainSpec: Send + Sync> StorageChangeSetReader
Expand Down Expand Up @@ -1040,10 +1036,6 @@ impl<T: NodePrimitives, ChainSpec: Send + Sync> StorageChangeSetReader
) -> ProviderResult<Vec<(reth_db_api::models::BlockNumberAddress, StorageEntry)>> {
Ok(Vec::default())
}

fn storage_changeset_count(&self) -> ProviderResult<usize> {
Ok(0)
}
}

impl<T: NodePrimitives, ChainSpec: Send + Sync> StateReader for MockEthProvider<T, ChainSpec> {
Expand Down
4 changes: 0 additions & 4 deletions crates/storage/rpc-provider/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1758,10 +1758,6 @@ where
) -> ProviderResult<Vec<(BlockNumber, reth_db_api::models::AccountBeforeTx)>> {
Err(ProviderError::UnsupportedProvider)
}

fn account_changeset_count(&self) -> ProviderResult<usize> {
Err(ProviderError::UnsupportedProvider)
}
}

impl<P, Node, N> StateProviderFactory for RpcBlockchainStateProvider<P, Node, N>
Expand Down
5 changes: 0 additions & 5 deletions crates/storage/storage-api/src/account.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,4 @@ pub trait ChangeSetReader {
&self,
range: impl RangeBounds<BlockNumber>,
) -> ProviderResult<Vec<(BlockNumber, AccountBeforeTx)>>;

/// Get the total count of all account changes.
///
/// Returns the total number of account changes across all blocks.
fn account_changeset_count(&self) -> ProviderResult<usize>;
}
8 changes: 0 additions & 8 deletions crates/storage/storage-api/src/noop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -402,10 +402,6 @@ impl<C: Send + Sync, N: NodePrimitives> ChangeSetReader for NoopProvider<C, N> {
) -> ProviderResult<Vec<(BlockNumber, AccountBeforeTx)>> {
Ok(Vec::default())
}

fn account_changeset_count(&self) -> ProviderResult<usize> {
Ok(0)
}
}

#[cfg(feature = "db-api")]
Expand Down Expand Up @@ -436,10 +432,6 @@ impl<C: Send + Sync, N: NodePrimitives> StorageChangeSetReader for NoopProvider<
> {
Ok(Vec::default())
}

fn storage_changeset_count(&self) -> ProviderResult<usize> {
Ok(0)
}
}

impl<C: Send + Sync, N: NodePrimitives> StateRootProvider for NoopProvider<C, N> {
Expand Down
3 changes: 0 additions & 3 deletions crates/storage/storage-api/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,6 @@ pub trait StorageChangeSetReader: Send {
range: impl core::ops::RangeBounds<BlockNumber>,
) -> ProviderResult<Vec<(reth_db_api::models::BlockNumberAddress, StorageEntry)>>;

/// Get the total count of all storage changes.
fn storage_changeset_count(&self) -> ProviderResult<usize>;

/// Get storage changesets for a block as static-file rows.
///
/// Default implementation uses `storage_changeset` and maps to `StorageBeforeTx`.
Expand Down
Loading