diff --git a/crates/node/rpc/src/rollup.rs b/crates/node/rpc/src/rollup.rs index a87766ba63..e047dea577 100644 --- a/crates/node/rpc/src/rollup.rs +++ b/crates/node/rpc/src/rollup.rs @@ -33,8 +33,11 @@ impl RollupRpc { pub const RPC_IDENT: &'static str = "rollup_rpc"; /// Constructs a new [`RollupRpc`] given a sender channel. - pub const fn new(sender: EngineQuerySender, l1_watcher_sender: L1WatcherQuerySender) -> Self { - Self { engine_sender: sender, l1_watcher_sender } + pub const fn new( + engine_sender: EngineQuerySender, + l1_watcher_sender: L1WatcherQuerySender, + ) -> Self { + Self { engine_sender, l1_watcher_sender } } // Important note: we zero-out the fields that can't be derived yet to follow op-node's diff --git a/crates/node/service/src/actors/derivation.rs b/crates/node/service/src/actors/derivation.rs index 43ba6d214c..919d16fd57 100644 --- a/crates/node/service/src/actors/derivation.rs +++ b/crates/node/service/src/actors/derivation.rs @@ -1,6 +1,6 @@ //! [NodeActor] implementation for the derivation sub-routine. -use crate::{Metrics, NodeActor, actors::ActorContext}; +use crate::{Metrics, NodeActor, actors::CancellableContext}; use async_trait::async_trait; use kona_derive::{ ActivationSignal, Pipeline, PipelineError, PipelineErrorKind, ResetError, ResetSignal, Signal, @@ -21,28 +21,53 @@ use tokio_util::sync::{CancellationToken, WaitForCancellationFuture}; /// to the [NodeActor] responsible for the execution sub-routine. #[derive(Debug)] pub struct DerivationActor

+where + P: Pipeline + SignalReceiver, +{ + /// The state for the derivation actor. + state: DerivationState

, + /// The sender for derived [`OpAttributesWithParent`]s produced by the actor. + attributes_out: mpsc::Sender, + /// The reset request sender, used to handle [`PipelineErrorKind::Reset`] events and forward + /// them to the engine. + reset_request_tx: mpsc::Sender<()>, +} + +/// The state for the derivation actor. +#[derive(Debug)] +pub struct DerivationState

where P: Pipeline + SignalReceiver, { /// The derivation pipeline. - pipeline: P, + pub pipeline: P, /// A flag indicating whether or not derivation is idle. Derivation is considered idle when it /// has yielded to wait for more data on the DAL. - derivation_idle: bool, + pub derivation_idle: bool, /// A flag indicating whether or not derivation is waiting for a signal. When waiting for a /// signal, derivation cannot process any incoming events. - waiting_for_signal: bool, + pub waiting_for_signal: bool, +} + +/// The outbound channels for the derivation actor. +#[derive(Debug)] +pub struct DerivationOutboundChannels { + /// The receiver for derived [`OpAttributesWithParent`]s produced by the actor. + pub attributes_out: mpsc::Receiver, + /// The receiver for reset requests, used to handle [`PipelineErrorKind::Reset`] events and + /// forward them to the engine. + pub reset_request_tx: mpsc::Receiver<()>, } /// The communication context used by the derivation actor. #[derive(Debug)] pub struct DerivationContext { /// The receiver for L1 head update notifications. - l1_head_updates: watch::Receiver>, + pub l1_head_updates: watch::Receiver>, /// The receiver for L2 safe head update notifications. - engine_l2_safe_head: watch::Receiver, + pub engine_l2_safe_head: watch::Receiver, /// A receiver that tells derivation to begin. Completing EL sync consumes the instance. - el_sync_complete_rx: oneshot::Receiver<()>, + pub el_sync_complete_rx: oneshot::Receiver<()>, /// A receiver that sends a [`Signal`] to the derivation pipeline. /// /// The derivation actor steps over the derivation pipeline to generate @@ -61,49 +86,24 @@ pub struct DerivationContext { /// occurs. /// /// Specs: - derivation_signal_rx: mpsc::Receiver, - /// The sender for derived [`OpAttributesWithParent`]s produced by the actor. - attributes_out: mpsc::Sender, - /// The reset request sender, used to handle [`PipelineErrorKind::Reset`] events and forward - /// them to the engine. - reset_request_tx: mpsc::Sender<()>, + pub derivation_signal_rx: mpsc::Receiver, /// The cancellation token, shared between all tasks. - cancellation: CancellationToken, + pub cancellation: CancellationToken, } -impl ActorContext for DerivationContext { +impl CancellableContext for DerivationContext { fn cancelled(&self) -> WaitForCancellationFuture<'_> { self.cancellation.cancelled() } } -impl

DerivationActor

+impl

DerivationState

where P: Pipeline + SignalReceiver, { - /// Creates a new instance of the [DerivationActor]. - #[allow(clippy::too_many_arguments)] - pub const fn new( - pipeline: P, - engine_l2_safe_head: watch::Receiver, - el_sync_complete_rx: oneshot::Receiver<()>, - derivation_signal_rx: mpsc::Receiver, - l1_head_updates: watch::Receiver>, - attributes_out: mpsc::Sender, - reset_request_tx: mpsc::Sender<()>, - cancellation: CancellationToken, - ) -> (Self, DerivationContext) { - let actor = Self { pipeline, derivation_idle: true, waiting_for_signal: false }; - let context = DerivationContext { - l1_head_updates, - engine_l2_safe_head, - el_sync_complete_rx, - derivation_signal_rx, - attributes_out, - reset_request_tx, - cancellation, - }; - (actor, context) + /// Creates a new instance of the [DerivationState]. + pub const fn new(pipeline: P) -> Self { + Self { pipeline, derivation_idle: true, waiting_for_signal: false } } /// Handles a [`Signal`] received over the derivation signal receiver channel. @@ -169,6 +169,7 @@ where .pipeline .origin() .ok_or(PipelineError::MissingOrigin.crit())?; + self.pipeline .signal( ActivationSignal { @@ -304,13 +305,40 @@ where } } +impl

DerivationActor

+where + P: Pipeline + SignalReceiver, +{ + /// Creates a new instance of the [DerivationActor]. + #[allow(clippy::too_many_arguments)] + pub fn new(state: DerivationState

) -> (DerivationOutboundChannels, Self) { + let (derived_payload_tx, derived_payload_rx) = mpsc::channel(16); + let (reset_request_tx, reset_request_rx) = mpsc::channel(16); + let actor = Self { state, attributes_out: derived_payload_tx, reset_request_tx }; + + ( + DerivationOutboundChannels { + attributes_out: derived_payload_rx, + reset_request_tx: reset_request_rx, + }, + actor, + ) + } +} + #[async_trait] impl

NodeActor for DerivationActor

where - P: Pipeline + SignalReceiver + Send + Sync, + P: Pipeline + SignalReceiver + Send + Sync + 'static, { type Error = DerivationError; - type Context = DerivationContext; + type InboundData = DerivationContext; + type State = DerivationState

; + type OutboundData = DerivationOutboundChannels; + + fn build(config: Self::State) -> (Self::OutboundData, Self) { + Self::new(config) + } async fn start( mut self, @@ -319,10 +347,8 @@ where mut engine_l2_safe_head, mut el_sync_complete_rx, mut derivation_signal_rx, - attributes_out, - reset_request_tx, cancellation, - }: Self::Context, + }: Self::InboundData, ) -> Result<(), Self::Error> { loop { select! { @@ -345,8 +371,8 @@ where return Err(DerivationError::SignalReceiveFailed); }; - self.signal(signal).await; - self.waiting_for_signal = false; + self.state.signal(signal).await; + self.state.waiting_for_signal = false; } msg = l1_head_updates.changed() => { if let Err(err) = msg { @@ -358,15 +384,15 @@ where return Ok(()); } - self.process(InboundDerivationMessage::NewDataAvailable, &mut engine_l2_safe_head, &el_sync_complete_rx, &attributes_out, &reset_request_tx).await?; + self.state.process(InboundDerivationMessage::NewDataAvailable, &mut engine_l2_safe_head, &el_sync_complete_rx, &self.attributes_out, &self.reset_request_tx).await?; } _ = engine_l2_safe_head.changed() => { - self.process(InboundDerivationMessage::SafeHeadUpdated, &mut engine_l2_safe_head, &el_sync_complete_rx, &attributes_out, &reset_request_tx).await?; + self.state.process(InboundDerivationMessage::SafeHeadUpdated, &mut engine_l2_safe_head, &el_sync_complete_rx, &self.attributes_out, &self.reset_request_tx).await?; } _ = &mut el_sync_complete_rx, if !el_sync_complete_rx.is_terminated() => { info!(target: "derivation", "Engine finished syncing, starting derivation."); // Optimistically process the first message. - self.process(InboundDerivationMessage::NewDataAvailable, &mut engine_l2_safe_head, &el_sync_complete_rx, &attributes_out, &reset_request_tx).await?; + self.state.process(InboundDerivationMessage::NewDataAvailable, &mut engine_l2_safe_head, &el_sync_complete_rx, &self.attributes_out, &self.reset_request_tx).await?; } } } diff --git a/crates/node/service/src/actors/engine/actor.rs b/crates/node/service/src/actors/engine/actor.rs index da82bc320c..e5c3cf3dcc 100644 --- a/crates/node/service/src/actors/engine/actor.rs +++ b/crates/node/service/src/actors/engine/actor.rs @@ -5,11 +5,11 @@ use alloy_rpc_types_engine::JwtSecret; use async_trait::async_trait; use kona_derive::{ResetSignal, Signal}; use kona_engine::{ - ConsolidateTask, Engine, EngineClient, EngineQueries, EngineState, EngineTask, EngineTaskError, - InsertUnsafeTask, + ConsolidateTask, Engine, EngineClient, EngineQueries, EngineState as InnerEngineState, + EngineTask, EngineTaskError, InsertUnsafeTask, }; use kona_genesis::RollupConfig; -use kona_protocol::{BlockInfo, L2BlockInfo, OpAttributesWithParent}; +use kona_protocol::{L2BlockInfo, OpAttributesWithParent}; use kona_sources::RuntimeConfig; use op_alloy_provider::ext::engine::OpEngineApi; use op_alloy_rpc_types_engine::OpExecutionPayloadEnvelope; @@ -21,50 +21,68 @@ use tokio::{ use tokio_util::sync::{CancellationToken, WaitForCancellationFuture}; use url::Url; -use crate::{NodeActor, actors::ActorContext}; +use crate::{NodeActor, actors::CancellableContext}; /// The [`EngineActor`] is responsible for managing the operations sent to the execution layer's /// Engine API. To accomplish this, it uses the [`Engine`] task queue to order Engine API /// interactions based off of the [`Ord`] implementation of [`EngineTask`]. #[derive(Debug)] pub struct EngineActor { + /// The [`EngineActorState`] used to build the actor. + state: EngineActorState, + /// The receiver for L2 safe head update notifications. + engine_l2_safe_head_tx: watch::Sender, + /// A channel to send a signal that EL sync has completed. Informs the derivation actor to + /// start. Because the EL sync state machine within [`InnerEngineState`] can only complete + /// once, this channel is consumed after the first successful send. Future cases where EL + /// sync is re-triggered can occur, but we will not block derivation on it. + sync_complete_tx: oneshot::Sender<()>, + /// A way for the engine actor to send a [`Signal`] back to the derivation actor. + derivation_signal_tx: mpsc::Sender, +} + +/// The outbound data for the [`EngineActor`]. +#[derive(Debug)] +pub struct EngineOutboundData { + /// A channel to receive L2 safe head update notifications. + pub engine_l2_safe_head_rx: watch::Receiver, + /// A channel to receive a signal that EL sync has completed. + pub sync_complete_rx: oneshot::Receiver<()>, + /// A channel to send a [`Signal`] back to the derivation actor. + pub derivation_signal_rx: mpsc::Receiver, +} + +/// The configuration for the [`EngineActor`]. +#[derive(Debug)] +pub struct EngineActorState { /// The [`RollupConfig`] used to build tasks. - config: Arc, + pub rollup: Arc, /// An [`EngineClient`] used for creating engine tasks. - client: Arc, + pub client: Arc, /// The [`Engine`] task queue. - engine: Engine, - /// The [`L2Finalizer`], used to finalize L2 blocks. - finalizer: L2Finalizer, - /// Handler for inbound queries to the engine. - inbound_queries: Option>, + pub engine: Engine, } /// The communication context used by the engine actor. #[derive(Debug)] pub struct EngineContext { - /// The receiver for L2 safe head update notifications. - engine_l2_safe_head_tx: watch::Sender, - /// A channel to send a signal that EL sync has completed. Informs the derivation actor to - /// start. Because the EL sync state machine within [`EngineState`] can only complete once, - /// this channel is consumed after the first successful send. Future cases where EL sync is - /// re-triggered can occur, but we will not block derivation on it. - sync_complete_tx: oneshot::Sender<()>, - /// A way for the engine actor to send a [`Signal`] back to the derivation actor. - derivation_signal_tx: mpsc::Sender, /// A channel to receive [`RuntimeConfig`] from the runtime actor. - runtime_config_rx: mpsc::Receiver, + pub runtime_config_rx: Option>, /// A channel to receive [`OpAttributesWithParent`] from the derivation actor. - attributes_rx: mpsc::Receiver, + pub attributes_rx: mpsc::Receiver, /// A channel to receive [`OpExecutionPayloadEnvelope`] from the network actor. - unsafe_block_rx: mpsc::Receiver, + pub unsafe_block_rx: mpsc::Receiver, /// A channel to receive reset requests. - reset_request_rx: mpsc::Receiver<()>, + pub reset_request_rx: mpsc::Receiver<()>, + /// Handler for inbound queries to the engine. + pub inbound_queries: mpsc::Receiver, /// The cancellation token, shared between all tasks. - cancellation: CancellationToken, + pub cancellation: CancellationToken, + /// The [`L2Finalizer`], used to finalize L2 blocks. + pub finalizer: L2Finalizer, } -impl ActorContext for EngineContext { +impl CancellableContext for EngineContext { fn cancelled(&self) -> WaitForCancellationFuture<'_> { self.cancellation.cancelled() } @@ -72,53 +90,60 @@ impl ActorContext for EngineContext { impl EngineActor { /// Constructs a new [`EngineActor`] from the params. - #[allow(clippy::too_many_arguments)] - pub fn new( - config: Arc, - client: EngineClient, - engine: Engine, - engine_l2_safe_head_tx: watch::Sender, - sync_complete_tx: oneshot::Sender<()>, - derivation_signal_tx: mpsc::Sender, - runtime_config_rx: mpsc::Receiver, - attributes_rx: mpsc::Receiver, - unsafe_block_rx: mpsc::Receiver, - reset_request_rx: mpsc::Receiver<()>, - finalized_block_rx: watch::Receiver>, - inbound_queries: Option>, - cancellation: CancellationToken, - ) -> (Self, EngineContext) { - let client = Arc::new(client); + pub fn new(initial_state: EngineActorState) -> (EngineOutboundData, Self) { + let (derivation_signal_tx, derivation_signal_rx) = mpsc::channel(16); + let (engine_l2_safe_head_tx, engine_l2_safe_head_rx) = + watch::channel(L2BlockInfo::default()); + let (sync_complete_tx, sync_complete_rx) = oneshot::channel(); + let actor = Self { - config, - client: Arc::clone(&client), - engine, - finalizer: L2Finalizer::new(finalized_block_rx, client), - inbound_queries, - }; - let context = EngineContext { + state: initial_state, engine_l2_safe_head_tx, sync_complete_tx, derivation_signal_tx, - runtime_config_rx, - attributes_rx, - unsafe_block_rx, - reset_request_rx, - cancellation, }; - (actor, context) + + let outbound_data = + EngineOutboundData { engine_l2_safe_head_rx, sync_complete_rx, derivation_signal_rx }; + + (outbound_data, actor) } + /// Starts a task to handle engine queries. + fn start_query_task( + &self, + mut inbound_query_channel: tokio::sync::mpsc::Receiver, + ) -> JoinHandle<()> { + let state_recv = self.state.engine.subscribe(); + let engine_client = self.state.client.clone(); + let rollup_config = self.state.rollup.clone(); + + tokio::spawn(async move { + while let Some(req) = inbound_query_channel.recv().await { + { + trace!(target: "engine", ?req, "Received engine query request."); + + if let Err(e) = req.handle(&state_recv, &engine_client, &rollup_config).await { + warn!(target: "engine", err = ?e, "Failed to handle engine query request."); + } + } + } + }) + } +} + +impl EngineActorState { /// Resets the inner [`Engine`] and propagates the reset to the derivation actor. pub async fn reset( &mut self, derivation_signal_tx: &mpsc::Sender, engine_l2_safe_head_tx: &watch::Sender, + finalizer: &mut L2Finalizer, cancellation: &CancellationToken, ) -> Result<(), EngineError> { // Reset the engine. let (l2_safe_head, l1_origin, system_config) = - self.engine.reset(self.client.clone(), &self.config).await?; + self.engine.reset(self.client.clone(), &self.rollup).await?; // Signal the derivation actor to reset. let signal = ResetSignal { l2_safe_head, l1_origin, system_config: Some(system_config) }; @@ -135,7 +160,7 @@ impl EngineActor { self.maybe_update_safe_head(engine_l2_safe_head_tx); // Clear the queue of L2 blocks awaiting finalization. - self.finalizer.clear(); + finalizer.clear(); Ok(()) } @@ -146,6 +171,7 @@ impl EngineActor { derivation_signal_tx: &mpsc::Sender, sync_complete_tx: &mut Option>, engine_l2_safe_head_tx: &watch::Sender, + finalizer: &mut L2Finalizer, cancellation: &CancellationToken, ) -> Result<(), EngineError> { match self.engine.drain().await { @@ -154,7 +180,8 @@ impl EngineActor { } Err(EngineTaskError::Reset(err)) => { warn!(target: "engine", ?err, "Received reset request"); - self.reset(derivation_signal_tx, engine_l2_safe_head_tx, cancellation).await?; + self.reset(derivation_signal_tx, engine_l2_safe_head_tx, finalizer, cancellation) + .await?; } Err(EngineTaskError::Flush(err)) => { // This error is encountered when the payload is marked INVALID @@ -188,6 +215,7 @@ impl EngineActor { derivation_signal_tx, engine_l2_safe_head_tx, sync_complete_tx, + finalizer, cancellation, ) .await?; @@ -201,6 +229,7 @@ impl EngineActor { derivation_signal_tx: &mpsc::Sender, engine_l2_safe_head_tx: &watch::Sender, sync_complete_tx: &mut Option>, + finalizer: &mut L2Finalizer, cancellation: &CancellationToken, ) -> Result<(), EngineError> { if self.engine.state().el_sync_finished { @@ -210,7 +239,8 @@ impl EngineActor { // If the sync status is finished, we can reset the engine and start derivation. info!(target: "engine", "Performing initial engine reset"); - self.reset(derivation_signal_tx, engine_l2_safe_head_tx, cancellation).await?; + self.reset(derivation_signal_tx, engine_l2_safe_head_tx, finalizer, cancellation) + .await?; sync_complete_tx.send(()).ok(); } @@ -231,61 +261,41 @@ impl EngineActor { trace!(target: "engine", ?sent, "Attempted L2 Safe Head Update"); } - /// Starts a task to handle engine queries. - fn start_query_task( - &self, - mut inbound_query_channel: tokio::sync::mpsc::Receiver, - ) -> JoinHandle<()> { - let state_recv = self.engine.subscribe(); - let engine_client = self.client.clone(); - let rollup_config = self.config.clone(); - - tokio::spawn(async move { - while let Some(req) = inbound_query_channel.recv().await { - { - trace!(target: "engine", ?req, "Received engine query request."); - - if let Err(e) = req.handle(&state_recv, &engine_client, &rollup_config).await { - warn!(target: "engine", err = ?e, "Failed to handle engine query request."); - } - } - } - }) - } - async fn process( &mut self, msg: InboundEngineMessage, derivation_signal_tx: &mpsc::Sender, engine_l2_safe_head_tx: &watch::Sender, + finalizer: &mut L2Finalizer, cancellation: &CancellationToken, ) -> Result<(), EngineError> { match msg { InboundEngineMessage::ResetRequest => { warn!(target: "engine", "Received reset request"); - self.reset(derivation_signal_tx, engine_l2_safe_head_tx, cancellation).await?; + self.reset(derivation_signal_tx, engine_l2_safe_head_tx, finalizer, cancellation) + .await?; } InboundEngineMessage::UnsafeBlockReceived(envelope) => { let task = EngineTask::InsertUnsafe(InsertUnsafeTask::new( - Arc::clone(&self.client), - Arc::clone(&self.config), + self.client.clone(), + self.rollup.clone(), *envelope, )); self.engine.enqueue(task); } InboundEngineMessage::DerivedAttributesReceived(attributes) => { - self.finalizer.enqueue_for_finalization(&attributes); + finalizer.enqueue_for_finalization(&attributes); let task = EngineTask::Consolidate(ConsolidateTask::new( - Arc::clone(&self.client), - Arc::clone(&self.config), + self.client.clone(), + Arc::clone(&self.rollup), *attributes, true, )); self.engine.enqueue(task); } InboundEngineMessage::RuntimeConfigUpdate(config) => { - let client = Arc::clone(&self.client); + let client = self.client.clone(); tokio::task::spawn(async move { debug!(target: "engine", config = ?config, "Received runtime config"); let recommended = config.recommended_protocol_version; @@ -303,7 +313,7 @@ impl EngineActor { InboundEngineMessage::NewFinalizedL1Block => { // Attempt to finalize any L2 blocks that are contained within the finalized L1 // chain. - self.finalizer.try_finalize_next(&mut self.engine).await; + finalizer.try_finalize_next(&mut self.engine).await; } } @@ -314,49 +324,51 @@ impl EngineActor { #[async_trait] impl NodeActor for EngineActor { type Error = EngineError; - type Context = EngineContext; + type InboundData = EngineContext; + type OutboundData = EngineOutboundData; + type State = EngineActorState; + + fn build(initial_state: Self::State) -> (Self::OutboundData, Self) { + Self::new(initial_state) + } async fn start( mut self, EngineContext { - engine_l2_safe_head_tx, - sync_complete_tx, - derivation_signal_tx, + mut finalizer, mut runtime_config_rx, mut attributes_rx, mut unsafe_block_rx, mut reset_request_rx, cancellation, - }: Self::Context, + inbound_queries, + }: Self::InboundData, ) -> Result<(), Self::Error> { // Start the engine query server in a separate task to avoid blocking the main task. - let handle = std::mem::take(&mut self.inbound_queries) - .map(|inbound_query_channel| self.start_query_task(inbound_query_channel)); + let handle = self.start_query_task(inbound_queries); // The sync complete tx is consumed after the first successful send. Hence we need to wrap // it in an `Option` to ensure we satisfy the borrow checker. - let mut sync_complete_tx = Some(sync_complete_tx); + let mut sync_complete_tx = Some(self.sync_complete_tx); loop { // Attempt to drain all outstanding tasks from the engine queue before adding new ones. - self.drain( - &derivation_signal_tx, - &mut sync_complete_tx, - &engine_l2_safe_head_tx, - &cancellation, - ) - .await?; + self.state + .drain( + &self.derivation_signal_tx, + &mut sync_complete_tx, + &self.engine_l2_safe_head_tx, + &mut finalizer, + &cancellation, + ) + .await?; tokio::select! { biased; _ = cancellation.cancelled() => { - warn!(target: "engine", "EngineActor received shutdown signal."); - - if let Some(handle) = handle { - warn!(target: "engine", "Shutting down engine query task."); - handle.abort(); - } + warn!(target: "engine", "EngineActor received shutdown signal. Shutting down engine query task."); + handle.abort(); return Ok(()); } @@ -366,7 +378,7 @@ impl NodeActor for EngineActor { cancellation.cancel(); return Err(EngineError::ChannelClosed); } - self.process(InboundEngineMessage::ResetRequest, &derivation_signal_tx, &engine_l2_safe_head_tx, &cancellation).await?; + self.state.process(InboundEngineMessage::ResetRequest, &self.derivation_signal_tx, &self.engine_l2_safe_head_tx, &mut finalizer, &cancellation).await?; } unsafe_block = unsafe_block_rx.recv() => { let Some(envelope) = unsafe_block else { @@ -374,7 +386,7 @@ impl NodeActor for EngineActor { cancellation.cancel(); return Err(EngineError::ChannelClosed); }; - self.process(InboundEngineMessage::UnsafeBlockReceived(envelope.into()), &derivation_signal_tx, &engine_l2_safe_head_tx, &cancellation).await?; + self.state.process(InboundEngineMessage::UnsafeBlockReceived(envelope.into()), &self.derivation_signal_tx, &self.engine_l2_safe_head_tx, &mut finalizer, &cancellation).await?; } attributes = attributes_rx.recv() => { let Some(attributes) = attributes else { @@ -382,23 +394,23 @@ impl NodeActor for EngineActor { cancellation.cancel(); return Err(EngineError::ChannelClosed); }; - self.process(InboundEngineMessage::DerivedAttributesReceived(attributes.into()), &derivation_signal_tx, &engine_l2_safe_head_tx, &cancellation).await?; + self.state.process(InboundEngineMessage::DerivedAttributesReceived(attributes.into()), &self.derivation_signal_tx, &self.engine_l2_safe_head_tx, &mut finalizer, &cancellation).await?; } - config = runtime_config_rx.recv() => { + config = runtime_config_rx.as_mut().map(|rx| rx.recv()).unwrap(), if runtime_config_rx.is_some() => { let Some(config) = config else { error!(target: "engine", "Runtime config receiver closed unexpectedly"); cancellation.cancel(); return Err(EngineError::ChannelClosed); }; - self.process(InboundEngineMessage::RuntimeConfigUpdate(config.into()), &derivation_signal_tx, &engine_l2_safe_head_tx, &cancellation).await?; + self.state.process(InboundEngineMessage::RuntimeConfigUpdate(config.into()), &self.derivation_signal_tx, &self.engine_l2_safe_head_tx, &mut finalizer, &cancellation).await?; } - msg = self.finalizer.new_finalized_block() => { + msg = finalizer.new_finalized_block() => { if let Err(err) = msg { error!(target: "engine", ?err, "L1 finalized block receiver closed unexpectedly"); cancellation.cancel(); return Err(EngineError::ChannelClosed); } - self.process(InboundEngineMessage::NewFinalizedL1Block, &derivation_signal_tx, &engine_l2_safe_head_tx, &cancellation).await?; + self.state.process(InboundEngineMessage::NewFinalizedL1Block, &self.derivation_signal_tx, &self.engine_l2_safe_head_tx, &mut finalizer, &cancellation).await?; } } } @@ -439,7 +451,7 @@ impl EngineLauncher { /// Launches the [`Engine`]. Returns the [`Engine`] and a channel to receive engine state /// updates. pub fn launch(self) -> Engine { - let state = EngineState::default(); + let state = InnerEngineState::default(); let (engine_state_send, _) = tokio::sync::watch::channel(state); Engine::new(state, engine_state_send) } diff --git a/crates/node/service/src/actors/engine/mod.rs b/crates/node/service/src/actors/engine/mod.rs index bfbd764845..9efd20506f 100644 --- a/crates/node/service/src/actors/engine/mod.rs +++ b/crates/node/service/src/actors/engine/mod.rs @@ -1,7 +1,10 @@ //! The [`EngineActor`] and its components. mod actor; -pub use actor::{EngineActor, EngineContext, EngineLauncher, InboundEngineMessage}; +pub use actor::{ + EngineActor, EngineActorState, EngineContext, EngineLauncher, EngineOutboundData, + InboundEngineMessage, +}; mod error; pub use error::EngineError; diff --git a/crates/node/service/src/actors/l1_watcher_rpc.rs b/crates/node/service/src/actors/l1_watcher_rpc.rs index 437f4772d9..302f8d80ce 100644 --- a/crates/node/service/src/actors/l1_watcher_rpc.rs +++ b/crates/node/service/src/actors/l1_watcher_rpc.rs @@ -1,7 +1,7 @@ //! [`NodeActor`] implementation for an L1 chain watcher that polls for L1 block updates over HTTP //! RPC. -use crate::{NodeActor, actors::ActorContext}; +use crate::{NodeActor, actors::CancellableContext}; use alloy_eips::{BlockId, BlockNumberOrTag}; use alloy_primitives::{Address, B256}; use alloy_provider::{Provider, RootProvider}; @@ -29,29 +29,46 @@ use tokio_util::sync::{CancellationToken, WaitForCancellationFuture}; /// An L1 chain watcher that checks for L1 block updates over RPC. #[derive(Debug)] pub struct L1WatcherRpc { + state: L1WatcherRpcState, + /// The latest L1 head block. + latest_head: watch::Sender>, + /// The latest L1 finalized block. + latest_finalized: watch::Sender>, + /// The block signer sender. + block_signer_sender: mpsc::Sender

, +} + +/// The configuration for the L1 watcher actor. +#[derive(Debug)] +pub struct L1WatcherRpcState { /// The [`RollupConfig`] to tell if ecotone is active. /// This is used to determine if the L1 watcher should check for unsafe block signer updates. - config: Arc, + pub rollup: Arc, /// The L1 provider. - l1_provider: RootProvider, + pub l1_provider: RootProvider, +} + +/// The outbound channels for the L1 watcher actor. +#[derive(Debug)] +pub struct L1WatcherRpcOutboundChannels { + /// The latest L1 head block. + pub latest_head: watch::Receiver>, + /// The latest L1 finalized block. + pub latest_finalized: watch::Receiver>, + /// The block signer sender. + pub block_signer_sender: mpsc::Receiver
, } /// The communication context used by the L1 watcher actor. #[derive(Debug)] pub struct L1WatcherRpcContext { - /// The block signer sender. - block_signer_sender: mpsc::Sender
, /// The inbound queries to the L1 watcher. - inbound_queries: tokio::sync::mpsc::Receiver, - /// The latest L1 head block. - latest_head: watch::Sender>, - /// The latest L1 finalized block. - latest_finalized: watch::Sender>, + pub inbound_queries: tokio::sync::mpsc::Receiver, /// The cancellation token, shared between all tasks. - cancellation: CancellationToken, + pub cancellation: CancellationToken, } -impl ActorContext for L1WatcherRpcContext { +impl CancellableContext for L1WatcherRpcContext { fn cancelled(&self) -> WaitForCancellationFuture<'_> { self.cancellation.cancelled() } @@ -59,29 +76,31 @@ impl ActorContext for L1WatcherRpcContext { impl L1WatcherRpc { /// Creates a new [`L1WatcherRpc`] instance. - pub const fn new( - config: Arc, - l1_provider: RootProvider, - head_updates: watch::Sender>, - finalized_updates: watch::Sender>, - block_signer_sender: mpsc::Sender
, - cancellation: CancellationToken, - inbound_queries: mpsc::Receiver, - ) -> (Self, L1WatcherRpcContext) { - let actor = Self { config, l1_provider }; - let context = L1WatcherRpcContext { - block_signer_sender, - inbound_queries, - latest_head: head_updates, - latest_finalized: finalized_updates, - cancellation, + pub fn new(config: L1WatcherRpcState) -> (L1WatcherRpcOutboundChannels, Self) { + let (head_updates_tx, head_updates_rx) = watch::channel(None); + let (block_signer_tx, block_signer_rx) = mpsc::channel(16); + let (finalized_updates_tx, finalized_updates_rx) = watch::channel(None); + + let actor = Self { + state: config, + latest_head: head_updates_tx, + latest_finalized: finalized_updates_tx, + block_signer_sender: block_signer_tx, }; - (actor, context) + ( + L1WatcherRpcOutboundChannels { + latest_head: head_updates_rx, + latest_finalized: finalized_updates_rx, + block_signer_sender: block_signer_rx, + }, + actor, + ) } /// Fetches logs for the given block hash. async fn fetch_logs(&self, block_hash: B256) -> Result, L1WatcherRpcError> { let logs = self + .state .l1_provider .get_logs(&alloy_rpc_types_eth::Filter::new().select(block_hash)) .await?; @@ -97,8 +116,8 @@ impl L1WatcherRpc { ) -> JoinHandle<()> { // Start the inbound query processor in a separate task to avoid blocking the main task. // We can cheaply clone the l1 provider here because it is an Arc. - let l1_provider = self.l1_provider.clone(); - let rollup_config = self.config.clone(); + let l1_provider = self.state.l1_provider.clone(); + let rollup_config = self.state.rollup.clone(); tokio::spawn(async move { while let Some(query) = inbound_queries.recv().await { @@ -153,30 +172,33 @@ impl L1WatcherRpc { #[async_trait] impl NodeActor for L1WatcherRpc { type Error = L1WatcherRpcError; - type Context = L1WatcherRpcContext; + type InboundData = L1WatcherRpcContext; + type OutboundData = L1WatcherRpcOutboundChannels; + type State = L1WatcherRpcState; + + fn build(config: Self::State) -> (Self::OutboundData, Self) { + Self::new(config) + } async fn start( mut self, - L1WatcherRpcContext { - inbound_queries, - latest_head, - latest_finalized, - block_signer_sender, - cancellation, - }: Self::Context, + L1WatcherRpcContext { inbound_queries, cancellation }: Self::InboundData, ) -> Result<(), Self::Error> { - let mut head_stream = - BlockStream::new(&self.l1_provider, BlockNumberOrTag::Latest, Duration::from_secs(13)) - .into_stream(); + let mut head_stream = BlockStream::new( + &self.state.l1_provider, + BlockNumberOrTag::Latest, + Duration::from_secs(13), + ) + .into_stream(); let mut finalized_stream = BlockStream::new( - &self.l1_provider, + &self.state.l1_provider, BlockNumberOrTag::Finalized, Duration::from_secs(60), ) .into_stream(); let inbound_query_processor = - self.start_query_processor(inbound_queries, latest_head.subscribe()); + self.start_query_processor(inbound_queries, self.latest_head.subscribe()); // Start the main processing loop. loop { @@ -199,16 +221,16 @@ impl NodeActor for L1WatcherRpc { } Some(head_block_info) => { // Send the head update event to all consumers. - latest_head.send_replace(Some(head_block_info)); + self.latest_head.send_replace(Some(head_block_info)); // For each log, attempt to construct a `SystemConfigLog`. // Build the `SystemConfigUpdate` from the log. // If the update is an Unsafe block signer update, send the address // to the block signer sender. let logs = self.fetch_logs(head_block_info.hash).await?; - let ecotone_active = self.config.is_ecotone_active(head_block_info.timestamp); + let ecotone_active = self.state.rollup.is_ecotone_active(head_block_info.timestamp); for log in logs { - if log.address() != self.config.l1_system_config_address { + if log.address() != self.state.rollup.l1_system_config_address { continue; // Skip logs not related to the system config. } @@ -218,7 +240,7 @@ impl NodeActor for L1WatcherRpc { target: "l1_watcher", "Unsafe block signer update: {unsafe_block_signer}" ); - if let Err(e) = block_signer_sender.send(unsafe_block_signer).await { + if let Err(e) = self.block_signer_sender.send(unsafe_block_signer).await { error!( target: "l1_watcher", "Error sending unsafe block signer update: {e}" @@ -233,7 +255,7 @@ impl NodeActor for L1WatcherRpc { return Err(L1WatcherRpcError::StreamEnded); } Some(finalized_block_info) => { - latest_finalized.send_replace(Some(finalized_block_info)); + self.latest_finalized.send_replace(Some(finalized_block_info)); } } } diff --git a/crates/node/service/src/actors/mod.rs b/crates/node/service/src/actors/mod.rs index 03c9f09265..7a318526c5 100644 --- a/crates/node/service/src/actors/mod.rs +++ b/crates/node/service/src/actors/mod.rs @@ -3,20 +3,21 @@ //! [NodeActor]: super::NodeActor mod traits; -pub use traits::{ActorContext, NodeActor}; +pub use traits::{CancellableContext, NodeActor}; mod runtime; -pub use runtime::{RuntimeActor, RuntimeContext, RuntimeLauncher}; +pub use runtime::{RuntimeActor, RuntimeContext, RuntimeOutboundData, RuntimeState}; mod engine; pub use engine::{ - EngineActor, EngineContext, EngineError, EngineLauncher, InboundEngineMessage, L2Finalizer, + EngineActor, EngineActorState, EngineContext, EngineError, EngineLauncher, EngineOutboundData, + InboundEngineMessage, L2Finalizer, }; mod supervisor; pub use supervisor::{ SupervisorActor, SupervisorActorContext, SupervisorActorError, SupervisorExt, - SupervisorRpcServerExt, + SupervisorOutboundData, SupervisorRpcServerExt, }; mod rpc; @@ -24,11 +25,15 @@ pub use rpc::{RpcActor, RpcActorError, RpcContext}; mod derivation; pub use derivation::{ - DerivationActor, DerivationContext, DerivationError, InboundDerivationMessage, + DerivationActor, DerivationContext, DerivationError, DerivationOutboundChannels, + DerivationState, InboundDerivationMessage, }; mod l1_watcher_rpc; -pub use l1_watcher_rpc::{L1WatcherRpc, L1WatcherRpcContext, L1WatcherRpcError}; +pub use l1_watcher_rpc::{ + L1WatcherRpc, L1WatcherRpcContext, L1WatcherRpcError, L1WatcherRpcOutboundChannels, + L1WatcherRpcState, +}; mod network; -pub use network::{NetworkActor, NetworkActorError, NetworkContext}; +pub use network::{NetworkActor, NetworkActorError, NetworkContext, NetworkOutboundData}; diff --git a/crates/node/service/src/actors/network.rs b/crates/node/service/src/actors/network.rs index 947bcb61cd..4c0f0d776c 100644 --- a/crates/node/service/src/actors/network.rs +++ b/crates/node/service/src/actors/network.rs @@ -1,6 +1,6 @@ //! Network Actor -use crate::{NodeActor, actors::ActorContext}; +use crate::{NodeActor, actors::CancellableContext}; use alloy_primitives::Address; use async_trait::async_trait; use derive_more::Debug; @@ -42,31 +42,37 @@ use tokio_util::sync::{CancellationToken, WaitForCancellationFuture}; pub struct NetworkActor { /// Network driver driver: Network, + /// The channel for sending unsafe blocks from the network actor. + blocks: mpsc::Sender, +} + +/// The outbound data for the network actor. +#[derive(Debug)] +pub struct NetworkOutboundData { + /// The unsafe block received from the network. + pub unsafe_block: mpsc::Receiver, } impl NetworkActor { /// Constructs a new [`NetworkActor`] given the [`Network`] - pub const fn new( - driver: Network, - blocks: mpsc::Sender, - signer: mpsc::Receiver
, - cancellation: CancellationToken, - ) -> (Self, NetworkContext) { - let actor = Self { driver }; - let context = NetworkContext { blocks, signer, cancellation }; - (actor, context) + pub fn new(driver: Network) -> (NetworkOutboundData, Self) { + let (unsafe_block_tx, unsafe_block_rx) = mpsc::channel(1024); + let actor = Self { driver, blocks: unsafe_block_tx }; + let outbound_data = NetworkOutboundData { unsafe_block: unsafe_block_rx }; + (outbound_data, actor) } } /// The communication context used by the network actor. #[derive(Debug)] pub struct NetworkContext { - blocks: mpsc::Sender, - signer: mpsc::Receiver
, - cancellation: CancellationToken, + /// A channel to receive the unsafe block signer address. + pub signer: mpsc::Receiver
, + /// Cancels the network actor. + pub cancellation: CancellationToken, } -impl ActorContext for NetworkContext { +impl CancellableContext for NetworkContext { fn cancelled(&self) -> WaitForCancellationFuture<'_> { self.cancellation.cancelled() } @@ -75,11 +81,17 @@ impl ActorContext for NetworkContext { #[async_trait] impl NodeActor for NetworkActor { type Error = NetworkActorError; - type Context = NetworkContext; + type InboundData = NetworkContext; + type OutboundData = NetworkOutboundData; + type State = Network; + + fn build(state: Self::State) -> (Self::OutboundData, Self) { + Self::new(state) + } async fn start( mut self, - NetworkContext { blocks, mut signer, cancellation }: Self::Context, + NetworkContext { mut signer, cancellation }: Self::InboundData, ) -> Result<(), Self::Error> { // Take the unsafe block receiver let mut unsafe_block_receiver = self.driver.unsafe_block_recv(); @@ -102,7 +114,7 @@ impl NodeActor for NetworkActor { block = unsafe_block_receiver.recv() => { match block { Ok(block) => { - match blocks.send(block).await { + match self.blocks.send(block).await { Ok(_) => debug!(target: "network", "Forwarded unsafe block"), Err(_) => warn!(target: "network", "Failed to forward unsafe block"), } diff --git a/crates/node/service/src/actors/rpc.rs b/crates/node/service/src/actors/rpc.rs index adc08a3093..0706b48d56 100644 --- a/crates/node/service/src/actors/rpc.rs +++ b/crates/node/service/src/actors/rpc.rs @@ -1,6 +1,6 @@ //! RPC Server Actor -use crate::{NodeActor, actors::ActorContext}; +use crate::{NodeActor, actors::CancellableContext}; use async_trait::async_trait; use jsonrpsee::core::RegisterMethodError; use kona_rpc::{RpcLauncher, RpcLauncherError}; @@ -32,10 +32,8 @@ pub struct RpcActor { impl RpcActor { /// Constructs a new [`RpcActor`] given the [`RpcLauncher`] and [`CancellationToken`]. - pub const fn new(launcher: RpcLauncher, cancellation: CancellationToken) -> (Self, RpcContext) { - let actor = Self { launcher }; - let context = RpcContext { cancellation }; - (actor, context) + pub const fn new(launcher: RpcLauncher) -> Self { + Self { launcher } } } @@ -43,10 +41,10 @@ impl RpcActor { #[derive(Debug)] pub struct RpcContext { /// The cancellation token, shared between all tasks. - cancellation: CancellationToken, + pub cancellation: CancellationToken, } -impl ActorContext for RpcContext { +impl CancellableContext for RpcContext { fn cancelled(&self) -> WaitForCancellationFuture<'_> { self.cancellation.cancelled() } @@ -55,11 +53,17 @@ impl ActorContext for RpcContext { #[async_trait] impl NodeActor for RpcActor { type Error = RpcActorError; - type Context = RpcContext; + type InboundData = RpcContext; + type OutboundData = (); + type State = RpcLauncher; + + fn build(state: Self::State) -> (Self::OutboundData, Self) { + ((), Self { launcher: state }) + } async fn start( mut self, - RpcContext { cancellation }: Self::Context, + RpcContext { cancellation }: Self::InboundData, ) -> Result<(), Self::Error> { let restarts = self.launcher.restart_count(); diff --git a/crates/node/service/src/actors/runtime.rs b/crates/node/service/src/actors/runtime.rs index 2fefdad5f0..c7cb7c4b5e 100644 --- a/crates/node/service/src/actors/runtime.rs +++ b/crates/node/service/src/actors/runtime.rs @@ -6,16 +6,16 @@ use std::time::Duration; use tokio::sync::mpsc; use tokio_util::sync::{CancellationToken, WaitForCancellationFuture}; -use crate::{NodeActor, actors::ActorContext}; +use crate::{NodeActor, actors::CancellableContext}; /// The communication context used by the runtime actor. #[derive(Debug)] pub struct RuntimeContext { - runtime_config: mpsc::Sender, - cancellation: CancellationToken, + /// Cancels the runtime actor. + pub cancellation: CancellationToken, } -impl ActorContext for RuntimeContext { +impl CancellableContext for RuntimeContext { fn cancelled(&self) -> WaitForCancellationFuture<'_> { self.cancellation.cancelled() } @@ -27,76 +27,52 @@ impl ActorContext for RuntimeContext { /// using the [`RuntimeLoader`]. #[derive(Debug)] pub struct RuntimeActor { - /// The [`RuntimeLoader`]. - loader: RuntimeLoader, - /// The interval at which to load the runtime. - interval: Duration, -} - -impl RuntimeActor { - /// Constructs a new [`RuntimeActor`] from the given [`RuntimeLoader`]. - pub const fn new( - loader: RuntimeLoader, - interval: Duration, - runtime_config: mpsc::Sender, - cancellation: CancellationToken, - ) -> (Self, RuntimeContext) { - let actor = Self { loader, interval }; - let context = RuntimeContext { runtime_config, cancellation }; - (actor, context) - } + state: RuntimeState, + runtime_config: mpsc::Sender, } -/// The Runtime Launcher is a simple launcher for the [`RuntimeActor`]. +/// The state of the runtime actor. #[derive(Debug, Clone)] -pub struct RuntimeLauncher { +pub struct RuntimeState { /// The [`RuntimeLoader`]. - loader: RuntimeLoader, + pub loader: RuntimeLoader, /// The interval at which to load the runtime. - interval: Option, - /// The channel to send the [`RuntimeConfig`] to the engine actor. - tx: Option>, - /// The cancellation token. - cancellation: Option, + pub interval: Duration, } -impl RuntimeLauncher { - /// Constructs a new [`RuntimeLoader`] from the given runtime loading interval. - pub const fn new(loader: RuntimeLoader, interval: Option) -> Self { - Self { loader, interval, tx: None, cancellation: None } - } - - /// Sets the runtime config tx channel. - pub fn with_tx(self, tx: mpsc::Sender) -> Self { - Self { tx: Some(tx), ..self } - } - - /// Sets the [`CancellationToken`] on the [`RuntimeLauncher`]. - pub fn with_cancellation(self, cancellation: CancellationToken) -> Self { - Self { cancellation: Some(cancellation), ..self } - } +/// The outbound data for the runtime actor. +#[derive(Debug)] +pub struct RuntimeOutboundData { + /// The channel to send the [`RuntimeConfig`] to the engine actor. + pub runtime_config: mpsc::Receiver, +} - /// Launches the [`RuntimeActor`]. - pub fn launch(self) -> Option<(RuntimeActor, RuntimeContext)> { - let cancellation = self.cancellation?; - let tx = self.tx?; - if self.interval.is_some() { - info!(target: "runtime", interval = ?self.interval, "Launched Runtime Actor"); - } - self.interval.map(|i| RuntimeActor::new(self.loader, i, tx, cancellation)) +impl RuntimeActor { + /// Constructs a new [`RuntimeActor`] from the given [`RuntimeLoader`]. + pub fn new(state: RuntimeState) -> (RuntimeOutboundData, Self) { + let (runtime_config_tx, runtime_config_rx) = mpsc::channel(1024); + let outbound_data = RuntimeOutboundData { runtime_config: runtime_config_rx }; + let actor = Self { state, runtime_config: runtime_config_tx }; + (outbound_data, actor) } } #[async_trait] impl NodeActor for RuntimeActor { type Error = RuntimeLoaderError; - type Context = RuntimeContext; + type InboundData = RuntimeContext; + type OutboundData = RuntimeOutboundData; + type State = RuntimeState; + + fn build(state: Self::State) -> (Self::OutboundData, Self) { + Self::new(state) + } async fn start( mut self, - RuntimeContext { runtime_config, cancellation }: Self::Context, + RuntimeContext { cancellation }: Self::InboundData, ) -> Result<(), Self::Error> { - let mut interval = tokio::time::interval(self.interval); + let mut interval = tokio::time::interval(self.state.interval); loop { tokio::select! { _ = cancellation.cancelled() => { @@ -104,9 +80,9 @@ impl NodeActor for RuntimeActor { return Ok(()); } _ = interval.tick() => { - let config = self.loader.load_latest().await?; + let config = self.state.loader.load_latest().await?; debug!(target: "runtime", ?config, "Loaded latest runtime config"); - if let Err(e) = runtime_config.send(config).await { + if let Err(e) = self.runtime_config.send(config).await { error!(target: "runtime", ?e, "Failed to send runtime config to the engine actor"); } } diff --git a/crates/node/service/src/actors/supervisor/actor.rs b/crates/node/service/src/actors/supervisor/actor.rs index f63c877fad..4c6515d864 100644 --- a/crates/node/service/src/actors/supervisor/actor.rs +++ b/crates/node/service/src/actors/supervisor/actor.rs @@ -1,6 +1,6 @@ //! Contains an actor for the supervisor rpc api. -use crate::{NodeActor, SupervisorExt, actors::ActorContext}; +use crate::{NodeActor, SupervisorExt, actors::CancellableContext}; use async_trait::async_trait; use futures::StreamExt; use kona_interop::{ControlEvent, ManagedEvent}; @@ -19,6 +19,17 @@ use tokio_util::sync::{CancellationToken, WaitForCancellationFuture}; pub struct SupervisorActor { /// The module to communicate with the supervisor. supervisor_ext: E, + /// A channel to communicate with the engine. + engine_control: mpsc::Sender, +} + +/// The outbound data for the supervisor actor. +#[derive(Debug)] +pub struct SupervisorOutboundData { + /// A channel to communicate with the engine. + /// For now, we don't support sending control events to the engine. + #[allow(dead_code)] + engine_control: mpsc::Receiver, } /// The communication context used by the supervisor actor. @@ -26,13 +37,11 @@ pub struct SupervisorActor { pub struct SupervisorActorContext { /// A channel to receive `ManagedEvent`s from the kona node. node_events: mpsc::Receiver, - /// A channel to communicate with the engine. - engine_control: mpsc::Sender, /// The cancellation token, shared between all tasks. cancellation: CancellationToken, } -impl ActorContext for SupervisorActorContext { +impl CancellableContext for SupervisorActorContext { fn cancelled(&self) -> WaitForCancellationFuture<'_> { self.cancellation.cancelled() } @@ -43,22 +52,31 @@ where E: SupervisorExt, { /// Creates a new instance of the supervisor actor. - pub const fn new(supervisor_ext: E) -> Self { - Self { supervisor_ext } + pub fn new(supervisor_ext: E) -> (SupervisorOutboundData, Self) { + let (engine_control_tx, engine_control_rx) = mpsc::channel(1024); + let actor = Self { supervisor_ext, engine_control: engine_control_tx }; + let outbound_data = SupervisorOutboundData { engine_control: engine_control_rx }; + (outbound_data, actor) } } #[async_trait] impl NodeActor for SupervisorActor where - E: SupervisorExt + Send + Sync, + E: SupervisorExt + Send + Sync + 'static, { type Error = SupervisorActorError; - type Context = SupervisorActorContext; + type InboundData = SupervisorActorContext; + type OutboundData = SupervisorOutboundData; + type State = E; + + fn build(state: Self::State) -> (Self::OutboundData, Self) { + Self::new(state) + } async fn start( mut self, - SupervisorActorContext { mut node_events, engine_control, cancellation }: Self::Context, + SupervisorActorContext { mut node_events, cancellation }: Self::InboundData, ) -> Result<(), Self::Error> { let mut control_events = Box::pin(self.supervisor_ext.subscribe_control_events()); loop { @@ -76,7 +94,7 @@ where Some(control_event) = control_events.next() => { // TODO: Handle the control event (e.g., restart, stop, etc.). debug!(target: "supervisor", "Received control event: {:?}", control_event); - engine_control + self.engine_control .send(control_event) .await .map_err(|_| SupervisorActorError::ControlEventSendFailed)?; diff --git a/crates/node/service/src/actors/supervisor/mod.rs b/crates/node/service/src/actors/supervisor/mod.rs index 7fea867ae3..e308e8bfd3 100644 --- a/crates/node/service/src/actors/supervisor/mod.rs +++ b/crates/node/service/src/actors/supervisor/mod.rs @@ -4,7 +4,9 @@ mod traits; pub use traits::SupervisorExt; mod actor; -pub use actor::{SupervisorActor, SupervisorActorContext, SupervisorActorError}; +pub use actor::{ + SupervisorActor, SupervisorActorContext, SupervisorActorError, SupervisorOutboundData, +}; mod ext; pub use ext::SupervisorRpcServerExt; diff --git a/crates/node/service/src/actors/traits.rs b/crates/node/service/src/actors/traits.rs index 362721c009..fc8a52989f 100644 --- a/crates/node/service/src/actors/traits.rs +++ b/crates/node/service/src/actors/traits.rs @@ -4,7 +4,7 @@ use async_trait::async_trait; use tokio_util::sync::WaitForCancellationFuture; /// The communication context used by the actor. -pub trait ActorContext: Send { +pub trait CancellableContext: Send { /// Returns a future that resolves when the actor is cancelled. fn cancelled(&self) -> WaitForCancellationFuture<'_>; } @@ -16,12 +16,21 @@ pub trait ActorContext: Send { /// - Perform background tasks. /// - Emit new events for other actors to process. #[async_trait] -pub trait NodeActor { +pub trait NodeActor: Send + 'static { /// The error type for the actor. type Error: std::fmt::Debug; /// The communication context used by the actor. - type Context: ActorContext; + /// These are the channels that the actor will use to receive messages from other actors. + type InboundData: CancellableContext; + /// The outbound communication channels used by the actor. + /// These are the channels that the actor will use to send messages to other actors. + type OutboundData: Sized; + /// The inner state for the actor. + type State; + + /// Builds the actor. + fn build(initial_state: Self::State) -> (Self::OutboundData, Self); /// Starts the actor. - async fn start(self, context: Self::Context) -> Result<(), Self::Error>; + async fn start(self, inbound_context: Self::InboundData) -> Result<(), Self::Error>; } diff --git a/crates/node/service/src/lib.rs b/crates/node/service/src/lib.rs index c08b3fdf29..e9246df767 100644 --- a/crates/node/service/src/lib.rs +++ b/crates/node/service/src/lib.rs @@ -14,11 +14,14 @@ pub use service::{NodeMode, RollupNode, RollupNodeBuilder, RollupNodeError, Roll mod actors; pub use actors::{ - ActorContext, DerivationActor, DerivationContext, DerivationError, EngineActor, EngineContext, - EngineError, EngineLauncher, InboundDerivationMessage, InboundEngineMessage, L1WatcherRpc, - L1WatcherRpcContext, L1WatcherRpcError, L2Finalizer, NetworkActor, NetworkActorError, - NetworkContext, NodeActor, RpcActor, RpcActorError, RpcContext, RuntimeActor, RuntimeContext, - RuntimeLauncher, SupervisorActor, SupervisorActorContext, SupervisorActorError, SupervisorExt, + CancellableContext, DerivationActor, DerivationContext, DerivationError, + DerivationOutboundChannels, DerivationState, EngineActor, EngineActorState, EngineContext, + EngineError, EngineLauncher, EngineOutboundData, InboundDerivationMessage, + InboundEngineMessage, L1WatcherRpc, L1WatcherRpcContext, L1WatcherRpcError, + L1WatcherRpcOutboundChannels, L1WatcherRpcState, L2Finalizer, NetworkActor, NetworkActorError, + NetworkContext, NetworkOutboundData, NodeActor, RpcActor, RpcActorError, RpcContext, + RuntimeActor, RuntimeContext, RuntimeOutboundData, RuntimeState, SupervisorActor, + SupervisorActorContext, SupervisorActorError, SupervisorExt, SupervisorOutboundData, SupervisorRpcServerExt, }; diff --git a/crates/node/service/src/service/core.rs b/crates/node/service/src/service/core.rs index 77a1ed270d..19deaf75a5 100644 --- a/crates/node/service/src/service/core.rs +++ b/crates/node/service/src/service/core.rs @@ -2,21 +2,27 @@ use super::NodeMode; use crate::{ - DerivationActor, EngineActor, EngineLauncher, NetworkActor, NodeActor, RpcActor, - RuntimeLauncher, SupervisorExt, service::spawn_and_wait, + DerivationContext, DerivationState, EngineContext, EngineLauncher, L1WatcherRpcContext, + L2Finalizer, NetworkContext, NodeActor, RpcContext, RuntimeContext, SupervisorActorContext, + SupervisorExt, + actors::{ + DerivationOutboundChannels, EngineActorState, EngineOutboundData, + L1WatcherRpcOutboundChannels, L1WatcherRpcState, NetworkOutboundData, RuntimeOutboundData, + RuntimeState, SupervisorOutboundData, + }, + service::spawn_and_wait, }; -use alloy_primitives::Address; +use alloy_provider::RootProvider; use async_trait::async_trait; use kona_derive::{Pipeline, SignalReceiver}; use kona_genesis::RollupConfig; use kona_p2p::Network; -use kona_protocol::{BlockInfo, L2BlockInfo}; use kona_rpc::{ - L1WatcherQueries, NetworkRpc, OpP2PApiServer, RollupNodeApiServer, RollupRpc, RpcLauncher, - RpcLauncherError, WsRPC, WsServer, + NetworkRpc, OpP2PApiServer, RollupNodeApiServer, RollupRpc, RpcLauncher, RpcLauncherError, + WsRPC, WsServer, }; -use std::fmt::Display; -use tokio::sync::{mpsc, oneshot, watch}; +use std::{fmt::Display, sync::Arc}; +use tokio::sync::mpsc; use tokio_util::sync::CancellationToken; /// The [`RollupNodeService`] trait defines the common interface for running a rollup node. @@ -49,11 +55,62 @@ use tokio_util::sync::CancellationToken; #[async_trait] pub trait RollupNodeService { /// The type of [`NodeActor`] to use for the DA watcher service. - type DataAvailabilityWatcher: NodeActor + Send + Sync + 'static; + type DataAvailabilityWatcher: NodeActor< + Error: Display, + InboundData = L1WatcherRpcContext, + State = L1WatcherRpcState, + OutboundData = L1WatcherRpcOutboundChannels, + >; + /// The type of derivation pipeline to use for the service. type DerivationPipeline: Pipeline + SignalReceiver + Send + Sync + 'static; + + /// The type of derivation actor to use for the service. + type DerivationActor: NodeActor< + Error: Display, + InboundData = DerivationContext, + State = DerivationState, + OutboundData = DerivationOutboundChannels, + >; + + /// The type of engine actor to use for the service. + type EngineActor: NodeActor< + Error: Display, + InboundData = EngineContext, + State = EngineActorState, + OutboundData = EngineOutboundData, + >; + + /// The type of network actor to use for the service. + type NetworkActor: NodeActor< + Error: Display, + InboundData = NetworkContext, + State = Network, + OutboundData = NetworkOutboundData, + >; + /// The supervisor ext provider. type SupervisorExt: SupervisorExt + Send + Sync + 'static; + + /// The type of supervisor actor to use for the service. + type SupervisorActor: NodeActor< + Error: Display, + InboundData = SupervisorActorContext, + State = Self::SupervisorExt, + OutboundData = SupervisorOutboundData, + >; + + /// The type of runtime actor to use for the service. + type RuntimeActor: NodeActor< + Error: Display, + InboundData = RuntimeContext, + State = RuntimeState, + OutboundData = RuntimeOutboundData, + >; + + /// The type of rpc actor to use for the service. + type RpcActor: NodeActor; + /// The type of error for the service's entrypoint. type Error: From + From @@ -63,18 +120,10 @@ pub trait RollupNodeService { fn mode(&self) -> NodeMode; /// Returns a reference to the rollup node's [`RollupConfig`]. - fn config(&self) -> &RollupConfig; - - /// Creates a new [`NodeActor`] instance that watches the data availability layer. The - /// `cancellation` token is used to gracefully shut down the actor. - fn new_da_watcher( - &self, - head_updates: watch::Sender>, - finalized_updates: watch::Sender>, - block_signer_tx: mpsc::Sender
, - cancellation: CancellationToken, - l1_watcher_inbound_queries: mpsc::Receiver, - ) -> (Self::DataAvailabilityWatcher, ::Context); + fn config(&self) -> Arc; + + /// Returns the [`RootProvider`] for the L1 chain. + fn l1_provider(&self) -> RootProvider; /// Creates a new instance of the [`Pipeline`] and initializes it. Returns the starting L2 /// forkchoice state and the initialized derivation pipeline. @@ -86,8 +135,8 @@ pub trait RollupNodeService { /// Creates a new [`Self::SupervisorExt`] to be used in the supervisor rpc actor. async fn supervisor_ext(&self) -> Option; - /// Returns the [`RuntimeLauncher`] for the node. - fn runtime(&self) -> RuntimeLauncher; + /// Returns the [`RuntimeState`] for the node. + fn runtime(&self) -> Option<&RuntimeState>; /// Returns the [`EngineLauncher`] fn engine(&self) -> EngineLauncher; @@ -110,41 +159,19 @@ pub trait RollupNodeService { // Create a global cancellation token for graceful shutdown of tasks. let cancellation = CancellationToken::new(); - // Create context for communication between actors. - let (sync_complete_tx, sync_complete_rx) = oneshot::channel(); - let (derived_payload_tx, derived_payload_rx) = mpsc::channel(16); - let (runtime_config_tx, runtime_config_rx) = mpsc::channel(16); - let (derivation_signal_tx, derivation_signal_rx) = mpsc::channel(16); - let (reset_request_tx, reset_request_rx) = mpsc::channel(16); - let (block_signer_tx, block_signer_rx) = mpsc::channel(16); - let (unsafe_block_tx, unsafe_block_rx) = mpsc::channel(1024); - let (l1_watcher_queries_sender, l1_watcher_queries_recv) = mpsc::channel(1024); - let (engine_query_sender, engine_query_recv) = mpsc::channel(1024); - let (head_updates_tx, head_updates_rx) = watch::channel(None); - let (finalized_updates_tx, finalized_updates_rx) = watch::channel(None); - let (engine_l2_safe_tx, engine_l2_safe_rx) = watch::channel(L2BlockInfo::default()); - // Create the DA watcher actor. - let da_watcher = self.new_da_watcher( - head_updates_tx, - finalized_updates_tx, - block_signer_tx, - cancellation.clone(), - l1_watcher_queries_recv, - ); + let ( + L1WatcherRpcOutboundChannels { latest_head, latest_finalized, block_signer_sender }, + da_watcher, + ) = Self::DataAvailabilityWatcher::build(L1WatcherRpcState { + rollup: self.config(), + l1_provider: self.l1_provider(), + }); // Create the derivation actor. let derivation_pipeline = self.init_derivation().await?; - let derivation = DerivationActor::new( - derivation_pipeline, - engine_l2_safe_rx, - sync_complete_rx, - derivation_signal_rx, - head_updates_rx, - derived_payload_tx, - reset_request_tx, - cancellation.clone(), - ); + let (DerivationOutboundChannels { attributes_out, reset_request_tx }, derivation) = + Self::DerivationActor::build(DerivationState::new(derivation_pipeline)); // TODO: get the supervisor ext. // TODO: use the supervisor ext to create the supervisor actor. @@ -154,47 +181,41 @@ pub trait RollupNodeService { // ) // Create the runtime configuration actor. - let runtime = self + let (runtime_config, runtime) = self .runtime() - .with_tx(runtime_config_tx) - .with_cancellation(cancellation.clone()) - .launch(); + .map(|state| { + let (RuntimeOutboundData { runtime_config }, runtime) = + Self::RuntimeActor::build(state.clone()); + (runtime_config, runtime) + }) + .unzip(); // Create the engine actor. let engine_launcher = self.engine(); let client = engine_launcher.client(); let engine_task_queue = engine_launcher.launch(); - let engine = EngineActor::new( - std::sync::Arc::new(self.config().clone()), - client, - engine_task_queue, - engine_l2_safe_tx, - sync_complete_tx, - derivation_signal_tx, - runtime_config_rx, - derived_payload_rx, - unsafe_block_rx, - reset_request_rx, - finalized_updates_rx, - Some(engine_query_recv), - cancellation.clone(), - ); + let ( + EngineOutboundData { engine_l2_safe_head_rx, sync_complete_rx, derivation_signal_rx }, + engine, + ) = Self::EngineActor::build(EngineActorState { + rollup: self.config(), + client: client.clone().into(), + engine: engine_task_queue, + }); // Create the p2p actor. - let (p2p_rpc_module, network) = { - let (driver, module) = self.init_network().await?; - let actor = - NetworkActor::new(driver, unsafe_block_tx, block_signer_rx, cancellation.clone()); - - (module, actor) - }; + let (driver, p2p_rpc_module) = self.init_network().await?; + let (NetworkOutboundData { unsafe_block }, network) = Self::NetworkActor::build(driver); // Create the RPC server actor. - let rpc = { + let (engine_query_recv, l1_watcher_queries_recv, (_, rpc)) = { let mut rpc_launcher = self.rpc().with_healthz()?; rpc_launcher.merge(p2p_rpc_module.into_rpc())?; + // Create context for communication between actors. + let (l1_watcher_queries_sender, l1_watcher_queries_recv) = mpsc::channel(1024); + let (engine_query_sender, engine_query_recv) = mpsc::channel(1024); let rollup_rpc = RollupRpc::new(engine_query_sender.clone(), l1_watcher_queries_sender); rpc_launcher.merge(rollup_rpc.into_rpc())?; @@ -204,18 +225,46 @@ pub trait RollupNodeService { .map_err(Self::Error::from)?; } - RpcActor::new(rpc_launcher, cancellation.clone()) + (engine_query_recv, l1_watcher_queries_recv, Self::RpcActor::build(rpc_launcher)) + }; + + let network_context = + NetworkContext { signer: block_signer_sender, cancellation: cancellation.clone() }; + + let da_watcher_context = L1WatcherRpcContext { + inbound_queries: l1_watcher_queries_recv, + cancellation: cancellation.clone(), }; + let derivation_context = DerivationContext { + l1_head_updates: latest_head, + engine_l2_safe_head: engine_l2_safe_head_rx, + el_sync_complete_rx: sync_complete_rx, + derivation_signal_rx, + cancellation: cancellation.clone(), + }; + + let engine_context = EngineContext { + runtime_config_rx: runtime_config, + attributes_rx: attributes_out, + unsafe_block_rx: unsafe_block, + reset_request_rx: reset_request_tx, + inbound_queries: engine_query_recv, + cancellation: cancellation.clone(), + finalizer: L2Finalizer::new(latest_finalized, client.into()), + }; + + let rpc_context = RpcContext { cancellation: cancellation.clone() }; + spawn_and_wait!( cancellation, actors = [ - runtime, - Some(network), - Some(da_watcher), - Some(derivation), - Some(engine), - Some(rpc) + runtime.map(|r| (r, RuntimeContext { cancellation: cancellation.clone() })), + Some((network, network_context)), + Some((da_watcher, da_watcher_context)), + Some((derivation, derivation_context)), + Some((engine, engine_context)), + Some((rpc, rpc_context)), ] ); Ok(()) diff --git a/crates/node/service/src/service/standard/builder.rs b/crates/node/service/src/service/standard/builder.rs index 618ee2e6ad..3292b3f04a 100644 --- a/crates/node/service/src/service/standard/builder.rs +++ b/crates/node/service/src/service/standard/builder.rs @@ -1,6 +1,6 @@ //! Contains the builder for the [`RollupNode`]. -use crate::{EngineLauncher, NodeMode, RollupNode, RuntimeLauncher}; +use crate::{EngineLauncher, NodeMode, RollupNode, actors::RuntimeState}; use alloy_primitives::Bytes; use alloy_provider::RootProvider; use alloy_rpc_client::RpcClient; @@ -145,10 +145,10 @@ impl RollupNodeBuilder { jwt_secret, }; - let runtime_launcher = RuntimeLauncher::new( - kona_sources::RuntimeLoader::new(l1_rpc_url, rollup_config.clone()), - self.runtime_load_interval, - ); + let runtime_launcher = self.runtime_load_interval.map(|load_interval| RuntimeState { + loader: kona_sources::RuntimeLoader::new(l1_rpc_url, rollup_config.clone()), + interval: load_interval, + }); let supervisor_rpc = self.supervisor_rpc_config.unwrap_or_default(); let p2p_config = self.p2p_config.expect("P2P config not set"); diff --git a/crates/node/service/src/service/standard/node.rs b/crates/node/service/src/service/standard/node.rs index 7707386046..8f587f8c5e 100644 --- a/crates/node/service/src/service/standard/node.rs +++ b/crates/node/service/src/service/standard/node.rs @@ -1,17 +1,14 @@ //! Contains the [`RollupNode`] implementation. use crate::{ - EngineLauncher, L1WatcherRpc, NodeActor, NodeMode, RollupNodeBuilder, RollupNodeError, - RollupNodeService, RuntimeLauncher, SupervisorRpcServerExt, + DerivationActor, EngineActor, EngineLauncher, L1WatcherRpc, NetworkActor, NodeMode, + RollupNodeBuilder, RollupNodeError, RollupNodeService, RpcActor, RuntimeActor, SupervisorActor, + SupervisorRpcServerExt, actors::RuntimeState, }; -use alloy_primitives::Address; use alloy_provider::RootProvider; use async_trait::async_trait; -use kona_protocol::BlockInfo; use op_alloy_network::Optimism; use std::sync::Arc; -use tokio::sync::{mpsc, watch}; -use tokio_util::sync::CancellationToken; use kona_genesis::RollupConfig; use kona_p2p::{Config, Network, NetworkBuilder}; @@ -19,9 +16,7 @@ use kona_providers_alloy::{ AlloyChainProvider, AlloyL2ChainProvider, OnlineBeaconClient, OnlineBlobProvider, OnlinePipeline, }; -use kona_rpc::{ - L1WatcherQueries, NetworkRpc, RpcLauncher, SupervisorRpcConfig, SupervisorRpcServer, -}; +use kona_rpc::{NetworkRpc, RpcLauncher, SupervisorRpcConfig, SupervisorRpcServer}; /// The size of the cache used in the derivation pipeline's providers. const DERIVATION_PROVIDER_CACHE_SIZE: usize = 1024; @@ -46,8 +41,8 @@ pub struct RollupNode { pub(crate) rpc_launcher: RpcLauncher, /// The P2P [`Config`] for the node. pub(crate) p2p_config: Config, - /// The [`RuntimeLauncher`] for the runtime loading service. - pub(crate) runtime_launcher: RuntimeLauncher, + /// The [`RuntimeState`] for the runtime loading service. + pub(crate) runtime_launcher: Option, /// The supervisor rpc server config. pub(crate) supervisor_rpc: SupervisorRpcConfig, } @@ -66,32 +61,23 @@ impl RollupNodeService for RollupNode { type SupervisorExt = SupervisorRpcServerExt; type Error = RollupNodeError; + type RuntimeActor = RuntimeActor; + type RpcActor = RpcActor; + type EngineActor = EngineActor; + type NetworkActor = NetworkActor; + type DerivationActor = DerivationActor; + type SupervisorActor = SupervisorActor; + fn mode(&self) -> NodeMode { self.mode } - fn config(&self) -> &RollupConfig { - &self.config + fn config(&self) -> Arc { + self.config.clone() } - fn new_da_watcher( - &self, - head_updates: watch::Sender>, - finalized_updates: watch::Sender>, - block_signer_tx: mpsc::Sender
, - cancellation: CancellationToken, - l1_watcher_inbound_queries: mpsc::Receiver, - ) -> (Self::DataAvailabilityWatcher, ::Context) - { - L1WatcherRpc::new( - self.config.clone(), - self.l1_provider.clone(), - head_updates, - finalized_updates, - block_signer_tx, - cancellation, - l1_watcher_inbound_queries, - ) + fn l1_provider(&self) -> RootProvider { + self.l1_provider.clone() } async fn supervisor_ext(&self) -> Option { @@ -112,8 +98,8 @@ impl RollupNodeService for RollupNode { Some(SupervisorRpcServerExt::new(handle, events_tx, control_rx)) } - fn runtime(&self) -> RuntimeLauncher { - self.runtime_launcher.clone() + fn runtime(&self) -> Option<&RuntimeState> { + self.runtime_launcher.as_ref() } fn engine(&self) -> EngineLauncher {