Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 28 additions & 2 deletions crates/optimism/trie/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use alloy_primitives::{map::HashMap, B256, U256};
use auto_impl::auto_impl;
use reth_primitives_traits::Account;
use reth_trie::{updates::TrieUpdates, BranchNodeCompact, HashedPostState, Nibbles};
use std::fmt::Debug;
use std::{fmt::Debug, time::Duration};

/// Seeks and iterates over trie nodes in the database by path (lexicographical order)
pub trait OpProofsTrieCursorRO: Send + Sync {
Expand Down Expand Up @@ -57,6 +57,32 @@ pub struct BlockStateDiff {
pub post_state: HashedPostState,
}

/// Counts of trie updates written to storage.
#[derive(Debug, Clone, Default)]
pub struct WriteCounts {
/// Number of account trie updates written
pub account_trie_updates_written_total: u64,
/// Number of storage trie updates written
pub storage_trie_updates_written_total: u64,
/// Number of hashed accounts written
pub hashed_accounts_written_total: u64,
/// Number of hashed storages written
pub hashed_storages_written_total: u64,
}

/// Duration metrics for block processing.
#[derive(Debug, Default, Clone)]
pub struct OperationDurations {
/// Total time to process a block (end-to-end) in seconds
pub total_duration_seconds: Duration,
/// Time spent executing the block (EVM) in seconds
pub execution_duration_seconds: Duration,
/// Time spent calculating state root in seconds
pub state_root_duration_seconds: Duration,
/// Time spent writing trie updates to storage in seconds
pub write_duration_seconds: Duration,
}

/// Trait for reading trie nodes from the database.
///
/// Only leaf nodes and some branch nodes are stored. The bottom layer of branch nodes
Expand Down Expand Up @@ -157,7 +183,7 @@ pub trait OpProofsStore: Send + Sync + Debug {
&self,
block_ref: BlockWithParent,
block_state_diff: BlockStateDiff,
) -> impl Future<Output = OpProofsStorageResult<()>> + Send;
) -> impl Future<Output = OpProofsStorageResult<WriteCounts>> + Send;

/// Fetch all updates for a given block number.
fn fetch_trie_updates(
Expand Down
20 changes: 13 additions & 7 deletions crates/optimism/trie/src/db/store.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use super::{BlockNumberHash, ProofWindow, ProofWindowKey};
use crate::{
api::WriteCounts,
db::{
cursor::Dup,
models::{
Expand Down Expand Up @@ -81,7 +82,7 @@ impl MdbxProofsStorage {
tx: &<DatabaseEnv as Database>::TXMut,
block_ref: BlockWithParent,
block_state_diff: BlockStateDiff,
) -> OpProofsStorageResult<()> {
) -> OpProofsStorageResult<WriteCounts> {
let block_number = block_ref.block.number;
// TODO: refactor the code - remove sorting out of tx

Expand Down Expand Up @@ -134,13 +135,16 @@ impl MdbxProofsStorage {
let mut hashed_account_keys = Vec::<B256>::with_capacity(hashed_account_len);
let mut hashed_storage_keys = Vec::<HashedStorageKey>::with_capacity(hashed_storage_len);

let mut write_counts = WriteCounts::default();

let mut account_trie_cursor = tx.new_cursor::<AccountTrieHistory>()?;
for (path, node) in sorted_account_nodes {
let key: StoredNibbles = path.into();
let vv = VersionedValue { block_number, value: MaybeDeleted(node) };
account_trie_cursor.append_dup(key.clone(), vv)?;

account_trie_keys.push(key);
write_counts.account_trie_updates_written_total += 1;
}

let mut storage_trie_cursor = tx.new_cursor::<StorageTrieHistory>()?;
Expand All @@ -156,6 +160,7 @@ impl MdbxProofsStorage {
let del = VersionedValue { block_number, value: MaybeDeleted(None) };
storage_trie_cursor
.append_dup(StorageTrieKey::new(hashed_address, path.into()), del)?;
write_counts.storage_trie_updates_written_total += 1;
}
// Skip any further processing for this hashed_address
continue;
Expand All @@ -166,6 +171,7 @@ impl MdbxProofsStorage {
storage_trie_cursor.append_dup(key.clone(), vv)?;

storage_trie_keys.push(key);
write_counts.storage_trie_updates_written_total += 1;
}
}

Expand All @@ -175,6 +181,7 @@ impl MdbxProofsStorage {
account_cursor.append_dup(hashed_address, vv)?;

hashed_account_keys.push(hashed_address);
write_counts.hashed_accounts_written_total += 1;
}

let mut storage_cursor = tx.new_cursor::<HashedStorageHistory>()?;
Expand All @@ -189,6 +196,7 @@ impl MdbxProofsStorage {
// Mark deleted at current block
let del = VersionedValue { block_number, value: MaybeDeleted(None) };
storage_cursor.append_dup(HashedStorageKey::new(*hashed_address, key), del)?;
write_counts.hashed_storages_written_total += 1;
}
// Skip any further processing for this hashed_address
continue;
Expand All @@ -203,6 +211,7 @@ impl MdbxProofsStorage {
storage_cursor.append_dup(key.clone(), vv)?;

hashed_storage_keys.push(key);
write_counts.hashed_storages_written_total += 1;
}
}

Expand All @@ -224,7 +233,7 @@ impl MdbxProofsStorage {
ProofWindowKey::LatestBlock,
&BlockNumberHash::new(block_number, block_ref.block.hash),
)?;
Ok(())
Ok(write_counts)
}
}

Expand Down Expand Up @@ -406,11 +415,8 @@ impl OpProofsStore for MdbxProofsStorage {
&self,
block_ref: BlockWithParent,
block_state_diff: BlockStateDiff,
) -> OpProofsStorageResult<()> {
self.env.update(|tx| {
self.store_trie_updates_inner(tx, block_ref, block_state_diff)?;
Ok(())
})?
) -> OpProofsStorageResult<WriteCounts> {
self.env.update(|tx| self.store_trie_updates_inner(tx, block_ref, block_state_diff))?
}

async fn fetch_trie_updates(&self, block_number: u64) -> OpProofsStorageResult<BlockStateDiff> {
Expand Down
27 changes: 20 additions & 7 deletions crates/optimism/trie/src/in_memory.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
//! In-memory implementation of [`OpProofsStore`] for testing purposes

use crate::{
BlockStateDiff, OpProofsHashedCursorRO, OpProofsStorageError, OpProofsStorageResult,
OpProofsStore, OpProofsTrieCursorRO,
api::WriteCounts, BlockStateDiff, OpProofsHashedCursorRO, OpProofsStorageError,
OpProofsStorageResult, OpProofsStore, OpProofsTrieCursorRO,
};
use alloy_eips::eip1898::BlockWithParent;
use alloy_primitives::{map::HashMap, B256, U256};
Expand Down Expand Up @@ -43,10 +43,17 @@ struct InMemoryStorageInner {
}

impl InMemoryStorageInner {
fn store_trie_updates(&mut self, block_number: u64, block_state_diff: BlockStateDiff) {
fn store_trie_updates(
&mut self,
block_number: u64,
block_state_diff: BlockStateDiff,
) -> WriteCounts {
let mut result = WriteCounts::default();

// Store account branch nodes
for (path, branch) in block_state_diff.trie_updates.account_nodes_ref() {
self.account_branches.insert((block_number, *path), Some(branch.clone()));
result.account_trie_updates_written_total += 1;
}

// Store removed account nodes
Expand All @@ -62,13 +69,15 @@ impl InMemoryStorageInner {

for (path, branch) in account_removals {
self.account_branches.insert((block_number, *path), branch);
result.account_trie_updates_written_total += 1;
}

// Store storage branch nodes and removals
for (address, storage_trie_updates) in block_state_diff.trie_updates.storage_tries_ref() {
// Store storage branch nodes
for (path, branch) in storage_trie_updates.storage_nodes_ref() {
self.storage_branches.insert((block_number, *address, *path), Some(branch.clone()));
result.storage_trie_updates_written_total += 1;
}

// Store removed storage nodes
Expand All @@ -82,11 +91,13 @@ impl InMemoryStorageInner {

for (path, branch) in storage_removals {
self.storage_branches.insert((block_number, *address, *path), branch);
result.storage_trie_updates_written_total += 1;
}
}

for (address, account) in &block_state_diff.post_state.accounts {
self.hashed_accounts.insert((block_number, *address), *account);
result.hashed_accounts_written_total += 1;
}

for (hashed_address, storage) in &block_state_diff.post_state.storages {
Expand Down Expand Up @@ -114,17 +125,21 @@ impl InMemoryStorageInner {
if !value.is_zero() {
self.hashed_storages
.insert((block_number, *hashed_address, slot), U256::ZERO);
result.hashed_storages_written_total += 1;
}
}
} else {
for (slot, value) in &storage.storage {
self.hashed_storages.insert((block_number, *hashed_address, *slot), *value);
result.hashed_storages_written_total += 1;
}
}
}

self.trie_updates.insert(block_number, block_state_diff.trie_updates.clone());
self.post_states.insert(block_number, block_state_diff.post_state.clone());

result
}
}

Expand Down Expand Up @@ -509,12 +524,10 @@ impl OpProofsStore for InMemoryProofsStorage {
&self,
block_ref: BlockWithParent,
block_state_diff: BlockStateDiff,
) -> OpProofsStorageResult<()> {
) -> OpProofsStorageResult<WriteCounts> {
let mut inner = self.inner.write().await;

inner.store_trie_updates(block_ref.block.number, block_state_diff);

Ok(())
Ok(inner.store_trie_updates(block_ref.block.number, block_state_diff))
}

async fn fetch_trie_updates(&self, block_number: u64) -> OpProofsStorageResult<BlockStateDiff> {
Expand Down
37 changes: 20 additions & 17 deletions crates/optimism/trie/src/live.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
//! Live trie collector for external proofs storage.

use crate::{
provider::OpProofsStateProviderRef, BlockStateDiff, OpProofsStorage, OpProofsStorageError,
OpProofsStore,
api::OperationDurations, provider::OpProofsStateProviderRef, BlockStateDiff, OpProofsStorage,
OpProofsStorageError, OpProofsStore,
};
use alloy_eips::{eip1898::BlockWithParent, NumHash};
use derive_more::Constructor;
Expand Down Expand Up @@ -39,6 +39,8 @@ where
&self,
block: &RecoveredBlock<BlockTy<Evm::Primitives>>,
) -> eyre::Result<()> {
let mut operation_durations = OperationDurations::default();

let start = Instant::now();
// ensure that we have the state of the parent block
let (Some((earliest, _)), Some((latest, _))) = (
Expand All @@ -48,8 +50,6 @@ where
return Err(OpProofsStorageError::NoBlocksFound.into());
};

let fetch_block_duration = start.elapsed();

let parent_block_number = block.number() - 1;
if parent_block_number < earliest {
return Err(OpProofsStorageError::UnknownParent.into());
Expand All @@ -75,21 +75,20 @@ where
parent_block_number,
);

let init_provider_duration = start.elapsed() - fetch_block_duration;

let db = StateProviderDatabase::new(&state_provider);
let block_executor = self.evm_config.batch_executor(db);

let execution_result =
block_executor.execute(&(*block).clone()).map_err(|err| eyre::eyre!(err))?;

let execute_block_duration = start.elapsed() - init_provider_duration;
operation_durations.execution_duration_seconds = start.elapsed();

let hashed_state = state_provider.hashed_post_state(&execution_result.state);
let (state_root, trie_updates) =
state_provider.state_root_with_updates(hashed_state.clone())?;

let calculate_state_root_duration = start.elapsed() - execute_block_duration;
operation_durations.state_root_duration_seconds =
start.elapsed() - operation_durations.execution_duration_seconds;

if state_root != block.state_root() {
return Err(OpProofsStorageError::StateRootMismatch {
Expand All @@ -100,24 +99,28 @@ where
.into());
}

self.storage
let update_result = self
.storage
.store_trie_updates(
block_ref,
BlockStateDiff { trie_updates, post_state: hashed_state },
)
.await?;

let write_trie_updates_duration = start.elapsed() - calculate_state_root_duration;
let execute_and_store_total_duration = start.elapsed();
operation_durations.total_duration_seconds = start.elapsed();
operation_durations.write_duration_seconds = operation_durations.total_duration_seconds -
operation_durations.state_root_duration_seconds;

#[cfg(feature = "metrics")]
{
let block_metrics = self.storage.metrics().block_metrics();
block_metrics.record_operation_durations(&operation_durations);
block_metrics.increment_write_counts(&update_result);
}

info!(
block_number = block.number(),
?execute_and_store_total_duration,
?fetch_block_duration,
?init_provider_duration,
?execute_block_duration,
?calculate_state_root_duration,
?write_trie_updates_duration,
?operation_durations,
"Trie updates stored successfully",
);

Expand Down
26 changes: 24 additions & 2 deletions crates/optimism/trie/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! Storage wrapper that records metrics for all operations.

use crate::{
api::{OperationDurations, WriteCounts},
cursor, BlockStateDiff, OpProofsHashedCursorRO, OpProofsStorageResult, OpProofsStore,
OpProofsTrieCursorRO,
};
Expand Down Expand Up @@ -218,6 +219,26 @@ pub struct BlockMetrics {
pub hashed_storages_written_total: Counter,
}

impl BlockMetrics {
/// Record operation durations for the processing of a block.
pub fn record_operation_durations(&self, durations: &OperationDurations) {
self.total_duration_seconds.record(durations.total_duration_seconds);
self.execution_duration_seconds.record(durations.execution_duration_seconds);
self.state_root_duration_seconds.record(durations.state_root_duration_seconds);
self.write_duration_seconds.record(durations.write_duration_seconds);
}

/// Increment write counts of historical trie updates for a single block.
pub fn increment_write_counts(&self, counts: &WriteCounts) {
self.account_trie_updates_written_total
.increment(counts.account_trie_updates_written_total);
self.storage_trie_updates_written_total
.increment(counts.storage_trie_updates_written_total);
self.hashed_accounts_written_total.increment(counts.hashed_accounts_written_total);
self.hashed_storages_written_total.increment(counts.hashed_storages_written_total);
}
}

/// Wrapper for [`OpProofsTrieCursorRO`] that records metrics.
#[derive(Debug, Constructor, Clone)]
pub struct OpProofsTrieCursorWithMetrics<C> {
Expand Down Expand Up @@ -459,16 +480,17 @@ where
Ok(OpProofsHashedCursorWithMetrics::new(cursor, self.metrics.clone()))
}

// no metrics for these
// metrics are handled by the live trie collector
#[inline]
async fn store_trie_updates(
&self,
block_ref: BlockWithParent,
block_state_diff: BlockStateDiff,
) -> OpProofsStorageResult<()> {
) -> OpProofsStorageResult<WriteCounts> {
self.storage.store_trie_updates(block_ref, block_state_diff).await
}

// no metrics for these
#[inline]
async fn fetch_trie_updates(&self, block_number: u64) -> OpProofsStorageResult<BlockStateDiff> {
self.storage.fetch_trie_updates(block_number).await
Expand Down
Loading