Skip to content
This repository was archived by the owner on Jan 16, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions bin/node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ futures.workspace = true
metrics.workspace = true
tracing.workspace = true
tokio-stream.workspace = true
tokio-util.workspace = true
serde_json = { workspace = true, features = ["std"] }
jsonrpsee = { workspace = true, features = ["server"] }
clap = { workspace = true, features = ["derive", "env"] }
Expand Down
62 changes: 31 additions & 31 deletions bin/node/src/commands/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,13 @@ use crate::flags::{GlobalArgs, P2PArgs, RpcArgs};
use clap::Parser;
use futures::future::OptionFuture;
use jsonrpsee::{RpcModule, server::Server};
use kona_p2p::{NetworkBuilder, P2pRpcRequest};
use kona_node_service::{
NetworkActor, NetworkBuilder, NetworkContext, NetworkInboundData, NodeActor,
};
use kona_p2p::P2pRpcRequest;
use kona_rpc::{NetworkRpc, OpP2PApiServer, RpcBuilder};
use tracing::{debug, info, warn};
use tokio_util::sync::CancellationToken;
use tracing::{info, warn};
use url::Url;

/// The `net` Subcommand
Expand Down Expand Up @@ -54,21 +58,6 @@ impl NetCommand {

let rpc_config = Option::<RpcBuilder>::from(self.rpc);

let (handle, tx, rx) = if let Some(config) = rpc_config {
info!(target: "net", socket = ?config.socket, "Starting RPC server");
let (tx, rx) = tokio::sync::mpsc::channel(1024);

// Setup the RPC server with the P2P RPC Module
let mut launcher = RpcModule::new(());
launcher.merge(NetworkRpc::new(tx.clone()).into_rpc())?;

let server = Server::builder().build(config.socket).await?;
(Some(server.start(launcher)), Some(tx), Some(rx))
} else {
info!(target: "net", "RPC server disabled");
(None, None, None)
};

// Get the rollup config from the args
let rollup_config = args
.rollup_config()
Expand All @@ -77,29 +66,40 @@ impl NetCommand {
// Start the Network Stack
self.p2p.check_ports()?;
let p2p_config = self.p2p.config(&rollup_config, args, self.l1_eth_rpc).await?;
let mut network = NetworkBuilder::from(p2p_config).build()?;
let mut recv = network.unsafe_block_recv();
network.start(rx).await?;

let (NetworkInboundData { rpc, .. }, network) =
NetworkActor::new(NetworkBuilder::from(p2p_config));

let (blocks, mut blocks_rx) = tokio::sync::mpsc::channel(1024);
network.start(NetworkContext { blocks, cancellation: CancellationToken::new() }).await?;

info!(target: "net", "Network started, receiving blocks.");

// On an interval, use the rpc tx to request stats about the p2p network.
let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(2));

let handle = if let Some(config) = rpc_config {
info!(target: "net", socket = ?config.socket, "Starting RPC server");

// Setup the RPC server with the P2P RPC Module
let mut launcher = RpcModule::new(());
launcher.merge(NetworkRpc::new(rpc.clone()).into_rpc())?;

let server = Server::builder().build(config.socket).await?;
Some(server.start(launcher))
} else {
info!(target: "net", "RPC server disabled");
None
};

loop {
tokio::select! {
payload = recv.recv() => {
match payload {
Ok(payload) => info!(target: "net", "Received unsafe payload: {:?}", payload.payload.block_hash()),
Err(e) => debug!(target: "net", "Failed to receive unsafe payload: {:?}", e),
}
Some(payload) = blocks_rx.recv() => {
info!(target: "net", "Received unsafe payload: {:?}", payload.payload.block_hash());
}
_ = interval.tick(), if tx.is_some() => {
let Some(ref sender) = tx else {
unreachable!("tx must be some (see above)");
};

_ = interval.tick(), if !rpc.is_closed() => {
let (otx, mut orx) = tokio::sync::oneshot::channel();
if let Err(e) = sender.send(P2pRpcRequest::PeerCount(otx)).await {
if let Err(e) = rpc.send(P2pRpcRequest::PeerCount(otx)).await {
warn!(target: "net", "Failed to send network rpc request: {:?}", e);
continue;
}
Expand Down
9 changes: 5 additions & 4 deletions bin/node/src/flags/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ use anyhow::Result;
use clap::Parser;
use discv5::{Enr, enr::k256};
use kona_genesis::RollupConfig;
use kona_p2p::{Config, GaterConfig, LocalNode};
use kona_node_service::NetworkConfig;
use kona_p2p::{GaterConfig, LocalNode};
use kona_peers::{PeerMonitoring, PeerScoreLevel};
use kona_sources::RuntimeLoader;
use libp2p::identity::Keypair;
Expand Down Expand Up @@ -336,7 +337,7 @@ impl P2PArgs {
})
}

/// Constructs kona's P2P network [`Config`] from CLI arguments.
/// Constructs kona's P2P network [`NetworkConfig`] from CLI arguments.
///
/// ## Parameters
///
Expand All @@ -348,7 +349,7 @@ impl P2PArgs {
config: &RollupConfig,
args: &GlobalArgs,
l1_rpc: Option<Url>,
) -> anyhow::Result<Config> {
) -> anyhow::Result<NetworkConfig> {
// Note: the advertised address is contained in the ENR for external peers from the
// discovery layer to use.

Expand Down Expand Up @@ -406,7 +407,7 @@ impl P2PArgs {
.transpose()?
.map(|s| s.with_chain_id(Some(args.l2_chain_id)));

Ok(Config {
Ok(NetworkConfig {
discovery_config,
discovery_interval: Duration::from_secs(self.discovery_interval),
discovery_address,
Expand Down
3 changes: 0 additions & 3 deletions crates/node/p2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,7 @@ kona-genesis.workspace = true
# Alloy
alloy-rlp.workspace = true
alloy-eips.workspace = true
alloy-signer.workspace = true
alloy-consensus.workspace = true
alloy-signer-local.workspace = true
alloy-rpc-types-engine.workspace = true
alloy-primitives = { workspace = true, features = ["k256", "getrandom"] }

Expand All @@ -51,7 +49,6 @@ tracing.workspace = true
thiserror.workspace = true
serde_repr.workspace = true
lazy_static.workspace = true
ethereum_ssz.workspace = true
rand = { workspace = true, features = ["thread_rng"] }
backon = { workspace = true, features = ["std", "tokio", "tokio-sleep"] }
derive_more = { workspace = true, features = ["display", "deref", "debug"] }
Expand Down
46 changes: 0 additions & 46 deletions crates/node/p2p/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,52 +4,6 @@ A p2p library for the OP Stack.

Contains a gossipsub driver to run discv5 peer discovery and block gossip.

### Example

> **Warning**
>
> Notice, the socket address uses `0.0.0.0`.
> If you are experiencing issues connecting to peers for discovery,
> check to make sure you are not using the loopback address,
> `127.0.0.1` aka "localhost", which can prevent outward facing connections.

```rust,no_run
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use alloy_primitives::address;
use kona_genesis::RollupConfig;
use kona_p2p::{LocalNode, Network, Config};
use libp2p::Multiaddr;
use discv5::enr::CombinedKey;

#[tokio::main]
async fn main() {
// Construct the Network
let signer = address!("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb");
let gossip = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 9099);
let mut gossip_addr = Multiaddr::from(gossip.ip());
gossip_addr.push(libp2p::multiaddr::Protocol::Tcp(gossip.port()));
let advertise_ip = IpAddr::V4(Ipv4Addr::UNSPECIFIED);

let CombinedKey::Secp256k1(k256_key) = CombinedKey::generate_secp256k1() else {
unreachable!()
};
let disc = LocalNode::new(k256_key, advertise_ip, 9097, 9098);
let network = Network::builder(Config::new(
RollupConfig::default(),
disc,
gossip_addr,
signer
)).build().expect("Failed to builder network driver");

// Starting the network spawns gossip and discovery service
// handling in a new thread so this is a non-blocking,
// synchronous operation that does not need to be awaited.
// If running an RPC server, you'd pass a channel to handle RPC requests as an input to the `start` method
network.start(None).await.expect("Failed to start network driver");
}
```

[!WARNING]: ###example

### Technical note:

Expand Down
66 changes: 62 additions & 4 deletions crates/node/p2p/src/gossip/driver.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
//! Consensus-layer gossipsub driver for Optimism.

use alloy_primitives::Address;
use alloy_primitives::{Address, hex};
use derive_more::Debug;
use discv5::Enr;
use futures::stream::StreamExt;
use futures::{AsyncReadExt, AsyncWriteExt, stream::StreamExt};
use kona_genesis::RollupConfig;
use kona_peers::{EnrValidation, PeerMonitoring, enr_to_multiaddr};
use libp2p::{
Expand Down Expand Up @@ -127,9 +127,67 @@
Ok(Some(id))
}

/// Tells the swarm to listen on the given [`Multiaddr`].
/// Handles the sync request/response protocol.
///
/// This is a mock handler that supports the `payload_by_number` protocol.
/// It always returns: not found (1), version (0). `<https://specs.optimism.io/protocol/rollup-node-p2p.html#payload_by_number>`
///
/// ## Note
///
/// This is used to ensure op-nodes are not penalizing kona-nodes for not supporting it.
/// This feature is being deprecated by the op-node team. Once it is fully removed from the
/// op-node's implementation we will remove this handler.
pub(super) fn sync_protocol_handler(&mut self) {
let Some(mut sync_protocol) = self.sync_protocol.take() else {
return;

Check warning on line 142 in crates/node/p2p/src/gossip/driver.rs

View check run for this annotation

Codecov / codecov/patch

crates/node/p2p/src/gossip/driver.rs#L140-L142

Added lines #L140 - L142 were not covered by tests
};

// Spawn a new task to handle the sync request/response protocol.
tokio::spawn(async move {

Check warning on line 146 in crates/node/p2p/src/gossip/driver.rs

View check run for this annotation

Codecov / codecov/patch

crates/node/p2p/src/gossip/driver.rs#L146

Added line #L146 was not covered by tests
loop {
let Some((peer_id, mut inbound_stream)) = sync_protocol.next().await else {
warn!(target: "gossip", "The sync protocol stream has ended");
return;

Check warning on line 150 in crates/node/p2p/src/gossip/driver.rs

View check run for this annotation

Codecov / codecov/patch

crates/node/p2p/src/gossip/driver.rs#L148-L150

Added lines #L148 - L150 were not covered by tests
};

info!(target: "gossip", "Received a sync request from {peer_id}, spawning a new task to handle it");

Check warning on line 153 in crates/node/p2p/src/gossip/driver.rs

View check run for this annotation

Codecov / codecov/patch

crates/node/p2p/src/gossip/driver.rs#L153

Added line #L153 was not covered by tests

tokio::spawn(async move {
let mut buffer = Vec::new();
let Ok(bytes_received) = inbound_stream.read_to_end(&mut buffer).await else {
error!(target: "gossip", "Failed to read the sync request from {peer_id}");
return;

Check warning on line 159 in crates/node/p2p/src/gossip/driver.rs

View check run for this annotation

Codecov / codecov/patch

crates/node/p2p/src/gossip/driver.rs#L155-L159

Added lines #L155 - L159 were not covered by tests
};

debug!(target: "gossip", bytes_received = bytes_received, peer_id = ?peer_id, payload = ?buffer, "Received inbound sync request");

Check warning on line 162 in crates/node/p2p/src/gossip/driver.rs

View check run for this annotation

Codecov / codecov/patch

crates/node/p2p/src/gossip/driver.rs#L162

Added line #L162 was not covered by tests

// We return: not found (1), version (0). `<https://specs.optimism.io/protocol/rollup-node-p2p.html#payload_by_number>`
// Response format: <response> = <res><version><payload>
// No payload is returned.
const OUTPUT: [u8; 2] = hex!("0100");

// We only write that we're not supporting the sync request.
if let Err(e) = inbound_stream.write_all(&OUTPUT).await {
error!(target: "gossip", err = ?e, "Failed to write the sync response to {peer_id}");
return;
};

debug!(target: "gossip", bytes_sent = OUTPUT.len(), peer_id = ?peer_id, "Sent outbound sync response");
});

Check warning on line 176 in crates/node/p2p/src/gossip/driver.rs

View check run for this annotation

Codecov / codecov/patch

crates/node/p2p/src/gossip/driver.rs#L170-L176

Added lines #L170 - L176 were not covered by tests
}
});
}

Check warning on line 179 in crates/node/p2p/src/gossip/driver.rs

View check run for this annotation

Codecov / codecov/patch

crates/node/p2p/src/gossip/driver.rs#L178-L179

Added lines #L178 - L179 were not covered by tests

/// Starts the libp2p Swarm.
///
/// - Starts the sync request/response protocol handler.
/// - Tells the swarm to listen on the given [`Multiaddr`].
///
/// Waits for the swarm to start listen before returning and connecting to peers.
pub async fn listen(&mut self) -> Result<(), TransportError<std::io::Error>> {
pub async fn start(&mut self) -> Result<(), TransportError<std::io::Error>> {
// Start the sync request/response protocol handler.
self.sync_protocol_handler();

Check warning on line 190 in crates/node/p2p/src/gossip/driver.rs

View check run for this annotation

Codecov / codecov/patch

crates/node/p2p/src/gossip/driver.rs#L187-L190

Added lines #L187 - L190 were not covered by tests
match self.swarm.listen_on(self.addr.clone()) {
Ok(id) => loop {
if let SwarmEvent::NewListenAddr { address, listener_id } =
Expand Down
3 changes: 0 additions & 3 deletions crates/node/p2p/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,6 @@ extern crate tracing;
mod metrics;
pub use metrics::Metrics;

mod net;
pub use net::{Broadcast, Config, Network, NetworkBuilder, NetworkBuilderError};

mod rpc;
pub use rpc::{
Connectedness, Direction, GossipScores, P2pRpcRequest, PeerCount, PeerDump, PeerInfo,
Expand Down
Loading