diff --git a/crates/node/engine/src/lib.rs b/crates/node/engine/src/lib.rs index 2ef6256cd5..ba13fafaac 100644 --- a/crates/node/engine/src/lib.rs +++ b/crates/node/engine/src/lib.rs @@ -12,8 +12,9 @@ extern crate tracing; mod task_queue; pub use task_queue::{ BuildTask, BuildTaskError, ConsolidateTask, ConsolidateTaskError, Engine, EngineResetError, - EngineTask, EngineTaskError, EngineTaskExt, FinalizeTask, FinalizeTaskError, ForkchoiceTask, - ForkchoiceTaskError, InsertUnsafeTask, InsertUnsafeTaskError, + EngineTask, EngineTaskError, EngineTaskErrorSeverity, EngineTaskErrors, EngineTaskExt, + FinalizeTask, FinalizeTaskError, ForkchoiceTask, ForkchoiceTaskError, InsertTask, + InsertTaskError, }; mod attributes; diff --git a/crates/node/engine/src/task_queue/core.rs b/crates/node/engine/src/task_queue/core.rs index 4e97ffd5e8..04a6b394cd 100644 --- a/crates/node/engine/src/task_queue/core.rs +++ b/crates/node/engine/src/task_queue/core.rs @@ -1,8 +1,9 @@ //! The [`Engine`] is a task queue that receives and executes [`EngineTask`]s. -use super::{EngineTaskError, EngineTaskExt}; +use super::EngineTaskExt; use crate::{ EngineClient, EngineState, EngineSyncStateUpdate, EngineTask, ForkchoiceTask, Metrics, + task_queue::EngineTaskErrors, }; use alloy_provider::Provider; use alloy_rpc_types_eth::Transaction; @@ -88,7 +89,8 @@ impl Engine { None, ) .execute(&mut self.state) - .await?; + .await + .map_err(EngineTaskErrors::Forkchoice)?; // Find the new safe head's L1 origin and SystemConfig. let origin_block = start @@ -128,7 +130,7 @@ impl Engine { /// Attempts to drain the queue by executing all [`EngineTask`]s in-order. If any task returns /// an error along the way, it is not popped from the queue (in case it must be retried) and /// the error is returned. - pub async fn drain(&mut self) -> Result<(), EngineTaskError> { + pub async fn drain(&mut self) -> Result<(), EngineTaskErrors> { // Drain tasks in order of priority, halting on errors for a retry to be attempted. while let Some(task) = self.tasks.peek() { // Execute the task @@ -148,9 +150,9 @@ impl Engine { /// An error occurred while attempting to reset the [`Engine`]. #[derive(Debug, Error)] pub enum EngineResetError { - /// An error that originated from within the engine task. + /// An error that originated from within an engine task. #[error(transparent)] - Task(#[from] EngineTaskError), + Task(#[from] EngineTaskErrors), /// An error occurred while traversing the L1 for the sync starting point. #[error(transparent)] SyncStart(#[from] SyncStartError), diff --git a/crates/node/engine/src/task_queue/tasks/build/error.rs b/crates/node/engine/src/task_queue/tasks/build/error.rs index 2c9b0c5d9c..49bf7db55e 100644 --- a/crates/node/engine/src/task_queue/tasks/build/error.rs +++ b/crates/node/engine/src/task_queue/tasks/build/error.rs @@ -1,6 +1,9 @@ //! Contains error types for the [crate::ForkchoiceTask]. -use crate::EngineTaskError; +use crate::{ + EngineTaskError, ForkchoiceTaskError, InsertTaskError, + task_queue::tasks::task::EngineTaskErrorSeverity, +}; use alloy_rpc_types_engine::PayloadStatusEnum; use alloy_transport::{RpcError, TransportErrorKind}; use kona_protocol::FromBlockError; @@ -20,6 +23,12 @@ pub enum BuildTaskError { /// Missing payload ID. #[error("Missing payload ID")] MissingPayloadId, + /// The initial forkchoice update call to the engine api failed. + #[error(transparent)] + ForkchoiceUpdateFailed(#[from] ForkchoiceTaskError), + /// Impossible to insert the payload into the engine. + #[error(transparent)] + PayloadInsertionFailed(#[from] InsertTaskError), /// Unexpected payload status #[error("Unexpected payload status: {0}")] UnexpectedPayloadStatus(PayloadStatusEnum), @@ -50,20 +59,22 @@ pub enum BuildTaskError { MpscSend(#[from] mpsc::error::SendError), } -impl From for EngineTaskError { - fn from(value: BuildTaskError) -> Self { - match value { - BuildTaskError::NoForkchoiceUpdateNeeded => Self::Temporary(Box::new(value)), - BuildTaskError::EngineSyncing => Self::Temporary(Box::new(value)), - BuildTaskError::GetPayloadFailed(_) => Self::Temporary(Box::new(value)), - BuildTaskError::NewPayloadFailed(_) => Self::Temporary(Box::new(value)), - BuildTaskError::HoloceneInvalidFlush => Self::Flush(Box::new(value)), - BuildTaskError::MissingPayloadId => Self::Critical(Box::new(value)), - BuildTaskError::UnexpectedPayloadStatus(_) => Self::Critical(Box::new(value)), - BuildTaskError::DepositOnlyPayloadReattemptFailed => Self::Critical(Box::new(value)), - BuildTaskError::DepositOnlyPayloadFailed => Self::Critical(Box::new(value)), - BuildTaskError::FromBlock(_) => Self::Critical(Box::new(value)), - BuildTaskError::MpscSend(_) => Self::Critical(Box::new(value)), +impl EngineTaskError for BuildTaskError { + fn severity(&self) -> EngineTaskErrorSeverity { + match self { + Self::ForkchoiceUpdateFailed(inner) => inner.severity(), + Self::PayloadInsertionFailed(inner) => inner.severity(), + Self::NoForkchoiceUpdateNeeded => EngineTaskErrorSeverity::Temporary, + Self::EngineSyncing => EngineTaskErrorSeverity::Temporary, + Self::GetPayloadFailed(_) => EngineTaskErrorSeverity::Temporary, + Self::NewPayloadFailed(_) => EngineTaskErrorSeverity::Temporary, + Self::HoloceneInvalidFlush => EngineTaskErrorSeverity::Flush, + Self::MissingPayloadId => EngineTaskErrorSeverity::Critical, + Self::UnexpectedPayloadStatus(_) => EngineTaskErrorSeverity::Critical, + Self::DepositOnlyPayloadReattemptFailed => EngineTaskErrorSeverity::Critical, + Self::DepositOnlyPayloadFailed => EngineTaskErrorSeverity::Critical, + Self::FromBlock(_) => EngineTaskErrorSeverity::Critical, + Self::MpscSend(_) => EngineTaskErrorSeverity::Critical, } } } diff --git a/crates/node/engine/src/task_queue/tasks/build/task.rs b/crates/node/engine/src/task_queue/tasks/build/task.rs index 71ae45d1d6..bdcbe28bd9 100644 --- a/crates/node/engine/src/task_queue/tasks/build/task.rs +++ b/crates/node/engine/src/task_queue/tasks/build/task.rs @@ -1,15 +1,13 @@ //! A task for building a new block and importing it. - use super::BuildTaskError; use crate::{ - EngineClient, EngineGetPayloadVersion, EngineState, EngineTaskError, EngineTaskExt, - ForkchoiceTask, Metrics, state::EngineSyncStateUpdate, -}; -use alloy_provider::ext::EngineApi; -use alloy_rpc_types_engine::{ - ExecutionPayloadFieldV2, ExecutionPayloadInputV2, PayloadId, PayloadStatusEnum, + EngineClient, EngineGetPayloadVersion, EngineState, EngineTaskExt, ForkchoiceTask, + ForkchoiceTaskError, InsertTask, + InsertTaskError::{self}, + Metrics, + state::EngineSyncStateUpdate, }; -use alloy_transport::RpcError; +use alloy_rpc_types_engine::{ExecutionPayload, PayloadId}; use async_trait::async_trait; use kona_genesis::RollupConfig; use kona_protocol::{L2BlockInfo, OpAttributesWithParent}; @@ -46,8 +44,7 @@ impl BuildTask { Self { engine, cfg, attributes, is_attributes_derived, payload_tx } } - /// Fetches the execution payload from the EL and imports it into the engine via - /// `engine_newPayload`. + /// Fetches the execution payload from the EL. /// /// ## Engine Method Selection /// The method used to fetch the payload from the EL is determined by the payload timestamp. The @@ -56,14 +53,13 @@ impl BuildTask { /// - `engine_getPayloadV2` is used for payloads with a timestamp before the Ecotone fork. /// - `engine_getPayloadV3` is used for payloads with a timestamp after the Ecotone fork. /// - `engine_getPayloadV4` is used for payloads with a timestamp after the Isthmus fork. - async fn fetch_and_import_payload( + async fn fetch_payload( &self, - state: &mut EngineState, cfg: &RollupConfig, engine: &EngineClient, payload_id: PayloadId, payload_attrs: OpAttributesWithParent, - ) -> Result<(OpExecutionPayloadEnvelope, L2BlockInfo), BuildTaskError> { + ) -> Result { let payload_timestamp = payload_attrs.inner().payload_attributes.timestamp; debug!( @@ -74,148 +70,47 @@ impl BuildTask { ); let get_payload_version = EngineGetPayloadVersion::from_cfg(cfg, payload_timestamp); - let (payload_envelope, response) = match get_payload_version { + let payload_envelope = match get_payload_version { EngineGetPayloadVersion::V4 => { let payload = engine.get_payload_v4(payload_id).await.map_err(|e| { error!(target: "engine_builder", "Payload fetch failed: {e}"); BuildTaskError::GetPayloadFailed(e) })?; - let response = engine - .new_payload_v4( - payload.execution_payload.clone(), - payload.parent_beacon_block_root, - ) - .await - .map_err(|e| { - error!(target: "engine_builder", "Payload import failed: {e}"); - BuildTaskError::NewPayloadFailed(e) - })?; - ( - OpExecutionPayloadEnvelope { - parent_beacon_block_root: Some(payload.parent_beacon_block_root), - payload: OpExecutionPayload::V4(payload.execution_payload), - }, - response, - ) + OpExecutionPayloadEnvelope { + parent_beacon_block_root: Some(payload.parent_beacon_block_root), + payload: OpExecutionPayload::V4(payload.execution_payload), + } } EngineGetPayloadVersion::V3 => { let payload = engine.get_payload_v3(payload_id).await.map_err(|e| { error!(target: "engine_builder", "Payload fetch failed: {e}"); BuildTaskError::GetPayloadFailed(e) })?; - let response = engine - .new_payload_v3( - payload.execution_payload.clone(), - payload.parent_beacon_block_root, - ) - .await - .map_err(|e| { - error!(target: "engine_builder", "Payload import failed: {e}"); - BuildTaskError::NewPayloadFailed(e) - })?; - ( - OpExecutionPayloadEnvelope { - parent_beacon_block_root: Some(payload.parent_beacon_block_root), - payload: OpExecutionPayload::V3(payload.execution_payload), - }, - response, - ) + OpExecutionPayloadEnvelope { + parent_beacon_block_root: Some(payload.parent_beacon_block_root), + payload: OpExecutionPayload::V3(payload.execution_payload), + } } EngineGetPayloadVersion::V2 => { let payload = engine.get_payload_v2(payload_id).await.map_err(|e| { error!(target: "engine_builder", "Payload fetch failed: {e}"); BuildTaskError::GetPayloadFailed(e) })?; - match payload.execution_payload { - ExecutionPayloadFieldV2::V2(payload) => { - let payload_input = ExecutionPayloadInputV2 { - execution_payload: payload.payload_inner.clone(), - withdrawals: Some(payload.withdrawals.clone()), - }; - let response = engine.new_payload_v2(payload_input).await.map_err(|e| { - error!(target: "engine_builder", "Payload import failed: {e}"); - BuildTaskError::NewPayloadFailed(e) - })?; - - ( - OpExecutionPayloadEnvelope { - parent_beacon_block_root: None, - payload: OpExecutionPayload::V2(payload), - }, - response, - ) - } - ExecutionPayloadFieldV2::V1(payload) => { - let response = - engine.new_payload_v1(payload.clone()).await.map_err(|e| { - error!(target: "engine_builder", "Payload import failed: {e}"); - BuildTaskError::NewPayloadFailed(e) - })?; - ( - OpExecutionPayloadEnvelope { - parent_beacon_block_root: None, - payload: OpExecutionPayload::V1(payload), - }, - response, - ) - } + OpExecutionPayloadEnvelope { + parent_beacon_block_root: None, + payload: match payload.execution_payload.into_payload() { + ExecutionPayload::V1(payload) => OpExecutionPayload::V1(payload), + ExecutionPayload::V2(payload) => OpExecutionPayload::V2(payload), + _ => unreachable!("the response should be a V1 or V2 payload"), + }, } } }; - match response.status { - PayloadStatusEnum::Valid | PayloadStatusEnum::Syncing => { - debug!(target: "engine_builder", "Payload import successful"); - - Ok(( - payload_envelope.clone(), - L2BlockInfo::from_payload_and_genesis( - payload_envelope.payload, - payload_attrs.inner().payload_attributes.parent_beacon_block_root, - &cfg.genesis, - )?, - )) - } - PayloadStatusEnum::Invalid { validation_error } => { - if payload_attrs.is_deposits_only() { - error!(target: "engine_builder", "Critical: Deposit-only payload import failed: {validation_error}"); - Err(BuildTaskError::DepositOnlyPayloadFailed) - } else if cfg.is_holocene_active(payload_attrs.inner().payload_attributes.timestamp) - { - warn!(target: "engine_builder", "Payload import failed: {validation_error}"); - warn!(target: "engine_builder", "Re-attempting payload import with deposits only."); - // HOLOCENE: Re-attempt payload import with deposits only - match Self::new( - self.engine.clone(), - self.cfg.clone(), - self.attributes.as_deposits_only(), - self.is_attributes_derived, - self.payload_tx.clone(), - ) - .execute(state) - .await - { - Ok(_) => { - info!(target: "engine_builder", "Successfully imported deposits-only payload") - } - Err(_) => return Err(BuildTaskError::DepositOnlyPayloadReattemptFailed), - } - Err(BuildTaskError::HoloceneInvalidFlush) - } else { - error!(target: "engine_builder", "Payload import failed: {validation_error}"); - Err(BuildTaskError::NewPayloadFailed(RpcError::local_usage_str( - &validation_error, - ))) - } - } - s => { - // Other codes are never returned by `engine_newPayload` - Err(BuildTaskError::UnexpectedPayloadStatus(s)) - } - } + Ok(payload_envelope) } } @@ -223,7 +118,9 @@ impl BuildTask { impl EngineTaskExt for BuildTask { type Output = (); - async fn execute(&self, state: &mut EngineState) -> Result<(), EngineTaskError> { + type Error = BuildTaskError; + + async fn execute(&self, state: &mut EngineState) -> Result<(), BuildTaskError> { debug!( target: "engine_builder", txs = self.attributes.inner().transactions.as_ref().map_or(0, |txs| txs.len()), @@ -247,34 +144,71 @@ impl EngineTaskExt for BuildTask { .ok_or(BuildTaskError::MissingPayloadId)?; let fcu_duration = fcu_start_time.elapsed(); - // Fetch the payload from the EL and import it into the engine. + // Fetch the payload just inserted from the EL and import it into the engine. let block_import_start_time = Instant::now(); - let (new_payload, new_block_ref) = self - .fetch_and_import_payload( - state, - &self.cfg, - &self.engine, - payload_id, - self.attributes.clone(), - ) + let new_payload = self + .fetch_payload(&self.cfg, &self.engine, payload_id, self.attributes.clone()) .await?; - let block_import_duration = block_import_start_time.elapsed(); - // Send a FCU to canonicalize the imported block. - ForkchoiceTask::new( + let new_block_ref = L2BlockInfo::from_payload_and_genesis( + new_payload.payload.clone(), + self.attributes.inner().payload_attributes.parent_beacon_block_root, + &self.cfg.genesis, + ) + .map_err(BuildTaskError::FromBlock)?; + + // Insert the new block into the engine. + match InsertTask::new( Arc::clone(&self.engine), self.cfg.clone(), - EngineSyncStateUpdate { - unsafe_head: Some(new_block_ref), - cross_unsafe_head: Some(new_block_ref), - local_safe_head: self.is_attributes_derived.then_some(new_block_ref), - safe_head: self.is_attributes_derived.then_some(new_block_ref), - ..Default::default() - }, - None, + new_payload.clone(), + self.is_attributes_derived, ) .execute(state) - .await?; + .await + { + Err(InsertTaskError::ForkchoiceUpdateFailed( + ForkchoiceTaskError::InvalidPayloadStatus(e), + )) if self.attributes.is_deposits_only() => { + error!(target: "engine_builder", error = ?e, "Critical: Deposit-only payload import failed"); + return Err(BuildTaskError::DepositOnlyPayloadFailed) + } + // HOLOCENE: Re-attempt payload import with deposits only + Err(InsertTaskError::ForkchoiceUpdateFailed( + ForkchoiceTaskError::InvalidPayloadStatus(e), + )) if self + .cfg + .is_holocene_active(self.attributes.inner().payload_attributes.timestamp) => + { + warn!(target: "engine_builder", error = ?e, "Re-attempting payload import with deposits only."); + // HOLOCENE: Re-attempt payload import with deposits only + match Self::new( + self.engine.clone(), + self.cfg.clone(), + self.attributes.as_deposits_only(), + self.is_attributes_derived, + self.payload_tx.clone(), + ) + .execute(state) + .await + { + Ok(_) => { + info!(target: "engine_builder", "Successfully imported deposits-only payload") + } + Err(_) => return Err(BuildTaskError::DepositOnlyPayloadReattemptFailed), + } + return Err(BuildTaskError::HoloceneInvalidFlush) + } + Err(e) => { + error!(target: "engine_builder", "Payload import failed: {e}"); + return Err(e.into()) + } + Ok(_) => { + info!(target: "engine_builder", "Successfully imported payload") + } + } + + let block_import_duration = block_import_start_time.elapsed(); // If a channel was provided, send the built payload envelope to it. if let Some(tx) = &self.payload_tx { diff --git a/crates/node/engine/src/task_queue/tasks/consolidate/error.rs b/crates/node/engine/src/task_queue/tasks/consolidate/error.rs index 7040836cdc..098137748a 100644 --- a/crates/node/engine/src/task_queue/tasks/consolidate/error.rs +++ b/crates/node/engine/src/task_queue/tasks/consolidate/error.rs @@ -1,6 +1,9 @@ //! Contains error types for the [`crate::ConsolidateTask`]. -use crate::EngineTaskError; +use crate::{ + BuildTaskError, EngineTaskError, ForkchoiceTaskError, + task_queue::tasks::task::EngineTaskErrorSeverity, +}; use thiserror::Error; /// An error that occurs when running the [`crate::ConsolidateTask`]. @@ -12,13 +15,21 @@ pub enum ConsolidateTaskError { /// Failed to fetch the unsafe L2 block. #[error("Failed to fetch the unsafe L2 block")] FailedToFetchUnsafeL2Block, + /// The build task failed. + #[error(transparent)] + BuildTaskFailed(#[from] BuildTaskError), + /// The consolidation forkchoice update call to the engine api failed. + #[error(transparent)] + ForkchoiceUpdateFailed(#[from] ForkchoiceTaskError), } -impl From for EngineTaskError { - fn from(value: ConsolidateTaskError) -> Self { - match value { - ConsolidateTaskError::MissingUnsafeL2Block(_) => Self::Reset(Box::new(value)), - ConsolidateTaskError::FailedToFetchUnsafeL2Block => Self::Temporary(Box::new(value)), +impl EngineTaskError for ConsolidateTaskError { + fn severity(&self) -> EngineTaskErrorSeverity { + match self { + Self::MissingUnsafeL2Block(_) => EngineTaskErrorSeverity::Reset, + Self::FailedToFetchUnsafeL2Block => EngineTaskErrorSeverity::Temporary, + Self::BuildTaskFailed(inner) => inner.severity(), + Self::ForkchoiceUpdateFailed(inner) => inner.severity(), } } } diff --git a/crates/node/engine/src/task_queue/tasks/consolidate/task.rs b/crates/node/engine/src/task_queue/tasks/consolidate/task.rs index 7d5061429a..2e7f72ce6a 100644 --- a/crates/node/engine/src/task_queue/tasks/consolidate/task.rs +++ b/crates/node/engine/src/task_queue/tasks/consolidate/task.rs @@ -1,8 +1,8 @@ //! A task to consolidate the engine state. use crate::{ - BuildTask, ConsolidateTaskError, EngineClient, EngineState, EngineTaskError, EngineTaskExt, - ForkchoiceTask, Metrics, state::EngineSyncStateUpdate, + BuildTask, ConsolidateTaskError, EngineClient, EngineState, EngineTaskExt, ForkchoiceTask, + Metrics, state::EngineSyncStateUpdate, }; use async_trait::async_trait; use kona_genesis::RollupConfig; @@ -38,7 +38,10 @@ impl ConsolidateTask { /// Executes a new [`BuildTask`]. /// This is used when the [`ConsolidateTask`] fails to consolidate the engine state. - async fn execute_build_task(&self, state: &mut EngineState) -> Result<(), EngineTaskError> { + async fn execute_build_task( + &self, + state: &mut EngineState, + ) -> Result<(), ConsolidateTaskError> { let build_task = BuildTask::new( self.client.clone(), self.cfg.clone(), @@ -46,11 +49,11 @@ impl ConsolidateTask { self.is_attributes_derived, None, ); - build_task.execute(state).await + Ok(build_task.execute(state).await?) } /// Attempts consolidation on the engine state. - pub async fn consolidate(&self, state: &mut EngineState) -> Result<(), EngineTaskError> { + pub async fn consolidate(&self, state: &mut EngineState) -> Result<(), ConsolidateTaskError> { let global_start = Instant::now(); // Fetch the unsafe l2 block after the attributes parent. @@ -60,11 +63,11 @@ impl ConsolidateTask { Ok(Some(block)) => block, Ok(None) => { warn!(target: "engine", "Received `None` block for {}", block_num); - return Err(ConsolidateTaskError::MissingUnsafeL2Block(block_num).into()); + return Err(ConsolidateTaskError::MissingUnsafeL2Block(block_num)); } Err(_) => { warn!(target: "engine", "Failed to fetch unsafe l2 block for consolidation"); - return Err(ConsolidateTaskError::FailedToFetchUnsafeL2Block.into()); + return Err(ConsolidateTaskError::FailedToFetchUnsafeL2Block); } }; let block_fetch_duration = fetch_start.elapsed(); @@ -162,7 +165,9 @@ impl ConsolidateTask { impl EngineTaskExt for ConsolidateTask { type Output = (); - async fn execute(&self, state: &mut EngineState) -> Result<(), EngineTaskError> { + type Error = ConsolidateTaskError; + + async fn execute(&self, state: &mut EngineState) -> Result<(), ConsolidateTaskError> { // Skip to building the payload attributes if consolidation is not needed. if state.sync_state.safe_head().block_info.number < state.sync_state.unsafe_head().block_info.number diff --git a/crates/node/engine/src/task_queue/tasks/finalize/error.rs b/crates/node/engine/src/task_queue/tasks/finalize/error.rs index e8ac414656..31ad81fff6 100644 --- a/crates/node/engine/src/task_queue/tasks/finalize/error.rs +++ b/crates/node/engine/src/task_queue/tasks/finalize/error.rs @@ -1,6 +1,8 @@ //! Contains error types for the [crate::FinalizeTask]. -use crate::EngineTaskError; +use crate::{ + EngineTaskError, ForkchoiceTaskError, task_queue::tasks::task::EngineTaskErrorSeverity, +}; use alloy_transport::{RpcError, TransportErrorKind}; use kona_protocol::FromBlockError; use thiserror::Error; @@ -22,15 +24,19 @@ pub enum FinalizeTaskError { /// A temporary RPC failure. #[error(transparent)] TransportError(#[from] RpcError), + /// The forkchoice update call to finalize the block failed. + #[error(transparent)] + ForkchoiceUpdateFailed(#[from] ForkchoiceTaskError), } -impl From for EngineTaskError { - fn from(value: FinalizeTaskError) -> Self { - match value { - FinalizeTaskError::BlockNotSafe => Self::Critical(Box::new(value)), - FinalizeTaskError::BlockNotFound(_) => Self::Critical(Box::new(value)), - FinalizeTaskError::FromBlock(_) => Self::Critical(Box::new(value)), - FinalizeTaskError::TransportError(_) => Self::Temporary(Box::new(value)), +impl EngineTaskError for FinalizeTaskError { + fn severity(&self) -> EngineTaskErrorSeverity { + match self { + Self::BlockNotSafe => EngineTaskErrorSeverity::Critical, + Self::BlockNotFound(_) => EngineTaskErrorSeverity::Critical, + Self::FromBlock(_) => EngineTaskErrorSeverity::Critical, + Self::TransportError(_) => EngineTaskErrorSeverity::Temporary, + Self::ForkchoiceUpdateFailed(inner) => inner.severity(), } } } diff --git a/crates/node/engine/src/task_queue/tasks/finalize/task.rs b/crates/node/engine/src/task_queue/tasks/finalize/task.rs index 5c3c3ba6f3..ab6fc0acee 100644 --- a/crates/node/engine/src/task_queue/tasks/finalize/task.rs +++ b/crates/node/engine/src/task_queue/tasks/finalize/task.rs @@ -1,8 +1,8 @@ //! A task for finalizing an L2 block. use crate::{ - EngineClient, EngineState, EngineTaskError, EngineTaskExt, FinalizeTaskError, ForkchoiceTask, - Metrics, state::EngineSyncStateUpdate, + EngineClient, EngineState, EngineTaskExt, FinalizeTaskError, ForkchoiceTask, Metrics, + state::EngineSyncStateUpdate, }; use alloy_provider::Provider; use async_trait::async_trait; @@ -33,10 +33,12 @@ impl FinalizeTask { impl EngineTaskExt for FinalizeTask { type Output = (); - async fn execute(&self, state: &mut EngineState) -> Result<(), EngineTaskError> { + type Error = FinalizeTaskError; + + async fn execute(&self, state: &mut EngineState) -> Result<(), FinalizeTaskError> { // Sanity check that the block that is being finalized is at least safe. if state.sync_state.safe_head().block_info.number < self.block_number { - return Err(FinalizeTaskError::BlockNotSafe.into()); + return Err(FinalizeTaskError::BlockNotSafe); } let block_fetch_start = Instant::now(); diff --git a/crates/node/engine/src/task_queue/tasks/forkchoice/error.rs b/crates/node/engine/src/task_queue/tasks/forkchoice/error.rs index 82e427c436..602079c123 100644 --- a/crates/node/engine/src/task_queue/tasks/forkchoice/error.rs +++ b/crates/node/engine/src/task_queue/tasks/forkchoice/error.rs @@ -1,6 +1,6 @@ //! Contains error types for the [crate::ForkchoiceTask]. -use crate::EngineTaskError; +use crate::{EngineTaskError, task_queue::tasks::task::EngineTaskErrorSeverity}; use alloy_rpc_types_engine::PayloadStatusEnum; use alloy_transport::{RpcError, TransportErrorKind}; use thiserror::Error; @@ -31,16 +31,16 @@ pub enum ForkchoiceTaskError { UnexpectedPayloadStatus(PayloadStatusEnum), } -impl From for EngineTaskError { - fn from(value: ForkchoiceTaskError) -> Self { - match value { - ForkchoiceTaskError::NoForkchoiceUpdateNeeded => Self::Temporary(Box::new(value)), - ForkchoiceTaskError::EngineSyncing => Self::Temporary(Box::new(value)), - ForkchoiceTaskError::ForkchoiceUpdateFailed(_) => Self::Temporary(Box::new(value)), - ForkchoiceTaskError::FinalizedAheadOfUnsafe(_, _) => Self::Critical(Box::new(value)), - ForkchoiceTaskError::UnexpectedPayloadStatus(_) => Self::Critical(Box::new(value)), - ForkchoiceTaskError::InvalidForkchoiceState => Self::Reset(Box::new(value)), - ForkchoiceTaskError::InvalidPayloadStatus(_) => Self::Reset(Box::new(value)), +impl EngineTaskError for ForkchoiceTaskError { + fn severity(&self) -> EngineTaskErrorSeverity { + match self { + Self::NoForkchoiceUpdateNeeded => EngineTaskErrorSeverity::Temporary, + Self::EngineSyncing => EngineTaskErrorSeverity::Temporary, + Self::ForkchoiceUpdateFailed(_) => EngineTaskErrorSeverity::Temporary, + Self::FinalizedAheadOfUnsafe(_, _) => EngineTaskErrorSeverity::Critical, + Self::UnexpectedPayloadStatus(_) => EngineTaskErrorSeverity::Critical, + Self::InvalidForkchoiceState => EngineTaskErrorSeverity::Reset, + Self::InvalidPayloadStatus(_) => EngineTaskErrorSeverity::Reset, } } } diff --git a/crates/node/engine/src/task_queue/tasks/forkchoice/task.rs b/crates/node/engine/src/task_queue/tasks/forkchoice/task.rs index 71bcd22b85..4d522a61fc 100644 --- a/crates/node/engine/src/task_queue/tasks/forkchoice/task.rs +++ b/crates/node/engine/src/task_queue/tasks/forkchoice/task.rs @@ -1,8 +1,8 @@ //! A task for the `engine_forkchoiceUpdated` method, with no attributes. use crate::{ - EngineClient, EngineForkchoiceVersion, EngineState, EngineTaskError, EngineTaskExt, - ForkchoiceTaskError, Metrics, state::EngineSyncStateUpdate, + EngineClient, EngineForkchoiceVersion, EngineState, EngineTaskExt, ForkchoiceTaskError, + Metrics, state::EngineSyncStateUpdate, }; use alloy_provider::ext::EngineApi; use alloy_rpc_types_engine::{INVALID_FORK_CHOICE_STATE_ERROR, PayloadId, PayloadStatusEnum}; @@ -79,8 +79,9 @@ impl ForkchoiceTask { #[async_trait] impl EngineTaskExt for ForkchoiceTask { type Output = Option; + type Error = ForkchoiceTaskError; - async fn execute(&self, state: &mut EngineState) -> Result { + async fn execute(&self, state: &mut EngineState) -> Result { // Apply the sync state update to the engine state. let new_sync_state = state.sync_state.apply_update(self.state_update); @@ -93,7 +94,7 @@ impl EngineTaskExt for ForkchoiceTask { state.sync_state == new_sync_state && self.envelope.is_none() { - return Err(ForkchoiceTaskError::NoForkchoiceUpdateNeeded.into()); + return Err(ForkchoiceTaskError::NoForkchoiceUpdateNeeded); } // Check if the head is behind the finalized head. @@ -103,8 +104,7 @@ impl EngineTaskExt for ForkchoiceTask { return Err(ForkchoiceTaskError::FinalizedAheadOfUnsafe( new_sync_state.unsafe_head().block_info.number, new_sync_state.finalized_head().block_info.number, - ) - .into()); + )); } let fcu_time_start = Instant::now(); diff --git a/crates/node/engine/src/task_queue/tasks/insert/error.rs b/crates/node/engine/src/task_queue/tasks/insert/error.rs index 66fd4b5f84..2233cbe371 100644 --- a/crates/node/engine/src/task_queue/tasks/insert/error.rs +++ b/crates/node/engine/src/task_queue/tasks/insert/error.rs @@ -1,18 +1,20 @@ -//! Contains the error types for the [InsertUnsafeTask]. +//! Contains the error types for the [InsertTask]. //! -//! [InsertUnsafeTask]: crate::InsertUnsafeTask +//! [InsertTask]: crate::InsertTask -use crate::EngineTaskError; +use crate::{ + EngineTaskError, ForkchoiceTaskError, task_queue::tasks::task::EngineTaskErrorSeverity, +}; use alloy_rpc_types_engine::PayloadStatusEnum; use alloy_transport::{RpcError, TransportErrorKind}; use kona_protocol::FromBlockError; use op_alloy_rpc_types_engine::OpPayloadError; -/// An error that occurs when running the [InsertUnsafeTask]. +/// An error that occurs when running the [InsertTask]. /// -/// [InsertUnsafeTask]: crate::InsertUnsafeTask +/// [InsertTask]: crate::InsertTask #[derive(Debug, thiserror::Error)] -pub enum InsertUnsafeTaskError { +pub enum InsertTaskError { /// Error converting a payload into a block. #[error(transparent)] FromBlockError(#[from] OpPayloadError), @@ -28,16 +30,20 @@ pub enum InsertUnsafeTaskError { /// Error converting the payload + chain genesis into an L2 block info. #[error(transparent)] L2BlockInfoConstruction(#[from] FromBlockError), + /// The forkchoice update call to consolidate the block into the engine state failed. + #[error(transparent)] + ForkchoiceUpdateFailed(#[from] ForkchoiceTaskError), } -impl From for EngineTaskError { - fn from(value: InsertUnsafeTaskError) -> Self { - match value { - InsertUnsafeTaskError::FromBlockError(_) => Self::Critical(Box::new(value)), - InsertUnsafeTaskError::InsertFailed(_) => Self::Temporary(Box::new(value)), - InsertUnsafeTaskError::UnexpectedPayloadStatus(_) => Self::Critical(Box::new(value)), - InsertUnsafeTaskError::L2BlockInfoConstruction(_) => Self::Critical(Box::new(value)), - InsertUnsafeTaskError::InconsistentForkchoiceState => Self::Reset(Box::new(value)), +impl EngineTaskError for InsertTaskError { + fn severity(&self) -> EngineTaskErrorSeverity { + match self { + Self::FromBlockError(_) => EngineTaskErrorSeverity::Critical, + Self::InsertFailed(_) => EngineTaskErrorSeverity::Temporary, + Self::UnexpectedPayloadStatus(_) => EngineTaskErrorSeverity::Critical, + Self::L2BlockInfoConstruction(_) => EngineTaskErrorSeverity::Critical, + Self::InconsistentForkchoiceState => EngineTaskErrorSeverity::Reset, + Self::ForkchoiceUpdateFailed(inner) => inner.severity(), } } } diff --git a/crates/node/engine/src/task_queue/tasks/insert/mod.rs b/crates/node/engine/src/task_queue/tasks/insert/mod.rs index f824374602..397e2e5513 100644 --- a/crates/node/engine/src/task_queue/tasks/insert/mod.rs +++ b/crates/node/engine/src/task_queue/tasks/insert/mod.rs @@ -1,7 +1,7 @@ //! Task to insert an unsafe payload into the execution engine. mod task; -pub use task::InsertUnsafeTask; +pub use task::InsertTask; mod error; -pub use error::InsertUnsafeTaskError; +pub use error::InsertTaskError; diff --git a/crates/node/engine/src/task_queue/tasks/insert/task.rs b/crates/node/engine/src/task_queue/tasks/insert/task.rs index 5f90083c81..2e6a8f30f7 100644 --- a/crates/node/engine/src/task_queue/tasks/insert/task.rs +++ b/crates/node/engine/src/task_queue/tasks/insert/task.rs @@ -1,8 +1,8 @@ //! A task to insert an unsafe payload into the execution engine. use crate::{ - EngineClient, EngineState, EngineTaskError, EngineTaskExt, ForkchoiceTask, - InsertUnsafeTaskError, Metrics, state::EngineSyncStateUpdate, + EngineClient, EngineState, EngineTaskExt, ForkchoiceTask, InsertTaskError, Metrics, + state::EngineSyncStateUpdate, }; use alloy_eips::eip7685::EMPTY_REQUESTS_HASH; use alloy_provider::ext::EngineApi; @@ -19,25 +19,29 @@ use op_alloy_rpc_types_engine::{ }; use std::{sync::Arc, time::Instant}; -/// The task to insert an unsafe payload into the execution engine. +/// The task to insert a payload into the execution engine. #[derive(Debug, Clone)] -pub struct InsertUnsafeTask { +pub struct InsertTask { /// The engine client. client: Arc, /// The rollup config. rollup_config: Arc, /// The network payload envelope. envelope: OpExecutionPayloadEnvelope, + /// If the payload is safe this is true. + /// A payload is safe if it is derived from a safe block. + is_payload_safe: bool, } -impl InsertUnsafeTask { +impl InsertTask { /// Creates a new insert task. pub const fn new( client: Arc, rollup_config: Arc, envelope: OpExecutionPayloadEnvelope, + is_attributes_derived: bool, ) -> Self { - Self { client, rollup_config, envelope } + Self { client, rollup_config, envelope, is_payload_safe: is_attributes_derived } } /// Checks the response of the `engine_newPayload` call. @@ -47,10 +51,12 @@ impl InsertUnsafeTask { } #[async_trait] -impl EngineTaskExt for InsertUnsafeTask { +impl EngineTaskExt for InsertTask { type Output = (); - async fn execute(&self, state: &mut EngineState) -> Result<(), EngineTaskError> { + type Error = InsertTaskError; + + async fn execute(&self, state: &mut EngineState) -> Result<(), InsertTaskError> { let time_start = Instant::now(); // Insert the new payload. @@ -64,7 +70,7 @@ impl EngineTaskExt for InsertUnsafeTask { .payload .clone() .try_into_block() - .map_err(InsertUnsafeTaskError::FromBlockError)?, + .map_err(InsertTaskError::FromBlockError)?, ), OpExecutionPayload::V2(payload) => { let payload_input = ExecutionPayloadInputV2 { @@ -77,7 +83,7 @@ impl EngineTaskExt for InsertUnsafeTask { .payload .clone() .try_into_block() - .map_err(InsertUnsafeTaskError::FromBlockError)?, + .map_err(InsertTaskError::FromBlockError)?, ) } OpExecutionPayload::V3(payload) => ( @@ -88,7 +94,7 @@ impl EngineTaskExt for InsertUnsafeTask { .try_into_block_with_sidecar(&OpExecutionPayloadSidecar::v3( CancunPayloadFields::new(parent_beacon_block_root, vec![]), )) - .map_err(InsertUnsafeTaskError::FromBlockError)?, + .map_err(InsertTaskError::FromBlockError)?, ), OpExecutionPayload::V4(payload) => ( self.client.new_payload_v4(payload, parent_beacon_block_root).await, @@ -99,23 +105,23 @@ impl EngineTaskExt for InsertUnsafeTask { CancunPayloadFields::new(parent_beacon_block_root, vec![]), PraguePayloadFields::new(EMPTY_REQUESTS_HASH), )) - .map_err(InsertUnsafeTaskError::FromBlockError)?, + .map_err(InsertTaskError::FromBlockError)?, ), }; // Check the `engine_newPayload` response. let response = match response { Ok(resp) => resp, - Err(e) => return Err(InsertUnsafeTaskError::InsertFailed(e).into()), + Err(e) => return Err(InsertTaskError::InsertFailed(e)), }; if !self.check_new_payload_status(&response.status) { - return Err(InsertUnsafeTaskError::UnexpectedPayloadStatus(response.status).into()); + return Err(InsertTaskError::UnexpectedPayloadStatus(response.status)); } let insert_duration = insert_time_start.elapsed(); let new_unsafe_ref = L2BlockInfo::from_block_and_genesis(&block, &self.rollup_config.genesis) - .map_err(InsertUnsafeTaskError::L2BlockInfoConstruction)?; + .map_err(InsertTaskError::L2BlockInfoConstruction)?; // Send a FCU to canonicalize the imported block. ForkchoiceTask::new( @@ -124,6 +130,8 @@ impl EngineTaskExt for InsertUnsafeTask { EngineSyncStateUpdate { cross_unsafe_head: Some(new_unsafe_ref), unsafe_head: Some(new_unsafe_ref), + local_safe_head: self.is_payload_safe.then_some(new_unsafe_ref), + safe_head: self.is_payload_safe.then_some(new_unsafe_ref), ..Default::default() }, None, diff --git a/crates/node/engine/src/task_queue/tasks/mod.rs b/crates/node/engine/src/task_queue/tasks/mod.rs index 9599af0605..81b44794f8 100644 --- a/crates/node/engine/src/task_queue/tasks/mod.rs +++ b/crates/node/engine/src/task_queue/tasks/mod.rs @@ -1,13 +1,15 @@ //! Tasks to update the engine state. mod task; -pub use task::{EngineTask, EngineTaskError, EngineTaskExt}; +pub use task::{ + EngineTask, EngineTaskError, EngineTaskErrorSeverity, EngineTaskErrors, EngineTaskExt, +}; mod forkchoice; pub use forkchoice::{ForkchoiceTask, ForkchoiceTaskError}; mod insert; -pub use insert::{InsertUnsafeTask, InsertUnsafeTaskError}; +pub use insert::{InsertTask, InsertTaskError}; mod build; pub use build::{BuildTask, BuildTaskError}; diff --git a/crates/node/engine/src/task_queue/tasks/task.rs b/crates/node/engine/src/task_queue/tasks/task.rs index 1ce7654b03..1f4c5b203b 100644 --- a/crates/node/engine/src/task_queue/tasks/task.rs +++ b/crates/node/engine/src/task_queue/tasks/task.rs @@ -2,12 +2,84 @@ //! //! [`Engine`]: crate::Engine -use super::{BuildTask, ConsolidateTask, FinalizeTask, ForkchoiceTask, InsertUnsafeTask}; -use crate::EngineState; +use super::{BuildTask, ConsolidateTask, FinalizeTask, ForkchoiceTask, InsertTask}; +use crate::{ + BuildTaskError, ConsolidateTaskError, EngineState, FinalizeTaskError, ForkchoiceTaskError, + InsertTaskError, +}; use async_trait::async_trait; use std::cmp::Ordering; use thiserror::Error; +/// The severity of an engine task error. +/// +/// This is used to determine how to handle the error when draining the engine task queue. +#[derive(Debug, PartialEq, Eq)] +pub enum EngineTaskErrorSeverity { + /// The error is temporary and the task is retried. + Temporary, + /// The error is critical and is propagated to the engine actor. + Critical, + /// The error indicates that the engine should be reset. + Reset, + /// The error indicates that the engine should be flushed. + Flush, +} + +/// The interface for an engine task error. +/// +/// An engine task error should have an associated severity level to specify how to handle the error +/// when draining the engine task queue. +pub trait EngineTaskError { + /// The severity of the error. + fn severity(&self) -> EngineTaskErrorSeverity; +} + +/// The interface for an engine task. +#[async_trait] +pub trait EngineTaskExt { + /// The output type of the task. + type Output; + + /// The error type of the task. + type Error: EngineTaskError; + + /// Executes the task, taking a shared lock on the engine state and `self`. + async fn execute(&self, state: &mut EngineState) -> Result; +} + +/// An error that may occur during an [`EngineTask`]'s execution. +#[derive(Error, Debug)] +pub enum EngineTaskErrors { + /// An error that occurred while updating the forkchoice state. + #[error(transparent)] + Forkchoice(#[from] ForkchoiceTaskError), + /// An error that occurred while inserting a block into the engine. + #[error(transparent)] + Insert(#[from] InsertTaskError), + /// An error that occurred while building a block. + #[error(transparent)] + Build(#[from] BuildTaskError), + /// An error that occurred while consolidating the engine state. + #[error(transparent)] + Consolidate(#[from] ConsolidateTaskError), + /// An error that occurred while finalizing an L2 block. + #[error(transparent)] + Finalize(#[from] FinalizeTaskError), +} + +impl EngineTaskError for EngineTaskErrors { + fn severity(&self) -> EngineTaskErrorSeverity { + match self { + Self::Forkchoice(inner) => inner.severity(), + Self::Insert(inner) => inner.severity(), + Self::Build(inner) => inner.severity(), + Self::Consolidate(inner) => inner.severity(), + Self::Finalize(inner) => inner.severity(), + } + } +} + /// Tasks that may be inserted into and executed by the [`Engine`]. /// /// [`Engine`]: crate::Engine @@ -16,10 +88,10 @@ pub enum EngineTask { /// Perform a `engine_forkchoiceUpdated` call with the current [`EngineState`]'s forkchoice, /// and no payload attributes. ForkchoiceUpdate(ForkchoiceTask), - /// Inserts an unsafe payload into the execution engine. - InsertUnsafe(InsertUnsafeTask), + /// Inserts a payload into the execution engine. + Insert(InsertTask), /// Builds a new block with the given attributes, and inserts it into the execution engine. - BuildBlock(BuildTask), + Build(BuildTask), /// Performs consolidation on the engine state, reverting to payload attribute processing /// via the [`BuildTask`] if consolidation fails. Consolidate(ConsolidateTask), @@ -29,14 +101,16 @@ pub enum EngineTask { impl EngineTask { /// Executes the task without consuming it. - async fn execute_inner(&self, state: &mut EngineState) -> Result<(), EngineTaskError> { + async fn execute_inner(&self, state: &mut EngineState) -> Result<(), EngineTaskErrors> { match self.clone() { - Self::ForkchoiceUpdate(task) => task.execute(state).await.map(|_| ()), - Self::InsertUnsafe(task) => task.execute(state).await, - Self::BuildBlock(task) => task.execute(state).await, - Self::Consolidate(task) => task.execute(state).await, - Self::Finalize(task) => task.execute(state).await, - } + Self::ForkchoiceUpdate(task) => task.execute(state).await.map(|_| ())?, + Self::Insert(task) => task.execute(state).await?, + Self::Build(task) => task.execute(state).await?, + Self::Consolidate(task) => task.execute(state).await?, + Self::Finalize(task) => task.execute(state).await?, + }; + + Ok(()) } } @@ -45,8 +119,8 @@ impl PartialEq for EngineTask { matches!( (self, other), (Self::ForkchoiceUpdate(_), Self::ForkchoiceUpdate(_)) | - (Self::InsertUnsafe(_), Self::InsertUnsafe(_)) | - (Self::BuildBlock(_), Self::BuildBlock(_)) | + (Self::Insert(_), Self::Insert(_)) | + (Self::Build(_), Self::Build(_)) | (Self::Consolidate(_), Self::Consolidate(_)) | (Self::Finalize(_), Self::Finalize(_)) ) @@ -76,9 +150,9 @@ impl Ord for EngineTask { // chain via derivation. match (self, other) { // Same variant cases - (Self::InsertUnsafe(_), Self::InsertUnsafe(_)) => Ordering::Equal, + (Self::Insert(_), Self::Insert(_)) => Ordering::Equal, (Self::Consolidate(_), Self::Consolidate(_)) => Ordering::Equal, - (Self::BuildBlock(_), Self::BuildBlock(_)) => Ordering::Equal, + (Self::Build(_), Self::Build(_)) => Ordering::Equal, (Self::ForkchoiceUpdate(_), Self::ForkchoiceUpdate(_)) => Ordering::Equal, (Self::Finalize(_), Self::Finalize(_)) => Ordering::Equal, @@ -87,12 +161,12 @@ impl Ord for EngineTask { (_, Self::ForkchoiceUpdate(_)) => Ordering::Less, // BuildBlock tasks are prioritized over InsertUnsafe and Consolidate tasks - (Self::BuildBlock(_), _) => Ordering::Greater, - (_, Self::BuildBlock(_)) => Ordering::Less, + (Self::Build(_), _) => Ordering::Greater, + (_, Self::Build(_)) => Ordering::Less, // InsertUnsafe tasks are prioritized over Consolidate and Finalize tasks - (Self::InsertUnsafe(_), _) => Ordering::Greater, - (_, Self::InsertUnsafe(_)) => Ordering::Less, + (Self::Insert(_), _) => Ordering::Greater, + (_, Self::Insert(_)) => Ordering::Less, // Consolidate tasks are prioritized over Finalize tasks (Self::Consolidate(_), _) => Ordering::Greater, @@ -105,25 +179,27 @@ impl Ord for EngineTask { impl EngineTaskExt for EngineTask { type Output = (); - async fn execute(&self, state: &mut EngineState) -> Result<(), EngineTaskError> { + type Error = EngineTaskErrors; + + async fn execute(&self, state: &mut EngineState) -> Result<(), Self::Error> { // Retry the task until it succeeds or a critical error occurs. while let Err(e) = self.execute_inner(state).await { - match e { - EngineTaskError::Temporary(e) => { + match e.severity() { + EngineTaskErrorSeverity::Temporary => { trace!(target: "engine", "{e}"); continue; } - EngineTaskError::Critical(e) => { + EngineTaskErrorSeverity::Critical => { error!(target: "engine", "{e}"); - return Err(EngineTaskError::Critical(e)); + return Err(e); } - EngineTaskError::Reset(e) => { + EngineTaskErrorSeverity::Reset => { warn!(target: "engine", "Engine requested derivation reset"); - return Err(EngineTaskError::Reset(e)); + return Err(e); } - EngineTaskError::Flush(e) => { + EngineTaskErrorSeverity::Flush => { warn!(target: "engine", "Engine requested derivation flush"); - return Err(EngineTaskError::Flush(e)); + return Err(e); } } } @@ -131,30 +207,3 @@ impl EngineTaskExt for EngineTask { Ok(()) } } - -/// The interface for an engine task. -#[async_trait] -pub trait EngineTaskExt { - /// The output type of the task. - type Output; - - /// Executes the task, taking a shared lock on the engine state and `self`. - async fn execute(&self, state: &mut EngineState) -> Result; -} - -/// An error that may occur during an [`EngineTask`]'s execution. -#[derive(Error, Debug)] -pub enum EngineTaskError { - /// A temporary error within the engine. - #[error("Temporary engine task error: {0}")] - Temporary(Box), - /// A critical error within the engine. - #[error("Critical engine task error: {0}")] - Critical(Box), - /// An error that requires a derivation pipeline reset. - #[error("Derivation pipeline reset required: {0}")] - Reset(Box), - /// An error that requires the derivation pipeline to be flushed. - #[error("Derivation pipeline flush required: {0}")] - Flush(Box), -} diff --git a/crates/node/service/src/actors/engine/actor.rs b/crates/node/service/src/actors/engine/actor.rs index f593808c36..8072170323 100644 --- a/crates/node/service/src/actors/engine/actor.rs +++ b/crates/node/service/src/actors/engine/actor.rs @@ -7,7 +7,8 @@ use futures::future::OptionFuture; use kona_derive::{ResetSignal, Signal}; use kona_engine::{ BuildTask, ConsolidateTask, Engine, EngineClient, EngineQueries, - EngineState as InnerEngineState, EngineTask, EngineTaskError, InsertUnsafeTask, + EngineState as InnerEngineState, EngineTask, EngineTaskError, EngineTaskErrorSeverity, + InsertTask, }; use kona_genesis::RollupConfig; use kona_protocol::{BlockInfo, L2BlockInfo, OpAttributesWithParent}; @@ -269,33 +270,37 @@ impl EngineActorState { Ok(_) => { trace!(target: "engine", "[ENGINE] tasks drained"); } - Err(EngineTaskError::Reset(err)) => { - warn!(target: "engine", ?err, "Received reset request"); - self.reset(derivation_signal_tx, engine_l2_safe_head_tx, finalizer).await?; - } - Err(EngineTaskError::Flush(err)) => { - // This error is encountered when the payload is marked INVALID - // by the engine api. Post-holocene, the payload is replaced by - // a "deposits-only" block and re-executed. At the same time, - // the channel and any remaining buffered batches are flushed. - warn!(target: "engine", ?err, "Invalid payload, Flushing derivation pipeline."); - match derivation_signal_tx.send(Signal::FlushChannel).await { - Ok(_) => { - debug!(target: "engine", "Sent flush signal to derivation actor") + Err(err) => { + match err.severity() { + EngineTaskErrorSeverity::Critical => { + error!(target: "engine", ?err, "Critical error draining engine tasks"); + return Err(err.into()); } - Err(err) => { - error!(target: "engine", ?err, "Failed to send flush signal to the derivation actor."); - return Err(EngineError::ChannelClosed); + EngineTaskErrorSeverity::Reset => { + warn!(target: "engine", ?err, "Received reset request"); + self.reset(derivation_signal_tx, engine_l2_safe_head_tx, finalizer).await?; + } + EngineTaskErrorSeverity::Flush => { + // This error is encountered when the payload is marked INVALID + // by the engine api. Post-holocene, the payload is replaced by + // a "deposits-only" block and re-executed. At the same time, + // the channel and any remaining buffered batches are flushed. + warn!(target: "engine", ?err, "Invalid payload, Flushing derivation pipeline."); + match derivation_signal_tx.send(Signal::FlushChannel).await { + Ok(_) => { + debug!(target: "engine", "Sent flush signal to derivation actor") + } + Err(err) => { + error!(target: "engine", ?err, "Failed to send flush signal to the derivation actor."); + return Err(EngineError::ChannelClosed); + } + } + } + EngineTaskErrorSeverity::Temporary => { + trace!(target: "engine", ?err, "Temporary error draining engine tasks"); } } } - Err(err @ EngineTaskError::Critical(_)) => { - error!(target: "engine", ?err, "Critical error draining engine tasks"); - return Err(err.into()); - } - Err(EngineTaskError::Temporary(err)) => { - trace!(target: "engine", ?err, "Temporary error draining engine tasks"); - } } self.maybe_update_safe_head(engine_l2_safe_head_tx); @@ -432,7 +437,7 @@ impl NodeActor for EngineActor { return Err(EngineError::ChannelClosed); }; - let task = EngineTask::BuildBlock(BuildTask::new( + let task = EngineTask::Build(BuildTask::new( state.client.clone(), state.rollup.clone(), attributes, @@ -448,10 +453,11 @@ impl NodeActor for EngineActor { cancellation.cancel(); return Err(EngineError::ChannelClosed); }; - let task = EngineTask::InsertUnsafe(InsertUnsafeTask::new( + let task = EngineTask::Insert(InsertTask::new( state.client.clone(), state.rollup.clone(), envelope, + false, // The payload is not derived in this case. This is an unsafe block. )); state.engine.enqueue(task); } diff --git a/crates/node/service/src/actors/engine/error.rs b/crates/node/service/src/actors/engine/error.rs index 459748aa11..64fe428d00 100644 --- a/crates/node/service/src/actors/engine/error.rs +++ b/crates/node/service/src/actors/engine/error.rs @@ -2,7 +2,7 @@ //! //! [`EngineActor`]: super::EngineActor -use kona_engine::{EngineResetError, EngineTaskError}; +use kona_engine::{EngineResetError, EngineTaskErrors}; /// An error from the [`EngineActor`]. /// @@ -17,5 +17,5 @@ pub enum EngineError { EngineReset(#[from] EngineResetError), /// Engine task error. #[error(transparent)] - EngineTask(#[from] EngineTaskError), + EngineTask(#[from] EngineTaskErrors), }