Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

82: use s2n quic with bbr #50

Merged
merged 9 commits into from
Jan 22, 2025
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
726 changes: 490 additions & 236 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ wildcard_imports = "warn"
[workspace.dependencies]
anyhow = { version = "1.0" }
byteorder = { version = "1", default-features = false }
bytes = { version = "1.9.0" }
cfg-if = { version = "1", default-features = false }
const_format = { version = "0.2", default-features = false }
heapless = { version = "0.8", default-features = false }
Expand All @@ -46,6 +47,7 @@ postcard = { version = "1", default-features = false }
proptest = { version = "1.6", default-features = false, features = ["no_std", "alloc"] }
proptest-derive = { version = "0.5" }
quinn = { version = "0.10.1" }
s2n-quic = { version = "1.51.0" }
rand = { version = "0.8", default-features = false }
rand_core = { version = "0.6", default-features = false }
rustix = { version = "0.38", default-features = false }
Expand Down
3 changes: 2 additions & 1 deletion crates/aranya-quic-syncer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@ aranya-buggy = { version = "0.1.0", path = "../aranya-buggy", features = ["std"]
aranya-crypto = { version = "0.2.0", path = "../aranya-crypto", features = ["std"] }
aranya-runtime = { version = "0.3.0", path = "../aranya-runtime", features = ["std"] }

bytes = { workspace = true }
heapless = { workspace = true, features = ["serde"] }
postcard = { workspace = true, features = ["alloc"] }
quinn = { workspace = true }
rustls = { workspace = true, default-features = false, features = ["quic"] }
s2n-quic = { workspace = true }
thiserror = { workspace = true, features = ["std"] }
tokio = { workspace = true, features = ["sync"] }
tracing = { workspace = true }
Expand Down
59 changes: 23 additions & 36 deletions crates/aranya-quic-syncer/benches/quic_syncer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,12 @@
#![allow(clippy::unwrap_used)]

use std::{
net::{Ipv4Addr, SocketAddr},
ops::DerefMut,
sync::Arc,
time::{Duration, Instant},
};

use anyhow::{Context, Result};
use anyhow::Result;
use aranya_crypto::Rng;
use aranya_quic_syncer::{run_syncer, Syncer};
use aranya_runtime::{
Expand All @@ -21,8 +20,7 @@ use aranya_runtime::{
ClientState, GraphId, Sink, SyncRequester,
};
use criterion::{criterion_group, criterion_main, Criterion};
use quinn::{Endpoint, ServerConfig};
use rustls::{Certificate, PrivateKey};
use s2n_quic::Server;
use tokio::{
runtime::Runtime,
sync::{mpsc, Mutex as TMutex},
Expand Down Expand Up @@ -92,6 +90,14 @@ fn add_commands(
}
}

fn get_server(cert: String, key: String) -> Result<Server> {
let server = Server::builder()
.with_tls((&cert[..], &key[..]))?
.with_io("127.0.0.1:0")?
.start()?;
Ok(server)
}

// benchmark the time to sync a command.
fn sync_bench(c: &mut Criterion) {
let rt = Runtime::new().expect("error creating runtime");
Expand All @@ -101,34 +107,35 @@ fn sync_bench(c: &mut Criterion) {
// setup
let request_sink = Arc::new(TMutex::new(CountSink::new()));
let request_client = Arc::new(TMutex::new(create_client()));
let (key, cert) = certs().expect("generating certs failed");
let server_addr1 =
get_server_addr(key.clone(), cert.clone()).expect("getting server addr failed");
let cert = rcgen::generate_simple_self_signed(vec!["localhost".into()])
.expect("error generating cert");
let key = cert.serialize_private_key_pem();
let cert = cert.serialize_pem().expect("error serilizing cert");
let server1 = get_server(cert.clone(), key.clone()).expect("error getting server");
let (tx1, _) = mpsc::unbounded_channel();
let syncer1 = Arc::new(TMutex::new(
Syncer::new(
&[cert.clone()],
&*cert.clone(),
request_client.clone(),
request_sink.clone(),
tx1,
server_addr1.local_addr().expect("error getting local addr"),
server1.local_addr().expect("error getting local addr"),
)
.expect("Syncer creation must succeed"),
));

let response_sink = Arc::new(TMutex::new(CountSink::new()));
let response_client = Arc::new(TMutex::new(create_client()));
let server_addr2 =
get_server_addr(key.clone(), cert.clone()).expect("getting server addr failed");
let addr2 = server_addr2.local_addr().expect("error getting local addr");
let server2 = get_server(cert.clone(), key.clone()).expect("error getting server");
let server2_addr = server2.local_addr().expect("error getting local addr");
let (tx2, rx2) = mpsc::unbounded_channel();
let syncer2 = Arc::new(TMutex::new(
Syncer::new(
&[cert.clone()],
&*cert,
response_client.clone(),
response_sink.clone(),
tx2,
server_addr2.local_addr().expect("error getting local addr"),
server2_addr,
)
.expect("Syncer creation must succeed"),
));
Expand All @@ -139,7 +146,7 @@ fn sync_bench(c: &mut Criterion) {
)
.expect("creating graph failed");

let task = tokio::spawn(run_syncer(syncer2.clone(), server_addr2, rx2));
let task = tokio::spawn(run_syncer(syncer2.clone(), server2, rx2));
add_commands(
response_client.lock().await.deref_mut(),
storage_id,
Expand All @@ -150,7 +157,7 @@ fn sync_bench(c: &mut Criterion) {
// Start timing for benchmark
let start = Instant::now();
while request_sink.lock().await.count() < iters.try_into().unwrap() {
let sync_requester = SyncRequester::new(storage_id, &mut Rng::new(), addr2);
let sync_requester = SyncRequester::new(storage_id, &mut Rng::new(), server2_addr);
if let Err(e) = syncer1
.lock()
.await
Expand All @@ -172,26 +179,6 @@ fn sync_bench(c: &mut Criterion) {
});
}

fn get_server_addr(key: PrivateKey, cert: Certificate) -> Result<Endpoint> {
let mut server_config = ServerConfig::with_single_cert(vec![cert], key)?;
let transport_config =
Arc::get_mut(&mut server_config.transport).context("unique transport")?;
transport_config.max_concurrent_uni_streams(0_u8.into());
let endpoint = Endpoint::server(
server_config,
SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 0),
)?;
Ok(endpoint)
}

fn certs() -> Result<(PrivateKey, Certificate)> {
let cert = rcgen::generate_simple_self_signed(vec!["localhost".into()])?;
Ok((
PrivateKey(cert.serialize_private_key_der()),
Certificate(cert.serialize_der()?),
))
}

criterion_group!(
name = benches;
config = Criterion::default().sample_size(20).measurement_time(Duration::from_secs(10));
Expand Down
40 changes: 20 additions & 20 deletions crates/aranya-quic-syncer/examples/quic_syncer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use aranya_runtime::{
ClientState, Engine, GraphId, StorageProvider, SyncRequester,
};
use clap::Parser;
use quinn::ServerConfig;
use s2n_quic::Server;
use tokio::sync::{mpsc, Mutex as TMutex};

/// An error returned by the syncer.
Expand Down Expand Up @@ -92,20 +92,30 @@ async fn sync_peer<EN, SP, S>(
}
}

fn get_server(cert: String, key: String, addr: SocketAddr) -> Result<Server> {
let server = Server::builder()
.with_tls((&cert[..], &key[..]))?
.with_io(addr)?
.start()?;
Ok(server)
}

#[tokio::main]
async fn run(options: Opt) -> Result<()> {
let dirs = directories_next::ProjectDirs::from("org", "spideroak", "aranya")
.expect("unable to load directory");
let path = dirs.data_local_dir();
let cert_path = path.join("cert.der");
let key_path = path.join("key.der");
let (cert, key) = match fs::read(&cert_path).and_then(|x| Ok((x, fs::read(&key_path)?))) {
let cert_path = path.join("cert.pem");
let key_path = path.join("key.pem");
let (cert, key) = match fs::read_to_string(&cert_path)
.and_then(|cert| fs::read_to_string(&key_path).map(|key| (cert, key)))
{
Ok(x) => x,
Err(ref e) if e.kind() == io::ErrorKind::NotFound => {
let cert = rcgen::generate_simple_self_signed(vec!["localhost".into()])
.expect("error generating cert");
let key = cert.serialize_private_key_der();
let cert = cert.serialize_der().expect("error serializing cert");
let key = cert.serialize_private_key_pem();
let cert = cert.serialize_pem().expect("error serializing cert");
fs::create_dir_all(path).context("failed to create certificate directory")?;
fs::write(&cert_path, &cert).context("failed to write certificate")?;
fs::write(&key_path, &key).context("failed to write private key")?;
Expand All @@ -121,24 +131,14 @@ async fn run(options: Opt) -> Result<()> {

let client = Arc::new(TMutex::new(ClientState::new(engine, storage)));
let sink = Arc::new(TMutex::new(PrintSink {}));
let key = rustls::PrivateKey(key);
let cert = rustls::Certificate(cert);
let cert_chain = vec![cert];
let server = get_server(cert.clone(), key, options.listen)?;
let (tx1, _) = mpsc::unbounded_channel();
let mut server_config = ServerConfig::with_single_cert(cert_chain.clone(), key.clone())?;
let transport_config =
Arc::get_mut(&mut server_config.transport).expect("error creating transport config");
transport_config.max_concurrent_uni_streams(0_u8.into());
let endpoint =
quinn::Endpoint::server(server_config, options.listen).map_err(|e| SyncError {
error_msg: e.to_string(),
})?;
let syncer = Arc::new(TMutex::new(Syncer::new(
&cert_chain,
&cert[..],
client.clone(),
sink.clone(),
tx1,
endpoint.local_addr()?,
server.local_addr()?,
)?));

let storage_id;
Expand Down Expand Up @@ -166,7 +166,7 @@ async fn run(options: Opt) -> Result<()> {
}

let (_, rx1) = mpsc::unbounded_channel();
let task = tokio::spawn(run_syncer(syncer.clone(), endpoint, rx1));
let task = tokio::spawn(run_syncer(syncer.clone(), server, rx1));
// Initial sync to sync the Init command
if !options.new_graph {
sync_peer(
Expand Down
Loading
Loading