Skip to content
This repository was archived by the owner on Jan 16, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions bin/node/src/commands/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ impl NodeCommand {
let engine_client = kona_engine::EngineClient::new_http(
self.l2_engine_rpc.clone(),
self.l2_provider_rpc.clone(),
self.l1_eth_rpc.clone(),
Arc::new(config.clone()),
jwt_secret,
);
Expand Down
1 change: 1 addition & 0 deletions crates/node/engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ workspace = true
kona-genesis.workspace = true
kona-macros.workspace = true
kona-protocol = {workspace = true, features = ["serde", "std"]}
kona-sources.workspace = true

# alloy
alloy-eips.workspace = true
Expand Down
36 changes: 25 additions & 11 deletions crates/node/engine/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use alloy_rpc_types_engine::{
ClientVersionV1, ExecutionPayloadBodiesV1, ExecutionPayloadEnvelopeV2, ExecutionPayloadInputV2,
ExecutionPayloadV3, ForkchoiceState, ForkchoiceUpdated, JwtSecret, PayloadId, PayloadStatus,
};
use alloy_rpc_types_eth::{Block, SyncStatus};
use alloy_rpc_types_eth::Block;
use alloy_transport::{RpcError, TransportErrorKind, TransportResult};
use alloy_transport_http::{
AuthLayer, AuthService, Http, HyperClient,
Expand Down Expand Up @@ -56,7 +56,9 @@ pub struct EngineClient {
#[deref]
engine: RootProvider<AnyNetwork>,
/// The L2 chain provider.
rpc: RootProvider<Optimism>,
l2_provider: RootProvider<Optimism>,
/// The L1 chain provider.
l1_provider: RootProvider,
/// The [RollupConfig] for the chain used to timestamp which version of the engine api to use.
cfg: Arc<RollupConfig>,
}
Expand All @@ -75,33 +77,45 @@ impl EngineClient {
}

/// Creates a new [`EngineClient`] from the provided [Url] and [JwtSecret].
pub fn new_http(engine: Url, rpc: Url, cfg: Arc<RollupConfig>, jwt: JwtSecret) -> Self {
pub fn new_http(
engine: Url,
l2_rpc: Url,
l1_rpc: Url,
cfg: Arc<RollupConfig>,
jwt: JwtSecret,
) -> Self {
let engine = Self::rpc_client::<AnyNetwork>(engine, jwt);
let rpc = Self::rpc_client::<Optimism>(rpc, jwt);
let l2_provider = RootProvider::<Optimism>::new_http(l2_rpc);
let l1_provider = RootProvider::new_http(l1_rpc);

Self { engine, rpc, cfg }
Self { engine, l2_provider, l1_provider, cfg }
}

/// Returns the [`SyncStatus`] of the engine.
pub async fn syncing(&self) -> Result<SyncStatus, EngineClientError> {
let status = <RootProvider<AnyNetwork>>::syncing(&self.engine).await?;
Ok(status)
/// Returns a reference to the inner L2 [`RootProvider`].
///
/// This provider targets the Optimism network type and serves unauthenticated
/// L2 RPC queries (block lookups), as opposed to the authenticated engine API.
pub const fn l2_provider(&self) -> &RootProvider<Optimism> {
&self.l2_provider
}

/// Returns a reference to the inner L1 [`RootProvider`].
///
/// Used for L1 chain traversal (e.g. locating the sync starting point);
/// defaults to the Ethereum network type.
pub const fn l1_provider(&self) -> &RootProvider {
&self.l1_provider
}

/// Fetches the full [`Block<Transaction>`] for the given [`BlockNumberOrTag`]
/// from the L2 provider.
///
/// `.full()` requests complete transaction bodies rather than just hashes.
/// Returns `Ok(None)` if no block exists at the given label.
pub async fn l2_block_by_label(
    &self,
    numtag: BlockNumberOrTag,
) -> Result<Option<Block<Transaction>>, EngineClientError> {
    Ok(<RootProvider<Optimism>>::get_block_by_number(&self.l2_provider, numtag).full().await?)
}

/// Fetches the [L2BlockInfo] by [BlockNumberOrTag].
pub async fn l2_block_info_by_label(
&self,
numtag: BlockNumberOrTag,
) -> Result<Option<L2BlockInfo>, EngineClientError> {
let block = <RootProvider<Optimism>>::get_block_by_number(&self.rpc, numtag).full().await?;
let block =
<RootProvider<Optimism>>::get_block_by_number(&self.l2_provider, numtag).full().await?;
let Some(block) = block else {
return Ok(None);
};
Expand Down
6 changes: 3 additions & 3 deletions crates/node/engine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ extern crate tracing;

mod task_queue;
pub use task_queue::{
BuildTask, BuildTaskError, ConsolidateTask, ConsolidateTaskError, Engine, EngineTask,
EngineTaskError, EngineTaskExt, EngineTaskType, ForkchoiceTask, ForkchoiceTaskError,
InsertUnsafeTask, InsertUnsafeTaskError, init_unknowns,
BuildTask, BuildTaskError, ConsolidateTask, ConsolidateTaskError, Engine, EngineResetError,
EngineTask, EngineTaskError, EngineTaskExt, EngineTaskType, ForkchoiceTask,
ForkchoiceTaskError, InsertUnsafeTask, InsertUnsafeTaskError,
};

mod attributes;
Expand Down
88 changes: 78 additions & 10 deletions crates/node/engine/src/task_queue/core.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,18 @@
//! The [`Engine`] is a task queue that receives and executes [`EngineTask`]s.

use super::{EngineTaskError, EngineTaskExt};
use crate::{EngineState, EngineTask, EngineTaskType};
use std::collections::{HashMap, VecDeque};
use crate::{EngineClient, EngineState, EngineTask, EngineTaskType, ForkchoiceTask};
use alloy_provider::Provider;
use alloy_rpc_types_eth::Transaction;
use kona_genesis::{RollupConfig, SystemConfig};
use kona_protocol::{BlockInfo, L2BlockInfo, OpBlockConversionError, to_system_config};
use kona_sources::{SyncStartError, find_starting_forkchoice};
use op_alloy_consensus::OpTxEnvelope;
use std::{
collections::{HashMap, VecDeque},
sync::Arc,
};
use thiserror::Error;
use tokio::sync::watch::Sender;

/// The [`Engine`] task queue.
Expand Down Expand Up @@ -40,14 +50,63 @@ impl Engine {
}
}

/// Returns a shared reference to the inner [`EngineState`].
///
/// This is a read-only view; state mutations happen only through executed
/// [`EngineTask`]s and `reset`.
pub const fn state(&self) -> &EngineState {
&self.state
}

/// Returns a receiver that can be used to listen to engine state updates.
///
/// Backed by a `tokio::sync::watch` channel, so the receiver observes the
/// latest state snapshot rather than a backlog of every intermediate update.
pub fn subscribe(&self) -> tokio::sync::watch::Receiver<EngineState> {
self.state_sender.subscribe()
}

/// Enqueues a new [`EngineTask`] for execution.
///
/// Tasks are bucketed by their [`EngineTaskType`]; within a bucket they are
/// executed in FIFO order (pushed to the back of that type's queue).
pub fn enqueue(&mut self, task: EngineTask) {
self.tasks.entry(task.ty()).or_default().push_back(task);
}

/// Returns a reference to the inner [`EngineState`].
pub const fn state(&self) -> &EngineState {
&self.state
/// Resets the engine by finding a plausible sync starting point via
/// [`find_starting_forkchoice`]. The state will be updated to the starting point, and a
/// forkchoice update will be enqueued in order to reorg the execution layer.
pub async fn reset(
&mut self,
client: Arc<EngineClient>,
config: &RollupConfig,
) -> Result<(L2BlockInfo, BlockInfo, SystemConfig), EngineResetError> {
let start =
find_starting_forkchoice(config, client.l1_provider(), client.l2_provider()).await?;
Comment thread
clabby marked this conversation as resolved.

self.state.set_unsafe_head(start.un_safe);
self.state.set_cross_unsafe_head(start.un_safe);
self.state.set_local_safe_head(start.safe);
self.state.set_safe_head(start.safe);
self.state.set_finalized_head(start.finalized);

self.enqueue(EngineTask::ForkchoiceUpdate(ForkchoiceTask::new(client.clone())));

let origin_block =
start.safe.l1_origin.number - config.channel_timeout(start.safe.block_info.timestamp);
let l1_origin_info: BlockInfo = client
.l1_provider()
.get_block(origin_block.into())
.await
.map_err(SyncStartError::RpcError)?
.ok_or(SyncStartError::BlockNotFound(origin_block.into()))?
.into_consensus()
.into();

let l2_safe_block = client
.l2_provider()
.get_block(start.safe.block_info.hash.into())
.full()
.await
.map_err(SyncStartError::RpcError)?
.ok_or(SyncStartError::BlockNotFound(origin_block.into()))?
.into_consensus()
.map_transactions(|t| <Transaction<OpTxEnvelope> as Clone>::clone(&t).into_inner());
Comment thread
clabby marked this conversation as resolved.
let system_config = to_system_config(&l2_safe_block, config)?;

Ok((start.safe, l1_origin_info, system_config))
}

/// Clears the task queue.
Expand All @@ -69,11 +128,6 @@ impl Engine {
ty
}

/// Returns a receiver that can be used to listen to engine state updates.
pub fn subscribe(&self) -> tokio::sync::watch::Receiver<EngineState> {
self.state_sender.subscribe()
}

/// Attempts to drain the queue by executing all [`EngineTask`]s in-order. If any task returns
/// an error along the way, it is not popped from the queue (in case it must be retried) and
/// the error is returned.
Expand Down Expand Up @@ -109,3 +163,17 @@ impl Engine {
}
}
}

/// An error occurred while attempting to reset the [`Engine`].
#[derive(Debug, Error)]
pub enum EngineResetError {
/// An error that originated from within the engine task.
#[error(transparent)]
Task(#[from] EngineTaskError),
/// An error occurred while traversing the L1 for the sync starting point
/// (propagated from [`find_starting_forkchoice`] or the follow-up RPC lookups).
#[error(transparent)]
SyncStart(#[from] SyncStartError),
/// An error occurred while constructing the SystemConfig for the new safe head
/// (propagated from [`to_system_config`]).
#[error(transparent)]
SystemConfigConversion(#[from] OpBlockConversionError),
}
2 changes: 1 addition & 1 deletion crates/node/engine/src/task_queue/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! The [`Engine`] task queue and the [`EngineTask`]s it can execute.

mod core;
pub use core::Engine;
pub use core::{Engine, EngineResetError};

mod tasks;
pub use tasks::*;
17 changes: 14 additions & 3 deletions crates/node/engine/src/task_queue/tasks/forkchoice/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

use crate::{
EngineClient, EngineState, EngineTaskError, EngineTaskExt, ForkchoiceTaskError, Metrics,
SyncStatus,
};
use alloy_rpc_types_engine::INVALID_FORK_CHOICE_STATE_ERROR;
use async_trait::async_trait;
Expand Down Expand Up @@ -31,8 +32,8 @@ impl EngineTaskExt for ForkchoiceTask {
return Err(ForkchoiceTaskError::NoForkchoiceUpdateNeeded.into());
}

// If the engine is syncing, log a warning. We can still attempt to apply the forkchoice
// update.
// If the engine is syncing, log a warning. We can still attempt to apply the
// forkchoice update.
if state.sync_status.is_syncing() {
warn!(target: "engine", "Attempting to update forkchoice state while EL syncing");
}
Expand All @@ -47,7 +48,17 @@ impl EngineTaskExt for ForkchoiceTask {
}

// Send the forkchoice update through the input.
let forkchoice = state.create_forkchoice_state();
let mut forkchoice = state.create_forkchoice_state();

// For the first FCU triggered by the initial engine reset, zero out the `safe` and
// `finalized` hashes to trigger optimistic pipeline sync on `op-reth`.
if matches!(state.sync_status, SyncStatus::ExecutionLayerWillStart) {
// Progress the engine sync state machine.
state.sync_status = SyncStatus::ExecutionLayerStarted;

forkchoice.safe_block_hash = Default::default();
forkchoice.finalized_block_hash = Default::default();
}

// Handle the forkchoice update result.
if let Err(e) = self.client.fork_choice_updated_v3(forkchoice, None).await {
Expand Down
3 changes: 0 additions & 3 deletions crates/node/engine/src/task_queue/tasks/insert/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,9 +188,6 @@ impl EngineTaskExt for InsertUnsafeTask {
state.sync_status = SyncStatus::ExecutionLayerFinished;
}

// Initialize unknowns if needed.
crate::init_unknowns(state, self.client.clone()).await;

info!(
target: "engine",
hash = new_unsafe_ref.block_info.hash.to_string(),
Expand Down
3 changes: 0 additions & 3 deletions crates/node/engine/src/task_queue/tasks/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
//! Tasks to update the engine state.

mod unknowns;
pub use unknowns::init_unknowns;

mod task;
pub use task::{EngineTask, EngineTaskError, EngineTaskExt, EngineTaskType};

Expand Down
8 changes: 4 additions & 4 deletions crates/node/engine/src/task_queue/tasks/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,14 +115,14 @@ pub trait EngineTaskExt {
pub enum EngineTaskError {
    /// A temporary error within the engine; the task may be retried.
    #[error("Temporary engine task error: {0}")]
    Temporary(Box<dyn std::error::Error + Send + Sync>),
    /// A critical, unrecoverable error within the engine.
    #[error("Critical engine task error: {0}")]
    Critical(Box<dyn std::error::Error + Send + Sync>),
    /// An error that requires a derivation pipeline reset.
    #[error("Derivation pipeline reset required: {0}")]
    Reset(Box<dyn std::error::Error + Send + Sync>),
    /// An error that requires the derivation pipeline to be flushed.
    #[error("Derivation pipeline flush required: {0}")]
    Flush(Box<dyn std::error::Error + Send + Sync>),
    // All payloads are `Send + Sync` so the error can cross task/thread boundaries.
}
67 changes: 0 additions & 67 deletions crates/node/engine/src/task_queue/tasks/unknowns.rs

This file was deleted.

Loading
Loading