diff --git a/Cargo.lock b/Cargo.lock index 6d6328cd4e..a030ef46e1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4787,6 +4787,7 @@ dependencies = [ "alloy-signer-local", "alloy-transport", "alloy-transport-http", + "anyhow", "arbitrary", "async-stream", "async-trait", diff --git a/bin/node/src/flags/p2p.rs b/bin/node/src/flags/p2p.rs index 8916a9ab50..45707fc399 100644 --- a/bin/node/src/flags/p2p.rs +++ b/bin/node/src/flags/p2p.rs @@ -270,26 +270,6 @@ impl P2PArgs { Ok(()) } - /// Returns the [`discv5::Config`] from the CLI arguments. - pub fn discv5_config( - &self, - listen_config: discv5::ListenConfig, - static_ip: bool, - ) -> discv5::Config { - // We can use a default listen config here since it - // will be overridden by the discovery service builder. - let mut builder = discv5::ConfigBuilder::new(listen_config); - - if static_ip { - builder.disable_enr_update(); - - // If we have a static IP, we don't want to use any kind of NAT discovery mechanism. - builder.auto_nat_listen_duration(None); - } - - builder.build() - } - /// Returns the private key as specified in the raw cli flag or via file path. pub fn private_key(&self) -> Option { if let Some(key) = self.private_key { @@ -399,7 +379,8 @@ impl P2PArgs { }); let discovery_listening_address = SocketAddr::new(self.listen_ip, self.listen_udp_port); - let discovery_config = self.discv5_config(discovery_listening_address.into(), static_ip); + let discovery_config = + NetworkConfig::discv5_config(discovery_listening_address.into(), static_ip); let mut gossip_address = libp2p::Multiaddr::from(self.listen_ip); gossip_address.push(libp2p::multiaddr::Protocol::Tcp(self.listen_tcp_port)); diff --git a/crates/node/service/Cargo.toml b/crates/node/service/Cargo.toml index 9862a4008a..faf6007924 100644 --- a/crates/node/service/Cargo.toml +++ b/crates/node/service/Cargo.toml @@ -72,6 +72,9 @@ metrics = { workspace = true, optional = true } rstest.workspace = true arbitrary.workspace = true rand.workspace = true +anyhow.workspace = true +backon.workspace = true +alloy-primitives = { workspace = true, features = ["k256"] } alloy-rpc-types-engine = { workspace = true, features = ["arbitrary"] } [features] diff --git a/crates/node/service/src/actors/network/config.rs b/crates/node/service/src/actors/network/config.rs index 814a324ae4..2d72ff4266 100644 --- a/crates/node/service/src/actors/network/config.rs +++ b/crates/node/service/src/actors/network/config.rs @@ -52,6 +52,22 @@ impl NetworkConfig { const DEFAULT_DISCOVERY_INTERVAL: Duration = Duration::from_secs(5); const DEFAULT_DISCOVERY_RANDOMIZE: Option = None; + /// Returns the [`discv5::Config`] from the CLI arguments. + pub fn discv5_config(listen_config: discv5::ListenConfig, static_ip: bool) -> discv5::Config { + // We can use a default listen config here since it + // will be overridden by the discovery service builder. + let mut builder = discv5::ConfigBuilder::new(listen_config); + + if static_ip { + builder.disable_enr_update(); + + // If we have a static IP, we don't want to use any kind of NAT discovery mechanism. + builder.auto_nat_listen_duration(None); + } + + builder.build() + } + /// Creates a new [`NetworkConfig`] with the given [`RollupConfig`] with the minimum required /// fields. Generates a random keypair for the node. pub fn new( diff --git a/crates/node/service/tests/actors/mod.rs b/crates/node/service/tests/actors/mod.rs new file mode 100644 index 0000000000..3b8cea91c3 --- /dev/null +++ b/crates/node/service/tests/actors/mod.rs @@ -0,0 +1,5 @@ +//! Integration tests for the node actors. + +mod network; + +pub(crate) mod utils; diff --git a/crates/node/service/tests/actors/network/mocks.rs b/crates/node/service/tests/actors/network/mocks.rs new file mode 100644 index 0000000000..66ff80c51f --- /dev/null +++ b/crates/node/service/tests/actors/network/mocks.rs @@ -0,0 +1,154 @@ +//! Tests interactions with sequencer actor's inputs channels. + +use alloy_chains::Chain; +use alloy_signer::k256; + +use alloy_primitives::Address; +use discv5::{ConfigBuilder, Enr, ListenConfig}; +use kona_disc::LocalNode; +use kona_genesis::RollupConfig; +use kona_gossip::{P2pRpcRequest, PeerDump, PeerInfo}; +use kona_node_service::{ + NetworkActor, NetworkActorError, NetworkBuilder, NetworkContext, NetworkInboundData, NodeActor, +}; +use libp2p::{Multiaddr, identity::Keypair, multiaddr::Protocol}; +use op_alloy_rpc_types_engine::OpExecutionPayloadEnvelope; +use std::str::FromStr; +use tokio::{ + sync::{mpsc, oneshot}, + task::JoinHandle, +}; +use tokio_util::sync::CancellationToken; + +use crate::actors::utils::SeedGenerator; + +pub(super) struct TestNetwork { + inbound_data: NetworkInboundData, + /// We'll remove those fields as we add more tests. + #[allow(dead_code)] + blocks_rx: mpsc::Receiver, + #[allow(dead_code)] + handle: JoinHandle>, +} + +#[derive(Debug, thiserror::Error)] +pub(super) enum TestNetworkError { + #[error("P2p receiver closed")] + P2pReceiverClosed, + #[error("P2p receiver closed before sending response: {0}")] + OneshotError(#[from] oneshot::error::RecvError), + #[error("Peer info missing ENR")] + PeerInfoMissingEnr, + #[error("Invalid ENR: {0}")] + InvalidEnr(String), + #[error("Peer not connected")] + PeerNotConnected, +} + +fn rollup_config() -> RollupConfig { + RollupConfig { l2_chain_id: Chain::from_id(19_934_000), ..Default::default() } +} + +impl TestNetwork { + pub(super) fn new(bootnodes: Vec, seed_generator: &mut SeedGenerator) -> Self { + let unsafe_block_signer = Address::ZERO; + let keypair = Keypair::generate_secp256k1(); + let secp256k1_key = keypair.clone().try_into_secp256k1() + .map_err(|e| anyhow::anyhow!("Impossible to convert keypair to secp256k1. This is a bug since we only support secp256k1 keys: {e}")).unwrap() + .secret().to_bytes(); + let local_node_key = k256::ecdsa::SigningKey::from_bytes(&secp256k1_key.into()) + .map_err(|e| anyhow::anyhow!("Impossible to convert keypair to k256 signing key. This is a bug since we only support secp256k1 keys: {e}")).unwrap(); + + let node_addr = seed_generator.next_loopback_address(); + + let discovery_port = seed_generator.next_port(); + + let discovery_config = ConfigBuilder::new(ListenConfig::from_ip(node_addr, discovery_port)) + // Only allow loopback addresses. + .table_filter(|enr| { + let Some(ip) = enr.ip4() else { + return false; + }; + + ip.is_loopback() + }) + .build(); + + let gossip_port = seed_generator.next_port(); + let mut gossip_multiaddr = Multiaddr::from(node_addr); + gossip_multiaddr.push(Protocol::Tcp(gossip_port)); + + let gossip_config = kona_gossip::default_config_builder().build().unwrap(); + + // Create a new network actor. No external connections + let builder = NetworkBuilder::new( + // Create a new rollup config. We don't need to specify any of the fields. + rollup_config(), + unsafe_block_signer, + gossip_multiaddr, + keypair, + LocalNode::new(local_node_key, node_addr, gossip_port, discovery_port), + discovery_config, + ) + .with_bootnodes(bootnodes) + .with_gossip_config(gossip_config); + + let (inbound_data, actor) = NetworkActor::new(builder); + + let (blocks_tx, blocks_rx) = mpsc::channel(1024); + let cancellation = CancellationToken::new(); + + let context = NetworkContext { blocks: blocks_tx, cancellation }; + + let handle = tokio::spawn(async move { actor.start(context).await }); + + Self { inbound_data, blocks_rx, handle } + } + + pub(super) async fn peer_info(&self) -> Result { + // Try to get the peer info. Send a peer info request to the network actor. + let (peer_info_tx, peer_info_rx) = oneshot::channel(); + let peer_info_request = P2pRpcRequest::PeerInfo(peer_info_tx); + self.inbound_data + .p2p_rpc + .send(peer_info_request) + .await + .map_err(|_| TestNetworkError::P2pReceiverClosed)?; + + let info = peer_info_rx.await?; + + Ok(info) + } + + pub(super) async fn peers(&self) -> Result { + let (peers_tx, peers_rx) = oneshot::channel(); + let peers_request = P2pRpcRequest::Peers { out: peers_tx, connected: true }; + self.inbound_data + .p2p_rpc + .send(peers_request) + .await + .map_err(|_| TestNetworkError::P2pReceiverClosed)?; + let peers = peers_rx.await?; + Ok(peers) + } + + pub(super) async fn is_connected_to(&self, other: &Self) -> Result<(), TestNetworkError> { + let other_peer_id = other.peer_id().await?; + let peers = self.peers().await?; + if !peers.peers.contains_key(&other_peer_id) { + return Err(TestNetworkError::PeerNotConnected); + } + Ok(()) + } + + pub(super) async fn peer_enr(&self) -> Result { + let enr = self.peer_info().await?.enr.ok_or(TestNetworkError::PeerInfoMissingEnr)?; + // Parse the ENR + let enr = Enr::from_str(&enr).map_err(TestNetworkError::InvalidEnr)?; + Ok(enr) + } + + pub(super) async fn peer_id(&self) -> Result { + Ok(self.peer_info().await?.peer_id) + } +} diff --git a/crates/node/service/tests/actors/network/mod.rs b/crates/node/service/tests/actors/network/mod.rs new file mode 100644 index 0000000000..ec1a9156e0 --- /dev/null +++ b/crates/node/service/tests/actors/network/mod.rs @@ -0,0 +1,36 @@ +//! Integration tests for the network actor. + +use std::time::Duration; + +use backon::{ExponentialBuilder, Retryable}; + +use crate::actors::{ + network::mocks::{TestNetwork, TestNetworkError}, + utils::SEED_GENERATOR_BUILDER, +}; + +pub(super) mod mocks; + +#[tokio::test(flavor = "multi_thread")] +async fn test_p2p_network_conn() -> anyhow::Result<()> { + let mut seed_generator = SEED_GENERATOR_BUILDER.next_generator(); + + let network_1 = TestNetwork::new(vec![], &mut seed_generator); + let enr_1 = network_1.peer_enr().await?; + + let network_2 = TestNetwork::new(vec![enr_1], &mut seed_generator); + + (async || network_2.is_connected_to(&network_1).await) + .retry(ExponentialBuilder::default().with_total_delay(Some(Duration::from_secs(10)))) + // When to retry + .when(|e| matches!(e, TestNetworkError::PeerNotConnected)) + .await?; + + (async || network_1.is_connected_to(&network_2).await) + .retry(ExponentialBuilder::default().with_total_delay(Some(Duration::from_secs(10)))) + // When to retry + .when(|e| matches!(e, TestNetworkError::PeerNotConnected)) + .await?; + + Ok(()) +} diff --git a/crates/node/service/tests/actors/utils.rs b/crates/node/service/tests/actors/utils.rs new file mode 100644 index 0000000000..a76e3a2f0a --- /dev/null +++ b/crates/node/service/tests/actors/utils.rs @@ -0,0 +1,49 @@ +use std::{ + net::{IpAddr, Ipv4Addr}, + sync::{ + LazyLock, + atomic::{AtomicU64, Ordering}, + }, +}; + +use rand::{RngCore, SeedableRng}; + +pub(crate) static SEED_GENERATOR_BUILDER: LazyLock = + LazyLock::new(SeedGeneratorBuilder::new); + +pub(crate) struct SeedGeneratorBuilder(AtomicU64); + +impl SeedGeneratorBuilder { + pub(crate) const fn new() -> Self { + Self(AtomicU64::new(0)) + } + + fn next(&self) -> u64 { + self.0.fetch_add(1, Ordering::Relaxed) + } + + pub(crate) fn next_generator(&self) -> SeedGenerator { + SeedGenerator(rand::rngs::StdRng::seed_from_u64(self.next())) + } +} + +pub(crate) struct SeedGenerator(rand::rngs::StdRng); + +impl SeedGenerator { + pub(crate) fn next_loopback_address(&mut self) -> IpAddr { + let next_u64 = self.0.next_u64(); + + IpAddr::V4(Ipv4Addr::new( + 127, + next_u64 as u8, + (next_u64 >> 8) as u8, + (next_u64 >> 16) as u8, + )) + } + + pub(crate) fn next_port(&mut self) -> u16 { + let next_u32 = self.0.next_u32(); + + next_u32 as u16 + } +} diff --git a/crates/node/service/tests/integration.rs b/crates/node/service/tests/integration.rs new file mode 100644 index 0000000000..c07fb7271f --- /dev/null +++ b/crates/node/service/tests/integration.rs @@ -0,0 +1,4 @@ +//! Integration tests for the node service crate. + +/// Tests for the node actors. +mod actors;