Skip to content
This repository was archived by the owner on Nov 6, 2020. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions util/network-devp2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ serde = "1.0"
serde_json = "1.0"
serde_derive = "1.0"
error-chain = { version = "0.12", default-features = false }
lru-cache = "0.1"

[dev-dependencies]
env_logger = "0.5"
Expand Down
138 changes: 104 additions & 34 deletions util/network-devp2p/src/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use std::collections::{HashSet, HashMap, VecDeque};
use std::collections::hash_map::Entry;
use std::default::Default;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use lru_cache::LruCache;
use hash::keccak;
use ethereum_types::{H256, H520};
use rlp::{Rlp, RlpStream};
Expand Down Expand Up @@ -55,6 +56,8 @@ const REQUEST_BACKOFF: [Duration; 4] = [

const NODE_LAST_SEEN_TIMEOUT: Duration = Duration::from_secs(24*60*60);

const OBSERVED_NODES_MAX_SIZE: usize = 10_000;
Comment thread
kirushik marked this conversation as resolved.

#[derive(Clone, Debug)]
pub struct NodeEntry {
pub id: NodeId,
Expand Down Expand Up @@ -95,7 +98,27 @@ struct FindNodeRequest {
#[derive(Clone, Copy)]
enum PingReason {
Default,
FromDiscoveryRequest(NodeId)
FromDiscoveryRequest(NodeId, NodeValidity),
}

#[derive(Clone, Copy, PartialEq)]
enum NodeCategory {
Bucket,
Observed
}

#[derive(Clone, Copy, PartialEq)]
enum NodeValidity {
Ourselves,
ValidNode(NodeCategory),
ExpiredNode(NodeCategory),
UnknownNode
}

#[derive(Debug)]
enum BucketError {
Ourselves,
NotInTheBucket{node_entry: NodeEntry, bucket_distance: usize},
}

struct PingRequest {
Expand Down Expand Up @@ -145,6 +168,12 @@ pub struct Discovery<'a> {
discovery_id: NodeId,
discovery_nodes: HashSet<NodeId>,
node_buckets: Vec<NodeBucket>,

// Sometimes we don't want to add nodes to the NodeTable, but still want to
// keep track of them to avoid excessive pinging (happens when an unknown node sends
// a discovery request to us -- the node might be on a different net).
other_observed_nodes: LruCache<NodeId, (NodeEndpoint, Instant)>,

in_flight_pings: HashMap<NodeId, PingRequest>,
in_flight_find_nodes: HashMap<NodeId, FindNodeRequest>,
send_queue: VecDeque<Datagram>,
Expand All @@ -171,6 +200,7 @@ impl<'a> Discovery<'a> {
discovery_id: NodeId::new(),
discovery_nodes: HashSet::new(),
node_buckets: (0..ADDRESS_BITS).map(|_| NodeBucket::new()).collect(),
other_observed_nodes: LruCache::new(OBSERVED_NODES_MAX_SIZE),
in_flight_pings: HashMap::new(),
in_flight_find_nodes: HashMap::new(),
send_queue: VecDeque::new(),
Expand Down Expand Up @@ -200,41 +230,53 @@ impl<'a> Discovery<'a> {
}
}

fn update_node(&mut self, e: NodeEntry) -> Option<TableUpdates> {
trace!(target: "discovery", "Inserting {:?}", &e);
fn update_bucket_record(&mut self, e: NodeEntry) -> Result<(), BucketError> {
Comment thread
kirushik marked this conversation as resolved.
let id_hash = keccak(e.id);
let dist = match Discovery::distance(&self.id_hash, &id_hash) {
Some(dist) => dist,
None => {
debug!(target: "discovery", "Attempted to update own entry: {:?}", e);
return None;
return Err(BucketError::Ourselves);
}
};
let bucket = &mut self.node_buckets[dist];
bucket.nodes.iter_mut().find(|n| n.address.id == e.id)
.map_or(Err(BucketError::NotInTheBucket{node_entry: e.clone(), bucket_distance: dist}.into()), |entry| {
entry.address = e;
entry.last_seen = Instant::now();
entry.backoff_until = Instant::now();
entry.fail_count = 0;
Ok(())
})
}

let mut added_map = HashMap::new();
let ping = {
let bucket = &mut self.node_buckets[dist];
let updated = if let Some(node) = bucket.nodes.iter_mut().find(|n| n.address.id == e.id) {
node.address = e.clone();
node.last_seen = Instant::now();
node.backoff_until = Instant::now();
node.fail_count = 0;
true
} else { false };
fn update_node(&mut self, e: NodeEntry) -> Option<TableUpdates> {
trace!(target: "discovery", "Inserting {:?}", &e);

match self.update_bucket_record(e) {
Ok(()) => None,
Err(BucketError::Ourselves) => None,
Err(BucketError::NotInTheBucket{node_entry, bucket_distance}) => Some((node_entry, bucket_distance))
}.map(|(node_entry, bucket_distance)| {
trace!(target: "discovery", "Adding a new node {:?} into our bucket {}", &node_entry, bucket_distance);

if !updated {
added_map.insert(e.id, e.clone());
bucket.nodes.push_front(BucketEntry::new(e));
let mut added = HashMap::with_capacity(1);
added.insert(node_entry.id, node_entry.clone());

let node_to_ping = {
let bucket = &mut self.node_buckets[bucket_distance];
bucket.nodes.push_front(BucketEntry::new(node_entry));
if bucket.nodes.len() > BUCKET_SIZE {
select_bucket_ping(bucket.nodes.iter())
} else { None }
} else { None }
};
if let Some(node) = ping {
self.try_ping(node, PingReason::Default);
}
Some(TableUpdates { added: added_map, removed: HashSet::new() })
} else {
None
}
};
if let Some(node) = node_to_ping {
self.try_ping(node, PingReason::Default);
};
TableUpdates{added, removed: HashSet::new()}
})
}

/// Starts the discovery process at round 0
Expand Down Expand Up @@ -541,10 +583,28 @@ impl<'a> Discovery<'a> {
};

if let Some((node, ping_reason)) = expected_node {
if let PingReason::FromDiscoveryRequest(target) = ping_reason {
if let PingReason::FromDiscoveryRequest(target, validity) = ping_reason {
self.respond_with_discovery(target, &node)?;
// kirushik: I would prefer to probe the network id of the remote node here, and add it to the nodes list if it's on "our" net --
// but `on_packet` happens synchronously, so doing the full TCP handshake ceremony here is a bad idea.
// So instead we just LRU-caching most recently seen nodes to avoid unnecessary pinging
match validity {
NodeValidity::ValidNode(NodeCategory::Bucket) | NodeValidity::ExpiredNode(NodeCategory::Bucket) => {
trace!(target: "discovery", "Updating node {:?} in our Kad buckets", &node);
self.update_bucket_record(node).unwrap_or_else(|error| {
debug!(target: "discovery", "Error occured when processing ping from a bucket node: {:?}", &error);
});
},
NodeValidity::UnknownNode | NodeValidity::ExpiredNode(NodeCategory::Observed) | NodeValidity::ValidNode(NodeCategory::Observed)=> {
trace!(target: "discovery", "Updating node {:?} in the list of other_observed_nodes", &node);
self.other_observed_nodes.insert(node.id, (node.endpoint, Instant::now()));
},
NodeValidity::Ourselves => (),
}
Ok(None)
} else {
Ok(self.update_node(node))
}
Ok(self.update_node(node))
} else {
debug!(target: "discovery", "Got unexpected Pong from {:?} ; request not found", &from);
Ok(None)
Expand All @@ -565,31 +625,41 @@ impl<'a> Discovery<'a> {
}
};

if self.is_a_valid_known_node(&node) {
self.respond_with_discovery(target, &node)?;
} else {
match self.check_validity(&node) {
NodeValidity::Ourselves => (), // It makes no sense to respond to the discovery request from ourselves
NodeValidity::ValidNode(_) => self.respond_with_discovery(target, &node)?,
// Make sure the request source is actually there and responds to pings before actually responding
self.try_ping(node, PingReason::FromDiscoveryRequest(target));
invalidity_reason => self.try_ping(node, PingReason::FromDiscoveryRequest(target, invalidity_reason))
Comment thread
kirushik marked this conversation as resolved.
}
Ok(None)
}

fn is_a_valid_known_node(&self, node: &NodeEntry) -> bool {
fn check_validity(&mut self, node: &NodeEntry) -> NodeValidity {
let id_hash = keccak(node.id);
let dist = match Discovery::distance(&self.id_hash, &id_hash) {
Some(dist) => dist,
Comment thread
kirushik marked this conversation as resolved.
None => {
debug!(target: "discovery", "Got an incoming discovery request from self: {:?}", node);
return false;
return NodeValidity::Ourselves;
}
};

let bucket = &self.node_buckets[dist];
if let Some(known_node) = bucket.nodes.iter().find(|n| n.address.id == node.id) {
debug!(target: "discovery", "Found a known node in a bucket when processing discovery: {:?}/{:?}", known_node, node);
(known_node.address.endpoint == node.endpoint) && (known_node.last_seen.elapsed() < NODE_LAST_SEEN_TIMEOUT)
match ((known_node.address.endpoint == node.endpoint), (known_node.last_seen.elapsed() < NODE_LAST_SEEN_TIMEOUT)) {
(true, true) => NodeValidity::ValidNode(NodeCategory::Bucket),
(true, false) => NodeValidity::ExpiredNode(NodeCategory::Bucket),
_ => NodeValidity::UnknownNode
}
} else {
false
self.other_observed_nodes.get_mut(&node.id).map_or(NodeValidity::UnknownNode, |(endpoint, observed_at)| {
match ((node.endpoint==*endpoint), (observed_at.elapsed() < NODE_LAST_SEEN_TIMEOUT)) {
(true, true) => NodeValidity::ValidNode(NodeCategory::Observed),
(true, false) => NodeValidity::ExpiredNode(NodeCategory::Observed),
_ => NodeValidity::UnknownNode
}
})
}
}

Expand Down
1 change: 1 addition & 0 deletions util/network-devp2p/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ extern crate keccak_hash as hash;
extern crate serde;
extern crate serde_json;
extern crate parity_snappy as snappy;
extern crate lru_cache;

#[macro_use]
extern crate error_chain;
Expand Down