Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions substrate/network-libp2p/src/network_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -649,11 +649,6 @@ impl NetworkState {
self.disabled_peers.lock().insert(peer_info.id.clone(), timeout);
}

/// Returns true if a peer is disabled.
pub fn is_peer_disabled(&self, node_id: &PeerstorePeerId) -> bool {
is_peer_disabled(&self.disabled_peers, node_id)
}

/// Flushes the caches to the disk.
///
/// This is done in an atomical way, so that an error doesn't corrupt
Expand Down Expand Up @@ -887,7 +882,6 @@ fn open_priv_key_file<P>(path: P) -> Result<fs::File, IoError>

#[cfg(test)]
mod tests {
use futures::sync::mpsc;
use libp2p::core::{Endpoint, PublicKey};
use network_state::NetworkState;

Expand Down
84 changes: 41 additions & 43 deletions substrate/network-libp2p/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,30 +138,6 @@ impl NetworkService {
})
}

/// Register a new protocol handler with the event loop.
pub fn register_protocol(
&self,
handler: Arc<NetworkProtocolHandler + Send + Sync>,
protocol: ProtocolId,
versions: &[(u8, u8)]
) {
if self.shared.network_state.has_connected_peer() {
// TODO: figure out if that's correct
warn!(target: "sub-libp2p", "a new network protocol was registered \
while the service was already active ; this is a programmer \
error");
}

self.shared.protocols.write().0
.push(RegisteredProtocol::new(handler.clone(), protocol, versions));

handler.initialize(&NetworkContextImpl {
inner: self.shared.clone(),
protocol: protocol.clone(),
current_peer: None,
});
}

/// Returns network configuration.
pub fn config(&self) -> &NetworkConfiguration {
&self.shared.config
Expand All @@ -178,13 +154,23 @@ impl NetworkService {
)
}

/// Start network IO
/// Start network IO.
/// Note that we could use an iterator for `protocols`, but having a
/// generic here is too much and crashes the Rust compiler.
// TODO (design): the notion of having a `NetworkService` alive should mean
// that it is running ; the `start` and `stop` functions are bad design
pub fn start(&self) -> Result<(), (Error, Option<SocketAddr>)> {
pub fn start(
&self,
protocols: Vec<(Arc<NetworkProtocolHandler + Send + Sync>, ProtocolId, &[(u8, u8)])>
) -> Result<(), (Error, Option<SocketAddr>)> {
// TODO: check that service is started already?

*self.shared.protocols.write() = Default::default();
*self.shared.protocols.write() = RegisteredProtocols(
protocols.into_iter()
.map(|(handler, protocol, versions)|
RegisteredProtocol::new(handler.clone(), protocol, versions))
.collect()
);

// Channel we use to signal success or failure of the bg thread
// initialization process.
Expand All @@ -193,6 +179,17 @@ impl NetworkService {
// should stop
let (close_tx, close_rx) = oneshot::channel();
let (timeouts_register_tx, timeouts_register_rx) = mpsc::unbounded();
*self.shared.timeouts_register_tx.write() = timeouts_register_tx;

// Initialize all the protocols now.
for protocol in self.shared.protocols.read().0.iter() {
protocol.custom_data().initialize(&NetworkContextImpl {
inner: self.shared.clone(),
protocol: protocol.id().clone(),
current_peer: None,
});
}

let shared = self.shared.clone();
let join_handle = thread::spawn(move || {
// Tokio core that is going to run everything in this thread.
Expand Down Expand Up @@ -229,7 +226,6 @@ impl NetworkService {
.map_err(|err| (err, self.shared.config.listen_address.clone()))?;

*self.bg_thread.lock() = Some((close_tx, join_handle));
*self.shared.timeouts_register_tx.write() = timeouts_register_tx;
Ok(())
}

Expand Down Expand Up @@ -479,6 +475,7 @@ fn init_thread(
}

// Explicitely connect to the boostrap nodes as a temporary measure.
trace!(target: "sub-libp2p", "Dialing bootnodes");
for bootnode in shared.config.boot_nodes.iter() {
// TODO: this code is copy-pasted from `network_state`, but it is
// temporary anyway
Expand All @@ -491,7 +488,6 @@ fn init_thread(
_ => return Err(ErrorKind::BadProtocol.into()),
};

trace!(target: "sub-libp2p", "Dialing bootnode {:?}", peer_id);
for proto in shared.protocols.read().0.clone().into_iter() {
open_peer_custom_proto(
shared.clone(),
Expand Down Expand Up @@ -1002,11 +998,8 @@ fn open_peer_custom_proto<T, To, St, C>(
// Don't connect to ourselves.
// TODO: remove this eventually
if &expected_peer_id == shared.kad_system.local_peer_id() {
return
}

// Don't connect to a disabled peer.
if shared.network_state.is_peer_disabled(&expected_peer_id) {
trace!(target: "sub-libp2p", "Skipped connecting to {:?} because \
it is ourselves", expected_peer_id);
return
}

Expand Down Expand Up @@ -1058,15 +1051,20 @@ fn open_peer_custom_proto<T, To, St, C>(
}
});

if let Ok((peer_id, unique_connec)) = shared2.network_state
.custom_proto(node_id.clone(), proto_id, Endpoint::Dialer) {
if !unique_connec.is_alive() {
trace!(target: "sub-libp2p", "Opening connection to #{} {:?} with \
proto {:?}", peer_id, node_id, proto_id);
}
match shared2.network_state.custom_proto(node_id.clone(), proto_id, Endpoint::Dialer) {
Ok((peer_id, unique_connec)) => {
if !unique_connec.is_alive() {
trace!(target: "sub-libp2p", "Opening connection to #{} {:?} with \
proto {:?}", peer_id, node_id, proto_id);
}

// TODO: this future should be used
let _ = unique_connec.get_or_dial(&swarm_controller, &addr, with_err);
// TODO: this future should be used
let _ = unique_connec.get_or_dial(&swarm_controller, &addr, with_err);
},
Err(err) => {
trace!(target: "sub-libp2p", "Error while opening connection to
{:?} with proto {:?} => {:?}", node_id, proto_id, err);
},
}
}

Expand Down Expand Up @@ -1296,6 +1294,6 @@ mod tests {
fn builds_and_finishes_in_finite_time() {
// Checks that merely starting the network doesn't end up in an infinite loop.
let service = NetworkService::new(Default::default(), None).unwrap();
service.start().map_err(|(err, _)| err).unwrap();
service.start(vec![]).map_err(|(err, _)| err).unwrap();
}
}
31 changes: 12 additions & 19 deletions substrate/network-libp2p/tests/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,6 @@ impl TestProtocol {
drop_session: drop_session,
}
}
/// Creates and register protocol with the network service
pub fn register(service: &mut NetworkService, drop_session: bool) -> Arc<TestProtocol> {
let handler = Arc::new(TestProtocol::new(drop_session));
service.register_protocol(handler.clone(), *b"tst", &[(42u8, 1), (43u8, 1)]);
handler
}

pub fn got_packet(&self) -> bool {
self.packet.lock()[..] == b"hello"[..]
Expand Down Expand Up @@ -100,17 +94,16 @@ impl NetworkProtocolHandler for TestProtocol {
#[test]
fn net_service() {
let service = NetworkService::new(NetworkConfiguration::new_local(), None).expect("Error creating network service");
service.start().unwrap();
service.register_protocol(Arc::new(TestProtocol::new(false)), *b"myp", &[(1u8, 1)]);
service.start(vec![(Arc::new(TestProtocol::new(false)), *b"myp", &[(1u8, 1)])]).unwrap();
}

#[test]
fn net_start_stop() {
let config = NetworkConfiguration::new_local();
let service = NetworkService::new(config, None).unwrap();
service.start().unwrap();
service.start(vec![]).unwrap();
service.stop();
service.start().unwrap();
service.start(vec![]).unwrap();
}

#[test]
Expand All @@ -120,14 +113,14 @@ fn net_disconnect() {
let mut config1 = NetworkConfiguration::new_local();
config1.use_secret = Some(key1.secret().clone());
config1.boot_nodes = vec![ ];
let mut service1 = NetworkService::new(config1, None).unwrap();
service1.start().unwrap();
let handler1 = TestProtocol::register(&mut service1, false);
let service1 = NetworkService::new(config1, None).unwrap();
let handler1 = Arc::new(TestProtocol::new(false));
service1.start(vec![(handler1.clone(), *b"tst", &[(42u8, 1), (43u8, 1)])]).unwrap();
let mut config2 = NetworkConfiguration::new_local();
config2.boot_nodes = vec![ service1.external_url().unwrap() ];
let mut service2 = NetworkService::new(config2, None).unwrap();
service2.start().unwrap();
let handler2 = TestProtocol::register(&mut service2, true);
let service2 = NetworkService::new(config2, None).unwrap();
let handler2 = Arc::new(TestProtocol::new(true));
service2.start(vec![(handler2.clone(), *b"tst", &[(42u8, 1), (43u8, 1)])]).unwrap();
while !(handler1.got_disconnect() && handler2.got_disconnect()) {
thread::sleep(Duration::from_millis(50));
}
Expand All @@ -138,9 +131,9 @@ fn net_disconnect() {
#[test]
fn net_timeout() {
let config = NetworkConfiguration::new_local();
let mut service = NetworkService::new(config, None).unwrap();
service.start().unwrap();
let handler = TestProtocol::register(&mut service, false);
let service = NetworkService::new(config, None).unwrap();
let handler = Arc::new(TestProtocol::new(false));
service.start(vec![(handler.clone(), *b"tst", &[(42u8, 1), (43u8, 1)])]).unwrap();
while !handler.got_timeout() {
thread::sleep(Duration::from_millis(50));
}
Expand Down
5 changes: 3 additions & 2 deletions substrate/network/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,13 +203,14 @@ impl<B: BlockT + 'static, S: Specialization<B>> Service<B, S> {
}

fn start(&self) {
match self.network.start().map_err(|e| e.0.into()) {
let versions = [(::protocol::CURRENT_VERSION as u8, ::protocol::CURRENT_PACKET_COUNT)];
let protocols = vec![(self.handler.clone() as Arc<_>, self.protocol_id, &versions[..])];
match self.network.start(protocols).map_err(|e| e.0.into()) {
Err(ErrorKind::Io(ref e)) if e.kind() == io::ErrorKind::AddrInUse =>
warn!("Network port is already in use, make sure that another instance of Polkadot client is not running or change the port using the --port option."),
Err(err) => warn!("Error starting network: {}", err),
_ => {},
};
self.network.register_protocol(self.handler.clone(), self.protocol_id, &[(::protocol::CURRENT_VERSION as u8, ::protocol::CURRENT_PACKET_COUNT)]);
}

fn stop(&self) {
Expand Down