Skip to content
This repository was archived by the owner on Jan 16, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/derive/src/stages/batch_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub trait BatchQueueProvider {
async fn next_batch(
&mut self,
parent: L2BlockInfo,
origins: &[BlockInfo],
l1_origins: &[BlockInfo],
) -> PipelineResult<Batch>;

/// Allows the [BatchQueue] to flush the buffer in the [crate::stages::BatchStream]
Expand Down
171 changes: 140 additions & 31 deletions crates/derive/src/stages/batch_stream.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,17 @@
//! This module contains the `BatchStream` stage.

use alloc::{boxed::Box, sync::Arc, vec::Vec};
use async_trait::async_trait;
use core::fmt::Debug;
use op_alloy_genesis::{RollupConfig, SystemConfig};
use op_alloy_protocol::{BlockInfo, L2BlockInfo};
use tracing::trace;

use crate::{
batch::{Batch, SingleBatch, SpanBatch},
errors::{PipelineError, PipelineResult},
errors::{PipelineEncodingError, PipelineError, PipelineResult},
stages::BatchQueueProvider,
traits::{OriginAdvancer, OriginProvider, ResettableStage},
};
use alloc::{boxed::Box, collections::VecDeque, sync::Arc};
use async_trait::async_trait;
use core::fmt::Debug;
use op_alloy_genesis::{RollupConfig, SystemConfig};
use op_alloy_protocol::{BlockInfo, L2BlockInfo};
use tracing::trace;

/// Provides [Batch]es for the [BatchStream] stage.
#[async_trait]
Expand Down Expand Up @@ -43,7 +42,7 @@ where
/// There can only be a single staged span batch.
span: Option<SpanBatch>,
/// A buffer of single batches derived from the [SpanBatch].
buffer: Vec<SingleBatch>,
buffer: VecDeque<SingleBatch>,
/// A reference to the rollup config, used to check
/// if the [BatchStream] stage should be activated.
config: Arc<RollupConfig>,
Expand All @@ -55,7 +54,7 @@ where
{
/// Create a new [BatchStream] stage.
pub const fn new(prev: P, config: Arc<RollupConfig>) -> Self {
Self { prev, span: None, buffer: Vec::new(), config }
Self { prev, span: None, buffer: VecDeque::new(), config }
}

/// Returns if the [BatchStream] stage is active based on the
Expand All @@ -66,9 +65,32 @@ where
}

/// Gets a [SingleBatch] from the in-memory buffer.
pub fn get_single_batch(&mut self) -> Option<SingleBatch> {
pub fn get_single_batch(
&mut self,
parent: L2BlockInfo,
l1_origins: &[BlockInfo],
) -> PipelineResult<SingleBatch> {
trace!(target: "batch_span", "Attempting to get a SingleBatch from buffer len: {}", self.buffer.len());
unimplemented!()

self.try_hydrate_buffer(parent, l1_origins)?;
self.buffer.pop_front().ok_or_else(|| PipelineError::NotEnoughData.temp())
}

/// Hydrates the buffer with single batches derived from the span batch, if there is one
/// queued up.
pub fn try_hydrate_buffer(
&mut self,
parent: L2BlockInfo,
l1_origins: &[BlockInfo],
) -> PipelineResult<()> {
if let Some(span) = self.span.take() {
self.buffer.extend(
span.get_singular_batches(l1_origins, parent).map_err(|e| {
PipelineError::BadEncoding(PipelineEncodingError::from(e)).crit()
})?,
);
}
Ok(())
}
}

Expand All @@ -83,34 +105,37 @@ where
}
}

async fn next_batch(&mut self, _: L2BlockInfo, _: &[BlockInfo]) -> PipelineResult<Batch> {
async fn next_batch(
&mut self,
parent: L2BlockInfo,
l1_origins: &[BlockInfo],
) -> PipelineResult<Batch> {
// If the stage is not active, "pass" the next batch
// through this stage to the BatchQueue stage.
if !self.is_active()? {
trace!(target: "batch_span", "BatchStream stage is inactive, pass-through.");
return self.prev.next_batch().await;
}

// First, attempt to pull a SinguleBatch out of the buffer.
if let Some(b) = self.get_single_batch() {
return Ok(Batch::Single(b));
}

// Safety: bubble up any errors from the batch reader.
let batch = self.prev.next_batch().await?;

// If the next batch is a singular batch, it is immediately
// forwarded to the `BatchQueue` stage.
let Batch::Span(b) = batch else {
return Ok(batch);
};
// If the buffer is empty, attempt to pull a batch from the previous stage.
if self.buffer.is_empty() {
// Safety: bubble up any errors from the batch reader.
let batch = self.prev.next_batch().await?;

// Set the current span batch.
self.span = Some(b);
// If the next batch is a singular batch, it is immediately
// forwarded to the `BatchQueue` stage. Otherwise, we buffer
// the span batch in this stage.
match batch {
Batch::Single(b) => return Ok(Batch::Single(b)),
Batch::Span(b) => {
// TODO: New span batch prefix checks.
self.span = Some(b)
}
}
}

// Attempt to pull a SingleBatch out of the SpanBatch.
self.get_single_batch()
.map_or_else(|| Err(PipelineError::NotEnoughData.temp()), |b| Ok(Batch::Single(b)))
self.get_single_batch(parent, l1_origins).map(Batch::Single)
}
}

Expand Down Expand Up @@ -150,7 +175,7 @@ where
mod test {
use super::*;
use crate::{
batch::SingleBatch,
batch::{SingleBatch, SpanBatchElement},
stages::test_utils::{CollectingLayer, MockBatchStreamProvider, TraceStorage},
};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
Expand All @@ -177,4 +202,88 @@ mod test {
assert_eq!(logs.len(), 1);
assert!(logs[0].contains("BatchStream stage is inactive, pass-through."));
}

#[tokio::test]
async fn test_span_buffer() {
let mock_batch = SpanBatch {
batches: vec![
SpanBatchElement { epoch_num: 10, timestamp: 10, ..Default::default() },
SpanBatchElement { epoch_num: 10, timestamp: 12, ..Default::default() },
],
..Default::default()
};
let mock_origins = [BlockInfo { number: 10, timestamp: 12, ..Default::default() }];

let data = vec![Ok(Batch::Span(mock_batch.clone()))];
let config = Arc::new(RollupConfig { holocene_time: Some(0), ..RollupConfig::default() });
let prev = MockBatchStreamProvider::new(data);
let mut stream = BatchStream::new(prev, config.clone());

// The stage should be active.
assert!(stream.is_active().unwrap());

// The next batches should be single batches derived from the span batch.
let batch = stream.next_batch(Default::default(), &mock_origins).await.unwrap();
if let Batch::Single(single) = batch {
assert_eq!(single.epoch_num, 10);
assert_eq!(single.timestamp, 10);
} else {
panic!("Wrong batch type");
}

let batch = stream.next_batch(Default::default(), &mock_origins).await.unwrap();
if let Batch::Single(single) = batch {
assert_eq!(single.epoch_num, 10);
assert_eq!(single.timestamp, 12);
} else {
panic!("Wrong batch type");
}

let err = stream.next_batch(Default::default(), &mock_origins).await.unwrap_err();
assert_eq!(err, PipelineError::Eof.temp());
assert_eq!(stream.buffer.len(), 0);
assert!(stream.span.is_none());

// Add more data into the provider, see if the buffer is re-hydrated.
stream.prev.batches.push(Ok(Batch::Span(mock_batch)));

// The next batches should be single batches derived from the span batch.
let batch = stream.next_batch(Default::default(), &mock_origins).await.unwrap();
if let Batch::Single(single) = batch {
assert_eq!(single.epoch_num, 10);
assert_eq!(single.timestamp, 10);
} else {
panic!("Wrong batch type");
}

let batch = stream.next_batch(Default::default(), &mock_origins).await.unwrap();
if let Batch::Single(single) = batch {
assert_eq!(single.epoch_num, 10);
assert_eq!(single.timestamp, 12);
} else {
panic!("Wrong batch type");
}

let err = stream.next_batch(Default::default(), &mock_origins).await.unwrap_err();
assert_eq!(err, PipelineError::Eof.temp());
assert_eq!(stream.buffer.len(), 0);
assert!(stream.span.is_none());
}

#[tokio::test]
async fn test_single_batch_pass_through() {
let data = vec![Ok(Batch::Single(SingleBatch::default()))];
let config = Arc::new(RollupConfig { holocene_time: Some(0), ..RollupConfig::default() });
let prev = MockBatchStreamProvider::new(data);
let mut stream = BatchStream::new(prev, config.clone());

// The stage should be active.
assert!(stream.is_active().unwrap());

// The next batch should be passed through to the [BatchQueue] stage.
let batch = stream.next_batch(Default::default(), &[]).await.unwrap();
assert!(matches!(batch, Batch::Single(_)));
assert_eq!(stream.buffer.len(), 0);
assert!(stream.span.is_none());
}
}