From 30012aeceecd9f219870ff25d8768ed39a8e3caa Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Mon, 24 Feb 2020 11:27:07 -0800 Subject: [PATCH] feat(http2): add adaptive window size support using BDP This adds support for calculating the Bandwidth-delay product when using HTTP2. When a DATA frame is received, a PING is sent to the remote. While the PING acknoledgement is outstanding, the amount of bytes of all received DATA frames is accumulated. Once we receive the PING acknowledgement, we calculate the BDP based on the number of received bytes and the round-trip-time of the PING. If we are near the current maximum window size, the size is doubled. It's disabled by default until tested more extensively. --- src/body/body.rs | 26 +++++- src/client/conn.rs | 38 +++++---- src/client/mod.rs | 10 +++ src/proto/h2/bdp.rs | 190 +++++++++++++++++++++++++++++++++++++++++ src/proto/h2/client.rs | 96 ++++++++++++++++----- src/proto/h2/mod.rs | 4 + src/proto/h2/server.rs | 102 +++++++++++++++++++--- src/server/conn.rs | 46 +++++----- src/server/mod.rs | 10 +++ 9 files changed, 451 insertions(+), 71 deletions(-) create mode 100644 src/proto/h2/bdp.rs diff --git a/src/body/body.rs b/src/body/body.rs index c31635e26a..339b77d63a 100644 --- a/src/body/body.rs +++ b/src/body/body.rs @@ -12,6 +12,7 @@ use http::HeaderMap; use http_body::{Body as HttpBody, SizeHint}; use crate::common::{task, watch, Future, Never, Pin, Poll}; +use crate::proto::h2::bdp; use crate::proto::DecodedLength; use crate::upgrade::OnUpgrade; @@ -61,6 +62,11 @@ struct Extra { /// connection yet. delayed_eof: Option, on_upgrade: OnUpgrade, + + /// Records bytes read to compute the BDP. + /// + /// Only used with `H2` variant. + h2_bdp: bdp::Sampler, } type DelayEofUntil = oneshot::Receiver; @@ -175,11 +181,21 @@ impl Body { Body { kind, extra: None } } - pub(crate) fn h2(recv: h2::RecvStream, content_length: DecodedLength) -> Self { - Body::new(Kind::H2 { + pub(crate) fn h2( + recv: h2::RecvStream, + content_length: DecodedLength, + bdp: bdp::Sampler, + ) -> Self { + let mut body = Body::new(Kind::H2 { content_length, recv, - }) + }); + + if bdp.is_enabled() { + body.extra_mut().h2_bdp = bdp; + } + + body } pub(crate) fn set_on_upgrade(&mut self, upgrade: OnUpgrade) { @@ -204,6 +220,7 @@ impl Body { Box::new(Extra { delayed_eof: None, on_upgrade: OnUpgrade::none(), + h2_bdp: bdp::disabled(), }) }) } @@ -262,6 +279,9 @@ impl Body { Some(Ok(bytes)) => { let _ = h2.flow_control().release_capacity(bytes.len()); len.sub_if(bytes.len() as u64); + if let Some(ref extra) = self.extra { + extra.h2_bdp.sample(bytes.len()); + } Poll::Ready(Some(Ok(bytes))) } Some(Err(e)) => Poll::Ready(Some(Err(crate::Error::new_body(e)))), diff --git a/src/client/conn.rs b/src/client/conn.rs index a81f1f05ed..f7b24b5268 100644 --- a/src/client/conn.rs +++ b/src/client/conn.rs @@ -35,12 +35,6 @@ where H2(#[pin] proto::h2::ClientTask), } -// Our defaults are chosen for the "majority" case, which usually are not -// resource constrained, and so the spec default of 64kb can be too limiting -// for performance. -const DEFAULT_HTTP2_CONN_WINDOW: u32 = 1024 * 1024 * 5; // 5mb -const DEFAULT_HTTP2_STREAM_WINDOW: u32 = 1024 * 1024 * 2; // 2mb - /// Returns a handshake future over some IO. /// /// This is a shortcut for `Builder::new().handshake(io)`. @@ -82,7 +76,7 @@ pub struct Builder { h1_read_buf_exact_size: Option, h1_max_buf_size: Option, http2: bool, - h2_builder: h2::client::Builder, + h2_builder: proto::h2::client::Config, } /// A future returned by `SendRequest::send_request`. @@ -420,12 +414,6 @@ impl Builder { /// Creates a new connection builder. #[inline] pub fn new() -> Builder { - let mut h2_builder = h2::client::Builder::default(); - h2_builder - .initial_window_size(DEFAULT_HTTP2_STREAM_WINDOW) - .initial_connection_window_size(DEFAULT_HTTP2_CONN_WINDOW) - .enable_push(false); - Builder { exec: Exec::Default, h1_writev: true, @@ -433,7 +421,7 @@ impl Builder { h1_title_case_headers: false, h1_max_buf_size: None, http2: false, - h2_builder, + h2_builder: Default::default(), } } @@ -491,7 +479,8 @@ impl Builder { /// [spec]: https://http2.github.io/http2-spec/#SETTINGS_INITIAL_WINDOW_SIZE pub fn http2_initial_stream_window_size(&mut self, sz: impl Into>) -> &mut Self { if let Some(sz) = sz.into() { - self.h2_builder.initial_window_size(sz); + self.h2_builder.adaptive_window = false; + self.h2_builder.initial_stream_window_size = sz; } self } @@ -506,7 +495,24 @@ impl Builder { sz: impl Into>, ) -> &mut Self { if let Some(sz) = sz.into() { - self.h2_builder.initial_connection_window_size(sz); + self.h2_builder.adaptive_window = false; + self.h2_builder.initial_conn_window_size = sz; + } + self + } + + /// Sets whether to use an adaptive flow control. + /// + /// Enabling this will override the limits set in + /// `http2_initial_stream_window_size` and + /// `http2_initial_connection_window_size`. + pub fn http2_adaptive_window(&mut self, enabled: bool) -> &mut Self { + use proto::h2::SPEC_WINDOW_SIZE; + + self.h2_builder.adaptive_window = enabled; + if enabled { + self.h2_builder.initial_conn_window_size = SPEC_WINDOW_SIZE; + self.h2_builder.initial_stream_window_size = SPEC_WINDOW_SIZE; } self } diff --git a/src/client/mod.rs b/src/client/mod.rs index ce598d40b3..a9f7ab0666 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -996,6 +996,16 @@ impl Builder { self } + /// Sets whether to use an adaptive flow control. + /// + /// Enabling this will override the limits set in + /// `http2_initial_stream_window_size` and + /// `http2_initial_connection_window_size`. + pub fn http2_adaptive_window(&mut self, enabled: bool) -> &mut Self { + self.conn_builder.http2_adaptive_window(enabled); + self + } + /// Sets the maximum idle connection per host allowed in the pool. /// /// Default is `usize::MAX` (no limit). diff --git a/src/proto/h2/bdp.rs b/src/proto/h2/bdp.rs new file mode 100644 index 0000000000..611eb7acc4 --- /dev/null +++ b/src/proto/h2/bdp.rs @@ -0,0 +1,190 @@ +// What should it do? +// +// # BDP Algorithm +// +// 1. When receiving a DATA frame, if a BDP ping isn't outstanding: +// 1a. Record current time. +// 1b. Send a BDP ping. +// 2. Increment the number of received bytes. +// 3. When the BDP ping ack is received: +// 3a. Record duration from sent time. +// 3b. Merge RTT with a running average. +// 3c. Calculate bdp as bytes/rtt. +// 3d. If bdp is over 2/3 max, set new max to bdp and update windows. +// +// +// # Implementation +// +// - `hyper::Body::h2` variant includes a "bdp channel" +// - When the body's `poll_data` yields bytes, call `bdp.sample(bytes.len())` +// + +use std::sync::{Arc, Mutex, Weak}; +use std::task::{self, Poll}; +use std::time::{Duration, Instant}; + +use h2::{Ping, PingPong}; + +type WindowSize = u32; + +/// Any higher than this likely will be hitting the TCP flow control. +const BDP_LIMIT: usize = 1024 * 1024 * 16; + +pub(crate) fn disabled() -> Sampler { + Sampler { + shared: Weak::new(), + } +} + +pub(super) fn channel(ping_pong: PingPong, initial_window: WindowSize) -> (Sampler, Estimator) { + let shared = Arc::new(Mutex::new(Shared { + bytes: 0, + ping_pong, + ping_sent: false, + sent_at: Instant::now(), + })); + + ( + Sampler { + shared: Arc::downgrade(&shared), + }, + Estimator { + bdp: initial_window, + max_bandwidth: 0.0, + shared, + samples: 0, + rtt: 0.0, + }, + ) +} + +#[derive(Clone)] +pub(crate) struct Sampler { + shared: Weak>, +} + +pub(super) struct Estimator { + shared: Arc>, + + /// Current BDP in bytes + bdp: u32, + /// Largest bandwidth we've seen so far. + max_bandwidth: f64, + /// Count of samples made (ping sent and received) + samples: usize, + /// Round trip time in seconds + rtt: f64, +} + +struct Shared { + bytes: usize, + ping_pong: PingPong, + ping_sent: bool, + sent_at: Instant, +} + +impl Sampler { + pub(crate) fn sample(&self, bytes: usize) { + let shared = if let Some(shared) = self.shared.upgrade() { + shared + } else { + return; + }; + + let mut inner = shared.lock().unwrap(); + + if !inner.ping_sent { + if let Ok(()) = inner.ping_pong.send_ping(Ping::opaque()) { + inner.ping_sent = true; + inner.sent_at = Instant::now(); + trace!("sending BDP ping"); + } else { + return; + } + } + + inner.bytes += bytes; + } + + pub(crate) fn is_enabled(&self) -> bool { + self.shared.strong_count() > 0 + } +} + +impl Estimator { + pub(super) fn poll_estimate(&mut self, cx: &mut task::Context<'_>) -> Poll { + let mut inner = self.shared.lock().unwrap(); + if !inner.ping_sent { + // XXX: this doesn't register a waker...? + return Poll::Pending; + } + + let (bytes, rtt) = match ready!(inner.ping_pong.poll_pong(cx)) { + Ok(_pong) => { + let rtt = inner.sent_at.elapsed(); + let bytes = inner.bytes; + inner.bytes = 0; + inner.ping_sent = false; + self.samples += 1; + trace!("received BDP ack; bytes = {}, rtt = {:?}", bytes, rtt); + (bytes, rtt) + } + Err(e) => { + debug!("bdp pong error: {}", e); + return Poll::Pending; + } + }; + + drop(inner); + + if let Some(bdp) = self.calculate(bytes, rtt) { + Poll::Ready(bdp) + } else { + // XXX: this doesn't register a waker...? + Poll::Pending + } + } + + fn calculate(&mut self, bytes: usize, rtt: Duration) -> Option { + // No need to do any math if we're at the limit. + if self.bdp as usize == BDP_LIMIT { + return None; + } + + // average the rtt + let rtt = seconds(rtt); + if self.samples < 10 { + // Average the first 10 samples + self.rtt += (rtt - self.rtt) / (self.samples as f64); + } else { + self.rtt += (rtt - self.rtt) / 0.9; + } + + // calculate the current bandwidth + let bw = (bytes as f64) / (self.rtt * 1.5); + trace!("current bandwidth = {:.1}B/s", bw); + + if bw < self.max_bandwidth { + // not a faster bandwidth, so don't update + return None; + } else { + self.max_bandwidth = bw; + } + + // if the current `bytes` sample is at least 2/3 the previous + // bdp, increase to double the current sample. + if (bytes as f64) >= (self.bdp as f64) * 0.66 { + self.bdp = (bytes * 2).min(BDP_LIMIT) as WindowSize; + trace!("BDP increased to {}", self.bdp); + Some(self.bdp) + } else { + None + } + } +} + +fn seconds(dur: Duration) -> f64 { + const NANOS_PER_SEC: f64 = 1_000_000_000.0; + let secs = dur.as_secs() as f64; + secs + (dur.subsec_nanos() as f64) / NANOS_PER_SEC +} diff --git a/src/proto/h2/client.rs b/src/proto/h2/client.rs index 9b87121e87..0fc00d33ea 100644 --- a/src/proto/h2/client.rs +++ b/src/proto/h2/client.rs @@ -4,7 +4,7 @@ use futures_util::stream::StreamExt as _; use h2::client::{Builder, SendRequest}; use tokio::io::{AsyncRead, AsyncWrite}; -use super::{decode_content_length, PipeToSendStream, SendBuf}; +use super::{bdp, decode_content_length, PipeToSendStream, SendBuf}; use crate::body::Payload; use crate::common::{task, Exec, Future, Never, Pin, Poll}; use crate::headers; @@ -21,17 +21,43 @@ type ConnDropRef = mpsc::Sender; ///// the "dispatch" task will be notified and can shutdown sooner. type ConnEof = oneshot::Receiver; +// Our defaults are chosen for the "majority" case, which usually are not +// resource constrained, and so the spec default of 64kb can be too limiting +// for performance. +const DEFAULT_CONN_WINDOW: u32 = 1024 * 1024 * 5; // 5mb +const DEFAULT_STREAM_WINDOW: u32 = 1024 * 1024 * 2; // 2mb + +#[derive(Clone, Debug)] +pub(crate) struct Config { + pub(crate) adaptive_window: bool, + pub(crate) initial_conn_window_size: u32, + pub(crate) initial_stream_window_size: u32, +} + +impl Default for Config { + fn default() -> Config { + Config { + adaptive_window: false, + initial_conn_window_size: DEFAULT_CONN_WINDOW, + initial_stream_window_size: DEFAULT_STREAM_WINDOW, + } + } +} + pub(crate) async fn handshake( io: T, req_rx: ClientRx, - builder: &Builder, + config: &Config, exec: Exec, ) -> crate::Result> where T: AsyncRead + AsyncWrite + Send + Unpin + 'static, B: Payload, { - let (h2_tx, conn) = builder + let (h2_tx, mut conn) = Builder::default() + .initial_window_size(config.initial_stream_window_size) + .initial_connection_window_size(config.initial_conn_window_size) + .enable_push(false) .handshake::<_, SendBuf>(io) .await .map_err(crate::Error::new_h2)?; @@ -49,27 +75,34 @@ where } }); - let conn = conn.map_err(|e| debug!("connection error: {}", e)); + let sampler = if config.adaptive_window { + let (sampler, mut estimator) = + bdp::channel(conn.ping_pong().unwrap(), config.initial_stream_window_size); - let conn_task = async move { - match future::select(conn, conn_drop_rx).await { - Either::Left(_) => { - // ok or err, the `conn` has finished - } - Either::Right(((), conn)) => { - // mpsc has been dropped, hopefully polling - // the connection some more should start shutdown - // and then close - trace!("send_request dropped, starting conn shutdown"); - drop(cancel_tx); - let _ = conn.await; + let conn = future::poll_fn(move |cx| { + match estimator.poll_estimate(cx) { + Poll::Ready(wnd) => { + conn.set_target_window_size(wnd); + conn.set_initial_window_size(wnd)?; + } + Poll::Pending => {} } - } - }; - exec.execute(conn_task); + Pin::new(&mut conn).poll(cx) + }); + let conn = conn.map_err(|e| debug!("connection error: {}", e)); + + exec.execute(conn_task(conn, conn_drop_rx, cancel_tx)); + sampler + } else { + let conn = conn.map_err(|e| debug!("connection error: {}", e)); + + exec.execute(conn_task(conn, conn_drop_rx, cancel_tx)); + bdp::disabled() + }; Ok(ClientTask { + bdp: sampler, conn_drop_ref, conn_eof, executor: exec, @@ -78,10 +111,31 @@ where }) } +async fn conn_task(conn: C, drop_rx: D, cancel_tx: oneshot::Sender) +where + C: Future + Unpin, + D: Future + Unpin, +{ + match future::select(conn, drop_rx).await { + Either::Left(_) => { + // ok or err, the `conn` has finished + } + Either::Right(((), conn)) => { + // mpsc has been dropped, hopefully polling + // the connection some more should start shutdown + // and then close + trace!("send_request dropped, starting conn shutdown"); + drop(cancel_tx); + let _ = conn.await; + } + } +} + pub(crate) struct ClientTask where B: Payload, { + bdp: bdp::Sampler, conn_drop_ref: ConnDropRef, conn_eof: ConnEof, executor: Exec, @@ -156,10 +210,12 @@ where } } + let bdp = self.bdp.clone(); let fut = fut.map(move |result| match result { Ok(res) => { let content_length = decode_content_length(res.headers()); - let res = res.map(|stream| crate::Body::h2(stream, content_length)); + let res = + res.map(|stream| crate::Body::h2(stream, content_length, bdp)); Ok(res) } Err(err) => { diff --git a/src/proto/h2/mod.rs b/src/proto/h2/mod.rs index eab57be297..80d52349c7 100644 --- a/src/proto/h2/mod.rs +++ b/src/proto/h2/mod.rs @@ -12,12 +12,16 @@ use crate::body::Payload; use crate::common::{task, Future, Pin, Poll}; use crate::headers::content_length_parse_all; +pub(crate) mod bdp; pub(crate) mod client; pub(crate) mod server; pub(crate) use self::client::ClientTask; pub(crate) use self::server::Server; +/// Default initial stream window size defined in HTTP2 spec. +pub(crate) const SPEC_WINDOW_SIZE: u32 = 65_535; + fn strip_connection_headers(headers: &mut HeaderMap, is_request: bool) { // List of connection headers from: // https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Connection diff --git a/src/proto/h2/server.rs b/src/proto/h2/server.rs index 84689b3d46..b8d1afb925 100644 --- a/src/proto/h2/server.rs +++ b/src/proto/h2/server.rs @@ -1,12 +1,12 @@ use std::error::Error as StdError; use std::marker::Unpin; -use h2::server::{Builder, Connection, Handshake, SendResponse}; +use h2::server::{Connection, Handshake, SendResponse}; use h2::Reason; use pin_project::{pin_project, project}; use tokio::io::{AsyncRead, AsyncWrite}; -use super::{decode_content_length, PipeToSendStream, SendBuf}; +use super::{bdp, decode_content_length, PipeToSendStream, SendBuf}; use crate::body::Payload; use crate::common::exec::H2Exec; use crate::common::{task, Future, Pin, Poll}; @@ -16,6 +16,34 @@ use crate::service::HttpService; use crate::{Body, Response}; +// Our defaults are chosen for the "majority" case, which usually are not +// resource constrained, and so the spec default of 64kb can be too limiting +// for performance. +// +// At the same time, a server more often has multiple clients connected, and +// so is more likely to use more resources than a client would. +const DEFAULT_CONN_WINDOW: u32 = 1024 * 1024; // 1mb +const DEFAULT_STREAM_WINDOW: u32 = 1024 * 1024; // 1mb + +#[derive(Clone, Debug)] +pub(crate) struct Config { + pub(crate) adaptive_window: bool, + pub(crate) initial_conn_window_size: u32, + pub(crate) initial_stream_window_size: u32, + pub(crate) max_concurrent_streams: Option, +} + +impl Default for Config { + fn default() -> Config { + Config { + adaptive_window: false, + initial_conn_window_size: DEFAULT_CONN_WINDOW, + initial_stream_window_size: DEFAULT_STREAM_WINDOW, + max_concurrent_streams: None, + } + } +} + #[pin_project] pub(crate) struct Server where @@ -31,7 +59,13 @@ enum State where B: Payload, { - Handshaking(Handshake>), + Handshaking { + /// If Some, bdp is enabled with the initial size. + /// + /// If None, bdp is disabled. + bdp_initial_size: Option, + hs: Handshake>, + }, Serving(Serving), Closed, } @@ -40,6 +74,7 @@ struct Serving where B: Payload, { + bdp: Option<(bdp::Sampler, bdp::Estimator)>, conn: Connection>, closing: Option, } @@ -52,11 +87,28 @@ where B: Payload, E: H2Exec, { - pub(crate) fn new(io: T, service: S, builder: &Builder, exec: E) -> Server { + pub(crate) fn new(io: T, service: S, config: &Config, exec: E) -> Server { + let mut builder = h2::server::Builder::default(); + builder + .initial_window_size(config.initial_stream_window_size) + .initial_connection_window_size(config.initial_conn_window_size); + if let Some(max) = config.max_concurrent_streams { + builder.max_concurrent_streams(max); + } let handshake = builder.handshake(io); + + let bdp = if config.adaptive_window { + Some(config.initial_stream_window_size) + } else { + None + }; + Server { exec, - state: State::Handshaking(handshake), + state: State::Handshaking { + bdp_initial_size: bdp, + hs: handshake, + }, service, } } @@ -64,7 +116,7 @@ where pub fn graceful_shutdown(&mut self) { trace!("graceful_shutdown"); match self.state { - State::Handshaking(..) => { + State::Handshaking { .. } => { // fall-through, to replace state with Closed } State::Serving(ref mut srv) => { @@ -95,9 +147,15 @@ where let me = &mut *self; loop { let next = match me.state { - State::Handshaking(ref mut h) => { - let conn = ready!(Pin::new(h).poll(cx).map_err(crate::Error::new_h2))?; + State::Handshaking { + ref mut hs, + ref bdp_initial_size, + } => { + let mut conn = ready!(Pin::new(hs).poll(cx).map_err(crate::Error::new_h2))?; + let bdp = bdp_initial_size + .map(|wnd| bdp::channel(conn.ping_pong().expect("ping_pong"), wnd)); State::Serving(Serving { + bdp, conn, closing: None, }) @@ -135,7 +193,12 @@ where { if self.closing.is_none() { loop { - // At first, polls the readiness of supplied service. + self.poll_bdp(cx); + + // Check that the service is ready to accept a new request. + // + // - If not, just drive the connection some. + // - If ready, try to accept a new request from the connection. match service.poll_ready(cx) { Poll::Ready(Ok(())) => (), Poll::Pending => { @@ -168,7 +231,14 @@ where Some(Ok((req, respond))) => { trace!("incoming request"); let content_length = decode_content_length(req.headers()); - let req = req.map(|stream| crate::Body::h2(stream, content_length)); + let bdp_sampler = self + .bdp + .as_ref() + .map(|bdp| bdp.0.clone()) + .unwrap_or_else(bdp::disabled); + + let req = + req.map(|stream| crate::Body::h2(stream, content_length, bdp_sampler)); let fut = H2Stream::new(service.call(req), respond); exec.execute_h2stream(fut); } @@ -193,6 +263,18 @@ where Poll::Ready(Err(self.closing.take().expect("polled after error"))) } + + fn poll_bdp(&mut self, cx: &mut task::Context<'_>) { + if let Some((_, ref mut estimator)) = self.bdp { + match estimator.poll_estimate(cx) { + Poll::Ready(wnd) => { + self.conn.set_target_window_size(wnd); + let _ = self.conn.set_initial_window_size(wnd); + } + Poll::Pending => {} + } + } + } } #[allow(missing_debug_implementations)] diff --git a/src/server/conn.rs b/src/server/conn.rs index 9e7cec9ff1..889cb6b206 100644 --- a/src/server/conn.rs +++ b/src/server/conn.rs @@ -36,15 +36,6 @@ pub(super) use self::upgrades::UpgradeableConnection; #[cfg(feature = "tcp")] pub use super::tcp::{AddrIncoming, AddrStream}; -// Our defaults are chosen for the "majority" case, which usually are not -// resource constrained, and so the spec default of 64kb can be too limiting -// for performance. -// -// At the same time, a server more often has multiple clients connected, and -// so is more likely to use more resources than a client would. -const DEFAULT_HTTP2_CONN_WINDOW: u32 = 1024 * 1024; // 1mb -const DEFAULT_HTTP2_STREAM_WINDOW: u32 = 1024 * 1024; // 1mb - /// A lower-level configuration of the HTTP protocol. /// /// This structure is used to configure options for an HTTP server connection. @@ -56,7 +47,7 @@ pub struct Http { exec: E, h1_half_close: bool, h1_writev: bool, - h2_builder: h2::server::Builder, + h2_builder: proto::h2::server::Config, mode: ConnectionMode, keep_alive: bool, max_buf_size: Option, @@ -145,7 +136,7 @@ where #[derive(Clone, Debug)] enum Fallback { - ToHttp2(h2::server::Builder, E), + ToHttp2(proto::h2::server::Config, E), Http1Only, } @@ -188,16 +179,11 @@ impl Http { /// Creates a new instance of the HTTP protocol, ready to spawn a server or /// start accepting connections. pub fn new() -> Http { - let mut h2_builder = h2::server::Builder::default(); - h2_builder - .initial_window_size(DEFAULT_HTTP2_STREAM_WINDOW) - .initial_connection_window_size(DEFAULT_HTTP2_CONN_WINDOW); - Http { exec: Exec::Default, h1_half_close: false, h1_writev: true, - h2_builder, + h2_builder: Default::default(), mode: ConnectionMode::Fallback, keep_alive: true, max_buf_size: None, @@ -268,7 +254,8 @@ impl Http { /// [spec]: https://http2.github.io/http2-spec/#SETTINGS_INITIAL_WINDOW_SIZE pub fn http2_initial_stream_window_size(&mut self, sz: impl Into>) -> &mut Self { if let Some(sz) = sz.into() { - self.h2_builder.initial_window_size(sz); + self.h2_builder.adaptive_window = false; + self.h2_builder.initial_stream_window_size = sz; } self } @@ -283,7 +270,24 @@ impl Http { sz: impl Into>, ) -> &mut Self { if let Some(sz) = sz.into() { - self.h2_builder.initial_connection_window_size(sz); + self.h2_builder.adaptive_window = false; + self.h2_builder.initial_conn_window_size = sz; + } + self + } + + /// Sets whether to use an adaptive flow control. + /// + /// Enabling this will override the limits set in + /// `http2_initial_stream_window_size` and + /// `http2_initial_connection_window_size`. + pub fn http2_adaptive_window(&mut self, enabled: bool) -> &mut Self { + use proto::h2::SPEC_WINDOW_SIZE; + + self.h2_builder.adaptive_window = enabled; + if enabled { + self.h2_builder.initial_conn_window_size = SPEC_WINDOW_SIZE; + self.h2_builder.initial_stream_window_size = SPEC_WINDOW_SIZE; } self } @@ -295,9 +299,7 @@ impl Http { /// /// [spec]: https://http2.github.io/http2-spec/#SETTINGS_MAX_CONCURRENT_STREAMS pub fn http2_max_concurrent_streams(&mut self, max: impl Into>) -> &mut Self { - if let Some(max) = max.into() { - self.h2_builder.max_concurrent_streams(max); - } + self.h2_builder.max_concurrent_streams = max.into(); self } diff --git a/src/server/mod.rs b/src/server/mod.rs index 6479278764..c6a16a211b 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -322,6 +322,16 @@ impl Builder { self } + /// Sets whether to use an adaptive flow control. + /// + /// Enabling this will override the limits set in + /// `http2_initial_stream_window_size` and + /// `http2_initial_connection_window_size`. + pub fn http2_adaptive_window(mut self, enabled: bool) -> Self { + self.protocol.http2_adaptive_window(enabled); + self + } + /// Sets the [`SETTINGS_MAX_CONCURRENT_STREAMS`][spec] option for HTTP2 /// connections. ///