diff --git a/crates/engine/tree/src/launch.rs b/crates/engine/tree/src/launch.rs index ce7a7e39b2a..9ce317c409c 100644 --- a/crates/engine/tree/src/launch.rs +++ b/crates/engine/tree/src/launch.rs @@ -19,12 +19,8 @@ use reth_evm::ConfigureEvm; use reth_network_p2p::BlockClient; use reth_payload_builder::PayloadBuilderHandle; use reth_primitives_traits::NodePrimitives; -use reth_provider::{ - providers::{BlockchainProvider, ProviderNodeTypes}, - ProviderFactory, -}; -use reth_prune::PrunerWithFactory; -use reth_stages_api::{MetricEventsSender, Pipeline}; +use reth_provider::providers::{BlockchainProvider, ProviderNodeTypes}; +use reth_stages_api::Pipeline; use reth_tasks::Runtime; use reth_trie_db::ChangesetCache; use std::sync::Arc; @@ -34,8 +30,8 @@ use std::sync::Arc; /// This spawns and wires together the following components: /// /// - **[`BasicBlockDownloader`]** — downloads blocks on demand from the network during live sync. -/// - **[`PersistenceHandle`]** — spawns the persistence service on a background thread for writing -/// blocks and performing pruning outside the critical consensus path. +/// - **[`PersistenceHandle`]** — handle to the persistence service for writing blocks and +/// performing pruning outside the critical consensus path. /// - **[`EngineApiTreeHandler`]** — spawns the tree handler that processes engine API requests /// (`newPayload`, `forkchoiceUpdated`) and maintains the in-memory chain state. /// - **[`EngineApiRequestHandler`]** + **[`EngineHandler`]** — glue that routes incoming CL @@ -55,13 +51,11 @@ pub fn build_engine_orchestrator( incoming_requests: S, pipeline: Pipeline, pipeline_task_spawner: Runtime, - provider: ProviderFactory, blockchain_db: BlockchainProvider, - pruner: PrunerWithFactory>, + persistence_handle: PersistenceHandle, payload_builder: PayloadBuilderHandle, payload_validator: V, tree_config: TreeConfig, - sync_metrics_tx: MetricEventsSender, evm_config: C, changeset_cache: ChangesetCache, runtime: Runtime, @@ -82,9 +76,6 @@ where { let downloader = BasicBlockDownloader::new(client, consensus.clone()); - let persistence_handle = - PersistenceHandle::::spawn_service(provider, pruner, sync_metrics_tx); - let canonical_in_memory_state = blockchain_db.canonical_in_memory_state(); let (to_tree_tx, from_tree) = EngineApiTreeHandler::spawn_new( diff --git a/crates/engine/tree/src/persistence.rs b/crates/engine/tree/src/persistence.rs index da778e1a30f..1eb00686e12 100644 --- a/crates/engine/tree/src/persistence.rs +++ b/crates/engine/tree/src/persistence.rs @@ -14,6 +14,7 @@ use reth_stages_api::{MetricEvent, MetricEventsSender}; use reth_tasks::spawn_os_thread; use std::{ sync::{ + atomic::{AtomicBool, Ordering}, mpsc::{Receiver, SendError, Sender}, Arc, }, @@ -247,6 +248,8 @@ pub struct PersistenceHandle { /// Guard that joins the service thread when all handles are dropped. /// Uses `Arc` so the handle remains `Clone`. _service_guard: Arc, + /// External gate to suppress persistence. + persistence_gate: PersistenceGate, } impl PersistenceHandle { @@ -255,7 +258,11 @@ impl PersistenceHandle { /// This is intended for testing purposes where you want to mock the persistence service. /// For production use, prefer [`spawn_service`](Self::spawn_service). pub fn new(sender: Sender>) -> Self { - Self { sender, _service_guard: Arc::new(ServiceGuard(None)) } + Self { + sender, + _service_guard: Arc::new(ServiceGuard(None)), + persistence_gate: PersistenceGate::new(), + } } /// Create a new [`PersistenceHandle`], and spawn the persistence service. @@ -286,6 +293,7 @@ impl PersistenceHandle { PersistenceHandle { sender: db_service_tx, _service_guard: Arc::new(ServiceGuard(Some(join_handle))), + persistence_gate: PersistenceGate::new(), } } @@ -298,6 +306,11 @@ impl PersistenceHandle { self.sender.send(action) } + /// Returns a clone of the [`PersistenceGate`]. + pub fn persistence_gate(&self) -> PersistenceGate { + self.persistence_gate.clone() + } + /// Tells the persistence service to save a certain list of finalized blocks. The blocks are /// assumed to be ordered by block number. /// @@ -371,6 +384,41 @@ impl Drop for ServiceGuard { } } +/// Shared gate that controls whether the engine tree handler starts new persistence cycles. +/// +/// When disabled, +/// [`EngineApiTreeHandler::should_persist`](crate::tree::EngineApiTreeHandler::should_persist) +/// returns `false`, preventing new persistence cycles from starting. An in-flight persistence +/// task is unaffected. +#[derive(Clone, Debug)] +pub struct PersistenceGate(Arc); + +impl PersistenceGate { + /// Creates a new gate that is enabled by default. + pub fn new() -> Self { + Self(Arc::new(AtomicBool::new(true))) + } + + /// Returns `true` if persistence cycles are enabled. + pub fn is_enabled(&self) -> bool { + self.0.load(Ordering::Relaxed) + } + + /// Sets the persistence gate. + /// + /// When set to `false`, no new persistence cycles will be started by the tree handler. + /// An in-flight persistence task is unaffected. Set back to `true` to resume. + pub fn set_enabled(&self, enabled: bool) { + self.0.store(enabled, Ordering::Relaxed); + } +} + +impl Default for PersistenceGate { + fn default() -> Self { + Self::new() + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/crates/engine/tree/src/tree/mod.rs b/crates/engine/tree/src/tree/mod.rs index a254f85003d..d1d42e6777e 100644 --- a/crates/engine/tree/src/tree/mod.rs +++ b/crates/engine/tree/src/tree/mod.rs @@ -2010,9 +2010,13 @@ where } /// Returns true if the canonical chain length minus the last persisted - /// block is greater than or equal to the persistence threshold and - /// backfill is not running. - pub const fn should_persist(&self) -> bool { + /// block is greater than or equal to the persistence threshold, + /// backfill is not running, and the external persistence gate is open. + pub fn should_persist(&self) -> bool { + if !self.persistence.persistence_gate().is_enabled() { + return false + } + if !self.backfill_sync_state.is_idle() { // can't persist if backfill is running return false diff --git a/crates/node/builder/src/launch/engine.rs b/crates/node/builder/src/launch/engine.rs index 6a33f20abfe..0ca04d35518 100644 --- a/crates/node/builder/src/launch/engine.rs +++ b/crates/node/builder/src/launch/engine.rs @@ -15,6 +15,7 @@ use reth_engine_tree::{ chain::{ChainEvent, FromOrchestrator}, engine::{EngineApiKind, EngineApiRequest, EngineRequestHandler}, launch::build_engine_orchestrator, + persistence::PersistenceHandle, tree::TreeConfig, }; use reth_engine_util::EngineMessageStreamExt; @@ -226,6 +227,14 @@ impl EngineNodeLauncher { EngineApiKind::Ethereum }; + let persistence_handle = + PersistenceHandle::<::Primitives>::spawn_service( + ctx.provider_factory().clone(), + pruner, + ctx.sync_metrics_tx(), + ); + let persistence_gate = persistence_handle.persistence_gate(); + let mut orchestrator = build_engine_orchestrator( engine_kind, consensus.clone(), @@ -233,13 +242,11 @@ impl EngineNodeLauncher { Box::pin(consensus_engine_stream), pipeline, ctx.task_executor().clone(), - ctx.provider_factory().clone(), ctx.blockchain_db().clone(), - pruner, + persistence_handle, ctx.components().payload_builder_handle().clone(), engine_validator, engine_tree_config, - ctx.sync_metrics_tx(), ctx.components().evm_config().clone(), changeset_cache, ctx.task_executor().clone(), @@ -271,6 +278,7 @@ impl EngineNodeLauncher { engine_events, beacon_engine_handle, engine_shutdown: _, + persistence_gate: _, } = add_ons.launch_add_ons(add_ons_ctx).await?; // Create engine shutdown handle @@ -409,6 +417,7 @@ impl EngineNodeLauncher { engine_events, beacon_engine_handle, engine_shutdown, + persistence_gate, }, }; // Notify on node started diff --git a/crates/node/builder/src/rpc.rs b/crates/node/builder/src/rpc.rs index a10c40841f7..1c29661e92a 100644 --- a/crates/node/builder/src/rpc.rs +++ b/crates/node/builder/src/rpc.rs @@ -4,8 +4,8 @@ pub use jsonrpsee::{ core::middleware::layer::Either, server::middleware::rpc::{RpcService, RpcServiceBuilder}, }; -use reth_engine_tree::tree::WaitForCaches; pub use reth_engine_tree::tree::{BasicEngineValidator, EngineValidator}; +use reth_engine_tree::{persistence::PersistenceGate, tree::WaitForCaches}; pub use reth_rpc_builder::{ middleware::{RethAuthHttpMiddleware, RethRpcMiddleware}, Identity, Stack, @@ -345,6 +345,8 @@ pub struct RpcHandle { pub beacon_engine_handle: ConsensusEngineHandle<::Payload>, /// Handle to trigger engine shutdown. pub engine_shutdown: EngineShutdown, + /// Gate to suppress engine persistence cycles. + pub persistence_gate: PersistenceGate, } impl Clone for RpcHandle { @@ -355,6 +357,7 @@ impl Clone for RpcHandle