Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 44 additions & 19 deletions crates/stages/stages/src/stages/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,27 +137,39 @@ where

// Use the new walker for lazy iteration over static file changesets
let static_file_provider = provider.static_file_provider();

// Get total count for progress reporting
let total_changesets = static_file_provider.account_changeset_count()?;
let interval = (total_changesets / 1000).max(1);

let start_block = range.start;
let total_blocks = range.end.saturating_sub(start_block);
let progress_interval = (total_blocks / 1000).max(1);
let mut next_progress_block =
(total_blocks > 1000).then_some(start_block.saturating_add(progress_interval));
let walker = static_file_provider.walk_account_changeset_range(range);

let mut flush_counter = 0;
let mut current_block_number = u64::MAX;

for (idx, changeset_result) in walker.enumerate() {
for changeset_result in walker {
let (block_number, AccountBeforeTx { address, .. }) = changeset_result?;
cache.entry(address).or_default().push(block_number);

if idx > 0 && idx % interval == 0 && total_changesets > 1000 {
info!(target: "sync::stages::index_history", progress = %format!("{:.4}%", (idx as f64 / total_changesets as f64) * 100.0), "Collecting indices");
}

if block_number != current_block_number {
current_block_number = block_number;
flush_counter += 1;
if let Some(next_block) = next_progress_block &&
block_number >= next_block
{
let progressed_blocks = block_number.saturating_sub(start_block).saturating_add(1);
info!(
target: "sync::stages::index_history",
progress = %format!("{:.4}%", (progressed_blocks as f64 / total_blocks as f64) * 100.0),
"Collecting indices"
);

let next_interval = progressed_blocks
.saturating_div(progress_interval)
.saturating_add(1)
.saturating_mul(progress_interval);
next_progress_block = Some(start_block.saturating_add(next_interval));
}
}

if flush_counter > DEFAULT_CACHE_THRESHOLD {
Expand Down Expand Up @@ -193,26 +205,39 @@ where

let range = to_range(range);
let static_file_provider = provider.static_file_provider();

let total_changesets = static_file_provider.storage_changeset_count()?;
let interval = (total_changesets / 1000).max(1);

let start_block = range.start;
let total_blocks = range.end.saturating_sub(start_block);
let progress_interval = (total_blocks / 1000).max(1);
let mut next_progress_block =
(total_blocks > 1000).then_some(start_block.saturating_add(progress_interval));
let walker = static_file_provider.walk_storage_changeset_range(range);

let mut flush_counter = 0;
let mut current_block_number = u64::MAX;

for (idx, changeset_result) in walker.enumerate() {
for changeset_result in walker {
let (BlockNumberAddress((block_number, address)), storage) = changeset_result?;
cache.entry(AddressStorageKey((address, storage.key))).or_default().push(block_number);

if idx > 0 && idx % interval == 0 && total_changesets > 1000 {
info!(target: "sync::stages::index_history", progress = %format!("{:.4}%", (idx as f64 / total_changesets as f64) * 100.0), "Collecting indices");
}

if block_number != current_block_number {
current_block_number = block_number;
flush_counter += 1;
if let Some(next_block) = next_progress_block &&
block_number >= next_block
{
let progressed_blocks = block_number.saturating_sub(start_block).saturating_add(1);
info!(
target: "sync::stages::index_history",
progress = %format!("{:.4}%", (progressed_blocks as f64 / total_blocks as f64) * 100.0),
"Collecting indices"
);

let next_interval = progressed_blocks
.saturating_div(progress_interval)
.saturating_add(1)
.saturating_mul(progress_interval);
next_progress_block = Some(start_block.saturating_add(next_interval));
}
}

if flush_counter > DEFAULT_CACHE_THRESHOLD {
Expand Down
22 changes: 12 additions & 10 deletions crates/storage/provider/src/providers/static_file/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2512,11 +2512,12 @@ impl<N: NodePrimitives> ChangeSetReader for StaticFileProvider<N> {
.with_extension("csoff");
if csoff_path.exists() {
let len = header.changeset_offsets_len();
let mut reader = ChangesetOffsetReader::new(&csoff_path, len)
.map_err(ProviderError::other)?;
let offsets = reader.get_range(0, len).map_err(ProviderError::other)?;
for offset in offsets {
count += offset.num_changes() as usize;
if len > 0 {
let mut reader = ChangesetOffsetReader::new(&csoff_path, len)
.map_err(ProviderError::other)?;
if let Some(offset) = reader.get(len - 1).map_err(ProviderError::other)? {
count += (offset.offset() + offset.num_changes()) as usize;
}
}
}
}
Expand Down Expand Up @@ -2638,11 +2639,12 @@ impl<N: NodePrimitives> StorageChangeSetReader for StaticFileProvider<N> {
.with_extension("csoff");
if csoff_path.exists() {
let len = header.changeset_offsets_len();
let mut reader = ChangesetOffsetReader::new(&csoff_path, len)
.map_err(ProviderError::other)?;
let offsets = reader.get_range(0, len).map_err(ProviderError::other)?;
for offset in offsets {
count += offset.num_changes() as usize;
if len > 0 {
let mut reader = ChangesetOffsetReader::new(&csoff_path, len)
.map_err(ProviderError::other)?;
if let Some(offset) = reader.get(len - 1).map_err(ProviderError::other)? {
count += (offset.offset() + offset.num_changes()) as usize;
}
}
}
}
Expand Down
64 changes: 64 additions & 0 deletions crates/storage/provider/src/providers/static_file/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -760,6 +760,36 @@ mod tests {
}
}

#[test]
fn test_account_changeset_count() {
let (static_dir, _) = create_test_static_files_dir();

let sf_rw: StaticFileProvider<EthPrimitives> =
StaticFileProviderBuilder::read_write(&static_dir)
.with_blocks_per_file(3)
.build()
.unwrap();

let changes_per_block = [2usize, 0, 3, 1, 4, 0];
let expected_total = changes_per_block.iter().sum::<usize>();

let mut writer = sf_rw.latest_writer(StaticFileSegment::AccountChangeSets).unwrap();
for (block_num, changes) in changes_per_block.into_iter().enumerate() {
let changeset = (0..changes)
.map(|index| {
let mut address = Address::ZERO;
address.0[0] = block_num as u8;
address.0[1] = index as u8;
AccountBeforeTx { address, info: None }
})
.collect::<Vec<_>>();
writer.append_account_changeset(changeset, block_num as u64).unwrap();
}
writer.commit().unwrap();

assert_eq!(sf_rw.account_changeset_count().unwrap(), expected_total);
}

#[test]
fn test_get_account_before_block() {
let (static_dir, _) = create_test_static_files_dir();
Expand Down Expand Up @@ -1106,6 +1136,40 @@ mod tests {
}
}

#[test]
fn test_storage_changeset_count() {
let (static_dir, _) = create_test_static_files_dir();

let sf_rw: StaticFileProvider<EthPrimitives> =
StaticFileProviderBuilder::read_write(&static_dir)
.with_blocks_per_file(3)
.build()
.unwrap();

let changes_per_block = [2usize, 0, 3, 1, 4, 0];
let expected_total = changes_per_block.iter().sum::<usize>();

let mut writer = sf_rw.latest_writer(StaticFileSegment::StorageChangeSets).unwrap();
for (block_num, changes) in changes_per_block.into_iter().enumerate() {
let changeset = (0..changes)
.map(|index| {
let mut address = Address::ZERO;
address.0[0] = block_num as u8;
address.0[1] = index as u8;
StorageBeforeTx {
address,
key: B256::with_last_byte(index as u8),
value: U256::from((block_num * 100 + index) as u64),
}
})
.collect::<Vec<_>>();
writer.append_storage_changeset(changeset, block_num as u64).unwrap();
}
writer.commit().unwrap();

assert_eq!(sf_rw.storage_changeset_count().unwrap(), expected_total);
}

#[test]
fn test_get_storage_before_block() {
let (static_dir, _) = create_test_static_files_dir();
Expand Down
Loading