diff --git a/core/Cargo.toml b/core/Cargo.toml index 1670a647d2a..0e1dcb468de 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -29,3 +29,4 @@ tokio = "0.1" tokio-codec = "0.1" tokio-current-thread = "0.1" tokio-timer = "0.2" +env_logger = "0.5.11" \ No newline at end of file diff --git a/core/src/connection_reuse.rs b/core/src/connection_reuse.rs index ab07bb97067..3a49b8d82db 100644 --- a/core/src/connection_reuse.rs +++ b/core/src/connection_reuse.rs @@ -1,4 +1,4 @@ -// Copyright 2017 Parity Technologies (UK) Ltd. +// Copyright 2018 Parity Technologies (UK) Ltd. // // Permission is hereby granted, free of charge, to any person obtaining a // copy of this software and associated documentation files (the "Software"), @@ -40,116 +40,509 @@ //! `MuxedTransport` trait. use fnv::FnvHashMap; -use futures::future::{self, Either, FutureResult}; -use futures::{Async, Future, Poll, Stream}; +use futures::future::{self, FutureResult}; use futures::stream::FuturesUnordered; -use futures::sync::mpsc; +use futures::{stream, task, Async, Future, Poll, Stream}; use multiaddr::Multiaddr; -use muxing::{self, StreamMuxer}; +use muxing::{self, StreamMuxer, SubstreamRef}; use parking_lot::Mutex; -use std::io::{self, Error as IoError}; -use std::sync::Arc; +use std::collections::HashMap; +use std::io::{Error as IoError, ErrorKind as IoErrorKind, Read, Write}; +use std::ops::{Deref, DerefMut}; +use std::sync::{atomic::AtomicUsize, atomic::AtomicBool, atomic::Ordering, Arc}; use tokio_io::{AsyncRead, AsyncWrite}; use transport::{MuxedTransport, Transport, UpgradedNode}; use upgrade::ConnectionUpgrade; +use std::hash::{BuildHasherDefault, Hasher}; + +#[derive(Default)] +struct UsizeHasher { + state: usize +} + +impl Hasher for UsizeHasher { + fn finish(&self) -> u64 { + self.state as u64 + } + fn write(&mut self, _i: &[u8]) { + unreachable!() + } + fn write_usize(&mut self, i: usize) { + self.state = i; + } +} + +type UsizeHasherBuilder = BuildHasherDefault; + + + /// Allows reusing the same muxed connection multiple times. /// /// Can be created from an `UpgradedNode` through the `From` trait. /// -/// Implements the `Transport` trait. +/// Implements the `Transport` trait pub struct ConnectionReuse where T: Transport, - T::Output: AsyncRead + AsyncWrite, - C: ConnectionUpgrade, + C: ConnectionUpgrade + Clone, C::Output: StreamMuxer, { - // Underlying transport and connection upgrade for when we need to dial or listen. - inner: UpgradedNode, - - // Struct shared between most of the `ConnectionReuse` infrastructure. - shared: Arc>>, + /// Struct shared between most of the `ConnectionReuse` infrastructure. + manager: Arc>, } impl Clone for ConnectionReuse where - T: Transport + Clone, - T::Output: AsyncRead + AsyncWrite, + T: Transport, C: ConnectionUpgrade + Clone, - C::Output: StreamMuxer + C::Output: StreamMuxer, { #[inline] fn clone(&self) -> Self { ConnectionReuse { - inner: self.inner.clone(), - shared: self.shared.clone(), + manager: self.manager.clone() } } } -struct Shared { - // List of active muxers. - active_connections: FnvHashMap>, +/// Keeps an internal counter, for every new number issued +/// will increase its internal state. +#[derive(Default)] +struct Counter { + inner: AtomicUsize, +} - // List of pending inbound substreams from dialed nodes. - // Only add to this list elements received through `add_to_next_rx`. 
-    next_incoming: Vec<(Arc<M>, Multiaddr)>,
+impl Counter {
+    /// Returns the next counter value, increasing the internal state.
+    pub fn next(&self) -> usize {
+        self.inner.fetch_add(1, Ordering::Relaxed)
+    }
+}

-    // New elements are not directly added to `next_incoming`. Instead they are sent to this
-    // channel. This is done so that we can wake up tasks whenever a new element is added.
-    add_to_next_rx: mpsc::UnboundedReceiver<(Arc<M>, Multiaddr)>,
+enum PeerState<M> {
+    /// Connection is active and can be used to open substreams.
+    Active {
+        /// The muxer used to open new substreams.
+        muxer: Arc<M>,
+        /// Id of the listener that created this connection, or `None` if it was opened by a
+        /// dialer.
+        listener_id: Option<usize>,
+        /// Whether this connection has been closed.
+        closed: Arc<AtomicBool>
+    },
+
+    /// Connection is pending.
+    // TODO: stronger Future type
+    Pending {
+        /// Future that produces the muxer.
+        future: Box<Future<Item = (M, Multiaddr), Error = IoError>>,
+        /// All the tasks to notify when `future` resolves.
+        notify: HashMap<usize, task::Task, UsizeHasherBuilder>,
+    },
+
+    /// An earlier connection attempt errored.
+    Errored(IoError),
+}

-    // Other side of `add_to_next_rx`.
-    add_to_next_tx: mpsc::UnboundedSender<(Arc<M>, Multiaddr)>,
+impl<M> PeerState<M>
+where
+    M: StreamMuxer
+{
+    /// Resets the connection if the given `PeerState` is active, consuming the `PeerState` in
+    /// the process.
+    fn reset_conn(self) {
+        if let PeerState::Active { closed, .. } = self {
+            closed.store(false, Ordering::Release);
+        }
+    }
 }

-impl<T, C> From<UpgradedNode<T, C>> for ConnectionReuse<T, C>
+/// Struct shared between most of the `ConnectionReuse` infrastructure.
+/// Knows about all connections and their current state, allows one to poll for
+/// incoming and outbound connections, and manages all state transitions that
+/// may occur automatically.
+struct ConnectionsManager<T, C>
+where
+    T: Transport,
+    C: ConnectionUpgrade<T::Output>,
+    C::Output: StreamMuxer,
+{
+    /// Underlying transport and connection upgrade, used when we need to dial or listen.
+    transport: UpgradedNode<T, C>,
+
+    /// All the connections that were opened, whether pending, active or errored.
+    // TODO: this will grow forever
+    connections: Mutex<FnvHashMap<Multiaddr, PeerState<C::Output>>>,
+
+    /// Tasks to notify when one or more new elements were added to `connections`.
+    notify_on_new_connec: Mutex<HashMap<usize, task::Task, UsizeHasherBuilder>>,
+
+    /// Counter giving us the next listener_id.
+    listener_counter: Counter,
+}
+
+impl<T, C> ConnectionsManager<T, C>
 where
     T: Transport,
-    T::Output: AsyncRead + AsyncWrite,
     C: ConnectionUpgrade<T::Output>,
     C::Output: StreamMuxer,
+{
+    /// Notifies all tasks in `self.notify_on_new_connec` that a new connection was established,
+    /// consuming the tasks in the process.
+    fn new_con_notify(&self) {
+        let mut notifiers = self.notify_on_new_connec.lock();
+        trace!("Notify {} listeners about a new connection", notifiers.len());
+        for (_, task) in notifiers.drain() {
+            task.notify();
+        }
+    }
+
+    /// Registers `task` under `id`, replacing any task previously registered under that id.
+    fn register_notifier(&self, id: usize, task: task::Task) {
+        let mut notifiers = self.notify_on_new_connec.lock();
+        notifiers.insert(id, task);
+    }
+
+    /// Clears the cached connection if the entry contains an error. Returns whether an error was
+    /// found and has been removed.
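+    /// This keeps an earlier failed dial attempt from poisoning every later dial to the same
+    /// address.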
+    fn clear_error(&self, addr: &Multiaddr) -> bool {
+        let mut conns = self.connections.lock();
+
+        if let Some(PeerState::Errored(ref err)) = conns.get(addr) {
+            trace!(
+                "Clearing existing connection to {} which errored earlier: {:?}",
+                addr,
+                err
+            );
+        } else {
+            return false;
+        }
+
+        // We only reach this point if the entry was an error.
+        conns.remove(addr);
+        return true;
+    }
+}
+
+/// Polls the given `outbound` future of `addr` for a new substream. Returns `Ok(Some(stream))`
+/// if one is available, `Ok(None)` if the stream is not ready yet, and an `Err` if the stream
+/// has failed.
+fn poll_for_substream<S, O>(mut outbound: O, addr: &Multiaddr)
+    -> Result<Option<S>, IoError>
+where
+    S: AsyncRead + AsyncWrite,
+    O: Future<Item = Option<S>, Error = IoError>
+{
+    match outbound.poll() {
+        Ok(Async::Ready(Some(inner))) => {
+            trace!("Opened new outgoing substream to {}", addr);
+            // All good, return the new stream.
+            Ok(Some(inner))
+        }
+        Ok(Async::NotReady) => Ok(None),
+        Ok(Async::Ready(None)) => {
+            // The muxer can no longer produce outgoing substreams.
+            // Let's reopen a connection.
+            trace!("Closing existing connection to {} ; can't produce outgoing substreams", addr);
+
+            let err = IoError::new(IoErrorKind::ConnectionRefused, "No Streams left");
+            Err(err)
+        }
+        Err(err) => {
+            // If we get an error while opening a substream, we decide to ignore it
+            // and open a new muxer.
+            // If opening the muxer produces an error, *then* we will return it.
+            debug!(
+                "Error while opening outgoing substream to {}: {:?}",
+                addr, err
+            );
+            Err(err)
+        }
+    }
+}
+
+impl<T, C> ConnectionsManager<T, C>
+where
+    T: Transport,
+    C: ConnectionUpgrade<T::Output> + Clone,
+    C::Output: StreamMuxer + 'static,
+    UpgradedNode<T, C>: Transport<Output = C::Output> + Clone,
+    <UpgradedNode<T, C> as Transport>::Dial: 'static,
+    <UpgradedNode<T, C> as Transport>::MultiaddrFuture: 'static,
+{
+    /// Informs the manager about a new inbound connection that has been established, so that
+    /// it can be reused and tasks waiting on a pending dial to the same address are notified.
+    fn new_inbound(&self, addr: &Multiaddr, listener_id: usize, muxer: C::Output) {
+        let mut conns = self.connections.lock();
+        let new_state = PeerState::Active {
+            muxer: Arc::new(muxer),
+            listener_id: Some(listener_id),
+            closed: Arc::new(AtomicBool::new(false))
+        };
+        let mut old_state = conns.insert(addr.clone(), new_state);
+
+        if let Some(PeerState::Pending { ref mut notify, .. }) = old_state {
+            // Wake up the tasks waiting on the pending dial, as we have a connection now.
+            trace!("Found incoming for pending connection to {}", addr);
+            for (_, task) in notify.drain() {
+                task.notify()
+            }
+        } else {
+            old_state.map(|s| s.reset_conn());
+        }
+    }
+
+    /// Polls for the outbound stream of `addr`, dialing if no connection is in the internal
+    /// cache. Returns `Ok(Async::NotReady)` as long as the connection isn't established and
+    /// ready yet.
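+    /// On success, yields the opened substream together with the connection's shared `closed`
+    /// flag.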
+ fn poll_outbound( + &self, + addr: &Multiaddr, + ) -> Poll<(SubstreamRef>, Arc), IoError> { + let mut conns = self.connections.lock(); + + let state = conns.entry(addr.clone()).or_insert_with(|| { + let state = match self.transport.clone().dial(addr.clone()) { + Ok(future) => { + trace!("Opening new connection to {:?}", addr); + let future = future.and_then(|(out, addr)| addr.map(move |a| (out, a))); + let future = Box::new(future); + + let mut notify = HashMap::::default(); + // make sure we are woken up once this connects + notify.insert(TASK_ID.with(|&t| t), task::current()); + + PeerState::Pending { future, notify } + } + Err(_) => { + trace!( + "Failed to open connection to {:?}, multiaddr not supported", + addr + ); + let err = + IoError::new(IoErrorKind::ConnectionRefused, "multiaddr not supported"); + PeerState::Errored(err) + } + }; + self.new_con_notify(); + // Althought we've just started in pending state, there are transports, which are + // immediately ready - namely 'memory' - so we need to poll our future right away + // rather than waiting for another poll on it. + state + }); + + let (replace_with, return_with) = match state { + PeerState::Errored(err) => { + // Error: return it + let io_err = IoError::new(err.kind(), err.to_string()); + return Err(io_err); + } + PeerState::Active { + muxer, + closed, + .. + } => { + if closed.load(Ordering::Acquire) { + (PeerState::Errored(IoError::new(IoErrorKind::BrokenPipe, "Connection closed")), + Err(IoError::new(IoErrorKind::BrokenPipe, "Connection closed"))) + } else { + // perfect, let's reuse, update the stream_id and + // return a new outbound + trace!( + "Using existing connection to {} to open outbound substream", + addr + ); + match poll_for_substream(muxing::outbound_from_ref_and_wrap(muxer.clone()), &addr) { + Ok(None) => return Ok(Async::NotReady), + Ok(Some(stream)) => { + return Ok(Async::Ready((stream, closed.clone()))); + } + Err(err) => { + let io_err = IoError::new(err.kind(), err.to_string()); + (PeerState::Errored(io_err), Err(err)) + } + } + } + } + PeerState::Pending { + ref mut future, + ref mut notify, + } => { + // was pending, let's check if that has changed + match future.poll() { + Ok(Async::NotReady) => { + // no it still isn't ready, keep the current task + // informed but return with NotReady + notify.insert(TASK_ID.with(|&t| t), task::current()); + return Ok(Async::NotReady); + } + Err(err) => { + // well, looks like connecting failed, replace the + // entry then return the Error + trace!("Failed new connection to {}: {:?}", addr, err); + let io_err = IoError::new(err.kind(), err.to_string()); + (PeerState::Errored(io_err), Err(err)) + } + Ok(Async::Ready((muxer, client_addr))) => { + trace!("Successful new connection to {} ({})", addr, client_addr); + for (_, task) in notify.drain() { + task.notify(); + } + let muxer = Arc::new(muxer); + match poll_for_substream(muxing::outbound_from_ref_and_wrap(muxer.clone()), &addr) { + Ok(None) => return Ok(Async::NotReady), + Ok(Some(stream)) => { + let closed = Arc::new(AtomicBool::new(false)); + let state = PeerState::Active { + muxer, + closed: closed.clone(), + listener_id: None, + }; + + (state, Ok(Async::Ready((stream, closed)))) + } + Err(err) => { + let io_err = IoError::new(err.kind(), err.to_string()); + (PeerState::Errored(io_err), Err(err)) + } + + } + } + } + } + }; + + *state = replace_with; + return_with + } + + /// Polls the incoming substreams on all the incoming connections that match the `listener`. 
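+    /// Connections opened by dialing are matched by a `listener` of `None`.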
+ /// + /// Returns `Ready>` for the first matching connection. + /// Return `Ready(None)` if no connection is matching the `listener`. Returns `NotReady` if + /// one or more connections are matching the `listener` but none is ready yet. + fn poll_incoming( + &self, + listener: Option, + ) -> Poll>, Arc, Multiaddr)>, IoError> { + // Keys of the elements in `connections` to remove afterwards. + let mut to_remove = Vec::new(); + // Substream to return, if any found. + let mut ret_value = None; + let mut found_one = false; + let mut conn = self.connections.lock(); + + for (addr, state) in conn.iter_mut() { + let res = match state { + PeerState::Active { closed, .. } if closed.load(Ordering::Acquire) => { + to_remove.push(addr.clone()); + continue + } + PeerState::Active { listener_id, .. } if *listener_id != listener => continue, + PeerState::Active { ref mut muxer, closed, .. } => { + found_one = true; + + match muxer.poll_inbound() { + Ok(Async::Ready(Some(stream))) => { + trace!("New incoming substream from {}", addr); + let stream = muxing::substream_from_ref(muxer.clone(), stream); + Ok((stream, closed.clone(), addr.clone())) + } + Ok(Async::Ready(None)) => { + // The muxer isn't capable of opening any inbound stream anymore, so + // we close the connection entirely. + trace!("Removing existing connection to {} as it cannot open inbound anymore", addr); + to_remove.push(addr.clone()); + continue; + } + Ok(Async::NotReady) => continue, + Err(err) => { + // If an error happens while opening an inbound stream, we close the + // connection entirely. + trace!( + "Error while opening inbound substream to {}: {:?}", + addr, + err + ); + to_remove.push(addr.clone()); + Err(err) + } + } + } + PeerState::Pending { ref mut notify, .. } if listener.is_none() => { + notify.insert(TASK_ID.with(|&t| t), task::current()); + continue + } + _ => continue + }; + + ret_value = Some(res); + break; + } + + for to_remove in to_remove { + conn.remove(&to_remove).map(|s| s.reset_conn()); + } + + match ret_value { + Some(Ok(val)) => Ok(Async::Ready(Some(val))), + Some(Err(err)) => Err(err), + None => { + if found_one { + self.register_notifier(TASK_ID.with(|&v| v), task::current()); + Ok(Async::NotReady) + } else { + Ok(Async::Ready(None)) + } + } + } + } +} + +impl From> for ConnectionReuse +where + T: Transport, + C: ConnectionUpgrade + Clone, + C::Output: StreamMuxer, { #[inline] fn from(node: UpgradedNode) -> ConnectionReuse { - let (tx, rx) = mpsc::unbounded(); - ConnectionReuse { - inner: node, - shared: Arc::new(Mutex::new(Shared { - active_connections: Default::default(), - next_incoming: Vec::new(), - add_to_next_rx: rx, - add_to_next_tx: tx, - })), + manager: Arc::new(ConnectionsManager { + transport: node, + connections: Default::default(), + notify_on_new_connec: Mutex::new( + HashMap::::default()), + listener_counter: Default::default(), + }), } } } impl Transport for ConnectionReuse where - T: Transport + 'static, // TODO: 'static :( + T: Transport + 'static, T::Output: AsyncRead + AsyncWrite, C: ConnectionUpgrade + Clone + 'static, // TODO: 'static :( C::Output: StreamMuxer, C::MultiaddrFuture: Future, - C::NamesIter: Clone, // TODO: not elegant + C::NamesIter: Clone, + UpgradedNode: Clone, { - type Output = muxing::SubstreamRef>; + type Output = ConnectionReuseSubstream; type MultiaddrFuture = future::FutureResult; type Listener = Box>; type ListenerUpgrade = FutureResult<(Self::Output, Self::MultiaddrFuture), IoError>; - type Dial = Box>; + type Dial = ConnectionReuseDial; fn listen_on(self, 
addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> { - let (listener, new_addr) = match self.inner.listen_on(addr.clone()) { + let (listener, new_addr) = match self.manager.transport.clone().listen_on(addr.clone()) { Ok((l, a)) => (l, a), - Err((inner, addr)) => { + Err((_, addr)) => { return Err(( ConnectionReuse { - inner: inner, - shared: self.shared, + manager: self.manager, }, addr, )); @@ -157,88 +550,46 @@ where }; let listener = listener - .fuse() .map(|upgr| { upgr.and_then(|(out, addr)| { - addr.map(move |addr| (out, addr)) + trace!("Waiting for remote's address as listener"); + addr.map(move |addr| { + trace!("Address found:{:?}", addr); + (out, addr) + }) }) - }); + }) + .fuse(); + + let listener_id = self.manager.listener_counter.next(); let listener = ConnectionReuseListener { - shared: self.shared.clone(), - listener: listener, + manager: self.manager, + listener, + listener_id, current_upgrades: FuturesUnordered::new(), - connections: Vec::new(), }; - Ok((Box::new(listener) as Box<_>, new_addr)) + Ok((Box::new(listener), new_addr)) } + #[inline] fn dial(self, addr: Multiaddr) -> Result { - // If we already have an active connection, use it! - let substream = if let Some(muxer) = self.shared - .lock() - .active_connections - .get(&addr) - .map(|muxer| muxer.clone()) - { - let a = addr.clone(); - Either::A(muxing::outbound_from_ref_and_wrap(muxer).map(|o| o.map(move |s| (s, future::ok(a))))) - } else { - Either::B(future::ok(None)) - }; - - let shared = self.shared.clone(); - let inner = self.inner; - let future = substream.and_then(move |outbound| { - if let Some(o) = outbound { - debug!("Using existing multiplexed connection to {}", addr); - return Either::A(future::ok(o)); - } - // The previous stream muxer did not yield a new substream => start new dial - debug!("No existing connection to {}; dialing", addr); - match inner.dial(addr.clone()) { - Ok(dial) => { - let future = dial - .and_then(move |(muxer, addr_fut)| { - trace!("Waiting for remote's address"); - addr_fut.map(move |addr| (Arc::new(muxer), addr)) - }) - .and_then(move |(muxer, addr)| { - muxing::outbound_from_ref(muxer.clone()).and_then(move |substream| { - if let Some(s) = substream { - // Replace the active connection because we are the most recent. - let mut lock = shared.lock(); - lock.active_connections.insert(addr.clone(), muxer.clone()); - // TODO: doesn't need locking ; the sender could be extracted - let _ = lock.add_to_next_tx.unbounded_send(( - muxer.clone(), - addr.clone(), - )); - let s = muxing::substream_from_ref(muxer, s); - Ok((s, future::ok(addr))) - } else { - error!("failed to dial to {}", addr); - shared.lock().active_connections.remove(&addr); - Err(io::Error::new(io::ErrorKind::Other, "dial failed")) - } - }) - }); - Either::B(Either::A(future)) - } - Err(_) => { - let e = io::Error::new(io::ErrorKind::Other, "transport rejected dial"); - Either::B(Either::B(future::err(e))) - } - } - }); - - Ok(Box::new(future) as Box<_>) + // If an earlier attempt to dial this multiaddress failed, we clear the error. Otherwise + // the returned `Future` will immediately produce the error. 
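+        // The actual dialing happens lazily: `ConnectionReuseDial::poll` drives
+        // `ConnectionsManager::poll_outbound`, which reuses an active connection to this
+        // address when one exists and only opens a new one otherwise.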
+ self.manager.clear_error(&addr); + Ok(ConnectionReuseDial { + manager: self.manager, + addr, + }) } #[inline] fn nat_traversal(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { - self.inner.transport().nat_traversal(server, observed) + self.manager + .transport + .transport() + .nat_traversal(server, observed) } } @@ -249,41 +600,99 @@ where C: ConnectionUpgrade + Clone + 'static, // TODO: 'static :( C::Output: StreamMuxer, C::MultiaddrFuture: Future, - C::NamesIter: Clone, // TODO: not elegant + C::NamesIter: Clone, + UpgradedNode: Clone, { - type Incoming = ConnectionReuseIncoming; + type Incoming = ConnectionReuseIncoming; type IncomingUpgrade = - future::FutureResult<(muxing::SubstreamRef>, Self::MultiaddrFuture), IoError>; + future::FutureResult<(ConnectionReuseSubstream, Self::MultiaddrFuture), IoError>; #[inline] fn next_incoming(self) -> Self::Incoming { ConnectionReuseIncoming { - shared: self.shared.clone(), + manager: self.manager, } } } -/// Implementation of `Stream` for the connections incoming from listening on a specific address. -pub struct ConnectionReuseListener +static NEXT_TASK_ID: AtomicUsize = AtomicUsize::new(0); +// `TASK_ID` is used internally to uniquely identify each task. +task_local!{ + static TASK_ID: usize = NEXT_TASK_ID.fetch_add(1, Ordering::Relaxed) +} + +/// Implementation of `Future` for dialing a node. +pub struct ConnectionReuseDial where - M: StreamMuxer, + T: Transport, + C: ConnectionUpgrade, + C::Output: StreamMuxer, { - // The main listener. `S` is from the underlying transport. - listener: S, - current_upgrades: FuturesUnordered, - connections: Vec<(Arc, Multiaddr)>, + // ConnectionsManager between the whole connection reuse mechanism. + manager: Arc>, - // Shared between the whole connection reuse mechanism. - shared: Arc>>, + // The address we're trying to dial. + addr: Multiaddr, } -impl Stream for ConnectionReuseListener +impl Future for ConnectionReuseDial where - S: Stream, - F: Future, - M: StreamMuxer + 'static, // TODO: 'static :( + T: Transport, + C: ConnectionUpgrade + Clone, + C::Output: StreamMuxer + 'static, + UpgradedNode: Transport + Clone, + as Transport>::Dial: 'static, + as Transport>::MultiaddrFuture: 'static, { - type Item = FutureResult<(muxing::SubstreamRef>, FutureResult), IoError>; + type Item = ( + ConnectionReuseSubstream, + FutureResult, + ); + type Error = IoError; + + fn poll(&mut self) -> Poll { + let (inner, closed) = try_ready!(self.manager.poll_outbound(&self.addr)); + Ok(Async::Ready((ConnectionReuseSubstream { inner, closed}, future::ok(self.addr.clone())))) + } +} + +/// Implementation of `Stream` for the connections incoming from listening on a specific address. +pub struct ConnectionReuseListener +where + T: Transport, + C: ConnectionUpgrade, + C::Output: StreamMuxer, +{ + /// The main listener. + listener: stream::Fuse, + /// Identifier for this listener. Used to determine which connections were opened by it. + listener_id: usize, + /// Opened connections that need to be upgraded. + current_upgrades: FuturesUnordered>>, + + /// ConnectionsManager between the whole connection reuse mechanism. 
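+    /// The listener registers newly upgraded connections here and polls it for their incoming
+    /// substreams.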
+ manager: Arc>, +} + +impl Stream for ConnectionReuseListener +where + T: Transport, + T::Output: AsyncRead + AsyncWrite, + C: ConnectionUpgrade + Clone, + C::Output: StreamMuxer + 'static, + UpgradedNode: Transport + Clone, + as Transport>::Dial: 'static, + as Transport>::MultiaddrFuture: 'static, + L: Stream, + Lu: Future + 'static, +{ + type Item = FutureResult< + ( + ConnectionReuseSubstream, + FutureResult, + ), + IoError, + >; type Error = IoError; fn poll(&mut self) -> Poll, Self::Error> { @@ -293,143 +702,229 @@ where loop { match self.listener.poll() { Ok(Async::Ready(Some(upgrade))) => { - self.current_upgrades.push(upgrade); + trace!("New incoming connection"); + self.current_upgrades.push(Box::new(upgrade)); } Ok(Async::NotReady) => break, Ok(Async::Ready(None)) => { - debug!("listener has been closed"); - if self.connections.is_empty() && self.current_upgrades.is_empty() { - return Ok(Async::Ready(None)); - } - break + debug!("Listener has been closed"); + break; } Err(err) => { - debug!("error while polling listener: {:?}", err); - if self.connections.is_empty() && self.current_upgrades.is_empty() { - return Err(err); - } - break + debug!("Error while polling listener: {:?}", err); + return Err(err); } - } + }; } + // Process the connections being upgraded. loop { - match self.current_upgrades.poll() { - Ok(Async::Ready(Some((muxer, client_addr)))) => { - self.connections.push((Arc::new(muxer), client_addr.clone())); - } + let (muxer, client_addr) = match self.current_upgrades.poll() { + Ok(Async::Ready(Some(val))) => val, + Ok(Async::Ready(None)) | Ok(Async::NotReady) => break, Err(err) => { - debug!("error while upgrading listener connection: {:?}", err); + // Insert the rest of the pending upgrades, but not the current one. + debug!("Error while upgrading listener connection: {:?}", err); return Ok(Async::Ready(Some(future::err(err)))); } - _ => break, - } + }; + + // Successfully upgraded a new incoming connection. + trace!("New multiplexed connection from {}", client_addr); + + self.manager.new_inbound(&client_addr, self.listener_id, muxer); } - // Check whether any incoming substream is ready. - for n in (0..self.connections.len()).rev() { - let (muxer, client_addr) = self.connections.swap_remove(n); - match muxer.poll_inbound() { - Ok(Async::Ready(None)) => { - // stream muxer gave us a `None` => connection should be considered closed - debug!("no more inbound substreams on {}", client_addr); - self.shared.lock().active_connections.remove(&client_addr); - } - Ok(Async::Ready(Some(incoming))) => { - // We overwrite any current active connection to that multiaddr because we - // are the freshest possible connection. - self.shared - .lock() - .active_connections - .insert(client_addr.clone(), muxer.clone()); - // A new substream is ready. - self.connections - .push((muxer.clone(), client_addr.clone())); - let incoming = muxing::substream_from_ref(muxer, incoming); - return Ok(Async::Ready(Some( - future::ok((incoming, future::ok(client_addr))), - ))); - } - Ok(Async::NotReady) => { - self.connections.push((muxer, client_addr)); - } - Err(err) => { - debug!("error while upgrading the multiplexed incoming connection: {:?}", err); - // Insert the rest of the pending connections, but not the current one. - return Ok(Async::Ready(Some(future::err(err)))); + // Poll all the incoming connections on all the connections we opened. 
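+        // Passing `Some(self.listener_id)` restricts the poll to connections accepted by this
+        // listener; connections opened by dialing are served by `ConnectionReuseIncoming`,
+        // which calls `poll_incoming(None)` instead.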
+ match self.manager.poll_incoming(Some(self.listener_id)) { + Ok(Async::NotReady) => Ok(Async::NotReady), + Ok(Async::Ready(None)) => { + if self.listener.is_done() && self.current_upgrades.is_empty() { + trace!("Ready but empty; we are, too; Closing!"); + Ok(Async::Ready(None)) + } else { + trace!("Ready but empty; but we're still going strong; We'll wait!"); + self.manager + .register_notifier(TASK_ID.with(|&v| v), task::current()); + Ok(Async::NotReady) } } + Ok(Async::Ready(Some((inner, closed, addr)))) => { + trace!("Stream for {:?} ready.", addr); + Ok(Async::Ready(Some(future::ok( + (ConnectionReuseSubstream {inner, closed}, + future::ok(addr.clone()), + ))))) + } + Err(err) => Ok(Async::Ready(Some(future::err(IoError::new( + err.kind(), + err.to_string(), + ))))), } - - // Nothing is ready, return `NotReady`. - Ok(Async::NotReady) } } /// Implementation of `Future` that yields the next incoming substream from a dialed connection. -pub struct ConnectionReuseIncoming +pub struct ConnectionReuseIncoming where - M: StreamMuxer, + T: Transport, + C: ConnectionUpgrade, + C::Output: StreamMuxer, { - // Shared between the whole connection reuse system. - shared: Arc>>, + // ConnectionsManager between the whole connection reuse system. + manager: Arc>, } -impl Future for ConnectionReuseIncoming +impl Future for ConnectionReuseIncoming where - M: StreamMuxer, + T: Transport, + T::Output: AsyncRead + AsyncWrite, + C: ConnectionUpgrade + Clone, + C::Output: StreamMuxer + 'static, + UpgradedNode: Transport + Clone, + as Transport>::Dial: 'static, + as Transport>::MultiaddrFuture: 'static, { - type Item = future::FutureResult<(muxing::SubstreamRef>, future::FutureResult), IoError>; + type Item = future::FutureResult< + ( + ConnectionReuseSubstream, + future::FutureResult, + ), + IoError, + >; type Error = IoError; + #[inline] fn poll(&mut self) -> Poll { - let mut lock = self.shared.lock(); - - // Try to get any new muxer from `add_to_next_rx`. - // We push the new muxers to a channel instead of adding them to `next_incoming`, so that - // tasks are notified when something is pushed. - loop { - match lock.add_to_next_rx.poll() { - Ok(Async::Ready(Some(elem))) => { - lock.next_incoming.push(elem); - } - Ok(Async::NotReady) => break, - Ok(Async::Ready(None)) | Err(_) => unreachable!( - "the sender and receiver are both in the same struct, therefore \ - the link can never break" - ), + match self.manager.poll_incoming(None) { + Ok(Async::Ready(Some((inner, closed, addr)))) => { + trace!("New Incoming Substream for {}", addr); + Ok(Async::Ready(future::ok(( + ConnectionReuseSubstream {inner, closed}, + future::ok(addr),)))) + }, + Ok(Async::Ready(None)) | Ok(Async::NotReady) => { + // wake us up, when there is a new connection + self.manager + .register_notifier(TASK_ID.with(|&v| v), task::current()); + Ok(Async::NotReady) } + Err(err) => Err(err), } + } +} - // Check whether any incoming substream is ready. - for n in (0..lock.next_incoming.len()).rev() { - let (muxer, addr) = lock.next_incoming.swap_remove(n); - match muxer.poll_inbound() { - Ok(Async::Ready(None)) => { - debug!("no inbound substream for {}", addr); - lock.active_connections.remove(&addr); - } - Ok(Async::Ready(Some(value))) => { - // A substream is ready ; push back the muxer for the next time this function - // is called, then return. 
- debug!("New incoming substream"); - lock.next_incoming.push((muxer.clone(), addr.clone())); - let substream = muxing::substream_from_ref(muxer, value); - return Ok(Async::Ready(future::ok((substream, future::ok(addr))))); - } - Ok(Async::NotReady) => { - lock.next_incoming.push((muxer, addr)); - } - Err(err) => { - // In case of error, we just not push back the element, which drops it. - debug!("ConnectionReuse incoming: one of the \ - multiplexed substreams produced an error: {:?}", - err); - } - } +/// Wraps around the `Substream`. +pub struct ConnectionReuseSubstream +where + T: Transport, + C: ConnectionUpgrade, + C::Output: StreamMuxer, +{ + inner: SubstreamRef>, + closed: Arc +} + +impl ConnectionReuseSubstream +where + T: Transport, + C: ConnectionUpgrade, + C::Output: StreamMuxer, +{ + + /// Reset the connection this Substream is muxed out of + /// will result in it being removed from the cache on the + /// next poll + pub fn reset_connection(self) { + self.closed.store(true, Ordering::Release); + } + +} + +impl Deref for ConnectionReuseSubstream +where + T: Transport, + C: ConnectionUpgrade, + C::Output: StreamMuxer, +{ + type Target = SubstreamRef>; + + #[inline] + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +impl DerefMut for ConnectionReuseSubstream +where + T: Transport, + C: ConnectionUpgrade, + C::Output: StreamMuxer, +{ + #[inline] + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.inner + } +} + +impl Read for ConnectionReuseSubstream +where + T: Transport, + C: ConnectionUpgrade, + C::Output: StreamMuxer, +{ + #[inline] + fn read(&mut self, buf: &mut [u8]) -> Result { + if self.closed.load(Ordering::Acquire) { + return Err(IoError::new(IoErrorKind::BrokenPipe, "Connection has been closed")); + } + + self.inner.read(buf) + } +} + +impl AsyncRead for ConnectionReuseSubstream +where + T: Transport, + C: ConnectionUpgrade, + C::Output: StreamMuxer, +{ +} + +impl Write for ConnectionReuseSubstream +where + T: Transport, + C: ConnectionUpgrade, + C::Output: StreamMuxer, +{ + #[inline] + fn write(&mut self, buf: &[u8]) -> Result { + if self.closed.load(Ordering::Acquire) { + return Err(IoError::new(IoErrorKind::BrokenPipe, "Connection has been closed")); } + self.inner.write(buf) + } - // Nothing is ready. - Ok(Async::NotReady) + #[inline] + fn flush(&mut self) -> Result<(), IoError> { + if self.closed.load(Ordering::Acquire) { + return Err(IoError::new(IoErrorKind::BrokenPipe, "Connection has been closed")); + } + self.inner.flush() } } + +impl AsyncWrite for ConnectionReuseSubstream +where + T: Transport, + C: ConnectionUpgrade, + C::Output: StreamMuxer, +{ + #[inline] + fn shutdown(&mut self) -> Poll<(), IoError> { + if self.closed.load(Ordering::Acquire) { + return Err(IoError::new(IoErrorKind::BrokenPipe, "Connection has been closed")); + } + self.inner.shutdown() + } +} \ No newline at end of file diff --git a/core/src/transport/upgrade.rs b/core/src/transport/upgrade.rs index 8c21897bfb6..57ceb03812e 100644 --- a/core/src/transport/upgrade.rs +++ b/core/src/transport/upgrade.rs @@ -50,7 +50,7 @@ impl<'a, T, C> UpgradedNode where T: Transport + 'a, T::Output: AsyncRead + AsyncWrite, - C: ConnectionUpgrade + 'a, + C: ConnectionUpgrade + 'a + Clone, { /// Turns this upgraded node into a `ConnectionReuse`. If the `Output` implements the /// `StreamMuxer` trait, the returned object will implement `Transport` and `MuxedTransport`. 
diff --git a/core/tests/multiplex.rs b/core/tests/multiplex.rs index 2cdc0580907..0c6d57115b8 100644 --- a/core/tests/multiplex.rs +++ b/core/tests/multiplex.rs @@ -25,6 +25,8 @@ extern crate libp2p_core; extern crate libp2p_tcp_transport; extern crate tokio_current_thread; extern crate tokio_io; +extern crate env_logger; + use bytes::BytesMut; use futures::future::Future; @@ -50,7 +52,7 @@ impl Clone for OnlyOnce { ) } } -impl Transport for OnlyOnce { +impl Transport for OnlyOnce { type Output = T::Output; type MultiaddrFuture = T::MultiaddrFuture; type Listener = T::Listener; @@ -68,11 +70,14 @@ impl Transport for OnlyOnce { } } + + #[test] fn client_to_server_outbound() { // A client opens a connection to a server, then an outgoing substream, then sends a message // on that substream. + let _ = env_logger::try_init(); let (tx, rx) = transport::connector(); let bg_thread = thread::spawn(move || { @@ -83,7 +88,7 @@ fn client_to_server_outbound() { .unwrap_or_else(|_| panic!()).0 .into_future() .map_err(|(err, _)| err) - .and_then(|(client, _)| client.unwrap()) + .and_then(|(client, _)| { client.expect("Client could not be created") }) .map(|client| client.0) .map(|client| Framed::<_, BytesMut>::new(client)) .and_then(|client| { @@ -119,6 +124,7 @@ fn connection_reused_for_dialing() { // A client dials the same multiaddress twice in a row. We check that it uses two substreams // instead of opening two different connections. + let _ = env_logger::try_init(); let (tx, rx) = transport::connector(); let bg_thread = thread::spawn(move || { @@ -189,6 +195,7 @@ fn use_opened_listen_to_dial() { // substream on that same connection, that the client has to accept. The client then sends a // message on that new substream. + let _ = env_logger::try_init(); let (tx, rx) = transport::connector(); let bg_thread = thread::spawn(move || {