From bc5ce8f081716646912904f680bb450e08326ffb Mon Sep 17 00:00:00 2001 From: refcell Date: Thu, 4 Apr 2024 11:14:14 -0400 Subject: [PATCH 1/5] feat(derive): batch queue testing --- crates/derive/src/stages/batch_queue.rs | 72 ++++++++++++------- crates/derive/src/stages/mod.rs | 5 +- .../src/stages/test_utils/channel_reader.rs | 28 ++++++++ crates/derive/src/stages/test_utils/mod.rs | 4 ++ crates/derive/src/traits/data_sources.rs | 6 ++ crates/derive/src/traits/mod.rs | 2 +- 6 files changed, 90 insertions(+), 27 deletions(-) create mode 100644 crates/derive/src/stages/test_utils/channel_reader.rs create mode 100644 crates/derive/src/stages/test_utils/mod.rs diff --git a/crates/derive/src/stages/batch_queue.rs b/crates/derive/src/stages/batch_queue.rs index d7fc4498b7..02872af547 100644 --- a/crates/derive/src/stages/batch_queue.rs +++ b/crates/derive/src/stages/batch_queue.rs @@ -1,21 +1,29 @@ //! This module contains the `BatchQueue` stage implementation. use crate::{ - stages::channel_reader::ChannelReader, traits::{ - ChainProvider, DataAvailabilityProvider, ResettableStage, SafeBlockFetcher, - TelemetryProvider, + OriginProvider, ResettableStage, SafeBlockFetcher, + TelemetryProvider, LogLevel, }, types::{ Batch, BatchValidity, BatchWithInclusionBlock, BlockInfo, L2BlockInfo, RollupConfig, SingleBatch, StageError, StageResult, SystemConfig, }, }; +use alloy_primitives::Bytes; use alloc::{boxed::Box, vec::Vec}; use anyhow::anyhow; use async_trait::async_trait; use core::fmt::Debug; +/// A [Batch] provider for the [BatchQueue] stage. +/// Concretely, this is the previous stage in the pipeline. +#[async_trait] +pub trait BatchQueueProvider { + /// Pulls out the next [Batch] from the available channel. + async fn next_batch(&mut self) -> StageResult; +} + /// [BatchQueue] is responsible for o rdering unordered batches /// and gnerating empty batches when the sequence window has passed. /// @@ -31,20 +39,22 @@ use core::fmt::Debug; /// It is internally responsible for making sure that batches with L1 inclusions block outside it's /// working range are not considered or pruned. #[derive(Debug)] -pub struct BatchQueue +pub struct BatchQueue where - DAP: DataAvailabilityProvider + Debug, - CP: ChainProvider + Debug, + P: BatchQueueProvider + OriginProvider + Debug, BF: SafeBlockFetcher + Debug, T: TelemetryProvider + Debug, { /// The rollup config. cfg: RollupConfig, /// The previous stage of the derivation pipeline. - prev: ChannelReader, + prev: P, /// The l1 block ref origin: Option, + /// Telemetry + telemetry: T, + /// A consecutive, time-centric window of L1 Blocks. /// Every L1 origin of unsafe L2 Blocks must be included in this list. /// If every L2 Block corresponding to a single L1 Block becomes safe, @@ -63,19 +73,30 @@ where fetcher: BF, } -impl BatchQueue +impl OriginProvider for BatchQueue where - DAP: DataAvailabilityProvider + Debug, - CP: ChainProvider + Debug, + P: BatchQueueProvider + OriginProvider + Debug, + BF: SafeBlockFetcher + Debug, + T: TelemetryProvider + Debug, +{ + fn origin(&self) -> Option<&BlockInfo> { + self.origin.as_ref() + } +} + +impl BatchQueue +where + P: BatchQueueProvider + OriginProvider + Debug, BF: SafeBlockFetcher + Debug, T: TelemetryProvider + Debug, { /// Creates a new [BatchQueue] stage. - pub fn new(cfg: RollupConfig, prev: ChannelReader, fetcher: BF) -> Self { + pub fn new(cfg: RollupConfig, prev: P, telemetry: T, fetcher: BF) -> Self { Self { cfg, prev, origin: None, + telemetry, l1_blocks: Vec::new(), batches: Vec::new(), next_spans: Vec::new(), @@ -83,11 +104,6 @@ where } } - /// Returns the L1 origin [BlockInfo]. - pub fn origin(&self) -> Option<&BlockInfo> { - self.prev.origin() - } - /// Pops the next batch from the current queued up span-batch cache. /// The parent is used to set the parent hash of the batch. /// The parent is verified when the batch is later validated. @@ -114,9 +130,12 @@ where // Parent block does not match the next batch. // Means the previously returned batch is invalid. // Drop cached batches and find another batch. + self.telemetry.write( + Bytes::from( + alloc::format!("Parent block does not match the next batch. Dropping {} cached batches.", self.next_spans.len())), + LogLevel::Warning, + ); self.next_spans.clear(); - // TODO: log that the provided parent block does not match the next batch. - // TODO: metrice the internal batch drop. } // If the epoch is advanced, update the l1 blocks. @@ -128,8 +147,7 @@ where for (i, block) in self.l1_blocks.iter().enumerate() { if parent.l1_origin.number == block.number { self.l1_blocks.drain(0..i); - // TODO: log that the pipelien has advanced the epoch. - // TODO: metrice the internal epoch advancement. + self.telemetry.write(Bytes::from("Adancing epoch"), LogLevel::Info); break; } } @@ -158,7 +176,12 @@ where // reset is called, the origin behind is false. self.l1_blocks.clear(); } - // TODO: log batch queue origin advancement. + self.telemetry.write( + Bytes::from( + alloc::format!("Batch queue advanced origin: {:?}", self.origin) + ), + LogLevel::Info, + ); } // Load more data into the batch queue. @@ -168,7 +191,7 @@ where if !origin_behind { self.add_batch(b, parent).ok(); } else { - // TODO: metrice when the batch is dropped because the origin is behind. + self.telemetry.write(Bytes::from("[Batch Dropped]: Origin is behind"), LogLevel::Warning); } } Err(StageError::Eof) => out_of_data = true, @@ -349,10 +372,9 @@ where } #[async_trait] -impl ResettableStage for BatchQueue +impl ResettableStage for BatchQueue where - DAP: DataAvailabilityProvider + Send + Debug, - CP: ChainProvider + Send + Debug, + P: BatchQueueProvider + OriginProvider + Send + Debug, BF: SafeBlockFetcher + Send + Debug, T: TelemetryProvider + Send + Debug, { diff --git a/crates/derive/src/stages/mod.rs b/crates/derive/src/stages/mod.rs index 1717c306c4..8e24d30289 100644 --- a/crates/derive/src/stages/mod.rs +++ b/crates/derive/src/stages/mod.rs @@ -28,7 +28,10 @@ mod channel_reader; pub use channel_reader::ChannelReader; mod batch_queue; -pub use batch_queue::BatchQueue; +pub use batch_queue::{BatchQueue, BatchQueueProvider}; mod engine_queue; mod payload_derivation; + +#[cfg(test)] +pub mod test_utils; diff --git a/crates/derive/src/stages/test_utils/channel_reader.rs b/crates/derive/src/stages/test_utils/channel_reader.rs new file mode 100644 index 0000000000..d348c575d5 --- /dev/null +++ b/crates/derive/src/stages/test_utils/channel_reader.rs @@ -0,0 +1,28 @@ +//! Test [ChannelReader] utilities and mock implementation. + +use alloc::vec::Vec; +use alloc::boxed::Box; +use async_trait::async_trait; +use crate::stages::BatchQueueProvider; +use crate::traits::OriginProvider; +use crate::types::{StageResult, BlockInfo, StageError, Batch}; + +/// A mock implementation of [ChannelReader] for testing purposes. +#[derive(Debug, Default)] +pub struct MockChannelReader { + /// The list of batches to return. + pub batches: Vec>, +} + +#[async_trait] +impl BatchQueueProvider for MockChannelReader { + async fn next_batch(&mut self) -> StageResult { + self.batches.pop().unwrap_or(Err(StageError::NotEnoughData)) + } +} + +impl OriginProvider for MockChannelReader { + fn origin(&self) -> Option<&BlockInfo> { + None + } +} diff --git a/crates/derive/src/stages/test_utils/mod.rs b/crates/derive/src/stages/test_utils/mod.rs new file mode 100644 index 0000000000..bd5d8801a7 --- /dev/null +++ b/crates/derive/src/stages/test_utils/mod.rs @@ -0,0 +1,4 @@ +//! Test utilities for stages. + +mod channel_reader; +pub use channel_reader::MockChannelReader; diff --git a/crates/derive/src/traits/data_sources.rs b/crates/derive/src/traits/data_sources.rs index 89ea08d5ec..682fa6c8c0 100644 --- a/crates/derive/src/traits/data_sources.rs +++ b/crates/derive/src/traits/data_sources.rs @@ -8,6 +8,12 @@ use anyhow::Result; use async_trait::async_trait; use core::fmt::Debug; +/// Provides a method for accessing the pipeline origin. +pub trait OriginProvider { + /// Returns the optional L1 [BlockInfo] origin. + fn origin(&self) -> Option<&BlockInfo>; +} + /// Describes the functionality of a data source that can provide information from the blockchain. #[async_trait] pub trait ChainProvider { diff --git a/crates/derive/src/traits/mod.rs b/crates/derive/src/traits/mod.rs index b2db574c5f..6700afb024 100644 --- a/crates/derive/src/traits/mod.rs +++ b/crates/derive/src/traits/mod.rs @@ -2,7 +2,7 @@ //! pipeline. mod data_sources; -pub use data_sources::{ChainProvider, DataAvailabilityProvider, DataIter, SafeBlockFetcher}; +pub use data_sources::*; mod stages; pub use stages::ResettableStage; From 650f930c02434d1242ed810c91d6e1955b301b11 Mon Sep 17 00:00:00 2001 From: refcell Date: Thu, 4 Apr 2024 11:32:50 -0400 Subject: [PATCH 2/5] fix(derive): lints --- crates/derive/src/stages/batch_queue.rs | 20 +++++++++---------- .../src/stages/test_utils/channel_reader.rs | 11 +++++----- crates/derive/src/stages/test_utils/mod.rs | 2 +- 3 files changed, 16 insertions(+), 17 deletions(-) diff --git a/crates/derive/src/stages/batch_queue.rs b/crates/derive/src/stages/batch_queue.rs index d7d02bef04..dbaaa27968 100644 --- a/crates/derive/src/stages/batch_queue.rs +++ b/crates/derive/src/stages/batch_queue.rs @@ -1,17 +1,14 @@ //! This module contains the `BatchQueue` stage implementation. use crate::{ - traits::{ - OriginProvider, ResettableStage, SafeBlockFetcher, - TelemetryProvider, LogLevel, - }, + traits::{LogLevel, OriginProvider, ResettableStage, SafeBlockFetcher, TelemetryProvider}, types::{ Batch, BatchValidity, BatchWithInclusionBlock, BlockInfo, L2BlockInfo, RollupConfig, SingleBatch, StageError, StageResult, SystemConfig, }, }; -use alloy_primitives::Bytes; use alloc::{boxed::Box, vec::Vec}; +use alloy_primitives::Bytes; use anyhow::anyhow; use async_trait::async_trait; use core::fmt::Debug; @@ -136,8 +133,10 @@ where // Means the previously returned batch is invalid. // Drop cached batches and find another batch. self.telemetry.write( - Bytes::from( - alloc::format!("Parent block does not match the next batch. Dropping {} cached batches.", self.next_spans.len())), + Bytes::from(alloc::format!( + "Parent block does not match the next batch. Dropping {} cached batches.", + self.next_spans.len() + )), LogLevel::Warning, ); self.next_spans.clear(); @@ -182,9 +181,7 @@ where self.l1_blocks.clear(); } self.telemetry.write( - Bytes::from( - alloc::format!("Batch queue advanced origin: {:?}", self.origin) - ), + Bytes::from(alloc::format!("Batch queue advanced origin: {:?}", self.origin)), LogLevel::Info, ); } @@ -196,7 +193,8 @@ where if !origin_behind { self.add_batch(b, parent).ok(); } else { - self.telemetry.write(Bytes::from("[Batch Dropped]: Origin is behind"), LogLevel::Warning); + self.telemetry + .write(Bytes::from("[Batch Dropped]: Origin is behind"), LogLevel::Warning); } } Err(StageError::Eof) => out_of_data = true, diff --git a/crates/derive/src/stages/test_utils/channel_reader.rs b/crates/derive/src/stages/test_utils/channel_reader.rs index d348c575d5..ec8335c9e7 100644 --- a/crates/derive/src/stages/test_utils/channel_reader.rs +++ b/crates/derive/src/stages/test_utils/channel_reader.rs @@ -1,11 +1,12 @@ //! Test [ChannelReader] utilities and mock implementation. -use alloc::vec::Vec; -use alloc::boxed::Box; +use crate::{ + stages::BatchQueueProvider, + traits::OriginProvider, + types::{Batch, BlockInfo, StageError, StageResult}, +}; +use alloc::{boxed::Box, vec::Vec}; use async_trait::async_trait; -use crate::stages::BatchQueueProvider; -use crate::traits::OriginProvider; -use crate::types::{StageResult, BlockInfo, StageError, Batch}; /// A mock implementation of [ChannelReader] for testing purposes. #[derive(Debug, Default)] diff --git a/crates/derive/src/stages/test_utils/mod.rs b/crates/derive/src/stages/test_utils/mod.rs index 1698056ebc..238666f57d 100644 --- a/crates/derive/src/stages/test_utils/mod.rs +++ b/crates/derive/src/stages/test_utils/mod.rs @@ -8,4 +8,4 @@ mod batch_queue; pub use batch_queue::{new_mock_batch_queue, MockBatchQueue}; mod attributes_queue; -pub use attributes_queue::MockAttributesBuilder; \ No newline at end of file +pub use attributes_queue::MockAttributesBuilder; From 2284d2f387f01e4ad7d05e4c188f7fc2effb7f17 Mon Sep 17 00:00:00 2001 From: refcell Date: Thu, 4 Apr 2024 12:03:55 -0400 Subject: [PATCH 3/5] fix(derive): merge conflicts --- crates/derive/src/stages/batch_queue.rs | 22 +++------------------- 1 file changed, 3 insertions(+), 19 deletions(-) diff --git a/crates/derive/src/stages/batch_queue.rs b/crates/derive/src/stages/batch_queue.rs index c89911a801..d176489f8a 100644 --- a/crates/derive/src/stages/batch_queue.rs +++ b/crates/derive/src/stages/batch_queue.rs @@ -1,11 +1,7 @@ //! This module contains the `BatchQueue` stage implementation. use crate::{ - stages::channel_reader::ChannelReader, - traits::{ - ChainProvider, DataAvailabilityProvider, OriginProvider, ResettableStage, SafeBlockFetcher, - TelemetryProvider, LogLevel, - }, + traits::{LogLevel, OriginProvider, ResettableStage, SafeBlockFetcher, TelemetryProvider}, types::{ Batch, BatchValidity, BatchWithInclusionBlock, BlockInfo, L2BlockInfo, RollupConfig, SingleBatch, StageError, StageResult, SystemConfig, @@ -74,17 +70,6 @@ where fetcher: BF, } -impl OriginProvider for BatchQueue -where - P: BatchQueueProvider + OriginProvider + Debug, - BF: SafeBlockFetcher + Debug, - T: TelemetryProvider + Debug, -{ - fn origin(&self) -> Option<&BlockInfo> { - self.origin.as_ref() - } -} - impl BatchQueue where P: BatchQueueProvider + OriginProvider + Debug, @@ -378,10 +363,9 @@ where } } -impl OriginProvider for BatchQueue +impl OriginProvider for BatchQueue where - DAP: DataAvailabilityProvider + Debug, - CP: ChainProvider + Debug, + P: BatchQueueProvider + OriginProvider + Debug, BF: SafeBlockFetcher + Debug, T: TelemetryProvider + Debug, { From 9417d2e43df594f0ceed92434730867dfebe7878 Mon Sep 17 00:00:00 2001 From: refcell Date: Fri, 5 Apr 2024 08:18:51 -0400 Subject: [PATCH 4/5] fix(derive): bad upstream sync --- crates/derive/src/stages/test_utils/mod.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/crates/derive/src/stages/test_utils/mod.rs b/crates/derive/src/stages/test_utils/mod.rs index 544a71810a..69516c25a3 100644 --- a/crates/derive/src/stages/test_utils/mod.rs +++ b/crates/derive/src/stages/test_utils/mod.rs @@ -1,9 +1,6 @@ //! Test utilities for the stages module primarily contains //! mock implementations of the various stages for testing. -mod channel_reader; -pub use channel_reader::MockChannelReader; - mod batch_queue; pub use batch_queue::MockBatchQueueProvider; From e902151c7a77e464e26f0185f0c6b5d739f2babb Mon Sep 17 00:00:00 2001 From: refcell Date: Fri, 5 Apr 2024 08:24:46 -0400 Subject: [PATCH 5/5] fix(derive): batch queue logging --- crates/derive/src/stages/batch_queue.rs | 54 +++++++++++++++++++------ 1 file changed, 41 insertions(+), 13 deletions(-) diff --git a/crates/derive/src/stages/batch_queue.rs b/crates/derive/src/stages/batch_queue.rs index 88cb929fcb..2b5c73ffdb 100644 --- a/crates/derive/src/stages/batch_queue.rs +++ b/crates/derive/src/stages/batch_queue.rs @@ -117,8 +117,10 @@ where // Get the epoch let epoch = self.l1_blocks[0]; - // TODO: log that the next batch is being derived. - // TODO: metrice the time it takes to derive the next batch. + self.telemetry.write( + Bytes::from(alloc::format!("Deriving next batch for epoch: {}", epoch.number)), + LogLevel::Info, + ); // Note: epoch origin can now be one block ahead of the L2 Safe Head // This is in the case where we auto generate all batches in an epoch & advance the epoch @@ -149,9 +151,14 @@ where remaining.push(batch.clone()); } BatchValidity::Drop => { - // TODO: Log the drop reason with WARN level. - // batch.log_context(self.log).warn("Dropping batch", "parent", parent.id(), - // "parent_time", parent.info.time); + self.telemetry.write( + Bytes::from(alloc::format!( + "Dropping batch: {:?}, parent: {}", + batch.batch, + parent.block_info + )), + LogLevel::Warning, + ); continue; } BatchValidity::Accept => { @@ -171,7 +178,10 @@ where self.batches = remaining; if let Some(nb) = next_batch { - // TODO: log that the next batch is found. + self.telemetry.write( + Bytes::from(alloc::format!("Next batch found: {:?}", nb.batch)), + LogLevel::Info, + ); return Ok(nb.batch); } @@ -182,8 +192,6 @@ where expiry_epoch < parent.l1_origin.number; let first_of_epoch = epoch.number == parent.l1_origin.number + 1; - // TODO: Log the empty batch generation. - // If the sequencer window did not expire, // there is still room to receive batches for the current epoch. // No need to force-create empty batch(es) towards the next epoch yet. @@ -191,6 +199,15 @@ where return Err(StageError::Eof); } + self.telemetry.write( + Bytes::from(alloc::format!( + "Generating empty batches. Epoch: {}, Parent: {}", + epoch.number, + parent.l1_origin.number + )), + LogLevel::Info, + ); + // The next L1 block is needed to proceed towards the next epoch. if self.l1_blocks.len() < 2 { return Err(StageError::Eof); @@ -202,7 +219,10 @@ where // to preserve that L2 time >= L1 time. If this is the first block of the epoch, always // generate a batch to ensure that we at least have one batch per epoch. if next_timestamp < next_epoch.timestamp || first_of_epoch { - // TODO: log next batch generation. + self.telemetry.write( + Bytes::from(alloc::format!("Generating empty batch for epoch: {}", epoch.number)), + LogLevel::Info, + ); return Ok(Batch::Single(SingleBatch { parent_hash: parent.block_info.hash, epoch_num: epoch.number, @@ -214,9 +234,15 @@ where // At this point we have auto generated every batch for the current epoch // that we can, so we can advance to the next epoch. - // TODO: log that the epoch is advanced. - // bq.log.Trace("Advancing internal L1 blocks", "next_timestamp", nextTimestamp, - // "next_epoch_time", nextEpoch.Time) + self.telemetry.write( + Bytes::from(alloc::format!( + "Advancing to next epoch: {}, timestamp: {}, epoch timestamp: {}", + next_epoch.number, + next_timestamp, + next_epoch.timestamp + )), + LogLevel::Info, + ); self.l1_blocks.remove(0); Err(StageError::Eof) } @@ -224,7 +250,8 @@ where /// Adds a batch to the queue. pub fn add_batch(&mut self, batch: Batch, parent: L2BlockInfo) -> StageResult<()> { if self.l1_blocks.is_empty() { - // TODO: log that the batch cannot be added without an origin + self.telemetry + .write(Bytes::from("Cannot add batch without an origin"), LogLevel::Error); panic!("Cannot add batch without an origin"); } let origin = self.origin.ok_or_else(|| anyhow!("cannot add batch with missing origin"))?; @@ -453,6 +480,7 @@ mod tests { // TODO(refcell): The batch reader here loops forever. // Maybe the cursor isn't being used? + // UPDATE: the batch data is not valid // #[tokio::test] // async fn test_next_batch_succeeds() { // let mut reader = new_batch_reader();