diff --git a/bin/reth/src/node/mod.rs b/bin/reth/src/node/mod.rs index 70d61ca8b44..2e626a1c63e 100644 --- a/bin/reth/src/node/mod.rs +++ b/bin/reth/src/node/mod.rs @@ -25,7 +25,10 @@ use eyre::Context; use fdlimit::raise_fd_limit; use futures::{future::Either, pin_mut, stream, stream_select, StreamExt}; use reth_auto_seal_consensus::{AutoSealBuilder, AutoSealConsensus, MiningMode}; -use reth_beacon_consensus::{BeaconConsensus, BeaconConsensusEngine, MIN_BLOCKS_FOR_PIPELINE_RUN}; +use reth_beacon_consensus::{ + hooks::{EngineHooks, PruneHook}, + BeaconConsensus, BeaconConsensusEngine, MIN_BLOCKS_FOR_PIPELINE_RUN, +}; use reth_blockchain_tree::{ config::BlockchainTreeConfig, externals::TreeExternals, BlockchainTree, ShareableBlockchainTree, }; @@ -446,16 +449,19 @@ impl NodeCommand { None }; - let pruner = prune_config.map(|prune_config| { + let mut hooks = EngineHooks::new(); + + if let Some(prune_config) = prune_config { info!(target: "reth::cli", ?prune_config, "Pruner initialized"); - reth_prune::Pruner::new( + let pruner = reth_prune::Pruner::new( db.clone(), self.chain.clone(), prune_config.block_interval, prune_config.parts, self.chain.prune_batch_sizes, - ) - }); + ); + hooks.add(PruneHook::new(pruner, Box::new(ctx.task_executor.clone()))); + } // Configure the consensus engine let (beacon_consensus_engine, beacon_engine_handle) = BeaconConsensusEngine::with_channel( @@ -471,7 +477,7 @@ impl NodeCommand { MIN_BLOCKS_FOR_PIPELINE_RUN, consensus_engine_tx, consensus_engine_rx, - pruner, + hooks, )?; info!(target: "reth::cli", "Consensus engine initialized"); diff --git a/crates/consensus/beacon/src/engine/error.rs b/crates/consensus/beacon/src/engine/error.rs index b78b3828bdb..eb4ffd62aca 100644 --- a/crates/consensus/beacon/src/engine/error.rs +++ b/crates/consensus/beacon/src/engine/error.rs @@ -1,4 +1,4 @@ -use reth_prune::PrunerError; +use crate::engine::hooks::EngineHookError; use reth_rpc_types::engine::ForkchoiceUpdateError; use reth_stages::PipelineError; @@ -20,9 +20,9 @@ pub enum BeaconConsensusEngineError { /// Pruner channel closed. #[error("Pruner channel closed")] PrunerChannelClosed, - /// Pruner error. + /// Hook error. #[error(transparent)] - Pruner(#[from] PrunerError), + Hook(#[from] EngineHookError), /// Common error. Wrapper around [reth_interfaces::Error]. #[error(transparent)] Common(#[from] reth_interfaces::Error), diff --git a/crates/consensus/beacon/src/engine/hooks/controller.rs b/crates/consensus/beacon/src/engine/hooks/controller.rs new file mode 100644 index 00000000000..1449629c70f --- /dev/null +++ b/crates/consensus/beacon/src/engine/hooks/controller.rs @@ -0,0 +1,132 @@ +use crate::hooks::{EngineContext, EngineHook, EngineHookAction, EngineHookError, EngineHooks}; +use std::{ + collections::VecDeque, + task::{Context, Poll}, +}; +use tracing::debug; + +/// Manages hooks under the control of the engine. +/// +/// This type polls the initialized hooks one by one, respecting the DB access level +/// (i.e. [crate::hooks::EngineHookDBAccessLevel::ReadWrite] that enforces running at most one such +/// hook). +pub(crate) struct EngineHooksController { + /// Collection of hooks. + /// + /// Hooks might be removed from the collection, and returned upon completion. + /// In the current implementation, it only happens when moved to `running_hook_with_db_write`. + hooks: VecDeque>, + /// Currently running hook with DB write access, if any. + running_hook_with_db_write: Option>, +} + +impl EngineHooksController { + /// Creates a new [`EngineHooksController`]. + pub(crate) fn new(hooks: EngineHooks) -> Self { + Self { hooks: hooks.inner.into(), running_hook_with_db_write: None } + } + + /// Polls currently running hook with DB write access, if any. + /// + /// Returns [`Poll::Ready`] if currently running hook with DB write access returned + /// an [event][`crate::hooks::EngineHookEvent`] that resulted in [action][`EngineHookAction`] or + /// error. + /// + /// Returns [`Poll::Pending`] in all other cases: + /// 1. No hook with DB write access is running. + /// 2. Currently running hook with DB write access returned [`Poll::Pending`] on polling. + /// 3. Currently running hook with DB write access returned [`Poll::Ready`] on polling, but no + /// action to act upon. + pub(crate) fn poll_running_hook_with_db_write( + &mut self, + cx: &mut Context<'_>, + args: EngineContext, + ) -> Poll> { + let Some(mut hook) = self.running_hook_with_db_write.take() else { return Poll::Pending }; + + match hook.poll(cx, args) { + Poll::Ready((event, action)) => { + debug!( + target: "consensus::engine::hooks", + hook = hook.name(), + ?action, + ?event, + "Polled running hook with db write access" + ); + + if !event.is_finished() { + self.running_hook_with_db_write = Some(hook); + } else { + self.hooks.push_back(hook); + } + + if let Some(action) = action { + return Poll::Ready(Ok(action)) + } + } + Poll::Pending => { + self.running_hook_with_db_write = Some(hook); + } + } + + Poll::Pending + } + + /// Polls next engine from the collection. + /// + /// Returns [`Poll::Ready`] if next hook returned an [event][`crate::hooks::EngineHookEvent`] + /// that resulted in [action][`EngineHookAction`]. + /// + /// Returns [`Poll::Pending`] in all other cases: + /// 1. Next hook is [`Option::None`], i.e. taken, meaning it's currently running and has a DB + /// write access. + /// 2. Next hook needs a DB write access, but either there's another hook with DB write access + /// running, or `db_write_active` passed into arguments is `true`. + /// 3. Next hook returned [`Poll::Pending`] on polling. + /// 4. Next hook returned [`Poll::Ready`] on polling, but no action to act upon. + pub(crate) fn poll_next_hook( + &mut self, + cx: &mut Context<'_>, + args: EngineContext, + db_write_active: bool, + ) -> Poll> { + let Some(mut hook) = self.hooks.pop_front() else { return Poll::Pending }; + + // Hook with DB write access level is not allowed to run due to already running hook with DB + // write access level or active DB write according to passed argument + if hook.db_access_level().is_read_write() && + (self.running_hook_with_db_write.is_some() || db_write_active) + { + return Poll::Pending + } + + if let Poll::Ready((event, action)) = hook.poll(cx, args) { + debug!( + target: "consensus::engine::hooks", + hook = hook.name(), + ?action, + ?event, + "Polled next hook" + ); + + if event.is_started() && hook.db_access_level().is_read_write() { + self.running_hook_with_db_write = Some(hook); + } else { + self.hooks.push_back(hook); + } + + if let Some(action) = action { + return Poll::Ready(Ok(action)) + } + } else { + self.hooks.push_back(hook); + } + + Poll::Pending + } + + /// Returns `true` if there's a hook with DB write access running. + pub(crate) fn is_hook_with_db_write_running(&self) -> bool { + self.running_hook_with_db_write.is_some() + } +} diff --git a/crates/consensus/beacon/src/engine/hooks/mod.rs b/crates/consensus/beacon/src/engine/hooks/mod.rs new file mode 100644 index 00000000000..8f0877aa135 --- /dev/null +++ b/crates/consensus/beacon/src/engine/hooks/mod.rs @@ -0,0 +1,128 @@ +use reth_interfaces::sync::SyncState; +use reth_primitives::BlockNumber; +use std::{ + fmt::Debug, + task::{Context, Poll}, +}; + +mod controller; +pub(crate) use controller::EngineHooksController; + +mod prune; +pub use prune::PruneHook; + +/// Collection of [engine hooks][`EngineHook`]. +#[derive(Default)] +pub struct EngineHooks { + inner: Vec>, +} + +impl EngineHooks { + /// Creates a new empty collection of [engine hooks][`EngineHook`]. + pub fn new() -> Self { + Self { inner: Vec::new() } + } + + /// Adds a new [engine hook][`EngineHook`] to the collection. + pub fn add(&mut self, hook: H) { + self.inner.push(Box::new(hook)) + } +} + +/// Hook that will be run during the main loop of +/// [consensus engine][`crate::engine::BeaconConsensusEngine`]. +pub trait EngineHook: Send + Sync + 'static { + /// Returns a human-readable name for the hook. + fn name(&self) -> &'static str; + + /// Advances the hook execution, emitting an [event][`EngineHookEvent`] and an optional + /// [action][`EngineHookAction`]. + fn poll( + &mut self, + cx: &mut Context<'_>, + ctx: EngineContext, + ) -> Poll<(EngineHookEvent, Option)>; + + /// Returns [db access level][`EngineHookDBAccessLevel`] the hook needs. + fn db_access_level(&self) -> EngineHookDBAccessLevel; +} + +/// Engine context passed to the [hook polling function][`EngineHook::poll`]. +#[derive(Copy, Clone, Debug)] +pub struct EngineContext { + /// Tip block number. + pub tip_block_number: BlockNumber, +} + +/// An event emitted when [hook][`EngineHook`] is polled. +#[derive(Debug)] +pub enum EngineHookEvent { + /// Hook is not ready. + /// + /// If this is returned, the hook is idle. + NotReady, + /// Hook started. + /// + /// If this is returned, the hook is running. + Started, + /// Hook finished. + /// + /// If this is returned, the hook is idle. + Finished(Result<(), EngineHookError>), +} + +impl EngineHookEvent { + /// Returns `true` if the event is [`EngineHookEvent::Started`]. + pub fn is_started(&self) -> bool { + matches!(self, Self::Started) + } + + /// Returns `true` if the event is [`EngineHookEvent::Finished`]. + pub fn is_finished(&self) -> bool { + matches!(self, Self::Finished(_)) + } +} + +/// An action that the caller of [hook][`EngineHook`] should act upon. +#[derive(Debug, Copy, Clone)] +pub enum EngineHookAction { + /// Notify about a [SyncState] update. + UpdateSyncState(SyncState), + /// Read the last relevant canonical hashes from the database and update the block indices of + /// the blockchain tree. + RestoreCanonicalHashes, +} + +/// An error returned by [hook][`EngineHook`]. +#[derive(Debug, thiserror::Error)] +pub enum EngineHookError { + /// Hook channel closed. + #[error("Hook channel closed")] + ChannelClosed, + /// Common error. Wrapper around [reth_interfaces::Error]. + #[error(transparent)] + Common(#[from] reth_interfaces::Error), + /// An internal error occurred. + #[error("Internal hook error occurred.")] + Internal(#[from] Box), +} + +/// Level of database access the hook needs for execution. +pub enum EngineHookDBAccessLevel { + /// Read-only database access. + ReadOnly, + /// Read-write database access. + ReadWrite, +} + +impl EngineHookDBAccessLevel { + /// Returns `true` if the hook needs read-only access to the database. + pub fn is_read_only(&self) -> bool { + matches!(self, Self::ReadOnly) + } + + /// Returns `true` if the hook needs read-write access to the database. + pub fn is_read_write(&self) -> bool { + matches!(self, Self::ReadWrite) + } +} diff --git a/crates/consensus/beacon/src/engine/prune.rs b/crates/consensus/beacon/src/engine/hooks/prune.rs similarity index 50% rename from crates/consensus/beacon/src/engine/prune.rs rename to crates/consensus/beacon/src/engine/hooks/prune.rs index 4b2b4852dcc..1650f9f5215 100644 --- a/crates/consensus/beacon/src/engine/prune.rs +++ b/crates/consensus/beacon/src/engine/hooks/prune.rs @@ -1,9 +1,17 @@ -//! Prune management for the engine implementation. +//! Prune hook for the engine implementation. +use crate::{ + engine::hooks::{ + EngineContext, EngineHook, EngineHookAction, EngineHookError, EngineHookEvent, + }, + hooks::EngineHookDBAccessLevel, +}; use futures::FutureExt; +use metrics::Counter; use reth_db::database::Database; +use reth_interfaces::sync::SyncState; use reth_primitives::BlockNumber; -use reth_prune::{Pruner, PrunerResult, PrunerWithResult}; +use reth_prune::{Pruner, PrunerError, PrunerWithResult}; use reth_tasks::TaskSpawner; use std::task::{ready, Context, Poll}; use tokio::sync::oneshot; @@ -11,45 +19,67 @@ use tokio::sync::oneshot; /// Manages pruning under the control of the engine. /// /// This type controls the [Pruner]. -pub(crate) struct EnginePruneController { +pub struct PruneHook { /// The current state of the pruner. pruner_state: PrunerState, /// The type that can spawn the pruner task. pruner_task_spawner: Box, + metrics: Metrics, } -impl EnginePruneController { +impl PruneHook { /// Create a new instance - pub(crate) fn new(pruner: Pruner, pruner_task_spawner: Box) -> Self { - Self { pruner_state: PrunerState::Idle(Some(pruner)), pruner_task_spawner } - } - - /// Returns `true` if the pruner is idle. - pub(crate) fn is_pruner_idle(&self) -> bool { - self.pruner_state.is_idle() + pub fn new(pruner: Pruner, pruner_task_spawner: Box) -> Self { + Self { + pruner_state: PrunerState::Idle(Some(pruner)), + pruner_task_spawner, + metrics: Metrics::default(), + } } /// Advances the pruner state. /// /// This checks for the result in the channel, or returns pending if the pruner is idle. - fn poll_pruner(&mut self, cx: &mut Context<'_>) -> Poll { - let res = match self.pruner_state { + fn poll_pruner( + &mut self, + cx: &mut Context<'_>, + ) -> Poll<(EngineHookEvent, Option)> { + let result = match self.pruner_state { PrunerState::Idle(_) => return Poll::Pending, PrunerState::Running(ref mut fut) => { ready!(fut.poll_unpin(cx)) } }; - let ev = match res { + + let event = match result { Ok((pruner, result)) => { self.pruner_state = PrunerState::Idle(Some(pruner)); - EnginePruneEvent::Finished { result } + + match result { + Ok(_) => EngineHookEvent::Finished(Ok(())), + Err(err) => EngineHookEvent::Finished(Err(match err { + PrunerError::PrunePart(_) | PrunerError::InconsistentData(_) => { + EngineHookError::Internal(Box::new(err)) + } + PrunerError::Interface(err) => err.into(), + PrunerError::Database(err) => reth_interfaces::Error::Database(err).into(), + PrunerError::Provider(err) => reth_interfaces::Error::Provider(err).into(), + })), + } } Err(_) => { // failed to receive the pruner - EnginePruneEvent::TaskDropped + EngineHookEvent::Finished(Err(EngineHookError::ChannelClosed)) } }; - Poll::Ready(ev) + + let action = if matches!(event, EngineHookEvent::Finished(Ok(_))) { + Some(EngineHookAction::RestoreCanonicalHashes) + } else { + None + }; + + Poll::Ready((event, action)) } /// This will try to spawn the pruner if it is idle: @@ -59,7 +89,10 @@ impl EnginePruneController { /// 2b. If pruning is not needed, set pruner state back to [PrunerState::Idle]. /// /// If pruner is already running, do nothing. - fn try_spawn_pruner(&mut self, tip_block_number: BlockNumber) -> Option { + fn try_spawn_pruner( + &mut self, + tip_block_number: BlockNumber, + ) -> Option<(EngineHookEvent, Option)> { match &mut self.pruner_state { PrunerState::Idle(pruner) => { let mut pruner = pruner.take()?; @@ -74,53 +107,51 @@ impl EnginePruneController { let _ = tx.send((pruner, result)); }), ); + self.metrics.runs.increment(1); self.pruner_state = PrunerState::Running(rx); - Some(EnginePruneEvent::Started(tip_block_number)) + Some(( + EngineHookEvent::Started, + // Engine can't process any FCU/payload messages from CL while we're + // pruning, as pruner needs an exclusive write access to the database. To + // prevent CL from sending us unneeded updates, we need to respond `true` + // on `eth_syncing` request. + Some(EngineHookAction::UpdateSyncState(SyncState::Syncing)), + )) } else { self.pruner_state = PrunerState::Idle(Some(pruner)); - Some(EnginePruneEvent::NotReady) + Some((EngineHookEvent::NotReady, None)) } } PrunerState::Running(_) => None, } } +} + +impl EngineHook for PruneHook { + fn name(&self) -> &'static str { + "Prune" + } - /// Advances the prune process with the tip block number. - pub(crate) fn poll( + fn poll( &mut self, cx: &mut Context<'_>, - tip_block_number: BlockNumber, - ) -> Poll { + ctx: EngineContext, + ) -> Poll<(EngineHookEvent, Option)> { // Try to spawn a pruner - match self.try_spawn_pruner(tip_block_number) { - Some(EnginePruneEvent::NotReady) => return Poll::Pending, - Some(event) => return Poll::Ready(event), + match self.try_spawn_pruner(ctx.tip_block_number) { + Some((EngineHookEvent::NotReady, _)) => return Poll::Pending, + Some((event, action)) => return Poll::Ready((event, action)), None => (), } // Poll pruner and check its status self.poll_pruner(cx) } -} -/// The event type emitted by the [EnginePruneController]. -#[derive(Debug)] -pub(crate) enum EnginePruneEvent { - /// Pruner is not ready - NotReady, - /// Pruner started with tip block number - Started(BlockNumber), - /// Pruner finished - /// - /// If this is returned, the pruner is idle. - Finished { - /// Final result of the pruner run. - result: PrunerResult, - }, - /// Pruner task was dropped after it was started, unable to receive it because channel - /// closed. This would indicate a panicked pruner task - TaskDropped, + fn db_access_level(&self) -> EngineHookDBAccessLevel { + EngineHookDBAccessLevel::ReadWrite + } } /// The possible pruner states within the sync controller. @@ -139,9 +170,9 @@ enum PrunerState { Running(oneshot::Receiver>), } -impl PrunerState { - /// Returns `true` if the state matches idle. - fn is_idle(&self) -> bool { - matches!(self, PrunerState::Idle(_)) - } +#[derive(reth_metrics::Metrics)] +#[metrics(scope = "consensus.engine.prune")] +struct Metrics { + /// The number of times the pruner was run. + runs: Counter, } diff --git a/crates/consensus/beacon/src/engine/metrics.rs b/crates/consensus/beacon/src/engine/metrics.rs index 6daae69eaa7..67bae71be8b 100644 --- a/crates/consensus/beacon/src/engine/metrics.rs +++ b/crates/consensus/beacon/src/engine/metrics.rs @@ -13,8 +13,6 @@ pub(crate) struct EngineMetrics { pub(crate) forkchoice_updated_messages: Counter, /// The total count of new payload messages received. pub(crate) new_payload_messages: Counter, - /// The number of times the pruner was run. - pub(crate) pruner_runs: Counter, /// Latency for making canonical already canonical block pub(crate) make_canonical_already_canonical_latency: Histogram, /// Latency for making canonical committed block diff --git a/crates/consensus/beacon/src/engine/mod.rs b/crates/consensus/beacon/src/engine/mod.rs index 71a3577f26f..fac3e350763 100644 --- a/crates/consensus/beacon/src/engine/mod.rs +++ b/crates/consensus/beacon/src/engine/mod.rs @@ -3,8 +3,8 @@ use crate::{ forkchoice::{ForkchoiceStateHash, ForkchoiceStateTracker}, message::OnForkChoiceUpdated, metrics::EngineMetrics, - prune::{EnginePruneController, EnginePruneEvent}, }, + hooks::{EngineContext, EngineHookAction, EngineHooksController}, sync::{EngineSyncController, EngineSyncEvent}, }; use futures::{Future, StreamExt}; @@ -29,7 +29,6 @@ use reth_provider::{ BlockIdReader, BlockReader, BlockSource, CanonChainTracker, ChainSpecProvider, ProviderError, StageCheckpointReader, }; -use reth_prune::Pruner; use reth_rpc_types::engine::{ CancunPayloadFields, ExecutionPayload, PayloadAttributes, PayloadError, PayloadStatus, PayloadStatusEnum, PayloadValidationError, @@ -69,11 +68,17 @@ mod handle; pub use handle::BeaconConsensusEngineHandle; mod forkchoice; +use crate::hooks::EngineHooks; pub use forkchoice::ForkchoiceStatus; + mod metrics; -pub(crate) mod prune; + pub(crate) mod sync; +/// Hooks for running during the main loop of +/// [consensus engine][`crate::engine::BeaconConsensusEngine`]. +pub mod hooks; + #[cfg(any(test, feature = "test-utils"))] pub mod test_utils; @@ -197,8 +202,7 @@ where /// blocks using the pipeline. Otherwise, the engine, sync controller, and blockchain tree will /// be used to download and execute the missing blocks. pipeline_run_threshold: u64, - /// Controls pruning triggered by engine updates. - prune: Option>, + hooks: EngineHooksController, } impl BeaconConsensusEngine @@ -226,7 +230,7 @@ where payload_builder: PayloadBuilderHandle, target: Option, pipeline_run_threshold: u64, - pruner: Option>, + hooks: EngineHooks, ) -> Result<(Self, BeaconConsensusEngineHandle), Error> { let (to_engine, rx) = mpsc::unbounded_channel(); Self::with_channel( @@ -242,7 +246,7 @@ where pipeline_run_threshold, to_engine, rx, - pruner, + hooks, ) } @@ -272,7 +276,7 @@ where pipeline_run_threshold: u64, to_engine: UnboundedSender, rx: UnboundedReceiver, - pruner: Option>, + hooks: EngineHooks, ) -> Result<(Self, BeaconConsensusEngineHandle), Error> { let handle = BeaconConsensusEngineHandle { to_engine }; let sync = EngineSyncController::new( @@ -283,7 +287,6 @@ where max_block, blockchain.chain_spec(), ); - let prune = pruner.map(|pruner| EnginePruneController::new(pruner, task_spawner)); let mut this = Self { sync, blockchain, @@ -296,7 +299,7 @@ where invalid_headers: InvalidHeaderCache::new(MAX_INVALID_HEADERS), metrics: EngineMetrics::default(), pipeline_run_threshold, - prune, + hooks: EngineHooksController::new(hooks), }; let maybe_pipeline_target = match target { @@ -638,12 +641,12 @@ where return Ok(OnForkChoiceUpdated::syncing()) } - if self.is_prune_active() { - // We can only process new forkchoice updates if the pruner is idle, since it requires - // exclusive access to the database + if self.hooks.is_hook_with_db_write_running() { + // We can only process new forkchoice updates if no hook with db write is running, + // since it requires exclusive access to the database warn!( target: "consensus::engine", - "Pruning is in progress, skipping forkchoice update. \ + "Hook is in progress, skipping forkchoice update. \ This may affect the performance of your node as a validator." ); return Ok(OnForkChoiceUpdated::syncing()) @@ -1083,13 +1086,13 @@ where return Ok(status) } - let res = if self.sync.is_pipeline_idle() && self.is_prune_idle() { - // we can only insert new payloads if the pipeline and the pruner are _not_ running, - // because they hold exclusive access to the database + let res = if self.sync.is_pipeline_idle() && !self.hooks.is_hook_with_db_write_running() { + // we can only insert new payloads if the pipeline and any hook with db write + // are _not_ running, because they hold exclusive access to the database self.try_insert_new_payload(block) } else { - if self.is_prune_active() { - debug!(target: "consensus::engine", "Pruning is in progress, buffering new payload."); + if self.hooks.is_hook_with_db_write_running() { + debug!(target: "consensus::engine", "Hook is in progress, buffering new payload."); } self.try_buffer_payload(block) }; @@ -1226,12 +1229,12 @@ where Ok(()) } - /// When the pipeline or the pruner is active, the tree is unable to commit any additional - /// blocks since the pipeline holds exclusive access to the database. + /// When the pipeline or a hook with DB write access is active, the tree is unable to commit + /// any additional blocks since the pipeline holds exclusive access to the database. /// /// In this scenario we buffer the payload in the tree if the payload is valid, once the - /// pipeline or pruner is finished, the tree is then able to also use the buffered payloads to - /// commit to a (newer) canonical chain. + /// pipeline or a hook with DB write access is finished, the tree is then able to also use the + /// buffered payloads to commit to a (newer) canonical chain. /// /// This will return `SYNCING` if the block was buffered successfully, and an error if an error /// occurred while buffering the block. @@ -1246,7 +1249,7 @@ where /// Attempts to insert a new payload into the tree. /// - /// Caution: This expects that the pipeline and the pruner are idle. + /// Caution: This expects that the pipeline and a hook with DB write access are idle. #[instrument(level = "trace", skip_all, target = "consensus::engine", ret)] fn try_insert_new_payload( &mut self, @@ -1339,14 +1342,6 @@ where Ok(synced_to_finalized) } - /// Attempt to restore the tree. - /// - /// This is invoked after a pruner run to update the tree with the most recent canonical - /// hashes. - fn update_tree_on_finished_pruner(&mut self) -> Result<(), Error> { - self.blockchain.restore_canonical_hashes() - } - /// Invoked if we successfully downloaded a new block from the network. /// /// This will attempt to insert the block into the tree. @@ -1686,72 +1681,20 @@ where None } - /// Event handler for events emitted by the [EnginePruneController]. - /// - /// This returns a result to indicate whether the engine future should resolve (fatal error). - fn on_prune_event( - &mut self, - event: EnginePruneEvent, - ) -> Option> { - match event { - EnginePruneEvent::NotReady => {} - EnginePruneEvent::Started(tip_block_number) => { - trace!(target: "consensus::engine", %tip_block_number, "Pruner started"); - self.metrics.pruner_runs.increment(1); - // Engine can't process any FCU/payload messages from CL while we're pruning, as - // pruner needs an exclusive write access to the database. To prevent CL from - // sending us unneeded updates, we need to respond `true` on `eth_syncing` request. - self.sync_state_updater.update_sync_state(SyncState::Syncing); - } - EnginePruneEvent::TaskDropped => { - error!(target: "consensus::engine", "Failed to receive spawned pruner"); - return Some(Err(BeaconConsensusEngineError::PrunerChannelClosed)) + fn on_hook_action(&self, action: EngineHookAction) -> Result<(), BeaconConsensusEngineError> { + match action { + EngineHookAction::UpdateSyncState(state) => { + self.sync_state_updater.update_sync_state(state) } - EnginePruneEvent::Finished { result } => { - trace!(target: "consensus::engine", ?result, "Pruner finished"); - match result { - Ok(_) => { - // Update the state and hashes of the blockchain tree if possible. - match self.update_tree_on_finished_pruner() { - Ok(()) => {} - Err(error) => { - error!(target: "consensus::engine", ?error, "Error restoring blockchain tree state"); - return Some(Err(error.into())) - } - }; - } - // Any pruner error at this point is fatal. - Err(error) => return Some(Err(error.into())), - }; + EngineHookAction::RestoreCanonicalHashes => { + if let Err(error) = self.blockchain.restore_canonical_hashes() { + error!(target: "consensus::engine", ?error, "Error restoring blockchain tree state"); + return Err(error.into()) + } } - }; - - None - } - - /// Returns `true` if the prune controller's pruner is idle. - fn is_prune_idle(&self) -> bool { - self.prune.as_ref().map(|prune| prune.is_pruner_idle()).unwrap_or(true) - } - - /// Returns `true` if the prune controller's pruner is active. - fn is_prune_active(&self) -> bool { - !self.is_prune_idle() - } - - /// Polls the prune controller, if it exists, and processes the event [`EnginePruneEvent`] - /// emitted by it. - /// - /// Returns [`Option::Some`] if prune controller emitted an event which resulted in the error - /// (see [`Self::on_prune_event`] for error handling) - fn poll_prune( - &mut self, - cx: &mut Context<'_>, - ) -> Option> { - match self.prune.as_mut()?.poll(cx, self.blockchain.canonical_tip().number) { - Poll::Ready(prune_event) => self.on_prune_event(prune_event), - Poll::Pending => None, } + + Ok(()) } } @@ -1783,11 +1726,14 @@ where // Process all incoming messages from the CL, these can affect the state of the // SyncController, hence they are polled first, and they're also time sensitive. loop { - // Poll prune controller first if it's active, as we will not be able to process any - // engine messages until it's finished. - if this.is_prune_active() { - if let Some(res) = this.poll_prune(cx) { - return Poll::Ready(res) + // Poll a running hook with db write access first, as we will not be able to process + // any engine messages until it's finished. + if let Poll::Ready(result) = this.hooks.poll_running_hook_with_db_write( + cx, + EngineContext { tip_block_number: this.blockchain.canonical_tip().number }, + ) { + if let Err(err) = this.on_hook_action(result?) { + return Poll::Ready(Err(err)) } } @@ -1847,16 +1793,18 @@ where // we're pending if both engine messages and sync events are pending (fully drained) let is_pending = engine_messages_pending && sync_pending; - // Poll prune controller if all conditions are met: - // 1. Pipeline is idle - // 2. No engine and sync messages are pending - // 3. Latest FCU status is not INVALID - if this.sync.is_pipeline_idle() && - is_pending && - !this.forkchoice_state_tracker.is_latest_invalid() - { - if let Some(res) = this.poll_prune(cx) { - return Poll::Ready(res) + // Poll next hook if all conditions are met: + // 1. No engine and sync messages are pending + // 2. Latest FCU status is not INVALID + if is_pending && !this.forkchoice_state_tracker.is_latest_invalid() { + if let Poll::Ready(result) = this.hooks.poll_next_hook( + cx, + EngineContext { tip_block_number: this.blockchain.canonical_tip().number }, + this.sync.is_pipeline_active(), + ) { + if let Err(err) = this.on_hook_action(result?) { + return Poll::Ready(Err(err)) + } } } diff --git a/crates/consensus/beacon/src/engine/test_utils.rs b/crates/consensus/beacon/src/engine/test_utils.rs index cf2a47c3e9d..d29d4e42af9 100644 --- a/crates/consensus/beacon/src/engine/test_utils.rs +++ b/crates/consensus/beacon/src/engine/test_utils.rs @@ -1,7 +1,7 @@ use crate::{ - BeaconConsensus, BeaconConsensusEngine, BeaconConsensusEngineError, - BeaconConsensusEngineHandle, BeaconForkChoiceUpdateError, BeaconOnNewPayloadError, - MIN_BLOCKS_FOR_PIPELINE_RUN, + engine::hooks::PruneHook, hooks::EngineHooks, BeaconConsensus, BeaconConsensusEngine, + BeaconConsensusEngineError, BeaconConsensusEngineHandle, BeaconForkChoiceUpdateError, + BeaconOnNewPayloadError, MIN_BLOCKS_FOR_PIPELINE_RUN, }; use reth_blockchain_tree::{ config::BlockchainTreeConfig, externals::TreeExternals, BlockchainTree, ShareableBlockchainTree, @@ -518,6 +518,9 @@ where PruneBatchSizes::default(), ); + let mut hooks = EngineHooks::new(); + hooks.add(PruneHook::new(pruner, Box::::default())); + let (mut engine, handle) = BeaconConsensusEngine::new( client, pipeline, @@ -529,7 +532,7 @@ where payload_builder, None, self.base_config.pipeline_run_threshold.unwrap_or(MIN_BLOCKS_FOR_PIPELINE_RUN), - Some(pruner), + hooks, ) .expect("failed to create consensus engine"); diff --git a/crates/interfaces/src/sync.rs b/crates/interfaces/src/sync.rs index 622df29a3ca..78cad443d40 100644 --- a/crates/interfaces/src/sync.rs +++ b/crates/interfaces/src/sync.rs @@ -21,7 +21,7 @@ pub trait SyncStateProvider: Send + Sync { /// which point the node is considered fully synced. #[auto_impl::auto_impl(&, Arc, Box)] pub trait NetworkSyncUpdater: std::fmt::Debug + Send + Sync + 'static { - /// Notifies about an [SyncState] update. + /// Notifies about a [SyncState] update. fn update_sync_state(&self, state: SyncState); /// Updates the status of the p2p node @@ -29,7 +29,7 @@ pub trait NetworkSyncUpdater: std::fmt::Debug + Send + Sync + 'static { } /// The state the network is currently in when it comes to synchronization. -#[derive(Clone, Eq, PartialEq, Debug)] +#[derive(Copy, Clone, Eq, PartialEq, Debug)] pub enum SyncState { /// Node sync is complete. ///