Skip to content
This repository was archived by the owner on Jan 16, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 37 additions & 76 deletions crates/node/service/src/actors/engine/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,63 +261,21 @@ impl EngineActorState {
trace!(target: "engine", ?sent, "Attempted L2 Safe Head Update");
}

async fn process(
&mut self,
msg: InboundEngineMessage,
derivation_signal_tx: &mpsc::Sender<Signal>,
engine_l2_safe_head_tx: &watch::Sender<L2BlockInfo>,
finalizer: &mut L2Finalizer,
cancellation: &CancellationToken,
) -> Result<(), EngineError> {
match msg {
InboundEngineMessage::ResetRequest => {
warn!(target: "engine", "Received reset request");
self.reset(derivation_signal_tx, engine_l2_safe_head_tx, finalizer, cancellation)
.await?;
}
InboundEngineMessage::UnsafeBlockReceived(envelope) => {
let task = EngineTask::InsertUnsafe(InsertUnsafeTask::new(
self.client.clone(),
self.rollup.clone(),
*envelope,
));
self.engine.enqueue(task);
}
InboundEngineMessage::DerivedAttributesReceived(attributes) => {
finalizer.enqueue_for_finalization(&attributes);

let task = EngineTask::Consolidate(ConsolidateTask::new(
self.client.clone(),
Arc::clone(&self.rollup),
*attributes,
true,
));
self.engine.enqueue(task);
}
InboundEngineMessage::RuntimeConfigUpdate(config) => {
let client = self.client.clone();
tokio::task::spawn(async move {
debug!(target: "engine", config = ?config, "Received runtime config");
let recommended = config.recommended_protocol_version;
let required = config.required_protocol_version;
match client.signal_superchain_v1(recommended, required).await {
Ok(v) => info!(target: "engine", ?v, "[SUPERCHAIN::SIGNAL]"),
Err(e) => {
// Since the `engine_signalSuperchainV1` endpoint is OPTIONAL,
// a warning is logged instead of an error.
warn!(target: "engine", ?e, "Failed to send superchain signal (OPTIONAL)");
}
}
});
}
InboundEngineMessage::NewFinalizedL1Block => {
// Attempt to finalize any L2 blocks that are contained within the finalized L1
// chain.
finalizer.try_finalize_next(&mut self.engine).await;
fn runtime_config_update(&mut self, config: RuntimeConfig) {
let client = self.client.clone();
tokio::task::spawn(async move {
debug!(target: "engine", config = ?config, "Received runtime config");
let recommended = config.recommended_protocol_version;
let required = config.required_protocol_version;
match client.signal_superchain_v1(recommended, required).await {
Ok(v) => info!(target: "engine", ?v, "[SUPERCHAIN::SIGNAL]"),
Err(e) => {
// Since the `engine_signalSuperchainV1` endpoint is OPTIONAL,
// a warning is logged instead of an error.
warn!(target: "engine", ?e, "Failed to send superchain signal (OPTIONAL)");
}
}
}

Ok(())
});
}
}

Expand Down Expand Up @@ -378,60 +336,63 @@ impl NodeActor for EngineActor {
cancellation.cancel();
return Err(EngineError::ChannelClosed);
}
self.state.process(InboundEngineMessage::ResetRequest, &self.derivation_signal_tx, &self.engine_l2_safe_head_tx, &mut finalizer, &cancellation).await?;
warn!(target: "engine", "Received reset request");
self.state
.reset(&self.derivation_signal_tx, &self.engine_l2_safe_head_tx, &mut finalizer, &cancellation)
.await?;
}
unsafe_block = unsafe_block_rx.recv() => {
let Some(envelope) = unsafe_block else {
error!(target: "engine", "Unsafe block receiver closed unexpectedly");
cancellation.cancel();
return Err(EngineError::ChannelClosed);
};
self.state.process(InboundEngineMessage::UnsafeBlockReceived(envelope.into()), &self.derivation_signal_tx, &self.engine_l2_safe_head_tx, &mut finalizer, &cancellation).await?;
let task = EngineTask::InsertUnsafe(InsertUnsafeTask::new(
self.state.client.clone(),
self.state.rollup.clone(),
envelope,
));
self.state.engine.enqueue(task);
}
attributes = attributes_rx.recv() => {
let Some(attributes) = attributes else {
error!(target: "engine", "Attributes receiver closed unexpectedly");
cancellation.cancel();
return Err(EngineError::ChannelClosed);
};
self.state.process(InboundEngineMessage::DerivedAttributesReceived(attributes.into()), &self.derivation_signal_tx, &self.engine_l2_safe_head_tx, &mut finalizer, &cancellation).await?;
finalizer.enqueue_for_finalization(&attributes);

let task = EngineTask::Consolidate(ConsolidateTask::new(
self.state.client.clone(),
Arc::clone(&self.state.rollup),
attributes,
true,
));
self.state.engine.enqueue(task);
}
config = runtime_config_rx.as_mut().map(|rx| rx.recv()).unwrap(), if runtime_config_rx.is_some() => {
let Some(config) = config else {
error!(target: "engine", "Runtime config receiver closed unexpectedly");
cancellation.cancel();
return Err(EngineError::ChannelClosed);
};
self.state.process(InboundEngineMessage::RuntimeConfigUpdate(config.into()), &self.derivation_signal_tx, &self.engine_l2_safe_head_tx, &mut finalizer, &cancellation).await?;
self.state.runtime_config_update(config);
}
msg = finalizer.new_finalized_block() => {
if let Err(err) = msg {
error!(target: "engine", ?err, "L1 finalized block receiver closed unexpectedly");
cancellation.cancel();
return Err(EngineError::ChannelClosed);
}
self.state.process(InboundEngineMessage::NewFinalizedL1Block, &self.derivation_signal_tx, &self.engine_l2_safe_head_tx, &mut finalizer, &cancellation).await?;
// Attempt to finalize any L2 blocks that are contained within the finalized L1
// chain.
finalizer.try_finalize_next(&mut self.state.engine).await;
}
}
}
}
}

/// An event that is received by the [`EngineActor`] for processing.
#[derive(Debug)]
pub enum InboundEngineMessage {
/// Engine reset requested.
ResetRequest,
/// Received a new unsafe [`OpExecutionPayloadEnvelope`].
UnsafeBlockReceived(Box<OpExecutionPayloadEnvelope>),
/// Received new derived [`OpAttributesWithParent`].
DerivedAttributesReceived(Box<OpAttributesWithParent>),
/// Received an update to the runtime configuration.
RuntimeConfigUpdate(Box<RuntimeConfig>),
/// Received a new finalized L1 block
NewFinalizedL1Block,
}

/// Configuration for the Engine Actor.
#[derive(Debug, Clone)]
pub struct EngineLauncher {
Expand Down
5 changes: 1 addition & 4 deletions crates/node/service/src/actors/engine/mod.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
//! The [`EngineActor`] and its components.

mod actor;
pub use actor::{
EngineActor, EngineActorState, EngineContext, EngineLauncher, EngineOutboundData,
InboundEngineMessage,
};
pub use actor::{EngineActor, EngineActorState, EngineContext, EngineLauncher, EngineOutboundData};

mod error;
pub use error::EngineError;
Expand Down
2 changes: 1 addition & 1 deletion crates/node/service/src/actors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ pub use runtime::{RuntimeActor, RuntimeContext, RuntimeOutboundData, RuntimeStat
mod engine;
pub use engine::{
EngineActor, EngineActorState, EngineContext, EngineError, EngineLauncher, EngineOutboundData,
InboundEngineMessage, L2Finalizer,
L2Finalizer,
};

mod supervisor;
Expand Down
13 changes: 6 additions & 7 deletions crates/node/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,12 @@ mod actors;
pub use actors::{
CancellableContext, DerivationActor, DerivationContext, DerivationError,
DerivationOutboundChannels, DerivationState, EngineActor, EngineActorState, EngineContext,
EngineError, EngineLauncher, EngineOutboundData, InboundDerivationMessage,
InboundEngineMessage, L1WatcherRpc, L1WatcherRpcContext, L1WatcherRpcError,
L1WatcherRpcOutboundChannels, L1WatcherRpcState, L2Finalizer, NetworkActor, NetworkActorError,
NetworkContext, NetworkOutboundData, NodeActor, RpcActor, RpcActorError, RpcContext,
RuntimeActor, RuntimeContext, RuntimeOutboundData, RuntimeState, SupervisorActor,
SupervisorActorContext, SupervisorActorError, SupervisorExt, SupervisorOutboundData,
SupervisorRpcServerExt,
EngineError, EngineLauncher, EngineOutboundData, InboundDerivationMessage, L1WatcherRpc,
L1WatcherRpcContext, L1WatcherRpcError, L1WatcherRpcOutboundChannels, L1WatcherRpcState,
L2Finalizer, NetworkActor, NetworkActorError, NetworkContext, NetworkOutboundData, NodeActor,
RpcActor, RpcActorError, RpcContext, RuntimeActor, RuntimeContext, RuntimeOutboundData,
RuntimeState, SupervisorActor, SupervisorActorContext, SupervisorActorError, SupervisorExt,
SupervisorOutboundData, SupervisorRpcServerExt,
};

mod metrics;
Expand Down