diff --git a/crates/derive/src/stages/batch_queue.rs b/crates/derive/src/stages/batch_queue.rs index d4196814c4..910574de58 100644 --- a/crates/derive/src/stages/batch_queue.rs +++ b/crates/derive/src/stages/batch_queue.rs @@ -25,7 +25,7 @@ pub trait BatchQueueProvider { async fn next_batch( &mut self, parent: L2BlockInfo, - origins: &[BlockInfo], + l1_origins: &[BlockInfo], ) -> PipelineResult; /// Allows the [BatchQueue] to flush the buffer in the [crate::stages::BatchStream] diff --git a/crates/derive/src/stages/batch_stream.rs b/crates/derive/src/stages/batch_stream.rs index a4abcde805..a622d31415 100644 --- a/crates/derive/src/stages/batch_stream.rs +++ b/crates/derive/src/stages/batch_stream.rs @@ -1,18 +1,17 @@ //! This module contains the `BatchStream` stage. -use alloc::{boxed::Box, sync::Arc, vec::Vec}; -use async_trait::async_trait; -use core::fmt::Debug; -use op_alloy_genesis::{RollupConfig, SystemConfig}; -use op_alloy_protocol::{BlockInfo, L2BlockInfo}; -use tracing::trace; - use crate::{ batch::{Batch, SingleBatch, SpanBatch}, - errors::{PipelineError, PipelineResult}, + errors::{PipelineEncodingError, PipelineError, PipelineResult}, stages::BatchQueueProvider, traits::{OriginAdvancer, OriginProvider, ResettableStage}, }; +use alloc::{boxed::Box, collections::VecDeque, sync::Arc}; +use async_trait::async_trait; +use core::fmt::Debug; +use op_alloy_genesis::{RollupConfig, SystemConfig}; +use op_alloy_protocol::{BlockInfo, L2BlockInfo}; +use tracing::trace; /// Provides [Batch]es for the [BatchStream] stage. #[async_trait] @@ -43,7 +42,7 @@ where /// There can only be a single staged span batch. span: Option, /// A buffer of single batches derived from the [SpanBatch]. - buffer: Vec, + buffer: VecDeque, /// A reference to the rollup config, used to check /// if the [BatchStream] stage should be activated. config: Arc, @@ -55,7 +54,7 @@ where { /// Create a new [BatchStream] stage. pub const fn new(prev: P, config: Arc) -> Self { - Self { prev, span: None, buffer: Vec::new(), config } + Self { prev, span: None, buffer: VecDeque::new(), config } } /// Returns if the [BatchStream] stage is active based on the @@ -66,9 +65,32 @@ where } /// Gets a [SingleBatch] from the in-memory buffer. - pub fn get_single_batch(&mut self) -> Option { + pub fn get_single_batch( + &mut self, + parent: L2BlockInfo, + l1_origins: &[BlockInfo], + ) -> PipelineResult { trace!(target: "batch_span", "Attempting to get a SingleBatch from buffer len: {}", self.buffer.len()); - unimplemented!() + + self.try_hydrate_buffer(parent, l1_origins)?; + self.buffer.pop_front().ok_or_else(|| PipelineError::NotEnoughData.temp()) + } + + /// Hydrates the buffer with single batches derived from the span batch, if there is one + /// queued up. + pub fn try_hydrate_buffer( + &mut self, + parent: L2BlockInfo, + l1_origins: &[BlockInfo], + ) -> PipelineResult<()> { + if let Some(span) = self.span.take() { + self.buffer.extend( + span.get_singular_batches(l1_origins, parent).map_err(|e| { + PipelineError::BadEncoding(PipelineEncodingError::from(e)).crit() + })?, + ); + } + Ok(()) } } @@ -83,7 +105,11 @@ where } } - async fn next_batch(&mut self, _: L2BlockInfo, _: &[BlockInfo]) -> PipelineResult { + async fn next_batch( + &mut self, + parent: L2BlockInfo, + l1_origins: &[BlockInfo], + ) -> PipelineResult { // If the stage is not active, "pass" the next batch // through this stage to the BatchQueue stage. if !self.is_active()? { @@ -91,26 +117,25 @@ where return self.prev.next_batch().await; } - // First, attempt to pull a SinguleBatch out of the buffer. - if let Some(b) = self.get_single_batch() { - return Ok(Batch::Single(b)); - } - - // Safety: bubble up any errors from the batch reader. - let batch = self.prev.next_batch().await?; - - // If the next batch is a singular batch, it is immediately - // forwarded to the `BatchQueue` stage. - let Batch::Span(b) = batch else { - return Ok(batch); - }; + // If the buffer is empty, attempt to pull a batch from the previous stage. + if self.buffer.is_empty() { + // Safety: bubble up any errors from the batch reader. + let batch = self.prev.next_batch().await?; - // Set the current span batch. - self.span = Some(b); + // If the next batch is a singular batch, it is immediately + // forwarded to the `BatchQueue` stage. Otherwise, we buffer + // the span batch in this stage. + match batch { + Batch::Single(b) => return Ok(Batch::Single(b)), + Batch::Span(b) => { + // TODO: New span batch prefix checks. + self.span = Some(b) + } + } + } // Attempt to pull a SingleBatch out of the SpanBatch. - self.get_single_batch() - .map_or_else(|| Err(PipelineError::NotEnoughData.temp()), |b| Ok(Batch::Single(b))) + self.get_single_batch(parent, l1_origins).map(Batch::Single) } } @@ -150,7 +175,7 @@ where mod test { use super::*; use crate::{ - batch::SingleBatch, + batch::{SingleBatch, SpanBatchElement}, stages::test_utils::{CollectingLayer, MockBatchStreamProvider, TraceStorage}, }; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; @@ -177,4 +202,88 @@ mod test { assert_eq!(logs.len(), 1); assert!(logs[0].contains("BatchStream stage is inactive, pass-through.")); } + + #[tokio::test] + async fn test_span_buffer() { + let mock_batch = SpanBatch { + batches: vec![ + SpanBatchElement { epoch_num: 10, timestamp: 10, ..Default::default() }, + SpanBatchElement { epoch_num: 10, timestamp: 12, ..Default::default() }, + ], + ..Default::default() + }; + let mock_origins = [BlockInfo { number: 10, timestamp: 12, ..Default::default() }]; + + let data = vec![Ok(Batch::Span(mock_batch.clone()))]; + let config = Arc::new(RollupConfig { holocene_time: Some(0), ..RollupConfig::default() }); + let prev = MockBatchStreamProvider::new(data); + let mut stream = BatchStream::new(prev, config.clone()); + + // The stage should be active. + assert!(stream.is_active().unwrap()); + + // The next batches should be single batches derived from the span batch. + let batch = stream.next_batch(Default::default(), &mock_origins).await.unwrap(); + if let Batch::Single(single) = batch { + assert_eq!(single.epoch_num, 10); + assert_eq!(single.timestamp, 10); + } else { + panic!("Wrong batch type"); + } + + let batch = stream.next_batch(Default::default(), &mock_origins).await.unwrap(); + if let Batch::Single(single) = batch { + assert_eq!(single.epoch_num, 10); + assert_eq!(single.timestamp, 12); + } else { + panic!("Wrong batch type"); + } + + let err = stream.next_batch(Default::default(), &mock_origins).await.unwrap_err(); + assert_eq!(err, PipelineError::Eof.temp()); + assert_eq!(stream.buffer.len(), 0); + assert!(stream.span.is_none()); + + // Add more data into the provider, see if the buffer is re-hydrated. + stream.prev.batches.push(Ok(Batch::Span(mock_batch))); + + // The next batches should be single batches derived from the span batch. + let batch = stream.next_batch(Default::default(), &mock_origins).await.unwrap(); + if let Batch::Single(single) = batch { + assert_eq!(single.epoch_num, 10); + assert_eq!(single.timestamp, 10); + } else { + panic!("Wrong batch type"); + } + + let batch = stream.next_batch(Default::default(), &mock_origins).await.unwrap(); + if let Batch::Single(single) = batch { + assert_eq!(single.epoch_num, 10); + assert_eq!(single.timestamp, 12); + } else { + panic!("Wrong batch type"); + } + + let err = stream.next_batch(Default::default(), &mock_origins).await.unwrap_err(); + assert_eq!(err, PipelineError::Eof.temp()); + assert_eq!(stream.buffer.len(), 0); + assert!(stream.span.is_none()); + } + + #[tokio::test] + async fn test_single_batch_pass_through() { + let data = vec![Ok(Batch::Single(SingleBatch::default()))]; + let config = Arc::new(RollupConfig { holocene_time: Some(0), ..RollupConfig::default() }); + let prev = MockBatchStreamProvider::new(data); + let mut stream = BatchStream::new(prev, config.clone()); + + // The stage should be active. + assert!(stream.is_active().unwrap()); + + // The next batch should be passed through to the [BatchQueue] stage. + let batch = stream.next_batch(Default::default(), &[]).await.unwrap(); + assert!(matches!(batch, Batch::Single(_))); + assert_eq!(stream.buffer.len(), 0); + assert!(stream.span.is_none()); + } }