diff --git a/crates/stages/stages/src/stages/index_account_history.rs b/crates/stages/stages/src/stages/index_account_history.rs index 25dbf104456..64e94b62343 100644 --- a/crates/stages/stages/src/stages/index_account_history.rs +++ b/crates/stages/stages/src/stages/index_account_history.rs @@ -1,16 +1,20 @@ -use crate::stages::utils::collect_history_indices; +use crate::stages::utils::{ + collect_account_history_indices, collect_history_indices, load_accounts_history_indices, + unwind_accounts_history_shards, +}; -use super::{collect_account_history_indices, load_history_indices}; use alloy_primitives::Address; use reth_config::config::{EtlConfig, IndexHistoryConfig}; use reth_db_api::{models::ShardedKey, table::Decode, tables, transaction::DbTxMut}; use reth_provider::{ - DBProvider, HistoryWriter, PruneCheckpointReader, PruneCheckpointWriter, StorageSettingsCache, + DBProvider, EitherWriter, HistoryWriter, PruneCheckpointReader, PruneCheckpointWriter, + RocksDBProviderFactory, StaticFileProviderFactory, StorageSettingsCache, }; use reth_prune_types::{PruneCheckpoint, PruneMode, PrunePurpose, PruneSegment}; use reth_stages_api::{ ExecInput, ExecOutput, Stage, StageCheckpoint, StageError, StageId, UnwindInput, UnwindOutput, }; +use reth_storage_api::{ChangeSetReader, NodePrimitivesProvider}; use std::fmt::Debug; use tracing::info; @@ -51,9 +55,11 @@ where + HistoryWriter + PruneCheckpointReader + PruneCheckpointWriter - + reth_storage_api::ChangeSetReader - + reth_provider::StaticFileProviderFactory - + StorageSettingsCache, + + ChangeSetReader + + StaticFileProviderFactory + + StorageSettingsCache + + NodePrimitivesProvider + + RocksDBProviderFactory, { /// Return the id of the stage fn id(&self) -> StageId { @@ -125,7 +131,7 @@ where }; info!(target: "sync::stages::index_account_history::exec", "Loading indices into database"); - load_history_indices::<_, tables::AccountsHistory, _>( + load_accounts_history_indices( provider, collector, first_sync, @@ -146,9 +152,28 @@ where let (range, unwind_progress, _) = input.unwind_block_range_with_threshold(self.commit_threshold); - provider.unwind_account_history_indices_range(range)?; + // Get changed accounts for the unwind range + let mut addresses = std::collections::HashSet::new(); + for block in *range.start()..=*range.end() { + let changesets = provider.account_block_changeset(block)?; + addresses.extend(changesets.into_iter().map(|cs| cs.address)); + } + + // Create EitherWriter for unwinding + #[allow(clippy::let_unit_value)] + let _rocksdb = reth_provider::make_rocksdb_provider!(provider); + #[allow(clippy::let_unit_value)] + let rocksdb_batch = reth_provider::make_rocksdb_batch_arg!(_rocksdb); + let mut writer = EitherWriter::new_accounts_history(provider, rocksdb_batch)?; + + // Unwind shards for each changed address + for address in addresses { + unwind_accounts_history_shards(&mut writer, address, *range.start())?; + } + + // Register batch for commit + writer.register_for_commit(provider); - // from HistoryIndex higher than that number. Ok(UnwindOutput { checkpoint: StageCheckpoint::new(unwind_progress) }) } } diff --git a/crates/stages/stages/src/stages/utils.rs b/crates/stages/stages/src/stages/utils.rs index 74e9b6b679a..af08f6d317c 100644 --- a/crates/stages/stages/src/stages/utils.rs +++ b/crates/stages/stages/src/stages/utils.rs @@ -5,17 +5,20 @@ use reth_db_api::{ cursor::{DbCursorRO, DbCursorRW}, models::{sharded_key::NUM_OF_INDICES_IN_SHARD, AccountBeforeTx, ShardedKey}, table::{Decompress, Table}, + tables, transaction::{DbTx, DbTxMut}, BlockNumberList, DatabaseError, }; use reth_etl::Collector; +use reth_primitives_traits::NodePrimitives; use reth_provider::{ - providers::StaticFileProvider, to_range, BlockReader, DBProvider, ProviderError, - StaticFileProviderFactory, + providers::StaticFileProvider, to_range, BlockReader, DBProvider, EitherWriter, ProviderError, + RocksDBProviderFactory, StaticFileProviderFactory, StorageSettingsCache, }; use reth_stages_api::StageError; use reth_static_file_types::StaticFileSegment; -use reth_storage_api::ChangeSetReader; +use reth_storage_api::{ChangeSetReader, NodePrimitivesProvider}; +use reth_storage_errors::provider::ProviderResult; use std::{collections::HashMap, hash::Hash, ops::RangeBounds}; use tracing::info; @@ -356,3 +359,154 @@ where segment, }) } + +/// Loads account history indices from a collector into the provider using [`EitherWriter`]. +/// +/// This function handles both MDBX and `RocksDB` storage backends transparently. +pub(crate) fn load_accounts_history_indices( + provider: &Provider, + mut collector: Collector, BlockNumberList>, + append_only: bool, + sharded_key_factory: impl Clone + Fn(Address, u64) -> ShardedKey
, + decode_key: impl Fn(Vec) -> Result, DatabaseError>, + get_partial: impl Fn(ShardedKey
) -> Address, +) -> Result<(), StageError> +where + Provider: DBProvider + + NodePrimitivesProvider + + StorageSettingsCache + + RocksDBProviderFactory, +{ + // Create EitherWriter for accounts history + #[allow(clippy::let_unit_value)] + let _rocksdb = reth_provider::make_rocksdb_provider!(provider); + #[allow(clippy::let_unit_value)] + let rocksdb_batch = reth_provider::make_rocksdb_batch_arg!(_rocksdb); + let mut writer = EitherWriter::new_accounts_history(provider, rocksdb_batch)?; + + let mut current_partial = Address::default(); + let mut current_list = Vec::::new(); + + // observability + let total_entries = collector.len(); + let interval = (total_entries / 10).max(1); + + for (index, element) in collector.iter()?.enumerate() { + let (k, v) = element?; + let sharded_key = decode_key(k)?; + let new_list = BlockNumberList::decompress_owned(v)?; + + if index > 0 && index.is_multiple_of(interval) && total_entries > 10 { + info!(target: "sync::stages::index_account_history", progress = %format!("{:.2}%", (index as f64 / total_entries as f64) * 100.0), "Writing indices"); + } + + let partial_key = get_partial(sharded_key); + + if current_partial != partial_key { + // Flush the last shard for the previous key (skip if empty, e.g. first iteration) + if !current_list.is_empty() { + load_accounts_history_shard( + &mut writer, + current_partial, + &mut current_list, + &sharded_key_factory, + append_only, + LoadMode::Flush, + )?; + } + + current_partial = partial_key; + current_list.clear(); + + // If not first sync, merge with existing last shard + if !append_only && + let Some(existing) = + writer.seek_last_shard(sharded_key_factory(current_partial, u64::MAX))? + { + current_list.extend(existing.iter()); + } + } + + current_list.extend(new_list.iter()); + load_accounts_history_shard( + &mut writer, + current_partial, + &mut current_list, + &sharded_key_factory, + append_only, + LoadMode::KeepLast, + )?; + } + + // Flush remaining + load_accounts_history_shard( + &mut writer, + current_partial, + &mut current_list, + &sharded_key_factory, + append_only, + LoadMode::Flush, + )?; + + // Register for commit + writer.register_for_commit(provider); + + Ok(()) +} + +/// Shard and insert account history indices according to [`LoadMode`] and list length. +fn load_accounts_history_shard( + writer: &mut EitherWriter< + '_, + impl DbCursorRO + DbCursorRW, + N, + >, + partial_key: Address, + list: &mut Vec, + sharded_key_factory: &impl Fn(Address, BlockNumber) -> ShardedKey
, + append_only: bool, + mode: LoadMode, +) -> ProviderResult<()> { + if list.len() > NUM_OF_INDICES_IN_SHARD || mode.is_flush() { + let chunks: Vec> = + list.chunks(NUM_OF_INDICES_IN_SHARD).map(|chunk| chunk.to_vec()).collect(); + + let mut iter = chunks.into_iter().peekable(); + while let Some(chunk) = iter.next() { + let mut highest = *chunk.last().expect("at least one index"); + + if !mode.is_flush() && iter.peek().is_none() { + *list = chunk; + } else { + if iter.peek().is_none() { + highest = u64::MAX; + } + let key = sharded_key_factory(partial_key, highest); + let value = BlockNumberList::new_pre_sorted(chunk); + + if append_only { + writer.append_account_history(key, value)?; + } else { + writer.upsert_account_history(key, value)?; + } + } + } + } + + Ok(()) +} + +/// Unwinds account history shards for a given address starting from a block number. +/// +/// This handles both MDBX and `RocksDB` backends through the [`EitherWriter`] abstraction. +pub(crate) fn unwind_accounts_history_shards( + writer: &mut EitherWriter< + '_, + impl DbCursorRO + DbCursorRW, + N, + >, + address: Address, + from_block: BlockNumber, +) -> ProviderResult<()> { + writer.unwind_account_history_shards(address, from_block) +} diff --git a/crates/storage/db-api/src/cursor.rs b/crates/storage/db-api/src/cursor.rs index fac85af5b88..73bc369ee58 100644 --- a/crates/storage/db-api/src/cursor.rs +++ b/crates/storage/db-api/src/cursor.rs @@ -62,9 +62,15 @@ pub trait DbCursorRO { /// A read-only cursor over the dup table `T`. pub trait DbDupCursorRO { + /// Positions the cursor at the prev KV pair of the table, returning it. + fn prev_dup(&mut self) -> PairResult; + /// Positions the cursor at the next KV pair of the table, returning it. fn next_dup(&mut self) -> PairResult; + /// Positions the cursor at the last duplicate value of the current key. + fn last_dup(&mut self) -> ValueOnlyResult; + /// Positions the cursor at the next KV pair of the table, skipping duplicates. fn next_no_dup(&mut self) -> PairResult; diff --git a/crates/storage/db-api/src/mock.rs b/crates/storage/db-api/src/mock.rs index 9928a66c0d4..78a2aec1e14 100644 --- a/crates/storage/db-api/src/mock.rs +++ b/crates/storage/db-api/src/mock.rs @@ -296,6 +296,18 @@ impl DbDupCursorRO for CursorMock { Ok(None) } + /// Moves to the previous duplicate entry. + /// **Mock behavior**: Always returns `None`. + fn prev_dup(&mut self) -> PairResult { + Ok(None) + } + + /// Moves to the last duplicate entry. + /// **Mock behavior**: Always returns `None`. + fn last_dup(&mut self) -> ValueOnlyResult { + Ok(None) + } + /// Moves to the next entry with a different key. /// **Mock behavior**: Always returns `None`. fn next_no_dup(&mut self) -> PairResult { diff --git a/crates/storage/db/src/implementation/mdbx/cursor.rs b/crates/storage/db/src/implementation/mdbx/cursor.rs index 5ca6eacb6c7..f432e76642d 100644 --- a/crates/storage/db/src/implementation/mdbx/cursor.rs +++ b/crates/storage/db/src/implementation/mdbx/cursor.rs @@ -158,11 +158,25 @@ impl DbCursorRO for Cursor { } impl DbDupCursorRO for Cursor { + /// Returns the previous `(key, value)` pair of a DUPSORT table. + fn prev_dup(&mut self) -> PairResult { + decode::(self.inner.prev_dup()) + } + /// Returns the next `(key, value)` pair of a DUPSORT table. fn next_dup(&mut self) -> PairResult { decode::(self.inner.next_dup()) } + /// Returns the last `value` of the current duplicate `key`. + fn last_dup(&mut self) -> ValueOnlyResult { + self.inner + .last_dup() + .map_err(|e| DatabaseError::Read(e.into()))? + .map(decode_one::) + .transpose() + } + /// Returns the next `(key, value)` pair skipping the duplicates. fn next_no_dup(&mut self) -> PairResult { decode::(self.inner.next_nodup()) diff --git a/crates/storage/provider/src/either_writer.rs b/crates/storage/provider/src/either_writer.rs index 5cc79d85227..753e5782ca7 100644 --- a/crates/storage/provider/src/either_writer.rs +++ b/crates/storage/provider/src/either_writer.rs @@ -288,6 +288,27 @@ impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N> { } } + /// Registers this writer's `RocksDB` batch (if any) with the provider for later commit. + /// + /// This consumes the writer and extracts any pending `RocksDB` batch, registering it + /// with the provider to be committed atomically with MDBX and static file commits. + /// + /// For non-`RocksDB` variants, this is a no-op. + #[cfg(all(unix, feature = "rocksdb"))] + pub fn register_for_commit(self, provider: &P) { + if let Some(batch) = self.into_raw_rocksdb_batch() { + provider.set_pending_rocksdb_batch(batch); + } + } + + /// Registers this writer's `RocksDB` batch (if any) with the provider for later commit. + /// + /// Without the `rocksdb` feature, this is a no-op. + #[cfg(not(all(unix, feature = "rocksdb")))] + pub fn register_for_commit

(self, _provider: &P) { + // No-op when RocksDB feature is disabled + } + /// Increment the block number. /// /// Relevant only for [`Self::StaticFile`]. It is a no-op for [`Self::Database`]. @@ -546,6 +567,191 @@ where Self::RocksDB(batch) => batch.delete::(key), } } + + /// Appends an account history entry (append-only mode). + pub fn append_account_history( + &mut self, + key: ShardedKey

, + value: BlockNumberList, + ) -> ProviderResult<()> { + match self { + Self::Database(cursor) => Ok(cursor.append(key, &value)?), + Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider), + #[cfg(all(unix, feature = "rocksdb"))] + Self::RocksDB(batch) => batch.put::(key, &value), + } + } + + /// Upserts an account history entry. + pub fn upsert_account_history( + &mut self, + key: ShardedKey
, + value: BlockNumberList, + ) -> ProviderResult<()> { + match self { + Self::Database(cursor) => Ok(cursor.upsert(key, &value)?), + Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider), + #[cfg(all(unix, feature = "rocksdb"))] + Self::RocksDB(batch) => batch.put::(key, &value), + } + } + + /// Seeks the last shard for an address (with `u64::MAX` highest block). + pub fn seek_last_shard( + &mut self, + key: ShardedKey
, + ) -> ProviderResult> { + match self { + Self::Database(cursor) => Ok(cursor.seek_exact(key)?.map(|(_, v)| v)), + Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider), + #[cfg(all(unix, feature = "rocksdb"))] + Self::RocksDB(batch) => batch.get::(key), + } + } + + /// Unwinds account history shards for a given address starting from a block number. + /// + /// This removes all block numbers >= `from_block` from the history shards for the given + /// address. Empty shards are deleted, and the last shard is updated to have `u64::MAX` + /// as its highest block number (sentinel). + pub fn unwind_account_history_shards( + &mut self, + address: Address, + from_block: BlockNumber, + ) -> ProviderResult<()> { + match self { + Self::Database(cursor) => unwind_account_history_shards_db(cursor, address, from_block), + Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider), + #[cfg(all(unix, feature = "rocksdb"))] + Self::RocksDB(batch) => { + unwind_account_history_shards_rocksdb(batch, address, from_block) + } + } + } +} + +/// Unwinds account history shards in MDBX database. +/// +/// This follows the same algorithm as `unwind_history_shards`: +/// 1. Start from the last shard (`u64::MAX`) and walk backwards +/// 2. Delete each shard +/// 3. Return blocks < `from_block` for reinsertion at the last key +fn unwind_account_history_shards_db( + cursor: &mut CURSOR, + address: Address, + from_block: BlockNumber, +) -> ProviderResult<()> +where + CURSOR: DbCursorRW + DbCursorRO, +{ + // Start from the last shard for this address + let start_key = ShardedKey::new(address, u64::MAX); + let mut item = cursor.seek_exact(start_key)?; + + while let Some((sharded_key, list)) = item { + // If the shard does not belong to this address, break + if sharded_key.key != address { + break; + } + + // Always delete the current shard first + cursor.delete_current()?; + + // Get the first (lowest) block number in this shard + let first = list.iter().next().expect("List can't be empty"); + + // Case 1: Entire shard is at or above the unwinding point + if first >= from_block { + item = cursor.prev()?; + } + // Case 2: Boundary shard (spans across the unwinding point) + else if from_block <= sharded_key.highest_block_number { + // Return only blocks below the unwinding point for reinsertion + let remaining: Vec<_> = list.iter().take_while(|&i| i < from_block).collect(); + if !remaining.is_empty() { + let new_key = ShardedKey::new(address, u64::MAX); + let new_list = BlockNumberList::new_pre_sorted(remaining); + cursor.insert(new_key, &new_list)?; + } + return Ok(()); + } + // Case 3: Entire shard is below the unwinding point + else { + // Reinsert all blocks at the last key + let remaining: Vec<_> = list.iter().collect(); + let new_key = ShardedKey::new(address, u64::MAX); + let new_list = BlockNumberList::new_pre_sorted(remaining); + cursor.insert(new_key, &new_list)?; + return Ok(()); + } + } + + Ok(()) +} + +/// Unwinds account history shards in `RocksDB`. +#[cfg(all(unix, feature = "rocksdb"))] +fn unwind_account_history_shards_rocksdb( + batch: &mut RocksDBBatch<'_>, + address: Address, + from_block: BlockNumber, +) -> ProviderResult<()> { + use reth_db_api::models::sharded_key::NUM_OF_INDICES_IN_SHARD; + + // Collect all shards for this address + let shards = collect_shards_for_unwind::( + batch, + ShardedKey::new(address, 0), + |k| k.key == address, + )?; + + // Collect all block numbers < from_block across all shards + let mut remaining_blocks: Vec = Vec::new(); + for (key, list) in &shards { + remaining_blocks.extend(list.iter().filter(|&bn| bn < from_block)); + // Delete this shard + batch.delete::(key.clone())?; + } + + // Write back the remaining blocks in proper shards + if !remaining_blocks.is_empty() { + remaining_blocks.sort_unstable(); + remaining_blocks.dedup(); + + let chunks: Vec> = + remaining_blocks.chunks(NUM_OF_INDICES_IN_SHARD).map(|c| c.to_vec()).collect(); + + let mut iter = chunks.into_iter().peekable(); + while let Some(chunk) = iter.next() { + let highest = if iter.peek().is_none() { + u64::MAX + } else { + *chunk.last().expect("chunk is non-empty") + }; + let key = ShardedKey::new(address, highest); + let value = BlockNumberList::new_pre_sorted(chunk); + batch.put::(key, &value)?; + } + } + + Ok(()) +} + +/// Collects all shards for a given key prefix from `RocksDB`. +/// +/// This is a generic helper that iterates through `RocksDB` entries starting from +/// `start_key` and collects all entries where `key_matches` returns true. +#[cfg(all(unix, feature = "rocksdb"))] +fn collect_shards_for_unwind( + batch: &RocksDBBatch<'_>, + start_key: T::Key, + key_matches: impl Fn(&T::Key) -> bool, +) -> ProviderResult> +where + T: reth_db_api::table::Table, + T::Key: Clone, +{ + batch.collect_shards_for_key::(start_key, key_matches) } impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N> diff --git a/crates/storage/provider/src/lib.rs b/crates/storage/provider/src/lib.rs index bfab44cb2ac..02eaec32ac7 100644 --- a/crates/storage/provider/src/lib.rs +++ b/crates/storage/provider/src/lib.rs @@ -68,3 +68,53 @@ pub fn to_range>(bounds: R) -> std::ops::Range {{ + use $crate::RocksDBProviderFactory; + $provider.rocksdb_provider() + }}; +} + +/// Creates a `RocksDB` provider from a provider that implements `RocksDBProviderFactory`. +/// +/// When the `rocksdb` feature is enabled, this returns the `RocksDB` provider. +/// When the feature is disabled, this returns `()`. +#[macro_export] +#[cfg(not(all(unix, feature = "rocksdb")))] +macro_rules! make_rocksdb_provider { + ($provider:expr) => { + () + }; +} + +/// Creates a `RocksDB` batch argument from a `RocksDB` provider. +/// +/// When the `rocksdb` feature is enabled, this returns a `RocksDBBatch`. +/// When the feature is disabled, this returns `()`. +#[macro_export] +#[cfg(all(unix, feature = "rocksdb"))] +macro_rules! make_rocksdb_batch_arg { + ($rocksdb:expr) => { + $rocksdb.batch() + }; +} + +/// Creates a `RocksDB` batch argument from a `RocksDB` provider. +/// +/// When the `rocksdb` feature is enabled, this returns a `RocksDBBatch`. +/// When the feature is disabled, this returns `()`. +#[macro_export] +#[cfg(not(all(unix, feature = "rocksdb")))] +macro_rules! make_rocksdb_batch_arg { + ($rocksdb:expr) => { + () + }; +} + diff --git a/crates/storage/provider/src/providers/rocksdb/provider.rs b/crates/storage/provider/src/providers/rocksdb/provider.rs index cc427fcb8b8..b56533839e1 100644 --- a/crates/storage/provider/src/providers/rocksdb/provider.rs +++ b/crates/storage/provider/src/providers/rocksdb/provider.rs @@ -489,6 +489,19 @@ impl RocksDBProvider { Ok(RocksDBIter { inner: iter, _marker: std::marker::PhantomData }) } + /// Creates an iterator starting from the given key (inclusive). + /// + /// Returns decoded `(Key, Value)` pairs in key order, starting from the given key. + pub fn iter_from(&self, key: T::Key) -> ProviderResult> { + let cf = self.get_cf_handle::()?; + let encoded_key = key.encode(); + let iter = self + .0 + .db + .iterator_cf(cf, IteratorMode::From(encoded_key.as_ref(), rocksdb::Direction::Forward)); + Ok(RocksDBIter { inner: iter, _marker: std::marker::PhantomData }) + } + /// Writes a batch of operations atomically. pub fn write_batch(&self, f: F) -> ProviderResult<()> where @@ -841,6 +854,40 @@ impl<'a> RocksDBBatch<'a> { Ok(()) } + + /// Gets a value from the specified table via the underlying provider. + /// + /// Note: This reads from committed state, not pending batch writes. + pub fn get(&self, key: T::Key) -> ProviderResult> { + self.provider.get::(key) + } + + /// Collects all shards for a given key prefix from committed `RocksDB` state. + /// + /// This iterates through entries starting from `start_key` and collects all + /// entries where `key_matches` returns true. + pub fn collect_shards_for_key( + &self, + start_key: T::Key, + key_matches: impl Fn(&T::Key) -> bool, + ) -> ProviderResult> + where + T: Table, + T::Value: Clone, + { + let mut result = Vec::new(); + let iter = self.provider.iter_from::(start_key)?; + + for item in iter { + let (key, value) = item?; + if !key_matches(&key) { + break; + } + result.push((key, value)); + } + + Ok(result) + } } /// `RocksDB` transaction wrapper providing MDBX-like semantics.