diff --git a/crates/derive/src/traits/test_utils.rs b/crates/derive/src/traits/test_utils.rs index ffc3d44653..371fc30a6b 100644 --- a/crates/derive/src/traits/test_utils.rs +++ b/crates/derive/src/traits/test_utils.rs @@ -67,7 +67,7 @@ impl DataAvailabilityProvider for TestDAP { #[derive(Debug, Clone, Default)] pub struct TestChainProvider { /// Maps block numbers to block information using a tuple list. - pub blocks: Vec<(u64, BlockInfo)>, + pub blocks: Vec<(u64, BlockInfo, Vec)>, /// Maps block hashes to header information using a tuple list. pub headers: Vec<(B256, Header)>, /// Maps block hashes to receipts using a tuple list. @@ -77,7 +77,7 @@ pub struct TestChainProvider { impl TestChainProvider { /// Insert a block into the mock chain provider. pub fn insert_block(&mut self, number: u64, block: BlockInfo) { - self.blocks.push((number, block)); + self.blocks.push((number, block, Vec::new())); } /// Insert receipts into the mock chain provider. @@ -124,7 +124,7 @@ impl ChainProvider for TestChainProvider { } async fn block_info_by_number(&mut self, _number: u64) -> Result { - if let Some((_, block)) = self.blocks.iter().find(|(n, _)| *n == _number) { + if let Some((_, block, _)) = self.blocks.iter().find(|(n, _, _)| *n == _number) { Ok(*block) } else { Err(anyhow::anyhow!("Block not found")) @@ -143,13 +143,13 @@ impl ChainProvider for TestChainProvider { &mut self, hash: B256, ) -> Result<(BlockInfo, Vec)> { - let block = self + let (block, txs) = self .blocks .iter() - .find(|(_, b)| b.hash == hash) - .map(|(_, b)| *b) + .find(|(_, b, _)| b.hash == hash) + .map(|(_, b, v)| (*b, v.clone())) .ok_or_else(|| anyhow::anyhow!("Block not found"))?; - Ok((block, Vec::new())) + Ok((block, txs)) } } diff --git a/crates/derive/src/types/errors.rs b/crates/derive/src/types/errors.rs index afed20ff37..c5eea8b0ab 100644 --- a/crates/derive/src/types/errors.rs +++ b/crates/derive/src/types/errors.rs @@ -62,6 +62,12 @@ impl PartialEq for StageError { if let (StageError::Reset(a), StageError::Reset(b)) = (self, other) { return a == b; } + if let (StageError::AttributesBuild(a), StageError::AttributesBuild(b)) = (self, other) { + return a == b; + } + if let (StageError::BlockFetch(a), StageError::BlockFetch(b)) = (self, other) { + return a == b; + } matches!( (self, other), (StageError::Eof, StageError::Eof) | @@ -72,7 +78,6 @@ impl PartialEq for StageError { (StageError::NoChannel, StageError::NoChannel) | (StageError::ChannelNotFound, StageError::ChannelNotFound) | (StageError::MissingOrigin, StageError::MissingOrigin) | - (StageError::AttributesBuild(_), StageError::AttributesBuild(_)) | (StageError::ReceiptFetch(_), StageError::ReceiptFetch(_)) | (StageError::BlockInfoFetch(_), StageError::BlockInfoFetch(_)) | (StageError::SystemConfigUpdate(_), StageError::SystemConfigUpdate(_)) | diff --git a/crates/plasma/src/plasma.rs b/crates/plasma/src/plasma.rs index f29b04765d..22f9033965 100644 --- a/crates/plasma/src/plasma.rs +++ b/crates/plasma/src/plasma.rs @@ -10,7 +10,7 @@ use alloy_primitives::{Address, Bytes}; use anyhow::Result; use async_trait::async_trait; use kona_derive::{ - traits::{ChainProvider, DataAvailabilityProvider}, + traits::{AsyncIterator, ChainProvider, DataAvailabilityProvider}, types::{BlockInfo, RollupConfig}, }; use kona_primitives::BlockID; @@ -21,7 +21,7 @@ pub struct PlasmaDataSource where C: ChainProvider + Send + Clone, PIF: PlasmaInputFetcher + Clone, - I: Iterator + Send + Clone, + I: AsyncIterator + Send + Clone, { /// The chain provider to use for the factory. pub chain_provider: C, @@ -37,7 +37,7 @@ impl PlasmaDataSource where C: ChainProvider + Send + Clone + Debug, PIF: PlasmaInputFetcher + Clone, - I: Iterator + Send + Clone, + I: AsyncIterator + Send + Clone, { /// Creates a new factory. pub fn new(provider: C, pif: PIF, s: I, cfg: &RollupConfig) -> Self { @@ -55,7 +55,7 @@ impl DataAvailabilityProvider for PlasmaDataSource where C: ChainProvider + Send + Clone + Debug + Sync, PIF: PlasmaInputFetcher + Clone + Debug + Send + Sync, - I: Iterator + Send + Clone + Debug + Sync, + I: AsyncIterator + Send + Clone + Debug + Sync, { type Item = Bytes; type DataIter = PlasmaSource; diff --git a/crates/plasma/src/source.rs b/crates/plasma/src/source.rs index c9b8de50e5..69fe37576e 100644 --- a/crates/plasma/src/source.rs +++ b/crates/plasma/src/source.rs @@ -22,7 +22,7 @@ pub struct PlasmaSource where C: ChainProvider + Send, F: PlasmaInputFetcher + Send, - I: Iterator, + I: AsyncIterator, { /// The chain provider to use for the plasma source. pub chain_provider: C, @@ -40,7 +40,7 @@ impl PlasmaSource where C: ChainProvider + Send, F: PlasmaInputFetcher + Send, - I: Iterator, + I: AsyncIterator, { /// Instantiates a new plasma data source. pub fn new(chain_provider: C, input_fetcher: F, source: I, id: BlockID) -> Self { @@ -53,7 +53,7 @@ impl AsyncIterator for PlasmaSource where C: ChainProvider + Send, F: PlasmaInputFetcher + Send, - I: Iterator + Send, + I: AsyncIterator + Send, { type Item = Bytes; @@ -85,8 +85,12 @@ where // Set the commitment if it isn't available. if self.commitment.is_none() { // The l1 source returns the input commitment for the batch. - let data = match self.source.next().ok_or(PlasmaError::NotEnoughData) { - Ok(d) => d, + let data = match self.source.next().await.ok_or(PlasmaError::NotEnoughData) { + Ok(Ok(d)) => d, + Ok(Err(e)) => { + tracing::warn!("failed to pull next data from the plasma source iterator"); + return Some(Err(e)); + } Err(e) => { tracing::warn!("failed to pull next data from the plasma source iterator"); return Some(Err(StageError::Custom(anyhow!(e)))); @@ -188,11 +192,15 @@ where mod tests { use super::*; use crate::test_utils::TestPlasmaInputFetcher; - use alloc::vec; + use alloc::{vec, vec::Vec}; + use alloy_consensus::{SignableTransaction, TxEip1559, TxEnvelope}; + use alloy_primitives::{Address, Signature, TxKind, U256}; use kona_derive::{ + sources::CalldataSource, stages::test_utils::{CollectingLayer, TraceStorage}, traits::test_utils::TestChainProvider, }; + use kona_primitives::block::BlockInfo; use tracing::Level; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; @@ -203,11 +211,14 @@ mod tests { advances: vec![Err(PlasmaError::ReorgRequired)], ..Default::default() }; - let source = vec![Bytes::from("hello"), Bytes::from("world")].into_iter(); + let source = CalldataSource::new( + chain_provider.clone(), + Address::default(), + BlockInfo::default(), + Address::default(), + ); let id = BlockID { number: 1, ..Default::default() }; - let mut plasma_source = PlasmaSource::new(chain_provider, input_fetcher, source, id); - let err = plasma_source.next().await.unwrap().unwrap_err(); assert_eq!(err, StageError::Reset(ResetError::NewExpiredChallenge)); } @@ -219,69 +230,246 @@ mod tests { advances: vec![Err(PlasmaError::NotEnoughData)], ..Default::default() }; - let source = vec![Bytes::from("hello"), Bytes::from("world")].into_iter(); + let source = CalldataSource::new( + chain_provider.clone(), + Address::default(), + BlockInfo::default(), + Address::default(), + ); let id = BlockID { number: 1, ..Default::default() }; - let mut plasma_source = PlasmaSource::new(chain_provider, input_fetcher, source, id); - let err = plasma_source.next().await.unwrap().unwrap_err(); matches!(err, StageError::Temporary(_)); } #[tokio::test] - async fn test_next_plasma_not_enough_source_data() { + async fn test_next_plasma_internal_block_fetch_fail() { let chain_provider = TestChainProvider::default(); let input_fetcher = TestPlasmaInputFetcher { advances: vec![Ok(())], ..Default::default() }; - let source = vec![].into_iter(); + let source = CalldataSource::new( + chain_provider.clone(), + Address::default(), + BlockInfo::default(), + Address::default(), + ); let id = BlockID { number: 1, ..Default::default() }; + let mut plasma_source = PlasmaSource::new(chain_provider, input_fetcher, source, id); + let err = plasma_source.next().await.unwrap().unwrap_err(); + assert_eq!(err, StageError::BlockFetch(Default::default())); + } + #[tokio::test] + async fn test_next_plasma_calldata_eof() { + let chain_provider = TestChainProvider::default(); + let input_fetcher = TestPlasmaInputFetcher { advances: vec![Ok(())], ..Default::default() }; + let id = BlockID { number: 1, ..Default::default() }; + let source_chain_provider = TestChainProvider { + blocks: vec![(1, BlockInfo::default(), Vec::new())], + ..Default::default() + }; + let source = CalldataSource::new( + source_chain_provider, + Address::default(), + BlockInfo::default(), + Address::default(), + ); let mut plasma_source = PlasmaSource::new(chain_provider, input_fetcher, source, id); + let err = plasma_source.next().await.unwrap().unwrap_err(); + assert_eq!(err, StageError::Eof); + } + #[tokio::test] + async fn test_next_plasma_not_enough_source_data() { + let chain_provider = TestChainProvider::default(); + let input_fetcher = TestPlasmaInputFetcher { advances: vec![Ok(())], ..Default::default() }; + let id = BlockID { number: 1, ..Default::default() }; + let signature = Signature::test_signature(); + let batcher_address = Address::left_padding_from(&[6]); + let tx = TxEnvelope::Eip1559( + TxEip1559 { + chain_id: 1u64, + nonce: 2, + max_fee_per_gas: 3, + max_priority_fee_per_gas: 4, + gas_limit: 5, + to: TxKind::Call(batcher_address), + value: U256::from(7_u64), + input: Bytes::from(vec![]), + access_list: Default::default(), + } + .into_signed(signature), + ); + let signer = alloy_primitives::address!("616268d0e4d1a33d8f95aba56e880b6e29551174"); + let txs = vec![tx]; + let source_chain_provider = TestChainProvider { + blocks: vec![(1, BlockInfo::default(), txs)], + ..Default::default() + }; + let source = CalldataSource::new( + source_chain_provider, + batcher_address, + BlockInfo::default(), + signer, + ); + let mut plasma_source = PlasmaSource::new(chain_provider, input_fetcher, source, id); let err = plasma_source.next().await.unwrap().unwrap_err(); - assert_eq!(err, StageError::Custom(anyhow!(PlasmaError::NotEnoughData))); + // We cannot assert NotEnoughData since the calldata source can't pop nothing. + assert_eq!(err, StageError::Eof); } #[tokio::test] - async fn test_next_plasma_empty_source_data() { + async fn test_next_plasma_non_plasma_tx_data_forwards() { let trace_store: TraceStorage = Default::default(); let layer = CollectingLayer::new(trace_store.clone()); tracing_subscriber::Registry::default().with(layer).init(); let chain_provider = TestChainProvider::default(); let input_fetcher = TestPlasmaInputFetcher { advances: vec![Ok(())], ..Default::default() }; - let source = vec![Bytes::from("")].into_iter(); let id = BlockID { number: 1, ..Default::default() }; + let signature = Signature::test_signature(); + let batcher_address = Address::left_padding_from(&[6]); + let tx = TxEnvelope::Eip1559( + TxEip1559 { + chain_id: 1u64, + nonce: 2, + max_fee_per_gas: 3, + max_priority_fee_per_gas: 4, + gas_limit: 5, + to: TxKind::Call(batcher_address), + value: U256::from(7_u64), + input: Bytes::from(vec![8]), + access_list: Default::default(), + } + .into_signed(signature), + ); + let signer = alloy_primitives::address!("616268d0e4d1a33d8f95aba56e880b6e29551174"); + let txs = vec![tx]; + let source_chain_provider = TestChainProvider { + blocks: vec![(1, BlockInfo::default(), txs)], + ..Default::default() + }; + let source = CalldataSource::new( + source_chain_provider, + batcher_address, + BlockInfo::default(), + signer, + ); + let mut plasma_source = PlasmaSource::new(chain_provider, input_fetcher, source, id); + + let data = plasma_source.next().await.unwrap().unwrap(); + assert_eq!(data, vec![8u8]); + let logs = trace_store.get_by_level(Level::INFO); + assert_eq!(logs.len(), 1); + assert!(logs[0].contains("non-plasma tx data, forwarding downstream")); + } + + #[tokio::test] + async fn test_next_plasma_valid_commitment_failed_to_pull_next_data() { + let trace_store: TraceStorage = Default::default(); + let layer = CollectingLayer::new(trace_store.clone()); + tracing_subscriber::Registry::default().with(layer).init(); + + let chain_provider = TestChainProvider::default(); + let input_fetcher = TestPlasmaInputFetcher { advances: vec![Ok(())], ..Default::default() }; + let id = BlockID { number: 1, ..Default::default() }; + let signature = Signature::test_signature(); + let batcher_address = Address::left_padding_from(&[6]); + let input = Bytes::from( + &b"01001d2b0bda21d56b8bd12d4f94ebacffdfb35f5e226f84b461103bb8beab6353be"[..], + ); + let tx = TxEnvelope::Eip1559( + TxEip1559 { + chain_id: 1u64, + nonce: 2, + max_fee_per_gas: 3, + max_priority_fee_per_gas: 4, + gas_limit: 5, + to: TxKind::Call(batcher_address), + value: U256::from(7_u64), + input, + access_list: Default::default(), + } + .into_signed(signature), + ); + let signer = alloy_primitives::address!("616268d0e4d1a33d8f95aba56e880b6e29551174"); + let txs = vec![tx]; + let source_chain_provider = TestChainProvider { + blocks: vec![(1, BlockInfo::default(), txs)], + ..Default::default() + }; + let source = CalldataSource::new( + source_chain_provider, + batcher_address, + BlockInfo::default(), + signer, + ); let mut plasma_source = PlasmaSource::new(chain_provider, input_fetcher, source, id); let err = plasma_source.next().await.unwrap().unwrap_err(); - assert_eq!(err, StageError::Custom(anyhow!(PlasmaError::NotEnoughData))); + assert_eq!(err, StageError::Eof); let logs = trace_store.get_by_level(Level::WARN); assert_eq!(logs.len(), 1); - assert!(logs[0].contains("empty data from plasma source")); + assert!(logs[0].contains("failed to pull next data from the plasma source iterator")); } #[tokio::test] - async fn test_next_plasma_non_plasma_tx_data_forwards() { + async fn test_next_plasma_succeeds() { let trace_store: TraceStorage = Default::default(); let layer = CollectingLayer::new(trace_store.clone()); tracing_subscriber::Registry::default().with(layer).init(); let chain_provider = TestChainProvider::default(); - let input_fetcher = TestPlasmaInputFetcher { advances: vec![Ok(())], ..Default::default() }; - let first = Bytes::copy_from_slice(&[2u8]); - let source = vec![first.clone()].into_iter(); + let expected_input = Bytes::from( + &b"11111111111111111111111111111111111111111111111111111111111111111111"[..], + ); + let input_fetcher = TestPlasmaInputFetcher { + advances: vec![Ok(())], + inputs: vec![Ok(expected_input.clone())], + ..Default::default() + }; let id = BlockID { number: 1, ..Default::default() }; + let signature = Signature::test_signature(); + let batcher_address = Address::left_padding_from(&[6]); + let input = Bytes::from(alloy_primitives::hex!( + "01001d2b0bda21d56b8bd12d4f94ebacffdfb35f5e226f84b461103bb8beab6353be" + )); + let tx = TxEnvelope::Eip1559( + TxEip1559 { + chain_id: 1u64, + nonce: 2, + max_fee_per_gas: 3, + max_priority_fee_per_gas: 4, + gas_limit: 5, + to: TxKind::Call(batcher_address), + value: U256::from(7_u64), + input, + access_list: Default::default(), + } + .into_signed(signature), + ); + let signer = alloy_primitives::address!("26e7b8bddd30a259d73d04ac83072d5eefec0eb0"); + let txs = vec![tx]; + let source_chain_provider = TestChainProvider { + blocks: vec![(1, BlockInfo::default(), txs)], + ..Default::default() + }; + let source = CalldataSource::new( + source_chain_provider, + batcher_address, + BlockInfo::default(), + signer, + ); let mut plasma_source = PlasmaSource::new(chain_provider, input_fetcher, source, id); let data = plasma_source.next().await.unwrap().unwrap(); - assert_eq!(data, first); + assert_eq!(data, expected_input); - let logs = trace_store.get_by_level(Level::INFO); + let logs = trace_store.get_by_level(Level::DEBUG); assert_eq!(logs.len(), 1); - assert!(logs[0].contains("non-plasma tx data, forwarding downstream")); + assert!(logs[0].contains("plasma input fetcher - l1 origin advanced")); } // TODO: more tests