Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(http2): add adaptive window size support using BDP #2138

Merged
merged 1 commit into from
Feb 26, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 23 additions & 3 deletions src/body/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use http::HeaderMap;
use http_body::{Body as HttpBody, SizeHint};

use crate::common::{task, watch, Future, Never, Pin, Poll};
use crate::proto::h2::bdp;
use crate::proto::DecodedLength;
use crate::upgrade::OnUpgrade;

Expand Down Expand Up @@ -61,6 +62,11 @@ struct Extra {
/// connection yet.
delayed_eof: Option<DelayEof>,
on_upgrade: OnUpgrade,

/// Records bytes read to compute the BDP.
///
/// Only used with `H2` variant.
h2_bdp: bdp::Sampler,
}

type DelayEofUntil = oneshot::Receiver<Never>;
Expand Down Expand Up @@ -175,11 +181,21 @@ impl Body {
Body { kind, extra: None }
}

pub(crate) fn h2(recv: h2::RecvStream, content_length: DecodedLength) -> Self {
Body::new(Kind::H2 {
pub(crate) fn h2(
recv: h2::RecvStream,
content_length: DecodedLength,
bdp: bdp::Sampler,
) -> Self {
let mut body = Body::new(Kind::H2 {
content_length,
recv,
})
});

if bdp.is_enabled() {
body.extra_mut().h2_bdp = bdp;
}

body
}

pub(crate) fn set_on_upgrade(&mut self, upgrade: OnUpgrade) {
Expand All @@ -204,6 +220,7 @@ impl Body {
Box::new(Extra {
delayed_eof: None,
on_upgrade: OnUpgrade::none(),
h2_bdp: bdp::disabled(),
})
})
}
Expand Down Expand Up @@ -262,6 +279,9 @@ impl Body {
Some(Ok(bytes)) => {
let _ = h2.flow_control().release_capacity(bytes.len());
len.sub_if(bytes.len() as u64);
if let Some(ref extra) = self.extra {
extra.h2_bdp.sample(bytes.len());
}
Poll::Ready(Some(Ok(bytes)))
}
Some(Err(e)) => Poll::Ready(Some(Err(crate::Error::new_body(e)))),
Expand Down
38 changes: 22 additions & 16 deletions src/client/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,6 @@ where
H2(#[pin] proto::h2::ClientTask<B>),
}

// Our defaults are chosen for the "majority" case, which usually are not
// resource constrained, and so the spec default of 64kb can be too limiting
// for performance.
const DEFAULT_HTTP2_CONN_WINDOW: u32 = 1024 * 1024 * 5; // 5mb
const DEFAULT_HTTP2_STREAM_WINDOW: u32 = 1024 * 1024 * 2; // 2mb

/// Returns a handshake future over some IO.
///
/// This is a shortcut for `Builder::new().handshake(io)`.
Expand Down Expand Up @@ -82,7 +76,7 @@ pub struct Builder {
h1_read_buf_exact_size: Option<usize>,
h1_max_buf_size: Option<usize>,
http2: bool,
h2_builder: h2::client::Builder,
h2_builder: proto::h2::client::Config,
}

/// A future returned by `SendRequest::send_request`.
Expand Down Expand Up @@ -420,20 +414,14 @@ impl Builder {
/// Creates a new connection builder.
#[inline]
pub fn new() -> Builder {
let mut h2_builder = h2::client::Builder::default();
h2_builder
.initial_window_size(DEFAULT_HTTP2_STREAM_WINDOW)
.initial_connection_window_size(DEFAULT_HTTP2_CONN_WINDOW)
.enable_push(false);

Builder {
exec: Exec::Default,
h1_writev: true,
h1_read_buf_exact_size: None,
h1_title_case_headers: false,
h1_max_buf_size: None,
http2: false,
h2_builder,
h2_builder: Default::default(),
}
}

Expand Down Expand Up @@ -491,7 +479,8 @@ impl Builder {
/// [spec]: https://http2.github.io/http2-spec/#SETTINGS_INITIAL_WINDOW_SIZE
pub fn http2_initial_stream_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
if let Some(sz) = sz.into() {
self.h2_builder.initial_window_size(sz);
self.h2_builder.adaptive_window = false;
self.h2_builder.initial_stream_window_size = sz;
}
self
}
Expand All @@ -506,7 +495,24 @@ impl Builder {
sz: impl Into<Option<u32>>,
) -> &mut Self {
if let Some(sz) = sz.into() {
self.h2_builder.initial_connection_window_size(sz);
self.h2_builder.adaptive_window = false;
self.h2_builder.initial_conn_window_size = sz;
}
self
}

/// Sets whether to use an adaptive flow control.
///
/// Enabling this will override the limits set in
/// `http2_initial_stream_window_size` and
/// `http2_initial_connection_window_size`.
pub fn http2_adaptive_window(&mut self, enabled: bool) -> &mut Self {
use proto::h2::SPEC_WINDOW_SIZE;

self.h2_builder.adaptive_window = enabled;
if enabled {
self.h2_builder.initial_conn_window_size = SPEC_WINDOW_SIZE;
self.h2_builder.initial_stream_window_size = SPEC_WINDOW_SIZE;
}
self
}
Expand Down
10 changes: 10 additions & 0 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -996,6 +996,16 @@ impl Builder {
self
}

/// Sets whether to use an adaptive flow control.
///
/// Enabling this will override the limits set in
/// `http2_initial_stream_window_size` and
/// `http2_initial_connection_window_size`.
pub fn http2_adaptive_window(&mut self, enabled: bool) -> &mut Self {
self.conn_builder.http2_adaptive_window(enabled);
self
}

/// Sets the maximum idle connection per host allowed in the pool.
///
/// Default is `usize::MAX` (no limit).
Expand Down
190 changes: 190 additions & 0 deletions src/proto/h2/bdp.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
// What should it do?
//
// # BDP Algorithm
//
// 1. When receiving a DATA frame, if a BDP ping isn't outstanding:
// 1a. Record current time.
// 1b. Send a BDP ping.
// 2. Increment the number of received bytes.
// 3. When the BDP ping ack is received:
// 3a. Record duration from sent time.
// 3b. Merge RTT with a running average.
// 3c. Calculate bdp as bytes/rtt.
// 3d. If bdp is over 2/3 max, set new max to bdp and update windows.
//
//
// # Implementation
//
// - `hyper::Body::h2` variant includes a "bdp channel"
// - When the body's `poll_data` yields bytes, call `bdp.sample(bytes.len())`
seanmonstar marked this conversation as resolved.
Show resolved Hide resolved
//

use std::sync::{Arc, Mutex, Weak};
use std::task::{self, Poll};
use std::time::{Duration, Instant};

use h2::{Ping, PingPong};

type WindowSize = u32;

/// Any higher than this likely will be hitting the TCP flow control.
const BDP_LIMIT: usize = 1024 * 1024 * 16;

pub(crate) fn disabled() -> Sampler {
Sampler {
shared: Weak::new(),
}
}

pub(super) fn channel(ping_pong: PingPong, initial_window: WindowSize) -> (Sampler, Estimator) {
let shared = Arc::new(Mutex::new(Shared {
bytes: 0,
ping_pong,
ping_sent: false,
sent_at: Instant::now(),
}));

(
Sampler {
shared: Arc::downgrade(&shared),
},
Estimator {
bdp: initial_window,
max_bandwidth: 0.0,
shared,
samples: 0,
rtt: 0.0,
},
)
}

#[derive(Clone)]
pub(crate) struct Sampler {
shared: Weak<Mutex<Shared>>,
}

pub(super) struct Estimator {
shared: Arc<Mutex<Shared>>,

/// Current BDP in bytes
bdp: u32,
/// Largest bandwidth we've seen so far.
max_bandwidth: f64,
/// Count of samples made (ping sent and received)
samples: usize,
/// Round trip time in seconds
rtt: f64,
}

struct Shared {
bytes: usize,
ping_pong: PingPong,
ping_sent: bool,
sent_at: Instant,
}

impl Sampler {
pub(crate) fn sample(&self, bytes: usize) {
let shared = if let Some(shared) = self.shared.upgrade() {
shared
} else {
return;
};

let mut inner = shared.lock().unwrap();

if !inner.ping_sent {
if let Ok(()) = inner.ping_pong.send_ping(Ping::opaque()) {
inner.ping_sent = true;
inner.sent_at = Instant::now();
trace!("sending BDP ping");
} else {
return;
}
}

inner.bytes += bytes;
}

pub(crate) fn is_enabled(&self) -> bool {
self.shared.strong_count() > 0
}
}

impl Estimator {
pub(super) fn poll_estimate(&mut self, cx: &mut task::Context<'_>) -> Poll<WindowSize> {
let mut inner = self.shared.lock().unwrap();
if !inner.ping_sent {
// XXX: this doesn't register a waker...?
seanmonstar marked this conversation as resolved.
Show resolved Hide resolved
return Poll::Pending;
}

let (bytes, rtt) = match ready!(inner.ping_pong.poll_pong(cx)) {
Ok(_pong) => {
let rtt = inner.sent_at.elapsed();
let bytes = inner.bytes;
inner.bytes = 0;
inner.ping_sent = false;
self.samples += 1;
trace!("received BDP ack; bytes = {}, rtt = {:?}", bytes, rtt);
(bytes, rtt)
}
Err(e) => {
debug!("bdp pong error: {}", e);
return Poll::Pending;
}
};

drop(inner);

if let Some(bdp) = self.calculate(bytes, rtt) {
Poll::Ready(bdp)
} else {
// XXX: this doesn't register a waker...?
Poll::Pending
}
}

fn calculate(&mut self, bytes: usize, rtt: Duration) -> Option<WindowSize> {
// No need to do any math if we're at the limit.
if self.bdp as usize == BDP_LIMIT {
return None;
}

// average the rtt
let rtt = seconds(rtt);
if self.samples < 10 {
// Average the first 10 samples
self.rtt += (rtt - self.rtt) / (self.samples as f64);
} else {
self.rtt += (rtt - self.rtt) / 0.9;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this trying to be an exponential moving average? Should this be * 0.9 then?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, I inversed it by accident. It's fixed in the latest release.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah my bad, I didn't notice because I was using an old release.

}

// calculate the current bandwidth
let bw = (bytes as f64) / (self.rtt * 1.5);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can somebody explain why 1.5?

trace!("current bandwidth = {:.1}B/s", bw);

if bw < self.max_bandwidth {
// not a faster bandwidth, so don't update
return None;
} else {
self.max_bandwidth = bw;
}

// if the current `bytes` sample is at least 2/3 the previous
// bdp, increase to double the current sample.
if (bytes as f64) >= (self.bdp as f64) * 0.66 {
self.bdp = (bytes * 2).min(BDP_LIMIT) as WindowSize;
trace!("BDP increased to {}", self.bdp);
Some(self.bdp)
} else {
None
}
}
}

fn seconds(dur: Duration) -> f64 {
const NANOS_PER_SEC: f64 = 1_000_000_000.0;
let secs = dur.as_secs() as f64;
secs + (dur.subsec_nanos() as f64) / NANOS_PER_SEC
}
Loading