diff --git a/bin/node/src/commands/net.rs b/bin/node/src/commands/net.rs index 0e7c27f5d1..3bfddc65a3 100644 --- a/bin/node/src/commands/net.rs +++ b/bin/node/src/commands/net.rs @@ -3,8 +3,9 @@ use crate::flags::{GlobalArgs, P2PArgs, RpcArgs}; use clap::Parser; use futures::future::OptionFuture; +use jsonrpsee::{RpcModule, server::Server}; use kona_p2p::{NetworkBuilder, P2pRpcRequest}; -use kona_rpc::{NetworkRpc, OpP2PApiServer, RpcConfig}; +use kona_rpc::{NetworkRpc, OpP2PApiServer, RpcBuilder}; use tracing::{debug, info, warn}; use url::Url; @@ -51,20 +52,22 @@ impl NetCommand { let signer = args.genesis_signer()?; info!(target: "net", "Genesis block signer: {:?}", signer); - // Setup the RPC server with the P2P RPC Module - let (tx, rx) = tokio::sync::mpsc::channel(1024); - let p2p_module = NetworkRpc::new(tx.clone()).into_rpc(); - let rpc_config = RpcConfig::from(self.rpc); + let rpc_config = RpcBuilder::from(self.rpc); - if rpc_config.disabled { + let (handle, tx, rx) = if rpc_config.disabled { info!(target: "net", "RPC server disabled"); + (None, None, None) } else { info!(target: "net", socket = ?rpc_config.socket, "Starting RPC server"); - } + let (tx, rx) = tokio::sync::mpsc::channel(1024); + + // Setup the RPC server with the P2P RPC Module + let mut launcher = RpcModule::new(()); + launcher.merge(NetworkRpc::new(tx.clone()).into_rpc())?; - let mut launcher = rpc_config.as_launcher(); - launcher.merge(p2p_module)?; - let handle = launcher.launch().await?; + let server = Server::builder().build(rpc_config.socket).await?; + (Some(server.start(launcher)), Some(tx), Some(rx)) + }; // Get the rollup config from the args let rollup_config = args @@ -74,9 +77,9 @@ impl NetCommand { // Start the Network Stack self.p2p.check_ports()?; let p2p_config = self.p2p.config(&rollup_config, args, self.l1_eth_rpc).await?; - let mut network = NetworkBuilder::from(p2p_config).with_rpc_receiver(rx).build()?; + let mut network = NetworkBuilder::from(p2p_config).build()?; let 
mut recv = network.unsafe_block_recv(); - network.start().await?; + network.start(rx).await?; info!(target: "net", "Network started, receiving blocks."); // On an interval, use the rpc tx to request stats about the p2p network. @@ -90,9 +93,13 @@ impl NetCommand { Err(e) => debug!(target: "net", "Failed to receive unsafe payload: {:?}", e), } } - _ = interval.tick() => { + _ = interval.tick(), if tx.is_some() => { + let Some(ref sender) = tx else { + unreachable!("tx must be some (see above)"); + }; + let (otx, mut orx) = tokio::sync::oneshot::channel(); - if let Err(e) = tx.send(P2pRpcRequest::PeerCount(otx)).await { + if let Err(e) = sender.send(P2pRpcRequest::PeerCount(otx)).await { warn!(target: "net", "Failed to send network rpc request: {:?}", e); continue; } diff --git a/bin/node/src/commands/node.rs b/bin/node/src/commands/node.rs index feeeb5f43f..131f7747d9 100644 --- a/bin/node/src/commands/node.rs +++ b/bin/node/src/commands/node.rs @@ -232,8 +232,9 @@ impl NodeCommand { .with_supervisor_rpc_config(supervisor_rpc_config.unwrap_or_default()) .build() .start() - .await - .map_err(Into::into) + .await; + + Ok(()) } /// Get the L2 rollup config, either from a file or the superchain registry. diff --git a/bin/node/src/flags/rpc.rs b/bin/node/src/flags/rpc.rs index c3db5d1b1b..da48c17034 100644 --- a/bin/node/src/flags/rpc.rs +++ b/bin/node/src/flags/rpc.rs @@ -3,7 +3,7 @@ //! Flags for configuring the RPC server. 
use clap::Parser; -use kona_rpc::RpcConfig; +use kona_rpc::RpcBuilder; use std::{ net::{IpAddr, SocketAddr}, path::PathBuf, @@ -44,7 +44,7 @@ impl Default for RpcArgs { } } -impl From for RpcConfig { +impl From for RpcBuilder { fn from(args: RpcArgs) -> Self { Self { disabled: args.rpc_disabled, diff --git a/crates/node/p2p/README.md b/crates/node/p2p/README.md index 92781e9d6a..53881738bb 100644 --- a/crates/node/p2p/README.md +++ b/crates/node/p2p/README.md @@ -44,7 +44,8 @@ async fn main() { // Starting the network spawns gossip and discovery service // handling in a new thread so this is a non-blocking, // synchronous operation that does not need to be awaited. - network.start().await.expect("Failed to start network driver"); + // If running an RPC server, you'd pass a channel to handle RPC requests as an input to the `start` method + network.start(None).await.expect("Failed to start network driver"); } ``` diff --git a/crates/node/p2p/src/net/builder.rs b/crates/node/p2p/src/net/builder.rs index 41e322d203..a4eb38c02c 100644 --- a/crates/node/p2p/src/net/builder.rs +++ b/crates/node/p2p/src/net/builder.rs @@ -12,7 +12,7 @@ use tokio::sync::broadcast::Sender as BroadcastSender; use crate::{ Broadcast, Config, Discv5Builder, GossipDriverBuilder, Network, NetworkBuilderError, - P2pRpcRequest, discv5::LocalNode, gossip::GaterConfig, + discv5::LocalNode, gossip::GaterConfig, }; /// Constructs a [`Network`] for the OP Stack Consensus Layer. @@ -22,8 +22,6 @@ pub struct NetworkBuilder { discovery: Discv5Builder, /// The gossip driver. gossip: GossipDriverBuilder, - /// A receiver for network RPC requests. - rpc_recv: Option>, /// A broadcast sender for the unsafe block payloads. payload_tx: Option>, /// A local signer for payloads. 
@@ -75,7 +73,6 @@ impl NetworkBuilder { gossip_addr, keypair, ), - rpc_recv: None, payload_tx: None, local_signer: None, } @@ -139,11 +136,6 @@ impl NetworkBuilder { Self { gossip: self.gossip.with_config(config), ..self } } - /// Sets the rpc receiver for the [`crate::Network`]. - pub fn with_rpc_receiver(self, rpc_recv: tokio::sync::mpsc::Receiver) -> Self { - Self { rpc_recv: Some(rpc_recv), ..self } - } - /// Sets the [`Discv5Config`] for the [`crate::Discv5Driver`]. pub fn with_discovery_config(self, config: Discv5Config) -> Self { Self { discovery: self.discovery.with_discovery_config(config), ..self } @@ -168,10 +160,9 @@ impl NetworkBuilder { } /// Builds the [`Network`]. - pub fn build(mut self) -> Result { + pub fn build(self) -> Result { let (gossip, unsafe_block_signer_sender) = self.gossip.build()?; let discovery = self.discovery.build()?; - let rpc = self.rpc_recv.take(); let payload_tx = self.payload_tx.unwrap_or(tokio::sync::broadcast::channel(256).0); let (_, publish_rx) = tokio::sync::mpsc::channel(256); @@ -179,7 +170,6 @@ impl NetworkBuilder { gossip, discovery, unsafe_block_signer_sender, - rpc, broadcast: Broadcast::new(payload_tx), publish_rx, local_signer: self.local_signer, @@ -250,7 +240,6 @@ mod tests { rollup_config: RollupConfig { l2_chain_id: 10, ..Default::default() }, signer, }) - .with_rpc_receiver(tokio::sync::mpsc::channel(1).1) .with_gossip_address(gossip_addr.clone()) .with_discovery_address(disc_enr) .with_discovery_config(ConfigBuilder::new(disc_listen.into()).build()) @@ -293,7 +282,6 @@ mod tests { .with_gossip_address(gossip_addr) .with_discovery_address(disc) .with_discovery_config(discovery_config) - .with_rpc_receiver(tokio::sync::mpsc::channel(1).1) .build() .unwrap(); diff --git a/crates/node/p2p/src/net/driver.rs b/crates/node/p2p/src/net/driver.rs index 5957ad919c..171a2c0b31 100644 --- a/crates/node/p2p/src/net/driver.rs +++ b/crates/node/p2p/src/net/driver.rs @@ -3,7 +3,7 @@ use alloy_primitives::{Address, hex}; 
use alloy_signer::SignerSync; use alloy_signer_local::PrivateKeySigner; -use futures::{AsyncReadExt, AsyncWriteExt, StreamExt}; +use futures::{AsyncReadExt, AsyncWriteExt, StreamExt, future::OptionFuture}; use libp2p::TransportError; use libp2p_stream::IncomingStreams; use op_alloy_rpc_types_engine::{OpExecutionPayloadEnvelope, OpNetworkPayloadEnvelope}; @@ -30,11 +30,6 @@ pub struct Network { pub(crate) broadcast: Broadcast, /// Channel to send unsafe signer updates. pub(crate) unsafe_block_signer_sender: Sender
, - /// Handler for RPC Requests. - /// - /// This is allowed to be optional since it may not be desirable - /// run a networking stack with RPC access. - pub(crate) rpc: Option>, /// A channel to receive unsafe blocks and send them through the gossip layer. pub(crate) publish_rx: tokio::sync::mpsc::Receiver, /// The swarm instance. @@ -110,8 +105,10 @@ impl Network { /// Starts the Discv5 peer discovery & libp2p services /// and continually listens for new peers and messages to handle - pub async fn start(mut self) -> Result<(), TransportError> { - let mut rpc = self.rpc.unwrap_or_else(|| tokio::sync::mpsc::channel(1024).1); + pub async fn start( + mut self, + mut rpc: Option>, + ) -> Result<(), TransportError> { let (handler, mut enr_receiver) = self.discovery.start(); let mut broadcast = self.broadcast; @@ -235,7 +232,9 @@ impl Network { warn!(err = ?send_err, "Impossible to send a request to the discovery handler. The channel connection is dropped."); } }, - req = rpc.recv() => { + // Note that we're using `if rpc.is_some()` to ensure that the branch does not always immediately resolve + // to avoid CPU contention. + Some(req) = OptionFuture::from(rpc.as_mut().map(|r| r.recv())), if rpc.is_some() => { let Some(req) = req else { error!(target: "node::p2p", "The rpc receiver channel has closed"); return; diff --git a/crates/node/rpc/src/config.rs b/crates/node/rpc/src/config.rs index 093cc90bbb..12f3748e6b 100644 --- a/crates/node/rpc/src/config.rs +++ b/crates/node/rpc/src/config.rs @@ -1,13 +1,10 @@ //! Contains the RPC Configuration. -use jsonrpsee::RpcModule; - -use crate::RpcBuilder; use std::{net::SocketAddr, path::PathBuf}; /// The RPC configuration. #[derive(Debug, Clone)] -pub struct RpcConfig { +pub struct RpcBuilder { /// Disable the rpc server. pub disabled: bool, /// Prevent the rpc server from being restarted. @@ -23,15 +20,24 @@ pub struct RpcConfig { pub ws_enabled: bool, } -impl RpcConfig { - /// Converts the [`RpcConfig`] into a [`RpcBuilder`]. 
- pub fn as_launcher(self) -> RpcBuilder { - RpcBuilder { config: self, module: RpcModule::new(()) } +impl RpcBuilder { + /// Returns whether WebSocket RPC endpoint is enabled + pub const fn ws_enabled(&self) -> bool { + self.ws_enabled + } + + /// Returns the socket address of the [`RpcBuilder`]. + pub const fn socket(&self) -> SocketAddr { + self.socket + } + + /// Returns the number of times the RPC server will attempt to restart if it stops. + pub const fn restart_count(&self) -> u32 { + if self.no_restart { 0 } else { 3 } } -} -impl From for RpcBuilder { - fn from(config: RpcConfig) -> Self { - config.as_launcher() + /// Sets the given [`SocketAddr`] on the [`RpcBuilder`]. + pub fn set_addr(self, addr: SocketAddr) -> Self { + Self { socket: addr, ..self } } } diff --git a/crates/node/rpc/src/launcher.rs b/crates/node/rpc/src/launcher.rs deleted file mode 100644 index f8e10bcfb0..0000000000 --- a/crates/node/rpc/src/launcher.rs +++ /dev/null @@ -1,156 +0,0 @@ -//! Contains the [`RpcBuilder`] service. - -use jsonrpsee::server::{RegisterMethodError, RpcModule, Server, ServerHandle}; -use std::net::SocketAddr; - -use crate::RpcConfig; - -/// An error that can occur when using the [`RpcBuilder`]. -#[derive(Debug, thiserror::Error)] -pub enum RpcLauncherError { - /// An error occurred while starting the [`Server`]. - #[error("failed to start server: {0}")] - ServerStart(#[from] std::io::Error), - /// Failed to register a method on the [`RpcModule`]. - #[error("failed to register method: {0}")] - RegisterMethod(#[from] RegisterMethodError), -} - -impl PartialEq for RpcLauncherError { - fn eq(&self, other: &Self) -> bool { - match (self, other) { - (Self::ServerStart(e1), Self::ServerStart(e2)) => e1.kind() == e2.kind(), - _ => false, - } - } -} - -/// A healthcheck response for the RPC server. -#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] -pub struct HealthzResponse { - /// The application version. 
- version: String, -} - -/// Launches a [`Server`] using a set of [`RpcModule`]s. -#[derive(Debug, Clone)] -pub struct RpcBuilder { - /// The RPC configuration associated with the [`RpcBuilder`]. - pub(crate) config: RpcConfig, - /// The modules to register on the RPC server. - pub(crate) module: RpcModule<()>, -} - -impl RpcBuilder { - /// Creates a new [`RpcBuilder`]. - pub fn new(config: RpcConfig) -> Self { - Self { config, module: RpcModule::new(()) } - } - - /// Creates a new [`RpcBuilder`] that is disabled. - pub fn new_disabled() -> Self { - Self { - config: RpcConfig { - disabled: true, - no_restart: false, - // Use a dummy socket address. The RPC server is disabled, so it will not be used. - socket: SocketAddr::from(([127, 0, 0, 1], 8080)), - enable_admin: false, - admin_persistence: None, - ws_enabled: false, - }, - module: RpcModule::new(()), - } - } - - /// Returns whether WebSocket RPC endpoint is enabled - pub const fn ws_enabled(&self) -> bool { - self.config.ws_enabled - } - - /// Merges a given [`RpcModule`] into the [`RpcBuilder`]. - pub fn merge(&mut self, other: RpcModule) -> Result<(), RegisterMethodError> { - self.module.merge(other)?; - - Ok(()) - } - - /// Returns the socket address of the [`RpcBuilder`]. - pub const fn socket(&self) -> SocketAddr { - self.config.socket - } - - /// Returns the number of times the RPC server will attempt to restart if it stops. - pub const fn restart_count(&self) -> u32 { - if self.config.no_restart { 0 } else { 3 } - } - - /// Sets the given [`SocketAddr`] on the [`RpcBuilder`]. - pub const fn set_addr(mut self, addr: SocketAddr) -> Self { - self.config.socket = addr; - - self - } - - /// Registers the healthz endpoint on the [`RpcBuilder`]. 
- pub fn with_healthz(mut self) -> Result { - self.module.register_method("healthz", |_, _, _| { - let response = HealthzResponse { version: std::env!("CARGO_PKG_VERSION").to_string() }; - jsonrpsee::core::RpcResult::Ok(response) - })?; - - Ok(self) - } - - /// Launches the jsonrpsee [`Server`]. - /// - /// If the RPC server is disabled, this will return `Ok(None)`. - /// - /// ## Errors - /// - /// - [`RpcLauncherError::ServerStart`] if the server fails to start. - pub async fn launch(self) -> Result, RpcLauncherError> { - if self.config.disabled { - return Ok(None); - } - - let server = Server::builder().build(self.config.socket).await?; - Ok(Some(server.start(self.module))) - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[tokio::test] - async fn test_launch_no_modules() { - let launcher = RpcBuilder::new(RpcConfig { - disabled: false, - socket: SocketAddr::from(([127, 0, 0, 1], 8080)), - no_restart: false, - enable_admin: false, - admin_persistence: None, - ws_enabled: false, - }); - let result = launcher.launch().await; - assert!(result.is_ok()); - } - - #[tokio::test] - async fn test_launch_with_modules() { - let mut launcher = RpcBuilder::new(RpcConfig { - disabled: false, - socket: SocketAddr::from(([127, 0, 0, 1], 8081)), - no_restart: false, - enable_admin: false, - admin_persistence: None, - ws_enabled: false, - }); - launcher.merge(RpcModule::new(())).expect("module merge"); - launcher.merge::<()>(RpcModule::new(())).expect("module merge"); - launcher.merge(RpcModule::new(())).expect("module merge"); - let result = launcher.launch().await; - assert!(result.is_ok()); - } -} diff --git a/crates/node/rpc/src/lib.rs b/crates/node/rpc/src/lib.rs index eca0cf7fb6..37e616542a 100644 --- a/crates/node/rpc/src/lib.rs +++ b/crates/node/rpc/src/lib.rs @@ -12,10 +12,7 @@ extern crate tracing; mod admin; mod config; -pub use config::RpcConfig; - -mod launcher; -pub use launcher::{HealthzResponse, RpcBuilder, RpcLauncherError}; +pub use config::RpcBuilder; 
mod net; pub use net::NetworkRpc; @@ -58,3 +55,10 @@ pub use l1_watcher::{L1State, L1WatcherQueries, L1WatcherQuerySender}; mod ws; pub use ws::WsRPC; + +/// A healthcheck response for the RPC server. +#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] +pub struct HealthzResponse { + /// The application version. + pub version: String, +} diff --git a/crates/node/service/src/actors/engine/actor.rs b/crates/node/service/src/actors/engine/actor.rs index 7baf9d73e7..e4d4b5c830 100644 --- a/crates/node/service/src/actors/engine/actor.rs +++ b/crates/node/service/src/actors/engine/actor.rs @@ -119,7 +119,7 @@ pub struct EngineContext { /// A channel to receive [`OpExecutionPayloadEnvelope`] from the network actor. pub unsafe_block_rx: mpsc::Receiver, /// Handler for inbound queries to the engine. - pub inbound_queries: mpsc::Receiver, + pub inbound_queries: Option>, /// The cancellation token, shared between all tasks. pub cancellation: CancellationToken, /// The [`L2Finalizer`], used to finalize L2 blocks. @@ -354,7 +354,7 @@ impl NodeActor for EngineActor { let mut state = self.builder.build_state(); // Start the engine query server in a separate task to avoid blocking the main task. - let handle = state.start_query_task(inbound_queries); + let handle = inbound_queries.map(|inbound_queries| state.start_query_task(inbound_queries)); // The sync complete tx is consumed after the first successful send. Hence we need to wrap // it in an `Option` to ensure we satisfy the borrow checker. @@ -376,8 +376,12 @@ impl NodeActor for EngineActor { biased; _ = cancellation.cancelled() => { - warn!(target: "engine", "EngineActor received shutdown signal. 
Shutting down engine query task."); - handle.abort(); + warn!(target: "engine", "EngineActor received shutdown signal."); + + if let Some(handle) = handle { + warn!(target: "engine", "Aborting engine query task."); + handle.abort(); + } return Ok(()); } diff --git a/crates/node/service/src/actors/l1_watcher_rpc.rs b/crates/node/service/src/actors/l1_watcher_rpc.rs index 9dd6d8ba6c..209da3c6c3 100644 --- a/crates/node/service/src/actors/l1_watcher_rpc.rs +++ b/crates/node/service/src/actors/l1_watcher_rpc.rs @@ -63,7 +63,7 @@ pub struct L1WatcherRpcOutboundChannels { #[derive(Debug)] pub struct L1WatcherRpcContext { /// The inbound queries to the L1 watcher. - pub inbound_queries: tokio::sync::mpsc::Receiver, + pub inbound_queries: Option>, /// The cancellation token, shared between all tasks. pub cancellation: CancellationToken, } @@ -197,8 +197,9 @@ impl NodeActor for L1WatcherRpc { ) .into_stream(); - let inbound_query_processor = - self.start_query_processor(inbound_queries, self.latest_head.subscribe()); + let inbound_query_processor = inbound_queries.map(|inbound_queries| { + self.start_query_processor(inbound_queries, self.latest_head.subscribe()) + }); // Start the main processing loop. loop { @@ -211,7 +212,9 @@ impl NodeActor for L1WatcherRpc { ); // Kill the inbound query processor. 
- inbound_query_processor.abort(); + if let Some(inbound_query_processor) = inbound_query_processor { + inbound_query_processor.abort(); + } return Ok(()); }, diff --git a/crates/node/service/src/actors/mod.rs b/crates/node/service/src/actors/mod.rs index 92d43a995c..6e0752ded6 100644 --- a/crates/node/service/src/actors/mod.rs +++ b/crates/node/service/src/actors/mod.rs @@ -20,7 +20,7 @@ pub use supervisor::{ }; mod rpc; -pub use rpc::{RpcActor, RpcActorError, RpcContext}; +pub use rpc::{RpcActor, RpcActorError, RpcContext, RpcOutboundData}; mod derivation; pub use derivation::{ diff --git a/crates/node/service/src/actors/network.rs b/crates/node/service/src/actors/network.rs index 9f81895f7a..12a4454c64 100644 --- a/crates/node/service/src/actors/network.rs +++ b/crates/node/service/src/actors/network.rs @@ -4,7 +4,7 @@ use crate::{NodeActor, actors::CancellableContext}; use alloy_primitives::Address; use async_trait::async_trait; use derive_more::Debug; -use kona_p2p::{NetworkBuilder, NetworkBuilderError}; +use kona_p2p::{NetworkBuilder, NetworkBuilderError, P2pRpcRequest}; use libp2p::TransportError; use op_alloy_rpc_types_engine::OpExecutionPayloadEnvelope; use thiserror::Error; @@ -68,6 +68,11 @@ impl NetworkActor { pub struct NetworkContext { /// A channel to receive the unsafe block signer address. pub signer: mpsc::Receiver
, + /// Handler for RPC Requests. + /// + /// This is allowed to be optional since it may not be desirable + /// to run a networking stack with RPC access. + pub rpc: Option>, /// Cancels the network actor. pub cancellation: CancellationToken, } @@ -91,7 +96,7 @@ impl NodeActor for NetworkActor { async fn start( mut self, - NetworkContext { mut signer, cancellation }: Self::InboundData, + NetworkContext { mut signer, rpc, cancellation }: Self::InboundData, ) -> Result<(), Self::Error> { let mut driver = self.config.build()?; @@ -102,7 +107,7 @@ impl NodeActor for NetworkActor { let unsafe_block_signer = driver.unsafe_block_signer_sender(); // Start the network driver. - driver.start().await?; + driver.start(rpc).await?; loop { select! { diff --git a/crates/node/service/src/actors/rpc.rs b/crates/node/service/src/actors/rpc.rs index 9e9582c0dc..cbfb03eceb 100644 --- a/crates/node/service/src/actors/rpc.rs +++ b/crates/node/service/src/actors/rpc.rs @@ -2,8 +2,17 @@ use crate::{NodeActor, actors::CancellableContext}; use async_trait::async_trait; -use jsonrpsee::core::RegisterMethodError; -use kona_rpc::{RpcBuilder, RpcLauncherError}; +use kona_p2p::P2pRpcRequest; +use kona_rpc::{HealthzResponse, OpP2PApiServer, RollupNodeApiServer, WsRPC, WsServer}; + +use jsonrpsee::{ + RpcModule, + core::RegisterMethodError, + server::{Server, ServerHandle}, +}; +use kona_engine::EngineQueries; +use kona_rpc::{L1WatcherQueries, NetworkRpc, RollupRpc, RpcBuilder}; +use tokio::sync::mpsc; use tokio_util::sync::{CancellationToken, WaitForCancellationFuture}; /// An error returned by the [`RpcActor`]. @@ -13,8 +22,8 @@ pub enum RpcActorError { #[error("Failed to register the healthz endpoint")] RegisterHealthz(#[from] RegisterMethodError), /// Failed to launch the RPC server. - #[error("Failed to launch the RPC server")] - LaunchFailed(#[from] RpcLauncherError), + #[error(transparent)] + LaunchFailed(#[from] std::io::Error), /// The [`RpcActor`]'s RPC server stopped unexpectedly. 
#[error("RPC server stopped unexpectedly")] ServerStopped, @@ -27,13 +36,35 @@ pub enum RpcActorError { #[derive(Debug)] pub struct RpcActor { /// A launcher for the rpc. - launcher: RpcBuilder, + config: RpcBuilder, + /// The network rpc sender. + network: mpsc::Sender, + /// The l1 watcher queries sender. + l1_watcher_queries: mpsc::Sender, + /// The engine query sender. + engine_query: mpsc::Sender, } impl RpcActor { - /// Constructs a new [`RpcActor`] given the [`RpcBuilder`] and [`CancellationToken`]. - pub const fn new(launcher: RpcBuilder) -> Self { - Self { launcher } + /// Constructs a new [`RpcActor`] given the [`RpcBuilder`]. + pub fn new(config: RpcBuilder) -> (RpcOutboundData, Self) { + let (network_sender, network_recv) = mpsc::channel(1024); + let (l1_watcher_queries_sender, l1_watcher_queries_recv) = mpsc::channel(1024); + let (engine_query_sender, engine_query_recv) = mpsc::channel(1024); + + ( + RpcOutboundData { + network: network_recv, + l1_watcher_queries: l1_watcher_queries_recv, + engine_query: engine_query_recv, + }, + Self { + config, + network: network_sender, + l1_watcher_queries: l1_watcher_queries_sender, + engine_query: engine_query_sender, + }, + ) } } @@ -50,24 +81,71 @@ impl CancellableContext for RpcContext { } } +/// The outbound data for the RPC actor. +#[derive(Debug)] +pub struct RpcOutboundData { + /// The network rpc receiver. + pub network: mpsc::Receiver, + /// The l1 watcher queries receiver. + pub l1_watcher_queries: mpsc::Receiver, + /// The engine query receiver. + pub engine_query: mpsc::Receiver, +} + +/// Launches the jsonrpsee [`Server`]. +/// +/// If the RPC server is disabled, this will return `Ok(None)`. +/// +/// ## Errors +/// +/// - [`std::io::Error`] if the server fails to start. 
+async fn launch( + config: &RpcBuilder, + module: RpcModule<()>, +) -> Result, std::io::Error> { + if config.disabled { + return Ok(None); + } + + let server = Server::builder().build(config.socket).await?; + Ok(Some(server.start(module))) +} + #[async_trait] impl NodeActor for RpcActor { type Error = RpcActorError; type InboundData = RpcContext; - type OutboundData = (); + type OutboundData = RpcOutboundData; type Builder = RpcBuilder; fn build(state: Self::Builder) -> (Self::OutboundData, Self) { - ((), Self { launcher: state }) + Self::new(state) } async fn start( mut self, RpcContext { cancellation }: Self::InboundData, ) -> Result<(), Self::Error> { - let restarts = self.launcher.restart_count(); + let mut modules = RpcModule::new(()); + + modules.register_method("healthz", |_, _, _| { + let response = HealthzResponse { version: std::env!("CARGO_PKG_VERSION").to_string() }; + jsonrpsee::core::RpcResult::Ok(response) + })?; + + modules.merge(NetworkRpc::new(self.network).into_rpc())?; + + // Create context for communication between actors. + let rollup_rpc = RollupRpc::new(self.engine_query.clone(), self.l1_watcher_queries); + modules.merge(rollup_rpc.into_rpc())?; + + if self.config.ws_enabled() { + modules.merge(WsRPC::new(self.engine_query).into_rpc())?; + } + + let restarts = self.config.restart_count(); - let Some(mut handle) = self.launcher.clone().launch().await? else { + let Some(mut handle) = launch(&self.config, modules.clone()).await? else { // The RPC server is disabled, so we can return Ok. return Ok(()); }; @@ -75,7 +153,7 @@ impl NodeActor for RpcActor { for _ in 0..=restarts { tokio::select! { _ = handle.clone().stopped() => { - match self.launcher.clone().launch().await { + match launch(&self.config, modules.clone()).await { Ok(Some(h)) => handle = h, Ok(None) => { // The RPC server is disabled, so we can return Ok. 
@@ -102,3 +180,44 @@ impl NodeActor for RpcActor { return Err(RpcActorError::ServerStopped); } } + +#[cfg(test)] +mod tests { + use std::net::SocketAddr; + + use super::*; + + #[tokio::test] + async fn test_launch_no_modules() { + let launcher = RpcBuilder { + disabled: false, + socket: SocketAddr::from(([127, 0, 0, 1], 8080)), + no_restart: false, + enable_admin: false, + admin_persistence: None, + ws_enabled: false, + }; + let result = launch(&launcher, RpcModule::new(())).await; + assert!(result.is_ok()); + } + + #[tokio::test] + async fn test_launch_with_modules() { + let launcher = RpcBuilder { + disabled: false, + socket: SocketAddr::from(([127, 0, 0, 1], 8081)), + no_restart: false, + enable_admin: false, + admin_persistence: None, + ws_enabled: false, + }; + let mut modules = RpcModule::new(()); + + modules.merge(RpcModule::new(())).expect("module merge"); + modules.merge(RpcModule::new(())).expect("module merge"); + modules.merge(RpcModule::new(())).expect("module merge"); + + let result = launch(&launcher, modules).await; + assert!(result.is_ok()); + } +} diff --git a/crates/node/service/src/lib.rs b/crates/node/service/src/lib.rs index a063b34c0f..6faafd03e7 100644 --- a/crates/node/service/src/lib.rs +++ b/crates/node/service/src/lib.rs @@ -10,9 +10,7 @@ extern crate tracing; mod service; -pub use service::{ - InteropMode, NodeMode, RollupNode, RollupNodeBuilder, RollupNodeError, RollupNodeService, -}; +pub use service::{InteropMode, NodeMode, RollupNode, RollupNodeBuilder, RollupNodeService}; mod actors; pub use actors::{ @@ -22,8 +20,8 @@ pub use actors::{ L1OriginSelector, L1OriginSelectorError, L1WatcherRpc, L1WatcherRpcContext, L1WatcherRpcError, L1WatcherRpcOutboundChannels, L1WatcherRpcState, L2Finalizer, NetworkActor, NetworkActorError, NetworkContext, NetworkOutboundData, NodeActor, PipelineBuilder, RpcActor, RpcActorError, - RpcContext, RuntimeActor, RuntimeContext, RuntimeOutboundData, RuntimeState, SequencerActor, - SequencerActorError, 
SequencerBuilder, SequencerContext, SequencerOutboundData, + RpcContext, RpcOutboundData, RuntimeActor, RuntimeContext, RuntimeOutboundData, RuntimeState, + SequencerActor, SequencerActorError, SequencerBuilder, SequencerContext, SequencerOutboundData, SupervisorActor, SupervisorActorContext, SupervisorActorError, SupervisorExt, SupervisorOutboundData, SupervisorRpcServerExt, }; diff --git a/crates/node/service/src/service/core.rs b/crates/node/service/src/service/core.rs index 61c1b8821d..8c4734082f 100644 --- a/crates/node/service/src/service/core.rs +++ b/crates/node/service/src/service/core.rs @@ -5,18 +5,15 @@ use crate::{ SequencerOutboundData, SupervisorActorContext, SupervisorExt, actors::{ DerivationOutboundChannels, EngineOutboundData, L1WatcherRpcOutboundChannels, - NetworkOutboundData, PipelineBuilder, RuntimeOutboundData, SupervisorOutboundData, + NetworkOutboundData, PipelineBuilder, RpcOutboundData, RuntimeOutboundData, + SupervisorOutboundData, }, service::spawn_and_wait, }; use async_trait::async_trait; use kona_derive::{AttributesBuilder, Pipeline, SignalReceiver}; -use kona_rpc::{ - NetworkRpc, OpP2PApiServer, RollupNodeApiServer, RollupRpc, RpcBuilder, RpcLauncherError, - WsRPC, WsServer, -}; +use kona_rpc::RpcBuilder; use std::fmt::Display; -use tokio::sync::mpsc; use tokio_util::sync::CancellationToken; /// The [`RollupNodeService`] trait defines the common interface for running a rollup node. @@ -97,12 +94,12 @@ pub trait RollupNodeService { >; /// The type of rpc actor to use for the service. - type RpcActor: NodeActor; - - /// The type of error for the service's entrypoint. - type Error: From - + From - + std::fmt::Debug; + type RpcActor: NodeActor< + Error: Display, + InboundData = RpcContext, + OutboundData = RpcOutboundData, + Builder = RpcBuilder, + >; /// The mode of operation for the node. fn mode(&self) -> NodeMode; @@ -113,8 +110,8 @@ pub trait RollupNodeService { /// Returns a derivation builder for the node. 
fn derivation_builder(&self) -> ::Builder; - /// Creates a network builder and the [`NetworkRpc`] for the node. - fn network_builder(&self) -> (::Builder, NetworkRpc); + /// Creates a network builder for the node. + fn network_builder(&self) -> ::Builder; /// Returns a runtime builder for the node. fn runtime_builder(&self) -> Option<::Builder>; @@ -122,8 +119,8 @@ pub trait RollupNodeService { /// Returns an engine builder for the node. fn engine_builder(&self) -> ::Builder; - /// Returns the [`RpcBuilder`] for the node. - fn rpc_builder(&self) -> RpcBuilder; + /// Returns an rpc builder for the node. + fn rpc_builder(&self) -> Option<::Builder>; /// Returns the sequencer builder for the node. fn sequencer_builder(&self) -> ::Builder; @@ -132,7 +129,7 @@ pub trait RollupNodeService { async fn supervisor_ext(&self) -> Option; /// Starts the rollup node service. - async fn start(&self) -> Result<(), Self::Error> { + async fn start(&self) { // Create a global cancellation token for graceful shutdown of tasks. let cancellation = CancellationToken::new(); @@ -178,38 +175,33 @@ pub trait RollupNodeService { ) = Self::EngineActor::build(engine_builder); // Create the p2p actor. - let (network_builder, p2p_rpc_module) = self.network_builder(); - let (NetworkOutboundData { unsafe_block }, network) = - Self::NetworkActor::build(network_builder); + let driver = self.network_builder(); + let (NetworkOutboundData { unsafe_block }, network) = Self::NetworkActor::build(driver); // Create the RPC server actor. - let (engine_query_recv, l1_watcher_queries_recv, (_, rpc)) = { - let mut rpc_builder = self.rpc_builder().with_healthz()?; - - rpc_builder.merge(p2p_rpc_module.into_rpc())?; - - // Create context for communication between actors. 
- let (l1_watcher_queries_sender, l1_watcher_queries_recv) = mpsc::channel(1024); - let (engine_query_sender, engine_query_recv) = mpsc::channel(1024); - let rollup_rpc = RollupRpc::new(engine_query_sender.clone(), l1_watcher_queries_sender); - rpc_builder.merge(rollup_rpc.into_rpc())?; - - if rpc_builder.ws_enabled() { - rpc_builder - .merge(WsRPC::new(engine_query_sender).into_rpc()) - .map_err(Self::Error::from)?; - } + let rpc_builder = self.rpc_builder(); + let (rpc_outbound_data, rpc) = rpc_builder.map(Self::RpcActor::build).unzip(); + + let (network_rpc, da_watcher_rpc, engine_rpc) = rpc_outbound_data + .map(|rpc_outbound_data| { + ( + Some(rpc_outbound_data.network), + Some(rpc_outbound_data.l1_watcher_queries), + Some(rpc_outbound_data.engine_query), + ) + }) + .unwrap_or_default(); - (engine_query_recv, l1_watcher_queries_recv, Self::RpcActor::build(rpc_builder)) + let network_context = NetworkContext { + signer: block_signer_sender, + rpc: network_rpc, + cancellation: cancellation.clone(), }; let (_, sequencer) = Self::SequencerActor::build(self.sequencer_builder()); - let network_context = - NetworkContext { signer: block_signer_sender, cancellation: cancellation.clone() }; - let da_watcher_context = L1WatcherRpcContext { - inbound_queries: l1_watcher_queries_recv, + inbound_queries: da_watcher_rpc, cancellation: cancellation.clone(), }; @@ -226,13 +218,11 @@ pub trait RollupNodeService { runtime_config_rx: runtime_config, attributes_rx: attributes_out, unsafe_block_rx: unsafe_block, - inbound_queries: engine_query_recv, + inbound_queries: engine_rpc, cancellation: cancellation.clone(), finalizer: L2Finalizer::new(latest_finalized), }; - let rpc_context = RpcContext { cancellation: cancellation.clone() }; - let sequencer_context = SequencerContext { latest_payload_rx: None, unsafe_head: engine_l2_safe_head_rx, @@ -243,14 +233,13 @@ pub trait RollupNodeService { cancellation, actors = [ runtime.map(|r| (r, RuntimeContext { cancellation: 
cancellation.clone() })), + rpc.map(|r| (r, RpcContext { cancellation: cancellation.clone() })), Some((network, network_context)), Some((da_watcher, da_watcher_context)), Some((derivation, derivation_context)), Some((engine, engine_context)), - Some((rpc, rpc_context)), (self.mode() == NodeMode::Sequencer).then_some((sequencer, sequencer_context)) ] ); - Ok(()) } } diff --git a/crates/node/service/src/service/mod.rs b/crates/node/service/src/service/mod.rs index 9b6c2059c5..5277efa56e 100644 --- a/crates/node/service/src/service/mod.rs +++ b/crates/node/service/src/service/mod.rs @@ -7,7 +7,7 @@ mod core; pub use core::RollupNodeService; mod standard; -pub use standard::{RollupNode, RollupNodeBuilder, RollupNodeError}; +pub use standard::{RollupNode, RollupNodeBuilder}; mod mode; pub use mode::{InteropMode, NodeMode}; diff --git a/crates/node/service/src/service/standard/builder.rs b/crates/node/service/src/service/standard/builder.rs index 7a7caf2ee7..45081b2faf 100644 --- a/crates/node/service/src/service/standard/builder.rs +++ b/crates/node/service/src/service/standard/builder.rs @@ -18,7 +18,7 @@ use url::Url; use kona_genesis::RollupConfig; use kona_p2p::Config; use kona_providers_alloy::OnlineBeaconClient; -use kona_rpc::{RpcBuilder, RpcConfig, SupervisorRpcConfig}; +use kona_rpc::{RpcBuilder, SupervisorRpcConfig}; /// The [`RollupNodeBuilder`] is used to construct a [`RollupNode`] service. #[derive(Debug, Default)] @@ -38,7 +38,7 @@ pub struct RollupNodeBuilder { /// The [`Config`]. p2p_config: Option, /// An RPC Configuration. - rpc_config: Option, + rpc_config: Option, /// An RPC Configuration for the supervisor rpc. supervisor_rpc_config: SupervisorRpcConfig, /// An interval to load the runtime config. @@ -95,8 +95,8 @@ impl RollupNodeBuilder { Self { p2p_config: Some(config), ..self } } - /// Sets the [`RpcConfig`] on the [`RollupNodeBuilder`]. 
- pub fn with_rpc_config(self, rpc_config: RpcConfig) -> Self { + /// Sets the [`RpcBuilder`] on the [`RollupNodeBuilder`]. + pub fn with_rpc_config(self, rpc_config: RpcBuilder) -> Self { Self { rpc_config: Some(rpc_config), ..self } } @@ -138,9 +138,6 @@ impl RollupNodeBuilder { let rpc_client = RpcClient::new(http_hyper, false); let l2_provider = RootProvider::::new(rpc_client); - let rpc_builder = - self.rpc_config.map(|c| c.as_launcher()).unwrap_or(RpcBuilder::new_disabled()); - let rollup_config = Arc::new(self.config); let engine_builder = EngineBuilder { config: Arc::clone(&rollup_config), @@ -169,7 +166,7 @@ impl RollupNodeBuilder { l1_beacon, l2_provider, engine_builder, - rpc_builder, + rpc_builder: self.rpc_config, runtime_builder, p2p_config, // By default, the supervisor rpc config is disabled. diff --git a/crates/node/service/src/service/standard/error.rs b/crates/node/service/src/service/standard/error.rs deleted file mode 100644 index 2be8306f96..0000000000 --- a/crates/node/service/src/service/standard/error.rs +++ /dev/null @@ -1,31 +0,0 @@ -//! Contains the error type for the [`crate::RollupNode`]. - -use jsonrpsee::server::RegisterMethodError; -use kona_derive::PipelineErrorKind; -use kona_p2p::NetworkBuilderError; -use kona_providers_alloy::AlloyChainProviderError; -use kona_rpc::RpcLauncherError; -use kona_sources::SyncStartError; - -/// Errors that can occur during the operation of the [`crate::RollupNode`]. -#[derive(thiserror::Error, Debug)] -pub enum RollupNodeError { - /// An error occurred while finding the sync starting point. - #[error(transparent)] - SyncStart(#[from] SyncStartError), - /// An error occurred while creating the derivation pipeline. - #[error(transparent)] - OnlinePipeline(#[from] PipelineErrorKind), - /// An error occurred while initializing the derivation pipeline. - #[error(transparent)] - AlloyChainProvider(#[from] AlloyChainProviderError), - /// An error occurred while initializing the network. 
- #[error(transparent)] - Network(#[from] NetworkBuilderError), - /// An error occurred while launching the RPC server. - #[error(transparent)] - RpcLauncher(#[from] RpcLauncherError), - /// An error occurred while registering RPC methods. - #[error(transparent)] - RegisterMethod(#[from] RegisterMethodError), -} diff --git a/crates/node/service/src/service/standard/mod.rs b/crates/node/service/src/service/standard/mod.rs index b8f0d01a6a..11b04efe89 100644 --- a/crates/node/service/src/service/standard/mod.rs +++ b/crates/node/service/src/service/standard/mod.rs @@ -6,8 +6,5 @@ mod node; pub use node::RollupNode; -mod error; -pub use error::RollupNodeError; - mod builder; pub use builder::RollupNodeBuilder; diff --git a/crates/node/service/src/service/standard/node.rs b/crates/node/service/src/service/standard/node.rs index b7f3c72d71..8f8565f2c8 100644 --- a/crates/node/service/src/service/standard/node.rs +++ b/crates/node/service/src/service/standard/node.rs @@ -1,8 +1,8 @@ //! Contains the [`RollupNode`] implementation. 
use crate::{ DerivationActor, DerivationBuilder, EngineActor, EngineBuilder, InteropMode, L1WatcherRpc, - L1WatcherRpcState, NetworkActor, NodeMode, RollupNodeBuilder, RollupNodeError, - RollupNodeService, RpcActor, RuntimeActor, SupervisorActor, SupervisorRpcServerExt, + L1WatcherRpcState, NetworkActor, NodeMode, RollupNodeBuilder, RollupNodeService, RpcActor, + RuntimeActor, SupervisorActor, SupervisorRpcServerExt, actors::{RuntimeState, SequencerActor, SequencerBuilder}, }; use alloy_provider::RootProvider; @@ -16,7 +16,7 @@ use kona_p2p::{Config, NetworkBuilder}; use kona_providers_alloy::{ AlloyChainProvider, AlloyL2ChainProvider, OnlineBeaconClient, OnlinePipeline, }; -use kona_rpc::{NetworkRpc, RpcBuilder, SupervisorRpcConfig, SupervisorRpcServer}; +use kona_rpc::{RpcBuilder, SupervisorRpcConfig, SupervisorRpcServer}; /// The standard implementation of the [RollupNode] service, using the governance approved OP Stack /// configuration of components. @@ -37,7 +37,7 @@ pub struct RollupNode { /// The [`EngineBuilder`] for the node. pub(crate) engine_builder: EngineBuilder, /// The [`RpcBuilder`] for the node. - pub(crate) rpc_builder: RpcBuilder, + pub(crate) rpc_builder: Option, /// The P2P [`Config`] for the node. pub(crate) p2p_config: Config, /// The [`RuntimeState`] for the runtime loading service. 
@@ -55,8 +55,6 @@ impl RollupNode { #[async_trait] impl RollupNodeService for RollupNode { - type Error = RollupNodeError; - type DataAvailabilityWatcher = L1WatcherRpc; type AttributesBuilder = StatefulAttributesBuilder; @@ -107,10 +105,6 @@ impl RollupNodeService for RollupNode { self.engine_builder.clone() } - fn rpc_builder(&self) -> RpcBuilder { - self.rpc_builder.clone() - } - fn sequencer_builder(&self) -> SequencerBuilder { SequencerBuilder { cfg: self.config.clone(), @@ -119,11 +113,12 @@ impl RollupNodeService for RollupNode { } } - fn network_builder(&self) -> (NetworkBuilder, NetworkRpc) { - let (tx, rx) = tokio::sync::mpsc::channel(1024); - let p2p_module = NetworkRpc::new(tx); - let builder = NetworkBuilder::from(self.p2p_config.clone()).with_rpc_receiver(rx); - (builder, p2p_module) + fn rpc_builder(&self) -> Option { + self.rpc_builder.clone() + } + + fn network_builder(&self) -> NetworkBuilder { + NetworkBuilder::from(self.p2p_config.clone()) } fn derivation_builder(&self) -> DerivationBuilder { diff --git a/examples/gossip/src/main.rs b/examples/gossip/src/main.rs index d4d4f81315..eb4c7f5501 100644 --- a/examples/gossip/src/main.rs +++ b/examples/gossip/src/main.rs @@ -107,7 +107,7 @@ impl GossipCommand { .build()?; let mut recv = network.unsafe_block_recv(); - network.start().await?; + network.start(None).await?; tracing::info!("Gossip driver started, receiving blocks."); loop { match recv.recv().await {