Skip to content
This repository was archived by the owner on Jan 16, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 0 additions & 8 deletions crates/node/engine/src/task_queue/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,6 @@ impl Engine {
Self { state: initial_state, state_sender, tasks: BinaryHeap::default() }
}

/// Returns true if the inner [`EngineState`] is initialized.
pub fn is_state_initialized(&self) -> bool {
self.state != EngineState::default()
}

/// Returns a reference to the inner [`EngineState`].
pub const fn state(&self) -> &EngineState {
&self.state
Expand Down Expand Up @@ -122,9 +117,6 @@ impl Engine {
/// Attempts to drain the queue by executing all [`EngineTask`]s in-order. If any task returns
/// an error along the way, it is not popped from the queue (in case it must be retried) and
/// the error is returned.
///
/// If an [`EngineTaskError::Reset`] is encountered, the remaining tasks in the queue are
/// cleared.
pub async fn drain(&mut self) -> Result<(), EngineTaskError> {
// Drain tasks in order of priority, halting on errors for a retry to be attempted.
while let Some(task) = self.tasks.peek() {
Expand Down
77 changes: 47 additions & 30 deletions crates/node/service/src/actors/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,43 @@ impl NodeActor for EngineActor {
.map(|inbound_query_channel| self.start_query_task(inbound_query_channel));

loop {
match self.engine.drain().await {
Ok(_) => {
trace!(target: "engine", "[ENGINE] tasks drained");
}
Err(EngineTaskError::Reset(e)) => {
warn!(target: "engine", err = ?e, "Received reset request");
self.reset().await?;
}
Err(EngineTaskError::Flush(e)) => {
// This error is encountered when the payload is marked INVALID
// by the engine api. Post-holocene, the payload is replaced by
// a "deposits-only" block and re-executed. At the same time,
// the channel and any remaining buffered batches are flushed.
warn!(target: "engine", err = ?e, "[HOLOCENE] Invalid payload, Flushing derivation pipeline.");
match self.derivation_signal_tx.send(Signal::FlushChannel) {
Ok(_) => {
debug!(target: "engine", "[SENT] flush signal to derivation actor")
}
Err(e) => {
error!(target: "engine", ?e, "[ENGINE] Failed to send flush signal to the derivation actor.");
self.cancellation.cancel();
return Err(EngineError::ChannelClosed);
}
}
}
Err(err @ EngineTaskError::Critical(_)) => {
error!(target: "engine", ?err, "Critical error draining engine tasks");
self.cancellation.cancel();
return Err(err.into());
}
Err(EngineTaskError::Temporary(err)) => {
trace!(target: "engine", ?err, "Temporary error draining engine tasks");
}
}

self.maybe_update_safe_head();

tokio::select! {
biased;

Expand Down Expand Up @@ -233,7 +270,13 @@ impl NodeActor for EngineActor {
self.engine.enqueue(task);
debug!(target: "engine", "Enqueued attributes consolidation task.");
}
Some(config) = self.runtime_config_rx.recv() => {
config = self.runtime_config_rx.recv() => {
let Some(config) = config else {
error!(target: "engine", "Runtime config receiver closed unexpectedly, exiting node");
self.cancellation.cancel();
return Err(EngineError::ChannelClosed);
};

let client = Arc::clone(&self.client);
tokio::task::spawn(async move {
debug!(target: "engine", config = ?config, "Received runtime config");
Expand All @@ -249,35 +292,6 @@ impl NodeActor for EngineActor {
}
});
}
res = self.engine.drain() => {
match res {
Ok(_) => {
trace!(target: "engine", "[ENGINE] tasks drained");
}
Err(EngineTaskError::Reset(e)) => {
warn!(target: "engine", err = ?e, "Received reset request");
self.reset().await?;
}
Err(EngineTaskError::Flush(e)) => {
// This error is encountered when the payload is marked INVALID
// by the engine api. Post-holocene, the payload is replaced by
// a "deposits-only" block and re-executed. At the same time,
// the channel and any remaining buffered batches are flushed.
warn!(target: "engine", err = ?e, "[HOLOCENE] Invalid payload, Flushing derivation pipeline.");
match self.derivation_signal_tx.send(Signal::FlushChannel) {
Ok(_) => debug!(target: "engine", "[SENT] flush signal to derivation actor"),
Err(e) => {
error!(target: "engine", ?e, "[ENGINE] Failed to send flush signal to the derivation actor.");
self.cancellation.cancel();
return Err(EngineError::ChannelClosed);
}
}
}
Err(e) => warn!(target: "engine", ?e, "Error draining engine tasks"),
}

self.maybe_update_safe_head();
}
}
}
}
Expand All @@ -296,6 +310,9 @@ pub enum EngineError {
/// Engine reset error.
#[error(transparent)]
EngineReset(#[from] EngineResetError),
/// Engine task error.
#[error(transparent)]
EngineTask(#[from] EngineTaskError),
}

/// Configuration for the Engine Actor.
Expand Down
Loading