diff --git a/Cargo.lock b/Cargo.lock index a75587abdc..d01344d8d0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4692,6 +4692,7 @@ dependencies = [ "alloy-rpc-types-eth 1.0.1", "alloy-transport", "alloy-transport-http", + "async-stream", "async-trait", "derive_more", "futures", diff --git a/Cargo.toml b/Cargo.toml index db82dcdf21..f210b27b24 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -181,6 +181,7 @@ vergen-git2 = "1.0.0" parking_lot = "0.12.3" async-trait = "0.1.88" tokio-stream = "0.1.17" +async-stream = "0.3.6" async-channel = "2.3.1" http-body-util = "0.1.3" unsigned-varint = "0.8.0" diff --git a/crates/node/engine/src/attributes.rs b/crates/node/engine/src/attributes.rs index 06f1e69a7b..c36361183b 100644 --- a/crates/node/engine/src/attributes.rs +++ b/crates/node/engine/src/attributes.rs @@ -35,7 +35,7 @@ impl AttributesMatch { attributes: &OpAttributesWithParent, block: &Block, ) -> Self { - let attr_withdrawals = attributes.attributes.payload_attributes.withdrawals.as_ref(); + let attr_withdrawals = attributes.inner().payload_attributes.withdrawals.as_ref(); let attr_withdrawals = attr_withdrawals.map(|w| Withdrawals::new(w.to_vec())); let block_withdrawals = block.withdrawals.as_ref(); @@ -153,7 +153,7 @@ impl AttributesMatch { ) -> Self { // We can assume that the EIP-1559 params are set iff holocene is active. // Note here that we don't need to check for the attributes length because of type-safety. - let (ae, ad): (u128, u128) = match attributes.attributes.decode_eip_1559_params() { + let (ae, ad): (u128, u128) = match attributes.inner().decode_eip_1559_params() { None => { // Holocene is active but the eip1559 are not set. This is a bug! // Note: we checked the timestamp match above, so we can assume that both the @@ -237,18 +237,18 @@ impl AttributesMatch { .into(); } - if attributes.attributes.payload_attributes.timestamp != block.header.inner.timestamp { + if attributes.inner().payload_attributes.timestamp != block.header.inner.timestamp { return AttributesMismatch::Timestamp( - attributes.attributes.payload_attributes.timestamp, + attributes.inner().payload_attributes.timestamp, block.header.inner.timestamp, ) .into(); } let mix_hash = block.header.inner.mix_hash; - if attributes.attributes.payload_attributes.prev_randao != mix_hash { + if attributes.inner().payload_attributes.prev_randao != mix_hash { return AttributesMismatch::PrevRandao( - attributes.attributes.payload_attributes.prev_randao, + attributes.inner().payload_attributes.prev_randao, mix_hash, ) .into(); @@ -257,14 +257,14 @@ impl AttributesMatch { // Let's extract the list of attribute transactions let default_vec = vec![]; let attributes_txs = - attributes.attributes.transactions.as_ref().map_or_else(|| &default_vec, |attrs| attrs); + attributes.inner().transactions.as_ref().map_or_else(|| &default_vec, |attrs| attrs); // Check transactions if let mismatch @ Self::Mismatch(_) = Self::check_transactions(attributes_txs, block) { return mismatch } - let Some(gas_limit) = attributes.attributes.gas_limit else { + let Some(gas_limit) = attributes.inner().gas_limit else { return AttributesMismatch::MissingAttributesGasLimit.into(); }; @@ -276,21 +276,21 @@ impl AttributesMatch { return m.into(); } - if attributes.attributes.payload_attributes.parent_beacon_block_root != + if attributes.inner().payload_attributes.parent_beacon_block_root != block.header.inner.parent_beacon_block_root { return AttributesMismatch::ParentBeaconBlockRoot( - attributes.attributes.payload_attributes.parent_beacon_block_root, + attributes.inner().payload_attributes.parent_beacon_block_root, block.header.inner.parent_beacon_block_root, ) .into(); } - if attributes.attributes.payload_attributes.suggested_fee_recipient != + if attributes.inner().payload_attributes.suggested_fee_recipient != block.header.inner.beneficiary { return AttributesMismatch::FeeRecipient( - attributes.attributes.payload_attributes.suggested_fee_recipient, + attributes.inner().payload_attributes.suggested_fee_recipient, block.header.inner.beneficiary, ) .into(); @@ -371,15 +371,16 @@ mod tests { use alloy_primitives::{Bytes, FixedBytes, address, b256}; use alloy_rpc_types_eth::BlockTransactions; use arbitrary::{Arbitrary, Unstructured}; - use kona_protocol::L2BlockInfo; + use kona_protocol::{BlockInfo, L2BlockInfo}; use kona_registry::ROLLUP_CONFIGS; use op_alloy_consensus::encode_holocene_extra_data; use op_alloy_rpc_types_engine::OpPayloadAttributes; fn default_attributes() -> OpAttributesWithParent { OpAttributesWithParent { - attributes: OpPayloadAttributes::default(), + inner: OpPayloadAttributes::default(), parent: L2BlockInfo::default(), + l1_origin: BlockInfo::default(), is_last_in_span: true, } } @@ -414,7 +415,7 @@ mod tests { block.header.inner.timestamp = 1234567890; let check = AttributesMatch::check(cfg, &attributes, &block); let expected: AttributesMatch = AttributesMismatch::Timestamp( - attributes.attributes.payload_attributes.timestamp, + attributes.inner().payload_attributes.timestamp, block.header.inner.timestamp, ) .into(); @@ -431,7 +432,7 @@ mod tests { b256!("1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef"); let check = AttributesMatch::check(cfg, &attributes, &block); let expected: AttributesMatch = AttributesMismatch::PrevRandao( - attributes.attributes.payload_attributes.prev_randao, + attributes.inner().payload_attributes.prev_randao, block.header.inner.mix_hash, ) .into(); @@ -455,12 +456,12 @@ mod tests { fn test_attributes_match_check_gas_limit() { let cfg = default_rollup_config(); let mut attributes = default_attributes(); - attributes.attributes.gas_limit = Some(123457); + attributes.inner.gas_limit = Some(123457); let mut block = Block::::default(); block.header.inner.gas_limit = 123456; let check = AttributesMatch::check(cfg, &attributes, &block); let expected: AttributesMatch = AttributesMismatch::GasLimit( - attributes.attributes.gas_limit.unwrap_or_default(), + attributes.inner().gas_limit.unwrap_or_default(), block.header.inner.gas_limit, ) .into(); @@ -472,13 +473,13 @@ mod tests { fn test_attributes_match_check_parent_beacon_block_root() { let cfg = default_rollup_config(); let mut attributes = default_attributes(); - attributes.attributes.gas_limit = Some(0); - attributes.attributes.payload_attributes.parent_beacon_block_root = + attributes.inner.gas_limit = Some(0); + attributes.inner.payload_attributes.parent_beacon_block_root = Some(b256!("1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef")); let block = Block::::default(); let check = AttributesMatch::check(cfg, &attributes, &block); let expected: AttributesMatch = AttributesMismatch::ParentBeaconBlockRoot( - attributes.attributes.payload_attributes.parent_beacon_block_root, + attributes.inner().payload_attributes.parent_beacon_block_root, block.header.inner.parent_beacon_block_root, ) .into(); @@ -490,12 +491,12 @@ mod tests { fn test_attributes_match_check_fee_recipient() { let cfg = default_rollup_config(); let mut attributes = default_attributes(); - attributes.attributes.gas_limit = Some(0); + attributes.inner.gas_limit = Some(0); let mut block = Block::::default(); block.header.inner.beneficiary = address!("1234567890abcdef1234567890abcdef12345678"); let check = AttributesMatch::check(cfg, &attributes, &block); let expected: AttributesMatch = AttributesMismatch::FeeRecipient( - attributes.attributes.payload_attributes.suggested_fee_recipient, + attributes.inner().payload_attributes.suggested_fee_recipient, block.header.inner.beneficiary, ) .into(); @@ -526,8 +527,8 @@ mod tests { let transactions = generate_txs(NUM_TXS); let mut attributes = default_attributes(); - attributes.attributes.gas_limit = Some(0); - attributes.attributes.transactions = Some( + attributes.inner.gas_limit = Some(0); + attributes.inner.transactions = Some( transactions .iter() .map(|tx| { @@ -559,12 +560,12 @@ mod tests { fn test_attributes_mismatch_check_transactions_len() { let cfg = default_rollup_config(); let (mut attributes, block) = test_transactions_match_helper(); - attributes.attributes = OpPayloadAttributes { - transactions: attributes.attributes.transactions.map(|mut txs| { + attributes.inner = OpPayloadAttributes { + transactions: attributes.inner.transactions.map(|mut txs| { txs.pop(); txs }), - ..attributes.attributes + ..attributes.inner }; let block_txs_len = block.transactions.len(); @@ -608,7 +609,7 @@ mod tests { fn test_attributes_mismatch_empty_tx_attributes() { let cfg = default_rollup_config(); let (mut attributes, block) = test_transactions_match_helper(); - attributes.attributes = OpPayloadAttributes { transactions: None, ..attributes.attributes }; + attributes.inner = OpPayloadAttributes { transactions: None, ..attributes.inner }; let block_txs_len = block.transactions.len(); @@ -640,7 +641,7 @@ mod tests { fn test_attributes_transactions_wrong_format() { let cfg = default_rollup_config(); let (mut attributes, block) = test_transactions_match_helper(); - let txs = attributes.attributes.transactions.as_mut().unwrap(); + let txs = attributes.inner.transactions.as_mut().unwrap(); let first_tx_bytes = txs.first_mut().unwrap(); *first_tx_bytes = Bytes::copy_from_slice(&[0, 1, 2]); @@ -658,8 +659,7 @@ mod tests { let cfg = default_rollup_config(); let (mut attributes, mut block) = test_transactions_match_helper(); - attributes.attributes = - OpPayloadAttributes { transactions: Some(vec![]), ..attributes.attributes }; + attributes.inner = OpPayloadAttributes { transactions: Some(vec![]), ..attributes.inner }; block.transactions = BlockTransactions::Full(vec![]); @@ -668,7 +668,7 @@ mod tests { // Edge case: if the block transactions and the payload attributes are empty, we can also // use the hash format (this is the default value of `BlockTransactions`). - attributes.attributes = OpPayloadAttributes { transactions: None, ..attributes.attributes }; + attributes.inner = OpPayloadAttributes { transactions: None, ..attributes.inner }; block.transactions = BlockTransactions::Hashes(vec![]); let check = AttributesMatch::check(cfg, &attributes, &block); @@ -682,8 +682,7 @@ mod tests { let cfg = default_rollup_config(); let (mut attributes, mut block) = test_transactions_match_helper(); - attributes.attributes = - OpPayloadAttributes { transactions: Some(vec![]), ..attributes.attributes }; + attributes.inner = OpPayloadAttributes { transactions: Some(vec![]), ..attributes.inner }; block.transactions = BlockTransactions::Hashes(vec![]); @@ -697,8 +696,7 @@ mod tests { let cfg = default_rollup_config(); let (mut attributes, mut block) = test_transactions_match_helper(); - attributes.attributes = - OpPayloadAttributes { transactions: Some(vec![]), ..attributes.attributes }; + attributes.inner = OpPayloadAttributes { transactions: Some(vec![]), ..attributes.inner }; block.transactions = BlockTransactions::Uncle; @@ -716,9 +714,9 @@ mod tests { cfg.hardforks.holocene_time = Some(0); let mut attributes = default_attributes(); - attributes.attributes.gas_limit = Some(0); + attributes.inner.gas_limit = Some(0); // For canyon and above we need to specify the withdrawals - attributes.attributes.payload_attributes.withdrawals = Some(vec![]); + attributes.inner.payload_attributes.withdrawals = Some(vec![]); // For canyon and above we also need to specify the withdrawal headers let block = Block { @@ -751,7 +749,7 @@ mod tests { fn test_eip1559_parameters_specified_attributes_but_not_block() { let (cfg, mut attributes, block) = eip1559_test_setup(); - attributes.attributes.eip_1559_params = Some(Default::default()); + attributes.inner.eip_1559_params = Some(Default::default()); let check = AttributesMatch::check(&cfg, &attributes, &block); assert_eq!(check, AttributesMatch::Mismatch(AttributesMismatch::MissingBlockEIP1559)); @@ -764,7 +762,7 @@ mod tests { fn test_eip1559_parameters_specified_both_and_empty() { let (cfg, mut attributes, mut block) = eip1559_test_setup(); - attributes.attributes.eip_1559_params = Some(Default::default()); + attributes.inner.eip_1559_params = Some(Default::default()); block.header.extra_data = vec![0; 9].into(); let check = AttributesMatch::check(&cfg, &attributes, &block); @@ -782,7 +780,7 @@ mod tests { fn test_eip1559_parameters_empty_for_attr_only() { let (cfg, mut attributes, mut block) = eip1559_test_setup(); - attributes.attributes.eip_1559_params = Some(Default::default()); + attributes.inner.eip_1559_params = Some(Default::default()); block.header.extra_data = encode_holocene_extra_data( Default::default(), BaseFeeParams { max_change_denominator: 250, elasticity_multiplier: 6 }, @@ -806,7 +804,7 @@ mod tests { let eip1559_params: FixedBytes<8> = eip1559_extra_params.clone().split_off(1).as_ref().try_into().unwrap(); - attributes.attributes.eip_1559_params = Some(eip1559_params); + attributes.inner.eip_1559_params = Some(eip1559_params); block.header.extra_data = eip1559_extra_params; let check = AttributesMatch::check(&cfg, &attributes, &block); @@ -834,7 +832,7 @@ mod tests { .try_into() .unwrap(); - attributes.attributes.eip_1559_params = Some(eip1559_params); + attributes.inner.eip_1559_params = Some(eip1559_params); block.header.extra_data = eip1559_extra_params; let check = AttributesMatch::check(&cfg, &attributes, &block); @@ -861,7 +859,7 @@ mod tests { let eip1559_params: FixedBytes<8> = eip1559_extra_params.clone().split_off(1).as_ref().try_into().unwrap(); - attributes.attributes.eip_1559_params = Some(eip1559_params); + attributes.inner.eip_1559_params = Some(eip1559_params); block.header.extra_data = eip1559_extra_params; let check = AttributesMatch::check(&cfg, &attributes, &block); @@ -888,7 +886,7 @@ mod tests { let mut raw_extra_params_bytes = eip1559_extra_params.to_vec(); raw_extra_params_bytes[0] = 10; - attributes.attributes.eip_1559_params = Some(eip1559_params); + attributes.inner.eip_1559_params = Some(eip1559_params); block.header.extra_data = raw_extra_params_bytes.into(); let check = AttributesMatch::check(&cfg, &attributes, &block); @@ -903,7 +901,7 @@ mod tests { cfg.chain_op_config.eip1559_denominator_canyon = u128::MAX; cfg.chain_op_config.eip1559_elasticity = u128::MAX; - attributes.attributes.eip_1559_params = Some(Default::default()); + attributes.inner.eip_1559_params = Some(Default::default()); block.header.extra_data = vec![0; 9].into(); let check = AttributesMatch::check(&cfg, &attributes, &block); @@ -927,7 +925,7 @@ mod tests { fn test_attributes_match() { let cfg = default_rollup_config(); let mut attributes = default_attributes(); - attributes.attributes.gas_limit = Some(0); + attributes.inner.gas_limit = Some(0); let block = Block::::default(); let check = AttributesMatch::check(cfg, &attributes, &block); assert_eq!(check, AttributesMatch::Match); diff --git a/crates/node/engine/src/client.rs b/crates/node/engine/src/client.rs index 2d6a4759d5..94636335fd 100644 --- a/crates/node/engine/src/client.rs +++ b/crates/node/engine/src/client.rs @@ -101,6 +101,11 @@ impl EngineClient { &self.l1_provider } + /// Returns a reference to the inner [`RollupConfig`]. + pub fn cfg(&self) -> &RollupConfig { + &self.cfg + } + /// Fetches the [`Block`] for the given [`BlockNumberOrTag`]. pub async fn l2_block_by_label( &self, diff --git a/crates/node/engine/src/lib.rs b/crates/node/engine/src/lib.rs index 7512f6d3d2..fbd402b488 100644 --- a/crates/node/engine/src/lib.rs +++ b/crates/node/engine/src/lib.rs @@ -12,8 +12,8 @@ extern crate tracing; mod task_queue; pub use task_queue::{ BuildTask, BuildTaskError, ConsolidateTask, ConsolidateTaskError, Engine, EngineResetError, - EngineTask, EngineTaskError, EngineTaskExt, ForkchoiceTask, ForkchoiceTaskError, - InsertUnsafeTask, InsertUnsafeTaskError, + EngineTask, EngineTaskError, EngineTaskExt, FinalizeTask, FinalizeTaskError, ForkchoiceTask, + ForkchoiceTaskError, InsertUnsafeTask, InsertUnsafeTaskError, }; mod attributes; diff --git a/crates/node/engine/src/metrics/mod.rs b/crates/node/engine/src/metrics/mod.rs index 59788a3961..d4623a0b78 100644 --- a/crates/node/engine/src/metrics/mod.rs +++ b/crates/node/engine/src/metrics/mod.rs @@ -28,6 +28,8 @@ impl Metrics { pub const FORKCHOICE_TASK_LABEL: &str = "forkchoice-update"; /// Build task label. pub const BUILD_TASK_LABEL: &str = "build"; + /// Finalize task label. + pub const FINALIZE_TASK_LABEL: &str = "finalize"; /// Identifier for the histogram that tracks engine method call time. pub const ENGINE_METHOD_REQUEST_DURATION: &str = "kona_node_engine_method_request_duration"; @@ -85,6 +87,7 @@ impl Metrics { kona_macros::set!(counter, Self::ENGINE_TASK_COUNT, Self::CONSOLIDATE_TASK_LABEL, 0); kona_macros::set!(counter, Self::ENGINE_TASK_COUNT, Self::FORKCHOICE_TASK_LABEL, 0); kona_macros::set!(counter, Self::ENGINE_TASK_COUNT, Self::BUILD_TASK_LABEL, 0); + kona_macros::set!(counter, Self::ENGINE_TASK_COUNT, Self::FINALIZE_TASK_LABEL, 0); // Engine reset count kona_macros::set!(counter, Self::ENGINE_RESET_COUNT, 0); diff --git a/crates/node/engine/src/task_queue/tasks/build/task.rs b/crates/node/engine/src/task_queue/tasks/build/task.rs index b00631cf24..84aab69cf7 100644 --- a/crates/node/engine/src/task_queue/tasks/build/task.rs +++ b/crates/node/engine/src/task_queue/tasks/build/task.rs @@ -66,31 +66,31 @@ impl BuildTask { ) -> Result { debug!( target: "engine_builder", - txs = attributes_envelope.attributes.transactions.as_ref().map_or(0, |txs| txs.len()), + txs = attributes_envelope.inner().transactions.as_ref().map_or(0, |txs| txs.len()), "Starting new build job" ); let forkchoice_version = EngineForkchoiceVersion::from_cfg( &self.cfg, - attributes_envelope.attributes.payload_attributes.timestamp, + attributes_envelope.inner().payload_attributes.timestamp, ); debug!(target: "engine_builder", ?forkchoice_version, "Forkchoice version"); let update = match forkchoice_version { EngineForkchoiceVersion::V3 => { engine_client - .fork_choice_updated_v3(forkchoice, Some(attributes_envelope.attributes)) + .fork_choice_updated_v3(forkchoice, Some(attributes_envelope.inner)) .await } EngineForkchoiceVersion::V2 => { engine_client - .fork_choice_updated_v2(forkchoice, Some(attributes_envelope.attributes)) + .fork_choice_updated_v2(forkchoice, Some(attributes_envelope.inner)) .await } EngineForkchoiceVersion::V1 => { engine_client .fork_choice_updated_v1( forkchoice, - Some(attributes_envelope.attributes.payload_attributes), + Some(attributes_envelope.inner.payload_attributes), ) .await } @@ -149,7 +149,7 @@ impl BuildTask { payload_id: PayloadId, payload_attrs: OpAttributesWithParent, ) -> Result { - let payload_timestamp = payload_attrs.attributes.payload_attributes.timestamp; + let payload_timestamp = payload_attrs.inner().payload_attributes.timestamp; debug!( target: "engine_builder", @@ -233,7 +233,7 @@ impl BuildTask { Ok(L2BlockInfo::from_payload_and_genesis( payload, - payload_attrs.attributes.payload_attributes.parent_beacon_block_root, + payload_attrs.inner().payload_attributes.parent_beacon_block_root, &cfg.genesis, )?) } @@ -241,8 +241,7 @@ impl BuildTask { if payload_attrs.is_deposits_only() { error!(target: "engine_builder", "Critical: Deposit-only payload import failed: {validation_error}"); Err(BuildTaskError::DepositOnlyPayloadFailed) - } else if cfg - .is_holocene_active(payload_attrs.attributes.payload_attributes.timestamp) + } else if cfg.is_holocene_active(payload_attrs.inner().payload_attributes.timestamp) { warn!(target: "engine_builder", "Payload import failed: {validation_error}"); warn!(target: "engine_builder", "Re-attempting payload import with deposits only."); diff --git a/crates/node/engine/src/task_queue/tasks/finalize/error.rs b/crates/node/engine/src/task_queue/tasks/finalize/error.rs new file mode 100644 index 0000000000..e8ac414656 --- /dev/null +++ b/crates/node/engine/src/task_queue/tasks/finalize/error.rs @@ -0,0 +1,36 @@ +//! Contains error types for the [crate::FinalizeTask]. + +use crate::EngineTaskError; +use alloy_transport::{RpcError, TransportErrorKind}; +use kona_protocol::FromBlockError; +use thiserror::Error; + +/// An error that occurs when running the [crate::FinalizeTask]. +#[derive(Debug, Error)] +pub enum FinalizeTaskError { + /// The block is not safe, and therefore cannot be finalized. + #[error("Attempted to finalize a block that is not yet safe")] + BlockNotSafe, + /// The block to finalize was not found. + #[error("The block to finalize was not found: Number {0}")] + BlockNotFound(u64), + /// An error occurred while transforming the RPC block into [`L2BlockInfo`]. + /// + /// [`L2BlockInfo`]: kona_protocol::L2BlockInfo + #[error(transparent)] + FromBlock(#[from] FromBlockError), + /// A temporary RPC failure. + #[error(transparent)] + TransportError(#[from] RpcError), +} + +impl From for EngineTaskError { + fn from(value: FinalizeTaskError) -> Self { + match value { + FinalizeTaskError::BlockNotSafe => Self::Critical(Box::new(value)), + FinalizeTaskError::BlockNotFound(_) => Self::Critical(Box::new(value)), + FinalizeTaskError::FromBlock(_) => Self::Critical(Box::new(value)), + FinalizeTaskError::TransportError(_) => Self::Temporary(Box::new(value)), + } + } +} diff --git a/crates/node/engine/src/task_queue/tasks/finalize/mod.rs b/crates/node/engine/src/task_queue/tasks/finalize/mod.rs new file mode 100644 index 0000000000..dccec06202 --- /dev/null +++ b/crates/node/engine/src/task_queue/tasks/finalize/mod.rs @@ -0,0 +1,7 @@ +//! Task and its associated types for finalizing an L2 block. + +mod task; +pub use task::FinalizeTask; + +mod error; +pub use error::FinalizeTaskError; diff --git a/crates/node/engine/src/task_queue/tasks/finalize/task.rs b/crates/node/engine/src/task_queue/tasks/finalize/task.rs new file mode 100644 index 0000000000..d951f4a07d --- /dev/null +++ b/crates/node/engine/src/task_queue/tasks/finalize/task.rs @@ -0,0 +1,60 @@ +//! A task for finalizing an L2 block. + +use crate::{ + EngineClient, EngineState, EngineTaskError, EngineTaskExt, FinalizeTaskError, ForkchoiceTask, + Metrics, +}; +use alloy_provider::Provider; +use async_trait::async_trait; +use kona_protocol::L2BlockInfo; +use std::sync::Arc; + +/// The [`FinalizeTask`] fetches the [`L2BlockInfo`] at `block_number`, updates the [`EngineState`], +/// and dispatches a forkchoice update to finalize the block. +#[derive(Debug, Clone)] +pub struct FinalizeTask { + /// The engine client. + pub client: Arc, + /// The number of the L2 block to finalize. + pub block_number: u64, +} + +impl FinalizeTask { + /// Creates a new [`ForkchoiceTask`]. + pub const fn new(client: Arc, block_number: u64) -> Self { + Self { client, block_number } + } +} + +#[async_trait] +impl EngineTaskExt for FinalizeTask { + async fn execute(&self, state: &mut EngineState) -> Result<(), EngineTaskError> { + // Sanity check that the block that is being finalized is at least safe. + if state.safe_head().block_info.number < self.block_number { + return Err(FinalizeTaskError::BlockNotSafe.into()); + } + + let block = self + .client + .l2_provider() + .get_block(self.block_number.into()) + .full() + .await + .map_err(FinalizeTaskError::TransportError)? + .ok_or(FinalizeTaskError::BlockNotFound(self.block_number))? + .into_consensus(); + let block_info = L2BlockInfo::from_block_and_genesis(&block, &self.client.cfg().genesis) + .map_err(FinalizeTaskError::FromBlock)?; + + // Update the finalized block in the engine state. + state.set_finalized_head(block_info); + + // Dispatch a forkchoice update. + ForkchoiceTask::new(self.client.clone()).execute(state).await?; + + // Update metrics. + kona_macros::inc!(counter, Metrics::ENGINE_TASK_COUNT, Metrics::FINALIZE_TASK_LABEL); + + Ok(()) + } +} diff --git a/crates/node/engine/src/task_queue/tasks/mod.rs b/crates/node/engine/src/task_queue/tasks/mod.rs index d2daa3d29c..9599af0605 100644 --- a/crates/node/engine/src/task_queue/tasks/mod.rs +++ b/crates/node/engine/src/task_queue/tasks/mod.rs @@ -14,3 +14,6 @@ pub use build::{BuildTask, BuildTaskError}; mod consolidate; pub use consolidate::{ConsolidateTask, ConsolidateTaskError}; + +mod finalize; +pub use finalize::{FinalizeTask, FinalizeTaskError}; diff --git a/crates/node/engine/src/task_queue/tasks/task.rs b/crates/node/engine/src/task_queue/tasks/task.rs index 66bd56dc0e..bdc1adeba6 100644 --- a/crates/node/engine/src/task_queue/tasks/task.rs +++ b/crates/node/engine/src/task_queue/tasks/task.rs @@ -2,7 +2,7 @@ //! //! [`Engine`]: crate::Engine -use super::{BuildTask, ConsolidateTask, ForkchoiceTask, InsertUnsafeTask}; +use super::{BuildTask, ConsolidateTask, FinalizeTask, ForkchoiceTask, InsertUnsafeTask}; use crate::EngineState; use async_trait::async_trait; use std::cmp::Ordering; @@ -23,6 +23,8 @@ pub enum EngineTask { /// Performs consolidation on the engine state, reverting to payload attribute processing /// via the [`BuildTask`] if consolidation fails. Consolidate(ConsolidateTask), + /// Finalizes an L2 block + Finalize(FinalizeTask), } impl EngineTask { @@ -33,6 +35,7 @@ impl EngineTask { Self::InsertUnsafe(task) => task.execute(state).await, Self::BuildBlock(task) => task.execute(state).await, Self::Consolidate(task) => task.execute(state).await, + Self::Finalize(task) => task.execute(state).await, } } } @@ -44,7 +47,8 @@ impl PartialEq for EngineTask { (Self::ForkchoiceUpdate(_), Self::ForkchoiceUpdate(_)) | (Self::InsertUnsafe(_), Self::InsertUnsafe(_)) | (Self::BuildBlock(_), Self::BuildBlock(_)) | - (Self::Consolidate(_), Self::Consolidate(_)) + (Self::Consolidate(_), Self::Consolidate(_)) | + (Self::Finalize(_), Self::Finalize(_)) ) } } @@ -76,6 +80,7 @@ impl Ord for EngineTask { (Self::Consolidate(_), Self::Consolidate(_)) => Ordering::Equal, (Self::BuildBlock(_), Self::BuildBlock(_)) => Ordering::Equal, (Self::ForkchoiceUpdate(_), Self::ForkchoiceUpdate(_)) => Ordering::Equal, + (Self::Finalize(_), Self::Finalize(_)) => Ordering::Equal, // Individual ForkchoiceUpdate tasks are the highest priority (Self::ForkchoiceUpdate(_), _) => Ordering::Greater, @@ -85,9 +90,13 @@ impl Ord for EngineTask { (Self::BuildBlock(_), _) => Ordering::Greater, (_, Self::BuildBlock(_)) => Ordering::Less, - // InsertUnsafe tasks are prioritized over Consolidate tasks + // InsertUnsafe tasks are prioritized over Consolidate and Finalize tasks (Self::InsertUnsafe(_), _) => Ordering::Greater, (_, Self::InsertUnsafe(_)) => Ordering::Less, + + // Consolidate tasks are prioritized over Finalize tasks + (Self::Consolidate(_), _) => Ordering::Greater, + (_, Self::Consolidate(_)) => Ordering::Less, } } } diff --git a/crates/node/service/Cargo.toml b/crates/node/service/Cargo.toml index 24082cb834..278e0912c5 100644 --- a/crates/node/service/Cargo.toml +++ b/crates/node/service/Cargo.toml @@ -46,6 +46,7 @@ tracing.workspace = true thiserror.workspace = true tokio-util.workspace = true async-trait.workspace = true +async-stream.workspace = true derive_more = { workspace = true, features = ["debug"] } jsonrpsee = { workspace = true, features = ["server"] } tokio = { workspace = true, features = ["rt-multi-thread", "macros"] } diff --git a/crates/node/service/src/actors/derivation.rs b/crates/node/service/src/actors/derivation.rs index aba4e47fd2..0448914fa7 100644 --- a/crates/node/service/src/actors/derivation.rs +++ b/crates/node/service/src/actors/derivation.rs @@ -54,9 +54,9 @@ where /// /// Specs: derivation_signal_rx: UnboundedReceiver, - /// The receiver for L1 head update notifications. l1_head_updates: UnboundedReceiver, + /// The sender for derived [`OpAttributesWithParent`]s produced by the actor. attributes_out: UnboundedSender, /// The reset request sender, used to handle [`PipelineErrorKind::Reset`] events and forward diff --git a/crates/node/service/src/actors/engine.rs b/crates/node/service/src/actors/engine.rs index 4293a03278..ba0a7f528c 100644 --- a/crates/node/service/src/actors/engine.rs +++ b/crates/node/service/src/actors/engine.rs @@ -5,14 +5,14 @@ use async_trait::async_trait; use kona_derive::types::{ResetSignal, Signal}; use kona_engine::{ ConsolidateTask, Engine, EngineClient, EngineQueries, EngineResetError, EngineState, - EngineTask, EngineTaskError, InsertUnsafeTask, + EngineTask, EngineTaskError, FinalizeTask, InsertUnsafeTask, }; use kona_genesis::RollupConfig; -use kona_protocol::{L2BlockInfo, OpAttributesWithParent}; +use kona_protocol::{BlockInfo, L2BlockInfo, OpAttributesWithParent}; use kona_sources::RuntimeConfig; use op_alloy_provider::ext::engine::OpEngineApi; use op_alloy_rpc_types_engine::OpNetworkPayloadEnvelope; -use std::sync::Arc; +use std::{collections::BTreeMap, sync::Arc}; use tokio::{ sync::{ mpsc::{Receiver, UnboundedReceiver, UnboundedSender}, @@ -38,6 +38,7 @@ pub struct EngineActor { pub client: Arc, /// The [`Engine`]. pub engine: Engine, + /// The channel to send the l2 safe head to the derivation actor. engine_l2_safe_head_tx: WatchSender, /// Handler for inbound queries to the engine. @@ -48,6 +49,7 @@ pub struct EngineActor { /// A way for the engine actor to signal back to the derivation actor /// if a block building task produced an `INVALID` response. derivation_signal_tx: UnboundedSender, + /// A channel to receive [`RuntimeConfig`] from the runtime actor. runtime_config_rx: UnboundedReceiver, /// A channel to receive [`OpAttributesWithParent`] from the derivation actor. @@ -56,8 +58,16 @@ pub struct EngineActor { unsafe_block_rx: UnboundedReceiver, /// A channel to receive reset requests. reset_request_rx: UnboundedReceiver<()>, + /// A channel to receive finalized block updates. + finalized_block_rx: UnboundedReceiver, /// The cancellation token, shared between all tasks. cancellation: CancellationToken, + + /// A map of L1 block number -> highest derived L2 block number within the L1 epoch, used to + /// track derived attributes awaiting finalization. When a new finalized L1 block is + /// received, the highest L2 block whose inputs are contained within the finalized L1 chain + /// is finalized. + awaiting_finalization: BTreeMap, } impl EngineActor { @@ -74,6 +84,7 @@ impl EngineActor { attributes_rx: UnboundedReceiver, unsafe_block_rx: UnboundedReceiver, reset_request_rx: UnboundedReceiver<()>, + finalized_block_rx: UnboundedReceiver, inbound_queries: Option>, cancellation: CancellationToken, ) -> Self { @@ -88,16 +99,20 @@ impl EngineActor { attributes_rx, unsafe_block_rx, reset_request_rx, + finalized_block_rx, inbound_queries, cancellation, + awaiting_finalization: BTreeMap::new(), } } /// Resets the inner [`Engine`] and propagates the reset to the derivation actor. pub async fn reset(&mut self) -> Result<(), EngineError> { + // Reset the engine. let (l2_safe_head, l1_origin, system_config) = self.engine.reset(self.client.clone(), &self.config).await?; + // Signal the derivation actor to reset. let signal = ResetSignal { l2_safe_head, l1_origin, system_config: Some(system_config) }; match self.derivation_signal_tx.send(signal.signal()) { Ok(_) => debug!(target: "engine", "Sent reset signal to derivation actor"), @@ -108,7 +123,13 @@ impl EngineActor { } } + // Clear the queue of attributes awaiting finalization. It will be re-saturated following + // derivation. + self.awaiting_finalization.clear(); + + // Attempt to update the safe head following the reset. self.maybe_update_safe_head(); + Ok(()) } @@ -244,12 +265,11 @@ impl NodeActor for EngineActor { return Err(EngineError::ChannelClosed); }; let hash = envelope.payload_hash; - let task = InsertUnsafeTask::new( + let task = EngineTask::InsertUnsafe(InsertUnsafeTask::new( Arc::clone(&self.client), Arc::clone(&self.config), envelope, - ); - let task = EngineTask::InsertUnsafe(task); + )); self.engine.enqueue(task); debug!(target: "engine", ?hash, "Enqueued unsafe block task."); self.check_sync().await?; @@ -260,13 +280,19 @@ impl NodeActor for EngineActor { self.cancellation.cancel(); return Err(EngineError::ChannelClosed); }; - let task = ConsolidateTask::new( + + // Optimistically enqueue attributes for finalization. + self.awaiting_finalization + .entry(attributes.l1_origin.number) + .and_modify(|n| *n = (*n).max(attributes.parent.block_info.number + 1)) + .or_insert(attributes.parent.block_info.number + 1); + + let task = EngineTask::Consolidate(ConsolidateTask::new( Arc::clone(&self.client), Arc::clone(&self.config), attributes, true, - ); - let task = EngineTask::Consolidate(task); + )); self.engine.enqueue(task); debug!(target: "engine", "Enqueued attributes consolidation task."); } @@ -292,6 +318,25 @@ impl NodeActor for EngineActor { } }); } + new_finalized_l1 = self.finalized_block_rx.recv() => { + let Some(new_finalized_l1) = new_finalized_l1 else { + error!(target: "engine", "Finalized block receiver closed unexpectedly, exiting node"); + self.cancellation.cancel(); + return Err(EngineError::ChannelClosed); + }; + + // Find the highest safe L2 block that's contained in the finalized chain, that we know about. + let highest_safe = self.awaiting_finalization.range(..=new_finalized_l1.number).next_back(); + + if let Some((_, highest_safe_number)) = highest_safe { + // Enqueue a finalize task. + let task = EngineTask::Finalize(FinalizeTask::new(self.client.clone(), *highest_safe_number)); + self.engine.enqueue(task); + + // Drain the map of all L2 blocks that were derived prior or within the finalized L1 block. + self.awaiting_finalization.retain(|&number, _| number > new_finalized_l1.number); + } + } } } } diff --git a/crates/node/service/src/actors/l1_watcher_rpc.rs b/crates/node/service/src/actors/l1_watcher_rpc.rs index 5d79b20fda..b0e00d02f1 100644 --- a/crates/node/service/src/actors/l1_watcher_rpc.rs +++ b/crates/node/service/src/actors/l1_watcher_rpc.rs @@ -1,16 +1,20 @@ -//! [NodeActor] implementation for an L1 chain watcher that checks for L1 head updates over RPC. +//! [`NodeActor`] implementation for an L1 chain watcher that polls for L1 block updates over HTTP +//! RPC. use crate::NodeActor; -use alloy_eips::BlockId; +use alloy_eips::{BlockId, BlockNumberOrTag}; use alloy_primitives::{Address, B256}; use alloy_provider::{Provider, RootProvider}; -use alloy_rpc_types_eth::Log; +use alloy_rpc_client::PollerBuilder; +use alloy_rpc_types_eth::{Block, Log}; +use alloy_transport::TransportError; +use async_stream::stream; use async_trait::async_trait; -use futures::StreamExt; +use futures::{Stream, StreamExt}; use kona_genesis::{RollupConfig, SystemConfigLog, SystemConfigUpdate, UnsafeBlockSignerUpdate}; use kona_protocol::BlockInfo; use kona_rpc::{L1State, L1WatcherQueries}; -use std::sync::Arc; +use std::{sync::Arc, time::Duration}; use thiserror::Error; use tokio::{ select, @@ -19,20 +23,21 @@ use tokio::{ }; use tokio_util::sync::CancellationToken; -/// An L1 chain watcher that checks for L1 head updates over RPC. +/// An L1 chain watcher that checks for L1 block updates over RPC. #[derive(Debug)] pub struct L1WatcherRpc { /// The [`RollupConfig`] to tell if ecotone is active. - /// This is used to determine if the L1 watcher should - /// check for unsafe block signer updates. + /// This is used to determine if the L1 watcher should check for unsafe block signer updates. config: Arc, /// The L1 provider. l1_provider: RootProvider, /// The latest L1 head sent to the derivation pipeline and watcher. Can be subscribed to, in /// order to get the state from the external watcher. latest_head: tokio::sync::watch::Sender>, - /// The outbound event sender. + /// The outbound L1 head block sender. head_sender: UnboundedSender, + /// The outbound L1 finalized block sender. + finalized_sender: UnboundedSender, /// The block signer sender. block_signer_sender: UnboundedSender
, /// The cancellation token, shared between all tasks. @@ -47,6 +52,7 @@ impl L1WatcherRpc { config: Arc, l1_provider: RootProvider, head_sender: UnboundedSender, + finalized_sender: UnboundedSender, block_signer_sender: UnboundedSender
, cancellation: CancellationToken, // Can be None if we disable communication with the L1 watcher. @@ -57,6 +63,7 @@ impl L1WatcherRpc { config, l1_provider, head_sender, + finalized_sender, latest_head: head_updates, block_signer_sender, cancellation, @@ -65,44 +72,15 @@ impl L1WatcherRpc { } /// Fetches logs for the given block hash. - async fn fetch_logs( - &mut self, - block_hash: B256, - ) -> Result, L1WatcherRpcError> { + async fn fetch_logs(&self, block_hash: B256) -> Result, L1WatcherRpcError> { let logs = self .l1_provider .get_logs(&alloy_rpc_types_eth::Filter::new().select(block_hash)) - .await - .map_err(|e| L1WatcherRpcError::Transport(format!("Failed to fetch logs: {}", e)))?; + .await?; Ok(logs) } - /// Fetches the block info for the current L1 head. - async fn block_info_by_hash( - &mut self, - block_hash: B256, - ) -> Result> { - // Fetch the block of the current unsafe L1 head. - let block = self - .l1_provider - .get_block_by_hash(block_hash) - .await - .map_err(|e| L1WatcherRpcError::Transport(e.to_string()))? - .ok_or(L1WatcherRpcError::L1BlockNotFound(block_hash))?; - - // Update the last observed head. The producer does not care about re-orgs, as this is - // handled downstream by receivers of the head update signal. - let head_block_info = BlockInfo { - hash: block.header.hash, - number: block.header.number, - parent_hash: block.header.parent_hash, - timestamp: block.header.timestamp, - }; - - Ok(head_block_info) - } - /// Spins up a task to process inbound queries. fn start_query_processor( &self, @@ -170,13 +148,15 @@ impl NodeActor for L1WatcherRpc { type Error = L1WatcherRpcError; async fn start(mut self) -> Result<(), Self::Error> { - let mut unsafe_head_stream = self - .l1_provider - .watch_blocks() - .await - .map_err(|e| L1WatcherRpcError::Transport(e.to_string()))? - .into_stream() - .flat_map(futures::stream::iter); + let mut head_stream = + BlockStream::new(&self.l1_provider, BlockNumberOrTag::Latest, Duration::from_secs(13)) + .into_stream(); + let mut finalized_stream = BlockStream::new( + &self.l1_provider, + BlockNumberOrTag::Finalized, + Duration::from_secs(60), + ) + .into_stream(); let inbound_queries = std::mem::take(&mut self.inbound_queries); let inbound_query_processor = @@ -196,17 +176,13 @@ impl NodeActor for L1WatcherRpc { if let Some(inbound_query_processor) = inbound_query_processor { inbound_query_processor.abort() } return Ok(()); - } - new_head = unsafe_head_stream.next() => match new_head { + }, + new_head = head_stream.next() => match new_head { None => { - // The stream ended, which should never happen. - return Err(L1WatcherRpcError::Transport( - "L1 block stream ended unexpectedly".to_string(), - )); + return Err(L1WatcherRpcError::StreamEnded); } - Some(new_head) => { + Some(head_block_info) => { // Send the head update event to all consumers. - let head_block_info = self.block_info_by_hash(new_head).await?; self.head_sender.send(head_block_info)?; self.latest_head.send_replace(Some(head_block_info)); @@ -214,7 +190,7 @@ impl NodeActor for L1WatcherRpc { // Build the `SystemConfigUpdate` from the log. // If the update is an Unsafe block signer update, send the address // to the block signer sender. - let logs = self.fetch_logs(new_head).await?; + let logs = self.fetch_logs(head_block_info.hash).await?; let ecotone_active = self.config.is_ecotone_active(head_block_info.timestamp); for log in logs { let sys_cfg_log = SystemConfigLog::new(log.into(), ecotone_active); @@ -232,6 +208,14 @@ impl NodeActor for L1WatcherRpc { } } }, + }, + new_finalized = finalized_stream.next() => match new_finalized { + None => { + return Err(L1WatcherRpcError::StreamEnded); + } + Some(finalized_block_info) => { + self.finalized_sender.send(finalized_block_info)?; + } } } } @@ -243,7 +227,57 @@ impl NodeActor for L1WatcherRpc { } } -/// The error type for the [L1WatcherRpc]. +/// A wrapper around a [`PollerBuilder`] that observes [`BlockId`] updates on a [`RootProvider`]. +/// +/// Note that this stream is not guaranteed to be contiguous. It may miss certain blocks, and +/// yielded items should only be considered to be the latest block matching the given +/// [`BlockNumberOrTag`]. +struct BlockStream<'a> { + /// The inner [`RootProvider`]. + l1_provider: &'a RootProvider, + /// The block tag to poll for. + tag: BlockNumberOrTag, + /// The poll interval (in seconds). + poll_interval: Duration, +} + +impl<'a> BlockStream<'a> { + /// Creates a new [`BlockStream`] instance. + /// + /// ## Panics + /// Panics if the passed [`BlockNumberOrTag`] is of the [`BlockNumberOrTag::Number`] variant. + fn new(l1_provider: &'a RootProvider, tag: BlockNumberOrTag, poll_interval: Duration) -> Self { + if matches!(tag, BlockNumberOrTag::Number(_)) { + panic!("Invalid BlockNumberOrTag variant - Must be a tag"); + } + Self { l1_provider, tag, poll_interval } + } + + /// Transforms the watcher into a [`Stream`]. + fn into_stream(self) -> impl Stream + Unpin { + let mut poll_stream = PollerBuilder::<_, Block>::new( + self.l1_provider.weak_client(), + "eth_getBlockByNumber", + (self.tag, false), + ) + .with_poll_interval(self.poll_interval) + .into_stream(); + + Box::pin(stream! { + let mut last_block = None; + while let Some(next) = poll_stream.next().await { + let info: BlockInfo = next.into_consensus().into(); + + if last_block.map(|b| b != info).unwrap_or(true) { + last_block = Some(info); + yield info; + } + } + }) + } +} + +/// The error type for the [`L1WatcherRpc`]. #[derive(Error, Debug)] pub enum L1WatcherRpcError { /// Error sending the head update event. @@ -251,11 +285,11 @@ pub enum L1WatcherRpcError { SendError(#[from] SendError), /// Error in the transport layer. #[error("Transport error: {0}")] - Transport(String), + Transport(#[from] TransportError), /// The L1 block was not found. #[error("L1 block not found: {0}")] - L1BlockNotFound(B256), - /// Nothing to update. - #[error("Nothing to update; L1 head is the same as the last observed head")] - NothingToUpdate, + L1BlockNotFound(BlockId), + /// Stream ended unexpectedly. + #[error("Stream ended unexpectedly")] + StreamEnded, } diff --git a/crates/node/service/src/service/standard/node.rs b/crates/node/service/src/service/standard/node.rs index 3fc11a9f36..1a15eb5b75 100644 --- a/crates/node/service/src/service/standard/node.rs +++ b/crates/node/service/src/service/standard/node.rs @@ -83,7 +83,8 @@ impl ValidatorNodeService for RollupNode { fn new_da_watcher( &self, - new_da_tx: UnboundedSender, + new_data_tx: UnboundedSender, + new_finalized_tx: UnboundedSender, block_signer_tx: UnboundedSender
, cancellation: CancellationToken, l1_watcher_inbound_queries: Option>, @@ -91,7 +92,8 @@ impl ValidatorNodeService for RollupNode { L1WatcherRpc::new( self.config.clone(), self.l1_provider.clone(), - new_da_tx, + new_data_tx, + new_finalized_tx, block_signer_tx, cancellation, l1_watcher_inbound_queries, diff --git a/crates/node/service/src/service/validator.rs b/crates/node/service/src/service/validator.rs index 9cfce252fd..4a45563798 100644 --- a/crates/node/service/src/service/validator.rs +++ b/crates/node/service/src/service/validator.rs @@ -27,27 +27,9 @@ use tokio_util::sync::CancellationToken; /// 2. The L2 sequencer, which produces unsafe L2 blocks and sends them to the network over p2p /// gossip. /// -/// From these two sources, the validator node imports `unsafe` blocks from the L2 sequencer and -/// `safe` blocks from the L2 derivation pipeline into the L2 execution layer via the Engine API. -/// -/// Finally, a state actor listens for new L2 block import events and updates the L2 state -/// accordingly, sending notifications to the other actors for synchronization. -/// -/// ## Actor Communication -/// -/// ```not_rust -/// ┌────────────┐ -/// │L2 Sequencer│ -/// │ ├───┐ -/// │ Gossip │ │ ┌────────────┐ ┌────────────┐ ┌────────────┐ -/// └────────────┘ │ │ │ │ │ │ │ -/// ├──►│ Derivation │──►│ Engine API │──►│ State │ -/// ┌────────────┐ │ │ │ │ │ │ │ -/// │ DA │ │ └────────────┘ └┬───────────┘ └┬───────────┘ -/// │ ├───┘ ▲ │ │ -/// │ Watcher │ └──────┴────────────────┘ -/// └────────────┘ -/// ``` +/// From these two sources, the validator node imports `unsafe` blocks from the L2 sequencer, +/// `safe` blocks from the L2 derivation pipeline into the L2 execution layer via the Engine API, +/// and finalizes `safe` blocks that it has derived when L1 finalized block updates are received. /// /// ## Types /// @@ -71,10 +53,13 @@ pub trait ValidatorNodeService { /// Creates a new [`NodeActor`] instance that watches the data availability layer. The /// `new_data_tx` channel is used to send updates on the data availability layer to the - /// derivation pipeline. The `cancellation` token is used to gracefully shut down the actor. + /// derivation pipeline. The `new_finalized_tx` is used to send updates on the data + /// availability layer to the engine for finalizing derived blocks. The `cancellation` + /// token is used to gracefully shut down the actor. fn new_da_watcher( &self, new_data_tx: UnboundedSender, + new_finalized_tx: UnboundedSender, block_signer_tx: UnboundedSender
, cancellation: CancellationToken, l1_watcher_inbound_queries: Option>, @@ -103,6 +88,7 @@ pub trait ValidatorNodeService { // Create channels for communication between actors. let (new_head_tx, new_head_rx) = mpsc::unbounded_channel(); + let (new_finalized_tx, new_finalized_rx) = mpsc::unbounded_channel(); let (derived_payload_tx, derived_payload_rx) = mpsc::unbounded_channel(); let (unsafe_block_tx, unsafe_block_rx) = mpsc::unbounded_channel(); let (sync_complete_tx, sync_complete_rx) = mpsc::unbounded_channel(); @@ -114,6 +100,7 @@ pub trait ValidatorNodeService { let (l1_watcher_queries_sender, l1_watcher_queries_recv) = tokio::sync::mpsc::channel(1024); let da_watcher = Some(self.new_da_watcher( new_head_tx, + new_finalized_tx, block_signer_tx, cancellation.clone(), Some(l1_watcher_queries_recv), @@ -161,6 +148,7 @@ pub trait ValidatorNodeService { derived_payload_rx, unsafe_block_rx, reset_request_rx, + new_finalized_rx, Some(engine_query_recv), cancellation.clone(), ); diff --git a/crates/node/sources/src/sync/mod.rs b/crates/node/sources/src/sync/mod.rs index 43bc2ff3a2..ea45918dcc 100644 --- a/crates/node/sources/src/sync/mod.rs +++ b/crates/node/sources/src/sync/mod.rs @@ -85,12 +85,16 @@ pub async fn find_starting_forkchoice( let is_behind_sequence_window = current_fc.un_safe.l1_origin.number.saturating_sub(cfg.seq_window_size) > safe_cursor.l1_origin.number; + let is_finalized = safe_cursor.block_info.hash == current_fc.finalized.block_info.hash; let is_genesis = safe_cursor.block_info.hash == cfg.genesis.l2.hash; - if is_behind_sequence_window || is_genesis { + if is_behind_sequence_window || is_finalized || is_genesis { info!( target: "sync_start", l2_safe = %safe_cursor.block_info.number, - "Found L2 safe block beyond sequencing window" + is_behind_sequence_window, + is_finalized, + is_genesis, + "Found suitable L2 safe block" ); current_fc.safe = safe_cursor; break; diff --git a/crates/protocol/derive/src/pipeline/core.rs b/crates/protocol/derive/src/pipeline/core.rs index 31e0a11673..bbccc3e6d7 100644 --- a/crates/protocol/derive/src/pipeline/core.rs +++ b/crates/protocol/derive/src/pipeline/core.rs @@ -200,7 +200,7 @@ mod tests { fn default_test_payload_attributes() -> OpAttributesWithParent { OpAttributesWithParent { - attributes: OpPayloadAttributes { + inner: OpPayloadAttributes { payload_attributes: PayloadAttributes { timestamp: 0, prev_randao: Default::default(), @@ -214,6 +214,7 @@ mod tests { eip_1559_params: None, }, parent: Default::default(), + l1_origin: Default::default(), is_last_in_span: false, } } diff --git a/crates/protocol/derive/src/stages/attributes_queue.rs b/crates/protocol/derive/src/stages/attributes_queue.rs index 6e6e25c567..5ffc45f9ae 100644 --- a/crates/protocol/derive/src/stages/attributes_queue.rs +++ b/crates/protocol/derive/src/stages/attributes_queue.rs @@ -85,8 +85,9 @@ where return Err(e); } }; + let origin = self.origin().ok_or(PipelineError::MissingOrigin.crit())?; let populated_attributes = - OpAttributesWithParent { attributes, parent, is_last_in_span: self.is_last_in_span }; + OpAttributesWithParent::new(attributes, parent, origin, self.is_last_in_span); // Clear out the local state once payload attributes are prepared. self.batch = None; @@ -366,7 +367,8 @@ mod tests { #[tokio::test] async fn test_next_attributes_load_batch_last_in_span() { let cfg = RollupConfig::default(); - let mock = new_test_attributes_provider(None, vec![Ok(Default::default())]); + let mock = + new_test_attributes_provider(Some(Default::default()), vec![Ok(Default::default())]); let mut pa = default_optimism_payload_attributes(); let mock_builder = TestAttributesBuilder { attributes: vec![Ok(pa.clone())] }; let mut aq = AttributesQueue::new(Arc::new(cfg), mock, mock_builder); @@ -380,8 +382,9 @@ mod tests { let attributes = aq.next_attributes(L2BlockInfo::default()).await.unwrap(); pa.no_tx_pool = Some(true); let populated_attributes = OpAttributesWithParent { - attributes: pa, + inner: pa, parent: L2BlockInfo::default(), + l1_origin: BlockInfo::default(), is_last_in_span: true, }; assert_eq!(attributes, populated_attributes); diff --git a/crates/protocol/driver/src/core.rs b/crates/protocol/driver/src/core.rs index d9fb276905..7a35283273 100644 --- a/crates/protocol/driver/src/core.rs +++ b/crates/protocol/driver/src/core.rs @@ -13,7 +13,7 @@ use kona_derive::{ }; use kona_executor::BlockBuildingOutcome; use kona_genesis::RollupConfig; -use kona_protocol::{L2BlockInfo, OpAttributesWithParent}; +use kona_protocol::L2BlockInfo; use op_alloy_consensus::{OpBlock, OpTxEnvelope, OpTxType}; use spin::RwLock; @@ -85,12 +85,9 @@ where } } - let OpAttributesWithParent { mut attributes, .. } = match self - .pipeline - .produce_payload(tip_cursor.l2_safe_head) - .await + let mut attributes = match self.pipeline.produce_payload(tip_cursor.l2_safe_head).await { - Ok(attrs) => attrs, + Ok(attrs) => attrs.take_inner(), Err(PipelineErrorKind::Critical(PipelineError::EndOfSource)) => { warn!(target: "client", "Exhausted data source; Halting derivation and using current safe head."); diff --git a/crates/protocol/protocol/src/attributes.rs b/crates/protocol/protocol/src/attributes.rs index d220ce6e22..daf023289c 100644 --- a/crates/protocol/protocol/src/attributes.rs +++ b/crates/protocol/protocol/src/attributes.rs @@ -1,18 +1,20 @@ //! Optimism Payload attributes that reference the parent L2 block. -use crate::L2BlockInfo; +use crate::{BlockInfo, L2BlockInfo}; use alloc::vec; use op_alloy_consensus::OpTxType; use op_alloy_rpc_types_engine::OpPayloadAttributes; -/// Optimism Payload Attributes with parent block reference. +/// Optimism Payload Attributes with parent block reference and the L1 origin block. #[derive(Debug, Clone, PartialEq)] #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] pub struct OpAttributesWithParent { /// The payload attributes. - pub attributes: OpPayloadAttributes, + pub inner: OpPayloadAttributes, /// The parent block reference. pub parent: L2BlockInfo, + /// The L1 block that the attributes were derived from. + pub l1_origin: BlockInfo, /// Whether the current batch is the last in its span. pub is_last_in_span: bool, } @@ -20,16 +22,22 @@ pub struct OpAttributesWithParent { impl OpAttributesWithParent { /// Create a new [OpAttributesWithParent] instance. pub const fn new( - attributes: OpPayloadAttributes, + inner: OpPayloadAttributes, parent: L2BlockInfo, + l1_origin: BlockInfo, is_last_in_span: bool, ) -> Self { - Self { attributes, parent, is_last_in_span } + Self { inner, parent, l1_origin, is_last_in_span } + } + + /// Consumes `self` and returns the inner [`OpPayloadAttributes`]. + pub fn take_inner(self) -> OpPayloadAttributes { + self.inner } /// Returns the payload attributes. - pub const fn attributes(&self) -> &OpPayloadAttributes { - &self.attributes + pub const fn inner(&self) -> &OpPayloadAttributes { + &self.inner } /// Returns the parent block reference. @@ -37,6 +45,11 @@ impl OpAttributesWithParent { &self.parent } + /// Returns the L1 origin block reference. + pub const fn l1_origin(&self) -> &BlockInfo { + &self.l1_origin + } + /// Returns whether the current batch is the last in its span. pub const fn is_last_in_span(&self) -> bool { self.is_last_in_span @@ -44,7 +57,7 @@ impl OpAttributesWithParent { /// Returns `true` if all transactions in the payload are deposits. pub fn is_deposits_only(&self) -> bool { - self.attributes + self.inner .transactions .iter() .all(|tx| tx.first().is_some_and(|tx| tx[0] == OpTxType::Deposit as u8)) @@ -53,15 +66,16 @@ impl OpAttributesWithParent { /// Converts the [`OpAttributesWithParent`] into a deposits-only payload. pub fn as_deposits_only(&self) -> Self { Self { - attributes: OpPayloadAttributes { - transactions: self.attributes.transactions.as_ref().map(|txs| { + inner: OpPayloadAttributes { + transactions: self.inner.transactions.as_ref().map(|txs| { txs.iter() .map(|_| alloy_primitives::Bytes::from(vec![OpTxType::Deposit as u8])) .collect() }), - ..self.attributes.clone() + ..self.inner.clone() }, parent: self.parent, + l1_origin: self.l1_origin, is_last_in_span: self.is_last_in_span, } } @@ -75,11 +89,12 @@ mod tests { fn test_op_attributes_with_parent() { let attributes = OpPayloadAttributes::default(); let parent = L2BlockInfo::default(); + let l1_origin = BlockInfo::default(); let is_last_in_span = true; let op_attributes_with_parent = - OpAttributesWithParent::new(attributes.clone(), parent, is_last_in_span); + OpAttributesWithParent::new(attributes.clone(), parent, l1_origin, is_last_in_span); - assert_eq!(op_attributes_with_parent.attributes(), &attributes); + assert_eq!(op_attributes_with_parent.inner(), &attributes); assert_eq!(op_attributes_with_parent.parent(), &parent); assert_eq!(op_attributes_with_parent.is_last_in_span(), is_last_in_span); }