diff --git a/Cargo.lock b/Cargo.lock index f2cb8343b5..627894fe80 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4260,6 +4260,7 @@ dependencies = [ "kona-macros", "kona-protocol", "kona-registry", + "kona-sources", "metrics", "metrics-exporter-prometheus", "op-alloy-consensus", diff --git a/bin/node/src/commands/node.rs b/bin/node/src/commands/node.rs index ccb151c36f..4593849120 100644 --- a/bin/node/src/commands/node.rs +++ b/bin/node/src/commands/node.rs @@ -111,6 +111,7 @@ impl NodeCommand { let engine_client = kona_engine::EngineClient::new_http( self.l2_engine_rpc.clone(), self.l2_provider_rpc.clone(), + self.l1_eth_rpc.clone(), Arc::new(config.clone()), jwt_secret, ); diff --git a/crates/node/engine/Cargo.toml b/crates/node/engine/Cargo.toml index 52eb483092..d274e554ea 100644 --- a/crates/node/engine/Cargo.toml +++ b/crates/node/engine/Cargo.toml @@ -16,6 +16,7 @@ workspace = true kona-genesis.workspace = true kona-macros.workspace = true kona-protocol = {workspace = true, features = ["serde", "std"]} +kona-sources.workspace = true # alloy alloy-eips.workspace = true diff --git a/crates/node/engine/src/client.rs b/crates/node/engine/src/client.rs index c9bd7fb873..106dc2f9a7 100644 --- a/crates/node/engine/src/client.rs +++ b/crates/node/engine/src/client.rs @@ -10,7 +10,7 @@ use alloy_rpc_types_engine::{ ClientVersionV1, ExecutionPayloadBodiesV1, ExecutionPayloadEnvelopeV2, ExecutionPayloadInputV2, ExecutionPayloadV3, ForkchoiceState, ForkchoiceUpdated, JwtSecret, PayloadId, PayloadStatus, }; -use alloy_rpc_types_eth::{Block, SyncStatus}; +use alloy_rpc_types_eth::Block; use alloy_transport::{RpcError, TransportErrorKind, TransportResult}; use alloy_transport_http::{ AuthLayer, AuthService, Http, HyperClient, @@ -56,7 +56,9 @@ pub struct EngineClient { #[deref] engine: RootProvider, /// The L2 chain provider. - rpc: RootProvider, + l2_provider: RootProvider, + /// The L1 chain provider. + l1_provider: RootProvider, /// The [RollupConfig] for the chain used to timestamp which version of the engine api to use. cfg: Arc, } @@ -75,17 +77,28 @@ impl EngineClient { } /// Creates a new [`EngineClient`] from the provided [Url] and [JwtSecret]. - pub fn new_http(engine: Url, rpc: Url, cfg: Arc, jwt: JwtSecret) -> Self { + pub fn new_http( + engine: Url, + l2_rpc: Url, + l1_rpc: Url, + cfg: Arc, + jwt: JwtSecret, + ) -> Self { let engine = Self::rpc_client::(engine, jwt); - let rpc = Self::rpc_client::(rpc, jwt); + let l2_provider = RootProvider::::new_http(l2_rpc); + let l1_provider = RootProvider::new_http(l1_rpc); - Self { engine, rpc, cfg } + Self { engine, l2_provider, l1_provider, cfg } } - /// Returns the [`SyncStatus`] of the engine. - pub async fn syncing(&self) -> Result { - let status = >::syncing(&self.engine).await?; - Ok(status) + /// Returns a reference to the inner L2 [`RootProvider`]. + pub const fn l2_provider(&self) -> &RootProvider { + &self.l2_provider + } + + /// Returns a reference to the inner L1 [`RootProvider`]. + pub const fn l1_provider(&self) -> &RootProvider { + &self.l1_provider } /// Fetches the [`Block`] for the given [`BlockNumberOrTag`]. @@ -93,7 +106,7 @@ impl EngineClient { &self, numtag: BlockNumberOrTag, ) -> Result>, EngineClientError> { - Ok(>::get_block_by_number(&self.rpc, numtag).full().await?) + Ok(>::get_block_by_number(&self.l2_provider, numtag).full().await?) } /// Fetches the [L2BlockInfo] by [BlockNumberOrTag]. @@ -101,7 +114,8 @@ impl EngineClient { &self, numtag: BlockNumberOrTag, ) -> Result, EngineClientError> { - let block = >::get_block_by_number(&self.rpc, numtag).full().await?; + let block = + >::get_block_by_number(&self.l2_provider, numtag).full().await?; let Some(block) = block else { return Ok(None); }; diff --git a/crates/node/engine/src/lib.rs b/crates/node/engine/src/lib.rs index 0824ffeecb..d58f6aad64 100644 --- a/crates/node/engine/src/lib.rs +++ b/crates/node/engine/src/lib.rs @@ -11,9 +11,9 @@ extern crate tracing; mod task_queue; pub use task_queue::{ - BuildTask, BuildTaskError, ConsolidateTask, ConsolidateTaskError, Engine, EngineTask, - EngineTaskError, EngineTaskExt, EngineTaskType, ForkchoiceTask, ForkchoiceTaskError, - InsertUnsafeTask, InsertUnsafeTaskError, init_unknowns, + BuildTask, BuildTaskError, ConsolidateTask, ConsolidateTaskError, Engine, EngineResetError, + EngineTask, EngineTaskError, EngineTaskExt, EngineTaskType, ForkchoiceTask, + ForkchoiceTaskError, InsertUnsafeTask, InsertUnsafeTaskError, }; mod attributes; diff --git a/crates/node/engine/src/task_queue/core.rs b/crates/node/engine/src/task_queue/core.rs index 1d412782fa..bdc38fa4f8 100644 --- a/crates/node/engine/src/task_queue/core.rs +++ b/crates/node/engine/src/task_queue/core.rs @@ -1,8 +1,18 @@ //! The [`Engine`] is a task queue that receives and executes [`EngineTask`]s. use super::{EngineTaskError, EngineTaskExt}; -use crate::{EngineState, EngineTask, EngineTaskType}; -use std::collections::{HashMap, VecDeque}; +use crate::{EngineClient, EngineState, EngineTask, EngineTaskType, ForkchoiceTask}; +use alloy_provider::Provider; +use alloy_rpc_types_eth::Transaction; +use kona_genesis::{RollupConfig, SystemConfig}; +use kona_protocol::{BlockInfo, L2BlockInfo, OpBlockConversionError, to_system_config}; +use kona_sources::{SyncStartError, find_starting_forkchoice}; +use op_alloy_consensus::OpTxEnvelope; +use std::{ + collections::{HashMap, VecDeque}, + sync::Arc, +}; +use thiserror::Error; use tokio::sync::watch::Sender; /// The [`Engine`] task queue. @@ -40,14 +50,63 @@ impl Engine { } } + /// Returns a reference to the inner [`EngineState`]. + pub const fn state(&self) -> &EngineState { + &self.state + } + + /// Returns a receiver that can be used to listen to engine state updates. + pub fn subscribe(&self) -> tokio::sync::watch::Receiver { + self.state_sender.subscribe() + } + /// Enqueues a new [`EngineTask`] for execution. pub fn enqueue(&mut self, task: EngineTask) { self.tasks.entry(task.ty()).or_default().push_back(task); } - /// Returns a reference to the inner [`EngineState`]. - pub const fn state(&self) -> &EngineState { - &self.state + /// Resets the engine by finding a plausible sync starting point via + /// [`find_starting_forkchoice`]. The state will be updated to the starting point, and a + /// forkchoice update will be enqueued in order to reorg the execution layer. + pub async fn reset( + &mut self, + client: Arc, + config: &RollupConfig, + ) -> Result<(L2BlockInfo, BlockInfo, SystemConfig), EngineResetError> { + let start = + find_starting_forkchoice(config, client.l1_provider(), client.l2_provider()).await?; + + self.state.set_unsafe_head(start.un_safe); + self.state.set_cross_unsafe_head(start.un_safe); + self.state.set_local_safe_head(start.safe); + self.state.set_safe_head(start.safe); + self.state.set_finalized_head(start.finalized); + + self.enqueue(EngineTask::ForkchoiceUpdate(ForkchoiceTask::new(client.clone()))); + + let origin_block = + start.safe.l1_origin.number - config.channel_timeout(start.safe.block_info.timestamp); + let l1_origin_info: BlockInfo = client + .l1_provider() + .get_block(origin_block.into()) + .await + .map_err(SyncStartError::RpcError)? + .ok_or(SyncStartError::BlockNotFound(origin_block.into()))? + .into_consensus() + .into(); + + let l2_safe_block = client + .l2_provider() + .get_block(start.safe.block_info.hash.into()) + .full() + .await + .map_err(SyncStartError::RpcError)? + .ok_or(SyncStartError::BlockNotFound(origin_block.into()))? + .into_consensus() + .map_transactions(|t| as Clone>::clone(&t).into_inner()); + let system_config = to_system_config(&l2_safe_block, config)?; + + Ok((start.safe, l1_origin_info, system_config)) } /// Clears the task queue. @@ -69,11 +128,6 @@ impl Engine { ty } - /// Returns a receiver that can be used to listen to engine state updates. - pub fn subscribe(&self) -> tokio::sync::watch::Receiver { - self.state_sender.subscribe() - } - /// Attempts to drain the queue by executing all [`EngineTask`]s in-order. If any task returns /// an error along the way, it is not popped from the queue (in case it must be retried) and /// the error is returned. @@ -109,3 +163,17 @@ impl Engine { } } } + +/// An error occurred while attempting to reset the [`Engine`]. +#[derive(Debug, Error)] +pub enum EngineResetError { + /// An error that originated from within the engine task. + #[error(transparent)] + Task(#[from] EngineTaskError), + /// An error occurred while traversing the L1 for the sync starting point. + #[error(transparent)] + SyncStart(#[from] SyncStartError), + /// An error occurred while constructing the SystemConfig for the new safe head. + #[error(transparent)] + SystemConfigConversion(#[from] OpBlockConversionError), +} diff --git a/crates/node/engine/src/task_queue/mod.rs b/crates/node/engine/src/task_queue/mod.rs index c83b78efa6..57fd6ee990 100644 --- a/crates/node/engine/src/task_queue/mod.rs +++ b/crates/node/engine/src/task_queue/mod.rs @@ -1,7 +1,7 @@ //! The [`Engine`] task queue and the [`EngineTask`]s it can execute. mod core; -pub use core::Engine; +pub use core::{Engine, EngineResetError}; mod tasks; pub use tasks::*; diff --git a/crates/node/engine/src/task_queue/tasks/forkchoice/task.rs b/crates/node/engine/src/task_queue/tasks/forkchoice/task.rs index bfe08d3b9e..c7b1f18534 100644 --- a/crates/node/engine/src/task_queue/tasks/forkchoice/task.rs +++ b/crates/node/engine/src/task_queue/tasks/forkchoice/task.rs @@ -2,6 +2,7 @@ use crate::{ EngineClient, EngineState, EngineTaskError, EngineTaskExt, ForkchoiceTaskError, Metrics, + SyncStatus, }; use alloy_rpc_types_engine::INVALID_FORK_CHOICE_STATE_ERROR; use async_trait::async_trait; @@ -31,8 +32,8 @@ impl EngineTaskExt for ForkchoiceTask { return Err(ForkchoiceTaskError::NoForkchoiceUpdateNeeded.into()); } - // If the engine is syncing, log a warning. We can still attempt to apply the forkchoice - // update. + // If the engine is syncing, log a warning. We can still attempt to apply the + // forkchoice update. if state.sync_status.is_syncing() { warn!(target: "engine", "Attempting to update forkchoice state while EL syncing"); } @@ -47,7 +48,17 @@ impl EngineTaskExt for ForkchoiceTask { } // Send the forkchoice update through the input. - let forkchoice = state.create_forkchoice_state(); + let mut forkchoice = state.create_forkchoice_state(); + + // For the first FCU triggered by the initial engine reset, zero out the `safe` and + // `finalized` hashes to trigger optimistic pipeline sync on `op-reth`. + if matches!(state.sync_status, SyncStatus::ExecutionLayerWillStart) { + // Progress the engine sync state machine. + state.sync_status = SyncStatus::ExecutionLayerStarted; + + forkchoice.safe_block_hash = Default::default(); + forkchoice.finalized_block_hash = Default::default(); + } // Handle the forkchoice update result. if let Err(e) = self.client.fork_choice_updated_v3(forkchoice, None).await { diff --git a/crates/node/engine/src/task_queue/tasks/insert/task.rs b/crates/node/engine/src/task_queue/tasks/insert/task.rs index 78fe108a0f..b821571136 100644 --- a/crates/node/engine/src/task_queue/tasks/insert/task.rs +++ b/crates/node/engine/src/task_queue/tasks/insert/task.rs @@ -188,9 +188,6 @@ impl EngineTaskExt for InsertUnsafeTask { state.sync_status = SyncStatus::ExecutionLayerFinished; } - // Initialize unknowns if needed. - crate::init_unknowns(state, self.client.clone()).await; - info!( target: "engine", hash = new_unsafe_ref.block_info.hash.to_string(), diff --git a/crates/node/engine/src/task_queue/tasks/mod.rs b/crates/node/engine/src/task_queue/tasks/mod.rs index 64d7b0de7b..19d7a1a929 100644 --- a/crates/node/engine/src/task_queue/tasks/mod.rs +++ b/crates/node/engine/src/task_queue/tasks/mod.rs @@ -1,8 +1,5 @@ //! Tasks to update the engine state. -mod unknowns; -pub use unknowns::init_unknowns; - mod task; pub use task::{EngineTask, EngineTaskError, EngineTaskExt, EngineTaskType}; diff --git a/crates/node/engine/src/task_queue/tasks/task.rs b/crates/node/engine/src/task_queue/tasks/task.rs index 5235dee672..2bd0e6663f 100644 --- a/crates/node/engine/src/task_queue/tasks/task.rs +++ b/crates/node/engine/src/task_queue/tasks/task.rs @@ -115,14 +115,14 @@ pub trait EngineTaskExt { pub enum EngineTaskError { /// A temporary error within the engine. #[error("Temporary engine task error: {0}")] - Temporary(Box), + Temporary(Box), /// A critical error within the engine. #[error("Critical engine task error: {0}")] - Critical(Box), + Critical(Box), /// An error that requires a derivation pipeline reset. #[error("Derivation pipeline reset required: {0}")] - Reset(Box), + Reset(Box), /// An error that requires the derivation pipeline to be flushed. #[error("Derivation pipeline flush required: {0}")] - Flush(Box), + Flush(Box), } diff --git a/crates/node/engine/src/task_queue/tasks/unknowns.rs b/crates/node/engine/src/task_queue/tasks/unknowns.rs deleted file mode 100644 index b4a9643794..0000000000 --- a/crates/node/engine/src/task_queue/tasks/unknowns.rs +++ /dev/null @@ -1,67 +0,0 @@ -//! Utility function for initializing unknown engine state. - -use crate::{EngineClient, EngineState}; -use alloy_eips::eip1898::BlockNumberOrTag; -use std::sync::Arc; - -/// Initialize Unknown Engine State. -/// -/// For each unknown head, attempt to fetch it from the [`EngineClient`]. -pub async fn init_unknowns(state: &mut EngineState, client: Arc) { - // Initialize the unsafe head if it is not already set. - if state.unsafe_head.block_info.hash.is_zero() { - let head = match client.l2_block_info_by_label(BlockNumberOrTag::Pending).await { - Ok(Some(h)) => h, - Ok(None) => { - warn!(target: "engine", "No pending head found"); - return; - } - Err(e) => { - warn!(target: "engine", ?e, "Error fetching pending head"); - return; - } - }; - state.set_unsafe_head(head); - } - - // Initialize the finalized head if it is not already set. - if state.finalized_head.block_info.hash.is_zero() { - let head = match client.l2_block_info_by_label(BlockNumberOrTag::Finalized).await { - Ok(Some(h)) => h, - Ok(None) => { - warn!(target: "engine", "No finalized head found"); - return; - } - Err(e) => { - warn!(target: "engine", ?e, "Error fetching finalized head"); - return; - } - }; - state.set_finalized_head(head); - } - - // Initialize the safe head if it is not already set. - if state.safe_head.block_info.hash.is_zero() { - let head = match client.l2_block_info_by_label(BlockNumberOrTag::Safe).await { - Ok(Some(h)) => h, - Ok(None) => { - warn!(target: "engine", "No safe head found"); - return; - } - Err(e) => { - warn!(target: "engine", ?e, "Error fetching safe head"); - return; - } - }; - state.set_safe_head(head); - } - - // If the cross unsafe head is not set, set it to the safe head. - if state.cross_unsafe_head.block_info.hash.is_zero() { - state.set_cross_unsafe_head(state.safe_head); - } - // If the local safe head is not set, set it to the safe head. - if state.local_safe_head.block_info.hash.is_zero() { - state.set_local_safe_head(state.safe_head); - } -} diff --git a/crates/node/service/src/actors/derivation.rs b/crates/node/service/src/actors/derivation.rs index dffea0a119..9d7fe49791 100644 --- a/crates/node/service/src/actors/derivation.rs +++ b/crates/node/service/src/actors/derivation.rs @@ -214,6 +214,8 @@ where async fn start(mut self) -> Result<(), Self::Error> { loop { select! { + biased; + _ = self.cancellation.cancelled() => { info!( target: "derivation", @@ -221,6 +223,18 @@ where ); return Ok(()); } + signal = self.derivation_signal_rx.recv() => { + let Some(signal) = signal else { + error!( + target: "derivation", + ?signal, + "DerivationActor failed to receive signal" + ); + return Err(DerivationError::SignalReceiveFailed); + }; + + self.signal(signal).await; + } _ = self.sync_complete_rx.recv() => { if self.engine_ready { // Already received the signal, ignore. @@ -243,18 +257,6 @@ where self.process(InboundDerivationMessage::NewDataAvailable).await?; } - signal = self.derivation_signal_rx.recv() => { - let Some(signal) = signal else { - error!( - target: "derivation", - ?signal, - "DerivationActor failed to receive signal" - ); - return Err(DerivationError::SignalReceiveFailed); - }; - - self.signal(signal).await; - } _ = self.engine_l2_safe_head.changed() => { self.process(InboundDerivationMessage::SafeHeadUpdated).await?; } diff --git a/crates/node/service/src/actors/engine.rs b/crates/node/service/src/actors/engine.rs index f1f5435fb4..4aeaa0b10c 100644 --- a/crates/node/service/src/actors/engine.rs +++ b/crates/node/service/src/actors/engine.rs @@ -2,9 +2,9 @@ use alloy_rpc_types_engine::JwtSecret; use async_trait::async_trait; -use kona_derive::types::Signal; +use kona_derive::types::{ResetSignal, Signal}; use kona_engine::{ - ConsolidateTask, Engine, EngineClient, EngineQueries, EngineStateBuilder, + ConsolidateTask, Engine, EngineClient, EngineQueries, EngineResetError, EngineStateBuilder, EngineStateBuilderError, EngineTask, EngineTaskError, InsertUnsafeTask, }; use kona_genesis::RollupConfig; @@ -133,8 +133,10 @@ pub struct EngineLauncher { pub config: Arc, /// The engine rpc url. pub engine_url: Url, - /// The l2 rpc url. + /// The L2 rpc url. pub l2_rpc_url: Url, + /// The L1 rpc url. + pub l1_rpc_url: Url, /// The engine jwt secret. pub jwt_secret: JwtSecret, } @@ -146,7 +148,14 @@ impl EngineLauncher { let state = self.state_builder().build().await?; let (engine_state_send, _) = tokio::sync::watch::channel(state); - Ok(Engine::new(state, engine_state_send)) + let mut engine = Engine::new(state, engine_state_send); + + engine + .reset(Arc::new(self.client()), &self.config) + .await + .expect("TODO: Handled in follow-up PR"); + + Ok(engine) } /// Returns the [`EngineClient`]. @@ -154,6 +163,7 @@ impl EngineLauncher { EngineClient::new_http( self.engine_url.clone(), self.l2_rpc_url.clone(), + self.l1_rpc_url.clone(), self.config.clone(), self.jwt_secret, ) @@ -203,12 +213,27 @@ impl NodeActor for EngineActor { let sent = self.engine_l2_safe_head_tx.send_if_modified(update); trace!(target: "engine", ?sent, "Attempted L2 Safe Head Update"); } + Err(EngineTaskError::Reset(e)) => { + warn!(target: "engine", err = ?e, "Received reset request"); + let (l2_safe_head, l1_origin, system_config) = + self.engine.reset(self.client.clone(), &self.config).await?; + + let signal = ResetSignal { l2_safe_head, l1_origin, system_config: Some(system_config) }; + match self.derivation_signal_tx.send(signal.signal()) { + Ok(_) => debug!(target: "engine", "Sent reset signal to derivation actor"), + Err(e) => { + error!(target: "engine", ?e, "Failed to send reset signal to the derivation actor"); + self.cancellation.cancel(); + return Err(EngineError::ChannelClosed); + } + } + } Err(EngineTaskError::Flush(e)) => { // This error is encountered when the payload is marked INVALID // by the engine api. Post-holocene, the payload is replaced by // a "deposits-only" block and re-executed. At the same time, // the channel and any remaining buffered batches are flushed. - warn!(target: "engine", ?e, "[HOLOCENE] Invalid payload, Flushing derivation pipeline."); + warn!(target: "engine", err = ?e, "[HOLOCENE] Invalid payload, Flushing derivation pipeline."); match self.derivation_signal_tx.send(Signal::FlushChannel) { Ok(_) => debug!(target: "engine", "[SENT] flush signal to derivation actor"), Err(e) => { @@ -285,4 +310,7 @@ pub enum EngineError { /// Closed channel error. #[error("closed channel error")] ChannelClosed, + /// Engine reset error. + #[error(transparent)] + EngineReset(#[from] EngineResetError), } diff --git a/crates/node/service/src/service/standard/builder.rs b/crates/node/service/src/service/standard/builder.rs index 7f0f8bfd02..0ce4218d56 100644 --- a/crates/node/service/src/service/standard/builder.rs +++ b/crates/node/service/src/service/standard/builder.rs @@ -139,6 +139,7 @@ impl RollupNodeBuilder { let engine_launcher = EngineLauncher { config: Arc::clone(&config), l2_rpc_url, + l1_rpc_url: l1_rpc_url.clone(), engine_url: self.l2_engine_rpc_url.expect("missing l2 engine rpc url"), jwt_secret, };