Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
842 changes: 607 additions & 235 deletions core/src/connection_reuse.rs

Large diffs are not rendered by default.

15 changes: 14 additions & 1 deletion core/src/transport/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@
//! `UpgradedNode::or_upgrade` methods, you can combine multiple transports and/or upgrades
//! together in a complex chain of protocols negotiation.

use connection_reuse::ConnectionReuse;
use futures::prelude::*;
use multiaddr::Multiaddr;
use muxing::StreamMuxer;
use std::io::Error as IoError;
use tokio_io::{AsyncRead, AsyncWrite};
use upgrade::{ConnectionUpgrade, Endpoint};
Expand Down Expand Up @@ -116,7 +118,7 @@ pub trait Transport {
/// implementation of `Transport` is only responsible for handling the protocols it supports.
///
/// Returns `None` if nothing can be determined. This happens if this trait implementation
/// doesn't recognize the protocols, or if `server` and `observed` are unrelated.
/// doesn't recognize the protocols, or if `server` and `observed` are related.
fn nat_traversal(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr>;

/// Applies a function on the output of the `Transport`.
Expand Down Expand Up @@ -207,6 +209,17 @@ pub trait Transport {
DummyMuxing::new(self)
}

/// Turns this `Transport` into a `ConnectionReuse`. If the `Output` implements the
/// `StreamMuxer` trait, the returned object will implement `Transport` and `MuxedTransport`.
#[inline]
fn into_connection_reuse<D, M>(self) -> ConnectionReuse<Self, D, M>
where
Self: Sized + Transport<Output = (D, M)>,
M: StreamMuxer,
{
ConnectionReuse::new(self)
}

/// Wraps around the `Transport` and makes it interruptible.
#[inline]
fn interruptible(self) -> (interruptible::Interruptible<Self>, interruptible::Interrupt)
Expand Down
12 changes: 0 additions & 12 deletions core/src/transport/upgrade.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,8 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use connection_reuse::ConnectionReuse;
use futures::prelude::*;
use multiaddr::Multiaddr;
use muxing::StreamMuxer;
use std::io::Error as IoError;
use tokio_io::{AsyncRead, AsyncWrite};
use transport::{MuxedTransport, Transport};
Expand Down Expand Up @@ -52,16 +50,6 @@ where
T::Output: AsyncRead + AsyncWrite,
C: ConnectionUpgrade<T::Output, T::MultiaddrFuture> + 'a,
{
/// Turns this upgraded node into a `ConnectionReuse`. If the `Output` implements the
/// `StreamMuxer` trait, the returned object will implement `Transport` and `MuxedTransport`.
#[inline]
pub fn into_connection_reuse(self) -> ConnectionReuse<T, C>
where
C::Output: StreamMuxer,
{
From::from(self)
}

/// Returns a reference to the inner `Transport`.
#[inline]
pub fn transport(&self) -> &T {
Expand Down
12 changes: 10 additions & 2 deletions core/tests/multiplex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,9 @@ fn client_to_server_outbound() {
let bg_thread = thread::spawn(move || {
let future = rx
.with_upgrade(multiplex::MplexConfig::new())
.map(|val, _| ((), val))
.into_connection_reuse()
.map(|((), val), _| val)
.listen_on("/memory".parse().unwrap())
.unwrap_or_else(|_| panic!()).0
.into_future()
Expand Down Expand Up @@ -124,7 +126,9 @@ fn connection_reused_for_dialing() {
let bg_thread = thread::spawn(move || {
let future = OnlyOnce::from(rx)
.with_upgrade(multiplex::MplexConfig::new())
.map(|val, _| ((), val))
.into_connection_reuse()
.map(|((), val), _| val)
.listen_on("/memory".parse().unwrap())
.unwrap_or_else(|_| panic!()).0
.into_future()
Expand Down Expand Up @@ -160,7 +164,9 @@ fn connection_reused_for_dialing() {

let transport = OnlyOnce::from(tx)
.with_upgrade(multiplex::MplexConfig::new())
.into_connection_reuse();
.map(|val, _| ((), val))
.into_connection_reuse()
.map(|((), val), _| val);

let future = transport
.clone()
Expand Down Expand Up @@ -229,7 +235,9 @@ fn use_opened_listen_to_dial() {

let transport = OnlyOnce::from(tx)
.with_upgrade(multiplex::MplexConfig::new())
.into_connection_reuse();
.map(|val, _| ((), val))
.into_connection_reuse()
.map(|((), val), _| val);

let future = transport
.clone()
Expand Down
4 changes: 3 additions & 1 deletion examples/echo-dialer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,9 @@ fn main() {
// `Transport` because the output of the upgrade is not a stream but a controller for
// muxing. We have to explicitly call `into_connection_reuse()` in order to turn this into
// a `Transport`.
.into_connection_reuse();
.map(|val, _| ((), val))
.into_connection_reuse()
.map(|((), val), _| val);

// Building a struct that represents the protocol that we are going to use for dialing.
let proto = SimpleProtocol::new("/echo/1.0.0", |socket| {
Expand Down
4 changes: 3 additions & 1 deletion examples/echo-server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,9 @@ fn main() {
// `Transport` because the output of the upgrade is not a stream but a controller for
// muxing. We have to explicitly call `into_connection_reuse()` in order to turn this into
// a `Transport`.
.into_connection_reuse();
.map(|val, _| ((), val))
.into_connection_reuse()
.map(|((), val), _| val);

// We now have a `transport` variable that can be used either to dial nodes or listen to
// incoming connections, and that will automatically apply secio and multiplex on top
Expand Down
4 changes: 3 additions & 1 deletion examples/floodsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,9 @@ fn main() {
// `Transport` because the output of the upgrade is not a stream but a controller for
// muxing. We have to explicitly call `into_connection_reuse()` in order to turn this into
// a `Transport`.
.into_connection_reuse();
.map(|val, _| ((), val))
.into_connection_reuse()
.map(|((), val), _| val);

// We now have a `transport` variable that can be used either to dial nodes or listen to
// incoming connections, and that will automatically apply secio and multiplex on top
Expand Down
6 changes: 5 additions & 1 deletion examples/kademlia.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

#![type_length_limit = "2097152"]

extern crate bigint;
extern crate bytes;
extern crate env_logger;
Expand Down Expand Up @@ -84,7 +86,9 @@ fn main() {
// `Transport` because the output of the upgrade is not a stream but a controller for
// muxing. We have to explicitly call `into_connection_reuse()` in order to turn this into
// a `Transport`.
.into_connection_reuse();
.map(|val, _| ((), val))
.into_connection_reuse()
.map(|((), val), _| val);

let addr_resolver = {
let peer_store = peer_store.clone();
Expand Down
4 changes: 3 additions & 1 deletion examples/ping-client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,9 @@ fn main() {
// `Transport` because the output of the upgrade is not a stream but a controller for
// muxing. We have to explicitly call `into_connection_reuse()` in order to turn this into
// a `Transport`.
.into_connection_reuse();
.map(|val, _| ((), val))
.into_connection_reuse()
.map(|((), val), _| val);

// Let's put this `transport` into a *swarm*. The swarm will handle all the incoming
// connections for us. The second parameter we pass is the connection upgrade that is accepted
Expand Down
8 changes: 6 additions & 2 deletions examples/relay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,9 @@ fn run_dialer(opts: DialerOpts) -> Result<(), Box<Error>> {
let transport = {
let tcp = TcpConfig::new()
.with_upgrade(libp2p_yamux::Config::default())
.into_connection_reuse();
.map(|val, _| ((), val))
.into_connection_reuse()
.map(|((), val), _| val);
RelayTransport::new(opts.me, tcp, store, iter::once(opts.relay)).with_dummy_muxing()
};

Expand Down Expand Up @@ -161,7 +163,9 @@ fn run_listener(opts: ListenerOpts) -> Result<(), Box<Error>> {

let transport = TcpConfig::new()
.with_upgrade(libp2p_yamux::Config::default())
.into_connection_reuse();
.map(|val, _| ((), val))
.into_connection_reuse()
.map(|((), val), _| val);

let relay = RelayConfig::new(opts.me, transport.clone(), store);

Expand Down