Skip to content
This repository was archived by the owner on Jan 16, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 10 additions & 19 deletions crates/node/service/src/actors/derivation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,6 @@ where
state: B,
/// The sender for derived [`OpAttributesWithParent`]s produced by the actor.
attributes_out: mpsc::Sender<OpAttributesWithParent>,
/// The reset request sender, used to handle [`PipelineErrorKind::Reset`] events and forward
/// them to the engine.
reset_request_tx: mpsc::Sender<()>,
}

/// The state for the derivation actor.
Expand Down Expand Up @@ -124,14 +121,14 @@ impl PipelineBuilder for DerivationBuilder {
pub struct DerivationOutboundChannels {
/// The receiver for derived [`OpAttributesWithParent`]s produced by the actor.
pub attributes_out: mpsc::Receiver<OpAttributesWithParent>,
/// The receiver for reset requests, used to handle [`PipelineErrorKind::Reset`] events and
/// forward them to the engine.
pub reset_request_tx: mpsc::Receiver<()>,
}

/// The communication context used by the derivation actor.
#[derive(Debug)]
pub struct DerivationContext {
/// The reset request sender, used to handle [`PipelineErrorKind::Reset`] events and forward
/// them to the engine.
pub reset_request_tx: mpsc::Sender<()>,
/// The receiver for L1 head update notifications.
pub l1_head_updates: watch::Receiver<Option<BlockInfo>>,
/// The receiver for L2 safe head update notifications.
Expand Down Expand Up @@ -382,16 +379,9 @@ where
/// Creates a new instance of the [DerivationActor].
pub fn new(state: B) -> (DerivationOutboundChannels, Self) {
let (derived_payload_tx, derived_payload_rx) = mpsc::channel(16);
let (reset_request_tx, reset_request_rx) = mpsc::channel(16);
let actor = Self { state, attributes_out: derived_payload_tx, reset_request_tx };

(
DerivationOutboundChannels {
attributes_out: derived_payload_rx,
reset_request_tx: reset_request_rx,
},
actor,
)
let actor = Self { state, attributes_out: derived_payload_tx };

(DerivationOutboundChannels { attributes_out: derived_payload_rx }, actor)
}
}

Expand All @@ -416,6 +406,7 @@ where
mut engine_l2_safe_head,
mut el_sync_complete_rx,
mut derivation_signal_rx,
reset_request_tx,
cancellation,
}: Self::InboundData,
) -> Result<(), Self::Error> {
Expand Down Expand Up @@ -455,15 +446,15 @@ where
return Ok(());
}

state.process(InboundDerivationMessage::NewDataAvailable, &mut engine_l2_safe_head, &el_sync_complete_rx, &self.attributes_out, &self.reset_request_tx).await?;
state.process(InboundDerivationMessage::NewDataAvailable, &mut engine_l2_safe_head, &el_sync_complete_rx, &self.attributes_out, &reset_request_tx).await?;
}
_ = engine_l2_safe_head.changed() => {
state.process(InboundDerivationMessage::SafeHeadUpdated, &mut engine_l2_safe_head, &el_sync_complete_rx, &self.attributes_out, &self.reset_request_tx).await?;
state.process(InboundDerivationMessage::SafeHeadUpdated, &mut engine_l2_safe_head, &el_sync_complete_rx, &self.attributes_out, &reset_request_tx).await?;
}
_ = &mut el_sync_complete_rx, if !el_sync_complete_rx.is_terminated() => {
info!(target: "derivation", "Engine finished syncing, starting derivation.");
// Optimistically process the first message.
state.process(InboundDerivationMessage::NewDataAvailable, &mut engine_l2_safe_head, &el_sync_complete_rx, &self.attributes_out, &self.reset_request_tx).await?;
state.process(InboundDerivationMessage::NewDataAvailable, &mut engine_l2_safe_head, &el_sync_complete_rx, &self.attributes_out, &reset_request_tx).await?;
}
}
}
Expand Down
21 changes: 14 additions & 7 deletions crates/node/service/src/actors/engine/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ use crate::{NodeActor, actors::CancellableContext};
pub struct EngineActor {
/// The [`EngineActorState`] used to build the actor.
builder: EngineBuilder,
/// The receiver for L2 safe head update notifications.
/// The receiver for reset requests.
reset_request_rx: mpsc::Receiver<()>,
/// The sender for L2 safe head update notifications.
engine_l2_safe_head_tx: watch::Sender<L2BlockInfo>,
/// A channel to send a signal that EL sync has completed. Informs the derivation actor to
/// start. Because the EL sync state machine within [`InnerEngineState`] can only complete
Expand All @@ -44,6 +46,8 @@ pub struct EngineActor {
/// The outbound data for the [`EngineActor`].
#[derive(Debug)]
pub struct EngineOutboundData {
/// A channel to request the engine actor to reset.
pub reset_request_tx: mpsc::Sender<()>,
/// A channel to receive L2 safe head update notifications.
pub engine_l2_safe_head_rx: watch::Receiver<L2BlockInfo>,
/// A channel to receive a signal that EL sync has completed.
Expand Down Expand Up @@ -114,8 +118,6 @@ pub struct EngineContext {
pub attributes_rx: mpsc::Receiver<OpAttributesWithParent>,
/// A channel to receive [`OpExecutionPayloadEnvelope`] from the network actor.
pub unsafe_block_rx: mpsc::Receiver<OpExecutionPayloadEnvelope>,
/// A channel to receive reset requests.
pub reset_request_rx: mpsc::Receiver<()>,
/// Handler for inbound queries to the engine.
pub inbound_queries: mpsc::Receiver<EngineQueries>,
/// The cancellation token, shared between all tasks.
Expand All @@ -133,20 +135,26 @@ impl CancellableContext for EngineContext {
impl EngineActor {
/// Constructs a new [`EngineActor`] from the params.
pub fn new(config: EngineBuilder) -> (EngineOutboundData, Self) {
let (reset_request_tx, reset_request_rx) = mpsc::channel(1);
let (derivation_signal_tx, derivation_signal_rx) = mpsc::channel(16);
let (engine_l2_safe_head_tx, engine_l2_safe_head_rx) =
watch::channel(L2BlockInfo::default());
let (sync_complete_tx, sync_complete_rx) = oneshot::channel();

let actor = Self {
builder: config,
reset_request_rx,
engine_l2_safe_head_tx,
sync_complete_tx,
derivation_signal_tx,
};

let outbound_data =
EngineOutboundData { engine_l2_safe_head_rx, sync_complete_rx, derivation_signal_rx };
let outbound_data = EngineOutboundData {
reset_request_tx,
engine_l2_safe_head_rx,
sync_complete_rx,
derivation_signal_rx,
};

(outbound_data, actor)
}
Expand Down Expand Up @@ -339,7 +347,6 @@ impl NodeActor for EngineActor {
mut runtime_config_rx,
mut attributes_rx,
mut unsafe_block_rx,
mut reset_request_rx,
cancellation,
inbound_queries,
}: Self::InboundData,
Expand Down Expand Up @@ -374,7 +381,7 @@ impl NodeActor for EngineActor {

return Ok(());
}
reset = reset_request_rx.recv() => {
reset = self.reset_request_rx.recv() => {
if reset.is_none() {
error!(target: "engine", "Reset request receiver closed unexpectedly");
cancellation.cancel();
Expand Down
11 changes: 8 additions & 3 deletions crates/node/service/src/service/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ pub trait RollupNodeService {

// Create the derivation actor.
let derivation_builder = self.derivation_builder();
let (DerivationOutboundChannels { attributes_out, reset_request_tx }, derivation) =
let (DerivationOutboundChannels { attributes_out }, derivation) =
Self::DerivationActor::build(derivation_builder);

// TODO: get the supervisor ext.
Expand All @@ -168,7 +168,12 @@ pub trait RollupNodeService {
// Create the engine actor.
let engine_builder = self.engine_builder();
let (
EngineOutboundData { engine_l2_safe_head_rx, sync_complete_rx, derivation_signal_rx },
EngineOutboundData {
reset_request_tx,
engine_l2_safe_head_rx,
sync_complete_rx,
derivation_signal_rx,
},
engine,
) = Self::EngineActor::build(engine_builder);

Expand Down Expand Up @@ -209,6 +214,7 @@ pub trait RollupNodeService {
};

let derivation_context = DerivationContext {
reset_request_tx,
l1_head_updates: latest_head,
engine_l2_safe_head: engine_l2_safe_head_rx.clone(),
el_sync_complete_rx: sync_complete_rx,
Expand All @@ -220,7 +226,6 @@ pub trait RollupNodeService {
runtime_config_rx: runtime_config,
attributes_rx: attributes_out,
unsafe_block_rx: unsafe_block,
reset_request_rx: reset_request_tx,
inbound_queries: engine_query_recv,
cancellation: cancellation.clone(),
finalizer: L2Finalizer::new(latest_finalized),
Expand Down