Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions crates/storage/db-api/src/models/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,4 +101,11 @@ impl StorageSettings {
self.account_changesets_in_static_files = value;
self
}

/// Returns `true` if any tables are configured to be stored in `RocksDB`.
pub const fn any_in_rocksdb(&self) -> bool {
self.transaction_hash_numbers_in_rocksdb ||
self.account_history_in_rocksdb ||
self.storages_history_in_rocksdb
}
}
58 changes: 43 additions & 15 deletions crates/storage/provider/src/providers/database/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::{
},
providers::{
database::{chain::ChainStorage, metrics},
rocksdb::RocksDBProvider,
rocksdb::{PendingRocksDBBatches, RocksDBProvider, RocksDBWriteCtx},
static_file::{StaticFileWriteCtx, StaticFileWriter},
NodeTypesForProvider, StaticFileProvider,
},
Expand Down Expand Up @@ -188,8 +188,8 @@ pub struct DatabaseProvider<TX, N: NodeTypes> {
/// `RocksDB` provider
rocksdb_provider: RocksDBProvider,
/// Pending `RocksDB` batches to be committed at provider commit time.
#[cfg(all(unix, feature = "rocksdb"))]
pending_rocksdb_batches: parking_lot::Mutex<Vec<rocksdb::WriteBatchWithTransaction<true>>>,
#[cfg_attr(not(all(unix, feature = "rocksdb")), allow(dead_code))]
pending_rocksdb_batches: PendingRocksDBBatches,
/// Minimum distance from tip required for pruning
minimum_pruning_distance: u64,
/// Database provider metrics
Expand All @@ -205,10 +205,10 @@ impl<TX: Debug, N: NodeTypes> Debug for DatabaseProvider<TX, N> {
.field("prune_modes", &self.prune_modes)
.field("storage", &self.storage)
.field("storage_settings", &self.storage_settings)
.field("rocksdb_provider", &self.rocksdb_provider);
#[cfg(all(unix, feature = "rocksdb"))]
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we still need this?

s.field("pending_rocksdb_batches", &"<pending batches>");
s.field("minimum_pruning_distance", &self.minimum_pruning_distance).finish()
.field("rocksdb_provider", &self.rocksdb_provider)
.field("pending_rocksdb_batches", &"<pending batches>")
.field("minimum_pruning_distance", &self.minimum_pruning_distance)
.finish()
}
}

Expand Down Expand Up @@ -336,8 +336,7 @@ impl<TX: DbTxMut, N: NodeTypes> DatabaseProvider<TX, N> {
storage,
storage_settings,
rocksdb_provider,
#[cfg(all(unix, feature = "rocksdb"))]
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we still need this feature?

pending_rocksdb_batches: parking_lot::Mutex::new(Vec::new()),
pending_rocksdb_batches: Default::default(),
minimum_pruning_distance: MINIMUM_PRUNING_DISTANCE,
metrics: metrics::DatabaseProviderMetrics::default(),
}
Expand Down Expand Up @@ -403,6 +402,17 @@ impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX,
})
}

/// Creates the context for `RocksDB` writes.
#[cfg_attr(not(all(unix, feature = "rocksdb")), allow(dead_code))]
fn rocksdb_write_ctx(&self, first_block: BlockNumber) -> RocksDBWriteCtx {
RocksDBWriteCtx {
first_block_number: first_block,
prune_tx_lookup: self.prune_modes.transaction_lookup,
storage_settings: self.cached_storage_settings(),
pending_batches: self.pending_rocksdb_batches.clone(),
}
}

/// Writes executed blocks and state to storage.
///
/// This method parallelizes static file (SF) writes with MDBX writes.
Expand Down Expand Up @@ -452,6 +462,10 @@ impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX,
// avoid capturing &self.tx in scope below.
let sf_provider = &self.static_file_provider;
let sf_ctx = self.static_file_write_ctx(save_mode, first_number, last_block_number)?;
#[cfg(all(unix, feature = "rocksdb"))]
let rocksdb_provider = self.rocksdb_provider.clone();
#[cfg(all(unix, feature = "rocksdb"))]
let rocksdb_ctx = self.rocksdb_write_ctx(first_number);

thread::scope(|s| {
// SF writes
Expand All @@ -461,6 +475,16 @@ impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX,
Ok::<_, ProviderError>(start.elapsed())
});

// RocksDB writes
#[cfg(all(unix, feature = "rocksdb"))]
let rocksdb_handle = rocksdb_ctx.storage_settings.any_in_rocksdb().then(|| {
s.spawn(|| {
let start = Instant::now();
rocksdb_provider.write_blocks_data(&blocks, &tx_nums, rocksdb_ctx)?;
Ok::<_, ProviderError>(start.elapsed())
})
});

// MDBX writes
let mdbx_start = Instant::now();

Expand Down Expand Up @@ -557,6 +581,12 @@ impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX,
.join()
.map_err(|_| StaticFileWriterError::ThreadPanic("static file"))??;

// Wait for RocksDB thread
#[cfg(all(unix, feature = "rocksdb"))]
if let Some(handle) = rocksdb_handle {
timings.rocksdb = handle.join().expect("RocksDB thread panicked")?;
}

timings.total = total_start.elapsed();

self.metrics.record_save_blocks(&timings);
Expand Down Expand Up @@ -821,8 +851,7 @@ impl<TX: DbTx + 'static, N: NodeTypesForProvider> DatabaseProvider<TX, N> {
storage,
storage_settings,
rocksdb_provider,
#[cfg(all(unix, feature = "rocksdb"))]
pending_rocksdb_batches: parking_lot::Mutex::new(Vec::new()),
pending_rocksdb_batches: Default::default(),
minimum_pruning_distance: MINIMUM_PRUNING_DISTANCE,
metrics: metrics::DatabaseProviderMetrics::default(),
}
Expand Down Expand Up @@ -3167,14 +3196,13 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes> HistoryWriter for DatabaseProvi

#[instrument(level = "debug", target = "providers::db", skip_all)]
fn update_history_indices(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<()> {
// account history stage
{
let storage_settings = self.cached_storage_settings();
if !storage_settings.account_history_in_rocksdb {
let indices = self.changed_accounts_and_blocks_with_range(range.clone())?;
self.insert_account_history_index(indices)?;
}

// storage history stage
{
if !storage_settings.storages_history_in_rocksdb {
let indices = self.changed_storages_and_blocks_with_range(range)?;
self.insert_storage_history_index(indices)?;
}
Expand Down
1 change: 1 addition & 0 deletions crates/storage/provider/src/providers/rocksdb/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ mod invariants;
mod metrics;
mod provider;

pub(crate) use provider::{PendingRocksDBBatches, RocksDBWriteCtx};
pub use provider::{RocksDBBatch, RocksDBBuilder, RocksDBProvider, RocksTx};
158 changes: 156 additions & 2 deletions crates/storage/provider/src/providers/rocksdb/provider.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
use super::metrics::{RocksDBMetrics, RocksDBOperation};
use crate::providers::{needs_prev_shard_check, HistoryInfo};
use alloy_primitives::{Address, BlockNumber, B256};
use alloy_consensus::transaction::TxHashRef;
use alloy_primitives::{Address, BlockNumber, TxNumber, B256};
use parking_lot::Mutex;
use reth_chain_state::ExecutedBlock;
use reth_db_api::{
models::{storage_sharded_key::StorageShardedKey, ShardedKey},
models::{storage_sharded_key::StorageShardedKey, ShardedKey, StorageSettings},
table::{Compress, Decode, Decompress, Encode, Table},
tables, BlockNumberList, DatabaseError,
};
use reth_primitives_traits::BlockBody as _;
use reth_prune_types::PruneMode;
use reth_storage_errors::{
db::{DatabaseErrorInfo, DatabaseWriteError, DatabaseWriteOperation, LogLevel},
provider::{ProviderError, ProviderResult},
Expand All @@ -16,11 +21,41 @@ use rocksdb::{
OptimisticTransactionOptions, Options, Transaction, WriteBatchWithTransaction, WriteOptions,
};
use std::{
collections::BTreeMap,
fmt,
path::{Path, PathBuf},
sync::Arc,
thread,
time::Instant,
};
use tracing::instrument;

/// Pending `RocksDB` batches type alias.
pub(crate) type PendingRocksDBBatches = Arc<Mutex<Vec<WriteBatchWithTransaction<true>>>>;

/// Context for `RocksDB` block writes.
#[derive(Clone)]
pub(crate) struct RocksDBWriteCtx {
/// The first block number being written.
pub first_block_number: BlockNumber,
/// The prune mode for transaction lookup, if any.
pub prune_tx_lookup: Option<PruneMode>,
/// Storage settings determining what goes to `RocksDB`.
pub storage_settings: StorageSettings,
/// Pending batches to push to after writing.
pub pending_batches: PendingRocksDBBatches,
}

impl fmt::Debug for RocksDBWriteCtx {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("RocksDBWriteCtx")
.field("first_block_number", &self.first_block_number)
.field("prune_tx_lookup", &self.prune_tx_lookup)
.field("storage_settings", &self.storage_settings)
.field("pending_batches", &"<pending batches>")
.finish()
}
}

/// Default cache size for `RocksDB` block cache (128 MB).
const DEFAULT_CACHE_SIZE: usize = 128 << 20;
Expand Down Expand Up @@ -474,6 +509,125 @@ impl RocksDBProvider {
}))
})
}

/// Writes all `RocksDB` data for multiple blocks in parallel.
///
/// This handles transaction hash numbers, account history, and storage history based on
/// the provided storage settings. Each operation runs in parallel with its own batch,
/// pushing to `ctx.pending_batches` for later commit.
#[instrument(level = "debug", target = "providers::db", skip_all)]
pub(crate) fn write_blocks_data<N: reth_node_types::NodePrimitives>(
&self,
blocks: &[ExecutedBlock<N>],
tx_nums: &[TxNumber],
ctx: RocksDBWriteCtx,
) -> ProviderResult<()> {
if !ctx.storage_settings.any_in_rocksdb() {
return Ok(());
}

thread::scope(|s| {
let handles: Vec<_> = [
(ctx.storage_settings.transaction_hash_numbers_in_rocksdb &&
ctx.prune_tx_lookup.is_none_or(|m| !m.is_full()))
.then(|| s.spawn(|| self.write_tx_hash_numbers(blocks, tx_nums, &ctx))),
ctx.storage_settings
.account_history_in_rocksdb
.then(|| s.spawn(|| self.write_account_history(blocks, &ctx))),
ctx.storage_settings
.storages_history_in_rocksdb
.then(|| s.spawn(|| self.write_storage_history(blocks, &ctx))),
]
.into_iter()
.enumerate()
.filter_map(|(i, h)| h.map(|h| (i, h)))
.collect();

for (i, handle) in handles {
handle.join().map_err(|_| {
ProviderError::Database(DatabaseError::Other(format!(
"rocksdb write thread {i} panicked"
)))
})??;
}

Ok(())
})
}

/// Writes transaction hash to number mappings for the given blocks.
#[instrument(level = "debug", target = "providers::db", skip_all)]
fn write_tx_hash_numbers<N: reth_node_types::NodePrimitives>(
&self,
blocks: &[ExecutedBlock<N>],
tx_nums: &[TxNumber],
ctx: &RocksDBWriteCtx,
) -> ProviderResult<()> {
let mut batch = self.batch();
for (block, &first_tx_num) in blocks.iter().zip(tx_nums) {
let body = block.recovered_block().body();
let mut tx_num = first_tx_num;
for transaction in body.transactions_iter() {
batch.put::<tables::TransactionHashNumbers>(*transaction.tx_hash(), &tx_num)?;
tx_num += 1;
}
}
ctx.pending_batches.lock().push(batch.into_inner());
Ok(())
}

/// Writes account history indices for the given blocks.
#[instrument(level = "debug", target = "providers::db", skip_all)]
fn write_account_history<N: reth_node_types::NodePrimitives>(
&self,
blocks: &[ExecutedBlock<N>],
ctx: &RocksDBWriteCtx,
) -> ProviderResult<()> {
let mut batch = self.batch();
let mut account_history: BTreeMap<Address, Vec<u64>> = BTreeMap::new();
for (block_idx, block) in blocks.iter().enumerate() {
let block_number = ctx.first_block_number + block_idx as u64;
let bundle = &block.execution_outcome().bundle;
for &address in bundle.state().keys() {
account_history.entry(address).or_default().push(block_number);
}
}
for (address, blocks) in account_history {
let key = ShardedKey::new(address, u64::MAX);
let value = BlockNumberList::new_pre_sorted(blocks);
batch.put::<tables::AccountsHistory>(key, &value)?;
}
ctx.pending_batches.lock().push(batch.into_inner());
Ok(())
}

/// Writes storage history indices for the given blocks.
#[instrument(level = "debug", target = "providers::db", skip_all)]
fn write_storage_history<N: reth_node_types::NodePrimitives>(
&self,
blocks: &[ExecutedBlock<N>],
ctx: &RocksDBWriteCtx,
) -> ProviderResult<()> {
let mut batch = self.batch();
let mut storage_history: BTreeMap<(Address, B256), Vec<u64>> = BTreeMap::new();
for (block_idx, block) in blocks.iter().enumerate() {
let block_number = ctx.first_block_number + block_idx as u64;
let bundle = &block.execution_outcome().bundle;
for (&address, account) in bundle.state() {
for &slot in account.storage.keys() {
let key = B256::new(slot.to_be_bytes());
storage_history.entry((address, key)).or_default().push(block_number);
}
}
}
for ((address, slot), blocks) in storage_history {
let key = StorageShardedKey::new(address, slot, u64::MAX);
let value = BlockNumberList::new_pre_sorted(blocks);
batch.put::<tables::StoragesHistory>(key, &value)?;
}
ctx.pending_batches.lock().push(batch.into_inner());
Ok(())
}
}

/// Handle for building a batch of operations atomically.
Expand Down
Loading
Loading