Skip to content
This repository was archived by the owner on Jan 16, 2026. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions .config/kurtosis_network_params.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,16 @@
optimism_package:
chains:
- participants:
- el_type: op-reth
- el_type: op-geth
cl_type: op-node
cl_log_level: "debug"
el_log_level: "debug"
count: 4
- el_type: op-reth
cl_type: kona-node
cl_log_level: "info"
count: 1
cl_log_level: "debug"
el_log_level: "debug"
count: 4
network_params:
network: "kurtosis"
network_id: "2151908"
Expand Down
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions bin/node/src/commands/node.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! Node Subcommand.

use alloy_provider::RootProvider;
use alloy_rpc_types_engine::JwtSecret;
use anyhow::{Result, bail};
use backon::{ExponentialBuilder, Retryable};
Expand Down Expand Up @@ -107,8 +108,12 @@ impl NodeCommand {
/// Since the engine client will fail if the jwt token is invalid, this allows to ensure
/// that the jwt token passed as a cli arg is correct.
pub async fn validate_jwt(&self, config: &RollupConfig) -> anyhow::Result<JwtSecret> {
let l1_rpc_url = self.l1_eth_rpc.clone();
let l1_provider = RootProvider::new_http(l1_rpc_url.clone());

let jwt_secret = self.jwt_secret().ok_or(anyhow::anyhow!("Invalid JWT secret"))?;
let engine_client = kona_engine::EngineClient::new_http(
l1_provider,
self.l2_engine_rpc.clone(),
self.l2_provider_rpc.clone(),
Arc::new(config.clone()),
Expand Down
2 changes: 2 additions & 0 deletions crates/node/engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ workspace = true
kona-genesis.workspace = true
kona-macros.workspace = true
kona-protocol = {workspace = true, features = ["serde", "std"]}
kona-sources.workspace = true
kona-providers-alloy.workspace = true

# alloy
alloy-eips.workspace = true
Expand Down
30 changes: 27 additions & 3 deletions crates/node/engine/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! An Engine API Client.

use alloy_eips::eip1898::BlockNumberOrTag;
use alloy_network::{AnyNetwork, Network};
use alloy_network::{AnyNetwork, Ethereum, Network};
use alloy_primitives::{B256, BlockHash, Bytes};
use alloy_provider::{Provider, RootProvider};
use alloy_rpc_client::RpcClient;
Expand All @@ -20,6 +20,7 @@ use alloy_transport_http::{
};
use derive_more::Deref;
use http_body_util::Full;
use kona_providers_alloy::{AlloyChainProvider, AlloyL2ChainProvider};
use op_alloy_network::Optimism;
use op_alloy_provider::ext::engine::OpEngineApi;
use op_alloy_rpc_types::Transaction;
Expand Down Expand Up @@ -59,9 +60,26 @@ pub struct EngineClient {
rpc: RootProvider<Optimism>,
/// The [RollupConfig] for the chain used to timestamp which version of the engine api to use.
cfg: Arc<RollupConfig>,
/// The L1 provider.
l1_provider: RootProvider<Ethereum>,
}

impl EngineClient {
const ALLOY_PROVIDERS_CACHE_SIZE: usize = 1024;

/// Returns alloy chain providers.
pub fn alloy_providers(&self) -> (AlloyChainProvider, AlloyL2ChainProvider) {
let l1_provider =
AlloyChainProvider::new(self.l1_provider.clone(), Self::ALLOY_PROVIDERS_CACHE_SIZE);
let l2_provider = AlloyL2ChainProvider::new(
self.rpc.clone(),
self.cfg.clone(),
Self::ALLOY_PROVIDERS_CACHE_SIZE,
);

(l1_provider, l2_provider)
}

/// Creates a new RPC client for the given address and JWT secret.
fn rpc_client<T: Network>(addr: Url, jwt: JwtSecret) -> RootProvider<T> {
let hyper_client = Client::builder(TokioExecutor::new()).build_http::<Full<Bytes>>();
Expand All @@ -75,11 +93,17 @@ impl EngineClient {
}

/// Creates a new [`EngineClient`] from the provided [Url] and [JwtSecret].
pub fn new_http(engine: Url, rpc: Url, cfg: Arc<RollupConfig>, jwt: JwtSecret) -> Self {
pub fn new_http(
l1_provider: RootProvider<Ethereum>,
engine: Url,
rpc: Url,
cfg: Arc<RollupConfig>,
jwt: JwtSecret,
) -> Self {
let engine = Self::rpc_client::<AnyNetwork>(engine, jwt);
let rpc = Self::rpc_client::<Optimism>(rpc, jwt);

Self { engine, rpc, cfg }
Self { l1_provider, engine, rpc, cfg }
}

/// Returns the [`SyncStatus`] of the engine.
Expand Down
45 changes: 40 additions & 5 deletions crates/node/engine/src/task_queue/core.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
//! The [`Engine`] is a task queue that receives and executes [`EngineTask`]s.

use super::{EngineTaskError, EngineTaskExt};
use crate::{EngineState, EngineTask, EngineTaskType};
use super::{EngineTaskError, EngineTaskExt, ForkchoiceTask};
use crate::{EngineClient, EngineState, EngineTask, EngineTaskType};
use kona_genesis::RollupConfig;
use kona_protocol::L2BlockInfo;
use std::collections::{HashMap, VecDeque};
use kona_sources::find_starting_forkchoice;
use std::{
collections::{HashMap, VecDeque},
sync::Arc,
};
use tokio::sync::watch::Sender;

/// The [`Engine`] task queue.
Expand Down Expand Up @@ -46,6 +51,35 @@ impl Engine {
self.tasks.entry(task.ty()).or_default().push_back(task);
}

/// Resets the engine state.
pub async fn reset(
&mut self,
client: Arc<EngineClient>,
config: Arc<RollupConfig>,
) -> Result<(), EngineTaskError> {
let (mut l1_provider, mut l2_provider) = client.alloy_providers();
let forkchoice = find_starting_forkchoice(&config, &mut l1_provider, &mut l2_provider)
.await
.map_err(|e| EngineTaskError::Critical(Box::new(e)))?;

self.state.set_finalized_head(forkchoice.finalized);
self.state.set_safe_head(forkchoice.safe);
self.state.set_unsafe_head(forkchoice.un_safe);
// If the cross unsafe head is not set, set it to the safe head.
self.state.set_cross_unsafe_head(self.state.safe_head);
// If the local safe head is not set, set it to the safe head.
self.state.set_local_safe_head(self.state.safe_head);

self.state.forkchoice_update_needed = true;
Copy link
Contributor

@refcell refcell May 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This shouldn't be needed - setting the safe head via the set_safe_head method above will also set the forkchoice_update_needed = true


debug!(target: "engine", unsafe = ?self.state.unsafe_head(), safe = ?self.state.safe_head(), finalized = ?self.state.finalized_head(),
"Resetted engine state. Sending FCU");

self.enqueue(EngineTask::ForkchoiceUpdate(ForkchoiceTask::new(client)));

Ok(())
}

/// Returns the L2 Safe Head [`L2BlockInfo`] from the state.
pub const fn safe_head(&self) -> L2BlockInfo {
self.state.safe_head()
Expand Down Expand Up @@ -91,10 +125,12 @@ impl Engine {
let Some(task) = task.front() else {
return Ok(());
};
let ty = task.ty();

match task.execute(&mut self.state).await {
Ok(_) => {}
Err(EngineTaskError::Reset(e)) => {
self.clear();
// The engine actor should trigger a reset by calling [`Engine::reset`].
return Err(EngineTaskError::Reset(e));
}
e => return e,
Expand All @@ -103,7 +139,6 @@ impl Engine {
// Update the state and notify the engine actor.
self.state_sender.send_replace(self.state);

let ty = task.ty();
if let Some(queue) = self.tasks.get_mut(&ty) {
queue.pop_front();
};
Expand Down
52 changes: 39 additions & 13 deletions crates/node/engine/src/task_queue/tasks/forkchoice/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@

use crate::{
EngineClient, EngineState, EngineTaskError, EngineTaskExt, ForkchoiceTaskError, Metrics,
SyncStatus,
};
use alloy_rpc_types_engine::INVALID_FORK_CHOICE_STATE_ERROR;
use alloy_rpc_types_engine::{ForkchoiceState, INVALID_FORK_CHOICE_STATE_ERROR};
use async_trait::async_trait;
use op_alloy_provider::ext::engine::OpEngineApi;
use std::sync::Arc;
Expand All @@ -21,11 +22,47 @@ impl ForkchoiceTask {
pub const fn new(client: Arc<EngineClient>) -> Self {
Self { client }
}

async fn send_fcu(&self, forkchoice: ForkchoiceState) -> Result<(), EngineTaskError> {
// Handle the forkchoice update result.
if let Err(e) = self.client.fork_choice_updated_v3(forkchoice, None).await {
let e = e
.as_error_resp()
.and_then(|e| {
(e.code == INVALID_FORK_CHOICE_STATE_ERROR as i64)
.then_some(ForkchoiceTaskError::InvalidForkchoiceState)
})
.unwrap_or_else(|| ForkchoiceTaskError::ForkchoiceUpdateFailed(e));

return Err(e.into());
}

Ok(())
}
}

#[async_trait]
impl EngineTaskExt for ForkchoiceTask {
async fn execute(&self, state: &mut EngineState) -> Result<(), EngineTaskError> {
// The very first FCU. We should:
// - Send a FCU with empty hashes for the safe and finalized blocks, this kickstarts the EL.
// - Reset the engine state.
if state.sync_status == SyncStatus::ExecutionLayerWillStart {
let mut forkchoice = state.create_forkchoice_state();
forkchoice.safe_block_hash = Default::default();
forkchoice.finalized_block_hash = Default::default();

info!(target: "engine", "Starting execution layer sync");
state.sync_status = SyncStatus::ExecutionLayerStarted;

self.send_fcu(forkchoice).await?;

// Reset the engine state.
return Err(EngineTaskError::Reset(Box::new(
ForkchoiceTaskError::InvalidForkchoiceState,
)));
}

// Check if a forkchoice update is not needed, return early.
if !state.forkchoice_update_needed {
return Err(ForkchoiceTaskError::NoForkchoiceUpdateNeeded.into());
Expand All @@ -49,18 +86,7 @@ impl EngineTaskExt for ForkchoiceTask {
// Send the forkchoice update through the input.
let forkchoice = state.create_forkchoice_state();

// Handle the forkchoice update result.
if let Err(e) = self.client.fork_choice_updated_v3(forkchoice, None).await {
let e = e
.as_error_resp()
.and_then(|e| {
(e.code == INVALID_FORK_CHOICE_STATE_ERROR as i64)
.then_some(ForkchoiceTaskError::InvalidForkchoiceState)
})
.unwrap_or_else(|| ForkchoiceTaskError::ForkchoiceUpdateFailed(e));

return Err(e.into());
}
self.send_fcu(forkchoice).await?;

state.forkchoice_update_needed = false;

Expand Down
9 changes: 0 additions & 9 deletions crates/node/engine/src/task_queue/tasks/insert/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,6 @@ impl InsertUnsafeTask {
#[async_trait]
impl EngineTaskExt for InsertUnsafeTask {
async fn execute(&self, state: &mut EngineState) -> Result<(), EngineTaskError> {
// Always transition to EL sync on startup.
if state.sync_status == SyncStatus::ExecutionLayerWillStart {
info!(target: "engine", "Starting execution layer sync");
state.sync_status = SyncStatus::ExecutionLayerStarted;
}

let time_start = Instant::now();

// Insert the new payload.
Expand Down Expand Up @@ -188,9 +182,6 @@ impl EngineTaskExt for InsertUnsafeTask {
state.sync_status = SyncStatus::ExecutionLayerFinished;
}

// Initialize unknowns if needed.
crate::init_unknowns(state, self.client.clone()).await;

info!(
target: "engine",
hash = new_unsafe_ref.block_info.hash.to_string(),
Expand Down
9 changes: 4 additions & 5 deletions crates/node/engine/src/task_queue/tasks/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ impl EngineTaskExt for EngineTask {
return Err(EngineTaskError::Critical(e));
}
EngineTaskError::Reset(e) => {
warn!(target: "engine", "Engine requested derivation reset");
return Err(EngineTaskError::Reset(e));
}
EngineTaskError::Flush(e) => {
Expand All @@ -115,14 +114,14 @@ pub trait EngineTaskExt {
pub enum EngineTaskError {
/// A temporary error within the engine.
#[error("Temporary engine task error: {0}")]
Temporary(Box<dyn std::error::Error>),
Temporary(Box<dyn std::error::Error + Send + Sync>),
/// A critical error within the engine.
#[error("Critical engine task error: {0}")]
Critical(Box<dyn std::error::Error>),
Critical(Box<dyn std::error::Error + Send + Sync>),
/// An error that requires a derivation pipeline reset.
#[error("Derivation pipeline reset required: {0}")]
Reset(Box<dyn std::error::Error>),
Reset(Box<dyn std::error::Error + Send + Sync>),
/// An error that requires the derivation pipeline to be flushed.
#[error("Derivation pipeline flush required: {0}")]
Flush(Box<dyn std::error::Error>),
Flush(Box<dyn std::error::Error + Send + Sync>),
}
1 change: 1 addition & 0 deletions crates/node/rpc/src/launcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ mod tests {
let mut launcher = RpcLauncher::new();
launcher = launcher.set_addr(SocketAddr::from(([127, 0, 0, 1], 8080)));
let result = launcher.launch().await;
println!("{result:?}");
assert!(result.is_ok());
}

Expand Down
Loading