diff --git a/substrate/network-libp2p/src/network_state.rs b/substrate/network-libp2p/src/network_state.rs index 12ed88b2d5a1a..830a74fc0f431 100644 --- a/substrate/network-libp2p/src/network_state.rs +++ b/substrate/network-libp2p/src/network_state.rs @@ -649,11 +649,6 @@ impl NetworkState { self.disabled_peers.lock().insert(peer_info.id.clone(), timeout); } - /// Returns true if a peer is disabled. - pub fn is_peer_disabled(&self, node_id: &PeerstorePeerId) -> bool { - is_peer_disabled(&self.disabled_peers, node_id) - } - /// Flushes the caches to the disk. /// /// This is done in an atomical way, so that an error doesn't corrupt @@ -887,7 +882,6 @@ fn open_priv_key_file

(path: P) -> Result #[cfg(test)] mod tests { - use futures::sync::mpsc; use libp2p::core::{Endpoint, PublicKey}; use network_state::NetworkState; diff --git a/substrate/network-libp2p/src/service.rs b/substrate/network-libp2p/src/service.rs index f1d36202aab57..6267e4e09c3b8 100644 --- a/substrate/network-libp2p/src/service.rs +++ b/substrate/network-libp2p/src/service.rs @@ -138,30 +138,6 @@ impl NetworkService { }) } - /// Register a new protocol handler with the event loop. - pub fn register_protocol( - &self, - handler: Arc, - protocol: ProtocolId, - versions: &[(u8, u8)] - ) { - if self.shared.network_state.has_connected_peer() { - // TODO: figure out if that's correct - warn!(target: "sub-libp2p", "a new network protocol was registered \ - while the service was already active ; this is a programmer \ - error"); - } - - self.shared.protocols.write().0 - .push(RegisteredProtocol::new(handler.clone(), protocol, versions)); - - handler.initialize(&NetworkContextImpl { - inner: self.shared.clone(), - protocol: protocol.clone(), - current_peer: None, - }); - } - /// Returns network configuration. pub fn config(&self) -> &NetworkConfiguration { &self.shared.config @@ -178,13 +154,23 @@ impl NetworkService { ) } - /// Start network IO + /// Start network IO. + /// Note that we could use an iterator for `protocols`, but having a + /// generic here is too much and crashes the Rust compiler. // TODO (design): the notion of having a `NetworkService` alive should mean // that it is running ; the `start` and `stop` functions are bad design - pub fn start(&self) -> Result<(), (Error, Option)> { + pub fn start( + &self, + protocols: Vec<(Arc, ProtocolId, &[(u8, u8)])> + ) -> Result<(), (Error, Option)> { // TODO: check that service is started already? - *self.shared.protocols.write() = Default::default(); + *self.shared.protocols.write() = RegisteredProtocols( + protocols.into_iter() + .map(|(handler, protocol, versions)| + RegisteredProtocol::new(handler.clone(), protocol, versions)) + .collect() + ); // Channel we use to signal success or failure of the bg thread // initialization process. @@ -193,6 +179,17 @@ impl NetworkService { // should stop let (close_tx, close_rx) = oneshot::channel(); let (timeouts_register_tx, timeouts_register_rx) = mpsc::unbounded(); + *self.shared.timeouts_register_tx.write() = timeouts_register_tx; + + // Initialize all the protocols now. + for protocol in self.shared.protocols.read().0.iter() { + protocol.custom_data().initialize(&NetworkContextImpl { + inner: self.shared.clone(), + protocol: protocol.id().clone(), + current_peer: None, + }); + } + let shared = self.shared.clone(); let join_handle = thread::spawn(move || { // Tokio core that is going to run everything in this thread. @@ -229,7 +226,6 @@ impl NetworkService { .map_err(|err| (err, self.shared.config.listen_address.clone()))?; *self.bg_thread.lock() = Some((close_tx, join_handle)); - *self.shared.timeouts_register_tx.write() = timeouts_register_tx; Ok(()) } @@ -479,6 +475,7 @@ fn init_thread( } // Explicitely connect to the boostrap nodes as a temporary measure. + trace!(target: "sub-libp2p", "Dialing bootnodes"); for bootnode in shared.config.boot_nodes.iter() { // TODO: this code is copy-pasted from `network_state`, but it is // temporary anyway @@ -491,7 +488,6 @@ fn init_thread( _ => return Err(ErrorKind::BadProtocol.into()), }; - trace!(target: "sub-libp2p", "Dialing bootnode {:?}", peer_id); for proto in shared.protocols.read().0.clone().into_iter() { open_peer_custom_proto( shared.clone(), @@ -1002,11 +998,8 @@ fn open_peer_custom_proto( // Don't connect to ourselves. // TODO: remove this eventually if &expected_peer_id == shared.kad_system.local_peer_id() { - return - } - - // Don't connect to a disabled peer. - if shared.network_state.is_peer_disabled(&expected_peer_id) { + trace!(target: "sub-libp2p", "Skipped connecting to {:?} because \ + it is ourselves", expected_peer_id); return } @@ -1058,15 +1051,20 @@ fn open_peer_custom_proto( } }); - if let Ok((peer_id, unique_connec)) = shared2.network_state - .custom_proto(node_id.clone(), proto_id, Endpoint::Dialer) { - if !unique_connec.is_alive() { - trace!(target: "sub-libp2p", "Opening connection to #{} {:?} with \ - proto {:?}", peer_id, node_id, proto_id); - } + match shared2.network_state.custom_proto(node_id.clone(), proto_id, Endpoint::Dialer) { + Ok((peer_id, unique_connec)) => { + if !unique_connec.is_alive() { + trace!(target: "sub-libp2p", "Opening connection to #{} {:?} with \ + proto {:?}", peer_id, node_id, proto_id); + } - // TODO: this future should be used - let _ = unique_connec.get_or_dial(&swarm_controller, &addr, with_err); + // TODO: this future should be used + let _ = unique_connec.get_or_dial(&swarm_controller, &addr, with_err); + }, + Err(err) => { + trace!(target: "sub-libp2p", "Error while opening connection to + {:?} with proto {:?} => {:?}", node_id, proto_id, err); + }, } } @@ -1296,6 +1294,6 @@ mod tests { fn builds_and_finishes_in_finite_time() { // Checks that merely starting the network doesn't end up in an infinite loop. let service = NetworkService::new(Default::default(), None).unwrap(); - service.start().map_err(|(err, _)| err).unwrap(); + service.start(vec![]).map_err(|(err, _)| err).unwrap(); } } diff --git a/substrate/network-libp2p/tests/tests.rs b/substrate/network-libp2p/tests/tests.rs index f763c87e90386..1cd9456127dd6 100644 --- a/substrate/network-libp2p/tests/tests.rs +++ b/substrate/network-libp2p/tests/tests.rs @@ -47,12 +47,6 @@ impl TestProtocol { drop_session: drop_session, } } - /// Creates and register protocol with the network service - pub fn register(service: &mut NetworkService, drop_session: bool) -> Arc { - let handler = Arc::new(TestProtocol::new(drop_session)); - service.register_protocol(handler.clone(), *b"tst", &[(42u8, 1), (43u8, 1)]); - handler - } pub fn got_packet(&self) -> bool { self.packet.lock()[..] == b"hello"[..] @@ -100,17 +94,16 @@ impl NetworkProtocolHandler for TestProtocol { #[test] fn net_service() { let service = NetworkService::new(NetworkConfiguration::new_local(), None).expect("Error creating network service"); - service.start().unwrap(); - service.register_protocol(Arc::new(TestProtocol::new(false)), *b"myp", &[(1u8, 1)]); + service.start(vec![(Arc::new(TestProtocol::new(false)), *b"myp", &[(1u8, 1)])]).unwrap(); } #[test] fn net_start_stop() { let config = NetworkConfiguration::new_local(); let service = NetworkService::new(config, None).unwrap(); - service.start().unwrap(); + service.start(vec![]).unwrap(); service.stop(); - service.start().unwrap(); + service.start(vec![]).unwrap(); } #[test] @@ -120,14 +113,14 @@ fn net_disconnect() { let mut config1 = NetworkConfiguration::new_local(); config1.use_secret = Some(key1.secret().clone()); config1.boot_nodes = vec![ ]; - let mut service1 = NetworkService::new(config1, None).unwrap(); - service1.start().unwrap(); - let handler1 = TestProtocol::register(&mut service1, false); + let service1 = NetworkService::new(config1, None).unwrap(); + let handler1 = Arc::new(TestProtocol::new(false)); + service1.start(vec![(handler1.clone(), *b"tst", &[(42u8, 1), (43u8, 1)])]).unwrap(); let mut config2 = NetworkConfiguration::new_local(); config2.boot_nodes = vec![ service1.external_url().unwrap() ]; - let mut service2 = NetworkService::new(config2, None).unwrap(); - service2.start().unwrap(); - let handler2 = TestProtocol::register(&mut service2, true); + let service2 = NetworkService::new(config2, None).unwrap(); + let handler2 = Arc::new(TestProtocol::new(true)); + service2.start(vec![(handler2.clone(), *b"tst", &[(42u8, 1), (43u8, 1)])]).unwrap(); while !(handler1.got_disconnect() && handler2.got_disconnect()) { thread::sleep(Duration::from_millis(50)); } @@ -138,9 +131,9 @@ fn net_disconnect() { #[test] fn net_timeout() { let config = NetworkConfiguration::new_local(); - let mut service = NetworkService::new(config, None).unwrap(); - service.start().unwrap(); - let handler = TestProtocol::register(&mut service, false); + let service = NetworkService::new(config, None).unwrap(); + let handler = Arc::new(TestProtocol::new(false)); + service.start(vec![(handler.clone(), *b"tst", &[(42u8, 1), (43u8, 1)])]).unwrap(); while !handler.got_timeout() { thread::sleep(Duration::from_millis(50)); } diff --git a/substrate/network/src/service.rs b/substrate/network/src/service.rs index aa6c8aa19b19a..f8b146d19fc86 100644 --- a/substrate/network/src/service.rs +++ b/substrate/network/src/service.rs @@ -203,13 +203,14 @@ impl> Service { } fn start(&self) { - match self.network.start().map_err(|e| e.0.into()) { + let versions = [(::protocol::CURRENT_VERSION as u8, ::protocol::CURRENT_PACKET_COUNT)]; + let protocols = vec![(self.handler.clone() as Arc<_>, self.protocol_id, &versions[..])]; + match self.network.start(protocols).map_err(|e| e.0.into()) { Err(ErrorKind::Io(ref e)) if e.kind() == io::ErrorKind::AddrInUse => warn!("Network port is already in use, make sure that another instance of Polkadot client is not running or change the port using the --port option."), Err(err) => warn!("Error starting network: {}", err), _ => {}, }; - self.network.register_protocol(self.handler.clone(), self.protocol_id, &[(::protocol::CURRENT_VERSION as u8, ::protocol::CURRENT_PACKET_COUNT)]); } fn stop(&self) {