Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 66 additions & 87 deletions substrate/network-libp2p/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
use bytes::Bytes;
use {Error, ErrorKind, NetworkConfiguration, NetworkProtocolHandler};
use {NonReservedPeerMode, NetworkContext, Severity, NodeIndex, ProtocolId};
use parking_lot::{Mutex, RwLock};
use parking_lot::RwLock;
use libp2p;
use libp2p::multiaddr::{AddrComponent, Multiaddr};
use libp2p::kad::{KadSystem, KadConnecConfig, KadSystemConfig};
Expand Down Expand Up @@ -56,10 +56,10 @@ pub struct NetworkService {
shared: Arc<Shared>,

/// Holds the networking-running background thread alive. The `Option` is
/// `None` if the service is stopped.
/// only set to `None` in the destructor.
/// Sending a message on the channel will trigger the end of the
/// background thread. We can then wait on the join handle.
bg_thread: Mutex<Option<(oneshot::Sender<()>, thread::JoinHandle<()>)>>,
bg_thread: Option<(oneshot::Sender<()>, thread::JoinHandle<()>)>,
}

/// Common struct shared throughout all the components of the service.
Expand All @@ -79,27 +79,32 @@ struct Shared {
/// List of protocols available on the network. It is a logic error to
/// remove protocols from this list, and the code may assume that protocols
/// stay at the same index forever.
protocols: RwLock<RegisteredProtocols<Arc<NetworkProtocolHandler + Send + Sync>>>,
protocols: RegisteredProtocols<Arc<NetworkProtocolHandler + Send + Sync>>,

/// Use this channel to send a timeout request to the background thread's
/// events loop. After the timeout, elapsed, it will call `timeout` on the
/// `NetworkProtocolHandler`. This can be closed if the background thread
/// is not running. The sender will be overwritten every time we start
/// the service.
timeouts_register_tx: RwLock<mpsc::UnboundedSender<(Duration, (Arc<NetworkProtocolHandler + Send + Sync>, ProtocolId, TimerToken))>>,
timeouts_register_tx: mpsc::UnboundedSender<(Duration, (Arc<NetworkProtocolHandler + Send + Sync>, ProtocolId, TimerToken))>,

/// Original address from the configuration, after being adjusted by the `Transport`.
/// Contains `None` if the network hasn't started yet.
// TODO: because we create the `Shared` before starting to listen, this
// has to be set later ; sort this out
original_listened_addr: RwLock<Option<Multiaddr>>,

/// Contains the addresses we known about ourselves.
listened_addrs: RwLock<Vec<Multiaddr>>,
}

impl NetworkService {
/// Starts IO event loop
/// Starts the networking service.
///
/// Note that we could use an iterator for `protocols`, but having a
/// generic here is too much and crashes the Rust compiler.
pub fn new(
config: NetworkConfiguration,
protocols: Vec<(Arc<NetworkProtocolHandler + Send + Sync>, ProtocolId, &[(u8, u8)])>,
filter: Option<Arc<ConnectionFilter>>
) -> Result<NetworkService, Error> {
// TODO: for now `filter` is always `None` ; remove it from the code or implement it
Expand All @@ -121,75 +126,41 @@ impl NetworkService {
known_initial_peers: network_state.known_peers(),
});

// Channel we use to signal success or failure of the bg thread
// initialization process.
let (init_tx, init_rx) = sync_mpsc::channel();
// Channel the main thread uses to signal the bg thread that it
// should stop
let (close_tx, close_rx) = oneshot::channel();
let (timeouts_register_tx, timeouts_register_rx) = mpsc::unbounded();

let shared = Arc::new(Shared {
network_state,
protocols: RwLock::new(Default::default()),
protocols: RegisteredProtocols(protocols.into_iter()
.map(|(handler, protocol, versions)|
RegisteredProtocol::new(handler.clone(), protocol, versions))
.collect()
),
kad_system,
kad_upgrade: KadConnecConfig::new(),
config,
timeouts_register_tx: RwLock::new(mpsc::unbounded().0),
timeouts_register_tx,
original_listened_addr: RwLock::new(None),
listened_addrs: RwLock::new(Vec::new()),
});

Ok(NetworkService {
shared,
bg_thread: Mutex::new(None),
})
}

/// Returns network configuration.
pub fn config(&self) -> &NetworkConfiguration {
&self.shared.config
}

pub fn external_url(&self) -> Option<String> {
// TODO: in the context of libp2p, it is hard to define what an external
// URL is, as different nodes can have multiple different ways to
// reach us
self.shared.original_listened_addr.read().as_ref()
.map(|addr|
format!("{}/p2p/{}", addr, self.shared.kad_system.local_peer_id().to_base58())
)
}

/// Start network IO.
/// Note that we could use an iterator for `protocols`, but having a
/// generic here is too much and crashes the Rust compiler.
// TODO (design): the notion of having a `NetworkService` alive should mean
// that it is running ; the `start` and `stop` functions are bad design
pub fn start(
&self,
protocols: Vec<(Arc<NetworkProtocolHandler + Send + Sync>, ProtocolId, &[(u8, u8)])>
) -> Result<(), (Error, Option<SocketAddr>)> {
// TODO: check that service is started already?

*self.shared.protocols.write() = RegisteredProtocols(
protocols.into_iter()
.map(|(handler, protocol, versions)|
RegisteredProtocol::new(handler.clone(), protocol, versions))
.collect()
);

// Channel we use to signal success or failure of the bg thread
// initialization process.
let (init_tx, init_rx) = sync_mpsc::channel();
// Channel the main thread uses to signal the bg thread that it
// should stop
let (close_tx, close_rx) = oneshot::channel();
let (timeouts_register_tx, timeouts_register_rx) = mpsc::unbounded();
*self.shared.timeouts_register_tx.write() = timeouts_register_tx;

// Initialize all the protocols now.
for protocol in self.shared.protocols.read().0.iter() {
// TODO: what about failure to initialize? we can't uninitialize a protocol
// TODO: remove this `initialize` method eventually, as it's only used for timers
for protocol in shared.protocols.0.iter() {
protocol.custom_data().initialize(&NetworkContextImpl {
inner: self.shared.clone(),
inner: shared.clone(),
protocol: protocol.id().clone(),
current_peer: None,
});
}

let shared = self.shared.clone();
let shared_clone = shared.clone();
let join_handle = thread::spawn(move || {
// Tokio runtime that is going to run everything in this thread.
let mut runtime = match current_thread::Runtime::new() {
Expand All @@ -200,8 +171,7 @@ impl NetworkService {
}
};

let fut = match init_thread(shared,
timeouts_register_rx, close_rx) {
let fut = match init_thread(shared_clone, timeouts_register_rx, close_rx) {
Ok(future) => {
debug!(target: "sub-libp2p", "Successfully started networking service");
let _ = init_tx.send(Ok(()));
Expand All @@ -219,23 +189,27 @@ impl NetworkService {
}
});

init_rx.recv().expect("libp2p background thread panicked")
.map_err(|err| (err, self.shared.config.listen_address.clone()))?;
init_rx.recv().expect("libp2p background thread panicked")?;

*self.bg_thread.lock() = Some((close_tx, join_handle));
Ok(())
Ok(NetworkService {
shared,
bg_thread: Some((close_tx, join_handle)),
})
}

/// Stop network IO.
pub fn stop(&self) {
if let Some((close_tx, join)) = self.bg_thread.lock().take() {
let _ = close_tx.send(());
if let Err(e) = join.join() {
warn!(target: "sub-libp2p", "error while waiting on libp2p background thread: {:?}", e);
}
}
/// Returns network configuration.
pub fn config(&self) -> &NetworkConfiguration {
&self.shared.config
}

debug_assert!(!self.shared.network_state.has_connected_peer());
pub fn external_url(&self) -> Option<String> {
// TODO: in the context of libp2p, it is hard to define what an external
// URL is, as different nodes can have multiple different ways to
// reach us
self.shared.original_listened_addr.read().as_ref()
.map(|addr|
format!("{}/p2p/{}", addr, self.shared.kad_system.local_peer_id().to_base58())
)
}

/// Get a list of all connected peers by id.
Expand Down Expand Up @@ -269,7 +243,7 @@ impl NetworkService {
pub fn with_context_eval<F, T>(&self, protocol: ProtocolId, action: F)
-> Option<T>
where F: FnOnce(&NetworkContext) -> T {
if !self.shared.protocols.read().has_protocol(protocol) {
if !self.shared.protocols.has_protocol(protocol) {
return None
}

Expand All @@ -283,7 +257,14 @@ impl NetworkService {

impl Drop for NetworkService {
fn drop(&mut self) {
self.stop()
if let Some((close_tx, join)) = self.bg_thread.take() {
let _ = close_tx.send(());
if let Err(e) = join.join() {
warn!(target: "sub-libp2p", "error while waiting on libp2p background thread: {:?}", e);
}
}

debug_assert!(!self.shared.network_state.has_connected_peer());
}
}

Expand All @@ -306,7 +287,7 @@ impl NetworkContext for NetworkContextImpl {
packet_id: PacketId,
data: Vec<u8>
) {
debug_assert!(self.inner.protocols.read().has_protocol(protocol),
debug_assert!(self.inner.protocols.has_protocol(protocol),
"invalid protocol id requested in the API of the libp2p networking");
// TODO: could be "optimized" by building `message` only after checking the validity of
// the peer, but that's probably not worth the effort
Expand Down Expand Up @@ -360,12 +341,11 @@ impl NetworkContext for NetworkContextImpl {
fn register_timer(&self, token: usize, duration: Duration)
-> Result<(), Error> {
let handler = self.inner.protocols
.read()
.find_protocol(self.protocol)
.ok_or(ErrorKind::BadProtocol)?
.custom_data()
.clone();
self.inner.timeouts_register_tx.read()
self.inner.timeouts_register_tx
.unbounded_send((duration, (handler, self.protocol, token)))
.map_err(|err| ErrorKind::Io(IoError::new(IoErrorKind::Other, err)))?;
Ok(())
Expand Down Expand Up @@ -485,7 +465,7 @@ fn init_thread(
match shared.network_state.add_peer(bootnode) {
Ok(who) => {
trace!(target: "sub-libp2p", "Dialing bootnode {:?}", who);
for proto in shared.protocols.read().0.clone().into_iter() {
for proto in shared.protocols.0.clone().into_iter() {
open_peer_custom_proto(
shared.clone(),
transport.clone(),
Expand All @@ -510,7 +490,7 @@ fn init_thread(

if let Ok(addr) = multi {
trace!(target: "sub-libp2p", "Missing NodeIndex for Bootnode {:}. Querying", bootnode);
for proto in shared.protocols.read().0.clone().into_iter() {
for proto in shared.protocols.0.clone().into_iter() {
connect_with_query_peer_id(
shared.clone(),
transport.clone(),
Expand Down Expand Up @@ -1025,7 +1005,7 @@ fn connect_to_nodes<T, To, St, C>(
// Try to dial that node for each registered protocol. Since dialing
// upgrades the connection to use multiplexing, dialing multiple times
// should automatically open multiple substreams.
for proto in shared.protocols.read().0.clone().into_iter() {
for proto in shared.protocols.0.clone().into_iter() {
open_peer_custom_proto(
shared.clone(),
base_transport.clone(),
Expand Down Expand Up @@ -1398,7 +1378,7 @@ where C: AsyncRead + AsyncWrite + 'static, // TODO: 'static :-/
type UpgradeIdentifier = <RegisteredProtocols<Arc<NetworkProtocolHandler + Send + Sync>> as ConnectionUpgrade<C, Maf>>::UpgradeIdentifier;

fn protocol_names(&self) -> Self::NamesIter {
ConnectionUpgrade::<C, Maf>::protocol_names(&*self.0.protocols.read())
ConnectionUpgrade::<C, Maf>::protocol_names(&self.0.protocols)
}

type Output = <RegisteredProtocols<Arc<NetworkProtocolHandler + Send + Sync>> as ConnectionUpgrade<C, Maf>>::Output;
Expand All @@ -1409,7 +1389,7 @@ where C: AsyncRead + AsyncWrite + 'static, // TODO: 'static :-/
fn upgrade(self, socket: C, id: Self::UpgradeIdentifier, endpoint: Endpoint,
remote_addr: Maf) -> Self::Future
{
self.0.protocols.read()
self.0.protocols
.clone()
.upgrade(socket, id, endpoint, remote_addr)
}
Expand All @@ -1422,7 +1402,6 @@ mod tests {
#[test]
fn builds_and_finishes_in_finite_time() {
// Checks that merely starting the network doesn't end up in an infinite loop.
let service = NetworkService::new(Default::default(), None).unwrap();
service.start(vec![]).map_err(|(err, _)| err).unwrap();
let _service = NetworkService::new(Default::default(), vec![], None).unwrap();
}
}
25 changes: 8 additions & 17 deletions substrate/network-libp2p/tests/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,17 +93,11 @@ impl NetworkProtocolHandler for TestProtocol {

#[test]
fn net_service() {
let service = NetworkService::new(NetworkConfiguration::new_local(), None).expect("Error creating network service");
service.start(vec![(Arc::new(TestProtocol::new(false)), *b"myp", &[(1u8, 1)])]).unwrap();
}

#[test]
fn net_start_stop() {
let config = NetworkConfiguration::new_local();
let service = NetworkService::new(config, None).unwrap();
service.start(vec![]).unwrap();
service.stop();
service.start(vec![]).unwrap();
let _service = NetworkService::new(
NetworkConfiguration::new_local(),
vec![(Arc::new(TestProtocol::new(false)), *b"myp", &[(1u8, 1)])],
None
).expect("Error creating network service");
}

#[test]
Expand All @@ -113,14 +107,12 @@ fn net_disconnect() {
let mut config1 = NetworkConfiguration::new_local();
config1.use_secret = Some(key1.secret().clone());
config1.boot_nodes = vec![ ];
let service1 = NetworkService::new(config1, None).unwrap();
let handler1 = Arc::new(TestProtocol::new(false));
service1.start(vec![(handler1.clone(), *b"tst", &[(42u8, 1), (43u8, 1)])]).unwrap();
let service1 = NetworkService::new(config1, vec![(handler1.clone(), *b"tst", &[(42u8, 1), (43u8, 1)])], None).unwrap();
let mut config2 = NetworkConfiguration::new_local();
config2.boot_nodes = vec![ service1.external_url().unwrap() ];
let service2 = NetworkService::new(config2, None).unwrap();
let handler2 = Arc::new(TestProtocol::new(true));
service2.start(vec![(handler2.clone(), *b"tst", &[(42u8, 1), (43u8, 1)])]).unwrap();
let _service2 = NetworkService::new(config2, vec![(handler2.clone(), *b"tst", &[(42u8, 1), (43u8, 1)])], None).unwrap();
while !(handler1.got_disconnect() && handler2.got_disconnect()) {
thread::sleep(Duration::from_millis(50));
}
Expand All @@ -131,9 +123,8 @@ fn net_disconnect() {
#[test]
fn net_timeout() {
let config = NetworkConfiguration::new_local();
let service = NetworkService::new(config, None).unwrap();
let handler = Arc::new(TestProtocol::new(false));
service.start(vec![(handler.clone(), *b"tst", &[(42u8, 1), (43u8, 1)])]).unwrap();
let _service = NetworkService::new(config, vec![(handler.clone(), *b"tst", &[(42u8, 1), (43u8, 1)])], None).unwrap();
while !handler.got_timeout() {
thread::sleep(Duration::from_millis(50));
}
Expand Down
Loading