Skip to content
This repository was archived by the owner on Jan 16, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions crates/node/rpc/src/rollup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,11 @@
pub const RPC_IDENT: &'static str = "rollup_rpc";

/// Constructs a new [`RollupRpc`] given a sender channel.
pub const fn new(sender: EngineQuerySender, l1_watcher_sender: L1WatcherQuerySender) -> Self {
Self { engine_sender: sender, l1_watcher_sender }
pub const fn new(
engine_sender: EngineQuerySender,
l1_watcher_sender: L1WatcherQuerySender,
) -> Self {
Self { engine_sender, l1_watcher_sender }

Check warning on line 40 in crates/node/rpc/src/rollup.rs

View check run for this annotation

Codecov / codecov/patch

crates/node/rpc/src/rollup.rs#L36-L40

Added lines #L36 - L40 were not covered by tests
}

// Important note: we zero-out the fields that can't be derived yet to follow op-node's
Expand Down
124 changes: 75 additions & 49 deletions crates/node/service/src/actors/derivation.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! [NodeActor] implementation for the derivation sub-routine.

use crate::{Metrics, NodeActor, actors::ActorContext};
use crate::{Metrics, NodeActor, actors::CancellableContext};
use async_trait::async_trait;
use kona_derive::{
ActivationSignal, Pipeline, PipelineError, PipelineErrorKind, ResetError, ResetSignal, Signal,
Expand All @@ -21,28 +21,53 @@ use tokio_util::sync::{CancellationToken, WaitForCancellationFuture};
/// to the [NodeActor] responsible for the execution sub-routine.
#[derive(Debug)]
pub struct DerivationActor<P>
where
P: Pipeline + SignalReceiver,
{
/// The state for the derivation actor.
state: DerivationState<P>,
/// The sender for derived [`OpAttributesWithParent`]s produced by the actor.
attributes_out: mpsc::Sender<OpAttributesWithParent>,
/// The reset request sender, used to handle [`PipelineErrorKind::Reset`] events and forward
/// them to the engine.
reset_request_tx: mpsc::Sender<()>,
}

/// The state for the derivation actor.
#[derive(Debug)]
pub struct DerivationState<P>
where
P: Pipeline + SignalReceiver,
{
/// The derivation pipeline.
pipeline: P,
pub pipeline: P,
/// A flag indicating whether or not derivation is idle. Derivation is considered idle when it
/// has yielded to wait for more data on the DAL.
derivation_idle: bool,
pub derivation_idle: bool,
/// A flag indicating whether or not derivation is waiting for a signal. When waiting for a
/// signal, derivation cannot process any incoming events.
waiting_for_signal: bool,
pub waiting_for_signal: bool,
}

/// The outbound channels for the derivation actor.
#[derive(Debug)]
pub struct DerivationOutboundChannels {
/// The receiver for derived [`OpAttributesWithParent`]s produced by the actor.
pub attributes_out: mpsc::Receiver<OpAttributesWithParent>,
/// The receiver for reset requests, used to handle [`PipelineErrorKind::Reset`] events and
/// forward them to the engine.
pub reset_request_tx: mpsc::Receiver<()>,
}

/// The communication context used by the derivation actor.
#[derive(Debug)]
pub struct DerivationContext {
/// The receiver for L1 head update notifications.
l1_head_updates: watch::Receiver<Option<BlockInfo>>,
pub l1_head_updates: watch::Receiver<Option<BlockInfo>>,
/// The receiver for L2 safe head update notifications.
engine_l2_safe_head: watch::Receiver<L2BlockInfo>,
pub engine_l2_safe_head: watch::Receiver<L2BlockInfo>,
/// A receiver that tells derivation to begin. Completing EL sync consumes the instance.
el_sync_complete_rx: oneshot::Receiver<()>,
pub el_sync_complete_rx: oneshot::Receiver<()>,
/// A receiver that sends a [`Signal`] to the derivation pipeline.
///
/// The derivation actor steps over the derivation pipeline to generate
Expand All @@ -61,49 +86,24 @@ pub struct DerivationContext {
/// occurs.
///
/// Specs: <https://specs.optimism.io/protocol/derivation.html#l1-sync-payload-attributes-processing>
derivation_signal_rx: mpsc::Receiver<Signal>,
/// The sender for derived [`OpAttributesWithParent`]s produced by the actor.
attributes_out: mpsc::Sender<OpAttributesWithParent>,
/// The reset request sender, used to handle [`PipelineErrorKind::Reset`] events and forward
/// them to the engine.
reset_request_tx: mpsc::Sender<()>,
pub derivation_signal_rx: mpsc::Receiver<Signal>,
/// The cancellation token, shared between all tasks.
cancellation: CancellationToken,
pub cancellation: CancellationToken,
}

impl ActorContext for DerivationContext {
impl CancellableContext for DerivationContext {
fn cancelled(&self) -> WaitForCancellationFuture<'_> {
self.cancellation.cancelled()
}
}

impl<P> DerivationActor<P>
impl<P> DerivationState<P>
where
P: Pipeline + SignalReceiver,
{
/// Creates a new instance of the [DerivationActor].
#[allow(clippy::too_many_arguments)]
pub const fn new(
pipeline: P,
engine_l2_safe_head: watch::Receiver<L2BlockInfo>,
el_sync_complete_rx: oneshot::Receiver<()>,
derivation_signal_rx: mpsc::Receiver<Signal>,
l1_head_updates: watch::Receiver<Option<BlockInfo>>,
attributes_out: mpsc::Sender<OpAttributesWithParent>,
reset_request_tx: mpsc::Sender<()>,
cancellation: CancellationToken,
) -> (Self, DerivationContext) {
let actor = Self { pipeline, derivation_idle: true, waiting_for_signal: false };
let context = DerivationContext {
l1_head_updates,
engine_l2_safe_head,
el_sync_complete_rx,
derivation_signal_rx,
attributes_out,
reset_request_tx,
cancellation,
};
(actor, context)
/// Creates a new instance of the [DerivationState].
pub const fn new(pipeline: P) -> Self {
Self { pipeline, derivation_idle: true, waiting_for_signal: false }
}

/// Handles a [`Signal`] received over the derivation signal receiver channel.
Expand Down Expand Up @@ -169,6 +169,7 @@ where
.pipeline
.origin()
.ok_or(PipelineError::MissingOrigin.crit())?;

self.pipeline
.signal(
ActivationSignal {
Expand Down Expand Up @@ -304,13 +305,40 @@ where
}
}

impl<P> DerivationActor<P>
where
P: Pipeline + SignalReceiver,
{
/// Creates a new instance of the [DerivationActor].
#[allow(clippy::too_many_arguments)]
pub fn new(state: DerivationState<P>) -> (DerivationOutboundChannels, Self) {
let (derived_payload_tx, derived_payload_rx) = mpsc::channel(16);
let (reset_request_tx, reset_request_rx) = mpsc::channel(16);
let actor = Self { state, attributes_out: derived_payload_tx, reset_request_tx };

(
DerivationOutboundChannels {
attributes_out: derived_payload_rx,
reset_request_tx: reset_request_rx,
},
actor,
)
}
}

#[async_trait]
impl<P> NodeActor for DerivationActor<P>
where
P: Pipeline + SignalReceiver + Send + Sync,
P: Pipeline + SignalReceiver + Send + Sync + 'static,
{
type Error = DerivationError;
type Context = DerivationContext;
type InboundData = DerivationContext;
type State = DerivationState<P>;
type OutboundData = DerivationOutboundChannels;

fn build(config: Self::State) -> (Self::OutboundData, Self) {
Self::new(config)
}

async fn start(
mut self,
Expand All @@ -319,10 +347,8 @@ where
mut engine_l2_safe_head,
mut el_sync_complete_rx,
mut derivation_signal_rx,
attributes_out,
reset_request_tx,
cancellation,
}: Self::Context,
}: Self::InboundData,
) -> Result<(), Self::Error> {
loop {
select! {
Expand All @@ -345,8 +371,8 @@ where
return Err(DerivationError::SignalReceiveFailed);
};

self.signal(signal).await;
self.waiting_for_signal = false;
self.state.signal(signal).await;
self.state.waiting_for_signal = false;
}
msg = l1_head_updates.changed() => {
if let Err(err) = msg {
Expand All @@ -358,15 +384,15 @@ where
return Ok(());
}

self.process(InboundDerivationMessage::NewDataAvailable, &mut engine_l2_safe_head, &el_sync_complete_rx, &attributes_out, &reset_request_tx).await?;
self.state.process(InboundDerivationMessage::NewDataAvailable, &mut engine_l2_safe_head, &el_sync_complete_rx, &self.attributes_out, &self.reset_request_tx).await?;
}
_ = engine_l2_safe_head.changed() => {
self.process(InboundDerivationMessage::SafeHeadUpdated, &mut engine_l2_safe_head, &el_sync_complete_rx, &attributes_out, &reset_request_tx).await?;
self.state.process(InboundDerivationMessage::SafeHeadUpdated, &mut engine_l2_safe_head, &el_sync_complete_rx, &self.attributes_out, &self.reset_request_tx).await?;
}
_ = &mut el_sync_complete_rx, if !el_sync_complete_rx.is_terminated() => {
info!(target: "derivation", "Engine finished syncing, starting derivation.");
// Optimistically process the first message.
self.process(InboundDerivationMessage::NewDataAvailable, &mut engine_l2_safe_head, &el_sync_complete_rx, &attributes_out, &reset_request_tx).await?;
self.state.process(InboundDerivationMessage::NewDataAvailable, &mut engine_l2_safe_head, &el_sync_complete_rx, &self.attributes_out, &self.reset_request_tx).await?;
Comment thread
theochap marked this conversation as resolved.
}
}
}
Expand Down
Loading