diff --git a/core/cli/src/lib.rs b/core/cli/src/lib.rs index 885a9ca894f8e..874e08c7bbded 100644 --- a/core/cli/src/lib.rs +++ b/core/cli/src/lib.rs @@ -32,7 +32,7 @@ use service::{ }; use network::{ self, multiaddr::Protocol, - config::{NetworkConfiguration, NonReservedPeerMode, NodeKeyConfig}, + config::{NetworkConfiguration, TransportConfig, NonReservedPeerMode, NodeKeyConfig}, build_multiaddr, }; use primitives::H256; @@ -354,7 +354,10 @@ fn fill_network_configuration( config.in_peers = cli.in_peers; config.out_peers = cli.out_peers; - config.enable_mdns = !is_dev && !cli.no_mdns; + config.transport = TransportConfig::Normal { + enable_mdns: !is_dev && !cli.no_mdns, + wasm_external_transport: None, + }; Ok(()) } diff --git a/core/consensus/aura/src/lib.rs b/core/consensus/aura/src/lib.rs index 32cca7973184c..6b5ac2e6b60f4 100644 --- a/core/consensus/aura/src/lib.rs +++ b/core/consensus/aura/src/lib.rs @@ -705,7 +705,7 @@ pub fn import_queue( #[cfg(test)] mod tests { use super::*; - use futures::stream::Stream as _; + use futures::{Async, stream::Stream as _}; use consensus_common::NoNetwork as DummyOracle; use network::test::*; use network::test::{Block as TestBlock, PeersClient, PeersFullClient}; @@ -756,11 +756,9 @@ mod tests { } const SLOT_DURATION: u64 = 1; - const TEST_ROUTING_INTERVAL: Duration = Duration::from_millis(50); pub struct AuraTestNet { - peers: Vec>>, - started: bool, + peers: Vec>, } impl TestNetFactory for AuraTestNet { @@ -772,7 +770,6 @@ mod tests { fn from_config(_config: &ProtocolConfig) -> Self { AuraTestNet { peers: Vec::new(), - started: false, } } @@ -800,38 +797,24 @@ mod tests { } } - fn uses_tokio(&self) -> bool { - true - } - fn peer(&self, i: usize) -> &Peer { &self.peers[i] } - fn peers(&self) -> &Vec>> { + fn peers(&self) -> &Vec> { &self.peers } - fn mut_peers>>)>(&mut self, closure: F) { + fn mut_peers>)>(&mut self, closure: F) { closure(&mut self.peers); } - - fn started(&self) -> bool { - self.started - } - - fn set_started(&mut self, new: bool) { - self.started = new; - } } #[test] #[allow(deprecated)] fn authoring_blocks() { let _ = ::env_logger::try_init(); - let mut net = AuraTestNet::new(3); - - net.start(); + let net = AuraTestNet::new(3); let peers = &[ (0, Keyring::Alice), @@ -884,15 +867,7 @@ mod tests { .map(|_| ()) .map_err(|_| ()); - let drive_to_completion = ::tokio_timer::Interval::new_interval(TEST_ROUTING_INTERVAL) - .for_each(move |_| { - net.lock().send_import_notifications(); - net.lock().sync_without_disconnects(); - Ok(()) - }) - .map(|_| ()) - .map_err(|_| ()); - + let drive_to_completion = futures::future::poll_fn(|| { net.lock().poll(); Ok(Async::NotReady) }); let _ = runtime.block_on(wait_for.select(drive_to_completion).map_err(|_| ())).unwrap(); } diff --git a/core/consensus/babe/src/lib.rs b/core/consensus/babe/src/lib.rs index 7bc66f1357704..cdaeae6adb5c8 100644 --- a/core/consensus/babe/src/lib.rs +++ b/core/consensus/babe/src/lib.rs @@ -880,7 +880,7 @@ mod tests { use super::generic::DigestItem; use client::BlockchainEvents; use test_client; - use futures::stream::Stream; + use futures::{Async, stream::Stream as _}; use log::debug; use std::time::Duration; type Item = generic::DigestItem; @@ -919,11 +919,9 @@ mod tests { } const SLOT_DURATION: u64 = 1; - const TEST_ROUTING_INTERVAL: Duration = Duration::from_millis(50); pub struct BabeTestNet { - peers: Vec>>, - started: bool, + peers: Vec>, } impl TestNetFactory for BabeTestNet { @@ -936,7 +934,6 @@ mod tests { debug!(target: "babe", "Creating test network from 
config"); BabeTestNet { peers: Vec::new(), - started: false, } } @@ -963,34 +960,22 @@ mod tests { }) } - fn uses_tokio(&self) -> bool { - true - } - fn peer(&self, i: usize) -> &Peer { trace!(target: "babe", "Retreiving a peer"); &self.peers[i] } - fn peers(&self) -> &Vec>> { + fn peers(&self) -> &Vec> { trace!(target: "babe", "Retreiving peers"); &self.peers } - fn mut_peers>>)>( + fn mut_peers>)>( &mut self, closure: F, ) { closure(&mut self.peers); } - - fn started(&self) -> bool { - self.started - } - - fn set_started(&mut self, new: bool) { - self.started = new; - } } #[test] @@ -1003,10 +988,9 @@ mod tests { fn authoring_blocks() { drop(env_logger::try_init()); debug!(target: "babe", "checkpoint 1"); - let mut net = BabeTestNet::new(3); + let net = BabeTestNet::new(3); debug!(target: "babe", "checkpoint 2"); - net.start(); debug!(target: "babe", "checkpoint 3"); let peers = &[ @@ -1060,15 +1044,7 @@ mod tests { .map(drop) .map_err(drop); - let drive_to_completion = ::tokio_timer::Interval::new_interval(TEST_ROUTING_INTERVAL) - .for_each(move |_| { - net.lock().send_import_notifications(); - net.lock().sync_without_disconnects(); - Ok(()) - }) - .map(drop) - .map_err(drop); - + let drive_to_completion = futures::future::poll_fn(|| { net.lock().poll(); Ok(Async::NotReady) }); let _ = runtime.block_on(wait_for.select(drive_to_completion).map_err(drop)).unwrap(); } diff --git a/core/finality-grandpa/src/tests.rs b/core/finality-grandpa/src/tests.rs index 010bd7a091f58..67cf518d52067 100644 --- a/core/finality-grandpa/src/tests.rs +++ b/core/finality-grandpa/src/tests.rs @@ -20,7 +20,6 @@ use super::*; use network::test::{Block, DummySpecialization, Hash, TestNetFactory, Peer, PeersClient}; use network::test::{PassThroughVerifier}; use network::config::{ProtocolConfig, Roles}; -use network::consensus_gossip as network_gossip; use parking_lot::Mutex; use tokio::runtime::current_thread; use keyring::ed25519::{Keyring as AuthorityKeyring}; @@ -44,7 +43,6 @@ use fg_primitives::AuthorityId; use authorities::AuthoritySet; use finality_proof::{FinalityProofProvider, AuthoritySetForFinalityProver, AuthoritySetForFinalityChecker}; -use communication::GRANDPA_ENGINE_ID; use consensus_changes::ConsensusChanges; type PeerData = @@ -62,16 +60,14 @@ type PeerData = type GrandpaPeer = Peer; struct GrandpaTestNet { - peers: Vec>, + peers: Vec, test_config: TestApi, - started: bool, } impl GrandpaTestNet { fn new(test_config: TestApi, n_peers: usize) -> Self { let mut net = GrandpaTestNet { peers: Vec::with_capacity(n_peers), - started: false, test_config, }; let config = Self::default_config(); @@ -92,7 +88,6 @@ impl TestNetFactory for GrandpaTestNet { GrandpaTestNet { peers: Vec::new(), test_config: Default::default(), - started: false, } } @@ -163,112 +158,17 @@ impl TestNetFactory for GrandpaTestNet { } } - fn uses_tokio(&self) -> bool { - true - } - fn peer(&self, i: usize) -> &GrandpaPeer { &self.peers[i] } - fn peers(&self) -> &Vec> { + fn peers(&self) -> &Vec { &self.peers } - fn mut_peers>)>(&mut self, closure: F) { + fn mut_peers)>(&mut self, closure: F) { closure(&mut self.peers); } - - fn started(&self) -> bool { - self.started - } - - fn set_started(&mut self, new: bool) { - self.started = new; - } -} - -#[derive(Clone)] -struct MessageRouting { - inner: Arc>, - peer_id: usize, -} - -impl MessageRouting { - fn new(inner: Arc>, peer_id: usize,) -> Self { - MessageRouting { - inner, - peer_id, - } - } -} - -impl Network for MessageRouting { - type In = Box + Send>; - - /// Get a stream of 
messages for a specific gossip topic. - fn messages_for(&self, topic: Hash) -> Self::In { - let inner = self.inner.lock(); - let peer = inner.peer(self.peer_id); - - let messages = peer.consensus_gossip_messages_for( - GRANDPA_ENGINE_ID, - topic, - ); - - let messages = messages.map_err( - move |_| panic!("Messages for topic {} dropped too early", topic) - ); - - Box::new(messages) - } - - fn register_validator(&self, v: Arc>) { - let inner = self.inner.lock(); - let peer = inner.peer(self.peer_id); - peer.with_gossip(move |gossip, context| { - gossip.register_validator(context, GRANDPA_ENGINE_ID, v); - }); - } - - fn gossip_message(&self, topic: Hash, data: Vec, force: bool) { - let inner = self.inner.lock(); - inner.peer(self.peer_id).gossip_message( - topic, - GRANDPA_ENGINE_ID, - data, - force, - ); - } - - fn send_message(&self, who: Vec, data: Vec) { - let inner = self.inner.lock(); - let peer = inner.peer(self.peer_id); - - peer.with_gossip(move |gossip, ctx| for who in &who { - gossip.send_message( - ctx, - who, - network_gossip::ConsensusMessage { - engine_id: GRANDPA_ENGINE_ID, - data: data.clone(), - } - ) - }) - } - - fn register_gossip_message(&self, _topic: Hash, _data: Vec) { - // NOTE: only required to restore previous state on startup - // not required for tests currently - } - - fn report(&self, _who: network::PeerId, _cost_benefit: i32) { - - } - - fn announce(&self, _block: Hash) { - - } } #[derive(Clone)] @@ -440,7 +340,6 @@ impl AuthoritySetForFinalityChecker for TestApi { } const TEST_GOSSIP_DURATION: Duration = Duration::from_millis(500); -const TEST_ROUTING_INTERVAL: Duration = Duration::from_millis(50); fn make_ids(keys: &[AuthorityKeyring]) -> Vec<(substrate_primitives::ed25519::Public, u64)> { keys.iter() @@ -452,6 +351,7 @@ fn make_ids(keys: &[AuthorityKeyring]) -> Vec<(substrate_primitives::ed25519::Pu // run the voters to completion. provide a closure to be invoked after // the voters are spawned but before blocking on them. 
fn run_to_completion_with( + runtime: &mut current_thread::Runtime, blocks: u64, net: Arc>, peers: &[AuthorityKeyring], @@ -462,7 +362,6 @@ fn run_to_completion_with( use parking_lot::RwLock; let mut wait_for = Vec::new(); - let mut runtime = current_thread::Runtime::new().unwrap(); let highest_finalized = Arc::new(RwLock::new(0)); @@ -472,12 +371,13 @@ fn run_to_completion_with( for (peer_id, key) in peers.iter().enumerate() { let highest_finalized = highest_finalized.clone(); - let (client, link) = { + let (client, net_service, link) = { let net = net.lock(); // temporary needed for some reason let link = net.peers[peer_id].data.lock().take().expect("link initialized at startup; qed"); ( net.peers[peer_id].client().clone(), + net.peers[peer_id].network_service().clone(), link, ) }; @@ -507,7 +407,7 @@ fn run_to_completion_with( name: Some(format!("peer#{}", peer_id)), }, link: link, - network: MessageRouting::new(net.clone(), peer_id), + network: net_service, inherent_data_providers: InherentDataProviders::new(), on_exit: Exit, telemetry_on_connect: None, @@ -524,35 +424,32 @@ fn run_to_completion_with( .map(|_| ()) .map_err(|_| ()); - let drive_to_completion = ::tokio::timer::Interval::new_interval(TEST_ROUTING_INTERVAL) - .for_each(move |_| { - net.lock().send_import_notifications(); - net.lock().send_finality_notifications(); - net.lock().sync_without_disconnects(); - Ok(()) - }) - .map(|_| ()) - .map_err(|_| ()); - + let drive_to_completion = futures::future::poll_fn(|| { net.lock().poll(); Ok(Async::NotReady) }); let _ = runtime.block_on(wait_for.select(drive_to_completion).map_err(|_| ())).unwrap(); let highest_finalized = *highest_finalized.read(); highest_finalized } -fn run_to_completion(blocks: u64, net: Arc>, peers: &[AuthorityKeyring]) -> u64 { - run_to_completion_with(blocks, net, peers, |_| None) +fn run_to_completion( + runtime: &mut current_thread::Runtime, + blocks: u64, + net: Arc>, + peers: &[AuthorityKeyring] +) -> u64 { + run_to_completion_with(runtime, blocks, net, peers, |_| None) } #[test] fn finalize_3_voters_no_observers() { let _ = env_logger::try_init(); + let mut runtime = current_thread::Runtime::new().unwrap(); let peers = &[AuthorityKeyring::Alice, AuthorityKeyring::Bob, AuthorityKeyring::Charlie]; let voters = make_ids(peers); let mut net = GrandpaTestNet::new(TestApi::new(voters), 3); net.peer(0).push_blocks(20, false); - net.sync(); + net.block_until_sync(&mut runtime); for i in 0..3 { assert_eq!(net.peer(i).client().info().chain.best_number, 20, @@ -560,7 +457,7 @@ fn finalize_3_voters_no_observers() { } let net = Arc::new(Mutex::new(net)); - run_to_completion(20, net.clone(), peers); + run_to_completion(&mut runtime, 20, net.clone(), peers); // normally there's no justification for finalized blocks assert!(net.lock().peer(0).client().justification(&BlockId::Number(20)).unwrap().is_none(), @@ -569,28 +466,30 @@ fn finalize_3_voters_no_observers() { #[test] fn finalize_3_voters_1_full_observer() { + let mut runtime = current_thread::Runtime::new().unwrap(); + let peers = &[AuthorityKeyring::Alice, AuthorityKeyring::Bob, AuthorityKeyring::Charlie]; let voters = make_ids(peers); let mut net = GrandpaTestNet::new(TestApi::new(voters), 4); net.peer(0).push_blocks(20, false); - net.sync(); + net.block_until_sync(&mut runtime); let net = Arc::new(Mutex::new(net)); let mut finality_notifications = Vec::new(); - let mut runtime = current_thread::Runtime::new().unwrap(); let all_peers = peers.iter() .cloned() .map(|key| Some(Arc::new(key.into()))) 
.chain(::std::iter::once(None)); for (peer_id, local_key) in all_peers.enumerate() { - let (client, link) = { + let (client, net_service, link) = { let net = net.lock(); let link = net.peers[peer_id].data.lock().take().expect("link initialized at startup; qed"); ( net.peers[peer_id].client().clone(), + net.peers[peer_id].network_service().clone(), link, ) }; @@ -608,7 +507,7 @@ fn finalize_3_voters_1_full_observer() { name: Some(format!("peer#{}", peer_id)), }, link: link, - network: MessageRouting::new(net.clone(), peer_id), + network: net_service, inherent_data_providers: InherentDataProviders::new(), on_exit: Exit, telemetry_on_connect: None, @@ -623,11 +522,7 @@ fn finalize_3_voters_1_full_observer() { .map(|_| ()) .map_err(|_| ()); - let drive_to_completion = ::tokio::timer::Interval::new_interval(TEST_ROUTING_INTERVAL) - .for_each(move |_| { net.lock().sync_without_disconnects(); Ok(()) }) - .map(|_| ()) - .map_err(|_| ()); - + let drive_to_completion = futures::future::poll_fn(|| { net.lock().poll(); Ok(Async::NotReady) }); let _ = runtime.block_on(wait_for.select(drive_to_completion).map_err(|_| ())).unwrap(); } @@ -663,7 +558,7 @@ fn transition_3_voters_twice_1_full_observer() { let mut runtime = current_thread::Runtime::new().unwrap(); net.lock().peer(0).push_blocks(1, false); - net.lock().sync(); + net.lock().block_until_sync(&mut runtime); for (i, peer) in net.lock().peers().iter().enumerate() { let full_client = peer.client().as_full().expect("only full clients are used in test"); @@ -745,11 +640,12 @@ fn transition_3_voters_twice_1_full_observer() { .enumerate(); for (peer_id, local_key) in all_peers { - let (client, link) = { + let (client, net_service, link) = { let net = net.lock(); let link = net.peers[peer_id].data.lock().take().expect("link initialized at startup; qed"); ( net.peers[peer_id].client().clone(), + net.peers[peer_id].network_service().clone(), link, ) }; @@ -777,7 +673,7 @@ fn transition_3_voters_twice_1_full_observer() { name: Some(format!("peer#{}", peer_id)), }, link: link, - network: MessageRouting::new(net.clone(), peer_id), + network: net_service, inherent_data_providers: InherentDataProviders::new(), on_exit: Exit, telemetry_on_connect: None, @@ -792,30 +688,22 @@ fn transition_3_voters_twice_1_full_observer() { .map(|_| ()) .map_err(|_| ()); - let drive_to_completion = ::tokio::timer::Interval::new_interval(TEST_ROUTING_INTERVAL) - .for_each(move |_| { - net.lock().send_import_notifications(); - net.lock().send_finality_notifications(); - net.lock().sync_without_disconnects(); - Ok(()) - }) - .map(|_| ()) - .map_err(|_| ()); - + let drive_to_completion = futures::future::poll_fn(|| { net.lock().poll(); Ok(Async::NotReady) }); let _ = runtime.block_on(wait_for.select(drive_to_completion).map_err(|_| ())).unwrap(); } #[test] fn justification_is_emitted_when_consensus_data_changes() { + let mut runtime = current_thread::Runtime::new().unwrap(); let peers = &[AuthorityKeyring::Alice, AuthorityKeyring::Bob, AuthorityKeyring::Charlie]; let mut net = GrandpaTestNet::new(TestApi::new(make_ids(peers)), 3); // import block#1 WITH consensus data change let new_authorities = vec![substrate_primitives::sr25519::Public::from_raw([42; 32])]; net.peer(0).push_authorities_change_block(new_authorities); - net.sync(); + net.block_until_sync(&mut runtime); let net = Arc::new(Mutex::new(net)); - run_to_completion(1, net.clone(), peers); + run_to_completion(&mut runtime, 1, net.clone(), peers); // ... 
and check that there's justification for block#1 assert!(net.lock().peer(0).client().justification(&BlockId::Number(1)).unwrap().is_some(), @@ -824,15 +712,16 @@ fn justification_is_emitted_when_consensus_data_changes() { #[test] fn justification_is_generated_periodically() { + let mut runtime = current_thread::Runtime::new().unwrap(); let peers = &[AuthorityKeyring::Alice, AuthorityKeyring::Bob, AuthorityKeyring::Charlie]; let voters = make_ids(peers); let mut net = GrandpaTestNet::new(TestApi::new(voters), 3); net.peer(0).push_blocks(32, false); - net.sync(); + net.block_until_sync(&mut runtime); let net = Arc::new(Mutex::new(net)); - run_to_completion(32, net.clone(), peers); + run_to_completion(&mut runtime, 32, net.clone(), peers); // when block#32 (justification_period) is finalized, justification // is required => generated @@ -862,6 +751,7 @@ fn consensus_changes_works() { #[test] fn sync_justifications_on_change_blocks() { + let mut runtime = current_thread::Runtime::new().unwrap(); let peers_a = &[AuthorityKeyring::Alice, AuthorityKeyring::Bob, AuthorityKeyring::Charlie]; let peers_b = &[AuthorityKeyring::Alice, AuthorityKeyring::Bob]; let voters = make_ids(peers_b); @@ -886,7 +776,7 @@ fn sync_justifications_on_change_blocks() { // add more blocks on top of it (until we have 25) net.peer(0).push_blocks(4, false); - net.sync(); + net.block_until_sync(&mut runtime); for i in 0..4 { assert_eq!(net.peer(i).client().info().chain.best_number, 25, @@ -894,7 +784,7 @@ fn sync_justifications_on_change_blocks() { } let net = Arc::new(Mutex::new(net)); - run_to_completion(25, net.clone(), peers_a); + run_to_completion(&mut runtime, 25, net.clone(), peers_a); // the first 3 peers are grandpa voters and therefore have already finalized // block 21 and stored a justification @@ -903,14 +793,20 @@ fn sync_justifications_on_change_blocks() { } // the last peer should get the justification by syncing from other peers - while net.lock().peer(3).client().justification(&BlockId::Number(21)).unwrap().is_none() { - net.lock().sync_without_disconnects(); - } + runtime.block_on(futures::future::poll_fn(move || -> std::result::Result<_, ()> { + if net.lock().peer(3).client().justification(&BlockId::Number(21)).unwrap().is_none() { + net.lock().poll(); + Ok(Async::NotReady) + } else { + Ok(Async::Ready(())) + } + })).unwrap() } #[test] fn finalizes_multiple_pending_changes_in_order() { let _ = env_logger::try_init(); + let mut runtime = current_thread::Runtime::new().unwrap(); let peers_a = &[AuthorityKeyring::Alice, AuthorityKeyring::Bob, AuthorityKeyring::Charlie]; let peers_b = &[AuthorityKeyring::Dave, AuthorityKeyring::Eve, AuthorityKeyring::Ferdie]; @@ -956,7 +852,7 @@ fn finalizes_multiple_pending_changes_in_order() { // add more blocks on top of it (until we have 30) net.peer(0).push_blocks(4, false); - net.sync(); + net.block_until_sync(&mut runtime); // all peers imported both change blocks for i in 0..6 { @@ -965,11 +861,12 @@ fn finalizes_multiple_pending_changes_in_order() { } let net = Arc::new(Mutex::new(net)); - run_to_completion(30, net.clone(), all_peers); + run_to_completion(&mut runtime, 30, net.clone(), all_peers); } #[test] fn doesnt_vote_on_the_tip_of_the_chain() { + let mut runtime = current_thread::Runtime::new().unwrap(); let peers_a = &[AuthorityKeyring::Alice, AuthorityKeyring::Bob, AuthorityKeyring::Charlie]; let voters = make_ids(peers_a); let api = TestApi::new(voters); @@ -977,7 +874,7 @@ fn doesnt_vote_on_the_tip_of_the_chain() { // add 100 blocks 
net.peer(0).push_blocks(100, false); - net.sync(); + net.block_until_sync(&mut runtime); for i in 0..3 { assert_eq!(net.peer(i).client().info().chain.best_number, 100, @@ -985,7 +882,7 @@ fn doesnt_vote_on_the_tip_of_the_chain() { } let net = Arc::new(Mutex::new(net)); - let highest = run_to_completion(75, net.clone(), peers_a); + let highest = run_to_completion(&mut runtime, 75, net.clone(), peers_a); // the highest block to be finalized will be 3/4 deep in the unfinalized chain assert_eq!(highest, 75); @@ -993,6 +890,8 @@ fn doesnt_vote_on_the_tip_of_the_chain() { #[test] fn force_change_to_new_set() { + let _ = env_logger::try_init(); + let mut runtime = current_thread::Runtime::new().unwrap(); // two of these guys are offline. let genesis_authorities = &[AuthorityKeyring::Alice, AuthorityKeyring::Bob, AuthorityKeyring::Charlie, AuthorityKeyring::One, AuthorityKeyring::Two]; let peers_a = &[AuthorityKeyring::Alice, AuthorityKeyring::Bob, AuthorityKeyring::Charlie]; @@ -1004,49 +903,44 @@ fn force_change_to_new_set() { let net = GrandpaTestNet::new(api, 3); let net = Arc::new(Mutex::new(net)); - let runner_net = net.clone(); - let add_blocks = move |_| { - net.lock().peer(0).push_blocks(1, false); - - { - // add a forced transition at block 12. - let parent_hash = net.lock().peer(0).client().info().chain.best_hash; - forced_transitions.lock().insert(parent_hash, (0, ScheduledChange { - next_authorities: voters.clone(), - delay: 10, - })); - - // add a normal transition too to ensure that forced changes take priority. - normal_transitions.lock().insert(parent_hash, ScheduledChange { - next_authorities: make_ids(genesis_authorities), - delay: 5, - }); - } + net.lock().peer(0).push_blocks(1, false); - net.lock().peer(0).push_blocks(25, false); - net.lock().sync(); + { + // add a forced transition at block 12. + let parent_hash = net.lock().peer(0).client().info().chain.best_hash; + forced_transitions.lock().insert(parent_hash, (0, ScheduledChange { + next_authorities: voters.clone(), + delay: 10, + })); + + // add a normal transition too to ensure that forced changes take priority. + normal_transitions.lock().insert(parent_hash, ScheduledChange { + next_authorities: make_ids(genesis_authorities), + delay: 5, + }); + } - for (i, peer) in net.lock().peers().iter().enumerate() { - assert_eq!(peer.client().info().chain.best_number, 26, - "Peer #{} failed to sync", i); + net.lock().peer(0).push_blocks(25, false); + net.lock().block_until_sync(&mut runtime); - let full_client = peer.client().as_full().expect("only full clients are used in test"); - let set: AuthoritySet = crate::aux_schema::load_authorities( - #[allow(deprecated)] - &**full_client.backend() - ).unwrap(); + for (i, peer) in net.lock().peers().iter().enumerate() { + assert_eq!(peer.client().info().chain.best_number, 26, + "Peer #{} failed to sync", i); - assert_eq!(set.current(), (1, voters.as_slice())); - assert_eq!(set.pending_changes().count(), 0); - } + let full_client = peer.client().as_full().expect("only full clients are used in test"); + let set: AuthoritySet = crate::aux_schema::load_authorities( + #[allow(deprecated)] + &**full_client.backend() + ).unwrap(); - None - }; + assert_eq!(set.current(), (1, voters.as_slice())); + assert_eq!(set.pending_changes().count(), 0); + } // it will only finalize if the forced transition happens. 
// we add_blocks after the voters are spawned because otherwise // the link-halfs have the wrong AuthoritySet - run_to_completion_with(25, runner_net, peers_a, add_blocks); + run_to_completion(&mut runtime, 25, net, peers_a); } #[test] @@ -1155,6 +1049,7 @@ fn voter_persists_its_votes() { use futures::sync::mpsc; let _ = env_logger::try_init(); + let mut runtime = current_thread::Runtime::new().unwrap(); // we have two authorities but we'll only be running the voter for alice // we are going to be listening for the prevotes it casts @@ -1164,13 +1059,11 @@ fn voter_persists_its_votes() { // alice has a chain with 20 blocks let mut net = GrandpaTestNet::new(TestApi::new(voters.clone()), 2); net.peer(0).push_blocks(20, false); - net.sync(); + net.block_until_sync(&mut runtime); assert_eq!(net.peer(0).client().info().chain.best_number, 20, "Peer #{} failed to sync", 0); - let mut runtime = current_thread::Runtime::new().unwrap(); - let client = net.peer(0).client().clone(); let net = Arc::new(Mutex::new(net)); @@ -1195,7 +1088,7 @@ fn voter_persists_its_votes() { name: Some(format!("peer#{}", 0)), }, link: link, - network: MessageRouting::new(net.clone(), 0), + network: net.lock().peers[0].network_service().clone(), inherent_data_providers: InherentDataProviders::new(), on_exit: Exit, telemetry_on_connect: None, @@ -1253,9 +1146,8 @@ fn voter_persists_its_votes() { set_state }; - let routing = MessageRouting::new(net.clone(), 1); let (network, routing_work) = communication::NetworkBridge::new( - routing, + net.lock().peers[1].network_service().clone(), config.clone(), set_state, Exit, @@ -1290,25 +1182,33 @@ fn voter_persists_its_votes() { // we push 20 more blocks to alice's chain net.lock().peer(0).push_blocks(20, false); - net.lock().sync(); - assert_eq!(net.lock().peer(0).client().info().chain.best_number, 40, - "Peer #{} failed to sync", 0); - - #[allow(deprecated)] - let block_30_hash = - net.lock().peer(0).client().as_full().unwrap().backend().blockchain().hash(30).unwrap().unwrap(); + let net2 = net.clone(); + let net = net.clone(); + let voter_tx = voter_tx.clone(); + let round_tx = round_tx.clone(); + future::Either::A(tokio_timer::Interval::new_interval(Duration::from_millis(200)) + .take_while(move |_| { + Ok(net2.lock().peer(1).client().info().chain.best_number != 40) + }) + .for_each(|_| Ok(())) + .and_then(move |_| { + #[allow(deprecated)] + let block_30_hash = + net.lock().peer(0).client().as_full().unwrap().backend().blockchain().hash(30).unwrap().unwrap(); - // we restart alice's voter - voter_tx.unbounded_send(()).unwrap(); + // we restart alice's voter + voter_tx.unbounded_send(()).unwrap(); - // and we push our own prevote for block 30 - let prevote = grandpa::Prevote { - target_number: 30, - target_hash: block_30_hash, - }; + // and we push our own prevote for block 30 + let prevote = grandpa::Prevote { + target_number: 30, + target_hash: block_30_hash, + }; - round_tx.lock().start_send(grandpa::Message::Prevote(prevote)).unwrap(); + round_tx.lock().start_send(grandpa::Message::Prevote(prevote)).unwrap(); + Ok(()) + }).map_err(|_| panic!())) } else if state.compare_and_swap(1, 2, Ordering::SeqCst) == 1 { // the next message we receive should be our own prevote @@ -1324,6 +1224,8 @@ fn voter_persists_its_votes() { // therefore we won't ever receive it again since it will be a // known message on the gossip layer + future::Either::B(future::ok(())) + } else if state.compare_and_swap(2, 3, Ordering::SeqCst) == 2 { // we then receive a precommit from alice for block 15 // 
even though we casted a prevote for block 30 @@ -1336,23 +1238,17 @@ fn voter_persists_its_votes() { // signal exit exit_tx.clone().lock().take().unwrap().send(()).unwrap(); + + future::Either::B(future::ok(())) + + } else { + panic!() } - Ok(()) }).map_err(|_| ())); } - let net = net.clone(); - let drive_to_completion = ::tokio::timer::Interval::new_interval(TEST_ROUTING_INTERVAL) - .for_each(move |_| { - net.lock().send_import_notifications(); - net.lock().send_finality_notifications(); - net.lock().sync_without_disconnects(); - Ok(()) - }) - .map(|_| ()) - .map_err(|_| ()); - + let drive_to_completion = futures::future::poll_fn(|| { net.lock().poll(); Ok(Async::NotReady) }); let exit = exit_rx.into_future().map(|_| ()).map_err(|_| ()); runtime.block_on(drive_to_completion.select(exit).map(|_| ()).map_err(|_| ())).unwrap(); @@ -1361,12 +1257,13 @@ fn voter_persists_its_votes() { #[test] fn finalize_3_voters_1_light_observer() { let _ = env_logger::try_init(); + let mut runtime = current_thread::Runtime::new().unwrap(); let authorities = &[AuthorityKeyring::Alice, AuthorityKeyring::Bob, AuthorityKeyring::Charlie]; let voters = make_ids(authorities); let mut net = GrandpaTestNet::new(TestApi::new(voters), 4); net.peer(0).push_blocks(20, false); - net.sync(); + net.block_until_sync(&mut runtime); for i in 0..4 { assert_eq!(net.peer(i).client().info().chain.best_number, 20, @@ -1380,7 +1277,7 @@ fn finalize_3_voters_1_light_observer() { .take_while(|n| Ok(n.header.number() < &20)) .collect(); - run_to_completion_with(20, net.clone(), authorities, |executor| { + run_to_completion_with(&mut runtime, 20, net.clone(), authorities, |executor| { executor.spawn( run_grandpa_observer( Config { @@ -1390,7 +1287,7 @@ fn finalize_3_voters_1_light_observer() { name: Some("observer".to_string()), }, link, - MessageRouting::new(net.clone(), 3), + net.lock().peers[3].network_service().clone(), Exit, ).unwrap() ).unwrap(); @@ -1402,6 +1299,7 @@ fn finalize_3_voters_1_light_observer() { #[test] fn finality_proof_is_fetched_by_light_client_when_consensus_data_changes() { let _ = ::env_logger::try_init(); + let mut runtime = current_thread::Runtime::new().unwrap(); let peers = &[AuthorityKeyring::Alice]; let mut net = GrandpaTestNet::new(TestApi::new(make_ids(peers)), 1); @@ -1411,14 +1309,18 @@ fn finality_proof_is_fetched_by_light_client_when_consensus_data_changes() { // && instead fetches finality proof for block #1 net.peer(0).push_authorities_change_block(vec![substrate_primitives::sr25519::Public::from_raw([42; 32])]); let net = Arc::new(Mutex::new(net)); - run_to_completion(1, net.clone(), peers); - net.lock().sync_without_disconnects(); + run_to_completion(&mut runtime, 1, net.clone(), peers); + net.lock().block_until_sync(&mut runtime); // check that the block#1 is finalized on light client - while net.lock().peer(1).client().info().chain.finalized_number != 1 { - net.lock().tick_peer(1); - net.lock().sync_without_disconnects(); - } + runtime.block_on(futures::future::poll_fn(move || -> std::result::Result<_, ()> { + if net.lock().peer(1).client().info().chain.finalized_number == 1 { + Ok(Async::Ready(())) + } else { + net.lock().poll(); + Ok(Async::NotReady) + } + })).unwrap() } #[test] @@ -1427,6 +1329,7 @@ fn empty_finality_proof_is_returned_to_light_client_when_authority_set_is_differ const FORCE_CHANGE: bool = true; let _ = ::env_logger::try_init(); + let mut runtime = current_thread::Runtime::new().unwrap(); // two of these guys are offline. 
let genesis_authorities = if FORCE_CHANGE { @@ -1452,40 +1355,36 @@ fn empty_finality_proof_is_returned_to_light_client_when_authority_set_is_differ let net = GrandpaTestNet::new(api, 3); let net = Arc::new(Mutex::new(net)); - let runner_net = net.clone(); - let add_blocks = move |_| { - net.lock().peer(0).push_blocks(1, false); // best is #1 - - // add a forced transition at block 5. - if FORCE_CHANGE { - let parent_hash = net.lock().peer(0).client().info().chain.best_hash; - forced_transitions.lock().insert(parent_hash, (0, ScheduledChange { - next_authorities: voters.clone(), - delay: 3, - })); - } + net.lock().peer(0).push_blocks(1, false); // best is #1 - // ensure block#10 enacts authorities set change => justification is generated - // normally it will reach light client, but because of the forced change, it will not - net.lock().peer(0).push_blocks(8, false); // best is #9 - net.lock().peer(0).push_authorities_change_block( - vec![substrate_primitives::sr25519::Public::from_raw([42; 32])] - ); // #10 - net.lock().peer(0).push_blocks(1, false); // best is #11 - net.lock().sync_without_disconnects(); + // add a forced transition at block 5. + if FORCE_CHANGE { + let parent_hash = net.lock().peer(0).client().info().chain.best_hash; + forced_transitions.lock().insert(parent_hash, (0, ScheduledChange { + next_authorities: voters.clone(), + delay: 3, + })); + } - None - }; + // ensure block#10 enacts authorities set change => justification is generated + // normally it will reach light client, but because of the forced change, it will not + net.lock().peer(0).push_blocks(8, false); // best is #9 + net.lock().peer(0).push_authorities_change_block( + vec![substrate_primitives::sr25519::Public::from_raw([42; 32])] + ); // #10 + net.lock().peer(0).push_blocks(1, false); // best is #11 + net.lock().block_until_sync(&mut runtime); // finalize block #11 on full clients - run_to_completion_with(11, runner_net.clone(), peers_a, add_blocks); + run_to_completion(&mut runtime, 11, net.clone(), peers_a); + // request finalization by light client - runner_net.lock().add_light_peer(&GrandpaTestNet::default_config()); - runner_net.lock().sync_without_disconnects(); + net.lock().add_light_peer(&GrandpaTestNet::default_config()); + net.lock().block_until_sync(&mut runtime); // check block, finalized on light client assert_eq!( - runner_net.lock().peer(3).client().info().chain.finalized_number, + net.lock().peer(3).client().info().chain.finalized_number, if FORCE_CHANGE { 0 } else { 10 }, ); } @@ -1493,20 +1392,19 @@ fn empty_finality_proof_is_returned_to_light_client_when_authority_set_is_differ #[test] fn voter_catches_up_to_latest_round_when_behind() { let _ = env_logger::try_init(); + let mut runtime = current_thread::Runtime::new().unwrap(); let peers = &[AuthorityKeyring::Alice, AuthorityKeyring::Bob]; let voters = make_ids(peers); let mut net = GrandpaTestNet::new(TestApi::new(voters), 3); net.peer(0).push_blocks(50, false); - net.sync(); + net.block_until_sync(&mut runtime); let net = Arc::new(Mutex::new(net)); let mut finality_notifications = Vec::new(); - let mut runtime = current_thread::Runtime::new().unwrap(); - - let voter = |local_key, peer_id, link, net| -> Box + Send> { + let voter = |local_key, peer_id, link, net: Arc>| -> Box + Send> { let grandpa_params = GrandpaParams { config: Config { gossip_duration: TEST_GOSSIP_DURATION, @@ -1515,7 +1413,7 @@ fn voter_catches_up_to_latest_round_when_behind() { name: Some(format!("peer#{}", peer_id)), }, link: link, - network: 
MessageRouting::new(net, peer_id), + network: net.lock().peer(peer_id).network_service().clone(), inherent_data_providers: InherentDataProviders::new(), on_exit: Exit, telemetry_on_connect: None, @@ -1590,10 +1488,6 @@ fn voter_catches_up_to_latest_round_when_behind() { }) }; - let drive_to_completion = ::tokio::timer::Interval::new_interval(TEST_ROUTING_INTERVAL) - .for_each(move |_| { net.lock().sync_without_disconnects(); Ok(()) }) - .map(|_| ()) - .map_err(|_| ()); - + let drive_to_completion = futures::future::poll_fn(|| { net.lock().poll(); Ok(Async::NotReady) }); let _ = runtime.block_on(test.select(drive_to_completion).map_err(|_| ())).unwrap(); } diff --git a/core/network/src/config.rs b/core/network/src/config.rs index fd0a3a924eef1..be7f32d995721 100644 --- a/core/network/src/config.rs +++ b/core/network/src/config.rs @@ -121,16 +121,8 @@ pub struct NetworkConfiguration { pub client_version: String, /// Name of the node. Sent over the wire for debugging purposes. pub node_name: String, - /// If true, the network will use mDNS to discover other libp2p nodes on the local network - /// and connect to them if they support the same chain. - pub enable_mdns: bool, - /// Optional external implementation of a libp2p transport. Used in WASM contexts where we need - /// some binding between the networking provided by the operating system or environment and - /// libp2p. - /// - /// This parameter exists whatever the target platform is, but it is expected to be set to - /// `Some` only when compiling for WASM. - pub wasm_external_transport: Option, + /// Configuration for the transport layer. + pub transport: TransportConfig, } impl Default for NetworkConfiguration { @@ -148,8 +140,10 @@ impl Default for NetworkConfiguration { non_reserved_mode: NonReservedPeerMode::Accept, client_version: "unknown".into(), node_name: "unknown".into(), - enable_mdns: false, - wasm_external_transport: None, + transport: TransportConfig::Normal { + enable_mdns: false, + wasm_external_transport: None, + }, } } } @@ -170,6 +164,40 @@ impl NetworkConfiguration { ]; config } + + /// Create new default configuration for localhost-only connection with random port (useful for testing) + pub fn new_memory() -> NetworkConfiguration { + let mut config = NetworkConfiguration::new(); + config.listen_addresses = vec![ + iter::once(Protocol::Ip4(Ipv4Addr::new(127, 0, 0, 1))) + .chain(iter::once(Protocol::Tcp(0))) + .collect() + ]; + config + } +} + +/// Configuration for the transport layer. +#[derive(Clone)] +pub enum TransportConfig { + /// Normal transport mode. + Normal { + /// If true, the network will use mDNS to discover other libp2p nodes on the local network + /// and connect to them if they support the same chain. + enable_mdns: bool, + + /// Optional external implementation of a libp2p transport. Used in WASM contexts where we + /// need some binding between the networking provided by the operating system or environment + /// and libp2p. + /// + /// This parameter exists whatever the target platform is, but it is expected to be set to + /// `Some` only when compiling for WASM. + wasm_external_transport: Option, + }, + + /// Only allow connections within the same process. + /// Only addresses of the form `/memory/...` will be supported. + MemoryOnly, } /// The policy for connections to non-reserved peers. 
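For orientation, a minimal sketch (not part of the patch) of how downstream code fills in the new `transport` field, mirroring the `core/cli` hunk above. Only `NetworkConfiguration`, its `transport` field, and the `TransportConfig` variants come from this diff; the helper functions and the `network` crate alias (as used in `core/cli`) are assumptions for the example.

// Sketch only: exercises the two `TransportConfig` variants added in
// `core/network/src/config.rs`; the helper functions are illustrative.
use network::config::{NetworkConfiguration, TransportConfig};

fn normal_config(enable_mdns: bool) -> NetworkConfiguration {
    let mut config = NetworkConfiguration::new();
    // Replaces the former `enable_mdns` / `wasm_external_transport` fields.
    config.transport = TransportConfig::Normal {
        enable_mdns,
        wasm_external_transport: None,
    };
    config
}

fn in_process_config() -> NetworkConfiguration {
    let mut config = NetworkConfiguration::new();
    // Restricts the node to in-process connections; only `/memory/...`
    // addresses are supported in this mode, so the listen addresses would
    // need to use the memory protocol as well.
    config.transport = TransportConfig::MemoryOnly;
    config
}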
diff --git a/core/network/src/discovery.rs b/core/network/src/discovery.rs index 88d653f68f84b..a88de4d146d40 100644 --- a/core/network/src/discovery.rs +++ b/core/network/src/discovery.rs @@ -22,7 +22,7 @@ use libp2p::kad::{GetValueResult, Kademlia, KademliaOut, PutValueResult}; use libp2p::multihash::Multihash; use libp2p::multiaddr::Protocol; use log::{debug, info, trace, warn}; -use std::{cmp, num::NonZeroU8, time::Duration}; +use std::{cmp, collections::VecDeque, num::NonZeroU8, time::Duration}; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_timer::{Delay, clock::Clock}; @@ -37,6 +37,8 @@ pub struct DiscoveryBehaviour { next_kad_random_query: Delay, /// After `next_kad_random_query` triggers, the next one triggers after this duration. duration_to_next_kad: Duration, + /// Discovered nodes to return. + discoveries: VecDeque, /// `Clock` instance that uses the current execution context's source of time. clock: Clock, /// Identity of our local node. @@ -59,6 +61,7 @@ impl DiscoveryBehaviour { kademlia, next_kad_random_query: Delay::new(clock.now()), duration_to_next_kad: Duration::from_secs(1), + discoveries: VecDeque::new(), clock, local_peer_id: local_public_key.into_peer_id(), } @@ -72,8 +75,11 @@ impl DiscoveryBehaviour { /// Adds a hard-coded address for the given peer, that never expires. /// /// This adds an entry to the parameter that was passed to `new`. + /// + /// If we didn't know this address before, also generates a `Discovered` event. pub fn add_known_address(&mut self, peer_id: PeerId, addr: Multiaddr) { if self.user_defined.iter().all(|(p, a)| *p != peer_id && *a != addr) { + self.discoveries.push_back(peer_id.clone()); self.user_defined.push((peer_id, addr)); } } @@ -181,6 +187,12 @@ where Self::OutEvent, >, > { + // Immediately process the content of `discovered`. + if let Some(peer_id) = self.discoveries.pop_front() { + let ev = DiscoveryOut::Discovered(peer_id); + return Async::Ready(NetworkBehaviourAction::GenerateEvent(ev)); + } + // Poll the stream that fires when we need to start a random Kademlia query. loop { match self.next_kad_random_query.poll() { diff --git a/core/network/src/service.rs b/core/network/src/service.rs index 405eba3e9ee55..fbe095e2ecdf9 100644 --- a/core/network/src/service.rs +++ b/core/network/src/service.rs @@ -23,7 +23,7 @@ use std::time::Duration; use log::{warn, error, info}; use libp2p::core::swarm::NetworkBehaviour; use libp2p::core::{nodes::Substream, transport::boxed::Boxed, muxing::StreamMuxerBox}; -use libp2p::multihash::Multihash; +use libp2p::{Multiaddr, multihash::Multihash}; use futures::{prelude::*, sync::oneshot, sync::mpsc}; use parking_lot::{Mutex, RwLock}; use crate::protocol_behaviour::ProtocolBehaviour; @@ -40,7 +40,7 @@ use crate::protocol::{event::Event, message::Message}; use crate::protocol::on_demand::RequestData; use crate::protocol::{self, Context, CustomMessageOutcome, ConnectedPeer, PeerInfo}; use crate::protocol::sync::SyncState; -use crate::config::Params; +use crate::config::{Params, TransportConfig}; use crate::error::Error; use crate::protocol::specialization::NetworkSpecialization; @@ -197,12 +197,19 @@ impl, H: ExHashT> NetworkWorker user_agent, local_public, known_addresses, - params.network_config.enable_mdns - ); - let (transport, bandwidth) = transport::build_transport( - local_identity, - params.network_config.wasm_external_transport + match params.network_config.transport { + TransportConfig::MemoryOnly => false, + TransportConfig::Normal { enable_mdns, .. 
} => enable_mdns, + } ); + let (transport, bandwidth) = { + let (config_mem, config_wasm) = match params.network_config.transport { + TransportConfig::MemoryOnly => (true, None), + TransportConfig::Normal { wasm_external_transport, .. } => + (false, wasm_external_transport) + }; + transport::build_transport(local_identity, config_mem, config_wasm) + }; (Swarm::::new(transport, behaviour, local_peer_id.clone()), bandwidth) }; @@ -281,6 +288,11 @@ impl, H: ExHashT> NetworkWorker self.network_service.lock().user_protocol_mut().num_sync_peers() } + /// Adds an address for a node. + pub fn add_known_address(&mut self, peer_id: PeerId, addr: Multiaddr) { + self.network_service.lock().add_known_address(peer_id, addr); + } + /// Return a `NetworkService` that can be shared through the code base and can be used to /// manipulate the worker. pub fn service(&self) -> &Arc> { @@ -349,6 +361,13 @@ impl, H: ExHashT> NetworkServic let _ = self.network_chan.unbounded_send(NetworkMsg::DisconnectPeer(who)); } + /// Request a justification for the given block. + pub fn request_justification(&self, hash: &B::Hash, number: NumberFor) { + let _ = self + .protocol_sender + .unbounded_send(ProtocolMsg::RequestJustification(hash.clone(), number)); + } + /// Execute a closure with the chain-specific network specialization. pub fn with_spec(&self, f: F) where F: FnOnce(&mut S, &mut dyn Context) + Send + 'static diff --git a/core/network/src/test/block_import.rs b/core/network/src/test/block_import.rs index b5a03ae23a5a3..4155fc22613a1 100644 --- a/core/network/src/test/block_import.rs +++ b/core/network/src/test/block_import.rs @@ -16,16 +16,14 @@ //! Testing block import logic. -use consensus::import_queue::{import_single_block, BasicQueue, BlockImportError, BlockImportResult}; +use consensus::import_queue::{ + import_single_block, IncomingBlock, BasicQueue, BlockImportError, BlockImportResult +}; use test_client::{self, prelude::*}; use test_client::runtime::{Block, Hash}; use runtime_primitives::generic::BlockId; use super::*; -struct TestLink {} - -impl Link for TestLink {} - fn prepare_good_block() -> (TestClient, Hash, u64, PeerId, IncomingBlock) { let client = test_client::new(); let block = client.new_block(Default::default()).unwrap().bake().unwrap(); @@ -77,7 +75,7 @@ fn async_import_queue_drops() { // Perform this test multiple times since it exhibits non-deterministic behavior. 
for _ in 0..100 { let verifier = Arc::new(PassThroughVerifier(true)); - let mut queue = BasicQueue::new(verifier, Arc::new(test_client::new()), None, None, None); + let queue = BasicQueue::new(verifier, Arc::new(test_client::new()), None, None, None); drop(queue); } } diff --git a/core/network/src/test/mod.rs b/core/network/src/test/mod.rs index ce6d521ac58b4..289f23bc22b43 100644 --- a/core/network/src/test/mod.rs +++ b/core/network/src/test/mod.rs @@ -21,10 +21,10 @@ mod block_import; #[cfg(test)] mod sync; -use std::collections::{HashMap, HashSet, VecDeque}; +use std::collections::HashMap; use std::sync::Arc; -use crate::AlwaysBadChecker; +use crate::build_multiaddr; use log::trace; use crate::chain::FinalityProofProvider; use client::{self, ClientInfo, BlockchainEvents, FinalityNotifications}; @@ -32,26 +32,24 @@ use client::{in_mem::Backend as InMemoryBackend, error::Result as ClientResult}; use client::block_builder::BlockBuilder; use client::backend::AuxStore; use crate::config::Roles; -use consensus::import_queue::{BasicQueue, ImportQueue, IncomingBlock}; +use consensus::import_queue::BasicQueue; use consensus::import_queue::{ - Link, SharedBlockImport, SharedJustificationImport, Verifier, SharedFinalityProofImport, + SharedBlockImport, SharedJustificationImport, Verifier, SharedFinalityProofImport, SharedFinalityProofRequestBuilder, }; +use consensus::block_import::BlockImport; use consensus::{Error as ConsensusError, well_known_cache_keys::{self, Id as CacheKeyId}}; use consensus::{BlockOrigin, ForkChoiceStrategy, ImportBlock, JustificationImport}; -use crate::consensus_gossip::{ConsensusGossip, MessageRecipient as GossipMessageRecipient, TopicNotification}; -use futures::{prelude::*, sync::{mpsc, oneshot}}; -use log::info; -use crate::message::Message; +use futures::prelude::*; +use crate::{NetworkWorker, NetworkService, ProtocolId}; +use crate::config::{NetworkConfiguration, TransportConfig}; use libp2p::PeerId; -use parking_lot::{Mutex, RwLock}; use primitives::{H256, Blake2Hasher}; -use crate::SyncState; -use crate::protocol::{Context, Protocol, ProtocolConfig, CustomMessageOutcome, NetworkOut}; +use crate::protocol::{Context, ProtocolConfig}; use runtime_primitives::generic::{BlockId, OpaqueDigestItemId}; use runtime_primitives::traits::{Block as BlockT, Header, NumberFor}; -use runtime_primitives::{Justification, ConsensusEngineId}; -use crate::service::{NetworkMsg, ProtocolMsg, TransactionPool}; +use runtime_primitives::Justification; +use crate::service::TransactionPool; use crate::specialization::NetworkSpecialization; use test_client::{self, AccountKeyring}; @@ -94,84 +92,6 @@ impl Verifier for PassThroughVerifier { } } -/// A link implementation that does nothing. -pub struct NoopLink { } - -impl Link for NoopLink { } - -/// A link implementation that connects to the network. 
-#[derive(Clone)] -pub struct NetworkLink> { - /// The protocol sender - pub(crate) protocol_sender: mpsc::UnboundedSender>, - /// The network sender - pub(crate) network_sender: mpsc::UnboundedSender>, -} - -impl> Link for NetworkLink { - fn block_imported(&mut self, hash: &B::Hash, number: NumberFor) { - let _ = self.protocol_sender.unbounded_send(ProtocolMsg::BlockImportedSync(hash.clone(), number)); - } - - fn blocks_processed(&mut self, processed_blocks: Vec, has_error: bool) { - let _ = self.protocol_sender.unbounded_send(ProtocolMsg::BlocksProcessed(processed_blocks, has_error)); - } - - fn justification_imported(&mut self, who: PeerId, hash: &B::Hash, number: NumberFor, success: bool) { - let _ = self.protocol_sender.unbounded_send(ProtocolMsg::JustificationImportResult(hash.clone(), number, success)); - if !success { - info!("Invalid justification provided by {} for #{}", who, hash); - let _ = self.network_sender.unbounded_send(NetworkMsg::ReportPeer(who.clone(), i32::min_value())); - let _ = self.network_sender.unbounded_send(NetworkMsg::DisconnectPeer(who.clone())); - } - } - - fn clear_justification_requests(&mut self) { - let _ = self.protocol_sender.unbounded_send(ProtocolMsg::ClearJustificationRequests); - } - - fn request_justification(&mut self, hash: &B::Hash, number: NumberFor) { - let _ = self.protocol_sender.unbounded_send(ProtocolMsg::RequestJustification(hash.clone(), number)); - } - - fn request_finality_proof(&mut self, hash: &B::Hash, number: NumberFor) { - let _ = self.protocol_sender.unbounded_send(ProtocolMsg::RequestFinalityProof( - hash.clone(), - number, - )); - } - - fn finality_proof_imported( - &mut self, - who: PeerId, - request_block: (B::Hash, NumberFor), - finalization_result: Result<(B::Hash, NumberFor), ()>, - ) { - let success = finalization_result.is_ok(); - let _ = self.protocol_sender.unbounded_send(ProtocolMsg::FinalityProofImportResult( - request_block, - finalization_result, - )); - if !success { - info!("Invalid finality proof provided by {} for #{}", who, request_block.0); - let _ = self.network_sender.unbounded_send(NetworkMsg::ReportPeer(who.clone(), i32::min_value())); - let _ = self.network_sender.unbounded_send(NetworkMsg::DisconnectPeer(who.clone())); - } - } - - fn report_peer(&mut self, who: PeerId, reputation_change: i32) { - let _ = self.network_sender.unbounded_send(NetworkMsg::ReportPeer(who, reputation_change)); - } - - fn restart(&mut self) { - let _ = self.protocol_sender.unbounded_send(ProtocolMsg::RestartSync); - } - - fn set_finality_proof_request_builder(&mut self, request_builder: SharedFinalityProofRequestBuilder) { - let _ = self.protocol_sender.unbounded_send(ProtocolMsg::SetFinalityProofRequestBuilder(request_builder)); - } -} - /// The test specialization. #[derive(Clone)] pub struct DummySpecialization; @@ -286,423 +206,43 @@ impl PeersClient { } } -/// A Link that can wait for a block to have been imported. 
-pub struct TestLink> { - link: NetworkLink, - - #[cfg(any(test, feature = "test-helpers"))] - network_to_protocol_sender: mpsc::UnboundedSender>, -} - -impl> TestLink { - fn new( - protocol_sender: mpsc::UnboundedSender>, - _network_to_protocol_sender: mpsc::UnboundedSender>, - network_sender: mpsc::UnboundedSender> - ) -> TestLink { - TestLink { - #[cfg(any(test, feature = "test-helpers"))] - network_to_protocol_sender: _network_to_protocol_sender, - link: NetworkLink { - protocol_sender, - network_sender, - } - } - } -} - -impl> Link for TestLink { - fn block_imported(&mut self, hash: &Hash, number: NumberFor) { - self.link.block_imported(hash, number); - } - - fn blocks_processed(&mut self, processed_blocks: Vec, has_error: bool) { - self.link.blocks_processed(processed_blocks, has_error); - } - - fn justification_imported(&mut self, who: PeerId, hash: &Hash, number:NumberFor, success: bool) { - self.link.justification_imported(who, hash, number, success); - } - - fn request_justification(&mut self, hash: &Hash, number: NumberFor) { - self.link.request_justification(hash, number); - } - - fn finality_proof_imported( - &mut self, - who: PeerId, - request_block: (Hash, NumberFor), - finalization_result: Result<(Hash, NumberFor), ()>, - ) { - self.link.finality_proof_imported(who, request_block, finalization_result); - } - - fn request_finality_proof(&mut self, hash: &Hash, number: NumberFor) { - self.link.request_finality_proof(hash, number); - } - - fn set_finality_proof_request_builder(&mut self, request_builder: SharedFinalityProofRequestBuilder) { - self.link.set_finality_proof_request_builder(request_builder); - } - - fn report_peer(&mut self, who: PeerId, reputation_change: i32) { - self.link.report_peer(who, reputation_change); - } - - fn restart(&mut self) { - self.link.restart(); - } - - /// Send synchronization request to the block import channel. - /// - /// The caller should wait for the `Link::synchronized` call to ensure that it has synchronized - /// with `ImportQueue`. - #[cfg(any(test, feature = "test-helpers"))] - fn synchronized(&mut self) { - drop(self.network_to_protocol_sender.unbounded_send(FromNetworkMsg::Synchronize)) - } -} - pub struct Peer> { - peer_id: PeerId, - client: PeersClient, - net_proto_channel: ProtocolChannel, - /// This field is used only in test code, but maintaining different - /// instantiation paths or field names is too much hassle, hence - /// we allow it to be unused. - #[cfg_attr(not(test), allow(unused))] - /// `(is_offline, is_major_syncing, num_peers)` - protocol_status: Arc>, - import_queue: Arc>>>, pub data: D, - best_hash: Mutex>, - finalized_hash: Mutex>, -} - -type MessageFilter = dyn Fn(&NetworkMsg) -> bool; - -pub enum FromNetworkMsg { - /// A peer connected. - PeerConnected(PeerId), - /// A peer disconnected. - PeerDisconnected(PeerId), - /// A custom message from another peer. - CustomMessage(PeerId, Message), - /// Synchronization request. - Synchronize, -} - -struct ProtocolChannel> { - /// If true, we expect a tokio executor to be available. If false, we spawn our own. - use_tokio: bool, - buffered_messages: Mutex>>, - network_to_protocol_sender: mpsc::UnboundedSender>, - client_to_protocol_sender: mpsc::UnboundedSender>, - protocol_to_network_receiver: Mutex>>, -} - -impl> ProtocolChannel { - /// Create new buffered network port. 
- pub fn new( - use_tokio: bool, - network_to_protocol_sender: mpsc::UnboundedSender>, - client_to_protocol_sender: mpsc::UnboundedSender>, - protocol_to_network_receiver: mpsc::UnboundedReceiver>, - ) -> Self { - ProtocolChannel { - use_tokio, - buffered_messages: Mutex::new(VecDeque::new()), - network_to_protocol_sender, - client_to_protocol_sender, - protocol_to_network_receiver: Mutex::new(protocol_to_network_receiver), - } - } - - /// Send message from network to protocol. - pub fn send_from_net(&self, message: FromNetworkMsg) { - let _ = self.network_to_protocol_sender.unbounded_send(message); - - let _ = self.network_to_protocol_sender.unbounded_send(FromNetworkMsg::Synchronize); - let _ = self.wait_sync(); - } - - /// Send message from client to protocol. - pub fn send_from_client(&self, message: ProtocolMsg) { - let _ = self.client_to_protocol_sender.unbounded_send(message); - - let _ = self.client_to_protocol_sender.unbounded_send(ProtocolMsg::Synchronize); - let _ = self.wait_sync(); - } - - /// Wait until synchronization response is generated by the protocol. - pub fn wait_sync(&self) -> Result<(), ()> { - let fut = futures::future::poll_fn(|| { - loop { - let mut protocol_to_network_receiver = self.protocol_to_network_receiver.lock(); - match protocol_to_network_receiver.poll() { - Ok(Async::Ready(Some(NetworkMsg::Synchronized))) => return Ok(Async::Ready(())), - Ok(Async::Ready(None)) | Err(_) => return Err(()), - Ok(Async::NotReady) => return Ok(Async::NotReady), - Ok(Async::Ready(Some(msg))) => self.buffered_messages.lock().push_back(msg), - } - } - }); - - if self.use_tokio { - fut.wait() - } else { - tokio::runtime::current_thread::block_on_all(fut) - } - } - - /// Produce the next pending message to send to another peer. - fn pending_message(&self, message_filter: &MessageFilter) -> Option> { - if let Some(message) = self.buffered_message(message_filter) { - return Some(message); - } - - while let Some(message) = self.channel_message() { - if message_filter(&message) { - return Some(message) - } else { - self.buffered_messages.lock().push_back(message); - } - } - - None - } - - /// Whether this peer is done syncing (has no messages to send). - fn is_done(&self) -> bool { - let mut buffered_messages = self.buffered_messages.lock(); - if let Some(msg) = self.channel_message() { - buffered_messages.push_back(msg); - false - } else { - buffered_messages.is_empty() - } - } - - /// Return oldest buffered message if it exists. - fn buffered_message(&self, message_filter: &MessageFilter) -> Option> { - let mut buffered_messages = self.buffered_messages.lock(); - for i in 0..buffered_messages.len() { - if message_filter(&buffered_messages[i]) { - return buffered_messages.remove(i); - } - } - - None - } - - /// Receive message from the channel. - fn channel_message(&self) -> Option> { - let fut = futures::future::poll_fn(|| -> Result<_, ()> { - Ok(Async::Ready(match self.protocol_to_network_receiver.lock().poll() { - Ok(Async::Ready(Some(m))) => Some(m), - Ok(Async::NotReady) => None, - Err(_) => None, - Ok(Async::Ready(None)) => None, - })) - }); - - if self.use_tokio { - fut.wait() - } else { - tokio::runtime::current_thread::block_on_all(fut) - }.ok().and_then(|a| a) - } + client: PeersClient, + /// We keep a copy of the verifier so that we can invoke it for locally-generated blocks, + /// instead of going through the import queue. 
+ verifier: Arc>, + /// We keep a copy of the block_import so that we can invoke it for locally-generated blocks, + /// instead of going through the import queue. + block_import: Arc>, + network: NetworkWorker::Hash>, + to_poll: smallvec::SmallVec<[Box + Send>; 2]>, } impl> Peer { - fn new( - protocol_status: Arc>, - client: PeersClient, - import_queue: Arc>>>, - use_tokio: bool, - network_to_protocol_sender: mpsc::UnboundedSender>, - protocol_sender: mpsc::UnboundedSender>, - _network_sender: mpsc::UnboundedSender>, - network_port: mpsc::UnboundedReceiver>, - data: D, - ) -> Self { - let net_proto_channel = ProtocolChannel::new( - use_tokio, - network_to_protocol_sender.clone(), - protocol_sender.clone(), - network_port, - ); - Peer { - protocol_status, - peer_id: PeerId::random(), - client, - import_queue, - net_proto_channel, - data, - best_hash: Mutex::new(None), - finalized_hash: Mutex::new(None), - } - } - /// Called after blockchain has been populated to updated current state. - fn start(&self) { - // Update the sync state to the latest chain state. - let info = self.client.info(); - let header = self - .client - .header(&BlockId::Hash(info.chain.best_hash)) - .unwrap() - .unwrap(); - self.net_proto_channel.send_from_client(ProtocolMsg::BlockImported(info.chain.best_hash, header)); - } - - #[cfg(test)] - fn on_block_imported( - &self, - hash: ::Hash, - header: &::Header, - ) { - self.net_proto_channel.send_from_client(ProtocolMsg::BlockImported(hash, header.clone())); - } - - /// SyncOracle: are we connected to any peer? - #[cfg(test)] - fn is_offline(&self) -> bool { - self.protocol_status.read().0 + /// Returns true if we're major syncing. + pub fn is_major_syncing(&self) -> bool { + self.network.service().is_major_syncing() } - /// SyncOracle: are we in the process of catching-up with the chain? - #[cfg(test)] - fn is_major_syncing(&self) -> bool { - self.protocol_status.read().1 + /// Returns the number of peers we're connected to. + pub fn num_peers(&self) -> usize { + self.network.num_connected_peers() } - /// Get protocol status. - #[cfg(test)] - fn num_peers(&self) -> usize { - self.protocol_status.read().2 + /// Returns true if we have no peer. + pub fn is_offline(&self) -> bool { + self.num_peers() == 0 } - /// Called on connection to other indicated peer. - fn on_connect(&self, other: &Self) { - self.net_proto_channel.send_from_net(FromNetworkMsg::PeerConnected(other.peer_id.clone())); - } - - /// Called on disconnect from other indicated peer. - fn on_disconnect(&self, other: &Self) { - self.net_proto_channel.send_from_net(FromNetworkMsg::PeerDisconnected(other.peer_id.clone())); - } - - /// Receive a message from another peer. Return a set of peers to disconnect. - fn receive_message(&self, from: &PeerId, msg: Message) { - self.net_proto_channel.send_from_net(FromNetworkMsg::CustomMessage(from.clone(), msg)); - } - - /// Produce the next pending message to send to another peer. - fn pending_message(&self, message_filter: &MessageFilter) -> Option> { - self.net_proto_channel.pending_message(message_filter) - } - - /// Whether this peer is done syncing (has no messages to send). - fn is_done(&self) -> bool { - self.net_proto_channel.is_done() - } - - /// Synchronize with import queue. - #[cfg(any(test, feature = "test-helpers"))] - pub fn import_queue_sync(&self) { - self.import_queue.lock().synchronize(); - let _ = self.net_proto_channel.wait_sync(); - } - - /// Execute a "sync step". This is called for each peer after it sends a packet. 
- fn sync_step(&self) { - self.net_proto_channel.send_from_client(ProtocolMsg::Tick); - } - - /// Send block import notifications. - fn send_import_notifications(&self) { - let info = self.client.info(); - - let mut best_hash = self.best_hash.lock(); - match *best_hash { - None => {}, - Some(hash) if hash != info.chain.best_hash => {}, - _ => return, - } - - let header = self.client.header(&BlockId::Hash(info.chain.best_hash)).unwrap().unwrap(); - self.net_proto_channel.send_from_client(ProtocolMsg::BlockImported(info.chain.best_hash, header)); - *best_hash = Some(info.chain.best_hash); - } - - /// Send block finalization notifications. - fn send_finality_notifications(&self) { - let info = self.client.info(); - - let mut finalized_hash = self.finalized_hash.lock(); - match *finalized_hash { - None => {}, - Some(hash) if hash != info.chain.finalized_hash => {}, - _ => return, - } - - let header = self.client.header(&BlockId::Hash(info.chain.finalized_hash)).unwrap().unwrap(); - self.net_proto_channel.send_from_client( - ProtocolMsg::BlockFinalized(info.chain.finalized_hash, header.clone()) - ); - *finalized_hash = Some(info.chain.finalized_hash); - } - - /// Push a message into the gossip network and relay to peers. - /// `TestNet::sync_step` needs to be called to ensure it's propagated. - pub fn gossip_message( - &self, - topic: ::Hash, - engine_id: ConsensusEngineId, - data: Vec, - force: bool, - ) { - let recipient = if force { - GossipMessageRecipient::BroadcastToAll - } else { - GossipMessageRecipient::BroadcastNew - }; - self.net_proto_channel.send_from_client( - ProtocolMsg::GossipConsensusMessage(topic, engine_id, data, recipient), - ); - } - - /// access the underlying consensus gossip handler - pub fn consensus_gossip_messages_for( - &self, - engine_id: ConsensusEngineId, - topic: ::Hash, - ) -> mpsc::UnboundedReceiver { - let (tx, rx) = oneshot::channel(); - self.with_gossip(move |gossip, _| { - let inner_rx = gossip.messages_for(engine_id, topic); - let _ = tx.send(inner_rx); - }); - rx.wait().ok().expect("1. Network is running, 2. it should handle the above closure successfully") - } - - /// Execute a closure with the consensus gossip. - pub fn with_gossip(&self, f: F) - where F: FnOnce(&mut ConsensusGossip, &mut dyn Context) + Send + 'static - { - self.net_proto_channel.send_from_client(ProtocolMsg::ExecuteWithGossip(Box::new(f))); - } - - /// Announce a block to peers. - #[cfg(test)] - fn announce_block(&self, block: Hash) { - self.net_proto_channel.send_from_client(ProtocolMsg::AnnounceBlock(block)); + /// Request a justification for the given block. + pub fn request_justification(&self, hash: &::Hash, number: NumberFor) { + self.network.service().request_justification(hash, number); } - /// Request a justification for the given block. - #[cfg(test)] - fn request_justification(&self, hash: &::primitives::H256, number: NumberFor) { - self.net_proto_channel.send_from_client(ProtocolMsg::RequestJustification(hash.clone(), number)); + /// Announces an important block on the network. 
+ pub fn announce_block(&self, hash: ::Hash) { + self.network.service().announce_block(hash); } /// Add blocks to the peer -- edit the block before adding @@ -737,22 +277,23 @@ impl> Peer { block.header.parent_hash ); let header = block.header.clone(); - at = hash; - self.import_queue.lock().import_blocks( + let (import_block, cache) = self.verifier.verify( origin, - vec![IncomingBlock { - origin: None, - hash, - header: Some(header), - body: Some(block.extrinsics), - justification: None, - }], - ); - - // make sure block import has completed - self.import_queue_sync(); + header.clone(), + None, + Some(block.extrinsics) + ).unwrap(); + let cache = if let Some(cache) = cache { + cache.into_iter().collect() + } else { + Default::default() + }; + self.block_import.import_block(import_block, cache).expect("block_import failed"); + self.network.service().on_block_imported(hash, header); + at = hash; } + self.network.service().announce_block(at.clone()); at } @@ -794,6 +335,11 @@ impl> Peer { pub fn client(&self) -> &PeersClient { &self.client } + + /// Get a reference to the network service. + pub fn network_service(&self) -> &Arc::Hash>> { + &self.network.service() + } } pub struct EmptyTransactionPool; @@ -831,11 +377,8 @@ pub trait TestNetFactory: Sized { /// Get reference to peer. fn peer(&self, i: usize) -> &Peer; - fn peers(&self) -> &Vec>>; - fn mut_peers>>)>(&mut self, closure: F); - - fn started(&self) -> bool; - fn set_started(&mut self, now: bool); + fn peers(&self) -> &Vec>; + fn mut_peers>)>(&mut self, closure: F); /// Get custom block import handle for fresh client, along with peer data. fn make_block_import(&self, client: PeersClient) @@ -859,11 +402,6 @@ pub trait TestNetFactory: Sized { ProtocolConfig::default() } - /// Must return true if the testnet is going to be used from within a tokio context. - fn uses_tokio(&self) -> bool { - false - } - /// Create new test network with this many peers. fn new(n: usize) -> Self { trace!(target: "test_network", "Creating test network"); @@ -877,218 +415,112 @@ pub trait TestNetFactory: Sized { net } - /// Add created peer. - fn add_peer( - &mut self, - protocol_status: Arc>, - import_queue: Arc>>>, - tx_pool: EmptyTransactionPool, - finality_proof_provider: Option>>, - mut protocol: Protocol, - protocol_sender: mpsc::UnboundedSender>, - network_to_protocol_sender: mpsc::UnboundedSender>, - network_sender: mpsc::UnboundedSender>, - mut network_to_protocol_rx: mpsc::UnboundedReceiver>, - mut protocol_rx: mpsc::UnboundedReceiver>, - peer: Arc>, - ) { - std::thread::spawn(move || { - // Implementation of `protocol::NetworkOut` using the available local variables. - struct Ctxt<'a, B: BlockT>(&'a mpsc::UnboundedSender>); - impl<'a, B: BlockT> NetworkOut for Ctxt<'a, B> { - fn report_peer(&mut self, who: PeerId, reputation: i32) { - let _ = self.0.unbounded_send(NetworkMsg::ReportPeer(who, reputation)); - } - fn disconnect_peer(&mut self, who: PeerId) { - let _ = self.0.unbounded_send(NetworkMsg::DisconnectPeer(who)); - } - fn send_message(&mut self, who: PeerId, message: Message) { - let _ = self.0.unbounded_send(NetworkMsg::Outgoing(who, message)); - } - } + /// Add a full peer. 
+ fn add_full_peer(&mut self, config: &ProtocolConfig) { + let client = Arc::new(test_client::new()); + let verifier = self.make_verifier(PeersClient::Full(client.clone()), config); + let (block_import, justification_import, finality_proof_import, finality_proof_request_builder, data) + = self.make_block_import(PeersClient::Full(client.clone())); - tokio::runtime::current_thread::run(futures::future::poll_fn(move || { - import_queue.lock().poll_actions(&mut TestLink::new( - protocol_sender.clone(), - network_to_protocol_sender.clone(), - network_sender.clone(), - )); - - while let Async::Ready(msg) = network_to_protocol_rx.poll().unwrap() { - let outcome = match msg { - Some(FromNetworkMsg::PeerConnected(peer_id)) => { - protocol.on_peer_connected(&mut Ctxt(&network_sender), peer_id); - CustomMessageOutcome::None - }, - Some(FromNetworkMsg::PeerDisconnected(peer_id)) => { - protocol.on_peer_disconnected(&mut Ctxt(&network_sender), peer_id); - CustomMessageOutcome::None - }, - Some(FromNetworkMsg::CustomMessage(peer_id, message)) => - protocol.on_custom_message( - &mut Ctxt(&network_sender), - &tx_pool, - peer_id, - message, - finality_proof_provider.as_ref().map(|p| &**p) - ), - Some(FromNetworkMsg::Synchronize) => { - let _ = network_sender.unbounded_send(NetworkMsg::Synchronized); - CustomMessageOutcome::None - }, - None => return Ok(Async::Ready(())), - }; + let import_queue = Box::new(BasicQueue::new( + verifier.clone(), + block_import.clone(), + justification_import, + finality_proof_import, + finality_proof_request_builder, + )); - match outcome { - CustomMessageOutcome::BlockImport(origin, blocks) => - import_queue.lock().import_blocks(origin, blocks), - CustomMessageOutcome::JustificationImport(origin, hash, nb, justification) => - import_queue.lock().import_justification(origin, hash, nb, justification), - CustomMessageOutcome::FinalityProofImport(origin, hash, nb, proof) => - import_queue.lock().import_finality_proof(origin, hash, nb, proof), - CustomMessageOutcome::None => {} + let listen_addr = build_multiaddr![Memory(rand::random::())]; + + let network = NetworkWorker::new(crate::config::Params { + roles: config.roles, + network_config: NetworkConfiguration { + listen_addresses: vec![listen_addr.clone()], + transport: TransportConfig::MemoryOnly, + ..NetworkConfiguration::default() + }, + chain: client.clone(), + finality_proof_provider: self.make_finality_proof_provider(PeersClient::Full(client.clone())), + on_demand: None, + transaction_pool: Arc::new(EmptyTransactionPool), + protocol_id: ProtocolId::from(&b"test-protocol-name"[..]), + import_queue, + specialization: self::SpecializationFactory::create(), + }).unwrap(); + + let blocks_notif_future = { + let network = Arc::downgrade(&network.service().clone()); + client.import_notification_stream() + .for_each(move |notification| { + if let Some(network) = network.upgrade() { + network.on_block_imported(notification.hash, notification.header); } - } - - loop { - let msg = match protocol_rx.poll() { - Ok(Async::Ready(Some(msg))) => msg, - Ok(Async::Ready(None)) | Err(_) => return Ok(Async::Ready(())), - Ok(Async::NotReady) => break, - }; + Ok(()) + }) + .then(|_| Ok(())) + }; - match msg { - ProtocolMsg::BlockImported(hash, header) => - protocol.on_block_imported(&mut Ctxt(&network_sender), hash, &header), - ProtocolMsg::BlockFinalized(hash, header) => - protocol.on_block_finalized(&mut Ctxt(&network_sender), hash, &header), - ProtocolMsg::ExecuteWithSpec(task) => { - let mut ctxt = Ctxt(&network_sender); - let (mut context, 
spec) = protocol.specialization_lock(&mut ctxt); - task.call_box(spec, &mut context); - }, - ProtocolMsg::ExecuteWithGossip(task) => { - let mut ctxt = Ctxt(&network_sender); - let (mut context, gossip) = protocol.consensus_gossip_lock(&mut ctxt); - task.call_box(gossip, &mut context); - } - ProtocolMsg::GossipConsensusMessage(topic, engine_id, message, recipient) => - protocol.gossip_consensus_message( - &mut Ctxt(&network_sender), - topic, - engine_id, - message, - recipient - ), - ProtocolMsg::BlocksProcessed(hashes, has_error) => - protocol.blocks_processed(&mut Ctxt(&network_sender), hashes, has_error), - ProtocolMsg::RestartSync => - protocol.restart(&mut Ctxt(&network_sender)), - ProtocolMsg::AnnounceBlock(hash) => - protocol.announce_block(&mut Ctxt(&network_sender), hash), - ProtocolMsg::BlockImportedSync(hash, number) => - protocol.block_imported(&hash, number), - ProtocolMsg::ClearJustificationRequests => - protocol.clear_justification_requests(), - ProtocolMsg::RequestJustification(hash, number) => - protocol.request_justification(&mut Ctxt(&network_sender), &hash, number), - ProtocolMsg::JustificationImportResult(hash, number, success) => - protocol.justification_import_result(hash, number, success), - ProtocolMsg::SetFinalityProofRequestBuilder(builder) => - protocol.set_finality_proof_request_builder(builder), - ProtocolMsg::RequestFinalityProof(hash, number) => - protocol.request_finality_proof(&mut Ctxt(&network_sender), &hash, number), - ProtocolMsg::FinalityProofImportResult(requested_block, finalziation_result) => - protocol.finality_proof_import_result(requested_block, finalziation_result), - ProtocolMsg::PropagateExtrinsics => - protocol.propagate_extrinsics(&mut Ctxt(&network_sender), &tx_pool), - #[cfg(any(test, feature = "test-helpers"))] - ProtocolMsg::Tick => protocol.tick(&mut Ctxt(&network_sender)), - #[cfg(any(test, feature = "test-helpers"))] - ProtocolMsg::Synchronize => { - trace!(target: "sync", "handle_client_msg: received Synchronize msg"); - let _ = network_sender.unbounded_send(NetworkMsg::Synchronized); + let finality_notif_future = { + let network = Arc::downgrade(&network.service().clone()); + + // A utility stream that drops all ready items and only returns the last one. + // This is used to only keep the last finality notification and avoid + // overloading the sync module with notifications. + struct MostRecentNotification(futures::stream::Fuse>); + + impl Stream for MostRecentNotification { + type Item = as Stream>::Item; + type Error = as Stream>::Error; + + fn poll(&mut self) -> Poll, Self::Error> { + let mut last = None; + let last = loop { + match self.0.poll()? 
{ + Async::Ready(Some(item)) => { last = Some(item) } + Async::Ready(None) => match last { + None => return Ok(Async::Ready(None)), + Some(last) => break last, + }, + Async::NotReady => match last { + None => return Ok(Async::NotReady), + Some(last) => break last, + }, } - } - } + }; - if let Async::Ready(_) = protocol.poll(&mut Ctxt(&network_sender), &tx_pool).unwrap() { - return Ok(Async::Ready(())) + Ok(Async::Ready(Some(last))) } + } - *protocol_status.write() = ( - protocol.num_connected_peers() == 0, - protocol.sync_state() == SyncState::Downloading, - protocol.num_connected_peers() - ); - Ok(Async::NotReady) - })); - }); - - if self.started() { - peer.start(); - self.peers().iter().for_each(|other| { - other.on_connect(&*peer); - peer.on_connect(other); - }); - } + MostRecentNotification(client.finality_notification_stream().fuse()) + .for_each(move |notification| { + if let Some(network) = network.upgrade() { + network.on_block_finalized(notification.hash, notification.header); + } + Ok(()) + }) + .then(|_| Ok(())) + }; self.mut_peers(|peers| { - peers.push(peer) - }); - } - - /// Add a full peer. - fn add_full_peer(&mut self, config: &ProtocolConfig) { - let client = Arc::new(test_client::new()); - let verifier = self.make_verifier(PeersClient::Full(client.clone()), config); - let (block_import, justification_import, finality_proof_import, finality_proof_request_builder, data) - = self.make_block_import(PeersClient::Full(client.clone())); - let (network_sender, network_port) = mpsc::unbounded(); + for peer in peers.iter_mut() { + peer.network.add_known_address(network.service().local_peer_id(), listen_addr.clone()); + } - let import_queue = Arc::new(Mutex::new(Box::new(BasicQueue::new( - verifier, - block_import, - justification_import, - finality_proof_import, - finality_proof_request_builder, - )))); - let specialization = self::SpecializationFactory::create(); - - let (network_to_protocol_sender, network_to_protocol_rx) = mpsc::unbounded(); - let (protocol_sender, protocol_rx) = mpsc::unbounded(); - - let protocol = Protocol::new( - config.clone(), - client.clone(), - Arc::new(AlwaysBadChecker), - specialization, - ).unwrap(); - - let protocol_status = Arc::new(RwLock::new((true, false, 0))); - self.add_peer( - protocol_status.clone(), - import_queue.clone(), - EmptyTransactionPool, - self.make_finality_proof_provider(PeersClient::Full(client.clone())), - protocol, - protocol_sender.clone(), - network_to_protocol_sender.clone(), - network_sender.clone(), - network_to_protocol_rx, - protocol_rx, - Arc::new(Peer::new( - protocol_status, - PeersClient::Full(client), - import_queue, - self.uses_tokio(), - network_to_protocol_sender, - protocol_sender, - network_sender, - network_port, + peers.push(Peer { data, - )), - ); + client: PeersClient::Full(client), + block_import, + verifier, + to_poll: { + let mut sv = smallvec::SmallVec::new(); + sv.push(Box::new(blocks_notif_future) as Box<_>); + sv.push(Box::new(finality_notif_future) as Box<_>); + sv + }, + network, + }); + }); } /// Add a light peer. 
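With the per-peer threads and manual message routing gone, a test now drives the network by polling it on the test's own runtime. A rough usage sketch, not part of the patch: it assumes the reworked `TestNet`, the `push_blocks` helper above and the `block_until_sync` helper added further down, and the block count and assertion are illustrative only.

    #[test]
    fn sketch_two_peers_sync() {
        // Hypothetical test, assuming the reworked harness in this module.
        let mut runtime = tokio::runtime::current_thread::Runtime::new().unwrap();
        let mut net = TestNet::new(2);
        // Give peer 0 a longer chain; peer 1 starts at genesis.
        net.peer(0).push_blocks(10, false);
        // Repeatedly poll every peer's `NetworkWorker` (plus its notification
        // futures) until all peers report the same best block number.
        net.block_until_sync(&mut runtime);
        assert_eq!(net.peer(1).client.info().chain.best_number, 10);
    }

Nothing runs in the background any more: a peer only makes progress while `poll()` is being called, which is why every updated test below owns a `current_thread` runtime.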
@@ -1100,188 +532,103 @@ pub trait TestNetFactory: Sized { let verifier = self.make_verifier(PeersClient::Light(client.clone()), &config); let (block_import, justification_import, finality_proof_import, finality_proof_request_builder, data) = self.make_block_import(PeersClient::Light(client.clone())); - let (network_sender, network_port) = mpsc::unbounded(); - let import_queue = Arc::new(Mutex::new(Box::new(BasicQueue::new( - verifier, - block_import, + let import_queue = Box::new(BasicQueue::new( + verifier.clone(), + block_import.clone(), justification_import, finality_proof_import, finality_proof_request_builder, - )))); - let specialization = self::SpecializationFactory::create(); - - let (network_to_protocol_sender, network_to_protocol_rx) = mpsc::unbounded(); - let (protocol_sender, protocol_rx) = mpsc::unbounded(); - - let protocol = Protocol::new( - config, - client.clone(), - Arc::new(AlwaysBadChecker), - specialization, - ).unwrap(); - - let protocol_status = Arc::new(RwLock::new((true, false, 0))); - self.add_peer( - protocol_status.clone(), - import_queue.clone(), - EmptyTransactionPool, - self.make_finality_proof_provider(PeersClient::Light(client.clone())), - protocol, - protocol_sender.clone(), - network_to_protocol_sender.clone(), - network_sender.clone(), - network_to_protocol_rx, - protocol_rx, - Arc::new(Peer::new( - protocol_status, - PeersClient::Light(client), - import_queue, - self.uses_tokio(), - network_to_protocol_sender, - protocol_sender, - network_sender, - network_port, - data, - )), - ); - } + )); - /// Start network. - fn start(&mut self) { - if self.started() { - return; - } - for peer in self.peers() { - peer.start(); - for client in self.peers() { - if peer.peer_id != client.peer_id { - peer.on_connect(client); - } - } - } + let listen_addr = build_multiaddr![Memory(rand::random::())]; + + let network = NetworkWorker::new(crate::config::Params { + roles: config.roles, + network_config: NetworkConfiguration { + listen_addresses: vec![listen_addr.clone()], + transport: TransportConfig::MemoryOnly, + ..NetworkConfiguration::default() + }, + chain: client.clone(), + finality_proof_provider: self.make_finality_proof_provider(PeersClient::Light(client.clone())), + on_demand: None, + transaction_pool: Arc::new(EmptyTransactionPool), + protocol_id: ProtocolId::from(&b"test-protocol-name"[..]), + import_queue, + specialization: self::SpecializationFactory::create(), + }).unwrap(); + + let blocks_notif_future = { + let network = Arc::downgrade(&network.service().clone()); + client.import_notification_stream() + .for_each(move |notification| { + if let Some(network) = network.upgrade() { + network.on_block_imported(notification.hash, notification.header); + } + Ok(()) + }) + .then(|_| Ok(())) + }; - loop { - // we only deliver Status messages during start - let need_continue = self.route_single(true, None, &|msg| match *msg { - NetworkMsg::Outgoing(_, crate::message::generic::Message::Status(_)) => true, - NetworkMsg::Outgoing(_, _) => false, - NetworkMsg::DisconnectPeer(_) | - NetworkMsg::ReportPeer(_, _) | NetworkMsg::Synchronized => true, - }); - if !need_continue { - break; + self.mut_peers(|peers| { + for peer in peers.iter_mut() { + peer.network.add_known_address(network.service().local_peer_id(), listen_addr.clone()); } - } - self.set_started(true); + peers.push(Peer { + data, + verifier, + block_import, + client: PeersClient::Light(client), + to_poll: { + let mut sv = smallvec::SmallVec::new(); + sv.push(Box::new(blocks_notif_future) as Box<_>); + sv + }, + 
network, + }); + }); } - /// Do single round of message routing: single message from every peer is routed. - fn route_single( - &mut self, - disconnect: bool, - disconnected: Option>, - message_filter: &MessageFilter, - ) -> bool { - let mut had_messages = false; - let mut to_disconnect = HashSet::new(); - let peers = self.peers(); - for peer in peers { - if let Some(message) = peer.pending_message(message_filter) { - match message { - NetworkMsg::Outgoing(recipient_id, packet) => { - had_messages = true; - - let sender_pos = peers.iter().position(|p| p.peer_id == peer.peer_id).unwrap(); - let recipient_pos = peers.iter().position(|p| p.peer_id == recipient_id).unwrap(); - if disconnect { - if let Some(ref disconnected) = disconnected { - let mut current = HashSet::new(); - current.insert(sender_pos); - current.insert(recipient_pos); - // Not routing message between "disconnected" nodes. - if disconnected.is_subset(&current) { - continue; - } - } - } - - peers[recipient_pos].receive_message(&peer.peer_id, packet); - }, - NetworkMsg::DisconnectPeer(who) => { - if disconnect { - to_disconnect.insert(who); - } - }, - _ => (), - } - } - } - - for d in to_disconnect { - if let Some(d) = peers.iter().find(|p| p.peer_id == d) { - for peer in 0..peers.len() { - peers[peer].on_disconnect(d); - } + /// Polls the testnet until all nodes are in sync. + /// + /// Must be executed in a task context. + fn poll_until_sync(&mut self) -> Async<()> { + self.poll(); + + // Return `NotReady` if there's a mismatch in the highest block number. + let mut highest = None; + for peer in self.peers().iter() { + match (highest, peer.client.info().chain.best_number) { + (None, b) => highest = Some(b), + (Some(ref a), ref b) if a == b => {}, + (Some(_), _) => return Async::NotReady, + } } - } - - // make sure that the protocol(s) has processed all messages that have been queued - self.peers().iter().for_each(|peer| peer.import_queue_sync()); - - had_messages - } - - /// Send block import notifications for all peers. - fn send_import_notifications(&mut self) { - self.peers().iter().for_each(|peer| peer.send_import_notifications()) - } - - /// Send block finalization notifications for all peers. - fn send_finality_notifications(&mut self) { - self.peers().iter().for_each(|peer| peer.send_finality_notifications()) - } - - /// Perform synchronization until complete, if provided the - /// given nodes set are excluded from sync. - fn sync_with(&mut self, disconnect: bool, disconnected: Option>) { - self.start(); - while self.route_single(disconnect, disconnected.clone(), &|_| true) { - // give protocol a chance to do its maintain procedures - self.peers().iter().for_each(|peer| peer.sync_step()); - } - } - - /// Deliver at most 1 pending message from every peer. - fn sync_step(&mut self) { - self.route_single(true, None, &|_| true); + Async::Ready(()) } - /// Maintain sync for a peer. - fn tick_peer(&mut self, i: usize) { - self.peers()[i].sync_step(); - } - - /// Deliver pending messages until there are no more. - fn sync(&mut self) { - self.sync_with(true, None) - } - - /// Deliver pending messages until there are no more. Do not disconnect nodes. - fn sync_without_disconnects(&mut self) { - self.sync_with(false, None) + /// Blocks the current thread until we are sync'ed. + /// + /// Calls `poll_until_sync` repeatedly with the runtime passed as parameter. 
+ fn block_until_sync(&mut self, runtime: &mut tokio::runtime::current_thread::Runtime) { + runtime.block_on(futures::future::poll_fn::<(), (), _>(|| Ok(self.poll_until_sync()))).unwrap(); } - /// Whether all peers have no pending outgoing messages. - fn done(&self) -> bool { - self.peers().iter().all(|p| p.is_done()) + /// Polls the testnet. Processes all the pending actions and returns `NotReady`. + fn poll(&mut self) { + self.mut_peers(|peers| { + for peer in peers { + peer.network.poll().unwrap(); + peer.to_poll.retain(|f| f.poll() == Ok(Async::NotReady)); + } + }); } } pub struct TestNet { - peers: Vec>>, - started: bool, + peers: Vec>, } impl TestNetFactory for TestNet { @@ -1293,7 +640,6 @@ impl TestNetFactory for TestNet { fn from_config(_config: &ProtocolConfig) -> Self { TestNet { peers: Vec::new(), - started: false } } @@ -1307,21 +653,13 @@ impl TestNetFactory for TestNet { &self.peers[i] } - fn peers(&self) -> &Vec>> { + fn peers(&self) -> &Vec> { &self.peers } - fn mut_peers>>)>(&mut self, closure: F) { + fn mut_peers>)>(&mut self, closure: F) { closure(&mut self.peers); } - - fn started(&self) -> bool { - self.started - } - - fn set_started(&mut self, new: bool) { - self.started = new; - } } pub struct ForceFinalized(PeersClient); @@ -1361,22 +699,14 @@ impl TestNetFactory for JustificationTestNet { self.0.peer(i) } - fn peers(&self) -> &Vec>> { + fn peers(&self) -> &Vec> { self.0.peers() } - fn mut_peers>>)>(&mut self, closure: F) { + fn mut_peers>)>(&mut self, closure: F) { self.0.mut_peers(closure) } - fn started(&self) -> bool { - self.0.started() - } - - fn set_started(&mut self, new: bool) { - self.0.set_started(new) - } - fn make_block_import(&self, client: PeersClient) -> ( SharedBlockImport, diff --git a/core/network/src/test/sync.rs b/core/network/src/test/sync.rs index 15b866c168b45..a7603f755191c 100644 --- a/core/network/src/test/sync.rs +++ b/core/network/src/test/sync.rs @@ -16,13 +16,14 @@ use client::{backend::Backend, blockchain::HeaderBackend}; use crate::config::Roles; -use crate::message; use consensus::BlockOrigin; -use std::collections::HashSet; +use std::{time::Duration, time::Instant}; +use tokio::runtime::current_thread; use super::*; fn test_ancestor_search_when_common_is(n: usize) { let _ = ::env_logger::try_init(); + let mut runtime = current_thread::Runtime::new().unwrap(); let mut net = TestNet::new(3); net.peer(0).push_blocks(n, false); @@ -33,7 +34,7 @@ fn test_ancestor_search_when_common_is(n: usize) { net.peer(1).push_blocks(100, false); net.peer(2).push_blocks(100, false); - net.sync(); + net.block_until_sync(&mut runtime); assert!(net.peer(0).client.as_in_memory_backend().blockchain() .canon_equals_to(net.peer(1).client.as_in_memory_backend().blockchain())); } @@ -41,28 +42,24 @@ fn test_ancestor_search_when_common_is(n: usize) { #[test] fn sync_peers_works() { let _ = ::env_logger::try_init(); + let mut runtime = current_thread::Runtime::new().unwrap(); let mut net = TestNet::new(3); - net.sync(); - for peer in 0..3 { - // Assert peers is up to date. - assert_eq!(net.peer(peer).num_peers(), 2); - // And then disconnect. - for other in 0..3 { - if other != peer { - net.peer(peer).on_disconnect(net.peer(other)); + + runtime.block_on(futures::future::poll_fn::<(), (), _>(|| -> Result<_, ()> { + net.poll(); + for peer in 0..3 { + if net.peer(peer).num_peers() != 2 { + return Ok(Async::NotReady) } } - } - net.sync(); - // Now peers are disconnected. 
- for peer in 0..3 { - assert_eq!(net.peer(peer).num_peers(), 0); - } + Ok(Async::Ready(())) + })).unwrap(); } #[test] fn sync_cycle_from_offline_to_syncing_to_offline() { let _ = ::env_logger::try_init(); + let mut runtime = current_thread::Runtime::new().unwrap(); let mut net = TestNet::new(3); for peer in 0..3 { // Offline, and not major syncing. @@ -72,63 +69,92 @@ fn sync_cycle_from_offline_to_syncing_to_offline() { // Generate blocks. net.peer(2).push_blocks(100, false); - net.start(); - for peer in 0..3 { - // Online - assert!(!net.peer(peer).is_offline()); - if peer < 2 { - // Major syncing. - assert!(net.peer(peer).is_major_syncing()); - } - } - net.sync(); - for peer in 0..3 { - // All done syncing. - assert!(!net.peer(peer).is_major_syncing()); - } - // Now disconnect them all. - for peer in 0..3 { - for other in 0..3 { - if other != peer { - net.peer(peer).on_disconnect(net.peer(other)); + // Block until all nodes are online and nodes 0 and 1 are major syncing. + runtime.block_on(futures::future::poll_fn::<(), (), _>(|| -> Result<_, ()> { + net.poll(); + for peer in 0..3 { + // Online + if net.peer(peer).is_offline() { + return Ok(Async::NotReady) + } + if peer < 2 { + // Major syncing. + if !net.peer(peer).is_major_syncing() { + return Ok(Async::NotReady) + } } } - net.sync(); - assert!(net.peer(peer).is_offline()); - assert!(!net.peer(peer).is_major_syncing()); - } + Ok(Async::Ready(())) + })).unwrap(); + + // Block until all nodes are done syncing. + runtime.block_on(futures::future::poll_fn::<(), (), _>(|| -> Result<_, ()> { + net.poll(); + for peer in 0..3 { + if net.peer(peer).is_major_syncing() { + return Ok(Async::NotReady) + } + } + Ok(Async::Ready(())) + })).unwrap(); + + // Now drop nodes 1 and 2, and check that node 0 is offline. + net.peers.remove(2); + net.peers.remove(1); + runtime.block_on(futures::future::poll_fn::<(), (), _>(|| -> Result<_, ()> { + net.poll(); + if !net.peer(0).is_offline() { + Ok(Async::NotReady) + } else { + Ok(Async::Ready(())) + } + })).unwrap(); } #[test] fn syncing_node_not_major_syncing_when_disconnected() { let _ = ::env_logger::try_init(); + let mut runtime = current_thread::Runtime::new().unwrap(); let mut net = TestNet::new(3); // Generate blocks. net.peer(2).push_blocks(100, false); - net.start(); - net.sync_step(); - - // Peer 1 is major-syncing. - assert!(net.peer(1).is_major_syncing()); - - // Disconnect peer 1 form everyone else. - net.peer(1).on_disconnect(net.peer(0)); - net.peer(1).on_disconnect(net.peer(2)); - // Peer 1 is not major-syncing. - net.sync(); + // Check that we're not major syncing when disconnected. assert!(!net.peer(1).is_major_syncing()); + + // Check that we switch to major syncing. + runtime.block_on(futures::future::poll_fn::<(), (), _>(|| -> Result<_, ()> { + net.poll(); + if !net.peer(1).is_major_syncing() { + Ok(Async::NotReady) + } else { + Ok(Async::Ready(())) + } + })).unwrap(); + + // Destroy two nodes, and check that we switch to non-major syncing. 
+ net.peers.remove(2); + net.peers.remove(0); + runtime.block_on(futures::future::poll_fn::<(), (), _>(|| -> Result<_, ()> { + net.poll(); + if net.peer(0).is_major_syncing() { + Ok(Async::NotReady) + } else { + Ok(Async::Ready(())) + } + })).unwrap(); } #[test] fn sync_from_two_peers_works() { let _ = ::env_logger::try_init(); + let mut runtime = current_thread::Runtime::new().unwrap(); let mut net = TestNet::new(3); net.peer(1).push_blocks(100, false); net.peer(2).push_blocks(100, false); - net.sync(); + net.block_until_sync(&mut runtime); assert!(net.peer(0).client.as_in_memory_backend().blockchain() .equals_to(net.peer(1).client.as_in_memory_backend().blockchain())); assert!(!net.peer(0).is_major_syncing()); @@ -137,11 +163,12 @@ fn sync_from_two_peers_works() { #[test] fn sync_from_two_peers_with_ancestry_search_works() { let _ = ::env_logger::try_init(); + let mut runtime = current_thread::Runtime::new().unwrap(); let mut net = TestNet::new(3); net.peer(0).push_blocks(10, true); net.peer(1).push_blocks(100, false); net.peer(2).push_blocks(100, false); - net.sync(); + net.block_until_sync(&mut runtime); assert!(net.peer(0).client.as_in_memory_backend().blockchain() .canon_equals_to(net.peer(1).client.as_in_memory_backend().blockchain())); } @@ -149,13 +176,14 @@ fn sync_from_two_peers_with_ancestry_search_works() { #[test] fn ancestry_search_works_when_backoff_is_one() { let _ = ::env_logger::try_init(); + let mut runtime = current_thread::Runtime::new().unwrap(); let mut net = TestNet::new(3); net.peer(0).push_blocks(1, false); net.peer(1).push_blocks(2, false); net.peer(2).push_blocks(2, false); - net.sync(); + net.block_until_sync(&mut runtime); assert!(net.peer(0).client.as_in_memory_backend().blockchain() .canon_equals_to(net.peer(1).client.as_in_memory_backend().blockchain())); } @@ -163,13 +191,14 @@ fn ancestry_search_works_when_backoff_is_one() { #[test] fn ancestry_search_works_when_ancestor_is_genesis() { let _ = ::env_logger::try_init(); + let mut runtime = current_thread::Runtime::new().unwrap(); let mut net = TestNet::new(3); net.peer(0).push_blocks(13, true); net.peer(1).push_blocks(100, false); net.peer(2).push_blocks(100, false); - net.sync(); + net.block_until_sync(&mut runtime); assert!(net.peer(0).client.as_in_memory_backend().blockchain() .canon_equals_to(net.peer(1).client.as_in_memory_backend().blockchain())); } @@ -191,9 +220,11 @@ fn ancestry_search_works_when_common_is_hundred() { #[test] fn sync_long_chain_works() { + let _ = ::env_logger::try_init(); + let mut runtime = current_thread::Runtime::new().unwrap(); let mut net = TestNet::new(2); net.peer(1).push_blocks(500, false); - net.sync(); + net.block_until_sync(&mut runtime); assert!(net.peer(0).client.as_in_memory_backend().blockchain() .equals_to(net.peer(1).client.as_in_memory_backend().blockchain())); } @@ -201,10 +232,11 @@ fn sync_long_chain_works() { #[test] fn sync_no_common_longer_chain_fails() { let _ = ::env_logger::try_init(); + let mut runtime = current_thread::Runtime::new().unwrap(); let mut net = TestNet::new(3); net.peer(0).push_blocks(20, true); net.peer(1).push_blocks(20, false); - net.sync(); + net.block_until_sync(&mut runtime); assert!(!net.peer(0).client.as_in_memory_backend().blockchain() .canon_equals_to(net.peer(1).client.as_in_memory_backend().blockchain())); } @@ -212,9 +244,10 @@ fn sync_no_common_longer_chain_fails() { #[test] fn sync_justifications() { let _ = ::env_logger::try_init(); + let mut runtime = current_thread::Runtime::new().unwrap(); let mut net = 
JustificationTestNet::new(3); net.peer(0).push_blocks(20, false); - net.sync(); + net.block_until_sync(&mut runtime); // there's currently no justification for block #10 assert_eq!(net.peer(0).client().justification(&BlockId::Number(10)).unwrap(), None); @@ -234,17 +267,26 @@ fn sync_justifications() { net.peer(1).request_justification(&h2.hash().into(), 15); net.peer(1).request_justification(&h3.hash().into(), 20); - net.sync(); + runtime.block_on(futures::future::poll_fn::<(), (), _>(|| { + net.poll(); - for height in (10..21).step_by(5) { - assert_eq!(net.peer(0).client().justification(&BlockId::Number(height)).unwrap(), Some(Vec::new())); - assert_eq!(net.peer(1).client().justification(&BlockId::Number(height)).unwrap(), Some(Vec::new())); - } + for height in (10..21).step_by(5) { + if net.peer(0).client().justification(&BlockId::Number(height)).unwrap() != Some(Vec::new()) { + return Ok(Async::NotReady); + } + if net.peer(1).client().justification(&BlockId::Number(height)).unwrap() != Some(Vec::new()) { + return Ok(Async::NotReady); + } + } + + Ok(Async::Ready(())) + })).unwrap(); } #[test] fn sync_justifications_across_forks() { let _ = ::env_logger::try_init(); + let mut runtime = current_thread::Runtime::new().unwrap(); let mut net = JustificationTestNet::new(3); // we push 5 blocks net.peer(0).push_blocks(5, false); @@ -254,24 +296,31 @@ fn sync_justifications_across_forks() { // peer 1 will only see the longer fork. but we'll request justifications // for both and finalize the small fork instead. - net.sync(); + net.block_until_sync(&mut runtime); net.peer(0).client().finalize_block(BlockId::Hash(f1_best), Some(Vec::new()), true).unwrap(); net.peer(1).request_justification(&f1_best, 10); net.peer(1).request_justification(&f2_best, 11); - net.sync(); + runtime.block_on(futures::future::poll_fn::<(), (), _>(|| { + net.poll(); - assert_eq!(net.peer(0).client().justification(&BlockId::Number(10)).unwrap(), Some(Vec::new())); - assert_eq!(net.peer(1).client().justification(&BlockId::Number(10)).unwrap(), Some(Vec::new())); + if net.peer(0).client().justification(&BlockId::Number(10)).unwrap() == Some(Vec::new()) && + net.peer(1).client().justification(&BlockId::Number(10)).unwrap() == Some(Vec::new()) + { + Ok(Async::Ready(())) + } else { + Ok(Async::NotReady) + } + })).unwrap(); } #[test] fn sync_after_fork_works() { let _ = ::env_logger::try_init(); + let mut runtime = current_thread::Runtime::new().unwrap(); let mut net = TestNet::new(3); - net.sync_step(); net.peer(0).push_blocks(30, false); net.peer(1).push_blocks(30, false); net.peer(2).push_blocks(30, false); @@ -285,7 +334,7 @@ fn sync_after_fork_works() { // peer 1 has the best chain let peer1_chain = net.peer(1).client.as_in_memory_backend().blockchain().clone(); - net.sync(); + net.block_until_sync(&mut runtime); assert!(net.peer(0).client.as_in_memory_backend().blockchain().canon_equals_to(&peer1_chain)); assert!(net.peer(1).client.as_in_memory_backend().blockchain().canon_equals_to(&peer1_chain)); assert!(net.peer(2).client.as_in_memory_backend().blockchain().canon_equals_to(&peer1_chain)); @@ -294,15 +343,15 @@ fn sync_after_fork_works() { #[test] fn syncs_all_forks() { let _ = ::env_logger::try_init(); + let mut runtime = current_thread::Runtime::new().unwrap(); let mut net = TestNet::new(4); - net.sync_step(); net.peer(0).push_blocks(2, false); net.peer(1).push_blocks(2, false); net.peer(0).push_blocks(2, true); net.peer(1).push_blocks(4, false); - net.sync(); + net.block_until_sync(&mut runtime); // Check that all 
peers have all of the blocks. assert_eq!(9, net.peer(0).client.as_in_memory_backend().blockchain().blocks_count()); assert_eq!(9, net.peer(1).client.as_in_memory_backend().blockchain().blocks_count()); @@ -311,13 +360,12 @@ fn syncs_all_forks() { #[test] fn own_blocks_are_announced() { let _ = ::env_logger::try_init(); + let mut runtime = current_thread::Runtime::new().unwrap(); let mut net = TestNet::new(3); - net.sync(); // connect'em + net.block_until_sync(&mut runtime); // connect'em net.peer(0).generate_blocks(1, BlockOrigin::Own, |builder| builder.bake().unwrap()); - let header = net.peer(0).client().header(&BlockId::Number(1)).unwrap().unwrap(); - net.peer(0).on_block_imported(header.hash(), &header); - net.sync(); + net.block_until_sync(&mut runtime); assert_eq!(net.peer(0).client.as_in_memory_backend().blockchain().info().best_number, 1); assert_eq!(net.peer(1).client.as_in_memory_backend().blockchain().info().best_number, 1); @@ -329,6 +377,7 @@ fn own_blocks_are_announced() { #[test] fn blocks_are_not_announced_by_light_nodes() { let _ = ::env_logger::try_init(); + let mut runtime = current_thread::Runtime::new().unwrap(); let mut net = TestNet::new(0); // full peer0 is connected to light peer @@ -336,35 +385,32 @@ fn blocks_are_not_announced_by_light_nodes() { let mut light_config = ProtocolConfig::default(); light_config.roles = Roles::LIGHT; net.add_full_peer(&ProtocolConfig::default()); - net.add_full_peer(&light_config); - net.add_full_peer(&ProtocolConfig::default()); + net.add_light_peer(&light_config); + // Sync between 0 and 1. net.peer(0).push_blocks(1, false); - net.peer(0).start(); - net.peer(1).start(); - net.peer(2).start(); - net.peer(0).on_connect(net.peer(1)); - net.peer(1).on_connect(net.peer(2)); - - // Only sync between 0 -> 1, and 1 -> 2 - let mut disconnected = HashSet::new(); - disconnected.insert(0); - disconnected.insert(2); - net.sync_with(true, Some(disconnected)); - - // peer 0 has the best chain - // peer 1 has the best chain - // peer 2 has genesis-chain only assert_eq!(net.peer(0).client.info().chain.best_number, 1); + net.block_until_sync(&mut runtime); assert_eq!(net.peer(1).client.info().chain.best_number, 1); - assert_eq!(net.peer(2).client.info().chain.best_number, 0); + + // Add another node and remove node 0. + net.add_full_peer(&ProtocolConfig::default()); + net.peers.remove(0); + + // Poll for a few seconds and make sure 1 and 2 (now 0 and 1) don't sync together. 
+ let mut delay = tokio_timer::Delay::new(Instant::now() + Duration::from_secs(5)); + runtime.block_on(futures::future::poll_fn::<(), (), _>(|| { + net.poll(); + delay.poll().map_err(|_| ()) + })).unwrap(); + assert_eq!(net.peer(1).client.info().chain.best_number, 0); } #[test] fn can_sync_small_non_best_forks() { let _ = ::env_logger::try_init(); + let mut runtime = current_thread::Runtime::new().unwrap(); let mut net = TestNet::new(2); - net.sync_step(); net.peer(0).push_blocks(30, false); net.peer(1).push_blocks(30, false); @@ -381,7 +427,15 @@ fn can_sync_small_non_best_forks() { assert!(net.peer(0).client().header(&BlockId::Hash(small_hash)).unwrap().is_some()); assert!(net.peer(1).client().header(&BlockId::Hash(small_hash)).unwrap().is_none()); - net.sync(); + // poll until the two nodes connect, otherwise announcing the block will not work + runtime.block_on(futures::future::poll_fn::<(), (), _>(|| -> Result<_, ()> { + net.poll(); + if net.peer(0).num_peers() == 0 { + Ok(Async::NotReady) + } else { + Ok(Async::Ready(())) + } + })).unwrap(); // synchronization: 0 synced to longer chain and 1 didn't sync to small chain. @@ -391,17 +445,24 @@ fn can_sync_small_non_best_forks() { assert!(!net.peer(1).client().header(&BlockId::Hash(small_hash)).unwrap().is_some()); net.peer(0).announce_block(small_hash); - net.sync(); // after announcing, peer 1 downloads the block. - assert!(net.peer(0).client().header(&BlockId::Hash(small_hash)).unwrap().is_some()); - assert!(net.peer(1).client().header(&BlockId::Hash(small_hash)).unwrap().is_some()); + runtime.block_on(futures::future::poll_fn::<(), (), _>(|| -> Result<_, ()> { + net.poll(); + + assert!(net.peer(0).client().header(&BlockId::Hash(small_hash)).unwrap().is_some()); + if net.peer(1).client().header(&BlockId::Hash(small_hash)).unwrap().is_none() { + return Ok(Async::NotReady) + } + Ok(Async::Ready(())) + })).unwrap(); } #[test] fn can_not_sync_from_light_peer() { let _ = ::env_logger::try_init(); + let mut runtime = current_thread::Runtime::new().unwrap(); // given the network with 1 full nodes (#0) and 1 light node (#1) let mut net = TestNet::new(1); @@ -411,8 +472,7 @@ fn can_not_sync_from_light_peer() { net.peer(0).push_blocks(1, false); // and let the light client sync from this node - // (mind the #1 is disconnected && not syncing) - net.sync(); + net.block_until_sync(&mut runtime); // ensure #0 && #1 have the same best block let full0_info = net.peer(0).client.info().chain; @@ -421,52 +481,34 @@ fn can_not_sync_from_light_peer() { assert_eq!(light_info.best_number, 1); assert_eq!(light_info.best_hash, full0_info.best_hash); - // add new full client (#2) && sync without #0 + // add new full client (#2) && remove #0 net.add_full_peer(&Default::default()); - net.peer(1).on_connect(net.peer(2)); - net.peer(2).on_connect(net.peer(1)); - net.peer(1).announce_block(light_info.best_hash); - net.sync_with(true, Some(vec![0].into_iter().collect())); - - // ensure that the #2 has failed to sync block #1 - assert_eq!(net.peer(2).client.info().chain.best_number, 0); - // and that the #1 is still connected to #2 - // (because #2 has not tried to fetch block data from the #1 light node) - assert_eq!(net.peer(1).num_peers(), 2); - - // and now try to fetch block data from light peer #1 - // (this should result in disconnect) - net.peer(1).receive_message( - &net.peer(2).peer_id, - message::generic::Message::BlockRequest(message::generic::BlockRequest { - id: 0, - fields: message::BlockAttributes::HEADER, - from: 
message::FromBlock::Hash(light_info.best_hash), - to: None, - direction: message::Direction::Ascending, - max: Some(1), - }), - ); - net.sync(); - // check that light #1 has disconnected from #2 - assert_eq!(net.peer(1).num_peers(), 1); + net.peers.remove(0); + + // ensure that the #2 (now #1) fails to sync block #1 even after 5 seconds + let mut test_finished = tokio_timer::Delay::new(Instant::now() + Duration::from_secs(5)); + runtime.block_on(futures::future::poll_fn::<(), (), _>(|| -> Result<_, ()> { + net.poll(); + test_finished.poll().map_err(|_| ()) + })).unwrap(); } #[test] fn light_peer_imports_header_from_announce() { let _ = ::env_logger::try_init(); + let mut runtime = current_thread::Runtime::new().unwrap(); - fn import_with_announce(net: &mut TestNet, hash: H256) { - let header = net.peer(0).client().header(&BlockId::Hash(hash)).unwrap().unwrap(); - net.peer(1).receive_message( - &net.peer(0).peer_id, - message::generic::Message::BlockAnnounce(message::generic::BlockAnnounce { - header, - }), - ); - - net.peer(1).import_queue_sync(); - assert!(net.peer(1).client().header(&BlockId::Hash(hash)).unwrap().is_some()); + fn import_with_announce(net: &mut TestNet, runtime: &mut current_thread::Runtime, hash: H256) { + net.peer(0).announce_block(hash); + + runtime.block_on(futures::future::poll_fn::<(), (), _>(|| { + net.poll(); + if net.peer(1).client().header(&BlockId::Hash(hash)).unwrap().is_some() { + Ok(Async::Ready(())) + } else { + Ok(Async::NotReady) + } + })).unwrap(); } // given the network with 1 full nodes (#0) and 1 light node (#1) @@ -474,13 +516,13 @@ fn light_peer_imports_header_from_announce() { net.add_light_peer(&Default::default()); // let them connect to each other - net.sync(); + net.block_until_sync(&mut runtime); // check that NEW block is imported from announce message let new_hash = net.peer(0).push_blocks(1, false); - import_with_announce(&mut net, new_hash); + import_with_announce(&mut net, &mut runtime, new_hash); // check that KNOWN STALE block is imported from announce message let known_stale_hash = net.peer(0).push_blocks_at(BlockId::Number(0), 1, true); - import_with_announce(&mut net, known_stale_hash); + import_with_announce(&mut net, &mut runtime, known_stale_hash); } diff --git a/core/network/src/transport.rs b/core/network/src/transport.rs index ac6cc633a8c01..901ec18581e1d 100644 --- a/core/network/src/transport.rs +++ b/core/network/src/transport.rs @@ -30,10 +30,14 @@ pub use self::bandwidth::BandwidthSinks; /// Builds the transport that serves as a common ground for all connections. /// +/// If `memory_only` is true, then only communication within the same process is allowed. Only +/// addresses with the format `/memory/...` are allowed. +/// /// Returns a `BandwidthSinks` object that allows querying the average bandwidth produced by all /// the connections spawned with this transport. pub fn build_transport( keypair: identity::Keypair, + memory_only: bool, wasm_external_transport: Option ) -> (Boxed<(PeerId, StreamMuxerBox), io::Error>, Arc) { // Build configuration objects for encryption mechanisms. 
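The new `memory_only` flag is what the test harness' `TransportConfig::MemoryOnly` ultimately maps to. A rough sketch of a call site (illustrative only; `network_config` and `local_identity` are stand-in names, not part of this patch):

    // Hedged sketch: mapping `TransportConfig` onto the new `build_transport` signature.
    // `local_identity` stands in for the node's libp2p keypair.
    let (memory_only, wasm_external_transport) = match network_config.transport {
        TransportConfig::MemoryOnly => (true, None),
        TransportConfig::Normal { wasm_external_transport, .. } => (false, wasm_external_transport),
    };
    let (transport, bandwidth) = transport::build_transport(
        local_identity.clone(),
        memory_only,
        wasm_external_transport,
    );

With `memory_only` set, only `/memory/...` addresses work, which is why each test peer above listens on a randomly chosen memory address and traffic never leaves the process.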
@@ -63,12 +67,21 @@ pub fn build_transport( OptionalTransport::none() }; #[cfg(not(target_os = "unknown"))] - let transport = { + let transport = transport.or_transport(if !memory_only { let desktop_trans = tcp::TcpConfig::new(); let desktop_trans = websocket::WsConfig::new(desktop_trans.clone()) .or_transport(desktop_trans); - transport.or_transport(dns::DnsConfig::new(desktop_trans)) - }; + OptionalTransport::some(dns::DnsConfig::new(desktop_trans)) + } else { + OptionalTransport::none() + }); + + let transport = transport.or_transport(if memory_only { + OptionalTransport::some(libp2p::core::transport::MemoryTransport::default()) + } else { + OptionalTransport::none() + }); + let (transport, sinks) = bandwidth::BandwidthLogging::new(transport, Duration::from_secs(5)); // Encryption diff --git a/core/service/test/src/lib.rs b/core/service/test/src/lib.rs index 13b7d91737b2c..134ffe507290a 100644 --- a/core/service/test/src/lib.rs +++ b/core/service/test/src/lib.rs @@ -35,7 +35,7 @@ use service::{ FactoryExtrinsic, }; use network::{multiaddr, Multiaddr, ManageNetwork}; -use network::config::{NetworkConfiguration, NodeKeyConfig, Secret, NonReservedPeerMode}; +use network::config::{NetworkConfiguration, TransportConfig, NodeKeyConfig, Secret, NonReservedPeerMode}; use sr_primitives::generic::BlockId; use consensus::{ImportBlock, BlockImport}; @@ -160,8 +160,10 @@ fn node_config ( non_reserved_mode: NonReservedPeerMode::Accept, client_version: "network/test/0.1".to_owned(), node_name: "unknown".to_owned(), - enable_mdns: false, - wasm_external_transport: None, + transport: TransportConfig::Normal { + enable_mdns: false, + wasm_external_transport: None, + }, }; Configuration {