Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 11 additions & 20 deletions crates/engine/tree/src/persistence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use reth_provider::{
providers::ProviderNodeTypes, BlockExecutionWriter, BlockHashReader, ChainStateBlockWriter,
DBProvider, DatabaseProviderFactory, ProviderFactory, SaveBlocksMode,
};
use reth_prune::{PrunerError, PrunerOutput, PrunerWithFactory};
use reth_prune::{PrunerError, PrunerWithFactory};
use reth_stages_api::{MetricEvent, MetricEventsSender};
use reth_tasks::spawn_os_thread;
use std::{
Expand Down Expand Up @@ -74,18 +74,6 @@ where
pending_safe_block: None,
}
}

/// Prunes block data before the given block number according to the configured prune
/// configuration.
#[instrument(level = "debug", target = "engine::persistence", skip_all, fields(block_num))]
fn prune_before(&mut self, block_num: u64) -> Result<PrunerOutput, PrunerError> {
debug!(target: "engine::persistence", ?block_num, "Running pruner");
let start_time = Instant::now();
// TODO: doing this properly depends on pruner segment changes
let result = self.pruner.run(block_num);
self.metrics.prune_before_duration_seconds.record(start_time.elapsed());
result
}
Comment on lines -77 to -88
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would like to keep a helper method for this if possible and the instrumentation

}

impl<N> PersistenceService<N>
Expand Down Expand Up @@ -118,11 +106,6 @@ where
let _ = self
.sync_metrics_tx
.send(MetricEvent::SyncHeight { height: block_number });

if self.pruner.is_pruning_needed(block_number) {
// We log `PrunerOutput` inside the `Pruner`
let _ = self.prune_before(block_number)?;
}
}
}
PersistenceAction::SaveFinalizedBlock(finalized_block) => {
Expand Down Expand Up @@ -171,18 +154,26 @@ where

let start_time = Instant::now();

if last_block.is_some() {
if let Some(last) = last_block {
let provider_rw = self.provider.database_provider_rw()?;
provider_rw.save_blocks(blocks, SaveBlocksMode::Full)?;

// Commit pending finalized/safe block updates in the same transaction
if let Some(finalized) = pending_finalized {
provider_rw.save_finalized_block_number(finalized)?;
}
if let Some(safe) = pending_safe {
provider_rw.save_safe_block_number(safe)?;
}

if self.pruner.is_pruning_needed(last.number) {
debug!(target: "engine::persistence", block_num = last.number, "Running pruner in save_blocks transaction");
let prune_start = Instant::now();
if let Err(err) = self.pruner.run_with_provider(&provider_rw, last.number) {
error!(target: "engine::persistence", %err, block_num = last.number, "Pruning failed; will still commit persisted blocks");
}
self.metrics.prune_before_duration_seconds.record(prune_start.elapsed());
}

provider_rw.commit()?;
}

Expand Down
6 changes: 6 additions & 0 deletions crates/engine/tree/src/tree/cached_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -708,6 +708,7 @@ impl ExecutionCache {
}

self.account_cache.remove(addr);
self.account_stats.decrement_size();
continue
}

Expand Down Expand Up @@ -827,6 +828,11 @@ impl SavedCache {
&self.metrics
}

/// Returns whether cache metrics recording is disabled.
pub const fn disable_cache_metrics(&self) -> bool {
self.disable_cache_metrics
}

/// Updates the cache metrics (size/capacity/collisions) from the stats handlers.
///
/// Note: This can be expensive with large cached state. Use
Expand Down
79 changes: 54 additions & 25 deletions crates/engine/tree/src/tree/payload_processor/prewarm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,37 +239,66 @@ where

if let Some(saved_cache) = saved_cache {
debug!(target: "engine::caching", parent_hash=?hash, "Updating execution cache");
// Perform all cache operations atomically under the lock
execution_cache.update_with_guard(|cached| {
// consumes the `SavedCache` held by the prewarming task, which releases its usage
// guard
let (caches, cache_metrics, disable_cache_metrics) = saved_cache.split();
let new_cache = SavedCache::new(hash, caches, cache_metrics)
.with_disable_cache_metrics(disable_cache_metrics);

// Insert state into cache while holding the lock
// Access the BundleState through the shared ExecutionOutcome
if new_cache.cache().insert_state(&execution_outcome.state).is_err() {
// Clear the cache on error to prevent having a polluted cache
*cached = None;
debug!(target: "engine::caching", "cleared execution cache on update error");
return;
}

new_cache.update_metrics();
// Get the parent hash we expect the cache to have for conditional swap
let expected_parent_hash = saved_cache.executed_block_hash();

// Clone internals WITHOUT dropping usage guard (keep saved_cache alive).
// This prevents another thread from clearing the cache we're still using.
let caches = saved_cache.cache().clone();
let cache_metrics = saved_cache.metrics().clone();
let disable_cache_metrics = saved_cache.disable_cache_metrics();

// Build new cache outside the lock
let new_cache = SavedCache::new(hash, caches, cache_metrics)
.with_disable_cache_metrics(disable_cache_metrics);

// Insert state outside the lock
if new_cache.cache().insert_state(&execution_outcome.state).is_err() {
// Only clear if cache still matches our expected parent (prevents lost update)
execution_cache.update_with_guard(|cached| {
if cached
.as_ref()
.is_some_and(|c| c.executed_block_hash() == expected_parent_hash)
{
*cached = None;
}
});
debug!(target: "engine::caching", "cleared execution cache on update error");
return
}

new_cache.update_metrics();

// Wait for block validity OUTSIDE the lock (this can block indefinitely)
let is_valid = valid_block_rx.recv().is_ok();

if valid_block_rx.recv().is_ok() {
// Replace the shared cache with the new one; the previous cache (if any) is
// dropped.
*cached = Some(new_cache);
// Drop saved_cache now (releases usage guard) since we're done with it
drop(saved_cache);

// Conditional swap: only update if cache still matches our expected parent.
// This prevents clobbering updates from another thread that ran while we blocked.
execution_cache.update_with_guard(|cached| {
if cached.as_ref().is_some_and(|c| c.executed_block_hash() == expected_parent_hash)
{
if is_valid {
*cached = Some(new_cache);
} else {
*cached = None;
}
} else {
// Block was invalid; caches were already mutated by insert_state above,
// so we must clear to prevent using polluted state
*cached = None;
debug!(target: "engine::caching", "cleared execution cache on invalid block");
debug!(
target: "engine::caching",
expected=?expected_parent_hash,
"Skipped cache save due to parent-hash mismatch"
);
}
});

if !is_valid {
debug!(target: "engine::caching", "cleared execution cache on invalid block");
}

let elapsed = start.elapsed();
debug!(target: "engine::caching", parent_hash=?hash, elapsed=?elapsed, "Updated execution cache");

Expand Down
Loading