Skip to content
This repository was archived by the owner on Nov 6, 2020. It is now read-only.
211 changes: 139 additions & 72 deletions util/network-devp2p/src/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,53 +14,96 @@
// You should have received a copy of the GNU General Public License
// along with Open Ethereum. If not, see <http://www.gnu.org/licenses/>.

//! Ethereum Node Discovery Protocol V4

use std::collections::{HashMap, HashSet, VecDeque};
use std::collections::hash_map::Entry;
use std::default::Default;
use std::convert::{TryFrom, TryInto};
use std::net::SocketAddr;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};

use ethereum_types::{H256, H520};
use keccak_hash::keccak;
use log::{debug, trace, warn};
use lru_cache::LruCache;
use network::{Error, IpFilter};
use parity_bytes::Bytes;
use parity_crypto::publickey::{KeyPair, recover, Secret, Signature, sign};
use rlp::{Rlp, RlpStream};

use parity_crypto::publickey::{KeyPair, recover, Secret, sign};
use network::Error;
use network::IpFilter;

use crate::node_table::*;
use crate::node_table::{NodeEndpoint, NodeId};
use crate::PROTOCOL_VERSION;

const ADDRESS_BYTES_SIZE: usize = 32; // Size of address type in bytes.
const ADDRESS_BITS: usize = 8 * ADDRESS_BYTES_SIZE; // Denoted by n in [Kademlia].
const DISCOVERY_MAX_STEPS: u16 = 8; // Max iterations of discovery. (discover)
const BUCKET_SIZE: usize = 16; // Denoted by k in [Kademlia]. Number of nodes stored in each bucket.
const ALPHA: usize = 3; // Denoted by \alpha in [Kademlia]. Number of concurrent FindNode requests.
/// Maximum Node discovery packet size
pub const MAX_DATAGRAM_SIZE: usize = 1280;
/// Minimum node discovery packet size
// TODO(niklasad1): why 4?
const MIN_DATAGRAM_SIZE: usize = HEADER_SIZE + 4;

/// Size of the `hash` and `signature` in bytes (denoted MAC)
const HEADER_MAC_SIZE: usize = ADDRESS_BYTES_SIZE + SIGNATURE_BYTES_LEN;
/// Size of the Node discovery wire protocol header
const HEADER_SIZE: usize = HEADER_MAC_SIZE + PACKET_TYPE_BYTES_LEN;
/// Size of address in bytes.
const ADDRESS_BYTES_SIZE: usize = 32;
/// Denoted by n in [Kademlia].
const ADDRESS_BITS: usize = 8 * ADDRESS_BYTES_SIZE;
/// Max iterations of discovery. (discover)
const DISCOVERY_MAX_STEPS: u16 = 8;
/// Denoted by k in [Kademlia]. Number of nodes stored in each bucket.
const BUCKET_SIZE: usize = 16;
/// Denoted by \alpha in [Kademlia]. Number of concurrent FindNode requests.
const ALPHA: usize = 3;

const PACKET_PING: u8 = 1;
const PACKET_PONG: u8 = 2;
const PACKET_FIND_NODE: u8 = 3;
const PACKET_NEIGHBOURS: u8 = 4;
/// The length of the packet type in bytes
const PACKET_TYPE_BYTES_LEN: usize = 1;

/// Length of `Ethereum signature` in number of bytes
const SIGNATURE_BYTES_LEN: usize = 65;
const PING_TIMEOUT: Duration = Duration::from_millis(500);
const FIND_NODE_TIMEOUT: Duration = Duration::from_secs(2);
const EXPIRY_TIME: Duration = Duration::from_secs(20);
const MAX_NODES_PING: usize = 32; // Max nodes to add/ping at once
/// Max nodes to add/ping at once
const MAX_NODES_PING: usize = 32;
const REQUEST_BACKOFF: [Duration; 4] = [
Duration::from_secs(1),
Duration::from_secs(4),
Duration::from_secs(16),
Duration::from_secs(64)
];

const NODE_LAST_SEEN_TIMEOUT: Duration = Duration::from_secs(24*60*60);

const NODE_LAST_SEEN_TIMEOUT: Duration = Duration::from_secs(24 * 60 * 60);
const OBSERVED_NODES_MAX_SIZE: usize = 10_000;


/// Node discovery packet kinds
// TODO: Add support for `Node Discovery v4 ENR Extension` <https://eips.ethereum.org/EIPS/eip-868>
#[derive(Debug)]
enum PacketType {
Ping = 1,
Pong = 2,
FindNode = 3,
Neighbours = 4,
}

impl TryFrom<u8> for PacketType {
type Error = Error;

fn try_from(kind: u8) -> Result<Self, Self::Error> {
match kind {
PACKET_PING => Ok(Self::Ping),
PACKET_PONG => Ok(Self::Pong),
PACKET_FIND_NODE => Ok(Self::FindNode),
PACKET_NEIGHBOURS => Ok(Self::Neighbours),
_ => Err(Error::BadProtocol),
}
}
}

#[derive(Clone, Debug)]
pub struct NodeEntry {
pub id: NodeId,
Expand Down Expand Up @@ -372,7 +415,7 @@ impl Discovery {
self.public_endpoint.to_rlp_list(&mut rlp);
node.endpoint.to_rlp_list(&mut rlp);
append_expiration(&mut rlp);
let hash = self.send_packet(PACKET_PING, node.endpoint.udp_address(), rlp.drain())?;
let hash = self.send_packet(PacketType::Ping, node.endpoint.udp_address(), rlp.drain())?;

self.in_flight_pings.insert(node.id, PingRequest {
sent_at: Instant::now(),
Expand All @@ -389,7 +432,7 @@ impl Discovery {
let mut rlp = RlpStream::new_list(2);
rlp.append(target);
append_expiration(&mut rlp);
self.send_packet(PACKET_FIND_NODE, node.endpoint.udp_address(), rlp.drain())?;
self.send_packet(PacketType::FindNode, node.endpoint.udp_address(), rlp.drain())?;

self.in_flight_find_nodes.insert(node.id, FindNodeRequest {
sent_at: Instant::now(),
Expand All @@ -401,9 +444,8 @@ impl Discovery {
Ok(())
}

fn send_packet(&mut self, packet_id: u8, address: SocketAddr, payload: Bytes) -> Result<H256, Error> {
let packet = assemble_packet(packet_id, payload, &self.secret)?;
let hash = H256::from_slice(&packet[0..32]);
fn send_packet(&mut self, packet_id: PacketType, address: SocketAddr, payload: Bytes) -> Result<H256, Error> {
let (packet, hash) = assemble_packet(packet_id, payload, &self.secret)?;
self.send_to(packet, address);
Ok(hash)
}
Expand Down Expand Up @@ -464,30 +506,13 @@ impl Discovery {
}

pub fn on_packet(&mut self, packet: &[u8], from: SocketAddr) -> Result<Option<TableUpdates>, Error> {
// validate packet
if packet.len() < 32 + 65 + 4 + 1 {
return Err(Error::BadProtocol);
}

let hash_signed = keccak(&packet[32..]);
if hash_signed[..] != packet[0..32] {
return Err(Error::BadProtocol);
}

let signed = &packet[(32 + 65)..];
let signature = H520::from_slice(&packet[32..(32 + 65)]);
let node_id = recover(&signature.into(), &keccak(signed))?;
let packet_id = signed[0];
let rlp = Rlp::new(&signed[1..]);
let (node_id, payload, packet_id, signed_hash) = disassemble_packet(packet)?;
let rlp = Rlp::new(payload);
match packet_id {
PACKET_PING => self.on_ping(&rlp, node_id, from, hash_signed.as_bytes()),
PACKET_PONG => self.on_pong(&rlp, node_id, from),
PACKET_FIND_NODE => self.on_find_node(&rlp, node_id, from),
PACKET_NEIGHBOURS => self.on_neighbours(&rlp, node_id, from),
_ => {
debug!(target: "discovery", "Unknown UDP packet: {}", packet_id);
Ok(None)
}
PacketType::Ping => self.on_ping(&rlp, node_id, from, signed_hash),
PacketType::Pong => self.on_pong(&rlp, node_id, from),
PacketType::FindNode => self.on_find_node(&rlp, node_id, from),
PacketType::Neighbours => self.on_neighbours(&rlp, node_id, from),
}
}

Expand All @@ -505,7 +530,7 @@ impl Discovery {
entry.endpoint.is_allowed(&self.ip_filter) && entry.id != self.id
}

fn on_ping(&mut self, rlp: &Rlp, node_id: NodeId, from: SocketAddr, echo_hash: &[u8]) -> Result<Option<TableUpdates>, Error> {
fn on_ping(&mut self, rlp: &Rlp, node_id: NodeId, from: SocketAddr, echo_hash: H256) -> Result<Option<TableUpdates>, Error> {
trace!(target: "discovery", "Got Ping from {:?}", &from);
let ping_from = if let Ok(node_endpoint) = NodeEndpoint::from_rlp(&rlp.at(1)?) {
node_endpoint
Expand Down Expand Up @@ -537,7 +562,7 @@ impl Discovery {

response.append(&echo_hash);
append_expiration(&mut response);
self.send_packet(PACKET_PONG, from, response.drain())?;
self.send_packet(PacketType::Pong, from, response.drain())?;

let entry = NodeEntry { id: node_id, endpoint: pong_to };
if !entry.endpoint.is_valid_discovery_node() {
Expand Down Expand Up @@ -657,7 +682,7 @@ impl Discovery {
}
let mut packets = Discovery::prepare_neighbours_packets(&nearest);
for p in packets.drain(..) {
self.send_packet(PACKET_NEIGHBOURS, node.endpoint.address, p)?;
self.send_packet(PacketType::Neighbours, node.endpoint.address, p)?;
}
trace!(target: "discovery", "Sent {} Neighbours to {:?}", nearest.len(), &node.endpoint);
Ok(())
Expand Down Expand Up @@ -831,24 +856,66 @@ fn append_expiration(rlp: &mut RlpStream) {
rlp.append(&timestamp);
}

fn assemble_packet(packet_id: u8, payload: Bytes, secret: &Secret) -> Result<Bytes, Error> {
let mut packet = Bytes::with_capacity(payload.len() + 32 + 65 + 1);
packet.resize(32 + 65, 0); // Filled in below
packet.push(packet_id);
/// Helper function to assemble node discovery packets
///
/// The packet format is: `hash || signature || packet_type || payload`, where the maximum packet length is 1280 bytes
fn assemble_packet(packet_id: PacketType, payload: Bytes, secret: &Secret) -> Result<(Bytes, H256), Error> {
let packet_len = payload.len() + HEADER_SIZE;

if !packet_has_valid_length(packet_len) {
warn!(target: "discovery", "Ignored to write discovery packet with invalid packet length: {}, expected to be in range {} - {}", packet_len, MIN_DATAGRAM_SIZE, MAX_DATAGRAM_SIZE);
return Err(Error::BadProtocol);
}

let mut packet = Bytes::with_capacity(packet_len);
packet.resize(HEADER_MAC_SIZE, 0); // Filled in below
packet.push(packet_id as u8);
packet.extend(payload);

let hash = keccak(&packet[(32 + 65)..]);
let signature = match sign(secret, &hash) {
Ok(s) => s,
Err(e) => {
warn!(target: "discovery", "Error signing UDP packet");
return Err(Error::from(e));
}
};
packet[32..(32 + 65)].copy_from_slice(&signature[..]);
let signed_hash = keccak(&packet[32..]);
packet[0..32].copy_from_slice(signed_hash.as_bytes());
Ok(packet)
let signature = sign_payload_with_packet_id(secret, &packet[HEADER_MAC_SIZE..])?;
packet[ADDRESS_BYTES_SIZE..HEADER_MAC_SIZE].copy_from_slice(&signature[..]);
let signed_hash = keccak(&packet[ADDRESS_BYTES_SIZE..]);
packet[..ADDRESS_BYTES_SIZE].copy_from_slice(signed_hash.as_bytes());
Ok((packet, signed_hash))
}

/// Helper to disassemble node discovery packets
///
/// The packet format is: `hash || signature || packet_type || payload`, where the maximum packet length is 1280 bytes
fn disassemble_packet(packet: &[u8]) -> Result<(NodeId, &[u8], PacketType, H256), Error> {
if !packet_has_valid_length(packet.len()) {
warn!(target: "discovery", "Ignored to read discovery packet with invalid packet length: {}, expected to be in range {} - {}", packet.len(), MIN_DATAGRAM_SIZE, MAX_DATAGRAM_SIZE);
return Err(Error::BadProtocol);
}

let payload_with_packet_id = &packet[HEADER_MAC_SIZE..];
let packet_id = payload_with_packet_id[0];
let packet_id: PacketType = packet_id.try_into().map_err(|e| {
warn!(target: "discovery", "Unknown discovery packet id: {:?}", packet_id);
e
})?;

let signed_hash = keccak(&packet[ADDRESS_BYTES_SIZE..]);
if signed_hash[..] != packet[0..ADDRESS_BYTES_SIZE] {
return Err(Error::BadProtocol);
}

let signature = H520::from_slice(&packet[ADDRESS_BYTES_SIZE..HEADER_MAC_SIZE]);
let node_id = recover(&signature.into(), &keccak(payload_with_packet_id))?;

Ok((node_id, &payload_with_packet_id[1..], packet_id, signed_hash))
}

fn sign_payload_with_packet_id(secret: &Secret, payload_with_packet_id: &[u8]) -> Result<Signature, Error> {
let hash = keccak(payload_with_packet_id);
sign(secret, &hash).map_err(|e| {
warn!(target: "discovery", "Error signing UDP packet");
e.into()
})
}

fn packet_has_valid_length(packet_len: usize) -> bool {
packet_len >= MIN_DATAGRAM_SIZE && packet_len <= MAX_DATAGRAM_SIZE
}

// Selects the next node in a bucket to ping. Chooses the eligible node least recently seen.
Expand Down Expand Up @@ -1012,8 +1079,8 @@ mod tests {
let key = Random.generate();
discovery.send_find_node(&node_entries[100], key.public()).unwrap();
for payload in Discovery::prepare_neighbours_packets(&node_entries[101..116]) {
let packet = assemble_packet(PACKET_NEIGHBOURS, payload, &key.secret()).unwrap();
discovery.on_packet(&packet, from.clone()).unwrap();
let (packet, _hash) = assemble_packet(PacketType::Neighbours, payload, &key.secret()).unwrap();
discovery.on_packet(&packet, from).unwrap();
}

let num_nodes = total_bucket_nodes(&discovery.node_buckets);
Expand All @@ -1024,8 +1091,8 @@ mod tests {
// FIND_NODE does not time out because it receives k results.
discovery.send_find_node(&node_entries[100], key.public()).unwrap();
for payload in Discovery::prepare_neighbours_packets(&node_entries[101..117]) {
let packet = assemble_packet(PACKET_NEIGHBOURS, payload, &key.secret()).unwrap();
discovery.on_packet(&packet, from.clone()).unwrap();
let (packet, _hash) = assemble_packet(PacketType::Neighbours, payload, &key.secret()).unwrap();
discovery.on_packet(&packet, from).unwrap();
}

let num_nodes = total_bucket_nodes(&discovery.node_buckets);
Expand Down Expand Up @@ -1257,15 +1324,15 @@ mod tests {
ep1.to_rlp_list(&mut incorrect_pong_rlp);
incorrect_pong_rlp.append(&H256::zero());
append_expiration(&mut incorrect_pong_rlp);
let incorrect_pong_data = assemble_packet(
PACKET_PONG, incorrect_pong_rlp.drain(), &discovery2.secret
let (incorrect_pong_data, _hash) = assemble_packet(
PacketType::Pong, incorrect_pong_rlp.drain(), &discovery2.secret
).unwrap();
if let Some(_) = discovery1.on_packet(&incorrect_pong_data, ep2.address.clone()).unwrap() {
if let Some(_) = discovery1.on_packet(&incorrect_pong_data, ep2.address).unwrap() {
panic!("Expected no changes to discovery1's table because pong hash is incorrect");
}

// Delivery of valid pong response should add to routing table.
if let Some(table_updates) = discovery1.on_packet(&pong_data.payload, ep2.address.clone()).unwrap() {
if let Some(table_updates) = discovery1.on_packet(&pong_data.payload, ep2.address).unwrap() {
assert_eq!(table_updates.added.len(), 1);
assert_eq!(table_updates.removed.len(), 0);
assert!(table_updates.added.contains_key(&discovery2.id));
Expand All @@ -1286,10 +1353,10 @@ mod tests {
ep3.to_rlp_list(&mut unexpected_pong_rlp);
unexpected_pong_rlp.append(&H256::zero());
append_expiration(&mut unexpected_pong_rlp);
let unexpected_pong = assemble_packet(
PACKET_PONG, unexpected_pong_rlp.drain(), key3.secret()
let (unexpected_pong, _hash) = assemble_packet(
PacketType::Pong, unexpected_pong_rlp.drain(), key3.secret()
).unwrap();
if let Some(_) = discovery1.on_packet(&unexpected_pong, ep3.address.clone()).unwrap() {
if let Some(_) = discovery1.on_packet(&unexpected_pong, ep3.address).unwrap() {
panic!("Expected no changes to discovery1's table for unexpected pong");
}
}
Expand Down