diff --git a/crates/stages/stages/src/stages/utils.rs b/crates/stages/stages/src/stages/utils.rs index 74e9b6b679a..97766b66b73 100644 --- a/crates/stages/stages/src/stages/utils.rs +++ b/crates/stages/stages/src/stages/utils.rs @@ -1,21 +1,28 @@ //! Utils for `stages`. -use alloy_primitives::{Address, BlockNumber, TxNumber}; +use alloy_primitives::{Address, BlockNumber, TxNumber, B256}; use reth_config::config::EtlConfig; use reth_db_api::{ cursor::{DbCursorRO, DbCursorRW}, - models::{sharded_key::NUM_OF_INDICES_IN_SHARD, AccountBeforeTx, ShardedKey}, + models::{ + sharded_key::NUM_OF_INDICES_IN_SHARD, storage_sharded_key::StorageShardedKey, + AccountBeforeTx, ShardedKey, + }, table::{Decompress, Table}, + tables, transaction::{DbTx, DbTxMut}, BlockNumberList, DatabaseError, }; use reth_etl::Collector; +use reth_primitives_traits::NodePrimitives; use reth_provider::{ - providers::StaticFileProvider, to_range, BlockReader, DBProvider, ProviderError, - StaticFileProviderFactory, + make_rocksdb_batch_arg, make_rocksdb_provider, providers::StaticFileProvider, + register_rocksdb_batch, to_range, BlockReader, DBProvider, EitherWriter, ProviderError, + RocksDBProviderFactory, StaticFileProviderFactory, StorageSettingsCache, }; use reth_stages_api::StageError; use reth_static_file_types::StaticFileSegment; -use reth_storage_api::ChangeSetReader; +use reth_storage_api::{ChangeSetReader, NodePrimitivesProvider}; +use reth_storage_errors::provider::ProviderResult; use std::{collections::HashMap, hash::Hash, ops::RangeBounds}; use tracing::info; @@ -112,6 +119,46 @@ where Ok::<(), StageError>(()) } +/// Generic shard-and-write helper used by both account and storage history loaders. +/// +/// Chunks the list into shards, writes each shard via the provided write function, +/// and handles the last shard according to [`LoadMode`]. 
+fn shard_and_write( + list: &mut Vec, + mode: LoadMode, + mut write_fn: F, +) -> Result<(), StageError> +where + F: FnMut(Vec, BlockNumber) -> Result<(), StageError>, +{ + if list.len() <= NUM_OF_INDICES_IN_SHARD && !mode.is_flush() { + return Ok(()); + } + + let total = list.len(); + let mut start = 0; + + while start < total { + let end = (start + NUM_OF_INDICES_IN_SHARD).min(total); + let is_last = end == total; + let chunk = list[start..end].to_vec(); + + let highest = *chunk.last().expect("at least one index"); + + if !mode.is_flush() && is_last { + *list = chunk; + break; + } + + let highest = if is_last { u64::MAX } else { highest }; + write_fn(chunk, highest)?; + + start = end; + } + + Ok(()) +} + /// Collects account history indices using a provider that implements `ChangeSetReader`. pub(crate) fn collect_account_history_indices( provider: &Provider, @@ -179,6 +226,7 @@ where /// `Address.StorageKey`). It flushes indices to disk when reaching a shard's max length /// (`NUM_OF_INDICES_IN_SHARD`) or when the partial key changes, ensuring the last previous partial /// key shard is stored. +#[allow(dead_code)] pub(crate) fn load_history_indices( provider: &Provider, mut collector: Collector, @@ -263,6 +311,7 @@ where } /// Shard and insert the indices list according to [`LoadMode`] and its length. +#[allow(dead_code)] pub(crate) fn load_indices( cursor: &mut C, partial_key: P, @@ -321,6 +370,289 @@ impl LoadMode { } } +/// Loads storage history indices from a collector into the database using `EitherWriter`. +/// +/// This is a specialized version of [`load_history_indices`] for `tables::StoragesHistory` +/// that supports writing to either `MDBX` or `RocksDB` based on storage settings. 
+#[allow(dead_code)] +pub(crate) fn load_storages_history_indices( + provider: &Provider, + mut collector: Collector< + ::Key, + ::Value, + >, + append_only: bool, + sharded_key_factory: impl Clone + Fn(P, u64) -> StorageShardedKey, + decode_key: impl Fn(Vec) -> Result, + get_partial: impl Fn(StorageShardedKey) -> P, +) -> Result<(), StageError> +where + Provider: DBProvider + + NodePrimitivesProvider + + StorageSettingsCache + + RocksDBProviderFactory, + P: Copy + Default + Eq, +{ + // Create EitherWriter for storage history + #[allow(clippy::let_unit_value)] + let rocksdb = make_rocksdb_provider(provider); + #[allow(clippy::let_unit_value)] + let rocksdb_batch = make_rocksdb_batch_arg(&rocksdb); + let mut writer = EitherWriter::new_storages_history(provider, rocksdb_batch)?; + + // Create read cursor for checking existing shards + let mut read_cursor = provider.tx_ref().cursor_read::()?; + + let mut current_partial = P::default(); + let mut current_list = Vec::::new(); + + // observability + let total_entries = collector.len(); + let interval = (total_entries / 10).max(1); + + for (index, element) in collector.iter()?.enumerate() { + let (k, v) = element?; + let sharded_key = decode_key(k)?; + let new_list = BlockNumberList::decompress_owned(v)?; + + if index > 0 && index.is_multiple_of(interval) && total_entries > 10 { + info!(target: "sync::stages::index_history", progress = %format!("{:.2}%", (index as f64 / total_entries as f64) * 100.0), "Writing storage history indices"); + } + + let partial_key = get_partial(sharded_key); + + if current_partial != partial_key { + // Flush last shard for previous partial key + load_storages_history_shard( + &mut writer, + current_partial, + &mut current_list, + &sharded_key_factory, + append_only, + LoadMode::Flush, + )?; + + current_partial = partial_key; + current_list.clear(); + + // If not first sync, merge with existing shard + if !append_only && + let Some((_, last_database_shard)) = + 
read_cursor.seek_exact(sharded_key_factory(current_partial, u64::MAX))? + { + current_list.extend(last_database_shard.iter()); + } + } + + current_list.extend(new_list.iter()); + load_storages_history_shard( + &mut writer, + current_partial, + &mut current_list, + &sharded_key_factory, + append_only, + LoadMode::KeepLast, + )?; + } + + // Flush remaining shard + load_storages_history_shard( + &mut writer, + current_partial, + &mut current_list, + &sharded_key_factory, + append_only, + LoadMode::Flush, + )?; + + // Register RocksDB batch for commit + register_rocksdb_batch(provider, writer); + + Ok(()) +} + +/// Shard and insert storage history indices according to [`LoadMode`] and list length. +#[allow(dead_code)] +fn load_storages_history_shard( + writer: &mut EitherWriter<'_, CURSOR, N>, + partial_key: P, + list: &mut Vec, + sharded_key_factory: &impl Fn(P, BlockNumber) -> StorageShardedKey, + _append_only: bool, + mode: LoadMode, +) -> Result<(), StageError> +where + N: NodePrimitives, + CURSOR: DbCursorRW + DbCursorRO, + P: Copy, +{ + shard_and_write(list, mode, |chunk, highest| { + let key = sharded_key_factory(partial_key, highest); + let value = BlockNumberList::new_pre_sorted(chunk); + Ok(writer.put_storage_history(key, &value)?) + }) +} + +/// Loads account history indices from a collector into the database using `EitherWriter`. +/// +/// This is a specialized version of [`load_history_indices`] for `tables::AccountsHistory` +/// that supports writing to either `MDBX` or `RocksDB` based on storage settings. +#[allow(dead_code)] +pub(crate) fn load_accounts_history_indices( + provider: &Provider, + mut collector: Collector< + ::Key, + ::Value, + >, + append_only: bool, + sharded_key_factory: impl Clone + Fn(P, u64) -> ShardedKey
<Address>,
+    decode_key: impl Fn(Vec<u8>) -> Result<ShardedKey<Address>, DatabaseError>,
+    get_partial: impl Fn(ShardedKey<Address>
) -> P, +) -> Result<(), StageError> +where + Provider: DBProvider + + NodePrimitivesProvider + + StorageSettingsCache + + RocksDBProviderFactory, + P: Copy + Default + Eq, +{ + // Create EitherWriter for account history + #[allow(clippy::let_unit_value)] + let rocksdb = make_rocksdb_provider(provider); + #[allow(clippy::let_unit_value)] + let rocksdb_batch = make_rocksdb_batch_arg(&rocksdb); + let mut writer = EitherWriter::new_accounts_history(provider, rocksdb_batch)?; + + // Create read cursor for checking existing shards + let mut read_cursor = provider.tx_ref().cursor_read::()?; + + let mut current_partial = P::default(); + let mut current_list = Vec::::new(); + + // observability + let total_entries = collector.len(); + let interval = (total_entries / 10).max(1); + + for (index, element) in collector.iter()?.enumerate() { + let (k, v) = element?; + let sharded_key = decode_key(k)?; + let new_list = BlockNumberList::decompress_owned(v)?; + + if index > 0 && index.is_multiple_of(interval) && total_entries > 10 { + info!(target: "sync::stages::index_history", progress = %format!("{:.2}%", (index as f64 / total_entries as f64) * 100.0), "Writing account history indices"); + } + + let partial_key = get_partial(sharded_key); + + if current_partial != partial_key { + // Flush last shard for previous partial key + load_accounts_history_shard( + &mut writer, + current_partial, + &mut current_list, + &sharded_key_factory, + append_only, + LoadMode::Flush, + )?; + + current_partial = partial_key; + current_list.clear(); + + // If not first sync, merge with existing shard + if !append_only && + let Some((_, last_database_shard)) = + read_cursor.seek_exact(sharded_key_factory(current_partial, u64::MAX))? 
+ { + current_list.extend(last_database_shard.iter()); + } + } + + current_list.extend(new_list.iter()); + load_accounts_history_shard( + &mut writer, + current_partial, + &mut current_list, + &sharded_key_factory, + append_only, + LoadMode::KeepLast, + )?; + } + + // Flush remaining shard + load_accounts_history_shard( + &mut writer, + current_partial, + &mut current_list, + &sharded_key_factory, + append_only, + LoadMode::Flush, + )?; + + // Register RocksDB batch for commit + register_rocksdb_batch(provider, writer); + + Ok(()) +} + +/// Shard and insert account history indices according to [`LoadMode`] and list length. +#[allow(dead_code)] +fn load_accounts_history_shard( + writer: &mut EitherWriter<'_, CURSOR, N>, + partial_key: P, + list: &mut Vec, + sharded_key_factory: &impl Fn(P, BlockNumber) -> ShardedKey
, + _append_only: bool, + mode: LoadMode, +) -> Result<(), StageError> +where + N: NodePrimitives, + CURSOR: DbCursorRW + DbCursorRO, + P: Copy, +{ + shard_and_write(list, mode, |chunk, highest| { + let key = sharded_key_factory(partial_key, highest); + let value = BlockNumberList::new_pre_sorted(chunk); + Ok(writer.put_account_history(key, &value)?) + }) +} + +/// Unwinds storage history shards using `EitherWriter` for `RocksDB` support. +/// +/// This reimplements the shard unwinding logic with support for both MDBX and `RocksDB`. +/// Walks through shards for a given key, deleting those >= unwind point and preserving +/// indices below the unwind point. +#[allow(dead_code)] +pub(crate) fn unwind_storages_history_shards( + writer: &mut EitherWriter<'_, CURSOR, N>, + address: Address, + storage_key: B256, + block_number: BlockNumber, +) -> ProviderResult<()> +where + N: NodePrimitives, + CURSOR: DbCursorRW + DbCursorRO, +{ + writer.unwind_storage_history_shards(address, storage_key, block_number) +} + +/// Unwinds account history shards using `EitherWriter` for `RocksDB` support. +/// +/// This reimplements the shard unwinding logic with support for both MDBX and `RocksDB`. +/// Walks through shards for a given key, deleting those >= unwind point and preserving +/// indices below the unwind point. +#[allow(dead_code)] +pub(crate) fn unwind_accounts_history_shards( + writer: &mut EitherWriter<'_, CURSOR, N>, + address: Address, + block_number: BlockNumber, +) -> ProviderResult<()> +where + N: NodePrimitives, + CURSOR: DbCursorRW + DbCursorRO, +{ + writer.unwind_account_history_shards(address, block_number) +} + /// Called when database is ahead of static files. Attempts to find the first block we are missing /// transactions for. 
pub(crate) fn missing_static_data_error( diff --git a/crates/storage/db-api/src/models/metadata.rs b/crates/storage/db-api/src/models/metadata.rs index 6586c8b7f46..7488ea14b85 100644 --- a/crates/storage/db-api/src/models/metadata.rs +++ b/crates/storage/db-api/src/models/metadata.rs @@ -44,9 +44,9 @@ impl StorageSettings { receipts_in_static_files: true, transaction_senders_in_static_files: true, account_changesets_in_static_files: true, - storages_history_in_rocksdb: false, - transaction_hash_numbers_in_rocksdb: false, - account_history_in_rocksdb: false, + storages_history_in_rocksdb: true, + transaction_hash_numbers_in_rocksdb: true, + account_history_in_rocksdb: true, } } diff --git a/crates/storage/provider/src/either_writer.rs b/crates/storage/provider/src/either_writer.rs index a5f612eda1e..66286cf6196 100644 --- a/crates/storage/provider/src/either_writer.rs +++ b/crates/storage/provider/src/either_writer.rs @@ -38,6 +38,71 @@ use reth_storage_api::{ChangeSetReader, DBProvider, NodePrimitivesProvider, Stor use reth_storage_errors::provider::ProviderResult; use strum::{Display, EnumIs}; +/// Collects shards to unwind from a `RocksDB` reverse iterator. +/// +/// This is a generic helper for the `RocksDB` unwind logic used by both account and storage +/// history. It iterates through shards from highest to lowest block number, collecting shards to +/// delete and identifying any partial shard that needs to be preserved. 
+/// +/// # Arguments +/// * `iter` - An iterator yielding `(K, BlockNumberList)` pairs in reverse order +/// * `belongs_to_target` - Predicate that returns `true` if the key belongs to the target being +/// unwound +/// * `highest_block_number` - Function to extract the highest block number from the key +/// * `block_number` - The unwind target block number +/// +/// # Returns +/// A tuple of `(shards_to_delete, partial_shard_to_keep)` where: +/// - `shards_to_delete` contains all keys that should be deleted +/// - `partial_shard_to_keep` contains block numbers to preserve if a boundary shard was found +#[cfg(all(unix, feature = "rocksdb"))] +fn collect_shards_for_unwind( + iter: I, + belongs_to_target: impl Fn(&K) -> bool, + highest_block_number: impl Fn(&K) -> BlockNumber, + block_number: BlockNumber, +) -> Result<(Vec, Option>), E> +where + I: Iterator>, +{ + let mut shards_to_delete = Vec::new(); + let mut partial_shard_to_keep: Option> = None; + + for result in iter { + let (key, list) = result?; + + if !belongs_to_target(&key) { + break; + } + + shards_to_delete.push(key); + let key = shards_to_delete.last().unwrap(); + + let first = list.iter().next().expect("List can't be empty"); + + // Case 1: Entire shard is at or above the unwinding point - keep it deleted + if first >= block_number { + continue; + } + + // Case 2: Boundary shard - spans across the unwinding point + if block_number <= highest_block_number(key) { + let indices_to_keep: Vec<_> = list.iter().take_while(|i| *i < block_number).collect(); + if !indices_to_keep.is_empty() { + partial_shard_to_keep = Some(indices_to_keep); + } + break; + } + + // Case 3: Entire shard is below the unwinding point - keep all indices + let indices_to_keep: Vec<_> = list.iter().collect(); + partial_shard_to_keep = Some(indices_to_keep); + break; + } + + Ok((shards_to_delete, partial_shard_to_keep)) +} + /// Type alias for [`EitherReader`] constructors. type EitherReaderTy<'a, P, T> = EitherReader<'a, CursorTy<
<P as DBProvider>
::Tx, T>,
<P as NodePrimitivesProvider>
::Primitives>; @@ -108,6 +173,67 @@ pub enum EitherWriter<'a, CURSOR, N> { RocksDB(RocksDBBatch<'a>), } +/// Creates a `RocksDB` batch from the provider for use in [`EitherWriter`] constructors. +/// +/// On `RocksDB`-enabled builds, returns a real batch. +/// On other builds, returns `()` to allow the same API without feature gates. +/// +/// The `rocksdb` parameter should be obtained from [`make_rocksdb_provider`]. +#[cfg(all(unix, feature = "rocksdb"))] +pub fn make_rocksdb_batch_arg( + rocksdb: &crate::providers::rocksdb::RocksDBProvider, +) -> RocksBatchArg<'_> { + rocksdb.batch() +} + +/// Stub for non-`RocksDB` builds. +#[cfg(not(all(unix, feature = "rocksdb")))] +pub const fn make_rocksdb_batch_arg(_rocksdb: &T) -> RocksBatchArg<'static> {} + +/// Gets the `RocksDB` provider from a provider that implements [`crate::RocksDBProviderFactory`]. +/// +/// On `RocksDB`-enabled builds, returns the real provider. +/// On other builds, returns `()` to allow the same API without feature gates. +/// +/// This should be called first, and the result passed to [`make_rocksdb_batch_arg`]. +/// The returned value must be kept alive for as long as the batch is used. +#[cfg(all(unix, feature = "rocksdb"))] +pub fn make_rocksdb_provider
<P>
(provider: &P) -> crate::providers::rocksdb::RocksDBProvider +where + P: crate::RocksDBProviderFactory, +{ + provider.rocksdb_provider() +} + +/// Stub for non-`RocksDB` builds. +#[cfg(not(all(unix, feature = "rocksdb")))] +pub const fn make_rocksdb_provider
<P>
(_provider: &P) {} + +/// Registers a `RocksDB` batch extracted from an [`EitherWriter`] with the provider. +/// +/// This should be called after operations on an [`EitherWriter`] that may use `RocksDB`, +/// to ensure the batch is committed when the provider commits. +/// +/// On non-`RocksDB` builds, this is a no-op. +#[cfg(all(unix, feature = "rocksdb"))] +pub fn register_rocksdb_batch(provider: &P, writer: EitherWriter<'_, CURSOR, N>) +where + P: crate::RocksDBProviderFactory, + N: NodePrimitives, +{ + if let Some(batch) = writer.into_raw_rocksdb_batch() { + provider.set_pending_rocksdb_batch(batch); + } +} + +/// Stub for non-`RocksDB` builds. +#[cfg(not(all(unix, feature = "rocksdb")))] +pub fn register_rocksdb_batch(_provider: &P, _writer: EitherWriter<'_, CURSOR, N>) +where + N: NodePrimitives, +{ +} + impl<'a> EitherWriter<'a, (), ()> { /// Creates a new [`EitherWriter`] for receipts based on storage settings and prune modes. pub fn new_receipts
<P>
( @@ -479,6 +605,99 @@ where Self::RocksDB(batch) => batch.delete::(key), } } + + /// Unwinds storage history shards for a given address and storage key. + /// + /// Walks through all shards for the given key, collecting indices below the unwind point, + /// then deletes all shards and reinserts the kept indices as a single sentinel shard. + pub fn unwind_storage_history_shards( + &mut self, + address: Address, + storage_key: B256, + block_number: BlockNumber, + ) -> ProviderResult<()> { + let start_key = StorageShardedKey::last(address, storage_key); + + match self { + Self::Database(cursor) => { + // Walk through shards from highest to lowest, following the same algorithm + // as unwind_history_shards in provider.rs + let mut item = cursor.seek_exact(start_key.clone())?; + + while let Some((sharded_key, list)) = item { + // Check if shard belongs to this (address, storage_key) + if sharded_key.address != address || sharded_key.sharded_key.key != storage_key + { + break; + } + + // Delete this shard + cursor.delete_current()?; + + // Get the first (lowest) block number in this shard + let first = list.iter().next().expect("List can't be empty"); + + // Case 1: Entire shard is at or above the unwinding point + // Keep it deleted (already done above) and continue to next shard + if first >= block_number { + item = cursor.prev()?; + continue; + } + + // Case 2: Boundary shard - spans across the unwinding point + // Reinsert only indices below unwind point, then STOP + if block_number <= sharded_key.sharded_key.highest_block_number { + let indices_to_keep: Vec<_> = + list.iter().take_while(|i| *i < block_number).collect(); + if !indices_to_keep.is_empty() { + cursor.insert( + start_key, + &BlockNumberList::new_pre_sorted(indices_to_keep), + )?; + } + return Ok(()); + } + + // Case 3: Entire shard is below the unwinding point + // Reinsert all indices, then STOP (preserves earlier shards) + let indices_to_keep: Vec<_> = list.iter().collect(); + cursor.insert(start_key, 
&BlockNumberList::new_pre_sorted(indices_to_keep))?; + return Ok(()); + } + + Ok(()) + } + Self::StaticFile(..) => Err(ProviderError::UnsupportedProvider), + #[cfg(all(unix, feature = "rocksdb"))] + Self::RocksDB(batch) => { + let provider = batch.provider(); + let iter = + provider.iter_from_reverse::(start_key.clone())?; + + let (shards_to_delete, partial_shard_to_keep) = collect_shards_for_unwind( + iter, + |k: &StorageShardedKey| { + k.address == address && k.sharded_key.key == storage_key + }, + |k| k.sharded_key.highest_block_number, + block_number, + )?; + + for key in shards_to_delete { + batch.delete::(key)?; + } + + if let Some(indices) = partial_shard_to_keep { + batch.put::( + start_key, + &BlockNumberList::new_pre_sorted(indices), + )?; + } + + Ok(()) + } + } + } } impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N> @@ -513,6 +732,92 @@ where Self::RocksDB(batch) => batch.delete::(key), } } + + /// Unwinds account history shards for a given address. + /// + /// Walks through all shards for the given address, following the same algorithm + /// as `unwind_history_shards` in provider.rs: only delete/modify shards at or above + /// the unwind point, preserving earlier shards. 
+ pub fn unwind_account_history_shards( + &mut self, + address: Address, + block_number: BlockNumber, + ) -> ProviderResult<()> { + let start_key = ShardedKey::last(address); + + match self { + Self::Database(cursor) => { + // Walk through shards from highest to lowest + let mut item = cursor.seek_exact(start_key.clone())?; + + while let Some((sharded_key, list)) = item { + // Check if shard belongs to this address + if sharded_key.key != address { + break; + } + + // Delete this shard + cursor.delete_current()?; + + // Get the first (lowest) block number in this shard + let first = list.iter().next().expect("List can't be empty"); + + // Case 1: Entire shard is at or above the unwinding point + if first >= block_number { + item = cursor.prev()?; + continue; + } + + // Case 2: Boundary shard - spans across the unwinding point + if block_number <= sharded_key.highest_block_number { + let indices_to_keep: Vec<_> = + list.iter().take_while(|i| *i < block_number).collect(); + if !indices_to_keep.is_empty() { + cursor.insert( + start_key, + &BlockNumberList::new_pre_sorted(indices_to_keep), + )?; + } + return Ok(()); + } + + // Case 3: Entire shard is below the unwinding point + let indices_to_keep: Vec<_> = list.iter().collect(); + cursor.insert(start_key, &BlockNumberList::new_pre_sorted(indices_to_keep))?; + return Ok(()); + } + + Ok(()) + } + Self::StaticFile(..) => Err(ProviderError::UnsupportedProvider), + #[cfg(all(unix, feature = "rocksdb"))] + Self::RocksDB(batch) => { + let provider = batch.provider(); + let iter = + provider.iter_from_reverse::(start_key.clone())?; + + let (shards_to_delete, partial_shard_to_keep) = collect_shards_for_unwind( + iter, + |k: &ShardedKey
<Address>
| k.key == address, + |k| k.highest_block_number, + block_number, + )?; + + for key in shards_to_delete { + batch.delete::(key)?; + } + + if let Some(indices) = partial_shard_to_keep { + batch.put::( + start_key, + &BlockNumberList::new_pre_sorted(indices), + )?; + } + + Ok(()) + } + } + } } impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N> diff --git a/crates/storage/provider/src/providers/database/provider.rs b/crates/storage/provider/src/providers/database/provider.rs index 4b23735da27..1710fcb127d 100644 --- a/crates/storage/provider/src/providers/database/provider.rs +++ b/crates/storage/provider/src/providers/database/provider.rs @@ -471,7 +471,7 @@ impl DatabaseProvider(start.elapsed()) }); - // RocksDB writes + // RocksDB writes (batches are pushed to pending_batches inside write_blocks_data) let rocksdb_handle = rocksdb_ctx.storage_settings.any_in_rocksdb().then(|| { s.spawn(|| { let start = Instant::now(); @@ -543,10 +543,11 @@ impl DatabaseProvider( + &self, + key: T::Key, + ) -> ProviderResult> { + let cf = self.get_cf_handle::()?; + let encoded_key = key.encode(); + let iter = self + .0 + .db + .iterator_cf(cf, IteratorMode::From(encoded_key.as_ref(), rocksdb::Direction::Reverse)); + Ok(RocksDBIterReverse { inner: iter, _marker: std::marker::PhantomData }) + } + /// Writes all `RocksDB` data for multiple blocks in parallel. 
/// /// This handles transaction hash numbers, account history, and storage history based on @@ -592,10 +612,10 @@ impl RocksDBProvider { account_history.entry(address).or_default().push(block_number); } } - for (address, blocks) in account_history { - let key = ShardedKey::new(address, u64::MAX); - let value = BlockNumberList::new_pre_sorted(blocks); - batch.put::(key, &value)?; + + // Write account history using proper shard append logic + for (address, indices) in account_history { + batch.append_account_history_shard(address, indices)?; } ctx.pending_batches.lock().push(batch.into_inner()); Ok(()) @@ -620,10 +640,10 @@ impl RocksDBProvider { } } } - for ((address, slot), blocks) in storage_history { - let key = StorageShardedKey::new(address, slot, u64::MAX); - let value = BlockNumberList::new_pre_sorted(blocks); - batch.put::(key, &value)?; + + // Write storage history using proper shard append logic + for ((address, slot), indices) in storage_history { + batch.append_storage_history_shard(address, slot, indices)?; } ctx.pending_batches.lock().push(batch.into_inner()); Ok(()) @@ -714,6 +734,91 @@ impl<'a> RocksDBBatch<'a> { pub fn into_inner(self) -> WriteBatchWithTransaction { self.inner } + + /// Appends indices to an account history shard with proper shard management. + /// + /// Loads the existing shard (if any), appends new indices, and rechunks into + /// multiple shards if needed (respecting `NUM_OF_INDICES_IN_SHARD` limit). 
+ pub fn append_account_history_shard( + &mut self, + address: Address, + indices: impl IntoIterator, + ) -> ProviderResult<()> { + let last_key = ShardedKey::new(address, u64::MAX); + let last_shard_opt = self.provider.get::(last_key.clone())?; + let mut last_shard = last_shard_opt.unwrap_or_else(BlockNumberList::empty); + + last_shard.append(indices).map_err(ProviderError::other)?; + + // Fast path: all indices fit in one shard + if last_shard.len() <= NUM_OF_INDICES_IN_SHARD as u64 { + self.put::(last_key, &last_shard)?; + return Ok(()); + } + + // Slow path: rechunk into multiple shards + let chunks = last_shard.iter().chunks(NUM_OF_INDICES_IN_SHARD); + let mut chunks_peekable = chunks.into_iter().peekable(); + + while let Some(chunk) = chunks_peekable.next() { + let shard = BlockNumberList::new_pre_sorted(chunk); + let highest_block_number = if chunks_peekable.peek().is_some() { + shard.iter().next_back().expect("`chunks` does not return empty list") + } else { + u64::MAX + }; + + self.put::( + ShardedKey::new(address, highest_block_number), + &shard, + )?; + } + + Ok(()) + } + + /// Appends indices to a storage history shard with proper shard management. + /// + /// Loads the existing shard (if any), appends new indices, and rechunks into + /// multiple shards if needed (respecting `NUM_OF_INDICES_IN_SHARD` limit). 
+ pub fn append_storage_history_shard( + &mut self, + address: Address, + storage_key: B256, + indices: impl IntoIterator, + ) -> ProviderResult<()> { + let last_key = StorageShardedKey::last(address, storage_key); + let last_shard_opt = self.provider.get::(last_key.clone())?; + let mut last_shard = last_shard_opt.unwrap_or_else(BlockNumberList::empty); + + last_shard.append(indices).map_err(ProviderError::other)?; + + // Fast path: all indices fit in one shard + if last_shard.len() <= NUM_OF_INDICES_IN_SHARD as u64 { + self.put::(last_key, &last_shard)?; + return Ok(()); + } + + // Slow path: rechunk into multiple shards + let chunks = last_shard.iter().chunks(NUM_OF_INDICES_IN_SHARD); + let mut chunks_peekable = chunks.into_iter().peekable(); + + while let Some(chunk) = chunks_peekable.next() { + let shard = BlockNumberList::new_pre_sorted(chunk); + let highest_block_number = if chunks_peekable.peek().is_some() { + shard.iter().next_back().expect("`chunks` does not return empty list") + } else { + u64::MAX + }; + + self.put::( + StorageShardedKey::new(address, storage_key, highest_block_number), + &shard, + )?; + } + + Ok(()) + } } /// `RocksDB` transaction wrapper providing MDBX-like semantics. @@ -1030,6 +1135,60 @@ impl Iterator for RocksDBIter<'_, T> { } } +/// Result type for raw iterator items. +type RocksDBRawIterResult = Result<(Box<[u8]>, Box<[u8]>), rocksdb::Error>; + +/// Decodes an iterator result from `RocksDB` into a table key-value pair. 
+fn decode_iter_result( + result: RocksDBRawIterResult, +) -> Option> { + let (key_bytes, value_bytes) = match result { + Ok(kv) => kv, + Err(e) => { + return Some(Err(ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo { + message: e.to_string().into(), + code: -1, + })))) + } + }; + + // Decode key + let key = match ::decode(&key_bytes) { + Ok(k) => k, + Err(_) => return Some(Err(ProviderError::Database(DatabaseError::Decode))), + }; + + // Decompress value + let value = match T::Value::decompress(&value_bytes) { + Ok(v) => v, + Err(_) => return Some(Err(ProviderError::Database(DatabaseError::Decode))), + }; + + Some(Ok((key, value))) +} + +/// Reverse iterator over a `RocksDB` table (non-transactional). +/// +/// Yields decoded `(Key, Value)` pairs in reverse key order. +pub struct RocksDBIterReverse<'db, T: Table> { + inner: rocksdb::DBIteratorWithThreadMode<'db, OptimisticTransactionDB>, + _marker: std::marker::PhantomData, +} + +impl fmt::Debug for RocksDBIterReverse<'_, T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("RocksDBIterReverse").field("table", &T::NAME).finish_non_exhaustive() + } +} + +impl Iterator for RocksDBIterReverse<'_, T> { + type Item = ProviderResult<(T::Key, T::Value)>; + + fn next(&mut self) -> Option { + decode_iter_result::(self.inner.next()?) + } +} + /// Iterator over a `RocksDB` table within a transaction. /// /// Yields decoded `(Key, Value)` pairs. Sees uncommitted writes.