diff --git a/crates/stages/stages/src/stages/utils.rs b/crates/stages/stages/src/stages/utils.rs index 89aca9244f3..a6e80c40cf2 100644 --- a/crates/stages/stages/src/stages/utils.rs +++ b/crates/stages/stages/src/stages/utils.rs @@ -137,27 +137,39 @@ where // Use the new walker for lazy iteration over static file changesets let static_file_provider = provider.static_file_provider(); - - // Get total count for progress reporting - let total_changesets = static_file_provider.account_changeset_count()?; - let interval = (total_changesets / 1000).max(1); - + let start_block = range.start; + let total_blocks = range.end.saturating_sub(start_block); + let progress_interval = (total_blocks / 1000).max(1); + let mut next_progress_block = + (total_blocks > 1000).then_some(start_block.saturating_add(progress_interval)); let walker = static_file_provider.walk_account_changeset_range(range); let mut flush_counter = 0; let mut current_block_number = u64::MAX; - for (idx, changeset_result) in walker.enumerate() { + for changeset_result in walker { let (block_number, AccountBeforeTx { address, .. }) = changeset_result?; cache.entry(address).or_default().push(block_number); - if idx > 0 && idx % interval == 0 && total_changesets > 1000 { - info!(target: "sync::stages::index_history", progress = %format!("{:.4}%", (idx as f64 / total_changesets as f64) * 100.0), "Collecting indices"); - } - if block_number != current_block_number { current_block_number = block_number; flush_counter += 1; + if let Some(next_block) = next_progress_block && + block_number >= next_block + { + let progressed_blocks = block_number.saturating_sub(start_block).saturating_add(1); + info!( + target: "sync::stages::index_history", + progress = %format!("{:.4}%", (progressed_blocks as f64 / total_blocks as f64) * 100.0), + "Collecting indices" + ); + + let next_interval = progressed_blocks + .saturating_div(progress_interval) + .saturating_add(1) + .saturating_mul(progress_interval); + next_progress_block = Some(start_block.saturating_add(next_interval)); + } } if flush_counter > DEFAULT_CACHE_THRESHOLD { @@ -193,26 +205,39 @@ where let range = to_range(range); let static_file_provider = provider.static_file_provider(); - - let total_changesets = static_file_provider.storage_changeset_count()?; - let interval = (total_changesets / 1000).max(1); - + let start_block = range.start; + let total_blocks = range.end.saturating_sub(start_block); + let progress_interval = (total_blocks / 1000).max(1); + let mut next_progress_block = + (total_blocks > 1000).then_some(start_block.saturating_add(progress_interval)); let walker = static_file_provider.walk_storage_changeset_range(range); let mut flush_counter = 0; let mut current_block_number = u64::MAX; - for (idx, changeset_result) in walker.enumerate() { + for changeset_result in walker { let (BlockNumberAddress((block_number, address)), storage) = changeset_result?; cache.entry(AddressStorageKey((address, storage.key))).or_default().push(block_number); - if idx > 0 && idx % interval == 0 && total_changesets > 1000 { - info!(target: "sync::stages::index_history", progress = %format!("{:.4}%", (idx as f64 / total_changesets as f64) * 100.0), "Collecting indices"); - } - if block_number != current_block_number { current_block_number = block_number; flush_counter += 1; + if let Some(next_block) = next_progress_block && + block_number >= next_block + { + let progressed_blocks = block_number.saturating_sub(start_block).saturating_add(1); + info!( + target: "sync::stages::index_history", + progress = %format!("{:.4}%", (progressed_blocks as f64 / total_blocks as f64) * 100.0), + "Collecting indices" + ); + + let next_interval = progressed_blocks + .saturating_div(progress_interval) + .saturating_add(1) + .saturating_mul(progress_interval); + next_progress_block = Some(start_block.saturating_add(next_interval)); + } } if flush_counter > DEFAULT_CACHE_THRESHOLD { diff --git a/crates/storage/provider/src/providers/static_file/manager.rs b/crates/storage/provider/src/providers/static_file/manager.rs index aa73403ee51..672912a0b15 100644 --- a/crates/storage/provider/src/providers/static_file/manager.rs +++ b/crates/storage/provider/src/providers/static_file/manager.rs @@ -2512,11 +2512,12 @@ impl ChangeSetReader for StaticFileProvider { .with_extension("csoff"); if csoff_path.exists() { let len = header.changeset_offsets_len(); - let mut reader = ChangesetOffsetReader::new(&csoff_path, len) - .map_err(ProviderError::other)?; - let offsets = reader.get_range(0, len).map_err(ProviderError::other)?; - for offset in offsets { - count += offset.num_changes() as usize; + if len > 0 { + let mut reader = ChangesetOffsetReader::new(&csoff_path, len) + .map_err(ProviderError::other)?; + if let Some(offset) = reader.get(len - 1).map_err(ProviderError::other)? { + count += (offset.offset() + offset.num_changes()) as usize; + } } } } @@ -2638,11 +2639,12 @@ impl StorageChangeSetReader for StaticFileProvider { .with_extension("csoff"); if csoff_path.exists() { let len = header.changeset_offsets_len(); - let mut reader = ChangesetOffsetReader::new(&csoff_path, len) - .map_err(ProviderError::other)?; - let offsets = reader.get_range(0, len).map_err(ProviderError::other)?; - for offset in offsets { - count += offset.num_changes() as usize; + if len > 0 { + let mut reader = ChangesetOffsetReader::new(&csoff_path, len) + .map_err(ProviderError::other)?; + if let Some(offset) = reader.get(len - 1).map_err(ProviderError::other)? { + count += (offset.offset() + offset.num_changes()) as usize; + } } } } diff --git a/crates/storage/provider/src/providers/static_file/mod.rs b/crates/storage/provider/src/providers/static_file/mod.rs index 50cd204df20..8d31b1f4218 100644 --- a/crates/storage/provider/src/providers/static_file/mod.rs +++ b/crates/storage/provider/src/providers/static_file/mod.rs @@ -760,6 +760,36 @@ mod tests { } } + #[test] + fn test_account_changeset_count() { + let (static_dir, _) = create_test_static_files_dir(); + + let sf_rw: StaticFileProvider = + StaticFileProviderBuilder::read_write(&static_dir) + .with_blocks_per_file(3) + .build() + .unwrap(); + + let changes_per_block = [2usize, 0, 3, 1, 4, 0]; + let expected_total = changes_per_block.iter().sum::(); + + let mut writer = sf_rw.latest_writer(StaticFileSegment::AccountChangeSets).unwrap(); + for (block_num, changes) in changes_per_block.into_iter().enumerate() { + let changeset = (0..changes) + .map(|index| { + let mut address = Address::ZERO; + address.0[0] = block_num as u8; + address.0[1] = index as u8; + AccountBeforeTx { address, info: None } + }) + .collect::>(); + writer.append_account_changeset(changeset, block_num as u64).unwrap(); + } + writer.commit().unwrap(); + + assert_eq!(sf_rw.account_changeset_count().unwrap(), expected_total); + } + #[test] fn test_get_account_before_block() { let (static_dir, _) = create_test_static_files_dir(); @@ -1106,6 +1136,40 @@ mod tests { } } + #[test] + fn test_storage_changeset_count() { + let (static_dir, _) = create_test_static_files_dir(); + + let sf_rw: StaticFileProvider = + StaticFileProviderBuilder::read_write(&static_dir) + .with_blocks_per_file(3) + .build() + .unwrap(); + + let changes_per_block = [2usize, 0, 3, 1, 4, 0]; + let expected_total = changes_per_block.iter().sum::(); + + let mut writer = sf_rw.latest_writer(StaticFileSegment::StorageChangeSets).unwrap(); + for (block_num, changes) in changes_per_block.into_iter().enumerate() { + let changeset = (0..changes) + .map(|index| { + let mut address = Address::ZERO; + address.0[0] = block_num as u8; + address.0[1] = index as u8; + StorageBeforeTx { + address, + key: B256::with_last_byte(index as u8), + value: U256::from((block_num * 100 + index) as u64), + } + }) + .collect::>(); + writer.append_storage_changeset(changeset, block_num as u64).unwrap(); + } + writer.commit().unwrap(); + + assert_eq!(sf_rw.storage_changeset_count().unwrap(), expected_total); + } + #[test] fn test_get_storage_before_block() { let (static_dir, _) = create_test_static_files_dir();