diff --git a/crates/engine/tree/src/persistence.rs b/crates/engine/tree/src/persistence.rs index 0d45eda7fd6..c2328ead3ac 100644 --- a/crates/engine/tree/src/persistence.rs +++ b/crates/engine/tree/src/persistence.rs @@ -9,7 +9,7 @@ use reth_provider::{ providers::ProviderNodeTypes, BlockExecutionWriter, BlockHashReader, ChainStateBlockWriter, DBProvider, DatabaseProviderFactory, ProviderFactory, SaveBlocksMode, }; -use reth_prune::{PrunerError, PrunerOutput, PrunerWithFactory}; +use reth_prune::{PrunerError, PrunerWithFactory}; use reth_stages_api::{MetricEvent, MetricEventsSender}; use reth_tasks::spawn_os_thread; use std::{ @@ -74,18 +74,6 @@ where pending_safe_block: None, } } - - /// Prunes block data before the given block number according to the configured prune - /// configuration. - #[instrument(level = "debug", target = "engine::persistence", skip_all, fields(block_num))] - fn prune_before(&mut self, block_num: u64) -> Result { - debug!(target: "engine::persistence", ?block_num, "Running pruner"); - let start_time = Instant::now(); - // TODO: doing this properly depends on pruner segment changes - let result = self.pruner.run(block_num); - self.metrics.prune_before_duration_seconds.record(start_time.elapsed()); - result - } } impl PersistenceService @@ -118,11 +106,6 @@ where let _ = self .sync_metrics_tx .send(MetricEvent::SyncHeight { height: block_number }); - - if self.pruner.is_pruning_needed(block_number) { - // We log `PrunerOutput` inside the `Pruner` - let _ = self.prune_before(block_number)?; - } } } PersistenceAction::SaveFinalizedBlock(finalized_block) => { @@ -171,11 +154,10 @@ where let start_time = Instant::now(); - if last_block.is_some() { + if let Some(last) = last_block { let provider_rw = self.provider.database_provider_rw()?; provider_rw.save_blocks(blocks, SaveBlocksMode::Full)?; - // Commit pending finalized/safe block updates in the same transaction if let Some(finalized) = pending_finalized { provider_rw.save_finalized_block_number(finalized)?; } @@ -183,6 +165,15 @@ where provider_rw.save_safe_block_number(safe)?; } + if self.pruner.is_pruning_needed(last.number) { + debug!(target: "engine::persistence", block_num = last.number, "Running pruner in save_blocks transaction"); + let prune_start = Instant::now(); + if let Err(err) = self.pruner.run_with_provider(&provider_rw, last.number) { + error!(target: "engine::persistence", %err, block_num = last.number, "Pruning failed; will still commit persisted blocks"); + } + self.metrics.prune_before_duration_seconds.record(prune_start.elapsed()); + } + provider_rw.commit()?; } diff --git a/crates/engine/tree/src/tree/cached_state.rs b/crates/engine/tree/src/tree/cached_state.rs index a6956c93f3c..f7679593c6e 100644 --- a/crates/engine/tree/src/tree/cached_state.rs +++ b/crates/engine/tree/src/tree/cached_state.rs @@ -708,6 +708,7 @@ impl ExecutionCache { } self.account_cache.remove(addr); + self.account_stats.decrement_size(); continue } @@ -827,6 +828,11 @@ impl SavedCache { &self.metrics } + /// Returns whether cache metrics recording is disabled. + pub const fn disable_cache_metrics(&self) -> bool { + self.disable_cache_metrics + } + /// Updates the cache metrics (size/capacity/collisions) from the stats handlers. /// /// Note: This can be expensive with large cached state. Use diff --git a/crates/engine/tree/src/tree/payload_processor/prewarm.rs b/crates/engine/tree/src/tree/payload_processor/prewarm.rs index 4aa95c3c3b0..3629adb3065 100644 --- a/crates/engine/tree/src/tree/payload_processor/prewarm.rs +++ b/crates/engine/tree/src/tree/payload_processor/prewarm.rs @@ -239,37 +239,66 @@ where if let Some(saved_cache) = saved_cache { debug!(target: "engine::caching", parent_hash=?hash, "Updating execution cache"); - // Perform all cache operations atomically under the lock - execution_cache.update_with_guard(|cached| { - // consumes the `SavedCache` held by the prewarming task, which releases its usage - // guard - let (caches, cache_metrics, disable_cache_metrics) = saved_cache.split(); - let new_cache = SavedCache::new(hash, caches, cache_metrics) - .with_disable_cache_metrics(disable_cache_metrics); - - // Insert state into cache while holding the lock - // Access the BundleState through the shared ExecutionOutcome - if new_cache.cache().insert_state(&execution_outcome.state).is_err() { - // Clear the cache on error to prevent having a polluted cache - *cached = None; - debug!(target: "engine::caching", "cleared execution cache on update error"); - return; - } - new_cache.update_metrics(); + // Get the parent hash we expect the cache to have for conditional swap + let expected_parent_hash = saved_cache.executed_block_hash(); + + // Clone internals WITHOUT dropping usage guard (keep saved_cache alive). + // This prevents another thread from clearing the cache we're still using. + let caches = saved_cache.cache().clone(); + let cache_metrics = saved_cache.metrics().clone(); + let disable_cache_metrics = saved_cache.disable_cache_metrics(); + + // Build new cache outside the lock + let new_cache = SavedCache::new(hash, caches, cache_metrics) + .with_disable_cache_metrics(disable_cache_metrics); + + // Insert state outside the lock + if new_cache.cache().insert_state(&execution_outcome.state).is_err() { + // Only clear if cache still matches our expected parent (prevents lost update) + execution_cache.update_with_guard(|cached| { + if cached + .as_ref() + .is_some_and(|c| c.executed_block_hash() == expected_parent_hash) + { + *cached = None; + } + }); + debug!(target: "engine::caching", "cleared execution cache on update error"); + return + } + + new_cache.update_metrics(); + + // Wait for block validity OUTSIDE the lock (this can block indefinitely) + let is_valid = valid_block_rx.recv().is_ok(); - if valid_block_rx.recv().is_ok() { - // Replace the shared cache with the new one; the previous cache (if any) is - // dropped. - *cached = Some(new_cache); + // Drop saved_cache now (releases usage guard) since we're done with it + drop(saved_cache); + + // Conditional swap: only update if cache still matches our expected parent. + // This prevents clobbering updates from another thread that ran while we blocked. + execution_cache.update_with_guard(|cached| { + if cached.as_ref().is_some_and(|c| c.executed_block_hash() == expected_parent_hash) + { + if is_valid { + *cached = Some(new_cache); + } else { + *cached = None; + } } else { - // Block was invalid; caches were already mutated by insert_state above, - // so we must clear to prevent using polluted state - *cached = None; - debug!(target: "engine::caching", "cleared execution cache on invalid block"); + debug!( + target: "engine::caching", + expected=?expected_parent_hash, + "Skipped cache save due to parent-hash mismatch" + ); } }); + if !is_valid { + debug!(target: "engine::caching", "cleared execution cache on invalid block"); + } + let elapsed = start.elapsed(); debug!(target: "engine::caching", parent_hash=?hash, elapsed=?elapsed, "Updated execution cache");