Skip to content
This repository was archived by the owner on Jan 16, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions crates/derive/src/stages/channel_bank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,16 +196,15 @@ mod tests {
use super::*;
use crate::{
stages::{
frame_queue::tests::new_test_frames, l1_retrieval::L1Retrieval,
l1_traversal::tests::new_test_traversal,
frame_queue::tests::new_test_frames, l1_retrieval::L1Retrieval, l1_traversal::tests::*,
},
traits::test_utils::TestDAP,
};
use alloc::vec;

#[test]
fn test_ingest_empty_origin() {
let mut traversal = new_test_traversal(false, false);
let mut traversal = new_test_traversal(vec![], vec![]);
traversal.block = None;
let dap = TestDAP::default();
let retrieval = L1Retrieval::new(traversal, dap);
Expand All @@ -218,7 +217,7 @@ mod tests {

#[test]
fn test_ingest_and_prune_channel_bank() {
let traversal = new_test_traversal(true, true);
let traversal = new_populated_test_traversal();
let results = vec![Ok(Bytes::from(vec![0x00]))];
let dap = TestDAP { results };
let retrieval = L1Retrieval::new(traversal, dap);
Expand Down Expand Up @@ -246,7 +245,7 @@ mod tests {

#[tokio::test]
async fn test_read_empty_channel_bank() {
let traversal = new_test_traversal(true, true);
let traversal = new_populated_test_traversal();
let results = vec![Ok(Bytes::from(vec![0x00]))];
let dap = TestDAP { results };
let retrieval = L1Retrieval::new(traversal, dap);
Expand Down
14 changes: 7 additions & 7 deletions crates/derive/src/stages/frame_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ where
pub(crate) mod tests {
use super::*;
use crate::{
stages::l1_traversal::tests::new_test_traversal, traits::test_utils::TestDAP,
stages::l1_traversal::tests::new_populated_test_traversal, traits::test_utils::TestDAP,
DERIVATION_VERSION_0,
};
use alloc::{vec, vec::Vec};
Expand Down Expand Up @@ -108,7 +108,7 @@ pub(crate) mod tests {

#[tokio::test]
async fn test_frame_queue_empty_bytes() {
let traversal = new_test_traversal(true, true);
let traversal = new_populated_test_traversal();
let results = vec![Ok(Bytes::from(vec![0x00]))];
let dap = TestDAP { results };
let retrieval = L1Retrieval::new(traversal, dap);
Expand All @@ -119,7 +119,7 @@ pub(crate) mod tests {

#[tokio::test]
async fn test_frame_queue_no_frames_decoded() {
let traversal = new_test_traversal(true, true);
let traversal = new_populated_test_traversal();
let results = vec![Err(StageError::Eof), Ok(Bytes::default())];
let dap = TestDAP { results };
let retrieval = L1Retrieval::new(traversal, dap);
Expand All @@ -130,7 +130,7 @@ pub(crate) mod tests {

#[tokio::test]
async fn test_frame_queue_wrong_derivation_version() {
let traversal = new_test_traversal(true, true);
let traversal = new_populated_test_traversal();
let results = vec![Ok(Bytes::from(vec![0x01]))];
let dap = TestDAP { results };
let retrieval = L1Retrieval::new(traversal, dap);
Expand All @@ -141,7 +141,7 @@ pub(crate) mod tests {

#[tokio::test]
async fn test_frame_queue_frame_too_short() {
let traversal = new_test_traversal(true, true);
let traversal = new_populated_test_traversal();
let results = vec![Ok(Bytes::from(vec![0x00, 0x01]))];
let dap = TestDAP { results };
let retrieval = L1Retrieval::new(traversal, dap);
Expand All @@ -153,7 +153,7 @@ pub(crate) mod tests {
#[tokio::test]
async fn test_frame_queue_single_frame() {
let data = new_encoded_test_frames(1);
let traversal = new_test_traversal(true, true);
let traversal = new_populated_test_traversal();
let dap = TestDAP { results: vec![Ok(data)] };
let retrieval = L1Retrieval::new(traversal, dap);
let mut frame_queue = FrameQueue::new(retrieval);
Expand All @@ -167,7 +167,7 @@ pub(crate) mod tests {
#[tokio::test]
async fn test_frame_queue_multiple_frames() {
let data = new_encoded_test_frames(3);
let traversal = new_test_traversal(true, true);
let traversal = new_populated_test_traversal();
let dap = TestDAP { results: vec![Ok(data)] };
let retrieval = L1Retrieval::new(traversal, dap);
let mut frame_queue = FrameQueue::new(retrieval);
Expand Down
10 changes: 5 additions & 5 deletions crates/derive/src/stages/l1_retrieval.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,15 +82,15 @@ where
mod tests {
use super::*;
use crate::{
stages::l1_traversal::tests::new_test_traversal,
stages::l1_traversal::tests::*,
traits::test_utils::{TestDAP, TestIter},
};
use alloc::vec;
use alloy_primitives::Address;

#[tokio::test]
async fn test_l1_retrieval_origin() {
let traversal = new_test_traversal(true, true);
let traversal = new_populated_test_traversal();
let dap = TestDAP { results: vec![] };
let retrieval = L1Retrieval::new(traversal, dap);
let expected = BlockInfo::default();
Expand All @@ -99,7 +99,7 @@ mod tests {

#[tokio::test]
async fn test_l1_retrieval_next_data() {
let traversal = new_test_traversal(true, true);
let traversal = new_populated_test_traversal();
let results = vec![Err(StageError::Eof), Ok(Bytes::default())];
let dap = TestDAP { results };
let mut retrieval = L1Retrieval::new(traversal, dap);
Expand All @@ -126,7 +126,7 @@ mod tests {
// Create a new traversal with no blocks or receipts.
// This would bubble up an error if the prev stage
// (traversal) is called in the retrieval stage.
let traversal = new_test_traversal(false, false);
let traversal = new_test_traversal(vec![], vec![]);
let dap = TestDAP { results: vec![] };
let mut retrieval = L1Retrieval { prev: traversal, provider: dap, data: Some(data) };
let data = retrieval.next_data().await.unwrap();
Expand All @@ -142,7 +142,7 @@ mod tests {
open_data_calls: vec![(BlockInfo::default(), Address::default())],
results: vec![Err(StageError::Eof)],
};
let traversal = new_test_traversal(true, true);
let traversal = new_populated_test_traversal();
let dap = TestDAP { results: vec![] };
let mut retrieval = L1Retrieval { prev: traversal, provider: dap, data: Some(data) };
let data = retrieval.next_data().await.unwrap_err();
Expand Down
149 changes: 101 additions & 48 deletions crates/derive/src/stages/l1_traversal.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,30 @@
//! Contains the L1 traversal stage of the derivation pipeline.
//! Contains the [L1Traversal] stage of the derivation pipeline.

use crate::{
traits::{ChainProvider, ResettableStage},
types::{BlockInfo, RollupConfig, StageError, StageResult, SystemConfig},
};
use alloc::{boxed::Box, sync::Arc};
use anyhow::anyhow;
use async_trait::async_trait;

/// The L1 traversal stage of the derivation pipeline.
/// The [L1Traversal] stage of the derivation pipeline.
///
/// This stage sits at the bottom of the pipeline, holding a handle to the data source
/// (a [ChainProvider] implementation) and the current L1 [BlockInfo] in the pipeline,
/// which are used to traverse the L1 chain. When the [L1Traversal] stage is advanced,
/// it fetches the next L1 [BlockInfo] from the data source and updates the [SystemConfig]
/// with the receipts from the block.
#[derive(Debug, Clone)]
pub struct L1Traversal<Provider: ChainProvider> {
/// The current block in the traversal stage.
pub(crate) block: Option<BlockInfo>,
/// The data source for the traversal stage.
data_source: Provider,
/// Signals whether or not the traversal stage has been completed.
/// Signals whether or not the traversal stage is complete.
done: bool,
/// The system config
/// The system config.
pub system_config: SystemConfig,
/// The rollup config
/// A reference to the rollup config.
pub rollup_config: Arc<RollupConfig>,
}

Expand All @@ -40,9 +45,10 @@ impl<F: ChainProvider> L1Traversal<F> {
&self.data_source
}

/// Returns the next L1 block in the traversal stage, if the stage has not been completed.
/// This function can only be called once, and will return `None` on subsequent calls
/// unless the stage is reset.
/// Returns the next L1 [BlockInfo] in the [L1Traversal] stage, if the stage is not complete.
/// This function can only be called once while the stage is in progress, and will return
/// [`None`] on subsequent calls unless the stage is reset or complete. If the stage is
/// complete and the [BlockInfo] has been consumed, an [StageError::Eof] error is returned.
pub fn next_l1_block(&mut self) -> StageResult<Option<BlockInfo>> {
if !self.done {
self.done = true;
Expand All @@ -52,35 +58,41 @@ impl<F: ChainProvider> L1Traversal<F> {
}
}

/// Returns the current L1 block in the traversal stage, if it exists.
/// Returns the current L1 [BlockInfo] in the [L1Traversal] stage, if it exists.
pub fn origin(&self) -> Option<&BlockInfo> {
self.block.as_ref()
}

/// Advances the internal state of the [L1Traversal] stage to the next L1 block.
/// This function fetches the next L1 [BlockInfo] from the data source and updates the
/// [SystemConfig] with the receipts from the block.
pub async fn advance_l1_block(&mut self) -> StageResult<()> {
// Pull the next block or return EOF which has special
// handling further up the pipeline.
// Pull the next block or return EOF.
// StageError::EOF has special handling further up the pipeline.
let block = self.block.ok_or(StageError::Eof)?;
let next_l1_origin = self.data_source.block_info_by_number(block.number + 1).await?;
let next_l1_origin = match self.data_source.block_info_by_number(block.number + 1).await {
Ok(block) => block,
Err(e) => return Err(StageError::BlockInfoFetch(e)),
};

// Check for reorgs
// Check block hashes for reorgs.
if block.hash != next_l1_origin.parent_hash {
return Err(anyhow!(
"Detected L1 reorg from {} to {} with conflicting parent",
block.hash,
next_l1_origin.hash
)
.into());
return Err(StageError::ReorgDetected(block.hash, next_l1_origin.parent_hash));
}

// Fetch receipts.
let receipts = self.data_source.receipts_by_hash(next_l1_origin.hash).await?;
self.system_config.update_with_receipts(
// Fetch receipts for the next l1 block and update the system config.
let receipts = match self.data_source.receipts_by_hash(next_l1_origin.hash).await {
Ok(receipts) => receipts,
Err(e) => return Err(StageError::ReceiptFetch(e)),
};

if let Err(e) = self.system_config.update_with_receipts(
receipts.as_slice(),
&self.rollup_config,
next_l1_origin.timestamp,
)?;
) {
return Err(StageError::SystemConfigUpdate(e));
}

self.block = Some(next_l1_origin);
self.done = false;
Expand Down Expand Up @@ -126,62 +138,103 @@ pub(crate) mod tests {
}
}

pub(crate) fn new_receipts() -> alloc::vec::Vec<Receipt> {
let mut receipt = Receipt { success: true, ..Receipt::default() };
let bad = Log::new(
Address::from([2; 20]),
vec![CONFIG_UPDATE_TOPIC, B256::default()],
Bytes::default(),
)
.unwrap();
receipt.logs = vec![new_update_batcher_log(), bad, new_update_batcher_log()];
vec![receipt.clone(), Receipt::default(), receipt]
}

pub(crate) fn new_test_traversal(
blocks: bool,
receipts: bool,
blocks: alloc::vec::Vec<BlockInfo>,
receipts: alloc::vec::Vec<Receipt>,
) -> L1Traversal<TestChainProvider> {
let mut provider = TestChainProvider::default();
let rollup_config = RollupConfig {
l1_system_config_address: L1_SYS_CONFIG_ADDR,
..RollupConfig::default()
};
let block = BlockInfo::default();
if blocks {
provider.insert_block(0, block);
provider.insert_block(1, block);
for (i, block) in blocks.iter().enumerate() {
provider.insert_block(i as u64, *block);
}
if receipts {
let mut receipt = Receipt { success: true, ..Receipt::default() };
let bad = Log::new(
Address::from([2; 20]),
vec![CONFIG_UPDATE_TOPIC, B256::default()],
Bytes::default(),
)
.unwrap();
receipt.logs = vec![new_update_batcher_log(), bad, new_update_batcher_log()];
let receipts = vec![receipt.clone(), Receipt::default(), receipt];
provider.insert_receipts(block.hash, receipts);
for (i, receipt) in receipts.iter().enumerate() {
let hash = blocks.get(i).map(|b| b.hash).unwrap_or_default();
provider.insert_receipts(hash, vec![receipt.clone()]);
}
L1Traversal::new(provider, rollup_config)
}

pub(crate) fn new_populated_test_traversal() -> L1Traversal<TestChainProvider> {
let blocks = vec![BlockInfo::default(), BlockInfo::default()];
let receipts = new_receipts();
new_test_traversal(blocks, receipts)
}

#[tokio::test]
async fn test_l1_traversal() {
let mut traversal = new_test_traversal(true, true);
let blocks = vec![BlockInfo::default(), BlockInfo::default()];
let receipts = new_receipts();
let mut traversal = new_test_traversal(blocks, receipts);
assert_eq!(traversal.next_l1_block().unwrap(), Some(BlockInfo::default()));
assert_eq!(traversal.next_l1_block().unwrap_err(), StageError::Eof);
assert!(traversal.advance_l1_block().await.is_ok());
}

#[tokio::test]
async fn test_l1_traversal_missing_receipts() {
let mut traversal = new_test_traversal(true, false);
let blocks = vec![BlockInfo::default(), BlockInfo::default()];
let mut traversal = new_test_traversal(blocks, vec![]);
assert_eq!(traversal.next_l1_block().unwrap(), Some(BlockInfo::default()));
assert_eq!(traversal.next_l1_block().unwrap_err(), StageError::Eof);
matches!(traversal.advance_l1_block().await.unwrap_err(), StageError::Custom(_));
matches!(traversal.advance_l1_block().await.unwrap_err(), StageError::ReceiptFetch(_));
}

#[tokio::test]
async fn test_l1_traversal_reorgs() {
let hash = b256!("3333333333333333333333333333333333333333333333333333333333333333");
let block = BlockInfo { hash, ..BlockInfo::default() };
let blocks = vec![block, block];
let receipts = new_receipts();
let mut traversal = new_test_traversal(blocks, receipts);
assert!(traversal.advance_l1_block().await.is_ok());
let err = traversal.advance_l1_block().await.unwrap_err();
assert_eq!(err, StageError::ReorgDetected(block.hash, block.parent_hash));
}

#[tokio::test]
async fn test_l1_traversal_missing_blocks() {
let mut traversal = new_test_traversal(false, false);
let mut traversal = new_test_traversal(vec![], vec![]);
assert_eq!(traversal.next_l1_block().unwrap(), Some(BlockInfo::default()));
assert_eq!(traversal.next_l1_block().unwrap_err(), StageError::Eof);
matches!(traversal.advance_l1_block().await.unwrap_err(), StageError::Custom(_));
matches!(traversal.advance_l1_block().await.unwrap_err(), StageError::BlockInfoFetch(_));
}

#[tokio::test]
async fn test_l1_traversal_system_config_update_fails() {
let first = b256!("3333333333333333333333333333333333333333333333333333333333333333");
let second = b256!("4444444444444444444444444444444444444444444444444444444444444444");
let block1 = BlockInfo { hash: first, ..BlockInfo::default() };
let block2 = BlockInfo { hash: second, ..BlockInfo::default() };
let blocks = vec![block1, block2];
let receipts = new_receipts();
let mut traversal = new_test_traversal(blocks, receipts);
assert!(traversal.advance_l1_block().await.is_ok());
// Only the second block should fail since the second receipt
// contains invalid logs that will error for a system config update.
let err = traversal.advance_l1_block().await.unwrap_err();
matches!(err, StageError::SystemConfigUpdate(_));
}

#[tokio::test]
async fn test_system_config_updated() {
let mut traversal = new_test_traversal(true, true);
async fn test_l1_traversal_system_config_updated() {
let blocks = vec![BlockInfo::default(), BlockInfo::default()];
let receipts = new_receipts();
let mut traversal = new_test_traversal(blocks, receipts);
assert_eq!(traversal.next_l1_block().unwrap(), Some(BlockInfo::default()));
assert_eq!(traversal.next_l1_block().unwrap_err(), StageError::Eof);
assert!(traversal.advance_l1_block().await.is_ok());
Expand Down
Loading