Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 89 additions & 21 deletions crates/engine/tree/src/chain.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use crate::pipeline::{PipelineAction, PipelineEvent, PipelineHandler};
use futures::Stream;
use reth_primitives::stage::PipelineTarget;
use std::{
pin::Pin,
task::{Context, Poll},
Expand All @@ -15,8 +17,8 @@ use std::{
/// It polls the given `handler`, which is responsible for advancing the chain, how is up to the
/// handler. However, due to database restrictions (e.g. exclusive write access), following
/// invariants apply:
/// - If the handler requests a pipeline run (e.g. [`PipelineAction::SyncPipeline`]), the handler
/// must ensure that while the pipeline is running, no other write access is granted.
/// - If the handler requests a pipeline run (e.g. [`PipelineAction::Start`]), the handler must
/// ensure that while the pipeline is running, no other write access is granted.
/// - At any time the [`ChainOrchestrator`] can request exclusive write access to the database
/// (e.g. if pruning is required), but will not do so until the handler has acknowledged the
/// request for write access.
Expand All @@ -26,21 +28,23 @@ use std::{
/// [`ChainHandler::on_event`].
#[must_use = "Stream does nothing unless polled"]
#[derive(Debug)]
pub struct ChainOrchestrator<T>
pub struct ChainOrchestrator<T, P>
where
T: ChainHandler,
P: PipelineHandler,
{
/// The handler for advancing the chain.
handler: T,
/// Controls pipeline sync.
pipeline: (),
pipeline: P,
/// Additional hooks (e.g. pruning) that can require exclusive access to the database.
hooks: (),
}

impl<T> ChainOrchestrator<T>
impl<T, P> ChainOrchestrator<T, P>
where
T: ChainHandler,
T: ChainHandler + Unpin,
P: PipelineHandler + Unpin,
{
/// Returns the handler
pub const fn handler(&self) -> &T {
Expand All @@ -57,13 +61,71 @@ where
/// Polls the `ChainOrchestrator` for the next event.
#[tracing::instrument(level = "debug", name = "ChainOrchestrator::poll", skip(self, cx))]
fn poll_next_event(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<ChainEvent> {
todo!("do we need this?")
let this = self.get_mut();

// This loop polls the components
//
// 1. Polls the pipeline to completion, if active.
// 2. Advances the chain by polling the handler.
'outer: loop {
// try to poll the pipeline to completion, if active
match this.pipeline.poll(cx) {
Poll::Ready(pipeline_event) => match pipeline_event {
PipelineEvent::Idle => {}
PipelineEvent::Started(_) => {
// notify handler that pipeline started
this.handler.on_event(FromOrchestrator::PipelineStarted);
return Poll::Ready(ChainEvent::PipelineStarted);
}
PipelineEvent::Finished(res) => {
return match res {
Ok(event) => {
tracing::debug!(?event, "pipeline finished");
// notify handler that pipeline finished
this.handler.on_event(FromOrchestrator::PipelineFinished);
Poll::Ready(ChainEvent::PipelineFinished)
}
Err(err) => {
tracing::error!( %err, "pipeline failed");
Poll::Ready(ChainEvent::FatalError)
}
}
}
},
Poll::Pending => {}
}

// drain the handler
loop {
// poll the handler for the next event
match this.handler.poll(cx) {
Poll::Ready(handler_event) => {
match handler_event {
HandlerEvent::Pipeline(target) => {
// trigger pipeline and start polling it
this.pipeline.on_action(PipelineAction::Start(target));
continue 'outer
}
HandlerEvent::WriteAccessPaused => {}
HandlerEvent::WriteAccessAcquired => {}
}
}
Poll::Pending => {
// no more events to process
break 'outer
}
}
}
}

Poll::Pending
}
}

impl<T> Stream for ChainOrchestrator<T>
impl<T, P> Stream for ChainOrchestrator<T, P>
where
T: ChainHandler,
T: ChainHandler + Unpin,
P: PipelineHandler + Unpin,
{
type Item = ChainEvent;

Expand All @@ -72,13 +134,25 @@ where
}
}

/// Represents the sync mode the chain is operating in.
#[derive(Debug, Default)]
enum SyncMode {
#[default]
Handler,
Pipeline,
}

/// Event emitted by the [`ChainOrchestrator`]
///
/// These are meant to be used for observability and debugging purposes.
#[derive(Debug)]
pub enum ChainEvent {
/// Pipeline sync started
PipelineStarted,
/// Pipeline sync finished
PipelineFinished,
/// Fatal error
FatalError,
}

/// A trait that advances the chain by handling actions.
Expand All @@ -89,25 +163,17 @@ pub trait ChainHandler: Send + Sync {
fn on_event(&mut self, event: FromOrchestrator);

/// Polls for actions that [`ChainOrchestrator`] should handle.
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<HandlerEvent>;
fn poll(&mut self, cx: &mut Context<'_>) -> Poll<HandlerEvent>;
}

/// Events/Requests that the [`ChainHandler`] can emit to the [`ChainOrchestrator`].
#[derive(Clone, Debug)]
pub enum HandlerEvent {
Pipeline(PipelineAction),
Pipeline(PipelineTarget),
/// Ack paused write access to the database
WriteAccessPaused,
/// Operating in write-access mode
WriteAccess,
}

#[derive(Clone, Debug)]
pub enum PipelineAction {
/// Start pipeline sync
SyncPipeline,
/// Unwind via the pipeline
UnwindPipeline,
WriteAccessAcquired,
}

/// Internal events issued by the [`ChainOrchestrator`].
Expand All @@ -118,7 +184,9 @@ pub enum FromOrchestrator {
/// Orchestrator no longer requires exclusive write access to the database.
ReleaseWriteHookAccess,
/// Invoked when pipeline sync finished
OnPipelineOutcome,
PipelineFinished,
/// Invoked when pipeline started
PipelineStarted,
}

/// Represents the state of the chain.
Expand Down
2 changes: 1 addition & 1 deletion crates/engine/tree/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ where
todo!()
}

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<HandlerEvent> {
fn poll(&mut self, cx: &mut Context<'_>) -> Poll<HandlerEvent> {
todo!()
}
}
Expand Down
2 changes: 2 additions & 0 deletions crates/engine/tree/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,7 @@ pub use reth_blockchain_tree_api::*;
pub mod chain;
/// Engine Api chain handler support.
pub mod engine;
/// Support for managing the pipeline.
pub mod pipeline;
/// Support for interacting with the blockchain tree.
pub mod tree;
40 changes: 40 additions & 0 deletions crates/engine/tree/src/pipeline.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
//! It is expected that the node has two sync modes:
//!
//! - Pipeline sync: Sync to a certain block height in stages, e.g. download data from p2p then
//! execute that range.
//! - Live sync: In this mode the nodes is keeping up with the latest tip and listens for new
//! requests from the consensus client.
//!
//! These modes are mutually exclusive and the node can only be in one mode at a time.

use reth_primitives::stage::PipelineTarget;
use reth_stages_api::{ControlFlow, PipelineError};
use std::task::{Context, Poll};

/// A handler for the pipeline.
pub trait PipelineHandler: Send + Sync {
/// Performs an action on the pipeline.
fn on_action(&mut self, event: PipelineAction);

/// Polls the pipeline for completion.
fn poll(&mut self, cx: &mut Context<'_>) -> Poll<PipelineEvent>;
}

/// The actions that can be performed on the pipeline.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PipelineAction {
/// Start the pipeline with the given target.
Start(PipelineTarget),
}

/// The events that can be emitted by the pipeline.
#[derive(Debug)]
pub enum PipelineEvent {
Idle,
/// Pipeline started syncing
Started(PipelineTarget),
/// Pipeline finished
///
/// If this is returned, the pipeline is idle.
Finished(Result<ControlFlow, PipelineError>),
}
2 changes: 1 addition & 1 deletion crates/engine/tree/src/tree.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{chain::PipelineAction, engine::DownloadRequest};
use crate::{engine::DownloadRequest, pipeline::PipelineAction};
use parking_lot::{Mutex, MutexGuard, RwLock};
use reth_beacon_consensus::{ForkchoiceStateTracker, InvalidHeaderCache, OnForkChoiceUpdated};
use reth_blockchain_tree::BlockBuffer;
Expand Down