Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions crates/engine/tree/src/persistence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use reth_ethereum_primitives::EthPrimitives;
use reth_primitives_traits::NodePrimitives;
use reth_provider::{
providers::ProviderNodeTypes, BlockExecutionWriter, BlockHashReader, ChainStateBlockWriter,
DBProvider, DatabaseProviderFactory, ProviderFactory,
DBProvider, DatabaseProviderFactory, ProviderFactory, SaveBlocksMode,
};
use reth_prune::{PrunerError, PrunerOutput, PrunerWithFactory};
use reth_stages_api::{MetricEvent, MetricEventsSender};
Expand Down Expand Up @@ -151,7 +151,7 @@ where
if last_block.is_some() {
let provider_rw = self.provider.database_provider_rw()?;

provider_rw.save_blocks(blocks)?;
provider_rw.save_blocks(blocks, SaveBlocksMode::Full)?;
provider_rw.commit()?;
}

Expand Down
8 changes: 6 additions & 2 deletions crates/optimism/cli/src/commands/import_receipts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use reth_optimism_primitives::{bedrock::is_dup_tx, OpPrimitives, OpReceipt};
use reth_primitives_traits::NodePrimitives;
use reth_provider::{
providers::ProviderNodeTypes, DBProvider, DatabaseProviderFactory, OriginalValuesKnown,
ProviderFactory, StageCheckpointReader, StageCheckpointWriter, StateWriter,
ProviderFactory, StageCheckpointReader, StageCheckpointWriter, StateWriteConfig, StateWriter,
StaticFileProviderFactory, StatsReader,
};
use reth_stages::{StageCheckpoint, StageId};
Expand Down Expand Up @@ -228,7 +228,11 @@ where
ExecutionOutcome::new(Default::default(), receipts, first_block, Default::default());

// finally, write the receipts
provider.write_state(&execution_outcome, OriginalValuesKnown::Yes)?;
provider.write_state(
&execution_outcome,
OriginalValuesKnown::Yes,
StateWriteConfig::default(),
)?;
}

// Only commit if we have imported as many receipts as the number of transactions.
Expand Down
4 changes: 2 additions & 2 deletions crates/stages/stages/src/stages/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use reth_primitives_traits::{format_gas_throughput, BlockBody, NodePrimitives};
use reth_provider::{
providers::{StaticFileProvider, StaticFileWriter},
BlockHashReader, BlockReader, DBProvider, EitherWriter, ExecutionOutcome, HeaderProvider,
LatestStateProviderRef, OriginalValuesKnown, ProviderError, StateWriter,
LatestStateProviderRef, OriginalValuesKnown, ProviderError, StateWriteConfig, StateWriter,
StaticFileProviderFactory, StatsReader, StorageSettingsCache, TransactionVariant,
};
use reth_revm::database::StateProviderDatabase;
Expand Down Expand Up @@ -463,7 +463,7 @@ where
}

// write output
provider.write_state(&state, OriginalValuesKnown::Yes)?;
provider.write_state(&state, OriginalValuesKnown::Yes, StateWriteConfig::default())?;

let db_write_duration = time.elapsed();
debug!(
Expand Down
182 changes: 174 additions & 8 deletions crates/stages/stages/src/stages/index_account_history.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,25 @@
use crate::stages::utils::collect_history_indices;

use super::{collect_account_history_indices, load_history_indices};
use alloy_primitives::Address;
use super::{collect_account_history_indices, load_accounts_history_indices};
use alloy_primitives::{Address, BlockNumber};
use reth_config::config::{EtlConfig, IndexHistoryConfig};
use reth_db_api::{models::ShardedKey, table::Decode, tables, transaction::DbTxMut};
use reth_db_api::{
cursor::DbCursorRO,
models::ShardedKey,
table::Decode,
tables,
transaction::{DbTx, DbTxMut},
};
use reth_provider::{
DBProvider, HistoryWriter, PruneCheckpointReader, PruneCheckpointWriter, StorageSettingsCache,
make_rocksdb_batch_arg, make_rocksdb_provider, register_rocksdb_batch, DBProvider,
EitherWriter, HistoryWriter, PruneCheckpointReader, PruneCheckpointWriter,
RocksDBProviderFactory, StorageSettingsCache,
};
use reth_prune_types::{PruneCheckpoint, PruneMode, PrunePurpose, PruneSegment};
use reth_stages_api::{
ExecInput, ExecOutput, Stage, StageCheckpoint, StageError, StageId, UnwindInput, UnwindOutput,
};
use reth_storage_api::NodePrimitivesProvider;
use std::fmt::Debug;
use tracing::info;

Expand Down Expand Up @@ -53,7 +62,9 @@ where
+ PruneCheckpointWriter
+ reth_storage_api::ChangeSetReader
+ reth_provider::StaticFileProviderFactory
+ StorageSettingsCache,
+ StorageSettingsCache
+ NodePrimitivesProvider
+ RocksDBProviderFactory,
{
/// Return the id of the stage
fn id(&self) -> StageId {
Expand Down Expand Up @@ -125,7 +136,7 @@ where
};

info!(target: "sync::stages::index_account_history::exec", "Loading indices into database");
load_history_indices::<_, tables::AccountsHistory, _>(
load_accounts_history_indices(
provider,
collector,
first_sync,
Expand All @@ -146,9 +157,40 @@ where
let (range, unwind_progress, _) =
input.unwind_block_range_with_threshold(self.commit_threshold);

provider.unwind_account_history_indices_range(range)?;
// Create EitherWriter for account history
#[allow(clippy::let_unit_value)]
let rocksdb = make_rocksdb_provider(provider);
#[allow(clippy::let_unit_value)]
let rocksdb_batch = make_rocksdb_batch_arg(&rocksdb);
let mut writer = EitherWriter::new_accounts_history(provider, rocksdb_batch)?;

// Read changesets to identify what to unwind
let changesets = provider
.tx_ref()
.cursor_read::<tables::AccountChangeSets>()?
.walk_range(range)?
.collect::<Result<Vec<_>, _>>()?;

// Group by address and find minimum block for each
// We only need to unwind once per address using the LOWEST block number
// since unwind removes all indices >= that block
let mut account_keys: std::collections::HashMap<Address, BlockNumber> =
std::collections::HashMap::new();
for (block_number, account) in changesets {
account_keys
.entry(account.address)
.and_modify(|min_bn| *min_bn = (*min_bn).min(block_number))
.or_insert(block_number);
}

// Unwind each account's history shards (once per unique address)
for (address, min_block) in account_keys {
super::utils::unwind_accounts_history_shards(&mut writer, address, min_block)?;
}

// Register RocksDB batch for commit
register_rocksdb_batch(provider, writer);

// from HistoryIndex higher than that number.
Ok(UnwindOutput { checkpoint: StageCheckpoint::new(unwind_progress) })
}
}
Expand Down Expand Up @@ -647,3 +689,127 @@ mod tests {
}
}
}

#[cfg(all(test, unix, feature = "rocksdb"))]
mod rocksdb_stage_tests {
    use super::*;
    use crate::test_utils::TestStageDB;
    use reth_db_api::tables;
    use reth_provider::{DatabaseProviderFactory, RocksDBProviderFactory};
    use reth_storage_api::StorageSettings;

    /// Enables account-history-in-`RocksDB`, seeds blocks 1..=10 with body indices
    /// and one account changeset per block, then executes
    /// [`IndexAccountHistoryStage`] to completion and commits.
    ///
    /// Blocks start at 1 (not 0) to avoid the genesis edge case.
    fn seed_and_execute(db: &TestStageDB) {
        db.factory.set_storage_settings_cache(
            StorageSettings::legacy().with_account_history_in_rocksdb(true),
        );

        // Seed changesets for blocks 1-10, all touching the same address.
        db.commit(|tx| {
            for block in 1..=10u64 {
                tx.put::<tables::BlockBodyIndices>(
                    block,
                    reth_db_api::models::StoredBlockBodyIndices {
                        tx_count: 3,
                        ..Default::default()
                    },
                )?;
                tx.put::<tables::AccountChangeSets>(
                    block,
                    reth_db_api::models::AccountBeforeTx {
                        address: alloy_primitives::address!(
                            "0x0000000000000000000000000000000000000001"
                        ),
                        info: None,
                    },
                )?;
            }
            Ok(())
        })
        .unwrap();

        // Execute stage from checkpoint 0 (will process blocks 1-10).
        let input = ExecInput { target: Some(10), checkpoint: Some(StageCheckpoint::new(0)) };
        let mut stage = IndexAccountHistoryStage::default();
        let provider = db.factory.database_provider_rw().unwrap();
        let out = stage.execute(&provider, input).unwrap();
        assert_eq!(out, ExecOutput { checkpoint: StageCheckpoint::new(10), done: true });
        provider.commit().unwrap();
    }

    /// Returns the number of `AccountsHistory` entries currently stored in `RocksDB`.
    fn rocksdb_history_count(db: &TestStageDB) -> usize {
        db.factory
            .rocksdb_provider()
            .iter::<tables::AccountsHistory>()
            .unwrap()
            .filter_map(|r| r.ok())
            .count()
    }

    /// Test that `IndexAccountHistoryStage` writes to `RocksDB` when enabled.
    #[test]
    fn test_index_account_history_writes_to_rocksdb() {
        let db = TestStageDB::default();
        seed_and_execute(&db);

        // Verify data is in RocksDB.
        let count = rocksdb_history_count(&db);
        assert!(count > 0, "Expected data in RocksDB, found {count} entries");

        // Verify MDBX AccountsHistory is empty (data went to RocksDB).
        let mdbx_table = db.table::<tables::AccountsHistory>().unwrap();
        assert!(mdbx_table.is_empty(), "MDBX should be empty when RocksDB is enabled");
    }

    /// Test that `IndexAccountHistoryStage` unwind clears `RocksDB` data.
    #[test]
    fn test_index_account_history_unwind_clears_rocksdb() {
        let db = TestStageDB::default();
        seed_and_execute(&db);

        // Verify data exists in RocksDB before unwinding.
        let before_count = rocksdb_history_count(&db);
        assert!(before_count > 0, "Expected data in RocksDB before unwind");

        // Unwind to block 0 (removes blocks 1-10, leaving nothing).
        let unwind_input = UnwindInput {
            checkpoint: StageCheckpoint::new(10),
            unwind_to: 0,
            ..Default::default()
        };
        let mut stage = IndexAccountHistoryStage::default();
        let provider = db.factory.database_provider_rw().unwrap();
        let out = stage.unwind(&provider, unwind_input).unwrap();
        assert_eq!(out, UnwindOutput { checkpoint: StageCheckpoint::new(0) });
        provider.commit().unwrap();

        // Verify RocksDB is cleared (no block 0 data exists).
        assert_eq!(rocksdb_history_count(&db), 0, "RocksDB should be empty after unwind to 0");
    }
}
Loading