Skip to content

Commit

Permalink
use tokio_udt on linux only
Browse files Browse the repository at this point in the history
  • Loading branch information
amatissart committed Aug 8, 2022
1 parent 58c5fcd commit 07daaaf
Show file tree
Hide file tree
Showing 9 changed files with 29 additions and 24 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,6 @@ socket2 = { version = "0.4.4" }
nix = { version = "0.24.2" }
num_cpus = { version = "1.13" }

[target.'cfg(target_os="linux")'.dependencies]
#tokio-udt = { git = "https://github.com/amatissart/tokio-udt/", rev="f9fdae" }
tokio-udt = "0.1.0-alpha.5"
2 changes: 1 addition & 1 deletion src/link/rendezvous/client.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::{
crypto::{Identity, KeyCard},
link::rendezvous::{ClientSettings, Request, Response, ShardId},
net::traits::{ConnectSettings, Connect},
net::traits::{Connect, ConnectSettings},
};

use doomstack::{here, Doom, ResultExt, Top};
Expand Down
17 changes: 10 additions & 7 deletions src/link/rendezvous/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,12 @@ use tokio::sync::{
mpsc::{Receiver, Sender},
};

use tokio_udt::UdtListener;

type Outlet = Receiver<(Identity, SecureConnection)>;

pub(crate) enum RawListener {
Tcp(TcpListener),
Udt(UdtListener),
#[cfg(target_os = "linux")]
Udt(tokio_udt::UdtListener),
}

pub struct Listener {
Expand Down Expand Up @@ -57,11 +56,14 @@ impl Listener {
let port = listener.local_addr().unwrap().port();
(RawListener::Tcp(listener), port)
}
#[cfg(target_os = "linux")]
TransportProtocol::UDT(ref config) => {
let listener =
UdtListener::bind((Ipv4Addr::UNSPECIFIED, 0).into(), Some(config.clone()))
.await
.unwrap();
let listener = tokio_udt::UdtListener::bind(
(Ipv4Addr::UNSPECIFIED, 0).into(),
Some(config.clone()),
)
.await
.unwrap();
let port = listener.local_addr().unwrap().port();
(RawListener::Udt(listener), port)
}
Expand Down Expand Up @@ -101,6 +103,7 @@ impl Listener {
Ok((stream.into(), addr))
})
}
#[cfg(target_os = "linux")]
RawListener::Udt(ref udt_listener) => udt_listener
.accept()
.await
Expand Down
5 changes: 3 additions & 2 deletions src/link/rendezvous/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use std::{
};

use tokio::{io, net::TcpListener};
use tokio_udt::UdtListener;

pub struct Server {
_fuse: Fuse,
Expand Down Expand Up @@ -64,8 +63,9 @@ impl Server {
let listener = {
let result = match settings.connect.transport {
TransportProtocol::TCP => TcpListener::bind(address).await.map(RawListener::Tcp),
#[cfg(target_os = "linux")]
TransportProtocol::UDT(ref config) => {
UdtListener::bind(address, Some(config.clone()))
tokio_udt::UdtListener::bind(address, Some(config.clone()))
.await
.map(RawListener::Udt)
}
Expand Down Expand Up @@ -96,6 +96,7 @@ impl Server {
.accept()
.await
.map(|(stream, address)| (stream.into(), address)),
#[cfg(target_os = "linux")]
RawListener::Udt(ref udt_listener) => udt_listener
.accept()
.await
Expand Down
2 changes: 1 addition & 1 deletion src/link/test/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ mod context {
},
net::{
test::{System as NetSystem, TestConnector},
traits::{ConnectSettings, Connect},
traits::{Connect, ConnectSettings},
Connector, Listener, PlainConnection,
},
time::test::join,
Expand Down
4 changes: 3 additions & 1 deletion src/net/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@ mod session_connector;
mod session_control;
mod session_listener;
mod socket;
mod udt;
mod unit_receiver;
mod unit_sender;

pub mod sockets;
pub mod traits;

#[cfg(target_os = "linux")]
mod udt;

#[cfg(any(test, feature = "test_utilities"))]
pub mod test;

Expand Down
2 changes: 1 addition & 1 deletion src/net/test/test_connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use async_trait::async_trait;
use crate::{
crypto::{Identity, KeyChain},
net::{
traits::{ConnectSettings, Connect},
traits::{Connect, ConnectSettings},
Connector, SecureConnection,
},
};
Expand Down
18 changes: 8 additions & 10 deletions src/net/traits/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,12 @@ use crate::net::PlainConnection;
use std::io::Result;

use tokio::net::{TcpStream, ToSocketAddrs};
use tokio_udt::{UdtConfiguration, UdtConnection};

/* TODO:
* Define generic Connect
* Retry handshake / RDV queue in UDT
*/

#[derive(Clone, Debug)]
pub enum TransportProtocol {
TCP,
UDT(UdtConfiguration),
#[cfg(target_os = "linux")]
UDT(tokio_udt::UdtConfiguration),
}

#[derive(Clone, Debug)]
Expand Down Expand Up @@ -47,9 +42,12 @@ where
stream.set_nodelay(true)?;
Ok(stream.into())
}),
TransportProtocol::UDT(config) => UdtConnection::connect(&self, Some(config.clone()))
.await
.map(Into::into),
#[cfg(target_os = "linux")]
TransportProtocol::UDT(config) => {
tokio_udt::UdtConnection::connect(&self, Some(config.clone()))
.await
.map(Into::into)
}
}
}
}
2 changes: 1 addition & 1 deletion src/net/traits/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
mod connect;

pub use connect::{ConnectSettings, Connect, TransportProtocol};
pub use connect::{Connect, ConnectSettings, TransportProtocol};

0 comments on commit 07daaaf

Please sign in to comment.