From 87e431fa9137dfc02fc429136402125ef9337086 Mon Sep 17 00:00:00 2001 From: Arun Dhyani Date: Wed, 23 Jul 2025 21:32:31 +0530 Subject: [PATCH 1/2] feat(supervisor/core): added invalidated block on managed node --- .../core/src/chain_processor/chain.rs | 132 +++++++----------- .../core/src/chain_processor/task.rs | 4 +- crates/supervisor/core/src/syncnode/client.rs | 20 ++- crates/supervisor/core/src/syncnode/node.rs | 7 +- .../supervisor/core/src/syncnode/resetter.rs | 3 +- crates/supervisor/core/src/syncnode/task.rs | 3 +- crates/supervisor/core/src/syncnode/traits.rs | 14 +- 7 files changed, 96 insertions(+), 87 deletions(-) diff --git a/crates/supervisor/core/src/chain_processor/chain.rs b/crates/supervisor/core/src/chain_processor/chain.rs index a94f9e7a5c..fd5feb7d4c 100644 --- a/crates/supervisor/core/src/chain_processor/chain.rs +++ b/crates/supervisor/core/src/chain_processor/chain.rs @@ -142,99 +142,70 @@ mod tests { use kona_supervisor_storage::{ DerivationStorageWriter, HeadRefStorageWriter, LogStorageWriter, StorageError, }; - use kona_supervisor_types::{Log, OutputV0, Receipts}; + use kona_supervisor_types::{BlockSeal, Log, OutputV0, Receipts}; use mockall::mock; - use std::{ - sync::atomic::{AtomicBool, Ordering}, - time::Duration, - }; + use std::time::Duration; use tokio::time::sleep; - #[derive(Debug)] - struct MockNode { - subscribed: Arc, - } - - impl MockNode { - fn new() -> Self { - Self { subscribed: Arc::new(AtomicBool::new(false)) } - } - } + mock!( + #[derive(Debug)] + pub Node {} - #[async_trait] - impl NodeSubscriber for MockNode { - async fn start_subscription( - &self, - _tx: mpsc::Sender, - ) -> Result<(), ManagedNodeError> { - self.subscribed.store(true, Ordering::SeqCst); - Ok(()) + #[async_trait] + impl NodeSubscriber for Node { + async fn start_subscription( + &self, + _event_tx: mpsc::Sender, + ) -> Result<(), ManagedNodeError>; } - } - #[async_trait] - impl BlockProvider for MockNode { - async fn fetch_receipts(&self, _block_hash: B256) -> Result { - Ok(vec![]) // dummy + #[async_trait] + impl BlockProvider for Node { + async fn fetch_receipts(&self, _block_hash: B256) -> Result; + async fn block_by_number(&self, _number: u64) -> Result; } - async fn block_by_number(&self, _number: u64) -> Result { - Ok(BlockInfo::default()) - } - } + #[async_trait] + impl ManagedNodeDataProvider for Node { + async fn output_v0_at_timestamp( + &self, + _timestamp: u64, + ) -> Result; - #[async_trait] - impl ManagedNodeDataProvider for MockNode { - async fn output_v0_at_timestamp( - &self, - _timestamp: u64, - ) -> Result { - Ok(OutputV0::default()) - } + async fn pending_output_v0_at_timestamp( + &self, + _timestamp: u64, + ) -> Result; - async fn pending_output_v0_at_timestamp( - &self, - _timestamp: u64, - ) -> Result { - Ok(OutputV0::default()) + async fn l2_block_ref_by_timestamp( + &self, + _timestamp: u64, + ) -> Result; } - async fn l2_block_ref_by_timestamp( - &self, - _timestamp: u64, - ) -> Result { - Ok(BlockInfo::default()) - } - } + #[async_trait] + impl ManagedNodeController for Node { + async fn update_finalized( + &self, + _finalized_block_id: BlockNumHash, + ) -> Result<(), ManagedNodeError>; - #[async_trait] - impl ManagedNodeController for MockNode { - async fn update_finalized( - &self, - _finalized_block_id: BlockNumHash, - ) -> Result<(), ManagedNodeError> { - Ok(()) - } + async fn update_cross_unsafe( + &self, + cross_unsafe_block_id: BlockNumHash, + ) -> Result<(), ManagedNodeError>; - async fn update_cross_unsafe( - &self, - _cross_unsafe_block_id: BlockNumHash, - ) -> Result<(), ManagedNodeError> { - Ok(()) - } + async fn update_cross_safe( + &self, + source_block_id: BlockNumHash, + derived_block_id: BlockNumHash, + ) -> Result<(), ManagedNodeError>; - async fn update_cross_safe( - &self, - _source_block_id: BlockNumHash, - _derived_block_id: BlockNumHash, - ) -> Result<(), ManagedNodeError> { - Ok(()) - } + async fn reset(&self) -> Result<(), ManagedNodeError>; - async fn reset(&self) -> Result<(), ManagedNodeError> { - Ok(()) + async fn invalidate_block(&self, seal: BlockSeal) -> Result<(), ManagedNodeError>; } - } + ); mock!( #[derive(Debug)] @@ -297,7 +268,9 @@ mod tests { #[tokio::test] async fn test_chain_processor_start_sets_task_and_calls_subscription() { - let mock_node = Arc::new(MockNode::new()); + let mut mock_node = MockNode::new(); + mock_node.expect_start_subscription().returning(|_| Ok(())); + let storage = Arc::new(MockDb::new()); let cancel_token = CancellationToken::new(); @@ -305,7 +278,7 @@ mod tests { let mut processor = ChainProcessor::new( rollup_config, 1, - Arc::clone(&mock_node), + Arc::new(mock_node), Arc::clone(&storage), cancel_token, ); @@ -315,9 +288,6 @@ mod tests { // Wait a moment for task to spawn and subscription to run sleep(Duration::from_millis(50)).await; - // Ensure start_subscription was called - assert!(mock_node.subscribed.load(Ordering::SeqCst)); - let handle_guard = processor.task_handle.lock().await; assert!(handle_guard.is_some()); } diff --git a/crates/supervisor/core/src/chain_processor/task.rs b/crates/supervisor/core/src/chain_processor/task.rs index e8eb69f0a4..902484f9a0 100644 --- a/crates/supervisor/core/src/chain_processor/task.rs +++ b/crates/supervisor/core/src/chain_processor/task.rs @@ -457,7 +457,7 @@ mod tests { use kona_supervisor_storage::{ DerivationStorageWriter, HeadRefStorageWriter, LogStorageWriter, StorageError, }; - use kona_supervisor_types::{Log, OutputV0, Receipts}; + use kona_supervisor_types::{BlockSeal, Log, OutputV0, Receipts}; use mockall::mock; use std::time::Duration; use tokio::sync::mpsc; @@ -517,6 +517,8 @@ mod tests { ) -> Result<(), ManagedNodeError>; async fn reset(&self) -> Result<(), ManagedNodeError>; + + async fn invalidate_block(&self, seal: BlockSeal) -> Result<(), ManagedNodeError>; } ); diff --git a/crates/supervisor/core/src/syncnode/client.rs b/crates/supervisor/core/src/syncnode/client.rs index 9fc0b731f7..d932f1f3ba 100644 --- a/crates/supervisor/core/src/syncnode/client.rs +++ b/crates/supervisor/core/src/syncnode/client.rs @@ -9,7 +9,7 @@ use jsonrpsee::{ }; use kona_supervisor_metrics::observe_metrics_for_result_async; use kona_supervisor_rpc::{BlockInfo, ManagedModeApiClient, jsonrpsee::SubscriptionTopic}; -use kona_supervisor_types::{OutputV0, Receipts, SubscriptionEvent}; +use kona_supervisor_types::{BlockSeal, OutputV0, Receipts, SubscriptionEvent}; use std::{ fmt::Debug, sync::{Arc, OnceLock}, @@ -55,6 +55,9 @@ pub trait ManagedNodeClient: Debug { finalised_id: BlockNumHash, ) -> Result<(), ClientError>; + /// Invalidates a block in the managed node. + async fn invalidate_block(&self, seal: BlockSeal) -> Result<(), ClientError>; + /// Provides L1 [`BlockInfo`] to the managed node. async fn provide_l1(&self, block_info: BlockInfo) -> Result<(), ClientError>; @@ -350,6 +353,21 @@ impl ManagedNodeClient for Client { Ok(()) } + async fn invalidate_block(&self, seal: BlockSeal) -> Result<(), ClientError> { + let client = self.get_ws_client().await?; + observe_metrics_for_result_async!( + Metrics::MANAGED_NODE_RPC_REQUESTS_SUCCESS_TOTAL, + Metrics::MANAGED_NODE_RPC_REQUESTS_ERROR_TOTAL, + Metrics::MANAGED_NODE_RPC_REQUEST_DURATION_SECONDS, + "invalidate_block", + async { + ManagedModeApiClient::invalidate_block(client.as_ref(), seal).await + }, + "node" => self.config.url.clone() + )?; + Ok(()) + } + async fn provide_l1(&self, block_info: BlockInfo) -> Result<(), ClientError> { let client = self.get_ws_client().await?; observe_metrics_for_result_async!( diff --git a/crates/supervisor/core/src/syncnode/node.rs b/crates/supervisor/core/src/syncnode/node.rs index 841da84daf..e465abd505 100644 --- a/crates/supervisor/core/src/syncnode/node.rs +++ b/crates/supervisor/core/src/syncnode/node.rs @@ -7,7 +7,7 @@ use alloy_rpc_types_eth::BlockNumHash; use async_trait::async_trait; use kona_protocol::BlockInfo; use kona_supervisor_storage::{DerivationStorageReader, HeadRefStorageReader, LogStorageReader}; -use kona_supervisor_types::{OutputV0, Receipts}; +use kona_supervisor_types::{BlockSeal, OutputV0, Receipts}; use std::sync::Arc; use tokio::{ sync::{Mutex, mpsc}, @@ -192,4 +192,9 @@ where self.resetter.reset().await?; Ok(()) } + + async fn invalidate_block(&self, seal: BlockSeal) -> Result<(), ManagedNodeError> { + self.client.invalidate_block(seal).await?; + Ok(()) + } } diff --git a/crates/supervisor/core/src/syncnode/resetter.rs b/crates/supervisor/core/src/syncnode/resetter.rs index 213b4979f1..544ebf3663 100644 --- a/crates/supervisor/core/src/syncnode/resetter.rs +++ b/crates/supervisor/core/src/syncnode/resetter.rs @@ -160,7 +160,7 @@ mod tests { use kona_interop::{DerivedRefPair, SafetyLevel}; use kona_protocol::BlockInfo; use kona_supervisor_storage::{DerivationStorageReader, HeadRefStorageReader, StorageError}; - use kona_supervisor_types::{OutputV0, Receipts, SubscriptionEvent, SuperHead}; + use kona_supervisor_types::{BlockSeal, OutputV0, Receipts, SubscriptionEvent, SuperHead}; use mockall::{mock, predicate}; // Mock for HeadRefStorageReader @@ -195,6 +195,7 @@ mod tests { async fn block_ref_by_number(&self, block_number: u64) -> Result; async fn reset_pre_interop(&self) -> Result<(), ClientError>; async fn reset(&self, unsafe_id: BlockNumHash, cross_unsafe_id: BlockNumHash, local_safe_id: BlockNumHash, cross_safe_id: BlockNumHash, finalised_id: BlockNumHash) -> Result<(), ClientError>; + async fn invalidate_block(&self, seal: BlockSeal) -> Result<(), ClientError>; async fn provide_l1(&self, block_info: BlockInfo) -> Result<(), ClientError>; async fn update_finalized(&self, finalized_block_id: BlockNumHash) -> Result<(), ClientError>; async fn update_cross_unsafe(&self, cross_unsafe_block_id: BlockNumHash) -> Result<(), ClientError>; diff --git a/crates/supervisor/core/src/syncnode/task.rs b/crates/supervisor/core/src/syncnode/task.rs index 2439a53283..29b006b5e4 100644 --- a/crates/supervisor/core/src/syncnode/task.rs +++ b/crates/supervisor/core/src/syncnode/task.rs @@ -257,7 +257,7 @@ mod tests { use kona_interop::{BlockReplacement, DerivedRefPair, SafetyLevel}; use kona_protocol::BlockInfo; use kona_supervisor_storage::{DerivationStorageReader, LogStorageReader, StorageError}; - use kona_supervisor_types::{Log, OutputV0, Receipts, SubscriptionEvent, SuperHead}; + use kona_supervisor_types::{BlockSeal, Log, OutputV0, Receipts, SubscriptionEvent, SuperHead}; use mockall::mock; mock! { @@ -297,6 +297,7 @@ mod tests { async fn block_ref_by_number(&self, block_number: u64) -> Result; async fn reset_pre_interop(&self) -> Result<(), ClientError>; async fn reset(&self, unsafe_id: BlockNumHash, cross_unsafe_id: BlockNumHash, local_safe_id: BlockNumHash, cross_safe_id: BlockNumHash, finalised_id: BlockNumHash) -> Result<(), ClientError>; + async fn invalidate_block(&self, seal: BlockSeal) -> Result<(), ClientError>; async fn provide_l1(&self, block_info: BlockInfo) -> Result<(), ClientError>; async fn update_finalized(&self, finalized_block_id: BlockNumHash) -> Result<(), ClientError>; async fn update_cross_unsafe(&self, cross_unsafe_block_id: BlockNumHash) -> Result<(), ClientError>; diff --git a/crates/supervisor/core/src/syncnode/traits.rs b/crates/supervisor/core/src/syncnode/traits.rs index a4f57dc474..6841b13293 100644 --- a/crates/supervisor/core/src/syncnode/traits.rs +++ b/crates/supervisor/core/src/syncnode/traits.rs @@ -4,7 +4,7 @@ use alloy_eips::BlockNumHash; use alloy_primitives::B256; use async_trait::async_trait; use kona_protocol::BlockInfo; -use kona_supervisor_types::{OutputV0, Receipts}; +use kona_supervisor_types::{BlockSeal, OutputV0, Receipts}; use std::fmt::Debug; use tokio::sync::mpsc; @@ -138,6 +138,18 @@ pub trait ManagedNodeController: Send + Sync + Debug { /// * `Ok(())` on success /// * `Err(ManagedNodeError)` if the reset fails async fn reset(&self) -> Result<(), ManagedNodeError>; + + /// Instructs the managed node to invalidate a block. + /// This is used when the supervisor detects an invalid block + /// and needs to roll back the node's state. + /// + /// # Arguments + /// * `seal` - The [`BlockSeal`] of the block. + /// + /// # Returns + /// * `Ok(())` on success + /// * `Err(ManagedNodeError)` if the invalidation fails + async fn invalidate_block(&self, seal: BlockSeal) -> Result<(), ManagedNodeError>; } /// Composite trait for any node that provides: From a6a37ce04df113c3a9a3de4a1dc1ab511627e687 Mon Sep 17 00:00:00 2001 From: Arun Dhyani Date: Thu, 24 Jul 2025 12:17:36 +0530 Subject: [PATCH 2/2] added missing metrics --- crates/supervisor/core/src/syncnode/metrics.rs | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/crates/supervisor/core/src/syncnode/metrics.rs b/crates/supervisor/core/src/syncnode/metrics.rs index 34df2490de..fca829fff3 100644 --- a/crates/supervisor/core/src/syncnode/metrics.rs +++ b/crates/supervisor/core/src/syncnode/metrics.rs @@ -18,12 +18,21 @@ impl Metrics { // --- RPC Method Names (for zeroing) --- /// Lists all managed mode RPC methods here to ensure they are pre-registered. - const RPC_METHODS: [&'static str; 5] = [ + const RPC_METHODS: [&'static str; 14] = [ + "chain_id", + "subscribe_events", "fetch_receipts", "output_v0_at_timestamp", "pending_output_v0_at_timestamp", "l2_block_ref_by_timestamp", + "block_ref_by_number", + "reset_pre_interop", + "reset", + "invalidate_block", "provide_l1", + "update_finalized", + "update_cross_unsafe", + "update_cross_safe", ]; /// Initializes metrics for the Supervisor RPC service.