Skip to content
This repository was archived by the owner on Jan 16, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions crates/derive/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@ pub use params::{
MAX_RLP_BYTES_PER_CHANNEL, MAX_SPAN_BATCH_BYTES, SEQUENCER_FEE_VAULT_ADDRESS,
};

pub mod builder;
pub use builder::DerivationPipeline;

pub mod pipeline;
pub mod sources;
pub mod stages;
pub mod traits;
Expand Down
67 changes: 67 additions & 0 deletions crates/derive/src/pipeline/builder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
//! Contains the `PipelineBuilder` object that is used to build a `DerivationPipeline`.

use super::{DerivationPipeline, NextAttributes, OriginAdvancer, ResetProvider, ResettableStage};
use alloc::collections::VecDeque;
use core::fmt::Debug;
use kona_primitives::L2BlockInfo;

/// The PipelineBuilder constructs a [DerivationPipeline].
#[derive(Debug)]
pub struct PipelineBuilder<S, R>
where
S: NextAttributes + ResettableStage + OriginAdvancer + Debug + Send,
R: ResetProvider + Send,
{
attributes: Option<S>,
reset: Option<R>,
start_cursor: Option<L2BlockInfo>,
}

impl<S, R> PipelineBuilder<S, R>
where
S: NextAttributes + ResettableStage + OriginAdvancer + Debug + Send,
R: ResetProvider + Send,
{
/// Sets the attributes for the pipeline.
pub fn attributes(mut self, attributes: S) -> Self {
self.attributes = Some(attributes);
self
}

/// Sets the reset provider for the pipeline.
pub fn reset(mut self, reset: R) -> Self {
self.reset = Some(reset);
self
}

/// Sets the start cursor for the pipeline.
pub fn start_cursor(mut self, cursor: L2BlockInfo) -> Self {
self.start_cursor = Some(cursor);
self
}

/// Builds the pipeline.
pub fn build(self) -> DerivationPipeline<S, R> {
self.into()
}
}

impl<S, R> From<PipelineBuilder<S, R>> for DerivationPipeline<S, R>
where
S: NextAttributes + ResettableStage + OriginAdvancer + Debug + Send,
R: ResetProvider + Send,
{
fn from(builder: PipelineBuilder<S, R>) -> Self {
let attributes = builder.attributes.expect("attributes must be set");
let reset = builder.reset.expect("reset must be set");
let start_cursor = builder.start_cursor.expect("start_cursor must be set");

DerivationPipeline {
attributes,
reset,
prepared: VecDeque::new(),
needs_reset: false,
cursor: start_cursor,
}
}
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
//! Contains a concrete implementation of the [DerivationPipeline].
//! Contains the core derivation pipeline.

use crate::{
stages::NextAttributes,
traits::{OriginAdvancer, ResetProvider, ResettableStage},
types::{StageError, StageResult},
use super::{
NextAttributes, OriginAdvancer, Pipeline, ResetProvider, ResettableStage, StageError,
StageResult,
};
use alloc::collections::VecDeque;
use alloc::{boxed::Box, collections::VecDeque};
use async_trait::async_trait;
use core::fmt::Debug;
use kona_primitives::{BlockInfo, L2AttributesWithParent, L2BlockInfo, SystemConfig};

Expand All @@ -27,46 +27,25 @@ pub struct DerivationPipeline<
pub cursor: L2BlockInfo,
}

impl<
S: NextAttributes + ResettableStage + OriginAdvancer + Debug + Send,
R: ResetProvider + Send,
> DerivationPipeline<S, R>
#[async_trait]
impl<S, R> Pipeline for DerivationPipeline<S, R>
where
S: NextAttributes + ResettableStage + OriginAdvancer + Debug + Send,
R: ResetProvider + Send,
{
/// Creates a new instance of the [DerivationPipeline].
pub fn new(attributes: S, reset: R, cursor: L2BlockInfo) -> Self {
Self { attributes, prepared: VecDeque::new(), reset, needs_reset: false, cursor }
}

/// Set the [L2BlockInfo] cursor to be used when pulling the next attributes.
pub fn set_cursor(&mut self, cursor: L2BlockInfo) {
self.cursor = cursor;
fn reset(&mut self) {
self.needs_reset = true;
}

/// Returns the next [L2AttributesWithParent] from the pipeline.
pub fn next_attributes(&mut self) -> Option<L2AttributesWithParent> {
/// Pops the next prepared [L2AttributesWithParent] from the pipeline.
fn pop(&mut self) -> Option<L2AttributesWithParent> {
self.prepared.pop_front()
}

/// Flags the pipeline to reset on the next [DerivationPipeline::step] call.
pub fn reset(&mut self) {
self.needs_reset = true;
}

/// Resets the pipeline.
async fn reset_pipe(&mut self, bi: BlockInfo, sc: &SystemConfig) -> StageResult<()> {
match self.attributes.reset(bi, sc).await {
Ok(()) => {
tracing::info!("Stages reset");
}
Err(StageError::Eof) => {
tracing::info!("Stages reset with EOF");
}
Err(err) => {
tracing::error!("Stages reset failed: {:?}", err);
return Err(err);
}
}
Ok(())
/// Updates the L2 Safe Head cursor of the pipeline.
/// The cursor is used to fetch the next attributes.
fn update_cursor(&mut self, cursor: L2BlockInfo) {
self.cursor = cursor;
}

/// Attempts to progress the pipeline.
Expand All @@ -75,14 +54,14 @@ impl<
/// An error is expected when the underlying source closes.
/// When [DerivationPipeline::step] returns [Ok(())], it should be called again, to continue the
/// derivation process.
pub async fn step(&mut self) -> StageResult<()> {
async fn step(&mut self) -> anyhow::Result<()> {
tracing::info!("DerivationPipeline::step");

// Reset the pipeline if needed.
if self.needs_reset {
let block_info = self.reset.block_info().await;
let system_config = self.reset.system_config().await;
self.reset_pipe(block_info, &system_config).await?;
self.reset_pipe(block_info, &system_config).await.map_err(|e| anyhow::anyhow!(e))?;
self.needs_reset = false;
}

Expand All @@ -95,15 +74,43 @@ impl<
}
Err(StageError::Eof) => {
tracing::info!("attributes queue stage complete");
self.attributes.advance_origin().await?;
self.attributes.advance_origin().await.map_err(|e| anyhow::anyhow!(e))?;
}
// TODO: match on the EngineELSyncing error here and log
Err(err) => {
tracing::error!("attributes queue stage failed: {:?}", err);
return Err(err);
return Err(anyhow::anyhow!(err));
}
}

Ok(())
}
}

impl<S, R> DerivationPipeline<S, R>
where
S: NextAttributes + ResettableStage + OriginAdvancer + Debug + Send,
R: ResetProvider + Send,
{
/// Creates a new instance of the [DerivationPipeline].
pub fn new(attributes: S, reset: R, cursor: L2BlockInfo) -> Self {
Self { attributes, prepared: VecDeque::new(), reset, needs_reset: false, cursor }
}

/// Internal helper to reset the pipeline.
async fn reset_pipe(&mut self, bi: BlockInfo, sc: &SystemConfig) -> StageResult<()> {
match self.attributes.reset(bi, sc).await {
Ok(()) => {
tracing::info!("Stages reset");
}
Err(StageError::Eof) => {
tracing::info!("Stages reset with EOF");
}
Err(err) => {
tracing::error!("Stages reset failed: {:?}", err);
return Err(err);
}
}
Ok(())
}
}
13 changes: 13 additions & 0 deletions crates/derive/src/pipeline/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
//! Module containing the derivation pipeline.

/// Re-export trait arguments.
pub use crate::traits::{NextAttributes, OriginAdvancer, Pipeline, ResetProvider, ResettableStage};

/// Re-export commonly used types.
pub use crate::types::{StageError, StageResult};

mod builder;
pub use builder::PipelineBuilder;

mod core;
pub use core::DerivationPipeline;
10 changes: 1 addition & 9 deletions crates/derive/src/stages/attributes_queue.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Contains the logic for the `AttributesQueue` stage.

use crate::{
traits::{OriginAdvancer, OriginProvider, PreviousStage, ResettableStage},
traits::{NextAttributes, OriginAdvancer, OriginProvider, PreviousStage, ResettableStage},
types::{
BlockInfo, L2AttributesWithParent, L2BlockInfo, L2PayloadAttributes, ResetError,
RollupConfig, SingleBatch, StageError, StageResult, SystemConfig,
Expand Down Expand Up @@ -30,14 +30,6 @@ pub trait AttributesProvider {
fn is_last_in_span(&self) -> bool;
}

/// [NextAttributes] is a trait abstraction that generalizes the [AttributesQueue] stage.
#[async_trait]
pub trait NextAttributes {
/// Returns the next [L2AttributesWithParent] from the current batch.
async fn next_attributes(&mut self, parent: L2BlockInfo)
-> StageResult<L2AttributesWithParent>;
}

/// [AttributesQueue] accepts batches from the [BatchQueue] stage
/// and transforms them into [L2PayloadAttributes]. The outputted payload
/// attributes cannot be buffered because each batch->attributes transformation
Expand Down
3 changes: 1 addition & 2 deletions crates/derive/src/stages/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@ pub use batch_queue::{BatchQueue, BatchQueueProvider};

mod attributes_queue;
pub use attributes_queue::{
AttributesBuilder, AttributesProvider, AttributesQueue, NextAttributes,
StatefulAttributesBuilder,
AttributesBuilder, AttributesProvider, AttributesQueue, StatefulAttributesBuilder,
};

#[cfg(any(test, feature = "test-utils"))]
Expand Down
14 changes: 14 additions & 0 deletions crates/derive/src/traits/attributes.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
//! Contains traits for working with payload attributes and their providers.

use crate::types::{L2AttributesWithParent, L2BlockInfo, StageResult};
use alloc::boxed::Box;
use async_trait::async_trait;

/// [NextAttributes] defines the interface for pulling attributes from
/// the top level `AttributesQueue` stage of the pipeline.
#[async_trait]
pub trait NextAttributes {
/// Returns the next [L2AttributesWithParent] from the current batch.
async fn next_attributes(&mut self, parent: L2BlockInfo)
-> StageResult<L2AttributesWithParent>;
}
6 changes: 6 additions & 0 deletions crates/derive/src/traits/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
//! This module contains all of the traits describing functionality of portions of the derivation
//! pipeline.

mod pipeline;
pub use pipeline::Pipeline;

mod attributes;
pub use attributes::NextAttributes;

mod data_sources;
pub use data_sources::{AsyncIterator, BlobProvider, DataAvailabilityProvider};

Expand Down
22 changes: 22 additions & 0 deletions crates/derive/src/traits/pipeline.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
//! Defines the interface for the core derivation pipeline.

use alloc::boxed::Box;
use async_trait::async_trait;
use kona_primitives::{L2AttributesWithParent, L2BlockInfo};

/// This trait defines the interface for interacting with the derivation pipeline.
#[async_trait]
pub trait Pipeline {
/// Resets the pipeline on the next [Pipeline::step] call.
fn reset(&mut self);

/// Attempts to progress the pipeline.
async fn step(&mut self) -> anyhow::Result<()>;

/// Pops the next prepared [L2AttributesWithParent] from the pipeline.
fn pop(&mut self) -> Option<L2AttributesWithParent>;

/// Updates the L2 Safe Head cursor of the pipeline.
/// This is used when fetching the next attributes.
fn update_cursor(&mut self, cursor: L2BlockInfo);
}