diff --git a/crates/supervisor/core/src/logindexer/indexer.rs b/crates/supervisor/core/src/logindexer/indexer.rs index 2cfd30d6c0..02eac64c06 100644 --- a/crates/supervisor/core/src/logindexer/indexer.rs +++ b/crates/supervisor/core/src/logindexer/indexer.rs @@ -141,9 +141,9 @@ pub enum LogIndexerError { #[cfg(test)] mod tests { use super::*; + use crate::syncnode::{AuthenticationError, ClientError}; use alloy_primitives::{Address, B256, Bytes}; use async_trait::async_trait; - use jsonrpsee::core::ClientError; use kona_interop::{ExecutingMessageBuilder, InteropProvider, SuperchainBuilder}; use kona_protocol::BlockInfo; use kona_supervisor_storage::StorageError; @@ -271,7 +271,11 @@ mod tests { let mut mock_provider = MockBlockProvider::new(); mock_provider.expect_fetch_receipts().withf(move |hash| *hash == block_hash).returning( - |_| Err(ManagedNodeError::Client(ClientError::Custom("forced error".to_string()))), + |_| { + Err(ManagedNodeError::Client(ClientError::Authentication( + AuthenticationError::InvalidHeader, + ))) + }, ); mock_provider.expect_block_by_number().returning(|_| Ok(BlockInfo::default())); // Not used diff --git a/crates/supervisor/core/src/supervisor.rs b/crates/supervisor/core/src/supervisor.rs index 038683573c..6ed1d8d579 100644 --- a/crates/supervisor/core/src/supervisor.rs +++ b/crates/supervisor/core/src/supervisor.rs @@ -241,7 +241,10 @@ impl Supervisor { let provider = RootProvider::::new_http(url); let client = Arc::new(Client::new(config.clone())); - let chain_id = client.chain_id().await?; + let chain_id = client.chain_id().await.map_err(|err| { + error!(target: "supervisor_service", %err, "Failed to get chain ID from client"); + SupervisorError::Initialise("failed to get chain id from client".to_string()) + })?; let db = self.database_factory.get_db(chain_id)?; let managed_node = ManagedNode::::new( diff --git a/crates/supervisor/core/src/syncnode/client.rs b/crates/supervisor/core/src/syncnode/client.rs index b922ff4036..8706c944a3 
100644 --- a/crates/supervisor/core/src/syncnode/client.rs +++ b/crates/supervisor/core/src/syncnode/client.rs @@ -1,4 +1,4 @@ -use super::{AuthenticationError, ManagedNodeError, metrics::Metrics}; +use super::{AuthenticationError, ClientError, metrics::Metrics}; use alloy_primitives::{B256, ChainId}; use alloy_rpc_types_engine::{Claims, JwtSecret}; use alloy_rpc_types_eth::BlockNumHash; @@ -21,31 +21,26 @@ use tracing::{error, info}; #[async_trait] pub trait ManagedNodeClient: Debug { /// Returns the [`ChainId`] of the managed node. - async fn chain_id(&self) -> Result; + async fn chain_id(&self) -> Result; /// Subscribes to [`SubscriptionEvent`] from the managed node. - async fn subscribe_events(&self) -> Result, ManagedNodeError>; + async fn subscribe_events(&self) -> Result, ClientError>; /// Fetches [`Receipts`] for a given block hash. - async fn fetch_receipts(&self, block_hash: B256) -> Result; + async fn fetch_receipts(&self, block_hash: B256) -> Result; /// Fetches the [`OutputV0`] at a specific timestamp. - async fn output_v0_at_timestamp(&self, timestamp: u64) -> Result; + async fn output_v0_at_timestamp(&self, timestamp: u64) -> Result; /// Fetches the pending [`OutputV0`] at a specific timestamp. - async fn pending_output_v0_at_timestamp( - &self, - timestamp: u64, - ) -> Result; + async fn pending_output_v0_at_timestamp(&self, timestamp: u64) + -> Result; /// Fetches the L2 [`BlockInfo`] by timestamp. - async fn l2_block_ref_by_timestamp( - &self, - timestamp: u64, - ) -> Result; + async fn l2_block_ref_by_timestamp(&self, timestamp: u64) -> Result; /// Fetches the [`BlockInfo`] by block number. - async fn block_ref_by_number(&self, block_number: u64) -> Result; + async fn block_ref_by_number(&self, block_number: u64) -> Result; /// Resets the node state with the provided block IDs. 
async fn reset( @@ -55,29 +50,29 @@ pub trait ManagedNodeClient: Debug { local_safe_id: BlockNumHash, cross_safe_id: BlockNumHash, finalised_id: BlockNumHash, - ) -> Result<(), ManagedNodeError>; + ) -> Result<(), ClientError>; /// Provides L1 [`BlockInfo`] to the managed node. - async fn provide_l1(&self, block_info: BlockInfo) -> Result<(), ManagedNodeError>; + async fn provide_l1(&self, block_info: BlockInfo) -> Result<(), ClientError>; /// Updates the finalized block ID in the managed node. - async fn update_finalized( - &self, - finalized_block_id: BlockNumHash, - ) -> Result<(), ManagedNodeError>; + async fn update_finalized(&self, finalized_block_id: BlockNumHash) -> Result<(), ClientError>; /// Updates the cross-unsafe block ID in the managed node. async fn update_cross_unsafe( &self, cross_unsafe_block_id: BlockNumHash, - ) -> Result<(), ManagedNodeError>; + ) -> Result<(), ClientError>; /// Updates the cross-safe block ID in the managed node. async fn update_cross_safe( &self, source_block_id: BlockNumHash, derived_block_id: BlockNumHash, - ) -> Result<(), ManagedNodeError>; + ) -> Result<(), ClientError>; + + /// Resets the ws-client to None when server disconnects + async fn reset_ws_client(&self); } /// [`ClientConfig`] sets the configuration for the managed node client. @@ -134,7 +129,7 @@ impl Client { } /// Creates authentication headers using JWT secret. - fn create_auth_headers(&self) -> Result { + fn create_auth_headers(&self) -> Result { let Some(jwt_secret) = self.config.jwt_secret() else { error!(target: "managed_node", "JWT secret not found or invalid"); return Err(AuthenticationError::InvalidJwt.into()) @@ -163,7 +158,7 @@ impl Client { /// Returns a reference to the WebSocket client, creating it if it doesn't exist. 
// todo: support http client as well - pub async fn get_ws_client(&self) -> Result, ManagedNodeError> { + pub async fn get_ws_client(&self) -> Result, ClientError> { let mut ws_client_guard = self.ws_client.lock().await; if ws_client_guard.is_none() { let headers = self.create_auth_headers().inspect_err(|err| { @@ -182,7 +177,13 @@ impl Client { #[async_trait] impl ManagedNodeClient for Client { - async fn chain_id(&self) -> Result { + async fn reset_ws_client(&self) { + let mut ws_client_guard = self.ws_client.lock().await; + if ws_client_guard.is_some() { + *ws_client_guard = None; + }; + } + async fn chain_id(&self) -> Result { if let Some(chain_id) = self.chain_id.get() { return Ok(*chain_id); } @@ -210,7 +211,7 @@ impl ManagedNodeClient for Client { Ok(chain_id) } - async fn subscribe_events(&self) -> Result, ManagedNodeError> { + async fn subscribe_events(&self) -> Result, ClientError> { let client = self.get_ws_client().await?; // This returns ManagedNodeError, handled by your function let subscription = observe_metrics_for_result_async!( Metrics::MANAGED_NODE_RPC_REQUESTS_SUCCESS_TOTAL, @@ -226,7 +227,7 @@ impl ManagedNodeClient for Client { Ok(subscription) } - async fn fetch_receipts(&self, block_hash: B256) -> Result { + async fn fetch_receipts(&self, block_hash: B256) -> Result { let client = self.get_ws_client().await?; // This returns ManagedNodeError, handled by your function let receipts = observe_metrics_for_result_async!( Metrics::MANAGED_NODE_RPC_REQUESTS_SUCCESS_TOTAL, @@ -242,7 +243,7 @@ impl ManagedNodeClient for Client { Ok(receipts) } - async fn output_v0_at_timestamp(&self, timestamp: u64) -> Result { + async fn output_v0_at_timestamp(&self, timestamp: u64) -> Result { let client = self.get_ws_client().await?; let output_v0 = observe_metrics_for_result_async!( Metrics::MANAGED_NODE_RPC_REQUESTS_SUCCESS_TOTAL, @@ -261,7 +262,7 @@ impl ManagedNodeClient for Client { async fn pending_output_v0_at_timestamp( &self, timestamp: u64, - ) -> Result 
{ + ) -> Result { let client = self.get_ws_client().await?; let output_v0 = observe_metrics_for_result_async!( Metrics::MANAGED_NODE_RPC_REQUESTS_SUCCESS_TOTAL, @@ -277,10 +278,7 @@ impl ManagedNodeClient for Client { Ok(output_v0) } - async fn l2_block_ref_by_timestamp( - &self, - timestamp: u64, - ) -> Result { + async fn l2_block_ref_by_timestamp(&self, timestamp: u64) -> Result { let client = self.get_ws_client().await?; let block_info = observe_metrics_for_result_async!( Metrics::MANAGED_NODE_RPC_REQUESTS_SUCCESS_TOTAL, @@ -296,7 +294,7 @@ impl ManagedNodeClient for Client { Ok(block_info) } - async fn block_ref_by_number(&self, block_number: u64) -> Result { + async fn block_ref_by_number(&self, block_number: u64) -> Result { let client = self.get_ws_client().await?; let block_info = observe_metrics_for_result_async!( Metrics::MANAGED_NODE_RPC_REQUESTS_SUCCESS_TOTAL, @@ -319,7 +317,7 @@ impl ManagedNodeClient for Client { local_safe_id: BlockNumHash, cross_safe_id: BlockNumHash, finalised_id: BlockNumHash, - ) -> Result<(), ManagedNodeError> { + ) -> Result<(), ClientError> { let client = self.get_ws_client().await?; observe_metrics_for_result_async!( Metrics::MANAGED_NODE_RPC_REQUESTS_SUCCESS_TOTAL, @@ -334,7 +332,7 @@ impl ManagedNodeClient for Client { Ok(()) } - async fn provide_l1(&self, block_info: BlockInfo) -> Result<(), ManagedNodeError> { + async fn provide_l1(&self, block_info: BlockInfo) -> Result<(), ClientError> { let client = self.get_ws_client().await?; observe_metrics_for_result_async!( Metrics::MANAGED_NODE_RPC_REQUESTS_SUCCESS_TOTAL, @@ -349,10 +347,7 @@ impl ManagedNodeClient for Client { Ok(()) } - async fn update_finalized( - &self, - finalized_block_id: BlockNumHash, - ) -> Result<(), ManagedNodeError> { + async fn update_finalized(&self, finalized_block_id: BlockNumHash) -> Result<(), ClientError> { let client = self.get_ws_client().await?; observe_metrics_for_result_async!( Metrics::MANAGED_NODE_RPC_REQUESTS_SUCCESS_TOTAL, @@ -370,7 
+365,7 @@ impl ManagedNodeClient for Client { async fn update_cross_unsafe( &self, cross_unsafe_block_id: BlockNumHash, - ) -> Result<(), ManagedNodeError> { + ) -> Result<(), ClientError> { let client = self.get_ws_client().await?; observe_metrics_for_result_async!( Metrics::MANAGED_NODE_RPC_REQUESTS_SUCCESS_TOTAL, @@ -389,7 +384,7 @@ impl ManagedNodeClient for Client { &self, source_block_id: BlockNumHash, derived_block_id: BlockNumHash, - ) -> Result<(), ManagedNodeError> { + ) -> Result<(), ClientError> { let client = self.get_ws_client().await?; observe_metrics_for_result_async!( Metrics::MANAGED_NODE_RPC_REQUESTS_SUCCESS_TOTAL, diff --git a/crates/supervisor/core/src/syncnode/error.rs b/crates/supervisor/core/src/syncnode/error.rs index 51fa921811..ba9c88f13f 100644 --- a/crates/supervisor/core/src/syncnode/error.rs +++ b/crates/supervisor/core/src/syncnode/error.rs @@ -3,44 +3,21 @@ use kona_supervisor_storage::StorageError; use thiserror::Error; /// Represents various errors that can occur during node management, -#[derive(Debug, Error)] +#[derive(Debug, Error, PartialEq, Eq)] pub enum ManagedNodeError { /// Represents an error that occurred while starting the managed node. #[error(transparent)] - Client(#[from] jsonrpsee::core::ClientError), - - /// Represents an error that occurred while parsing a chain ID from a string. - #[error(transparent)] - ChainIdParseError(#[from] std::num::ParseIntError), + Client(#[from] ClientError), /// Represents an error that occurred while subscribing to the managed node. #[error("subscription error: {0}")] Subscription(#[from] SubscriptionError), - /// Represents an error that occurred while authenticating to the managed node. - #[error("failed to authenticate: {0}")] - Authentication(#[from] AuthenticationError), - /// Represents an error that occurred while fetching data from the storage. 
#[error(transparent)] StorageError(#[from] StorageError), } -impl PartialEq for ManagedNodeError { - fn eq(&self, other: &Self) -> bool { - use ManagedNodeError::*; - match (self, other) { - (Client(a), Client(b)) => a.to_string() == b.to_string(), - (ChainIdParseError(a), ChainIdParseError(b)) => a == b, - (Subscription(a), Subscription(b)) => a == b, - (Authentication(a), Authentication(b)) => a == b, - _ => false, - } - } -} - -impl Eq for ManagedNodeError {} - /// Error establishing authenticated connection to managed node. #[derive(Debug, Error, PartialEq, Eq)] pub enum AuthenticationError { @@ -61,8 +38,12 @@ pub enum SubscriptionError { } /// Error handling managed event task. -#[derive(Debug, Error, PartialEq)] +#[derive(Debug, Error, PartialEq, Eq)] pub enum ManagedEventTaskError { + /// Represents an error returned by the managed node client. + #[error(transparent)] + Client(#[from] ClientError), + /// Unable to successfully fetch next L1 block. #[error("failed to get block by number, number: {0}")] GetBlockByNumberFailed(u64), @@ -78,10 +59,6 @@ pub enum ManagedEventTaskError { parent: B256, }, - /// This should never happen, new() always sets the rpc client when creating the task. - #[error("rpc client for managed node is not set")] - ManagedNodeClientMissing, - /// Managed node api call failed. #[error("managed node api call failed")] ManagedNodeAPICallFailed, @@ -90,3 +67,33 @@ pub enum ManagedEventTaskError { #[error(transparent)] StorageError(#[from] StorageError), } + +/// Represents errors that can occur while interacting with the managed node client. +#[derive(Debug, Error)] +pub enum ClientError { + /// Represents an error from the underlying JSON-RPC client. + #[error(transparent)] + Client(#[from] jsonrpsee::core::ClientError), + + /// Represents an error that occurred while authenticating to the managed node. 
+ #[error("failed to authenticate: {0}")] + Authentication(#[from] AuthenticationError), + + /// Represents an error that occurred while parsing a chain ID from a string. + #[error(transparent)] + ChainIdParseError(#[from] std::num::ParseIntError), +} + +impl PartialEq for ClientError { + fn eq(&self, other: &Self) -> bool { + use ClientError::*; + match (self, other) { + (Client(a), Client(b)) => a.to_string() == b.to_string(), + (Authentication(a), Authentication(b)) => a == b, + (ChainIdParseError(a), ChainIdParseError(b)) => a == b, + _ => false, + } + } +} + +impl Eq for ClientError {} diff --git a/crates/supervisor/core/src/syncnode/mod.rs b/crates/supervisor/core/src/syncnode/mod.rs index 54a338251a..0aaee3884f 100644 --- a/crates/supervisor/core/src/syncnode/mod.rs +++ b/crates/supervisor/core/src/syncnode/mod.rs @@ -5,7 +5,9 @@ mod node; pub use node::ManagedNode; mod error; -pub use error::{AuthenticationError, ManagedEventTaskError, ManagedNodeError, SubscriptionError}; +pub use error::{ + AuthenticationError, ClientError, ManagedEventTaskError, ManagedNodeError, SubscriptionError, +}; mod traits; pub use traits::{ @@ -19,3 +21,4 @@ pub use client::{Client, ClientConfig, ManagedNodeClient}; pub(super) mod metrics; pub(super) mod resetter; pub(super) mod task; +pub(super) mod utils; diff --git a/crates/supervisor/core/src/syncnode/node.rs b/crates/supervisor/core/src/syncnode/node.rs index fa969d64b0..841da84daf 100644 --- a/crates/supervisor/core/src/syncnode/node.rs +++ b/crates/supervisor/core/src/syncnode/node.rs @@ -14,12 +14,11 @@ use tokio::{ task::JoinHandle, }; use tokio_util::sync::CancellationToken; -use tracing::{error, info, warn}; use super::{ BlockProvider, ManagedNodeClient, ManagedNodeController, ManagedNodeDataProvider, ManagedNodeError, NodeSubscriber, SubscriptionError, resetter::Resetter, - task::ManagedEventTask, + task::ManagedEventTask, utils::spawn_task_with_retry, }; use crate::event::ChainEvent; @@ -61,7 +60,8 @@ where /// If 
the chain ID is already cached, it returns that. /// If not, it fetches the chain ID from the managed node. pub async fn chain_id(&self) -> Result { - self.client.chain_id().await + let chain_id = self.client.chain_id().await?; + Ok(chain_id) } } @@ -84,73 +84,29 @@ where Err(SubscriptionError::AlreadyActive)? } - let mut subscription = self.client.subscribe_events().await.inspect_err(|err| { - error!( - target: "managed_node", - %err, - "Failed to subscribe to events" - ); - })?; - + let client = self.client.clone(); + let l1_provider = self.l1_provider.clone(); + let resetter = self.resetter.clone(); let cancel_token = self.cancel_token.clone(); - // Creates a task instance to sort and process the events from the subscription - let task = ManagedEventTask::new( - self.client.clone(), - self.l1_provider.clone(), - self.resetter.clone(), - event_tx, - ); - // Start background task to handle events - let handle = tokio::spawn(async move { - info!(target: "managed_node", "Subscription task started"); - loop { - tokio::select! 
{ - // Listen for stop signal - _ = cancel_token.cancelled() => { - info!(target: "managed_node", "Cancellation token triggered, shutting down subscription"); - break; - } - - // Listen for events from subscription - incoming_event = subscription.next() => { - match incoming_event { - Some(Ok(subscription_event)) => { - task.handle_managed_event(subscription_event.data).await; - } - Some(Err(err)) => { - error!( - target: "managed_node", - %err, - "Error in event deserialization" - ); - // Continue processing next events despite this error - } - None => { - // Subscription closed by the server - warn!(target: "managed_node", "Subscription closed by server"); - break; - } - } - } - } - } - - // Try to unsubscribe gracefully - if let Err(err) = subscription.unsubscribe().await { - warn!( - target: "managed_node", - %err, - "Failed to unsubscribe gracefully" + // spawn a task which will be retried in failures + let handle = spawn_task_with_retry( + move || { + let task = ManagedEventTask::new( + client.clone(), + l1_provider.clone(), + resetter.clone(), + cancel_token.clone(), + event_tx.clone(), ); - } - - info!(target: "managed_node", "Subscription task finished"); - }); + async move { task.run().await } + }, + self.cancel_token.clone(), + usize::MAX, + ); *task_handle_guard = Some(handle); - info!(target: "managed_node", "Subscription started successfully"); Ok(()) } } @@ -164,10 +120,12 @@ where C: ManagedNodeClient + Send + Sync + 'static, { async fn block_by_number(&self, number: u64) -> Result { - self.client.block_ref_by_number(number).await + let block = self.client.block_ref_by_number(number).await?; + Ok(block) } async fn fetch_receipts(&self, block_hash: B256) -> Result { - self.client.fetch_receipts(block_hash).await + let receipt = self.client.fetch_receipts(block_hash).await?; + Ok(receipt) } } @@ -178,21 +136,24 @@ where C: ManagedNodeClient + Send + Sync + 'static, { async fn output_v0_at_timestamp(&self, timestamp: u64) -> Result { - 
self.client.output_v0_at_timestamp(timestamp).await + let outputv0 = self.client.output_v0_at_timestamp(timestamp).await?; + Ok(outputv0) } async fn pending_output_v0_at_timestamp( &self, timestamp: u64, ) -> Result { - self.client.pending_output_v0_at_timestamp(timestamp).await + let outputv0 = self.client.pending_output_v0_at_timestamp(timestamp).await?; + Ok(outputv0) } async fn l2_block_ref_by_timestamp( &self, timestamp: u64, ) -> Result { - self.client.l2_block_ref_by_timestamp(timestamp).await + let block = self.client.l2_block_ref_by_timestamp(timestamp).await?; + Ok(block) } } @@ -206,14 +167,16 @@ where &self, finalized_block_id: BlockNumHash, ) -> Result<(), ManagedNodeError> { - self.client.update_finalized(finalized_block_id).await + self.client.update_finalized(finalized_block_id).await?; + Ok(()) } async fn update_cross_unsafe( &self, cross_unsafe_block_id: BlockNumHash, ) -> Result<(), ManagedNodeError> { - self.client.update_cross_unsafe(cross_unsafe_block_id).await + self.client.update_cross_unsafe(cross_unsafe_block_id).await?; + Ok(()) } async fn update_cross_safe( @@ -221,10 +184,12 @@ where source_block_id: BlockNumHash, derived_block_id: BlockNumHash, ) -> Result<(), ManagedNodeError> { - self.client.update_cross_safe(source_block_id, derived_block_id).await + self.client.update_cross_safe(source_block_id, derived_block_id).await?; + Ok(()) } async fn reset(&self) -> Result<(), ManagedNodeError> { - self.resetter.reset().await + self.resetter.reset().await?; + Ok(()) } } diff --git a/crates/supervisor/core/src/syncnode/resetter.rs b/crates/supervisor/core/src/syncnode/resetter.rs index fa571b8613..f15a6a1e9b 100644 --- a/crates/supervisor/core/src/syncnode/resetter.rs +++ b/crates/supervisor/core/src/syncnode/resetter.rs @@ -79,7 +79,7 @@ where #[cfg(test)] mod tests { use super::*; - use crate::syncnode::{AuthenticationError, ManagedNodeError}; + use crate::syncnode::{AuthenticationError, ClientError}; use alloy_eips::BlockNumHash; use 
alloy_primitives::{B256, ChainId}; use async_trait::async_trait; @@ -107,18 +107,19 @@ mod tests { #[async_trait] impl ManagedNodeClient for Client { - async fn chain_id(&self) -> Result; - async fn subscribe_events(&self) -> Result, ManagedNodeError>; - async fn fetch_receipts(&self, block_hash: B256) -> Result; - async fn output_v0_at_timestamp(&self, timestamp: u64) -> Result; - async fn pending_output_v0_at_timestamp(&self, timestamp: u64) -> Result; - async fn l2_block_ref_by_timestamp(&self, timestamp: u64) -> Result; - async fn block_ref_by_number(&self, block_number: u64) -> Result; - async fn reset(&self, unsafe_id: BlockNumHash, cross_unsafe_id: BlockNumHash, local_safe_id: BlockNumHash, cross_safe_id: BlockNumHash, finalised_id: BlockNumHash) -> Result<(), ManagedNodeError>; - async fn provide_l1(&self, block_info: BlockInfo) -> Result<(), ManagedNodeError>; - async fn update_finalized(&self, finalized_block_id: BlockNumHash) -> Result<(), ManagedNodeError>; - async fn update_cross_unsafe(&self, cross_unsafe_block_id: BlockNumHash) -> Result<(), ManagedNodeError>; - async fn update_cross_safe(&self, source_block_id: BlockNumHash, derived_block_id: BlockNumHash) -> Result<(), ManagedNodeError>; + async fn chain_id(&self) -> Result; + async fn subscribe_events(&self) -> Result, ClientError>; + async fn fetch_receipts(&self, block_hash: B256) -> Result; + async fn output_v0_at_timestamp(&self, timestamp: u64) -> Result; + async fn pending_output_v0_at_timestamp(&self, timestamp: u64) -> Result; + async fn l2_block_ref_by_timestamp(&self, timestamp: u64) -> Result; + async fn block_ref_by_number(&self, block_number: u64) -> Result; + async fn reset(&self, unsafe_id: BlockNumHash, cross_unsafe_id: BlockNumHash, local_safe_id: BlockNumHash, cross_safe_id: BlockNumHash, finalised_id: BlockNumHash) -> Result<(), ClientError>; + async fn provide_l1(&self, block_info: BlockInfo) -> Result<(), ClientError>; + async fn update_finalized(&self, finalized_block_id: 
BlockNumHash) -> Result<(), ClientError>; + async fn update_cross_unsafe(&self, cross_unsafe_block_id: BlockNumHash) -> Result<(), ClientError>; + async fn update_cross_safe(&self, source_block_id: BlockNumHash, derived_block_id: BlockNumHash) -> Result<(), ClientError>; + async fn reset_ws_client(&self); } } @@ -170,9 +171,9 @@ mod tests { db.expect_get_super_head().returning(move || Ok(super_head)); let mut client = MockClient::new(); - client.expect_block_ref_by_number().returning(|_| { - Err(ManagedNodeError::Authentication(AuthenticationError::InvalidHeader)) - }); + client + .expect_block_ref_by_number() + .returning(|_| Err(ClientError::Authentication(AuthenticationError::InvalidHeader))); let resetter = Resetter::new(Arc::new(client), Arc::new(db)); @@ -207,7 +208,7 @@ mod tests { let mut client = MockClient::new(); client.expect_block_ref_by_number().returning(move |_| Ok(super_head.local_safe)); client.expect_reset().returning(|_, _, _, _, _| { - Err(ManagedNodeError::Authentication(AuthenticationError::InvalidJwt)) + Err(ClientError::Authentication(AuthenticationError::InvalidJwt)) }); let resetter = Resetter::new(Arc::new(client), Arc::new(db)); diff --git a/crates/supervisor/core/src/syncnode/task.rs b/crates/supervisor/core/src/syncnode/task.rs index 9dbf7ca495..1c225c8c6e 100644 --- a/crates/supervisor/core/src/syncnode/task.rs +++ b/crates/supervisor/core/src/syncnode/task.rs @@ -8,6 +8,7 @@ use kona_protocol::BlockInfo; use kona_supervisor_storage::{DerivationStorageReader, HeadRefStorageReader, LogStorageReader}; use std::sync::Arc; use tokio::sync::mpsc; +use tokio_util::sync::CancellationToken; use tracing::{debug, error, info, warn}; /// [`ManagedEventTask`] sorts and processes individual events coming from a subscription. 
@@ -17,6 +18,8 @@ pub(super) struct ManagedEventTask { client: Arc, /// The URL of the L1 RPC endpoint to use for fetching L1 data l1_provider: RootProvider, + /// Cancellation token to stop the task gracefully + cancel_token: CancellationToken, /// The resetter for handling node resets resetter: Arc>, /// The channel to send the events to which require further processing e.g. db updates @@ -33,9 +36,65 @@ where client: Arc, l1_provider: RootProvider, resetter: Arc>, + cancel_token: CancellationToken, event_tx: mpsc::Sender, ) -> Self { - Self { client, l1_provider, resetter, event_tx } + Self { client, l1_provider, resetter, cancel_token, event_tx } + } + + pub(super) async fn run(&self) -> Result<(), ManagedEventTaskError> { + let chain_id = self.client.chain_id().await?; + + let mut subscription = self.client.subscribe_events().await.inspect_err(|err| { + error!( + target: "managed_event_task", + %chain_id, + %err, + "Failed to subscribe to events" + ); + })?; + + info!(target: "managed_event_task", %chain_id, "Subscription task started"); + loop { + tokio::select! 
{ + _ = self.cancel_token.cancelled() => { + info!(target: "managed_event_task", %chain_id, "Cancellation token triggered, shutting down subscription"); + break; + } + incoming_event = subscription.next() => { + match incoming_event { + Some(Ok(subscription_event)) => { + self.handle_managed_event(subscription_event.data).await; + } + Some(Err(err)) => { + error!( + target: "managed_event_task", + %chain_id, + %err, + "Error in event deserialization" + ); + } + None => { + warn!(target: "managed_event_task", %chain_id, "Subscription closed by server"); + self.client.reset_ws_client().await; + break; + } + } + } + } + } + + if let Err(err) = subscription.unsubscribe().await { + warn!( + target: "managed_event_task", + %chain_id, + %err, + "Failed to unsubscribe gracefully" + ); + } + + info!(target: "managed_event_task", %chain_id, "Subscription task finished"); + Ok(()) } /// Processes a managed event received from the subscription. @@ -188,7 +247,7 @@ where #[cfg(test)] mod tests { use super::*; - use crate::syncnode::ManagedNodeError; + use crate::syncnode::ClientError; use alloy_eips::BlockNumHash; use alloy_primitives::{B256, ChainId}; use alloy_rpc_client::RpcClient; @@ -229,18 +288,19 @@ mod tests { #[async_trait] impl ManagedNodeClient for ManagedNodeClient { - async fn chain_id(&self) -> Result; - async fn subscribe_events(&self) -> Result, ManagedNodeError>; - async fn fetch_receipts(&self, block_hash: B256) -> Result; - async fn output_v0_at_timestamp(&self, timestamp: u64) -> Result; - async fn pending_output_v0_at_timestamp(&self, timestamp: u64) -> Result; - async fn l2_block_ref_by_timestamp(&self, timestamp: u64) -> Result; - async fn block_ref_by_number(&self, block_number: u64) -> Result; - async fn reset(&self, unsafe_id: BlockNumHash, cross_unsafe_id: BlockNumHash, local_safe_id: BlockNumHash, cross_safe_id: BlockNumHash, finalised_id: BlockNumHash) -> Result<(), ManagedNodeError>; - async fn provide_l1(&self, block_info: BlockInfo) -> Result<(), 
ManagedNodeError>; - async fn update_finalized(&self, finalized_block_id: BlockNumHash) -> Result<(), ManagedNodeError>; - async fn update_cross_unsafe(&self, cross_unsafe_block_id: BlockNumHash) -> Result<(), ManagedNodeError>; - async fn update_cross_safe(&self, source_block_id: BlockNumHash, derived_block_id: BlockNumHash) -> Result<(), ManagedNodeError>; + async fn chain_id(&self) -> Result; + async fn subscribe_events(&self) -> Result, ClientError>; + async fn fetch_receipts(&self, block_hash: B256) -> Result; + async fn output_v0_at_timestamp(&self, timestamp: u64) -> Result; + async fn pending_output_v0_at_timestamp(&self, timestamp: u64) -> Result; + async fn l2_block_ref_by_timestamp(&self, timestamp: u64) -> Result; + async fn block_ref_by_number(&self, block_number: u64) -> Result; + async fn reset(&self, unsafe_id: BlockNumHash, cross_unsafe_id: BlockNumHash, local_safe_id: BlockNumHash, cross_safe_id: BlockNumHash, finalised_id: BlockNumHash) -> Result<(), ClientError>; + async fn provide_l1(&self, block_info: BlockInfo) -> Result<(), ClientError>; + async fn update_finalized(&self, finalized_block_id: BlockNumHash) -> Result<(), ClientError>; + async fn update_cross_unsafe(&self, cross_unsafe_block_id: BlockNumHash) -> Result<(), ClientError>; + async fn update_cross_safe(&self, source_block_id: BlockNumHash, derived_block_id: BlockNumHash) -> Result<(), ClientError>; + async fn reset_ws_client(&self); } } @@ -275,7 +335,8 @@ mod tests { let transport = MockTransport::new(asserter.clone()); let provider = RootProvider::::new(RpcClient::new(transport, false)); let resetter = Arc::new(Resetter::new(client.clone(), db)); - let task = ManagedEventTask::new(client, provider, resetter, tx); + let cancel_token = CancellationToken::new(); + let task = ManagedEventTask::new(client, provider, resetter, cancel_token, tx); task.handle_managed_event(Some(managed_event)).await; @@ -325,7 +386,8 @@ mod tests { let transport = MockTransport::new(asserter.clone()); 
let provider = RootProvider::::new(RpcClient::new(transport, false)); let resetter = Arc::new(Resetter::new(client.clone(), db)); - let task = ManagedEventTask::new(client, provider, resetter, tx); + let cancel_token = CancellationToken::new(); + let task = ManagedEventTask::new(client, provider, resetter, cancel_token, tx); task.handle_managed_event(Some(managed_event)).await; @@ -372,7 +434,8 @@ mod tests { let transport = MockTransport::new(asserter.clone()); let provider = RootProvider::::new(RpcClient::new(transport, false)); let resetter = Arc::new(Resetter::new(client.clone(), db)); - let task = ManagedEventTask::new(client, provider, resetter, tx); + let cancel_token = CancellationToken::new(); + let task = ManagedEventTask::new(client, provider, resetter, cancel_token, tx); task.handle_managed_event(Some(managed_event)).await; @@ -455,7 +518,8 @@ mod tests { let transport = MockTransport::new(asserter.clone()); let provider = RootProvider::::new(RpcClient::new(transport, false)); let resetter = Arc::new(Resetter::new(client.clone(), db)); - let task = ManagedEventTask::new(client, provider, resetter, tx); + let cancel_token = CancellationToken::new(); + let task = ManagedEventTask::new(client, provider, resetter, cancel_token, tx); // push the value that we expect on next call asserter.push(MockResponse::Success(serde_json::from_str(next_block).unwrap())); diff --git a/crates/supervisor/core/src/syncnode/utils.rs b/crates/supervisor/core/src/syncnode/utils.rs new file mode 100644 index 0000000000..1f85d3bb50 --- /dev/null +++ b/crates/supervisor/core/src/syncnode/utils.rs @@ -0,0 +1,67 @@ +use std::{future::Future, time::Duration}; +use tokio::{select, task::JoinHandle, time::sleep}; +use tokio_util::sync::CancellationToken; +use tracing::{error, info, warn}; + +/// Spawns a background task that retries the given async operation with backoff on failure. 
+/// +/// - `operation`: The async task to retry (must return `Result<(), E>`) +/// - `cancel_token`: Cancels the retry loop +/// - `max_retries`: Max retries before exiting (use `usize::MAX` for infinite) +pub(super) fn spawn_task_with_retry( + operation: impl Fn() -> Fut + Send + Sync + 'static, + cancel_token: CancellationToken, + max_retries: usize, +) -> JoinHandle<()> +where + Fut: Future> + Send + 'static, + E: std::fmt::Display + Send + 'static, +{ + tokio::spawn(async move { + let mut attempt = 0; + + loop { + if cancel_token.is_cancelled() { + info!(target: "retrier", "Retry loop cancelled before starting"); + break; + } + + match operation().await { + Ok(()) => { + info!(target: "retrier", "Task exited successfully, restarting"); + attempt = 0; // Reset attempt count on success + } + Err(err) => { + attempt += 1; + + if attempt > max_retries { + error!(target: "retrier", %err, "Retry limit ({max_retries}) exceeded"); + break; + } + + let delay = backoff_delay(attempt); + warn!( + target: "retrier", + %err, + ?delay, + "Attempt {attempt}/{max_retries} failed, retrying after delay" + ); + + select! { + _ = sleep(delay) => {} + _ = cancel_token.cancelled() => { + warn!(target: "retrier", "Retry loop cancelled during backoff"); + break; + } + } + } + } + } + }) +} + +/// Calculates exponential backoff delay with a max cap (30s). 
+fn backoff_delay(attempt: usize) -> Duration { + let secs = 2u64.saturating_pow(attempt.min(5) as u32); + Duration::from_secs(secs.min(30)) +} diff --git a/crates/supervisor/types/src/lib.rs b/crates/supervisor/types/src/lib.rs index d8a036df14..202cd4f023 100644 --- a/crates/supervisor/types/src/lib.rs +++ b/crates/supervisor/types/src/lib.rs @@ -20,6 +20,7 @@ pub use access_list::{Access, AccessListError, parse_access_list}; mod chain_id; mod types; + pub use chain_id::HexChainId; pub use types::{BlockSeal, OutputV0, SubscriptionEvent}; diff --git a/tests/justfile b/tests/justfile index 06ffaa19c7..6bf0920639 100644 --- a/tests/justfile +++ b/tests/justfile @@ -57,7 +57,7 @@ test-e2e DEVNET GO_PKG_NAME="" DEVNET_CUSTOM_PATH="" : export DISABLE_OP_E2E_LEGACY=true export DEVSTACK_ORCHESTRATOR=sysext - cd {{SOURCE}} && go test -v ./$GO_PKG_NAME + cd {{SOURCE}} && go test -timeout 20m -v ./$GO_PKG_NAME build-deploy-devnet DEVNET BINARY OP_PACKAGE_PATH="": (build-devnet BINARY) (devnet DEVNET OP_PACKAGE_PATH) diff --git a/tests/supervisor/interop_sync_test.go b/tests/supervisor/interop_sync_test.go new file mode 100644 index 0000000000..061d2cb22a --- /dev/null +++ b/tests/supervisor/interop_sync_test.go @@ -0,0 +1,86 @@ +package supervisor + +import ( + "testing" + + "github.com/ethereum-optimism/optimism/op-devstack/devtest" + "github.com/ethereum-optimism/optimism/op-devstack/dsl" + "github.com/ethereum-optimism/optimism/op-devstack/presets" + "github.com/ethereum-optimism/optimism/op-service/eth" + "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types" +) + +// TestL2CLResync checks that unsafe head advances after restarting L2CL. +// Resync is only possible when the supervisor and L2CL reconnect.
+// Acceptance Test: https://github.com/ethereum-optimism/optimism/blob/develop/op-acceptance-tests/tests/interop/sync/simple_interop/interop_sync_test.go +func TestL2CLResync(gt *testing.T) { + t := devtest.SerialT(gt) + sys := presets.NewSimpleInterop(t) + logger := sys.Log.With("Test", "TestL2CLResync") + + logger.Info("Check unsafe chains are advancing") + dsl.CheckAll(t, + sys.L2ELA.AdvancedFn(eth.Unsafe, 5), + sys.L2ELB.AdvancedFn(eth.Unsafe, 5), + ) + + logger.Info("Stop L2CL nodes") + sys.L2CLA.Stop() + sys.L2CLB.Stop() + + logger.Info("Make sure L2ELs does not advance") + dsl.CheckAll(t, + sys.L2ELA.NotAdvancedFn(eth.Unsafe), + sys.L2ELB.NotAdvancedFn(eth.Unsafe), + ) + + logger.Info("Restart L2CL nodes") + sys.L2CLA.Start() + sys.L2CLB.Start() + + // L2CL may advance a few blocks without supervisor connection, but eventually it will stop without the connection + // we must check that unsafe head is advancing due to reconnection + logger.Info("Boot up L2CL nodes") + + dsl.CheckAll(t, + sys.L2ELA.AdvancedFn(eth.Unsafe, 30), + sys.L2ELB.AdvancedFn(eth.Unsafe, 30), + ) + + // supervisor will attempt to reconnect with L2CLs at this point because L2CL ws endpoint is recovered + logger.Info("Check unsafe chains are advancing again") + dsl.CheckAll(t, + sys.L2ELA.AdvancedFn(eth.Unsafe, 10), + sys.L2ELB.AdvancedFn(eth.Unsafe, 10), + ) + + // supervisor successfully connected with managed L2CLs +} + +// TestSupervisorResync checks that heads advance after restarting the Supervisor.
+func TestSupervisorResync(gt *testing.T) { + t := devtest.SerialT(gt) + sys := presets.NewSimpleInterop(t) + logger := sys.Log.With("Test", "TestSupervisorResync") + + logger.Info("Check unsafe chains are advancing") + + for _, level := range []types.SafetyLevel{types.LocalUnsafe, types.LocalSafe, types.CrossUnsafe, types.CrossSafe} { + sys.Supervisor.WaitForL2HeadToAdvance(sys.L2ChainA.ChainID(), 2, level, 20) + sys.Supervisor.WaitForL2HeadToAdvance(sys.L2ChainB.ChainID(), 2, level, 20) + } + + logger.Info("Stop Supervisor node") + sys.Supervisor.Stop() + + logger.Info("Restart Supervisor node") + sys.Supervisor.Start() + + logger.Info("Boot up Supervisor node") + + // Recheck that syncing is not blocked + for _, level := range []types.SafetyLevel{types.LocalUnsafe, types.LocalSafe, types.CrossUnsafe, types.CrossSafe} { + sys.Supervisor.WaitForL2HeadToAdvance(sys.L2ChainA.ChainID(), 2, level, 20) + sys.Supervisor.WaitForL2HeadToAdvance(sys.L2ChainB.ChainID(), 2, level, 20) + }