Skip to content
This repository was archived by the owner on Jan 16, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 58 additions & 59 deletions crates/node/service/src/actors/derivation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,64 +209,6 @@ where
}
}
}
}

#[async_trait]
impl<P> NodeActor for DerivationActor<P>
where
P: Pipeline + SignalReceiver + Send + Sync,
{
type InboundEvent = InboundDerivationMessage;
type Error = DerivationError;

async fn start(mut self) -> Result<(), Self::Error> {
loop {
select! {
biased;

_ = self.cancellation.cancelled() => {
info!(
target: "derivation",
"Received shutdown signal. Exiting derivation task."
);
return Ok(());
}
signal = self.derivation_signal_rx.recv() => {
let Some(signal) = signal else {
error!(
target: "derivation",
?signal,
"DerivationActor failed to receive signal"
);
return Err(DerivationError::SignalReceiveFailed);
};

self.signal(signal).await;
self.waiting_for_signal = false;
}
msg = self.l1_head_updates.changed() => {
if let Err(err) = msg {
error!(
target: "derivation",
?err,
"L1 head update stream closed without cancellation. Exiting derivation task."
);
return Ok(());
}

self.process(InboundDerivationMessage::NewDataAvailable).await?;
}
_ = self.engine_l2_safe_head.changed() => {
self.process(InboundDerivationMessage::SafeHeadUpdated).await?;
}
_ = &mut self.el_sync_complete_rx, if !self.el_sync_complete_rx.is_terminated() => {
info!(target: "derivation", "Engine finished syncing, starting derivation.");
// Optimistically process the first message.
self.process(InboundDerivationMessage::NewDataAvailable).await?;
}
}
}
}

/// Attempts to process the next payload attributes.
///
Expand All @@ -280,7 +222,7 @@ where
/// attributes are successfully produced. If the pipeline step errors,
/// the same [`L2BlockInfo`] is used again. If the [`L2BlockInfo`] is the
/// zero hash, the pipeline is not stepped on.
async fn process(&mut self, msg: Self::InboundEvent) -> Result<(), Self::Error> {
async fn process(&mut self, msg: InboundDerivationMessage) -> Result<(), DerivationError> {
// Only attempt derivation once the engine finishes syncing.
if !self.el_sync_complete_rx.is_terminated() {
trace!(target: "derivation", "Engine not ready, skipping derivation");
Expand Down Expand Up @@ -344,6 +286,63 @@ where
}
}

#[async_trait]
impl<P> NodeActor for DerivationActor<P>
where
P: Pipeline + SignalReceiver + Send + Sync,
{
type Error = DerivationError;

async fn start(mut self) -> Result<(), Self::Error> {
loop {
select! {
biased;

_ = self.cancellation.cancelled() => {
info!(
target: "derivation",
"Received shutdown signal. Exiting derivation task."
);
return Ok(());
}
signal = self.derivation_signal_rx.recv() => {
let Some(signal) = signal else {
error!(
target: "derivation",
?signal,
"DerivationActor failed to receive signal"
);
return Err(DerivationError::SignalReceiveFailed);
};

self.signal(signal).await;
self.waiting_for_signal = false;
}
msg = self.l1_head_updates.changed() => {
if let Err(err) = msg {
error!(
target: "derivation",
?err,
"L1 head update stream closed without cancellation. Exiting derivation task."
);
return Ok(());
}

self.process(InboundDerivationMessage::NewDataAvailable).await?;
}
_ = self.engine_l2_safe_head.changed() => {
self.process(InboundDerivationMessage::SafeHeadUpdated).await?;
}
_ = &mut self.el_sync_complete_rx, if !self.el_sync_complete_rx.is_terminated() => {
info!(target: "derivation", "Engine finished syncing, starting derivation.");
// Optimistically process the first message.
self.process(InboundDerivationMessage::NewDataAvailable).await?;
}
}
}
}
}

/// Messages that the [DerivationActor] can receive from other actors.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum InboundDerivationMessage {
Expand Down
103 changes: 51 additions & 52 deletions crates/node/service/src/actors/engine/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,11 +217,61 @@ impl EngineActor {
}
})
}

async fn process(&mut self, msg: InboundEngineMessage) -> Result<(), EngineError> {
match msg {
InboundEngineMessage::ResetRequest => {
warn!(target: "engine", "Received reset request");
self.reset().await?;
}
InboundEngineMessage::UnsafeBlockReceived(envelope) => {
let task = EngineTask::InsertUnsafe(InsertUnsafeTask::new(
Arc::clone(&self.client),
Arc::clone(&self.config),
*envelope,
));
self.engine.enqueue(task);
}
InboundEngineMessage::DerivedAttributesReceived(attributes) => {
self.finalizer.enqueue_for_finalization(&attributes);

let task = EngineTask::Consolidate(ConsolidateTask::new(
Arc::clone(&self.client),
Arc::clone(&self.config),
*attributes,
true,
));
self.engine.enqueue(task);
}
InboundEngineMessage::RuntimeConfigUpdate(config) => {
let client = Arc::clone(&self.client);
tokio::task::spawn(async move {
debug!(target: "engine", config = ?config, "Received runtime config");
let recommended = config.recommended_protocol_version;
let required = config.required_protocol_version;
match client.signal_superchain_v1(recommended, required).await {
Ok(v) => info!(target: "engine", ?v, "[SUPERCHAIN::SIGNAL]"),
Err(e) => {
// Since the `engine_signalSuperchainV1` endpoint is OPTIONAL,
// a warning is logged instead of an error.
warn!(target: "engine", ?e, "Failed to send superchain signal (OPTIONAL)");
}
}
});
}
InboundEngineMessage::NewFinalizedL1Block => {
// Attempt to finalize any L2 blocks that are contained within the finalized L1
// chain.
self.finalizer.try_finalize_next(&mut self.engine).await;
}
}

Ok(())
}
}

#[async_trait]
impl NodeActor for EngineActor {
type InboundEvent = InboundEngineMessage;
type Error = EngineError;

async fn start(mut self) -> Result<(), Self::Error> {
Expand Down Expand Up @@ -289,57 +339,6 @@ impl NodeActor for EngineActor {
}
}
}

async fn process(&mut self, msg: Self::InboundEvent) -> Result<(), Self::Error> {
match msg {
InboundEngineMessage::ResetRequest => {
warn!(target: "engine", "Received reset request");
self.reset().await?;
}
InboundEngineMessage::UnsafeBlockReceived(envelope) => {
let task = EngineTask::InsertUnsafe(InsertUnsafeTask::new(
Arc::clone(&self.client),
Arc::clone(&self.config),
*envelope,
));
self.engine.enqueue(task);
}
InboundEngineMessage::DerivedAttributesReceived(attributes) => {
self.finalizer.enqueue_for_finalization(&attributes);

let task = EngineTask::Consolidate(ConsolidateTask::new(
Arc::clone(&self.client),
Arc::clone(&self.config),
*attributes,
true,
));
self.engine.enqueue(task);
}
InboundEngineMessage::RuntimeConfigUpdate(config) => {
let client = Arc::clone(&self.client);
tokio::task::spawn(async move {
debug!(target: "engine", config = ?config, "Received runtime config");
let recommended = config.recommended_protocol_version;
let required = config.required_protocol_version;
match client.signal_superchain_v1(recommended, required).await {
Ok(v) => info!(target: "engine", ?v, "[SUPERCHAIN::SIGNAL]"),
Err(e) => {
// Since the `engine_signalSuperchainV1` endpoint is OPTIONAL,
// a warning is logged instead of an error.
warn!(target: "engine", ?e, "Failed to send superchain signal (OPTIONAL)");
}
}
});
}
InboundEngineMessage::NewFinalizedL1Block => {
// Attempt to finalize any L2 blocks that are contained within the finalized L1
// chain.
self.finalizer.try_finalize_next(&mut self.engine).await;
}
}

Ok(())
}
}

/// An event that is received by the [`EngineActor`] for processing.
Expand Down
6 changes: 0 additions & 6 deletions crates/node/service/src/actors/l1_watcher_rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,6 @@ impl L1WatcherRpc {

#[async_trait]
impl NodeActor for L1WatcherRpc {
type InboundEvent = ();
type Error = L1WatcherRpcError<BlockInfo>;

async fn start(mut self) -> Result<(), Self::Error> {
Expand Down Expand Up @@ -222,11 +221,6 @@ impl NodeActor for L1WatcherRpc {
}
}
}

async fn process(&mut self, _msg: Self::InboundEvent) -> Result<(), Self::Error> {
// The L1 watcher does not process any incoming messages.
Ok(())
}
}

/// A wrapper around a [`PollerBuilder`] that observes [`BlockId`] updates on a [`RootProvider`].
Expand Down
5 changes: 0 additions & 5 deletions crates/node/service/src/actors/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ impl NetworkActor {

#[async_trait]
impl NodeActor for NetworkActor {
type InboundEvent = ();
type Error = NetworkActorError;

async fn start(mut self) -> Result<(), Self::Error> {
Expand Down Expand Up @@ -118,10 +117,6 @@ impl NodeActor for NetworkActor {
}
}
}

async fn process(&mut self, _: Self::InboundEvent) -> Result<(), Self::Error> {
Ok(())
}
}

/// An error from the network actor.
Expand Down
5 changes: 0 additions & 5 deletions crates/node/service/src/actors/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ impl RpcActor {

#[async_trait]
impl NodeActor for RpcActor {
type InboundEvent = ();
type Error = RpcActorError;

async fn start(mut self) -> Result<(), Self::Error> {
Expand Down Expand Up @@ -81,8 +80,4 @@ impl NodeActor for RpcActor {
self.cancellation.cancel();
return Err(RpcActorError::ServerStopped);
}

async fn process(&mut self, _: Self::InboundEvent) -> Result<(), Self::Error> {
Ok(())
}
}
6 changes: 0 additions & 6 deletions crates/node/service/src/actors/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ impl RuntimeLauncher {

#[async_trait]
impl NodeActor for RuntimeActor {
type InboundEvent = ();
type Error = RuntimeLoaderError;

async fn start(mut self) -> Result<(), Self::Error> {
Expand All @@ -99,9 +98,4 @@ impl NodeActor for RuntimeActor {
}
}
}

async fn process(&mut self, e: Self::InboundEvent) -> Result<(), Self::Error> {
trace!(target: "runtime", ?e, "Runtime Actor received unexpected inbound event. Ignoring.");
Ok(())
}
}
6 changes: 0 additions & 6 deletions crates/node/service/src/actors/supervisor/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ impl<E> NodeActor for SupervisorActor<E>
where
E: SupervisorExt + Send + Sync,
{
type InboundEvent = ();
type Error = SupervisorActorError;

async fn start(mut self) -> Result<(), Self::Error> {
Expand Down Expand Up @@ -75,11 +74,6 @@ where
}
}
}

async fn process(&mut self, _msg: Self::InboundEvent) -> Result<(), Self::Error> {
// The supervisor actor does not process any incoming messages.
Ok(())
}
}

/// The error type for the [`SupervisorActor`].
Expand Down
5 changes: 0 additions & 5 deletions crates/node/service/src/actors/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,9 @@ use async_trait::async_trait;
/// - Emit new events for other actors to process.
#[async_trait]
pub trait NodeActor {
/// The event type received by the actor.
type InboundEvent;
/// The error type for the actor.
type Error: std::fmt::Debug;

/// Starts the actor.
async fn start(self) -> Result<(), Self::Error>;

/// Processes an incoming message.
async fn process(&mut self, msg: Self::InboundEvent) -> Result<(), Self::Error>;
}