diff --git a/crates/node/service/src/actors/derivation/actor.rs b/crates/node/service/src/actors/derivation/actor.rs index c4f7000176..265c1580b2 100644 --- a/crates/node/service/src/actors/derivation/actor.rs +++ b/crates/node/service/src/actors/derivation/actor.rs @@ -1,31 +1,17 @@ //! [NodeActor] implementation for the derivation sub-routine. use crate::{ - InteropMode, Metrics, NodeActor, - actors::{ - CancellableContext, - derivation::{DerivationEngineClient, L2Finalizer}, - }, + CancellableContext, Metrics, NodeActor, + actors::derivation::{DerivationActorRequest, DerivationEngineClient, L2Finalizer}, }; -use alloy_provider::RootProvider; use async_trait::async_trait; use kona_derive::{ ActivationSignal, Pipeline, PipelineError, PipelineErrorKind, ResetError, ResetSignal, Signal, SignalReceiver, StepResult, }; -use kona_genesis::{L1ChainConfig, RollupConfig}; -use kona_protocol::{BlockInfo, L2BlockInfo, OpAttributesWithParent}; -use kona_providers_alloy::{ - AlloyChainProvider, AlloyL2ChainProvider, OnlineBeaconClient, OnlineBlobProvider, - OnlinePipeline, -}; -use op_alloy_network::Optimism; -use std::{marker::PhantomData, sync::Arc}; +use kona_protocol::{L2BlockInfo, OpAttributesWithParent}; use thiserror::Error; -use tokio::{ - select, - sync::{mpsc, oneshot, watch}, -}; +use tokio::{select, sync::mpsc}; use tokio_util::sync::{CancellationToken, WaitForCancellationFuture}; /// The [NodeActor] for the derivation sub-routine. @@ -34,172 +20,42 @@ use tokio_util::sync::{CancellationToken, WaitForCancellationFuture}; /// derivation pipeline forward to produce new payload attributes. The actor then sends the payload /// to the [NodeActor] responsible for the execution sub-routine. #[derive(Debug)] -pub struct DerivationActor +pub struct DerivationActor where DerivationEngineClient_: DerivationEngineClient, - PipelineBuilder_: PipelineBuilder, + PipelineSignalReceiver: Pipeline + SignalReceiver, { /// The cancellation token, shared between all tasks. cancellation_token: CancellationToken, - /// The state for the derivation actor. - state: PipelineBuilder_, - /// The receiver for L1 head update notifications. - l1_head_updates: watch::Receiver>, - /// The receiver for L2 safe head update notifications. - engine_l2_safe_head: watch::Receiver, - /// A receiver used by the engine to signal derivation to begin. Completing EL sync consumes - /// the instance. - el_sync_complete_rx: oneshot::Receiver<()>, - /// A receiver that sends a [`Signal`] to the derivation pipeline. - /// - /// The derivation actor steps over the derivation pipeline to generate - /// [`OpAttributesWithParent`]. These attributes then need to be executed - /// via the engine api, which is done by sending them to the engine. - /// - /// When the engine api receives an `INVALID` response for a new block ( - /// the new [`OpAttributesWithParent`]) during block building, the payload - /// is reduced to "deposits-only". When this happens, the channel and - /// remaining buffered batches need to be flushed out of the derivation - /// pipeline. - /// - /// This channel allows the engine to send a [`Signal::FlushChannel`] - /// message back to the derivation pipeline when an `INVALID` response - /// occurs. - /// - /// Specs: - derivation_signal_rx: mpsc::Receiver, - - /// The receiver for L1 finalized block notifications. - l1_finalized_updates: watch::Receiver>, - + /// The channel on which all inbound requests are received by the [`DerivationActor`]. + inbound_request_rx: mpsc::Receiver, /// The Engine client used to interact with the engine. engine_client: DerivationEngineClient_, -} - -/// The state for the derivation actor. -#[derive(Debug)] -pub struct DerivationState -where - DerivationEngineClient_: DerivationEngineClient, - PipelineSignalReceiver: Pipeline + SignalReceiver, -{ /// The derivation pipeline. - pub pipeline: PipelineSignalReceiver, - /// A flag indicating whether or not derivation is idle. Derivation is considered idle when it + pipeline: PipelineSignalReceiver, + + /// The engine's L2 safe head, according to updates from the Engine. + engine_l2_safe_head: L2BlockInfo, + /// Whether we are waiting on the engine to acknowledge the last derived attributes + awaiting_engine_l2_safe_head_update: bool, + + /// A flag indicating whether derivation is idle. Derivation is considered idle when it /// has yielded to wait for more data on the DAL. - pub derivation_idle: bool, - /// A flag indicating whether or not derivation is waiting for a signal. When waiting for a - /// signal, derivation cannot process any incoming events. - pub waiting_for_signal: bool, + derivation_idle: bool, /// The [`L2Finalizer`] tracks derived L2 blocks awaiting finalization. pub(crate) finalizer: L2Finalizer, - - phantom: PhantomData, -} - -/// The size of the cache used in the derivation pipeline's providers. -const DERIVATION_PROVIDER_CACHE_SIZE: usize = 1024; - -/// A trait for building derivation pipelines. -#[async_trait] -pub trait PipelineBuilder: - Send + Sync + 'static -{ - /// The type of pipeline to build. - type Pipeline: Pipeline + SignalReceiver + Send + Sync + 'static; - - /// Builds the derivation pipeline. - async fn build(self) -> DerivationState; -} - -/// The configuration necessary to build the derivation actor. -#[derive(Debug)] -pub struct DerivationBuilder { - /// The L1 provider. - pub l1_provider: RootProvider, - /// Whether to trust the L1 RPC. - pub l1_trust_rpc: bool, - /// The L1 beacon client. - pub l1_beacon: OnlineBeaconClient, - /// The L2 provider. - pub l2_provider: RootProvider, - /// Whether to trust the L2 RPC. - pub l2_trust_rpc: bool, - /// The rollup config. - pub rollup_config: Arc, - /// The L1 chain configuration. - pub l1_config: Arc, - /// The interop mode. - pub interop_mode: InteropMode, -} - -#[async_trait] -impl PipelineBuilder for DerivationBuilder -where - DerivationEngineClient_: DerivationEngineClient + 'static, -{ - type Pipeline = OnlinePipeline; - - async fn build(self) -> DerivationState { - // Create the caching L1/L2 EL providers for derivation. - let l1_derivation_provider = AlloyChainProvider::new_with_trust( - self.l1_provider.clone(), - DERIVATION_PROVIDER_CACHE_SIZE, - self.l1_trust_rpc, - ); - let l2_derivation_provider = AlloyL2ChainProvider::new_with_trust( - self.l2_provider.clone(), - self.rollup_config.clone(), - DERIVATION_PROVIDER_CACHE_SIZE, - self.l2_trust_rpc, - ); - - let pipeline = match self.interop_mode { - InteropMode::Polled => OnlinePipeline::new_polled( - self.rollup_config.clone(), - self.l1_config.clone(), - OnlineBlobProvider::init(self.l1_beacon.clone()).await, - l1_derivation_provider, - l2_derivation_provider, - ), - InteropMode::Indexed => OnlinePipeline::new_indexed( - self.rollup_config.clone(), - self.l1_config.clone(), - OnlineBlobProvider::init(self.l1_beacon.clone()).await, - l1_derivation_provider, - l2_derivation_provider, - ), - }; - - DerivationState::new(pipeline) - } -} - -/// The inbound channels for the derivation actor. -/// These channels are used to send messages to the derivation actor by other actors. -#[derive(Debug)] -pub struct DerivationInboundChannels { - /// The sender for L1 head update notifications. - pub l1_head_updates_tx: watch::Sender>, - /// The sender for L1 finalized block notifications. - pub l1_finalized_updates_tx: watch::Sender>, - /// The sender for L2 safe head update notifications. - pub engine_l2_safe_head_tx: watch::Sender, - /// A sender used by the engine to signal derivation to begin. Completing EL sync consumes the - /// instance. - pub el_sync_complete_tx: oneshot::Sender<()>, - /// A sender that sends a [`Signal`] to the derivation pipeline. - /// - /// This channel should be used by the engine actor to send [`Signal`]s to the derivation - /// pipeline. The signals are received by `DerivationActor::derivation_signal_rx`. - pub derivation_signal_tx: mpsc::Sender, + /// A flag indicating whether derivation is waiting for a signal. When waiting for a + /// signal, derivation cannot process any incoming events. + waiting_for_signal: bool, + /// Whether the engine sync has completed. This will only ever go from false -> true. + has_engine_sync_completed: bool, } -impl CancellableContext - for DerivationActor +impl CancellableContext + for DerivationActor where DerivationEngineClient_: DerivationEngineClient, - PipelineBuilder_: PipelineBuilder, + PipelineSignalReceiver: Pipeline + SignalReceiver + Send + Sync, { fn cancelled(&self) -> WaitForCancellationFuture<'_> { self.cancellation_token.cancelled() @@ -207,19 +63,29 @@ where } impl - DerivationState + DerivationActor where - DerivationEngineClient_: DerivationEngineClient + 'static, + DerivationEngineClient_: DerivationEngineClient, PipelineSignalReceiver: Pipeline + SignalReceiver, { - /// Creates a new instance of the [DerivationState]. - pub fn new(pipeline: PipelineSignalReceiver) -> Self { + /// Creates a new instance of the [DerivationActor]. + pub fn new( + engine_client: DerivationEngineClient_, + cancellation_token: CancellationToken, + inbound_request_rx: mpsc::Receiver, + pipeline: PipelineSignalReceiver, + ) -> Self { Self { + cancellation_token, pipeline, + inbound_request_rx, + engine_client, derivation_idle: true, waiting_for_signal: false, + engine_l2_safe_head: L2BlockInfo::default(), + awaiting_engine_l2_safe_head_update: false, + has_engine_sync_completed: false, finalizer: L2Finalizer::default(), - phantom: PhantomData, } } @@ -241,17 +107,12 @@ where /// Attempts to step the derivation pipeline forward as much as possible in order to produce the /// next safe payload. - async fn produce_next_attributes( - &mut self, - engine_l2_safe_head: &watch::Receiver, - engine_client: &DerivationEngineClient_, - ) -> Result { + async fn produce_next_attributes(&mut self) -> Result { // As we start the safe head at the disputed block's parent, we step the pipeline until the // first attributes are produced. All batches at and before the safe head will be // dropped, so the first payload will always be the disputed one. loop { - let l2_safe_head = *engine_l2_safe_head.borrow(); - match self.pipeline.step(l2_safe_head).await { + match self.pipeline.step(self.engine_l2_safe_head).await { StepResult::PreparedAttributes => { /* continue; attributes will be sent off. */ } StepResult::AdvancedOrigin => { let origin = @@ -280,7 +141,7 @@ where let system_config = self .pipeline - .system_config_by_number(l2_safe_head.block_info.number) + .system_config_by_number(self.engine_l2_safe_head.block_info.number) .await?; if matches!(e, ResetError::HoloceneActivation) { @@ -292,7 +153,7 @@ where self.pipeline .signal( ActivationSignal { - l2_safe_head, + l2_safe_head: self.engine_l2_safe_head, l1_origin, system_config: Some(system_config), } @@ -310,12 +171,10 @@ where } // send the `reset` signal to the engine actor only when interop is // not active. - if !self - .pipeline - .rollup_config() - .is_interop_active(l2_safe_head.block_info.timestamp) - { - engine_client.reset_engine_forkchoice().await.map_err(|e| { + if !self.pipeline.rollup_config().is_interop_active( + self.engine_l2_safe_head.block_info.timestamp, + ) { + self.engine_client.reset_engine_forkchoice().await.map_err(|e| { error!(target: "derivation", ?e, "Failed to send reset request"); DerivationError::Sender(Box::new(e)) })?; @@ -340,6 +199,56 @@ where } } + async fn handle_derivation_actor_request( + &mut self, + request_type: DerivationActorRequest, + ) -> Result<(), DerivationError> { + match request_type { + DerivationActorRequest::ProcessEngineSignalRequest(signal) => { + self.signal(*signal).await; + self.waiting_for_signal = false; + } + DerivationActorRequest::ProcessFinalizedL1Block(finalized_l1_block) => { + // Attempt to finalize the block. If successful, notify engine. + if let Some(l2_block_number) = self.finalizer.try_finalize_next(*finalized_l1_block) + { + self.engine_client + .send_finalized_l2_block(l2_block_number) + .await + .map_err(|e| DerivationError::Sender(Box::new(e)))?; + } + } + DerivationActorRequest::ProcessL1HeadUpdateRequest(l1_head) => { + info!(target: "derivation", l1_head = ?*l1_head, "Processing l1 head update"); + + // If derivation isn't idle and the message hasn't observed a safe head update + // already, check if the safe head has changed before continuing. + // This is to prevent attempts to progress the pipeline while it is + // in the middle of processing a channel. + if !self.derivation_idle && self.awaiting_engine_l2_safe_head_update { + info!(target: "derivation", "Safe head hasn't changed, skipping derivation."); + } else { + self.attempt_derivation().await?; + } + } + DerivationActorRequest::ProcessEngineSafeHeadUpdateRequest(safe_head) => { + info!(target: "derivation", safe_head = ?*safe_head, "Received safe head from engine."); + self.engine_l2_safe_head = *safe_head; + self.awaiting_engine_l2_safe_head_update = false; + + self.attempt_derivation().await?; + } + DerivationActorRequest::ProcessEngineSyncCompletionRequest => { + info!(target: "derivation", "Engine finished syncing, starting derivation."); + self.has_engine_sync_completed = true; + + self.attempt_derivation().await?; + } + } + + Ok(()) + } + /// Attempts to process the next payload attributes. /// /// There are a few constraints around stepping on the derivation pipeline. @@ -352,73 +261,44 @@ where /// attributes are successfully produced. If the pipeline step errors, /// the same [`L2BlockInfo`] is used again. If the [`L2BlockInfo`] is the /// zero hash, the pipeline is not stepped on. - async fn process( - &mut self, - msg: InboundDerivationMessage, - engine_l2_safe_head: &mut watch::Receiver, - el_sync_complete_rx: &oneshot::Receiver<()>, - engine_client: &DerivationEngineClient_, - ) -> Result<(), DerivationError> { - // Only attempt derivation once the engine finishes syncing. - if !el_sync_complete_rx.is_terminated() { - trace!(target: "derivation", "Engine not ready, skipping derivation"); + async fn attempt_derivation(&mut self) -> Result<(), DerivationError> { + if !self.has_engine_sync_completed { + info!(target: "derivation", "Engine sync has not completed, skipping derivation"); return Ok(()); } else if self.waiting_for_signal { - trace!(target: "derivation", "Waiting to receive a signal, skipping derivation"); + info!(target: "derivation", "Waiting to receive a signal, skipping derivation"); return Ok(()); - } - - // If derivation isn't idle and the message hasn't observed a safe head update already, - // check if the safe head has changed before continuing. This is to prevent attempts to - // progress the pipeline while it is in the middle of processing a channel. - if !(self.derivation_idle || msg == InboundDerivationMessage::SafeHeadUpdated) { - match engine_l2_safe_head.has_changed() { - Ok(true) => { /* Proceed to produce next payload attributes. */ } - Ok(false) => { - trace!(target: "derivation", "Safe head hasn't changed, skipping derivation."); - return Ok(()); - } - Err(e) => { - error!(target: "derivation", ?e, "Failed to check if safe head has changed"); - return Err(DerivationError::L2SafeHeadReceiveFailed); - } - } - } - - // Wait for the engine to initialize unknowns prior to kicking off derivation. - let engine_safe_head = *engine_l2_safe_head.borrow(); - if engine_safe_head.block_info.hash.is_zero() { - warn!(target: "derivation", engine_safe_head = ?engine_safe_head.block_info.number, "Waiting for engine to initialize state prior to derivation."); + } else if self.engine_l2_safe_head.block_info.hash.is_zero() { + warn!(target: "derivation", engine_safe_head = ?self.engine_l2_safe_head.block_info.number, "Waiting for engine to initialize state prior to derivation."); return Ok(()); } + trace!(target: "derivation", "Attempting derivation."); // Advance the pipeline as much as possible, new data may be available or there still may be // payloads in the attributes queue. - let payload_attrs = - match self.produce_next_attributes(engine_l2_safe_head, engine_client).await { - Ok(attrs) => attrs, - Err(DerivationError::Yield) => { - // Yield until more data is available. - self.derivation_idle = true; - return Ok(()); - } - Err(e) => { - return Err(e); - } - }; + let payload_attributes = match self.produce_next_attributes().await { + Ok(attrs) => attrs, + Err(DerivationError::Yield) => { + info!(target: "derivation", "Yielding derivation until more data is available."); + self.derivation_idle = true; + return Ok(()); + } + Err(e) => { + return Err(e); + } + }; + trace!(target: "derivation", ?payload_attributes, "Produced payload attributes."); // Mark derivation as busy. self.derivation_idle = false; - - // Mark the L2 safe head as seen. - engine_l2_safe_head.borrow_and_update(); + self.awaiting_engine_l2_safe_head_update = true; // Enqueue the payload attributes for finalization tracking. - self.finalizer.enqueue_for_finalization(&payload_attrs); + self.finalizer.enqueue_for_finalization(&payload_attributes); // Send payload attributes out for processing. - engine_client - .send_derived_attributes(payload_attrs) + self.engine_client + .send_derived_attributes(payload_attributes) .await .map_err(|e| DerivationError::Sender(Box::new(e)))?; @@ -426,61 +306,17 @@ where } } -impl - DerivationActor -where - DerivationEngineClient_: DerivationEngineClient, - PipelineBuilder_: PipelineBuilder, -{ - /// Creates a new instance of the [DerivationActor]. - pub fn new( - engine_client: DerivationEngineClient_, - cancellation_token: CancellationToken, - state: PipelineBuilder_, - ) -> (DerivationInboundChannels, Self) { - let (l1_head_updates_tx, l1_head_updates_rx) = watch::channel(None); - let (engine_l2_safe_head_tx, engine_l2_safe_head_rx) = - watch::channel(L2BlockInfo::default()); - let (el_sync_complete_tx, el_sync_complete_rx) = oneshot::channel(); - let (derivation_signal_tx, derivation_signal_rx) = mpsc::channel(16); - let (l1_finalized_updates_tx, l1_finalized_updates_rx) = watch::channel(None); - let actor = Self { - cancellation_token, - state, - l1_head_updates: l1_head_updates_rx, - l1_finalized_updates: l1_finalized_updates_rx, - engine_l2_safe_head: engine_l2_safe_head_rx, - el_sync_complete_rx, - derivation_signal_rx, - engine_client, - }; - - ( - DerivationInboundChannels { - l1_head_updates_tx, - l1_finalized_updates_tx, - engine_l2_safe_head_tx, - el_sync_complete_tx, - derivation_signal_tx, - }, - actor, - ) - } -} - #[async_trait] -impl NodeActor - for DerivationActor +impl NodeActor + for DerivationActor where DerivationEngineClient_: DerivationEngineClient + 'static, - PipelineBuilder_: PipelineBuilder, + PipelineSignalReceiver: Pipeline + SignalReceiver + Send + Sync + 'static, { type Error = DerivationError; type StartData = (); async fn start(mut self, _: Self::StartData) -> Result<(), Self::Error> { - let mut state = self.state.build().await; - loop { select! { biased; @@ -492,67 +328,20 @@ where ); return Ok(()); } - signal = self.derivation_signal_rx.recv() => { - let Some(signal) = signal else { - error!( - target: "derivation", - ?signal, - "DerivationActor failed to receive signal" - ); - return Err(DerivationError::SignalReceiveFailed); + req = self.inbound_request_rx.recv() => { + let Some(request_type) = req else { + error!(target: "derivation", "DerivationActor inbound request receiver closed unexpectedly"); + self.cancellation_token.cancel(); + return Err(DerivationError::RequestReceiveFailed); }; - state.signal(signal).await; - state.waiting_for_signal = false; - } - msg = self.l1_head_updates.changed() => { - if let Err(err) = msg { - error!( - target: "derivation", - ?err, - "L1 head update stream closed without cancellation. Exiting derivation task." - ); - return Ok(()); - } - - state.process(InboundDerivationMessage::NewDataAvailable, &mut self.engine_l2_safe_head, &self.el_sync_complete_rx, &self.engine_client).await?; - } - _ = self.engine_l2_safe_head.changed() => { - state.process(InboundDerivationMessage::SafeHeadUpdated, &mut self.engine_l2_safe_head, &self.el_sync_complete_rx, &self.engine_client).await?; - } - _ = self.l1_finalized_updates.changed() => { - // Extract the value before awaiting to avoid holding the borrow across await. - let finalized_l1_block = *self.l1_finalized_updates.borrow_and_update(); - if let Some(finalized_l1_block) = finalized_l1_block { - // Attempt to finalize L2 blocks when a new finalized L1 block is received. - if let Some(l2_block_number) = state.finalizer.try_finalize_next(finalized_l1_block) { - self.engine_client - .send_finalized_l2_block(l2_block_number) - .await - .map_err(|e| DerivationError::Sender(Box::new(e)))?; - } - } - } - _ = &mut self.el_sync_complete_rx, if !self.el_sync_complete_rx.is_terminated() => { - info!(target: "derivation", "Engine finished syncing, starting derivation."); - // Optimistically process the first message. - state.process(InboundDerivationMessage::NewDataAvailable, &mut self.engine_l2_safe_head, &self.el_sync_complete_rx, &self.engine_client).await?; + self.handle_derivation_actor_request(request_type).await?; } } } } } -/// Messages that the [DerivationActor] can receive from other actors. -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum InboundDerivationMessage { - /// New data is potentially available for processing on the data availability layer. - NewDataAvailable, - /// The engine has updated its safe head. An attempt to process the next payload attributes can - /// be made. - SafeHeadUpdated, -} - /// An error from the [DerivationActor]. #[derive(Error, Debug)] pub enum DerivationError { @@ -565,10 +354,7 @@ pub enum DerivationError { /// An error originating from the broadcast sender. #[error("Failed to send event to broadcast sender: {0}")] Sender(Box), - /// An error from the signal receiver. - #[error("Failed to receive signal")] - SignalReceiveFailed, - /// Unable to receive the L2 safe head to step on the pipeline. - #[error("Failed to receive L2 safe head")] - L2SafeHeadReceiveFailed, + /// Failed to receive inbound request + #[error("Failed to receive inbound request")] + RequestReceiveFailed, } diff --git a/crates/node/service/src/actors/derivation/engine_client.rs b/crates/node/service/src/actors/derivation/engine_client.rs index 0ce5654f72..016ef9a4c4 100644 --- a/crates/node/service/src/actors/derivation/engine_client.rs +++ b/crates/node/service/src/actors/derivation/engine_client.rs @@ -1,5 +1,6 @@ use crate::{EngineActorRequest, EngineClientError, EngineClientResult, ResetRequest}; use async_trait::async_trait; +use derive_more::Constructor; use kona_protocol::OpAttributesWithParent; use std::fmt::Debug; use tokio::sync::mpsc; @@ -24,7 +25,7 @@ pub trait DerivationEngineClient: Debug + Send + Sync { } /// Client to use to send messages to the Engine Actor's inbound channel. -#[derive(Debug)] +#[derive(Constructor, Debug)] pub struct QueuedDerivationEngineClient { /// A channel to use to send the [`EngineActorRequest`]s to the EngineActor. pub engine_actor_request_tx: mpsc::Sender, @@ -35,21 +36,27 @@ impl DerivationEngineClient for QueuedDerivationEngineClient { async fn reset_engine_forkchoice(&self) -> EngineClientResult<()> { let (result_tx, mut result_rx) = mpsc::channel(1); + info!(target: "derivation", "Sending reset request to engine."); self.engine_actor_request_tx .send(EngineActorRequest::ResetRequest(Box::new(ResetRequest { result_tx }))) .await .map_err(|_| EngineClientError::RequestError("request channel closed.".to_string()))?; - result_rx.recv().await.ok_or_else(|| { - error!(target: "derivation_engine_client", "Failed to receive built payload"); - EngineClientError::ResponseError("response channel closed.".to_string()) - })? + result_rx + .recv() + .await + .inspect(|_| info!(target: "derivation", "Engine reset successfully.")) + .ok_or_else(|| { + error!(target: "derivation_engine_client", "Failed to receive built payload"); + EngineClientError::ResponseError("response channel closed.".to_string()) + })? } async fn send_derived_attributes( &self, attributes: OpAttributesWithParent, ) -> EngineClientResult<()> { + trace!(target: "derivation", ?attributes, "Sending derived attributes to engine."); self.engine_actor_request_tx .send(EngineActorRequest::ProcessDerivedL2AttributesRequest(Box::new(attributes))) .await @@ -59,8 +66,9 @@ impl DerivationEngineClient for QueuedDerivationEngineClient { } async fn send_finalized_l2_block(&self, block_number: u64) -> EngineClientResult<()> { + trace!(target: "derivation", block_number, "Sending finalized L2 block number to engine."); self.engine_actor_request_tx - .send(EngineActorRequest::ProcessFinalizedL2BlockRequest(block_number)) + .send(EngineActorRequest::ProcessFinalizedL2BlockNumberRequest(Box::new(block_number))) .await .map_err(|_| EngineClientError::RequestError("request channel closed.".to_string()))?; diff --git a/crates/node/service/src/actors/derivation/mod.rs b/crates/node/service/src/actors/derivation/mod.rs index de5a2be16e..da2a0e0da5 100644 --- a/crates/node/service/src/actors/derivation/mod.rs +++ b/crates/node/service/src/actors/derivation/mod.rs @@ -1,11 +1,11 @@ mod actor; -pub use actor::{ - DerivationActor, DerivationBuilder, DerivationError, DerivationInboundChannels, - DerivationState, InboundDerivationMessage, PipelineBuilder, -}; +pub use actor::{DerivationActor, DerivationError}; mod engine_client; pub use engine_client::{DerivationEngineClient, QueuedDerivationEngineClient}; mod finalizer; pub(crate) use finalizer::L2Finalizer; + +mod request; +pub use request::{DerivationActorRequest, DerivationClientError, DerivationClientResult}; diff --git a/crates/node/service/src/actors/derivation/request.rs b/crates/node/service/src/actors/derivation/request.rs new file mode 100644 index 0000000000..d6d4be1f38 --- /dev/null +++ b/crates/node/service/src/actors/derivation/request.rs @@ -0,0 +1,35 @@ +use kona_derive::Signal; +use kona_protocol::{BlockInfo, L2BlockInfo}; +use thiserror::Error; + +/// The result of an Engine client call. +pub type DerivationClientResult = Result; + +/// Error making requests to the [`crate::DerivationActor`]. +#[derive(Debug, Error)] +pub enum DerivationClientError { + /// Error making a request to the [`crate::DerivationActor`]. The request never made it there. + #[error("Error making a request to the derivation actor: {0}.")] + RequestError(String), + + /// Error receiving response from the [`crate::DerivationActor`]. + /// This means the request may or may not have succeeded. + #[error("Error receiving response from the derivation actor: {0}..")] + ResponseError(String), +} + +/// Inbound requests that the [`crate::DerivationActor`] can process. +#[derive(Debug)] +pub enum DerivationActorRequest { + /// Request to process the fact that Engine sync has completed. + ProcessEngineSyncCompletionRequest, + /// Request to process the provided L2 engine safe head update. + ProcessEngineSafeHeadUpdateRequest(Box), + /// A request containing a [`Signal`] to the derivation pipeline. + /// This allows the Engine to send the DerivationActor signals (e.g. to Flush or Reset). + ProcessEngineSignalRequest(Box), + /// A request to process the provided finalized L1 [`BlockInfo`]. + ProcessFinalizedL1Block(Box), + /// Request to process the provided L1 head block update. + ProcessL1HeadUpdateRequest(Box), +} diff --git a/crates/node/service/src/actors/engine/actor.rs b/crates/node/service/src/actors/engine/actor.rs index 3b2424112d..9e1c7119d6 100644 --- a/crates/node/service/src/actors/engine/actor.rs +++ b/crates/node/service/src/actors/engine/actor.rs @@ -1,506 +1,59 @@ //! The [`EngineActor`]. use crate::{ - BuildRequest, EngineActorRequest, EngineClientError, EngineError, EngineRpcRequest, NodeActor, - NodeMode, ResetRequest, SealRequest, actors::CancellableContext, + EngineActorRequest, EngineError, EngineProcessingRequest, EngineRequestReceiver, + EngineRpcRequestReceiver, NodeActor, actors::CancellableContext, }; -use alloy_provider::RootProvider; -use alloy_rpc_types_engine::JwtSecret; use async_trait::async_trait; +use derive_more::Constructor; use futures::FutureExt; -use kona_derive::{ResetSignal, Signal}; -use kona_engine::{ - BuildTask, ConsolidateTask, Engine, EngineClient, EngineClientBuilder, - EngineClientBuilderError, EngineState as InnerEngineState, EngineTask, EngineTaskError, - EngineTaskErrorSeverity, FinalizeTask, InsertTask, OpEngineClient, RollupBoostServer, - RollupBoostServerArgs, SealTask, -}; -use kona_genesis::RollupConfig; -use kona_protocol::{L2BlockInfo, OpAttributesWithParent}; -use kona_rpc::RollupBoostAdminQuery; -use op_alloy_network::Optimism; -use op_alloy_rpc_types_engine::OpExecutionPayloadEnvelope; -use std::{fmt::Debug, sync::Arc, time::Duration}; -use tokio::{ - sync::{mpsc, oneshot, watch}, - task::JoinHandle, -}; +use tokio::sync::mpsc; use tokio_util::{ future::FutureExt as _, sync::{CancellationToken, WaitForCancellationFuture}, }; -use url::Url; -/// The [`EngineActor`] is responsible for managing the operations sent to the execution layer's -/// Engine API. To accomplish this, it uses the [`Engine`] task queue to order Engine API -/// interactions based off of the [`Ord`] implementation of [`EngineTask`]. -#[derive(Debug)] -pub struct EngineActor { +/// The [`EngineActor`] is an intermediary that receives [`EngineActorRequest`] and delegates: +/// - Engine RPC queries requests to the configured [`EngineRpcRequestReceiver`] +/// - Node engine requests to the configured [`EngineRequestReceiver`]. +#[derive(Constructor, Debug)] +pub struct EngineActor +where + EngineRequestReceiver_: EngineRequestReceiver, + RpcRequestReceiver: EngineRpcRequestReceiver, +{ + /// The cancellation token shared by all tasks. + cancellation_token: CancellationToken, + /// The inbound request channel. inbound_request_rx: mpsc::Receiver, - /// The [`EngineConfig`] used to build the actor. - builder: EngineConfig, - - /// A channel to use to relay the current unsafe head. - /// ## Note - /// This is `Some` when the node is in sequencer mode, and `None` when the node is in validator - /// mode. - unsafe_head_tx: Option>, + /// The processor for engine requests + engine_receiver: EngineRequestReceiver_, + /// The processor for engine RPC requests + rpc_receiver: RpcRequestReceiver, } -/// The outbound data for the [`EngineActor`]. -#[derive(Debug)] -pub struct EngineInboundData { - /// A channel that sends requests to the EngineActor. - pub inbound_request_tx: mpsc::Sender, - /// A receiver to use to view the latest unsafe head [`L2BlockInfo`] and await its changes. - /// - /// This is `Some` when the node is in sequencer mode, and `None` when the node is in validator - /// mode. - pub unsafe_head_rx: Option>, -} - -/// Configuration for the Engine Actor. -#[derive(Debug, Clone)] -pub struct EngineConfig { - /// The [`RollupConfig`]. - pub config: Arc, - - /// Builder url. - pub builder_url: Url, - /// Builder jwt secret. - pub builder_jwt_secret: JwtSecret, - /// Builder timeout. - pub builder_timeout: Duration, - - /// The engine rpc url. - pub l2_url: Url, - /// The engine jwt secret. - pub l2_jwt_secret: JwtSecret, - /// The l2 timeout. - pub l2_timeout: Duration, - - /// The L1 rpc url. - pub l1_url: Url, - - /// The mode of operation for the node. - /// When the node is in sequencer mode, the engine actor will receive requests to build blocks - /// from the sequencer actor. - pub mode: NodeMode, - - /// The rollup boost arguments. - pub rollup_boost: RollupBoostServerArgs, -} - -impl EngineConfig { - /// Launches the [`Engine`]. Returns the [`Engine`] and a channel to receive engine state - /// updates. - fn build_state( - self, - ) -> Result< - EngineActorState>>, - EngineClientBuilderError, - > { - let client = EngineClientBuilder { - builder: self.builder_url.clone(), - builder_jwt: self.builder_jwt_secret, - builder_timeout: self.builder_timeout, - l2: self.l2_url.clone(), - l2_jwt: self.l2_jwt_secret, - l2_timeout: self.l2_timeout, - l1_rpc: self.l1_url.clone(), - cfg: self.config.clone(), - rollup_boost: self.rollup_boost.clone(), - } - .build()? - .into(); - - let state = InnerEngineState::default(); - let (engine_state_send, _) = tokio::sync::watch::channel(state); - let (engine_queue_length_send, _) = tokio::sync::watch::channel(0); - - Ok(EngineActorState { - rollup: self.config, - client, - engine: Engine::new(state, engine_state_send, engine_queue_length_send), - }) - } -} - -/// The configuration for the [`EngineActor`]. -#[derive(Debug)] -pub(super) struct EngineActorState { - /// The [`RollupConfig`] used to build tasks. - pub(super) rollup: Arc, - /// An [`OpEngineClient`] used for creating engine tasks. - pub(super) client: Arc, - /// The [`Engine`] task queue. - pub(super) engine: Engine, -} - -/// The communication context used by the engine actor. -#[derive(Debug)] -pub struct EngineContext { - /// The cancellation token, shared between all tasks. - pub cancellation: CancellationToken, - /// The sender for L2 safe head update notifications. - pub engine_l2_safe_head_tx: watch::Sender, - /// A channel to send a signal that EL sync has completed. Informs the derivation actor to - /// start. Because the EL sync state machine within [`InnerEngineState`] can only complete - /// once, this channel is consumed after the first successful send. Future cases where EL - /// sync is re-triggered can occur, but we will not block derivation on it. - pub sync_complete_tx: oneshot::Sender<()>, - /// A way for the engine actor to send a [`Signal`] back to the derivation actor. - pub derivation_signal_tx: mpsc::Sender, -} - -impl CancellableContext for EngineContext { +impl CancellableContext + for EngineActor +where + EngineRequestReceiver_: EngineRequestReceiver, + RpcRequestReceiver: EngineRpcRequestReceiver, +{ fn cancelled(&self) -> WaitForCancellationFuture<'_> { - self.cancellation.cancelled() - } -} - -impl EngineActor { - /// Constructs a new [`EngineActor`] from the params. - pub fn new(config: EngineConfig) -> (EngineInboundData, Self) { - let (inbound_request_tx, inbound_request_rx) = mpsc::channel(1024); - - let (unsafe_head_tx, unsafe_head_rx) = if config.mode.is_sequencer() { - let (unsafe_head_tx, unsafe_head_rx) = watch::channel(L2BlockInfo::default()); - - (Some(unsafe_head_tx), Some(unsafe_head_rx)) - } else { - (None, None) - }; - - let actor = Self { builder: config, inbound_request_rx, unsafe_head_tx }; - - let outbound_data = EngineInboundData { inbound_request_tx, unsafe_head_rx }; - - (outbound_data, actor) - } -} - -/// A request to process engine tasks. -#[derive(Debug)] -enum EngineProcessingRequest { - Build(Box), - ProcessDerivedL2Attributes(Box), - ProcessFinalizedL2Block(u64), - ProcessUnsafeL2Block(Box), - Reset(Box), - Seal(Box), -} - -impl EngineActorState { - /// Starts a task to handle engine queries. - fn start_rpc_handling_task( - &self, - mut inbound_rpc_channel: mpsc::Receiver, - rollup_boost: Arc, - ) -> JoinHandle> { - let state_recv = self.engine.state_subscribe(); - let queue_length_recv = self.engine.queue_length_subscribe(); - let engine_client = self.client.clone(); - let rollup_config = self.rollup.clone(); - - tokio::spawn(async move { - loop { - tokio::select! { - query = inbound_rpc_channel.recv(), if !inbound_rpc_channel.is_closed() => { - let Some(query) = query else { - error!(target: "engine", "Engine rpc request receiver closed unexpectedly"); - return Err(EngineError::ChannelClosed); - }; - match query { - EngineRpcRequest::EngineQuery(req) => { - trace!(target: "engine", ?req, "Received engine query."); - - if let Err(e) = req - .handle(&state_recv, &queue_length_recv, &engine_client, &rollup_config) - .await - { - warn!(target: "engine", err = ?e, "Failed to handle engine query."); - } - }, - EngineRpcRequest::RollupBoostAdminRequest(RollupBoostAdminQuery::SetExecutionMode { execution_mode, sender }) => { - trace!(target: "engine", ?execution_mode, "Received rollup boost set execution mode admin query."); - - rollup_boost.server.set_execution_mode(execution_mode); - let _ = sender.send(()).inspect_err(|_| { - warn!(target: "engine", "set execution mode response channel closed when trying to send"); - }); - }, - EngineRpcRequest::RollupBoostAdminRequest(RollupBoostAdminQuery::GetExecutionMode{sender}) => { - trace!(target: "engine", "Received rollup boost get execution mode admin query."); - - let execution_mode = rollup_boost.server.get_execution_mode(); - let _ = sender.send(execution_mode).inspect_err(|_| { - warn!(target: "engine", "get execution mode response channel closed when trying to send"); - }); - }, - EngineRpcRequest::RollupBoostHealthRequest(health_query) => { - trace!(target: "engine", ?health_query, "Received rollup boost health query."); - - let health = rollup_boost.get_health(); - let _ = health_query.sender.send(health.into()).inspect_err(|_| { - warn!(target: "engine", "rollup boost health query response channel closed when trying to send"); - }); - }, - - } - } - } - } - }) - } - - /// Starts a task to handle engine processing requests. - fn start_engine_processing_task( - mut self, - mut inbound_processing_channel: mpsc::Receiver, - derivation_signal_tx: mpsc::Sender, - engine_l2_safe_head_tx: watch::Sender, - mut sync_complete_tx: Option>, - unsafe_head_tx: Option>, - ) -> JoinHandle> { - tokio::spawn(async move { - loop { - // Attempt to drain all outstanding tasks from the engine queue before adding new - // ones. - self.drain(&derivation_signal_tx, &mut sync_complete_tx, &engine_l2_safe_head_tx) - .await - .inspect_err( - |err| error!(target: "engine", ?err, "Failed to drain engine tasks"), - )?; - - // If the unsafe head has updated, propagate it to the outbound channels. - if let Some(unsafe_head_tx) = unsafe_head_tx.as_ref() { - unsafe_head_tx.send_if_modified(|val| { - let new_head = self.engine.state().sync_state.unsafe_head(); - (*val != new_head).then(|| *val = new_head).is_some() - }); - } - - // Wait for the next processing request. - let Some(request) = inbound_processing_channel.recv().await else { - error!(target: "engine", "Engine processing request receiver closed unexpectedly"); - return Err(EngineError::ChannelClosed); - }; - - match request { - EngineProcessingRequest::Build(build_request) => { - let BuildRequest { attributes, result_tx } = *build_request; - let task = EngineTask::Build(Box::new(BuildTask::new( - self.client.clone(), - self.rollup.clone(), - attributes, - Some(result_tx), - ))); - self.engine.enqueue(task); - } - EngineProcessingRequest::ProcessDerivedL2Attributes(attributes) => { - let task = EngineTask::Consolidate(Box::new(ConsolidateTask::new( - self.client.clone(), - self.rollup.clone(), - *attributes, - true, - ))); - self.engine.enqueue(task); - } - EngineProcessingRequest::ProcessFinalizedL2Block(l2_block_number) => { - // Finalize the L2 block at the provided block number. - let task = EngineTask::Finalize(Box::new(FinalizeTask::new( - self.client.clone(), - self.rollup.clone(), - l2_block_number, - ))); - self.engine.enqueue(task); - } - EngineProcessingRequest::ProcessUnsafeL2Block(envelope) => { - let task = EngineTask::Insert(Box::new(InsertTask::new( - self.client.clone(), - self.rollup.clone(), - *envelope, - false, /* The payload is not derived in this case. This is an unsafe - * block. */ - ))); - self.engine.enqueue(task); - } - EngineProcessingRequest::Reset(reset_request) => { - warn!(target: "engine", "Received reset request"); - - let reset_res = - self.reset(&derivation_signal_tx, &engine_l2_safe_head_tx).await; - - // Send the result to the provided channel. - let response_payload = reset_res - .as_ref() - .map(|_| ()) - .map_err(|e| EngineClientError::ResetForkchoiceError(e.to_string())); - if reset_request.result_tx.send(response_payload).await.is_err() { - warn!(target: "engine", "Sending reset response failed"); - } - - reset_res?; - } - EngineProcessingRequest::Seal(seal_request) => { - let SealRequest { payload_id, attributes, result_tx } = *seal_request; - let task = EngineTask::Seal(Box::new(SealTask::new( - self.client.clone(), - self.rollup.clone(), - payload_id, - attributes, - // The payload is not derived in this case. - false, - Some(result_tx), - ))); - self.engine.enqueue(task); - } - } - } - }) - } - - /// Resets the inner [`Engine`] and propagates the reset to the derivation actor. - pub(super) async fn reset( - &mut self, - derivation_signal_tx: &mpsc::Sender, - engine_l2_safe_head_tx: &watch::Sender, - ) -> Result<(), EngineError> { - // Reset the engine. - let (l2_safe_head, l1_origin, system_config) = - self.engine.reset(self.client.clone(), self.rollup.clone()).await?; - - // Attempt to update the safe head following the reset. - // IMPORTANT NOTE: We need to update the safe head BEFORE sending the reset signal to the - // derivation actor. Since the derivation actor receives the safe head via a watch - // channel, updating the safe head after sending the reset signal may cause a race - // condition where the derivation actor receives the pre-reset safe head. - self.maybe_update_safe_head(engine_l2_safe_head_tx); - - // Signal the derivation actor to reset. - let signal = ResetSignal { l2_safe_head, l1_origin, system_config: Some(system_config) }; - match derivation_signal_tx.send(signal.signal()).await { - Ok(_) => info!(target: "engine", "Sent reset signal to derivation actor"), - Err(err) => { - error!(target: "engine", ?err, "Failed to send reset signal to the derivation actor"); - return Err(EngineError::ChannelClosed); - } - } - - Ok(()) - } - - /// Drains the inner [`Engine`] task queue and attempts to update the safe head. - async fn drain( - &mut self, - derivation_signal_tx: &mpsc::Sender, - sync_complete_tx: &mut Option>, - engine_l2_safe_head_tx: &watch::Sender, - ) -> Result<(), EngineError> { - match self.engine.drain().await { - Ok(_) => { - trace!(target: "engine", "[ENGINE] tasks drained"); - } - Err(err) => { - match err.severity() { - EngineTaskErrorSeverity::Critical => { - error!(target: "engine", ?err, "Critical error draining engine tasks"); - return Err(err.into()); - } - EngineTaskErrorSeverity::Reset => { - warn!(target: "engine", ?err, "Received reset request"); - self.reset(derivation_signal_tx, engine_l2_safe_head_tx).await?; - } - EngineTaskErrorSeverity::Flush => { - // This error is encountered when the payload is marked INVALID - // by the engine api. Post-holocene, the payload is replaced by - // a "deposits-only" block and re-executed. At the same time, - // the channel and any remaining buffered batches are flushed. - warn!(target: "engine", ?err, "Invalid payload, Flushing derivation pipeline."); - match derivation_signal_tx.send(Signal::FlushChannel).await { - Ok(_) => { - debug!(target: "engine", "Sent flush signal to derivation actor") - } - Err(err) => { - error!(target: "engine", ?err, "Failed to send flush signal to the derivation actor."); - return Err(EngineError::ChannelClosed); - } - } - } - EngineTaskErrorSeverity::Temporary => { - trace!(target: "engine", ?err, "Temporary error draining engine tasks"); - } - } - } - } - - self.maybe_update_safe_head(engine_l2_safe_head_tx); - self.check_el_sync(derivation_signal_tx, engine_l2_safe_head_tx, sync_complete_tx).await?; - - Ok(()) - } - - /// Checks if the EL has finished syncing, notifying the derivation actor if it has. - async fn check_el_sync( - &mut self, - derivation_signal_tx: &mpsc::Sender, - engine_l2_safe_head_tx: &watch::Sender, - sync_complete_tx: &mut Option>, - ) -> Result<(), EngineError> { - if self.engine.state().el_sync_finished { - let Some(sync_complete_tx) = std::mem::take(sync_complete_tx) else { - return Ok(()); - }; - - // Only reset the engine if the sync state does not already know about a finalized - // block. - if self.engine.state().sync_state.finalized_head() != L2BlockInfo::default() { - return Ok(()); - } - - // If the sync status is finished, we can reset the engine and start derivation. - info!(target: "engine", "Performing initial engine reset"); - self.reset(derivation_signal_tx, engine_l2_safe_head_tx).await?; - sync_complete_tx.send(()).ok(); - } - - Ok(()) - } - - /// Attempts to update the safe head via the watch channel. - fn maybe_update_safe_head(&self, engine_l2_safe_head_tx: &watch::Sender) { - let state_safe_head = self.engine.state().sync_state.safe_head(); - let update = |head: &mut L2BlockInfo| { - if head != &state_safe_head { - *head = state_safe_head; - return true; - } - false - }; - let sent = engine_l2_safe_head_tx.send_if_modified(update); - info!(target: "engine", safe_head = ?state_safe_head, ?sent, "Attempted L2 Safe Head Update"); + self.cancellation_token.cancelled() } } #[async_trait] -impl NodeActor for EngineActor { +impl NodeActor + for EngineActor +where + EngineRequestReceiver_: EngineRequestReceiver + 'static, + RpcRequestReceiver: EngineRpcRequestReceiver + 'static, +{ type Error = EngineError; - type StartData = EngineContext; - - async fn start( - mut self, - EngineContext { - cancellation, - engine_l2_safe_head_tx, - sync_complete_tx, - derivation_signal_tx, - }: Self::StartData, - ) -> Result<(), Self::Error> { - let state = self.builder.build_state()?; + type StartData = (); - // All requests to the engine are sent to one of two tasks: RPC handling and Engine - // processing. + async fn start(mut self, _: Self::StartData) -> Result<(), Self::Error> { let (rpc_tx, rpc_rx) = mpsc::channel(1024); let (engine_processing_tx, engine_processing_rx) = mpsc::channel(1024); @@ -532,36 +85,34 @@ impl NodeActor for EngineActor { } }; + let rpc_cancellation = self.cancellation_token.clone(); // Start the engine query server in a separate task to avoid blocking the main task. - let rpc_handle = state - .start_rpc_handling_task(rpc_rx, state.client.rollup_boost.clone()) - .with_cancellation_token(&cancellation) - .then(handle_task_result("Engine query", cancellation.clone())); + let rpc_handle = self + .rpc_receiver + .start(rpc_rx) + .with_cancellation_token(&rpc_cancellation) + .then(handle_task_result("Engine query", rpc_cancellation.clone())); + let processing_cancellation = self.cancellation_token.clone(); // Start the engine processing task. - let processing_handle = state - .start_engine_processing_task( - engine_processing_rx, - derivation_signal_tx, - engine_l2_safe_head_tx, - Some(sync_complete_tx), - self.unsafe_head_tx, - ) - .with_cancellation_token(&cancellation) - .then(handle_task_result("Engine processing", cancellation.clone())); + let processing_handle = self + .engine_receiver + .start(engine_processing_rx) + .with_cancellation_token(&processing_cancellation) + .then(handle_task_result("Engine processing", processing_cancellation.clone())); // Helper to send processing requests with error handling. let send_engine_processing_request = |req: EngineProcessingRequest| async { engine_processing_tx.send(req).await.map_err(|_| { error!(target: "engine", "Engine processing channel closed unexpectedly"); - cancellation.cancel(); + self.cancellation_token.clone().cancel(); EngineError::ChannelClosed }) }; loop { tokio::select! { - _ = cancellation.cancelled() => { + _ = self.cancellation_token.cancelled() => { warn!(target: "engine", "EngineActor received shutdown signal. Awaiting task completion."); rpc_handle.await?; @@ -573,7 +124,7 @@ impl NodeActor for EngineActor { req = self.inbound_request_rx.recv() => { let Some(request) = req else { error!(target: "engine", "Engine inbound request receiver closed unexpectedly"); - cancellation.cancel(); + self.cancellation_token.cancel(); return Err(EngineError::ChannelClosed); }; @@ -582,7 +133,7 @@ impl NodeActor for EngineActor { EngineActorRequest::RpcRequest(rpc_req) => { rpc_tx.send(*rpc_req).await.map_err(|_| { error!(target: "engine", "Engine RPC request handler channel closed unexpectedly"); - cancellation.cancel(); + self.cancellation_token.cancel(); EngineError::ChannelClosed })?; } @@ -592,8 +143,8 @@ impl NodeActor for EngineActor { EngineActorRequest::ProcessDerivedL2AttributesRequest(attributes) => { send_engine_processing_request(EngineProcessingRequest::ProcessDerivedL2Attributes(attributes)).await?; } - EngineActorRequest::ProcessFinalizedL2BlockRequest(block_number) => { - send_engine_processing_request(EngineProcessingRequest::ProcessFinalizedL2Block(block_number)).await?; + EngineActorRequest::ProcessFinalizedL2BlockNumberRequest(block_number) => { + send_engine_processing_request(EngineProcessingRequest::ProcessFinalizedL2BlockNumber(block_number)).await?; } EngineActorRequest::ProcessUnsafeL2BlockRequest(envelope) => { send_engine_processing_request(EngineProcessingRequest::ProcessUnsafeL2Block(envelope)).await?; diff --git a/crates/node/service/src/actors/engine/client.rs b/crates/node/service/src/actors/engine/client.rs index 3ae20a2bbc..9a2cae43bb 100644 --- a/crates/node/service/src/actors/engine/client.rs +++ b/crates/node/service/src/actors/engine/client.rs @@ -1,96 +1,77 @@ -use alloy_rpc_types_engine::PayloadId; -use kona_engine::{BuildTaskError, EngineQueries, SealTaskError}; -use kona_protocol::OpAttributesWithParent; -use kona_rpc::{RollupBoostAdminQuery, RollupBoostHealthQuery}; -use op_alloy_rpc_types_engine::OpExecutionPayloadEnvelope; -use thiserror::Error; +use crate::{DerivationActorRequest, DerivationClientError, DerivationClientResult}; +use async_trait::async_trait; +use derive_more::Constructor; +use kona_derive::Signal; +use kona_protocol::L2BlockInfo; +use std::fmt::Debug; use tokio::sync::mpsc; -/// The result of an Engine client call. -pub type EngineClientResult = Result; +/// Client to use to interact with the [`crate::DerivationActor`]. +#[cfg_attr(test, mockall::automock)] +#[async_trait] +pub trait EngineDerivationClient: Debug + Send + Sync { + /// Notifies the [`crate::DerivationActor`] that engine syncing has completed. + /// Note: Does not wait for the derivation client to process this message. + async fn notify_sync_completed(&self) -> DerivationClientResult<()>; -/// Error making requests to the BlockEngine. -#[derive(Debug, Error)] -pub enum EngineClientError { - /// Error making a request to the engine. The request never made it there. - #[error("Error making a request to the engine: {0}.")] - RequestError(String), + /// Sends the new engine safe_head to the [`crate::DerivationActor`]. + /// Note: Does not wait for the derivation client to process this message. + async fn send_new_engine_safe_head(&self, safe_head: L2BlockInfo) + -> DerivationClientResult<()>; - /// Error receiving response from the engine. - /// This means the request may or may not have succeeded. - #[error("Error receiving response from the engine: {0}..")] - ResponseError(String), + /// Sends the [`crate::DerivationActor`] the provided [`Signal`]. + /// Note: Does not wait for the derivation client to process this message. + async fn send_signal(&self, signal: Signal) -> DerivationClientResult<()>; +} - /// An error occurred starting to build a block. - #[error(transparent)] - StartBuildError(#[from] BuildTaskError), +/// Client to use to send messages to the [`crate::DerivationActor`]'s inbound channel. +#[derive(Constructor, Debug)] +pub struct QueuedEngineDerivationClient { + /// A channel to use to send the [`DerivationActorRequest`]s to the [`crate::DerivationActor`]. + pub derivation_actor_request_tx: mpsc::Sender, +} - /// An error occurred sealing a block. - #[error(transparent)] - SealError(#[from] SealTaskError), +#[async_trait] +impl EngineDerivationClient for QueuedEngineDerivationClient { + async fn notify_sync_completed(&self) -> DerivationClientResult<()> { + info!(target: "engine", "Sending sync completed to derivation actor"); - /// An error occurred performing the reset. - #[error("An error occurred performing the reset: {0}.")] - ResetForkchoiceError(String), -} + self.derivation_actor_request_tx + .send(DerivationActorRequest::ProcessEngineSyncCompletionRequest) + .await + .map_err(|_| { + DerivationClientError::RequestError("request channel closed.".to_string()) + })?; -/// Inbound requests that the [`crate::EngineActor`] can process. -#[derive(Debug)] -pub enum EngineActorRequest { - /// Request to build. - BuildRequest(Box), - /// Request to consolidate based on the provided attributes. - ProcessDerivedL2AttributesRequest(Box), - /// Request to finalize the L2 block at the provided block number. - ProcessFinalizedL2BlockRequest(u64), - /// Request to insert the provided unsafe block. - ProcessUnsafeL2BlockRequest(Box), - /// Request to reset engine forkchoice. - ResetRequest(Box), - /// Request for the engine to process the provided RPC request. - RpcRequest(Box), - /// Request to seal the block with the provided details. - SealRequest(Box), -} + Ok(()) + } -/// RPC Request for the engine to handle. -#[derive(Debug)] -pub enum EngineRpcRequest { - /// Engine RPC query. - EngineQuery(EngineQueries), - /// Rollup boost admin request. - RollupBoostAdminRequest(RollupBoostAdminQuery), - /// Rollup boost health request. - RollupBoostHealthRequest(RollupBoostHealthQuery), -} + async fn send_new_engine_safe_head( + &self, + safe_head: L2BlockInfo, + ) -> DerivationClientResult<()> { + info!(target: "engine", safe_head = ?safe_head, "Sending new safe head to derivation actor"); -/// A request to build a payload. -/// Contains the attributes to build and a channel to send back the resulting `PayloadId`. -#[derive(Debug)] -pub struct BuildRequest { - /// The [`OpAttributesWithParent`] from which the block build should be started. - pub attributes: OpAttributesWithParent, - /// The channel on which the result, successful or not, will be sent. - pub result_tx: mpsc::Sender, -} + self.derivation_actor_request_tx + .send(DerivationActorRequest::ProcessEngineSafeHeadUpdateRequest(Box::new(safe_head))) + .await + .map_err(|_| { + DerivationClientError::RequestError("request channel closed.".to_string()) + })?; -/// A request to reset the engine forkchoice. -/// Contains a channel to send back the response indicating that the reset was successful or -/// containing the error if there was one. -#[derive(Debug)] -pub struct ResetRequest { - /// response will be sent to this channel. - pub result_tx: mpsc::Sender>, -} + Ok(()) + } + + async fn send_signal(&self, signal: Signal) -> DerivationClientResult<()> { + info!(target: "engine", signal = ?signal, "Sending signal to derivation actor"); + + self.derivation_actor_request_tx + .send(DerivationActorRequest::ProcessEngineSignalRequest(Box::new(signal))) + .await + .map_err(|_| { + DerivationClientError::RequestError("request channel closed.".to_string()) + })?; -/// A request to seal and canonicalize a payload. -/// Contains the `PayloadId`, attributes, and a channel to send back the result. -#[derive(Debug)] -pub struct SealRequest { - /// The `PayloadId` to seal and canonicalize. - pub payload_id: PayloadId, - /// The attributes necessary for the seal operation. - pub attributes: OpAttributesWithParent, - /// The channel on which the result, successful or not, will be sent. - pub result_tx: mpsc::Sender>, + Ok(()) + } } diff --git a/crates/node/service/src/actors/engine/config.rs b/crates/node/service/src/actors/engine/config.rs new file mode 100644 index 0000000000..be4bd3b83f --- /dev/null +++ b/crates/node/service/src/actors/engine/config.rs @@ -0,0 +1,63 @@ +use crate::NodeMode; +use alloy_provider::RootProvider; +use alloy_rpc_types_engine::JwtSecret; +use kona_engine::{ + EngineClientBuilder, EngineClientBuilderError, OpEngineClient, RollupBoostServerArgs, +}; +use kona_genesis::RollupConfig; +use op_alloy_network::Optimism; +use std::{sync::Arc, time::Duration}; +use url::Url; + +/// Configuration for the Engine Actor. +#[derive(Debug, Clone)] +pub struct EngineConfig { + /// The [`RollupConfig`]. + pub config: Arc, + + /// Builder url. + pub builder_url: Url, + /// Builder jwt secret. + pub builder_jwt_secret: JwtSecret, + /// Builder timeout. + pub builder_timeout: Duration, + + /// The engine rpc url. + pub l2_url: Url, + /// The engine jwt secret. + pub l2_jwt_secret: JwtSecret, + /// The l2 timeout. + pub l2_timeout: Duration, + + /// The L1 rpc url. + pub l1_url: Url, + + /// The mode of operation for the node. + /// When the node is in sequencer mode, the engine actor will receive requests to build blocks + /// from the sequencer actor. + pub mode: NodeMode, + + /// The rollup boost arguments. + pub rollup_boost: RollupBoostServerArgs, +} + +impl EngineConfig { + /// Builds and returns the [`OpEngineClient`]. + pub fn build_engine_client( + self, + ) -> Result>, EngineClientBuilderError> + { + EngineClientBuilder { + builder: self.builder_url.clone(), + builder_jwt: self.builder_jwt_secret, + builder_timeout: self.builder_timeout, + l2: self.l2_url.clone(), + l2_jwt: self.l2_jwt_secret, + l2_timeout: self.l2_timeout, + l1_rpc: self.l1_url.clone(), + cfg: self.config.clone(), + rollup_boost: self.rollup_boost, + } + .build() + } +} diff --git a/crates/node/service/src/actors/engine/engine_request_processor.rs b/crates/node/service/src/actors/engine/engine_request_processor.rs new file mode 100644 index 0000000000..cce497d8dc --- /dev/null +++ b/crates/node/service/src/actors/engine/engine_request_processor.rs @@ -0,0 +1,317 @@ +use crate::{ + BuildRequest, EngineClientError, EngineDerivationClient, EngineError, ResetRequest, SealRequest, +}; +use kona_derive::{ResetSignal, Signal}; +use kona_engine::{ + BuildTask, ConsolidateTask, Engine, EngineClient, EngineTask, EngineTaskError, + EngineTaskErrorSeverity, FinalizeTask, InsertTask, SealTask, +}; +use kona_genesis::RollupConfig; +use kona_protocol::{L2BlockInfo, OpAttributesWithParent}; +use op_alloy_rpc_types_engine::OpExecutionPayloadEnvelope; +use std::sync::Arc; +use tokio::{ + sync::{mpsc, watch}, + task::JoinHandle, +}; + +/// Requires that the implementor handles [`EngineProcessingRequest`]s via the provided channel. +/// Note: this exists to facilitate unit testing rather than consolidate multiple implementations +/// under a well-thought-out interface. +pub trait EngineRequestReceiver: Send + Sync { + /// Starts a task to handle engine processing requests. + fn start( + self, + request_channel: mpsc::Receiver, + ) -> JoinHandle>; +} + +/// A request to process engine tasks. +#[derive(Debug)] +pub enum EngineProcessingRequest { + /// Request to start building a block. + Build(Box), + /// Request to process a derived L2 safe head. + ProcessDerivedL2Attributes(Box), + /// Request to process the finalized L2 block with the provided block number. + ProcessFinalizedL2BlockNumber(Box), + /// Request to process a received unsafe L2 block. + ProcessUnsafeL2Block(Box), + /// Request to reset the forkchoice. + Reset(Box), + /// Request to seal a block. + Seal(Box), +} + +/// Responsible for managing the operations sent to the execution layer's Engine API. To accomplish +/// this, it uses the [`Engine`] task queue to order Engine API interactions based off of +/// the [`Ord`] implementation of [`EngineTask`]. +#[derive(Debug)] +pub struct EngineProcessor +where + EngineClient_: EngineClient, + DerivationClient: EngineDerivationClient, +{ + /// The client used to send messages to the [`crate::DerivationActor`]. + derivation_client: DerivationClient, + /// Whether the EL sync is complete. This should only ever go from false to true. + el_sync_complete: bool, + /// The last safe head update sent. + last_safe_head_sent: L2BlockInfo, + /// The [`RollupConfig`] . + /// A channel to use to relay the current unsafe head. + /// ## Note + /// This is `Some` when the node is in sequencer mode, and `None` when the node is in validator + /// mode. + unsafe_head_tx: Option>, + + /// The [`RollupConfig`] used to build tasks. + rollup: Arc, + /// An [`EngineClient`] used for creating engine tasks. + client: Arc, + /// The [`Engine`] task queue. + engine: Engine, +} + +impl EngineProcessor +where + EngineClient_: EngineClient + 'static, + DerivationClient: EngineDerivationClient + 'static, +{ + /// Constructs a new [`EngineProcessor`] from the params. + pub fn new( + client: Arc, + config: Arc, + derivation_client: DerivationClient, + engine: Engine, + unsafe_head_tx: Option>, + ) -> Self { + Self { + client, + derivation_client, + el_sync_complete: false, + engine, + last_safe_head_sent: L2BlockInfo::default(), + rollup: config, + unsafe_head_tx, + } + } + + /// Resets the inner [`Engine`] and propagates the reset to the derivation actor. + async fn reset(&mut self) -> Result<(), EngineError> { + // Reset the engine. + let (l2_safe_head, l1_origin, system_config) = + self.engine.reset(self.client.clone(), self.rollup.clone()).await?; + + // Signal the derivation actor to reset. + let signal = ResetSignal { l2_safe_head, l1_origin, system_config: Some(system_config) }; + match self.derivation_client.send_signal(signal.signal()).await { + Ok(_) => info!(target: "engine", "Sent reset signal to derivation actor"), + Err(err) => { + error!(target: "engine", ?err, "Failed to send reset signal to the derivation actor"); + return Err(EngineError::ChannelClosed); + } + } + + self.send_derivation_actor_safe_head_if_updated().await?; + + Ok(()) + } + + /// Drains the inner [`Engine`] task queue and attempts to update the safe head. + async fn drain(&mut self) -> Result<(), EngineError> { + match self.engine.drain().await { + Ok(_) => { + trace!(target: "engine", "[ENGINE] tasks drained"); + } + Err(err) => { + match err.severity() { + EngineTaskErrorSeverity::Critical => { + error!(target: "engine", ?err, "Critical error draining engine tasks"); + return Err(err.into()); + } + EngineTaskErrorSeverity::Reset => { + warn!(target: "engine", ?err, "Received reset request"); + self.reset().await?; + } + EngineTaskErrorSeverity::Flush => { + // This error is encountered when the payload is marked INVALID + // by the engine api. Post-holocene, the payload is replaced by + // a "deposits-only" block and re-executed. At the same time, + // the channel and any remaining buffered batches are flushed. + warn!(target: "engine", ?err, "Invalid payload, Flushing derivation pipeline."); + match self.derivation_client.send_signal(Signal::FlushChannel).await { + Ok(_) => { + debug!(target: "engine", "Sent flush signal to derivation actor") + } + Err(err) => { + error!(target: "engine", ?err, "Failed to send flush signal to the derivation actor."); + return Err(EngineError::ChannelClosed); + } + } + } + EngineTaskErrorSeverity::Temporary => { + trace!(target: "engine", ?err, "Temporary error draining engine tasks"); + } + } + } + } + + self.send_derivation_actor_safe_head_if_updated().await?; + + if !self.el_sync_complete && self.engine.state().el_sync_finished { + self.mark_el_sync_complete_and_notify_derivation_actor().await?; + } + + Ok(()) + } + + async fn mark_el_sync_complete_and_notify_derivation_actor( + &mut self, + ) -> Result<(), EngineError> { + self.el_sync_complete = true; + + // Reset the engine if the sync state does not already know about a finalized block. + if self.engine.state().sync_state.finalized_head() == L2BlockInfo::default() { + // If the sync status is finished, we can reset the engine and start derivation. + info!(target: "engine", "Performing initial engine reset"); + self.reset().await?; + } else { + info!(target: "engine", "finalized head is not default, so not resetting"); + } + + self.derivation_client.notify_sync_completed().await.map(|_| Ok(())).map_err(|e| { + error!(target: "engine", ?e, "Failed to notify sync completed"); + EngineError::ChannelClosed + })? + } + + /// Attempts to send the [`crate::DerivationActor`] the safe head if updated. + async fn send_derivation_actor_safe_head_if_updated(&mut self) -> Result<(), EngineError> { + let engine_safe_head = self.engine.state().sync_state.safe_head(); + if engine_safe_head == self.last_safe_head_sent { + info!(target: "engine", safe_head = ?engine_safe_head, "Safe head unchanged"); + // This was already sent, so do not send it. + return Ok(()); + } + + self.derivation_client.send_new_engine_safe_head(engine_safe_head).await.map_err(|e| { + error!(target: "engine", ?e, "Failed to send new engine safe head"); + EngineError::ChannelClosed + })?; + + info!(target: "engine", safe_head = ?engine_safe_head, "Attempted L2 Safe Head Update"); + self.last_safe_head_sent = engine_safe_head; + + Ok(()) + } +} + +impl EngineRequestReceiver + for EngineProcessor +where + EngineClient_: EngineClient + 'static, + DerivationClient: EngineDerivationClient + 'static, +{ + fn start( + mut self, + mut request_channel: mpsc::Receiver, + ) -> JoinHandle> { + tokio::spawn(async move { + loop { + // Attempt to drain all outstanding tasks from the engine queue before adding new + // ones. + self.drain().await.inspect_err( + |err| error!(target: "engine", ?err, "Failed to drain engine tasks"), + )?; + + // If the unsafe head has updated, propagate it to the outbound channels. + if let Some(unsafe_head_tx) = self.unsafe_head_tx.as_ref() { + unsafe_head_tx.send_if_modified(|val| { + let new_head = self.engine.state().sync_state.unsafe_head(); + (*val != new_head).then(|| *val = new_head).is_some() + }); + } + + // Wait for the next processing request. + let Some(request) = request_channel.recv().await else { + error!(target: "engine", "Engine processing request receiver closed unexpectedly"); + return Err(EngineError::ChannelClosed); + }; + + match request { + EngineProcessingRequest::Build(build_request) => { + let BuildRequest { attributes, result_tx } = *build_request; + let task = EngineTask::Build(Box::new(BuildTask::new( + self.client.clone(), + self.rollup.clone(), + attributes, + Some(result_tx), + ))); + self.engine.enqueue(task); + } + EngineProcessingRequest::ProcessDerivedL2Attributes(attributes) => { + let task = EngineTask::Consolidate(Box::new(ConsolidateTask::new( + self.client.clone(), + self.rollup.clone(), + *attributes, + true, + ))); + self.engine.enqueue(task); + } + EngineProcessingRequest::ProcessFinalizedL2BlockNumber( + finalized_l2_block_number, + ) => { + // Finalize the L2 block at the provided block number. + let task = EngineTask::Finalize(Box::new(FinalizeTask::new( + self.client.clone(), + self.rollup.clone(), + *finalized_l2_block_number, + ))); + self.engine.enqueue(task); + } + EngineProcessingRequest::ProcessUnsafeL2Block(envelope) => { + let task = EngineTask::Insert(Box::new(InsertTask::new( + self.client.clone(), + self.rollup.clone(), + *envelope, + false, /* The payload is not derived in this case. This is an unsafe + * block. */ + ))); + self.engine.enqueue(task); + } + EngineProcessingRequest::Reset(reset_request) => { + warn!(target: "engine", "Received reset request"); + + let reset_res = self.reset().await; + + // Send the result. + let response_payload = reset_res + .as_ref() + .map(|_| ()) + .map_err(|e| EngineClientError::ResetForkchoiceError(e.to_string())); + if reset_request.result_tx.send(response_payload).await.is_err() { + warn!(target: "engine", "Sending reset response failed"); + // If there was an error and we couldn't notify the caller to handle it, + // return the error. + reset_res?; + } + } + EngineProcessingRequest::Seal(seal_request) => { + let SealRequest { payload_id, attributes, result_tx } = *seal_request; + let task = EngineTask::Seal(Box::new(SealTask::new( + self.client.clone(), + self.rollup.clone(), + payload_id, + attributes, + // The payload is not derived in this case. + false, + Some(result_tx), + ))); + self.engine.enqueue(task); + } + } + } + }) + } +} diff --git a/crates/node/service/src/actors/engine/mod.rs b/crates/node/service/src/actors/engine/mod.rs index 7698384b5d..51acbe6e16 100644 --- a/crates/node/service/src/actors/engine/mod.rs +++ b/crates/node/service/src/actors/engine/mod.rs @@ -1,13 +1,27 @@ //! The [`EngineActor`] and its components. mod actor; -pub use actor::{EngineActor, EngineConfig, EngineContext, EngineInboundData}; +pub use actor::EngineActor; mod client; -pub use client::{ +pub use client::{EngineDerivationClient, QueuedEngineDerivationClient}; + +mod config; +pub use config::EngineConfig; + +mod error; +pub use error::EngineError; + +mod request; +pub use request::{ BuildRequest, EngineActorRequest, EngineClientError, EngineClientResult, EngineRpcRequest, ResetRequest, SealRequest, }; -mod error; -pub use error::EngineError; +mod engine_request_processor; +pub use engine_request_processor::{ + EngineProcessingRequest, EngineProcessor, EngineRequestReceiver, +}; + +mod rpc_request_processor; +pub use rpc_request_processor::{EngineRpcProcessor, EngineRpcRequestReceiver}; diff --git a/crates/node/service/src/actors/engine/request.rs b/crates/node/service/src/actors/engine/request.rs new file mode 100644 index 0000000000..dcdf4cdb19 --- /dev/null +++ b/crates/node/service/src/actors/engine/request.rs @@ -0,0 +1,96 @@ +use alloy_rpc_types_engine::PayloadId; +use kona_engine::{BuildTaskError, EngineQueries, SealTaskError}; +use kona_protocol::OpAttributesWithParent; +use kona_rpc::{RollupBoostAdminQuery, RollupBoostHealthQuery}; +use op_alloy_rpc_types_engine::OpExecutionPayloadEnvelope; +use thiserror::Error; +use tokio::sync::mpsc; + +/// The result of an Engine client call. +pub type EngineClientResult = Result; + +/// Error making requests to the BlockEngine. +#[derive(Debug, Error)] +pub enum EngineClientError { + /// Error making a request to the engine. The request never made it there. + #[error("Error making a request to the engine: {0}.")] + RequestError(String), + + /// Error receiving response from the engine. + /// This means the request may or may not have succeeded. + #[error("Error receiving response from the engine: {0}.")] + ResponseError(String), + + /// An error occurred starting to build a block. + #[error(transparent)] + StartBuildError(#[from] BuildTaskError), + + /// An error occurred sealing a block. + #[error(transparent)] + SealError(#[from] SealTaskError), + + /// An error occurred performing the reset. + #[error("An error occurred performing the reset: {0}.")] + ResetForkchoiceError(String), +} + +/// Inbound requests that the [`crate::EngineActor`] can process. +#[derive(Debug)] +pub enum EngineActorRequest { + /// Request to build. + BuildRequest(Box), + /// Request to consolidate based on the provided attributes. + ProcessDerivedL2AttributesRequest(Box), + /// Request to finalize the L2 block at the provided block number. + ProcessFinalizedL2BlockNumberRequest(Box), + /// Request to insert the provided unsafe block. + ProcessUnsafeL2BlockRequest(Box), + /// Request to reset engine forkchoice. + ResetRequest(Box), + /// Request for the engine to process the provided RPC request. + RpcRequest(Box), + /// Request to seal the block with the provided details. + SealRequest(Box), +} + +/// RPC Request for the engine to handle. +#[derive(Debug)] +pub enum EngineRpcRequest { + /// Engine RPC query. + EngineQuery(Box), + /// Rollup boost admin request. + RollupBoostAdminRequest(Box), + /// Rollup boost health request. + RollupBoostHealthRequest(Box), +} + +/// A request to build a payload. +/// Contains the attributes to build and a channel to send back the resulting `PayloadId`. +#[derive(Debug)] +pub struct BuildRequest { + /// The [`OpAttributesWithParent`] from which the block build should be started. + pub attributes: OpAttributesWithParent, + /// The channel on which the result, successful or not, will be sent. + pub result_tx: mpsc::Sender, +} + +/// A request to reset the engine forkchoice. +/// Optionally contains a channel to send back the response if the caller would like to know that +/// the request was successfully processed. +#[derive(Debug)] +pub struct ResetRequest { + /// response will be sent to this channel, if `Some`. + pub result_tx: mpsc::Sender>, +} + +/// A request to seal and canonicalize a payload. +/// Contains the `PayloadId`, attributes, and a channel to send back the result. +#[derive(Debug)] +pub struct SealRequest { + /// The `PayloadId` to seal and canonicalize. + pub payload_id: PayloadId, + /// The attributes necessary for the seal operation. + pub attributes: OpAttributesWithParent, + /// The channel on which the result, successful or not, will be sent. + pub result_tx: mpsc::Sender>, +} diff --git a/crates/node/service/src/actors/engine/rpc_request_processor.rs b/crates/node/service/src/actors/engine/rpc_request_processor.rs new file mode 100644 index 0000000000..691e0e68e9 --- /dev/null +++ b/crates/node/service/src/actors/engine/rpc_request_processor.rs @@ -0,0 +1,115 @@ +use crate::{EngineError, EngineRpcRequest}; +use derive_more::Constructor; +use kona_engine::{EngineClient, EngineState, RollupBoostServer}; +use kona_genesis::RollupConfig; +use kona_rpc::RollupBoostAdminQuery; +use std::sync::Arc; +use tokio::{ + sync::{mpsc, watch}, + task::JoinHandle, +}; + +/// Requires that the implementor handles [`EngineRpcRequest`]s via the provided channel. +/// Note: this exists to facilitate unit testing rather than consolidate multiple implementations +/// under a well-thought-out interface. +pub trait EngineRpcRequestReceiver: Send + Sync { + /// Starts a task to handle engine queries. + fn start( + self, + request_channel: mpsc::Receiver, + ) -> JoinHandle>; +} + +/// Processor for [`EngineRpcRequest`] requests. +#[derive(Constructor, Debug)] +pub struct EngineRpcProcessor { + /// An [`EngineClient`] used for creating engine tasks. + engine_client: Arc, + // RollupBoost server handle + rollup_boost_server: Arc, + /// The [`RollupConfig`] used to build tasks. + rollup_config: Arc, + /// Receiver for [`EngineState`] updates. + engine_state_receiver: watch::Receiver, + /// Receiver for engine queue length updates. + engine_queue_length_receiver: watch::Receiver, +} + +impl EngineRpcProcessor +where + EngineClient_: EngineClient + 'static, +{ + async fn handle_rpc_request(&self, request: EngineRpcRequest) -> Result<(), EngineError> { + match request { + EngineRpcRequest::EngineQuery(req) => { + trace!(target: "engine", ?req, "Received engine query."); + + if let Err(e) = req + .handle( + &self.engine_state_receiver, + &self.engine_queue_length_receiver, + &self.engine_client, + &self.rollup_config, + ) + .await + { + warn!(target: "engine", err = ?e, "Failed to handle engine query."); + } + } + EngineRpcRequest::RollupBoostAdminRequest(admin_query) => { + trace!(target: "engine", ?admin_query, "Received rollup boost admin query."); + + self.handle_rollup_boost_admin_query(*admin_query); + } + EngineRpcRequest::RollupBoostHealthRequest(health_query) => { + trace!(target: "engine", ?health_query, "Received rollup boost health query."); + + let health = self.rollup_boost_server.get_health(); + health_query.sender.send(health.into()).unwrap(); + } + } + + Ok(()) + } + + fn handle_rollup_boost_admin_query(&self, admin_query: RollupBoostAdminQuery) { + match admin_query { + RollupBoostAdminQuery::SetExecutionMode { execution_mode, sender } => { + self.rollup_boost_server.server.set_execution_mode(execution_mode); + let _ = sender.send(()).map_err(|_| { + warn!(target: "engine", "set execution mode response channel closed when trying to send"); + }); + } + RollupBoostAdminQuery::GetExecutionMode { sender } => { + let execution_mode = self.rollup_boost_server.server.get_execution_mode(); + let _ = sender.send(execution_mode).map_err(|_| { + warn!(target: "engine", "get execution mode response channel closed when trying to send"); + }); + } + } + } +} + +impl EngineRpcRequestReceiver for EngineRpcProcessor +where + EngineClient_: EngineClient + 'static, +{ + fn start( + self, + mut request_channel: mpsc::Receiver, + ) -> JoinHandle> { + tokio::spawn(async move { + loop { + tokio::select! { + query = request_channel.recv(), if !request_channel.is_closed() => { + let Some(query) = query else { + error!(target: "engine", "Engine rpc request receiver closed unexpectedly"); + return Err(EngineError::ChannelClosed); + }; + self.handle_rpc_request(query).await?; + } + } + } + }) + } +} diff --git a/crates/node/service/src/actors/l1_watcher/actor.rs b/crates/node/service/src/actors/l1_watcher/actor.rs index d56349e8b5..0065711f65 100644 --- a/crates/node/service/src/actors/l1_watcher/actor.rs +++ b/crates/node/service/src/actors/l1_watcher/actor.rs @@ -23,12 +23,15 @@ use tokio::{ }; use tokio_util::sync::{CancellationToken, WaitForCancellationFuture}; +use super::L1WatcherDerivationClient; + /// An L1 chain watcher that checks for L1 block updates over RPC. #[derive(Debug)] -pub struct L1WatcherActor +pub struct L1WatcherActor where BlockStream: Stream + Unpin + Send, L1Provider: Provider, + L1WatcherDerivationClient_: L1WatcherDerivationClient, { /// The [`RollupConfig`] to tell if ecotone is active. /// This is used to determine if the L1 watcher should check for unsafe block signer updates. @@ -39,8 +42,8 @@ where inbound_queries: mpsc::Receiver, /// The latest L1 head block. latest_head: watch::Sender>, - /// The finalized L1 head block. - latest_finalized: watch::Sender>, + /// Client used to interact with the [`crate::DerivationActor`]. + derivation_client: L1WatcherDerivationClient_, /// The block signer sender. block_signer_sender: mpsc::Sender
, /// The cancellation token, shared between all tasks. @@ -50,10 +53,12 @@ where /// A stream over the finalized block accepted as canonical. finalized_stream: BlockStream, } -impl L1WatcherActor +impl + L1WatcherActor where BlockStream: Stream + Unpin + Send, L1Provider: Provider, + L1WatcherDerivationClient_: L1WatcherDerivationClient, { /// Instantiate a new [`L1WatcherActor`]. #[allow(clippy::too_many_arguments)] @@ -62,7 +67,7 @@ where l1_provider: L1Provider, l1_query_rx: mpsc::Receiver, l1_head_updates_tx: watch::Sender>, - l1_finalized_updates_tx: watch::Sender>, + derivation_client: L1WatcherDerivationClient_, signer: mpsc::Sender
, cancellation: CancellationToken, head_stream: BlockStream, @@ -73,7 +78,7 @@ where l1_provider, inbound_queries: l1_query_rx, latest_head: l1_head_updates_tx, - latest_finalized: l1_finalized_updates_tx, + derivation_client, block_signer_sender: signer, cancellation, head_stream, @@ -83,10 +88,12 @@ where } #[async_trait] -impl NodeActor for L1WatcherActor +impl NodeActor + for L1WatcherActor where BlockStream: Stream + Unpin + Send + 'static, L1Provider: Provider + 'static, + L1WatcherDerivationClient_: L1WatcherDerivationClient + 'static, { type Error = L1WatcherActorError; type StartData = (); @@ -114,6 +121,10 @@ where Some(head_block_info) => { // Send the head update event to all consumers. self.latest_head.send_replace(Some(head_block_info)); + self.derivation_client.send_new_l1_head(head_block_info).await.map_err(|e| { + warn!(target: "l1_watcher", "Error sending l1 head update to derivation actor: {e}"); + L1WatcherActorError::DerivationClientError(e) + })?; // For each log, attempt to construct a [`SystemConfigLog`]. // Build the [`SystemConfigUpdate`] from the log. @@ -144,7 +155,10 @@ where return Err(L1WatcherActorError::StreamEnded); } Some(finalized_block_info) => { - self.latest_finalized.send_replace(Some(finalized_block_info)); + self.derivation_client.send_finalized_l1_block(finalized_block_info).await.map_err(|e| { + warn!(target: "l1_watcher", "Error sending finalized l1 block update to derivation actor: {e}"); + L1WatcherActorError::DerivationClientError(e) + })?; } }, inbound_query = self.inbound_queries.recv() => match inbound_query { @@ -201,10 +215,12 @@ where } } -impl CancellableContext for L1WatcherActor +impl CancellableContext + for L1WatcherActor where BlockStream: Stream + Unpin + Send + 'static, L1Provider: Provider, + L1WatcherDerivationClient_: L1WatcherDerivationClient + 'static, { fn cancelled(&self) -> WaitForCancellationFuture<'_> { self.cancellation.cancelled() diff --git a/crates/node/service/src/actors/l1_watcher/client.rs b/crates/node/service/src/actors/l1_watcher/client.rs new file mode 100644 index 0000000000..304b85734c --- /dev/null +++ b/crates/node/service/src/actors/l1_watcher/client.rs @@ -0,0 +1,56 @@ +use crate::{DerivationActorRequest, DerivationClientError, DerivationClientResult}; +use async_trait::async_trait; +use derive_more::Constructor; +use kona_protocol::BlockInfo; +use std::fmt::Debug; +use tokio::sync::mpsc; + +/// Client to use to interact with the [`crate::DerivationActor`]. +#[cfg_attr(test, mockall::automock)] +#[async_trait] +pub trait L1WatcherDerivationClient: Debug + Send + Sync { + /// Sends the [`crate::DerivationActor`] the provided finalized L1 block. + /// Note: this function just guarantees that it is received by the actor but does not have + /// any insight into whether it was processed or processed successfully. + async fn send_finalized_l1_block(&self, block: BlockInfo) -> DerivationClientResult<()>; + + /// Sends the latest L1 Head to the [`crate::DerivationActor`]. + /// Note: this function just guarantees that it is received by the actor but does not have + /// any insight into whether it was processed or processed successfully. + async fn send_new_l1_head(&self, block: BlockInfo) -> DerivationClientResult<()>; +} + +/// Client to use to send messages to the [`crate::DerivationActor`]'s inbound channel. +#[derive(Constructor, Debug)] +pub struct QueuedL1WatcherDerivationClient { + /// A channel to use to send the [`DerivationActorRequest`]s to the [`crate::DerivationActor`]. + pub derivation_actor_request_tx: mpsc::Sender, +} + +#[async_trait] +impl L1WatcherDerivationClient for QueuedL1WatcherDerivationClient { + async fn send_finalized_l1_block(&self, block: BlockInfo) -> DerivationClientResult<()> { + trace!(target: "l1_watcher", ?block, "Sending finalized l1 block to derivation actor."); + let _ = self + .derivation_actor_request_tx + .send(DerivationActorRequest::ProcessFinalizedL1Block(Box::new(block))) + .await + .map_err(|_| { + DerivationClientError::RequestError("request channel closed.".to_string()) + })?; + + Ok(()) + } + + async fn send_new_l1_head(&self, block: BlockInfo) -> DerivationClientResult<()> { + trace!(target: "l1_watcher", ?block, "Sending new l1 head to derivation actor."); + self.derivation_actor_request_tx + .send(DerivationActorRequest::ProcessL1HeadUpdateRequest(Box::new(block))) + .await + .map_err(|_| { + DerivationClientError::RequestError("request channel closed.".to_string()) + })?; + + Ok(()) + } +} diff --git a/crates/node/service/src/actors/l1_watcher/error.rs b/crates/node/service/src/actors/l1_watcher/error.rs index 9add016e2e..4ea4c9e166 100644 --- a/crates/node/service/src/actors/l1_watcher/error.rs +++ b/crates/node/service/src/actors/l1_watcher/error.rs @@ -1,5 +1,6 @@ use std::sync::mpsc::SendError; +use crate::DerivationClientError; use alloy_eips::BlockId; use alloy_transport::TransportError; use thiserror::Error; @@ -19,4 +20,7 @@ pub enum L1WatcherActorError { /// Stream ended unexpectedly. #[error("Stream ended unexpectedly")] StreamEnded, + /// Derivation client error. + #[error("derivation client error: {0}")] + DerivationClientError(#[from] DerivationClientError), } diff --git a/crates/node/service/src/actors/l1_watcher/mod.rs b/crates/node/service/src/actors/l1_watcher/mod.rs index 0d2b76fb82..9c871b5050 100644 --- a/crates/node/service/src/actors/l1_watcher/mod.rs +++ b/crates/node/service/src/actors/l1_watcher/mod.rs @@ -4,5 +4,8 @@ pub use actor::L1WatcherActor; mod blockstream; pub use blockstream::BlockStream; +mod client; +pub use client::{L1WatcherDerivationClient, QueuedL1WatcherDerivationClient}; + mod error; pub use error::L1WatcherActorError; diff --git a/crates/node/service/src/actors/mod.rs b/crates/node/service/src/actors/mod.rs index 1fedf8afe7..4686d14db5 100644 --- a/crates/node/service/src/actors/mod.rs +++ b/crates/node/service/src/actors/mod.rs @@ -8,8 +8,9 @@ pub use traits::{CancellableContext, NodeActor}; mod engine; pub use engine::{ BuildRequest, EngineActor, EngineActorRequest, EngineClientError, EngineClientResult, - EngineConfig, EngineContext, EngineError, EngineInboundData, EngineRpcRequest, ResetRequest, - SealRequest, + EngineConfig, EngineDerivationClient, EngineError, EngineProcessingRequest, EngineProcessor, + EngineRequestReceiver, EngineRpcProcessor, EngineRpcRequest, EngineRpcRequestReceiver, + QueuedEngineDerivationClient, ResetRequest, SealRequest, }; mod rpc; @@ -20,13 +21,15 @@ pub use rpc::{ mod derivation; pub use derivation::{ - DerivationActor, DerivationBuilder, DerivationEngineClient, DerivationError, - DerivationInboundChannels, DerivationState, InboundDerivationMessage, PipelineBuilder, - QueuedDerivationEngineClient, + DerivationActor, DerivationActorRequest, DerivationClientError, DerivationClientResult, + DerivationEngineClient, DerivationError, QueuedDerivationEngineClient, }; mod l1_watcher; -pub use l1_watcher::{BlockStream, L1WatcherActor, L1WatcherActorError}; +pub use l1_watcher::{ + BlockStream, L1WatcherActor, L1WatcherActorError, L1WatcherDerivationClient, + QueuedL1WatcherDerivationClient, +}; mod network; pub use network::{ diff --git a/crates/node/service/src/actors/network/engine_client.rs b/crates/node/service/src/actors/network/engine_client.rs index 89890c2b88..226b93063f 100644 --- a/crates/node/service/src/actors/network/engine_client.rs +++ b/crates/node/service/src/actors/network/engine_client.rs @@ -23,6 +23,7 @@ pub struct QueuedNetworkEngineClient { #[async_trait] impl NetworkEngineClient for QueuedNetworkEngineClient { async fn send_unsafe_block(&self, block: OpExecutionPayloadEnvelope) -> EngineClientResult<()> { + trace!(target: "network", ?block, "Sending unsafe block to engine."); Ok(self .engine_actor_request_tx .send(EngineActorRequest::ProcessUnsafeL2BlockRequest(Box::new(block))) diff --git a/crates/node/service/src/actors/rpc/engine_rpc_client.rs b/crates/node/service/src/actors/rpc/engine_rpc_client.rs index 959fb4e508..4222082250 100644 --- a/crates/node/service/src/actors/rpc/engine_rpc_client.rs +++ b/crates/node/service/src/actors/rpc/engine_rpc_client.rs @@ -29,7 +29,7 @@ impl EngineRpcClient for QueuedEngineRpcClient { self.engine_actor_request_tx .send(EngineActorRequest::RpcRequest(Box::new(EngineRpcRequest::EngineQuery( - EngineQueries::Config(config_tx), + Box::new(EngineQueries::Config(config_tx)), )))) .await .map_err(|_| ErrorObject::from(ErrorCode::InternalError))?; @@ -45,7 +45,7 @@ impl EngineRpcClient for QueuedEngineRpcClient { self.engine_actor_request_tx .send(EngineActorRequest::RpcRequest(Box::new(EngineRpcRequest::EngineQuery( - EngineQueries::State(state_tx), + Box::new(EngineQueries::State(state_tx)), )))) .await .map_err(|_| ErrorObject::from(ErrorCode::InternalError))?; @@ -64,7 +64,7 @@ impl EngineRpcClient for QueuedEngineRpcClient { self.engine_actor_request_tx .send(EngineActorRequest::RpcRequest(Box::new(EngineRpcRequest::EngineQuery( - EngineQueries::OutputAtBlock { block, sender: output_tx }, + Box::new(EngineQueries::OutputAtBlock { block, sender: output_tx }), )))) .await .map_err(|_| ErrorObject::from(ErrorCode::InternalError))?; @@ -80,7 +80,7 @@ impl EngineRpcClient for QueuedEngineRpcClient { self.engine_actor_request_tx .send(EngineActorRequest::RpcRequest(Box::new(EngineRpcRequest::EngineQuery( - EngineQueries::TaskQueueLength(length_tx), + Box::new(EngineQueries::TaskQueueLength(length_tx)), )))) .await .map_err(|_| ErrorObject::from(ErrorCode::InternalError))?; @@ -96,7 +96,7 @@ impl EngineRpcClient for QueuedEngineRpcClient { self.engine_actor_request_tx .send(EngineActorRequest::RpcRequest(Box::new(EngineRpcRequest::EngineQuery( - EngineQueries::QueueLengthReceiver(sub_tx), + Box::new(EngineQueries::QueueLengthReceiver(sub_tx)), )))) .await .map_err(|_| ErrorObject::from(ErrorCode::InternalError))?; @@ -111,7 +111,7 @@ impl EngineRpcClient for QueuedEngineRpcClient { self.engine_actor_request_tx .send(EngineActorRequest::RpcRequest(Box::new(EngineRpcRequest::EngineQuery( - EngineQueries::StateReceiver(sub_tx), + Box::new(EngineQueries::StateReceiver(sub_tx)), )))) .await .map_err(|_| ErrorObject::from(ErrorCode::InternalError))?; diff --git a/crates/node/service/src/actors/rpc/rollup_boost_rpc_client.rs b/crates/node/service/src/actors/rpc/rollup_boost_rpc_client.rs index 2cb09a0e48..5eb8bd94e8 100644 --- a/crates/node/service/src/actors/rpc/rollup_boost_rpc_client.rs +++ b/crates/node/service/src/actors/rpc/rollup_boost_rpc_client.rs @@ -24,9 +24,9 @@ impl RollupBoostHealthzApiServer for RollupBoostHealthRpcClient { self.engine_actor_request_tx .send(EngineActorRequest::RpcRequest(Box::new( - EngineRpcRequest::RollupBoostHealthRequest(kona_rpc::RollupBoostHealthQuery { - sender: health_tx, - }), + EngineRpcRequest::RollupBoostHealthRequest(Box::new( + kona_rpc::RollupBoostHealthQuery { sender: health_tx }, + )), ))) .await .map_err(|_| ErrorObject::from(ErrorCode::InternalError))?; @@ -56,12 +56,12 @@ impl RollupBoostAdminClient for RollupBoostAdminApiClient { engine_actor_request_tx .send(EngineActorRequest::RpcRequest(Box::new( - EngineRpcRequest::RollupBoostAdminRequest( + EngineRpcRequest::RollupBoostAdminRequest(Box::new( kona_rpc::RollupBoostAdminQuery::SetExecutionMode { execution_mode: request.execution_mode, sender: mode_tx, }, - ), + )), ))) .await .map_err(|_| ErrorObject::from(ErrorCode::InternalError))?; @@ -78,9 +78,9 @@ impl RollupBoostAdminClient for RollupBoostAdminApiClient { engine_actor_request_tx .send(EngineActorRequest::RpcRequest(Box::new( - EngineRpcRequest::RollupBoostAdminRequest( + EngineRpcRequest::RollupBoostAdminRequest(Box::new( kona_rpc::RollupBoostAdminQuery::GetExecutionMode { sender: mode_tx }, - ), + )), ))) .await .map_err(|_| ErrorObject::from(ErrorCode::InternalError))?; diff --git a/crates/node/service/src/actors/sequencer/engine_client.rs b/crates/node/service/src/actors/sequencer/engine_client.rs index a39f6f262e..4b49aad6b0 100644 --- a/crates/node/service/src/actors/sequencer/engine_client.rs +++ b/crates/node/service/src/actors/sequencer/engine_client.rs @@ -60,15 +60,20 @@ impl SequencerEngineClient for QueuedSequencerEngineClient { async fn reset_engine_forkchoice(&self) -> EngineClientResult<()> { let (result_tx, mut result_rx) = mpsc::channel(1); + info!(target: "sequencer", "Sending reset request to engine."); self.engine_actor_request_tx .send(EngineActorRequest::ResetRequest(Box::new(ResetRequest { result_tx }))) .await .map_err(|_| EngineClientError::RequestError("request channel closed.".to_string()))?; - result_rx.recv().await.ok_or_else(|| { - error!(target: "block_engine", "Failed to receive built payload"); - EngineClientError::ResponseError("response channel closed.".to_string()) - })? + result_rx + .recv() + .await + .inspect(|_| info!(target: "sequencer", "Engine reset successfully.")) + .ok_or_else(|| { + error!(target: "block_engine", "Failed to receive built payload"); + EngineClientError::ResponseError("response channel closed.".to_string()) + })? } async fn start_build_block( @@ -77,6 +82,7 @@ impl SequencerEngineClient for QueuedSequencerEngineClient { ) -> EngineClientResult { let (payload_id_tx, mut payload_id_rx) = mpsc::channel(1); + trace!(target: "sequencer", "Sending start build request to engine."); if self .engine_actor_request_tx .send(EngineActorRequest::BuildRequest(Box::new(BuildRequest { @@ -89,7 +95,10 @@ impl SequencerEngineClient for QueuedSequencerEngineClient { return Err(EngineClientError::RequestError("request channel closed.".to_string())); } - payload_id_rx.recv().await.ok_or_else(|| { + payload_id_rx.recv() + .await + .inspect(|payload_id| trace!(target: "sequencer", ?payload_id, "Start build request successfully.")) + .ok_or_else(|| { error!(target: "block_engine", "Failed to receive payload for initiated block build"); EngineClientError::ResponseError("response channel closed.".to_string()) }) @@ -102,6 +111,7 @@ impl SequencerEngineClient for QueuedSequencerEngineClient { ) -> EngineClientResult { let (result_tx, mut result_rx) = mpsc::channel(1); + trace!(target: "sequencer", ?attributes, "Sending seal request to engine."); self.engine_actor_request_tx .send(EngineActorRequest::SealRequest(Box::new(SealRequest { payload_id, @@ -112,8 +122,14 @@ impl SequencerEngineClient for QueuedSequencerEngineClient { .map_err(|_| EngineClientError::RequestError("request channel closed.".to_string()))?; match result_rx.recv().await { - Some(Ok(payload)) => Ok(payload), - Some(Err(err)) => Err(EngineClientError::SealError(err)), + Some(Ok(payload)) => { + trace!(target: "sequencer", ?payload, "Seal succeeded."); + Ok(payload) + } + Some(Err(err)) => { + info!(target: "sequencer", ?err, "Seal failed."); + Err(EngineClientError::SealError(err)) + } None => { error!(target: "block_engine", "Failed to receive built payload"); Err(EngineClientError::ResponseError("response channel closed.".to_string())) diff --git a/crates/node/service/src/lib.rs b/crates/node/service/src/lib.rs index d8dc69335c..8e661119ca 100644 --- a/crates/node/service/src/lib.rs +++ b/crates/node/service/src/lib.rs @@ -17,15 +17,17 @@ pub use service::{ mod actors; pub use actors::{ BlockStream, BuildRequest, CancellableContext, Conductor, ConductorClient, ConductorError, - DelayedL1OriginSelectorProvider, DerivationActor, DerivationBuilder, DerivationEngineClient, - DerivationError, DerivationInboundChannels, DerivationState, EngineActor, EngineActorRequest, - EngineClientError, EngineClientResult, EngineConfig, EngineContext, EngineError, - EngineInboundData, EngineRpcRequest, InboundDerivationMessage, L1OriginSelector, - L1OriginSelectorError, L1OriginSelectorProvider, L1WatcherActor, L1WatcherActorError, - NetworkActor, NetworkActorError, NetworkBuilder, NetworkBuilderError, NetworkConfig, - NetworkDriver, NetworkDriverError, NetworkEngineClient, NetworkHandler, NetworkInboundData, - NodeActor, OriginSelector, PipelineBuilder, QueuedDerivationEngineClient, - QueuedEngineRpcClient, QueuedNetworkEngineClient, QueuedSequencerAdminAPIClient, + DelayedL1OriginSelectorProvider, DerivationActor, DerivationActorRequest, + DerivationClientError, DerivationClientResult, DerivationEngineClient, DerivationError, + EngineActor, EngineActorRequest, EngineClientError, EngineClientResult, EngineConfig, + EngineDerivationClient, EngineError, EngineProcessingRequest, EngineProcessor, + EngineRequestReceiver, EngineRpcProcessor, EngineRpcRequest, EngineRpcRequestReceiver, + L1OriginSelector, L1OriginSelectorError, L1OriginSelectorProvider, L1WatcherActor, + L1WatcherActorError, L1WatcherDerivationClient, NetworkActor, NetworkActorError, + NetworkBuilder, NetworkBuilderError, NetworkConfig, NetworkDriver, NetworkDriverError, + NetworkEngineClient, NetworkHandler, NetworkInboundData, NodeActor, OriginSelector, + QueuedDerivationEngineClient, QueuedEngineDerivationClient, QueuedEngineRpcClient, + QueuedL1WatcherDerivationClient, QueuedNetworkEngineClient, QueuedSequencerAdminAPIClient, QueuedSequencerEngineClient, QueuedUnsafePayloadGossipClient, ResetRequest, RollupBoostAdminApiClient, RollupBoostHealthRpcClient, RpcActor, RpcActorError, RpcContext, SealRequest, SequencerActor, SequencerActorError, SequencerAdminQuery, SequencerConfig, diff --git a/crates/node/service/src/service/node.rs b/crates/node/service/src/service/node.rs index 14bc521e79..c5d48e01c9 100644 --- a/crates/node/service/src/service/node.rs +++ b/crates/node/service/src/service/node.rs @@ -1,25 +1,28 @@ //! Contains the [`RollupNode`] implementation. use crate::{ - ConductorClient, DelayedL1OriginSelectorProvider, DerivationActor, DerivationBuilder, - EngineActor, EngineConfig, EngineContext, InteropMode, L1OriginSelector, L1WatcherActor, - NetworkActor, NetworkBuilder, NetworkConfig, NodeActor, NodeMode, QueuedDerivationEngineClient, - QueuedEngineRpcClient, QueuedNetworkEngineClient, QueuedSequencerAdminAPIClient, + ConductorClient, DelayedL1OriginSelectorProvider, DerivationActor, EngineActor, + EngineActorRequest, EngineConfig, EngineProcessor, EngineRpcProcessor, InteropMode, + L1OriginSelector, L1WatcherActor, NetworkActor, NetworkBuilder, NetworkConfig, NodeActor, + NodeMode, QueuedDerivationEngineClient, QueuedEngineDerivationClient, QueuedEngineRpcClient, + QueuedL1WatcherDerivationClient, QueuedNetworkEngineClient, QueuedSequencerAdminAPIClient, QueuedSequencerEngineClient, RollupBoostAdminApiClient, RollupBoostHealthRpcClient, RpcActor, RpcContext, SequencerActor, SequencerConfig, - actors::{ - BlockStream, DerivationInboundChannels, EngineInboundData, NetworkInboundData, - QueuedUnsafePayloadGossipClient, - }, + actors::{BlockStream, NetworkInboundData, QueuedUnsafePayloadGossipClient}, }; use alloy_eips::BlockNumberOrTag; use alloy_provider::RootProvider; use kona_derive::StatefulAttributesBuilder; +use kona_engine::{Engine, EngineState, OpEngineClient}; use kona_genesis::{L1ChainConfig, RollupConfig}; -use kona_providers_alloy::{AlloyChainProvider, AlloyL2ChainProvider, OnlineBeaconClient}; +use kona_protocol::L2BlockInfo; +use kona_providers_alloy::{ + AlloyChainProvider, AlloyL2ChainProvider, OnlineBeaconClient, OnlineBlobProvider, + OnlinePipeline, +}; use kona_rpc::RpcBuilder; use op_alloy_network::Optimism; use std::{ops::Not as _, sync::Arc, time::Duration}; -use tokio::sync::mpsc; +use tokio::sync::{mpsc, watch}; use tokio_util::sync::CancellationToken; const DERIVATION_PROVIDER_CACHE_SIZE: usize = 1024; @@ -69,20 +72,6 @@ impl RollupNode { self.engine_config.mode } - /// Returns a derivation builder for the node. - fn derivation_builder(&self) -> DerivationBuilder { - DerivationBuilder { - l1_provider: self.l1_config.engine_provider.clone(), - l1_trust_rpc: self.l1_config.trust_rpc, - l1_beacon: self.l1_config.beacon_client.clone(), - l2_provider: self.l2_provider.clone(), - l2_trust_rpc: self.l2_trust_rpc, - rollup_config: self.config.clone(), - l1_config: self.l1_config.chain_config.clone(), - interop_mode: self.interop_mode, - } - } - /// Creates a network builder for the node. fn network_builder(&self) -> NetworkBuilder { NetworkBuilder::from(self.p2p_config.clone()) @@ -122,6 +111,93 @@ impl RollupNode { ) } + async fn create_pipeline(&self) -> OnlinePipeline { + // Create the caching L1/L2 EL providers for derivation. + let l1_derivation_provider = AlloyChainProvider::new_with_trust( + self.l1_config.engine_provider.clone(), + DERIVATION_PROVIDER_CACHE_SIZE, + self.l1_config.trust_rpc, + ); + let l2_derivation_provider = AlloyL2ChainProvider::new_with_trust( + self.l2_provider.clone(), + self.config.clone(), + DERIVATION_PROVIDER_CACHE_SIZE, + self.l2_trust_rpc, + ); + + match self.interop_mode { + InteropMode::Polled => OnlinePipeline::new_polled( + self.config.clone(), + self.l1_config.chain_config.clone(), + OnlineBlobProvider::init(self.l1_config.beacon_client.clone()).await, + l1_derivation_provider, + l2_derivation_provider, + ), + InteropMode::Indexed => OnlinePipeline::new_indexed( + self.config.clone(), + self.l1_config.chain_config.clone(), + OnlineBlobProvider::init(self.l1_config.beacon_client.clone()).await, + l1_derivation_provider, + l2_derivation_provider, + ), + } + } + + /// Helper function to assemble the [`EngineActor`] since there are many structs created that + /// are not relevant to other actors or logic. + /// Note: ignoring complex type warning. This type only pertains to this function, so it is + /// better to have the full type here than have to piece it together from multiple type defs. + #[allow(clippy::type_complexity)] + fn create_engine_actor( + &self, + cancellation_token: CancellationToken, + engine_request_rx: mpsc::Receiver, + derivation_client: QueuedEngineDerivationClient, + unsafe_head_tx: watch::Sender, + ) -> Result< + EngineActor< + EngineProcessor< + OpEngineClient>, + QueuedEngineDerivationClient, + >, + EngineRpcProcessor>>, + >, + String, + > { + let engine_state = EngineState::default(); + let (engine_state_tx, engine_state_rx) = watch::channel(engine_state); + let (engine_queue_length_tx, engine_queue_length_rx) = watch::channel(0); + let engine = Engine::new(engine_state, engine_state_tx, engine_queue_length_tx); + + let engine_client = Arc::new(self.engine_config().build_engine_client().map_err(|e| { + error!(target: "service", error = ?e, "engine client build failed"); + format!("Engine client build failed: {e:?}") + })?); + + let engine_processor = EngineProcessor::new( + engine_client.clone(), + self.config.clone(), + derivation_client, + engine, + if self.mode().is_sequencer() { Some(unsafe_head_tx) } else { None }, + ); + + let engine_rpc_processor = EngineRpcProcessor::new( + engine_client.clone(), + engine_client.rollup_boost.clone(), + self.config.clone(), + engine_state_rx, + engine_queue_length_rx, + ); + + Ok(EngineActor::new( + cancellation_token, + engine_request_rx, + engine_processor, + engine_rpc_processor, + )) + } + /// Starts the rollup node service. /// /// The rollup node, in validator mode, listens to two sources of information to sync the L2 @@ -145,28 +221,26 @@ impl RollupNode { // Create a global cancellation token for graceful shutdown of tasks. let cancellation = CancellationToken::new(); - // Create the engine actor. - let ( - EngineInboundData { inbound_request_tx: engine_actor_request_tx, unsafe_head_rx }, - engine, - ) = EngineActor::new(self.engine_config()); + let (derivation_actor_request_tx, derivation_actor_request_rx) = mpsc::channel(1024); + + let (engine_actor_request_tx, engine_actor_request_rx) = mpsc::channel(1024); + let (unsafe_head_tx, unsafe_head_rx) = watch::channel(L2BlockInfo::default()); + + let engine_actor = self.create_engine_actor( + cancellation.clone(), + engine_actor_request_rx, + QueuedEngineDerivationClient::new(derivation_actor_request_tx.clone()), + unsafe_head_tx, + )?; // Create the derivation actor. - let ( - DerivationInboundChannels { - derivation_signal_tx, - l1_head_updates_tx, - l1_finalized_updates_tx, - engine_l2_safe_head_tx, - el_sync_complete_tx, - }, - derivation, - ) = DerivationActor::new( + let derivation = DerivationActor::<_, OnlinePipeline>::new( QueuedDerivationEngineClient { engine_actor_request_tx: engine_actor_request_tx.clone(), }, cancellation.clone(), - self.derivation_builder(), + derivation_actor_request_rx, + self.create_pipeline().await, ); // Create the p2p actor. @@ -184,9 +258,10 @@ impl RollupNode { self.network_builder(), ); + let (l1_head_updates_tx, l1_head_updates_rx) = watch::channel(None); let delayed_l1_provider = DelayedL1OriginSelectorProvider::new( self.l1_config.engine_provider.clone(), - l1_head_updates_tx.subscribe(), + l1_head_updates_rx, self.sequencer_config.l1_conf_delay, ); @@ -219,7 +294,7 @@ impl RollupNode { self.l1_config.engine_provider.clone(), l1_query_rx, l1_head_updates_tx.clone(), - l1_finalized_updates_tx.clone(), + QueuedL1WatcherDerivationClient { derivation_actor_request_tx }, signer, cancellation.clone(), head_stream, @@ -230,10 +305,7 @@ impl RollupNode { let (sequencer_actor, sequencer_admin_client) = if self.mode().is_sequencer() { let sequencer_engine_client = QueuedSequencerEngineClient { engine_actor_request_tx: engine_actor_request_tx.clone(), - unsafe_head_rx: unsafe_head_rx.ok_or( - "unsafe_head_rx is None in sequencer mode. This should never happen." - .to_string(), - )?, + unsafe_head_rx, }; // Create the admin API channel @@ -291,15 +363,7 @@ impl RollupNode { Some((network, ())), Some((l1_watcher, ())), Some((derivation, ())), - Some(( - engine, - EngineContext { - engine_l2_safe_head_tx, - sync_complete_tx: el_sync_complete_tx, - derivation_signal_tx, - cancellation: cancellation.clone(), - } - )), + Some((engine_actor, ())), ] ); Ok(())