Skip to content
6 changes: 6 additions & 0 deletions .changelog/cool-hens-dance.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
reth-engine-tree: minor
reth-node-builder: minor
---

Added `EngineSharedCaches` struct that bundles `PayloadExecutionCache`, `SharedPreservedSparseTrie`, and `PrecompileCacheMap` into a single launcher-owned handle. Updated `PayloadProcessor::new()` and `EngineValidatorBuilder::build_tree_validator` to accept `EngineSharedCaches`.
49 changes: 39 additions & 10 deletions crates/engine/tree/src/tree/payload_processor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ pub mod prewarm;
pub mod receipt_root_task;
pub mod sparse_trie;

use preserved_sparse_trie::{PreservedSparseTrie, SharedPreservedSparseTrie};
use preserved_sparse_trie::PreservedSparseTrie;
pub use preserved_sparse_trie::SharedPreservedSparseTrie;

/// Default parallelism thresholds to use with the [`ParallelSparseTrie`].
///
Expand Down Expand Up @@ -154,24 +155,26 @@ where
&self.executor
}

/// Creates a new payload processor.
/// Creates a new payload processor from the launcher-owned [`EngineSharedCaches`].
pub fn new(
executor: Runtime,
evm_config: Evm,
config: &TreeConfig,
precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
shared_caches: EngineSharedCaches<Evm>,
) -> Self {
let EngineSharedCaches { execution_cache, sparse_trie_cache, precompile_cache_map } =
shared_caches;
Self {
executor,
execution_cache: Default::default(),
execution_cache,
trie_metrics: Default::default(),
cross_block_cache_size: config.cross_block_cache_size(),
disable_transaction_prewarming: config.disable_prewarming(),
evm_config,
disable_state_cache: config.disable_state_cache(),
precompile_cache_disabled: config.precompile_cache_disabled(),
precompile_cache_map,
sparse_state_trie: SharedPreservedSparseTrie::default(),
sparse_state_trie: sparse_trie_cache,
sparse_trie_max_hot_slots: config.sparse_trie_max_hot_slots(),
sparse_trie_max_hot_accounts: config.sparse_trie_max_hot_accounts(),
disable_sparse_trie_cache_pruning: config.disable_sparse_trie_cache_pruning(),
Expand Down Expand Up @@ -744,6 +747,31 @@ where
}
}

/// Launcher-owned cache bundle shared across engine payload processing.
///
/// Created once by the node launcher and threaded through the engine validator build path.
/// Destructured by [`PayloadProcessor::new()`] into private fields.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct EngineSharedCaches<Evm: ConfigureEvm> {
/// Execution cache handle.
pub execution_cache: PayloadExecutionCache,
/// Sparse trie cache handle.
pub sparse_trie_cache: SharedPreservedSparseTrie,
/// Precompile cache map.
pub precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
}

impl<Evm: ConfigureEvm> Default for EngineSharedCaches<Evm> {
fn default() -> Self {
Self {
execution_cache: PayloadExecutionCache::default(),
sparse_trie_cache: SharedPreservedSparseTrie::default(),
precompile_cache_map: PrecompileCacheMap::default(),
}
}
}

/// Converts transactions sequentially and sends them to the prewarm and execute channels.
fn convert_serial<RawTx, Tx, TxEnv, InnerTx, Recovered, Err, C>(
iter: impl Iterator<Item = RawTx>,
Expand Down Expand Up @@ -1175,8 +1203,9 @@ mod tests {
use super::PayloadExecutionCache;
use crate::tree::{
cached_state::{CachedStateMetrics, ExecutionCache, SavedCache},
payload_processor::{evm_state_to_hashed_post_state, ExecutionEnv, PayloadProcessor},
precompile_cache::PrecompileCacheMap,
payload_processor::{
evm_state_to_hashed_post_state, EngineSharedCaches, ExecutionEnv, PayloadProcessor,
},
StateProviderBuilder, TreeConfig,
};
use alloy_eips::eip1898::{BlockNumHash, BlockWithParent};
Expand Down Expand Up @@ -1288,7 +1317,7 @@ mod tests {
reth_tasks::Runtime::test(),
EthEvmConfig::new(Arc::new(ChainSpec::default())),
&TreeConfig::default(),
PrecompileCacheMap::default(),
EngineSharedCaches::default(),
);

let parent_hash = B256::from([1u8; 32]);
Expand Down Expand Up @@ -1317,7 +1346,7 @@ mod tests {
reth_tasks::Runtime::test(),
EthEvmConfig::new(Arc::new(ChainSpec::default())),
&TreeConfig::default(),
PrecompileCacheMap::default(),
EngineSharedCaches::default(),
);

// Setup: populate cache with block 1
Expand Down Expand Up @@ -1452,7 +1481,7 @@ mod tests {
reth_tasks::Runtime::test(),
EthEvmConfig::new(factory.chain_spec()),
&TreeConfig::default(),
PrecompileCacheMap::default(),
EngineSharedCaches::default(),
);

let provider_factory = BlockchainProvider::new(factory).unwrap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,16 @@ use tracing::debug;
/// Type alias for the sparse trie type used in preservation.
pub(super) type SparseTrie = SparseStateTrie<ConfigurableSparseTrie, ConfigurableSparseTrie>;

/// Shared handle to a preserved sparse trie that can be reused across payload validations.
/// Shared handle to the preserved sparse trie cache.
///
/// This is stored in [`PayloadProcessor`](super::PayloadProcessor) and cloned to pass to
/// [`SparseTrieCacheTask`](super::sparse_trie::SparseTrieCacheTask) for trie reuse.
/// Lightweight, cloneable handle backed by `Arc<Mutex<..>>`. Stored in
/// [`EngineSharedCaches`](super::EngineSharedCaches) and used directly by
/// [`PayloadProcessor`](super::PayloadProcessor).
///
/// All mutating methods are crate-internal; external consumers only hold the handle
/// for threading it through the builder path.
#[derive(Debug, Default, Clone)]
pub(super) struct SharedPreservedSparseTrie(Arc<Mutex<Option<PreservedSparseTrie>>>);
pub struct SharedPreservedSparseTrie(Arc<Mutex<Option<PreservedSparseTrie>>>);

impl SharedPreservedSparseTrie {
/// Takes the preserved trie if present, leaving `None` in its place.
Expand All @@ -31,11 +35,8 @@ impl SharedPreservedSparseTrie {

/// Waits until the sparse trie lock becomes available.
///
/// This acquires and immediately releases the lock, ensuring that any
/// ongoing operations complete before returning. Useful for synchronization
/// before starting payload processing.
///
/// Returns the time spent waiting for the lock.
/// Acquires and immediately releases the lock, ensuring any ongoing operations
/// complete before returning. Returns the time spent waiting.
pub(super) fn wait_for_availability(&self) -> std::time::Duration {
let start = Instant::now();
let _guard = self.0.lock();
Expand Down
13 changes: 5 additions & 8 deletions crates/engine/tree/src/tree/payload_validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::tree::{
cached_state::{CacheStats, CachedStateProvider},
error::{InsertBlockError, InsertBlockErrorKind, InsertPayloadError},
instrumented_state::{InstrumentedStateProvider, StateProviderStats},
payload_processor::PayloadProcessor,
payload_processor::{EngineSharedCaches, PayloadProcessor},
precompile_cache::{CachedPrecompile, CachedPrecompileMetrics, PrecompileCacheMap},
sparse_trie::StateRootComputeOutcome,
CacheWaitDurations, EngineApiMetrics, EngineApiTreeState, ExecutionEnv, PayloadHandle,
Expand Down Expand Up @@ -192,14 +192,11 @@ where
invalid_block_hook: Box<dyn InvalidBlockHook<N>>,
changeset_cache: ChangesetCache,
runtime: reth_tasks::Runtime,
shared_caches: EngineSharedCaches<Evm>,
) -> Self {
let precompile_cache_map = PrecompileCacheMap::default();
let payload_processor = PayloadProcessor::new(
runtime.clone(),
evm_config.clone(),
&config,
precompile_cache_map.clone(),
);
let precompile_cache_map = shared_caches.precompile_cache_map.clone();
let payload_processor =
PayloadProcessor::new(runtime.clone(), evm_config.clone(), &config, shared_caches);
Self {
provider,
consensus,
Expand Down
2 changes: 2 additions & 0 deletions crates/engine/tree/src/tree/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ impl TestHarness {
Box::new(NoopInvalidBlockHook::default()),
changeset_cache.clone(),
reth_tasks::Runtime::test(),
EngineSharedCaches::default(),
);

let tree = EngineApiTreeHandler::new(
Expand Down Expand Up @@ -409,6 +410,7 @@ impl ValidatorTestHarness {
Box::new(NoopInvalidBlockHook::default()),
changeset_cache,
reth_tasks::Runtime::test(),
EngineSharedCaches::default(),
);

Self { harness, validator, metrics: TestMetrics::default() }
Expand Down
25 changes: 20 additions & 5 deletions crates/node/builder/src/launch/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@
use crate::{
common::{Attached, LaunchContextWith, WithConfigs},
hooks::NodeHooks,
rpc::{EngineShutdown, EngineValidatorAddOn, EngineValidatorBuilder, RethRpcAddOns, RpcHandle},
rpc::{
EngineSharedCaches, EngineShutdown, EngineValidatorAddOn, EngineValidatorBuilder,
RethRpcAddOns, RpcHandle,
},
setup::build_networked_pipeline,
AddOns, AddOnsContext, FullNode, LaunchContext, LaunchNode, NodeAdapter,
NodeBuilderWithComponents, NodeComponents, NodeComponentsBuilder, NodeHandle, NodeTypesAdapter,
Expand Down Expand Up @@ -191,10 +194,16 @@ impl EngineNodeLauncher {
};
let validator_builder = add_ons.engine_validator_builder();

// Build the engine validator with all required components
let shared_caches = EngineSharedCaches::default();

let engine_validator = validator_builder
.clone()
.build_tree_validator(&add_ons_ctx, engine_tree_config.clone(), changeset_cache.clone())
.build_tree_validator(
&add_ons_ctx,
engine_tree_config.clone(),
changeset_cache.clone(),
shared_caches,
)
.await?;

// Create the consensus engine stream with optional reorg
Expand All @@ -205,10 +214,16 @@ impl EngineNodeLauncher {
ctx.blockchain_db().clone(),
ctx.components().evm_config().clone(),
|| async {
// Create a separate cache for reorg validator (not shared with main engine)
// Create separate caches for reorg validator (not shared with main engine)
let reorg_cache = ChangesetCache::new();
let reorg_shared_caches = EngineSharedCaches::default();
validator_builder
.build_tree_validator(&add_ons_ctx, engine_tree_config.clone(), reorg_cache)
.build_tree_validator(
&add_ons_ctx,
engine_tree_config.clone(),
reorg_cache,
reorg_shared_caches,
)
.await
},
node_config.debug.reorg_frequency,
Expand Down
7 changes: 4 additions & 3 deletions crates/node/builder/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

pub use jsonrpsee::server::middleware::rpc::{RpcService, RpcServiceBuilder};
use reth_engine_tree::tree::WaitForCaches;
pub use reth_engine_tree::tree::{BasicEngineValidator, EngineValidator};
pub use reth_engine_tree::tree::{BasicEngineValidator, EngineSharedCaches, EngineValidator};
pub use reth_rpc_builder::{middleware::RethRpcMiddleware, Identity, Stack};
pub use reth_trie_db::ChangesetCache;

Expand Down Expand Up @@ -1286,13 +1286,12 @@ pub trait EngineValidatorBuilder<Node: FullNodeComponents>: Send + Sync + Clone
+ WaitForCaches;

/// Builds the tree validator for the consensus engine.
///
/// Returns a validator that handles block execution, state validation, and fork handling.
fn build_tree_validator(
self,
ctx: &AddOnsContext<'_, Node>,
tree_config: TreeConfig,
changeset_cache: ChangesetCache,
shared_caches: EngineSharedCaches<Node::Evm>,
) -> impl Future<Output = eyre::Result<Self::EngineValidator>> + Send;
}

Expand Down Expand Up @@ -1341,6 +1340,7 @@ where
ctx: &AddOnsContext<'_, Node>,
tree_config: TreeConfig,
changeset_cache: ChangesetCache,
shared_caches: EngineSharedCaches<Node::Evm>,
) -> eyre::Result<Self::EngineValidator> {
let validator = self.payload_validator_builder.build(ctx).await?;
let data_dir = ctx.config.datadir.clone().resolve_datadir(ctx.config.chain.chain());
Expand All @@ -1355,6 +1355,7 @@ where
invalid_block_hook,
changeset_cache,
ctx.node.task_executor().clone(),
shared_caches,
))
}
}
Expand Down
Loading