Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support UDT transport protocol #3

Draft
wants to merge 17 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,5 @@ futures = { version = "0.3" }
flume = "0.10.14"

socket2 = { version = "0.4.4" }
tokio-udt = { version = "0.1.0-alpha.7" }
nix = { version = "0.24.2" }
27 changes: 19 additions & 8 deletions src/link/rendezvous/client.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use crate::{
crypto::{Identity, KeyCard},
link::rendezvous::{ClientSettings, Request, Response, ShardId},
net::traits::TcpConnect,
net::traits::{Connect, ConnectSettings},
};
use doomstack::{here, Doom, ResultExt, Top};
use std::{io, net::SocketAddr, vec::Vec};

pub struct Client {
server: Box<dyn TcpConnect>,
server: Box<dyn Connect>,
settings: ClientSettings,
}

Expand Down Expand Up @@ -39,7 +39,7 @@ enum AttemptError {
impl Client {
pub fn new<S>(server: S, settings: ClientSettings) -> Self
where
S: 'static + TcpConnect,
S: 'static + Connect,
{
Client {
server: Box::new(server),
Expand Down Expand Up @@ -118,7 +118,7 @@ impl Client {
async fn attempt(&self, request: &Request) -> Result<Response, Top<AttemptError>> {
let mut connection = self
.server
.connect()
.connect(&self.settings.connect)
.await
.map_err(AttemptError::connect_failed)
.map_err(Doom::into_top)
Expand All @@ -136,6 +136,10 @@ impl Client {

Ok(response)
}

pub(crate) fn connect_settings(&self) -> &ConnectSettings {
&self.settings.connect
}
}

#[cfg(test)]
Expand All @@ -146,12 +150,19 @@ mod tests {
link::rendezvous::{Server, ServerSettings},
};
use std::time::Duration;
use tokio::time;
use tokio::{net::lookup_host, time};

async fn setup_server(address: &'static str, shard_sizes: Vec<usize>) -> Server {
Server::new(address, ServerSettings { shard_sizes })
.await
.unwrap()
let addr = lookup_host(address).await.unwrap().next().unwrap();
Server::new(
addr,
ServerSettings {
shard_sizes,
..Default::default()
},
)
.await
.unwrap()
}

async fn setup_clients(
Expand Down
7 changes: 6 additions & 1 deletion src/link/rendezvous/client_settings.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
use crate::time::{sleep_schedules::CappedExponential, SleepSchedule};
use crate::{
net::traits::ConnectSettings,
time::{sleep_schedules::CappedExponential, SleepSchedule},
};
use std::{sync::Arc, time::Duration};

#[derive(Debug, Clone)]
pub struct ClientSettings {
pub sleep_schedule: Arc<dyn SleepSchedule>,
pub connect: ConnectSettings,
}

impl Default for ClientSettings {
Expand All @@ -14,6 +18,7 @@ impl Default for ClientSettings {
2.,
Duration::from_secs(300),
)),
connect: ConnectSettings::default(),
}
}
}
13 changes: 9 additions & 4 deletions src/link/rendezvous/connector.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::{
crypto::{Identity, KeyChain},
link::rendezvous::{Client, ConnectorSettings},
net::{traits::TcpConnect, Connector as NetConnector, SecureConnection},
net::{traits::Connect, Connector as NetConnector, SecureConnection},
};
use async_trait::async_trait;
use doomstack::{here, Doom, ResultExt, Stack, Top};
Expand Down Expand Up @@ -36,7 +36,7 @@ pub enum ConnectorError {
impl Connector {
pub fn new<S>(server: S, keychain: KeyChain, settings: ConnectorSettings) -> Self
where
S: 'static + TcpConnect,
S: 'static + Connect,
{
let client = Client::new(server, settings.client_settings);

Expand All @@ -58,7 +58,7 @@ impl Connector {
.spot(here!())?;

let mut connection = address
.connect()
.connect(self.client.connect_settings())
.await
.map_err(ConnectorError::connect_failed)
.map_err(Doom::into_top)
Expand Down Expand Up @@ -135,7 +135,12 @@ mod tests {
const SERVER: &str = "127.0.0.1:1250";
const MESSAGE: &str = "Hello Alice, this is Bob!";

let _server = Server::new(SERVER, Default::default()).await.unwrap();
let server_addr = tokio::net::lookup_host(SERVER)
.await
.unwrap()
.next()
.unwrap();
let _server = Server::new(server_addr, Default::default()).await.unwrap();

let alice_keychain = KeyChain::random();
let bob_keychain = KeyChain::random();
Expand Down
60 changes: 45 additions & 15 deletions src/link/rendezvous/listener.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use crate::{
crypto::{Identity, KeyChain},
link::rendezvous::{Client, ListenerSettings},
net::{traits::TcpConnect, Listener as NetListener, PlainConnection, SecureConnection},
net::{
traits::{Connect, TransportProtocol},
Listener as NetListener, PlainConnection, SecureConnection,
},
sync::fuse::Fuse,
};
use async_trait::async_trait;
Expand All @@ -15,8 +18,15 @@ use tokio::{
},
};

use tokio_udt::UdtListener;

type Outlet = Receiver<(Identity, SecureConnection)>;

pub(crate) enum RawListener {
Tcp(TcpListener),
Udt(UdtListener),
}

pub struct Listener {
outlet: Outlet,
_fuse: Fuse,
Expand All @@ -33,16 +43,29 @@ enum ServeError {
impl Listener {
pub async fn new<S>(server: S, keychain: KeyChain, settings: ListenerSettings) -> Self
where
S: 'static + TcpConnect,
S: 'static + Connect,
{
let listener = TcpListener::bind(
(Ipv4Addr::UNSPECIFIED, 0), // TODO: Determine if `Ipv6Addr` can be used instead (problems with Docker?)
)
.await
.unwrap();
let (listener, port) = match settings.client_settings.connect.transport {
TransportProtocol::TCP => {
let listener = TcpListener::bind(
(Ipv4Addr::UNSPECIFIED, 0), // TODO: Determine if `Ipv6Addr` can be used instead (problems with Docker?)
)
.await
.unwrap();
let port = listener.local_addr().unwrap().port();
(RawListener::Tcp(listener), port)
}
TransportProtocol::UDT(ref config) => {
let listener =
UdtListener::bind((Ipv4Addr::UNSPECIFIED, 0).into(), Some(config.clone()))
.await
.unwrap();
let port = listener.local_addr().unwrap().port();
(RawListener::Udt(listener), port)
}
};

let identity = keychain.keycard().identity();
let port = listener.local_addr().unwrap().port();

let fuse = Fuse::new();

Expand All @@ -63,18 +86,25 @@ impl Listener {

async fn listen(
keychain: KeyChain,
listener: TcpListener,
listener: RawListener,
inlet: Sender<(Identity, SecureConnection)>,
) {
let fuse = Fuse::new();

loop {
if let Ok((stream, _)) = listener.accept().await.and_then(|(stream, addr)| {
stream.set_nodelay(true)?;
Ok((stream, addr))
}) {
let connection = stream.into();

let accept_result = match listener {
RawListener::Tcp(ref tcp_listener) => {
tcp_listener.accept().await.and_then(|(stream, addr)| {
stream.set_nodelay(true)?;
Ok((stream.into(), addr))
})
}
RawListener::Udt(ref udt_listener) => udt_listener
.accept()
.await
.map(|(addr, udt_connection)| (udt_connection.into(), addr)),
};
if let Ok((connection, _)) = accept_result {
let keychain = keychain.clone();
let inlet = inlet.clone();

Expand Down
57 changes: 38 additions & 19 deletions src/link/rendezvous/server.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::{
crypto::{Identity, KeyCard},
link::rendezvous::{Request, Response, ServerSettings, ShardId},
net::PlainConnection,
link::rendezvous::{listener::RawListener, Request, Response, ServerSettings, ShardId},
net::{traits::TransportProtocol, PlainConnection},
sync::fuse::Fuse,
};
use doomstack::{here, Doom, ResultExt, Top};
Expand All @@ -11,10 +11,8 @@ use std::{
net::SocketAddr,
sync::Arc,
};
use tokio::{
io,
net::{TcpListener, ToSocketAddrs},
};
use tokio::{io, net::TcpListener};
use tokio_udt::UdtListener;

pub struct Server {
_fuse: Fuse,
Expand All @@ -41,10 +39,10 @@ struct Database {
}

impl Server {
pub async fn new<A>(address: A, settings: ServerSettings) -> Result<Self, Top<ServerError>>
where
A: ToSocketAddrs,
{
pub async fn new(
address: SocketAddr,
settings: ServerSettings,
) -> Result<Self, Top<ServerError>> {
let database = Arc::new(Mutex::new(Database {
shards: settings
.shard_sizes
Expand All @@ -58,11 +56,20 @@ impl Server {

let fuse = Fuse::new();

let listener = TcpListener::bind(address)
.await
.map_err(ServerError::initialize_failed)
.map_err(Doom::into_top)
.spot(here!())?;
let listener = {
let result = match settings.connect.transport {
TransportProtocol::TCP => TcpListener::bind(address).await.map(RawListener::Tcp),
TransportProtocol::UDT(ref config) => {
UdtListener::bind(address, Some(config.clone()))
.await
.map(RawListener::Udt)
}
};
result
.map_err(ServerError::initialize_failed)
.map_err(Doom::into_top)
.spot(here!())?
};

fuse.spawn(async move {
let _ = Server::listen(settings, database, listener).await;
Expand All @@ -74,17 +81,29 @@ impl Server {
async fn listen(
settings: ServerSettings,
database: Arc<Mutex<Database>>,
listener: TcpListener,
listener: RawListener,
) {
let fuse = Fuse::new();

let accept = || async {
match listener {
RawListener::Tcp(ref tcp_listener) => tcp_listener
.accept()
.await
.map(|(stream, address)| (stream.into(), address)),
RawListener::Udt(ref udt_listener) => udt_listener
.accept()
.await
.map(|(address, stream)| (stream.into(), address)),
}
};

loop {
if let Ok((stream, address)) = listener.accept().await {
if let Ok((connection, address)) = accept().await {
let connection: PlainConnection = connection;
let settings = settings.clone();
let database = database.clone();

let connection: PlainConnection = stream.into();

fuse.spawn(async move {
let _ = Server::serve(settings, database, connection, address).await;
});
Expand Down
4 changes: 4 additions & 0 deletions src/link/rendezvous/server_settings.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
use crate::net::traits::ConnectSettings;

#[derive(Debug, Clone)]
pub struct ServerSettings {
pub shard_sizes: Vec<usize>,
pub connect: ConnectSettings,
}

impl Default for ServerSettings {
fn default() -> Self {
ServerSettings {
shard_sizes: vec![4],
connect: ConnectSettings::default(),
}
}
}
4 changes: 2 additions & 2 deletions src/link/test/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ mod context {
},
net::{
test::{System as NetSystem, TestConnector},
traits::TcpConnect,
traits::{Connect, ConnectSettings},
Connector, Listener, PlainConnection,
},
time::test::join,
Expand Down Expand Up @@ -95,7 +95,7 @@ mod context {
impl SlowLoris {
async fn connect(&self, identity: Identity) -> PlainConnection {
let address = self.0.peers.get(&identity).unwrap().clone();
address.connect().await.unwrap()
address.connect(&ConnectSettings::default()).await.unwrap()

// does not complete (no `secure` or `authenticate`)
}
Expand Down
1 change: 1 addition & 0 deletions src/net/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ mod session_connector;
mod session_control;
mod session_listener;
mod socket;
mod udt;
mod unit_receiver;
mod unit_sender;

Expand Down
Loading