Skip to content
This repository was archived by the owner on Jan 16, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,606 changes: 1,561 additions & 45 deletions Cargo.lock

Large diffs are not rendered by default.

14 changes: 13 additions & 1 deletion crates/derive/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,16 @@ async-trait = "0.1.77"
hashbrown = "0.14.3"
unsigned-varint = "0.8.0"
miniz_oxide = { version = "0.7.2" }
lru = "0.12.3"

# Optional
# `serde` feature dependencies
serde = { version = "1.0.197", default-features = false, features = ["derive"], optional = true }

# `online` feature dependencies
alloy-provider = { git = "https://github.com/alloy-rs/alloy", rev = "e3f2f07", optional = true}
alloy-transport-http = { git = "https://github.com/alloy-rs/alloy", rev = "e3f2f07", optional = true }
reqwest = { version = "0.12", default-features = false, optional = true }

[dev-dependencies]
tokio = { version = "1.36", features = ["full"] }
proptest = "1.4.0"
Expand All @@ -37,3 +43,9 @@ tracing-subscriber = "0.3.18"
default = ["serde", "k256"]
serde = ["dep:serde", "alloy-primitives/serde"]
k256 = ["alloy-primitives/k256", "alloy-consensus/k256"]
online = [
"dep:alloy-provider",
"dep:alloy-transport-http",
"dep:reqwest",
"alloy-consensus/serde"
]
195 changes: 195 additions & 0 deletions crates/derive/src/alloy_providers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
//! This module contains concrete implementations of the data provider traits, using an alloy
//! provider on the backend.

use crate::{
traits::{ChainProvider, L2ChainProvider},
types::{Block, BlockInfo, ExecutionPayloadEnvelope, L2BlockInfo, RollupConfig},
};
use alloc::{boxed::Box, sync::Arc, vec::Vec};
use alloy_consensus::{Header, Receipt, ReceiptWithBloom, TxEnvelope, TxType};
use alloy_primitives::{Bytes, B256, U64};
use alloy_provider::Provider;
use alloy_rlp::{Buf, Decodable};
use alloy_transport_http::Http;
use anyhow::{anyhow, Result};
use async_trait::async_trait;
use core::num::NonZeroUsize;
use lru::LruCache;

const CACHE_SIZE: usize = 16;

/// The [AlloyChainProvider] is a concrete implementation of the [ChainProvider] trait, providing
/// data over Ethereum JSON-RPC using an alloy provider as the backend.
///
/// **Note**:
/// This provider fetches data using the `debug_getRawHeader`, `debug_getRawReceipts`, and
/// `debug_getRawBlock` methods. The RPC must support this namespace.
#[derive(Debug)]
pub struct AlloyChainProvider<T: Provider<Http<reqwest::Client>>> {
/// The inner Ethereum JSON-RPC provider.
inner: T,
/// `block_info_by_number` LRU cache.
block_info_by_number_cache: LruCache<u64, BlockInfo>,
/// `block_info_by_number` LRU cache.
receipts_by_hash_cache: LruCache<B256, Vec<Receipt>>,
/// `block_info_and_transactions_by_hash` LRU cache.
block_info_and_transactions_by_hash_cache: LruCache<B256, (BlockInfo, Vec<TxEnvelope>)>,
}

impl<T: Provider<Http<reqwest::Client>>> AlloyChainProvider<T> {
/// Creates a new [AlloyChainProvider] with the given alloy provider.
pub fn new(inner: T) -> Self {
Self {
inner,
block_info_by_number_cache: LruCache::new(NonZeroUsize::new(CACHE_SIZE).unwrap()),
receipts_by_hash_cache: LruCache::new(NonZeroUsize::new(CACHE_SIZE).unwrap()),
block_info_and_transactions_by_hash_cache: LruCache::new(
NonZeroUsize::new(CACHE_SIZE).unwrap(),
),
}
}
}

#[async_trait]
impl<T: Provider<Http<reqwest::Client>>> ChainProvider for AlloyChainProvider<T> {
async fn block_info_by_number(&mut self, number: u64) -> Result<BlockInfo> {
if let Some(block_info) = self.block_info_by_number_cache.get(&number) {
return Ok(*block_info);
}

let raw_header: Bytes = self
.inner
.client()
.request("debug_getRawHeader", [U64::from(number)])
.await
.map_err(|e| anyhow!(e))?;
let header = Header::decode(&mut raw_header.as_ref()).map_err(|e| anyhow!(e))?;

let block_info = BlockInfo {
hash: header.hash_slow(),
number,
parent_hash: header.parent_hash,
timestamp: header.timestamp,
};
self.block_info_by_number_cache.put(number, block_info);
Ok(block_info)
}

async fn receipts_by_hash(&mut self, hash: B256) -> Result<Vec<Receipt>> {
if let Some(receipts) = self.receipts_by_hash_cache.get(&hash) {
return Ok(receipts.clone());
}

let raw_receipts: Vec<Bytes> = self
.inner
.client()
.request("debug_getRawReceipts", [hash])
.await
.map_err(|e| anyhow!(e))?;

let receipts = raw_receipts
.iter()
.map(|r| {
let r = &mut r.as_ref();

// Skip the transaction type byte if it exists
if !r.is_empty() && r[0] <= TxType::Eip4844 as u8 {
r.advance(1);
}

Ok(ReceiptWithBloom::decode(r).map_err(|e| anyhow!(e))?.receipt)
})
.collect::<Result<Vec<_>>>()?;
self.receipts_by_hash_cache.put(hash, receipts.clone());
Ok(receipts)
}

async fn block_info_and_transactions_by_hash(
&mut self,
hash: B256,
) -> Result<(BlockInfo, Vec<TxEnvelope>)> {
if let Some(block_info_and_txs) = self.block_info_and_transactions_by_hash_cache.get(&hash)
{
return Ok(block_info_and_txs.clone());
}

let raw_block: Bytes = self
.inner
.client()
.request("debug_getRawBlock", [hash])
.await
.map_err(|e| anyhow!(e))?;
let block = Block::decode(&mut raw_block.as_ref()).map_err(|e| anyhow!(e))?;

let block_info = BlockInfo {
hash: block.header.hash_slow(),
number: block.header.number,
parent_hash: block.header.parent_hash,
timestamp: block.header.timestamp,
};
self.block_info_and_transactions_by_hash_cache.put(hash, (block_info, block.body.clone()));
Ok((block_info, block.body))
}
}

/// The [AlloyL2SafeHeadProvider] is a concrete implementation of the [L2ChainProvider] trait,
/// providing data over Ethereum JSON-RPC using an alloy provider as the backend.
///
/// **Note**:
/// This provider fetches data using the `debug_getRawBlock` method. The RPC must support this
/// namespace.
#[derive(Debug)]
pub struct AlloyL2SafeHeadProvider<T: Provider<Http<reqwest::Client>>> {
/// The inner Ethereum JSON-RPC provider.
inner: T,
/// The rollup configuration.
rollup_config: Arc<RollupConfig>,
/// `payload_by_number` LRU cache.
payload_by_number_cache: LruCache<u64, ExecutionPayloadEnvelope>,
/// `l2_block_info_by_number` LRU cache.
l2_block_info_by_number_cache: LruCache<u64, L2BlockInfo>,
}

impl<T: Provider<Http<reqwest::Client>>> AlloyL2SafeHeadProvider<T> {
/// Creates a new [AlloyL2SafeHeadProvider] with the given alloy provider and [RollupConfig].
pub fn new(inner: T, rollup_config: Arc<RollupConfig>) -> Self {
Self {
inner,
rollup_config,
payload_by_number_cache: LruCache::new(NonZeroUsize::new(CACHE_SIZE).unwrap()),
l2_block_info_by_number_cache: LruCache::new(NonZeroUsize::new(CACHE_SIZE).unwrap()),
}
}
}

#[async_trait]
impl<T: Provider<Http<reqwest::Client>>> L2ChainProvider for AlloyL2SafeHeadProvider<T> {
async fn l2_block_info_by_number(&mut self, number: u64) -> Result<L2BlockInfo> {
if let Some(l2_block_info) = self.l2_block_info_by_number_cache.get(&number) {
return Ok(*l2_block_info);
}

let payload = self.payload_by_number(number).await?;
let l2_block_info = payload.to_l2_block_ref(self.rollup_config.as_ref())?;
self.l2_block_info_by_number_cache.put(number, l2_block_info);
Ok(l2_block_info)
}

async fn payload_by_number(&mut self, number: u64) -> Result<ExecutionPayloadEnvelope> {
if let Some(payload) = self.payload_by_number_cache.get(&number) {
return Ok(payload.clone());
}

let raw_block: Bytes = self
.inner
.client()
.request("debug_getRawBlock", [U64::from(number)])
.await
.map_err(|e| anyhow!(e))?;
let block = Block::decode(&mut raw_block.as_ref()).map_err(|e| anyhow!(e))?;
let payload_envelope: ExecutionPayloadEnvelope = block.into();

self.payload_by_number_cache.put(number, payload_envelope.clone());
Ok(payload_envelope)
}
}
15 changes: 4 additions & 11 deletions crates/derive/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ pub mod stages;
pub mod traits;
pub mod types;

#[cfg(feature = "online")]
pub mod alloy_providers;

/// The derivation pipeline is responsible for deriving L2 inputs from L1 data.
#[derive(Debug, Clone, Copy)]
pub struct DerivationPipeline;
Expand All @@ -33,16 +36,6 @@ impl DerivationPipeline {
where
P: ChainProvider + Clone + Debug + Send,
{
// let l1_traversal = L1Traversal::new(chain_provider, rollup_config.clone(),
// telemetry.clone()); let l1_retrieval = L1Retrieval::new(l1_traversal, dap_source,
// telemetry.clone()); let frame_queue = FrameQueue::new(l1_retrieval,
// telemetry.clone()); let channel_bank = ChannelBank::new(rollup_config.clone(),
// frame_queue, telemetry.clone()); let channel_reader =
// ChannelReader::new(channel_bank, telemetry.clone()); let batch_queue =
// BatchQueue::new(rollup_config.clone(), channel_reader, telemetry.clone(), fetcher);
// let attributes_queue = AttributesQueue::new(rollup_config.clone(), batch_queue,
// telemetry.clone(), builder);

unimplemented!()
unimplemented!("TODO: High-level pipeline composition helper.")
}
}
12 changes: 6 additions & 6 deletions crates/derive/src/stages/batch_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

use crate::{
stages::attributes_queue::AttributesProvider,
traits::{OriginProvider, ResettableStage, SafeBlockFetcher},
traits::{L2ChainProvider, OriginProvider, ResettableStage},
types::{
Batch, BatchValidity, BatchWithInclusionBlock, BlockInfo, L2BlockInfo, RollupConfig,
SingleBatch, StageError, StageResult, SystemConfig,
Expand Down Expand Up @@ -42,7 +42,7 @@ pub trait BatchQueueProvider {
pub struct BatchQueue<P, BF>
where
P: BatchQueueProvider + OriginProvider + Debug,
BF: SafeBlockFetcher + Debug,
BF: L2ChainProvider + Debug,
{
/// The rollup config.
cfg: RollupConfig,
Expand Down Expand Up @@ -72,7 +72,7 @@ where
impl<P, BF> BatchQueue<P, BF>
where
P: BatchQueueProvider + OriginProvider + Debug,
BF: SafeBlockFetcher + Debug,
BF: L2ChainProvider + Debug,
{
/// Creates a new [BatchQueue] stage.
pub fn new(cfg: RollupConfig, prev: P, fetcher: BF) -> Self {
Expand Down Expand Up @@ -237,7 +237,7 @@ where
impl<P, BF> AttributesProvider for BatchQueue<P, BF>
where
P: BatchQueueProvider + OriginProvider + Send + Debug,
BF: SafeBlockFetcher + Send + Debug,
BF: L2ChainProvider + Send + Debug,
{
/// Returns the next valid batch upon the given safe head.
/// Also returns the boolean that indicates if the batch is the last block in the batch.
Expand Down Expand Up @@ -362,7 +362,7 @@ where
impl<P, BF> OriginProvider for BatchQueue<P, BF>
where
P: BatchQueueProvider + OriginProvider + Debug,
BF: SafeBlockFetcher + Debug,
BF: L2ChainProvider + Debug,
{
fn origin(&self) -> Option<&BlockInfo> {
self.prev.origin()
Expand All @@ -373,7 +373,7 @@ where
impl<P, BF> ResettableStage for BatchQueue<P, BF>
where
P: BatchQueueProvider + OriginProvider + Send + Debug,
BF: SafeBlockFetcher + Send + Debug,
BF: L2ChainProvider + Send + Debug,
{
async fn reset(&mut self, base: BlockInfo, _: &SystemConfig) -> StageResult<()> {
// Copy over the Origin from the next stage.
Expand Down
12 changes: 3 additions & 9 deletions crates/derive/src/stages/channel_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,15 +130,9 @@ impl BatchReader {
}
}

impl From<&[u8]> for BatchReader {
fn from(data: &[u8]) -> Self {
Self { data: Some(data.to_vec()), decompressed: Vec::new(), cursor: 0 }
}
}

impl From<Vec<u8>> for BatchReader {
fn from(data: Vec<u8>) -> Self {
Self { data: Some(data), decompressed: Vec::new(), cursor: 0 }
impl<T: Into<Vec<u8>>> From<T> for BatchReader {
fn from(data: T) -> Self {
Self { data: Some(data.into()), decompressed: Vec::new(), cursor: 0 }
}
}

Expand Down
Loading