Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
295 changes: 135 additions & 160 deletions Cargo.lock

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions substrate/network-libp2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ bytes = "0.4"
error-chain = { version = "0.12", default-features = false }
fnv = "1.0"
futures = "0.1"
libp2p = { git = "https://github.com/tomaka/libp2p-rs", rev = "fad12c89ea2b6f1f6420557db6e9305fb03f9f67", default-features = false, features = ["libp2p-secio", "libp2p-secio-secp256k1"] }
libp2p = { git = "https://github.com/tomaka/libp2p-rs", branch = "polkadot-2", default-features = false, features = ["libp2p-secio", "libp2p-secio-secp256k1"] }
ethcore-io = { git = "https://github.com/paritytech/parity.git" }
ethkey = { git = "https://github.com/paritytech/parity.git" }
ethereum-types = "0.3"
Expand All @@ -20,10 +20,10 @@ parking_lot = "0.5"
libc = "0.2"
log = "0.3"
rand = "0.5.0"
tokio-core = "0.1"
tokio = "0.1"
tokio-io = "0.1"
tokio-timer = "0.2"
varint = { git = "https://github.com/libp2p/rust-libp2p" }
varint = { git = "https://github.com/tomaka/libp2p-rs", branch = "polkadot-2" }

[dev-dependencies]
assert_matches = "1.2"
Expand Down
3 changes: 1 addition & 2 deletions substrate/network-libp2p/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,8 @@

extern crate parking_lot;
extern crate fnv;
#[macro_use]
extern crate futures;
extern crate tokio_core;
extern crate tokio;
extern crate tokio_io;
extern crate tokio_timer;
extern crate ethkey;
Expand Down
8 changes: 2 additions & 6 deletions substrate/network-libp2p/src/network_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -658,10 +658,6 @@ impl NetworkState {
peer_info.id,
peer_info.kad_connec.is_alive(),
peer_info.protocols.iter().filter(|c| c.1.is_alive()).count());
// TODO: we manually clear the connections as a work-around for
// networking bugs ; normally it should automatically drop
for c in peer_info.protocols.iter() { c.1.clear(); }
peer_info.kad_connec.clear();
let old = connections.peer_by_nodeid.remove(&peer_info.id);
debug_assert_eq!(old, Some(who));
}
Expand Down Expand Up @@ -852,11 +848,11 @@ fn parse_and_add_to_node_store(
NodeStore::Memory(ref node_store) =>
node_store
.peer_or_create(&who)
.add_addr(addr, Duration::from_secs(100000 * 365 * 24 * 3600)),
.set_addr_ttl(addr, Duration::from_secs(100000 * 365 * 24 * 3600)),
NodeStore::Json(ref node_store) =>
node_store
.peer_or_create(&who)
.add_addr(addr, Duration::from_secs(100000 * 365 * 24 * 3600)),
.set_addr_ttl(addr, Duration::from_secs(100000 * 365 * 24 * 3600)),
}

Ok(who)
Expand Down
50 changes: 28 additions & 22 deletions substrate/network-libp2p/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use std::thread;
use std::time::{Duration, Instant};
use futures::{future, Future, Stream, IntoFuture};
use futures::sync::{mpsc, oneshot};
use tokio_core::reactor::{Core, Handle};
use tokio::runtime::current_thread;
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_timer::{Interval, Deadline};

Expand Down Expand Up @@ -118,7 +118,7 @@ impl NetworkService {
local_peer_id: local_peer_id.clone(),
kbuckets_timeout: Duration::from_secs(600),
request_timeout: Duration::from_secs(10),
known_initial_peers: network_state.known_peers().collect(),
known_initial_peers: network_state.known_peers(),
});

let shared = Arc::new(Shared {
Expand Down Expand Up @@ -191,16 +191,16 @@ impl NetworkService {

let shared = self.shared.clone();
let join_handle = thread::spawn(move || {
// Tokio core that is going to run everything in this thread.
let mut core = match Core::new() {
// Tokio runtime that is going to run everything in this thread.
let mut runtime = match current_thread::Runtime::new() {
Ok(c) => c,
Err(err) => {
let _ = init_tx.send(Err(err.into()));
return
}
};

let fut = match init_thread(core.handle(), shared,
let fut = match init_thread(shared,
timeouts_register_rx, close_rx) {
Ok(future) => {
debug!(target: "sub-libp2p", "Successfully started networking service");
Expand All @@ -213,7 +213,7 @@ impl NetworkService {
}
};

match core.run(fut) {
match runtime.block_on(fut) {
Ok(()) => debug!(target: "sub-libp2p", "libp2p future finished"),
Err(err) => error!(target: "sub-libp2p", "error while running libp2p: {:?}", err),
}
Expand Down Expand Up @@ -395,7 +395,6 @@ impl NetworkContext for NetworkContextImpl {
/// - `timeouts_register_rx` should receive newly-registered timeouts.
/// - `close_rx` should be triggered when we want to close the network.
fn init_thread(
core: Handle,
shared: Arc<Shared>,
timeouts_register_rx: mpsc::UnboundedReceiver<
(Duration, (Arc<NetworkProtocolHandler + Send + Sync + 'static>, ProtocolId, TimerToken))
Expand All @@ -405,7 +404,6 @@ fn init_thread(
// Build the transport layer.
let transport = {
let base = transport::build_transport(
core.clone(),
transport::UnencryptedAllowed::Denied,
shared.network_state.local_private_key().clone()
);
Expand Down Expand Up @@ -535,7 +533,7 @@ fn init_thread(

// Build the timeouts system for the `register_timeout` function.
// (note: this has nothing to do with socket timeouts)
let timeouts = timeouts::build_timeouts_stream(core.clone(), timeouts_register_rx)
let timeouts = timeouts::build_timeouts_stream(timeouts_register_rx)
.for_each({
let shared = shared.clone();
move |(handler, protocol_id, timer_token)| {
Expand Down Expand Up @@ -630,7 +628,7 @@ fn listener_handle<'a, C>(
match shared.network_state.ping_connection(node_id.clone()) {
Ok((_, ping_connec)) => {
trace!(target: "sub-libp2p", "Successfully opened ping substream with {:?}", node_id);
let fut = ping_connec.set_until(pinger, future);
let fut = ping_connec.tie_or_passthrough(pinger, future);
Box::new(fut) as Box<_>
},
Err(err) => Box::new(future::err(err)) as Box<_>
Expand Down Expand Up @@ -687,7 +685,7 @@ fn handle_kademlia_connection(
val
});

Ok(kad_connec.set_until(controller, future))
Ok(kad_connec.tie_or_passthrough(controller, future))
}

/// When a remote performs a `FIND_NODE` Kademlia request for `searched`,
Expand Down Expand Up @@ -823,7 +821,7 @@ fn handle_custom_connection(
});

let val = (custom_proto_out.outgoing, custom_proto_out.protocol_version);
let final_fut = unique_connec.set_until(val, fut)
let final_fut = unique_connec.tie_or_stop(val, fut)
.then(move |val| {
// Makes sure that `dc_guard` is kept alive until here.
drop(dc_guard);
Expand Down Expand Up @@ -950,7 +948,7 @@ fn perform_kademlia_query<T, To, St, C>(
let random_peer_id = random_key.into_peer_id();
trace!(target: "sub-libp2p", "Start kademlia discovery for {:?}", random_peer_id);

shared.clone()
let future = shared.clone()
.kad_system
.find_node(random_peer_id, {
let shared = shared.clone();
Expand All @@ -974,7 +972,10 @@ fn perform_kademlia_query<T, To, St, C>(
)
.into_future()
.map_err(|(err, _)| err)
.map(|_| ())
.map(|_| ());

// Note that we use a `Box` in order to speed up compilation.
Box::new(future) as Box<Future<Item = _, Error = _>>
}

/// Connects to additional nodes, if necessary.
Expand Down Expand Up @@ -1163,8 +1164,7 @@ fn open_peer_custom_proto<T, To, St, C>(
);
}

// TODO: this future should be used
let _ = unique_connec.get_or_dial(&swarm_controller, &addr, with_err);
unique_connec.dial(&swarm_controller, &addr, with_err);
},
Err(err) => {
trace!(target: "sub-libp2p",
Expand Down Expand Up @@ -1200,11 +1200,14 @@ fn obtain_kad_connection<T, To, St, C>(shared: Arc<Shared>,
})
});

shared.network_state
let future = shared.network_state
.kad_connection(who.clone())
.into_future()
.map(move |(_, k)| k.get_or_dial(&swarm_controller, &addr, transport))
.flatten()
.map(move |(_, k)| k.dial(&swarm_controller, &addr, transport))
.flatten();

// Note that we use a Box in order to speed up compilation.
Box::new(future) as Box<Future<Item = _, Error = _>>
}

/// Processes the information about a node.
Expand Down Expand Up @@ -1305,7 +1308,7 @@ fn ping_all<T, St, C>(

let addr = Multiaddr::from(AddrComponent::P2P(who.clone().into_bytes()));
let fut = pinger
.get_or_dial(&swarm_controller, &addr, transport.clone())
.dial(&swarm_controller, &addr, transport.clone())
.and_then(move |mut p| {
trace!(target: "sub-libp2p", "Pinging peer #{} aka. {:?}", peer, who);
p.ping()
Expand Down Expand Up @@ -1334,7 +1337,7 @@ fn ping_all<T, St, C>(
ping_futures.push(fut);
}

future::loop_fn(ping_futures, |ping_futures| {
let future = future::loop_fn(ping_futures, |ping_futures| {
if ping_futures.is_empty() {
let fut = future::ok(future::Loop::Break(()));
return future::Either::A(fut)
Expand All @@ -1344,7 +1347,10 @@ fn ping_all<T, St, C>(
.map(|((), _, rest)| future::Loop::Continue(rest))
.map_err(|(err, _, _)| err);
future::Either::B(fut)
})
});

// Note that we use a Box in order to speed up compilation.
Box::new(future) as Box<Future<Item = _, Error = _>>
}

/// Expects a multiaddr of the format `/p2p/<node_id>` and returns the node ID.
Expand Down
39 changes: 21 additions & 18 deletions substrate/network-libp2p/src/timeouts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.?

use futures::{Async, future, Future, Poll, stream, Stream, sync::mpsc};
use std::io::Error as IoError;
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
use std::marker::PhantomData;
use std::time::{Duration, Instant};
use tokio_core::reactor::{Handle, Timeout};
use tokio_timer::{self, Delay};

/// Builds the timeouts system.
///
Expand All @@ -27,21 +27,18 @@ use tokio_core::reactor::{Handle, Timeout};
/// `T` can be anything you want, as it is transparently passed from the input
/// to the output. Timeouts continue to fire forever, as there is no way to
/// unregister them.
pub fn build_timeouts_stream<T>(
core: Handle,
pub fn build_timeouts_stream<'a, T>(
timeouts_rx: mpsc::UnboundedReceiver<(Duration, T)>
) -> impl Stream<Item = T, Error = IoError>
where T: Clone {
) -> Box<Stream<Item = T, Error = IoError> + 'a>
where T: Clone + 'a {
let next_timeout = next_in_timeouts_stream(timeouts_rx);

// The `unfold` function is essentially a loop turned into a stream. The
// first parameter is the initial state, and the closure returns the new
// state and an item.
stream::unfold(vec![future::Either::A(next_timeout)], move |timeouts| {
let stream = stream::unfold(vec![future::Either::A(next_timeout)], move |timeouts| {
// `timeouts` is a `Vec` of futures that produce an `Out`.

let core = core.clone();

// `select_ok` panics if `timeouts` is empty anyway.
if timeouts.is_empty() {
return None
Expand All @@ -53,8 +50,7 @@ pub fn build_timeouts_stream<T>(
Out::NewTimeout((Some((duration, item)), next_timeouts)) => {
// Received a new timeout request on the channel.
let next_timeout = next_in_timeouts_stream(next_timeouts);
let at = Instant::now() + duration;
let timeout = Timeout::new_at(at, &core)?;
let timeout = Delay::new(Instant::now() + duration);
let timeout = TimeoutWrapper(timeout, duration, Some(item), PhantomData);
timeouts.push(future::Either::B(timeout));
timeouts.push(future::Either::A(next_timeout));
Expand All @@ -66,16 +62,18 @@ pub fn build_timeouts_stream<T>(
Out::Timeout(duration, item) => {
// A timeout has happened.
let returned = item.clone();
let at = Instant::now() + duration;
let timeout = Timeout::new_at(at, &core)?;
let timeout = Delay::new(Instant::now() + duration);
let timeout = TimeoutWrapper(timeout, duration, Some(item), PhantomData);
timeouts.push(future::Either::B(timeout));
Ok((Some(returned), timeouts))
},
}
)
)
}).filter_map(|item| item)
}).filter_map(|item| item);

// Note that we use a `Box` in order to speed up compilation time.
Box::new(stream) as Box<Stream<Item = _, Error = _>>
}

/// Local enum representing the output of the selection.
Expand All @@ -97,15 +95,20 @@ fn next_in_timeouts_stream<T, B>(
.map_err(|_| unreachable!("an UnboundedReceiver can never error"))
}

/// Does the equivalent to `future.map(move |()| (duration, item))`.
/// Does the equivalent to `future.map(move |()| (duration, item)).map_err(|err| to_io_err(err))`.
struct TimeoutWrapper<A, F, T>(F, Duration, Option<T>, PhantomData<A>);
impl<A, F, T> Future for TimeoutWrapper<A, F, T>
where F: Future<Item = ()> {
where F: Future<Item = (), Error = tokio_timer::Error> {
type Item = Out<A, T>;
type Error = F::Error;
type Error = IoError;

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let _ready: () = try_ready!(self.0.poll());
match self.0.poll() {
Ok(Async::Ready(())) => (),
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(err) => return Err(IoError::new(IoErrorKind::Other, err.to_string())),
}

let out = Out::Timeout(self.1, self.2.take().expect("poll() called again after success"));
Ok(Async::Ready(out))
}
Expand Down
4 changes: 1 addition & 3 deletions substrate/network-libp2p/src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,14 @@ use libp2p::{self, Transport, mplex, secio, yamux};
use libp2p::core::{MuxedTransport, either, upgrade};
use libp2p::transport_timeout::TransportTimeout;
use std::time::Duration;
use tokio_core::reactor::Handle;
use tokio_io::{AsyncRead, AsyncWrite};

/// Builds the transport that serves as a common ground for all connections.
pub fn build_transport(
core: Handle,
unencrypted_allowed: UnencryptedAllowed,
local_private_key: secio::SecioKeyPair
) -> impl MuxedTransport<Output = impl AsyncRead + AsyncWrite> + Clone {
let base = libp2p::CommonTransport::new(core)
let base = libp2p::CommonTransport::new()
.with_upgrade({
let secio = secio::SecioConfig {
key: local_private_key,
Expand Down