crates/stages/stages/src/stages/utils.rs (337 additions, 5 deletions)
@@ -1,21 +1,28 @@
//! Utils for `stages`.
-use alloy_primitives::{Address, BlockNumber, TxNumber};
+use alloy_primitives::{Address, BlockNumber, TxNumber, B256};
use reth_config::config::EtlConfig;
use reth_db_api::{
cursor::{DbCursorRO, DbCursorRW},
-models::{sharded_key::NUM_OF_INDICES_IN_SHARD, AccountBeforeTx, ShardedKey},
+models::{
+sharded_key::NUM_OF_INDICES_IN_SHARD, storage_sharded_key::StorageShardedKey,
+AccountBeforeTx, ShardedKey,
+},
table::{Decompress, Table},
tables,
transaction::{DbTx, DbTxMut},
BlockNumberList, DatabaseError,
};
use reth_etl::Collector;
use reth_primitives_traits::NodePrimitives;
use reth_provider::{
-providers::StaticFileProvider, to_range, BlockReader, DBProvider, ProviderError,
-StaticFileProviderFactory,
+make_rocksdb_batch_arg, make_rocksdb_provider, providers::StaticFileProvider,
+register_rocksdb_batch, to_range, BlockReader, DBProvider, EitherWriter, ProviderError,
+RocksDBProviderFactory, StaticFileProviderFactory, StorageSettingsCache,
};
use reth_stages_api::StageError;
use reth_static_file_types::StaticFileSegment;
-use reth_storage_api::ChangeSetReader;
+use reth_storage_api::{ChangeSetReader, NodePrimitivesProvider};
use reth_storage_errors::provider::ProviderResult;
use std::{collections::HashMap, hash::Hash, ops::RangeBounds};
use tracing::info;

@@ -112,6 +119,46 @@ where
Ok::<(), StageError>(())
}

/// Generic shard-and-write helper used by both account and storage history loaders.
///
/// Chunks the list into shards, writes each shard via the provided write function,
/// and handles the last shard according to [`LoadMode`].
fn shard_and_write<F>(
list: &mut Vec<BlockNumber>,
mode: LoadMode,
mut write_fn: F,
) -> Result<(), StageError>
where
F: FnMut(Vec<u64>, BlockNumber) -> Result<(), StageError>,
{
if list.len() <= NUM_OF_INDICES_IN_SHARD && !mode.is_flush() {
return Ok(());
}

let total = list.len();
let mut start = 0;

while start < total {
let end = (start + NUM_OF_INDICES_IN_SHARD).min(total);
let is_last = end == total;
let chunk = list[start..end].to_vec();

// The highest block number in the chunk becomes the shard key.
let highest = *chunk.last().expect("at least one index");

if !mode.is_flush() && is_last {
// `KeepLast`: buffer the final partial shard in `list` for later merging.
*list = chunk;
break;
}

// The most recent shard for a key is always stored under the `u64::MAX` sentinel.
let highest = if is_last { u64::MAX } else { highest };
write_fn(chunk, highest)?;

start = end;
}

Ok(())
}
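
// A minimal sketch (not part of this PR; illustrative only) of the `shard_and_write`
// contract, using only items already defined or imported in this file: with
// `LoadMode::KeepLast`, every full shard is written with its real highest block
// number, while the final partial chunk stays buffered in `list` for later merging.
#[cfg(test)]
mod shard_and_write_sketch {
    use super::*;

    #[test]
    fn keep_last_buffers_the_tail() -> Result<(), StageError> {
        // One full shard plus a ten-element tail.
        let mut list: Vec<u64> = (0..NUM_OF_INDICES_IN_SHARD as u64 + 10).collect();
        let mut written = Vec::new();

        shard_and_write(&mut list, LoadMode::KeepLast, |chunk, highest| {
            written.push((chunk.len(), highest));
            Ok(())
        })?;

        // Exactly one full shard was flushed, keyed by its real highest block number...
        assert_eq!(written, vec![(NUM_OF_INDICES_IN_SHARD, NUM_OF_INDICES_IN_SHARD as u64 - 1)]);
        // ...and the partial tail stays in `list`, awaiting the next merge or flush.
        assert_eq!(list.len(), 10);
        Ok(())
    }
}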

/// Collects account history indices using a provider that implements `ChangeSetReader`.
pub(crate) fn collect_account_history_indices<Provider>(
provider: &Provider,
@@ -179,6 +226,7 @@ where
/// `Address.StorageKey`). It flushes indices to disk when reaching a shard's max length
/// (`NUM_OF_INDICES_IN_SHARD`) or when the partial key changes, ensuring the last previous partial
/// key shard is stored.
#[allow(dead_code)]
pub(crate) fn load_history_indices<Provider, H, P>(
provider: &Provider,
mut collector: Collector<H::Key, H::Value>,
@@ -263,6 +311,7 @@ where
}

/// Shard and insert the indices list according to [`LoadMode`] and its length.
#[allow(dead_code)]
pub(crate) fn load_indices<H, C, P>(
cursor: &mut C,
partial_key: P,
@@ -321,6 +370,289 @@ impl LoadMode {
}
}

/// Loads storage history indices from a collector into the database using `EitherWriter`.
///
/// This is a specialized version of [`load_history_indices`] for `tables::StoragesHistory`
/// that supports writing to either `MDBX` or `RocksDB` based on storage settings.
#[allow(dead_code)]
pub(crate) fn load_storages_history_indices<Provider, P>(
provider: &Provider,
mut collector: Collector<
<tables::StoragesHistory as Table>::Key,
<tables::StoragesHistory as Table>::Value,
>,
append_only: bool,
sharded_key_factory: impl Clone + Fn(P, u64) -> StorageShardedKey,
decode_key: impl Fn(Vec<u8>) -> Result<StorageShardedKey, DatabaseError>,
get_partial: impl Fn(StorageShardedKey) -> P,
) -> Result<(), StageError>
where
Provider: DBProvider<Tx: DbTxMut>
+ NodePrimitivesProvider
+ StorageSettingsCache
+ RocksDBProviderFactory,
P: Copy + Default + Eq,
{
// Create EitherWriter for storage history
#[allow(clippy::let_unit_value)]
let rocksdb = make_rocksdb_provider(provider);
#[allow(clippy::let_unit_value)]
let rocksdb_batch = make_rocksdb_batch_arg(&rocksdb);
let mut writer = EitherWriter::new_storages_history(provider, rocksdb_batch)?;

// Create read cursor for checking existing shards
let mut read_cursor = provider.tx_ref().cursor_read::<tables::StoragesHistory>()?;

let mut current_partial = P::default();
let mut current_list = Vec::<u64>::new();

// observability
let total_entries = collector.len();
let interval = (total_entries / 10).max(1);

for (index, element) in collector.iter()?.enumerate() {
let (k, v) = element?;
let sharded_key = decode_key(k)?;
let new_list = BlockNumberList::decompress_owned(v)?;

if index > 0 && index.is_multiple_of(interval) && total_entries > 10 {
info!(target: "sync::stages::index_history", progress = %format!("{:.2}%", (index as f64 / total_entries as f64) * 100.0), "Writing storage history indices");
}

let partial_key = get_partial(sharded_key);

if current_partial != partial_key {
// Flush last shard for previous partial key
load_storages_history_shard(
&mut writer,
current_partial,
&mut current_list,
&sharded_key_factory,
append_only,
LoadMode::Flush,
)?;

current_partial = partial_key;
current_list.clear();

// If not first sync, merge with existing shard
if !append_only &&
let Some((_, last_database_shard)) =
read_cursor.seek_exact(sharded_key_factory(current_partial, u64::MAX))?
{
current_list.extend(last_database_shard.iter());
}
}

current_list.extend(new_list.iter());
load_storages_history_shard(
&mut writer,
current_partial,
&mut current_list,
&sharded_key_factory,
append_only,
LoadMode::KeepLast,
)?;
}

// Flush remaining shard
load_storages_history_shard(
&mut writer,
current_partial,
&mut current_list,
&sharded_key_factory,
append_only,
LoadMode::Flush,
)?;

// Register RocksDB batch for commit
register_rocksdb_batch(provider, writer);

Ok(())
}
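
// Sketch of a call site (not from this PR): the closures mirror how the existing
// `load_history_indices` is invoked by the index stages. `(Address, B256)` serves
// as the partial key here, and the exact decode helper is an assumption:
//
// load_storages_history_indices(
//     provider,
//     collector,
//     append_only,
//     |(address, storage_key), highest| StorageShardedKey::new(address, storage_key, highest),
//     StorageShardedKey::decode_owned,
//     |key| (key.address, key.sharded_key.key),
// )?;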

/// Shard and insert storage history indices according to [`LoadMode`] and list length.
#[allow(dead_code)]
fn load_storages_history_shard<P, CURSOR, N>(
writer: &mut EitherWriter<'_, CURSOR, N>,
partial_key: P,
list: &mut Vec<BlockNumber>,
sharded_key_factory: &impl Fn(P, BlockNumber) -> StorageShardedKey,
_append_only: bool,
mode: LoadMode,
) -> Result<(), StageError>
where
N: NodePrimitives,
CURSOR: DbCursorRW<tables::StoragesHistory> + DbCursorRO<tables::StoragesHistory>,
P: Copy,
{
shard_and_write(list, mode, |chunk, highest| {
let key = sharded_key_factory(partial_key, highest);
let value = BlockNumberList::new_pre_sorted(chunk);
Ok(writer.put_storage_history(key, &value)?)
})
}

/// Loads account history indices from a collector into the database using `EitherWriter`.
///
/// This is a specialized version of [`load_history_indices`] for `tables::AccountsHistory`
/// that supports writing to either `MDBX` or `RocksDB` based on storage settings.
#[allow(dead_code)]
pub(crate) fn load_accounts_history_indices<Provider, P>(
provider: &Provider,
mut collector: Collector<
<tables::AccountsHistory as Table>::Key,
<tables::AccountsHistory as Table>::Value,
>,
append_only: bool,
sharded_key_factory: impl Clone + Fn(P, u64) -> ShardedKey<Address>,
decode_key: impl Fn(Vec<u8>) -> Result<ShardedKey<Address>, DatabaseError>,
get_partial: impl Fn(ShardedKey<Address>) -> P,
) -> Result<(), StageError>
where
Provider: DBProvider<Tx: DbTxMut>
+ NodePrimitivesProvider
+ StorageSettingsCache
+ RocksDBProviderFactory,
P: Copy + Default + Eq,
{
// Create EitherWriter for account history
#[allow(clippy::let_unit_value)]
let rocksdb = make_rocksdb_provider(provider);
#[allow(clippy::let_unit_value)]
let rocksdb_batch = make_rocksdb_batch_arg(&rocksdb);
let mut writer = EitherWriter::new_accounts_history(provider, rocksdb_batch)?;

// Create read cursor for checking existing shards
let mut read_cursor = provider.tx_ref().cursor_read::<tables::AccountsHistory>()?;

let mut current_partial = P::default();
let mut current_list = Vec::<u64>::new();

// observability
let total_entries = collector.len();
let interval = (total_entries / 10).max(1);

for (index, element) in collector.iter()?.enumerate() {
let (k, v) = element?;
let sharded_key = decode_key(k)?;
let new_list = BlockNumberList::decompress_owned(v)?;

if index > 0 && index.is_multiple_of(interval) && total_entries > 10 {
info!(target: "sync::stages::index_history", progress = %format!("{:.2}%", (index as f64 / total_entries as f64) * 100.0), "Writing account history indices");
}

let partial_key = get_partial(sharded_key);

if current_partial != partial_key {
// Flush last shard for previous partial key
load_accounts_history_shard(
&mut writer,
current_partial,
&mut current_list,
&sharded_key_factory,
append_only,
LoadMode::Flush,
)?;

current_partial = partial_key;
current_list.clear();

// If not first sync, merge with existing shard
if !append_only &&
let Some((_, last_database_shard)) =
read_cursor.seek_exact(sharded_key_factory(current_partial, u64::MAX))?
{
current_list.extend(last_database_shard.iter());
}
}

current_list.extend(new_list.iter());
load_accounts_history_shard(
&mut writer,
current_partial,
&mut current_list,
&sharded_key_factory,
append_only,
LoadMode::KeepLast,
)?;
}

// Flush remaining shard
load_accounts_history_shard(
&mut writer,
current_partial,
&mut current_list,
&sharded_key_factory,
append_only,
LoadMode::Flush,
)?;

// Register RocksDB batch for commit
register_rocksdb_batch(provider, writer);

Ok(())
}

/// Shard and insert account history indices according to [`LoadMode`] and list length.
#[allow(dead_code)]
fn load_accounts_history_shard<P, CURSOR, N>(
writer: &mut EitherWriter<'_, CURSOR, N>,
partial_key: P,
list: &mut Vec<BlockNumber>,
sharded_key_factory: &impl Fn(P, BlockNumber) -> ShardedKey<Address>,
_append_only: bool,
mode: LoadMode,
) -> Result<(), StageError>
where
N: NodePrimitives,
CURSOR: DbCursorRW<tables::AccountsHistory> + DbCursorRO<tables::AccountsHistory>,
P: Copy,
{
shard_and_write(list, mode, |chunk, highest| {
let key = sharded_key_factory(partial_key, highest);
let value = BlockNumberList::new_pre_sorted(chunk);
Ok(writer.put_account_history(key, &value)?)
})
}

/// Unwinds storage history shards using `EitherWriter` for `RocksDB` support.
///
/// Delegates the shard-unwinding logic to [`EitherWriter`], supporting both `MDBX` and
/// `RocksDB`: shards for the given key at or above the unwind point are deleted, and
/// indices below the unwind point are preserved.
#[allow(dead_code)]
pub(crate) fn unwind_storages_history_shards<CURSOR, N>(
writer: &mut EitherWriter<'_, CURSOR, N>,
address: Address,
storage_key: B256,
block_number: BlockNumber,
) -> ProviderResult<()>
where
N: NodePrimitives,
CURSOR: DbCursorRW<tables::StoragesHistory> + DbCursorRO<tables::StoragesHistory>,
{
writer.unwind_storage_history_shards(address, storage_key, block_number)
}

/// Unwinds account history shards using `EitherWriter` for `RocksDB` support.
///
/// Delegates the shard-unwinding logic to [`EitherWriter`], supporting both `MDBX` and
/// `RocksDB`: shards for the given key at or above the unwind point are deleted, and
/// indices below the unwind point are preserved.
#[allow(dead_code)]
pub(crate) fn unwind_accounts_history_shards<CURSOR, N>(
writer: &mut EitherWriter<'_, CURSOR, N>,
address: Address,
block_number: BlockNumber,
) -> ProviderResult<()>
where
N: NodePrimitives,
CURSOR: DbCursorRW<tables::AccountsHistory> + DbCursorRO<tables::AccountsHistory>,
{
writer.unwind_account_history_shards(address, block_number)
}
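
// A simplified, in-memory sketch (not from this PR) of the unwind rule described
// above; the real implementation in `EitherWriter` walks DB shards via cursors.
// Shards are `(highest_block_key, indices)` pairs sorted ascending, and the live
// shard is always keyed `u64::MAX`.
#[allow(dead_code)]
fn unwind_rule_sketch(shards: Vec<(u64, Vec<u64>)>, block_number: u64) -> Vec<(u64, Vec<u64>)> {
    let mut out = Vec::new();
    for (highest_key, indices) in shards {
        if highest_key < block_number {
            // Entirely below the unwind point: preserved untouched.
            out.push((highest_key, indices));
        } else {
            // Boundary shard: keep only indices below the unwind point and
            // promote the survivors to the sentinel-keyed "latest" shard.
            let kept: Vec<u64> = indices.into_iter().filter(|b| *b < block_number).collect();
            if !kept.is_empty() {
                out.push((u64::MAX, kept));
            }
            // Any later shard holds only higher blocks, so it is dropped.
            break;
        }
    }
    out
}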

/// Called when database is ahead of static files. Attempts to find the first block we are missing
/// transactions for.
pub(crate) fn missing_static_data_error<Provider>(
crates/storage/db-api/src/models/metadata.rs (3 additions, 3 deletions)
@@ -44,9 +44,9 @@ impl StorageSettings {
receipts_in_static_files: true,
transaction_senders_in_static_files: true,
account_changesets_in_static_files: true,
-storages_history_in_rocksdb: false,
-transaction_hash_numbers_in_rocksdb: false,
-account_history_in_rocksdb: false,
+storages_history_in_rocksdb: true,
+transaction_hash_numbers_in_rocksdb: true,
+account_history_in_rocksdb: true,
}
}

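
The metadata.rs hunk above flips the default `StorageSettings` so that freshly initialized databases route storages history, transaction hash numbers, and account history to RocksDB, presumably enabling the `EitherWriter` code paths added in utils.rs. A hedged sketch of the gating this implies (the helper is illustrative, not reth API):

#[allow(dead_code)]
fn uses_rocksdb_for_storages_history(settings: &StorageSettings) -> bool {
    // With the new defaults this returns true for newly initialized databases.
    settings.storages_history_in_rocksdb
}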