Skip to content
This repository was archived by the owner on Nov 6, 2020. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 42 additions & 49 deletions util/network-devp2p/src/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ enum NodeValidity {
#[derive(Debug)]
enum BucketError {
Ourselves,
NotInTheBucket{node_entry: NodeEntry, bucket_distance: usize},
NotInTheBucket { node_entry: NodeEntry, bucket_distance: usize },
}

struct PingRequest {
Expand All @@ -137,22 +137,14 @@ struct PingRequest {
reason: PingReason
}

#[derive(Debug)]
#[derive(Debug, Default)]
pub struct NodeBucket {
nodes: VecDeque<BucketEntry>, //sorted by last active
}

impl Default for NodeBucket {
fn default() -> Self {
NodeBucket::new()
}
}

impl NodeBucket {
fn new() -> Self {
NodeBucket {
nodes: VecDeque::new()
}
Self::default()
}
}

Expand All @@ -161,7 +153,7 @@ pub struct Datagram {
pub address: SocketAddr,
}

pub struct Discovery<'a> {
pub struct Discovery {
id: NodeId,
id_hash: H256,
secret: Secret,
Expand All @@ -182,16 +174,16 @@ pub struct Discovery<'a> {
check_timestamps: bool,
adding_nodes: Vec<NodeEntry>,
ip_filter: IpFilter,
request_backoff: &'a [Duration],
request_backoff: &'static [Duration],
}

pub struct TableUpdates {
pub added: HashMap<NodeId, NodeEntry>,
pub removed: HashSet<NodeId>,
}

impl<'a> Discovery<'a> {
pub fn new(key: &KeyPair, public: NodeEndpoint, ip_filter: IpFilter) -> Discovery<'static> {
impl Discovery {
pub fn new(key: &KeyPair, public: NodeEndpoint, ip_filter: IpFilter) -> Discovery {
Discovery {
id: *key.public(),
id_hash: keccak(key.public()),
Expand Down Expand Up @@ -243,7 +235,8 @@ impl<'a> Discovery<'a> {
};
let bucket = &mut self.node_buckets[dist];
bucket.nodes.iter_mut().find(|n| n.address.id == e.id)
.map_or(Err(BucketError::NotInTheBucket{node_entry: e.clone(), bucket_distance: dist}.into()), |entry| {
.ok_or_else(|| BucketError::NotInTheBucket { node_entry: e.clone(), bucket_distance: dist })
.and_then(|entry| {
entry.address = e;
entry.last_seen = Instant::now();
entry.backoff_until = Instant::now();
Expand Down Expand Up @@ -389,14 +382,14 @@ impl<'a> Discovery<'a> {
node.endpoint.to_rlp_list(&mut rlp);
append_expiration(&mut rlp);
let old_parity_hash = keccak(rlp.as_raw());
let hash = self.send_packet(PACKET_PING, &node.endpoint.udp_address(), &rlp.drain())?;
let hash = self.send_packet(PACKET_PING, node.endpoint.udp_address(), rlp.drain())?;

self.in_flight_pings.insert(node.id, PingRequest {
sent_at: Instant::now(),
node: node.clone(),
echo_hash: hash,
deprecated_echo_hash: old_parity_hash,
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a chance we can get rid of this you think?

Copy link
Copy Markdown
Collaborator Author

@niklasad1 niklasad1 Mar 10, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what do you mean?

the entire in_flight_pings.insert ?

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, I mean deprecated_echo_hash – I guess we must have made a change here at some point and kept the old hash around for backwards compatibility. Do we still have to include it you think? Can we ever remove it?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably but I'm not sure.

reason: reason
reason,
});

trace!(target: "discovery", "Sent Ping to {:?} ; node_id={:#x}", &node.endpoint, node.id);
Expand All @@ -407,7 +400,7 @@ impl<'a> Discovery<'a> {
let mut rlp = RlpStream::new_list(2);
rlp.append(target);
append_expiration(&mut rlp);
self.send_packet(PACKET_FIND_NODE, &node.endpoint.udp_address(), &rlp.drain())?;
self.send_packet(PACKET_FIND_NODE, node.endpoint.udp_address(), rlp.drain())?;

self.in_flight_find_nodes.insert(node.id, FindNodeRequest {
sent_at: Instant::now(),
Expand All @@ -419,10 +412,10 @@ impl<'a> Discovery<'a> {
Ok(())
}

fn send_packet(&mut self, packet_id: u8, address: &SocketAddr, payload: &[u8]) -> Result<H256, Error> {
fn send_packet(&mut self, packet_id: u8, address: SocketAddr, payload: Bytes) -> Result<H256, Error> {
let packet = assemble_packet(packet_id, payload, &self.secret)?;
let hash = H256::from_slice(&packet[0..32]);
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a bit annoying, we just built one of these in assemble_packet(); maybe return the signed_hash from there?

Copy link
Copy Markdown
Collaborator Author

@niklasad1 niklasad1 Mar 10, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hehe, right I do get your concern

In terms, of clean code if we change assemble_packet then it does clearly more than one thing but it could be quite nice if we could introduce another type Packet (or similar) to avoid copy_from_slice stuff.

But, let's address it another PR :)

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you really think having assemble_packet return Result<(Bytes, H256), Error> would be so bad? It's a private method after all, and it wouldn't be doing more or different work?

But, let's address it another PR :)

Absolutely.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you really think having assemble_packet return Result<(Bytes, H256), Error> would be so bad? It's a private method after all, and it wouldn't be doing more or different work?

It is fine but my general opinion it should not :)

self.send_to(packet, address.clone());
self.send_to(packet, address);
Ok(hash)
}

Expand Down Expand Up @@ -498,10 +491,10 @@ impl<'a> Discovery<'a> {
let packet_id = signed[0];
let rlp = Rlp::new(&signed[1..]);
match packet_id {
PACKET_PING => self.on_ping(&rlp, &node_id, &from, hash_signed.as_bytes()),
PACKET_PONG => self.on_pong(&rlp, &node_id, &from),
PACKET_FIND_NODE => self.on_find_node(&rlp, &node_id, &from),
PACKET_NEIGHBOURS => self.on_neighbours(&rlp, &node_id, &from),
PACKET_PING => self.on_ping(&rlp, node_id, from, hash_signed.as_bytes()),
PACKET_PONG => self.on_pong(&rlp, node_id, from),
PACKET_FIND_NODE => self.on_find_node(&rlp, node_id, from),
PACKET_NEIGHBOURS => self.on_neighbours(&rlp, node_id, from),
_ => {
debug!(target: "discovery", "Unknown UDP packet: {}", packet_id);
Ok(None)
Expand All @@ -523,12 +516,12 @@ impl<'a> Discovery<'a> {
entry.endpoint.is_allowed(&self.ip_filter) && entry.id != self.id
}

fn on_ping(&mut self, rlp: &Rlp, node_id: &NodeId, from: &SocketAddr, echo_hash: &[u8]) -> Result<Option<TableUpdates>, Error> {
fn on_ping(&mut self, rlp: &Rlp, node_id: NodeId, from: SocketAddr, echo_hash: &[u8]) -> Result<Option<TableUpdates>, Error> {
trace!(target: "discovery", "Got Ping from {:?}", &from);
let ping_from = if let Ok(node_endpoint) = NodeEndpoint::from_rlp(&rlp.at(1)?) {
node_endpoint
} else {
let mut address = from.clone();
let mut address = from;
// address here is the node's tcp port. If we are unable to get the `NodeEndpoint` from the `ping_from`
// rlp field then this is most likely a BootNode, set the tcp port to 0 because it can not be used for syncing.
address.set_port(0);
Expand All @@ -542,7 +535,7 @@ impl<'a> Discovery<'a> {
self.check_timestamp(timestamp)?;
let mut response = RlpStream::new_list(3);
let pong_to = NodeEndpoint {
address: from.clone(),
address: from,
udp_port: ping_from.udp_port
};
// Here the PONG's `To` field should be the node we are
Expand All @@ -555,27 +548,27 @@ impl<'a> Discovery<'a> {

response.append(&echo_hash);
append_expiration(&mut response);
self.send_packet(PACKET_PONG, from, &response.drain())?;
self.send_packet(PACKET_PONG, from, response.drain())?;

let entry = NodeEntry { id: *node_id, endpoint: pong_to.clone() };
let entry = NodeEntry { id: node_id, endpoint: pong_to };
if !entry.endpoint.is_valid_discovery_node() {
debug!(target: "discovery", "Got bad address: {:?}", entry);
} else if !self.is_allowed(&entry) {
debug!(target: "discovery", "Address not allowed: {:?}", entry);
} else {
self.add_node(entry.clone());
self.add_node(entry);
}
Ok(None)
}

fn on_pong(&mut self, rlp: &Rlp, node_id: &NodeId, from: &SocketAddr) -> Result<Option<TableUpdates>, Error> {
fn on_pong(&mut self, rlp: &Rlp, node_id: NodeId, from: SocketAddr) -> Result<Option<TableUpdates>, Error> {
trace!(target: "discovery", "Got Pong from {:?} ; node_id={:#x}", &from, node_id);
let _pong_to = NodeEndpoint::from_rlp(&rlp.at(0)?)?;
let echo_hash: H256 = rlp.val_at(1)?;
let timestamp: u64 = rlp.val_at(2)?;
self.check_timestamp(timestamp)?;

let expected_node = match self.in_flight_pings.entry(*node_id) {
let expected_node = match self.in_flight_pings.entry(node_id) {
Entry::Occupied(entry) => {
let expected_node = {
let request = entry.get();
Expand All @@ -586,7 +579,7 @@ impl<'a> Discovery<'a> {
if request.deprecated_echo_hash == echo_hash {
trace!(target: "discovery", "Got Pong from an old parity-ethereum version.");
}
Some((request.node.clone(), request.reason.clone()))
Some((request.node.clone(), request.reason))
}
};

Expand Down Expand Up @@ -629,16 +622,16 @@ impl<'a> Discovery<'a> {
}
}

fn on_find_node(&mut self, rlp: &Rlp, node_id: &NodeId, from: &SocketAddr) -> Result<Option<TableUpdates>, Error> {
fn on_find_node(&mut self, rlp: &Rlp, node_id: NodeId, from: SocketAddr) -> Result<Option<TableUpdates>, Error> {
trace!(target: "discovery", "Got FindNode from {:?}", &from);
let target: NodeId = rlp.val_at(0)?;
let timestamp: u64 = rlp.val_at(1)?;
self.check_timestamp(timestamp)?;

let node = NodeEntry {
id: node_id.clone(),
id: node_id,
endpoint: NodeEndpoint {
address: *from,
address: from,
udp_port: from.port()
}
};
Expand Down Expand Up @@ -688,7 +681,7 @@ impl<'a> Discovery<'a> {
}
let mut packets = Discovery::prepare_neighbours_packets(&nearest);
for p in packets.drain(..) {
self.send_packet(PACKET_NEIGHBOURS, &node.endpoint.address, &p)?;
self.send_packet(PACKET_NEIGHBOURS, node.endpoint.address, p)?;
}
trace!(target: "discovery", "Sent {} Neighbours to {:?}", nearest.len(), &node.endpoint);
Ok(())
Expand All @@ -711,10 +704,10 @@ impl<'a> Discovery<'a> {
packets.collect()
}

fn on_neighbours(&mut self, rlp: &Rlp, node_id: &NodeId, from: &SocketAddr) -> Result<Option<TableUpdates>, Error> {
fn on_neighbours(&mut self, rlp: &Rlp, node_id: NodeId, from: SocketAddr) -> Result<Option<TableUpdates>, Error> {
let results_count = rlp.at(0)?.item_count()?;

let is_expected = match self.in_flight_find_nodes.entry(*node_id) {
let is_expected = match self.in_flight_find_nodes.entry(node_id) {
Entry::Occupied(mut entry) => {
let expected = {
let request = entry.get_mut();
Expand Down Expand Up @@ -862,11 +855,11 @@ fn append_expiration(rlp: &mut RlpStream) {
rlp.append(&timestamp);
}

fn assemble_packet(packet_id: u8, bytes: &[u8], secret: &Secret) -> Result<Bytes, Error> {
let mut packet = Bytes::with_capacity(bytes.len() + 32 + 65 + 1);
fn assemble_packet(packet_id: u8, payload: Bytes, secret: &Secret) -> Result<Bytes, Error> {
let mut packet = Bytes::with_capacity(payload.len() + 32 + 65 + 1);
packet.resize(32 + 65, 0); // Filled in below
packet.push(packet_id);
packet.extend_from_slice(bytes);
packet.extend(payload);

let hash = keccak(&packet[(32 + 65)..]);
let signature = match sign(secret, &hash) {
Expand Down Expand Up @@ -1043,7 +1036,7 @@ mod tests {
let key = Random.generate();
discovery.send_find_node(&node_entries[100], key.public()).unwrap();
for payload in Discovery::prepare_neighbours_packets(&node_entries[101..116]) {
let packet = assemble_packet(PACKET_NEIGHBOURS, &payload, &key.secret()).unwrap();
let packet = assemble_packet(PACKET_NEIGHBOURS, payload, &key.secret()).unwrap();
discovery.on_packet(&packet, from.clone()).unwrap();
}

Expand All @@ -1055,7 +1048,7 @@ mod tests {
// FIND_NODE does not time out because it receives k results.
discovery.send_find_node(&node_entries[100], key.public()).unwrap();
for payload in Discovery::prepare_neighbours_packets(&node_entries[101..117]) {
let packet = assemble_packet(PACKET_NEIGHBOURS, &payload, &key.secret()).unwrap();
let packet = assemble_packet(PACKET_NEIGHBOURS, payload, &key.secret()).unwrap();
discovery.on_packet(&packet, from.clone()).unwrap();
}

Expand All @@ -1065,8 +1058,8 @@ mod tests {
assert_eq!(removed, 0);

// Test bucket evictions with retries.
let request_backoff = [Duration::new(0, 0); 2];
let mut discovery = Discovery { request_backoff: &request_backoff, ..discovery };
const TEST_REQUEST_BACKOFF: [Duration; 2] = [Duration::from_secs(0); 2];
let mut discovery = Discovery { request_backoff: &TEST_REQUEST_BACKOFF, ..discovery };

for _ in 0..2 {
discovery.ping(&node_entries[101], PingReason::Default).unwrap();
Expand Down Expand Up @@ -1289,7 +1282,7 @@ mod tests {
incorrect_pong_rlp.append(&H256::zero());
append_expiration(&mut incorrect_pong_rlp);
let incorrect_pong_data = assemble_packet(
PACKET_PONG, &incorrect_pong_rlp.drain(), &discovery2.secret
PACKET_PONG, incorrect_pong_rlp.drain(), &discovery2.secret
).unwrap();
if let Some(_) = discovery1.on_packet(&incorrect_pong_data, ep2.address.clone()).unwrap() {
panic!("Expected no changes to discovery1's table because pong hash is incorrect");
Expand Down Expand Up @@ -1318,7 +1311,7 @@ mod tests {
unexpected_pong_rlp.append(&H256::zero());
append_expiration(&mut unexpected_pong_rlp);
let unexpected_pong = assemble_packet(
PACKET_PONG, &unexpected_pong_rlp.drain(), key3.secret()
PACKET_PONG, unexpected_pong_rlp.drain(), key3.secret()
).unwrap();
if let Some(_) = discovery1.on_packet(&unexpected_pong, ep3.address.clone()).unwrap() {
panic!("Expected no changes to discovery1's table for unexpected pong");
Expand Down
2 changes: 1 addition & 1 deletion util/network-devp2p/src/host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ pub struct Host {
udp_socket: Mutex<Option<UdpSocket>>,
tcp_listener: Mutex<TcpListener>,
sessions: Arc<RwLock<Slab<SharedSession>>>,
discovery: Mutex<Option<Discovery<'static>>>,
discovery: Mutex<Option<Discovery>>,
nodes: RwLock<NodeTable>,
handlers: RwLock<HashMap<ProtocolId, Arc<dyn NetworkProtocolHandler + Sync>>>,
timers: RwLock<HashMap<TimerToken, ProtocolTimer>>,
Expand Down