diff --git a/crates/node/service/src/actors/derivation.rs b/crates/node/service/src/actors/derivation.rs index ea03e88a59..43ba6d214c 100644 --- a/crates/node/service/src/actors/derivation.rs +++ b/crates/node/service/src/actors/derivation.rs @@ -1,6 +1,6 @@ //! [NodeActor] implementation for the derivation sub-routine. -use crate::{Metrics, NodeActor}; +use crate::{Metrics, NodeActor, actors::ActorContext}; use async_trait::async_trait; use kona_derive::{ ActivationSignal, Pipeline, PipelineError, PipelineErrorKind, ResetError, ResetSignal, Signal, @@ -12,7 +12,7 @@ use tokio::{ select, sync::{mpsc, oneshot, watch}, }; -use tokio_util::sync::CancellationToken; +use tokio_util::sync::{CancellationToken, WaitForCancellationFuture}; /// The [NodeActor] for the derivation sub-routine. /// @@ -26,8 +26,20 @@ where { /// The derivation pipeline. pipeline: P, + /// A flag indicating whether or not derivation is idle. Derivation is considered idle when it + /// has yielded to wait for more data on the DAL. + derivation_idle: bool, + /// A flag indicating whether or not derivation is waiting for a signal. When waiting for a + /// signal, derivation cannot process any incoming events. + waiting_for_signal: bool, +} - /// The l2 safe head from the engine. +/// The communication context used by the derivation actor. +#[derive(Debug)] +pub struct DerivationContext { + /// The receiver for L1 head update notifications. + l1_head_updates: watch::Receiver>, + /// The receiver for L2 safe head update notifications. engine_l2_safe_head: watch::Receiver, /// A receiver that tells derivation to begin. Completing EL sync consumes the instance. el_sync_complete_rx: oneshot::Receiver<()>, @@ -50,26 +62,21 @@ where /// /// Specs: derivation_signal_rx: mpsc::Receiver, - /// The receiver for L1 head update notifications. - l1_head_updates: watch::Receiver>, - /// The sender for derived [`OpAttributesWithParent`]s produced by the actor. 
attributes_out: mpsc::Sender, /// The reset request sender, used to handle [`PipelineErrorKind::Reset`] events and forward /// them to the engine. reset_request_tx: mpsc::Sender<()>, - - /// A flag indicating whether or not derivation is idle. Derivation is considered idle when it - /// has yielded to wait for more data on the DAL. - derivation_idle: bool, - /// A flag indicating whether or not derivation is waiting for a signal. When waiting for a - /// signal, derivation cannot process any incoming events. - waiting_for_signal: bool, - /// The cancellation token, shared between all tasks. cancellation: CancellationToken, } +impl ActorContext for DerivationContext { + fn cancelled(&self) -> WaitForCancellationFuture<'_> { + self.cancellation.cancelled() + } +} + impl

DerivationActor

where P: Pipeline + SignalReceiver, @@ -85,19 +92,18 @@ where attributes_out: mpsc::Sender, reset_request_tx: mpsc::Sender<()>, cancellation: CancellationToken, - ) -> Self { - Self { - pipeline, + ) -> (Self, DerivationContext) { + let actor = Self { pipeline, derivation_idle: true, waiting_for_signal: false }; + let context = DerivationContext { + l1_head_updates, engine_l2_safe_head, el_sync_complete_rx, derivation_signal_rx, - l1_head_updates, attributes_out, reset_request_tx, - derivation_idle: true, - waiting_for_signal: false, cancellation, - } + }; + (actor, context) } /// Handles a [`Signal`] received over the derivation signal receiver channel. @@ -116,12 +122,16 @@ where /// Attempts to step the derivation pipeline forward as much as possible in order to produce the /// next safe payload. - async fn produce_next_attributes(&mut self) -> Result { + async fn produce_next_attributes( + &mut self, + engine_l2_safe_head: &watch::Receiver, + reset_request_tx: &mpsc::Sender<()>, + ) -> Result { // As we start the safe head at the disputed block's parent, we step the pipeline until the // first attributes are produced. All batches at and before the safe head will be // dropped, so the first payload will always be the disputed one. loop { - let l2_safe_head = *self.engine_l2_safe_head.borrow(); + let l2_safe_head = *engine_l2_safe_head.borrow(); match self.pipeline.step(l2_safe_head).await { StepResult::PreparedAttributes => { /* continue; attributes will be sent off. */ } StepResult::AdvancedOrigin => { @@ -185,7 +195,7 @@ where .rollup_config() .is_interop_active(l2_safe_head.block_info.timestamp) { - self.reset_request_tx.send(()).await.map_err(|e| { + reset_request_tx.send(()).await.map_err(|e| { error!(target: "derivation", ?e, "Failed to send reset request"); DerivationError::Sender(Box::new(e)) })?; @@ -222,9 +232,16 @@ where /// attributes are successfully produced. If the pipeline step errors, /// the same [`L2BlockInfo`] is used again. 
If the [`L2BlockInfo`] is the /// zero hash, the pipeline is not stepped on. - async fn process(&mut self, msg: InboundDerivationMessage) -> Result<(), DerivationError> { + async fn process( + &mut self, + msg: InboundDerivationMessage, + engine_l2_safe_head: &mut watch::Receiver, + el_sync_complete_rx: &oneshot::Receiver<()>, + attributes_out: &mpsc::Sender, + reset_request_tx: &mpsc::Sender<()>, + ) -> Result<(), DerivationError> { // Only attempt derivation once the engine finishes syncing. - if !self.el_sync_complete_rx.is_terminated() { + if !el_sync_complete_rx.is_terminated() { trace!(target: "derivation", "Engine not ready, skipping derivation"); return Ok(()); } else if self.waiting_for_signal { @@ -236,7 +253,7 @@ where // check if the safe head has changed before continuing. This is to prevent attempts to // progress the pipeline while it is in the middle of processing a channel. if !(self.derivation_idle || msg == InboundDerivationMessage::SafeHeadUpdated) { - match self.engine_l2_safe_head.has_changed() { + match engine_l2_safe_head.has_changed() { Ok(true) => { /* Proceed to produce next payload attributes. */ } Ok(false) => { trace!(target: "derivation", "Safe head hasn't changed, skipping derivation."); @@ -250,7 +267,7 @@ where } // Wait for the engine to initialize unknowns prior to kicking off derivation. - let engine_safe_head = *self.engine_l2_safe_head.borrow(); + let engine_safe_head = *engine_l2_safe_head.borrow(); if engine_safe_head.block_info.hash.is_zero() { warn!(target: "derivation", engine_safe_head = ?engine_safe_head.block_info.number, "Waiting for engine to initialize state prior to derivation."); return Ok(()); @@ -258,26 +275,27 @@ where // Advance the pipeline as much as possible, new data may be available or there still may be // payloads in the attributes queue. - let payload_attrs = match self.produce_next_attributes().await { - Ok(attrs) => attrs, - Err(DerivationError::Yield) => { - // Yield until more data is available. 
- self.derivation_idle = true; - return Ok(()); - } - Err(e) => { - return Err(e); - } - }; + let payload_attrs = + match self.produce_next_attributes(engine_l2_safe_head, reset_request_tx).await { + Ok(attrs) => attrs, + Err(DerivationError::Yield) => { + // Yield until more data is available. + self.derivation_idle = true; + return Ok(()); + } + Err(e) => { + return Err(e); + } + }; // Mark derivation as busy. self.derivation_idle = false; // Mark the L2 safe head as seen. - self.engine_l2_safe_head.borrow_and_update(); + engine_l2_safe_head.borrow_and_update(); // Send payload attributes out for processing. - self.attributes_out + attributes_out .send(payload_attrs) .await .map_err(|e| DerivationError::Sender(Box::new(e)))?; @@ -292,20 +310,32 @@ where P: Pipeline + SignalReceiver + Send + Sync, { type Error = DerivationError; + type Context = DerivationContext; - async fn start(mut self) -> Result<(), Self::Error> { + async fn start( + mut self, + DerivationContext { + mut l1_head_updates, + mut engine_l2_safe_head, + mut el_sync_complete_rx, + mut derivation_signal_rx, + attributes_out, + reset_request_tx, + cancellation, + }: Self::Context, + ) -> Result<(), Self::Error> { loop { select! { biased; - _ = self.cancellation.cancelled() => { + _ = cancellation.cancelled() => { info!( target: "derivation", "Received shutdown signal. Exiting derivation task." 
); return Ok(()); } - signal = self.derivation_signal_rx.recv() => { + signal = derivation_signal_rx.recv() => { let Some(signal) = signal else { error!( target: "derivation", @@ -318,7 +348,7 @@ where self.signal(signal).await; self.waiting_for_signal = false; } - msg = self.l1_head_updates.changed() => { + msg = l1_head_updates.changed() => { if let Err(err) = msg { error!( target: "derivation", @@ -328,15 +358,15 @@ where return Ok(()); } - self.process(InboundDerivationMessage::NewDataAvailable).await?; + self.process(InboundDerivationMessage::NewDataAvailable, &mut engine_l2_safe_head, &el_sync_complete_rx, &attributes_out, &reset_request_tx).await?; } - _ = self.engine_l2_safe_head.changed() => { - self.process(InboundDerivationMessage::SafeHeadUpdated).await?; + _ = engine_l2_safe_head.changed() => { + self.process(InboundDerivationMessage::SafeHeadUpdated, &mut engine_l2_safe_head, &el_sync_complete_rx, &attributes_out, &reset_request_tx).await?; } - _ = &mut self.el_sync_complete_rx, if !self.el_sync_complete_rx.is_terminated() => { + _ = &mut el_sync_complete_rx, if !el_sync_complete_rx.is_terminated() => { info!(target: "derivation", "Engine finished syncing, starting derivation."); // Optimistically process the first message. 
- self.process(InboundDerivationMessage::NewDataAvailable).await?; + self.process(InboundDerivationMessage::NewDataAvailable, &mut engine_l2_safe_head, &el_sync_complete_rx, &attributes_out, &reset_request_tx).await?; } } } diff --git a/crates/node/service/src/actors/engine/actor.rs b/crates/node/service/src/actors/engine/actor.rs index 8e7d5fe1ad..da82bc320c 100644 --- a/crates/node/service/src/actors/engine/actor.rs +++ b/crates/node/service/src/actors/engine/actor.rs @@ -18,10 +18,10 @@ use tokio::{ sync::{mpsc, oneshot, watch}, task::JoinHandle, }; -use tokio_util::sync::CancellationToken; +use tokio_util::sync::{CancellationToken, WaitForCancellationFuture}; use url::Url; -use crate::NodeActor; +use crate::{NodeActor, actors::ActorContext}; /// The [`EngineActor`] is responsible for managing the operations sent to the execution layer's /// Engine API. To accomplish this, it uses the [`Engine`] task queue to order Engine API @@ -36,19 +36,22 @@ pub struct EngineActor { engine: Engine, /// The [`L2Finalizer`], used to finalize L2 blocks. finalizer: L2Finalizer, + /// Handler for inbound queries to the engine. + inbound_queries: Option>, +} - /// The channel to send the l2 safe head to the derivation actor. +/// The communication context used by the engine actor. +#[derive(Debug)] +pub struct EngineContext { + /// The sender for L2 safe head update notifications. engine_l2_safe_head_tx: watch::Sender, /// A channel to send a signal that EL sync has completed. Informs the derivation actor to /// start. Because the EL sync state machine within [`EngineState`] can only complete once, /// this channel is consumed after the first successful send. Future cases where EL sync is /// re-triggered can occur, but we will not block derivation on it. - sync_complete_tx: Option>, + sync_complete_tx: oneshot::Sender<()>, /// A way for the engine actor to send a [`Signal`] back to the derivation actor. 
derivation_signal_tx: mpsc::Sender, - - /// Handler for inbound queries to the engine. - inbound_queries: Option>, /// A channel to receive [`RuntimeConfig`] from the runtime actor. runtime_config_rx: mpsc::Receiver, /// A channel to receive [`OpAttributesWithParent`] from the derivation actor. @@ -61,6 +64,12 @@ pub struct EngineActor { cancellation: CancellationToken, } +impl ActorContext for EngineContext { + fn cancelled(&self) -> WaitForCancellationFuture<'_> { + self.cancellation.cancelled() + } +} + impl EngineActor { /// Constructs a new [`EngineActor`] from the params. #[allow(clippy::too_many_arguments)] @@ -78,44 +87,52 @@ impl EngineActor { finalized_block_rx: watch::Receiver>, inbound_queries: Option>, cancellation: CancellationToken, - ) -> Self { + ) -> (Self, EngineContext) { let client = Arc::new(client); - Self { + let actor = Self { config, client: Arc::clone(&client), engine, finalizer: L2Finalizer::new(finalized_block_rx, client), + inbound_queries, + }; + let context = EngineContext { engine_l2_safe_head_tx, - sync_complete_tx: Some(sync_complete_tx), + sync_complete_tx, derivation_signal_tx, runtime_config_rx, attributes_rx, unsafe_block_rx, reset_request_rx, - inbound_queries, cancellation, - } + }; + (actor, context) } /// Resets the inner [`Engine`] and propagates the reset to the derivation actor. - pub async fn reset(&mut self) -> Result<(), EngineError> { + pub async fn reset( + &mut self, + derivation_signal_tx: &mpsc::Sender, + engine_l2_safe_head_tx: &watch::Sender, + cancellation: &CancellationToken, + ) -> Result<(), EngineError> { // Reset the engine. let (l2_safe_head, l1_origin, system_config) = self.engine.reset(self.client.clone(), &self.config).await?; // Signal the derivation actor to reset. 
let signal = ResetSignal { l2_safe_head, l1_origin, system_config: Some(system_config) }; - match self.derivation_signal_tx.send(signal.signal()).await { + match derivation_signal_tx.send(signal.signal()).await { Ok(_) => debug!(target: "engine", "Sent reset signal to derivation actor"), Err(err) => { error!(target: "engine", ?err, "Failed to send reset signal to the derivation actor"); - self.cancellation.cancel(); + cancellation.cancel(); return Err(EngineError::ChannelClosed); } } // Attempt to update the safe head following the reset. - self.maybe_update_safe_head(); + self.maybe_update_safe_head(engine_l2_safe_head_tx); // Clear the queue of L2 blocks awaiting finalization. self.finalizer.clear(); @@ -124,14 +141,20 @@ impl EngineActor { } /// Drains the inner [`Engine`] task queue and attempts to update the safe head. - async fn drain(&mut self) -> Result<(), EngineError> { + async fn drain( + &mut self, + derivation_signal_tx: &mpsc::Sender, + sync_complete_tx: &mut Option>, + engine_l2_safe_head_tx: &watch::Sender, + cancellation: &CancellationToken, + ) -> Result<(), EngineError> { match self.engine.drain().await { Ok(_) => { trace!(target: "engine", "[ENGINE] tasks drained"); } Err(EngineTaskError::Reset(err)) => { warn!(target: "engine", ?err, "Received reset request"); - self.reset().await?; + self.reset(derivation_signal_tx, engine_l2_safe_head_tx, cancellation).await?; } Err(EngineTaskError::Flush(err)) => { // This error is encountered when the payload is marked INVALID @@ -139,20 +162,20 @@ impl EngineActor { // a "deposits-only" block and re-executed. At the same time, // the channel and any remaining buffered batches are flushed. 
warn!(target: "engine", ?err, "Invalid payload, Flushing derivation pipeline."); - match self.derivation_signal_tx.send(Signal::FlushChannel).await { + match derivation_signal_tx.send(Signal::FlushChannel).await { Ok(_) => { debug!(target: "engine", "Sent flush signal to derivation actor") } Err(err) => { error!(target: "engine", ?err, "Failed to send flush signal to the derivation actor."); - self.cancellation.cancel(); + cancellation.cancel(); return Err(EngineError::ChannelClosed); } } } Err(err @ EngineTaskError::Critical(_)) => { error!(target: "engine", ?err, "Critical error draining engine tasks"); - self.cancellation.cancel(); + cancellation.cancel(); return Err(err.into()); } Err(EngineTaskError::Temporary(err)) => { @@ -160,30 +183,42 @@ impl EngineActor { } } - self.maybe_update_safe_head(); - self.check_el_sync().await?; + self.maybe_update_safe_head(engine_l2_safe_head_tx); + self.check_el_sync( + derivation_signal_tx, + engine_l2_safe_head_tx, + sync_complete_tx, + cancellation, + ) + .await?; Ok(()) } /// Checks if the EL has finished syncing, notifying the derivation actor if it has. - async fn check_el_sync(&mut self) -> Result<(), EngineError> { + async fn check_el_sync( + &mut self, + derivation_signal_tx: &mpsc::Sender, + engine_l2_safe_head_tx: &watch::Sender, + sync_complete_tx: &mut Option>, + cancellation: &CancellationToken, + ) -> Result<(), EngineError> { if self.engine.state().el_sync_finished { - let Some(sender) = self.sync_complete_tx.take() else { + let Some(sync_complete_tx) = std::mem::take(sync_complete_tx) else { return Ok(()); }; // If the sync status is finished, we can reset the engine and start derivation. info!(target: "engine", "Performing initial engine reset"); - self.reset().await?; - sender.send(()).ok(); + self.reset(derivation_signal_tx, engine_l2_safe_head_tx, cancellation).await?; + sync_complete_tx.send(()).ok(); } Ok(()) } /// Attempts to update the safe head via the watch channel. 
- fn maybe_update_safe_head(&self) { + fn maybe_update_safe_head(&self, engine_l2_safe_head_tx: &watch::Sender) { let state_safe_head = self.engine.state().safe_head(); let update = |head: &mut L2BlockInfo| { if head != &state_safe_head { @@ -192,7 +227,7 @@ impl EngineActor { } false }; - let sent = self.engine_l2_safe_head_tx.send_if_modified(update); + let sent = engine_l2_safe_head_tx.send_if_modified(update); trace!(target: "engine", ?sent, "Attempted L2 Safe Head Update"); } @@ -218,11 +253,17 @@ impl EngineActor { }) } - async fn process(&mut self, msg: InboundEngineMessage) -> Result<(), EngineError> { + async fn process( + &mut self, + msg: InboundEngineMessage, + derivation_signal_tx: &mpsc::Sender, + engine_l2_safe_head_tx: &watch::Sender, + cancellation: &CancellationToken, + ) -> Result<(), EngineError> { match msg { InboundEngineMessage::ResetRequest => { warn!(target: "engine", "Received reset request"); - self.reset().await?; + self.reset(derivation_signal_tx, engine_l2_safe_head_tx, cancellation).await?; } InboundEngineMessage::UnsafeBlockReceived(envelope) => { let task = EngineTask::InsertUnsafe(InsertUnsafeTask::new( @@ -273,20 +314,43 @@ impl EngineActor { #[async_trait] impl NodeActor for EngineActor { type Error = EngineError; + type Context = EngineContext; - async fn start(mut self) -> Result<(), Self::Error> { + async fn start( + mut self, + EngineContext { + engine_l2_safe_head_tx, + sync_complete_tx, + derivation_signal_tx, + mut runtime_config_rx, + mut attributes_rx, + mut unsafe_block_rx, + mut reset_request_rx, + cancellation, + }: Self::Context, + ) -> Result<(), Self::Error> { // Start the engine query server in a separate task to avoid blocking the main task. let handle = std::mem::take(&mut self.inbound_queries) .map(|inbound_query_channel| self.start_query_task(inbound_query_channel)); + // The sync complete tx is consumed after the first successful send. 
Hence we need to wrap + // it in an `Option` to ensure we satisfy the borrow checker. + let mut sync_complete_tx = Some(sync_complete_tx); + loop { // Attempt to drain all outstanding tasks from the engine queue before adding new ones. - self.drain().await?; + self.drain( + &derivation_signal_tx, + &mut sync_complete_tx, + &engine_l2_safe_head_tx, + &cancellation, + ) + .await?; tokio::select! { biased; - _ = self.cancellation.cancelled() => { + _ = cancellation.cancelled() => { warn!(target: "engine", "EngineActor received shutdown signal."); if let Some(handle) = handle { @@ -296,45 +360,45 @@ impl NodeActor for EngineActor { return Ok(()); } - reset = self.reset_request_rx.recv() => { + reset = reset_request_rx.recv() => { if reset.is_none() { error!(target: "engine", "Reset request receiver closed unexpectedly"); - self.cancellation.cancel(); + cancellation.cancel(); return Err(EngineError::ChannelClosed); } - self.process(InboundEngineMessage::ResetRequest).await?; + self.process(InboundEngineMessage::ResetRequest, &derivation_signal_tx, &engine_l2_safe_head_tx, &cancellation).await?; } - unsafe_block = self.unsafe_block_rx.recv() => { + unsafe_block = unsafe_block_rx.recv() => { let Some(envelope) = unsafe_block else { error!(target: "engine", "Unsafe block receiver closed unexpectedly"); - self.cancellation.cancel(); + cancellation.cancel(); return Err(EngineError::ChannelClosed); }; - self.process(InboundEngineMessage::UnsafeBlockReceived(envelope.into())).await?; + self.process(InboundEngineMessage::UnsafeBlockReceived(envelope.into()), &derivation_signal_tx, &engine_l2_safe_head_tx, &cancellation).await?; } - attributes = self.attributes_rx.recv() => { + attributes = attributes_rx.recv() => { let Some(attributes) = attributes else { error!(target: "engine", "Attributes receiver closed unexpectedly"); - self.cancellation.cancel(); + cancellation.cancel(); return Err(EngineError::ChannelClosed); }; - 
self.process(InboundEngineMessage::DerivedAttributesReceived(attributes.into())).await?; + self.process(InboundEngineMessage::DerivedAttributesReceived(attributes.into()), &derivation_signal_tx, &engine_l2_safe_head_tx, &cancellation).await?; } - config = self.runtime_config_rx.recv() => { + config = runtime_config_rx.recv() => { let Some(config) = config else { error!(target: "engine", "Runtime config receiver closed unexpectedly"); - self.cancellation.cancel(); + cancellation.cancel(); return Err(EngineError::ChannelClosed); }; - self.process(InboundEngineMessage::RuntimeConfigUpdate(config.into())).await?; + self.process(InboundEngineMessage::RuntimeConfigUpdate(config.into()), &derivation_signal_tx, &engine_l2_safe_head_tx, &cancellation).await?; } msg = self.finalizer.new_finalized_block() => { if let Err(err) = msg { error!(target: "engine", ?err, "L1 finalized block receiver closed unexpectedly"); - self.cancellation.cancel(); + cancellation.cancel(); return Err(EngineError::ChannelClosed); } - self.process(InboundEngineMessage::NewFinalizedL1Block).await?; + self.process(InboundEngineMessage::NewFinalizedL1Block, &derivation_signal_tx, &engine_l2_safe_head_tx, &cancellation).await?; } } } diff --git a/crates/node/service/src/actors/engine/mod.rs b/crates/node/service/src/actors/engine/mod.rs index edfd7df77e..bfbd764845 100644 --- a/crates/node/service/src/actors/engine/mod.rs +++ b/crates/node/service/src/actors/engine/mod.rs @@ -1,7 +1,7 @@ //! The [`EngineActor`] and its components. 
mod actor; -pub use actor::{EngineActor, EngineLauncher, InboundEngineMessage}; +pub use actor::{EngineActor, EngineContext, EngineLauncher, InboundEngineMessage}; mod error; pub use error::EngineError; diff --git a/crates/node/service/src/actors/l1_watcher_rpc.rs b/crates/node/service/src/actors/l1_watcher_rpc.rs index d376ae3eda..437f4772d9 100644 --- a/crates/node/service/src/actors/l1_watcher_rpc.rs +++ b/crates/node/service/src/actors/l1_watcher_rpc.rs @@ -1,7 +1,7 @@ //! [`NodeActor`] implementation for an L1 chain watcher that polls for L1 block updates over HTTP //! RPC. -use crate::NodeActor; +use crate::{NodeActor, actors::ActorContext}; use alloy_eips::{BlockId, BlockNumberOrTag}; use alloy_primitives::{Address, B256}; use alloy_provider::{Provider, RootProvider}; @@ -24,7 +24,7 @@ use tokio::{ }, task::JoinHandle, }; -use tokio_util::sync::CancellationToken; +use tokio_util::sync::{CancellationToken, WaitForCancellationFuture}; /// An L1 chain watcher that checks for L1 block updates over RPC. #[derive(Debug)] @@ -34,17 +34,27 @@ pub struct L1WatcherRpc { config: Arc, /// The L1 provider. l1_provider: RootProvider, +} + +/// The communication context used by the L1 watcher actor. +#[derive(Debug)] +pub struct L1WatcherRpcContext { + /// The block signer sender. + block_signer_sender: mpsc::Sender

, + /// The inbound queries to the L1 watcher. + inbound_queries: tokio::sync::mpsc::Receiver, /// The latest L1 head block. latest_head: watch::Sender>, /// The latest L1 finalized block. latest_finalized: watch::Sender>, - /// The latest - /// The block signer sender. - block_signer_sender: mpsc::Sender
, /// The cancellation token, shared between all tasks. cancellation: CancellationToken, - /// Inbound queries to the L1 watcher. - inbound_queries: Option>, +} + +impl ActorContext for L1WatcherRpcContext { + fn cancelled(&self) -> WaitForCancellationFuture<'_> { + self.cancellation.cancelled() + } } impl L1WatcherRpc { @@ -56,18 +66,17 @@ impl L1WatcherRpc { finalized_updates: watch::Sender>, block_signer_sender: mpsc::Sender
, cancellation: CancellationToken, - // Can be None if we disable communication with the L1 watcher. - inbound_queries: Option>, - ) -> Self { - Self { - config, - l1_provider, + inbound_queries: mpsc::Receiver, + ) -> (Self, L1WatcherRpcContext) { + let actor = Self { config, l1_provider }; + let context = L1WatcherRpcContext { + block_signer_sender, + inbound_queries, latest_head: head_updates, latest_finalized: finalized_updates, - block_signer_sender, cancellation, - inbound_queries, - } + }; + (actor, context) } /// Fetches logs for the given block hash. @@ -84,11 +93,11 @@ impl L1WatcherRpc { fn start_query_processor( &self, mut inbound_queries: tokio::sync::mpsc::Receiver, + head_updates_recv: watch::Receiver>, ) -> JoinHandle<()> { // Start the inbound query processor in a separate task to avoid blocking the main task. // We can cheaply clone the l1 provider here because it is an Arc. let l1_provider = self.l1_provider.clone(); - let head_updates_recv = self.latest_head.subscribe(); let rollup_config = self.config.clone(); tokio::spawn(async move { @@ -144,8 +153,18 @@ impl L1WatcherRpc { #[async_trait] impl NodeActor for L1WatcherRpc { type Error = L1WatcherRpcError; + type Context = L1WatcherRpcContext; - async fn start(mut self) -> Result<(), Self::Error> { + async fn start( + mut self, + L1WatcherRpcContext { + inbound_queries, + latest_head, + latest_finalized, + block_signer_sender, + cancellation, + }: Self::Context, + ) -> Result<(), Self::Error> { let mut head_stream = BlockStream::new(&self.l1_provider, BlockNumberOrTag::Latest, Duration::from_secs(13)) .into_stream(); @@ -156,14 +175,13 @@ impl NodeActor for L1WatcherRpc { ) .into_stream(); - let inbound_queries = std::mem::take(&mut self.inbound_queries); let inbound_query_processor = - inbound_queries.map(|queries| self.start_query_processor(queries)); + self.start_query_processor(inbound_queries, latest_head.subscribe()); // Start the main processing loop. loop { select! 
{ - _ = self.cancellation.cancelled() => { + _ = cancellation.cancelled() => { // Exit the task on cancellation. info!( target: "l1_watcher", @@ -171,7 +189,7 @@ impl NodeActor for L1WatcherRpc { ); // Kill the inbound query processor. - if let Some(inbound_query_processor) = inbound_query_processor { inbound_query_processor.abort() } + inbound_query_processor.abort(); return Ok(()); }, @@ -181,7 +199,7 @@ impl NodeActor for L1WatcherRpc { } Some(head_block_info) => { // Send the head update event to all consumers. - self.latest_head.send_replace(Some(head_block_info)); + latest_head.send_replace(Some(head_block_info)); // For each log, attempt to construct a `SystemConfigLog`. // Build the `SystemConfigUpdate` from the log. @@ -200,7 +218,7 @@ impl NodeActor for L1WatcherRpc { target: "l1_watcher", "Unsafe block signer update: {unsafe_block_signer}" ); - if let Err(e) = self.block_signer_sender.send(unsafe_block_signer).await { + if let Err(e) = block_signer_sender.send(unsafe_block_signer).await { error!( target: "l1_watcher", "Error sending unsafe block signer update: {e}" @@ -215,7 +233,7 @@ impl NodeActor for L1WatcherRpc { return Err(L1WatcherRpcError::StreamEnded); } Some(finalized_block_info) => { - self.latest_finalized.send_replace(Some(finalized_block_info)); + latest_finalized.send_replace(Some(finalized_block_info)); } } } diff --git a/crates/node/service/src/actors/mod.rs b/crates/node/service/src/actors/mod.rs index 2972eb490d..03c9f09265 100644 --- a/crates/node/service/src/actors/mod.rs +++ b/crates/node/service/src/actors/mod.rs @@ -3,27 +3,32 @@ //! 
[NodeActor]: super::NodeActor mod traits; -pub use traits::NodeActor; +pub use traits::{ActorContext, NodeActor}; mod runtime; -pub use runtime::{RuntimeActor, RuntimeLauncher}; +pub use runtime::{RuntimeActor, RuntimeContext, RuntimeLauncher}; mod engine; -pub use engine::{EngineActor, EngineError, EngineLauncher, InboundEngineMessage, L2Finalizer}; +pub use engine::{ + EngineActor, EngineContext, EngineError, EngineLauncher, InboundEngineMessage, L2Finalizer, +}; mod supervisor; pub use supervisor::{ - SupervisorActor, SupervisorActorError, SupervisorExt, SupervisorRpcServerExt, + SupervisorActor, SupervisorActorContext, SupervisorActorError, SupervisorExt, + SupervisorRpcServerExt, }; mod rpc; -pub use rpc::{RpcActor, RpcActorError}; +pub use rpc::{RpcActor, RpcActorError, RpcContext}; mod derivation; -pub use derivation::{DerivationActor, DerivationError, InboundDerivationMessage}; +pub use derivation::{ + DerivationActor, DerivationContext, DerivationError, InboundDerivationMessage, +}; mod l1_watcher_rpc; -pub use l1_watcher_rpc::{L1WatcherRpc, L1WatcherRpcError}; +pub use l1_watcher_rpc::{L1WatcherRpc, L1WatcherRpcContext, L1WatcherRpcError}; mod network; -pub use network::{NetworkActor, NetworkActorError}; +pub use network::{NetworkActor, NetworkActorError, NetworkContext}; diff --git a/crates/node/service/src/actors/network.rs b/crates/node/service/src/actors/network.rs index 950283f629..947bcb61cd 100644 --- a/crates/node/service/src/actors/network.rs +++ b/crates/node/service/src/actors/network.rs @@ -1,6 +1,6 @@ //! 
Network Actor -use crate::NodeActor; +use crate::{NodeActor, actors::ActorContext}; use alloy_primitives::Address; use async_trait::async_trait; use derive_more::Debug; @@ -9,7 +9,7 @@ use libp2p::TransportError; use op_alloy_rpc_types_engine::OpExecutionPayloadEnvelope; use thiserror::Error; use tokio::{select, sync::mpsc}; -use tokio_util::sync::CancellationToken; +use tokio_util::sync::{CancellationToken, WaitForCancellationFuture}; /// The network actor handles two core networking components of the rollup node: /// - *discovery*: Peer discovery over UDP using discv5. @@ -42,12 +42,6 @@ use tokio_util::sync::CancellationToken; pub struct NetworkActor { /// Network driver driver: Network, - /// The sender for [`OpExecutionPayloadEnvelope`]s received via p2p gossip. - blocks: mpsc::Sender, - /// The receiver for unsafe block signer updates. - signer: mpsc::Receiver
, - /// The cancellation token, shared between all tasks. - cancellation: CancellationToken, } impl NetworkActor { @@ -57,16 +51,36 @@ impl NetworkActor { blocks: mpsc::Sender, signer: mpsc::Receiver
, cancellation: CancellationToken, - ) -> Self { - Self { driver, blocks, signer, cancellation } + ) -> (Self, NetworkContext) { + let actor = Self { driver }; + let context = NetworkContext { blocks, signer, cancellation }; + (actor, context) + } +} + +/// The communication context used by the network actor. +#[derive(Debug)] +pub struct NetworkContext { + blocks: mpsc::Sender, + signer: mpsc::Receiver
, + cancellation: CancellationToken, +} + +impl ActorContext for NetworkContext { + fn cancelled(&self) -> WaitForCancellationFuture<'_> { + self.cancellation.cancelled() } } #[async_trait] impl NodeActor for NetworkActor { type Error = NetworkActorError; + type Context = NetworkContext; - async fn start(mut self) -> Result<(), Self::Error> { + async fn start( + mut self, + NetworkContext { blocks, mut signer, cancellation }: Self::Context, + ) -> Result<(), Self::Error> { // Take the unsafe block receiver let mut unsafe_block_receiver = self.driver.unsafe_block_recv(); @@ -78,7 +92,7 @@ impl NodeActor for NetworkActor { loop { select! { - _ = self.cancellation.cancelled() => { + _ = cancellation.cancelled() => { info!( target: "network", "Received shutdown signal. Exiting network task." @@ -88,7 +102,7 @@ impl NodeActor for NetworkActor { block = unsafe_block_receiver.recv() => { match block { Ok(block) => { - match self.blocks.send(block).await { + match blocks.send(block).await { Ok(_) => debug!(target: "network", "Forwarded unsafe block"), Err(_) => warn!(target: "network", "Failed to forward unsafe block"), } @@ -99,7 +113,7 @@ impl NodeActor for NetworkActor { } } } - signer = self.signer.recv() => { + signer = signer.recv() => { let Some(signer) = signer else { warn!( target: "network", diff --git a/crates/node/service/src/actors/rpc.rs b/crates/node/service/src/actors/rpc.rs index af218ff57e..adc08a3093 100644 --- a/crates/node/service/src/actors/rpc.rs +++ b/crates/node/service/src/actors/rpc.rs @@ -1,10 +1,10 @@ //! RPC Server Actor -use crate::NodeActor; +use crate::{NodeActor, actors::ActorContext}; use async_trait::async_trait; use jsonrpsee::core::RegisterMethodError; use kona_rpc::{RpcLauncher, RpcLauncherError}; -use tokio_util::sync::CancellationToken; +use tokio_util::sync::{CancellationToken, WaitForCancellationFuture}; /// An error returned by the [`RpcActor`]. 
#[derive(Debug, thiserror::Error)] @@ -28,22 +28,39 @@ pub enum RpcActorError { pub struct RpcActor { /// A launcher for the rpc. launcher: RpcLauncher, - /// The cancellation token, shared between all tasks. - cancellation: CancellationToken, } impl RpcActor { /// Constructs a new [`RpcActor`] given the [`RpcLauncher`] and [`CancellationToken`]. - pub const fn new(launcher: RpcLauncher, cancellation: CancellationToken) -> Self { - Self { launcher, cancellation } + pub const fn new(launcher: RpcLauncher, cancellation: CancellationToken) -> (Self, RpcContext) { + let actor = Self { launcher }; + let context = RpcContext { cancellation }; + (actor, context) + } +} + +/// The communication context used by the RPC actor. +#[derive(Debug)] +pub struct RpcContext { + /// The cancellation token, shared between all tasks. + cancellation: CancellationToken, +} + +impl ActorContext for RpcContext { + fn cancelled(&self) -> WaitForCancellationFuture<'_> { + self.cancellation.cancelled() } } #[async_trait] impl NodeActor for RpcActor { type Error = RpcActorError; + type Context = RpcContext; - async fn start(mut self) -> Result<(), Self::Error> { + async fn start( + mut self, + RpcContext { cancellation }: Self::Context, + ) -> Result<(), Self::Error> { let restarts = self.launcher.restart_count(); let Some(mut handle) = self.launcher.clone().launch().await? else { @@ -62,12 +79,12 @@ impl NodeActor for RpcActor { } Err(err) => { error!(target: "rpc", ?err, "Failed to launch rpc server"); - self.cancellation.cancel(); + cancellation.cancel(); return Err(RpcActorError::ServerStopped); } } } - _ = self.cancellation.cancelled() => { + _ = cancellation.cancelled() => { // The cancellation token has been triggered, so we should stop the server. handle.stop().map_err(|_| RpcActorError::StopFailed)?; // Since the RPC Server didn't originate the error, we should return Ok. @@ -77,7 +94,7 @@ impl NodeActor for RpcActor { } // Stop the node if there has already been 3 rpc restarts. 
- self.cancellation.cancel(); + cancellation.cancel(); return Err(RpcActorError::ServerStopped); } } diff --git a/crates/node/service/src/actors/runtime.rs b/crates/node/service/src/actors/runtime.rs index 4bc7b5d537..2fefdad5f0 100644 --- a/crates/node/service/src/actors/runtime.rs +++ b/crates/node/service/src/actors/runtime.rs @@ -4,9 +4,22 @@ use async_trait::async_trait; use kona_sources::{RuntimeConfig, RuntimeLoader, RuntimeLoaderError}; use std::time::Duration; use tokio::sync::mpsc; -use tokio_util::sync::CancellationToken; +use tokio_util::sync::{CancellationToken, WaitForCancellationFuture}; -use crate::NodeActor; +use crate::{NodeActor, actors::ActorContext}; + +/// The communication context used by the runtime actor. +#[derive(Debug)] +pub struct RuntimeContext { + runtime_config: mpsc::Sender, + cancellation: CancellationToken, +} + +impl ActorContext for RuntimeContext { + fn cancelled(&self) -> WaitForCancellationFuture<'_> { + self.cancellation.cancelled() + } +} /// The Runtime Actor. /// @@ -18,10 +31,6 @@ pub struct RuntimeActor { loader: RuntimeLoader, /// The interval at which to load the runtime. interval: Duration, - /// A channel to send the loaded runtime config to the engine actor. - runtime_config_tx: mpsc::Sender, - /// The cancellation token, shared between all tasks. - cancellation: CancellationToken, } impl RuntimeActor { @@ -29,10 +38,12 @@ impl RuntimeActor { pub const fn new( loader: RuntimeLoader, interval: Duration, - runtime_config_tx: mpsc::Sender, + runtime_config: mpsc::Sender, cancellation: CancellationToken, - ) -> Self { - Self { loader, interval, runtime_config_tx, cancellation } + ) -> (Self, RuntimeContext) { + let actor = Self { loader, interval }; + let context = RuntimeContext { runtime_config, cancellation }; + (actor, context) } } @@ -66,7 +77,7 @@ impl RuntimeLauncher { } /// Launches the [`RuntimeActor`]. 
- pub fn launch(self) -> Option { + pub fn launch(self) -> Option<(RuntimeActor, RuntimeContext)> { let cancellation = self.cancellation?; let tx = self.tx?; if self.interval.is_some() { @@ -79,19 +90,23 @@ impl RuntimeLauncher { #[async_trait] impl NodeActor for RuntimeActor { type Error = RuntimeLoaderError; + type Context = RuntimeContext; - async fn start(mut self) -> Result<(), Self::Error> { + async fn start( + mut self, + RuntimeContext { runtime_config, cancellation }: Self::Context, + ) -> Result<(), Self::Error> { let mut interval = tokio::time::interval(self.interval); loop { tokio::select! { - _ = self.cancellation.cancelled() => { + _ = cancellation.cancelled() => { warn!(target: "runtime", "RuntimeActor received shutdown signal."); return Ok(()); } _ = interval.tick() => { let config = self.loader.load_latest().await?; debug!(target: "runtime", ?config, "Loaded latest runtime config"); - if let Err(e) = self.runtime_config_tx.send(config).await { + if let Err(e) = runtime_config.send(config).await { error!(target: "runtime", ?e, "Failed to send runtime config to the engine actor"); } } diff --git a/crates/node/service/src/actors/supervisor/actor.rs b/crates/node/service/src/actors/supervisor/actor.rs index 1f9bc7d9ef..f63c877fad 100644 --- a/crates/node/service/src/actors/supervisor/actor.rs +++ b/crates/node/service/src/actors/supervisor/actor.rs @@ -1,12 +1,12 @@ //! Contains an actor for the supervisor rpc api. -use crate::{NodeActor, SupervisorExt}; +use crate::{NodeActor, SupervisorExt, actors::ActorContext}; use async_trait::async_trait; use futures::StreamExt; use kona_interop::{ControlEvent, ManagedEvent}; use thiserror::Error; use tokio::sync::mpsc; -use tokio_util::sync::CancellationToken; +use tokio_util::sync::{CancellationToken, WaitForCancellationFuture}; /// The supervisor actor. 
/// @@ -17,28 +17,34 @@ use tokio_util::sync::CancellationToken; /// See: #[derive(Debug)] pub struct SupervisorActor { + /// The module to communicate with the supervisor. + supervisor_ext: E, +} + +/// The communication context used by the supervisor actor. +#[derive(Debug)] +pub struct SupervisorActorContext { /// A channel to receive `ManagedEvent`s from the kona node. node_events: mpsc::Receiver, /// A channel to communicate with the engine. engine_control: mpsc::Sender, - /// The module to communicate with the supervisor. - supervisor_ext: E, /// The cancellation token, shared between all tasks. cancellation: CancellationToken, } +impl ActorContext for SupervisorActorContext { + fn cancelled(&self) -> WaitForCancellationFuture<'_> { + self.cancellation.cancelled() + } +} + impl SupervisorActor where E: SupervisorExt, { /// Creates a new instance of the supervisor actor. - pub const fn new( - node_events: mpsc::Receiver, - engine_control: mpsc::Sender, - supervisor_ext: E, - cancellation: CancellationToken, - ) -> Self { - Self { supervisor_ext, node_events, engine_control, cancellation } + pub const fn new(supervisor_ext: E) -> Self { + Self { supervisor_ext } } } @@ -48,16 +54,20 @@ where E: SupervisorExt + Send + Sync, { type Error = SupervisorActorError; + type Context = SupervisorActorContext; - async fn start(mut self) -> Result<(), Self::Error> { + async fn start( + mut self, + SupervisorActorContext { mut node_events, engine_control, cancellation }: Self::Context, + ) -> Result<(), Self::Error> { let mut control_events = Box::pin(self.supervisor_ext.subscribe_control_events()); loop { tokio::select! 
{ - _ = self.cancellation.cancelled() => { + _ = cancellation.cancelled() => { warn!(target: "supervisor", "Supervisor actor cancelled"); return Ok(()); }, - Some(event) = self.node_events.recv() => { + Some(event) = node_events.recv() => { if let Err(err) = self.supervisor_ext.send_event(event).await { error!(target: "supervisor", ?err, "Failed to send event to supervisor"); } @@ -66,7 +76,7 @@ where Some(control_event) = control_events.next() => { // TODO: Handle the control event (e.g., restart, stop, etc.). debug!(target: "supervisor", "Received control event: {:?}", control_event); - self.engine_control + engine_control .send(control_event) .await .map_err(|_| SupervisorActorError::ControlEventSendFailed)?; diff --git a/crates/node/service/src/actors/supervisor/mod.rs b/crates/node/service/src/actors/supervisor/mod.rs index 6c66b7bb3c..7fea867ae3 100644 --- a/crates/node/service/src/actors/supervisor/mod.rs +++ b/crates/node/service/src/actors/supervisor/mod.rs @@ -4,7 +4,7 @@ mod traits; pub use traits::SupervisorExt; mod actor; -pub use actor::{SupervisorActor, SupervisorActorError}; +pub use actor::{SupervisorActor, SupervisorActorContext, SupervisorActorError}; mod ext; pub use ext::SupervisorRpcServerExt; diff --git a/crates/node/service/src/actors/traits.rs b/crates/node/service/src/actors/traits.rs index 7e6ba38e39..362721c009 100644 --- a/crates/node/service/src/actors/traits.rs +++ b/crates/node/service/src/actors/traits.rs @@ -1,6 +1,13 @@ //! [NodeActor] trait. use async_trait::async_trait; +use tokio_util::sync::WaitForCancellationFuture; + +/// The communication context used by the actor. +pub trait ActorContext: Send { + /// Returns a future that resolves when the actor is cancelled. + fn cancelled(&self) -> WaitForCancellationFuture<'_>; +} /// The [NodeActor] is an actor-like service for the node. /// @@ -12,7 +19,9 @@ use async_trait::async_trait; pub trait NodeActor { /// The error type for the actor. 
type Error: std::fmt::Debug; + /// The communication context used by the actor. + type Context: ActorContext; /// Starts the actor. - async fn start(self) -> Result<(), Self::Error>; + async fn start(self, context: Self::Context) -> Result<(), Self::Error>; } diff --git a/crates/node/service/src/lib.rs b/crates/node/service/src/lib.rs index 5343faf881..c08b3fdf29 100644 --- a/crates/node/service/src/lib.rs +++ b/crates/node/service/src/lib.rs @@ -14,10 +14,12 @@ pub use service::{NodeMode, RollupNode, RollupNodeBuilder, RollupNodeError, Roll mod actors; pub use actors::{ - DerivationActor, DerivationError, EngineActor, EngineError, EngineLauncher, - InboundDerivationMessage, InboundEngineMessage, L1WatcherRpc, L1WatcherRpcError, L2Finalizer, - NetworkActor, NetworkActorError, NodeActor, RpcActor, RpcActorError, RuntimeActor, - RuntimeLauncher, SupervisorActor, SupervisorActorError, SupervisorExt, SupervisorRpcServerExt, + ActorContext, DerivationActor, DerivationContext, DerivationError, EngineActor, EngineContext, + EngineError, EngineLauncher, InboundDerivationMessage, InboundEngineMessage, L1WatcherRpc, + L1WatcherRpcContext, L1WatcherRpcError, L2Finalizer, NetworkActor, NetworkActorError, + NetworkContext, NodeActor, RpcActor, RpcActorError, RpcContext, RuntimeActor, RuntimeContext, + RuntimeLauncher, SupervisorActor, SupervisorActorContext, SupervisorActorError, SupervisorExt, + SupervisorRpcServerExt, }; mod metrics; diff --git a/crates/node/service/src/service/core.rs b/crates/node/service/src/service/core.rs index cad13e4ab5..77a1ed270d 100644 --- a/crates/node/service/src/service/core.rs +++ b/crates/node/service/src/service/core.rs @@ -73,8 +73,8 @@ pub trait RollupNodeService { finalized_updates: watch::Sender>, block_signer_tx: mpsc::Sender
, cancellation: CancellationToken, - l1_watcher_inbound_queries: Option>, - ) -> Self::DataAvailabilityWatcher; + l1_watcher_inbound_queries: mpsc::Receiver, + ) -> (Self::DataAvailabilityWatcher, ::Context); /// Creates a new instance of the [`Pipeline`] and initializes it. Returns the starting L2 /// forkchoice state and the initialized derivation pipeline. @@ -110,7 +110,7 @@ pub trait RollupNodeService { // Create a global cancellation token for graceful shutdown of tasks. let cancellation = CancellationToken::new(); - // Create channels for communication between actors. + // Create context for communication between actors. let (sync_complete_tx, sync_complete_rx) = oneshot::channel(); let (derived_payload_tx, derived_payload_rx) = mpsc::channel(16); let (runtime_config_tx, runtime_config_rx) = mpsc::channel(16); @@ -130,7 +130,7 @@ pub trait RollupNodeService { finalized_updates_tx, block_signer_tx, cancellation.clone(), - Some(l1_watcher_queries_recv), + l1_watcher_queries_recv, ); // Create the derivation actor. diff --git a/crates/node/service/src/service/standard/node.rs b/crates/node/service/src/service/standard/node.rs index 7733f49f60..7707386046 100644 --- a/crates/node/service/src/service/standard/node.rs +++ b/crates/node/service/src/service/standard/node.rs @@ -1,8 +1,8 @@ //! Contains the [`RollupNode`] implementation. use crate::{ - EngineLauncher, L1WatcherRpc, NodeMode, RollupNodeBuilder, RollupNodeError, RollupNodeService, - RuntimeLauncher, SupervisorRpcServerExt, + EngineLauncher, L1WatcherRpc, NodeActor, NodeMode, RollupNodeBuilder, RollupNodeError, + RollupNodeService, RuntimeLauncher, SupervisorRpcServerExt, }; use alloy_primitives::Address; use alloy_provider::RootProvider; @@ -80,8 +80,9 @@ impl RollupNodeService for RollupNode { finalized_updates: watch::Sender>, block_signer_tx: mpsc::Sender
, cancellation: CancellationToken, - l1_watcher_inbound_queries: Option>, - ) -> Self::DataAvailabilityWatcher { + l1_watcher_inbound_queries: mpsc::Receiver, + ) -> (Self::DataAvailabilityWatcher, ::Context) + { L1WatcherRpc::new( self.config.clone(), self.l1_provider.clone(), diff --git a/crates/node/service/src/service/util.rs b/crates/node/service/src/service/util.rs index 40c650be84..d4b7c15ce2 100644 --- a/crates/node/service/src/service/util.rs +++ b/crates/node/service/src/service/util.rs @@ -14,9 +14,9 @@ macro_rules! spawn_and_wait { // Check if the actor is present, and spawn it if it is. $( - if let Some(actor) = $actor { + if let Some((actor, context)) = $actor { task_handles.spawn(async move { - if let Err(e) = actor.start().await { + if let Err(e) = actor.start(context).await { return Err(format!("{e:?}")); } Ok(())