From 3a2c9875de8cebfb42ab20dae2d09ef6c94685d6 Mon Sep 17 00:00:00 2001 From: theochap Date: Tue, 8 Jul 2025 15:22:31 -0400 Subject: [PATCH] feat(node/net): integrate the network driver inside the network actor --- Cargo.lock | 13 +- bin/node/Cargo.toml | 1 + bin/node/src/commands/net.rs | 62 ++--- bin/node/src/flags/p2p.rs | 9 +- crates/node/p2p/Cargo.toml | 3 - crates/node/p2p/README.md | 46 ---- crates/node/p2p/src/gossip/driver.rs | 66 ++++- crates/node/p2p/src/lib.rs | 3 - crates/node/p2p/src/net/broadcast.rs | 85 ------ crates/node/p2p/src/net/driver.rs | 259 ------------------ crates/node/p2p/src/net/mod.rs | 16 -- crates/node/p2p/src/rpc/request.rs | 2 +- crates/node/p2p/tests/gossip_connect.rs | 8 +- crates/node/service/Cargo.toml | 7 + crates/node/service/src/actors/mod.rs | 5 +- crates/node/service/src/actors/network.rs | 173 ------------ .../node/service/src/actors/network/README.md | 54 ++++ .../node/service/src/actors/network/actor.rs | 245 +++++++++++++++++ .../src/actors/network}/builder.rs | 67 ++--- .../src/actors/network}/config.rs | 10 +- .../node/service/src/actors/network/driver.rs | 47 ++++ .../src/actors/network}/error.rs | 2 +- .../service/src/actors/network/handler.rs | 101 +++++++ crates/node/service/src/actors/network/mod.rs | 19 ++ crates/node/service/src/lib.rs | 7 +- crates/node/service/src/service/core.rs | 2 +- .../service/src/service/standard/builder.rs | 13 +- .../node/service/src/service/standard/node.rs | 9 +- examples/gossip/Cargo.toml | 2 + examples/gossip/src/main.rs | 71 +++-- 30 files changed, 679 insertions(+), 728 deletions(-) delete mode 100644 crates/node/p2p/src/net/broadcast.rs delete mode 100644 crates/node/p2p/src/net/driver.rs delete mode 100644 crates/node/p2p/src/net/mod.rs delete mode 100644 crates/node/service/src/actors/network.rs create mode 100644 crates/node/service/src/actors/network/README.md create mode 100644 crates/node/service/src/actors/network/actor.rs rename crates/node/{p2p/src/net => service/src/actors/network}/builder.rs (81%) rename crates/node/{p2p/src/net => service/src/actors/network}/config.rs (92%) create mode 100644 crates/node/service/src/actors/network/driver.rs rename crates/node/{p2p/src/net => service/src/actors/network}/error.rs (88%) create mode 100644 crates/node/service/src/actors/network/handler.rs create mode 100644 crates/node/service/src/actors/network/mod.rs diff --git a/Cargo.lock b/Cargo.lock index 65bfaa74f1..9b3293ec94 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2846,10 +2846,12 @@ dependencies = [ "clap", "discv5", "kona-cli", + "kona-node-service", "kona-p2p", "kona-registry", "libp2p", "tokio", + "tokio-util", "tracing", "tracing-subscriber 0.3.19", ] @@ -4621,6 +4623,7 @@ dependencies = [ "tabled", "tokio", "tokio-stream", + "tokio-util", "tracing", "tracing-subscriber 0.3.19", "url", @@ -4638,11 +4641,16 @@ dependencies = [ "alloy-rpc-client", "alloy-rpc-types-engine", "alloy-rpc-types-eth", + "alloy-signer", + "alloy-signer-local", "alloy-transport", "alloy-transport-http", "async-stream", "async-trait", + "backon", "derive_more", + "discv5", + "ethereum_ssz", "futures", "http-body-util", "jsonrpsee", @@ -4652,11 +4660,13 @@ dependencies = [ "kona-interop", "kona-macros", "kona-p2p", + "kona-peers", "kona-protocol", "kona-providers-alloy", "kona-rpc", "kona-sources", "libp2p", + "libp2p-stream", "metrics", "op-alloy-network", "op-alloy-provider", @@ -4680,13 +4690,10 @@ dependencies = [ "alloy-primitives", "alloy-rlp", "alloy-rpc-types-engine", - "alloy-signer", - "alloy-signer-local", "arbitrary", "backon", "derive_more", "discv5", - "ethereum_ssz", "futures", "ipnet", "kona-cli", diff --git a/bin/node/Cargo.toml b/bin/node/Cargo.toml index 8675edce53..bde076c5ef 100644 --- a/bin/node/Cargo.toml +++ b/bin/node/Cargo.toml @@ -53,6 +53,7 @@ futures.workspace = true metrics.workspace = true tracing.workspace = true tokio-stream.workspace = true +tokio-util.workspace = true serde_json = { workspace = true, features = ["std"] } jsonrpsee = { workspace = true, features = ["server"] } clap = { workspace = true, features = ["derive", "env"] } diff --git a/bin/node/src/commands/net.rs b/bin/node/src/commands/net.rs index 91fef892a7..c6d5b849e9 100644 --- a/bin/node/src/commands/net.rs +++ b/bin/node/src/commands/net.rs @@ -4,9 +4,13 @@ use crate::flags::{GlobalArgs, P2PArgs, RpcArgs}; use clap::Parser; use futures::future::OptionFuture; use jsonrpsee::{RpcModule, server::Server}; -use kona_p2p::{NetworkBuilder, P2pRpcRequest}; +use kona_node_service::{ + NetworkActor, NetworkBuilder, NetworkContext, NetworkInboundData, NodeActor, +}; +use kona_p2p::P2pRpcRequest; use kona_rpc::{NetworkRpc, OpP2PApiServer, RpcBuilder}; -use tracing::{debug, info, warn}; +use tokio_util::sync::CancellationToken; +use tracing::{info, warn}; use url::Url; /// The `net` Subcommand @@ -54,21 +58,6 @@ impl NetCommand { let rpc_config = Option::::from(self.rpc); - let (handle, tx, rx) = if let Some(config) = rpc_config { - info!(target: "net", socket = ?config.socket, "Starting RPC server"); - let (tx, rx) = tokio::sync::mpsc::channel(1024); - - // Setup the RPC server with the P2P RPC Module - let mut launcher = RpcModule::new(()); - launcher.merge(NetworkRpc::new(tx.clone()).into_rpc())?; - - let server = Server::builder().build(config.socket).await?; - (Some(server.start(launcher)), Some(tx), Some(rx)) - } else { - info!(target: "net", "RPC server disabled"); - (None, None, None) - }; - // Get the rollup config from the args let rollup_config = args .rollup_config() @@ -77,29 +66,40 @@ impl NetCommand { // Start the Network Stack self.p2p.check_ports()?; let p2p_config = self.p2p.config(&rollup_config, args, self.l1_eth_rpc).await?; - let mut network = NetworkBuilder::from(p2p_config).build()?; - let mut recv = network.unsafe_block_recv(); - network.start(rx).await?; + + let (NetworkInboundData { rpc, .. }, network) = + NetworkActor::new(NetworkBuilder::from(p2p_config)); + + let (blocks, mut blocks_rx) = tokio::sync::mpsc::channel(1024); + network.start(NetworkContext { blocks, cancellation: CancellationToken::new() }).await?; + info!(target: "net", "Network started, receiving blocks."); // On an interval, use the rpc tx to request stats about the p2p network. let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(2)); + let handle = if let Some(config) = rpc_config { + info!(target: "net", socket = ?config.socket, "Starting RPC server"); + + // Setup the RPC server with the P2P RPC Module + let mut launcher = RpcModule::new(()); + launcher.merge(NetworkRpc::new(rpc.clone()).into_rpc())?; + + let server = Server::builder().build(config.socket).await?; + Some(server.start(launcher)) + } else { + info!(target: "net", "RPC server disabled"); + None + }; + loop { tokio::select! { - payload = recv.recv() => { - match payload { - Ok(payload) => info!(target: "net", "Received unsafe payload: {:?}", payload.payload.block_hash()), - Err(e) => debug!(target: "net", "Failed to receive unsafe payload: {:?}", e), - } + Some(payload) = blocks_rx.recv() => { + info!(target: "net", "Received unsafe payload: {:?}", payload.payload.block_hash()); } - _ = interval.tick(), if tx.is_some() => { - let Some(ref sender) = tx else { - unreachable!("tx must be some (see above)"); - }; - + _ = interval.tick(), if !rpc.is_closed() => { let (otx, mut orx) = tokio::sync::oneshot::channel(); - if let Err(e) = sender.send(P2pRpcRequest::PeerCount(otx)).await { + if let Err(e) = rpc.send(P2pRpcRequest::PeerCount(otx)).await { warn!(target: "net", "Failed to send network rpc request: {:?}", e); continue; } diff --git a/bin/node/src/flags/p2p.rs b/bin/node/src/flags/p2p.rs index 32098f41db..5440c231ae 100644 --- a/bin/node/src/flags/p2p.rs +++ b/bin/node/src/flags/p2p.rs @@ -12,7 +12,8 @@ use anyhow::Result; use clap::Parser; use discv5::{Enr, enr::k256}; use kona_genesis::RollupConfig; -use kona_p2p::{Config, GaterConfig, LocalNode}; +use kona_node_service::NetworkConfig; +use kona_p2p::{GaterConfig, LocalNode}; use kona_peers::{PeerMonitoring, PeerScoreLevel}; use kona_sources::RuntimeLoader; use libp2p::identity::Keypair; @@ -336,7 +337,7 @@ impl P2PArgs { }) } - /// Constructs kona's P2P network [`Config`] from CLI arguments. + /// Constructs kona's P2P network [`NetworkConfig`] from CLI arguments. /// /// ## Parameters /// @@ -348,7 +349,7 @@ impl P2PArgs { config: &RollupConfig, args: &GlobalArgs, l1_rpc: Option, - ) -> anyhow::Result { + ) -> anyhow::Result { // Note: the advertised address is contained in the ENR for external peers from the // discovery layer to use. @@ -406,7 +407,7 @@ impl P2PArgs { .transpose()? .map(|s| s.with_chain_id(Some(args.l2_chain_id))); - Ok(Config { + Ok(NetworkConfig { discovery_config, discovery_interval: Duration::from_secs(self.discovery_interval), discovery_address, diff --git a/crates/node/p2p/Cargo.toml b/crates/node/p2p/Cargo.toml index 3f0e5ef471..1ec401f7a7 100644 --- a/crates/node/p2p/Cargo.toml +++ b/crates/node/p2p/Cargo.toml @@ -24,9 +24,7 @@ kona-genesis.workspace = true # Alloy alloy-rlp.workspace = true alloy-eips.workspace = true -alloy-signer.workspace = true alloy-consensus.workspace = true -alloy-signer-local.workspace = true alloy-rpc-types-engine.workspace = true alloy-primitives = { workspace = true, features = ["k256", "getrandom"] } @@ -51,7 +49,6 @@ tracing.workspace = true thiserror.workspace = true serde_repr.workspace = true lazy_static.workspace = true -ethereum_ssz.workspace = true rand = { workspace = true, features = ["thread_rng"] } backon = { workspace = true, features = ["std", "tokio", "tokio-sleep"] } derive_more = { workspace = true, features = ["display", "deref", "debug"] } diff --git a/crates/node/p2p/README.md b/crates/node/p2p/README.md index 53881738bb..75a8da3d33 100644 --- a/crates/node/p2p/README.md +++ b/crates/node/p2p/README.md @@ -4,52 +4,6 @@ A p2p library for the OP Stack. Contains a gossipsub driver to run discv5 peer discovery and block gossip. -### Example - -> **Warning** -> -> Notice, the socket address uses `0.0.0.0`. -> If you are experiencing issues connecting to peers for discovery, -> check to make sure you are not using the loopback address, -> `127.0.0.1` aka "localhost", which can prevent outward facing connections. - -```rust,no_run -use std::net::{IpAddr, Ipv4Addr, SocketAddr}; -use alloy_primitives::address; -use kona_genesis::RollupConfig; -use kona_p2p::{LocalNode, Network, Config}; -use libp2p::Multiaddr; -use discv5::enr::CombinedKey; - -#[tokio::main] -async fn main() { - // Construct the Network - let signer = address!("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"); - let gossip = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 9099); - let mut gossip_addr = Multiaddr::from(gossip.ip()); - gossip_addr.push(libp2p::multiaddr::Protocol::Tcp(gossip.port())); - let advertise_ip = IpAddr::V4(Ipv4Addr::UNSPECIFIED); - - let CombinedKey::Secp256k1(k256_key) = CombinedKey::generate_secp256k1() else { - unreachable!() - }; - let disc = LocalNode::new(k256_key, advertise_ip, 9097, 9098); - let network = Network::builder(Config::new( - RollupConfig::default(), - disc, - gossip_addr, - signer - )).build().expect("Failed to builder network driver"); - - // Starting the network spawns gossip and discovery service - // handling in a new thread so this is a non-blocking, - // synchronous operation that does not need to be awaited. - // If running an RPC server, you'd pass a channel to handle RPC requests as an input to the `start` method - network.start(None).await.expect("Failed to start network driver"); -} -``` - -[!WARNING]: ###example ### Technical note: diff --git a/crates/node/p2p/src/gossip/driver.rs b/crates/node/p2p/src/gossip/driver.rs index c0ee38fa3b..6c6a50246b 100644 --- a/crates/node/p2p/src/gossip/driver.rs +++ b/crates/node/p2p/src/gossip/driver.rs @@ -1,9 +1,9 @@ //! Consensus-layer gossipsub driver for Optimism. -use alloy_primitives::Address; +use alloy_primitives::{Address, hex}; use derive_more::Debug; use discv5::Enr; -use futures::stream::StreamExt; +use futures::{AsyncReadExt, AsyncWriteExt, stream::StreamExt}; use kona_genesis::RollupConfig; use kona_peers::{EnrValidation, PeerMonitoring, enr_to_multiaddr}; use libp2p::{ @@ -127,9 +127,67 @@ where Ok(Some(id)) } - /// Tells the swarm to listen on the given [`Multiaddr`]. + /// Handles the sync request/response protocol. + /// + /// This is a mock handler that supports the `payload_by_number` protocol. + /// It always returns: not found (1), version (0). `` + /// + /// ## Note + /// + /// This is used to ensure op-nodes are not penalizing kona-nodes for not supporting it. + /// This feature is being deprecated by the op-node team. Once it is fully removed from the + /// op-node's implementation we will remove this handler. + pub(super) fn sync_protocol_handler(&mut self) { + let Some(mut sync_protocol) = self.sync_protocol.take() else { + return; + }; + + // Spawn a new task to handle the sync request/response protocol. + tokio::spawn(async move { + loop { + let Some((peer_id, mut inbound_stream)) = sync_protocol.next().await else { + warn!(target: "gossip", "The sync protocol stream has ended"); + return; + }; + + info!(target: "gossip", "Received a sync request from {peer_id}, spawning a new task to handle it"); + + tokio::spawn(async move { + let mut buffer = Vec::new(); + let Ok(bytes_received) = inbound_stream.read_to_end(&mut buffer).await else { + error!(target: "gossip", "Failed to read the sync request from {peer_id}"); + return; + }; + + debug!(target: "gossip", bytes_received = bytes_received, peer_id = ?peer_id, payload = ?buffer, "Received inbound sync request"); + + // We return: not found (1), version (0). `` + // Response format: = + // No payload is returned. + const OUTPUT: [u8; 2] = hex!("0100"); + + // We only write that we're not supporting the sync request. + if let Err(e) = inbound_stream.write_all(&OUTPUT).await { + error!(target: "gossip", err = ?e, "Failed to write the sync response to {peer_id}"); + return; + }; + + debug!(target: "gossip", bytes_sent = OUTPUT.len(), peer_id = ?peer_id, "Sent outbound sync response"); + }); + } + }); + } + + /// Starts the libp2p Swarm. + /// + /// - Starts the sync request/response protocol handler. + /// - Tells the swarm to listen on the given [`Multiaddr`]. + /// /// Waits for the swarm to start listen before returning and connecting to peers. - pub async fn listen(&mut self) -> Result<(), TransportError> { + pub async fn start(&mut self) -> Result<(), TransportError> { + // Start the sync request/response protocol handler. + self.sync_protocol_handler(); + match self.swarm.listen_on(self.addr.clone()) { Ok(id) => loop { if let SwarmEvent::NewListenAddr { address, listener_id } = diff --git a/crates/node/p2p/src/lib.rs b/crates/node/p2p/src/lib.rs index b123d4d8b1..ef19919f09 100644 --- a/crates/node/p2p/src/lib.rs +++ b/crates/node/p2p/src/lib.rs @@ -13,9 +13,6 @@ extern crate tracing; mod metrics; pub use metrics::Metrics; -mod net; -pub use net::{Broadcast, Config, Network, NetworkBuilder, NetworkBuilderError}; - mod rpc; pub use rpc::{ Connectedness, Direction, GossipScores, P2pRpcRequest, PeerCount, PeerDump, PeerInfo, diff --git a/crates/node/p2p/src/net/broadcast.rs b/crates/node/p2p/src/net/broadcast.rs deleted file mode 100644 index 7f9e4fc26f..0000000000 --- a/crates/node/p2p/src/net/broadcast.rs +++ /dev/null @@ -1,85 +0,0 @@ -//! Broadcast handles broadcasting unsafe blocks in-order. - -use backon::{ExponentialBuilder, Retryable}; -use op_alloy_rpc_types_engine::OpExecutionPayloadEnvelope; -use std::{ - collections::VecDeque, - sync::{Arc, Mutex}, -}; -use tokio::{ - sync::broadcast::{Receiver as BroadcastReceiver, Sender as BroadcastSender}, - time::Duration, -}; - -/// The `Broadcast` struct is responsible for broadcasting unsafe blocks in order. -#[derive(Debug)] -pub struct Broadcast { - /// An in-memory buffer to store blocks. - buffer: VecDeque, - /// The channel to broadcast blocks. - channel: BroadcastSender, - /// Tracks if broadcasting is being attempted. - broadcasting: Arc>, -} - -impl Broadcast { - /// Creates a new `Broadcast` instance with the given channel. - pub fn new(channel: BroadcastSender) -> Self { - Self { buffer: VecDeque::new(), channel, broadcasting: Arc::new(Mutex::new(false)) } - } - - /// Pushes a new unsafe payload to the buffer. - pub fn push(&mut self, payload: impl Into) { - self.buffer.push_back(payload.into()); - } - - /// Subscribe to the broadcast channel. - pub fn subscribe(&self) -> BroadcastReceiver { - self.channel.subscribe() - } - - /// Broadcasts all unsafe blocks **in order** through the channel. - /// - /// If an error occurs, the function will exit early, maintaining the invariant that unsafe - /// blocks are held in order. - pub fn broadcast(&mut self) { - if self.broadcasting.lock().map_or(true, |v| *v) { - trace!(target: "net", "Broadcasting in flight, skipping"); - return; - } - if let Ok(mut broadcasting) = self.broadcasting.lock() { - *broadcasting = true; - } - let flag = Arc::clone(&self.broadcasting); - let channel = self.channel.clone(); - let mut drained = std::mem::take(&mut self.buffer); - tokio::spawn(async move { - loop { - if drained.is_empty() { - break; - } - let front = drained.front(); - let fut = || async { - if let Some(block) = front { - channel.send(block.clone())?; - } - Ok(()) - }; - - let res = fut.retry(ExponentialBuilder::default()) - .notify(|err: &tokio::sync::broadcast::error::SendError, dur: Duration| { - warn!(target: "net", ?err, "Failed to broadcast block [Duration: {:?}]", dur); - }) - .await; - if let Err(e) = res { - warn!(target: "net", ?e, "Block broadcasting failed with exponential backoff"); - break; - } - drained.pop_front(); - } - if let Ok(mut broadcasting) = flag.lock() { - *broadcasting = false; - } - }); - } -} diff --git a/crates/node/p2p/src/net/driver.rs b/crates/node/p2p/src/net/driver.rs deleted file mode 100644 index 171a2c0b31..0000000000 --- a/crates/node/p2p/src/net/driver.rs +++ /dev/null @@ -1,259 +0,0 @@ -//! Driver for network services. - -use alloy_primitives::{Address, hex}; -use alloy_signer::SignerSync; -use alloy_signer_local::PrivateKeySigner; -use futures::{AsyncReadExt, AsyncWriteExt, StreamExt, future::OptionFuture}; -use libp2p::TransportError; -use libp2p_stream::IncomingStreams; -use op_alloy_rpc_types_engine::{OpExecutionPayloadEnvelope, OpNetworkPayloadEnvelope}; -use std::collections::HashSet; -use tokio::{ - select, - sync::{broadcast::Receiver as BroadcastReceiver, watch::Sender}, - time::Duration, -}; - -use crate::{ - Broadcast, Config, Discv5Driver, GossipDriver, HandlerRequest, NetworkBuilder, P2pRpcRequest, -}; - -/// Network -/// -/// Contains the logic to run Optimism's consensus-layer networking stack. -/// There are two core services that are run by the driver: -/// - Block gossip through Gossipsub. -/// - Peer discovery with `discv5`. -#[derive(Debug)] -pub struct Network { - /// The broadcast handler to broadcast unsafe payloads. - pub(crate) broadcast: Broadcast, - /// Channel to send unsafe signer updates. - pub(crate) unsafe_block_signer_sender: Sender
, - /// A channel to receive unsafe blocks and send them through the gossip layer. - pub(crate) publish_rx: tokio::sync::mpsc::Receiver, - /// The swarm instance. - pub gossip: GossipDriver, - /// The discovery service driver. - pub discovery: Discv5Driver, - /// The local signer for unsigned payloads. - pub local_signer: Option, -} - -impl Network { - /// The frequency at which to inspect peer scores to ban poorly performing peers. - const PEER_SCORE_INSPECT_FREQUENCY: Duration = Duration::from_secs(1); - - /// Returns the [`NetworkBuilder`] that can be used to construct the [`Network`]. - pub fn builder(config: Config) -> NetworkBuilder { - NetworkBuilder::from(config) - } - - /// Take the unsafe block receiver. - pub fn unsafe_block_recv(&mut self) -> BroadcastReceiver { - self.broadcast.subscribe() - } - - /// Returns a clone of the unsafe block signer sender. - pub fn unsafe_block_signer_sender(&mut self) -> Sender
{ - self.unsafe_block_signer_sender.clone() - } - - /// Handles the sync request/response protocol. - /// - /// This is a mock handler that supports the `payload_by_number` protocol. - /// It always returns: not found (1), version (0). `` - /// - /// ## Note - /// - /// This is used to ensure op-nodes are not penalizing kona-nodes for not supporting it. - /// This feature is being deprecated by the op-node team. Once it is fully removed from the - /// op-node's implementation we will remove this handler. - async fn sync_protocol_handler(mut sync_protocol: IncomingStreams) { - loop { - let Some((peer_id, mut inbound_stream)) = sync_protocol.next().await else { - warn!(target: "node::p2p::sync", "The sync protocol stream has ended"); - return; - }; - - info!(target: "node::p2p::sync", "Received a sync request from {peer_id}, spawning a new task to handle it"); - - tokio::spawn(async move { - let mut buffer = Vec::new(); - let Ok(bytes_received) = inbound_stream.read_to_end(&mut buffer).await else { - error!(target: "node::p2p::sync", "Failed to read the sync request from {peer_id}"); - return; - }; - - debug!(target: "node::p2p::sync", bytes_received = bytes_received, peer_id = ?peer_id, payload = ?buffer, "Received inbound sync request"); - - // We return: not found (1), version (0). `` - // Response format: = - // No payload is returned. - const OUTPUT: [u8; 2] = hex!("0100"); - - // We only write that we're not supporting the sync request. - if let Err(e) = inbound_stream.write_all(&OUTPUT).await { - error!(target: "node::p2p::sync", err = ?e, "Failed to write the sync response to {peer_id}"); - return; - }; - - debug!(target: "node::p2p::sync", bytes_sent = OUTPUT.len(), peer_id = ?peer_id, "Sent outbound sync response"); - }); - } - } - - /// Starts the Discv5 peer discovery & libp2p services - /// and continually listens for new peers and messages to handle - pub async fn start( - mut self, - mut rpc: Option>, - ) -> Result<(), TransportError> { - let (handler, mut enr_receiver) = self.discovery.start(); - let mut broadcast = self.broadcast; - - // We are checking the peer scores every [`Self::PEER_SCORE_INSPECT_FREQUENCY`] seconds. - let mut peer_score_inspector = tokio::time::interval(Self::PEER_SCORE_INSPECT_FREQUENCY); - - // Start the libp2p Swarm - self.gossip.listen().await?; - - // Start the sync request/response protocol handler. - if let Some(sync_protocol) = self.gossip.sync_protocol.take() { - tokio::spawn(Self::sync_protocol_handler(sync_protocol)); - } - - // Spawn the network handler - tokio::spawn(async move { - loop { - select! { - Some(block) = self.publish_rx.recv(), if !self.publish_rx.is_closed() => { - let timestamp = block.payload.timestamp(); - let selector = |handler: &crate::BlockHandler| { - handler.topic(timestamp) - }; - let Some(signer) = self.local_signer.as_ref() else { - warn!(target: "net", "No local signer available to sign the payload"); - continue; - }; - use ssz::Encode; - let ssz_bytes = block.as_ssz_bytes(); - let Ok(signature) = signer.sign_message_sync(&ssz_bytes) else { - warn!(target: "net", "Failed to sign the payload bytes"); - continue; - }; - let payload_hash = block.payload_hash(); - let payload = OpNetworkPayloadEnvelope { - payload: block.payload, - signature, - payload_hash, - parent_beacon_block_root: block.parent_beacon_block_root, - }; - match self.gossip.publish(selector, Some(payload)) { - Ok(id) => info!("Published unsafe payload | {:?}", id), - Err(e) => warn!("Failed to publish unsafe payload: {:?}", e), - } - } - event = self.gossip.next() => { - let Some(event) = event else { - error!(target: "node::p2p", "The gossip swarm stream has ended"); - return; - }; - - if let Some(payload) = self.gossip.handle_event(event) { - broadcast.push(payload); - broadcast.broadcast(); - } - }, - enr = enr_receiver.recv() => { - let Some(enr) = enr else { - error!(target: "node::p2p", "The enr receiver channel has closed"); - return; - }; - self.gossip.dial(enr); - }, - - _ = peer_score_inspector.tick(), if self.gossip.peer_monitoring.as_ref().is_some() => { - // Inspect peer scores and ban peers that are below the threshold. - let Some(ban_peers) = self.gossip.peer_monitoring.as_ref() else { - continue; - }; - - // We iterate over all connected peers and check their scores. - // We collect a list of peers to remove - let peers_to_remove = self.gossip.swarm.connected_peers().filter_map( - |peer_id| { - // If the score is not available, we use a default value of 0. - let score = self.gossip.swarm.behaviour().gossipsub.peer_score(peer_id).unwrap_or_default(); - - // Record the peer score in the metrics. - kona_macros::record!(histogram, crate::Metrics::PEER_SCORES, "peer", peer_id.to_string(), score); - - if score < ban_peers.ban_threshold { - return Some(*peer_id); - } - - None - } - ).collect::>(); - - // We remove the addresses from the gossip layer. - let addrs_to_ban = peers_to_remove.into_iter().filter_map(|peer_to_remove| { - // In that case, we ban the peer. This means... - // 1. We remove the peer from the network gossip. - // 2. We ban the peer from the discv5 service. - if self.gossip.swarm.disconnect_peer_id(peer_to_remove).is_err() { - warn!(peer = ?peer_to_remove, "Trying to disconnect a non-existing peer from the gossip driver."); - } - - // Record the duration of the peer connection. - if let Some(start_time) = self.gossip.peer_connection_start.remove(&peer_to_remove) { - let peer_duration = start_time.elapsed(); - kona_macros::record!( - histogram, - crate::Metrics::GOSSIP_PEER_CONNECTION_DURATION_SECONDS, - peer_duration.as_secs_f64() - ); - } - - if let Some(info) = self.gossip.peerstore.remove(&peer_to_remove){ - use crate::ConnectionGate; - self.gossip.connection_gate.remove_dial(&peer_to_remove); - let score = self.gossip.swarm.behaviour().gossipsub.peer_score(&peer_to_remove).unwrap_or_default(); - kona_macros::inc!(gauge, crate::Metrics::BANNED_PEERS, "peer_id" => peer_to_remove.to_string(), "score" => score.to_string()); - return Some(info.listen_addrs); - } - - None - }).collect::>().into_iter().flatten().collect::>(); - - // We send a request to the discovery handler to ban the set of addresses. - if let Err(send_err) = handler.sender.send(HandlerRequest::BanAddrs { addrs_to_ban: addrs_to_ban.into(), ban_duration: ban_peers.ban_duration }).await{ - warn!(err = ?send_err, "Impossible to send a request to the discovery handler. The channel connection is dropped."); - } - }, - // Note that we're using `if rpc.is_some()` to ensure that the branch does not always immediately resolve - // to avoid CPU contention. - Some(req) = OptionFuture::from(rpc.as_mut().map(|r| r.recv())), if rpc.is_some() => { - let Some(req) = req else { - error!(target: "node::p2p", "The rpc receiver channel has closed"); - return; - }; - let payload = match req { - P2pRpcRequest::PostUnsafePayload { payload } => payload, - req => { - req.handle(&mut self.gossip, &handler); - continue; - } - }; - debug!(target: "node::p2p", "Broadcasting unsafe payload from admin api"); - broadcast.push(payload); - broadcast.broadcast(); - }, - } - } - }); - - Ok(()) - } -} diff --git a/crates/node/p2p/src/net/mod.rs b/crates/node/p2p/src/net/mod.rs deleted file mode 100644 index 33c7b51f77..0000000000 --- a/crates/node/p2p/src/net/mod.rs +++ /dev/null @@ -1,16 +0,0 @@ -//! Network driver module. - -mod broadcast; -pub use broadcast::Broadcast; - -mod error; -pub use error::NetworkBuilderError; - -mod config; -pub use config::Config; - -mod builder; -pub use builder::NetworkBuilder; - -mod driver; -pub use driver::Network; diff --git a/crates/node/p2p/src/rpc/request.rs b/crates/node/p2p/src/rpc/request.rs index 256157dfec..d093271a43 100644 --- a/crates/node/p2p/src/rpc/request.rs +++ b/crates/node/p2p/src/rpc/request.rs @@ -33,7 +33,7 @@ pub enum P2pRpcRequest { /// The payload to post. payload: OpExecutionPayloadEnvelope, }, - /// Returns [`PeerInfo`] for the [`crate::Network`]. + /// Returns [`PeerInfo`] for the p2p network. PeerInfo(Sender), /// Dumps the node's discovery table from the [`crate::Discv5Driver`]. DiscoveryTable(Sender>), diff --git a/crates/node/p2p/tests/gossip_connect.rs b/crates/node/p2p/tests/gossip_connect.rs index 1b35975762..5429c28a64 100644 --- a/crates/node/p2p/tests/gossip_connect.rs +++ b/crates/node/p2p/tests/gossip_connect.rs @@ -5,10 +5,10 @@ mod common; #[tokio::test] async fn test_unknown_peer_connect_fails() { let mut driver = common::gossip_driver(4003); - assert!(driver.listen().await.is_ok()); + assert!(driver.start().await.is_ok()); let mut driver_2 = common::gossip_driver(4004); - assert!(driver_2.listen().await.is_ok()); + assert!(driver_2.start().await.is_ok()); let err = driver.swarm.dial(*driver_2.local_peer_id()).unwrap_err(); assert!(matches!(err, libp2p::swarm::DialError::NoAddresses)); @@ -17,10 +17,10 @@ async fn test_unknown_peer_connect_fails() { #[tokio::test] async fn test_connect_to_peer() { let mut driver = common::gossip_driver(4005); - assert!(driver.listen().await.is_ok()); + assert!(driver.start().await.is_ok()); let mut driver_2 = common::gossip_driver(4006); - assert!(driver_2.listen().await.is_ok()); + assert!(driver_2.start().await.is_ok()); assert!(driver.swarm.dial(driver_2.addr).is_ok()); assert_eq!(driver.connected_peers(), 0); diff --git a/crates/node/service/Cargo.toml b/crates/node/service/Cargo.toml index 23d7a5c1ff..08ae8abced 100644 --- a/crates/node/service/Cargo.toml +++ b/crates/node/service/Cargo.toml @@ -22,9 +22,12 @@ kona-derive.workspace = true kona-protocol.workspace = true kona-providers-alloy.workspace = true kona-rpc.workspace = true +kona-peers.workspace = true kona-macros.workspace = true # alloy +alloy-signer.workspace = true +alloy-signer-local.workspace = true alloy-primitives.workspace = true alloy-rpc-client.workspace = true alloy-rpc-types-eth.workspace = true @@ -42,7 +45,10 @@ op-alloy-provider.workspace = true # general url.workspace = true libp2p.workspace = true +libp2p-stream.workspace = true +discv5.workspace = true futures.workspace = true +ethereum_ssz.workspace = true tracing.workspace = true thiserror.workspace = true tokio-util.workspace = true @@ -50,6 +56,7 @@ async-trait.workspace = true async-stream.workspace = true tokio-stream.workspace = true strum = { workspace = true, features = ["derive"] } +backon.workspace = true derive_more = { workspace = true, features = ["debug"] } jsonrpsee = { workspace = true, features = ["server"] } tokio = { workspace = true, features = ["rt-multi-thread", "macros"] } diff --git a/crates/node/service/src/actors/mod.rs b/crates/node/service/src/actors/mod.rs index f1ad3888e3..2e3f032cf1 100644 --- a/crates/node/service/src/actors/mod.rs +++ b/crates/node/service/src/actors/mod.rs @@ -35,7 +35,10 @@ pub use l1_watcher_rpc::{ }; mod network; -pub use network::{NetworkActor, NetworkActorError, NetworkContext, NetworkInboundData}; +pub use network::{ + NetworkActor, NetworkActorError, NetworkBuilder, NetworkBuilderError, NetworkConfig, + NetworkContext, NetworkDriver, NetworkDriverError, NetworkHandler, NetworkInboundData, +}; mod sequencer; pub use sequencer::{ diff --git a/crates/node/service/src/actors/network.rs b/crates/node/service/src/actors/network.rs deleted file mode 100644 index d29f4f09e6..0000000000 --- a/crates/node/service/src/actors/network.rs +++ /dev/null @@ -1,173 +0,0 @@ -//! Network Actor - -use crate::{NodeActor, actors::CancellableContext}; -use alloy_primitives::Address; -use async_trait::async_trait; -use derive_more::Debug; -use kona_p2p::{NetworkBuilder, NetworkBuilderError, P2pRpcRequest}; -use libp2p::TransportError; -use op_alloy_rpc_types_engine::OpExecutionPayloadEnvelope; -use thiserror::Error; -use tokio::{select, sync::mpsc}; -use tokio_util::sync::{CancellationToken, WaitForCancellationFuture}; - -/// The network actor handles two core networking components of the rollup node: -/// - *discovery*: Peer discovery over UDP using discv5. -/// - *gossip*: Block gossip over TCP using libp2p. -/// -/// The network actor itself is a light wrapper around the [`NetworkBuilder`]. -/// -/// ## Example -/// -/// ```rust,ignore -/// use kona_p2p::NetworkDriver; -/// use std::net::{IpAddr, Ipv4Addr, SocketAddr}; -/// -/// let chain_id = 10; -/// let signer = Address::random(); -/// let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 9099); -/// -/// // Construct the `Network` using the builder. -/// // let mut driver = Network::builder() -/// // .with_unsafe_block_signer(signer) -/// // .with_chain_id(chain_id) -/// // .with_gossip_addr(socket) -/// // .build() -/// // .unwrap(); -/// -/// // Construct the `NetworkActor` with the [`Network`]. -/// // let actor = NetworkActor::new(driver); -/// ``` -#[derive(Debug)] -pub struct NetworkActor { - /// Network driver - config: NetworkBuilder, - /// A channel to receive the unsafe block signer address. - signer: mpsc::Receiver
, - /// Handler for RPC Requests. - rpc: tokio::sync::mpsc::Receiver, -} - -/// The inbound data for the network actor. -#[derive(Debug)] -pub struct NetworkInboundData { - /// A channel to send the unsafe block signer address to the network actor. - pub signer: mpsc::Sender
, - /// Handler for RPC Requests sent to the network actor. - pub rpc: mpsc::Sender, -} - -impl NetworkActor { - /// Constructs a new [`NetworkActor`] given the [`NetworkBuilder`] - pub fn new(driver: NetworkBuilder) -> (NetworkInboundData, Self) { - let (signer_tx, signer_rx) = mpsc::channel(16); - let (rpc_tx, rpc_rx) = mpsc::channel(1024); - let actor = Self { config: driver, signer: signer_rx, rpc: rpc_rx }; - let outbound_data = NetworkInboundData { signer: signer_tx, rpc: rpc_tx }; - (outbound_data, actor) - } -} - -/// The communication context used by the network actor. -#[derive(Debug)] -pub struct NetworkContext { - /// The channel used by the sequencer actor for sending unsafe blocks to the network. - pub blocks: mpsc::Sender, - /// Cancels the network actor. - pub cancellation: CancellationToken, -} - -impl CancellableContext for NetworkContext { - fn cancelled(&self) -> WaitForCancellationFuture<'_> { - self.cancellation.cancelled() - } -} - -#[async_trait] -impl NodeActor for NetworkActor { - type Error = NetworkActorError; - type InboundData = NetworkInboundData; - type OutboundData = NetworkContext; - type Builder = NetworkBuilder; - - fn build(state: Self::Builder) -> (Self::InboundData, Self) { - Self::new(state) - } - - async fn start( - mut self, - NetworkContext { blocks, cancellation }: Self::OutboundData, - ) -> Result<(), Self::Error> { - let mut driver = self.config.build()?; - - // Take the unsafe block receiver - let mut unsafe_block_receiver = driver.unsafe_block_recv(); - - // Take the unsafe block signer sender. - let unsafe_block_signer = driver.unsafe_block_signer_sender(); - - // Start the network driver. - driver.start(Some(self.rpc)).await?; - - loop { - select! { - _ = cancellation.cancelled() => { - info!( - target: "network", - "Received shutdown signal. Exiting network task." - ); - return Ok(()); - } - block = unsafe_block_receiver.recv() => { - match block { - Ok(block) => { - match blocks.send(block).await { - Ok(_) => debug!(target: "network", "Forwarded unsafe block"), - Err(_) => warn!(target: "network", "Failed to forward unsafe block"), - } - } - Err(e) => { - warn!(target: "network", "Failed to receive unsafe block: {:?}", e); - return Err(NetworkActorError::ChannelClosed); - } - } - } - signer = self.signer.recv() => { - let Some(signer) = signer else { - warn!( - target: "network", - "Found no unsafe block signer on receive" - ); - return Err(NetworkActorError::ChannelClosed); - }; - if unsafe_block_signer.send(signer).is_err() { - warn!( - target: "network", - "Failed to send unsafe block signer to network driver", - ); - } - } - } - } - } -} - -/// An error from the network actor. -#[derive(Debug, Error)] -pub enum NetworkActorError { - /// Network builder error. - #[error(transparent)] - NetworkBuilder(#[from] NetworkBuilderError), - /// Driver startup failed. - #[error(transparent)] - DriverStartup(#[from] TransportError), - /// The network driver was missing its unsafe block receiver. - #[error("Missing unsafe block receiver in network driver")] - MissingUnsafeBlockReceiver, - /// The network driver was missing its unsafe block signer sender. - #[error("Missing unsafe block signer in network driver")] - MissingUnsafeBlockSigner, - /// Channel closed unexpectedly. - #[error("Channel closed unexpectedly")] - ChannelClosed, -} diff --git a/crates/node/service/src/actors/network/README.md b/crates/node/service/src/actors/network/README.md new file mode 100644 index 0000000000..f22a99f80f --- /dev/null +++ b/crates/node/service/src/actors/network/README.md @@ -0,0 +1,54 @@ +# Network actor + +The network actor is responsible for handling interactions with the p2p layer of the kona-node, specifically the libp2p gossip driver and the discv5 handler. + +### Example + +> **Warning** +> +> Notice, the socket address uses `0.0.0.0`. +> If you are experiencing issues connecting to peers for discovery, +> check to make sure you are not using the loopback address, +> `127.0.0.1` aka "localhost", which can prevent outward facing connections. + +```rust,no_run +use std::net::{IpAddr, Ipv4Addr, SocketAddr}; +use alloy_primitives::address; +use tokio_util::sync::CancellationToken; +use kona_genesis::RollupConfig; +use kona_p2p::{LocalNode, Config}; +use kona_node_service::{NetworkActor}; +use libp2p::Multiaddr; +use discv5::enr::CombinedKey; + +#[tokio::main] +async fn main() { + // Construct the Network + let signer = address!("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"); + let gossip = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 9099); + let mut gossip_addr = Multiaddr::from(gossip.ip()); + gossip_addr.push(libp2p::multiaddr::Protocol::Tcp(gossip.port())); + let advertise_ip = IpAddr::V4(Ipv4Addr::UNSPECIFIED); + + let CombinedKey::Secp256k1(k256_key) = CombinedKey::generate_secp256k1() else { + unreachable!() + }; + let disc = LocalNode::new(k256_key, advertise_ip, 9097, 9098); + + // The unsafe blocks are sent by the network actor to `blocks_rx`. This channel receiver can be + // used by external actors/modules to handle incoming unsafe blocks. + let (blocks, blocks_rx) = tokio::sync::mpsc::channel(1024); + + let (inbound_data, network) = NetworkActor::new(NetworkActor::builder(Config::new( + RollupConfig::default(), + disc, + gossip_addr, + signer + ))); + + // This will start the p2p stack of the kona-node (ie the libp2p gossip and discovery layers) + network.start(NetworkContext { blocks, cancellation: CancellationToken::new() }).await?; +} +``` + +[!WARNING]: ###example \ No newline at end of file diff --git a/crates/node/service/src/actors/network/actor.rs b/crates/node/service/src/actors/network/actor.rs new file mode 100644 index 0000000000..e5fc1f9336 --- /dev/null +++ b/crates/node/service/src/actors/network/actor.rs @@ -0,0 +1,245 @@ +use alloy_primitives::Address; +use alloy_signer::SignerSync; +use async_trait::async_trait; +use kona_p2p::P2pRpcRequest; +use libp2p::TransportError; +use op_alloy_rpc_types_engine::{OpExecutionPayloadEnvelope, OpNetworkPayloadEnvelope}; +use thiserror::Error; +use tokio::{self, select, sync::mpsc}; +use tokio_util::sync::{CancellationToken, WaitForCancellationFuture}; + +use crate::{ + CancellableContext, NodeActor, + actors::network::{ + builder::NetworkBuilder, driver::NetworkDriverError, error::NetworkBuilderError, + }, +}; + +/// The network actor handles two core networking components of the rollup node: +/// - *discovery*: Peer discovery over UDP using discv5. +/// - *gossip*: Block gossip over TCP using libp2p. +/// +/// The network actor itself is a light wrapper around the [`NetworkBuilder`]. +/// +/// ## Example +/// +/// ```rust,ignore +/// use kona_p2p::NetworkDriver; +/// use std::net::{IpAddr, Ipv4Addr, SocketAddr}; +/// +/// let chain_id = 10; +/// let signer = Address::random(); +/// let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 9099); +/// +/// // Construct the `Network` using the builder. +/// // let mut driver = Network::builder() +/// // .with_unsafe_block_signer(signer) +/// // .with_chain_id(chain_id) +/// // .with_gossip_addr(socket) +/// // .build() +/// // .unwrap(); +/// +/// // Construct the `NetworkActor` with the [`Network`]. +/// // let actor = NetworkActor::new(driver); +/// ``` +#[derive(Debug)] +pub struct NetworkActor { + /// Network driver + pub(super) builder: NetworkBuilder, + /// A channel to receive the unsafe block signer address. + pub(super) signer: mpsc::Receiver
, + /// Handler for RPC Requests. + pub(super) rpc: mpsc::Receiver, + /// A channel to receive unsafe blocks and send them through the gossip layer. + pub(super) publish_rx: mpsc::Receiver, +} + +/// The inbound data for the network actor. +#[derive(Debug)] +pub struct NetworkInboundData { + /// A channel to send the unsafe block signer address to the network actor. + pub signer: mpsc::Sender
, + /// Handler for RPC Requests sent to the network actor. + pub rpc: mpsc::Sender, + /// A channel to send unsafe blocks to the network actor. + pub unsafe_blocks: mpsc::Sender, +} + +impl NetworkActor { + /// Constructs a new [`NetworkActor`] given the [`NetworkBuilder`] + pub fn new(driver: NetworkBuilder) -> (NetworkInboundData, Self) { + let (signer_tx, signer_rx) = mpsc::channel(16); + let (rpc_tx, rpc_rx) = mpsc::channel(1024); + let (publish_tx, publish_rx) = tokio::sync::mpsc::channel(256); + let actor = Self { builder: driver, signer: signer_rx, rpc: rpc_rx, publish_rx }; + let outbound_data = + NetworkInboundData { signer: signer_tx, rpc: rpc_tx, unsafe_blocks: publish_tx }; + (outbound_data, actor) + } +} + +/// The communication context used by the network actor. +#[derive(Debug)] +pub struct NetworkContext { + /// The channel used by the sequencer actor for sending unsafe blocks to the network. + pub blocks: mpsc::Sender, + /// Cancels the network actor. + pub cancellation: CancellationToken, +} + +impl CancellableContext for NetworkContext { + fn cancelled(&self) -> WaitForCancellationFuture<'_> { + self.cancellation.cancelled() + } +} + +/// An error from the network actor. +#[derive(Debug, Error)] +pub enum NetworkActorError { + /// Network builder error. + #[error(transparent)] + NetworkBuilder(#[from] NetworkBuilderError), + /// Network driver error. + #[error(transparent)] + NetworkDriver(#[from] NetworkDriverError), + /// Driver startup failed. + #[error(transparent)] + DriverStartup(#[from] TransportError), + /// The network driver was missing its unsafe block receiver. + #[error("Missing unsafe block receiver in network driver")] + MissingUnsafeBlockReceiver, + /// The network driver was missing its unsafe block signer sender. + #[error("Missing unsafe block signer in network driver")] + MissingUnsafeBlockSigner, + /// Channel closed unexpectedly. + #[error("Channel closed unexpectedly")] + ChannelClosed, +} + +#[async_trait] +impl NodeActor for NetworkActor { + type Error = NetworkActorError; + type InboundData = NetworkInboundData; + type OutboundData = NetworkContext; + type Builder = NetworkBuilder; + + fn build(state: Self::Builder) -> (Self::InboundData, Self) { + Self::new(state) + } + + async fn start( + mut self, + NetworkContext { blocks, cancellation }: Self::OutboundData, + ) -> Result<(), Self::Error> { + let local_signer = self.builder.local_signer.clone(); + + let mut handler = self.builder.build()?.start().await?; + + // New unsafe block channel. + let (unsafe_block_tx, mut unsafe_block_rx) = tokio::sync::mpsc::unbounded_channel(); + + loop { + select! { + _ = cancellation.cancelled() => { + info!( + target: "network", + "Received shutdown signal. Exiting network task." + ); + return Ok(()); + } + block = unsafe_block_rx.recv() => { + let Some(block) = block else { + error!(target: "node::p2p", "The unsafe block receiver channel has closed"); + return Err(NetworkActorError::ChannelClosed); + }; + + if blocks.send(block).await.is_err() { + warn!(target: "network", "Failed to forward unsafe block"); + return Err(NetworkActorError::ChannelClosed); + } + } + signer = self.signer.recv() => { + let Some(signer) = signer else { + warn!( + target: "network", + "Found no unsafe block signer on receive" + ); + return Err(NetworkActorError::ChannelClosed); + }; + if handler.unsafe_block_signer_sender.send(signer).is_err() { + warn!( + target: "network", + "Failed to send unsafe block signer to network handler", + ); + } + } + Some(block) = self.publish_rx.recv(), if !self.publish_rx.is_closed() => { + let timestamp = block.payload.timestamp(); + let selector = |handler: &kona_p2p::BlockHandler| { + handler.topic(timestamp) + }; + let Some(signer) = local_signer.as_ref() else { + warn!(target: "net", "No local signer available to sign the payload"); + continue; + }; + use ssz::Encode; + let ssz_bytes = block.as_ssz_bytes(); + let Ok(signature) = signer.sign_message_sync(&ssz_bytes) else { + warn!(target: "net", "Failed to sign the payload bytes"); + continue; + }; + let payload_hash = block.payload_hash(); + let payload = OpNetworkPayloadEnvelope { + payload: block.payload, + signature, + payload_hash, + parent_beacon_block_root: block.parent_beacon_block_root, + }; + match handler.gossip.publish(selector, Some(payload)) { + Ok(id) => info!("Published unsafe payload | {:?}", id), + Err(e) => warn!("Failed to publish unsafe payload: {:?}", e), + } + } + event = handler.gossip.next() => { + let Some(event) = event else { + error!(target: "node::p2p", "The gossip swarm stream has ended"); + return Err(NetworkActorError::ChannelClosed); + }; + + if let Some(payload) = handler.gossip.handle_event(event) { + if unsafe_block_tx.send(payload.into()).is_err() { + warn!(target: "node::p2p", "Failed to send unsafe block to network handler"); + } + } + }, + enr = handler.enr_receiver.recv() => { + let Some(enr) = enr else { + error!(target: "node::p2p", "The enr receiver channel has closed"); + return Err(NetworkActorError::ChannelClosed); + }; + handler.gossip.dial(enr); + }, + _ = handler.peer_score_inspector.tick(), if handler.gossip.peer_monitoring.as_ref().is_some() => { + handler.handle_peer_monitoring().await; + }, + req = self.rpc.recv(), if !self.rpc.is_closed() => { + let Some(req) = req else { + error!(target: "node::p2p", "The rpc receiver channel has closed"); + return Err(NetworkActorError::ChannelClosed); + }; + let payload = match req { + P2pRpcRequest::PostUnsafePayload { payload } => payload, + req => { + req.handle(&mut handler.gossip, &handler.discovery); + continue; + } + }; + debug!(target: "node::p2p", "Broadcasting unsafe payload from admin api"); + if unsafe_block_tx.send(payload).is_err() { + warn!(target: "node::p2p", "Failed to send unsafe block to network handler"); + } + }, + } + } + } +} diff --git a/crates/node/p2p/src/net/builder.rs b/crates/node/service/src/actors/network/builder.rs similarity index 81% rename from crates/node/p2p/src/net/builder.rs rename to crates/node/service/src/actors/network/builder.rs index a4eb38c02c..e88b1ac4ca 100644 --- a/crates/node/p2p/src/net/builder.rs +++ b/crates/node/service/src/actors/network/builder.rs @@ -4,32 +4,29 @@ use alloy_primitives::Address; use alloy_signer_local::PrivateKeySigner; use discv5::{Config as Discv5Config, Enr}; use kona_genesis::RollupConfig; +use kona_p2p::{Discv5Builder, GaterConfig, GossipDriverBuilder, LocalNode}; use kona_peers::{PeerMonitoring, PeerScoreLevel}; use libp2p::{Multiaddr, identity::Keypair}; -use op_alloy_rpc_types_engine::OpExecutionPayloadEnvelope; use std::{path::PathBuf, time::Duration}; -use tokio::sync::broadcast::Sender as BroadcastSender; use crate::{ - Broadcast, Config, Discv5Builder, GossipDriverBuilder, Network, NetworkBuilderError, - discv5::LocalNode, gossip::GaterConfig, + NetworkBuilderError, + actors::network::{NetworkConfig, NetworkDriver}, }; -/// Constructs a [`Network`] for the OP Stack Consensus Layer. +/// Constructs a [`NetworkDriver`] for the OP Stack Consensus Layer. #[derive(Debug)] pub struct NetworkBuilder { /// The discovery driver. - discovery: Discv5Builder, + pub(super) discovery: Discv5Builder, /// The gossip driver. - gossip: GossipDriverBuilder, - /// A broadcast sender for the unsafe block payloads. - payload_tx: Option>, + pub(super) gossip: GossipDriverBuilder, /// A local signer for payloads. - local_signer: Option, + pub(super) local_signer: Option, } -impl From for NetworkBuilder { - fn from(config: Config) -> Self { +impl From for NetworkBuilder { + fn from(config: NetworkConfig) -> Self { Self::new( config.rollup_config, config.unsafe_block_signer, @@ -73,7 +70,6 @@ impl NetworkBuilder { gossip_addr, keypair, ), - payload_tx: None, local_signer: None, } } @@ -83,12 +79,12 @@ impl NetworkBuilder { Self { gossip: self.gossip.with_gater_config(config), ..self } } - /// Sets the local signer for the [`Network`]. + /// Sets the local signer for the [`NetworkBuilder`]. pub fn with_local_signer(self, local_signer: Option) -> Self { Self { local_signer, ..self } } - /// Sets the bootstore path for the [`crate::Discv5Driver`]. + /// Sets the bootstore path for the [`Discv5Builder`]. pub fn with_bootstore(self, bootstore: Option) -> Self { if let Some(bootstore) = bootstore { return Self { discovery: self.discovery.with_bootstore(bootstore), ..self }; @@ -111,69 +107,52 @@ impl NetworkBuilder { Self { gossip: self.gossip.with_peer_scoring(level), ..self } } - /// Sets topic scoring for the [`crate::GossipDriver`]. + /// Sets topic scoring for the [`GossipDriverBuilder`]. pub fn with_topic_scoring(self, topic_scoring: bool) -> Self { Self { gossip: self.gossip.with_topic_scoring(topic_scoring), ..self } } - /// Sets the peer monitoring for the [`crate::GossipDriver`]. + /// Sets the peer monitoring for the [`GossipDriverBuilder`]. pub fn with_peer_monitoring(self, peer_monitoring: Option) -> Self { Self { gossip: self.gossip.with_peer_monitoring(peer_monitoring), ..self } } - /// Sets the discovery interval for the [`crate::Discv5Driver`]. + /// Sets the discovery interval for the [`Discv5Builder`]. pub fn with_discovery_interval(self, interval: tokio::time::Duration) -> Self { Self { discovery: self.discovery.with_interval(interval), ..self } } - /// Sets the address for the [`crate::Discv5Driver`]. + /// Sets the address for the [`Discv5Builder`]. pub fn with_discovery_address(self, address: LocalNode) -> Self { Self { discovery: self.discovery.with_local_node(address), ..self } } - /// Sets the gossipsub config for the [`crate::GossipDriver`]. + /// Sets the gossipsub config for the [`GossipDriverBuilder`]. pub fn with_gossip_config(self, config: libp2p::gossipsub::Config) -> Self { Self { gossip: self.gossip.with_config(config), ..self } } - /// Sets the [`Discv5Config`] for the [`crate::Discv5Driver`]. + /// Sets the [`Discv5Config`] for the [`Discv5Builder`]. pub fn with_discovery_config(self, config: Discv5Config) -> Self { Self { discovery: self.discovery.with_discovery_config(config), ..self } } - /// Sets the gossip address for the [`crate::GossipDriver`]. + /// Sets the gossip address for the [`GossipDriverBuilder`]. pub fn with_gossip_address(self, addr: Multiaddr) -> Self { Self { gossip: self.gossip.with_address(addr), ..self } } - /// Sets the timeout for the [`crate::GossipDriver`]. + /// Sets the timeout for the [`GossipDriverBuilder`]. pub fn with_timeout(self, timeout: Duration) -> Self { Self { gossip: self.gossip.with_timeout(timeout), ..self } } - /// Sets the unsafe block sender for the [`crate::Network`]. - pub fn with_unsafe_block_sender( - self, - sender: BroadcastSender, - ) -> Self { - Self { payload_tx: Some(sender), ..self } - } - - /// Builds the [`Network`]. - pub fn build(self) -> Result { + /// Builds the [`NetworkDriver`]. + pub fn build(self) -> Result { let (gossip, unsafe_block_signer_sender) = self.gossip.build()?; let discovery = self.discovery.build()?; - let payload_tx = self.payload_tx.unwrap_or(tokio::sync::broadcast::channel(256).0); - let (_, publish_rx) = tokio::sync::mpsc::channel(256); - - Ok(Network { - gossip, - discovery, - unsafe_block_signer_sender, - broadcast: Broadcast::new(payload_tx), - publish_rx, - local_signer: self.local_signer, - }) + + Ok(NetworkDriver { gossip, discovery, unsafe_block_signer_sender }) } } diff --git a/crates/node/p2p/src/net/config.rs b/crates/node/service/src/actors/network/config.rs similarity index 92% rename from crates/node/p2p/src/net/config.rs rename to crates/node/service/src/actors/network/config.rs index aa675512be..b5495a0fa7 100644 --- a/crates/node/p2p/src/net/config.rs +++ b/crates/node/service/src/actors/network/config.rs @@ -1,10 +1,10 @@ //! Configuration for the `Network`. -use crate::{discv5::LocalNode, gossip::GaterConfig}; use alloy_primitives::Address; use alloy_signer_local::PrivateKeySigner; use discv5::Enr; use kona_genesis::RollupConfig; +use kona_p2p::{GaterConfig, LocalNode}; use kona_peers::{PeerMonitoring, PeerScoreLevel}; use libp2p::{Multiaddr, identity::Keypair}; use std::path::PathBuf; @@ -12,7 +12,7 @@ use tokio::time::Duration; /// Configuration for kona's P2P stack. #[derive(Debug, Clone)] -pub struct Config { +pub struct NetworkConfig { /// Discovery Config. pub discovery_config: discv5::Config, /// The local node's advertised address to external peers. @@ -48,12 +48,12 @@ pub struct Config { pub local_signer: Option, } -impl Config { +impl NetworkConfig { const DEFAULT_DISCOVERY_INTERVAL: Duration = Duration::from_secs(5); const DEFAULT_DISCOVERY_RANDOMIZE: Option = None; - /// Creates a new [`Config`] with the given [`RollupConfig`] with the minimum required fields. - /// Generates a random keypair for the node. + /// Creates a new [`NetworkConfig`] with the given [`RollupConfig`] with the minimum required + /// fields. Generates a random keypair for the node. pub fn new( rollup_config: RollupConfig, discovery_listen: LocalNode, diff --git a/crates/node/service/src/actors/network/driver.rs b/crates/node/service/src/actors/network/driver.rs new file mode 100644 index 0000000000..22f392b07d --- /dev/null +++ b/crates/node/service/src/actors/network/driver.rs @@ -0,0 +1,47 @@ +use alloy_primitives::Address; +use kona_p2p::{ConnectionGater, Discv5Driver, GossipDriver, PEER_SCORE_INSPECT_FREQUENCY}; +use libp2p::TransportError; +use tokio::sync::watch; + +use crate::actors::network::handler::NetworkHandler; + +/// A network driver. This is the driver that is used to start the network. +#[derive(Debug)] +pub struct NetworkDriver { + /// The gossip driver. + pub gossip: GossipDriver, + /// The discovery driver. + pub discovery: Discv5Driver, + /// The unsafe block signer sender. + pub unsafe_block_signer_sender: watch::Sender
, +} + +/// An error from the [`NetworkDriver`]. +#[derive(Debug, thiserror::Error)] +pub enum NetworkDriverError { + /// An error occurred starting the libp2p Swarm. + #[error("error starting libp2p Swarm")] + GossipStartError(#[from] TransportError), +} + +impl NetworkDriver { + /// Starts the network. + pub async fn start(mut self) -> Result { + // Start the discovery service. + let (handler, enr_receiver) = self.discovery.start(); + + // Start the libp2p Swarm + self.gossip.start().await?; + + // We are checking the peer scores every [`PEER_SCORE_INSPECT_FREQUENCY`] seconds. + let peer_score_inspector = tokio::time::interval(*PEER_SCORE_INSPECT_FREQUENCY); + + Ok(NetworkHandler { + gossip: self.gossip, + discovery: handler, + enr_receiver, + unsafe_block_signer_sender: self.unsafe_block_signer_sender, + peer_score_inspector, + }) + } +} diff --git a/crates/node/p2p/src/net/error.rs b/crates/node/service/src/actors/network/error.rs similarity index 88% rename from crates/node/p2p/src/net/error.rs rename to crates/node/service/src/actors/network/error.rs index 3ca0960889..58ff456b93 100644 --- a/crates/node/p2p/src/net/error.rs +++ b/crates/node/service/src/actors/network/error.rs @@ -1,6 +1,6 @@ //! Contains the error type for the network driver builder. -use crate::{Discv5BuilderError, GossipDriverBuilderError}; +use kona_p2p::{Discv5BuilderError, GossipDriverBuilderError}; /// An error from the [`crate::NetworkBuilder`]. #[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)] diff --git a/crates/node/service/src/actors/network/handler.rs b/crates/node/service/src/actors/network/handler.rs new file mode 100644 index 0000000000..810f66a406 --- /dev/null +++ b/crates/node/service/src/actors/network/handler.rs @@ -0,0 +1,101 @@ +use std::collections::HashSet; + +use alloy_primitives::Address; +use discv5::Enr; +use kona_p2p::{ConnectionGater, Discv5Handler, GossipDriver, HandlerRequest}; +use tokio::sync::{mpsc, watch}; + +/// A network handler used to communicate with the network once it is started. +#[derive(Debug)] +pub struct NetworkHandler { + /// The gossip driver. + pub gossip: GossipDriver, + /// The discovery handler. + pub discovery: Discv5Handler, + /// The receiver for the ENRs. + pub enr_receiver: mpsc::Receiver, + /// The sender for the unsafe block signer. + pub unsafe_block_signer_sender: watch::Sender
, + /// The peer score inspector. Is used to ban peers that are below a given threshold. + pub peer_score_inspector: tokio::time::Interval, +} + +impl NetworkHandler { + pub(super) async fn handle_peer_monitoring(&mut self) { + // Inspect peer scores and ban peers that are below the threshold. + let Some(ban_peers) = self.gossip.peer_monitoring.as_ref() else { + return; + }; + + // We iterate over all connected peers and check their scores. + // We collect a list of peers to remove + let peers_to_remove = self + .gossip + .swarm + .connected_peers() + .filter_map(|peer_id| { + // If the score is not available, we use a default value of 0. + let score = + self.gossip.swarm.behaviour().gossipsub.peer_score(peer_id).unwrap_or_default(); + + // Record the peer score in the metrics. + kona_macros::record!( + histogram, + kona_p2p::Metrics::PEER_SCORES, + "peer", + peer_id.to_string(), + score + ); + + if score < ban_peers.ban_threshold { + return Some(*peer_id); + } + + None + }) + .collect::>(); + + // We remove the addresses from the gossip layer. + let addrs_to_ban = peers_to_remove.into_iter().filter_map(|peer_to_remove| { + // In that case, we ban the peer. This means... + // 1. We remove the peer from the network gossip. + // 2. We ban the peer from the discv5 service. + if self.gossip.swarm.disconnect_peer_id(peer_to_remove).is_err() { + warn!(peer = ?peer_to_remove, "Trying to disconnect a non-existing peer from the gossip driver."); + } + + // Record the duration of the peer connection. + if let Some(start_time) = self.gossip.peer_connection_start.remove(&peer_to_remove) { + let peer_duration = start_time.elapsed(); + kona_macros::record!( + histogram, + kona_p2p::Metrics::GOSSIP_PEER_CONNECTION_DURATION_SECONDS, + peer_duration.as_secs_f64() + ); + } + + if let Some(info) = self.gossip.peerstore.remove(&peer_to_remove){ + use kona_p2p::ConnectionGate; + self.gossip.connection_gate.remove_dial(&peer_to_remove); + let score = self.gossip.swarm.behaviour().gossipsub.peer_score(&peer_to_remove).unwrap_or_default(); + kona_macros::inc!(gauge, kona_p2p::Metrics::BANNED_PEERS, "peer_id" => peer_to_remove.to_string(), "score" => score.to_string()); + return Some(info.listen_addrs); + } + + None + }).collect::>().into_iter().flatten().collect::>(); + + // We send a request to the discovery handler to ban the set of addresses. + if let Err(send_err) = self + .discovery + .sender + .send(HandlerRequest::BanAddrs { + addrs_to_ban: addrs_to_ban.into(), + ban_duration: ban_peers.ban_duration, + }) + .await + { + warn!(err = ?send_err, "Impossible to send a request to the discovery handler. The channel connection is dropped."); + } + } +} diff --git a/crates/node/service/src/actors/network/mod.rs b/crates/node/service/src/actors/network/mod.rs new file mode 100644 index 0000000000..c900cec27e --- /dev/null +++ b/crates/node/service/src/actors/network/mod.rs @@ -0,0 +1,19 @@ +//! Network Actor + +mod actor; +pub use actor::{NetworkActor, NetworkActorError, NetworkContext, NetworkInboundData}; + +mod builder; +pub use builder::NetworkBuilder; + +mod driver; +pub use driver::{NetworkDriver, NetworkDriverError}; + +mod error; +pub use error::NetworkBuilderError; + +mod handler; +pub use handler::NetworkHandler; + +mod config; +pub use config::NetworkConfig; diff --git a/crates/node/service/src/lib.rs b/crates/node/service/src/lib.rs index cc98a5a8a8..48be692ed2 100644 --- a/crates/node/service/src/lib.rs +++ b/crates/node/service/src/lib.rs @@ -19,9 +19,10 @@ pub use actors::{ EngineBuilder, EngineContext, EngineError, EngineInboundData, InboundDerivationMessage, L1OriginSelector, L1OriginSelectorError, L1WatcherRpc, L1WatcherRpcContext, L1WatcherRpcError, L1WatcherRpcInboundChannels, L1WatcherRpcState, L2Finalizer, NetworkActor, NetworkActorError, - NetworkContext, NetworkInboundData, NodeActor, PipelineBuilder, RpcActor, RpcActorError, - RpcContext, RuntimeActor, RuntimeContext, RuntimeState, SequencerActor, SequencerActorError, - SequencerBuilder, SequencerContext, SequencerInboundData, SupervisorActor, + NetworkBuilder, NetworkBuilderError, NetworkConfig, NetworkContext, NetworkDriver, + NetworkDriverError, NetworkHandler, NetworkInboundData, NodeActor, PipelineBuilder, RpcActor, + RpcActorError, RpcContext, RuntimeActor, RuntimeContext, RuntimeState, SequencerActor, + SequencerActorError, SequencerBuilder, SequencerContext, SequencerInboundData, SupervisorActor, SupervisorActorContext, SupervisorActorError, SupervisorExt, SupervisorInboundData, SupervisorRpcServerExt, }; diff --git a/crates/node/service/src/service/core.rs b/crates/node/service/src/service/core.rs index e66d51fe9b..c3d7400cc8 100644 --- a/crates/node/service/src/service/core.rs +++ b/crates/node/service/src/service/core.rs @@ -166,7 +166,7 @@ pub trait RollupNodeService { ) = Self::EngineActor::build(self.engine_builder()); // Create the p2p actor. - let (NetworkInboundData { signer, rpc: network_rpc }, network) = + let (NetworkInboundData { signer, rpc: network_rpc, unsafe_blocks: _ }, network) = Self::NetworkActor::build(self.network_builder()); // Create the RPC server actor. diff --git a/crates/node/service/src/service/standard/builder.rs b/crates/node/service/src/service/standard/builder.rs index 8f211d51da..2de2216dca 100644 --- a/crates/node/service/src/service/standard/builder.rs +++ b/crates/node/service/src/service/standard/builder.rs @@ -1,6 +1,8 @@ //! Contains the builder for the [`RollupNode`]. -use crate::{EngineBuilder, InteropMode, NodeMode, RollupNode, actors::RuntimeState}; +use crate::{ + EngineBuilder, InteropMode, NetworkConfig, NodeMode, RollupNode, actors::RuntimeState, +}; use alloy_primitives::Bytes; use alloy_provider::RootProvider; use alloy_rpc_client::RpcClient; @@ -16,7 +18,6 @@ use tower::ServiceBuilder; use url::Url; use kona_genesis::RollupConfig; -use kona_p2p::Config; use kona_providers_alloy::OnlineBeaconClient; use kona_rpc::{RpcBuilder, SupervisorRpcConfig}; @@ -35,8 +36,8 @@ pub struct RollupNodeBuilder { l2_provider_rpc_url: Option, /// The JWT secret. jwt_secret: Option, - /// The [`Config`]. - p2p_config: Option, + /// The [`NetworkConfig`]. + p2p_config: Option, /// An RPC Configuration. rpc_config: Option, /// An RPC Configuration for the supervisor rpc. @@ -95,8 +96,8 @@ impl RollupNodeBuilder { Self { jwt_secret: Some(jwt_secret), ..self } } - /// Appends the P2P [`Config`] to the builder. - pub fn with_p2p_config(self, config: Config) -> Self { + /// Appends the P2P [`NetworkConfig`] to the builder. + pub fn with_p2p_config(self, config: NetworkConfig) -> Self { Self { p2p_config: Some(config), ..self } } diff --git a/crates/node/service/src/service/standard/node.rs b/crates/node/service/src/service/standard/node.rs index 4d49e922d2..06056a7163 100644 --- a/crates/node/service/src/service/standard/node.rs +++ b/crates/node/service/src/service/standard/node.rs @@ -1,8 +1,8 @@ //! Contains the [`RollupNode`] implementation. use crate::{ DerivationActor, DerivationBuilder, EngineActor, EngineBuilder, InteropMode, L1WatcherRpc, - L1WatcherRpcState, NetworkActor, NodeMode, RollupNodeBuilder, RollupNodeService, RpcActor, - RuntimeActor, SupervisorActor, SupervisorRpcServerExt, + L1WatcherRpcState, NetworkActor, NetworkBuilder, NetworkConfig, NodeMode, RollupNodeBuilder, + RollupNodeService, RpcActor, RuntimeActor, SupervisorActor, SupervisorRpcServerExt, actors::{RuntimeState, SequencerActor, SequencerBuilder}, }; use alloy_provider::RootProvider; @@ -12,7 +12,6 @@ use op_alloy_network::Optimism; use std::sync::Arc; use kona_genesis::RollupConfig; -use kona_p2p::{Config, NetworkBuilder}; use kona_providers_alloy::{ AlloyChainProvider, AlloyL2ChainProvider, OnlineBeaconClient, OnlinePipeline, }; @@ -36,8 +35,8 @@ pub struct RollupNode { pub(crate) engine_builder: EngineBuilder, /// The [`RpcBuilder`] for the node. pub(crate) rpc_builder: Option, - /// The P2P [`Config`] for the node. - pub(crate) p2p_config: Config, + /// The P2P [`NetworkConfig`] for the node. + pub(crate) p2p_config: NetworkConfig, /// The [`RuntimeState`] for the runtime loading service. pub(crate) runtime_builder: Option, /// The supervisor rpc server config. diff --git a/examples/gossip/Cargo.toml b/examples/gossip/Cargo.toml index 47f9fdcd59..28d151d826 100644 --- a/examples/gossip/Cargo.toml +++ b/examples/gossip/Cargo.toml @@ -10,9 +10,11 @@ anyhow.workspace = true tracing.workspace = true kona-cli.workspace = true kona-p2p.workspace = true +kona-node-service.workspace = true kona-registry.workspace = true libp2p.workspace = true clap = { workspace = true, features = ["derive", "env"] } tokio = { workspace = true, features = ["rt-multi-thread", "macros"] } +tokio-util.workspace = true tracing-subscriber = { workspace = true, features = ["fmt", "env-filter"] } discv5.workspace = true diff --git a/examples/gossip/src/main.rs b/examples/gossip/src/main.rs index eb4c7f5501..e526e1c9ea 100644 --- a/examples/gossip/src/main.rs +++ b/examples/gossip/src/main.rs @@ -20,13 +20,15 @@ use clap::{ArgAction, Parser}; use discv5::enr::CombinedKey; use kona_cli::init_tracing_subscriber; -use kona_p2p::{Config, LocalNode, Network}; +use kona_node_service::{NetworkActor, NetworkConfig, NetworkContext, NodeActor}; +use kona_p2p::LocalNode; use kona_registry::ROLLUP_CONFIGS; use libp2p::{Multiaddr, identity::Keypair}; use std::{ net::{IpAddr, Ipv4Addr, SocketAddr}, time::Duration, }; +use tokio_util::sync::CancellationToken; use tracing_subscriber::EnvFilter; /// The gossip command. @@ -82,40 +84,49 @@ impl GossipCommand { let disc_addr = LocalNode::new(secret_key, IpAddr::V4(disc_ip), self.disc_port, self.disc_port); - let mut network = Network::builder(Config { - discovery_address: disc_addr, - gossip_address: gossip_addr, - unsafe_block_signer: signer, - discovery_config: discv5::ConfigBuilder::new(discv5::ListenConfig::Ipv4 { - ip: disc_ip, - port: self.disc_port, + let (_, network) = NetworkActor::new( + NetworkConfig { + discovery_address: disc_addr, + gossip_address: gossip_addr, + unsafe_block_signer: signer, + discovery_config: discv5::ConfigBuilder::new(discv5::ListenConfig::Ipv4 { + ip: disc_ip, + port: self.disc_port, + }) + .build(), + discovery_interval: Duration::from_secs(self.interval), + discovery_randomize: None, + keypair: Keypair::generate_secp256k1(), + gossip_config: Default::default(), + scoring: Default::default(), + topic_scoring: Default::default(), + monitor_peers: Default::default(), + bootstore: None, + gater_config: Default::default(), + bootnodes: Default::default(), + rollup_config: rollup_config.clone(), + local_signer: None, + } + .into(), + ); + + let (unsafe_blocks_tx, mut unsafe_blocks_rx) = tokio::sync::mpsc::channel(1024); + + network + .start(NetworkContext { + blocks: unsafe_blocks_tx, + cancellation: CancellationToken::new(), }) - .build(), - discovery_interval: Duration::from_secs(self.interval), - discovery_randomize: None, - keypair: Keypair::generate_secp256k1(), - gossip_config: Default::default(), - scoring: Default::default(), - topic_scoring: Default::default(), - monitor_peers: Default::default(), - bootstore: None, - gater_config: Default::default(), - bootnodes: Default::default(), - rollup_config: rollup_config.clone(), - local_signer: None, - }) - .build()?; + .await?; - let mut recv = network.unsafe_block_recv(); - network.start(None).await?; tracing::info!("Gossip driver started, receiving blocks."); loop { - match recv.recv().await { - Ok(block) => { - tracing::info!("Received unsafe block: {:?}", block); + match unsafe_blocks_rx.recv().await { + Some(block) => { + tracing::info!(target: "gossip", "Received unsafe block: {:?}", block); } - Err(e) => { - tracing::warn!("Failed to receive unsafe block: {:?}", e); + None => { + tracing::warn!(target: "gossip", "unsafe block gossip channel closed"); } } }