From c34dbd62d97ae5a1dfe5f8074c78541c65912717 Mon Sep 17 00:00:00 2001
From: yongkangc
Date: Fri, 6 Feb 2026 03:44:48 +0000
Subject: [PATCH 1/4] perf(tree): avoid blocking recv() under
 PayloadExecutionCache write lock

Previously, `PrewarmCacheTask::save_cache` called `valid_block_rx.recv()`
inside `PayloadExecutionCache::update_with_guard`, which holds a write lock.
This blocking call could stall all readers and writers of the execution cache
indefinitely, causing system-wide contention (observed via 5ms lock-contention
warnings in `get_cache_for()`).

This change:

1. Moves all cache-building operations (clone, insert_state, update_metrics)
   outside the write lock
2. Moves the blocking `recv()` call outside the lock
3. Adds a conditional swap with a parent-hash check to prevent race conditions
4. Keeps the usage guard alive until after `recv()` so another thread cannot
   clear the cache we are still using

The lock is now only held for the minimal final assignment.

Amp-Thread-ID: https://ampcode.com/threads/T-019c2db8-0945-765a-979e-99d922bd4791
---
 crates/engine/tree/src/tree/cached_state.rs        |  5 ++
 .../src/tree/payload_processor/prewarm.rs          | 77 ++++++++++++-------
 2 files changed, 55 insertions(+), 27 deletions(-)

diff --git a/crates/engine/tree/src/tree/cached_state.rs b/crates/engine/tree/src/tree/cached_state.rs
index a6956c93f3c..02715060dd4 100644
--- a/crates/engine/tree/src/tree/cached_state.rs
+++ b/crates/engine/tree/src/tree/cached_state.rs
@@ -827,6 +827,11 @@ impl SavedCache {
         &self.metrics
     }
 
+    /// Returns whether cache metrics recording is disabled.
+    pub const fn disable_cache_metrics(&self) -> bool {
+        self.disable_cache_metrics
+    }
+
     /// Updates the cache metrics (size/capacity/collisions) from the stats handlers.
     ///
     /// Note: This can be expensive with large cached state. Use
diff --git a/crates/engine/tree/src/tree/payload_processor/prewarm.rs b/crates/engine/tree/src/tree/payload_processor/prewarm.rs
index 4aa95c3c3b0..55e2b0da47b 100644
--- a/crates/engine/tree/src/tree/payload_processor/prewarm.rs
+++ b/crates/engine/tree/src/tree/payload_processor/prewarm.rs
@@ -239,37 +239,60 @@ where
         if let Some(saved_cache) = saved_cache {
             debug!(target: "engine::caching", parent_hash=?hash, "Updating execution cache");
 
-            // Perform all cache operations atomically under the lock
-            execution_cache.update_with_guard(|cached| {
-                // consumes the `SavedCache` held by the prewarming task, which releases its usage
-                // guard
-                let (caches, cache_metrics, disable_cache_metrics) = saved_cache.split();
-                let new_cache = SavedCache::new(hash, caches, cache_metrics)
-                    .with_disable_cache_metrics(disable_cache_metrics);
-
-                // Insert state into cache while holding the lock
-                // Access the BundleState through the shared ExecutionOutcome
-                if new_cache.cache().insert_state(&execution_outcome.state).is_err() {
-                    // Clear the cache on error to prevent having a polluted cache
-                    *cached = None;
-                    debug!(target: "engine::caching", "cleared execution cache on update error");
-                    return;
-                }
-                new_cache.update_metrics();
-
-                if valid_block_rx.recv().is_ok() {
-                    // Replace the shared cache with the new one; the previous cache (if any) is
-                    // dropped.
-                    *cached = Some(new_cache);
-                } else {
-                    // Block was invalid; caches were already mutated by insert_state above,
-                    // so we must clear to prevent using polluted state
-                    *cached = None;
-                    debug!(target: "engine::caching", "cleared execution cache on invalid block");
+            // Get the parent hash we expect the cache to have for conditional swap
+            let expected_parent_hash = saved_cache.executed_block_hash();
+
+            // Clone internals WITHOUT dropping usage guard (keep saved_cache alive).
+            // This prevents another thread from clearing the cache we're still using.
+            let caches = saved_cache.cache().clone();
+            let cache_metrics = saved_cache.metrics().clone();
+            let disable_cache_metrics = saved_cache.disable_cache_metrics();
+
+            // Build new cache outside the lock
+            let new_cache = SavedCache::new(hash, caches, cache_metrics)
+                .with_disable_cache_metrics(disable_cache_metrics);
+
+            // Insert state outside the lock
+            if new_cache.cache().insert_state(&execution_outcome.state).is_err() {
+                // Only clear if cache still matches our expected parent (prevents lost update)
+                execution_cache.update_with_guard(|cached| {
+                    if cached
+                        .as_ref()
+                        .is_some_and(|c| c.executed_block_hash() == expected_parent_hash)
+                    {
+                        *cached = None;
+                    }
+                });
+                debug!(target: "engine::caching", "cleared execution cache on update error");
+                return
+            }
+
+            new_cache.update_metrics();
+
+            // Wait for block validity OUTSIDE the lock (this can block indefinitely)
+            let is_valid = valid_block_rx.recv().is_ok();
+
+            // Drop saved_cache now (releases usage guard) since we're done with it
+            drop(saved_cache);
+
+            // Conditional swap: only update if cache still matches our expected parent.
+            // This prevents clobbering updates from another thread that ran while we blocked.
+            execution_cache.update_with_guard(|cached| {
+                if cached.as_ref().is_some_and(|c| c.executed_block_hash() == expected_parent_hash)
+                {
+                    if is_valid {
+                        *cached = Some(new_cache);
+                    } else {
+                        *cached = None;
+                    }
                 }
             });
 
+            if !is_valid {
+                debug!(target: "engine::caching", "cleared execution cache on invalid block");
+            }
+
             let elapsed = start.elapsed();
             debug!(target: "engine::caching", parent_hash=?hash, elapsed=?elapsed, "Updated execution cache");
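[Editor's note] The locking discipline in this patch generalizes: do the expensive construction and the blocking wait with no lock held, then re-acquire the write lock only for a compare-and-swap keyed on the state you expected. A minimal runnable sketch of that pattern, using toy stand-ins (the `Cache` struct, `u64` hashes, and `std::sync` primitives below are illustrative, not reth's `SavedCache`/`PayloadExecutionCache` API):

```rust
use std::sync::{
    mpsc::{self, Receiver},
    Arc, RwLock,
};

// Toy stand-in for `SavedCache`: a cache tagged with the block hash it was built for.
#[derive(Clone, Debug)]
struct Cache {
    block_hash: u64,
}

// Build the new cache and wait for validity with NO lock held, then swap it in
// under the lock only if the cache still matches the parent we started from.
fn save_cache(
    shared: &Arc<RwLock<Option<Cache>>>,
    expected_parent_hash: u64,
    new_hash: u64,
    valid_block_rx: Receiver<()>,
) {
    // Expensive cache construction happens outside the lock.
    let new_cache = Cache { block_hash: new_hash };

    // The potentially indefinite blocking wait also happens outside the lock.
    let is_valid = valid_block_rx.recv().is_ok();

    // The write lock is held only for the final conditional swap.
    let mut guard = shared.write().unwrap();
    if guard.as_ref().is_some_and(|c| c.block_hash == expected_parent_hash) {
        // Valid block: publish the new cache; invalid block: clear the stale one.
        *guard = is_valid.then_some(new_cache);
    }
    // Parent mismatch: another thread already replaced the cache while we
    // blocked, so we drop our result instead of clobbering theirs.
}

fn main() {
    let shared = Arc::new(RwLock::new(Some(Cache { block_hash: 1 })));
    let (valid_tx, valid_rx) = mpsc::channel();
    valid_tx.send(()).unwrap(); // signal that the block was validated
    save_cache(&shared, 1, 2, valid_rx);
    assert_eq!(shared.read().unwrap().as_ref().unwrap().block_hash, 2);
}
```

If the parent hash no longer matches, the result is discarded rather than written, which is how the patch avoids clobbering a concurrent update that landed while `recv()` was blocked.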
From 403fd832b887197c49f1423eea4c07b2f97dfa61 Mon Sep 17 00:00:00 2001
From: yongkangc
Date: Fri, 6 Feb 2026 04:27:15 +0000
Subject: [PATCH 2/4] fix: restore decrement_size and add debug log for race
 skip

Amp-Thread-ID: https://ampcode.com/threads/T-019c312b-9a38-751c-a28d-6f759815d037
---
 crates/engine/tree/src/tree/cached_state.rs              | 1 +
 crates/engine/tree/src/tree/payload_processor/prewarm.rs | 6 ++++++
 2 files changed, 7 insertions(+)

diff --git a/crates/engine/tree/src/tree/cached_state.rs b/crates/engine/tree/src/tree/cached_state.rs
index 02715060dd4..f7679593c6e 100644
--- a/crates/engine/tree/src/tree/cached_state.rs
+++ b/crates/engine/tree/src/tree/cached_state.rs
@@ -708,6 +708,7 @@ impl ExecutionCache {
                 }
 
                 self.account_cache.remove(addr);
+                self.account_stats.decrement_size();
                 continue
             }
 
diff --git a/crates/engine/tree/src/tree/payload_processor/prewarm.rs b/crates/engine/tree/src/tree/payload_processor/prewarm.rs
index 55e2b0da47b..3629adb3065 100644
--- a/crates/engine/tree/src/tree/payload_processor/prewarm.rs
+++ b/crates/engine/tree/src/tree/payload_processor/prewarm.rs
@@ -286,6 +286,12 @@ where
                     } else {
                         *cached = None;
                     }
+                } else {
+                    debug!(
+                        target: "engine::caching",
+                        expected=?expected_parent_hash,
+                        "Skipped cache save due to parent-hash mismatch"
+                    );
                 }
             });
 
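[Editor's note] The `decrement_size` restoration matters because the stats handle is a shadow counter: every removal from the underlying map must be mirrored, or the reported cache size drifts upward indefinitely. A toy sketch of that invariant (the `AccountCache`/`CacheStats` types below are illustrative, not reth's):

```rust
use std::{
    collections::HashMap,
    sync::atomic::{AtomicUsize, Ordering},
};

// Toy stand-in for the per-cache stats handle: a size counter that must
// mirror every insert into and removal from the underlying map.
#[derive(Default)]
struct CacheStats {
    size: AtomicUsize,
}

impl CacheStats {
    fn increment_size(&self) {
        self.size.fetch_add(1, Ordering::Relaxed);
    }
    fn decrement_size(&self) {
        self.size.fetch_sub(1, Ordering::Relaxed);
    }
}

#[derive(Default)]
struct AccountCache {
    entries: HashMap<u64, u64>,
    stats: CacheStats,
}

impl AccountCache {
    fn insert(&mut self, addr: u64, balance: u64) {
        if self.entries.insert(addr, balance).is_none() {
            self.stats.increment_size();
        }
    }

    // Every removal must decrement the counter in the same breath; skipping it
    // (the bug fixed above) leaves the reported size permanently inflated.
    fn remove(&mut self, addr: u64) {
        if self.entries.remove(&addr).is_some() {
            self.stats.decrement_size();
        }
    }
}

fn main() {
    let mut cache = AccountCache::default();
    cache.insert(0xaa, 100);
    cache.insert(0xbb, 200);
    cache.remove(0xaa);
    assert_eq!(cache.stats.size.load(Ordering::Relaxed), cache.entries.len());
}
```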
From 57824d3266768ff8857d109aa2f90c9c3c5635ca Mon Sep 17 00:00:00 2001
From: yongkangc
Date: Fri, 6 Feb 2026 06:00:36 +0000
Subject: [PATCH 3/4] perf(engine): merge pruning into save_blocks write
 transaction
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Previously, after on_save_blocks committed blocks (fsync #1), the persistence
thread ran pruning in a separate MDBX write transaction with its own commit
(fsync #2). During this entire pruning pass, the persistence thread could not
process new requests.

Merge pruning into the same write transaction as save_blocks by calling
Pruner::run_with_provider() with the existing provider_rw before commit. This
eliminates the second fsync entirely: one write transaction, one commit, one
fsync per cycle.

Prune errors are caught and logged but do not prevent block persistence. This
preserves the existing guarantee that blocks are always committed regardless
of prune outcome.

Based on bench metrics (rf7d8): save p50=305ms, prune p50=128ms, firing every
other save. Pruning accounts for 14.9% of total persistence wall time (53s /
356s). Merging it into the save transaction removes the second commit's fsync,
eliminating ~128ms of redundant latency on every prune cycle.

Amp-Thread-ID: https://ampcode.com/threads/T-019c3183-3b50-7379-8a4b-42f7a68aac22
---
 crates/engine/tree/src/persistence.rs | 31 +++++++++++++------------------
 1 file changed, 13 insertions(+), 18 deletions(-)

diff --git a/crates/engine/tree/src/persistence.rs b/crates/engine/tree/src/persistence.rs
index 0d45eda7fd6..c7fe0daefa5 100644
--- a/crates/engine/tree/src/persistence.rs
+++ b/crates/engine/tree/src/persistence.rs
@@ -9,7 +9,7 @@ use reth_provider::{
     providers::ProviderNodeTypes, BlockExecutionWriter, BlockHashReader, ChainStateBlockWriter,
     DBProvider, DatabaseProviderFactory, ProviderFactory, SaveBlocksMode,
 };
-use reth_prune::{PrunerError, PrunerOutput, PrunerWithFactory};
+use reth_prune::{PrunerError, PrunerWithFactory};
 use reth_stages_api::{MetricEvent, MetricEventsSender};
 use reth_tasks::spawn_os_thread;
 use std::{
@@ -74,18 +74,6 @@ where
             pending_safe_block: None,
         }
     }
-
-    /// Prunes block data before the given block number according to the configured prune
-    /// configuration.
-    #[instrument(level = "debug", target = "engine::persistence", skip_all, fields(block_num))]
-    fn prune_before(&mut self, block_num: u64) -> Result<PrunerOutput, PrunerError> {
-        debug!(target: "engine::persistence", ?block_num, "Running pruner");
-        let start_time = Instant::now();
-        // TODO: doing this properly depends on pruner segment changes
-        let result = self.pruner.run(block_num);
-        self.metrics.prune_before_duration_seconds.record(start_time.elapsed());
-        result
-    }
 }
 
 impl<N> PersistenceService<N>
 where
@@ -118,11 +106,6 @@ where
                         let _ = self
                             .sync_metrics_tx
                             .send(MetricEvent::SyncHeight { height: block_number });
-
-                        if self.pruner.is_pruning_needed(block_number) {
-                            // We log `PrunerOutput` inside the `Pruner`
-                            let _ = self.prune_before(block_number)?;
-                        }
                     }
                 }
                 PersistenceAction::SaveFinalizedBlock(finalized_block) => {
@@ -183,6 +166,18 @@ where
                 provider_rw.save_safe_block_number(safe)?;
             }
 
+            // Run pruning in the same write transaction to avoid a second fsync
+            if let Some(last) = last_block {
+                if self.pruner.is_pruning_needed(last.number) {
+                    debug!(target: "engine::persistence", block_num = last.number, "Running pruner in save_blocks transaction");
+                    let prune_start = Instant::now();
+                    if let Err(err) = self.pruner.run_with_provider(&provider_rw, last.number) {
+                        error!(target: "engine::persistence", %err, block_num = last.number, "Pruning failed; will still commit persisted blocks");
+                    }
+                    self.metrics.prune_before_duration_seconds.record(prune_start.elapsed());
+                }
+            }
+
             provider_rw.commit()?;
         }
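[Editor's note] The shape of this change: pruning becomes a best-effort step inside the already-open write transaction, its error is logged and swallowed, and the single `commit()` at the end is the only fsync. A schematic sketch (the `WriteTx` type and its methods are illustrative stand-ins, not reth's provider API):

```rust
// Toy stand-in for an MDBX write transaction: one `commit()` is one fsync.
struct WriteTx {
    fsyncs: u32,
}

impl WriteTx {
    fn save_blocks(&mut self) -> Result<(), String> {
        Ok(()) // stage block writes in the open transaction
    }

    fn prune(&mut self) -> Result<(), String> {
        Err("segment locked".into()) // pruning may fail independently of saving
    }

    fn commit(mut self) -> u32 {
        self.fsyncs += 1; // the single fsync for the whole cycle
        self.fsyncs
    }
}

// Save and prune in ONE transaction: a prune error is logged but never blocks
// the commit, so block persistence is guaranteed regardless of prune outcome.
fn save_cycle(mut tx: WriteTx, pruning_needed: bool) -> Result<u32, String> {
    tx.save_blocks()?;
    if pruning_needed {
        if let Err(err) = tx.prune() {
            eprintln!("pruning failed; will still commit persisted blocks: {err}");
        }
    }
    Ok(tx.commit()) // one write transaction, one commit, one fsync
}

fn main() {
    let fsyncs = save_cycle(WriteTx { fsyncs: 0 }, true).unwrap();
    assert_eq!(fsyncs, 1); // previously this cycle would have cost two fsyncs
}
```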
From e7e7d011b7b644cb1209a9417cc565ee3cf56327 Mon Sep 17 00:00:00 2001
From: yongkangc
Date: Fri, 6 Feb 2026 07:35:11 +0000
Subject: [PATCH 4/4] fix: resolve clippy collapsible-if by using if-let for
 outer guard

Amp-Thread-ID: https://ampcode.com/threads/T-019c3183-3b50-7379-8a4b-42f7a68aac22
---
 crates/engine/tree/src/persistence.rs | 18 +++++++-----------
 1 file changed, 7 insertions(+), 11 deletions(-)

diff --git a/crates/engine/tree/src/persistence.rs b/crates/engine/tree/src/persistence.rs
index c7fe0daefa5..c2328ead3ac 100644
--- a/crates/engine/tree/src/persistence.rs
+++ b/crates/engine/tree/src/persistence.rs
@@ -154,11 +154,10 @@ where
 
         let start_time = Instant::now();
 
-        if last_block.is_some() {
+        if let Some(last) = last_block {
             let provider_rw = self.provider.database_provider_rw()?;
             provider_rw.save_blocks(blocks, SaveBlocksMode::Full)?;
 
-            // Commit pending finalized/safe block updates in the same transaction
             if let Some(finalized) = pending_finalized {
                 provider_rw.save_finalized_block_number(finalized)?;
             }
                 provider_rw.save_safe_block_number(safe)?;
             }
 
-            // Run pruning in the same write transaction to avoid a second fsync
-            if let Some(last) = last_block {
-                if self.pruner.is_pruning_needed(last.number) {
-                    debug!(target: "engine::persistence", block_num = last.number, "Running pruner in save_blocks transaction");
-                    let prune_start = Instant::now();
-                    if let Err(err) = self.pruner.run_with_provider(&provider_rw, last.number) {
-                        error!(target: "engine::persistence", %err, block_num = last.number, "Pruning failed; will still commit persisted blocks");
+            if self.pruner.is_pruning_needed(last.number) {
+                debug!(target: "engine::persistence", block_num = last.number, "Running pruner in save_blocks transaction");
+                let prune_start = Instant::now();
+                if let Err(err) = self.pruner.run_with_provider(&provider_rw, last.number) {
+                    error!(target: "engine::persistence", %err, block_num = last.number, "Pruning failed; will still commit persisted blocks");
                 }
+                self.metrics.prune_before_duration_seconds.record(prune_start.elapsed());
             }
 
             provider_rw.commit()?;
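[Editor's note] For reference, the clippy pattern this patch resolves: checking an `Option` with `is_some()` and re-matching it later forces nesting that `clippy::collapsible_if` flags; binding the value with `if let` in the outer guard flattens it. A minimal before/after sketch (free functions with toy parameters, not the actual `on_save_blocks` signature):

```rust
// Before: the outer branch only checks `is_some()`, so the value must be
// re-matched later, producing the nesting clippy flags as collapsible.
fn before(last_block: Option<u64>, pruning_needed: bool) {
    if last_block.is_some() {
        // ... save blocks ...
    }
    if let Some(last) = last_block {
        if pruning_needed {
            println!("pruning before block {last}");
        }
    }
}

// After: `if let` binds `last` in the outer guard, so the prune check sits
// directly inside the save branch with one less level of nesting.
fn after(last_block: Option<u64>, pruning_needed: bool) {
    if let Some(last) = last_block {
        // ... save blocks ...
        if pruning_needed {
            println!("pruning before block {last}");
        }
    }
}

fn main() {
    before(Some(42), true);
    after(Some(42), true);
}
```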