Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
afff1e5
feat(stages): add RocksDB support for IndexAccountHistoryStage
yongkangc Jan 17, 2026
0d34fab
feat(rocksdb): implement clear method for efficient table clearing
yongkangc Jan 18, 2026
174b494
refactor(stages): extract history index loading into reusable utilities
yongkangc Jan 18, 2026
a9aaef4
refactor(stages): simplify account history loading by removing trait …
yongkangc Jan 18, 2026
84a24e3
refactor(stages): use EitherWriter for account history loading
yongkangc Jan 18, 2026
2885c3f
refactor: extract HistoryIndexWriter trait and fix review feedback
yongkangc Jan 18, 2026
17e9247
fix: clippy and fmt issues
yongkangc Jan 18, 2026
e28384a
refactor: switch to closure-based approach with RefCell for load_hist…
yongkangc Jan 18, 2026
b982120
feat(stages): add RocksDB support for IndexAccountHistoryStage
yongkangc Jan 18, 2026
8b98200
refactor: improve RocksDB clear semantics and shard flush logic
gakonst Jan 18, 2026
25d8fe8
fix: type mismatch in delete_range_cf call
gakonst Jan 18, 2026
fa6ec8b
fix: add backticks to RocksDB in docstring for clippy::doc_markdown
gakonst Jan 18, 2026
b140e38
fix: restore original shard flush behavior to keep last chunk buffered
gakonst Jan 18, 2026
40759b4
fix: use is_multiple_of per clippy::manual_is_multiple_of
gakonst Jan 18, 2026
36616fa
chore: restore comments in load_history_indices
yongkangc Jan 18, 2026
977aecf
docs: add inline comments to account history loader functions
yongkangc Jan 18, 2026
461388e
feat(stages): add RocksDB unwind support for IndexAccountHistoryStage
yongkangc Jan 18, 2026
adec46d
Apply suggestion from @yongkangc
yongkangc Jan 18, 2026
f1a540f
Apply suggestion from @yongkangc
yongkangc Jan 18, 2026
f9a2910
Apply suggestion from @yongkangc
yongkangc Jan 18, 2026
0d33a25
docs: add inline comments to account_history_shards function
yongkangc Jan 18, 2026
297f094
test: add multi-shard unwind tests for account history
yongkangc Jan 18, 2026
4bfa952
refactor: remove unused HashMap import in DatabaseProvider's HistoryW…
yongkangc Jan 18, 2026
3041ed1
fix: handle block 0 unwind edge case and optimize RocksDB account his…
yongkangc Jan 19, 2026
d4a49ef
fix: resolve redundant_else clippy lint
yongkangc Jan 19, 2026
da97261
perf: optimize RocksDB account history unwind
yongkangc Jan 19, 2026
9cbb74b
Delete DIFFERENTIAL_REVIEW_PR_21175.md
yongkangc Jan 19, 2026
290c59d
fix(stages): fail fast when rocksdb config enabled but feature not co…
yongkangc Jan 19, 2026
fc5bba8
fix: conditionally import ProviderError for non-rocksdb builds
yongkangc Jan 19, 2026
e6b175a
chore: remove feature gate check from unwind per review
yongkangc Jan 19, 2026
71f09ba
refactor: reduce #cfg annotations with helper patterns
yongkangc Jan 19, 2026
78de393
refactor: move RocksDB account history unwind logic to RocksDBProvider
yongkangc Jan 19, 2026
4a0f162
fix: clippy doc-markdown warnings
yongkangc Jan 19, 2026
15de723
fix: clippy redundant_else and unused_unit warnings
yongkangc Jan 19, 2026
b04ed80
refactor: simplify RocksDB helpers by removing _if variants
yongkangc Jan 19, 2026
14039f9
refactor: use with_rocksdb_batch trait method in stage
yongkangc Jan 19, 2026
8f74032
chore: remove dead batch() stub method
yongkangc Jan 19, 2026
9b80295
refactor: rename load_account_history_via_writer to load_account_history
yongkangc Jan 20, 2026
1e86bd1
refactor: move RocksDB safety comment inside use_rocksdb branch
yongkangc Jan 20, 2026
ab3e275
refactor: simplify unwind_account_history_indices with else branch
yongkangc Jan 20, 2026
0a63ee9
perf: use fixed range for RocksDB clear instead of seeking last key
yongkangc Jan 20, 2026
7b2d1f3
fix: increase RocksDB clear range to 256 bytes and document batch.get…
yongkangc Jan 20, 2026
3ebf3ea
Update crates/storage/provider/src/providers/rocksdb/provider.rs
joshieDo Jan 20, 2026
ffc9180
fix: restore deleted comment per review feedback
yongkangc Jan 20, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
210 changes: 192 additions & 18 deletions crates/stages/stages/src/stages/index_account_history.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
use crate::stages::utils::collect_history_indices;

use super::{collect_account_history_indices, load_history_indices};
use alloy_primitives::Address;
use super::collect_account_history_indices;
use crate::stages::utils::{collect_history_indices, load_account_history};
use reth_config::config::{EtlConfig, IndexHistoryConfig};
use reth_db_api::{models::ShardedKey, table::Decode, tables, transaction::DbTxMut};
use reth_db_api::{models::ShardedKey, tables, transaction::DbTxMut};
use reth_provider::{
DBProvider, HistoryWriter, PruneCheckpointReader, PruneCheckpointWriter, StorageSettingsCache,
DBProvider, EitherWriter, HistoryWriter, PruneCheckpointReader, PruneCheckpointWriter,
RocksDBProviderFactory, StorageSettingsCache,
};
use reth_prune_types::{PruneCheckpoint, PruneMode, PrunePurpose, PruneSegment};
use reth_stages_api::{
Expand Down Expand Up @@ -53,7 +52,8 @@ where
+ PruneCheckpointWriter
+ reth_storage_api::ChangeSetReader
+ reth_provider::StaticFileProviderFactory
+ StorageSettingsCache,
+ StorageSettingsCache
+ RocksDBProviderFactory,
{
/// Return the id of the stage
fn id(&self) -> StageId {
Expand Down Expand Up @@ -101,15 +101,25 @@ where

let mut range = input.next_block_range();
let first_sync = input.checkpoint().block_number == 0;
let use_rocksdb = provider.cached_storage_settings().account_history_in_rocksdb;

// On first sync we might have history coming from genesis. We clear the table since it's
// faster to rebuild from scratch.
if first_sync {
provider.tx_ref().clear::<tables::AccountsHistory>()?;
if use_rocksdb {
// Note: RocksDB clear() executes immediately (not deferred to commit like MDBX),
// but this is safe for first_sync because if we crash before commit, the
// checkpoint stays at 0 and we'll just clear and rebuild again on restart. The
// source data (changesets) is intact.
#[cfg(all(unix, feature = "rocksdb"))]
provider.rocksdb_provider().clear::<tables::AccountsHistory>()?;
} else {
provider.tx_ref().clear::<tables::AccountsHistory>()?;
}
range = 0..=*input.next_block_range().end();
}

info!(target: "sync::stages::index_account_history::exec", ?first_sync, "Collecting indices");
info!(target: "sync::stages::index_account_history::exec", ?first_sync, ?use_rocksdb, "Collecting indices");

let collector = if provider.cached_storage_settings().account_changesets_in_static_files {
// Use the provider-based collection that can read from static files.
Expand All @@ -125,14 +135,13 @@ where
};

info!(target: "sync::stages::index_account_history::exec", "Loading indices into database");
load_history_indices::<_, tables::AccountsHistory, _>(
provider,
collector,
first_sync,
ShardedKey::new,
ShardedKey::<Address>::decode_owned,
|key| key.key,
)?;

provider.with_rocksdb_batch(|rocksdb_batch| {
let mut writer = EitherWriter::new_accounts_history(provider, rocksdb_batch)?;
load_account_history(collector, first_sync, &mut writer)
.map_err(|e| reth_provider::ProviderError::other(Box::new(e)))?;
Ok(((), writer.into_raw_rocksdb_batch()))
})?;

Ok(ExecOutput { checkpoint: StageCheckpoint::new(*range.end()), done: true })
}
Expand Down Expand Up @@ -160,7 +169,7 @@ mod tests {
stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, TestRunnerError,
TestStageDB, UnwindStageTestRunner,
};
use alloy_primitives::{address, BlockNumber, B256};
use alloy_primitives::{address, Address, BlockNumber, B256};
use itertools::Itertools;
use reth_db_api::{
cursor::DbCursorRO,
Expand Down Expand Up @@ -646,4 +655,169 @@ mod tests {
Ok(())
}
}

#[cfg(all(unix, feature = "rocksdb"))]
mod rocksdb_tests {
use super::*;
use reth_provider::RocksDBProviderFactory;
use reth_storage_api::StorageSettings;

/// Test that when `account_history_in_rocksdb` is enabled, the stage
/// writes account history indices to `RocksDB` instead of MDBX.
#[tokio::test]
async fn execute_writes_to_rocksdb_when_enabled() {
// init
let db = TestStageDB::default();

// Enable RocksDB for account history
db.factory.set_storage_settings_cache(
StorageSettings::legacy().with_account_history_in_rocksdb(true),
);

db.commit(|tx| {
for block in 0..=10 {
tx.put::<tables::BlockBodyIndices>(
block,
StoredBlockBodyIndices { tx_count: 3, ..Default::default() },
)?;
tx.put::<tables::AccountChangeSets>(block, acc())?;
}
Ok(())
})
.unwrap();

let input = ExecInput { target: Some(10), ..Default::default() };
let mut stage = IndexAccountHistoryStage::default();
let provider = db.factory.database_provider_rw().unwrap();
let out = stage.execute(&provider, input).unwrap();
assert_eq!(out, ExecOutput { checkpoint: StageCheckpoint::new(10), done: true });
provider.commit().unwrap();

// Verify MDBX table is empty (data should be in RocksDB)
let mdbx_table = db.table::<tables::AccountsHistory>().unwrap();
assert!(
mdbx_table.is_empty(),
"MDBX AccountsHistory should be empty when RocksDB is enabled"
);

// Verify RocksDB has the data
let rocksdb = db.factory.rocksdb_provider();
let result = rocksdb.get::<tables::AccountsHistory>(shard(u64::MAX)).unwrap();
assert!(result.is_some(), "RocksDB should contain account history");

let block_list = result.unwrap();
let blocks: Vec<u64> = block_list.iter().collect();
assert_eq!(blocks, (0..=10).collect::<Vec<_>>());
}

/// Test that unwind works correctly when `account_history_in_rocksdb` is enabled.
#[tokio::test]
async fn unwind_works_when_rocksdb_enabled() {
let db = TestStageDB::default();

db.factory.set_storage_settings_cache(
StorageSettings::legacy().with_account_history_in_rocksdb(true),
);

db.commit(|tx| {
for block in 0..=10 {
tx.put::<tables::BlockBodyIndices>(
block,
StoredBlockBodyIndices { tx_count: 3, ..Default::default() },
)?;
tx.put::<tables::AccountChangeSets>(block, acc())?;
}
Ok(())
})
.unwrap();

let input = ExecInput { target: Some(10), ..Default::default() };
let mut stage = IndexAccountHistoryStage::default();
let provider = db.factory.database_provider_rw().unwrap();
let out = stage.execute(&provider, input).unwrap();
assert_eq!(out, ExecOutput { checkpoint: StageCheckpoint::new(10), done: true });
provider.commit().unwrap();

// Verify RocksDB has blocks 0-10 before unwind
let rocksdb = db.factory.rocksdb_provider();
let result = rocksdb.get::<tables::AccountsHistory>(shard(u64::MAX)).unwrap();
assert!(result.is_some(), "RocksDB should have data before unwind");
let blocks_before: Vec<u64> = result.unwrap().iter().collect();
assert_eq!(blocks_before, (0..=10).collect::<Vec<_>>());

// Unwind to block 5 (remove blocks 6-10)
let unwind_input =
UnwindInput { checkpoint: StageCheckpoint::new(10), unwind_to: 5, bad_block: None };
let provider = db.factory.database_provider_rw().unwrap();
let out = stage.unwind(&provider, unwind_input).unwrap();
assert_eq!(out, UnwindOutput { checkpoint: StageCheckpoint::new(5) });
provider.commit().unwrap();

// Verify RocksDB now only has blocks 0-5 (blocks 6-10 removed)
let rocksdb = db.factory.rocksdb_provider();
let result = rocksdb.get::<tables::AccountsHistory>(shard(u64::MAX)).unwrap();
assert!(result.is_some(), "RocksDB should still have data after unwind");
let blocks_after: Vec<u64> = result.unwrap().iter().collect();
assert_eq!(blocks_after, (0..=5).collect::<Vec<_>>(), "Should only have blocks 0-5");
}

/// Test incremental sync merges new data with existing shards.
#[tokio::test]
async fn execute_incremental_sync() {
let db = TestStageDB::default();

db.factory.set_storage_settings_cache(
StorageSettings::legacy().with_account_history_in_rocksdb(true),
);

db.commit(|tx| {
for block in 0..=5 {
tx.put::<tables::BlockBodyIndices>(
block,
StoredBlockBodyIndices { tx_count: 3, ..Default::default() },
)?;
tx.put::<tables::AccountChangeSets>(block, acc())?;
}
Ok(())
})
.unwrap();

let input = ExecInput { target: Some(5), ..Default::default() };
let mut stage = IndexAccountHistoryStage::default();
let provider = db.factory.database_provider_rw().unwrap();
let out = stage.execute(&provider, input).unwrap();
assert_eq!(out, ExecOutput { checkpoint: StageCheckpoint::new(5), done: true });
provider.commit().unwrap();

let rocksdb = db.factory.rocksdb_provider();
let result = rocksdb.get::<tables::AccountsHistory>(shard(u64::MAX)).unwrap();
assert!(result.is_some());
let blocks: Vec<u64> = result.unwrap().iter().collect();
assert_eq!(blocks, (0..=5).collect::<Vec<_>>());

db.commit(|tx| {
for block in 6..=10 {
tx.put::<tables::BlockBodyIndices>(
block,
StoredBlockBodyIndices { tx_count: 3, ..Default::default() },
)?;
tx.put::<tables::AccountChangeSets>(block, acc())?;
}
Ok(())
})
.unwrap();

let input = ExecInput { target: Some(10), checkpoint: Some(StageCheckpoint::new(5)) };
let provider = db.factory.database_provider_rw().unwrap();
let out = stage.execute(&provider, input).unwrap();
assert_eq!(out, ExecOutput { checkpoint: StageCheckpoint::new(10), done: true });
provider.commit().unwrap();

let rocksdb = db.factory.rocksdb_provider();
let result = rocksdb.get::<tables::AccountsHistory>(shard(u64::MAX)).unwrap();
assert!(result.is_some(), "RocksDB should have merged data");
let blocks: Vec<u64> = result.unwrap().iter().collect();
assert_eq!(blocks, (0..=10).collect::<Vec<_>>());
}
}
}
Loading
Loading