Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 73 additions & 51 deletions crates/trie/parallel/src/proof_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use crate::{
};
use alloy_primitives::{
map::{B256Map, B256Set},
B256,
B256, U256,
};
use crossbeam_channel::{unbounded, Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
use reth_execution_errors::{SparseTrieError, SparseTrieErrorKind, StateProofError};
Expand All @@ -43,10 +43,10 @@ use reth_provider::{DatabaseProviderROFactory, ProviderError, ProviderResult};
use reth_storage_errors::db::DatabaseError;
use reth_tasks::Runtime;
use reth_trie::{
hashed_cursor::HashedCursorFactory,
hashed_cursor::{HashedCursorFactory, HashedStorageCursor, InstrumentedHashedCursor},
proof::{ProofBlindedAccountProvider, ProofBlindedStorageProvider},
proof_v2,
trie_cursor::TrieCursorFactory,
trie_cursor::{InstrumentedTrieCursor, TrieCursorFactory, TrieStorageCursor},
DecodedMultiProofV2, HashedPostState, MultiProofTargetsV2, Nibbles, ProofTrieNodeV2,
ProofV2Target,
};
Expand All @@ -70,20 +70,20 @@ use crate::proof_task_metrics::{

type TrieNodeProviderResult = Result<Option<RevealedNode>, SparseTrieError>;

/// Type alias for the V2 account proof calculator.
/// Type alias for the V2 account proof calculator with instrumented cursors.
type V2AccountProofCalculator<'a, Provider> = proof_v2::ProofCalculator<
<Provider as TrieCursorFactory>::AccountTrieCursor<'a>,
<Provider as HashedCursorFactory>::AccountCursor<'a>,
InstrumentedTrieCursor<'a, <Provider as TrieCursorFactory>::AccountTrieCursor<'a>>,
InstrumentedHashedCursor<'a, <Provider as HashedCursorFactory>::AccountCursor<'a>>,
AsyncAccountValueEncoder<
<Provider as TrieCursorFactory>::StorageTrieCursor<'a>,
<Provider as HashedCursorFactory>::StorageCursor<'a>,
InstrumentedTrieCursor<'a, <Provider as TrieCursorFactory>::StorageTrieCursor<'a>>,
InstrumentedHashedCursor<'a, <Provider as HashedCursorFactory>::StorageCursor<'a>>,
>,
>;

/// Type alias for the V2 storage proof calculator.
/// Type alias for the V2 storage proof calculator with instrumented cursors.
type V2StorageProofCalculator<'a, Provider> = proof_v2::StorageProofCalculator<
<Provider as TrieCursorFactory>::StorageTrieCursor<'a>,
<Provider as HashedCursorFactory>::StorageCursor<'a>,
InstrumentedTrieCursor<'a, <Provider as TrieCursorFactory>::StorageTrieCursor<'a>>,
InstrumentedHashedCursor<'a, <Provider as HashedCursorFactory>::StorageCursor<'a>>,
>;

/// A handle that provides type-safe access to proof worker pools.
Expand Down Expand Up @@ -428,14 +428,15 @@ impl<Provider> ProofTaskTx<Provider>
where
Provider: TrieCursorFactory + HashedCursorFactory,
{
fn compute_v2_storage_proof(
fn compute_v2_storage_proof<TC, HC>(
&self,
input: StorageProofInput,
calculator: &mut proof_v2::StorageProofCalculator<
<Provider as TrieCursorFactory>::StorageTrieCursor<'_>,
<Provider as HashedCursorFactory>::StorageCursor<'_>,
>,
) -> Result<StorageProofResult, StateProofError> {
calculator: &mut proof_v2::StorageProofCalculator<TC, HC>,
) -> Result<StorageProofResult, StateProofError>
where
TC: TrieStorageCursor,
HC: HashedStorageCursor<Value = U256>,
{
let StorageProofInput { hashed_address, mut targets } = input;

let span = debug_span!(
Expand Down Expand Up @@ -706,8 +707,16 @@ where
let mut cursor_metrics_cache = ProofTaskCursorMetricsCache::default();
let trie_cursor = proof_tx.provider.storage_trie_cursor(B256::ZERO)?;
let hashed_cursor = proof_tx.provider.hashed_storage_cursor(B256::ZERO)?;
let mut v2_calculator =
proof_v2::StorageProofCalculator::new_storage(trie_cursor, hashed_cursor);
let instrumented_trie_cursor =
InstrumentedTrieCursor::new(trie_cursor, &mut cursor_metrics_cache.storage_trie_cursor);
let instrumented_hashed_cursor = InstrumentedHashedCursor::new(
hashed_cursor,
&mut cursor_metrics_cache.storage_hashed_cursor,
);
let mut v2_calculator = proof_v2::StorageProofCalculator::new_storage(
instrumented_trie_cursor,
instrumented_hashed_cursor,
);

// Initially mark this worker as available.
self.available_workers.fetch_add(1, Ordering::Relaxed);
Expand Down Expand Up @@ -763,6 +772,9 @@ where
idle_start = Instant::now();
}

// Drop calculator to release mutable borrows on cursor_metrics_cache.
drop(v2_calculator);

trace!(
target: "trie::proof_task",
worker_id = self.worker_id,
Expand All @@ -783,18 +795,17 @@ where
}

/// Processes a storage proof request.
fn process_storage_proof<Provider>(
fn process_storage_proof<Provider, TC, HC>(
&self,
proof_tx: &ProofTaskTx<Provider>,
v2_calculator: &mut proof_v2::StorageProofCalculator<
<Provider as TrieCursorFactory>::StorageTrieCursor<'_>,
<Provider as HashedCursorFactory>::StorageCursor<'_>,
>,
v2_calculator: &mut proof_v2::StorageProofCalculator<TC, HC>,
input: StorageProofInput,
proof_result_sender: CrossbeamSender<StorageProofResultMessage>,
storage_proofs_processed: &mut u64,
) where
Provider: TrieCursorFactory + HashedCursorFactory,
TC: TrieStorageCursor,
HC: HashedStorageCursor<Value = U256>,
{
let hashed_address = input.hashed_address;
let proof_start = Instant::now();
Expand Down Expand Up @@ -980,18 +991,42 @@ where
let storage_trie_cursor = provider.storage_trie_cursor(B256::ZERO)?;
let storage_hashed_cursor = provider.hashed_storage_cursor(B256::ZERO)?;

let mut v2_account_calculator = proof_v2::ProofCalculator::<
_,
_,
AsyncAccountValueEncoder<
<Factory::Provider as TrieCursorFactory>::StorageTrieCursor<'_>,
<Factory::Provider as HashedCursorFactory>::StorageCursor<'_>,
>,
>::new(account_trie_cursor, account_hashed_cursor);
let instrumented_account_trie_cursor = InstrumentedTrieCursor::new(
account_trie_cursor,
&mut cursor_metrics_cache.account_trie_cursor,
);
let instrumented_account_hashed_cursor = InstrumentedHashedCursor::new(
account_hashed_cursor,
&mut cursor_metrics_cache.account_hashed_cursor,
);
let instrumented_storage_trie_cursor = InstrumentedTrieCursor::new(
storage_trie_cursor,
&mut cursor_metrics_cache.storage_trie_cursor,
);
let instrumented_storage_hashed_cursor = InstrumentedHashedCursor::new(
storage_hashed_cursor,
&mut cursor_metrics_cache.storage_hashed_cursor,
);

let mut v2_account_calculator =
proof_v2::ProofCalculator::<
_,
_,
AsyncAccountValueEncoder<
InstrumentedTrieCursor<
'_,
<Factory::Provider as TrieCursorFactory>::StorageTrieCursor<'_>,
>,
InstrumentedHashedCursor<
'_,
<Factory::Provider as HashedCursorFactory>::StorageCursor<'_>,
>,
>,
>::new(instrumented_account_trie_cursor, instrumented_account_hashed_cursor);
let v2_storage_calculator =
Rc::new(RefCell::new(proof_v2::StorageProofCalculator::new_storage(
storage_trie_cursor,
storage_hashed_cursor,
instrumented_storage_trie_cursor,
instrumented_storage_hashed_cursor,
)));

// Count this worker as available only after successful initialization.
Expand Down Expand Up @@ -1027,7 +1062,6 @@ where
v2_storage_calculator.clone(),
*input,
&mut account_proofs_processed,
&mut cursor_metrics_cache,
);
total_idle_time += value_encoder_stats.storage_wait_time;
value_encoder_stats_cache.extend(&value_encoder_stats);
Expand All @@ -1050,6 +1084,10 @@ where
idle_start = Instant::now();
}

// Drop calculators to release mutable borrows on cursor_metrics_cache.
drop(v2_account_calculator);
drop(v2_storage_calculator);

trace!(
target: "trie::proof_task",
worker_id=self.worker_id,
Expand Down Expand Up @@ -1119,12 +1157,10 @@ where
v2_storage_calculator: Rc<RefCell<V2StorageProofCalculator<'a, Provider>>>,
input: AccountMultiproofInput,
account_proofs_processed: &mut u64,
cursor_metrics_cache: &mut ProofTaskCursorMetricsCache,
) -> ValueEncoderStats
where
Provider: TrieCursorFactory + HashedCursorFactory + 'a,
{
let proof_cursor_metrics = ProofTaskCursorMetricsCache::default();
let proof_start = Instant::now();

let AccountMultiproofInput { targets, proof_result_sender } = input;
Expand Down Expand Up @@ -1154,28 +1190,14 @@ where
);
}

proof_cursor_metrics.record_spans();

trace!(
target: "trie::proof_task",
proof_time_us = proof_elapsed.as_micros(),
total_elapsed_us = total_elapsed.as_micros(),
total_processed = account_proofs_processed,
account_trie_cursor_duration_us = proof_cursor_metrics.account_trie_cursor.total_duration.as_micros(),
account_hashed_cursor_duration_us = proof_cursor_metrics.account_hashed_cursor.total_duration.as_micros(),
storage_trie_cursor_duration_us = proof_cursor_metrics.storage_trie_cursor.total_duration.as_micros(),
storage_hashed_cursor_duration_us = proof_cursor_metrics.storage_hashed_cursor.total_duration.as_micros(),
account_trie_cursor_metrics = ?proof_cursor_metrics.account_trie_cursor,
account_hashed_cursor_metrics = ?proof_cursor_metrics.account_hashed_cursor,
storage_trie_cursor_metrics = ?proof_cursor_metrics.storage_trie_cursor,
storage_hashed_cursor_metrics = ?proof_cursor_metrics.storage_hashed_cursor,
"Account multiproof completed"
);

#[cfg(feature = "metrics")]
// Accumulate per-proof metrics into the worker's cache
cursor_metrics_cache.extend(&proof_cursor_metrics);

value_encoder_stats
}

Expand Down
Loading