diff --git a/crates/node/engine/src/task_queue/core.rs b/crates/node/engine/src/task_queue/core.rs index 3955c1dd4d..41431583e4 100644 --- a/crates/node/engine/src/task_queue/core.rs +++ b/crates/node/engine/src/task_queue/core.rs @@ -44,11 +44,6 @@ impl Engine { Self { state: initial_state, state_sender, tasks: BinaryHeap::default() } } - /// Returns true if the inner [`EngineState`] is initialized. - pub fn is_state_initialized(&self) -> bool { - self.state != EngineState::default() - } - /// Returns a reference to the inner [`EngineState`]. pub const fn state(&self) -> &EngineState { &self.state @@ -122,9 +117,6 @@ impl Engine { /// Attempts to drain the queue by executing all [`EngineTask`]s in-order. If any task returns /// an error along the way, it is not popped from the queue (in case it must be retried) and /// the error is returned. - /// - /// If an [`EngineTaskError::Reset`] is encountered, the remaining tasks in the queue are - /// cleared. pub async fn drain(&mut self) -> Result<(), EngineTaskError> { // Drain tasks in order of priority, halting on errors for a retry to be attempted. while let Some(task) = self.tasks.peek() { diff --git a/crates/node/service/src/actors/engine.rs b/crates/node/service/src/actors/engine.rs index 57668387b0..4293a03278 100644 --- a/crates/node/service/src/actors/engine.rs +++ b/crates/node/service/src/actors/engine.rs @@ -177,6 +177,43 @@ impl NodeActor for EngineActor { .map(|inbound_query_channel| self.start_query_task(inbound_query_channel)); loop { + match self.engine.drain().await { + Ok(_) => { + trace!(target: "engine", "[ENGINE] tasks drained"); + } + Err(EngineTaskError::Reset(e)) => { + warn!(target: "engine", err = ?e, "Received reset request"); + self.reset().await?; + } + Err(EngineTaskError::Flush(e)) => { + // This error is encountered when the payload is marked INVALID + // by the engine api. Post-holocene, the payload is replaced by + // a "deposits-only" block and re-executed. At the same time, + // the channel and any remaining buffered batches are flushed. + warn!(target: "engine", err = ?e, "[HOLOCENE] Invalid payload, Flushing derivation pipeline."); + match self.derivation_signal_tx.send(Signal::FlushChannel) { + Ok(_) => { + debug!(target: "engine", "[SENT] flush signal to derivation actor") + } + Err(e) => { + error!(target: "engine", ?e, "[ENGINE] Failed to send flush signal to the derivation actor."); + self.cancellation.cancel(); + return Err(EngineError::ChannelClosed); + } + } + } + Err(err @ EngineTaskError::Critical(_)) => { + error!(target: "engine", ?err, "Critical error draining engine tasks"); + self.cancellation.cancel(); + return Err(err.into()); + } + Err(EngineTaskError::Temporary(err)) => { + trace!(target: "engine", ?err, "Temporary error draining engine tasks"); + } + } + + self.maybe_update_safe_head(); + tokio::select! { biased; @@ -233,7 +270,13 @@ impl NodeActor for EngineActor { self.engine.enqueue(task); debug!(target: "engine", "Enqueued attributes consolidation task."); } - Some(config) = self.runtime_config_rx.recv() => { + config = self.runtime_config_rx.recv() => { + let Some(config) = config else { + error!(target: "engine", "Runtime config receiver closed unexpectedly, exiting node"); + self.cancellation.cancel(); + return Err(EngineError::ChannelClosed); + }; + let client = Arc::clone(&self.client); tokio::task::spawn(async move { debug!(target: "engine", config = ?config, "Received runtime config"); @@ -249,35 +292,6 @@ impl NodeActor for EngineActor { } }); } - res = self.engine.drain() => { - match res { - Ok(_) => { - trace!(target: "engine", "[ENGINE] tasks drained"); - } - Err(EngineTaskError::Reset(e)) => { - warn!(target: "engine", err = ?e, "Received reset request"); - self.reset().await?; - } - Err(EngineTaskError::Flush(e)) => { - // This error is encountered when the payload is marked INVALID - // by the engine api. Post-holocene, the payload is replaced by - // a "deposits-only" block and re-executed. At the same time, - // the channel and any remaining buffered batches are flushed. - warn!(target: "engine", err = ?e, "[HOLOCENE] Invalid payload, Flushing derivation pipeline."); - match self.derivation_signal_tx.send(Signal::FlushChannel) { - Ok(_) => debug!(target: "engine", "[SENT] flush signal to derivation actor"), - Err(e) => { - error!(target: "engine", ?e, "[ENGINE] Failed to send flush signal to the derivation actor."); - self.cancellation.cancel(); - return Err(EngineError::ChannelClosed); - } - } - } - Err(e) => warn!(target: "engine", ?e, "Error draining engine tasks"), - } - - self.maybe_update_safe_head(); - } } } } @@ -296,6 +310,9 @@ pub enum EngineError { /// Engine reset error. #[error(transparent)] EngineReset(#[from] EngineResetError), + /// Engine task error. + #[error(transparent)] + EngineTask(#[from] EngineTaskError), } /// Configuration for the Engine Actor.