diff --git a/crates/stages/stages/src/stages/utils.rs b/crates/stages/stages/src/stages/utils.rs index 89aca9244f3..4c07f4120d3 100644 --- a/crates/stages/stages/src/stages/utils.rs +++ b/crates/stages/stages/src/stages/utils.rs @@ -134,33 +134,32 @@ where // Convert range bounds to concrete range let range = to_range(range); + let start_block = range.start; // Use the new walker for lazy iteration over static file changesets let static_file_provider = provider.static_file_provider(); - // Get total count for progress reporting - let total_changesets = static_file_provider.account_changeset_count()?; - let interval = (total_changesets / 1000).max(1); - let walker = static_file_provider.walk_account_changeset_range(range); let mut flush_counter = 0; let mut current_block_number = u64::MAX; - for (idx, changeset_result) in walker.enumerate() { + for changeset_result in walker { let (block_number, AccountBeforeTx { address, .. }) = changeset_result?; cache.entry(address).or_default().push(block_number); - if idx > 0 && idx % interval == 0 && total_changesets > 1000 { - info!(target: "sync::stages::index_history", progress = %format!("{:.4}%", (idx as f64 / total_changesets as f64) * 100.0), "Collecting indices"); - } - if block_number != current_block_number { current_block_number = block_number; flush_counter += 1; } if flush_counter > DEFAULT_CACHE_THRESHOLD { + info!( + target: "sync::stages::index_history", + processed_blocks = current_block_number.saturating_sub(start_block) + 1, + current_block = current_block_number, + "Collecting indices" + ); collect_indices(cache.drain(), &mut insert_fn)?; flush_counter = 0; } @@ -192,30 +191,30 @@ where }; let range = to_range(range); + let start_block = range.start; let static_file_provider = provider.static_file_provider(); - let total_changesets = static_file_provider.storage_changeset_count()?; - let interval = (total_changesets / 1000).max(1); - let walker = static_file_provider.walk_storage_changeset_range(range); let mut flush_counter = 0; let mut current_block_number = u64::MAX; - for (idx, changeset_result) in walker.enumerate() { + for changeset_result in walker { let (BlockNumberAddress((block_number, address)), storage) = changeset_result?; cache.entry(AddressStorageKey((address, storage.key))).or_default().push(block_number); - if idx > 0 && idx % interval == 0 && total_changesets > 1000 { - info!(target: "sync::stages::index_history", progress = %format!("{:.4}%", (idx as f64 / total_changesets as f64) * 100.0), "Collecting indices"); - } - if block_number != current_block_number { current_block_number = block_number; flush_counter += 1; } if flush_counter > DEFAULT_CACHE_THRESHOLD { + info!( + target: "sync::stages::index_history", + processed_blocks = current_block_number.saturating_sub(start_block) + 1, + current_block = current_block_number, + "Collecting indices" + ); collect_indices(cache.drain(), &mut insert_fn)?; flush_counter = 0; } diff --git a/crates/storage/provider/src/providers/blockchain_provider.rs b/crates/storage/provider/src/providers/blockchain_provider.rs index 53b8affb1f1..2fa0ff4653e 100644 --- a/crates/storage/provider/src/providers/blockchain_provider.rs +++ b/crates/storage/provider/src/providers/blockchain_provider.rs @@ -729,10 +729,6 @@ impl StorageChangeSetReader for BlockchainProvider { ) -> ProviderResult> { self.consistent_provider()?.storage_changesets_range(range) } - - fn storage_changeset_count(&self) -> ProviderResult { - self.consistent_provider()?.storage_changeset_count() - } } impl ChangeSetReader for BlockchainProvider { @@ -757,10 +753,6 @@ impl ChangeSetReader for BlockchainProvider { ) -> ProviderResult> { self.consistent_provider()?.account_changesets_range(range) } - - fn account_changeset_count(&self) -> ProviderResult { - self.consistent_provider()?.account_changeset_count() - } } impl AccountReader for BlockchainProvider { diff --git a/crates/storage/provider/src/providers/consistent.rs b/crates/storage/provider/src/providers/consistent.rs index 059f1b64b7f..18e3327b1b0 100644 --- a/crates/storage/provider/src/providers/consistent.rs +++ b/crates/storage/provider/src/providers/consistent.rs @@ -1462,27 +1462,6 @@ impl StorageChangeSetReader for ConsistentProvider { Ok(changesets) } - - fn storage_changeset_count(&self) -> ProviderResult { - let mut count = 0; - if let Some(head_block) = &self.head_block { - for state in head_block.chain() { - count += state - .block_ref() - .execution_output - .state - .reverts - .iter() - .flatten() - .map(|(_, revert)| revert.storage.len()) - .sum::(); - } - } - - count += self.storage_provider.storage_changeset_count()?; - - Ok(count) - } } impl ChangeSetReader for ConsistentProvider { @@ -1631,21 +1610,6 @@ impl ChangeSetReader for ConsistentProvider { Ok(changesets) } - - fn account_changeset_count(&self) -> ProviderResult { - // Count changesets from in-memory state - let mut count = 0; - if let Some(head_block) = &self.head_block { - for state in head_block.chain() { - count += state.block_ref().execution_output.state.reverts.len(); - } - } - - // Add changesets from storage provider - count += self.storage_provider.account_changeset_count()?; - - Ok(count) - } } impl AccountReader for ConsistentProvider { diff --git a/crates/storage/provider/src/providers/database/provider.rs b/crates/storage/provider/src/providers/database/provider.rs index 1ed65f86432..08df81b27c1 100644 --- a/crates/storage/provider/src/providers/database/provider.rs +++ b/crates/storage/provider/src/providers/database/provider.rs @@ -1558,14 +1558,6 @@ impl StorageChangeSetReader for DatabaseProvider .collect() } } - - fn storage_changeset_count(&self) -> ProviderResult { - if self.cached_storage_settings().storage_v2 { - self.static_file_provider.storage_changeset_count() - } else { - Ok(self.tx.entries::()?) - } - } } impl ChangeSetReader for DatabaseProvider { @@ -1621,16 +1613,6 @@ impl ChangeSetReader for DatabaseProvider { .collect() } } - - fn account_changeset_count(&self) -> ProviderResult { - // check if account changesets are in static files, otherwise just count the changeset - // entries in the DB - if self.cached_storage_settings().storage_v2 { - self.static_file_provider.account_changeset_count() - } else { - Ok(self.tx.entries::()?) - } - } } impl HeaderSyncGapProvider diff --git a/crates/storage/provider/src/providers/static_file/manager.rs b/crates/storage/provider/src/providers/static_file/manager.rs index aa73403ee51..a76f7999848 100644 --- a/crates/storage/provider/src/providers/static_file/manager.rs +++ b/crates/storage/provider/src/providers/static_file/manager.rs @@ -39,8 +39,8 @@ use reth_primitives_traits::{ use reth_prune_types::PruneSegment; use reth_stages_types::PipelineTarget; use reth_static_file_types::{ - find_fixed_range, ChangesetOffsetReader, HighestStaticFiles, SegmentHeader, - SegmentRangeInclusive, StaticFileMap, StaticFileSegment, DEFAULT_BLOCKS_PER_STATIC_FILE, + find_fixed_range, HighestStaticFiles, SegmentHeader, SegmentRangeInclusive, StaticFileMap, + StaticFileSegment, DEFAULT_BLOCKS_PER_STATIC_FILE, }; use reth_storage_api::{ BlockBodyIndicesProvider, ChangeSetReader, DBProvider, PruneCheckpointReader, @@ -2499,31 +2499,6 @@ impl ChangeSetReader for StaticFileProvider { let range = self.bound_range(range, StaticFileSegment::AccountChangeSets); self.walk_account_changeset_range(range).collect() } - - fn account_changeset_count(&self) -> ProviderResult { - let mut count = 0; - - let static_files = iter_static_files(&self.path).map_err(ProviderError::other)?; - if let Some(changeset_segments) = static_files.get(StaticFileSegment::AccountChangeSets) { - for (block_range, header) in changeset_segments { - let csoff_path = self - .path - .join(StaticFileSegment::AccountChangeSets.filename(block_range)) - .with_extension("csoff"); - if csoff_path.exists() { - let len = header.changeset_offsets_len(); - let mut reader = ChangesetOffsetReader::new(&csoff_path, len) - .map_err(ProviderError::other)?; - let offsets = reader.get_range(0, len).map_err(ProviderError::other)?; - for offset in offsets { - count += offset.num_changes() as usize; - } - } - } - } - - Ok(count) - } } impl StorageChangeSetReader for StaticFileProvider { @@ -2625,31 +2600,6 @@ impl StorageChangeSetReader for StaticFileProvider { let range = self.bound_range(range, StaticFileSegment::StorageChangeSets); self.walk_storage_changeset_range(range).collect() } - - fn storage_changeset_count(&self) -> ProviderResult { - let mut count = 0; - - let static_files = iter_static_files(&self.path).map_err(ProviderError::other)?; - if let Some(changeset_segments) = static_files.get(StaticFileSegment::StorageChangeSets) { - for (block_range, header) in changeset_segments { - let csoff_path = self - .path - .join(StaticFileSegment::StorageChangeSets.filename(block_range)) - .with_extension("csoff"); - if csoff_path.exists() { - let len = header.changeset_offsets_len(); - let mut reader = ChangesetOffsetReader::new(&csoff_path, len) - .map_err(ProviderError::other)?; - let offsets = reader.get_range(0, len).map_err(ProviderError::other)?; - for offset in offsets { - count += offset.num_changes() as usize; - } - } - } - } - - Ok(count) - } } impl StaticFileProvider { diff --git a/crates/storage/provider/src/test_utils/mock.rs b/crates/storage/provider/src/test_utils/mock.rs index c8c6e80e217..8f2fd93c5e2 100644 --- a/crates/storage/provider/src/test_utils/mock.rs +++ b/crates/storage/provider/src/test_utils/mock.rs @@ -1009,10 +1009,6 @@ impl ChangeSetReader for MockEthProvi ) -> ProviderResult> { Ok(Vec::default()) } - - fn account_changeset_count(&self) -> ProviderResult { - Ok(0) - } } impl StorageChangeSetReader @@ -1040,10 +1036,6 @@ impl StorageChangeSetReader ) -> ProviderResult> { Ok(Vec::default()) } - - fn storage_changeset_count(&self) -> ProviderResult { - Ok(0) - } } impl StateReader for MockEthProvider { diff --git a/crates/storage/rpc-provider/src/lib.rs b/crates/storage/rpc-provider/src/lib.rs index 81b3a042432..e8b5b54321c 100644 --- a/crates/storage/rpc-provider/src/lib.rs +++ b/crates/storage/rpc-provider/src/lib.rs @@ -1758,10 +1758,6 @@ where ) -> ProviderResult> { Err(ProviderError::UnsupportedProvider) } - - fn account_changeset_count(&self) -> ProviderResult { - Err(ProviderError::UnsupportedProvider) - } } impl StateProviderFactory for RpcBlockchainStateProvider diff --git a/crates/storage/storage-api/src/account.rs b/crates/storage/storage-api/src/account.rs index 36471fd1a00..38fe6454aa6 100644 --- a/crates/storage/storage-api/src/account.rs +++ b/crates/storage/storage-api/src/account.rs @@ -78,9 +78,4 @@ pub trait ChangeSetReader { &self, range: impl RangeBounds, ) -> ProviderResult>; - - /// Get the total count of all account changes. - /// - /// Returns the total number of account changes across all blocks. - fn account_changeset_count(&self) -> ProviderResult; } diff --git a/crates/storage/storage-api/src/noop.rs b/crates/storage/storage-api/src/noop.rs index ee51b2458bc..8f5b3e340dd 100644 --- a/crates/storage/storage-api/src/noop.rs +++ b/crates/storage/storage-api/src/noop.rs @@ -402,10 +402,6 @@ impl ChangeSetReader for NoopProvider { ) -> ProviderResult> { Ok(Vec::default()) } - - fn account_changeset_count(&self) -> ProviderResult { - Ok(0) - } } #[cfg(feature = "db-api")] @@ -436,10 +432,6 @@ impl StorageChangeSetReader for NoopProvider< > { Ok(Vec::default()) } - - fn storage_changeset_count(&self) -> ProviderResult { - Ok(0) - } } impl StateRootProvider for NoopProvider { diff --git a/crates/storage/storage-api/src/storage.rs b/crates/storage/storage-api/src/storage.rs index 30ed2671779..ebb244c9c53 100644 --- a/crates/storage/storage-api/src/storage.rs +++ b/crates/storage/storage-api/src/storage.rs @@ -58,9 +58,6 @@ pub trait StorageChangeSetReader: Send { range: impl core::ops::RangeBounds, ) -> ProviderResult>; - /// Get the total count of all storage changes. - fn storage_changeset_count(&self) -> ProviderResult; - /// Get storage changesets for a block as static-file rows. /// /// Default implementation uses `storage_changeset` and maps to `StorageBeforeTx`.