Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
a064f9d
feat(storage): wire RocksDB into history lookups via EitherReader
yongkangc Jan 9, 2026
a9e2498
refactor(provider): simplify EitherReader and encapsulate RocksDB logic
yongkangc Jan 12, 2026
e78e65f
fix: use PhantomData in EitherReader to capture lifetime 'a
yongkangc Jan 12, 2026
4e61448
fix: clippy warnings and fmt issues
yongkangc Jan 14, 2026
2ef4280
refactor(provider): extract compute_history_rank helper to reduce dup…
yongkangc Jan 16, 2026
8fc9715
fix: remove unused PhantomData in test EitherReader::Database constru…
yongkangc Jan 16, 2026
5614ebe
fix: use proper shard logic for history indices in RocksDB write_bloc…
yongkangc Jan 14, 2026
467bb35
refactor: revert either_writer.rs to main's implementation
yongkangc Jan 16, 2026
f041bd1
revert: remove changes to database/provider.rs
yongkangc Jan 16, 2026
23b4541
fix: improve RocksDB shard handling and cache provider in historical …
yongkangc Jan 16, 2026
17a9c13
refactor: extract fallback helper in RocksTx::history_info and improv…
yongkangc Jan 16, 2026
6204969
refactor: remove RocksDBProvider caching from HistoricalStateProviderRef
yongkangc Jan 16, 2026
08e58b9
fix: add debug_assert for one-append-per-key invariant in RocksDBBatch
yongkangc Jan 16, 2026
1e9abd3
fix: add backticks for clippy doc_markdown lint
yongkangc Jan 16, 2026
15f4012
chore: remove unused debug_assertions fields from RocksDBBatch
yongkangc Jan 16, 2026
8fa0540
feat(stages): add RocksDB support for account history index stage
yongkangc Jan 16, 2026
972e0cd
fix: wrap batch with Some() in EitherWriter tests
yongkangc Jan 16, 2026
c96e4b3
fix: prefix unused rocksdb variables with underscore
yongkangc Jan 16, 2026
3e2d431
chore: remove unnecessary changes from account history PR
yongkangc Jan 16, 2026
2dae62f
refactor: revert RocksBatchArg to non-Option type
yongkangc Jan 16, 2026
5a53918
refactor: use HashSet instead of BTreeSet for address deduplication
yongkangc Jan 16, 2026
4c59195
refactor: remove debug_assertions tracking from RocksDBBatch
yongkangc Jan 16, 2026
9e47260
refactor: replace register_rocksdb_batch macro with EitherWriter method
yongkangc Jan 16, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 34 additions & 9 deletions crates/stages/stages/src/stages/index_account_history.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
use crate::stages::utils::collect_history_indices;
use crate::stages::utils::{
collect_account_history_indices, collect_history_indices, load_accounts_history_indices,
unwind_accounts_history_shards,
};

use super::{collect_account_history_indices, load_history_indices};
use alloy_primitives::Address;
use reth_config::config::{EtlConfig, IndexHistoryConfig};
use reth_db_api::{models::ShardedKey, table::Decode, tables, transaction::DbTxMut};
use reth_provider::{
DBProvider, HistoryWriter, PruneCheckpointReader, PruneCheckpointWriter, StorageSettingsCache,
DBProvider, EitherWriter, HistoryWriter, PruneCheckpointReader, PruneCheckpointWriter,
RocksDBProviderFactory, StaticFileProviderFactory, StorageSettingsCache,
};
use reth_prune_types::{PruneCheckpoint, PruneMode, PrunePurpose, PruneSegment};
use reth_stages_api::{
ExecInput, ExecOutput, Stage, StageCheckpoint, StageError, StageId, UnwindInput, UnwindOutput,
};
use reth_storage_api::{ChangeSetReader, NodePrimitivesProvider};
use std::fmt::Debug;
use tracing::info;

Expand Down Expand Up @@ -51,9 +55,11 @@ where
+ HistoryWriter
+ PruneCheckpointReader
+ PruneCheckpointWriter
+ reth_storage_api::ChangeSetReader
+ reth_provider::StaticFileProviderFactory
+ StorageSettingsCache,
+ ChangeSetReader
+ StaticFileProviderFactory
+ StorageSettingsCache
+ NodePrimitivesProvider
+ RocksDBProviderFactory,
{
/// Return the id of the stage
fn id(&self) -> StageId {
Expand Down Expand Up @@ -125,7 +131,7 @@ where
};

info!(target: "sync::stages::index_account_history::exec", "Loading indices into database");
load_history_indices::<_, tables::AccountsHistory, _>(
load_accounts_history_indices(
provider,
collector,
first_sync,
Expand All @@ -146,9 +152,28 @@ where
let (range, unwind_progress, _) =
input.unwind_block_range_with_threshold(self.commit_threshold);

provider.unwind_account_history_indices_range(range)?;
// Get changed accounts for the unwind range
let mut addresses = std::collections::HashSet::new();
for block in *range.start()..=*range.end() {
let changesets = provider.account_block_changeset(block)?;
addresses.extend(changesets.into_iter().map(|cs| cs.address));
}

// Create EitherWriter for unwinding
#[allow(clippy::let_unit_value)]
let _rocksdb = reth_provider::make_rocksdb_provider!(provider);
#[allow(clippy::let_unit_value)]
let rocksdb_batch = reth_provider::make_rocksdb_batch_arg!(_rocksdb);
let mut writer = EitherWriter::new_accounts_history(provider, rocksdb_batch)?;

// Unwind shards for each changed address
for address in addresses {
unwind_accounts_history_shards(&mut writer, address, *range.start())?;
}

// Register batch for commit
writer.register_for_commit(provider);

// from HistoryIndex higher than that number.
Ok(UnwindOutput { checkpoint: StageCheckpoint::new(unwind_progress) })
}
}
Expand Down
160 changes: 157 additions & 3 deletions crates/stages/stages/src/stages/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,20 @@ use reth_db_api::{
cursor::{DbCursorRO, DbCursorRW},
models::{sharded_key::NUM_OF_INDICES_IN_SHARD, AccountBeforeTx, ShardedKey},
table::{Decompress, Table},
tables,
transaction::{DbTx, DbTxMut},
BlockNumberList, DatabaseError,
};
use reth_etl::Collector;
use reth_primitives_traits::NodePrimitives;
use reth_provider::{
providers::StaticFileProvider, to_range, BlockReader, DBProvider, ProviderError,
StaticFileProviderFactory,
providers::StaticFileProvider, to_range, BlockReader, DBProvider, EitherWriter, ProviderError,
RocksDBProviderFactory, StaticFileProviderFactory, StorageSettingsCache,
};
use reth_stages_api::StageError;
use reth_static_file_types::StaticFileSegment;
use reth_storage_api::ChangeSetReader;
use reth_storage_api::{ChangeSetReader, NodePrimitivesProvider};
use reth_storage_errors::provider::ProviderResult;
use std::{collections::HashMap, hash::Hash, ops::RangeBounds};
use tracing::info;

Expand Down Expand Up @@ -356,3 +359,154 @@ where
segment,
})
}

/// Loads account history indices from a collector into the provider using [`EitherWriter`].
///
/// This function handles both MDBX and `RocksDB` storage backends transparently.
///
/// Entries are streamed from the ETL `collector` in sorted key order; block numbers for
/// consecutive entries of the same address are accumulated in memory and written out as
/// shards via [`load_accounts_history_shard`].
///
/// # Parameters
/// - `provider`: database provider used to construct the writer and register the batch.
/// - `collector`: ETL collector holding `(ShardedKey<Address>, BlockNumberList)` pairs.
/// - `append_only`: `true` on first sync — shards are appended without merging existing data.
/// - `sharded_key_factory`: builds a `ShardedKey` from an address and a highest block number.
/// - `decode_key`: decodes a raw collector key back into a `ShardedKey<Address>`.
/// - `get_partial`: extracts the address ("partial key") from a decoded sharded key.
///
/// # Errors
/// Returns a [`StageError`] if iteration, decoding, or any write operation fails.
pub(crate) fn load_accounts_history_indices<Provider>(
    provider: &Provider,
    mut collector: Collector<ShardedKey<Address>, BlockNumberList>,
    append_only: bool,
    sharded_key_factory: impl Clone + Fn(Address, u64) -> ShardedKey<Address>,
    decode_key: impl Fn(Vec<u8>) -> Result<ShardedKey<Address>, DatabaseError>,
    get_partial: impl Fn(ShardedKey<Address>) -> Address,
) -> Result<(), StageError>
where
    Provider: DBProvider<Tx: DbTxMut>
        + NodePrimitivesProvider
        + StorageSettingsCache
        + RocksDBProviderFactory,
{
    // Create EitherWriter for accounts history. The macros expand to `()` when the
    // `rocksdb` feature is disabled, hence the `let_unit_value` allows.
    #[allow(clippy::let_unit_value)]
    let _rocksdb = reth_provider::make_rocksdb_provider!(provider);
    #[allow(clippy::let_unit_value)]
    let rocksdb_batch = reth_provider::make_rocksdb_batch_arg!(_rocksdb);
    let mut writer = EitherWriter::new_accounts_history(provider, rocksdb_batch)?;

    // Address whose indices are currently being accumulated, and the accumulated
    // block numbers for it. `Address::default()` acts as the initial sentinel.
    let mut current_partial = Address::default();
    let mut current_list = Vec::<u64>::new();

    // observability: log progress roughly every 10% of entries
    let total_entries = collector.len();
    let interval = (total_entries / 10).max(1);

    for (index, element) in collector.iter()?.enumerate() {
        let (k, v) = element?;
        let sharded_key = decode_key(k)?;
        let new_list = BlockNumberList::decompress_owned(v)?;

        if index > 0 && index.is_multiple_of(interval) && total_entries > 10 {
            info!(target: "sync::stages::index_account_history", progress = %format!("{:.2}%", (index as f64 / total_entries as f64) * 100.0), "Writing indices");
        }

        let partial_key = get_partial(sharded_key);

        // Collector keys are sorted, so a change of address means the previous
        // address's data is fully accumulated and can be finalized.
        if current_partial != partial_key {
            // Flush the last shard for the previous key (skip if empty, e.g. first iteration)
            if !current_list.is_empty() {
                load_accounts_history_shard(
                    &mut writer,
                    current_partial,
                    &mut current_list,
                    &sharded_key_factory,
                    append_only,
                    LoadMode::Flush,
                )?;
            }

            current_partial = partial_key;
            current_list.clear();

            // If not first sync, merge with existing last shard. The `u64::MAX` key
            // addresses the open-ended ("latest") shard for this address.
            if !append_only &&
                let Some(existing) =
                    writer.seek_last_shard(sharded_key_factory(current_partial, u64::MAX))?
            {
                current_list.extend(existing.iter());
            }
        }

        // Accumulate this entry's block numbers and write out any full shards,
        // keeping the (possibly partial) last shard in `current_list`.
        current_list.extend(new_list.iter());
        load_accounts_history_shard(
            &mut writer,
            current_partial,
            &mut current_list,
            &sharded_key_factory,
            append_only,
            LoadMode::KeepLast,
        )?;
    }

    // Flush remaining indices for the final address.
    load_accounts_history_shard(
        &mut writer,
        current_partial,
        &mut current_list,
        &sharded_key_factory,
        append_only,
        LoadMode::Flush,
    )?;

    // Register the RocksDB batch (if any) for commit alongside the MDBX transaction.
    writer.register_for_commit(provider);

    Ok(())
}

/// Shard and insert account history indices according to [`LoadMode`] and list length.
///
/// Splits `list` into chunks of at most [`NUM_OF_INDICES_IN_SHARD`] block numbers and
/// writes each full chunk under a key whose "highest block" component is the chunk's
/// last element. In [`LoadMode::Flush`] the final chunk is also written, keyed with
/// `u64::MAX` to mark it as the open-ended latest shard; in [`LoadMode::KeepLast`] the
/// final chunk is instead retained in `list` for the caller to keep accumulating into.
///
/// No writes happen unless the list overflows a single shard or the mode is a flush.
fn load_accounts_history_shard<N: NodePrimitives>(
    writer: &mut EitherWriter<
        '_,
        impl DbCursorRO<tables::AccountsHistory> + DbCursorRW<tables::AccountsHistory>,
        N,
    >,
    partial_key: Address,
    list: &mut Vec<BlockNumber>,
    sharded_key_factory: &impl Fn(Address, BlockNumber) -> ShardedKey<Address>,
    append_only: bool,
    mode: LoadMode,
) -> ProviderResult<()> {
    if list.len() > NUM_OF_INDICES_IN_SHARD || mode.is_flush() {
        // Materialize the chunks so we can both iterate with lookahead and hand the
        // final chunk back through `list` without aliasing it.
        let chunks: Vec<Vec<u64>> =
            list.chunks(NUM_OF_INDICES_IN_SHARD).map(|chunk| chunk.to_vec()).collect();

        let mut iter = chunks.into_iter().peekable();
        while let Some(chunk) = iter.next() {
            // Safe: `chunks()` never yields an empty slice, so each chunk is non-empty.
            let mut highest = *chunk.last().expect("at least one index");

            if !mode.is_flush() && iter.peek().is_none() {
                // KeepLast: retain the trailing (possibly partial) chunk for further
                // accumulation instead of writing it.
                *list = chunk;
            } else {
                if iter.peek().is_none() {
                    // Flushing the final chunk: key it with the open-ended sentinel so
                    // lookups for "latest shard" find it.
                    highest = u64::MAX;
                }
                let key = sharded_key_factory(partial_key, highest);
                let value = BlockNumberList::new_pre_sorted(chunk);

                if append_only {
                    writer.append_account_history(key, value)?;
                } else {
                    writer.upsert_account_history(key, value)?;
                }
            }
        }
    }

    Ok(())
}

/// Unwinds account history shards for a given `address` starting from `from_block`.
///
/// Thin wrapper that forwards to the writer's own unwind routine, so that both MDBX
/// and `RocksDB` backends are reached through the single [`EitherWriter`] abstraction
/// and stage code does not need backend-specific branches.
pub(crate) fn unwind_accounts_history_shards<N: NodePrimitives>(
    writer: &mut EitherWriter<
        '_,
        impl DbCursorRO<tables::AccountsHistory> + DbCursorRW<tables::AccountsHistory>,
        N,
    >,
    address: Address,
    from_block: BlockNumber,
) -> ProviderResult<()> {
    // Dispatch to whichever backend this writer wraps.
    writer.unwind_account_history_shards(address, from_block)
}
6 changes: 6 additions & 0 deletions crates/storage/db-api/src/cursor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,15 @@ pub trait DbCursorRO<T: Table> {

/// A read-only cursor over the dup table `T`.
pub trait DbDupCursorRO<T: DupSort> {
/// Positions the cursor at the prev KV pair of the table, returning it.
fn prev_dup(&mut self) -> PairResult<T>;

/// Positions the cursor at the next KV pair of the table, returning it.
fn next_dup(&mut self) -> PairResult<T>;

/// Positions the cursor at the last duplicate value of the current key.
fn last_dup(&mut self) -> ValueOnlyResult<T>;

/// Positions the cursor at the next KV pair of the table, skipping duplicates.
fn next_no_dup(&mut self) -> PairResult<T>;

Expand Down
12 changes: 12 additions & 0 deletions crates/storage/db-api/src/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,18 @@ impl<T: DupSort> DbDupCursorRO<T> for CursorMock {
Ok(None)
}

/// Moves to the previous duplicate entry.
/// **Mock behavior**: Always returns `Ok(None)` — never positions on an entry
/// and never errors, so callers exercising iteration terminate immediately.
fn prev_dup(&mut self) -> PairResult<T> {
    Ok(None)
}

/// Moves to the last duplicate entry of the current key.
/// **Mock behavior**: Always returns `Ok(None)` — never positions on an entry
/// and never errors.
fn last_dup(&mut self) -> ValueOnlyResult<T> {
    Ok(None)
}

/// Moves to the next entry with a different key.
/// **Mock behavior**: Always returns `None`.
fn next_no_dup(&mut self) -> PairResult<T> {
Expand Down
14 changes: 14 additions & 0 deletions crates/storage/db/src/implementation/mdbx/cursor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,11 +158,25 @@ impl<K: TransactionKind, T: Table> DbCursorRO<T> for Cursor<K, T> {
}

impl<K: TransactionKind, T: DupSort> DbDupCursorRO<T> for Cursor<K, T> {
/// Returns the previous `(key, value)` pair of a DUPSORT table.
///
/// Delegates to the underlying libmdbx cursor's `prev_dup` operation and decodes
/// the raw pair into the table's key/value types.
fn prev_dup(&mut self) -> PairResult<T> {
    decode::<T>(self.inner.prev_dup())
}

/// Returns the next `(key, value)` pair of a DUPSORT table.
///
/// Delegates to the underlying libmdbx cursor's `next_dup` operation and decodes
/// the raw pair into the table's key/value types.
fn next_dup(&mut self) -> PairResult<T> {
    decode::<T>(self.inner.next_dup())
}

/// Returns the last `value` of the current duplicate `key`.
fn last_dup(&mut self) -> ValueOnlyResult<T> {
self.inner
.last_dup()
.map_err(|e| DatabaseError::Read(e.into()))?
.map(decode_one::<T>)
.transpose()
}

/// Returns the next `(key, value)` pair skipping the duplicates.
fn next_no_dup(&mut self) -> PairResult<T> {
decode::<T>(self.inner.next_nodup())
Expand Down
Loading
Loading