Closed
14 changes: 4 additions & 10 deletions crates/engine/tree/src/tree/payload_processor/mod.rs
@@ -981,16 +981,10 @@ impl PayloadExecutionCache {
/// Updates the cache with a closure that has exclusive access to the guard.
/// This ensures that all cache operations happen atomically.
///
/// ## CRITICAL SAFETY REQUIREMENT
///
/// **Before calling this method, you MUST ensure there are no other active cache users.**
/// This includes:
/// - No running [`PrewarmCacheTask`] instances that could write to the cache
/// - No concurrent transactions that might access the cached state
/// - All prewarming operations must be completed or cancelled
///
/// Violating this requirement can result in cache corruption, incorrect state data,
/// and potential consensus failures.
/// Callers must not mutate the *underlying* [`ExecutionCache`] data (e.g. via
/// [`SavedCache::clear`]) while other tasks may hold clones of the same
/// `SavedCache`. Swapping the slot value (`*cached = Some(..)` / `*cached = None`)
/// is always safe because existing clones retain their own `Arc` references.
pub fn update_with_guard<F>(&self, update_fn: F)
where
F: FnOnce(&mut Option<SavedCache>),
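The doc comment above says that swapping the slot value is safe because existing clones keep their own `Arc` references. A minimal sketch of that property follows, using only the standard library. `Handle` and `Slot` are hypothetical stand-ins for `SavedCache` and `PayloadExecutionCache`, not the real reth types, and the real locking primitive may differ.

```rust
use std::sync::{Arc, RwLock};

// Hypothetical stand-in for `SavedCache`: a cheap-to-clone handle over shared data.
#[derive(Clone)]
struct Handle {
    data: Arc<Vec<u64>>,
}

// Hypothetical stand-in for the shared slot guarded by a lock.
struct Slot {
    inner: RwLock<Option<Handle>>,
}

impl Slot {
    // Runs the closure with exclusive access to the slot value.
    fn update_with_guard<F: FnOnce(&mut Option<Handle>)>(&self, update_fn: F) {
        let mut guard = self.inner.write().unwrap();
        update_fn(&mut *guard);
    }

    // Returns a clone of the current handle, if any.
    fn get(&self) -> Option<Handle> {
        self.inner.read().unwrap().clone()
    }
}

// Takes a clone, empties the slot, and reports what the clone still sees.
fn demo() -> (Vec<u64>, bool) {
    let slot = Slot {
        inner: RwLock::new(Some(Handle { data: Arc::new(vec![1, 2, 3]) })),
    };
    let held = slot.get().expect("slot was populated");

    // Swapping the slot value only drops the slot's own reference.
    slot.update_with_guard(|cached| *cached = None);

    (held.data.to_vec(), slot.get().is_none())
}
```

The clone taken before the swap still reads the original data, because the swap drops only the slot's reference. Mutating the data behind the `Arc` in place (the case the comment warns about) would be visible to every clone.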
68 changes: 35 additions & 33 deletions crates/engine/tree/src/tree/payload_processor/prewarm.rs
@@ -211,15 +211,16 @@ where
});
}

/// This method calls `ExecutionCache::update_with_guard` which requires exclusive access.
/// It should only be called after ensuring that:
/// 1. All prewarming tasks have completed execution
/// 2. No other concurrent operations are accessing the cache
///
/// Saves the warmed caches back into the shared slot after prewarming completes.
///
/// This consumes the `SavedCache` held by the task, which releases its usage guard and allows
/// the new, warmed cache to be inserted.
/// Waits for block validation without any lock held, then only on success splits the
/// saved cache, inserts state, and publishes under a brief write lock. This avoids
/// the ~100ms+ lock hold that previously blocked concurrent readers during
/// `valid_block_rx.recv()`.
Comment on lines +217 to +219
Collaborator

There are no concurrent readers here, because the main execution thread is busy validating the block.

///
/// The ordering is critical: `split()` releases the usage guard and `insert_state()`
/// mutates the shared fixed-caches in-place, so both must happen only after
/// validation to prevent concurrent readers from observing unvalidated state.
///
/// This method is called from `run()` only after all execution tasks are complete.
#[instrument(level = "debug", target = "engine::tree::payload_processor::prewarm", skip_all)]
@@ -236,36 +237,37 @@ where

if let Some(saved_cache) = saved_cache {
debug!(target: "engine::caching", parent_hash=?hash, "Updating execution cache");
// Perform all cache operations atomically under the lock
execution_cache.update_with_guard(|cached| {
// consumes the `SavedCache` held by the prewarming task, which releases its usage
// guard
let (caches, cache_metrics, disable_cache_metrics) = saved_cache.split();
let new_cache = SavedCache::new(hash, caches, cache_metrics)
.with_disable_cache_metrics(disable_cache_metrics);

// Insert state into cache while holding the lock
// Access the BundleState through the shared ExecutionOutcome
if new_cache.cache().insert_state(&execution_outcome.state).is_err() {
// Clear the cache on error to prevent having a polluted cache
*cached = None;
debug!(target: "engine::caching", "cleared execution cache on update error");
return;
}

// Wait for state root validation WITHOUT holding the cache lock.
// This is the key optimization: the original code held the lock across this
// blocking recv(), which blocked the next block's prewarming from accessing
// the cache for ~100ms+.
if valid_block_rx.recv().is_err() {
Comment on lines +241 to +245
Collaborator

Isn't this unproblematic anyway, since this fires once we have validated the block and the remaining operations here are cheap?

debug!(target: "engine::caching", parent_hash=?hash, "skipped cache publish on invalid block");
return;
}

// Block is valid — build and publish the warmed cache.
// split() consumes the SavedCache (releasing its usage guard) and insert_state()
// mutates the shared caches in-place, so these must only happen after validation
// to prevent concurrent readers from observing unvalidated state.
let (caches, cache_metrics, disable_cache_metrics) = saved_cache.split();
let new_cache = SavedCache::new(hash, caches, cache_metrics)
.with_disable_cache_metrics(disable_cache_metrics);

if new_cache.cache().insert_state(&execution_outcome.state).is_err() {
execution_cache.update_with_guard(|cached| {
*cached = None;
});
debug!(target: "engine::caching", "cleared execution cache on update error");
} else {
new_cache.update_metrics();

if valid_block_rx.recv().is_ok() {
// Replace the shared cache with the new one; the previous cache (if any) is
// dropped.
// Publish under a brief lock.
execution_cache.update_with_guard(|cached| {
*cached = Some(new_cache);
} else {
// Block was invalid; caches were already mutated by insert_state above,
// so we must clear to prevent using polluted state
*cached = None;
debug!(target: "engine::caching", "cleared execution cache on invalid block");
}
});
});
}

let elapsed = start.elapsed();
debug!(target: "engine::caching", parent_hash=?hash, elapsed=?elapsed, "Updated execution cache");
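The ordering proposed in this hunk (block on the validation signal with no lock held, then publish under a short write lock) can be sketched in isolation as follows. This is an illustration under assumptions, not the reth implementation: `Slot`, `publish_if_valid`, and `run` are hypothetical names, a `u64` stands in for the warmed cache, and a dropped sender models an invalid block.

```rust
use std::sync::{mpsc, Arc, RwLock};
use std::thread;
use std::time::Duration;

// Hypothetical shared slot; `u64` stands in for the warmed cache.
type Slot = Arc<RwLock<Option<u64>>>;

// Waits for the validation signal with no lock held, then publishes
// under a short write lock. Returns whether the value was published.
fn publish_if_valid(slot: &Slot, warmed: u64, valid_rx: mpsc::Receiver<()>) -> bool {
    // Blocking wait; a sender dropped without sending yields Err.
    if valid_rx.recv().is_err() {
        return false; // invalid block: leave the slot untouched
    }
    // The write lock is held only for this single assignment.
    *slot.write().unwrap() = Some(warmed);
    true
}

// Drives one round with a simulated validator thread.
fn run(valid: bool) -> (bool, Option<u64>) {
    let slot: Slot = Arc::new(RwLock::new(Some(1)));
    let (tx, rx) = mpsc::channel();

    let validator = thread::spawn(move || {
        thread::sleep(Duration::from_millis(10)); // simulated validation time
        if valid {
            let _ = tx.send(());
        }
        // Otherwise `tx` is dropped here, so `recv()` returns Err.
    });

    let published = publish_if_valid(&slot, 2, rx);
    validator.join().unwrap();
    let value = *slot.read().unwrap();
    (published, value)
}
```

In this sketch `run(true)` replaces the slot value and `run(false)` leaves the previous value in place. Whether the lock hold during `recv()` matters in practice depends on whether any reader contends for the slot while validation runs, which is the point raised in the review comments above.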