From ae1e9033bf5c4d912663390eef22dac1c11cb173 Mon Sep 17 00:00:00 2001 From: refcell Date: Thu, 25 Apr 2024 07:50:58 -0700 Subject: [PATCH] fix(derive): pipeline construction and trait abstractions --- crates/derive/src/builder.rs | 42 ++++-- crates/derive/src/lib.rs | 5 - crates/derive/src/online/mod.rs | 35 ++++- crates/derive/src/stack.rs | 128 ------------------ crates/derive/src/stages/attributes_queue.rs | 31 +++-- crates/derive/src/stages/batch_queue.rs | 33 +++-- crates/derive/src/stages/channel_bank.rs | 30 ++-- crates/derive/src/stages/channel_reader.rs | 30 ++-- crates/derive/src/stages/frame_queue.rs | 30 ++-- crates/derive/src/stages/l1_retrieval.rs | 34 +++-- crates/derive/src/stages/l1_traversal.rs | 51 ++----- crates/derive/src/stages/mod.rs | 2 +- .../src/stages/test_utils/attributes_queue.rs | 15 +- .../src/stages/test_utils/batch_queue.rs | 15 +- .../src/stages/test_utils/channel_bank.rs | 15 +- .../src/stages/test_utils/channel_reader.rs | 15 +- .../src/stages/test_utils/frame_queue.rs | 15 +- crates/derive/src/traits/mod.rs | 2 +- crates/derive/src/traits/stages.rs | 15 +- 19 files changed, 259 insertions(+), 284 deletions(-) delete mode 100644 crates/derive/src/stack.rs diff --git a/crates/derive/src/builder.rs b/crates/derive/src/builder.rs index 943af642c9..b3023231e3 100644 --- a/crates/derive/src/builder.rs +++ b/crates/derive/src/builder.rs @@ -2,7 +2,7 @@ use crate::{ stages::NextAttributes, - traits::ResettableStage, + traits::{OriginAdvancer, ResettableStage}, types::{ BlockInfo, L2AttributesWithParent, L2BlockInfo, StageError, StageResult, SystemConfig, }, @@ -24,12 +24,11 @@ pub trait ResetProvider { /// The derivation pipeline is responsible for deriving L2 inputs from L1 data. #[derive(Debug)] pub struct DerivationPipeline< - S: NextAttributes + ResettableStage + Debug + Send, + S: NextAttributes + ResettableStage + OriginAdvancer + Debug + Send, R: ResetProvider + Send, > { - /// The stack of stages in the pipeline. - /// The stack is reponsible for advancing the L1 traversal stage. - pub stack: S, + /// A handle to the next attributes. + pub attributes: S, /// Reset provider for the pipeline. pub reset: R, /// A list of prepared [L2AttributesWithParent] to be used by the derivation pipeline consumer. @@ -40,12 +39,14 @@ pub struct DerivationPipeline< pub cursor: L2BlockInfo, } -impl - DerivationPipeline +impl< + S: NextAttributes + ResettableStage + OriginAdvancer + Debug + Send, + R: ResetProvider + Send, + > DerivationPipeline { /// Creates a new instance of the [DerivationPipeline]. - pub fn new(stack: S, reset: R, cursor: L2BlockInfo) -> Self { - Self { stack, prepared: VecDeque::new(), reset, needs_reset: false, cursor } + pub fn new(attributes: S, reset: R, cursor: L2BlockInfo) -> Self { + Self { attributes, prepared: VecDeque::new(), reset, needs_reset: false, cursor } } /// Set the [L2BlockInfo] cursor to be used when pulling the next attributes. @@ -63,6 +64,23 @@ impl StageResult<()> { + match self.attributes.reset(bi, sc).await { + Ok(()) => { + tracing::info!("Stages reset"); + } + Err(StageError::Eof) => { + tracing::info!("Stages reset with EOF"); + } + Err(err) => { + tracing::error!("Stages reset failed: {:?}", err); + return Err(err); + } + } + Ok(()) + } + /// Attempts to progress the pipeline. /// A [StageError::Eof] is returned if the pipeline is blocked by waiting for new L1 data. /// Any other error is critical and the derivation pipeline should be reset. @@ -76,12 +94,11 @@ impl { tracing::info!("attributes queue stage step returned l2 attributes"); tracing::info!("prepared L2 attributes: {:?}", a); @@ -90,6 +107,7 @@ impl { tracing::info!("attributes queue stage complete"); + self.attributes.advance_origin().await?; } // TODO: match on the EngineELSyncing error here and log Err(err) => { diff --git a/crates/derive/src/lib.rs b/crates/derive/src/lib.rs index cb6b6b29f0..17001815ee 100644 --- a/crates/derive/src/lib.rs +++ b/crates/derive/src/lib.rs @@ -29,11 +29,6 @@ pub mod stages; pub mod traits; pub mod types; -#[cfg(feature = "online")] -mod stack; -#[cfg(feature = "online")] -pub use stack::*; - #[cfg(feature = "online")] mod online; #[cfg(feature = "online")] diff --git a/crates/derive/src/online/mod.rs b/crates/derive/src/online/mod.rs index 996052e090..5cf7a93c86 100644 --- a/crates/derive/src/online/mod.rs +++ b/crates/derive/src/online/mod.rs @@ -3,11 +3,42 @@ /// Prelude for online providers. pub(crate) mod prelude { pub use super::{ - AlloyChainProvider, AlloyL2ChainProvider, BeaconClient, OnlineBeaconClient, - OnlineBlobProvider, SimpleSlotDerivation, + new_online_stack, AlloyChainProvider, AlloyL2ChainProvider, BeaconClient, + OnlineBeaconClient, OnlineBlobProvider, SimpleSlotDerivation, }; } +use crate::{ + stages::{ + AttributesBuilder, AttributesQueue, BatchQueue, ChannelBank, ChannelReader, FrameQueue, + L1Retrieval, L1Traversal, NextAttributes, + }, + traits::{ + ChainProvider, DataAvailabilityProvider, L2ChainProvider, OriginProvider, ResettableStage, + }, + types::RollupConfig, +}; +use alloc::sync::Arc; +use core::fmt::Debug; + +/// Creates a new [OnlineStageStack]. +#[cfg(feature = "online")] +pub fn new_online_stack( + rollup_config: Arc, + chain_provider: impl ChainProvider + Clone + Debug + Send, + dap_source: impl DataAvailabilityProvider + OriginProvider + Clone + Debug + Send, + fetcher: impl L2ChainProvider + Clone + Debug + Send, + builder: impl AttributesBuilder + Clone + Debug + Send, +) -> impl NextAttributes + ResettableStage + Debug + Send { + let l1_traversal = L1Traversal::new(chain_provider, rollup_config.clone()); + let l1_retrieval = L1Retrieval::new(l1_traversal, dap_source); + let frame_queue = FrameQueue::new(l1_retrieval); + let channel_bank = ChannelBank::new(rollup_config.clone(), frame_queue); + let channel_reader = ChannelReader::new(channel_bank, rollup_config.clone()); + let batch_queue = BatchQueue::new(rollup_config.clone(), channel_reader, fetcher); + AttributesQueue::new(*rollup_config, batch_queue, builder) +} + #[cfg(test)] #[allow(unreachable_pub)] pub mod test_utils; diff --git a/crates/derive/src/stack.rs b/crates/derive/src/stack.rs deleted file mode 100644 index 3e15614e13..0000000000 --- a/crates/derive/src/stack.rs +++ /dev/null @@ -1,128 +0,0 @@ -//! Contains a stack of Stages for the [crate::DerivationPipeline]. - -use crate::{ - stages::{ - AttributesBuilder, AttributesQueue, BatchQueue, ChannelBank, ChannelReader, FrameQueue, - L1Retrieval, L1Traversal, NextAttributes, - }, - traits::{ - ChainProvider, DataAvailabilityProvider, L2ChainProvider, OriginProvider, ResettableStage, - }, - types::{ - BlockInfo, L2AttributesWithParent, L2BlockInfo, RollupConfig, StageError, StageResult, - SystemConfig, - }, -}; -use alloc::{boxed::Box, sync::Arc}; -use async_trait::async_trait; -use core::fmt::Debug; -use spin::Mutex; - -/// The [AttributesQueue] type alias. -pub type AttributesQueueType = AttributesQueue< - BatchQueue>>>>, F>, - B, ->; - -/// An online stack of stages. -#[derive(Debug)] -pub struct OnlineStageStack -where - P: ChainProvider + Clone + Debug + Send, - DAP: DataAvailabilityProvider + OriginProvider + Clone + Debug + Send, - F: L2ChainProvider + Clone + Debug + Send, - B: AttributesBuilder + Clone + Debug + Send, -{ - /// Flag to tell the L1Traversal stage to advance to the next L1 block. - pub advance: Arc>, - /// The [AttributesQueue] stage. - pub attributes: AttributesQueueType, -} - -impl OnlineStageStack -where - P: ChainProvider + Clone + Debug + Send, - DAP: DataAvailabilityProvider + OriginProvider + Clone + Debug + Send, - F: L2ChainProvider + Clone + Debug + Send, - B: AttributesBuilder + Clone + Debug + Send, -{ - /// Creates a new [OnlineStageStack]. - pub fn new( - rollup_config: Arc, - chain_provider: P, - dap_source: DAP, - fetcher: F, - builder: B, - ) -> Self { - let advance = Arc::new(Mutex::new(false)); - let l1_traversal = - L1Traversal::new(chain_provider, Arc::clone(&advance), rollup_config.clone()); - let l1_retrieval = L1Retrieval::new(l1_traversal, dap_source); - let frame_queue = FrameQueue::new(l1_retrieval); - let channel_bank = ChannelBank::new(rollup_config.clone(), frame_queue); - let channel_reader = ChannelReader::new(channel_bank, rollup_config.clone()); - let batch_queue = BatchQueue::new(rollup_config.clone(), channel_reader, fetcher); - let attributes = AttributesQueue::new(*rollup_config, batch_queue, builder); - Self { advance, attributes } - } -} - -#[async_trait] -impl NextAttributes for OnlineStageStack -where - P: ChainProvider + Clone + Debug + Send, - DAP: DataAvailabilityProvider + OriginProvider + Clone + Debug + Send, - F: L2ChainProvider + Clone + Debug + Send, - B: AttributesBuilder + Clone + Debug + Send, -{ - /// Advances the pipeline to the next attributes. - async fn next_attributes( - &mut self, - parent: L2BlockInfo, - ) -> StageResult { - match self.attributes.next_attributes(parent).await { - Ok(a) => { - tracing::info!("attributes queue stage step returned l2 attributes"); - tracing::info!("prepared L2 attributes: {:?}", a); - return Ok(a); - } - Err(StageError::Eof) => { - tracing::info!("attributes queue stage complete"); - let mut advance = self.advance.lock(); - *advance = true; - return Err(StageError::Eof); - } - // TODO: match on the EngineELSyncing error here and log - Err(err) => { - tracing::error!("attributes queue stage failed: {:?}", err); - return Err(err); - } - } - } -} - -#[async_trait] -impl ResettableStage for OnlineStageStack -where - P: ChainProvider + Clone + Debug + Send, - DAP: DataAvailabilityProvider + OriginProvider + Clone + Debug + Send, - F: L2ChainProvider + Clone + Debug + Send, - B: AttributesBuilder + Clone + Debug + Send, -{ - /// Resets all stages in the stack. - async fn reset(&mut self, bi: BlockInfo, sc: &SystemConfig) -> StageResult<()> { - match self.attributes.reset(bi, sc).await { - Ok(()) => { - tracing::info!("Stages reset"); - } - Err(StageError::Eof) => { - tracing::info!("Stages reset with EOF"); - } - Err(err) => { - tracing::error!("Stages reset failed: {:?}", err); - return Err(err); - } - } - Ok(()) - } -} diff --git a/crates/derive/src/stages/attributes_queue.rs b/crates/derive/src/stages/attributes_queue.rs index d0b2823f78..c3f4e87b45 100644 --- a/crates/derive/src/stages/attributes_queue.rs +++ b/crates/derive/src/stages/attributes_queue.rs @@ -1,7 +1,7 @@ //! Contains the logic for the `AttributesQueue` stage. use crate::{ - traits::{OriginProvider, PreviousStage, ResettableStage}, + traits::{OriginAdvancer, OriginProvider, PreviousStage, ResettableStage}, types::{ BlockInfo, L2AttributesWithParent, L2BlockInfo, L2PayloadAttributes, ResetError, RollupConfig, SingleBatch, StageError, StageResult, SystemConfig, @@ -53,7 +53,7 @@ pub trait NextAttributes { #[derive(Debug)] pub struct AttributesQueue where - P: AttributesProvider + ResettableStage + PreviousStage + OriginProvider + Debug, + P: AttributesProvider + PreviousStage + Debug, AB: AttributesBuilder + Debug, { /// The rollup config. @@ -70,7 +70,7 @@ where impl AttributesQueue where - P: AttributesProvider + PreviousStage + ResettableStage + OriginProvider + Debug, + P: AttributesProvider + PreviousStage + Debug, AB: AttributesBuilder + Debug, { /// Create a new [AttributesQueue] stage. @@ -149,20 +149,29 @@ where impl PreviousStage for AttributesQueue where - P: AttributesProvider + PreviousStage + ResettableStage + OriginProvider + Debug, - AB: AttributesBuilder + Debug, + P: AttributesProvider + PreviousStage + Send + Debug, + AB: AttributesBuilder + Send + Debug, { - type Previous = P; + fn previous(&self) -> Option> { + Some(Box::new(&self.prev)) + } +} - fn previous(&self) -> Option<&P> { - Some(&self.prev) +#[async_trait] +impl OriginAdvancer for AttributesQueue +where + P: AttributesProvider + PreviousStage + Debug + Send, + AB: AttributesBuilder + Debug + Send, +{ + async fn advance_origin(&mut self) -> StageResult<()> { + self.prev.advance_origin().await } } #[async_trait] impl NextAttributes for AttributesQueue where - P: AttributesProvider + PreviousStage + ResettableStage + OriginProvider + Debug + Send, + P: AttributesProvider + PreviousStage + Debug + Send, AB: AttributesBuilder + Debug + Send, { async fn next_attributes( @@ -175,7 +184,7 @@ where impl OriginProvider for AttributesQueue where - P: AttributesProvider + PreviousStage + ResettableStage + OriginProvider + Debug, + P: AttributesProvider + PreviousStage + Debug, AB: AttributesBuilder + Debug, { fn origin(&self) -> Option<&BlockInfo> { @@ -186,7 +195,7 @@ where #[async_trait] impl ResettableStage for AttributesQueue where - P: AttributesProvider + PreviousStage + ResettableStage + OriginProvider + Send + Debug, + P: AttributesProvider + PreviousStage + Send + Debug, AB: AttributesBuilder + Send + Debug, { async fn reset( diff --git a/crates/derive/src/stages/batch_queue.rs b/crates/derive/src/stages/batch_queue.rs index 7d66c5ad19..3649dd1843 100644 --- a/crates/derive/src/stages/batch_queue.rs +++ b/crates/derive/src/stages/batch_queue.rs @@ -2,7 +2,7 @@ use crate::{ stages::attributes_queue::AttributesProvider, - traits::{L2ChainProvider, OriginProvider, PreviousStage, ResettableStage}, + traits::{L2ChainProvider, OriginAdvancer, OriginProvider, PreviousStage, ResettableStage}, types::{ Batch, BatchValidity, BatchWithInclusionBlock, BlockInfo, L2BlockInfo, RollupConfig, SingleBatch, StageError, StageResult, SystemConfig, @@ -43,7 +43,7 @@ pub trait BatchQueueProvider { #[derive(Debug)] pub struct BatchQueue where - P: BatchQueueProvider + PreviousStage + ResettableStage + OriginProvider + Debug, + P: BatchQueueProvider + PreviousStage + Debug, BF: L2ChainProvider + Debug, { /// The rollup config. @@ -75,7 +75,7 @@ where impl BatchQueue where - P: BatchQueueProvider + PreviousStage + ResettableStage + OriginProvider + Debug, + P: BatchQueueProvider + PreviousStage + Debug, BF: L2ChainProvider + Debug, { /// Creates a new [BatchQueue] stage. @@ -242,10 +242,21 @@ where } } +#[async_trait] +impl OriginAdvancer for BatchQueue +where + P: BatchQueueProvider + PreviousStage + Send + Debug, + BF: L2ChainProvider + Send + Debug, +{ + async fn advance_origin(&mut self) -> StageResult<()> { + self.prev.advance_origin().await + } +} + #[async_trait] impl AttributesProvider for BatchQueue where - P: BatchQueueProvider + PreviousStage + ResettableStage + OriginProvider + Send + Debug, + P: BatchQueueProvider + PreviousStage + Send + Debug, BF: L2ChainProvider + Send + Debug, { /// Returns the next valid batch upon the given safe head. @@ -374,7 +385,7 @@ where impl OriginProvider for BatchQueue where - P: BatchQueueProvider + PreviousStage + ResettableStage + OriginProvider + Debug, + P: BatchQueueProvider + PreviousStage + Debug, BF: L2ChainProvider + Debug, { fn origin(&self) -> Option<&BlockInfo> { @@ -384,20 +395,18 @@ where impl PreviousStage for BatchQueue where - P: BatchQueueProvider + PreviousStage + ResettableStage + OriginProvider + Debug, - BF: L2ChainProvider + Debug, + P: BatchQueueProvider + PreviousStage + Send + Debug, + BF: L2ChainProvider + Send + Debug, { - type Previous = P; - - fn previous(&self) -> Option<&Self::Previous> { - Some(&self.prev) + fn previous(&self) -> Option> { + Some(Box::new(&self.prev)) } } #[async_trait] impl ResettableStage for BatchQueue where - P: BatchQueueProvider + PreviousStage + ResettableStage + OriginProvider + Send + Debug, + P: BatchQueueProvider + PreviousStage + Send + Debug, BF: L2ChainProvider + Send + Debug, { async fn reset(&mut self, base: BlockInfo, system_config: &SystemConfig) -> StageResult<()> { diff --git a/crates/derive/src/stages/channel_bank.rs b/crates/derive/src/stages/channel_bank.rs index 04378b919b..078341fd4e 100644 --- a/crates/derive/src/stages/channel_bank.rs +++ b/crates/derive/src/stages/channel_bank.rs @@ -3,7 +3,7 @@ use crate::{ params::{ChannelID, MAX_CHANNEL_BANK_SIZE}, stages::ChannelReaderProvider, - traits::{OriginProvider, PreviousStage, ResettableStage}, + traits::{OriginAdvancer, OriginProvider, PreviousStage, ResettableStage}, types::{BlockInfo, Channel, Frame, RollupConfig, StageError, StageResult, SystemConfig}, }; use alloc::{boxed::Box, collections::VecDeque, sync::Arc}; @@ -37,7 +37,7 @@ pub trait ChannelBankProvider { #[derive(Debug)] pub struct ChannelBank

where - P: ChannelBankProvider + PreviousStage + ResettableStage + OriginProvider + Debug, + P: ChannelBankProvider + PreviousStage + Debug, { /// The rollup configuration. cfg: Arc, @@ -51,7 +51,7 @@ where impl

ChannelBank

where - P: ChannelBankProvider + PreviousStage + ResettableStage + OriginProvider + Debug, + P: ChannelBankProvider + PreviousStage + Debug, { /// Create a new [ChannelBank] stage. pub fn new(cfg: Arc, prev: P) -> Self { @@ -163,10 +163,20 @@ where } } +#[async_trait] +impl

OriginAdvancer for ChannelBank

+where + P: ChannelBankProvider + PreviousStage + Send + Debug, +{ + async fn advance_origin(&mut self) -> StageResult<()> { + self.prev.advance_origin().await + } +} + #[async_trait] impl

ChannelReaderProvider for ChannelBank

where - P: ChannelBankProvider + PreviousStage + ResettableStage + OriginProvider + Send + Debug, + P: ChannelBankProvider + PreviousStage + Send + Debug, { async fn next_data(&mut self) -> StageResult> { match self.read() { @@ -188,7 +198,7 @@ where impl

OriginProvider for ChannelBank

where - P: ChannelBankProvider + PreviousStage + ResettableStage + OriginProvider + Debug, + P: ChannelBankProvider + PreviousStage + Debug, { fn origin(&self) -> Option<&BlockInfo> { self.prev.origin() @@ -197,19 +207,17 @@ where impl

PreviousStage for ChannelBank

where - P: ChannelBankProvider + PreviousStage + ResettableStage + OriginProvider + Debug, + P: ChannelBankProvider + PreviousStage + Debug + Send, { - type Previous = P; - - fn previous(&self) -> Option<&Self::Previous> { - Some(&self.prev) + fn previous(&self) -> Option> { + Some(Box::new(&self.prev)) } } #[async_trait] impl

ResettableStage for ChannelBank

where - P: ChannelBankProvider + PreviousStage + ResettableStage + OriginProvider + Send + Debug, + P: ChannelBankProvider + PreviousStage + Send + Debug, { async fn reset( &mut self, diff --git a/crates/derive/src/stages/channel_reader.rs b/crates/derive/src/stages/channel_reader.rs index 6efcae9e98..e24bd5c798 100644 --- a/crates/derive/src/stages/channel_reader.rs +++ b/crates/derive/src/stages/channel_reader.rs @@ -2,7 +2,7 @@ use crate::{ stages::BatchQueueProvider, - traits::{OriginProvider, PreviousStage, ResettableStage}, + traits::{OriginAdvancer, OriginProvider, PreviousStage, ResettableStage}, types::{Batch, BlockInfo, RollupConfig, StageError, StageResult, SystemConfig}, }; @@ -27,7 +27,7 @@ pub trait ChannelReaderProvider { #[derive(Debug)] pub struct ChannelReader

where - P: ChannelReaderProvider + PreviousStage + ResettableStage + OriginProvider + Debug, + P: ChannelReaderProvider + PreviousStage + Debug, { /// The previous stage of the derivation pipeline. prev: P, @@ -39,7 +39,7 @@ where impl

ChannelReader

where - P: ChannelReaderProvider + PreviousStage + ResettableStage + OriginProvider + Debug, + P: ChannelReaderProvider + PreviousStage + Debug, { /// Create a new [ChannelReader] stage. pub fn new(prev: P, cfg: Arc) -> Self { @@ -62,10 +62,20 @@ where } } +#[async_trait] +impl

OriginAdvancer for ChannelReader

+where + P: ChannelReaderProvider + PreviousStage + Send + Debug, +{ + async fn advance_origin(&mut self) -> StageResult<()> { + self.prev.advance_origin().await + } +} + #[async_trait] impl

BatchQueueProvider for ChannelReader

where - P: ChannelReaderProvider + PreviousStage + ResettableStage + OriginProvider + Send + Debug, + P: ChannelReaderProvider + PreviousStage + Send + Debug, { async fn next_batch(&mut self) -> StageResult { if let Err(e) = self.set_batch_reader().await { @@ -91,7 +101,7 @@ where impl

OriginProvider for ChannelReader

where - P: ChannelReaderProvider + PreviousStage + ResettableStage + OriginProvider + Debug, + P: ChannelReaderProvider + PreviousStage + Debug, { fn origin(&self) -> Option<&BlockInfo> { self.prev.origin() @@ -101,7 +111,7 @@ where #[async_trait] impl

ResettableStage for ChannelReader

where - P: ChannelReaderProvider + PreviousStage + ResettableStage + OriginProvider + Debug + Send, + P: ChannelReaderProvider + PreviousStage + Debug + Send, { async fn reset(&mut self, base: BlockInfo, cfg: &SystemConfig) -> StageResult<()> { self.prev.reset(base, cfg).await?; @@ -112,12 +122,10 @@ where impl

PreviousStage for ChannelReader

where - P: ChannelReaderProvider + PreviousStage + ResettableStage + OriginProvider + Debug, + P: ChannelReaderProvider + PreviousStage + Send + Debug, { - type Previous = P; - - fn previous(&self) -> Option<&Self::Previous> { - Some(&self.prev) + fn previous(&self) -> Option> { + Some(Box::new(&self.prev)) } } diff --git a/crates/derive/src/stages/frame_queue.rs b/crates/derive/src/stages/frame_queue.rs index 4d1f42eaa3..07586ffcb6 100644 --- a/crates/derive/src/stages/frame_queue.rs +++ b/crates/derive/src/stages/frame_queue.rs @@ -2,7 +2,7 @@ use crate::{ stages::ChannelBankProvider, - traits::{OriginProvider, PreviousStage, ResettableStage}, + traits::{OriginAdvancer, OriginProvider, PreviousStage, ResettableStage}, types::{into_frames, BlockInfo, Frame, StageError, StageResult, SystemConfig}, }; use alloc::{boxed::Box, collections::VecDeque}; @@ -31,7 +31,7 @@ pub trait FrameQueueProvider { #[derive(Debug)] pub struct FrameQueue

where - P: FrameQueueProvider + PreviousStage + ResettableStage + OriginProvider + Debug, + P: FrameQueueProvider + PreviousStage + Debug, { /// The previous stage in the pipeline. pub prev: P, @@ -41,7 +41,7 @@ where impl

FrameQueue

where - P: FrameQueueProvider + PreviousStage + ResettableStage + OriginProvider + Debug, + P: FrameQueueProvider + PreviousStage + Debug, { /// Create a new [FrameQueue] stage with the given previous [L1Retrieval] stage. /// @@ -51,10 +51,20 @@ where } } +#[async_trait] +impl

OriginAdvancer for FrameQueue

+where + P: FrameQueueProvider + PreviousStage + Send + Debug, +{ + async fn advance_origin(&mut self) -> StageResult<()> { + self.prev.advance_origin().await + } +} + #[async_trait] impl

ChannelBankProvider for FrameQueue

where - P: FrameQueueProvider + PreviousStage + ResettableStage + OriginProvider + Send + Debug, + P: FrameQueueProvider + PreviousStage + Send + Debug, { async fn next_frame(&mut self) -> StageResult { if self.queue.is_empty() { @@ -87,7 +97,7 @@ where impl

OriginProvider for FrameQueue

where - P: FrameQueueProvider + PreviousStage + ResettableStage + OriginProvider + Debug, + P: FrameQueueProvider + PreviousStage + Debug, { fn origin(&self) -> Option<&BlockInfo> { self.prev.origin() @@ -96,19 +106,17 @@ where impl

PreviousStage for FrameQueue

where - P: FrameQueueProvider + PreviousStage + ResettableStage + OriginProvider + Debug, + P: FrameQueueProvider + PreviousStage + Send + Debug, { - type Previous = P; - - fn previous(&self) -> Option<&Self::Previous> { - Some(&self.prev) + fn previous(&self) -> Option> { + Some(Box::new(&self.prev)) } } #[async_trait] impl

ResettableStage for FrameQueue

where - P: FrameQueueProvider + PreviousStage + ResettableStage + OriginProvider + Send + Debug, + P: FrameQueueProvider + PreviousStage + Send + Debug, { async fn reset( &mut self, diff --git a/crates/derive/src/stages/l1_retrieval.rs b/crates/derive/src/stages/l1_retrieval.rs index 74ab4684da..801302e442 100644 --- a/crates/derive/src/stages/l1_retrieval.rs +++ b/crates/derive/src/stages/l1_retrieval.rs @@ -3,7 +3,8 @@ use crate::{ stages::FrameQueueProvider, traits::{ - AsyncIterator, DataAvailabilityProvider, OriginProvider, PreviousStage, ResettableStage, + AsyncIterator, DataAvailabilityProvider, OriginAdvancer, OriginProvider, PreviousStage, + ResettableStage, }, types::{BlockInfo, StageError, StageResult, SystemConfig}, }; @@ -39,7 +40,7 @@ pub trait L1RetrievalProvider { pub struct L1Retrieval where DAP: DataAvailabilityProvider, - P: L1RetrievalProvider + PreviousStage + ResettableStage + OriginProvider, + P: L1RetrievalProvider + PreviousStage, { /// The previous stage in the pipeline. pub prev: P, @@ -52,7 +53,7 @@ where impl L1Retrieval where DAP: DataAvailabilityProvider, - P: L1RetrievalProvider + PreviousStage + ResettableStage + OriginProvider, + P: L1RetrievalProvider + PreviousStage, { /// Creates a new [L1Retrieval] stage with the previous [L1Traversal] stage and given /// [DataAvailabilityProvider]. @@ -63,11 +64,22 @@ where } } +#[async_trait] +impl OriginAdvancer for L1Retrieval +where + DAP: DataAvailabilityProvider + Send, + P: L1RetrievalProvider + PreviousStage + Send, +{ + async fn advance_origin(&mut self) -> StageResult<()> { + self.prev.advance_origin().await + } +} + #[async_trait] impl FrameQueueProvider for L1Retrieval where DAP: DataAvailabilityProvider + Send, - P: L1RetrievalProvider + PreviousStage + ResettableStage + OriginProvider + Send, + P: L1RetrievalProvider + PreviousStage + Send, { type Item = DAP::Item; @@ -96,7 +108,7 @@ where impl OriginProvider for L1Retrieval where DAP: DataAvailabilityProvider, - P: L1RetrievalProvider + PreviousStage + ResettableStage + OriginProvider, + P: L1RetrievalProvider + PreviousStage, { fn origin(&self) -> Option<&BlockInfo> { self.prev.origin() @@ -105,13 +117,11 @@ where impl PreviousStage for L1Retrieval where - DAP: DataAvailabilityProvider, - P: L1RetrievalProvider + PreviousStage + ResettableStage + OriginProvider, + DAP: DataAvailabilityProvider + Send, + P: L1RetrievalProvider + PreviousStage + Send, { - type Previous = P; - - fn previous(&self) -> Option<&Self::Previous> { - Some(&self.prev) + fn previous(&self) -> Option> { + Some(Box::new(&self.prev)) } } @@ -119,7 +129,7 @@ where impl ResettableStage for L1Retrieval where DAP: DataAvailabilityProvider + Send, - P: L1RetrievalProvider + PreviousStage + ResettableStage + OriginProvider + Send, + P: L1RetrievalProvider + PreviousStage + Send, { async fn reset(&mut self, base: BlockInfo, cfg: &SystemConfig) -> StageResult<()> { self.prev.reset(base, cfg).await?; diff --git a/crates/derive/src/stages/l1_traversal.rs b/crates/derive/src/stages/l1_traversal.rs index 80de560678..65c9751084 100644 --- a/crates/derive/src/stages/l1_traversal.rs +++ b/crates/derive/src/stages/l1_traversal.rs @@ -2,22 +2,14 @@ use crate::{ stages::L1RetrievalProvider, - traits::{ChainProvider, OriginProvider, PreviousStage, ResettableStage}, + traits::{ChainProvider, OriginAdvancer, OriginProvider, PreviousStage, ResettableStage}, types::{BlockInfo, RollupConfig, StageError, StageResult, SystemConfig}, }; use alloc::{boxed::Box, sync::Arc}; use alloy_primitives::Address; use async_trait::async_trait; -use spin::Mutex; use tracing::warn; -/// Defines a trait for advancing the L1 block in the [L1Traversal] stage. -#[async_trait] -pub trait L1BlockAdvance { - /// Advances the internal state of the [L1Traversal] stage to the next L1 block. - async fn advance_l1_block(&mut self) -> StageResult<()>; -} - /// The [L1Traversal] stage of the derivation pipeline. /// /// This stage sits at the bottom of the pipeline, holding a handle to the data source @@ -33,8 +25,6 @@ pub struct L1Traversal { data_source: Provider, /// Signals whether or not the traversal stage is complete. done: bool, - /// Marks if the L1Traversal stage should attempt to advance to the next block. - pub advance: Arc>, /// The system config. pub system_config: SystemConfig, /// A reference to the rollup config. @@ -48,16 +38,6 @@ impl L1RetrievalProvider for L1Traversal { } async fn next_l1_block(&mut self) -> StageResult> { - // let advance = match { - // Ok(advance) => advance, - // Err(_) => return Err(StageError::Custom(anyhow::anyhow!("Failed to lock advance - // mutex"))), }; - let mut advance = self.advance.lock(); - if *advance { - *advance = false; - drop(advance); - self.advance_l1_block().await?; - } if !self.done { self.done = true; Ok(self.block) @@ -69,12 +49,11 @@ impl L1RetrievalProvider for L1Traversal { impl L1Traversal { /// Creates a new [L1Traversal] instance. - pub fn new(data_source: F, advance: Arc>, cfg: Arc) -> Self { + pub fn new(data_source: F, cfg: Arc) -> Self { Self { block: Some(BlockInfo::default()), data_source, done: false, - advance, system_config: SystemConfig::default(), rollup_config: cfg, } @@ -87,11 +66,11 @@ impl L1Traversal { } #[async_trait] -impl L1BlockAdvance for L1Traversal { +impl OriginAdvancer for L1Traversal { /// Advances the internal state of the [L1Traversal] stage to the next L1 block. /// This function fetches the next L1 [BlockInfo] from the data source and updates the /// [SystemConfig] with the receipts from the block. - async fn advance_l1_block(&mut self) -> StageResult<()> { + async fn advance_origin(&mut self) -> StageResult<()> { // Pull the next block or return EOF. // StageError::EOF has special handling further up the pipeline. let block = match self.block { @@ -138,9 +117,7 @@ impl OriginProvider for L1Traversal { } impl PreviousStage for L1Traversal { - type Previous = L1Traversal; - - fn previous(&self) -> Option<&Self::Previous> { + fn previous(&self) -> Option> { None } } @@ -212,7 +189,7 @@ pub(crate) mod tests { let hash = blocks.get(i).map(|b| b.hash).unwrap_or_default(); provider.insert_receipts(hash, vec![receipt.clone()]); } - L1Traversal::new(provider, Arc::new(Mutex::new(false)), Arc::new(rollup_config)) + L1Traversal::new(provider, Arc::new(rollup_config)) } pub(crate) fn new_populated_test_traversal() -> L1Traversal { @@ -228,7 +205,7 @@ pub(crate) mod tests { let mut traversal = new_test_traversal(blocks, receipts); assert_eq!(traversal.next_l1_block().await.unwrap(), Some(BlockInfo::default())); assert_eq!(traversal.next_l1_block().await.unwrap_err(), StageError::Eof); - assert!(traversal.advance_l1_block().await.is_ok()); + assert!(traversal.advance_origin().await.is_ok()); } #[tokio::test] @@ -237,7 +214,7 @@ pub(crate) mod tests { let mut traversal = new_test_traversal(blocks, vec![]); assert_eq!(traversal.next_l1_block().await.unwrap(), Some(BlockInfo::default())); assert_eq!(traversal.next_l1_block().await.unwrap_err(), StageError::Eof); - matches!(traversal.advance_l1_block().await.unwrap_err(), StageError::ReceiptFetch(_)); + matches!(traversal.advance_origin().await.unwrap_err(), StageError::ReceiptFetch(_)); } #[tokio::test] @@ -247,8 +224,8 @@ pub(crate) mod tests { let blocks = vec![block, block]; let receipts = new_receipts(); let mut traversal = new_test_traversal(blocks, receipts); - assert!(traversal.advance_l1_block().await.is_ok()); - let err = traversal.advance_l1_block().await.unwrap_err(); + assert!(traversal.advance_origin().await.is_ok()); + let err = traversal.advance_origin().await.unwrap_err(); assert_eq!(err, StageError::ReorgDetected(block.hash, block.parent_hash)); } @@ -257,7 +234,7 @@ pub(crate) mod tests { let mut traversal = new_test_traversal(vec![], vec![]); assert_eq!(traversal.next_l1_block().await.unwrap(), Some(BlockInfo::default())); assert_eq!(traversal.next_l1_block().await.unwrap_err(), StageError::Eof); - matches!(traversal.advance_l1_block().await.unwrap_err(), StageError::BlockInfoFetch(_)); + matches!(traversal.advance_origin().await.unwrap_err(), StageError::BlockInfoFetch(_)); } #[tokio::test] @@ -269,10 +246,10 @@ pub(crate) mod tests { let blocks = vec![block1, block2]; let receipts = new_receipts(); let mut traversal = new_test_traversal(blocks, receipts); - assert!(traversal.advance_l1_block().await.is_ok()); + assert!(traversal.advance_origin().await.is_ok()); // Only the second block should fail since the second receipt // contains invalid logs that will error for a system config update. - let err = traversal.advance_l1_block().await.unwrap_err(); + let err = traversal.advance_origin().await.unwrap_err(); matches!(err, StageError::SystemConfigUpdate(_)); } @@ -283,7 +260,7 @@ pub(crate) mod tests { let mut traversal = new_test_traversal(blocks, receipts); assert_eq!(traversal.next_l1_block().await.unwrap(), Some(BlockInfo::default())); assert_eq!(traversal.next_l1_block().await.unwrap_err(), StageError::Eof); - assert!(traversal.advance_l1_block().await.is_ok()); + assert!(traversal.advance_origin().await.is_ok()); let expected = address!("000000000000000000000000000000000000bEEF"); assert_eq!(traversal.system_config.batcher_addr, expected); } diff --git a/crates/derive/src/stages/mod.rs b/crates/derive/src/stages/mod.rs index 355793fa40..2034cf5d76 100644 --- a/crates/derive/src/stages/mod.rs +++ b/crates/derive/src/stages/mod.rs @@ -14,7 +14,7 @@ //! 8. (Omitted) Engine Queue mod l1_traversal; -pub use l1_traversal::{L1BlockAdvance, L1Traversal}; +pub use l1_traversal::L1Traversal; mod l1_retrieval; pub use l1_retrieval::{L1Retrieval, L1RetrievalProvider}; diff --git a/crates/derive/src/stages/test_utils/attributes_queue.rs b/crates/derive/src/stages/test_utils/attributes_queue.rs index 591caad2cc..1e689ecbff 100644 --- a/crates/derive/src/stages/test_utils/attributes_queue.rs +++ b/crates/derive/src/stages/test_utils/attributes_queue.rs @@ -2,7 +2,7 @@ use crate::{ stages::attributes_queue::{AttributesBuilder, AttributesProvider}, - traits::{OriginProvider, PreviousStage, ResettableStage}, + traits::{OriginAdvancer, OriginProvider, PreviousStage, ResettableStage}, types::{ BlockID, BlockInfo, BuilderError, L2BlockInfo, L2PayloadAttributes, SingleBatch, StageError, StageResult, SystemConfig, @@ -49,6 +49,13 @@ impl OriginProvider for MockAttributesProvider { } } +#[async_trait] +impl OriginAdvancer for MockAttributesProvider { + async fn advance_origin(&mut self) -> StageResult<()> { + Ok(()) + } +} + #[async_trait] impl ResettableStage for MockAttributesProvider { async fn reset(&mut self, _base: BlockInfo, _cfg: &SystemConfig) -> StageResult<()> { @@ -57,10 +64,8 @@ impl ResettableStage for MockAttributesProvider { } impl PreviousStage for MockAttributesProvider { - type Previous = MockAttributesProvider; - - fn previous(&self) -> Option<&Self::Previous> { - Some(self) + fn previous(&self) -> Option> { + Some(Box::new(self)) } } diff --git a/crates/derive/src/stages/test_utils/batch_queue.rs b/crates/derive/src/stages/test_utils/batch_queue.rs index abae762f7d..896add5ee5 100644 --- a/crates/derive/src/stages/test_utils/batch_queue.rs +++ b/crates/derive/src/stages/test_utils/batch_queue.rs @@ -2,7 +2,7 @@ use crate::{ stages::batch_queue::BatchQueueProvider, - traits::{OriginProvider, PreviousStage, ResettableStage}, + traits::{OriginAdvancer, OriginProvider, PreviousStage, ResettableStage}, types::{Batch, BlockInfo, StageError, StageResult, SystemConfig}, }; use alloc::{boxed::Box, vec::Vec}; @@ -37,6 +37,13 @@ impl BatchQueueProvider for MockBatchQueueProvider { } } +#[async_trait] +impl OriginAdvancer for MockBatchQueueProvider { + async fn advance_origin(&mut self) -> StageResult<()> { + Ok(()) + } +} + #[async_trait] impl ResettableStage for MockBatchQueueProvider { async fn reset(&mut self, _base: BlockInfo, _cfg: &SystemConfig) -> StageResult<()> { @@ -45,9 +52,7 @@ impl ResettableStage for MockBatchQueueProvider { } impl PreviousStage for MockBatchQueueProvider { - type Previous = MockBatchQueueProvider; - - fn previous(&self) -> Option<&Self::Previous> { - Some(self) + fn previous(&self) -> Option> { + Some(Box::new(self)) } } diff --git a/crates/derive/src/stages/test_utils/channel_bank.rs b/crates/derive/src/stages/test_utils/channel_bank.rs index 9e5400386b..9da6717abc 100644 --- a/crates/derive/src/stages/test_utils/channel_bank.rs +++ b/crates/derive/src/stages/test_utils/channel_bank.rs @@ -2,7 +2,7 @@ use crate::{ stages::ChannelBankProvider, - traits::{OriginProvider, PreviousStage, ResettableStage}, + traits::{OriginAdvancer, OriginProvider, PreviousStage, ResettableStage}, types::{BlockInfo, Frame, StageError, StageResult, SystemConfig}, }; use alloc::{boxed::Box, vec::Vec}; @@ -30,6 +30,13 @@ impl OriginProvider for MockChannelBankProvider { } } +#[async_trait] +impl OriginAdvancer for MockChannelBankProvider { + async fn advance_origin(&mut self) -> StageResult<()> { + Ok(()) + } +} + #[async_trait] impl ChannelBankProvider for MockChannelBankProvider { async fn next_frame(&mut self) -> StageResult { @@ -45,9 +52,7 @@ impl ResettableStage for MockChannelBankProvider { } impl PreviousStage for MockChannelBankProvider { - type Previous = MockChannelBankProvider; - - fn previous(&self) -> Option<&Self::Previous> { - Some(self) + fn previous(&self) -> Option> { + Some(Box::new(self)) } } diff --git a/crates/derive/src/stages/test_utils/channel_reader.rs b/crates/derive/src/stages/test_utils/channel_reader.rs index d1b531f34e..d43a4e8309 100644 --- a/crates/derive/src/stages/test_utils/channel_reader.rs +++ b/crates/derive/src/stages/test_utils/channel_reader.rs @@ -2,7 +2,7 @@ use crate::{ stages::ChannelReaderProvider, - traits::{OriginProvider, PreviousStage, ResettableStage}, + traits::{OriginAdvancer, OriginProvider, PreviousStage, ResettableStage}, types::{BlockInfo, StageError, StageResult, SystemConfig}, }; use alloc::{boxed::Box, vec::Vec}; @@ -31,6 +31,13 @@ impl OriginProvider for MockChannelReaderProvider { } } +#[async_trait] +impl OriginAdvancer for MockChannelReaderProvider { + async fn advance_origin(&mut self) -> StageResult<()> { + Ok(()) + } +} + #[async_trait] impl ChannelReaderProvider for MockChannelReaderProvider { async fn next_data(&mut self) -> StageResult> { @@ -46,9 +53,7 @@ impl ResettableStage for MockChannelReaderProvider { } impl PreviousStage for MockChannelReaderProvider { - type Previous = MockChannelReaderProvider; - - fn previous(&self) -> Option<&Self::Previous> { - Some(self) + fn previous(&self) -> Option> { + Some(Box::new(self)) } } diff --git a/crates/derive/src/stages/test_utils/frame_queue.rs b/crates/derive/src/stages/test_utils/frame_queue.rs index 491bf2a216..8349cfbc9e 100644 --- a/crates/derive/src/stages/test_utils/frame_queue.rs +++ b/crates/derive/src/stages/test_utils/frame_queue.rs @@ -2,7 +2,7 @@ use crate::{ stages::FrameQueueProvider, - traits::{OriginProvider, PreviousStage, ResettableStage}, + traits::{OriginAdvancer, OriginProvider, PreviousStage, ResettableStage}, types::{BlockInfo, StageError, StageResult, SystemConfig}, }; use alloc::{boxed::Box, vec::Vec}; @@ -29,6 +29,13 @@ impl OriginProvider for MockFrameQueueProvider { } } +#[async_trait] +impl OriginAdvancer for MockFrameQueueProvider { + async fn advance_origin(&mut self) -> StageResult<()> { + Ok(()) + } +} + #[async_trait] impl FrameQueueProvider for MockFrameQueueProvider { type Item = Bytes; @@ -46,9 +53,7 @@ impl ResettableStage for MockFrameQueueProvider { } impl PreviousStage for MockFrameQueueProvider { - type Previous = MockFrameQueueProvider; - - fn previous(&self) -> Option<&Self::Previous> { - Some(self) + fn previous(&self) -> Option> { + Some(Box::new(self)) } } diff --git a/crates/derive/src/traits/mod.rs b/crates/derive/src/traits/mod.rs index 8ee12b532d..f3ff4f5473 100644 --- a/crates/derive/src/traits/mod.rs +++ b/crates/derive/src/traits/mod.rs @@ -5,7 +5,7 @@ mod data_sources; pub use data_sources::*; mod stages; -pub use stages::{OriginProvider, PreviousStage, ResettableStage}; +pub use stages::{OriginAdvancer, OriginProvider, PreviousStage, ResettableStage}; mod ecrecover; pub use ecrecover::SignedRecoverable; diff --git a/crates/derive/src/traits/stages.rs b/crates/derive/src/traits/stages.rs index 18ecc6cc96..3776f9a19e 100644 --- a/crates/derive/src/traits/stages.rs +++ b/crates/derive/src/traits/stages.rs @@ -17,11 +17,16 @@ pub trait OriginProvider { fn origin(&self) -> Option<&BlockInfo>; } -/// Provides a method for accessing a previous stage. -pub trait PreviousStage { - /// The previous stage. - type Previous: ResettableStage + PreviousStage; +/// Defines a trait for advancing the L1 origin of the pipeline. +#[async_trait] +pub trait OriginAdvancer { + /// Advances the internal state of the lowest stage to the next l1 origin. + /// This method is the equivalent of the reference implementation `advance_l1_block`. + async fn advance_origin(&mut self) -> StageResult<()>; +} +/// Provides a method for accessing a previous stage. +pub trait PreviousStage: ResettableStage + OriginAdvancer + OriginProvider { /// Returns the previous stage. - fn previous(&self) -> Option<&Self::Previous>; + fn previous(&self) -> Option>; }