From 396f33372f29da76ace3d89a27578b44403b39cd Mon Sep 17 00:00:00 2001 From: yongkangc Date: Sun, 18 Jan 2026 22:39:27 +0000 Subject: [PATCH 01/12] feat(stages): add RocksDB support for IndexStorageHistoryStage This implements RocksDB support for the IndexStorageHistoryStage following the pattern established in #21165 for IndexAccountHistoryStage: - Add RocksDBProviderFactory and StorageSettingsCache trait bounds to Stage impl - Use EitherWriter::new_storages_history with explicit #[cfg] blocks for batch creation - Add helper functions for loading/flushing storage history shards: - load_storage_history_via_writer - iterate collector, merge with existing shards - flush_storage_history_shards_partial - write full shards, keep partial in memory - flush_storage_history_shards - write all remaining shards with u64::MAX for last - Add EitherWriter methods: - append_storage_history, upsert_storage_history, get_last_storage_history_shard - unwind_storage_history_shard - handles RocksDB shard unwinding - Add RocksDBProvider::clear() and RocksDBBatch::get() methods - Route unwind to RocksDB when storages_history_in_rocksdb is enabled - Add rocksdb_tests module with tests for: - execute_writes_to_rocksdb_when_enabled - unwind_deletes_from_rocksdb_when_enabled - unwind_to_zero_keeps_block_zero - execute_incremental_sync Part of #20593 - Move secondary indices to RocksDB --- .../src/stages/index_storage_history.rs | 323 ++++++++++++++++-- crates/stages/stages/src/stages/utils.rs | 177 +++++++++- crates/storage/provider/src/either_writer.rs | 136 +++++++- 3 files changed, 613 insertions(+), 23 deletions(-) diff --git a/crates/stages/stages/src/stages/index_storage_history.rs b/crates/stages/stages/src/stages/index_storage_history.rs index 2ec4094c1ec..5c01cca9bd6 100644 --- a/crates/stages/stages/src/stages/index_storage_history.rs +++ b/crates/stages/stages/src/stages/index_storage_history.rs @@ -1,13 +1,15 @@ -use super::{collect_history_indices, load_history_indices}; -use 
crate::{StageCheckpoint, StageId}; +use super::collect_history_indices; +use crate::{stages::utils::load_storage_history_via_writer, StageCheckpoint, StageId}; use reth_config::config::{EtlConfig, IndexHistoryConfig}; use reth_db_api::{ models::{storage_sharded_key::StorageShardedKey, AddressStorageKey, BlockNumberAddress}, - table::Decode, tables, - transaction::DbTxMut, + transaction::{DbTx, DbTxMut}, +}; +use reth_provider::{ + DBProvider, EitherWriter, HistoryWriter, PruneCheckpointReader, PruneCheckpointWriter, + RocksDBProviderFactory, StorageSettingsCache, }; -use reth_provider::{DBProvider, HistoryWriter, PruneCheckpointReader, PruneCheckpointWriter}; use reth_prune_types::{PruneCheckpoint, PruneMode, PrunePurpose, PruneSegment}; use reth_stages_api::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput}; use std::fmt::Debug; @@ -46,8 +48,13 @@ impl Default for IndexStorageHistoryStage { impl Stage for IndexStorageHistoryStage where - Provider: - DBProvider + PruneCheckpointWriter + HistoryWriter + PruneCheckpointReader, + Provider: DBProvider + + PruneCheckpointWriter + + HistoryWriter + + PruneCheckpointReader + + StorageSettingsCache + + RocksDBProviderFactory + + reth_provider::NodePrimitivesProvider, { /// Return the id of the stage fn id(&self) -> StageId { @@ -96,14 +103,22 @@ where let mut range = input.next_block_range(); let first_sync = input.checkpoint().block_number == 0; + // Check if we're using RocksDB for storage history + let use_rocksdb = provider.cached_storage_settings().storages_history_in_rocksdb; + // On first sync we might have history coming from genesis. We clear the table since it's // faster to rebuild from scratch. 
if first_sync { - provider.tx_ref().clear::()?; + if use_rocksdb { + #[cfg(all(unix, feature = "rocksdb"))] + provider.rocksdb_provider().clear::()?; + } else { + provider.tx_ref().clear::()?; + } range = 0..=*input.next_block_range().end(); } - info!(target: "sync::stages::index_storage_history::exec", ?first_sync, "Collecting indices"); + info!(target: "sync::stages::index_storage_history::exec", ?first_sync, ?use_rocksdb, "Collecting indices"); let collector = collect_history_indices::<_, tables::StorageChangeSets, tables::StoragesHistory, _>( provider, @@ -116,16 +131,22 @@ where )?; info!(target: "sync::stages::index_storage_history::exec", "Loading indices into database"); - load_history_indices::<_, tables::StoragesHistory, _>( - provider, - collector, - first_sync, - |AddressStorageKey((address, storage_key)), highest_block_number| { - StorageShardedKey::new(address, storage_key, highest_block_number) - }, - StorageShardedKey::decode_owned, - |key| AddressStorageKey((key.address, key.sharded_key.key)), - )?; + + #[cfg(all(unix, feature = "rocksdb"))] + let rocksdb = provider.rocksdb_provider(); + #[cfg(all(unix, feature = "rocksdb"))] + let rocksdb_batch = rocksdb.batch(); + #[cfg(not(all(unix, feature = "rocksdb")))] + let rocksdb_batch = (); + + let mut writer = EitherWriter::new_storages_history(provider, rocksdb_batch)?; + + load_storage_history_via_writer(collector, first_sync, &mut writer)?; + + #[cfg(all(unix, feature = "rocksdb"))] + if let Some(batch) = writer.into_raw_rocksdb_batch() { + provider.set_pending_rocksdb_batch(batch); + } Ok(ExecOutput { checkpoint: StageCheckpoint::new(*range.end()), done: true }) } @@ -139,7 +160,46 @@ where let (range, unwind_progress, _) = input.unwind_block_range_with_threshold(self.commit_threshold); - provider.unwind_storage_history_indices_range(BlockNumberAddress::range(range))?; + let use_rocksdb = provider.cached_storage_settings().storages_history_in_rocksdb; + + if use_rocksdb { + #[cfg(all(unix, feature = 
"rocksdb"))] + { + use alloy_primitives::{Address, B256}; + use reth_db_api::cursor::DbCursorRO; + use std::collections::HashMap; + + let rocksdb = provider.rocksdb_provider(); + let rocksdb_batch = rocksdb.batch(); + let mut writer = EitherWriter::new_storages_history(provider, rocksdb_batch)?; + + let changesets = provider + .tx_ref() + .cursor_dup_read::()? + .walk_range(BlockNumberAddress::range(range))? + .collect::, _>>()?; + + let mut to_unwind: HashMap<(Address, B256), u64> = HashMap::new(); + for (block_num_addr, storage) in changesets { + let addr = block_num_addr.address(); + let bn = block_num_addr.block_number(); + to_unwind + .entry((addr, storage.key)) + .and_modify(|min| *min = (*min).min(bn)) + .or_insert(bn); + } + + for ((address, storage_key), rem_index) in to_unwind { + writer.unwind_storage_history_shard(address, storage_key, rem_index)?; + } + + if let Some(batch) = writer.into_raw_rocksdb_batch() { + provider.set_pending_rocksdb_batch(batch); + } + } + } else { + provider.unwind_storage_history_indices_range(BlockNumberAddress::range(range))?; + } Ok(UnwindOutput { checkpoint: StageCheckpoint::new(unwind_progress) }) } @@ -663,4 +723,227 @@ mod tests { Ok(()) } } + + #[cfg(all(unix, feature = "rocksdb"))] + mod rocksdb_tests { + use super::*; + use reth_provider::RocksDBProviderFactory; + use reth_storage_api::StorageSettings; + + /// Test that when `storages_history_in_rocksdb` is enabled, the stage + /// writes storage history indices to `RocksDB` instead of MDBX. 
+ #[tokio::test] + async fn execute_writes_to_rocksdb_when_enabled() { + let db = TestStageDB::default(); + + db.factory.set_storage_settings_cache( + StorageSettings::legacy().with_storages_history_in_rocksdb(true), + ); + + db.commit(|tx| { + for block in 0..=10 { + tx.put::( + block, + StoredBlockBodyIndices { tx_count: 3, ..Default::default() }, + )?; + tx.put::( + block_number_address(block), + storage(STORAGE_KEY), + )?; + } + Ok(()) + }) + .unwrap(); + + let input = ExecInput { target: Some(10), ..Default::default() }; + let mut stage = IndexStorageHistoryStage::default(); + let provider = db.factory.database_provider_rw().unwrap(); + let out = stage.execute(&provider, input).unwrap(); + assert_eq!(out, ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }); + provider.commit().unwrap(); + + let mdbx_table = db.table::().unwrap(); + assert!( + mdbx_table.is_empty(), + "MDBX StoragesHistory should be empty when RocksDB is enabled" + ); + + let rocksdb = db.factory.rocksdb_provider(); + let result = rocksdb.get::(shard(u64::MAX)).unwrap(); + assert!(result.is_some(), "RocksDB should contain storage history"); + + let block_list = result.unwrap(); + let blocks: Vec = block_list.iter().collect(); + assert_eq!(blocks, (0..=10).collect::>()); + } + + /// Test that unwind correctly removes block numbers from RocksDB when enabled. 
+ #[tokio::test] + async fn unwind_deletes_from_rocksdb_when_enabled() { + let db = TestStageDB::default(); + + db.factory.set_storage_settings_cache( + StorageSettings::legacy().with_storages_history_in_rocksdb(true), + ); + + db.commit(|tx| { + for block in 0..=10 { + tx.put::( + block, + StoredBlockBodyIndices { tx_count: 3, ..Default::default() }, + )?; + tx.put::( + block_number_address(block), + storage(STORAGE_KEY), + )?; + } + Ok(()) + }) + .unwrap(); + + let input = ExecInput { target: Some(10), ..Default::default() }; + let mut stage = IndexStorageHistoryStage::default(); + let provider = db.factory.database_provider_rw().unwrap(); + let out = stage.execute(&provider, input).unwrap(); + assert_eq!(out, ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }); + provider.commit().unwrap(); + + let rocksdb = db.factory.rocksdb_provider(); + let result = rocksdb.get::(shard(u64::MAX)).unwrap(); + assert!(result.is_some(), "RocksDB should have data before unwind"); + let blocks_before: Vec = result.unwrap().iter().collect(); + assert_eq!(blocks_before, (0..=10).collect::>()); + + let unwind_input = + UnwindInput { checkpoint: StageCheckpoint::new(10), unwind_to: 5, bad_block: None }; + let provider = db.factory.database_provider_rw().unwrap(); + let out = stage.unwind(&provider, unwind_input).unwrap(); + assert_eq!(out, UnwindOutput { checkpoint: StageCheckpoint::new(5) }); + provider.commit().unwrap(); + + let rocksdb = db.factory.rocksdb_provider(); + let result = rocksdb.get::(shard(u64::MAX)).unwrap(); + assert!(result.is_some(), "RocksDB should still have data after partial unwind"); + let blocks_after: Vec = result.unwrap().iter().collect(); + assert_eq!( + blocks_after, + (0..=5).collect::>(), + "Should only have blocks 0-5 after unwind to block 5" + ); + } + + /// Test that unwind to block 0 keeps only block 0's history. 
+ #[tokio::test] + async fn unwind_to_zero_keeps_block_zero() { + let db = TestStageDB::default(); + + db.factory.set_storage_settings_cache( + StorageSettings::legacy().with_storages_history_in_rocksdb(true), + ); + + db.commit(|tx| { + for block in 0..=5 { + tx.put::( + block, + StoredBlockBodyIndices { tx_count: 3, ..Default::default() }, + )?; + tx.put::( + block_number_address(block), + storage(STORAGE_KEY), + )?; + } + Ok(()) + }) + .unwrap(); + + let input = ExecInput { target: Some(5), ..Default::default() }; + let mut stage = IndexStorageHistoryStage::default(); + let provider = db.factory.database_provider_rw().unwrap(); + let out = stage.execute(&provider, input).unwrap(); + assert_eq!(out, ExecOutput { checkpoint: StageCheckpoint::new(5), done: true }); + provider.commit().unwrap(); + + let rocksdb = db.factory.rocksdb_provider(); + let result = rocksdb.get::(shard(u64::MAX)).unwrap(); + assert!(result.is_some(), "RocksDB should have data before unwind"); + + let unwind_input = + UnwindInput { checkpoint: StageCheckpoint::new(5), unwind_to: 0, bad_block: None }; + let provider = db.factory.database_provider_rw().unwrap(); + let out = stage.unwind(&provider, unwind_input).unwrap(); + assert_eq!(out, UnwindOutput { checkpoint: StageCheckpoint::new(0) }); + provider.commit().unwrap(); + + let rocksdb = db.factory.rocksdb_provider(); + let result = rocksdb.get::(shard(u64::MAX)).unwrap(); + assert!(result.is_some(), "RocksDB should still have block 0 history"); + let blocks_after: Vec = result.unwrap().iter().collect(); + assert_eq!(blocks_after, vec![0], "Should only have block 0 after unwinding to 0"); + } + + /// Test incremental sync merges new data with existing shards. 
+ #[tokio::test] + async fn execute_incremental_sync() { + let db = TestStageDB::default(); + + db.factory.set_storage_settings_cache( + StorageSettings::legacy().with_storages_history_in_rocksdb(true), + ); + + db.commit(|tx| { + for block in 0..=5 { + tx.put::( + block, + StoredBlockBodyIndices { tx_count: 3, ..Default::default() }, + )?; + tx.put::( + block_number_address(block), + storage(STORAGE_KEY), + )?; + } + Ok(()) + }) + .unwrap(); + + let input = ExecInput { target: Some(5), ..Default::default() }; + let mut stage = IndexStorageHistoryStage::default(); + let provider = db.factory.database_provider_rw().unwrap(); + let out = stage.execute(&provider, input).unwrap(); + assert_eq!(out, ExecOutput { checkpoint: StageCheckpoint::new(5), done: true }); + provider.commit().unwrap(); + + let rocksdb = db.factory.rocksdb_provider(); + let result = rocksdb.get::(shard(u64::MAX)).unwrap(); + assert!(result.is_some()); + let blocks: Vec = result.unwrap().iter().collect(); + assert_eq!(blocks, (0..=5).collect::>()); + + db.commit(|tx| { + for block in 6..=10 { + tx.put::( + block, + StoredBlockBodyIndices { tx_count: 3, ..Default::default() }, + )?; + tx.put::( + block_number_address(block), + storage(STORAGE_KEY), + )?; + } + Ok(()) + }) + .unwrap(); + + let input = ExecInput { target: Some(10), checkpoint: Some(StageCheckpoint::new(5)) }; + let provider = db.factory.database_provider_rw().unwrap(); + let out = stage.execute(&provider, input).unwrap(); + assert_eq!(out, ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }); + provider.commit().unwrap(); + + let rocksdb = db.factory.rocksdb_provider(); + let result = rocksdb.get::(shard(u64::MAX)).unwrap(); + assert!(result.is_some(), "RocksDB should have merged data"); + let blocks: Vec = result.unwrap().iter().collect(); + assert_eq!(blocks, (0..=10).collect::>()); + } + } } diff --git a/crates/stages/stages/src/stages/utils.rs b/crates/stages/stages/src/stages/utils.rs index 93158a62ed9..d7569bb8aba 
100644 --- a/crates/stages/stages/src/stages/utils.rs +++ b/crates/stages/stages/src/stages/utils.rs @@ -1,9 +1,12 @@ //! Utils for `stages`. -use alloy_primitives::{Address, BlockNumber, TxNumber}; +use alloy_primitives::{Address, BlockNumber, TxNumber, B256}; use reth_config::config::EtlConfig; use reth_db_api::{ cursor::{DbCursorRO, DbCursorRW}, - models::{sharded_key::NUM_OF_INDICES_IN_SHARD, AccountBeforeTx, ShardedKey}, + models::{ + sharded_key::NUM_OF_INDICES_IN_SHARD, storage_sharded_key::StorageShardedKey, + AccountBeforeTx, ShardedKey, + }, table::{Decode, Decompress, Table}, transaction::{DbTx, DbTxMut}, BlockNumberList, DatabaseError, @@ -537,3 +540,173 @@ where segment, }) } + +/// Loads storage history indices into the database via `EitherWriter`. +/// +/// Similar to [`load_history_indices`] but works with [`EitherWriter`] to support +/// both MDBX and `RocksDB` backends. +/// +/// ## Process +/// Iterates over elements, grouping indices by their (address, `storage_key`) pairs. It flushes +/// indices to disk when reaching a shard's max length (`NUM_OF_INDICES_IN_SHARD`) or when the +/// (address, `storage_key`) pair changes, ensuring the last previous shard is stored. +/// +/// Uses `Option<(Address, B256)>` instead of default values as the sentinel to avoid +/// incorrectly treating `(Address::ZERO, B256::ZERO)` as "no previous key". 
+pub(crate) fn load_storage_history_via_writer( + mut collector: Collector, + append_only: bool, + writer: &mut EitherWriter<'_, CURSOR, N>, +) -> Result<(), StageError> +where + N: NodePrimitives, + CURSOR: DbCursorRW + + DbCursorRO, +{ + let mut current_key: Option<(Address, B256)> = None; + let mut current_list = Vec::::new(); + + let total_entries = collector.len(); + let interval = (total_entries / 10).max(1); + + for (index, element) in collector.iter()?.enumerate() { + let (k, v) = element?; + let sharded_key = StorageShardedKey::decode_owned(k)?; + let new_list = BlockNumberList::decompress_owned(v)?; + + if index > 0 && index.is_multiple_of(interval) && total_entries > 10 { + info!(target: "sync::stages::index_history", progress = %format!("{:.2}%", (index as f64 / total_entries as f64) * 100.0), "Writing indices"); + } + + let partial_key = (sharded_key.address, sharded_key.sharded_key.key); + + if current_key != Some(partial_key) { + if let Some((prev_addr, prev_storage_key)) = current_key { + flush_storage_history_shards( + prev_addr, + prev_storage_key, + &mut current_list, + append_only, + writer, + )?; + } + + current_key = Some(partial_key); + current_list.clear(); + + if !append_only && + let Some(last_shard) = + writer.get_last_storage_history_shard(partial_key.0, partial_key.1)? + { + current_list.extend(last_shard.iter()); + } + } + + current_list.extend(new_list.iter()); + flush_storage_history_shards_partial( + partial_key.0, + partial_key.1, + &mut current_list, + append_only, + writer, + )?; + } + + if let Some((addr, storage_key)) = current_key { + flush_storage_history_shards(addr, storage_key, &mut current_list, append_only, writer)?; + } + + Ok(()) +} + +/// Flushes complete shards for storage history, keeping the trailing partial shard buffered. +/// +/// Only flushes when we have more than one shard's worth of data, keeping the last +/// (possibly partial) shard for continued accumulation. 
+fn flush_storage_history_shards_partial( + address: Address, + storage_key: B256, + list: &mut Vec, + append_only: bool, + writer: &mut EitherWriter<'_, CURSOR, N>, +) -> Result<(), StageError> +where + N: NodePrimitives, + CURSOR: DbCursorRW + + DbCursorRO, +{ + if list.len() <= NUM_OF_INDICES_IN_SHARD { + return Ok(()); + } + + let num_full_shards = list.len() / NUM_OF_INDICES_IN_SHARD; + + let shards_to_flush = if list.len().is_multiple_of(NUM_OF_INDICES_IN_SHARD) { + num_full_shards - 1 + } else { + num_full_shards + }; + + if shards_to_flush == 0 { + return Ok(()); + } + + let flush_len = shards_to_flush * NUM_OF_INDICES_IN_SHARD; + let remainder = list.split_off(flush_len); + + for chunk in list.chunks(NUM_OF_INDICES_IN_SHARD) { + let highest = *chunk.last().expect("chunk is non-empty"); + let key = StorageShardedKey::new(address, storage_key, highest); + let value = BlockNumberList::new_pre_sorted(chunk.iter().copied()); + + if append_only { + writer.append_storage_history(key, &value)?; + } else { + writer.upsert_storage_history(key, &value)?; + } + } + + *list = remainder; + Ok(()) +} + +/// Flushes all remaining shards for storage history, using `u64::MAX` for the last shard. +/// +/// The `u64::MAX` key for the final shard is an invariant that allows +/// `seek_exact(address, storage_key, u64::MAX)` to find the last shard during incremental +/// sync for merging with new indices. 
+fn flush_storage_history_shards( + address: Address, + storage_key: B256, + list: &mut Vec, + append_only: bool, + writer: &mut EitherWriter<'_, CURSOR, N>, +) -> Result<(), StageError> +where + N: NodePrimitives, + CURSOR: DbCursorRW + + DbCursorRO, +{ + if list.is_empty() { + return Ok(()); + } + + let num_chunks = list.len().div_ceil(NUM_OF_INDICES_IN_SHARD); + + for (i, chunk) in list.chunks(NUM_OF_INDICES_IN_SHARD).enumerate() { + let is_last = i == num_chunks - 1; + let highest = if is_last { u64::MAX } else { *chunk.last().expect("chunk is non-empty") }; + + let key = StorageShardedKey::new(address, storage_key, highest); + let value = BlockNumberList::new_pre_sorted(chunk.iter().copied()); + + if append_only { + writer.append_storage_history(key, &value)?; + } else { + writer.upsert_storage_history(key, &value)?; + } + } + + list.clear(); + Ok(()) +} diff --git a/crates/storage/provider/src/either_writer.rs b/crates/storage/provider/src/either_writer.rs index 16eced90dd6..e085e4d079b 100644 --- a/crates/storage/provider/src/either_writer.rs +++ b/crates/storage/provider/src/either_writer.rs @@ -13,7 +13,7 @@ use crate::{ providers::{history_info, HistoryInfo, StaticFileProvider, StaticFileProviderRWRefMut}, StaticFileProviderFactory, }; -use alloy_primitives::{map::HashMap, Address, BlockNumber, TxHash, TxNumber}; +use alloy_primitives::{map::HashMap, Address, BlockNumber, TxHash, TxNumber, B256}; use rayon::slice::ParallelSliceMut; use reth_db::{ cursor::{DbCursorRO, DbDupCursorRW}, @@ -512,6 +512,140 @@ where Self::RocksDB(batch) => batch.delete::(key), } } + + /// Appends a storage history entry (for first sync - more efficient). 
+ pub fn append_storage_history( + &mut self, + key: StorageShardedKey, + value: &BlockNumberList, + ) -> ProviderResult<()> { + match self { + Self::Database(cursor) => Ok(cursor.append(key, value)?), + Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider), + #[cfg(all(unix, feature = "rocksdb"))] + Self::RocksDB(batch) => batch.put::(key, value), + } + } + + /// Upserts a storage history entry (for incremental sync). + pub fn upsert_storage_history( + &mut self, + key: StorageShardedKey, + value: &BlockNumberList, + ) -> ProviderResult<()> { + match self { + Self::Database(cursor) => Ok(cursor.upsert(key, value)?), + Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider), + #[cfg(all(unix, feature = "rocksdb"))] + Self::RocksDB(batch) => batch.put::(key, value), + } + } + + /// Gets the last shard for an address and storage key (keyed with `u64::MAX`). + pub fn get_last_storage_history_shard( + &mut self, + address: Address, + storage_key: B256, + ) -> ProviderResult> { + let key = StorageShardedKey::last(address, storage_key); + match self { + Self::Database(cursor) => Ok(cursor.seek_exact(key)?.map(|(_, v)| v)), + Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider), + #[cfg(all(unix, feature = "rocksdb"))] + Self::RocksDB(batch) => batch.get::(key), + } + } + + /// Unwinds storage history by removing block numbers >= `rem_index` from shards. + /// + /// For `RocksDB`: reads the last shard, filters out blocks >= `rem_index`, + /// and either deletes (if empty) or updates with the filtered list. + /// + /// For Database: uses cursor-based shard unwinding logic similar to MDBX. 
+ pub fn unwind_storage_history_shard( + &mut self, + address: Address, + storage_key: B256, + rem_index: BlockNumber, + ) -> ProviderResult<()> { + match self { + Self::Database(cursor) => { + let start_key = StorageShardedKey::last(address, storage_key); + let partial_shard = unwind_storage_history_shards_cursor( + cursor, + start_key, + rem_index, + |storage_sharded_key| { + storage_sharded_key.address == address && + storage_sharded_key.sharded_key.key == storage_key + }, + )?; + + if !partial_shard.is_empty() { + cursor.insert( + StorageShardedKey::last(address, storage_key), + &BlockNumberList::new_pre_sorted(partial_shard), + )?; + } + Ok(()) + } + Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider), + #[cfg(all(unix, feature = "rocksdb"))] + Self::RocksDB(batch) => { + let key = StorageShardedKey::last(address, storage_key); + let shard_opt = batch.get::(key.clone())?; + + if let Some(shard) = shard_opt { + let filtered: Vec = + shard.iter().take_while(|&bn| bn < rem_index).collect(); + + if filtered.is_empty() { + batch.delete::(key)?; + } else { + batch.put::( + key, + &BlockNumberList::new_pre_sorted(filtered), + )?; + } + } + Ok(()) + } + } + } +} + +/// Unwinds storage history shards using a cursor, returning the partial shard to reinsert. +/// +/// This is the cursor-based logic for MDBX that handles multi-shard unwinding. 
+fn unwind_storage_history_shards_cursor( + cursor: &mut C, + start_key: StorageShardedKey, + block_number: BlockNumber, + mut shard_belongs_to_key: impl FnMut(&StorageShardedKey) -> bool, +) -> ProviderResult> +where + C: DbCursorRO + DbCursorRW, +{ + let mut item = cursor.seek_exact(start_key)?; + while let Some((sharded_key, list)) = item { + if !shard_belongs_to_key(&sharded_key) { + break + } + + cursor.delete_current()?; + + let first = list.iter().next().expect("List can't be empty"); + + if first >= block_number { + item = cursor.prev()?; + continue + } else if block_number <= sharded_key.sharded_key.highest_block_number { + return Ok(list.iter().take_while(|i| *i < block_number).collect::>()) + } + return Ok(list.iter().collect::>()) + } + + Ok(Vec::new()) } impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N> From f01bb9bacc9041a568eedabf45dcd824f33e6e9b Mon Sep 17 00:00:00 2001 From: yongkangc Date: Mon, 19 Jan 2026 09:03:06 +0000 Subject: [PATCH 02/12] fix: move RocksDB storage history unwind to provider layer - Add storage_history_shards() to RocksDBProvider for multi-shard enumeration - Add unwind_storage_history_to() and clear_storage_history() to RocksDBBatch - Update DatabaseProvider::unwind_storage_history_indices to route to RocksDB - Simplify stage unwind to delegate to provider (matching account history pattern) - Add multi-shard unwind test - Use checked_sub(1) for block 0 edge case - Remove now-unused unwind_storage_history_shard from EitherWriter --- .../src/stages/index_storage_history.rs | 110 +++++++++------ crates/storage/provider/src/either_writer.rs | 91 ------------ .../src/providers/database/provider.rs | 67 ++++++--- .../src/providers/rocksdb/provider.rs | 133 ++++++++++++++++++ 4 files changed, 253 insertions(+), 148 deletions(-) diff --git a/crates/stages/stages/src/stages/index_storage_history.rs b/crates/stages/stages/src/stages/index_storage_history.rs index 5c01cca9bd6..8873285a741 100644 --- 
a/crates/stages/stages/src/stages/index_storage_history.rs +++ b/crates/stages/stages/src/stages/index_storage_history.rs @@ -4,7 +4,7 @@ use reth_config::config::{EtlConfig, IndexHistoryConfig}; use reth_db_api::{ models::{storage_sharded_key::StorageShardedKey, AddressStorageKey, BlockNumberAddress}, tables, - transaction::{DbTx, DbTxMut}, + transaction::DbTxMut, }; use reth_provider::{ DBProvider, EitherWriter, HistoryWriter, PruneCheckpointReader, PruneCheckpointWriter, @@ -160,46 +160,7 @@ where let (range, unwind_progress, _) = input.unwind_block_range_with_threshold(self.commit_threshold); - let use_rocksdb = provider.cached_storage_settings().storages_history_in_rocksdb; - - if use_rocksdb { - #[cfg(all(unix, feature = "rocksdb"))] - { - use alloy_primitives::{Address, B256}; - use reth_db_api::cursor::DbCursorRO; - use std::collections::HashMap; - - let rocksdb = provider.rocksdb_provider(); - let rocksdb_batch = rocksdb.batch(); - let mut writer = EitherWriter::new_storages_history(provider, rocksdb_batch)?; - - let changesets = provider - .tx_ref() - .cursor_dup_read::()? - .walk_range(BlockNumberAddress::range(range))? 
- .collect::, _>>()?; - - let mut to_unwind: HashMap<(Address, B256), u64> = HashMap::new(); - for (block_num_addr, storage) in changesets { - let addr = block_num_addr.address(); - let bn = block_num_addr.block_number(); - to_unwind - .entry((addr, storage.key)) - .and_modify(|min| *min = (*min).min(bn)) - .or_insert(bn); - } - - for ((address, storage_key), rem_index) in to_unwind { - writer.unwind_storage_history_shard(address, storage_key, rem_index)?; - } - - if let Some(batch) = writer.into_raw_rocksdb_batch() { - provider.set_pending_rocksdb_batch(batch); - } - } - } else { - provider.unwind_storage_history_indices_range(BlockNumberAddress::range(range))?; - } + provider.unwind_storage_history_indices_range(BlockNumberAddress::range(range))?; Ok(UnwindOutput { checkpoint: StageCheckpoint::new(unwind_progress) }) } @@ -945,5 +906,72 @@ mod tests { let blocks: Vec = result.unwrap().iter().collect(); assert_eq!(blocks, (0..=10).collect::>()); } + + /// Test multi-shard unwind correctly handles shards that span across unwind boundary. 
+ #[tokio::test] + async fn unwind_multi_shard() { + use reth_db_api::models::sharded_key::NUM_OF_INDICES_IN_SHARD; + + let db = TestStageDB::default(); + + db.factory.set_storage_settings_cache( + StorageSettings::legacy().with_storages_history_in_rocksdb(true), + ); + + let num_blocks = (NUM_OF_INDICES_IN_SHARD * 2 + 100) as u64; + + db.commit(|tx| { + for block in 0..num_blocks { + tx.put::( + block, + StoredBlockBodyIndices { tx_count: 3, ..Default::default() }, + )?; + tx.put::( + block_number_address(block), + storage(STORAGE_KEY), + )?; + } + Ok(()) + }) + .unwrap(); + + let input = ExecInput { target: Some(num_blocks - 1), ..Default::default() }; + let mut stage = IndexStorageHistoryStage::default(); + let provider = db.factory.database_provider_rw().unwrap(); + let out = stage.execute(&provider, input).unwrap(); + assert_eq!( + out, + ExecOutput { checkpoint: StageCheckpoint::new(num_blocks - 1), done: true } + ); + provider.commit().unwrap(); + + let rocksdb = db.factory.rocksdb_provider(); + let shards = rocksdb.storage_history_shards(ADDRESS, STORAGE_KEY).unwrap(); + assert!(shards.len() >= 2, "Should have at least 2 shards for {} blocks", num_blocks); + + let unwind_to = NUM_OF_INDICES_IN_SHARD as u64 + 50; + let unwind_input = UnwindInput { + checkpoint: StageCheckpoint::new(num_blocks - 1), + unwind_to, + bad_block: None, + }; + let provider = db.factory.database_provider_rw().unwrap(); + let out = stage.unwind(&provider, unwind_input).unwrap(); + assert_eq!(out, UnwindOutput { checkpoint: StageCheckpoint::new(unwind_to) }); + provider.commit().unwrap(); + + let rocksdb = db.factory.rocksdb_provider(); + let shards_after = rocksdb.storage_history_shards(ADDRESS, STORAGE_KEY).unwrap(); + assert!(!shards_after.is_empty(), "Should still have shards after unwind"); + + let all_blocks: Vec = + shards_after.iter().flat_map(|(_, list)| list.iter()).collect(); + assert_eq!( + all_blocks, + (0..=unwind_to).collect::>(), + "Should only have blocks 0 to {} 
after unwind", + unwind_to + ); + } } } diff --git a/crates/storage/provider/src/either_writer.rs b/crates/storage/provider/src/either_writer.rs index e085e4d079b..c6ba79d0311 100644 --- a/crates/storage/provider/src/either_writer.rs +++ b/crates/storage/provider/src/either_writer.rs @@ -555,97 +555,6 @@ where Self::RocksDB(batch) => batch.get::(key), } } - - /// Unwinds storage history by removing block numbers >= `rem_index` from shards. - /// - /// For `RocksDB`: reads the last shard, filters out blocks >= `rem_index`, - /// and either deletes (if empty) or updates with the filtered list. - /// - /// For Database: uses cursor-based shard unwinding logic similar to MDBX. - pub fn unwind_storage_history_shard( - &mut self, - address: Address, - storage_key: B256, - rem_index: BlockNumber, - ) -> ProviderResult<()> { - match self { - Self::Database(cursor) => { - let start_key = StorageShardedKey::last(address, storage_key); - let partial_shard = unwind_storage_history_shards_cursor( - cursor, - start_key, - rem_index, - |storage_sharded_key| { - storage_sharded_key.address == address && - storage_sharded_key.sharded_key.key == storage_key - }, - )?; - - if !partial_shard.is_empty() { - cursor.insert( - StorageShardedKey::last(address, storage_key), - &BlockNumberList::new_pre_sorted(partial_shard), - )?; - } - Ok(()) - } - Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider), - #[cfg(all(unix, feature = "rocksdb"))] - Self::RocksDB(batch) => { - let key = StorageShardedKey::last(address, storage_key); - let shard_opt = batch.get::(key.clone())?; - - if let Some(shard) = shard_opt { - let filtered: Vec = - shard.iter().take_while(|&bn| bn < rem_index).collect(); - - if filtered.is_empty() { - batch.delete::(key)?; - } else { - batch.put::( - key, - &BlockNumberList::new_pre_sorted(filtered), - )?; - } - } - Ok(()) - } - } - } -} - -/// Unwinds storage history shards using a cursor, returning the partial shard to reinsert. 
-/// -/// This is the cursor-based logic for MDBX that handles multi-shard unwinding. -fn unwind_storage_history_shards_cursor( - cursor: &mut C, - start_key: StorageShardedKey, - block_number: BlockNumber, - mut shard_belongs_to_key: impl FnMut(&StorageShardedKey) -> bool, -) -> ProviderResult> -where - C: DbCursorRO + DbCursorRW, -{ - let mut item = cursor.seek_exact(start_key)?; - while let Some((sharded_key, list)) = item { - if !shard_belongs_to_key(&sharded_key) { - break - } - - cursor.delete_current()?; - - let first = list.iter().next().expect("List can't be empty"); - - if first >= block_number { - item = cursor.prev()?; - continue - } else if block_number <= sharded_key.sharded_key.highest_block_number { - return Ok(list.iter().take_while(|i| *i < block_number).collect::>()) - } - return Ok(list.iter().collect::>()) - } - - Ok(Vec::new()) } impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N> diff --git a/crates/storage/provider/src/providers/database/provider.rs b/crates/storage/provider/src/providers/database/provider.rs index a8032ae66a3..9de97ba87d1 100644 --- a/crates/storage/provider/src/providers/database/provider.rs +++ b/crates/storage/provider/src/providers/database/provider.rs @@ -3005,25 +3005,60 @@ impl HistoryWriter for DatabaseProvi .collect::>(); storage_changesets.sort_by_key(|(address, key, _)| (*address, *key)); - let mut cursor = self.tx.cursor_write::()?; - for &(address, storage_key, rem_index) in &storage_changesets { - let partial_shard = unwind_history_shards::<_, tables::StoragesHistory, _>( - &mut cursor, - StorageShardedKey::last(address, storage_key), - rem_index, - |storage_sharded_key| { - storage_sharded_key.address == address && - storage_sharded_key.sharded_key.key == storage_key - }, - )?; + let use_rocksdb = self.cached_storage_settings().storages_history_in_rocksdb; + + if use_rocksdb { + #[cfg(all(unix, feature = "rocksdb"))] + { + let mut key_min_block: alloy_primitives::map::HashMap< + (Address, B256), + 
BlockNumber, + > = alloy_primitives::map::HashMap::with_capacity_and_hasher( + storage_changesets.len(), + Default::default(), + ); + for &(address, storage_key, block_number) in &storage_changesets { + key_min_block + .entry((address, storage_key)) + .and_modify(|min| *min = (*min).min(block_number)) + .or_insert(block_number); + } + + let mut batch = self.rocksdb_provider.batch(); + for ((address, storage_key), min_block) in key_min_block { + match min_block.checked_sub(1) { + Some(keep_to) => { + batch.unwind_storage_history_to(address, storage_key, keep_to)? + } + None => batch.clear_storage_history(address, storage_key)?, + } + } + self.pending_rocksdb_batches.lock().push(batch.into_inner()); + } - // Check the last returned partial shard. - // If it's not empty, the shard needs to be reinserted. - if !partial_shard.is_empty() { - cursor.insert( + #[cfg(not(all(unix, feature = "rocksdb")))] + return Err(ProviderError::UnsupportedProvider); + } + + if !use_rocksdb { + let mut cursor = self.tx.cursor_write::()?; + for &(address, storage_key, rem_index) in &storage_changesets { + let partial_shard = unwind_history_shards::<_, tables::StoragesHistory, _>( + &mut cursor, StorageShardedKey::last(address, storage_key), - &BlockNumberList::new_pre_sorted(partial_shard), + rem_index, + |storage_sharded_key| { + storage_sharded_key.address == address && + storage_sharded_key.sharded_key.key == storage_key + }, )?; + + if !partial_shard.is_empty() { + cursor.insert( + StorageShardedKey::last(address, storage_key), + &BlockNumberList::new_pre_sorted(partial_shard), + )?; + } } } diff --git a/crates/storage/provider/src/providers/rocksdb/provider.rs b/crates/storage/provider/src/providers/rocksdb/provider.rs index 55c040f7f27..2bb339e48d4 100644 --- a/crates/storage/provider/src/providers/rocksdb/provider.rs +++ b/crates/storage/provider/src/providers/rocksdb/provider.rs @@ -708,6 +708,53 @@ impl RocksDBProvider { Ok(result) } + /// Returns all storage history shards for 
the given `(address, storage_key)` pair. + /// + /// Iterates through all shards in ascending `highest_block_number` order until + /// a different `(address, storage_key)` is encountered. + pub fn storage_history_shards( + &self, + address: Address, + storage_key: B256, + ) -> ProviderResult> { + let cf = self.get_cf_handle::()?; + + let start_key = StorageShardedKey::new(address, storage_key, 0u64); + let start_bytes = start_key.encode(); + + let iter = self + .0 + .db + .iterator_cf(cf, IteratorMode::From(start_bytes.as_ref(), rocksdb::Direction::Forward)); + + let mut result = Vec::new(); + for item in iter { + match item { + Ok((key_bytes, value_bytes)) => { + let key = StorageShardedKey::decode(&key_bytes) + .map_err(|_| ProviderError::Database(DatabaseError::Decode))?; + + if key.address != address || key.sharded_key.key != storage_key { + break; + } + + let value = BlockNumberList::decompress(&value_bytes) + .map_err(|_| ProviderError::Database(DatabaseError::Decode))?; + + result.push((key, value)); + } + Err(e) => { + return Err(ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo { + message: e.to_string().into(), + code: -1, + }))); + } + } + } + + Ok(result) + } + /// Unwinds account history indices for the given `(address, block_number)` pairs. /// /// Groups addresses by their minimum block number and calls the appropriate unwind @@ -1180,6 +1227,77 @@ impl<'a> RocksDBBatch<'a> { Ok(()) } + /// Unwinds storage history to keep only blocks `<= keep_to`. + /// + /// Handles multi-shard scenarios by: + /// 1. Loading all shards for the `(address, storage_key)` pair + /// 2. Finding the boundary shard containing `keep_to` + /// 3. Deleting all shards after the boundary + /// 4. Truncating the boundary shard to keep only indices `<= keep_to` + /// 5. 
Ensuring the last shard is keyed with `u64::MAX` + pub fn unwind_storage_history_to( + &mut self, + address: Address, + storage_key: B256, + keep_to: BlockNumber, + ) -> ProviderResult<()> { + let shards = self.provider.storage_history_shards(address, storage_key)?; + if shards.is_empty() { + return Ok(()); + } + + let boundary_idx = shards.iter().position(|(key, _)| { + key.sharded_key.highest_block_number == u64::MAX || + key.sharded_key.highest_block_number > keep_to + }); + + let Some(boundary_idx) = boundary_idx else { + let (last_key, last_value) = shards.last().expect("shards is non-empty"); + if last_key.sharded_key.highest_block_number != u64::MAX { + self.delete::(last_key.clone())?; + self.put::( + StorageShardedKey::last(address, storage_key), + last_value, + )?; + } + return Ok(()); + }; + + for (key, _) in shards.iter().skip(boundary_idx + 1) { + self.delete::(key.clone())?; + } + + let (boundary_key, boundary_list) = &shards[boundary_idx]; + self.delete::(boundary_key.clone())?; + + let kept_count = boundary_list.iter().take_while(|&b| b <= keep_to).count(); + + if kept_count == 0 { + if boundary_idx == 0 { + return Ok(()); + } + + let (prev_key, prev_value) = &shards[boundary_idx - 1]; + if prev_key.sharded_key.highest_block_number != u64::MAX { + self.delete::(prev_key.clone())?; + self.put::( + StorageShardedKey::last(address, storage_key), + prev_value, + )?; + } + return Ok(()); + } + + let new_last = + BlockNumberList::new_pre_sorted(boundary_list.iter().take_while(|&b| b <= keep_to)); + self.put::( + StorageShardedKey::last(address, storage_key), + &new_last, + )?; + + Ok(()) + } + /// Clears all account history shards for the given address. /// /// Used when unwinding from block 0 (i.e., removing all history). @@ -1190,6 +1308,21 @@ impl<'a> RocksDBBatch<'a> { } Ok(()) } + + /// Clears all storage history shards for the given `(address, storage_key)` pair. 
+ /// + /// Used when unwinding from block 0 (i.e., removing all history for this storage slot). + pub fn clear_storage_history( + &mut self, + address: Address, + storage_key: B256, + ) -> ProviderResult<()> { + let shards = self.provider.storage_history_shards(address, storage_key)?; + for (key, _) in shards { + self.delete::(key)?; + } + Ok(()) + } } /// `RocksDB` transaction wrapper providing MDBX-like semantics. From b74f9326f3625058360b598e5260c982d69bd201 Mon Sep 17 00:00:00 2001 From: yongkangc Date: Mon, 19 Jan 2026 09:13:22 +0000 Subject: [PATCH 03/12] fix: backtick RocksDB in doc comment --- crates/stages/stages/src/stages/index_storage_history.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/stages/stages/src/stages/index_storage_history.rs b/crates/stages/stages/src/stages/index_storage_history.rs index 8873285a741..1fc6a3cacdd 100644 --- a/crates/stages/stages/src/stages/index_storage_history.rs +++ b/crates/stages/stages/src/stages/index_storage_history.rs @@ -738,7 +738,7 @@ mod tests { assert_eq!(blocks, (0..=10).collect::>()); } - /// Test that unwind correctly removes block numbers from RocksDB when enabled. + /// Test that unwind correctly removes block numbers from `RocksDB` when enabled. #[tokio::test] async fn unwind_deletes_from_rocksdb_when_enabled() { let db = TestStageDB::default(); From 2d05085c43bb0d97b03b14f7064685828ff5a5b5 Mon Sep 17 00:00:00 2001 From: yongkangc Date: Mon, 19 Jan 2026 09:15:51 +0000 Subject: [PATCH 04/12] refactor: align unwind_storage_history_to with account history pattern Use is_empty() check instead of separate count() to avoid double iteration, matching the unwind_account_history_to implementation. 
--- .../provider/src/providers/rocksdb/provider.rs | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/crates/storage/provider/src/providers/rocksdb/provider.rs b/crates/storage/provider/src/providers/rocksdb/provider.rs index 2bb339e48d4..15c17a5c893 100644 --- a/crates/storage/provider/src/providers/rocksdb/provider.rs +++ b/crates/storage/provider/src/providers/rocksdb/provider.rs @@ -1267,13 +1267,21 @@ impl<'a> RocksDBBatch<'a> { self.delete::(key.clone())?; } + // Process the boundary shard: filter out blocks > keep_to let (boundary_key, boundary_list) = &shards[boundary_idx]; + + // Delete the boundary shard (we'll either drop it or rewrite at u64::MAX) self.delete::(boundary_key.clone())?; - let kept_count = boundary_list.iter().take_while(|&b| b <= keep_to).count(); + // Build truncated list once; check emptiness directly (avoids double iteration) + let new_last = + BlockNumberList::new_pre_sorted(boundary_list.iter().take_while(|&b| b <= keep_to)); - if kept_count == 0 { + if new_last.is_empty() { + // Boundary shard is now empty. Previous shard becomes the last and must be keyed + // u64::MAX. 
if boundary_idx == 0 { + // Nothing left for this (address, storage_key) pair return Ok(()); } @@ -1288,8 +1296,6 @@ impl<'a> RocksDBBatch<'a> { return Ok(()); } - let new_last = - BlockNumberList::new_pre_sorted(boundary_list.iter().take_while(|&b| b <= keep_to)); self.put::( StorageShardedKey::last(address, storage_key), &new_last, From e8229f89ebc3a6e54d357429fdbc026ef42b5650 Mon Sep 17 00:00:00 2001 From: yongkangc Date: Mon, 19 Jan 2026 09:38:14 +0000 Subject: [PATCH 05/12] chore: add comments to unwind_storage_history_to for parity with account history --- crates/storage/provider/src/providers/rocksdb/provider.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/crates/storage/provider/src/providers/rocksdb/provider.rs b/crates/storage/provider/src/providers/rocksdb/provider.rs index 15c17a5c893..b9b3b999459 100644 --- a/crates/storage/provider/src/providers/rocksdb/provider.rs +++ b/crates/storage/provider/src/providers/rocksdb/provider.rs @@ -1246,11 +1246,14 @@ impl<'a> RocksDBBatch<'a> { return Ok(()); } + // Find the first shard that might contain blocks > keep_to. 
+ // A shard is affected if it's the sentinel (u64::MAX) or its highest_block_number > keep_to let boundary_idx = shards.iter().position(|(key, _)| { key.sharded_key.highest_block_number == u64::MAX || key.sharded_key.highest_block_number > keep_to }); + // Repair path: no shards affected means all blocks <= keep_to, just ensure sentinel exists let Some(boundary_idx) = boundary_idx else { let (last_key, last_value) = shards.last().expect("shards is non-empty"); if last_key.sharded_key.highest_block_number != u64::MAX { @@ -1263,6 +1266,7 @@ impl<'a> RocksDBBatch<'a> { return Ok(()); }; + // Delete all shards strictly after the boundary (they are entirely > keep_to) for (key, _) in shards.iter().skip(boundary_idx + 1) { self.delete::(key.clone())?; } From b29684ff9dd48351b4accb1c955135f887075334 Mon Sep 17 00:00:00 2001 From: yongkangc Date: Mon, 19 Jan 2026 11:54:05 +0000 Subject: [PATCH 06/12] chore: remove redundant comment --- crates/stages/stages/src/stages/index_storage_history.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/stages/stages/src/stages/index_storage_history.rs b/crates/stages/stages/src/stages/index_storage_history.rs index 1fc6a3cacdd..5e16f30d40c 100644 --- a/crates/stages/stages/src/stages/index_storage_history.rs +++ b/crates/stages/stages/src/stages/index_storage_history.rs @@ -103,7 +103,6 @@ where let mut range = input.next_block_range(); let first_sync = input.checkpoint().block_number == 0; - // Check if we're using RocksDB for storage history let use_rocksdb = provider.cached_storage_settings().storages_history_in_rocksdb; // On first sync we might have history coming from genesis. 
We clear the table since it's From a3371eaf9db49ebecf03d9460f8a945eafdd7912 Mon Sep 17 00:00:00 2001 From: yongkangc Date: Tue, 20 Jan 2026 12:44:58 +0000 Subject: [PATCH 07/12] chore: align storage history with account history format - Rename load_storage_history_via_writer to load_storage_history (matching account history pattern) - Fix doc comment: 'account changesets' -> 'storage changesets' - Add inline comments throughout load_storage_history matching the account history style - Add doc comments for flush helpers referencing LoadMode equivalents - Add inline comments explaining shard flushing logic and u64::MAX invariant --- .../src/stages/index_storage_history.rs | 6 ++--- crates/stages/stages/src/stages/utils.rs | 27 +++++++++++++++++-- 2 files changed, 28 insertions(+), 5 deletions(-) diff --git a/crates/stages/stages/src/stages/index_storage_history.rs b/crates/stages/stages/src/stages/index_storage_history.rs index 5e16f30d40c..f81c0e80ee6 100644 --- a/crates/stages/stages/src/stages/index_storage_history.rs +++ b/crates/stages/stages/src/stages/index_storage_history.rs @@ -1,5 +1,5 @@ use super::collect_history_indices; -use crate::{stages::utils::load_storage_history_via_writer, StageCheckpoint, StageId}; +use crate::{stages::utils::load_storage_history, StageCheckpoint, StageId}; use reth_config::config::{EtlConfig, IndexHistoryConfig}; use reth_db_api::{ models::{storage_sharded_key::StorageShardedKey, AddressStorageKey, BlockNumberAddress}, @@ -15,7 +15,7 @@ use reth_stages_api::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, Unw use std::fmt::Debug; use tracing::info; -/// Stage is indexing history the account changesets generated in +/// Stage is indexing history the storage changesets generated in /// [`ExecutionStage`][crate::stages::ExecutionStage]. For more information /// on index sharding take a look at [`tables::StoragesHistory`]. 
#[derive(Debug)] @@ -140,7 +140,7 @@ where let mut writer = EitherWriter::new_storages_history(provider, rocksdb_batch)?; - load_storage_history_via_writer(collector, first_sync, &mut writer)?; + load_storage_history(collector, first_sync, &mut writer)?; #[cfg(all(unix, feature = "rocksdb"))] if let Some(batch) = writer.into_raw_rocksdb_batch() { diff --git a/crates/stages/stages/src/stages/utils.rs b/crates/stages/stages/src/stages/utils.rs index d7569bb8aba..f883b0c7dd1 100644 --- a/crates/stages/stages/src/stages/utils.rs +++ b/crates/stages/stages/src/stages/utils.rs @@ -553,7 +553,7 @@ where /// /// Uses `Option<(Address, B256)>` instead of default values as the sentinel to avoid /// incorrectly treating `(Address::ZERO, B256::ZERO)` as "no previous key". -pub(crate) fn load_storage_history_via_writer( +pub(crate) fn load_storage_history( mut collector: Collector, append_only: bool, writer: &mut EitherWriter<'_, CURSOR, N>, @@ -564,6 +564,7 @@ where + DbCursorRO, { let mut current_key: Option<(Address, B256)> = None; + // Accumulator for block numbers where the current (address, storage_key) changed. let mut current_list = Vec::::new(); let total_entries = collector.len(); @@ -580,7 +581,9 @@ where let partial_key = (sharded_key.address, sharded_key.sharded_key.key); + // When (address, storage_key) changes, flush the previous key's shards and start fresh. if current_key != Some(partial_key) { + // Flush all remaining shards for the previous key (uses u64::MAX for last shard). if let Some((prev_addr, prev_storage_key)) = current_key { flush_storage_history_shards( prev_addr, @@ -594,6 +597,8 @@ where current_key = Some(partial_key); current_list.clear(); + // On incremental sync, merge with the existing last shard from the database. + // The last shard is stored with key (address, storage_key, u64::MAX) so we can find it. if !append_only && let Some(last_shard) = writer.get_last_storage_history_shard(partial_key.0, partial_key.1)? 
@@ -602,7 +607,10 @@ where } } + // Append new block numbers to the accumulator. current_list.extend(new_list.iter()); + + // Flush complete shards, keeping the last (partial) shard buffered. flush_storage_history_shards_partial( partial_key.0, partial_key.1, @@ -612,6 +620,7 @@ where )?; } + // Flush the final key's remaining shard. if let Some((addr, storage_key)) = current_key { flush_storage_history_shards(addr, storage_key, &mut current_list, append_only, writer)?; } @@ -622,7 +631,10 @@ where /// Flushes complete shards for storage history, keeping the trailing partial shard buffered. /// /// Only flushes when we have more than one shard's worth of data, keeping the last -/// (possibly partial) shard for continued accumulation. +/// (possibly partial) shard for continued accumulation. This avoids writing a shard +/// that may need to be updated when more indices arrive. +/// +/// Equivalent to [`load_indices`] with [`LoadMode::KeepLast`]. fn flush_storage_history_shards_partial( address: Address, storage_key: B256, @@ -635,12 +647,15 @@ where CURSOR: DbCursorRW + DbCursorRO, { + // Nothing to flush if we haven't filled a complete shard yet. if list.len() <= NUM_OF_INDICES_IN_SHARD { return Ok(()); } let num_full_shards = list.len() / NUM_OF_INDICES_IN_SHARD; + // Always keep at least one shard buffered for continued accumulation. + // If len is exact multiple of shard size, keep the last full shard. let shards_to_flush = if list.len().is_multiple_of(NUM_OF_INDICES_IN_SHARD) { num_full_shards - 1 } else { @@ -651,9 +666,11 @@ where return Ok(()); } + // Split: flush the first N shards, keep the remainder buffered. let flush_len = shards_to_flush * NUM_OF_INDICES_IN_SHARD; let remainder = list.split_off(flush_len); + // Write each complete shard with its highest block number as the key. 
for chunk in list.chunks(NUM_OF_INDICES_IN_SHARD) { let highest = *chunk.last().expect("chunk is non-empty"); let key = StorageShardedKey::new(address, storage_key, highest); @@ -666,6 +683,7 @@ where } } + // Keep the remaining indices for the next iteration. *list = remainder; Ok(()) } @@ -675,6 +693,8 @@ where /// The `u64::MAX` key for the final shard is an invariant that allows /// `seek_exact(address, storage_key, u64::MAX)` to find the last shard during incremental /// sync for merging with new indices. +/// +/// Equivalent to [`load_indices`] with [`LoadMode::Flush`]. fn flush_storage_history_shards( address: Address, storage_key: B256, @@ -695,6 +715,9 @@ where for (i, chunk) in list.chunks(NUM_OF_INDICES_IN_SHARD).enumerate() { let is_last = i == num_chunks - 1; + + // Use u64::MAX for the final shard's key. This invariant allows incremental sync + // to find the last shard via seek_exact(address, storage_key, u64::MAX) for merging. let highest = if is_last { u64::MAX } else { *chunk.last().expect("chunk is non-empty") }; let key = StorageShardedKey::new(address, storage_key, highest); From f4820c5d8e89e5d3c3a3edb7b2ec4a370ffc2998 Mon Sep 17 00:00:00 2001 From: yongkangc Date: Tue, 20 Jan 2026 12:50:10 +0000 Subject: [PATCH 08/12] refactor: use with_rocksdb_batch pattern for storage history - Add with_rocksdb_batch method to RocksDBProviderFactory trait - Refactor storage history stage to use with_rocksdb_batch like account history - This removes explicit #[cfg] blocks in the stage for cleaner code --- .../src/stages/index_storage_history.rs | 21 ++++++------------- 1 file changed, 6 insertions(+), 15 deletions(-) diff --git a/crates/stages/stages/src/stages/index_storage_history.rs b/crates/stages/stages/src/stages/index_storage_history.rs index f81c0e80ee6..b613c777d52 100644 --- a/crates/stages/stages/src/stages/index_storage_history.rs +++ b/crates/stages/stages/src/stages/index_storage_history.rs @@ -131,21 +131,12 @@ where info!(target: 
"sync::stages::index_storage_history::exec", "Loading indices into database"); - #[cfg(all(unix, feature = "rocksdb"))] - let rocksdb = provider.rocksdb_provider(); - #[cfg(all(unix, feature = "rocksdb"))] - let rocksdb_batch = rocksdb.batch(); - #[cfg(not(all(unix, feature = "rocksdb")))] - let rocksdb_batch = (); - - let mut writer = EitherWriter::new_storages_history(provider, rocksdb_batch)?; - - load_storage_history(collector, first_sync, &mut writer)?; - - #[cfg(all(unix, feature = "rocksdb"))] - if let Some(batch) = writer.into_raw_rocksdb_batch() { - provider.set_pending_rocksdb_batch(batch); - } + provider.with_rocksdb_batch(|rocksdb_batch| { + let mut writer = EitherWriter::new_storages_history(provider, rocksdb_batch)?; + load_storage_history(collector, first_sync, &mut writer) + .map_err(|e| reth_provider::ProviderError::other(Box::new(e)))?; + Ok(((), writer.into_raw_rocksdb_batch())) + })?; Ok(ExecOutput { checkpoint: StageCheckpoint::new(*range.end()), done: true }) } From f68db568ff4a095141a4516ffa6a67a023e76a69 Mon Sep 17 00:00:00 2001 From: yongkangc Date: Tue, 20 Jan 2026 14:27:57 +0000 Subject: [PATCH 09/12] chore: align with account history PR format - Add RocksDB clear() safety comment in storage history stage - Remove blank line between first_sync and use_rocksdb declarations - Add append_account_history, upsert_account_history, get_last_account_history_shard to EitherWriter for parity with account history PR - Update test to use upsert_account_history instead of put_account_history --- .../src/stages/index_storage_history.rs | 25 +++++++++++-------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/crates/stages/stages/src/stages/index_storage_history.rs b/crates/stages/stages/src/stages/index_storage_history.rs index b613c777d52..e37dbaa4411 100644 --- a/crates/stages/stages/src/stages/index_storage_history.rs +++ b/crates/stages/stages/src/stages/index_storage_history.rs @@ -36,7 +36,7 @@ impl IndexStorageHistoryStage { 
etl_config: EtlConfig, prune_mode: Option, ) -> Self { - Self { commit_threshold: config.commit_threshold, prune_mode, etl_config } + Self { commit_threshold: config.commit_threshold, etl_config, prune_mode } } } @@ -49,9 +49,9 @@ impl Default for IndexStorageHistoryStage { impl Stage for IndexStorageHistoryStage where Provider: DBProvider - + PruneCheckpointWriter + HistoryWriter + PruneCheckpointReader + + PruneCheckpointWriter + StorageSettingsCache + RocksDBProviderFactory + reth_provider::NodePrimitivesProvider, @@ -102,13 +102,16 @@ where let mut range = input.next_block_range(); let first_sync = input.checkpoint().block_number == 0; - let use_rocksdb = provider.cached_storage_settings().storages_history_in_rocksdb; // On first sync we might have history coming from genesis. We clear the table since it's // faster to rebuild from scratch. if first_sync { if use_rocksdb { + // Note: RocksDB clear() executes immediately (not deferred to commit like MDBX), + // but this is safe for first_sync because if we crash before commit, the + // checkpoint stays at 0 and we'll just clear and rebuild again on restart. The + // source data (changesets) is intact. 
#[cfg(all(unix, feature = "rocksdb"))] provider.rocksdb_provider().clear::()?; } else { @@ -393,12 +396,12 @@ mod tests { async fn insert_index_second_half_shard() { // init let db = TestStageDB::default(); - let mut close_full_list = (1..=LAST_BLOCK_IN_FULL_SHARD - 1).collect::>(); + let mut almost_full_list = (1..=LAST_BLOCK_IN_FULL_SHARD - 1).collect::>(); // setup partial_setup(&db); db.commit(|tx| { - tx.put::(shard(u64::MAX), list(&close_full_list)).unwrap(); + tx.put::(shard(u64::MAX), list(&almost_full_list)).unwrap(); Ok(()) }) .unwrap(); @@ -407,12 +410,12 @@ mod tests { run(&db, LAST_BLOCK_IN_FULL_SHARD + 1, Some(LAST_BLOCK_IN_FULL_SHARD - 1)); // verify - close_full_list.push(LAST_BLOCK_IN_FULL_SHARD); + almost_full_list.push(LAST_BLOCK_IN_FULL_SHARD); let table = cast(db.table::().unwrap()); assert_eq!( table, BTreeMap::from([ - (shard(LAST_BLOCK_IN_FULL_SHARD), close_full_list.clone()), + (shard(LAST_BLOCK_IN_FULL_SHARD), almost_full_list.clone()), (shard(u64::MAX), vec![LAST_BLOCK_IN_FULL_SHARD + 1]) ]) ); @@ -421,9 +424,9 @@ mod tests { unwind(&db, LAST_BLOCK_IN_FULL_SHARD, LAST_BLOCK_IN_FULL_SHARD - 1); // verify initial state - close_full_list.pop(); + almost_full_list.pop(); let table = cast(db.table::().unwrap()); - assert_eq!(table, BTreeMap::from([(shard(u64::MAX), close_full_list)])); + assert_eq!(table, BTreeMap::from([(shard(u64::MAX), almost_full_list)])); } #[tokio::test] @@ -728,9 +731,9 @@ mod tests { assert_eq!(blocks, (0..=10).collect::>()); } - /// Test that unwind correctly removes block numbers from `RocksDB` when enabled. + /// Test that unwind works correctly when `storages_history_in_rocksdb` is enabled. 
#[tokio::test] - async fn unwind_deletes_from_rocksdb_when_enabled() { + async fn unwind_works_when_rocksdb_enabled() { let db = TestStageDB::default(); db.factory.set_storage_settings_cache( From 17f7cbfde0235cc9b70b7d893ea0585d1ce3f9e1 Mon Sep 17 00:00:00 2001 From: yongkangc Date: Tue, 20 Jan 2026 18:36:45 +0000 Subject: [PATCH 10/12] refactor: remove dead code and fix rocksdb iterator path - Remove unused load_history_indices, load_indices, and LoadMode (replaced by EitherWriter-based load_account_history and load_storage_history) - Fix storage_history_shards iterator to use self.0 instead of self.0.db - Update doc comments to remove references to deleted functions --- crates/stages/stages/src/stages/utils.rs | 164 +----------------- .../src/providers/rocksdb/provider.rs | 1 - 2 files changed, 4 insertions(+), 161 deletions(-) diff --git a/crates/stages/stages/src/stages/utils.rs b/crates/stages/stages/src/stages/utils.rs index f883b0c7dd1..36aaa1493ee 100644 --- a/crates/stages/stages/src/stages/utils.rs +++ b/crates/stages/stages/src/stages/utils.rs @@ -8,8 +8,8 @@ use reth_db_api::{ AccountBeforeTx, ShardedKey, }, table::{Decode, Decompress, Table}, - transaction::{DbTx, DbTxMut}, - BlockNumberList, DatabaseError, + transaction::DbTx, + BlockNumberList, }; use reth_etl::Collector; use reth_primitives_traits::NodePrimitives; @@ -174,164 +174,9 @@ where Ok(collector) } -/// Given a [`Collector`] created by [`collect_history_indices`] it iterates all entries, loading -/// the indices into the database in shards. -/// -/// ## Process -/// Iterates over elements, grouping indices by their partial keys (e.g., `Address` or -/// `Address.StorageKey`). It flushes indices to disk when reaching a shard's max length -/// (`NUM_OF_INDICES_IN_SHARD`) or when the partial key changes, ensuring the last previous partial -/// key shard is stored. 
-pub(crate) fn load_history_indices( - provider: &Provider, - mut collector: Collector, - append_only: bool, - sharded_key_factory: impl Clone + Fn(P, u64) -> ::Key, - decode_key: impl Fn(Vec) -> Result<::Key, DatabaseError>, - get_partial: impl Fn(::Key) -> P, -) -> Result<(), StageError> -where - Provider: DBProvider, - H: Table, - P: Copy + Default + Eq, -{ - let mut write_cursor = provider.tx_ref().cursor_write::()?; - let mut current_partial = None; - let mut current_list = Vec::::new(); - - // observability - let total_entries = collector.len(); - let interval = (total_entries / 10).max(1); - - for (index, element) in collector.iter()?.enumerate() { - let (k, v) = element?; - let sharded_key = decode_key(k)?; - let new_list = BlockNumberList::decompress_owned(v)?; - - if index > 0 && index.is_multiple_of(interval) && total_entries > 10 { - info!(target: "sync::stages::index_history", progress = %format!("{:.2}%", (index as f64 / total_entries as f64) * 100.0), "Writing indices"); - } - - // AccountsHistory: `Address`. - // StorageHistory: `Address.StorageKey`. - let partial_key = get_partial(sharded_key); - - if current_partial != Some(partial_key) { - // We have reached the end of this subset of keys so - // we need to flush its last indice shard. - if let Some(current) = current_partial { - load_indices( - &mut write_cursor, - current, - &mut current_list, - &sharded_key_factory, - append_only, - LoadMode::Flush, - )?; - } - - current_partial = Some(partial_key); - current_list.clear(); - - // If it's not the first sync, there might an existing shard already, so we need to - // merge it with the one coming from the collector - if !append_only && - let Some((_, last_database_shard)) = - write_cursor.seek_exact(sharded_key_factory(partial_key, u64::MAX))? 
- { - current_list.extend(last_database_shard.iter()); - } - } - - current_list.extend(new_list.iter()); - load_indices( - &mut write_cursor, - partial_key, - &mut current_list, - &sharded_key_factory, - append_only, - LoadMode::KeepLast, - )?; - } - - // There will be one remaining shard that needs to be flushed to DB. - if let Some(current) = current_partial { - load_indices( - &mut write_cursor, - current, - &mut current_list, - &sharded_key_factory, - append_only, - LoadMode::Flush, - )?; - } - - Ok(()) -} - -/// Shard and insert the indices list according to [`LoadMode`] and its length. -pub(crate) fn load_indices( - cursor: &mut C, - partial_key: P, - list: &mut Vec, - sharded_key_factory: &impl Fn(P, BlockNumber) -> ::Key, - append_only: bool, - mode: LoadMode, -) -> Result<(), StageError> -where - C: DbCursorRO + DbCursorRW, - H: Table, - P: Copy, -{ - if list.len() > NUM_OF_INDICES_IN_SHARD || mode.is_flush() { - let chunks = list - .chunks(NUM_OF_INDICES_IN_SHARD) - .map(|chunks| chunks.to_vec()) - .collect::>>(); - - let mut iter = chunks.into_iter().peekable(); - while let Some(chunk) = iter.next() { - let mut highest = *chunk.last().expect("at least one index"); - - if !mode.is_flush() && iter.peek().is_none() { - *list = chunk; - } else { - if iter.peek().is_none() { - highest = u64::MAX; - } - let key = sharded_key_factory(partial_key, highest); - let value = BlockNumberList::new_pre_sorted(chunk); - - if append_only { - cursor.append(key, &value)?; - } else { - cursor.upsert(key, &value)?; - } - } - } - } - - Ok(()) -} - -/// Mode on how to load index shards into the database. -pub(crate) enum LoadMode { - /// Keep the last shard in memory and don't flush it to the database. - KeepLast, - /// Flush all shards into the database. - Flush, -} - -impl LoadMode { - const fn is_flush(&self) -> bool { - matches!(self, Self::Flush) - } -} - /// Loads account history indices into the database via `EitherWriter`. 
/// -/// Similar to [`load_history_indices`] but works with [`EitherWriter`] to support -/// both MDBX and `RocksDB` backends. +/// Works with [`EitherWriter`] to support both MDBX and `RocksDB` backends. /// /// ## Process /// Iterates over elements, grouping indices by their address. It flushes indices to disk @@ -543,8 +388,7 @@ where /// Loads storage history indices into the database via `EitherWriter`. /// -/// Similar to [`load_history_indices`] but works with [`EitherWriter`] to support -/// both MDBX and `RocksDB` backends. +/// Works with [`EitherWriter`] to support both MDBX and `RocksDB` backends. /// /// ## Process /// Iterates over elements, grouping indices by their (address, `storage_key`) pairs. It flushes diff --git a/crates/storage/provider/src/providers/rocksdb/provider.rs b/crates/storage/provider/src/providers/rocksdb/provider.rs index b9b3b999459..bd887bd962d 100644 --- a/crates/storage/provider/src/providers/rocksdb/provider.rs +++ b/crates/storage/provider/src/providers/rocksdb/provider.rs @@ -724,7 +724,6 @@ impl RocksDBProvider { let iter = self .0 - .db .iterator_cf(cf, IteratorMode::From(start_bytes.as_ref(), rocksdb::Direction::Forward)); let mut result = Vec::new(); From 1f3a4cadea218861a44828a3172c574c034e928f Mon Sep 17 00:00:00 2001 From: yongkangc Date: Tue, 20 Jan 2026 18:47:39 +0000 Subject: [PATCH 11/12] refactor: extract unwind_storage_history_indices to RocksDBProvider Match the same pattern as unwind_account_history_indices - move the HashMap grouping and batch logic from DatabaseProvider into RocksDBProvider for consistency. 
--- .../src/providers/database/provider.rs | 41 ++++--------------- .../src/providers/rocksdb/provider.rs | 31 ++++++++++++++ 2 files changed, 39 insertions(+), 33 deletions(-) diff --git a/crates/storage/provider/src/providers/database/provider.rs b/crates/storage/provider/src/providers/database/provider.rs index 9de97ba87d1..9a41bc243d4 100644 --- a/crates/storage/provider/src/providers/database/provider.rs +++ b/crates/storage/provider/src/providers/database/provider.rs @@ -3005,42 +3005,15 @@ impl HistoryWriter for DatabaseProvi .collect::>(); storage_changesets.sort_by_key(|(address, key, _)| (*address, *key)); - let use_rocksdb = self.cached_storage_settings().storages_history_in_rocksdb; - - if use_rocksdb { + if self.cached_storage_settings().storages_history_in_rocksdb { #[cfg(all(unix, feature = "rocksdb"))] { - let mut key_min_block: alloy_primitives::map::HashMap< - (Address, B256), - BlockNumber, - > = alloy_primitives::map::HashMap::with_capacity_and_hasher( - storage_changesets.len(), - Default::default(), - ); - for &(address, storage_key, block_number) in &storage_changesets { - key_min_block - .entry((address, storage_key)) - .and_modify(|min| *min = (*min).min(block_number)) - .or_insert(block_number); - } - - let mut batch = self.rocksdb_provider.batch(); - for ((address, storage_key), min_block) in key_min_block { - match min_block.checked_sub(1) { - Some(keep_to) => { - batch.unwind_storage_history_to(address, storage_key, keep_to)? - } - None => batch.clear_storage_history(address, storage_key)?, - } - } - self.pending_rocksdb_batches.lock().push(batch.into_inner()); + let batch = + self.rocksdb_provider.unwind_storage_history_indices(&storage_changesets)?; + self.pending_rocksdb_batches.lock().push(batch); } - - #[cfg(not(all(unix, feature = "rocksdb")))] - return Err(ProviderError::UnsupportedProvider); - } - - if !use_rocksdb { + } else { + // Unwind the storage history index in MDBX. 
let mut cursor = self.tx.cursor_write::<tables::StoragesHistory>()?; for &(address, storage_key, rem_index) in &storage_changesets { let partial_shard = unwind_history_shards::<_, tables::StoragesHistory, _>( @@ -3053,6 +3026,8 @@ impl HistoryWriter for DatabaseProvi }, )?; + // Check the last returned partial shard. + // If it's not empty, the shard needs to be reinserted. if !partial_shard.is_empty() { cursor.insert( StorageShardedKey::last(address, storage_key), diff --git a/crates/storage/provider/src/providers/rocksdb/provider.rs b/crates/storage/provider/src/providers/rocksdb/provider.rs index bd887bd962d..db0c55957c3 100644 --- a/crates/storage/provider/src/providers/rocksdb/provider.rs +++ b/crates/storage/provider/src/providers/rocksdb/provider.rs @@ -785,6 +785,37 @@ impl RocksDBProvider { Ok(batch.into_inner()) } + /// Unwinds storage history indices for the given `(address, storage_key, block_number)` tuples. + /// + /// Groups by `(address, storage_key)` and finds the minimum block number for each. + /// For each key, keeps only blocks less than the minimum block + /// (i.e., removes the minimum block and all higher blocks). + /// + /// Returns a `WriteBatchWithTransaction` that can be committed later.
+ pub fn unwind_storage_history_indices( + &self, + storage_changesets: &[(Address, B256, BlockNumber)], + ) -> ProviderResult<WriteBatchWithTransaction<true>> { + let mut key_min_block: HashMap<(Address, B256), BlockNumber> = + HashMap::with_capacity_and_hasher(storage_changesets.len(), Default::default()); + for &(address, storage_key, block_number) in storage_changesets { + key_min_block + .entry((address, storage_key)) + .and_modify(|min| *min = (*min).min(block_number)) + .or_insert(block_number); + } + + let mut batch = self.batch(); + for ((address, storage_key), min_block) in key_min_block { + match min_block.checked_sub(1) { + Some(keep_to) => batch.unwind_storage_history_to(address, storage_key, keep_to)?, + None => batch.clear_storage_history(address, storage_key)?, + } + } + + Ok(batch.into_inner()) + } + /// Writes a batch of operations atomically. pub fn write_batch<F>(&self, f: F) -> ProviderResult<()> where From 05187c3a74bfb343764bb8619c9049de08732334 Mon Sep 17 00:00:00 2001 From: yongkangc Date: Tue, 20 Jan 2026 18:52:37 +0000 Subject: [PATCH 12/12] fix: remove unresolved doc links to deleted load_indices/LoadMode --- crates/stages/stages/src/stages/utils.rs | 8 -------- 1 file changed, 8 deletions(-) diff --git a/crates/stages/stages/src/stages/utils.rs b/crates/stages/stages/src/stages/utils.rs index 36aaa1493ee..c5a8dee347c 100644 --- a/crates/stages/stages/src/stages/utils.rs +++ b/crates/stages/stages/src/stages/utils.rs @@ -252,8 +252,6 @@ where /// Only flushes when we have more than one shard's worth of data, keeping the last /// (possibly partial) shard for continued accumulation. This avoids writing a shard /// that may need to be updated when more indices arrive. -/// -/// Equivalent to [`load_indices`] with [`LoadMode::KeepLast`].
fn flush_account_history_shards_partial( address: Address, list: &mut Vec<u64>, @@ -310,8 +308,6 @@ where /// /// The `u64::MAX` key for the final shard is an invariant that allows `seek_exact(address, /// u64::MAX)` to find the last shard during incremental sync for merging with new indices. -/// -/// Equivalent to [`load_indices`] with [`LoadMode::Flush`]. fn flush_account_history_shards( address: Address, list: &mut Vec<u64>, @@ -477,8 +473,6 @@ where /// Only flushes when we have more than one shard's worth of data, keeping the last /// (possibly partial) shard for continued accumulation. This avoids writing a shard /// that may need to be updated when more indices arrive. -/// -/// Equivalent to [`load_indices`] with [`LoadMode::KeepLast`]. fn flush_storage_history_shards_partial( address: Address, storage_key: B256, @@ -537,8 +531,6 @@ where /// The `u64::MAX` key for the final shard is an invariant that allows /// `seek_exact(address, storage_key, u64::MAX)` to find the last shard during incremental /// sync for merging with new indices. -/// -/// Equivalent to [`load_indices`] with [`LoadMode::Flush`]. fn flush_storage_history_shards( address: Address, storage_key: B256,