Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
b0be50c
feat(engine): hooks
shekhirin Sep 12, 2023
3a2e623
hook arguments
shekhirin Sep 12, 2023
dcc04cd
running hook with db write
shekhirin Sep 12, 2023
df182b7
debug logs
shekhirin Sep 12, 2023
95a5297
helper enum fns
shekhirin Sep 12, 2023
27a14bc
fix vec init
shekhirin Sep 12, 2023
a783551
fix poll logic
shekhirin Sep 12, 2023
6ba1383
don't return pending
shekhirin Sep 12, 2023
ed43ac9
ughh ugly but works
shekhirin Sep 12, 2023
5396010
fix unwrap
shekhirin Sep 12, 2023
08810a4
hook controller
shekhirin Sep 13, 2023
250c889
simplify hooks idx access
shekhirin Sep 13, 2023
6e0b236
move hook idx incr
shekhirin Sep 13, 2023
13a6bd2
change prune hook name
shekhirin Sep 13, 2023
8b5c1e9
reorg crates
shekhirin Sep 13, 2023
ea4297d
hooks collections
shekhirin Sep 13, 2023
1a0d994
improve hook event finished result
shekhirin Sep 13, 2023
9330962
remove is_pipeline_active from args
shekhirin Sep 13, 2023
0de8793
move controller to a separate mod
shekhirin Sep 13, 2023
e5cc193
Merge remote-tracking branch 'origin/main' into alexey/engine-hooks
shekhirin Sep 13, 2023
477e9e3
fix comments
shekhirin Sep 13, 2023
73ee4ae
simplify deps
shekhirin Sep 13, 2023
7c2eaec
doc comments
shekhirin Sep 13, 2023
1b5e4c9
plural
shekhirin Sep 13, 2023
4708854
fix lint
shekhirin Sep 13, 2023
e2b4a5b
better hooks controller docs
shekhirin Sep 13, 2023
2a6fd5c
fix test consensus engine init
shekhirin Sep 13, 2023
f86f4da
vecdeque, rework actions
shekhirin Sep 14, 2023
3b69c3b
dependencies -> db access level
shekhirin Sep 15, 2023
7926ccf
engine hooks, engine context
shekhirin Sep 15, 2023
d5f2101
Merge remote-tracking branch 'origin/main' into alexey/engine-hooks
shekhirin Sep 18, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions bin/reth/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@ use eyre::Context;
use fdlimit::raise_fd_limit;
use futures::{future::Either, pin_mut, stream, stream_select, StreamExt};
use reth_auto_seal_consensus::{AutoSealBuilder, AutoSealConsensus, MiningMode};
use reth_beacon_consensus::{BeaconConsensus, BeaconConsensusEngine, MIN_BLOCKS_FOR_PIPELINE_RUN};
use reth_beacon_consensus::{
hooks::{EngineHooks, PruneHook},
BeaconConsensus, BeaconConsensusEngine, MIN_BLOCKS_FOR_PIPELINE_RUN,
};
use reth_blockchain_tree::{
config::BlockchainTreeConfig, externals::TreeExternals, BlockchainTree, ShareableBlockchainTree,
};
Expand Down Expand Up @@ -446,16 +449,19 @@ impl<Ext: RethCliExt> NodeCommand<Ext> {
None
};

let pruner = prune_config.map(|prune_config| {
let mut hooks = EngineHooks::new();

if let Some(prune_config) = prune_config {
info!(target: "reth::cli", ?prune_config, "Pruner initialized");
reth_prune::Pruner::new(
let pruner = reth_prune::Pruner::new(
db.clone(),
self.chain.clone(),
prune_config.block_interval,
prune_config.parts,
self.chain.prune_batch_sizes,
)
});
);
hooks.add(PruneHook::new(pruner, Box::new(ctx.task_executor.clone())));
}

// Configure the consensus engine
let (beacon_consensus_engine, beacon_engine_handle) = BeaconConsensusEngine::with_channel(
Expand All @@ -471,7 +477,7 @@ impl<Ext: RethCliExt> NodeCommand<Ext> {
MIN_BLOCKS_FOR_PIPELINE_RUN,
consensus_engine_tx,
consensus_engine_rx,
pruner,
hooks,
)?;
info!(target: "reth::cli", "Consensus engine initialized");

Expand Down
6 changes: 3 additions & 3 deletions crates/consensus/beacon/src/engine/error.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use reth_prune::PrunerError;
use crate::engine::hooks::EngineHookError;
use reth_rpc_types::engine::ForkchoiceUpdateError;
use reth_stages::PipelineError;

Expand All @@ -20,9 +20,9 @@ pub enum BeaconConsensusEngineError {
/// Pruner channel closed.
#[error("Pruner channel closed")]
PrunerChannelClosed,
/// Pruner error.
/// Hook error.
#[error(transparent)]
Pruner(#[from] PrunerError),
Hook(#[from] EngineHookError),
/// Common error. Wrapper around [reth_interfaces::Error].
#[error(transparent)]
Common(#[from] reth_interfaces::Error),
Expand Down
132 changes: 132 additions & 0 deletions crates/consensus/beacon/src/engine/hooks/controller.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
use crate::hooks::{EngineContext, EngineHook, EngineHookAction, EngineHookError, EngineHooks};
use std::{
collections::VecDeque,
task::{Context, Poll},
};
use tracing::debug;

/// Manages hooks under the control of the engine.
///
/// This type polls the initialized hooks one by one, respecting the DB access level
/// (i.e. [crate::hooks::EngineHookDBAccessLevel::ReadWrite] that enforces running at most one such
/// hook).
pub(crate) struct EngineHooksController {
/// Collection of hooks.
///
/// Hooks might be removed from the collection, and returned upon completion.
/// In the current implementation, it only happens when moved to `running_hook_with_db_write`.
hooks: VecDeque<Box<dyn EngineHook>>,
/// Currently running hook with DB write access, if any.
running_hook_with_db_write: Option<Box<dyn EngineHook>>,
}

impl EngineHooksController {
/// Creates a new [`EngineHooksController`].
pub(crate) fn new(hooks: EngineHooks) -> Self {
Self { hooks: hooks.inner.into(), running_hook_with_db_write: None }
}

/// Polls currently running hook with DB write access, if any.
///
/// Returns [`Poll::Ready`] if currently running hook with DB write access returned
/// an [event][`crate::hooks::EngineHookEvent`] that resulted in [action][`EngineHookAction`] or
/// error.
///
/// Returns [`Poll::Pending`] in all other cases:
/// 1. No hook with DB write access is running.
/// 2. Currently running hook with DB write access returned [`Poll::Pending`] on polling.
/// 3. Currently running hook with DB write access returned [`Poll::Ready`] on polling, but no
/// action to act upon.
pub(crate) fn poll_running_hook_with_db_write(
&mut self,
cx: &mut Context<'_>,
args: EngineContext,
) -> Poll<Result<EngineHookAction, EngineHookError>> {
let Some(mut hook) = self.running_hook_with_db_write.take() else { return Poll::Pending };

match hook.poll(cx, args) {
Poll::Ready((event, action)) => {
debug!(
target: "consensus::engine::hooks",
hook = hook.name(),
?action,
?event,
"Polled running hook with db write access"
);

if !event.is_finished() {
self.running_hook_with_db_write = Some(hook);
} else {
self.hooks.push_back(hook);
}

if let Some(action) = action {
return Poll::Ready(Ok(action))
}
}
Poll::Pending => {
self.running_hook_with_db_write = Some(hook);
}
}

Poll::Pending
}

/// Polls next engine from the collection.
///
/// Returns [`Poll::Ready`] if next hook returned an [event][`crate::hooks::EngineHookEvent`]
/// that resulted in [action][`EngineHookAction`].
///
/// Returns [`Poll::Pending`] in all other cases:
/// 1. Next hook is [`Option::None`], i.e. taken, meaning it's currently running and has a DB
/// write access.
/// 2. Next hook needs a DB write access, but either there's another hook with DB write access
/// running, or `db_write_active` passed into arguments is `true`.
/// 3. Next hook returned [`Poll::Pending`] on polling.
/// 4. Next hook returned [`Poll::Ready`] on polling, but no action to act upon.
pub(crate) fn poll_next_hook(
&mut self,
cx: &mut Context<'_>,
args: EngineContext,
db_write_active: bool,
) -> Poll<Result<EngineHookAction, EngineHookError>> {
let Some(mut hook) = self.hooks.pop_front() else { return Poll::Pending };

// Hook with DB write access level is not allowed to run due to already running hook with DB
// write access level or active DB write according to passed argument
if hook.db_access_level().is_read_write() &&
(self.running_hook_with_db_write.is_some() || db_write_active)
{
return Poll::Pending
}

if let Poll::Ready((event, action)) = hook.poll(cx, args) {
debug!(
target: "consensus::engine::hooks",
hook = hook.name(),
?action,
?event,
"Polled next hook"
);

if event.is_started() && hook.db_access_level().is_read_write() {
self.running_hook_with_db_write = Some(hook);
} else {
self.hooks.push_back(hook);
}

if let Some(action) = action {
return Poll::Ready(Ok(action))
}
} else {
self.hooks.push_back(hook);
}

Poll::Pending
}

/// Returns `true` if there's a hook with DB write access running.
pub(crate) fn is_hook_with_db_write_running(&self) -> bool {
self.running_hook_with_db_write.is_some()
}
}
128 changes: 128 additions & 0 deletions crates/consensus/beacon/src/engine/hooks/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
use reth_interfaces::sync::SyncState;
use reth_primitives::BlockNumber;
use std::{
fmt::Debug,
task::{Context, Poll},
};

mod controller;
pub(crate) use controller::EngineHooksController;

mod prune;
pub use prune::PruneHook;

/// Collection of [engine hooks][`EngineHook`].
#[derive(Default)]
pub struct EngineHooks {
inner: Vec<Box<dyn EngineHook>>,
}

impl EngineHooks {
/// Creates a new empty collection of [engine hooks][`EngineHook`].
pub fn new() -> Self {
Self { inner: Vec::new() }
}

/// Adds a new [engine hook][`EngineHook`] to the collection.
pub fn add<H: EngineHook>(&mut self, hook: H) {
self.inner.push(Box::new(hook))
}
}

/// Hook that will be run during the main loop of
/// [consensus engine][`crate::engine::BeaconConsensusEngine`].
pub trait EngineHook: Send + Sync + 'static {
/// Returns a human-readable name for the hook.
fn name(&self) -> &'static str;

/// Advances the hook execution, emitting an [event][`EngineHookEvent`] and an optional
/// [action][`EngineHookAction`].
fn poll(
&mut self,
cx: &mut Context<'_>,
ctx: EngineContext,
) -> Poll<(EngineHookEvent, Option<EngineHookAction>)>;

/// Returns [db access level][`EngineHookDBAccessLevel`] the hook needs.
fn db_access_level(&self) -> EngineHookDBAccessLevel;
}

/// Engine context passed to the [hook polling function][`EngineHook::poll`].
#[derive(Copy, Clone, Debug)]
pub struct EngineContext {
/// Tip block number.
pub tip_block_number: BlockNumber,
}

/// An event emitted when [hook][`EngineHook`] is polled.
#[derive(Debug)]
pub enum EngineHookEvent {
/// Hook is not ready.
///
/// If this is returned, the hook is idle.
NotReady,
/// Hook started.
///
/// If this is returned, the hook is running.
Started,
/// Hook finished.
///
/// If this is returned, the hook is idle.
Finished(Result<(), EngineHookError>),
}

impl EngineHookEvent {
/// Returns `true` if the event is [`EngineHookEvent::Started`].
pub fn is_started(&self) -> bool {
matches!(self, Self::Started)
}

/// Returns `true` if the event is [`EngineHookEvent::Finished`].
pub fn is_finished(&self) -> bool {
matches!(self, Self::Finished(_))
}
}

/// An action that the caller of [hook][`EngineHook`] should act upon.
#[derive(Debug, Copy, Clone)]
pub enum EngineHookAction {
/// Notify about a [SyncState] update.
UpdateSyncState(SyncState),
/// Read the last relevant canonical hashes from the database and update the block indices of
/// the blockchain tree.
RestoreCanonicalHashes,
}

/// An error returned by [hook][`EngineHook`].
#[derive(Debug, thiserror::Error)]
pub enum EngineHookError {
/// Hook channel closed.
#[error("Hook channel closed")]
ChannelClosed,
/// Common error. Wrapper around [reth_interfaces::Error].
#[error(transparent)]
Common(#[from] reth_interfaces::Error),
/// An internal error occurred.
#[error("Internal hook error occurred.")]
Internal(#[from] Box<dyn std::error::Error + Send + Sync>),
}

/// Level of database access the hook needs for execution.
pub enum EngineHookDBAccessLevel {
/// Read-only database access.
ReadOnly,
/// Read-write database access.
ReadWrite,
}

impl EngineHookDBAccessLevel {
/// Returns `true` if the hook needs read-only access to the database.
pub fn is_read_only(&self) -> bool {
matches!(self, Self::ReadOnly)
}

/// Returns `true` if the hook needs read-write access to the database.
pub fn is_read_write(&self) -> bool {
matches!(self, Self::ReadWrite)
}
}
Loading