diff --git a/README.md b/README.md index 745e37164..29a6b8705 100644 --- a/README.md +++ b/README.md @@ -38,11 +38,9 @@ A simple example of creating this service is as follows: ```rust use discv5::{enr, enr::{CombinedKey, NodeId}, TokioExecutor, Discv5, Discv5ConfigBuilder}; + use discv5::socket::ListenConfig; use std::net::SocketAddr; - // listening address and port - let listen_addr = "0.0.0.0:9000".parse::().unwrap(); - // construct a local ENR let enr_key = CombinedKey::generate_secp256k1(); let enr = enr::EnrBuilder::new("v4").build(&enr_key).unwrap(); @@ -57,15 +55,21 @@ A simple example of creating this service is as follows: // default configuration let config = Discv5ConfigBuilder::new().build(); + // configuration for the sockets to listen on + let listen_config = ListenConfig::Ipv4 { + ip: Ipv4Addr::UNSPECIFIED, + port: 9000, + }; + // construct the discv5 server - let mut discv5 = Discv5::new(enr, enr_key, config).unwrap(); + let mut discv5 = Discv5::new(enr, enr_key, config, listen_config).unwrap(); // In order to bootstrap the routing table an external ENR should be added // This can be done via add_enr. I.e.: // discv5.add_enr() // start the discv5 server - runtime.block_on(discv5.start(listen_addr)); + runtime.block_on(discv5.start()); // run a find_node query runtime.block_on(async { diff --git a/examples/custom_executor.rs b/examples/custom_executor.rs index 9ec3baaa4..cc671be3f 100644 --- a/examples/custom_executor.rs +++ b/examples/custom_executor.rs @@ -9,8 +9,10 @@ //! $ cargo run --example custom_executor //! ``` -use discv5::{enr, enr::CombinedKey, Discv5, Discv5ConfigBuilder, Discv5Event}; -use std::net::SocketAddr; +use discv5::{ + enr, enr::CombinedKey, socket::ListenConfig, Discv5, Discv5ConfigBuilder, Discv5Event, +}; +use std::net::Ipv4Addr; fn main() { // allows detailed logging with the RUST_LOG env variable @@ -22,7 +24,10 @@ fn main() { .try_init(); // listening address and port - let listen_addr = "0.0.0.0:9000".parse::().unwrap(); + let listen_config = ListenConfig::Ipv4 { + ip: Ipv4Addr::UNSPECIFIED, + port: 9000, + }; let enr_key = CombinedKey::generate_secp256k1(); // construct a local ENR @@ -39,7 +44,7 @@ fn main() { let config = Discv5ConfigBuilder::new().build(); // construct the discv5 server - let mut discv5 = Discv5::new(enr, enr_key, config).unwrap(); + let mut discv5 = Discv5::new(enr, enr_key, config, listen_config).unwrap(); // if we know of another peer's ENR, add it known peers if let Some(base64_enr) = std::env::args().nth(1) { @@ -61,7 +66,7 @@ fn main() { runtime.block_on(async { // start the discv5 service - discv5.start(listen_addr).await.unwrap(); + discv5.start().await.unwrap(); println!("Server started"); // get an event stream diff --git a/examples/find_nodes.rs b/examples/find_nodes.rs index 7e01d10bf..81bc90ab2 100644 --- a/examples/find_nodes.rs +++ b/examples/find_nodes.rs @@ -19,18 +19,18 @@ use clap::Parser; use discv5::{ enr, enr::{k256, CombinedKey}, + socket::ListenConfig, Discv5, Discv5ConfigBuilder, Discv5Event, }; use std::{ - net::{Ipv4Addr, Ipv6Addr, SocketAddr}, + net::{Ipv4Addr, Ipv6Addr}, time::Duration, }; use tracing::{info, warn}; #[derive(Parser)] struct FindNodesArgs { - /// Type of socket to bind ['ds', 'ip4', 'ip6']. The dual stack option will enable mapped - /// addresses over an IpV6 socket. + /// Type of socket to bind ['ds', 'ip4', 'ip6']. #[clap(long, default_value_t = SocketKind::Ds)] socket_kind: SocketKind, /// IpV4 to advertise in the ENR. This is needed so that other IpV4 nodes can connect to us. @@ -43,6 +43,10 @@ struct FindNodesArgs { /// randomly. #[clap(long)] port: Option, + /// Port to bind for ipv6. If none is provided, a random one in the 9000 - 9999 range will be picked + /// randomly. + #[clap(long)] + port6: Option, /// Use a default test key. #[clap(long)] use_test_key: bool, @@ -67,6 +71,12 @@ async fn main() { let port = args .port .unwrap_or_else(|| (rand::random::() % 1000) + 9000); + let port6 = args.port.unwrap_or_else(|| loop { + let port6 = (rand::random::() % 1000) + 9000; + if port6 != port { + return port6; + } + }); let enr_key = if args.use_test_key { // A fixed key for testing @@ -93,9 +103,9 @@ async fn main() { if let Some(ip6) = args.enr_ip6 { // if the given address is the UNSPECIFIED address we want to advertise localhost if ip6.is_unspecified() { - builder.ip6(Ipv6Addr::LOCALHOST).udp6(port); + builder.ip6(Ipv6Addr::LOCALHOST).udp6(port6); } else { - builder.ip6(ip6).udp6(port); + builder.ip6(ip6).udp6(port6); } } builder.build(&enr_key).unwrap() @@ -105,17 +115,7 @@ async fn main() { // let config = Discv5ConfigBuilder::new().enable_packet_filter().build(); // default configuration without packet filtering - let config = Discv5ConfigBuilder::new() - .ip_mode(match args.socket_kind { - SocketKind::Ip4 => discv5::IpMode::Ip4, - SocketKind::Ip6 => discv5::IpMode::Ip6 { - enable_mapped_addresses: false, - }, - SocketKind::Ds => discv5::IpMode::Ip6 { - enable_mapped_addresses: true, - }, - }) - .build(); + let config = Discv5ConfigBuilder::new().build(); info!("Node Id: {}", enr.node_id()); if args.enr_ip6.is_some() || args.enr_ip4.is_some() { @@ -128,14 +128,25 @@ async fn main() { ); } // the address to listen on. - let bind_addr = match args.socket_kind { - SocketKind::Ip4 => Ipv4Addr::UNSPECIFIED.into(), - SocketKind::Ip6 | SocketKind::Ds => Ipv6Addr::UNSPECIFIED.into(), + let listen_config = match args.socket_kind { + SocketKind::Ip4 => ListenConfig::Ipv4 { + ip: Ipv4Addr::UNSPECIFIED, + port, + }, + SocketKind::Ip6 => ListenConfig::Ipv6 { + ip: Ipv6Addr::UNSPECIFIED, + port: port6, + }, + SocketKind::Ds => ListenConfig::DualStack { + ipv4: Ipv4Addr::UNSPECIFIED, + ipv4_port: port, + ipv6: Ipv6Addr::UNSPECIFIED, + ipv6_port: port6, + }, }; - let socket_addr = SocketAddr::new(bind_addr, port); // construct the discv5 server - let mut discv5 = Discv5::new(enr, enr_key, config).unwrap(); + let mut discv5 = Discv5::new(enr, enr_key, config, listen_config).unwrap(); // if we know of another peer's ENR, add it known peers for enr in args.remote_peer { @@ -154,12 +165,12 @@ async fn main() { } // start the discv5 service - discv5.start(socket_addr).await.unwrap(); + discv5.start().await.unwrap(); let mut event_stream = discv5.event_stream().await.unwrap(); let check_evs = args.events; // construct a 30 second interval to search for new peers. - let mut query_interval = tokio::time::interval(Duration::from_secs(30)); + let mut query_interval = tokio::time::interval(Duration::from_secs(1)); loop { tokio::select! { diff --git a/examples/request_enr.rs b/examples/request_enr.rs index 7c4ac820c..3ff6456c7 100644 --- a/examples/request_enr.rs +++ b/examples/request_enr.rs @@ -13,9 +13,10 @@ //! //! This requires the "libp2p" feature. #[cfg(feature = "libp2p")] -use discv5::{enr, enr::CombinedKey, Discv5, Discv5Config}; +use discv5::socket::ListenConfig; #[cfg(feature = "libp2p")] -use std::net::SocketAddr; +use discv5::{enr, enr::CombinedKey, Discv5, Discv5Config}; +use std::net::Ipv4Addr; #[cfg(not(feature = "libp2p"))] fn main() {} @@ -31,7 +32,10 @@ async fn main() { .try_init(); // listening address and port - let listen_addr = "0.0.0.0:9000".parse::().unwrap(); + let listen_config = ListenConfig::Ipv4 { + ip: Ipv4Addr::UNSPECIFIED, + port: 9000, + }; // generate a new enr key let enr_key = CombinedKey::generate_secp256k1(); @@ -46,10 +50,10 @@ async fn main() { .expect("A multiaddr must be supplied"); // construct the discv5 server - let mut discv5 = Discv5::new(enr, enr_key, config).unwrap(); + let mut discv5 = Discv5::new(enr, enr_key, config, listen_config).unwrap(); // start the discv5 service - discv5.start(listen_addr).await.unwrap(); + discv5.start().await.unwrap(); // search for the ENR match discv5.request_enr(multiaddr).await { diff --git a/examples/simple_server.rs b/examples/simple_server.rs index 41652b5c7..22c24f8a1 100644 --- a/examples/simple_server.rs +++ b/examples/simple_server.rs @@ -10,8 +10,8 @@ //! $ cargo run --example simple_server -- //! ``` -use discv5::{enr, enr::CombinedKey, Discv5, Discv5Config, Discv5Event}; -use std::net::{Ipv4Addr, SocketAddr}; +use discv5::{enr, enr::CombinedKey, socket::ListenConfig, Discv5, Discv5Config, Discv5Event}; +use std::net::Ipv4Addr; #[tokio::main] async fn main() { @@ -37,7 +37,10 @@ async fn main() { }; // listening address and port - let listen_addr = "0.0.0.0:9000".parse::().unwrap(); + let listen_config = ListenConfig::Ipv4 { + ip: Ipv4Addr::UNSPECIFIED, + port: 9000, + }; let enr_key = CombinedKey::generate_secp256k1(); @@ -72,7 +75,7 @@ async fn main() { let config = Discv5Config::default(); // construct the discv5 server - let mut discv5 = Discv5::new(enr, enr_key, config).unwrap(); + let mut discv5 = Discv5::new(enr, enr_key, config, listen_config).unwrap(); // if we know of another peer's ENR, add it known peers if let Some(base64_enr) = std::env::args().nth(3) { @@ -93,7 +96,7 @@ async fn main() { } // start the discv5 service - discv5.start(listen_addr).await.unwrap(); + discv5.start().await.unwrap(); println!("Server started"); // get an event stream diff --git a/src/config.rs b/src/config.rs index 9f325a172..63aed0265 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,6 +1,5 @@ use crate::{ - ipmode::IpMode, kbucket::MAX_NODES_PER_BUCKET, Enr, Executor, PermitBanList, RateLimiter, - RateLimiterBuilder, + kbucket::MAX_NODES_PER_BUCKET, Enr, Executor, PermitBanList, RateLimiter, RateLimiterBuilder, }; ///! A set of configuration parameters to tune the discovery protocol. use std::time::Duration; @@ -302,13 +301,6 @@ impl Discv5ConfigBuilder { self } - /// Configures the type of socket to bind to. This also affects the selection of address to use - /// to contact an ENR. - pub fn ip_mode(&mut self, ip_mode: IpMode) -> &mut Self { - self.config.ip_mode = ip_mode; - self - } - pub fn build(&mut self) -> Discv5Config { // If an executor is not provided, assume a current tokio runtime is running. if self.config.executor.is_none() { diff --git a/src/discv5.rs b/src/discv5.rs index 46b405c0c..2f37949ba 100644 --- a/src/discv5.rs +++ b/src/discv5.rs @@ -20,7 +20,7 @@ use crate::{ }, node_info::NodeContact, service::{QueryKind, Service, ServiceRequest, TalkRequest}, - Discv5Config, Enr, + Discv5Config, Enr, IpMode, }; use enr::{CombinedKey, EnrError, EnrKey, NodeId}; use parking_lot::RwLock; @@ -37,7 +37,10 @@ use tracing::{debug, warn}; use libp2p_core::Multiaddr; // Create lazy static variable for the global permit/ban list -use crate::metrics::{Metrics, METRICS}; +use crate::{ + metrics::{Metrics, METRICS}, + socket::ListenConfig, +}; lazy_static! { pub static ref PERMIT_BAN_LIST: RwLock = RwLock::new(crate::PermitBanList::default()); @@ -83,6 +86,10 @@ pub struct Discv5 { local_enr: Arc>, /// The key associated with the local ENR, required for updating the local ENR. enr_key: Arc>, + /// Type of socket to create. + listen_config: ListenConfig, + // Type of socket we are using + ip_mode: IpMode, } impl Discv5 { @@ -90,6 +97,7 @@ impl Discv5 { local_enr: Enr, enr_key: CombinedKey, mut config: Discv5Config, + listen_config: ListenConfig, ) -> Result { // ensure the keypair matches the one that signed the enr. if local_enr.public_key() != enr_key.public() { @@ -126,6 +134,8 @@ impl Discv5 { // Update the PermitBan list based on initial configuration *PERMIT_BAN_LIST.write() = config.permit_ban_list.clone(); + let ip_mode = IpMode::new_from_listen_config(&listen_config); + Ok(Discv5 { config, service_channel: None, @@ -133,6 +143,8 @@ impl Discv5 { kbuckets, local_enr, enr_key, + listen_config, + ip_mode, }) } @@ -149,7 +161,7 @@ impl Discv5 { self.enr_key.clone(), self.kbuckets.clone(), self.config.clone(), - listen_socket, + self.listen_config.clone(), ) .await?; self.service_exit = Some(service_exit); @@ -178,7 +190,7 @@ impl Discv5 { /// them upfront. pub fn add_enr(&self, enr: Enr) -> Result<(), &'static str> { // only add ENR's that have a valid udp socket. - if self.config.ip_mode.get_contactable_addr(&enr).is_none() { + if self.ip_mode.get_contactable_addr(&enr).is_none() { warn!("ENR attempted to be added without an UDP socket compatible with configured IpMode has been ignored."); return Err("ENR has no compatible UDP socket to connect to"); } @@ -474,7 +486,7 @@ impl Discv5 { let (callback_send, callback_recv) = oneshot::channel(); let channel = self.clone_channel(); - let ip_mode = self.config.ip_mode; + let ip_mode = self.ip_mode; async move { let node_contact = NodeContact::try_from_enr(enr, ip_mode)?; diff --git a/src/discv5/test.rs b/src/discv5/test.rs index c5a432d16..3c445194f 100644 --- a/src/discv5/test.rs +++ b/src/discv5/test.rs @@ -1,9 +1,12 @@ #![cfg(test)] -use crate::{Discv5, *}; +use crate::{socket::ListenConfig, Discv5, *}; use enr::{k256, CombinedKey, Enr, EnrBuilder, EnrKey, NodeId}; use rand_core::{RngCore, SeedableRng}; -use std::{collections::HashMap, net::Ipv4Addr}; +use std::{ + collections::HashMap, + net::{Ipv4Addr, Ipv6Addr}, +}; fn init() { let _ = tracing_subscriber::fmt() @@ -23,6 +26,10 @@ async fn build_nodes(n: usize, base_port: u16) -> Vec { for port in base_port..base_port + n as u16 { let enr_key = CombinedKey::generate_secp256k1(); let config = Discv5Config::default(); + let listen_config = ListenConfig::Ipv4 { + ip: ip.clone(), + port, + }; let enr = EnrBuilder::new("v4") .ip4(ip) @@ -30,9 +37,8 @@ async fn build_nodes(n: usize, base_port: u16) -> Vec { .build(&enr_key) .unwrap(); // transport for building a swarm - let socket_addr = enr.udp4_socket().unwrap(); - let mut discv5 = Discv5::new(enr, enr_key, config).unwrap(); - discv5.start(socket_addr.into()).await.unwrap(); + let mut discv5 = Discv5::new(enr, enr_key, config, listen_config).unwrap(); + discv5.start().await.unwrap(); nodes.push(discv5); } nodes @@ -47,6 +53,10 @@ async fn build_nodes_from_keypairs(keys: Vec, base_port: u16) -> Ve let port = base_port + i as u16; let config = Discv5ConfigBuilder::new().build(); + let listen_config = ListenConfig::Ipv4 { + ip: ip.clone(), + port, + }; let enr = EnrBuilder::new("v4") .ip4(ip) @@ -54,9 +64,66 @@ async fn build_nodes_from_keypairs(keys: Vec, base_port: u16) -> Ve .build(&enr_key) .unwrap(); - let socket_addr = enr.udp4_socket().unwrap(); - let mut discv5 = Discv5::new(enr, enr_key, config).unwrap(); - discv5.start(socket_addr.into()).await.unwrap(); + let mut discv5 = Discv5::new(enr, enr_key, config, listen_config).unwrap(); + discv5.start().await.unwrap(); + nodes.push(discv5); + } + nodes +} + +async fn build_nodes_from_keypairs_ipv6(keys: Vec, base_port: u16) -> Vec { + let mut nodes = Vec::new(); + + for (i, enr_key) in keys.into_iter().enumerate() { + let port = base_port + i as u16; + + let config = Discv5ConfigBuilder::new().build(); + let listen_config = ListenConfig::Ipv6 { + ip: Ipv6Addr::LOCALHOST, + port, + }; + + let enr = EnrBuilder::new("v4") + .ip6(Ipv6Addr::LOCALHOST) + .udp6(port) + .build(&enr_key) + .unwrap(); + + let mut discv5 = Discv5::new(enr, enr_key, config, listen_config).unwrap(); + discv5.start().await.unwrap(); + nodes.push(discv5); + } + nodes +} + +async fn build_nodes_from_keypairs_dual_stack( + keys: Vec, + base_port: u16, +) -> Vec { + let mut nodes = Vec::new(); + + for (i, enr_key) in keys.into_iter().enumerate() { + let ipv4_port = base_port + i as u16; + let ipv6_port = ipv4_port + 1000; + + let config = Discv5ConfigBuilder::new().build(); + let listen_config = ListenConfig::DualStack { + ipv4: Ipv4Addr::LOCALHOST, + ipv4_port, + ipv6: Ipv6Addr::LOCALHOST, + ipv6_port, + }; + + let enr = EnrBuilder::new("v4") + .ip4(Ipv4Addr::LOCALHOST) + .udp4(ipv4_port) + .ip6(Ipv6Addr::LOCALHOST) + .udp6(ipv6_port) + .build(&enr_key) + .unwrap(); + + let mut discv5 = Discv5::new(enr, enr_key, config, listen_config).unwrap(); + discv5.start().await.unwrap(); nodes.push(discv5); } nodes @@ -252,16 +319,126 @@ fn find_seed_linear_topology() { } } -/// This is a smaller version of the star topology test designed to debug issues with queries. +/// Test for running a simple query test for a topology consisting of IPv4 nodes. #[tokio::test] -async fn test_discovery_three_peers() { +async fn test_discovery_three_peers_ipv4() { init(); let total_nodes = 3; // Seed is chosen such that all nodes are in the 256th bucket of bootstrap let seed = 1652; // Generate `num_nodes` + bootstrap_node and target_node keypairs from given seed let keypairs = generate_deterministic_keypair(total_nodes + 2, seed); - let mut nodes = build_nodes_from_keypairs(keypairs, 11200).await; + // IPv4 + let nodes = build_nodes_from_keypairs(keypairs, 10000).await; + + assert_eq!( + total_nodes, + test_discovery_three_peers(nodes, total_nodes).await + ); +} + +/// Test for running a simple query test for a topology consisting of IPv6 nodes. +#[tokio::test] +async fn test_discovery_three_peers_ipv6() { + init(); + let total_nodes = 3; + // Seed is chosen such that all nodes are in the 256th bucket of bootstrap + let seed = 1652; + // Generate `num_nodes` + bootstrap_node and target_node keypairs from given seed + let keypairs = generate_deterministic_keypair(total_nodes + 2, seed); + // IPv6 + let nodes = build_nodes_from_keypairs_ipv6(keypairs, 10010).await; + + assert_eq!( + total_nodes, + test_discovery_three_peers(nodes, total_nodes).await + ); +} + +/// Test for running a simple query test for a topology consisting of dual stack nodes. +#[tokio::test] +async fn test_discovery_three_peers_dual_stack() { + init(); + let total_nodes = 3; + // Seed is chosen such that all nodes are in the 256th bucket of bootstrap + let seed = 1652; + // Generate `num_nodes` + bootstrap_node and target_node keypairs from given seed + let keypairs = generate_deterministic_keypair(total_nodes + 2, seed); + // DualStack + let nodes = build_nodes_from_keypairs_dual_stack(keypairs, 10020).await; + + assert_eq!( + total_nodes, + test_discovery_three_peers(nodes, total_nodes).await + ); +} + +/// Test for running a simple query test for a mixed topology of IPv4, IPv6 and dual stack nodes. +/// The node to run the query is DualStack. +#[tokio::test] +async fn test_discovery_three_peers_mixed() { + init(); + let total_nodes = 3; + // Seed is chosen such that all nodes are in the 256th bucket of bootstrap + let seed = 1652; + // Generate `num_nodes` + bootstrap_node and target_node keypairs from given seed + let mut keypairs = generate_deterministic_keypair(total_nodes + 2, seed); + + let mut nodes = vec![]; + // Bootstrap node (DualStack) + nodes.append(&mut build_nodes_from_keypairs_dual_stack(vec![keypairs.remove(0)], 10030).await); + // A node to run query (DualStack) + nodes.append(&mut build_nodes_from_keypairs_dual_stack(vec![keypairs.remove(0)], 10031).await); + // IPv4 node + nodes.append(&mut build_nodes_from_keypairs(vec![keypairs.remove(0)], 10032).await); + // IPv6 node + nodes.append(&mut build_nodes_from_keypairs_ipv6(vec![keypairs.remove(0)], 10033).await); + // Target node (DualStack) + nodes.append(&mut build_nodes_from_keypairs_dual_stack(vec![keypairs.remove(0)], 10034).await); + + assert!(keypairs.is_empty()); + assert_eq!(5, nodes.len()); + assert_eq!( + total_nodes, + test_discovery_three_peers(nodes, total_nodes).await + ); +} + +/// Test for running a simple query test for a mixed topology of IPv4, IPv6 and dual stack nodes. +/// The node to run the query is IPv4. +#[tokio::test] +async fn test_discovery_three_peers_mixed_query_from_ipv4() { + init(); + let total_nodes = 3; + // Seed is chosen such that all nodes are in the 256th bucket of bootstrap + let seed = 1652; + // Generate `num_nodes` + bootstrap_node and target_node keypairs from given seed + let mut keypairs = generate_deterministic_keypair(total_nodes + 2, seed); + + let mut nodes = vec![]; + // Bootstrap node (DualStack) + nodes.append(&mut build_nodes_from_keypairs_dual_stack(vec![keypairs.remove(0)], 10040).await); + // A node to run query (** IPv4 **) + nodes.append(&mut build_nodes_from_keypairs(vec![keypairs.remove(0)], 10041).await); + // IPv4 node + nodes.append(&mut build_nodes_from_keypairs(vec![keypairs.remove(0)], 10042).await); + // IPv6 node + nodes.append(&mut build_nodes_from_keypairs_ipv6(vec![keypairs.remove(0)], 10043).await); + // Target node (DualStack) + nodes.append(&mut build_nodes_from_keypairs_dual_stack(vec![keypairs.remove(0)], 10044).await); + + assert!(keypairs.is_empty()); + assert_eq!(5, nodes.len()); + + // `2` is expected here since the node that runs the query is IPv4. + // The response from Bootstrap node will include the IPv6 node but that will be ignored due to + // non-contactable. + assert_eq!(2, test_discovery_three_peers(nodes, total_nodes).await); +} + +/// This is a smaller version of the star topology test designed to debug issues with queries. +async fn test_discovery_three_peers(mut nodes: Vec, total_nodes: usize) -> usize { + init(); // Last node is bootstrap node in a star topology let bootstrap_node = nodes.remove(0); // target_node is not polled. @@ -307,7 +484,7 @@ async fn test_discovery_three_peers() { result_nodes.len(), total_nodes ); - assert_eq!(result_nodes.len(), total_nodes); + result_nodes.len() } /// Test for a star topology with `num_nodes` connected to a `bootstrap_node` @@ -550,9 +727,13 @@ async fn test_table_limits() { .udp4(9050) .build(&enr_key) .unwrap(); + let listen_config = ListenConfig::Ipv4 { + ip: enr.ip4().unwrap(), + port: enr.udp4().unwrap(), + }; // let socket_addr = enr.udp_socket().unwrap(); - let discv5: Discv5 = Discv5::new(enr, enr_key, config).unwrap(); + let discv5: Discv5 = Discv5::new(enr, enr_key, config, listen_config).unwrap(); let table_limit: usize = 10; // Generate `table_limit + 2` nodes in the same subnet. let enrs: Vec> = (1..=table_limit + 1) @@ -615,7 +796,12 @@ async fn test_bucket_limits() { .collect(); let config = Discv5ConfigBuilder::new().ip_limit().build(); - let discv5 = Discv5::new(enr, enr_key, config).unwrap(); + let listen_config = ListenConfig::Ipv4 { + ip: enr.ip4().unwrap(), + port: enr.udp4().unwrap(), + }; + + let discv5 = Discv5::new(enr, enr_key, config, listen_config).unwrap(); for enr in enrs { let _ = discv5.add_enr(enr.clone()); // we expect some of these to fail based on the filter. } diff --git a/src/handler/mod.rs b/src/handler/mod.rs index f17bc3fd9..9373542ec 100644 --- a/src/handler/mod.rs +++ b/src/handler/mod.rs @@ -40,6 +40,7 @@ use delay_map::HashMapDelay; use enr::{CombinedKey, NodeId}; use futures::prelude::*; use parking_lot::RwLock; +use smallvec::SmallVec; use std::{ collections::HashMap, convert::TryFrom, @@ -63,7 +64,7 @@ pub use crate::node_info::{NodeAddress, NodeContact}; use crate::metrics::METRICS; -use crate::lru_time_cache::LruTimeCache; +use crate::{lru_time_cache::LruTimeCache, socket::ListenConfig}; use active_requests::ActiveRequests; use request_call::RequestCall; use session::Session; @@ -194,8 +195,8 @@ pub struct Handler { service_recv: mpsc::UnboundedReceiver, /// The channel to send messages to the application layer. service_send: mpsc::Sender, - /// The listening socket to filter out any attempted requests to self. - listen_socket: SocketAddr, + /// The listening sockets to filter out any attempted requests to self. + listen_sockets: SmallVec<[SocketAddr; 2]>, /// The discovery v5 UDP socket tasks. socket: Socket, /// Exit channel to shutdown the handler. @@ -213,8 +214,8 @@ impl Handler { pub async fn spawn( enr: Arc>, key: Arc>, - listen_socket: SocketAddr, config: Discv5Config, + listen_config: ListenConfig, ) -> Result { let (exit_sender, exit) = oneshot::channel(); // create the channels to send/receive messages from the application @@ -230,7 +231,6 @@ impl Handler { let node_id = enr.read().node_id(); // enable the packet filter if required - let filter_config = FilterConfig { enabled: config.enable_packet_filter, rate_limiter: config.filter_rate_limiter.clone(), @@ -238,14 +238,28 @@ impl Handler { max_bans_per_ip: config.filter_max_bans_per_ip, }; + let mut listen_sockets = SmallVec::default(); + match listen_config { + ListenConfig::Ipv4 { ip, port } => listen_sockets.push((ip, port).into()), + ListenConfig::Ipv6 { ip, port } => listen_sockets.push((ip, port).into()), + ListenConfig::DualStack { + ipv4, + ipv4_port, + ipv6, + ipv6_port, + } => { + listen_sockets.push((ipv4, ipv4_port).into()); + listen_sockets.push((ipv6, ipv6_port).into()); + } + }; + let socket_config = socket::SocketConfig { executor: config.executor.clone().expect("Executor must exist"), - socket_addr: listen_socket, filter_config, + listen_config, local_node_id: node_id, expected_responses: filter_expected_responses.clone(), ban_duration: config.ban_duration, - ip_mode: config.ip_mode, }; // Attempt to bind to the socket before spinning up the send/recv tasks. @@ -271,7 +285,7 @@ impl Handler { active_challenges: HashMapDelay::new(config.request_timeout), service_recv, service_send, - listen_socket, + listen_sockets, socket, exit, }; @@ -431,7 +445,7 @@ impl Handler { ) -> Result<(), RequestError> { let node_address = contact.node_address(); - if node_address.socket_addr == self.listen_socket { + if self.listen_sockets.contains(&node_address.socket_addr) { debug!("Filtered request to self"); return Err(RequestError::SelfRequest); } diff --git a/src/handler/tests.rs b/src/handler/tests.rs index 5ed889984..3fdcc32dc 100644 --- a/src/handler/tests.rs +++ b/src/handler/tests.rs @@ -1,10 +1,13 @@ #![cfg(test)] + use super::*; use crate::{ rpc::{Request, Response}, - Discv5ConfigBuilder, + Discv5ConfigBuilder, IpMode, }; +use std::net::{Ipv4Addr, Ipv6Addr}; +use crate::{handler::HandlerOut::RequestFailed, RequestError::SelfRequest}; use active_requests::ActiveRequests; use enr::EnrBuilder; use std::time::Duration; @@ -50,8 +53,11 @@ async fn simple_session_message() { let (_exit_send, sender_send, _sender_recv) = Handler::spawn( arc_rw!(sender_enr.clone()), arc_rw!(key1), - sender_enr.udp4_socket().unwrap().into(), config.clone(), + ListenConfig::Ipv4 { + ip: sender_enr.ip4().unwrap(), + port: sender_enr.udp4().unwrap(), + }, ) .await .unwrap(); @@ -59,8 +65,11 @@ async fn simple_session_message() { let (_exit_recv, recv_send, mut receiver_recv) = Handler::spawn( arc_rw!(receiver_enr.clone()), arc_rw!(key2), - receiver_enr.udp4_socket().unwrap().into(), config, + ListenConfig::Ipv4 { + ip: receiver_enr.ip4().unwrap(), + port: receiver_enr.udp4().unwrap(), + }, ) .await .unwrap(); @@ -126,8 +135,11 @@ async fn multiple_messages() { let (_exit_send, sender_handler, mut sender_handler_recv) = Handler::spawn( arc_rw!(sender_enr.clone()), arc_rw!(key1), - sender_enr.udp4_socket().unwrap().into(), config.clone(), + ListenConfig::Ipv4 { + ip: sender_enr.ip4().unwrap(), + port: sender_enr.udp4().unwrap(), + }, ) .await .unwrap(); @@ -135,8 +147,11 @@ async fn multiple_messages() { let (_exit_recv, recv_send, mut receiver_handler) = Handler::spawn( arc_rw!(receiver_enr.clone()), arc_rw!(key2), - receiver_enr.udp4_socket().unwrap().into(), config, + ListenConfig::Ipv4 { + ip: receiver_enr.ip4().unwrap(), + port: receiver_enr.udp4().unwrap(), + }, ) .await .unwrap(); @@ -251,3 +266,60 @@ async fn test_active_requests_insert() { active_requests.remove_by_nonce(&nonce); active_requests.check_invariant(); } + +#[tokio::test] +async fn test_self_request() { + init(); + + let key = CombinedKey::generate_secp256k1(); + let config = Discv5ConfigBuilder::new().enable_packet_filter().build(); + let enr = EnrBuilder::new("v4") + .ip4(Ipv4Addr::LOCALHOST) + .udp4(5004) + .ip6(Ipv6Addr::LOCALHOST) + .udp6(5005) + .build(&key) + .unwrap(); + + let (_exit_send, send, mut recv) = Handler::spawn( + arc_rw!(enr.clone()), + arc_rw!(key), + config, + ListenConfig::DualStack { + ipv4: enr.ip4().unwrap(), + ipv4_port: enr.udp4().unwrap(), + ipv6: enr.ip6().unwrap(), + ipv6_port: enr.udp6().unwrap(), + }, + ) + .await + .unwrap(); + + // self request (IPv4) + let _ = send.send(HandlerIn::Request( + NodeContact::try_from_enr(enr.clone(), IpMode::Ip4).unwrap(), + Box::new(Request { + id: RequestId(vec![1]), + body: RequestBody::Ping { enr_seq: 1 }, + }), + )); + let handler_out = recv.recv().await; + assert_eq!( + Some(RequestFailed(RequestId(vec![1]), SelfRequest)), + handler_out + ); + + // self request (IPv6) + let _ = send.send(HandlerIn::Request( + NodeContact::try_from_enr(enr, IpMode::Ip6).unwrap(), + Box::new(Request { + id: RequestId(vec![2]), + body: RequestBody::Ping { enr_seq: 1 }, + }), + )); + let handler_out = recv.recv().await; + assert_eq!( + Some(RequestFailed(RequestId(vec![2]), SelfRequest)), + handler_out + ); +} diff --git a/src/ipmode.rs b/src/ipmode.rs index b70b0dbd9..17799b7f9 100644 --- a/src/ipmode.rs +++ b/src/ipmode.rs @@ -1,5 +1,10 @@ -use crate::Enr; +use crate::{ + socket::ListenConfig, + Enr, + IpMode::{DualStack, Ip4, Ip6}, +}; use std::net::SocketAddr; + ///! A set of configuration parameters to tune the discovery protocol. /// Sets the socket type to be established and also determines the type of ENRs that we will store @@ -11,7 +16,7 @@ pub enum IpMode { /// routing table if they contain a contactable IPv4 address. #[default] Ip4, - /// IPv4 only. This creates an IPv6 only UDP socket and will only store ENRs in the local + /// IPv6 only. This creates an IPv6 only UDP socket and will only store ENRs in the local /// routing table if they contain a contactable IPv6 address. Mapped addresses will be /// disabled. Ip6, @@ -20,15 +25,23 @@ pub enum IpMode { } impl IpMode { + pub(crate) fn new_from_listen_config(listen_config: &ListenConfig) -> Self { + match listen_config { + ListenConfig::Ipv4 { .. } => Ip4, + ListenConfig::Ipv6 { .. } => Ip6, + ListenConfig::DualStack { .. } => DualStack, + } + } + pub fn is_ipv4(&self) -> bool { - self == &IpMode::Ip4 + self == &Ip4 } /// Get the contactable Socket address of an Enr under current configuration. When running in /// dual stack, an Enr that advertises both an Ipv4 and a canonical Ipv6 address will be /// contacted using their Ipv6 address. pub fn get_contactable_addr(&self, enr: &Enr) -> Option { - // A function to get a cononical ipv6 address from an Enr + // A function to get a canonical ipv6 address from an Enr /// NOTE: There is nothing in the spec preventing compat/mapped addresses from being /// transmitted in the ENR. Here we choose to enforce canonical addresses since @@ -45,9 +58,9 @@ impl IpMode { } match self { - IpMode::Ip4 => enr.udp4_socket().map(SocketAddr::V4), - IpMode::Ip6 => canonical_ipv6_enr_addr(enr).map(SocketAddr::V6), - IpMode::DualStack => { + Ip4 => enr.udp4_socket().map(SocketAddr::V4), + Ip6 => canonical_ipv6_enr_addr(enr).map(SocketAddr::V6), + DualStack => { canonical_ipv6_enr_addr(enr) .map(SocketAddr::V6) // NOTE: general consensus is that ipv6 addresses should be preferred. @@ -80,7 +93,7 @@ mod tests { name, enr_ip4: None, enr_ip6: None, - ip_mode: IpMode::Ip4, + ip_mode: Ip4, expected_socket_addr: None, } } @@ -139,15 +152,15 @@ mod tests { fn empty_enr_no_contactable_address() { // Empty ENR TestCase::new("Empty enr is non contactable by ip4 node") - .ip_mode(IpMode::Ip4) + .ip_mode(Ip4) .test(); TestCase::new("Empty enr is not contactable by ip6 only node") - .ip_mode(IpMode::Ip6) + .ip_mode(Ip6) .test(); TestCase::new("Empty enr is not contactable by dual stack node") - .ip_mode(IpMode::DualStack) + .ip_mode(DualStack) .test(); } @@ -156,18 +169,18 @@ mod tests { // Ip4 only ENR TestCase::new("Ipv4 only enr is contactable by ip4 node") .enr_ip4(Ipv4Addr::LOCALHOST) - .ip_mode(IpMode::Ip4) + .ip_mode(Ip4) .expect_ip4(Ipv4Addr::LOCALHOST) .test(); TestCase::new("Ipv4 only enr is not contactable by ip6 only node") .enr_ip4(Ipv4Addr::LOCALHOST) - .ip_mode(IpMode::Ip6) + .ip_mode(Ip6) .test(); TestCase::new("Ipv4 only enr is contactable by dual stack node") .enr_ip4(Ipv4Addr::LOCALHOST) - .ip_mode(IpMode::DualStack) + .ip_mode(DualStack) .expect_ip4(Ipv4Addr::LOCALHOST) .test(); } @@ -177,18 +190,18 @@ mod tests { // Ip4 only ENR TestCase::new("Ipv6 only enr is not contactable by ip4 node") .enr_ip6(Ipv6Addr::LOCALHOST) - .ip_mode(IpMode::Ip4) + .ip_mode(Ip4) .test(); TestCase::new("Ipv6 only enr is contactable by ip6 only node") .enr_ip6(Ipv6Addr::LOCALHOST) - .ip_mode(IpMode::Ip6) + .ip_mode(Ip6) .expect_ip6(Ipv6Addr::LOCALHOST) .test(); TestCase::new("Ipv6 only enr is contactable by dual stack node") .enr_ip6(Ipv6Addr::LOCALHOST) - .ip_mode(IpMode::DualStack) + .ip_mode(DualStack) .expect_ip6(Ipv6Addr::LOCALHOST) .test(); } @@ -199,21 +212,21 @@ mod tests { TestCase::new("Dual stack enr is contactable by ip4 node") .enr_ip6(Ipv6Addr::LOCALHOST) .enr_ip4(Ipv4Addr::LOCALHOST) - .ip_mode(IpMode::Ip4) + .ip_mode(Ip4) .expect_ip4(Ipv4Addr::LOCALHOST) .test(); TestCase::new("Dual stack enr is contactable by ip6 only node") .enr_ip6(Ipv6Addr::LOCALHOST) .enr_ip4(Ipv4Addr::LOCALHOST) - .ip_mode(IpMode::Ip6) + .ip_mode(Ip6) .expect_ip6(Ipv6Addr::LOCALHOST) .test(); TestCase::new("Dual stack enr is contactable by dual stack node") .enr_ip6(Ipv6Addr::LOCALHOST) .enr_ip4(Ipv4Addr::LOCALHOST) - .ip_mode(IpMode::Ip6) + .ip_mode(Ip6) .expect_ip6(Ipv6Addr::LOCALHOST) .test(); } diff --git a/src/lib.rs b/src/lib.rs index a2864b26d..e8ea99677 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -58,10 +58,8 @@ //! //! ```rust //! use discv5::{enr, enr::{CombinedKey, NodeId}, TokioExecutor, Discv5, Discv5ConfigBuilder}; -//! use std::net::SocketAddr; -//! -//! // listening address and port -//! let listen_addr = "0.0.0.0:9000".parse::().unwrap(); +//! use discv5::socket::ListenConfig; +//! use std::net::{Ipv4Addr, SocketAddr}; //! //! // construct a local ENR //! let enr_key = CombinedKey::generate_secp256k1(); @@ -77,15 +75,21 @@ //! // default configuration //! let config = Discv5ConfigBuilder::new().build(); //! +//! // configuration for the sockets to listen on +//! let listen_config = ListenConfig::Ipv4 { +//! ip: Ipv4Addr::UNSPECIFIED, +//! port: 9000, +//! }; +//! //! // construct the discv5 server -//! let mut discv5 = Discv5::new(enr, enr_key, config).unwrap(); +//! let mut discv5 = Discv5::new(enr, enr_key, config, listen_config).unwrap(); //! //! // In order to bootstrap the routing table an external ENR should be added //! // This can be done via add_enr. I.e.: //! // discv5.add_enr() //! //! // start the discv5 server -//! runtime.block_on(discv5.start(listen_addr)); +//! runtime.block_on(discv5.start()); //! //! // run a find_node query //! runtime.block_on(async { diff --git a/src/rpc.rs b/src/rpc.rs index a4f9fe7f4..c653e1e9b 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -434,8 +434,13 @@ impl Message { let mut ip = [0u8; 16]; ip.copy_from_slice(&ip_bytes); let ipv6 = Ipv6Addr::from(ip); - // If the ipv6 is ipv4 compatible/mapped, simply return the ipv4. - if let Some(ipv4) = ipv6.to_ipv4() { + + if ipv6.is_loopback() { + // Checking if loopback address since IPv6Addr::to_ipv4 returns + // IPv4 address for IPv6 loopback address. + IpAddr::V6(ipv6) + } else if let Some(ipv4) = ipv6.to_ipv4() { + // If the ipv6 is ipv4 compatible/mapped, simply return the ipv4. IpAddr::V4(ipv4) } else { IpAddr::V6(ipv6) @@ -605,6 +610,7 @@ impl Message { mod tests { use super::*; use enr::EnrBuilder; + use std::net::Ipv4Addr; #[test] fn ref_test_encode_request_ping() { @@ -776,6 +782,50 @@ mod tests { assert_eq!(request, decoded); } + #[test] + fn encode_decode_ping_response_ipv4_mapped() { + let id = RequestId(vec![1]); + let request = Message::Response(Response { + id: id.clone(), + body: ResponseBody::Pong { + enr_seq: 15, + ip: IpAddr::V6(Ipv4Addr::new(192, 0, 2, 1).to_ipv6_mapped()), + port: 80, + }, + }); + + let encoded = request.encode(); + let decoded = Message::decode(&encoded).unwrap(); + let expected = Message::Response(Response { + id, + body: ResponseBody::Pong { + enr_seq: 15, + ip: IpAddr::V4(Ipv4Addr::new(192, 0, 2, 1)), + port: 80, + }, + }); + + assert_eq!(expected, decoded); + } + + #[test] + fn encode_decode_ping_response_ipv6_loopback() { + let id = RequestId(vec![1]); + let request = Message::Response(Response { + id, + body: ResponseBody::Pong { + enr_seq: 15, + ip: IpAddr::V6(Ipv6Addr::LOCALHOST), + port: 80, + }, + }); + + let encoded = request.clone().encode(); + let decoded = Message::decode(&encoded).unwrap(); + + assert_eq!(request, decoded); + } + #[test] fn encode_decode_find_node_request() { let id = RequestId(vec![1]); diff --git a/src/service.rs b/src/service.rs index 4d6ca96fb..d343ac5b2 100644 --- a/src/service.rs +++ b/src/service.rs @@ -151,7 +151,7 @@ pub enum ServiceRequest { RequestEventStream(oneshot::Sender>), } -use crate::discv5::PERMIT_BAN_LIST; +use crate::{discv5::PERMIT_BAN_LIST, socket::ListenConfig}; pub struct Service { /// Configuration parameters. @@ -253,7 +253,7 @@ impl Service { enr_key: Arc>, kbuckets: Arc>>, config: Discv5Config, - listen_socket: SocketAddr, + listen_config: ListenConfig, ) -> Result<(oneshot::Sender<()>, mpsc::Sender), std::io::Error> { // process behaviour-level configuration parameters let ip_votes = if config.enr_update { @@ -265,12 +265,14 @@ impl Service { None }; + let ip_mode = IpMode::new_from_listen_config(&listen_config); + // build the session service let (handler_exit, handler_send, handler_recv) = Handler::spawn( local_enr.clone(), enr_key.clone(), - listen_socket, config.clone(), + listen_config, ) .await?; @@ -299,6 +301,7 @@ impl Service { event_stream: None, exit, config: config.clone(), + ip_mode, }; info!("Discv5 Service started"); diff --git a/src/service/test.rs b/src/service/test.rs index 160b483d8..6519e9821 100644 --- a/src/service/test.rs +++ b/src/service/test.rs @@ -14,7 +14,7 @@ use crate::{ }; use enr::{CombinedKey, EnrBuilder}; use parking_lot::RwLock; -use std::{collections::HashMap, net::SocketAddr, sync::Arc, time::Duration}; +use std::{collections::HashMap, sync::Arc, time::Duration}; use tokio::sync::{mpsc, oneshot}; fn _connected_state() -> NodeStatus { @@ -40,7 +40,7 @@ fn init() { async fn build_service( local_enr: Arc>, enr_key: Arc>, - listen_socket: SocketAddr, + listen_config: ListenConfig, filters: bool, ) -> Service { let config = Discv5ConfigBuilder::new() @@ -50,8 +50,8 @@ async fn build_service( let (_handler_exit, handler_send, handler_recv) = Handler::spawn( local_enr.clone(), enr_key.clone(), - listen_socket, config.clone(), + listen_config, ) .await .unwrap(); @@ -93,6 +93,7 @@ async fn build_service( event_stream: None, exit, config, + ip_mode: Default::default(), } } @@ -114,12 +115,15 @@ async fn test_updating_connection_on_ping() { .build(&enr_key2) .unwrap(); - let socket_addr = enr.udp4_socket().unwrap(); + let listen_config = ListenConfig::Ipv4 { + ip: enr.ip4().unwrap(), + port: enr.udp4().unwrap(), + }; let mut service = build_service( Arc::new(RwLock::new(enr)), Arc::new(RwLock::new(enr_key1)), - socket_addr.into(), + listen_config, false, ) .await; diff --git a/src/socket/mod.rs b/src/socket/mod.rs index 967e15673..4b7d05f3f 100644 --- a/src/socket/mod.rs +++ b/src/socket/mod.rs @@ -10,7 +10,10 @@ use std::{ sync::Arc, time::Duration, }; -use tokio::sync::{mpsc, oneshot}; +use tokio::{ + net::UdpSocket, + sync::{mpsc, oneshot}, +}; mod filter; mod recv; @@ -24,6 +27,7 @@ pub use recv::InboundPacket; pub use send::OutboundPacket; /// Configuration for the sockets to listen on. +#[derive(Clone)] pub enum ListenConfig { Ipv4 { ip: Ipv4Addr, @@ -45,8 +49,6 @@ pub enum ListenConfig { pub struct SocketConfig { /// The executor to spawn the tasks. pub executor: Box, - /// The listening socket. - pub socket_addr: SocketAddr, /// Configuration details for the packet filter. pub filter_config: FilterConfig, /// Type of socket to create. @@ -70,15 +72,15 @@ pub struct Socket { impl Socket { /// This creates and binds a new UDP socket. // In general this function can be expanded to handle more advanced socket creation. - async fn new_socket(socket_addr: &SocketAddr) -> Result { + async fn new_socket(socket_addr: &SocketAddr) -> Result { match socket_addr { - SocketAddr::V4(ip4) => tokio::net::UdpSocket::bind(ip4).await, + SocketAddr::V4(ip4) => UdpSocket::bind(ip4).await, SocketAddr::V6(ip6) => { let socket = Socket2::new(Domain::IPV6, Type::DGRAM, Some(Protocol::UDP))?; socket.set_only_v6(true)?; socket.set_nonblocking(true)?; socket.bind(&SocketAddr::V6(*ip6).into())?; - tokio::net::UdpSocket::from_std(socket.into()) + UdpSocket::from_std(socket.into()) } } } @@ -89,7 +91,6 @@ impl Socket { pub(crate) async fn new(config: SocketConfig) -> Result { let SocketConfig { executor, - socket_addr, filter_config, listen_config, ban_duration, @@ -97,35 +98,43 @@ impl Socket { local_node_id, } = config; - // For now intentionally forgettig which socket is the ipv4 and which is the ipv6 one. - let (first_addr, maybe_second_addr): (SocketAddr, Option<_>) = match listen_config { - ListenConfig::Ipv4 { ip, port } => ((ip, port).into(), None), - ListenConfig::Ipv6 { ip, port } => ((ip, port).into(), None), + // For recv socket, intentionally forgetting which socket is the ipv4 and which is the ipv6 one. + let (first_recv, second_recv, send_ipv4, send_ipv6): ( + Arc, + Option<_>, + Option<_>, + Option<_>, + ) = match listen_config { + ListenConfig::Ipv4 { ip, port } => { + let ipv4_socket = Arc::new(Socket::new_socket(&(ip, port).into()).await?); + (ipv4_socket.clone(), None, Some(ipv4_socket), None) + } + ListenConfig::Ipv6 { ip, port } => { + let ipv6_socket = Arc::new(Socket::new_socket(&(ip, port).into()).await?); + (ipv6_socket.clone(), None, None, Some(ipv6_socket)) + } ListenConfig::DualStack { ipv4, ipv4_port, ipv6, ipv6_port, - } => ((ipv4, ipv4_port).into(), Some((ipv6, ipv4_port))), - }; - let first_socket = Socket::new_socket(&first_addr).await?; - let maybe_second_socket = match maybe_second_addr { - Some(second_addr) => Some(Socket::new_socket(&socket_addr).await?), - None => None, + } => { + let ipv4_socket = Arc::new(Socket::new_socket(&(ipv4, ipv4_port).into()).await?); + let ipv6_socket = Arc::new(Socket::new_socket(&(ipv6, ipv6_port).into()).await?); + ( + ipv4_socket.clone(), + Some(ipv6_socket.clone()), + Some(ipv4_socket), + Some(ipv6_socket), + ) + } }; - // Arc the udp socket for the send/recv tasks. - let recv_udp = Arc::new(first_socket); - let send_udp = recv_udp.clone(); - - let second_recv = maybe_second_socket.map(Arc::new); - let second_send = second_recv.clone(); - // spawn the recv handler let recv_config = RecvHandlerConfig { filter_config, executor: executor.clone(), - recv: recv_udp, + recv: first_recv, second_recv, local_node_id, expected_responses, @@ -134,7 +143,7 @@ impl Socket { let (recv, recv_exit) = RecvHandler::spawn(recv_config); // spawn the sender handler - let (send, sender_exit) = SendHandler::spawn(executor, send_udp); + let (send, sender_exit) = SendHandler::spawn(executor, send_ipv4, send_ipv6); Ok(Socket { send, diff --git a/src/socket/recv.rs b/src/socket/recv.rs index c18aff3af..68e165a52 100644 --- a/src/socket/recv.rs +++ b/src/socket/recv.rs @@ -3,16 +3,9 @@ //! Every UDP packet passes a filter before being processed. use super::filter::{Filter, FilterConfig}; -use crate::{ - ipmode::to_ipv4_mapped, metrics::METRICS, node_info::NodeAddress, packet::*, Executor, -}; +use crate::{metrics::METRICS, node_info::NodeAddress, packet::*, Executor}; use parking_lot::RwLock; -use std::{ - collections::HashMap, - net::{IpAddr, SocketAddr}, - sync::Arc, - time::Duration, -}; +use std::{collections::HashMap, net::SocketAddr, sync::Arc, time::Duration}; use tokio::{ net::UdpSocket, sync::{mpsc, oneshot}, @@ -48,7 +41,7 @@ pub struct RecvHandlerConfig { pub(crate) struct RecvHandler { /// The UDP recv socket. recv: Arc, - /// An option second UDO socket. Used when dialing over both Ipv4 and Ipv6. + /// An option second UDP socket. Used when dialing over both Ipv4 and Ipv6. second_recv: Option>, /// Simple hack to alternate reading from the first or the second socket. /// The list of waiting responses. These are used to allow incoming packets from sources @@ -139,22 +132,10 @@ impl RecvHandler { /// handler. async fn handle_inbound( &mut self, - mut src_address: SocketAddr, + src_address: SocketAddr, length: usize, recv_buffer: &[u8; MAX_PACKET_SIZE], ) { - // Make sure ip4 addresses in dual stack nodes are reported correctly - if let IpAddr::V6(ip6) = src_address.ip() { - // NOTE: here we don't want to use the `to_ipv4` method, since it also includes compat - // addresses, which are deprecated. Dual stack nodes will report ipv4 addresses as - // mapped addresses and not compat, so this is not needed. This also messes with some - // valid ipv6 local addresses (for example, the ipv6 loopback address). Ideally what we - // want is `to_canonical`. - if let Some(ip4) = to_ipv4_mapped(&ip6) { - trace!("Mapping inbound packet addr from {} to {}", ip6, ip4); - src_address.set_ip(ip4.into()) - } - } // Permit all expected responses let permitted = self.expected_responses.read().get(&src_address).is_some(); diff --git a/src/socket/send.rs b/src/socket/send.rs index 912699bde..668b503d6 100644 --- a/src/socket/send.rs +++ b/src/socket/send.rs @@ -1,6 +1,6 @@ //! This is a standalone task that encodes and sends Discv5 UDP packets use crate::{metrics::METRICS, node_info::NodeAddress, packet::*, Executor}; -use std::sync::Arc; +use std::{net::SocketAddr, sync::Arc}; use tokio::{ net::UdpSocket, sync::{mpsc, oneshot}, @@ -16,8 +16,10 @@ pub struct OutboundPacket { /// The main task that handles outbound UDP packets. pub(crate) struct SendHandler { - /// The UDP send socket. - send: Arc, + /// The UDP send socket for IPv4. + send_ipv4: Option>, + /// The UDP send socket for IPv6. + send_ipv6: Option>, /// The channel to respond to send requests. handler_recv: mpsc::Receiver, /// Exit channel to shutdown the handler. @@ -30,14 +32,15 @@ impl SendHandler { /// shutdown the handler. pub(crate) fn spawn( executor: Box, - send: Arc, - second_send: Option>, + send_ipv4: Option>, + send_ipv6: Option>, ) -> (mpsc::Sender, oneshot::Sender<()>) { let (exit_send, exit) = oneshot::channel(); let (handler_send, handler_recv) = mpsc::channel(30); let mut send_handler = SendHandler { - send, + send_ipv4, + send_ipv6, handler_recv, exit, }; @@ -59,7 +62,7 @@ impl SendHandler { if encoded_packet.len() > MAX_PACKET_SIZE { warn!("Sending packet larger than max size: {} max: {}", encoded_packet.len(), MAX_PACKET_SIZE); } - if let Err(e) = self.send.send_to(&encoded_packet, &packet.node_address.socket_addr).await { + if let Err(e) = self.send(&encoded_packet, &packet.node_address.socket_addr).await { trace!("Could not send packet. Error: {:?}", e); } else { METRICS.add_sent_bytes(encoded_packet.len()); @@ -72,4 +75,32 @@ impl SendHandler { } } } + + async fn send( + &self, + encoded_packet: &Vec, + socket_addr: &SocketAddr, + ) -> Result { + let socket = match socket_addr { + SocketAddr::V4(_) => { + if let Some(socket) = self.send_ipv4.as_ref() { + socket + } else { + return Err("No IPv4 socket.".to_string()); + } + } + SocketAddr::V6(_) => { + if let Some(socket) = self.send_ipv6.as_ref() { + socket + } else { + return Err("No IPv6 socket.".to_string()); + } + } + }; + + socket + .send_to(encoded_packet, socket_addr) + .await + .map_err(|e| e.to_string()) + } }