diff --git a/Cargo.lock b/Cargo.lock index 160d6d3e23..b46b545085 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5044,6 +5044,7 @@ name = "kona-node" version = "1.0.0-rc.1" dependencies = [ "alloy-chains", + "alloy-eips", "alloy-genesis", "alloy-primitives", "alloy-provider", @@ -5053,6 +5054,7 @@ dependencies = [ "alloy-transport", "alloy-transport-http", "anyhow", + "async-trait", "backon", "clap", "derive_more", diff --git a/bin/node/Cargo.toml b/bin/node/Cargo.toml index 9feedad43a..1b88a20837 100644 --- a/bin/node/Cargo.toml +++ b/bin/node/Cargo.toml @@ -16,7 +16,7 @@ workspace = true [dependencies] # workspace -kona-rpc.workspace = true +kona-rpc = { workspace = true, features = ["client"] } kona-peers.workspace = true kona-genesis = { workspace = true, features = ["tabled"] } kona-protocol.workspace = true @@ -41,6 +41,7 @@ alloy-transport-http.workspace = true alloy-primitives.workspace = true alloy-signer-local.workspace = true alloy-rpc-types-engine = { workspace = true, features = ["jwt", "serde"] } +alloy-eips.workspace = true # op-alloy op-alloy-provider.workspace = true @@ -62,10 +63,11 @@ metrics.workspace = true reqwest.workspace = true tracing.workspace = true thiserror.workspace = true +async-trait.workspace = true tokio-stream.workspace = true tokio-util.workspace = true serde_json = { workspace = true, features = ["std"] } -jsonrpsee = { workspace = true, features = ["server"] } +jsonrpsee = { workspace = true, features = ["server", "http-client"] } clap = { workspace = true, features = ["derive", "env"] } tokio = { workspace = true, features = ["rt-multi-thread", "macros"] } backon = { workspace = true, features = ["std", "tokio", "tokio-sleep"] } diff --git a/bin/node/src/commands/node.rs b/bin/node/src/commands/node.rs index bd30bcdf81..7703d3365a 100644 --- a/bin/node/src/commands/node.rs +++ b/bin/node/src/commands/node.rs @@ -2,8 +2,8 @@ use crate::{ flags::{ - BuilderClientArgs, GlobalArgs, L1ClientArgs, L2ClientArgs, P2PArgs, RollupBoostFlags, - RpcArgs, SequencerArgs, + BuilderClientArgs, FollowClientArgs, GlobalArgs, L1ClientArgs, L2ClientArgs, P2PArgs, + RollupBoostFlags, RpcArgs, SequencerArgs, }, metrics::{CliMetrics, init_rollup_config_metrics}, }; @@ -16,7 +16,9 @@ use clap::Parser; use kona_cli::{LogConfig, MetricsArgs}; use kona_engine::{HyperAuthClient, OpEngineClient}; use kona_genesis::{L1ChainConfig, RollupConfig}; -use kona_node_service::{EngineConfig, L1ConfigBuilder, NodeMode, RollupNodeBuilder}; +use kona_node_service::{ + EngineConfig, FollowClientConfig, L1ConfigBuilder, NodeMode, RollupNodeBuilder, +}; use kona_registry::{L1Config, scr_rollup_config_by_alloy_ident}; use op_alloy_network::Optimism; use op_alloy_provider::ext::engine::OpEngineApi; @@ -98,6 +100,10 @@ pub struct NodeCommand { #[clap(flatten)] pub builder_client_args: BuilderClientArgs, + /// Optional follow client for L2 CL sync status. + #[clap(flatten)] + pub follow_client_args: FollowClientArgs, + /// Path to a custom L2 rollup configuration file /// (overrides the default rollup configuration from the registry) #[arg(long, visible_alias = "rollup-cfg", env = "KONA_NODE_ROLLUP_CONFIG")] @@ -127,6 +133,7 @@ impl Default for NodeCommand { l1_rpc_args: L1ClientArgs::default(), l2_client_args: L2ClientArgs::default(), builder_client_args: BuilderClientArgs::default(), + follow_client_args: FollowClientArgs::default(), l2_config_file: None, l1_config_file: None, node_mode: NodeMode::Validator, @@ -311,9 +318,29 @@ impl NodeCommand { l2_timeout: Duration::from_millis(self.l2_client_args.l2_engine_timeout), l1_url: self.l1_rpc_args.l1_eth_rpc.clone(), mode: self.node_mode, + follow_enabled: self.follow_client_args.l2_follow_source.is_some(), rollup_boost: self.rollup_boost_flags.as_rollup_boost_args(), }; + // Create the follow client config if a follow source URL is provided + let follow_client_config = + self.follow_client_args.l2_follow_source.as_ref().map(|l2_follow_url| { + info!( + target: "rollup_node", + l2_follow_url = %l2_follow_url, + l1_url = %self.l1_rpc_args.l1_eth_rpc, + "Follow client config provided" + ); + FollowClientConfig { + l2_url: l2_follow_url.clone(), + l1_url: self.l1_rpc_args.l1_eth_rpc.clone(), + } + }); + + if follow_client_config.is_none() { + debug!(target: "rollup_node", "No follow source configured"); + } + RollupNodeBuilder::new( cfg, l1_config, @@ -321,6 +348,7 @@ impl NodeCommand { engine_config, p2p_config, rpc_config, + follow_client_config, ) .with_sequencer_config(self.sequencer_flags.config()) .build() diff --git a/bin/node/src/flags/engine/mod.rs b/bin/node/src/flags/engine/mod.rs index 65359a419c..8ac92b8220 100644 --- a/bin/node/src/flags/engine/mod.rs +++ b/bin/node/src/flags/engine/mod.rs @@ -2,7 +2,7 @@ mod flashblocks; pub use flashblocks::{FlashblocksFlags, FlashblocksWebsocketFlags}; mod providers; -pub use providers::{BuilderClientArgs, L1ClientArgs, L2ClientArgs}; +pub use providers::{BuilderClientArgs, FollowClientArgs, L1ClientArgs, L2ClientArgs}; mod rollup_boost; pub use rollup_boost::RollupBoostFlags; diff --git a/bin/node/src/flags/engine/providers.rs b/bin/node/src/flags/engine/providers.rs index fd07d1617d..58dd04ccf5 100644 --- a/bin/node/src/flags/engine/providers.rs +++ b/bin/node/src/flags/engine/providers.rs @@ -131,3 +131,12 @@ impl Default for L2ClientArgs { } } } + +/// L2 follow client arguments. +#[derive(Clone, Debug, Default, clap::Args)] +pub struct FollowClientArgs { + /// URL of the L2 follow source RPC API. + /// The source must be the L2 CL RPC. + #[arg(long, visible_alias = "l2.follow.source", env = "KONA_NODE_L2_FOLLOW_SOURCE")] + pub l2_follow_source: Option, +} diff --git a/bin/node/src/flags/mod.rs b/bin/node/src/flags/mod.rs index 07580fa969..00280d4653 100644 --- a/bin/node/src/flags/mod.rs +++ b/bin/node/src/flags/mod.rs @@ -23,6 +23,6 @@ pub use signer::{SignerArgs, SignerArgsParseError}; mod engine; pub use engine::{ - BuilderClientArgs, FlashblocksFlags, FlashblocksWebsocketFlags, L1ClientArgs, L2ClientArgs, - RollupBoostFlags, + BuilderClientArgs, FlashblocksFlags, FlashblocksWebsocketFlags, FollowClientArgs, L1ClientArgs, + L2ClientArgs, RollupBoostFlags, }; diff --git a/bin/node/src/follow.rs b/bin/node/src/follow.rs new file mode 100644 index 0000000000..2e649bb758 --- /dev/null +++ b/bin/node/src/follow.rs @@ -0,0 +1,180 @@ +//! Follow client for querying sync status from an L2 CL RPC. + +use alloy_eips::BlockNumberOrTag; +use alloy_provider::{Provider, RootProvider}; +use async_trait::async_trait; +use jsonrpsee::http_client::{HttpClient, HttpClientBuilder}; +use kona_protocol::{BlockInfo, L2BlockInfo}; +use kona_rpc::RollupNodeApiClient; +use std::time::Duration; +use thiserror::Error; +use url::Url; + +/// Default timeout for follow client requests in milliseconds. +const DEFAULT_FOLLOW_TIMEOUT: u64 = 5000; + +/// An error that occurred in the [`FollowClient`]. +#[derive(Error, Debug)] +pub enum FollowClientError { + /// An error occurred while building the HTTP client + #[error("Failed to build HTTP client: {0}")] + HttpClientBuild(String), + + /// An RPC error occurred + #[error("RPC error: {0}")] + RpcError(#[from] jsonrpsee::core::ClientError), + + /// An error occurred while fetching L1 block + #[error("Failed to fetch L1 block: {0}")] + L1BlockFetchError(String), + + /// Block not found + #[error("Block not found")] + BlockNotFound, +} + +/// Simplified follow status containing only the essential sync information. +/// +/// This struct contains a subset of fields from [`kona_protocol::SyncStatus`], +/// focusing only on the fields needed for following another rollup node. +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct FollowStatus { + /// The current L1 block that the derivation process is last idled at. + pub current_l1: BlockInfo, + /// The safe L2 block ref, derived from the L1 chain. + pub safe_l2: L2BlockInfo, + /// The finalized L2 block ref, derived from finalized L1 information. + pub finalized_l2: L2BlockInfo, +} + +/// Follow client trait for querying sync status from an L2 consensus layer RPC. +/// +/// This trait defines the interface for communicating with another rollup node's +/// consensus layer to fetch its synchronization status and querying L1 blocks. +/// The main reason this trait exists is for mocking and unit testing. +#[async_trait] +pub trait FollowClient: Send + Sync { + /// Gets the synchronization status from the follow source. + /// + /// Calls the `optimism_syncStatus` RPC method on the remote rollup node, + /// extracts the essential fields, and returns a simplified status. + /// + /// # Returns + /// + /// Returns the [`FollowStatus`] containing only the current L1, safe L2, + /// and finalized L2 blocks from the remote rollup node, or an error if + /// the RPC call fails. + async fn get_follow_status(&self) -> Result; + + /// Fetches the L1 [`BlockInfo`] by block number. + /// + /// Queries the L1 execution layer for the block at the given number and + /// returns the block information. + /// + /// # Arguments + /// + /// * `number` - The L1 block number to fetch + /// + /// # Returns + /// + /// Returns the [`BlockInfo`] for the requested L1 block, or an error if + /// the block cannot be fetched or does not exist. + async fn l1_block_info_by_number(&self, number: u64) -> Result; +} + +/// Builder for creating a [`HttpFollowClient`]. +#[derive(Debug, Clone)] +pub struct FollowClientBuilder { + /// The L2 consensus layer RPC URL. + pub l2_url: Url, + /// The L1 execution layer RPC URL. + pub l1_url: Url, + /// The timeout duration for requests. + pub timeout: Duration, +} + +impl FollowClientBuilder { + /// Creates a new [`FollowClientBuilder`] with the given URLs. + /// + /// # Arguments + /// + /// * `l2_url` - The L2 consensus layer RPC endpoint URL + /// * `l1_url` - The L1 execution layer RPC endpoint URL + pub const fn new(l2_url: Url, l1_url: Url) -> Self { + Self { l2_url, l1_url, timeout: Duration::from_millis(DEFAULT_FOLLOW_TIMEOUT) } + } + + /// Sets the timeout duration for requests. + /// + /// # Arguments + /// + /// * `timeout` - The timeout duration + pub const fn timeout(mut self, timeout: Duration) -> Self { + self.timeout = timeout; + self + } + + /// Builds the [`HttpFollowClient`]. + /// + /// # Returns + /// + /// Returns a new `HttpFollowClient` instance or an error if the HTTP client + /// cannot be built. + pub fn build(self) -> Result { + // Build L2 CL client for sync status queries + let l2_client = HttpClientBuilder::default() + .request_timeout(self.timeout) + .build(self.l2_url) + .map_err(|e| FollowClientError::HttpClientBuild(e.to_string()))?; + + // Build L1 provider for block queries + let l1_provider = RootProvider::new_http(self.l1_url); + + Ok(HttpFollowClient { l2_client, l1_provider }) + } +} + +/// HTTP-based follow client for querying sync status from an L2 consensus layer RPC. +/// +/// The [`HttpFollowClient`] wraps JSON-RPC clients to communicate with another +/// rollup node's consensus layer (L2) and the L1 execution layer. +#[derive(Clone, Debug)] +pub struct HttpFollowClient { + /// The L2 consensus layer HTTP client for making RPC calls. + l2_client: HttpClient, + /// The L1 execution layer provider for querying blocks. + l1_provider: RootProvider, +} + +#[async_trait] +impl FollowClient for HttpFollowClient { + async fn get_follow_status(&self) -> Result { + // Fetch the full sync status from the remote rollup node + let sync_status = self.l2_client.op_sync_status().await?; + + // Extract only the fields we need + Ok(FollowStatus { + current_l1: sync_status.current_l1, + safe_l2: sync_status.safe_l2, + finalized_l2: sync_status.finalized_l2, + }) + } + + async fn l1_block_info_by_number(&self, number: u64) -> Result { + // Fetch the block from L1 + let block = self + .l1_provider + .get_block_by_number(BlockNumberOrTag::Number(number)) + .await + .map_err(|e| FollowClientError::L1BlockFetchError(e.to_string()))? + .ok_or(FollowClientError::BlockNotFound)?; + + // Convert to BlockInfo + Ok(BlockInfo { + hash: block.header.hash, + number: block.header.number, + parent_hash: block.header.parent_hash, + timestamp: block.header.timestamp, + }) + } +} diff --git a/bin/node/src/main.rs b/bin/node/src/main.rs index 70333e112b..6d19cacf91 100644 --- a/bin/node/src/main.rs +++ b/bin/node/src/main.rs @@ -9,6 +9,7 @@ pub mod cli; pub mod commands; pub mod flags; +pub mod follow; pub mod metrics; pub(crate) mod version; diff --git a/crates/node/engine/src/lib.rs b/crates/node/engine/src/lib.rs index 0140f708ed..152d9c155f 100644 --- a/crates/node/engine/src/lib.rs +++ b/crates/node/engine/src/lib.rs @@ -42,8 +42,8 @@ mod task_queue; pub use task_queue::{ BuildTask, BuildTaskError, ConsolidateTask, ConsolidateTaskError, Engine, EngineBuildError, EngineResetError, EngineTask, EngineTaskError, EngineTaskErrorSeverity, EngineTaskErrors, - EngineTaskExt, FinalizeTask, FinalizeTaskError, InsertTask, InsertTaskError, SealTask, - SealTaskError, SynchronizeTask, SynchronizeTaskError, + EngineTaskExt, FinalizeTask, FinalizeTaskError, FollowTask, FollowTaskError, InsertTask, + InsertTaskError, SealTask, SealTaskError, SynchronizeTask, SynchronizeTaskError, }; mod attributes; diff --git a/crates/node/engine/src/metrics/mod.rs b/crates/node/engine/src/metrics/mod.rs index 87a65e372e..de3b01596f 100644 --- a/crates/node/engine/src/metrics/mod.rs +++ b/crates/node/engine/src/metrics/mod.rs @@ -59,6 +59,8 @@ impl Metrics { pub const BUILD_TASK_LABEL: &str = "build"; /// Seal task label. pub const SEAL_TASK_LABEL: &str = "seal"; + /// Follow task label. + pub const FOLLOW_TASK_LABEL: &str = "follow"; /// Finalize task label. pub const FINALIZE_TASK_LABEL: &str = "finalize"; diff --git a/crates/node/engine/src/task_queue/tasks/follow/error.rs b/crates/node/engine/src/task_queue/tasks/follow/error.rs new file mode 100644 index 0000000000..99c5fdc429 --- /dev/null +++ b/crates/node/engine/src/task_queue/tasks/follow/error.rs @@ -0,0 +1,21 @@ +//! Errors for the [`FollowTask`](super::FollowTask). + +use crate::{EngineTaskErrorSeverity, SynchronizeTaskError}; + +/// An error that occurred while executing a [`FollowTask`]. +/// +/// [`FollowTask`]: super::FollowTask +#[derive(Debug, thiserror::Error)] +pub enum FollowTaskError { + /// An error occurred while synchronizing the engine state. + #[error(transparent)] + Synchronize(#[from] SynchronizeTaskError), +} + +impl crate::EngineTaskError for FollowTaskError { + fn severity(&self) -> EngineTaskErrorSeverity { + match self { + Self::Synchronize(e) => e.severity(), + } + } +} diff --git a/crates/node/engine/src/task_queue/tasks/follow/mod.rs b/crates/node/engine/src/task_queue/tasks/follow/mod.rs new file mode 100644 index 0000000000..26ce722698 --- /dev/null +++ b/crates/node/engine/src/task_queue/tasks/follow/mod.rs @@ -0,0 +1,7 @@ +//! Contains the [`FollowTask`] implementation. + +mod error; +pub use error::FollowTaskError; + +mod task; +pub use task::FollowTask; diff --git a/crates/node/engine/src/task_queue/tasks/follow/task.rs b/crates/node/engine/src/task_queue/tasks/follow/task.rs new file mode 100644 index 0000000000..b04fce5eff --- /dev/null +++ b/crates/node/engine/src/task_queue/tasks/follow/task.rs @@ -0,0 +1,70 @@ +//! A task for synchronizing engine state from an external follow source. + +use crate::{ + EngineClient, EngineState, EngineTaskExt, FollowTaskError, SynchronizeTask, + state::EngineSyncStateUpdate, +}; +use async_trait::async_trait; +use derive_more::Constructor; +use kona_genesis::RollupConfig; +use std::{sync::Arc, time::Instant}; + +/// The [`FollowTask`] synchronizes the engine state with externally provided safe and finalized +/// heads from a follow source (e.g., an external L2 Consensus Layer node). +/// +/// This task is used in "follow mode" where the node trusts an external source for safe/finalized +/// heads instead of deriving them from L1 data. It updates the engine state by calling +/// [`SynchronizeTask`] internally. +/// +/// ## Why this task exists +/// +/// Unlike [`ConsolidateTask`] and [`FinalizeTask`], which operate on locally derived or finalized +/// data, [`FollowTask`] handles external state updates. By implementing the [`EngineTaskExt`] +/// trait, it can receive `&mut EngineState` as a parameter, allowing it to inline the +/// `SynchronizeTask::new().execute(state)` call just like other tasks. +/// +/// This is the idiomatic pattern for tasks that need to update engine state: they are enqueued +/// as [`EngineTask`] variants and processed by the engine's task queue, rather than being called +/// directly from the actor's select! loop. +/// +/// [`ConsolidateTask`]: crate::ConsolidateTask +/// [`FinalizeTask`]: crate::FinalizeTask +/// [`EngineTask`]: crate::EngineTask +#[derive(Debug, Clone, Constructor)] +pub struct FollowTask { + /// The engine client. + pub client: Arc, + /// The rollup config. + pub cfg: Arc, + /// The sync state update containing external safe and finalized heads. + pub update: EngineSyncStateUpdate, +} + +#[async_trait] +impl EngineTaskExt for FollowTask { + type Output = (); + + type Error = FollowTaskError; + + async fn execute(&self, state: &mut EngineState) -> Result<(), FollowTaskError> { + let start = Instant::now(); + + // Synchronize the engine state with the external update + SynchronizeTask::new(self.client.clone(), self.cfg.clone(), self.update) + .execute(state) + .await?; + + let duration = start.elapsed(); + + info!( + target: "engine", + unsafe_head = ?self.update.unsafe_head.as_ref().map(|h| h.block_info.number), + safe_head = ?self.update.safe_head.as_ref().map(|h| h.block_info.number), + finalized_head = ?self.update.finalized_head.as_ref().map(|h| h.block_info.number), + ?duration, + "Synchronized engine state with external follow source" + ); + + Ok(()) + } +} diff --git a/crates/node/engine/src/task_queue/tasks/mod.rs b/crates/node/engine/src/task_queue/tasks/mod.rs index d24d75fdf3..c1eda67afc 100644 --- a/crates/node/engine/src/task_queue/tasks/mod.rs +++ b/crates/node/engine/src/task_queue/tasks/mod.rs @@ -23,5 +23,8 @@ pub use consolidate::{ConsolidateTask, ConsolidateTaskError}; mod finalize; pub use finalize::{FinalizeTask, FinalizeTaskError}; +mod follow; +pub use follow::{FollowTask, FollowTaskError}; + mod util; pub(super) use util::{BuildAndSealError, build_and_seal}; diff --git a/crates/node/engine/src/task_queue/tasks/task.rs b/crates/node/engine/src/task_queue/tasks/task.rs index 5a60cc63c7..c6456559cf 100644 --- a/crates/node/engine/src/task_queue/tasks/task.rs +++ b/crates/node/engine/src/task_queue/tasks/task.rs @@ -2,10 +2,10 @@ //! //! [`Engine`]: crate::Engine -use super::{BuildTask, ConsolidateTask, FinalizeTask, InsertTask}; +use super::{BuildTask, ConsolidateTask, FinalizeTask, FollowTask, InsertTask}; use crate::{ BuildTaskError, ConsolidateTaskError, EngineClient, EngineState, FinalizeTaskError, - InsertTaskError, + FollowTaskError, InsertTaskError, task_queue::{SealTask, SealTaskError}, }; use async_trait::async_trait; @@ -73,6 +73,9 @@ pub enum EngineTaskErrors { /// An error that occurred while finalizing an L2 block. #[error(transparent)] Finalize(#[from] FinalizeTaskError), + /// An error that occurred while synchronizing with an external follow source. + #[error(transparent)] + Follow(#[from] FollowTaskError), } impl EngineTaskError for EngineTaskErrors { @@ -83,6 +86,7 @@ impl EngineTaskError for EngineTaskErrors { Self::Seal(inner) => inner.severity(), Self::Consolidate(inner) => inner.severity(), Self::Finalize(inner) => inner.severity(), + Self::Follow(inner) => inner.severity(), } } } @@ -104,6 +108,8 @@ pub enum EngineTask { Consolidate(Box>), /// Finalizes an L2 block Finalize(Box>), + /// Synchronizes engine state with an external follow source + Follow(Box>), } impl EngineTask { @@ -114,6 +120,7 @@ impl EngineTask { Self::Seal(task) => task.execute(state).await?, Self::Consolidate(task) => task.execute(state).await?, Self::Finalize(task) => task.execute(state).await?, + Self::Follow(task) => task.execute(state).await?, Self::Build(task) => { task.execute(state).await?; } @@ -128,6 +135,7 @@ impl EngineTask { Self::Consolidate(_) => crate::Metrics::CONSOLIDATE_TASK_LABEL, Self::Build(_) => crate::Metrics::BUILD_TASK_LABEL, Self::Seal(_) => crate::Metrics::SEAL_TASK_LABEL, + Self::Follow(_) => crate::Metrics::FOLLOW_TASK_LABEL, Self::Finalize(_) => crate::Metrics::FINALIZE_TASK_LABEL, } } @@ -141,6 +149,7 @@ impl PartialEq for EngineTask { (Self::Build(_), Self::Build(_)) | (Self::Seal(_), Self::Seal(_)) | (Self::Consolidate(_), Self::Consolidate(_)) | + (Self::Follow(_), Self::Follow(_)) | (Self::Finalize(_), Self::Finalize(_)) ) } @@ -156,37 +165,47 @@ impl PartialOrd for EngineTask { impl Ord for EngineTask { fn cmp(&self, other: &Self) -> Ordering { - // Order (descending): BuildBlock -> InsertUnsafe -> Consolidate -> Finalize + // Order (descending): Seal -> Build -> Insert -> Follow -> Consolidate -> Finalize // // https://specs.optimism.io/protocol/derivation.html#forkchoice-synchronization // + // - Seal tasks are prioritized highest, as they complete block building. // - Block building jobs are prioritized above all other tasks, to give priority to the // sequencer. BuildTask handles forkchoice updates automatically. - // - InsertUnsafe tasks are prioritized over Consolidate tasks, to ensure that unsafe block - // gossip is imported promptly. - // - Consolidate tasks are prioritized over Finalize tasks, as they advance the safe chain - // via derivation. - // - Finalize tasks have the lowest priority, as they only update finalized status. + // - InsertUnsafe tasks are prioritized over Follow, to ensure that unsafe block gossip is + // imported promptly (important for sequencer + follow mode). + // - Follow tasks update safe/finalized from external sources (follow mode only). + // - Consolidate tasks advance safe chain via derivation (not used when follow mode + // enabled). + // - Finalize tasks have the lowest priority (not used when follow mode enabled). + // + // Note: Consolidate and Finalize tasks are mutually exclusive with Follow tasks in + // practice (follow mode disables derivation), but priorities are defined for completeness. match (self, other) { // Same variant cases (Self::Insert(_), Self::Insert(_)) => Ordering::Equal, - (Self::Consolidate(_), Self::Consolidate(_)) => Ordering::Equal, (Self::Build(_), Self::Build(_)) => Ordering::Equal, (Self::Seal(_), Self::Seal(_)) => Ordering::Equal, + (Self::Follow(_), Self::Follow(_)) => Ordering::Equal, + (Self::Consolidate(_), Self::Consolidate(_)) => Ordering::Equal, (Self::Finalize(_), Self::Finalize(_)) => Ordering::Equal, // SealBlock tasks are prioritized over all others (Self::Seal(_), _) => Ordering::Greater, (_, Self::Seal(_)) => Ordering::Less, - // BuildBlock tasks are prioritized over InsertUnsafe and Consolidate tasks + // BuildBlock tasks are prioritized over all other tasks except Seal (Self::Build(_), _) => Ordering::Greater, (_, Self::Build(_)) => Ordering::Less, - // InsertUnsafe tasks are prioritized over Consolidate and Finalize tasks + // InsertUnsafe tasks are prioritized over Follow, Consolidate, and Finalize (Self::Insert(_), _) => Ordering::Greater, (_, Self::Insert(_)) => Ordering::Less, + // Follow tasks are prioritized over Consolidate and Finalize + (Self::Follow(_), _) => Ordering::Greater, + (_, Self::Follow(_)) => Ordering::Less, + // Consolidate tasks are prioritized over Finalize tasks (Self::Consolidate(_), _) => Ordering::Greater, (_, Self::Consolidate(_)) => Ordering::Less, diff --git a/crates/node/rpc/src/lib.rs b/crates/node/rpc/src/lib.rs index 108c5e56ea..d864791869 100644 --- a/crates/node/rpc/src/lib.rs +++ b/crates/node/rpc/src/lib.rs @@ -38,6 +38,9 @@ pub use jsonrpsee::{ OpP2PApiServer, RollupBoostHealthzApiServer, RollupNodeApiServer, WsServer, }; +#[cfg(feature = "client")] +pub use jsonrpsee::RollupNodeApiClient; + mod rollup; pub use rollup::RollupRpc; diff --git a/crates/node/service/Cargo.toml b/crates/node/service/Cargo.toml index b265ce005f..9881fa4a16 100644 --- a/crates/node/service/Cargo.toml +++ b/crates/node/service/Cargo.toml @@ -21,7 +21,7 @@ kona-genesis.workspace = true kona-derive.workspace = true kona-protocol.workspace = true kona-providers-alloy.workspace = true -kona-rpc.workspace = true +kona-rpc = { workspace = true, features = ["client"] } kona-peers.workspace = true kona-macros.workspace = true @@ -61,7 +61,7 @@ tokio-stream.workspace = true strum = { workspace = true, features = ["derive"] } backon.workspace = true derive_more = { workspace = true, features = ["debug"] } -jsonrpsee = { workspace = true, features = ["server"] } +jsonrpsee = { workspace = true, features = ["server", "http-client"] } tokio = { workspace = true, features = ["rt-multi-thread", "macros"] } tower.workspace = true http-body-util.workspace = true diff --git a/crates/node/service/src/actors/engine/actor.rs b/crates/node/service/src/actors/engine/actor.rs index 28ebcf79dc..1b98fe54d7 100644 --- a/crates/node/service/src/actors/engine/actor.rs +++ b/crates/node/service/src/actors/engine/actor.rs @@ -9,9 +9,9 @@ use futures::{FutureExt, future::OptionFuture}; use kona_derive::{ResetSignal, Signal}; use kona_engine::{ BuildTask, ConsolidateTask, Engine, EngineClient, EngineClientBuilder, - EngineClientBuilderError, EngineQueries, EngineState as InnerEngineState, EngineTask, - EngineTaskError, EngineTaskErrorSeverity, InsertTask, OpEngineClient, RollupBoostServer, - RollupBoostServerArgs, SealTask, SealTaskError, + EngineClientBuilderError, EngineQueries, EngineState as InnerEngineState, + EngineSyncStateUpdate, EngineTask, EngineTaskError, EngineTaskErrorSeverity, FollowTask, + InsertTask, OpEngineClient, RollupBoostServer, RollupBoostServerArgs, SealTask, SealTaskError, }; use kona_genesis::RollupConfig; use kona_protocol::{BlockInfo, L2BlockInfo, OpAttributesWithParent}; @@ -66,7 +66,9 @@ pub struct SealRequest { #[derive(Debug)] pub struct EngineActor { /// A channel to receive [`OpAttributesWithParent`] from the derivation actor. - attributes_rx: mpsc::Receiver, + /// ## Note + /// This is `Some` when derivation is enabled, and `None` when follow mode is used instead. + attributes_rx: Option>, /// The [`EngineConfig`] used to build the actor. builder: EngineConfig, /// A channel to receive build requests. @@ -101,13 +103,19 @@ pub struct EngineActor { /// This is `Some` when the node is in sequencer mode, and `None` when the node is in validator /// mode. unsafe_head_tx: Option>, + /// A channel to receive [`crate::FollowStatus`] updates from the follow actor. + /// ## Note + /// This is `Some` when the follow source is configured, and `None` when not. + follow_status_rx: Option>, } /// The outbound data for the [`EngineActor`]. #[derive(Debug)] pub struct EngineInboundData { /// A channel to send [`OpAttributesWithParent`] to the engine actor. - pub attributes_tx: mpsc::Sender, + /// ## Note + /// This is `Some` when derivation is enabled, and `None` when follow mode is used instead. + pub attributes_tx: Option>, /// A channel to use to send [`BuildRequest`] payloads to the engine actor. /// /// This is `Some` when the node is in sequencer mode, and `None` when the node is in validator @@ -141,6 +149,10 @@ pub struct EngineInboundData { /// This is `Some` when the node is in sequencer mode, and `None` when the node is in validator /// mode. pub unsafe_head_rx: Option>, + /// A channel to send [`crate::FollowStatus`] updates from the follow actor. + /// + /// This is `Some` when the follow source is configured, and `None` when not. + pub follow_status_tx: Option>, } /// Configuration for the Engine Actor. @@ -171,6 +183,11 @@ pub struct EngineConfig { /// from the sequencer actor. pub mode: NodeMode, + /// Whether follow mode is enabled. + /// When enabled, the node will use external safe/finalized heads from a follow source + /// instead of deriving them from L1. + pub follow_enabled: bool, + /// The rollup boost arguments. pub rollup_boost: RollupBoostServerArgs, } @@ -206,6 +223,7 @@ impl EngineConfig { rollup: self.config, client, engine: Engine::new(state, engine_state_send, engine_queue_length_send), + follow_enabled: self.follow_enabled, }) } } @@ -219,6 +237,8 @@ pub(super) struct EngineActorState { pub(super) client: Arc, /// The [`Engine`] task queue. pub(super) engine: Engine, + /// Whether follow mode is enabled (for conditional derivation signal sending). + pub(super) follow_enabled: bool, } /// The communication context used by the engine actor. @@ -234,6 +254,10 @@ pub struct EngineContext { /// sync is re-triggered can occur, but we will not block derivation on it. pub sync_complete_tx: oneshot::Sender<()>, /// A way for the engine actor to send a [`Signal`] back to the derivation actor. + /// + /// ## Note + /// In follow mode, this channel's receiver is dropped, so sends will fail. + /// EngineActor checks `follow_enabled` before sending to avoid errors. pub derivation_signal_tx: mpsc::Sender, } @@ -257,10 +281,25 @@ impl EngineActor { pub fn new(config: EngineConfig) -> (EngineInboundData, Self) { let (finalized_l1_block_tx, finalized_l1_block_rx) = watch::channel(None); let (inbound_queries_tx, inbound_queries_rx) = mpsc::channel(1024); - let (attributes_tx, attributes_rx) = mpsc::channel(1024); let (unsafe_block_tx, unsafe_block_rx) = mpsc::channel(1024); let (reset_request_tx, reset_request_rx) = mpsc::channel(1024); + // Only create attributes channel when follow mode is disabled + let (attributes_tx, attributes_rx) = if !config.follow_enabled { + let (tx, rx) = mpsc::channel(1024); + (Some(tx), Some(rx)) + } else { + (None, None) + }; + + // Only create follow_status channel when follow mode is enabled + let (follow_status_tx, follow_status_rx) = if config.follow_enabled { + let (tx, rx) = mpsc::channel(1024); + (Some(tx), Some(rx)) + } else { + (None, None) + }; + let sequencer_channels = if config.mode.is_sequencer() { let (build_request_tx, build_request_rx) = mpsc::channel(1024); let (seal_request_tx, seal_request_rx) = mpsc::channel(1024); @@ -300,6 +339,7 @@ impl EngineActor { finalizer: L2Finalizer::new(finalized_l1_block_rx), rollup_boost_admin_query_rx, rollup_boost_health_query_rx, + follow_status_rx, }; let outbound_data = EngineInboundData { @@ -313,6 +353,7 @@ impl EngineActor { seal_request_tx: sequencer_channels.seal_request_tx, unsafe_block_tx, unsafe_head_rx: sequencer_channels.unsafe_head_rx, + follow_status_tx, }; (outbound_data, actor) @@ -405,13 +446,17 @@ impl EngineActorState { // condition where the derivation actor receives the pre-reset safe head. self.maybe_update_safe_head(engine_l2_safe_head_tx); - // Signal the derivation actor to reset. - let signal = ResetSignal { l2_safe_head, l1_origin, system_config: Some(system_config) }; - match derivation_signal_tx.send(signal.signal()).await { - Ok(_) => info!(target: "engine", "Sent reset signal to derivation actor"), - Err(err) => { - error!(target: "engine", ?err, "Failed to send reset signal to the derivation actor"); - return Err(EngineError::ChannelClosed); + // Signal the derivation actor to reset (only when derivation is enabled). + // In follow mode, skip sending because derivation actor doesn't exist. + if !self.follow_enabled { + let signal = + ResetSignal { l2_safe_head, l1_origin, system_config: Some(system_config) }; + match derivation_signal_tx.send(signal.signal()).await { + Ok(_) => info!(target: "engine", "Sent reset signal to derivation actor"), + Err(err) => { + error!(target: "engine", ?err, "Failed to send reset signal to the derivation actor"); + return Err(EngineError::ChannelClosed); + } } } @@ -688,7 +733,7 @@ impl NodeActor for EngineActor { ))); state.engine.enqueue(task); } - attributes = self.attributes_rx.recv() => { + Some(attributes) = OptionFuture::from(self.attributes_rx.as_mut().map(|rx| rx.recv())), if self.attributes_rx.is_some() => { let Some(attributes) = attributes else { error!(target: "engine", "Attributes receiver closed unexpectedly"); cancellation.cancel(); @@ -714,6 +759,127 @@ impl NodeActor for EngineActor { // chain. self.finalizer.try_finalize_next(&mut state).await; } + Some(follow_status) = OptionFuture::from(self.follow_status_rx.as_mut().map(|rx| rx.recv())), if self.follow_status_rx.is_some() => { + let Some(status) = follow_status else { + warn!(target: "engine", "Follow status receiver closed unexpectedly"); + // Don't cancel the whole engine if follow fails, just log and continue + self.follow_status_rx = None; + continue; + }; + + // Get current local state + // Note: FollowActor gates on el_sync_complete signal, so it won't send + // updates until initial EL sync completes + let local_unsafe = state.engine.state().sync_state.unsafe_head(); + let local_safe = state.engine.state().sync_state.safe_head(); + let external_safe = status.safe_l2; + let external_finalized = status.finalized_l2; + + // Log current state with full context (matches op-node logging) + info!( + target: "engine", + local_unsafe_number = local_unsafe.block_info.number, + local_unsafe_hash = %local_unsafe.block_info.hash, + local_safe_number = local_safe.block_info.number, + local_safe_hash = %local_safe.block_info.hash, + external_safe_number = external_safe.block_info.number, + external_safe_hash = %external_safe.block_info.hash, + external_finalized_number = external_finalized.block_info.number, + "Follow Source: Processing external refs" + ); + + // Helper: Create update that promotes all heads (unsafe, safe, finalized) + let create_full_update = || EngineSyncStateUpdate { + unsafe_head: Some(external_safe), + cross_unsafe_head: Some(external_safe), + safe_head: Some(external_safe), + local_safe_head: Some(external_safe), + finalized_head: Some(external_finalized), + }; + + // Helper: Create update that only updates safe and finalized (preserves unsafe) + let create_safe_only_update = || EngineSyncStateUpdate { + unsafe_head: None, + cross_unsafe_head: None, + safe_head: Some(external_safe), + local_safe_head: Some(external_safe), + finalized_head: Some(external_finalized), + }; + + // Helper: Enqueue a FollowTask with the given update + let mut enqueue_update = |update: EngineSyncStateUpdate| { + let task = EngineTask::Follow(Box::new(FollowTask::new( + state.client.clone(), + state.rollup.clone(), + update, + ))); + state.engine.enqueue(task); + }; + + // Case 1: External safe ahead of local unsafe + if local_unsafe.block_info.number < external_safe.block_info.number { + info!( + target: "engine", + local_unsafe_number = local_unsafe.block_info.number, + external_safe_number = external_safe.block_info.number, + "Follow Source: External safe ahead of current unsafe" + ); + enqueue_update(create_full_update()); + continue; + } + + // Query local EL for block at external safe number + let local_block_result = state.client + .l2_block_info_by_label(alloy_eips::BlockNumberOrTag::Number(external_safe.block_info.number)) + .await; + + match local_block_result { + // Case 2b: Query error + Err(err) => { + debug!( + target: "engine", + ?err, + external_safe_number = external_safe.block_info.number, + "Follow Source: Failed to fetch external safe from local EL" + ); + // Skip update on error + } + // Case 2a: Block not found (EL still syncing) + Ok(None) => { + debug!( + target: "engine", + external_safe_number = external_safe.block_info.number, + local_unsafe_number = local_unsafe.block_info.number, + "Follow Source: EL Sync in progress" + ); + enqueue_update(create_safe_only_update()); + } + // Block found locally + Ok(Some(local_block)) => { + if local_block.block_info.hash == external_safe.block_info.hash { + // Case 3: Hashes match (consolidation) + debug!( + target: "engine", + external_safe_number = external_safe.block_info.number, + external_safe_hash = %external_safe.block_info.hash, + "Follow Source: Consolidation" + ); + enqueue_update(create_safe_only_update()); + } else { + // Case 4: Hashes differ (reorg required) + warn!( + target: "engine", + external_safe_number = external_safe.block_info.number, + external_safe_hash = %external_safe.block_info.hash, + local_block_number = local_block.block_info.number, + local_block_hash = %local_block.block_info.hash, + "Follow Source: Reorg detected. May trigger EL sync" + ); + enqueue_update(create_full_update()); + } + } + } + } } } } diff --git a/crates/node/service/src/actors/follow/actor.rs b/crates/node/service/src/actors/follow/actor.rs new file mode 100644 index 0000000000..cf4df8596a --- /dev/null +++ b/crates/node/service/src/actors/follow/actor.rs @@ -0,0 +1,278 @@ +//! [`NodeActor`] implementation for a follow client that periodically polls another L2 CL node's +//! sync status. + +use crate::{ + FollowClient, FollowStatus, NodeActor, + actors::{CancellableContext, follow::error::FollowActorError}, +}; +use async_trait::async_trait; +use std::time::Duration; +use tokio::{ + select, + sync::{mpsc, oneshot}, + time::interval, +}; +use tokio_util::sync::{CancellationToken, WaitForCancellationFuture}; + +/// Default polling interval in seconds for querying the follow source. +const DEFAULT_FOLLOW_POLL_INTERVAL: u64 = 2; + +/// An actor that periodically polls a follow client for sync status updates. +/// +/// The [`FollowActor`] queries another L2 consensus layer node's sync status at regular +/// intervals and sends the results to the engine actor. +/// +/// Similar to the derivation actor, the follow actor waits for the initial EL sync to +/// complete before starting to poll the external source. +#[derive(Debug)] +pub struct FollowActor +where + FC: FollowClient, +{ + /// The follow client for querying sync status. + follow_client: FC, + /// Channel to send follow status updates to the engine. + follow_status_tx: mpsc::Sender, + /// Receiver for the initial EL sync completion signal. + /// The actor will not poll the external source until this signal is received. + el_sync_complete_rx: oneshot::Receiver<()>, + /// The polling interval for querying the follow source. + poll_interval: Duration, + /// The cancellation token, shared between all tasks. + cancellation: CancellationToken, +} + +impl FollowActor +where + FC: FollowClient, +{ + /// Creates a new [`FollowActor`] with the default polling interval. + /// + /// # Arguments + /// + /// * `follow_client` - The follow client for querying sync status + /// * `follow_status_tx` - Channel to send follow status updates to the engine + /// * `el_sync_complete_rx` - Receiver for initial EL sync completion signal + /// * `cancellation` - Cancellation token for graceful shutdown + #[allow(clippy::missing_const_for_fn)] + pub fn new( + follow_client: FC, + follow_status_tx: mpsc::Sender, + el_sync_complete_rx: oneshot::Receiver<()>, + cancellation: CancellationToken, + ) -> Self { + Self::new_with_interval( + follow_client, + follow_status_tx, + el_sync_complete_rx, + cancellation, + Duration::from_secs(DEFAULT_FOLLOW_POLL_INTERVAL), + ) + } + + /// Creates a new [`FollowActor`] with a custom polling interval. + /// + /// # Arguments + /// + /// * `follow_client` - The follow client for querying sync status + /// * `follow_status_tx` - Channel to send follow status updates to the engine + /// * `el_sync_complete_rx` - Receiver for initial EL sync completion signal + /// * `cancellation` - Cancellation token for graceful shutdown + /// * `poll_interval` - Custom polling interval + pub const fn new_with_interval( + follow_client: FC, + follow_status_tx: mpsc::Sender, + el_sync_complete_rx: oneshot::Receiver<()>, + cancellation: CancellationToken, + poll_interval: Duration, + ) -> Self { + Self { follow_client, follow_status_tx, el_sync_complete_rx, poll_interval, cancellation } + } + + /// Validates L1 block canonicality and sends the status to the engine if valid. + /// + /// Checks that the L1 blocks referenced in the follow status (external safe L1 origin, + /// external finalized L1 origin, and current L1) are canonical on the L1 chain. + /// + /// # Arguments + /// + /// * `status` - The follow status to validate and send + /// + /// # Returns + /// + /// Returns `Ok(())` if validation passes and status is sent successfully, or an error + /// if validation fails or the send fails. + async fn validate_and_send(&self, status: FollowStatus) -> Result<(), FollowActorError> { + // Validate external safe L1 origin + if !self + .validate_l1_block(status.safe_l2.l1_origin.number, status.safe_l2.l1_origin.hash) + .await? + { + warn!( + target: "follow_actor", + l1_number = status.safe_l2.l1_origin.number, + l1_hash = ?status.safe_l2.l1_origin.hash, + "Invalid L1 origin for external safe block, dropping update" + ); + return Ok(()); + } + + // Validate external finalized L1 origin + if !self + .validate_l1_block( + status.finalized_l2.l1_origin.number, + status.finalized_l2.l1_origin.hash, + ) + .await? + { + warn!( + target: "follow_actor", + l1_number = status.finalized_l2.l1_origin.number, + l1_hash = ?status.finalized_l2.l1_origin.hash, + "Invalid L1 origin for external finalized block, dropping update" + ); + return Ok(()); + } + + // Validate current L1 block + if !self.validate_l1_block(status.current_l1.number, status.current_l1.hash).await? { + warn!( + target: "follow_actor", + l1_number = status.current_l1.number, + l1_hash = ?status.current_l1.hash, + "Invalid current L1 block, dropping update" + ); + return Ok(()); + } + + // All validations passed, send to engine + self.follow_status_tx + .send(status) + .await + .map_err(|e| FollowActorError::ChannelClosed(e.to_string()))?; + + Ok(()) + } + + /// Validates that an L1 block is canonical on the L1 chain. + /// + /// Fetches the canonical block at the given number and compares the hash. + /// + /// # Arguments + /// + /// * `number` - The L1 block number + /// * `hash` - The expected L1 block hash + /// + /// # Returns + /// + /// Returns `Ok(true)` if the block is canonical, `Ok(false)` if it's not canonical, + /// or an error if the L1 RPC call fails. + async fn validate_l1_block( + &self, + number: u64, + hash: alloy_primitives::B256, + ) -> Result { + // Fetch the canonical block at this number from L1 + let canonical_block = self + .follow_client + .l1_block_info_by_number(number) + .await + .map_err(|e| FollowActorError::L1ValidationError(e.to_string()))?; + + // Compare hashes + Ok(canonical_block.hash == hash) + } +} + +#[async_trait] +impl NodeActor for FollowActor +where + FC: FollowClient + 'static, +{ + type Error = FollowActorError; + type StartData = (); + + /// Start the main processing loop. + /// + /// Waits for the initial EL sync to complete, then periodically polls the follow client + /// for sync status. Continues until cancellation is requested. + async fn start(mut self, _: Self::StartData) -> Result<(), Self::Error> { + let cancel = self.cancellation.clone(); + let mut ticker = interval(self.poll_interval); + + info!( + target: "follow_actor", + interval_secs = ?self.poll_interval.as_secs(), + "Starting follow actor" + ); + + loop { + select! { + _ = cancel.cancelled() => { + info!( + target: "follow_actor", + "Received shutdown signal. Exiting follow actor task." + ); + return Ok(()); + }, + _ = &mut self.el_sync_complete_rx, if !self.el_sync_complete_rx.is_terminated() => { + info!( + target: "follow_actor", + "Initial EL sync complete, starting to poll external source" + ); + }, + _ = ticker.tick() => { + // Skip polling until initial EL sync completes (similar to derivation actor) + if !self.el_sync_complete_rx.is_terminated() { + trace!( + target: "follow_actor", + "Engine not ready, skipping follow poll" + ); + continue; + } + + // Query the follow client for sync status + match self.follow_client.get_follow_status().await { + Ok(status) => { + info!( + target: "follow_actor", + current_l1_number = status.current_l1.number, + current_l1_hash = ?status.current_l1.hash, + safe_l2_number = status.safe_l2.block_info.number, + safe_l2_hash = ?status.safe_l2.block_info.hash, + finalized_l2_number = status.finalized_l2.block_info.number, + finalized_l2_hash = ?status.finalized_l2.block_info.hash, + "Received follow status update" + ); + + // Validate L1 block canonicality before sending to engine + if let Err(e) = self.validate_and_send(status).await { + warn!( + target: "follow_actor", + error = ?e, + "Failed to validate or send follow status" + ); + } + } + Err(e) => { + warn!( + target: "follow_actor", + error = ?e, + "Failed to get follow status, will retry on next interval" + ); + } + } + } + } + } + } +} + +impl CancellableContext for FollowActor +where + FC: FollowClient, +{ + fn cancelled(&self) -> WaitForCancellationFuture<'_> { + self.cancellation.cancelled() + } +} diff --git a/crates/node/service/src/actors/follow/error.rs b/crates/node/service/src/actors/follow/error.rs new file mode 100644 index 0000000000..a2df8fdfe7 --- /dev/null +++ b/crates/node/service/src/actors/follow/error.rs @@ -0,0 +1,17 @@ +use thiserror::Error; + +/// The error type for the `FollowActor`. +#[derive(Error, Debug)] +pub enum FollowActorError { + /// Error from the follow client. + #[error("Follow client error: {0}")] + FollowClient(#[from] crate::FollowClientError), + + /// Channel closed error. + #[error("Channel closed: {0}")] + ChannelClosed(String), + + /// L1 validation error. + #[error("L1 validation error: {0}")] + L1ValidationError(String), +} diff --git a/crates/node/service/src/actors/follow/mod.rs b/crates/node/service/src/actors/follow/mod.rs new file mode 100644 index 0000000000..040f5241f0 --- /dev/null +++ b/crates/node/service/src/actors/follow/mod.rs @@ -0,0 +1,5 @@ +mod actor; +pub use actor::FollowActor; + +mod error; +pub use error::FollowActorError; diff --git a/crates/node/service/src/actors/mod.rs b/crates/node/service/src/actors/mod.rs index fd5083d4a5..a7c79e12d2 100644 --- a/crates/node/service/src/actors/mod.rs +++ b/crates/node/service/src/actors/mod.rs @@ -38,6 +38,9 @@ pub use sequencer::{ SequencerActor, SequencerActorError, SequencerAdminQuery, SequencerConfig, }; +mod follow; +pub use follow::{FollowActor, FollowActorError}; + #[cfg(test)] pub use engine::MockBlockBuildingClient; #[cfg(test)] diff --git a/crates/node/service/src/follow.rs b/crates/node/service/src/follow.rs new file mode 100644 index 0000000000..1ece567014 --- /dev/null +++ b/crates/node/service/src/follow.rs @@ -0,0 +1,174 @@ +//! Follow client for querying sync status from an L2 CL RPC. + +use alloy_eips::BlockNumberOrTag; +use alloy_provider::{Provider, RootProvider}; +use async_trait::async_trait; +use jsonrpsee::http_client::{HttpClient, HttpClientBuilder}; +use kona_protocol::{BlockInfo, L2BlockInfo}; +use kona_rpc::RollupNodeApiClient; +use std::time::Duration; +use thiserror::Error; +use url::Url; + +/// Default timeout for follow client requests in milliseconds. +const DEFAULT_FOLLOW_TIMEOUT: u64 = 5000; + +/// An error that occurred in the [`FollowClient`]. +#[derive(Error, Debug)] +pub enum FollowClientError { + /// An error occurred while building the HTTP client + #[error("Failed to build HTTP client: {0}")] + HttpClientBuild(String), + + /// An RPC error occurred + #[error("RPC error: {0}")] + RpcError(#[from] jsonrpsee::core::ClientError), + + /// An error occurred while fetching L1 block + #[error("Failed to fetch L1 block: {0}")] + L1BlockFetchError(String), + + /// Block not found + #[error("Block not found")] + BlockNotFound, +} + +/// Simplified follow status containing only the essential sync information. +/// +/// This struct contains a subset of fields from [`kona_protocol::SyncStatus`], +/// focusing only on the fields needed for following another rollup node. +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct FollowStatus { + /// The current L1 block that the derivation process is last idled at. + pub current_l1: BlockInfo, + /// The safe L2 block ref, derived from the L1 chain. + pub safe_l2: L2BlockInfo, + /// The finalized L2 block ref, derived from finalized L1 information. + pub finalized_l2: L2BlockInfo, +} + +/// Follow client trait for querying sync status from an L2 consensus layer RPC. +/// +/// This trait defines the interface for communicating with another rollup node's +/// consensus layer to fetch its synchronization status and querying L1 blocks. +/// The main reason this trait exists is for mocking and unit testing. +#[async_trait] +pub trait FollowClient: Send + Sync { + /// Gets the synchronization status from the follow source. + /// + /// Calls the `optimism_syncStatus` RPC method on the remote rollup node, + /// extracts the essential fields, and returns a simplified status. + /// + /// # Returns + /// + /// Returns the [`FollowStatus`] containing only the current L1, safe L2, + /// and finalized L2 blocks from the remote rollup node, or an error if + /// the RPC call fails. + async fn get_follow_status(&self) -> Result; + + /// Fetches the L1 [`BlockInfo`] by block number. + /// + /// Queries the L1 execution layer for the block at the given number and + /// returns the block information. + /// + /// # Arguments + /// + /// * `number` - The L1 block number to fetch + /// + /// # Returns + /// + /// Returns the [`BlockInfo`] for the requested L1 block, or an error if + /// the block cannot be fetched or does not exist. + async fn l1_block_info_by_number(&self, number: u64) -> Result; +} + +/// HTTP-based follow client for querying sync status from an L2 consensus layer RPC. +/// +/// The [`HttpFollowClient`] wraps JSON-RPC clients to communicate with another +/// rollup node's consensus layer (L2) and the L1 execution layer. +#[derive(Clone, Debug)] +pub struct HttpFollowClient { + /// The L2 consensus layer HTTP client for making RPC calls. + l2_client: HttpClient, + /// The L1 execution layer provider for querying blocks. + l1_provider: RootProvider, +} + +impl HttpFollowClient { + /// Creates a new [`HttpFollowClient`] from the given configuration. + /// + /// # Arguments + /// + /// * `l2_url` - The L2 consensus layer RPC endpoint URL + /// * `l1_url` - The L1 execution layer RPC endpoint URL + /// + /// # Returns + /// + /// Returns a new `HttpFollowClient` instance or an error if the HTTP client + /// cannot be built. + pub fn new(l2_url: Url, l1_url: Url) -> Result { + Self::new_with_timeout(l2_url, l1_url, Duration::from_millis(DEFAULT_FOLLOW_TIMEOUT)) + } + + /// Creates a new [`HttpFollowClient`] with a custom timeout. + /// + /// # Arguments + /// + /// * `l2_url` - The L2 consensus layer RPC endpoint URL + /// * `l1_url` - The L1 execution layer RPC endpoint URL + /// * `timeout` - The timeout duration for requests + /// + /// # Returns + /// + /// Returns a new `HttpFollowClient` instance or an error if the HTTP client + /// cannot be built. + pub fn new_with_timeout( + l2_url: Url, + l1_url: Url, + timeout: Duration, + ) -> Result { + // Build L2 CL client for sync status queries + let l2_client = HttpClientBuilder::default() + .request_timeout(timeout) + .build(l2_url) + .map_err(|e| FollowClientError::HttpClientBuild(e.to_string()))?; + + // Build L1 provider for block queries + let l1_provider = RootProvider::new_http(l1_url); + + Ok(Self { l2_client, l1_provider }) + } +} + +#[async_trait] +impl FollowClient for HttpFollowClient { + async fn get_follow_status(&self) -> Result { + // Fetch the full sync status from the remote rollup node + let sync_status = self.l2_client.op_sync_status().await?; + + // Extract only the fields we need + Ok(FollowStatus { + current_l1: sync_status.current_l1, + safe_l2: sync_status.safe_l2, + finalized_l2: sync_status.finalized_l2, + }) + } + + async fn l1_block_info_by_number(&self, number: u64) -> Result { + // Fetch the block from L1 + let block = self + .l1_provider + .get_block_by_number(BlockNumberOrTag::Number(number)) + .await + .map_err(|e| FollowClientError::L1BlockFetchError(e.to_string()))? + .ok_or(FollowClientError::BlockNotFound)?; + + // Convert to BlockInfo + Ok(BlockInfo { + hash: block.header.hash, + number: block.header.number, + parent_hash: block.header.parent_hash, + timestamp: block.header.timestamp, + }) + } +} diff --git a/crates/node/service/src/lib.rs b/crates/node/service/src/lib.rs index 35921d0291..0f403f1451 100644 --- a/crates/node/service/src/lib.rs +++ b/crates/node/service/src/lib.rs @@ -11,24 +11,28 @@ extern crate tracing; mod service; pub use service::{ - InteropMode, L1Config, L1ConfigBuilder, NodeMode, RollupNode, RollupNodeBuilder, + FollowClientConfig, InteropMode, L1Config, L1ConfigBuilder, NodeMode, RollupNode, + RollupNodeBuilder, }; +mod follow; +pub use follow::{FollowClient, FollowClientError, FollowStatus, HttpFollowClient}; + mod actors; pub use actors::{ BlockBuildingClient, BlockEngineError, BlockEngineResult, BlockStream, BuildRequest, CancellableContext, Conductor, ConductorClient, ConductorError, DelayedL1OriginSelectorProvider, DerivationActor, DerivationBuilder, DerivationContext, DerivationError, DerivationInboundChannels, DerivationState, EngineActor, EngineConfig, - EngineContext, EngineError, EngineInboundData, InboundDerivationMessage, L1OriginSelector, - L1OriginSelectorError, L1OriginSelectorProvider, L1WatcherActor, L1WatcherActorError, - L2Finalizer, NetworkActor, NetworkActorError, NetworkBuilder, NetworkBuilderError, - NetworkConfig, NetworkContext, NetworkDriver, NetworkDriverError, NetworkHandler, - NetworkInboundData, NodeActor, OriginSelector, PipelineBuilder, QueuedBlockBuildingClient, - QueuedSequencerAdminAPIClient, QueuedUnsafePayloadGossipClient, ResetRequest, RpcActor, - RpcActorError, RpcContext, SealRequest, SequencerActor, SequencerActorError, - SequencerAdminQuery, SequencerConfig, UnsafePayloadGossipClient, - UnsafePayloadGossipClientError, + EngineContext, EngineError, EngineInboundData, FollowActor, FollowActorError, + InboundDerivationMessage, L1OriginSelector, L1OriginSelectorError, L1OriginSelectorProvider, + L1WatcherActor, L1WatcherActorError, L2Finalizer, NetworkActor, NetworkActorError, + NetworkBuilder, NetworkBuilderError, NetworkConfig, NetworkContext, NetworkDriver, + NetworkDriverError, NetworkHandler, NetworkInboundData, NodeActor, OriginSelector, + PipelineBuilder, QueuedBlockBuildingClient, QueuedSequencerAdminAPIClient, + QueuedUnsafePayloadGossipClient, ResetRequest, RpcActor, RpcActorError, RpcContext, + SealRequest, SequencerActor, SequencerActorError, SequencerAdminQuery, SequencerConfig, + UnsafePayloadGossipClient, UnsafePayloadGossipClientError, }; mod metrics; diff --git a/crates/node/service/src/service/builder.rs b/crates/node/service/src/service/builder.rs index c441223414..e6abbda47d 100644 --- a/crates/node/service/src/service/builder.rs +++ b/crates/node/service/src/service/builder.rs @@ -36,6 +36,15 @@ pub struct L1ConfigBuilder { pub slot_duration_override: Option, } +/// Configuration for initializing a follow client. +#[derive(Debug, Clone)] +pub struct FollowClientConfig { + /// The L2 consensus layer RPC URL. + pub l2_url: Url, + /// The L1 execution layer RPC URL. + pub l1_url: Url, +} + /// The [`RollupNodeBuilder`] is used to construct a [`RollupNode`] service. #[derive(Debug)] pub struct RollupNodeBuilder { @@ -55,6 +64,8 @@ pub struct RollupNodeBuilder { pub sequencer_config: Option, /// Whether to run the node in interop mode. pub interop_mode: InteropMode, + /// Optional follow client configuration for querying another L2 CL for sync status. + pub follow_client_config: Option, } impl RollupNodeBuilder { @@ -66,6 +77,7 @@ impl RollupNodeBuilder { engine_config: EngineConfig, p2p_config: NetworkConfig, rpc_config: Option, + follow_client_config: Option, ) -> Self { Self { config, @@ -76,6 +88,7 @@ impl RollupNodeBuilder { rpc_config, interop_mode: InteropMode::default(), sequencer_config: None, + follow_client_config, } } @@ -145,6 +158,7 @@ impl RollupNodeBuilder { rpc_builder: self.rpc_config, p2p_config, sequencer_config, + follow_client_config: self.follow_client_config, } } } diff --git a/crates/node/service/src/service/mod.rs b/crates/node/service/src/service/mod.rs index 101fce8d62..6b21df9daf 100644 --- a/crates/node/service/src/service/mod.rs +++ b/crates/node/service/src/service/mod.rs @@ -4,7 +4,7 @@ //! [`NodeActor`]: crate::NodeActor mod builder; -pub use builder::{L1ConfigBuilder, RollupNodeBuilder}; +pub use builder::{FollowClientConfig, L1ConfigBuilder, RollupNodeBuilder}; mod mode; pub use mode::{InteropMode, NodeMode}; diff --git a/crates/node/service/src/service/node.rs b/crates/node/service/src/service/node.rs index c32a0c172e..21bd781d3f 100644 --- a/crates/node/service/src/service/node.rs +++ b/crates/node/service/src/service/node.rs @@ -5,15 +5,13 @@ use crate::{ L1WatcherActor, NetworkActor, NetworkBuilder, NetworkConfig, NetworkContext, NodeActor, NodeMode, QueuedBlockBuildingClient, QueuedSequencerAdminAPIClient, RpcActor, RpcContext, SequencerActor, SequencerConfig, - actors::{ - BlockStream, DerivationInboundChannels, EngineInboundData, NetworkInboundData, - QueuedUnsafePayloadGossipClient, - }, + actors::{BlockStream, EngineInboundData, NetworkInboundData, QueuedUnsafePayloadGossipClient}, }; use alloy_eips::BlockNumberOrTag; use alloy_provider::RootProvider; use kona_derive::StatefulAttributesBuilder; use kona_genesis::{L1ChainConfig, RollupConfig}; +use kona_protocol::L2BlockInfo; use kona_providers_alloy::{AlloyChainProvider, AlloyL2ChainProvider, OnlineBeaconClient}; use kona_rpc::RpcBuilder; use op_alloy_network::Optimism; @@ -60,6 +58,8 @@ pub struct RollupNode { pub(crate) p2p_config: NetworkConfig, /// The [`SequencerConfig`] for the node. pub(crate) sequencer_config: SequencerConfig, + /// The follow client configuration for querying another L2 CL for sync status. + pub(crate) follow_client_config: Option, } impl RollupNode { @@ -144,16 +144,38 @@ impl RollupNode { // Create a global cancellation token for graceful shutdown of tasks. let cancellation = CancellationToken::new(); - // Create the derivation actor. + // Create the derivation actor (only when follow mode is disabled). + // In follow mode, create dummy channels and save el_sync_complete_rx for FollowActor. let ( - DerivationInboundChannels { - derivation_signal_tx, - l1_head_updates_tx, - engine_l2_safe_head_tx, - el_sync_complete_tx, - }, + derivation_signal_tx, + l1_head_updates_tx, + engine_l2_safe_head_tx, + el_sync_complete_tx, derivation, - ) = DerivationActor::new(self.derivation_builder()); + el_sync_complete_rx, + ); + + if !self.engine_config.follow_enabled { + // Normal mode: Create derivation actor which creates all channels + let (channels, actor) = DerivationActor::new(self.derivation_builder()); + derivation_signal_tx = channels.derivation_signal_tx; + l1_head_updates_tx = channels.l1_head_updates_tx; + engine_l2_safe_head_tx = channels.engine_l2_safe_head_tx; + el_sync_complete_tx = channels.el_sync_complete_tx; + derivation = Some(actor); + el_sync_complete_rx = None; + } else { + // Follow mode: Create dummy channels (no derivation actor). + // Save el_sync_complete_rx to pass to FollowActor. + use tokio::sync::{mpsc, oneshot, watch}; + (l1_head_updates_tx, _) = watch::channel(None); + (engine_l2_safe_head_tx, _) = watch::channel(L2BlockInfo::default()); + let (tx, rx) = oneshot::channel(); + el_sync_complete_tx = tx; + el_sync_complete_rx = Some(rx); + (derivation_signal_tx, _) = mpsc::channel(1024); + derivation = None; + } // Create the engine actor. let ( @@ -161,6 +183,7 @@ impl RollupNode { attributes_tx, build_request_tx, finalized_l1_block_tx, + follow_status_tx, inbound_queries_tx: engine_rpc, reset_request_tx, rollup_boost_admin_query_tx: rollup_boost_admin_rpc, @@ -270,6 +293,50 @@ impl RollupNode { (None, None) }; + // Create the follow actor if configured + let follow_actor = self.follow_client_config.as_ref().map_or_else(|| None, |follow_config| { + info!( + target: "rollup_node", + l2_url = %follow_config.l2_url, + l1_url = %follow_config.l1_url, + "Initializing follow actor" + ); + match crate::HttpFollowClient::new( + follow_config.l2_url.clone(), + follow_config.l1_url.clone(), + ) { + Ok(follow_client) => { + // Get the follow_status_tx sender and el_sync_complete_rx from earlier setup + match (follow_status_tx.clone(), el_sync_complete_rx) { + (Some(status_tx), Some(el_sync_complete_rx)) => { + Some(crate::FollowActor::new( + follow_client, + status_tx, + el_sync_complete_rx, + cancellation.clone(), + )) + } + (None, _) => { + error!(target: "rollup_node", "Follow status channel not available"); + None + } + (_, None) => { + error!(target: "rollup_node", "EL sync complete receiver not available for follow actor"); + None + } + } + } + Err(e) => { + error!( + target: "rollup_node", + error = ?e, + "Failed to initialize follow client, follow actor will not be started" + ); + None + } + } + }); + crate::service::spawn_and_wait!( cancellation, actors = [ @@ -287,16 +354,18 @@ impl RollupNode { } )), sequencer_actor.map(|s| (s, ())), + follow_actor.map(|f| (f, ())), Some(( network, NetworkContext { blocks: unsafe_block_tx, cancellation: cancellation.clone() } )), Some((l1_watcher, ())), - Some(( - derivation, + derivation.map(|d| ( + d, DerivationContext { reset_request_tx: reset_request_tx.clone(), - derived_attributes_tx: attributes_tx, + derived_attributes_tx: attributes_tx + .expect("attributes_tx must be Some when derivation is enabled"), cancellation: cancellation.clone(), } )),