diff --git a/core/src/connection_reuse.rs b/core/src/connection_reuse.rs index 67bf9469342..b0fb41c8a87 100644 --- a/core/src/connection_reuse.rs +++ b/core/src/connection_reuse.rs @@ -1,4 +1,4 @@ -// Copyright 2017 Parity Technologies (UK) Ltd. +// Copyright 2018 Parity Technologies (UK) Ltd. // // Permission is hereby granted, free of charge, to any person obtaining a // copy of this software and associated documentation files (the "Software"), @@ -40,77 +40,261 @@ //! `MuxedTransport` trait. use fnv::FnvHashMap; -use futures::future::{self, Either, FutureResult}; -use futures::{Async, Future, Poll, Stream}; +use futures::future::{self, FutureResult}; use futures::stream::FuturesUnordered; -use futures::sync::mpsc; +use futures::{stream, task, Async, Future, Poll, Stream}; use multiaddr::Multiaddr; use muxing::StreamMuxer; use parking_lot::Mutex; -use std::io::{self, Error as IoError}; -use std::sync::Arc; +use std::io::{Error as IoError, ErrorKind as IoErrorKind, Read, Write}; +use std::mem; +use std::ops::{Deref, DerefMut}; +use std::sync::{atomic::AtomicUsize, atomic::Ordering, Arc}; use tokio_io::{AsyncRead, AsyncWrite}; use transport::{MuxedTransport, Transport, UpgradedNode}; use upgrade::ConnectionUpgrade; +use std::clone::Clone; + /// Allows reusing the same muxed connection multiple times. /// /// Can be created from an `UpgradedNode` through the `From` trait. /// /// Implements the `Transport` trait. -#[derive(Clone)] pub struct ConnectionReuse where T: Transport, - T::Output: AsyncRead + AsyncWrite, + C: ConnectionUpgrade + Clone, + C::Output: StreamMuxer + Clone, +{ + /// Struct shared between most of the `ConnectionReuse` infrastructure. + shared: Arc>, +} + +enum PeerState +where + M: StreamMuxer + Clone, +{ + /// Connection is active and can be used to open substreams. + Active { + /// The muxer to open new substreams. + muxer: M, + /// Future of the address of the client. + client_addr: Multiaddr, + /// Unique identifier for this connection in the `ConnectionReuse`. + connection_id: usize, + /// Number of open substreams. + num_substreams: usize, + /// Id of the listener that created this connection, or `None` if it was opened by a + /// dialer. + listener_id: Option, + }, + + /// Connection is pending. + // TODO: stronger Future type + Pending { + /// Future that produces the muxer. + future: Box>, + /// All the tasks to notify when `future` resolves. + notify: FnvHashMap, + /// Unique identifier for this connection in the `ConnectionReuse`. + connection_id: usize, + }, + + /// An earlier connection attempt errored. + Errored(IoError), +} + +/// Struct shared between most of the `ConnectionReuse` infrastructure. +// #[derive(Clone)] +struct Shared +where + T: Transport, C: ConnectionUpgrade, - C::Output: StreamMuxer, + C::Output: StreamMuxer + Clone, { - // Underlying transport and connection upgrade for when we need to dial or listen. - inner: UpgradedNode, + /// Underlying transport and connection upgrade, used when we need to dial or listen. + transport: UpgradedNode, + + /// All the connections that were opened, whether successful and/or active or not. + // TODO: this will grow forever + connections: Mutex>>, + + /// Tasks to notify when one or more new elements were added to `connections`. + notify_on_new_connec: Mutex>, - // Struct shared between most of the `ConnectionReuse` infrastructure. - shared: Arc>>, + /// Next `connection_id` to use when opening a connection. + next_connection_id: AtomicUsize, + + /// Next `listener_id` for the next listener we create. + next_listener_id: AtomicUsize, } -struct Shared +impl Shared where - M: StreamMuxer, + T: Transport, + C: ConnectionUpgrade, + C::Output: StreamMuxer + Clone, { - // List of active muxers. - active_connections: FnvHashMap, + /// notify all `Tasks` in `self.notify_on_new_connec` that a new connection was established + /// consume the tasks in that process + fn new_con_notify(&self) { + let mut notifiers = self.notify_on_new_connec.lock(); + for to_notify in notifiers.drain() { + to_notify.1.notify(); + } + } + + /// generate a new connection id + #[inline] + pub fn gen_next_connection_id(&self) -> usize { + self.next_connection_id.fetch_add(1, Ordering::Relaxed) + } - // List of pending inbound substreams from dialed nodes. - // Only add to this list elements received through `add_to_next_rx`. - next_incoming: Vec<(M, M::InboundSubstream, Multiaddr)>, + /// generate a new listener id + #[inline] + pub fn gen_next_listener_id(&self) -> usize { + self.next_listener_id.fetch_add(1, Ordering::Relaxed) + } - // New elements are not directly added to `next_incoming`. Instead they are sent to this - // channel. This is done so that we can wake up tasks whenever a new element is added. - add_to_next_rx: mpsc::UnboundedReceiver<(M, M::InboundSubstream, Multiaddr)>, + /// Insert a new connection, returns Some if an entry was present, notify listeners + pub fn insert_connection( + &self, + addr: Multiaddr, + state: PeerState, + ) -> Option> { + let r = self.connections.lock().insert(addr, state); + self.new_con_notify(); + r + } - // Other side of `add_to_next_rx`. - add_to_next_tx: mpsc::UnboundedSender<(M, M::InboundSubstream, Multiaddr)>, + /// Removes one substream from an active connection. Closes the connection if necessary. + pub fn remove_substream(&self, connec_id: usize, addr: &Multiaddr) { + self.connections.lock().retain(|_, connec| { + if let PeerState::Active { + connection_id, + ref mut num_substreams, + .. + } = connec + { + if *connection_id == connec_id { + *num_substreams -= 1; + if *num_substreams == 0 { + trace!( + "All substreams to {} closed ; closing main connection", + addr + ); + return false; + } + } + } + true + }); + } + + /// Polls the incoming substreams on all the incoming connections that match the `listener`. + /// + /// Returns `Ready(None)` if no connection is matching the `listener`. Returns `NotReady` if + /// one or more connections are matching the `listener` but they are not ready. + fn poll_incoming( + &self, + listener: Option, + ) -> Poll::Substream, usize, Multiaddr)>, IoError> { + // Keys of the elements in `connections` to remove afterwards. + let mut to_remove = Vec::new(); + // Substream to return, if any found. + let mut ret_value = None; + let mut found_one = false; + + for (addr, state) in self.connections.lock().iter_mut() { + let res = { + if let PeerState::Active { + ref mut muxer, + ref mut num_substreams, + connection_id, + client_addr, + listener_id, + } = state + { + if *listener_id == listener { + continue; + } + found_one = true; + + match muxer.clone().inbound().poll() { + Ok(Async::Ready(Some(inner))) => { + trace!("New incoming substream from {}", client_addr); + *num_substreams += 1; + Ok((inner, connection_id.clone(), client_addr.clone())) + } + Ok(Async::Ready(None)) => { + // The muxer isn't capable of opening any inbound stream anymore, so + // we close the connection entirely. + trace!("Removing existing connection to {} as it cannot open inbound anymore", addr); + to_remove.push(addr.clone()); + continue; + } + Ok(Async::NotReady) => continue, + Err(err) => { + // If an error happens while opening an inbound stream, we close the + // connection entirely. + trace!( + "Error while opening inbound substream to {}: {:?}", + addr, + err + ); + to_remove.push(addr.clone()); + Err(err) + } + } + } else { + if listener.is_none() { + if let PeerState::Pending { ref mut notify, .. } = state { + notify.insert(TASK_ID.with(|&t| t), task::current()); + } + } + continue; + } + }; + + ret_value = Some(res); + break; + } + + for to_remove in to_remove { + self.connections.lock().remove(&to_remove); + } + + match ret_value { + Some(Ok(val)) => Ok(Async::Ready(Some(val))), + Some(Err(err)) => Err(err), + None => { + if found_one { + Ok(Async::NotReady) + } else { + Ok(Async::Ready(None)) + } + } + } + } } impl From> for ConnectionReuse where T: Transport, - T::Output: AsyncRead + AsyncWrite, - C: ConnectionUpgrade, - C::Output: StreamMuxer, + C: ConnectionUpgrade + Clone, + C::Output: StreamMuxer + Clone, { #[inline] fn from(node: UpgradedNode) -> ConnectionReuse { - let (tx, rx) = mpsc::unbounded(); - ConnectionReuse { - inner: node, - shared: Arc::new(Mutex::new(Shared { - active_connections: Default::default(), - next_incoming: Vec::new(), - add_to_next_rx: rx, - add_to_next_tx: tx, - })), + shared: Arc::new(Shared { + transport: node, + connections: Default::default(), + notify_on_new_connec: Default::default(), + next_connection_id: Default::default(), + next_listener_id: Default::default(), + }), } } } @@ -121,22 +305,23 @@ where T::Output: AsyncRead + AsyncWrite, C: ConnectionUpgrade + Clone + 'static, // TODO: 'static :( C::Output: StreamMuxer + Clone, + ::Substream: Clone, C::MultiaddrFuture: Future, - C::NamesIter: Clone, // TODO: not elegant + C::NamesIter: Clone, + UpgradedNode: Clone, { - type Output = ::Substream; + type Output = ConnectionReuseSubstream; type MultiaddrFuture = future::FutureResult; type Listener = Box>; type ListenerUpgrade = FutureResult<(Self::Output, Self::MultiaddrFuture), IoError>; - type Dial = Box>; + type Dial = ConnectionReuseDial; fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> { - let (listener, new_addr) = match self.inner.listen_on(addr.clone()) { + let (listener, new_addr) = match self.shared.transport.clone().listen_on(addr.clone()) { Ok((l, a)) => (l, a), - Err((inner, addr)) => { + Err((_, addr)) => { return Err(( ConnectionReuse { - inner: inner, shared: self.shared, }, addr, @@ -145,88 +330,58 @@ where }; let listener = listener - .fuse() .map(|upgr| { upgr.and_then(|(out, addr)| { + trace!("Waiting for remote's address as listener"); addr.map(move |addr| (out, addr)) }) - }); + }) + .fuse(); + + let listener_id = self.shared.gen_next_listener_id(); let listener = ConnectionReuseListener { - shared: self.shared.clone(), - listener: listener, + shared: self.shared, + listener, + listener_id, current_upgrades: FuturesUnordered::new(), - connections: Vec::new(), }; Ok((Box::new(listener) as Box<_>, new_addr)) } + #[inline] fn dial(self, addr: Multiaddr) -> Result { - // If we already have an active connection, use it! - let substream = if let Some(muxer) = self.shared - .lock() - .active_connections - .get(&addr) - .map(|muxer| muxer.clone()) - { - let a = addr.clone(); - Either::A(muxer.outbound().map(move |s| s.map(move |s| (s, future::ok(a))))) - } else { - Either::B(future::ok(None)) - }; - - let shared = self.shared.clone(); - let inner = self.inner; - let future = substream.and_then(move |outbound| { - if let Some(o) = outbound { - debug!("Using existing multiplexed connection to {}", addr); - return Either::A(future::ok(o)); - } - // The previous stream muxer did not yield a new substream => start new dial - debug!("No existing connection to {}; dialing", addr); - match inner.dial(addr.clone()) { - Ok(dial) => { - let future = dial - .and_then(move |(muxer, addr_fut)| { - trace!("Waiting for remote's address"); - addr_fut.map(move |addr| (muxer, addr)) - }) - .and_then(move |(muxer, addr)| { - muxer.clone().outbound().and_then(move |substream| { - if let Some(s) = substream { - // Replace the active connection because we are the most recent. - let mut lock = shared.lock(); - lock.active_connections.insert(addr.clone(), muxer.clone()); - // TODO: doesn't need locking ; the sender could be extracted - let _ = lock.add_to_next_tx.unbounded_send(( - muxer.clone(), - muxer.inbound(), - addr.clone(), - )); - Ok((s, future::ok(addr))) - } else { - error!("failed to dial to {}", addr); - shared.lock().active_connections.remove(&addr); - Err(io::Error::new(io::ErrorKind::Other, "dial failed")) - } - }) - }); - Either::B(Either::A(future)) - } - Err(_) => { - let e = io::Error::new(io::ErrorKind::Other, "transport rejected dial"); - Either::B(Either::B(future::err(e))) - } + // If an earlier attempt to dial this multiaddress failed, we clear the error. Otherwise + // the returned `Future` will immediately produce the error. + let must_clear = match self.shared.connections.lock().get(&addr) { + Some(&PeerState::Errored(ref err)) => { + trace!( + "Clearing existing connection to {} which errored earlier: {:?}", + addr, + err + ); + true } - }); + _ => false, + }; + if must_clear { + self.shared.connections.lock().remove(&addr); + } - Ok(Box::new(future) as Box<_>) + Ok(ConnectionReuseDial { + outbound: None, + shared: self.shared, + addr, + }) } #[inline] fn nat_traversal(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { - self.inner.transport().nat_traversal(server, observed) + self.shared + .transport + .transport() + .nat_traversal(server, observed) } } @@ -236,42 +391,304 @@ where T::Output: AsyncRead + AsyncWrite, C: ConnectionUpgrade + Clone + 'static, // TODO: 'static :( C::Output: StreamMuxer + Clone, + ::Substream: Clone, C::MultiaddrFuture: Future, - C::NamesIter: Clone, // TODO: not elegant + C::NamesIter: Clone, + UpgradedNode: Clone, { - type Incoming = ConnectionReuseIncoming; + type Incoming = ConnectionReuseIncoming; type IncomingUpgrade = - future::FutureResult<(::Substream, Self::MultiaddrFuture), IoError>; + future::FutureResult<(ConnectionReuseSubstream, Self::MultiaddrFuture), IoError>; #[inline] fn next_incoming(self) -> Self::Incoming { ConnectionReuseIncoming { - shared: self.shared.clone(), + shared: self.shared, } } } -/// Implementation of `Stream` for the connections incoming from listening on a specific address. -pub struct ConnectionReuseListener +static NEXT_TASK_ID: AtomicUsize = AtomicUsize::new(0); +// `TASK_ID` is used internally to uniquely identify each task. +task_local!{ + static TASK_ID: usize = NEXT_TASK_ID.fetch_add(1, Ordering::Relaxed) +} + +/// Implementation of `Future` for dialing a node. +pub struct ConnectionReuseDial where - M: StreamMuxer, + T: Transport, + C: ConnectionUpgrade, + C::Output: StreamMuxer + Clone, { - // The main listener. `S` is from the underlying transport. - listener: S, - current_upgrades: FuturesUnordered, - connections: Vec<(M, ::InboundSubstream, Multiaddr)>, + /// The future that will construct the substream, the connection id the muxer comes from, and + /// the `Future` of the client's multiaddr. + /// If `None`, we need to grab a new outbound substream from the muxer. + outbound: Option>, // Shared between the whole connection reuse mechanism. - shared: Arc>>, + shared: Arc>, + + // The address we're trying to dial. + addr: Multiaddr, +} + +struct ConnectionReuseDialOut +where + T: Transport, + C: ConnectionUpgrade, + C::Output: StreamMuxer + Clone, +{ + /// The pending outbound substream. + stream: ::OutboundSubstream, + /// Id of the connection that was used to create the substream. + connection_id: usize, + /// Address of the remote. + client_addr: Multiaddr, } -impl Stream for ConnectionReuseListener +impl Future for ConnectionReuseDial where - S: Stream, - F: Future, - M: StreamMuxer + Clone + 'static, // TODO: 'static :( + T: Transport, + C: ConnectionUpgrade, + C::Output: StreamMuxer + Clone + 'static, + ::Substream: Clone, + UpgradedNode: Transport + Clone, + as Transport>::Dial: 'static, + as Transport>::MultiaddrFuture: 'static, { - type Item = FutureResult<(M::Substream, FutureResult), IoError>; + type Item = ( + ConnectionReuseSubstream, + FutureResult, + ); + type Error = IoError; + + fn poll(&mut self) -> Poll { + loop { + let should_kill_existing_muxer = match self.outbound.take() { + Some(mut outbound) => { + match outbound.stream.poll() { + Ok(Async::Ready(Some(inner))) => { + trace!("Opened new outgoing substream to {}", self.addr); + let shared = self.shared.clone(); + return Ok(Async::Ready(( + ConnectionReuseSubstream { + connection_id: outbound.connection_id, + inner, + shared, + addr: outbound.client_addr.clone(), + }, + future::ok(outbound.client_addr), + ))); + } + Ok(Async::NotReady) => { + self.outbound = Some(outbound); + return Ok(Async::NotReady); + } + Ok(Async::Ready(None)) => { + // The muxer can no longer produce outgoing substreams. + // Let's reopen a connection. + trace!("Closing existing connection to {} ; can't produce outgoing substreams", self.addr); + true + } + Err(err) => { + // If we get an error while opening a substream, we decide to ignore it + // and open a new muxer. + // If opening the muxer produces an error, *then* we will return it. + debug!( + "Error while opening outgoing substream to {}: {:?}", + self.addr, err + ); + true + } + } + } + _ => false, + }; + + let is_empty = { + let mut conns = self.shared.connections.lock(); + if should_kill_existing_muxer { + conns.remove(&self.addr); + true + } else { + let item = conns.get(&self.addr); + + // error'ed, quit early + if let Some(PeerState::Errored(err)) = item { + trace!( + "Existing new connection to {} errored earlier: {:?}", + self.addr, + err + ); + let io_err = IoError::new(err.kind(), err.to_string()); + return Err(io_err); + }; + + item.is_none() + } + }; + + if is_empty { + // dial new and return early + let state = match self.shared.transport.clone().dial(self.addr.clone()) { + Ok(future) => { + trace!("Opened new connection to {:?}", self.addr); + let future = future.and_then(|(out, addr)| addr.map(move |a| (out, a))); + let future = Box::new(future); + let connection_id = self.shared.gen_next_connection_id(); + PeerState::Pending { + future, + notify: Default::default(), + connection_id, + } + } + Err(_) => { + trace!( + "Failed to open connection to {:?}, multiaddr not supported", + self.addr + ); + let err = + IoError::new(IoErrorKind::ConnectionRefused, "multiaddr not supported"); + PeerState::Errored(err) + } + }; + + self.shared.insert_connection(self.addr.clone(), state); + return Ok(Async::NotReady); + } + + let mut conn = self.shared.connections.lock(); + if let Some(state) = conn.get_mut(&self.addr) { + let replace_with = { + match state { + PeerState::Active { + muxer, + connection_id, + client_addr, + ref mut num_substreams, + .. + } => { + trace!( + "Using existing connection to {} to open outbound substream", + self.addr + ); + self.outbound = Some(ConnectionReuseDialOut { + stream: muxer.clone().outbound(), + connection_id: connection_id.clone(), + client_addr: client_addr.clone(), + }); + + // mutating the param in place, nothing to be done + *num_substreams += 1; + None + } + PeerState::Pending { + ref mut future, + ref mut notify, + connection_id, + } => { + match future.poll() { + Ok(Async::Ready((muxer, client_addr))) => { + trace!( + "Successful new connection to {} ({})", + self.addr, + client_addr + ); + for task in notify { + task.1.notify(); + } + let first_outbound = muxer.clone().outbound(); + self.outbound = Some(ConnectionReuseDialOut { + stream: first_outbound, + connection_id: connection_id.clone(), + client_addr: client_addr.clone(), + }); + + // our connection was upgraded, replace it. + Some(PeerState::Active { + muxer, + connection_id: connection_id.clone(), + num_substreams: 1, + listener_id: None, + client_addr, + }) + } + Ok(Async::NotReady) => { + notify.insert(TASK_ID.with(|&t| t), task::current()); + return Ok(Async::NotReady); + } + Err(err) => { + trace!("Failed new connection to {}: {:?}", self.addr, err); + Some(PeerState::Errored(err)) + } + } + } + _ => { + // Because of a race someone might has changed the Peerstate between + // this get_mut and the previous lock. But no harm for us, we can just + // loop on and will find it soon enough. + continue; + } + } + }; + + if let Some(new_state) = replace_with { + mem::replace(&mut *state, new_state); + } + } + } + } +} + +impl Drop for ConnectionReuseDial +where + T: Transport, + C: ConnectionUpgrade, + C::Output: StreamMuxer + Clone, +{ + fn drop(&mut self) { + if let Some(outbound) = self.outbound.take() { + self.shared.remove_substream(outbound.connection_id, &outbound.client_addr); + } + } +} + +/// Implementation of `Stream` for the connections incoming from listening on a specific address. +pub struct ConnectionReuseListener +where + T: Transport, + C: ConnectionUpgrade, + C::Output: StreamMuxer + Clone, +{ + /// The main listener. + listener: stream::Fuse, + /// Identifier for this listener. Used to determine which connections were opened by it. + listener_id: usize, + /// Opened connections that need to be upgraded. + current_upgrades: FuturesUnordered>>, + + /// Shared between the whole connection reuse mechanism. + shared: Arc>, +} + +impl Stream for ConnectionReuseListener +where + T: Transport, + T::Output: AsyncRead + AsyncWrite, + C: ConnectionUpgrade, + C::Output: StreamMuxer + Clone, + ::Substream: Clone, + L: Stream, + Lu: Future + 'static, +{ + type Item = FutureResult< + ( + ConnectionReuseSubstream, + FutureResult, + ), + IoError, + >; type Error = IoError; fn poll(&mut self) -> Poll, Self::Error> { @@ -281,145 +698,227 @@ where loop { match self.listener.poll() { Ok(Async::Ready(Some(upgrade))) => { - self.current_upgrades.push(upgrade); + trace!("New incoming connection"); + self.current_upgrades.push(Box::new(upgrade)); } Ok(Async::NotReady) => break, Ok(Async::Ready(None)) => { - debug!("listener has been closed"); - if self.connections.is_empty() && self.current_upgrades.is_empty() { - return Ok(Async::Ready(None)); - } - break + debug!("Listener has been closed"); + break; } Err(err) => { - debug!("error while polling listener: {:?}", err); - if self.connections.is_empty() && self.current_upgrades.is_empty() { - return Err(err); - } - break + debug!("Error while polling listener: {:?}", err); + return Err(err); } - } + }; } + // Process the connections being upgraded. loop { - match self.current_upgrades.poll() { - Ok(Async::Ready(Some((muxer, client_addr)))) => { - let next_incoming = muxer.clone().inbound(); - self.connections - .push((muxer.clone(), next_incoming, client_addr.clone())); - } + let (muxer, client_addr) = match self.current_upgrades.poll() { + Ok(Async::Ready(Some((muxer, client_addr)))) => (muxer, client_addr), + Ok(Async::Ready(None)) | Ok(Async::NotReady) => break, Err(err) => { - debug!("error while upgrading listener connection: {:?}", err); + // Insert the rest of the pending upgrades, but not the current one. + debug!("Error while upgrading listener connection: {:?}", err); return Ok(Async::Ready(Some(future::err(err)))); } - _ => break, - } + }; + + // Successfully upgraded a new incoming connection. + trace!("New multiplexed connection from {}", client_addr); + let connection_id = self.shared.gen_next_connection_id(); + + self.shared.insert_connection( + client_addr.clone(), + PeerState::Active { + muxer, + connection_id, + listener_id: Some(self.listener_id), + num_substreams: 1, + client_addr: client_addr, + }, + ); } - // Check whether any incoming substream is ready. - for n in (0..self.connections.len()).rev() { - let (muxer, mut next_incoming, client_addr) = self.connections.swap_remove(n); - match next_incoming.poll() { - Ok(Async::Ready(None)) => { - // stream muxer gave us a `None` => connection should be considered closed - debug!("no more inbound substreams on {}", client_addr); - self.shared.lock().active_connections.remove(&client_addr); - } - Ok(Async::Ready(Some(incoming))) => { - // We overwrite any current active connection to that multiaddr because we - // are the freshest possible connection. - self.shared - .lock() - .active_connections - .insert(client_addr.clone(), muxer.clone()); - // A new substream is ready. - let mut new_next = muxer.clone().inbound(); - self.connections - .push((muxer, new_next, client_addr.clone())); - return Ok(Async::Ready(Some( - future::ok((incoming, future::ok(client_addr))), - ))); - } - Ok(Async::NotReady) => { - self.connections.push((muxer, next_incoming, client_addr)); - } - Err(err) => { - debug!("error while upgrading the multiplexed incoming connection: {:?}", err); - // Insert the rest of the pending connections, but not the current one. - return Ok(Async::Ready(Some(future::err(err)))); + // Poll all the incoming connections on all the connections we opened. + match self.shared.poll_incoming(Some(self.listener_id)) { + Ok(Async::NotReady) => Ok(Async::NotReady), + Ok(Async::Ready(None)) => { + if self.listener.is_done() && self.current_upgrades.is_empty() { + Ok(Async::Ready(None)) + } else { + Ok(Async::NotReady) } } + Ok(Async::Ready(Some((inner, connection_id, addr)))) => { + Ok(Async::Ready(Some(future::ok(( + ConnectionReuseSubstream { + inner: inner.clone(), + shared: self.shared.clone(), + connection_id: connection_id.clone(), + addr: addr.clone(), + }, + future::ok(addr.clone()), + ))))) + } + Err(err) => Ok(Async::Ready(Some(future::err(IoError::new( + err.kind(), + err.to_string(), + ))))), } - - // Nothing is ready, return `NotReady`. - Ok(Async::NotReady) } } /// Implementation of `Future` that yields the next incoming substream from a dialed connection. -pub struct ConnectionReuseIncoming +pub struct ConnectionReuseIncoming where - M: StreamMuxer, + T: Transport, + C: ConnectionUpgrade, + C::Output: StreamMuxer + Clone, { // Shared between the whole connection reuse system. - shared: Arc>>, + shared: Arc>, } -impl Future for ConnectionReuseIncoming +impl Future for ConnectionReuseIncoming where - M: Clone + StreamMuxer, + T: Transport, + T::Output: AsyncRead + AsyncWrite, + C: ConnectionUpgrade, + C::Output: StreamMuxer + Clone, + ::Substream: Clone, { - type Item = future::FutureResult<(M::Substream, future::FutureResult), IoError>; + type Item = future::FutureResult< + ( + ConnectionReuseSubstream, + future::FutureResult, + ), + IoError, + >; type Error = IoError; + #[inline] fn poll(&mut self) -> Poll { - let mut lock = self.shared.lock(); - - // Try to get any new muxer from `add_to_next_rx`. - // We push the new muxers to a channel instead of adding them to `next_incoming`, so that - // tasks are notified when something is pushed. - loop { - match lock.add_to_next_rx.poll() { - Ok(Async::Ready(Some(elem))) => { - lock.next_incoming.push(elem); - } - Ok(Async::NotReady) => break, - Ok(Async::Ready(None)) | Err(_) => unreachable!( - "the sender and receiver are both in the same struct, therefore \ - the link can never break" - ), + match self.shared.poll_incoming(None) { + Ok(Async::Ready(Some((inner, connection_id, addr)))) => Ok(Async::Ready(future::ok(( + ConnectionReuseSubstream { + inner, + shared: self.shared.clone(), + connection_id, + addr: addr.clone(), + }, + future::ok(addr), + )))), + Ok(Async::Ready(None)) | Ok(Async::NotReady) => { + self.shared + .notify_on_new_connec + .lock() + .insert(TASK_ID.with(|&v| v), task::current()); + Ok(Async::NotReady) } + Err(err) => Err(err), } + } +} - // Check whether any incoming substream is ready. - for n in (0..lock.next_incoming.len()).rev() { - let (muxer, mut future, addr) = lock.next_incoming.swap_remove(n); - match future.poll() { - Ok(Async::Ready(None)) => { - debug!("no inbound substream for {}", addr); - lock.active_connections.remove(&addr); - } - Ok(Async::Ready(Some(value))) => { - // A substream is ready ; push back the muxer for the next time this function - // is called, then return. - debug!("New incoming substream"); - let next = muxer.clone().inbound(); - lock.next_incoming.push((muxer, next, addr.clone())); - return Ok(Async::Ready(future::ok((value, future::ok(addr))))); - } - Ok(Async::NotReady) => { - lock.next_incoming.push((muxer, future, addr)); - } - Err(err) => { - // In case of error, we just not push back the element, which drops it. - debug!("ConnectionReuse incoming: one of the \ - multiplexed substreams produced an error: {:?}", - err); - } - } - } +/// Wraps around the `Substream`. +pub struct ConnectionReuseSubstream +where + T: Transport, + C: ConnectionUpgrade, + C::Output: StreamMuxer + Clone, +{ + inner: ::Substream, + shared: Arc>, + /// Id this connection was created from. + connection_id: usize, + /// Address of the remote. + addr: Multiaddr, +} + +impl Deref for ConnectionReuseSubstream +where + T: Transport, + C: ConnectionUpgrade, + C::Output: StreamMuxer + Clone, +{ + type Target = ::Substream; + + #[inline] + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +impl DerefMut for ConnectionReuseSubstream +where + T: Transport, + C: ConnectionUpgrade, + C::Output: StreamMuxer + Clone, +{ + #[inline] + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.inner + } +} + +impl Read for ConnectionReuseSubstream +where + T: Transport, + C: ConnectionUpgrade, + C::Output: StreamMuxer + Clone, +{ + #[inline] + fn read(&mut self, buf: &mut [u8]) -> Result { + self.inner.read(buf) + } +} + +impl AsyncRead for ConnectionReuseSubstream +where + T: Transport, + C: ConnectionUpgrade, + C::Output: StreamMuxer + Clone, +{ +} + +impl Write for ConnectionReuseSubstream +where + T: Transport, + C: ConnectionUpgrade, + C::Output: StreamMuxer + Clone, +{ + #[inline] + fn write(&mut self, buf: &[u8]) -> Result { + self.inner.write(buf) + } + + #[inline] + fn flush(&mut self) -> Result<(), IoError> { + self.inner.flush() + } +} - // Nothing is ready. - Ok(Async::NotReady) +impl AsyncWrite for ConnectionReuseSubstream +where + T: Transport, + C: ConnectionUpgrade, + C::Output: StreamMuxer + Clone, +{ + #[inline] + fn shutdown(&mut self) -> Poll<(), IoError> { + self.inner.shutdown() + } +} + +impl Drop for ConnectionReuseSubstream +where + T: Transport, + C: ConnectionUpgrade, + C::Output: StreamMuxer + Clone, +{ + fn drop(&mut self) { + self.shared.remove_substream(self.connection_id, &self.addr); } } diff --git a/core/src/transport/upgrade.rs b/core/src/transport/upgrade.rs index 8c21897bfb6..dc3f7349f3f 100644 --- a/core/src/transport/upgrade.rs +++ b/core/src/transport/upgrade.rs @@ -50,14 +50,14 @@ impl<'a, T, C> UpgradedNode where T: Transport + 'a, T::Output: AsyncRead + AsyncWrite, - C: ConnectionUpgrade + 'a, + C: ConnectionUpgrade + 'a + Clone, { /// Turns this upgraded node into a `ConnectionReuse`. If the `Output` implements the /// `StreamMuxer` trait, the returned object will implement `Transport` and `MuxedTransport`. #[inline] pub fn into_connection_reuse(self) -> ConnectionReuse where - C::Output: StreamMuxer, + C::Output: StreamMuxer + Clone, { From::from(self) } diff --git a/core/tests/multiplex.rs b/core/tests/multiplex.rs index dbb4f123134..4c0c9210551 100644 --- a/core/tests/multiplex.rs +++ b/core/tests/multiplex.rs @@ -29,9 +29,8 @@ extern crate tokio_io; use bytes::BytesMut; use futures::future::Future; use futures::{Sink, Stream}; -use libp2p_core::{Multiaddr, MuxedTransport, StreamMuxer, Transport}; -use libp2p_tcp_transport::TcpConfig; -use std::sync::{atomic, mpsc}; +use libp2p_core::{Multiaddr, MuxedTransport, StreamMuxer, Transport, transport}; +use std::sync::atomic; use std::thread; use tokio_io::codec::length_delimited::Framed; @@ -74,19 +73,14 @@ fn client_to_server_outbound() { // A client opens a connection to a server, then an outgoing substream, then sends a message // on that substream. - let (tx, rx) = mpsc::channel(); + let (tx, rx) = transport::connector(); let bg_thread = thread::spawn(move || { - let transport = TcpConfig::new() + let future = rx .with_upgrade(multiplex::MplexConfig::new()) - .into_connection_reuse(); - - let (listener, addr) = transport - .listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()) - .unwrap_or_else(|_| panic!()); - tx.send(addr).unwrap(); - - let future = listener + .into_connection_reuse() + .listen_on("/memory".parse().unwrap()) + .unwrap_or_else(|_| panic!()).0 .into_future() .map_err(|(err, _)| err) .and_then(|(client, _)| client.unwrap()) @@ -107,11 +101,10 @@ fn client_to_server_outbound() { tokio_current_thread::block_on_all(future).unwrap(); }); - let transport = TcpConfig::new().with_upgrade(multiplex::MplexConfig::new()); - - let future = transport - .dial(rx.recv().unwrap()) - .unwrap() + let future = tx + .with_upgrade(multiplex::MplexConfig::new()) + .dial("/memory".parse().unwrap()) + .unwrap_or_else(|_| panic!()) .and_then(|client| client.0.outbound()) .map(|server| Framed::<_, BytesMut>::new(server.unwrap())) .and_then(|server| server.send("hello world".into())) @@ -126,19 +119,14 @@ fn connection_reused_for_dialing() { // A client dials the same multiaddress twice in a row. We check that it uses two substreams // instead of opening two different connections. - let (tx, rx) = mpsc::channel(); + let (tx, rx) = transport::connector(); let bg_thread = thread::spawn(move || { - let transport = OnlyOnce::from(TcpConfig::new()) + let future = OnlyOnce::from(rx) .with_upgrade(multiplex::MplexConfig::new()) - .into_connection_reuse(); - - let (listener, addr) = transport - .listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()) - .unwrap_or_else(|_| panic!()); - tx.send(addr).unwrap(); - - let future = listener + .into_connection_reuse() + .listen_on("/memory".parse().unwrap()) + .unwrap_or_else(|_| panic!()).0 .into_future() .map_err(|(err, _)| err) .and_then(|(client, rest)| client.unwrap().map(move |c| (c.0, rest))) @@ -170,22 +158,20 @@ fn connection_reused_for_dialing() { tokio_current_thread::block_on_all(future).unwrap(); }); - let transport = OnlyOnce::from(TcpConfig::new()) + let transport = OnlyOnce::from(tx) .with_upgrade(multiplex::MplexConfig::new()) .into_connection_reuse(); - let listen_addr = rx.recv().unwrap(); - let future = transport .clone() - .dial(listen_addr.clone()) + .dial("/memory".parse().unwrap()) .unwrap_or_else(|_| panic!()) .map(|server| Framed::<_, BytesMut>::new(server.0)) .and_then(|server| server.send("hello world".into())) .and_then(|first_connec| { transport .clone() - .dial(listen_addr.clone()) + .dial("/memory".parse().unwrap()) .unwrap_or_else(|_| panic!()) .map(|server| Framed::<_, BytesMut>::new(server.0)) .map(|server| (first_connec, server)) @@ -203,19 +189,13 @@ fn use_opened_listen_to_dial() { // substream on that same connection, that the client has to accept. The client then sends a // message on that new substream. - let (tx, rx) = mpsc::channel(); + let (tx, rx) = transport::connector(); let bg_thread = thread::spawn(move || { - let transport = OnlyOnce::from(TcpConfig::new()) - .with_upgrade(multiplex::MplexConfig::new()); - - let (listener, addr) = transport - .clone() - .listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()) - .unwrap_or_else(|_| panic!()); - tx.send(addr).unwrap(); - - let future = listener + let future = OnlyOnce::from(rx) + .with_upgrade(multiplex::MplexConfig::new()) + .listen_on("/memory".parse().unwrap()) + .unwrap_or_else(|_| panic!()).0 .into_future() .map_err(|(err, _)| err) .and_then(|(client, _)| client.unwrap()) @@ -247,15 +227,13 @@ fn use_opened_listen_to_dial() { tokio_current_thread::block_on_all(future).unwrap(); }); - let transport = OnlyOnce::from(TcpConfig::new()) + let transport = OnlyOnce::from(tx) .with_upgrade(multiplex::MplexConfig::new()) .into_connection_reuse(); - let listen_addr = rx.recv().unwrap(); - let future = transport .clone() - .dial(listen_addr.clone()) + .dial("/memory".parse().unwrap()) .unwrap_or_else(|_| panic!()) .map(|server| Framed::<_, BytesMut>::new(server.0)) .and_then(|server| server.send("hello world".into()))