diff --git a/bin/client/src/l1/driver.rs b/bin/client/src/l1/driver.rs index 30b462b2c5..2495371df5 100644 --- a/bin/client/src/l1/driver.rs +++ b/bin/client/src/l1/driver.rs @@ -50,6 +50,7 @@ pub type OracleAttributesQueue = AttributesQueue< ChannelReader< ChannelBank>>>>, >, + OracleL2ChainProvider, >, OracleL2ChainProvider, >, diff --git a/crates/derive/src/batch/span_batch/batch.rs b/crates/derive/src/batch/span_batch/batch.rs index 1d13842b5a..86fbc78f67 100644 --- a/crates/derive/src/batch/span_batch/batch.rs +++ b/crates/derive/src/batch/span_batch/batch.rs @@ -42,6 +42,11 @@ impl SpanBatch { self.batches[0].timestamp } + /// Returns the ending timestamp for the span batch. + pub fn end_timestamp(&self) -> u64 { + self.batches.last().map_or(0, |b| b.timestamp) + } + /// Returns the epoch number for the first batch in the span. pub fn starting_epoch_num(&self) -> u64 { self.batches[0].epoch_num @@ -57,6 +62,89 @@ impl SpanBatch { self.parent_check == hash[..20] } + /// Perform holocene checks on the span batch. + pub async fn is_batch_holocene_valid( + &self, + l2_safe_head: L2BlockInfo, + block_time: u64, + fetcher: &mut BF, + ) -> bool { + let mut parent_num = l2_safe_head.block_info.number; + let mut parent_block = l2_safe_head; + let next_timestamp = l2_safe_head.block_info.timestamp + block_time; + + // If the span batch L1 origin check is not part of + // the canonical L1 chain, the span batch is invalid. + let starting_epoch_num = self.starting_epoch_num(); + if starting_epoch_num > parent_block.l1_origin.number + 1 { + warn!( + "batch is for future epoch too far ahead, while it has the next timestamp, so it must be invalid. starting epoch: {} | next epoch: {}", + starting_epoch_num, + parent_block.l1_origin.number + 1 + ); + return false; + } + // Check if the batch is too old. + if starting_epoch_num < parent_block.l1_origin.number { + warn!("dropped batch, epoch is too old, minimum: {:?}", parent_block.block_info.id()); + return false; + } + + // A failed parent check invalidates the span batch. + if self.timestamp() < next_timestamp { + if self.timestamp() > l2_safe_head.block_info.timestamp { + // Batch timestamp cannot be between safe head and next timestamp. + warn!("batch has misaligned timestamp, block time is too short"); + return false; + } + if (l2_safe_head.block_info.timestamp - self.timestamp()) % block_time != 0 { + warn!("batch has misaligned timestamp, not overlapped exactly"); + return false; + } + parent_num = l2_safe_head.block_info.number - + (l2_safe_head.block_info.timestamp - self.timestamp()) / block_time - + 1; + parent_block = match fetcher.l2_block_info_by_number(parent_num).await { + Ok(block) => block, + Err(e) => { + warn!("failed to fetch L2 block number {parent_num}: {e}"); + return false; + } + }; + } + if !self.check_parent_hash(parent_block.block_info.hash) { + warn!( + "parent block number mismatch, expected: {parent_num}, received: {}, parent hash: {}, self hash: {}", + parent_block.block_info.number, + parent_block.block_info.hash, + self.parent_check, + ); + return false; + } + + // If starting timestamp of the span > next_timestamp, the span batch is invalid, + // because we disallow gaps due to the new strict batch ordering rules. + if self.timestamp() > next_timestamp { + warn!( + "received out-of-order batch for future processing after next batch ({} > {})", + self.timestamp(), + next_timestamp + ); + return false; + } + + // If span_end.timestamp < next_timestamp, the span batch is invalid, + // as it doesn't contain any new batches. This would also happen if + // applying timestamp checks to each derived singular batch individually. + if self.end_timestamp() < next_timestamp { + warn!("span batch has no new blocks after safe head"); + return false; + } + + // The span batch is valid post-holocene! + true + } + /// Checks if the span batch is valid. pub async fn check_batch( &self, diff --git a/crates/derive/src/errors.rs b/crates/derive/src/errors.rs index b0bb88e7a4..ef71910449 100644 --- a/crates/derive/src/errors.rs +++ b/crates/derive/src/errors.rs @@ -89,6 +89,11 @@ pub enum PipelineError { /// Provider error variant. #[error("Blob provider error: {0}")] Provider(String), + /// An invalid span batch is found by the [BatchStream] stage. + /// + /// [BatchStream]: crate::stages::BatchStream + #[error("Invalid span batch")] + InvalidSpanBatch, } impl PipelineError { diff --git a/crates/derive/src/pipeline/builder.rs b/crates/derive/src/pipeline/builder.rs index f51556fe20..6c853ba586 100644 --- a/crates/derive/src/pipeline/builder.rs +++ b/crates/derive/src/pipeline/builder.rs @@ -17,8 +17,8 @@ type L1RetrievalStage = L1Retrieval>; type FrameQueueStage = FrameQueue>; type ChannelBankStage = ChannelBank>; type ChannelReaderStage = ChannelReader>; -type BatchStreamStage = BatchStream>; -type BatchQueueStage = BatchQueue, T>; +type BatchStreamStage = BatchStream, T>; +type BatchQueueStage = BatchQueue, T>; type AttributesQueueStage = AttributesQueue, B>; /// The `PipelineBuilder` constructs a [DerivationPipeline] using a builder pattern. @@ -134,7 +134,8 @@ where let frame_queue = FrameQueue::new(l1_retrieval, Arc::clone(&rollup_config)); let channel_bank = ChannelBank::new(Arc::clone(&rollup_config), frame_queue); let channel_reader = ChannelReader::new(channel_bank, Arc::clone(&rollup_config)); - let batch_stream = BatchStream::new(channel_reader, rollup_config.clone()); + let batch_stream = + BatchStream::new(channel_reader, rollup_config.clone(), l2_chain_provider.clone()); let batch_queue = BatchQueue::new(rollup_config.clone(), batch_stream, l2_chain_provider.clone()); let attributes = diff --git a/crates/derive/src/stages/batch_queue.rs b/crates/derive/src/stages/batch_queue.rs index 26386c25a8..6fffe01248 100644 --- a/crates/derive/src/stages/batch_queue.rs +++ b/crates/derive/src/stages/batch_queue.rs @@ -22,7 +22,7 @@ pub trait BatchQueueProvider { /// complete and the batch has been consumed, an [PipelineError::Eof] error is returned. /// /// [ChannelReader]: crate::stages::ChannelReader - async fn next_batch(&mut self) -> PipelineResult; + async fn next_batch(&mut self, parent: L2BlockInfo) -> PipelineResult; /// Allows the [BatchQueue] to flush the buffer in the [crate::stages::BatchStream] /// if an invalid single batch is found. Pre-holocene hardfork, this will be a no-op. @@ -336,7 +336,7 @@ where // Load more data into the batch queue. let mut out_of_data = false; - match self.prev.next_batch().await { + match self.prev.next_batch(parent).await { Ok(b) => { if !origin_behind { self.add_batch(b, parent).await.ok(); diff --git a/crates/derive/src/stages/batch_stream.rs b/crates/derive/src/stages/batch_stream.rs index 69e89b7fa3..1357c2743a 100644 --- a/crates/derive/src/stages/batch_stream.rs +++ b/crates/derive/src/stages/batch_stream.rs @@ -4,14 +4,14 @@ use alloc::{boxed::Box, sync::Arc, vec::Vec}; use async_trait::async_trait; use core::fmt::Debug; use op_alloy_genesis::{RollupConfig, SystemConfig}; -use op_alloy_protocol::BlockInfo; -use tracing::trace; +use op_alloy_protocol::{BlockInfo, L2BlockInfo}; +use tracing::{info, trace}; use crate::{ batch::{Batch, SingleBatch, SpanBatch}, - errors::{PipelineError, PipelineResult}, + errors::{PipelineEncodingError, PipelineError, PipelineResult}, stages::BatchQueueProvider, - traits::{OriginAdvancer, OriginProvider, ResettableStage}, + traits::{L2ChainProvider, OriginAdvancer, OriginProvider, ResettableStage}, }; /// [BatchStream] stage in the derivation pipeline. @@ -24,28 +24,49 @@ use crate::{ /// [ChannelReader]: crate::stages::ChannelReader /// [BatchQueue]: crate::stages::BatchQueue #[derive(Debug)] -pub struct BatchStream

+pub struct BatchStream where P: BatchQueueProvider + OriginAdvancer + OriginProvider + ResettableStage + Debug, + F: L2ChainProvider + Debug, { /// The previous stage in the derivation pipeline. prev: P, + /// A reference to the rollup config, used to check + /// if the [BatchStream] stage should be activated. + config: Arc, + /// The l1 block ref + origin: Option, /// There can only be a single staged span batch. span: Option, /// A buffer of single batches derived from the [SpanBatch]. buffer: Vec, - /// A reference to the rollup config, used to check - /// if the [BatchStream] stage should be activated. - config: Arc, + /// A consecutive, time-centric window of L1 Blocks. + /// Every L1 origin of unsafe L2 Blocks must be included in this list. + /// If every L2 Block corresponding to a single L1 Block becomes safe, + /// the block is popped from this list. + /// If new L2 Block's L1 origin is not included in this list, fetch and + /// push it to the list. + l1_blocks: Vec, + /// Used to validate the batches. + fetcher: F, } -impl

BatchStream

+impl BatchStream where P: BatchQueueProvider + OriginAdvancer + OriginProvider + ResettableStage + Debug, + F: L2ChainProvider + Debug, { /// Create a new [BatchStream] stage. - pub const fn new(prev: P, config: Arc) -> Self { - Self { prev, span: None, buffer: Vec::new(), config } + pub const fn new(prev: P, config: Arc, fetcher: F) -> Self { + Self { + prev, + config, + fetcher, + origin: None, + span: None, + buffer: Vec::new(), + l1_blocks: Vec::new(), + } } /// Returns if the [BatchStream] stage is active based on the @@ -55,17 +76,71 @@ where Ok(self.config.is_holocene_active(origin.timestamp)) } + /// Updates the in-memory list of L1 Blocks. + pub fn update_l1_blocks(&mut self, parent: L2BlockInfo) -> PipelineResult<()> { + // If the epoch is advanced, update the l1 blocks. + // Advancing epoch must be done after the pipeline successfully applies the entire span + // batch to the chain. + // Because the span batch can be reverted during processing the batch, then we must + // preserve existing l1 blocks to verify the epochs of the next candidate batch. + if !self.l1_blocks.is_empty() && parent.l1_origin.number > self.l1_blocks[0].number { + for (i, block) in self.l1_blocks.iter().enumerate() { + if parent.l1_origin.number == block.number { + self.l1_blocks.drain(0..i); + info!(target: "batch-stream", "Advancing epoch"); + break; + } + } + // If the origin of the parent block is not included, we must advance the origin. + } + + // NOTE: The origin is used to determine if it's behind. + // It is the future origin that gets saved into the l1 blocks array. + // We always update the origin of this stage if it's not the same so + // after the update code runs, this is consistent. + let origin_behind = + self.prev.origin().map_or(true, |origin| origin.number < parent.l1_origin.number); + + // Advance the origin if needed. + // The entire pipeline has the same origin. + // Batches prior to the l1 origin of the l2 safe head are not accepted. + if self.origin != self.prev.origin() { + self.origin = self.prev.origin(); + if !origin_behind { + let origin = match self.origin.as_ref().ok_or(PipelineError::MissingOrigin.crit()) { + Ok(o) => o, + Err(e) => return Err(e), + }; + self.l1_blocks.push(*origin); + } else { + // This is to handle the special case of startup. + // At startup, the batch queue is reset and includes the + // l1 origin. That is the only time where immediately after + // reset is called, the origin behind is false. + self.l1_blocks.clear(); + } + info!(target: "batch-stream", "Advancing batch queue origin: {:?}", self.origin); + } + + Ok(()) + } + /// Gets a [SingleBatch] from the in-memory buffer. - pub fn get_single_batch(&mut self) -> Option { - trace!(target: "batch_span", "Attempting to get a SingleBatch from buffer len: {}", self.buffer.len()); - unimplemented!() + pub fn get_single_batch(&mut self, parent: L2BlockInfo) -> Option { + if self.buffer.is_empty() { + return None; + } + let mut next = self.buffer.remove(0); + next.parent_hash = parent.block_info.hash; + Some(next) } } #[async_trait] -impl

BatchQueueProvider for BatchStream

+impl BatchQueueProvider for BatchStream where P: BatchQueueProvider + OriginAdvancer + OriginProvider + ResettableStage + Send + Debug, + F: L2ChainProvider + Send + Debug, { fn flush(&mut self) { if self.is_active().unwrap_or(false) { @@ -73,21 +148,24 @@ where } } - async fn next_batch(&mut self) -> PipelineResult { + async fn next_batch(&mut self, parent: L2BlockInfo) -> PipelineResult { // If the stage is not active, "pass" the next batch // through this stage to the BatchQueue stage. if !self.is_active()? { trace!(target: "batch_span", "BatchStream stage is inactive, pass-through."); - return self.prev.next_batch().await; + return self.prev.next_batch(parent).await; } + // First update the in-memory list of L1 Block origins. + self.update_l1_blocks(parent)?; + // First, attempt to pull a SinguleBatch out of the buffer. - if let Some(b) = self.get_single_batch() { + if let Some(b) = self.get_single_batch(parent) { return Ok(Batch::Single(b)); } // Safety: bubble up any errors from the batch reader. - let batch = self.prev.next_batch().await?; + let batch = self.prev.next_batch(parent).await?; // If the next batch is a singular batch, it is immediately // forwarded to the `BatchQueue` stage. @@ -95,28 +173,47 @@ where return Ok(batch); }; - // Set the current span batch. - self.span = Some(b); + // Validate the span batch. + // If it is invalid, drop the batch and flush the channel. + // + // See: + if !b.is_batch_holocene_valid(parent, self.config.block_time, &mut self.fetcher).await { + self.prev.flush(); + return Err(PipelineError::InvalidSpanBatch.temp()); + } + + // Extract the singular batches from the span batch. + let batches = match b.get_singular_batches(&self.l1_blocks, parent).map_err(|e| { + PipelineError::BadEncoding(PipelineEncodingError::SpanBatchError(e)).crit() + }) { + Ok(b) => b, + Err(e) => { + return Err(e); + } + }; + self.buffer = batches; // Attempt to pull a SingleBatch out of the SpanBatch. - self.get_single_batch() + self.get_single_batch(parent) .map_or_else(|| Err(PipelineError::NotEnoughData.temp()), |b| Ok(Batch::Single(b))) } } #[async_trait] -impl

OriginAdvancer for BatchStream

+impl OriginAdvancer for BatchStream where P: BatchQueueProvider + OriginAdvancer + OriginProvider + ResettableStage + Send + Debug, + F: L2ChainProvider + Send + Debug, { async fn advance_origin(&mut self) -> PipelineResult<()> { self.prev.advance_origin().await } } -impl

OriginProvider for BatchStream

+impl OriginProvider for BatchStream where P: BatchQueueProvider + OriginAdvancer + OriginProvider + ResettableStage + Debug, + F: L2ChainProvider + Debug, { fn origin(&self) -> Option { self.prev.origin() @@ -124,13 +221,20 @@ where } #[async_trait] -impl

ResettableStage for BatchStream

+impl ResettableStage for BatchStream where - P: BatchQueueProvider + OriginAdvancer + OriginProvider + ResettableStage + Debug + Send, + P: BatchQueueProvider + OriginAdvancer + OriginProvider + ResettableStage + Send + Debug, + F: L2ChainProvider + Send + Debug, { async fn reset(&mut self, base: BlockInfo, cfg: &SystemConfig) -> PipelineResult<()> { self.prev.reset(base, cfg).await?; + self.origin = Some(base); self.span.take(); + // Include the new origin as an origin to build on. + // This is only for the initialization case. + // During normal resets we will later throw out this block. + self.l1_blocks.clear(); + self.l1_blocks.push(base); crate::inc!(STAGE_RESETS, &["batch-span"]); Ok(()) } @@ -142,6 +246,7 @@ mod test { use crate::{ batch::SingleBatch, stages::test_utils::{CollectingLayer, MockBatchQueueProvider, TraceStorage}, + traits::test_utils::TestL2ChainProvider, }; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; @@ -154,13 +259,14 @@ mod test { let data = vec![Ok(Batch::Single(SingleBatch::default()))]; let config = Arc::new(RollupConfig { holocene_time: Some(100), ..RollupConfig::default() }); let prev = MockBatchQueueProvider::new(data); - let mut stream = BatchStream::new(prev, config.clone()); + let fetcher = TestL2ChainProvider::default(); + let mut stream = BatchStream::new(prev, config.clone(), fetcher); // The stage should not be active. assert!(!stream.is_active().unwrap()); // The next batch should be passed through to the [BatchQueue] stage. - let batch = stream.next_batch().await.unwrap(); + let batch = stream.next_batch(Default::default()).await.unwrap(); assert_eq!(batch, Batch::Single(SingleBatch::default())); let logs = trace_store.get_by_level(tracing::Level::TRACE); diff --git a/crates/derive/src/stages/channel_reader.rs b/crates/derive/src/stages/channel_reader.rs index a357414a7f..71215f9c4f 100644 --- a/crates/derive/src/stages/channel_reader.rs +++ b/crates/derive/src/stages/channel_reader.rs @@ -13,7 +13,7 @@ use async_trait::async_trait; use core::fmt::Debug; use miniz_oxide::inflate::decompress_to_vec_zlib; use op_alloy_genesis::{RollupConfig, SystemConfig}; -use op_alloy_protocol::BlockInfo; +use op_alloy_protocol::{BlockInfo, L2BlockInfo}; use tracing::{debug, error, warn}; /// ZLIB Deflate Compression Method. @@ -99,10 +99,18 @@ impl

BatchQueueProvider for ChannelReader

where P: ChannelReaderProvider + OriginAdvancer + OriginProvider + ResettableStage + Send + Debug, { - fn flush(&mut self) { /* noop */ + /// This method is called by the BatchStream if an invalid span batch is found. + /// In the case of an invalid span batch, the associated channel must be flushed. + /// + /// See: + /// + /// SAFETY: Only called post-holocene activation. + fn flush(&mut self) { + debug!(target: "channel-reader", "[POST-HOLOCENE] Flushing channel"); + self.next_channel(); } - async fn next_batch(&mut self) -> PipelineResult { + async fn next_batch(&mut self, _: L2BlockInfo) -> PipelineResult { crate::timer!(START, STAGE_ADVANCE_RESPONSE_TIME, &["channel_reader"], timer); if let Err(e) = self.set_batch_reader().await { debug!(target: "channel-reader", "Failed to set batch reader: {:?}", e); @@ -247,7 +255,7 @@ mod test { async fn test_next_batch_batch_reader_set_fails() { let mock = MockChannelReaderProvider::new(vec![Err(PipelineError::Eof.temp())]); let mut reader = ChannelReader::new(mock, Arc::new(RollupConfig::default())); - assert_eq!(reader.next_batch().await, Err(PipelineError::Eof.temp())); + assert_eq!(reader.next_batch(Default::default()).await, Err(PipelineError::Eof.temp())); assert!(reader.next_batch.is_none()); } @@ -256,7 +264,7 @@ mod test { let mock = MockChannelReaderProvider::new(vec![Ok(None)]); let mut reader = ChannelReader::new(mock, Arc::new(RollupConfig::default())); assert!(matches!( - reader.next_batch().await.unwrap_err(), + reader.next_batch(Default::default()).await.unwrap_err(), PipelineErrorKind::Temporary(PipelineError::ChannelReaderEmpty) )); assert!(reader.next_batch.is_none()); @@ -268,7 +276,10 @@ mod test { let second = first.split_to(first.len() / 2); let mock = MockChannelReaderProvider::new(vec![Ok(Some(first)), Ok(Some(second))]); let mut reader = ChannelReader::new(mock, Arc::new(RollupConfig::default())); - assert_eq!(reader.next_batch().await, Err(PipelineError::NotEnoughData.temp())); + assert_eq!( + reader.next_batch(Default::default()).await, + Err(PipelineError::NotEnoughData.temp()) + ); assert!(reader.next_batch.is_none()); } @@ -277,7 +288,7 @@ mod test { let raw = new_compressed_batch_data(); let mock = MockChannelReaderProvider::new(vec![Ok(Some(raw))]); let mut reader = ChannelReader::new(mock, Arc::new(RollupConfig::default())); - let res = reader.next_batch().await.unwrap(); + let res = reader.next_batch(Default::default()).await.unwrap(); matches!(res, Batch::Span(_)); assert!(reader.next_batch.is_some()); } diff --git a/crates/derive/src/stages/test_utils/batch_queue.rs b/crates/derive/src/stages/test_utils/batch_queue.rs index e0bcc94508..2bc038b5f4 100644 --- a/crates/derive/src/stages/test_utils/batch_queue.rs +++ b/crates/derive/src/stages/test_utils/batch_queue.rs @@ -9,7 +9,7 @@ use crate::{ use alloc::{boxed::Box, vec::Vec}; use async_trait::async_trait; use op_alloy_genesis::SystemConfig; -use op_alloy_protocol::BlockInfo; +use op_alloy_protocol::{BlockInfo, L2BlockInfo}; /// A mock provider for the [BatchQueue] stage. #[derive(Debug, Default)] @@ -38,7 +38,7 @@ impl BatchQueueProvider for MockBatchQueueProvider { fn flush(&mut self) { /* noop */ } - async fn next_batch(&mut self) -> PipelineResult { + async fn next_batch(&mut self, _: L2BlockInfo) -> PipelineResult { self.batches.pop().ok_or(PipelineError::Eof.temp())? } } diff --git a/crates/providers-alloy/src/pipeline.rs b/crates/providers-alloy/src/pipeline.rs index 4b76e5f73c..881c24cde7 100644 --- a/crates/providers-alloy/src/pipeline.rs +++ b/crates/providers-alloy/src/pipeline.rs @@ -40,6 +40,7 @@ pub type OnlineAttributesQueue = AttributesQueue< ChannelReader< ChannelBank>>>, >, + AlloyL2ChainProvider, >, AlloyL2ChainProvider, >,