Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
f500a60
feat(storage): wire RocksDB into history lookups via EitherReader
yongkangc Jan 9, 2026
ce83f24
refactor(provider): simplify EitherReader and encapsulate RocksDB logic
yongkangc Jan 12, 2026
a5166e5
fix: use PhantomData in EitherReader to capture lifetime 'a
yongkangc Jan 12, 2026
a5b7df0
fix: clippy warnings and fmt issues
yongkangc Jan 14, 2026
5c83f72
refactor(provider): extract compute_history_rank helper to reduce dup…
yongkangc Jan 16, 2026
3a8dc3e
refactor: use compute_history_rank in history_info to reduce duplication
yongkangc Jan 16, 2026
8a65700
fix: remove unused PhantomData in test EitherReader::Database constru…
yongkangc Jan 16, 2026
98ff8a1
fix(rocksdb): treat empty RocksDB tables as first-run scenario
yongkangc Jan 13, 2026
819c32d
fix(rocksdb): handle sentinel-only entries in consistency check
yongkangc Jan 13, 2026
fdd7776
fix: use proper shard logic for history indices in RocksDB write_bloc…
yongkangc Jan 14, 2026
6892124
refactor: revert either_writer.rs to main's implementation
yongkangc Jan 16, 2026
2ee1ca2
revert: remove changes to database/provider.rs
yongkangc Jan 16, 2026
96932ca
fix: improve RocksDB shard handling and cache provider in historical …
yongkangc Jan 16, 2026
60d7439
refactor: extract fallback helper in RocksTx::history_info and improv…
yongkangc Jan 16, 2026
a8a7e33
refactor: remove RocksDBProvider caching from HistoricalStateProviderRef
yongkangc Jan 16, 2026
50e5fbb
fix: add debug_assert for one-append-per-key invariant in RocksDBBatch
yongkangc Jan 16, 2026
3d8e626
fix: add backticks for clippy doc_markdown lint
yongkangc Jan 16, 2026
4418697
chore: remove unused debug_assertions fields from RocksDBBatch
yongkangc Jan 16, 2026
d2af6c5
Update crates/storage/provider/src/providers/rocksdb/provider.rs
yongkangc Jan 16, 2026
070ab22
feat(cli): add rocksdb feature flag to reth binary
yongkangc Jan 16, 2026
c8af44c
fix: address joshieDo's review comments
yongkangc Jan 16, 2026
280f62c
fix: remove unnecessary edge feature additions
yongkangc Jan 16, 2026
bea16c0
refactor: remove debug_assertions tracking from RocksDBBatch
yongkangc Jan 16, 2026
6fc4f39
Apply suggestion from @yongkangc
yongkangc Jan 16, 2026
1f25e0d
Apply suggestion from @yongkangc
yongkangc Jan 16, 2026
84c4138
refactor: derive Debug for HistoricalStateProviderRef
yongkangc Jan 16, 2026
7e0c050
Update crates/storage/provider/src/providers/rocksdb/provider.rs
yongkangc Jan 16, 2026
60e35bd
Update crates/storage/provider/src/providers/rocksdb/provider.rs
yongkangc Jan 16, 2026
771ff64
fix: restore manual Debug impl for HistoricalStateProviderRef, format…
yongkangc Jan 16, 2026
607b017
refactor: use derive(Debug) for HistoricalStateProviderRef
yongkangc Jan 16, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 1 addition & 20 deletions crates/storage/provider/src/providers/database/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use crate::{
HeaderSyncGapProvider, HistoricalStateProvider, HistoricalStateProviderRef, HistoryWriter,
LatestStateProvider, LatestStateProviderRef, OriginalValuesKnown, ProviderError,
PruneCheckpointReader, PruneCheckpointWriter, RawRocksDBBatch, RevertsInit, RocksBatchArg,
RocksDBProviderFactory, RocksTxRefArg, StageCheckpointReader, StateProviderBox, StateWriter,
RocksDBProviderFactory, StageCheckpointReader, StateProviderBox, StateWriter,
StaticFileProviderFactory, StatsReader, StorageReader, StorageTrieWriter, TransactionVariant,
TransactionsProvider, TransactionsProviderExt, TrieWriter,
};
Expand Down Expand Up @@ -884,25 +884,6 @@ impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
pub fn chain_spec(&self) -> &N::ChainSpec {
&self.chain_spec
}

/// Executes a closure with a `RocksDB` transaction for reading.
///
/// This helper encapsulates all the cfg-gated `RocksDB` transaction handling for reads.
fn with_rocksdb_tx<F, R>(&self, f: F) -> ProviderResult<R>
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

moved this to the rocksdb_provider.rs trait so all implementors get it for free without duplicating the cfg-gated boilerplate

where
F: FnOnce(RocksTxRefArg<'_>) -> ProviderResult<R>,
{
#[cfg(all(unix, feature = "rocksdb"))]
let rocksdb = self.rocksdb_provider();
#[cfg(all(unix, feature = "rocksdb"))]
let rocksdb_tx = rocksdb.tx();
#[cfg(all(unix, feature = "rocksdb"))]
let rocksdb_tx_ref = &rocksdb_tx;
#[cfg(not(all(unix, feature = "rocksdb")))]
let rocksdb_tx_ref = ();

f(rocksdb_tx_ref)
}
}

impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
Expand Down
4 changes: 2 additions & 2 deletions crates/storage/provider/src/providers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ pub use static_file::{
mod state;
pub use state::{
historical::{
history_info, needs_prev_shard_check, HistoricalStateProvider, HistoricalStateProviderRef,
HistoryInfo, LowestAvailableBlocks,
compute_history_rank, history_info, needs_prev_shard_check, HistoricalStateProvider,
HistoricalStateProviderRef, HistoryInfo, LowestAvailableBlocks,
},
latest::{LatestStateProvider, LatestStateProviderRef},
overlay::{OverlayStateProvider, OverlayStateProviderFactory},
Expand Down
170 changes: 134 additions & 36 deletions crates/storage/provider/src/providers/rocksdb/invariants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,16 +164,7 @@ impl RocksDBProvider {
self.prune_transaction_hash_numbers_in_range(provider, 0..=highest_tx)?;
}
(None, None) => {
// Both MDBX and static files are empty.
// If checkpoint says we should have data, that's an inconsistency.
if checkpoint > 0 {
tracing::warn!(
target: "reth::providers::rocksdb",
checkpoint,
"Checkpoint set but no transaction data exists, unwind needed"
);
return Ok(Some(0));
}
// Both MDBX and static files are empty, nothing to check.
}
}

Expand Down Expand Up @@ -263,16 +254,27 @@ impl RocksDBProvider {
}

// Find the max highest_block_number (excluding u64::MAX sentinel) across all
// entries
// entries. Also track if we found any non-sentinel entries.
let mut max_highest_block = 0u64;
let mut found_non_sentinel = false;
for result in self.iter::<tables::StoragesHistory>()? {
let (key, _) = result?;
let highest = key.sharded_key.highest_block_number;
if highest != u64::MAX && highest > max_highest_block {
max_highest_block = highest;
if highest != u64::MAX {
found_non_sentinel = true;
if highest > max_highest_block {
max_highest_block = highest;
}
}
}

// If all entries are sentinel entries (u64::MAX), treat as first-run scenario.
// This means no completed shards exist (only sentinel shards with
// highest_block_number=u64::MAX), so no actual history has been indexed.
if !found_non_sentinel {
return Ok(None);
}

// If any entry has highest_block > checkpoint, prune excess
if max_highest_block > checkpoint {
tracing::info!(
Expand All @@ -296,11 +298,7 @@ impl RocksDBProvider {
Ok(None)
}
None => {
// Empty RocksDB table
if checkpoint > 0 {
// Stage says we should have data but we don't
return Ok(Some(0));
}
// Empty RocksDB table, nothing to check.
Ok(None)
}
}
Expand Down Expand Up @@ -377,16 +375,27 @@ impl RocksDBProvider {
}

// Find the max highest_block_number (excluding u64::MAX sentinel) across all
// entries
// entries. Also track if we found any non-sentinel entries.
let mut max_highest_block = 0u64;
let mut found_non_sentinel = false;
for result in self.iter::<tables::AccountsHistory>()? {
let (key, _) = result?;
let highest = key.highest_block_number;
if highest != u64::MAX && highest > max_highest_block {
max_highest_block = highest;
if highest != u64::MAX {
found_non_sentinel = true;
if highest > max_highest_block {
max_highest_block = highest;
}
}
}

// If all entries are sentinel entries (u64::MAX), treat as first-run scenario.
// This means no completed shards exist (only sentinel shards with
// highest_block_number=u64::MAX), so no actual history has been indexed.
if !found_non_sentinel {
return Ok(None);
}

// If any entry has highest_block > checkpoint, prune excess
if max_highest_block > checkpoint {
tracing::info!(
Expand All @@ -413,11 +422,7 @@ impl RocksDBProvider {
Ok(None)
}
None => {
// Empty RocksDB table
if checkpoint > 0 {
// Stage says we should have data but we don't
return Ok(Some(0));
}
// Empty RocksDB table, nothing to check.
Ok(None)
}
}
Expand Down Expand Up @@ -542,7 +547,7 @@ mod tests {
}

#[test]
fn test_check_consistency_empty_rocksdb_with_checkpoint_needs_unwind() {
fn test_check_consistency_empty_rocksdb_with_checkpoint_is_first_run() {
let temp_dir = TempDir::new().unwrap();
let rocksdb = RocksDBBuilder::new(temp_dir.path())
.with_table::<tables::TransactionHashNumbers>()
Expand All @@ -566,10 +571,10 @@ mod tests {

let provider = factory.database_provider_ro().unwrap();

// RocksDB is empty but checkpoint says block 100 was processed
// This means RocksDB is missing data and we need to unwind to rebuild
// RocksDB is empty but checkpoint says block 100 was processed.
// This is treated as a first-run/migration scenario - no unwind needed.
let result = rocksdb.check_consistency(&provider).unwrap();
assert_eq!(result, Some(0), "Should require unwind to block 0 to rebuild RocksDB");
assert_eq!(result, None, "Empty data with checkpoint is treated as first run");
}

#[test]
Expand Down Expand Up @@ -650,7 +655,7 @@ mod tests {
}

#[test]
fn test_check_consistency_storages_history_empty_with_checkpoint_needs_unwind() {
fn test_check_consistency_storages_history_empty_with_checkpoint_is_first_run() {
let temp_dir = TempDir::new().unwrap();
let rocksdb = RocksDBBuilder::new(temp_dir.path())
.with_table::<tables::StoragesHistory>()
Expand All @@ -674,9 +679,10 @@ mod tests {

let provider = factory.database_provider_ro().unwrap();

// RocksDB is empty but checkpoint says block 100 was processed
// RocksDB is empty but checkpoint says block 100 was processed.
// This is treated as a first-run/migration scenario - no unwind needed.
let result = rocksdb.check_consistency(&provider).unwrap();
assert_eq!(result, Some(0), "Should require unwind to block 0 to rebuild StoragesHistory");
assert_eq!(result, None, "Empty RocksDB with checkpoint is treated as first run");
}

#[test]
Expand Down Expand Up @@ -978,6 +984,97 @@ mod tests {
);
}

#[test]
fn test_check_consistency_storages_history_sentinel_only_with_checkpoint_is_first_run() {
let temp_dir = TempDir::new().unwrap();
let rocksdb = RocksDBBuilder::new(temp_dir.path())
.with_table::<tables::StoragesHistory>()
.build()
.unwrap();

// Insert ONLY sentinel entries (highest_block_number = u64::MAX)
// This simulates a scenario where history tracking started but no shards were completed
let key_sentinel_1 = StorageShardedKey::new(Address::ZERO, B256::ZERO, u64::MAX);
let key_sentinel_2 = StorageShardedKey::new(Address::random(), B256::random(), u64::MAX);
let block_list = BlockNumberList::new_pre_sorted([10, 20, 30]);
rocksdb.put::<tables::StoragesHistory>(key_sentinel_1, &block_list).unwrap();
rocksdb.put::<tables::StoragesHistory>(key_sentinel_2, &block_list).unwrap();

// Verify entries exist (not empty table)
assert!(rocksdb.first::<tables::StoragesHistory>().unwrap().is_some());

// Create a test provider factory for MDBX
let factory = create_test_provider_factory();
factory.set_storage_settings_cache(
StorageSettings::legacy().with_storages_history_in_rocksdb(true),
);

// Set a checkpoint indicating we should have processed up to block 100
{
let provider = factory.database_provider_rw().unwrap();
provider
.save_stage_checkpoint(StageId::IndexStorageHistory, StageCheckpoint::new(100))
.unwrap();
provider.commit().unwrap();
}

let provider = factory.database_provider_ro().unwrap();

// RocksDB has only sentinel entries (no completed shards) but checkpoint is set.
// This is treated as a first-run/migration scenario - no unwind needed.
let result = rocksdb.check_consistency(&provider).unwrap();
assert_eq!(
result, None,
"Sentinel-only entries with checkpoint should be treated as first run"
);
}

#[test]
fn test_check_consistency_accounts_history_sentinel_only_with_checkpoint_is_first_run() {
use reth_db_api::models::ShardedKey;

let temp_dir = TempDir::new().unwrap();
let rocksdb = RocksDBBuilder::new(temp_dir.path())
.with_table::<tables::AccountsHistory>()
.build()
.unwrap();

// Insert ONLY sentinel entries (highest_block_number = u64::MAX)
let key_sentinel_1 = ShardedKey::new(Address::ZERO, u64::MAX);
let key_sentinel_2 = ShardedKey::new(Address::random(), u64::MAX);
let block_list = BlockNumberList::new_pre_sorted([10, 20, 30]);
rocksdb.put::<tables::AccountsHistory>(key_sentinel_1, &block_list).unwrap();
rocksdb.put::<tables::AccountsHistory>(key_sentinel_2, &block_list).unwrap();

// Verify entries exist (not empty table)
assert!(rocksdb.first::<tables::AccountsHistory>().unwrap().is_some());

// Create a test provider factory for MDBX
let factory = create_test_provider_factory();
factory.set_storage_settings_cache(
StorageSettings::legacy().with_account_history_in_rocksdb(true),
);

// Set a checkpoint indicating we should have processed up to block 100
{
let provider = factory.database_provider_rw().unwrap();
provider
.save_stage_checkpoint(StageId::IndexAccountHistory, StageCheckpoint::new(100))
.unwrap();
provider.commit().unwrap();
}

let provider = factory.database_provider_ro().unwrap();

// RocksDB has only sentinel entries (no completed shards) but checkpoint is set.
// This is treated as a first-run/migration scenario - no unwind needed.
let result = rocksdb.check_consistency(&provider).unwrap();
assert_eq!(
result, None,
"Sentinel-only entries with checkpoint should be treated as first run"
);
}

#[test]
fn test_check_consistency_storages_history_behind_checkpoint_single_entry() {
use reth_db_api::models::storage_sharded_key::StorageShardedKey;
Expand Down Expand Up @@ -1135,7 +1232,7 @@ mod tests {
}

#[test]
fn test_check_consistency_accounts_history_empty_with_checkpoint_needs_unwind() {
fn test_check_consistency_accounts_history_empty_with_checkpoint_is_first_run() {
let temp_dir = TempDir::new().unwrap();
let rocksdb = RocksDBBuilder::new(temp_dir.path())
.with_table::<tables::AccountsHistory>()
Expand All @@ -1159,9 +1256,10 @@ mod tests {

let provider = factory.database_provider_ro().unwrap();

// RocksDB is empty but checkpoint says block 100 was processed
// RocksDB is empty but checkpoint says block 100 was processed.
// This is treated as a first-run/migration scenario - no unwind needed.
let result = rocksdb.check_consistency(&provider).unwrap();
assert_eq!(result, Some(0), "Should require unwind to block 0 to rebuild AccountsHistory");
assert_eq!(result, None, "Empty RocksDB with checkpoint is treated as first run");
}

#[test]
Expand Down
Loading
Loading