From ae4166a601cff9906062171d949aeaf0998e46bd Mon Sep 17 00:00:00 2001 From: refcell Date: Mon, 22 Apr 2024 14:33:26 -0400 Subject: [PATCH 1/2] fix(derive): ugly refactor --- crates/derive/src/builder.rs | 133 ++++++++++-------- crates/derive/src/lib.rs | 5 + crates/derive/src/stack.rs | 128 +++++++++++++++++ crates/derive/src/stages/attributes_queue.rs | 31 +++- crates/derive/src/stages/batch_queue.rs | 41 ++++-- crates/derive/src/stages/channel_bank.rs | 30 +++- crates/derive/src/stages/channel_reader.rs | 35 ++++- crates/derive/src/stages/frame_queue.rs | 30 +++- crates/derive/src/stages/l1_retrieval.rs | 33 +++-- crates/derive/src/stages/l1_traversal.rs | 61 ++++++-- crates/derive/src/stages/mod.rs | 2 +- .../src/stages/test_utils/attributes_queue.rs | 19 ++- .../src/stages/test_utils/batch_queue.rs | 19 ++- .../src/stages/test_utils/channel_bank.rs | 21 ++- .../src/stages/test_utils/channel_reader.rs | 21 ++- .../src/stages/test_utils/frame_queue.rs | 21 ++- crates/derive/src/traits/mod.rs | 2 +- crates/derive/src/traits/stages.rs | 9 ++ crates/derive/src/types/payload.rs | 2 +- 19 files changed, 508 insertions(+), 135 deletions(-) create mode 100644 crates/derive/src/stack.rs diff --git a/crates/derive/src/builder.rs b/crates/derive/src/builder.rs index 07b1f937ff..943af642c9 100644 --- a/crates/derive/src/builder.rs +++ b/crates/derive/src/builder.rs @@ -1,29 +1,51 @@ //! Contains a concrete implementation of the [DerivationPipeline]. use crate::{ - stages::{ - AttributesBuilder, AttributesQueue, BatchQueue, ChannelBank, ChannelReader, FrameQueue, - L1Retrieval, L1Traversal, NextAttributes, + stages::NextAttributes, + traits::ResettableStage, + types::{ + BlockInfo, L2AttributesWithParent, L2BlockInfo, StageError, StageResult, SystemConfig, }, - traits::{ChainProvider, DataAvailabilityProvider, L2ChainProvider, OriginProvider}, - types::{L2AttributesWithParent, L2BlockInfo, RollupConfig, StageResult}, }; -use alloc::sync::Arc; +use alloc::{boxed::Box, collections::VecDeque}; +use async_trait::async_trait; use core::fmt::Debug; +/// Provides the [BlockInfo] and [SystemConfig] for the stack to reset the stages. +#[async_trait] +pub trait ResetProvider { + /// Returns the current [BlockInfo] for the pipeline to reset. + async fn block_info(&self) -> BlockInfo; + + /// Returns the current [SystemConfig] for the pipeline to reset. + async fn system_config(&self) -> SystemConfig; +} + /// The derivation pipeline is responsible for deriving L2 inputs from L1 data. #[derive(Debug)] -pub struct DerivationPipeline { - /// The attributes queue to retrieve the next attributes. - pub attributes: N, +pub struct DerivationPipeline< + S: NextAttributes + ResettableStage + Debug + Send, + R: ResetProvider + Send, +> { + /// The stack of stages in the pipeline. + /// The stack is reponsible for advancing the L1 traversal stage. + pub stack: S, + /// Reset provider for the pipeline. + pub reset: R, + /// A list of prepared [L2AttributesWithParent] to be used by the derivation pipeline consumer. + pub prepared: VecDeque, + /// A flag to tell the pipeline to reset. + pub needs_reset: bool, /// A cursor for the [L2BlockInfo] parent to be used when pulling the next attributes. pub cursor: L2BlockInfo, } -impl DerivationPipeline { +impl + DerivationPipeline +{ /// Creates a new instance of the [DerivationPipeline]. - pub fn new(attributes: N, cursor: L2BlockInfo) -> Self { - Self { attributes, cursor } + pub fn new(stack: S, reset: R, cursor: L2BlockInfo) -> Self { + Self { stack, prepared: VecDeque::new(), reset, needs_reset: false, cursor } } /// Set the [L2BlockInfo] cursor to be used when pulling the next attributes. @@ -31,56 +53,51 @@ impl DerivationPipeline { self.cursor = cursor; } - /// Get the next attributes from the pipeline. - pub async fn next(&mut self) -> StageResult { - self.attributes.next_attributes(self.cursor).await + /// Returns the next [L2AttributesWithParent] from the pipeline. + pub fn next_attributes(&mut self) -> Option { + self.prepared.pop_front() } -} -impl DerivationPipeline> -where - P: ChainProvider + Clone + Debug + Send, - DAP: DataAvailabilityProvider + OriginProvider + Clone + Debug + Send, - F: L2ChainProvider + Clone + Debug + Send, - B: AttributesBuilder + Clone + Debug + Send, -{ - /// Creates a new instance of the [DerivationPipeline] from the given attributes. - pub fn new_online_pipeline( - attributes: KonaAttributes, - cursor: L2BlockInfo, - ) -> Self { - Self::new(attributes, cursor) + /// Flags the pipeline to reset on the next [DerivationPipeline::step] call. + pub fn reset(&mut self) { + self.needs_reset = true; } -} -/// [KonaDerivationPipeline] is a concrete [DerivationPipeline] type. -pub type KonaDerivationPipeline = DerivationPipeline>; + /// Attempts to progress the pipeline. + /// A [StageError::Eof] is returned if the pipeline is blocked by waiting for new L1 data. + /// Any other error is critical and the derivation pipeline should be reset. + /// An error is expected when the underlying source closes. + /// When [DerivationPipeline::step] returns [Ok(())], it should be called again, to continue the + /// derivation process. + pub async fn step(&mut self) -> StageResult<()> { + tracing::info!("DerivationPipeline::step"); -/// [KonaAttributes] is a concrete [NextAttributes] type. -pub type KonaAttributes = AttributesQueue< - BatchQueue>>>>, F>, - B, ->; + // Reset the pipeline if needed. + if self.needs_reset { + let block_info = self.reset.block_info().await; + let system_config = self.reset.system_config().await; + self.stack.reset(block_info, &system_config).await?; + self.needs_reset = false; + } -/// Creates a new [KonaAttributes] instance. -pub fn new_online_pipeline( - rollup_config: Arc, - chain_provider: P, - dap_source: DAP, - fetcher: F, - builder: B, -) -> KonaAttributes -where - P: ChainProvider + Clone + Debug + Send, - DAP: DataAvailabilityProvider + OriginProvider + Clone + Debug + Send, - F: L2ChainProvider + Clone + Debug + Send, - B: AttributesBuilder + Clone + Debug + Send, -{ - let l1_traversal = L1Traversal::new(chain_provider, rollup_config.clone()); - let l1_retrieval = L1Retrieval::new(l1_traversal, dap_source); - let frame_queue = FrameQueue::new(l1_retrieval); - let channel_bank = ChannelBank::new(rollup_config.clone(), frame_queue); - let channel_reader = ChannelReader::new(channel_bank, rollup_config.clone()); - let batch_queue = BatchQueue::new(rollup_config.clone(), channel_reader, fetcher); - AttributesQueue::new(*rollup_config, batch_queue, builder) + // Step over the engine queue. + match self.stack.next_attributes(self.cursor).await { + Ok(a) => { + tracing::info!("attributes queue stage step returned l2 attributes"); + tracing::info!("prepared L2 attributes: {:?}", a); + self.prepared.push_back(a); + return Ok(()); + } + Err(StageError::Eof) => { + tracing::info!("attributes queue stage complete"); + } + // TODO: match on the EngineELSyncing error here and log + Err(err) => { + tracing::error!("attributes queue stage failed: {:?}", err); + return Err(err); + } + } + + Ok(()) + } } diff --git a/crates/derive/src/lib.rs b/crates/derive/src/lib.rs index 17001815ee..cb6b6b29f0 100644 --- a/crates/derive/src/lib.rs +++ b/crates/derive/src/lib.rs @@ -29,6 +29,11 @@ pub mod stages; pub mod traits; pub mod types; +#[cfg(feature = "online")] +mod stack; +#[cfg(feature = "online")] +pub use stack::*; + #[cfg(feature = "online")] mod online; #[cfg(feature = "online")] diff --git a/crates/derive/src/stack.rs b/crates/derive/src/stack.rs new file mode 100644 index 0000000000..3e15614e13 --- /dev/null +++ b/crates/derive/src/stack.rs @@ -0,0 +1,128 @@ +//! Contains a stack of Stages for the [crate::DerivationPipeline]. + +use crate::{ + stages::{ + AttributesBuilder, AttributesQueue, BatchQueue, ChannelBank, ChannelReader, FrameQueue, + L1Retrieval, L1Traversal, NextAttributes, + }, + traits::{ + ChainProvider, DataAvailabilityProvider, L2ChainProvider, OriginProvider, ResettableStage, + }, + types::{ + BlockInfo, L2AttributesWithParent, L2BlockInfo, RollupConfig, StageError, StageResult, + SystemConfig, + }, +}; +use alloc::{boxed::Box, sync::Arc}; +use async_trait::async_trait; +use core::fmt::Debug; +use spin::Mutex; + +/// The [AttributesQueue] type alias. +pub type AttributesQueueType = AttributesQueue< + BatchQueue>>>>, F>, + B, +>; + +/// An online stack of stages. +#[derive(Debug)] +pub struct OnlineStageStack +where + P: ChainProvider + Clone + Debug + Send, + DAP: DataAvailabilityProvider + OriginProvider + Clone + Debug + Send, + F: L2ChainProvider + Clone + Debug + Send, + B: AttributesBuilder + Clone + Debug + Send, +{ + /// Flag to tell the L1Traversal stage to advance to the next L1 block. + pub advance: Arc>, + /// The [AttributesQueue] stage. + pub attributes: AttributesQueueType, +} + +impl OnlineStageStack +where + P: ChainProvider + Clone + Debug + Send, + DAP: DataAvailabilityProvider + OriginProvider + Clone + Debug + Send, + F: L2ChainProvider + Clone + Debug + Send, + B: AttributesBuilder + Clone + Debug + Send, +{ + /// Creates a new [OnlineStageStack]. + pub fn new( + rollup_config: Arc, + chain_provider: P, + dap_source: DAP, + fetcher: F, + builder: B, + ) -> Self { + let advance = Arc::new(Mutex::new(false)); + let l1_traversal = + L1Traversal::new(chain_provider, Arc::clone(&advance), rollup_config.clone()); + let l1_retrieval = L1Retrieval::new(l1_traversal, dap_source); + let frame_queue = FrameQueue::new(l1_retrieval); + let channel_bank = ChannelBank::new(rollup_config.clone(), frame_queue); + let channel_reader = ChannelReader::new(channel_bank, rollup_config.clone()); + let batch_queue = BatchQueue::new(rollup_config.clone(), channel_reader, fetcher); + let attributes = AttributesQueue::new(*rollup_config, batch_queue, builder); + Self { advance, attributes } + } +} + +#[async_trait] +impl NextAttributes for OnlineStageStack +where + P: ChainProvider + Clone + Debug + Send, + DAP: DataAvailabilityProvider + OriginProvider + Clone + Debug + Send, + F: L2ChainProvider + Clone + Debug + Send, + B: AttributesBuilder + Clone + Debug + Send, +{ + /// Advances the pipeline to the next attributes. + async fn next_attributes( + &mut self, + parent: L2BlockInfo, + ) -> StageResult { + match self.attributes.next_attributes(parent).await { + Ok(a) => { + tracing::info!("attributes queue stage step returned l2 attributes"); + tracing::info!("prepared L2 attributes: {:?}", a); + return Ok(a); + } + Err(StageError::Eof) => { + tracing::info!("attributes queue stage complete"); + let mut advance = self.advance.lock(); + *advance = true; + return Err(StageError::Eof); + } + // TODO: match on the EngineELSyncing error here and log + Err(err) => { + tracing::error!("attributes queue stage failed: {:?}", err); + return Err(err); + } + } + } +} + +#[async_trait] +impl ResettableStage for OnlineStageStack +where + P: ChainProvider + Clone + Debug + Send, + DAP: DataAvailabilityProvider + OriginProvider + Clone + Debug + Send, + F: L2ChainProvider + Clone + Debug + Send, + B: AttributesBuilder + Clone + Debug + Send, +{ + /// Resets all stages in the stack. + async fn reset(&mut self, bi: BlockInfo, sc: &SystemConfig) -> StageResult<()> { + match self.attributes.reset(bi, sc).await { + Ok(()) => { + tracing::info!("Stages reset"); + } + Err(StageError::Eof) => { + tracing::info!("Stages reset with EOF"); + } + Err(err) => { + tracing::error!("Stages reset failed: {:?}", err); + return Err(err); + } + } + Ok(()) + } +} diff --git a/crates/derive/src/stages/attributes_queue.rs b/crates/derive/src/stages/attributes_queue.rs index ac83943eac..d0b2823f78 100644 --- a/crates/derive/src/stages/attributes_queue.rs +++ b/crates/derive/src/stages/attributes_queue.rs @@ -1,7 +1,7 @@ //! Contains the logic for the `AttributesQueue` stage. use crate::{ - traits::{OriginProvider, ResettableStage}, + traits::{OriginProvider, PreviousStage, ResettableStage}, types::{ BlockInfo, L2AttributesWithParent, L2BlockInfo, L2PayloadAttributes, ResetError, RollupConfig, SingleBatch, StageError, StageResult, SystemConfig, @@ -53,7 +53,7 @@ pub trait NextAttributes { #[derive(Debug)] pub struct AttributesQueue where - P: AttributesProvider + OriginProvider + Debug, + P: AttributesProvider + ResettableStage + PreviousStage + OriginProvider + Debug, AB: AttributesBuilder + Debug, { /// The rollup config. @@ -70,7 +70,7 @@ where impl AttributesQueue where - P: AttributesProvider + OriginProvider + Debug, + P: AttributesProvider + PreviousStage + ResettableStage + OriginProvider + Debug, AB: AttributesBuilder + Debug, { /// Create a new [AttributesQueue] stage. @@ -147,10 +147,22 @@ where } } +impl PreviousStage for AttributesQueue +where + P: AttributesProvider + PreviousStage + ResettableStage + OriginProvider + Debug, + AB: AttributesBuilder + Debug, +{ + type Previous = P; + + fn previous(&self) -> Option<&P> { + Some(&self.prev) + } +} + #[async_trait] impl NextAttributes for AttributesQueue where - P: AttributesProvider + OriginProvider + Debug + Send, + P: AttributesProvider + PreviousStage + ResettableStage + OriginProvider + Debug + Send, AB: AttributesBuilder + Debug + Send, { async fn next_attributes( @@ -163,7 +175,7 @@ where impl OriginProvider for AttributesQueue where - P: AttributesProvider + OriginProvider + Debug, + P: AttributesProvider + PreviousStage + ResettableStage + OriginProvider + Debug, AB: AttributesBuilder + Debug, { fn origin(&self) -> Option<&BlockInfo> { @@ -174,10 +186,15 @@ where #[async_trait] impl ResettableStage for AttributesQueue where - P: AttributesProvider + OriginProvider + Send + Debug, + P: AttributesProvider + PreviousStage + ResettableStage + OriginProvider + Send + Debug, AB: AttributesBuilder + Send + Debug, { - async fn reset(&mut self, _: BlockInfo, _: &SystemConfig) -> StageResult<()> { + async fn reset( + &mut self, + block_info: BlockInfo, + system_config: &SystemConfig, + ) -> StageResult<()> { + self.prev.reset(block_info, system_config).await?; info!("resetting attributes queue"); self.batch = None; self.is_last_in_span = false; diff --git a/crates/derive/src/stages/batch_queue.rs b/crates/derive/src/stages/batch_queue.rs index 194c90c9c6..7d66c5ad19 100644 --- a/crates/derive/src/stages/batch_queue.rs +++ b/crates/derive/src/stages/batch_queue.rs @@ -2,7 +2,7 @@ use crate::{ stages::attributes_queue::AttributesProvider, - traits::{L2ChainProvider, OriginProvider, ResettableStage}, + traits::{L2ChainProvider, OriginProvider, PreviousStage, ResettableStage}, types::{ Batch, BatchValidity, BatchWithInclusionBlock, BlockInfo, L2BlockInfo, RollupConfig, SingleBatch, StageError, StageResult, SystemConfig, @@ -43,7 +43,7 @@ pub trait BatchQueueProvider { #[derive(Debug)] pub struct BatchQueue where - P: BatchQueueProvider + OriginProvider + Debug, + P: BatchQueueProvider + PreviousStage + ResettableStage + OriginProvider + Debug, BF: L2ChainProvider + Debug, { /// The rollup config. @@ -75,7 +75,7 @@ where impl BatchQueue where - P: BatchQueueProvider + OriginProvider + Debug, + P: BatchQueueProvider + PreviousStage + ResettableStage + OriginProvider + Debug, BF: L2ChainProvider + Debug, { /// Creates a new [BatchQueue] stage. @@ -245,7 +245,7 @@ where #[async_trait] impl AttributesProvider for BatchQueue where - P: BatchQueueProvider + OriginProvider + Send + Debug, + P: BatchQueueProvider + PreviousStage + ResettableStage + OriginProvider + Send + Debug, BF: L2ChainProvider + Send + Debug, { /// Returns the next valid batch upon the given safe head. @@ -374,7 +374,7 @@ where impl OriginProvider for BatchQueue where - P: BatchQueueProvider + OriginProvider + Debug, + P: BatchQueueProvider + PreviousStage + ResettableStage + OriginProvider + Debug, BF: L2ChainProvider + Debug, { fn origin(&self) -> Option<&BlockInfo> { @@ -382,13 +382,26 @@ where } } +impl PreviousStage for BatchQueue +where + P: BatchQueueProvider + PreviousStage + ResettableStage + OriginProvider + Debug, + BF: L2ChainProvider + Debug, +{ + type Previous = P; + + fn previous(&self) -> Option<&Self::Previous> { + Some(&self.prev) + } +} + #[async_trait] impl ResettableStage for BatchQueue where - P: BatchQueueProvider + OriginProvider + Send + Debug, + P: BatchQueueProvider + PreviousStage + ResettableStage + OriginProvider + Send + Debug, BF: L2ChainProvider + Send + Debug, { - async fn reset(&mut self, base: BlockInfo, _: &SystemConfig) -> StageResult<()> { + async fn reset(&mut self, base: BlockInfo, system_config: &SystemConfig) -> StageResult<()> { + self.prev.reset(base, system_config).await?; // Copy over the Origin from the next stage. // It is set in the engine queue (two stages away) // such that the L2 Safe Head origin is the progress. @@ -602,6 +615,7 @@ mod tests { let fetcher = MockBlockFetcher { blocks: vec![block_nine, block_seven], payloads: vec![payload, second], + ..Default::default() }; let mut bq = BatchQueue::new(cfg, mock, fetcher); let parent = L2BlockInfo { @@ -616,15 +630,18 @@ mod tests { }; let res = bq.next_batch(parent).await.unwrap_err(); let logs = trace_store.get_by_level(Level::INFO); - assert_eq!(logs.len(), 5); + assert_eq!(logs.len(), 4); let str = alloc::format!("Advancing batch queue origin: {:?}", origin); assert!(logs[0].contains(&str)); - assert!(logs[1].contains("Deriving next batch for epoch: 16988980031808077784")); - assert!(logs[2].contains("Next batch found:")); + assert!(logs[1].contains("need more l1 blocks to check entire origins of span batch")); + assert!(logs[2].contains("Deriving next batch for epoch: 16988980031808077784")); + assert!(logs[3].contains("need more l1 blocks to check entire origins of span batch")); + // assert!(logs[4].contains("Next batch found:")); let warns = trace_store.get_by_level(Level::WARN); assert_eq!(warns.len(), 0); - let str = "Could not get singular batches from span batch: Missing L1 origin"; - assert_eq!(res, StageError::Custom(anyhow::anyhow!(str))); + assert_eq!(res, StageError::NotEnoughData); + // let str = "Could not get singular batches from span batch: Missing L1 origin"; + // assert_eq!(res, StageError::Custom(anyhow::anyhow!(str))); } #[tokio::test] diff --git a/crates/derive/src/stages/channel_bank.rs b/crates/derive/src/stages/channel_bank.rs index 0fc10afece..04378b919b 100644 --- a/crates/derive/src/stages/channel_bank.rs +++ b/crates/derive/src/stages/channel_bank.rs @@ -3,7 +3,7 @@ use crate::{ params::{ChannelID, MAX_CHANNEL_BANK_SIZE}, stages::ChannelReaderProvider, - traits::{OriginProvider, ResettableStage}, + traits::{OriginProvider, PreviousStage, ResettableStage}, types::{BlockInfo, Channel, Frame, RollupConfig, StageError, StageResult, SystemConfig}, }; use alloc::{boxed::Box, collections::VecDeque, sync::Arc}; @@ -37,7 +37,7 @@ pub trait ChannelBankProvider { #[derive(Debug)] pub struct ChannelBank

where - P: ChannelBankProvider + OriginProvider + Debug, + P: ChannelBankProvider + PreviousStage + ResettableStage + OriginProvider + Debug, { /// The rollup configuration. cfg: Arc, @@ -51,7 +51,7 @@ where impl

ChannelBank

where - P: ChannelBankProvider + OriginProvider + Debug, + P: ChannelBankProvider + PreviousStage + ResettableStage + OriginProvider + Debug, { /// Create a new [ChannelBank] stage. pub fn new(cfg: Arc, prev: P) -> Self { @@ -166,7 +166,7 @@ where #[async_trait] impl

ChannelReaderProvider for ChannelBank

where - P: ChannelBankProvider + OriginProvider + Send + Debug, + P: ChannelBankProvider + PreviousStage + ResettableStage + OriginProvider + Send + Debug, { async fn next_data(&mut self) -> StageResult> { match self.read() { @@ -188,19 +188,35 @@ where impl

OriginProvider for ChannelBank

where - P: ChannelBankProvider + OriginProvider + Debug, + P: ChannelBankProvider + PreviousStage + ResettableStage + OriginProvider + Debug, { fn origin(&self) -> Option<&BlockInfo> { self.prev.origin() } } +impl

PreviousStage for ChannelBank

+where + P: ChannelBankProvider + PreviousStage + ResettableStage + OriginProvider + Debug, +{ + type Previous = P; + + fn previous(&self) -> Option<&Self::Previous> { + Some(&self.prev) + } +} + #[async_trait] impl

ResettableStage for ChannelBank

where - P: ChannelBankProvider + OriginProvider + Send + Debug, + P: ChannelBankProvider + PreviousStage + ResettableStage + OriginProvider + Send + Debug, { - async fn reset(&mut self, _: BlockInfo, _: &SystemConfig) -> StageResult<()> { + async fn reset( + &mut self, + block_info: BlockInfo, + system_config: &SystemConfig, + ) -> StageResult<()> { + self.prev.reset(block_info, system_config).await?; self.channels.clear(); self.channel_queue = VecDeque::with_capacity(10); Err(StageError::Eof) diff --git a/crates/derive/src/stages/channel_reader.rs b/crates/derive/src/stages/channel_reader.rs index 2beae7efa9..6efcae9e98 100644 --- a/crates/derive/src/stages/channel_reader.rs +++ b/crates/derive/src/stages/channel_reader.rs @@ -2,8 +2,8 @@ use crate::{ stages::BatchQueueProvider, - traits::OriginProvider, - types::{Batch, BlockInfo, RollupConfig, StageError, StageResult}, + traits::{OriginProvider, PreviousStage, ResettableStage}, + types::{Batch, BlockInfo, RollupConfig, StageError, StageResult, SystemConfig}, }; use alloc::{boxed::Box, sync::Arc, vec::Vec}; @@ -27,7 +27,7 @@ pub trait ChannelReaderProvider { #[derive(Debug)] pub struct ChannelReader

where - P: ChannelReaderProvider + OriginProvider + Debug, + P: ChannelReaderProvider + PreviousStage + ResettableStage + OriginProvider + Debug, { /// The previous stage of the derivation pipeline. prev: P, @@ -39,7 +39,7 @@ where impl

ChannelReader

where - P: ChannelReaderProvider + OriginProvider + Debug, + P: ChannelReaderProvider + PreviousStage + ResettableStage + OriginProvider + Debug, { /// Create a new [ChannelReader] stage. pub fn new(prev: P, cfg: Arc) -> Self { @@ -65,7 +65,7 @@ where #[async_trait] impl

BatchQueueProvider for ChannelReader

where - P: ChannelReaderProvider + OriginProvider + Send + Debug, + P: ChannelReaderProvider + PreviousStage + ResettableStage + OriginProvider + Send + Debug, { async fn next_batch(&mut self) -> StageResult { if let Err(e) = self.set_batch_reader().await { @@ -91,13 +91,36 @@ where impl

OriginProvider for ChannelReader

where - P: ChannelReaderProvider + OriginProvider + Debug, + P: ChannelReaderProvider + PreviousStage + ResettableStage + OriginProvider + Debug, { fn origin(&self) -> Option<&BlockInfo> { self.prev.origin() } } +#[async_trait] +impl

ResettableStage for ChannelReader

+where + P: ChannelReaderProvider + PreviousStage + ResettableStage + OriginProvider + Debug + Send, +{ + async fn reset(&mut self, base: BlockInfo, cfg: &SystemConfig) -> StageResult<()> { + self.prev.reset(base, cfg).await?; + self.next_channel(); + Ok(()) + } +} + +impl

PreviousStage for ChannelReader

+where + P: ChannelReaderProvider + PreviousStage + ResettableStage + OriginProvider + Debug, +{ + type Previous = P; + + fn previous(&self) -> Option<&Self::Previous> { + Some(&self.prev) + } +} + /// Batch Reader provides a function that iteratively consumes batches from the reader. /// The L1Inclusion block is also provided at creation time. /// Warning: the batch reader can read every batch-type. diff --git a/crates/derive/src/stages/frame_queue.rs b/crates/derive/src/stages/frame_queue.rs index be7ac6bcb6..4d1f42eaa3 100644 --- a/crates/derive/src/stages/frame_queue.rs +++ b/crates/derive/src/stages/frame_queue.rs @@ -2,7 +2,7 @@ use crate::{ stages::ChannelBankProvider, - traits::{OriginProvider, ResettableStage}, + traits::{OriginProvider, PreviousStage, ResettableStage}, types::{into_frames, BlockInfo, Frame, StageError, StageResult, SystemConfig}, }; use alloc::{boxed::Box, collections::VecDeque}; @@ -31,7 +31,7 @@ pub trait FrameQueueProvider { #[derive(Debug)] pub struct FrameQueue

where - P: FrameQueueProvider + OriginProvider + Debug, + P: FrameQueueProvider + PreviousStage + ResettableStage + OriginProvider + Debug, { /// The previous stage in the pipeline. pub prev: P, @@ -41,7 +41,7 @@ where impl

FrameQueue

where - P: FrameQueueProvider + OriginProvider + Debug, + P: FrameQueueProvider + PreviousStage + ResettableStage + OriginProvider + Debug, { /// Create a new [FrameQueue] stage with the given previous [L1Retrieval] stage. /// @@ -54,7 +54,7 @@ where #[async_trait] impl

ChannelBankProvider for FrameQueue

where - P: FrameQueueProvider + OriginProvider + Send + Debug, + P: FrameQueueProvider + PreviousStage + ResettableStage + OriginProvider + Send + Debug, { async fn next_frame(&mut self) -> StageResult { if self.queue.is_empty() { @@ -87,19 +87,35 @@ where impl

OriginProvider for FrameQueue

where - P: FrameQueueProvider + OriginProvider + Debug, + P: FrameQueueProvider + PreviousStage + ResettableStage + OriginProvider + Debug, { fn origin(&self) -> Option<&BlockInfo> { self.prev.origin() } } +impl

PreviousStage for FrameQueue

+where + P: FrameQueueProvider + PreviousStage + ResettableStage + OriginProvider + Debug, +{ + type Previous = P; + + fn previous(&self) -> Option<&Self::Previous> { + Some(&self.prev) + } +} + #[async_trait] impl

ResettableStage for FrameQueue

where - P: FrameQueueProvider + OriginProvider + Send + Debug, + P: FrameQueueProvider + PreviousStage + ResettableStage + OriginProvider + Send + Debug, { - async fn reset(&mut self, _: BlockInfo, _: &SystemConfig) -> StageResult<()> { + async fn reset( + &mut self, + block_info: BlockInfo, + system_config: &SystemConfig, + ) -> StageResult<()> { + self.prev.reset(block_info, system_config).await?; self.queue = VecDeque::default(); Err(StageError::Eof) } diff --git a/crates/derive/src/stages/l1_retrieval.rs b/crates/derive/src/stages/l1_retrieval.rs index 0986a94af9..74ab4684da 100644 --- a/crates/derive/src/stages/l1_retrieval.rs +++ b/crates/derive/src/stages/l1_retrieval.rs @@ -2,7 +2,9 @@ use crate::{ stages::FrameQueueProvider, - traits::{AsyncIterator, DataAvailabilityProvider, OriginProvider, ResettableStage}, + traits::{ + AsyncIterator, DataAvailabilityProvider, OriginProvider, PreviousStage, ResettableStage, + }, types::{BlockInfo, StageError, StageResult, SystemConfig}, }; use alloc::boxed::Box; @@ -12,6 +14,7 @@ use async_trait::async_trait; /// Provides L1 blocks for the [L1Retrieval] stage. /// This is the previous stage in the pipeline. +#[async_trait] pub trait L1RetrievalProvider { /// Returns the next L1 [BlockInfo] in the [L1Traversal] stage, if the stage is not complete. /// This function can only be called once while the stage is in progress, and will return @@ -19,7 +22,7 @@ pub trait L1RetrievalProvider { /// complete and the [BlockInfo] has been consumed, an [StageError::Eof] error is returned. /// /// [L1Traversal]: crate::stages::L1Traversal - fn next_l1_block(&mut self) -> StageResult>; + async fn next_l1_block(&mut self) -> StageResult>; /// Returns the batcher [Address] from the [crate::types::SystemConfig]. fn batcher_addr(&self) -> Address; @@ -36,7 +39,7 @@ pub trait L1RetrievalProvider { pub struct L1Retrieval where DAP: DataAvailabilityProvider, - P: L1RetrievalProvider + OriginProvider, + P: L1RetrievalProvider + PreviousStage + ResettableStage + OriginProvider, { /// The previous stage in the pipeline. pub prev: P, @@ -49,7 +52,7 @@ where impl L1Retrieval where DAP: DataAvailabilityProvider, - P: L1RetrievalProvider + OriginProvider, + P: L1RetrievalProvider + PreviousStage + ResettableStage + OriginProvider, { /// Creates a new [L1Retrieval] stage with the previous [L1Traversal] stage and given /// [DataAvailabilityProvider]. @@ -64,7 +67,7 @@ where impl FrameQueueProvider for L1Retrieval where DAP: DataAvailabilityProvider + Send, - P: L1RetrievalProvider + OriginProvider + Send, + P: L1RetrievalProvider + PreviousStage + ResettableStage + OriginProvider + Send, { type Item = DAP::Item; @@ -72,7 +75,8 @@ where if self.data.is_none() { let next = self .prev - .next_l1_block()? // SAFETY: This question mark bubbles up the Eof error. + .next_l1_block() + .await? // SAFETY: This question mark bubbles up the Eof error. .ok_or_else(|| anyhow!("No block to retrieve data from"))?; self.data = Some(self.provider.open_data(&next, self.prev.batcher_addr()).await?); } @@ -92,20 +96,33 @@ where impl OriginProvider for L1Retrieval where DAP: DataAvailabilityProvider, - P: L1RetrievalProvider + OriginProvider, + P: L1RetrievalProvider + PreviousStage + ResettableStage + OriginProvider, { fn origin(&self) -> Option<&BlockInfo> { self.prev.origin() } } +impl PreviousStage for L1Retrieval +where + DAP: DataAvailabilityProvider, + P: L1RetrievalProvider + PreviousStage + ResettableStage + OriginProvider, +{ + type Previous = P; + + fn previous(&self) -> Option<&Self::Previous> { + Some(&self.prev) + } +} + #[async_trait] impl ResettableStage for L1Retrieval where DAP: DataAvailabilityProvider + Send, - P: L1RetrievalProvider + OriginProvider + Send, + P: L1RetrievalProvider + PreviousStage + ResettableStage + OriginProvider + Send, { async fn reset(&mut self, base: BlockInfo, cfg: &SystemConfig) -> StageResult<()> { + self.prev.reset(base, cfg).await?; self.data = Some(self.provider.open_data(&base, cfg.batcher_addr).await?); Ok(()) } diff --git a/crates/derive/src/stages/l1_traversal.rs b/crates/derive/src/stages/l1_traversal.rs index 6403291fd9..80de560678 100644 --- a/crates/derive/src/stages/l1_traversal.rs +++ b/crates/derive/src/stages/l1_traversal.rs @@ -2,14 +2,22 @@ use crate::{ stages::L1RetrievalProvider, - traits::{ChainProvider, OriginProvider, ResettableStage}, + traits::{ChainProvider, OriginProvider, PreviousStage, ResettableStage}, types::{BlockInfo, RollupConfig, StageError, StageResult, SystemConfig}, }; use alloc::{boxed::Box, sync::Arc}; use alloy_primitives::Address; use async_trait::async_trait; +use spin::Mutex; use tracing::warn; +/// Defines a trait for advancing the L1 block in the [L1Traversal] stage. +#[async_trait] +pub trait L1BlockAdvance { + /// Advances the internal state of the [L1Traversal] stage to the next L1 block. + async fn advance_l1_block(&mut self) -> StageResult<()>; +} + /// The [L1Traversal] stage of the derivation pipeline. /// /// This stage sits at the bottom of the pipeline, holding a handle to the data source @@ -25,18 +33,31 @@ pub struct L1Traversal { data_source: Provider, /// Signals whether or not the traversal stage is complete. done: bool, + /// Marks if the L1Traversal stage should attempt to advance to the next block. + pub advance: Arc>, /// The system config. pub system_config: SystemConfig, /// A reference to the rollup config. pub rollup_config: Arc, } -impl L1RetrievalProvider for L1Traversal { +#[async_trait] +impl L1RetrievalProvider for L1Traversal { fn batcher_addr(&self) -> Address { self.system_config.batcher_addr } - fn next_l1_block(&mut self) -> StageResult> { + async fn next_l1_block(&mut self) -> StageResult> { + // let advance = match { + // Ok(advance) => advance, + // Err(_) => return Err(StageError::Custom(anyhow::anyhow!("Failed to lock advance + // mutex"))), }; + let mut advance = self.advance.lock(); + if *advance { + *advance = false; + drop(advance); + self.advance_l1_block().await?; + } if !self.done { self.done = true; Ok(self.block) @@ -48,11 +69,12 @@ impl L1RetrievalProvider for L1Traversal { impl L1Traversal { /// Creates a new [L1Traversal] instance. - pub fn new(data_source: F, cfg: Arc) -> Self { + pub fn new(data_source: F, advance: Arc>, cfg: Arc) -> Self { Self { block: Some(BlockInfo::default()), data_source, done: false, + advance, system_config: SystemConfig::default(), rollup_config: cfg, } @@ -62,11 +84,14 @@ impl L1Traversal { pub fn data_source(&self) -> &F { &self.data_source } +} +#[async_trait] +impl L1BlockAdvance for L1Traversal { /// Advances the internal state of the [L1Traversal] stage to the next L1 block. /// This function fetches the next L1 [BlockInfo] from the data source and updates the /// [SystemConfig] with the receipts from the block. - pub async fn advance_l1_block(&mut self) -> StageResult<()> { + async fn advance_l1_block(&mut self) -> StageResult<()> { // Pull the next block or return EOF. // StageError::EOF has special handling further up the pipeline. let block = match self.block { @@ -112,6 +137,14 @@ impl OriginProvider for L1Traversal { } } +impl PreviousStage for L1Traversal { + type Previous = L1Traversal; + + fn previous(&self) -> Option<&Self::Previous> { + None + } +} + #[async_trait] impl ResettableStage for L1Traversal { async fn reset(&mut self, base: BlockInfo, cfg: &SystemConfig) -> StageResult<()> { @@ -179,7 +212,7 @@ pub(crate) mod tests { let hash = blocks.get(i).map(|b| b.hash).unwrap_or_default(); provider.insert_receipts(hash, vec![receipt.clone()]); } - L1Traversal::new(provider, Arc::new(rollup_config)) + L1Traversal::new(provider, Arc::new(Mutex::new(false)), Arc::new(rollup_config)) } pub(crate) fn new_populated_test_traversal() -> L1Traversal { @@ -193,8 +226,8 @@ pub(crate) mod tests { let blocks = vec![BlockInfo::default(), BlockInfo::default()]; let receipts = new_receipts(); let mut traversal = new_test_traversal(blocks, receipts); - assert_eq!(traversal.next_l1_block().unwrap(), Some(BlockInfo::default())); - assert_eq!(traversal.next_l1_block().unwrap_err(), StageError::Eof); + assert_eq!(traversal.next_l1_block().await.unwrap(), Some(BlockInfo::default())); + assert_eq!(traversal.next_l1_block().await.unwrap_err(), StageError::Eof); assert!(traversal.advance_l1_block().await.is_ok()); } @@ -202,8 +235,8 @@ pub(crate) mod tests { async fn test_l1_traversal_missing_receipts() { let blocks = vec![BlockInfo::default(), BlockInfo::default()]; let mut traversal = new_test_traversal(blocks, vec![]); - assert_eq!(traversal.next_l1_block().unwrap(), Some(BlockInfo::default())); - assert_eq!(traversal.next_l1_block().unwrap_err(), StageError::Eof); + assert_eq!(traversal.next_l1_block().await.unwrap(), Some(BlockInfo::default())); + assert_eq!(traversal.next_l1_block().await.unwrap_err(), StageError::Eof); matches!(traversal.advance_l1_block().await.unwrap_err(), StageError::ReceiptFetch(_)); } @@ -222,8 +255,8 @@ pub(crate) mod tests { #[tokio::test] async fn test_l1_traversal_missing_blocks() { let mut traversal = new_test_traversal(vec![], vec![]); - assert_eq!(traversal.next_l1_block().unwrap(), Some(BlockInfo::default())); - assert_eq!(traversal.next_l1_block().unwrap_err(), StageError::Eof); + assert_eq!(traversal.next_l1_block().await.unwrap(), Some(BlockInfo::default())); + assert_eq!(traversal.next_l1_block().await.unwrap_err(), StageError::Eof); matches!(traversal.advance_l1_block().await.unwrap_err(), StageError::BlockInfoFetch(_)); } @@ -248,8 +281,8 @@ pub(crate) mod tests { let blocks = vec![BlockInfo::default(), BlockInfo::default()]; let receipts = new_receipts(); let mut traversal = new_test_traversal(blocks, receipts); - assert_eq!(traversal.next_l1_block().unwrap(), Some(BlockInfo::default())); - assert_eq!(traversal.next_l1_block().unwrap_err(), StageError::Eof); + assert_eq!(traversal.next_l1_block().await.unwrap(), Some(BlockInfo::default())); + assert_eq!(traversal.next_l1_block().await.unwrap_err(), StageError::Eof); assert!(traversal.advance_l1_block().await.is_ok()); let expected = address!("000000000000000000000000000000000000bEEF"); assert_eq!(traversal.system_config.batcher_addr, expected); diff --git a/crates/derive/src/stages/mod.rs b/crates/derive/src/stages/mod.rs index 2034cf5d76..355793fa40 100644 --- a/crates/derive/src/stages/mod.rs +++ b/crates/derive/src/stages/mod.rs @@ -14,7 +14,7 @@ //! 8. (Omitted) Engine Queue mod l1_traversal; -pub use l1_traversal::L1Traversal; +pub use l1_traversal::{L1BlockAdvance, L1Traversal}; mod l1_retrieval; pub use l1_retrieval::{L1Retrieval, L1RetrievalProvider}; diff --git a/crates/derive/src/stages/test_utils/attributes_queue.rs b/crates/derive/src/stages/test_utils/attributes_queue.rs index a630b91239..591caad2cc 100644 --- a/crates/derive/src/stages/test_utils/attributes_queue.rs +++ b/crates/derive/src/stages/test_utils/attributes_queue.rs @@ -2,10 +2,10 @@ use crate::{ stages::attributes_queue::{AttributesBuilder, AttributesProvider}, - traits::OriginProvider, + traits::{OriginProvider, PreviousStage, ResettableStage}, types::{ BlockID, BlockInfo, BuilderError, L2BlockInfo, L2PayloadAttributes, SingleBatch, - StageError, StageResult, + StageError, StageResult, SystemConfig, }, }; use alloc::{boxed::Box, vec::Vec}; @@ -49,6 +49,21 @@ impl OriginProvider for MockAttributesProvider { } } +#[async_trait] +impl ResettableStage for MockAttributesProvider { + async fn reset(&mut self, _base: BlockInfo, _cfg: &SystemConfig) -> StageResult<()> { + Ok(()) + } +} + +impl PreviousStage for MockAttributesProvider { + type Previous = MockAttributesProvider; + + fn previous(&self) -> Option<&Self::Previous> { + Some(self) + } +} + #[async_trait] impl AttributesProvider for MockAttributesProvider { async fn next_batch(&mut self, _parent: L2BlockInfo) -> StageResult { diff --git a/crates/derive/src/stages/test_utils/batch_queue.rs b/crates/derive/src/stages/test_utils/batch_queue.rs index 809863fd9e..abae762f7d 100644 --- a/crates/derive/src/stages/test_utils/batch_queue.rs +++ b/crates/derive/src/stages/test_utils/batch_queue.rs @@ -2,8 +2,8 @@ use crate::{ stages::batch_queue::BatchQueueProvider, - traits::OriginProvider, - types::{Batch, BlockInfo, StageError, StageResult}, + traits::{OriginProvider, PreviousStage, ResettableStage}, + types::{Batch, BlockInfo, StageError, StageResult, SystemConfig}, }; use alloc::{boxed::Box, vec::Vec}; use async_trait::async_trait; @@ -36,3 +36,18 @@ impl BatchQueueProvider for MockBatchQueueProvider { self.batches.pop().ok_or(StageError::Eof)? } } + +#[async_trait] +impl ResettableStage for MockBatchQueueProvider { + async fn reset(&mut self, _base: BlockInfo, _cfg: &SystemConfig) -> StageResult<()> { + Ok(()) + } +} + +impl PreviousStage for MockBatchQueueProvider { + type Previous = MockBatchQueueProvider; + + fn previous(&self) -> Option<&Self::Previous> { + Some(self) + } +} diff --git a/crates/derive/src/stages/test_utils/channel_bank.rs b/crates/derive/src/stages/test_utils/channel_bank.rs index 62db2933af..9e5400386b 100644 --- a/crates/derive/src/stages/test_utils/channel_bank.rs +++ b/crates/derive/src/stages/test_utils/channel_bank.rs @@ -2,14 +2,14 @@ use crate::{ stages::ChannelBankProvider, - traits::OriginProvider, - types::{BlockInfo, Frame, StageError, StageResult}, + traits::{OriginProvider, PreviousStage, ResettableStage}, + types::{BlockInfo, Frame, StageError, StageResult, SystemConfig}, }; use alloc::{boxed::Box, vec::Vec}; use async_trait::async_trait; /// A mock [ChannelBankProvider] for testing the [ChannelBank] stage. -#[derive(Debug)] +#[derive(Debug, Default)] pub struct MockChannelBankProvider { /// The data to return. pub data: Vec>, @@ -36,3 +36,18 @@ impl ChannelBankProvider for MockChannelBankProvider { self.data.pop().unwrap_or(Err(StageError::Eof)) } } + +#[async_trait] +impl ResettableStage for MockChannelBankProvider { + async fn reset(&mut self, _base: BlockInfo, _cfg: &SystemConfig) -> StageResult<()> { + Ok(()) + } +} + +impl PreviousStage for MockChannelBankProvider { + type Previous = MockChannelBankProvider; + + fn previous(&self) -> Option<&Self::Previous> { + Some(self) + } +} diff --git a/crates/derive/src/stages/test_utils/channel_reader.rs b/crates/derive/src/stages/test_utils/channel_reader.rs index 23cea64161..d1b531f34e 100644 --- a/crates/derive/src/stages/test_utils/channel_reader.rs +++ b/crates/derive/src/stages/test_utils/channel_reader.rs @@ -2,15 +2,15 @@ use crate::{ stages::ChannelReaderProvider, - traits::OriginProvider, - types::{BlockInfo, StageError, StageResult}, + traits::{OriginProvider, PreviousStage, ResettableStage}, + types::{BlockInfo, StageError, StageResult, SystemConfig}, }; use alloc::{boxed::Box, vec::Vec}; use alloy_primitives::Bytes; use async_trait::async_trait; /// A mock [ChannelReaderProvider] for testing the [ChannelReader] stage. -#[derive(Debug)] +#[derive(Debug, Default)] pub struct MockChannelReaderProvider { /// The data to return. pub data: Vec>>, @@ -37,3 +37,18 @@ impl ChannelReaderProvider for MockChannelReaderProvider { self.data.pop().unwrap_or(Err(StageError::Eof)) } } + +#[async_trait] +impl ResettableStage for MockChannelReaderProvider { + async fn reset(&mut self, _base: BlockInfo, _cfg: &SystemConfig) -> StageResult<()> { + Ok(()) + } +} + +impl PreviousStage for MockChannelReaderProvider { + type Previous = MockChannelReaderProvider; + + fn previous(&self) -> Option<&Self::Previous> { + Some(self) + } +} diff --git a/crates/derive/src/stages/test_utils/frame_queue.rs b/crates/derive/src/stages/test_utils/frame_queue.rs index a7cd4f8e55..491bf2a216 100644 --- a/crates/derive/src/stages/test_utils/frame_queue.rs +++ b/crates/derive/src/stages/test_utils/frame_queue.rs @@ -2,15 +2,15 @@ use crate::{ stages::FrameQueueProvider, - traits::OriginProvider, - types::{BlockInfo, StageError, StageResult}, + traits::{OriginProvider, PreviousStage, ResettableStage}, + types::{BlockInfo, StageError, StageResult, SystemConfig}, }; use alloc::{boxed::Box, vec::Vec}; use alloy_primitives::Bytes; use async_trait::async_trait; /// A mock [FrameQueueProvider] for testing the [FrameQueue] stage. -#[derive(Debug)] +#[derive(Debug, Default)] pub struct MockFrameQueueProvider { /// The data to return. pub data: Vec>, @@ -37,3 +37,18 @@ impl FrameQueueProvider for MockFrameQueueProvider { self.data.pop().unwrap_or(Err(StageError::Eof)) } } + +#[async_trait] +impl ResettableStage for MockFrameQueueProvider { + async fn reset(&mut self, _base: BlockInfo, _cfg: &SystemConfig) -> StageResult<()> { + Ok(()) + } +} + +impl PreviousStage for MockFrameQueueProvider { + type Previous = MockFrameQueueProvider; + + fn previous(&self) -> Option<&Self::Previous> { + Some(self) + } +} diff --git a/crates/derive/src/traits/mod.rs b/crates/derive/src/traits/mod.rs index ccd92b1050..8ee12b532d 100644 --- a/crates/derive/src/traits/mod.rs +++ b/crates/derive/src/traits/mod.rs @@ -5,7 +5,7 @@ mod data_sources; pub use data_sources::*; mod stages; -pub use stages::{OriginProvider, ResettableStage}; +pub use stages::{OriginProvider, PreviousStage, ResettableStage}; mod ecrecover; pub use ecrecover::SignedRecoverable; diff --git a/crates/derive/src/traits/stages.rs b/crates/derive/src/traits/stages.rs index 7c43626547..18ecc6cc96 100644 --- a/crates/derive/src/traits/stages.rs +++ b/crates/derive/src/traits/stages.rs @@ -16,3 +16,12 @@ pub trait OriginProvider { /// Returns the optional L1 [BlockInfo] origin. fn origin(&self) -> Option<&BlockInfo>; } + +/// Provides a method for accessing a previous stage. +pub trait PreviousStage { + /// The previous stage. + type Previous: ResettableStage + PreviousStage; + + /// Returns the previous stage. + fn previous(&self) -> Option<&Self::Previous>; +} diff --git a/crates/derive/src/types/payload.rs b/crates/derive/src/types/payload.rs index ae1e10e826..d6d89aa9c5 100644 --- a/crates/derive/src/types/payload.rs +++ b/crates/derive/src/types/payload.rs @@ -3,7 +3,7 @@ use alloc::vec::Vec; use alloy_primitives::{Address, Bloom, Bytes, B256, U256}; use anyhow::Result; -use op_alloy_consensus::TxDeposit; +use op_alloy_consensus::{OpTxEnvelope, TxDeposit}; /// Fixed and variable memory costs for a payload. /// ~1000 bytes per payload, with some margin for overhead like map data. From ae1e9033bf5c4d912663390eef22dac1c11cb173 Mon Sep 17 00:00:00 2001 From: refcell Date: Thu, 25 Apr 2024 07:50:58 -0700 Subject: [PATCH 2/2] fix(derive): pipeline construction and trait abstractions --- crates/derive/src/builder.rs | 42 ++++-- crates/derive/src/lib.rs | 5 - crates/derive/src/online/mod.rs | 35 ++++- crates/derive/src/stack.rs | 128 ------------------ crates/derive/src/stages/attributes_queue.rs | 31 +++-- crates/derive/src/stages/batch_queue.rs | 33 +++-- crates/derive/src/stages/channel_bank.rs | 30 ++-- crates/derive/src/stages/channel_reader.rs | 30 ++-- crates/derive/src/stages/frame_queue.rs | 30 ++-- crates/derive/src/stages/l1_retrieval.rs | 34 +++-- crates/derive/src/stages/l1_traversal.rs | 51 ++----- crates/derive/src/stages/mod.rs | 2 +- .../src/stages/test_utils/attributes_queue.rs | 15 +- .../src/stages/test_utils/batch_queue.rs | 15 +- .../src/stages/test_utils/channel_bank.rs | 15 +- .../src/stages/test_utils/channel_reader.rs | 15 +- .../src/stages/test_utils/frame_queue.rs | 15 +- crates/derive/src/traits/mod.rs | 2 +- crates/derive/src/traits/stages.rs | 15 +- 19 files changed, 259 insertions(+), 284 deletions(-) delete mode 100644 crates/derive/src/stack.rs diff --git a/crates/derive/src/builder.rs b/crates/derive/src/builder.rs index 943af642c9..b3023231e3 100644 --- a/crates/derive/src/builder.rs +++ b/crates/derive/src/builder.rs @@ -2,7 +2,7 @@ use crate::{ stages::NextAttributes, - traits::ResettableStage, + traits::{OriginAdvancer, ResettableStage}, types::{ BlockInfo, L2AttributesWithParent, L2BlockInfo, StageError, StageResult, SystemConfig, }, @@ -24,12 +24,11 @@ pub trait ResetProvider { /// The derivation pipeline is responsible for deriving L2 inputs from L1 data. #[derive(Debug)] pub struct DerivationPipeline< - S: NextAttributes + ResettableStage + Debug + Send, + S: NextAttributes + ResettableStage + OriginAdvancer + Debug + Send, R: ResetProvider + Send, > { - /// The stack of stages in the pipeline. - /// The stack is reponsible for advancing the L1 traversal stage. - pub stack: S, + /// A handle to the next attributes. + pub attributes: S, /// Reset provider for the pipeline. pub reset: R, /// A list of prepared [L2AttributesWithParent] to be used by the derivation pipeline consumer. @@ -40,12 +39,14 @@ pub struct DerivationPipeline< pub cursor: L2BlockInfo, } -impl - DerivationPipeline +impl< + S: NextAttributes + ResettableStage + OriginAdvancer + Debug + Send, + R: ResetProvider + Send, + > DerivationPipeline { /// Creates a new instance of the [DerivationPipeline]. - pub fn new(stack: S, reset: R, cursor: L2BlockInfo) -> Self { - Self { stack, prepared: VecDeque::new(), reset, needs_reset: false, cursor } + pub fn new(attributes: S, reset: R, cursor: L2BlockInfo) -> Self { + Self { attributes, prepared: VecDeque::new(), reset, needs_reset: false, cursor } } /// Set the [L2BlockInfo] cursor to be used when pulling the next attributes. @@ -63,6 +64,23 @@ impl StageResult<()> { + match self.attributes.reset(bi, sc).await { + Ok(()) => { + tracing::info!("Stages reset"); + } + Err(StageError::Eof) => { + tracing::info!("Stages reset with EOF"); + } + Err(err) => { + tracing::error!("Stages reset failed: {:?}", err); + return Err(err); + } + } + Ok(()) + } + /// Attempts to progress the pipeline. /// A [StageError::Eof] is returned if the pipeline is blocked by waiting for new L1 data. /// Any other error is critical and the derivation pipeline should be reset. @@ -76,12 +94,11 @@ impl { tracing::info!("attributes queue stage step returned l2 attributes"); tracing::info!("prepared L2 attributes: {:?}", a); @@ -90,6 +107,7 @@ impl { tracing::info!("attributes queue stage complete"); + self.attributes.advance_origin().await?; } // TODO: match on the EngineELSyncing error here and log Err(err) => { diff --git a/crates/derive/src/lib.rs b/crates/derive/src/lib.rs index cb6b6b29f0..17001815ee 100644 --- a/crates/derive/src/lib.rs +++ b/crates/derive/src/lib.rs @@ -29,11 +29,6 @@ pub mod stages; pub mod traits; pub mod types; -#[cfg(feature = "online")] -mod stack; -#[cfg(feature = "online")] -pub use stack::*; - #[cfg(feature = "online")] mod online; #[cfg(feature = "online")] diff --git a/crates/derive/src/online/mod.rs b/crates/derive/src/online/mod.rs index 996052e090..5cf7a93c86 100644 --- a/crates/derive/src/online/mod.rs +++ b/crates/derive/src/online/mod.rs @@ -3,11 +3,42 @@ /// Prelude for online providers. pub(crate) mod prelude { pub use super::{ - AlloyChainProvider, AlloyL2ChainProvider, BeaconClient, OnlineBeaconClient, - OnlineBlobProvider, SimpleSlotDerivation, + new_online_stack, AlloyChainProvider, AlloyL2ChainProvider, BeaconClient, + OnlineBeaconClient, OnlineBlobProvider, SimpleSlotDerivation, }; } +use crate::{ + stages::{ + AttributesBuilder, AttributesQueue, BatchQueue, ChannelBank, ChannelReader, FrameQueue, + L1Retrieval, L1Traversal, NextAttributes, + }, + traits::{ + ChainProvider, DataAvailabilityProvider, L2ChainProvider, OriginProvider, ResettableStage, + }, + types::RollupConfig, +}; +use alloc::sync::Arc; +use core::fmt::Debug; + +/// Creates a new [OnlineStageStack]. +#[cfg(feature = "online")] +pub fn new_online_stack( + rollup_config: Arc, + chain_provider: impl ChainProvider + Clone + Debug + Send, + dap_source: impl DataAvailabilityProvider + OriginProvider + Clone + Debug + Send, + fetcher: impl L2ChainProvider + Clone + Debug + Send, + builder: impl AttributesBuilder + Clone + Debug + Send, +) -> impl NextAttributes + ResettableStage + Debug + Send { + let l1_traversal = L1Traversal::new(chain_provider, rollup_config.clone()); + let l1_retrieval = L1Retrieval::new(l1_traversal, dap_source); + let frame_queue = FrameQueue::new(l1_retrieval); + let channel_bank = ChannelBank::new(rollup_config.clone(), frame_queue); + let channel_reader = ChannelReader::new(channel_bank, rollup_config.clone()); + let batch_queue = BatchQueue::new(rollup_config.clone(), channel_reader, fetcher); + AttributesQueue::new(*rollup_config, batch_queue, builder) +} + #[cfg(test)] #[allow(unreachable_pub)] pub mod test_utils; diff --git a/crates/derive/src/stack.rs b/crates/derive/src/stack.rs deleted file mode 100644 index 3e15614e13..0000000000 --- a/crates/derive/src/stack.rs +++ /dev/null @@ -1,128 +0,0 @@ -//! Contains a stack of Stages for the [crate::DerivationPipeline]. - -use crate::{ - stages::{ - AttributesBuilder, AttributesQueue, BatchQueue, ChannelBank, ChannelReader, FrameQueue, - L1Retrieval, L1Traversal, NextAttributes, - }, - traits::{ - ChainProvider, DataAvailabilityProvider, L2ChainProvider, OriginProvider, ResettableStage, - }, - types::{ - BlockInfo, L2AttributesWithParent, L2BlockInfo, RollupConfig, StageError, StageResult, - SystemConfig, - }, -}; -use alloc::{boxed::Box, sync::Arc}; -use async_trait::async_trait; -use core::fmt::Debug; -use spin::Mutex; - -/// The [AttributesQueue] type alias. -pub type AttributesQueueType = AttributesQueue< - BatchQueue>>>>, F>, - B, ->; - -/// An online stack of stages. -#[derive(Debug)] -pub struct OnlineStageStack -where - P: ChainProvider + Clone + Debug + Send, - DAP: DataAvailabilityProvider + OriginProvider + Clone + Debug + Send, - F: L2ChainProvider + Clone + Debug + Send, - B: AttributesBuilder + Clone + Debug + Send, -{ - /// Flag to tell the L1Traversal stage to advance to the next L1 block. - pub advance: Arc>, - /// The [AttributesQueue] stage. - pub attributes: AttributesQueueType, -} - -impl OnlineStageStack -where - P: ChainProvider + Clone + Debug + Send, - DAP: DataAvailabilityProvider + OriginProvider + Clone + Debug + Send, - F: L2ChainProvider + Clone + Debug + Send, - B: AttributesBuilder + Clone + Debug + Send, -{ - /// Creates a new [OnlineStageStack]. - pub fn new( - rollup_config: Arc, - chain_provider: P, - dap_source: DAP, - fetcher: F, - builder: B, - ) -> Self { - let advance = Arc::new(Mutex::new(false)); - let l1_traversal = - L1Traversal::new(chain_provider, Arc::clone(&advance), rollup_config.clone()); - let l1_retrieval = L1Retrieval::new(l1_traversal, dap_source); - let frame_queue = FrameQueue::new(l1_retrieval); - let channel_bank = ChannelBank::new(rollup_config.clone(), frame_queue); - let channel_reader = ChannelReader::new(channel_bank, rollup_config.clone()); - let batch_queue = BatchQueue::new(rollup_config.clone(), channel_reader, fetcher); - let attributes = AttributesQueue::new(*rollup_config, batch_queue, builder); - Self { advance, attributes } - } -} - -#[async_trait] -impl NextAttributes for OnlineStageStack -where - P: ChainProvider + Clone + Debug + Send, - DAP: DataAvailabilityProvider + OriginProvider + Clone + Debug + Send, - F: L2ChainProvider + Clone + Debug + Send, - B: AttributesBuilder + Clone + Debug + Send, -{ - /// Advances the pipeline to the next attributes. - async fn next_attributes( - &mut self, - parent: L2BlockInfo, - ) -> StageResult { - match self.attributes.next_attributes(parent).await { - Ok(a) => { - tracing::info!("attributes queue stage step returned l2 attributes"); - tracing::info!("prepared L2 attributes: {:?}", a); - return Ok(a); - } - Err(StageError::Eof) => { - tracing::info!("attributes queue stage complete"); - let mut advance = self.advance.lock(); - *advance = true; - return Err(StageError::Eof); - } - // TODO: match on the EngineELSyncing error here and log - Err(err) => { - tracing::error!("attributes queue stage failed: {:?}", err); - return Err(err); - } - } - } -} - -#[async_trait] -impl ResettableStage for OnlineStageStack -where - P: ChainProvider + Clone + Debug + Send, - DAP: DataAvailabilityProvider + OriginProvider + Clone + Debug + Send, - F: L2ChainProvider + Clone + Debug + Send, - B: AttributesBuilder + Clone + Debug + Send, -{ - /// Resets all stages in the stack. - async fn reset(&mut self, bi: BlockInfo, sc: &SystemConfig) -> StageResult<()> { - match self.attributes.reset(bi, sc).await { - Ok(()) => { - tracing::info!("Stages reset"); - } - Err(StageError::Eof) => { - tracing::info!("Stages reset with EOF"); - } - Err(err) => { - tracing::error!("Stages reset failed: {:?}", err); - return Err(err); - } - } - Ok(()) - } -} diff --git a/crates/derive/src/stages/attributes_queue.rs b/crates/derive/src/stages/attributes_queue.rs index d0b2823f78..c3f4e87b45 100644 --- a/crates/derive/src/stages/attributes_queue.rs +++ b/crates/derive/src/stages/attributes_queue.rs @@ -1,7 +1,7 @@ //! Contains the logic for the `AttributesQueue` stage. use crate::{ - traits::{OriginProvider, PreviousStage, ResettableStage}, + traits::{OriginAdvancer, OriginProvider, PreviousStage, ResettableStage}, types::{ BlockInfo, L2AttributesWithParent, L2BlockInfo, L2PayloadAttributes, ResetError, RollupConfig, SingleBatch, StageError, StageResult, SystemConfig, @@ -53,7 +53,7 @@ pub trait NextAttributes { #[derive(Debug)] pub struct AttributesQueue where - P: AttributesProvider + ResettableStage + PreviousStage + OriginProvider + Debug, + P: AttributesProvider + PreviousStage + Debug, AB: AttributesBuilder + Debug, { /// The rollup config. @@ -70,7 +70,7 @@ where impl AttributesQueue where - P: AttributesProvider + PreviousStage + ResettableStage + OriginProvider + Debug, + P: AttributesProvider + PreviousStage + Debug, AB: AttributesBuilder + Debug, { /// Create a new [AttributesQueue] stage. @@ -149,20 +149,29 @@ where impl PreviousStage for AttributesQueue where - P: AttributesProvider + PreviousStage + ResettableStage + OriginProvider + Debug, - AB: AttributesBuilder + Debug, + P: AttributesProvider + PreviousStage + Send + Debug, + AB: AttributesBuilder + Send + Debug, { - type Previous = P; + fn previous(&self) -> Option> { + Some(Box::new(&self.prev)) + } +} - fn previous(&self) -> Option<&P> { - Some(&self.prev) +#[async_trait] +impl OriginAdvancer for AttributesQueue +where + P: AttributesProvider + PreviousStage + Debug + Send, + AB: AttributesBuilder + Debug + Send, +{ + async fn advance_origin(&mut self) -> StageResult<()> { + self.prev.advance_origin().await } } #[async_trait] impl NextAttributes for AttributesQueue where - P: AttributesProvider + PreviousStage + ResettableStage + OriginProvider + Debug + Send, + P: AttributesProvider + PreviousStage + Debug + Send, AB: AttributesBuilder + Debug + Send, { async fn next_attributes( @@ -175,7 +184,7 @@ where impl OriginProvider for AttributesQueue where - P: AttributesProvider + PreviousStage + ResettableStage + OriginProvider + Debug, + P: AttributesProvider + PreviousStage + Debug, AB: AttributesBuilder + Debug, { fn origin(&self) -> Option<&BlockInfo> { @@ -186,7 +195,7 @@ where #[async_trait] impl ResettableStage for AttributesQueue where - P: AttributesProvider + PreviousStage + ResettableStage + OriginProvider + Send + Debug, + P: AttributesProvider + PreviousStage + Send + Debug, AB: AttributesBuilder + Send + Debug, { async fn reset( diff --git a/crates/derive/src/stages/batch_queue.rs b/crates/derive/src/stages/batch_queue.rs index 7d66c5ad19..3649dd1843 100644 --- a/crates/derive/src/stages/batch_queue.rs +++ b/crates/derive/src/stages/batch_queue.rs @@ -2,7 +2,7 @@ use crate::{ stages::attributes_queue::AttributesProvider, - traits::{L2ChainProvider, OriginProvider, PreviousStage, ResettableStage}, + traits::{L2ChainProvider, OriginAdvancer, OriginProvider, PreviousStage, ResettableStage}, types::{ Batch, BatchValidity, BatchWithInclusionBlock, BlockInfo, L2BlockInfo, RollupConfig, SingleBatch, StageError, StageResult, SystemConfig, @@ -43,7 +43,7 @@ pub trait BatchQueueProvider { #[derive(Debug)] pub struct BatchQueue where - P: BatchQueueProvider + PreviousStage + ResettableStage + OriginProvider + Debug, + P: BatchQueueProvider + PreviousStage + Debug, BF: L2ChainProvider + Debug, { /// The rollup config. @@ -75,7 +75,7 @@ where impl BatchQueue where - P: BatchQueueProvider + PreviousStage + ResettableStage + OriginProvider + Debug, + P: BatchQueueProvider + PreviousStage + Debug, BF: L2ChainProvider + Debug, { /// Creates a new [BatchQueue] stage. @@ -242,10 +242,21 @@ where } } +#[async_trait] +impl OriginAdvancer for BatchQueue +where + P: BatchQueueProvider + PreviousStage + Send + Debug, + BF: L2ChainProvider + Send + Debug, +{ + async fn advance_origin(&mut self) -> StageResult<()> { + self.prev.advance_origin().await + } +} + #[async_trait] impl AttributesProvider for BatchQueue where - P: BatchQueueProvider + PreviousStage + ResettableStage + OriginProvider + Send + Debug, + P: BatchQueueProvider + PreviousStage + Send + Debug, BF: L2ChainProvider + Send + Debug, { /// Returns the next valid batch upon the given safe head. @@ -374,7 +385,7 @@ where impl OriginProvider for BatchQueue where - P: BatchQueueProvider + PreviousStage + ResettableStage + OriginProvider + Debug, + P: BatchQueueProvider + PreviousStage + Debug, BF: L2ChainProvider + Debug, { fn origin(&self) -> Option<&BlockInfo> { @@ -384,20 +395,18 @@ where impl PreviousStage for BatchQueue where - P: BatchQueueProvider + PreviousStage + ResettableStage + OriginProvider + Debug, - BF: L2ChainProvider + Debug, + P: BatchQueueProvider + PreviousStage + Send + Debug, + BF: L2ChainProvider + Send + Debug, { - type Previous = P; - - fn previous(&self) -> Option<&Self::Previous> { - Some(&self.prev) + fn previous(&self) -> Option> { + Some(Box::new(&self.prev)) } } #[async_trait] impl ResettableStage for BatchQueue where - P: BatchQueueProvider + PreviousStage + ResettableStage + OriginProvider + Send + Debug, + P: BatchQueueProvider + PreviousStage + Send + Debug, BF: L2ChainProvider + Send + Debug, { async fn reset(&mut self, base: BlockInfo, system_config: &SystemConfig) -> StageResult<()> { diff --git a/crates/derive/src/stages/channel_bank.rs b/crates/derive/src/stages/channel_bank.rs index 04378b919b..078341fd4e 100644 --- a/crates/derive/src/stages/channel_bank.rs +++ b/crates/derive/src/stages/channel_bank.rs @@ -3,7 +3,7 @@ use crate::{ params::{ChannelID, MAX_CHANNEL_BANK_SIZE}, stages::ChannelReaderProvider, - traits::{OriginProvider, PreviousStage, ResettableStage}, + traits::{OriginAdvancer, OriginProvider, PreviousStage, ResettableStage}, types::{BlockInfo, Channel, Frame, RollupConfig, StageError, StageResult, SystemConfig}, }; use alloc::{boxed::Box, collections::VecDeque, sync::Arc}; @@ -37,7 +37,7 @@ pub trait ChannelBankProvider { #[derive(Debug)] pub struct ChannelBank

where - P: ChannelBankProvider + PreviousStage + ResettableStage + OriginProvider + Debug, + P: ChannelBankProvider + PreviousStage + Debug, { /// The rollup configuration. cfg: Arc, @@ -51,7 +51,7 @@ where impl

ChannelBank

where - P: ChannelBankProvider + PreviousStage + ResettableStage + OriginProvider + Debug, + P: ChannelBankProvider + PreviousStage + Debug, { /// Create a new [ChannelBank] stage. pub fn new(cfg: Arc, prev: P) -> Self { @@ -163,10 +163,20 @@ where } } +#[async_trait] +impl

OriginAdvancer for ChannelBank

+where + P: ChannelBankProvider + PreviousStage + Send + Debug, +{ + async fn advance_origin(&mut self) -> StageResult<()> { + self.prev.advance_origin().await + } +} + #[async_trait] impl

ChannelReaderProvider for ChannelBank

where - P: ChannelBankProvider + PreviousStage + ResettableStage + OriginProvider + Send + Debug, + P: ChannelBankProvider + PreviousStage + Send + Debug, { async fn next_data(&mut self) -> StageResult> { match self.read() { @@ -188,7 +198,7 @@ where impl

OriginProvider for ChannelBank

where - P: ChannelBankProvider + PreviousStage + ResettableStage + OriginProvider + Debug, + P: ChannelBankProvider + PreviousStage + Debug, { fn origin(&self) -> Option<&BlockInfo> { self.prev.origin() @@ -197,19 +207,17 @@ where impl

PreviousStage for ChannelBank

where - P: ChannelBankProvider + PreviousStage + ResettableStage + OriginProvider + Debug, + P: ChannelBankProvider + PreviousStage + Debug + Send, { - type Previous = P; - - fn previous(&self) -> Option<&Self::Previous> { - Some(&self.prev) + fn previous(&self) -> Option> { + Some(Box::new(&self.prev)) } } #[async_trait] impl

ResettableStage for ChannelBank

where - P: ChannelBankProvider + PreviousStage + ResettableStage + OriginProvider + Send + Debug, + P: ChannelBankProvider + PreviousStage + Send + Debug, { async fn reset( &mut self, diff --git a/crates/derive/src/stages/channel_reader.rs b/crates/derive/src/stages/channel_reader.rs index 6efcae9e98..e24bd5c798 100644 --- a/crates/derive/src/stages/channel_reader.rs +++ b/crates/derive/src/stages/channel_reader.rs @@ -2,7 +2,7 @@ use crate::{ stages::BatchQueueProvider, - traits::{OriginProvider, PreviousStage, ResettableStage}, + traits::{OriginAdvancer, OriginProvider, PreviousStage, ResettableStage}, types::{Batch, BlockInfo, RollupConfig, StageError, StageResult, SystemConfig}, }; @@ -27,7 +27,7 @@ pub trait ChannelReaderProvider { #[derive(Debug)] pub struct ChannelReader

where - P: ChannelReaderProvider + PreviousStage + ResettableStage + OriginProvider + Debug, + P: ChannelReaderProvider + PreviousStage + Debug, { /// The previous stage of the derivation pipeline. prev: P, @@ -39,7 +39,7 @@ where impl

ChannelReader

where - P: ChannelReaderProvider + PreviousStage + ResettableStage + OriginProvider + Debug, + P: ChannelReaderProvider + PreviousStage + Debug, { /// Create a new [ChannelReader] stage. pub fn new(prev: P, cfg: Arc) -> Self { @@ -62,10 +62,20 @@ where } } +#[async_trait] +impl

OriginAdvancer for ChannelReader

+where + P: ChannelReaderProvider + PreviousStage + Send + Debug, +{ + async fn advance_origin(&mut self) -> StageResult<()> { + self.prev.advance_origin().await + } +} + #[async_trait] impl

BatchQueueProvider for ChannelReader

where - P: ChannelReaderProvider + PreviousStage + ResettableStage + OriginProvider + Send + Debug, + P: ChannelReaderProvider + PreviousStage + Send + Debug, { async fn next_batch(&mut self) -> StageResult { if let Err(e) = self.set_batch_reader().await { @@ -91,7 +101,7 @@ where impl

OriginProvider for ChannelReader

where - P: ChannelReaderProvider + PreviousStage + ResettableStage + OriginProvider + Debug, + P: ChannelReaderProvider + PreviousStage + Debug, { fn origin(&self) -> Option<&BlockInfo> { self.prev.origin() @@ -101,7 +111,7 @@ where #[async_trait] impl

ResettableStage for ChannelReader

where - P: ChannelReaderProvider + PreviousStage + ResettableStage + OriginProvider + Debug + Send, + P: ChannelReaderProvider + PreviousStage + Debug + Send, { async fn reset(&mut self, base: BlockInfo, cfg: &SystemConfig) -> StageResult<()> { self.prev.reset(base, cfg).await?; @@ -112,12 +122,10 @@ where impl

PreviousStage for ChannelReader

where - P: ChannelReaderProvider + PreviousStage + ResettableStage + OriginProvider + Debug, + P: ChannelReaderProvider + PreviousStage + Send + Debug, { - type Previous = P; - - fn previous(&self) -> Option<&Self::Previous> { - Some(&self.prev) + fn previous(&self) -> Option> { + Some(Box::new(&self.prev)) } } diff --git a/crates/derive/src/stages/frame_queue.rs b/crates/derive/src/stages/frame_queue.rs index 4d1f42eaa3..07586ffcb6 100644 --- a/crates/derive/src/stages/frame_queue.rs +++ b/crates/derive/src/stages/frame_queue.rs @@ -2,7 +2,7 @@ use crate::{ stages::ChannelBankProvider, - traits::{OriginProvider, PreviousStage, ResettableStage}, + traits::{OriginAdvancer, OriginProvider, PreviousStage, ResettableStage}, types::{into_frames, BlockInfo, Frame, StageError, StageResult, SystemConfig}, }; use alloc::{boxed::Box, collections::VecDeque}; @@ -31,7 +31,7 @@ pub trait FrameQueueProvider { #[derive(Debug)] pub struct FrameQueue

where - P: FrameQueueProvider + PreviousStage + ResettableStage + OriginProvider + Debug, + P: FrameQueueProvider + PreviousStage + Debug, { /// The previous stage in the pipeline. pub prev: P, @@ -41,7 +41,7 @@ where impl

FrameQueue

where - P: FrameQueueProvider + PreviousStage + ResettableStage + OriginProvider + Debug, + P: FrameQueueProvider + PreviousStage + Debug, { /// Create a new [FrameQueue] stage with the given previous [L1Retrieval] stage. /// @@ -51,10 +51,20 @@ where } } +#[async_trait] +impl

OriginAdvancer for FrameQueue

+where + P: FrameQueueProvider + PreviousStage + Send + Debug, +{ + async fn advance_origin(&mut self) -> StageResult<()> { + self.prev.advance_origin().await + } +} + #[async_trait] impl

ChannelBankProvider for FrameQueue

where - P: FrameQueueProvider + PreviousStage + ResettableStage + OriginProvider + Send + Debug, + P: FrameQueueProvider + PreviousStage + Send + Debug, { async fn next_frame(&mut self) -> StageResult { if self.queue.is_empty() { @@ -87,7 +97,7 @@ where impl

OriginProvider for FrameQueue

where - P: FrameQueueProvider + PreviousStage + ResettableStage + OriginProvider + Debug, + P: FrameQueueProvider + PreviousStage + Debug, { fn origin(&self) -> Option<&BlockInfo> { self.prev.origin() @@ -96,19 +106,17 @@ where impl

PreviousStage for FrameQueue

where - P: FrameQueueProvider + PreviousStage + ResettableStage + OriginProvider + Debug, + P: FrameQueueProvider + PreviousStage + Send + Debug, { - type Previous = P; - - fn previous(&self) -> Option<&Self::Previous> { - Some(&self.prev) + fn previous(&self) -> Option> { + Some(Box::new(&self.prev)) } } #[async_trait] impl

ResettableStage for FrameQueue

where - P: FrameQueueProvider + PreviousStage + ResettableStage + OriginProvider + Send + Debug, + P: FrameQueueProvider + PreviousStage + Send + Debug, { async fn reset( &mut self, diff --git a/crates/derive/src/stages/l1_retrieval.rs b/crates/derive/src/stages/l1_retrieval.rs index 74ab4684da..801302e442 100644 --- a/crates/derive/src/stages/l1_retrieval.rs +++ b/crates/derive/src/stages/l1_retrieval.rs @@ -3,7 +3,8 @@ use crate::{ stages::FrameQueueProvider, traits::{ - AsyncIterator, DataAvailabilityProvider, OriginProvider, PreviousStage, ResettableStage, + AsyncIterator, DataAvailabilityProvider, OriginAdvancer, OriginProvider, PreviousStage, + ResettableStage, }, types::{BlockInfo, StageError, StageResult, SystemConfig}, }; @@ -39,7 +40,7 @@ pub trait L1RetrievalProvider { pub struct L1Retrieval where DAP: DataAvailabilityProvider, - P: L1RetrievalProvider + PreviousStage + ResettableStage + OriginProvider, + P: L1RetrievalProvider + PreviousStage, { /// The previous stage in the pipeline. pub prev: P, @@ -52,7 +53,7 @@ where impl L1Retrieval where DAP: DataAvailabilityProvider, - P: L1RetrievalProvider + PreviousStage + ResettableStage + OriginProvider, + P: L1RetrievalProvider + PreviousStage, { /// Creates a new [L1Retrieval] stage with the previous [L1Traversal] stage and given /// [DataAvailabilityProvider]. @@ -63,11 +64,22 @@ where } } +#[async_trait] +impl OriginAdvancer for L1Retrieval +where + DAP: DataAvailabilityProvider + Send, + P: L1RetrievalProvider + PreviousStage + Send, +{ + async fn advance_origin(&mut self) -> StageResult<()> { + self.prev.advance_origin().await + } +} + #[async_trait] impl FrameQueueProvider for L1Retrieval where DAP: DataAvailabilityProvider + Send, - P: L1RetrievalProvider + PreviousStage + ResettableStage + OriginProvider + Send, + P: L1RetrievalProvider + PreviousStage + Send, { type Item = DAP::Item; @@ -96,7 +108,7 @@ where impl OriginProvider for L1Retrieval where DAP: DataAvailabilityProvider, - P: L1RetrievalProvider + PreviousStage + ResettableStage + OriginProvider, + P: L1RetrievalProvider + PreviousStage, { fn origin(&self) -> Option<&BlockInfo> { self.prev.origin() @@ -105,13 +117,11 @@ where impl PreviousStage for L1Retrieval where - DAP: DataAvailabilityProvider, - P: L1RetrievalProvider + PreviousStage + ResettableStage + OriginProvider, + DAP: DataAvailabilityProvider + Send, + P: L1RetrievalProvider + PreviousStage + Send, { - type Previous = P; - - fn previous(&self) -> Option<&Self::Previous> { - Some(&self.prev) + fn previous(&self) -> Option> { + Some(Box::new(&self.prev)) } } @@ -119,7 +129,7 @@ where impl ResettableStage for L1Retrieval where DAP: DataAvailabilityProvider + Send, - P: L1RetrievalProvider + PreviousStage + ResettableStage + OriginProvider + Send, + P: L1RetrievalProvider + PreviousStage + Send, { async fn reset(&mut self, base: BlockInfo, cfg: &SystemConfig) -> StageResult<()> { self.prev.reset(base, cfg).await?; diff --git a/crates/derive/src/stages/l1_traversal.rs b/crates/derive/src/stages/l1_traversal.rs index 80de560678..65c9751084 100644 --- a/crates/derive/src/stages/l1_traversal.rs +++ b/crates/derive/src/stages/l1_traversal.rs @@ -2,22 +2,14 @@ use crate::{ stages::L1RetrievalProvider, - traits::{ChainProvider, OriginProvider, PreviousStage, ResettableStage}, + traits::{ChainProvider, OriginAdvancer, OriginProvider, PreviousStage, ResettableStage}, types::{BlockInfo, RollupConfig, StageError, StageResult, SystemConfig}, }; use alloc::{boxed::Box, sync::Arc}; use alloy_primitives::Address; use async_trait::async_trait; -use spin::Mutex; use tracing::warn; -/// Defines a trait for advancing the L1 block in the [L1Traversal] stage. -#[async_trait] -pub trait L1BlockAdvance { - /// Advances the internal state of the [L1Traversal] stage to the next L1 block. - async fn advance_l1_block(&mut self) -> StageResult<()>; -} - /// The [L1Traversal] stage of the derivation pipeline. /// /// This stage sits at the bottom of the pipeline, holding a handle to the data source @@ -33,8 +25,6 @@ pub struct L1Traversal { data_source: Provider, /// Signals whether or not the traversal stage is complete. done: bool, - /// Marks if the L1Traversal stage should attempt to advance to the next block. - pub advance: Arc>, /// The system config. pub system_config: SystemConfig, /// A reference to the rollup config. @@ -48,16 +38,6 @@ impl L1RetrievalProvider for L1Traversal { } async fn next_l1_block(&mut self) -> StageResult> { - // let advance = match { - // Ok(advance) => advance, - // Err(_) => return Err(StageError::Custom(anyhow::anyhow!("Failed to lock advance - // mutex"))), }; - let mut advance = self.advance.lock(); - if *advance { - *advance = false; - drop(advance); - self.advance_l1_block().await?; - } if !self.done { self.done = true; Ok(self.block) @@ -69,12 +49,11 @@ impl L1RetrievalProvider for L1Traversal { impl L1Traversal { /// Creates a new [L1Traversal] instance. - pub fn new(data_source: F, advance: Arc>, cfg: Arc) -> Self { + pub fn new(data_source: F, cfg: Arc) -> Self { Self { block: Some(BlockInfo::default()), data_source, done: false, - advance, system_config: SystemConfig::default(), rollup_config: cfg, } @@ -87,11 +66,11 @@ impl L1Traversal { } #[async_trait] -impl L1BlockAdvance for L1Traversal { +impl OriginAdvancer for L1Traversal { /// Advances the internal state of the [L1Traversal] stage to the next L1 block. /// This function fetches the next L1 [BlockInfo] from the data source and updates the /// [SystemConfig] with the receipts from the block. - async fn advance_l1_block(&mut self) -> StageResult<()> { + async fn advance_origin(&mut self) -> StageResult<()> { // Pull the next block or return EOF. // StageError::EOF has special handling further up the pipeline. let block = match self.block { @@ -138,9 +117,7 @@ impl OriginProvider for L1Traversal { } impl PreviousStage for L1Traversal { - type Previous = L1Traversal; - - fn previous(&self) -> Option<&Self::Previous> { + fn previous(&self) -> Option> { None } } @@ -212,7 +189,7 @@ pub(crate) mod tests { let hash = blocks.get(i).map(|b| b.hash).unwrap_or_default(); provider.insert_receipts(hash, vec![receipt.clone()]); } - L1Traversal::new(provider, Arc::new(Mutex::new(false)), Arc::new(rollup_config)) + L1Traversal::new(provider, Arc::new(rollup_config)) } pub(crate) fn new_populated_test_traversal() -> L1Traversal { @@ -228,7 +205,7 @@ pub(crate) mod tests { let mut traversal = new_test_traversal(blocks, receipts); assert_eq!(traversal.next_l1_block().await.unwrap(), Some(BlockInfo::default())); assert_eq!(traversal.next_l1_block().await.unwrap_err(), StageError::Eof); - assert!(traversal.advance_l1_block().await.is_ok()); + assert!(traversal.advance_origin().await.is_ok()); } #[tokio::test] @@ -237,7 +214,7 @@ pub(crate) mod tests { let mut traversal = new_test_traversal(blocks, vec![]); assert_eq!(traversal.next_l1_block().await.unwrap(), Some(BlockInfo::default())); assert_eq!(traversal.next_l1_block().await.unwrap_err(), StageError::Eof); - matches!(traversal.advance_l1_block().await.unwrap_err(), StageError::ReceiptFetch(_)); + matches!(traversal.advance_origin().await.unwrap_err(), StageError::ReceiptFetch(_)); } #[tokio::test] @@ -247,8 +224,8 @@ pub(crate) mod tests { let blocks = vec![block, block]; let receipts = new_receipts(); let mut traversal = new_test_traversal(blocks, receipts); - assert!(traversal.advance_l1_block().await.is_ok()); - let err = traversal.advance_l1_block().await.unwrap_err(); + assert!(traversal.advance_origin().await.is_ok()); + let err = traversal.advance_origin().await.unwrap_err(); assert_eq!(err, StageError::ReorgDetected(block.hash, block.parent_hash)); } @@ -257,7 +234,7 @@ pub(crate) mod tests { let mut traversal = new_test_traversal(vec![], vec![]); assert_eq!(traversal.next_l1_block().await.unwrap(), Some(BlockInfo::default())); assert_eq!(traversal.next_l1_block().await.unwrap_err(), StageError::Eof); - matches!(traversal.advance_l1_block().await.unwrap_err(), StageError::BlockInfoFetch(_)); + matches!(traversal.advance_origin().await.unwrap_err(), StageError::BlockInfoFetch(_)); } #[tokio::test] @@ -269,10 +246,10 @@ pub(crate) mod tests { let blocks = vec![block1, block2]; let receipts = new_receipts(); let mut traversal = new_test_traversal(blocks, receipts); - assert!(traversal.advance_l1_block().await.is_ok()); + assert!(traversal.advance_origin().await.is_ok()); // Only the second block should fail since the second receipt // contains invalid logs that will error for a system config update. - let err = traversal.advance_l1_block().await.unwrap_err(); + let err = traversal.advance_origin().await.unwrap_err(); matches!(err, StageError::SystemConfigUpdate(_)); } @@ -283,7 +260,7 @@ pub(crate) mod tests { let mut traversal = new_test_traversal(blocks, receipts); assert_eq!(traversal.next_l1_block().await.unwrap(), Some(BlockInfo::default())); assert_eq!(traversal.next_l1_block().await.unwrap_err(), StageError::Eof); - assert!(traversal.advance_l1_block().await.is_ok()); + assert!(traversal.advance_origin().await.is_ok()); let expected = address!("000000000000000000000000000000000000bEEF"); assert_eq!(traversal.system_config.batcher_addr, expected); } diff --git a/crates/derive/src/stages/mod.rs b/crates/derive/src/stages/mod.rs index 355793fa40..2034cf5d76 100644 --- a/crates/derive/src/stages/mod.rs +++ b/crates/derive/src/stages/mod.rs @@ -14,7 +14,7 @@ //! 8. (Omitted) Engine Queue mod l1_traversal; -pub use l1_traversal::{L1BlockAdvance, L1Traversal}; +pub use l1_traversal::L1Traversal; mod l1_retrieval; pub use l1_retrieval::{L1Retrieval, L1RetrievalProvider}; diff --git a/crates/derive/src/stages/test_utils/attributes_queue.rs b/crates/derive/src/stages/test_utils/attributes_queue.rs index 591caad2cc..1e689ecbff 100644 --- a/crates/derive/src/stages/test_utils/attributes_queue.rs +++ b/crates/derive/src/stages/test_utils/attributes_queue.rs @@ -2,7 +2,7 @@ use crate::{ stages::attributes_queue::{AttributesBuilder, AttributesProvider}, - traits::{OriginProvider, PreviousStage, ResettableStage}, + traits::{OriginAdvancer, OriginProvider, PreviousStage, ResettableStage}, types::{ BlockID, BlockInfo, BuilderError, L2BlockInfo, L2PayloadAttributes, SingleBatch, StageError, StageResult, SystemConfig, @@ -49,6 +49,13 @@ impl OriginProvider for MockAttributesProvider { } } +#[async_trait] +impl OriginAdvancer for MockAttributesProvider { + async fn advance_origin(&mut self) -> StageResult<()> { + Ok(()) + } +} + #[async_trait] impl ResettableStage for MockAttributesProvider { async fn reset(&mut self, _base: BlockInfo, _cfg: &SystemConfig) -> StageResult<()> { @@ -57,10 +64,8 @@ impl ResettableStage for MockAttributesProvider { } impl PreviousStage for MockAttributesProvider { - type Previous = MockAttributesProvider; - - fn previous(&self) -> Option<&Self::Previous> { - Some(self) + fn previous(&self) -> Option> { + Some(Box::new(self)) } } diff --git a/crates/derive/src/stages/test_utils/batch_queue.rs b/crates/derive/src/stages/test_utils/batch_queue.rs index abae762f7d..896add5ee5 100644 --- a/crates/derive/src/stages/test_utils/batch_queue.rs +++ b/crates/derive/src/stages/test_utils/batch_queue.rs @@ -2,7 +2,7 @@ use crate::{ stages::batch_queue::BatchQueueProvider, - traits::{OriginProvider, PreviousStage, ResettableStage}, + traits::{OriginAdvancer, OriginProvider, PreviousStage, ResettableStage}, types::{Batch, BlockInfo, StageError, StageResult, SystemConfig}, }; use alloc::{boxed::Box, vec::Vec}; @@ -37,6 +37,13 @@ impl BatchQueueProvider for MockBatchQueueProvider { } } +#[async_trait] +impl OriginAdvancer for MockBatchQueueProvider { + async fn advance_origin(&mut self) -> StageResult<()> { + Ok(()) + } +} + #[async_trait] impl ResettableStage for MockBatchQueueProvider { async fn reset(&mut self, _base: BlockInfo, _cfg: &SystemConfig) -> StageResult<()> { @@ -45,9 +52,7 @@ impl ResettableStage for MockBatchQueueProvider { } impl PreviousStage for MockBatchQueueProvider { - type Previous = MockBatchQueueProvider; - - fn previous(&self) -> Option<&Self::Previous> { - Some(self) + fn previous(&self) -> Option> { + Some(Box::new(self)) } } diff --git a/crates/derive/src/stages/test_utils/channel_bank.rs b/crates/derive/src/stages/test_utils/channel_bank.rs index 9e5400386b..9da6717abc 100644 --- a/crates/derive/src/stages/test_utils/channel_bank.rs +++ b/crates/derive/src/stages/test_utils/channel_bank.rs @@ -2,7 +2,7 @@ use crate::{ stages::ChannelBankProvider, - traits::{OriginProvider, PreviousStage, ResettableStage}, + traits::{OriginAdvancer, OriginProvider, PreviousStage, ResettableStage}, types::{BlockInfo, Frame, StageError, StageResult, SystemConfig}, }; use alloc::{boxed::Box, vec::Vec}; @@ -30,6 +30,13 @@ impl OriginProvider for MockChannelBankProvider { } } +#[async_trait] +impl OriginAdvancer for MockChannelBankProvider { + async fn advance_origin(&mut self) -> StageResult<()> { + Ok(()) + } +} + #[async_trait] impl ChannelBankProvider for MockChannelBankProvider { async fn next_frame(&mut self) -> StageResult { @@ -45,9 +52,7 @@ impl ResettableStage for MockChannelBankProvider { } impl PreviousStage for MockChannelBankProvider { - type Previous = MockChannelBankProvider; - - fn previous(&self) -> Option<&Self::Previous> { - Some(self) + fn previous(&self) -> Option> { + Some(Box::new(self)) } } diff --git a/crates/derive/src/stages/test_utils/channel_reader.rs b/crates/derive/src/stages/test_utils/channel_reader.rs index d1b531f34e..d43a4e8309 100644 --- a/crates/derive/src/stages/test_utils/channel_reader.rs +++ b/crates/derive/src/stages/test_utils/channel_reader.rs @@ -2,7 +2,7 @@ use crate::{ stages::ChannelReaderProvider, - traits::{OriginProvider, PreviousStage, ResettableStage}, + traits::{OriginAdvancer, OriginProvider, PreviousStage, ResettableStage}, types::{BlockInfo, StageError, StageResult, SystemConfig}, }; use alloc::{boxed::Box, vec::Vec}; @@ -31,6 +31,13 @@ impl OriginProvider for MockChannelReaderProvider { } } +#[async_trait] +impl OriginAdvancer for MockChannelReaderProvider { + async fn advance_origin(&mut self) -> StageResult<()> { + Ok(()) + } +} + #[async_trait] impl ChannelReaderProvider for MockChannelReaderProvider { async fn next_data(&mut self) -> StageResult> { @@ -46,9 +53,7 @@ impl ResettableStage for MockChannelReaderProvider { } impl PreviousStage for MockChannelReaderProvider { - type Previous = MockChannelReaderProvider; - - fn previous(&self) -> Option<&Self::Previous> { - Some(self) + fn previous(&self) -> Option> { + Some(Box::new(self)) } } diff --git a/crates/derive/src/stages/test_utils/frame_queue.rs b/crates/derive/src/stages/test_utils/frame_queue.rs index 491bf2a216..8349cfbc9e 100644 --- a/crates/derive/src/stages/test_utils/frame_queue.rs +++ b/crates/derive/src/stages/test_utils/frame_queue.rs @@ -2,7 +2,7 @@ use crate::{ stages::FrameQueueProvider, - traits::{OriginProvider, PreviousStage, ResettableStage}, + traits::{OriginAdvancer, OriginProvider, PreviousStage, ResettableStage}, types::{BlockInfo, StageError, StageResult, SystemConfig}, }; use alloc::{boxed::Box, vec::Vec}; @@ -29,6 +29,13 @@ impl OriginProvider for MockFrameQueueProvider { } } +#[async_trait] +impl OriginAdvancer for MockFrameQueueProvider { + async fn advance_origin(&mut self) -> StageResult<()> { + Ok(()) + } +} + #[async_trait] impl FrameQueueProvider for MockFrameQueueProvider { type Item = Bytes; @@ -46,9 +53,7 @@ impl ResettableStage for MockFrameQueueProvider { } impl PreviousStage for MockFrameQueueProvider { - type Previous = MockFrameQueueProvider; - - fn previous(&self) -> Option<&Self::Previous> { - Some(self) + fn previous(&self) -> Option> { + Some(Box::new(self)) } } diff --git a/crates/derive/src/traits/mod.rs b/crates/derive/src/traits/mod.rs index 8ee12b532d..f3ff4f5473 100644 --- a/crates/derive/src/traits/mod.rs +++ b/crates/derive/src/traits/mod.rs @@ -5,7 +5,7 @@ mod data_sources; pub use data_sources::*; mod stages; -pub use stages::{OriginProvider, PreviousStage, ResettableStage}; +pub use stages::{OriginAdvancer, OriginProvider, PreviousStage, ResettableStage}; mod ecrecover; pub use ecrecover::SignedRecoverable; diff --git a/crates/derive/src/traits/stages.rs b/crates/derive/src/traits/stages.rs index 18ecc6cc96..3776f9a19e 100644 --- a/crates/derive/src/traits/stages.rs +++ b/crates/derive/src/traits/stages.rs @@ -17,11 +17,16 @@ pub trait OriginProvider { fn origin(&self) -> Option<&BlockInfo>; } -/// Provides a method for accessing a previous stage. -pub trait PreviousStage { - /// The previous stage. - type Previous: ResettableStage + PreviousStage; +/// Defines a trait for advancing the L1 origin of the pipeline. +#[async_trait] +pub trait OriginAdvancer { + /// Advances the internal state of the lowest stage to the next l1 origin. + /// This method is the equivalent of the reference implementation `advance_l1_block`. + async fn advance_origin(&mut self) -> StageResult<()>; +} +/// Provides a method for accessing a previous stage. +pub trait PreviousStage: ResettableStage + OriginAdvancer + OriginProvider { /// Returns the previous stage. - fn previous(&self) -> Option<&Self::Previous>; + fn previous(&self) -> Option>; }