Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions substrate/network-libp2p/src/network_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -649,11 +649,6 @@ impl NetworkState {
self.disabled_peers.lock().insert(peer_info.id.clone(), timeout);
}

/// Returns true if a peer is disabled.
pub fn is_peer_disabled(&self, node_id: &PeerstorePeerId) -> bool {
is_peer_disabled(&self.disabled_peers, node_id)
}

/// Flushes the caches to the disk.
///
/// This is done in an atomical way, so that an error doesn't corrupt
Expand Down
82 changes: 40 additions & 42 deletions substrate/network-libp2p/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,30 +138,6 @@ impl NetworkService {
})
}

/// Register a new protocol handler with the event loop.
pub fn register_protocol(
&self,
handler: Arc<NetworkProtocolHandler + Send + Sync>,
protocol: ProtocolId,
versions: &[(u8, u8)]
) {
if self.shared.network_state.has_connected_peer() {
// TODO: figure out if that's correct
warn!(target: "sub-libp2p", "a new network protocol was registered \
while the service was already active ; this is a programmer \
error");
}

self.shared.protocols.write().0
.push(RegisteredProtocol::new(handler.clone(), protocol, versions));

handler.initialize(&NetworkContextImpl {
inner: self.shared.clone(),
protocol: protocol.clone(),
current_peer: None,
});
}

/// Returns network configuration.
pub fn config(&self) -> &NetworkConfiguration {
&self.shared.config
Expand All @@ -178,13 +154,23 @@ impl NetworkService {
)
}

/// Start network IO
/// Start network IO.
/// Note that we could use an iterator for `protocols`, but having a
/// generic here is too much and crashes the Rust compiler.
// TODO (design): the notion of having a `NetworkService` alive should mean
// that it is running ; the `start` and `stop` functions are bad design
pub fn start(&self) -> Result<(), (Error, Option<SocketAddr>)> {
pub fn start(
&self,
protocols: Vec<(Arc<NetworkProtocolHandler + Send + Sync>, ProtocolId, &[(u8, u8)])>
) -> Result<(), (Error, Option<SocketAddr>)> {
// TODO: check that service is started already?

*self.shared.protocols.write() = Default::default();
*self.shared.protocols.write() = RegisteredProtocols(
protocols.into_iter()
.map(|(handler, protocol, versions)|
RegisteredProtocol::new(handler.clone(), protocol, versions))
.collect()
);

// Channel we use to signal success or failure of the bg thread
// initialization process.
Expand All @@ -193,6 +179,17 @@ impl NetworkService {
// should stop
let (close_tx, close_rx) = oneshot::channel();
let (timeouts_register_tx, timeouts_register_rx) = mpsc::unbounded();
*self.shared.timeouts_register_tx.write() = timeouts_register_tx;

// Initialize all the protocols now.
for protocol in self.shared.protocols.read().0.iter() {
protocol.custom_data().initialize(&NetworkContextImpl {
inner: self.shared.clone(),
protocol: protocol.id().clone(),
current_peer: None,
});
}

let shared = self.shared.clone();
let join_handle = thread::spawn(move || {
// Tokio core that is going to run everything in this thread.
Expand Down Expand Up @@ -229,7 +226,6 @@ impl NetworkService {
.map_err(|err| (err, self.shared.config.listen_address.clone()))?;

*self.bg_thread.lock() = Some((close_tx, join_handle));
*self.shared.timeouts_register_tx.write() = timeouts_register_tx;
Ok(())
}

Expand Down Expand Up @@ -479,6 +475,7 @@ fn init_thread(
}

// Explicitely connect to the boostrap nodes as a temporary measure.
trace!(target: "sub-libp2p", "Dialing bootnodes");
for bootnode in shared.config.boot_nodes.iter() {
// TODO: this code is copy-pasted from `network_state`, but it is
// temporary anyway
Expand All @@ -491,7 +488,6 @@ fn init_thread(
_ => return Err(ErrorKind::BadProtocol.into()),
};

trace!(target: "sub-libp2p", "Dialing bootnode {:?}", peer_id);
for proto in shared.protocols.read().0.clone().into_iter() {
open_peer_custom_proto(
shared.clone(),
Expand Down Expand Up @@ -1002,11 +998,8 @@ fn open_peer_custom_proto<T, To, St, C>(
// Don't connect to ourselves.
// TODO: remove this eventually
if &expected_peer_id == shared.kad_system.local_peer_id() {
return
}

// Don't connect to a disabled peer.
if shared.network_state.is_peer_disabled(&expected_peer_id) {
trace!(target: "sub-libp2p", "Skipped connecting to {:?} because \
it is ourselves", expected_peer_id);
return
}

Expand Down Expand Up @@ -1058,15 +1051,20 @@ fn open_peer_custom_proto<T, To, St, C>(
}
});

if let Ok((peer_id, unique_connec)) = shared2.network_state
.custom_proto(node_id.clone(), proto_id, Endpoint::Dialer) {
if !unique_connec.is_alive() {
trace!(target: "sub-libp2p", "Opening connection to #{} {:?} with \
proto {:?}", peer_id, node_id, proto_id);
}
match shared2.network_state.custom_proto(node_id.clone(), proto_id, Endpoint::Dialer) {
Ok((peer_id, unique_connec)) => {
if !unique_connec.is_alive() {
trace!(target: "sub-libp2p", "Opening connection to #{} {:?} with \
proto {:?}", peer_id, node_id, proto_id);
}

// TODO: this future should be used
let _ = unique_connec.get_or_dial(&swarm_controller, &addr, with_err);
// TODO: this future should be used
let _ = unique_connec.get_or_dial(&swarm_controller, &addr, with_err);
},
Err(err) => {
trace!(target: "sub-libp2p", "Error while opening connection to
{:?} with proto {:?} => {:?}", node_id, proto_id, err);
},
}
}

Expand Down
5 changes: 3 additions & 2 deletions substrate/network/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,13 +203,14 @@ impl<B: BlockT + 'static, S: Specialization<B>> Service<B, S> {
}

fn start(&self) {
match self.network.start().map_err(|e| e.0.into()) {
let versions = [(::protocol::CURRENT_VERSION as u8, ::protocol::CURRENT_PACKET_COUNT)];
let protocols = vec![(self.handler.clone() as Arc<_>, self.protocol_id, &versions[..])];
match self.network.start(protocols).map_err(|e| e.0.into()) {
Err(ErrorKind::Io(ref e)) if e.kind() == io::ErrorKind::AddrInUse =>
warn!("Network port is already in use, make sure that another instance of Polkadot client is not running or change the port using the --port option."),
Err(err) => warn!("Error starting network: {}", err),
_ => {},
};
self.network.register_protocol(self.handler.clone(), self.protocol_id, &[(::protocol::CURRENT_VERSION as u8, ::protocol::CURRENT_PACKET_COUNT)]);
}

fn stop(&self) {
Expand Down