From 73cd38a9f54d92ebd5983fbcddf25e7f4e8d1251 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Sun, 22 Jul 2018 18:17:15 +0200 Subject: [PATCH 1/8] Rewrite ConnectionReuse --- core/src/connection_reuse.rs | 712 +++++++++++++++++++++++++---------- 1 file changed, 505 insertions(+), 207 deletions(-) diff --git a/core/src/connection_reuse.rs b/core/src/connection_reuse.rs index 67bf9469342..9b0d37f8906 100644 --- a/core/src/connection_reuse.rs +++ b/core/src/connection_reuse.rs @@ -1,4 +1,4 @@ -// Copyright 2017 Parity Technologies (UK) Ltd. +// Copyright 2018 Parity Technologies (UK) Ltd. // // Permission is hereby granted, free of charge, to any person obtaining a // copy of this software and associated documentation files (the "Software"), @@ -40,14 +40,16 @@ //! `MuxedTransport` trait. use fnv::FnvHashMap; -use futures::future::{self, Either, FutureResult}; -use futures::{Async, Future, Poll, Stream}; +use futures::future::{self, FutureResult}; +use futures::{Async, Future, Poll, Stream, stream, task}; use futures::stream::FuturesUnordered; -use futures::sync::mpsc; use multiaddr::Multiaddr; use muxing::StreamMuxer; use parking_lot::Mutex; -use std::io::{self, Error as IoError}; +use std::collections::hash_map::Entry; +use std::io::{Error as IoError, ErrorKind as IoErrorKind, Read, Write}; +use std::mem; +use std::ops::{Deref, DerefMut}; use std::sync::Arc; use tokio_io::{AsyncRead, AsyncWrite}; use transport::{MuxedTransport, Transport, UpgradedNode}; @@ -62,54 +64,80 @@ use upgrade::ConnectionUpgrade; pub struct ConnectionReuse where T: Transport, - T::Output: AsyncRead + AsyncWrite, C: ConnectionUpgrade, C::Output: StreamMuxer, { - // Underlying transport and connection upgrade for when we need to dial or listen. - inner: UpgradedNode, - - // Struct shared between most of the `ConnectionReuse` infrastructure. - shared: Arc>>, + /// Struct shared between most of the `ConnectionReuse` infrastructure. + shared: Arc>>, } -struct Shared +/// Struct shared between most of the `ConnectionReuse` infrastructure. +struct Shared where - M: StreamMuxer, + T: Transport, + C: ConnectionUpgrade, + C::Output: StreamMuxer, { - // List of active muxers. - active_connections: FnvHashMap, + /// Underlying transport and connection upgrade, used when we need to dial or listen. + transport: UpgradedNode, + + /// All the connections that were opened, whether successful and/or active or not. + // TODO: this will grow forever + connections: FnvHashMap>, - // List of pending inbound substreams from dialed nodes. - // Only add to this list elements received through `add_to_next_rx`. - next_incoming: Vec<(M, M::InboundSubstream, Multiaddr)>, + /// Tasks to notify when one or more new elements were added to `connections`. + notify_on_new_connec: Vec, - // New elements are not directly added to `next_incoming`. Instead they are sent to this - // channel. This is done so that we can wake up tasks whenever a new element is added. - add_to_next_rx: mpsc::UnboundedReceiver<(M, M::InboundSubstream, Multiaddr)>, + /// Next `connection_id` to use when opening a connection. + next_connection_id: u64, +} - // Other side of `add_to_next_rx`. - add_to_next_tx: mpsc::UnboundedSender<(M, M::InboundSubstream, Multiaddr)>, +enum PeerState where M: StreamMuxer { + /// Connection is active and can be used to open substreams. + Active { + /// The muxer to open new substreams. + muxer: M, + /// Next incoming substream. + next_incoming: M::InboundSubstream, + /// Future of the address of the client. + client_addr: Multiaddr, + /// Unique identifier for this connection in the `ConnectionReuse`. + connection_id: u64, + /// Number of open substreams. + num_substreams: u64, + }, + + /// Connection is pending. + // TODO: stronger Future type + Pending { + /// Future that produces the muxer. + future: Box>, + /// All the tasks to notify when `future` resolves. + notify: Vec, + }, + + /// An earlier connection attempt errored. + Errored(IoError), + + /// The `PeerState` is poisonned. Happens if a panic happened while executing some of the + /// functions. + Poisonned, } impl From> for ConnectionReuse where T: Transport, - T::Output: AsyncRead + AsyncWrite, C: ConnectionUpgrade, C::Output: StreamMuxer, { #[inline] fn from(node: UpgradedNode) -> ConnectionReuse { - let (tx, rx) = mpsc::unbounded(); - ConnectionReuse { - inner: node, shared: Arc::new(Mutex::new(Shared { - active_connections: Default::default(), - next_incoming: Vec::new(), - add_to_next_rx: rx, - add_to_next_tx: tx, + transport: node, + connections: Default::default(), + notify_on_new_connec: Vec::new(), + next_connection_id: 0, })), } } @@ -122,21 +150,22 @@ where C: ConnectionUpgrade + Clone + 'static, // TODO: 'static :( C::Output: StreamMuxer + Clone, C::MultiaddrFuture: Future, - C::NamesIter: Clone, // TODO: not elegant + C::NamesIter: Clone, + UpgradedNode: Clone, { - type Output = ::Substream; + type Output = ConnectionReuseSubstream; type MultiaddrFuture = future::FutureResult; type Listener = Box>; type ListenerUpgrade = FutureResult<(Self::Output, Self::MultiaddrFuture), IoError>; - type Dial = Box>; + type Dial = ConnectionReuseDial; fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> { - let (listener, new_addr) = match self.inner.listen_on(addr.clone()) { + let transport = self.shared.lock().transport.clone(); + let (listener, new_addr) = match transport.listen_on(addr.clone()) { Ok((l, a)) => (l, a), - Err((inner, addr)) => { + Err((_, addr)) => { return Err(( ConnectionReuse { - inner: inner, shared: self.shared, }, addr, @@ -145,88 +174,50 @@ where }; let listener = listener - .fuse() .map(|upgr| { upgr.and_then(|(out, addr)| { + trace!("Waiting for remote's address as listener"); addr.map(move |addr| (out, addr)) }) - }); + }) + .fuse(); let listener = ConnectionReuseListener { shared: self.shared.clone(), listener: listener, current_upgrades: FuturesUnordered::new(), - connections: Vec::new(), }; Ok((Box::new(listener) as Box<_>, new_addr)) } + #[inline] fn dial(self, addr: Multiaddr) -> Result { - // If we already have an active connection, use it! - let substream = if let Some(muxer) = self.shared - .lock() - .active_connections - .get(&addr) - .map(|muxer| muxer.clone()) - { - let a = addr.clone(); - Either::A(muxer.outbound().map(move |s| s.map(move |s| (s, future::ok(a))))) - } else { - Either::B(future::ok(None)) + let mut shared = self.shared.lock(); + + // If an earlier attempt to dial this multiaddress failed, we clear the error. Otherwise + // the returned `Future` will immediately produce the error. + let must_clear = match shared.connections.get(&addr) { + Some(&PeerState::Errored(ref err)) => { + trace!("Clearing existing connection to {} which errored earlier: {:?}", addr, err); + true + }, + _ => false, }; + if must_clear { + shared.connections.remove(&addr); + } - let shared = self.shared.clone(); - let inner = self.inner; - let future = substream.and_then(move |outbound| { - if let Some(o) = outbound { - debug!("Using existing multiplexed connection to {}", addr); - return Either::A(future::ok(o)); - } - // The previous stream muxer did not yield a new substream => start new dial - debug!("No existing connection to {}; dialing", addr); - match inner.dial(addr.clone()) { - Ok(dial) => { - let future = dial - .and_then(move |(muxer, addr_fut)| { - trace!("Waiting for remote's address"); - addr_fut.map(move |addr| (muxer, addr)) - }) - .and_then(move |(muxer, addr)| { - muxer.clone().outbound().and_then(move |substream| { - if let Some(s) = substream { - // Replace the active connection because we are the most recent. - let mut lock = shared.lock(); - lock.active_connections.insert(addr.clone(), muxer.clone()); - // TODO: doesn't need locking ; the sender could be extracted - let _ = lock.add_to_next_tx.unbounded_send(( - muxer.clone(), - muxer.inbound(), - addr.clone(), - )); - Ok((s, future::ok(addr))) - } else { - error!("failed to dial to {}", addr); - shared.lock().active_connections.remove(&addr); - Err(io::Error::new(io::ErrorKind::Other, "dial failed")) - } - }) - }); - Either::B(Either::A(future)) - } - Err(_) => { - let e = io::Error::new(io::ErrorKind::Other, "transport rejected dial"); - Either::B(Either::B(future::err(e))) - } - } - }); - - Ok(Box::new(future) as Box<_>) + Ok(ConnectionReuseDial { + outbound: None, + shared: self.shared.clone(), + addr, + }) } #[inline] fn nat_traversal(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { - self.inner.transport().nat_traversal(server, observed) + self.shared.lock().transport.transport().nat_traversal(server, observed) } } @@ -237,11 +228,12 @@ where C: ConnectionUpgrade + Clone + 'static, // TODO: 'static :( C::Output: StreamMuxer + Clone, C::MultiaddrFuture: Future, - C::NamesIter: Clone, // TODO: not elegant + C::NamesIter: Clone, + UpgradedNode: Clone, { - type Incoming = ConnectionReuseIncoming; + type Incoming = ConnectionReuseIncoming; type IncomingUpgrade = - future::FutureResult<(::Substream, Self::MultiaddrFuture), IoError>; + future::FutureResult<(ConnectionReuseSubstream, Self::MultiaddrFuture), IoError>; #[inline] fn next_incoming(self) -> Self::Incoming { @@ -251,27 +243,206 @@ where } } +/// Implementation of `Future` for dialing a node. +pub struct ConnectionReuseDial +where + T: Transport, + C: ConnectionUpgrade, + C::Output: StreamMuxer, +{ + /// The future that will construct the substream, the connection id the muxer comes from, and + /// the `Future` of the client's multiaddr. + /// If `None`, we need to grab a new outbound substream from the muxer. + outbound: Option>, + + // Shared between the whole connection reuse mechanism. + shared: Arc>>, + + // The address we're trying to dial. + addr: Multiaddr, +} + +struct ConnectionReuseDialOut +where + T: Transport, + C: ConnectionUpgrade, + C::Output: StreamMuxer, +{ + /// The pending outbound substream. + stream: ::OutboundSubstream, + /// Id of the connection that was used to create the substream. + connection_id: u64, + /// Address of the remote. + client_addr: Multiaddr, +} + +impl Future for ConnectionReuseDial +where + T: Transport, + C: ConnectionUpgrade, + C::Output: StreamMuxer + Clone + 'static, + UpgradedNode: Transport + Clone, + as Transport>::Dial: 'static, + as Transport>::MultiaddrFuture: 'static, +{ + type Item = (ConnectionReuseSubstream, FutureResult); + type Error = IoError; + + fn poll(&mut self) -> Poll { + loop { + let should_kill_existing_muxer; + if let Some(mut outbound) = self.outbound.take() { + match outbound.stream.poll() { + Ok(Async::Ready(Some(inner))) => { + trace!("Opened new outgoing substream to {}", self.addr); + let substream = ConnectionReuseSubstream { + connection_id: outbound.connection_id, + shared: self.shared.clone(), + inner, + addr: outbound.client_addr.clone(), + }; + return Ok(Async::Ready((substream, future::ok(outbound.client_addr)))); + }, + Ok(Async::NotReady) => { + self.outbound = Some(outbound); + return Ok(Async::NotReady); + }, + Ok(Async::Ready(None)) => { + // The muxer can no longer produce outgoing substreams. + // Let's reopen a connection. + trace!("Closing existing connection to {} ; can't produce outgoing substreams", self.addr); + should_kill_existing_muxer = true; + }, + Err(err) => { + // If we get an error while opening a substream, we decide to ignore it + // and open a new muxer. + // If opening the muxer produces an error, *then* we will return it. + debug!("Error while opening outgoing substream to {}: {:?}", self.addr, err); + should_kill_existing_muxer = true; + }, + } + } else { + should_kill_existing_muxer = false; + } + + // If we reach this point, that means we have to fill `self.outbound`. + // If `should_kill_existing_muxer`, do not use any existing connection but create a + // new one instead. + let mut shared = self.shared.lock(); + let shared = &mut *shared; // Avoids borrow errors + + // TODO: could be optimized + if should_kill_existing_muxer { + shared.connections.remove(&self.addr); + } + let connec = match shared.connections.entry(self.addr.clone()) { + Entry::Occupied(e) => e.into_mut(), + Entry::Vacant(e) => { + // Build the connection. + let state = match shared.transport.clone().dial(self.addr.clone()) { + Ok(future) => { + trace!("Opened new connection to {:?}", self.addr); + let future = future.and_then(|(out, addr)| addr.map(move |a| (out, a))); + let future = Box::new(future); + PeerState::Pending { future, notify: Vec::new() } + }, + Err(_) => { + trace!("Failed to open connection to {:?}, multiaddr not supported", self.addr); + let err = IoError::new(IoErrorKind::ConnectionRefused, "multiaddr not supported"); + PeerState::Errored(err) + }, + }; + + for task in shared.notify_on_new_connec.drain(..) { + task.notify(); + } + + e.insert(state) + }, + }; + + match mem::replace(&mut *connec, PeerState::Poisonned) { + PeerState::Active { muxer, next_incoming, connection_id, num_substreams, client_addr } => { + let first_outbound = muxer.clone().outbound(); + *connec = PeerState::Active { muxer, next_incoming, connection_id, num_substreams, client_addr: client_addr.clone() }; + trace!("Using existing connection to {} to open outbound substream", self.addr); + self.outbound = Some(ConnectionReuseDialOut { + stream: first_outbound, + connection_id, + client_addr, + }); + }, + PeerState::Pending { mut future, mut notify } => { + match future.poll() { + Ok(Async::Ready((muxer, client_addr))) => { + trace!("Successful new connection to {} ({})", self.addr, client_addr); + for task in notify { + task.notify(); + } + let next_incoming = muxer.clone().inbound(); + let first_outbound = muxer.clone().outbound(); + let connection_id = shared.next_connection_id; + shared.next_connection_id += 1; + *connec = PeerState::Active { muxer, next_incoming, connection_id, num_substreams: 1, client_addr: client_addr.clone() }; + self.outbound = Some(ConnectionReuseDialOut { + stream: first_outbound, + connection_id, + client_addr, + }); + }, + Ok(Async::NotReady) => { + notify.push(task::current()); + *connec = PeerState::Pending { future, notify }; + return Ok(Async::NotReady); + }, + Err(err) => { + trace!("Failed new connection to {}: {:?}", self.addr, err); + let io_err = IoError::new(err.kind(), err.to_string()); + *connec = PeerState::Errored(err); + return Err(io_err); + }, + } + }, + PeerState::Errored(err) => { + trace!("Existing new connection to {} errored earlier: {:?}", self.addr, err); + let io_err = IoError::new(err.kind(), err.to_string()); + *connec = PeerState::Errored(err); + return Err(io_err); + }, + PeerState::Poisonned => { + panic!("Poisonned peer state"); + }, + } + } + } +} + /// Implementation of `Stream` for the connections incoming from listening on a specific address. -pub struct ConnectionReuseListener +pub struct ConnectionReuseListener where - M: StreamMuxer, + T: Transport, + C: ConnectionUpgrade, + C::Output: StreamMuxer, { - // The main listener. `S` is from the underlying transport. - listener: S, - current_upgrades: FuturesUnordered, - connections: Vec<(M, ::InboundSubstream, Multiaddr)>, + /// The main listener. + listener: stream::Fuse, + /// Opened connections that need to be upgraded. + current_upgrades: FuturesUnordered>>, - // Shared between the whole connection reuse mechanism. - shared: Arc>>, + /// Shared between the whole connection reuse mechanism. + shared: Arc>>, } -impl Stream for ConnectionReuseListener +impl Stream for ConnectionReuseListener where - S: Stream, - F: Future, - M: StreamMuxer + Clone + 'static, // TODO: 'static :( + T: Transport, + T::Output: AsyncRead + AsyncWrite, + C: ConnectionUpgrade, + C::Output: StreamMuxer + Clone, + L: Stream, + Lu: Future + 'static, { - type Item = FutureResult<(M::Substream, FutureResult), IoError>; + type Item = FutureResult<(ConnectionReuseSubstream, FutureResult), IoError>; type Error = IoError; fn poll(&mut self) -> Poll, Self::Error> { @@ -281,145 +452,272 @@ where loop { match self.listener.poll() { Ok(Async::Ready(Some(upgrade))) => { - self.current_upgrades.push(upgrade); + trace!("New incoming connection"); + self.current_upgrades.push(Box::new(upgrade)); } Ok(Async::NotReady) => break, Ok(Async::Ready(None)) => { - debug!("listener has been closed"); - if self.connections.is_empty() && self.current_upgrades.is_empty() { + debug!("Listener has been closed"); + if self.current_upgrades.is_empty() { return Ok(Async::Ready(None)); + } else { + break; } - break } Err(err) => { - debug!("error while polling listener: {:?}", err); - if self.connections.is_empty() && self.current_upgrades.is_empty() { + debug!("Error while polling listener: {:?}", err); + if self.current_upgrades.is_empty() { return Err(err); + } else { + break; } - break } - } + }; } + // Process the connections being upgraded. loop { match self.current_upgrades.poll() { Ok(Async::Ready(Some((muxer, client_addr)))) => { + // Successfully upgraded a new incoming connection. + trace!("New multiplexed connection from {}", client_addr); + let mut shared = self.shared.lock(); let next_incoming = muxer.clone().inbound(); - self.connections - .push((muxer.clone(), next_incoming, client_addr.clone())); - } - Err(err) => { - debug!("error while upgrading listener connection: {:?}", err); - return Ok(Async::Ready(Some(future::err(err)))); + let connection_id = shared.next_connection_id; + shared.next_connection_id += 1; + let state = PeerState::Active { muxer, next_incoming, connection_id, num_substreams: 1, client_addr: client_addr.clone() }; + shared.connections.insert(client_addr, state); + for to_notify in shared.notify_on_new_connec.drain(..) { + to_notify.notify(); + } } - _ => break, - } - } - - // Check whether any incoming substream is ready. - for n in (0..self.connections.len()).rev() { - let (muxer, mut next_incoming, client_addr) = self.connections.swap_remove(n); - match next_incoming.poll() { Ok(Async::Ready(None)) => { - // stream muxer gave us a `None` => connection should be considered closed - debug!("no more inbound substreams on {}", client_addr); - self.shared.lock().active_connections.remove(&client_addr); - } - Ok(Async::Ready(Some(incoming))) => { - // We overwrite any current active connection to that multiaddr because we - // are the freshest possible connection. - self.shared - .lock() - .active_connections - .insert(client_addr.clone(), muxer.clone()); - // A new substream is ready. - let mut new_next = muxer.clone().inbound(); - self.connections - .push((muxer, new_next, client_addr.clone())); - return Ok(Async::Ready(Some( - future::ok((incoming, future::ok(client_addr))), - ))); + // No upgrade remaining ; if the listener is closed, close everything. + if self.listener.is_done() { + return Ok(Async::Ready(None)); + } else { + break; + } } Ok(Async::NotReady) => { - self.connections.push((muxer, next_incoming, client_addr)); - } + break; + }, Err(err) => { - debug!("error while upgrading the multiplexed incoming connection: {:?}", err); - // Insert the rest of the pending connections, but not the current one. + // Insert the rest of the pending upgrades, but not the current one. + debug!("Error while upgrading listener connection: {:?}", err); return Ok(Async::Ready(Some(future::err(err)))); } } } - // Nothing is ready, return `NotReady`. + // TODO: the listener should also poll the incoming substreams on the active connections + // that it opened, so that the muxed transport doesn't have to do it + Ok(Async::NotReady) } } /// Implementation of `Future` that yields the next incoming substream from a dialed connection. -pub struct ConnectionReuseIncoming +pub struct ConnectionReuseIncoming where - M: StreamMuxer, + T: Transport, + C: ConnectionUpgrade, + C::Output: StreamMuxer, { // Shared between the whole connection reuse system. - shared: Arc>>, + shared: Arc>>, } -impl Future for ConnectionReuseIncoming +impl Future for ConnectionReuseIncoming where - M: Clone + StreamMuxer, + T: Transport, + C: ConnectionUpgrade, + C::Output: StreamMuxer + Clone, { - type Item = future::FutureResult<(M::Substream, future::FutureResult), IoError>; + type Item = future::FutureResult<(ConnectionReuseSubstream, future::FutureResult), IoError>; type Error = IoError; fn poll(&mut self) -> Poll { - let mut lock = self.shared.lock(); - - // Try to get any new muxer from `add_to_next_rx`. - // We push the new muxers to a channel instead of adding them to `next_incoming`, so that - // tasks are notified when something is pushed. - loop { - match lock.add_to_next_rx.poll() { - Ok(Async::Ready(Some(elem))) => { - lock.next_incoming.push(elem); - } - Ok(Async::NotReady) => break, - Ok(Async::Ready(None)) | Err(_) => unreachable!( - "the sender and receiver are both in the same struct, therefore \ - the link can never break" - ), + let mut shared = self.shared.lock(); + + // Keys of the elements in `shared.connections` to remove afterwards. + let mut to_remove = Vec::new(); + // Substream to return, if any found. + let mut ret_value = None; + + for (addr, state) in shared.connections.iter_mut() { + match *state { + PeerState::Active { ref mut next_incoming, ref muxer, ref mut num_substreams, connection_id, ref client_addr } => { + match next_incoming.poll() { + Ok(Async::Ready(Some(inner))) => { + trace!("New incoming substream from {}", client_addr); + let next = muxer.clone().inbound(); + *next_incoming = next; + *num_substreams += 1; + let substream = ConnectionReuseSubstream { + inner, + shared: self.shared.clone(), + connection_id, + addr: client_addr.clone(), + }; + ret_value = Some(Ok((substream, future::ok(client_addr.clone())))); + break; + }, + Ok(Async::Ready(None)) => { + // The muxer isn't capable of opening any inbound stream anymore, so + // we close the connection entirely. + trace!("Removing existing connection to {} as it cannot open inbound anymore", addr); + to_remove.push(addr.clone()); + }, + Ok(Async::NotReady) => (), + Err(err) => { + // If an error happens while opening an inbound stream, we close the + // connection entirely. + trace!("Error while opening inbound substream to {}: {:?}", addr, err); + to_remove.push(addr.clone()); + ret_value = Some(Err(err)); + break; + }, + } + }, + PeerState::Pending { ref mut notify, .. } => { + // TODO: this will add a new element at each iteration + notify.push(task::current()); + }, + PeerState::Errored(_) => {}, + PeerState::Poisonned => { + panic!("Poisonned peer state"); + }, } } - // Check whether any incoming substream is ready. - for n in (0..lock.next_incoming.len()).rev() { - let (muxer, mut future, addr) = lock.next_incoming.swap_remove(n); - match future.poll() { - Ok(Async::Ready(None)) => { - debug!("no inbound substream for {}", addr); - lock.active_connections.remove(&addr); - } - Ok(Async::Ready(Some(value))) => { - // A substream is ready ; push back the muxer for the next time this function - // is called, then return. - debug!("New incoming substream"); - let next = muxer.clone().inbound(); - lock.next_incoming.push((muxer, next, addr.clone())); - return Ok(Async::Ready(future::ok((value, future::ok(addr))))); - } - Ok(Async::NotReady) => { - lock.next_incoming.push((muxer, future, addr)); - } - Err(err) => { - // In case of error, we just not push back the element, which drops it. - debug!("ConnectionReuse incoming: one of the \ - multiplexed substreams produced an error: {:?}", - err); + for to_remove in to_remove { + shared.connections.remove(&to_remove); + } + + match ret_value { + Some(Ok(val)) => Ok(Async::Ready(future::ok(val))), + Some(Err(err)) => Err(err), + None => { + // TODO: will add an element to the list every time + shared.notify_on_new_connec.push(task::current()); + Ok(Async::NotReady) + }, + } + } +} + +/// Wraps around the `Substream`. +pub struct ConnectionReuseSubstream +where + T: Transport, + C: ConnectionUpgrade, + C::Output: StreamMuxer, +{ + inner: ::Substream, + shared: Arc>>, + /// Id this connection was created from. + connection_id: u64, + /// Address of the remote. + addr: Multiaddr, +} + +impl Deref for ConnectionReuseSubstream +where + T: Transport, + C: ConnectionUpgrade, + C::Output: StreamMuxer, +{ + type Target = ::Substream; + + #[inline] + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +impl DerefMut for ConnectionReuseSubstream +where + T: Transport, + C: ConnectionUpgrade, + C::Output: StreamMuxer, +{ + #[inline] + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.inner + } +} + +impl Read for ConnectionReuseSubstream +where + T: Transport, + C: ConnectionUpgrade, + C::Output: StreamMuxer, +{ + #[inline] + fn read(&mut self, buf: &mut [u8]) -> Result { + self.inner.read(buf) + } +} + +impl AsyncRead for ConnectionReuseSubstream +where + T: Transport, + C: ConnectionUpgrade, + C::Output: StreamMuxer, +{ +} + +impl Write for ConnectionReuseSubstream +where + T: Transport, + C: ConnectionUpgrade, + C::Output: StreamMuxer, +{ + #[inline] + fn write(&mut self, buf: &[u8]) -> Result { + self.inner.write(buf) + } + + #[inline] + fn flush(&mut self) -> Result<(), IoError> { + self.inner.flush() + } +} + +impl AsyncWrite for ConnectionReuseSubstream +where + T: Transport, + C: ConnectionUpgrade, + C::Output: StreamMuxer, +{ + #[inline] + fn shutdown(&mut self) -> Poll<(), IoError> { + self.inner.shutdown() + } +} + +impl Drop for ConnectionReuseSubstream +where + T: Transport, + C: ConnectionUpgrade, + C::Output: StreamMuxer, +{ + fn drop(&mut self) { + // Remove one substream from all the connections whose connection_id matches ours. + let mut shared = self.shared.lock(); + shared.connections.retain(|_, connec| { + if let PeerState::Active { connection_id, ref mut num_substreams, .. } = connec { + if *connection_id == self.connection_id { + *num_substreams -= 1; + if *num_substreams == 0 { + trace!("All substreams to {} closed ; closing main connection", self.addr); + return false; + } } } - } - // Nothing is ready. - Ok(Async::NotReady) + true + }); } } From 4495ba68b2a0841b70d8e2c5356f691e8677b404 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Mon, 23 Jul 2018 12:05:26 +0200 Subject: [PATCH 2/8] Listen to incoming substreams on the listener --- core/src/connection_reuse.rs | 228 +++++++++++++++++++++-------------- 1 file changed, 137 insertions(+), 91 deletions(-) diff --git a/core/src/connection_reuse.rs b/core/src/connection_reuse.rs index 9b0d37f8906..4a6834ac623 100644 --- a/core/src/connection_reuse.rs +++ b/core/src/connection_reuse.rs @@ -90,6 +90,9 @@ where /// Next `connection_id` to use when opening a connection. next_connection_id: u64, + + /// Next `listener_id` for the next listener we create. + next_listener_id: u64, } enum PeerState where M: StreamMuxer { @@ -105,6 +108,9 @@ enum PeerState where M: StreamMuxer { connection_id: u64, /// Number of open substreams. num_substreams: u64, + /// Id of the listener that created this connection, or `None` if it was opened by a + /// dialer. + listener_id: Option, }, /// Connection is pending. @@ -138,6 +144,7 @@ where connections: Default::default(), notify_on_new_connec: Vec::new(), next_connection_id: 0, + next_listener_id: 0, })), } } @@ -160,13 +167,14 @@ where type Dial = ConnectionReuseDial; fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> { - let transport = self.shared.lock().transport.clone(); - let (listener, new_addr) = match transport.listen_on(addr.clone()) { + let mut shared = self.shared.lock(); + + let (listener, new_addr) = match shared.transport.clone().listen_on(addr.clone()) { Ok((l, a)) => (l, a), Err((_, addr)) => { return Err(( ConnectionReuse { - shared: self.shared, + shared: self.shared.clone(), }, addr, )); @@ -182,9 +190,13 @@ where }) .fuse(); + let listener_id = shared.next_listener_id; + shared.next_listener_id += 1; + let listener = ConnectionReuseListener { shared: self.shared.clone(), - listener: listener, + listener, + listener_id, current_upgrades: FuturesUnordered::new(), }; @@ -362,9 +374,9 @@ where }; match mem::replace(&mut *connec, PeerState::Poisonned) { - PeerState::Active { muxer, next_incoming, connection_id, num_substreams, client_addr } => { + PeerState::Active { muxer, next_incoming, connection_id, listener_id, num_substreams, client_addr } => { let first_outbound = muxer.clone().outbound(); - *connec = PeerState::Active { muxer, next_incoming, connection_id, num_substreams, client_addr: client_addr.clone() }; + *connec = PeerState::Active { muxer, next_incoming, connection_id, listener_id, num_substreams, client_addr: client_addr.clone() }; trace!("Using existing connection to {} to open outbound substream", self.addr); self.outbound = Some(ConnectionReuseDialOut { stream: first_outbound, @@ -383,7 +395,7 @@ where let first_outbound = muxer.clone().outbound(); let connection_id = shared.next_connection_id; shared.next_connection_id += 1; - *connec = PeerState::Active { muxer, next_incoming, connection_id, num_substreams: 1, client_addr: client_addr.clone() }; + *connec = PeerState::Active { muxer, next_incoming, connection_id, num_substreams: 1, listener_id: None, client_addr: client_addr.clone() }; self.outbound = Some(ConnectionReuseDialOut { stream: first_outbound, connection_id, @@ -426,6 +438,8 @@ where { /// The main listener. listener: stream::Fuse, + /// Identifier for this listener. Used to determine which connections were opened by it. + listener_id: u64, /// Opened connections that need to be upgraded. current_upgrades: FuturesUnordered>>, @@ -458,19 +472,11 @@ where Ok(Async::NotReady) => break, Ok(Async::Ready(None)) => { debug!("Listener has been closed"); - if self.current_upgrades.is_empty() { - return Ok(Async::Ready(None)); - } else { - break; - } + break; } Err(err) => { debug!("Error while polling listener: {:?}", err); - if self.current_upgrades.is_empty() { - return Err(err); - } else { - break; - } + return Err(err); } }; } @@ -485,21 +491,13 @@ where let next_incoming = muxer.clone().inbound(); let connection_id = shared.next_connection_id; shared.next_connection_id += 1; - let state = PeerState::Active { muxer, next_incoming, connection_id, num_substreams: 1, client_addr: client_addr.clone() }; + let state = PeerState::Active { muxer, next_incoming, connection_id, listener_id: Some(self.listener_id), num_substreams: 1, client_addr: client_addr.clone() }; shared.connections.insert(client_addr, state); for to_notify in shared.notify_on_new_connec.drain(..) { to_notify.notify(); } } - Ok(Async::Ready(None)) => { - // No upgrade remaining ; if the listener is closed, close everything. - if self.listener.is_done() { - return Ok(Async::Ready(None)); - } else { - break; - } - } - Ok(Async::NotReady) => { + Ok(Async::Ready(None)) | Ok(Async::NotReady) => { break; }, Err(err) => { @@ -510,10 +508,26 @@ where } } - // TODO: the listener should also poll the incoming substreams on the active connections - // that it opened, so that the muxed transport doesn't have to do it - - Ok(Async::NotReady) + // Poll all the incoming connections on all the connections we opened. + let mut shared = self.shared.lock(); + match poll_incoming(&self.shared, &mut shared, Some(self.listener_id)) { + Ok(Async::Ready(None)) => { + if self.listener.is_done() && self.current_upgrades.is_empty() { + Ok(Async::Ready(None)) + } else { + Ok(Async::NotReady) + } + }, + Ok(Async::Ready(Some(substream))) => { + Ok(Async::Ready(Some(substream))) + }, + Ok(Async::NotReady) => { + Ok(Async::NotReady) + } + Err(err) => { + Ok(Async::Ready(Some(future::err(err)))) + } + } } } @@ -537,76 +551,108 @@ where type Item = future::FutureResult<(ConnectionReuseSubstream, future::FutureResult), IoError>; type Error = IoError; + #[inline] fn poll(&mut self) -> Poll { let mut shared = self.shared.lock(); - - // Keys of the elements in `shared.connections` to remove afterwards. - let mut to_remove = Vec::new(); - // Substream to return, if any found. - let mut ret_value = None; - - for (addr, state) in shared.connections.iter_mut() { - match *state { - PeerState::Active { ref mut next_incoming, ref muxer, ref mut num_substreams, connection_id, ref client_addr } => { - match next_incoming.poll() { - Ok(Async::Ready(Some(inner))) => { - trace!("New incoming substream from {}", client_addr); - let next = muxer.clone().inbound(); - *next_incoming = next; - *num_substreams += 1; - let substream = ConnectionReuseSubstream { - inner, - shared: self.shared.clone(), - connection_id, - addr: client_addr.clone(), - }; - ret_value = Some(Ok((substream, future::ok(client_addr.clone())))); - break; - }, - Ok(Async::Ready(None)) => { - // The muxer isn't capable of opening any inbound stream anymore, so - // we close the connection entirely. - trace!("Removing existing connection to {} as it cannot open inbound anymore", addr); - to_remove.push(addr.clone()); - }, - Ok(Async::NotReady) => (), - Err(err) => { - // If an error happens while opening an inbound stream, we close the - // connection entirely. - trace!("Error while opening inbound substream to {}: {:?}", addr, err); - to_remove.push(addr.clone()); - ret_value = Some(Err(err)); - break; - }, - } - }, - PeerState::Pending { ref mut notify, .. } => { - // TODO: this will add a new element at each iteration - notify.push(task::current()); - }, - PeerState::Errored(_) => {}, - PeerState::Poisonned => { - panic!("Poisonned peer state"); - }, - } - } - - for to_remove in to_remove { - shared.connections.remove(&to_remove); - } - - match ret_value { - Some(Ok(val)) => Ok(Async::Ready(future::ok(val))), - Some(Err(err)) => Err(err), - None => { + match poll_incoming(&self.shared, &mut shared, None) { + Ok(Async::Ready(Some(substream))) => { + Ok(Async::Ready(substream)) + }, + Ok(Async::Ready(None)) | Ok(Async::NotReady) => { // TODO: will add an element to the list every time shared.notify_on_new_connec.push(task::current()); Ok(Async::NotReady) }, + Err(err) => Err(err) } } } +/// Polls the incoming substreams on all the incoming connections that match the `listener`. +/// +/// Returns `Ready(None)` if no connection is matching the `listener`. Returns `NotReady` if +/// one or more connections are matching the `listener` but they are not ready. +fn poll_incoming(shared_arc: &Arc>>, shared: &mut Shared, listener: Option) + -> Poll, FutureResult), IoError>>, IoError> +where + T: Transport, + C: ConnectionUpgrade, + C::Output: StreamMuxer + Clone, +{ + // Keys of the elements in `shared.connections` to remove afterwards. + let mut to_remove = Vec::new(); + // Substream to return, if any found. + let mut ret_value = None; + let mut found_one = false; + + for (addr, state) in shared.connections.iter_mut() { + match *state { + PeerState::Active { ref mut next_incoming, ref muxer, ref mut num_substreams, connection_id, ref client_addr, listener_id } => { + if listener_id != listener { + continue; + } + found_one = true; + + match next_incoming.poll() { + Ok(Async::Ready(Some(inner))) => { + trace!("New incoming substream from {}", client_addr); + let next = muxer.clone().inbound(); + *next_incoming = next; + *num_substreams += 1; + let substream = ConnectionReuseSubstream { + inner, + shared: shared_arc.clone(), + connection_id, + addr: client_addr.clone(), + }; + ret_value = Some(Ok((substream, future::ok(client_addr.clone())))); + break; + }, + Ok(Async::Ready(None)) => { + // The muxer isn't capable of opening any inbound stream anymore, so + // we close the connection entirely. + trace!("Removing existing connection to {} as it cannot open inbound anymore", addr); + to_remove.push(addr.clone()); + }, + Ok(Async::NotReady) => (), + Err(err) => { + // If an error happens while opening an inbound stream, we close the + // connection entirely. + trace!("Error while opening inbound substream to {}: {:?}", addr, err); + to_remove.push(addr.clone()); + ret_value = Some(Err(err)); + break; + }, + } + }, + PeerState::Pending { ref mut notify, .. } => { + // TODO: this will add a new element at each iteration + notify.push(task::current()); + }, + PeerState::Errored(_) => {}, + PeerState::Poisonned => { + panic!("Poisonned peer state"); + }, + } + } + + for to_remove in to_remove { + shared.connections.remove(&to_remove); + } + + match ret_value { + Some(Ok(val)) => Ok(Async::Ready(Some(future::ok(val)))), + Some(Err(err)) => Err(err), + None => { + if found_one { + Ok(Async::NotReady) + } else { + Ok(Async::Ready(None)) + } + }, + } +} + /// Wraps around the `Substream`. pub struct ConnectionReuseSubstream where From 4c008ea04fd82d03258368a5816993ee5f57497a Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Mon, 23 Jul 2018 15:47:57 +0200 Subject: [PATCH 3/8] Fix dialer not counting as active substream --- core/src/connection_reuse.rs | 58 +++++++++++++++++++++++++----------- 1 file changed, 41 insertions(+), 17 deletions(-) diff --git a/core/src/connection_reuse.rs b/core/src/connection_reuse.rs index 4a6834ac623..f1328ad4487 100644 --- a/core/src/connection_reuse.rs +++ b/core/src/connection_reuse.rs @@ -374,12 +374,13 @@ where }; match mem::replace(&mut *connec, PeerState::Poisonned) { - PeerState::Active { muxer, next_incoming, connection_id, listener_id, num_substreams, client_addr } => { - let first_outbound = muxer.clone().outbound(); + PeerState::Active { muxer, next_incoming, connection_id, listener_id, mut num_substreams, client_addr } => { + let outbound = muxer.clone().outbound(); + num_substreams += 1; *connec = PeerState::Active { muxer, next_incoming, connection_id, listener_id, num_substreams, client_addr: client_addr.clone() }; trace!("Using existing connection to {} to open outbound substream", self.addr); self.outbound = Some(ConnectionReuseDialOut { - stream: first_outbound, + stream: outbound, connection_id, client_addr, }); @@ -429,6 +430,20 @@ where } } +impl Drop for ConnectionReuseDial +where + T: Transport, + C: ConnectionUpgrade, + C::Output: StreamMuxer, +{ + fn drop(&mut self) { + if let Some(outbound) = self.outbound.take() { + let mut shared = self.shared.lock(); + remove_one_substream(&mut *shared, outbound.connection_id, &outbound.client_addr); + } + } +} + /// Implementation of `Stream` for the connections incoming from listening on a specific address. pub struct ConnectionReuseListener where @@ -653,6 +668,28 @@ where } } +/// Removes one substream from an active connection. Closes the connection if necessary. +fn remove_one_substream(shared: &mut Shared, connec_id: u64, addr: &Multiaddr) +where + T: Transport, + C: ConnectionUpgrade, + C::Output: StreamMuxer, +{ + shared.connections.retain(|_, connec| { + if let PeerState::Active { connection_id, ref mut num_substreams, .. } = connec { + if *connection_id == connec_id { + *num_substreams -= 1; + if *num_substreams == 0 { + trace!("All substreams to {} closed ; closing main connection", addr); + return false; + } + } + } + + true + }); +} + /// Wraps around the `Substream`. pub struct ConnectionReuseSubstream where @@ -750,20 +787,7 @@ where C::Output: StreamMuxer, { fn drop(&mut self) { - // Remove one substream from all the connections whose connection_id matches ours. let mut shared = self.shared.lock(); - shared.connections.retain(|_, connec| { - if let PeerState::Active { connection_id, ref mut num_substreams, .. } = connec { - if *connection_id == self.connection_id { - *num_substreams -= 1; - if *num_substreams == 0 { - trace!("All substreams to {} closed ; closing main connection", self.addr); - return false; - } - } - } - - true - }); + remove_one_substream(&mut *shared, self.connection_id, &self.addr); } } From 583ef9c1afa56e27876ec0b0a762b54f3e0eb061 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Wed, 25 Jul 2018 14:46:11 +0200 Subject: [PATCH 4/8] Rewrite multiplexing tests to use MemoryTransport --- core/tests/multiplex.rs | 74 +++++++++++++++-------------------------- 1 file changed, 26 insertions(+), 48 deletions(-) diff --git a/core/tests/multiplex.rs b/core/tests/multiplex.rs index dbb4f123134..4c0c9210551 100644 --- a/core/tests/multiplex.rs +++ b/core/tests/multiplex.rs @@ -29,9 +29,8 @@ extern crate tokio_io; use bytes::BytesMut; use futures::future::Future; use futures::{Sink, Stream}; -use libp2p_core::{Multiaddr, MuxedTransport, StreamMuxer, Transport}; -use libp2p_tcp_transport::TcpConfig; -use std::sync::{atomic, mpsc}; +use libp2p_core::{Multiaddr, MuxedTransport, StreamMuxer, Transport, transport}; +use std::sync::atomic; use std::thread; use tokio_io::codec::length_delimited::Framed; @@ -74,19 +73,14 @@ fn client_to_server_outbound() { // A client opens a connection to a server, then an outgoing substream, then sends a message // on that substream. - let (tx, rx) = mpsc::channel(); + let (tx, rx) = transport::connector(); let bg_thread = thread::spawn(move || { - let transport = TcpConfig::new() + let future = rx .with_upgrade(multiplex::MplexConfig::new()) - .into_connection_reuse(); - - let (listener, addr) = transport - .listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()) - .unwrap_or_else(|_| panic!()); - tx.send(addr).unwrap(); - - let future = listener + .into_connection_reuse() + .listen_on("/memory".parse().unwrap()) + .unwrap_or_else(|_| panic!()).0 .into_future() .map_err(|(err, _)| err) .and_then(|(client, _)| client.unwrap()) @@ -107,11 +101,10 @@ fn client_to_server_outbound() { tokio_current_thread::block_on_all(future).unwrap(); }); - let transport = TcpConfig::new().with_upgrade(multiplex::MplexConfig::new()); - - let future = transport - .dial(rx.recv().unwrap()) - .unwrap() + let future = tx + .with_upgrade(multiplex::MplexConfig::new()) + .dial("/memory".parse().unwrap()) + .unwrap_or_else(|_| panic!()) .and_then(|client| client.0.outbound()) .map(|server| Framed::<_, BytesMut>::new(server.unwrap())) .and_then(|server| server.send("hello world".into())) @@ -126,19 +119,14 @@ fn connection_reused_for_dialing() { // A client dials the same multiaddress twice in a row. We check that it uses two substreams // instead of opening two different connections. - let (tx, rx) = mpsc::channel(); + let (tx, rx) = transport::connector(); let bg_thread = thread::spawn(move || { - let transport = OnlyOnce::from(TcpConfig::new()) + let future = OnlyOnce::from(rx) .with_upgrade(multiplex::MplexConfig::new()) - .into_connection_reuse(); - - let (listener, addr) = transport - .listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()) - .unwrap_or_else(|_| panic!()); - tx.send(addr).unwrap(); - - let future = listener + .into_connection_reuse() + .listen_on("/memory".parse().unwrap()) + .unwrap_or_else(|_| panic!()).0 .into_future() .map_err(|(err, _)| err) .and_then(|(client, rest)| client.unwrap().map(move |c| (c.0, rest))) @@ -170,22 +158,20 @@ fn connection_reused_for_dialing() { tokio_current_thread::block_on_all(future).unwrap(); }); - let transport = OnlyOnce::from(TcpConfig::new()) + let transport = OnlyOnce::from(tx) .with_upgrade(multiplex::MplexConfig::new()) .into_connection_reuse(); - let listen_addr = rx.recv().unwrap(); - let future = transport .clone() - .dial(listen_addr.clone()) + .dial("/memory".parse().unwrap()) .unwrap_or_else(|_| panic!()) .map(|server| Framed::<_, BytesMut>::new(server.0)) .and_then(|server| server.send("hello world".into())) .and_then(|first_connec| { transport .clone() - .dial(listen_addr.clone()) + .dial("/memory".parse().unwrap()) .unwrap_or_else(|_| panic!()) .map(|server| Framed::<_, BytesMut>::new(server.0)) .map(|server| (first_connec, server)) @@ -203,19 +189,13 @@ fn use_opened_listen_to_dial() { // substream on that same connection, that the client has to accept. The client then sends a // message on that new substream. - let (tx, rx) = mpsc::channel(); + let (tx, rx) = transport::connector(); let bg_thread = thread::spawn(move || { - let transport = OnlyOnce::from(TcpConfig::new()) - .with_upgrade(multiplex::MplexConfig::new()); - - let (listener, addr) = transport - .clone() - .listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()) - .unwrap_or_else(|_| panic!()); - tx.send(addr).unwrap(); - - let future = listener + let future = OnlyOnce::from(rx) + .with_upgrade(multiplex::MplexConfig::new()) + .listen_on("/memory".parse().unwrap()) + .unwrap_or_else(|_| panic!()).0 .into_future() .map_err(|(err, _)| err) .and_then(|(client, _)| client.unwrap()) @@ -247,15 +227,13 @@ fn use_opened_listen_to_dial() { tokio_current_thread::block_on_all(future).unwrap(); }); - let transport = OnlyOnce::from(TcpConfig::new()) + let transport = OnlyOnce::from(tx) .with_upgrade(multiplex::MplexConfig::new()) .into_connection_reuse(); - let listen_addr = rx.recv().unwrap(); - let future = transport .clone() - .dial(listen_addr.clone()) + .dial("/memory".parse().unwrap()) .unwrap_or_else(|_| panic!()) .map(|server| Framed::<_, BytesMut>::new(server.0)) .and_then(|server| server.send("hello world".into())) From 2a45393e1556b95e32c300ace2d61e3d7b5c8df8 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Thu, 26 Jul 2018 13:17:12 +0200 Subject: [PATCH 5/8] Fix leak --- core/src/connection_reuse.rs | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/core/src/connection_reuse.rs b/core/src/connection_reuse.rs index f1328ad4487..f5a5c7282e1 100644 --- a/core/src/connection_reuse.rs +++ b/core/src/connection_reuse.rs @@ -50,7 +50,7 @@ use std::collections::hash_map::Entry; use std::io::{Error as IoError, ErrorKind as IoErrorKind, Read, Write}; use std::mem; use std::ops::{Deref, DerefMut}; -use std::sync::Arc; +use std::sync::{Arc, atomic::AtomicUsize, atomic::Ordering}; use tokio_io::{AsyncRead, AsyncWrite}; use transport::{MuxedTransport, Transport, UpgradedNode}; use upgrade::ConnectionUpgrade; @@ -86,7 +86,7 @@ where connections: FnvHashMap>, /// Tasks to notify when one or more new elements were added to `connections`. - notify_on_new_connec: Vec, + notify_on_new_connec: FnvHashMap, /// Next `connection_id` to use when opening a connection. next_connection_id: u64, @@ -142,7 +142,7 @@ where shared: Arc::new(Mutex::new(Shared { transport: node, connections: Default::default(), - notify_on_new_connec: Vec::new(), + notify_on_new_connec: Default::default(), next_connection_id: 0, next_listener_id: 0, })), @@ -365,8 +365,8 @@ where }, }; - for task in shared.notify_on_new_connec.drain(..) { - task.notify(); + for task in shared.notify_on_new_connec.drain() { + task.1.notify(); } e.insert(state) @@ -508,8 +508,8 @@ where shared.next_connection_id += 1; let state = PeerState::Active { muxer, next_incoming, connection_id, listener_id: Some(self.listener_id), num_substreams: 1, client_addr: client_addr.clone() }; shared.connections.insert(client_addr, state); - for to_notify in shared.notify_on_new_connec.drain(..) { - to_notify.notify(); + for to_notify in shared.notify_on_new_connec.drain() { + to_notify.1.notify(); } } Ok(Async::Ready(None)) | Ok(Async::NotReady) => { @@ -575,7 +575,11 @@ where }, Ok(Async::Ready(None)) | Ok(Async::NotReady) => { // TODO: will add an element to the list every time - shared.notify_on_new_connec.push(task::current()); + static NEXT_TASK_ID: AtomicUsize = AtomicUsize::new(0); + task_local!{ + static TASK_ID: usize = NEXT_TASK_ID.fetch_add(1, Ordering::Relaxed) + } + shared.notify_on_new_connec.insert(TASK_ID.with(|&v| v), task::current()); Ok(Async::NotReady) }, Err(err) => Err(err) From 87b8d75b0c3fcbd7350c2b10e43a89355f8791c0 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Sat, 28 Jul 2018 14:33:27 +0200 Subject: [PATCH 6/8] Fix potentially using tons of memory --- core/src/connection_reuse.rs | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/core/src/connection_reuse.rs b/core/src/connection_reuse.rs index f5a5c7282e1..88b950676b9 100644 --- a/core/src/connection_reuse.rs +++ b/core/src/connection_reuse.rs @@ -119,7 +119,7 @@ enum PeerState where M: StreamMuxer { /// Future that produces the muxer. future: Box>, /// All the tasks to notify when `future` resolves. - notify: Vec, + notify: FnvHashMap, }, /// An earlier connection attempt errored. @@ -255,6 +255,12 @@ where } } +static NEXT_TASK_ID: AtomicUsize = AtomicUsize::new(0); +// `TASK_ID` is used internally to uniquely identify each task. +task_local!{ + static TASK_ID: usize = NEXT_TASK_ID.fetch_add(1, Ordering::Relaxed) +} + /// Implementation of `Future` for dialing a node. pub struct ConnectionReuseDial where @@ -356,7 +362,7 @@ where trace!("Opened new connection to {:?}", self.addr); let future = future.and_then(|(out, addr)| addr.map(move |a| (out, a))); let future = Box::new(future); - PeerState::Pending { future, notify: Vec::new() } + PeerState::Pending { future, notify: Default::default() } }, Err(_) => { trace!("Failed to open connection to {:?}, multiaddr not supported", self.addr); @@ -390,7 +396,7 @@ where Ok(Async::Ready((muxer, client_addr))) => { trace!("Successful new connection to {} ({})", self.addr, client_addr); for task in notify { - task.notify(); + task.1.notify(); } let next_incoming = muxer.clone().inbound(); let first_outbound = muxer.clone().outbound(); @@ -404,7 +410,7 @@ where }); }, Ok(Async::NotReady) => { - notify.push(task::current()); + notify.insert(TASK_ID.with(|&t| t), task::current()); *connec = PeerState::Pending { future, notify }; return Ok(Async::NotReady); }, @@ -575,10 +581,6 @@ where }, Ok(Async::Ready(None)) | Ok(Async::NotReady) => { // TODO: will add an element to the list every time - static NEXT_TASK_ID: AtomicUsize = AtomicUsize::new(0); - task_local!{ - static TASK_ID: usize = NEXT_TASK_ID.fetch_add(1, Ordering::Relaxed) - } shared.notify_on_new_connec.insert(TASK_ID.with(|&v| v), task::current()); Ok(Async::NotReady) }, @@ -645,8 +647,7 @@ where } }, PeerState::Pending { ref mut notify, .. } => { - // TODO: this will add a new element at each iteration - notify.push(task::current()); + notify.insert(TASK_ID.with(|&t| t), task::current()); }, PeerState::Errored(_) => {}, PeerState::Poisonned => { From 8c83d17f2a6bdee03814a1ccd70fd10a178bcd99 Mon Sep 17 00:00:00 2001 From: Benjamin Kampmann Date: Wed, 1 Aug 2018 15:35:04 +0200 Subject: [PATCH 7/8] Refactor to use Shared in a centralised manner --- core/src/connection_reuse.rs | 707 ++++++++++++++++++---------------- core/src/transport/upgrade.rs | 4 +- 2 files changed, 379 insertions(+), 332 deletions(-) diff --git a/core/src/connection_reuse.rs b/core/src/connection_reuse.rs index 88b950676b9..dcf911ae1cf 100644 --- a/core/src/connection_reuse.rs +++ b/core/src/connection_reuse.rs @@ -46,7 +46,6 @@ use futures::stream::FuturesUnordered; use multiaddr::Multiaddr; use muxing::StreamMuxer; use parking_lot::Mutex; -use std::collections::hash_map::Entry; use std::io::{Error as IoError, ErrorKind as IoErrorKind, Read, Write}; use std::mem; use std::ops::{Deref, DerefMut}; @@ -55,62 +54,37 @@ use tokio_io::{AsyncRead, AsyncWrite}; use transport::{MuxedTransport, Transport, UpgradedNode}; use upgrade::ConnectionUpgrade; +use std::clone::Clone; + /// Allows reusing the same muxed connection multiple times. /// /// Can be created from an `UpgradedNode` through the `From` trait. /// /// Implements the `Transport` trait. -#[derive(Clone)] pub struct ConnectionReuse where T: Transport, - C: ConnectionUpgrade, - C::Output: StreamMuxer, + C: ConnectionUpgrade + Clone, + C::Output: StreamMuxer + Clone, { /// Struct shared between most of the `ConnectionReuse` infrastructure. - shared: Arc>>, + shared: Arc>, } -/// Struct shared between most of the `ConnectionReuse` infrastructure. -struct Shared -where - T: Transport, - C: ConnectionUpgrade, - C::Output: StreamMuxer, -{ - /// Underlying transport and connection upgrade, used when we need to dial or listen. - transport: UpgradedNode, - - /// All the connections that were opened, whether successful and/or active or not. - // TODO: this will grow forever - connections: FnvHashMap>, - - /// Tasks to notify when one or more new elements were added to `connections`. - notify_on_new_connec: FnvHashMap, - - /// Next `connection_id` to use when opening a connection. - next_connection_id: u64, - - /// Next `listener_id` for the next listener we create. - next_listener_id: u64, -} - -enum PeerState where M: StreamMuxer { +enum PeerState where M: StreamMuxer + Clone { /// Connection is active and can be used to open substreams. Active { /// The muxer to open new substreams. muxer: M, - /// Next incoming substream. - next_incoming: M::InboundSubstream, /// Future of the address of the client. client_addr: Multiaddr, /// Unique identifier for this connection in the `ConnectionReuse`. - connection_id: u64, + connection_id: usize, /// Number of open substreams. - num_substreams: u64, + num_substreams: usize, /// Id of the listener that created this connection, or `None` if it was opened by a /// dialer. - listener_id: Option, + listener_id: Option, }, /// Connection is pending. @@ -120,32 +94,189 @@ enum PeerState where M: StreamMuxer { future: Box>, /// All the tasks to notify when `future` resolves. notify: FnvHashMap, + /// Unique identifier for this connection in the `ConnectionReuse`. + connection_id: usize, }, /// An earlier connection attempt errored. - Errored(IoError), + Errored(IoError) +} + + +/// Struct shared between most of the `ConnectionReuse` infrastructure. +// #[derive(Clone)] +struct Shared +where + T: Transport, + C: ConnectionUpgrade, + C::Output: StreamMuxer + Clone, +{ + /// Underlying transport and connection upgrade, used when we need to dial or listen. + transport: UpgradedNode, - /// The `PeerState` is poisonned. Happens if a panic happened while executing some of the - /// functions. - Poisonned, + /// All the connections that were opened, whether successful and/or active or not. + // TODO: this will grow forever + connections: Mutex>>, + + /// Tasks to notify when one or more new elements were added to `connections`. + notify_on_new_connec: Mutex>, + + /// Next `connection_id` to use when opening a connection. + next_connection_id: AtomicUsize, + + /// Next `listener_id` for the next listener we create. + next_listener_id: AtomicUsize, } -impl From> for ConnectionReuse +impl Shared where T: Transport, C: ConnectionUpgrade, - C::Output: StreamMuxer, + C::Output: StreamMuxer + Clone, +{ + /// notify all `Tasks` in `self.notify_on_new_connec` that a new connection was established + /// consume the tasks in that process + fn new_con_notify(&self) { + let mut notifiers = self.notify_on_new_connec.lock(); + for to_notify in notifiers.drain() { + to_notify.1.notify(); + }; + } + + /// generate a new connection id + #[inline] + pub fn gen_next_connection_id(&self) -> usize { + self.next_connection_id.fetch_add(1, Ordering::Relaxed) + } + + /// generate a new listener id + #[inline] + pub fn gen_next_listener_id(&self) -> usize { + self.next_listener_id.fetch_add(1, Ordering::Relaxed) + } + + /// Insert a new connection, returns Some if an entry was present, notify listeners + pub fn insert_connection(&self, addr: Multiaddr, state: PeerState) + -> Option> { + let r = self.replace_connection(addr, state); + self.new_con_notify(); + r + } + + /// Replace a new connection, returns Some if an entry was present, **do not** notify listeners + pub fn replace_connection(&self, addr: Multiaddr, state: PeerState) + -> Option> { + self.connections.lock().insert(addr, state) + } + + /// Removes one substream from an active connection. Closes the connection if necessary. + pub fn remove_substream(&self, connec_id: usize, addr: &Multiaddr) { + self.connections.lock().retain(|_, connec| { + if let PeerState::Active { connection_id, ref mut num_substreams, .. } = connec { + if *connection_id == connec_id { + *num_substreams -= 1; + if *num_substreams == 0 { + trace!("All substreams to {} closed ; closing main connection", addr); + return false; + } + } + } + true + }); + } + + /// Polls the incoming substreams on all the incoming connections that match the `listener`. + /// + /// Returns `Ready(None)` if no connection is matching the `listener`. Returns `NotReady` if + /// one or more connections are matching the `listener` but they are not ready. + fn poll_incoming(&self, listener: Option) + -> Poll::Substream, usize, Multiaddr)>, IoError> + { + // Keys of the elements in `connections` to remove afterwards. + let mut to_remove = Vec::new(); + // Substream to return, if any found. + let mut ret_value = None; + let mut found_one = false; + + for (addr, state) in self.connections.lock().iter_mut() { + let res = if let PeerState::Active { ref mut muxer, ref mut num_substreams, + connection_id, client_addr, listener_id } = state { + if *listener_id == listener { + continue; + } + found_one = true; + + match muxer.clone().inbound().poll() { + Ok(Async::Ready(Some(inner))) => { + trace!("New incoming substream from {}", client_addr); + *num_substreams += 1; + Ok((connection_id.clone(), inner, client_addr.clone())) + }, + Err(err) => { + // If an error happens while opening an inbound stream, we close the + // connection entirely. + trace!("Error while opening inbound substream to {}: {:?}", addr, err); + to_remove.push(addr.clone()); + Err(err) + }, + Ok(Async::Ready(None)) => { + // The muxer isn't capable of opening any inbound stream anymore, so + // we close the connection entirely. + trace!("Removing existing connection to {} as it cannot open inbound anymore", addr); + to_remove.push(addr.clone()); + continue + }, + Ok(Async::NotReady) => { continue } + } + } else { + if listener.is_none() { + if let PeerState::Pending { ref mut notify, .. } = state { + notify.insert(TASK_ID.with(|&t| t), task::current()); + } + } + continue + }; + + ret_value = Some(res); + break; + } + + for to_remove in to_remove { + self.connections.lock().remove(&to_remove); + } + + match ret_value { + Some(Ok((connection_id, inner, addr))) => + Ok(Async::Ready(Some((inner, connection_id, addr)))), + Some(Err(err)) => Err(err), + None => { + if found_one { + Ok(Async::NotReady) + } else { + Ok(Async::Ready(None)) + } + } + } + } +} + + +impl From> for ConnectionReuse +where + T: Transport, + C: ConnectionUpgrade + Clone, + C::Output: StreamMuxer + Clone, { #[inline] fn from(node: UpgradedNode) -> ConnectionReuse { ConnectionReuse { - shared: Arc::new(Mutex::new(Shared { + shared: Arc::new(Shared { transport: node, connections: Default::default(), notify_on_new_connec: Default::default(), - next_connection_id: 0, - next_listener_id: 0, - })), + next_connection_id: Default::default(), + next_listener_id: Default::default(), + }), } } } @@ -156,6 +287,7 @@ where T::Output: AsyncRead + AsyncWrite, C: ConnectionUpgrade + Clone + 'static, // TODO: 'static :( C::Output: StreamMuxer + Clone, + ::Substream : Clone, C::MultiaddrFuture: Future, C::NamesIter: Clone, UpgradedNode: Clone, @@ -167,14 +299,13 @@ where type Dial = ConnectionReuseDial; fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> { - let mut shared = self.shared.lock(); - let (listener, new_addr) = match shared.transport.clone().listen_on(addr.clone()) { + let (listener, new_addr) = match self.shared.transport.clone().listen_on(addr.clone()) { Ok((l, a)) => (l, a), Err((_, addr)) => { return Err(( ConnectionReuse { - shared: self.shared.clone(), + shared: self.shared, }, addr, )); @@ -190,11 +321,10 @@ where }) .fuse(); - let listener_id = shared.next_listener_id; - shared.next_listener_id += 1; + let listener_id = self.shared.gen_next_listener_id(); let listener = ConnectionReuseListener { - shared: self.shared.clone(), + shared: self.shared, listener, listener_id, current_upgrades: FuturesUnordered::new(), @@ -205,11 +335,10 @@ where #[inline] fn dial(self, addr: Multiaddr) -> Result { - let mut shared = self.shared.lock(); // If an earlier attempt to dial this multiaddress failed, we clear the error. Otherwise // the returned `Future` will immediately produce the error. - let must_clear = match shared.connections.get(&addr) { + let must_clear = match self.shared.connections.lock().get(&addr) { Some(&PeerState::Errored(ref err)) => { trace!("Clearing existing connection to {} which errored earlier: {:?}", addr, err); true @@ -217,19 +346,19 @@ where _ => false, }; if must_clear { - shared.connections.remove(&addr); + self.shared.connections.lock().remove(&addr); } Ok(ConnectionReuseDial { outbound: None, - shared: self.shared.clone(), + shared: self.shared, addr, }) } #[inline] fn nat_traversal(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { - self.shared.lock().transport.transport().nat_traversal(server, observed) + self.shared.transport.transport().nat_traversal(server, observed) } } @@ -239,6 +368,7 @@ where T::Output: AsyncRead + AsyncWrite, C: ConnectionUpgrade + Clone + 'static, // TODO: 'static :( C::Output: StreamMuxer + Clone, + ::Substream : Clone, C::MultiaddrFuture: Future, C::NamesIter: Clone, UpgradedNode: Clone, @@ -250,7 +380,7 @@ where #[inline] fn next_incoming(self) -> Self::Incoming { ConnectionReuseIncoming { - shared: self.shared.clone(), + shared: self.shared, } } } @@ -266,7 +396,7 @@ pub struct ConnectionReuseDial where T: Transport, C: ConnectionUpgrade, - C::Output: StreamMuxer, + C::Output: StreamMuxer + Clone, { /// The future that will construct the substream, the connection id the muxer comes from, and /// the `Future` of the client's multiaddr. @@ -274,7 +404,7 @@ where outbound: Option>, // Shared between the whole connection reuse mechanism. - shared: Arc>>, + shared: Arc>, // The address we're trying to dial. addr: Multiaddr, @@ -284,12 +414,12 @@ struct ConnectionReuseDialOut where T: Transport, C: ConnectionUpgrade, - C::Output: StreamMuxer, + C::Output: StreamMuxer + Clone, { /// The pending outbound substream. stream: ::OutboundSubstream, /// Id of the connection that was used to create the substream. - connection_id: u64, + connection_id: usize, /// Address of the remote. client_addr: Multiaddr, } @@ -299,6 +429,7 @@ where T: Transport, C: ConnectionUpgrade, C::Output: StreamMuxer + Clone + 'static, + ::Substream : Clone, UpgradedNode: Transport + Clone, as Transport>::Dial: 'static, as Transport>::MultiaddrFuture: 'static, @@ -308,129 +439,139 @@ where fn poll(&mut self) -> Poll { loop { - let should_kill_existing_muxer; - if let Some(mut outbound) = self.outbound.take() { - match outbound.stream.poll() { - Ok(Async::Ready(Some(inner))) => { - trace!("Opened new outgoing substream to {}", self.addr); - let substream = ConnectionReuseSubstream { - connection_id: outbound.connection_id, - shared: self.shared.clone(), - inner, - addr: outbound.client_addr.clone(), - }; - return Ok(Async::Ready((substream, future::ok(outbound.client_addr)))); - }, - Ok(Async::NotReady) => { - self.outbound = Some(outbound); - return Ok(Async::NotReady); - }, - Ok(Async::Ready(None)) => { - // The muxer can no longer produce outgoing substreams. - // Let's reopen a connection. - trace!("Closing existing connection to {} ; can't produce outgoing substreams", self.addr); - should_kill_existing_muxer = true; - }, - Err(err) => { - // If we get an error while opening a substream, we decide to ignore it - // and open a new muxer. - // If opening the muxer produces an error, *then* we will return it. - debug!("Error while opening outgoing substream to {}: {:?}", self.addr, err); - should_kill_existing_muxer = true; - }, - } - } else { - should_kill_existing_muxer = false; - } - - // If we reach this point, that means we have to fill `self.outbound`. - // If `should_kill_existing_muxer`, do not use any existing connection but create a - // new one instead. - let mut shared = self.shared.lock(); - let shared = &mut *shared; // Avoids borrow errors - - // TODO: could be optimized - if should_kill_existing_muxer { - shared.connections.remove(&self.addr); - } - let connec = match shared.connections.entry(self.addr.clone()) { - Entry::Occupied(e) => e.into_mut(), - Entry::Vacant(e) => { - // Build the connection. - let state = match shared.transport.clone().dial(self.addr.clone()) { - Ok(future) => { - trace!("Opened new connection to {:?}", self.addr); - let future = future.and_then(|(out, addr)| addr.map(move |a| (out, a))); - let future = Box::new(future); - PeerState::Pending { future, notify: Default::default() } + let should_kill_existing_muxer = match self.outbound.take() { + Some(mut outbound) => { + match outbound.stream.poll() { + Ok(Async::Ready(Some(inner))) => { + trace!("Opened new outgoing substream to {}", self.addr); + let shared = self.shared.clone(); + return Ok(Async::Ready((ConnectionReuseSubstream { + connection_id: outbound.connection_id, + inner, + shared, + addr: outbound.client_addr.clone(), + }, future::ok(outbound.client_addr)))); }, - Err(_) => { - trace!("Failed to open connection to {:?}, multiaddr not supported", self.addr); - let err = IoError::new(IoErrorKind::ConnectionRefused, "multiaddr not supported"); - PeerState::Errored(err) + Ok(Async::NotReady) => { + self.outbound = Some(outbound); + return Ok(Async::NotReady); + }, + Ok(Async::Ready(None)) => { + // The muxer can no longer produce outgoing substreams. + // Let's reopen a connection. + trace!("Closing existing connection to {} ; can't produce outgoing substreams", self.addr); + true + }, + Err(err) => { + // If we get an error while opening a substream, we decide to ignore it + // and open a new muxer. + // If opening the muxer produces an error, *then* we will return it. + debug!("Error while opening outgoing substream to {}: {:?}", self.addr, err); + true }, - }; - - for task in shared.notify_on_new_connec.drain() { - task.1.notify(); } - - e.insert(state) }, + _ => false }; - match mem::replace(&mut *connec, PeerState::Poisonned) { - PeerState::Active { muxer, next_incoming, connection_id, listener_id, mut num_substreams, client_addr } => { - let outbound = muxer.clone().outbound(); - num_substreams += 1; - *connec = PeerState::Active { muxer, next_incoming, connection_id, listener_id, num_substreams, client_addr: client_addr.clone() }; - trace!("Using existing connection to {} to open outbound substream", self.addr); - self.outbound = Some(ConnectionReuseDialOut { - stream: outbound, - connection_id, - client_addr, - }); - }, - PeerState::Pending { mut future, mut notify } => { - match future.poll() { - Ok(Async::Ready((muxer, client_addr))) => { - trace!("Successful new connection to {} ({})", self.addr, client_addr); - for task in notify { - task.1.notify(); - } - let next_incoming = muxer.clone().inbound(); - let first_outbound = muxer.clone().outbound(); - let connection_id = shared.next_connection_id; - shared.next_connection_id += 1; - *connec = PeerState::Active { muxer, next_incoming, connection_id, num_substreams: 1, listener_id: None, client_addr: client_addr.clone() }; + + let is_empty = { + let mut conns = self.shared.connections.lock(); + if should_kill_existing_muxer { + conns.remove(&self.addr); + true + + } else { + let item = conns.get(&self.addr); + + // error'ed, quit early + if let Some(PeerState::Errored(err)) = item { + trace!("Existing new connection to {} errored earlier: {:?}", self.addr, err); + let io_err = IoError::new(err.kind(), err.to_string()); + return Err(io_err) + }; + + item.is_none() + } + }; + + if is_empty { + // dial new and return early + let state = match self.shared.transport.clone().dial(self.addr.clone()) { + Ok(future) => { + trace!("Opened new connection to {:?}", self.addr); + let future = future.and_then(|(out, addr)| addr.map(move |a| (out, a))); + let future = Box::new(future); + let connection_id = self.shared.gen_next_connection_id(); + PeerState::Pending { future, notify: Default::default(), connection_id } + }, + Err(_) => { + trace!("Failed to open connection to {:?}, multiaddr not supported", self.addr); + let err = IoError::new(IoErrorKind::ConnectionRefused, "multiaddr not supported"); + PeerState::Errored(err) + }, + }; + + self.shared.insert_connection(self.addr.clone(), state); + return Ok(Async::NotReady); + } + + let mut conn = self.shared.connections.lock(); + if let Some(state) = conn.get_mut(&self.addr) { + + let replace_with = { + match state { + PeerState::Active { muxer, connection_id, client_addr, ref mut num_substreams, .. } => { + trace!("Using existing connection to {} to open outbound substream", self.addr); self.outbound = Some(ConnectionReuseDialOut { - stream: first_outbound, - connection_id, - client_addr, + stream: muxer.clone().outbound(), + connection_id: connection_id.clone(), + client_addr: client_addr.clone(), }); + + *num_substreams += 1; + None }, - Ok(Async::NotReady) => { - notify.insert(TASK_ID.with(|&t| t), task::current()); - *connec = PeerState::Pending { future, notify }; - return Ok(Async::NotReady); - }, - Err(err) => { - trace!("Failed new connection to {}: {:?}", self.addr, err); - let io_err = IoError::new(err.kind(), err.to_string()); - *connec = PeerState::Errored(err); - return Err(io_err); + PeerState::Pending { ref mut future, ref mut notify, connection_id } => { + match future.poll() { + Ok(Async::Ready((muxer, client_addr))) => { + trace!("Successful new connection to {} ({})", self.addr, client_addr); + for task in notify { + task.1.notify(); + } + let first_outbound = muxer.clone().outbound(); + self.outbound = Some(ConnectionReuseDialOut { + stream: first_outbound, + connection_id: connection_id.clone(), + client_addr: client_addr.clone(), + }); + + Some(PeerState::Active { muxer, connection_id: connection_id.clone(), + num_substreams: 1, listener_id: None, + client_addr }) + }, + Ok(Async::NotReady) => { + notify.insert(TASK_ID.with(|&t| t), task::current()); + return Ok(Async::NotReady); + }, + Err(err) => { + trace!("Failed new connection to {}: {:?}", self.addr, err); + Some(PeerState::Errored(err)) + }, + } }, + _ => { + // Because of a race someone might has changed the Peerstate between + // this get_mut and the previous lock. But no harm for us, we can just + // loop on and will find it soon enough. + continue + } } - }, - PeerState::Errored(err) => { - trace!("Existing new connection to {} errored earlier: {:?}", self.addr, err); - let io_err = IoError::new(err.kind(), err.to_string()); - *connec = PeerState::Errored(err); - return Err(io_err); - }, - PeerState::Poisonned => { - panic!("Poisonned peer state"); - }, + }; + + if let Some(new_state) = replace_with { + mem::replace(&mut *state, new_state); + } } } } @@ -440,12 +581,12 @@ impl Drop for ConnectionReuseDial where T: Transport, C: ConnectionUpgrade, - C::Output: StreamMuxer, + C::Output: StreamMuxer + Clone, { fn drop(&mut self) { + let shared = &self.shared; if let Some(outbound) = self.outbound.take() { - let mut shared = self.shared.lock(); - remove_one_substream(&mut *shared, outbound.connection_id, &outbound.client_addr); + shared.remove_substream(outbound.connection_id, &outbound.client_addr); } } } @@ -455,17 +596,17 @@ pub struct ConnectionReuseListener where T: Transport, C: ConnectionUpgrade, - C::Output: StreamMuxer, + C::Output: StreamMuxer + Clone, { /// The main listener. listener: stream::Fuse, /// Identifier for this listener. Used to determine which connections were opened by it. - listener_id: u64, + listener_id: usize, /// Opened connections that need to be upgraded. current_upgrades: FuturesUnordered>>, /// Shared between the whole connection reuse mechanism. - shared: Arc>>, + shared: Arc>, } impl Stream for ConnectionReuseListener @@ -474,10 +615,12 @@ where T::Output: AsyncRead + AsyncWrite, C: ConnectionUpgrade, C::Output: StreamMuxer + Clone, + ::Substream : Clone, L: Stream, Lu: Future + 'static, { - type Item = FutureResult<(ConnectionReuseSubstream, FutureResult), IoError>; + type Item = FutureResult<(ConnectionReuseSubstream, + FutureResult), IoError>; type Error = IoError; fn poll(&mut self) -> Poll, Self::Error> { @@ -504,20 +647,8 @@ where // Process the connections being upgraded. loop { - match self.current_upgrades.poll() { - Ok(Async::Ready(Some((muxer, client_addr)))) => { - // Successfully upgraded a new incoming connection. - trace!("New multiplexed connection from {}", client_addr); - let mut shared = self.shared.lock(); - let next_incoming = muxer.clone().inbound(); - let connection_id = shared.next_connection_id; - shared.next_connection_id += 1; - let state = PeerState::Active { muxer, next_incoming, connection_id, listener_id: Some(self.listener_id), num_substreams: 1, client_addr: client_addr.clone() }; - shared.connections.insert(client_addr, state); - for to_notify in shared.notify_on_new_connec.drain() { - to_notify.1.notify(); - } - } + let (muxer, client_addr) = match self.current_upgrades.poll() { + Ok(Async::Ready(Some((muxer, client_addr)))) => (muxer, client_addr), Ok(Async::Ready(None)) | Ok(Async::NotReady) => { break; }, @@ -526,12 +657,24 @@ where debug!("Error while upgrading listener connection: {:?}", err); return Ok(Async::Ready(Some(future::err(err)))); } - } + }; + + + // Successfully upgraded a new incoming connection. + trace!("New multiplexed connection from {}", client_addr); + let connection_id = self.shared.gen_next_connection_id(); + + + self.shared.insert_connection(client_addr.clone(), + PeerState::Active { muxer, connection_id, + listener_id: Some(self.listener_id), + num_substreams: 1, + client_addr: client_addr }); } // Poll all the incoming connections on all the connections we opened. - let mut shared = self.shared.lock(); - match poll_incoming(&self.shared, &mut shared, Some(self.listener_id)) { + match self.shared.poll_incoming(Some(self.listener_id)) { + Ok(Async::NotReady) => Ok(Async::NotReady), Ok(Async::Ready(None)) => { if self.listener.is_done() && self.current_upgrades.is_empty() { Ok(Async::Ready(None)) @@ -539,14 +682,16 @@ where Ok(Async::NotReady) } }, - Ok(Async::Ready(Some(substream))) => { - Ok(Async::Ready(Some(substream))) - }, - Ok(Async::NotReady) => { - Ok(Async::NotReady) - } + Ok(Async::Ready(Some((inner, connection_id, addr)))) => + Ok(Async::Ready(Some(future::ok(( + ConnectionReuseSubstream { + inner: inner.clone(), + shared: self.shared.clone(), + connection_id: connection_id.clone(), + addr: addr.clone(), + }, future::ok(addr.clone())))))), Err(err) => { - Ok(Async::Ready(Some(future::err(err)))) + Ok(Async::Ready(Some(future::err(IoError::new(err.kind(), err.to_string()))))) } } } @@ -557,31 +702,38 @@ pub struct ConnectionReuseIncoming where T: Transport, C: ConnectionUpgrade, - C::Output: StreamMuxer, + C::Output: StreamMuxer + Clone, { // Shared between the whole connection reuse system. - shared: Arc>>, + shared: Arc>, } impl Future for ConnectionReuseIncoming where + T: Transport, + T::Output: AsyncRead + AsyncWrite, C: ConnectionUpgrade, C::Output: StreamMuxer + Clone, + ::Substream : Clone, { - type Item = future::FutureResult<(ConnectionReuseSubstream, future::FutureResult), IoError>; + type Item = future::FutureResult<(ConnectionReuseSubstream, + future::FutureResult), IoError>; type Error = IoError; #[inline] fn poll(&mut self) -> Poll { - let mut shared = self.shared.lock(); - match poll_incoming(&self.shared, &mut shared, None) { - Ok(Async::Ready(Some(substream))) => { - Ok(Async::Ready(substream)) - }, + match self.shared.poll_incoming(None) { + Ok(Async::Ready(Some((inner, connection_id, addr)))) => + Ok(Async::Ready(future::ok(( + ConnectionReuseSubstream { + inner, + shared: self.shared.clone(), + connection_id, + addr: addr.clone(), + }, future::ok(addr))))), Ok(Async::Ready(None)) | Ok(Async::NotReady) => { - // TODO: will add an element to the list every time - shared.notify_on_new_connec.insert(TASK_ID.with(|&v| v), task::current()); + self.shared.notify_on_new_connec.lock().insert(TASK_ID.with(|&v| v), task::current()); Ok(Async::NotReady) }, Err(err) => Err(err) @@ -589,123 +741,19 @@ where } } -/// Polls the incoming substreams on all the incoming connections that match the `listener`. -/// -/// Returns `Ready(None)` if no connection is matching the `listener`. Returns `NotReady` if -/// one or more connections are matching the `listener` but they are not ready. -fn poll_incoming(shared_arc: &Arc>>, shared: &mut Shared, listener: Option) - -> Poll, FutureResult), IoError>>, IoError> -where - T: Transport, - C: ConnectionUpgrade, - C::Output: StreamMuxer + Clone, -{ - // Keys of the elements in `shared.connections` to remove afterwards. - let mut to_remove = Vec::new(); - // Substream to return, if any found. - let mut ret_value = None; - let mut found_one = false; - - for (addr, state) in shared.connections.iter_mut() { - match *state { - PeerState::Active { ref mut next_incoming, ref muxer, ref mut num_substreams, connection_id, ref client_addr, listener_id } => { - if listener_id != listener { - continue; - } - found_one = true; - match next_incoming.poll() { - Ok(Async::Ready(Some(inner))) => { - trace!("New incoming substream from {}", client_addr); - let next = muxer.clone().inbound(); - *next_incoming = next; - *num_substreams += 1; - let substream = ConnectionReuseSubstream { - inner, - shared: shared_arc.clone(), - connection_id, - addr: client_addr.clone(), - }; - ret_value = Some(Ok((substream, future::ok(client_addr.clone())))); - break; - }, - Ok(Async::Ready(None)) => { - // The muxer isn't capable of opening any inbound stream anymore, so - // we close the connection entirely. - trace!("Removing existing connection to {} as it cannot open inbound anymore", addr); - to_remove.push(addr.clone()); - }, - Ok(Async::NotReady) => (), - Err(err) => { - // If an error happens while opening an inbound stream, we close the - // connection entirely. - trace!("Error while opening inbound substream to {}: {:?}", addr, err); - to_remove.push(addr.clone()); - ret_value = Some(Err(err)); - break; - }, - } - }, - PeerState::Pending { ref mut notify, .. } => { - notify.insert(TASK_ID.with(|&t| t), task::current()); - }, - PeerState::Errored(_) => {}, - PeerState::Poisonned => { - panic!("Poisonned peer state"); - }, - } - } - - for to_remove in to_remove { - shared.connections.remove(&to_remove); - } - - match ret_value { - Some(Ok(val)) => Ok(Async::Ready(Some(future::ok(val)))), - Some(Err(err)) => Err(err), - None => { - if found_one { - Ok(Async::NotReady) - } else { - Ok(Async::Ready(None)) - } - }, - } -} - -/// Removes one substream from an active connection. Closes the connection if necessary. -fn remove_one_substream(shared: &mut Shared, connec_id: u64, addr: &Multiaddr) -where - T: Transport, - C: ConnectionUpgrade, - C::Output: StreamMuxer, -{ - shared.connections.retain(|_, connec| { - if let PeerState::Active { connection_id, ref mut num_substreams, .. } = connec { - if *connection_id == connec_id { - *num_substreams -= 1; - if *num_substreams == 0 { - trace!("All substreams to {} closed ; closing main connection", addr); - return false; - } - } - } - - true - }); -} /// Wraps around the `Substream`. pub struct ConnectionReuseSubstream where T: Transport, C: ConnectionUpgrade, - C::Output: StreamMuxer, + C::Output: StreamMuxer + Clone, { inner: ::Substream, - shared: Arc>>, + shared: Arc>, /// Id this connection was created from. - connection_id: u64, + connection_id: usize, /// Address of the remote. addr: Multiaddr, } @@ -714,7 +762,7 @@ impl Deref for ConnectionReuseSubstream where T: Transport, C: ConnectionUpgrade, - C::Output: StreamMuxer, + C::Output: StreamMuxer + Clone, { type Target = ::Substream; @@ -728,7 +776,7 @@ impl DerefMut for ConnectionReuseSubstream where T: Transport, C: ConnectionUpgrade, - C::Output: StreamMuxer, + C::Output: StreamMuxer + Clone, { #[inline] fn deref_mut(&mut self) -> &mut Self::Target { @@ -740,7 +788,7 @@ impl Read for ConnectionReuseSubstream where T: Transport, C: ConnectionUpgrade, - C::Output: StreamMuxer, + C::Output: StreamMuxer + Clone, { #[inline] fn read(&mut self, buf: &mut [u8]) -> Result { @@ -752,7 +800,7 @@ impl AsyncRead for ConnectionReuseSubstream where T: Transport, C: ConnectionUpgrade, - C::Output: StreamMuxer, + C::Output: StreamMuxer + Clone, { } @@ -760,7 +808,7 @@ impl Write for ConnectionReuseSubstream where T: Transport, C: ConnectionUpgrade, - C::Output: StreamMuxer, + C::Output: StreamMuxer + Clone, { #[inline] fn write(&mut self, buf: &[u8]) -> Result { @@ -777,7 +825,7 @@ impl AsyncWrite for ConnectionReuseSubstream where T: Transport, C: ConnectionUpgrade, - C::Output: StreamMuxer, + C::Output: StreamMuxer + Clone, { #[inline] fn shutdown(&mut self) -> Poll<(), IoError> { @@ -789,10 +837,9 @@ impl Drop for ConnectionReuseSubstream where T: Transport, C: ConnectionUpgrade, - C::Output: StreamMuxer, + C::Output: StreamMuxer + Clone, { fn drop(&mut self) { - let mut shared = self.shared.lock(); - remove_one_substream(&mut *shared, self.connection_id, &self.addr); + self.shared.remove_substream(self.connection_id, &self.addr); } } diff --git a/core/src/transport/upgrade.rs b/core/src/transport/upgrade.rs index 8c21897bfb6..dc3f7349f3f 100644 --- a/core/src/transport/upgrade.rs +++ b/core/src/transport/upgrade.rs @@ -50,14 +50,14 @@ impl<'a, T, C> UpgradedNode where T: Transport + 'a, T::Output: AsyncRead + AsyncWrite, - C: ConnectionUpgrade + 'a, + C: ConnectionUpgrade + 'a + Clone, { /// Turns this upgraded node into a `ConnectionReuse`. If the `Output` implements the /// `StreamMuxer` trait, the returned object will implement `Transport` and `MuxedTransport`. #[inline] pub fn into_connection_reuse(self) -> ConnectionReuse where - C::Output: StreamMuxer, + C::Output: StreamMuxer + Clone, { From::from(self) } From aec3aadc01cb361500b01a59eae436cb05756aa4 Mon Sep 17 00:00:00 2001 From: Benjamin Kampmann Date: Wed, 1 Aug 2018 15:50:45 +0200 Subject: [PATCH 8/8] Fix code style, minor changes for clarity --- core/src/connection_reuse.rs | 375 +++++++++++++++++++++-------------- 1 file changed, 227 insertions(+), 148 deletions(-) diff --git a/core/src/connection_reuse.rs b/core/src/connection_reuse.rs index dcf911ae1cf..b0fb41c8a87 100644 --- a/core/src/connection_reuse.rs +++ b/core/src/connection_reuse.rs @@ -41,15 +41,15 @@ use fnv::FnvHashMap; use futures::future::{self, FutureResult}; -use futures::{Async, Future, Poll, Stream, stream, task}; use futures::stream::FuturesUnordered; +use futures::{stream, task, Async, Future, Poll, Stream}; use multiaddr::Multiaddr; use muxing::StreamMuxer; use parking_lot::Mutex; use std::io::{Error as IoError, ErrorKind as IoErrorKind, Read, Write}; use std::mem; use std::ops::{Deref, DerefMut}; -use std::sync::{Arc, atomic::AtomicUsize, atomic::Ordering}; +use std::sync::{atomic::AtomicUsize, atomic::Ordering, Arc}; use tokio_io::{AsyncRead, AsyncWrite}; use transport::{MuxedTransport, Transport, UpgradedNode}; use upgrade::ConnectionUpgrade; @@ -71,7 +71,10 @@ where shared: Arc>, } -enum PeerState where M: StreamMuxer + Clone { +enum PeerState +where + M: StreamMuxer + Clone, +{ /// Connection is active and can be used to open substreams. Active { /// The muxer to open new substreams. @@ -99,10 +102,9 @@ enum PeerState where M: StreamMuxer + Clone { }, /// An earlier connection attempt errored. - Errored(IoError) + Errored(IoError), } - /// Struct shared between most of the `ConnectionReuse` infrastructure. // #[derive(Clone)] struct Shared @@ -140,7 +142,7 @@ where let mut notifiers = self.notify_on_new_connec.lock(); for to_notify in notifiers.drain() { to_notify.1.notify(); - }; + } } /// generate a new connection id @@ -156,27 +158,32 @@ where } /// Insert a new connection, returns Some if an entry was present, notify listeners - pub fn insert_connection(&self, addr: Multiaddr, state: PeerState) - -> Option> { - let r = self.replace_connection(addr, state); + pub fn insert_connection( + &self, + addr: Multiaddr, + state: PeerState, + ) -> Option> { + let r = self.connections.lock().insert(addr, state); self.new_con_notify(); r } - /// Replace a new connection, returns Some if an entry was present, **do not** notify listeners - pub fn replace_connection(&self, addr: Multiaddr, state: PeerState) - -> Option> { - self.connections.lock().insert(addr, state) - } - /// Removes one substream from an active connection. Closes the connection if necessary. pub fn remove_substream(&self, connec_id: usize, addr: &Multiaddr) { self.connections.lock().retain(|_, connec| { - if let PeerState::Active { connection_id, ref mut num_substreams, .. } = connec { + if let PeerState::Active { + connection_id, + ref mut num_substreams, + .. + } = connec + { if *connection_id == connec_id { *num_substreams -= 1; if *num_substreams == 0 { - trace!("All substreams to {} closed ; closing main connection", addr); + trace!( + "All substreams to {} closed ; closing main connection", + addr + ); return false; } } @@ -189,9 +196,10 @@ where /// /// Returns `Ready(None)` if no connection is matching the `listener`. Returns `NotReady` if /// one or more connections are matching the `listener` but they are not ready. - fn poll_incoming(&self, listener: Option) - -> Poll::Substream, usize, Multiaddr)>, IoError> - { + fn poll_incoming( + &self, + listener: Option, + ) -> Poll::Substream, usize, Multiaddr)>, IoError> { // Keys of the elements in `connections` to remove afterwards. let mut to_remove = Vec::new(); // Substream to return, if any found. @@ -199,42 +207,54 @@ where let mut found_one = false; for (addr, state) in self.connections.lock().iter_mut() { - let res = if let PeerState::Active { ref mut muxer, ref mut num_substreams, - connection_id, client_addr, listener_id } = state { - if *listener_id == listener { - continue; - } - found_one = true; + let res = { + if let PeerState::Active { + ref mut muxer, + ref mut num_substreams, + connection_id, + client_addr, + listener_id, + } = state + { + if *listener_id == listener { + continue; + } + found_one = true; - match muxer.clone().inbound().poll() { - Ok(Async::Ready(Some(inner))) => { - trace!("New incoming substream from {}", client_addr); - *num_substreams += 1; - Ok((connection_id.clone(), inner, client_addr.clone())) - }, - Err(err) => { - // If an error happens while opening an inbound stream, we close the - // connection entirely. - trace!("Error while opening inbound substream to {}: {:?}", addr, err); - to_remove.push(addr.clone()); - Err(err) - }, - Ok(Async::Ready(None)) => { - // The muxer isn't capable of opening any inbound stream anymore, so - // we close the connection entirely. - trace!("Removing existing connection to {} as it cannot open inbound anymore", addr); - to_remove.push(addr.clone()); - continue - }, - Ok(Async::NotReady) => { continue } - } - } else { - if listener.is_none() { - if let PeerState::Pending { ref mut notify, .. } = state { - notify.insert(TASK_ID.with(|&t| t), task::current()); + match muxer.clone().inbound().poll() { + Ok(Async::Ready(Some(inner))) => { + trace!("New incoming substream from {}", client_addr); + *num_substreams += 1; + Ok((inner, connection_id.clone(), client_addr.clone())) + } + Ok(Async::Ready(None)) => { + // The muxer isn't capable of opening any inbound stream anymore, so + // we close the connection entirely. + trace!("Removing existing connection to {} as it cannot open inbound anymore", addr); + to_remove.push(addr.clone()); + continue; + } + Ok(Async::NotReady) => continue, + Err(err) => { + // If an error happens while opening an inbound stream, we close the + // connection entirely. + trace!( + "Error while opening inbound substream to {}: {:?}", + addr, + err + ); + to_remove.push(addr.clone()); + Err(err) + } + } + } else { + if listener.is_none() { + if let PeerState::Pending { ref mut notify, .. } = state { + notify.insert(TASK_ID.with(|&t| t), task::current()); + } } + continue; } - continue }; ret_value = Some(res); @@ -246,8 +266,7 @@ where } match ret_value { - Some(Ok((connection_id, inner, addr))) => - Ok(Async::Ready(Some((inner, connection_id, addr)))), + Some(Ok(val)) => Ok(Async::Ready(Some(val))), Some(Err(err)) => Err(err), None => { if found_one { @@ -260,7 +279,6 @@ where } } - impl From> for ConnectionReuse where T: Transport, @@ -287,7 +305,7 @@ where T::Output: AsyncRead + AsyncWrite, C: ConnectionUpgrade + Clone + 'static, // TODO: 'static :( C::Output: StreamMuxer + Clone, - ::Substream : Clone, + ::Substream: Clone, C::MultiaddrFuture: Future, C::NamesIter: Clone, UpgradedNode: Clone, @@ -299,7 +317,6 @@ where type Dial = ConnectionReuseDial; fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> { - let (listener, new_addr) = match self.shared.transport.clone().listen_on(addr.clone()) { Ok((l, a)) => (l, a), Err((_, addr)) => { @@ -335,14 +352,17 @@ where #[inline] fn dial(self, addr: Multiaddr) -> Result { - // If an earlier attempt to dial this multiaddress failed, we clear the error. Otherwise // the returned `Future` will immediately produce the error. let must_clear = match self.shared.connections.lock().get(&addr) { Some(&PeerState::Errored(ref err)) => { - trace!("Clearing existing connection to {} which errored earlier: {:?}", addr, err); + trace!( + "Clearing existing connection to {} which errored earlier: {:?}", + addr, + err + ); true - }, + } _ => false, }; if must_clear { @@ -358,7 +378,10 @@ where #[inline] fn nat_traversal(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { - self.shared.transport.transport().nat_traversal(server, observed) + self.shared + .transport + .transport() + .nat_traversal(server, observed) } } @@ -368,14 +391,14 @@ where T::Output: AsyncRead + AsyncWrite, C: ConnectionUpgrade + Clone + 'static, // TODO: 'static :( C::Output: StreamMuxer + Clone, - ::Substream : Clone, + ::Substream: Clone, C::MultiaddrFuture: Future, C::NamesIter: Clone, UpgradedNode: Clone, { type Incoming = ConnectionReuseIncoming; type IncomingUpgrade = - future::FutureResult<(ConnectionReuseSubstream, Self::MultiaddrFuture), IoError>; + future::FutureResult<(ConnectionReuseSubstream, Self::MultiaddrFuture), IoError>; #[inline] fn next_incoming(self) -> Self::Incoming { @@ -393,7 +416,7 @@ task_local!{ /// Implementation of `Future` for dialing a node. pub struct ConnectionReuseDial -where +where T: Transport, C: ConnectionUpgrade, C::Output: StreamMuxer + Clone, @@ -411,7 +434,7 @@ where } struct ConnectionReuseDialOut -where +where T: Transport, C: ConnectionUpgrade, C::Output: StreamMuxer + Clone, @@ -425,16 +448,19 @@ where } impl Future for ConnectionReuseDial -where +where T: Transport, C: ConnectionUpgrade, C::Output: StreamMuxer + Clone + 'static, - ::Substream : Clone, + ::Substream: Clone, UpgradedNode: Transport + Clone, as Transport>::Dial: 'static, as Transport>::MultiaddrFuture: 'static, { - type Item = (ConnectionReuseSubstream, FutureResult); + type Item = ( + ConnectionReuseSubstream, + FutureResult, + ); type Error = IoError; fn poll(&mut self) -> Poll { @@ -445,50 +471,58 @@ where Ok(Async::Ready(Some(inner))) => { trace!("Opened new outgoing substream to {}", self.addr); let shared = self.shared.clone(); - return Ok(Async::Ready((ConnectionReuseSubstream { - connection_id: outbound.connection_id, - inner, - shared, - addr: outbound.client_addr.clone(), - }, future::ok(outbound.client_addr)))); - }, + return Ok(Async::Ready(( + ConnectionReuseSubstream { + connection_id: outbound.connection_id, + inner, + shared, + addr: outbound.client_addr.clone(), + }, + future::ok(outbound.client_addr), + ))); + } Ok(Async::NotReady) => { self.outbound = Some(outbound); return Ok(Async::NotReady); - }, + } Ok(Async::Ready(None)) => { // The muxer can no longer produce outgoing substreams. // Let's reopen a connection. trace!("Closing existing connection to {} ; can't produce outgoing substreams", self.addr); true - }, + } Err(err) => { // If we get an error while opening a substream, we decide to ignore it // and open a new muxer. // If opening the muxer produces an error, *then* we will return it. - debug!("Error while opening outgoing substream to {}: {:?}", self.addr, err); + debug!( + "Error while opening outgoing substream to {}: {:?}", + self.addr, err + ); true - }, + } } - }, - _ => false + } + _ => false, }; - let is_empty = { let mut conns = self.shared.connections.lock(); if should_kill_existing_muxer { conns.remove(&self.addr); true - } else { let item = conns.get(&self.addr); // error'ed, quit early if let Some(PeerState::Errored(err)) = item { - trace!("Existing new connection to {} errored earlier: {:?}", self.addr, err); + trace!( + "Existing new connection to {} errored earlier: {:?}", + self.addr, + err + ); let io_err = IoError::new(err.kind(), err.to_string()); - return Err(io_err) + return Err(io_err); }; item.is_none() @@ -503,13 +537,21 @@ where let future = future.and_then(|(out, addr)| addr.map(move |a| (out, a))); let future = Box::new(future); let connection_id = self.shared.gen_next_connection_id(); - PeerState::Pending { future, notify: Default::default(), connection_id } - }, + PeerState::Pending { + future, + notify: Default::default(), + connection_id, + } + } Err(_) => { - trace!("Failed to open connection to {:?}, multiaddr not supported", self.addr); - let err = IoError::new(IoErrorKind::ConnectionRefused, "multiaddr not supported"); + trace!( + "Failed to open connection to {:?}, multiaddr not supported", + self.addr + ); + let err = + IoError::new(IoErrorKind::ConnectionRefused, "multiaddr not supported"); PeerState::Errored(err) - }, + } }; self.shared.insert_connection(self.addr.clone(), state); @@ -518,24 +560,41 @@ where let mut conn = self.shared.connections.lock(); if let Some(state) = conn.get_mut(&self.addr) { - - let replace_with = { + let replace_with = { match state { - PeerState::Active { muxer, connection_id, client_addr, ref mut num_substreams, .. } => { - trace!("Using existing connection to {} to open outbound substream", self.addr); + PeerState::Active { + muxer, + connection_id, + client_addr, + ref mut num_substreams, + .. + } => { + trace!( + "Using existing connection to {} to open outbound substream", + self.addr + ); self.outbound = Some(ConnectionReuseDialOut { stream: muxer.clone().outbound(), connection_id: connection_id.clone(), client_addr: client_addr.clone(), }); + // mutating the param in place, nothing to be done *num_substreams += 1; None - }, - PeerState::Pending { ref mut future, ref mut notify, connection_id } => { + } + PeerState::Pending { + ref mut future, + ref mut notify, + connection_id, + } => { match future.poll() { Ok(Async::Ready((muxer, client_addr))) => { - trace!("Successful new connection to {} ({})", self.addr, client_addr); + trace!( + "Successful new connection to {} ({})", + self.addr, + client_addr + ); for task in notify { task.1.notify(); } @@ -546,25 +605,30 @@ where client_addr: client_addr.clone(), }); - Some(PeerState::Active { muxer, connection_id: connection_id.clone(), - num_substreams: 1, listener_id: None, - client_addr }) - }, + // our connection was upgraded, replace it. + Some(PeerState::Active { + muxer, + connection_id: connection_id.clone(), + num_substreams: 1, + listener_id: None, + client_addr, + }) + } Ok(Async::NotReady) => { notify.insert(TASK_ID.with(|&t| t), task::current()); return Ok(Async::NotReady); - }, + } Err(err) => { trace!("Failed new connection to {}: {:?}", self.addr, err); Some(PeerState::Errored(err)) - }, + } } - }, + } _ => { // Because of a race someone might has changed the Peerstate between // this get_mut and the previous lock. But no harm for us, we can just // loop on and will find it soon enough. - continue + continue; } } }; @@ -578,15 +642,14 @@ where } impl Drop for ConnectionReuseDial -where +where T: Transport, C: ConnectionUpgrade, C::Output: StreamMuxer + Clone, { fn drop(&mut self) { - let shared = &self.shared; if let Some(outbound) = self.outbound.take() { - shared.remove_substream(outbound.connection_id, &outbound.client_addr); + self.shared.remove_substream(outbound.connection_id, &outbound.client_addr); } } } @@ -615,12 +678,17 @@ where T::Output: AsyncRead + AsyncWrite, C: ConnectionUpgrade, C::Output: StreamMuxer + Clone, - ::Substream : Clone, + ::Substream: Clone, L: Stream, Lu: Future + 'static, { - type Item = FutureResult<(ConnectionReuseSubstream, - FutureResult), IoError>; + type Item = FutureResult< + ( + ConnectionReuseSubstream, + FutureResult, + ), + IoError, + >; type Error = IoError; fn poll(&mut self) -> Poll, Self::Error> { @@ -649,9 +717,7 @@ where loop { let (muxer, client_addr) = match self.current_upgrades.poll() { Ok(Async::Ready(Some((muxer, client_addr)))) => (muxer, client_addr), - Ok(Async::Ready(None)) | Ok(Async::NotReady) => { - break; - }, + Ok(Async::Ready(None)) | Ok(Async::NotReady) => break, Err(err) => { // Insert the rest of the pending upgrades, but not the current one. debug!("Error while upgrading listener connection: {:?}", err); @@ -659,17 +725,20 @@ where } }; - // Successfully upgraded a new incoming connection. trace!("New multiplexed connection from {}", client_addr); let connection_id = self.shared.gen_next_connection_id(); - - self.shared.insert_connection(client_addr.clone(), - PeerState::Active { muxer, connection_id, - listener_id: Some(self.listener_id), - num_substreams: 1, - client_addr: client_addr }); + self.shared.insert_connection( + client_addr.clone(), + PeerState::Active { + muxer, + connection_id, + listener_id: Some(self.listener_id), + num_substreams: 1, + client_addr: client_addr, + }, + ); } // Poll all the incoming connections on all the connections we opened. @@ -681,18 +750,22 @@ where } else { Ok(Async::NotReady) } - }, - Ok(Async::Ready(Some((inner, connection_id, addr)))) => + } + Ok(Async::Ready(Some((inner, connection_id, addr)))) => { Ok(Async::Ready(Some(future::ok(( ConnectionReuseSubstream { - inner: inner.clone(), - shared: self.shared.clone(), - connection_id: connection_id.clone(), - addr: addr.clone(), - }, future::ok(addr.clone())))))), - Err(err) => { - Ok(Async::Ready(Some(future::err(IoError::new(err.kind(), err.to_string()))))) + inner: inner.clone(), + shared: self.shared.clone(), + connection_id: connection_id.clone(), + addr: addr.clone(), + }, + future::ok(addr.clone()), + ))))) } + Err(err) => Ok(Async::Ready(Some(future::err(IoError::new( + err.kind(), + err.to_string(), + ))))), } } } @@ -710,39 +783,45 @@ where impl Future for ConnectionReuseIncoming where - T: Transport, T::Output: AsyncRead + AsyncWrite, C: ConnectionUpgrade, C::Output: StreamMuxer + Clone, - ::Substream : Clone, + ::Substream: Clone, { - type Item = future::FutureResult<(ConnectionReuseSubstream, - future::FutureResult), IoError>; + type Item = future::FutureResult< + ( + ConnectionReuseSubstream, + future::FutureResult, + ), + IoError, + >; type Error = IoError; #[inline] fn poll(&mut self) -> Poll { match self.shared.poll_incoming(None) { - Ok(Async::Ready(Some((inner, connection_id, addr)))) => - Ok(Async::Ready(future::ok(( - ConnectionReuseSubstream { - inner, - shared: self.shared.clone(), - connection_id, - addr: addr.clone(), - }, future::ok(addr))))), + Ok(Async::Ready(Some((inner, connection_id, addr)))) => Ok(Async::Ready(future::ok(( + ConnectionReuseSubstream { + inner, + shared: self.shared.clone(), + connection_id, + addr: addr.clone(), + }, + future::ok(addr), + )))), Ok(Async::Ready(None)) | Ok(Async::NotReady) => { - self.shared.notify_on_new_connec.lock().insert(TASK_ID.with(|&v| v), task::current()); + self.shared + .notify_on_new_connec + .lock() + .insert(TASK_ID.with(|&v| v), task::current()); Ok(Async::NotReady) - }, - Err(err) => Err(err) + } + Err(err) => Err(err), } } } - - /// Wraps around the `Substream`. pub struct ConnectionReuseSubstream where