Skip to content
This repository was archived by the owner on Jan 16, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions crates/derive/src/stages/attributes_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,31 @@ mod tests {
AttributesQueue::new(Arc::new(cfg), mock_batch_queue, mock_attributes_builder)
}

#[tokio::test]
async fn test_attributes_queue_flush() {
let mut attributes_queue = new_attributes_queue(None, None, vec![]);
attributes_queue.batch = Some(SingleBatch::default());
assert!(!attributes_queue.prev.flushed);
attributes_queue.flush_channel().await.unwrap();
assert!(attributes_queue.prev.flushed);
assert!(attributes_queue.batch.is_none());
}

#[tokio::test]
async fn test_attributes_queue_reset() {
let cfg = RollupConfig::default();
let mock = new_test_attributes_provider(None, vec![]);
let mock_builder = TestAttributesBuilder::default();
let mut aq = AttributesQueue::new(Arc::new(cfg), mock, mock_builder);
aq.batch = Some(SingleBatch::default());
assert!(!aq.prev.reset);
let block_info = BlockInfo::default();
let system_config = SystemConfig::default();
aq.reset(block_info, &system_config).await.unwrap();
assert!(aq.batch.is_none());
assert!(aq.prev.reset);
}

#[tokio::test]
async fn test_load_batch_eof() {
let mut attributes_queue = new_attributes_queue(None, None, vec![]);
Expand Down
42 changes: 42 additions & 0 deletions crates/derive/src/stages/batch_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,48 @@ mod tests {
BatchReader::from(bytes)
}

#[tokio::test]
async fn test_batch_queue_reset() {
let cfg = Arc::new(RollupConfig::default());
let mock = MockBatchQueueProvider::new(vec![]);
let fetcher = TestL2ChainProvider::default();
let mut bq = BatchQueue::new(cfg.clone(), mock, fetcher);
bq.l1_blocks.push(BlockInfo::default());
bq.next_spans.push(SingleBatch::default());
bq.batches.push(BatchWithInclusionBlock {
inclusion_block: BlockInfo::default(),
batch: Batch::Single(SingleBatch::default()),
});
assert!(!bq.prev.reset);
let base = BlockInfo::default();
let system_config = SystemConfig::default();
bq.reset(base, &system_config).await.unwrap();
assert!(bq.prev.reset);
assert_eq!(bq.origin, Some(base));
assert!(bq.batches.is_empty());
assert_eq!(bq.l1_blocks, vec![base]);
assert!(bq.next_spans.is_empty());
}

#[tokio::test]
async fn test_batch_queue_flush() {
let cfg = Arc::new(RollupConfig::default());
let mock = MockBatchQueueProvider::new(vec![]);
let fetcher = TestL2ChainProvider::default();
let mut bq = BatchQueue::new(cfg.clone(), mock, fetcher);
bq.l1_blocks.push(BlockInfo::default());
bq.next_spans.push(SingleBatch::default());
bq.batches.push(BatchWithInclusionBlock {
inclusion_block: BlockInfo::default(),
batch: Batch::Single(SingleBatch::default()),
});
bq.flush_channel().await.unwrap();
assert!(bq.prev.flushed);
assert!(bq.batches.is_empty());
assert!(bq.l1_blocks.is_empty());
assert!(bq.next_spans.is_empty());
}

#[tokio::test]
async fn test_holocene_add_batch_valid() {
// Construct a future single batch.
Expand Down
42 changes: 42 additions & 0 deletions crates/derive/src/stages/batch_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,48 @@ mod test {
use kona_providers::test_utils::TestL2ChainProvider;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};

#[tokio::test]
async fn test_batch_stream_flush() {
let config = Arc::new(RollupConfig { holocene_time: Some(0), ..RollupConfig::default() });
let prev = MockBatchStreamProvider::new(vec![]);
let mut stream = BatchStream::new(prev, config, TestL2ChainProvider::default());
stream.buffer.push_back(SingleBatch::default());
stream.span = Some(SpanBatch::default());
assert!(!stream.buffer.is_empty());
assert!(stream.span.is_some());
stream.flush();
assert!(stream.buffer.is_empty());
assert!(stream.span.is_none());
}

#[tokio::test]
async fn test_batch_stream_reset() {
let config = Arc::new(RollupConfig { holocene_time: Some(0), ..RollupConfig::default() });
let prev = MockBatchStreamProvider::new(vec![]);
let mut stream = BatchStream::new(prev, config.clone(), TestL2ChainProvider::default());
stream.buffer.push_back(SingleBatch::default());
stream.span = Some(SpanBatch::default());
assert!(!stream.prev.reset);
stream.reset(BlockInfo::default(), &SystemConfig::default()).await.unwrap();
assert!(stream.prev.reset);
assert!(stream.buffer.is_empty());
assert!(stream.span.is_none());
}

#[tokio::test]
async fn test_batch_stream_flush_channel() {
let config = Arc::new(RollupConfig { holocene_time: Some(0), ..RollupConfig::default() });
let prev = MockBatchStreamProvider::new(vec![]);
let mut stream = BatchStream::new(prev, config.clone(), TestL2ChainProvider::default());
stream.buffer.push_back(SingleBatch::default());
stream.span = Some(SpanBatch::default());
assert!(!stream.prev.flushed);
stream.flush_channel().await.unwrap();
assert!(stream.prev.flushed);
assert!(stream.buffer.is_empty());
assert!(stream.span.is_none());
}

#[tokio::test]
async fn test_batch_stream_inactive() {
let trace_store: TraceStorage = Default::default();
Expand Down
10 changes: 10 additions & 0 deletions crates/derive/src/stages/frame_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,16 @@ pub(crate) mod tests {
encode_frames(frames)
}

#[tokio::test]
async fn test_frame_queue_reset() {
let mock = MockFrameQueueProvider::new(vec![]);
let mut frame_queue = FrameQueue::new(mock, Default::default());
assert!(!frame_queue.prev.reset);
frame_queue.reset(BlockInfo::default(), &SystemConfig::default()).await.unwrap();
assert_eq!(frame_queue.queue.len(), 0);
assert!(frame_queue.prev.reset);
}

#[tokio::test]
async fn test_frame_queue_empty_bytes() {
let data = vec![Ok(Bytes::from(vec![0x00]))];
Expand Down
13 changes: 13 additions & 0 deletions crates/derive/src/stages/l1_retrieval.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,19 @@ mod tests {
use alloc::vec;
use alloy_primitives::Bytes;

#[tokio::test]
async fn test_l1_retrieval_reset() {
let traversal = new_populated_test_traversal();
let dap = TestDAP { results: vec![], batch_inbox_address: Address::default() };
let mut retrieval = L1Retrieval::new(traversal, dap);
retrieval.prev.block = None;
assert!(retrieval.prev.block.is_none());
retrieval.data = None;
retrieval.reset(BlockInfo::default(), &SystemConfig::default()).await.unwrap();
assert!(retrieval.data.is_some());
assert_eq!(retrieval.prev.block, Some(BlockInfo::default()));
}

#[tokio::test]
async fn test_l1_retrieval_origin() {
let traversal = new_populated_test_traversal();
Expand Down
27 changes: 22 additions & 5 deletions crates/derive/src/stages/l1_traversal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,6 @@ impl<F: ChainProvider> L1Traversal<F> {
rollup_config: cfg,
}
}

/// Retrieves a reference to the inner data source of the [L1Traversal] stage.
pub const fn data_source(&self) -> &F {
&self.data_source
}
}

#[async_trait]
Expand Down Expand Up @@ -208,6 +203,28 @@ pub(crate) mod tests {
new_test_traversal(blocks, receipts)
}

#[test]
fn test_l1_traversal_batcher_address() {
let mut traversal = new_populated_test_traversal();
traversal.system_config.batcher_address = L1_SYS_CONFIG_ADDR;
assert_eq!(traversal.batcher_addr(), L1_SYS_CONFIG_ADDR);
}

#[tokio::test]
async fn test_l1_traversal_reset() {
let blocks = vec![BlockInfo::default(), BlockInfo::default()];
let receipts = new_receipts();
let mut traversal = new_test_traversal(blocks, receipts);
assert!(traversal.advance_origin().await.is_ok());
let base = BlockInfo::default();
let cfg = SystemConfig::default();
traversal.done = true;
assert!(traversal.reset(base, &cfg).await.is_ok());
assert_eq!(traversal.origin(), Some(base));
assert_eq!(traversal.system_config, cfg);
assert!(!traversal.done);
}

#[tokio::test]
async fn test_l1_traversal() {
let blocks = vec![BlockInfo::default(), BlockInfo::default()];
Expand Down
15 changes: 13 additions & 2 deletions crates/derive/src/stages/test_utils/batch_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::{
batch::Batch,
errors::{PipelineError, PipelineResult},
stages::batch_queue::BatchQueueProvider,
traits::{OriginAdvancer, OriginProvider, ResettableStage},
traits::{FlushableStage, OriginAdvancer, OriginProvider, ResettableStage},
};
use alloc::{boxed::Box, vec::Vec};
use async_trait::async_trait;
Expand All @@ -20,12 +20,14 @@ pub struct MockBatchQueueProvider {
pub batches: Vec<PipelineResult<Batch>>,
/// Tracks if the provider has been flushed.
pub flushed: bool,
/// Tracks if the reset method was called.
pub reset: bool,
}

impl MockBatchQueueProvider {
/// Creates a new [MockBatchQueueProvider] with the given origin and batches.
pub fn new(batches: Vec<PipelineResult<Batch>>) -> Self {
Self { origin: Some(BlockInfo::default()), batches, flushed: false }
Self { origin: Some(BlockInfo::default()), batches, flushed: false, reset: false }
}
}

Expand Down Expand Up @@ -53,9 +55,18 @@ impl OriginAdvancer for MockBatchQueueProvider {
}
}

#[async_trait]
impl FlushableStage for MockBatchQueueProvider {
async fn flush_channel(&mut self) -> PipelineResult<()> {
self.flushed = true;
Ok(())
}
}

#[async_trait]
impl ResettableStage for MockBatchQueueProvider {
async fn reset(&mut self, _base: BlockInfo, _cfg: &SystemConfig) -> PipelineResult<()> {
self.reset = true;
Ok(())
}
}
17 changes: 15 additions & 2 deletions crates/derive/src/stages/test_utils/batch_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::{
batch::Batch,
errors::{PipelineError, PipelineResult},
stages::batch_stream::BatchStreamProvider,
traits::{OriginAdvancer, OriginProvider, ResettableStage},
traits::{FlushableStage, OriginAdvancer, OriginProvider, ResettableStage},
};
use alloc::{boxed::Box, vec::Vec};
use async_trait::async_trait;
Expand All @@ -18,12 +18,16 @@ pub struct MockBatchStreamProvider {
pub origin: Option<BlockInfo>,
/// A list of batches to return.
pub batches: Vec<PipelineResult<Batch>>,
/// Wether the reset method was called.
pub reset: bool,
/// Wether the provider was flushed.
pub flushed: bool,
}

impl MockBatchStreamProvider {
/// Creates a new [MockBatchStreamProvider] with the given origin and batches.
pub fn new(batches: Vec<PipelineResult<Batch>>) -> Self {
Self { origin: Some(BlockInfo::default()), batches }
Self { origin: Some(BlockInfo::default()), batches, reset: false, flushed: false }
}
}

Expand All @@ -33,6 +37,14 @@ impl OriginProvider for MockBatchStreamProvider {
}
}

#[async_trait]
impl FlushableStage for MockBatchStreamProvider {
async fn flush_channel(&mut self) -> PipelineResult<()> {
self.flushed = true;
Ok(())
}
}

#[async_trait]
impl BatchStreamProvider for MockBatchStreamProvider {
fn flush(&mut self) {}
Expand All @@ -52,6 +64,7 @@ impl OriginAdvancer for MockBatchStreamProvider {
#[async_trait]
impl ResettableStage for MockBatchStreamProvider {
async fn reset(&mut self, _base: BlockInfo, _cfg: &SystemConfig) -> PipelineResult<()> {
self.reset = true;
Ok(())
}
}
5 changes: 4 additions & 1 deletion crates/derive/src/stages/test_utils/frame_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@ pub struct MockFrameQueueProvider {
pub data: Vec<PipelineResult<Bytes>>,
/// The origin to return.
pub origin: Option<BlockInfo>,
/// Wether the reset method was called.
pub reset: bool,
}

impl MockFrameQueueProvider {
/// Creates a new [MockFrameQueueProvider] with the given data.
pub const fn new(data: Vec<PipelineResult<Bytes>>) -> Self {
Self { data, origin: None }
Self { data, origin: None, reset: false }
}

/// Sets the origin for the [MockFrameQueueProvider].
Expand Down Expand Up @@ -57,6 +59,7 @@ impl FrameQueueProvider for MockFrameQueueProvider {
#[async_trait]
impl ResettableStage for MockFrameQueueProvider {
async fn reset(&mut self, _base: BlockInfo, _cfg: &SystemConfig) -> PipelineResult<()> {
self.reset = true;
Ok(())
}
}
17 changes: 15 additions & 2 deletions crates/derive/src/test_utils/stages/attributes_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::{
batch::SingleBatch,
errors::{BuilderError, PipelineError, PipelineErrorKind, PipelineResult},
stages::AttributesProvider,
traits::{AttributesBuilder, OriginAdvancer, OriginProvider, ResettableStage},
traits::{AttributesBuilder, FlushableStage, OriginAdvancer, OriginProvider, ResettableStage},
};
use alloc::{boxed::Box, string::ToString, vec::Vec};
use alloy_eips::BlockNumHash;
Expand Down Expand Up @@ -45,6 +45,10 @@ pub struct TestAttributesProvider {
origin: Option<BlockInfo>,
/// A list of batches to return.
batches: Vec<PipelineResult<SingleBatch>>,
/// Tracks if the provider has been reset.
pub reset: bool,
/// Tracks if the provider has been flushed.
pub flushed: bool,
}

impl OriginProvider for TestAttributesProvider {
Expand All @@ -60,9 +64,18 @@ impl OriginAdvancer for TestAttributesProvider {
}
}

#[async_trait]
impl FlushableStage for TestAttributesProvider {
async fn flush_channel(&mut self) -> PipelineResult<()> {
self.flushed = true;
Ok(())
}
}

#[async_trait]
impl ResettableStage for TestAttributesProvider {
async fn reset(&mut self, _base: BlockInfo, _cfg: &SystemConfig) -> PipelineResult<()> {
self.reset = true;
Ok(())
}
}
Expand All @@ -83,5 +96,5 @@ pub const fn new_test_attributes_provider(
origin: Option<BlockInfo>,
batches: Vec<PipelineResult<SingleBatch>>,
) -> TestAttributesProvider {
TestAttributesProvider { origin, batches }
TestAttributesProvider { origin, batches, reset: false, flushed: false }
}