Skip to content
This repository was archived by the owner on Jan 16, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
480 changes: 133 additions & 347 deletions crates/node/service/src/actors/derivation/actor.rs

Large diffs are not rendered by default.

20 changes: 14 additions & 6 deletions crates/node/service/src/actors/derivation/engine_client.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::{EngineActorRequest, EngineClientError, EngineClientResult, ResetRequest};
use async_trait::async_trait;
use derive_more::Constructor;
use kona_protocol::OpAttributesWithParent;
use std::fmt::Debug;
use tokio::sync::mpsc;
Expand All @@ -24,7 +25,7 @@ pub trait DerivationEngineClient: Debug + Send + Sync {
}

/// Client to use to send messages to the Engine Actor's inbound channel.
#[derive(Debug)]
#[derive(Constructor, Debug)]
pub struct QueuedDerivationEngineClient {
/// A channel to use to send the [`EngineActorRequest`]s to the EngineActor.
pub engine_actor_request_tx: mpsc::Sender<EngineActorRequest>,
Expand All @@ -35,21 +36,27 @@ impl DerivationEngineClient for QueuedDerivationEngineClient {
async fn reset_engine_forkchoice(&self) -> EngineClientResult<()> {
let (result_tx, mut result_rx) = mpsc::channel(1);

info!(target: "derivation", "Sending reset request to engine.");
self.engine_actor_request_tx
.send(EngineActorRequest::ResetRequest(Box::new(ResetRequest { result_tx })))
.await
.map_err(|_| EngineClientError::RequestError("request channel closed.".to_string()))?;

result_rx.recv().await.ok_or_else(|| {
error!(target: "derivation_engine_client", "Failed to receive built payload");
EngineClientError::ResponseError("response channel closed.".to_string())
})?
result_rx
.recv()
.await
.inspect(|_| info!(target: "derivation", "Engine reset successfully."))
.ok_or_else(|| {
error!(target: "derivation_engine_client", "Failed to receive built payload");
EngineClientError::ResponseError("response channel closed.".to_string())
})?
}

async fn send_derived_attributes(
&self,
attributes: OpAttributesWithParent,
) -> EngineClientResult<()> {
trace!(target: "derivation", ?attributes, "Sending derived attributes to engine.");
self.engine_actor_request_tx
.send(EngineActorRequest::ProcessDerivedL2AttributesRequest(Box::new(attributes)))
.await
Expand All @@ -59,8 +66,9 @@ impl DerivationEngineClient for QueuedDerivationEngineClient {
}

async fn send_finalized_l2_block(&self, block_number: u64) -> EngineClientResult<()> {
trace!(target: "derivation", block_number, "Sending finalized L2 block number to engine.");
self.engine_actor_request_tx
.send(EngineActorRequest::ProcessFinalizedL2BlockRequest(block_number))
.send(EngineActorRequest::ProcessFinalizedL2BlockNumberRequest(Box::new(block_number)))
.await
.map_err(|_| EngineClientError::RequestError("request channel closed.".to_string()))?;

Expand Down
8 changes: 4 additions & 4 deletions crates/node/service/src/actors/derivation/mod.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
mod actor;
pub use actor::{
DerivationActor, DerivationBuilder, DerivationError, DerivationInboundChannels,
DerivationState, InboundDerivationMessage, PipelineBuilder,
};
pub use actor::{DerivationActor, DerivationError};

mod engine_client;
pub use engine_client::{DerivationEngineClient, QueuedDerivationEngineClient};

mod finalizer;
pub(crate) use finalizer::L2Finalizer;

mod request;
pub use request::{DerivationActorRequest, DerivationClientError, DerivationClientResult};
35 changes: 35 additions & 0 deletions crates/node/service/src/actors/derivation/request.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
use kona_derive::Signal;
use kona_protocol::{BlockInfo, L2BlockInfo};
use thiserror::Error;

/// The result of an Engine client call.
pub type DerivationClientResult<T> = Result<T, DerivationClientError>;

/// Error making requests to the [`crate::DerivationActor`].
#[derive(Debug, Error)]
pub enum DerivationClientError {
/// Error making a request to the [`crate::DerivationActor`]. The request never made it there.
#[error("Error making a request to the derivation actor: {0}.")]
RequestError(String),

/// Error receiving response from the [`crate::DerivationActor`].
/// This means the request may or may not have succeeded.
#[error("Error receiving response from the derivation actor: {0}..")]
ResponseError(String),
}

/// Inbound requests that the [`crate::DerivationActor`] can process.
#[derive(Debug)]
pub enum DerivationActorRequest {
/// Request to process the fact that Engine sync has completed.
ProcessEngineSyncCompletionRequest,
/// Request to process the provided L2 engine safe head update.
ProcessEngineSafeHeadUpdateRequest(Box<L2BlockInfo>),
/// A request containing a [`Signal`] to the derivation pipeline.
/// This allows the Engine to send the DerivationActor signals (e.g. to Flush or Reset).
ProcessEngineSignalRequest(Box<Signal>),
/// A request to process the provided finalized L1 [`BlockInfo`].
ProcessFinalizedL1Block(Box<BlockInfo>),
/// Request to process the provided L1 head block update.
ProcessL1HeadUpdateRequest(Box<BlockInfo>),
}
Loading
Loading