Skip to content
This repository was archived by the owner on Jan 16, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 21 additions & 14 deletions bin/node/src/commands/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
use crate::flags::{GlobalArgs, P2PArgs, RpcArgs};
use clap::Parser;
use futures::future::OptionFuture;
use jsonrpsee::{RpcModule, server::Server};
use kona_p2p::{NetworkBuilder, P2pRpcRequest};
use kona_rpc::{NetworkRpc, OpP2PApiServer, RpcConfig};
use kona_rpc::{NetworkRpc, OpP2PApiServer, RpcBuilder};
use tracing::{debug, info, warn};
use url::Url;

Expand Down Expand Up @@ -51,20 +52,22 @@ impl NetCommand {
let signer = args.genesis_signer()?;
info!(target: "net", "Genesis block signer: {:?}", signer);

// Setup the RPC server with the P2P RPC Module
let (tx, rx) = tokio::sync::mpsc::channel(1024);
let p2p_module = NetworkRpc::new(tx.clone()).into_rpc();
let rpc_config = RpcConfig::from(self.rpc);
let rpc_config = RpcBuilder::from(self.rpc);

if rpc_config.disabled {
let (handle, tx, rx) = if rpc_config.disabled {
info!(target: "net", "RPC server disabled");
(None, None, None)
} else {
info!(target: "net", socket = ?rpc_config.socket, "Starting RPC server");
}
let (tx, rx) = tokio::sync::mpsc::channel(1024);

// Setup the RPC server with the P2P RPC Module
let mut launcher = RpcModule::new(());
launcher.merge(NetworkRpc::new(tx.clone()).into_rpc())?;

let mut launcher = rpc_config.as_launcher();
launcher.merge(p2p_module)?;
let handle = launcher.launch().await?;
let server = Server::builder().build(rpc_config.socket).await?;
(Some(server.start(launcher)), Some(tx), Some(rx))
};

// Get the rollup config from the args
let rollup_config = args
Expand All @@ -74,9 +77,9 @@ impl NetCommand {
// Start the Network Stack
self.p2p.check_ports()?;
let p2p_config = self.p2p.config(&rollup_config, args, self.l1_eth_rpc).await?;
let mut network = NetworkBuilder::from(p2p_config).with_rpc_receiver(rx).build()?;
let mut network = NetworkBuilder::from(p2p_config).build()?;
let mut recv = network.unsafe_block_recv();
network.start().await?;
network.start(rx).await?;
info!(target: "net", "Network started, receiving blocks.");

// On an interval, use the rpc tx to request stats about the p2p network.
Expand All @@ -90,9 +93,13 @@ impl NetCommand {
Err(e) => debug!(target: "net", "Failed to receive unsafe payload: {:?}", e),
}
}
_ = interval.tick() => {
_ = interval.tick(), if tx.is_some() => {
let Some(ref sender) = tx else {
unreachable!("tx must be some (see above)");
};

let (otx, mut orx) = tokio::sync::oneshot::channel();
if let Err(e) = tx.send(P2pRpcRequest::PeerCount(otx)).await {
if let Err(e) = sender.send(P2pRpcRequest::PeerCount(otx)).await {
warn!(target: "net", "Failed to send network rpc request: {:?}", e);
continue;
}
Expand Down
5 changes: 3 additions & 2 deletions bin/node/src/commands/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,8 +232,9 @@ impl NodeCommand {
.with_supervisor_rpc_config(supervisor_rpc_config.unwrap_or_default())
.build()
.start()
.await
.map_err(Into::into)
.await;

Ok(())
}

/// Get the L2 rollup config, either from a file or the superchain registry.
Expand Down
4 changes: 2 additions & 2 deletions bin/node/src/flags/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
//! Flags for configuring the RPC server.

use clap::Parser;
use kona_rpc::RpcConfig;
use kona_rpc::RpcBuilder;
use std::{
net::{IpAddr, SocketAddr},
path::PathBuf,
Expand Down Expand Up @@ -44,7 +44,7 @@ impl Default for RpcArgs {
}
}

impl From<RpcArgs> for RpcConfig {
impl From<RpcArgs> for RpcBuilder {
fn from(args: RpcArgs) -> Self {
Self {
disabled: args.rpc_disabled,
Expand Down
3 changes: 2 additions & 1 deletion crates/node/p2p/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ async fn main() {
// Starting the network spawns gossip and discovery service
// handling in a new thread so this is a non-blocking,
// synchronous operation that does not need to be awaited.
network.start().await.expect("Failed to start network driver");
// If running an RPC server, you'd pass a channel to handle RPC requests as an input to the `start` method
network.start(None).await.expect("Failed to start network driver");
}
```

Expand Down
16 changes: 2 additions & 14 deletions crates/node/p2p/src/net/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

use crate::{
Broadcast, Config, Discv5Builder, GossipDriverBuilder, Network, NetworkBuilderError,
P2pRpcRequest, discv5::LocalNode, gossip::GaterConfig,
discv5::LocalNode, gossip::GaterConfig,
};

/// Constructs a [`Network`] for the OP Stack Consensus Layer.
Expand All @@ -22,8 +22,6 @@
discovery: Discv5Builder,
/// The gossip driver.
gossip: GossipDriverBuilder,
/// A receiver for network RPC requests.
rpc_recv: Option<tokio::sync::mpsc::Receiver<P2pRpcRequest>>,
/// A broadcast sender for the unsafe block payloads.
payload_tx: Option<BroadcastSender<OpExecutionPayloadEnvelope>>,
/// A local signer for payloads.
Expand Down Expand Up @@ -75,7 +73,6 @@
gossip_addr,
keypair,
),
rpc_recv: None,
payload_tx: None,
local_signer: None,
}
Expand Down Expand Up @@ -139,11 +136,6 @@
Self { gossip: self.gossip.with_config(config), ..self }
}

/// Sets the rpc receiver for the [`crate::Network`].
pub fn with_rpc_receiver(self, rpc_recv: tokio::sync::mpsc::Receiver<P2pRpcRequest>) -> Self {
Self { rpc_recv: Some(rpc_recv), ..self }
}

/// Sets the [`Discv5Config`] for the [`crate::Discv5Driver`].
pub fn with_discovery_config(self, config: Discv5Config) -> Self {
Self { discovery: self.discovery.with_discovery_config(config), ..self }
Expand All @@ -168,18 +160,16 @@
}

/// Builds the [`Network`].
pub fn build(mut self) -> Result<Network, NetworkBuilderError> {
pub fn build(self) -> Result<Network, NetworkBuilderError> {

Check warning on line 163 in crates/node/p2p/src/net/builder.rs

View check run for this annotation

Codecov / codecov/patch

crates/node/p2p/src/net/builder.rs#L163

Added line #L163 was not covered by tests
let (gossip, unsafe_block_signer_sender) = self.gossip.build()?;
let discovery = self.discovery.build()?;
let rpc = self.rpc_recv.take();
let payload_tx = self.payload_tx.unwrap_or(tokio::sync::broadcast::channel(256).0);
let (_, publish_rx) = tokio::sync::mpsc::channel(256);

Ok(Network {
gossip,
discovery,
unsafe_block_signer_sender,
rpc,
broadcast: Broadcast::new(payload_tx),
publish_rx,
local_signer: self.local_signer,
Expand Down Expand Up @@ -250,7 +240,6 @@
rollup_config: RollupConfig { l2_chain_id: 10, ..Default::default() },
signer,
})
.with_rpc_receiver(tokio::sync::mpsc::channel(1).1)
.with_gossip_address(gossip_addr.clone())
.with_discovery_address(disc_enr)
.with_discovery_config(ConfigBuilder::new(disc_listen.into()).build())
Expand Down Expand Up @@ -293,7 +282,6 @@
.with_gossip_address(gossip_addr)
.with_discovery_address(disc)
.with_discovery_config(discovery_config)
.with_rpc_receiver(tokio::sync::mpsc::channel(1).1)
.build()
.unwrap();

Expand Down
17 changes: 8 additions & 9 deletions crates/node/p2p/src/net/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use alloy_primitives::{Address, hex};
use alloy_signer::SignerSync;
use alloy_signer_local::PrivateKeySigner;
use futures::{AsyncReadExt, AsyncWriteExt, StreamExt};
use futures::{AsyncReadExt, AsyncWriteExt, StreamExt, future::OptionFuture};
use libp2p::TransportError;
use libp2p_stream::IncomingStreams;
use op_alloy_rpc_types_engine::{OpExecutionPayloadEnvelope, OpNetworkPayloadEnvelope};
Expand All @@ -30,11 +30,6 @@
pub(crate) broadcast: Broadcast,
/// Channel to send unsafe signer updates.
pub(crate) unsafe_block_signer_sender: Sender<Address>,
/// Handler for RPC Requests.
///
/// This is allowed to be optional since it may not be desirable
/// run a networking stack with RPC access.
pub(crate) rpc: Option<tokio::sync::mpsc::Receiver<P2pRpcRequest>>,
/// A channel to receive unsafe blocks and send them through the gossip layer.
pub(crate) publish_rx: tokio::sync::mpsc::Receiver<OpExecutionPayloadEnvelope>,
/// The swarm instance.
Expand Down Expand Up @@ -110,8 +105,10 @@

/// Starts the Discv5 peer discovery & libp2p services
/// and continually listens for new peers and messages to handle
pub async fn start(mut self) -> Result<(), TransportError<std::io::Error>> {
let mut rpc = self.rpc.unwrap_or_else(|| tokio::sync::mpsc::channel(1024).1);
pub async fn start(
mut self,
mut rpc: Option<tokio::sync::mpsc::Receiver<P2pRpcRequest>>,
) -> Result<(), TransportError<std::io::Error>> {

Check warning on line 111 in crates/node/p2p/src/net/driver.rs

View check run for this annotation

Codecov / codecov/patch

crates/node/p2p/src/net/driver.rs#L108-L111

Added lines #L108 - L111 were not covered by tests
let (handler, mut enr_receiver) = self.discovery.start();
let mut broadcast = self.broadcast;

Expand Down Expand Up @@ -235,7 +232,9 @@
warn!(err = ?send_err, "Impossible to send a request to the discovery handler. The channel connection is dropped.");
}
},
req = rpc.recv() => {
// Note that we're using `if rpc.is_some()` to ensure that the branch does not always immediately resolve
// to avoid CPU contention.
Comment thread
theochap marked this conversation as resolved.
Some(req) = OptionFuture::from(rpc.as_mut().map(|r| r.recv())), if rpc.is_some() => {

Check warning on line 237 in crates/node/p2p/src/net/driver.rs

View check run for this annotation

Codecov / codecov/patch

crates/node/p2p/src/net/driver.rs#L237

Added line #L237 was not covered by tests
let Some(req) = req else {
error!(target: "node::p2p", "The rpc receiver channel has closed");
return;
Expand Down
30 changes: 18 additions & 12 deletions crates/node/rpc/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
//! Contains the RPC Configuration.

use jsonrpsee::RpcModule;

use crate::RpcBuilder;
use std::{net::SocketAddr, path::PathBuf};

/// The RPC configuration.
#[derive(Debug, Clone)]
pub struct RpcConfig {
pub struct RpcBuilder {
/// Disable the rpc server.
pub disabled: bool,
/// Prevent the rpc server from being restarted.
Expand All @@ -23,15 +20,24 @@
pub ws_enabled: bool,
}

impl RpcConfig {
/// Converts the [`RpcConfig`] into a [`RpcBuilder`].
pub fn as_launcher(self) -> RpcBuilder {
RpcBuilder { config: self, module: RpcModule::new(()) }
impl RpcBuilder {
/// Returns whether WebSocket RPC endpoint is enabled
pub const fn ws_enabled(&self) -> bool {
self.ws_enabled
}

Check warning on line 27 in crates/node/rpc/src/config.rs

View check run for this annotation

Codecov / codecov/patch

crates/node/rpc/src/config.rs#L25-L27

Added lines #L25 - L27 were not covered by tests

/// Returns the socket address of the [`RpcBuilder`].
pub const fn socket(&self) -> SocketAddr {
self.socket
}

Check warning on line 32 in crates/node/rpc/src/config.rs

View check run for this annotation

Codecov / codecov/patch

crates/node/rpc/src/config.rs#L30-L32

Added lines #L30 - L32 were not covered by tests

/// Returns the number of times the RPC server will attempt to restart if it stops.
pub const fn restart_count(&self) -> u32 {
if self.no_restart { 0 } else { 3 }

Check warning on line 36 in crates/node/rpc/src/config.rs

View check run for this annotation

Codecov / codecov/patch

crates/node/rpc/src/config.rs#L35-L36

Added lines #L35 - L36 were not covered by tests
}
}

impl From<RpcConfig> for RpcBuilder {
fn from(config: RpcConfig) -> Self {
config.as_launcher()
/// Sets the given [`SocketAddr`] on the [`RpcBuilder`].
pub fn set_addr(self, addr: SocketAddr) -> Self {
Self { socket: addr, ..self }

Check warning on line 41 in crates/node/rpc/src/config.rs

View check run for this annotation

Codecov / codecov/patch

crates/node/rpc/src/config.rs#L40-L41

Added lines #L40 - L41 were not covered by tests
}
}
Loading