diff --git a/util/network-devp2p/src/discovery.rs b/util/network-devp2p/src/discovery.rs index 987eded7e92..19a7f0b76b6 100644 --- a/util/network-devp2p/src/discovery.rs +++ b/util/network-devp2p/src/discovery.rs @@ -14,9 +14,11 @@ // You should have received a copy of the GNU General Public License // along with Open Ethereum. If not, see . +//! Ethereum Node Discovery Protocol V4 + use std::collections::{HashMap, HashSet, VecDeque}; use std::collections::hash_map::Entry; -use std::default::Default; +use std::convert::{TryFrom, TryInto}; use std::net::SocketAddr; use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; @@ -24,32 +26,49 @@ use ethereum_types::{H256, H520}; use keccak_hash::keccak; use log::{debug, trace, warn}; use lru_cache::LruCache; +use network::{Error, IpFilter}; use parity_bytes::Bytes; +use parity_crypto::publickey::{KeyPair, recover, Secret, Signature, sign}; use rlp::{Rlp, RlpStream}; -use parity_crypto::publickey::{KeyPair, recover, Secret, sign}; -use network::Error; -use network::IpFilter; - -use crate::node_table::*; +use crate::node_table::{NodeEndpoint, NodeId}; use crate::PROTOCOL_VERSION; -const ADDRESS_BYTES_SIZE: usize = 32; // Size of address type in bytes. -const ADDRESS_BITS: usize = 8 * ADDRESS_BYTES_SIZE; // Denoted by n in [Kademlia]. -const DISCOVERY_MAX_STEPS: u16 = 8; // Max iterations of discovery. (discover) -const BUCKET_SIZE: usize = 16; // Denoted by k in [Kademlia]. Number of nodes stored in each bucket. -const ALPHA: usize = 3; // Denoted by \alpha in [Kademlia]. Number of concurrent FindNode requests. +/// Maximum Node discovery packet size pub const MAX_DATAGRAM_SIZE: usize = 1280; +/// Minimum node discovery packet size +// TODO(niklasad1): why 4? +const MIN_DATAGRAM_SIZE: usize = HEADER_SIZE + 4; + +/// Size of the `hash` and `signature` in bytes (denoted MAC) +const HEADER_MAC_SIZE: usize = ADDRESS_BYTES_SIZE + SIGNATURE_BYTES_LEN; +/// Size of the Node discovery wire protocol header +const HEADER_SIZE: usize = HEADER_MAC_SIZE + PACKET_TYPE_BYTES_LEN; +/// Size of address in bytes. +const ADDRESS_BYTES_SIZE: usize = 32; +/// Denoted by n in [Kademlia]. +const ADDRESS_BITS: usize = 8 * ADDRESS_BYTES_SIZE; +/// Max iterations of discovery. (discover) +const DISCOVERY_MAX_STEPS: u16 = 8; +/// Denoted by k in [Kademlia]. Number of nodes stored in each bucket. +const BUCKET_SIZE: usize = 16; +/// Denoted by \alpha in [Kademlia]. Number of concurrent FindNode requests. +const ALPHA: usize = 3; const PACKET_PING: u8 = 1; const PACKET_PONG: u8 = 2; const PACKET_FIND_NODE: u8 = 3; const PACKET_NEIGHBOURS: u8 = 4; +/// The length of the packet type in bytes +const PACKET_TYPE_BYTES_LEN: usize = 1; +/// Length of `Ethereum signature` in number of bytes +const SIGNATURE_BYTES_LEN: usize = 65; const PING_TIMEOUT: Duration = Duration::from_millis(500); const FIND_NODE_TIMEOUT: Duration = Duration::from_secs(2); const EXPIRY_TIME: Duration = Duration::from_secs(20); -const MAX_NODES_PING: usize = 32; // Max nodes to add/ping at once +/// Max nodes to add/ping at once +const MAX_NODES_PING: usize = 32; const REQUEST_BACKOFF: [Duration; 4] = [ Duration::from_secs(1), Duration::from_secs(4), @@ -57,10 +76,34 @@ const REQUEST_BACKOFF: [Duration; 4] = [ Duration::from_secs(64) ]; -const NODE_LAST_SEEN_TIMEOUT: Duration = Duration::from_secs(24*60*60); - +const NODE_LAST_SEEN_TIMEOUT: Duration = Duration::from_secs(24 * 60 * 60); const OBSERVED_NODES_MAX_SIZE: usize = 10_000; + +/// Node discovery packet kinds +// TODO: Add support for `Node Discovery v4 ENR Extension` +#[derive(Debug)] +enum PacketType { + Ping = 1, + Pong = 2, + FindNode = 3, + Neighbours = 4, +} + +impl TryFrom for PacketType { + type Error = Error; + + fn try_from(kind: u8) -> Result { + match kind { + PACKET_PING => Ok(Self::Ping), + PACKET_PONG => Ok(Self::Pong), + PACKET_FIND_NODE => Ok(Self::FindNode), + PACKET_NEIGHBOURS => Ok(Self::Neighbours), + _ => Err(Error::BadProtocol), + } + } +} + #[derive(Clone, Debug)] pub struct NodeEntry { pub id: NodeId, @@ -372,7 +415,7 @@ impl Discovery { self.public_endpoint.to_rlp_list(&mut rlp); node.endpoint.to_rlp_list(&mut rlp); append_expiration(&mut rlp); - let hash = self.send_packet(PACKET_PING, node.endpoint.udp_address(), rlp.drain())?; + let hash = self.send_packet(PacketType::Ping, node.endpoint.udp_address(), rlp.drain())?; self.in_flight_pings.insert(node.id, PingRequest { sent_at: Instant::now(), @@ -389,7 +432,7 @@ impl Discovery { let mut rlp = RlpStream::new_list(2); rlp.append(target); append_expiration(&mut rlp); - self.send_packet(PACKET_FIND_NODE, node.endpoint.udp_address(), rlp.drain())?; + self.send_packet(PacketType::FindNode, node.endpoint.udp_address(), rlp.drain())?; self.in_flight_find_nodes.insert(node.id, FindNodeRequest { sent_at: Instant::now(), @@ -401,9 +444,8 @@ impl Discovery { Ok(()) } - fn send_packet(&mut self, packet_id: u8, address: SocketAddr, payload: Bytes) -> Result { - let packet = assemble_packet(packet_id, payload, &self.secret)?; - let hash = H256::from_slice(&packet[0..32]); + fn send_packet(&mut self, packet_id: PacketType, address: SocketAddr, payload: Bytes) -> Result { + let (packet, hash) = assemble_packet(packet_id, payload, &self.secret)?; self.send_to(packet, address); Ok(hash) } @@ -464,30 +506,13 @@ impl Discovery { } pub fn on_packet(&mut self, packet: &[u8], from: SocketAddr) -> Result, Error> { - // validate packet - if packet.len() < 32 + 65 + 4 + 1 { - return Err(Error::BadProtocol); - } - - let hash_signed = keccak(&packet[32..]); - if hash_signed[..] != packet[0..32] { - return Err(Error::BadProtocol); - } - - let signed = &packet[(32 + 65)..]; - let signature = H520::from_slice(&packet[32..(32 + 65)]); - let node_id = recover(&signature.into(), &keccak(signed))?; - let packet_id = signed[0]; - let rlp = Rlp::new(&signed[1..]); + let (node_id, payload, packet_id, signed_hash) = disassemble_packet(packet)?; + let rlp = Rlp::new(payload); match packet_id { - PACKET_PING => self.on_ping(&rlp, node_id, from, hash_signed.as_bytes()), - PACKET_PONG => self.on_pong(&rlp, node_id, from), - PACKET_FIND_NODE => self.on_find_node(&rlp, node_id, from), - PACKET_NEIGHBOURS => self.on_neighbours(&rlp, node_id, from), - _ => { - debug!(target: "discovery", "Unknown UDP packet: {}", packet_id); - Ok(None) - } + PacketType::Ping => self.on_ping(&rlp, node_id, from, signed_hash), + PacketType::Pong => self.on_pong(&rlp, node_id, from), + PacketType::FindNode => self.on_find_node(&rlp, node_id, from), + PacketType::Neighbours => self.on_neighbours(&rlp, node_id, from), } } @@ -505,7 +530,7 @@ impl Discovery { entry.endpoint.is_allowed(&self.ip_filter) && entry.id != self.id } - fn on_ping(&mut self, rlp: &Rlp, node_id: NodeId, from: SocketAddr, echo_hash: &[u8]) -> Result, Error> { + fn on_ping(&mut self, rlp: &Rlp, node_id: NodeId, from: SocketAddr, echo_hash: H256) -> Result, Error> { trace!(target: "discovery", "Got Ping from {:?}", &from); let ping_from = if let Ok(node_endpoint) = NodeEndpoint::from_rlp(&rlp.at(1)?) { node_endpoint @@ -537,7 +562,7 @@ impl Discovery { response.append(&echo_hash); append_expiration(&mut response); - self.send_packet(PACKET_PONG, from, response.drain())?; + self.send_packet(PacketType::Pong, from, response.drain())?; let entry = NodeEntry { id: node_id, endpoint: pong_to }; if !entry.endpoint.is_valid_discovery_node() { @@ -657,7 +682,7 @@ impl Discovery { } let mut packets = Discovery::prepare_neighbours_packets(&nearest); for p in packets.drain(..) { - self.send_packet(PACKET_NEIGHBOURS, node.endpoint.address, p)?; + self.send_packet(PacketType::Neighbours, node.endpoint.address, p)?; } trace!(target: "discovery", "Sent {} Neighbours to {:?}", nearest.len(), &node.endpoint); Ok(()) @@ -831,24 +856,66 @@ fn append_expiration(rlp: &mut RlpStream) { rlp.append(×tamp); } -fn assemble_packet(packet_id: u8, payload: Bytes, secret: &Secret) -> Result { - let mut packet = Bytes::with_capacity(payload.len() + 32 + 65 + 1); - packet.resize(32 + 65, 0); // Filled in below - packet.push(packet_id); +/// Helper function to assemble node discovery packets +/// +/// The packet format is: `hash || signature || packet_type || payload`, where the maximum packet length is 1280 bytes +fn assemble_packet(packet_id: PacketType, payload: Bytes, secret: &Secret) -> Result<(Bytes, H256), Error> { + let packet_len = payload.len() + HEADER_SIZE; + + if !packet_has_valid_length(packet_len) { + warn!(target: "discovery", "Ignored to write discovery packet with invalid packet length: {}, expected to be in range {} - {}", packet_len, MIN_DATAGRAM_SIZE, MAX_DATAGRAM_SIZE); + return Err(Error::BadProtocol); + } + + let mut packet = Bytes::with_capacity(packet_len); + packet.resize(HEADER_MAC_SIZE, 0); // Filled in below + packet.push(packet_id as u8); packet.extend(payload); - let hash = keccak(&packet[(32 + 65)..]); - let signature = match sign(secret, &hash) { - Ok(s) => s, - Err(e) => { - warn!(target: "discovery", "Error signing UDP packet"); - return Err(Error::from(e)); - } - }; - packet[32..(32 + 65)].copy_from_slice(&signature[..]); - let signed_hash = keccak(&packet[32..]); - packet[0..32].copy_from_slice(signed_hash.as_bytes()); - Ok(packet) + let signature = sign_payload_with_packet_id(secret, &packet[HEADER_MAC_SIZE..])?; + packet[ADDRESS_BYTES_SIZE..HEADER_MAC_SIZE].copy_from_slice(&signature[..]); + let signed_hash = keccak(&packet[ADDRESS_BYTES_SIZE..]); + packet[..ADDRESS_BYTES_SIZE].copy_from_slice(signed_hash.as_bytes()); + Ok((packet, signed_hash)) +} + +/// Helper to disassemble node discovery packets +/// +/// The packet format is: `hash || signature || packet_type || payload`, where the maximum packet length is 1280 bytes +fn disassemble_packet(packet: &[u8]) -> Result<(NodeId, &[u8], PacketType, H256), Error> { + if !packet_has_valid_length(packet.len()) { + warn!(target: "discovery", "Ignored to read discovery packet with invalid packet length: {}, expected to be in range {} - {}", packet.len(), MIN_DATAGRAM_SIZE, MAX_DATAGRAM_SIZE); + return Err(Error::BadProtocol); + } + + let payload_with_packet_id = &packet[HEADER_MAC_SIZE..]; + let packet_id = payload_with_packet_id[0]; + let packet_id: PacketType = packet_id.try_into().map_err(|e| { + warn!(target: "discovery", "Unknown discovery packet id: {:?}", packet_id); + e + })?; + + let signed_hash = keccak(&packet[ADDRESS_BYTES_SIZE..]); + if signed_hash[..] != packet[0..ADDRESS_BYTES_SIZE] { + return Err(Error::BadProtocol); + } + + let signature = H520::from_slice(&packet[ADDRESS_BYTES_SIZE..HEADER_MAC_SIZE]); + let node_id = recover(&signature.into(), &keccak(payload_with_packet_id))?; + + Ok((node_id, &payload_with_packet_id[1..], packet_id, signed_hash)) +} + +fn sign_payload_with_packet_id(secret: &Secret, payload_with_packet_id: &[u8]) -> Result { + let hash = keccak(payload_with_packet_id); + sign(secret, &hash).map_err(|e| { + warn!(target: "discovery", "Error signing UDP packet"); + e.into() + }) +} + +fn packet_has_valid_length(packet_len: usize) -> bool { + packet_len >= MIN_DATAGRAM_SIZE && packet_len <= MAX_DATAGRAM_SIZE } // Selects the next node in a bucket to ping. Chooses the eligible node least recently seen. @@ -1012,8 +1079,8 @@ mod tests { let key = Random.generate(); discovery.send_find_node(&node_entries[100], key.public()).unwrap(); for payload in Discovery::prepare_neighbours_packets(&node_entries[101..116]) { - let packet = assemble_packet(PACKET_NEIGHBOURS, payload, &key.secret()).unwrap(); - discovery.on_packet(&packet, from.clone()).unwrap(); + let (packet, _hash) = assemble_packet(PacketType::Neighbours, payload, &key.secret()).unwrap(); + discovery.on_packet(&packet, from).unwrap(); } let num_nodes = total_bucket_nodes(&discovery.node_buckets); @@ -1024,8 +1091,8 @@ mod tests { // FIND_NODE does not time out because it receives k results. discovery.send_find_node(&node_entries[100], key.public()).unwrap(); for payload in Discovery::prepare_neighbours_packets(&node_entries[101..117]) { - let packet = assemble_packet(PACKET_NEIGHBOURS, payload, &key.secret()).unwrap(); - discovery.on_packet(&packet, from.clone()).unwrap(); + let (packet, _hash) = assemble_packet(PacketType::Neighbours, payload, &key.secret()).unwrap(); + discovery.on_packet(&packet, from).unwrap(); } let num_nodes = total_bucket_nodes(&discovery.node_buckets); @@ -1257,15 +1324,15 @@ mod tests { ep1.to_rlp_list(&mut incorrect_pong_rlp); incorrect_pong_rlp.append(&H256::zero()); append_expiration(&mut incorrect_pong_rlp); - let incorrect_pong_data = assemble_packet( - PACKET_PONG, incorrect_pong_rlp.drain(), &discovery2.secret + let (incorrect_pong_data, _hash) = assemble_packet( + PacketType::Pong, incorrect_pong_rlp.drain(), &discovery2.secret ).unwrap(); - if let Some(_) = discovery1.on_packet(&incorrect_pong_data, ep2.address.clone()).unwrap() { + if let Some(_) = discovery1.on_packet(&incorrect_pong_data, ep2.address).unwrap() { panic!("Expected no changes to discovery1's table because pong hash is incorrect"); } // Delivery of valid pong response should add to routing table. - if let Some(table_updates) = discovery1.on_packet(&pong_data.payload, ep2.address.clone()).unwrap() { + if let Some(table_updates) = discovery1.on_packet(&pong_data.payload, ep2.address).unwrap() { assert_eq!(table_updates.added.len(), 1); assert_eq!(table_updates.removed.len(), 0); assert!(table_updates.added.contains_key(&discovery2.id)); @@ -1286,10 +1353,10 @@ mod tests { ep3.to_rlp_list(&mut unexpected_pong_rlp); unexpected_pong_rlp.append(&H256::zero()); append_expiration(&mut unexpected_pong_rlp); - let unexpected_pong = assemble_packet( - PACKET_PONG, unexpected_pong_rlp.drain(), key3.secret() + let (unexpected_pong, _hash) = assemble_packet( + PacketType::Pong, unexpected_pong_rlp.drain(), key3.secret() ).unwrap(); - if let Some(_) = discovery1.on_packet(&unexpected_pong, ep3.address.clone()).unwrap() { + if let Some(_) = discovery1.on_packet(&unexpected_pong, ep3.address).unwrap() { panic!("Expected no changes to discovery1's table for unexpected pong"); } }