Skip to content
This repository was archived by the owner on Jan 16, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 80 additions & 50 deletions crates/node/service/src/actors/derivation.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! [NodeActor] implementation for the derivation sub-routine.

use crate::{Metrics, NodeActor};
use crate::{Metrics, NodeActor, actors::ActorContext};
use async_trait::async_trait;
use kona_derive::{
ActivationSignal, Pipeline, PipelineError, PipelineErrorKind, ResetError, ResetSignal, Signal,
Expand All @@ -12,7 +12,7 @@ use tokio::{
select,
sync::{mpsc, oneshot, watch},
};
use tokio_util::sync::CancellationToken;
use tokio_util::sync::{CancellationToken, WaitForCancellationFuture};

/// The [NodeActor] for the derivation sub-routine.
///
Expand All @@ -26,8 +26,20 @@ where
{
/// The derivation pipeline.
pipeline: P,
/// A flag indicating whether or not derivation is idle. Derivation is considered idle when it
/// has yielded to wait for more data on the DAL.
derivation_idle: bool,
/// A flag indicating whether or not derivation is waiting for a signal. When waiting for a
/// signal, derivation cannot process any incoming events.
waiting_for_signal: bool,
}

/// The l2 safe head from the engine.
/// The communication context used by the derivation actor.
#[derive(Debug)]
pub struct DerivationContext {
/// The receiver for L1 head update notifications.
l1_head_updates: watch::Receiver<Option<BlockInfo>>,
/// The receiver for L2 safe head update notifications.
engine_l2_safe_head: watch::Receiver<L2BlockInfo>,
/// A receiver that tells derivation to begin. Completing EL sync consumes the instance.
el_sync_complete_rx: oneshot::Receiver<()>,
Expand All @@ -50,26 +62,21 @@ where
///
/// Specs: <https://specs.optimism.io/protocol/derivation.html#l1-sync-payload-attributes-processing>
derivation_signal_rx: mpsc::Receiver<Signal>,
/// The receiver for L1 head update notifications.
l1_head_updates: watch::Receiver<Option<BlockInfo>>,

/// The sender for derived [`OpAttributesWithParent`]s produced by the actor.
attributes_out: mpsc::Sender<OpAttributesWithParent>,
/// The reset request sender, used to handle [`PipelineErrorKind::Reset`] events and forward
/// them to the engine.
reset_request_tx: mpsc::Sender<()>,

/// A flag indicating whether or not derivation is idle. Derivation is considered idle when it
/// has yielded to wait for more data on the DAL.
derivation_idle: bool,
/// A flag indicating whether or not derivation is waiting for a signal. When waiting for a
/// signal, derivation cannot process any incoming events.
waiting_for_signal: bool,

/// The cancellation token, shared between all tasks.
cancellation: CancellationToken,
}

impl ActorContext for DerivationContext {
fn cancelled(&self) -> WaitForCancellationFuture<'_> {
self.cancellation.cancelled()
}
}

impl<P> DerivationActor<P>
where
P: Pipeline + SignalReceiver,
Expand All @@ -85,19 +92,18 @@ where
attributes_out: mpsc::Sender<OpAttributesWithParent>,
reset_request_tx: mpsc::Sender<()>,
cancellation: CancellationToken,
) -> Self {
Self {
pipeline,
) -> (Self, DerivationContext) {
let actor = Self { pipeline, derivation_idle: true, waiting_for_signal: false };
let context = DerivationContext {
l1_head_updates,
engine_l2_safe_head,
el_sync_complete_rx,
derivation_signal_rx,
l1_head_updates,
attributes_out,
reset_request_tx,
derivation_idle: true,
waiting_for_signal: false,
cancellation,
}
};
(actor, context)
}

/// Handles a [`Signal`] received over the derivation signal receiver channel.
Expand All @@ -116,12 +122,16 @@ where

/// Attempts to step the derivation pipeline forward as much as possible in order to produce the
/// next safe payload.
async fn produce_next_attributes(&mut self) -> Result<OpAttributesWithParent, DerivationError> {
async fn produce_next_attributes(
&mut self,
engine_l2_safe_head: &watch::Receiver<L2BlockInfo>,
reset_request_tx: &mpsc::Sender<()>,
) -> Result<OpAttributesWithParent, DerivationError> {
// As we start the safe head at the disputed block's parent, we step the pipeline until the
// first attributes are produced. All batches at and before the safe head will be
// dropped, so the first payload will always be the disputed one.
loop {
let l2_safe_head = *self.engine_l2_safe_head.borrow();
let l2_safe_head = *engine_l2_safe_head.borrow();
match self.pipeline.step(l2_safe_head).await {
StepResult::PreparedAttributes => { /* continue; attributes will be sent off. */ }
StepResult::AdvancedOrigin => {
Expand Down Expand Up @@ -185,7 +195,7 @@ where
.rollup_config()
.is_interop_active(l2_safe_head.block_info.timestamp)
{
self.reset_request_tx.send(()).await.map_err(|e| {
reset_request_tx.send(()).await.map_err(|e| {
error!(target: "derivation", ?e, "Failed to send reset request");
DerivationError::Sender(Box::new(e))
})?;
Expand Down Expand Up @@ -222,9 +232,16 @@ where
/// attributes are successfully produced. If the pipeline step errors,
/// the same [`L2BlockInfo`] is used again. If the [`L2BlockInfo`] is the
/// zero hash, the pipeline is not stepped on.
async fn process(&mut self, msg: InboundDerivationMessage) -> Result<(), DerivationError> {
async fn process(
&mut self,
msg: InboundDerivationMessage,
engine_l2_safe_head: &mut watch::Receiver<L2BlockInfo>,
el_sync_complete_rx: &oneshot::Receiver<()>,
attributes_out: &mpsc::Sender<OpAttributesWithParent>,
reset_request_tx: &mpsc::Sender<()>,
) -> Result<(), DerivationError> {
// Only attempt derivation once the engine finishes syncing.
if !self.el_sync_complete_rx.is_terminated() {
if !el_sync_complete_rx.is_terminated() {
trace!(target: "derivation", "Engine not ready, skipping derivation");
return Ok(());
} else if self.waiting_for_signal {
Expand All @@ -236,7 +253,7 @@ where
// check if the safe head has changed before continuing. This is to prevent attempts to
// progress the pipeline while it is in the middle of processing a channel.
if !(self.derivation_idle || msg == InboundDerivationMessage::SafeHeadUpdated) {
match self.engine_l2_safe_head.has_changed() {
match engine_l2_safe_head.has_changed() {
Ok(true) => { /* Proceed to produce next payload attributes. */ }
Ok(false) => {
trace!(target: "derivation", "Safe head hasn't changed, skipping derivation.");
Expand All @@ -250,34 +267,35 @@ where
}

// Wait for the engine to initialize unknowns prior to kicking off derivation.
let engine_safe_head = *self.engine_l2_safe_head.borrow();
let engine_safe_head = *engine_l2_safe_head.borrow();
if engine_safe_head.block_info.hash.is_zero() {
warn!(target: "derivation", engine_safe_head = ?engine_safe_head.block_info.number, "Waiting for engine to initialize state prior to derivation.");
return Ok(());
}

// Advance the pipeline as much as possible, new data may be available or there still may be
// payloads in the attributes queue.
let payload_attrs = match self.produce_next_attributes().await {
Ok(attrs) => attrs,
Err(DerivationError::Yield) => {
// Yield until more data is available.
self.derivation_idle = true;
return Ok(());
}
Err(e) => {
return Err(e);
}
};
let payload_attrs =
match self.produce_next_attributes(engine_l2_safe_head, reset_request_tx).await {
Ok(attrs) => attrs,
Err(DerivationError::Yield) => {
// Yield until more data is available.
self.derivation_idle = true;
return Ok(());
}
Err(e) => {
return Err(e);
}
};

// Mark derivation as busy.
self.derivation_idle = false;

// Mark the L2 safe head as seen.
self.engine_l2_safe_head.borrow_and_update();
engine_l2_safe_head.borrow_and_update();

// Send payload attributes out for processing.
self.attributes_out
attributes_out
.send(payload_attrs)
.await
.map_err(|e| DerivationError::Sender(Box::new(e)))?;
Expand All @@ -292,20 +310,32 @@ where
P: Pipeline + SignalReceiver + Send + Sync,
{
type Error = DerivationError;
type Context = DerivationContext;

async fn start(mut self) -> Result<(), Self::Error> {
async fn start(
mut self,
DerivationContext {
mut l1_head_updates,
mut engine_l2_safe_head,
mut el_sync_complete_rx,
mut derivation_signal_rx,
attributes_out,
reset_request_tx,
cancellation,
}: Self::Context,
) -> Result<(), Self::Error> {
loop {
select! {
biased;

_ = self.cancellation.cancelled() => {
_ = cancellation.cancelled() => {
info!(
target: "derivation",
"Received shutdown signal. Exiting derivation task."
);
return Ok(());
}
signal = self.derivation_signal_rx.recv() => {
signal = derivation_signal_rx.recv() => {
let Some(signal) = signal else {
error!(
target: "derivation",
Expand All @@ -318,7 +348,7 @@ where
self.signal(signal).await;
self.waiting_for_signal = false;
}
msg = self.l1_head_updates.changed() => {
msg = l1_head_updates.changed() => {
if let Err(err) = msg {
error!(
target: "derivation",
Expand All @@ -328,15 +358,15 @@ where
return Ok(());
}

self.process(InboundDerivationMessage::NewDataAvailable).await?;
self.process(InboundDerivationMessage::NewDataAvailable, &mut engine_l2_safe_head, &el_sync_complete_rx, &attributes_out, &reset_request_tx).await?;
}
_ = self.engine_l2_safe_head.changed() => {
self.process(InboundDerivationMessage::SafeHeadUpdated).await?;
_ = engine_l2_safe_head.changed() => {
self.process(InboundDerivationMessage::SafeHeadUpdated, &mut engine_l2_safe_head, &el_sync_complete_rx, &attributes_out, &reset_request_tx).await?;
Comment thread
theochap marked this conversation as resolved.
}
_ = &mut self.el_sync_complete_rx, if !self.el_sync_complete_rx.is_terminated() => {
_ = &mut el_sync_complete_rx, if !el_sync_complete_rx.is_terminated() => {
info!(target: "derivation", "Engine finished syncing, starting derivation.");
// Optimistically process the first message.
self.process(InboundDerivationMessage::NewDataAvailable).await?;
self.process(InboundDerivationMessage::NewDataAvailable, &mut engine_l2_safe_head, &el_sync_complete_rx, &attributes_out, &reset_request_tx).await?;
}
}
}
Expand Down
Loading