-
-
Notifications
You must be signed in to change notification settings - Fork 1.6k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(http2): add adaptive window size support using BDP (#2138)
This adds support for calculating the Bandwidth-delay product when using HTTP2. When a DATA frame is received, a PING is sent to the remote. While the PING acknoledgement is outstanding, the amount of bytes of all received DATA frames is accumulated. Once we receive the PING acknowledgement, we calculate the BDP based on the number of received bytes and the round-trip-time of the PING. If we are near the current maximum window size, the size is doubled. It's disabled by default until tested more extensively.
- Loading branch information
1 parent
22dc6fe
commit 48102d6
Showing
9 changed files
with
451 additions
and
71 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,190 @@ | ||
// What should it do? | ||
// | ||
// # BDP Algorithm | ||
// | ||
// 1. When receiving a DATA frame, if a BDP ping isn't outstanding: | ||
// 1a. Record current time. | ||
// 1b. Send a BDP ping. | ||
// 2. Increment the number of received bytes. | ||
// 3. When the BDP ping ack is received: | ||
// 3a. Record duration from sent time. | ||
// 3b. Merge RTT with a running average. | ||
// 3c. Calculate bdp as bytes/rtt. | ||
// 3d. If bdp is over 2/3 max, set new max to bdp and update windows. | ||
// | ||
// | ||
// # Implementation | ||
// | ||
// - `hyper::Body::h2` variant includes a "bdp channel" | ||
// - When the body's `poll_data` yields bytes, call `bdp.sample(bytes.len())` | ||
// | ||
|
||
use std::sync::{Arc, Mutex, Weak}; | ||
use std::task::{self, Poll}; | ||
use std::time::{Duration, Instant}; | ||
|
||
use h2::{Ping, PingPong}; | ||
|
||
type WindowSize = u32; | ||
|
||
/// Any higher than this likely will be hitting the TCP flow control. | ||
const BDP_LIMIT: usize = 1024 * 1024 * 16; | ||
|
||
pub(crate) fn disabled() -> Sampler { | ||
Sampler { | ||
shared: Weak::new(), | ||
} | ||
} | ||
|
||
pub(super) fn channel(ping_pong: PingPong, initial_window: WindowSize) -> (Sampler, Estimator) { | ||
let shared = Arc::new(Mutex::new(Shared { | ||
bytes: 0, | ||
ping_pong, | ||
ping_sent: false, | ||
sent_at: Instant::now(), | ||
})); | ||
|
||
( | ||
Sampler { | ||
shared: Arc::downgrade(&shared), | ||
}, | ||
Estimator { | ||
bdp: initial_window, | ||
max_bandwidth: 0.0, | ||
shared, | ||
samples: 0, | ||
rtt: 0.0, | ||
}, | ||
) | ||
} | ||
|
||
#[derive(Clone)] | ||
pub(crate) struct Sampler { | ||
shared: Weak<Mutex<Shared>>, | ||
} | ||
|
||
pub(super) struct Estimator { | ||
shared: Arc<Mutex<Shared>>, | ||
|
||
/// Current BDP in bytes | ||
bdp: u32, | ||
/// Largest bandwidth we've seen so far. | ||
max_bandwidth: f64, | ||
/// Count of samples made (ping sent and received) | ||
samples: usize, | ||
/// Round trip time in seconds | ||
rtt: f64, | ||
} | ||
|
||
struct Shared { | ||
bytes: usize, | ||
ping_pong: PingPong, | ||
ping_sent: bool, | ||
sent_at: Instant, | ||
} | ||
|
||
impl Sampler { | ||
pub(crate) fn sample(&self, bytes: usize) { | ||
let shared = if let Some(shared) = self.shared.upgrade() { | ||
shared | ||
} else { | ||
return; | ||
}; | ||
|
||
let mut inner = shared.lock().unwrap(); | ||
|
||
if !inner.ping_sent { | ||
if let Ok(()) = inner.ping_pong.send_ping(Ping::opaque()) { | ||
inner.ping_sent = true; | ||
inner.sent_at = Instant::now(); | ||
trace!("sending BDP ping"); | ||
} else { | ||
return; | ||
} | ||
} | ||
|
||
inner.bytes += bytes; | ||
} | ||
|
||
pub(crate) fn is_enabled(&self) -> bool { | ||
self.shared.strong_count() > 0 | ||
} | ||
} | ||
|
||
impl Estimator { | ||
pub(super) fn poll_estimate(&mut self, cx: &mut task::Context<'_>) -> Poll<WindowSize> { | ||
let mut inner = self.shared.lock().unwrap(); | ||
if !inner.ping_sent { | ||
// XXX: this doesn't register a waker...? | ||
return Poll::Pending; | ||
} | ||
|
||
let (bytes, rtt) = match ready!(inner.ping_pong.poll_pong(cx)) { | ||
Ok(_pong) => { | ||
let rtt = inner.sent_at.elapsed(); | ||
let bytes = inner.bytes; | ||
inner.bytes = 0; | ||
inner.ping_sent = false; | ||
self.samples += 1; | ||
trace!("received BDP ack; bytes = {}, rtt = {:?}", bytes, rtt); | ||
(bytes, rtt) | ||
} | ||
Err(e) => { | ||
debug!("bdp pong error: {}", e); | ||
return Poll::Pending; | ||
} | ||
}; | ||
|
||
drop(inner); | ||
|
||
if let Some(bdp) = self.calculate(bytes, rtt) { | ||
Poll::Ready(bdp) | ||
} else { | ||
// XXX: this doesn't register a waker...? | ||
Poll::Pending | ||
} | ||
} | ||
|
||
fn calculate(&mut self, bytes: usize, rtt: Duration) -> Option<WindowSize> { | ||
// No need to do any math if we're at the limit. | ||
if self.bdp as usize == BDP_LIMIT { | ||
return None; | ||
} | ||
|
||
// average the rtt | ||
let rtt = seconds(rtt); | ||
if self.samples < 10 { | ||
// Average the first 10 samples | ||
self.rtt += (rtt - self.rtt) / (self.samples as f64); | ||
} else { | ||
self.rtt += (rtt - self.rtt) / 0.9; | ||
} | ||
|
||
// calculate the current bandwidth | ||
let bw = (bytes as f64) / (self.rtt * 1.5); | ||
trace!("current bandwidth = {:.1}B/s", bw); | ||
|
||
if bw < self.max_bandwidth { | ||
// not a faster bandwidth, so don't update | ||
return None; | ||
} else { | ||
self.max_bandwidth = bw; | ||
} | ||
|
||
// if the current `bytes` sample is at least 2/3 the previous | ||
// bdp, increase to double the current sample. | ||
if (bytes as f64) >= (self.bdp as f64) * 0.66 { | ||
self.bdp = (bytes * 2).min(BDP_LIMIT) as WindowSize; | ||
trace!("BDP increased to {}", self.bdp); | ||
Some(self.bdp) | ||
} else { | ||
None | ||
} | ||
} | ||
} | ||
|
||
fn seconds(dur: Duration) -> f64 { | ||
const NANOS_PER_SEC: f64 = 1_000_000_000.0; | ||
let secs = dur.as_secs() as f64; | ||
secs + (dur.subsec_nanos() as f64) / NANOS_PER_SEC | ||
} |
Oops, something went wrong.