Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions crates/engine/tree/src/tree/payload_processor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,7 @@ where
{
let (to_sparse_trie, sparse_trie_rx) = channel();
// spawn multiproof task, save the trie input
let (trie_input, state_root_config) =
MultiProofConfig::new_from_input(consistent_view, trie_input);
let (trie_input, state_root_config) = MultiProofConfig::from_input(trie_input);
self.trie_input = Some(trie_input);

// Create and spawn the storage proof task
Expand All @@ -207,7 +206,7 @@ where
let max_proof_task_concurrency = config.max_proof_task_concurrency() as usize;
let proof_task = match ProofTaskManager::new(
self.executor.handle().clone(),
state_root_config.consistent_view.clone(),
consistent_view,
task_ctx,
storage_worker_count,
account_worker_count,
Expand Down
115 changes: 43 additions & 72 deletions crates/engine/tree/src/tree/payload_processor/multiproof.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use derive_more::derive::Deref;
use metrics::Histogram;
use reth_errors::ProviderError;
use reth_metrics::Metrics;
use reth_provider::{providers::ConsistentDbView, BlockReader, DatabaseProviderFactory};
use reth_revm::state::EvmState;
use reth_trie::{
added_removed_keys::MultiAddedRemovedKeys, prefix_set::TriePrefixSetsMut,
Expand Down Expand Up @@ -66,9 +65,7 @@ impl SparseTrieUpdate {

/// Common configuration for multi proof tasks
#[derive(Debug, Clone)]
pub(super) struct MultiProofConfig<Factory> {
/// View over the state in the database.
pub consistent_view: ConsistentDbView<Factory>,
pub(super) struct MultiProofConfig {
/// The sorted collection of cached in-memory intermediate trie nodes that
/// can be reused for computation.
pub nodes_sorted: Arc<TrieUpdatesSorted>,
Expand All @@ -80,17 +77,13 @@ pub(super) struct MultiProofConfig<Factory> {
pub prefix_sets: Arc<TriePrefixSetsMut>,
}

impl<Factory> MultiProofConfig<Factory> {
/// Creates a new state root config from the consistent view and the trie input.
impl MultiProofConfig {
/// Creates a new state root config from the trie input.
///
/// This returns a cleared [`TrieInput`] so that we can reuse any allocated space in the
/// [`TrieInput`].
Comment on lines +81 to 84
Copy link

Copilot AI Oct 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The documentation comment is outdated. The method now extracts/splits configuration from input rather than creating 'from' input, and it returns both a cleared TrieInput and the config.

Suggested change
/// Creates a new state root config from the trie input.
///
/// This returns a cleared [`TrieInput`] so that we can reuse any allocated space in the
/// [`TrieInput`].
/// Extracts configuration from the given [`TrieInput`], splitting out the config and returning
/// both a cleared [`TrieInput`] (for reuse of allocated space) and the extracted [`MultiProofConfig`].
///
/// This allows for efficient reuse of the input's allocated space and separates the configuration
/// needed for multiproof computation.

Copilot uses AI. Check for mistakes.
pub(super) fn new_from_input(
consistent_view: ConsistentDbView<Factory>,
mut input: TrieInput,
) -> (TrieInput, Self) {
pub(super) fn from_input(mut input: TrieInput) -> (TrieInput, Self) {
let config = Self {
consistent_view,
nodes_sorted: Arc::new(input.nodes.drain_into_sorted()),
state_sorted: Arc::new(input.state.drain_into_sorted()),
prefix_sets: Arc::new(input.prefix_sets.clone()),
Expand Down Expand Up @@ -249,14 +242,14 @@ pub(crate) fn evm_state_to_hashed_post_state(update: EvmState) -> HashedPostStat

/// A pending multiproof task, either [`StorageMultiproofInput`] or [`MultiproofInput`].
#[derive(Debug)]
enum PendingMultiproofTask<Factory> {
enum PendingMultiproofTask {
/// A storage multiproof task input.
Storage(StorageMultiproofInput<Factory>),
Storage(StorageMultiproofInput),
/// A regular multiproof task input.
Regular(MultiproofInput<Factory>),
Regular(MultiproofInput),
}

impl<Factory> PendingMultiproofTask<Factory> {
impl PendingMultiproofTask {
/// Returns the proof sequence number of the task.
const fn proof_sequence_number(&self) -> u64 {
match self {
Expand All @@ -282,22 +275,22 @@ impl<Factory> PendingMultiproofTask<Factory> {
}
}

impl<Factory> From<StorageMultiproofInput<Factory>> for PendingMultiproofTask<Factory> {
fn from(input: StorageMultiproofInput<Factory>) -> Self {
impl From<StorageMultiproofInput> for PendingMultiproofTask {
fn from(input: StorageMultiproofInput) -> Self {
Self::Storage(input)
}
}

impl<Factory> From<MultiproofInput<Factory>> for PendingMultiproofTask<Factory> {
fn from(input: MultiproofInput<Factory>) -> Self {
impl From<MultiproofInput> for PendingMultiproofTask {
fn from(input: MultiproofInput) -> Self {
Self::Regular(input)
}
}

/// Input parameters for spawning a dedicated storage multiproof calculation.
#[derive(Debug)]
struct StorageMultiproofInput<Factory> {
config: MultiProofConfig<Factory>,
struct StorageMultiproofInput {
config: MultiProofConfig,
source: Option<StateChangeSource>,
hashed_state_update: HashedPostState,
hashed_address: B256,
Expand All @@ -307,7 +300,7 @@ struct StorageMultiproofInput<Factory> {
multi_added_removed_keys: Arc<MultiAddedRemovedKeys>,
}

impl<Factory> StorageMultiproofInput<Factory> {
impl StorageMultiproofInput {
/// Destroys the input and sends a [`MultiProofMessage::EmptyProof`] message to the sender.
fn send_empty_proof(self) {
let _ = self.state_root_message_sender.send(MultiProofMessage::EmptyProof {
Expand All @@ -319,8 +312,8 @@ impl<Factory> StorageMultiproofInput<Factory> {

/// Input parameters for spawning a multiproof calculation.
#[derive(Debug)]
struct MultiproofInput<Factory> {
config: MultiProofConfig<Factory>,
struct MultiproofInput {
config: MultiProofConfig,
source: Option<StateChangeSource>,
hashed_state_update: HashedPostState,
proof_targets: MultiProofTargets,
Expand All @@ -329,7 +322,7 @@ struct MultiproofInput<Factory> {
multi_added_removed_keys: Option<Arc<MultiAddedRemovedKeys>>,
}

impl<Factory> MultiproofInput<Factory> {
impl MultiproofInput {
/// Destroys the input and sends a [`MultiProofMessage::EmptyProof`] message to the sender.
fn send_empty_proof(self) {
let _ = self.state_root_message_sender.send(MultiProofMessage::EmptyProof {
Expand All @@ -344,13 +337,13 @@ impl<Factory> MultiproofInput<Factory> {
/// concurrency, further calculation requests are queued and spawn later, after
/// availability has been signaled.
#[derive(Debug)]
pub struct MultiproofManager<Factory: DatabaseProviderFactory> {
pub struct MultiproofManager {
/// Maximum number of concurrent calculations.
max_concurrent: usize,
/// Currently running calculations.
inflight: usize,
/// Queued calculations.
pending: VecDeque<PendingMultiproofTask<Factory>>,
pending: VecDeque<PendingMultiproofTask>,
/// Executor for tasks
executor: WorkloadExecutor,
/// Handle to the proof task manager used for creating `ParallelProof` instances for storage
Expand All @@ -374,10 +367,7 @@ pub struct MultiproofManager<Factory: DatabaseProviderFactory> {
metrics: MultiProofTaskMetrics,
}

impl<Factory> MultiproofManager<Factory>
where
Factory: DatabaseProviderFactory<Provider: BlockReader> + Clone + 'static,
{
impl MultiproofManager {
/// Creates a new [`MultiproofManager`].
fn new(
executor: WorkloadExecutor,
Expand All @@ -404,7 +394,7 @@ where

/// Spawns a new multiproof calculation or enqueues it for later if
/// `max_concurrent` are already inflight.
fn spawn_or_queue(&mut self, input: PendingMultiproofTask<Factory>) {
fn spawn_or_queue(&mut self, input: PendingMultiproofTask) {
// If there are no proof targets, we can just send an empty multiproof back immediately
if input.proof_targets_is_empty() {
debug!(
Expand Down Expand Up @@ -438,7 +428,7 @@ where

/// Spawns a multiproof task, dispatching to `spawn_storage_proof` if the input is a storage
/// multiproof, and dispatching to `spawn_multiproof` otherwise.
fn spawn_multiproof_task(&mut self, input: PendingMultiproofTask<Factory>) {
fn spawn_multiproof_task(&mut self, input: PendingMultiproofTask) {
match input {
PendingMultiproofTask::Storage(storage_input) => {
self.spawn_storage_proof(storage_input);
Expand All @@ -450,7 +440,7 @@ where
}

/// Spawns a single storage proof calculation task.
fn spawn_storage_proof(&mut self, storage_multiproof_input: StorageMultiproofInput<Factory>) {
fn spawn_storage_proof(&mut self, storage_multiproof_input: StorageMultiproofInput) {
let StorageMultiproofInput {
config,
source,
Expand All @@ -476,7 +466,7 @@ where
"Starting dedicated storage proof calculation",
);
let start = Instant::now();
let proof_result = ParallelProof::<Factory>::new(
let proof_result = ParallelProof::new(
config.nodes_sorted,
config.state_sorted,
config.prefix_sets,
Expand Down Expand Up @@ -524,7 +514,7 @@ where
}

/// Spawns a single multiproof calculation task.
fn spawn_multiproof(&mut self, multiproof_input: MultiproofInput<Factory>) {
fn spawn_multiproof(&mut self, multiproof_input: MultiproofInput) {
let MultiproofInput {
config,
source,
Expand Down Expand Up @@ -554,10 +544,8 @@ where
let start = Instant::now();

// Extend prefix sets with targets
let frozen_prefix_sets = ParallelProof::<Factory>::extend_prefix_sets_with_targets(
&config.prefix_sets,
&proof_targets,
);
let frozen_prefix_sets =
ParallelProof::extend_prefix_sets_with_targets(&config.prefix_sets, &proof_targets);

// Queue account multiproof to worker pool
let input = AccountMultiproofInput {
Expand Down Expand Up @@ -675,13 +663,13 @@ pub(crate) struct MultiProofTaskMetrics {
/// Then it updates relevant leaves according to the result of the transaction.
/// This feeds updates to the sparse trie task.
#[derive(Debug)]
pub(super) struct MultiProofTask<Factory: DatabaseProviderFactory> {
pub(super) struct MultiProofTask {
/// The size of proof targets chunk to spawn in one calculation.
///
/// If [`None`], then chunking is disabled.
chunk_size: Option<usize>,
/// Task configuration.
config: MultiProofConfig<Factory>,
config: MultiProofConfig,
/// Receiver for state root related messages.
rx: Receiver<MultiProofMessage>,
/// Sender for state root related messages.
Expand All @@ -695,18 +683,15 @@ pub(super) struct MultiProofTask<Factory: DatabaseProviderFactory> {
/// Proof sequencing handler.
proof_sequencer: ProofSequencer,
/// Manages calculation of multiproofs.
multiproof_manager: MultiproofManager<Factory>,
multiproof_manager: MultiproofManager,
/// multi proof task metrics
metrics: MultiProofTaskMetrics,
}

impl<Factory> MultiProofTask<Factory>
where
Factory: DatabaseProviderFactory<Provider: BlockReader> + Clone + 'static,
{
impl MultiProofTask {
/// Creates a new multi proof task with the unified message channel
pub(super) fn new(
config: MultiProofConfig<Factory>,
config: MultiProofConfig,
executor: WorkloadExecutor,
proof_task_handle: ProofTaskManagerHandle,
to_sparse_trie: Sender<SparseTrieUpdate>,
Expand Down Expand Up @@ -1233,43 +1218,29 @@ fn get_proof_targets(
mod tests {
use super::*;
use alloy_primitives::map::B256Set;
use reth_provider::{providers::ConsistentDbView, test_utils::create_test_provider_factory};
use reth_provider::{
providers::ConsistentDbView, test_utils::create_test_provider_factory, BlockReader,
DatabaseProviderFactory,
};
use reth_trie::{MultiProof, TrieInput};
use reth_trie_parallel::proof_task::{ProofTaskCtx, ProofTaskManager};
use revm_primitives::{B256, U256};
use std::sync::Arc;

fn create_state_root_config<F>(factory: F, input: TrieInput) -> MultiProofConfig<F>
where
F: DatabaseProviderFactory<Provider: BlockReader> + Clone + 'static,
{
let consistent_view = ConsistentDbView::new(factory, None);
let nodes_sorted = Arc::new(input.nodes.clone().into_sorted());
let state_sorted = Arc::new(input.state.clone().into_sorted());
let prefix_sets = Arc::new(input.prefix_sets);

MultiProofConfig { consistent_view, nodes_sorted, state_sorted, prefix_sets }
}

fn create_test_state_root_task<F>(factory: F) -> MultiProofTask<F>
fn create_test_state_root_task<F>(factory: F) -> MultiProofTask
where
F: DatabaseProviderFactory<Provider: BlockReader> + Clone + 'static,
{
let executor = WorkloadExecutor::default();
let config = create_state_root_config(factory, TrieInput::default());
let (_trie_input, config) = MultiProofConfig::from_input(TrieInput::default());
let task_ctx = ProofTaskCtx::new(
config.nodes_sorted.clone(),
config.state_sorted.clone(),
config.prefix_sets.clone(),
);
let proof_task = ProofTaskManager::new(
executor.handle().clone(),
config.consistent_view.clone(),
task_ctx,
1,
1,
)
.expect("Failed to create ProofTaskManager");
let consistent_view = ConsistentDbView::new(factory, None);
let proof_task =
ProofTaskManager::new(executor.handle().clone(), consistent_view, task_ctx, 1, 1)
.expect("Failed to create ProofTaskManager");
let channel = channel();

MultiProofTask::new(config, executor, proof_task.handle(), channel.0, 1, None)
Expand Down
22 changes: 4 additions & 18 deletions crates/trie/parallel/src/proof.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use crate::{
use alloy_primitives::{map::B256Set, B256};
use dashmap::DashMap;
use reth_execution_errors::StorageRootError;
use reth_provider::{BlockReader, DatabaseProviderFactory};
use reth_storage_errors::db::DatabaseError;
use reth_trie::{
prefix_set::{PrefixSet, PrefixSetMut, TriePrefixSets, TriePrefixSetsMut},
Expand All @@ -28,7 +27,7 @@ use tracing::trace;
/// This can collect proof for many targets in parallel, spawning a task for each hashed address
/// that has proof targets.
#[derive(Debug)]
pub struct ParallelProof<Factory: DatabaseProviderFactory> {
pub struct ParallelProof {
/// The sorted collection of cached in-memory intermediate trie nodes that
/// can be reused for computation.
pub nodes_sorted: Arc<TrieUpdatesSorted>,
Expand All @@ -47,14 +46,11 @@ pub struct ParallelProof<Factory: DatabaseProviderFactory> {
/// Cached storage proof roots for missed leaves; this maps
/// hashed (missed) addresses to their storage proof roots.
missed_leaves_storage_roots: Arc<DashMap<B256, B256>>,
/// Marker to keep the Factory type parameter.
/// TODO: Remove this field if the Factory generic is not needed in the future.
_phantom: std::marker::PhantomData<Factory>,
#[cfg(feature = "metrics")]
metrics: ParallelTrieMetrics,
}

impl<Factory: DatabaseProviderFactory> ParallelProof<Factory> {
impl ParallelProof {
/// Create new state proof generator.
pub fn new(
nodes_sorted: Arc<TrieUpdatesSorted>,
Expand All @@ -71,7 +67,6 @@ impl<Factory: DatabaseProviderFactory> ParallelProof<Factory> {
collect_branch_node_masks: false,
multi_added_removed_keys: None,
proof_task_handle,
_phantom: std::marker::PhantomData,
#[cfg(feature = "metrics")]
metrics: ParallelTrieMetrics::new_with_labels(&[("type", "proof")]),
}
Expand All @@ -92,12 +87,6 @@ impl<Factory: DatabaseProviderFactory> ParallelProof<Factory> {
self.multi_added_removed_keys = multi_added_removed_keys;
self
}
}

impl<Factory> ParallelProof<Factory>
where
Factory: DatabaseProviderFactory<Provider: BlockReader> + Clone + 'static,
{
/// Queues a storage proof task and returns a receiver for the result.
fn queue_storage_proof(
&self,
Expand Down Expand Up @@ -251,9 +240,7 @@ mod tests {
use rand::Rng;
use reth_primitives_traits::{Account, StorageEntry};
use reth_provider::{
providers::ConsistentDbView,
test_utils::{create_test_provider_factory, MockNodeTypesWithDB},
HashingWriter, ProviderFactory,
providers::ConsistentDbView, test_utils::create_test_provider_factory, HashingWriter,
};
use reth_trie::proof::Proof;
use reth_trie_db::{DatabaseHashedCursorFactory, DatabaseTrieCursorFactory};
Expand Down Expand Up @@ -334,8 +321,7 @@ mod tests {
// after we compute the state root
let join_handle = rt.spawn_blocking(move || proof_task.run());

type Factory = ProviderFactory<MockNodeTypesWithDB>;
let parallel_result = ParallelProof::<Factory>::new(
let parallel_result = ParallelProof::new(
Default::default(),
Default::default(),
Default::default(),
Expand Down
Loading