diff --git a/.config/kurtosis_network_params.yaml b/.config/kurtosis_network_params.yaml index 37a1f18906..cd99a91b70 100644 --- a/.config/kurtosis_network_params.yaml +++ b/.config/kurtosis_network_params.yaml @@ -4,12 +4,16 @@ optimism_package: chains: - participants: - - el_type: op-reth + - el_type: op-geth cl_type: op-node + cl_log_level: "debug" + el_log_level: "debug" + count: 4 - el_type: op-reth cl_type: kona-node - cl_log_level: "info" - count: 1 + cl_log_level: "debug" + el_log_level: "debug" + count: 4 network_params: network: "kurtosis" network_id: "2151908" diff --git a/Cargo.lock b/Cargo.lock index 62d92554ab..08f7395566 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4400,7 +4400,9 @@ dependencies = [ "kona-genesis", "kona-macros", "kona-protocol", + "kona-providers-alloy", "kona-registry", + "kona-sources", "metrics", "metrics-exporter-prometheus", "op-alloy-consensus", diff --git a/bin/node/src/commands/node.rs b/bin/node/src/commands/node.rs index 923e9950c4..fc1cfb1f72 100644 --- a/bin/node/src/commands/node.rs +++ b/bin/node/src/commands/node.rs @@ -1,5 +1,6 @@ //! Node Subcommand. +use alloy_provider::RootProvider; use alloy_rpc_types_engine::JwtSecret; use anyhow::{Result, bail}; use backon::{ExponentialBuilder, Retryable}; @@ -107,8 +108,12 @@ impl NodeCommand { /// Since the engine client will fail if the jwt token is invalid, this allows to ensure /// that the jwt token passed as a cli arg is correct. pub async fn validate_jwt(&self, config: &RollupConfig) -> anyhow::Result { + let l1_rpc_url = self.l1_eth_rpc.clone(); + let l1_provider = RootProvider::new_http(l1_rpc_url.clone()); + let jwt_secret = self.jwt_secret().ok_or(anyhow::anyhow!("Invalid JWT secret"))?; let engine_client = kona_engine::EngineClient::new_http( + l1_provider, self.l2_engine_rpc.clone(), self.l2_provider_rpc.clone(), Arc::new(config.clone()), diff --git a/crates/node/engine/Cargo.toml b/crates/node/engine/Cargo.toml index 52eb483092..4a9294227f 100644 --- a/crates/node/engine/Cargo.toml +++ b/crates/node/engine/Cargo.toml @@ -16,6 +16,8 @@ workspace = true kona-genesis.workspace = true kona-macros.workspace = true kona-protocol = {workspace = true, features = ["serde", "std"]} +kona-sources.workspace = true +kona-providers-alloy.workspace = true # alloy alloy-eips.workspace = true diff --git a/crates/node/engine/src/client.rs b/crates/node/engine/src/client.rs index df119941ae..b43485efe4 100644 --- a/crates/node/engine/src/client.rs +++ b/crates/node/engine/src/client.rs @@ -1,7 +1,7 @@ //! An Engine API Client. use alloy_eips::eip1898::BlockNumberOrTag; -use alloy_network::{AnyNetwork, Network}; +use alloy_network::{AnyNetwork, Ethereum, Network}; use alloy_primitives::{B256, BlockHash, Bytes}; use alloy_provider::{Provider, RootProvider}; use alloy_rpc_client::RpcClient; @@ -20,6 +20,7 @@ use alloy_transport_http::{ }; use derive_more::Deref; use http_body_util::Full; +use kona_providers_alloy::{AlloyChainProvider, AlloyL2ChainProvider}; use op_alloy_network::Optimism; use op_alloy_provider::ext::engine::OpEngineApi; use op_alloy_rpc_types::Transaction; @@ -59,9 +60,26 @@ pub struct EngineClient { rpc: RootProvider, /// The [RollupConfig] for the chain used to timestamp which version of the engine api to use. cfg: Arc, + /// The L1 provider. + l1_provider: RootProvider, } impl EngineClient { + const ALLOY_PROVIDERS_CACHE_SIZE: usize = 1024; + + /// Returns alloy chain providers. + pub fn alloy_providers(&self) -> (AlloyChainProvider, AlloyL2ChainProvider) { + let l1_provider = + AlloyChainProvider::new(self.l1_provider.clone(), Self::ALLOY_PROVIDERS_CACHE_SIZE); + let l2_provider = AlloyL2ChainProvider::new( + self.rpc.clone(), + self.cfg.clone(), + Self::ALLOY_PROVIDERS_CACHE_SIZE, + ); + + (l1_provider, l2_provider) + } + /// Creates a new RPC client for the given address and JWT secret. fn rpc_client(addr: Url, jwt: JwtSecret) -> RootProvider { let hyper_client = Client::builder(TokioExecutor::new()).build_http::>(); @@ -75,11 +93,17 @@ impl EngineClient { } /// Creates a new [`EngineClient`] from the provided [Url] and [JwtSecret]. - pub fn new_http(engine: Url, rpc: Url, cfg: Arc, jwt: JwtSecret) -> Self { + pub fn new_http( + l1_provider: RootProvider, + engine: Url, + rpc: Url, + cfg: Arc, + jwt: JwtSecret, + ) -> Self { let engine = Self::rpc_client::(engine, jwt); let rpc = Self::rpc_client::(rpc, jwt); - Self { engine, rpc, cfg } + Self { l1_provider, engine, rpc, cfg } } /// Returns the [`SyncStatus`] of the engine. diff --git a/crates/node/engine/src/task_queue/core.rs b/crates/node/engine/src/task_queue/core.rs index a03357a11a..2fe51f349d 100644 --- a/crates/node/engine/src/task_queue/core.rs +++ b/crates/node/engine/src/task_queue/core.rs @@ -1,9 +1,14 @@ //! The [`Engine`] is a task queue that receives and executes [`EngineTask`]s. -use super::{EngineTaskError, EngineTaskExt}; -use crate::{EngineState, EngineTask, EngineTaskType}; +use super::{EngineTaskError, EngineTaskExt, ForkchoiceTask}; +use crate::{EngineClient, EngineState, EngineTask, EngineTaskType}; +use kona_genesis::RollupConfig; use kona_protocol::L2BlockInfo; -use std::collections::{HashMap, VecDeque}; +use kona_sources::find_starting_forkchoice; +use std::{ + collections::{HashMap, VecDeque}, + sync::Arc, +}; use tokio::sync::watch::Sender; /// The [`Engine`] task queue. @@ -46,6 +51,35 @@ impl Engine { self.tasks.entry(task.ty()).or_default().push_back(task); } + /// Resets the engine state. + pub async fn reset( + &mut self, + client: Arc, + config: Arc, + ) -> Result<(), EngineTaskError> { + let (mut l1_provider, mut l2_provider) = client.alloy_providers(); + let forkchoice = find_starting_forkchoice(&config, &mut l1_provider, &mut l2_provider) + .await + .map_err(|e| EngineTaskError::Critical(Box::new(e)))?; + + self.state.set_finalized_head(forkchoice.finalized); + self.state.set_safe_head(forkchoice.safe); + self.state.set_unsafe_head(forkchoice.un_safe); + // If the cross unsafe head is not set, set it to the safe head. + self.state.set_cross_unsafe_head(self.state.safe_head); + // If the local safe head is not set, set it to the safe head. + self.state.set_local_safe_head(self.state.safe_head); + + self.state.forkchoice_update_needed = true; + + debug!(target: "engine", unsafe = ?self.state.unsafe_head(), safe = ?self.state.safe_head(), finalized = ?self.state.finalized_head(), + "Resetted engine state. Sending FCU"); + + self.enqueue(EngineTask::ForkchoiceUpdate(ForkchoiceTask::new(client))); + + Ok(()) + } + /// Returns the L2 Safe Head [`L2BlockInfo`] from the state. pub const fn safe_head(&self) -> L2BlockInfo { self.state.safe_head() @@ -91,10 +125,12 @@ impl Engine { let Some(task) = task.front() else { return Ok(()); }; + let ty = task.ty(); + match task.execute(&mut self.state).await { Ok(_) => {} Err(EngineTaskError::Reset(e)) => { - self.clear(); + // The engine actor should trigger a reset by calling [`Engine::reset`]. return Err(EngineTaskError::Reset(e)); } e => return e, @@ -103,7 +139,6 @@ impl Engine { // Update the state and notify the engine actor. self.state_sender.send_replace(self.state); - let ty = task.ty(); if let Some(queue) = self.tasks.get_mut(&ty) { queue.pop_front(); }; diff --git a/crates/node/engine/src/task_queue/tasks/forkchoice/task.rs b/crates/node/engine/src/task_queue/tasks/forkchoice/task.rs index bfe08d3b9e..023a446a65 100644 --- a/crates/node/engine/src/task_queue/tasks/forkchoice/task.rs +++ b/crates/node/engine/src/task_queue/tasks/forkchoice/task.rs @@ -2,8 +2,9 @@ use crate::{ EngineClient, EngineState, EngineTaskError, EngineTaskExt, ForkchoiceTaskError, Metrics, + SyncStatus, }; -use alloy_rpc_types_engine::INVALID_FORK_CHOICE_STATE_ERROR; +use alloy_rpc_types_engine::{ForkchoiceState, INVALID_FORK_CHOICE_STATE_ERROR}; use async_trait::async_trait; use op_alloy_provider::ext::engine::OpEngineApi; use std::sync::Arc; @@ -21,11 +22,47 @@ impl ForkchoiceTask { pub const fn new(client: Arc) -> Self { Self { client } } + + async fn send_fcu(&self, forkchoice: ForkchoiceState) -> Result<(), EngineTaskError> { + // Handle the forkchoice update result. + if let Err(e) = self.client.fork_choice_updated_v3(forkchoice, None).await { + let e = e + .as_error_resp() + .and_then(|e| { + (e.code == INVALID_FORK_CHOICE_STATE_ERROR as i64) + .then_some(ForkchoiceTaskError::InvalidForkchoiceState) + }) + .unwrap_or_else(|| ForkchoiceTaskError::ForkchoiceUpdateFailed(e)); + + return Err(e.into()); + } + + Ok(()) + } } #[async_trait] impl EngineTaskExt for ForkchoiceTask { async fn execute(&self, state: &mut EngineState) -> Result<(), EngineTaskError> { + // The very first FCU. We should: + // - Send a FCU with empty hashes for the safe and finalized blocks, this kickstarts the EL. + // - Reset the engine state. + if state.sync_status == SyncStatus::ExecutionLayerWillStart { + let mut forkchoice = state.create_forkchoice_state(); + forkchoice.safe_block_hash = Default::default(); + forkchoice.finalized_block_hash = Default::default(); + + info!(target: "engine", "Starting execution layer sync"); + state.sync_status = SyncStatus::ExecutionLayerStarted; + + self.send_fcu(forkchoice).await?; + + // Reset the engine state. + return Err(EngineTaskError::Reset(Box::new( + ForkchoiceTaskError::InvalidForkchoiceState, + ))); + } + // Check if a forkchoice update is not needed, return early. if !state.forkchoice_update_needed { return Err(ForkchoiceTaskError::NoForkchoiceUpdateNeeded.into()); @@ -49,18 +86,7 @@ impl EngineTaskExt for ForkchoiceTask { // Send the forkchoice update through the input. let forkchoice = state.create_forkchoice_state(); - // Handle the forkchoice update result. - if let Err(e) = self.client.fork_choice_updated_v3(forkchoice, None).await { - let e = e - .as_error_resp() - .and_then(|e| { - (e.code == INVALID_FORK_CHOICE_STATE_ERROR as i64) - .then_some(ForkchoiceTaskError::InvalidForkchoiceState) - }) - .unwrap_or_else(|| ForkchoiceTaskError::ForkchoiceUpdateFailed(e)); - - return Err(e.into()); - } + self.send_fcu(forkchoice).await?; state.forkchoice_update_needed = false; diff --git a/crates/node/engine/src/task_queue/tasks/insert/task.rs b/crates/node/engine/src/task_queue/tasks/insert/task.rs index 78fe108a0f..af2660d9ff 100644 --- a/crates/node/engine/src/task_queue/tasks/insert/task.rs +++ b/crates/node/engine/src/task_queue/tasks/insert/task.rs @@ -75,12 +75,6 @@ impl InsertUnsafeTask { #[async_trait] impl EngineTaskExt for InsertUnsafeTask { async fn execute(&self, state: &mut EngineState) -> Result<(), EngineTaskError> { - // Always transition to EL sync on startup. - if state.sync_status == SyncStatus::ExecutionLayerWillStart { - info!(target: "engine", "Starting execution layer sync"); - state.sync_status = SyncStatus::ExecutionLayerStarted; - } - let time_start = Instant::now(); // Insert the new payload. @@ -188,9 +182,6 @@ impl EngineTaskExt for InsertUnsafeTask { state.sync_status = SyncStatus::ExecutionLayerFinished; } - // Initialize unknowns if needed. - crate::init_unknowns(state, self.client.clone()).await; - info!( target: "engine", hash = new_unsafe_ref.block_info.hash.to_string(), diff --git a/crates/node/engine/src/task_queue/tasks/task.rs b/crates/node/engine/src/task_queue/tasks/task.rs index 5235dee672..fd342e6ccf 100644 --- a/crates/node/engine/src/task_queue/tasks/task.rs +++ b/crates/node/engine/src/task_queue/tasks/task.rs @@ -89,7 +89,6 @@ impl EngineTaskExt for EngineTask { return Err(EngineTaskError::Critical(e)); } EngineTaskError::Reset(e) => { - warn!(target: "engine", "Engine requested derivation reset"); return Err(EngineTaskError::Reset(e)); } EngineTaskError::Flush(e) => { @@ -115,14 +114,14 @@ pub trait EngineTaskExt { pub enum EngineTaskError { /// A temporary error within the engine. #[error("Temporary engine task error: {0}")] - Temporary(Box), + Temporary(Box), /// A critical error within the engine. #[error("Critical engine task error: {0}")] - Critical(Box), + Critical(Box), /// An error that requires a derivation pipeline reset. #[error("Derivation pipeline reset required: {0}")] - Reset(Box), + Reset(Box), /// An error that requires the derivation pipeline to be flushed. #[error("Derivation pipeline flush required: {0}")] - Flush(Box), + Flush(Box), } diff --git a/crates/node/rpc/src/launcher.rs b/crates/node/rpc/src/launcher.rs index e8b7ecd27b..ce561c8c66 100644 --- a/crates/node/rpc/src/launcher.rs +++ b/crates/node/rpc/src/launcher.rs @@ -103,6 +103,7 @@ mod tests { let mut launcher = RpcLauncher::new(); launcher = launcher.set_addr(SocketAddr::from(([127, 0, 0, 1], 8080))); let result = launcher.launch().await; + println!("{result:?}"); assert!(result.is_ok()); } diff --git a/crates/node/service/src/actors/engine.rs b/crates/node/service/src/actors/engine.rs index 1d8d990fd6..c4d452d4e7 100644 --- a/crates/node/service/src/actors/engine.rs +++ b/crates/node/service/src/actors/engine.rs @@ -1,15 +1,17 @@ //! The Engine Actor +use alloy_provider::RootProvider; use alloy_rpc_types_engine::JwtSecret; use async_trait::async_trait; use kona_derive::types::Signal; use kona_engine::{ ConsolidateTask, Engine, EngineClient, EngineQueries, EngineStateBuilder, - EngineStateBuilderError, EngineTask, EngineTaskError, InsertUnsafeTask, + EngineStateBuilderError, EngineTask, EngineTaskError, ForkchoiceTask, InsertUnsafeTask, }; use kona_genesis::RollupConfig; use kona_protocol::{L2BlockInfo, OpAttributesWithParent}; use kona_sources::RuntimeConfig; +use op_alloy_network::Ethereum; use op_alloy_provider::ext::engine::OpEngineApi; use op_alloy_rpc_types_engine::OpNetworkPayloadEnvelope; use std::sync::Arc; @@ -137,6 +139,8 @@ impl EngineActor { /// Configuration for the Engine Actor. #[derive(Debug, Clone)] pub struct EngineLauncher { + /// A provider for the L1 chain. + pub l1_provider: RootProvider, /// The [`RollupConfig`]. pub config: Arc, /// The engine rpc url. @@ -154,12 +158,18 @@ impl EngineLauncher { let state = self.state_builder().build().await?; let (engine_state_send, _) = tokio::sync::watch::channel(state); - Ok(Engine::new(state, engine_state_send)) + let mut engine = Engine::new(state, engine_state_send); + + // Start with a forkchoice update. + engine.enqueue(EngineTask::ForkchoiceUpdate(ForkchoiceTask::new(self.client().into()))); + + Ok(engine) } /// Returns the [`EngineClient`]. pub fn client(&self) -> EngineClient { EngineClient::new_http( + self.l1_provider.clone(), self.engine_url.clone(), self.l2_rpc_url.clone(), self.config.clone(), @@ -197,19 +207,13 @@ impl NodeActor for EngineActor { } res = self.engine.drain() => { match res { - Ok(_) => { - trace!(target: "engine", "[ENGINE] tasks drained"); - // Update the l2 safe head if needed. - let state_safe_head = self.engine.safe_head(); - let update = |head: &mut L2BlockInfo| { - if head != &state_safe_head { - *head = state_safe_head; - return true; - } - false - }; - let sent = self.engine_l2_safe_head_tx.send_if_modified(update); - trace!(target: "engine", ?sent, "Attempted L2 Safe Head Update"); + Ok(_) => {} + Err(EngineTaskError::Reset(error)) => { + warn!(target: "engine", err = ?error, "Resetting engine state"); + + if let Err(e) = self.engine.reset(self.client.clone(), self.config.clone()).await { + error!(target: "engine", ?e, "Failed to reset engine state"); + } } Err(EngineTaskError::Flush(e)) => { // This error is encountered when the payload is marked INVALID @@ -228,6 +232,17 @@ impl NodeActor for EngineActor { } Err(e) => warn!(target: "engine", ?e, "Error draining engine tasks"), } + + // Update the l2 safe head if needed. + let state_safe_head = self.engine.safe_head(); + let update = |head: &mut L2BlockInfo| { + if head != &state_safe_head { + *head = state_safe_head; + return true; + } + false + }; + self.engine_l2_safe_head_tx.send_if_modified(update); } attributes = self.attributes_rx.recv() => { let Some(attributes) = attributes else { diff --git a/crates/node/service/src/service/standard/builder.rs b/crates/node/service/src/service/standard/builder.rs index 7f0f8bfd02..de4e4a1b1d 100644 --- a/crates/node/service/src/service/standard/builder.rs +++ b/crates/node/service/src/service/standard/builder.rs @@ -137,6 +137,7 @@ impl RollupNodeBuilder { let config = Arc::new(self.config); let engine_launcher = EngineLauncher { + l1_provider: l1_provider.clone(), config: Arc::clone(&config), l2_rpc_url, engine_url: self.l2_engine_rpc_url.expect("missing l2 engine rpc url"),