From 8f18164660f96b14bcf104aefa512feb4630b1fe Mon Sep 17 00:00:00 2001 From: Derek Cofausper <256792747+decofe@users.noreply.github.com> Date: Thu, 16 Apr 2026 12:18:59 +0000 Subject: [PATCH 1/5] feat(engine): add persistence gate to PersistenceHandle Adds a persistence gate to PersistenceHandle so external callers can suppress new persistence cycles during latency-critical windows (e.g. block proposal on fast-block-time chains). PersistenceHandle::set_persistence_enabled(false) causes should_persist() to return false, preventing new SaveBlocks from being dispatched. In-flight persistence is unaffected. Defaults to true. Co-Authored-By: joshieDo <93316087+joshieDo@users.noreply.github.com> Co-Authored-By: Georgios Konstantopoulos <17802178+gakonst@users.noreply.github.com> Amp-Thread-ID: https://ampcode.com/threads/T-019d961d-12d6-73da-9a80-f732efbbd314 --- crates/engine/tree/src/persistence.rs | 31 ++++++++++++++++++++++++++- crates/engine/tree/src/tree/mod.rs | 13 ++++++++--- 2 files changed, 40 insertions(+), 4 deletions(-) diff --git a/crates/engine/tree/src/persistence.rs b/crates/engine/tree/src/persistence.rs index da778e1a30f..cb715d7f705 100644 --- a/crates/engine/tree/src/persistence.rs +++ b/crates/engine/tree/src/persistence.rs @@ -14,6 +14,7 @@ use reth_stages_api::{MetricEvent, MetricEventsSender}; use reth_tasks::spawn_os_thread; use std::{ sync::{ + atomic::{AtomicBool, Ordering}, mpsc::{Receiver, SendError, Sender}, Arc, }, @@ -247,6 +248,11 @@ pub struct PersistenceHandle { /// Guard that joins the service thread when all handles are dropped. /// Uses `Arc` so the handle remains `Clone`. _service_guard: Arc, + /// External gate to suppress persistence. When `false`, + /// [`EngineApiTreeHandler::should_persist`](crate::tree::EngineApiTreeHandler::should_persist) + /// returns `false`, preventing new persistence cycles from starting. An in-flight persistence + /// task is unaffected. + persistence_enabled: Arc, } impl PersistenceHandle { @@ -255,7 +261,11 @@ impl PersistenceHandle { /// This is intended for testing purposes where you want to mock the persistence service. /// For production use, prefer [`spawn_service`](Self::spawn_service). pub fn new(sender: Sender>) -> Self { - Self { sender, _service_guard: Arc::new(ServiceGuard(None)) } + Self { + sender, + _service_guard: Arc::new(ServiceGuard(None)), + persistence_enabled: Arc::new(AtomicBool::new(true)), + } } /// Create a new [`PersistenceHandle`], and spawn the persistence service. @@ -286,6 +296,7 @@ impl PersistenceHandle { PersistenceHandle { sender: db_service_tx, _service_guard: Arc::new(ServiceGuard(Some(join_handle))), + persistence_enabled: Arc::new(AtomicBool::new(true)), } } @@ -298,6 +309,24 @@ impl PersistenceHandle { self.sender.send(action) } + /// Returns `true` if persistence is enabled. + /// + /// When `false`, the tree handler will not start new persistence cycles. + pub fn is_persistence_enabled(&self) -> bool { + self.persistence_enabled.load(Ordering::Relaxed) + } + + /// Sets the persistence gate. + /// + /// When set to `false`, no new persistence cycles will be started by the tree handler. + /// An in-flight persistence task is unaffected. + /// + /// When set back to `true`, the tree handler will resume starting persistence cycles + /// on its next loop iteration. + pub fn set_persistence_enabled(&self, enabled: bool) { + self.persistence_enabled.store(enabled, Ordering::Relaxed); + } + /// Tells the persistence service to save a certain list of finalized blocks. The blocks are /// assumed to be ordered by block number. /// diff --git a/crates/engine/tree/src/tree/mod.rs b/crates/engine/tree/src/tree/mod.rs index a254f85003d..acbce65c196 100644 --- a/crates/engine/tree/src/tree/mod.rs +++ b/crates/engine/tree/src/tree/mod.rs @@ -405,6 +405,9 @@ where /// /// Returns the sender through which incoming requests can be sent to the task and the receiver /// end of a [`EngineApiEvent`] unbounded channel to receive events from the engine. + /// + /// Persistence can be suppressed at runtime via + /// [`PersistenceHandle::set_persistence_enabled`]. #[expect(clippy::complexity)] pub fn spawn_new( provider: P, @@ -2010,9 +2013,13 @@ where } /// Returns true if the canonical chain length minus the last persisted - /// block is greater than or equal to the persistence threshold and - /// backfill is not running. - pub const fn should_persist(&self) -> bool { + /// block is greater than or equal to the persistence threshold, + /// backfill is not running, and the external persistence gate is open. + pub fn should_persist(&self) -> bool { + if !self.persistence.is_persistence_enabled() { + return false + } + if !self.backfill_sync_state.is_idle() { // can't persist if backfill is running return false From d3c4589790d27cdc00bb0a3cffd0688a6871899a Mon Sep 17 00:00:00 2001 From: Derek Cofausper <256792747+decofe@users.noreply.github.com> Date: Thu, 16 Apr 2026 12:32:38 +0000 Subject: [PATCH 2/5] rm spawn_new doc comment per review Co-Authored-By: joshieDo <93316087+joshieDo@users.noreply.github.com> Co-Authored-By: Georgios Konstantopoulos <17802178+gakonst@users.noreply.github.com> Amp-Thread-ID: https://ampcode.com/threads/T-019d961d-12d6-73da-9a80-f732efbbd314 --- crates/engine/tree/src/tree/mod.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/crates/engine/tree/src/tree/mod.rs b/crates/engine/tree/src/tree/mod.rs index acbce65c196..c8212159bc9 100644 --- a/crates/engine/tree/src/tree/mod.rs +++ b/crates/engine/tree/src/tree/mod.rs @@ -405,9 +405,6 @@ where /// /// Returns the sender through which incoming requests can be sent to the task and the receiver /// end of a [`EngineApiEvent`] unbounded channel to receive events from the engine. - /// - /// Persistence can be suppressed at runtime via - /// [`PersistenceHandle::set_persistence_enabled`]. #[expect(clippy::complexity)] pub fn spawn_new( provider: P, From b545cded624e554b3ffd792af8df6b45591765c7 Mon Sep 17 00:00:00 2001 From: joshieDo <93316087+joshieDo@users.noreply.github.com> Date: Thu, 16 Apr 2026 16:05:57 +0100 Subject: [PATCH 3/5] feat(engine): add PersistenceGate and expose on RpcHandle Introduces PersistenceGate, a shared gate that controls whether the engine tree handler starts new persistence cycles. Exposed on RpcHandle so external consumers (e.g. Tempo) can pause/resume persistence via set_persistence_enabled()/is_persistence_enabled(). Also refactors build_engine_orchestrator to accept a PersistenceHandle directly instead of spawning it internally, removing three parameters and simplifying the return type. Amp-Thread-ID: https://ampcode.com/threads/T-019d96a7-4bee-7528-8c27-75b8d57f247d Co-authored-by: Amp --- crates/engine/tree/src/launch.rs | 22 +++----- crates/engine/tree/src/persistence.rs | 65 +++++++++++++++--------- crates/engine/tree/src/tree/mod.rs | 2 +- crates/node/builder/src/launch/engine.rs | 15 ++++-- crates/node/builder/src/rpc.rs | 19 ++++++- 5 files changed, 80 insertions(+), 43 deletions(-) diff --git a/crates/engine/tree/src/launch.rs b/crates/engine/tree/src/launch.rs index ce7a7e39b2a..e5403e2d2af 100644 --- a/crates/engine/tree/src/launch.rs +++ b/crates/engine/tree/src/launch.rs @@ -19,23 +19,17 @@ use reth_evm::ConfigureEvm; use reth_network_p2p::BlockClient; use reth_payload_builder::PayloadBuilderHandle; use reth_primitives_traits::NodePrimitives; -use reth_provider::{ - providers::{BlockchainProvider, ProviderNodeTypes}, - ProviderFactory, -}; -use reth_prune::PrunerWithFactory; -use reth_stages_api::{MetricEventsSender, Pipeline}; +use reth_provider::providers::{BlockchainProvider, ProviderNodeTypes}; +use reth_stages_api::Pipeline; use reth_tasks::Runtime; use reth_trie_db::ChangesetCache; use std::sync::Arc; /// Builds the engine [`ChainOrchestrator`] that drives the chain forward. /// -/// This spawns and wires together the following components: +/// This wires together the following components: /// /// - **[`BasicBlockDownloader`]** — downloads blocks on demand from the network during live sync. -/// - **[`PersistenceHandle`]** — spawns the persistence service on a background thread for writing -/// blocks and performing pruning outside the critical consensus path. /// - **[`EngineApiTreeHandler`]** — spawns the tree handler that processes engine API requests /// (`newPayload`, `forkchoiceUpdated`) and maintains the in-memory chain state. /// - **[`EngineApiRequestHandler`]** + **[`EngineHandler`]** — glue that routes incoming CL @@ -43,6 +37,9 @@ use std::sync::Arc; /// - **[`PipelineSync`]** — wraps the staged sync [`Pipeline`] for backfill sync when the node /// needs to catch up over large block ranges. /// +/// The caller is responsible for spawning the [`PersistenceHandle`] via +/// [`PersistenceHandle::spawn_service`] and passing it in. +/// /// The returned orchestrator implements [`Stream`] and yields /// [`ChainEvent`]s. /// @@ -55,13 +52,11 @@ pub fn build_engine_orchestrator( incoming_requests: S, pipeline: Pipeline, pipeline_task_spawner: Runtime, - provider: ProviderFactory, blockchain_db: BlockchainProvider, - pruner: PrunerWithFactory>, + persistence_handle: PersistenceHandle, payload_builder: PayloadBuilderHandle, payload_validator: V, tree_config: TreeConfig, - sync_metrics_tx: MetricEventsSender, evm_config: C, changeset_cache: ChangesetCache, runtime: Runtime, @@ -82,9 +77,6 @@ where { let downloader = BasicBlockDownloader::new(client, consensus.clone()); - let persistence_handle = - PersistenceHandle::::spawn_service(provider, pruner, sync_metrics_tx); - let canonical_in_memory_state = blockchain_db.canonical_in_memory_state(); let (to_tree_tx, from_tree) = EngineApiTreeHandler::spawn_new( diff --git a/crates/engine/tree/src/persistence.rs b/crates/engine/tree/src/persistence.rs index cb715d7f705..1eb00686e12 100644 --- a/crates/engine/tree/src/persistence.rs +++ b/crates/engine/tree/src/persistence.rs @@ -248,11 +248,8 @@ pub struct PersistenceHandle { /// Guard that joins the service thread when all handles are dropped. /// Uses `Arc` so the handle remains `Clone`. _service_guard: Arc, - /// External gate to suppress persistence. When `false`, - /// [`EngineApiTreeHandler::should_persist`](crate::tree::EngineApiTreeHandler::should_persist) - /// returns `false`, preventing new persistence cycles from starting. An in-flight persistence - /// task is unaffected. - persistence_enabled: Arc, + /// External gate to suppress persistence. + persistence_gate: PersistenceGate, } impl PersistenceHandle { @@ -264,7 +261,7 @@ impl PersistenceHandle { Self { sender, _service_guard: Arc::new(ServiceGuard(None)), - persistence_enabled: Arc::new(AtomicBool::new(true)), + persistence_gate: PersistenceGate::new(), } } @@ -296,7 +293,7 @@ impl PersistenceHandle { PersistenceHandle { sender: db_service_tx, _service_guard: Arc::new(ServiceGuard(Some(join_handle))), - persistence_enabled: Arc::new(AtomicBool::new(true)), + persistence_gate: PersistenceGate::new(), } } @@ -309,22 +306,9 @@ impl PersistenceHandle { self.sender.send(action) } - /// Returns `true` if persistence is enabled. - /// - /// When `false`, the tree handler will not start new persistence cycles. - pub fn is_persistence_enabled(&self) -> bool { - self.persistence_enabled.load(Ordering::Relaxed) - } - - /// Sets the persistence gate. - /// - /// When set to `false`, no new persistence cycles will be started by the tree handler. - /// An in-flight persistence task is unaffected. - /// - /// When set back to `true`, the tree handler will resume starting persistence cycles - /// on its next loop iteration. - pub fn set_persistence_enabled(&self, enabled: bool) { - self.persistence_enabled.store(enabled, Ordering::Relaxed); + /// Returns a clone of the [`PersistenceGate`]. + pub fn persistence_gate(&self) -> PersistenceGate { + self.persistence_gate.clone() } /// Tells the persistence service to save a certain list of finalized blocks. The blocks are @@ -400,6 +384,41 @@ impl Drop for ServiceGuard { } } +/// Shared gate that controls whether the engine tree handler starts new persistence cycles. +/// +/// When disabled, +/// [`EngineApiTreeHandler::should_persist`](crate::tree::EngineApiTreeHandler::should_persist) +/// returns `false`, preventing new persistence cycles from starting. An in-flight persistence +/// task is unaffected. +#[derive(Clone, Debug)] +pub struct PersistenceGate(Arc); + +impl PersistenceGate { + /// Creates a new gate that is enabled by default. + pub fn new() -> Self { + Self(Arc::new(AtomicBool::new(true))) + } + + /// Returns `true` if persistence cycles are enabled. + pub fn is_enabled(&self) -> bool { + self.0.load(Ordering::Relaxed) + } + + /// Sets the persistence gate. + /// + /// When set to `false`, no new persistence cycles will be started by the tree handler. + /// An in-flight persistence task is unaffected. Set back to `true` to resume. + pub fn set_enabled(&self, enabled: bool) { + self.0.store(enabled, Ordering::Relaxed); + } +} + +impl Default for PersistenceGate { + fn default() -> Self { + Self::new() + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/crates/engine/tree/src/tree/mod.rs b/crates/engine/tree/src/tree/mod.rs index c8212159bc9..d1d42e6777e 100644 --- a/crates/engine/tree/src/tree/mod.rs +++ b/crates/engine/tree/src/tree/mod.rs @@ -2013,7 +2013,7 @@ where /// block is greater than or equal to the persistence threshold, /// backfill is not running, and the external persistence gate is open. pub fn should_persist(&self) -> bool { - if !self.persistence.is_persistence_enabled() { + if !self.persistence.persistence_gate().is_enabled() { return false } diff --git a/crates/node/builder/src/launch/engine.rs b/crates/node/builder/src/launch/engine.rs index 6a33f20abfe..0ca04d35518 100644 --- a/crates/node/builder/src/launch/engine.rs +++ b/crates/node/builder/src/launch/engine.rs @@ -15,6 +15,7 @@ use reth_engine_tree::{ chain::{ChainEvent, FromOrchestrator}, engine::{EngineApiKind, EngineApiRequest, EngineRequestHandler}, launch::build_engine_orchestrator, + persistence::PersistenceHandle, tree::TreeConfig, }; use reth_engine_util::EngineMessageStreamExt; @@ -226,6 +227,14 @@ impl EngineNodeLauncher { EngineApiKind::Ethereum }; + let persistence_handle = + PersistenceHandle::<::Primitives>::spawn_service( + ctx.provider_factory().clone(), + pruner, + ctx.sync_metrics_tx(), + ); + let persistence_gate = persistence_handle.persistence_gate(); + let mut orchestrator = build_engine_orchestrator( engine_kind, consensus.clone(), @@ -233,13 +242,11 @@ impl EngineNodeLauncher { Box::pin(consensus_engine_stream), pipeline, ctx.task_executor().clone(), - ctx.provider_factory().clone(), ctx.blockchain_db().clone(), - pruner, + persistence_handle, ctx.components().payload_builder_handle().clone(), engine_validator, engine_tree_config, - ctx.sync_metrics_tx(), ctx.components().evm_config().clone(), changeset_cache, ctx.task_executor().clone(), @@ -271,6 +278,7 @@ impl EngineNodeLauncher { engine_events, beacon_engine_handle, engine_shutdown: _, + persistence_gate: _, } = add_ons.launch_add_ons(add_ons_ctx).await?; // Create engine shutdown handle @@ -409,6 +417,7 @@ impl EngineNodeLauncher { engine_events, beacon_engine_handle, engine_shutdown, + persistence_gate, }, }; // Notify on node started diff --git a/crates/node/builder/src/rpc.rs b/crates/node/builder/src/rpc.rs index a10c40841f7..3752b5ac091 100644 --- a/crates/node/builder/src/rpc.rs +++ b/crates/node/builder/src/rpc.rs @@ -4,8 +4,8 @@ pub use jsonrpsee::{ core::middleware::layer::Either, server::middleware::rpc::{RpcService, RpcServiceBuilder}, }; -use reth_engine_tree::tree::WaitForCaches; pub use reth_engine_tree::tree::{BasicEngineValidator, EngineValidator}; +use reth_engine_tree::{persistence::PersistenceGate, tree::WaitForCaches}; pub use reth_rpc_builder::{ middleware::{RethAuthHttpMiddleware, RethRpcMiddleware}, Identity, Stack, @@ -345,6 +345,8 @@ pub struct RpcHandle { pub beacon_engine_handle: ConsensusEngineHandle<::Payload>, /// Handle to trigger engine shutdown. pub engine_shutdown: EngineShutdown, + /// Gate to suppress engine persistence cycles. + pub persistence_gate: PersistenceGate, } impl Clone for RpcHandle { @@ -355,6 +357,7 @@ impl Clone for RpcHandle RpcHandle { &self.beacon_engine_handle } + /// Returns `true` if engine persistence cycles are enabled. + pub fn is_persistence_enabled(&self) -> bool { + self.persistence_gate.is_enabled() + } + + /// Sets the engine persistence gate. + /// + /// When set to `false`, no new persistence cycles will be started by the tree handler. + /// An in-flight persistence task is unaffected. Set back to `true` to resume. + pub fn set_persistence_enabled(&self, enabled: bool) { + self.persistence_gate.set_enabled(enabled); + } + /// Returns the consensus engine events sender. pub const fn consensus_engine_events( &self, @@ -1084,6 +1100,7 @@ where engine_events, beacon_engine_handle: engine_handle, engine_shutdown: EngineShutdown::default(), + persistence_gate: PersistenceGate::default(), }) } From bb7276bdea345af3aeb23213e57659b0cb1260b6 Mon Sep 17 00:00:00 2001 From: joshieDo <93316087+joshieDo@users.noreply.github.com> Date: Thu, 16 Apr 2026 16:08:28 +0100 Subject: [PATCH 4/5] refactor: remove RpcHandle convenience methods, restore doc comment Amp-Thread-ID: https://ampcode.com/threads/T-019d96a7-4bee-7528-8c27-75b8d57f247d Co-authored-by: Amp --- crates/engine/tree/src/launch.rs | 2 +- crates/node/builder/src/rpc.rs | 13 ------------- 2 files changed, 1 insertion(+), 14 deletions(-) diff --git a/crates/engine/tree/src/launch.rs b/crates/engine/tree/src/launch.rs index e5403e2d2af..d888921fc46 100644 --- a/crates/engine/tree/src/launch.rs +++ b/crates/engine/tree/src/launch.rs @@ -27,7 +27,7 @@ use std::sync::Arc; /// Builds the engine [`ChainOrchestrator`] that drives the chain forward. /// -/// This wires together the following components: +/// This spawns and wires together the following components: /// /// - **[`BasicBlockDownloader`]** — downloads blocks on demand from the network during live sync. /// - **[`EngineApiTreeHandler`]** — spawns the tree handler that processes engine API requests diff --git a/crates/node/builder/src/rpc.rs b/crates/node/builder/src/rpc.rs index 3752b5ac091..1c29661e92a 100644 --- a/crates/node/builder/src/rpc.rs +++ b/crates/node/builder/src/rpc.rs @@ -398,19 +398,6 @@ impl RpcHandle { &self.beacon_engine_handle } - /// Returns `true` if engine persistence cycles are enabled. - pub fn is_persistence_enabled(&self) -> bool { - self.persistence_gate.is_enabled() - } - - /// Sets the engine persistence gate. - /// - /// When set to `false`, no new persistence cycles will be started by the tree handler. - /// An in-flight persistence task is unaffected. Set back to `true` to resume. - pub fn set_persistence_enabled(&self, enabled: bool) { - self.persistence_gate.set_enabled(enabled); - } - /// Returns the consensus engine events sender. pub const fn consensus_engine_events( &self, From c5aa6782fae0203246111861bd9b2cd06cb21962 Mon Sep 17 00:00:00 2001 From: joshieDo <93316087+joshieDo@users.noreply.github.com> Date: Thu, 16 Apr 2026 16:10:20 +0100 Subject: [PATCH 5/5] docs: fix build_engine_orchestrator doc comments Amp-Thread-ID: https://ampcode.com/threads/T-019d96a7-4bee-7528-8c27-75b8d57f247d Co-authored-by: Amp --- crates/engine/tree/src/launch.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/crates/engine/tree/src/launch.rs b/crates/engine/tree/src/launch.rs index d888921fc46..9ce317c409c 100644 --- a/crates/engine/tree/src/launch.rs +++ b/crates/engine/tree/src/launch.rs @@ -30,6 +30,8 @@ use std::sync::Arc; /// This spawns and wires together the following components: /// /// - **[`BasicBlockDownloader`]** — downloads blocks on demand from the network during live sync. +/// - **[`PersistenceHandle`]** — handle to the persistence service for writing blocks and +/// performing pruning outside the critical consensus path. /// - **[`EngineApiTreeHandler`]** — spawns the tree handler that processes engine API requests /// (`newPayload`, `forkchoiceUpdated`) and maintains the in-memory chain state. /// - **[`EngineApiRequestHandler`]** + **[`EngineHandler`]** — glue that routes incoming CL @@ -37,9 +39,6 @@ use std::sync::Arc; /// - **[`PipelineSync`]** — wraps the staged sync [`Pipeline`] for backfill sync when the node /// needs to catch up over large block ranges. /// -/// The caller is responsible for spawning the [`PersistenceHandle`] via -/// [`PersistenceHandle::spawn_service`] and passing it in. -/// /// The returned orchestrator implements [`Stream`] and yields /// [`ChainEvent`]s. ///