From b7a7b57bbebcd7e62251633cf7a6e3c727a242a9 Mon Sep 17 00:00:00 2001 From: theochap Date: Fri, 23 May 2025 20:27:42 +0000 Subject: [PATCH 1/2] fix(node/engine): fix engine cpu consumption --- .config/kurtosis_network_params.yaml | 3 +- crates/node/engine/src/task_queue/core.rs | 58 +++++++-------- crates/node/service/src/actors/engine.rs | 89 ++++++++++++++--------- 3 files changed, 84 insertions(+), 66 deletions(-) diff --git a/.config/kurtosis_network_params.yaml b/.config/kurtosis_network_params.yaml index 01aadde3ad..ee87bf3709 100644 --- a/.config/kurtosis_network_params.yaml +++ b/.config/kurtosis_network_params.yaml @@ -13,6 +13,7 @@ optimism_package: count: 1 - el_type: op-reth cl_type: kona-node + cl_image: kona-node:local el_log_level: "debug" cl_log_level: "debug" count: 1 @@ -31,7 +32,7 @@ optimism_package: ethereum_package: participants: - el_type: geth - cl_type: nimbus + cl_type: teku network_params: preset: minimal genesis_delay: 5 diff --git a/crates/node/engine/src/task_queue/core.rs b/crates/node/engine/src/task_queue/core.rs index 3955c1dd4d..e1986f8176 100644 --- a/crates/node/engine/src/task_queue/core.rs +++ b/crates/node/engine/src/task_queue/core.rs @@ -8,7 +8,7 @@ use kona_genesis::{RollupConfig, SystemConfig}; use kona_protocol::{BlockInfo, L2BlockInfo, OpBlockConversionError, to_system_config}; use kona_sources::{SyncStartError, find_starting_forkchoice}; use op_alloy_consensus::OpTxEnvelope; -use std::{collections::BinaryHeap, sync::Arc}; +use std::sync::Arc; use thiserror::Error; use tokio::sync::watch::Sender; @@ -32,7 +32,7 @@ pub struct Engine { /// A sender that can be used to notify the engine actor of state changes. state_sender: Sender, /// The task queue. - tasks: BinaryHeap, + tasks: tokio::sync::mpsc::Receiver, } impl Engine { @@ -40,8 +40,12 @@ impl Engine { /// /// An initial [`EngineTask::ForkchoiceUpdate`] is added to the task queue to synchronize the /// engine with the forkchoice state of the [`EngineState`]. - pub fn new(initial_state: EngineState, state_sender: Sender) -> Self { - Self { state: initial_state, state_sender, tasks: BinaryHeap::default() } + pub const fn new( + initial_state: EngineState, + state_sender: Sender, + task_receiver: tokio::sync::mpsc::Receiver, + ) -> Self { + Self { state: initial_state, state_sender, tasks: task_receiver } } /// Returns true if the inner [`EngineState`] is initialized. @@ -59,11 +63,6 @@ impl Engine { self.state_sender.subscribe() } - /// Enqueues a new [`EngineTask`] for execution. - pub fn enqueue(&mut self, task: EngineTask) { - self.tasks.push(task); - } - /// Resets the engine by finding a plausible sync starting point via /// [`find_starting_forkchoice`]. The state will be updated to the starting point, and a /// forkchoice update will be enqueued in order to reorg the execution layer. @@ -73,7 +72,7 @@ impl Engine { config: &RollupConfig, ) -> Result<(L2BlockInfo, BlockInfo, SystemConfig), EngineResetError> { // Clear any outstanding tasks to prepare for the reset. - self.clear(); + self.clear().await; let start = find_starting_forkchoice(config, client.l1_provider(), client.l2_provider()).await?; @@ -114,31 +113,28 @@ impl Engine { Ok((start.safe, l1_origin_info, system_config)) } - /// Clears the task queue. - pub fn clear(&mut self) { - self.tasks.clear(); + /// Attempts to drain the queue by executing all [`EngineTask`]s in-order. If any task returns + pub async fn receive_tasks(&mut self) -> Result<(), EngineTaskError> { + let Some(task) = self.tasks.recv().await else { + return Err(EngineTaskError::Critical("Task queue closed unexpectedly".into())); + }; + + task.execute(&mut self.state).await?; + + self.state_sender.send_replace(self.state); + + Ok(()) } - /// Attempts to drain the queue by executing all [`EngineTask`]s in-order. If any task returns - /// an error along the way, it is not popped from the queue (in case it must be retried) and - /// the error is returned. - /// - /// If an [`EngineTaskError::Reset`] is encountered, the remaining tasks in the queue are - /// cleared. - pub async fn drain(&mut self) -> Result<(), EngineTaskError> { - // Drain tasks in order of priority, halting on errors for a retry to be attempted. - while let Some(task) = self.tasks.peek() { - // Execute the task - task.execute(&mut self.state).await?; - - // Update the state and notify the engine actor. - self.state_sender.send_replace(self.state); - - // Pop the task from the queue now that it's been executed. - self.tasks.pop(); + /// Clears the task queue. + pub async fn clear(&mut self) { + let mut sink = Vec::with_capacity(self.tasks.max_capacity()); + + if self.tasks.is_empty() { + return; } - Ok(()) + self.tasks.recv_many(&mut sink, self.tasks.max_capacity()).await; } } diff --git a/crates/node/service/src/actors/engine.rs b/crates/node/service/src/actors/engine.rs index 57668387b0..552ced01e8 100644 --- a/crates/node/service/src/actors/engine.rs +++ b/crates/node/service/src/actors/engine.rs @@ -38,6 +38,8 @@ pub struct EngineActor { pub client: Arc, /// The [`Engine`]. pub engine: Engine, + /// The channel to send the engine tasks to the engine internal task queue. + engine_task_sender: tokio::sync::mpsc::Sender, /// The channel to send the l2 safe head to the derivation actor. engine_l2_safe_head_tx: WatchSender, /// Handler for inbound queries to the engine. @@ -75,6 +77,7 @@ impl EngineActor { unsafe_block_rx: UnboundedReceiver, reset_request_rx: UnboundedReceiver<()>, inbound_queries: Option>, + engine_task_sender: tokio::sync::mpsc::Sender, cancellation: CancellationToken, ) -> Self { Self { @@ -83,6 +86,7 @@ impl EngineActor { engine, engine_l2_safe_head_tx, sync_complete_tx, + engine_task_sender, derivation_signal_tx, runtime_config_rx, attributes_rx, @@ -190,6 +194,42 @@ impl NodeActor for EngineActor { return Ok(()); } + res = self.engine.receive_tasks() => { + match res { + Ok(_) => { + trace!(target: "engine", "[ENGINE] tasks drained"); + } + Err(EngineTaskError::Reset(e)) => { + warn!(target: "engine", err = ?e, "Received reset request"); + self.reset().await?; + } + Err(EngineTaskError::Flush(e)) => { + // This error is encountered when the payload is marked INVALID + // by the engine api. Post-holocene, the payload is replaced by + // a "deposits-only" block and re-executed. At the same time, + // the channel and any remaining buffered batches are flushed. + warn!(target: "engine", err = ?e, "[HOLOCENE] Invalid payload, Flushing derivation pipeline."); + match self.derivation_signal_tx.send(Signal::FlushChannel) { + Ok(_) => debug!(target: "engine", "[SENT] flush signal to derivation actor"), + Err(e) => { + error!(target: "engine", ?e, "[ENGINE] Failed to send flush signal to the derivation actor."); + self.cancellation.cancel(); + return Err(EngineError::ChannelClosed); + } + } + } + Err(EngineTaskError::Critical(e)) => { + error!(target: "engine", ?e, "Critical engine task error"); + self.cancellation.cancel(); + return Err(EngineError::ChannelClosed); + } + Err(EngineTaskError::Temporary(e)) => { + warn!(target: "engine", ?e, "Temporary engine task error"); + } + } + + self.maybe_update_safe_head(); + } reset = self.reset_request_rx.recv() => { let Some(_) = reset else { error!(target: "engine", "Reset request receiver closed unexpectedly, exiting node"); @@ -213,7 +253,7 @@ impl NodeActor for EngineActor { envelope, ); let task = EngineTask::InsertUnsafe(task); - self.engine.enqueue(task); + self.engine_task_sender.send(task).await?; debug!(target: "engine", ?hash, "Enqueued unsafe block task."); self.check_sync().await?; } @@ -230,10 +270,16 @@ impl NodeActor for EngineActor { true, ); let task = EngineTask::Consolidate(task); - self.engine.enqueue(task); + self.engine_task_sender.send(task).await?; debug!(target: "engine", "Enqueued attributes consolidation task."); } - Some(config) = self.runtime_config_rx.recv() => { + config = self.runtime_config_rx.recv() => { + let Some(config) = config else { + error!(target: "engine", "Runtime config receiver closed unexpectedly, exiting node"); + self.cancellation.cancel(); + return Err(EngineError::ChannelClosed); + }; + let client = Arc::clone(&self.client); tokio::task::spawn(async move { debug!(target: "engine", config = ?config, "Received runtime config"); @@ -249,35 +295,6 @@ impl NodeActor for EngineActor { } }); } - res = self.engine.drain() => { - match res { - Ok(_) => { - trace!(target: "engine", "[ENGINE] tasks drained"); - } - Err(EngineTaskError::Reset(e)) => { - warn!(target: "engine", err = ?e, "Received reset request"); - self.reset().await?; - } - Err(EngineTaskError::Flush(e)) => { - // This error is encountered when the payload is marked INVALID - // by the engine api. Post-holocene, the payload is replaced by - // a "deposits-only" block and re-executed. At the same time, - // the channel and any remaining buffered batches are flushed. - warn!(target: "engine", err = ?e, "[HOLOCENE] Invalid payload, Flushing derivation pipeline."); - match self.derivation_signal_tx.send(Signal::FlushChannel) { - Ok(_) => debug!(target: "engine", "[SENT] flush signal to derivation actor"), - Err(e) => { - error!(target: "engine", ?e, "[ENGINE] Failed to send flush signal to the derivation actor."); - self.cancellation.cancel(); - return Err(EngineError::ChannelClosed); - } - } - } - Err(e) => warn!(target: "engine", ?e, "Error draining engine tasks"), - } - - self.maybe_update_safe_head(); - } } } } @@ -296,6 +313,9 @@ pub enum EngineError { /// Engine reset error. #[error(transparent)] EngineReset(#[from] EngineResetError), + /// Failed to send engine task. + #[error("Failed to send engine task: {0}")] + SendError(#[from] tokio::sync::mpsc::error::SendError), } /// Configuration for the Engine Actor. @@ -316,10 +336,11 @@ pub struct EngineLauncher { impl EngineLauncher { /// Launches the [`Engine`]. Returns the [`Engine`] and a channel to receive engine state /// updates. - pub fn launch(self) -> Engine { + pub fn launch(self) -> (Engine, tokio::sync::mpsc::Sender) { let state = EngineState::default(); let (engine_state_send, _) = tokio::sync::watch::channel(state); - Engine::new(state, engine_state_send) + let (engine_task_sender, engine_task_receiver) = tokio::sync::mpsc::channel(1024); + (Engine::new(state, engine_state_send, engine_task_receiver), engine_task_sender) } /// Returns the [`EngineClient`]. From 2d31a78fcbe388945a56617812af4506d29d4a55 Mon Sep 17 00:00:00 2001 From: theochap Date: Fri, 23 May 2025 20:50:35 +0000 Subject: [PATCH 2/2] small fix --- crates/node/service/src/service/validator.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/node/service/src/service/validator.rs b/crates/node/service/src/service/validator.rs index 9cfce252fd..e68af7fe04 100644 --- a/crates/node/service/src/service/validator.rs +++ b/crates/node/service/src/service/validator.rs @@ -148,7 +148,7 @@ pub trait ValidatorNodeService { let launcher = self.engine(); let client = launcher.client(); - let engine = launcher.launch(); + let (engine, engine_task_sender) = launcher.launch(); let engine = EngineActor::new( std::sync::Arc::new(self.config().clone()), @@ -162,6 +162,7 @@ pub trait ValidatorNodeService { unsafe_block_rx, reset_request_rx, Some(engine_query_recv), + engine_task_sender, cancellation.clone(), ); let engine = Some(engine);