diff --git a/crates/net/network-api/src/lib.rs b/crates/net/network-api/src/lib.rs index 6c6f8036daf..895fbc08923 100644 --- a/crates/net/network-api/src/lib.rs +++ b/crates/net/network-api/src/lib.rs @@ -66,9 +66,14 @@ pub trait PeersInfo: Send + Sync { /// Provides an API for managing the peers of the network. pub trait Peers: PeersInfo { - /// Adds a peer to the peer set. - fn add_peer(&self, peer: PeerId, addr: SocketAddr) { - self.add_peer_kind(peer, PeerKind::Basic, addr); + /// Adds a peer to the peer set with UDP `SocketAddr`. + fn add_peer(&self, peer: PeerId, tcp_addr: SocketAddr) { + self.add_peer_kind(peer, PeerKind::Basic, tcp_addr, None); + } + + /// Adds a peer to the peer set with TCP and UDP `SocketAddr`. + fn add_peer_with_udp(&self, peer: PeerId, tcp_addr: SocketAddr, udp_addr: SocketAddr) { + self.add_peer_kind(peer, PeerKind::Basic, tcp_addr, Some(udp_addr)); } /// Adds a trusted [`PeerId`] to the peer set. @@ -76,13 +81,24 @@ pub trait Peers: PeersInfo { /// This allows marking a peer as trusted without having to know the peer's address. fn add_trusted_peer_id(&self, peer: PeerId); - /// Adds a trusted peer to the peer set. - fn add_trusted_peer(&self, peer: PeerId, addr: SocketAddr) { - self.add_peer_kind(peer, PeerKind::Trusted, addr); + /// Adds a trusted peer to the peer set with UDP `SocketAddr`. + fn add_trusted_peer(&self, peer: PeerId, tcp_addr: SocketAddr) { + self.add_peer_kind(peer, PeerKind::Trusted, tcp_addr, None); + } + + /// Adds a trusted peer with TCP and UDP `SocketAddr` to the peer set. + fn add_trusted_peer_with_udp(&self, peer: PeerId, tcp_addr: SocketAddr, udp_addr: SocketAddr) { + self.add_peer_kind(peer, PeerKind::Trusted, tcp_addr, Some(udp_addr)); } /// Adds a peer to the known peer set, with the given kind. - fn add_peer_kind(&self, peer: PeerId, kind: PeerKind, addr: SocketAddr); + fn add_peer_kind( + &self, + peer: PeerId, + kind: PeerKind, + tcp_addr: SocketAddr, + udp_addr: Option, + ); /// Returns the rpc [`PeerInfo`] for all connected [`PeerKind::Trusted`] peers. fn get_trusted_peers( diff --git a/crates/net/network-api/src/noop.rs b/crates/net/network-api/src/noop.rs index 745613f4065..a74204a3f74 100644 --- a/crates/net/network-api/src/noop.rs +++ b/crates/net/network-api/src/noop.rs @@ -71,7 +71,14 @@ impl PeersInfo for NoopNetwork { impl Peers for NoopNetwork { fn add_trusted_peer_id(&self, _peer: PeerId) {} - fn add_peer_kind(&self, _peer: PeerId, _kind: PeerKind, _addr: SocketAddr) {} + fn add_peer_kind( + &self, + _peer: PeerId, + _kind: PeerKind, + _tcp_addr: SocketAddr, + _udp_addr: Option, + ) { + } async fn get_peers_by_kind(&self, _kind: PeerKind) -> Result, NetworkError> { Ok(vec![]) diff --git a/crates/net/network/src/discovery.rs b/crates/net/network/src/discovery.rs index c320c7fe157..2e51a6b71cb 100644 --- a/crates/net/network/src/discovery.rs +++ b/crates/net/network/src/discovery.rs @@ -4,6 +4,7 @@ use crate::{ cache::LruMap, error::{NetworkError, ServiceKind}, manager::DiscoveredEvent, + peers::PeerAddr, }; use enr::Enr; use futures::StreamExt; @@ -40,7 +41,7 @@ pub struct Discovery { /// All nodes discovered via discovery protocol. /// /// These nodes can be ephemeral and are updated via the discovery protocol. - discovered_nodes: LruMap, + discovered_nodes: LruMap, /// Local ENR of the discovery v4 service (discv5 ENR has same [`PeerId`]). local_enr: NodeRecord, /// Handler to interact with the Discovery v4 service @@ -204,12 +205,14 @@ impl Discovery { /// Processes an incoming [`NodeRecord`] update from a discovery service fn on_node_record_update(&mut self, record: NodeRecord, fork_id: Option) { - let id = record.id; - let addr = record.tcp_addr(); + let peer_id = record.id; + let tcp_addr = record.tcp_addr(); + let udp_addr = record.udp_addr(); + let addr = PeerAddr::new(tcp_addr, Some(udp_addr)); _ = - self.discovered_nodes.get_or_insert(id, || { + self.discovered_nodes.get_or_insert(peer_id, || { self.queued_events.push_back(DiscoveryEvent::NewNode( - DiscoveredEvent::EventQueued { peer_id: id, socket_addr: addr, fork_id }, + DiscoveredEvent::EventQueued { peer_id, addr, fork_id }, )); addr @@ -224,8 +227,8 @@ impl Discovery { DiscoveryUpdate::EnrForkId(node, fork_id) => { self.queued_events.push_back(DiscoveryEvent::EnrForkId(node.id, fork_id)) } - DiscoveryUpdate::Removed(node) => { - self.discovered_nodes.remove(&node); + DiscoveryUpdate::Removed(peer_id) => { + self.discovered_nodes.remove(&peer_id); } DiscoveryUpdate::Batch(updates) => { for update in updates { @@ -427,7 +430,7 @@ mod tests { assert_eq!( DiscoveryEvent::NewNode(DiscoveredEvent::EventQueued { peer_id: discv4_id_2, - socket_addr: discv4_enr_2.tcp_addr(), + addr: PeerAddr::new(discv4_enr_2.tcp_addr(), Some(discv4_enr_2.udp_addr())), fork_id: None }), event_node_1 @@ -435,7 +438,7 @@ mod tests { assert_eq!( DiscoveryEvent::NewNode(DiscoveredEvent::EventQueued { peer_id: discv4_id_1, - socket_addr: discv4_enr_1.tcp_addr(), + addr: PeerAddr::new(discv4_enr_1.tcp_addr(), Some(discv4_enr_1.udp_addr())), fork_id: None }), event_node_2 diff --git a/crates/net/network/src/manager.rs b/crates/net/network/src/manager.rs index c957738e0ce..1093bc8c50e 100644 --- a/crates/net/network/src/manager.rs +++ b/crates/net/network/src/manager.rs @@ -26,7 +26,7 @@ use crate::{ message::{NewBlockMessage, PeerMessage, PeerRequest, PeerRequestSender}, metrics::{DisconnectMetrics, NetworkMetrics, NETWORK_POOL_TRANSACTIONS_SCOPE}, network::{NetworkHandle, NetworkHandleMessage}, - peers::{PeersHandle, PeersManager}, + peers::{PeerAddr, PeersHandle, PeersManager}, poll_nested_stream_with_budget, protocol::IntoRlpxSubProtocol, session::SessionManager, @@ -1030,7 +1030,7 @@ pub enum NetworkEvent { #[derive(Debug, Clone, PartialEq, Eq)] pub enum DiscoveredEvent { - EventQueued { peer_id: PeerId, socket_addr: SocketAddr, fork_id: Option }, + EventQueued { peer_id: PeerId, addr: PeerAddr, fork_id: Option }, } #[derive(Debug, Default)] diff --git a/crates/net/network/src/network.rs b/crates/net/network/src/network.rs index 632a6028795..ab092a16440 100644 --- a/crates/net/network/src/network.rs +++ b/crates/net/network/src/network.rs @@ -1,7 +1,13 @@ use crate::{ - config::NetworkMode, discovery::DiscoveryEvent, manager::NetworkEvent, message::PeerRequest, - peers::PeersHandle, protocol::RlpxSubProtocol, swarm::NetworkConnectionState, - transactions::TransactionsHandle, FetchClient, + config::NetworkMode, + discovery::DiscoveryEvent, + manager::NetworkEvent, + message::PeerRequest, + peers::{PeerAddr, PeersHandle}, + protocol::RlpxSubProtocol, + swarm::NetworkConnectionState, + transactions::TransactionsHandle, + FetchClient, }; use enr::Enr; use parking_lot::Mutex; @@ -257,7 +263,14 @@ impl Peers for NetworkHandle { /// Sends a message to the [`NetworkManager`](crate::NetworkManager) to add a peer to the known /// set, with the given kind. - fn add_peer_kind(&self, peer: PeerId, kind: PeerKind, addr: SocketAddr) { + fn add_peer_kind( + &self, + peer: PeerId, + kind: PeerKind, + tcp_addr: SocketAddr, + udp_addr: Option, + ) { + let addr = PeerAddr::new(tcp_addr, udp_addr); self.send_message(NetworkHandleMessage::AddPeerAddress(peer, kind, addr)); } @@ -420,7 +433,7 @@ pub(crate) enum NetworkHandleMessage { /// Marks a peer as trusted. AddTrustedPeerId(PeerId), /// Adds an address for a peer, including its ID, kind, and socket address. - AddPeerAddress(PeerId, PeerKind, SocketAddr), + AddPeerAddress(PeerId, PeerKind, PeerAddr), /// Removes a peer from the peerset corresponding to the given kind. RemovePeer(PeerId, PeerKind), /// Disconnects a connection to a peer if it exists, optionally providing a disconnect reason. diff --git a/crates/net/network/src/peers.rs b/crates/net/network/src/peers.rs index c7e6a05a57d..a4012f446b7 100644 --- a/crates/net/network/src/peers.rs +++ b/crates/net/network/src/peers.rs @@ -155,13 +155,17 @@ impl PeersManager { let mut peers = HashMap::with_capacity(trusted_nodes.len() + basic_nodes.len()); let mut trusted_peer_ids = HashSet::with_capacity(trusted_nodes.len()); - for NodeRecord { address, tcp_port, udp_port: _, id } in trusted_nodes { + for NodeRecord { address, tcp_port, udp_port, id } in trusted_nodes { trusted_peer_ids.insert(id); - peers.entry(id).or_insert_with(|| Peer::trusted(SocketAddr::from((address, tcp_port)))); + peers.entry(id).or_insert_with(|| { + Peer::trusted(PeerAddr::new_with_ports(address, tcp_port, Some(udp_port))) + }); } - for NodeRecord { address, tcp_port, udp_port: _, id } in basic_nodes { - peers.entry(id).or_insert_with(|| Peer::new(SocketAddr::from((address, tcp_port)))); + for NodeRecord { address, tcp_port, udp_port, id } in basic_nodes { + peers.entry(id).or_insert_with(|| { + Peer::new(PeerAddr::new_with_ports(address, tcp_port, Some(udp_port))) + }); } Self { @@ -198,7 +202,27 @@ impl PeersManager { /// Returns an iterator over all peers pub(crate) fn iter_peers(&self) -> impl Iterator + '_ { - self.peers.iter().map(|(peer_id, v)| NodeRecord::new(v.addr, *peer_id)) + self.peers.iter().map(|(peer_id, v)| { + NodeRecord::new_with_ports( + v.addr.tcp.ip(), + v.addr.tcp.port(), + v.addr.udp.map(|addr| addr.port()), + *peer_id, + ) + }) + } + + /// Returns the [`NodeRecord`] for the given peer id + #[allow(dead_code)] + fn peer_by_id(&self, peer_id: PeerId) -> Option { + self.peers.get(&peer_id).map(|v| { + NodeRecord::new_with_ports( + v.addr.tcp.ip(), + v.addr.tcp.port(), + v.addr.udp.map(|addr| addr.port()), + peer_id, + ) + }) } /// Returns an iterator over all peer ids for peers with the given kind @@ -338,7 +362,7 @@ impl PeersManager { Entry::Vacant(entry) => { // peer is missing in the table, we add it but mark it as to be removed after // disconnect, because we only know the outgoing port - let mut peer = Peer::with_state(addr, PeerConnectionState::In); + let mut peer = Peer::with_state(PeerAddr::tcp(addr), PeerConnectionState::In); peer.remove_after_disconnect = true; entry.insert(peer); self.queued_actions.push_back(PeerAction::PeerAdded(peer_id)); @@ -663,7 +687,7 @@ impl PeersManager { /// Called for a newly discovered peer. /// /// If the peer already exists, then the address, kind and `fork_id` will be updated. - pub(crate) fn add_peer(&mut self, peer_id: PeerId, addr: SocketAddr, fork_id: Option) { + pub(crate) fn add_peer(&mut self, peer_id: PeerId, addr: PeerAddr, fork_id: Option) { self.add_peer_kind(peer_id, PeerKind::Basic, addr, fork_id) } @@ -676,7 +700,7 @@ impl PeersManager { /// /// If the peer already exists, then the address and kind will be updated. #[allow(dead_code)] - pub(crate) fn add_trusted_peer(&mut self, peer_id: PeerId, addr: SocketAddr) { + pub(crate) fn add_trusted_peer(&mut self, peer_id: PeerId, addr: PeerAddr) { self.add_peer_kind(peer_id, PeerKind::Trusted, addr, None) } @@ -687,10 +711,10 @@ impl PeersManager { &mut self, peer_id: PeerId, kind: PeerKind, - addr: SocketAddr, + addr: PeerAddr, fork_id: Option, ) { - if self.ban_list.is_banned(&peer_id, &addr.ip()) { + if self.ban_list.is_banned(&peer_id, &addr.tcp.ip()) { return } @@ -709,7 +733,7 @@ impl PeersManager { } } Entry::Vacant(entry) => { - trace!(target: "net::peers", ?peer_id, ?addr, "discovered new node"); + trace!(target: "net::peers", ?peer_id, ?addr.tcp, "discovered new node"); let mut peer = Peer::with_kind(addr, kind); peer.fork_id = fork_id; entry.insert(peer); @@ -813,7 +837,7 @@ impl PeersManager { return } - // as long as there a slots available fill them with the best peers + // as long as there are slots available fill them with the best peers while self.connection_info.has_out_capacity() { let action = { let (peer_id, peer) = match self.best_unconnected() { @@ -824,7 +848,7 @@ impl PeersManager { trace!(target: "net::peers", ?peer_id, addr=?peer.addr, "schedule outbound connection"); peer.state = PeerConnectionState::PendingOut; - PeerAction::Connect { peer_id, remote_addr: peer.addr } + PeerAction::Connect { peer_id, remote_addr: peer.addr.tcp } }; self.connection_info.inc_pending_out(); @@ -862,7 +886,7 @@ impl PeersManager { while let Poll::Ready(Some(cmd)) = self.handle_rx.poll_next_unpin(cx) { match cmd { PeerCommand::Add(peer_id, addr) => { - self.add_peer(peer_id, addr, None); + self.add_peer(peer_id, PeerAddr::tcp(addr), None); } PeerCommand::Remove(peer) => self.remove_peer(peer), PeerCommand::ReputationChange(peer_id, rep) => { @@ -994,11 +1018,43 @@ impl ConnectionInfo { } } +/// Represents a peer's address information. +/// +/// # Fields +/// +/// - `tcp`: A `SocketAddr` representing the peer's data transfer address. +/// - `udp`: An optional `SocketAddr` representing the peer's discover address. `None` if the peer +/// is directly connecting to us or the port is the same to `tcp`'s +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub struct PeerAddr { + tcp: SocketAddr, + udp: Option, +} + +impl PeerAddr { + /// Returns a new `PeerAddr` with the given `tcp` and `udp` addresses. + pub const fn new(tcp: SocketAddr, udp: Option) -> Self { + Self { tcp, udp } + } + + /// Returns a new `PeerAddr` with a `tcp` address only. + pub const fn tcp(tcp: SocketAddr) -> Self { + Self { tcp, udp: None } + } + + /// Returns a new `PeerAddr` with the given `tcp` and `udp` ports. + fn new_with_ports(ip: IpAddr, tcp_port: u16, udp_port: Option) -> Self { + let tcp = SocketAddr::new(ip, tcp_port); + let udp = udp_port.map(|port| SocketAddr::new(ip, port)); + Self::new(tcp, udp) + } +} + /// Tracks info about a single peer. #[derive(Debug, Clone)] pub struct Peer { - /// Where to reach the peer - addr: SocketAddr, + /// Where to reach the peer. + addr: PeerAddr, /// Reputation of the peer. reputation: i32, /// The state of the connection, if any. @@ -1019,11 +1075,11 @@ pub struct Peer { // === impl Peer === impl Peer { - fn new(addr: SocketAddr) -> Self { + fn new(addr: PeerAddr) -> Self { Self::with_state(addr, Default::default()) } - fn trusted(addr: SocketAddr) -> Self { + fn trusted(addr: PeerAddr) -> Self { Self { kind: PeerKind::Trusted, ..Self::new(addr) } } @@ -1032,7 +1088,7 @@ impl Peer { self.reputation } - fn with_state(addr: SocketAddr, state: PeerConnectionState) -> Self { + fn with_state(addr: PeerAddr, state: PeerConnectionState) -> Self { Self { addr, state, @@ -1045,7 +1101,7 @@ impl Peer { } } - fn with_kind(addr: SocketAddr, kind: PeerKind) -> Self { + fn with_kind(addr: PeerAddr, kind: PeerKind) -> Self { Self { kind, ..Self::new(addr) } } @@ -1266,7 +1322,7 @@ mod tests { use super::PeersManager; use crate::{ peers::{ - ConnectionInfo, InboundConnectionError, PeerAction, PeerBackoffDurations, + ConnectionInfo, InboundConnectionError, PeerAction, PeerAddr, PeerBackoffDurations, PeerConnectionState, }, session::PendingSessionHandshakeError, @@ -1315,7 +1371,7 @@ mod tests { let peer = PeerId::random(); let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008); let mut peers = PeersManager::default(); - peers.add_peer(peer, socket_addr, None); + peers.add_peer(peer, PeerAddr::tcp(socket_addr), None); match event!(peers) { PeerAction::PeerAdded(peer_id) => { @@ -1330,6 +1386,37 @@ mod tests { } _ => unreachable!(), } + + let record = peers.peer_by_id(peer).unwrap(); + assert_eq!(record.tcp_addr(), socket_addr); + assert_eq!(record.udp_addr(), socket_addr); + } + + #[tokio::test] + async fn test_insert_udp() { + let peer = PeerId::random(); + let tcp_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008); + let udp_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009); + let mut peers = PeersManager::default(); + peers.add_peer(peer, PeerAddr::new(tcp_addr, Some(udp_addr)), None); + + match event!(peers) { + PeerAction::PeerAdded(peer_id) => { + assert_eq!(peer_id, peer); + } + _ => unreachable!(), + } + match event!(peers) { + PeerAction::Connect { peer_id, remote_addr } => { + assert_eq!(peer_id, peer); + assert_eq!(remote_addr, tcp_addr); + } + _ => unreachable!(), + } + + let record = peers.peer_by_id(peer).unwrap(); + assert_eq!(record.tcp_addr(), tcp_addr); + assert_eq!(record.udp_addr(), udp_addr); } #[tokio::test] @@ -1338,7 +1425,7 @@ mod tests { let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008); let mut peers = PeersManager::default(); peers.ban_peer(peer); - peers.add_peer(peer, socket_addr, None); + peers.add_peer(peer, PeerAddr::tcp(socket_addr), None); match event!(peers) { PeerAction::BanPeer { peer_id } => { @@ -1360,7 +1447,7 @@ mod tests { let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008); let mut peers = PeersManager::default(); peers.ban_peer(peer); - peers.add_peer(peer, socket_addr, None); + peers.add_peer(peer, PeerAddr::tcp(socket_addr), None); match event!(peers) { PeerAction::BanPeer { peer_id } => { @@ -1397,7 +1484,7 @@ mod tests { let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008); let mut peers = PeersManager::new(PeersConfig::test()); - peers.add_peer(peer, socket_addr, None); + peers.add_peer(peer, PeerAddr::tcp(socket_addr), None); match event!(peers) { PeerAction::PeerAdded(peer_id) => { @@ -1456,7 +1543,7 @@ mod tests { let backoff_durations = PeerBackoffDurations::test(); let config = PeersConfig { backoff_durations, ..PeersConfig::test() }; let mut peers = PeersManager::new(config); - peers.add_peer(peer, socket_addr, None); + peers.add_peer(peer, PeerAddr::tcp(socket_addr), None); match event!(peers) { PeerAction::PeerAdded(peer_id) => { @@ -1513,7 +1600,7 @@ mod tests { let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008); let config = PeersConfig::test(); let mut peers = PeersManager::new(config); - peers.add_peer(peer, socket_addr, None); + peers.add_peer(peer, PeerAddr::tcp(socket_addr), None); let peer_struct = peers.peers.get_mut(&peer).unwrap(); let backoff_timestamp = peers @@ -1530,7 +1617,7 @@ mod tests { let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008); let config = PeersConfig::default(); let mut peers = PeersManager::new(config); - peers.add_peer(peer, socket_addr, None); + peers.add_peer(peer, PeerAddr::tcp(socket_addr), None); let peer_struct = peers.peers.get_mut(&peer).unwrap(); // Simulate a peer that was already backed off once @@ -1558,7 +1645,7 @@ mod tests { let peer = PeerId::random(); let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008); let mut peers = PeersManager::default(); - peers.add_peer(peer, socket_addr, None); + peers.add_peer(peer, PeerAddr::tcp(socket_addr), None); match event!(peers) { PeerAction::PeerAdded(peer_id) => { @@ -1615,7 +1702,7 @@ mod tests { let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008); let config = PeersConfig::test(); let mut peers = PeersManager::new(config.clone()); - peers.add_peer(peer, socket_addr, None); + peers.add_peer(peer, PeerAddr::tcp(socket_addr), None); let peer_struct = peers.peers.get_mut(&peer).unwrap(); // Simulate a peer that was already backed off once @@ -1669,7 +1756,7 @@ mod tests { let peer = PeerId::random(); let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008); let mut peers = PeersManager::default(); - peers.add_peer(peer, socket_addr, None); + peers.add_peer(peer, PeerAddr::tcp(socket_addr), None); match event!(peers) { PeerAction::PeerAdded(peer_id) => { @@ -1781,7 +1868,7 @@ mod tests { let peer = PeerId::random(); let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008); let mut peers = PeersManager::default(); - peers.add_peer(peer, socket_addr, None); + peers.add_peer(peer, PeerAddr::tcp(socket_addr), None); match event!(peers) { PeerAction::PeerAdded(peer_id) => { @@ -1899,7 +1986,7 @@ mod tests { // to increase by 1 peers.on_incoming_session_established(peer, socket_addr); let p = peers.peers.get_mut(&peer).expect("peer not found"); - assert_eq!(p.addr, socket_addr); + assert_eq!(p.addr.tcp, socket_addr); assert_eq!(peers.connection_info.num_pending_in, 0); assert_eq!(peers.connection_info.num_inbound, 1); @@ -1914,7 +2001,7 @@ mod tests { peers.on_already_connected(Direction::Incoming); let p = peers.peers.get_mut(&peer).expect("peer not found"); - assert_eq!(p.addr, socket_addr); + assert_eq!(p.addr.tcp, socket_addr); assert_eq!(peers.connection_info.num_pending_in, 0); assert_eq!(peers.connection_info.num_inbound, 1); } @@ -1924,7 +2011,7 @@ mod tests { let peer = PeerId::random(); let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008); let mut peers = PeersManager::default(); - peers.add_trusted_peer(peer, socket_addr); + peers.add_trusted_peer(peer, PeerAddr::tcp(socket_addr)); match event!(peers) { PeerAction::PeerAdded(peer_id) => { @@ -1976,7 +2063,7 @@ mod tests { let peer = PeerId::random(); let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008); let mut peers = PeersManager::default(); - peers.add_peer(peer, socket_addr, None); + peers.add_peer(peer, PeerAddr::tcp(socket_addr), None); assert_eq!(peers.get_reputation(&peer), Some(0)); peers.apply_reputation_change(&peer, ReputationChangeKind::Other(1024)); @@ -1991,7 +2078,7 @@ mod tests { let peer = PeerId::random(); let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008); let mut peers = PeersManager::default(); - peers.add_peer(peer, socket_addr, None); + peers.add_peer(peer, PeerAddr::tcp(socket_addr), None); match event!(peers) { PeerAction::PeerAdded(peer_id) => { @@ -2028,7 +2115,7 @@ mod tests { let p = peers.peers.get(&peer).unwrap(); assert_eq!(p.state, PeerConnectionState::PendingOut); - peers.add_peer(peer, socket_addr, None); + peers.add_peer(peer, PeerAddr::tcp(socket_addr), None); let p = peers.peers.get(&peer).unwrap(); assert_eq!(p.state, PeerConnectionState::PendingOut); @@ -2041,7 +2128,7 @@ mod tests { let peer = PeerId::random(); let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008); let mut peers = PeersManager::default(); - peers.add_peer(peer, socket_addr, None); + peers.add_peer(peer, PeerAddr::tcp(socket_addr), None); match event!(peers) { PeerAction::PeerAdded(peer_id) => { @@ -2076,7 +2163,7 @@ mod tests { let peer = PeerId::random(); let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008); let mut peers = PeersManager::default(); - peers.add_peer(peer, socket_addr, None); + peers.add_peer(peer, PeerAddr::tcp(socket_addr), None); match event!(peers) { PeerAction::PeerAdded(peer_id) => { @@ -2110,7 +2197,7 @@ mod tests { let ban_list = BanList::new(HashSet::new(), vec![ip]); let config = PeersConfig::default().with_ban_list(ban_list); let mut peer_manager = PeersManager::new(config); - peer_manager.add_peer(B512::default(), socket_addr, None); + peer_manager.add_peer(B512::default(), PeerAddr::tcp(socket_addr), None); assert!(peer_manager.peers.is_empty()); } @@ -2213,7 +2300,7 @@ mod tests { let basic_peer = PeerId::random(); let basic_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009); - peers.add_peer(basic_peer, basic_sock, None); + peers.add_peer(basic_peer, PeerAddr::tcp(basic_sock), None); match event!(peers) { PeerAction::PeerAdded(peer_id) => { @@ -2253,7 +2340,7 @@ mod tests { let basic_peer = PeerId::random(); let basic_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009); - peers.add_peer(basic_peer, basic_sock, None); + peers.add_peer(basic_peer, PeerAddr::tcp(basic_sock), None); match event!(peers) { PeerAction::PeerAdded(peer_id) => { @@ -2361,7 +2448,7 @@ mod tests { let config = PeersConfig::test(); let mut peer_manager = PeersManager::new(config); let peer_id = PeerId::random(); - peer_manager.add_peer(peer_id, socket_addr, None); + peer_manager.add_peer(peer_id, PeerAddr::tcp(socket_addr), None); tokio::time::sleep(Duration::from_secs(1)).await; peer_manager.tick(); @@ -2416,7 +2503,7 @@ mod tests { assert!(peer.remove_after_disconnect); // trigger discovery manually while the peer is still connected - peers.add_peer(peer_id, addr, None); + peers.add_peer(peer_id, PeerAddr::tcp(addr), None); peers.on_active_session_gracefully_closed(peer_id); @@ -2432,7 +2519,7 @@ mod tests { let mut peers = PeersManager::default(); peers.on_incoming_pending_session(addr.ip()).unwrap(); - peers.add_peer(peer_id, addr, None); + peers.add_peer(peer_id, PeerAddr::tcp(addr), None); match event!(peers) { PeerAction::PeerAdded(_) => {} @@ -2460,7 +2547,7 @@ mod tests { let mut peers = PeersManager::default(); peers.on_incoming_pending_session(addr.ip()).unwrap(); - peers.add_peer(peer_id, addr, None); + peers.add_peer(peer_id, PeerAddr::tcp(addr), None); match event!(peers) { PeerAction::PeerAdded(_) => {} @@ -2491,9 +2578,9 @@ mod tests { let config = PeersConfig::default(); let mut peer_manager = PeersManager::new(config); let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)); - let socket_addr = SocketAddr::new(ip, 8008); + let peer_addr = PeerAddr::tcp(SocketAddr::new(ip, 8008)); for _ in 0..peer_manager.connection_info.config.max_concurrent_outbound_dials * 2 { - peer_manager.add_peer(PeerId::random(), socket_addr, None); + peer_manager.add_peer(PeerId::random(), peer_addr, None); } peer_manager.fill_outbound_slots(); @@ -2510,11 +2597,11 @@ mod tests { let config = PeersConfig::default(); let mut peer_manager = PeersManager::new(config); let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)); - let socket_addr = SocketAddr::new(ip, 8008); + let peer_addr = PeerAddr::tcp(SocketAddr::new(ip, 8008)); // add more peers than allowed for _ in 0..peer_manager.connection_info.config.max_concurrent_outbound_dials * 2 { - peer_manager.add_peer(PeerId::random(), socket_addr, None); + peer_manager.add_peer(PeerId::random(), peer_addr, None); } for _ in 0..peer_manager.connection_info.config.max_concurrent_outbound_dials * 2 { diff --git a/crates/net/network/src/state.rs b/crates/net/network/src/state.rs index afbf05dde69..7fa5b6d2c08 100644 --- a/crates/net/network/src/state.rs +++ b/crates/net/network/src/state.rs @@ -9,7 +9,7 @@ use crate::{ BlockRequest, NewBlockMessage, PeerRequest, PeerRequestSender, PeerResponse, PeerResponseResult, }, - peers::{PeerAction, PeersManager}, + peers::{PeerAction, PeerAddr, PeersManager}, FetchClient, }; use rand::seq::SliceRandom; @@ -274,7 +274,7 @@ where } /// Adds a peer and its address with the given kind to the peerset. - pub(crate) fn add_peer_kind(&mut self, peer_id: PeerId, kind: PeerKind, addr: SocketAddr) { + pub(crate) fn add_peer_kind(&mut self, peer_id: PeerId, kind: PeerKind, addr: PeerAddr) { self.peers_manager.add_peer_kind(peer_id, kind, addr, None) } @@ -288,14 +288,10 @@ where /// Event hook for events received from the discovery service. fn on_discovery_event(&mut self, event: DiscoveryEvent) { match event { - DiscoveryEvent::NewNode(DiscoveredEvent::EventQueued { - peer_id, - socket_addr, - fork_id, - }) => { + DiscoveryEvent::NewNode(DiscoveredEvent::EventQueued { peer_id, addr, fork_id }) => { self.queued_messages.push_back(StateAction::DiscoveredNode { peer_id, - socket_addr, + addr, fork_id, }); } @@ -516,7 +512,7 @@ pub(crate) enum StateAction { fork_id: ForkId, }, /// A new node was found through the discovery, possibly with a `ForkId` - DiscoveredNode { peer_id: PeerId, socket_addr: SocketAddr, fork_id: Option }, + DiscoveredNode { peer_id: PeerId, addr: PeerAddr, fork_id: Option }, /// A peer was added PeerAdded(PeerId), /// A peer was dropped diff --git a/crates/net/network/src/swarm.rs b/crates/net/network/src/swarm.rs index cfc1f841713..057cd114372 100644 --- a/crates/net/network/src/swarm.rs +++ b/crates/net/network/src/swarm.rs @@ -247,14 +247,14 @@ where } StateAction::PeerAdded(peer_id) => return Some(SwarmEvent::PeerAdded(peer_id)), StateAction::PeerRemoved(peer_id) => return Some(SwarmEvent::PeerRemoved(peer_id)), - StateAction::DiscoveredNode { peer_id, socket_addr, fork_id } => { + StateAction::DiscoveredNode { peer_id, addr, fork_id } => { // Don't try to connect to peer if node is shutting down if self.is_shutting_down() { return None } // Insert peer only if no fork id or a valid fork id if fork_id.map_or_else(|| true, |f| self.sessions.is_valid_fork_id(f)) { - self.state_mut().peers_mut().add_peer(peer_id, socket_addr, fork_id); + self.state_mut().peers_mut().add_peer(peer_id, addr, fork_id); } } StateAction::DiscoveredEnrForkId { peer_id, fork_id } => { diff --git a/crates/net/peers/src/node_record.rs b/crates/net/peers/src/node_record.rs index 3b6c38170d5..5f268c25391 100644 --- a/crates/net/peers/src/node_record.rs +++ b/crates/net/peers/src/node_record.rs @@ -92,6 +92,17 @@ impl NodeRecord { Self { address: addr.ip(), tcp_port: addr.port(), udp_port: addr.port(), id } } + /// Creates a new record from an ip address and ports. + pub fn new_with_ports( + ip_addr: IpAddr, + tcp_port: u16, + udp_port: Option, + id: PeerId, + ) -> Self { + let udp_port = udp_port.unwrap_or(tcp_port); + Self { address: ip_addr, tcp_port, udp_port, id } + } + /// The TCP socket address of this node #[must_use] pub const fn tcp_addr(&self) -> SocketAddr { diff --git a/crates/rpc/rpc/src/admin.rs b/crates/rpc/rpc/src/admin.rs index 772bc77e3f7..f294a52c1bf 100644 --- a/crates/rpc/rpc/src/admin.rs +++ b/crates/rpc/rpc/src/admin.rs @@ -38,7 +38,7 @@ where { /// Handler for `admin_addPeer` fn add_peer(&self, record: NodeRecord) -> RpcResult { - self.network.add_peer(record.id, record.tcp_addr()); + self.network.add_peer_with_udp(record.id, record.tcp_addr(), record.udp_addr()); Ok(true) } @@ -51,7 +51,7 @@ where /// Handler for `admin_addTrustedPeer` fn add_trusted_peer(&self, record: AnyNode) -> RpcResult { if let Some(record) = record.node_record() { - self.network.add_trusted_peer(record.id, record.tcp_addr()) + self.network.add_trusted_peer_with_udp(record.id, record.tcp_addr(), record.udp_addr()) } self.network.add_trusted_peer_id(record.peer_id()); Ok(true)