Skip to content
This repository was archived by the owner on Jan 16, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 51 additions & 81 deletions crates/supervisor/core/src/chain_processor/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,99 +142,70 @@ mod tests {
use kona_supervisor_storage::{
DerivationStorageWriter, HeadRefStorageWriter, LogStorageWriter, StorageError,
};
use kona_supervisor_types::{Log, OutputV0, Receipts};
use kona_supervisor_types::{BlockSeal, Log, OutputV0, Receipts};
use mockall::mock;
use std::{
sync::atomic::{AtomicBool, Ordering},
time::Duration,
};
use std::time::Duration;
use tokio::time::sleep;

#[derive(Debug)]
struct MockNode {
subscribed: Arc<AtomicBool>,
}

impl MockNode {
fn new() -> Self {
Self { subscribed: Arc::new(AtomicBool::new(false)) }
}
}
mock!(
#[derive(Debug)]
pub Node {}

#[async_trait]
impl NodeSubscriber for MockNode {
async fn start_subscription(
&self,
_tx: mpsc::Sender<ChainEvent>,
) -> Result<(), ManagedNodeError> {
self.subscribed.store(true, Ordering::SeqCst);
Ok(())
#[async_trait]
impl NodeSubscriber for Node {
async fn start_subscription(
&self,
_event_tx: mpsc::Sender<ChainEvent>,
) -> Result<(), ManagedNodeError>;
}
}

#[async_trait]
impl BlockProvider for MockNode {
async fn fetch_receipts(&self, _block_hash: B256) -> Result<Receipts, ManagedNodeError> {
Ok(vec![]) // dummy
#[async_trait]
impl BlockProvider for Node {
async fn fetch_receipts(&self, _block_hash: B256) -> Result<Receipts, ManagedNodeError>;
async fn block_by_number(&self, _number: u64) -> Result<BlockInfo, ManagedNodeError>;
}

async fn block_by_number(&self, _number: u64) -> Result<BlockInfo, ManagedNodeError> {
Ok(BlockInfo::default())
}
}
#[async_trait]
impl ManagedNodeDataProvider for Node {
async fn output_v0_at_timestamp(
&self,
_timestamp: u64,
) -> Result<OutputV0, ManagedNodeError>;

#[async_trait]
impl ManagedNodeDataProvider for MockNode {
async fn output_v0_at_timestamp(
&self,
_timestamp: u64,
) -> Result<OutputV0, ManagedNodeError> {
Ok(OutputV0::default())
}
async fn pending_output_v0_at_timestamp(
&self,
_timestamp: u64,
) -> Result<OutputV0, ManagedNodeError>;

async fn pending_output_v0_at_timestamp(
&self,
_timestamp: u64,
) -> Result<OutputV0, ManagedNodeError> {
Ok(OutputV0::default())
async fn l2_block_ref_by_timestamp(
&self,
_timestamp: u64,
) -> Result<BlockInfo, ManagedNodeError>;
}

async fn l2_block_ref_by_timestamp(
&self,
_timestamp: u64,
) -> Result<BlockInfo, ManagedNodeError> {
Ok(BlockInfo::default())
}
}
#[async_trait]
impl ManagedNodeController for Node {
async fn update_finalized(
&self,
_finalized_block_id: BlockNumHash,
) -> Result<(), ManagedNodeError>;

#[async_trait]
impl ManagedNodeController for MockNode {
async fn update_finalized(
&self,
_finalized_block_id: BlockNumHash,
) -> Result<(), ManagedNodeError> {
Ok(())
}
async fn update_cross_unsafe(
&self,
cross_unsafe_block_id: BlockNumHash,
) -> Result<(), ManagedNodeError>;

async fn update_cross_unsafe(
&self,
_cross_unsafe_block_id: BlockNumHash,
) -> Result<(), ManagedNodeError> {
Ok(())
}
async fn update_cross_safe(
&self,
source_block_id: BlockNumHash,
derived_block_id: BlockNumHash,
) -> Result<(), ManagedNodeError>;

async fn update_cross_safe(
&self,
_source_block_id: BlockNumHash,
_derived_block_id: BlockNumHash,
) -> Result<(), ManagedNodeError> {
Ok(())
}
async fn reset(&self) -> Result<(), ManagedNodeError>;

async fn reset(&self) -> Result<(), ManagedNodeError> {
Ok(())
async fn invalidate_block(&self, seal: BlockSeal) -> Result<(), ManagedNodeError>;
}
}
);

mock!(
#[derive(Debug)]
Expand Down Expand Up @@ -297,15 +268,17 @@ mod tests {

#[tokio::test]
async fn test_chain_processor_start_sets_task_and_calls_subscription() {
let mock_node = Arc::new(MockNode::new());
let mut mock_node = MockNode::new();
mock_node.expect_start_subscription().returning(|_| Ok(()));

let storage = Arc::new(MockDb::new());
let cancel_token = CancellationToken::new();

let rollup_config = RollupConfig::default();
let mut processor = ChainProcessor::new(
rollup_config,
1,
Arc::clone(&mock_node),
Arc::new(mock_node),
Arc::clone(&storage),
cancel_token,
);
Expand All @@ -315,9 +288,6 @@ mod tests {
// Wait a moment for task to spawn and subscription to run
sleep(Duration::from_millis(50)).await;

// Ensure start_subscription was called
assert!(mock_node.subscribed.load(Ordering::SeqCst));

let handle_guard = processor.task_handle.lock().await;
assert!(handle_guard.is_some());
}
Expand Down
4 changes: 3 additions & 1 deletion crates/supervisor/core/src/chain_processor/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,7 @@ mod tests {
use kona_supervisor_storage::{
DerivationStorageWriter, HeadRefStorageWriter, LogStorageWriter, StorageError,
};
use kona_supervisor_types::{Log, OutputV0, Receipts};
use kona_supervisor_types::{BlockSeal, Log, OutputV0, Receipts};
use mockall::mock;
use std::time::Duration;
use tokio::sync::mpsc;
Expand Down Expand Up @@ -517,6 +517,8 @@ mod tests {
) -> Result<(), ManagedNodeError>;

async fn reset(&self) -> Result<(), ManagedNodeError>;

async fn invalidate_block(&self, seal: BlockSeal) -> Result<(), ManagedNodeError>;
}
);

Expand Down
20 changes: 19 additions & 1 deletion crates/supervisor/core/src/syncnode/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use jsonrpsee::{
};
use kona_supervisor_metrics::observe_metrics_for_result_async;
use kona_supervisor_rpc::{BlockInfo, ManagedModeApiClient, jsonrpsee::SubscriptionTopic};
use kona_supervisor_types::{OutputV0, Receipts, SubscriptionEvent};
use kona_supervisor_types::{BlockSeal, OutputV0, Receipts, SubscriptionEvent};
use std::{
fmt::Debug,
sync::{Arc, OnceLock},
Expand Down Expand Up @@ -55,6 +55,9 @@ pub trait ManagedNodeClient: Debug {
finalised_id: BlockNumHash,
) -> Result<(), ClientError>;

/// Invalidates a block in the managed node.
async fn invalidate_block(&self, seal: BlockSeal) -> Result<(), ClientError>;

/// Provides L1 [`BlockInfo`] to the managed node.
async fn provide_l1(&self, block_info: BlockInfo) -> Result<(), ClientError>;

Expand Down Expand Up @@ -350,6 +353,21 @@ impl ManagedNodeClient for Client {
Ok(())
}

async fn invalidate_block(&self, seal: BlockSeal) -> Result<(), ClientError> {
let client = self.get_ws_client().await?;
observe_metrics_for_result_async!(
Metrics::MANAGED_NODE_RPC_REQUESTS_SUCCESS_TOTAL,
Metrics::MANAGED_NODE_RPC_REQUESTS_ERROR_TOTAL,
Metrics::MANAGED_NODE_RPC_REQUEST_DURATION_SECONDS,
"invalidate_block",
async {
ManagedModeApiClient::invalidate_block(client.as_ref(), seal).await
Comment thread
dhyaniarun1993 marked this conversation as resolved.
},
"node" => self.config.url.clone()
)?;
Ok(())
}

async fn provide_l1(&self, block_info: BlockInfo) -> Result<(), ClientError> {
let client = self.get_ws_client().await?;
observe_metrics_for_result_async!(
Expand Down
11 changes: 10 additions & 1 deletion crates/supervisor/core/src/syncnode/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,21 @@ impl Metrics {

// --- RPC Method Names (for zeroing) ---
/// Lists all managed mode RPC methods here to ensure they are pre-registered.
const RPC_METHODS: [&'static str; 5] = [
const RPC_METHODS: [&'static str; 14] = [
"chain_id",
"subscribe_events",
"fetch_receipts",
"output_v0_at_timestamp",
"pending_output_v0_at_timestamp",
"l2_block_ref_by_timestamp",
"block_ref_by_number",
"reset_pre_interop",
"reset",
"invalidate_block",
"provide_l1",
"update_finalized",
"update_cross_unsafe",
"update_cross_safe",
];

/// Initializes metrics for the Supervisor RPC service.
Expand Down
7 changes: 6 additions & 1 deletion crates/supervisor/core/src/syncnode/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use alloy_rpc_types_eth::BlockNumHash;
use async_trait::async_trait;
use kona_protocol::BlockInfo;
use kona_supervisor_storage::{DerivationStorageReader, HeadRefStorageReader, LogStorageReader};
use kona_supervisor_types::{OutputV0, Receipts};
use kona_supervisor_types::{BlockSeal, OutputV0, Receipts};
use std::sync::Arc;
use tokio::{
sync::{Mutex, mpsc},
Expand Down Expand Up @@ -192,4 +192,9 @@ where
self.resetter.reset().await?;
Ok(())
}

async fn invalidate_block(&self, seal: BlockSeal) -> Result<(), ManagedNodeError> {
self.client.invalidate_block(seal).await?;
Ok(())
}
}
3 changes: 2 additions & 1 deletion crates/supervisor/core/src/syncnode/resetter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ mod tests {
use kona_interop::{DerivedRefPair, SafetyLevel};
use kona_protocol::BlockInfo;
use kona_supervisor_storage::{DerivationStorageReader, HeadRefStorageReader, StorageError};
use kona_supervisor_types::{OutputV0, Receipts, SubscriptionEvent, SuperHead};
use kona_supervisor_types::{BlockSeal, OutputV0, Receipts, SubscriptionEvent, SuperHead};
use mockall::{mock, predicate};

// Mock for HeadRefStorageReader
Expand Down Expand Up @@ -195,6 +195,7 @@ mod tests {
async fn block_ref_by_number(&self, block_number: u64) -> Result<BlockInfo, ClientError>;
async fn reset_pre_interop(&self) -> Result<(), ClientError>;
async fn reset(&self, unsafe_id: BlockNumHash, cross_unsafe_id: BlockNumHash, local_safe_id: BlockNumHash, cross_safe_id: BlockNumHash, finalised_id: BlockNumHash) -> Result<(), ClientError>;
async fn invalidate_block(&self, seal: BlockSeal) -> Result<(), ClientError>;
async fn provide_l1(&self, block_info: BlockInfo) -> Result<(), ClientError>;
async fn update_finalized(&self, finalized_block_id: BlockNumHash) -> Result<(), ClientError>;
async fn update_cross_unsafe(&self, cross_unsafe_block_id: BlockNumHash) -> Result<(), ClientError>;
Expand Down
3 changes: 2 additions & 1 deletion crates/supervisor/core/src/syncnode/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ mod tests {
use kona_interop::{BlockReplacement, DerivedRefPair, SafetyLevel};
use kona_protocol::BlockInfo;
use kona_supervisor_storage::{DerivationStorageReader, LogStorageReader, StorageError};
use kona_supervisor_types::{Log, OutputV0, Receipts, SubscriptionEvent, SuperHead};
use kona_supervisor_types::{BlockSeal, Log, OutputV0, Receipts, SubscriptionEvent, SuperHead};
use mockall::mock;

mock! {
Expand Down Expand Up @@ -297,6 +297,7 @@ mod tests {
async fn block_ref_by_number(&self, block_number: u64) -> Result<BlockInfo, ClientError>;
async fn reset_pre_interop(&self) -> Result<(), ClientError>;
async fn reset(&self, unsafe_id: BlockNumHash, cross_unsafe_id: BlockNumHash, local_safe_id: BlockNumHash, cross_safe_id: BlockNumHash, finalised_id: BlockNumHash) -> Result<(), ClientError>;
async fn invalidate_block(&self, seal: BlockSeal) -> Result<(), ClientError>;
async fn provide_l1(&self, block_info: BlockInfo) -> Result<(), ClientError>;
async fn update_finalized(&self, finalized_block_id: BlockNumHash) -> Result<(), ClientError>;
async fn update_cross_unsafe(&self, cross_unsafe_block_id: BlockNumHash) -> Result<(), ClientError>;
Expand Down
14 changes: 13 additions & 1 deletion crates/supervisor/core/src/syncnode/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use alloy_eips::BlockNumHash;
use alloy_primitives::B256;
use async_trait::async_trait;
use kona_protocol::BlockInfo;
use kona_supervisor_types::{OutputV0, Receipts};
use kona_supervisor_types::{BlockSeal, OutputV0, Receipts};
use std::fmt::Debug;
use tokio::sync::mpsc;

Expand Down Expand Up @@ -138,6 +138,18 @@ pub trait ManagedNodeController: Send + Sync + Debug {
/// * `Ok(())` on success
/// * `Err(ManagedNodeError)` if the reset fails
async fn reset(&self) -> Result<(), ManagedNodeError>;

/// Instructs the managed node to invalidate a block.
/// This is used when the supervisor detects an invalid block
/// and needs to roll back the node's state.
///
/// # Arguments
/// * `seal` - The [`BlockSeal`] of the block.
///
/// # Returns
/// * `Ok(())` on success
/// * `Err(ManagedNodeError)` if the invalidation fails
async fn invalidate_block(&self, seal: BlockSeal) -> Result<(), ManagedNodeError>;
}

/// Composite trait for any node that provides:
Expand Down