Skip to content
This repository was archived by the owner on Jan 16, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions crates/derive/src/batch/validity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,9 @@ impl BatchValidity {
pub const fn is_drop(&self) -> bool {
matches!(self, Self::Drop)
}

/// Returns if the batch is future.
pub const fn is_future(&self) -> bool {
matches!(self, Self::Future)
}
}
307 changes: 303 additions & 4 deletions crates/derive/src/stages/batch_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,8 @@ where
let mut next_batch = None;
let next_timestamp = parent.block_info.timestamp + self.cfg.block_time;

let origin = self.origin.ok_or(PipelineError::MissingOrigin.crit())?;

// Go over all batches, in order of inclusion, and find the first batch we can accept.
// Filter in-place by only remembering the batches that may be processed in the future, or
// any undecided ones.
Expand All @@ -151,7 +153,15 @@ where
batch.check_batch(&self.cfg, &self.l1_blocks, parent, &mut self.fetcher).await;
match validity {
BatchValidity::Future => {
remaining.push(batch.clone());
// Drop Future batches post-holocene.
//
// See: <https://specs.optimism.io/protocol/holocene/derivation.html#batch-queue>
if !self.cfg.is_holocene_active(origin.timestamp) {
remaining.push(batch.clone());
} else {
self.prev.flush();
warn!(target: "batch-queue", "[HOLOCENE] Dropping future batch with parent: {}", parent.block_info.number);
}
}
BatchValidity::Drop => {
// If we drop a batch, flush previous batches buffered in the BatchStream
Expand Down Expand Up @@ -184,9 +194,8 @@ where
// If the current epoch is too old compared to the L1 block we are at,
// i.e. if the sequence window expired, we create empty batches for the current epoch
let expiry_epoch = epoch.number + self.cfg.seq_window_size;
let bq_origin = self.origin.ok_or(PipelineError::MissingOrigin.crit())?;
let force_empty_batches =
(expiry_epoch == bq_origin.number && empty) || expiry_epoch < bq_origin.number;
(expiry_epoch == origin.number && empty) || expiry_epoch < origin.number;
let first_of_epoch = epoch.number == parent.l1_origin.number + 1;

// If the sequencer window did not expire,
Expand Down Expand Up @@ -243,7 +252,12 @@ where
let origin = self.origin.ok_or(PipelineError::MissingOrigin.crit())?;
let data = BatchWithInclusionBlock { inclusion_block: origin, batch };
// If we drop the batch, validation logs the drop reason with WARN level.
if data.check_batch(&self.cfg, &self.l1_blocks, parent, &mut self.fetcher).await.is_drop() {
let validity =
data.check_batch(&self.cfg, &self.l1_blocks, parent, &mut self.fetcher).await;
// Post-Holocene, future batches are dropped due to prevent gaps.
let drop = validity.is_drop() ||
(self.cfg.is_holocene_active(origin.timestamp) && validity.is_future());
if drop {
self.prev.flush();
return Ok(());
}
Expand Down Expand Up @@ -483,6 +497,109 @@ mod tests {
BatchReader::from(bytes)
}

#[tokio::test]
async fn test_holocene_add_batch_valid() {
// Construct a future single batch.
let cfg = Arc::new(RollupConfig {
max_sequencer_drift: 700,
holocene_time: Some(0),
..Default::default()
});
assert!(cfg.is_holocene_active(0));
let batch = SingleBatch {
parent_hash: B256::default(),
epoch_num: 0,
epoch_hash: B256::default(),
timestamp: 100,
transactions: Vec::new(),
};
let parent = L2BlockInfo {
block_info: BlockInfo { timestamp: 100, ..Default::default() },
..Default::default()
};

// Setup batch queue deps
let batch_vec = vec![PipelineResult::Ok(Batch::Single(batch.clone()))];
let mut mock = MockBatchQueueProvider::new(batch_vec);
mock.origin = Some(BlockInfo::default());
let fetcher = TestL2ChainProvider::default();

// Configure batch queue
let mut bq = BatchQueue::new(cfg.clone(), mock, fetcher);
bq.origin = Some(BlockInfo::default()); // Set the origin
bq.l1_blocks.push(BlockInfo::default()); // Push the origin into the l1 blocks
bq.l1_blocks.push(BlockInfo::default()); // Push the next origin into the bq

// Add the batch to the batch queue
bq.add_batch(Batch::Single(batch), parent).await.unwrap();
assert_eq!(bq.batches.len(), 1);
}

#[tokio::test]
async fn test_holocene_add_batch_future() {
// Construct a future single batch.
let cfg = Arc::new(RollupConfig { holocene_time: Some(0), ..Default::default() });
assert!(cfg.is_holocene_active(0));
let batch = SingleBatch {
parent_hash: B256::default(),
epoch_num: 0,
epoch_hash: B256::default(),
timestamp: 100,
transactions: Vec::new(),
};
let parent = L2BlockInfo::default();

// Setup batch queue deps
let batch_vec = vec![PipelineResult::Ok(Batch::Single(batch.clone()))];
let mut mock = MockBatchQueueProvider::new(batch_vec);
mock.origin = Some(BlockInfo::default());
let fetcher = TestL2ChainProvider::default();

// Configure batch queue
let mut bq = BatchQueue::new(cfg.clone(), mock, fetcher);
bq.origin = Some(BlockInfo::default()); // Set the origin
bq.l1_blocks.push(BlockInfo::default()); // Push the origin into the l1 blocks
bq.l1_blocks.push(BlockInfo::default()); // Push the next origin into the bq

// Add the batch to the batch queue
bq.add_batch(Batch::Single(batch), parent).await.unwrap();
assert!(bq.batches.is_empty());
}

#[tokio::test]
async fn test_add_batch_drop() {
// Construct a single batch with BatchValidity::Drop.
let cfg = Arc::new(RollupConfig::default());
assert!(!cfg.is_holocene_active(0));
let batch = SingleBatch {
parent_hash: B256::default(),
epoch_num: 0,
epoch_hash: B256::default(),
timestamp: 100,
transactions: Vec::new(),
};
let parent = L2BlockInfo {
block_info: BlockInfo { timestamp: 101, ..Default::default() },
..Default::default()
};

// Setup batch queue deps
let batch_vec = vec![PipelineResult::Ok(Batch::Single(batch.clone()))];
let mut mock = MockBatchQueueProvider::new(batch_vec);
mock.origin = Some(BlockInfo::default());
let fetcher = TestL2ChainProvider::default();

// Configure batch queue
let mut bq = BatchQueue::new(cfg.clone(), mock, fetcher);
bq.origin = Some(BlockInfo::default()); // Set the origin
bq.l1_blocks.push(BlockInfo::default()); // Push the origin into the l1 blocks
bq.l1_blocks.push(BlockInfo::default()); // Push the next origin into the bq

// Add the batch to the batch queue
bq.add_batch(Batch::Single(batch), parent).await.unwrap();
assert!(bq.batches.is_empty());
}

#[tokio::test]
async fn test_derive_next_batch_missing_origin() {
let data = vec![Ok(Batch::Single(SingleBatch::default()))];
Expand All @@ -495,6 +612,188 @@ mod tests {
assert_eq!(result, PipelineError::MissingOrigin.crit());
}

#[tokio::test]
async fn test_derive_next_batch_invalid_parent() {
let mut reader = new_batch_reader();
let cfg = Arc::new(RollupConfig::default());
let mut batch_vec: Vec<PipelineResult<Batch>> = vec![];
while let Some(batch) = reader.next_batch(cfg.as_ref()) {
batch_vec.push(Ok(batch));
}
let mut mock = MockBatchQueueProvider::new(batch_vec);
mock.origin = Some(BlockInfo::default());
let fetcher = TestL2ChainProvider::default();
let mut bq = BatchQueue::new(cfg, mock, fetcher);
let parent = L2BlockInfo {
l1_origin: BlockNumHash { number: 10, ..Default::default() },
..Default::default()
};
let result = bq.derive_next_batch(false, parent).await.unwrap_err();
assert_eq!(result, PipelineError::MissingOrigin.crit());
}

#[tokio::test]
async fn test_derive_next_batch_no_batches() {
// Setup
let mut reader = new_batch_reader();
let cfg = Arc::new(RollupConfig::default());
let mut batch_vec: Vec<PipelineResult<Batch>> = vec![];
while let Some(batch) = reader.next_batch(cfg.as_ref()) {
batch_vec.push(Ok(batch));
}
let mut mock = MockBatchQueueProvider::new(batch_vec);
mock.origin = Some(BlockInfo::default());
let fetcher = TestL2ChainProvider::default();
let mut bq = BatchQueue::new(cfg, mock, fetcher);
bq.origin = Some(BlockInfo::default());
bq.l1_blocks.push(BlockInfo::default());

// Assertions
assert!(bq.batches.is_empty());
assert_eq!(bq.l1_blocks.len(), 1);
let result = bq.derive_next_batch(true, L2BlockInfo::default()).await.unwrap_err();
assert_eq!(result, PipelineError::Eof.temp());
assert!(bq.is_last_in_span());
}

#[tokio::test]
async fn test_derive_next_batch_dont_force_empty_batches() {
// Setup
let mut reader = new_batch_reader();
let cfg = Arc::new(RollupConfig::default());
let mut batch_vec: Vec<PipelineResult<Batch>> = vec![];
while let Some(batch) = reader.next_batch(cfg.as_ref()) {
batch_vec.push(Ok(batch));
}
let mut mock = MockBatchQueueProvider::new(batch_vec);
mock.origin = Some(BlockInfo::default());
let fetcher = TestL2ChainProvider::default();
let mut bq = BatchQueue::new(cfg, mock, fetcher);
bq.origin = Some(BlockInfo::default());
bq.l1_blocks.push(BlockInfo::default());

// Assertions
assert!(bq.batches.is_empty());
assert_eq!(bq.l1_blocks.len(), 1);
let result = bq.derive_next_batch(false, L2BlockInfo::default()).await.unwrap_err();
assert_eq!(result, PipelineError::Eof.temp());
assert!(bq.is_last_in_span());
}

#[tokio::test]
async fn test_derive_next_batch_advances_l1_blocks() {
// Setup
let mut reader = new_batch_reader();
let cfg = Arc::new(RollupConfig::default());
let mut batch_vec: Vec<PipelineResult<Batch>> = vec![];
while let Some(batch) = reader.next_batch(cfg.as_ref()) {
batch_vec.push(Ok(batch));
}
let mut mock = MockBatchQueueProvider::new(batch_vec);
mock.origin = Some(BlockInfo::default());
let fetcher = TestL2ChainProvider::default();
let mut bq = BatchQueue::new(cfg, mock, fetcher);
bq.origin = Some(BlockInfo::default());
bq.l1_blocks.push(BlockInfo::default());
bq.l1_blocks.push(BlockInfo::default());

// Assertions
assert!(bq.batches.is_empty());
assert_eq!(bq.l1_blocks.len(), 2);
let result = bq.derive_next_batch(true, L2BlockInfo::default()).await.unwrap_err();
assert_eq!(result, PipelineError::Eof.temp());
assert!(bq.is_last_in_span());
assert_eq!(bq.l1_blocks.len(), 1);
}

#[tokio::test]
async fn test_derive_next_batch_future_batch() {
// Construct a future single batch.
let cfg = Arc::new(RollupConfig::default());
assert!(!cfg.is_holocene_active(0)); // Asserts holocene is not active.
let batch = SingleBatch {
parent_hash: B256::default(),
epoch_num: 0,
epoch_hash: B256::default(),
timestamp: 100,
transactions: Vec::new(),
};
let parent = L2BlockInfo::default();

// Setup batch queue deps
let batch_vec = vec![PipelineResult::Ok(Batch::Single(batch.clone()))];
let mut mock = MockBatchQueueProvider::new(batch_vec);
mock.origin = Some(BlockInfo::default());
let fetcher = TestL2ChainProvider::default();

// Configure batch queue
let mut bq = BatchQueue::new(cfg.clone(), mock, fetcher);
bq.origin = Some(BlockInfo::default()); // Set the origin
bq.l1_blocks.push(BlockInfo::default()); // Push the origin into the l1 blocks
bq.l1_blocks.push(BlockInfo::default()); // Push the next origin into the bq

// Add the batch to the batch queue
bq.add_batch(Batch::Single(batch), parent).await.unwrap();
assert_eq!(bq.batches.len(), 1);

// Derive next batch
let result = bq.derive_next_batch(true, L2BlockInfo::default()).await.unwrap_err();
assert_eq!(result, PipelineError::Eof.temp());
assert!(bq.is_last_in_span());
assert_eq!(bq.batches.len(), 1);
}

#[tokio::test]
async fn test_holocene_derive_next_batch_future() {
let trace_store: TraceStorage = Default::default();
let layer = CollectingLayer::new(trace_store.clone());
tracing_subscriber::Registry::default().with(layer).init();

// Construct a future single batch.
let cfg = Arc::new(RollupConfig { holocene_time: Some(0), ..Default::default() });
assert!(cfg.is_holocene_active(0));
let batch = SingleBatch {
parent_hash: B256::default(),
epoch_num: 0,
epoch_hash: B256::default(),
timestamp: 100,
transactions: Vec::new(),
};
let parent = L2BlockInfo::default();

// Setup batch queue deps
let batch_vec = vec![PipelineResult::Ok(Batch::Single(batch.clone()))];
let mut mock = MockBatchQueueProvider::new(batch_vec);
mock.origin = Some(BlockInfo::default());
let fetcher = TestL2ChainProvider::default();

// Configure batch queue
let mut bq = BatchQueue::new(cfg.clone(), mock, fetcher);
bq.origin = Some(BlockInfo::default()); // Set the origin
bq.l1_blocks.push(BlockInfo::default()); // Push the origin into the l1 blocks
bq.l1_blocks.push(BlockInfo::default()); // Push the next origin into the bq

// Add the batch to the batch queue
let data = BatchWithInclusionBlock {
inclusion_block: parent.block_info,
batch: Batch::Single(batch),
};
bq.batches.push(data);
assert_eq!(bq.batches.len(), 1);

// Derive next batch
let result = bq.derive_next_batch(true, L2BlockInfo::default()).await.unwrap_err();
assert_eq!(result, PipelineError::Eof.temp());
assert!(bq.is_last_in_span());
assert!(bq.batches.is_empty());

// Validate logs
let logs = trace_store.get_by_level(Level::WARN);
assert_eq!(logs.len(), 1);
let warn_str = "[HOLOCENE] Dropping future batch with parent: 0";
assert!(logs[0].contains(warn_str));
}

#[tokio::test]
async fn test_next_batch_not_enough_data() {
let mut reader = new_batch_reader();
Expand Down
7 changes: 5 additions & 2 deletions crates/derive/src/stages/test_utils/batch_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@ pub struct MockBatchQueueProvider {
pub origin: Option<BlockInfo>,
/// A list of batches to return.
pub batches: Vec<PipelineResult<Batch>>,
/// Tracks if the provider has been flushed.
pub flushed: bool,
}

impl MockBatchQueueProvider {
/// Creates a new [MockBatchQueueProvider] with the given origin and batches.
pub fn new(batches: Vec<PipelineResult<Batch>>) -> Self {
Self { origin: Some(BlockInfo::default()), batches }
Self { origin: Some(BlockInfo::default()), batches, flushed: false }
}
}

Expand All @@ -35,7 +37,8 @@ impl OriginProvider for MockBatchQueueProvider {

#[async_trait]
impl BatchQueueProvider for MockBatchQueueProvider {
fn flush(&mut self) { /* noop */
fn flush(&mut self) {
self.flushed = true;
}

async fn next_batch(&mut self, _: L2BlockInfo, _: &[BlockInfo]) -> PipelineResult<Batch> {
Expand Down