Skip to content
This repository was archived by the owner on Jan 16, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ vergen-git2 = "1.0.0"
parking_lot = "0.12.3"
async-trait = "0.1.88"
tokio-stream = "0.1.17"
async-stream = "0.3.6"
async-channel = "2.3.1"
http-body-util = "0.1.3"
unsigned-varint = "0.8.0"
Expand Down
94 changes: 46 additions & 48 deletions crates/node/engine/src/attributes.rs

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions crates/node/engine/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,11 @@ impl EngineClient {
&self.l1_provider
}

/// Returns a reference to the inner [`RollupConfig`].
pub fn cfg(&self) -> &RollupConfig {
&self.cfg
}

/// Fetches the [`Block<T>`] for the given [`BlockNumberOrTag`].
pub async fn l2_block_by_label(
&self,
Expand Down
4 changes: 2 additions & 2 deletions crates/node/engine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ extern crate tracing;
mod task_queue;
pub use task_queue::{
BuildTask, BuildTaskError, ConsolidateTask, ConsolidateTaskError, Engine, EngineResetError,
EngineTask, EngineTaskError, EngineTaskExt, ForkchoiceTask, ForkchoiceTaskError,
InsertUnsafeTask, InsertUnsafeTaskError,
EngineTask, EngineTaskError, EngineTaskExt, FinalizeTask, FinalizeTaskError, ForkchoiceTask,
ForkchoiceTaskError, InsertUnsafeTask, InsertUnsafeTaskError,
};

mod attributes;
Expand Down
3 changes: 3 additions & 0 deletions crates/node/engine/src/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ impl Metrics {
pub const FORKCHOICE_TASK_LABEL: &str = "forkchoice-update";
/// Build task label.
pub const BUILD_TASK_LABEL: &str = "build";
/// Finalize task label.
pub const FINALIZE_TASK_LABEL: &str = "finalize";

/// Identifier for the histogram that tracks engine method call time.
pub const ENGINE_METHOD_REQUEST_DURATION: &str = "kona_node_engine_method_request_duration";
Expand Down Expand Up @@ -85,6 +87,7 @@ impl Metrics {
kona_macros::set!(counter, Self::ENGINE_TASK_COUNT, Self::CONSOLIDATE_TASK_LABEL, 0);
kona_macros::set!(counter, Self::ENGINE_TASK_COUNT, Self::FORKCHOICE_TASK_LABEL, 0);
kona_macros::set!(counter, Self::ENGINE_TASK_COUNT, Self::BUILD_TASK_LABEL, 0);
kona_macros::set!(counter, Self::ENGINE_TASK_COUNT, Self::FINALIZE_TASK_LABEL, 0);

// Engine reset count
kona_macros::set!(counter, Self::ENGINE_RESET_COUNT, 0);
Expand Down
17 changes: 8 additions & 9 deletions crates/node/engine/src/task_queue/tasks/build/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,31 +66,31 @@ impl BuildTask {
) -> Result<PayloadId, BuildTaskError> {
debug!(
target: "engine_builder",
txs = attributes_envelope.attributes.transactions.as_ref().map_or(0, |txs| txs.len()),
txs = attributes_envelope.inner().transactions.as_ref().map_or(0, |txs| txs.len()),
"Starting new build job"
);

let forkchoice_version = EngineForkchoiceVersion::from_cfg(
&self.cfg,
attributes_envelope.attributes.payload_attributes.timestamp,
attributes_envelope.inner().payload_attributes.timestamp,
);
debug!(target: "engine_builder", ?forkchoice_version, "Forkchoice version");
let update = match forkchoice_version {
EngineForkchoiceVersion::V3 => {
engine_client
.fork_choice_updated_v3(forkchoice, Some(attributes_envelope.attributes))
.fork_choice_updated_v3(forkchoice, Some(attributes_envelope.inner))
.await
}
EngineForkchoiceVersion::V2 => {
engine_client
.fork_choice_updated_v2(forkchoice, Some(attributes_envelope.attributes))
.fork_choice_updated_v2(forkchoice, Some(attributes_envelope.inner))
.await
}
EngineForkchoiceVersion::V1 => {
engine_client
.fork_choice_updated_v1(
forkchoice,
Some(attributes_envelope.attributes.payload_attributes),
Some(attributes_envelope.inner.payload_attributes),
)
.await
}
Expand Down Expand Up @@ -149,7 +149,7 @@ impl BuildTask {
payload_id: PayloadId,
payload_attrs: OpAttributesWithParent,
) -> Result<L2BlockInfo, BuildTaskError> {
let payload_timestamp = payload_attrs.attributes.payload_attributes.timestamp;
let payload_timestamp = payload_attrs.inner().payload_attributes.timestamp;

debug!(
target: "engine_builder",
Expand Down Expand Up @@ -233,16 +233,15 @@ impl BuildTask {

Ok(L2BlockInfo::from_payload_and_genesis(
payload,
payload_attrs.attributes.payload_attributes.parent_beacon_block_root,
payload_attrs.inner().payload_attributes.parent_beacon_block_root,
&cfg.genesis,
)?)
}
PayloadStatusEnum::Invalid { validation_error } => {
if payload_attrs.is_deposits_only() {
error!(target: "engine_builder", "Critical: Deposit-only payload import failed: {validation_error}");
Err(BuildTaskError::DepositOnlyPayloadFailed)
} else if cfg
.is_holocene_active(payload_attrs.attributes.payload_attributes.timestamp)
} else if cfg.is_holocene_active(payload_attrs.inner().payload_attributes.timestamp)
{
warn!(target: "engine_builder", "Payload import failed: {validation_error}");
warn!(target: "engine_builder", "Re-attempting payload import with deposits only.");
Expand Down
36 changes: 36 additions & 0 deletions crates/node/engine/src/task_queue/tasks/finalize/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
//! Contains error types for the [crate::FinalizeTask].

use crate::EngineTaskError;
use alloy_transport::{RpcError, TransportErrorKind};
use kona_protocol::FromBlockError;
use thiserror::Error;

/// An error that occurs when running the [crate::FinalizeTask].
#[derive(Debug, Error)]
pub enum FinalizeTaskError {
/// The block is not safe, and therefore cannot be finalized.
#[error("Attempted to finalize a block that is not yet safe")]
BlockNotSafe,
/// The block to finalize was not found.
#[error("The block to finalize was not found: Number {0}")]
BlockNotFound(u64),
/// An error occurred while transforming the RPC block into [`L2BlockInfo`].
///
/// [`L2BlockInfo`]: kona_protocol::L2BlockInfo
#[error(transparent)]
FromBlock(#[from] FromBlockError),
/// A temporary RPC failure.
#[error(transparent)]
TransportError(#[from] RpcError<TransportErrorKind>),
}

impl From<FinalizeTaskError> for EngineTaskError {
fn from(value: FinalizeTaskError) -> Self {
match value {
FinalizeTaskError::BlockNotSafe => Self::Critical(Box::new(value)),
FinalizeTaskError::BlockNotFound(_) => Self::Critical(Box::new(value)),
FinalizeTaskError::FromBlock(_) => Self::Critical(Box::new(value)),
FinalizeTaskError::TransportError(_) => Self::Temporary(Box::new(value)),
}
}
}
7 changes: 7 additions & 0 deletions crates/node/engine/src/task_queue/tasks/finalize/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
//! Task and its associated types for finalizing an L2 block.

mod task;
pub use task::FinalizeTask;

mod error;
pub use error::FinalizeTaskError;
60 changes: 60 additions & 0 deletions crates/node/engine/src/task_queue/tasks/finalize/task.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
//! A task for finalizing an L2 block.

use crate::{
EngineClient, EngineState, EngineTaskError, EngineTaskExt, FinalizeTaskError, ForkchoiceTask,
Metrics,
};
use alloy_provider::Provider;
use async_trait::async_trait;
use kona_protocol::L2BlockInfo;
use std::sync::Arc;

/// The [`FinalizeTask`] fetches the [`L2BlockInfo`] at `block_number`, updates the [`EngineState`],
/// and dispatches a forkchoice update to finalize the block.
#[derive(Debug, Clone)]
pub struct FinalizeTask {
/// The engine client.
pub client: Arc<EngineClient>,
/// The number of the L2 block to finalize.
pub block_number: u64,
}

impl FinalizeTask {
/// Creates a new [`ForkchoiceTask`].
pub const fn new(client: Arc<EngineClient>, block_number: u64) -> Self {
Self { client, block_number }
}
}

#[async_trait]
impl EngineTaskExt for FinalizeTask {
async fn execute(&self, state: &mut EngineState) -> Result<(), EngineTaskError> {
// Sanity check that the block that is being finalized is at least safe.
if state.safe_head().block_info.number < self.block_number {
return Err(FinalizeTaskError::BlockNotSafe.into());
}

let block = self
.client
.l2_provider()
.get_block(self.block_number.into())
.full()
.await
.map_err(FinalizeTaskError::TransportError)?
.ok_or(FinalizeTaskError::BlockNotFound(self.block_number))?
.into_consensus();
let block_info = L2BlockInfo::from_block_and_genesis(&block, &self.client.cfg().genesis)
.map_err(FinalizeTaskError::FromBlock)?;

// Update the finalized block in the engine state.
state.set_finalized_head(block_info);

// Dispatch a forkchoice update.
ForkchoiceTask::new(self.client.clone()).execute(state).await?;

// Update metrics.
kona_macros::inc!(counter, Metrics::ENGINE_TASK_COUNT, Metrics::FINALIZE_TASK_LABEL);

Ok(())
}
}
3 changes: 3 additions & 0 deletions crates/node/engine/src/task_queue/tasks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,6 @@ pub use build::{BuildTask, BuildTaskError};

mod consolidate;
pub use consolidate::{ConsolidateTask, ConsolidateTaskError};

mod finalize;
pub use finalize::{FinalizeTask, FinalizeTaskError};
15 changes: 12 additions & 3 deletions crates/node/engine/src/task_queue/tasks/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
//!
//! [`Engine`]: crate::Engine

use super::{BuildTask, ConsolidateTask, ForkchoiceTask, InsertUnsafeTask};
use super::{BuildTask, ConsolidateTask, FinalizeTask, ForkchoiceTask, InsertUnsafeTask};
use crate::EngineState;
use async_trait::async_trait;
use std::cmp::Ordering;
Expand All @@ -23,6 +23,8 @@ pub enum EngineTask {
/// Performs consolidation on the engine state, reverting to payload attribute processing
/// via the [`BuildTask`] if consolidation fails.
Consolidate(ConsolidateTask),
/// Finalizes an L2 block
Finalize(FinalizeTask),
}

impl EngineTask {
Expand All @@ -33,6 +35,7 @@ impl EngineTask {
Self::InsertUnsafe(task) => task.execute(state).await,
Self::BuildBlock(task) => task.execute(state).await,
Self::Consolidate(task) => task.execute(state).await,
Self::Finalize(task) => task.execute(state).await,
}
}
}
Expand All @@ -44,7 +47,8 @@ impl PartialEq for EngineTask {
(Self::ForkchoiceUpdate(_), Self::ForkchoiceUpdate(_)) |
(Self::InsertUnsafe(_), Self::InsertUnsafe(_)) |
(Self::BuildBlock(_), Self::BuildBlock(_)) |
(Self::Consolidate(_), Self::Consolidate(_))
(Self::Consolidate(_), Self::Consolidate(_)) |
(Self::Finalize(_), Self::Finalize(_))
)
}
}
Expand Down Expand Up @@ -76,6 +80,7 @@ impl Ord for EngineTask {
(Self::Consolidate(_), Self::Consolidate(_)) => Ordering::Equal,
(Self::BuildBlock(_), Self::BuildBlock(_)) => Ordering::Equal,
(Self::ForkchoiceUpdate(_), Self::ForkchoiceUpdate(_)) => Ordering::Equal,
(Self::Finalize(_), Self::Finalize(_)) => Ordering::Equal,

// Individual ForkchoiceUpdate tasks are the highest priority
(Self::ForkchoiceUpdate(_), _) => Ordering::Greater,
Expand All @@ -85,9 +90,13 @@ impl Ord for EngineTask {
(Self::BuildBlock(_), _) => Ordering::Greater,
(_, Self::BuildBlock(_)) => Ordering::Less,

// InsertUnsafe tasks are prioritized over Consolidate tasks
// InsertUnsafe tasks are prioritized over Consolidate and Finalize tasks
(Self::InsertUnsafe(_), _) => Ordering::Greater,
(_, Self::InsertUnsafe(_)) => Ordering::Less,

// Consolidate tasks are prioritized over Finalize tasks
(Self::Consolidate(_), _) => Ordering::Greater,
(_, Self::Consolidate(_)) => Ordering::Less,
}
}
}
Expand Down
1 change: 1 addition & 0 deletions crates/node/service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ tracing.workspace = true
thiserror.workspace = true
tokio-util.workspace = true
async-trait.workspace = true
async-stream.workspace = true
derive_more = { workspace = true, features = ["debug"] }
jsonrpsee = { workspace = true, features = ["server"] }
tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }
Expand Down
2 changes: 1 addition & 1 deletion crates/node/service/src/actors/derivation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@ where
///
/// Specs: <https://specs.optimism.io/protocol/derivation.html#l1-sync-payload-attributes-processing>
derivation_signal_rx: UnboundedReceiver<Signal>,

/// The receiver for L1 head update notifications.
l1_head_updates: UnboundedReceiver<BlockInfo>,

/// The sender for derived [`OpAttributesWithParent`]s produced by the actor.
attributes_out: UnboundedSender<OpAttributesWithParent>,
/// The reset request sender, used to handle [`PipelineErrorKind::Reset`] events and forward
Expand Down
Loading
Loading