From b7ded335ec404967cdd5c44ef8b36b7a8279032f Mon Sep 17 00:00:00 2001 From: ackintosh Date: Sat, 11 Mar 2023 15:35:50 +0900 Subject: [PATCH 01/17] Make build pass --- examples/find_nodes.rs | 12 +----------- src/config.rs | 11 +++++------ src/discv5.rs | 26 ++++++++++++++++++++++---- src/handler/mod.rs | 19 ++++++++++++++++--- src/ipmode.rs | 11 +++++++++++ src/service.rs | 8 ++++++-- src/socket/mod.rs | 11 +++++------ 7 files changed, 66 insertions(+), 32 deletions(-) diff --git a/examples/find_nodes.rs b/examples/find_nodes.rs index 7e01d10bf..c06173979 100644 --- a/examples/find_nodes.rs +++ b/examples/find_nodes.rs @@ -105,17 +105,7 @@ async fn main() { // let config = Discv5ConfigBuilder::new().enable_packet_filter().build(); // default configuration without packet filtering - let config = Discv5ConfigBuilder::new() - .ip_mode(match args.socket_kind { - SocketKind::Ip4 => discv5::IpMode::Ip4, - SocketKind::Ip6 => discv5::IpMode::Ip6 { - enable_mapped_addresses: false, - }, - SocketKind::Ds => discv5::IpMode::Ip6 { - enable_mapped_addresses: true, - }, - }) - .build(); + let config = Discv5ConfigBuilder::new().build(); info!("Node Id: {}", enr.node_id()); if args.enr_ip6.is_some() || args.enr_ip4.is_some() { diff --git a/src/config.rs b/src/config.rs index 9f325a172..76c43dae6 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,6 +1,5 @@ use crate::{ - ipmode::IpMode, kbucket::MAX_NODES_PER_BUCKET, Enr, Executor, PermitBanList, RateLimiter, - RateLimiterBuilder, + kbucket::MAX_NODES_PER_BUCKET, Enr, Executor, PermitBanList, RateLimiter, RateLimiterBuilder, }; ///! A set of configuration parameters to tune the discovery protocol. use std::time::Duration; @@ -304,10 +303,10 @@ impl Discv5ConfigBuilder { /// Configures the type of socket to bind to. This also affects the selection of address to use /// to contact an ENR. - pub fn ip_mode(&mut self, ip_mode: IpMode) -> &mut Self { - self.config.ip_mode = ip_mode; - self - } + // pub fn ip_mode(&mut self, ip_mode: IpMode) -> &mut Self { + // self.config.ip_mode = ip_mode; + // self + // } pub fn build(&mut self) -> Discv5Config { // If an executor is not provided, assume a current tokio runtime is running. diff --git a/src/discv5.rs b/src/discv5.rs index 46b405c0c..346afeb09 100644 --- a/src/discv5.rs +++ b/src/discv5.rs @@ -20,10 +20,11 @@ use crate::{ }, node_info::NodeContact, service::{QueryKind, Service, ServiceRequest, TalkRequest}, - Discv5Config, Enr, + Discv5Config, Enr, IpMode, }; use enr::{CombinedKey, EnrError, EnrKey, NodeId}; use parking_lot::RwLock; +use std::net::{Ipv4Addr, Ipv6Addr}; use std::{ future::Future, net::SocketAddr, @@ -38,6 +39,7 @@ use libp2p_core::Multiaddr; // Create lazy static variable for the global permit/ban list use crate::metrics::{Metrics, METRICS}; +use crate::socket::ListenConfig; lazy_static! { pub static ref PERMIT_BAN_LIST: RwLock = RwLock::new(crate::PermitBanList::default()); @@ -83,6 +85,10 @@ pub struct Discv5 { local_enr: Arc>, /// The key associated with the local ENR, required for updating the local ENR. enr_key: Arc>, + /// Type of socket to create. + listen_config: ListenConfig, + // Type of socket we are using + ip_mode: IpMode, } impl Discv5 { @@ -126,6 +132,16 @@ impl Discv5 { // Update the PermitBan list based on initial configuration *PERMIT_BAN_LIST.write() = config.permit_ban_list.clone(); + // FIXME + let listen_config = ListenConfig::DualStack { + ipv4: Ipv4Addr::UNSPECIFIED, + ipv4_port: 9000, + ipv6: Ipv6Addr::UNSPECIFIED, + ipv6_port: 9006, + }; + + let ip_mode = IpMode::new_from_listen_config(&listen_config); + Ok(Discv5 { config, service_channel: None, @@ -133,6 +149,8 @@ impl Discv5 { kbuckets, local_enr, enr_key, + listen_config, + ip_mode, }) } @@ -149,7 +167,7 @@ impl Discv5 { self.enr_key.clone(), self.kbuckets.clone(), self.config.clone(), - listen_socket, + self.listen_config.clone(), ) .await?; self.service_exit = Some(service_exit); @@ -178,7 +196,7 @@ impl Discv5 { /// them upfront. pub fn add_enr(&self, enr: Enr) -> Result<(), &'static str> { // only add ENR's that have a valid udp socket. - if self.config.ip_mode.get_contactable_addr(&enr).is_none() { + if self.ip_mode.get_contactable_addr(&enr).is_none() { warn!("ENR attempted to be added without an UDP socket compatible with configured IpMode has been ignored."); return Err("ENR has no compatible UDP socket to connect to"); } @@ -474,7 +492,7 @@ impl Discv5 { let (callback_send, callback_recv) = oneshot::channel(); let channel = self.clone_channel(); - let ip_mode = self.config.ip_mode; + let ip_mode = self.ip_mode; async move { let node_contact = NodeContact::try_from_enr(enr, ip_mode)?; diff --git a/src/handler/mod.rs b/src/handler/mod.rs index f17bc3fd9..7f7496243 100644 --- a/src/handler/mod.rs +++ b/src/handler/mod.rs @@ -40,6 +40,7 @@ use delay_map::HashMapDelay; use enr::{CombinedKey, NodeId}; use futures::prelude::*; use parking_lot::RwLock; +use std::net::{SocketAddrV4, SocketAddrV6}; use std::{ collections::HashMap, convert::TryFrom, @@ -64,6 +65,7 @@ pub use crate::node_info::{NodeAddress, NodeContact}; use crate::metrics::METRICS; use crate::lru_time_cache::LruTimeCache; +use crate::socket::ListenConfig; use active_requests::ActiveRequests; use request_call::RequestCall; use session::Session; @@ -213,8 +215,8 @@ impl Handler { pub async fn spawn( enr: Arc>, key: Arc>, - listen_socket: SocketAddr, config: Discv5Config, + listen_config: ListenConfig, ) -> Result { let (exit_sender, exit) = oneshot::channel(); // create the channels to send/receive messages from the application @@ -238,14 +240,25 @@ impl Handler { max_bans_per_ip: config.filter_max_bans_per_ip, }; + // FIXME + let listen_socket = match listen_config { + ListenConfig::Ipv4 { ip, port } => SocketAddr::V4(SocketAddrV4::new(ip, port)), + ListenConfig::Ipv6 { ip, port } => SocketAddr::V6(SocketAddrV6::new(ip, port, 0, 0)), + ListenConfig::DualStack { + ipv4, + ipv4_port, + ipv6: _ipv6, + ipv6_port: _ipv6_port, + } => SocketAddr::V4(SocketAddrV4::new(ipv4, ipv4_port)), + }; + let socket_config = socket::SocketConfig { executor: config.executor.clone().expect("Executor must exist"), - socket_addr: listen_socket, filter_config, + listen_config, local_node_id: node_id, expected_responses: filter_expected_responses.clone(), ban_duration: config.ban_duration, - ip_mode: config.ip_mode, }; // Attempt to bind to the socket before spinning up the send/recv tasks. diff --git a/src/ipmode.rs b/src/ipmode.rs index b70b0dbd9..afb686e0a 100644 --- a/src/ipmode.rs +++ b/src/ipmode.rs @@ -1,5 +1,8 @@ +use crate::socket::ListenConfig; use crate::Enr; +use crate::IpMode::{DualStack, Ip4, Ip6}; use std::net::SocketAddr; + ///! A set of configuration parameters to tune the discovery protocol. /// Sets the socket type to be established and also determines the type of ENRs that we will store @@ -20,6 +23,14 @@ pub enum IpMode { } impl IpMode { + pub(crate) fn new_from_listen_config(listen_config: &ListenConfig) -> Self { + match listen_config { + ListenConfig::Ipv4 { .. } => Ip4, + ListenConfig::Ipv6 { .. } => Ip6, + ListenConfig::DualStack { .. } => DualStack, + } + } + pub fn is_ipv4(&self) -> bool { self == &IpMode::Ip4 } diff --git a/src/service.rs b/src/service.rs index 4d6ca96fb..d3b87d788 100644 --- a/src/service.rs +++ b/src/service.rs @@ -152,6 +152,7 @@ pub enum ServiceRequest { } use crate::discv5::PERMIT_BAN_LIST; +use crate::socket::ListenConfig; pub struct Service { /// Configuration parameters. @@ -253,7 +254,7 @@ impl Service { enr_key: Arc>, kbuckets: Arc>>, config: Discv5Config, - listen_socket: SocketAddr, + listen_config: ListenConfig, ) -> Result<(oneshot::Sender<()>, mpsc::Sender), std::io::Error> { // process behaviour-level configuration parameters let ip_votes = if config.enr_update { @@ -265,12 +266,14 @@ impl Service { None }; + let ip_mode = IpMode::new_from_listen_config(&listen_config); + // build the session service let (handler_exit, handler_send, handler_recv) = Handler::spawn( local_enr.clone(), enr_key.clone(), - listen_socket, config.clone(), + listen_config, ) .await?; @@ -299,6 +302,7 @@ impl Service { event_stream: None, exit, config: config.clone(), + ip_mode, }; info!("Discv5 Service started"); diff --git a/src/socket/mod.rs b/src/socket/mod.rs index 967e15673..e54a5415c 100644 --- a/src/socket/mod.rs +++ b/src/socket/mod.rs @@ -24,6 +24,7 @@ pub use recv::InboundPacket; pub use send::OutboundPacket; /// Configuration for the sockets to listen on. +#[derive(Clone)] pub enum ListenConfig { Ipv4 { ip: Ipv4Addr, @@ -45,8 +46,6 @@ pub enum ListenConfig { pub struct SocketConfig { /// The executor to spawn the tasks. pub executor: Box, - /// The listening socket. - pub socket_addr: SocketAddr, /// Configuration details for the packet filter. pub filter_config: FilterConfig, /// Type of socket to create. @@ -89,7 +88,7 @@ impl Socket { pub(crate) async fn new(config: SocketConfig) -> Result { let SocketConfig { executor, - socket_addr, + // socket_addr, filter_config, listen_config, ban_duration, @@ -106,11 +105,11 @@ impl Socket { ipv4_port, ipv6, ipv6_port, - } => ((ipv4, ipv4_port).into(), Some((ipv6, ipv4_port))), + } => ((ipv4, ipv4_port).into(), Some((ipv6, ipv6_port))), }; let first_socket = Socket::new_socket(&first_addr).await?; let maybe_second_socket = match maybe_second_addr { - Some(second_addr) => Some(Socket::new_socket(&socket_addr).await?), + Some(second_addr) => Some(Socket::new_socket(&second_addr.into()).await?), None => None, }; @@ -134,7 +133,7 @@ impl Socket { let (recv, recv_exit) = RecvHandler::spawn(recv_config); // spawn the sender handler - let (send, sender_exit) = SendHandler::spawn(executor, send_udp); + let (send, sender_exit) = SendHandler::spawn(executor, send_udp, second_send); Ok(Socket { send, From 528a98c5f78a5aaf0335534c2eda437819d98fe5 Mon Sep 17 00:00:00 2001 From: ackintosh Date: Sat, 11 Mar 2023 16:13:10 +0900 Subject: [PATCH 02/17] Make Discv5::new requires ListenConfig and update `find_node` example --- examples/find_nodes.rs | 46 +++++++++++++++++++++++++++++++++--------- src/discv5.rs | 10 +-------- 2 files changed, 37 insertions(+), 19 deletions(-) diff --git a/examples/find_nodes.rs b/examples/find_nodes.rs index c06173979..10ca924e3 100644 --- a/examples/find_nodes.rs +++ b/examples/find_nodes.rs @@ -22,10 +22,11 @@ use discv5::{ Discv5, Discv5ConfigBuilder, Discv5Event, }; use std::{ - net::{Ipv4Addr, Ipv6Addr, SocketAddr}, + net::{Ipv4Addr, Ipv6Addr}, time::Duration, }; use tracing::{info, warn}; +use discv5::socket::ListenConfig; #[derive(Parser)] struct FindNodesArgs { @@ -43,6 +44,10 @@ struct FindNodesArgs { /// randomly. #[clap(long)] port: Option, + /// Port to bind for ipv6. If none is provided, a random one in the 9000 - 9999 range will be picked + /// randomly. + #[clap(long)] + port6: Option, /// Use a default test key. #[clap(long)] use_test_key: bool, @@ -67,6 +72,16 @@ async fn main() { let port = args .port .unwrap_or_else(|| (rand::random::() % 1000) + 9000); + let port6 = args + .port + .unwrap_or_else(|| { + loop { + let port6 = (rand::random::() % 1000) + 9000; + if port6 != port { + return port6; + } + } + }); let enr_key = if args.use_test_key { // A fixed key for testing @@ -93,9 +108,9 @@ async fn main() { if let Some(ip6) = args.enr_ip6 { // if the given address is the UNSPECIFIED address we want to advertise localhost if ip6.is_unspecified() { - builder.ip6(Ipv6Addr::LOCALHOST).udp6(port); + builder.ip6(Ipv6Addr::LOCALHOST).udp6(port6); } else { - builder.ip6(ip6).udp6(port); + builder.ip6(ip6).udp6(port6); } } builder.build(&enr_key).unwrap() @@ -118,14 +133,25 @@ async fn main() { ); } // the address to listen on. - let bind_addr = match args.socket_kind { - SocketKind::Ip4 => Ipv4Addr::UNSPECIFIED.into(), - SocketKind::Ip6 | SocketKind::Ds => Ipv6Addr::UNSPECIFIED.into(), + let listen_config = match args.socket_kind { + SocketKind::Ip4 => ListenConfig::Ipv4 { + ip: Ipv4Addr::UNSPECIFIED, + port, + }, + SocketKind::Ip6 => ListenConfig::Ipv6 { + ip: Ipv6Addr::UNSPECIFIED, + port: port6, + }, + SocketKind::Ds => ListenConfig::DualStack { + ipv4: Ipv4Addr::UNSPECIFIED, + ipv4_port: port, + ipv6: Ipv6Addr::UNSPECIFIED, + ipv6_port: port6, + } }; - let socket_addr = SocketAddr::new(bind_addr, port); // construct the discv5 server - let mut discv5 = Discv5::new(enr, enr_key, config).unwrap(); + let mut discv5 = Discv5::new(enr, enr_key, config, listen_config).unwrap(); // if we know of another peer's ENR, add it known peers for enr in args.remote_peer { @@ -144,12 +170,12 @@ async fn main() { } // start the discv5 service - discv5.start(socket_addr).await.unwrap(); + discv5.start().await.unwrap(); let mut event_stream = discv5.event_stream().await.unwrap(); let check_evs = args.events; // construct a 30 second interval to search for new peers. - let mut query_interval = tokio::time::interval(Duration::from_secs(30)); + let mut query_interval = tokio::time::interval(Duration::from_secs(1)); loop { tokio::select! { diff --git a/src/discv5.rs b/src/discv5.rs index 346afeb09..415dc3882 100644 --- a/src/discv5.rs +++ b/src/discv5.rs @@ -24,7 +24,6 @@ use crate::{ }; use enr::{CombinedKey, EnrError, EnrKey, NodeId}; use parking_lot::RwLock; -use std::net::{Ipv4Addr, Ipv6Addr}; use std::{ future::Future, net::SocketAddr, @@ -96,6 +95,7 @@ impl Discv5 { local_enr: Enr, enr_key: CombinedKey, mut config: Discv5Config, + listen_config: ListenConfig, ) -> Result { // ensure the keypair matches the one that signed the enr. if local_enr.public_key() != enr_key.public() { @@ -132,14 +132,6 @@ impl Discv5 { // Update the PermitBan list based on initial configuration *PERMIT_BAN_LIST.write() = config.permit_ban_list.clone(); - // FIXME - let listen_config = ListenConfig::DualStack { - ipv4: Ipv4Addr::UNSPECIFIED, - ipv4_port: 9000, - ipv6: Ipv6Addr::UNSPECIFIED, - ipv6_port: 9006, - }; - let ip_mode = IpMode::new_from_listen_config(&listen_config); Ok(Discv5 { From 7d3b28134268822e1d4a0df88102f7aa633e44a6 Mon Sep 17 00:00:00 2001 From: ackintosh Date: Sat, 11 Mar 2023 16:17:28 +0900 Subject: [PATCH 03/17] Remove useless comments --- src/config.rs | 7 ------- src/socket/mod.rs | 1 - 2 files changed, 8 deletions(-) diff --git a/src/config.rs b/src/config.rs index 76c43dae6..63aed0265 100644 --- a/src/config.rs +++ b/src/config.rs @@ -301,13 +301,6 @@ impl Discv5ConfigBuilder { self } - /// Configures the type of socket to bind to. This also affects the selection of address to use - /// to contact an ENR. - // pub fn ip_mode(&mut self, ip_mode: IpMode) -> &mut Self { - // self.config.ip_mode = ip_mode; - // self - // } - pub fn build(&mut self) -> Discv5Config { // If an executor is not provided, assume a current tokio runtime is running. if self.config.executor.is_none() { diff --git a/src/socket/mod.rs b/src/socket/mod.rs index e54a5415c..4d2e948db 100644 --- a/src/socket/mod.rs +++ b/src/socket/mod.rs @@ -88,7 +88,6 @@ impl Socket { pub(crate) async fn new(config: SocketConfig) -> Result { let SocketConfig { executor, - // socket_addr, filter_config, listen_config, ban_duration, From e087d6a57ea2af43af753c3be5149cd4225c9a11 Mon Sep 17 00:00:00 2001 From: ackintosh Date: Sat, 11 Mar 2023 16:19:25 +0900 Subject: [PATCH 04/17] Remove unnecessary qualification --- src/ipmode.rs | 34 +++++++++++++++++----------------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/src/ipmode.rs b/src/ipmode.rs index afb686e0a..320004494 100644 --- a/src/ipmode.rs +++ b/src/ipmode.rs @@ -32,7 +32,7 @@ impl IpMode { } pub fn is_ipv4(&self) -> bool { - self == &IpMode::Ip4 + self == &Ip4 } /// Get the contactable Socket address of an Enr under current configuration. When running in @@ -56,9 +56,9 @@ impl IpMode { } match self { - IpMode::Ip4 => enr.udp4_socket().map(SocketAddr::V4), - IpMode::Ip6 => canonical_ipv6_enr_addr(enr).map(SocketAddr::V6), - IpMode::DualStack => { + Ip4 => enr.udp4_socket().map(SocketAddr::V4), + Ip6 => canonical_ipv6_enr_addr(enr).map(SocketAddr::V6), + DualStack => { canonical_ipv6_enr_addr(enr) .map(SocketAddr::V6) // NOTE: general consensus is that ipv6 addresses should be preferred. @@ -91,7 +91,7 @@ mod tests { name, enr_ip4: None, enr_ip6: None, - ip_mode: IpMode::Ip4, + ip_mode: Ip4, expected_socket_addr: None, } } @@ -150,15 +150,15 @@ mod tests { fn empty_enr_no_contactable_address() { // Empty ENR TestCase::new("Empty enr is non contactable by ip4 node") - .ip_mode(IpMode::Ip4) + .ip_mode(Ip4) .test(); TestCase::new("Empty enr is not contactable by ip6 only node") - .ip_mode(IpMode::Ip6) + .ip_mode(Ip6) .test(); TestCase::new("Empty enr is not contactable by dual stack node") - .ip_mode(IpMode::DualStack) + .ip_mode(DualStack) .test(); } @@ -167,18 +167,18 @@ mod tests { // Ip4 only ENR TestCase::new("Ipv4 only enr is contactable by ip4 node") .enr_ip4(Ipv4Addr::LOCALHOST) - .ip_mode(IpMode::Ip4) + .ip_mode(Ip4) .expect_ip4(Ipv4Addr::LOCALHOST) .test(); TestCase::new("Ipv4 only enr is not contactable by ip6 only node") .enr_ip4(Ipv4Addr::LOCALHOST) - .ip_mode(IpMode::Ip6) + .ip_mode(Ip6) .test(); TestCase::new("Ipv4 only enr is contactable by dual stack node") .enr_ip4(Ipv4Addr::LOCALHOST) - .ip_mode(IpMode::DualStack) + .ip_mode(DualStack) .expect_ip4(Ipv4Addr::LOCALHOST) .test(); } @@ -188,18 +188,18 @@ mod tests { // Ip4 only ENR TestCase::new("Ipv6 only enr is not contactable by ip4 node") .enr_ip6(Ipv6Addr::LOCALHOST) - .ip_mode(IpMode::Ip4) + .ip_mode(Ip4) .test(); TestCase::new("Ipv6 only enr is contactable by ip6 only node") .enr_ip6(Ipv6Addr::LOCALHOST) - .ip_mode(IpMode::Ip6) + .ip_mode(Ip6) .expect_ip6(Ipv6Addr::LOCALHOST) .test(); TestCase::new("Ipv6 only enr is contactable by dual stack node") .enr_ip6(Ipv6Addr::LOCALHOST) - .ip_mode(IpMode::DualStack) + .ip_mode(DualStack) .expect_ip6(Ipv6Addr::LOCALHOST) .test(); } @@ -210,21 +210,21 @@ mod tests { TestCase::new("Dual stack enr is contactable by ip4 node") .enr_ip6(Ipv6Addr::LOCALHOST) .enr_ip4(Ipv4Addr::LOCALHOST) - .ip_mode(IpMode::Ip4) + .ip_mode(Ip4) .expect_ip4(Ipv4Addr::LOCALHOST) .test(); TestCase::new("Dual stack enr is contactable by ip6 only node") .enr_ip6(Ipv6Addr::LOCALHOST) .enr_ip4(Ipv4Addr::LOCALHOST) - .ip_mode(IpMode::Ip6) + .ip_mode(Ip6) .expect_ip6(Ipv6Addr::LOCALHOST) .test(); TestCase::new("Dual stack enr is contactable by dual stack node") .enr_ip6(Ipv6Addr::LOCALHOST) .enr_ip4(Ipv4Addr::LOCALHOST) - .ip_mode(IpMode::Ip6) + .ip_mode(Ip6) .expect_ip6(Ipv6Addr::LOCALHOST) .test(); } From f63ac1d0233edd4ad9aa4fe8c2d6a85fe994e873 Mon Sep 17 00:00:00 2001 From: ackintosh Date: Wed, 15 Mar 2023 21:58:25 +0900 Subject: [PATCH 05/17] Make tests pass --- examples/custom_executor.rs | 12 ++++++++---- examples/find_nodes.rs | 20 ++++++++------------ examples/request_enr.rs | 14 +++++++++----- examples/simple_server.rs | 12 ++++++++---- src/discv5/test.rs | 32 ++++++++++++++++++++++++-------- src/handler/tests.rs | 20 ++++++++++++++++---- src/service/test.rs | 14 +++++++++----- 7 files changed, 82 insertions(+), 42 deletions(-) diff --git a/examples/custom_executor.rs b/examples/custom_executor.rs index 9ec3baaa4..8492e974c 100644 --- a/examples/custom_executor.rs +++ b/examples/custom_executor.rs @@ -9,8 +9,9 @@ //! $ cargo run --example custom_executor //! ``` +use discv5::socket::ListenConfig; use discv5::{enr, enr::CombinedKey, Discv5, Discv5ConfigBuilder, Discv5Event}; -use std::net::SocketAddr; +use std::net::Ipv4Addr; fn main() { // allows detailed logging with the RUST_LOG env variable @@ -22,7 +23,10 @@ fn main() { .try_init(); // listening address and port - let listen_addr = "0.0.0.0:9000".parse::().unwrap(); + let listen_config = ListenConfig::Ipv4 { + ip: Ipv4Addr::UNSPECIFIED, + port: 9000, + }; let enr_key = CombinedKey::generate_secp256k1(); // construct a local ENR @@ -39,7 +43,7 @@ fn main() { let config = Discv5ConfigBuilder::new().build(); // construct the discv5 server - let mut discv5 = Discv5::new(enr, enr_key, config).unwrap(); + let mut discv5 = Discv5::new(enr, enr_key, config, listen_config).unwrap(); // if we know of another peer's ENR, add it known peers if let Some(base64_enr) = std::env::args().nth(1) { @@ -61,7 +65,7 @@ fn main() { runtime.block_on(async { // start the discv5 service - discv5.start(listen_addr).await.unwrap(); + discv5.start().await.unwrap(); println!("Server started"); // get an event stream diff --git a/examples/find_nodes.rs b/examples/find_nodes.rs index 10ca924e3..79f457590 100644 --- a/examples/find_nodes.rs +++ b/examples/find_nodes.rs @@ -16,6 +16,7 @@ //! For a simple CLI discovery service see [discv5-cli](https://github.com/AgeManning/discv5-cli) use clap::Parser; +use discv5::socket::ListenConfig; use discv5::{ enr, enr::{k256, CombinedKey}, @@ -26,7 +27,6 @@ use std::{ time::Duration, }; use tracing::{info, warn}; -use discv5::socket::ListenConfig; #[derive(Parser)] struct FindNodesArgs { @@ -72,16 +72,12 @@ async fn main() { let port = args .port .unwrap_or_else(|| (rand::random::() % 1000) + 9000); - let port6 = args - .port - .unwrap_or_else(|| { - loop { - let port6 = (rand::random::() % 1000) + 9000; - if port6 != port { - return port6; - } - } - }); + let port6 = args.port.unwrap_or_else(|| loop { + let port6 = (rand::random::() % 1000) + 9000; + if port6 != port { + return port6; + } + }); let enr_key = if args.use_test_key { // A fixed key for testing @@ -147,7 +143,7 @@ async fn main() { ipv4_port: port, ipv6: Ipv6Addr::UNSPECIFIED, ipv6_port: port6, - } + }, }; // construct the discv5 server diff --git a/examples/request_enr.rs b/examples/request_enr.rs index 7c4ac820c..3ff6456c7 100644 --- a/examples/request_enr.rs +++ b/examples/request_enr.rs @@ -13,9 +13,10 @@ //! //! This requires the "libp2p" feature. #[cfg(feature = "libp2p")] -use discv5::{enr, enr::CombinedKey, Discv5, Discv5Config}; +use discv5::socket::ListenConfig; #[cfg(feature = "libp2p")] -use std::net::SocketAddr; +use discv5::{enr, enr::CombinedKey, Discv5, Discv5Config}; +use std::net::Ipv4Addr; #[cfg(not(feature = "libp2p"))] fn main() {} @@ -31,7 +32,10 @@ async fn main() { .try_init(); // listening address and port - let listen_addr = "0.0.0.0:9000".parse::().unwrap(); + let listen_config = ListenConfig::Ipv4 { + ip: Ipv4Addr::UNSPECIFIED, + port: 9000, + }; // generate a new enr key let enr_key = CombinedKey::generate_secp256k1(); @@ -46,10 +50,10 @@ async fn main() { .expect("A multiaddr must be supplied"); // construct the discv5 server - let mut discv5 = Discv5::new(enr, enr_key, config).unwrap(); + let mut discv5 = Discv5::new(enr, enr_key, config, listen_config).unwrap(); // start the discv5 service - discv5.start(listen_addr).await.unwrap(); + discv5.start().await.unwrap(); // search for the ENR match discv5.request_enr(multiaddr).await { diff --git a/examples/simple_server.rs b/examples/simple_server.rs index 41652b5c7..a682ad535 100644 --- a/examples/simple_server.rs +++ b/examples/simple_server.rs @@ -10,8 +10,9 @@ //! $ cargo run --example simple_server -- //! ``` +use discv5::socket::ListenConfig; use discv5::{enr, enr::CombinedKey, Discv5, Discv5Config, Discv5Event}; -use std::net::{Ipv4Addr, SocketAddr}; +use std::net::Ipv4Addr; #[tokio::main] async fn main() { @@ -37,7 +38,10 @@ async fn main() { }; // listening address and port - let listen_addr = "0.0.0.0:9000".parse::().unwrap(); + let listen_config = ListenConfig::Ipv4 { + ip: Ipv4Addr::UNSPECIFIED, + port: 9000, + }; let enr_key = CombinedKey::generate_secp256k1(); @@ -72,7 +76,7 @@ async fn main() { let config = Discv5Config::default(); // construct the discv5 server - let mut discv5 = Discv5::new(enr, enr_key, config).unwrap(); + let mut discv5 = Discv5::new(enr, enr_key, config, listen_config).unwrap(); // if we know of another peer's ENR, add it known peers if let Some(base64_enr) = std::env::args().nth(3) { @@ -93,7 +97,7 @@ async fn main() { } // start the discv5 service - discv5.start(listen_addr).await.unwrap(); + discv5.start().await.unwrap(); println!("Server started"); // get an event stream diff --git a/src/discv5/test.rs b/src/discv5/test.rs index c5a432d16..c6f845401 100644 --- a/src/discv5/test.rs +++ b/src/discv5/test.rs @@ -1,5 +1,6 @@ #![cfg(test)] +use crate::socket::ListenConfig; use crate::{Discv5, *}; use enr::{k256, CombinedKey, Enr, EnrBuilder, EnrKey, NodeId}; use rand_core::{RngCore, SeedableRng}; @@ -23,6 +24,10 @@ async fn build_nodes(n: usize, base_port: u16) -> Vec { for port in base_port..base_port + n as u16 { let enr_key = CombinedKey::generate_secp256k1(); let config = Discv5Config::default(); + let listen_config = ListenConfig::Ipv4 { + ip: ip.clone(), + port, + }; let enr = EnrBuilder::new("v4") .ip4(ip) @@ -30,9 +35,8 @@ async fn build_nodes(n: usize, base_port: u16) -> Vec { .build(&enr_key) .unwrap(); // transport for building a swarm - let socket_addr = enr.udp4_socket().unwrap(); - let mut discv5 = Discv5::new(enr, enr_key, config).unwrap(); - discv5.start(socket_addr.into()).await.unwrap(); + let mut discv5 = Discv5::new(enr, enr_key, config, listen_config).unwrap(); + discv5.start().await.unwrap(); nodes.push(discv5); } nodes @@ -47,6 +51,10 @@ async fn build_nodes_from_keypairs(keys: Vec, base_port: u16) -> Ve let port = base_port + i as u16; let config = Discv5ConfigBuilder::new().build(); + let listen_config = ListenConfig::Ipv4 { + ip: ip.clone(), + port, + }; let enr = EnrBuilder::new("v4") .ip4(ip) @@ -54,9 +62,8 @@ async fn build_nodes_from_keypairs(keys: Vec, base_port: u16) -> Ve .build(&enr_key) .unwrap(); - let socket_addr = enr.udp4_socket().unwrap(); - let mut discv5 = Discv5::new(enr, enr_key, config).unwrap(); - discv5.start(socket_addr.into()).await.unwrap(); + let mut discv5 = Discv5::new(enr, enr_key, config, listen_config).unwrap(); + discv5.start().await.unwrap(); nodes.push(discv5); } nodes @@ -550,9 +557,13 @@ async fn test_table_limits() { .udp4(9050) .build(&enr_key) .unwrap(); + let listen_config = ListenConfig::Ipv4 { + ip: enr.ip4().unwrap(), + port: enr.udp4().unwrap(), + }; // let socket_addr = enr.udp_socket().unwrap(); - let discv5: Discv5 = Discv5::new(enr, enr_key, config).unwrap(); + let discv5: Discv5 = Discv5::new(enr, enr_key, config, listen_config).unwrap(); let table_limit: usize = 10; // Generate `table_limit + 2` nodes in the same subnet. let enrs: Vec> = (1..=table_limit + 1) @@ -615,7 +626,12 @@ async fn test_bucket_limits() { .collect(); let config = Discv5ConfigBuilder::new().ip_limit().build(); - let discv5 = Discv5::new(enr, enr_key, config).unwrap(); + let listen_config = ListenConfig::Ipv4 { + ip: enr.ip4().unwrap(), + port: enr.udp4().unwrap(), + }; + + let discv5 = Discv5::new(enr, enr_key, config, listen_config).unwrap(); for enr in enrs { let _ = discv5.add_enr(enr.clone()); // we expect some of these to fail based on the filter. } diff --git a/src/handler/tests.rs b/src/handler/tests.rs index 5ed889984..d3fe3af89 100644 --- a/src/handler/tests.rs +++ b/src/handler/tests.rs @@ -50,8 +50,11 @@ async fn simple_session_message() { let (_exit_send, sender_send, _sender_recv) = Handler::spawn( arc_rw!(sender_enr.clone()), arc_rw!(key1), - sender_enr.udp4_socket().unwrap().into(), config.clone(), + ListenConfig::Ipv4 { + ip: sender_enr.ip4().unwrap(), + port: sender_enr.udp4().unwrap(), + }, ) .await .unwrap(); @@ -59,8 +62,11 @@ async fn simple_session_message() { let (_exit_recv, recv_send, mut receiver_recv) = Handler::spawn( arc_rw!(receiver_enr.clone()), arc_rw!(key2), - receiver_enr.udp4_socket().unwrap().into(), config, + ListenConfig::Ipv4 { + ip: receiver_enr.ip4().unwrap(), + port: receiver_enr.udp4().unwrap(), + }, ) .await .unwrap(); @@ -126,8 +132,11 @@ async fn multiple_messages() { let (_exit_send, sender_handler, mut sender_handler_recv) = Handler::spawn( arc_rw!(sender_enr.clone()), arc_rw!(key1), - sender_enr.udp4_socket().unwrap().into(), config.clone(), + ListenConfig::Ipv4 { + ip: sender_enr.ip4().unwrap(), + port: sender_enr.udp4().unwrap(), + }, ) .await .unwrap(); @@ -135,8 +144,11 @@ async fn multiple_messages() { let (_exit_recv, recv_send, mut receiver_handler) = Handler::spawn( arc_rw!(receiver_enr.clone()), arc_rw!(key2), - receiver_enr.udp4_socket().unwrap().into(), config, + ListenConfig::Ipv4 { + ip: receiver_enr.ip4().unwrap(), + port: receiver_enr.udp4().unwrap(), + }, ) .await .unwrap(); diff --git a/src/service/test.rs b/src/service/test.rs index 160b483d8..6519e9821 100644 --- a/src/service/test.rs +++ b/src/service/test.rs @@ -14,7 +14,7 @@ use crate::{ }; use enr::{CombinedKey, EnrBuilder}; use parking_lot::RwLock; -use std::{collections::HashMap, net::SocketAddr, sync::Arc, time::Duration}; +use std::{collections::HashMap, sync::Arc, time::Duration}; use tokio::sync::{mpsc, oneshot}; fn _connected_state() -> NodeStatus { @@ -40,7 +40,7 @@ fn init() { async fn build_service( local_enr: Arc>, enr_key: Arc>, - listen_socket: SocketAddr, + listen_config: ListenConfig, filters: bool, ) -> Service { let config = Discv5ConfigBuilder::new() @@ -50,8 +50,8 @@ async fn build_service( let (_handler_exit, handler_send, handler_recv) = Handler::spawn( local_enr.clone(), enr_key.clone(), - listen_socket, config.clone(), + listen_config, ) .await .unwrap(); @@ -93,6 +93,7 @@ async fn build_service( event_stream: None, exit, config, + ip_mode: Default::default(), } } @@ -114,12 +115,15 @@ async fn test_updating_connection_on_ping() { .build(&enr_key2) .unwrap(); - let socket_addr = enr.udp4_socket().unwrap(); + let listen_config = ListenConfig::Ipv4 { + ip: enr.ip4().unwrap(), + port: enr.udp4().unwrap(), + }; let mut service = build_service( Arc::new(RwLock::new(enr)), Arc::new(RwLock::new(enr_key1)), - socket_addr.into(), + listen_config, false, ) .await; From 845b5322d9f5e4b67f6d4f6617e62a0b5c626bce Mon Sep 17 00:00:00 2001 From: ackintosh Date: Wed, 15 Mar 2023 23:16:35 +0900 Subject: [PATCH 06/17] Make SendHandler uses two sockets --- src/socket/mod.rs | 47 +++++++++++++++++++++++++++------------------- src/socket/recv.rs | 2 +- src/socket/send.rs | 44 +++++++++++++++++++++++++++++++++++++------ 3 files changed, 67 insertions(+), 26 deletions(-) diff --git a/src/socket/mod.rs b/src/socket/mod.rs index 4d2e948db..490e9114d 100644 --- a/src/socket/mod.rs +++ b/src/socket/mod.rs @@ -10,6 +10,7 @@ use std::{ sync::Arc, time::Duration, }; +use tokio::net::UdpSocket; use tokio::sync::{mpsc, oneshot}; mod filter; @@ -95,35 +96,43 @@ impl Socket { local_node_id, } = config; - // For now intentionally forgettig which socket is the ipv4 and which is the ipv6 one. - let (first_addr, maybe_second_addr): (SocketAddr, Option<_>) = match listen_config { - ListenConfig::Ipv4 { ip, port } => ((ip, port).into(), None), - ListenConfig::Ipv6 { ip, port } => ((ip, port).into(), None), + // For recv socket, intentionally forgetting which socket is the ipv4 and which is the ipv6 one. + let (first_recv, second_recv, send_ipv4, send_ipv6): ( + Arc, + Option<_>, + Option<_>, + Option<_>, + ) = match listen_config { + ListenConfig::Ipv4 { ip, port } => { + let ipv4_socket = Arc::new(Socket::new_socket(&(ip, port).into()).await?); + (ipv4_socket.clone(), None, Some(ipv4_socket), None) + } + ListenConfig::Ipv6 { ip, port } => { + let ipv6_socket = Arc::new(Socket::new_socket(&(ip, port).into()).await?); + (ipv6_socket.clone(), None, None, Some(ipv6_socket)) + } ListenConfig::DualStack { ipv4, ipv4_port, ipv6, ipv6_port, - } => ((ipv4, ipv4_port).into(), Some((ipv6, ipv6_port))), - }; - let first_socket = Socket::new_socket(&first_addr).await?; - let maybe_second_socket = match maybe_second_addr { - Some(second_addr) => Some(Socket::new_socket(&second_addr.into()).await?), - None => None, + } => { + let ipv4_socket = Arc::new(Socket::new_socket(&(ipv4, ipv4_port).into()).await?); + let ipv6_socket = Arc::new(Socket::new_socket(&(ipv6, ipv6_port).into()).await?); + ( + ipv4_socket.clone(), + Some(ipv6_socket.clone()), + Some(ipv4_socket), + Some(ipv6_socket), + ) + } }; - // Arc the udp socket for the send/recv tasks. - let recv_udp = Arc::new(first_socket); - let send_udp = recv_udp.clone(); - - let second_recv = maybe_second_socket.map(Arc::new); - let second_send = second_recv.clone(); - // spawn the recv handler let recv_config = RecvHandlerConfig { filter_config, executor: executor.clone(), - recv: recv_udp, + recv: first_recv, second_recv, local_node_id, expected_responses, @@ -132,7 +141,7 @@ impl Socket { let (recv, recv_exit) = RecvHandler::spawn(recv_config); // spawn the sender handler - let (send, sender_exit) = SendHandler::spawn(executor, send_udp, second_send); + let (send, sender_exit) = SendHandler::spawn(executor, send_ipv4, send_ipv6); Ok(Socket { send, diff --git a/src/socket/recv.rs b/src/socket/recv.rs index c18aff3af..d6322fc9e 100644 --- a/src/socket/recv.rs +++ b/src/socket/recv.rs @@ -48,7 +48,7 @@ pub struct RecvHandlerConfig { pub(crate) struct RecvHandler { /// The UDP recv socket. recv: Arc, - /// An option second UDO socket. Used when dialing over both Ipv4 and Ipv6. + /// An option second UDP socket. Used when dialing over both Ipv4 and Ipv6. second_recv: Option>, /// Simple hack to alternate reading from the first or the second socket. /// The list of waiting responses. These are used to allow incoming packets from sources diff --git a/src/socket/send.rs b/src/socket/send.rs index 912699bde..245f4daac 100644 --- a/src/socket/send.rs +++ b/src/socket/send.rs @@ -1,5 +1,6 @@ //! This is a standalone task that encodes and sends Discv5 UDP packets use crate::{metrics::METRICS, node_info::NodeAddress, packet::*, Executor}; +use std::net::SocketAddr; use std::sync::Arc; use tokio::{ net::UdpSocket, @@ -16,8 +17,10 @@ pub struct OutboundPacket { /// The main task that handles outbound UDP packets. pub(crate) struct SendHandler { - /// The UDP send socket. - send: Arc, + /// The UDP send socket for IPv4. + send_ipv4: Option>, + /// The UDP send socket for IPv6. + send_ipv6: Option>, /// The channel to respond to send requests. handler_recv: mpsc::Receiver, /// Exit channel to shutdown the handler. @@ -30,14 +33,15 @@ impl SendHandler { /// shutdown the handler. pub(crate) fn spawn( executor: Box, - send: Arc, - second_send: Option>, + send_ipv4: Option>, + send_ipv6: Option>, ) -> (mpsc::Sender, oneshot::Sender<()>) { let (exit_send, exit) = oneshot::channel(); let (handler_send, handler_recv) = mpsc::channel(30); let mut send_handler = SendHandler { - send, + send_ipv4, + send_ipv6, handler_recv, exit, }; @@ -59,7 +63,7 @@ impl SendHandler { if encoded_packet.len() > MAX_PACKET_SIZE { warn!("Sending packet larger than max size: {} max: {}", encoded_packet.len(), MAX_PACKET_SIZE); } - if let Err(e) = self.send.send_to(&encoded_packet, &packet.node_address.socket_addr).await { + if let Err(e) = self.send(&encoded_packet, &packet.node_address.socket_addr).await { trace!("Could not send packet. Error: {:?}", e); } else { METRICS.add_sent_bytes(encoded_packet.len()); @@ -72,4 +76,32 @@ impl SendHandler { } } } + + async fn send( + &self, + encoded_packet: &Vec, + socket_addr: &SocketAddr, + ) -> Result { + let socket = match socket_addr { + SocketAddr::V4(_) => { + if let Some(socket) = self.send_ipv4.as_ref() { + socket + } else { + return Err("No IPv4 socket.".to_string()); + } + } + SocketAddr::V6(_) => { + if let Some(socket) = self.send_ipv6.as_ref() { + socket + } else { + return Err("No IPv6 socket.".to_string()); + } + } + }; + + socket + .send_to(encoded_packet, socket_addr) + .await + .map_err(|e| e.to_string()) + } } From d149eb25df680037c49c2d54bb64053527ab2e54 Mon Sep 17 00:00:00 2001 From: ackintosh Date: Fri, 17 Mar 2023 09:41:28 +0900 Subject: [PATCH 07/17] listen_socket holds two sockets --- src/handler/mod.rs | 31 +++++++++++++++++++------------ 1 file changed, 19 insertions(+), 12 deletions(-) diff --git a/src/handler/mod.rs b/src/handler/mod.rs index 7f7496243..6b284ba57 100644 --- a/src/handler/mod.rs +++ b/src/handler/mod.rs @@ -40,6 +40,7 @@ use delay_map::HashMapDelay; use enr::{CombinedKey, NodeId}; use futures::prelude::*; use parking_lot::RwLock; +use smallvec::SmallVec; use std::net::{SocketAddrV4, SocketAddrV6}; use std::{ collections::HashMap, @@ -196,8 +197,8 @@ pub struct Handler { service_recv: mpsc::UnboundedReceiver, /// The channel to send messages to the application layer. service_send: mpsc::Sender, - /// The listening socket to filter out any attempted requests to self. - listen_socket: SocketAddr, + /// The listening sockets to filter out any attempted requests to self. + listen_sockets: SmallVec<[SocketAddr; 2]>, /// The discovery v5 UDP socket tasks. socket: Socket, /// Exit channel to shutdown the handler. @@ -232,7 +233,6 @@ impl Handler { let node_id = enr.read().node_id(); // enable the packet filter if required - let filter_config = FilterConfig { enabled: config.enable_packet_filter, rate_limiter: config.filter_rate_limiter.clone(), @@ -240,16 +240,23 @@ impl Handler { max_bans_per_ip: config.filter_max_bans_per_ip, }; - // FIXME - let listen_socket = match listen_config { - ListenConfig::Ipv4 { ip, port } => SocketAddr::V4(SocketAddrV4::new(ip, port)), - ListenConfig::Ipv6 { ip, port } => SocketAddr::V6(SocketAddrV6::new(ip, port, 0, 0)), + let mut listen_sockets = SmallVec::default(); + match listen_config { + ListenConfig::Ipv4 { ip, port } => { + listen_sockets.push(SocketAddr::V4(SocketAddrV4::new(ip, port))) + } + ListenConfig::Ipv6 { ip, port } => { + listen_sockets.push(SocketAddr::V6(SocketAddrV6::new(ip, port, 0, 0))) + } ListenConfig::DualStack { ipv4, ipv4_port, - ipv6: _ipv6, - ipv6_port: _ipv6_port, - } => SocketAddr::V4(SocketAddrV4::new(ipv4, ipv4_port)), + ipv6, + ipv6_port, + } => { + listen_sockets.push(SocketAddr::V4(SocketAddrV4::new(ipv4, ipv4_port))); + listen_sockets.push(SocketAddr::V6(SocketAddrV6::new(ipv6, ipv6_port, 0, 0))); + } }; let socket_config = socket::SocketConfig { @@ -284,7 +291,7 @@ impl Handler { active_challenges: HashMapDelay::new(config.request_timeout), service_recv, service_send, - listen_socket, + listen_sockets, socket, exit, }; @@ -444,7 +451,7 @@ impl Handler { ) -> Result<(), RequestError> { let node_address = contact.node_address(); - if node_address.socket_addr == self.listen_socket { + if self.listen_sockets.contains(&node_address.socket_addr) { debug!("Filtered request to self"); return Err(RequestError::SelfRequest); } From 86358327706f6f22ce1a69467a69cd662b6d1a03 Mon Sep 17 00:00:00 2001 From: ackintosh Date: Sat, 18 Mar 2023 13:10:22 +0900 Subject: [PATCH 08/17] Add a test for the case of self-request --- src/handler/tests.rs | 63 +++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 62 insertions(+), 1 deletion(-) diff --git a/src/handler/tests.rs b/src/handler/tests.rs index d3fe3af89..7080febc0 100644 --- a/src/handler/tests.rs +++ b/src/handler/tests.rs @@ -1,10 +1,14 @@ #![cfg(test)] + use super::*; use crate::{ rpc::{Request, Response}, - Discv5ConfigBuilder, + Discv5ConfigBuilder, IpMode, }; +use std::net::{Ipv4Addr, Ipv6Addr}; +use crate::handler::HandlerOut::RequestFailed; +use crate::RequestError::SelfRequest; use active_requests::ActiveRequests; use enr::EnrBuilder; use std::time::Duration; @@ -263,3 +267,60 @@ async fn test_active_requests_insert() { active_requests.remove_by_nonce(&nonce); active_requests.check_invariant(); } + +#[tokio::test] +async fn test_self_request() { + init(); + + let key = CombinedKey::generate_secp256k1(); + let config = Discv5ConfigBuilder::new().enable_packet_filter().build(); + let enr = EnrBuilder::new("v4") + .ip4(Ipv4Addr::LOCALHOST) + .udp4(5000) + .ip6(Ipv6Addr::LOCALHOST) + .udp6(5001) + .build(&key) + .unwrap(); + + let (_exit_send, send, mut recv) = Handler::spawn( + arc_rw!(enr.clone()), + arc_rw!(key), + config, + ListenConfig::DualStack { + ipv4: enr.ip4().unwrap(), + ipv4_port: enr.udp4().unwrap(), + ipv6: enr.ip6().unwrap(), + ipv6_port: enr.udp6().unwrap(), + }, + ) + .await + .unwrap(); + + // self request (IPv4) + let _ = send.send(HandlerIn::Request( + NodeContact::try_from_enr(enr.clone(), IpMode::Ip4).unwrap(), + Box::new(Request { + id: RequestId(vec![1]), + body: RequestBody::Ping { enr_seq: 1 }, + }), + )); + let handler_out = recv.recv().await; + assert_eq!( + Some(RequestFailed(RequestId(vec![1]), SelfRequest)), + handler_out + ); + + // self request (IPv6) + let _ = send.send(HandlerIn::Request( + NodeContact::try_from_enr(enr, IpMode::Ip6).unwrap(), + Box::new(Request { + id: RequestId(vec![2]), + body: RequestBody::Ping { enr_seq: 1 }, + }), + )); + let handler_out = recv.recv().await; + assert_eq!( + Some(RequestFailed(RequestId(vec![2]), SelfRequest)), + handler_out + ); +} From 9c3454abeee39421093a2970ec7fd10d6f5eae9e Mon Sep 17 00:00:00 2001 From: ackintosh Date: Sat, 18 Mar 2023 15:41:48 +0900 Subject: [PATCH 09/17] Add tests for running a simple query test --- src/discv5/test.rs | 130 +++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 127 insertions(+), 3 deletions(-) diff --git a/src/discv5/test.rs b/src/discv5/test.rs index c6f845401..7d59451b1 100644 --- a/src/discv5/test.rs +++ b/src/discv5/test.rs @@ -4,6 +4,7 @@ use crate::socket::ListenConfig; use crate::{Discv5, *}; use enr::{k256, CombinedKey, Enr, EnrBuilder, EnrKey, NodeId}; use rand_core::{RngCore, SeedableRng}; +use std::net::Ipv6Addr; use std::{collections::HashMap, net::Ipv4Addr}; fn init() { @@ -69,6 +70,64 @@ async fn build_nodes_from_keypairs(keys: Vec, base_port: u16) -> Ve nodes } +async fn build_nodes_from_keypairs_ipv6(keys: Vec, base_port: u16) -> Vec { + let mut nodes = Vec::new(); + + for (i, enr_key) in keys.into_iter().enumerate() { + let port = base_port + i as u16; + + let config = Discv5ConfigBuilder::new().build(); + let listen_config = ListenConfig::Ipv6 { + ip: Ipv6Addr::LOCALHOST, + port, + }; + + let enr = EnrBuilder::new("v4") + .ip6(Ipv6Addr::LOCALHOST) + .udp6(port) + .build(&enr_key) + .unwrap(); + + let mut discv5 = Discv5::new(enr, enr_key, config, listen_config).unwrap(); + discv5.start().await.unwrap(); + nodes.push(discv5); + } + nodes +} + +async fn build_nodes_from_keypairs_dual_stack( + keys: Vec, + base_port: u16, +) -> Vec { + let mut nodes = Vec::new(); + + for (i, enr_key) in keys.into_iter().enumerate() { + let ipv4_port = base_port + i as u16; + let ipv6_port = ipv4_port + 1000; + + let config = Discv5ConfigBuilder::new().build(); + let listen_config = ListenConfig::DualStack { + ipv4: Ipv4Addr::LOCALHOST, + ipv4_port, + ipv6: Ipv6Addr::LOCALHOST, + ipv6_port, + }; + + let enr = EnrBuilder::new("v4") + .ip4(Ipv4Addr::LOCALHOST) + .udp4(ipv4_port) + .ip6(Ipv6Addr::LOCALHOST) + .udp6(ipv6_port) + .build(&enr_key) + .unwrap(); + + let mut discv5 = Discv5::new(enr, enr_key, config, listen_config).unwrap(); + discv5.start().await.unwrap(); + nodes.push(discv5); + } + nodes +} + /// Generate `n` deterministic keypairs from a given seed. fn generate_deterministic_keypair(n: usize, seed: u64) -> Vec { let mut keypairs = Vec::new(); @@ -259,16 +318,81 @@ fn find_seed_linear_topology() { } } -/// This is a smaller version of the star topology test designed to debug issues with queries. +/// Test for running a simple query test for a topology consisting of IPv4 nodes. +#[tokio::test] +async fn test_discovery_three_peers_ipv4() { + init(); + let total_nodes = 3; + // Seed is chosen such that all nodes are in the 256th bucket of bootstrap + let seed = 1652; + // Generate `num_nodes` + bootstrap_node and target_node keypairs from given seed + let keypairs = generate_deterministic_keypair(total_nodes + 2, seed); + // IPv4 + let nodes = build_nodes_from_keypairs(keypairs, 11200).await; + + test_discovery_three_peers(nodes, total_nodes).await; +} + +/// Test for running a simple query test for a topology consisting of IPv6 nodes. +#[tokio::test] +async fn test_discovery_three_peers_ipv6() { + init(); + let total_nodes = 3; + // Seed is chosen such that all nodes are in the 256th bucket of bootstrap + let seed = 1652; + // Generate `num_nodes` + bootstrap_node and target_node keypairs from given seed + let keypairs = generate_deterministic_keypair(total_nodes + 2, seed); + // IPv6 + let nodes = build_nodes_from_keypairs_ipv6(keypairs, 11200).await; + + test_discovery_three_peers(nodes, total_nodes).await; +} + +/// Test for running a simple query test for a topology consisting of dual stack nodes. #[tokio::test] -async fn test_discovery_three_peers() { +async fn test_discovery_three_peers_dual_stack() { init(); let total_nodes = 3; // Seed is chosen such that all nodes are in the 256th bucket of bootstrap let seed = 1652; // Generate `num_nodes` + bootstrap_node and target_node keypairs from given seed let keypairs = generate_deterministic_keypair(total_nodes + 2, seed); - let mut nodes = build_nodes_from_keypairs(keypairs, 11200).await; + // DualStack + let nodes = build_nodes_from_keypairs_dual_stack(keypairs, 11200).await; + + test_discovery_three_peers(nodes, total_nodes).await; +} + +/// Test for running a simple query test for a mixed topology of IPv4, IPv6 and dual stack nodes. +#[tokio::test] +async fn test_discovery_three_peers_mixed() { + init(); + let total_nodes = 3; + // Seed is chosen such that all nodes are in the 256th bucket of bootstrap + let seed = 1652; + // Generate `num_nodes` + bootstrap_node and target_node keypairs from given seed + let mut keypairs = generate_deterministic_keypair(total_nodes + 2, seed); + + let mut nodes = vec![]; + // Bootstrap node (DualStack) + nodes.append(&mut build_nodes_from_keypairs_dual_stack(vec![keypairs.remove(0)], 12000).await); + // A node to run query (DualStack) + nodes.append(&mut build_nodes_from_keypairs_dual_stack(vec![keypairs.remove(0)], 12010).await); + // IPv4 node + nodes.append(&mut build_nodes_from_keypairs(vec![keypairs.remove(0)], 12020).await); + // IPv6 node + nodes.append(&mut build_nodes_from_keypairs_ipv6(vec![keypairs.remove(0)], 12030).await); + // Target node (DualStack) + nodes.append(&mut build_nodes_from_keypairs_dual_stack(vec![keypairs.remove(0)], 12040).await); + + assert!(keypairs.is_empty()); + assert_eq!(5, nodes.len()); + test_discovery_three_peers(nodes, total_nodes).await; +} + +/// This is a smaller version of the star topology test designed to debug issues with queries. +async fn test_discovery_three_peers(mut nodes: Vec, total_nodes: usize) { + init(); // Last node is bootstrap node in a star topology let bootstrap_node = nodes.remove(0); // target_node is not polled. From 1d41759b467e0c46987c6485baecf4438596a042 Mon Sep 17 00:00:00 2001 From: ackintosh Date: Sat, 18 Mar 2023 16:14:24 +0900 Subject: [PATCH 10/17] Add tests for non-contactable nodes --- src/discv5/test.rs | 57 +++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 51 insertions(+), 6 deletions(-) diff --git a/src/discv5/test.rs b/src/discv5/test.rs index 7d59451b1..23863da2c 100644 --- a/src/discv5/test.rs +++ b/src/discv5/test.rs @@ -330,7 +330,10 @@ async fn test_discovery_three_peers_ipv4() { // IPv4 let nodes = build_nodes_from_keypairs(keypairs, 11200).await; - test_discovery_three_peers(nodes, total_nodes).await; + assert_eq!( + total_nodes, + test_discovery_three_peers(nodes, total_nodes).await + ); } /// Test for running a simple query test for a topology consisting of IPv6 nodes. @@ -345,7 +348,10 @@ async fn test_discovery_three_peers_ipv6() { // IPv6 let nodes = build_nodes_from_keypairs_ipv6(keypairs, 11200).await; - test_discovery_three_peers(nodes, total_nodes).await; + assert_eq!( + total_nodes, + test_discovery_three_peers(nodes, total_nodes).await + ); } /// Test for running a simple query test for a topology consisting of dual stack nodes. @@ -360,10 +366,14 @@ async fn test_discovery_three_peers_dual_stack() { // DualStack let nodes = build_nodes_from_keypairs_dual_stack(keypairs, 11200).await; - test_discovery_three_peers(nodes, total_nodes).await; + assert_eq!( + total_nodes, + test_discovery_three_peers(nodes, total_nodes).await + ); } /// Test for running a simple query test for a mixed topology of IPv4, IPv6 and dual stack nodes. +/// The node to run the query is DualStack. #[tokio::test] async fn test_discovery_three_peers_mixed() { init(); @@ -387,11 +397,46 @@ async fn test_discovery_three_peers_mixed() { assert!(keypairs.is_empty()); assert_eq!(5, nodes.len()); - test_discovery_three_peers(nodes, total_nodes).await; + assert_eq!( + total_nodes, + test_discovery_three_peers(nodes, total_nodes).await + ); +} + +/// Test for running a simple query test for a mixed topology of IPv4, IPv6 and dual stack nodes. +/// The node to run the query is IPv4. +#[tokio::test] +async fn test_discovery_three_peers_mixed_query_from_ipv4() { + init(); + let total_nodes = 3; + // Seed is chosen such that all nodes are in the 256th bucket of bootstrap + let seed = 1652; + // Generate `num_nodes` + bootstrap_node and target_node keypairs from given seed + let mut keypairs = generate_deterministic_keypair(total_nodes + 2, seed); + + let mut nodes = vec![]; + // Bootstrap node (DualStack) + nodes.append(&mut build_nodes_from_keypairs_dual_stack(vec![keypairs.remove(0)], 12000).await); + // A node to run query (** IPv4 **) + nodes.append(&mut build_nodes_from_keypairs(vec![keypairs.remove(0)], 12010).await); + // IPv4 node + nodes.append(&mut build_nodes_from_keypairs(vec![keypairs.remove(0)], 12020).await); + // IPv6 node + nodes.append(&mut build_nodes_from_keypairs_ipv6(vec![keypairs.remove(0)], 12030).await); + // Target node (DualStack) + nodes.append(&mut build_nodes_from_keypairs_dual_stack(vec![keypairs.remove(0)], 12040).await); + + assert!(keypairs.is_empty()); + assert_eq!(5, nodes.len()); + + // `2` is expected here since the node that runs the query is IPv4. + // The response from Bootstrap node will include the IPv6 node but that will be ignored due to + // non-contactable. + assert_eq!(2, test_discovery_three_peers(nodes, total_nodes).await); } /// This is a smaller version of the star topology test designed to debug issues with queries. -async fn test_discovery_three_peers(mut nodes: Vec, total_nodes: usize) { +async fn test_discovery_three_peers(mut nodes: Vec, total_nodes: usize) -> usize { init(); // Last node is bootstrap node in a star topology let bootstrap_node = nodes.remove(0); @@ -438,7 +483,7 @@ async fn test_discovery_three_peers(mut nodes: Vec, total_nodes: usize) result_nodes.len(), total_nodes ); - assert_eq!(result_nodes.len(), total_nodes); + result_nodes.len() } /// Test for a star topology with `num_nodes` connected to a `bootstrap_node` From 54002f2118bff4017f26572bfcf1a72bd4dc7559 Mon Sep 17 00:00:00 2001 From: ackintosh Date: Mon, 20 Mar 2023 08:16:21 +0900 Subject: [PATCH 11/17] Tweak listen_sockets --- src/handler/mod.rs | 12 ++++-------- src/ipmode.rs | 2 +- 2 files changed, 5 insertions(+), 9 deletions(-) diff --git a/src/handler/mod.rs b/src/handler/mod.rs index 6b284ba57..707641d49 100644 --- a/src/handler/mod.rs +++ b/src/handler/mod.rs @@ -242,20 +242,16 @@ impl Handler { let mut listen_sockets = SmallVec::default(); match listen_config { - ListenConfig::Ipv4 { ip, port } => { - listen_sockets.push(SocketAddr::V4(SocketAddrV4::new(ip, port))) - } - ListenConfig::Ipv6 { ip, port } => { - listen_sockets.push(SocketAddr::V6(SocketAddrV6::new(ip, port, 0, 0))) - } + ListenConfig::Ipv4 { ip, port } => listen_sockets.push((ip, port).into()), + ListenConfig::Ipv6 { ip, port } => listen_sockets.push((ip, port).into()), ListenConfig::DualStack { ipv4, ipv4_port, ipv6, ipv6_port, } => { - listen_sockets.push(SocketAddr::V4(SocketAddrV4::new(ipv4, ipv4_port))); - listen_sockets.push(SocketAddr::V6(SocketAddrV6::new(ipv6, ipv6_port, 0, 0))); + listen_sockets.push((ipv4, ipv4_port).into()); + listen_sockets.push((ipv6, ipv6_port).into()); } }; diff --git a/src/ipmode.rs b/src/ipmode.rs index 320004494..d9b1341ff 100644 --- a/src/ipmode.rs +++ b/src/ipmode.rs @@ -14,7 +14,7 @@ pub enum IpMode { /// routing table if they contain a contactable IPv4 address. #[default] Ip4, - /// IPv4 only. This creates an IPv6 only UDP socket and will only store ENRs in the local + /// IPv6 only. This creates an IPv6 only UDP socket and will only store ENRs in the local /// routing table if they contain a contactable IPv6 address. Mapped addresses will be /// disabled. Ip6, From ae6eb4f20f54800324b304d99664ffd04420c8bb Mon Sep 17 00:00:00 2001 From: ackintosh Date: Mon, 20 Mar 2023 08:50:34 +0900 Subject: [PATCH 12/17] Checking if loopback address --- examples/find_nodes.rs | 3 +-- src/rpc.rs | 54 ++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 53 insertions(+), 4 deletions(-) diff --git a/examples/find_nodes.rs b/examples/find_nodes.rs index 79f457590..1a758b50d 100644 --- a/examples/find_nodes.rs +++ b/examples/find_nodes.rs @@ -30,8 +30,7 @@ use tracing::{info, warn}; #[derive(Parser)] struct FindNodesArgs { - /// Type of socket to bind ['ds', 'ip4', 'ip6']. The dual stack option will enable mapped - /// addresses over an IpV6 socket. + /// Type of socket to bind ['ds', 'ip4', 'ip6']. #[clap(long, default_value_t = SocketKind::Ds)] socket_kind: SocketKind, /// IpV4 to advertise in the ENR. This is needed so that other IpV4 nodes can connect to us. diff --git a/src/rpc.rs b/src/rpc.rs index a4f9fe7f4..c653e1e9b 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -434,8 +434,13 @@ impl Message { let mut ip = [0u8; 16]; ip.copy_from_slice(&ip_bytes); let ipv6 = Ipv6Addr::from(ip); - // If the ipv6 is ipv4 compatible/mapped, simply return the ipv4. - if let Some(ipv4) = ipv6.to_ipv4() { + + if ipv6.is_loopback() { + // Checking if loopback address since IPv6Addr::to_ipv4 returns + // IPv4 address for IPv6 loopback address. + IpAddr::V6(ipv6) + } else if let Some(ipv4) = ipv6.to_ipv4() { + // If the ipv6 is ipv4 compatible/mapped, simply return the ipv4. IpAddr::V4(ipv4) } else { IpAddr::V6(ipv6) @@ -605,6 +610,7 @@ impl Message { mod tests { use super::*; use enr::EnrBuilder; + use std::net::Ipv4Addr; #[test] fn ref_test_encode_request_ping() { @@ -776,6 +782,50 @@ mod tests { assert_eq!(request, decoded); } + #[test] + fn encode_decode_ping_response_ipv4_mapped() { + let id = RequestId(vec![1]); + let request = Message::Response(Response { + id: id.clone(), + body: ResponseBody::Pong { + enr_seq: 15, + ip: IpAddr::V6(Ipv4Addr::new(192, 0, 2, 1).to_ipv6_mapped()), + port: 80, + }, + }); + + let encoded = request.encode(); + let decoded = Message::decode(&encoded).unwrap(); + let expected = Message::Response(Response { + id, + body: ResponseBody::Pong { + enr_seq: 15, + ip: IpAddr::V4(Ipv4Addr::new(192, 0, 2, 1)), + port: 80, + }, + }); + + assert_eq!(expected, decoded); + } + + #[test] + fn encode_decode_ping_response_ipv6_loopback() { + let id = RequestId(vec![1]); + let request = Message::Response(Response { + id, + body: ResponseBody::Pong { + enr_seq: 15, + ip: IpAddr::V6(Ipv6Addr::LOCALHOST), + port: 80, + }, + }); + + let encoded = request.clone().encode(); + let decoded = Message::decode(&encoded).unwrap(); + + assert_eq!(request, decoded); + } + #[test] fn encode_decode_find_node_request() { let id = RequestId(vec![1]); From 03f46c87969afea86d82d8782289f63643d37903 Mon Sep 17 00:00:00 2001 From: ackintosh Date: Mon, 20 Mar 2023 09:05:02 +0900 Subject: [PATCH 13/17] No longer need to treat mapped address --- src/handler/mod.rs | 1 - src/socket/recv.rs | 25 +++---------------------- 2 files changed, 3 insertions(+), 23 deletions(-) diff --git a/src/handler/mod.rs b/src/handler/mod.rs index 707641d49..451af416a 100644 --- a/src/handler/mod.rs +++ b/src/handler/mod.rs @@ -41,7 +41,6 @@ use enr::{CombinedKey, NodeId}; use futures::prelude::*; use parking_lot::RwLock; use smallvec::SmallVec; -use std::net::{SocketAddrV4, SocketAddrV6}; use std::{ collections::HashMap, convert::TryFrom, diff --git a/src/socket/recv.rs b/src/socket/recv.rs index d6322fc9e..68e165a52 100644 --- a/src/socket/recv.rs +++ b/src/socket/recv.rs @@ -3,16 +3,9 @@ //! Every UDP packet passes a filter before being processed. use super::filter::{Filter, FilterConfig}; -use crate::{ - ipmode::to_ipv4_mapped, metrics::METRICS, node_info::NodeAddress, packet::*, Executor, -}; +use crate::{metrics::METRICS, node_info::NodeAddress, packet::*, Executor}; use parking_lot::RwLock; -use std::{ - collections::HashMap, - net::{IpAddr, SocketAddr}, - sync::Arc, - time::Duration, -}; +use std::{collections::HashMap, net::SocketAddr, sync::Arc, time::Duration}; use tokio::{ net::UdpSocket, sync::{mpsc, oneshot}, @@ -139,22 +132,10 @@ impl RecvHandler { /// handler. async fn handle_inbound( &mut self, - mut src_address: SocketAddr, + src_address: SocketAddr, length: usize, recv_buffer: &[u8; MAX_PACKET_SIZE], ) { - // Make sure ip4 addresses in dual stack nodes are reported correctly - if let IpAddr::V6(ip6) = src_address.ip() { - // NOTE: here we don't want to use the `to_ipv4` method, since it also includes compat - // addresses, which are deprecated. Dual stack nodes will report ipv4 addresses as - // mapped addresses and not compat, so this is not needed. This also messes with some - // valid ipv6 local addresses (for example, the ipv6 loopback address). Ideally what we - // want is `to_canonical`. - if let Some(ip4) = to_ipv4_mapped(&ip6) { - trace!("Mapping inbound packet addr from {} to {}", ip6, ip4); - src_address.set_ip(ip4.into()) - } - } // Permit all expected responses let permitted = self.expected_responses.read().get(&src_address).is_some(); From 7d92d2e01c315b210ed8d6c7c2f02c389a684611 Mon Sep 17 00:00:00 2001 From: ackintosh Date: Tue, 21 Mar 2023 11:52:55 +0900 Subject: [PATCH 14/17] cargo fmt --- examples/custom_executor.rs | 5 +++-- examples/find_nodes.rs | 2 +- examples/simple_server.rs | 3 +-- src/discv5.rs | 6 ++++-- src/discv5/test.rs | 9 +++++---- src/handler/mod.rs | 3 +-- src/handler/tests.rs | 3 +-- src/ipmode.rs | 10 ++++++---- src/service.rs | 3 +-- src/socket/mod.rs | 12 +++++++----- src/socket/send.rs | 3 +-- 11 files changed, 31 insertions(+), 28 deletions(-) diff --git a/examples/custom_executor.rs b/examples/custom_executor.rs index 8492e974c..cc671be3f 100644 --- a/examples/custom_executor.rs +++ b/examples/custom_executor.rs @@ -9,8 +9,9 @@ //! $ cargo run --example custom_executor //! ``` -use discv5::socket::ListenConfig; -use discv5::{enr, enr::CombinedKey, Discv5, Discv5ConfigBuilder, Discv5Event}; +use discv5::{ + enr, enr::CombinedKey, socket::ListenConfig, Discv5, Discv5ConfigBuilder, Discv5Event, +}; use std::net::Ipv4Addr; fn main() { diff --git a/examples/find_nodes.rs b/examples/find_nodes.rs index 1a758b50d..81bc90ab2 100644 --- a/examples/find_nodes.rs +++ b/examples/find_nodes.rs @@ -16,10 +16,10 @@ //! For a simple CLI discovery service see [discv5-cli](https://github.com/AgeManning/discv5-cli) use clap::Parser; -use discv5::socket::ListenConfig; use discv5::{ enr, enr::{k256, CombinedKey}, + socket::ListenConfig, Discv5, Discv5ConfigBuilder, Discv5Event, }; use std::{ diff --git a/examples/simple_server.rs b/examples/simple_server.rs index a682ad535..22c24f8a1 100644 --- a/examples/simple_server.rs +++ b/examples/simple_server.rs @@ -10,8 +10,7 @@ //! $ cargo run --example simple_server -- //! ``` -use discv5::socket::ListenConfig; -use discv5::{enr, enr::CombinedKey, Discv5, Discv5Config, Discv5Event}; +use discv5::{enr, enr::CombinedKey, socket::ListenConfig, Discv5, Discv5Config, Discv5Event}; use std::net::Ipv4Addr; #[tokio::main] diff --git a/src/discv5.rs b/src/discv5.rs index 415dc3882..2f37949ba 100644 --- a/src/discv5.rs +++ b/src/discv5.rs @@ -37,8 +37,10 @@ use tracing::{debug, warn}; use libp2p_core::Multiaddr; // Create lazy static variable for the global permit/ban list -use crate::metrics::{Metrics, METRICS}; -use crate::socket::ListenConfig; +use crate::{ + metrics::{Metrics, METRICS}, + socket::ListenConfig, +}; lazy_static! { pub static ref PERMIT_BAN_LIST: RwLock = RwLock::new(crate::PermitBanList::default()); diff --git a/src/discv5/test.rs b/src/discv5/test.rs index 23863da2c..d7e35ce65 100644 --- a/src/discv5/test.rs +++ b/src/discv5/test.rs @@ -1,11 +1,12 @@ #![cfg(test)] -use crate::socket::ListenConfig; -use crate::{Discv5, *}; +use crate::{socket::ListenConfig, Discv5, *}; use enr::{k256, CombinedKey, Enr, EnrBuilder, EnrKey, NodeId}; use rand_core::{RngCore, SeedableRng}; -use std::net::Ipv6Addr; -use std::{collections::HashMap, net::Ipv4Addr}; +use std::{ + collections::HashMap, + net::{Ipv4Addr, Ipv6Addr}, +}; fn init() { let _ = tracing_subscriber::fmt() diff --git a/src/handler/mod.rs b/src/handler/mod.rs index 451af416a..9373542ec 100644 --- a/src/handler/mod.rs +++ b/src/handler/mod.rs @@ -64,8 +64,7 @@ pub use crate::node_info::{NodeAddress, NodeContact}; use crate::metrics::METRICS; -use crate::lru_time_cache::LruTimeCache; -use crate::socket::ListenConfig; +use crate::{lru_time_cache::LruTimeCache, socket::ListenConfig}; use active_requests::ActiveRequests; use request_call::RequestCall; use session::Session; diff --git a/src/handler/tests.rs b/src/handler/tests.rs index 7080febc0..e782b49ea 100644 --- a/src/handler/tests.rs +++ b/src/handler/tests.rs @@ -7,8 +7,7 @@ use crate::{ }; use std::net::{Ipv4Addr, Ipv6Addr}; -use crate::handler::HandlerOut::RequestFailed; -use crate::RequestError::SelfRequest; +use crate::{handler::HandlerOut::RequestFailed, RequestError::SelfRequest}; use active_requests::ActiveRequests; use enr::EnrBuilder; use std::time::Duration; diff --git a/src/ipmode.rs b/src/ipmode.rs index d9b1341ff..17799b7f9 100644 --- a/src/ipmode.rs +++ b/src/ipmode.rs @@ -1,6 +1,8 @@ -use crate::socket::ListenConfig; -use crate::Enr; -use crate::IpMode::{DualStack, Ip4, Ip6}; +use crate::{ + socket::ListenConfig, + Enr, + IpMode::{DualStack, Ip4, Ip6}, +}; use std::net::SocketAddr; ///! A set of configuration parameters to tune the discovery protocol. @@ -39,7 +41,7 @@ impl IpMode { /// dual stack, an Enr that advertises both an Ipv4 and a canonical Ipv6 address will be /// contacted using their Ipv6 address. pub fn get_contactable_addr(&self, enr: &Enr) -> Option { - // A function to get a cononical ipv6 address from an Enr + // A function to get a canonical ipv6 address from an Enr /// NOTE: There is nothing in the spec preventing compat/mapped addresses from being /// transmitted in the ENR. Here we choose to enforce canonical addresses since diff --git a/src/service.rs b/src/service.rs index d3b87d788..d343ac5b2 100644 --- a/src/service.rs +++ b/src/service.rs @@ -151,8 +151,7 @@ pub enum ServiceRequest { RequestEventStream(oneshot::Sender>), } -use crate::discv5::PERMIT_BAN_LIST; -use crate::socket::ListenConfig; +use crate::{discv5::PERMIT_BAN_LIST, socket::ListenConfig}; pub struct Service { /// Configuration parameters. diff --git a/src/socket/mod.rs b/src/socket/mod.rs index 490e9114d..4b7d05f3f 100644 --- a/src/socket/mod.rs +++ b/src/socket/mod.rs @@ -10,8 +10,10 @@ use std::{ sync::Arc, time::Duration, }; -use tokio::net::UdpSocket; -use tokio::sync::{mpsc, oneshot}; +use tokio::{ + net::UdpSocket, + sync::{mpsc, oneshot}, +}; mod filter; mod recv; @@ -70,15 +72,15 @@ pub struct Socket { impl Socket { /// This creates and binds a new UDP socket. // In general this function can be expanded to handle more advanced socket creation. - async fn new_socket(socket_addr: &SocketAddr) -> Result { + async fn new_socket(socket_addr: &SocketAddr) -> Result { match socket_addr { - SocketAddr::V4(ip4) => tokio::net::UdpSocket::bind(ip4).await, + SocketAddr::V4(ip4) => UdpSocket::bind(ip4).await, SocketAddr::V6(ip6) => { let socket = Socket2::new(Domain::IPV6, Type::DGRAM, Some(Protocol::UDP))?; socket.set_only_v6(true)?; socket.set_nonblocking(true)?; socket.bind(&SocketAddr::V6(*ip6).into())?; - tokio::net::UdpSocket::from_std(socket.into()) + UdpSocket::from_std(socket.into()) } } } diff --git a/src/socket/send.rs b/src/socket/send.rs index 245f4daac..668b503d6 100644 --- a/src/socket/send.rs +++ b/src/socket/send.rs @@ -1,7 +1,6 @@ //! This is a standalone task that encodes and sends Discv5 UDP packets use crate::{metrics::METRICS, node_info::NodeAddress, packet::*, Executor}; -use std::net::SocketAddr; -use std::sync::Arc; +use std::{net::SocketAddr, sync::Arc}; use tokio::{ net::UdpSocket, sync::{mpsc, oneshot}, From aa0030965ac917490951b995e1b526fc63a6bcac Mon Sep 17 00:00:00 2001 From: ackintosh Date: Tue, 21 Mar 2023 11:58:14 +0900 Subject: [PATCH 15/17] Fix doc --- src/lib.rs | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index a2864b26d..e8ea99677 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -58,10 +58,8 @@ //! //! ```rust //! use discv5::{enr, enr::{CombinedKey, NodeId}, TokioExecutor, Discv5, Discv5ConfigBuilder}; -//! use std::net::SocketAddr; -//! -//! // listening address and port -//! let listen_addr = "0.0.0.0:9000".parse::().unwrap(); +//! use discv5::socket::ListenConfig; +//! use std::net::{Ipv4Addr, SocketAddr}; //! //! // construct a local ENR //! let enr_key = CombinedKey::generate_secp256k1(); @@ -77,15 +75,21 @@ //! // default configuration //! let config = Discv5ConfigBuilder::new().build(); //! +//! // configuration for the sockets to listen on +//! let listen_config = ListenConfig::Ipv4 { +//! ip: Ipv4Addr::UNSPECIFIED, +//! port: 9000, +//! }; +//! //! // construct the discv5 server -//! let mut discv5 = Discv5::new(enr, enr_key, config).unwrap(); +//! let mut discv5 = Discv5::new(enr, enr_key, config, listen_config).unwrap(); //! //! // In order to bootstrap the routing table an external ENR should be added //! // This can be done via add_enr. I.e.: //! // discv5.add_enr() //! //! // start the discv5 server -//! runtime.block_on(discv5.start(listen_addr)); +//! runtime.block_on(discv5.start()); //! //! // run a find_node query //! runtime.block_on(async { From 85fe2ed7a3d1a50d632adb5efa842400e479c293 Mon Sep 17 00:00:00 2001 From: ackintosh Date: Tue, 21 Mar 2023 12:00:32 +0900 Subject: [PATCH 16/17] Update README --- README.md | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index 745e37164..29a6b8705 100644 --- a/README.md +++ b/README.md @@ -38,11 +38,9 @@ A simple example of creating this service is as follows: ```rust use discv5::{enr, enr::{CombinedKey, NodeId}, TokioExecutor, Discv5, Discv5ConfigBuilder}; + use discv5::socket::ListenConfig; use std::net::SocketAddr; - // listening address and port - let listen_addr = "0.0.0.0:9000".parse::().unwrap(); - // construct a local ENR let enr_key = CombinedKey::generate_secp256k1(); let enr = enr::EnrBuilder::new("v4").build(&enr_key).unwrap(); @@ -57,15 +55,21 @@ A simple example of creating this service is as follows: // default configuration let config = Discv5ConfigBuilder::new().build(); + // configuration for the sockets to listen on + let listen_config = ListenConfig::Ipv4 { + ip: Ipv4Addr::UNSPECIFIED, + port: 9000, + }; + // construct the discv5 server - let mut discv5 = Discv5::new(enr, enr_key, config).unwrap(); + let mut discv5 = Discv5::new(enr, enr_key, config, listen_config).unwrap(); // In order to bootstrap the routing table an external ENR should be added // This can be done via add_enr. I.e.: // discv5.add_enr() // start the discv5 server - runtime.block_on(discv5.start(listen_addr)); + runtime.block_on(discv5.start()); // run a find_node query runtime.block_on(async { From 5cd9409bae14331e97be8810f51e8fda698e49cd Mon Sep 17 00:00:00 2001 From: ackintosh Date: Wed, 22 Mar 2023 07:15:01 +0900 Subject: [PATCH 17/17] Tweak ports to avoid `Address already in use` --- src/discv5/test.rs | 26 +++++++++++++------------- src/handler/tests.rs | 4 ++-- 2 files changed, 15 insertions(+), 15 deletions(-) diff --git a/src/discv5/test.rs b/src/discv5/test.rs index d7e35ce65..3c445194f 100644 --- a/src/discv5/test.rs +++ b/src/discv5/test.rs @@ -329,7 +329,7 @@ async fn test_discovery_three_peers_ipv4() { // Generate `num_nodes` + bootstrap_node and target_node keypairs from given seed let keypairs = generate_deterministic_keypair(total_nodes + 2, seed); // IPv4 - let nodes = build_nodes_from_keypairs(keypairs, 11200).await; + let nodes = build_nodes_from_keypairs(keypairs, 10000).await; assert_eq!( total_nodes, @@ -347,7 +347,7 @@ async fn test_discovery_three_peers_ipv6() { // Generate `num_nodes` + bootstrap_node and target_node keypairs from given seed let keypairs = generate_deterministic_keypair(total_nodes + 2, seed); // IPv6 - let nodes = build_nodes_from_keypairs_ipv6(keypairs, 11200).await; + let nodes = build_nodes_from_keypairs_ipv6(keypairs, 10010).await; assert_eq!( total_nodes, @@ -365,7 +365,7 @@ async fn test_discovery_three_peers_dual_stack() { // Generate `num_nodes` + bootstrap_node and target_node keypairs from given seed let keypairs = generate_deterministic_keypair(total_nodes + 2, seed); // DualStack - let nodes = build_nodes_from_keypairs_dual_stack(keypairs, 11200).await; + let nodes = build_nodes_from_keypairs_dual_stack(keypairs, 10020).await; assert_eq!( total_nodes, @@ -386,15 +386,15 @@ async fn test_discovery_three_peers_mixed() { let mut nodes = vec![]; // Bootstrap node (DualStack) - nodes.append(&mut build_nodes_from_keypairs_dual_stack(vec![keypairs.remove(0)], 12000).await); + nodes.append(&mut build_nodes_from_keypairs_dual_stack(vec![keypairs.remove(0)], 10030).await); // A node to run query (DualStack) - nodes.append(&mut build_nodes_from_keypairs_dual_stack(vec![keypairs.remove(0)], 12010).await); + nodes.append(&mut build_nodes_from_keypairs_dual_stack(vec![keypairs.remove(0)], 10031).await); // IPv4 node - nodes.append(&mut build_nodes_from_keypairs(vec![keypairs.remove(0)], 12020).await); + nodes.append(&mut build_nodes_from_keypairs(vec![keypairs.remove(0)], 10032).await); // IPv6 node - nodes.append(&mut build_nodes_from_keypairs_ipv6(vec![keypairs.remove(0)], 12030).await); + nodes.append(&mut build_nodes_from_keypairs_ipv6(vec![keypairs.remove(0)], 10033).await); // Target node (DualStack) - nodes.append(&mut build_nodes_from_keypairs_dual_stack(vec![keypairs.remove(0)], 12040).await); + nodes.append(&mut build_nodes_from_keypairs_dual_stack(vec![keypairs.remove(0)], 10034).await); assert!(keypairs.is_empty()); assert_eq!(5, nodes.len()); @@ -417,15 +417,15 @@ async fn test_discovery_three_peers_mixed_query_from_ipv4() { let mut nodes = vec![]; // Bootstrap node (DualStack) - nodes.append(&mut build_nodes_from_keypairs_dual_stack(vec![keypairs.remove(0)], 12000).await); + nodes.append(&mut build_nodes_from_keypairs_dual_stack(vec![keypairs.remove(0)], 10040).await); // A node to run query (** IPv4 **) - nodes.append(&mut build_nodes_from_keypairs(vec![keypairs.remove(0)], 12010).await); + nodes.append(&mut build_nodes_from_keypairs(vec![keypairs.remove(0)], 10041).await); // IPv4 node - nodes.append(&mut build_nodes_from_keypairs(vec![keypairs.remove(0)], 12020).await); + nodes.append(&mut build_nodes_from_keypairs(vec![keypairs.remove(0)], 10042).await); // IPv6 node - nodes.append(&mut build_nodes_from_keypairs_ipv6(vec![keypairs.remove(0)], 12030).await); + nodes.append(&mut build_nodes_from_keypairs_ipv6(vec![keypairs.remove(0)], 10043).await); // Target node (DualStack) - nodes.append(&mut build_nodes_from_keypairs_dual_stack(vec![keypairs.remove(0)], 12040).await); + nodes.append(&mut build_nodes_from_keypairs_dual_stack(vec![keypairs.remove(0)], 10044).await); assert!(keypairs.is_empty()); assert_eq!(5, nodes.len()); diff --git a/src/handler/tests.rs b/src/handler/tests.rs index e782b49ea..3fdcc32dc 100644 --- a/src/handler/tests.rs +++ b/src/handler/tests.rs @@ -275,9 +275,9 @@ async fn test_self_request() { let config = Discv5ConfigBuilder::new().enable_packet_filter().build(); let enr = EnrBuilder::new("v4") .ip4(Ipv4Addr::LOCALHOST) - .udp4(5000) + .udp4(5004) .ip6(Ipv6Addr::LOCALHOST) - .udp6(5001) + .udp6(5005) .build(&key) .unwrap();