Skip to content
This repository was archived by the owner on Jan 16, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions crates/node/engine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@ extern crate tracing;
mod task_queue;
pub use task_queue::{
BuildTask, BuildTaskError, ConsolidateTask, ConsolidateTaskError, Engine, EngineResetError,
EngineTask, EngineTaskError, EngineTaskExt, FinalizeTask, FinalizeTaskError, ForkchoiceTask,
ForkchoiceTaskError, InsertUnsafeTask, InsertUnsafeTaskError,
EngineTask, EngineTaskError, EngineTaskErrorSeverity, EngineTaskErrors, EngineTaskExt,
FinalizeTask, FinalizeTaskError, ForkchoiceTask, ForkchoiceTaskError, InsertTask,
InsertTaskError,
};

mod attributes;
Expand Down
12 changes: 7 additions & 5 deletions crates/node/engine/src/task_queue/core.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
//! The [`Engine`] is a task queue that receives and executes [`EngineTask`]s.

use super::{EngineTaskError, EngineTaskExt};
use super::EngineTaskExt;
use crate::{
EngineClient, EngineState, EngineSyncStateUpdate, EngineTask, ForkchoiceTask, Metrics,
task_queue::EngineTaskErrors,
};
use alloy_provider::Provider;
use alloy_rpc_types_eth::Transaction;
Expand Down Expand Up @@ -88,7 +89,8 @@ impl Engine {
None,
)
.execute(&mut self.state)
.await?;
.await
.map_err(EngineTaskErrors::Forkchoice)?;

// Find the new safe head's L1 origin and SystemConfig.
let origin_block = start
Expand Down Expand Up @@ -128,7 +130,7 @@ impl Engine {
/// Attempts to drain the queue by executing all [`EngineTask`]s in-order. If any task returns
/// an error along the way, it is not popped from the queue (in case it must be retried) and
/// the error is returned.
pub async fn drain(&mut self) -> Result<(), EngineTaskError> {
pub async fn drain(&mut self) -> Result<(), EngineTaskErrors> {
// Drain tasks in order of priority, halting on errors for a retry to be attempted.
while let Some(task) = self.tasks.peek() {
// Execute the task
Expand All @@ -148,9 +150,9 @@ impl Engine {
/// An error occurred while attempting to reset the [`Engine`].
#[derive(Debug, Error)]
pub enum EngineResetError {
/// An error that originated from within the engine task.
/// An error that originated from within an engine task.
#[error(transparent)]
Task(#[from] EngineTaskError),
Task(#[from] EngineTaskErrors),
/// An error occurred while traversing the L1 for the sync starting point.
#[error(transparent)]
SyncStart(#[from] SyncStartError),
Expand Down
41 changes: 26 additions & 15 deletions crates/node/engine/src/task_queue/tasks/build/error.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
//! Contains error types for the [crate::ForkchoiceTask].

use crate::EngineTaskError;
use crate::{
EngineTaskError, ForkchoiceTaskError, InsertTaskError,
task_queue::tasks::task::EngineTaskErrorSeverity,
};
use alloy_rpc_types_engine::PayloadStatusEnum;
use alloy_transport::{RpcError, TransportErrorKind};
use kona_protocol::FromBlockError;
Expand All @@ -20,6 +23,12 @@ pub enum BuildTaskError {
/// Missing payload ID.
#[error("Missing payload ID")]
MissingPayloadId,
/// The initial forkchoice update call to the engine api failed.
#[error(transparent)]
ForkchoiceUpdateFailed(#[from] ForkchoiceTaskError),
/// Impossible to insert the payload into the engine.
#[error(transparent)]
PayloadInsertionFailed(#[from] InsertTaskError),
/// Unexpected payload status
#[error("Unexpected payload status: {0}")]
UnexpectedPayloadStatus(PayloadStatusEnum),
Expand Down Expand Up @@ -50,20 +59,22 @@ pub enum BuildTaskError {
MpscSend(#[from] mpsc::error::SendError<OpExecutionPayloadEnvelope>),
}

impl From<BuildTaskError> for EngineTaskError {
fn from(value: BuildTaskError) -> Self {
match value {
BuildTaskError::NoForkchoiceUpdateNeeded => Self::Temporary(Box::new(value)),
BuildTaskError::EngineSyncing => Self::Temporary(Box::new(value)),
BuildTaskError::GetPayloadFailed(_) => Self::Temporary(Box::new(value)),
BuildTaskError::NewPayloadFailed(_) => Self::Temporary(Box::new(value)),
BuildTaskError::HoloceneInvalidFlush => Self::Flush(Box::new(value)),
BuildTaskError::MissingPayloadId => Self::Critical(Box::new(value)),
BuildTaskError::UnexpectedPayloadStatus(_) => Self::Critical(Box::new(value)),
BuildTaskError::DepositOnlyPayloadReattemptFailed => Self::Critical(Box::new(value)),
BuildTaskError::DepositOnlyPayloadFailed => Self::Critical(Box::new(value)),
BuildTaskError::FromBlock(_) => Self::Critical(Box::new(value)),
BuildTaskError::MpscSend(_) => Self::Critical(Box::new(value)),
impl EngineTaskError for BuildTaskError {
fn severity(&self) -> EngineTaskErrorSeverity {
match self {
Self::ForkchoiceUpdateFailed(inner) => inner.severity(),
Self::PayloadInsertionFailed(inner) => inner.severity(),
Self::NoForkchoiceUpdateNeeded => EngineTaskErrorSeverity::Temporary,
Self::EngineSyncing => EngineTaskErrorSeverity::Temporary,
Self::GetPayloadFailed(_) => EngineTaskErrorSeverity::Temporary,
Self::NewPayloadFailed(_) => EngineTaskErrorSeverity::Temporary,
Self::HoloceneInvalidFlush => EngineTaskErrorSeverity::Flush,
Self::MissingPayloadId => EngineTaskErrorSeverity::Critical,
Self::UnexpectedPayloadStatus(_) => EngineTaskErrorSeverity::Critical,
Self::DepositOnlyPayloadReattemptFailed => EngineTaskErrorSeverity::Critical,
Self::DepositOnlyPayloadFailed => EngineTaskErrorSeverity::Critical,
Self::FromBlock(_) => EngineTaskErrorSeverity::Critical,
Self::MpscSend(_) => EngineTaskErrorSeverity::Critical,
}
}
}
Loading