dc/s2n-quic-dc/src/stream/server/tokio.rs (22 additions, 11 deletions)

@@ -22,6 +22,16 @@ use std::{io, net::SocketAddr, time::Duration};
 use tokio::io::unix::AsyncFd;
 use tracing::{trace, Instrument as _};
 
+// The kernel contends on the fdtable lock when calling accept() to locate free
+// file descriptors, so even if we don't contend on the underlying socket (due
+// to REUSEPORT) it still ends up expensive to have a large number of threads
+// trying to accept() within a single process. Clamp concurrency to at most 4
+// threads executing the TCP acceptor tasks accordingly.
+//
+// With UDP there's ~no lock contention for receiving packets on separate UDP
+// sockets, so we don't clamp concurrency in that case.
+const MAX_TCP_WORKERS: usize = 4;
+
 pub mod tcp;
 pub mod udp;
 
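To illustrate the UDP half of that comment, here is a minimal standalone sketch, not code from this PR, assuming the socket2 crate (with its `all` feature enabled for `set_reuse_port`): each worker binds its own SO_REUSEPORT socket, so the receive paths don't share an accept()-style lock and no concurrency clamp is needed.

```rust
use socket2::{Domain, Protocol, Socket, Type};
use std::net::SocketAddr;

// One UDP socket per worker, all bound to the same address via SO_REUSEPORT.
// Receiving on distinct descriptors avoids the shared-lock costs that make
// many-threaded TCP accept() expensive.
fn udp_worker_socket(addr: SocketAddr) -> std::io::Result<Socket> {
    let socket = Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP))?;
    // Must be set before bind so the kernel allows multiple binders.
    socket.set_reuse_port(true)?;
    socket.bind(&addr.into())?;
    Ok(socket)
}

fn main() -> std::io::Result<()> {
    let addr: SocketAddr = "127.0.0.1:4433".parse().unwrap();
    // Unlike the TCP side, nothing caps this at MAX_TCP_WORKERS.
    let sockets: Vec<Socket> = (0..8)
        .map(|_| udp_worker_socket(addr))
        .collect::<std::io::Result<_>>()?;
    println!("bound {} UDP worker sockets", sockets.len());
    Ok(())
}
```
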
@@ -300,7 +310,11 @@ impl Builder {
         };
 
         // split the backlog between all of the workers
-        let backlog = backlog.div_ceil(concurrency).max(1);
+        //
+        // this backlog is only used for TCP, so clamp the divisor to the maximum TCP worker concurrency
+        let backlog = backlog
+            .div_ceil(concurrency.clamp(0, MAX_TCP_WORKERS))
+            .max(1);
 
         Start {
             enable_tcp: self.enable_tcp,
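The split is easy to check in isolation. Here is a worked sketch of the same arithmetic; `per_worker_backlog` is a hypothetical helper name, not part of the diff.

```rust
const MAX_TCP_WORKERS: usize = 4;

// Mirrors the builder's arithmetic: divide the configured backlog across
// at most MAX_TCP_WORKERS acceptors, never handing a worker less than 1.
fn per_worker_backlog(backlog: usize, concurrency: usize) -> usize {
    backlog
        .div_ceil(concurrency.clamp(0, MAX_TCP_WORKERS))
        .max(1)
}

fn main() {
    // 16 requested workers clamp to 4 TCP acceptors, so a backlog of 4096
    // splits into 1024 per acceptor. Without the clamp the same inputs
    // would have produced 4096 / 16 = 256 per acceptor.
    assert_eq!(per_worker_backlog(4096, 16), 1024);
    // Tiny backlogs still give each acceptor at least one slot.
    assert_eq!(per_worker_backlog(3, 2), 2);
}
```
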
@@ -414,15 +428,7 @@ impl<H: Handshake + Clone, S: event::Subscriber + Clone> Start<'_, H, S> {
                     self.spawn_udp(socket)?;
                 }
                 socket::Protocol::Tcp => {
-                    // The kernel contends on fdtable lock when calling accept to locate free file
-                    // descriptors, so even if we don't contend on the underlying socket (due to
-                    // REUSEPORT) it still ends up expensive to have large amounts of threads trying to
-                    // accept() within a single process. Clamp concurrency to at most 4 threads
-                    // executing the TCP acceptor tasks accordingly.
-                    //
-                    // With UDP there's ~no lock contention for receiving packets on separate UDP sockets,
-                    // so we don't clamp concurrency in that case.
-                    if idx + already_running >= 4 {
+                    if idx + already_running >= MAX_TCP_WORKERS {
                         continue;
                     }
 
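In isolation, the cap behaves as sketched below. `spawn_tcp_acceptor` and `start_tcp_workers` are hypothetical stand-ins, and `already_running` models TCP acceptors left over from earlier sockets, as in the loop above.

```rust
const MAX_TCP_WORKERS: usize = 4;

// Hypothetical stand-in for spawning one TCP acceptor task.
fn spawn_tcp_acceptor(idx: usize) {
    println!("spawned TCP acceptor {idx}");
}

// Spawn acceptors for `concurrency` workers, skipping any that would push
// the process past MAX_TCP_WORKERS once already-running acceptors count.
fn start_tcp_workers(concurrency: usize, already_running: usize) -> usize {
    let mut spawned = 0;
    for idx in 0..concurrency {
        if idx + already_running >= MAX_TCP_WORKERS {
            continue;
        }
        spawn_tcp_acceptor(idx);
        spawned += 1;
    }
    spawned
}

fn main() {
    // 8 requested workers with 1 acceptor already running: only 3 more spawn.
    assert_eq!(start_tcp_workers(8, 1), 3);
}
```
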
@@ -443,7 +449,12 @@ impl<H: Handshake + Clone, S: event::Subscriber + Clone> Start<'_, H, S> {
     fn socket_opts(&self, local_addr: SocketAddr) -> socket::Options {
         let mut options = socket::Options::new(local_addr);
 
-        options.backlog = self.backlog;
+        // Explicitly do **not** set the socket backlog to self.backlog. While we split the
+        // configured backlog amongst our in-process queues as concurrency increases, it doesn't
+        // make sense to shrink the kernel backlogs -- that just causes packet drops and generally
+        // bad behavior.
+        //
+        // This is especially true for TCP, where the number of acceptor workers no longer matches the configured concurrency.
         options.send_buffer = self.send_buffer;
         options.recv_buffer = self.recv_buffer;
         options.reuse_address = self.reuse_addr;
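A sketch of the resulting split of responsibilities, again assuming the socket2 crate and an illustrative backlog value: the kernel's listen queue stays large, while the per-worker value above only sizes queues inside the process.

```rust
use socket2::{Domain, Protocol, Socket, Type};
use std::net::SocketAddr;

fn main() -> std::io::Result<()> {
    let addr: SocketAddr = "127.0.0.1:4433".parse().unwrap();
    let socket = Socket::new(Domain::IPV4, Type::STREAM, Some(Protocol::TCP))?;
    socket.bind(&addr.into())?;

    // Keep the kernel's accept queue large: shrinking it to the per-worker
    // split would drop connections under load. The per-worker value only
    // sizes in-process queues after accept().
    let kernel_backlog = 4096; // illustrative; effectively capped by net.core.somaxconn
    socket.listen(kernel_backlog)?;
    Ok(())
}
```
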