Skip to content
This repository was archived by the owner on Jan 16, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 93 additions & 58 deletions crates/derive/src/builder.rs
Original file line number Diff line number Diff line change
@@ -1,86 +1,121 @@
//! Contains a concrete implementation of the [DerivationPipeline].

use crate::{
stages::{
AttributesBuilder, AttributesQueue, BatchQueue, ChannelBank, ChannelReader, FrameQueue,
L1Retrieval, L1Traversal, NextAttributes,
stages::NextAttributes,
traits::{OriginAdvancer, ResettableStage},
types::{
BlockInfo, L2AttributesWithParent, L2BlockInfo, StageError, StageResult, SystemConfig,
},
traits::{ChainProvider, DataAvailabilityProvider, L2ChainProvider, OriginProvider},
types::{L2AttributesWithParent, L2BlockInfo, RollupConfig, StageResult},
};
use alloc::sync::Arc;
use alloc::{boxed::Box, collections::VecDeque};
use async_trait::async_trait;
use core::fmt::Debug;

/// Provides the [BlockInfo] and [SystemConfig] for the stack to reset the stages.
#[async_trait]
pub trait ResetProvider {
/// Returns the current [BlockInfo] for the pipeline to reset.
async fn block_info(&self) -> BlockInfo;

/// Returns the current [SystemConfig] for the pipeline to reset.
async fn system_config(&self) -> SystemConfig;
}

/// The derivation pipeline is responsible for deriving L2 inputs from L1 data.
#[derive(Debug)]
pub struct DerivationPipeline<N: NextAttributes + Debug> {
/// The attributes queue to retrieve the next attributes.
pub attributes: N,
pub struct DerivationPipeline<
S: NextAttributes + ResettableStage + OriginAdvancer + Debug + Send,
R: ResetProvider + Send,
> {
/// A handle to the next attributes.
pub attributes: S,
/// Reset provider for the pipeline.
pub reset: R,
/// A list of prepared [L2AttributesWithParent] to be used by the derivation pipeline consumer.
pub prepared: VecDeque<L2AttributesWithParent>,
/// A flag to tell the pipeline to reset.
pub needs_reset: bool,
/// A cursor for the [L2BlockInfo] parent to be used when pulling the next attributes.
pub cursor: L2BlockInfo,
}

impl<N: NextAttributes + Debug + Send> DerivationPipeline<N> {
impl<
S: NextAttributes + ResettableStage + OriginAdvancer + Debug + Send,
R: ResetProvider + Send,
> DerivationPipeline<S, R>
{
/// Creates a new instance of the [DerivationPipeline].
pub fn new(attributes: N, cursor: L2BlockInfo) -> Self {
Self { attributes, cursor }
pub fn new(attributes: S, reset: R, cursor: L2BlockInfo) -> Self {
Self { attributes, prepared: VecDeque::new(), reset, needs_reset: false, cursor }
}

/// Set the [L2BlockInfo] cursor to be used when pulling the next attributes.
pub fn set_cursor(&mut self, cursor: L2BlockInfo) {
self.cursor = cursor;
}

/// Get the next attributes from the pipeline.
pub async fn next(&mut self) -> StageResult<L2AttributesWithParent> {
self.attributes.next_attributes(self.cursor).await
/// Returns the next [L2AttributesWithParent] from the pipeline.
pub fn next_attributes(&mut self) -> Option<L2AttributesWithParent> {
self.prepared.pop_front()
}
}

impl<P, DAP, F, B> DerivationPipeline<KonaAttributes<P, DAP, F, B>>
where
P: ChainProvider + Clone + Debug + Send,
DAP: DataAvailabilityProvider + OriginProvider + Clone + Debug + Send,
F: L2ChainProvider + Clone + Debug + Send,
B: AttributesBuilder + Clone + Debug + Send,
{
/// Creates a new instance of the [DerivationPipeline] from the given attributes.
pub fn new_online_pipeline(
attributes: KonaAttributes<P, DAP, F, B>,
cursor: L2BlockInfo,
) -> Self {
Self::new(attributes, cursor)
/// Flags the pipeline to reset on the next [DerivationPipeline::step] call.
pub fn reset(&mut self) {
self.needs_reset = true;
}
}

/// [KonaDerivationPipeline] is a concrete [DerivationPipeline] type.
pub type KonaDerivationPipeline<P, DAP, F, B> = DerivationPipeline<KonaAttributes<P, DAP, F, B>>;
/// Resets the pipeline.
async fn reset_pipe(&mut self, bi: BlockInfo, sc: &SystemConfig) -> StageResult<()> {
match self.attributes.reset(bi, sc).await {
Ok(()) => {
tracing::info!("Stages reset");
}
Err(StageError::Eof) => {
tracing::info!("Stages reset with EOF");
}
Err(err) => {
tracing::error!("Stages reset failed: {:?}", err);
return Err(err);
}
}
Ok(())
}

/// [KonaAttributes] is a concrete [NextAttributes] type.
pub type KonaAttributes<P, DAP, F, B> = AttributesQueue<
BatchQueue<ChannelReader<ChannelBank<FrameQueue<L1Retrieval<DAP, L1Traversal<P>>>>>, F>,
B,
>;
/// Attempts to progress the pipeline.
/// A [StageError::Eof] is returned if the pipeline is blocked by waiting for new L1 data.
/// Any other error is critical and the derivation pipeline should be reset.
/// An error is expected when the underlying source closes.
/// When [DerivationPipeline::step] returns [Ok(())], it should be called again, to continue the
/// derivation process.
pub async fn step(&mut self) -> StageResult<()> {
tracing::info!("DerivationPipeline::step");

/// Creates a new [KonaAttributes] instance.
pub fn new_online_pipeline<P, DAP, F, B>(
rollup_config: Arc<RollupConfig>,
chain_provider: P,
dap_source: DAP,
fetcher: F,
builder: B,
) -> KonaAttributes<P, DAP, F, B>
where
P: ChainProvider + Clone + Debug + Send,
DAP: DataAvailabilityProvider + OriginProvider + Clone + Debug + Send,
F: L2ChainProvider + Clone + Debug + Send,
B: AttributesBuilder + Clone + Debug + Send,
{
let l1_traversal = L1Traversal::new(chain_provider, rollup_config.clone());
let l1_retrieval = L1Retrieval::new(l1_traversal, dap_source);
let frame_queue = FrameQueue::new(l1_retrieval);
let channel_bank = ChannelBank::new(rollup_config.clone(), frame_queue);
let channel_reader = ChannelReader::new(channel_bank, rollup_config.clone());
let batch_queue = BatchQueue::new(rollup_config.clone(), channel_reader, fetcher);
AttributesQueue::new(*rollup_config, batch_queue, builder)
// Reset the pipeline if needed.
if self.needs_reset {
let block_info = self.reset.block_info().await;
let system_config = self.reset.system_config().await;
self.reset_pipe(block_info, &system_config).await?;
self.needs_reset = false;
}

match self.attributes.next_attributes(self.cursor).await {
Ok(a) => {
tracing::info!("attributes queue stage step returned l2 attributes");
tracing::info!("prepared L2 attributes: {:?}", a);
self.prepared.push_back(a);
return Ok(());
}
Err(StageError::Eof) => {
tracing::info!("attributes queue stage complete");
self.attributes.advance_origin().await?;
}
// TODO: match on the EngineELSyncing error here and log
Err(err) => {
tracing::error!("attributes queue stage failed: {:?}", err);
return Err(err);
}
}

Ok(())
}
}
35 changes: 33 additions & 2 deletions crates/derive/src/online/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,42 @@
/// Prelude for online providers.
pub(crate) mod prelude {
pub use super::{
AlloyChainProvider, AlloyL2ChainProvider, BeaconClient, OnlineBeaconClient,
OnlineBlobProvider, SimpleSlotDerivation,
new_online_stack, AlloyChainProvider, AlloyL2ChainProvider, BeaconClient,
OnlineBeaconClient, OnlineBlobProvider, SimpleSlotDerivation,
};
}

use crate::{
stages::{
AttributesBuilder, AttributesQueue, BatchQueue, ChannelBank, ChannelReader, FrameQueue,
L1Retrieval, L1Traversal, NextAttributes,
},
traits::{
ChainProvider, DataAvailabilityProvider, L2ChainProvider, OriginProvider, ResettableStage,
},
types::RollupConfig,
};
use alloc::sync::Arc;
use core::fmt::Debug;

/// Creates a new [OnlineStageStack].
#[cfg(feature = "online")]
pub fn new_online_stack(
rollup_config: Arc<RollupConfig>,
chain_provider: impl ChainProvider + Clone + Debug + Send,
dap_source: impl DataAvailabilityProvider + OriginProvider + Clone + Debug + Send,
fetcher: impl L2ChainProvider + Clone + Debug + Send,
builder: impl AttributesBuilder + Clone + Debug + Send,
) -> impl NextAttributes + ResettableStage + Debug + Send {
let l1_traversal = L1Traversal::new(chain_provider, rollup_config.clone());
let l1_retrieval = L1Retrieval::new(l1_traversal, dap_source);
let frame_queue = FrameQueue::new(l1_retrieval);
let channel_bank = ChannelBank::new(rollup_config.clone(), frame_queue);
let channel_reader = ChannelReader::new(channel_bank, rollup_config.clone());
let batch_queue = BatchQueue::new(rollup_config.clone(), channel_reader, fetcher);
AttributesQueue::new(*rollup_config, batch_queue, builder)
}

#[cfg(test)]
#[allow(unreachable_pub)]
pub mod test_utils;
Expand Down
40 changes: 33 additions & 7 deletions crates/derive/src/stages/attributes_queue.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Contains the logic for the `AttributesQueue` stage.

use crate::{
traits::{OriginProvider, ResettableStage},
traits::{OriginAdvancer, OriginProvider, PreviousStage, ResettableStage},
types::{
BlockInfo, L2AttributesWithParent, L2BlockInfo, L2PayloadAttributes, ResetError,
RollupConfig, SingleBatch, StageError, StageResult, SystemConfig,
Expand Down Expand Up @@ -53,7 +53,7 @@ pub trait NextAttributes {
#[derive(Debug)]
pub struct AttributesQueue<P, AB>
where
P: AttributesProvider + OriginProvider + Debug,
P: AttributesProvider + PreviousStage + Debug,
AB: AttributesBuilder + Debug,
{
/// The rollup config.
Expand All @@ -70,7 +70,7 @@ where

impl<P, AB> AttributesQueue<P, AB>
where
P: AttributesProvider + OriginProvider + Debug,
P: AttributesProvider + PreviousStage + Debug,
AB: AttributesBuilder + Debug,
{
/// Create a new [AttributesQueue] stage.
Expand Down Expand Up @@ -147,10 +147,31 @@ where
}
}

impl<P, AB> PreviousStage for AttributesQueue<P, AB>
where
P: AttributesProvider + PreviousStage + Send + Debug,
AB: AttributesBuilder + Send + Debug,
{
fn previous(&self) -> Option<Box<&dyn PreviousStage>> {
Some(Box::new(&self.prev))
}
}

#[async_trait]
impl<P, AB> OriginAdvancer for AttributesQueue<P, AB>
where
P: AttributesProvider + PreviousStage + Debug + Send,
AB: AttributesBuilder + Debug + Send,
{
async fn advance_origin(&mut self) -> StageResult<()> {
self.prev.advance_origin().await
}
}

#[async_trait]
impl<P, AB> NextAttributes for AttributesQueue<P, AB>
where
P: AttributesProvider + OriginProvider + Debug + Send,
P: AttributesProvider + PreviousStage + Debug + Send,
AB: AttributesBuilder + Debug + Send,
{
async fn next_attributes(
Expand All @@ -163,7 +184,7 @@ where

impl<P, AB> OriginProvider for AttributesQueue<P, AB>
where
P: AttributesProvider + OriginProvider + Debug,
P: AttributesProvider + PreviousStage + Debug,
AB: AttributesBuilder + Debug,
{
fn origin(&self) -> Option<&BlockInfo> {
Expand All @@ -174,10 +195,15 @@ where
#[async_trait]
impl<P, AB> ResettableStage for AttributesQueue<P, AB>
where
P: AttributesProvider + OriginProvider + Send + Debug,
P: AttributesProvider + PreviousStage + Send + Debug,
AB: AttributesBuilder + Send + Debug,
{
async fn reset(&mut self, _: BlockInfo, _: &SystemConfig) -> StageResult<()> {
async fn reset(
&mut self,
block_info: BlockInfo,
system_config: &SystemConfig,
) -> StageResult<()> {
self.prev.reset(block_info, system_config).await?;
info!("resetting attributes queue");
self.batch = None;
self.is_last_in_span = false;
Expand Down
Loading