Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 5 additions & 14 deletions crates/engine/tree/src/launch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,8 @@ use reth_evm::ConfigureEvm;
use reth_network_p2p::BlockClient;
use reth_payload_builder::PayloadBuilderHandle;
use reth_primitives_traits::NodePrimitives;
use reth_provider::{
providers::{BlockchainProvider, ProviderNodeTypes},
ProviderFactory,
};
use reth_prune::PrunerWithFactory;
use reth_stages_api::{MetricEventsSender, Pipeline};
use reth_provider::providers::{BlockchainProvider, ProviderNodeTypes};
use reth_stages_api::Pipeline;
use reth_tasks::Runtime;
use reth_trie_db::ChangesetCache;
use std::sync::Arc;
Expand All @@ -34,8 +30,8 @@ use std::sync::Arc;
/// This spawns and wires together the following components:
///
/// - **[`BasicBlockDownloader`]** — downloads blocks on demand from the network during live sync.
/// - **[`PersistenceHandle`]** — spawns the persistence service on a background thread for writing
/// blocks and performing pruning outside the critical consensus path.
/// - **[`PersistenceHandle`]** — handle to the persistence service for writing blocks and
/// performing pruning outside the critical consensus path.
/// - **[`EngineApiTreeHandler`]** — spawns the tree handler that processes engine API requests
/// (`newPayload`, `forkchoiceUpdated`) and maintains the in-memory chain state.
/// - **[`EngineApiRequestHandler`]** + **[`EngineHandler`]** — glue that routes incoming CL
Expand All @@ -55,13 +51,11 @@ pub fn build_engine_orchestrator<N, Client, S, V, C>(
incoming_requests: S,
pipeline: Pipeline<N>,
pipeline_task_spawner: Runtime,
provider: ProviderFactory<N>,
blockchain_db: BlockchainProvider<N>,
pruner: PrunerWithFactory<ProviderFactory<N>>,
persistence_handle: PersistenceHandle<N::Primitives>,
payload_builder: PayloadBuilderHandle<N::Payload>,
payload_validator: V,
tree_config: TreeConfig,
sync_metrics_tx: MetricEventsSender,
evm_config: C,
changeset_cache: ChangesetCache,
runtime: Runtime,
Expand All @@ -82,9 +76,6 @@ where
{
let downloader = BasicBlockDownloader::new(client, consensus.clone());

let persistence_handle =
PersistenceHandle::<N::Primitives>::spawn_service(provider, pruner, sync_metrics_tx);

let canonical_in_memory_state = blockchain_db.canonical_in_memory_state();

let (to_tree_tx, from_tree) = EngineApiTreeHandler::spawn_new(
Expand Down
50 changes: 49 additions & 1 deletion crates/engine/tree/src/persistence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use reth_stages_api::{MetricEvent, MetricEventsSender};
use reth_tasks::spawn_os_thread;
use std::{
sync::{
atomic::{AtomicBool, Ordering},
mpsc::{Receiver, SendError, Sender},
Arc,
},
Expand Down Expand Up @@ -247,6 +248,8 @@ pub struct PersistenceHandle<N: NodePrimitives = EthPrimitives> {
/// Guard that joins the service thread when all handles are dropped.
/// Uses `Arc` so the handle remains `Clone`.
_service_guard: Arc<ServiceGuard>,
/// External gate to suppress persistence.
persistence_gate: PersistenceGate,
}

impl<T: NodePrimitives> PersistenceHandle<T> {
Expand All @@ -255,7 +258,11 @@ impl<T: NodePrimitives> PersistenceHandle<T> {
/// This is intended for testing purposes where you want to mock the persistence service.
/// For production use, prefer [`spawn_service`](Self::spawn_service).
pub fn new(sender: Sender<PersistenceAction<T>>) -> Self {
Self { sender, _service_guard: Arc::new(ServiceGuard(None)) }
Self {
sender,
_service_guard: Arc::new(ServiceGuard(None)),
persistence_gate: PersistenceGate::new(),
}
}

/// Create a new [`PersistenceHandle`], and spawn the persistence service.
Expand Down Expand Up @@ -286,6 +293,7 @@ impl<T: NodePrimitives> PersistenceHandle<T> {
PersistenceHandle {
sender: db_service_tx,
_service_guard: Arc::new(ServiceGuard(Some(join_handle))),
persistence_gate: PersistenceGate::new(),
}
}

Expand All @@ -298,6 +306,11 @@ impl<T: NodePrimitives> PersistenceHandle<T> {
self.sender.send(action)
}

/// Returns a clone of the [`PersistenceGate`].
pub fn persistence_gate(&self) -> PersistenceGate {
self.persistence_gate.clone()
}

/// Tells the persistence service to save a certain list of finalized blocks. The blocks are
/// assumed to be ordered by block number.
///
Expand Down Expand Up @@ -371,6 +384,41 @@ impl Drop for ServiceGuard {
}
}

/// Shared gate that controls whether the engine tree handler starts new persistence cycles.
///
/// When disabled,
/// [`EngineApiTreeHandler::should_persist`](crate::tree::EngineApiTreeHandler::should_persist)
/// returns `false`, preventing new persistence cycles from starting. An in-flight persistence
/// task is unaffected.
#[derive(Clone, Debug)]
pub struct PersistenceGate(Arc<AtomicBool>);

impl PersistenceGate {
/// Creates a new gate that is enabled by default.
pub fn new() -> Self {
Self(Arc::new(AtomicBool::new(true)))
}

/// Returns `true` if persistence cycles are enabled.
pub fn is_enabled(&self) -> bool {
self.0.load(Ordering::Relaxed)
}

/// Sets the persistence gate.
///
/// When set to `false`, no new persistence cycles will be started by the tree handler.
/// An in-flight persistence task is unaffected. Set back to `true` to resume.
pub fn set_enabled(&self, enabled: bool) {
self.0.store(enabled, Ordering::Relaxed);
}
}

impl Default for PersistenceGate {
fn default() -> Self {
Self::new()
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
10 changes: 7 additions & 3 deletions crates/engine/tree/src/tree/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2010,9 +2010,13 @@ where
}

/// Returns true if the canonical chain length minus the last persisted
/// block is greater than or equal to the persistence threshold and
/// backfill is not running.
pub const fn should_persist(&self) -> bool {
/// block is greater than or equal to the persistence threshold,
/// backfill is not running, and the external persistence gate is open.
pub fn should_persist(&self) -> bool {
if !self.persistence.persistence_gate().is_enabled() {
return false
}

if !self.backfill_sync_state.is_idle() {
// can't persist if backfill is running
return false
Expand Down
15 changes: 12 additions & 3 deletions crates/node/builder/src/launch/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use reth_engine_tree::{
chain::{ChainEvent, FromOrchestrator},
engine::{EngineApiKind, EngineApiRequest, EngineRequestHandler},
launch::build_engine_orchestrator,
persistence::PersistenceHandle,
tree::TreeConfig,
};
use reth_engine_util::EngineMessageStreamExt;
Expand Down Expand Up @@ -226,20 +227,26 @@ impl EngineNodeLauncher {
EngineApiKind::Ethereum
};

let persistence_handle =
PersistenceHandle::<<T::Types as NodeTypes>::Primitives>::spawn_service(
ctx.provider_factory().clone(),
pruner,
ctx.sync_metrics_tx(),
);
let persistence_gate = persistence_handle.persistence_gate();

let mut orchestrator = build_engine_orchestrator(
engine_kind,
consensus.clone(),
network_client.clone(),
Box::pin(consensus_engine_stream),
pipeline,
ctx.task_executor().clone(),
ctx.provider_factory().clone(),
ctx.blockchain_db().clone(),
pruner,
persistence_handle,
ctx.components().payload_builder_handle().clone(),
engine_validator,
engine_tree_config,
ctx.sync_metrics_tx(),
ctx.components().evm_config().clone(),
changeset_cache,
ctx.task_executor().clone(),
Expand Down Expand Up @@ -271,6 +278,7 @@ impl EngineNodeLauncher {
engine_events,
beacon_engine_handle,
engine_shutdown: _,
persistence_gate: _,
} = add_ons.launch_add_ons(add_ons_ctx).await?;

// Create engine shutdown handle
Expand Down Expand Up @@ -409,6 +417,7 @@ impl EngineNodeLauncher {
engine_events,
beacon_engine_handle,
engine_shutdown,
persistence_gate,
},
};
// Notify on node started
Expand Down
6 changes: 5 additions & 1 deletion crates/node/builder/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ pub use jsonrpsee::{
core::middleware::layer::Either,
server::middleware::rpc::{RpcService, RpcServiceBuilder},
};
use reth_engine_tree::tree::WaitForCaches;
pub use reth_engine_tree::tree::{BasicEngineValidator, EngineValidator};
use reth_engine_tree::{persistence::PersistenceGate, tree::WaitForCaches};
pub use reth_rpc_builder::{
middleware::{RethAuthHttpMiddleware, RethRpcMiddleware},
Identity, Stack,
Expand Down Expand Up @@ -345,6 +345,8 @@ pub struct RpcHandle<Node: FullNodeComponents, EthApi: EthApiTypes> {
pub beacon_engine_handle: ConsensusEngineHandle<<Node::Types as NodeTypes>::Payload>,
/// Handle to trigger engine shutdown.
pub engine_shutdown: EngineShutdown,
/// Gate to suppress engine persistence cycles.
pub persistence_gate: PersistenceGate,
}

impl<Node: FullNodeComponents, EthApi: EthApiTypes> Clone for RpcHandle<Node, EthApi> {
Expand All @@ -355,6 +357,7 @@ impl<Node: FullNodeComponents, EthApi: EthApiTypes> Clone for RpcHandle<Node, Et
engine_events: self.engine_events.clone(),
beacon_engine_handle: self.beacon_engine_handle.clone(),
engine_shutdown: self.engine_shutdown.clone(),
persistence_gate: self.persistence_gate.clone(),
}
}
}
Expand Down Expand Up @@ -1084,6 +1087,7 @@ where
engine_events,
beacon_engine_handle: engine_handle,
engine_shutdown: EngineShutdown::default(),
persistence_gate: PersistenceGate::default(),
})
}

Expand Down
Loading