Skip to content
This repository was archived by the owner on Jan 16, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions crates/node/engine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ extern crate tracing;

mod task_queue;
pub use task_queue::{
BuildTask, BuildTaskError, ConsolidateTask, ConsolidateTaskError, Engine, EngineResetError,
EngineTask, EngineTaskError, EngineTaskErrorSeverity, EngineTaskErrors, EngineTaskExt,
FinalizeTask, FinalizeTaskError, ForkchoiceTask, ForkchoiceTaskError, InsertTask,
InsertTaskError,
BuildTask, BuildTaskError, ConsolidateTask, ConsolidateTaskError, Engine, EngineBuildError,
EngineResetError, EngineTask, EngineTaskError, EngineTaskErrorSeverity, EngineTaskErrors,
EngineTaskExt, FinalizeTask, FinalizeTaskError, InsertTask, InsertTaskError, SynchronizeTask,
SynchronizeTaskError,
};

mod attributes;
Expand Down
28 changes: 15 additions & 13 deletions crates/node/engine/src/task_queue/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
use super::EngineTaskExt;
use crate::{
EngineClient, EngineState, EngineSyncStateUpdate, EngineTask, EngineTaskError,
EngineTaskErrorSeverity, ForkchoiceTask, Metrics, task_queue::EngineTaskErrors,
EngineTaskErrorSeverity, Metrics, SynchronizeTask, SynchronizeTaskError,
task_queue::EngineTaskErrors,
};
use alloy_provider::Provider;
use alloy_rpc_types_eth::Transaction;
Expand Down Expand Up @@ -42,9 +43,6 @@ pub struct Engine {

impl Engine {
/// Creates a new [`Engine`] with an empty task queue and the passed initial [`EngineState`].
///
/// An initial [`EngineTask::ForkchoiceUpdate`] is added to the task queue to synchronize the
/// engine with the forkchoice state of the [`EngineState`].
pub fn new(
initial_state: EngineState,
state_sender: Sender<EngineState>,
Expand Down Expand Up @@ -89,7 +87,8 @@ impl Engine {
let start =
find_starting_forkchoice(&config, client.l1_provider(), client.l2_provider()).await?;

if let Err(err) = ForkchoiceTask::new(
// Retry to synchronize the engine until we succeeds or a critical error occurs.
while let Err(err) = SynchronizeTask::new(
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Motivation, the synchronize task can only fail if:

  • RPC error (temporary error) -> we should retry to reset
  • finalized ahead of unsafe (critical error) -> should never happen if find_starting_forkchoice is correct
  • unexpected payload status (temporary error) -> this should never happen when we don't provide payload attributes. The synchronize task should only return Valid or EngineSyncing status
  • invalid forkchoice state -> try to reset again

If we don't have a while loop here:

  • we will keep adding tasks with the former engine state
  • this will cause the engine to reset because the tasks will return invalid forkchoice state

client.clone(),
config.clone(),
EngineSyncStateUpdate {
Expand All @@ -99,16 +98,19 @@ impl Engine {
safe_head: Some(start.safe),
finalized_head: Some(start.finalized),
},
None,
)
.execute(&mut self.state)
.await
{
// Ignore temporary errors.
if matches!(err.severity(), EngineTaskErrorSeverity::Temporary) {
debug!(target: "engine", "Forkchoice update failed temporarily during reset: {}", err);
} else {
return Err(EngineTaskErrors::Forkchoice(err).into());
match err.severity() {
EngineTaskErrorSeverity::Temporary |
EngineTaskErrorSeverity::Flush |
EngineTaskErrorSeverity::Reset => {
debug!(target: "engine", ?err, "Forkchoice update failed during reset. Trying again...");
}
EngineTaskErrorSeverity::Critical => {
return Err(EngineResetError::Forkchoice(err));
}
}
}

Expand Down Expand Up @@ -172,9 +174,9 @@ impl Engine {
/// An error occurred while attempting to reset the [`Engine`].
#[derive(Debug, Error)]
pub enum EngineResetError {
/// An error that originated from within an engine task.
/// An error that occurred while updating the forkchoice state.
#[error(transparent)]
Task(#[from] EngineTaskErrors),
Forkchoice(#[from] SynchronizeTaskError),
/// An error occurred while traversing the L1 for the sync starting point.
#[error(transparent)]
SyncStart(#[from] SyncStartError),
Expand Down
70 changes: 47 additions & 23 deletions crates/node/engine/src/task_queue/tasks/build/error.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Contains error types for the [crate::ForkchoiceTask].
//! Contains error types for the [crate::SynchronizeTask].

use crate::{
EngineTaskError, ForkchoiceTaskError, InsertTaskError,
EngineTaskError, InsertTaskError, SynchronizeTaskError,
task_queue::tasks::task::EngineTaskErrorSeverity,
};
use alloy_rpc_types_engine::PayloadStatusEnum;
Expand All @@ -11,33 +11,44 @@ use op_alloy_rpc_types_engine::OpExecutionPayloadEnvelope;
use thiserror::Error;
use tokio::sync::mpsc;

/// An error that occurs when running the [crate::ForkchoiceTask].
/// An error that occurs when building a payload in the engine.
#[derive(Debug, Error)]
pub enum BuildTaskError {
/// The forkchoice update is not needed.
#[error("No forkchoice update needed")]
NoForkchoiceUpdateNeeded,
pub enum EngineBuildError {
/// The finalized head is ahead of the unsafe head.
#[error("Finalized head is ahead of unsafe head")]
FinalizedAheadOfUnsafe(u64, u64),
/// The forkchoice update call to the engine api failed.
#[error("Failed to build payload attributes in the engine. Forkchoice RPC error: {0}")]
AttributesInsertionFailed(#[from] RpcError<TransportErrorKind>),
/// The inserted payload is invalid.
#[error("The inserted payload is invalid: {0}")]
InvalidPayload(String),
/// The inserted payload status is unexpected.
#[error("The inserted payload status is unexpected: {0}")]
UnexpectedPayloadStatus(PayloadStatusEnum),
/// The payload ID is missing.
#[error("The inserted payload ID is missing")]
MissingPayloadId,
/// The engine is syncing.
#[error("Attempting to update forkchoice state while EL syncing")]
#[error("The engine is syncing")]
EngineSyncing,
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

/// Missing payload ID.
#[error("Missing payload ID")]
MissingPayloadId,
}

/// An error that occurs when running the [crate::SynchronizeTask].
#[derive(Debug, Error)]
pub enum BuildTaskError {
/// An error occurred when building the payload attributes in the engine.
#[error("An error occurred when building the payload attributes to the engine.")]
EngineBuildError(EngineBuildError),
/// The initial forkchoice update call to the engine api failed.
#[error(transparent)]
ForkchoiceUpdateFailed(#[from] ForkchoiceTaskError),
ForkchoiceUpdateFailed(#[from] SynchronizeTaskError),
/// Impossible to insert the payload into the engine.
#[error(transparent)]
PayloadInsertionFailed(#[from] InsertTaskError),
/// Unexpected payload status
#[error("Unexpected payload status: {0}")]
UnexpectedPayloadStatus(PayloadStatusEnum),
/// The get payload call to the engine api failed.
#[error(transparent)]
GetPayloadFailed(RpcError<TransportErrorKind>),
/// The new payload call to the engine api failed.
#[error(transparent)]
NewPayloadFailed(RpcError<TransportErrorKind>),
/// A deposit-only payload failed to import.
#[error("Deposit-only payload failed to import")]
DepositOnlyPayloadFailed,
Expand All @@ -64,13 +75,26 @@ impl EngineTaskError for BuildTaskError {
match self {
Self::ForkchoiceUpdateFailed(inner) => inner.severity(),
Self::PayloadInsertionFailed(inner) => inner.severity(),
Self::NoForkchoiceUpdateNeeded => EngineTaskErrorSeverity::Temporary,
Self::EngineSyncing => EngineTaskErrorSeverity::Temporary,
Self::EngineBuildError(EngineBuildError::FinalizedAheadOfUnsafe(_, _)) => {
EngineTaskErrorSeverity::Critical
}
Self::EngineBuildError(EngineBuildError::AttributesInsertionFailed(_)) => {
EngineTaskErrorSeverity::Temporary
}
Self::EngineBuildError(EngineBuildError::InvalidPayload(_)) => {
EngineTaskErrorSeverity::Temporary
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This also should probably be

Suggested change
EngineTaskErrorSeverity::Temporary
EngineTaskErrorSeverity::Critical
  • trying to insert an invalid payload should never happen

}
Self::EngineBuildError(EngineBuildError::UnexpectedPayloadStatus(_)) => {
EngineTaskErrorSeverity::Temporary
}
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To check: I think that this should instead be

Suggested change
}
Self::EngineBuildError(EngineBuildError::UnexpectedPayloadStatus(_)) => {
EngineTaskErrorSeverity::Critical
}

The forkchoice_updated should never return the Accepted variant

Self::EngineBuildError(EngineBuildError::MissingPayloadId) => {
EngineTaskErrorSeverity::Temporary
}
Self::EngineBuildError(EngineBuildError::EngineSyncing) => {
EngineTaskErrorSeverity::Temporary
}
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note here that:

Self::GetPayloadFailed(_) => EngineTaskErrorSeverity::Temporary,
Self::NewPayloadFailed(_) => EngineTaskErrorSeverity::Temporary,
Self::HoloceneInvalidFlush => EngineTaskErrorSeverity::Flush,
Self::MissingPayloadId => EngineTaskErrorSeverity::Critical,
Self::UnexpectedPayloadStatus(_) => EngineTaskErrorSeverity::Critical,
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some of these variants were either unused or outdated

Self::DepositOnlyPayloadReattemptFailed => EngineTaskErrorSeverity::Critical,
Self::DepositOnlyPayloadFailed => EngineTaskErrorSeverity::Critical,
Self::FromBlock(_) => EngineTaskErrorSeverity::Critical,
Expand Down
2 changes: 1 addition & 1 deletion crates/node/engine/src/task_queue/tasks/build/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@ mod task;
pub use task::BuildTask;

mod error;
pub use error::BuildTaskError;
pub use error::{BuildTaskError, EngineBuildError};
142 changes: 119 additions & 23 deletions crates/node/engine/src/task_queue/tasks/build/task.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
//! A task for building a new block and importing it.
use super::BuildTaskError;
use crate::{
EngineClient, EngineGetPayloadVersion, EngineState, EngineTaskExt, ForkchoiceTask,
ForkchoiceTaskError, InsertTask,
EngineClient, EngineForkchoiceVersion, EngineGetPayloadVersion, EngineState, EngineTaskExt,
InsertTask,
InsertTaskError::{self},
state::EngineSyncStateUpdate,
task_queue::tasks::build::error::EngineBuildError,
};
use alloy_rpc_types_engine::{ExecutionPayload, PayloadId};
use alloy_rpc_types_engine::{ExecutionPayload, PayloadId, PayloadStatusEnum};
use async_trait::async_trait;
use kona_genesis::RollupConfig;
use kona_protocol::{L2BlockInfo, OpAttributesWithParent};
Expand Down Expand Up @@ -43,6 +44,112 @@ impl BuildTask {
Self { engine, cfg, attributes, is_attributes_derived, payload_tx }
}

/// Starts the block building process by sending an initial `engine_forkchoiceUpdate` call with
/// the payload attributes to build.
///
/// ## Observed [PayloadStatusEnum] Variants
/// The `engine_forkchoiceUpdate` payload statuses that this function observes are below. Any
/// other [PayloadStatusEnum] variant is considered a failure.
///
/// ### Success (`VALID`)
/// If the build is successful, the [PayloadId] is returned for sealing and the external
/// actor is notified of the successful forkchoice update.
///
/// ### Failure (`INVALID`)
/// If the forkchoice update fails, the external actor is notified of the failure.
///
/// ### Syncing (`SYNCING`)
/// If the EL is syncing, the payload attributes are buffered and the function returns early.
/// This is a temporary state, and the function should be called again later.
async fn start_build(
&self,
state: &EngineState,
engine_client: &EngineClient,
attributes_envelope: OpAttributesWithParent,
) -> Result<PayloadId, BuildTaskError> {
debug!(
target: "engine_builder",
txs = attributes_envelope.inner.transactions.as_ref().map_or(0, |txs| txs.len()),
"Starting new build job"
);

// Sanity check if the head is behind the finalized head. If it is, this is a critical
// error.
if state.sync_state.unsafe_head().block_info.number <
state.sync_state.finalized_head().block_info.number
{
return Err(BuildTaskError::EngineBuildError(EngineBuildError::FinalizedAheadOfUnsafe(
state.sync_state.unsafe_head().block_info.number,
state.sync_state.finalized_head().block_info.number,
)));
}

// When inserting a payload, we advertise the parent's unsafe head as the current unsafe
// head to build on top of.
let new_forkchoice = state
.sync_state
.apply_update(EngineSyncStateUpdate {
unsafe_head: Some(attributes_envelope.parent),
..Default::default()
})
.create_forkchoice_state();

let forkchoice_version = EngineForkchoiceVersion::from_cfg(
&self.cfg,
attributes_envelope.inner.payload_attributes.timestamp,
);
let update = match forkchoice_version {
EngineForkchoiceVersion::V3 => {
engine_client
.fork_choice_updated_v3(new_forkchoice, Some(attributes_envelope.inner))
.await
}
EngineForkchoiceVersion::V2 => {
engine_client
.fork_choice_updated_v2(new_forkchoice, Some(attributes_envelope.inner))
.await
}
}
.map_err(|e| {
error!(target: "engine_builder", "Forkchoice update failed: {}", e);
BuildTaskError::EngineBuildError(EngineBuildError::AttributesInsertionFailed(e))
})?;

match update.payload_status.status {
PayloadStatusEnum::Valid => {
debug!(
target: "engine_builder",
unsafe_hash = new_forkchoice.head_block_hash.to_string(),
safe_hash = new_forkchoice.safe_block_hash.to_string(),
finalized_hash = new_forkchoice.finalized_block_hash.to_string(),
"Forkchoice update with attributes successful"
);
}
PayloadStatusEnum::Invalid { validation_error } => {
error!(target: "engine_builder", "Forkchoice update failed: {}", validation_error);
return Err(BuildTaskError::EngineBuildError(EngineBuildError::InvalidPayload(
validation_error,
)));
}
PayloadStatusEnum::Syncing => {
warn!(target: "engine_builder", "Forkchoice update failed temporarily: EL is syncing");
return Err(BuildTaskError::EngineBuildError(EngineBuildError::EngineSyncing));
}
s => {
// Other codes are never returned by `engine_forkchoiceUpdate`
return Err(BuildTaskError::EngineBuildError(
EngineBuildError::UnexpectedPayloadStatus(s),
));
}
}

// Fetch the payload ID from the FCU. If no payload ID was returned, something went wrong -
// the block building job on the EL should have been initiated.
update
.payload_id
.ok_or(BuildTaskError::EngineBuildError(EngineBuildError::MissingPayloadId))
}
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


/// Fetches the execution payload from the EL.
///
/// ## Engine Method Selection
Expand Down Expand Up @@ -129,18 +236,8 @@ impl EngineTaskExt for BuildTask {
// Start the build by sending an FCU call with the current forkchoice and the input
// payload attributes.
let fcu_start_time = Instant::now();
let payload_id = ForkchoiceTask::new(
self.engine.clone(),
self.cfg.clone(),
EngineSyncStateUpdate {
unsafe_head: Some(self.attributes.parent),
..Default::default()
},
Some(self.attributes.clone()),
)
.execute(state)
.await?
.ok_or(BuildTaskError::MissingPayloadId)?;
let payload_id = self.start_build(state, &self.engine, self.attributes.clone()).await?;

let fcu_duration = fcu_start_time.elapsed();

// Fetch the payload just inserted from the EL and import it into the engine.
Expand All @@ -166,18 +263,17 @@ impl EngineTaskExt for BuildTask {
.execute(state)
.await
{
Err(InsertTaskError::ForkchoiceUpdateFailed(
ForkchoiceTaskError::InvalidPayloadStatus(e),
)) if self.attributes.is_deposits_only() => {
Err(InsertTaskError::UnexpectedPayloadStatus(e))
if self.attributes.is_deposits_only() =>
{
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was a bug in the refactor. The error we should match on should be raised when we first try to call new_payload inside the InsertTask. See https://github.com/op-rs/kona/pull/2388/files#diff-47bc5e44b8483f9c829c8829f56e6f5b6d9211888f53f7afe36b44c185515ccbR169-R220 (for the former build code), and

// Check the `engine_newPayload` response.
let response = match response {
Ok(resp) => resp,
Err(e) => return Err(InsertTaskError::InsertFailed(e)),
};
if !self.check_new_payload_status(&response.status) {
return Err(InsertTaskError::UnexpectedPayloadStatus(response.status));
}
let insert_duration = insert_time_start.elapsed();
(for the counterpart inside the InsertTask)

error!(target: "engine_builder", error = ?e, "Critical: Deposit-only payload import failed");
return Err(BuildTaskError::DepositOnlyPayloadFailed)
}
// HOLOCENE: Re-attempt payload import with deposits only
Err(InsertTaskError::ForkchoiceUpdateFailed(
ForkchoiceTaskError::InvalidPayloadStatus(e),
)) if self
.cfg
.is_holocene_active(self.attributes.inner().payload_attributes.timestamp) =>
Err(InsertTaskError::UnexpectedPayloadStatus(e))
Comment on lines +266 to +273
Copy link

Copilot AI Jul 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error pattern matching is inconsistent with the error type being matched. The code is matching InsertTaskError::UnexpectedPayloadStatus(e) but based on the context and other files, this should likely be matching a different error variant that actually exists in the InsertTaskError enum.

Copilot uses AI. Check for mistakes.
Copy link

Copilot AI Jul 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same issue as line 276 - the error pattern matching appears to be using a non-existent error variant UnexpectedPayloadStatus on the InsertTaskError enum.

Copilot uses AI. Check for mistakes.
if self
.cfg
.is_holocene_active(self.attributes.inner().payload_attributes.timestamp) =>
{
warn!(target: "engine_builder", error = ?e, "Re-attempting payload import with deposits only.");
// HOLOCENE: Re-attempt payload import with deposits only
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Contains error types for the [`crate::ConsolidateTask`].

use crate::{
BuildTaskError, EngineTaskError, ForkchoiceTaskError,
BuildTaskError, EngineTaskError, SynchronizeTaskError,
task_queue::tasks::task::EngineTaskErrorSeverity,
};
use thiserror::Error;
Expand All @@ -20,7 +20,7 @@ pub enum ConsolidateTaskError {
BuildTaskFailed(#[from] BuildTaskError),
/// The consolidation forkchoice update call to the engine api failed.
#[error(transparent)]
ForkchoiceUpdateFailed(#[from] ForkchoiceTaskError),
ForkchoiceUpdateFailed(#[from] SynchronizeTaskError),
}

impl EngineTaskError for ConsolidateTaskError {
Expand Down
Loading