diff --git a/Cargo.lock b/Cargo.lock index c1ed1d98ce..6af68b111a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1691,9 +1691,9 @@ dependencies = [ [[package]] name = "flate2" -version = "1.0.33" +version = "1.0.34" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "324a1be68054ef05ad64b861cc9eaf1d623d2d8cb25b4bf2cb9cdd902b4bf253" +checksum = "a1b589b4dc103969ad3cf85c950899926ec64300a1a46d76c03a6072957036f0" dependencies = [ "crc32fast", "miniz_oxide", diff --git a/bin/client/src/l1/driver.rs b/bin/client/src/l1/driver.rs index 70d4db4c7a..30b462b2c5 100644 --- a/bin/client/src/l1/driver.rs +++ b/bin/client/src/l1/driver.rs @@ -16,8 +16,8 @@ use kona_derive::{ pipeline::{DerivationPipeline, Pipeline, PipelineBuilder, StepResult}, sources::EthereumDataSource, stages::{ - AttributesQueue, BatchQueue, ChannelBank, ChannelReader, FrameQueue, L1Retrieval, - L1Traversal, + AttributesQueue, BatchQueue, BatchStream, ChannelBank, ChannelReader, FrameQueue, + L1Retrieval, L1Traversal, }, traits::{BlobProvider, ChainProvider, L2ChainProvider, OriginProvider}, }; @@ -46,8 +46,10 @@ pub type OracleAttributesBuilder = /// An oracle-backed attributes queue for the derivation pipeline. pub type OracleAttributesQueue = AttributesQueue< BatchQueue< - ChannelReader< - ChannelBank>>>>, + BatchStream< + ChannelReader< + ChannelBank>>>>, + >, >, OracleL2ChainProvider, >, diff --git a/crates/derive/src/online/pipeline.rs b/crates/derive/src/online/pipeline.rs index 9213c01d48..4eb634400c 100644 --- a/crates/derive/src/online/pipeline.rs +++ b/crates/derive/src/online/pipeline.rs @@ -11,7 +11,8 @@ use op_alloy_protocol::BlockInfo; // Pipeline internal stages aren't re-exported at the module-level. use crate::stages::{ - AttributesQueue, BatchQueue, ChannelBank, ChannelReader, FrameQueue, L1Retrieval, L1Traversal, + AttributesQueue, BatchQueue, BatchStream, ChannelBank, ChannelReader, FrameQueue, L1Retrieval, + L1Traversal, }; /// An online derivation pipeline. @@ -32,7 +33,11 @@ pub type OnlineAttributesBuilder = /// An `online` attributes queue for the derivation pipeline. pub type OnlineAttributesQueue = AttributesQueue< BatchQueue< - ChannelReader>>>>, + BatchStream< + ChannelReader< + ChannelBank>>>, + >, + >, AlloyL2ChainProvider, >, OnlineAttributesBuilder, diff --git a/crates/derive/src/pipeline/builder.rs b/crates/derive/src/pipeline/builder.rs index c9293b6402..89e85fa50d 100644 --- a/crates/derive/src/pipeline/builder.rs +++ b/crates/derive/src/pipeline/builder.rs @@ -4,7 +4,8 @@ use super::{ AttributesBuilder, ChainProvider, DataAvailabilityProvider, DerivationPipeline, L2ChainProvider, }; use crate::stages::{ - AttributesQueue, BatchQueue, ChannelBank, ChannelReader, FrameQueue, L1Retrieval, L1Traversal, + AttributesQueue, BatchQueue, BatchStream, ChannelBank, ChannelReader, FrameQueue, L1Retrieval, + L1Traversal, }; use alloc::sync::Arc; use core::fmt::Debug; @@ -16,7 +17,8 @@ type L1RetrievalStage = L1Retrieval>; type FrameQueueStage = FrameQueue>; type ChannelBankStage = ChannelBank>; type ChannelReaderStage = ChannelReader>; -type BatchQueueStage = BatchQueue, T>; +type BatchStreamStage = BatchStream>; +type BatchQueueStage = BatchQueue, T>; type AttributesQueueStage = AttributesQueue, B>; /// The `PipelineBuilder` constructs a [DerivationPipeline] using a builder pattern. @@ -132,8 +134,9 @@ where let frame_queue = FrameQueue::new(l1_retrieval); let channel_bank = ChannelBank::new(Arc::clone(&rollup_config), frame_queue); let channel_reader = ChannelReader::new(channel_bank, Arc::clone(&rollup_config)); + let batch_stream = BatchStream::new(channel_reader, rollup_config.clone()); let batch_queue = - BatchQueue::new(rollup_config.clone(), channel_reader, l2_chain_provider.clone()); + BatchQueue::new(rollup_config.clone(), batch_stream, l2_chain_provider.clone()); let attributes = AttributesQueue::new(rollup_config.clone(), batch_queue, attributes_builder); diff --git a/crates/derive/src/stages/batch_queue.rs b/crates/derive/src/stages/batch_queue.rs index 6f80f0323c..26386c25a8 100644 --- a/crates/derive/src/stages/batch_queue.rs +++ b/crates/derive/src/stages/batch_queue.rs @@ -23,6 +23,10 @@ pub trait BatchQueueProvider { /// /// [ChannelReader]: crate::stages::ChannelReader async fn next_batch(&mut self) -> PipelineResult; + + /// Allows the [BatchQueue] to flush the buffer in the [crate::stages::BatchStream] + /// if an invalid single batch is found. Pre-holocene hardfork, this will be a no-op. + fn flush(&mut self); } /// [BatchQueue] is responsible for o rdering unordered batches @@ -146,6 +150,9 @@ where remaining.push(batch.clone()); } BatchValidity::Drop => { + // If we drop a batch, flush previous batches buffered in the BatchStream + // stage. + self.prev.flush(); warn!(target: "batch-queue", "Dropping batch with parent: {}", parent.block_info); continue; } @@ -233,6 +240,7 @@ where let data = BatchWithInclusionBlock { inclusion_block: origin, batch }; // If we drop the batch, validation logs the drop reason with WARN level. if data.check_batch(&self.cfg, &self.l1_blocks, parent, &mut self.fetcher).await.is_drop() { + self.prev.flush(); return Ok(()); } self.batches.push(data); diff --git a/crates/derive/src/stages/batch_stream.rs b/crates/derive/src/stages/batch_stream.rs index 419cb5f522..69e89b7fa3 100644 --- a/crates/derive/src/stages/batch_stream.rs +++ b/crates/derive/src/stages/batch_stream.rs @@ -67,6 +67,12 @@ impl

BatchQueueProvider for BatchStream

where P: BatchQueueProvider + OriginAdvancer + OriginProvider + ResettableStage + Send + Debug, { + fn flush(&mut self) { + if self.is_active().unwrap_or(false) { + self.buffer.clear(); + } + } + async fn next_batch(&mut self) -> PipelineResult { // If the stage is not active, "pass" the next batch // through this stage to the BatchQueue stage. diff --git a/crates/derive/src/stages/channel_reader.rs b/crates/derive/src/stages/channel_reader.rs index 310a8128d5..a357414a7f 100644 --- a/crates/derive/src/stages/channel_reader.rs +++ b/crates/derive/src/stages/channel_reader.rs @@ -99,6 +99,9 @@ impl

BatchQueueProvider for ChannelReader

where P: ChannelReaderProvider + OriginAdvancer + OriginProvider + ResettableStage + Send + Debug, { + fn flush(&mut self) { /* noop */ + } + async fn next_batch(&mut self) -> PipelineResult { crate::timer!(START, STAGE_ADVANCE_RESPONSE_TIME, &["channel_reader"], timer); if let Err(e) = self.set_batch_reader().await { diff --git a/crates/derive/src/stages/test_utils/batch_queue.rs b/crates/derive/src/stages/test_utils/batch_queue.rs index 036c97e3c1..e0bcc94508 100644 --- a/crates/derive/src/stages/test_utils/batch_queue.rs +++ b/crates/derive/src/stages/test_utils/batch_queue.rs @@ -35,6 +35,9 @@ impl OriginProvider for MockBatchQueueProvider { #[async_trait] impl BatchQueueProvider for MockBatchQueueProvider { + fn flush(&mut self) { /* noop */ + } + async fn next_batch(&mut self) -> PipelineResult { self.batches.pop().ok_or(PipelineError::Eof.temp())? }