Skip to content

Commit

Permalink
82: use s2n quic with bbr (#50)
Browse files Browse the repository at this point in the history
Co-authored-by: Jonathan Dygert <[email protected]>
  • Loading branch information
benz-spideroak and jdygert-spok authored Jan 22, 2025
1 parent 7483255 commit 53727c6
Show file tree
Hide file tree
Showing 10 changed files with 959 additions and 673 deletions.
725 changes: 473 additions & 252 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ wildcard_imports = "warn"
[workspace.dependencies]
anyhow = { version = "1.0" }
byteorder = { version = "1", default-features = false }
bytes = { version = "1.9.0" }
cfg-if = { version = "1", default-features = false }
const_format = { version = "0.2", default-features = false }
heapless = { version = "0.8", default-features = false }
Expand All @@ -46,6 +47,7 @@ postcard = { version = "1", default-features = false }
proptest = { version = "1.6", default-features = false, features = ["no_std", "alloc"] }
proptest-derive = { version = "0.5" }
quinn = { version = "0.10.1" }
s2n-quic = { version = "1.51.0" }
rand = { version = "0.8", default-features = false }
rand_core = { version = "0.6", default-features = false }
rustix = { version = "0.38", default-features = false }
Expand Down
3 changes: 1 addition & 2 deletions crates/aranya-quic-syncer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@ aranya-runtime = { version = "0.3.0", path = "../aranya-runtime", features = ["s

heapless = { workspace = true, features = ["serde"] }
postcard = { workspace = true, features = ["alloc"] }
quinn = { workspace = true }
rustls = { workspace = true, default-features = false, features = ["quic"] }
s2n-quic = { workspace = true }
thiserror = { workspace = true, features = ["std"] }
tokio = { workspace = true, features = ["sync"] }
tracing = { workspace = true }
Expand Down
59 changes: 23 additions & 36 deletions crates/aranya-quic-syncer/benches/quic_syncer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,12 @@
#![allow(clippy::unwrap_used)]

use std::{
net::{Ipv4Addr, SocketAddr},
ops::DerefMut,
sync::Arc,
time::{Duration, Instant},
};

use anyhow::{Context, Result};
use anyhow::Result;
use aranya_crypto::Rng;
use aranya_quic_syncer::{run_syncer, Syncer};
use aranya_runtime::{
Expand All @@ -21,8 +20,7 @@ use aranya_runtime::{
ClientState, GraphId, Sink, SyncRequester,
};
use criterion::{criterion_group, criterion_main, Criterion};
use quinn::{Endpoint, ServerConfig};
use rustls::{Certificate, PrivateKey};
use s2n_quic::Server;
use tokio::{
runtime::Runtime,
sync::{mpsc, Mutex as TMutex},
Expand Down Expand Up @@ -92,6 +90,14 @@ fn add_commands(
}
}

fn get_server(cert: String, key: String) -> Result<Server> {
let server = Server::builder()
.with_tls((&cert[..], &key[..]))?
.with_io("127.0.0.1:0")?
.start()?;
Ok(server)
}

// benchmark the time to sync a command.
fn sync_bench(c: &mut Criterion) {
let rt = Runtime::new().expect("error creating runtime");
Expand All @@ -101,34 +107,35 @@ fn sync_bench(c: &mut Criterion) {
// setup
let request_sink = Arc::new(TMutex::new(CountSink::new()));
let request_client = Arc::new(TMutex::new(create_client()));
let (key, cert) = certs().expect("generating certs failed");
let server_addr1 =
get_server_addr(key.clone(), cert.clone()).expect("getting server addr failed");
let cert = rcgen::generate_simple_self_signed(vec!["localhost".into()])
.expect("error generating cert");
let key = cert.serialize_private_key_pem();
let cert = cert.serialize_pem().expect("error serilizing cert");
let server1 = get_server(cert.clone(), key.clone()).expect("error getting server");
let (tx1, _) = mpsc::unbounded_channel();
let syncer1 = Arc::new(TMutex::new(
Syncer::new(
&[cert.clone()],
&*cert.clone(),
request_client.clone(),
request_sink.clone(),
tx1,
server_addr1.local_addr().expect("error getting local addr"),
server1.local_addr().expect("error getting local addr"),
)
.expect("Syncer creation must succeed"),
));

let response_sink = Arc::new(TMutex::new(CountSink::new()));
let response_client = Arc::new(TMutex::new(create_client()));
let server_addr2 =
get_server_addr(key.clone(), cert.clone()).expect("getting server addr failed");
let addr2 = server_addr2.local_addr().expect("error getting local addr");
let server2 = get_server(cert.clone(), key.clone()).expect("error getting server");
let server2_addr = server2.local_addr().expect("error getting local addr");
let (tx2, rx2) = mpsc::unbounded_channel();
let syncer2 = Arc::new(TMutex::new(
Syncer::new(
&[cert.clone()],
&*cert,
response_client.clone(),
response_sink.clone(),
tx2,
server_addr2.local_addr().expect("error getting local addr"),
server2_addr,
)
.expect("Syncer creation must succeed"),
));
Expand All @@ -139,7 +146,7 @@ fn sync_bench(c: &mut Criterion) {
)
.expect("creating graph failed");

let task = tokio::spawn(run_syncer(syncer2.clone(), server_addr2, rx2));
let task = tokio::spawn(run_syncer(syncer2.clone(), server2, rx2));
add_commands(
response_client.lock().await.deref_mut(),
storage_id,
Expand All @@ -150,7 +157,7 @@ fn sync_bench(c: &mut Criterion) {
// Start timing for benchmark
let start = Instant::now();
while request_sink.lock().await.count() < iters.try_into().unwrap() {
let sync_requester = SyncRequester::new(storage_id, &mut Rng::new(), addr2);
let sync_requester = SyncRequester::new(storage_id, &mut Rng::new(), server2_addr);
if let Err(e) = syncer1
.lock()
.await
Expand All @@ -172,26 +179,6 @@ fn sync_bench(c: &mut Criterion) {
});
}

fn get_server_addr(key: PrivateKey, cert: Certificate) -> Result<Endpoint> {
let mut server_config = ServerConfig::with_single_cert(vec![cert], key)?;
let transport_config =
Arc::get_mut(&mut server_config.transport).context("unique transport")?;
transport_config.max_concurrent_uni_streams(0_u8.into());
let endpoint = Endpoint::server(
server_config,
SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 0),
)?;
Ok(endpoint)
}

fn certs() -> Result<(PrivateKey, Certificate)> {
let cert = rcgen::generate_simple_self_signed(vec!["localhost".into()])?;
Ok((
PrivateKey(cert.serialize_private_key_der()),
Certificate(cert.serialize_der()?),
))
}

criterion_group!(
name = benches;
config = Criterion::default().sample_size(20).measurement_time(Duration::from_secs(10));
Expand Down
40 changes: 20 additions & 20 deletions crates/aranya-quic-syncer/examples/quic_syncer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use aranya_runtime::{
ClientState, Engine, GraphId, StorageProvider, SyncRequester,
};
use clap::Parser;
use quinn::ServerConfig;
use s2n_quic::Server;
use tokio::sync::{mpsc, Mutex as TMutex};

/// An error returned by the syncer.
Expand Down Expand Up @@ -92,20 +92,30 @@ async fn sync_peer<EN, SP, S>(
}
}

fn get_server(cert: String, key: String, addr: SocketAddr) -> Result<Server> {
let server = Server::builder()
.with_tls((&cert[..], &key[..]))?
.with_io(addr)?
.start()?;
Ok(server)
}

#[tokio::main]
async fn run(options: Opt) -> Result<()> {
let dirs = directories_next::ProjectDirs::from("org", "spideroak", "aranya")
.expect("unable to load directory");
let path = dirs.data_local_dir();
let cert_path = path.join("cert.der");
let key_path = path.join("key.der");
let (cert, key) = match fs::read(&cert_path).and_then(|x| Ok((x, fs::read(&key_path)?))) {
let cert_path = path.join("cert.pem");
let key_path = path.join("key.pem");
let (cert, key) = match fs::read_to_string(&cert_path)
.and_then(|cert| fs::read_to_string(&key_path).map(|key| (cert, key)))
{
Ok(x) => x,
Err(ref e) if e.kind() == io::ErrorKind::NotFound => {
let cert = rcgen::generate_simple_self_signed(vec!["localhost".into()])
.expect("error generating cert");
let key = cert.serialize_private_key_der();
let cert = cert.serialize_der().expect("error serializing cert");
let key = cert.serialize_private_key_pem();
let cert = cert.serialize_pem().expect("error serializing cert");
fs::create_dir_all(path).context("failed to create certificate directory")?;
fs::write(&cert_path, &cert).context("failed to write certificate")?;
fs::write(&key_path, &key).context("failed to write private key")?;
Expand All @@ -121,24 +131,14 @@ async fn run(options: Opt) -> Result<()> {

let client = Arc::new(TMutex::new(ClientState::new(engine, storage)));
let sink = Arc::new(TMutex::new(PrintSink {}));
let key = rustls::PrivateKey(key);
let cert = rustls::Certificate(cert);
let cert_chain = vec![cert];
let server = get_server(cert.clone(), key, options.listen)?;
let (tx1, _) = mpsc::unbounded_channel();
let mut server_config = ServerConfig::with_single_cert(cert_chain.clone(), key.clone())?;
let transport_config =
Arc::get_mut(&mut server_config.transport).expect("error creating transport config");
transport_config.max_concurrent_uni_streams(0_u8.into());
let endpoint =
quinn::Endpoint::server(server_config, options.listen).map_err(|e| SyncError {
error_msg: e.to_string(),
})?;
let syncer = Arc::new(TMutex::new(Syncer::new(
&cert_chain,
&cert[..],
client.clone(),
sink.clone(),
tx1,
endpoint.local_addr()?,
server.local_addr()?,
)?));

let storage_id;
Expand Down Expand Up @@ -166,7 +166,7 @@ async fn run(options: Opt) -> Result<()> {
}

let (_, rx1) = mpsc::unbounded_channel();
let task = tokio::spawn(run_syncer(syncer.clone(), endpoint, rx1));
let task = tokio::spawn(run_syncer(syncer.clone(), server, rx1));
// Initial sync to sync the Init command
if !options.new_graph {
sync_peer(
Expand Down
Loading

0 comments on commit 53727c6

Please sign in to comment.