From 7a97cdd2b9861242a42e1250e8aa238aab107a8e Mon Sep 17 00:00:00 2001 From: sadiq1971 Date: Tue, 8 Jul 2025 18:40:05 +0600 Subject: [PATCH 1/8] safety head ref init fixed --- crates/supervisor/storage/src/chaindb.rs | 8 +------- .../storage/src/providers/head_ref_provider.rs | 14 +++++++++++++- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/crates/supervisor/storage/src/chaindb.rs b/crates/supervisor/storage/src/chaindb.rs index 4f3a52956b..d0255fdc38 100644 --- a/crates/supervisor/storage/src/chaindb.rs +++ b/crates/supervisor/storage/src/chaindb.rs @@ -73,13 +73,7 @@ impl ChainDb { self.env.update(|tx| { DerivationProvider::new(tx).initialise(anchor)?; LogProvider::new(tx).initialise(anchor.derived)?; - - let sp = SafetyHeadRefProvider::new(tx); - // todo: cross check if we can consider following safety head ref update - sp.update_safety_head_ref(SafetyLevel::LocalUnsafe, &anchor.derived)?; - sp.update_safety_head_ref(SafetyLevel::CrossUnsafe, &anchor.derived)?; - sp.update_safety_head_ref(SafetyLevel::LocalSafe, &anchor.derived)?; - sp.update_safety_head_ref(SafetyLevel::CrossSafe, &anchor.derived) + SafetyHeadRefProvider::new(tx).initialise(anchor.derived) })? } } diff --git a/crates/supervisor/storage/src/providers/head_ref_provider.rs b/crates/supervisor/storage/src/providers/head_ref_provider.rs index bc5031258e..2a7f2a7896 100644 --- a/crates/supervisor/storage/src/providers/head_ref_provider.rs +++ b/crates/supervisor/storage/src/providers/head_ref_provider.rs @@ -43,8 +43,20 @@ where impl SafetyHeadRefProvider<'_, Tx> where - Tx: DbTxMut, + Tx: DbTxMut + DbTx, { + pub(crate) fn initialise(&self, anchor: BlockInfo) -> Result<(), StorageError> { + match self.get_safety_head_ref(SafetyLevel::LocalUnsafe) { + Ok(_) => Ok(()), // if it is set already, skip. + Err(StorageError::EntryNotFound(_)) => { + self.update_safety_head_ref(SafetyLevel::LocalUnsafe, &anchor)?; + self.update_safety_head_ref(SafetyLevel::CrossUnsafe, &anchor)?; + self.update_safety_head_ref(SafetyLevel::LocalSafe, &anchor)?; + self.update_safety_head_ref(SafetyLevel::CrossSafe, &anchor) + } + Err(err) => Err(err), + } + } pub(crate) fn update_safety_head_ref( &self, safety_level: SafetyLevel, From 85d9b7d9cd7e7741b31dd03f38e0d7d4aa402f31 Mon Sep 17 00:00:00 2001 From: sadiq1971 Date: Wed, 9 Jul 2025 17:22:38 +0600 Subject: [PATCH 2/8] added task retirier to subscription start --- Cargo.lock | 3 + .../core/src/chain_processor/chain.rs | 14 +++- crates/supervisor/core/src/syncnode/client.rs | 9 +++ crates/supervisor/core/src/syncnode/node.rs | 36 +++++++--- .../supervisor/core/src/syncnode/resetter.rs | 1 + crates/supervisor/core/src/syncnode/task.rs | 1 + crates/supervisor/types/Cargo.toml | 3 + crates/supervisor/types/src/lib.rs | 3 + crates/supervisor/types/src/utils.rs | 67 +++++++++++++++++++ tests/supervisor/interop_sync_test.go | 56 ++++++++++++++++ 10 files changed, 181 insertions(+), 12 deletions(-) create mode 100644 crates/supervisor/types/src/utils.rs create mode 100644 tests/supervisor/interop_sync_test.go diff --git a/Cargo.lock b/Cargo.lock index 65bfaa74f1..a0cd084edc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5148,6 +5148,9 @@ dependencies = [ "serde", "serde_json", "thiserror 2.0.12", + "tokio", + "tokio-util", + "tracing", ] [[package]] diff --git a/crates/supervisor/core/src/chain_processor/chain.rs b/crates/supervisor/core/src/chain_processor/chain.rs index ed2f763977..4e470e0dd8 100644 --- a/crates/supervisor/core/src/chain_processor/chain.rs +++ b/crates/supervisor/core/src/chain_processor/chain.rs @@ -2,6 +2,7 @@ use super::{ChainProcessorError, ChainProcessorTask}; use crate::{event::ChainEvent, syncnode::ManagedNodeProvider}; use alloy_primitives::ChainId; use kona_supervisor_storage::{DerivationStorageWriter, HeadRefStorageWriter, LogStorageWriter}; +use kona_supervisor_types::spawn_task_with_retry; use std::sync::Arc; use tokio::{ sync::{Mutex, mpsc}, @@ -90,7 +91,18 @@ where // todo: figure out value for buffer size let (event_tx, event_rx) = mpsc::channel::(100); self.event_tx = Some(event_tx.clone()); - self.managed_node.start_subscription(event_tx.clone()).await?; + + let node = self.managed_node.clone(); + + spawn_task_with_retry( + move || { + let node = node.clone(); + let tx = event_tx.clone(); + async move { node.start_subscription(tx).await } + }, + self.cancel_token.clone(), + usize::MAX, + ); let mut task = ChainProcessorTask::new( self.chain_id, diff --git a/crates/supervisor/core/src/syncnode/client.rs b/crates/supervisor/core/src/syncnode/client.rs index b922ff4036..5c5f3afea7 100644 --- a/crates/supervisor/core/src/syncnode/client.rs +++ b/crates/supervisor/core/src/syncnode/client.rs @@ -78,6 +78,9 @@ pub trait ManagedNodeClient: Debug { source_block_id: BlockNumHash, derived_block_id: BlockNumHash, ) -> Result<(), ManagedNodeError>; + + /// Resets the ws-client to None when server disconnects + async fn reset_ws_client(&self); } /// [`ClientConfig`] sets the configuration for the managed node client. @@ -182,6 +185,12 @@ impl Client { #[async_trait] impl ManagedNodeClient for Client { + async fn reset_ws_client(&self) { + let mut ws_client_guard = self.ws_client.lock().await; + if ws_client_guard.is_some() { + *ws_client_guard = None; + }; + } async fn chain_id(&self) -> Result { if let Some(chain_id) = self.chain_id.get() { return Ok(*chain_id); diff --git a/crates/supervisor/core/src/syncnode/node.rs b/crates/supervisor/core/src/syncnode/node.rs index 1a8ac0b54a..e202e8ff19 100644 --- a/crates/supervisor/core/src/syncnode/node.rs +++ b/crates/supervisor/core/src/syncnode/node.rs @@ -9,10 +9,7 @@ use kona_protocol::BlockInfo; use kona_supervisor_storage::{DerivationStorageReader, HeadRefStorageReader, LogStorageReader}; use kona_supervisor_types::{OutputV0, Receipts}; use std::sync::Arc; -use tokio::{ - sync::{Mutex, mpsc}, - task::JoinHandle, -}; +use tokio::sync::{Mutex, mpsc}; use tokio_util::sync::CancellationToken; use tracing::{error, info, warn}; @@ -36,7 +33,7 @@ pub struct ManagedNode { /// Cancellation token to stop the processor cancel_token: CancellationToken, /// Handle to the async subscription task - task_handle: Mutex>>, + task_handle: Mutex, } impl ManagedNode @@ -53,7 +50,7 @@ where ) -> Self { let resetter = Arc::new(Resetter::new(client.clone(), db_provider)); - Self { client, resetter, cancel_token, task_handle: Mutex::new(None), l1_provider } + Self { client, resetter, cancel_token, task_handle: Mutex::new(false), l1_provider } } /// Returns the [`ChainId`] of the [`ManagedNode`]. @@ -78,9 +75,13 @@ where &self, event_tx: mpsc::Sender, ) -> Result<(), ManagedNodeError> { - let mut task_handle_guard = self.task_handle.lock().await; - if task_handle_guard.is_some() { - Err(SubscriptionError::AlreadyActive)? + let mut running = self.task_handle.lock().await; + if *running { + error!( + target: "managed_node", + "Failed to subscribe to events as it is running" + ); + return Err(SubscriptionError::AlreadyActive)?; } let mut subscription = self.client.subscribe_events().await.inspect_err(|err| { @@ -100,6 +101,12 @@ where self.resetter.clone(), event_tx, ); + + *running = true; // mark as running + drop(running); // release the lock early + + let client = Arc::clone(&self.client); + // Start background task to handle events let handle = tokio::spawn(async move { info!(target: "managed_node", "Subscription task started"); @@ -128,6 +135,10 @@ where None => { // Subscription closed by the server warn!(target: "managed_node", "Subscription closed by server"); + + // This can happen if the underlying ws-client got disconnected. + // We need to set the client to None so that it can be initiated again. + client.reset_ws_client().await; break; } } @@ -147,9 +158,12 @@ where info!(target: "managed_node", "Subscription task finished"); }); - *task_handle_guard = Some(handle); + let _ = handle.await; + + // Task done + let mut running = self.task_handle.lock().await; + *running = false; - info!(target: "managed_node", "Subscription started successfully"); Ok(()) } } diff --git a/crates/supervisor/core/src/syncnode/resetter.rs b/crates/supervisor/core/src/syncnode/resetter.rs index fa571b8613..407e9eb9fc 100644 --- a/crates/supervisor/core/src/syncnode/resetter.rs +++ b/crates/supervisor/core/src/syncnode/resetter.rs @@ -119,6 +119,7 @@ mod tests { async fn update_finalized(&self, finalized_block_id: BlockNumHash) -> Result<(), ManagedNodeError>; async fn update_cross_unsafe(&self, cross_unsafe_block_id: BlockNumHash) -> Result<(), ManagedNodeError>; async fn update_cross_safe(&self, source_block_id: BlockNumHash, derived_block_id: BlockNumHash) -> Result<(), ManagedNodeError>; + async fn reset_ws_client(&self); } } diff --git a/crates/supervisor/core/src/syncnode/task.rs b/crates/supervisor/core/src/syncnode/task.rs index 9dbf7ca495..ecfee206bd 100644 --- a/crates/supervisor/core/src/syncnode/task.rs +++ b/crates/supervisor/core/src/syncnode/task.rs @@ -241,6 +241,7 @@ mod tests { async fn update_finalized(&self, finalized_block_id: BlockNumHash) -> Result<(), ManagedNodeError>; async fn update_cross_unsafe(&self, cross_unsafe_block_id: BlockNumHash) -> Result<(), ManagedNodeError>; async fn update_cross_safe(&self, source_block_id: BlockNumHash, derived_block_id: BlockNumHash) -> Result<(), ManagedNodeError>; + async fn reset_ws_client(&self); } } diff --git a/crates/supervisor/types/Cargo.toml b/crates/supervisor/types/Cargo.toml index bbfbdddcbc..e00fba5fee 100644 --- a/crates/supervisor/types/Cargo.toml +++ b/crates/supervisor/types/Cargo.toml @@ -33,6 +33,9 @@ op-alloy-consensus.workspace = true serde.workspace = true derive_more = { workspace = true, default-features = false, features = ["constructor"] } thiserror = {workspace = true} +tokio = { workspace = true, features = ["sync", "macros", "time", "rt"] } +tokio-util.workspace = true +tracing.workspace = true [dev-dependencies] serde_json.workspace = true diff --git a/crates/supervisor/types/src/lib.rs b/crates/supervisor/types/src/lib.rs index d8a036df14..09dc2efcee 100644 --- a/crates/supervisor/types/src/lib.rs +++ b/crates/supervisor/types/src/lib.rs @@ -20,6 +20,9 @@ pub use access_list::{Access, AccessListError, parse_access_list}; mod chain_id; mod types; +mod utils; +pub use utils::spawn_task_with_retry; + pub use chain_id::HexChainId; pub use types::{BlockSeal, OutputV0, SubscriptionEvent}; diff --git a/crates/supervisor/types/src/utils.rs b/crates/supervisor/types/src/utils.rs new file mode 100644 index 0000000000..939cac67b5 --- /dev/null +++ b/crates/supervisor/types/src/utils.rs @@ -0,0 +1,67 @@ +use std::{future::Future, time::Duration}; +use tokio::{select, task::JoinHandle, time::sleep}; +use tokio_util::sync::CancellationToken; +use tracing::{error, info, warn}; + +/// Spawns a background task that retries the given async operation with backoff on failure. +/// +/// - `operation`: The async task to retry (must return `Result<(), E>`) +/// - `cancel_token`: Cancels the retry loop +/// - `max_retries`: Max retries before exiting (use `usize::MAX` for infinite) +pub fn spawn_task_with_retry( + operation: impl Fn() -> Fut + Send + Sync + 'static, + cancel_token: CancellationToken, + max_retries: usize, +) -> JoinHandle<()> +where + Fut: Future> + Send + 'static, + E: std::fmt::Display + Send + 'static, +{ + tokio::spawn(async move { + let mut attempt = 0; + + loop { + if cancel_token.is_cancelled() { + info!(target: "retrier", "Retry loop cancelled before starting"); + break; + } + + match operation().await { + Ok(()) => { + info!(target: "retrier", "Task exited successfully, restarting"); + attempt = 0; // Reset attempt count on success + } + Err(err) => { + attempt += 1; + + if attempt > max_retries { + error!(target: "retrier", %err, "Retry limit ({max_retries}) exceeded"); + break; + } + + let delay = backoff_delay(attempt); + warn!( + target: "retrier", + %err, + ?delay, + "Attempt {attempt}/{max_retries} failed, retrying after delay" + ); + + select! { + _ = sleep(delay) => {} + _ = cancel_token.cancelled() => { + warn!(target: "retrier", "Retry loop cancelled during backoff"); + break; + } + } + } + } + } + }) +} + +/// Calculates exponential backoff delay with a max cap (30s). +fn backoff_delay(attempt: usize) -> Duration { + let secs = 2u64.saturating_pow(attempt.min(5) as u32); + Duration::from_secs(secs.min(30)) +} diff --git a/tests/supervisor/interop_sync_test.go b/tests/supervisor/interop_sync_test.go new file mode 100644 index 0000000000..ab968f2d85 --- /dev/null +++ b/tests/supervisor/interop_sync_test.go @@ -0,0 +1,56 @@ +package supervisor + +import ( + "testing" + + "github.com/ethereum-optimism/optimism/op-devstack/devtest" + "github.com/ethereum-optimism/optimism/op-devstack/dsl" + "github.com/ethereum-optimism/optimism/op-devstack/presets" + "github.com/ethereum-optimism/optimism/op-service/eth" +) + +// TestL2CLResync checks that unsafe head advances after restarting L2CL. +// Resync is only possible when supervisor and L2CL reconnects. +// Acceptance Test: https://github.com/ethereum-optimism/optimism/blob/develop/op-acceptance-tests/tests/interop/sync/simple_interop/interop_sync_test.go +func TestL2CLResync(gt *testing.T) { + t := devtest.SerialT(gt) + sys := presets.NewSimpleInterop(t) + logger := sys.Log.With("Test", "TestL2CLResync") + + logger.Info("Check unsafe chains are advancing") + dsl.CheckAll(t, + sys.L2ELA.AdvancedFn(eth.Unsafe, 5), + sys.L2ELB.AdvancedFn(eth.Unsafe, 5), + ) + + logger.Info("Stop L2CL nodes") + sys.L2CLA.Stop() + sys.L2CLB.Stop() + + logger.Info("Make sure L2ELs does not advance") + dsl.CheckAll(t, + sys.L2ELA.NotAdvancedFn(eth.Unsafe), + sys.L2ELB.NotAdvancedFn(eth.Unsafe), + ) + + logger.Info("Restart L2CL nodes") + sys.L2CLA.Start() + sys.L2CLB.Start() + + // L2CL may advance a few blocks without supervisor connection, but eventually it will stop without the connection + // we must check that unsafe head is advancing due to reconnection + logger.Info("Boot up L2CL nodes") + dsl.CheckAll(t, + sys.L2ELA.AdvancedFn(eth.Unsafe, 10), + sys.L2ELB.AdvancedFn(eth.Unsafe, 10), + ) + + // supervisor will attempt to reconnect with L2CLs at this point because L2CL ws endpoint is recovered + logger.Info("Check unsafe chains are advancing again") + dsl.CheckAll(t, + sys.L2ELA.AdvancedFn(eth.Unsafe, 10), + sys.L2ELB.AdvancedFn(eth.Unsafe, 10), + ) + + // supervisor successfully connected with managed L2CLs +} From b06ac4152049d4750f665dc1b53e444ac229f517 Mon Sep 17 00:00:00 2001 From: sadiq1971 Date: Wed, 9 Jul 2025 18:26:42 +0600 Subject: [PATCH 3/8] increase test timeout --- tests/justfile | 2 +- tests/supervisor/interop_sync_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/justfile b/tests/justfile index 06ffaa19c7..6bf0920639 100644 --- a/tests/justfile +++ b/tests/justfile @@ -57,7 +57,7 @@ test-e2e DEVNET GO_PKG_NAME="" DEVNET_CUSTOM_PATH="" : export DISABLE_OP_E2E_LEGACY=true export DEVSTACK_ORCHESTRATOR=sysext - cd {{SOURCE}} && go test -v ./$GO_PKG_NAME + cd {{SOURCE}} && go test -timeout 20m -v ./$GO_PKG_NAME build-deploy-devnet DEVNET BINARY OP_PACKAGE_PATH="": (build-devnet BINARY) (devnet DEVNET OP_PACKAGE_PATH) diff --git a/tests/supervisor/interop_sync_test.go b/tests/supervisor/interop_sync_test.go index 1011da2cf7..ba866722c1 100644 --- a/tests/supervisor/interop_sync_test.go +++ b/tests/supervisor/interop_sync_test.go @@ -56,7 +56,7 @@ func TestL2CLResync(gt *testing.T) { // supervisor successfully connected with managed L2CLs } -// TestSupervisorResync checks that unsafe head advances after restarting the Supervisor. +// TestSupervisorResync checks that heads advances after restarting the Supervisor. func TestSupervisorResync(gt *testing.T) { t := devtest.SerialT(gt) sys := presets.NewSimpleInterop(t) From 3537c17910c16e7bc8556b0e17f3669e70f5d135 Mon Sep 17 00:00:00 2001 From: sadiq1971 Date: Thu, 10 Jul 2025 00:57:57 +0600 Subject: [PATCH 4/8] moved retry logic to syncednode --- .../core/src/chain_processor/chain.rs | 16 +- .../core/src/chain_processor/task.rs | 2 +- crates/supervisor/core/src/syncnode/node.rs | 163 +++++++++--------- crates/supervisor/core/src/syncnode/traits.rs | 4 +- tests/supervisor/interop_sync_test.go | 9 +- 5 files changed, 91 insertions(+), 103 deletions(-) diff --git a/crates/supervisor/core/src/chain_processor/chain.rs b/crates/supervisor/core/src/chain_processor/chain.rs index 0b371234ad..96606d5157 100644 --- a/crates/supervisor/core/src/chain_processor/chain.rs +++ b/crates/supervisor/core/src/chain_processor/chain.rs @@ -4,7 +4,6 @@ use alloy_primitives::ChainId; use kona_supervisor_storage::{ DerivationStorageWriter, HeadRefStorageWriter, LogStorageReader, LogStorageWriter, }; -use kona_supervisor_types::spawn_task_with_retry; use std::sync::Arc; use tokio::{ sync::{Mutex, mpsc}, @@ -97,18 +96,7 @@ where // todo: figure out value for buffer size let (event_tx, event_rx) = mpsc::channel::(100); self.event_tx = Some(event_tx.clone()); - - let node = self.managed_node.clone(); - - spawn_task_with_retry( - move || { - let node = node.clone(); - let tx = event_tx.clone(); - async move { node.start_subscription(tx).await } - }, - self.cancel_token.clone(), - usize::MAX, - ); + self.managed_node.clone().start_subscription(event_tx.clone()).await?; let mut task = ChainProcessorTask::new( self.chain_id, @@ -170,7 +158,7 @@ mod tests { #[async_trait] impl NodeSubscriber for MockNode { async fn start_subscription( - &self, + self: Arc, _tx: mpsc::Sender, ) -> Result<(), ManagedNodeError> { self.subscribed.store(true, Ordering::SeqCst); diff --git a/crates/supervisor/core/src/chain_processor/task.rs b/crates/supervisor/core/src/chain_processor/task.rs index d4471cd7f1..7e6ddae7eb 100644 --- a/crates/supervisor/core/src/chain_processor/task.rs +++ b/crates/supervisor/core/src/chain_processor/task.rs @@ -432,7 +432,7 @@ mod tests { #[async_trait] impl NodeSubscriber for Node { async fn start_subscription( - &self, + self: Arc, _event_tx: mpsc::Sender, ) -> Result<(), ManagedNodeError>; } diff --git a/crates/supervisor/core/src/syncnode/node.rs b/crates/supervisor/core/src/syncnode/node.rs index 41d72becd5..8be2068119 100644 --- a/crates/supervisor/core/src/syncnode/node.rs +++ b/crates/supervisor/core/src/syncnode/node.rs @@ -7,9 +7,12 @@ use alloy_rpc_types_eth::BlockNumHash; use async_trait::async_trait; use kona_protocol::BlockInfo; use kona_supervisor_storage::{DerivationStorageReader, HeadRefStorageReader, LogStorageReader}; -use kona_supervisor_types::{OutputV0, Receipts}; +use kona_supervisor_types::{OutputV0, Receipts, spawn_task_with_retry}; use std::sync::Arc; -use tokio::sync::{Mutex, mpsc}; +use tokio::{ + sync::{Mutex, mpsc}, + task::JoinHandle, +}; use tokio_util::sync::CancellationToken; use tracing::{error, info, warn}; @@ -34,7 +37,7 @@ pub struct ManagedNode { /// Cancellation token to stop the processor cancel_token: CancellationToken, /// Handle to the async subscription task - task_handle: Mutex, + task_handle: Mutex>>, } impl ManagedNode @@ -51,7 +54,7 @@ where ) -> Self { let resetter = Arc::new(Resetter::new(client.clone(), db_provider)); - Self { client, resetter, cancel_token, task_handle: Mutex::new(false), l1_provider } + Self { client, resetter, cancel_token, task_handle: Mutex::new(None), l1_provider } } /// Returns the [`ChainId`] of the [`ManagedNode`]. @@ -73,97 +76,93 @@ where /// Establishes a WebSocket connection and subscribes to node events. /// Spawns a background task to process incoming events. async fn start_subscription( - &self, + self: Arc, event_tx: mpsc::Sender, ) -> Result<(), ManagedNodeError> { - let mut running = self.task_handle.lock().await; - if *running { - error!( - target: "managed_node", - "Failed to subscribe to events as it is running" - ); - return Err(SubscriptionError::AlreadyActive)?; + let chain_id = self.chain_id().await?; + let mut task_handle_guard = self.task_handle.lock().await; + if task_handle_guard.is_some() { + Err(SubscriptionError::AlreadyActive)? } - let mut subscription = self.client.subscribe_events().await.inspect_err(|err| { - error!( - target: "managed_node", - %err, - "Failed to subscribe to events" - ); - })?; - - let cancel_token = self.cancel_token.clone(); - - // Creates a task instance to sort and process the events from the subscription - let task = ManagedEventTask::new( + let task = Arc::new(ManagedEventTask::new( self.client.clone(), self.l1_provider.clone(), self.resetter.clone(), - event_tx, - ); - - *running = true; // mark as running - drop(running); // release the lock early - - let client = Arc::clone(&self.client); - - // Start background task to handle events - let handle = tokio::spawn(async move { - info!(target: "managed_node", "Subscription task started"); - loop { - tokio::select! { - // Listen for stop signal - _ = cancel_token.cancelled() => { - info!(target: "managed_node", "Cancellation token triggered, shutting down subscription"); - break; - } - - // Listen for events from subscription - incoming_event = subscription.next() => { - match incoming_event { - Some(Ok(subscription_event)) => { - task.handle_managed_event(subscription_event.data).await; - } - Some(Err(err)) => { - error!( - target: "managed_node", - %err, - "Error in event deserialization" - ); - // Continue processing next events despite this error - } - None => { - // Subscription closed by the server - warn!(target: "managed_node", "Subscription closed by server"); - - // This can happen if the underlying ws-client got disconnected. - // We need to set the client to None so that it can be initiated again. - client.reset_ws_client().await; - break; + event_tx.clone(), + )); + + let node = self.clone(); + + // spawn a task which will be retried in failures + let handle = spawn_task_with_retry( + move || { + let node = node.clone(); + let task = task.clone(); + async move { + let mut subscription = + node.client.subscribe_events().await.inspect_err(|err| { + error!( + target: "managed_node", + %chain_id, + %err, + "Failed to subscribe to events" + ); + })?; + + let handle = tokio::spawn(async move { + info!(target: "managed_node", %chain_id, "Subscription task started"); + loop { + tokio::select! { + _ = node.cancel_token.cancelled() => { + info!(target: "managed_node", %chain_id, "Cancellation token triggered, shutting down subscription"); + break; + } + incoming_event = subscription.next() => { + match incoming_event { + Some(Ok(subscription_event)) => { + task.handle_managed_event(subscription_event.data).await; + } + Some(Err(err)) => { + error!( + target: "managed_node", + %chain_id, + %err, + "Error in event deserialization" + ); + } + None => { + warn!(target: "managed_node",%chain_id, "Subscription closed by server"); + node.client.reset_ws_client().await; + break; + } + } + } } } - } - } - } - // Try to unsubscribe gracefully - if let Err(err) = subscription.unsubscribe().await { - warn!( - target: "managed_node", - %err, - "Failed to unsubscribe gracefully" - ); - } + if let Err(err) = subscription.unsubscribe().await { + warn!( + target: "managed_node", + %chain_id, + %err, + "Failed to unsubscribe gracefully" + ); + } + + info!(target: "managed_node", %chain_id, "Subscription task finished"); + }); - info!(target: "managed_node", "Subscription task finished"); - }); + let _ = handle.await; - let _ = handle.await; + Ok::<_, ManagedNodeError>(()) + } + }, + self.cancel_token.clone(), + usize::MAX, + ); - // Task done - let mut running = self.task_handle.lock().await; - *running = false; + *task_handle_guard = Some(handle); Ok(()) } diff --git a/crates/supervisor/core/src/syncnode/traits.rs b/crates/supervisor/core/src/syncnode/traits.rs index a4f57dc474..c607552097 100644 --- a/crates/supervisor/core/src/syncnode/traits.rs +++ b/crates/supervisor/core/src/syncnode/traits.rs @@ -5,7 +5,7 @@ use alloy_primitives::B256; use async_trait::async_trait; use kona_protocol::BlockInfo; use kona_supervisor_types::{OutputV0, Receipts}; -use std::fmt::Debug; +use std::{fmt::Debug, sync::Arc}; use tokio::sync::mpsc; /// Represents a node that can subscribe to L2 events from the chain. @@ -23,7 +23,7 @@ pub trait NodeSubscriber: Send + Sync { /// * `Ok(())` on successful subscription /// * `Err(ManagedNodeError)` if subscription setup fails async fn start_subscription( - &self, + self: Arc, event_tx: mpsc::Sender, ) -> Result<(), ManagedNodeError>; } diff --git a/tests/supervisor/interop_sync_test.go b/tests/supervisor/interop_sync_test.go index ba866722c1..bbdb31cbee 100644 --- a/tests/supervisor/interop_sync_test.go +++ b/tests/supervisor/interop_sync_test.go @@ -41,9 +41,10 @@ func TestL2CLResync(gt *testing.T) { // L2CL may advance a few blocks without supervisor connection, but eventually it will stop without the connection // we must check that unsafe head is advancing due to reconnection logger.Info("Boot up L2CL nodes") + dsl.CheckAll(t, - sys.L2ELA.AdvancedFn(eth.Unsafe, 10), - sys.L2ELB.AdvancedFn(eth.Unsafe, 10), + sys.L2ELA.AdvancedFn(eth.Unsafe, 30), + sys.L2ELB.AdvancedFn(eth.Unsafe, 30), ) // supervisor will attempt to reconnect with L2CLs at this point because L2CL ws endpoint is recovered @@ -65,8 +66,8 @@ func TestSupervisorResync(gt *testing.T) { logger.Info("Check unsafe chains are advancing") for _, level := range []types.SafetyLevel{types.LocalUnsafe, types.LocalSafe, types.CrossUnsafe, types.CrossSafe} { - sys.Supervisor.WaitForL2HeadToAdvance(sys.L2ChainA.ChainID(), 2, level, 10) - sys.Supervisor.WaitForL2HeadToAdvance(sys.L2ChainB.ChainID(), 2, level, 10) + sys.Supervisor.WaitForL2HeadToAdvance(sys.L2ChainA.ChainID(), 2, level, 20) + sys.Supervisor.WaitForL2HeadToAdvance(sys.L2ChainB.ChainID(), 2, level, 20) } logger.Info("Stop Supervisor node") From 2882ab77308f7a488b71fbbbf58ccce6fd9e9e7a Mon Sep 17 00:00:00 2001 From: sadiq1971 Date: Thu, 10 Jul 2025 00:58:58 +0600 Subject: [PATCH 5/8] import refactored --- tests/supervisor/interop_sync_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/supervisor/interop_sync_test.go b/tests/supervisor/interop_sync_test.go index bbdb31cbee..061d2cb22a 100644 --- a/tests/supervisor/interop_sync_test.go +++ b/tests/supervisor/interop_sync_test.go @@ -1,13 +1,13 @@ package supervisor import ( - "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types" "testing" "github.com/ethereum-optimism/optimism/op-devstack/devtest" "github.com/ethereum-optimism/optimism/op-devstack/dsl" "github.com/ethereum-optimism/optimism/op-devstack/presets" "github.com/ethereum-optimism/optimism/op-service/eth" + "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types" ) // TestL2CLResync checks that unsafe head advances after restarting L2CL. From b9fa6de18c6a06f4a69b1bff7fd3c9c864907d01 Mon Sep 17 00:00:00 2001 From: Arun Dhyani Date: Thu, 10 Jul 2025 14:42:31 +0530 Subject: [PATCH 6/8] refactor --- .../core/src/chain_processor/chain.rs | 2 +- .../core/src/chain_processor/task.rs | 2 +- .../supervisor/core/src/logindexer/indexer.rs | 8 +- crates/supervisor/core/src/supervisor.rs | 5 +- crates/supervisor/core/src/syncnode/client.rs | 70 +++++------ crates/supervisor/core/src/syncnode/error.rs | 67 +++++----- crates/supervisor/core/src/syncnode/mod.rs | 5 +- crates/supervisor/core/src/syncnode/node.rs | 118 ++++++------------ .../supervisor/core/src/syncnode/resetter.rs | 34 ++--- crates/supervisor/core/src/syncnode/task.rs | 99 ++++++++++++--- crates/supervisor/core/src/syncnode/traits.rs | 4 +- .../{types/src => core/src/syncnode}/utils.rs | 2 +- crates/supervisor/types/src/lib.rs | 2 - 13 files changed, 217 insertions(+), 201 deletions(-) rename crates/supervisor/{types/src => core/src/syncnode}/utils.rs (98%) diff --git a/crates/supervisor/core/src/chain_processor/chain.rs b/crates/supervisor/core/src/chain_processor/chain.rs index cf1a9269e4..02264bf680 100644 --- a/crates/supervisor/core/src/chain_processor/chain.rs +++ b/crates/supervisor/core/src/chain_processor/chain.rs @@ -164,7 +164,7 @@ mod tests { #[async_trait] impl NodeSubscriber for MockNode { async fn start_subscription( - self: Arc, + &self, _tx: mpsc::Sender, ) -> Result<(), ManagedNodeError> { self.subscribed.store(true, Ordering::SeqCst); diff --git a/crates/supervisor/core/src/chain_processor/task.rs b/crates/supervisor/core/src/chain_processor/task.rs index b4896905c2..792eca941e 100644 --- a/crates/supervisor/core/src/chain_processor/task.rs +++ b/crates/supervisor/core/src/chain_processor/task.rs @@ -438,7 +438,7 @@ mod tests { #[async_trait] impl NodeSubscriber for Node { async fn start_subscription( - self: Arc, + &self, _event_tx: mpsc::Sender, ) -> Result<(), ManagedNodeError>; } diff --git a/crates/supervisor/core/src/logindexer/indexer.rs b/crates/supervisor/core/src/logindexer/indexer.rs index 2cfd30d6c0..02eac64c06 100644 --- a/crates/supervisor/core/src/logindexer/indexer.rs +++ b/crates/supervisor/core/src/logindexer/indexer.rs @@ -141,9 +141,9 @@ pub enum LogIndexerError { #[cfg(test)] mod tests { use super::*; + use crate::syncnode::{AuthenticationError, ClientError}; use alloy_primitives::{Address, B256, Bytes}; use async_trait::async_trait; - use jsonrpsee::core::ClientError; use kona_interop::{ExecutingMessageBuilder, InteropProvider, SuperchainBuilder}; use kona_protocol::BlockInfo; use kona_supervisor_storage::StorageError; @@ -271,7 +271,11 @@ mod tests { let mut mock_provider = MockBlockProvider::new(); mock_provider.expect_fetch_receipts().withf(move |hash| *hash == block_hash).returning( - |_| Err(ManagedNodeError::Client(ClientError::Custom("forced error".to_string()))), + |_| { + Err(ManagedNodeError::Client(ClientError::Authentication( + AuthenticationError::InvalidHeader, + ))) + }, ); mock_provider.expect_block_by_number().returning(|_| Ok(BlockInfo::default())); // Not used diff --git a/crates/supervisor/core/src/supervisor.rs b/crates/supervisor/core/src/supervisor.rs index 038683573c..6ed1d8d579 100644 --- a/crates/supervisor/core/src/supervisor.rs +++ b/crates/supervisor/core/src/supervisor.rs @@ -241,7 +241,10 @@ impl Supervisor { let provider = RootProvider::::new_http(url); let client = Arc::new(Client::new(config.clone())); - let chain_id = client.chain_id().await?; + let chain_id = client.chain_id().await.map_err(|err| { + error!(target: "supervisor_service", %err, "Failed to get chain ID from client"); + SupervisorError::Initialise("failed to get chain id from client".to_string()) + })?; let db = self.database_factory.get_db(chain_id)?; let managed_node = ManagedNode::::new( diff --git a/crates/supervisor/core/src/syncnode/client.rs b/crates/supervisor/core/src/syncnode/client.rs index 5c5f3afea7..8706c944a3 100644 --- a/crates/supervisor/core/src/syncnode/client.rs +++ b/crates/supervisor/core/src/syncnode/client.rs @@ -1,4 +1,4 @@ -use super::{AuthenticationError, ManagedNodeError, metrics::Metrics}; +use super::{AuthenticationError, ClientError, metrics::Metrics}; use alloy_primitives::{B256, ChainId}; use alloy_rpc_types_engine::{Claims, JwtSecret}; use alloy_rpc_types_eth::BlockNumHash; @@ -21,31 +21,26 @@ use tracing::{error, info}; #[async_trait] pub trait ManagedNodeClient: Debug { /// Returns the [`ChainId`] of the managed node. - async fn chain_id(&self) -> Result; + async fn chain_id(&self) -> Result; /// Subscribes to [`SubscriptionEvent`] from the managed node. - async fn subscribe_events(&self) -> Result, ManagedNodeError>; + async fn subscribe_events(&self) -> Result, ClientError>; /// Fetches [`Receipts`] for a given block hash. - async fn fetch_receipts(&self, block_hash: B256) -> Result; + async fn fetch_receipts(&self, block_hash: B256) -> Result; /// Fetches the [`OutputV0`] at a specific timestamp. - async fn output_v0_at_timestamp(&self, timestamp: u64) -> Result; + async fn output_v0_at_timestamp(&self, timestamp: u64) -> Result; /// Fetches the pending [`OutputV0`] at a specific timestamp. - async fn pending_output_v0_at_timestamp( - &self, - timestamp: u64, - ) -> Result; + async fn pending_output_v0_at_timestamp(&self, timestamp: u64) + -> Result; /// Fetches the L2 [`BlockInfo`] by timestamp. - async fn l2_block_ref_by_timestamp( - &self, - timestamp: u64, - ) -> Result; + async fn l2_block_ref_by_timestamp(&self, timestamp: u64) -> Result; /// Fetches the [`BlockInfo`] by block number. - async fn block_ref_by_number(&self, block_number: u64) -> Result; + async fn block_ref_by_number(&self, block_number: u64) -> Result; /// Resets the node state with the provided block IDs. async fn reset( @@ -55,29 +50,26 @@ pub trait ManagedNodeClient: Debug { local_safe_id: BlockNumHash, cross_safe_id: BlockNumHash, finalised_id: BlockNumHash, - ) -> Result<(), ManagedNodeError>; + ) -> Result<(), ClientError>; /// Provides L1 [`BlockInfo`] to the managed node. - async fn provide_l1(&self, block_info: BlockInfo) -> Result<(), ManagedNodeError>; + async fn provide_l1(&self, block_info: BlockInfo) -> Result<(), ClientError>; /// Updates the finalized block ID in the managed node. - async fn update_finalized( - &self, - finalized_block_id: BlockNumHash, - ) -> Result<(), ManagedNodeError>; + async fn update_finalized(&self, finalized_block_id: BlockNumHash) -> Result<(), ClientError>; /// Updates the cross-unsafe block ID in the managed node. async fn update_cross_unsafe( &self, cross_unsafe_block_id: BlockNumHash, - ) -> Result<(), ManagedNodeError>; + ) -> Result<(), ClientError>; /// Updates the cross-safe block ID in the managed node. async fn update_cross_safe( &self, source_block_id: BlockNumHash, derived_block_id: BlockNumHash, - ) -> Result<(), ManagedNodeError>; + ) -> Result<(), ClientError>; /// Resets the ws-client to None when server disconnects async fn reset_ws_client(&self); @@ -137,7 +129,7 @@ impl Client { } /// Creates authentication headers using JWT secret. - fn create_auth_headers(&self) -> Result { + fn create_auth_headers(&self) -> Result { let Some(jwt_secret) = self.config.jwt_secret() else { error!(target: "managed_node", "JWT secret not found or invalid"); return Err(AuthenticationError::InvalidJwt.into()) @@ -166,7 +158,7 @@ impl Client { /// Returns a reference to the WebSocket client, creating it if it doesn't exist. // todo: support http client as well - pub async fn get_ws_client(&self) -> Result, ManagedNodeError> { + pub async fn get_ws_client(&self) -> Result, ClientError> { let mut ws_client_guard = self.ws_client.lock().await; if ws_client_guard.is_none() { let headers = self.create_auth_headers().inspect_err(|err| { @@ -191,7 +183,7 @@ impl ManagedNodeClient for Client { *ws_client_guard = None; }; } - async fn chain_id(&self) -> Result { + async fn chain_id(&self) -> Result { if let Some(chain_id) = self.chain_id.get() { return Ok(*chain_id); } @@ -219,7 +211,7 @@ impl ManagedNodeClient for Client { Ok(chain_id) } - async fn subscribe_events(&self) -> Result, ManagedNodeError> { + async fn subscribe_events(&self) -> Result, ClientError> { let client = self.get_ws_client().await?; // This returns ManagedNodeError, handled by your function let subscription = observe_metrics_for_result_async!( Metrics::MANAGED_NODE_RPC_REQUESTS_SUCCESS_TOTAL, @@ -235,7 +227,7 @@ impl ManagedNodeClient for Client { Ok(subscription) } - async fn fetch_receipts(&self, block_hash: B256) -> Result { + async fn fetch_receipts(&self, block_hash: B256) -> Result { let client = self.get_ws_client().await?; // This returns ManagedNodeError, handled by your function let receipts = observe_metrics_for_result_async!( Metrics::MANAGED_NODE_RPC_REQUESTS_SUCCESS_TOTAL, @@ -251,7 +243,7 @@ impl ManagedNodeClient for Client { Ok(receipts) } - async fn output_v0_at_timestamp(&self, timestamp: u64) -> Result { + async fn output_v0_at_timestamp(&self, timestamp: u64) -> Result { let client = self.get_ws_client().await?; let output_v0 = observe_metrics_for_result_async!( Metrics::MANAGED_NODE_RPC_REQUESTS_SUCCESS_TOTAL, @@ -270,7 +262,7 @@ impl ManagedNodeClient for Client { async fn pending_output_v0_at_timestamp( &self, timestamp: u64, - ) -> Result { + ) -> Result { let client = self.get_ws_client().await?; let output_v0 = observe_metrics_for_result_async!( Metrics::MANAGED_NODE_RPC_REQUESTS_SUCCESS_TOTAL, @@ -286,10 +278,7 @@ impl ManagedNodeClient for Client { Ok(output_v0) } - async fn l2_block_ref_by_timestamp( - &self, - timestamp: u64, - ) -> Result { + async fn l2_block_ref_by_timestamp(&self, timestamp: u64) -> Result { let client = self.get_ws_client().await?; let block_info = observe_metrics_for_result_async!( Metrics::MANAGED_NODE_RPC_REQUESTS_SUCCESS_TOTAL, @@ -305,7 +294,7 @@ impl ManagedNodeClient for Client { Ok(block_info) } - async fn block_ref_by_number(&self, block_number: u64) -> Result { + async fn block_ref_by_number(&self, block_number: u64) -> Result { let client = self.get_ws_client().await?; let block_info = observe_metrics_for_result_async!( Metrics::MANAGED_NODE_RPC_REQUESTS_SUCCESS_TOTAL, @@ -328,7 +317,7 @@ impl ManagedNodeClient for Client { local_safe_id: BlockNumHash, cross_safe_id: BlockNumHash, finalised_id: BlockNumHash, - ) -> Result<(), ManagedNodeError> { + ) -> Result<(), ClientError> { let client = self.get_ws_client().await?; observe_metrics_for_result_async!( Metrics::MANAGED_NODE_RPC_REQUESTS_SUCCESS_TOTAL, @@ -343,7 +332,7 @@ impl ManagedNodeClient for Client { Ok(()) } - async fn provide_l1(&self, block_info: BlockInfo) -> Result<(), ManagedNodeError> { + async fn provide_l1(&self, block_info: BlockInfo) -> Result<(), ClientError> { let client = self.get_ws_client().await?; observe_metrics_for_result_async!( Metrics::MANAGED_NODE_RPC_REQUESTS_SUCCESS_TOTAL, @@ -358,10 +347,7 @@ impl ManagedNodeClient for Client { Ok(()) } - async fn update_finalized( - &self, - finalized_block_id: BlockNumHash, - ) -> Result<(), ManagedNodeError> { + async fn update_finalized(&self, finalized_block_id: BlockNumHash) -> Result<(), ClientError> { let client = self.get_ws_client().await?; observe_metrics_for_result_async!( Metrics::MANAGED_NODE_RPC_REQUESTS_SUCCESS_TOTAL, @@ -379,7 +365,7 @@ impl ManagedNodeClient for Client { async fn update_cross_unsafe( &self, cross_unsafe_block_id: BlockNumHash, - ) -> Result<(), ManagedNodeError> { + ) -> Result<(), ClientError> { let client = self.get_ws_client().await?; observe_metrics_for_result_async!( Metrics::MANAGED_NODE_RPC_REQUESTS_SUCCESS_TOTAL, @@ -398,7 +384,7 @@ impl ManagedNodeClient for Client { &self, source_block_id: BlockNumHash, derived_block_id: BlockNumHash, - ) -> Result<(), ManagedNodeError> { + ) -> Result<(), ClientError> { let client = self.get_ws_client().await?; observe_metrics_for_result_async!( Metrics::MANAGED_NODE_RPC_REQUESTS_SUCCESS_TOTAL, diff --git a/crates/supervisor/core/src/syncnode/error.rs b/crates/supervisor/core/src/syncnode/error.rs index 51fa921811..ba9c88f13f 100644 --- a/crates/supervisor/core/src/syncnode/error.rs +++ b/crates/supervisor/core/src/syncnode/error.rs @@ -3,44 +3,21 @@ use kona_supervisor_storage::StorageError; use thiserror::Error; /// Represents various errors that can occur during node management, -#[derive(Debug, Error)] +#[derive(Debug, Error, PartialEq, Eq)] pub enum ManagedNodeError { /// Represents an error that occurred while starting the managed node. #[error(transparent)] - Client(#[from] jsonrpsee::core::ClientError), - - /// Represents an error that occurred while parsing a chain ID from a string. - #[error(transparent)] - ChainIdParseError(#[from] std::num::ParseIntError), + Client(#[from] ClientError), /// Represents an error that occurred while subscribing to the managed node. #[error("subscription error: {0}")] Subscription(#[from] SubscriptionError), - /// Represents an error that occurred while authenticating to the managed node. - #[error("failed to authenticate: {0}")] - Authentication(#[from] AuthenticationError), - /// Represents an error that occurred while fetching data from the storage. #[error(transparent)] StorageError(#[from] StorageError), } -impl PartialEq for ManagedNodeError { - fn eq(&self, other: &Self) -> bool { - use ManagedNodeError::*; - match (self, other) { - (Client(a), Client(b)) => a.to_string() == b.to_string(), - (ChainIdParseError(a), ChainIdParseError(b)) => a == b, - (Subscription(a), Subscription(b)) => a == b, - (Authentication(a), Authentication(b)) => a == b, - _ => false, - } - } -} - -impl Eq for ManagedNodeError {} - /// Error establishing authenticated connection to managed node. #[derive(Debug, Error, PartialEq, Eq)] pub enum AuthenticationError { @@ -61,8 +38,12 @@ pub enum SubscriptionError { } /// Error handling managed event task. -#[derive(Debug, Error, PartialEq)] +#[derive(Debug, Error, PartialEq, Eq)] pub enum ManagedEventTaskError { + /// Represents an error that occurred while starting the managed node. + #[error(transparent)] + Client(#[from] ClientError), + /// Unable to successfully fetch next L1 block. #[error("failed to get block by number, number: {0}")] GetBlockByNumberFailed(u64), @@ -78,10 +59,6 @@ pub enum ManagedEventTaskError { parent: B256, }, - /// This should never happen, new() always sets the rpc client when creating the task. - #[error("rpc client for managed node is not set")] - ManagedNodeClientMissing, - /// Managed node api call failed. #[error("managed node api call failed")] ManagedNodeAPICallFailed, @@ -90,3 +67,33 @@ pub enum ManagedEventTaskError { #[error(transparent)] StorageError(#[from] StorageError), } + +/// Represents errors that can occur while interacting with the managed node client. +#[derive(Debug, Error)] +pub enum ClientError { + /// Represents an error that occurred while starting the managed node. + #[error(transparent)] + Client(#[from] jsonrpsee::core::ClientError), + + /// Represents an error that occurred while authenticating to the managed node. + #[error("failed to authenticate: {0}")] + Authentication(#[from] AuthenticationError), + + /// Represents an error that occurred while parsing a chain ID from a string. + #[error(transparent)] + ChainIdParseError(#[from] std::num::ParseIntError), +} + +impl PartialEq for ClientError { + fn eq(&self, other: &Self) -> bool { + use ClientError::*; + match (self, other) { + (Client(a), Client(b)) => a.to_string() == b.to_string(), + (Authentication(a), Authentication(b)) => a == b, + (ChainIdParseError(a), ChainIdParseError(b)) => a == b, + _ => false, + } + } +} + +impl Eq for ClientError {} diff --git a/crates/supervisor/core/src/syncnode/mod.rs b/crates/supervisor/core/src/syncnode/mod.rs index 54a338251a..0aaee3884f 100644 --- a/crates/supervisor/core/src/syncnode/mod.rs +++ b/crates/supervisor/core/src/syncnode/mod.rs @@ -5,7 +5,9 @@ mod node; pub use node::ManagedNode; mod error; -pub use error::{AuthenticationError, ManagedEventTaskError, ManagedNodeError, SubscriptionError}; +pub use error::{ + AuthenticationError, ClientError, ManagedEventTaskError, ManagedNodeError, SubscriptionError, +}; mod traits; pub use traits::{ @@ -19,3 +21,4 @@ pub use client::{Client, ClientConfig, ManagedNodeClient}; pub(super) mod metrics; pub(super) mod resetter; pub(super) mod task; +pub(super) mod utils; diff --git a/crates/supervisor/core/src/syncnode/node.rs b/crates/supervisor/core/src/syncnode/node.rs index 8be2068119..841da84daf 100644 --- a/crates/supervisor/core/src/syncnode/node.rs +++ b/crates/supervisor/core/src/syncnode/node.rs @@ -7,19 +7,18 @@ use alloy_rpc_types_eth::BlockNumHash; use async_trait::async_trait; use kona_protocol::BlockInfo; use kona_supervisor_storage::{DerivationStorageReader, HeadRefStorageReader, LogStorageReader}; -use kona_supervisor_types::{OutputV0, Receipts, spawn_task_with_retry}; +use kona_supervisor_types::{OutputV0, Receipts}; use std::sync::Arc; use tokio::{ sync::{Mutex, mpsc}, task::JoinHandle, }; use tokio_util::sync::CancellationToken; -use tracing::{error, info, warn}; use super::{ BlockProvider, ManagedNodeClient, ManagedNodeController, ManagedNodeDataProvider, ManagedNodeError, NodeSubscriber, SubscriptionError, resetter::Resetter, - task::ManagedEventTask, + task::ManagedEventTask, utils::spawn_task_with_retry, }; use crate::event::ChainEvent; @@ -61,7 +60,8 @@ where /// If the chain ID is already cached, it returns that. /// If not, it fetches the chain ID from the managed node. pub async fn chain_id(&self) -> Result { - self.client.chain_id().await + let chain_id = self.client.chain_id().await?; + Ok(chain_id) } } @@ -76,87 +76,30 @@ where /// Establishes a WebSocket connection and subscribes to node events. /// Spawns a background task to process incoming events. async fn start_subscription( - self: Arc, + &self, event_tx: mpsc::Sender, ) -> Result<(), ManagedNodeError> { - let chain_id = self.chain_id().await?; let mut task_handle_guard = self.task_handle.lock().await; if task_handle_guard.is_some() { Err(SubscriptionError::AlreadyActive)? } - let task = Arc::new(ManagedEventTask::new( - self.client.clone(), - self.l1_provider.clone(), - self.resetter.clone(), - event_tx.clone(), - )); - - let node = self.clone(); + let client = self.client.clone(); + let l1_provider = self.l1_provider.clone(); + let resetter = self.resetter.clone(); + let cancel_token = self.cancel_token.clone(); // spawn a task which will be retried in failures let handle = spawn_task_with_retry( move || { - let node = node.clone(); - let task = task.clone(); - async move { - let mut subscription = - node.client.subscribe_events().await.inspect_err(|err| { - error!( - target: "managed_node", - %chain_id, - %err, - "Failed to subscribe to events" - ); - })?; - - let handle = tokio::spawn(async move { - info!(target: "managed_node", %chain_id, "Subscription task started"); - loop { - tokio::select! { - _ = node.cancel_token.cancelled() => { - info!(target: "managed_node", %chain_id, "Cancellation token triggered, shutting down subscription"); - break; - } - incoming_event = subscription.next() => { - match incoming_event { - Some(Ok(subscription_event)) => { - task.handle_managed_event(subscription_event.data).await; - } - Some(Err(err)) => { - error!( - target: "managed_node", - %chain_id, - %err, - "Error in event deserialization" - ); - } - None => { - warn!(target: "managed_node",%chain_id, "Subscription closed by server"); - node.client.reset_ws_client().await; - break; - } - } - } - } - } - - if let Err(err) = subscription.unsubscribe().await { - warn!( - target: "managed_node", - %chain_id, - %err, - "Failed to unsubscribe gracefully" - ); - } - - info!(target: "managed_node", %chain_id, "Subscription task finished"); - }); - - let _ = handle.await; - - Ok::<_, ManagedNodeError>(()) - } + let task = ManagedEventTask::new( + client.clone(), + l1_provider.clone(), + resetter.clone(), + cancel_token.clone(), + event_tx.clone(), + ); + async move { task.run().await } }, self.cancel_token.clone(), usize::MAX, @@ -177,10 +120,12 @@ where C: ManagedNodeClient + Send + Sync + 'static, { async fn block_by_number(&self, number: u64) -> Result { - self.client.block_ref_by_number(number).await + let block = self.client.block_ref_by_number(number).await?; + Ok(block) } async fn fetch_receipts(&self, block_hash: B256) -> Result { - self.client.fetch_receipts(block_hash).await + let receipt = self.client.fetch_receipts(block_hash).await?; + Ok(receipt) } } @@ -191,21 +136,24 @@ where C: ManagedNodeClient + Send + Sync + 'static, { async fn output_v0_at_timestamp(&self, timestamp: u64) -> Result { - self.client.output_v0_at_timestamp(timestamp).await + let outputv0 = self.client.output_v0_at_timestamp(timestamp).await?; + Ok(outputv0) } async fn pending_output_v0_at_timestamp( &self, timestamp: u64, ) -> Result { - self.client.pending_output_v0_at_timestamp(timestamp).await + let outputv0 = self.client.pending_output_v0_at_timestamp(timestamp).await?; + Ok(outputv0) } async fn l2_block_ref_by_timestamp( &self, timestamp: u64, ) -> Result { - self.client.l2_block_ref_by_timestamp(timestamp).await + let block = self.client.l2_block_ref_by_timestamp(timestamp).await?; + Ok(block) } } @@ -219,14 +167,16 @@ where &self, finalized_block_id: BlockNumHash, ) -> Result<(), ManagedNodeError> { - self.client.update_finalized(finalized_block_id).await + self.client.update_finalized(finalized_block_id).await?; + Ok(()) } async fn update_cross_unsafe( &self, cross_unsafe_block_id: BlockNumHash, ) -> Result<(), ManagedNodeError> { - self.client.update_cross_unsafe(cross_unsafe_block_id).await + self.client.update_cross_unsafe(cross_unsafe_block_id).await?; + Ok(()) } async fn update_cross_safe( @@ -234,10 +184,12 @@ where source_block_id: BlockNumHash, derived_block_id: BlockNumHash, ) -> Result<(), ManagedNodeError> { - self.client.update_cross_safe(source_block_id, derived_block_id).await + self.client.update_cross_safe(source_block_id, derived_block_id).await?; + Ok(()) } async fn reset(&self) -> Result<(), ManagedNodeError> { - self.resetter.reset().await + self.resetter.reset().await?; + Ok(()) } } diff --git a/crates/supervisor/core/src/syncnode/resetter.rs b/crates/supervisor/core/src/syncnode/resetter.rs index 407e9eb9fc..f15a6a1e9b 100644 --- a/crates/supervisor/core/src/syncnode/resetter.rs +++ b/crates/supervisor/core/src/syncnode/resetter.rs @@ -79,7 +79,7 @@ where #[cfg(test)] mod tests { use super::*; - use crate::syncnode::{AuthenticationError, ManagedNodeError}; + use crate::syncnode::{AuthenticationError, ClientError}; use alloy_eips::BlockNumHash; use alloy_primitives::{B256, ChainId}; use async_trait::async_trait; @@ -107,18 +107,18 @@ mod tests { #[async_trait] impl ManagedNodeClient for Client { - async fn chain_id(&self) -> Result; - async fn subscribe_events(&self) -> Result, ManagedNodeError>; - async fn fetch_receipts(&self, block_hash: B256) -> Result; - async fn output_v0_at_timestamp(&self, timestamp: u64) -> Result; - async fn pending_output_v0_at_timestamp(&self, timestamp: u64) -> Result; - async fn l2_block_ref_by_timestamp(&self, timestamp: u64) -> Result; - async fn block_ref_by_number(&self, block_number: u64) -> Result; - async fn reset(&self, unsafe_id: BlockNumHash, cross_unsafe_id: BlockNumHash, local_safe_id: BlockNumHash, cross_safe_id: BlockNumHash, finalised_id: BlockNumHash) -> Result<(), ManagedNodeError>; - async fn provide_l1(&self, block_info: BlockInfo) -> Result<(), ManagedNodeError>; - async fn update_finalized(&self, finalized_block_id: BlockNumHash) -> Result<(), ManagedNodeError>; - async fn update_cross_unsafe(&self, cross_unsafe_block_id: BlockNumHash) -> Result<(), ManagedNodeError>; - async fn update_cross_safe(&self, source_block_id: BlockNumHash, derived_block_id: BlockNumHash) -> Result<(), ManagedNodeError>; + async fn chain_id(&self) -> Result; + async fn subscribe_events(&self) -> Result, ClientError>; + async fn fetch_receipts(&self, block_hash: B256) -> Result; + async fn output_v0_at_timestamp(&self, timestamp: u64) -> Result; + async fn pending_output_v0_at_timestamp(&self, timestamp: u64) -> Result; + async fn l2_block_ref_by_timestamp(&self, timestamp: u64) -> Result; + async fn block_ref_by_number(&self, block_number: u64) -> Result; + async fn reset(&self, unsafe_id: BlockNumHash, cross_unsafe_id: BlockNumHash, local_safe_id: BlockNumHash, cross_safe_id: BlockNumHash, finalised_id: BlockNumHash) -> Result<(), ClientError>; + async fn provide_l1(&self, block_info: BlockInfo) -> Result<(), ClientError>; + async fn update_finalized(&self, finalized_block_id: BlockNumHash) -> Result<(), ClientError>; + async fn update_cross_unsafe(&self, cross_unsafe_block_id: BlockNumHash) -> Result<(), ClientError>; + async fn update_cross_safe(&self, source_block_id: BlockNumHash, derived_block_id: BlockNumHash) -> Result<(), ClientError>; async fn reset_ws_client(&self); } } @@ -171,9 +171,9 @@ mod tests { db.expect_get_super_head().returning(move || Ok(super_head)); let mut client = MockClient::new(); - client.expect_block_ref_by_number().returning(|_| { - Err(ManagedNodeError::Authentication(AuthenticationError::InvalidHeader)) - }); + client + .expect_block_ref_by_number() + .returning(|_| Err(ClientError::Authentication(AuthenticationError::InvalidHeader))); let resetter = Resetter::new(Arc::new(client), Arc::new(db)); @@ -208,7 +208,7 @@ mod tests { let mut client = MockClient::new(); client.expect_block_ref_by_number().returning(move |_| Ok(super_head.local_safe)); client.expect_reset().returning(|_, _, _, _, _| { - Err(ManagedNodeError::Authentication(AuthenticationError::InvalidJwt)) + Err(ClientError::Authentication(AuthenticationError::InvalidJwt)) }); let resetter = Resetter::new(Arc::new(client), Arc::new(db)); diff --git a/crates/supervisor/core/src/syncnode/task.rs b/crates/supervisor/core/src/syncnode/task.rs index ecfee206bd..1c225c8c6e 100644 --- a/crates/supervisor/core/src/syncnode/task.rs +++ b/crates/supervisor/core/src/syncnode/task.rs @@ -8,6 +8,7 @@ use kona_protocol::BlockInfo; use kona_supervisor_storage::{DerivationStorageReader, HeadRefStorageReader, LogStorageReader}; use std::sync::Arc; use tokio::sync::mpsc; +use tokio_util::sync::CancellationToken; use tracing::{debug, error, info, warn}; /// [`ManagedEventTask`] sorts and processes individual events coming from a subscription. @@ -17,6 +18,8 @@ pub(super) struct ManagedEventTask { client: Arc, /// The URL of the L1 RPC endpoint to use for fetching L1 data l1_provider: RootProvider, + /// Cancellation token to stop the task gracefully + cancel_token: CancellationToken, /// The resetter for handling node resets resetter: Arc>, /// The channel to send the events to which require further processing e.g. db updates @@ -33,9 +36,65 @@ where client: Arc, l1_provider: RootProvider, resetter: Arc>, + cancel_token: CancellationToken, event_tx: mpsc::Sender, ) -> Self { - Self { client, l1_provider, resetter, event_tx } + Self { client, l1_provider, resetter, cancel_token, event_tx } + } + + pub(super) async fn run(&self) -> Result<(), ManagedEventTaskError> { + let chain_id = self.client.chain_id().await?; + + let mut subscription = self.client.subscribe_events().await.inspect_err(|err| { + error!( + target: "managed_event_task", + %chain_id, + %err, + "Failed to subscribe to events" + ); + })?; + + info!(target: "managed_event_task", %chain_id, "Subscription task started"); + loop { + tokio::select! { + _ = self.cancel_token.cancelled() => { + info!(target: "managed_event_task", %chain_id, "Cancellation token triggered, shutting down subscription"); + break; + } + incoming_event = subscription.next() => { + match incoming_event { + Some(Ok(subscription_event)) => { + self.handle_managed_event(subscription_event.data).await; + } + Some(Err(err)) => { + error!( + target: "managed_event_task", + %chain_id, + %err, + "Error in event deserialization" + ); + } + None => { + warn!(target: "managed_event_task", %chain_id, "Subscription closed by server"); + self.client.reset_ws_client().await; + break; + } + } + } + } + } + + if let Err(err) = subscription.unsubscribe().await { + warn!( + target: "managed_event_task", + %chain_id, + %err, + "Failed to unsubscribe gracefully" + ); + } + + info!(target: "managed_event_task", %chain_id, "Subscription task finished"); + Ok(()) } /// Processes a managed event received from the subscription. @@ -188,7 +247,7 @@ where #[cfg(test)] mod tests { use super::*; - use crate::syncnode::ManagedNodeError; + use crate::syncnode::ClientError; use alloy_eips::BlockNumHash; use alloy_primitives::{B256, ChainId}; use alloy_rpc_client::RpcClient; @@ -229,18 +288,18 @@ mod tests { #[async_trait] impl ManagedNodeClient for ManagedNodeClient { - async fn chain_id(&self) -> Result; - async fn subscribe_events(&self) -> Result, ManagedNodeError>; - async fn fetch_receipts(&self, block_hash: B256) -> Result; - async fn output_v0_at_timestamp(&self, timestamp: u64) -> Result; - async fn pending_output_v0_at_timestamp(&self, timestamp: u64) -> Result; - async fn l2_block_ref_by_timestamp(&self, timestamp: u64) -> Result; - async fn block_ref_by_number(&self, block_number: u64) -> Result; - async fn reset(&self, unsafe_id: BlockNumHash, cross_unsafe_id: BlockNumHash, local_safe_id: BlockNumHash, cross_safe_id: BlockNumHash, finalised_id: BlockNumHash) -> Result<(), ManagedNodeError>; - async fn provide_l1(&self, block_info: BlockInfo) -> Result<(), ManagedNodeError>; - async fn update_finalized(&self, finalized_block_id: BlockNumHash) -> Result<(), ManagedNodeError>; - async fn update_cross_unsafe(&self, cross_unsafe_block_id: BlockNumHash) -> Result<(), ManagedNodeError>; - async fn update_cross_safe(&self, source_block_id: BlockNumHash, derived_block_id: BlockNumHash) -> Result<(), ManagedNodeError>; + async fn chain_id(&self) -> Result; + async fn subscribe_events(&self) -> Result, ClientError>; + async fn fetch_receipts(&self, block_hash: B256) -> Result; + async fn output_v0_at_timestamp(&self, timestamp: u64) -> Result; + async fn pending_output_v0_at_timestamp(&self, timestamp: u64) -> Result; + async fn l2_block_ref_by_timestamp(&self, timestamp: u64) -> Result; + async fn block_ref_by_number(&self, block_number: u64) -> Result; + async fn reset(&self, unsafe_id: BlockNumHash, cross_unsafe_id: BlockNumHash, local_safe_id: BlockNumHash, cross_safe_id: BlockNumHash, finalised_id: BlockNumHash) -> Result<(), ClientError>; + async fn provide_l1(&self, block_info: BlockInfo) -> Result<(), ClientError>; + async fn update_finalized(&self, finalized_block_id: BlockNumHash) -> Result<(), ClientError>; + async fn update_cross_unsafe(&self, cross_unsafe_block_id: BlockNumHash) -> Result<(), ClientError>; + async fn update_cross_safe(&self, source_block_id: BlockNumHash, derived_block_id: BlockNumHash) -> Result<(), ClientError>; async fn reset_ws_client(&self); } } @@ -276,7 +335,8 @@ mod tests { let transport = MockTransport::new(asserter.clone()); let provider = RootProvider::::new(RpcClient::new(transport, false)); let resetter = Arc::new(Resetter::new(client.clone(), db)); - let task = ManagedEventTask::new(client, provider, resetter, tx); + let cancel_token = CancellationToken::new(); + let task = ManagedEventTask::new(client, provider, resetter, cancel_token, tx); task.handle_managed_event(Some(managed_event)).await; @@ -326,7 +386,8 @@ mod tests { let transport = MockTransport::new(asserter.clone()); let provider = RootProvider::::new(RpcClient::new(transport, false)); let resetter = Arc::new(Resetter::new(client.clone(), db)); - let task = ManagedEventTask::new(client, provider, resetter, tx); + let cancel_token = CancellationToken::new(); + let task = ManagedEventTask::new(client, provider, resetter, cancel_token, tx); task.handle_managed_event(Some(managed_event)).await; @@ -373,7 +434,8 @@ mod tests { let transport = MockTransport::new(asserter.clone()); let provider = RootProvider::::new(RpcClient::new(transport, false)); let resetter = Arc::new(Resetter::new(client.clone(), db)); - let task = ManagedEventTask::new(client, provider, resetter, tx); + let cancel_token = CancellationToken::new(); + let task = ManagedEventTask::new(client, provider, resetter, cancel_token, tx); task.handle_managed_event(Some(managed_event)).await; @@ -456,7 +518,8 @@ mod tests { let transport = MockTransport::new(asserter.clone()); let provider = RootProvider::::new(RpcClient::new(transport, false)); let resetter = Arc::new(Resetter::new(client.clone(), db)); - let task = ManagedEventTask::new(client, provider, resetter, tx); + let cancel_token = CancellationToken::new(); + let task = ManagedEventTask::new(client, provider, resetter, cancel_token, tx); // push the value that we expect on next call asserter.push(MockResponse::Success(serde_json::from_str(next_block).unwrap())); diff --git a/crates/supervisor/core/src/syncnode/traits.rs b/crates/supervisor/core/src/syncnode/traits.rs index c607552097..a4f57dc474 100644 --- a/crates/supervisor/core/src/syncnode/traits.rs +++ b/crates/supervisor/core/src/syncnode/traits.rs @@ -5,7 +5,7 @@ use alloy_primitives::B256; use async_trait::async_trait; use kona_protocol::BlockInfo; use kona_supervisor_types::{OutputV0, Receipts}; -use std::{fmt::Debug, sync::Arc}; +use std::fmt::Debug; use tokio::sync::mpsc; /// Represents a node that can subscribe to L2 events from the chain. @@ -23,7 +23,7 @@ pub trait NodeSubscriber: Send + Sync { /// * `Ok(())` on successful subscription /// * `Err(ManagedNodeError)` if subscription setup fails async fn start_subscription( - self: Arc, + &self, event_tx: mpsc::Sender, ) -> Result<(), ManagedNodeError>; } diff --git a/crates/supervisor/types/src/utils.rs b/crates/supervisor/core/src/syncnode/utils.rs similarity index 98% rename from crates/supervisor/types/src/utils.rs rename to crates/supervisor/core/src/syncnode/utils.rs index 939cac67b5..1f85d3bb50 100644 --- a/crates/supervisor/types/src/utils.rs +++ b/crates/supervisor/core/src/syncnode/utils.rs @@ -8,7 +8,7 @@ use tracing::{error, info, warn}; /// - `operation`: The async task to retry (must return `Result<(), E>`) /// - `cancel_token`: Cancels the retry loop /// - `max_retries`: Max retries before exiting (use `usize::MAX` for infinite) -pub fn spawn_task_with_retry( +pub(super) fn spawn_task_with_retry( operation: impl Fn() -> Fut + Send + Sync + 'static, cancel_token: CancellationToken, max_retries: usize, diff --git a/crates/supervisor/types/src/lib.rs b/crates/supervisor/types/src/lib.rs index 09dc2efcee..202cd4f023 100644 --- a/crates/supervisor/types/src/lib.rs +++ b/crates/supervisor/types/src/lib.rs @@ -20,8 +20,6 @@ pub use access_list::{Access, AccessListError, parse_access_list}; mod chain_id; mod types; -mod utils; -pub use utils::spawn_task_with_retry; pub use chain_id::HexChainId; From cf0b0607894c25cae00820b39fc5f36de31ce1db Mon Sep 17 00:00:00 2001 From: sadiq1971 Date: Thu, 10 Jul 2025 16:04:16 +0600 Subject: [PATCH 7/8] removed redundant clone --- crates/supervisor/core/src/chain_processor/chain.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/supervisor/core/src/chain_processor/chain.rs b/crates/supervisor/core/src/chain_processor/chain.rs index 02264bf680..a94f9e7a5c 100644 --- a/crates/supervisor/core/src/chain_processor/chain.rs +++ b/crates/supervisor/core/src/chain_processor/chain.rs @@ -101,7 +101,7 @@ where // todo: figure out value for buffer size let (event_tx, event_rx) = mpsc::channel::(1000); self.event_tx = Some(event_tx.clone()); - self.managed_node.clone().start_subscription(event_tx.clone()).await?; + self.managed_node.start_subscription(event_tx.clone()).await?; let mut task = ChainProcessorTask::new( self.rollup_config.clone(), From 17129e2e3e18ee8a484a6f7ae69bfe966e816797 Mon Sep 17 00:00:00 2001 From: sadiq1971 Date: Thu, 10 Jul 2025 16:40:09 +0600 Subject: [PATCH 8/8] removed unused import --- Cargo.lock | 3 --- crates/supervisor/types/Cargo.toml | 3 --- 2 files changed, 6 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1b864ad3c0..ce3b556c20 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5155,9 +5155,6 @@ dependencies = [ "serde", "serde_json", "thiserror 2.0.12", - "tokio", - "tokio-util", - "tracing", ] [[package]] diff --git a/crates/supervisor/types/Cargo.toml b/crates/supervisor/types/Cargo.toml index e00fba5fee..bbfbdddcbc 100644 --- a/crates/supervisor/types/Cargo.toml +++ b/crates/supervisor/types/Cargo.toml @@ -33,9 +33,6 @@ op-alloy-consensus.workspace = true serde.workspace = true derive_more = { workspace = true, default-features = false, features = ["constructor"] } thiserror = {workspace = true} -tokio = { workspace = true, features = ["sync", "macros", "time", "rt"] } -tokio-util.workspace = true -tracing.workspace = true [dev-dependencies] serde_json.workspace = true