Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions crates/engine/tree/src/persistence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use reth_ethereum_primitives::EthPrimitives;
use reth_primitives_traits::NodePrimitives;
use reth_provider::{
providers::ProviderNodeTypes, BlockExecutionWriter, BlockHashReader, ChainStateBlockWriter,
DBProvider, DatabaseProviderFactory, ProviderFactory,
DBProvider, DatabaseProviderFactory, ProviderFactory, SaveBlocksMode,
};
use reth_prune::{PrunerError, PrunerOutput, PrunerWithFactory};
use reth_stages_api::{MetricEvent, MetricEventsSender};
Expand Down Expand Up @@ -151,7 +151,7 @@ where
if last_block.is_some() {
let provider_rw = self.provider.database_provider_rw()?;

provider_rw.save_blocks(blocks)?;
provider_rw.save_blocks(blocks, SaveBlocksMode::Full)?;
provider_rw.commit()?;
}

Expand Down
8 changes: 6 additions & 2 deletions crates/optimism/cli/src/commands/import_receipts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use reth_optimism_primitives::{bedrock::is_dup_tx, OpPrimitives, OpReceipt};
use reth_primitives_traits::NodePrimitives;
use reth_provider::{
providers::ProviderNodeTypes, DBProvider, DatabaseProviderFactory, OriginalValuesKnown,
ProviderFactory, StageCheckpointReader, StageCheckpointWriter, StateWriter,
ProviderFactory, StageCheckpointReader, StageCheckpointWriter, StateWriteConfig, StateWriter,
StaticFileProviderFactory, StatsReader,
};
use reth_stages::{StageCheckpoint, StageId};
Expand Down Expand Up @@ -228,7 +228,11 @@ where
ExecutionOutcome::new(Default::default(), receipts, first_block, Default::default());

// finally, write the receipts
provider.write_state(&execution_outcome, OriginalValuesKnown::Yes)?;
provider.write_state(
&execution_outcome,
OriginalValuesKnown::Yes,
StateWriteConfig::default(),
)?;
}

// Only commit if we have imported as many receipts as the number of transactions.
Expand Down
4 changes: 2 additions & 2 deletions crates/stages/stages/src/stages/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use reth_primitives_traits::{format_gas_throughput, BlockBody, NodePrimitives};
use reth_provider::{
providers::{StaticFileProvider, StaticFileWriter},
BlockHashReader, BlockReader, DBProvider, EitherWriter, ExecutionOutcome, HeaderProvider,
LatestStateProviderRef, OriginalValuesKnown, ProviderError, StateWriter,
LatestStateProviderRef, OriginalValuesKnown, ProviderError, StateWriteConfig, StateWriter,
StaticFileProviderFactory, StatsReader, StorageSettingsCache, TransactionVariant,
};
use reth_revm::database::StateProviderDatabase;
Expand Down Expand Up @@ -463,7 +463,7 @@ where
}

// write output
provider.write_state(&state, OriginalValuesKnown::Yes)?;
provider.write_state(&state, OriginalValuesKnown::Yes, StateWriteConfig::default())?;

let db_write_duration = time.elapsed();
debug!(
Expand Down
10 changes: 7 additions & 3 deletions crates/storage/db-common/src/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ use reth_provider::{
errors::provider::ProviderResult, providers::StaticFileWriter, BlockHashReader, BlockNumReader,
BundleStateInit, ChainSpecProvider, DBProvider, DatabaseProviderFactory, ExecutionOutcome,
HashingWriter, HeaderProvider, HistoryWriter, MetadataWriter, OriginalValuesKnown,
ProviderError, RevertsInit, StageCheckpointReader, StageCheckpointWriter, StateWriter,
StaticFileProviderFactory, StorageSettings, StorageSettingsCache, TrieWriter,
ProviderError, RevertsInit, StageCheckpointReader, StageCheckpointWriter, StateWriteConfig,
StateWriter, StaticFileProviderFactory, StorageSettings, StorageSettingsCache, TrieWriter,
};
use reth_stages_types::{StageCheckpoint, StageId};
use reth_static_file_types::StaticFileSegment;
Expand Down Expand Up @@ -334,7 +334,11 @@ where
Vec::new(),
);

provider.write_state(&execution_outcome, OriginalValuesKnown::Yes)?;
provider.write_state(
&execution_outcome,
OriginalValuesKnown::Yes,
StateWriteConfig::default(),
)?;

trace!(target: "reth::cli", "Inserted state");

Expand Down
3 changes: 3 additions & 0 deletions crates/storage/errors/src/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,9 @@ pub enum StaticFileWriterError {
/// Cannot call `sync_all` or `finalize` when prune is queued.
#[error("cannot call sync_all or finalize when prune is queued, use commit() instead")]
FinalizeWithPruneQueued,
/// Thread panicked during execution.
#[error("thread panicked: {_0}")]
ThreadPanic(&'static str),
/// Other error with message.
#[error("{0}")]
Other(String),
Expand Down
7 changes: 7 additions & 0 deletions crates/storage/libmdbx-rs/src/txn_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ impl TxnManager {
match rx.recv() {
Ok(msg) => match msg {
TxnManagerMessage::Begin { parent, flags, sender } => {
let _span =
tracing::debug_span!(target: "libmdbx::txn", "begin", flags)
.entered();
let mut txn: *mut ffi::MDBX_txn = ptr::null_mut();
let res = mdbx_result(unsafe {
ffi::mdbx_txn_begin_ex(
Expand All @@ -72,9 +75,13 @@ impl TxnManager {
sender.send(res).unwrap();
}
TxnManagerMessage::Abort { tx, sender } => {
let _span =
tracing::debug_span!(target: "libmdbx::txn", "abort").entered();
sender.send(mdbx_result(unsafe { ffi::mdbx_txn_abort(tx.0) })).unwrap();
}
TxnManagerMessage::Commit { tx, sender } => {
let _span =
tracing::debug_span!(target: "libmdbx::txn", "commit").entered();
sender
.send({
let mut latency = CommitLatency::new();
Expand Down
35 changes: 35 additions & 0 deletions crates/storage/provider/src/either_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,41 @@ where
}
}

/// Writes a batch of `(TxHash, TxNumber)` mappings through a single cursor.
///
/// All entries are written with one cursor, which is cheaper than invoking
/// `put_transaction_hash_number` once per mapping.
///
/// With `append_only == true` the entries go through `cursor.append()`, which
/// requires them to be pre-sorted and strictly greater than any key already in
/// the table. With `append_only == false`, `cursor.upsert()` is used and any
/// insertion order is accepted.
pub fn put_transaction_hash_numbers_batch(
    &mut self,
    entries: Vec<(TxHash, TxNumber)>,
    append_only: bool,
) -> ProviderResult<()> {
    match self {
        Self::Database(cursor) => {
            // Branch once on the write mode instead of per entry.
            if append_only {
                entries
                    .into_iter()
                    .try_for_each(|(hash, tx_num)| cursor.append(hash, &tx_num))?;
            } else {
                entries
                    .into_iter()
                    .try_for_each(|(hash, tx_num)| cursor.upsert(hash, &tx_num))?;
            }
            Ok(())
        }
        // Static files have no hash -> number index.
        Self::StaticFile(_) => Err(ProviderError::UnsupportedProvider),
        #[cfg(all(unix, feature = "rocksdb"))]
        Self::RocksDB(batch) => {
            entries.into_iter().try_for_each(|(hash, tx_num)| {
                batch.put::<tables::TransactionHashNumbers>(hash, &tx_num)
            })?;
            Ok(())
        }
    }
}

/// Deletes a transaction hash number mapping.
pub fn delete_transaction_hash_number(&mut self, hash: TxHash) -> ProviderResult<()> {
match self {
Expand Down
7 changes: 4 additions & 3 deletions crates/storage/provider/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ pub mod providers;
pub use providers::{
DatabaseProvider, DatabaseProviderRO, DatabaseProviderRW, HistoricalStateProvider,
HistoricalStateProviderRef, LatestStateProvider, LatestStateProviderRef, ProviderFactory,
StaticFileAccess, StaticFileProviderBuilder, StaticFileWriter,
SaveBlocksMode, StaticFileAccess, StaticFileProviderBuilder, StaticFileWriteCtx,
StaticFileWriter,
};

pub mod changeset_walker;
Expand All @@ -44,8 +45,8 @@ pub use revm_database::states::OriginalValuesKnown;
// reexport traits to avoid breaking changes
pub use reth_static_file_types as static_file;
pub use reth_storage_api::{
HistoryWriter, MetadataProvider, MetadataWriter, StatsReader, StorageSettings,
StorageSettingsCache,
HistoryWriter, MetadataProvider, MetadataWriter, StateWriteConfig, StatsReader,
StorageSettings, StorageSettingsCache,
};
/// Re-export provider error.
pub use reth_storage_errors::provider::{ProviderError, ProviderResult};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -789,7 +789,7 @@ mod tests {
create_test_provider_factory, create_test_provider_factory_with_chain_spec,
MockNodeTypesWithDB,
},
BlockWriter, CanonChainTracker, ProviderFactory,
BlockWriter, CanonChainTracker, ProviderFactory, SaveBlocksMode,
};
use alloy_eips::{BlockHashOrNumber, BlockNumHash, BlockNumberOrTag};
use alloy_primitives::{BlockNumber, TxNumber, B256};
Expand All @@ -808,8 +808,8 @@ mod tests {
use reth_storage_api::{
BlockBodyIndicesProvider, BlockHashReader, BlockIdReader, BlockNumReader, BlockReader,
BlockReaderIdExt, BlockSource, ChangeSetReader, DBProvider, DatabaseProviderFactory,
HeaderProvider, ReceiptProvider, ReceiptProviderIdExt, StateProviderFactory, StateWriter,
TransactionVariant, TransactionsProvider,
HeaderProvider, ReceiptProvider, ReceiptProviderIdExt, StateProviderFactory,
StateWriteConfig, StateWriter, TransactionVariant, TransactionsProvider,
};
use reth_testing_utils::generators::{
self, random_block, random_block_range, random_changeset_range, random_eoa_accounts,
Expand Down Expand Up @@ -907,6 +907,7 @@ mod tests {
..Default::default()
},
OriginalValuesKnown::No,
StateWriteConfig::default(),
)?;
}

Expand Down Expand Up @@ -997,7 +998,7 @@ mod tests {

// Push to disk
let provider_rw = hook_provider.database_provider_rw().unwrap();
provider_rw.save_blocks(vec![lowest_memory_block]).unwrap();
provider_rw.save_blocks(vec![lowest_memory_block], SaveBlocksMode::Full).unwrap();
provider_rw.commit().unwrap();

// Remove from memory
Expand Down
93 changes: 64 additions & 29 deletions crates/storage/provider/src/providers/database/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,8 @@ pub(crate) enum Action {
InsertHeaderNumbers,
InsertBlockBodyIndices,
InsertTransactionBlocks,
GetNextTxNum,
InsertTransactionSenders,
InsertTransactionHashNumbers,
SaveBlocksInsertBlock,
SaveBlocksWriteState,
SaveBlocksWriteHashedState,
SaveBlocksWriteTrieChangesets,
SaveBlocksWriteTrieUpdates,
SaveBlocksUpdateHistoryIndices,
SaveBlocksUpdatePipelineStages,
}

/// Database provider metrics
Expand All @@ -66,19 +58,24 @@ pub(crate) struct DatabaseProviderMetrics {
insert_history_indices: Histogram,
/// Duration of update pipeline stages
update_pipeline_stages: Histogram,
/// Duration of insert canonical headers
/// Duration of insert header numbers
insert_header_numbers: Histogram,
/// Duration of insert block body indices
insert_block_body_indices: Histogram,
/// Duration of insert transaction blocks
insert_tx_blocks: Histogram,
/// Duration of get next tx num
get_next_tx_num: Histogram,
/// Duration of insert transaction senders
insert_transaction_senders: Histogram,
/// Duration of insert transaction hash numbers
insert_transaction_hash_numbers: Histogram,
/// Duration of `save_blocks`
save_blocks_total: Histogram,
/// Duration of MDBX work in `save_blocks`
save_blocks_mdbx: Histogram,
/// Duration of static file work in `save_blocks`
save_blocks_sf: Histogram,
/// Duration of `RocksDB` work in `save_blocks`
save_blocks_rocksdb: Histogram,
/// Duration of `insert_block` in `save_blocks`
save_blocks_insert_block: Histogram,
/// Duration of `write_state` in `save_blocks`
Expand All @@ -93,6 +90,39 @@ pub(crate) struct DatabaseProviderMetrics {
save_blocks_update_history_indices: Histogram,
/// Duration of `update_pipeline_stages` in `save_blocks`
save_blocks_update_pipeline_stages: Histogram,
/// Number of blocks per `save_blocks` call
save_blocks_block_count: Histogram,
/// Duration of MDBX commit in `save_blocks`
save_blocks_commit_mdbx: Histogram,
/// Duration of static file commit in `save_blocks`
save_blocks_commit_sf: Histogram,
/// Duration of `RocksDB` commit in `save_blocks`
save_blocks_commit_rocksdb: Histogram,
}

/// Timings collected during a `save_blocks` call.
///
/// Filled in while `save_blocks` runs and flushed into the metrics histograms
/// via `DatabaseProviderMetrics::record_save_blocks`.
#[derive(Debug, Default)]
pub(crate) struct SaveBlocksTimings {
    /// Total wall-clock duration of the whole `save_blocks` call.
    pub total: Duration,
    /// Time spent on MDBX work.
    pub mdbx: Duration,
    /// Time spent on static file work.
    pub sf: Duration,
    /// Time spent on RocksDB work.
    pub rocksdb: Duration,
    /// Time spent in `insert_block`.
    pub insert_block: Duration,
    /// Time spent in `write_state`.
    pub write_state: Duration,
    /// Time spent in `write_hashed_state`.
    pub write_hashed_state: Duration,
    /// Time spent in `write_trie_changesets`.
    pub write_trie_changesets: Duration,
    /// Time spent in `write_trie_updates`.
    pub write_trie_updates: Duration,
    /// Time spent in `update_history_indices`.
    pub update_history_indices: Duration,
    /// Time spent in `update_pipeline_stages`.
    pub update_pipeline_stages: Duration,
    /// Number of blocks written by this `save_blocks` call.
    pub block_count: u64,
}

/// Timings collected during a `commit` call.
///
/// Flushed into the metrics histograms via
/// `DatabaseProviderMetrics::record_commit`.
#[derive(Debug, Default)]
pub(crate) struct CommitTimings {
    /// Duration of the MDBX commit.
    pub mdbx: Duration,
    /// Duration of the static file commit.
    pub sf: Duration,
    /// Duration of the RocksDB commit.
    pub rocksdb: Duration,
}

impl DatabaseProviderMetrics {
Expand All @@ -107,28 +137,33 @@ impl DatabaseProviderMetrics {
Action::InsertHeaderNumbers => self.insert_header_numbers.record(duration),
Action::InsertBlockBodyIndices => self.insert_block_body_indices.record(duration),
Action::InsertTransactionBlocks => self.insert_tx_blocks.record(duration),
Action::GetNextTxNum => self.get_next_tx_num.record(duration),
Action::InsertTransactionSenders => self.insert_transaction_senders.record(duration),
Action::InsertTransactionHashNumbers => {
self.insert_transaction_hash_numbers.record(duration)
}
Action::SaveBlocksInsertBlock => self.save_blocks_insert_block.record(duration),
Action::SaveBlocksWriteState => self.save_blocks_write_state.record(duration),
Action::SaveBlocksWriteHashedState => {
self.save_blocks_write_hashed_state.record(duration)
}
Action::SaveBlocksWriteTrieChangesets => {
self.save_blocks_write_trie_changesets.record(duration)
}
Action::SaveBlocksWriteTrieUpdates => {
self.save_blocks_write_trie_updates.record(duration)
}
Action::SaveBlocksUpdateHistoryIndices => {
self.save_blocks_update_history_indices.record(duration)
}
Action::SaveBlocksUpdatePipelineStages => {
self.save_blocks_update_pipeline_stages.record(duration)
}
}
}

/// Records all `save_blocks` timings.
pub(crate) fn record_save_blocks(&self, timings: &SaveBlocksTimings) {
self.save_blocks_total.record(timings.total);
self.save_blocks_mdbx.record(timings.mdbx);
self.save_blocks_sf.record(timings.sf);
self.save_blocks_rocksdb.record(timings.rocksdb);
self.save_blocks_insert_block.record(timings.insert_block);
self.save_blocks_write_state.record(timings.write_state);
self.save_blocks_write_hashed_state.record(timings.write_hashed_state);
self.save_blocks_write_trie_changesets.record(timings.write_trie_changesets);
self.save_blocks_write_trie_updates.record(timings.write_trie_updates);
self.save_blocks_update_history_indices.record(timings.update_history_indices);
self.save_blocks_update_pipeline_stages.record(timings.update_pipeline_stages);
self.save_blocks_block_count.record(timings.block_count as f64);
}

/// Records all commit timings.
pub(crate) fn record_commit(&self, timings: &CommitTimings) {
self.save_blocks_commit_mdbx.record(timings.mdbx);
self.save_blocks_commit_sf.record(timings.sf);
self.save_blocks_commit_rocksdb.record(timings.rocksdb);
}
}
2 changes: 1 addition & 1 deletion crates/storage/provider/src/providers/database/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use std::{
use tracing::trace;

mod provider;
pub use provider::{DatabaseProvider, DatabaseProviderRO, DatabaseProviderRW};
pub use provider::{DatabaseProvider, DatabaseProviderRO, DatabaseProviderRW, SaveBlocksMode};

use super::ProviderNodeTypes;
use reth_trie::KeccakKeyHasher;
Expand Down
Loading
Loading