Skip to content
This repository was archived by the owner on Jan 16, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 6 additions & 4 deletions bin/client/src/l1/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ use kona_derive::{
pipeline::{DerivationPipeline, Pipeline, PipelineBuilder, StepResult},
sources::EthereumDataSource,
stages::{
AttributesQueue, BatchQueue, ChannelBank, ChannelReader, FrameQueue, L1Retrieval,
L1Traversal,
AttributesQueue, BatchQueue, BatchStream, ChannelBank, ChannelReader, FrameQueue,
L1Retrieval, L1Traversal,
},
traits::{BlobProvider, ChainProvider, L2ChainProvider, OriginProvider},
};
Expand Down Expand Up @@ -46,8 +46,10 @@ pub type OracleAttributesBuilder<O> =
/// An oracle-backed attributes queue for the derivation pipeline.
pub type OracleAttributesQueue<DAP, O> = AttributesQueue<
BatchQueue<
ChannelReader<
ChannelBank<FrameQueue<L1Retrieval<DAP, L1Traversal<OracleL1ChainProvider<O>>>>>,
BatchStream<
ChannelReader<
ChannelBank<FrameQueue<L1Retrieval<DAP, L1Traversal<OracleL1ChainProvider<O>>>>>,
>,
>,
OracleL2ChainProvider<O>,
>,
Expand Down
9 changes: 7 additions & 2 deletions crates/derive/src/online/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ use op_alloy_protocol::BlockInfo;

// Pipeline internal stages aren't re-exported at the module-level.
use crate::stages::{
AttributesQueue, BatchQueue, ChannelBank, ChannelReader, FrameQueue, L1Retrieval, L1Traversal,
AttributesQueue, BatchQueue, BatchStream, ChannelBank, ChannelReader, FrameQueue, L1Retrieval,
L1Traversal,
};

/// An online derivation pipeline.
Expand All @@ -32,7 +33,11 @@ pub type OnlineAttributesBuilder =
/// An `online` attributes queue for the derivation pipeline.
pub type OnlineAttributesQueue<DAP> = AttributesQueue<
BatchQueue<
ChannelReader<ChannelBank<FrameQueue<L1Retrieval<DAP, L1Traversal<AlloyChainProvider>>>>>,
BatchStream<
ChannelReader<
ChannelBank<FrameQueue<L1Retrieval<DAP, L1Traversal<AlloyChainProvider>>>>,
>,
>,
AlloyL2ChainProvider,
>,
OnlineAttributesBuilder,
Expand Down
9 changes: 6 additions & 3 deletions crates/derive/src/pipeline/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ use super::{
AttributesBuilder, ChainProvider, DataAvailabilityProvider, DerivationPipeline, L2ChainProvider,
};
use crate::stages::{
AttributesQueue, BatchQueue, ChannelBank, ChannelReader, FrameQueue, L1Retrieval, L1Traversal,
AttributesQueue, BatchQueue, BatchStream, ChannelBank, ChannelReader, FrameQueue, L1Retrieval,
L1Traversal,
};
use alloc::sync::Arc;
use core::fmt::Debug;
Expand All @@ -16,7 +17,8 @@ type L1RetrievalStage<DAP, P> = L1Retrieval<DAP, L1TraversalStage<P>>;
type FrameQueueStage<DAP, P> = FrameQueue<L1RetrievalStage<DAP, P>>;
type ChannelBankStage<DAP, P> = ChannelBank<FrameQueueStage<DAP, P>>;
type ChannelReaderStage<DAP, P> = ChannelReader<ChannelBankStage<DAP, P>>;
type BatchQueueStage<DAP, P, T> = BatchQueue<ChannelReaderStage<DAP, P>, T>;
type BatchStreamStage<DAP, P> = BatchStream<ChannelReaderStage<DAP, P>>;
type BatchQueueStage<DAP, P, T> = BatchQueue<BatchStreamStage<DAP, P>, T>;
type AttributesQueueStage<DAP, P, T, B> = AttributesQueue<BatchQueueStage<DAP, P, T>, B>;

/// The `PipelineBuilder` constructs a [DerivationPipeline] using a builder pattern.
Expand Down Expand Up @@ -132,8 +134,9 @@ where
let frame_queue = FrameQueue::new(l1_retrieval);
let channel_bank = ChannelBank::new(Arc::clone(&rollup_config), frame_queue);
let channel_reader = ChannelReader::new(channel_bank, Arc::clone(&rollup_config));
let batch_stream = BatchStream::new(channel_reader, rollup_config.clone());
let batch_queue =
BatchQueue::new(rollup_config.clone(), channel_reader, l2_chain_provider.clone());
BatchQueue::new(rollup_config.clone(), batch_stream, l2_chain_provider.clone());
let attributes =
AttributesQueue::new(rollup_config.clone(), batch_queue, attributes_builder);

Expand Down