diff --git a/crates/node/service/src/actors/derivation.rs b/crates/node/service/src/actors/derivation.rs index 1b1f5b801c..8cd72b6e40 100644 --- a/crates/node/service/src/actors/derivation.rs +++ b/crates/node/service/src/actors/derivation.rs @@ -37,9 +37,6 @@ where state: B, /// The sender for derived [`OpAttributesWithParent`]s produced by the actor. attributes_out: mpsc::Sender, - /// The reset request sender, used to handle [`PipelineErrorKind::Reset`] events and forward - /// them to the engine. - reset_request_tx: mpsc::Sender<()>, } /// The state for the derivation actor. @@ -124,14 +121,14 @@ impl PipelineBuilder for DerivationBuilder { pub struct DerivationOutboundChannels { /// The receiver for derived [`OpAttributesWithParent`]s produced by the actor. pub attributes_out: mpsc::Receiver, - /// The receiver for reset requests, used to handle [`PipelineErrorKind::Reset`] events and - /// forward them to the engine. - pub reset_request_tx: mpsc::Receiver<()>, } /// The communication context used by the derivation actor. #[derive(Debug)] pub struct DerivationContext { + /// The reset request sender, used to handle [`PipelineErrorKind::Reset`] events and forward + /// them to the engine. + pub reset_request_tx: mpsc::Sender<()>, /// The receiver for L1 head update notifications. pub l1_head_updates: watch::Receiver>, /// The receiver for L2 safe head update notifications. @@ -382,16 +379,9 @@ where /// Creates a new instance of the [DerivationActor]. pub fn new(state: B) -> (DerivationOutboundChannels, Self) { let (derived_payload_tx, derived_payload_rx) = mpsc::channel(16); - let (reset_request_tx, reset_request_rx) = mpsc::channel(16); - let actor = Self { state, attributes_out: derived_payload_tx, reset_request_tx }; - - ( - DerivationOutboundChannels { - attributes_out: derived_payload_rx, - reset_request_tx: reset_request_rx, - }, - actor, - ) + let actor = Self { state, attributes_out: derived_payload_tx }; + + (DerivationOutboundChannels { attributes_out: derived_payload_rx }, actor) } } @@ -416,6 +406,7 @@ where mut engine_l2_safe_head, mut el_sync_complete_rx, mut derivation_signal_rx, + reset_request_tx, cancellation, }: Self::InboundData, ) -> Result<(), Self::Error> { @@ -455,15 +446,15 @@ where return Ok(()); } - state.process(InboundDerivationMessage::NewDataAvailable, &mut engine_l2_safe_head, &el_sync_complete_rx, &self.attributes_out, &self.reset_request_tx).await?; + state.process(InboundDerivationMessage::NewDataAvailable, &mut engine_l2_safe_head, &el_sync_complete_rx, &self.attributes_out, &reset_request_tx).await?; } _ = engine_l2_safe_head.changed() => { - state.process(InboundDerivationMessage::SafeHeadUpdated, &mut engine_l2_safe_head, &el_sync_complete_rx, &self.attributes_out, &self.reset_request_tx).await?; + state.process(InboundDerivationMessage::SafeHeadUpdated, &mut engine_l2_safe_head, &el_sync_complete_rx, &self.attributes_out, &reset_request_tx).await?; } _ = &mut el_sync_complete_rx, if !el_sync_complete_rx.is_terminated() => { info!(target: "derivation", "Engine finished syncing, starting derivation."); // Optimistically process the first message. - state.process(InboundDerivationMessage::NewDataAvailable, &mut engine_l2_safe_head, &el_sync_complete_rx, &self.attributes_out, &self.reset_request_tx).await?; + state.process(InboundDerivationMessage::NewDataAvailable, &mut engine_l2_safe_head, &el_sync_complete_rx, &self.attributes_out, &reset_request_tx).await?; } } } diff --git a/crates/node/service/src/actors/engine/actor.rs b/crates/node/service/src/actors/engine/actor.rs index 5161f8ce75..7baf9d73e7 100644 --- a/crates/node/service/src/actors/engine/actor.rs +++ b/crates/node/service/src/actors/engine/actor.rs @@ -30,7 +30,9 @@ use crate::{NodeActor, actors::CancellableContext}; pub struct EngineActor { /// The [`EngineActorState`] used to build the actor. builder: EngineBuilder, - /// The receiver for L2 safe head update notifications. + /// The receiver for reset requests. + reset_request_rx: mpsc::Receiver<()>, + /// The sender for L2 safe head update notifications. engine_l2_safe_head_tx: watch::Sender, /// A channel to send a signal that EL sync has completed. Informs the derivation actor to /// start. Because the EL sync state machine within [`InnerEngineState`] can only complete @@ -44,6 +46,8 @@ pub struct EngineActor { /// The outbound data for the [`EngineActor`]. #[derive(Debug)] pub struct EngineOutboundData { + /// A channel to request the engine actor to reset. + pub reset_request_tx: mpsc::Sender<()>, /// A channel to receive L2 safe head update notifications. pub engine_l2_safe_head_rx: watch::Receiver, /// A channel to receive a signal that EL sync has completed. @@ -114,8 +118,6 @@ pub struct EngineContext { pub attributes_rx: mpsc::Receiver, /// A channel to receive [`OpExecutionPayloadEnvelope`] from the network actor. pub unsafe_block_rx: mpsc::Receiver, - /// A channel to receive reset requests. - pub reset_request_rx: mpsc::Receiver<()>, /// Handler for inbound queries to the engine. pub inbound_queries: mpsc::Receiver, /// The cancellation token, shared between all tasks. @@ -133,6 +135,7 @@ impl CancellableContext for EngineContext { impl EngineActor { /// Constructs a new [`EngineActor`] from the params. pub fn new(config: EngineBuilder) -> (EngineOutboundData, Self) { + let (reset_request_tx, reset_request_rx) = mpsc::channel(1); let (derivation_signal_tx, derivation_signal_rx) = mpsc::channel(16); let (engine_l2_safe_head_tx, engine_l2_safe_head_rx) = watch::channel(L2BlockInfo::default()); @@ -140,13 +143,18 @@ impl EngineActor { let actor = Self { builder: config, + reset_request_rx, engine_l2_safe_head_tx, sync_complete_tx, derivation_signal_tx, }; - let outbound_data = - EngineOutboundData { engine_l2_safe_head_rx, sync_complete_rx, derivation_signal_rx }; + let outbound_data = EngineOutboundData { + reset_request_tx, + engine_l2_safe_head_rx, + sync_complete_rx, + derivation_signal_rx, + }; (outbound_data, actor) } @@ -339,7 +347,6 @@ impl NodeActor for EngineActor { mut runtime_config_rx, mut attributes_rx, mut unsafe_block_rx, - mut reset_request_rx, cancellation, inbound_queries, }: Self::InboundData, @@ -374,7 +381,7 @@ impl NodeActor for EngineActor { return Ok(()); } - reset = reset_request_rx.recv() => { + reset = self.reset_request_rx.recv() => { if reset.is_none() { error!(target: "engine", "Reset request receiver closed unexpectedly"); cancellation.cancel(); diff --git a/crates/node/service/src/service/core.rs b/crates/node/service/src/service/core.rs index cf6a639400..61c1b8821d 100644 --- a/crates/node/service/src/service/core.rs +++ b/crates/node/service/src/service/core.rs @@ -145,7 +145,7 @@ pub trait RollupNodeService { // Create the derivation actor. let derivation_builder = self.derivation_builder(); - let (DerivationOutboundChannels { attributes_out, reset_request_tx }, derivation) = + let (DerivationOutboundChannels { attributes_out }, derivation) = Self::DerivationActor::build(derivation_builder); // TODO: get the supervisor ext. @@ -168,7 +168,12 @@ pub trait RollupNodeService { // Create the engine actor. let engine_builder = self.engine_builder(); let ( - EngineOutboundData { engine_l2_safe_head_rx, sync_complete_rx, derivation_signal_rx }, + EngineOutboundData { + reset_request_tx, + engine_l2_safe_head_rx, + sync_complete_rx, + derivation_signal_rx, + }, engine, ) = Self::EngineActor::build(engine_builder); @@ -209,6 +214,7 @@ pub trait RollupNodeService { }; let derivation_context = DerivationContext { + reset_request_tx, l1_head_updates: latest_head, engine_l2_safe_head: engine_l2_safe_head_rx.clone(), el_sync_complete_rx: sync_complete_rx, @@ -220,7 +226,6 @@ pub trait RollupNodeService { runtime_config_rx: runtime_config, attributes_rx: attributes_out, unsafe_block_rx: unsafe_block, - reset_request_rx: reset_request_tx, inbound_queries: engine_query_recv, cancellation: cancellation.clone(), finalizer: L2Finalizer::new(latest_finalized),