Skip to content

Commit

Permalink
feat(http2): add adaptive window size support using BDP
Browse files Browse the repository at this point in the history
This adds support for calculating the Bandwidth-delay product when using
HTTP2. When a DATA frame is received, a PING is sent to the remote.
While the PING acknoledgement is outstanding, the amount of bytes of all
received DATA frames is accumulated. Once we receive the PING
acknowledgement, we calculate the BDP based on the number of received
bytes and the round-trip-time of the PING. If we are near the current
maximum window size, the size is doubled.

It's disabled by default until tested more extensively.
  • Loading branch information
seanmonstar committed Feb 24, 2020
1 parent 22dc6fe commit 30012ae
Show file tree
Hide file tree
Showing 9 changed files with 451 additions and 71 deletions.
26 changes: 23 additions & 3 deletions src/body/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use http::HeaderMap;
use http_body::{Body as HttpBody, SizeHint};

use crate::common::{task, watch, Future, Never, Pin, Poll};
use crate::proto::h2::bdp;
use crate::proto::DecodedLength;
use crate::upgrade::OnUpgrade;

Expand Down Expand Up @@ -61,6 +62,11 @@ struct Extra {
/// connection yet.
delayed_eof: Option<DelayEof>,
on_upgrade: OnUpgrade,

/// Records bytes read to compute the BDP.
///
/// Only used with `H2` variant.
h2_bdp: bdp::Sampler,
}

type DelayEofUntil = oneshot::Receiver<Never>;
Expand Down Expand Up @@ -175,11 +181,21 @@ impl Body {
Body { kind, extra: None }
}

pub(crate) fn h2(recv: h2::RecvStream, content_length: DecodedLength) -> Self {
Body::new(Kind::H2 {
pub(crate) fn h2(
recv: h2::RecvStream,
content_length: DecodedLength,
bdp: bdp::Sampler,
) -> Self {
let mut body = Body::new(Kind::H2 {
content_length,
recv,
})
});

if bdp.is_enabled() {
body.extra_mut().h2_bdp = bdp;
}

body
}

pub(crate) fn set_on_upgrade(&mut self, upgrade: OnUpgrade) {
Expand All @@ -204,6 +220,7 @@ impl Body {
Box::new(Extra {
delayed_eof: None,
on_upgrade: OnUpgrade::none(),
h2_bdp: bdp::disabled(),
})
})
}
Expand Down Expand Up @@ -262,6 +279,9 @@ impl Body {
Some(Ok(bytes)) => {
let _ = h2.flow_control().release_capacity(bytes.len());
len.sub_if(bytes.len() as u64);
if let Some(ref extra) = self.extra {
extra.h2_bdp.sample(bytes.len());
}
Poll::Ready(Some(Ok(bytes)))
}
Some(Err(e)) => Poll::Ready(Some(Err(crate::Error::new_body(e)))),
Expand Down
38 changes: 22 additions & 16 deletions src/client/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,6 @@ where
H2(#[pin] proto::h2::ClientTask<B>),
}

// Our defaults are chosen for the "majority" case, which usually are not
// resource constrained, and so the spec default of 64kb can be too limiting
// for performance.
const DEFAULT_HTTP2_CONN_WINDOW: u32 = 1024 * 1024 * 5; // 5mb
const DEFAULT_HTTP2_STREAM_WINDOW: u32 = 1024 * 1024 * 2; // 2mb

/// Returns a handshake future over some IO.
///
/// This is a shortcut for `Builder::new().handshake(io)`.
Expand Down Expand Up @@ -82,7 +76,7 @@ pub struct Builder {
h1_read_buf_exact_size: Option<usize>,
h1_max_buf_size: Option<usize>,
http2: bool,
h2_builder: h2::client::Builder,
h2_builder: proto::h2::client::Config,
}

/// A future returned by `SendRequest::send_request`.
Expand Down Expand Up @@ -420,20 +414,14 @@ impl Builder {
/// Creates a new connection builder.
#[inline]
pub fn new() -> Builder {
let mut h2_builder = h2::client::Builder::default();
h2_builder
.initial_window_size(DEFAULT_HTTP2_STREAM_WINDOW)
.initial_connection_window_size(DEFAULT_HTTP2_CONN_WINDOW)
.enable_push(false);

Builder {
exec: Exec::Default,
h1_writev: true,
h1_read_buf_exact_size: None,
h1_title_case_headers: false,
h1_max_buf_size: None,
http2: false,
h2_builder,
h2_builder: Default::default(),
}
}

Expand Down Expand Up @@ -491,7 +479,8 @@ impl Builder {
/// [spec]: https://http2.github.io/http2-spec/#SETTINGS_INITIAL_WINDOW_SIZE
pub fn http2_initial_stream_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
if let Some(sz) = sz.into() {
self.h2_builder.initial_window_size(sz);
self.h2_builder.adaptive_window = false;
self.h2_builder.initial_stream_window_size = sz;
}
self
}
Expand All @@ -506,7 +495,24 @@ impl Builder {
sz: impl Into<Option<u32>>,
) -> &mut Self {
if let Some(sz) = sz.into() {
self.h2_builder.initial_connection_window_size(sz);
self.h2_builder.adaptive_window = false;
self.h2_builder.initial_conn_window_size = sz;
}
self
}

/// Sets whether to use an adaptive flow control.
///
/// Enabling this will override the limits set in
/// `http2_initial_stream_window_size` and
/// `http2_initial_connection_window_size`.
pub fn http2_adaptive_window(&mut self, enabled: bool) -> &mut Self {
use proto::h2::SPEC_WINDOW_SIZE;

self.h2_builder.adaptive_window = enabled;
if enabled {
self.h2_builder.initial_conn_window_size = SPEC_WINDOW_SIZE;
self.h2_builder.initial_stream_window_size = SPEC_WINDOW_SIZE;
}
self
}
Expand Down
10 changes: 10 additions & 0 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -996,6 +996,16 @@ impl Builder {
self
}

/// Sets whether to use an adaptive flow control.
///
/// Enabling this will override the limits set in
/// `http2_initial_stream_window_size` and
/// `http2_initial_connection_window_size`.
pub fn http2_adaptive_window(&mut self, enabled: bool) -> &mut Self {
self.conn_builder.http2_adaptive_window(enabled);
self
}

/// Sets the maximum idle connection per host allowed in the pool.
///
/// Default is `usize::MAX` (no limit).
Expand Down
190 changes: 190 additions & 0 deletions src/proto/h2/bdp.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
// What should it do?
//
// # BDP Algorithm
//
// 1. When receiving a DATA frame, if a BDP ping isn't outstanding:
// 1a. Record current time.
// 1b. Send a BDP ping.
// 2. Increment the number of received bytes.
// 3. When the BDP ping ack is received:
// 3a. Record duration from sent time.
// 3b. Merge RTT with a running average.
// 3c. Calculate bdp as bytes/rtt.
// 3d. If bdp is over 2/3 max, set new max to bdp and update windows.
//
//
// # Implementation
//
// - `hyper::Body::h2` variant includes a "bdp channel"
// - When the body's `poll_data` yields bytes, call `bdp.sample(bytes.len())`
//

use std::sync::{Arc, Mutex, Weak};
use std::task::{self, Poll};
use std::time::{Duration, Instant};

use h2::{Ping, PingPong};

type WindowSize = u32;

/// Any higher than this likely will be hitting the TCP flow control.
const BDP_LIMIT: usize = 1024 * 1024 * 16;

pub(crate) fn disabled() -> Sampler {
Sampler {
shared: Weak::new(),
}
}

pub(super) fn channel(ping_pong: PingPong, initial_window: WindowSize) -> (Sampler, Estimator) {
let shared = Arc::new(Mutex::new(Shared {
bytes: 0,
ping_pong,
ping_sent: false,
sent_at: Instant::now(),
}));

(
Sampler {
shared: Arc::downgrade(&shared),
},
Estimator {
bdp: initial_window,
max_bandwidth: 0.0,
shared,
samples: 0,
rtt: 0.0,
},
)
}

#[derive(Clone)]
pub(crate) struct Sampler {
shared: Weak<Mutex<Shared>>,
}

pub(super) struct Estimator {
shared: Arc<Mutex<Shared>>,

/// Current BDP in bytes
bdp: u32,
/// Largest bandwidth we've seen so far.
max_bandwidth: f64,
/// Count of samples made (ping sent and received)
samples: usize,
/// Round trip time in seconds
rtt: f64,
}

struct Shared {
bytes: usize,
ping_pong: PingPong,
ping_sent: bool,
sent_at: Instant,
}

impl Sampler {
pub(crate) fn sample(&self, bytes: usize) {
let shared = if let Some(shared) = self.shared.upgrade() {
shared
} else {
return;
};

let mut inner = shared.lock().unwrap();

if !inner.ping_sent {
if let Ok(()) = inner.ping_pong.send_ping(Ping::opaque()) {
inner.ping_sent = true;
inner.sent_at = Instant::now();
trace!("sending BDP ping");
} else {
return;
}
}

inner.bytes += bytes;
}

pub(crate) fn is_enabled(&self) -> bool {
self.shared.strong_count() > 0
}
}

impl Estimator {
pub(super) fn poll_estimate(&mut self, cx: &mut task::Context<'_>) -> Poll<WindowSize> {
let mut inner = self.shared.lock().unwrap();
if !inner.ping_sent {
// XXX: this doesn't register a waker...?
return Poll::Pending;
}

let (bytes, rtt) = match ready!(inner.ping_pong.poll_pong(cx)) {
Ok(_pong) => {
let rtt = inner.sent_at.elapsed();
let bytes = inner.bytes;
inner.bytes = 0;
inner.ping_sent = false;
self.samples += 1;
trace!("received BDP ack; bytes = {}, rtt = {:?}", bytes, rtt);
(bytes, rtt)
}
Err(e) => {
debug!("bdp pong error: {}", e);
return Poll::Pending;
}
};

drop(inner);

if let Some(bdp) = self.calculate(bytes, rtt) {
Poll::Ready(bdp)
} else {
// XXX: this doesn't register a waker...?
Poll::Pending
}
}

fn calculate(&mut self, bytes: usize, rtt: Duration) -> Option<WindowSize> {
// No need to do any math if we're at the limit.
if self.bdp as usize == BDP_LIMIT {
return None;
}

// average the rtt
let rtt = seconds(rtt);
if self.samples < 10 {
// Average the first 10 samples
self.rtt += (rtt - self.rtt) / (self.samples as f64);
} else {
self.rtt += (rtt - self.rtt) / 0.9;
}

// calculate the current bandwidth
let bw = (bytes as f64) / (self.rtt * 1.5);
trace!("current bandwidth = {:.1}B/s", bw);

if bw < self.max_bandwidth {
// not a faster bandwidth, so don't update
return None;
} else {
self.max_bandwidth = bw;
}

// if the current `bytes` sample is at least 2/3 the previous
// bdp, increase to double the current sample.
if (bytes as f64) >= (self.bdp as f64) * 0.66 {
self.bdp = (bytes * 2).min(BDP_LIMIT) as WindowSize;
trace!("BDP increased to {}", self.bdp);
Some(self.bdp)
} else {
None
}
}
}

fn seconds(dur: Duration) -> f64 {
const NANOS_PER_SEC: f64 = 1_000_000_000.0;
let secs = dur.as_secs() as f64;
secs + (dur.subsec_nanos() as f64) / NANOS_PER_SEC
}
Loading

0 comments on commit 30012ae

Please sign in to comment.