Skip to content
This repository was archived by the owner on Jan 16, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 30 additions & 12 deletions crates/derive/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

use crate::{
stages::NextAttributes,
traits::ResettableStage,
traits::{OriginAdvancer, ResettableStage},
types::{
BlockInfo, L2AttributesWithParent, L2BlockInfo, StageError, StageResult, SystemConfig,
},
Expand All @@ -24,12 +24,11 @@ pub trait ResetProvider {
/// The derivation pipeline is responsible for deriving L2 inputs from L1 data.
#[derive(Debug)]
pub struct DerivationPipeline<
S: NextAttributes + ResettableStage + Debug + Send,
S: NextAttributes + ResettableStage + OriginAdvancer + Debug + Send,
R: ResetProvider + Send,
> {
/// The stack of stages in the pipeline.
/// The stack is reponsible for advancing the L1 traversal stage.
pub stack: S,
/// A handle to the next attributes.
pub attributes: S,
/// Reset provider for the pipeline.
pub reset: R,
/// A list of prepared [L2AttributesWithParent] to be used by the derivation pipeline consumer.
Expand All @@ -40,12 +39,14 @@ pub struct DerivationPipeline<
pub cursor: L2BlockInfo,
}

impl<S: NextAttributes + ResettableStage + Debug + Send, R: ResetProvider + Send>
DerivationPipeline<S, R>
impl<
S: NextAttributes + ResettableStage + OriginAdvancer + Debug + Send,
R: ResetProvider + Send,
> DerivationPipeline<S, R>
{
/// Creates a new instance of the [DerivationPipeline].
pub fn new(stack: S, reset: R, cursor: L2BlockInfo) -> Self {
Self { stack, prepared: VecDeque::new(), reset, needs_reset: false, cursor }
pub fn new(attributes: S, reset: R, cursor: L2BlockInfo) -> Self {
Self { attributes, prepared: VecDeque::new(), reset, needs_reset: false, cursor }
}

/// Set the [L2BlockInfo] cursor to be used when pulling the next attributes.
Expand All @@ -63,6 +64,23 @@ impl<S: NextAttributes + ResettableStage + Debug + Send, R: ResetProvider + Send
self.needs_reset = true;
}

/// Resets the pipeline.
async fn reset_pipe(&mut self, bi: BlockInfo, sc: &SystemConfig) -> StageResult<()> {
match self.attributes.reset(bi, sc).await {
Ok(()) => {
tracing::info!("Stages reset");
}
Err(StageError::Eof) => {
tracing::info!("Stages reset with EOF");
}
Err(err) => {
tracing::error!("Stages reset failed: {:?}", err);
return Err(err);
}
}
Ok(())
}

/// Attempts to progress the pipeline.
/// A [StageError::Eof] is returned if the pipeline is blocked by waiting for new L1 data.
/// Any other error is critical and the derivation pipeline should be reset.
Expand All @@ -76,12 +94,11 @@ impl<S: NextAttributes + ResettableStage + Debug + Send, R: ResetProvider + Send
if self.needs_reset {
let block_info = self.reset.block_info().await;
let system_config = self.reset.system_config().await;
self.stack.reset(block_info, &system_config).await?;
self.reset_pipe(block_info, &system_config).await?;
self.needs_reset = false;
}

// Step over the engine queue.
match self.stack.next_attributes(self.cursor).await {
match self.attributes.next_attributes(self.cursor).await {
Ok(a) => {
tracing::info!("attributes queue stage step returned l2 attributes");
tracing::info!("prepared L2 attributes: {:?}", a);
Expand All @@ -90,6 +107,7 @@ impl<S: NextAttributes + ResettableStage + Debug + Send, R: ResetProvider + Send
}
Err(StageError::Eof) => {
tracing::info!("attributes queue stage complete");
self.attributes.advance_origin().await?;
}
// TODO: match on the EngineELSyncing error here and log
Err(err) => {
Expand Down
5 changes: 0 additions & 5 deletions crates/derive/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,6 @@ pub mod stages;
pub mod traits;
pub mod types;

#[cfg(feature = "online")]
mod stack;
#[cfg(feature = "online")]
pub use stack::*;

#[cfg(feature = "online")]
mod online;
#[cfg(feature = "online")]
Expand Down
35 changes: 33 additions & 2 deletions crates/derive/src/online/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,42 @@
/// Prelude for online providers.
pub(crate) mod prelude {
pub use super::{
AlloyChainProvider, AlloyL2ChainProvider, BeaconClient, OnlineBeaconClient,
OnlineBlobProvider, SimpleSlotDerivation,
new_online_stack, AlloyChainProvider, AlloyL2ChainProvider, BeaconClient,
OnlineBeaconClient, OnlineBlobProvider, SimpleSlotDerivation,
};
}

use crate::{
stages::{
AttributesBuilder, AttributesQueue, BatchQueue, ChannelBank, ChannelReader, FrameQueue,
L1Retrieval, L1Traversal, NextAttributes,
},
traits::{
ChainProvider, DataAvailabilityProvider, L2ChainProvider, OriginProvider, ResettableStage,
},
types::RollupConfig,
};
use alloc::sync::Arc;
use core::fmt::Debug;

/// Creates a new [OnlineStageStack].
#[cfg(feature = "online")]
pub fn new_online_stack(
rollup_config: Arc<RollupConfig>,
chain_provider: impl ChainProvider + Clone + Debug + Send,
dap_source: impl DataAvailabilityProvider + OriginProvider + Clone + Debug + Send,
fetcher: impl L2ChainProvider + Clone + Debug + Send,
builder: impl AttributesBuilder + Clone + Debug + Send,
) -> impl NextAttributes + ResettableStage + Debug + Send {
let l1_traversal = L1Traversal::new(chain_provider, rollup_config.clone());
let l1_retrieval = L1Retrieval::new(l1_traversal, dap_source);
let frame_queue = FrameQueue::new(l1_retrieval);
let channel_bank = ChannelBank::new(rollup_config.clone(), frame_queue);
let channel_reader = ChannelReader::new(channel_bank, rollup_config.clone());
let batch_queue = BatchQueue::new(rollup_config.clone(), channel_reader, fetcher);
AttributesQueue::new(*rollup_config, batch_queue, builder)
}

#[cfg(test)]
#[allow(unreachable_pub)]
pub mod test_utils;
Expand Down
128 changes: 0 additions & 128 deletions crates/derive/src/stack.rs

This file was deleted.

31 changes: 20 additions & 11 deletions crates/derive/src/stages/attributes_queue.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Contains the logic for the `AttributesQueue` stage.

use crate::{
traits::{OriginProvider, PreviousStage, ResettableStage},
traits::{OriginAdvancer, OriginProvider, PreviousStage, ResettableStage},
types::{
BlockInfo, L2AttributesWithParent, L2BlockInfo, L2PayloadAttributes, ResetError,
RollupConfig, SingleBatch, StageError, StageResult, SystemConfig,
Expand Down Expand Up @@ -53,7 +53,7 @@ pub trait NextAttributes {
#[derive(Debug)]
pub struct AttributesQueue<P, AB>
where
P: AttributesProvider + ResettableStage + PreviousStage + OriginProvider + Debug,
P: AttributesProvider + PreviousStage + Debug,
AB: AttributesBuilder + Debug,
{
/// The rollup config.
Expand All @@ -70,7 +70,7 @@ where

impl<P, AB> AttributesQueue<P, AB>
where
P: AttributesProvider + PreviousStage + ResettableStage + OriginProvider + Debug,
P: AttributesProvider + PreviousStage + Debug,
AB: AttributesBuilder + Debug,
{
/// Create a new [AttributesQueue] stage.
Expand Down Expand Up @@ -149,20 +149,29 @@ where

impl<P, AB> PreviousStage for AttributesQueue<P, AB>
where
P: AttributesProvider + PreviousStage + ResettableStage + OriginProvider + Debug,
AB: AttributesBuilder + Debug,
P: AttributesProvider + PreviousStage + Send + Debug,
AB: AttributesBuilder + Send + Debug,
{
type Previous = P;
fn previous(&self) -> Option<Box<&dyn PreviousStage>> {
Some(Box::new(&self.prev))
}
}

fn previous(&self) -> Option<&P> {
Some(&self.prev)
#[async_trait]
impl<P, AB> OriginAdvancer for AttributesQueue<P, AB>
where
P: AttributesProvider + PreviousStage + Debug + Send,
AB: AttributesBuilder + Debug + Send,
{
async fn advance_origin(&mut self) -> StageResult<()> {
self.prev.advance_origin().await
}
}

#[async_trait]
impl<P, AB> NextAttributes for AttributesQueue<P, AB>
where
P: AttributesProvider + PreviousStage + ResettableStage + OriginProvider + Debug + Send,
P: AttributesProvider + PreviousStage + Debug + Send,
AB: AttributesBuilder + Debug + Send,
{
async fn next_attributes(
Expand All @@ -175,7 +184,7 @@ where

impl<P, AB> OriginProvider for AttributesQueue<P, AB>
where
P: AttributesProvider + PreviousStage + ResettableStage + OriginProvider + Debug,
P: AttributesProvider + PreviousStage + Debug,
AB: AttributesBuilder + Debug,
{
fn origin(&self) -> Option<&BlockInfo> {
Expand All @@ -186,7 +195,7 @@ where
#[async_trait]
impl<P, AB> ResettableStage for AttributesQueue<P, AB>
where
P: AttributesProvider + PreviousStage + ResettableStage + OriginProvider + Send + Debug,
P: AttributesProvider + PreviousStage + Send + Debug,
AB: AttributesBuilder + Send + Debug,
{
async fn reset(
Expand Down
Loading