diff --git a/util/network-devp2p/src/discovery.rs b/util/network-devp2p/src/discovery.rs index 49b5b6ec9ae..23d0729c80e 100644 --- a/util/network-devp2p/src/discovery.rs +++ b/util/network-devp2p/src/discovery.rs @@ -121,7 +121,7 @@ enum NodeValidity { #[derive(Debug)] enum BucketError { Ourselves, - NotInTheBucket{node_entry: NodeEntry, bucket_distance: usize}, + NotInTheBucket { node_entry: NodeEntry, bucket_distance: usize }, } struct PingRequest { @@ -137,22 +137,14 @@ struct PingRequest { reason: PingReason } -#[derive(Debug)] +#[derive(Debug, Default)] pub struct NodeBucket { nodes: VecDeque, //sorted by last active } -impl Default for NodeBucket { - fn default() -> Self { - NodeBucket::new() - } -} - impl NodeBucket { fn new() -> Self { - NodeBucket { - nodes: VecDeque::new() - } + Self::default() } } @@ -161,7 +153,7 @@ pub struct Datagram { pub address: SocketAddr, } -pub struct Discovery<'a> { +pub struct Discovery { id: NodeId, id_hash: H256, secret: Secret, @@ -182,7 +174,7 @@ pub struct Discovery<'a> { check_timestamps: bool, adding_nodes: Vec, ip_filter: IpFilter, - request_backoff: &'a [Duration], + request_backoff: &'static [Duration], } pub struct TableUpdates { @@ -190,8 +182,8 @@ pub struct TableUpdates { pub removed: HashSet, } -impl<'a> Discovery<'a> { - pub fn new(key: &KeyPair, public: NodeEndpoint, ip_filter: IpFilter) -> Discovery<'static> { +impl Discovery { + pub fn new(key: &KeyPair, public: NodeEndpoint, ip_filter: IpFilter) -> Discovery { Discovery { id: *key.public(), id_hash: keccak(key.public()), @@ -243,7 +235,8 @@ impl<'a> Discovery<'a> { }; let bucket = &mut self.node_buckets[dist]; bucket.nodes.iter_mut().find(|n| n.address.id == e.id) - .map_or(Err(BucketError::NotInTheBucket{node_entry: e.clone(), bucket_distance: dist}.into()), |entry| { + .ok_or_else(|| BucketError::NotInTheBucket { node_entry: e.clone(), bucket_distance: dist }) + .and_then(|entry| { entry.address = e; entry.last_seen = Instant::now(); entry.backoff_until = Instant::now(); @@ -389,14 +382,14 @@ impl<'a> Discovery<'a> { node.endpoint.to_rlp_list(&mut rlp); append_expiration(&mut rlp); let old_parity_hash = keccak(rlp.as_raw()); - let hash = self.send_packet(PACKET_PING, &node.endpoint.udp_address(), &rlp.drain())?; + let hash = self.send_packet(PACKET_PING, node.endpoint.udp_address(), rlp.drain())?; self.in_flight_pings.insert(node.id, PingRequest { sent_at: Instant::now(), node: node.clone(), echo_hash: hash, deprecated_echo_hash: old_parity_hash, - reason: reason + reason, }); trace!(target: "discovery", "Sent Ping to {:?} ; node_id={:#x}", &node.endpoint, node.id); @@ -407,7 +400,7 @@ impl<'a> Discovery<'a> { let mut rlp = RlpStream::new_list(2); rlp.append(target); append_expiration(&mut rlp); - self.send_packet(PACKET_FIND_NODE, &node.endpoint.udp_address(), &rlp.drain())?; + self.send_packet(PACKET_FIND_NODE, node.endpoint.udp_address(), rlp.drain())?; self.in_flight_find_nodes.insert(node.id, FindNodeRequest { sent_at: Instant::now(), @@ -419,10 +412,10 @@ impl<'a> Discovery<'a> { Ok(()) } - fn send_packet(&mut self, packet_id: u8, address: &SocketAddr, payload: &[u8]) -> Result { + fn send_packet(&mut self, packet_id: u8, address: SocketAddr, payload: Bytes) -> Result { let packet = assemble_packet(packet_id, payload, &self.secret)?; let hash = H256::from_slice(&packet[0..32]); - self.send_to(packet, address.clone()); + self.send_to(packet, address); Ok(hash) } @@ -498,10 +491,10 @@ impl<'a> Discovery<'a> { let packet_id = signed[0]; let rlp = Rlp::new(&signed[1..]); match packet_id { - PACKET_PING => self.on_ping(&rlp, &node_id, &from, hash_signed.as_bytes()), - PACKET_PONG => self.on_pong(&rlp, &node_id, &from), - PACKET_FIND_NODE => self.on_find_node(&rlp, &node_id, &from), - PACKET_NEIGHBOURS => self.on_neighbours(&rlp, &node_id, &from), + PACKET_PING => self.on_ping(&rlp, node_id, from, hash_signed.as_bytes()), + PACKET_PONG => self.on_pong(&rlp, node_id, from), + PACKET_FIND_NODE => self.on_find_node(&rlp, node_id, from), + PACKET_NEIGHBOURS => self.on_neighbours(&rlp, node_id, from), _ => { debug!(target: "discovery", "Unknown UDP packet: {}", packet_id); Ok(None) @@ -523,12 +516,12 @@ impl<'a> Discovery<'a> { entry.endpoint.is_allowed(&self.ip_filter) && entry.id != self.id } - fn on_ping(&mut self, rlp: &Rlp, node_id: &NodeId, from: &SocketAddr, echo_hash: &[u8]) -> Result, Error> { + fn on_ping(&mut self, rlp: &Rlp, node_id: NodeId, from: SocketAddr, echo_hash: &[u8]) -> Result, Error> { trace!(target: "discovery", "Got Ping from {:?}", &from); let ping_from = if let Ok(node_endpoint) = NodeEndpoint::from_rlp(&rlp.at(1)?) { node_endpoint } else { - let mut address = from.clone(); + let mut address = from; // address here is the node's tcp port. If we are unable to get the `NodeEndpoint` from the `ping_from` // rlp field then this is most likely a BootNode, set the tcp port to 0 because it can not be used for syncing. address.set_port(0); @@ -542,7 +535,7 @@ impl<'a> Discovery<'a> { self.check_timestamp(timestamp)?; let mut response = RlpStream::new_list(3); let pong_to = NodeEndpoint { - address: from.clone(), + address: from, udp_port: ping_from.udp_port }; // Here the PONG's `To` field should be the node we are @@ -555,27 +548,27 @@ impl<'a> Discovery<'a> { response.append(&echo_hash); append_expiration(&mut response); - self.send_packet(PACKET_PONG, from, &response.drain())?; + self.send_packet(PACKET_PONG, from, response.drain())?; - let entry = NodeEntry { id: *node_id, endpoint: pong_to.clone() }; + let entry = NodeEntry { id: node_id, endpoint: pong_to }; if !entry.endpoint.is_valid_discovery_node() { debug!(target: "discovery", "Got bad address: {:?}", entry); } else if !self.is_allowed(&entry) { debug!(target: "discovery", "Address not allowed: {:?}", entry); } else { - self.add_node(entry.clone()); + self.add_node(entry); } Ok(None) } - fn on_pong(&mut self, rlp: &Rlp, node_id: &NodeId, from: &SocketAddr) -> Result, Error> { + fn on_pong(&mut self, rlp: &Rlp, node_id: NodeId, from: SocketAddr) -> Result, Error> { trace!(target: "discovery", "Got Pong from {:?} ; node_id={:#x}", &from, node_id); let _pong_to = NodeEndpoint::from_rlp(&rlp.at(0)?)?; let echo_hash: H256 = rlp.val_at(1)?; let timestamp: u64 = rlp.val_at(2)?; self.check_timestamp(timestamp)?; - let expected_node = match self.in_flight_pings.entry(*node_id) { + let expected_node = match self.in_flight_pings.entry(node_id) { Entry::Occupied(entry) => { let expected_node = { let request = entry.get(); @@ -586,7 +579,7 @@ impl<'a> Discovery<'a> { if request.deprecated_echo_hash == echo_hash { trace!(target: "discovery", "Got Pong from an old parity-ethereum version."); } - Some((request.node.clone(), request.reason.clone())) + Some((request.node.clone(), request.reason)) } }; @@ -629,16 +622,16 @@ impl<'a> Discovery<'a> { } } - fn on_find_node(&mut self, rlp: &Rlp, node_id: &NodeId, from: &SocketAddr) -> Result, Error> { + fn on_find_node(&mut self, rlp: &Rlp, node_id: NodeId, from: SocketAddr) -> Result, Error> { trace!(target: "discovery", "Got FindNode from {:?}", &from); let target: NodeId = rlp.val_at(0)?; let timestamp: u64 = rlp.val_at(1)?; self.check_timestamp(timestamp)?; let node = NodeEntry { - id: node_id.clone(), + id: node_id, endpoint: NodeEndpoint { - address: *from, + address: from, udp_port: from.port() } }; @@ -688,7 +681,7 @@ impl<'a> Discovery<'a> { } let mut packets = Discovery::prepare_neighbours_packets(&nearest); for p in packets.drain(..) { - self.send_packet(PACKET_NEIGHBOURS, &node.endpoint.address, &p)?; + self.send_packet(PACKET_NEIGHBOURS, node.endpoint.address, p)?; } trace!(target: "discovery", "Sent {} Neighbours to {:?}", nearest.len(), &node.endpoint); Ok(()) @@ -711,10 +704,10 @@ impl<'a> Discovery<'a> { packets.collect() } - fn on_neighbours(&mut self, rlp: &Rlp, node_id: &NodeId, from: &SocketAddr) -> Result, Error> { + fn on_neighbours(&mut self, rlp: &Rlp, node_id: NodeId, from: SocketAddr) -> Result, Error> { let results_count = rlp.at(0)?.item_count()?; - let is_expected = match self.in_flight_find_nodes.entry(*node_id) { + let is_expected = match self.in_flight_find_nodes.entry(node_id) { Entry::Occupied(mut entry) => { let expected = { let request = entry.get_mut(); @@ -862,11 +855,11 @@ fn append_expiration(rlp: &mut RlpStream) { rlp.append(×tamp); } -fn assemble_packet(packet_id: u8, bytes: &[u8], secret: &Secret) -> Result { - let mut packet = Bytes::with_capacity(bytes.len() + 32 + 65 + 1); +fn assemble_packet(packet_id: u8, payload: Bytes, secret: &Secret) -> Result { + let mut packet = Bytes::with_capacity(payload.len() + 32 + 65 + 1); packet.resize(32 + 65, 0); // Filled in below packet.push(packet_id); - packet.extend_from_slice(bytes); + packet.extend(payload); let hash = keccak(&packet[(32 + 65)..]); let signature = match sign(secret, &hash) { @@ -1043,7 +1036,7 @@ mod tests { let key = Random.generate(); discovery.send_find_node(&node_entries[100], key.public()).unwrap(); for payload in Discovery::prepare_neighbours_packets(&node_entries[101..116]) { - let packet = assemble_packet(PACKET_NEIGHBOURS, &payload, &key.secret()).unwrap(); + let packet = assemble_packet(PACKET_NEIGHBOURS, payload, &key.secret()).unwrap(); discovery.on_packet(&packet, from.clone()).unwrap(); } @@ -1055,7 +1048,7 @@ mod tests { // FIND_NODE does not time out because it receives k results. discovery.send_find_node(&node_entries[100], key.public()).unwrap(); for payload in Discovery::prepare_neighbours_packets(&node_entries[101..117]) { - let packet = assemble_packet(PACKET_NEIGHBOURS, &payload, &key.secret()).unwrap(); + let packet = assemble_packet(PACKET_NEIGHBOURS, payload, &key.secret()).unwrap(); discovery.on_packet(&packet, from.clone()).unwrap(); } @@ -1065,8 +1058,8 @@ mod tests { assert_eq!(removed, 0); // Test bucket evictions with retries. - let request_backoff = [Duration::new(0, 0); 2]; - let mut discovery = Discovery { request_backoff: &request_backoff, ..discovery }; + const TEST_REQUEST_BACKOFF: [Duration; 2] = [Duration::from_secs(0); 2]; + let mut discovery = Discovery { request_backoff: &TEST_REQUEST_BACKOFF, ..discovery }; for _ in 0..2 { discovery.ping(&node_entries[101], PingReason::Default).unwrap(); @@ -1289,7 +1282,7 @@ mod tests { incorrect_pong_rlp.append(&H256::zero()); append_expiration(&mut incorrect_pong_rlp); let incorrect_pong_data = assemble_packet( - PACKET_PONG, &incorrect_pong_rlp.drain(), &discovery2.secret + PACKET_PONG, incorrect_pong_rlp.drain(), &discovery2.secret ).unwrap(); if let Some(_) = discovery1.on_packet(&incorrect_pong_data, ep2.address.clone()).unwrap() { panic!("Expected no changes to discovery1's table because pong hash is incorrect"); @@ -1318,7 +1311,7 @@ mod tests { unexpected_pong_rlp.append(&H256::zero()); append_expiration(&mut unexpected_pong_rlp); let unexpected_pong = assemble_packet( - PACKET_PONG, &unexpected_pong_rlp.drain(), key3.secret() + PACKET_PONG, unexpected_pong_rlp.drain(), key3.secret() ).unwrap(); if let Some(_) = discovery1.on_packet(&unexpected_pong, ep3.address.clone()).unwrap() { panic!("Expected no changes to discovery1's table for unexpected pong"); diff --git a/util/network-devp2p/src/host.rs b/util/network-devp2p/src/host.rs index 0d4e422f081..170502fadc8 100644 --- a/util/network-devp2p/src/host.rs +++ b/util/network-devp2p/src/host.rs @@ -267,7 +267,7 @@ pub struct Host { udp_socket: Mutex>, tcp_listener: Mutex, sessions: Arc>>, - discovery: Mutex>>, + discovery: Mutex>, nodes: RwLock, handlers: RwLock>>, timers: RwLock>,