Skip to content
This repository was archived by the owner on Jan 16, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 6 additions & 4 deletions bin/client/src/l1/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ use kona_derive::{
pipeline::{DerivationPipeline, Pipeline, PipelineBuilder, StepResult},
sources::EthereumDataSource,
stages::{
AttributesQueue, BatchQueue, ChannelBank, ChannelReader, FrameQueue, L1Retrieval,
L1Traversal,
AttributesQueue, BatchQueue, BatchStream, ChannelBank, ChannelReader, FrameQueue,
L1Retrieval, L1Traversal,
},
traits::{BlobProvider, ChainProvider, L2ChainProvider, OriginProvider},
};
Expand Down Expand Up @@ -46,8 +46,10 @@ pub type OracleAttributesBuilder<O> =
/// An oracle-backed attributes queue for the derivation pipeline.
pub type OracleAttributesQueue<DAP, O> = AttributesQueue<
BatchQueue<
ChannelReader<
ChannelBank<FrameQueue<L1Retrieval<DAP, L1Traversal<OracleL1ChainProvider<O>>>>>,
BatchStream<
ChannelReader<
ChannelBank<FrameQueue<L1Retrieval<DAP, L1Traversal<OracleL1ChainProvider<O>>>>>,
>,
>,
OracleL2ChainProvider<O>,
>,
Expand Down
9 changes: 7 additions & 2 deletions crates/derive/src/online/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ use op_alloy_protocol::BlockInfo;

// Pipeline internal stages aren't re-exported at the module-level.
use crate::stages::{
AttributesQueue, BatchQueue, ChannelBank, ChannelReader, FrameQueue, L1Retrieval, L1Traversal,
AttributesQueue, BatchQueue, BatchStream, ChannelBank, ChannelReader, FrameQueue, L1Retrieval,
L1Traversal,
};

/// An online derivation pipeline.
Expand All @@ -32,7 +33,11 @@ pub type OnlineAttributesBuilder =
/// An `online` attributes queue for the derivation pipeline.
pub type OnlineAttributesQueue<DAP> = AttributesQueue<
BatchQueue<
ChannelReader<ChannelBank<FrameQueue<L1Retrieval<DAP, L1Traversal<AlloyChainProvider>>>>>,
BatchStream<
ChannelReader<
ChannelBank<FrameQueue<L1Retrieval<DAP, L1Traversal<AlloyChainProvider>>>>,
>,
>,
AlloyL2ChainProvider,
>,
OnlineAttributesBuilder,
Expand Down
9 changes: 6 additions & 3 deletions crates/derive/src/pipeline/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ use super::{
AttributesBuilder, ChainProvider, DataAvailabilityProvider, DerivationPipeline, L2ChainProvider,
};
use crate::stages::{
AttributesQueue, BatchQueue, ChannelBank, ChannelReader, FrameQueue, L1Retrieval, L1Traversal,
AttributesQueue, BatchQueue, BatchStream, ChannelBank, ChannelReader, FrameQueue, L1Retrieval,
L1Traversal,
};
use alloc::sync::Arc;
use core::fmt::Debug;
Expand All @@ -16,7 +17,8 @@ type L1RetrievalStage<DAP, P> = L1Retrieval<DAP, L1TraversalStage<P>>;
type FrameQueueStage<DAP, P> = FrameQueue<L1RetrievalStage<DAP, P>>;
type ChannelBankStage<DAP, P> = ChannelBank<FrameQueueStage<DAP, P>>;
type ChannelReaderStage<DAP, P> = ChannelReader<ChannelBankStage<DAP, P>>;
type BatchQueueStage<DAP, P, T> = BatchQueue<ChannelReaderStage<DAP, P>, T>;
type BatchStreamStage<DAP, P> = BatchStream<ChannelReaderStage<DAP, P>>;
type BatchQueueStage<DAP, P, T> = BatchQueue<BatchStreamStage<DAP, P>, T>;
type AttributesQueueStage<DAP, P, T, B> = AttributesQueue<BatchQueueStage<DAP, P, T>, B>;

/// The `PipelineBuilder` constructs a [DerivationPipeline] using a builder pattern.
Expand Down Expand Up @@ -132,8 +134,9 @@ where
let frame_queue = FrameQueue::new(l1_retrieval);
let channel_bank = ChannelBank::new(Arc::clone(&rollup_config), frame_queue);
let channel_reader = ChannelReader::new(channel_bank, Arc::clone(&rollup_config));
let batch_stream = BatchStream::new(channel_reader, rollup_config.clone());
let batch_queue =
BatchQueue::new(rollup_config.clone(), channel_reader, l2_chain_provider.clone());
BatchQueue::new(rollup_config.clone(), batch_stream, l2_chain_provider.clone());
let attributes =
AttributesQueue::new(rollup_config.clone(), batch_queue, attributes_builder);

Expand Down
8 changes: 8 additions & 0 deletions crates/derive/src/stages/batch_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ pub trait BatchQueueProvider {
///
/// [ChannelReader]: crate::stages::ChannelReader
async fn next_batch(&mut self) -> PipelineResult<Batch>;

/// Allows the [BatchQueue] to flush the buffer in the [crate::stages::BatchStream]
/// if an invalid single batch is found. Pre-holocene hardfork, this will be a no-op.
fn flush(&mut self);
}

/// [BatchQueue] is responsible for ordering unordered batches
Expand Down Expand Up @@ -146,6 +150,9 @@ where
remaining.push(batch.clone());
}
BatchValidity::Drop => {
// If we drop a batch, flush previous batches buffered in the BatchStream
// stage.
self.prev.flush();
Comment thread
refcell marked this conversation as resolved.
warn!(target: "batch-queue", "Dropping batch with parent: {}", parent.block_info);
continue;
}
Expand Down Expand Up @@ -233,6 +240,7 @@ where
let data = BatchWithInclusionBlock { inclusion_block: origin, batch };
// If we drop the batch, validation logs the drop reason with WARN level.
if data.check_batch(&self.cfg, &self.l1_blocks, parent, &mut self.fetcher).await.is_drop() {
self.prev.flush();
return Ok(());
}
self.batches.push(data);
Expand Down
6 changes: 6 additions & 0 deletions crates/derive/src/stages/batch_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,12 @@ impl<P> BatchQueueProvider for BatchStream<P>
where
P: BatchQueueProvider + OriginAdvancer + OriginProvider + ResettableStage + Send + Debug,
{
/// Clears any batches buffered in this stage so they are not emitted after
/// an invalid batch was detected downstream.
///
/// Only flushes when the stage is active (per L156 this is gated on
/// `is_active()`; the trait docs note this is a no-op pre-Holocene —
/// NOTE(review): confirm `is_active()` corresponds to the Holocene hardfork
/// activation). An `Err` from `is_active()` is treated as "inactive", so
/// the buffer is left untouched in that case.
fn flush(&mut self) {
    if self.is_active().unwrap_or(false) {
        self.buffer.clear();
    }
}

async fn next_batch(&mut self) -> PipelineResult<Batch> {
// If the stage is not active, "pass" the next batch
// through this stage to the BatchQueue stage.
Expand Down
3 changes: 3 additions & 0 deletions crates/derive/src/stages/channel_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ impl<P> BatchQueueProvider for ChannelReader<P>
where
P: ChannelReaderProvider + OriginAdvancer + OriginProvider + ResettableStage + Send + Debug,
{
fn flush(&mut self) { /* noop */
Comment thread
refcell marked this conversation as resolved.
}

async fn next_batch(&mut self) -> PipelineResult<Batch> {
crate::timer!(START, STAGE_ADVANCE_RESPONSE_TIME, &["channel_reader"], timer);
if let Err(e) = self.set_batch_reader().await {
Expand Down
3 changes: 3 additions & 0 deletions crates/derive/src/stages/test_utils/batch_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ impl OriginProvider for MockBatchQueueProvider {

#[async_trait]
impl BatchQueueProvider for MockBatchQueueProvider {
/// Intentionally a no-op: the mock provider keeps no stream buffer that
/// would need clearing when a batch is dropped.
fn flush(&mut self) { /* noop */
}

async fn next_batch(&mut self) -> PipelineResult<Batch> {
self.batches.pop().ok_or(PipelineError::Eof.temp())?
}
Expand Down