Skip to content
This repository was archived by the owner on Jan 16, 2026. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .config/kurtosis_network_params.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ optimism_package:
count: 1
- el_type: op-reth
cl_type: kona-node
cl_image: kona-node:local
el_log_level: "debug"
cl_log_level: "debug"
count: 1
Expand All @@ -31,7 +32,7 @@ optimism_package:
ethereum_package:
participants:
- el_type: geth
cl_type: nimbus
cl_type: teku
network_params:
preset: minimal
genesis_delay: 5
Expand Down
58 changes: 27 additions & 31 deletions crates/node/engine/src/task_queue/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use kona_genesis::{RollupConfig, SystemConfig};
use kona_protocol::{BlockInfo, L2BlockInfo, OpBlockConversionError, to_system_config};
use kona_sources::{SyncStartError, find_starting_forkchoice};
use op_alloy_consensus::OpTxEnvelope;
use std::{collections::BinaryHeap, sync::Arc};
use std::sync::Arc;
use thiserror::Error;
use tokio::sync::watch::Sender;

Expand All @@ -32,16 +32,20 @@ pub struct Engine {
/// A sender that can be used to notify the engine actor of state changes.
state_sender: Sender<EngineState>,
/// The task queue.
tasks: BinaryHeap<EngineTask>,
tasks: tokio::sync::mpsc::Receiver<EngineTask>,
}

impl Engine {
/// Creates a new [`Engine`] with an empty task queue and the passed initial [`EngineState`].
///
/// An initial [`EngineTask::ForkchoiceUpdate`] is added to the task queue to synchronize the
/// engine with the forkchoice state of the [`EngineState`].
pub fn new(initial_state: EngineState, state_sender: Sender<EngineState>) -> Self {
Self { state: initial_state, state_sender, tasks: BinaryHeap::default() }
pub const fn new(
initial_state: EngineState,
state_sender: Sender<EngineState>,
task_receiver: tokio::sync::mpsc::Receiver<EngineTask>,
) -> Self {
Self { state: initial_state, state_sender, tasks: task_receiver }
}

/// Returns true if the inner [`EngineState`] is initialized.
Expand All @@ -59,11 +63,6 @@ impl Engine {
self.state_sender.subscribe()
}

/// Enqueues a new [`EngineTask`] for execution.
pub fn enqueue(&mut self, task: EngineTask) {
self.tasks.push(task);
}

/// Resets the engine by finding a plausible sync starting point via
/// [`find_starting_forkchoice`]. The state will be updated to the starting point, and a
/// forkchoice update will be enqueued in order to reorg the execution layer.
Expand All @@ -73,7 +72,7 @@ impl Engine {
config: &RollupConfig,
) -> Result<(L2BlockInfo, BlockInfo, SystemConfig), EngineResetError> {
// Clear any outstanding tasks to prepare for the reset.
self.clear();
self.clear().await;

let start =
find_starting_forkchoice(config, client.l1_provider(), client.l2_provider()).await?;
Expand Down Expand Up @@ -114,31 +113,28 @@ impl Engine {
Ok((start.safe, l1_origin_info, system_config))
}

/// Clears the task queue.
pub fn clear(&mut self) {
self.tasks.clear();
/// Attempts to drain the queue by executing all [`EngineTask`]s in-order. If any task returns
pub async fn receive_tasks(&mut self) -> Result<(), EngineTaskError> {
let Some(task) = self.tasks.recv().await else {
return Err(EngineTaskError::Critical("Task queue closed unexpectedly".into()));
};

task.execute(&mut self.state).await?;

self.state_sender.send_replace(self.state);

Ok(())
}

/// Attempts to drain the queue by executing all [`EngineTask`]s in-order. If any task returns
/// an error along the way, it is not popped from the queue (in case it must be retried) and
/// the error is returned.
///
/// If an [`EngineTaskError::Reset`] is encountered, the remaining tasks in the queue are
/// cleared.
pub async fn drain(&mut self) -> Result<(), EngineTaskError> {
// Drain tasks in order of priority, halting on errors for a retry to be attempted.
while let Some(task) = self.tasks.peek() {
// Execute the task
task.execute(&mut self.state).await?;

// Update the state and notify the engine actor.
self.state_sender.send_replace(self.state);

// Pop the task from the queue now that it's been executed.
self.tasks.pop();
/// Clears the task queue.
pub async fn clear(&mut self) {
let mut sink = Vec::with_capacity(self.tasks.max_capacity());

if self.tasks.is_empty() {
return;
}

Ok(())
self.tasks.recv_many(&mut sink, self.tasks.max_capacity()).await;
}
}

Expand Down
89 changes: 55 additions & 34 deletions crates/node/service/src/actors/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ pub struct EngineActor {
pub client: Arc<EngineClient>,
/// The [`Engine`].
pub engine: Engine,
/// The channel to send the engine tasks to the engine internal task queue.
engine_task_sender: tokio::sync::mpsc::Sender<EngineTask>,
/// The channel to send the l2 safe head to the derivation actor.
engine_l2_safe_head_tx: WatchSender<L2BlockInfo>,
/// Handler for inbound queries to the engine.
Expand Down Expand Up @@ -75,6 +77,7 @@ impl EngineActor {
unsafe_block_rx: UnboundedReceiver<OpNetworkPayloadEnvelope>,
reset_request_rx: UnboundedReceiver<()>,
inbound_queries: Option<Receiver<EngineQueries>>,
engine_task_sender: tokio::sync::mpsc::Sender<EngineTask>,
cancellation: CancellationToken,
) -> Self {
Self {
Expand All @@ -83,6 +86,7 @@ impl EngineActor {
engine,
engine_l2_safe_head_tx,
sync_complete_tx,
engine_task_sender,
derivation_signal_tx,
runtime_config_rx,
attributes_rx,
Expand Down Expand Up @@ -190,6 +194,42 @@ impl NodeActor for EngineActor {

return Ok(());
}
res = self.engine.receive_tasks() => {
match res {
Ok(_) => {
trace!(target: "engine", "[ENGINE] tasks drained");
}
Err(EngineTaskError::Reset(e)) => {
warn!(target: "engine", err = ?e, "Received reset request");
self.reset().await?;
}
Err(EngineTaskError::Flush(e)) => {
// This error is encountered when the payload is marked INVALID
// by the engine api. Post-holocene, the payload is replaced by
// a "deposits-only" block and re-executed. At the same time,
// the channel and any remaining buffered batches are flushed.
warn!(target: "engine", err = ?e, "[HOLOCENE] Invalid payload, Flushing derivation pipeline.");
match self.derivation_signal_tx.send(Signal::FlushChannel) {
Ok(_) => debug!(target: "engine", "[SENT] flush signal to derivation actor"),
Err(e) => {
error!(target: "engine", ?e, "[ENGINE] Failed to send flush signal to the derivation actor.");
self.cancellation.cancel();
return Err(EngineError::ChannelClosed);
}
}
}
Err(EngineTaskError::Critical(e)) => {
error!(target: "engine", ?e, "Critical engine task error");
self.cancellation.cancel();
return Err(EngineError::ChannelClosed);
}
Err(EngineTaskError::Temporary(e)) => {
warn!(target: "engine", ?e, "Temporary engine task error");
}
}

self.maybe_update_safe_head();
}
reset = self.reset_request_rx.recv() => {
let Some(_) = reset else {
error!(target: "engine", "Reset request receiver closed unexpectedly, exiting node");
Expand All @@ -213,7 +253,7 @@ impl NodeActor for EngineActor {
envelope,
);
let task = EngineTask::InsertUnsafe(task);
self.engine.enqueue(task);
self.engine_task_sender.send(task).await?;
debug!(target: "engine", ?hash, "Enqueued unsafe block task.");
self.check_sync().await?;
}
Expand All @@ -230,10 +270,16 @@ impl NodeActor for EngineActor {
true,
);
let task = EngineTask::Consolidate(task);
self.engine.enqueue(task);
self.engine_task_sender.send(task).await?;
debug!(target: "engine", "Enqueued attributes consolidation task.");
}
Some(config) = self.runtime_config_rx.recv() => {
config = self.runtime_config_rx.recv() => {
let Some(config) = config else {
error!(target: "engine", "Runtime config receiver closed unexpectedly, exiting node");
self.cancellation.cancel();
return Err(EngineError::ChannelClosed);
};

let client = Arc::clone(&self.client);
tokio::task::spawn(async move {
debug!(target: "engine", config = ?config, "Received runtime config");
Expand All @@ -249,35 +295,6 @@ impl NodeActor for EngineActor {
}
});
}
res = self.engine.drain() => {
match res {
Ok(_) => {
trace!(target: "engine", "[ENGINE] tasks drained");
}
Err(EngineTaskError::Reset(e)) => {
warn!(target: "engine", err = ?e, "Received reset request");
self.reset().await?;
}
Err(EngineTaskError::Flush(e)) => {
// This error is encountered when the payload is marked INVALID
// by the engine api. Post-holocene, the payload is replaced by
// a "deposits-only" block and re-executed. At the same time,
// the channel and any remaining buffered batches are flushed.
warn!(target: "engine", err = ?e, "[HOLOCENE] Invalid payload, Flushing derivation pipeline.");
match self.derivation_signal_tx.send(Signal::FlushChannel) {
Ok(_) => debug!(target: "engine", "[SENT] flush signal to derivation actor"),
Err(e) => {
error!(target: "engine", ?e, "[ENGINE] Failed to send flush signal to the derivation actor.");
self.cancellation.cancel();
return Err(EngineError::ChannelClosed);
}
}
}
Err(e) => warn!(target: "engine", ?e, "Error draining engine tasks"),
}

self.maybe_update_safe_head();
}
}
}
}
Expand All @@ -296,6 +313,9 @@ pub enum EngineError {
/// Engine reset error.
#[error(transparent)]
EngineReset(#[from] EngineResetError),
/// Failed to send engine task.
#[error("Failed to send engine task: {0}")]
SendError(#[from] tokio::sync::mpsc::error::SendError<EngineTask>),
}

/// Configuration for the Engine Actor.
Expand All @@ -316,10 +336,11 @@ pub struct EngineLauncher {
impl EngineLauncher {
/// Launches the [`Engine`]. Returns the [`Engine`] and a channel to receive engine state
/// updates.
pub fn launch(self) -> Engine {
pub fn launch(self) -> (Engine, tokio::sync::mpsc::Sender<EngineTask>) {
let state = EngineState::default();
let (engine_state_send, _) = tokio::sync::watch::channel(state);
Engine::new(state, engine_state_send)
let (engine_task_sender, engine_task_receiver) = tokio::sync::mpsc::channel(1024);
(Engine::new(state, engine_state_send, engine_task_receiver), engine_task_sender)
}

/// Returns the [`EngineClient`].
Expand Down
3 changes: 2 additions & 1 deletion crates/node/service/src/service/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ pub trait ValidatorNodeService {

let launcher = self.engine();
let client = launcher.client();
let engine = launcher.launch();
let (engine, engine_task_sender) = launcher.launch();

let engine = EngineActor::new(
std::sync::Arc::new(self.config().clone()),
Expand All @@ -162,6 +162,7 @@ pub trait ValidatorNodeService {
unsafe_block_rx,
reset_request_rx,
Some(engine_query_recv),
engine_task_sender,
cancellation.clone(),
);
let engine = Some(engine);
Expand Down
Loading