diff --git a/crates/node/engine/src/lib.rs b/crates/node/engine/src/lib.rs index ba13fafaac..201b371080 100644 --- a/crates/node/engine/src/lib.rs +++ b/crates/node/engine/src/lib.rs @@ -11,10 +11,10 @@ extern crate tracing; mod task_queue; pub use task_queue::{ - BuildTask, BuildTaskError, ConsolidateTask, ConsolidateTaskError, Engine, EngineResetError, - EngineTask, EngineTaskError, EngineTaskErrorSeverity, EngineTaskErrors, EngineTaskExt, - FinalizeTask, FinalizeTaskError, ForkchoiceTask, ForkchoiceTaskError, InsertTask, - InsertTaskError, + BuildTask, BuildTaskError, ConsolidateTask, ConsolidateTaskError, Engine, EngineBuildError, + EngineResetError, EngineTask, EngineTaskError, EngineTaskErrorSeverity, EngineTaskErrors, + EngineTaskExt, FinalizeTask, FinalizeTaskError, InsertTask, InsertTaskError, SynchronizeTask, + SynchronizeTaskError, }; mod attributes; diff --git a/crates/node/engine/src/task_queue/core.rs b/crates/node/engine/src/task_queue/core.rs index f7d70cf531..240fb87d8f 100644 --- a/crates/node/engine/src/task_queue/core.rs +++ b/crates/node/engine/src/task_queue/core.rs @@ -3,7 +3,8 @@ use super::EngineTaskExt; use crate::{ EngineClient, EngineState, EngineSyncStateUpdate, EngineTask, EngineTaskError, - EngineTaskErrorSeverity, ForkchoiceTask, Metrics, task_queue::EngineTaskErrors, + EngineTaskErrorSeverity, Metrics, SynchronizeTask, SynchronizeTaskError, + task_queue::EngineTaskErrors, }; use alloy_provider::Provider; use alloy_rpc_types_eth::Transaction; @@ -42,9 +43,6 @@ pub struct Engine { impl Engine { /// Creates a new [`Engine`] with an empty task queue and the passed initial [`EngineState`]. - /// - /// An initial [`EngineTask::ForkchoiceUpdate`] is added to the task queue to synchronize the - /// engine with the forkchoice state of the [`EngineState`]. pub fn new( initial_state: EngineState, state_sender: Sender, @@ -89,7 +87,8 @@ impl Engine { let start = find_starting_forkchoice(&config, client.l1_provider(), client.l2_provider()).await?; - if let Err(err) = ForkchoiceTask::new( + // Retry to synchronize the engine until we succeeds or a critical error occurs. + while let Err(err) = SynchronizeTask::new( client.clone(), config.clone(), EngineSyncStateUpdate { @@ -99,16 +98,19 @@ impl Engine { safe_head: Some(start.safe), finalized_head: Some(start.finalized), }, - None, ) .execute(&mut self.state) .await { - // Ignore temporary errors. - if matches!(err.severity(), EngineTaskErrorSeverity::Temporary) { - debug!(target: "engine", "Forkchoice update failed temporarily during reset: {}", err); - } else { - return Err(EngineTaskErrors::Forkchoice(err).into()); + match err.severity() { + EngineTaskErrorSeverity::Temporary | + EngineTaskErrorSeverity::Flush | + EngineTaskErrorSeverity::Reset => { + debug!(target: "engine", ?err, "Forkchoice update failed during reset. Trying again..."); + } + EngineTaskErrorSeverity::Critical => { + return Err(EngineResetError::Forkchoice(err)); + } } } @@ -172,9 +174,9 @@ impl Engine { /// An error occurred while attempting to reset the [`Engine`]. #[derive(Debug, Error)] pub enum EngineResetError { - /// An error that originated from within an engine task. + /// An error that occurred while updating the forkchoice state. #[error(transparent)] - Task(#[from] EngineTaskErrors), + Forkchoice(#[from] SynchronizeTaskError), /// An error occurred while traversing the L1 for the sync starting point. #[error(transparent)] SyncStart(#[from] SyncStartError), diff --git a/crates/node/engine/src/task_queue/tasks/build/error.rs b/crates/node/engine/src/task_queue/tasks/build/error.rs index 49bf7db55e..d619d7d0b9 100644 --- a/crates/node/engine/src/task_queue/tasks/build/error.rs +++ b/crates/node/engine/src/task_queue/tasks/build/error.rs @@ -1,7 +1,7 @@ -//! Contains error types for the [crate::ForkchoiceTask]. +//! Contains error types for the [crate::SynchronizeTask]. use crate::{ - EngineTaskError, ForkchoiceTaskError, InsertTaskError, + EngineTaskError, InsertTaskError, SynchronizeTaskError, task_queue::tasks::task::EngineTaskErrorSeverity, }; use alloy_rpc_types_engine::PayloadStatusEnum; @@ -11,33 +11,44 @@ use op_alloy_rpc_types_engine::OpExecutionPayloadEnvelope; use thiserror::Error; use tokio::sync::mpsc; -/// An error that occurs when running the [crate::ForkchoiceTask]. +/// An error that occurs when building a payload in the engine. #[derive(Debug, Error)] -pub enum BuildTaskError { - /// The forkchoice update is not needed. - #[error("No forkchoice update needed")] - NoForkchoiceUpdateNeeded, +pub enum EngineBuildError { + /// The finalized head is ahead of the unsafe head. + #[error("Finalized head is ahead of unsafe head")] + FinalizedAheadOfUnsafe(u64, u64), + /// The forkchoice update call to the engine api failed. + #[error("Failed to build payload attributes in the engine. Forkchoice RPC error: {0}")] + AttributesInsertionFailed(#[from] RpcError), + /// The inserted payload is invalid. + #[error("The inserted payload is invalid: {0}")] + InvalidPayload(String), + /// The inserted payload status is unexpected. + #[error("The inserted payload status is unexpected: {0}")] + UnexpectedPayloadStatus(PayloadStatusEnum), + /// The payload ID is missing. + #[error("The inserted payload ID is missing")] + MissingPayloadId, /// The engine is syncing. - #[error("Attempting to update forkchoice state while EL syncing")] + #[error("The engine is syncing")] EngineSyncing, - /// Missing payload ID. - #[error("Missing payload ID")] - MissingPayloadId, +} + +/// An error that occurs when running the [crate::SynchronizeTask]. +#[derive(Debug, Error)] +pub enum BuildTaskError { + /// An error occurred when building the payload attributes in the engine. + #[error("An error occurred when building the payload attributes to the engine.")] + EngineBuildError(EngineBuildError), /// The initial forkchoice update call to the engine api failed. #[error(transparent)] - ForkchoiceUpdateFailed(#[from] ForkchoiceTaskError), + ForkchoiceUpdateFailed(#[from] SynchronizeTaskError), /// Impossible to insert the payload into the engine. #[error(transparent)] PayloadInsertionFailed(#[from] InsertTaskError), - /// Unexpected payload status - #[error("Unexpected payload status: {0}")] - UnexpectedPayloadStatus(PayloadStatusEnum), /// The get payload call to the engine api failed. #[error(transparent)] GetPayloadFailed(RpcError), - /// The new payload call to the engine api failed. - #[error(transparent)] - NewPayloadFailed(RpcError), /// A deposit-only payload failed to import. #[error("Deposit-only payload failed to import")] DepositOnlyPayloadFailed, @@ -64,13 +75,26 @@ impl EngineTaskError for BuildTaskError { match self { Self::ForkchoiceUpdateFailed(inner) => inner.severity(), Self::PayloadInsertionFailed(inner) => inner.severity(), - Self::NoForkchoiceUpdateNeeded => EngineTaskErrorSeverity::Temporary, - Self::EngineSyncing => EngineTaskErrorSeverity::Temporary, + Self::EngineBuildError(EngineBuildError::FinalizedAheadOfUnsafe(_, _)) => { + EngineTaskErrorSeverity::Critical + } + Self::EngineBuildError(EngineBuildError::AttributesInsertionFailed(_)) => { + EngineTaskErrorSeverity::Temporary + } + Self::EngineBuildError(EngineBuildError::InvalidPayload(_)) => { + EngineTaskErrorSeverity::Temporary + } + Self::EngineBuildError(EngineBuildError::UnexpectedPayloadStatus(_)) => { + EngineTaskErrorSeverity::Temporary + } + Self::EngineBuildError(EngineBuildError::MissingPayloadId) => { + EngineTaskErrorSeverity::Temporary + } + Self::EngineBuildError(EngineBuildError::EngineSyncing) => { + EngineTaskErrorSeverity::Temporary + } Self::GetPayloadFailed(_) => EngineTaskErrorSeverity::Temporary, - Self::NewPayloadFailed(_) => EngineTaskErrorSeverity::Temporary, Self::HoloceneInvalidFlush => EngineTaskErrorSeverity::Flush, - Self::MissingPayloadId => EngineTaskErrorSeverity::Critical, - Self::UnexpectedPayloadStatus(_) => EngineTaskErrorSeverity::Critical, Self::DepositOnlyPayloadReattemptFailed => EngineTaskErrorSeverity::Critical, Self::DepositOnlyPayloadFailed => EngineTaskErrorSeverity::Critical, Self::FromBlock(_) => EngineTaskErrorSeverity::Critical, diff --git a/crates/node/engine/src/task_queue/tasks/build/mod.rs b/crates/node/engine/src/task_queue/tasks/build/mod.rs index 65a027e3fa..0688404fd7 100644 --- a/crates/node/engine/src/task_queue/tasks/build/mod.rs +++ b/crates/node/engine/src/task_queue/tasks/build/mod.rs @@ -4,4 +4,4 @@ mod task; pub use task::BuildTask; mod error; -pub use error::BuildTaskError; +pub use error::{BuildTaskError, EngineBuildError}; diff --git a/crates/node/engine/src/task_queue/tasks/build/task.rs b/crates/node/engine/src/task_queue/tasks/build/task.rs index ebcf199d00..237cb727cf 100644 --- a/crates/node/engine/src/task_queue/tasks/build/task.rs +++ b/crates/node/engine/src/task_queue/tasks/build/task.rs @@ -1,12 +1,13 @@ //! A task for building a new block and importing it. use super::BuildTaskError; use crate::{ - EngineClient, EngineGetPayloadVersion, EngineState, EngineTaskExt, ForkchoiceTask, - ForkchoiceTaskError, InsertTask, + EngineClient, EngineForkchoiceVersion, EngineGetPayloadVersion, EngineState, EngineTaskExt, + InsertTask, InsertTaskError::{self}, state::EngineSyncStateUpdate, + task_queue::tasks::build::error::EngineBuildError, }; -use alloy_rpc_types_engine::{ExecutionPayload, PayloadId}; +use alloy_rpc_types_engine::{ExecutionPayload, PayloadId, PayloadStatusEnum}; use async_trait::async_trait; use kona_genesis::RollupConfig; use kona_protocol::{L2BlockInfo, OpAttributesWithParent}; @@ -43,6 +44,112 @@ impl BuildTask { Self { engine, cfg, attributes, is_attributes_derived, payload_tx } } + /// Starts the block building process by sending an initial `engine_forkchoiceUpdate` call with + /// the payload attributes to build. + /// + /// ## Observed [PayloadStatusEnum] Variants + /// The `engine_forkchoiceUpdate` payload statuses that this function observes are below. Any + /// other [PayloadStatusEnum] variant is considered a failure. + /// + /// ### Success (`VALID`) + /// If the build is successful, the [PayloadId] is returned for sealing and the external + /// actor is notified of the successful forkchoice update. + /// + /// ### Failure (`INVALID`) + /// If the forkchoice update fails, the external actor is notified of the failure. + /// + /// ### Syncing (`SYNCING`) + /// If the EL is syncing, the payload attributes are buffered and the function returns early. + /// This is a temporary state, and the function should be called again later. + async fn start_build( + &self, + state: &EngineState, + engine_client: &EngineClient, + attributes_envelope: OpAttributesWithParent, + ) -> Result { + debug!( + target: "engine_builder", + txs = attributes_envelope.inner.transactions.as_ref().map_or(0, |txs| txs.len()), + "Starting new build job" + ); + + // Sanity check if the head is behind the finalized head. If it is, this is a critical + // error. + if state.sync_state.unsafe_head().block_info.number < + state.sync_state.finalized_head().block_info.number + { + return Err(BuildTaskError::EngineBuildError(EngineBuildError::FinalizedAheadOfUnsafe( + state.sync_state.unsafe_head().block_info.number, + state.sync_state.finalized_head().block_info.number, + ))); + } + + // When inserting a payload, we advertise the parent's unsafe head as the current unsafe + // head to build on top of. + let new_forkchoice = state + .sync_state + .apply_update(EngineSyncStateUpdate { + unsafe_head: Some(attributes_envelope.parent), + ..Default::default() + }) + .create_forkchoice_state(); + + let forkchoice_version = EngineForkchoiceVersion::from_cfg( + &self.cfg, + attributes_envelope.inner.payload_attributes.timestamp, + ); + let update = match forkchoice_version { + EngineForkchoiceVersion::V3 => { + engine_client + .fork_choice_updated_v3(new_forkchoice, Some(attributes_envelope.inner)) + .await + } + EngineForkchoiceVersion::V2 => { + engine_client + .fork_choice_updated_v2(new_forkchoice, Some(attributes_envelope.inner)) + .await + } + } + .map_err(|e| { + error!(target: "engine_builder", "Forkchoice update failed: {}", e); + BuildTaskError::EngineBuildError(EngineBuildError::AttributesInsertionFailed(e)) + })?; + + match update.payload_status.status { + PayloadStatusEnum::Valid => { + debug!( + target: "engine_builder", + unsafe_hash = new_forkchoice.head_block_hash.to_string(), + safe_hash = new_forkchoice.safe_block_hash.to_string(), + finalized_hash = new_forkchoice.finalized_block_hash.to_string(), + "Forkchoice update with attributes successful" + ); + } + PayloadStatusEnum::Invalid { validation_error } => { + error!(target: "engine_builder", "Forkchoice update failed: {}", validation_error); + return Err(BuildTaskError::EngineBuildError(EngineBuildError::InvalidPayload( + validation_error, + ))); + } + PayloadStatusEnum::Syncing => { + warn!(target: "engine_builder", "Forkchoice update failed temporarily: EL is syncing"); + return Err(BuildTaskError::EngineBuildError(EngineBuildError::EngineSyncing)); + } + s => { + // Other codes are never returned by `engine_forkchoiceUpdate` + return Err(BuildTaskError::EngineBuildError( + EngineBuildError::UnexpectedPayloadStatus(s), + )); + } + } + + // Fetch the payload ID from the FCU. If no payload ID was returned, something went wrong - + // the block building job on the EL should have been initiated. + update + .payload_id + .ok_or(BuildTaskError::EngineBuildError(EngineBuildError::MissingPayloadId)) + } + /// Fetches the execution payload from the EL. /// /// ## Engine Method Selection @@ -129,18 +236,8 @@ impl EngineTaskExt for BuildTask { // Start the build by sending an FCU call with the current forkchoice and the input // payload attributes. let fcu_start_time = Instant::now(); - let payload_id = ForkchoiceTask::new( - self.engine.clone(), - self.cfg.clone(), - EngineSyncStateUpdate { - unsafe_head: Some(self.attributes.parent), - ..Default::default() - }, - Some(self.attributes.clone()), - ) - .execute(state) - .await? - .ok_or(BuildTaskError::MissingPayloadId)?; + let payload_id = self.start_build(state, &self.engine, self.attributes.clone()).await?; + let fcu_duration = fcu_start_time.elapsed(); // Fetch the payload just inserted from the EL and import it into the engine. @@ -166,18 +263,17 @@ impl EngineTaskExt for BuildTask { .execute(state) .await { - Err(InsertTaskError::ForkchoiceUpdateFailed( - ForkchoiceTaskError::InvalidPayloadStatus(e), - )) if self.attributes.is_deposits_only() => { + Err(InsertTaskError::UnexpectedPayloadStatus(e)) + if self.attributes.is_deposits_only() => + { error!(target: "engine_builder", error = ?e, "Critical: Deposit-only payload import failed"); return Err(BuildTaskError::DepositOnlyPayloadFailed) } // HOLOCENE: Re-attempt payload import with deposits only - Err(InsertTaskError::ForkchoiceUpdateFailed( - ForkchoiceTaskError::InvalidPayloadStatus(e), - )) if self - .cfg - .is_holocene_active(self.attributes.inner().payload_attributes.timestamp) => + Err(InsertTaskError::UnexpectedPayloadStatus(e)) + if self + .cfg + .is_holocene_active(self.attributes.inner().payload_attributes.timestamp) => { warn!(target: "engine_builder", error = ?e, "Re-attempting payload import with deposits only."); // HOLOCENE: Re-attempt payload import with deposits only diff --git a/crates/node/engine/src/task_queue/tasks/consolidate/error.rs b/crates/node/engine/src/task_queue/tasks/consolidate/error.rs index 098137748a..7eb9a41097 100644 --- a/crates/node/engine/src/task_queue/tasks/consolidate/error.rs +++ b/crates/node/engine/src/task_queue/tasks/consolidate/error.rs @@ -1,7 +1,7 @@ //! Contains error types for the [`crate::ConsolidateTask`]. use crate::{ - BuildTaskError, EngineTaskError, ForkchoiceTaskError, + BuildTaskError, EngineTaskError, SynchronizeTaskError, task_queue::tasks::task::EngineTaskErrorSeverity, }; use thiserror::Error; @@ -20,7 +20,7 @@ pub enum ConsolidateTaskError { BuildTaskFailed(#[from] BuildTaskError), /// The consolidation forkchoice update call to the engine api failed. #[error(transparent)] - ForkchoiceUpdateFailed(#[from] ForkchoiceTaskError), + ForkchoiceUpdateFailed(#[from] SynchronizeTaskError), } impl EngineTaskError for ConsolidateTaskError { diff --git a/crates/node/engine/src/task_queue/tasks/consolidate/task.rs b/crates/node/engine/src/task_queue/tasks/consolidate/task.rs index 1e5ba983c1..3c76eeb8c3 100644 --- a/crates/node/engine/src/task_queue/tasks/consolidate/task.rs +++ b/crates/node/engine/src/task_queue/tasks/consolidate/task.rs @@ -1,7 +1,7 @@ //! A task to consolidate the engine state. use crate::{ - BuildTask, ConsolidateTaskError, EngineClient, EngineState, EngineTaskExt, ForkchoiceTask, + BuildTask, ConsolidateTaskError, EngineClient, EngineState, EngineTaskExt, SynchronizeTask, state::EngineSyncStateUpdate, }; use async_trait::async_trait; @@ -112,7 +112,7 @@ impl ConsolidateTask { Ok(block_info) => { let fcu_start = Instant::now(); - ForkchoiceTask::new( + SynchronizeTask::new( Arc::clone(&self.client), self.cfg.clone(), EngineSyncStateUpdate { @@ -120,7 +120,6 @@ impl ConsolidateTask { local_safe_head: Some(block_info), ..Default::default() }, - None, ) .execute(state) .await diff --git a/crates/node/engine/src/task_queue/tasks/finalize/error.rs b/crates/node/engine/src/task_queue/tasks/finalize/error.rs index 31ad81fff6..60a11b4da9 100644 --- a/crates/node/engine/src/task_queue/tasks/finalize/error.rs +++ b/crates/node/engine/src/task_queue/tasks/finalize/error.rs @@ -1,7 +1,7 @@ //! Contains error types for the [crate::FinalizeTask]. use crate::{ - EngineTaskError, ForkchoiceTaskError, task_queue::tasks::task::EngineTaskErrorSeverity, + EngineTaskError, SynchronizeTaskError, task_queue::tasks::task::EngineTaskErrorSeverity, }; use alloy_transport::{RpcError, TransportErrorKind}; use kona_protocol::FromBlockError; @@ -26,7 +26,7 @@ pub enum FinalizeTaskError { TransportError(#[from] RpcError), /// The forkchoice update call to finalize the block failed. #[error(transparent)] - ForkchoiceUpdateFailed(#[from] ForkchoiceTaskError), + ForkchoiceUpdateFailed(#[from] SynchronizeTaskError), } impl EngineTaskError for FinalizeTaskError { diff --git a/crates/node/engine/src/task_queue/tasks/finalize/task.rs b/crates/node/engine/src/task_queue/tasks/finalize/task.rs index e0414c8523..adaa91f54d 100644 --- a/crates/node/engine/src/task_queue/tasks/finalize/task.rs +++ b/crates/node/engine/src/task_queue/tasks/finalize/task.rs @@ -1,7 +1,7 @@ //! A task for finalizing an L2 block. use crate::{ - EngineClient, EngineState, EngineTaskExt, FinalizeTaskError, ForkchoiceTask, + EngineClient, EngineState, EngineTaskExt, FinalizeTaskError, SynchronizeTask, state::EngineSyncStateUpdate, }; use alloy_provider::Provider; @@ -23,7 +23,7 @@ pub struct FinalizeTask { } impl FinalizeTask { - /// Creates a new [`ForkchoiceTask`]. + /// Creates a new [`SynchronizeTask`]. pub const fn new(client: Arc, cfg: Arc, block_number: u64) -> Self { Self { client, cfg, block_number } } @@ -57,11 +57,10 @@ impl EngineTaskExt for FinalizeTask { // Dispatch a forkchoice update. let fcu_start = Instant::now(); - ForkchoiceTask::new( + SynchronizeTask::new( self.client.clone(), self.cfg.clone(), EngineSyncStateUpdate { finalized_head: Some(block_info), ..Default::default() }, - None, ) .execute(state) .await?; diff --git a/crates/node/engine/src/task_queue/tasks/forkchoice/task.rs b/crates/node/engine/src/task_queue/tasks/forkchoice/task.rs deleted file mode 100644 index 5fdd5a78d9..0000000000 --- a/crates/node/engine/src/task_queue/tasks/forkchoice/task.rs +++ /dev/null @@ -1,168 +0,0 @@ -//! A task for the `engine_forkchoiceUpdated` method, with no attributes. - -use crate::{ - EngineClient, EngineForkchoiceVersion, EngineState, EngineTaskExt, ForkchoiceTaskError, - state::EngineSyncStateUpdate, -}; -use alloy_rpc_types_engine::{INVALID_FORK_CHOICE_STATE_ERROR, PayloadId, PayloadStatusEnum}; -use async_trait::async_trait; -use kona_genesis::RollupConfig; -use kona_protocol::OpAttributesWithParent; -use op_alloy_provider::ext::engine::OpEngineApi; -use std::sync::Arc; -use tokio::time::Instant; - -/// The [`ForkchoiceTask`] executes an `engine_forkchoiceUpdated` call with the current -/// [`EngineState`]'s forkchoice, and no payload attributes. -#[derive(Debug, Clone)] -pub struct ForkchoiceTask { - /// The engine client. - pub client: Arc, - /// The rollup config. - pub rollup: Arc, - /// Optional payload attributes to be used for the forkchoice update. - pub envelope: Option, - /// The sync state update to apply to the engine state. - pub state_update: EngineSyncStateUpdate, -} - -impl ForkchoiceTask { - /// Creates a new [`ForkchoiceTask`]. - pub const fn new( - client: Arc, - rollup: Arc, - state_update: EngineSyncStateUpdate, - payload_attributes: Option, - ) -> Self { - Self { client, rollup, envelope: payload_attributes, state_update } - } - - /// Checks the response of the `engine_forkchoiceUpdated` call, and updates the sync status if - /// necessary. - fn check_forkchoice_updated_status( - &self, - state: &mut EngineState, - status: &PayloadStatusEnum, - ) -> Result<(), ForkchoiceTaskError> { - match status { - PayloadStatusEnum::Valid => { - if !state.el_sync_finished { - info!( - target: "engine", - "Finished execution layer sync." - ); - state.el_sync_finished = true; - } - - Ok(()) - } - PayloadStatusEnum::Syncing => { - if self.envelope.is_some() { - // If we're building a new payload, we should retry the FCU once the engine is - // done syncing. - debug!(target: "engine", "Build initiation FCU failed temporarily: EL is syncing"); - Err(ForkchoiceTaskError::EngineSyncing) - } else { - // If we're not building a new payload, we're driving EL sync. - Ok(()) - } - } - PayloadStatusEnum::Invalid { validation_error } => { - error!(target: "engine", "Forkchoice update failed: {}", validation_error); - Err(ForkchoiceTaskError::InvalidPayloadStatus(validation_error.clone())) - } - s => { - // Other codes are never returned by `engine_forkchoiceUpdate` - Err(ForkchoiceTaskError::UnexpectedPayloadStatus(s.clone())) - } - } - } -} - -#[async_trait] -impl EngineTaskExt for ForkchoiceTask { - type Output = Option; - type Error = ForkchoiceTaskError; - - async fn execute(&self, state: &mut EngineState) -> Result { - // Apply the sync state update to the engine state. - let new_sync_state = state.sync_state.apply_update(self.state_update); - - // Check if a forkchoice update is not needed, return early. - // A forkchoice update is not needed if... - // 1. The engine state is not default (initial forkchoice state has been emitted), and - // 2. The new sync state is the same as the current sync state (no changes to the sync - // state). - if state.sync_state != Default::default() && - state.sync_state == new_sync_state && - self.envelope.is_none() - { - return Err(ForkchoiceTaskError::NoForkchoiceUpdateNeeded); - } - - // Check if the head is behind the finalized head. - if new_sync_state.unsafe_head().block_info.number < - new_sync_state.finalized_head().block_info.number - { - return Err(ForkchoiceTaskError::FinalizedAheadOfUnsafe( - new_sync_state.unsafe_head().block_info.number, - new_sync_state.finalized_head().block_info.number, - )); - } - - let fcu_time_start = Instant::now(); - - // Determine the forkchoice version to use. - // Note that if the envelope is not provided, we use the forkchoice version from the - // timestamp zero. The version number in `fork_choice_updated_v*` - // methods only matters for the payload attributes. - let version = EngineForkchoiceVersion::from_cfg( - &self.rollup, - self.envelope.as_ref().map(|p| p.inner.payload_attributes.timestamp).unwrap_or(0), - ); - - // TODO(@theochap, ``): we should avoid cloning the payload attributes here. - let payload_attributes = self.envelope.as_ref().map(|p| p.inner()).cloned(); - - // Send the forkchoice update through the input. - let forkchoice = new_sync_state.create_forkchoice_state(); - - // Handle the forkchoice update result. - let response = match version { - EngineForkchoiceVersion::V2 => { - self.client.fork_choice_updated_v2(forkchoice, payload_attributes).await - } - EngineForkchoiceVersion::V3 => { - self.client.fork_choice_updated_v3(forkchoice, payload_attributes).await - } - }; - - let valid_response = response.map_err(|e| { - // Fatal forkchoice update error. - e.as_error_resp() - .and_then(|e| { - (e.code == INVALID_FORK_CHOICE_STATE_ERROR as i64) - .then_some(ForkchoiceTaskError::InvalidForkchoiceState) - }) - .unwrap_or_else(|| ForkchoiceTaskError::ForkchoiceUpdateFailed(e)) - })?; - - // Unexpected forkchoice payload status. - // We may be able to recover from this by resetting the engine. - self.check_forkchoice_updated_status(state, &valid_response.payload_status.status)?; - - // Apply the new sync state to the engine state. - state.sync_state = new_sync_state; - - let fcu_duration = fcu_time_start.elapsed(); - debug!( - target: "engine", - fcu_duration = ?fcu_duration, - forkchoice = ?forkchoice, - response = ?valid_response, - "Forkchoice updated" - ); - - Ok(valid_response.payload_id) - } -} diff --git a/crates/node/engine/src/task_queue/tasks/insert/error.rs b/crates/node/engine/src/task_queue/tasks/insert/error.rs index 2233cbe371..6126625a86 100644 --- a/crates/node/engine/src/task_queue/tasks/insert/error.rs +++ b/crates/node/engine/src/task_queue/tasks/insert/error.rs @@ -3,7 +3,7 @@ //! [InsertTask]: crate::InsertTask use crate::{ - EngineTaskError, ForkchoiceTaskError, task_queue::tasks::task::EngineTaskErrorSeverity, + EngineTaskError, SynchronizeTaskError, task_queue::tasks::task::EngineTaskErrorSeverity, }; use alloy_rpc_types_engine::PayloadStatusEnum; use alloy_transport::{RpcError, TransportErrorKind}; @@ -32,7 +32,7 @@ pub enum InsertTaskError { L2BlockInfoConstruction(#[from] FromBlockError), /// The forkchoice update call to consolidate the block into the engine state failed. #[error(transparent)] - ForkchoiceUpdateFailed(#[from] ForkchoiceTaskError), + ForkchoiceUpdateFailed(#[from] SynchronizeTaskError), } impl EngineTaskError for InsertTaskError { diff --git a/crates/node/engine/src/task_queue/tasks/insert/task.rs b/crates/node/engine/src/task_queue/tasks/insert/task.rs index 406fe42199..cde6cf6278 100644 --- a/crates/node/engine/src/task_queue/tasks/insert/task.rs +++ b/crates/node/engine/src/task_queue/tasks/insert/task.rs @@ -1,7 +1,7 @@ //! A task to insert an unsafe payload into the execution engine. use crate::{ - EngineClient, EngineState, EngineTaskExt, ForkchoiceTask, InsertTaskError, + EngineClient, EngineState, EngineTaskExt, InsertTaskError, SynchronizeTask, state::EngineSyncStateUpdate, }; use alloy_eips::eip7685::EMPTY_REQUESTS_HASH; @@ -124,7 +124,7 @@ impl EngineTaskExt for InsertTask { .map_err(InsertTaskError::L2BlockInfoConstruction)?; // Send a FCU to canonicalize the imported block. - ForkchoiceTask::new( + SynchronizeTask::new( Arc::clone(&self.client), self.rollup_config.clone(), EngineSyncStateUpdate { @@ -134,7 +134,6 @@ impl EngineTaskExt for InsertTask { safe_head: self.is_payload_safe.then_some(new_unsafe_ref), ..Default::default() }, - None, ) .execute(state) .await?; diff --git a/crates/node/engine/src/task_queue/tasks/mod.rs b/crates/node/engine/src/task_queue/tasks/mod.rs index 81b44794f8..c59a9bba38 100644 --- a/crates/node/engine/src/task_queue/tasks/mod.rs +++ b/crates/node/engine/src/task_queue/tasks/mod.rs @@ -5,14 +5,14 @@ pub use task::{ EngineTask, EngineTaskError, EngineTaskErrorSeverity, EngineTaskErrors, EngineTaskExt, }; -mod forkchoice; -pub use forkchoice::{ForkchoiceTask, ForkchoiceTaskError}; +mod synchronize; +pub use synchronize::{SynchronizeTask, SynchronizeTaskError}; mod insert; pub use insert::{InsertTask, InsertTaskError}; mod build; -pub use build::{BuildTask, BuildTaskError}; +pub use build::{BuildTask, BuildTaskError, EngineBuildError}; mod consolidate; pub use consolidate::{ConsolidateTask, ConsolidateTaskError}; diff --git a/crates/node/engine/src/task_queue/tasks/forkchoice/error.rs b/crates/node/engine/src/task_queue/tasks/synchronize/error.rs similarity index 57% rename from crates/node/engine/src/task_queue/tasks/forkchoice/error.rs rename to crates/node/engine/src/task_queue/tasks/synchronize/error.rs index 602079c123..2b3b971204 100644 --- a/crates/node/engine/src/task_queue/tasks/forkchoice/error.rs +++ b/crates/node/engine/src/task_queue/tasks/synchronize/error.rs @@ -1,21 +1,15 @@ -//! Contains error types for the [crate::ForkchoiceTask]. +//! Contains error types for the [crate::SynchronizeTask]. use crate::{EngineTaskError, task_queue::tasks::task::EngineTaskErrorSeverity}; use alloy_rpc_types_engine::PayloadStatusEnum; use alloy_transport::{RpcError, TransportErrorKind}; use thiserror::Error; -/// An error that occurs when running the [crate::ForkchoiceTask]. +/// An error that occurs when running the [crate::SynchronizeTask]. #[derive(Debug, Error)] -pub enum ForkchoiceTaskError { - /// The forkchoice update is not needed. - #[error("No forkchoice update needed")] - NoForkchoiceUpdateNeeded, - /// The engine is syncing. - #[error("Attempting to update forkchoice state while EL syncing")] - EngineSyncing, +pub enum SynchronizeTaskError { /// The forkchoice update call to the engine api failed. - #[error("Forkchoice update engine api call failed")] + #[error("Forkchoice update engine api call failed due to an RPC error: {0}")] ForkchoiceUpdateFailed(RpcError), /// The finalized head is behind the unsafe head. #[error("Invalid forkchoice state: unsafe head {0} is ahead of finalized head {1}")] @@ -23,24 +17,18 @@ pub enum ForkchoiceTaskError { /// The forkchoice state is invalid. #[error("Invalid forkchoice state")] InvalidForkchoiceState, - /// The payload status is invalid. - #[error("Invalid payload status: {0}")] - InvalidPayloadStatus(String), /// The payload status is unexpected. #[error("Unexpected payload status: {0}")] UnexpectedPayloadStatus(PayloadStatusEnum), } -impl EngineTaskError for ForkchoiceTaskError { +impl EngineTaskError for SynchronizeTaskError { fn severity(&self) -> EngineTaskErrorSeverity { match self { - Self::NoForkchoiceUpdateNeeded => EngineTaskErrorSeverity::Temporary, - Self::EngineSyncing => EngineTaskErrorSeverity::Temporary, - Self::ForkchoiceUpdateFailed(_) => EngineTaskErrorSeverity::Temporary, Self::FinalizedAheadOfUnsafe(_, _) => EngineTaskErrorSeverity::Critical, - Self::UnexpectedPayloadStatus(_) => EngineTaskErrorSeverity::Critical, + Self::ForkchoiceUpdateFailed(_) => EngineTaskErrorSeverity::Temporary, + Self::UnexpectedPayloadStatus(_) => EngineTaskErrorSeverity::Temporary, Self::InvalidForkchoiceState => EngineTaskErrorSeverity::Reset, - Self::InvalidPayloadStatus(_) => EngineTaskErrorSeverity::Reset, } } } diff --git a/crates/node/engine/src/task_queue/tasks/forkchoice/mod.rs b/crates/node/engine/src/task_queue/tasks/synchronize/mod.rs similarity index 57% rename from crates/node/engine/src/task_queue/tasks/forkchoice/mod.rs rename to crates/node/engine/src/task_queue/tasks/synchronize/mod.rs index fdea6df13c..0c4cf2ccd8 100644 --- a/crates/node/engine/src/task_queue/tasks/forkchoice/mod.rs +++ b/crates/node/engine/src/task_queue/tasks/synchronize/mod.rs @@ -1,7 +1,7 @@ //! Task and its associated types for the forkchoice engine update. mod task; -pub use task::ForkchoiceTask; +pub use task::SynchronizeTask; mod error; -pub use error::ForkchoiceTaskError; +pub use error::SynchronizeTaskError; diff --git a/crates/node/engine/src/task_queue/tasks/synchronize/task.rs b/crates/node/engine/src/task_queue/tasks/synchronize/task.rs new file mode 100644 index 0000000000..030bb3ac22 --- /dev/null +++ b/crates/node/engine/src/task_queue/tasks/synchronize/task.rs @@ -0,0 +1,138 @@ +//! A task for the `engine_forkchoiceUpdated` method, with no attributes. + +use crate::{ + EngineClient, EngineState, EngineTaskExt, SynchronizeTaskError, state::EngineSyncStateUpdate, +}; +use alloy_rpc_types_engine::{INVALID_FORK_CHOICE_STATE_ERROR, PayloadStatusEnum}; +use async_trait::async_trait; +use kona_genesis::RollupConfig; +use op_alloy_provider::ext::engine::OpEngineApi; +use std::sync::Arc; +use tokio::time::Instant; + +/// The [`SynchronizeTask`] executes an `engine_forkchoiceUpdated` call with the current +/// [`EngineState`]'s forkchoice, and no payload attributes. +#[derive(Debug, Clone)] +pub struct SynchronizeTask { + /// The engine client. + pub client: Arc, + /// The rollup config. + pub rollup: Arc, + /// The sync state update to apply to the engine state. + pub state_update: EngineSyncStateUpdate, +} + +impl SynchronizeTask { + /// Creates a new [`SynchronizeTask`]. + pub const fn new( + client: Arc, + rollup: Arc, + state_update: EngineSyncStateUpdate, + ) -> Self { + Self { client, rollup, state_update } + } + + /// Checks the response of the `engine_forkchoiceUpdated` call, and updates the sync status if + /// necessary. + fn check_forkchoice_updated_status( + &self, + state: &mut EngineState, + status: &PayloadStatusEnum, + ) -> Result<(), SynchronizeTaskError> { + match status { + PayloadStatusEnum::Valid => { + if !state.el_sync_finished { + info!( + target: "engine", + "Finished execution layer sync." + ); + state.el_sync_finished = true; + } + + Ok(()) + } + PayloadStatusEnum::Syncing => { + // If we're not building a new payload, we're driving EL sync. + debug!(target: "engine", "Attempting to update forkchoice state while EL syncing"); + Ok(()) + } + s => { + // Other codes are not expected. + Err(SynchronizeTaskError::UnexpectedPayloadStatus(s.clone())) + } + } + } +} + +#[async_trait] +impl EngineTaskExt for SynchronizeTask { + type Output = (); + type Error = SynchronizeTaskError; + + async fn execute(&self, state: &mut EngineState) -> Result { + // Apply the sync state update to the engine state. + let new_sync_state = state.sync_state.apply_update(self.state_update); + + // Check if a forkchoice update is not needed, return early. + // A forkchoice update is not needed if... + // 1. The engine state is not default (initial forkchoice state has been emitted), and + // 2. The new sync state is the same as the current sync state (no changes to the sync + // state). + // + // NOTE: + // We shouldn't retry the synchronize task there. Since the `sync_state` is only updated + // inside the `SynchronizeTask` (except inside the ConsolidateTask, when the block is not + // the last in the batch) - the engine will get stuck retrying the `SynchronizeTask` + if state.sync_state != Default::default() && state.sync_state == new_sync_state { + debug!(target: "engine", ?new_sync_state, "No forkchoice update needed"); + return Ok(()); + } + + // Check if the head is behind the finalized head. + if new_sync_state.unsafe_head().block_info.number < + new_sync_state.finalized_head().block_info.number + { + return Err(SynchronizeTaskError::FinalizedAheadOfUnsafe( + new_sync_state.unsafe_head().block_info.number, + new_sync_state.finalized_head().block_info.number, + )); + } + + let fcu_time_start = Instant::now(); + + // Send the forkchoice update through the input. + let forkchoice = new_sync_state.create_forkchoice_state(); + + // Handle the forkchoice update result. + // NOTE: it doesn't matter which version we use here, because we're not sending any + // payload attributes. The forkchoice updated call is version agnostic if no payload + // attributes are provided. + let response = self.client.fork_choice_updated_v3(forkchoice, None).await; + + let valid_response = response.map_err(|e| { + // Fatal forkchoice update error. + e.as_error_resp() + .and_then(|e| { + (e.code == INVALID_FORK_CHOICE_STATE_ERROR as i64) + .then_some(SynchronizeTaskError::InvalidForkchoiceState) + }) + .unwrap_or_else(|| SynchronizeTaskError::ForkchoiceUpdateFailed(e)) + })?; + + self.check_forkchoice_updated_status(state, &valid_response.payload_status.status)?; + + // Apply the new sync state to the engine state. + state.sync_state = new_sync_state; + + let fcu_duration = fcu_time_start.elapsed(); + debug!( + target: "engine", + fcu_duration = ?fcu_duration, + forkchoice = ?forkchoice, + response = ?valid_response, + "Forkchoice updated" + ); + + Ok(()) + } +} diff --git a/crates/node/engine/src/task_queue/tasks/task.rs b/crates/node/engine/src/task_queue/tasks/task.rs index c3893f55ec..cfc6d91343 100644 --- a/crates/node/engine/src/task_queue/tasks/task.rs +++ b/crates/node/engine/src/task_queue/tasks/task.rs @@ -2,10 +2,9 @@ //! //! [`Engine`]: crate::Engine -use super::{BuildTask, ConsolidateTask, FinalizeTask, ForkchoiceTask, InsertTask}; +use super::{BuildTask, ConsolidateTask, FinalizeTask, InsertTask}; use crate::{ - BuildTaskError, ConsolidateTaskError, EngineState, FinalizeTaskError, ForkchoiceTaskError, - InsertTaskError, + BuildTaskError, ConsolidateTaskError, EngineState, FinalizeTaskError, InsertTaskError, }; use async_trait::async_trait; use derive_more::Display; @@ -56,9 +55,6 @@ pub trait EngineTaskExt { /// An error that may occur during an [`EngineTask`]'s execution. #[derive(Error, Debug)] pub enum EngineTaskErrors { - /// An error that occurred while updating the forkchoice state. - #[error(transparent)] - Forkchoice(#[from] ForkchoiceTaskError), /// An error that occurred while inserting a block into the engine. #[error(transparent)] Insert(#[from] InsertTaskError), @@ -76,7 +72,6 @@ pub enum EngineTaskErrors { impl EngineTaskError for EngineTaskErrors { fn severity(&self) -> EngineTaskErrorSeverity { match self { - Self::Forkchoice(inner) => inner.severity(), Self::Insert(inner) => inner.severity(), Self::Build(inner) => inner.severity(), Self::Consolidate(inner) => inner.severity(), @@ -90,9 +85,6 @@ impl EngineTaskError for EngineTaskErrors { /// [`Engine`]: crate::Engine #[derive(Debug, Clone)] pub enum EngineTask { - /// Perform a `engine_forkchoiceUpdated` call with the current [`EngineState`]'s forkchoice, - /// and no payload attributes. - ForkchoiceUpdate(ForkchoiceTask), /// Inserts a payload into the execution engine. Insert(InsertTask), /// Builds a new block with the given attributes, and inserts it into the execution engine. @@ -108,7 +100,6 @@ impl EngineTask { /// Executes the task without consuming it. async fn execute_inner(&self, state: &mut EngineState) -> Result<(), EngineTaskErrors> { match self.clone() { - Self::ForkchoiceUpdate(task) => task.execute(state).await.map(|_| ())?, Self::Insert(task) => task.execute(state).await?, Self::Build(task) => task.execute(state).await?, Self::Consolidate(task) => task.execute(state).await?, @@ -123,7 +114,6 @@ impl EngineTask { Self::Insert(_) => crate::Metrics::INSERT_TASK_LABEL, Self::Consolidate(_) => crate::Metrics::CONSOLIDATE_TASK_LABEL, Self::Build(_) => crate::Metrics::BUILD_TASK_LABEL, - Self::ForkchoiceUpdate(_) => crate::Metrics::FORKCHOICE_TASK_LABEL, Self::Finalize(_) => crate::Metrics::FINALIZE_TASK_LABEL, } } @@ -133,8 +123,7 @@ impl PartialEq for EngineTask { fn eq(&self, other: &Self) -> bool { matches!( (self, other), - (Self::ForkchoiceUpdate(_), Self::ForkchoiceUpdate(_)) | - (Self::Insert(_), Self::Insert(_)) | + (Self::Insert(_), Self::Insert(_)) | (Self::Build(_), Self::Build(_)) | (Self::Consolidate(_), Self::Consolidate(_)) | (Self::Finalize(_), Self::Finalize(_)) @@ -168,13 +157,8 @@ impl Ord for EngineTask { (Self::Insert(_), Self::Insert(_)) => Ordering::Equal, (Self::Consolidate(_), Self::Consolidate(_)) => Ordering::Equal, (Self::Build(_), Self::Build(_)) => Ordering::Equal, - (Self::ForkchoiceUpdate(_), Self::ForkchoiceUpdate(_)) => Ordering::Equal, (Self::Finalize(_), Self::Finalize(_)) => Ordering::Equal, - // Individual ForkchoiceUpdate tasks are the highest priority - (Self::ForkchoiceUpdate(_), _) => Ordering::Greater, - (_, Self::ForkchoiceUpdate(_)) => Ordering::Less, - // BuildBlock tasks are prioritized over InsertUnsafe and Consolidate tasks (Self::Build(_), _) => Ordering::Greater, (_, Self::Build(_)) => Ordering::Less, diff --git a/docs/docs/pages/node/design/engine.mdx b/docs/docs/pages/node/design/engine.mdx index 36064376cd..0c0770627a 100644 --- a/docs/docs/pages/node/design/engine.mdx +++ b/docs/docs/pages/node/design/engine.mdx @@ -65,12 +65,12 @@ The engine uses a priority-based task queue where tasks are ordered according to ### Task Types -#### ForkchoiceTask +#### SynchronizeTask Updates the execution layer's forkchoice state: ```rust -pub struct ForkchoiceTask { +pub struct SynchronizeTask { pub client: Arc, pub rollup: Arc, pub envelope: Option, @@ -226,7 +226,7 @@ let engine = Engine::new(state, state_sender); ```rust // Add a forkchoice update task -let task = EngineTask::ForkchoiceUpdate(ForkchoiceTask::new( +let task = EngineTask::ForkchoiceUpdate(SynchronizeTask::new( client.clone(), rollup_config.clone(), state_update,