diff --git a/Cargo.lock b/Cargo.lock index 3769d0366b3bf..132cf41d257ec 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4345,7 +4345,9 @@ dependencies = [ "linked-hash-map 0.5.2 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", "lru-cache 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", + "rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)", "serde_json 1.0.39 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio 0.1.19 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] diff --git a/core/peerset/Cargo.toml b/core/peerset/Cargo.toml index efa115f5be3de..dfe68154804aa 100644 --- a/core/peerset/Cargo.toml +++ b/core/peerset/Cargo.toml @@ -13,4 +13,8 @@ libp2p = { version = "0.7.0", default-features = false } linked-hash-map = "0.5" log = "0.4" lru-cache = "0.1.2" -serde_json = "1.0.24" \ No newline at end of file +serde_json = "1.0.24" + +[dev-dependencies] +rand = "0.6" +tokio = "0.1" diff --git a/core/peerset/src/lib.rs b/core/peerset/src/lib.rs index 97200c6437361..ac076be4b51f3 100644 --- a/core/peerset/src/lib.rs +++ b/core/peerset/src/lib.rs @@ -90,7 +90,7 @@ pub enum Message { } /// Opaque identifier for an incoming connection. Allocated by the network. -#[derive(Debug, Copy, Clone, PartialEq, Eq)] +#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] pub struct IncomingIndex(pub u64); impl From for IncomingIndex { @@ -316,8 +316,9 @@ impl Peerset { /// connection implicitely means `Connect`, but incoming connections aren't cancelled by /// `dropped`. /// - /// Because of concurrency issues, it is acceptable to call `incoming` with a `PeerId` the - /// peerset is already connected to, in which case it must not answer. + // Implementation note: because of concurrency issues, it is possible that we push a `Connect` + // message to the output channel with a `PeerId`, and that `incoming` gets called with the same + // `PeerId` before that message has been read by the user. In this situation we must not answer. pub fn incoming(&mut self, peer_id: PeerId, index: IncomingIndex) { trace!(target: "peerset", "Incoming {:?}", peer_id); diff --git a/core/peerset/tests/fuzz.rs b/core/peerset/tests/fuzz.rs new file mode 100644 index 0000000000000..42a7f2770cc9c --- /dev/null +++ b/core/peerset/tests/fuzz.rs @@ -0,0 +1,138 @@ +// Copyright 2018-2019 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see . + +use futures::prelude::*; +use libp2p::PeerId; +use rand::distributions::{Distribution, Uniform, WeightedIndex}; +use rand::seq::IteratorRandom; +use std::{collections::HashMap, collections::HashSet, iter}; +use substrate_peerset::{IncomingIndex, Message, PeersetConfig, Peerset}; + +#[test] +fn run() { + for _ in 0..50 { + test_once(); + } +} + +fn test_once() { + // PRNG to use. + let mut rng = rand::thread_rng(); + + // Nodes that the peerset knows about. + let mut known_nodes = HashSet::::new(); + // Nodes that we have reserved. Always a subset of `known_nodes`. + let mut reserved_nodes = HashSet::::new(); + + let (mut peerset, peerset_handle) = Peerset::from_config(PeersetConfig { + bootnodes: (0 .. Uniform::new_inclusive(0, 4).sample(&mut rng)).map(|_| { + let id = PeerId::random(); + known_nodes.insert(id.clone()); + id + }).collect(), + reserved_nodes: (0 .. Uniform::new_inclusive(0, 2).sample(&mut rng)).map(|_| { + let id = PeerId::random(); + known_nodes.insert(id.clone()); + reserved_nodes.insert(id.clone()); + id + }).collect(), + reserved_only: Uniform::new_inclusive(0, 10).sample(&mut rng) == 0, + in_peers: Uniform::new_inclusive(0, 25).sample(&mut rng), + out_peers: Uniform::new_inclusive(0, 25).sample(&mut rng), + }); + + tokio::runtime::current_thread::Runtime::new().unwrap().block_on(futures::future::poll_fn(move || -> Result<_, ()> { + // List of nodes the user of `peerset` assumes it's connected to. Always a subset of + // `known_nodes`. + let mut connected_nodes = HashSet::::new(); + // List of nodes the user of `peerset` called `incoming` with and that haven't been + // accepted or rejected yet. + let mut incoming_nodes = HashMap::::new(); + // Next id for incoming connections. + let mut next_incoming_id = IncomingIndex(0); + + // Perform a certain number of actions while checking that the state is consistent. If we + // reach the end of the loop, the run has succeeded. + for _ in 0 .. 2500 { + // Each of these weights corresponds to an action that we may perform. + let action_weights = [150, 90, 90, 30, 30, 1, 1, 4, 4]; + match WeightedIndex::new(&action_weights).unwrap().sample(&mut rng) { + // If we generate 0, poll the peerset. + 0 => match peerset.poll().unwrap() { + Async::Ready(Some(Message::Connect(id))) => { + if let Some(id) = incoming_nodes.iter().find(|(_, v)| **v == id).map(|(&id, _)| id) { + incoming_nodes.remove(&id); + } + assert!(connected_nodes.insert(id)); + } + Async::Ready(Some(Message::Drop(id))) => { connected_nodes.remove(&id); } + Async::Ready(Some(Message::Accept(n))) => + assert!(connected_nodes.insert(incoming_nodes.remove(&n).unwrap())), + Async::Ready(Some(Message::Reject(n))) => + assert!(!connected_nodes.contains(&incoming_nodes.remove(&n).unwrap())), + Async::Ready(None) => panic!(), + Async::NotReady => {} + } + + // If we generate 1, discover a new node. + 1 => { + let new_id = PeerId::random(); + known_nodes.insert(new_id.clone()); + peerset.discovered(iter::once(new_id)); + } + + // If we generate 2, adjust a random reputation. + 2 => if let Some(id) = known_nodes.iter().choose(&mut rng) { + let val = Uniform::new_inclusive(i32::min_value(), i32::max_value()).sample(&mut rng); + peerset_handle.report_peer(id.clone(), val); + } + + // If we generate 3, disconnect from a random node. + 3 => if let Some(id) = connected_nodes.iter().choose(&mut rng).cloned() { + connected_nodes.remove(&id); + peerset.dropped(id); + } + + // If we generate 4, connect to a random node. + 4 => if let Some(id) = known_nodes.iter() + .filter(|n| incoming_nodes.values().all(|m| m != *n) && !connected_nodes.contains(n)) + .choose(&mut rng) { + peerset.incoming(id.clone(), next_incoming_id.clone()); + incoming_nodes.insert(next_incoming_id.clone(), id.clone()); + next_incoming_id.0 += 1; + } + + // 5 and 6 are the reserved-only mode. + 5 => peerset_handle.set_reserved_only(true), + 6 => peerset_handle.set_reserved_only(false), + + // 7 and 8 are about switching a random node in or out of reserved mode. + 7 => if let Some(id) = known_nodes.iter().filter(|n| !reserved_nodes.contains(n)).choose(&mut rng) { + peerset_handle.add_reserved_peer(id.clone()); + reserved_nodes.insert(id.clone()); + } + 8 => if let Some(id) = reserved_nodes.iter().choose(&mut rng).cloned() { + reserved_nodes.remove(&id); + peerset_handle.remove_reserved_peer(id); + } + + _ => unreachable!() + } + } + + Ok(Async::Ready(())) + })).unwrap(); +}