From 08419abdb4153a66fccc5aa5b17a407c39ec1a4f Mon Sep 17 00:00:00 2001 From: YK <> Date: Tue, 31 Mar 2026 04:24:01 +0000 Subject: [PATCH 1/4] perf(stages): avoid v2 history count pre-pass Co-authored-by: YK <46377366+yongkangc@users.noreply.github.com> Amp-Thread-ID: https://ampcode.com/threads/T-019d420d-4530-756b-856b-e3c3b0d8522b --- .../src/stages/index_storage_history.rs | 195 +++++++++++++++++- crates/stages/stages/src/stages/utils.rs | 78 +++++-- 2 files changed, 250 insertions(+), 23 deletions(-) diff --git a/crates/stages/stages/src/stages/index_storage_history.rs b/crates/stages/stages/src/stages/index_storage_history.rs index 182bc1c96f5..e16b8ae1d2a 100644 --- a/crates/stages/stages/src/stages/index_storage_history.rs +++ b/crates/stages/stages/src/stages/index_storage_history.rs @@ -691,10 +691,19 @@ mod tests { mod rocksdb_tests { use super::*; - use reth_db_api::models::StorageBeforeTx; - use reth_provider::{providers::StaticFileWriter, RocksDBProviderFactory}; + use reth_chainspec::MAINNET; + use reth_db::test_utils::{ + create_test_rocksdb_dir, create_test_rw_db, create_test_static_files_dir, + }; + use reth_db_api::models::{AccountBeforeTx, StorageBeforeTx}; + use reth_provider::{ + providers::{RocksDBProvider, StaticFileProviderBuilder, StaticFileWriter}, + test_utils::MockNodeTypesWithDB, + ProviderFactory, RocksDBProviderFactory, + }; use reth_static_file_types::StaticFileSegment; - use reth_storage_api::StorageSettings; + use reth_storage_api::{ChangeSetReader, StorageChangeSetReader, StorageSettings}; + use std::time::Instant; /// Sets up v2 storage test data: writes block body indices to MDBX and /// storage changesets to static files (matching realistic v2 layout). @@ -730,6 +739,186 @@ mod tests { writer.commit().unwrap(); } + fn setup_v2_history_repro_data( + db: &TestStageDB, + block_range: std::ops::RangeInclusive, + ) { + db.factory.set_storage_settings_cache(StorageSettings::v2()); + + db.commit(|tx| { + for block in block_range.clone() { + tx.put::( + block, + StoredBlockBodyIndices { tx_count: 1, ..Default::default() }, + )?; + } + Ok(()) + }) + .unwrap(); + + let static_file_provider = db.factory.static_file_provider(); + let mut account_writer = + static_file_provider.latest_writer(StaticFileSegment::AccountChangeSets).unwrap(); + for block in block_range.clone() { + account_writer + .append_account_changeset( + vec![AccountBeforeTx { address: ADDRESS, info: None }], + block, + ) + .unwrap(); + } + account_writer.commit().unwrap(); + + let mut storage_writer = + static_file_provider.latest_writer(StaticFileSegment::StorageChangeSets).unwrap(); + for block in block_range { + storage_writer + .append_storage_changeset( + vec![StorageBeforeTx { + address: ADDRESS, + key: STORAGE_KEY, + value: U256::ZERO, + }], + block, + ) + .unwrap(); + } + storage_writer.commit().unwrap(); + } + + fn repro_test_db(blocks_per_file: u64) -> TestStageDB { + let (temp_static_files_dir, static_dir_path) = create_test_static_files_dir(); + let (temp_rocksdb_dir, rocksdb_dir_path) = create_test_rocksdb_dir(); + let static_file_provider = StaticFileProviderBuilder::read_write(&static_dir_path) + .with_blocks_per_file_for_segment( + StaticFileSegment::AccountChangeSets, + blocks_per_file, + ) + .with_blocks_per_file_for_segment( + StaticFileSegment::StorageChangeSets, + blocks_per_file, + ) + .build() + .unwrap(); + + TestStageDB { + temp_static_files_dir, + temp_rocksdb_dir, + factory: ProviderFactory::::new( + create_test_rw_db(), + MAINNET.clone(), + static_file_provider, + RocksDBProvider::builder(rocksdb_dir_path) + .with_default_tables() + .build() + .unwrap(), + reth_tasks::Runtime::test(), + ) + .unwrap(), + } + } + + #[tokio::test] + #[ignore = "manual repro for storage_v2 history collect bottleneck"] + async fn repro_collect_bottleneck_breakdown() { + let blocks = std::env::var("RETH_HISTORY_REPRO_BLOCKS") + .ok() + .and_then(|value| value.parse::().ok()) + .unwrap_or(200_000); + let blocks_per_file = std::env::var("RETH_HISTORY_REPRO_BLOCKS_PER_FILE") + .ok() + .and_then(|value| value.parse::().ok()) + .unwrap_or(1_000); + let tail_blocks = 36; + + assert!(blocks > tail_blocks, "repro needs more history than the tail window"); + + let db = repro_test_db(blocks_per_file); + setup_v2_history_repro_data(&db, 0..=blocks - 1); + + let static_file_provider = db.factory.static_file_provider(); + static_file_provider.initialize_index().unwrap(); + + let tail_start = blocks - tail_blocks; + let tail_end = blocks - 1; + + let started = Instant::now(); + let account_changesets = static_file_provider.account_changeset_count().unwrap(); + let account_count_time = started.elapsed(); + + let started = Instant::now(); + let storage_changesets = static_file_provider.storage_changeset_count().unwrap(); + let storage_count_time = started.elapsed(); + + let started = Instant::now(); + let walked_account = static_file_provider + .walk_account_changeset_range(tail_start..=tail_end) + .collect::, _>>() + .unwrap(); + let account_walk_time = started.elapsed(); + + let started = Instant::now(); + let walked_storage = static_file_provider + .walk_storage_changeset_range(tail_start..=tail_end) + .collect::, _>>() + .unwrap(); + let storage_walk_time = started.elapsed(); + + let mut storage_stage = IndexStorageHistoryStage::default(); + let provider = db.factory.database_provider_rw().unwrap(); + let started = Instant::now(); + let storage_out = storage_stage + .execute( + &provider, + ExecInput { + target: Some(tail_end), + checkpoint: Some(StageCheckpoint::new(tail_start - 1)), + }, + ) + .unwrap(); + let storage_stage_time = started.elapsed(); + + let mut account_stage = crate::stages::IndexAccountHistoryStage::default(); + let started = Instant::now(); + let account_out = account_stage + .execute( + &provider, + ExecInput { + target: Some(tail_end), + checkpoint: Some(StageCheckpoint::new(tail_start - 1)), + }, + ) + .unwrap(); + let account_stage_time = started.elapsed(); + + let account_probe_time = account_count_time + account_walk_time; + let storage_probe_time = storage_count_time + storage_walk_time; + + eprintln!( + "storage_v2 history repro: blocks={blocks} blocks_per_file={blocks_per_file} tail_blocks={tail_blocks} \ +account_changesets={account_changesets} storage_changesets={storage_changesets} \ +account_count={account_count_time:?} storage_count={storage_count_time:?} \ +account_walk={account_walk_time:?} storage_walk={storage_walk_time:?} \ +account_probe={account_probe_time:?} storage_probe={storage_probe_time:?} \ +account_stage={account_stage_time:?} storage_stage={storage_stage_time:?} \ +account_stage_minus_probe={:?} storage_stage_minus_probe={:?}" + , + account_stage_time.saturating_sub(account_probe_time), + storage_stage_time.saturating_sub(storage_probe_time), + ); + + assert_eq!(walked_account.len(), tail_blocks as usize); + assert_eq!(walked_storage.len(), tail_blocks as usize); + assert_eq!( + storage_out, + ExecOutput { checkpoint: StageCheckpoint::new(tail_end), done: true } + ); + assert_eq!( + account_out, + ExecOutput { checkpoint: StageCheckpoint::new(tail_end), done: true } + ); + } + /// Test that when `storages_history_in_rocksdb` is enabled, the stage /// writes storage history indices to `RocksDB` instead of MDBX. #[tokio::test] diff --git a/crates/stages/stages/src/stages/utils.rs b/crates/stages/stages/src/stages/utils.rs index 89aca9244f3..0fbfc6f4c8a 100644 --- a/crates/stages/stages/src/stages/utils.rs +++ b/crates/stages/stages/src/stages/utils.rs @@ -20,7 +20,11 @@ use reth_provider::{ use reth_stages_api::StageError; use reth_static_file_types::StaticFileSegment; use reth_storage_api::{ChangeSetReader, StorageChangeSetReader}; -use std::{collections::HashMap, hash::Hash, ops::RangeBounds}; +use std::{ + collections::HashMap, + hash::Hash, + ops::{Range, RangeBounds}, +}; use tracing::info; /// Number of blocks before pushing indices from cache to [`Collector`] @@ -113,6 +117,49 @@ where Ok(()) } +/// Progress logger for a requested block range. +struct BlockRangeProgress { + start_block: BlockNumber, + total_blocks: u64, + progress_interval: u64, + next_progress_block: BlockNumber, +} + +impl BlockRangeProgress { + fn new(range: &Range) -> Option { + let start_block = range.start; + let total_blocks = range.end.saturating_sub(start_block); + + // Static-file history collection only needs the requested block range. Using block-range + // progress avoids scanning every `.csoff` record just to format percentage logs. + (total_blocks > 1000).then_some(Self { + start_block, + total_blocks, + progress_interval: (total_blocks / 1000).max(1), + next_progress_block: start_block.saturating_add((total_blocks / 1000).max(1)), + }) + } + + fn maybe_log(&mut self, block_number: BlockNumber) { + if block_number < self.next_progress_block { + return; + } + + let progressed_blocks = block_number.saturating_sub(self.start_block).saturating_add(1); + info!( + target: "sync::stages::index_history", + progress = %format!("{:.4}%", (progressed_blocks as f64 / self.total_blocks as f64) * 100.0), + "Collecting indices" + ); + + let next_interval = progressed_blocks + .saturating_div(self.progress_interval) + .saturating_add(1) + .saturating_mul(self.progress_interval); + self.next_progress_block = self.start_block.saturating_add(next_interval); + } +} + /// Collects account history indices using a provider that implements `ChangeSetReader`. pub(crate) fn collect_account_history_indices( provider: &Provider, @@ -137,27 +184,22 @@ where // Use the new walker for lazy iteration over static file changesets let static_file_provider = provider.static_file_provider(); - - // Get total count for progress reporting - let total_changesets = static_file_provider.account_changeset_count()?; - let interval = (total_changesets / 1000).max(1); - + let mut progress = BlockRangeProgress::new(&range); let walker = static_file_provider.walk_account_changeset_range(range); let mut flush_counter = 0; let mut current_block_number = u64::MAX; - for (idx, changeset_result) in walker.enumerate() { + for changeset_result in walker { let (block_number, AccountBeforeTx { address, .. }) = changeset_result?; cache.entry(address).or_default().push(block_number); - if idx > 0 && idx % interval == 0 && total_changesets > 1000 { - info!(target: "sync::stages::index_history", progress = %format!("{:.4}%", (idx as f64 / total_changesets as f64) * 100.0), "Collecting indices"); - } - if block_number != current_block_number { current_block_number = block_number; flush_counter += 1; + if let Some(progress) = progress.as_mut() { + progress.maybe_log(block_number); + } } if flush_counter > DEFAULT_CACHE_THRESHOLD { @@ -193,26 +235,22 @@ where let range = to_range(range); let static_file_provider = provider.static_file_provider(); - - let total_changesets = static_file_provider.storage_changeset_count()?; - let interval = (total_changesets / 1000).max(1); - + let mut progress = BlockRangeProgress::new(&range); let walker = static_file_provider.walk_storage_changeset_range(range); let mut flush_counter = 0; let mut current_block_number = u64::MAX; - for (idx, changeset_result) in walker.enumerate() { + for changeset_result in walker { let (BlockNumberAddress((block_number, address)), storage) = changeset_result?; cache.entry(AddressStorageKey((address, storage.key))).or_default().push(block_number); - if idx > 0 && idx % interval == 0 && total_changesets > 1000 { - info!(target: "sync::stages::index_history", progress = %format!("{:.4}%", (idx as f64 / total_changesets as f64) * 100.0), "Collecting indices"); - } - if block_number != current_block_number { current_block_number = block_number; flush_counter += 1; + if let Some(progress) = progress.as_mut() { + progress.maybe_log(block_number); + } } if flush_counter > DEFAULT_CACHE_THRESHOLD { From c75f5439e6a85697de6eab59279573c2a16d5a3b Mon Sep 17 00:00:00 2001 From: YK <> Date: Tue, 31 Mar 2026 04:29:52 +0000 Subject: [PATCH 2/4] perf(provider): speed up static-file changeset counts Co-authored-by: YK <46377366+yongkangc@users.noreply.github.com> Amp-Thread-ID: https://ampcode.com/threads/T-019d420d-4530-756b-856b-e3c3b0d8522b --- .../src/providers/static_file/manager.rs | 22 ++++--- .../provider/src/providers/static_file/mod.rs | 64 +++++++++++++++++++ 2 files changed, 76 insertions(+), 10 deletions(-) diff --git a/crates/storage/provider/src/providers/static_file/manager.rs b/crates/storage/provider/src/providers/static_file/manager.rs index aa73403ee51..672912a0b15 100644 --- a/crates/storage/provider/src/providers/static_file/manager.rs +++ b/crates/storage/provider/src/providers/static_file/manager.rs @@ -2512,11 +2512,12 @@ impl ChangeSetReader for StaticFileProvider { .with_extension("csoff"); if csoff_path.exists() { let len = header.changeset_offsets_len(); - let mut reader = ChangesetOffsetReader::new(&csoff_path, len) - .map_err(ProviderError::other)?; - let offsets = reader.get_range(0, len).map_err(ProviderError::other)?; - for offset in offsets { - count += offset.num_changes() as usize; + if len > 0 { + let mut reader = ChangesetOffsetReader::new(&csoff_path, len) + .map_err(ProviderError::other)?; + if let Some(offset) = reader.get(len - 1).map_err(ProviderError::other)? { + count += (offset.offset() + offset.num_changes()) as usize; + } } } } @@ -2638,11 +2639,12 @@ impl StorageChangeSetReader for StaticFileProvider { .with_extension("csoff"); if csoff_path.exists() { let len = header.changeset_offsets_len(); - let mut reader = ChangesetOffsetReader::new(&csoff_path, len) - .map_err(ProviderError::other)?; - let offsets = reader.get_range(0, len).map_err(ProviderError::other)?; - for offset in offsets { - count += offset.num_changes() as usize; + if len > 0 { + let mut reader = ChangesetOffsetReader::new(&csoff_path, len) + .map_err(ProviderError::other)?; + if let Some(offset) = reader.get(len - 1).map_err(ProviderError::other)? { + count += (offset.offset() + offset.num_changes()) as usize; + } } } } diff --git a/crates/storage/provider/src/providers/static_file/mod.rs b/crates/storage/provider/src/providers/static_file/mod.rs index 50cd204df20..8d31b1f4218 100644 --- a/crates/storage/provider/src/providers/static_file/mod.rs +++ b/crates/storage/provider/src/providers/static_file/mod.rs @@ -760,6 +760,36 @@ mod tests { } } + #[test] + fn test_account_changeset_count() { + let (static_dir, _) = create_test_static_files_dir(); + + let sf_rw: StaticFileProvider = + StaticFileProviderBuilder::read_write(&static_dir) + .with_blocks_per_file(3) + .build() + .unwrap(); + + let changes_per_block = [2usize, 0, 3, 1, 4, 0]; + let expected_total = changes_per_block.iter().sum::(); + + let mut writer = sf_rw.latest_writer(StaticFileSegment::AccountChangeSets).unwrap(); + for (block_num, changes) in changes_per_block.into_iter().enumerate() { + let changeset = (0..changes) + .map(|index| { + let mut address = Address::ZERO; + address.0[0] = block_num as u8; + address.0[1] = index as u8; + AccountBeforeTx { address, info: None } + }) + .collect::>(); + writer.append_account_changeset(changeset, block_num as u64).unwrap(); + } + writer.commit().unwrap(); + + assert_eq!(sf_rw.account_changeset_count().unwrap(), expected_total); + } + #[test] fn test_get_account_before_block() { let (static_dir, _) = create_test_static_files_dir(); @@ -1106,6 +1136,40 @@ mod tests { } } + #[test] + fn test_storage_changeset_count() { + let (static_dir, _) = create_test_static_files_dir(); + + let sf_rw: StaticFileProvider = + StaticFileProviderBuilder::read_write(&static_dir) + .with_blocks_per_file(3) + .build() + .unwrap(); + + let changes_per_block = [2usize, 0, 3, 1, 4, 0]; + let expected_total = changes_per_block.iter().sum::(); + + let mut writer = sf_rw.latest_writer(StaticFileSegment::StorageChangeSets).unwrap(); + for (block_num, changes) in changes_per_block.into_iter().enumerate() { + let changeset = (0..changes) + .map(|index| { + let mut address = Address::ZERO; + address.0[0] = block_num as u8; + address.0[1] = index as u8; + StorageBeforeTx { + address, + key: B256::with_last_byte(index as u8), + value: U256::from((block_num * 100 + index) as u64), + } + }) + .collect::>(); + writer.append_storage_changeset(changeset, block_num as u64).unwrap(); + } + writer.commit().unwrap(); + + assert_eq!(sf_rw.storage_changeset_count().unwrap(), expected_total); + } + #[test] fn test_get_storage_before_block() { let (static_dir, _) = create_test_static_files_dir(); From eaa0340672769b897f16d15b8d273f2c8520ebe0 Mon Sep 17 00:00:00 2001 From: YK <> Date: Tue, 31 Mar 2026 04:32:36 +0000 Subject: [PATCH 3/4] chore(stages): drop manual history repro harness Co-authored-by: YK <46377366+yongkangc@users.noreply.github.com> Amp-Thread-ID: https://ampcode.com/threads/T-019d420d-4530-756b-856b-e3c3b0d8522b --- .../src/stages/index_storage_history.rs | 195 +----------------- 1 file changed, 3 insertions(+), 192 deletions(-) diff --git a/crates/stages/stages/src/stages/index_storage_history.rs b/crates/stages/stages/src/stages/index_storage_history.rs index e16b8ae1d2a..182bc1c96f5 100644 --- a/crates/stages/stages/src/stages/index_storage_history.rs +++ b/crates/stages/stages/src/stages/index_storage_history.rs @@ -691,19 +691,10 @@ mod tests { mod rocksdb_tests { use super::*; - use reth_chainspec::MAINNET; - use reth_db::test_utils::{ - create_test_rocksdb_dir, create_test_rw_db, create_test_static_files_dir, - }; - use reth_db_api::models::{AccountBeforeTx, StorageBeforeTx}; - use reth_provider::{ - providers::{RocksDBProvider, StaticFileProviderBuilder, StaticFileWriter}, - test_utils::MockNodeTypesWithDB, - ProviderFactory, RocksDBProviderFactory, - }; + use reth_db_api::models::StorageBeforeTx; + use reth_provider::{providers::StaticFileWriter, RocksDBProviderFactory}; use reth_static_file_types::StaticFileSegment; - use reth_storage_api::{ChangeSetReader, StorageChangeSetReader, StorageSettings}; - use std::time::Instant; + use reth_storage_api::StorageSettings; /// Sets up v2 storage test data: writes block body indices to MDBX and /// storage changesets to static files (matching realistic v2 layout). @@ -739,186 +730,6 @@ mod tests { writer.commit().unwrap(); } - fn setup_v2_history_repro_data( - db: &TestStageDB, - block_range: std::ops::RangeInclusive, - ) { - db.factory.set_storage_settings_cache(StorageSettings::v2()); - - db.commit(|tx| { - for block in block_range.clone() { - tx.put::( - block, - StoredBlockBodyIndices { tx_count: 1, ..Default::default() }, - )?; - } - Ok(()) - }) - .unwrap(); - - let static_file_provider = db.factory.static_file_provider(); - let mut account_writer = - static_file_provider.latest_writer(StaticFileSegment::AccountChangeSets).unwrap(); - for block in block_range.clone() { - account_writer - .append_account_changeset( - vec![AccountBeforeTx { address: ADDRESS, info: None }], - block, - ) - .unwrap(); - } - account_writer.commit().unwrap(); - - let mut storage_writer = - static_file_provider.latest_writer(StaticFileSegment::StorageChangeSets).unwrap(); - for block in block_range { - storage_writer - .append_storage_changeset( - vec![StorageBeforeTx { - address: ADDRESS, - key: STORAGE_KEY, - value: U256::ZERO, - }], - block, - ) - .unwrap(); - } - storage_writer.commit().unwrap(); - } - - fn repro_test_db(blocks_per_file: u64) -> TestStageDB { - let (temp_static_files_dir, static_dir_path) = create_test_static_files_dir(); - let (temp_rocksdb_dir, rocksdb_dir_path) = create_test_rocksdb_dir(); - let static_file_provider = StaticFileProviderBuilder::read_write(&static_dir_path) - .with_blocks_per_file_for_segment( - StaticFileSegment::AccountChangeSets, - blocks_per_file, - ) - .with_blocks_per_file_for_segment( - StaticFileSegment::StorageChangeSets, - blocks_per_file, - ) - .build() - .unwrap(); - - TestStageDB { - temp_static_files_dir, - temp_rocksdb_dir, - factory: ProviderFactory::::new( - create_test_rw_db(), - MAINNET.clone(), - static_file_provider, - RocksDBProvider::builder(rocksdb_dir_path) - .with_default_tables() - .build() - .unwrap(), - reth_tasks::Runtime::test(), - ) - .unwrap(), - } - } - - #[tokio::test] - #[ignore = "manual repro for storage_v2 history collect bottleneck"] - async fn repro_collect_bottleneck_breakdown() { - let blocks = std::env::var("RETH_HISTORY_REPRO_BLOCKS") - .ok() - .and_then(|value| value.parse::().ok()) - .unwrap_or(200_000); - let blocks_per_file = std::env::var("RETH_HISTORY_REPRO_BLOCKS_PER_FILE") - .ok() - .and_then(|value| value.parse::().ok()) - .unwrap_or(1_000); - let tail_blocks = 36; - - assert!(blocks > tail_blocks, "repro needs more history than the tail window"); - - let db = repro_test_db(blocks_per_file); - setup_v2_history_repro_data(&db, 0..=blocks - 1); - - let static_file_provider = db.factory.static_file_provider(); - static_file_provider.initialize_index().unwrap(); - - let tail_start = blocks - tail_blocks; - let tail_end = blocks - 1; - - let started = Instant::now(); - let account_changesets = static_file_provider.account_changeset_count().unwrap(); - let account_count_time = started.elapsed(); - - let started = Instant::now(); - let storage_changesets = static_file_provider.storage_changeset_count().unwrap(); - let storage_count_time = started.elapsed(); - - let started = Instant::now(); - let walked_account = static_file_provider - .walk_account_changeset_range(tail_start..=tail_end) - .collect::, _>>() - .unwrap(); - let account_walk_time = started.elapsed(); - - let started = Instant::now(); - let walked_storage = static_file_provider - .walk_storage_changeset_range(tail_start..=tail_end) - .collect::, _>>() - .unwrap(); - let storage_walk_time = started.elapsed(); - - let mut storage_stage = IndexStorageHistoryStage::default(); - let provider = db.factory.database_provider_rw().unwrap(); - let started = Instant::now(); - let storage_out = storage_stage - .execute( - &provider, - ExecInput { - target: Some(tail_end), - checkpoint: Some(StageCheckpoint::new(tail_start - 1)), - }, - ) - .unwrap(); - let storage_stage_time = started.elapsed(); - - let mut account_stage = crate::stages::IndexAccountHistoryStage::default(); - let started = Instant::now(); - let account_out = account_stage - .execute( - &provider, - ExecInput { - target: Some(tail_end), - checkpoint: Some(StageCheckpoint::new(tail_start - 1)), - }, - ) - .unwrap(); - let account_stage_time = started.elapsed(); - - let account_probe_time = account_count_time + account_walk_time; - let storage_probe_time = storage_count_time + storage_walk_time; - - eprintln!( - "storage_v2 history repro: blocks={blocks} blocks_per_file={blocks_per_file} tail_blocks={tail_blocks} \ -account_changesets={account_changesets} storage_changesets={storage_changesets} \ -account_count={account_count_time:?} storage_count={storage_count_time:?} \ -account_walk={account_walk_time:?} storage_walk={storage_walk_time:?} \ -account_probe={account_probe_time:?} storage_probe={storage_probe_time:?} \ -account_stage={account_stage_time:?} storage_stage={storage_stage_time:?} \ -account_stage_minus_probe={:?} storage_stage_minus_probe={:?}" - , - account_stage_time.saturating_sub(account_probe_time), - storage_stage_time.saturating_sub(storage_probe_time), - ); - - assert_eq!(walked_account.len(), tail_blocks as usize); - assert_eq!(walked_storage.len(), tail_blocks as usize); - assert_eq!( - storage_out, - ExecOutput { checkpoint: StageCheckpoint::new(tail_end), done: true } - ); - assert_eq!( - account_out, - ExecOutput { checkpoint: StageCheckpoint::new(tail_end), done: true } - ); - } - /// Test that when `storages_history_in_rocksdb` is enabled, the stage /// writes storage history indices to `RocksDB` instead of MDBX. #[tokio::test] From 5fda319b0b4b02ca9068b834ac18b59be0abb67c Mon Sep 17 00:00:00 2001 From: YK <> Date: Tue, 31 Mar 2026 04:35:58 +0000 Subject: [PATCH 4/4] refactor(stages): inline history progress tracking Co-authored-by: YK <46377366+yongkangc@users.noreply.github.com> Amp-Thread-ID: https://ampcode.com/threads/T-019d420d-4530-756b-856b-e3c3b0d8522b --- crates/stages/stages/src/stages/utils.rs | 95 ++++++++++-------------- 1 file changed, 41 insertions(+), 54 deletions(-) diff --git a/crates/stages/stages/src/stages/utils.rs b/crates/stages/stages/src/stages/utils.rs index 0fbfc6f4c8a..a6e80c40cf2 100644 --- a/crates/stages/stages/src/stages/utils.rs +++ b/crates/stages/stages/src/stages/utils.rs @@ -20,11 +20,7 @@ use reth_provider::{ use reth_stages_api::StageError; use reth_static_file_types::StaticFileSegment; use reth_storage_api::{ChangeSetReader, StorageChangeSetReader}; -use std::{ - collections::HashMap, - hash::Hash, - ops::{Range, RangeBounds}, -}; +use std::{collections::HashMap, hash::Hash, ops::RangeBounds}; use tracing::info; /// Number of blocks before pushing indices from cache to [`Collector`] @@ -117,49 +113,6 @@ where Ok(()) } -/// Progress logger for a requested block range. -struct BlockRangeProgress { - start_block: BlockNumber, - total_blocks: u64, - progress_interval: u64, - next_progress_block: BlockNumber, -} - -impl BlockRangeProgress { - fn new(range: &Range) -> Option { - let start_block = range.start; - let total_blocks = range.end.saturating_sub(start_block); - - // Static-file history collection only needs the requested block range. Using block-range - // progress avoids scanning every `.csoff` record just to format percentage logs. - (total_blocks > 1000).then_some(Self { - start_block, - total_blocks, - progress_interval: (total_blocks / 1000).max(1), - next_progress_block: start_block.saturating_add((total_blocks / 1000).max(1)), - }) - } - - fn maybe_log(&mut self, block_number: BlockNumber) { - if block_number < self.next_progress_block { - return; - } - - let progressed_blocks = block_number.saturating_sub(self.start_block).saturating_add(1); - info!( - target: "sync::stages::index_history", - progress = %format!("{:.4}%", (progressed_blocks as f64 / self.total_blocks as f64) * 100.0), - "Collecting indices" - ); - - let next_interval = progressed_blocks - .saturating_div(self.progress_interval) - .saturating_add(1) - .saturating_mul(self.progress_interval); - self.next_progress_block = self.start_block.saturating_add(next_interval); - } -} - /// Collects account history indices using a provider that implements `ChangeSetReader`. pub(crate) fn collect_account_history_indices( provider: &Provider, @@ -184,7 +137,11 @@ where // Use the new walker for lazy iteration over static file changesets let static_file_provider = provider.static_file_provider(); - let mut progress = BlockRangeProgress::new(&range); + let start_block = range.start; + let total_blocks = range.end.saturating_sub(start_block); + let progress_interval = (total_blocks / 1000).max(1); + let mut next_progress_block = + (total_blocks > 1000).then_some(start_block.saturating_add(progress_interval)); let walker = static_file_provider.walk_account_changeset_range(range); let mut flush_counter = 0; @@ -197,8 +154,21 @@ where if block_number != current_block_number { current_block_number = block_number; flush_counter += 1; - if let Some(progress) = progress.as_mut() { - progress.maybe_log(block_number); + if let Some(next_block) = next_progress_block && + block_number >= next_block + { + let progressed_blocks = block_number.saturating_sub(start_block).saturating_add(1); + info!( + target: "sync::stages::index_history", + progress = %format!("{:.4}%", (progressed_blocks as f64 / total_blocks as f64) * 100.0), + "Collecting indices" + ); + + let next_interval = progressed_blocks + .saturating_div(progress_interval) + .saturating_add(1) + .saturating_mul(progress_interval); + next_progress_block = Some(start_block.saturating_add(next_interval)); } } @@ -235,7 +205,11 @@ where let range = to_range(range); let static_file_provider = provider.static_file_provider(); - let mut progress = BlockRangeProgress::new(&range); + let start_block = range.start; + let total_blocks = range.end.saturating_sub(start_block); + let progress_interval = (total_blocks / 1000).max(1); + let mut next_progress_block = + (total_blocks > 1000).then_some(start_block.saturating_add(progress_interval)); let walker = static_file_provider.walk_storage_changeset_range(range); let mut flush_counter = 0; @@ -248,8 +222,21 @@ where if block_number != current_block_number { current_block_number = block_number; flush_counter += 1; - if let Some(progress) = progress.as_mut() { - progress.maybe_log(block_number); + if let Some(next_block) = next_progress_block && + block_number >= next_block + { + let progressed_blocks = block_number.saturating_sub(start_block).saturating_add(1); + info!( + target: "sync::stages::index_history", + progress = %format!("{:.4}%", (progressed_blocks as f64 / total_blocks as f64) * 100.0), + "Collecting indices" + ); + + let next_interval = progressed_blocks + .saturating_div(progress_interval) + .saturating_add(1) + .saturating_mul(progress_interval); + next_progress_block = Some(start_block.saturating_add(next_interval)); } }