Skip to content
This repository was archived by the owner on Jan 16, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 106 additions & 17 deletions crates/protocol/derive/src/stages/batch/batch_stream.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
//! This module contains the `BatchStream` stage.

use crate::{
L2ChainProvider, NextBatchProvider, OriginAdvancer, OriginProvider, PipelineEncodingError,
PipelineError, PipelineResult, Signal, SignalReceiver,
L2ChainProvider, NextBatchProvider, OriginAdvancer, OriginProvider, PipelineError,
PipelineResult, Signal, SignalReceiver,
};
use alloc::{boxed::Box, collections::VecDeque, sync::Arc};
use async_trait::async_trait;
use core::fmt::Debug;
use kona_genesis::RollupConfig;
use kona_protocol::{
Batch, BatchValidity, BatchWithInclusionBlock, BlockInfo, L2BlockInfo, SingleBatch, SpanBatch,
SpanBatchError,
};

/// Provides [`Batch`]es for the [`BatchStream`] stage.
Expand Down Expand Up @@ -72,11 +73,11 @@ where
&mut self,
parent: L2BlockInfo,
l1_origins: &[BlockInfo],
) -> PipelineResult<SingleBatch> {
) -> Result<Option<SingleBatch>, SpanBatchError> {
trace!(target: "batch_span", "Attempting to get a SingleBatch from buffer len: {}", self.buffer.len());

self.try_hydrate_buffer(parent, l1_origins)?;
self.buffer.pop_front().ok_or_else(|| PipelineError::NotEnoughData.temp())
Ok(self.buffer.pop_front())
}

/// Hydrates the buffer with single batches derived from the span batch, if there is one
Expand All @@ -85,19 +86,17 @@ where
&mut self,
parent: L2BlockInfo,
l1_origins: &[BlockInfo],
) -> PipelineResult<()> {
) -> Result<(), SpanBatchError> {
if let Some(span) = self.span.take() {
self.buffer.extend(
span.get_singular_batches(l1_origins, parent).map_err(|e| {
PipelineError::BadEncoding(PipelineEncodingError::from(e)).crit()
})?,
);
self.buffer.extend(span.get_singular_batches(l1_origins, parent)?);
Comment thread
theochap marked this conversation as resolved.
}
let batch_count = self.buffer.len() as f64;
kona_macros::set!(gauge, crate::metrics::Metrics::PIPELINE_BATCH_BUFFER, batch_count);
#[cfg(feature = "metrics")]
let batch_size = std::mem::size_of_val(&self.buffer) as f64;
kona_macros::set!(gauge, crate::metrics::Metrics::PIPELINE_BATCH_MEM, batch_size);
{
let batch_count = self.buffer.len() as f64;
kona_macros::set!(gauge, crate::metrics::Metrics::PIPELINE_BATCH_BUFFER, batch_count);
let batch_size = std::mem::size_of_val(&self.buffer) as f64;
kona_macros::set!(gauge, crate::metrics::Metrics::PIPELINE_BATCH_MEM, batch_size);
}
Ok(())
}
}
Expand Down Expand Up @@ -194,7 +193,17 @@ where
}

// Attempt to pull a SingleBatch out of the SpanBatch.
self.get_single_batch(parent, l1_origins).map(Batch::Single)
match self.get_single_batch(parent, l1_origins) {
Ok(Some(single_batch)) => Ok(Batch::Single(single_batch)),
Ok(None) => Err(PipelineError::NotEnoughData.temp()),
Err(e) => {
warn!(target: "batch_span", "Extracting singular batches from span batch failed: {}", e);
// If singular batch extraction fails, it should be handled the same as a
// dropped batch during span batch prefix checks.
self.flush();
Err(PipelineError::NotEnoughData.temp())
Comment thread
theochap marked this conversation as resolved.
}
}
}
}

Expand Down Expand Up @@ -241,9 +250,12 @@ mod test {
types::ResetSignal,
};
use alloc::vec;
use alloy_eips::NumHash;
use kona_genesis::HardForkConfig;
use alloy_consensus::{BlockBody, Header};
use alloy_eips::{BlockNumHash, NumHash};
use alloy_primitives::{FixedBytes, b256};
use kona_genesis::{ChainGenesis, HardForkConfig};
use kona_protocol::{SingleBatch, SpanBatchElement};
use op_alloy_consensus::OpBlock;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};

#[tokio::test]
Expand Down Expand Up @@ -399,6 +411,83 @@ mod test {
assert!(stream.span.is_none());
}

#[tokio::test]
async fn test_span_batch_extraction_error_flushes_stage() {
let trace_store: TraceStorage = Default::default();
let layer = CollectingLayer::new(trace_store.clone());
tracing_subscriber::Registry::default().with(layer).init();

let parent_hash = b256!("1111111111111111111111111111111111111111000000000000000000000000");
let l1_block_hash =
b256!("3333333333333333333333333333333333333333000000000000000000000000");
let config = Arc::new(RollupConfig {
seq_window_size: 100,
block_time: 10,
hardforks: HardForkConfig {
delta_time: Some(0),
holocene_time: Some(0),
..Default::default()
},
genesis: ChainGenesis {
l2: BlockNumHash { number: 40, hash: parent_hash },
..Default::default()
},
..Default::default()
});

let l1_block =
BlockInfo { number: 10, timestamp: 5, hash: l1_block_hash, ..Default::default() };
let l1_blocks = vec![l1_block];
let l2_safe_head = L2BlockInfo {
block_info: BlockInfo { number: 41, timestamp: 10, parent_hash, ..Default::default() },
l1_origin: l1_block.id(),
..Default::default()
};
let l2_parent = L2BlockInfo {
block_info: BlockInfo {
number: 40,
hash: parent_hash,
timestamp: 0,
..Default::default()
},
l1_origin: BlockNumHash { number: 9, ..Default::default() },
..Default::default()
};
let op_block = OpBlock {
header: Header { number: 41, ..Default::default() },
body: BlockBody { transactions: vec![], ommers: vec![], withdrawals: None },
};

let span_batch = SpanBatch {
batches: vec![
SpanBatchElement { epoch_num: 9, timestamp: 10, ..Default::default() },
SpanBatchElement { epoch_num: 9, timestamp: 20, ..Default::default() },
SpanBatchElement { epoch_num: 10, timestamp: 30, ..Default::default() },
],
parent_check: FixedBytes::<20>::from_slice(&parent_hash[..20]),
l1_origin_check: FixedBytes::<20>::from_slice(&l1_block_hash[..20]),
..Default::default()
};

let mut prev = TestBatchStreamProvider::new(vec![Ok(Batch::Span(span_batch))]);
prev.origin = Some(l1_block);

let mut provider = TestL2ChainProvider::default();
provider.blocks.push(l2_parent);
provider.op_blocks.push(op_block);

let mut stream = BatchStream::new(prev, config, provider);
let err = stream.next_batch(l2_safe_head, &l1_blocks).await.unwrap_err();

assert_eq!(err, PipelineError::NotEnoughData.temp());
assert!(stream.span.is_none());
assert_eq!(stream.span_buffer_size(), 0);

let logs = trace_store.get_by_level(tracing::Level::WARN);
assert_eq!(logs.len(), 1);
assert!(logs[0].contains("Extracting singular batches from span batch failed: Future batch L1 origin before safe head"));
Comment thread
theochap marked this conversation as resolved.
}
Comment thread
theochap marked this conversation as resolved.

#[tokio::test]
async fn test_single_batch_pass_through() {
let data = vec![Ok(Batch::Single(SingleBatch::default()))];
Expand Down
3 changes: 3 additions & 0 deletions crates/protocol/protocol/src/batch/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ pub enum SpanBatchError {
/// Empty Span Batch
#[error("Empty span batch")]
EmptySpanBatch,
/// Future batch L1 origin before safe head
#[error("Future batch L1 origin before safe head")]
Comment thread
theochap marked this conversation as resolved.
L1OriginBeforeSafeHead,
Comment thread
theochap marked this conversation as resolved.
/// Missing L1 origin
#[error("Missing L1 origin")]
MissingL1Origin,
Expand Down
Loading
Loading