Skip to content
This repository was archived by the owner on Jan 16, 2026. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions bin/client/src/l1/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ pub type OracleAttributesQueue<DAP, O> = AttributesQueue<
ChannelReader<
ChannelBank<FrameQueue<L1Retrieval<DAP, L1Traversal<OracleL1ChainProvider<O>>>>>,
>,
OracleL2ChainProvider<O>,
>,
OracleL2ChainProvider<O>,
>,
Expand Down
88 changes: 88 additions & 0 deletions crates/derive/src/batch/span_batch/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ impl SpanBatch {
self.batches[0].timestamp
}

/// Returns the ending timestamp for the span batch.
pub fn end_timestamp(&self) -> u64 {
self.batches.last().map_or(0, |b| b.timestamp)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should follow the same behavior as timestamp and panic if batches is empty. Getting 0 could cause some weird UB.

}

/// Returns the epoch number for the first batch in the span.
pub fn starting_epoch_num(&self) -> u64 {
self.batches[0].epoch_num
Expand All @@ -57,6 +62,89 @@ impl SpanBatch {
self.parent_check == hash[..20]
}

/// Perform holocene checks on the span batch.
pub async fn is_batch_holocene_valid<BF: L2ChainProvider>(
&self,
l2_safe_head: L2BlockInfo,
block_time: u64,
fetcher: &mut BF,
) -> bool {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use BatchValidity? There are circumstances within these checks where we're undecided, not Drop.

let mut parent_num = l2_safe_head.block_info.number;
let mut parent_block = l2_safe_head;
let next_timestamp = l2_safe_head.block_info.timestamp + block_time;

// If the span batch L1 origin check is not part of
// the canonical L1 chain, the span batch is invalid.
let starting_epoch_num = self.starting_epoch_num();
if starting_epoch_num > parent_block.l1_origin.number + 1 {
warn!(
"batch is for future epoch too far ahead, while it has the next timestamp, so it must be invalid. starting epoch: {} | next epoch: {}",
starting_epoch_num,
parent_block.l1_origin.number + 1
);
return false;
}
// Check if the batch is too old.
if starting_epoch_num < parent_block.l1_origin.number {
warn!("dropped batch, epoch is too old, minimum: {:?}", parent_block.block_info.id());
return false;
}
Comment on lines +76 to +91
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


// A failed parent check invalidates the span batch.
if self.timestamp() < next_timestamp {
if self.timestamp() > l2_safe_head.block_info.timestamp {
// Batch timestamp cannot be between safe head and next timestamp.
warn!("batch has misaligned timestamp, block time is too short");
return false;
}
if (l2_safe_head.block_info.timestamp - self.timestamp()) % block_time != 0 {
warn!("batch has misaligned timestamp, not overlapped exactly");
return false;
}
parent_num = l2_safe_head.block_info.number -
(l2_safe_head.block_info.timestamp - self.timestamp()) / block_time -
1;
parent_block = match fetcher.l2_block_info_by_number(parent_num).await {
Ok(block) => block,
Err(e) => {
warn!("failed to fetch L2 block number {parent_num}: {e}");
return false;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The RPC fetch failing should be temporary - should not influence batch validity

}
};
}
if !self.check_parent_hash(parent_block.block_info.hash) {
warn!(
"parent block number mismatch, expected: {parent_num}, received: {}, parent hash: {}, self hash: {}",
parent_block.block_info.number,
parent_block.block_info.hash,
self.parent_check,
);
return false;
}

// If starting timestamp of the span > next_timestamp, the span batch is invalid,
// because we disallow gaps due to the new strict batch ordering rules.
if self.timestamp() > next_timestamp {
warn!(
"received out-of-order batch for future processing after next batch ({} > {})",
self.timestamp(),
next_timestamp
);
return false;
}

// If span_end.timestamp < next_timestamp, the span batch is invalid,
// as it doesn't contain any new batches. This would also happen if
// applying timestamp checks to each derived singular batch individually.
if self.end_timestamp() < next_timestamp {
warn!("span batch has no new blocks after safe head");
return false;
}

// The span batch is valid post-holocene!
true
}

/// Checks if the span batch is valid.
pub async fn check_batch<BF: L2ChainProvider>(
&self,
Expand Down
5 changes: 5 additions & 0 deletions crates/derive/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,11 @@ pub enum PipelineError {
/// Provider error variant.
#[error("Blob provider error: {0}")]
Provider(String),
/// An invalid span batch is found by the [BatchStream] stage.
///
/// [BatchStream]: crate::stages::BatchStream
#[error("Invalid span batch")]
InvalidSpanBatch,
}

impl PipelineError {
Expand Down
7 changes: 4 additions & 3 deletions crates/derive/src/pipeline/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ type L1RetrievalStage<DAP, P> = L1Retrieval<DAP, L1TraversalStage<P>>;
type FrameQueueStage<DAP, P> = FrameQueue<L1RetrievalStage<DAP, P>>;
type ChannelBankStage<DAP, P> = ChannelBank<FrameQueueStage<DAP, P>>;
type ChannelReaderStage<DAP, P> = ChannelReader<ChannelBankStage<DAP, P>>;
type BatchStreamStage<DAP, P> = BatchStream<ChannelReaderStage<DAP, P>>;
type BatchQueueStage<DAP, P, T> = BatchQueue<BatchStreamStage<DAP, P>, T>;
type BatchStreamStage<DAP, P, T> = BatchStream<ChannelReaderStage<DAP, P>, T>;
type BatchQueueStage<DAP, P, T> = BatchQueue<BatchStreamStage<DAP, P, T>, T>;
type AttributesQueueStage<DAP, P, T, B> = AttributesQueue<BatchQueueStage<DAP, P, T>, B>;

/// The `PipelineBuilder` constructs a [DerivationPipeline] using a builder pattern.
Expand Down Expand Up @@ -134,7 +134,8 @@ where
let frame_queue = FrameQueue::new(l1_retrieval, Arc::clone(&rollup_config));
let channel_bank = ChannelBank::new(Arc::clone(&rollup_config), frame_queue);
let channel_reader = ChannelReader::new(channel_bank, Arc::clone(&rollup_config));
let batch_stream = BatchStream::new(channel_reader, rollup_config.clone());
let batch_stream =
BatchStream::new(channel_reader, rollup_config.clone(), l2_chain_provider.clone());
let batch_queue =
BatchQueue::new(rollup_config.clone(), batch_stream, l2_chain_provider.clone());
let attributes =
Expand Down
4 changes: 2 additions & 2 deletions crates/derive/src/stages/batch_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ pub trait BatchQueueProvider {
/// complete and the batch has been consumed, an [PipelineError::Eof] error is returned.
///
/// [ChannelReader]: crate::stages::ChannelReader
async fn next_batch(&mut self) -> PipelineResult<Batch>;
async fn next_batch(&mut self, parent: L2BlockInfo) -> PipelineResult<Batch>;

/// Allows the [BatchQueue] to flush the buffer in the [crate::stages::BatchStream]
/// if an invalid single batch is found. Pre-holocene hardfork, this will be a no-op.
Expand Down Expand Up @@ -336,7 +336,7 @@ where

// Load more data into the batch queue.
let mut out_of_data = false;
match self.prev.next_batch().await {
match self.prev.next_batch(parent).await {
Ok(b) => {
if !origin_behind {
self.add_batch(b, parent).await.ok();
Expand Down
Loading