Skip to content
This repository was archived by the owner on Jan 16, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions crates/supervisor/core/src/chain_processor/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,10 @@
use super::*;
use crate::{
event::ChainEvent,
syncnode::{ManagedNodeApiProvider, ManagedNodeError, NodeSubscriber, ReceiptProvider},
syncnode::{
ManagedNodeController, ManagedNodeDataProvider, ManagedNodeError, NodeSubscriber,
ReceiptProvider,
},
};
use alloy_primitives::B256;
use alloy_rpc_types_eth::BlockNumHash;
Expand Down Expand Up @@ -165,7 +168,7 @@
}

#[async_trait]
impl ManagedNodeApiProvider for MockNode {
impl ManagedNodeDataProvider for MockNode {
async fn output_v0_at_timestamp(
&self,
_timestamp: u64,
Expand All @@ -186,7 +189,10 @@
) -> Result<BlockInfo, ManagedNodeError> {
Ok(BlockInfo::default())
}
}

#[async_trait]
impl ManagedNodeController for MockNode {
async fn update_finalized(
&self,
_finalized_block_id: BlockNumHash,
Expand All @@ -208,6 +214,10 @@
) -> Result<(), ManagedNodeError> {
Ok(())
}

async fn reset(&self) -> Result<(), ManagedNodeError> {
Ok(())
}

Check warning on line 220 in crates/supervisor/core/src/chain_processor/chain.rs

View check run for this annotation

Codecov / codecov/patch

crates/supervisor/core/src/chain_processor/chain.rs#L218-L220

Added lines #L218 - L220 were not covered by tests
}

mock!(
Expand Down
155 changes: 108 additions & 47 deletions crates/supervisor/core/src/chain_processor/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
use alloy_primitives::ChainId;
use kona_interop::{BlockReplacement, DerivedRefPair};
use kona_protocol::BlockInfo;
use kona_supervisor_storage::{DerivationStorageWriter, HeadRefStorageWriter, LogStorageWriter};
use kona_supervisor_storage::{
DerivationStorageWriter, HeadRefStorageWriter, LogStorageWriter, StorageError,
};
use std::{fmt::Debug, sync::Arc};
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
Expand Down Expand Up @@ -175,7 +177,7 @@
.await;
}
ChainEvent::DerivationOriginUpdate { origin } => {
let _ = self.handle_derivation_origin_update(origin).inspect_err(|err| {
let _ = self.handle_derivation_origin_update(origin).await.inspect_err(|err| {
error!(
target: "chain_processor",
chain_id = self.chain_id,
Expand Down Expand Up @@ -270,7 +272,7 @@
Ok(finalized_derived_block)
}

fn handle_derivation_origin_update(
async fn handle_derivation_origin_update(
&self,
origin: BlockInfo,
) -> Result<(), ChainProcessorError> {
Expand All @@ -280,9 +282,31 @@
block_number = origin.number,
"Processing derivation origin update"
);
self.state_manager.update_current_l1(origin)?;
self.state_manager.save_source_block(origin)?;
Ok(())
match self.state_manager.save_source_block(origin) {
Ok(_) => {
self.state_manager.update_current_l1(origin)?;
Ok(())
}
Err(StorageError::BlockOutOfOrder) => {
error!(

Check warning on line 291 in crates/supervisor/core/src/chain_processor/task.rs

View check run for this annotation

Codecov / codecov/patch

crates/supervisor/core/src/chain_processor/task.rs#L291

Added line #L291 was not covered by tests
target: "chain_processor",
chain_id = self.chain_id,
block_number = origin.number,
"Source block out of order detected, resetting managed node"

Check warning on line 295 in crates/supervisor/core/src/chain_processor/task.rs

View check run for this annotation

Codecov / codecov/patch

crates/supervisor/core/src/chain_processor/task.rs#L295

Added line #L295 was not covered by tests
);

if let Err(err) = self.managed_node.reset().await {
error!(

Check warning on line 299 in crates/supervisor/core/src/chain_processor/task.rs

View check run for this annotation

Codecov / codecov/patch

crates/supervisor/core/src/chain_processor/task.rs#L298-L299

Added lines #L298 - L299 were not covered by tests
target: "chain_processor",
chain_id = self.chain_id,
%err,
"Failed to reset managed node after block out of order"

Check warning on line 303 in crates/supervisor/core/src/chain_processor/task.rs

View check run for this annotation

Codecov / codecov/patch

crates/supervisor/core/src/chain_processor/task.rs#L303

Added line #L303 was not covered by tests
);
}
Err(StorageError::BlockOutOfOrder.into())

Check warning on line 306 in crates/supervisor/core/src/chain_processor/task.rs

View check run for this annotation

Codecov / codecov/patch

crates/supervisor/core/src/chain_processor/task.rs#L305-L306

Added lines #L305 - L306 were not covered by tests
Comment thread
dhyaniarun1993 marked this conversation as resolved.
}
Err(err) => Err(err.into()),

Check warning on line 308 in crates/supervisor/core/src/chain_processor/task.rs

View check run for this annotation

Codecov / codecov/patch

crates/supervisor/core/src/chain_processor/task.rs#L308

Added line #L308 was not covered by tests
}
}

async fn handle_safe_event(
Expand All @@ -295,8 +319,37 @@
block_number = derived_ref_pair.derived.number,
"Processing local safe derived block pair"
);
self.state_manager.save_derived_block(derived_ref_pair)?;
Ok(derived_ref_pair.derived)
match self.state_manager.save_derived_block(derived_ref_pair) {
Ok(_) => Ok(derived_ref_pair.derived),
Err(StorageError::BlockOutOfOrder) => {
error!(

Check warning on line 325 in crates/supervisor/core/src/chain_processor/task.rs

View check run for this annotation

Codecov / codecov/patch

crates/supervisor/core/src/chain_processor/task.rs#L325

Added line #L325 was not covered by tests
target: "chain_processor",
chain_id = self.chain_id,
block_number = derived_ref_pair.derived.number,
"Block out of order detected, resetting managed node"

Check warning on line 329 in crates/supervisor/core/src/chain_processor/task.rs

View check run for this annotation

Codecov / codecov/patch

crates/supervisor/core/src/chain_processor/task.rs#L329

Added line #L329 was not covered by tests
);

if let Err(err) = self.managed_node.reset().await {
error!(

Check warning on line 333 in crates/supervisor/core/src/chain_processor/task.rs

View check run for this annotation

Codecov / codecov/patch

crates/supervisor/core/src/chain_processor/task.rs#L332-L333

Added lines #L332 - L333 were not covered by tests
target: "chain_processor",
chain_id = self.chain_id,
%err,
"Failed to reset managed node after block out of order"

Check warning on line 337 in crates/supervisor/core/src/chain_processor/task.rs

View check run for this annotation

Codecov / codecov/patch

crates/supervisor/core/src/chain_processor/task.rs#L337

Added line #L337 was not covered by tests
);
}
Err(StorageError::BlockOutOfOrder.into())

Check warning on line 340 in crates/supervisor/core/src/chain_processor/task.rs

View check run for this annotation

Codecov / codecov/patch

crates/supervisor/core/src/chain_processor/task.rs#L339-L340

Added lines #L339 - L340 were not covered by tests
Comment thread
dhyaniarun1993 marked this conversation as resolved.
}
Err(err) => {
error!(

Check warning on line 343 in crates/supervisor/core/src/chain_processor/task.rs

View check run for this annotation

Codecov / codecov/patch

crates/supervisor/core/src/chain_processor/task.rs#L342-L343

Added lines #L342 - L343 were not covered by tests
target: "chain_processor",
chain_id = self.chain_id,
block_number = derived_ref_pair.derived.number,
%err,
"Failed to save derived block pair"

Check warning on line 348 in crates/supervisor/core/src/chain_processor/task.rs

View check run for this annotation

Codecov / codecov/patch

crates/supervisor/core/src/chain_processor/task.rs#L348

Added line #L348 was not covered by tests
);
Err(err.into())

Check warning on line 350 in crates/supervisor/core/src/chain_processor/task.rs

View check run for this annotation

Codecov / codecov/patch

crates/supervisor/core/src/chain_processor/task.rs#L350

Added line #L350 was not covered by tests
}
}
}

async fn handle_unsafe_event(
Expand Down Expand Up @@ -352,7 +405,10 @@
use super::*;
use crate::{
event::ChainEvent,
syncnode::{ManagedNodeApiProvider, ManagedNodeError, NodeSubscriber, ReceiptProvider},
syncnode::{
ManagedNodeController, ManagedNodeDataProvider, ManagedNodeError, NodeSubscriber,
ReceiptProvider,
},
};
use alloy_primitives::B256;
use alloy_rpc_types_eth::BlockNumHash;
Expand All @@ -377,46 +433,51 @@
&self,
_event_tx: mpsc::Sender<ChainEvent>,
) -> Result<(), ManagedNodeError>;
}
}

#[async_trait]
impl ReceiptProvider for Node {
async fn fetch_receipts(&self, _block_hash: B256) -> Result<Receipts, ManagedNodeError>;
}
#[async_trait]
impl ReceiptProvider for Node {
async fn fetch_receipts(&self, _block_hash: B256) -> Result<Receipts, ManagedNodeError>;
}

#[async_trait]
impl ManagedNodeApiProvider for Node {
async fn output_v0_at_timestamp(
&self,
_timestamp: u64,
) -> Result<OutputV0, ManagedNodeError>;

async fn pending_output_v0_at_timestamp(
&self,
_timestamp: u64,
) -> Result<OutputV0, ManagedNodeError>;

async fn l2_block_ref_by_timestamp(
&self,
_timestamp: u64,
) -> Result<BlockInfo, ManagedNodeError>;

async fn update_finalized(
&self,
_finalized_block_id: BlockNumHash,
) -> Result<(), ManagedNodeError>;

async fn update_cross_unsafe(
&self,
cross_unsafe_block_id: BlockNumHash,
) -> Result<(), ManagedNodeError>;

async fn update_cross_safe(
&self,
source_block_id: BlockNumHash,
derived_block_id: BlockNumHash,
) -> Result<(), ManagedNodeError>;
}
#[async_trait]
impl ManagedNodeDataProvider for Node {
async fn output_v0_at_timestamp(
&self,
_timestamp: u64,
) -> Result<OutputV0, ManagedNodeError>;

async fn pending_output_v0_at_timestamp(
&self,
_timestamp: u64,
) -> Result<OutputV0, ManagedNodeError>;

async fn l2_block_ref_by_timestamp(
&self,
_timestamp: u64,
) -> Result<BlockInfo, ManagedNodeError>;
}

#[async_trait]
impl ManagedNodeController for Node {
async fn update_finalized(
&self,
_finalized_block_id: BlockNumHash,
) -> Result<(), ManagedNodeError>;

async fn update_cross_unsafe(
&self,
cross_unsafe_block_id: BlockNumHash,
) -> Result<(), ManagedNodeError>;

async fn update_cross_safe(
&self,
source_block_id: BlockNumHash,
derived_block_id: BlockNumHash,
) -> Result<(), ManagedNodeError>;

async fn reset(&self) -> Result<(), ManagedNodeError>;
}
);

mock!(
Expand Down
2 changes: 1 addition & 1 deletion crates/supervisor/core/src/supervisor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use crate::{
event::ChainEvent,
l1_watcher::L1Watcher,
safety_checker::{CrossSafePromoter, CrossUnsafePromoter},
syncnode::{Client, ManagedNode, ManagedNodeApiProvider, ManagedNodeClient},
syncnode::{Client, ManagedNode, ManagedNodeClient, ManagedNodeDataProvider},
};

/// Defines the service for the Supervisor core logic.
Expand Down
6 changes: 3 additions & 3 deletions crates/supervisor/core/src/syncnode/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ pub enum ManagedNodeError {
#[error("failed to authenticate: {0}")]
Authentication(#[from] AuthenticationError),

/// Database provider is not set for the managed node.
#[error("database not initialised for managed node")]
DatabaseNotInitialised,
/// Represents an error that occurred while fetching data from the storage.
#[error(transparent)]
StorageError(#[from] StorageError),
}

impl PartialEq for ManagedNodeError {
Expand Down
5 changes: 4 additions & 1 deletion crates/supervisor/core/src/syncnode/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ mod error;
pub use error::{AuthenticationError, ManagedEventTaskError, ManagedNodeError, SubscriptionError};

mod traits;
pub use traits::{ManagedNodeApiProvider, ManagedNodeProvider, NodeSubscriber, ReceiptProvider};
pub use traits::{
ManagedNodeController, ManagedNodeDataProvider, ManagedNodeProvider, NodeSubscriber,
ReceiptProvider,
};

mod client;
pub use client::{Client, ClientConfig, ManagedNodeClient};
Expand Down
33 changes: 17 additions & 16 deletions crates/supervisor/core/src/syncnode/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
use tracing::{error, info, warn};

use super::{
ManagedNodeApiProvider, ManagedNodeClient, ManagedNodeError, NodeSubscriber, ReceiptProvider,
SubscriptionError, resetter::Resetter, task::ManagedEventTask,
ManagedNodeClient, ManagedNodeController, ManagedNodeDataProvider, ManagedNodeError,
NodeSubscriber, ReceiptProvider, SubscriptionError, resetter::Resetter, task::ManagedEventTask,
};
use crate::event::ChainEvent;

Expand All @@ -29,8 +29,6 @@
pub struct ManagedNode<DB, C> {
/// The attached web socket client
client: Arc<C>,
/// The database provider for fetching information
db_provider: Arc<DB>,
/// Shared L1 provider for fetching receipts
l1_provider: RootProvider<Ethereum>,
/// Resetter for handling node resets
Expand All @@ -53,16 +51,9 @@
cancel_token: CancellationToken,
l1_provider: RootProvider<Ethereum>,
) -> Self {
let resetter = Arc::new(Resetter::new(client.clone(), db_provider.clone()));

Self {
client,
db_provider,
resetter,
cancel_token,
task_handle: Mutex::new(None),
l1_provider,
}
let resetter = Arc::new(Resetter::new(client.clone(), db_provider));

Self { client, resetter, cancel_token, task_handle: Mutex::new(None), l1_provider }

Check warning on line 56 in crates/supervisor/core/src/syncnode/node.rs

View check run for this annotation

Codecov / codecov/patch

crates/supervisor/core/src/syncnode/node.rs#L54-L56

Added lines #L54 - L56 were not covered by tests
}

/// Returns the [`ChainId`] of the [`ManagedNode`].
Expand Down Expand Up @@ -106,7 +97,6 @@
let task = ManagedEventTask::new(
self.client.clone(),
self.l1_provider.clone(),
self.db_provider.clone(),
self.resetter.clone(),
event_tx,
);
Expand Down Expand Up @@ -178,7 +168,7 @@
}

#[async_trait]
impl<DB, C> ManagedNodeApiProvider for ManagedNode<DB, C>
impl<DB, C> ManagedNodeDataProvider for ManagedNode<DB, C>
where
DB: LogStorageReader + DerivationStorageReader + HeadRefStorageReader + Send + Sync + 'static,
C: ManagedNodeClient + Send + Sync + 'static,
Expand All @@ -200,7 +190,14 @@
) -> Result<BlockInfo, ManagedNodeError> {
self.client.l2_block_ref_by_timestamp(timestamp).await
}
}

#[async_trait]
impl<DB, C> ManagedNodeController for ManagedNode<DB, C>
where
DB: LogStorageReader + DerivationStorageReader + HeadRefStorageReader + Send + Sync + 'static,
C: ManagedNodeClient + Send + Sync + 'static,
{
async fn update_finalized(
&self,
finalized_block_id: BlockNumHash,
Expand All @@ -222,4 +219,8 @@
) -> Result<(), ManagedNodeError> {
self.client.update_cross_safe(source_block_id, derived_block_id).await
}

async fn reset(&self) -> Result<(), ManagedNodeError> {
self.resetter.reset().await
}

Check warning on line 225 in crates/supervisor/core/src/syncnode/node.rs

View check run for this annotation

Codecov / codecov/patch

crates/supervisor/core/src/syncnode/node.rs#L223-L225

Added lines #L223 - L225 were not covered by tests
}
Loading
Loading