Skip to content
This repository was archived by the owner on Jan 16, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions crates/derive/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ anyhow.workspace = true
alloy-primitives = { version = "0.7.0", default-features = false, features = ["rlp"] }
alloy-rlp = { version = "0.3.4", default-features = false, features = ["derive"] }
alloy-sol-types = { version = "0.7.0", default-features = false }
alloy-consensus = { git = "https://github.com/clabby/alloy", branch = "cl/alloy-consensus-no-std", default-features = false }
alloy-eips = { git = "https://github.com/clabby/alloy", branch = "cl/alloy-consensus-no-std", default-features = false }
alloy-consensus = { git = "https://github.com/alloy-rs/alloy", rev = "e3f2f07", default-features = false }
alloy-eips = { git = "https://github.com/alloy-rs/alloy", rev = "e3f2f07", default-features = false }
async-trait = "0.1.77"
hashbrown = "0.14.3"
unsigned-varint = "0.8.0"
Expand Down
30 changes: 30 additions & 0 deletions crates/derive/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@

extern crate alloc;

use alloc::sync::Arc;
use core::fmt::Debug;
use traits::{ChainProvider, TelemetryProvider};
use types::RollupConfig;

mod params;
pub use params::{
ChannelID, CHANNEL_ID_LENGTH, CONFIG_UPDATE_EVENT_VERSION_0, CONFIG_UPDATE_TOPIC,
Expand All @@ -21,3 +26,28 @@ pub mod types;
/// The derivation pipeline is responsible for deriving L2 inputs from L1 data.
#[derive(Debug, Clone, Copy)]
pub struct DerivationPipeline;

impl DerivationPipeline {
/// Creates a new instance of the [DerivationPipeline].
pub fn new<P, T>(
_rollup_config: Arc<RollupConfig>,
_chain_provider: P,
_telemetry: Arc<T>,
) -> Self
where
P: ChainProvider + Clone + Debug + Send,
T: TelemetryProvider + Clone + Debug + Send + Sync,
{
// let l1_traversal = L1Traversal::new(chain_provider, rollup_config.clone(),
// telemetry.clone()); let l1_retrieval = L1Retrieval::new(l1_traversal, dap_source,
// telemetry.clone()); let frame_queue = FrameQueue::new(l1_retrieval,
// telemetry.clone()); let channel_bank = ChannelBank::new(rollup_config.clone(),
// frame_queue, telemetry.clone()); let channel_reader =
// ChannelReader::new(channel_bank, telemetry.clone()); let batch_queue =
// BatchQueue::new(rollup_config.clone(), channel_reader, telemetry.clone(), fetcher);
// let attributes_queue = AttributesQueue::new(rollup_config.clone(), batch_queue,
// telemetry.clone(), builder);

unimplemented!()
}
}
1 change: 1 addition & 0 deletions crates/derive/src/sources/blobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ where
(tx.to(), tx.input.clone(), Some(tx.blob_versioned_hashes.clone()))
}
},
_ => continue,
};
let TxKind::Call(to) = tx_kind else { continue };

Expand Down
2 changes: 1 addition & 1 deletion crates/derive/src/sources/factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ where
B: BlobProvider + Clone + Debug,
{
/// Creates a new factory.
pub fn new(provider: C, blobs: B, cfg: RollupConfig) -> Self {
pub fn new(provider: C, blobs: B, cfg: &RollupConfig) -> Self {
Self {
chain_provider: provider,
blob_provider: blobs,
Expand Down
2 changes: 1 addition & 1 deletion crates/derive/src/stages/attributes_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ where
T: TelemetryProvider + Send + Debug,
AB: AttributesBuilder + Send + Debug,
{
async fn reset(&mut self, _: BlockInfo, _: SystemConfig) -> StageResult<()> {
async fn reset(&mut self, _: BlockInfo, _: &SystemConfig) -> StageResult<()> {
self.telemetry.write(Bytes::from("resetting attributes queue"), LogLevel::Info);
// TODO: metrice the reset using telemetry
// telemetry can provide a method of logging and metricing
Expand Down
2 changes: 1 addition & 1 deletion crates/derive/src/stages/batch_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ where
BF: SafeBlockFetcher + Send + Debug,
T: TelemetryProvider + Send + Debug + Sync,
{
async fn reset(&mut self, base: BlockInfo, _: SystemConfig) -> StageResult<()> {
async fn reset(&mut self, base: BlockInfo, _: &SystemConfig) -> StageResult<()> {
// Copy over the Origin from the next stage.
// It is set in the engine queue (two stages away)
// such that the L2 Safe Head origin is the progress.
Expand Down
2 changes: 1 addition & 1 deletion crates/derive/src/stages/channel_bank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ where
P: ChannelBankProvider + OriginProvider + Send + Debug,
T: TelemetryProvider + Send + Sync + Debug,
{
async fn reset(&mut self, _: BlockInfo, _: SystemConfig) -> StageResult<()> {
async fn reset(&mut self, _: BlockInfo, _: &SystemConfig) -> StageResult<()> {
self.channels.clear();
self.channel_queue = VecDeque::with_capacity(10);
Err(StageError::Eof)
Expand Down
27 changes: 13 additions & 14 deletions crates/derive/src/stages/frame_queue.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
//! This module contains the [FrameQueue] stage of the derivation pipeline.

use core::fmt::Debug;

use crate::{
stages::ChannelBankProvider,
traits::{LogLevel, OriginProvider, ResettableStage, TelemetryProvider},
types::{into_frames, BlockInfo, Frame, StageError, StageResult, SystemConfig},
};
use alloc::{boxed::Box, collections::VecDeque};
use alloc::{boxed::Box, collections::VecDeque, sync::Arc};
use alloy_primitives::Bytes;
use anyhow::anyhow;
use async_trait::async_trait;
use core::fmt::Debug;

/// Provides data frames for the [FrameQueue] stage.
#[async_trait]
Expand All @@ -35,7 +34,7 @@ where
/// The previous stage in the pipeline.
pub prev: P,
/// Telemetry
pub telemetry: T,
pub telemetry: Arc<T>,
/// The current frame queue.
queue: VecDeque<Frame>,
}
Expand All @@ -46,7 +45,7 @@ where
T: TelemetryProvider + Debug,
{
/// Create a new [FrameQueue] stage with the given previous [L1Retrieval] stage.
pub fn new(prev: P, telemetry: T) -> Self {
pub fn new(prev: P, telemetry: Arc<T>) -> Self {
Self { prev, telemetry, queue: VecDeque::new() }
}
}
Expand All @@ -55,7 +54,7 @@ where
impl<P, T> ChannelBankProvider for FrameQueue<P, T>
where
P: FrameQueueProvider + OriginProvider + Send + Debug,
T: TelemetryProvider + Send + Debug,
T: TelemetryProvider + Send + Debug + Sync,
{
async fn next_frame(&mut self) -> StageResult<Frame> {
if self.queue.is_empty() {
Expand Down Expand Up @@ -107,9 +106,9 @@ where
impl<P, T> ResettableStage for FrameQueue<P, T>
where
P: FrameQueueProvider + OriginProvider + Send + Debug,
T: TelemetryProvider + Send + Debug,
T: TelemetryProvider + Send + Debug + Sync,
{
async fn reset(&mut self, _: BlockInfo, _: SystemConfig) -> StageResult<()> {
async fn reset(&mut self, _: BlockInfo, _: &SystemConfig) -> StageResult<()> {
self.queue = VecDeque::default();
Err(StageError::Eof)
}
Expand Down Expand Up @@ -147,7 +146,7 @@ pub(crate) mod tests {

#[tokio::test]
async fn test_frame_queue_empty_bytes() {
let telemetry = TestTelemetry::new();
let telemetry = Arc::new(TestTelemetry::new());
let data = vec![Ok(Bytes::from(vec![0x00]))];
let mock = MockFrameQueueProvider { data };
let mut frame_queue = FrameQueue::new(mock, telemetry);
Expand All @@ -157,7 +156,7 @@ pub(crate) mod tests {

#[tokio::test]
async fn test_frame_queue_no_frames_decoded() {
let telemetry = TestTelemetry::new();
let telemetry = Arc::new(TestTelemetry::new());
let data = vec![Err(StageError::Eof), Ok(Bytes::default())];
let mock = MockFrameQueueProvider { data };
let mut frame_queue = FrameQueue::new(mock, telemetry);
Expand All @@ -167,7 +166,7 @@ pub(crate) mod tests {

#[tokio::test]
async fn test_frame_queue_wrong_derivation_version() {
let telemetry = TestTelemetry::new();
let telemetry = Arc::new(TestTelemetry::new());
let data = vec![Ok(Bytes::from(vec![0x01]))];
let mock = MockFrameQueueProvider { data };
let mut frame_queue = FrameQueue::new(mock, telemetry);
Expand All @@ -177,7 +176,7 @@ pub(crate) mod tests {

#[tokio::test]
async fn test_frame_queue_frame_too_short() {
let telemetry = TestTelemetry::new();
let telemetry = Arc::new(TestTelemetry::new());
let data = vec![Ok(Bytes::from(vec![0x00, 0x01]))];
let mock = MockFrameQueueProvider { data };
let mut frame_queue = FrameQueue::new(mock, telemetry);
Expand All @@ -188,7 +187,7 @@ pub(crate) mod tests {
#[tokio::test]
async fn test_frame_queue_single_frame() {
let data = new_encoded_test_frames(1);
let telemetry = TestTelemetry::new();
let telemetry = Arc::new(TestTelemetry::new());
let mock = MockFrameQueueProvider { data: vec![Ok(data)] };
let mut frame_queue = FrameQueue::new(mock, telemetry);
let frame_decoded = frame_queue.next_frame().await.unwrap();
Expand All @@ -200,7 +199,7 @@ pub(crate) mod tests {

#[tokio::test]
async fn test_frame_queue_multiple_frames() {
let telemetry = TestTelemetry::new();
let telemetry = Arc::new(TestTelemetry::new());
let data = new_encoded_test_frames(3);
let mock = MockFrameQueueProvider { data: vec![Ok(data)] };
let mut frame_queue = FrameQueue::new(mock, telemetry);
Expand Down
20 changes: 10 additions & 10 deletions crates/derive/src/stages/l1_retrieval.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::{
},
types::{BlockInfo, StageError, StageResult, SystemConfig},
};
use alloc::boxed::Box;
use alloc::{boxed::Box, sync::Arc};
use alloy_primitives::Address;
use anyhow::anyhow;
use async_trait::async_trait;
Expand Down Expand Up @@ -40,7 +40,7 @@ where
/// The previous stage in the pipeline.
pub prev: P,
/// Telemetry provider for the L1 retrieval stage.
pub telemetry: T,
pub telemetry: Arc<T>,
/// The data availability provider to use for the L1 retrieval stage.
pub provider: DAP,
/// The current data iterator.
Expand All @@ -55,7 +55,7 @@ where
{
/// Creates a new [L1Retrieval] stage with the previous [L1Traversal]
/// stage and given [DataAvailabilityProvider].
pub fn new(prev: P, provider: DAP, telemetry: T) -> Self {
pub fn new(prev: P, provider: DAP, telemetry: Arc<T>) -> Self {
Self { prev, telemetry, provider, data: None }
}
}
Expand All @@ -65,7 +65,7 @@ impl<DAP, P, T> FrameQueueProvider for L1Retrieval<DAP, P, T>
where
DAP: DataAvailabilityProvider + Send,
P: L1RetrievalProvider + OriginProvider + Send,
T: TelemetryProvider + Send,
T: TelemetryProvider + Send + Sync,
{
type Item = DAP::Item;

Expand Down Expand Up @@ -106,9 +106,9 @@ impl<DAP, P, T> ResettableStage for L1Retrieval<DAP, P, T>
where
DAP: DataAvailabilityProvider + Send,
P: L1RetrievalProvider + OriginProvider + Send,
T: TelemetryProvider + Send,
T: TelemetryProvider + Send + Sync,
{
async fn reset(&mut self, base: BlockInfo, cfg: SystemConfig) -> StageResult<()> {
async fn reset(&mut self, base: BlockInfo, cfg: &SystemConfig) -> StageResult<()> {
self.data = Some(self.provider.open_data(&base, cfg.batcher_addr).await?);
Ok(())
}
Expand All @@ -129,7 +129,7 @@ mod tests {
let traversal = new_populated_test_traversal();
let dap = TestDAP { results: vec![] };
let telemetry = TestTelemetry::new();
let retrieval = L1Retrieval::new(traversal, dap, telemetry);
let retrieval = L1Retrieval::new(traversal, dap, Arc::new(telemetry));
let expected = BlockInfo::default();
assert_eq!(retrieval.origin(), Some(&expected));
}
Expand All @@ -140,7 +140,7 @@ mod tests {
let results = vec![Err(StageError::Eof), Ok(Bytes::default())];
let dap = TestDAP { results };
let telemetry = TestTelemetry::new();
let mut retrieval = L1Retrieval::new(traversal, dap, telemetry);
let mut retrieval = L1Retrieval::new(traversal, dap, Arc::new(telemetry));
assert_eq!(retrieval.data, None);
let data = retrieval.next_data().await.unwrap();
assert_eq!(data, Bytes::default());
Expand All @@ -164,7 +164,7 @@ mod tests {
// Create a new traversal with no blocks or receipts.
// This would bubble up an error if the prev stage
// (traversal) is called in the retrieval stage.
let telemetry = TestTelemetry::new();
let telemetry = Arc::new(TestTelemetry::new());
let traversal = new_test_traversal(vec![], vec![]);
let dap = TestDAP { results: vec![] };
let mut retrieval =
Expand All @@ -182,7 +182,7 @@ mod tests {
open_data_calls: vec![(BlockInfo::default(), Address::default())],
results: vec![Err(StageError::Eof)],
};
let telemetry = TestTelemetry::new();
let telemetry = Arc::new(TestTelemetry::new());
let traversal = new_populated_test_traversal();
let dap = TestDAP { results: vec![] };
let mut retrieval =
Expand Down
14 changes: 8 additions & 6 deletions crates/derive/src/stages/l1_traversal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub struct L1Traversal<Provider: ChainProvider, Telemetry: TelemetryProvider> {
/// The data source for the traversal stage.
data_source: Provider,
/// The telemetry provider for the traversal stage.
telemetry: Telemetry,
telemetry: Arc<Telemetry>,
/// Signals whether or not the traversal stage is complete.
done: bool,
/// The system config.
Expand All @@ -49,7 +49,7 @@ impl<F: ChainProvider, T: TelemetryProvider> L1RetrievalProvider for L1Traversal

impl<F: ChainProvider, T: TelemetryProvider> L1Traversal<F, T> {
/// Creates a new [L1Traversal] instance.
pub fn new(data_source: F, cfg: Arc<RollupConfig>, telemetry: T) -> Self {
pub fn new(data_source: F, cfg: Arc<RollupConfig>, telemetry: Arc<T>) -> Self {
Self {
block: Some(BlockInfo::default()),
data_source,
Expand Down Expand Up @@ -118,11 +118,13 @@ impl<F: ChainProvider, T: TelemetryProvider> OriginProvider for L1Traversal<F, T
}

#[async_trait]
impl<F: ChainProvider + Send, T: TelemetryProvider + Send> ResettableStage for L1Traversal<F, T> {
async fn reset(&mut self, base: BlockInfo, cfg: SystemConfig) -> StageResult<()> {
impl<F: ChainProvider + Send, T: TelemetryProvider + Send + Sync> ResettableStage
for L1Traversal<F, T>
{
async fn reset(&mut self, base: BlockInfo, cfg: &SystemConfig) -> StageResult<()> {
self.block = Some(base);
self.done = false;
self.system_config = cfg;
self.system_config = *cfg;
Err(StageError::Eof)
}
}
Expand Down Expand Up @@ -173,7 +175,7 @@ pub(crate) mod tests {
receipts: alloc::vec::Vec<Receipt>,
) -> L1Traversal<TestChainProvider, TestTelemetry> {
let mut provider = TestChainProvider::default();
let telemetry = TestTelemetry::default();
let telemetry = Arc::new(TestTelemetry::default());
let rollup_config = RollupConfig {
l1_system_config_address: L1_SYS_CONFIG_ADDR,
..RollupConfig::default()
Expand Down
1 change: 1 addition & 0 deletions crates/derive/src/traits/ecrecover.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ impl SignedRecoverable for TxEnvelope {
TxEnvelope::Eip4844(signed_tx) => {
recover_public_key(*signed_tx.signature(), &signed_tx.signature_hash())
}
_ => unreachable!("Impossible case"),
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion crates/derive/src/traits/stages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ use async_trait::async_trait;
#[async_trait]
pub trait ResettableStage {
/// Resets the derivation stage to its initial state.
async fn reset(&mut self, base: BlockInfo, cfg: SystemConfig) -> StageResult<()>;
async fn reset(&mut self, base: BlockInfo, cfg: &SystemConfig) -> StageResult<()>;
}
2 changes: 1 addition & 1 deletion crates/derive/src/types/batch/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ pub use validity::BatchValidity;

mod span_batch;
pub use span_batch::{
RawSpanBatch, SpanBatch, SpanBatchBits, SpanBatchBuilder, SpanBatchEip1559TransactionData,
RawSpanBatch, SpanBatch, SpanBatchBits, SpanBatchEip1559TransactionData,
SpanBatchEip2930TransactionData, SpanBatchElement, SpanBatchError,
SpanBatchLegacyTransactionData, SpanBatchPayload, SpanBatchPrefix, SpanBatchTransactionData,
SpanBatchTransactions, SpanDecodingError, MAX_SPAN_BATCH_SIZE,
Expand Down
Loading